Merge pull request #2043 from nesquena/stage-335

Release Q — v0.51.41 — 3-PR contributor batch (session recovery audit + run-lifecycle health + transcript dedup)
This commit is contained in:
nesquena-hermes
2026-05-10 17:52:39 -07:00
committed by GitHub
11 changed files with 649 additions and 19 deletions
+29 -1
View File
@@ -2,6 +2,35 @@
## [Unreleased]
## [v0.51.41] — 2026-05-11 — Release Q (3-PR contributor batch — session recovery audit + run-lifecycle health + transcript dedup)
### Fixed
- **PR #2035** by @ai-ag2026 — Recover orphaned `<sid>.json.bak` snapshots on startup (extends #1558 P0 fix). The existing post-#1558 recovery path only scanned `*.json`, so a crash that left only the `.bak` snapshot meant data was on disk but invisible to `/api/sessions` and the sidebar. Now the startup self-heal looks up the orphan `sid` in `state.db.sessions`; if the row exists, the snapshot is restored, the session index rebuilt, and the live sidecar appears again. If `state.db` lacks the row (explicit tombstone), the orphan is left alone. Companion change in `api/routes.py` unlinks `<sid>.json.bak` on explicit delete so intentional deletes don't get resurrected later. Fail-open on `state.db` unreadable/locked/older-schema — recovery stays best-effort.
- **PR #2036** by @ai-ag2026 — Read-only `audit_session_recovery()` report + module CLI (`python -m api.session_recovery --audit --session-dir <dir> [--state-db <db>]`). Classifies shrunken live sidecars, orphan backups, orphans without a `state.db` row, and stale `_index.json` entries. Pure read-only audit — no writes, no rebuilds, no restores. Outputs machine-readable JSON. Stacked on #2035 (and auto-closed it).
- **PR #2038** by @franksong2702 — Closed the message-identity dedup gap in `/api/session` messaging transcript merges (closes #2027). The dedup key now prefers `id`/`message_id` when message identity is available; legacy role/content/timestamp/tool-metadata key remains as fallback for messages without IDs. Prevents silent loss of legitimate retries (rare but high-impact when it hits).
### Added
- **PR #2039** by @ai-ag2026 — Active-run lifecycle visibility in `/health`. SSE `active_streams` only describes channel state; a worker can outlive its SSE stream while unwinding, blocked in a provider call, handling cancellation, or waiting on delegated work. Adds `active_runs`, per-run metadata/age, `oldest_run_age_seconds`, `last_run_finished_at`, and idle grace timing. Restart/update guards now have visibility into worker lifecycle, not just SSE channel state. Worker lifecycle wired through `_register_run` / `_update_run` / `_unregister_run` in streaming.
### Tests
5100 → **5108 passing, 0 regressions** (+8 net new across new test files for session-recovery audit, run-lifecycle health, transcript dedup, and orphan-backup recovery). Full suite ~160s on Python 3.11 with `HERMES_HOME` isolation.
### Notes
- 3 PRs from 2 different authors (#2035 stacked under #2036 — auto-closed when #2036 merged).
- `api/routes.py` was touched by all three PRs with disjoint hunks (#2039 at lines 2529/2609, #2038 at 3040, #2036 at 4147).
- `CHANGELOG.md` was the only true conflict (`#2038` predates v0.51.40 release entry); resolved by preserving v0.51.40 history and re-adding the #2038 bullet under [Unreleased] before promoting.
### Follow-ups
- Test isolation: at least one test in `test_update_banner_fixes.py` or `test_updates.py` triggers a real `os.execv` that re-executes the entire pytest suite. Suite still passes (~5108 each loop) but full run takes 4× the time. Worth a targeted fix in the next maintenance batch.
## [v0.51.40] — 2026-05-11 — Release P (4-PR contributor batch — quota subprocess hardening + env-lock prewarm + cron one-shot warning + Xiaomi env key)
### Fixed
@@ -22,7 +51,6 @@
- 4 PRs from 3 different authors. `api/providers.py` was touched by #2030 (+110/-7 in quota probe path) and #2034 (+1 in `_PROVIDER_ENV_VAR` map) with disjoint hunks. `CHANGELOG.md` Unreleased section was the only true conflict (#2033 + #2034 both added bullets); resolved by keeping both entries. Stage merge otherwise clean.
## [v0.51.39] — 2026-05-10 — Release O (4-PR contributor batch — Railway docker fix + Stop-button race + provider resolver + live context tracking)
### Fixed
+40
View File
@@ -3684,8 +3684,48 @@ STREAM_REASONING_TEXT: dict = {} # stream_id -> reasoning trace accumulated dur
STREAM_LIVE_TOOL_CALLS: dict = {} # stream_id -> live tool calls accumulated during streaming (#1361 §B)
STREAM_GOAL_RELATED: dict = {} # stream_id -> bool: only evaluate goal for goal-related turns (#1932)
PENDING_GOAL_CONTINUATION: set = set() # session_ids awaiting a goal continuation turn (#1932)
# Active agent-run registry. This intentionally tracks worker lifecycle rather
# than SSE lifecycle: cancel/reconnect may remove STREAMS while the worker is
# still unwinding, blocked in a provider call, or waiting for delegated work.
ACTIVE_RUNS: dict = {}
ACTIVE_RUNS_LOCK = threading.Lock()
LAST_RUN_FINISHED_AT: float | None = None
SERVER_START_TIME = time.time()
def register_active_run(stream_id: str, **metadata) -> None:
"""Mark a WebUI agent worker as alive until its outer finally exits."""
if not stream_id:
return
now = time.time()
entry = dict(metadata or {})
entry.setdefault("stream_id", stream_id)
entry.setdefault("started_at", now)
entry.setdefault("phase", "running")
with ACTIVE_RUNS_LOCK:
ACTIVE_RUNS[stream_id] = entry
def update_active_run(stream_id: str, **metadata) -> None:
"""Update active-run metadata without creating a new run implicitly."""
if not stream_id:
return
with ACTIVE_RUNS_LOCK:
entry = ACTIVE_RUNS.get(stream_id)
if entry is not None:
entry.update(metadata)
def unregister_active_run(stream_id: str) -> None:
"""Remove a worker from the active-run registry and record idle start."""
if not stream_id:
return
global LAST_RUN_FINISHED_AT
with ACTIVE_RUNS_LOCK:
ACTIVE_RUNS.pop(stream_id, None)
LAST_RUN_FINISHED_AT = time.time()
# Agent cache: reuse AIAgent across messages in the same WebUI session so that
# _user_turn_count survives between turns. This mirrors the gateway's
# _agent_cache pattern and is required for injectionFrequency: "first-turn".
+54 -7
View File
@@ -2529,6 +2529,39 @@ def _streams_lock_health(timeout_seconds: float = 0.5) -> dict:
STREAMS_LOCK.release()
def _run_lifecycle_health() -> dict:
"""Return active worker-run state independent of SSE stream presence."""
# Import the module rather than relying only on imported scalar aliases so
# LAST_RUN_FINISHED_AT stays fresh after unregister_active_run() updates it.
from api import config as _live_config
now = time.time()
with _live_config.ACTIVE_RUNS_LOCK:
runs = []
for stream_id, raw in (_live_config.ACTIVE_RUNS or {}).items():
item = dict(raw or {})
started_at = item.get("started_at")
try:
age = max(0.0, now - float(started_at))
except Exception:
age = 0.0
item.setdefault("stream_id", stream_id)
item["age_seconds"] = round(age, 1)
runs.append(item)
last_finished = _live_config.LAST_RUN_FINISHED_AT
runs.sort(key=lambda item: float(item.get("started_at") or 0.0))
payload = {
"active_runs": len(runs),
"runs": runs,
"last_run_finished_at": last_finished,
}
if runs:
payload["oldest_run_age_seconds"] = runs[0].get("age_seconds", 0.0)
elif last_finished:
payload["idle_seconds_since_last_run"] = round(max(0.0, now - float(last_finished)), 1)
return payload
def _deep_health_checks(stream_check: dict | None = None) -> tuple[dict, bool]:
"""Run cheap probes that exercise the state paths used by the UI shell.
@@ -2609,13 +2642,21 @@ def _deep_health_checks(stream_check: dict | None = None) -> tuple[dict, bool]:
def _handle_health(handler, parsed):
deep = parse_qs(parsed.query or "").get("deep", [""])[0].lower() in {"1", "true", "yes", "on"}
stream_check = _streams_lock_health()
run_check = _run_lifecycle_health()
payload = {
"status": "ok" if stream_check.get("status") == "ok" else "degraded",
"sessions": len(SESSIONS),
"active_streams": int(stream_check.get("active_streams") or 0),
"active_runs": int(run_check.get("active_runs") or 0),
"runs": run_check.get("runs", []),
"last_run_finished_at": run_check.get("last_run_finished_at"),
"uptime_seconds": round(time.time() - SERVER_START_TIME, 1),
"accept_loop": _accept_loop_health(handler),
}
if "oldest_run_age_seconds" in run_check:
payload["oldest_run_age_seconds"] = run_check["oldest_run_age_seconds"]
if "idle_seconds_since_last_run" in run_check:
payload["idle_seconds_since_last_run"] = run_check["idle_seconds_since_last_run"]
if deep:
if stream_check.get("status") != "ok":
payload["checks"] = {"streams_lock": stream_check}
@@ -3040,13 +3081,18 @@ def handle_get(handler, parsed) -> bool:
str(m.get("role") or ""),
str(m.get("content") or ""),
)):
key = (
str(msg.get("role") or ""),
str(msg.get("content") or ""),
str(msg.get("timestamp") or ""),
str(msg.get("tool_call_id") or ""),
str(msg.get("tool_name") or msg.get("name") or ""),
)
message_identity = msg.get("id") or msg.get("message_id")
if message_identity:
key = ("message_id", str(message_identity))
else:
key = (
"legacy",
str(msg.get("role") or ""),
str(msg.get("content") or ""),
str(msg.get("timestamp") or ""),
str(msg.get("tool_call_id") or ""),
str(msg.get("tool_name") or msg.get("name") or ""),
)
if key in seen_message_keys:
continue
seen_message_keys.add(key)
@@ -4147,6 +4193,7 @@ def handle_post(handler, parsed) -> bool:
return bad(handler, "Invalid session_id", 400)
try:
p.unlink(missing_ok=True)
p.with_suffix('.json.bak').unlink(missing_ok=True)
except Exception:
logger.debug("Failed to unlink session file %s", p)
# Prune the per-session agent lock so deleted sessions don't leak
+214 -10
View File
@@ -5,13 +5,16 @@ data-loss bugs like #1558.
``Session.save()`` writes a ``<sid>.json.bak`` snapshot of the previous
state whenever an incoming save would shrink the messages array. This
module reads those snapshots back and restores any session whose live
file has fewer messages than its backup.
file has fewer messages than its backup, or whose live file is missing
while a valid backup remains.
Three integration points:
1. ``recover_all_sessions_on_startup()`` — called from server.py at boot,
scans the session dir, restores any session whose JSON has fewer
messages than its .bak. Idempotent: a clean run is a no-op.
messages than its .bak, and recreates a missing ``<sid>.json`` from an
orphaned ``<sid>.json.bak`` when the canonical state DB still has that
session. Idempotent: a clean run is a no-op.
2. ``recover_session(sid)`` — single-session helper backing the
``POST /api/session/recover`` endpoint, so users can re-run recovery
@@ -22,9 +25,11 @@ Three integration points:
"""
from __future__ import annotations
import argparse
import json
import logging
import shutil
import sqlite3
from pathlib import Path
logger = logging.getLogger(__name__)
@@ -117,24 +122,195 @@ def recover_session(session_path: Path) -> dict:
return {**status, "restored": True}
def recover_all_sessions_on_startup(session_dir: Path) -> dict:
"""Scan session_dir for shrunken sessions, restore each from its .bak.
def _state_db_has_session(session_id: str, state_db_path: Path | None) -> bool:
"""Return whether state.db still knows this session.
Returns {"scanned": N, "restored": M, "details": [...]}.
The check is deliberately fail-open: recovery must not be prevented by a
locked, absent, or older-schema state DB. When a DB is readable and has no
row, treat the orphan backup as a tombstoned/deleted session and skip it.
"""
if state_db_path is None or not state_db_path.exists():
return True
try:
with sqlite3.connect(f"file:{state_db_path}?mode=ro", uri=True) as conn:
cur = conn.execute(
"select 1 from sqlite_master where type='table' and name='sessions'"
)
if cur.fetchone() is None:
return True
cur = conn.execute("select 1 from sessions where id = ? limit 1", (session_id,))
return cur.fetchone() is not None
except Exception as exc:
logger.debug("state_db session tombstone check failed for %s: %s", session_id, exc)
return True
def _orphaned_backup_live_paths(
session_dir: Path,
state_db_path: Path | None = None,
) -> list[Path]:
"""Return live ``<sid>.json`` paths whose ``<sid>.json.bak`` exists.
``Path.glob('*.json')`` does not see orphan backups because their suffix is
``.bak``. Existing startup recovery only handled shrunken live files; this
helper covers the crash shape where the live sidecar is gone but the rescue
copy remains.
"""
paths: list[Path] = []
for bak_path in sorted(session_dir.glob('*.json.bak')):
live_path = bak_path.with_suffix('')
if live_path.name.startswith('_') or live_path.exists():
continue
if _msg_count(bak_path) < 0:
continue
session_id = live_path.stem
if not _state_db_has_session(session_id, state_db_path):
logger.info(
"recover_all_sessions_on_startup: skipped orphan backup %s; "
"state.db has no live session row",
bak_path.name,
)
continue
paths.append(live_path)
return paths
def _new_audit_item(
session_id: str,
kind: str,
category: str,
recommendation: str,
live_messages: int = -1,
bak_messages: int = -1,
) -> dict:
return {
"session_id": session_id,
"kind": kind,
"category": category,
"recommendation": recommendation,
"live_messages": live_messages,
"bak_messages": bak_messages,
}
def _read_index_session_ids(index_path: Path) -> set[str]:
try:
data = json.loads(index_path.read_text(encoding='utf-8'))
except (OSError, json.JSONDecodeError, ValueError):
return set()
if not isinstance(data, list):
return set()
ids: set[str] = set()
for entry in data:
if isinstance(entry, dict) and isinstance(entry.get('session_id'), str):
ids.add(entry['session_id'])
return ids
def audit_session_recovery(session_dir: Path, state_db_path: Path | None = None) -> dict:
"""Read-only audit of session recovery state.
The audit intentionally does not mutate files. It classifies only the safe
recovery primitives this module knows how to perform: backup restores and
derived index rebuilds. Call ``recover_all_sessions_on_startup`` separately
for safe repairs.
"""
if not session_dir.exists():
return {"scanned": 0, "restored": 0, "details": []}
return {
"status": "ok",
"summary": {"ok": 0, "repairable": 0, "unsafe_to_repair": 0},
"items": [],
}
items: list[dict] = []
live_paths = sorted(p for p in session_dir.glob('*.json') if not p.name.startswith('_'))
live_ids = {p.stem for p in live_paths}
for live_path in live_paths:
status = inspect_session_recovery_status(live_path)
if status.get('recommend') == 'restore':
items.append(_new_audit_item(
status['session_id'],
"shrunken_live",
"repairable",
"restore_from_bak",
status.get('live_messages', -1),
status.get('bak_messages', -1),
))
for bak_path in sorted(session_dir.glob('*.json.bak')):
live_path = bak_path.with_suffix('')
if live_path.exists() or live_path.name.startswith('_'):
continue
bak_messages = _msg_count(bak_path)
session_id = live_path.stem
if bak_messages < 0:
items.append(_new_audit_item(
session_id, "malformed_orphan_backup", "unsafe_to_repair", "manual_review", -1, bak_messages
))
elif _state_db_has_session(session_id, state_db_path):
items.append(_new_audit_item(
session_id, "orphan_backup", "repairable", "restore_from_bak", -1, bak_messages
))
else:
items.append(_new_audit_item(
session_id,
"orphan_backup_without_state_row",
"unsafe_to_repair",
"manual_review",
-1,
bak_messages,
))
index_path = session_dir / '_index.json'
if index_path.exists():
index_ids = _read_index_session_ids(index_path)
for session_id in sorted(index_ids - live_ids):
items.append(_new_audit_item(
session_id, "index_missing_file", "repairable", "rebuild_index"
))
for session_id in sorted(live_ids - index_ids):
items.append(_new_audit_item(
session_id, "index_missing_entry", "repairable", "rebuild_index",
_msg_count(session_dir / f"{session_id}.json"), -1,
))
summary = {"ok": len(live_paths), "repairable": 0, "unsafe_to_repair": 0}
for item in items:
category = item.get('category')
if category in summary:
summary[category] += 1
if summary["unsafe_to_repair"]:
overall = "needs_manual_review"
elif summary["repairable"]:
overall = "warn"
else:
overall = "ok"
return {"status": overall, "summary": summary, "items": items}
def recover_all_sessions_on_startup(
session_dir: Path,
rebuild_index: bool = False,
state_db_path: Path | None = None,
) -> dict:
"""Scan session_dir for shrunken/orphaned sessions and restore from .bak.
Returns {"scanned": N, "restored": M, "orphaned_backups": K, "details": [...]}.
"""
if not session_dir.exists():
return {"scanned": 0, "restored": 0, "orphaned_backups": 0, "details": []}
scanned = 0
restored = 0
details: list[dict] = []
for path in session_dir.glob('*.json'):
live_paths = [path for path in sorted(session_dir.glob('*.json')) if not path.name.startswith('_')]
orphan_paths = _orphaned_backup_live_paths(session_dir, state_db_path=state_db_path)
for path in [*live_paths, *orphan_paths]:
# Skip non-session JSON files in the same dir:
# - ``_index.json`` is a top-level list of session metadata
# - any future non-session JSON marked with the ``_`` convention is
# skipped automatically (project convention for system files in
# directories that otherwise hold user data)
if path.name.startswith('_'):
continue
scanned += 1
try:
result = recover_session(path)
@@ -155,4 +331,32 @@ def recover_all_sessions_on_startup(session_dir: Path) -> dict:
"If you weren't expecting this, check the session list for missing "
"messages — see #1558.", restored, scanned,
)
return {"scanned": scanned, "restored": restored, "details": details}
if rebuild_index:
try:
from api.models import _write_session_index
_write_session_index(updates=None)
except Exception as exc:
logger.warning("recover_all_sessions_on_startup: index rebuild failed: %s", exc)
return {
"scanned": scanned,
"restored": restored,
"orphaned_backups": len(orphan_paths),
"details": details,
}
def _main() -> int:
parser = argparse.ArgumentParser(description="Audit Hermes WebUI session recovery state")
parser.add_argument("--audit", action="store_true", help="run a read-only recovery audit")
parser.add_argument("--session-dir", type=Path, required=True, help="path to WebUI sessions directory")
parser.add_argument("--state-db", type=Path, default=None, help="optional Hermes state.db path")
args = parser.parse_args()
if not args.audit:
parser.error("currently only --audit is supported")
report = audit_session_recovery(args.session_dir, state_db_path=args.state_db)
print(json.dumps(report, sort_keys=True))
return 0
if __name__ == "__main__":
raise SystemExit(_main())
+14
View File
@@ -26,6 +26,7 @@ from api.config import (
STREAM_GOAL_RELATED, PENDING_GOAL_CONTINUATION,
LOCK, SESSIONS, SESSION_DIR,
_get_session_agent_lock, _set_thread_env, _clear_thread_env,
register_active_run, update_active_run, unregister_active_run,
SESSION_AGENT_LOCKS, SESSION_AGENT_LOCKS_LOCK,
resolve_model_provider,
resolve_custom_provider_connection,
@@ -2030,6 +2031,16 @@ def _run_agent_streaming(
q = STREAMS.get(stream_id)
if q is None:
return
register_active_run(
stream_id,
session_id=session_id,
started_at=time.time(),
phase="starting",
workspace=str(workspace),
model=model,
provider=model_provider,
ephemeral=bool(ephemeral),
)
s = None
_rt = {}
old_cwd = None
@@ -2211,6 +2222,7 @@ def _run_agent_streaming(
_agent_lock = None
try:
s = get_session(session_id)
update_active_run(stream_id, phase="running", session_id=session_id)
s.workspace = str(Path(workspace).expanduser().resolve())
s.model = model
provider_context = (
@@ -3917,6 +3929,7 @@ def _run_agent_streaming(
if (s is not None
and getattr(s, 'active_stream_id', None) == stream_id
and getattr(s, 'pending_user_message', None)):
update_active_run(stream_id, phase="finalizing")
_last_resort_sync_from_core(s, stream_id, _agent_lock)
_clear_thread_env() # TD1: always clear thread-local context
with STREAMS_LOCK:
@@ -3927,6 +3940,7 @@ def _run_agent_streaming(
STREAM_REASONING_TEXT.pop(stream_id, None) # Clean up reasoning trace (#1361 §A)
STREAM_LIVE_TOOL_CALLS.pop(stream_id, None) # Clean up tool calls (#1361 §B)
STREAM_GOAL_RELATED.pop(stream_id, None) # Clean up goal-related flag (#1932)
unregister_active_run(stream_id)
# NOTE: do NOT discard PENDING_GOAL_CONTINUATION here. The marker
# is set by goal_continue (line ~3328) inside the SAME function
# call and consumed atomically by `_start_chat_stream_for_session`
+6 -1
View File
@@ -220,8 +220,13 @@ def main() -> None:
# its .bak (the data-loss shape #1558 produced), restore from the .bak.
# Safe to run unconditionally — a clean install is a no-op.
try:
from api.models import _active_state_db_path
from api.session_recovery import recover_all_sessions_on_startup
result = recover_all_sessions_on_startup(SESSION_DIR)
result = recover_all_sessions_on_startup(
SESSION_DIR,
rebuild_index=True,
state_db_path=_active_state_db_path(),
)
if result.get("restored"):
print(f"[recovery] Restored {result['restored']}/{result['scanned']} sessions from .bak (see #1558).", flush=True)
except Exception as exc:
+65
View File
@@ -204,6 +204,71 @@ def test_recover_all_sessions_on_startup_restores_shrunken_session(temp_session_
assert len(restored["messages"]) == 1000
def test_recover_all_sessions_on_startup_restores_orphan_bak(temp_session_dir):
"""Startup self-heal: if only <sid>.json.bak survived, recreate <sid>.json."""
sid = _make_session_on_disk(temp_session_dir, n_msgs=293)
live_path = temp_session_dir / f"{sid}.json"
bak_path = temp_session_dir / f"{sid}.json.bak"
bak_path.write_text(live_path.read_text(encoding="utf-8"), encoding="utf-8")
live_path.unlink()
from api.session_recovery import recover_all_sessions_on_startup
result = recover_all_sessions_on_startup(temp_session_dir)
assert result["restored"] == 1
assert result["scanned"] == 1
assert result.get("orphaned_backups") == 1
restored = json.loads(live_path.read_text(encoding="utf-8"))
assert len(restored["messages"]) == 293
def test_recover_all_sessions_on_startup_rebuilds_index_after_orphan_restore(temp_session_dir, monkeypatch):
"""A restored orphan must be visible through the WebUI session index immediately."""
import api.models as _m
sid = _make_session_on_disk(temp_session_dir, n_msgs=42)
live_path = temp_session_dir / f"{sid}.json"
bak_path = temp_session_dir / f"{sid}.json.bak"
bak_path.write_text(live_path.read_text(encoding="utf-8"), encoding="utf-8")
live_path.unlink()
stale_index = temp_session_dir / "_index.json"
stale_index.write_text(json.dumps([]), encoding="utf-8")
monkeypatch.setattr(_m, "SESSION_INDEX_FILE", stale_index)
from api.session_recovery import recover_all_sessions_on_startup
result = recover_all_sessions_on_startup(temp_session_dir, rebuild_index=True)
assert result["restored"] == 1
index = json.loads(stale_index.read_text(encoding="utf-8"))
assert [entry["session_id"] for entry in index] == [sid]
assert index[0]["message_count"] == 42
def test_orphan_bak_recovery_skips_sessions_absent_from_state_db(temp_session_dir):
"""Do not resurrect an explicitly deleted session when state.db lacks the row."""
import sqlite3
sid = _make_session_on_disk(temp_session_dir, n_msgs=12)
live_path = temp_session_dir / f"{sid}.json"
bak_path = temp_session_dir / f"{sid}.json.bak"
bak_path.write_text(live_path.read_text(encoding="utf-8"), encoding="utf-8")
live_path.unlink()
state_db = temp_session_dir / "state.db"
with sqlite3.connect(state_db) as conn:
conn.execute("create table sessions (id text primary key)")
conn.execute("insert into sessions (id) values (?)", ("different_session",))
from api.session_recovery import recover_all_sessions_on_startup
result = recover_all_sessions_on_startup(temp_session_dir, state_db_path=state_db)
assert result["restored"] == 0
assert result["scanned"] == 0
assert result["orphaned_backups"] == 0
assert not live_path.exists()
def test_recover_all_sessions_on_startup_is_idempotent_no_op_on_clean_state(temp_session_dir):
"""A clean install (no .bak files) must not modify anything."""
sid = _make_session_on_disk(temp_session_dir, n_msgs=1000)
+13
View File
@@ -335,6 +335,19 @@ def test_server_delete_invalidates_index(cleanup_test_sessions):
return
assert False, "session/delete handler not found in server.py or api/routes.py"
def test_server_delete_removes_session_bak_snapshot(cleanup_test_sessions):
"""session/delete must remove sidecar backups so deleted sessions stay deleted."""
routes_src = (REPO_ROOT / "api" / "routes.py").read_text()
delete_idx = max(
routes_src.find("if parsed.path == '/api/session/delete':"),
routes_src.find('if parsed.path == "/api/session/delete":'),
)
assert delete_idx >= 0, "session/delete handler not found in api/routes.py"
delete_block = routes_src[delete_idx:delete_idx+1400]
assert "with_suffix('.json.bak').unlink" in delete_block or 'with_suffix(".json.bak").unlink' in delete_block, \
"session/delete must unlink <sid>.json.bak to avoid later orphan-backup recovery"
# ── R9: Token/tool SSE events write to wrong session after switch ─────────────
def test_token_handler_guards_session_id(cleanup_test_sessions):
+50
View File
@@ -0,0 +1,50 @@
"""Regression coverage for restart-safety run lifecycle reporting."""
import time
def test_health_counts_active_runs_even_when_no_sse_streams():
"""A worker run can outlive its SSE channel; health must expose the run."""
from api import config, routes
with config.STREAMS_LOCK:
config.STREAMS.clear()
with config.ACTIVE_RUNS_LOCK:
config.ACTIVE_RUNS.clear()
config.ACTIVE_RUNS["stream-1"] = {
"stream_id": "stream-1",
"session_id": "session-1",
"started_at": time.time() - 42,
"phase": "running",
}
try:
stream_check = routes._streams_lock_health()
run_check = routes._run_lifecycle_health()
assert stream_check["active_streams"] == 0
assert run_check["active_runs"] == 1
assert run_check["oldest_run_age_seconds"] >= 40
assert run_check["runs"][0]["session_id"] == "session-1"
finally:
with config.ACTIVE_RUNS_LOCK:
config.ACTIVE_RUNS.clear()
def test_run_registry_unregister_records_last_finished_time():
"""Guards need a grace window after the last real worker exits."""
from api import config
with config.ACTIVE_RUNS_LOCK:
config.ACTIVE_RUNS.clear()
config.LAST_RUN_FINISHED_AT = None
config.register_active_run("stream-2", session_id="session-2", phase="starting")
with config.ACTIVE_RUNS_LOCK:
assert "stream-2" in config.ACTIVE_RUNS
config.unregister_active_run("stream-2")
with config.ACTIVE_RUNS_LOCK:
assert "stream-2" not in config.ACTIVE_RUNS
assert isinstance(config.LAST_RUN_FINISHED_AT, float)
@@ -59,3 +59,67 @@ def test_session_endpoint_merges_sidecar_and_lineage_messages_for_cli_sessions(m
"tip assistant",
"sidecar tail",
]
def test_session_endpoint_preserves_distinct_messages_with_different_ids(monkeypatch):
class DummySession:
def __init__(self):
self.messages = [
{
"id": "sidecar-retry",
"role": "user",
"content": "retry the same request",
"timestamp": 2.0,
}
]
self.tool_calls = []
self.active_stream_id = None
self.pending_user_message = None
self.pending_attachments = []
self.pending_started_at = None
self.context_length = 0
self.threshold_tokens = 0
self.last_prompt_tokens = 0
self.model = "openai/gpt-5"
self.session_id = "tip"
def compact(self):
return {"session_id": "tip", "title": "Tip", "model": "openai/gpt-5"}
captured = {}
monkeypatch.setattr(routes, "get_session", lambda sid, metadata_only=False: DummySession())
monkeypatch.setattr(routes, "_clear_stale_stream_state", lambda s: None)
monkeypatch.setattr(routes, "_lookup_cli_session_metadata", lambda sid: {"session_source": "messaging"})
monkeypatch.setattr(routes, "_is_messaging_session_record", lambda s: True)
monkeypatch.setattr(
routes,
"get_cli_session_messages",
lambda sid: [
{"role": "user", "content": "root user", "timestamp": 1.0},
{
"id": "cli-retry",
"role": "user",
"content": "retry the same request",
"timestamp": 2.0,
},
],
)
monkeypatch.setattr(routes, "_resolve_effective_session_model_for_display", lambda s: getattr(s, "model", None))
monkeypatch.setattr(routes, "_resolve_effective_session_model_provider_for_display", lambda s: None)
monkeypatch.setattr(routes, "_merge_cli_sidebar_metadata", lambda raw, meta: raw)
monkeypatch.setattr(routes, "redact_session_data", lambda raw: raw)
monkeypatch.setattr(routes, "j", lambda handler, payload, status=200: captured.setdefault("payload", payload))
class Handler:
pass
class Parsed:
path = "/api/session"
query = "session_id=tip"
routes.handle_get(Handler(), Parsed())
session = captured["payload"]["session"]
retry_messages = [m for m in session["messages"] if m.get("content") == "retry the same request"]
assert [m.get("id") for m in retry_messages] == ["cli-retry", "sidecar-retry"]
+100
View File
@@ -0,0 +1,100 @@
import json
import sqlite3
import subprocess
import sys
from pathlib import Path
from api.session_recovery import audit_session_recovery
REPO_ROOT = Path(__file__).resolve().parents[1]
def _write_session(session_dir, sid, messages=1):
path = session_dir / f"{sid}.json"
path.write_text(
json.dumps({"id": sid, "session_id": sid, "title": sid, "messages": [{"role": "user", "content": str(i)} for i in range(messages)]}),
encoding="utf-8",
)
return path
def _state_db(session_dir, *session_ids):
db = session_dir / "state.db"
with sqlite3.connect(db) as conn:
conn.execute("create table sessions (id text primary key)")
conn.executemany("insert into sessions (id) values (?)", [(sid,) for sid in session_ids])
return db
def test_audit_reports_repairable_orphan_backup_when_state_db_has_session(tmp_path):
sid = "abc123"
live = _write_session(tmp_path, sid, messages=3)
bak = tmp_path / f"{sid}.json.bak"
bak.write_text(live.read_text(encoding="utf-8"), encoding="utf-8")
live.unlink()
db = _state_db(tmp_path, sid)
report = audit_session_recovery(tmp_path, state_db_path=db)
assert report["status"] == "warn"
assert report["summary"]["repairable"] == 1
assert report["items"] == [
{
"session_id": sid,
"kind": "orphan_backup",
"category": "repairable",
"recommendation": "restore_from_bak",
"live_messages": -1,
"bak_messages": 3,
}
]
def test_audit_marks_orphan_backup_without_state_row_unsafe(tmp_path):
sid = "abc123"
live = _write_session(tmp_path, sid, messages=2)
bak = tmp_path / f"{sid}.json.bak"
bak.write_text(live.read_text(encoding="utf-8"), encoding="utf-8")
live.unlink()
db = _state_db(tmp_path, "different")
report = audit_session_recovery(tmp_path, state_db_path=db)
assert report["status"] == "needs_manual_review"
assert report["summary"]["unsafe_to_repair"] == 1
assert report["items"][0]["kind"] == "orphan_backup_without_state_row"
assert report["items"][0]["recommendation"] == "manual_review"
def test_audit_reports_index_drift(tmp_path):
sid = "abc123"
_write_session(tmp_path, sid, messages=1)
(tmp_path / "_index.json").write_text(
json.dumps([{"session_id": "missing", "message_count": 1}]),
encoding="utf-8",
)
report = audit_session_recovery(tmp_path)
kinds = {item["kind"] for item in report["items"]}
assert "index_missing_file" in kinds
assert "index_missing_entry" in kinds
assert report["summary"]["repairable"] == 2
def test_session_recovery_module_audit_cli_outputs_json(tmp_path):
sid = "abc123"
_write_session(tmp_path, sid, messages=1)
result = subprocess.run(
[sys.executable, "-m", "api.session_recovery", "--audit", "--session-dir", str(tmp_path)],
cwd=str(REPO_ROOT),
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
)
payload = json.loads(result.stdout)
assert payload["status"] == "ok"
assert payload["summary"]["ok"] == 1