Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ RUN git clone https://github.com/vllm-project/vime.git /root/vime && \
RUN cd /root/vime/vime/backends/megatron_utils/kernels/int4_qat && \
pip install . --no-build-isolation

# ====================================== Agentic rollout dependencies ================================

# uni-agent: reward specs (swe_bench.py) used by coding_agent_rl
ARG UNIAGENT_COMMIT=main
RUN pip install "git+https://github.com/verl-project/uni-agent.git@${UNIAGENT_COMMIT}"

# ====================================== Build-time smoke ============================================

# Fail-fast import smoke + flashinfer version pin check. Catches ABI / version
Expand Down
53 changes: 42 additions & 11 deletions examples/coding_agent_rl/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
delegating segment-to-``Sample`` fan-out to ``vime.agent.trajectory``.

All sandbox-side details live in ``sandbox.py``; the LLM plumbing
(Anthropic <-> vLLM /inference/v1/generate, token capture, 3-kind segment split) uses
``vime.agent.adapters.AnthropicAdapter``.
(Anthropic <-> vLLM ``/inference/v1/generate``, token capture, 3-kind segment
split) uses ``vime.agent.adapters.AnthropicAdapter``.

Dataset row ``metadata`` schema::

Expand Down Expand Up @@ -90,16 +90,24 @@ def __init__(self, args) -> None:
self.reasoning_parser = getattr(args, "vllm_reasoning_parser", None) or None
vllm_url = f"http://{args.vllm_router_ip}:{args.vllm_router_port}"
public_host = os.environ.get("VIME_HEAD_HOST")
if not public_host:
# ADAPTER_URL_OVERRIDE lets a reverse tunnel (e.g. cloudflared) supply a
# ready-made public adapter URL when the head has no directly routable
# host:port -- e.g. Modal sandboxes on a private cluster. When set it
# fully replaces the VIME_HEAD_HOST + SHIM_PORT construction below.
adapter_url_override = os.environ.get("ADAPTER_URL_OVERRIDE")
if not public_host and not adapter_url_override:
raise RuntimeError(
"VIME_HEAD_HOST is not set. Export it to the host IP that "
"sandboxes can reach for reverse-connection to the Anthropic adapter. "
"Without it the sandbox cannot dial back and the rollout will "
"silently abort."
"Neither VIME_HEAD_HOST nor ADAPTER_URL_OVERRIDE is set. Export "
"VIME_HEAD_HOST to the host IP that sandboxes can reach for "
"reverse-connection to the Anthropic adapter, or set "
"ADAPTER_URL_OVERRIDE to a full public adapter URL (e.g. a "
"cloudflared tunnel). Without one the sandbox cannot dial back "
"and the rollout will silently abort."
)
self.adapter = AnthropicAdapter(
tokenizer=self.tokenizer,
vllm_url=vllm_url,
model=args.hf_checkpoint,
tool_parser=self.tool_parser,
reasoning_parser=self.reasoning_parser,
)
Expand All @@ -114,7 +122,7 @@ def __init__(self, args) -> None:
thread_name="anthropic-adapter",
runner_kwargs={"handler_cancellation": True},
)
self.adapter_url = f"http://{public_host}:{self.app_handle.port}"
self.adapter_url = adapter_url_override or f"http://{public_host}:{self.app_handle.port}"
logger.info(
"[coding_agent_rl] tokenizer=%s adapter=%s max_context_len=%s tool_parser=%s reasoning_parser=%s",
args.hf_checkpoint,
Expand Down Expand Up @@ -219,14 +227,19 @@ async def generate(args, sample: Sample, sampling_params: dict[str, Any]):
state = _State(args)
md = _metadata(sample)
if not md["image"] or not md["workdir"]:
logger.warning("[coding_agent_rl] ABORT missing_image_or_workdir: image=%s workdir=%s label=%s",
md.get("image"), md.get("workdir"), sample.label)
return _abort_result(sample, "missing_image_or_workdir")

instance_id = md["instance_id"]
session_id = _start_session(state, sample, md, sampling_params)
t0 = time.time()
try:
async with asyncio.timeout(SWE_GENERATE_GUARD_SEC):
logger.info("[coding_agent_rl] %s: booting sandbox image=%s ...", instance_id, md["image"])
async with sandbox.boot_agent_sandbox(md["image"]) as sb:
sb_id = getattr(sb, 'sandbox_id', 'unknown')
logger.info("[coding_agent_rl] %s: sandbox booted (id=%s), running claude-code ...", instance_id, sb_id)
await sandbox.run_claude_code(
sb,
workdir=md["workdir"],
Expand All @@ -237,14 +250,19 @@ async def generate(args, sample: Sample, sampling_params: dict[str, Any]):
swepro=md["swepro"],
pre_commands=md["pre_commands"],
)
logger.info("[coding_agent_rl] %s: claude-code done, capturing diff ...", instance_id)
diff_text = await sandbox.git_diff(sb, md["workdir"])
logger.info("[coding_agent_rl] %s: diff captured (%d chars), sandbox=%s",
instance_id, len(diff_text or ""), sb_id)

logger.info("[coding_agent_rl] %s: evaluating diff ...", instance_id)
reward, is_solved, applied_cleanly = await sandbox.evaluate(
image=md["image"],
workdir=md["workdir"],
diff_text=diff_text,
swepro=md["swepro"],
eval_cmd=md["eval_cmd"],
swebench_metadata=md.get("swebench_metadata"),
pre_commands=md["pre_commands"],
timeout_sec=SWE_EVAL_TIMEOUT_SEC,
)
Expand All @@ -254,23 +272,34 @@ async def generate(args, sample: Sample, sampling_params: dict[str, Any]):
applied_cleanly=bool(applied_cleanly),
)
segments = await state.adapter.finish_session(session_id)
return _merge_samples(
logger.info("[coding_agent_rl] %s: adapter returned %d segments, merging ...",
instance_id, len(segments))
result = _merge_samples(
sample=sample,
state=state,
segments=segments,
reward_result=reward_result,
elapsed_sec=time.time() - t0,
instance_id=instance_id,
)
if isinstance(result, list):
for i, s in enumerate(result):
logger.info("[coding_agent_rl] %s: fanned[%d] tokens=%d rollout_log_probs=%s loss_mask=%d",
instance_id, i, len(s.tokens or []),
"None" if s.rollout_log_probs is None else f"len={len(s.rollout_log_probs)}",
len(s.loss_mask or []))
return result

except asyncio.TimeoutError:
logger.error("[coding_agent_rl] %s: ABORT wall_clock_timeout after %.1fs", instance_id, time.time() - t0)
_log_timeout_diagnostic(t0)
return _abort_result(sample, "wall_clock_timeout")
except Exception as e:
logger.error(
"[coding_agent_rl] %s: rollout failed: %s\n%s",
"[coding_agent_rl] %s: ABORT exception:%s after %.1fs\n%s",
instance_id,
e,
type(e).__name__,
time.time() - t0,
traceback.format_exc(),
)
return _abort_result(sample, f"exception:{type(e).__name__}")
Expand Down Expand Up @@ -328,6 +357,7 @@ def _metadata(sample: Sample) -> dict[str, Any]:
"problem_statement": m.get("problem_statement") or _coerce_prompt(sample.prompt),
"swepro": m.get("swepro"),
"eval_cmd": m.get("eval_cmd") or _wrap_f2p_script(rem.get("f2p_script")),
"swebench_metadata": m.get("swebench_metadata"),
"pre_commands": m.get("pre_commands") or rem.get("pre_commands"),
}

