diff --git a/isaaclab_arena/evaluation/eval_runner.py b/isaaclab_arena/evaluation/eval_runner.py index 91f9fff31..43cf1cf1b 100644 --- a/isaaclab_arena/evaluation/eval_runner.py +++ b/isaaclab_arena/evaluation/eval_runner.py @@ -4,6 +4,7 @@ # SPDX-License-Identifier: Apache-2.0 import argparse +import contextlib import dataclasses import gc import json @@ -22,7 +23,7 @@ from isaaclab_arena.evaluation.job_manager import Job, JobManager, Status from isaaclab_arena.evaluation.policy_runner import get_policy_cls, rollout_policy from isaaclab_arena.metrics.aggregate_metrics import aggregate_metrics -from isaaclab_arena.metrics.metrics_logger import MetricsLogger +from isaaclab_arena.metrics.metrics_logger import MetricsLogger, metrics_to_plain_python_types from isaaclab_arena.utils.isaaclab_utils.simulation_app import SimulationAppContext, teardown_simulation_app from isaaclab_arena.video.video_recording import VideoRecordingCfg, timestamped_run_dir, wrap_env_for_video from isaaclab_arena_environments.cli import get_arena_builder_from_cli, get_isaaclab_arena_environments_cli_parser @@ -35,6 +36,7 @@ def load_env( arena_env_args: list[str], job_name: str, + run_ts: str, variations: list[str] | None = None, render_mode: str | None = None, ): @@ -46,9 +48,10 @@ def load_env( env_name, env_cfg = arena_builder.build_registered() - # Set unique dataset filename for this job to avoid file locking conflicts + # Timestamp suffix prevents EAGAIN from a stale lock left by a crashed previous run. + # Reuse run_ts so the HDF5 filename shares the same timestamp as all other outputs. if hasattr(env_cfg, "recorders") and env_cfg.recorders is not None: - env_cfg.recorders.dataset_filename = f"dataset_{job_name}" + env_cfg.recorders.dataset_filename = f"dataset_{job_name}_{run_ts}" env = arena_builder.make_registered(env_cfg, render_mode=render_mode) # Don't reset here - rollout_policy() will reset the env. 
def _stamp_path(path: str, run_ts: str) -> str:
    """Insert ``run_ts`` into ``path`` so all run artifacts share a timestamped ancestor.

    Trailing path separators are ignored. The mapping is:

    ``parent/name`` -> ``parent/<run_ts>/name``  (file or directory with a parent)
    ``name``        -> ``name/<run_ts>``         (bare directory name)
    ``name.ext``    -> ``<run_ts>/name.ext``     (bare filename, timestamp prepended)

    Args:
        path: File or directory path to stamp.
        run_ts: Timestamp component to insert.

    Returns:
        The stamped path. A path made only of separators (e.g. the filesystem
        root) gets ``run_ts`` appended, so an absolute path stays absolute.
    """
    # os.path.split honours os.altsep where it exists, so strip both kinds.
    separators = os.sep + (os.altsep or "")
    trimmed = path.rstrip(separators)
    if path and not trimmed:
        # Stripping consumed the whole path (e.g. "/"): keep the root instead
        # of silently degrading to a relative "<run_ts>".
        return os.path.join(path, run_ts)

    parent, leaf = os.path.split(trimmed)
    if parent:
        return os.path.join(parent, run_ts, leaf)
    if os.path.splitext(leaf)[1]:
        # Bare filename: the timestamp becomes its parent directory.
        return os.path.join(run_ts, leaf)
    return os.path.join(leaf, run_ts)


