Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/job/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ For installing and running an agent inside a single sandbox, see [`../install-ag
|--------|---------|-----------|
| [`bash/`](./bash/) | `BashJobConfig` | Run an arbitrary shell script inside a sandbox (data processing, external evaluation tools) |
| [`harbor/`](./harbor/) | `HarborJobConfig` | Run an AI agent benchmark task (SWE-bench, Terminal Bench, …) via the Harbor framework |
| [`observability/`](./observability/) | `BashJobConfig` | Show the exception observability layer (structured logs + optional OTLP metric) via a job that soft-fails on purpose |

Both backends share a single `Job(config).run()` entrypoint — pick the config type based on your scenario.

Expand Down
76 changes: 76 additions & 0 deletions examples/job/observability/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# job/observability

Demo for the **exception observability** layer in the Job SDK
(`rock.sdk.job.observability`): structured exception logs + an optional OTLP
metric, emitted automatically from every `JobExecutor` phase.

You don't call the observability layer yourself — it's woven into the executor
via the `monitor_job_phase` decorator. Your only knobs are two env vars:

| Env var | Default | Effect |
|---------|---------|--------|
| `ROCK_JOB_METRICS_OTLP_ENDPOINT` | unset | Unset → **log-only**. Set → also emit the OTLP counter `rock_job.exception.total`. |
| `ROCK_JOB_METRICS_HIGH_CARDINALITY_LABELS` | `true` | `false` → drop `job_name` / `sandbox_id` from the **metric** (they stay on the log). |

On every job exception the SDK dual-writes:

1. a structured `key=value` **ERROR log** — always, with the full label set, and
2. a counter increment on `rock_job.exception.total` — only when an OTLP endpoint is configured.

Labels: `phase`, `severity`, `exception_type`, `trial_type`, `job_name`,
`experiment_id`, `namespace`, `sandbox_id`.

## Two failure semantics

| | What the SDK does | How you observe it |
|---|---|---|
| **soft fail** (script `exit != 0`, timeout, no output) | recorded as `severity=soft`, carried back in `TrialResult.exception_info` — `run()` does **not** raise | `JobResult.status == FAILED`, `TrialResult.exception_info` populated |
| **hard fail** (sandbox can't start, launch fails) | recorded as `severity=hard`, then **re-raised** (fail-fast across the batch) | `run()`/`wait()` raises |

See [`docs/dev/job/exception-handling.md`](../../../docs/dev/job/exception-handling.md) for the full taxonomy.

## Files

| File | Purpose |
|------|---------|
| [`observability_demo.py`](./observability_demo.py) | Entry point — echoes the two knobs, loads `JobConfig.from_yaml()`, runs `Job(config).run()`, prints per-trial `exception_info` |
| [`observability_job_config.yaml.template`](./observability_job_config.yaml.template) | `BashJobConfig` whose script exits non-zero, producing one soft-fail event |

## Quick run

```bash
# 1. copy the template and fill in real values (<placeholders>)
cp observability_job_config.yaml.template observability_job_config.yaml

# 2. (optional) turn metrics ON — otherwise the demo is log-only
export ROCK_JOB_METRICS_OTLP_ENDPOINT="http://localhost:4318/v1/metrics"

# 3. run
python observability_demo.py -c observability_job_config.yaml
```

The template's script does `exit 7`, so a clean run produces exactly one
soft-fail event. You'll see it twice in the output:

```
ERROR ... job exception phase=collect severity=soft exception_type=BashExitCode ... job_name=observability_demo ...
...
JobResult: status=failed completed=0 failed=1 exit_code=...
trial[0] ...: SOFT FAIL -> BashExitCode: ...
```

The ERROR log line is **always** emitted. The matching counter increment on
`rock_job.exception.total` is exported **only** when `ROCK_JOB_METRICS_OTLP_ENDPOINT`
is set. The demo calls `get_reporter().shutdown()` at the end to force a final
metric flush before this short-lived process exits (also registered via `atexit`,
so it's safe and idempotent).

To see a **hard fail** (`run()` raises, `severity=hard`) instead, point
`base_url` at an unreachable admin so the sandbox can't start.

## Wiring metrics to a collector

`ROCK_JOB_METRICS_OTLP_ENDPOINT` is a standard OTLP/HTTP metrics endpoint. Point
it at any OpenTelemetry Collector (or a backend that accepts OTLP/HTTP). A quick
local collector that logs received metrics to stdout is enough to eyeball the
counter; then scrape/forward to Prometheus, etc., in your real deployment.
119 changes: 119 additions & 0 deletions examples/job/observability/observability_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Observability demo for the ROCK Job SDK.

Same shape as ``examples/job/harbor/harbor_demo.py`` — load a YAML config, run
``Job(config).run()``, print the per-trial results — but pointed at the
**exception observability** layer in ``rock.sdk.job.observability``.

You do NOT call the observability layer yourself. It is woven automatically into
every ``JobExecutor`` phase (start / setup / launch / wait / collect). Your only
knobs are two environment variables:

ROCK_JOB_METRICS_OTLP_ENDPOINT # unset -> log-only; set -> also emit OTLP counter
ROCK_JOB_METRICS_HIGH_CARDINALITY_LABELS # "false" -> drop job_name/sandbox_id from the metric

On every job exception the SDK dual-writes:
1. a structured ``key=value`` ERROR log (always, with full labels), and
2. a counter ``rock_job.exception.total`` (only when an OTLP endpoint is set).

Two failure semantics are surfaced (see docs/dev/job/exception-handling.md):
- soft fail (script exit != 0, timeout): carried back as data in
``TrialResult.exception_info`` — run() does NOT raise.
- hard fail (sandbox can't start): re-raised out of run()/wait().

The bundled ``observability_job_config.yaml.template`` runs a script that exits
non-zero, so a clean run produces exactly one soft-fail event you can see in the
logs (and as a metric, if you set the OTLP endpoint).

Usage:
cp observability_job_config.yaml.template observability_job_config.yaml
# fill in <placeholders>, then optionally turn metrics on:
export ROCK_JOB_METRICS_OTLP_ENDPOINT=http://localhost:4318/v1/metrics
python examples/job/observability/observability_demo.py -c observability_job_config.yaml
"""

import argparse
import asyncio
import logging

from rock import env_vars
from rock.sdk.job import Job, JobConfig, observability
from rock.sdk.job.result import JobResult

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# disable httpx
logging.getLogger("httpx").setLevel(logging.WARNING)


def print_observability_status() -> None:
"""Echo the two knobs so it's obvious whether metrics are on."""
endpoint = env_vars.ROCK_JOB_METRICS_OTLP_ENDPOINT
high_card = env_vars.ROCK_JOB_METRICS_HIGH_CARDINALITY_LABELS
logger.info("─" * 60)
logger.info("Job observability config:")
if endpoint:
logger.info(" metrics: ON -> OTLP counter to %s", endpoint)
else:
logger.info(" metrics: OFF -> log-only (set ROCK_JOB_METRICS_OTLP_ENDPOINT to enable)")
logger.info(
" high-card labels: %s (job_name/sandbox_id %s on the metric)", high_card, "kept" if high_card else "dropped"
)
logger.info(" counter name: rock_job.exception.total")
logger.info("─" * 60)


def summarize(result: JobResult) -> None:
logger.info("─" * 60)
logger.info(
"JobResult: status=%s completed=%d failed=%d exit_code=%d",
result.status,
result.n_completed,
result.n_failed,
result.exit_code,
)
for i, trial in enumerate(result.trial_results):
if trial.exception_info is not None:
# SOFT fail: surfaced as data, run() did not raise.
logger.info(
" trial[%d] %s: SOFT FAIL -> %s: %s",
i,
trial.task_name or "?",
trial.exception_info.exception_type,
trial.exception_info.exception_message,
)
else:
logger.info(" trial[%d] %s: ok", i, trial.task_name or "?")
logger.info("─" * 60)
if result.n_failed:
logger.info("Each soft failure above was ALSO emitted as one ERROR log + one metric")
logger.info("increment, attributed to the 'collect' phase. Check the logs above.")


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run a Bash job inside a ROCK sandbox and observe its exceptions")
parser.add_argument("-c", "--config", required=True, help="Path to BashJobConfig YAML file")
return parser.parse_args()


async def async_main(args: argparse.Namespace) -> None:
print_observability_status()
config = JobConfig.from_yaml(args.config)
try:
result = await Job(config).run()
except Exception:
# A HARD fail (e.g. sandbox can't start) propagates here and was already
# recorded by the observability layer (severity=hard) before re-raising.
logger.exception("Job hard-failed — observability recorded a severity=hard event for the failing phase")
raise
else:
summarize(result)
finally:
# Short-lived process: force a final metrics flush so the last buffered
# exceptions are exported before exit. atexit also calls this (idempotent),
# but doing it explicitly makes the flush deterministic in a demo.
observability.get_reporter().shutdown()


if __name__ == "__main__":
args = parse_args()
asyncio.run(async_main(args))
29 changes: 29 additions & 0 deletions examples/job/observability/observability_job_config.yaml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# BashJobConfig for the observability demo.
#
# The script below exits non-zero on purpose, so a clean run produces exactly
# one SOFT-FAIL event: one structured ERROR log (always) + one increment of the
# counter rock_job.exception.total (only if ROCK_JOB_METRICS_OTLP_ENDPOINT is set).
#
# Copy to observability_job_config.yaml and fill in the <placeholders>.

# ── Job Identity ─────────────────────────────────────
experiment_id: "job_observability_demo"
labels:
demo: "observability"
timeout: 120

# ── Rock Environment ─────────────────────────────────
environment:
base_url: "<your-rock-base-url>" # e.g. http://localhost:8080
xrl_authorization: "<your-xrl-auth-token>"
image: "<your-image:tag>" # e.g. python:3.11
cluster: "<your-cluster>" # optional

# ── Bash Script ──────────────────────────────────────
# exit 7 -> BashExitCode soft fail, attributed to the 'collect' phase.
# Swap in `sleep 100000` (with a small timeout above) to see a ProcessTimeout soft fail instead.
script: |
echo "[trial] doing work..."
sleep 1
echo "[trial] boom"
exit 7
10 changes: 10 additions & 0 deletions rock/env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
ROCK_MODEL_SERVICE_TRAJ_APPEND_MODE: bool | None = None
ROCK_JOB_PROXY_REPLAY_FILE: str

# Client-side Job exception observability
ROCK_JOB_METRICS_OTLP_ENDPOINT: str | None = None
ROCK_JOB_METRICS_HIGH_CARDINALITY_LABELS: bool = True

# RuntimeEnv
ROCK_RTENV_PYTHON_V31114_INSTALL_CMD: str
ROCK_RTENV_PYTHON_V31212_INSTALL_CMD: str
Expand Down Expand Up @@ -147,6 +151,12 @@
),
# Docker temp auth directory
"ROCK_DOCKER_TEMP_AUTH_DIR": lambda: os.getenv("ROCK_DOCKER_TEMP_AUTH_DIR"),
# Client-side Job exception observability
"ROCK_JOB_METRICS_OTLP_ENDPOINT": lambda: os.getenv("ROCK_JOB_METRICS_OTLP_ENDPOINT"),
"ROCK_JOB_METRICS_HIGH_CARDINALITY_LABELS": lambda: os.getenv(
"ROCK_JOB_METRICS_HIGH_CARDINALITY_LABELS", "true"
).lower()
== "true",
}


Expand Down
29 changes: 27 additions & 2 deletions rock/sdk/job/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from rock.actions import CreateBashSessionRequest
from rock.logger import init_logger
from rock.sdk.job.observability import monitor_job_phase
from rock.sdk.job.operator import Operator
from rock.sdk.job.result import TrialResult
from rock.sdk.sandbox.client import Sandbox
Expand Down Expand Up @@ -87,16 +88,27 @@ def _job_tmp_prefix(config: JobConfig) -> str:

async def _do_submit(self, trial: AbstractTrial) -> TrialClient:
"""Start sandbox + execute script for a single trial."""
sandbox = await self._phase_start(trial)
await self._phase_setup(trial, sandbox)
return await self._phase_launch(trial, sandbox)

@monitor_job_phase("start")
async def _phase_start(self, trial: AbstractTrial) -> Sandbox:
config = trial._config
sandbox = Sandbox(config.environment)
await sandbox.start()
logger.info(f"Sandbox started: sandbox_id={sandbox.sandbox_id}, job_name={config.job_name}")
return sandbox

@monitor_job_phase("setup")
async def _phase_setup(self, trial: AbstractTrial, sandbox: Sandbox) -> None:
# G4: let trial backfill config from sandbox state before setup
await trial.on_sandbox_ready(sandbox)

await trial.setup(sandbox)

@monitor_job_phase("launch")
async def _phase_launch(self, trial: AbstractTrial, sandbox: Sandbox) -> TrialClient:
config = trial._config
session = f"rock-job-{config.job_name or 'default'}"
env = self._build_session_env(config)
await sandbox.create_session(CreateBashSessionRequest(session=session, env_enable=True, env=env))
Expand All @@ -121,8 +133,11 @@ async def _do_submit(self, trial: AbstractTrial) -> TrialClient:

async def _do_wait(self, client: TrialClient) -> TrialResult | list[TrialResult]:
"""Wait for a single trial to finish, call trial.collect()."""
from rock.sdk.job.result import ExceptionInfo
obs, success, message = await self._phase_wait(client)
return await self._phase_collect(client, obs, success, message)

@monitor_job_phase("wait")
async def _phase_wait(self, client: TrialClient):
config = client.trial._config
success, message = await client.sandbox.wait_for_process_completion(
pid=client.pid,
Expand All @@ -138,6 +153,13 @@ async def _do_wait(self, client: TrialClient) -> TrialResult | list[TrialResult]
ignore_output=False,
response_limited_bytes_in_nohup=None,
)
return obs, success, message

@monitor_job_phase("collect")
async def _phase_collect(self, client: TrialClient, obs, success, message) -> TrialResult | list[TrialResult]:
from rock.sdk.job.result import ExceptionInfo

config = client.trial._config
exit_code = obs.exit_code if obs.exit_code is not None else 1
if obs.output:
logger.info(f"Trial output (job={config.job_name}):\n{obs.output}")
Expand All @@ -149,6 +171,9 @@ async def _do_wait(self, client: TrialClient) -> TrialResult | list[TrialResult]
r.raw_output = obs.output or ""
if r.exit_code == 0 and exit_code != 0:
r.exit_code = exit_code
# Timeout is detected in _phase_wait but annotated here so the soft-fail
# is attributed to the "collect" phase (the decorator scans this method's
# final return value).
if not success:
fail_info = ExceptionInfo(
exception_type="ProcessTimeout",
Expand Down
Loading
Loading