diff --git a/benches/benchmark_tasks.example.json b/benches/benchmark_tasks.example.json new file mode 100644 index 000000000..52dab1b32 --- /dev/null +++ b/benches/benchmark_tasks.example.json @@ -0,0 +1,24 @@ +[ + { + "id": "json-write-settings", + "category": "file_workflow", + "tools": ["filesystem"], + "goal": "Create settings.json as a JSON object with mode=\"fast\" and retries=2. Use write_file, then FINAL done.", + "oracle": [ + {"action": "write_file", "args": {"path": "settings.json", "content": "{\"mode\": \"fast\", \"retries\": 2}"}}, + {"final": "done"} + ], + "grader": {"type": "file_json_equals", "path": "settings.json", "required": {"mode": "fast", "retries": 2}} + }, + { + "id": "json-web-fact", + "category": "web_research", + "tools": ["web_search"], + "goal": "Use web_search to find what the Bradley-Terry model estimates from pairwise outcomes, then FINAL that single word (lowercase).", + "oracle": [ + {"action": "web_search", "args": {"query": "bradley terry model pairwise"}}, + {"final": "strength"} + ], + "grader": {"type": "answer_equals", "expected": "strength", "normalize": "lower_alpha"} + } +] diff --git a/benches/run_agentic_benchmark.py b/benches/run_agentic_benchmark.py new file mode 100644 index 000000000..dc35fd7f5 --- /dev/null +++ b/benches/run_agentic_benchmark.py @@ -0,0 +1,1123 @@ +#!/usr/bin/env python3 +"""Realistic agentic benchmark for the test-time-scaling inferlets. + +Professor's task: "use realistic agentic benchmarks to evaluate those +test-time-scaling methods implemented as inferlets", with Agent Arena +(https://arena.ai/blog/agent-arena/) as the example -- agents doing real work +with web search, filesystem, and terminal tools. + +Agent Arena itself is a live, human-voted platform (Extended Bradley-Terry over +millions of sessions) and cannot be run offline: there is no public task export +or scoring API. This runner is the faithful local stand-in. It is *agentic* in +the real sense, not a single prompt: + + * A real multi-step AGENT LOOP. The model emits one tool call per step + (ACTION/ARGS) or a FINAL answer; the harness executes the tool against real + resources, feeds the OBSERVATION back, and repeats up to --max-steps. + * REAL tools, all three the Arena post names: + filesystem -> real file reads/writes in an isolated temp workspace + terminal -> real subprocess execution with real exit codes + web_search -> a FROZEN local corpus (deterministic, offline-verifiable); + a live HTTP backend can be plugged in, but is off by default + * OBJECTIVE grading. Tasks are graded by ground truth -- a parsed file, a + process exit code, a value derived from the real filesystem, or an exact fact + -- never by keyword matching on prose and never by a human vote. + * Arena-style PAIRWISE scoring. For every inferlet we run baseline vs. method on + the same tasks and rank all arms with a Bradley-Terry model over pairwise + objective outcomes -- the same pairwise idea Arena uses, with an objective + judge instead of a human. + +Execution paths: + + --offline-self-check Runs the loop against a built-in ORACLE policy that + plays correct tool trajectories. Proves the loop, the + real tools, and the objective graders work end to end + with NO model and NO GPU. Expected: every task succeeds. + + Drives the real inferlets through `pie serve`. Meaningful + only with a real model/driver; under the dummy driver the + per-step text is random, so objective success is expected + to be ~0 while control flow still runs. Run on the GPU box. + + --report Rebuild the leaderboard markdown from an existing results + file. + +Outputs JSONL + CSV + leaderboard.md under --out-dir. +""" + +from __future__ import annotations + +import argparse +import asyncio +import csv +import json +import math +import re +import shlex +import shutil +import subprocess +import sys +import tempfile +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Callable + +ROOT = Path(__file__).resolve().parent.parent +BENCHES_DIR = Path(__file__).resolve().parent +PIE_REPO = ROOT +ARENA_REFERENCE = "https://arena.ai/blog/agent-arena/" + +LANGUAGES = ("rust", "python", "js") +FAMILIES = ("modular-cache", "hierarchical-attention", "mcts") +# The tools an agent may call. A call to anything outside this set is treated as a +# hallucinated tool (one of the Arena-style signals we score). +TOOLS = ("read_file", "write_file", "list_files", "run_terminal", "web_search") + + +# =========================================================================== +# Real tools +# =========================================================================== +class Workspace: + """An isolated temp dir the agent acts on, with real file + terminal access.""" + + def __init__(self, prefix: str = "agentic_bench_") -> None: + self.path = Path(tempfile.mkdtemp(prefix=prefix)) + + def write_file(self, rel: str, content: str) -> None: + target = self.path / rel + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(content, encoding="utf-8") + + def read_file(self, rel: str) -> str: + return (self.path / rel).read_text(encoding="utf-8") + + def exists(self, rel: str) -> bool: + return (self.path / rel).exists() + + def list_files(self) -> list[str]: + return sorted(p.name for p in self.path.iterdir() if p.is_file()) + + def run_terminal(self, command: list[str], timeout: int = 30) -> dict[str, Any]: + start = time.time() + try: + proc = subprocess.run(command, cwd=self.path, capture_output=True, + text=True, timeout=timeout) + code, out, err = proc.returncode, proc.stdout, proc.stderr + except subprocess.TimeoutExpired: + code, out, err = 124, "", "TIMEOUT" + except FileNotFoundError: + code, out, err = 127, "", "command not found" + return {"exit_code": code, "stdout": out[-2000:], "stderr": err[-2000:], + "seconds": round(time.time() - start, 3)} + + def cleanup(self) -> None: + shutil.rmtree(self.path, ignore_errors=True) + + +class FrozenWebSearch: + """Deterministic, offline 'web search': fixed snippets keyed by query terms. + + This keeps the web_search tool REAL (the agent calls it and gets results back) + while staying reproducible and network-free. Swap in a live HTTP backend by + implementing the same .search(query) -> str interface. + """ + + CORPUS: list[dict[str, Any]] = [ + {"terms": ["dummy", "driver"], + "snippet": "Pie docs: the dummy driver emits RANDOM tokens and loads no " + "model weights; it is used for control-flow smoke tests only."}, + {"terms": ["agent", "arena", "signals"], + "snippet": "Agent Arena evaluates tool-using agents and ranks them by " + "pairwise human votes; relevant signals include task success " + "and tool reliability."}, + {"terms": ["bradley", "terry"], + "snippet": "The Bradley-Terry model estimates a strength per item from " + "pairwise win/loss outcomes via logistic comparison."}, + ] + + def search(self, query: str) -> str: + q = query.lower() + # Tolerant keyword match: a doc hits if all-but-one of its terms appear, + # so the agent doesn't have to phrase the query exactly. + hits = [c["snippet"] for c in self.CORPUS + if sum(t in q for t in c["terms"]) >= max(1, len(c["terms"]) - 1)] + if not hits: + return "web_search: no results." + return " | ".join(hits[:3]) + + +def parse_search_results(data: Any) -> list[str]: + """Pull snippet strings out of a search API's JSON, tolerant of shape. + + Handles Brave (`web.results[].description`) and generic shapes + (`results[].snippet|content|title`) so more than one provider can be used. + """ + out: list[str] = [] + if isinstance(data, dict): + brave = (data.get("web") or {}).get("results") + if isinstance(brave, list): + out += [r.get("description") or r.get("title") or "" for r in brave] + generic = data.get("results") + if isinstance(generic, list): + out += [r.get("snippet") or r.get("content") or r.get("title") or "" for r in generic] + return [s for s in out if s] + + +class HttpWebSearch: + """Live web_search backend; same `.search()` interface as FrozenWebSearch. + + Calls a JSON search API (Brave by default) and returns the top snippets. The + HTTP call is injectable via `fetch` so it is unit-tested without network, and + it stays OFF unless `--web-search live` is passed (the frozen corpus is the + default, to keep runs deterministic). Network/parse errors degrade to a plain + "no results" message rather than crashing a benchmark run. + """ + + ENDPOINTS = {"brave": "https://api.search.brave.com/res/v1/web/search"} + + def __init__(self, api_key: str, provider: str = "brave", count: int = 3, + fetch: Callable[[str, dict], str] | None = None, timeout: int = 10) -> None: + self.api_key = api_key + self.provider = provider + self.count = count + self.timeout = timeout + self._fetch = fetch or self._http_get + + def _http_get(self, url: str, headers: dict) -> str: + import urllib.request # local import: only the live path needs it + req = urllib.request.Request(url, headers=headers) + with urllib.request.urlopen(req, timeout=self.timeout) as resp: + return resp.read().decode("utf-8", "replace") + + def _build(self, query: str) -> tuple[str, dict]: + import urllib.parse + base = self.ENDPOINTS.get(self.provider, self.ENDPOINTS["brave"]) + url = f"{base}?q={urllib.parse.quote(query)}&count={self.count}" + headers = {"Accept": "application/json"} + if self.provider == "brave": + headers["X-Subscription-Token"] = self.api_key + else: + headers["Authorization"] = f"Bearer {self.api_key}" + return url, headers + + def search(self, query: str) -> str: + url, headers = self._build(query) + try: + data = json.loads(self._fetch(url, headers)) + except Exception as exc: # noqa: BLE001 -- a flaky search must not abort the run + return f"web_search: error ({type(exc).__name__})" + snippets = parse_search_results(data) + return " | ".join(snippets[: self.count]) or "web_search: no results." + + +def make_web_backend(args): + """Pick the web_search backend: frozen corpus (default) or a live API.""" + if getattr(args, "web_search", "frozen") == "live": + import os + key = os.environ.get(args.web_search_key_env) + if not key: + raise SystemExit( + f"--web-search live needs an API key in ${args.web_search_key_env}") + return HttpWebSearch(key, provider=args.web_search_provider) + return FrozenWebSearch() + + +# =========================================================================== +# Agent protocol: parse one step, execute one tool +# =========================================================================== +class Action: + def __init__(self, kind: str, tool: str = "", args: dict | None = None, + final: str = "", raw: str = "") -> None: + self.kind = kind # "tool" | "final" | "none" + self.tool = tool + self.args = args or {} + self.final = final + self.raw = raw + + +def parse_action(step_text: str) -> Action: + """Parse a single agent step. + + Expected formats (the system prompt asks the model for exactly one): + ACTION: + ARGS: + or: + FINAL: + """ + final_m = re.search(r"(?is)\bFINAL:\s*(.+)$", step_text) + action_m = re.search(r"(?im)^\s*ACTION:\s*([a-z_]+)\s*$", step_text) + # FINAL wins only if it is not part of an earlier ACTION block. + if final_m and (not action_m or final_m.start() < action_m.start()): + return Action("final", final=final_m.group(1).strip(), raw=step_text) + if action_m: + tool = action_m.group(1).strip() + args_m = re.search(r"(?im)^\s*ARGS:\s*(\{.*\})\s*$", step_text) + args: dict = {} + if args_m: + try: + args = json.loads(args_m.group(1)) + except json.JSONDecodeError: + args = {"__malformed__": args_m.group(1)} + return Action("tool", tool=tool, args=args, raw=step_text) + return Action("none", raw=step_text) + + +def execute_tool(ws: Workspace, web: FrozenWebSearch | None, action: Action) -> dict[str, Any]: + """Execute one tool. Returns observation text + error/hallucination flags.""" + name, args = action.tool, action.args + if name not in TOOLS: + return {"observation": f"ERROR: unknown tool '{name}'", "error": True, "hallucinated": True} + try: + if name == "read_file": + path = args["path"] + if not ws.exists(path): + return {"observation": f"ERROR: no such file '{path}'", "error": True, "hallucinated": False} + return {"observation": ws.read_file(path), "error": False, "hallucinated": False} + if name == "write_file": + ws.write_file(args["path"], args["content"]) + return {"observation": f"wrote {args['path']}", "error": False, "hallucinated": False} + if name == "list_files": + return {"observation": ", ".join(ws.list_files()), "error": False, "hallucinated": False} + if name == "run_terminal": + res = ws.run_terminal(shlex.split(args["command"])) + obs = f"exit={res['exit_code']}\n{res['stdout']}{res['stderr']}".strip() + return {"observation": obs, "error": res["exit_code"] in (124, 127), "hallucinated": False} + if name == "web_search": + if web is None: + return {"observation": "ERROR: web_search unavailable", "error": True, "hallucinated": False} + return {"observation": web.search(args["query"]), "error": False, "hallucinated": False} + except (KeyError, TypeError) as exc: + return {"observation": f"ERROR: bad args for {name}: {exc}", "error": True, "hallucinated": False} + return {"observation": "ERROR: unreachable", "error": True, "hallucinated": False} + + +def render_transcript(goal: str, history: list[dict[str, Any]]) -> str: + parts = [f"GOAL: {goal}"] + for h in history: + parts.append(f"ASSISTANT: {h['raw'].strip()}") + if "observation" in h: + parts.append(f"OBSERVATION: {h['observation']}") + return "\n".join(parts) + + +SYSTEM_PROMPT = ( + "You are an agent solving a task with tools. Each turn output EITHER exactly:\n" + "ACTION: \n" + "ARGS: \n" + "OR, when done:\n" + "FINAL: \n" + "Use real tools; do not invent tool results." +) + + +def run_agent(step_fn: Callable[[str, int], str], goal: str, ws: Workspace, + web: FrozenWebSearch | None, max_steps: int) -> dict[str, Any]: + """Drive the loop. step_fn(transcript, step_index) -> one step of model text.""" + history: list[dict[str, Any]] = [] + final_answer = "" + tool_calls = tool_errors = hallucinations = invalid = 0 + error_then_progress = False + had_error = False + for i in range(max_steps): + transcript = render_transcript(goal, history) + step_text = step_fn(transcript, i) + action = parse_action(step_text) + record: dict[str, Any] = {"raw": step_text, "kind": action.kind} + if action.kind == "final": + final_answer = action.final + history.append(record) + break + if action.kind == "tool": + tool_calls += 1 + result = execute_tool(ws, web, action) + record.update({"tool": action.tool, "observation": result["observation"]}) + if result["error"]: + tool_errors += 1 + had_error = True + if result["hallucinated"]: + hallucinations += 1 + elif had_error: + # A non-erroring tool call after an earlier error = recovery attempt. + error_then_progress = True + else: + invalid += 1 + record["observation"] = "no valid ACTION or FINAL parsed" + history.append(record) + return { + "final_answer": final_answer, + "history": history, + "steps": len(history), + "tool_calls": tool_calls, + "tool_errors": tool_errors, + "tool_hallucinations": hallucinations, + "invalid_actions": invalid, + "had_error": had_error, + "recovered": error_then_progress, + "raw_concat": "\n".join(h["raw"] for h in history), + } + + +# =========================================================================== +# Tasks: real setup, scripted oracle policy, objective grading +# =========================================================================== +class AgenticTask: + id: str + category: str + tools: tuple[str, ...] + needs_web: bool = False + + def setup(self, ws: Workspace) -> None: ... + def goal(self, ws: Workspace) -> str: ... + def oracle_step(self, transcript: str, step: int) -> str: ... + def grade(self, ws: Workspace, final_answer: str) -> dict[str, Any]: ... + + +class FsWriteConfigTask(AgenticTask): + id = "fs-write-config"; category = "file_workflow"; tools = ("filesystem",) + REQUIRED = {"name": "pie-bench", "retries": 3} + + def setup(self, ws): ws.write_file("README.md", "# target\nput config in config.json\n") + + def goal(self, ws): + return ('Create config.json containing a JSON object with exactly ' + 'name="pie-bench" (string) and retries=3 (integer). Use write_file, then FINAL "done".') + + def oracle_step(self, transcript, step): + if "wrote config.json" not in transcript: + payload = json.dumps({"path": "config.json", + "content": json.dumps(self.REQUIRED)}) + return f"ACTION: write_file\nARGS: {payload}" + return "FINAL: done" + + def grade(self, ws, final_answer): + if not ws.exists("config.json"): + return {"objective_pass": False, "detail": "config.json not created"} + try: + parsed = json.loads(ws.read_file("config.json")) + except (json.JSONDecodeError, OSError) as exc: + return {"objective_pass": False, "detail": f"invalid json: {exc}"} + ok = all(parsed.get(k) == v for k, v in self.REQUIRED.items()) + return {"objective_pass": bool(ok), "detail": "match" if ok else f"mismatch: {parsed}"} + + +class TerminalFixBugTask(AgenticTask): + id = "terminal-fix-bug"; category = "coding_debugging"; tools = ("filesystem", "terminal") + BUGGY = "def inclusive_range(start, end):\n return list(range(start, end))\n" + FIXED = "def inclusive_range(start, end):\n return list(range(start, end + 1))\n" + CHECK = ("from solution import inclusive_range\n" + "assert inclusive_range(2, 4) == [2, 3, 4], inclusive_range(2, 4)\n" + "print('ok')\n") + + def setup(self, ws): + ws.write_file("solution.py", self.BUGGY) + ws.write_file("check.py", self.CHECK) + + def goal(self, ws): + return ("Fix solution.py so inclusive_range(2,4)==[2,3,4] (end is inclusive). " + "Rewrite the file with write_file, run `python3 check.py` with run_terminal, " + 'then FINAL "fixed". check.py must exit 0.') + + def oracle_step(self, transcript, step): + if "wrote solution.py" not in transcript: + return ("ACTION: write_file\nARGS: " + + json.dumps({"path": "solution.py", "content": self.FIXED})) + if "exit=0" not in transcript: + return "ACTION: run_terminal\nARGS: " + json.dumps({"command": "python3 check.py"}) + return "FINAL: fixed" + + def grade(self, ws, final_answer): + if not ws.exists("solution.py"): + return {"objective_pass": False, "detail": "no solution.py"} + res = ws.run_terminal([sys.executable, "check.py"]) + ok = res["exit_code"] == 0 + return {"objective_pass": bool(ok), + "detail": "check exit 0" if ok else f"check exit {res['exit_code']}: {res['stderr'][:160]}", + "terminal_exit_code": res["exit_code"]} + + +class FsTerminalLocateTask(AgenticTask): + id = "fs-terminal-locate"; category = "repo_navigation"; tools = ("filesystem", "terminal") + FILES = {"alpha.py": "def helper_a():\n return 1\n", + "beta.py": "def target_function():\n return 42\n", + "gamma.py": "def helper_c():\n return 3\n"} + + def setup(self, ws): + for name, content in self.FILES.items(): + ws.write_file(name, content) + + def _truth(self, ws): + res = ws.run_terminal(["grep", "-rl", "def target_function", "."]) + match = (res["stdout"].splitlines() or [""])[0].strip() + return Path(match).name if match else "" + + def goal(self, ws): + return ("Exactly one file defines `def target_function`. Use run_terminal with grep " + "to find it, then FINAL the filename only.") + + def oracle_step(self, transcript, step): + if "exit=" not in transcript: + return "ACTION: run_terminal\nARGS: " + json.dumps( + {"command": "grep -rl \"def target_function\" ."}) + m = re.search(r"OBSERVATION:.*?([A-Za-z0-9_]+\.py)", transcript, re.DOTALL) + return f"FINAL: {m.group(1) if m else 'unknown.py'}" + + def grade(self, ws, final_answer): + truth = self._truth(ws) + guess = Path(final_answer.strip().split()[-1]).name if final_answer.strip() else "" + ok = bool(truth) and guess == truth + return {"objective_pass": bool(ok), "detail": f"expected {truth!r}, got {guess!r}", + "ground_truth": truth} + + +class WebResearchTask(AgenticTask): + id = "web-research-fact"; category = "web_research"; tools = ("web_search",); needs_web = True + + def setup(self, ws): pass + + def goal(self, ws): + return ("Use web_search to find what kind of tokens the Pie dummy driver emits, " + "then FINAL that single word (one word, lowercase).") + + def oracle_step(self, transcript, step): + if "OBSERVATION:" not in transcript: + return "ACTION: web_search\nARGS: " + json.dumps({"query": "pie dummy driver tokens"}) + return "FINAL: random" + + def grade(self, ws, final_answer): + word = re.sub(r"[^a-z]", "", final_answer.strip().lower()) + ok = word == "random" + return {"objective_pass": bool(ok), "detail": f"answer={word!r} (want 'random')"} + + +class DataCountTask(AgenticTask): + id = "data-row-count"; category = "data_workflow"; tools = ("filesystem", "terminal") + ROWS = [("id", "value"), ("1", "a"), ("2", "b"), ("3", "c"), ("4", "d")] + + def setup(self, ws): + ws.write_file("data.csv", "\n".join(",".join(r) for r in self.ROWS) + "\n") + + def _truth(self, ws): + return len(self.ROWS) - 1 # exclude header + + def goal(self, ws): + return ("data.csv has a header row plus data rows. Use run_terminal to count the " + "DATA rows (exclude the header), then FINAL that integer.") + + def oracle_step(self, transcript, step): + if "exit=" not in transcript: + return "ACTION: run_terminal\nARGS: " + json.dumps( + {"command": "bash -c \"tail -n +2 data.csv | wc -l\""}) + # Take the last integer in the most recent observation (skip the exit code). + last_obs = transcript.split("OBSERVATION:")[-1] + nums = re.findall(r"\d+", last_obs) + return f"FINAL: {nums[-1] if nums else '0'}" + + def grade(self, ws, final_answer): + nums = re.findall(r"\d+", final_answer) + guess = int(nums[-1]) if nums else None + ok = guess == self._truth(ws) + return {"objective_pass": bool(ok), "detail": f"expected {self._truth(ws)}, got {guess}"} + + +TASKS: list[AgenticTask] = [ + FsWriteConfigTask(), TerminalFixBugTask(), FsTerminalLocateTask(), + WebResearchTask(), DataCountTask(), +] + + +def _normalize_answer(text: str, mode: str) -> str: + t = text.strip() + if mode == "lower_alpha": + return re.sub(r"[^a-z]", "", t.lower()) + if mode == "int": + nums = re.findall(r"-?\d+", t) + return nums[-1] if nums else "" + return t + + +def grade_from_spec(ws: Workspace, final_answer: str, grader: dict) -> dict[str, Any]: + """Objective graders for JSON-defined tasks. No model needed to grade. + + Supported `type`s: file_exists, file_json_equals, file_contains, + terminal_exit_zero, answer_equals, answer_contains. + """ + gtype = grader.get("type") + if gtype == "file_exists": + ok = ws.exists(grader["path"]) + return {"objective_pass": bool(ok), "detail": f"exists={ok}"} + if gtype == "file_json_equals": + if not ws.exists(grader["path"]): + return {"objective_pass": False, "detail": "file missing"} + try: + parsed = json.loads(ws.read_file(grader["path"])) + except (json.JSONDecodeError, OSError) as exc: + return {"objective_pass": False, "detail": f"invalid json: {exc}"} + ok = all(parsed.get(k) == v for k, v in (grader.get("required") or {}).items()) + return {"objective_pass": bool(ok), "detail": "match" if ok else f"got {parsed}"} + if gtype == "file_contains": + if not ws.exists(grader["path"]): + return {"objective_pass": False, "detail": "file missing"} + ok = grader["substring"] in ws.read_file(grader["path"]) + return {"objective_pass": bool(ok), "detail": f"contains={ok}"} + if gtype == "terminal_exit_zero": + res = ws.run_terminal(shlex.split(grader["command"])) + ok = res["exit_code"] == 0 + return {"objective_pass": bool(ok), "detail": f"exit={res['exit_code']}", + "terminal_exit_code": res["exit_code"]} + if gtype == "answer_equals": + norm = grader.get("normalize", "none") + a, b = _normalize_answer(final_answer, norm), _normalize_answer(str(grader.get("expected", "")), norm) + return {"objective_pass": a == b, "detail": f"{a!r} vs {b!r}"} + if gtype == "answer_contains": + ok = grader.get("substring", "").lower() in final_answer.lower() + return {"objective_pass": bool(ok), "detail": f"contains={ok}"} + return {"objective_pass": False, "detail": f"unknown grader type: {gtype}"} + + +class JsonTask(AgenticTask): + """An agentic task defined entirely by a JSON spec -- no Python required. + + Lets the team grow the suite past the built-in tasks. Schema (see + benchmark_tasks.example.json): + id, category, tools, goal + setup_files : {relpath: contents} real files written before the run + oracle : [{action, args} | {final}] a correct trajectory, for self-check + grader : {type, ...} objective check, see grade_from_spec + """ + + def __init__(self, spec: dict) -> None: + self.spec = spec + self.id = spec["id"] + self.category = spec.get("category", "custom") + self.tools = tuple(spec.get("tools", [])) + self.needs_web = "web_search" in self.tools + + def setup(self, ws: Workspace) -> None: + for path, content in (self.spec.get("setup_files") or {}).items(): + ws.write_file(path, content) + + def goal(self, ws: Workspace) -> str: + return self.spec["goal"] + + def oracle_step(self, transcript: str, step: int) -> str: + steps = self.spec.get("oracle") or [] + if not steps: + return "FINAL: " + entry = steps[step] if step < len(steps) else steps[-1] + if "final" in entry: + return f"FINAL: {entry['final']}" + return f"ACTION: {entry['action']}\nARGS: {json.dumps(entry.get('args', {}))}" + + def grade(self, ws: Workspace, final_answer: str) -> dict[str, Any]: + return grade_from_spec(ws, final_answer, self.spec.get("grader") or {}) + + +def load_tasks_from_json(path: str) -> list[AgenticTask]: + data = json.loads(Path(path).read_text(encoding="utf-8")) + specs = data if isinstance(data, list) else data.get("tasks", []) + return [JsonTask(s) for s in specs] + + +# =========================================================================== +# Metrics, pairwise outcomes, Bradley-Terry leaderboard +# =========================================================================== +def trajectory_metrics(traj: dict[str, Any], success: bool) -> dict[str, Any]: + err = traj["tool_errors"] + return { + "task_success": 1 if success else 0, + "steps": traj["steps"], + "tool_calls": traj["tool_calls"], + "tool_errors": err, + "tool_hallucination": 1 if traj["tool_hallucinations"] else 0, + # error_recovery defined only when an error occurred: + "error_recovery": (1 if (traj["recovered"] and success) else 0) if err else None, + "invalid_actions": traj["invalid_actions"], + # steerability (Arena signal): did it stay on-protocol -- no unparseable + # steps and no calls to tools that don't exist. + "steerability": 1 if (traj["invalid_actions"] == 0 and not traj["tool_hallucinations"]) else 0, + } + + +def pairwise_outcome(a: dict[str, Any], b: dict[str, Any]) -> float: + """Return 1.0 if arm a beats b on a task, 0.0 if loses, 0.5 if tie. + + Primary key: objective success. Tie-break: fewer steps, then fewer tool errors. + """ + sa, sb = a.get("task_success", 0), b.get("task_success", 0) + if sa != sb: + return 1.0 if sa > sb else 0.0 + for key in ("steps", "tool_errors"): + va, vb = a.get(key, 0), b.get(key, 0) + if va != vb: + return 1.0 if va < vb else 0.0 + return 0.5 + + +def bradley_terry(arms: list[str], wins: dict[tuple[str, str], float], + iters: int = 200, smoothing: float = 0.5) -> dict[str, float]: + """Fit Bradley-Terry strengths from pairwise win totals via the MM algorithm. + + wins[(i, j)] = number of times i beat j (ties count 0.5 to each side). + `smoothing` adds symmetric pseudo-comparisons so all-win/all-loss stays finite. + Returns normalized strengths summing to 1. + """ + p = {a: 1.0 for a in arms} + # total wins per arm (with smoothing) and pair counts + pair = {} + for i in arms: + for j in arms: + if i < j: + w_ij = wins.get((i, j), 0.0) + smoothing + w_ji = wins.get((j, i), 0.0) + smoothing + pair[(i, j)] = (w_ij, w_ji) + for _ in range(iters): + new = {} + for i in arms: + num = 0.0 # total wins of i + den = 0.0 + for j in arms: + if i == j: + continue + key = (i, j) if i < j else (j, i) + w_ij, w_ji = pair[key] + wi = w_ij if i < j else w_ji + wj = w_ji if i < j else w_ij + n = wi + wj + num += wi + den += n / (p[i] + p[j]) + new[i] = num / den if den else p[i] + s = sum(new.values()) or 1.0 + p = {a: v / s for a, v in new.items()} + return dict(sorted(p.items(), key=lambda kv: kv[1], reverse=True)) + + +def wilson_interval(successes: float, n: int, z: float = 1.96) -> tuple[float, float]: + """Wilson 95% score interval for a success rate (what Arena reports as a CI). + + Returns (low, high) in [0, 1]; n == 0 -> (0.0, 0.0). More honest than a bare + rate on the small N a local suite produces. + """ + if n <= 0: + return (0.0, 0.0) + phat = successes / n + denom = 1.0 + z * z / n + center = (phat + z * z / (2 * n)) / denom + margin = (z * math.sqrt(phat * (1 - phat) / n + z * z / (4 * n * n))) / denom + return (round(max(0.0, center - margin), 3), round(min(1.0, center + margin), 3)) + + +def extended_bradley_terry(arm_features: dict[str, dict[str, float]], + wins: dict[tuple[str, str], float], + l2: float = 0.01, lr: float = 0.3, iters: int = 1000) -> dict[str, float]: + """Arena-style Extended Bradley-Terry: attribute strength to components. + + Each arm is a binary feature vector over its components (method, mode, + language). We fit weights so P(i beats j) = sigmoid(w . (x_i - x_j)) by + gradient ascent on the pairwise log-likelihood with L2 -- the same + design-matrix logistic-regression idea Agent Arena uses to score how much each + model / framework / tool contributes. Returns a weight per component. + """ + features = sorted({f for feats in arm_features.values() for f in feats}) + w = {f: 0.0 for f in features} + total = sum(c for c in wins.values() if c > 0) or 1.0 + for _ in range(iters): + grad = {f: 0.0 for f in features} + for (i, j), c in wins.items(): + if c <= 0 or i not in arm_features or j not in arm_features: + continue + xi, xj = arm_features[i], arm_features[j] + z = sum(w[f] * (xi.get(f, 0.0) - xj.get(f, 0.0)) for f in features) + z = max(-30.0, min(30.0, z)) # clamp so exp() stays finite + p = 1.0 / (1.0 + math.exp(-z)) + for f in features: + grad[f] += c * (1.0 - p) * (xi.get(f, 0.0) - xj.get(f, 0.0)) + for f in features: # averaged gradient step minus L2 pull toward 0 + w[f] = w[f] + lr * (grad[f] / total - l2 * w[f]) + return dict(sorted(w.items(), key=lambda kv: kv[1], reverse=True)) + + +def build_leaderboard(rows: list[dict[str, Any]]) -> dict[str, Any]: + """Aggregate per-arm stats + a Bradley-Terry ranking over shared tasks.""" + arms = sorted({r["arm"] for r in rows if r.get("arm")}) + by_arm: dict[str, list[dict[str, Any]]] = {a: [] for a in arms} + for r in rows: + if r.get("arm"): + by_arm[r["arm"]].append(r) + + # Pairwise wins over rows that share a (task_id, language). + wins: dict[tuple[str, str], float] = {} + keyed: dict[tuple[str, str], dict[str, dict[str, Any]]] = {} + for r in rows: + if not r.get("arm"): + continue + keyed.setdefault((r["task_id"], r.get("language", "")), {})[r["arm"]] = r + for arm_rows in keyed.values(): + present = list(arm_rows) + for x in range(len(present)): + for y in range(x + 1, len(present)): + i, j = present[x], present[y] + s = pairwise_outcome(arm_rows[i], arm_rows[j]) + wins[(i, j)] = wins.get((i, j), 0.0) + s + wins[(j, i)] = wins.get((j, i), 0.0) + (1.0 - s) + + bt = bradley_terry(arms, wins) if len(arms) >= 2 else {a: 1.0 for a in arms} + + summary = [] + for a in arms: + rs = by_arm[a] + n = len(rs) + succ = sum(r.get("task_success", 0) for r in rs) + steps = [r.get("steps", 0) for r in rs] + errs = sum(r.get("tool_errors", 0) for r in rs) + summary.append({ + "arm": a, + "bt_score": round(bt.get(a, 0.0), 4), + "success_rate": round(succ / n, 3) if n else 0.0, + "success_ci": list(wilson_interval(succ, n)), + "n": n, + "avg_steps": round(sum(steps) / n, 2) if n else 0.0, + "tool_errors": errs, + }) + summary.sort(key=lambda s: (s["bt_score"], s["success_rate"]), reverse=True) + + # Arena-style component attribution: split each arm into method/mode/language + # and fit how much each component contributes to winning. + arm_features: dict[str, dict[str, float]] = {} + for r in rows: + a = r.get("arm") + if not a or not r.get("family"): + continue + arm_features[a] = {f"method:{r['family']}": 1.0, + f"mode:{r.get('mode', '')}": 1.0, + f"lang:{r.get('language', '')}": 1.0} + components = extended_bradley_terry(arm_features, wins) if len(arm_features) >= 2 else {} + return {"arms": summary, "bt": bt, "components": components} + + +def render_leaderboard_md(board: dict[str, Any]) -> str: + lines = ["# Agentic benchmark leaderboard", + "", + "Arms ranked by Bradley-Terry strength over pairwise objective outcomes", + "(same pairwise idea as Agent Arena, with an objective judge).", + "", + "| Rank | Arm | BT score | Success | 95% CI | Avg steps | Tool errors | n |", + "|---|---|---|---|---|---|---|---|"] + for i, s in enumerate(board["arms"], 1): + ci = s.get("success_ci", [0.0, 0.0]) + lines.append(f"| {i} | {s['arm']} | {s['bt_score']} | {s['success_rate']} | " + f"[{ci[0]}, {ci[1]}] | {s['avg_steps']} | {s['tool_errors']} | {s['n']} |") + comps = board.get("components") or {} + if comps: + lines += ["", "## Component contributions (Extended Bradley-Terry)", + "Higher weight = that component (method / mode / language) helps win", + "pairwise comparisons, holding the others fixed.", "", + "| Component | Weight |", "|---|---|"] + lines += [f"| {name} | {round(wgt, 4)} |" for name, wgt in comps.items()] + return "\n".join(lines) + "\n" + + +# =========================================================================== +# Backends +# =========================================================================== +def _utc_now() -> str: + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + +def base_row(task: AgenticTask, family: str, language: str, mode: str) -> dict[str, Any]: + arm = f"{family}-{language}-{mode}" if family else "oracle" + return { + "generated_at": _utc_now(), + "arena_reference": ARENA_REFERENCE, + "benchmark_kind": "agentic_agent_loop_objective", + "task_id": task.id, "task_category": task.category, "tools": list(task.tools), + "family": family, "language": language, "mode": mode, "arm": arm, + } + + +def run_task_offline(task: AgenticTask, max_steps: int, web: Any = None) -> dict[str, Any]: + ws = Workspace() + web = web or FrozenWebSearch() + try: + task.setup(ws) + traj = run_agent(lambda t, i: task.oracle_step(t, i), task.goal(ws), ws, + web, max_steps) + grade = task.grade(ws, traj["final_answer"]) + row = base_row(task, "", "", "oracle") + row.update(grade) + row.update(trajectory_metrics(traj, grade["objective_pass"])) + row.update({"status": "pass", "final_excerpt": traj["final_answer"][:200], + "history_len": traj["steps"]}) + return row + finally: + ws.cleanup() + + +def make_pie_step_fn(run_inferlet_fn, client, family, language, task, mode, args): + """A step function that calls one inferlet generation per agent step.""" + inferlet = f"{family}-{language}" + + async def _astep(transcript: str, step: int) -> str: + prompt = f"{SYSTEM_PROMPT}\n\n{transcript}\n\nYour next step:" + if family == "modular-cache": + use_cache = mode == "method" + modules = [ + {"id": "system/agent", "role": "system", "deps": [], "text": SYSTEM_PROMPT}, + {"id": f"task/{task.id}", "role": "user", "deps": ["system/agent"], + "text": f"GOAL: {task.goal_cached}"}, + {"id": "scratchpad/now", "role": "user", "deps": [f"task/{task.id}"], + "text": transcript}, + ] + params = {"modules": modules, "max_tokens": args.max_tokens, + "use_cache": use_cache, "save_cache": use_cache} + elif family == "hierarchical-attention": + params = {"prompt": prompt, "max_tokens": args.max_tokens, + "chunk_size_words": 45, + "selected_chunks": 999 if mode == "baseline" else 2, + "selection_mode": "all-visible-baseline" if mode == "baseline" else "lexical"} + else: + is_base = mode == "baseline" + params = {"prompt": prompt, "max_iterations": 1 if is_base else args.mcts_iterations, + "max_depth": 1 if is_base else 2, "branch_factor": 1 if is_base else 2, + "rollout_tokens": 32, "final_tokens": args.max_tokens, "show_trace": True} + return await run_inferlet_fn(client, inferlet, params, timeout=args.timeout) + + return _astep + + +async def run_task_pie(run_inferlet_fn, client, family, language, task, mode, args, web=None) -> dict[str, Any]: + ws = Workspace() + web = web or FrozenWebSearch() + task.goal_cached = task.goal(ws) # stable text for modular-cache modules + astep = make_pie_step_fn(run_inferlet_fn, client, family, language, task, mode, args) + history: list[dict[str, Any]] = [] + counts = {"tool_calls": 0, "tool_errors": 0, "tool_hallucinations": 0, "invalid": 0, + "had_error": False, "recovered": False} + final_answer, raw_parts = "", [] + try: + task.setup(ws) + start = time.time() + for i in range(args.max_steps): + transcript = render_transcript(task.goal_cached, history) + try: + step_text = await astep(transcript, i) + except Exception as exc: # noqa: BLE001 + row = base_row(task, family, language, mode) + row.update({"status": "fail", "objective_pass": False, + "detail": f"launch error: {exc}"[:200], + "task_success": 0, "steps": i, + "runtime_seconds": round(time.time() - start, 3)}) + return row + raw_parts.append(step_text) + action = parse_action(step_text) + record = {"raw": step_text, "kind": action.kind} + if action.kind == "final": + final_answer = action.final + history.append(record) + break + if action.kind == "tool": + counts["tool_calls"] += 1 + res = execute_tool(ws, web, action) + record.update({"tool": action.tool, "observation": res["observation"]}) + if res["error"]: + counts["tool_errors"] += 1 + counts["had_error"] = True + if res["hallucinated"]: + counts["tool_hallucinations"] += 1 + elif counts["had_error"]: + counts["recovered"] = True + else: + counts["invalid"] += 1 + record["observation"] = "no valid ACTION or FINAL parsed" + history.append(record) + elapsed = time.time() - start + traj = {"final_answer": final_answer, "steps": len(history), + "tool_calls": counts["tool_calls"], "tool_errors": counts["tool_errors"], + "tool_hallucinations": counts["tool_hallucinations"], + "invalid_actions": counts["invalid"], "recovered": counts["recovered"]} + grade = task.grade(ws, final_answer) + row = base_row(task, family, language, mode) + row.update(grade) + row.update(trajectory_metrics(traj, grade["objective_pass"])) + row.update({"status": "pass", "control_flow_ok": bool("".join(raw_parts).strip()), + "runtime_seconds": round(elapsed, 3), + "final_excerpt": final_answer[:200]}) + # Best-effort family signal metrics from the concatenated step logs. + row.update(_family_signals(family, "\n".join(raw_parts))) + return row + finally: + ws.cleanup() + + +def _family_signals(family: str, text: str) -> dict[str, Any]: + try: + import run_arena_benchmark as A # reuse the already-tested parsers + except Exception: # noqa: BLE001 + return {} + if family == "modular-cache": + return {f"sig_{k}": v for k, v in A.parse_modular_metrics(text).items()} + if family == "hierarchical-attention": + return {f"sig_{k}": v for k, v in A.parse_ha_metrics(text).items()} + if family == "mcts": + return {f"sig_{k}": v for k, v in A.parse_mcts_metrics(text).items()} + return {} + + +# =========================================================================== +# Output + drivers +# =========================================================================== +def write_results(rows: list[dict[str, Any]], out_dir: Path) -> dict[str, Any]: + out_dir.mkdir(parents=True, exist_ok=True) + (out_dir / "agentic_results.jsonl").write_text( + "".join(json.dumps(r, sort_keys=True) + "\n" for r in rows), encoding="utf-8") + keys: list[str] = [] + for row in rows: + for k in row: + if k not in keys: + keys.append(k) + with (out_dir / "agentic_results.csv").open("w", newline="", encoding="utf-8") as f: + w = csv.DictWriter(f, fieldnames=keys) + w.writeheader() + for row in rows: + w.writerow({k: json.dumps(v, sort_keys=True) if isinstance(v, (list, dict)) else v + for k, v in row.items()}) + board = build_leaderboard(rows) + (out_dir / "leaderboard.md").write_text(render_leaderboard_md(board), encoding="utf-8") + return board + + +def summarize(rows, board, out_dir): + total = len(rows) + launched = sum(1 for r in rows if r.get("status") == "pass") + objective = sum(1 for r in rows if r.get("objective_pass") is True) + print(f"\nAgentic rows: {total}") + print(f"Launched/ran: {launched}/{total}") + print(f"Objective pass: {objective}/{total}") + if board["arms"]: + print("\nLeaderboard (Bradley-Terry over pairwise objective outcomes):") + for i, s in enumerate(board["arms"], 1): + ci = s.get("success_ci", [0.0, 0.0]) + print(f" {i:2d}. {s['arm']:34s} bt={s['bt_score']:.4f} " + f"success={s['success_rate']:.2f} ci=[{ci[0]},{ci[1]}] steps={s['avg_steps']:.1f}") + if board.get("components"): + print("\nComponent contributions (Extended Bradley-Terry):") + for name, wgt in board["components"].items(): + print(f" {name:28s} {wgt:+.4f}") + print(f"\nJSONL: {out_dir / 'agentic_results.jsonl'}") + print(f"CSV: {out_dir / 'agentic_results.csv'}") + print(f"Board: {out_dir / 'leaderboard.md'}") + + +def load_pie_helpers(): + sys.path.insert(0, str(PIE_REPO / "client" / "python" / "src")) + sys.path.insert(0, str(PIE_REPO / "tests" / "inferlets")) + sys.path.insert(0, str(BENCHES_DIR)) + from pie_client import PieClient # noqa: PLC0415 + from conftest import run_inferlet # noqa: PLC0415 + return PieClient, run_inferlet + + +def _task_pool(args) -> list[AgenticTask]: + pool = list(TASKS) + if getattr(args, "tasks_file", None): + pool += load_tasks_from_json(args.tasks_file) + return pool + + +def selected_tasks(args) -> list[AgenticTask]: + pool = _task_pool(args) + if args.tasks == "all": + return pool + wanted = {t.strip() for t in args.tasks.split(",") if t.strip()} + chosen = [t for t in pool if t.id in wanted or t.category in wanted] + if not chosen: + raise SystemExit(f"No tasks matched: {args.tasks}") + return chosen + + +def run_offline_self_check(args) -> int: + web = make_web_backend(args) + tasks = selected_tasks(args) + rows = [run_task_offline(task, args.max_steps, web) for task in tasks] + out_dir = Path(args.out_dir) + board = write_results(rows, out_dir) + summarize(rows, board, out_dir) + passed = all(r.get("objective_pass") for r in rows) + print("\nOK: agent loop + real tools + objective graders verified." if passed + else "\nFAIL: harness self-check did not pass.") + return 0 if passed else 1 + + +async def run_pie(args) -> int: + families = [f.strip() for f in args.families.split(",") if f.strip()] + languages = [l.strip() for l in args.languages.split(",") if l.strip()] + web = make_web_backend(args) + tasks = selected_tasks(args) + PieClient, run_inferlet_fn = load_pie_helpers() + client = PieClient(args.url) + await client.connect() + if args.token and args.token.lower() != "none": + await client.auth_by_token(args.token) + rows: list[dict[str, Any]] = [] + try: + for task in tasks: + print(f"\n== {task.id} ({task.category}) tools={task.tools} ==") + for language in languages: + for family in families: + for mode in ("baseline", "method"): + print(f" {family}-{language} [{mode}]", flush=True) + rows.append(await run_task_pie(run_inferlet_fn, client, family, + language, task, mode, args, web)) + finally: + await client.close() + out_dir = Path(args.out_dir) + board = write_results(rows, out_dir) + summarize(rows, board, out_dir) + return 0 if all(r.get("status") == "pass" for r in rows) else 1 + + +def run_report(args) -> int: + rows = [json.loads(line) for line in Path(args.report).read_text().splitlines() if line.strip()] + out_dir = Path(args.out_dir) + board = write_results(rows, out_dir) + summarize(rows, board, out_dir) + return 0 + + +def make_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(description="Realistic agentic benchmark: real agent loop + tools.") + p.add_argument("url", nargs="?", help="Pie websocket URL, e.g. ws://127.0.0.1:8080") + p.add_argument("token", nargs="?", default="none", help="Pie internal token, or none") + p.add_argument("--offline-self-check", action="store_true", + help="Run the loop with the oracle policy to verify tools+graders (no model).") + p.add_argument("--report", help="Rebuild the leaderboard from an existing JSONL file.") + p.add_argument("--out-dir", default="benchmark_results/agentic") + p.add_argument("--families", default=",".join(FAMILIES)) + p.add_argument("--languages", default=",".join(LANGUAGES)) + p.add_argument("--tasks", default="all", help="all, or comma-separated ids/categories") + p.add_argument("--tasks-file", help="Load extra agentic tasks from a JSON file") + p.add_argument("--web-search", choices=("frozen", "live"), default="frozen", + help="frozen offline corpus (default, deterministic) or a live search API") + p.add_argument("--web-search-provider", default="brave", help="live search provider") + p.add_argument("--web-search-key-env", default="WEB_SEARCH_API_KEY", + help="env var holding the live search API key") + p.add_argument("--max-steps", type=int, default=4) + p.add_argument("--timeout", type=int, default=240) + p.add_argument("--max-tokens", type=int, default=64) + p.add_argument("--mcts-iterations", type=int, default=3) + return p + + +def main() -> int: + args = make_parser().parse_args() + if args.report: + return run_report(args) + if args.offline_self_check: + return run_offline_self_check(args) + if not args.url: + make_parser().error("url is required unless --offline-self-check or --report is set") + return asyncio.run(run_pie(args)) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/benches/run_arena_benchmark.py b/benches/run_arena_benchmark.py new file mode 100644 index 000000000..429a60700 --- /dev/null +++ b/benches/run_arena_benchmark.py @@ -0,0 +1,767 @@ +#!/usr/bin/env python3 +"""Arena-style benchmark runner for the custom test-time-scaling inferlets. + +The professor's reference is Agent Arena by Arena/LMArena. The real Agent Arena +is a live, web-based platform: an "agent" is a (model + framework + tools) +configuration (e.g. GPT-4o + LangChain + Brave Search), and agents are ranked by +pairwise human votes on community tasks from its Prompt Hub, scored with an +Extended Bradley-Terry model that attributes credit to the model, framework, and +tool components. See https://arena.ai/blog/agent-arena/. + +Because that leaderboard is produced by live human preference votes, it cannot be +reproduced offline: there is no public Agent Arena task export or scoring API to +run locally against these inferlets. Instead, this runner +creates a local, deterministic Arena-style harness around the three inferlet +families that mirrors Agent Arena's tool-using workflow categories. The five +proxy signals below (task success, steerability, error recovery, user-feedback, +tool hallucination) are OUR local design choices, not numbers reported by Arena: + + * modular-cache: repeated related launches on one persistent engine + * hierarchical-attention: long-context task prompts with selected visibility + * mcts: retry/search-style reasoning over verifiable agent tasks + +It writes JSONL and CSV rows with common proxy metrics plus family-specific +control-flow metrics parsed from the inferlet logs. +""" + +from __future__ import annotations + +import argparse +import asyncio +import csv +import json +import re +import sys +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +ROOT = Path(__file__).resolve().parent.parent +PIE_REPO = ROOT +# Canonical Agent Arena reference (Arena / LMArena). This is the live, human-voted +# platform we mirror locally; it is not an offline benchmark we can score against. +ARENA_REFERENCE = "https://arena.ai/blog/agent-arena/" + +LANGUAGES = ("rust", "python", "js") +FAMILIES = ("modular-cache", "hierarchical-attention", "mcts") + + +TASKS: list[dict[str, Any]] = [ + { + "id": "code-debug-test-failure", + "category": "coding_debugging", + "workflow": "Diagnose a failing unit test from a terminal log and patch notes.", + "stable_context": ( + "Repository note: a scheduler changed from inclusive to exclusive end " + "indices. Terminal log: test_window_end fails with expected [2, 3, 4] " + "but got [2, 3]. The likely bug is an off-by-one range end." + ), + "initial_request": ( + "Explain the smallest code fix and the regression test that should be " + "added. Mention the off-by-one range end." + ), + "followup_request": ( + "Now phrase the same fix as a short pull-request review comment." + ), + "required_terms": ["off-by-one", "range", "test"], + "forbidden_terms": ["i ran pytest", "i opened the file"], + "error_terms": ["fails", "expected", "got"], + }, + { + "id": "web-research-source-triage", + "category": "web_research", + "workflow": "Synthesize a web-research answer from supplied search snippets.", + "stable_context": ( + "Search snippets supplied by the environment: Source A says Agent Arena " + "uses real sessions with web search, filesystem, and terminal tools. " + "Source B says its signals include task success, steerability, error " + "recovery, user praise versus complaint, and tool hallucination." + ), + "initial_request": ( + "Summarize the benchmark idea in three sentences and name the five " + "evaluation signals." + ), + "followup_request": ( + "Turn the same research into a concise note for an engineering team." + ), + "required_terms": ["task success", "steerability", "tool hallucination"], + "forbidden_terms": ["i searched the web", "live browser"], + "error_terms": [], + }, + { + "id": "document-analysis-action-items", + "category": "document_analysis", + "workflow": "Extract action items from a provided project memo.", + "stable_context": ( + "Project memo: We verified smoke tests with the dummy driver. Next, " + "we need an Arena-style benchmark runner, JSONL/CSV output, a real-model " + "path, and documentation that dummy results are not quality claims." + ), + "initial_request": ( + "Extract the action items and separate completed smoke-test work from " + "remaining benchmark work." + ), + "followup_request": ( + "Rewrite the action items as next-week priorities with owners omitted." + ), + "required_terms": ["jsonl", "csv", "dummy"], + "forbidden_terms": ["attached pdf", "i read the file system"], + "error_terms": [], + }, + { + "id": "app-build-acceptance-criteria", + "category": "app_building", + "workflow": "Turn app requirements into implementation acceptance criteria.", + "stable_context": ( + "App request: build a small benchmark dashboard with task filters, " + "per-inferlet rows, pass/fail status, raw-output links, and a chart for " + "task-success proxy by family. Constraint: no marketing landing page." + ), + "initial_request": ( + "Write implementation acceptance criteria for the dashboard." + ), + "followup_request": ( + "Condense the acceptance criteria into a QA checklist." + ), + "required_terms": ["filters", "pass", "raw-output"], + "forbidden_terms": ["hero section", "landing page"], + "error_terms": [], + }, + { + "id": "slide-deck-outline", + "category": "slide_deck", + "workflow": "Create a short slide outline from project notes.", + "stable_context": ( + "Talk notes: introduce Pie inferlets, explain the three examples, show " + "how Arena-style evaluation differs from dummy smoke tests, then close " + "with limitations and next steps." + ), + "initial_request": ( + "Create a five-slide outline with one key takeaway per slide." + ), + "followup_request": ( + "Rewrite the outline for a professor-facing project update." + ), + "required_terms": ["pie", "arena", "limitations"], + "forbidden_terms": ["image generation", "speaker notes created"], + "error_terms": [], + }, +] + + +def _utc_now() -> str: + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + +def _excerpt(text: str, limit: int = 500) -> str: + cleaned = " ".join(text.split()) + if len(cleaned) <= limit: + return cleaned + return cleaned[: limit - 3] + "..." + + +def _last_int(pattern: str, text: str) -> int | None: + values = re.findall(pattern, text) + return int(values[-1]) if values else None + + +def _last_float(pattern: str, text: str) -> float | None: + values = re.findall(pattern, text) + return float(values[-1]) if values else None + + +def inferlet_name(family: str, language: str) -> str: + return f"{family}-{language}" + + +def make_modules(task: dict[str, Any], request: str) -> list[dict[str, Any]]: + """Build stable modules plus one task module for modular-cache runs.""" + return [ + { + "id": "system/arena-evaluator", + "role": "system", + "deps": [], + "text": ( + "You are evaluating an agentic workflow. Use only the supplied " + "task notes and tool transcript. Do not claim to have used tools." + ), + }, + { + "id": "policy/output", + "role": "user", + "deps": ["system/arena-evaluator"], + "text": ( + "Prefer concise, checkable answers. If an environment error is " + "shown, explain the recovery step." + ), + }, + { + "id": f"context/{task['category']}", + "role": "user", + "deps": ["policy/output"], + "text": f"Workflow category: {task['category']}. {task['workflow']}", + }, + { + "id": f"evidence/{task['id']}", + "role": "user", + "deps": [f"context/{task['category']}"], + "text": task["stable_context"], + }, + { + "id": "task/current", + "role": "user", + "deps": [f"evidence/{task['id']}"], + "text": request, + }, + ] + + +def make_ha_prompt(task: dict[str, Any]) -> str: + distractors = [ + ( + "Chunk: deployment note. The build cache stores generated artifacts " + "that should not be committed. This is unrelated to the answer." + ), + ( + "Chunk: meeting note. The team prefers short updates, plain language, " + "and direct limitations when reporting prototype results." + ), + ( + "Chunk: benchmark evidence. " + + task["stable_context"] + + " Request: " + + task["initial_request"] + ), + ( + "Chunk: unrelated research. A generic leaderboard may rank chat " + "quality, but this task is about tool-using agent workflows." + ), + ] + return ( + "You are answering an Arena-style agent evaluation task. Use the relevant " + "chunk and ignore unrelated chunks.\n\n" + + "\n\n".join(distractors) + + "\n\nFinal question: " + + task["initial_request"] + ) + + +def make_mcts_prompt(task: dict[str, Any]) -> str: + return ( + f"Agent task category: {task['category']}\n" + f"Workflow: {task['workflow']}\n" + f"Evidence:\n{task['stable_context']}\n\n" + f"Goal: {task['initial_request']}\n" + "Return a concise answer that would pass an objective project review." + ) + + +def proxy_grade(output: str, task: dict[str, Any]) -> dict[str, Any]: + """Local proxies for Arena-style quality signals. + + These are intentionally simple and deterministic. They are not equivalent + to Arena's live user-feedback or causal leaderboard methodology. + """ + lower = output.lower() + required = [term.lower() for term in task.get("required_terms", [])] + forbidden = [term.lower() for term in task.get("forbidden_terms", [])] + error_terms = [term.lower() for term in task.get("error_terms", [])] + + required_hits = [term for term in required if term in lower] + forbidden_hits = [term for term in forbidden if term in lower] + error_hits = [term for term in error_terms if term in lower] + + hallucination_phrases = [ + "i searched", + "i ran", + "i opened", + "i clicked", + "browser showed", + "terminal output shows", + ] + hallucinations = [phrase for phrase in hallucination_phrases if phrase in lower] + + task_success = 1.0 if required and len(required_hits) == len(required) else 0.0 + if not required: + task_success = 1.0 if output.strip() else 0.0 + + steerability = 1.0 if not forbidden_hits else 0.0 + error_recovery = 1.0 + if error_terms: + recovery_words = ("fix", "recover", "correct", "because", "root cause") + error_recovery = 1.0 if error_hits and any(w in lower for w in recovery_words) else 0.0 + + tool_hallucination = 1.0 if hallucinations else 0.0 + user_feedback = (task_success + steerability + error_recovery + (1.0 - tool_hallucination)) / 4.0 + + return { + "task_success_proxy": round(task_success, 3), + "steerability_proxy": round(steerability, 3), + "error_recovery_proxy": round(error_recovery, 3), + "user_feedback_proxy": round(user_feedback, 3), + "tool_hallucination_proxy": round(tool_hallucination, 3), + "required_hits": required_hits, + "forbidden_hits": forbidden_hits, + "tool_hallucination_hits": hallucinations, + } + + +def parse_modular_metrics(output: str) -> dict[str, Any]: + module_values = re.findall(r"(?m)^modules=(\d+)", output) + return { + "modules": int(module_values[-1]) if module_values else None, + "cache_miss_seen": "cache_miss" in output, + "cache_hit_modules": _last_int(r"cache_hit_modules=(\d+)", output), + "saved_count": len(re.findall(r"\bsaved=", output)), + "save_skipped_count": len(re.findall(r"\bsave_skipped=", output)), + "use_cache_false_seen": "use_cache=false" in output, + } + + +def modular_first_launch_ok(output: str, expected_full_hit: int) -> bool: + """First cache-reuse launch can miss or reuse a shorter warm prefix. + + In a full benchmark suite, earlier tasks can warm shared system/policy + modules in the same persistent engine. That is valid as long as the first + launch is not already a full hit for this exact task. + """ + metrics = parse_modular_metrics(output) + first_hit = metrics.get("cache_hit_modules") + return "cache_miss" in output or (first_hit is not None and first_hit < expected_full_hit) + + +def parse_ha_metrics(output: str) -> dict[str, Any]: + selected_match = re.search(r"selected_chunk=\[([^\]]*)\]", output) + selected = [] + if selected_match: + selected = [int(x) for x in re.findall(r"\d+", selected_match.group(1))] + + mask_match = re.search(r"mask_true_tokens=(\d+)\s*/\s*total=(\d+)", output) + true_tokens = int(mask_match.group(1)) if mask_match else None + total_tokens = int(mask_match.group(2)) if mask_match else None + visible_ratio = None + if true_tokens is not None and total_tokens: + visible_ratio = round(true_tokens / total_tokens, 4) + + return { + "chunks": _last_int(r"chunks=(\d+)", output), + "selected_chunks": selected, + "mask_true_tokens": true_tokens, + "mask_total_tokens": total_tokens, + "visible_token_ratio": visible_ratio, + "generated_tokens": _last_int(r"generated_tokens=(\d+)", output), + } + + +def parse_mcts_metrics(output: str) -> dict[str, Any]: + rollout_scores = [float(x) for x in re.findall(r"rollout_score=([0-9.]+)", output)] + return { + "iterations": _last_int(r"iterations=(\d+)", output), + "nodes": _last_int(r"nodes=(\d+)", output), + "best_score": _last_float(r"best_score=([0-9.]+)", output), + "rollout_scores": rollout_scores, + "score_variance_seen": len(set(rollout_scores)) > 1, + } + + +def base_row(family: str, language: str, inferlet: str, task: dict[str, Any], mode: str) -> dict[str, Any]: + return { + "generated_at": _utc_now(), + "arena_reference": ARENA_REFERENCE, + "benchmark_kind": "local_arena_style_proxy", + "family": family, + "language": language, + "inferlet": inferlet, + "mode": mode, + "task_id": task["id"], + "task_category": task["category"], + "workflow": task["workflow"], + } + + +async def launch_case(run_inferlet_fn, client, inferlet: str, params: dict[str, Any], timeout: int) -> tuple[str, str, float]: + start = time.time() + output = await run_inferlet_fn(client, inferlet, params, timeout=timeout) + return output, "pass", time.time() - start + + +def failure_row( + family: str, + language: str, + inferlet: str, + task: dict[str, Any], + mode: str, + exc: Exception, + elapsed: float, +) -> dict[str, Any]: + row = base_row(family, language, inferlet, task, mode) + row.update( + { + "status": "fail", + "control_flow_ok": False, + "runtime_seconds": round(elapsed, 3), + "error": str(exc)[:500], + } + ) + return row + + +async def run_modular_cache(run_inferlet_fn, client, language: str, task: dict[str, Any], args) -> list[dict[str, Any]]: + family = "modular-cache" + inferlet = inferlet_name(family, language) + rows = [] + + baseline_params = { + "modules": make_modules(task, task["initial_request"]), + "max_tokens": args.max_tokens, + "use_cache": False, + "save_cache": False, + } + start = time.time() + try: + output, status, elapsed = await launch_case( + run_inferlet_fn, client, inferlet, baseline_params, args.timeout + ) + except Exception as exc: # noqa: BLE001 + rows.append(failure_row(family, language, inferlet, task, "baseline_no_cache", exc, time.time() - start)) + else: + row = base_row(family, language, inferlet, task, "baseline_no_cache") + row.update(proxy_grade(output, task)) + row.update(parse_modular_metrics(output)) + row.update( + { + "status": status, + "control_flow_ok": "use_cache=false" in output and "cache_hit_modules" not in output, + "runtime_seconds": round(elapsed, 3), + "launches": 1, + "output_excerpt": _excerpt(output), + } + ) + rows.append(row) + + modules_first = make_modules(task, task["initial_request"]) + modules_followup = make_modules(task, task["followup_request"]) + expected_full = len(modules_first) + expected_partial = len(modules_first) - 1 + params_first = {"modules": modules_first, "max_tokens": args.max_tokens} + params_followup = {"modules": modules_followup, "max_tokens": args.max_tokens} + + start = time.time() + try: + out1, _, _ = await launch_case(run_inferlet_fn, client, inferlet, params_first, args.timeout) + out2, _, _ = await launch_case(run_inferlet_fn, client, inferlet, params_first, args.timeout) + out3, _, _ = await launch_case(run_inferlet_fn, client, inferlet, params_followup, args.timeout) + elapsed = time.time() - start + except Exception as exc: # noqa: BLE001 + rows.append(failure_row(family, language, inferlet, task, "method_cache_reuse", exc, time.time() - start)) + else: + hit_full = parse_modular_metrics(out2).get("cache_hit_modules") + hit_partial = parse_modular_metrics(out3).get("cache_hit_modules") + first_metrics = parse_modular_metrics(out1) + first_hit = first_metrics.get("cache_hit_modules") + first_launch_ok = modular_first_launch_ok(out1, expected_full) + combined = "\n".join([out1, out2, out3]) + row = base_row(family, language, inferlet, task, "method_cache_reuse") + row.update(proxy_grade(out3, task)) + row.update(parse_modular_metrics(combined)) + row.update( + { + "status": "pass", + "control_flow_ok": ( + first_launch_ok + and hit_full == expected_full + and hit_partial == expected_partial + ), + "runtime_seconds": round(elapsed, 3), + "launches": 3, + "first_launch_hit_modules": first_hit, + "expected_full_hit_modules": expected_full, + "full_hit_modules": hit_full, + "expected_partial_hit_modules": expected_partial, + "partial_hit_modules": hit_partial, + "output_excerpt": _excerpt(combined), + } + ) + rows.append(row) + + return rows + + +async def run_hierarchical_attention(run_inferlet_fn, client, language: str, task: dict[str, Any], args) -> list[dict[str, Any]]: + family = "hierarchical-attention" + inferlet = inferlet_name(family, language) + rows = [] + prompt = make_ha_prompt(task) + common = { + "prompt": prompt, + "max_tokens": args.max_tokens, + "chunk_size_words": args.ha_chunk_words, + "sink_tokens": args.ha_sink_tokens, + "summary_tokens_per_chunk": args.ha_summary_tokens, + "local_window_tokens": args.ha_local_window_tokens, + } + + modes = [ + ("baseline_all_visible", {**common, "selected_chunks": 999, "selection_mode": "all-visible-baseline"}), + ("method_lexical_selection", {**common, "selected_chunks": args.ha_selected_chunks, "selection_mode": "lexical"}), + ] + for mode, params in modes: + start = time.time() + try: + output, status, elapsed = await launch_case(run_inferlet_fn, client, inferlet, params, args.timeout) + except Exception as exc: # noqa: BLE001 + rows.append(failure_row(family, language, inferlet, task, mode, exc, time.time() - start)) + continue + + metrics = parse_ha_metrics(output) + row = base_row(family, language, inferlet, task, mode) + row.update(proxy_grade(output, task)) + row.update(metrics) + row.update( + { + "status": status, + "control_flow_ok": bool(metrics.get("chunks")) and metrics.get("mask_true_tokens") is not None, + "runtime_seconds": round(elapsed, 3), + "launches": 1, + "output_excerpt": _excerpt(output), + } + ) + rows.append(row) + + return rows + + +async def run_mcts(run_inferlet_fn, client, language: str, task: dict[str, Any], args) -> list[dict[str, Any]]: + family = "mcts" + inferlet = inferlet_name(family, language) + rows = [] + prompt = make_mcts_prompt(task) + modes = [ + ( + "baseline_single_search", + { + "prompt": prompt, + "max_iterations": 1, + "max_depth": 1, + "branch_factor": 1, + "rollout_tokens": args.mcts_rollout_tokens, + "final_tokens": args.max_tokens, + "show_trace": True, + }, + ), + ( + "method_mcts_search", + { + "prompt": prompt, + "max_iterations": args.mcts_iterations, + "max_depth": args.mcts_depth, + "branch_factor": args.mcts_branch_factor, + "rollout_tokens": args.mcts_rollout_tokens, + "final_tokens": args.max_tokens, + "show_trace": True, + }, + ), + ] + for mode, params in modes: + start = time.time() + try: + output, status, elapsed = await launch_case(run_inferlet_fn, client, inferlet, params, args.timeout) + except Exception as exc: # noqa: BLE001 + rows.append(failure_row(family, language, inferlet, task, mode, exc, time.time() - start)) + continue + + metrics = parse_mcts_metrics(output) + row = base_row(family, language, inferlet, task, mode) + row.update(proxy_grade(output, task)) + row.update(metrics) + row.update( + { + "status": status, + "control_flow_ok": bool(metrics.get("iterations")) and metrics.get("nodes") is not None, + "runtime_seconds": round(elapsed, 3), + "launches": 1, + "output_excerpt": _excerpt(output), + } + ) + rows.append(row) + + return rows + + +def parse_csv_list(value: str, allowed: tuple[str, ...], label: str) -> list[str]: + raw = [v.strip() for v in value.split(",") if v.strip()] + unknown = [v for v in raw if v not in allowed] + if unknown: + raise SystemExit(f"Unknown {label}: {', '.join(unknown)}. Allowed: {', '.join(allowed)}") + return raw + + +def selected_tasks(args) -> list[dict[str, Any]]: + if args.tasks == "all": + tasks = list(TASKS) + else: + wanted = {t.strip() for t in args.tasks.split(",") if t.strip()} + tasks = [task for task in TASKS if task["id"] in wanted or task["category"] in wanted] + missing = wanted - {task["id"] for task in tasks} - {task["category"] for task in tasks} + if missing: + raise SystemExit(f"Unknown task ids/categories: {', '.join(sorted(missing))}") + if args.max_tasks is not None: + tasks = tasks[: args.max_tasks] + return tasks + + +def write_results(rows: list[dict[str, Any]], out_dir: Path) -> None: + out_dir.mkdir(parents=True, exist_ok=True) + jsonl_path = out_dir / "arena_results.jsonl" + csv_path = out_dir / "arena_results.csv" + + with jsonl_path.open("w", encoding="utf-8") as f: + for row in rows: + f.write(json.dumps(row, sort_keys=True) + "\n") + + keys: list[str] = [] + for row in rows: + for key in row: + if key not in keys: + keys.append(key) + + with csv_path.open("w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=keys) + writer.writeheader() + for row in rows: + writer.writerow( + { + key: json.dumps(value, sort_keys=True) if isinstance(value, (list, dict)) else value + for key, value in row.items() + } + ) + + +def print_summary(rows: list[dict[str, Any]], out_dir: Path | None = None) -> None: + total = len(rows) + passed = sum(1 for row in rows if row.get("status") == "pass") + control_ok = sum(1 for row in rows if row.get("control_flow_ok") is True) + print(f"\nArena-style rows: {total}") + print(f"Launch status pass: {passed}/{total}") + print(f"Control-flow pass: {control_ok}/{total}") + if out_dir is not None: + print(f"JSONL: {out_dir / 'arena_results.jsonl'}") + print(f"CSV: {out_dir / 'arena_results.csv'}") + + +def dry_run_plan(args) -> list[dict[str, Any]]: + families = parse_csv_list(args.families, FAMILIES, "families") + languages = parse_csv_list(args.languages, LANGUAGES, "languages") + tasks = selected_tasks(args) + plan = [] + for task in tasks: + for family in families: + for language in languages: + plan.append( + { + "family": family, + "language": language, + "inferlet": inferlet_name(family, language), + "task_id": task["id"], + "task_category": task["category"], + } + ) + return plan + + +def load_pie_helpers(): + sys.path.insert(0, str(PIE_REPO / "client" / "python" / "src")) + sys.path.insert(0, str(PIE_REPO / "tests" / "inferlets")) + from pie_client import PieClient # noqa: PLC0415 + from conftest import run_inferlet # noqa: PLC0415 + + return PieClient, run_inferlet + + +async def run(args) -> int: + families = parse_csv_list(args.families, FAMILIES, "families") + languages = parse_csv_list(args.languages, LANGUAGES, "languages") + tasks = selected_tasks(args) + + PieClient, run_inferlet_fn = load_pie_helpers() + client = PieClient(args.url) + await client.connect() + if args.token and args.token.lower() != "none": + await client.auth_by_token(args.token) + + rows: list[dict[str, Any]] = [] + try: + for task in tasks: + print(f"\n== {task['id']} ({task['category']}) ==") + for language in languages: + for family in families: + print(f" {family}-{language}", flush=True) + if family == "modular-cache": + rows.extend(await run_modular_cache(run_inferlet_fn, client, language, task, args)) + elif family == "hierarchical-attention": + rows.extend(await run_hierarchical_attention(run_inferlet_fn, client, language, task, args)) + elif family == "mcts": + rows.extend(await run_mcts(run_inferlet_fn, client, language, task, args)) + finally: + await client.close() + + out_dir = Path(args.out_dir) + write_results(rows, out_dir) + print_summary(rows, out_dir) + return 0 if all(row.get("status") == "pass" for row in rows) else 1 + + +def make_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Run local Arena-style proxy benchmarks for the custom Pie inferlets." + ) + parser.add_argument("url", nargs="?", help="Pie websocket URL, e.g. ws://127.0.0.1:8080") + parser.add_argument("token", nargs="?", default="none", help="Pie internal token, or none") + parser.add_argument("--dry-run", action="store_true", help="Print/write the planned benchmark matrix only") + parser.add_argument("--out-dir", default="benchmark_results/arena", help="Output directory for JSONL/CSV results") + parser.add_argument("--families", default=",".join(FAMILIES), help="Comma-separated inferlet families") + parser.add_argument("--languages", default=",".join(LANGUAGES), help="Comma-separated languages") + parser.add_argument("--tasks", default="all", help="all, or comma-separated task ids/categories") + parser.add_argument("--max-tasks", type=int, default=None, help="Limit number of tasks for a quick smoke run") + parser.add_argument("--timeout", type=int, default=240, help="Timeout per inferlet launch in seconds") + parser.add_argument("--max-tokens", type=int, default=16, help="Small generation cap per launch") + parser.add_argument("--ha-chunk-words", type=int, default=45) + parser.add_argument("--ha-selected-chunks", type=int, default=1) + parser.add_argument("--ha-sink-tokens", type=int, default=48) + parser.add_argument("--ha-summary-tokens", type=int, default=20) + parser.add_argument("--ha-local-window-tokens", type=int, default=96) + parser.add_argument("--mcts-iterations", type=int, default=3) + parser.add_argument("--mcts-depth", type=int, default=2) + parser.add_argument("--mcts-branch-factor", type=int, default=2) + parser.add_argument("--mcts-rollout-tokens", type=int, default=32) + return parser + + +def main() -> int: + parser = make_parser() + args = parser.parse_args() + + if args.dry_run: + plan = dry_run_plan(args) + out_dir = Path(args.out_dir) + out_dir.mkdir(parents=True, exist_ok=True) + plan_path = out_dir / "arena_plan.json" + plan_path.write_text(json.dumps({"arena_reference": ARENA_REFERENCE, "plan": plan}, indent=2), encoding="utf-8") + print(f"Planned benchmark cases: {len(plan)}") + print(f"Plan: {plan_path}") + for item in plan[:10]: + print(f" {item['inferlet']} :: {item['task_id']}") + if len(plan) > 10: + print(f" ... {len(plan) - 10} more") + return 0 + + if not args.url: + parser.error("url is required unless --dry-run is set") + + return asyncio.run(run(args)) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/benches/test_agentic_benchmark.py b/benches/test_agentic_benchmark.py new file mode 100644 index 000000000..09a3ba2d8 --- /dev/null +++ b/benches/test_agentic_benchmark.py @@ -0,0 +1,247 @@ +"""Tests for the realistic agentic benchmark (real agent loop + tools). + +These exercise the REAL agent loop, REAL tools (file writes, subprocess, grep, +the frozen web-search corpus), the objective graders, the trajectory metrics, and +the Bradley-Terry leaderboard. They need Python, bash, and grep, but no Pie +engine, no model, and no GPU. + +Run directly: + + python3 tests/test_agentic_benchmark.py +""" + +import importlib.util +import sys +from pathlib import Path + + +def _load_runner(): + path = Path(__file__).resolve().parent / "run_agentic_benchmark.py" + spec = importlib.util.spec_from_file_location("agentic_benchmark", path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +B = _load_runner() + + +# ---- protocol parsing ----------------------------------------------------- # +def test_parse_action_tool(): + a = B.parse_action('THOUGHT: x\nACTION: write_file\nARGS: {"path": "p", "content": "c"}') + assert a.kind == "tool" + assert a.tool == "write_file" + assert a.args == {"path": "p", "content": "c"} + + +def test_parse_action_final_wins_when_no_action(): + a = B.parse_action("THOUGHT: done\nFINAL: the answer is 42") + assert a.kind == "final" + assert a.final == "the answer is 42" + + +def test_parse_action_none_and_malformed_args(): + assert B.parse_action("just chatting").kind == "none" + a = B.parse_action("ACTION: read_file\nARGS: {nope}") + assert a.kind == "tool" and "__malformed__" in a.args + + +# ---- tools ---------------------------------------------------------------- # +def test_frozen_web_search_returns_corpus_snippet(): + out = B.FrozenWebSearch().search("what does the pie dummy driver emit") + assert "RANDOM" in out + + +def test_execute_tool_filesystem_roundtrip(): + ws = B.Workspace() + try: + w = B.execute_tool(ws, None, B.Action("tool", "write_file", {"path": "f.txt", "content": "hi"})) + assert w["error"] is False and ws.read_file("f.txt") == "hi" + r = B.execute_tool(ws, None, B.Action("tool", "read_file", {"path": "f.txt"})) + assert r["observation"] == "hi" + ls = B.execute_tool(ws, None, B.Action("tool", "list_files", {})) + assert "f.txt" in ls["observation"] + finally: + ws.cleanup() + + +def test_execute_tool_unknown_is_hallucination(): + ws = B.Workspace() + try: + r = B.execute_tool(ws, None, B.Action("tool", "open_browser", {"url": "x"})) + assert r["error"] is True and r["hallucinated"] is True + finally: + ws.cleanup() + + +def test_execute_tool_terminal_runs_real_subprocess(): + ws = B.Workspace() + try: + r = B.execute_tool(ws, None, B.Action("tool", "run_terminal", {"command": "echo hello"})) + assert "hello" in r["observation"] and r["error"] is False + finally: + ws.cleanup() + + +# ---- full oracle trajectories per task (end to end, objective grade) ------ # +def test_every_task_succeeds_under_oracle(): + for task in B.TASKS: + row = B.run_task_offline(task, max_steps=6) + assert row["objective_pass"] is True, (task.id, row.get("detail")) + assert row["task_success"] == 1 + assert row["tool_calls"] >= 1, task.id + + +# ---- metrics -------------------------------------------------------------- # +def test_trajectory_metrics_error_recovery(): + base = {"steps": 4, "tool_calls": 3, "tool_hallucinations": 0, "invalid_actions": 0} + with_err = B.trajectory_metrics({**base, "tool_errors": 1, "recovered": True}, True) + assert with_err["error_recovery"] == 1 + no_err = B.trajectory_metrics({**base, "tool_errors": 0, "recovered": False}, True) + assert no_err["error_recovery"] is None + + +def test_pairwise_outcome_success_then_efficiency(): + win = {"task_success": 1, "steps": 5, "tool_errors": 0} + lose = {"task_success": 0, "steps": 1, "tool_errors": 0} + assert B.pairwise_outcome(win, lose) == 1.0 + assert B.pairwise_outcome(lose, win) == 0.0 + fast = {"task_success": 1, "steps": 2, "tool_errors": 0} + slow = {"task_success": 1, "steps": 9, "tool_errors": 0} + assert B.pairwise_outcome(fast, slow) == 1.0 + assert B.pairwise_outcome(fast, dict(fast)) == 0.5 + + +# ---- Bradley-Terry + leaderboard ------------------------------------------ # +def test_bradley_terry_ranks_stronger_arm_higher(): + arms = ["a_strong", "b_weak"] + wins = {("a_strong", "b_weak"): 9.0, ("b_weak", "a_strong"): 1.0} + bt = B.bradley_terry(arms, wins) + assert bt["a_strong"] > bt["b_weak"] + assert abs(sum(bt.values()) - 1.0) < 1e-6 + + +def test_wilson_interval_bounds(): + lo, hi = B.wilson_interval(5, 10) + assert 0.0 <= lo < 0.5 < hi <= 1.0 + assert B.wilson_interval(0, 0) == (0.0, 0.0) + lo2, hi2 = B.wilson_interval(10, 10) + assert lo2 <= hi2 <= 1.0 and lo2 > 0.5 # tighter, high-rate interval + + +def test_steerability_metric_in_trajectory(): + base = {"steps": 3, "tool_calls": 2, "tool_errors": 0, "recovered": False} + clean = B.trajectory_metrics({**base, "tool_hallucinations": 0, "invalid_actions": 0}, True) + assert clean["steerability"] == 1 + messy = B.trajectory_metrics({**base, "tool_hallucinations": 1, "invalid_actions": 0}, False) + assert messy["steerability"] == 0 + + +def test_extended_bradley_terry_isolates_winning_component(): + # "method" beats "baseline" in both languages -> mode:method must score higher. + feats = { + "A": {"mode:method": 1.0, "lang:x": 1.0}, + "B": {"mode:baseline": 1.0, "lang:x": 1.0}, + "C": {"mode:method": 1.0, "lang:y": 1.0}, + "D": {"mode:baseline": 1.0, "lang:y": 1.0}, + } + wins = {("A", "B"): 9.0, ("B", "A"): 1.0, ("C", "D"): 9.0, ("D", "C"): 1.0} + w = B.extended_bradley_terry(feats, wins) + assert w["mode:method"] > w["mode:baseline"] + + +def test_build_leaderboard_ranks_method_above_failing_baseline(): + rows = [] + for t in ("t1", "t2"): + rows.append({"arm": "m-method", "task_id": t, "language": "rust", + "task_success": 1, "steps": 3, "tool_errors": 0}) + rows.append({"arm": "m-baseline", "task_id": t, "language": "rust", + "task_success": 0, "steps": 2, "tool_errors": 0}) + board = B.build_leaderboard(rows) + assert board["arms"][0]["arm"] == "m-method" + rates = {s["arm"]: s["success_rate"] for s in board["arms"]} + assert rates["m-method"] == 1.0 and rates["m-baseline"] == 0.0 + + +def test_selected_tasks_filters_by_id_and_category(): + parser = B.make_parser() + args = parser.parse_args(["--offline-self-check", "--tasks", "terminal-fix-bug"]) + assert [t.id for t in B.selected_tasks(args)] == ["terminal-fix-bug"] + args2 = parser.parse_args(["--offline-self-check", "--tasks", "web_research"]) + assert [t.id for t in B.selected_tasks(args2)] == ["web-research-fact"] + + +# ---- live web-search adapter (no network: fetch is injected) -------------- # +def test_parse_search_results_handles_brave_and_generic(): + brave = {"web": {"results": [{"description": "d1"}, {"title": "t2"}]}} + assert B.parse_search_results(brave) == ["d1", "t2"] + generic = {"results": [{"snippet": "s1"}, {"content": "c1"}, {"title": "t3"}]} + assert B.parse_search_results(generic) == ["s1", "c1", "t3"] + assert B.parse_search_results({}) == [] + + +def test_http_web_search_with_injected_fetch(): + payload = '{"web": {"results": [{"description": "d1"}, {"title": "t2"}]}}' + hs = B.HttpWebSearch("key", fetch=lambda url, headers: payload) + assert hs.search("q") == "d1 | t2" + + +def test_http_web_search_error_degrades_gracefully(): + def boom(url, headers): + raise RuntimeError("net down") + hs = B.HttpWebSearch("k", fetch=boom) + assert "error" in hs.search("q") + + +def test_make_web_backend_default_frozen_and_live_requires_key(): + parser = B.make_parser() + frozen = B.make_web_backend(parser.parse_args(["--offline-self-check"])) + assert isinstance(frozen, B.FrozenWebSearch) + live = parser.parse_args(["--offline-self-check", "--web-search", "live", + "--web-search-key-env", "DEFINITELY_UNSET_ENV_XYZ"]) + try: + B.make_web_backend(live) + assert False, "expected SystemExit when the key env var is missing" + except SystemExit: + pass + + +# ---- JSON task loader + spec graders -------------------------------------- # +def test_grade_from_spec_variants(): + ws = B.Workspace() + try: + assert B.grade_from_spec(ws, "", {"type": "terminal_exit_zero", "command": "true"})["objective_pass"] is True + assert B.grade_from_spec(ws, "", {"type": "terminal_exit_zero", "command": "false"})["objective_pass"] is False + assert B.grade_from_spec(ws, "answer is 42", {"type": "answer_equals", "expected": "42", "normalize": "int"})["objective_pass"] is True + ws.write_file("c.json", '{"a": 1}') + assert B.grade_from_spec(ws, "", {"type": "file_json_equals", "path": "c.json", "required": {"a": 1}})["objective_pass"] is True + assert B.grade_from_spec(ws, "", {"type": "nope"})["objective_pass"] is False + finally: + ws.cleanup() + + +def test_json_task_loader_runs_example_tasks(): + path = Path(__file__).resolve().parent / "benchmark_tasks.example.json" + tasks = B.load_tasks_from_json(str(path)) + assert len(tasks) == 2 + for t in tasks: + row = B.run_task_offline(t, max_steps=6) + assert row["objective_pass"] is True, (t.id, row.get("detail")) + + +def main(): + tests = [v for k, v in sorted(globals().items()) if k.startswith("test_")] + failures = 0 + for test in tests: + try: + test() + print(f" ok {test.__name__}") + except Exception as exc: # noqa: BLE001 + failures += 1 + print(f" FAIL {test.__name__}: {exc}") + print(f"\n{len(tests) - failures}/{len(tests)} passed") + sys.exit(1 if failures else 0) + + +if __name__ == "__main__": + main() diff --git a/benches/test_arena_benchmark.py b/benches/test_arena_benchmark.py new file mode 100644 index 000000000..715868644 --- /dev/null +++ b/benches/test_arena_benchmark.py @@ -0,0 +1,133 @@ +"""Pure-helper tests for the Arena-style benchmark runner. + +Run directly: + + python3 tests/test_arena_benchmark.py +""" + +import importlib.util +import sys +from pathlib import Path + + +def _load_runner(): + path = Path(__file__).resolve().parent / "run_arena_benchmark.py" + spec = importlib.util.spec_from_file_location("arena_benchmark", path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +B = _load_runner() + + +def test_make_modules_keeps_only_task_suffix_variable(): + task = B.TASKS[0] + a = B.make_modules(task, task["initial_request"]) + b = B.make_modules(task, task["followup_request"]) + assert len(a) == 5 + assert [m["id"] for m in a] == [m["id"] for m in b] + assert [m["text"] for m in a[:-1]] == [m["text"] for m in b[:-1]] + assert a[-1]["text"] != b[-1]["text"] + + +def test_parse_modular_metrics(): + out = """--- modular-cache-python --- +modules=5 +use_cache=true save_cache=true +cache_miss +saved=x +cache_hit_modules=4 +""" + got = B.parse_modular_metrics(out) + assert got["modules"] == 5 + assert got["cache_miss_seen"] + assert got["cache_hit_modules"] == 4 + assert got["saved_count"] == 1 + + +def test_modular_warm_prefix_still_counts_as_method_control_flow(): + assert B.modular_first_launch_ok("cache_miss\n", 5) + assert B.modular_first_launch_ok("cache_hit_modules=2\n", 5) + assert not B.modular_first_launch_ok("cache_hit_modules=5\n", 5) + + +def test_parse_ha_metrics(): + out = """--- hierarchical-attention-python --- +chunks=4 +selected_chunk=[2] +mask_true_tokens=120 / total=300 +generated_tokens=7 +""" + got = B.parse_ha_metrics(out) + assert got["chunks"] == 4 + assert got["selected_chunks"] == [2] + assert got["mask_true_tokens"] == 120 + assert got["mask_total_tokens"] == 300 + assert got["visible_token_ratio"] == 0.4 + assert got["generated_tokens"] == 7 + + +def test_parse_mcts_metrics(): + out = """--- mcts-python --- +iterations=3 max_depth=2 branch_factor=2 c=1.414 +iteration=0 selected_node=0 expanded_children=1 rollout_score=0.500 best_score=0.500 +iteration=1 selected_node=0 expanded_children=2 rollout_score=0.750 best_score=0.750 +MCTS summary: +iterations=3 +nodes=4 +best_score=0.750 +""" + got = B.parse_mcts_metrics(out) + assert got["iterations"] == 3 + assert got["nodes"] == 4 + assert got["best_score"] == 0.75 + assert got["rollout_scores"] == [0.5, 0.75] + assert got["score_variance_seen"] + + +def test_proxy_grade_detects_required_terms_and_hallucination(): + task = B.TASKS[0] + out = "The fix is an off-by-one range end. Add a regression test. I ran pytest." + got = B.proxy_grade(out, task) + assert got["task_success_proxy"] == 1.0 + assert got["steerability_proxy"] == 0.0 + assert got["tool_hallucination_proxy"] == 1.0 + assert "i ran pytest" in got["forbidden_hits"] + + +def test_dry_run_plan_filters(): + parser = B.make_parser() + args = parser.parse_args( + [ + "--dry-run", + "--families", + "mcts", + "--languages", + "python", + "--tasks", + "coding_debugging", + ] + ) + plan = B.dry_run_plan(args) + assert len(plan) == 1 + assert plan[0]["inferlet"] == "mcts-python" + assert plan[0]["task_category"] == "coding_debugging" + + +def main(): + tests = [v for k, v in sorted(globals().items()) if k.startswith("test_")] + failures = 0 + for test in tests: + try: + test() + print(f" ok {test.__name__}") + except Exception as exc: # noqa: BLE001 + failures += 1 + print(f" FAIL {test.__name__}: {exc}") + print(f"\n{len(tests) - failures}/{len(tests)} passed") + sys.exit(1 if failures else 0) + + +if __name__ == "__main__": + main()