def _write_job_record(
    record_dir: str,
    job: "Job",
    metrics: "MetricsDataCollection | None",
    env=None,
) -> str:
    """Write a JSON record for one completed or failed job.

    Args:
        record_dir: Directory that receives ``<job.name>.json``. It is created
            if missing; an empty string means the current directory.
        job: Job whose configuration, status and timing are recorded.
        metrics: Aggregated metrics for the job, or ``None`` (e.g. for a failed job).
        env: Optional environment. Used only as a fallback source for the
            language instruction when the job does not carry one.

    Returns:
        Path of the written JSON file.

    Raises:
        TypeError: If the record holds a value ``json`` cannot serialize. The
            record is serialized before the file is opened, so no partial
            file is left behind in that case.
        OSError: If the directory or file cannot be created.
    """
    path = os.path.join(record_dir, f"{job.name}.json")

    wall_time = None
    if job.start_time is not None and job.end_time is not None:
        wall_time = round(job.end_time - job.start_time, 3)

    language_instruction = job.language_instruction
    if language_instruction is None and env is not None:
        # Best-effort lookup: a missing attribute yields None rather than an
        # exception, so a record is still written for jobs that failed early.
        env_cfg = getattr(getattr(env, "unwrapped", None), "cfg", None)
        language_instruction = getattr(env_cfg, "task_description", None)

    plain_metrics = metrics_to_plain_python_types(metrics) if metrics else {}
    plain_metrics.pop("subtask_success_rate", None)

    # Per-episode success flags: recorded_data holds one (1,) bool array per episode.
    per_episode_metrics: dict[str, list] = {}
    if metrics is not None and "success_rate" in metrics.metric_data_entries:
        metric_data = metrics.metric_data_entries["success_rate"]
        per_episode_metrics["success_rate"] = [bool(arr.flat[0]) for arr in metric_data.recorded_data]

    record = {
        "schema_version": "1.0",
        "job_name": job.name,
        "arena_env_args": job.arena_env_args_dict,
        "policy_type": job.policy_type,
        # Tolerate a job that carries no policy config at all.
        "policy_config": dict(job.policy_config_dict or {}),
        "num_envs": job.num_envs,
        "num_steps": job.num_steps,
        "num_episodes": job.num_episodes,
        "language_instruction": language_instruction,
        "status": job.status.value,
        "metrics": plain_metrics,
        "per_episode_metrics": per_episode_metrics,
        "wall_time_seconds": wall_time,
    }

    # Serialize first so a non-serializable value cannot leave a truncated file.
    payload = json.dumps(record, indent=2)

    # dirname is "" when record_dir is empty and job.name has no directory part.
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(payload)
    return path
