From c5680d674f8a442ab18a152ff6a0683bae82409d Mon Sep 17 00:00:00 2001 From: aiguldzh-nvidia Date: Wed, 17 Jun 2026 11:01:05 +0000 Subject: [PATCH 1/7] extend eval_runner with metrics output, per-episode success, and job records - Add --metrics_file and --episode_record_dir to eval_runner; write one JSON record per job (status, aggregated metrics, per-episode success flags, video paths, wall time) via _write_job_record. - Add per_episode_metrics.success_rate (list of per-episode booleans) derived from MetricsDataCollection.recorded_data; drop subtask_success_rate from the aggregate metrics output. - Move _write_job_record outside the rebuild loop so multi-rebuild jobs write a single record with aggregated metrics. - Use job.status.value for status strings; remove redundant status param. - Stamp metrics_file and episode_record_dir with the same run_dir used for videos so all outputs from one run share a timestamp directory. - Fix failure-path _write_job_record call: pass None instead of {} so the per_episode_metrics guard does not raise AttributeError. 
--- isaaclab_arena/evaluation/eval_runner.py | 126 +++++++++++++++++-- isaaclab_arena/evaluation/eval_runner_cli.py | 12 ++ isaaclab_arena/evaluation/job_manager.py | 15 +++ isaaclab_arena/evaluation/policy_runner.py | 1 + isaaclab_arena/metrics/metrics_logger.py | 2 + isaaclab_arena/tests/test_job_manager.py | 35 ++++++ 6 files changed, 184 insertions(+), 7 deletions(-) diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index 91f9fff31..9c08b90b7 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: Apache-2.0 import argparse +import contextlib import dataclasses import gc import json @@ -14,6 +15,7 @@ import tempfile import torch import traceback +from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING @@ -22,7 +24,7 @@ from isaaclab_arena.evaluation.job_manager import Job, JobManager, Status from isaaclab_arena.evaluation.policy_runner import get_policy_cls, rollout_policy from isaaclab_arena.metrics.aggregate_metrics import aggregate_metrics -from isaaclab_arena.metrics.metrics_logger import MetricsLogger +from isaaclab_arena.metrics.metrics_logger import MetricsLogger, metrics_to_plain_python_types from isaaclab_arena.utils.isaaclab_utils.simulation_app import SimulationAppContext, teardown_simulation_app from isaaclab_arena.video.video_recording import VideoRecordingCfg, timestamped_run_dir, wrap_env_for_video from isaaclab_arena_environments.cli import get_arena_builder_from_cli, get_isaaclab_arena_environments_cli_parser @@ -37,6 +39,7 @@ def load_env( job_name: str, variations: list[str] | None = None, render_mode: str | None = None, + run_dir: str | None = None, ): args_parser = get_isaaclab_arena_environments_cli_parser() @@ -46,9 +49,11 @@ def load_env( env_name, env_cfg = arena_builder.build_registered() - # Set unique dataset filename for this job to avoid file locking 
conflicts + # Timestamp suffix prevents EAGAIN from a stale lock left by a crashed previous run. + # Reuse the run_dir basename so the HDF5 filename shares the same timestamp as all other outputs. if hasattr(env_cfg, "recorders") and env_cfg.recorders is not None: - env_cfg.recorders.dataset_filename = f"dataset_{job_name}" + ts = os.path.basename(run_dir) if run_dir else datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + env_cfg.recorders.dataset_filename = f"dataset_{job_name}_{ts}" env = arena_builder.make_registered(env_cfg, render_mode=render_mode) # Don't reset here - rollout_policy() will reset the env. Every reset triggers a new episode, initializing recorder & creating a new hdf5 entry. @@ -168,6 +173,78 @@ def _split_episodes_across_rebuilds(num_episodes: int | None, num_rebuilds: int, return episodes_per_rebuild +def _write_job_record( + record_dir: str, + job: Job, + metrics: "MetricsDataCollection | None", + video_dir: str | None, + env=None, +) -> str: + """Write a JSON record for one completed or failed job.""" + os.makedirs(record_dir, exist_ok=True) + + video_paths: list[str] = [] + if video_dir: + job_video_dir = os.path.join(video_dir, job.name) + if os.path.isdir(job_video_dir): + video_paths = sorted( + os.path.join(job_video_dir, n) for n in os.listdir(job_video_dir) if n.endswith(".mp4") + ) + + wall_time = None + if job.start_time is not None and job.end_time is not None: + wall_time = round(job.end_time - job.start_time, 3) + + language_instruction = job.language_instruction + if language_instruction is None and env is not None: + with contextlib.suppress(Exception): + language_instruction = env.unwrapped.cfg.task_description + + hdf5_path = None + if env is not None: + with contextlib.suppress(Exception): + cfg = env.unwrapped.cfg + if hasattr(cfg, "recorders") and cfg.recorders is not None: + hdf5_path = str( + Path(cfg.recorders.dataset_export_dir_path) / (cfg.recorders.dataset_filename + ".hdf5") + ) + + plain_metrics = 
metrics_to_plain_python_types(metrics) if metrics else {} + plain_metrics.pop("subtask_success_rate", None) + + # Per-episode success flags: recorded_data holds one (1,) bool array per episode. + per_episode_metrics: dict[str, list] = {} + if metrics is not None and "success_rate" in metrics.metric_data_entries: + metric_data = metrics.metric_data_entries["success_rate"] + per_episode_metrics["success_rate"] = [bool(arr.flat[0]) for arr in metric_data.recorded_data] + + record = { + "schema_version": "1.0", + "job_name": job.name, + "task_name": job.task_name, + "embodiment": job.embodiment, + "env_params": dict(job.env_params), + "policy_type": job.policy_type, + "policy_config": dict(job.policy_config_dict), + "num_envs": job.num_envs, + "num_steps": job.num_steps, + "num_episodes": job.num_episodes, + "language_instruction": language_instruction, + "status": job.status.value, + "metrics": plain_metrics, + "per_episode_metrics": per_episode_metrics, + "hdf5_path": hdf5_path, + "video_paths": video_paths, + "wall_time_seconds": wall_time, + "created_at": datetime.now(timezone.utc).isoformat(), + } + + path = os.path.join(record_dir, f"{job.name}.json") + with open(path, "w", encoding="utf-8") as f: + json.dump(record, f, indent=2) + return path + + def _run_chunk(chunk_label: str, chunk_jobs: list[dict]) -> int: """Run ``chunk_jobs`` in a fresh ``eval_runner`` subprocess and return its exit code.""" print(f"[eval_runner] {chunk_label}", flush=True) @@ -248,17 +325,23 @@ def main(): with SimulationAppContext(args_cli): job_manager = JobManager(eval_jobs_config["jobs"]) - metrics_logger = MetricsLogger() job_manager.print_jobs_info() # One reverse-dated run directory shared by all jobs; each job gets a subdirectory within it. 
recording = args_cli.record_viewport_video or args_cli.record_camera_video - run_video_dir = timestamped_run_dir(args_cli.video_base_dir) if recording else args_cli.video_base_dir - + run_dir = timestamped_run_dir(args_cli.video_base_dir) + run_video_dir = run_dir if recording else args_cli.video_base_dir if args_cli.record_viewport_video: os.makedirs(run_video_dir, exist_ok=True) print(f"[INFO] Video recording enabled. Videos will be saved to: {run_video_dir}") + if args_cli.metrics_file is not None: + args_cli.metrics_file = os.path.join(run_dir, os.path.basename(args_cli.metrics_file)) + if args_cli.episode_record_dir is not None: + args_cli.episode_record_dir = os.path.join(run_dir, os.path.basename(args_cli.episode_record_dir)) + print(f"[INFO] Episode records will be written to: {args_cli.episode_record_dir}") + + metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json") for job in job_manager: if job is None: @@ -283,7 +366,11 @@ def main(): camera_name_prefix=f"robot-cam-rebuild{rebuild_idx}", ) env = load_env( - job.arena_env_args, job.name, variations=job.variations, render_mode=video_cfg.render_mode + job.arena_env_args, + job.name, + variations=job.variations, + render_mode=video_cfg.render_mode, + run_dir=run_dir, ) policy = get_policy_from_job(job) @@ -317,6 +404,15 @@ def main(): except Exception as e: job_manager.complete_job(job, metrics={}, status=Status.FAILED) + if args_cli.episode_record_dir: + with contextlib.suppress(Exception): + _write_job_record( + args_cli.episode_record_dir, + job, + None, + run_video_dir if recording else None, + env=env, + ) print(f"Job {job.name} failed with error: {e}") print(f"Traceback: {traceback.format_exc()}") if not args_cli.continue_on_error: @@ -331,12 +427,28 @@ def main(): _collect_garbage_and_clear_cuda_cache() # Aggregate the metrics from the different experiments into a single view. 
+ # Write the episode record once, post-aggregation, so multi-rebuild jobs produce + # a single JSON with the full aggregated metrics rather than one file per rebuild. + aggregated_metrics = None if metrics_per_run: aggregated_metrics = aggregate_metrics(metrics_per_run) metrics_logger.append_job_metrics(job.name, aggregated_metrics) + if args_cli.episode_record_dir and job.status != Status.FAILED: + with contextlib.suppress(Exception): + rec_path = _write_job_record( + args_cli.episode_record_dir, + job, + aggregated_metrics, + run_video_dir if recording else None, + ) + print(f"[INFO] Episode record written to: {rec_path}") + job_manager.print_jobs_info() metrics_logger.print_metrics() + if args_cli.metrics_file is not None: + metrics_logger.save_metrics_to_file() + print(f"[INFO] Metrics saved to: {metrics_logger.metrics_file}") if __name__ == "__main__": diff --git a/isaaclab_arena/evaluation/eval_runner_cli.py b/isaaclab_arena/evaluation/eval_runner_cli.py index dcd373f50..e38428231 100644 --- a/isaaclab_arena/evaluation/eval_runner_cli.py +++ b/isaaclab_arena/evaluation/eval_runner_cli.py @@ -38,6 +38,18 @@ def add_eval_runner_arguments(parser: argparse.ArgumentParser) -> None: default=False, help="Continue evaluation with remaining jobs when a job fails instead of stopping immediately.", ) + parser.add_argument( + "--metrics_file", + type=str, + default=None, + help="Path to save metrics as JSON. 
If not set, metrics are only printed to stdout.", + ) + parser.add_argument( + "--episode_record_dir", + type=str, + default=None, + help="Directory to write one JSON record per job after it completes.", + ) parser.add_argument( "--chunk_size", type=int, diff --git a/isaaclab_arena/evaluation/job_manager.py b/isaaclab_arena/evaluation/job_manager.py index e33839ab4..cedef7e2b 100644 --- a/isaaclab_arena/evaluation/job_manager.py +++ b/isaaclab_arena/evaluation/job_manager.py @@ -31,6 +31,9 @@ def __init__( status: Status = None, language_instruction: str = None, variations: list[str] = None, + task_name: str = None, + embodiment: str = None, + env_params: dict = None, ): """Initialize a Job instance. @@ -49,6 +52,9 @@ def __init__( takes precedence over the task's own description. variations: Hydra variation override strings (e.g. ``"cracker_box.color.enabled=true"``) applied when composing the environment cfg. Defaults to no overrides. + task_name: Environment/task name (e.g. "put_item_in_fridge_and_close_door"). + embodiment: Robot embodiment name (e.g. "gr1_joint"). + env_params: Raw arena_env_args dict from the job config. 
""" self.name = name self.arena_env_args = arena_env_args @@ -65,6 +71,9 @@ def __init__( self.policy_type = policy_type self.policy_config_dict = policy_config_dict if policy_config_dict is not None else {} self.language_instruction = language_instruction + self.task_name = task_name + self.embodiment = embodiment + self.env_params = env_params if env_params is not None else {} self.status = status if status is not None else Status.PENDING self.start_time = None self.end_time = None @@ -110,6 +119,9 @@ def from_dict(cls, data: dict) -> "Job": else: status = Status.PENDING num_envs = data["arena_env_args"].get("num_envs", 1) + task_name = data["arena_env_args"].get("environment") + embodiment = data["arena_env_args"].get("embodiment") + env_params = dict(data["arena_env_args"]) return cls( name=data["name"], @@ -123,6 +135,9 @@ def from_dict(cls, data: dict) -> "Job": status=status, language_instruction=data.get("language_instruction"), variations=cls.convert_variations_dict_to_hydra_overrides(data.get("variations", {})), + task_name=task_name, + embodiment=embodiment, + env_params=env_params, ) @classmethod diff --git a/isaaclab_arena/evaluation/policy_runner.py b/isaaclab_arena/evaluation/policy_runner.py index 5f04021ab..c3f109193 100644 --- a/isaaclab_arena/evaluation/policy_runner.py +++ b/isaaclab_arena/evaluation/policy_runner.py @@ -99,6 +99,7 @@ def rollout_policy( ) env_ids = (terminated | truncated).nonzero().flatten() policy.reset(env_ids=env_ids) + # Break if number of episodes is reached completed_episodes = env_ids.shape[0] num_episodes_completed += completed_episodes diff --git a/isaaclab_arena/metrics/metrics_logger.py b/isaaclab_arena/metrics/metrics_logger.py index 23263d0b3..c337ffaaa 100644 --- a/isaaclab_arena/metrics/metrics_logger.py +++ b/isaaclab_arena/metrics/metrics_logger.py @@ -7,6 +7,7 @@ import json import numpy as np +import os from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -51,6 +52,7 @@ def append_job_metrics(self, 
job_name: str, metrics_data: MetricsDataCollection) def save_metrics_to_file(self): """Save all metrics to JSON file.""" + os.makedirs(os.path.dirname(self.metrics_file) or ".", exist_ok=True) with open(self.metrics_file, "w", encoding="utf-8") as f: json.dump(self.metrics_data, f, indent=2) diff --git a/isaaclab_arena/tests/test_job_manager.py b/isaaclab_arena/tests/test_job_manager.py index 65af7dfe1..5206ab6af 100644 --- a/isaaclab_arena/tests/test_job_manager.py +++ b/isaaclab_arena/tests/test_job_manager.py @@ -313,3 +313,38 @@ def test_job_manager_iterator(): assert job_names == ["job1", "job2", "job3"] assert job_manager.is_empty() + + +def test_job_from_dict_task_metadata(): + """Test that task_name, embodiment, and env_params are extracted from arena_env_args.""" + job_dict = { + "name": "test_meta", + "arena_env_args": { + "environment": "put_item_in_fridge_and_close_door", + "embodiment": "gr1_joint", + "object": "jug", + "num_envs": 2, + }, + "policy_type": "zero_action", + "num_steps": 10, + } + job = Job.from_dict(job_dict) + + assert job.task_name == "put_item_in_fridge_and_close_door" + assert job.embodiment == "gr1_joint" + assert job.env_params["environment"] == "put_item_in_fridge_and_close_door" + assert job.env_params["object"] == "jug" + assert job.num_envs == 2 + + +def test_job_task_metadata_defaults(): + """Task metadata fields default to None / empty when not provided.""" + job = Job( + "test_defaults", + num_envs=1, + arena_env_args=[], + policy_type="zero_action", + ) + assert job.task_name is None + assert job.embodiment is None + assert job.env_params == {} From 1f04d9a8082c5a013830ffa5bb181fbb83916f76 Mon Sep 17 00:00:00 2001 From: aiguldzh-nvidia Date: Wed, 17 Jun 2026 15:37:18 +0000 Subject: [PATCH 2/7] fix: place all run artifacts under a shared <run_ts>/ directory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All three output paths (videos, metrics, episode records) insert run_ts between their 
parent dir and their final path segment via _stamp_path(), so every artifact from a single run lands under the same ancestor: --video_base_dir parent/videos → parent/<run_ts>/videos/ --metrics_file parent/metrics.json → parent/<run_ts>/metrics.json --episode_record_dir parent/episode_records → parent/<run_ts>/episode_records/ run_ts is generated via timestamped_run_dir("") so the format string stays in one place. Signed-off-by: aiguldzh-nvidia --- isaaclab_arena/evaluation/eval_runner.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index 9c08b90b7..75ae767d1 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -37,9 +37,9 @@ def load_env( arena_env_args: list[str], job_name: str, + run_ts: str, variations: list[str] | None = None, render_mode: str | None = None, - run_dir: str | None = None, ): args_parser = get_isaaclab_arena_environments_cli_parser() @@ -50,10 +50,9 @@ def load_env( env_name, env_cfg = arena_builder.build_registered() # Timestamp suffix prevents EAGAIN from a stale lock left by a crashed previous run. - # Reuse the run_dir basename so the HDF5 filename shares the same timestamp as all other outputs. + # Reuse run_ts so the HDF5 filename shares the same timestamp as all other outputs. if hasattr(env_cfg, "recorders") and env_cfg.recorders is not None: - ts = os.path.basename(run_dir) if run_dir else datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - env_cfg.recorders.dataset_filename = f"dataset_{job_name}_{ts}" + env_cfg.recorders.dataset_filename = f"dataset_{job_name}_{run_ts}" env = arena_builder.make_registered(env_cfg, render_mode=render_mode) # Don't reset here - rollout_policy() will reset the env. Every reset triggers a new episode, initializing recorder & creating a new hdf5 entry. 
@@ -173,6 +172,12 @@ def _split_episodes_across_rebuilds(num_episodes: int | None, num_rebuilds: int, return episodes_per_rebuild +def _stamp_path(path: str, run_ts: str) -> str: + """Insert run_ts between the parent dir and the final segment of path.""" + d, final = os.path.split(path.rstrip(os.sep)) + return os.path.join(d, run_ts, final) if d else os.path.join(final, run_ts) + + def _write_job_record( record_dir: str, job: Job, @@ -330,15 +335,15 @@ def main(): # One reverse-dated run directory shared by all jobs; each job gets a subdirectory within it. recording = args_cli.record_viewport_video or args_cli.record_camera_video - run_dir = timestamped_run_dir(args_cli.video_base_dir) - run_video_dir = run_dir if recording else args_cli.video_base_dir + run_ts = timestamped_run_dir("") + run_video_dir = _stamp_path(args_cli.video_base_dir, run_ts) if recording else args_cli.video_base_dir if args_cli.record_viewport_video: os.makedirs(run_video_dir, exist_ok=True) print(f"[INFO] Video recording enabled. 
Videos will be saved to: {run_video_dir}") if args_cli.metrics_file is not None: - args_cli.metrics_file = os.path.join(run_dir, os.path.basename(args_cli.metrics_file)) + args_cli.metrics_file = _stamp_path(args_cli.metrics_file, run_ts) if args_cli.episode_record_dir is not None: - args_cli.episode_record_dir = os.path.join(run_dir, os.path.basename(args_cli.episode_record_dir)) + args_cli.episode_record_dir = _stamp_path(args_cli.episode_record_dir, run_ts) print(f"[INFO] Episode records will be written to: {args_cli.episode_record_dir}") metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json") @@ -370,7 +375,7 @@ def main(): job.name, variations=job.variations, render_mode=video_cfg.render_mode, - run_dir=run_dir, + run_ts=run_ts, ) policy = get_policy_from_job(job) From 9c1885f5509b8d96628035d0172264a0a9be10cc Mon Sep 17 00:00:00 2001 From: aiguldzh-nvidia Date: Wed, 17 Jun 2026 15:41:48 +0000 Subject: [PATCH 3/7] fix: capture hdf5_path before env closes; write record on partial success --- isaaclab_arena/evaluation/eval_runner.py | 25 +++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index 75ae767d1..26692ed44 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -178,12 +178,22 @@ def _stamp_path(path: str, run_ts: str) -> str: return os.path.join(d, run_ts, final) if d else os.path.join(final, run_ts) +def _extract_hdf5_path(env) -> str | None: + """Return the HDF5 output path from env's recorder config, or None.""" + with contextlib.suppress(Exception): + cfg = env.unwrapped.cfg + if hasattr(cfg, "recorders") and cfg.recorders is not None: + return str(Path(cfg.recorders.dataset_export_dir_path) / (cfg.recorders.dataset_filename + ".hdf5")) + return None + + def _write_job_record( record_dir: str, job: Job, metrics: "MetricsDataCollection | None", video_dir: 
str | None, env=None, + hdf5_path: str | None = None, ) -> str: """Write a JSON record for one completed or failed job.""" os.makedirs(record_dir, exist_ok=True) @@ -205,14 +215,8 @@ def _write_job_record( with contextlib.suppress(Exception): language_instruction = env.unwrapped.cfg.task_description - hdf5_path = None - if env is not None: - with contextlib.suppress(Exception): - cfg = env.unwrapped.cfg - if hasattr(cfg, "recorders") and cfg.recorders is not None: - hdf5_path = str( - Path(cfg.recorders.dataset_export_dir_path) / (cfg.recorders.dataset_filename + ".hdf5") - ) + if hdf5_path is None and env is not None: + hdf5_path = _extract_hdf5_path(env) plain_metrics = metrics_to_plain_python_types(metrics) if metrics else {} plain_metrics.pop("subtask_success_rate", None) @@ -355,6 +359,7 @@ def main(): policy = None metrics_per_run: list[MetricsDataCollection] = [] + last_hdf5_path: str | None = None # num_episodes is the total across rebuilds, so split it over the rebuilds. num_episodes_per_rebuild = _split_episodes_across_rebuilds(job.num_episodes, job.num_rebuilds, job.name) @@ -401,6 +406,7 @@ def main(): language_instruction=job.language_instruction, ) + last_hdf5_path = _extract_hdf5_path(env) job_manager.complete_job(job, metrics=metrics, status=Status.COMPLETED) # users may not specify metrics for a task, although it's not recommended @@ -439,13 +445,14 @@ def main(): aggregated_metrics = aggregate_metrics(metrics_per_run) metrics_logger.append_job_metrics(job.name, aggregated_metrics) - if args_cli.episode_record_dir and job.status != Status.FAILED: + if args_cli.episode_record_dir and aggregated_metrics is not None: with contextlib.suppress(Exception): rec_path = _write_job_record( args_cli.episode_record_dir, job, aggregated_metrics, run_video_dir if recording else None, + hdf5_path=last_hdf5_path, ) print(f"[INFO] Episode record written to: {rec_path}") From 6f73c2131db995fb034ff193e95151393c34f799 Mon Sep 17 00:00:00 2001 From: aiguldzh-nvidia 
Date: Wed, 17 Jun 2026 15:47:11 +0000 Subject: [PATCH 4/7] fix: handle bare filenames correctly in _stamp_path --- isaaclab_arena/evaluation/eval_runner.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index 26692ed44..a1512c3de 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -173,9 +173,18 @@ def _split_episodes_across_rebuilds(num_episodes: int | None, num_rebuilds: int, def _stamp_path(path: str, run_ts: str) -> str: - """Insert run_ts between the parent dir and the final segment of path.""" + """Insert run_ts so all run artifacts share a common timestamped ancestor. + + ``parent/name`` -> ``parent/<run_ts>/name`` (file or directory with a parent) + ``name`` -> ``name/<run_ts>`` (bare directory name) + ``name.ext`` -> ``<run_ts>/name.ext`` (bare filename — ts prepended) + """ d, final = os.path.split(path.rstrip(os.sep)) - return os.path.join(d, run_ts, final) if d else os.path.join(final, run_ts) + if d: + return os.path.join(d, run_ts, final) + if os.path.splitext(final)[1]: + return os.path.join(run_ts, final) + return os.path.join(final, run_ts) def _extract_hdf5_path(env) -> str | None: From e22455d412078a7a6d7e206c6ddf3866cda472f4 Mon Sep 17 00:00:00 2001 From: aiguldzh-nvidia Date: Wed, 17 Jun 2026 16:05:31 +0000 Subject: [PATCH 5/7] refactor: simplify episode record schema --- isaaclab_arena/evaluation/eval_runner.py | 48 ++---------------------- isaaclab_arena/evaluation/job_manager.py | 22 +++-------- 2 files changed, 9 insertions(+), 61 deletions(-) diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index a1512c3de..f29e39322 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -15,7 +15,6 @@ import tempfile import torch import traceback -from datetime import datetime, timezone from pathlib import Path from 
typing import TYPE_CHECKING @@ -187,34 +186,15 @@ def _stamp_path(path: str, run_ts: str) -> str: return os.path.join(final, run_ts) -def _extract_hdf5_path(env) -> str | None: - """Return the HDF5 output path from env's recorder config, or None.""" - with contextlib.suppress(Exception): - cfg = env.unwrapped.cfg - if hasattr(cfg, "recorders") and cfg.recorders is not None: - return str(Path(cfg.recorders.dataset_export_dir_path) / (cfg.recorders.dataset_filename + ".hdf5")) - return None - - def _write_job_record( record_dir: str, job: Job, metrics: "MetricsDataCollection | None", - video_dir: str | None, env=None, - hdf5_path: str | None = None, ) -> str: """Write a JSON record for one completed or failed job.""" os.makedirs(record_dir, exist_ok=True) - video_paths: list[str] = [] - if video_dir: - job_video_dir = os.path.join(video_dir, job.name) - if os.path.isdir(job_video_dir): - video_paths = sorted( - os.path.join(job_video_dir, n) for n in os.listdir(job_video_dir) if n.endswith(".mp4") - ) - wall_time = None if job.start_time is not None and job.end_time is not None: wall_time = round(job.end_time - job.start_time, 3) @@ -224,9 +204,6 @@ def _write_job_record( with contextlib.suppress(Exception): language_instruction = env.unwrapped.cfg.task_description - if hdf5_path is None and env is not None: - hdf5_path = _extract_hdf5_path(env) - plain_metrics = metrics_to_plain_python_types(metrics) if metrics else {} plain_metrics.pop("subtask_success_rate", None) @@ -239,9 +216,7 @@ def _write_job_record( record = { "schema_version": "1.0", "job_name": job.name, - "task_name": job.task_name, - "embodiment": job.embodiment, - "env_params": dict(job.env_params), + "arena_env_args": job.arena_env_args_dict, "policy_type": job.policy_type, "policy_config": dict(job.policy_config_dict), "num_envs": job.num_envs, @@ -251,10 +226,7 @@ def _write_job_record( "status": job.status.value, "metrics": plain_metrics, "per_episode_metrics": per_episode_metrics, - "hdf5_path": 
hdf5_path, - "video_paths": video_paths, "wall_time_seconds": wall_time, - "created_at": datetime.now(timezone.utc).isoformat(), } path = os.path.join(record_dir, f"{job.name}.json") @@ -368,7 +340,6 @@ def main(): policy = None metrics_per_run: list[MetricsDataCollection] = [] - last_hdf5_path: str | None = None # num_episodes is the total across rebuilds, so split it over the rebuilds. num_episodes_per_rebuild = _split_episodes_across_rebuilds(job.num_episodes, job.num_rebuilds, job.name) @@ -415,7 +386,6 @@ def main(): language_instruction=job.language_instruction, ) - last_hdf5_path = _extract_hdf5_path(env) job_manager.complete_job(job, metrics=metrics, status=Status.COMPLETED) # users may not specify metrics for a task, although it's not recommended @@ -426,13 +396,7 @@ def main(): job_manager.complete_job(job, metrics={}, status=Status.FAILED) if args_cli.episode_record_dir: with contextlib.suppress(Exception): - _write_job_record( - args_cli.episode_record_dir, - job, - None, - run_video_dir if recording else None, - env=env, - ) + _write_job_record(args_cli.episode_record_dir, job, None, env=env) print(f"Job {job.name} failed with error: {e}") print(f"Traceback: {traceback.format_exc()}") if not args_cli.continue_on_error: @@ -456,13 +420,7 @@ def main(): if args_cli.episode_record_dir and aggregated_metrics is not None: with contextlib.suppress(Exception): - rec_path = _write_job_record( - args_cli.episode_record_dir, - job, - aggregated_metrics, - run_video_dir if recording else None, - hdf5_path=last_hdf5_path, - ) + rec_path = _write_job_record(args_cli.episode_record_dir, job, aggregated_metrics) print(f"[INFO] Episode record written to: {rec_path}") job_manager.print_jobs_info() diff --git a/isaaclab_arena/evaluation/job_manager.py b/isaaclab_arena/evaluation/job_manager.py index cedef7e2b..04805abe0 100644 --- a/isaaclab_arena/evaluation/job_manager.py +++ b/isaaclab_arena/evaluation/job_manager.py @@ -23,6 +23,7 @@ def __init__( name: str, 
num_envs: int, arena_env_args: list[str], + arena_env_args_dict: dict, policy_type: str, num_steps: int = None, num_episodes: int = None, @@ -31,15 +32,14 @@ def __init__( status: Status = None, language_instruction: str = None, variations: list[str] = None, - task_name: str = None, - embodiment: str = None, - env_params: dict = None, ): """Initialize a Job instance. Args: name: Job name, used to identify the job in the queue and in the logs. - arena_env_args: arguments for configuring the arena environment + arena_env_args: arguments for configuring the arena environment as a CLI args list. + arena_env_args_dict: original arena_env_args dict from the job config, kept for + serialization (e.g. episode records) without re-parsing the CLI list. num_envs: Number of environments to simulate num_steps: Number of steps to run the policy for (mutually exclusive with num_episodes) num_episodes: Number of episodes to run the policy for (mutually exclusive with num_steps) @@ -52,12 +52,10 @@ def __init__( takes precedence over the task's own description. variations: Hydra variation override strings (e.g. ``"cracker_box.color.enabled=true"``) applied when composing the environment cfg. Defaults to no overrides. - task_name: Environment/task name (e.g. "put_item_in_fridge_and_close_door"). - embodiment: Robot embodiment name (e.g. "gr1_joint"). - env_params: Raw arena_env_args dict from the job config. 
""" self.name = name self.arena_env_args = arena_env_args + self.arena_env_args_dict = arena_env_args_dict self.variations = variations if variations is not None else [] assert num_envs > 0, "num_envs must be greater than 0" assert not ( @@ -71,9 +69,6 @@ def __init__( self.policy_type = policy_type self.policy_config_dict = policy_config_dict if policy_config_dict is not None else {} self.language_instruction = language_instruction - self.task_name = task_name - self.embodiment = embodiment - self.env_params = env_params if env_params is not None else {} self.status = status if status is not None else Status.PENDING self.start_time = None self.end_time = None @@ -119,13 +114,11 @@ def from_dict(cls, data: dict) -> "Job": else: status = Status.PENDING num_envs = data["arena_env_args"].get("num_envs", 1) - task_name = data["arena_env_args"].get("environment") - embodiment = data["arena_env_args"].get("embodiment") - env_params = dict(data["arena_env_args"]) return cls( name=data["name"], arena_env_args=cls.convert_args_dict_to_cli_args_list(data["arena_env_args"]), + arena_env_args_dict=dict(data["arena_env_args"]), policy_type=data["policy_type"], num_envs=num_envs, num_steps=num_steps, @@ -135,9 +128,6 @@ def from_dict(cls, data: dict) -> "Job": status=status, language_instruction=data.get("language_instruction"), variations=cls.convert_variations_dict_to_hydra_overrides(data.get("variations", {})), - task_name=task_name, - embodiment=embodiment, - env_params=env_params, ) @classmethod From 1668585fb3dc108125198a0d2c8acc919a517e78 Mon Sep 17 00:00:00 2001 From: aiguldzh-nvidia Date: Wed, 17 Jun 2026 16:07:48 +0000 Subject: [PATCH 6/7] remove unnecessary contextlib.suppress around task_description read --- isaaclab_arena/evaluation/eval_runner.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index f29e39322..4f2ea0e05 100644 --- 
a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -201,8 +201,7 @@ def _write_job_record( language_instruction = job.language_instruction if language_instruction is None and env is not None: - with contextlib.suppress(Exception): - language_instruction = env.unwrapped.cfg.task_description + language_instruction = env.unwrapped.cfg.task_description plain_metrics = metrics_to_plain_python_types(metrics) if metrics else {} plain_metrics.pop("subtask_success_rate", None) From 22048d6989f4fcd3e1dfbf2aa3165a5247167387 Mon Sep 17 00:00:00 2001 From: aiguldzh-nvidia Date: Wed, 17 Jun 2026 16:15:49 +0000 Subject: [PATCH 7/7] fix: add arena_env_args_dict to direct Job() calls in tests; makedirs for job subpaths --- isaaclab_arena/evaluation/eval_runner.py | 4 ++-- isaaclab_arena/evaluation/policy_runner.py | 1 - isaaclab_arena/tests/test_job_manager.py | 26 ++++++---------------- 3 files changed, 9 insertions(+), 22 deletions(-) diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index 4f2ea0e05..43cf1cf1b 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -193,7 +193,8 @@ def _write_job_record( env=None, ) -> str: """Write a JSON record for one completed or failed job.""" - os.makedirs(record_dir, exist_ok=True) + path = os.path.join(record_dir, f"{job.name}.json") + os.makedirs(os.path.dirname(path), exist_ok=True) wall_time = None if job.start_time is not None and job.end_time is not None: @@ -228,7 +229,6 @@ def _write_job_record( "wall_time_seconds": wall_time, } - path = os.path.join(record_dir, f"{job.name}.json") with open(path, "w", encoding="utf-8") as f: json.dump(record, f, indent=2) return path diff --git a/isaaclab_arena/evaluation/policy_runner.py b/isaaclab_arena/evaluation/policy_runner.py index c3f109193..5f04021ab 100644 --- a/isaaclab_arena/evaluation/policy_runner.py +++ b/isaaclab_arena/evaluation/policy_runner.py 
@@ -99,7 +99,6 @@ def rollout_policy( ) env_ids = (terminated | truncated).nonzero().flatten() policy.reset(env_ids=env_ids) - # Break if number of episodes is reached completed_episodes = env_ids.shape[0] num_episodes_completed += completed_episodes diff --git a/isaaclab_arena/tests/test_job_manager.py b/isaaclab_arena/tests/test_job_manager.py index 5206ab6af..8b8bf194b 100644 --- a/isaaclab_arena/tests/test_job_manager.py +++ b/isaaclab_arena/tests/test_job_manager.py @@ -103,6 +103,7 @@ def test_job_manager_update_job_status(): "test_job", num_envs=1, arena_env_args={"environment": "test_env"}, + arena_env_args_dict={"environment": "test_env"}, policy_type="zero_action", policy_config_dict={}, ) @@ -127,6 +128,7 @@ def test_job_manager_failed_job(): "failing_job", num_envs=1, arena_env_args={"environment": "test_env"}, + arena_env_args_dict={"environment": "test_env"}, policy_type="zero_action", policy_config_dict={}, ) @@ -315,8 +317,8 @@ def test_job_manager_iterator(): assert job_manager.is_empty() -def test_job_from_dict_task_metadata(): - """Test that task_name, embodiment, and env_params are extracted from arena_env_args.""" +def test_job_from_dict_arena_env_args_dict(): + """arena_env_args_dict preserves the original config dict for serialization.""" job_dict = { "name": "test_meta", "arena_env_args": { @@ -330,21 +332,7 @@ def test_job_from_dict_task_metadata(): } job = Job.from_dict(job_dict) - assert job.task_name == "put_item_in_fridge_and_close_door" - assert job.embodiment == "gr1_joint" - assert job.env_params["environment"] == "put_item_in_fridge_and_close_door" - assert job.env_params["object"] == "jug" + assert job.arena_env_args_dict["environment"] == "put_item_in_fridge_and_close_door" + assert job.arena_env_args_dict["embodiment"] == "gr1_joint" + assert job.arena_env_args_dict["object"] == "jug" assert job.num_envs == 2 - - -def test_job_task_metadata_defaults(): - """Task metadata fields default to None / empty when not provided.""" - job 
= Job( - "test_defaults", - num_envs=1, - arena_env_args=[], - policy_type="zero_action", - ) - assert job.task_name is None - assert job.embodiment is None - assert job.env_params == {}