Extend eval runner: per-env camera recording, metrics file, and episode boundaries#776
Extend eval runner: per-env camera recording, metrics file, and episode boundaries#776aiguldzh-nvidia wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Code Review: EpisodeRecord with Task Metadata, Metrics Output, All Envs Recording and Episode Boundaries
Summary
This PR adds a well-structured EpisodeRecord schema and wires up episode boundaries, metrics persistence, and multi-env camera recording into the eval runner. The overall architecture is clean — dataclass-based schema with versioning, clear separation between building and writing records, and thoughtful extension points for future phases. A few issues worth addressing before merge.
🔴 Critical Issues
1. MetricsLogger receives non-timestamped path (race with path mutation)
File: eval_runner.py (lines ~205–215)
metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json")
# ... later ...
if args_cli.metrics_file is not None:
base, ext = os.path.splitext(args_cli.metrics_file)
args_cli.metrics_file = f"{base}_{run_ts}{ext}"MetricsLogger is initialized before args_cli.metrics_file is mutated to include the timestamp. The logger stores the original path at construction time, so save_metrics_to_file() writes to the non-timestamped path while the [INFO] print references metrics_logger.metrics_file (also non-timestamped). The timestamped path is effectively dead code.
Suggestion: Move MetricsLogger instantiation to after the timestamp mutation, or explicitly update metrics_logger.metrics_file post-mutation.
2. episode_boundaries lost on exception — NameError in caller
File: eval_runner.py (line ~268)
metrics, episode_boundaries = rollout_policy(...)If rollout_policy raises an exception (which re-raises after pbar cleanup), the tuple unpacking never completes, so episode_boundaries is unbound in the except block's scope. While the failure-path build_episode_record(...) call doesn't pass episode_boundaries, any future code that references it in the except block would hit a NameError. The variable should be initialized before the call:
episode_boundaries = []
metrics, episode_boundaries = rollout_policy(...)This also future-proofs the failure record to include partial boundaries collected before the crash.
🟡 Moderate Issues
3. MetricsLogger always receives a metrics_file argument even when user didn't request file output
File: eval_runner.py
metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file or "metrics.json")When --metrics_file is not passed, args_cli.metrics_file is None, so this falls through to "metrics.json". If MetricsLogger writes on destruction or has side effects, this could produce an unexpected metrics.json file. The original code passed no argument — this changes the default behavior. Consider:
metrics_logger = MetricsLogger(metrics_file=args_cli.metrics_file)and only calling save_metrics_to_file() when the file is set (which you already gate on args_cli.metrics_file is not None).
4. Off-by-one risk in num_episodes mode trailing boundary
File: policy_runner.py (lines ~141–148)
last_step = (num_steps_completed - 1) if num_steps is not None else num_steps_completedIn num_episodes mode, the comment says "break fired before increment." This is correct if the break at the episode-count check fires before num_steps_completed += 1. However, the break can also fire when num_steps is not None inside the inner if num_steps ... branch. Verify that both exit paths are covered by the same formula, especially when num_episodes mode terminates mid-step due to the num_episodes_completed >= num_episodes check happening after num_steps_completed is already incremented (the +=1 appears before the episode count check in the original code).
5. Memory accumulation unbounded — no chunked flush for multi-env recording
File: camera_video.py
The docstring warns about ~3.8 GB for 10 envs / 500 steps / 512×512×3, but there's no runtime guard. For longer runs where video_length is large, the in-memory buffers will grow linearly until _flush(). Consider adding a configurable max_buffer_frames parameter that triggers an intermediate flush-and-stitch, or at minimum a runtime warning when the estimated buffer exceeds a threshold (e.g., 2 GB).
🟢 Suggestions / Nits
6. EpisodeRecord.arena_env_args typed as list[str] but populated with list(job.arena_env_args)
File: episode_record.py (field declaration) vs job_manager.py (arena_env_args is read from a dict)
In Job.from_dict, arena_env_args comes from converting a dict to a CLI list — confirm the runtime type is always list[str] and not list[Any]. If the config dict values are non-string, this will produce a JSON record with mixed types that breaks the schema contract.
7. _find_video_paths only searches one level deep
File: episode_record.py
job_video_dir = os.path.join(video_dir, job_name)Now that video_dir already has the timestamp subdirectory appended (video_dir/run_ts), the video files are at video_dir/run_ts/job_name/*.mp4. But _find_video_paths receives args_cli.video_dir (already timestamped) and appends job_name. This seems correct, but worth a comment to clarify the expected directory layout for future maintainers.
8. CI failures unrelated to this PR
The "Run tests" and "GR00T closed-loop E2E" checks are failing — confirm these are pre-existing failures on main and not regressions introduced here.
✅ What Looks Good
- Clean dataclass schema with forward/backward compatibility (
from_dictfilters toknownfields) - Schema versioning from day one
- Episode boundary tracking is well-reasoned — start/end inclusive, handles both termination modes
- Failure-path episode record writing (silently swallowed exceptions prevent cascading failures)
- Timestamp-isolated output directories prevent stale file accumulation
- Good docstrings explaining the frame-index ↔ step-index correspondence
Update 2 (commit 66dd66a)
The latest commits complete the scope reduction by:
- Deleting
episode_record.pyentirely (335 lines removed) - Removing
--episode_record_dirCLI argument from eval_runner_cli.py - Cleaning up eval_runner.py to remove all episode record building/writing logic
- Adding new test coverage for task metadata extraction in Job.from_dict
Previous findings status:
| Finding | Status |
|---|---|
| 🔴 #1 (MetricsLogger timestamped-path race) | |
| 🔴 #2 (episode_boundaries NameError) | ✅ Moot — boundaries now discarded (_) |
| 🟡 #3 (MetricsLogger default fallback) | |
| 🟡 #4 (Off-by-one in boundaries) | ✅ Moot — boundaries no longer consumed |
| 🟡 #5 (Memory accumulation in camera_video) | |
| 🟢 #6, #7 (EpisodeRecord schema concerns) | ✅ Moot — file removed |
New observations:
- ✅ New tests
test_job_from_dict_task_metadataandtest_job_task_metadata_defaultsare well-structured - ✅ Minor docstring update in job_manager.py removes now-stale EpisodeRecord reference
- The PR now focuses cleanly on task metadata extraction, metrics output, and camera recording — EpisodeRecord is deferred
Remaining actionable items:
- 🔴 Fix MetricsLogger initialization order — move instantiation after timestamp mutation (or update the path post-mutation)
- 🟡 Consider removing the
or "metrics.json"fallback if file output is only intended when explicitly requested - 🟡 camera_video.py memory warning is still relevant for long runs
alexmillane
left a comment
There was a problem hiding this comment.
Thanks for adding this!
I have a few comments about the requirements for some of these things. I think we should be able to clean things up and simplify drastically.
| if self.buffers and all( | ||
| len(env_frames) >= self.video_length | ||
| for env_frame_lists in self.buffers.values() | ||
| for env_frames in env_frame_lists | ||
| ): |
There was a problem hiding this comment.
This compound statement is hard to read—suggestion to add a comment (or split it over multiple lines).
Maybe the comment is something like: "flush if all the videos in the buffer exceed the video length.
| for cam, env_frame_lists in self.buffers.items(): | ||
| for env_idx, frames in enumerate(env_frame_lists): | ||
| if not frames: | ||
| continue | ||
| path = os.path.join( | ||
| self.video_folder, | ||
| f"{self.name_prefix}-env{env_idx}-{_sanitize_cam_key(cam)}-step-{self.recording_start_step}.mp4", | ||
| ) | ||
| clip = ImageSequenceClip(list(frames), fps=self.fps) | ||
| clip.write_videofile(path, logger=None, audio=False) | ||
| del clip |
There was a problem hiding this comment.
Am I correct that this could take some time to write? What do you think about adding some logging here to indicate it's started. Maybe just a print at the start of the function?
| metrics_logger.append_job_metrics(job.name, metrics) | ||
|
|
||
| if args_cli.episode_record_dir: | ||
| rec_path = _write_episode_record( |
There was a problem hiding this comment.
Rollout policies above records for multiple episodes, so this function actually writes per-job right?
There was a problem hiding this comment.
Yes, correct, it writes one json per job, not per episode, the name is misleading, I'll rename it
| "job_name": job.name, | ||
| "task_name": job.task_name, | ||
| "embodiment": job.embodiment, | ||
| "env_params": dict(job.env_params), | ||
| "policy_type": job.policy_type, | ||
| "policy_config": dict(job.policy_config_dict), | ||
| "num_envs": job.num_envs, | ||
| "num_steps": job.num_steps, | ||
| "num_episodes": job.num_episodes, |
There was a problem hiding this comment.
All of these job.* fields are present in the config file that is input to the eval_runner.py via the config file. Here, we re-extract the information and reexport it.
Is there a reason that we can't just use the input file (for example see here), rather than reading and then re-exporting the information?
| args_cli.episode_record_dir, | ||
| job, | ||
| {}, | ||
| "failed", |
There was a problem hiding this comment.
Failed is written to the job status 5 lines above here. Consider using Job.status
| if args_cli.metrics_file is not None: | ||
| metrics_logger.save_metrics_to_file() | ||
| print(f"[INFO] Metrics saved to: {metrics_logger.metrics_file}") |
There was a problem hiding this comment.
Thanks for adding that!
| task_name: str = None, | ||
| embodiment: str = None, |
There was a problem hiding this comment.
These two variables are overdetermined. They're contained in the config .json inside the environment.
I guess what you want here is some serialization (i.e. a string) representing the Arena environment? Is that correct?
I think there is a better way to do this. Let's talk about it.
| language_instruction: str = None, | ||
| task_name: str = None, | ||
| embodiment: str = None, | ||
| env_params: dict = None, |
There was a problem hiding this comment.
These are already stored in the class as arena_env_args
| num_episodes: int | None, | ||
| language_instruction: str | None = None, | ||
| ) -> dict[str, Any]: | ||
| ) -> tuple[dict[str, Any] | None, list[dict]]: |
There was a problem hiding this comment.
See comment above. Suggestion to revert this and change our camera recorder to record per-episode videos.
| if args_cli.video or args_cli.camera_video or args_cli.metrics_file or args_cli.episode_record_dir: | ||
| run_ts = datetime.now().strftime("%Y%m%dT%H%M%S") |
There was a problem hiding this comment.
Let's just unconditionally query the date, and then we get rid of this quadruple if statement and de-indent the following if statements by one level.
3f06a09 to
5f309c5
Compare
5f309c5 to
663413f
Compare
| if args_cli.episode_record_dir and job.status != Status.FAILED: | ||
| with contextlib.suppress(Exception): | ||
| rec_path = _write_episode_record( | ||
| args_cli.episode_record_dir, | ||
| job, | ||
| aggregated_metrics, | ||
| "completed", | ||
| args_cli.video_dir if (args_cli.video or args_cli.camera_video) else None, | ||
| ) | ||
| print(f"[INFO] Episode record written to: {rec_path}") |
There was a problem hiding this comment.
hdf5_path is always null for completed jobs
The post-aggregation call to _write_episode_record never passes env, so the env parameter inside the function is None and the hdf5_path branch is never reached. By this point in the code env has already been set to None in the finally block (line 444: env = None). The HDF5 filename includes a per-run timestamp (f"dataset_{job_name}_{ts}") that is generated inside load_env and stored nowhere else, so the path cannot be reconstructed from any other field in the record. Every completed job's JSON will have "hdf5_path": null regardless of whether an HDF5 file was actually written.
Summary
Extend eval runner with per-env camera recording, metrics file output, timestamped run directories, and per-job episode record JSON
Detailed description
CameraObsVideoRecordernow records all parallel envs (one file per env per camera) instead of env 0 only; each run gets a timestamped subdirectory under--video_dirto avoid stale files accumulating across runseval_runnergains--camera_video,--metrics_file, and--episode_record_dirflags;save_metrics_to_file()and camera video recording existed but were never wired up from the eval runner--episode_record_dirwrites one JSON file per job after it completes (or fails), capturing:task_name,embodiment,env_params,policy_type,policy_config,language_instruction,hdf5_path,video_paths,metrics,episode_boundaries,wall_time_secondsrollout_policy()now returns(metrics, episode_boundaries)tracking{env_idx, start_step, end_step}per completed episode; frame index inCameraObsVideoRecorderoutput equals step index so boundaries can be used to slice per-env videos intoindividual episode clips
Jobnow carriestask_name,embodiment, andenv_paramsextracted fromarena_env_argsbefore it is converted to a CLI listEAGAINfile-lock conflicts when a previous run crashed without releasing the lock