Files
hermes-webui/api/turn_journal.py
nesquena-hermes 396d0d0abd Release v0.51.326 — Release KP (#3618 + #3802 + #3762 + #3810) (#3816)
Batch: mic STT capability probe+fallback (#3618, live-drive verified), journal cleanup on delete (#3802), minimal-schema SQL guard (#3762), Help hover readability (#3810). Full suite 8265, Codex SAFE, Opus SHIP, CI 11/11. Co-authored-by: nesquena-hermes <nesquena-hermes@users.noreply.github.com> Co-authored-by: dso2ng <dso2ng@users.noreply.github.com>
2026-06-07 23:18:51 -07:00

282 lines
11 KiB
Python

"""Crash-safe WebUI turn journal helpers.
The journal is deliberately tiny: one JSONL file per session, append-only events,
and read helpers that tolerate malformed lines. Recovery and repair can then
reason about submitted turns without depending on in-memory stream state.
"""
from __future__ import annotations
import json
import os
import re
import time
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import Iterable
try: # pragma: no cover - fcntl is unavailable on Windows.
import fcntl as _fcntl
except ImportError: # pragma: no cover
_fcntl = None
TURN_JOURNAL_DIR_NAME = "_turn_journal"
_TERMINAL_EVENTS = {"completed", "interrupted"}
_SESSION_ID_RE = re.compile(r"^[A-Za-z0-9_.-]+$")
def _default_session_dir() -> Path:
from api.models import SESSION_DIR
return Path(SESSION_DIR)
def _journal_path(session_id: str, session_dir: Path | None = None) -> Path:
sid = str(session_id or "").strip()
if not sid or "/" in sid or "\\" in sid or not _SESSION_ID_RE.fullmatch(sid):
raise ValueError("invalid session_id")
root = Path(session_dir) if session_dir is not None else _default_session_dir()
return root / TURN_JOURNAL_DIR_NAME / f"{sid}~{os.getpid()}.jsonl"
def _make_turn_id() -> str:
return f"{time.strftime('%Y%m%dT%H%M%SZ', time.gmtime())}-{uuid.uuid4().hex[:12]}"
@contextmanager
def _journal_file_lock(file_obj):
"""Serialize multi-process journal writes when advisory locks exist.
``O_APPEND`` keeps normal same-process appends simple, but a long JSONL event
can exceed POSIX's small atomic-write boundary. On Unix, take an advisory
lock around the single event write+fsync so two WebUI worker processes cannot
interleave large submitted-message payloads into corrupted JSONL. Platforms
without ``fcntl`` keep the previous best-effort append behavior.
"""
if _fcntl is None:
yield
return
_fcntl.flock(file_obj.fileno(), _fcntl.LOCK_EX)
try:
yield
finally:
_fcntl.flock(file_obj.fileno(), _fcntl.LOCK_UN)
def append_turn_journal_event(
session_id: str,
event: dict,
*,
session_dir: Path | None = None,
) -> dict:
"""Append one turn journal event and fsync it before returning.
The returned event is the exact payload written, with default ``version``,
``session_id``, ``turn_id``, and ``created_at`` fields filled in.
"""
if not isinstance(event, dict):
raise TypeError("event must be a dict")
event_name = str(event.get("event") or "").strip()
if not event_name:
raise ValueError("event is required")
payload = dict(event)
payload.setdefault("version", 1)
payload["session_id"] = str(session_id)
payload.setdefault("turn_id", _make_turn_id())
payload.setdefault("created_at", time.time())
if event_name in _TERMINAL_EVENTS:
payload.setdefault("terminal", True)
path = _journal_path(session_id, session_dir=session_dir)
path.parent.mkdir(parents=True, exist_ok=True)
line = json.dumps(payload, ensure_ascii=False, separators=(",", ":")) + "\n"
fd = os.open(path, os.O_CREAT | os.O_APPEND | os.O_WRONLY, 0o600)
with os.fdopen(fd, "a", encoding="utf-8") as fh:
with _journal_file_lock(fh):
fh.write(line)
fh.flush()
os.fsync(fh.fileno())
o_directory = getattr(os, "O_DIRECTORY", None)
if o_directory is not None:
try:
dir_fd = os.open(path.parent, o_directory)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
except OSError:
pass
return payload
def read_turn_journal(session_id: str, *, session_dir: Path | None = None) -> dict:
"""Read a session journal, merging all pid-scoped shards and returning valid events plus malformed lines."""
sid = str(session_id or "").strip()
if not sid or "/" in sid or "\\" in sid or not _SESSION_ID_RE.fullmatch(sid):
raise ValueError("invalid session_id")
root = Path(session_dir) if session_dir is not None else _default_session_dir()
journal_dir = root / TURN_JOURNAL_DIR_NAME
events: list[dict] = []
malformed: list[dict] = []
# Collect pid-scoped shards ({sid}~{pid}.jsonl) plus legacy ({sid}.jsonl).
# The ~ separator cannot appear in session IDs (_SESSION_ID_RE allows only [A-Za-z0-9_.-]),
# so the glob is unambiguous even for dotted-numeric session IDs like "sess.123".
shards: list[Path] = list(journal_dir.glob(f"{sid}~*.jsonl")) if journal_dir.exists() else []
legacy = journal_dir / f"{sid}.jsonl"
if legacy.exists():
shards.append(legacy)
if not shards:
return {"session_id": str(session_id), "events": [], "malformed": []}
for shard in shards:
try:
lines = shard.read_text(encoding="utf-8").splitlines()
except FileNotFoundError:
continue
for line_no, raw in enumerate(lines, start=1):
if not raw.strip():
continue
try:
event = json.loads(raw)
except json.JSONDecodeError:
malformed.append({"line": line_no, "raw": raw, "shard": shard.name})
continue
if isinstance(event, dict):
events.append(event)
else:
malformed.append({"line": line_no, "raw": raw, "shard": shard.name})
def _safe_ts(e):
try:
return float(e.get("created_at") or 0)
except (ValueError, TypeError):
return 0.0
events.sort(key=_safe_ts)
return {"session_id": str(session_id), "events": events, "malformed": malformed}
def derive_turn_journal_states(events: Iterable[dict]) -> tuple[dict[str, dict], list[dict]]:
'''Return the latest event per ``turn_id`` and any terminal-collision entries.
The first element is the latest event per turn_id (same overwrite-by-timestamp
behaviour as before). The second element is a list of collision records, one
per turn_id that had more than one terminal event. Each collision record
contains ``turn_id`` and the ``events`` list (in ascending created_at order).
A collision means the same logical turn recorded both ``completed`` and
``interrupted`` terminal events -- the derived state still picks the latest
by timestamp, but callers can now detect and audit the double-terminal
situation explicitly rather than having it silently collapse.
'''
states: dict[str, dict] = {}
# Collect all terminal events per turn_id to detect collisions
terminal_events: dict[str, list[dict]] = {}
for event in events:
if not isinstance(event, dict):
continue
turn_id = str(event.get('turn_id') or '').strip()
if not turn_id:
continue
# Track terminal events for collision detection
if is_terminal_turn_event(event):
terminal_events.setdefault(turn_id, []).append(event)
# Existing latest-by-timestamp derivation
previous = states.get(turn_id)
if previous is None or float(event.get('created_at') or 0) >= float(previous.get('created_at') or 0):
states[turn_id] = event
# Build collision list: turn_ids with more than one terminal event
collisions = [
{'turn_id': tid, 'events': sorted(evts, key=lambda e: float(e.get('created_at') or 0))}
for tid, evts in terminal_events.items()
if len(evts) > 1
]
return states, collisions
def _latest_turn_id_for_stream(events: Iterable[dict], stream_id: str) -> str | None:
stream = str(stream_id or "").strip()
if not stream:
return None
latest: str | None = None
for event in events:
if not isinstance(event, dict):
continue
if str(event.get("stream_id") or "") != stream:
continue
turn_id = str(event.get("turn_id") or "").strip()
if turn_id:
latest = turn_id
return latest
def append_turn_journal_event_for_stream(
session_id: str,
stream_id: str,
event: dict,
*,
session_dir: Path | None = None,
) -> dict:
"""Append a lifecycle event for the turn associated with ``stream_id``."""
payload = dict(event)
payload["stream_id"] = str(stream_id)
if not payload.get("turn_id"):
journal = read_turn_journal(session_id, session_dir=session_dir)
turn_id = _latest_turn_id_for_stream(journal.get("events") or [], stream_id)
if turn_id:
payload["turn_id"] = turn_id
return append_turn_journal_event(session_id, payload, session_dir=session_dir)
def iter_turn_journal_session_ids(session_dir: Path) -> list[str]:
journal_dir = Path(session_dir) / TURN_JOURNAL_DIR_NAME
if not journal_dir.exists():
return []
session_ids: set[str] = set()
for path in journal_dir.glob("*.jsonl"):
if not path.is_file():
continue
stem = path.stem # e.g. "sid-1~12345" or "sid-1"
tilde = stem.find("~")
if tilde > 0:
session_ids.add(stem[:tilde])
else:
session_ids.add(stem)
return sorted(session_ids)
def delete_turn_journal(session_id: str, *, session_dir: Path | None = None) -> int:
"""Remove every turn-journal shard for ``session_id``.
Deletes both the pid-scoped shards (``{sid}~{pid}.jsonl``) written by
:func:`append_turn_journal_event` and the legacy single-file form
(``{sid}.jsonl``) that :func:`read_turn_journal` still merges. Returns the
number of files removed. Invalid/empty ids and a missing journal directory
are treated as a no-op so callers can invoke this unconditionally on delete.
"""
sid = str(session_id or "").strip()
# Reject "."/".." for parity with delete_run_journal — the regex permits
# dots, and a traversal id has no legitimate use here.
if sid in (".", "..") or not sid or "/" in sid or "\\" in sid or not _SESSION_ID_RE.fullmatch(sid):
return 0
root = Path(session_dir) if session_dir is not None else _default_session_dir()
journal_dir = root / TURN_JOURNAL_DIR_NAME
if not journal_dir.exists():
return 0
removed = 0
shards = list(journal_dir.glob(f"{sid}~*.jsonl"))
legacy = journal_dir / f"{sid}.jsonl"
if legacy.exists():
shards.append(legacy)
for shard in shards:
try:
shard.unlink()
removed += 1
except FileNotFoundError:
pass
except OSError:
# Best-effort cleanup; the caller logs the overall delete outcome.
pass
return removed
def is_terminal_turn_event(event: dict) -> bool:
return str((event or {}).get("event") or "") in _TERMINAL_EVENTS