feat: reconcile missing WebUI sidecars from state db

This commit is contained in:
ai-ag2026
2026-05-11 02:30:00 +02:00
committed by nesquena-hermes
parent 90c3611732
commit a34ded8e99
2 changed files with 235 additions and 3 deletions
+166 -3
View File
@@ -175,6 +175,150 @@ def _orphaned_backup_live_paths(
return paths
def _read_state_db_missing_sidecar_rows(session_dir: Path, state_db_path: Path | None) -> list[dict]:
"""Return WebUI-origin state.db rows whose JSON sidecar is missing."""
if state_db_path is None or not state_db_path.exists():
return []
try:
with sqlite3.connect(f"file:{state_db_path}?mode=ro", uri=True) as conn:
conn.row_factory = sqlite3.Row
session_cols = {row[1] for row in conn.execute("PRAGMA table_info(sessions)").fetchall()}
message_cols = {row[1] for row in conn.execute("PRAGMA table_info(messages)").fetchall()}
if not {'id', 'source'}.issubset(session_cols):
return []
title_expr = _sql_optional_col('title', session_cols)
model_expr = _sql_optional_col('model', session_cols)
started_expr = _sql_optional_col('started_at', session_cols, '0')
parent_expr = _sql_optional_col('parent_session_id', session_cols)
msg_count_expr = _sql_optional_col('message_count', session_cols, '0')
rows = []
for row in conn.execute(
f"""
SELECT id, source, {title_expr}, {model_expr}, {started_expr},
{parent_expr}, {msg_count_expr}
FROM sessions
WHERE source = 'webui'
ORDER BY COALESCE(started_at, 0) DESC
"""
).fetchall():
data = dict(row)
sid = str(data.get('id') or '').strip()
if not sid or (session_dir / f"{sid}.json").exists():
continue
message_rows: list[dict] = []
if {'session_id', 'role', 'content'}.issubset(message_cols):
order = "timestamp, id" if 'timestamp' in message_cols and 'id' in message_cols else "rowid"
ts_expr = 'timestamp' if 'timestamp' in message_cols else 'NULL AS timestamp'
for msg in conn.execute(
f"SELECT role, content, {ts_expr} FROM messages WHERE session_id = ? ORDER BY {order}",
(sid,),
).fetchall():
message = {
'role': msg['role'],
'content': msg['content'] or '',
}
if msg['timestamp'] is not None:
message['timestamp'] = msg['timestamp']
message_rows.append(message)
if not message_rows:
continue
data['messages'] = message_rows
rows.append(data)
return rows
except Exception as exc:
logger.debug("state_db sidecar reconciliation scan failed for %s: %s", state_db_path, exc)
return []
def _sql_optional_col(name: str, columns: set[str], fallback: str = "NULL") -> str:
return name if name in columns else f"{fallback} AS {name}"
def _state_db_row_to_sidecar(row: dict) -> dict:
try:
from api.agent_sessions import normalize_agent_session_source
except Exception:
normalize_agent_session_source = None
source = str(row.get('source') or '').strip().lower()
source_meta = normalize_agent_session_source(source) if normalize_agent_session_source else {
'raw_source': source or None,
'session_source': source or None,
'source_label': source.title() if source else None,
}
started_at = row.get('started_at') or 0
messages = row.get('messages') if isinstance(row.get('messages'), list) else []
last_ts = messages[-1].get('timestamp') if messages and isinstance(messages[-1], dict) else started_at
return {
'session_id': row.get('id'),
'title': row.get('title') or 'Recovered WebUI Session',
'workspace': '',
'model': row.get('model') or 'unknown',
'model_provider': None,
'created_at': started_at,
'updated_at': last_ts or started_at,
'pinned': False,
'archived': False,
'project_id': None,
'profile': None,
'input_tokens': 0,
'output_tokens': 0,
'estimated_cost': None,
'personality': None,
'active_stream_id': None,
'pending_user_message': None,
'pending_attachments': [],
'pending_started_at': None,
'compression_anchor_visible_idx': None,
'compression_anchor_message_key': None,
'compression_anchor_summary': None,
'context_length': None,
'threshold_tokens': None,
'last_prompt_tokens': None,
'gateway_routing': None,
'gateway_routing_history': [],
'llm_title_generated': False,
'parent_session_id': row.get('parent_session_id'),
'is_cli_session': False,
'source_tag': source or None,
**source_meta,
'enabled_toolsets': None,
'composer_draft': {},
'messages': messages,
'tool_calls': [],
'_recovered_from_state_db': True,
}
def recover_missing_sidecars_from_state_db(session_dir: Path, state_db_path: Path | None) -> dict:
"""Materialize missing WebUI JSON sidecars from canonical state.db rows."""
rows = _read_state_db_missing_sidecar_rows(session_dir, state_db_path)
materialized = 0
details: list[dict] = []
session_dir.mkdir(parents=True, exist_ok=True)
for row in rows:
sid = str(row.get('id') or '').strip()
if not sid:
continue
target = session_dir / f"{sid}.json"
if target.exists():
continue
payload = _state_db_row_to_sidecar(row)
tmp = target.with_suffix('.json.reconcile.tmp')
try:
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8')
tmp.replace(target)
except OSError as exc:
try:
tmp.unlink(missing_ok=True)
except OSError:
pass
details.append({'session_id': sid, 'materialized': False, 'error': str(exc)})
continue
materialized += 1
details.append({'session_id': sid, 'materialized': True, 'messages': len(payload.get('messages') or [])})
return {'scanned': len(rows), 'materialized': materialized, 'details': details}
def _new_audit_item(
session_id: str,
kind: str,
@@ -275,6 +419,17 @@ def audit_session_recovery(session_dir: Path, state_db_path: Path | None = None)
_msg_count(session_dir / f"{session_id}.json"), -1,
))
for row in _read_state_db_missing_sidecar_rows(session_dir, state_db_path):
sid = str(row.get('id') or '')
items.append(_new_audit_item(
sid,
"state_db_missing_sidecar",
"repairable",
"materialize_from_state_db",
-1,
-1,
))
summary = {"ok": len(live_paths), "repairable": 0, "unsafe_to_repair": 0}
for item in items:
category = item.get('category')
@@ -297,19 +452,27 @@ def repair_safe_session_recovery(session_dir: Path, state_db_path: Path | None =
readable state.db. Unsafe audit findings remain for manual review.
"""
before = audit_session_recovery(session_dir, state_db_path=state_db_path)
repair = recover_all_sessions_on_startup(
backup_repair = recover_all_sessions_on_startup(
session_dir,
rebuild_index=True,
state_db_path=state_db_path,
)
sidecar_repair = recover_missing_sidecars_from_state_db(session_dir, state_db_path)
if sidecar_repair.get('materialized'):
try:
from api.models import _write_session_index
_write_session_index(updates=None)
except Exception as exc:
logger.warning("repair_safe_session_recovery: index rebuild after state.db reconciliation failed: %s", exc)
after = audit_session_recovery(session_dir, state_db_path=state_db_path)
unsafe_remaining = int((after.get("summary") or {}).get("unsafe_to_repair") or 0)
repairable_remaining = int((after.get("summary") or {}).get("repairable") or 0)
return {
"ok": unsafe_remaining == 0 and repairable_remaining == 0,
"repaired": int(repair.get("restored") or 0),
"repaired": int(backup_repair.get("restored") or 0) + int(sidecar_repair.get("materialized") or 0),
"before": before,
"repair": repair,
"backup_repair": backup_repair,
"sidecar_repair": sidecar_repair,
"after": after,
}
@@ -0,0 +1,69 @@
import json
import sqlite3
from api.session_recovery import recover_missing_sidecars_from_state_db, audit_session_recovery
def _make_state_db(path, *, sid="state_only_001", source="webui", messages=2):
conn = sqlite3.connect(path)
conn.execute(
"CREATE TABLE sessions (id TEXT PRIMARY KEY, source TEXT, title TEXT, model TEXT, started_at REAL, message_count INTEGER, parent_session_id TEXT)"
)
conn.execute(
"CREATE TABLE messages (id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT, role TEXT, content TEXT, timestamp REAL)"
)
conn.execute(
"INSERT INTO sessions (id, source, title, model, started_at, message_count, parent_session_id) VALUES (?, ?, ?, ?, ?, ?, ?)",
(sid, source, "Recovered from DB", "openai/gpt-5", 1234.0, messages, "parent-1"),
)
for i in range(messages):
conn.execute(
"INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
(sid, "user" if i % 2 == 0 else "assistant", f"message {i + 1}", 1234.0 + i),
)
conn.commit()
conn.close()
return sid
def test_recover_missing_sidecars_from_state_db_materializes_webui_row(tmp_path):
sid = _make_state_db(tmp_path / "state.db")
result = recover_missing_sidecars_from_state_db(tmp_path, tmp_path / "state.db")
assert result["materialized"] == 1
sidecar = tmp_path / f"{sid}.json"
assert sidecar.exists()
data = json.loads(sidecar.read_text(encoding="utf-8"))
assert data["session_id"] == sid
assert data["title"] == "Recovered from DB"
assert data["model"] == "openai/gpt-5"
assert data["parent_session_id"] == "parent-1"
assert data["source_tag"] == "webui"
assert data["session_source"] == "webui"
assert [m["content"] for m in data["messages"]] == ["message 1", "message 2"]
def test_recover_missing_sidecars_from_state_db_skips_existing_sidecar(tmp_path):
sid = _make_state_db(tmp_path / "state.db")
existing = tmp_path / f"{sid}.json"
existing.write_text(json.dumps({"session_id": sid, "messages": [{"role": "user", "content": "keep"}]}), encoding="utf-8")
result = recover_missing_sidecars_from_state_db(tmp_path, tmp_path / "state.db")
assert result["materialized"] == 0
assert json.loads(existing.read_text(encoding="utf-8"))["messages"][0]["content"] == "keep"
def test_audit_reports_state_db_row_missing_sidecar(tmp_path):
sid = _make_state_db(tmp_path / "state.db")
report = audit_session_recovery(tmp_path, state_db_path=tmp_path / "state.db")
assert any(
item["session_id"] == sid
and item["kind"] == "state_db_missing_sidecar"
and item["category"] == "repairable"
and item["recommendation"] == "materialize_from_state_db"
for item in report["items"]
)