Expand All @@ -351,6 +381,7 @@ def _abort(sample: Sample, reason: str) -> Sample:
sample.response = ""
sample.response_length = 1
sample.loss_mask = [0]
sample.rollout_log_probs = [0.0]
sample.reward = 0.0
sample.status = Sample.Status.ABORTED
sample.metadata = {**(sample.metadata or {}), "abort_reason": reason}
Expand Down
47 changes: 40 additions & 7 deletions examples/coding_agent_rl/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from contextlib import asynccontextmanager
from pathlib import Path

from vime.agent.sandbox import E2BSandbox, Sandbox
from vime.agent.sandbox import E2BSandbox, ModalSandbox, Sandbox

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The import of ModalSandbox from vime.agent.sandbox will fail with an ImportError because ModalSandbox is not defined or exported in vime/agent/sandbox.py. Please implement and export ModalSandbox in vime/agent/sandbox.py or remove this import.



logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -55,12 +55,28 @@
_BOOT_SEM: asyncio.Semaphore | None = None


def make_sandbox(image: str) -> Sandbox:
"""Construct the configured sandbox backend for ``image``.

Backend is chosen by ``VIME_AGENT_SANDBOX_BACKEND`` (read per call so run.sh
/ tests can set it): ``e2b`` (default, unchanged behavior) or ``modal``.
Both backends satisfy the same :class:`Sandbox` Protocol, so the work-sandbox
and eval-sandbox call sites below stay backend-agnostic.
"""
backend = os.environ.get("VIME_AGENT_SANDBOX_BACKEND", "e2b").strip().lower()
if backend == "modal":
return ModalSandbox(image)
if backend in ("", "e2b"):
return E2BSandbox(image)
raise ValueError(f"unknown VIME_AGENT_SANDBOX_BACKEND={backend!r} (expected 'e2b' or 'modal')")