Videos will be saved to: {run_video_dir}") + if args_cli.metrics_file is not None: + args_cli.metrics_file = _stamp_path(args_cli.metrics_file, run_ts) + if args_cli.episode_record_dir is not None: + args_cli.episode_record_dir = _stamp_path(args_cli.episode_record_dir, run_ts) + print(f"[INFO] Episode records will be written to: {args_cli.episode_record_dir}") + + metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json") for job in job_manager: if job is None: @@ -283,7 +355,11 @@ def main(): camera_name_prefix=f"robot-cam-rebuild{rebuild_idx}", ) env = load_env( - job.arena_env_args, job.name, variations=job.variations, render_mode=video_cfg.render_mode + job.arena_env_args, + job.name, + variations=job.variations, + render_mode=video_cfg.render_mode, + run_ts=run_ts, ) policy = get_policy_from_job(job) @@ -317,6 +393,9 @@ def main(): except Exception as e: job_manager.complete_job(job, metrics={}, status=Status.FAILED) + if args_cli.episode_record_dir: + with contextlib.suppress(Exception): + _write_job_record(args_cli.episode_record_dir, job, None, env=env) print(f"Job {job.name} failed with error: {e}") print(f"Traceback: {traceback.format_exc()}") if not args_cli.continue_on_error: @@ -331,12 +410,23 @@ def main(): _collect_garbage_and_clear_cuda_cache() # Aggregate the metrics from the different experiments into a single view. + # Write the episode record once, post-aggregation, so multi-rebuild jobs produce + # a single JSON with the full aggregated metrics rather than one file per rebuild. 
+ aggregated_metrics = None if metrics_per_run: aggregated_metrics = aggregate_metrics(metrics_per_run) metrics_logger.append_job_metrics(job.name, aggregated_metrics) + if args_cli.episode_record_dir and aggregated_metrics is not None: + with contextlib.suppress(Exception): + rec_path = _write_job_record(args_cli.episode_record_dir, job, aggregated_metrics) + print(f"[INFO] Episode record written to: {rec_path}") + job_manager.print_jobs_info() metrics_logger.print_metrics() + if args_cli.metrics_file is not None: + metrics_logger.save_metrics_to_file() + print(f"[INFO] Metrics saved to: {metrics_logger.metrics_file}") if __name__ == "__main__": diff --git a/isaaclab_arena/evaluation/eval_runner_cli.py b/isaaclab_arena/evaluation/eval_runner_cli.py index dcd373f50..e38428231 100644 --- a/isaaclab_arena/evaluation/eval_runner_cli.py +++ b/isaaclab_arena/evaluation/eval_runner_cli.py @@ -38,6 +38,18 @@ def add_eval_runner_arguments(parser: argparse.ArgumentParser) -> None: default=False, help="Continue evaluation with remaining jobs when a job fails instead of stopping immediately.", ) + parser.add_argument( + "--metrics_file", + type=str, + default=None, + help="Path to save metrics as JSON. If not set, metrics are only printed to stdout.", + ) + parser.add_argument( + "--episode_record_dir", + type=str, + default=None, + help="Directory to write one JSON record per job after it completes.", + ) parser.add_argument( "--chunk_size", type=int, diff --git a/isaaclab_arena/evaluation/job_manager.py b/isaaclab_arena/evaluation/job_manager.py index e33839ab4..04805abe0 100644 --- a/isaaclab_arena/evaluation/job_manager.py +++ b/isaaclab_arena/evaluation/job_manager.py @@ -23,6 +23,7 @@ def __init__( name: str, num_envs: int, arena_env_args: list[str], + arena_env_args_dict: dict, policy_type: str, num_steps: int = None, num_episodes: int = None, @@ -36,7 +37,9 @@ def __init__( Args: name: Job name, used to identify the job in the queue and in the logs. 
- arena_env_args: arguments for configuring the arena environment + arena_env_args: arguments for configuring the arena environment as a CLI args list. + arena_env_args_dict: original arena_env_args dict from the job config, kept for + serialization (e.g. episode records) without re-parsing the CLI list. num_envs: Number of environments to simulate num_steps: Number of steps to run the policy for (mutually exclusive with num_episodes) num_episodes: Number of episodes to run the policy for (mutually exclusive with num_steps) @@ -52,6 +55,7 @@ def __init__( """ self.name = name self.arena_env_args = arena_env_args + self.arena_env_args_dict = arena_env_args_dict self.variations = variations if variations is not None else [] assert num_envs > 0, "num_envs must be greater than 0" assert not ( @@ -114,6 +118,7 @@ def from_dict(cls, data: dict) -> "Job": return cls( name=data["name"], arena_env_args=cls.convert_args_dict_to_cli_args_list(data["arena_env_args"]), + arena_env_args_dict=dict(data["arena_env_args"]), policy_type=data["policy_type"], num_envs=num_envs, num_steps=num_steps, diff --git a/isaaclab_arena/metrics/metrics_logger.py b/isaaclab_arena/metrics/metrics_logger.py index 23263d0b3..c337ffaaa 100644 --- a/isaaclab_arena/metrics/metrics_logger.py +++ b/isaaclab_arena/metrics/metrics_logger.py @@ -7,6 +7,7 @@ import json import numpy as np +import os from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -51,6 +52,7 @@ def append_job_metrics(self, job_name: str, metrics_data: MetricsDataCollection) def save_metrics_to_file(self): """Save all metrics to JSON file.""" + os.makedirs(os.path.dirname(self.metrics_file) or ".", exist_ok=True) with open(self.metrics_file, "w", encoding="utf-8") as f: json.dump(self.metrics_data, f, indent=2) diff --git a/isaaclab_arena/tests/test_job_manager.py b/isaaclab_arena/tests/test_job_manager.py index 65af7dfe1..8b8bf194b 100644 --- a/isaaclab_arena/tests/test_job_manager.py +++ b/isaaclab_arena/tests/test_job_manager.py @@ 
def test_job_from_dict_arena_env_args_dict():
    """Job.from_dict keeps the arena_env_args config values available on arena_env_args_dict."""
    # Values that must come back unchanged from the job's stored dict.
    expected_env_args = {
        "environment": "put_item_in_fridge_and_close_door",
        "embodiment": "gr1_joint",
        "object": "jug",
    }
    job = Job.from_dict(
        {
            "name": "test_meta",
            "arena_env_args": {**expected_env_args, "num_envs": 2},
            "policy_type": "zero_action",
            "num_steps": 10,
        }
    )

    for key, value in expected_env_args.items():
        assert job.arena_env_args_dict[key] == value
    assert job.num_envs == 2