mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-25 11:10:18 +00:00
363 lines
13 KiB
Python
363 lines
13 KiB
Python
"""
|
|
Session recovery from .bak snapshots — last line of defense against
|
|
data-loss bugs like #1558.
|
|
|
|
``Session.save()`` writes a ``<sid>.json.bak`` snapshot of the previous
|
|
state whenever an incoming save would shrink the messages array. This
|
|
module reads those snapshots back and restores any session whose live
|
|
file has fewer messages than its backup, or whose live file is missing
|
|
while a valid backup remains.
|
|
|
|
Three integration points:
|
|
|
|
1. ``recover_all_sessions_on_startup()`` — called from server.py at boot,
|
|
scans the session dir, restores any session whose JSON has fewer
|
|
messages than its .bak, and recreates a missing ``<sid>.json`` from an
|
|
orphaned ``<sid>.json.bak`` when the canonical state DB still has that
|
|
session. Idempotent: a clean run is a no-op.
|
|
|
|
2. ``recover_session(sid)`` — single-session helper backing the
|
|
``POST /api/session/recover`` endpoint, so users can re-run recovery
|
|
manually if their session was open through a server restart.
|
|
|
|
3. ``inspect_session_recovery_status(sid)`` — read-only audit returning
|
|
message counts for the live JSON, the .bak, and a recommendation.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import shutil
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _msg_count(p: Path) -> int:
|
|
"""Return the number of messages in a session JSON file, or -1 on read/parse error.
|
|
|
|
Returns -1 for any non-session-shape file:
|
|
- File can't be read (OSError)
|
|
- Top-level isn't valid JSON or is invalid (JSONDecodeError, ValueError)
|
|
- Top-level isn't a dict (AttributeError on .get) — e.g. ``_index.json``
|
|
which is a top-level list of session metadata, not a session itself.
|
|
The startup recovery scanner globs ``*.json`` and would otherwise
|
|
crash on the first non-dict file it encounters.
|
|
"""
|
|
try:
|
|
data = json.loads(p.read_text(encoding='utf-8'))
|
|
except (OSError, json.JSONDecodeError, ValueError):
|
|
return -1
|
|
if not isinstance(data, dict):
|
|
return -1
|
|
msgs = data.get('messages')
|
|
return len(msgs) if isinstance(msgs, list) else -1
|
|
|
|
|
|
def inspect_session_recovery_status(session_path: Path) -> dict:
|
|
"""Return a status dict describing whether recovery is recommended.
|
|
|
|
{
|
|
"session_id": "...",
|
|
"live_messages": int, # -1 if live file unreadable
|
|
"bak_messages": int, # -1 if no .bak or unreadable
|
|
"recommend": "restore" | "no_action" | "no_backup",
|
|
}
|
|
"""
|
|
bak_path = session_path.with_suffix('.json.bak')
|
|
live_count = _msg_count(session_path)
|
|
if not bak_path.exists():
|
|
return {
|
|
"session_id": session_path.stem,
|
|
"live_messages": live_count,
|
|
"bak_messages": -1,
|
|
"recommend": "no_backup",
|
|
}
|
|
bak_count = _msg_count(bak_path)
|
|
if bak_count > live_count:
|
|
return {
|
|
"session_id": session_path.stem,
|
|
"live_messages": live_count,
|
|
"bak_messages": bak_count,
|
|
"recommend": "restore",
|
|
}
|
|
return {
|
|
"session_id": session_path.stem,
|
|
"live_messages": live_count,
|
|
"bak_messages": bak_count,
|
|
"recommend": "no_action",
|
|
}
|
|
|
|
|
|
def recover_session(session_path: Path) -> dict:
|
|
"""Restore session_path from its .bak when the bak has more messages.
|
|
|
|
Returns a status dict identical to ``inspect_session_recovery_status``
|
|
plus a "restored" boolean.
|
|
"""
|
|
status = inspect_session_recovery_status(session_path)
|
|
if status["recommend"] != "restore":
|
|
return {**status, "restored": False}
|
|
bak_path = session_path.with_suffix('.json.bak')
|
|
# Stage the recovery via a tmp copy + atomic replace so a crash mid-restore
|
|
# cannot leave a half-written session.json.
|
|
tmp_path = session_path.with_suffix('.json.recover.tmp')
|
|
try:
|
|
shutil.copyfile(bak_path, tmp_path)
|
|
tmp_path.replace(session_path)
|
|
except OSError as exc:
|
|
logger.warning("recover_session: copy failed for %s: %s", session_path, exc)
|
|
try:
|
|
tmp_path.unlink(missing_ok=True)
|
|
except OSError:
|
|
pass
|
|
return {**status, "restored": False, "error": str(exc)}
|
|
logger.warning(
|
|
"recover_session: restored %s from .bak (live=%d → bak=%d messages). "
|
|
"See #1558 for the data-loss class this guards against.",
|
|
session_path.name, status["live_messages"], status["bak_messages"],
|
|
)
|
|
return {**status, "restored": True}
|
|
|
|
|
|
def _state_db_has_session(session_id: str, state_db_path: Path | None) -> bool:
|
|
"""Return whether state.db still knows this session.
|
|
|
|
The check is deliberately fail-open: recovery must not be prevented by a
|
|
locked, absent, or older-schema state DB. When a DB is readable and has no
|
|
row, treat the orphan backup as a tombstoned/deleted session and skip it.
|
|
"""
|
|
if state_db_path is None or not state_db_path.exists():
|
|
return True
|
|
try:
|
|
with sqlite3.connect(f"file:{state_db_path}?mode=ro", uri=True) as conn:
|
|
cur = conn.execute(
|
|
"select 1 from sqlite_master where type='table' and name='sessions'"
|
|
)
|
|
if cur.fetchone() is None:
|
|
return True
|
|
cur = conn.execute("select 1 from sessions where id = ? limit 1", (session_id,))
|
|
return cur.fetchone() is not None
|
|
except Exception as exc:
|
|
logger.debug("state_db session tombstone check failed for %s: %s", session_id, exc)
|
|
return True
|
|
|
|
|
|
def _orphaned_backup_live_paths(
|
|
session_dir: Path,
|
|
state_db_path: Path | None = None,
|
|
) -> list[Path]:
|
|
"""Return live ``<sid>.json`` paths whose ``<sid>.json.bak`` exists.
|
|
|
|
``Path.glob('*.json')`` does not see orphan backups because their suffix is
|
|
``.bak``. Existing startup recovery only handled shrunken live files; this
|
|
helper covers the crash shape where the live sidecar is gone but the rescue
|
|
copy remains.
|
|
"""
|
|
paths: list[Path] = []
|
|
for bak_path in sorted(session_dir.glob('*.json.bak')):
|
|
live_path = bak_path.with_suffix('')
|
|
if live_path.name.startswith('_') or live_path.exists():
|
|
continue
|
|
if _msg_count(bak_path) < 0:
|
|
continue
|
|
session_id = live_path.stem
|
|
if not _state_db_has_session(session_id, state_db_path):
|
|
logger.info(
|
|
"recover_all_sessions_on_startup: skipped orphan backup %s; "
|
|
"state.db has no live session row",
|
|
bak_path.name,
|
|
)
|
|
continue
|
|
paths.append(live_path)
|
|
return paths
|
|
|
|
|
|
def _new_audit_item(
|
|
session_id: str,
|
|
kind: str,
|
|
category: str,
|
|
recommendation: str,
|
|
live_messages: int = -1,
|
|
bak_messages: int = -1,
|
|
) -> dict:
|
|
return {
|
|
"session_id": session_id,
|
|
"kind": kind,
|
|
"category": category,
|
|
"recommendation": recommendation,
|
|
"live_messages": live_messages,
|
|
"bak_messages": bak_messages,
|
|
}
|
|
|
|
|
|
def _read_index_session_ids(index_path: Path) -> set[str]:
|
|
try:
|
|
data = json.loads(index_path.read_text(encoding='utf-8'))
|
|
except (OSError, json.JSONDecodeError, ValueError):
|
|
return set()
|
|
if not isinstance(data, list):
|
|
return set()
|
|
ids: set[str] = set()
|
|
for entry in data:
|
|
if isinstance(entry, dict) and isinstance(entry.get('session_id'), str):
|
|
ids.add(entry['session_id'])
|
|
return ids
|
|
|
|
|
|
def audit_session_recovery(session_dir: Path, state_db_path: Path | None = None) -> dict:
|
|
"""Read-only audit of session recovery state.
|
|
|
|
The audit intentionally does not mutate files. It classifies only the safe
|
|
recovery primitives this module knows how to perform: backup restores and
|
|
derived index rebuilds. Call ``recover_all_sessions_on_startup`` separately
|
|
for safe repairs.
|
|
"""
|
|
if not session_dir.exists():
|
|
return {
|
|
"status": "ok",
|
|
"summary": {"ok": 0, "repairable": 0, "unsafe_to_repair": 0},
|
|
"items": [],
|
|
}
|
|
|
|
items: list[dict] = []
|
|
live_paths = sorted(p for p in session_dir.glob('*.json') if not p.name.startswith('_'))
|
|
live_ids = {p.stem for p in live_paths}
|
|
|
|
for live_path in live_paths:
|
|
status = inspect_session_recovery_status(live_path)
|
|
if status.get('recommend') == 'restore':
|
|
items.append(_new_audit_item(
|
|
status['session_id'],
|
|
"shrunken_live",
|
|
"repairable",
|
|
"restore_from_bak",
|
|
status.get('live_messages', -1),
|
|
status.get('bak_messages', -1),
|
|
))
|
|
|
|
for bak_path in sorted(session_dir.glob('*.json.bak')):
|
|
live_path = bak_path.with_suffix('')
|
|
if live_path.exists() or live_path.name.startswith('_'):
|
|
continue
|
|
bak_messages = _msg_count(bak_path)
|
|
session_id = live_path.stem
|
|
if bak_messages < 0:
|
|
items.append(_new_audit_item(
|
|
session_id, "malformed_orphan_backup", "unsafe_to_repair", "manual_review", -1, bak_messages
|
|
))
|
|
elif _state_db_has_session(session_id, state_db_path):
|
|
items.append(_new_audit_item(
|
|
session_id, "orphan_backup", "repairable", "restore_from_bak", -1, bak_messages
|
|
))
|
|
else:
|
|
items.append(_new_audit_item(
|
|
session_id,
|
|
"orphan_backup_without_state_row",
|
|
"unsafe_to_repair",
|
|
"manual_review",
|
|
-1,
|
|
bak_messages,
|
|
))
|
|
|
|
index_path = session_dir / '_index.json'
|
|
if index_path.exists():
|
|
index_ids = _read_index_session_ids(index_path)
|
|
for session_id in sorted(index_ids - live_ids):
|
|
items.append(_new_audit_item(
|
|
session_id, "index_missing_file", "repairable", "rebuild_index"
|
|
))
|
|
for session_id in sorted(live_ids - index_ids):
|
|
items.append(_new_audit_item(
|
|
session_id, "index_missing_entry", "repairable", "rebuild_index",
|
|
_msg_count(session_dir / f"{session_id}.json"), -1,
|
|
))
|
|
|
|
summary = {"ok": len(live_paths), "repairable": 0, "unsafe_to_repair": 0}
|
|
for item in items:
|
|
category = item.get('category')
|
|
if category in summary:
|
|
summary[category] += 1
|
|
if summary["unsafe_to_repair"]:
|
|
overall = "needs_manual_review"
|
|
elif summary["repairable"]:
|
|
overall = "warn"
|
|
else:
|
|
overall = "ok"
|
|
return {"status": overall, "summary": summary, "items": items}
|
|
|
|
|
|
def recover_all_sessions_on_startup(
|
|
session_dir: Path,
|
|
rebuild_index: bool = False,
|
|
state_db_path: Path | None = None,
|
|
) -> dict:
|
|
"""Scan session_dir for shrunken/orphaned sessions and restore from .bak.
|
|
|
|
Returns {"scanned": N, "restored": M, "orphaned_backups": K, "details": [...]}.
|
|
"""
|
|
if not session_dir.exists():
|
|
return {"scanned": 0, "restored": 0, "orphaned_backups": 0, "details": []}
|
|
scanned = 0
|
|
restored = 0
|
|
details: list[dict] = []
|
|
live_paths = [path for path in sorted(session_dir.glob('*.json')) if not path.name.startswith('_')]
|
|
orphan_paths = _orphaned_backup_live_paths(session_dir, state_db_path=state_db_path)
|
|
for path in [*live_paths, *orphan_paths]:
|
|
# Skip non-session JSON files in the same dir:
|
|
# - ``_index.json`` is a top-level list of session metadata
|
|
# - any future non-session JSON marked with the ``_`` convention is
|
|
# skipped automatically (project convention for system files in
|
|
# directories that otherwise hold user data)
|
|
scanned += 1
|
|
try:
|
|
result = recover_session(path)
|
|
except Exception as exc:
|
|
# Defensive: a malformed session file shouldn't break recovery
|
|
# for the rest. Log and continue.
|
|
logger.warning(
|
|
"recover_all_sessions_on_startup: skipped %s due to %s: %s",
|
|
path.name, type(exc).__name__, exc,
|
|
)
|
|
continue
|
|
if result.get("restored"):
|
|
restored += 1
|
|
details.append(result)
|
|
if restored:
|
|
logger.warning(
|
|
"recover_all_sessions_on_startup: restored %d/%d sessions from .bak. "
|
|
"If you weren't expecting this, check the session list for missing "
|
|
"messages — see #1558.", restored, scanned,
|
|
)
|
|
if rebuild_index:
|
|
try:
|
|
from api.models import _write_session_index
|
|
_write_session_index(updates=None)
|
|
except Exception as exc:
|
|
logger.warning("recover_all_sessions_on_startup: index rebuild failed: %s", exc)
|
|
return {
|
|
"scanned": scanned,
|
|
"restored": restored,
|
|
"orphaned_backups": len(orphan_paths),
|
|
"details": details,
|
|
}
|
|
|
|
|
|
def _main() -> int:
|
|
parser = argparse.ArgumentParser(description="Audit Hermes WebUI session recovery state")
|
|
parser.add_argument("--audit", action="store_true", help="run a read-only recovery audit")
|
|
parser.add_argument("--session-dir", type=Path, required=True, help="path to WebUI sessions directory")
|
|
parser.add_argument("--state-db", type=Path, default=None, help="optional Hermes state.db path")
|
|
args = parser.parse_args()
|
|
if not args.audit:
|
|
parser.error("currently only --audit is supported")
|
|
report = audit_session_recovery(args.session_dir, state_db_path=args.state_db)
|
|
print(json.dumps(report, sort_keys=True))
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(_main())
|