Files
hermes-webui/tests/test_runtime_adapter_seam.py
T
2026-05-20 04:05:36 -07:00

538 lines
24 KiB
Python

import importlib
import queue
from tests.conftest import requires_agent_modules
def test_runtime_adapter_interface_and_legacy_journal_methods_exist():
runtime = importlib.import_module("api.runtime_adapter")
required = (
"start_run",
"observe_run",
"get_run",
"cancel_run",
"respond_approval",
"respond_clarify",
"queue_message",
"update_goal",
)
for name in required:
assert hasattr(runtime.RuntimeAdapter, name)
assert hasattr(runtime.LegacyJournalRuntimeAdapter, name)
assert hasattr(runtime.RunnerRuntimeAdapter, name)
assert runtime.runtime_adapter_mode({}) == "legacy-direct"
assert runtime.runtime_adapter_enabled({}) is False
assert runtime.runtime_adapter_mode({"HERMES_WEBUI_RUNTIME_ADAPTER": "legacy-journal"}) == "legacy-journal"
assert runtime.runtime_adapter_enabled({"HERMES_WEBUI_RUNTIME_ADAPTER": "legacy-journal"}) is True
assert runtime.runtime_adapter_mode({"HERMES_WEBUI_RUNTIME_ADAPTER": "sidecar"}) == "legacy-direct"
def test_legacy_journal_adapter_start_run_delegates_without_owning_runtime_state():
runtime = importlib.import_module("api.runtime_adapter")
calls = []
def start_delegate(request):
calls.append(request)
return {
"stream_id": "stream-123",
"session_id": request.session_id,
"status": "started",
"active_controls": ["cancel"],
}
adapter = runtime.LegacyJournalRuntimeAdapter(start_run_delegate=start_delegate)
request = runtime.StartRunRequest(
session_id="s1",
message="hello",
attachments=[{"name": "a.txt"}],
workspace="/tmp/work",
profile="default",
provider="openai-codex",
model="gpt-5.5",
toolsets=["terminal"],
source="webui",
metadata={"k": "v"},
)
result = adapter.start_run(request)
assert calls == [request]
assert result.session_id == "s1"
assert result.stream_id == "stream-123"
assert result.run_id == "stream-123"
assert result.status == "started"
assert result.active_controls == ["cancel"]
def test_legacy_journal_adapter_observe_and_get_run_use_journal_and_live_state(tmp_path):
runtime = importlib.import_module("api.runtime_adapter")
run_journal = importlib.import_module("api.run_journal")
run_journal.append_run_event("s1", "r1", "token", {"text": "a"}, session_dir=tmp_path)
run_journal.append_run_event("s1", "r1", "done", {"ok": True}, session_dir=tmp_path)
adapter = runtime.LegacyJournalRuntimeAdapter(
session_dir=tmp_path,
live_stream_lookup=lambda run_id: run_id == "live-run",
)
replay = adapter.observe_run("r1", cursor="0")
assert [event["type"] for event in replay.events] == ["token", "done"]
assert replay.last_event_id == "r1:2"
completed = adapter.get_run("r1")
assert completed.run_id == "r1"
assert completed.session_id == "s1"
assert completed.status == "completed"
assert completed.terminal_state == "completed"
assert completed.last_event_id == "r1:2"
live = adapter.get_run("live-run")
assert live.run_id == "live-run"
assert live.status == "running"
assert live.active_controls == ["cancel"]
def test_legacy_journal_adapter_controls_delegate_to_existing_handlers():
runtime = importlib.import_module("api.runtime_adapter")
calls = []
adapter = runtime.LegacyJournalRuntimeAdapter(
cancel_delegate=lambda run_id: calls.append(("cancel", run_id)) or True,
approval_delegate=lambda run_id, approval_id, choice: calls.append(("approval", run_id, approval_id, choice)) or True,
clarify_delegate=lambda run_id, clarify_id, response: calls.append(("clarify", run_id, clarify_id, response)) or True,
)
assert adapter.cancel_run("r1").accepted is True
assert adapter.respond_approval("r1", "a1", "once").accepted is True
assert adapter.respond_clarify("r1", "c1", "answer").accepted is True
assert calls == [
("cancel", "r1"),
("approval", "r1", "a1", "once"),
("clarify", "r1", "c1", "answer"),
]
def test_legacy_journal_adapter_queue_and_goal_delegate_without_owning_runtime_state():
runtime = importlib.import_module("api.runtime_adapter")
calls = []
adapter = runtime.LegacyJournalRuntimeAdapter(
queue_delegate=lambda run_id, message, mode: calls.append(("queue", run_id, message, mode)) or True,
goal_delegate=lambda session_id, action, text: calls.append(("goal", session_id, action, text)) or {
"ok": True,
"action": action,
"message": "Goal updated.",
},
)
queued = adapter.queue_message("r1", "follow up", mode="queue")
goal = adapter.update_goal("s1", "set", "finish the task")
assert queued.accepted is True
assert goal.accepted is True
assert goal.payload["action"] == "set"
assert calls == [
("queue", "r1", "follow up", "queue"),
("goal", "s1", "set", "finish the task"),
]
def test_legacy_journal_adapter_cancel_returns_bounded_not_active_status():
runtime = importlib.import_module("api.runtime_adapter")
calls = []
adapter = runtime.LegacyJournalRuntimeAdapter(
cancel_delegate=lambda run_id: calls.append(run_id) or False,
)
result = adapter.cancel_run("already-finished-run")
assert calls == ["already-finished-run"]
assert result.accepted is False
assert result.status == "not-active"
assert result.safe_message == "Legacy control did not accept the request."
def test_legacy_journal_adapter_approval_and_clarify_return_bounded_not_active_status():
runtime = importlib.import_module("api.runtime_adapter")
calls = []
adapter = runtime.LegacyJournalRuntimeAdapter(
approval_delegate=lambda run_id, approval_id, choice: calls.append(("approval", run_id, approval_id, choice)) or False,
clarify_delegate=lambda run_id, clarify_id, response: calls.append(("clarify", run_id, clarify_id, response)) or False,
)
approval = adapter.respond_approval("already-finished-run", "stale-approval", "deny")
clarify = adapter.respond_clarify("already-finished-run", "stale-clarify", "answer")
assert calls == [
("approval", "already-finished-run", "stale-approval", "deny"),
("clarify", "already-finished-run", "stale-clarify", "answer"),
]
assert approval.accepted is False
assert approval.status == "not-active"
assert clarify.accepted is False
assert clarify.status == "not-active"
def test_legacy_journal_adapter_queue_and_goal_return_bounded_statuses():
runtime = importlib.import_module("api.runtime_adapter")
adapter = runtime.LegacyJournalRuntimeAdapter(
queue_delegate=lambda run_id, message, mode: False,
goal_delegate=lambda session_id, action, text: {
"ok": False,
"action": action,
"error": "agent_running",
"message": "Agent is running.",
},
)
queued = adapter.queue_message("already-finished-run", "follow up")
goal = adapter.update_goal("s1", "set", "new goal")
assert queued.accepted is False
assert queued.status == "not-active"
assert goal.accepted is False
assert goal.status == "set"
assert goal.safe_message == "Agent is running."
assert goal.payload["error"] == "agent_running"
def test_chat_cancel_route_uses_adapter_only_when_flag_enabled():
routes = importlib.import_module("api.routes")
src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
cancel_idx = src.index('if parsed.path == "/api/chat/cancel":')
cancel_body = src[cancel_idx:src.index('if parsed.path == "/api/chat/stream":', cancel_idx)]
assert "runtime_adapter_enabled()" in cancel_body
assert "LegacyJournalRuntimeAdapter(cancel_delegate=cancel_stream)" in cancel_body
assert "adapter.cancel_run(stream_id).accepted" in cancel_body
assert "else:\n cancelled = cancel_stream(stream_id)" in cancel_body
assert "HERMES_WEBUI_RUNTIME_ADAPTER" not in cancel_body, "route should use runtime_adapter_enabled(), not inline env checks"
def test_approval_and_clarify_routes_use_adapter_only_when_flag_enabled():
routes = importlib.import_module("api.routes")
src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
approval_idx = src.index("def _handle_approval_respond")
approval_body = src[approval_idx:src.index("def _resolve_clarify_legacy", approval_idx)]
clarify_idx = src.index("def _handle_clarify_respond")
clarify_body = src[clarify_idx:src.index("class _ManualCompressionMemoryHandler", clarify_idx)]
assert "runtime_adapter_enabled()" in approval_body
assert "LegacyJournalRuntimeAdapter(approval_delegate=_resolve_approval_legacy)" in approval_body
assert "adapter.respond_approval(sid, approval_id, choice).accepted" in approval_body
assert "else:\n ok = _resolve_approval_legacy(sid, approval_id, choice)" in approval_body
assert "HERMES_WEBUI_RUNTIME_ADAPTER" not in approval_body
assert "runtime_adapter_enabled()" in clarify_body
assert "LegacyJournalRuntimeAdapter(clarify_delegate=_resolve_clarify_legacy)" in clarify_body
assert "adapter.respond_clarify(sid, clarify_id, response).accepted" in clarify_body
assert "else:\n ok = _resolve_clarify_legacy(sid, clarify_id, response)" in clarify_body
assert "HERMES_WEBUI_RUNTIME_ADAPTER" not in clarify_body
def test_goal_route_uses_adapter_only_when_flag_enabled():
routes = importlib.import_module("api.routes")
src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
goal_idx = src.index("def _handle_goal_command")
goal_body = src[goal_idx:src.index("def _handle_chat_start", goal_idx)]
assert "runtime_adapter_enabled()" in goal_body
assert "LegacyJournalRuntimeAdapter(goal_delegate=_legacy_goal_update)" in goal_body
assert "goal_adapter_action = _runtime_adapter_goal_action(goal_args)" in goal_body
assert "adapter.update_goal(" in goal_body
assert "goal_adapter_action," in goal_body
assert "payload = dict(control_result.payload)" in goal_body
assert "else:\n payload = _legacy_goal_update" in goal_body
assert "HERMES_WEBUI_RUNTIME_ADAPTER" not in goal_body
def test_goal_adapter_action_is_bounded_to_slice3c_actions():
routes = importlib.import_module("api.routes")
assert routes._runtime_adapter_goal_action("") == "status"
assert routes._runtime_adapter_goal_action("status") == "status"
assert routes._runtime_adapter_goal_action("pause") == "pause"
assert routes._runtime_adapter_goal_action("resume") == "resume"
assert routes._runtime_adapter_goal_action("clear") == "clear"
assert routes._runtime_adapter_goal_action("stop") == "clear"
assert routes._runtime_adapter_goal_action("done") == "clear"
assert routes._runtime_adapter_goal_action("ship #1925") == "set"
def test_approval_respond_does_not_fallback_to_oldest_when_explicit_id_is_stale():
routes = importlib.import_module("api.routes")
src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
helper_idx = src.index("def _resolve_approval_legacy")
helper_body = src[helper_idx:src.index("def _handle_approval_respond", helper_idx)]
assert "A stale explicit id must not accidentally approve" in helper_body
assert "if found_target or not approval_id:" in helper_body
stale_branch = helper_body[helper_body.index("else:", helper_body.index("for i, entry")):helper_body.index("else:\n pending = queue.pop(0)")]
assert "pending = None" in stale_branch
assert "queue.pop(0)" not in stale_branch
def test_approval_respond_peeks_gateway_queues_when_pending_empty() -> None:
"""When _pending has no matching entry but _gateway_queues does, the
helper should extract pattern_keys from the gateway queue and call
approve_session even though pending is None.
"""
routes = importlib.import_module("api.routes")
src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
helper_idx = src.index("def _resolve_approval_legacy")
helper_body = src[helper_idx:src.index("def _handle_approval_respond", helper_idx)]
assert "_gateway_queues" in helper_body, (
"_resolve_approval_legacy must reference _gateway_queues "
"to read pattern_keys when _pending is empty"
)
assert "gateway_keys" in helper_body, (
"Must extract pattern_keys from _gateway_queues into a gateway_keys variable"
)
assert "approve_session" in helper_body[helper_body.index("all_keys"):], (
"Must call approve_session for keys extracted from _gateway_queues"
)
@requires_agent_modules
def test_approval_respond_approves_from_gateway_queues_when_pending_empty() -> None:
"""Verify _resolve_approval_legacy peeks into _gateway_queues for
pattern_keys when _pending has no matching entry, and calls
approve_session() even though pending is None (the real streaming case).
"""
import threading
from api.routes import _resolve_approval_legacy
routes = importlib.import_module("api.routes")
approval_mod = importlib.import_module("tools.approval")
test_sid = "__test_gateway_approval_sid__"
test_key = "__test_pattern_key__"
# 1. Ensure _pending is empty for this sid
with approval_mod._lock:
approval_mod._pending.pop(test_sid, None)
# 2. Populate _gateway_queues with a real entry
entry = approval_mod._ApprovalEntry({
"command": "test_cmd",
"pattern_key": test_key,
"pattern_keys": [test_key],
"description": "test dangerous cmd",
})
with approval_mod._lock:
approval_mod._gateway_queues.setdefault(test_sid, []).append(entry)
try:
# 3. Run the helper with empty _pending but populated _gateway_queues
result = _resolve_approval_legacy(test_sid, "", "session")
# 4. Verify approve_session was called (is_approved must return True)
assert approval_mod.is_approved(test_sid, test_key), (
"approve_session should have been called for the pattern_key "
"extracted from _gateway_queues"
)
assert result is True, (
"_resolve_approval_legacy should return True when it finds "
"and resolves the gateway entry"
)
finally:
# 5. Cleanup
with approval_mod._lock:
approval_mod._gateway_queues.pop(test_sid, None)
approval_mod._session_approved.pop(test_sid, None)
approval_mod._pending.pop(test_sid, None)
def test_chat_start_route_selects_adapter_only_when_flag_enabled():
routes = importlib.import_module("api.routes")
src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
start_idx = src.index("def _handle_chat_start")
start_body = src[start_idx:src.index("def _resolve_chat_workspace_with_recovery", start_idx)]
assert "runtime_adapter_enabled()" in start_body
assert "LegacyJournalRuntimeAdapter" in start_body
assert "_start_chat_stream_for_session(" in start_body
assert "HERMES_WEBUI_RUNTIME_ADAPTER" not in start_body, "route should use runtime_adapter_enabled(), not inline env checks"
def test_chat_start_adapter_path_preserves_legacy_response_shape():
"""The RuntimeAdapter seam must be invisible to /api/chat/start callers.
The adapter can use run_id/status/controls internally, but the flagged
route must not add fields that the legacy-direct response does not expose.
"""
routes = importlib.import_module("api.routes")
src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
start_idx = src.index("def _handle_chat_start")
start_body = src[start_idx:src.index("def _resolve_chat_workspace_with_recovery", start_idx)]
branch_start = start_body.index("if runtime_adapter_enabled():")
branch_end = start_body.index("else:", branch_start)
adapter_branch = start_body[branch_start:branch_end]
assert 'response.setdefault("stream_id", result.stream_id)' in adapter_branch
assert 'response.setdefault("session_id", result.session_id)' in adapter_branch
assert 'response.setdefault("run_id", result.run_id)' not in adapter_branch
assert 'response.setdefault("status", result.status)' not in adapter_branch
assert 'response.setdefault("active_controls", result.active_controls)' not in adapter_branch
def test_rfc_distinguishes_goal_routing_from_queue_route_staging():
routes = importlib.import_module("api.routes")
rfc = (routes.Path(__file__).parent.parent / "docs" / "rfcs" / "hermes-run-adapter-contract.md").read_text(encoding="utf-8")
assert "#2544 shipped the first Slice 3c implementation" in rfc
assert "#2560 shipped the queue-staging clarification" in rfc
assert "route now uses `RuntimeAdapter.update_goal(...)`" in rfc
assert "`queue_message(...)` as a staged protocol method only" in rfc
assert "no new server-side queue endpoint" in rfc
assert "no server-side queue endpoint or queue\n scheduler should be added merely for adapter symmetry" in rfc
def test_rfc_defines_slice4_runner_contract_before_runner_code():
routes = importlib.import_module("api.routes")
rfc = (routes.Path(__file__).parent.parent / "docs" / "rfcs" / "hermes-run-adapter-contract.md").read_text(encoding="utf-8")
assert "#### Slice 4a: Runner contract gate" in rfc
assert "docs/test contract PR before any\nrunner code lands" in rfc
assert "feature-flagged, default-off" in rfc
assert "The runner, not the main WebUI request process, owns" in rfc
assert "restart only\n `hermes-webui.service`" in rfc
assert "profile,\n workspace, attachments, model/provider, toolset, and source metadata" in rfc
assert "no removal of the legacy in-process backend" in rfc
assert "no default-on runner mode" in rfc
assert "#### Slice 4b: Runner adapter client facade" in rfc
assert "Status as of 2026-05-20: shipped in v0.51.94 via #2599" in rfc
assert "delegates to an injected runner client" in rfc
assert "without relying on process-local `STREAMS`" in rfc
def test_rfc_defines_slice4c_runner_backend_harness_gate():
routes = importlib.import_module("api.routes")
rfc = (routes.Path(__file__).parent.parent / "docs" / "rfcs" / "hermes-run-adapter-contract.md").read_text(encoding="utf-8")
assert "#### Slice 4c: Feature-flagged runner backend and restart/reattach harness" in rfc
assert "`HERMES_WEBUI_RUNTIME_ADAPTER=runner-local`" in rfc
assert "`legacy-direct` remains the default" in rfc
assert "No route-shape drift" in rfc
assert "Restart/reattach harness" in rfc
assert "discard the first WebUI adapter instance" in rfc
assert "No runtime-surrogate globals" in rfc
assert "no live chat route switch to the runner backend before the restart/reattach" in rfc
def test_runner_runtime_adapter_passes_explicit_start_payload_without_env_mutation(monkeypatch):
runtime = importlib.import_module("api.runtime_adapter")
captured = []
class FakeRunnerClient:
def start_run(self, request):
captured.append(request)
return {
"run_id": "runner-1",
"session_id": request.session_id,
"stream_id": "runner-1",
"status": "running",
"active_controls": ["cancel", "approval", "clarify", "goal"],
}
before_terminal_cwd = "existing-cwd"
monkeypatch.setenv("TERMINAL_CWD", before_terminal_cwd)
adapter = runtime.RunnerRuntimeAdapter(client=FakeRunnerClient())
request = runtime.StartRunRequest(
session_id="s-runner",
message="hello runner",
attachments=[{"path": "/tmp/a.png", "mime": "image/png"}],
workspace="/workspace/project",
profile="research",
provider="openai-codex",
model="gpt-5.5",
toolsets=["terminal", "file"],
source="webui",
metadata={"route": "/api/chat/start", "csrf_checked": True},
)
result = adapter.start_run(request)
assert captured == [request]
assert captured[0].workspace == "/workspace/project"
assert captured[0].profile == "research"
assert captured[0].attachments == [{"path": "/tmp/a.png", "mime": "image/png"}]
assert captured[0].provider == "openai-codex"
assert captured[0].model == "gpt-5.5"
assert captured[0].toolsets == ["terminal", "file"]
assert result.run_id == "runner-1"
assert result.active_controls == ["cancel", "approval", "clarify", "goal"]
assert runtime.os.environ["TERMINAL_CWD"] == before_terminal_cwd
def test_runner_runtime_adapter_observe_and_get_survive_adapter_recreation():
runtime = importlib.import_module("api.runtime_adapter")
class FakeRunnerClient:
def __init__(self):
self.events = []
self.status = "unknown"
def start_run(self, request):
self.status = "running"
self.events.append({"event_id": "runner-1:1", "seq": 1, "type": "token", "data": {"text": "hi"}})
self.events.append({"event_id": "runner-1:2", "seq": 2, "type": "done", "data": {"ok": True}})
self.status = "completed"
return {"run_id": "runner-1", "session_id": request.session_id, "stream_id": "runner-1", "status": "running"}
def observe_run(self, run_id, *, cursor=None):
after = int(cursor or 0)
return {"run_id": run_id, "events": [e for e in self.events if e["seq"] > after]}
def get_run(self, run_id):
return {
"run_id": run_id,
"session_id": "s-runner",
"status": self.status,
"terminal_state": "completed",
"last_event_id": self.events[-1]["event_id"],
"active_controls": [],
}
shared_runner = FakeRunnerClient()
first_webui_process = runtime.RunnerRuntimeAdapter(client=shared_runner)
first_webui_process.start_run(runtime.StartRunRequest(session_id="s-runner", message="hello"))
restarted_webui_process = runtime.RunnerRuntimeAdapter(client=shared_runner)
replay = restarted_webui_process.observe_run("runner-1", cursor="1")
status = restarted_webui_process.get_run("runner-1")
assert [event["type"] for event in replay.events] == ["done"]
assert replay.cursor == "2"
assert replay.last_event_id == "runner-1:2"
assert status.status == "completed"
assert status.terminal_state == "completed"
assert status.last_event_id == "runner-1:2"
def test_runner_runtime_adapter_controls_are_bounded_and_do_not_use_legacy_state():
runtime = importlib.import_module("api.runtime_adapter")
class FakeRunnerClient:
def cancel_run(self, run_id):
return {"ok": False, "status": "not-active", "message": "Run is not active."}
adapter = runtime.RunnerRuntimeAdapter(client=FakeRunnerClient())
cancel = adapter.cancel_run("finished-run")
approval = adapter.respond_approval("finished-run", "approval-1", "once")
clarify = adapter.respond_clarify("finished-run", "clarify-1", "answer")
queued = adapter.queue_message("finished-run", "next")
goal = adapter.update_goal("s-runner", "status")
assert cancel.accepted is False
assert cancel.status == "not-active"
assert cancel.safe_message == "Run is not active."
for result in (approval, clarify, queued, goal):
assert result.accepted is False
assert result.status == "unsupported"
assert "not supported by this runner backend" in (result.safe_message or "")