Skip to content
104 changes: 97 additions & 7 deletions isaaclab_arena/evaluation/eval_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# SPDX-License-Identifier: Apache-2.0

import argparse
import contextlib
import dataclasses
import gc
import json
Expand All @@ -22,7 +23,7 @@
from isaaclab_arena.evaluation.job_manager import Job, JobManager, Status
from isaaclab_arena.evaluation.policy_runner import get_policy_cls, rollout_policy
from isaaclab_arena.metrics.aggregate_metrics import aggregate_metrics
from isaaclab_arena.metrics.metrics_logger import MetricsLogger
from isaaclab_arena.metrics.metrics_logger import MetricsLogger, metrics_to_plain_python_types
from isaaclab_arena.utils.isaaclab_utils.simulation_app import SimulationAppContext, teardown_simulation_app
from isaaclab_arena.video.video_recording import VideoRecordingCfg, timestamped_run_dir, wrap_env_for_video
from isaaclab_arena_environments.cli import get_arena_builder_from_cli, get_isaaclab_arena_environments_cli_parser
Expand All @@ -35,6 +36,7 @@
def load_env(
arena_env_args: list[str],
job_name: str,
run_ts: str,
variations: list[str] | None = None,
render_mode: str | None = None,
):
Expand All @@ -46,9 +48,10 @@ def load_env(

env_name, env_cfg = arena_builder.build_registered()

# Set unique dataset filename for this job to avoid file locking conflicts
# Timestamp suffix prevents EAGAIN from a stale lock left by a crashed previous run.
# Reuse run_ts so the HDF5 filename shares the same timestamp as all other outputs.
if hasattr(env_cfg, "recorders") and env_cfg.recorders is not None:
env_cfg.recorders.dataset_filename = f"dataset_{job_name}"
env_cfg.recorders.dataset_filename = f"dataset_{job_name}_{run_ts}"

env = arena_builder.make_registered(env_cfg, render_mode=render_mode)
# Don't reset here - rollout_policy() will reset the env. Every reset triggers a new episode, initializing recorder & creating a new hdf5 entry.
Expand Down Expand Up @@ -168,6 +171,69 @@ def _split_episodes_across_rebuilds(num_episodes: int | None, num_rebuilds: int,
return episodes_per_rebuild


def _stamp_path(path: str, run_ts: str) -> str:
"""Insert run_ts so all run artifacts share a common timestamped ancestor.

``parent/name`` -> ``parent/<ts>/name`` (file or directory with a parent)
``name`` -> ``name/<ts>`` (bare directory name)
``name.ext`` -> ``<ts>/name.ext`` (bare filename — ts prepended)
"""
d, final = os.path.split(path.rstrip(os.sep))
if d:
return os.path.join(d, run_ts, final)
if os.path.splitext(final)[1]:
return os.path.join(run_ts, final)
return os.path.join(final, run_ts)


def _write_job_record(
record_dir: str,
job: Job,
metrics: "MetricsDataCollection | None",
env=None,
) -> str:
"""Write a JSON record for one completed or failed job."""
path = os.path.join(record_dir, f"{job.name}.json")
os.makedirs(os.path.dirname(path), exist_ok=True)

wall_time = None
if job.start_time is not None and job.end_time is not None:
wall_time = round(job.end_time - job.start_time, 3)

language_instruction = job.language_instruction
if language_instruction is None and env is not None:
language_instruction = env.unwrapped.cfg.task_description

plain_metrics = metrics_to_plain_python_types(metrics) if metrics else {}
plain_metrics.pop("subtask_success_rate", None)

# Per-episode success flags: recorded_data holds one (1,) bool array per episode.
per_episode_metrics: dict[str, list] = {}
if metrics is not None and "success_rate" in metrics.metric_data_entries:
metric_data = metrics.metric_data_entries["success_rate"]
per_episode_metrics["success_rate"] = [bool(arr.flat[0]) for arr in metric_data.recorded_data]

record = {
"schema_version": "1.0",
"job_name": job.name,
"arena_env_args": job.arena_env_args_dict,
"policy_type": job.policy_type,
"policy_config": dict(job.policy_config_dict),
"num_envs": job.num_envs,
"num_steps": job.num_steps,
"num_episodes": job.num_episodes,
Comment on lines +218 to +224

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of these job.* fields are already present in the config file that is passed to eval_runner.py. Here, we re-extract that information and re-export it.

Is there a reason that we can't just use the input file (for example see here), rather than reading and then re-exporting the information?

"language_instruction": language_instruction,
"status": job.status.value,
"metrics": plain_metrics,
"per_episode_metrics": per_episode_metrics,
"wall_time_seconds": wall_time,
}

with open(path, "w", encoding="utf-8") as f:
json.dump(record, f, indent=2)
return path


def _run_chunk(chunk_label: str, chunk_jobs: list[dict]) -> int:
"""Run ``chunk_jobs`` in a fresh ``eval_runner`` subprocess and return its exit code."""
print(f"[eval_runner] {chunk_label}", flush=True)
Expand Down Expand Up @@ -248,17 +314,23 @@ def main():

with SimulationAppContext(args_cli):
job_manager = JobManager(eval_jobs_config["jobs"])
metrics_logger = MetricsLogger()

job_manager.print_jobs_info()

# One reverse-dated run directory shared by all jobs; each job gets a subdirectory within it.
recording = args_cli.record_viewport_video or args_cli.record_camera_video
run_video_dir = timestamped_run_dir(args_cli.video_base_dir) if recording else args_cli.video_base_dir

run_ts = timestamped_run_dir("")
run_video_dir = _stamp_path(args_cli.video_base_dir, run_ts) if recording else args_cli.video_base_dir
if args_cli.record_viewport_video:
os.makedirs(run_video_dir, exist_ok=True)
print(f"[INFO] Video recording enabled. Videos will be saved to: {run_video_dir}")
if args_cli.metrics_file is not None:
args_cli.metrics_file = _stamp_path(args_cli.metrics_file, run_ts)
if args_cli.episode_record_dir is not None:
args_cli.episode_record_dir = _stamp_path(args_cli.episode_record_dir, run_ts)
print(f"[INFO] Episode records will be written to: {args_cli.episode_record_dir}")

metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json")

for job in job_manager:
if job is None:
Expand All @@ -283,7 +355,11 @@ def main():
camera_name_prefix=f"robot-cam-rebuild{rebuild_idx}",
)
env = load_env(
job.arena_env_args, job.name, variations=job.variations, render_mode=video_cfg.render_mode
job.arena_env_args,
job.name,
variations=job.variations,
render_mode=video_cfg.render_mode,
run_ts=run_ts,
)

policy = get_policy_from_job(job)
Expand Down Expand Up @@ -317,6 +393,9 @@ def main():

except Exception as e:
job_manager.complete_job(job, metrics={}, status=Status.FAILED)
if args_cli.episode_record_dir:
with contextlib.suppress(Exception):
_write_job_record(args_cli.episode_record_dir, job, None, env=env)
print(f"Job {job.name} failed with error: {e}")
print(f"Traceback: {traceback.format_exc()}")
if not args_cli.continue_on_error:
Expand All @@ -331,12 +410,23 @@ def main():
_collect_garbage_and_clear_cuda_cache()

# Aggregate the metrics from the different experiments into a single view.
# Write the episode record once, post-aggregation, so multi-rebuild jobs produce
# a single JSON with the full aggregated metrics rather than one file per rebuild.
aggregated_metrics = None
if metrics_per_run:
aggregated_metrics = aggregate_metrics(metrics_per_run)
metrics_logger.append_job_metrics(job.name, aggregated_metrics)

if args_cli.episode_record_dir and aggregated_metrics is not None:
with contextlib.suppress(Exception):
rec_path = _write_job_record(args_cli.episode_record_dir, job, aggregated_metrics)
print(f"[INFO] Episode record written to: {rec_path}")
Comment thread
aiguldzh-nvidia marked this conversation as resolved.
Comment on lines +420 to +423

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 language_instruction is always null in completed job records when not set in job config

_write_job_record is called without env (so env=None). Inside the function, the fallback env.unwrapped.cfg.task_description is only reached when env is not None — but by this point the env has been closed and set to None in the finally block. When job.language_instruction is None (the common case for tasks that carry their description in the environment config rather than the YAML job spec), the language_instruction field will always be null in every completed job's JSON record. The failure path at line 398 correctly passes env=env while it is still open; the success path needs the same treatment but cannot, because env is already gone. The fix is to capture the value from env before teardown — e.g., store it in a local variable at the end of the try block and pass it as an explicit argument to _write_job_record.


job_manager.print_jobs_info()
metrics_logger.print_metrics()
if args_cli.metrics_file is not None:
metrics_logger.save_metrics_to_file()
print(f"[INFO] Metrics saved to: {metrics_logger.metrics_file}")
Comment on lines +427 to +429

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding that!



if __name__ == "__main__":
Expand Down
12 changes: 12 additions & 0 deletions isaaclab_arena/evaluation/eval_runner_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ def add_eval_runner_arguments(parser: argparse.ArgumentParser) -> None:
default=False,
help="Continue evaluation with remaining jobs when a job fails instead of stopping immediately.",
)
parser.add_argument(
"--metrics_file",
type=str,
default=None,
help="Path to save metrics as JSON. If not set, metrics are only printed to stdout.",
)
parser.add_argument(
"--episode_record_dir",
type=str,
default=None,
help="Directory to write one JSON record per job after it completes.",
)
parser.add_argument(
"--chunk_size",
type=int,
Expand Down
7 changes: 6 additions & 1 deletion isaaclab_arena/evaluation/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(
name: str,
num_envs: int,
arena_env_args: list[str],
arena_env_args_dict: dict,
policy_type: str,
num_steps: int = None,
num_episodes: int = None,
Expand All @@ -36,7 +37,9 @@ def __init__(

Args:
name: Job name, used to identify the job in the queue and in the logs.
arena_env_args: arguments for configuring the arena environment
arena_env_args: arguments for configuring the arena environment as a CLI args list.
arena_env_args_dict: original arena_env_args dict from the job config, kept for
serialization (e.g. episode records) without re-parsing the CLI list.
num_envs: Number of environments to simulate
num_steps: Number of steps to run the policy for (mutually exclusive with num_episodes)
num_episodes: Number of episodes to run the policy for (mutually exclusive with num_steps)
Expand All @@ -52,6 +55,7 @@ def __init__(
"""
self.name = name
self.arena_env_args = arena_env_args
self.arena_env_args_dict = arena_env_args_dict
self.variations = variations if variations is not None else []
assert num_envs > 0, "num_envs must be greater than 0"
assert not (
Expand Down Expand Up @@ -114,6 +118,7 @@ def from_dict(cls, data: dict) -> "Job":
return cls(
name=data["name"],
arena_env_args=cls.convert_args_dict_to_cli_args_list(data["arena_env_args"]),
arena_env_args_dict=dict(data["arena_env_args"]),
policy_type=data["policy_type"],
num_envs=num_envs,
num_steps=num_steps,
Expand Down
2 changes: 2 additions & 0 deletions isaaclab_arena/metrics/metrics_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import json
import numpy as np
import os
from typing import TYPE_CHECKING

if TYPE_CHECKING:
Expand Down Expand Up @@ -51,6 +52,7 @@ def append_job_metrics(self, job_name: str, metrics_data: MetricsDataCollection)

def save_metrics_to_file(self):
    """Write ``self.metrics_data`` to ``self.metrics_file`` as indented JSON.

    The parent directory of the target file is created when missing. A bare
    filename has no directory part, so the current directory is used.
    """
    target_dir = os.path.dirname(self.metrics_file)
    if not target_dir:
        # os.makedirs("") would raise, so fall back to the current directory.
        target_dir = "."
    os.makedirs(target_dir, exist_ok=True)

    with open(self.metrics_file, "w", encoding="utf-8") as out_file:
        json.dump(self.metrics_data, out_file, indent=2)

Expand Down
23 changes: 23 additions & 0 deletions isaaclab_arena/tests/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def test_job_manager_update_job_status():
"test_job",
num_envs=1,
arena_env_args={"environment": "test_env"},
arena_env_args_dict={"environment": "test_env"},
policy_type="zero_action",
policy_config_dict={},
)
Expand All @@ -127,6 +128,7 @@ def test_job_manager_failed_job():
"failing_job",
num_envs=1,
arena_env_args={"environment": "test_env"},
arena_env_args_dict={"environment": "test_env"},
policy_type="zero_action",
policy_config_dict={},
)
Expand Down Expand Up @@ -313,3 +315,24 @@ def test_job_manager_iterator():

assert job_names == ["job1", "job2", "job3"]
assert job_manager.is_empty()


def test_job_from_dict_arena_env_args_dict():
    """Job.from_dict keeps the original arena_env_args entries on arena_env_args_dict."""
    expected_env_args = {
        "environment": "put_item_in_fridge_and_close_door",
        "embodiment": "gr1_joint",
        "object": "jug",
    }
    job = Job.from_dict(
        {
            "name": "test_meta",
            # Fresh dict so the expected values above are never shared with the job.
            "arena_env_args": {**expected_env_args, "num_envs": 2},
            "policy_type": "zero_action",
            "num_steps": 10,
        }
    )

    for key, value in expected_env_args.items():
        assert job.arena_env_args_dict[key] == value
    assert job.num_envs == 2
Loading