From 663817570cecfe5fdefa422cd047caf00cae22d9 Mon Sep 17 00:00:00 2001 From: ai-ag2026 <261867348+ai-ag2026@users.noreply.github.com> Date: Mon, 11 May 2026 02:03:37 +0200 Subject: [PATCH 1/5] fix: recover orphaned session backups on startup --- api/routes.py | 1 + api/session_recovery.py | 92 ++++++++++++++++++++++++--- server.py | 7 +- tests/test_metadata_save_wipe_1558.py | 65 +++++++++++++++++++ tests/test_regressions.py | 13 ++++ 5 files changed, 167 insertions(+), 11 deletions(-) diff --git a/api/routes.py b/api/routes.py index 33567d0d..015b37b6 100644 --- a/api/routes.py +++ b/api/routes.py @@ -4147,6 +4147,7 @@ def handle_post(handler, parsed) -> bool: return bad(handler, "Invalid session_id", 400) try: p.unlink(missing_ok=True) + p.with_suffix('.json.bak').unlink(missing_ok=True) except Exception: logger.debug("Failed to unlink session file %s", p) # Prune the per-session agent lock so deleted sessions don't leak diff --git a/api/session_recovery.py b/api/session_recovery.py index 9ae6d254..86fc16ac 100644 --- a/api/session_recovery.py +++ b/api/session_recovery.py @@ -5,13 +5,16 @@ data-loss bugs like #1558. ``Session.save()`` writes a ``.json.bak`` snapshot of the previous state whenever an incoming save would shrink the messages array. This module reads those snapshots back and restores any session whose live -file has fewer messages than its backup. +file has fewer messages than its backup, or whose live file is missing +while a valid backup remains. Three integration points: 1. ``recover_all_sessions_on_startup()`` — called from server.py at boot, scans the session dir, restores any session whose JSON has fewer - messages than its .bak. Idempotent: a clean run is a no-op. + messages than its .bak, and recreates a missing ``.json`` from an + orphaned ``.json.bak`` when the canonical state DB still has that + session. Idempotent: a clean run is a no-op. 2. 
``recover_session(sid)`` — single-session helper backing the ``POST /api/session/recover`` endpoint, so users can re-run recovery @@ -25,6 +28,7 @@ from __future__ import annotations import json import logging import shutil +import sqlite3 from pathlib import Path logger = logging.getLogger(__name__) @@ -117,24 +121,81 @@ def recover_session(session_path: Path) -> dict: return {**status, "restored": True} -def recover_all_sessions_on_startup(session_dir: Path) -> dict: - """Scan session_dir for shrunken sessions, restore each from its .bak. +def _state_db_has_session(session_id: str, state_db_path: Path | None) -> bool: + """Return whether state.db still knows this session. - Returns {"scanned": N, "restored": M, "details": [...]}. + The check is deliberately fail-open: recovery must not be prevented by a + locked, absent, or older-schema state DB. When a DB is readable and has no + row, treat the orphan backup as a tombstoned/deleted session and skip it. + """ + if state_db_path is None or not state_db_path.exists(): + return True + try: + with sqlite3.connect(f"file:{state_db_path}?mode=ro", uri=True) as conn: + cur = conn.execute( + "select 1 from sqlite_master where type='table' and name='sessions'" + ) + if cur.fetchone() is None: + return True + cur = conn.execute("select 1 from sessions where id = ? limit 1", (session_id,)) + return cur.fetchone() is not None + except Exception as exc: + logger.debug("state_db session tombstone check failed for %s: %s", session_id, exc) + return True + + +def _orphaned_backup_live_paths( + session_dir: Path, + state_db_path: Path | None = None, +) -> list[Path]: + """Return live ``.json`` paths whose ``.json.bak`` exists. + + ``Path.glob('*.json')`` does not see orphan backups because their suffix is + ``.bak``. Existing startup recovery only handled shrunken live files; this + helper covers the crash shape where the live sidecar is gone but the rescue + copy remains. 
+ """ + paths: list[Path] = [] + for bak_path in sorted(session_dir.glob('*.json.bak')): + live_path = bak_path.with_suffix('') + if live_path.name.startswith('_') or live_path.exists(): + continue + if _msg_count(bak_path) < 0: + continue + session_id = live_path.stem + if not _state_db_has_session(session_id, state_db_path): + logger.info( + "recover_all_sessions_on_startup: skipped orphan backup %s; " + "state.db has no live session row", + bak_path.name, + ) + continue + paths.append(live_path) + return paths + + +def recover_all_sessions_on_startup( + session_dir: Path, + rebuild_index: bool = False, + state_db_path: Path | None = None, +) -> dict: + """Scan session_dir for shrunken/orphaned sessions and restore from .bak. + + Returns {"scanned": N, "restored": M, "orphaned_backups": K, "details": [...]}. """ if not session_dir.exists(): - return {"scanned": 0, "restored": 0, "details": []} + return {"scanned": 0, "restored": 0, "orphaned_backups": 0, "details": []} scanned = 0 restored = 0 details: list[dict] = [] - for path in session_dir.glob('*.json'): + live_paths = [path for path in sorted(session_dir.glob('*.json')) if not path.name.startswith('_')] + orphan_paths = _orphaned_backup_live_paths(session_dir, state_db_path=state_db_path) + for path in [*live_paths, *orphan_paths]: # Skip non-session JSON files in the same dir: # - ``_index.json`` is a top-level list of session metadata # - any future non-session JSON marked with the ``_`` convention is # skipped automatically (project convention for system files in # directories that otherwise hold user data) - if path.name.startswith('_'): - continue scanned += 1 try: result = recover_session(path) @@ -155,4 +216,15 @@ def recover_all_sessions_on_startup(session_dir: Path) -> dict: "If you weren't expecting this, check the session list for missing " "messages — see #1558.", restored, scanned, ) - return {"scanned": scanned, "restored": restored, "details": details} + if rebuild_index: + try: + from 
api.models import _write_session_index + _write_session_index(updates=None) + except Exception as exc: + logger.warning("recover_all_sessions_on_startup: index rebuild failed: %s", exc) + return { + "scanned": scanned, + "restored": restored, + "orphaned_backups": len(orphan_paths), + "details": details, + } diff --git a/server.py b/server.py index 7e1563ac..bbaf1cb8 100644 --- a/server.py +++ b/server.py @@ -220,8 +220,13 @@ def main() -> None: # its .bak (the data-loss shape #1558 produced), restore from the .bak. # Safe to run unconditionally — a clean install is a no-op. try: + from api.models import _active_state_db_path from api.session_recovery import recover_all_sessions_on_startup - result = recover_all_sessions_on_startup(SESSION_DIR) + result = recover_all_sessions_on_startup( + SESSION_DIR, + rebuild_index=True, + state_db_path=_active_state_db_path(), + ) if result.get("restored"): print(f"[recovery] Restored {result['restored']}/{result['scanned']} sessions from .bak (see #1558).", flush=True) except Exception as exc: diff --git a/tests/test_metadata_save_wipe_1558.py b/tests/test_metadata_save_wipe_1558.py index 3cb5153d..ce1b76cc 100644 --- a/tests/test_metadata_save_wipe_1558.py +++ b/tests/test_metadata_save_wipe_1558.py @@ -204,6 +204,71 @@ def test_recover_all_sessions_on_startup_restores_shrunken_session(temp_session_ assert len(restored["messages"]) == 1000 +def test_recover_all_sessions_on_startup_restores_orphan_bak(temp_session_dir): + """Startup self-heal: if only .json.bak survived, recreate .json.""" + sid = _make_session_on_disk(temp_session_dir, n_msgs=293) + live_path = temp_session_dir / f"{sid}.json" + bak_path = temp_session_dir / f"{sid}.json.bak" + bak_path.write_text(live_path.read_text(encoding="utf-8"), encoding="utf-8") + live_path.unlink() + + from api.session_recovery import recover_all_sessions_on_startup + result = recover_all_sessions_on_startup(temp_session_dir) + + assert result["restored"] == 1 + assert 
result["scanned"] == 1 + assert result.get("orphaned_backups") == 1 + restored = json.loads(live_path.read_text(encoding="utf-8")) + assert len(restored["messages"]) == 293 + + +def test_recover_all_sessions_on_startup_rebuilds_index_after_orphan_restore(temp_session_dir, monkeypatch): + """A restored orphan must be visible through the WebUI session index immediately.""" + import api.models as _m + + sid = _make_session_on_disk(temp_session_dir, n_msgs=42) + live_path = temp_session_dir / f"{sid}.json" + bak_path = temp_session_dir / f"{sid}.json.bak" + bak_path.write_text(live_path.read_text(encoding="utf-8"), encoding="utf-8") + live_path.unlink() + + stale_index = temp_session_dir / "_index.json" + stale_index.write_text(json.dumps([]), encoding="utf-8") + monkeypatch.setattr(_m, "SESSION_INDEX_FILE", stale_index) + + from api.session_recovery import recover_all_sessions_on_startup + result = recover_all_sessions_on_startup(temp_session_dir, rebuild_index=True) + + assert result["restored"] == 1 + index = json.loads(stale_index.read_text(encoding="utf-8")) + assert [entry["session_id"] for entry in index] == [sid] + assert index[0]["message_count"] == 42 + + +def test_orphan_bak_recovery_skips_sessions_absent_from_state_db(temp_session_dir): + """Do not resurrect an explicitly deleted session when state.db lacks the row.""" + import sqlite3 + + sid = _make_session_on_disk(temp_session_dir, n_msgs=12) + live_path = temp_session_dir / f"{sid}.json" + bak_path = temp_session_dir / f"{sid}.json.bak" + bak_path.write_text(live_path.read_text(encoding="utf-8"), encoding="utf-8") + live_path.unlink() + + state_db = temp_session_dir / "state.db" + with sqlite3.connect(state_db) as conn: + conn.execute("create table sessions (id text primary key)") + conn.execute("insert into sessions (id) values (?)", ("different_session",)) + + from api.session_recovery import recover_all_sessions_on_startup + result = recover_all_sessions_on_startup(temp_session_dir, 
state_db_path=state_db) + + assert result["restored"] == 0 + assert result["scanned"] == 0 + assert result["orphaned_backups"] == 0 + assert not live_path.exists() + + def test_recover_all_sessions_on_startup_is_idempotent_no_op_on_clean_state(temp_session_dir): """A clean install (no .bak files) must not modify anything.""" sid = _make_session_on_disk(temp_session_dir, n_msgs=1000) diff --git a/tests/test_regressions.py b/tests/test_regressions.py index 0022bcb9..068afdcf 100644 --- a/tests/test_regressions.py +++ b/tests/test_regressions.py @@ -335,6 +335,19 @@ def test_server_delete_invalidates_index(cleanup_test_sessions): return assert False, "session/delete handler not found in server.py or api/routes.py" + +def test_server_delete_removes_session_bak_snapshot(cleanup_test_sessions): + """session/delete must remove sidecar backups so deleted sessions stay deleted.""" + routes_src = (REPO_ROOT / "api" / "routes.py").read_text() + delete_idx = max( + routes_src.find("if parsed.path == '/api/session/delete':"), + routes_src.find('if parsed.path == "/api/session/delete":'), + ) + assert delete_idx >= 0, "session/delete handler not found in api/routes.py" + delete_block = routes_src[delete_idx:delete_idx+1400] + assert "with_suffix('.json.bak').unlink" in delete_block or 'with_suffix(".json.bak").unlink' in delete_block, \ + "session/delete must unlink .json.bak to avoid later orphan-backup recovery" + # ── R9: Token/tool SSE events write to wrong session after switch ───────────── def test_token_handler_guards_session_id(cleanup_test_sessions): From 7b6d91d490d71eea851f01fec297981805d9ffa7 Mon Sep 17 00:00:00 2001 From: ai-ag2026 <261867348+ai-ag2026@users.noreply.github.com> Date: Mon, 11 May 2026 02:06:43 +0200 Subject: [PATCH 2/5] feat: add read-only session recovery audit --- api/session_recovery.py | 132 +++++++++++++++++++++++++++ tests/test_session_recovery_audit.py | 100 ++++++++++++++++++++ 2 files changed, 232 insertions(+) create mode 100644 
tests/test_session_recovery_audit.py diff --git a/api/session_recovery.py b/api/session_recovery.py index 86fc16ac..65e772a7 100644 --- a/api/session_recovery.py +++ b/api/session_recovery.py @@ -25,6 +25,7 @@ Three integration points: """ from __future__ import annotations +import argparse import json import logging import shutil @@ -174,6 +175,120 @@ def _orphaned_backup_live_paths( return paths +def _new_audit_item( + session_id: str, + kind: str, + category: str, + recommendation: str, + live_messages: int = -1, + bak_messages: int = -1, +) -> dict: + return { + "session_id": session_id, + "kind": kind, + "category": category, + "recommendation": recommendation, + "live_messages": live_messages, + "bak_messages": bak_messages, + } + + +def _read_index_session_ids(index_path: Path) -> set[str]: + try: + data = json.loads(index_path.read_text(encoding='utf-8')) + except (OSError, json.JSONDecodeError, ValueError): + return set() + if not isinstance(data, list): + return set() + ids: set[str] = set() + for entry in data: + if isinstance(entry, dict) and isinstance(entry.get('session_id'), str): + ids.add(entry['session_id']) + return ids + + +def audit_session_recovery(session_dir: Path, state_db_path: Path | None = None) -> dict: + """Read-only audit of session recovery state. + + The audit intentionally does not mutate files. It classifies only the safe + recovery primitives this module knows how to perform: backup restores and + derived index rebuilds. Call ``recover_all_sessions_on_startup`` separately + for safe repairs. 
+ """ + if not session_dir.exists(): + return { + "status": "ok", + "summary": {"ok": 0, "repairable": 0, "unsafe_to_repair": 0}, + "items": [], + } + + items: list[dict] = [] + live_paths = sorted(p for p in session_dir.glob('*.json') if not p.name.startswith('_')) + live_ids = {p.stem for p in live_paths} + + for live_path in live_paths: + status = inspect_session_recovery_status(live_path) + if status.get('recommend') == 'restore': + items.append(_new_audit_item( + status['session_id'], + "shrunken_live", + "repairable", + "restore_from_bak", + status.get('live_messages', -1), + status.get('bak_messages', -1), + )) + + for bak_path in sorted(session_dir.glob('*.json.bak')): + live_path = bak_path.with_suffix('') + if live_path.exists() or live_path.name.startswith('_'): + continue + bak_messages = _msg_count(bak_path) + session_id = live_path.stem + if bak_messages < 0: + items.append(_new_audit_item( + session_id, "malformed_orphan_backup", "unsafe_to_repair", "manual_review", -1, bak_messages + )) + elif _state_db_has_session(session_id, state_db_path): + items.append(_new_audit_item( + session_id, "orphan_backup", "repairable", "restore_from_bak", -1, bak_messages + )) + else: + items.append(_new_audit_item( + session_id, + "orphan_backup_without_state_row", + "unsafe_to_repair", + "manual_review", + -1, + bak_messages, + )) + + index_path = session_dir / '_index.json' + if index_path.exists(): + index_ids = _read_index_session_ids(index_path) + for session_id in sorted(index_ids - live_ids): + items.append(_new_audit_item( + session_id, "index_missing_file", "repairable", "rebuild_index" + )) + for session_id in sorted(live_ids - index_ids): + items.append(_new_audit_item( + session_id, "index_missing_entry", "repairable", "rebuild_index", + _msg_count(session_dir / f"{session_id}.json"), -1, + )) + + summary = {"ok": len(live_paths), "repairable": 0, "unsafe_to_repair": 0} + for item in items: + category = item.get('category') + if category in summary: + 
summary[category] += 1 + if summary["unsafe_to_repair"]: + overall = "needs_manual_review" + elif summary["repairable"]: + overall = "warn" + else: + overall = "ok" + return {"status": overall, "summary": summary, "items": items} + + def recover_all_sessions_on_startup( session_dir: Path, rebuild_index: bool = False, @@ -228,3 +343,20 @@ def recover_all_sessions_on_startup( "orphaned_backups": len(orphan_paths), "details": details, } + + +def _main() -> int: + parser = argparse.ArgumentParser(description="Audit Hermes WebUI session recovery state") + parser.add_argument("--audit", action="store_true", help="run a read-only recovery audit") + parser.add_argument("--session-dir", type=Path, required=True, help="path to WebUI sessions directory") + parser.add_argument("--state-db", type=Path, default=None, help="optional Hermes state.db path") + args = parser.parse_args() + if not args.audit: + parser.error("currently only --audit is supported") + report = audit_session_recovery(args.session_dir, state_db_path=args.state_db) + print(json.dumps(report, sort_keys=True)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(_main()) diff --git a/tests/test_session_recovery_audit.py b/tests/test_session_recovery_audit.py new file mode 100644 index 00000000..dc9ad49b --- /dev/null +++ b/tests/test_session_recovery_audit.py @@ -0,0 +1,100 @@ +import json +import sqlite3 +import subprocess +import sys +from pathlib import Path + +from api.session_recovery import audit_session_recovery + +REPO_ROOT = Path(__file__).resolve().parents[1] + + +def _write_session(session_dir, sid, messages=1): + path = session_dir / f"{sid}.json" + path.write_text( + json.dumps({"id": sid, "session_id": sid, "title": sid, "messages": [{"role": "user", "content": str(i)} for i in range(messages)]}), + encoding="utf-8", + ) + return path + + +def _state_db(session_dir, *session_ids): + db = session_dir / "state.db" + with sqlite3.connect(db) as conn: + conn.execute("create table sessions 
(id text primary key)") + conn.executemany("insert into sessions (id) values (?)", [(sid,) for sid in session_ids]) + return db + + +def test_audit_reports_repairable_orphan_backup_when_state_db_has_session(tmp_path): + sid = "abc123" + live = _write_session(tmp_path, sid, messages=3) + bak = tmp_path / f"{sid}.json.bak" + bak.write_text(live.read_text(encoding="utf-8"), encoding="utf-8") + live.unlink() + db = _state_db(tmp_path, sid) + + report = audit_session_recovery(tmp_path, state_db_path=db) + + assert report["status"] == "warn" + assert report["summary"]["repairable"] == 1 + assert report["items"] == [ + { + "session_id": sid, + "kind": "orphan_backup", + "category": "repairable", + "recommendation": "restore_from_bak", + "live_messages": -1, + "bak_messages": 3, + } + ] + + +def test_audit_marks_orphan_backup_without_state_row_unsafe(tmp_path): + sid = "abc123" + live = _write_session(tmp_path, sid, messages=2) + bak = tmp_path / f"{sid}.json.bak" + bak.write_text(live.read_text(encoding="utf-8"), encoding="utf-8") + live.unlink() + db = _state_db(tmp_path, "different") + + report = audit_session_recovery(tmp_path, state_db_path=db) + + assert report["status"] == "needs_manual_review" + assert report["summary"]["unsafe_to_repair"] == 1 + assert report["items"][0]["kind"] == "orphan_backup_without_state_row" + assert report["items"][0]["recommendation"] == "manual_review" + + +def test_audit_reports_index_drift(tmp_path): + sid = "abc123" + _write_session(tmp_path, sid, messages=1) + (tmp_path / "_index.json").write_text( + json.dumps([{"session_id": "missing", "message_count": 1}]), + encoding="utf-8", + ) + + report = audit_session_recovery(tmp_path) + kinds = {item["kind"] for item in report["items"]} + + assert "index_missing_file" in kinds + assert "index_missing_entry" in kinds + assert report["summary"]["repairable"] == 2 + + +def test_session_recovery_module_audit_cli_outputs_json(tmp_path): + sid = "abc123" + _write_session(tmp_path, sid, 
messages=1) + + result = subprocess.run( + [sys.executable, "-m", "api.session_recovery", "--audit", "--session-dir", str(tmp_path)], + cwd=str(REPO_ROOT), + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + ) + + payload = json.loads(result.stdout) + assert payload["status"] == "ok" + assert payload["summary"]["ok"] == 1 From 642249747fd7b07a050b9fcc6959ff58a7c78311 Mon Sep 17 00:00:00 2001 From: Frank Song Date: Mon, 11 May 2026 08:14:50 +0800 Subject: [PATCH 3/5] Fix session message identity dedup --- CHANGELOG.md | 6 ++ api/routes.py | 19 ++++-- tests/test_session_lineage_full_transcript.py | 64 +++++++++++++++++++ 3 files changed, 82 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c75a2334..5e0e2ad0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Hermes Web UI -- Changelog +## [Unreleased] + +### Fixed + +- **fix(sessions): preserve distinct retried messages when merging transcripts** ([#2027](https://github.com/nesquena/hermes-webui/issues/2027)). Messaging session transcript merges now use `id`/`message_id` when present before falling back to the legacy role/content/timestamp/tool metadata key, so repeated turns with identical visible text are not silently collapsed. 
+ ## [v0.51.39] — 2026-05-10 — Release O (4-PR contributor batch — Railway docker fix + Stop-button race + provider resolver + live context tracking) ### Fixed diff --git a/api/routes.py b/api/routes.py index 33567d0d..47e8a15e 100644 --- a/api/routes.py +++ b/api/routes.py @@ -3040,13 +3040,18 @@ def handle_get(handler, parsed) -> bool: str(m.get("role") or ""), str(m.get("content") or ""), )): - key = ( - str(msg.get("role") or ""), - str(msg.get("content") or ""), - str(msg.get("timestamp") or ""), - str(msg.get("tool_call_id") or ""), - str(msg.get("tool_name") or msg.get("name") or ""), - ) + message_identity = msg.get("id") or msg.get("message_id") + if message_identity: + key = ("message_id", str(message_identity)) + else: + key = ( + "legacy", + str(msg.get("role") or ""), + str(msg.get("content") or ""), + str(msg.get("timestamp") or ""), + str(msg.get("tool_call_id") or ""), + str(msg.get("tool_name") or msg.get("name") or ""), + ) if key in seen_message_keys: continue seen_message_keys.add(key) diff --git a/tests/test_session_lineage_full_transcript.py b/tests/test_session_lineage_full_transcript.py index 7efc6d18..63cdd203 100644 --- a/tests/test_session_lineage_full_transcript.py +++ b/tests/test_session_lineage_full_transcript.py @@ -59,3 +59,67 @@ def test_session_endpoint_merges_sidecar_and_lineage_messages_for_cli_sessions(m "tip assistant", "sidecar tail", ] + + +def test_session_endpoint_preserves_distinct_messages_with_different_ids(monkeypatch): + class DummySession: + def __init__(self): + self.messages = [ + { + "id": "sidecar-retry", + "role": "user", + "content": "retry the same request", + "timestamp": 2.0, + } + ] + self.tool_calls = [] + self.active_stream_id = None + self.pending_user_message = None + self.pending_attachments = [] + self.pending_started_at = None + self.context_length = 0 + self.threshold_tokens = 0 + self.last_prompt_tokens = 0 + self.model = "openai/gpt-5" + self.session_id = "tip" + + def compact(self): + return 
{"session_id": "tip", "title": "Tip", "model": "openai/gpt-5"} + + captured = {} + + monkeypatch.setattr(routes, "get_session", lambda sid, metadata_only=False: DummySession()) + monkeypatch.setattr(routes, "_clear_stale_stream_state", lambda s: None) + monkeypatch.setattr(routes, "_lookup_cli_session_metadata", lambda sid: {"session_source": "messaging"}) + monkeypatch.setattr(routes, "_is_messaging_session_record", lambda s: True) + monkeypatch.setattr( + routes, + "get_cli_session_messages", + lambda sid: [ + {"role": "user", "content": "root user", "timestamp": 1.0}, + { + "id": "cli-retry", + "role": "user", + "content": "retry the same request", + "timestamp": 2.0, + }, + ], + ) + monkeypatch.setattr(routes, "_resolve_effective_session_model_for_display", lambda s: getattr(s, "model", None)) + monkeypatch.setattr(routes, "_resolve_effective_session_model_provider_for_display", lambda s: None) + monkeypatch.setattr(routes, "_merge_cli_sidebar_metadata", lambda raw, meta: raw) + monkeypatch.setattr(routes, "redact_session_data", lambda raw: raw) + monkeypatch.setattr(routes, "j", lambda handler, payload, status=200: captured.setdefault("payload", payload)) + + class Handler: + pass + + class Parsed: + path = "/api/session" + query = "session_id=tip" + + routes.handle_get(Handler(), Parsed()) + + session = captured["payload"]["session"] + retry_messages = [m for m in session["messages"] if m.get("content") == "retry the same request"] + assert [m.get("id") for m in retry_messages] == ["cli-retry", "sidecar-retry"] From 2ead7daa2fe29e99696c4786123ce2a8dd646bcf Mon Sep 17 00:00:00 2001 From: ai-ag2026 <261867348+ai-ag2026@users.noreply.github.com> Date: Mon, 11 May 2026 02:15:00 +0200 Subject: [PATCH 4/5] fix: expose active run lifecycle in health --- api/config.py | 40 ++++++++++++++++++++++++ api/routes.py | 41 ++++++++++++++++++++++++ api/streaming.py | 14 +++++++++ tests/test_run_lifecycle_health.py | 50 ++++++++++++++++++++++++++++++ 4 files changed, 145 
insertions(+) create mode 100644 tests/test_run_lifecycle_health.py diff --git a/api/config.py b/api/config.py index 5a592dea..d2675e69 100644 --- a/api/config.py +++ b/api/config.py @@ -3681,8 +3681,48 @@ STREAM_REASONING_TEXT: dict = {} # stream_id -> reasoning trace accumulated dur STREAM_LIVE_TOOL_CALLS: dict = {} # stream_id -> live tool calls accumulated during streaming (#1361 §B) STREAM_GOAL_RELATED: dict = {} # stream_id -> bool: only evaluate goal for goal-related turns (#1932) PENDING_GOAL_CONTINUATION: set = set() # session_ids awaiting a goal continuation turn (#1932) + +# Active agent-run registry. This intentionally tracks worker lifecycle rather +# than SSE lifecycle: cancel/reconnect may remove STREAMS while the worker is +# still unwinding, blocked in a provider call, or waiting for delegated work. +ACTIVE_RUNS: dict = {} +ACTIVE_RUNS_LOCK = threading.Lock() +LAST_RUN_FINISHED_AT: float | None = None SERVER_START_TIME = time.time() + +def register_active_run(stream_id: str, **metadata) -> None: + """Mark a WebUI agent worker as alive until its outer finally exits.""" + if not stream_id: + return + now = time.time() + entry = dict(metadata or {}) + entry.setdefault("stream_id", stream_id) + entry.setdefault("started_at", now) + entry.setdefault("phase", "running") + with ACTIVE_RUNS_LOCK: + ACTIVE_RUNS[stream_id] = entry + + +def update_active_run(stream_id: str, **metadata) -> None: + """Update active-run metadata without creating a new run implicitly.""" + if not stream_id: + return + with ACTIVE_RUNS_LOCK: + entry = ACTIVE_RUNS.get(stream_id) + if entry is not None: + entry.update(metadata) + + +def unregister_active_run(stream_id: str) -> None: + """Remove a worker from the active-run registry and record idle start.""" + if not stream_id: + return + global LAST_RUN_FINISHED_AT + with ACTIVE_RUNS_LOCK: + ACTIVE_RUNS.pop(stream_id, None) + LAST_RUN_FINISHED_AT = time.time() + # Agent cache: reuse AIAgent across messages in the same WebUI session 
so that # _user_turn_count survives between turns. This mirrors the gateway's # _agent_cache pattern and is required for injectionFrequency: "first-turn". diff --git a/api/routes.py b/api/routes.py index 33567d0d..926250fc 100644 --- a/api/routes.py +++ b/api/routes.py @@ -2529,6 +2529,39 @@ def _streams_lock_health(timeout_seconds: float = 0.5) -> dict: STREAMS_LOCK.release() +def _run_lifecycle_health() -> dict: + """Return active worker-run state independent of SSE stream presence.""" + # Import the module rather than relying only on imported scalar aliases so + # LAST_RUN_FINISHED_AT stays fresh after unregister_active_run() updates it. + from api import config as _live_config + + now = time.time() + with _live_config.ACTIVE_RUNS_LOCK: + runs = [] + for stream_id, raw in (_live_config.ACTIVE_RUNS or {}).items(): + item = dict(raw or {}) + started_at = item.get("started_at") + try: + age = max(0.0, now - float(started_at)) + except Exception: + age = 0.0 + item.setdefault("stream_id", stream_id) + item["age_seconds"] = round(age, 1) + runs.append(item) + last_finished = _live_config.LAST_RUN_FINISHED_AT + runs.sort(key=lambda item: float(item.get("started_at") or 0.0)) + payload = { + "active_runs": len(runs), + "runs": runs, + "last_run_finished_at": last_finished, + } + if runs: + payload["oldest_run_age_seconds"] = runs[0].get("age_seconds", 0.0) + elif last_finished: + payload["idle_seconds_since_last_run"] = round(max(0.0, now - float(last_finished)), 1) + return payload + + def _deep_health_checks(stream_check: dict | None = None) -> tuple[dict, bool]: """Run cheap probes that exercise the state paths used by the UI shell. 
@@ -2609,13 +2642,21 @@ def _deep_health_checks(stream_check: dict | None = None) -> tuple[dict, bool]: def _handle_health(handler, parsed): deep = parse_qs(parsed.query or "").get("deep", [""])[0].lower() in {"1", "true", "yes", "on"} stream_check = _streams_lock_health() + run_check = _run_lifecycle_health() payload = { "status": "ok" if stream_check.get("status") == "ok" else "degraded", "sessions": len(SESSIONS), "active_streams": int(stream_check.get("active_streams") or 0), + "active_runs": int(run_check.get("active_runs") or 0), + "runs": run_check.get("runs", []), + "last_run_finished_at": run_check.get("last_run_finished_at"), "uptime_seconds": round(time.time() - SERVER_START_TIME, 1), "accept_loop": _accept_loop_health(handler), } + if "oldest_run_age_seconds" in run_check: + payload["oldest_run_age_seconds"] = run_check["oldest_run_age_seconds"] + if "idle_seconds_since_last_run" in run_check: + payload["idle_seconds_since_last_run"] = run_check["idle_seconds_since_last_run"] if deep: if stream_check.get("status") != "ok": payload["checks"] = {"streams_lock": stream_check} diff --git a/api/streaming.py b/api/streaming.py index 7968c6ed..8e231041 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -26,6 +26,7 @@ from api.config import ( STREAM_GOAL_RELATED, PENDING_GOAL_CONTINUATION, LOCK, SESSIONS, SESSION_DIR, _get_session_agent_lock, _set_thread_env, _clear_thread_env, + register_active_run, update_active_run, unregister_active_run, SESSION_AGENT_LOCKS, SESSION_AGENT_LOCKS_LOCK, resolve_model_provider, resolve_custom_provider_connection, @@ -2006,6 +2007,16 @@ def _run_agent_streaming( q = STREAMS.get(stream_id) if q is None: return + register_active_run( + stream_id, + session_id=session_id, + started_at=time.time(), + phase="starting", + workspace=str(workspace), + model=model, + provider=model_provider, + ephemeral=bool(ephemeral), + ) s = None _rt = {} old_cwd = None @@ -2187,6 +2198,7 @@ def _run_agent_streaming( _agent_lock = None try: s = 
get_session(session_id) + update_active_run(stream_id, phase="running", session_id=session_id) s.workspace = str(Path(workspace).expanduser().resolve()) s.model = model provider_context = ( @@ -3882,6 +3894,7 @@ def _run_agent_streaming( if (s is not None and getattr(s, 'active_stream_id', None) == stream_id and getattr(s, 'pending_user_message', None)): + update_active_run(stream_id, phase="finalizing") _last_resort_sync_from_core(s, stream_id, _agent_lock) _clear_thread_env() # TD1: always clear thread-local context with STREAMS_LOCK: @@ -3892,6 +3905,7 @@ def _run_agent_streaming( STREAM_REASONING_TEXT.pop(stream_id, None) # Clean up reasoning trace (#1361 §A) STREAM_LIVE_TOOL_CALLS.pop(stream_id, None) # Clean up tool calls (#1361 §B) STREAM_GOAL_RELATED.pop(stream_id, None) # Clean up goal-related flag (#1932) + unregister_active_run(stream_id) # NOTE: do NOT discard PENDING_GOAL_CONTINUATION here. The marker # is set by goal_continue (line ~3328) inside the SAME function # call and consumed atomically by `_start_chat_stream_for_session` diff --git a/tests/test_run_lifecycle_health.py b/tests/test_run_lifecycle_health.py new file mode 100644 index 00000000..8913ade7 --- /dev/null +++ b/tests/test_run_lifecycle_health.py @@ -0,0 +1,50 @@ +"""Regression coverage for restart-safety run lifecycle reporting.""" + +import time + + +def test_health_counts_active_runs_even_when_no_sse_streams(): + """A worker run can outlive its SSE channel; health must expose the run.""" + from api import config, routes + + with config.STREAMS_LOCK: + config.STREAMS.clear() + with config.ACTIVE_RUNS_LOCK: + config.ACTIVE_RUNS.clear() + config.ACTIVE_RUNS["stream-1"] = { + "stream_id": "stream-1", + "session_id": "session-1", + "started_at": time.time() - 42, + "phase": "running", + } + + try: + stream_check = routes._streams_lock_health() + run_check = routes._run_lifecycle_health() + + assert stream_check["active_streams"] == 0 + assert run_check["active_runs"] == 1 + assert 
run_check["oldest_run_age_seconds"] >= 40 + assert run_check["runs"][0]["session_id"] == "session-1" + finally: + with config.ACTIVE_RUNS_LOCK: + config.ACTIVE_RUNS.clear() + + +def test_run_registry_unregister_records_last_finished_time(): + """Guards need a grace window after the last real worker exits.""" + from api import config + + with config.ACTIVE_RUNS_LOCK: + config.ACTIVE_RUNS.clear() + config.LAST_RUN_FINISHED_AT = None + + config.register_active_run("stream-2", session_id="session-2", phase="starting") + with config.ACTIVE_RUNS_LOCK: + assert "stream-2" in config.ACTIVE_RUNS + + config.unregister_active_run("stream-2") + + with config.ACTIVE_RUNS_LOCK: + assert "stream-2" not in config.ACTIVE_RUNS + assert isinstance(config.LAST_RUN_FINISHED_AT, float) From 4bbed44b214bc446b1027d94928e324461df1e05 Mon Sep 17 00:00:00 2001 From: nesquena-hermes Date: Mon, 11 May 2026 00:43:59 +0000 Subject: [PATCH 5/5] docs: CHANGELOG v0.51.41 Release Q --- CHANGELOG.md | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 234cdcf0..119df2f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,9 +2,34 @@ ## [Unreleased] +## [v0.51.41] — 2026-05-11 — Release Q (3-PR contributor batch — session recovery audit + run-lifecycle health + transcript dedup) + ### Fixed -- **fix(sessions): preserve distinct retried messages when merging transcripts** ([#2027](https://github.com/nesquena/hermes-webui/issues/2027)). Messaging session transcript merges now use `id`/`message_id` when present before falling back to the legacy role/content/timestamp/tool metadata key, so repeated turns with identical visible text are not silently collapsed. +- **PR #2035** by @ai-ag2026 — Recover orphaned `.json.bak` snapshots on startup (extends #1558 P0 fix). 
The existing post-#1558 recovery path only scanned `*.json`, so a crash that left only the `.bak` snapshot meant data was on disk but invisible to `/api/sessions` and the sidebar. Now the startup self-heal looks up the orphan `sid` in `state.db.sessions`; if the row exists, the snapshot is restored, the session index rebuilt, and the live sidecar appears again. If `state.db` lacks the row (explicit tombstone), the orphan is left alone. Companion change in `api/routes.py` unlinks `.json.bak` on explicit delete so intentional deletes don't get resurrected later. Fail-open on `state.db` unreadable/locked/older-schema — recovery stays best-effort. + +- **PR #2036** by @ai-ag2026 — Read-only `audit_session_recovery()` report + module CLI (`python -m api.session_recovery --audit --session-dir [--state-db ]`). Classifies shrunken live sidecars, orphan backups, orphans without a `state.db` row, and stale `_index.json` entries. Pure read-only audit — no writes, no rebuilds, no restores. Outputs machine-readable JSON. Stacked on #2035 (and auto-closed it). + +- **PR #2038** by @franksong2702 — Closed the message-identity dedup gap in `/api/session` messaging transcript merges (closes #2027). The dedup key now prefers `id`/`message_id` when message identity is available; legacy role/content/timestamp/tool-metadata key remains as fallback for messages without IDs. Prevents silent loss of legitimate retries (rare but high-impact when it hits). + +### Added + +- **PR #2039** by @ai-ag2026 — Active-run lifecycle visibility in `/health`. SSE `active_streams` only describes channel state; a worker can outlive its SSE stream while unwinding, blocked in a provider call, handling cancellation, or waiting on delegated work. Adds `active_runs`, per-run metadata/age, `oldest_run_age_seconds`, `last_run_finished_at`, and idle grace timing. Restart/update guards now have visibility into worker lifecycle, not just SSE channel state. 
Worker lifecycle wired through `register_active_run` / `update_active_run` / `unregister_active_run` (defined in `api/config.py`) in `api/streaming.py`. + +### Tests + +5100 → **5108 passing, 0 regressions** (+8 net new across new test files for session-recovery audit, run-lifecycle health, transcript dedup, and orphan-backup recovery). Full suite ~160s on Python 3.11 with `HERMES_HOME` isolation. + +### Notes + +- 3 PRs from 2 different authors (#2035 stacked under #2036 — auto-closed when #2036 merged). +- `api/routes.py` was touched by all three PRs with disjoint hunks (#2039 at lines 2529/2609, #2038 at 3040, #2036 at 4147). +- `CHANGELOG.md` was the only true conflict (`#2038` predates v0.51.40 release entry); resolved by preserving v0.51.40 history and re-adding the #2038 bullet under [Unreleased] before promoting. + +### Follow-ups + +- Test isolation: at least one test in `test_update_banner_fixes.py` or `test_updates.py` triggers a real `os.execv` that re-executes the entire pytest suite. Suite still passes (~5108 each loop) but full run takes 4× the time. Worth a targeted fix in the next maintenance batch. + ## [v0.51.40] — 2026-05-11 — Release P (4-PR contributor batch — quota subprocess hardening + env-lock prewarm + cron one-shot warning + Xiaomi env key)