diff --git a/isaaclab_arena/agentic_environment_generation/environment_generation_agent.py b/isaaclab_arena/agentic_environment_generation/environment_generation_agent.py
index b496f0cdc..522b44d53 100644
--- a/isaaclab_arena/agentic_environment_generation/environment_generation_agent.py
+++ b/isaaclab_arena/agentic_environment_generation/environment_generation_agent.py
@@ -250,6 +250,33 @@ def generate_spec(
A ``(EnvironmentIntentSpec, raw_response)`` tuple. The raw text is
useful for debugging.
"""
+ data, text = self.fetch_intent_from_prompt(
+ prompt,
+ asset_catalog=asset_catalog,
+ relation_catalog=relation_catalog,
+ task_catalog=task_catalog,
+ temperature=temperature,
+ max_tokens=max_tokens,
+ max_retries=max_retries,
+ )
+ spec = EnvironmentIntentSpec.model_validate(data)
+ return spec, text
+
+ def fetch_intent_from_prompt(
+ self,
+ prompt: str,
+ asset_catalog: AssetCatalogue | None = None,
+ relation_catalog: RelationCatalogue | None = None,
+ task_catalog: TaskCatalogue | None = None,
+ temperature: float = 0.2,
+ max_tokens: int = 4096,
+ max_retries: int = 3,
+ ) -> tuple[dict[str, Any], str]:
+ """Call the model and return parsed intent JSON without registry validation.
+
+ Registry-backed :class:`EnvironmentIntentSpec` validation should run in a
+ SimApp process (e.g. the review GUI sidecar) where registries are warm.
+ """
asset_catalog = asset_catalog or build_asset_catalogue()
relation_catalog = relation_catalog or build_relation_catalogue()
task_catalog = task_catalog or build_task_catalogue()
@@ -268,7 +295,7 @@ def generate_spec(
last_exc: Exception | None = None
for attempt in range(1 + max_retries):
if attempt > 0:
- print(f"[generate_spec] retry {attempt}/{max_retries} after: {last_exc}", flush=True)
+ print(f"[fetch_intent_from_prompt] retry {attempt}/{max_retries} after: {last_exc}", flush=True)
try:
resp = self.client.chat.completions.create(
@@ -299,8 +326,7 @@ def generate_spec(
# (e.g. literal tabs) inside JSON strings — DeepSeek-v4-flash is known
# to emit these.
data = json.loads(text, strict=False)
- spec = EnvironmentIntentSpec.model_validate(data)
- return spec, text
+ return data, text
except Exception as exc:
last_exc = exc
diff --git a/isaaclab_arena/assets/object.py b/isaaclab_arena/assets/object.py
index e2a92af36..40e9d6089 100644
--- a/isaaclab_arena/assets/object.py
+++ b/isaaclab_arena/assets/object.py
@@ -11,12 +11,9 @@
from isaaclab.sim.spawners.spawner_cfg import SpawnerCfg
from isaaclab_arena.assets.object_base import ObjectBase, ObjectType
-from isaaclab_arena.assets.object_utils import detect_object_type
from isaaclab_arena.relations.relations import RelationBase
from isaaclab_arena.utils.bounding_box import AxisAlignedBoundingBox, quaternion_to_90_deg_z_quarters
from isaaclab_arena.utils.pose import Pose
-from isaaclab_arena.utils.usd.rigid_bodies import find_shallowest_rigid_body
-from isaaclab_arena.utils.usd_helpers import compute_local_bounding_box_from_usd, has_light, open_stage
class Object(ObjectBase):
@@ -49,6 +46,8 @@ def __init__(
"object_type is None (indicating auto-detect) but usd_path is also None. usd_path is required to detect"
" object type"
)
+ from isaaclab_arena.assets.object_utils import detect_object_type
+
object_type = detect_object_type(usd_path=usd_path)
super().__init__(name=name, prim_path=prim_path, object_type=object_type, **kwargs)
self.usd_path = usd_path
@@ -69,6 +68,8 @@ def add_relation(self, relation: RelationBase) -> None:
def get_bounding_box(self) -> AxisAlignedBoundingBox:
"""Get local bounding box (relative to object origin)."""
+ from isaaclab_arena.utils.usd_helpers import compute_local_bounding_box_from_usd
+
assert self.usd_path is not None
if self.bounding_box is None:
self.bounding_box = compute_local_bounding_box_from_usd(self.usd_path, self.scale)
@@ -88,6 +89,8 @@ def get_world_bounding_box(self) -> AxisAlignedBoundingBox:
return local_bbox.rotated_90_around_z(quarters).translated(self.initial_pose.position_xyz)
def get_corners(self, pos: torch.Tensor) -> torch.Tensor:
+ from isaaclab_arena.utils.usd_helpers import compute_local_bounding_box_from_usd
+
assert self.usd_path is not None
if self.bounding_box is None:
self.bounding_box = compute_local_bounding_box_from_usd(self.usd_path, self.scale)
@@ -107,6 +110,8 @@ def enable_reset_pose(self) -> None:
def get_contact_sensor_cfg(
self, contact_against_object: ObjectBase | None = None, usd_path: str | None = None
) -> ContactSensorCfg:
+ from isaaclab_arena.utils.usd.rigid_bodies import find_shallowest_rigid_body
+
assert self.object_type == ObjectType.RIGID, "Contact sensor is only supported for rigid objects"
# We override this function from the parent class because in some assets, the rigid body
# is not at the root of the USD file. To be robust to this, we find the shallowest rigid body
@@ -177,6 +182,8 @@ def _generate_articulation_cfg(self) -> ArticulationCfg:
return self._add_initial_pose_to_cfg(object_cfg)
def _generate_base_cfg(self) -> AssetBaseCfg:
+ from isaaclab_arena.utils.usd_helpers import has_light, open_stage
+
assert self.object_type == ObjectType.BASE
if self.spawner_cfg is None:
with open_stage(self.usd_path) as stage:
diff --git a/isaaclab_arena/assets/object_library.py b/isaaclab_arena/assets/object_library.py
index e7bf8c82a..ad565e2ac 100644
--- a/isaaclab_arena/assets/object_library.py
+++ b/isaaclab_arena/assets/object_library.py
@@ -22,7 +22,7 @@
from isaaclab_arena.assets.lightwheel_lazy import LightwheelLazyPath
from isaaclab_arena.assets.object import Object
from isaaclab_arena.assets.object_base import ObjectType
-from isaaclab_arena.assets.object_utils import (
+from isaaclab_arena.assets.object_spawn_defaults import (
EMPTY_ARTICULATION_INIT_STATE_CFG,
RIGID_BODY_PROPS_HIGH_PRECISION,
RIGID_BODY_PROPS_MEDIUM_PRECISION,
diff --git a/isaaclab_arena/assets/object_spawn_defaults.py b/isaaclab_arena/assets/object_spawn_defaults.py
new file mode 100644
index 000000000..4bf52e3f9
--- /dev/null
+++ b/isaaclab_arena/assets/object_spawn_defaults.py
@@ -0,0 +1,46 @@
+# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""Spawn/physics defaults for library objects (no USD / pxr imports)."""
+
+import isaaclab.sim as sim_utils
+from isaaclab.assets import ArticulationCfg
+
+# Predefined rigid body property configurations for assembly tasks
+# High iteration count for precision tasks (peg/hole insertion)
+RIGID_BODY_PROPS_HIGH_PRECISION = sim_utils.RigidBodyPropertiesCfg(
+ disable_gravity=False,
+ max_depenetration_velocity=5.0,
+ linear_damping=0.0,
+ angular_damping=0.0,
+ max_linear_velocity=1000.0,
+ max_angular_velocity=3666.0,
+ enable_gyroscopic_forces=True,
+ solver_position_iteration_count=192,
+ solver_velocity_iteration_count=1,
+ max_contact_impulse=1e32,
+)
+
+# Standard iteration count for gear mesh tasks
+RIGID_BODY_PROPS_MEDIUM_PRECISION = sim_utils.RigidBodyPropertiesCfg(
+ disable_gravity=False,
+ max_depenetration_velocity=5.0,
+ linear_damping=0.0,
+ angular_damping=0.0,
+ max_linear_velocity=1000.0,
+ max_angular_velocity=3666.0,
+ enable_gyroscopic_forces=True,
+ solver_position_iteration_count=32,
+ solver_velocity_iteration_count=32,
+ max_contact_impulse=1e32,
+)
+
+# Initial state configuration for articulations without joints (e.g., rigid bodies treated as articulations).
+# We explicitly set joint_pos and joint_vel to empty dicts to avoid the default pattern {".*": 0.0} in ArticulationCfg.InitialStateCfg,
+# which would fail to match when there are no joints in the articulation.
+EMPTY_ARTICULATION_INIT_STATE_CFG = ArticulationCfg.InitialStateCfg(
+ joint_pos={},
+ joint_vel={},
+)
diff --git a/isaaclab_arena/assets/object_utils.py b/isaaclab_arena/assets/object_utils.py
index 0b0d2c289..84445f794 100644
--- a/isaaclab_arena/assets/object_utils.py
+++ b/isaaclab_arena/assets/object_utils.py
@@ -4,15 +4,28 @@
# SPDX-License-Identifier: Apache-2.0
-import isaaclab.sim as sim_utils
-from isaaclab.assets import ArticulationCfg
-from pxr import Usd
+from typing import TYPE_CHECKING
from isaaclab_arena.assets.object_base import ObjectType
-from isaaclab_arena.utils.usd_helpers import get_prim_depth, is_articulation_root, is_rigid_body
+from isaaclab_arena.assets.object_spawn_defaults import (
+ EMPTY_ARTICULATION_INIT_STATE_CFG,
+ RIGID_BODY_PROPS_HIGH_PRECISION,
+ RIGID_BODY_PROPS_MEDIUM_PRECISION,
+)
+
+if TYPE_CHECKING:
+ from pxr import Usd
+
+# Re-export spawn defaults for backward compatibility.
+__all__ = [
+ "EMPTY_ARTICULATION_INIT_STATE_CFG",
+ "RIGID_BODY_PROPS_HIGH_PRECISION",
+ "RIGID_BODY_PROPS_MEDIUM_PRECISION",
+ "detect_object_type",
+]
-def detect_object_type(usd_path: str | None = None, stage: Usd.Stage | None = None) -> ObjectType:
+def detect_object_type(usd_path: str | None = None, stage: "Usd.Stage | None" = None) -> ObjectType:
"""Detect the object type of the asset
Goes through the USD tree and detects the object type. The detection is based
@@ -28,6 +41,10 @@ def detect_object_type(usd_path: str | None = None, stage: Usd.Stage | None = No
Returns:
The object type of the asset.
"""
+ from pxr import Usd
+
+ from isaaclab_arena.utils.usd_helpers import get_prim_depth, is_articulation_root, is_rigid_body
+
assert usd_path is not None or stage is not None, "Either usd_path or stage must be provided"
assert usd_path is None or stage is None, "Either usd_path or stage must be provided"
if usd_path is not None:
@@ -62,41 +79,3 @@ def detect_object_type(usd_path: str | None = None, stage: Usd.Stage | None = No
return ObjectType.ARTICULATION
else:
raise ValueError("This should not happen. There is an unknown USD type in the tree.")
-
-
-# Predefined rigid body property configurations for assembly tasks
-# High iteration count for precision tasks (peg/hole insertion)
-RIGID_BODY_PROPS_HIGH_PRECISION = sim_utils.RigidBodyPropertiesCfg(
- disable_gravity=False,
- max_depenetration_velocity=5.0,
- linear_damping=0.0,
- angular_damping=0.0,
- max_linear_velocity=1000.0,
- max_angular_velocity=3666.0,
- enable_gyroscopic_forces=True,
- solver_position_iteration_count=192,
- solver_velocity_iteration_count=1,
- max_contact_impulse=1e32,
-)
-
-# Standard iteration count for gear mesh tasks
-RIGID_BODY_PROPS_MEDIUM_PRECISION = sim_utils.RigidBodyPropertiesCfg(
- disable_gravity=False,
- max_depenetration_velocity=5.0,
- linear_damping=0.0,
- angular_damping=0.0,
- max_linear_velocity=1000.0,
- max_angular_velocity=3666.0,
- enable_gyroscopic_forces=True,
- solver_position_iteration_count=32,
- solver_velocity_iteration_count=32,
- max_contact_impulse=1e32,
-)
-
-# Initial state configuration for articulations without joints (e.g., rigid bodies treated as articulations).
-# We explicitly set joint_pos and joint_vel to empty dicts to avoid the default pattern {".*": 0.0} in ArticulationCfg.InitialStateCfg,
-# which would fail to match when there are no joints in the articulation.
-EMPTY_ARTICULATION_INIT_STATE_CFG = ArticulationCfg.InitialStateCfg(
- joint_pos={},
- joint_vel={},
-)
diff --git a/isaaclab_arena/environments/arena_env_graph_spec.py b/isaaclab_arena/environments/arena_env_graph_spec.py
index 0dc3d6db6..6176357f9 100644
--- a/isaaclab_arena/environments/arena_env_graph_spec.py
+++ b/isaaclab_arena/environments/arena_env_graph_spec.py
@@ -9,7 +9,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any, Self
-from pydantic import BaseModel, Field, SerializeAsAny, field_validator, model_validator
+from pydantic import BaseModel, Field, SerializeAsAny, ValidationInfo, field_validator, model_validator
from isaaclab_arena.assets.registries import TaskRegistry
from isaaclab_arena.environments.arena_env_graph_types import (
@@ -83,6 +83,18 @@ def write_yaml(self, path: str | Path) -> None:
with Path(path).open("w", encoding="utf-8") as f:
yaml.safe_dump(self.to_dict(), f, sort_keys=False)
+ def to_yaml(self, path: str | Path) -> Path:
+ """Write this spec to ``path`` as YAML. Creates parent dirs as needed.
+
+ Returns the resolved :class:`Path` written. Symmetric with
+ :meth:`from_yaml`.
+ """
+ out_path = Path(path)
+ out_path.parent.mkdir(parents=True, exist_ok=True)
+ with out_path.open("w", encoding="utf-8") as f:
+ yaml.safe_dump(self.to_dict(), f, sort_keys=False)
+ return out_path
+
@property
def nodes_by_id(self) -> dict[str, ArenaEnvGraphNodeSpec]:
return {node.id: node for node in self.nodes}
@@ -158,8 +170,10 @@ class ArenaEnvInitialGraphSpec(ArenaEnvGraphSpecBase):
initial_state_spec: ArenaEnvGraphStateSpec
@model_validator(mode="after")
- def validate(self) -> Self:
+ def validate(self, info: ValidationInfo) -> Self:
"""Check unique IDs, constraint references, and spatial constraint shapes."""
+ if info.context and info.context.get("skip_registry"):
+ return self
assert_unique_ids(self.nodes, [], [self.initial_state_spec])
assert_constraint_references(self.nodes, [self.initial_state_spec])
assert_spatial_constraint_shapes([self.initial_state_spec])
diff --git a/isaaclab_arena/environments/arena_env_graph_types.py b/isaaclab_arena/environments/arena_env_graph_types.py
index 740f6a642..38e121175 100644
--- a/isaaclab_arena/environments/arena_env_graph_types.py
+++ b/isaaclab_arena/environments/arena_env_graph_types.py
@@ -16,7 +16,7 @@
from enum import Enum
from typing import Any
-from pydantic import BaseModel, Field, field_validator, model_validator
+from pydantic import BaseModel, Field, ValidationInfo, field_validator, model_validator
from isaaclab_arena.assets.object_type import ObjectType
from isaaclab_arena.assets.registries import ObjectRelationLibraryRegistry, TaskRegistry
@@ -115,7 +115,9 @@ class TaskSpec(BaseModel):
@field_validator("kind")
@classmethod
- def _validate_registered_task_type(cls, value: str) -> str:
+ def _validate_registered_task_type(cls, value: str, info: ValidationInfo) -> str:
+ if info.context and info.context.get("skip_registry"):
+ return value
registry = TaskRegistry()
assert registry.is_registered(value), f"Unknown task kind '{value}'"
return value
@@ -169,7 +171,9 @@ class SpatialRelationSpec(BaseModel):
)
@model_validator(mode="after")
- def _validate_kind_and_arity(self) -> SpatialRelationSpec:
+ def _validate_kind_and_arity(self, info: ValidationInfo) -> SpatialRelationSpec:
+ if info.context and info.context.get("skip_registry"):
+ return self
registry = ObjectRelationLibraryRegistry()
assert registry.is_registered(self.kind), f"Unknown relation kind '{self.kind}'"
relation_cls = registry.get_object_relation_by_name(self.kind)
diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py
index c1512700d..42a82169f 100644
--- a/isaaclab_arena/evaluation/eval_runner.py
+++ b/isaaclab_arena/evaluation/eval_runner.py
@@ -5,14 +5,12 @@
import argparse
import dataclasses
-import gc
import json
import math
import os
import subprocess
import sys
import tempfile
-import torch
import traceback
from gymnasium.wrappers import RecordVideo
from pathlib import Path
@@ -24,7 +22,11 @@
from isaaclab_arena.evaluation.policy_runner import get_policy_cls, rollout_policy
from isaaclab_arena.metrics.aggregate_metrics import aggregate_metrics
from isaaclab_arena.metrics.metrics_logger import MetricsLogger
-from isaaclab_arena.utils.isaaclab_utils.simulation_app import SimulationAppContext, teardown_simulation_app
+from isaaclab_arena.utils.isaaclab_utils.simulation_app import (
+ SimulationAppContext,
+ close_env_and_reset_sim,
+ collect_garbage_and_clear_cuda_cache,
+)
from isaaclab_arena_environments.cli import get_arena_builder_from_cli, get_isaaclab_arena_environments_cli_parser
if TYPE_CHECKING:
@@ -111,31 +113,18 @@ def get_policy_from_job(job: Job) -> "PolicyBase":
return policy
-def _collect_garbage_and_clear_cuda_cache() -> None:
- gc.collect()
- if torch.cuda.is_available():
- torch.cuda.empty_cache()
-
-
def _close_policy(policy: "PolicyBase | None") -> None:
try:
if policy is not None:
policy.close()
finally:
- _collect_garbage_and_clear_cuda_cache()
+ collect_garbage_and_clear_cuda_cache()
def _close_env(env) -> None:
if env is None:
return
- try:
- teardown_simulation_app(suppress_exceptions=False, make_new_stage=True)
- finally:
- try:
- # cleanup managers, including recorder manager closing hdf5 file
- env.close()
- finally:
- _collect_garbage_and_clear_cuda_cache()
+ close_env_and_reset_sim(env)
def _close_job_resources(policy: "PolicyBase | None", env) -> None:
@@ -326,7 +315,7 @@ def main():
finally:
policy = None
env = None
- _collect_garbage_and_clear_cuda_cache()
+ collect_garbage_and_clear_cuda_cache()
# Aggregate the metrics from the different experiments into a single view.
if metrics_per_run:
diff --git a/isaaclab_arena/tasks/lift_object_task.py b/isaaclab_arena/tasks/lift_object_task.py
index 4c4461fd7..100c6f734 100644
--- a/isaaclab_arena/tasks/lift_object_task.py
+++ b/isaaclab_arena/tasks/lift_object_task.py
@@ -26,7 +26,7 @@
from isaaclab_arena.tasks.task_base import TaskBase
from isaaclab_arena.tasks.terminations import lift_object_il_success, lift_object_rl_success
from isaaclab_arena.utils.cameras import get_viewer_cfg_look_at_object
-from isaaclab_arena.utils.pose import PoseRange
+from isaaclab_arena.utils.pose import as_single_pose
@register_task
@@ -54,9 +54,7 @@ def __init__(
self.background_scene = background_scene
# Compute goal position from object's initial pose + delta
- initial_pose = lift_object.get_initial_pose()
- if isinstance(initial_pose, PoseRange):
- initial_pose = initial_pose.get_midpoint()
+ initial_pose = as_single_pose(lift_object.get_initial_pose())
# Store goal pose for success termination (IL/teleoperation uses fixed goal)
self.goal_position_xyz = (
@@ -163,9 +161,7 @@ def __init__(
self.minimum_height_to_lift = minimum_height_to_lift
# Get object's initial pose to compute absolute target ranges
- initial_pose = lift_object.get_initial_pose()
- if isinstance(initial_pose, PoseRange):
- initial_pose = initial_pose.get_midpoint()
+ initial_pose = as_single_pose(lift_object.get_initial_pose())
# Compute absolute target ranges from deltas
self.target_x_range = (
diff --git a/isaaclab_arena/tests/test_pose.py b/isaaclab_arena/tests/test_pose.py
index bba6405b1..e5b0cbe41 100644
--- a/isaaclab_arena/tests/test_pose.py
+++ b/isaaclab_arena/tests/test_pose.py
@@ -5,7 +5,7 @@
import math
-from isaaclab_arena.utils.pose import Pose, PosePerEnv, rotate_quat_by_yaw, wrap_angle_to_pi
+from isaaclab_arena.utils.pose import Pose, PosePerEnv, PoseRange, as_single_pose, rotate_quat_by_yaw, wrap_angle_to_pi
def _yaw_of(quat_xyzw: tuple[float, float, float, float]) -> float:
@@ -40,6 +40,25 @@ def test_pose_composition():
assert T_C_A.rotation_xyzw == (0.0, 0.0, 0.0, 1.0)
+def test_as_single_pose():
+ pose = Pose(position_xyz=(1.0, 2.0, 3.0))
+ assert as_single_pose(pose) is pose
+
+ pose_range = PoseRange(
+ position_xyz_min=(0.0, 0.0, 0.0),
+ position_xyz_max=(2.0, 4.0, 6.0),
+ )
+ assert as_single_pose(pose_range).position_xyz == (1.0, 2.0, 3.0)
+
+ pose_per_env = PosePerEnv(
+ poses=[
+ Pose(position_xyz=(1.0, 2.0, 3.0)),
+ Pose(position_xyz=(4.0, 5.0, 6.0)),
+ ]
+ )
+ assert as_single_pose(pose_per_env).position_xyz == (1.0, 2.0, 3.0)
+
+
def test_pose_per_env_stores_poses():
"""Test that PosePerEnv stores the list of Pose objects correctly."""
poses = [
diff --git a/isaaclab_arena/utils/cameras.py b/isaaclab_arena/utils/cameras.py
index 4567bb7e0..ddbb4c9e9 100644
--- a/isaaclab_arena/utils/cameras.py
+++ b/isaaclab_arena/utils/cameras.py
@@ -18,7 +18,7 @@
from isaaclab_arena.assets.asset import Asset
from isaaclab_arena.utils.configclass import make_configclass
-from isaaclab_arena.utils.pose import PoseRange
+from isaaclab_arena.utils.pose import as_single_pose
def make_camera_observation_cfg(
@@ -114,8 +114,7 @@ def get_viewer_cfg_look_at_object(lookat_object: Asset, offset: np.ndarray) -> V
print(f"{lookat_object.name} has no initial pose set. Using default ViewerCfg.")
return ViewerCfg()
- if isinstance(initial_pose, PoseRange):
- initial_pose = initial_pose.get_midpoint()
+ initial_pose = as_single_pose(initial_pose)
# TODO(cvolk): Add float coercion to Pose.__post_init__ so this conversion is unnecessary.
# Ensure we only pass primitive Python floats (not NumPy scalars) into ViewerCfg,
diff --git a/isaaclab_arena/utils/isaaclab_utils/simulation_app.py b/isaaclab_arena/utils/isaaclab_utils/simulation_app.py
index 02db2a0f3..7a4d4f264 100644
--- a/isaaclab_arena/utils/isaaclab_utils/simulation_app.py
+++ b/isaaclab_arena/utils/isaaclab_utils/simulation_app.py
@@ -82,6 +82,64 @@ def teardown_simulation_app(suppress_exceptions: bool = False, make_new_stage: b
omni.usd.get_context().new_stage()
+def collect_garbage_and_clear_cuda_cache() -> None:
+ """Run GC and release cached CUDA allocations after a sim env is torn down."""
+ import gc
+ import torch
+
+ gc.collect()
+ if torch.cuda.is_available():
+ torch.cuda.empty_cache()
+
+
+def close_env_and_reset_sim(
+ env=None,
+ *,
+ suppress_exceptions: bool = False,
+ make_new_stage: bool = True,
+ app=None,
+) -> None:
+ """Tear down sim state and close a gym env so another can be built in the same SimApp.
+
+ Close the gym env first so scene managers release prims while
+ :class:`~isaaclab.sim.SimulationContext` is still valid, then stop the timeline
+ and open a fresh USD stage.
+ """
+ error_manager = suppress(Exception) if suppress_exceptions else nullcontext()
+
+ with error_manager:
+ if env is not None and not getattr(env.unwrapped, "_is_closed", True):
+ env.close()
+
+ # env.close() clears the singleton, but callers may omit env or close may fail partway.
+ with error_manager:
+ from isaaclab.sim import SimulationContext
+
+ sim = SimulationContext.instance()
+ if sim is not None:
+ sim._disable_app_control_on_stop_handle = True # noqa: SLF001 (intentional private attr)
+ sim.stop()
+ sim.clear_instance()
+
+ with error_manager:
+ import omni.timeline
+
+ omni.timeline.get_timeline_interface().stop()
+
+ if make_new_stage:
+ with error_manager:
+ import omni.usd
+
+ omni.usd.get_context().new_stage()
+
+ if app is not None:
+ with error_manager:
+ for _ in range(20):
+ app.update()
+
+ collect_garbage_and_clear_cuda_cache()
+
+
def reapply_viewer_cfg(env) -> None:
"""Re-apply ViewerCfg camera position after visualizers are initialized.
diff --git a/isaaclab_arena/utils/pose.py b/isaaclab_arena/utils/pose.py
index ecc9289d6..3eb869df4 100644
--- a/isaaclab_arena/utils/pose.py
+++ b/isaaclab_arena/utils/pose.py
@@ -144,3 +144,15 @@ def get_midpoint(self) -> Pose:
position_xyz=tuple(position_xyz.tolist()),
rotation_xyzw=tuple(quat.tolist()),
)
+
+
+def as_single_pose(pose: Pose | PoseRange | PosePerEnv) -> Pose:
+ """Return one representative ``Pose`` for viewer and bbox code that needs a single point.
+
+ ``PosePerEnv`` uses env 0; ``PoseRange`` uses its midpoint.
+ """
+ if isinstance(pose, PosePerEnv):
+ return pose.poses[0]
+ if isinstance(pose, PoseRange):
+ return pose.get_midpoint()
+ return pose
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/__init__.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/__init__.py
new file mode 100644
index 000000000..821e672b4
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/__init__.py
@@ -0,0 +1,6 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""Live editor and HTML dashboard for :class:`~isaaclab_arena.environments.arena_env_graph_spec.ArenaEnvInitialGraphSpec`."""
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/__init__.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/__init__.py
new file mode 100644
index 000000000..2fd363882
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/__init__.py
@@ -0,0 +1,6 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""HTML rendering backend for the initial-graph review dashboard."""
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/dashboard.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/dashboard.py
new file mode 100644
index 000000000..ee08bd731
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/dashboard.py
@@ -0,0 +1,61 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import html as html_lib
+
+from isaaclab_arena.environments.arena_env_graph_spec import ArenaEnvInitialGraphSpec
+from isaaclab_arena_examples.agentic_environment_generation.review_gui.render.mermaid_graph import render_mermaid_graph
+from isaaclab_arena_examples.agentic_environment_generation.review_gui.render.panels import (
+ render_node_cards,
+ render_tasks_table,
+ render_unary_constraints,
+)
+from isaaclab_arena_examples.agentic_environment_generation.review_gui.render.styles import DASHBOARD_CSS
+
+
+def render_dashboard_html(spec: ArenaEnvInitialGraphSpec, thumbnails: dict[str, bytes] | None = None) -> str:
+ """Render the self-contained review dashboard HTML for ``spec``."""
+ initial_state = spec.initial_state_spec
+ thumbnails = thumbnails or {}
+ return f"""
+
+
+
+{html_lib.escape(spec.env_name)} — graph review
+
+
+
+
+
+
+
+ Nodes
+ {render_node_cards(spec, thumbnails)}
+
+
+ Spatial graph (initial state: {html_lib.escape(initial_state.id)})
+
+
+
{render_mermaid_graph(spec, initial_state)}
+
+
+
+
+
+ Tasks
+ {render_tasks_table(spec)}
+
+
+
+
+
+"""
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/mermaid_graph.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/mermaid_graph.py
new file mode 100644
index 000000000..7ec9c3fb0
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/mermaid_graph.py
@@ -0,0 +1,96 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import re
+
+from isaaclab_arena.environments.arena_env_graph_spec import ArenaEnvInitialGraphSpec
+from isaaclab_arena.environments.arena_env_graph_types import ArenaEnvGraphStateSpec
+
+_MERMAID_ID_SAFE = re.compile(r"[^A-Za-z0-9_]")
+
+
+def render_mermaid_graph(spec: ArenaEnvInitialGraphSpec, state: ArenaEnvGraphStateSpec) -> str:
+ """Emit a left-to-right mermaid graph of spatial and task constraints.
+
+ Binary spatial constraints (reference is set) are drawn as solid edges:
+ subject -->|kind| reference
+
+ Unary spatial constraints (no reference) are omitted from the graph and
+ listed to its right by :func:`render_unary_constraints` so their params are
+ visible.
+
+ Task constraints with a child are drawn as dashed edges:
+ parent -.->|type| child
+
+ object_reference nodes are drawn with a dotted edge to their parent node:
+ ref_node -. ref .-> parent_node
+ """
+ lines = ["graph LR"]
+
+ anchor_ids: set[str] = set()
+ edge_nodes: set[str] = set()
+
+ for constraint in state.spatial_constraints:
+ kind = constraint.kind
+ if kind == "is_anchor":
+ anchor_ids.add(constraint.subject)
+ if constraint.reference is not None:
+ lines.append(
+ f" {_mermaid_id(constraint.subject)}[{_mermaid_label(constraint.subject)}]"
+ f" -->|{kind}| "
+ f"{_mermaid_id(constraint.reference)}[{_mermaid_label(constraint.reference)}]"
+ )
+ edge_nodes.add(constraint.subject)
+ edge_nodes.add(constraint.reference)
+
+ for task_constraint in state.task_constraints:
+ if task_constraint.child is not None:
+ lines.append(
+ f" {_mermaid_id(task_constraint.parent)}[{_mermaid_label(task_constraint.parent)}]"
+ f" -.->|{_mermaid_label(task_constraint.type.value)}| "
+ f"{_mermaid_id(task_constraint.child)}[{_mermaid_label(task_constraint.child)}]"
+ )
+ edge_nodes.add(task_constraint.parent)
+ edge_nodes.add(task_constraint.child)
+
+ for node in spec.nodes:
+ if node.id not in edge_nodes:
+ lines.append(f" {_mermaid_id(node.id)}[{_mermaid_label(node.id)}]")
+
+ nodes_by_id = spec.nodes_by_id
+ for node in spec.nodes:
+ if node.type.value == "object_reference" and node.parent is not None:
+ if node.parent in nodes_by_id:
+ lines.append(f" {_mermaid_id(node.id)} -.->|ref| {_mermaid_id(node.parent)}")
+
+ for anchor_id in anchor_ids:
+ lines.append(f" style {_mermaid_id(anchor_id)} fill:#3a7d44,color:#fff,stroke:#7fd17f,stroke-width:2px")
+
+ type_palette = {
+ "background": ("#3a4f7a", "#7aa0d8"),
+ "embodiment": ("#7a3a3a", "#d87a7a"),
+ "object": ("#7a6b3a", "#d8c47a"),
+ "object_reference": ("#6b3a7a", "#c47ad8"),
+ "lighting": ("#3a7a7a", "#7ad8d8"),
+ }
+ for node in spec.nodes:
+ if node.id in anchor_ids:
+ continue
+ fill, stroke = type_palette.get(node.type.value, ("#3a3d44", "#888"))
+ lines.append(f" style {_mermaid_id(node.id)} fill:{fill},color:#fff,stroke:{stroke}")
+
+ return "\n".join(lines)
+
+
+def _mermaid_id(value: str) -> str:
+ """Mermaid node identifiers must be alphanumeric / underscore."""
+ return _MERMAID_ID_SAFE.sub("_", value)
+
+
+def _mermaid_label(value: str) -> str:
+ """Escape mermaid-significant characters inside node labels."""
+ return value.replace('"', """).replace("|", "|")
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/panels.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/panels.py
new file mode 100644
index 000000000..8eeeae19e
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/panels.py
@@ -0,0 +1,79 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import html as html_lib
+import yaml
+
+from isaaclab_arena.environments.arena_env_graph_spec import ArenaEnvInitialGraphSpec
+from isaaclab_arena.environments.arena_env_graph_types import ArenaEnvGraphNodeSpec, ArenaEnvGraphStateSpec
+from isaaclab_arena_examples.agentic_environment_generation.review_gui.render.thumbnails import render_node_thumbnail
+
+
+def render_unary_constraints(state: ArenaEnvGraphStateSpec) -> str:
+ """List constraints without a reference beside the spatial graph."""
+ rows = []
+ for constraint in state.spatial_constraints:
+ if constraint.reference is not None:
+ continue
+ params = (
+ " {html_lib.escape(yaml.safe_dump(constraint.params, default_flow_style=True).rstrip())}'
+ if constraint.params
+ else ""
+ )
+ rows.append(
+ f'{html_lib.escape(constraint.kind)}'
+ f" on {html_lib.escape(constraint.subject)}{params}"
+ )
+ if not rows:
+ return 'No unary constraints.
'
+ return (
+ f'Unary constraints ({len(rows)})
'
+ f''
+ )
+
+
+def render_tasks_table(spec: ArenaEnvInitialGraphSpec) -> str:
+ if not spec.tasks:
+ return "No tasks defined.
"
+ rows = []
+ for index, task in enumerate(spec.tasks):
+ params_str = yaml.safe_dump(task.params, sort_keys=False).rstrip() if task.params else "(empty)"
+ description = html_lib.escape(task.description or "")
+ rows.append(
+ ""
+ f"{index} | "
+ f'{html_lib.escape(task.kind)} | '
+ f"{description} | "
+ f"{html_lib.escape(params_str)} | "
+ "
"
+ )
+ return (
+ ""
+ "| # | kind | description | params |
"
+ f"{''.join(rows)}"
+ "
"
+ )
+
+
+def render_node_cards(spec: ArenaEnvInitialGraphSpec, thumbnails: dict[str, bytes] | None = None) -> str:
+ thumbnails = thumbnails or {}
+ return "\n".join(render_node_card(node, thumbnails.get(node.id)) for node in spec.nodes)
+
+
+def render_node_card(node: ArenaEnvGraphNodeSpec, png_bytes: bytes | None = None) -> str:
+ node_dict = node.model_dump(mode="json", exclude_none=True)
+ node_yaml = yaml.safe_dump(node_dict, sort_keys=False).rstrip()
+ thumb = render_node_thumbnail(node, png_bytes)
+ return f"""
+ {thumb}
+
+ {html_lib.escape(node_yaml)}
+"""
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/styles.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/styles.py
new file mode 100644
index 000000000..b2638549c
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/styles.py
@@ -0,0 +1,75 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+DASHBOARD_CSS = """
+:root {
+ --bg: #15181d;
+ --bg-elev: #1d2128;
+ --bg-elev2: #262b34;
+ --border: #2f343d;
+ --fg: #e4e6eb;
+ --fg-muted: #8a9099;
+ --accent: #7fd17f;
+}
+* { box-sizing: border-box; }
+body { margin: 0; padding: 24px; font: 14px/1.5 -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
+ background: var(--bg); color: var(--fg); }
+header { margin-bottom: 16px; }
+header h1 { margin: 0; font-size: 28px; font-weight: 700; }
+header .sub { margin: 4px 0 0; color: var(--fg-muted); font-size: 13px; }
+main { display: flex; flex-direction: column; gap: 16px; }
+.panel { background: var(--bg-elev); border: 1px solid var(--border); border-radius: 8px; padding: 16px; }
+.panel h2 { margin: 0 0 12px; font-size: 16px; font-weight: 600; letter-spacing: 0.02em; }
+.panel h2 .muted { color: var(--fg-muted); font-weight: 400; font-size: 13px; }
+.graph-row { display: grid; grid-template-columns: minmax(0, 1fr) minmax(220px, 300px); gap: 16px; align-items: start; }
+.graph-mermaid { min-width: 0; }
+.graph-unary { background: var(--bg-elev2); border: 1px solid var(--border); border-radius: 6px; padding: 12px; }
+.unary-heading { margin: 0 0 10px; font-size: 13px; font-weight: 600; letter-spacing: 0.02em; }
+.unary-heading .muted { font-weight: 400; }
+.unary-list { margin: 0; padding-left: 18px; list-style: disc; color: var(--fg); }
+.unary-list li { padding: 4px 0; font-size: 12px; }
+.unary-empty { margin: 0; font-size: 12px; }
+code { font-family: ui-monospace, 'SF Mono', Menlo, monospace; font-size: 12px;
+ background: var(--bg-elev2); padding: 1px 6px; border-radius: 4px; }
+pre { font-family: ui-monospace, 'SF Mono', Menlo, monospace; font-size: 12px;
+ background: var(--bg-elev2); padding: 10px 12px; border-radius: 6px; margin: 0;
+ white-space: pre-wrap; word-break: break-word; }
+.muted { color: var(--fg-muted); }
+.badge { display: inline-block; padding: 2px 8px; border-radius: 999px; font-size: 11px;
+ font-weight: 600; letter-spacing: 0.03em; background: var(--bg-elev2); color: var(--fg); }
+.badge.type-background { background: #3a4f7a; }
+.badge.type-embodiment { background: #7a3a3a; }
+.badge.type-object { background: #7a6b3a; }
+.badge.type-object_reference { background: #6b3a7a; }
+.badge.type-lighting { background: #3a7a7a; }
+.badge.type-is_anchor { background: #3a7d44; }
+.badge.type-position_limits, .badge.type-at_pose, .badge.type-at_position { background: #6b3a7a; }
+.badge.type-task { background: #2f343d; border: 1px solid #4a5; color: var(--accent); }
+.mermaid { background: var(--bg-elev2); padding: 8px; border-radius: 6px; min-height: 220px;
+ display: flex; align-items: center; justify-content: center; margin: 0; }
+table.tasks { width: 100%; border-collapse: collapse; }
+table.tasks th, table.tasks td { text-align: left; padding: 8px 10px; border-bottom: 1px solid var(--border);
+ vertical-align: top; font-size: 12px; }
+table.tasks th { color: var(--fg-muted); font-weight: 600; text-transform: uppercase; letter-spacing: 0.05em; }
+table.tasks pre { padding: 6px 8px; font-size: 11px; }
+.node-grid { display: grid; grid-template-columns: repeat(3, minmax(0, 1fr)); gap: 12px; }
+.node-card { background: var(--bg-elev2); border: 1px solid var(--border); border-radius: 8px;
+ padding: 12px; display: flex; flex-direction: column; gap: 10px; }
+.node-card .thumb { aspect-ratio: 1 / 1; background: linear-gradient(135deg, #2a2f37, #1c2026);
+ border-radius: 6px; display: flex; flex-direction: column;
+ align-items: center; justify-content: center; color: var(--fg-muted);
+ position: relative; overflow: hidden; }
+.node-card .thumb-rendered { background: #0e1115; }
+.node-card .thumb-rendered img { width: 100%; height: 100%; object-fit: contain; display: block; }
+.node-card .thumb-rendered .thumb-name { position: absolute; bottom: 0; left: 0; right: 0;
+ padding: 4px 6px; background: rgba(15, 17, 21, 0.78);
+ color: var(--fg); margin: 0; }
+.thumb-initial { font-size: 36px; font-weight: 700; color: var(--fg); opacity: 0.6;
+ font-family: ui-monospace, monospace; }
+.thumb-name { font-size: 10px; margin-top: 6px; padding: 0 8px; text-align: center; word-break: break-word; }
+.node-meta { display: flex; align-items: center; justify-content: space-between; gap: 8px; }
+.node-id { font-family: ui-monospace, monospace; font-size: 13px; font-weight: 600; word-break: break-all; }
+.node-yaml { font-size: 11px; }
+"""
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/thumbnails.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/thumbnails.py
new file mode 100644
index 000000000..4b2a5eb23
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/render/thumbnails.py
@@ -0,0 +1,28 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from __future__ import annotations
+
+import base64
+import html as html_lib
+
+from isaaclab_arena.environments.arena_env_graph_types import ArenaEnvGraphNodeSpec
+
+
+def render_node_thumbnail(node: ArenaEnvGraphNodeSpec, png_bytes: bytes | None = None) -> str:
+ """Per-node thumbnail: USD capture if available, else two-letter placeholder."""
+ if png_bytes:
+ b64 = base64.b64encode(png_bytes).decode("ascii")
+ return (
+ ''
+ f'

'
+ f'
{html_lib.escape(node.name)}'
+ "
"
+ )
+ initial = (node.name[:2] if node.name else "?").upper()
+ return f"""
+ {html_lib.escape(initial)}
+ {html_lib.escape(node.name)}
+
"""
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/server.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/server.py
new file mode 100644
index 000000000..92b0fa58f
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/server.py
@@ -0,0 +1,88 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""CLI launcher for the ArenaEnvInitialGraphSpec live editor.
+
+Spawns Streamlit with :mod:`~isaaclab_arena_examples.agentic_environment_generation.review_gui.streamlit_ui`.
+
+Usage:
+ /isaac-sim/python.sh -m isaaclab_arena_examples.agentic_environment_generation.review_gui.server \\
+ --yaml isaaclab_arena/tests/test_data/pick_and_place_maple_table_init_env_graph.yaml
+
+ # Prompt-only (empty editor until you generate or paste YAML):
+ /isaac-sim/python.sh -m isaaclab_arena_examples.agentic_environment_generation.review_gui.server
+
+ # Custom port:
+ /isaac-sim/python.sh -m isaaclab_arena_examples.agentic_environment_generation.review_gui.server \\
+ --yaml --port 8600
+"""
+
+from __future__ import annotations
+
+import argparse
+import os
+import subprocess
+import sys
+from pathlib import Path
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(
+ description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ )
+ parser.add_argument(
+ "--yaml",
+ type=Path,
+ default=None,
+ help="Optional ArenaEnvInitialGraphSpec YAML to open in the editor.",
+ )
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=8501,
+ help="Streamlit server port (default: 8501).",
+ )
+ args = parser.parse_args()
+ serve_live_editor(args.yaml, port=args.port)
+
+
+def serve_live_editor(yaml_path: Path | None, port: int = 8501) -> None:
+ """Spawn ``streamlit run streamlit_ui.py`` and wait."""
+ app_path = Path(__file__).with_name("streamlit_ui.py")
+ if not app_path.exists():
+ raise FileNotFoundError(f"Streamlit app not found at {app_path} — installation is incomplete.")
+
+ cmd = [
+ sys.executable,
+ "-m",
+ "streamlit",
+ "run",
+ str(app_path),
+ "--server.port",
+ str(port),
+ "--browser.gatherUsageStats",
+ "false",
+ "--server.fileWatcherType",
+ "none",
+ "--",
+ ]
+ if yaml_path is not None:
+ cmd.extend(["--yaml", str(yaml_path.resolve())])
+
+ print(f"[review_gui] launching Streamlit live editor: {' '.join(cmd)}", file=sys.stderr)
+ try:
+ subprocess.run(cmd, env=os.environ.copy(), check=True)
+ except FileNotFoundError as exc:
+ raise SystemExit(
+ "Streamlit is not installed. Inside the isaaclab_arena container run:\n"
+ " python -m pip install --user --ignore-installed streamlit streamlit-ace"
+ ) from exc
+ except KeyboardInterrupt:
+ pass
+
+
+if __name__ == "__main__":
+ main()
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/sim_preview.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/sim_preview.py
new file mode 100644
index 000000000..aad72af82
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/sim_preview.py
@@ -0,0 +1,194 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""16-env relation-solver rollout preview for the review GUI SimApp sidecar."""
+
+from __future__ import annotations
+
+import argparse
+import math
+import sys
+import time
+import uuid
+from contextlib import suppress
+from pathlib import Path
+from typing import Any
+
+from isaaclab.envs.common import ViewerCfg
+
+from isaaclab_arena.environments.arena_env_graph_spec import ArenaEnvInitialGraphSpec
+
+PREVIEW_CACHE_DIR = Path(__file__).resolve().parents[3] / ".cache" / "llm_env_gen_sim_preview"
+
+NUM_ENVS = 16
+ENV_SPACING_M = 1.5
+NUM_STEPS = 10
+
+# Placement pool size when preview uses resolve_on_reset=False (see ObjectPlacerParams).
+_PREVIEW_LAYOUTS_PER_ENV = 5
+
+
+def _preview_log(started_at: float, message: str) -> None:
+ elapsed = time.monotonic() - started_at
+ print(f"[sim_preview] +{elapsed:.1f}s {message}", file=sys.stderr, flush=True)
+
+
+def _preview_args() -> argparse.Namespace:
+ return argparse.Namespace(
+ num_envs=NUM_ENVS,
+ env_spacing=ENV_SPACING_M,
+ device="cuda:0",
+ disable_fabric=False,
+ solve_relations=True,
+ placement_seed=None,
+ resolve_on_reset=False,
+ random_yaw_init=False,
+ mimic=False,
+ distributed=False,
+ presets=None,
+ )
+
+
+def _overview_camera(
+ num_envs: int, env_spacing: float
+) -> tuple[tuple[float, float, float], tuple[float, float, float]]:
+ """Return (eye, lookat) in world frame for a high oblique view of the full env grid."""
+ cols = int(math.ceil(math.sqrt(num_envs)))
+ rows = int(math.ceil(num_envs / cols))
+ max_x = max((cols - 1) * env_spacing, 0.0)
+ max_y = max((rows - 1) * env_spacing, 0.0)
+ cx, cy = max_x * 0.5, max_y * 0.5
+ span = max(max_x, max_y, env_spacing)
+ # Oblique overview: close enough to fill the frame, still high enough for all clones.
+ height = span * 1.75 + env_spacing * 2.5
+ back = span * 1.4 + env_spacing * 2.0
+ side = span * 0.25
+ eye = (cx + side, cy - back, height)
+ target = (cx, cy, 0.75)
+ return eye, target
+
+
+def _apply_overview_camera(env, app, num_envs: int, env_spacing: float) -> None:
+ """Point the Kit viewport at the full multi-env grid (world frame)."""
+ eye, target = _overview_camera(num_envs, env_spacing)
+ unwrapped = env.unwrapped
+ vcc = getattr(unwrapped, "viewport_camera_controller", None)
+ if vcc is not None:
+ vcc.update_view_to_world()
+ vcc.update_view_location(eye=list(eye), lookat=list(target))
+ else:
+ unwrapped.sim.set_camera_view(eye, target)
+ for _ in range(20):
+ app.update()
+
+
+def _capture_viewport(app, cache_path: Path) -> bytes | None:
+ from omni.kit.viewport.utility import capture_viewport_to_file, get_active_viewport # noqa: PLC0415
+
+ from isaaclab_arena_examples.agentic_environment_generation.review_gui.thumbnail_render import ( # noqa: PLC0415
+ _wait_for_capture,
+ )
+
+ viewport = get_active_viewport()
+ cache_path.parent.mkdir(parents=True, exist_ok=True)
+ capture_obj = capture_viewport_to_file(viewport, str(cache_path))
+ for _ in range(10):
+ app.update()
+ _wait_for_capture(app, capture_obj, cache_path, max_updates=300)
+ if cache_path.exists() and cache_path.stat().st_size > 0:
+ return cache_path.read_bytes()
+ return None
+
+
+def run_sim_preview(app, yaml_text: str) -> dict[str, Any]:
+ """Link spec → arena env → relation solver → zero-action steps; capture viewport frames."""
+ import gymnasium as gym
+ import yaml
+
+ from isaaclab_arena.environments.arena_env_builder import ArenaEnvBuilder
+ from isaaclab_arena.policy.zero_action_policy import ZeroActionPolicy, ZeroActionPolicyArgs
+ from isaaclab_arena.utils.isaaclab_utils.simulation_app import close_env_and_reset_sim
+
+ started_at = time.monotonic()
+ _preview_log(started_at, "run_sim_preview started")
+
+ # Drop any stale sim/scene state left from a prior preview in this sidecar.
+ close_env_and_reset_sim(suppress_exceptions=True, app=app)
+ _preview_log(started_at, "cleared stale sim state")
+
+ raw = yaml.safe_load(yaml_text)
+ if not isinstance(raw, dict):
+ raise ValueError(f"expected mapping, got {type(raw).__name__}")
+
+ initial_spec = ArenaEnvInitialGraphSpec.model_validate(raw)
+ graph_spec = initial_spec.link()
+ arena_env = graph_spec.to_arena_env()
+ preview_name = f"{arena_env.name}_preview_{uuid.uuid4().hex[:8]}"
+ arena_env.name = preview_name
+ _preview_log(started_at, f"linked spec → arena env ({preview_name})")
+
+ args = _preview_args()
+ builder = ArenaEnvBuilder(arena_env, args)
+ policy = ZeroActionPolicy(ZeroActionPolicyArgs())
+
+ PREVIEW_CACHE_DIR.mkdir(parents=True, exist_ok=True)
+ stamp = int(time.time() * 1000)
+ first_path = PREVIEW_CACHE_DIR / f"{preview_name}_{stamp}_first.png"
+ last_path = PREVIEW_CACHE_DIR / f"{preview_name}_{stamp}_last.png"
+
+ pool_layouts = args.num_envs * _PREVIEW_LAYOUTS_PER_ENV
+ env = None
+ try:
+ eye, target = _overview_camera(args.num_envs, args.env_spacing)
+ _preview_log(
+ started_at,
+ f"solving spatial relations ({args.num_envs} envs, {pool_layouts} layout pool)…",
+ )
+ t_relations = time.monotonic()
+ env_cfg = builder.compose_manager_cfg()
+ _preview_log(started_at, f"relation solver finished ({time.monotonic() - t_relations:.1f}s)")
+
+ # World-frame overview (not task look-at-object) so all env clones are visible.
+ env_cfg.viewer = ViewerCfg(eye=eye, lookat=target, origin_type="world")
+ _preview_log(started_at, "spawning sim scene (gym.make)…")
+ t_spawn = time.monotonic()
+ env = builder.make_registered(env_cfg)
+ _preview_log(started_at, f"sim scene ready ({time.monotonic() - t_spawn:.1f}s)")
+
+ obs, _ = env.reset()
+ _apply_overview_camera(env, app, args.num_envs, args.env_spacing)
+
+ if _capture_viewport(app, first_path) is None:
+ raise RuntimeError("failed to capture first-frame viewport screenshot")
+
+ for _ in range(NUM_STEPS):
+ action = policy.get_action(env, obs)
+ obs, _, _, _, _ = env.step(action)
+
+ _apply_overview_camera(env, app, args.num_envs, args.env_spacing)
+
+ if _capture_viewport(app, last_path) is None:
+ raise RuntimeError("failed to capture last-frame viewport screenshot")
+
+ print(
+ f"[sim_preview] captured {NUM_ENVS} envs @ {ENV_SPACING_M}m spacing, {NUM_STEPS} zero-action steps "
+ f"(total {time.monotonic() - started_at:.1f}s)",
+ file=sys.stderr,
+ flush=True,
+ )
+ return {
+ "ok": True,
+ "first_frame": str(first_path),
+ "last_frame": str(last_path),
+ "env_name": preview_name,
+ "num_envs": args.num_envs,
+ "env_spacing": args.env_spacing,
+ "num_steps": NUM_STEPS,
+ }
+ finally:
+ close_env_and_reset_sim(env, app=app)
+ with suppress(Exception):
+ if preview_name in gym.registry:
+ del gym.registry[preview_name]
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/simapp_sidecar.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/simapp_sidecar.py
new file mode 100644
index 000000000..17b1f727a
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/simapp_sidecar.py
@@ -0,0 +1,310 @@
+# Copyright (c) 2025-2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""Long-lived ``SimulationApp`` host process for the live review editor.
+
+Boots Kit's ``SimulationApp`` once (with ``--viz kit``) on *its own* main thread and serves
+validation and thumbnail-render requests over a newline-delimited JSON-RPC
+pipe on stdin/stdout. The parent (``streamlit_ui.py`` running inside
+Streamlit) spawns exactly one of these and reuses it for the entire server
+lifetime via
+:class:`isaaclab_arena_examples.agentic_environment_generation.review_gui.simapp_sidecar_client.SimAppSidecar`.
+
+Why a sidecar and not an in-process ``SimulationApp``:
+
+* ``signal.signal`` only works in the main thread. Streamlit's
+ ``ScriptRunner`` runs the script in a worker thread, so SimApp's signal
+ setup raises ``ValueError("signal only works in main thread …")``.
+* ``omni.usd.UsdContext`` is process-singleton AND can't tolerate cross-
+ thread driving from Streamlit reruns — driving it from worker threads
+ triggers ``[Error] [omni.usd] UsdContext busy`` and the open_stage call
+ fails. A dedicated process with serialized request handling avoids both.
+
+Protocol (newline-delimited JSON over stdin/stdout):
+
+ Ready handshake (sent by sidecar on boot before reading any request):
+ {"ready": true} # SimApp boot succeeded
+ {"ready": false, "error": "..."} # boot failed; sidecar exits
+
+ Requests:
+ {"cmd": "ping"}
+ → {"ok": true}
+
+ {"cmd": "validate_spec", "yaml_text": "..."}
+ → {"ok": true, "spec_dict": {...}}
+ (full :class:`ArenaEnvInitialGraphSpec` validation including registry
+ lookups — runs in the sidecar where registries are already warm)
+
+ {"cmd": "render_spec", "yaml_text": "..."}
+ → {"ok": true, "paths": {"node_id": "/abs/path/to.png", ...},
+ "errors": [{"node_id": "...", "error": "..."}]}
+ (paths are absolute filesystem paths on the disk cache. The PNGs
+ themselves stay on disk — the parent reads them itself.)
+
+ {"cmd": "build_catalogues"}
+ → {"ok": true, "asset_catalogue": {...}, "relation_catalogue": {...},
+ "task_catalogue": {...}}
+ (registry vocabulary for :meth:`EnvironmentGenerationAgent.fetch_intent_from_prompt`)
+
+ {"cmd": "compile_intent", "intent_dict": {...}}
+ → {"ok": true, "spec_dict": {...}, "has_resolution_errors": bool,
+ "trace": [{"stage": "...", "query": "...", ...}]}
+ (validates :class:`EnvironmentIntentSpec` and compiles to initial graph spec)
+
+ {"cmd": "run_sim_preview", "yaml_text": "..."}
+ → {"ok": true, "first_frame": "/abs/first.png", "last_frame": "/abs/last.png",
+ "num_envs": 16, "env_spacing": 1.5, "num_steps": 10}
+ (link → to_arena_env → relation solver → 10 zero-action steps; viewport captures)
+
+ {"cmd": "shutdown"}
+ → {"ok": true} # sidecar exits cleanly after replying
+
+ Parent EOF on stdin (parent process died) triggers the same graceful
+ shutdown as the explicit "shutdown" cmd.
+
+stdout multiplexing:
+
+Kit writes a lot to stdout (warnings, replicator startup, etc.) and that
+would corrupt the JSON channel the parent reads. We dup the original
+stdout fd before touching Kit, then redirect Kit's stdout to stderr —
+JSON replies go out through the saved fd; everything else from Kit
+appears on the user's terminal via inherited stderr.
+"""
+
+from __future__ import annotations
+
+import contextlib
+import json
+import os
+import signal
+import sys
+import traceback
+import yaml
+from pathlib import Path
+from typing import Any
+
+_JSON_FD = os.dup(1)
+os.dup2(2, 1)
+sys.stdout = sys.stderr
+
+
+def _send(payload: dict[str, Any]) -> None:
+ """Write one JSON line to the parent on the saved stdout fd."""
+ data = (json.dumps(payload) + "\n").encode("utf-8")
+ os.write(_JSON_FD, data)
+
+
+def _install_signal_handlers() -> None:
+ def _exit(signum, _frame):
+ raise SystemExit(0)
+
+ signal.signal(signal.SIGTERM, _exit)
+ signal.signal(signal.SIGINT, _exit)
+
+
+def _serve() -> int:
+ """Boot SimApp, hand-shake with the parent, then service requests."""
+ _install_signal_handlers()
+
+ try:
+ from isaaclab_arena_examples.agentic_environment_generation.review_gui.thumbnail_render import ( # noqa: PLC0415
+ _launch_simulation_app,
+ )
+ except Exception as exc:
+ _send({"ready": False, "error": f"import failed: {exc}", "traceback": traceback.format_exc()})
+ return 1
+
+ app = _launch_simulation_app()
+ if app is None:
+ _send({"ready": False, "error": "SimulationApp launch returned None"})
+ return 1
+
+ from isaaclab_arena.environments.arena_env_graph_spec import ArenaEnvInitialGraphSpec # noqa: PLC0415
+ from isaaclab_arena_examples.agentic_environment_generation.review_gui.thumbnail_render import ( # noqa: PLC0415
+ _render_thumbnails_with_app,
+ )
+
+ _send({"ready": True})
+
+ try:
+ for raw_line in sys.stdin:
+ line = raw_line.strip()
+ if not line:
+ continue
+ try:
+ req = json.loads(line)
+ except json.JSONDecodeError as exc:
+ _send({"ok": False, "error": f"bad json: {exc}"})
+ continue
+
+ cmd = req.get("cmd")
+ if cmd == "shutdown":
+ _send({"ok": True})
+ return 0
+
+ if cmd == "ping":
+ _send({"ok": True})
+ continue
+
+ if cmd == "validate_spec":
+ _send(_handle_validate_spec(req, ArenaEnvInitialGraphSpec))
+ continue
+
+ if cmd == "render_spec":
+ _send(_handle_render_spec(app, req, _render_thumbnails_with_app, ArenaEnvInitialGraphSpec))
+ continue
+
+ if cmd == "build_catalogues":
+ _send(_handle_build_catalogues())
+ continue
+
+ if cmd == "compile_intent":
+ _send(_handle_compile_intent(req))
+ continue
+
+ if cmd == "run_sim_preview":
+ _send(_handle_run_sim_preview(app, req))
+ continue
+
+ _send({"ok": False, "error": f"unknown cmd: {cmd!r}"})
+
+ return 0
+ finally:
+ with contextlib.suppress(Exception):
+ app.close()
+
+
+def _handle_validate_spec(req: dict[str, Any], spec_cls) -> dict[str, Any]:
+ """Parse and fully validate YAML as an initial graph spec."""
+ yaml_text = req.get("yaml_text")
+ if not isinstance(yaml_text, str):
+ return {"ok": False, "error": "validate_spec requires string 'yaml_text'"}
+
+ try:
+ raw = yaml.safe_load(yaml_text)
+ except Exception as exc:
+ return {"ok": False, "error": f"yaml parse failed: {exc}", "traceback": traceback.format_exc()}
+
+ if raw is None:
+ return {"ok": False, "error": "YAML is empty"}
+ if not isinstance(raw, dict):
+ return {"ok": False, "error": f"expected mapping, got {type(raw).__name__}"}
+
+ try:
+ spec = spec_cls.model_validate(raw)
+ except Exception as exc:
+ return {"ok": False, "error": f"spec validation failed: {exc}", "traceback": traceback.format_exc()}
+
+ return {"ok": True, "spec_dict": spec.to_dict()}
+
+
+def _handle_build_catalogues() -> dict[str, Any]:
+ """Return asset/relation/task catalogues for the env-generation agent."""
+ from dataclasses import asdict # noqa: PLC0415
+
+ from isaaclab_arena.agentic_environment_generation.environment_generation_agent import ( # noqa: PLC0415
+ build_asset_catalogue,
+ build_relation_catalogue,
+ build_task_catalogue,
+ )
+
+ try:
+ asset_catalogue = build_asset_catalogue()
+ relation_catalogue = build_relation_catalogue()
+ task_catalogue = build_task_catalogue()
+ except Exception as exc:
+ return {"ok": False, "error": f"catalogue build failed: {exc}", "traceback": traceback.format_exc()}
+
+ return {
+ "ok": True,
+ "asset_catalogue": asdict(asset_catalogue),
+ "relation_catalogue": {
+ "relations": [asdict(entry) for entry in relation_catalogue.relations],
+ },
+ "task_catalogue": {
+ "tasks": [asdict(entry) for entry in task_catalogue.tasks],
+ },
+ }
+
+
+def _handle_compile_intent(req: dict[str, Any]) -> dict[str, Any]:
+ """Validate an EnvironmentIntentSpec and compile it to an initial graph spec."""
+ from dataclasses import asdict # noqa: PLC0415
+
+ from isaaclab_arena.agentic_environment_generation.environment_intent_spec import ( # noqa: PLC0415
+ EnvironmentIntentSpec,
+ )
+ from isaaclab_arena.agentic_environment_generation.intent_compiler import IntentCompiler # noqa: PLC0415
+
+ intent_dict = req.get("intent_dict")
+ if not isinstance(intent_dict, dict):
+ return {"ok": False, "error": "compile_intent requires mapping 'intent_dict'"}
+
+ try:
+ intent = EnvironmentIntentSpec.model_validate(intent_dict)
+ compiler = IntentCompiler()
+ spec = compiler.compile(intent)
+ except Exception as exc:
+ return {"ok": False, "error": f"intent compile failed: {exc}", "traceback": traceback.format_exc()}
+
+ return {
+ "ok": True,
+ "spec_dict": spec.to_dict(),
+ "has_resolution_errors": compiler.has_resolution_errors,
+ "trace": [asdict(event) for event in compiler.trace],
+ "reasoning": intent.reasoning,
+ }
+
+
+def _handle_run_sim_preview(app, req: dict[str, Any]) -> dict[str, Any]:
+ """Build linked env, solve relations, roll out zero actions, capture overview frames."""
+ from isaaclab_arena_examples.agentic_environment_generation.review_gui.sim_preview import ( # noqa: PLC0415
+ run_sim_preview,
+ )
+
+ yaml_text = req.get("yaml_text")
+ if not isinstance(yaml_text, str):
+ return {"ok": False, "error": "run_sim_preview requires string 'yaml_text'"}
+
+ try:
+ return run_sim_preview(app, yaml_text)
+ except Exception as exc:
+ return {"ok": False, "error": f"sim preview failed: {exc}", "traceback": traceback.format_exc()}
+
+
+def _handle_render_spec(
+ app,
+ req: dict[str, Any],
+ render_fn,
+ spec_cls,
+) -> dict[str, Any]:
+ """Parse the spec, run thumbnail rendering, marshal the response."""
+ yaml_text = req.get("yaml_text")
+ if not isinstance(yaml_text, str):
+ return {"ok": False, "error": "render_spec requires string 'yaml_text'"}
+
+ try:
+ spec = spec_cls.from_dict(yaml.safe_load(yaml_text))
+ except Exception as exc:
+ return {"ok": False, "error": f"spec parse failed: {exc}", "traceback": traceback.format_exc()}
+
+ try:
+ paths: dict[str, Path] = render_fn(app, spec)
+ except Exception as exc:
+ return {"ok": False, "error": f"render failed: {exc}", "traceback": traceback.format_exc()}
+
+ return {
+ "ok": True,
+ "paths": {node_id: str(p) for node_id, p in paths.items()},
+ "errors": [],
+ }
+
+
+def main() -> int:
+ return _serve()
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/simapp_sidecar_client.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/simapp_sidecar_client.py
new file mode 100644
index 000000000..b7758ed33
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/simapp_sidecar_client.py
@@ -0,0 +1,208 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""JSON-RPC client for the persistent :mod:`isaaclab_arena_examples.agentic_environment_generation.review_gui.simapp_sidecar` process."""
+
+from __future__ import annotations
+
+import contextlib
+import json
+import os
+import subprocess
+import sys
+import threading
+import yaml
+from pathlib import Path
+from typing import Any
+
+from isaaclab_arena.environments.arena_env_graph_spec import ArenaEnvInitialGraphSpec
+
+
+class SimAppSidecarError(RuntimeError):
+ """Raised when the SimApp sidecar process can't fulfil a request."""
+
+
+class SimAppSidecar:
+ """Long-lived Kit/SimApp host process exposed as a validation and render service."""
+
+ def __init__(self, *, boot_timeout_s: float = 180.0, shutdown_timeout_s: float = 10.0) -> None:
+ self._proc: subprocess.Popen | None = None
+ self._lock = threading.Lock()
+ self._boot_timeout_s = boot_timeout_s
+ self._shutdown_timeout_s = shutdown_timeout_s
+
+ def start(self) -> None:
+ """Spawn the sidecar process and wait for its ``{"ready": true}`` handshake."""
+ if self._proc is not None and self._proc.poll() is None:
+ return
+
+ cmd = [
+ sys.executable,
+ "-m",
+ "isaaclab_arena_examples.agentic_environment_generation.review_gui.simapp_sidecar",
+ ]
+ self._proc = subprocess.Popen(
+ cmd,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=None,
+ text=True,
+ bufsize=1,
+ env=os.environ.copy(),
+ )
+
+ line = self._readline_or_die()
+ try:
+ msg = json.loads(line)
+ except json.JSONDecodeError as exc:
+ self._terminate()
+ raise SimAppSidecarError(f"Sidecar emitted non-JSON handshake: {line!r}") from exc
+
+ if not msg.get("ready"):
+ self._terminate()
+ raise SimAppSidecarError(
+ f"Sidecar boot failed: {msg.get('error', 'unknown error')}\n{msg.get('traceback', '')}"
+ )
+
+ def is_alive(self) -> bool:
+ return self._proc is not None and self._proc.poll() is None
+
+ def close(self) -> None:
+ """Send ``shutdown``, then terminate/kill if the process doesn't exit."""
+ proc = self._proc
+ if proc is None:
+ return
+ self._proc = None
+
+ if proc.poll() is None:
+ with contextlib.suppress(Exception):
+ proc.stdin.write(json.dumps({"cmd": "shutdown"}) + "\n")
+ proc.stdin.flush()
+ try:
+ proc.wait(timeout=self._shutdown_timeout_s)
+ except subprocess.TimeoutExpired:
+ proc.terminate()
+ try:
+ proc.wait(timeout=5)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ with contextlib.suppress(Exception):
+ proc.wait(timeout=5)
+
+ with contextlib.suppress(Exception):
+ if proc.stdin:
+ proc.stdin.close()
+ with contextlib.suppress(Exception):
+ if proc.stdout:
+ proc.stdout.close()
+
+ def validate_yaml_text(self, yaml_text: str) -> dict[str, Any]:
+ """Run full spec validation (including registry lookups) in the sidecar."""
+ if not self.is_alive():
+ raise SimAppSidecarError("SimApp sidecar is not running — start it first")
+
+ with self._lock:
+ return self._request({"cmd": "validate_spec", "yaml_text": yaml_text})
+
+ def build_catalogues(self) -> dict[str, Any]:
+ """Build asset/relation/task catalogues from warm registries in the sidecar."""
+ if not self.is_alive():
+ raise SimAppSidecarError("SimApp sidecar is not running — start it first")
+
+ with self._lock:
+ return self._request({"cmd": "build_catalogues"})
+
+ def compile_intent(self, intent_dict: dict[str, Any]) -> dict[str, Any]:
+ """Validate and compile an EnvironmentIntentSpec in the sidecar."""
+ if not self.is_alive():
+ raise SimAppSidecarError("SimApp sidecar is not running — start it first")
+
+ with self._lock:
+ return self._request({"cmd": "compile_intent", "intent_dict": intent_dict})
+
+ def run_sim_preview(self, yaml_text: str) -> dict[str, Any]:
+ """Link, build env, solve relations, and capture overview rollout frames."""
+ if not self.is_alive():
+ raise SimAppSidecarError("SimApp sidecar is not running — start it first")
+
+ with self._lock:
+ return self._request({"cmd": "run_sim_preview", "yaml_text": yaml_text})
+
+ def render_spec(self, spec: ArenaEnvInitialGraphSpec) -> dict[str, bytes]:
+ """Ask the sidecar to render thumbnails for ``spec``."""
+ if not self.is_alive():
+ raise SimAppSidecarError("SimApp sidecar is not running — start it first")
+
+ yaml_text = yaml.safe_dump(spec.to_dict(), sort_keys=False)
+
+ with self._lock:
+ response = self._request({"cmd": "render_spec", "yaml_text": yaml_text})
+
+ if not response.get("ok"):
+ raise SimAppSidecarError(
+ f"sidecar render failed: {response.get('error', 'unknown')}\n{response.get('traceback', '')}"
+ )
+
+ paths: dict[str, str] = response.get("paths", {}) or {}
+ results: dict[str, bytes] = {}
+ for node_id, path_str in paths.items():
+ path = Path(path_str)
+ if path.exists() and path.stat().st_size > 0:
+ results[node_id] = path.read_bytes()
+ else:
+ print(
+ f"[review_gui] sidecar reported {node_id} -> {path_str} but file is missing.",
+ file=sys.stderr,
+ )
+ return results
+
+ def ping(self) -> bool:
+ """Cheap liveness check round-trip — returns True on a healthy reply."""
+ if not self.is_alive():
+ return False
+ with self._lock:
+ try:
+ response = self._request({"cmd": "ping"})
+ except SimAppSidecarError:
+ return False
+ return bool(response.get("ok"))
+
+ def _request(self, payload: dict[str, Any]) -> dict[str, Any]:
+ assert self._proc is not None and self._proc.stdin is not None and self._proc.stdout is not None
+ line = json.dumps(payload) + "\n"
+ try:
+ self._proc.stdin.write(line)
+ self._proc.stdin.flush()
+ except BrokenPipeError as exc:
+ raise SimAppSidecarError("sidecar pipe closed unexpectedly") from exc
+
+ reply_line = self._readline_or_die()
+ try:
+ return json.loads(reply_line)
+ except json.JSONDecodeError as exc:
+ raise SimAppSidecarError(f"sidecar replied with non-JSON: {reply_line!r}") from exc
+
+ def _readline_or_die(self) -> str:
+ assert self._proc is not None and self._proc.stdout is not None
+ line = self._proc.stdout.readline()
+ if line == "":
+ exit_code = self._proc.poll()
+ raise SimAppSidecarError(
+ f"sidecar exited prematurely (exit code: {exit_code}). "
+ "See its stderr output above for the underlying cause."
+ )
+ return line
+
+ def _terminate(self) -> None:
+ if self._proc is None:
+ return
+ with contextlib.suppress(Exception):
+ self._proc.terminate()
+ try:
+ self._proc.wait(timeout=5)
+ except subprocess.TimeoutExpired:
+ with contextlib.suppress(Exception):
+ self._proc.kill()
+ self._proc = None
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/streamlit_ui.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/streamlit_ui.py
new file mode 100644
index 000000000..0d93e65f7
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/streamlit_ui.py
@@ -0,0 +1,657 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""Streamlit UI for the initial-graph live editor.
+
+Launch via :mod:`~isaaclab_arena_examples.agentic_environment_generation.review_gui.server`:
+
+ /isaac-sim/python.sh -m isaaclab_arena_examples.agentic_environment_generation.review_gui.server \\
+ --yaml isaaclab_arena/tests/test_data/pick_and_place_maple_table_init_env_graph.yaml
+
+ # Prompt-only (empty editor until you generate or paste YAML):
+ /isaac-sim/python.sh -m isaaclab_arena_examples.agentic_environment_generation.review_gui.server
+
+Registry lookups (task kinds, relation kinds, asset USD paths) run in a
+persistent SimApp sidecar so YAML re-validation stays fast after the first
+~30s sidecar boot. Thumbnails are rendered live by the same sidecar.
+
+Natural-language generation calls the LLM from Streamlit (``NV_API_KEY``) and
+compiles the returned intent in the sidecar where registries are warm.
+"""
+
+from __future__ import annotations
+
+import argparse
+import atexit
+import traceback
+import yaml
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+import streamlit as st
+
+from isaaclab_arena.agentic_environment_generation.asset_matcher import ASSET_ERROR_STAGES
+from isaaclab_arena.agentic_environment_generation.environment_generation_agent import (
+ AssetCatalogue,
+ EnvironmentGenerationAgent,
+ RelationCatalogue,
+ RelationCatalogueEntry,
+ TaskCatalogue,
+ TaskCatalogueEntry,
+)
+from isaaclab_arena.agentic_environment_generation.intent_compiler import IntentCompiler
+from isaaclab_arena.environments.arena_env_graph_spec import ArenaEnvInitialGraphSpec
+from isaaclab_arena_examples.agentic_environment_generation.review_gui.render.dashboard import render_dashboard_html
+from isaaclab_arena_examples.agentic_environment_generation.review_gui.sim_preview import (
+ ENV_SPACING_M,
+ NUM_ENVS,
+ NUM_STEPS,
+)
+from isaaclab_arena_examples.agentic_environment_generation.review_gui.simapp_sidecar_client import (
+ SimAppSidecar,
+ SimAppSidecarError,
+)
+
+_IFRAME_HEIGHT_PX = 1100
+
+_SKIP_REGISTRY_CONTEXT: dict[str, Any] = {"skip_registry": True}
+
+_DEFAULT_GENERATION_PROMPT = (
+ "franka pick up avocado from the maple table and place it into a bowl on the table. "
+ "there are other veggies on the table as distractor"
+)
+
+_BROKEN_PLACEHOLDER_HTML = """
+No visualization yet — fix the YAML errors to auto-render.
+"""
+
+_DEFAULT_SAVE_PATH = "isaaclab_arena_environments/llm_generated/generated_spec.yaml"
+
+
+@st.cache_resource(show_spinner="Booting Isaac Sim sidecar (≈30s first run, cached afterwards)…")
+def _get_simapp_sidecar() -> SimAppSidecar | None:
+ """Spawn the SimApp sidecar once per Streamlit server process."""
+ sidecar = SimAppSidecar()
+ try:
+ sidecar.start()
+ except SimAppSidecarError as exc:
+ print(f"[review_gui] SimApp sidecar failed to start: {exc}", flush=True)
+ return None
+
+ atexit.register(sidecar.close)
+ return sidecar
+
+
+def _ensure_sidecar() -> SimAppSidecar | None:
+ """Return a healthy sidecar, re-spawning if the cached one died."""
+ sidecar = _get_simapp_sidecar()
+ if sidecar is not None and sidecar.is_alive():
+ return sidecar
+ if sidecar is not None:
+ sidecar.close()
+ _get_simapp_sidecar.clear()
+ return _get_simapp_sidecar()
+
+
+def _spec_from_sidecar_dict(spec_dict: dict[str, Any]) -> ArenaEnvInitialGraphSpec:
+ """Rebuild a validated spec locally without registry imports."""
+ return ArenaEnvInitialGraphSpec.model_validate(spec_dict, context=_SKIP_REGISTRY_CONTEXT)
+
+
+def _render_with_thumbnails(spec: ArenaEnvInitialGraphSpec) -> str:
+ """Render review HTML, asking the sidecar for live USD thumbnails."""
+ sidecar = _ensure_sidecar()
+ if sidecar is None:
+ st.warning(
+ "Isaac Sim sidecar is unavailable — showing placeholder thumbnails. "
+ "Check the terminal where you launched the server for the underlying error.",
+ icon="⚠️",
+ )
+ return render_dashboard_html(spec)
+
+ try:
+ thumbnails = sidecar.render_spec(spec)
+ except SimAppSidecarError as exc:
+ st.error(
+ f"Sidecar render failed; showing placeholder thumbnails.\n\n```\n{exc}\n```",
+ icon="🛑",
+ )
+ with st.spinner("Resetting the SimApp sidecar…"):
+ _get_simapp_sidecar.clear()
+ return render_dashboard_html(spec)
+
+ return render_dashboard_html(spec, thumbnails=thumbnails if thumbnails else None)
+
+
+@dataclass
+class ValidationResult:
+ """Outcome of parsing and validating YAML text as an initial graph spec."""
+
+ spec: ArenaEnvInitialGraphSpec | None
+ error: str | None
+
+ @property
+ def is_valid(self) -> bool:
+ return self.spec is not None
+
+
+def validate_yaml_text(text: str) -> ValidationResult:
+ """Parse YAML and validate via the SimApp sidecar (registry lookups run there)."""
+ if not text.strip():
+ return ValidationResult(spec=None, error=None)
+
+ try:
+ raw = yaml.safe_load(text)
+ except Exception:
+ return ValidationResult(spec=None, error=traceback.format_exc())
+
+ if raw is None:
+ return ValidationResult(spec=None, error="YAML is empty")
+ if not isinstance(raw, dict):
+ return ValidationResult(spec=None, error=f"Expected mapping, got {type(raw).__name__}")
+
+ sidecar = _ensure_sidecar()
+ if sidecar is None:
+ return ValidationResult(
+ spec=None,
+ error=(
+ "SimApp sidecar is unavailable — cannot validate registry entries. "
+ "Check the terminal where you launched the server."
+ ),
+ )
+
+ try:
+ response = sidecar.validate_yaml_text(text)
+ except SimAppSidecarError as exc:
+ _get_simapp_sidecar.clear()
+ return ValidationResult(spec=None, error=str(exc))
+
+ if not response.get("ok"):
+ err = response.get("error", "validation failed")
+ tb = response.get("traceback", "")
+ message = f"{err}\n\n{tb}" if tb else str(err)
+ return ValidationResult(spec=None, error=message)
+
+ try:
+ spec = _spec_from_sidecar_dict(response["spec_dict"])
+ except Exception:
+ return ValidationResult(spec=None, error=traceback.format_exc())
+
+ return ValidationResult(spec=spec, error=None)
+
+
+@dataclass
+class CatalogueBundle:
+ """Asset/relation/task vocabulary fetched from the SimApp sidecar."""
+
+ asset_catalogue: AssetCatalogue
+ relation_catalogue: RelationCatalogue
+ task_catalogue: TaskCatalogue
+
+
+def _catalogues_from_sidecar_response(response: dict[str, Any]) -> CatalogueBundle:
+ asset_raw = response["asset_catalogue"]
+ relation_raw = response["relation_catalogue"]
+ task_raw = response["task_catalogue"]
+ return CatalogueBundle(
+ asset_catalogue=AssetCatalogue(
+ embodiments=list(asset_raw["embodiments"]),
+ backgrounds=list(asset_raw["backgrounds"]),
+ objects=list(asset_raw["objects"]),
+ ),
+ relation_catalogue=RelationCatalogue(
+ relations=[RelationCatalogueEntry(**entry) for entry in relation_raw["relations"]],
+ ),
+ task_catalogue=TaskCatalogue(
+ tasks=[TaskCatalogueEntry(**entry) for entry in task_raw["tasks"]],
+ ),
+ )
+
+
+def _get_catalogue_bundle() -> CatalogueBundle | None:
+ """Return cached catalogues built in the sidecar (registry lookups run there)."""
+ cached = st.session_state.get("catalogue_bundle")
+ if cached is not None:
+ return cached
+
+ sidecar = _ensure_sidecar()
+ if sidecar is None:
+ return None
+
+ try:
+ response = sidecar.build_catalogues()
+ except SimAppSidecarError as exc:
+ st.session_state["catalogue_error"] = str(exc)
+ _get_simapp_sidecar.clear()
+ return None
+
+ if not response.get("ok"):
+ st.session_state["catalogue_error"] = response.get("error", "catalogue build failed")
+ return None
+
+ bundle = _catalogues_from_sidecar_response(response)
+ st.session_state["catalogue_bundle"] = bundle
+ st.session_state.pop("catalogue_error", None)
+ return bundle
+
+
+def _get_generation_agent() -> EnvironmentGenerationAgent | None:
+ """Lazy-init the LLM agent when ``NV_API_KEY`` is available."""
+ if st.session_state.get("generation_agent_error"):
+ return None
+ agent = st.session_state.get("generation_agent")
+ if agent is not None:
+ return agent
+ try:
+ agent = EnvironmentGenerationAgent()
+ except AssertionError as exc:
+ st.session_state["generation_agent_error"] = str(exc)
+ return None
+ except Exception as exc:
+ st.session_state["generation_agent_error"] = f"{type(exc).__name__}: {exc}"
+ return None
+ st.session_state["generation_agent"] = agent
+ st.session_state.pop("generation_agent_error", None)
+ return agent
+
+
+def _format_trace_lines(trace: list[dict[str, Any]], *, errors_only: bool = False) -> str:
+ error_stages = ASSET_ERROR_STAGES | IntentCompiler._ERROR_TRACE_STAGES
+ lines: list[str] = []
+ for event in trace:
+ stage = event.get("stage", "")
+ if errors_only and stage not in error_stages:
+ continue
+ chosen = event.get("chosen")
+ chosen_str = chosen if chosen is not None else ""
+ note = event.get("note") or ""
+ note_str = f" [{note}]" if note else ""
+ lines.append(f"{stage:34s} {event.get('query', ''):24s} -> {chosen_str}{note_str}")
+ return "\n".join(lines)
+
+
+def _apply_generated_yaml(yaml_text: str) -> None:
+ """Push compiled spec YAML into the editor and force a re-render on the next pass."""
+ st.session_state["edited_text"] = yaml_text
+ st.session_state["last_rendered_text"] = ""
+ st.session_state["editor_version"] = st.session_state.get("editor_version", 0) + 1
+
+
+def run_generation_pipeline(prompt: str) -> tuple[bool, str]:
+ """Call the LLM, compile intent in the sidecar, and load YAML into the editor."""
+ prompt = prompt.strip()
+ if not prompt:
+ return False, "Enter a prompt describing the environment."
+
+ agent = _get_generation_agent()
+ if agent is None:
+ err = st.session_state.get(
+ "generation_agent_error",
+ "Set NV_API_KEY in the environment before generating specs.",
+ )
+ return False, err
+
+ catalogues = _get_catalogue_bundle()
+ if catalogues is None:
+ err = st.session_state.get(
+ "catalogue_error",
+ "SimApp sidecar is unavailable — cannot build asset catalogues.",
+ )
+ return False, err
+
+ try:
+ intent_data, _raw = agent.fetch_intent_from_prompt(
+ prompt,
+ asset_catalog=catalogues.asset_catalogue,
+ relation_catalog=catalogues.relation_catalogue,
+ task_catalog=catalogues.task_catalogue,
+ )
+ except Exception:
+ return False, traceback.format_exc()
+
+ sidecar = _ensure_sidecar()
+ if sidecar is None:
+ return False, "SimApp sidecar is unavailable — cannot compile intent."
+
+ try:
+ response = sidecar.compile_intent(intent_data)
+ except SimAppSidecarError as exc:
+ _get_simapp_sidecar.clear()
+ return False, str(exc)
+
+ if not response.get("ok"):
+ err = response.get("error", "intent compile failed")
+ tb = response.get("traceback", "")
+ message = f"{err}\n\n{tb}" if tb else str(err)
+ return False, message
+
+ try:
+ yaml_text = yaml.safe_dump(response["spec_dict"], sort_keys=False)
+ except Exception:
+ return False, traceback.format_exc()
+
+ _apply_generated_yaml(yaml_text)
+
+ reasoning = response.get("reasoning", "")
+ if reasoning:
+ st.session_state["last_generation_reasoning"] = reasoning
+
+ trace = response.get("trace") or []
+ if trace:
+ st.session_state["last_generation_trace"] = trace
+
+ if response.get("has_resolution_errors"):
+ error_trace = _format_trace_lines(trace, errors_only=True)
+ return (
+ True,
+ (
+ "Spec generated with resolution warnings — review the trace below and edit the YAML as needed.\n\n"
+ f"{error_trace}"
+ ),
+ )
+
+ return True, "Spec generated and loaded into the YAML editor."
+
+
+def run_sim_preview_pipeline(yaml_text: str, *, validation: ValidationResult | None = None) -> tuple[bool, str]:
+ """Link, build, solve relations, and capture overview frames in the sidecar.
+
+ When ``validation`` is already valid (e.g. from the editor panel on the same
+ rerun), skip the redundant sidecar ``validate_spec`` round-trip.
+ """
+ if validation is None or not validation.is_valid:
+ validation = validate_yaml_text(yaml_text)
+ if not validation.is_valid:
+ return False, validation.error or "YAML must be valid before running sim preview."
+
+ sidecar = _ensure_sidecar()
+ if sidecar is None:
+ return False, "SimApp sidecar is unavailable — cannot run sim preview."
+
+ try:
+ response = sidecar.run_sim_preview(yaml_text)
+ except SimAppSidecarError as exc:
+ _get_simapp_sidecar.clear()
+ return False, str(exc)
+
+ if not response.get("ok"):
+ err = response.get("error", "sim preview failed")
+ tb = response.get("traceback", "")
+ message = f"{err}\n\n{tb}" if tb else str(err)
+ return False, message
+
+ try:
+ first_path = Path(response["first_frame"])
+ last_path = Path(response["last_frame"])
+ st.session_state["sim_preview_first"] = first_path.read_bytes()
+ st.session_state["sim_preview_last"] = last_path.read_bytes()
+ except OSError as exc:
+ return False, f"Failed to read preview frames: {exc}"
+
+ return (
+ True,
+ (
+ f"Sim preview complete — {response.get('num_envs', NUM_ENVS)} envs, "
+ f"{response.get('env_spacing', ENV_SPACING_M)} m spacing, "
+ f"{response.get('num_steps', NUM_STEPS)} steps."
+ ),
+ )
+
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument(
+ "--yaml",
+ type=Path,
+ default=None,
+ help="Optional path to an ArenaEnvInitialGraphSpec YAML to open in the editor.",
+ )
+ return parser.parse_args()
+
+
+def initialize_state(yaml_path: Path | None) -> None:
+ """Seed ``st.session_state`` from disk exactly once per session."""
+ session_key = str(yaml_path.resolve()) if yaml_path is not None else ""
+ if st.session_state.get("_yaml_path") == session_key:
+ return
+
+ st.session_state["_yaml_path"] = session_key
+ st.session_state.setdefault("generation_prompt", _DEFAULT_GENERATION_PROMPT)
+ st.session_state.setdefault("editor_version", 0)
+
+ if yaml_path is None:
+ st.session_state["original_text"] = ""
+ st.session_state["edited_text"] = ""
+ st.session_state["last_rendered_text"] = ""
+ st.session_state["rendered_html"] = ""
+ st.session_state["save_path"] = _DEFAULT_SAVE_PATH
+ return
+
+ original_text = yaml_path.read_text(encoding="utf-8")
+
+ st.session_state["original_text"] = original_text
+ st.session_state["edited_text"] = original_text
+ st.session_state["last_rendered_text"] = original_text
+ st.session_state["save_path"] = str(yaml_path)
+
+ initial = validate_yaml_text(original_text)
+ if not initial.is_valid:
+ st.session_state["rendered_html"] = _BROKEN_PLACEHOLDER_HTML
+ else:
+ st.session_state["rendered_html"] = _render_with_thumbnails(initial.spec)
+
+
+def render_validation_badge(validation: ValidationResult) -> None:
+ if validation.spec is None and validation.error is None:
+ return
+ if validation.is_valid:
+ spec = validation.spec
+ st.success(
+ f"Valid spec — {spec.env_name} · {len(spec.nodes)} nodes · "
+ f"{len(spec.tasks)} tasks · initial state: {spec.initial_state_spec.id}",
+ icon="✅",
+ )
+ else:
+ st.error(f"Invalid YAML\n\n```\n{validation.error}\n```", icon="🛑")
+
+
+def render_save_button(validation: ValidationResult) -> None:
+ can_save = validation.is_valid
+ save_path_str = st.session_state["save_path"]
+ save_label = f"Save to {Path(save_path_str).name}" if save_path_str else "Save YAML"
+
+ if st.button(
+ save_label,
+ disabled=not can_save,
+ use_container_width=True,
+ help=f"Writes the editor contents to {save_path_str}. Disabled while YAML is invalid.",
+ ):
+ try:
+ out_path = Path(save_path_str)
+ out_path.parent.mkdir(parents=True, exist_ok=True)
+ out_path.write_text(st.session_state["edited_text"], encoding="utf-8")
+ st.session_state["original_text"] = st.session_state["edited_text"]
+ st.toast(f"Saved → {save_path_str}", icon="💾")
+ except OSError as exc:
+ st.error(f"Save failed: {exc}", icon="🛑")
+
+ with st.expander("Change save location", expanded=False):
+ new_path = st.text_input(
+ "Save path",
+ value=save_path_str,
+ key="save_path_input",
+ help="Defaults to the YAML file passed via --yaml, or a generated-spec path when none was given.",
+ )
+ if new_path and new_path != save_path_str:
+ st.session_state["save_path"] = new_path
+
+
+def render_editor_panel(yaml_path: Path | None) -> ValidationResult:
+ try:
+ from streamlit_ace import st_ace # noqa: PLC0415
+ except ImportError as exc:
+ st.error(
+ "`streamlit-ace` is not installed. Inside the isaaclab_arena container run:\n"
+ "`python -m pip install --user --ignore-installed streamlit-ace`\n\n"
+ f"Underlying error: {exc}",
+ icon="🛑",
+ )
+ st.stop()
+
+ st.subheader("YAML editor")
+ if yaml_path is not None:
+ st.caption(f"Source: `{yaml_path}`")
+ else:
+ st.caption("No file loaded — generate a spec or paste YAML.")
+
+ editor_key = str(yaml_path) if yaml_path is not None else "new"
+ new_text = st_ace(
+ value=st.session_state["edited_text"],
+ language="yaml",
+ theme="monokai",
+ keybinding="vscode",
+ font_size=13,
+ tab_size=2,
+ show_gutter=True,
+ show_print_margin=False,
+ wrap=False,
+ auto_update=False,
+ min_lines=30,
+ key=f"ace_editor::{editor_key}::{st.session_state.get('editor_version', 0)}",
+ )
+ if new_text is not None:
+ st.session_state["edited_text"] = new_text
+
+ validation = validate_yaml_text(st.session_state["edited_text"])
+ render_validation_badge(validation)
+
+ edited_since_render = st.session_state["edited_text"] != st.session_state["last_rendered_text"]
+ if validation.is_valid and edited_since_render:
+ with st.spinner("Rendering visualization…"):
+ st.session_state["rendered_html"] = _render_with_thumbnails(validation.spec)
+ st.session_state["last_rendered_text"] = st.session_state["edited_text"]
+ st.toast("Visualization updated.", icon="🔄")
+
+ render_save_button(validation)
+ return validation
+
+
+def render_generation_panel() -> None:
+ """Prompt input and generate-spec controls (top of the left column)."""
+ st.subheader("Generate from prompt")
+ st.caption("Calls the env-generation agent (LLM) then compiles intent in the SimApp sidecar.")
+
+ prompt = st.text_area(
+ "Prompt",
+ value=st.session_state.get("generation_prompt", _DEFAULT_GENERATION_PROMPT),
+ height=120,
+ placeholder="Describe the robot task, scene, objects, and distractors…",
+ )
+ st.session_state["generation_prompt"] = prompt
+
+ agent_error = st.session_state.get("generation_agent_error")
+ if agent_error:
+ st.info(f"LLM agent unavailable: {agent_error}", icon="ℹ️")
+
+ if st.button("Generate & compile", type="primary", use_container_width=True):
+ with st.spinner("Generating spec (LLM call + sidecar compile)…"):
+ ok, message = run_generation_pipeline(st.session_state["generation_prompt"])
+ if ok:
+ if "resolution warnings" in message:
+ st.warning(message, icon="⚠️")
+ else:
+ st.success(message, icon="✅")
+ st.rerun()
+ else:
+ st.error(f"Generation failed\n\n```\n{message}\n```", icon="🛑")
+
+ reasoning = st.session_state.get("last_generation_reasoning")
+ if reasoning:
+ with st.expander("Agent reasoning (last run)", expanded=False):
+ st.markdown(reasoning)
+
+ trace = st.session_state.get("last_generation_trace")
+ if trace:
+ with st.expander("Resolution trace (last run)", expanded=False):
+ st.code(_format_trace_lines(trace), language=None)
+
+
+def render_visualization_panel(validation: ValidationResult) -> None:
+ st.subheader("Visualization")
+ if not st.session_state.get("last_rendered_text", "").strip():
+ st.caption("Generate or enter valid YAML to see the visualization.")
+ else:
+ st.caption("Updates automatically when the YAML is valid.")
+ st.components.v1.html(
+ st.session_state["rendered_html"],
+ height=_IFRAME_HEIGHT_PX,
+ scrolling=True,
+ )
+
+ st.divider()
+ st.subheader("Sim preview")
+ st.caption(
+ f"Runs link → to_arena_env → relation solver, then {NUM_STEPS} zero-action steps "
+ f"with {NUM_ENVS} parallel envs at {ENV_SPACING_M} m spacing. "
+ "Viewport captures use a world-frame overview of the full env grid."
+ )
+
+ if st.button(
+ "Run link + relation solver preview",
+ type="secondary",
+ use_container_width=True,
+ disabled=not validation.is_valid,
+ help="Requires valid YAML and a healthy SimApp sidecar. This may take several minutes.",
+ ):
+ with st.spinner(
+ f"Building env, solving relations, and rolling out {NUM_STEPS} steps ({NUM_ENVS} envs @ {ENV_SPACING_M} m)…"
+ ):
+ ok, message = run_sim_preview_pipeline(st.session_state["edited_text"], validation=validation)
+ if ok:
+ st.success(message, icon="✅")
+ st.rerun()
+ else:
+ st.error(f"Sim preview failed\n\n```\n{message}\n```", icon="🛑")
+
+ first_frame = st.session_state.get("sim_preview_first")
+ last_frame = st.session_state.get("sim_preview_last")
+ if first_frame and last_frame:
+ frame_cols = st.columns(2)
+ with frame_cols[0]:
+ st.caption("Viewport — frame 1 (after reset)")
+ st.image(first_frame, use_container_width=True)
+ with frame_cols[1]:
+ st.caption(f"Viewport — frame 2 (after {NUM_STEPS} zero-action steps)")
+ st.image(last_frame, use_container_width=True)
+
+
+def main() -> None:
+ st.set_page_config(
+ page_title="ArenaEnvInitialGraphSpec live editor",
+ layout="wide",
+ initial_sidebar_state="collapsed",
+ )
+
+ args = parse_args()
+ yaml_path = args.yaml.resolve() if args.yaml is not None else None
+ if yaml_path is not None and not yaml_path.exists():
+ st.error(f"YAML file not found: {yaml_path}", icon="🛑")
+ st.stop()
+
+ initialize_state(yaml_path)
+
+ st.markdown("### ArenaEnvInitialGraphSpec live editor")
+ left, right = st.columns([2, 3], gap="large")
+ with left:
+ render_generation_panel()
+ validation = render_editor_panel(yaml_path)
+ with right:
+ render_visualization_panel(validation)
+
+
+main()
diff --git a/isaaclab_arena_examples/agentic_environment_generation/review_gui/thumbnail_render.py b/isaaclab_arena_examples/agentic_environment_generation/review_gui/thumbnail_render.py
new file mode 100644
index 000000000..f63f17d4e
--- /dev/null
+++ b/isaaclab_arena_examples/agentic_environment_generation/review_gui/thumbnail_render.py
@@ -0,0 +1,360 @@
+# Copyright (c) 2026, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""USD viewport thumbnail rendering for the review GUI SimApp sidecar."""
+
+from __future__ import annotations
+
+import argparse
+import hashlib
+import sys
+from pathlib import Path
+
+from isaaclab_arena.environments.arena_env_graph_spec import ArenaEnvInitialGraphSpec
+
+THUMBNAIL_CACHE_DIR = Path(__file__).resolve().parents[3] / ".cache" / "llm_env_gen_thumbnails"
+
+
+def _render_thumbnails_with_app(app, spec: ArenaEnvInitialGraphSpec) -> dict[str, Path]:
+ """Resolve each node's USD via ``AssetRegistry``, render cache-misses, return PNG paths.
+
+ ``app`` must already be a booted ``SimulationApp``. The caller owns the
+ lifecycle (Kit may turn ``app.close()`` into ``os._exit(0)`` — that's why
+ the sidecar holds the only reference and closes it inside its ``finally``).
+
+ Returns ``{node.id: png_path}`` for nodes whose asset USD could be located
+ *and* whose PNG exists on disk (either from the persistent cache under
+ ``THUMBNAIL_CACHE_DIR`` or freshly rendered into the cache by
+ :func:`_capture_usd_thumbnails`). Missing entries fall through to the
+ placeholder in :func:`_render_node_thumbnail`, so a partial failure (one
+ bad asset) never breaks the rest of the page.
+
+ We return ``Path`` rather than ``bytes`` so the sidecar protocol can ship
+ just the filenames over its stdin/stdout pipe (a few hundred bytes of JSON
+ instead of multiple MB of base64 PNG data). The parent reads the bytes
+ itself off the shared filesystem cache.
+
+ Ordering matters: ``SimulationApp`` MUST be launched before any
+ ``AssetRegistry`` access, because ``ensure_assets_registered()`` imports
+ isaaclab asset modules which transitively load ``pxr``. ``pxr`` loaded
+ before ``AppLauncher`` puts Kit's extension manager into an unrecoverable
+ state ("extension class wrapper for base class ... has not been created
+ yet"). This is the same root cause we fixed for the pytest suite.
+ """
+ asset_paths = _resolve_node_usd_paths(spec)
+ if not asset_paths:
+ print("[thumbnail_render] no asset USD paths resolved; skipping thumbnail rendering.", file=sys.stderr)
+ return {}
+
+ THUMBNAIL_CACHE_DIR.mkdir(parents=True, exist_ok=True)
+
+ # Split into cache-hits vs to-render. Cache key is sha1(usd_path) so
+ # the same USD across multiple envs / nodes hits the same PNG.
+ resolved: dict[str, Path] = {}
+ to_render: dict[str, tuple[str, Path]] = {}
+ for node_id, usd_path in asset_paths.items():
+ cache_path = THUMBNAIL_CACHE_DIR / f"{_usd_cache_key(usd_path)}.png"
+ if cache_path.exists() and cache_path.stat().st_size > 0:
+ resolved[node_id] = cache_path
+ else:
+ to_render[node_id] = (usd_path, cache_path)
+
+ if to_render:
+ print(
+ f"[thumbnail_render] rendering {len(to_render)} new thumbnail(s) "
+ f"(reusing {len(resolved)} from cache at {THUMBNAIL_CACHE_DIR})...",
+ file=sys.stderr,
+ )
+ # ``_capture_usd_thumbnails`` still returns ``{node_id: bytes}``, but
+ # we only use it as a presence signal here — the same call also wrote
+ # the PNG to ``cache_path`` as a side effect, which is what we return.
+ captured = _capture_usd_thumbnails(app, to_render)
+ for node_id, (_usd_path, cache_path) in to_render.items():
+ if node_id in captured and cache_path.exists() and cache_path.stat().st_size > 0:
+ resolved[node_id] = cache_path
+ else:
+ print(f"[thumbnail_render] all {len(resolved)} thumbnail(s) served from cache.", file=sys.stderr)
+
+ return resolved
+
+
+def _sidecar_launch_args() -> argparse.Namespace:
+ """AppLauncher args for the review GUI sidecar (Kit UI + viewport capture)."""
+ return argparse.Namespace(visualizer=["kit"], enable_cameras=True, livestream=-1)
+
+
+def _launch_simulation_app():
+ """Boot Isaac Sim's ``SimulationApp`` with the Kit visualizer, or ``None`` on failure.
+
+ Kept as a tiny helper so the call site can lazy-import inside this
+ function — module-level import of ``simulation_app`` would drag Kit
+ into every invocation, including ``--help``.
+ """
+ try:
+ # Lazy-import: keeps the default ``review_graph`` invocation Kit-free.
+ from isaaclab_arena.utils.isaaclab_utils.simulation_app import get_app_launcher # noqa: PLC0415
+
+ return get_app_launcher(_sidecar_launch_args()).app
+ except Exception as exc:
+ print(f"[thumbnail_render] SimulationApp launch failed: {exc}", file=sys.stderr)
+ return None
+
+
+def _resolve_node_usd_paths(spec: ArenaEnvInitialGraphSpec) -> dict[str, str]:
+ """Map ``node.id → usd_path`` via :class:`AssetRegistry`, skipping unresolvable nodes.
+
+ Tries two lookup strategies in order:
+
+ 1. Class-attribute ``cls.usd_path`` — the convention every ``LibraryObject``
+ subclass in ``object_library.py`` follows. No instantiation, cheap.
+
+ 2. ``cls().scene_config.robot.spawn.usd_path`` — the convention every
+ :class:`EmbodimentBase` subclass uses. Requires instantiating the
+ embodiment because the Franka embodiments populate ``scene_config.robot``
+ inside ``__init__`` rather than as a class default. Embodiment
+ ``__init__`` is light (no Kit / sim required) — it only constructs
+ configclass objects.
+
+ This function MUST be called only after ``SimulationApp`` has booted — see
+ the docstring of :func:`_render_thumbnails_with_app` for why.
+ """
+ try:
+ from isaaclab_arena.assets.registries import AssetRegistry # noqa: PLC0415
+ except Exception as exc:
+ print(f"[thumbnail_render] AssetRegistry import failed: {exc}", file=sys.stderr)
+ return {}
+
+ registry = AssetRegistry()
+ paths: dict[str, str] = {}
+ for node in spec.nodes:
+ try:
+ if not registry.is_registered(node.name):
+ print(f"[thumbnail_render] {node.id}: asset '{node.name}' not registered, skipping.", file=sys.stderr)
+ continue
+ cls = registry.get_asset_by_name(node.name)
+ usd_path = _extract_usd_path(cls)
+ if not usd_path:
+ print(f"[thumbnail_render] {node.id}: '{node.name}' has no usd_path, skipping.", file=sys.stderr)
+ continue
+ paths[node.id] = usd_path
+ except Exception as exc:
+ print(f"[thumbnail_render] {node.id}: lookup failed for '{node.name}': {exc}", file=sys.stderr)
+ return paths
+
+
+def _extract_usd_path(cls) -> str | None:
+ """Return the asset's root USD path, or ``None`` if not extractable.
+
+ See :func:`_resolve_node_usd_paths` for the two strategies tried in order.
+ """
+ # Strategy 1: ``LibraryObject`` convention.
+ usd_path = getattr(cls, "usd_path", None)
+ if usd_path:
+ return usd_path
+
+ # Strategy 2: ``EmbodimentBase`` convention. Walk
+ # ``instance.scene_config.robot.spawn.usd_path``. We instantiate with no
+ # args; every embodiment ``__init__`` defaults all parameters.
+ # NoEmbodiment legitimately has no robot — its instance.scene_config
+ # exists but ``.robot`` is absent / None, so the getattr chain returns
+ # None and we silently fall through.
+ try:
+ instance = cls()
+ except Exception:
+ return None
+ scene_config = getattr(instance, "scene_config", None)
+ robot = getattr(scene_config, "robot", None) if scene_config is not None else None
+ spawn = getattr(robot, "spawn", None) if robot is not None else None
+ return getattr(spawn, "usd_path", None) if spawn is not None else None
+
+
+def _usd_cache_key(usd_path: str) -> str:
+ return hashlib.sha1(usd_path.encode("utf-8")).hexdigest()[:16]
+
+
+def _capture_usd_thumbnails(app, to_render: dict[str, tuple[str, Path]]) -> dict[str, bytes]:
+ """Capture all queued USDs under one already-booted ``SimulationApp``.
+
+ Deduplicates by ``usd_path`` so the same USD shared by multiple nodes is
+ only rendered once and the bytes are fanned back out.
+ """
+ out: dict[str, bytes] = {}
+
+ path_to_node_ids: dict[str, list[str]] = {}
+ path_to_cache: dict[str, Path] = {}
+ for node_id, (usd_path, cache_path) in to_render.items():
+ path_to_node_ids.setdefault(usd_path, []).append(node_id)
+ path_to_cache[usd_path] = cache_path
+
+ for usd_path, node_ids in path_to_node_ids.items():
+ cache_path = path_to_cache[usd_path]
+ try:
+ png_bytes = _render_one_usd(app, usd_path, cache_path)
+ except Exception as exc:
+ print(f"[thumbnail_render] render failed for {usd_path}: {exc}", file=sys.stderr)
+ continue
+ if png_bytes:
+ for node_id in node_ids:
+ out[node_id] = png_bytes
+
+ return out
+
+
+def _render_one_usd(app, usd_path: str, cache_path: Path) -> bytes | None:
+ """Open ``usd_path`` directly as the stage, frame the camera, capture PNG.
+
+ Opening the USD as the stage root (rather than ``new_stage`` + reference
+ wrapper) is what makes viewport capture actually produce a file in
+ headless mode — Kit's viewport machinery binds to the just-opened stage
+ cleanly, whereas a referenced sub-stage left the render product empty in
+ every test we tried. The trade-off is that we lose isolation between
+ captures (each call replaces the stage), but Kit handles that fine
+ because we call ``open_stage`` again on the next asset.
+ """
+ import omni.usd # noqa: PLC0415
+ from omni.kit.viewport.utility import ( # noqa: PLC0415
+ capture_viewport_to_file,
+ frame_viewport_prims,
+ get_active_viewport,
+ )
+ from pxr import Sdf # noqa: PLC0415
+
+ ctx = omni.usd.get_context()
+ if not ctx.open_stage(usd_path):
+ print(f"[thumbnail_render] open_stage failed: {usd_path}", file=sys.stderr)
+ return None
+ stage = ctx.get_stage()
+
+ # Wait for textures / payloads / Nucleus fetches to settle before framing.
+ _wait_for_stage_load(app, ctx)
+
+ # Standalone object USDs (avocado, bowl, ...) ship no lights, so a viewport
+ # capture renders them as a near-black silhouette against the dark skybox
+ # — that's the "blank thumbnail" symptom. Complete scene USDs (maple table)
+ # already include their own lighting, so this is a no-op for them.
+ _ensure_default_lighting(stage)
+
+ # Use the default prim if present, otherwise the pseudo-root, for framing.
+ target_prim = stage.GetDefaultPrim()
+ if not target_prim or not target_prim.IsValid():
+ target_prim = stage.GetPrimAtPath(Sdf.Path("/"))
+
+ viewport = get_active_viewport()
+
+ # Use Kit's own ``frame_viewport_prims`` (the "F"-key equivalent / ``FramePrimsCommand``)
+ # so we go through the viewport camera controller. Manually editing the
+ # ``/OmniverseKit_Persp`` xform op directly worked sometimes but Kit's
+ # camera controller treats /OmniverseKit_Persp as an internal state and
+ # silently overrode our edits for small assets — that's why avocado / bowl
+ # captured as tiny specks even with the right math. Letting Kit do the
+ # framing is both correct and avoids us re-implementing the math.
+ framed = frame_viewport_prims(viewport, prims=[str(target_prim.GetPath())])
+ if not framed:
+ print(f"[thumbnail_render] warning: frame_viewport_prims failed for {usd_path}", file=sys.stderr)
+
+ # Settle Hydra after camera change so the captured frame matches the new pose.
+ for _ in range(30):
+ app.update()
+
+ cache_path.parent.mkdir(parents=True, exist_ok=True)
+ capture_obj = capture_viewport_to_file(viewport, str(cache_path))
+
+ _wait_for_capture(app, capture_obj, cache_path, max_updates=600)
+
+ if cache_path.exists() and cache_path.stat().st_size > 0:
+ return cache_path.read_bytes()
+ print(f"[thumbnail_render] capture produced no file: {cache_path}", file=sys.stderr)
+ return None
+
+
+def _wait_for_stage_load(app, usd_context, max_updates: int = 600) -> None:
+ """Pump frames until ``usd_context.get_stage_loading_status()`` reports nothing pending.
+
+ Returns after stage load completes or after the budget is exhausted. We
+ also need a few extra frames after the count goes to zero so material
+ binding / texture upload finishes — they don't show up in the load count.
+ """
+ settled = 0
+ for _ in range(max_updates):
+ app.update()
+ try:
+ _msg, loading_count, loaded_count = usd_context.get_stage_loading_status()
+ except Exception:
+ return
+ if loading_count == 0 and loaded_count == 0:
+ settled += 1
+ if settled > 15:
+ return
+ else:
+ settled = 0
+
+
+def _wait_for_capture(app, capture_obj, cache_path: Path, max_updates: int = 600) -> None:
+ """Pump ``app.update()`` until the capture PNG lands on disk (or we time out).
+
+ Kit's capture future is fulfilled inside its async loop during
+ ``app.update()``, but future completion doesn't always coincide with the
+ file being flushed — checking the file directly is the most reliable
+ completion signal. We also keep the future-based fast path so a
+ successful capture doesn't have to wait for the file system to settle.
+ """
+ if capture_obj is None:
+ for _ in range(max_updates):
+ app.update()
+ return
+
+ future = (
+ getattr(capture_obj, "_Capture__future", None)
+ or getattr(capture_obj, "_RenderCapture__future", None)
+ or getattr(capture_obj, "future", None)
+ )
+
+ for _ in range(max_updates):
+ app.update()
+ if cache_path.exists() and cache_path.stat().st_size > 0:
+ return
+ if future is not None and future.done():
+ # Future is done but file might still be flushing — give it a few frames.
+ for _ in range(15):
+ app.update()
+ if cache_path.exists() and cache_path.stat().st_size > 0:
+ return
+ return
+
+
+def _ensure_default_lighting(stage) -> None:
+ """Add a dome + key distant light if the stage has none.
+
+ Without this, standalone object USDs (which don't ship their own lights)
+ render as a near-black silhouette. We skip the addition if any
+ ``UsdLuxLight``-derived prim already exists on the stage to avoid
+ double-lighting scenes like the maple table that bake in their own rig.
+ """
+ from pxr import Gf, Sdf, UsdGeom, UsdLux # noqa: PLC0415
+
+ for prim in stage.Traverse():
+ if (
+ prim.HasAPI(UsdLux.LightAPI)
+ or prim.IsA(UsdLux.BoundableLightBase)
+ or prim.IsA(UsdLux.NonboundableLightBase)
+ ):
+ return
+
+ # Soft hemispherical fill so the asset is visible from any angle, plus a
+ # weak directional key for shape definition. Intensities are tuned for
+ # OmniPBR / RTX defaults; tweak if asset libraries adopt darker materials.
+ dome = UsdLux.DomeLight.Define(stage, Sdf.Path("/_ReviewDomeLight"))
+ dome.CreateIntensityAttr(800.0)
+ dome.CreateColorAttr(Gf.Vec3f(1.0, 1.0, 1.0))
+
+ key = UsdLux.DistantLight.Define(stage, Sdf.Path("/_ReviewKeyLight"))
+ key.CreateIntensityAttr(2500.0)
+ key.CreateAngleAttr(2.0)
+ # Aim the key roughly from the camera's 3/4 angle so the lit side faces
+ # the viewport.
+ key_xformable = UsdGeom.Xformable(key.GetPrim())
+ key_xformable.ClearXformOpOrder()
+ rot = key_xformable.AddRotateXYZOp()
+ rot.Set(Gf.Vec3f(-45.0, 30.0, 0.0))
diff --git a/setup.py b/setup.py
index 582f669ec..c411f629b 100644
--- a/setup.py
+++ b/setup.py
@@ -26,6 +26,8 @@
"jupyter",
"debugpy",
"tenacity",
+ "streamlit>=1.30",
+ "streamlit-ace>=0.1.1",
]
setup(