Files
hermes-webui/api/runner_client.py

157 lines
6.6 KiB
Python

"""HTTP client boundary for a supervised Hermes WebUI runner backend.
This module intentionally contains no process-local run maps, stream queues,
cancellation registries, approval/clarify queues, or cached agent instances. It
is only a JSON-over-HTTP transport used by ``RunnerRuntimeAdapter`` when an
operator explicitly configures a runner endpoint.
"""
from __future__ import annotations
import json
import os
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
_RUNNER_BASE_URL_ENV = "HERMES_WEBUI_RUNNER_BASE_URL"
_RUNNER_API_KEY_ENV = "HERMES_WEBUI_RUNNER_API_KEY"
class RunnerClientError(RuntimeError):
"""Raised when a configured runner endpoint rejects or fails a request."""
def runner_client_configured(environ: dict[str, str] | None = None) -> bool:
source = os.environ if environ is None else environ
return bool(str(source.get(_RUNNER_BASE_URL_ENV) or "").strip())
class HttpRunnerClient:
"""Small JSON HTTP client for the external/supervised runner boundary."""
def __init__(self, *, base_url: str, api_key: str = ""):
self.base_url = str(base_url or "").strip().rstrip("/")
if not self.base_url:
raise ValueError("runner base_url is required")
# Hardening: the runner endpoint is operator-configured, but reject any
# non-HTTP(S) scheme so a misconfigured HERMES_WEBUI_RUNNER_BASE_URL
# (e.g. file:///etc/passwd or ftp://) can never be handed to urlopen.
_scheme = urllib.parse.urlsplit(self.base_url).scheme.lower()
if _scheme not in ("http", "https"):
raise ValueError(
f"runner base_url must be http(s); got scheme '{_scheme or '(none)'}'"
)
self.api_key = str(api_key or "").strip()
@classmethod
def from_env(cls, environ: dict[str, str] | None = None) -> "HttpRunnerClient":
source = os.environ if environ is None else environ
base_url = str(source.get(_RUNNER_BASE_URL_ENV) or "").strip()
if not base_url:
raise NotImplementedError("runner-local chat backend is not configured")
return cls(base_url=base_url, api_key=str(source.get(_RUNNER_API_KEY_ENV) or ""))
def start_run(self, request) -> dict[str, Any]:
return self._post("/v1/runs", {
"session_id": request.session_id,
"message": request.message,
"attachments": list(request.attachments or []),
"workspace": request.workspace,
"profile": request.profile,
"provider": request.provider,
"model": request.model,
"toolsets": list(request.toolsets or []),
"source": request.source,
"metadata": dict(request.metadata or {}),
})
def observe_run(self, run_id: str, *, cursor: str | None = None) -> dict[str, Any]:
query = ""
if cursor not in (None, ""):
query = "?cursor=" + urllib.parse.quote(str(cursor), safe="")
return self._get(f"/v1/runs/{urllib.parse.quote(str(run_id), safe='')}/events{query}")
def get_run(self, run_id: str) -> dict[str, Any]:
return self._get(f"/v1/runs/{urllib.parse.quote(str(run_id), safe='')}")
def cancel_run(self, run_id: str) -> dict[str, Any]:
return self._post(f"/v1/runs/{urllib.parse.quote(str(run_id), safe='')}/cancel", {})
def respond_approval(self, run_id: str, approval_id: str, choice: str) -> dict[str, Any]:
return self._post(
f"/v1/runs/{urllib.parse.quote(str(run_id), safe='')}/approval",
{"choice": choice, "approval_id": approval_id},
)
def respond_clarify(self, run_id: str, clarify_id: str, response: str) -> dict[str, Any]:
return self._post(
f"/v1/runs/{urllib.parse.quote(str(run_id), safe='')}/clarifications/{urllib.parse.quote(str(clarify_id), safe='')}/respond",
{"response": response},
)
def queue_message(self, run_id: str, message: str, *, mode: str = "queue") -> dict[str, Any]:
return self._post(
f"/v1/runs/{urllib.parse.quote(str(run_id), safe='')}/messages",
{"message": message, "mode": mode},
)
def update_goal(self, session_id: str, action: str, text: str = "") -> dict[str, Any]:
return self._post(
f"/v1/sessions/{urllib.parse.quote(str(session_id), safe='')}/goal",
{"action": action, "text": text},
)
def _headers(self) -> dict[str, str]:
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": "Hermes-WebUI-RunnerClient",
}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
return headers
def _get(self, path: str) -> dict[str, Any]:
req = urllib.request.Request(self.base_url + path, headers=self._headers(), method="GET")
return self._request_json(req)
def _post(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
req = urllib.request.Request(
self.base_url + path,
data=json.dumps(payload).encode("utf-8"),
headers=self._headers(),
method="POST",
)
return self._request_json(req)
def _opener(self) -> urllib.request.OpenerDirector:
# Hardening: do NOT follow redirects. A misbehaving/compromised runner
# returning 3xx Location could otherwise smuggle the Bearer token to
# another host. Treat any redirect as an error instead.
class _NoRedirect(urllib.request.HTTPRedirectHandler):
def redirect_request(self, *args, **kwargs):
return None
return urllib.request.build_opener(_NoRedirect)
def _request_json(self, req: urllib.request.Request) -> dict[str, Any]:
try:
with self._opener().open(req, timeout=60) as resp:
raw = resp.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as exc:
try:
detail = exc.read(2048).decode("utf-8", errors="replace")
except Exception:
detail = ""
raise RunnerClientError(f"Runner returned HTTP {exc.code}: {detail[:500]}") from exc
except Exception as exc:
raise RunnerClientError(f"Runner request failed: {exc}") from exc
try:
payload = json.loads(raw or "{}")
except json.JSONDecodeError as exc:
raise RunnerClientError("Runner returned invalid JSON") from exc
if not isinstance(payload, dict):
raise RunnerClientError("Runner returned a non-object JSON payload")
return payload