# ---------------------------------------------------------------------------
# Sandbox bootstrap (Node + Claude Code + agent user)
# ---------------------------------------------------------------------------
@asynccontextmanager
async def boot_agent_sandbox(image: str) -> AsyncIterator[E2BSandbox]:
"""Boot a fresh E2B sandbox and install the Claude Code toolchain.
async def boot_agent_sandbox(image: str) -> AsyncIterator[Sandbox]:
"""Boot a fresh sandbox (E2B or Modal) and install the Claude Code toolchain.

This is the provisioning wrapper for the work sandbox: create the sandbox
from the dataset image, install Node 22 + Claude Code CLI from host
Expand All @@ -74,7 +90,7 @@ async def boot_agent_sandbox(image: str) -> AsyncIterator[E2BSandbox]:
sb = None
last_err: Exception | None = None
for attempt in range(SWE_BOOT_RETRIES):
cand = E2BSandbox(image)
cand = make_sandbox(image)
try:
async with _BOOT_SEM:
await cand.__aenter__()
Expand Down Expand Up @@ -301,18 +317,19 @@ async def evaluate(
diff_text: str,
swepro: dict | None = None,
eval_cmd: str | None = None,
swebench_metadata: dict | None = None,
pre_commands: list[str] | str | None = None,
timeout_sec: int = 600,
) -> tuple[float, bool, bool]:
"""Returns (reward, solved, applied_cleanly).

No-test-cheating guarantee: the eval sandbox is built from the same image
but starts CLEAN, so only the model-produced diff affects reward."""
if not (swepro or eval_cmd):
logger.warning("[e2b.evaluate] no swepro/eval_cmd; reward=0")
if not (swepro or eval_cmd or swebench_metadata):
logger.warning("[e2b.evaluate] no swepro/eval_cmd/swebench_metadata; reward=0")
return 0.0, False, True

async with E2BSandbox(image) as ev:
async with make_sandbox(image) as ev:
await ensure_agent_user(ev, workdir)
if swepro:
await _setup_swepro_assets(ev, swepro)
Expand All @@ -327,6 +344,9 @@ async def evaluate(
if swepro:
r, s = await _run_swepro(ev, workdir, swepro, timeout_sec)
return r, s, True
if swebench_metadata:
r, s = await _run_swebench_eval(ev, workdir, swebench_metadata, timeout_sec)
return r, s, True
r, s = await _run_eval_cmd(ev, workdir, eval_cmd, timeout_sec)
return r, s, True

Expand Down Expand Up @@ -395,6 +415,19 @@ async def _run_swepro(ev: Sandbox, workdir: str, swepro: dict, timeout: int) ->
return (1.0 if solved else 0.0), solved


async def _run_swebench_eval(ev: Sandbox, workdir: str, metadata: dict, timeout: int) -> tuple[float, bool]:
"""SWE-bench harness grading — delegates to uni_agent.reward.swe_bench."""
import re as _re
from uni_agent.reward.swe_bench import make_eval_script, parse_eval_output

eval_script = make_eval_script(metadata, workdir)
await ev.write_file("/tmp/_swebench_eval.sh", eval_script, user="root")
_, stdout, _ = await ev.exec("bash /tmp/_swebench_eval.sh 2>&1", user="root", check=False, timeout=timeout)
output = _re.sub(r"\x1b\[[0-9;]*m|\r", "", stdout or "")
solved, _ = parse_eval_output(metadata, output)
return (1.0 if solved else 0.0), solved


async def _run_eval_cmd(ev: Sandbox, workdir: str, cmd: str, timeout: int) -> tuple[float, bool]:
ec, _, _ = await ev.exec(f"cd {workdir} && {cmd}", user="agent", check=False, timeout=timeout)
return (1.0 if ec == 0 else 0.0), ec == 0
Loading