diff --git a/api/session_recovery.py b/api/session_recovery.py index b42ab7f4..6347bdb7 100644 --- a/api/session_recovery.py +++ b/api/session_recovery.py @@ -175,6 +175,150 @@ def _orphaned_backup_live_paths( return paths +def _read_state_db_missing_sidecar_rows(session_dir: Path, state_db_path: Path | None) -> list[dict]: + """Return WebUI-origin state.db rows whose JSON sidecar is missing.""" + if state_db_path is None or not state_db_path.exists(): + return [] + try: + with sqlite3.connect(f"file:{state_db_path}?mode=ro", uri=True) as conn: + conn.row_factory = sqlite3.Row + session_cols = {row[1] for row in conn.execute("PRAGMA table_info(sessions)").fetchall()} + message_cols = {row[1] for row in conn.execute("PRAGMA table_info(messages)").fetchall()} + if not {'id', 'source'}.issubset(session_cols): + return [] + title_expr = _sql_optional_col('title', session_cols) + model_expr = _sql_optional_col('model', session_cols) + started_expr = _sql_optional_col('started_at', session_cols, '0') + parent_expr = _sql_optional_col('parent_session_id', session_cols) + msg_count_expr = _sql_optional_col('message_count', session_cols, '0') + rows = [] + for row in conn.execute( + f""" + SELECT id, source, {title_expr}, {model_expr}, {started_expr}, + {parent_expr}, {msg_count_expr} + FROM sessions + WHERE source = 'webui' + ORDER BY COALESCE(started_at, 0) DESC + """ + ).fetchall(): + data = dict(row) + sid = str(data.get('id') or '').strip() + if not sid or (session_dir / f"{sid}.json").exists(): + continue + message_rows: list[dict] = [] + if {'session_id', 'role', 'content'}.issubset(message_cols): + order = "timestamp, id" if 'timestamp' in message_cols and 'id' in message_cols else "rowid" + ts_expr = 'timestamp' if 'timestamp' in message_cols else 'NULL AS timestamp' + for msg in conn.execute( + f"SELECT role, content, {ts_expr} FROM messages WHERE session_id = ? ORDER BY {order}", + (sid,), + ).fetchall(): + message = { + 'role': msg['role'], + 'content': msg['content'] or '', + } + if msg['timestamp'] is not None: + message['timestamp'] = msg['timestamp'] + message_rows.append(message) + if not message_rows: + continue + data['messages'] = message_rows + rows.append(data) + return rows + except Exception as exc: + logger.debug("state_db sidecar reconciliation scan failed for %s: %s", state_db_path, exc) + return [] + + +def _sql_optional_col(name: str, columns: set[str], fallback: str = "NULL") -> str: + return name if name in columns else f"{fallback} AS {name}" + + +def _state_db_row_to_sidecar(row: dict) -> dict: + try: + from api.agent_sessions import normalize_agent_session_source + except Exception: + normalize_agent_session_source = None + source = str(row.get('source') or '').strip().lower() + source_meta = normalize_agent_session_source(source) if normalize_agent_session_source else { + 'raw_source': source or None, + 'session_source': source or None, + 'source_label': source.title() if source else None, + } + started_at = row.get('started_at') or 0 + messages = row.get('messages') if isinstance(row.get('messages'), list) else [] + last_ts = messages[-1].get('timestamp') if messages and isinstance(messages[-1], dict) else started_at + return { + 'session_id': row.get('id'), + 'title': row.get('title') or 'Recovered WebUI Session', + 'workspace': '', + 'model': row.get('model') or 'unknown', + 'model_provider': None, + 'created_at': started_at, + 'updated_at': last_ts or started_at, + 'pinned': False, + 'archived': False, + 'project_id': None, + 'profile': None, + 'input_tokens': 0, + 'output_tokens': 0, + 'estimated_cost': None, + 'personality': None, + 'active_stream_id': None, + 'pending_user_message': None, + 'pending_attachments': [], + 'pending_started_at': None, + 'compression_anchor_visible_idx': None, + 'compression_anchor_message_key': None, + 'compression_anchor_summary': None, + 'context_length': None, + 'threshold_tokens': None, + 'last_prompt_tokens': None, + 'gateway_routing': None, + 'gateway_routing_history': [], + 'llm_title_generated': False, + 'parent_session_id': row.get('parent_session_id'), + 'is_cli_session': False, + 'source_tag': source or None, + **source_meta, + 'enabled_toolsets': None, + 'composer_draft': {}, + 'messages': messages, + 'tool_calls': [], + '_recovered_from_state_db': True, + } + + +def recover_missing_sidecars_from_state_db(session_dir: Path, state_db_path: Path | None) -> dict: + """Materialize missing WebUI JSON sidecars from canonical state.db rows.""" + rows = _read_state_db_missing_sidecar_rows(session_dir, state_db_path) + materialized = 0 + details: list[dict] = [] + session_dir.mkdir(parents=True, exist_ok=True) + for row in rows: + sid = str(row.get('id') or '').strip() + if not sid: + continue + target = session_dir / f"{sid}.json" + if target.exists(): + continue + payload = _state_db_row_to_sidecar(row) + tmp = target.with_suffix('.json.reconcile.tmp') + try: + tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8') + tmp.replace(target) + except OSError as exc: + try: + tmp.unlink(missing_ok=True) + except OSError: + pass + details.append({'session_id': sid, 'materialized': False, 'error': str(exc)}) + continue + materialized += 1 + details.append({'session_id': sid, 'materialized': True, 'messages': len(payload.get('messages') or [])}) + return {'scanned': len(rows), 'materialized': materialized, 'details': details} + + def _new_audit_item( session_id: str, kind: str, @@ -275,6 +419,17 @@ def audit_session_recovery(session_dir: Path, state_db_path: Path | None = None) _msg_count(session_dir / f"{session_id}.json"), -1, )) + for row in _read_state_db_missing_sidecar_rows(session_dir, state_db_path): + sid = str(row.get('id') or '') + items.append(_new_audit_item( + sid, + "state_db_missing_sidecar", + "repairable", + "materialize_from_state_db", + -1, + -1, + )) + summary = {"ok": len(live_paths), "repairable": 0, "unsafe_to_repair": 0} for item in items: category = item.get('category') @@ -297,19 +452,27 @@ def repair_safe_session_recovery(session_dir: Path, state_db_path: Path | None = readable state.db. Unsafe audit findings remain for manual review. """ before = audit_session_recovery(session_dir, state_db_path=state_db_path) - repair = recover_all_sessions_on_startup( + backup_repair = recover_all_sessions_on_startup( session_dir, rebuild_index=True, state_db_path=state_db_path, ) + sidecar_repair = recover_missing_sidecars_from_state_db(session_dir, state_db_path) + if sidecar_repair.get('materialized'): + try: + from api.models import _write_session_index + _write_session_index(updates=None) + except Exception as exc: + logger.warning("repair_safe_session_recovery: index rebuild after state.db reconciliation failed: %s", exc) after = audit_session_recovery(session_dir, state_db_path=state_db_path) unsafe_remaining = int((after.get("summary") or {}).get("unsafe_to_repair") or 0) repairable_remaining = int((after.get("summary") or {}).get("repairable") or 0) return { "ok": unsafe_remaining == 0 and repairable_remaining == 0, - "repaired": int(repair.get("restored") or 0), + "repaired": int(backup_repair.get("restored") or 0) + int(sidecar_repair.get("materialized") or 0), "before": before, - "repair": repair, + "backup_repair": backup_repair, + "sidecar_repair": sidecar_repair, "after": after, } diff --git a/tests/test_session_db_sidecar_reconciliation.py b/tests/test_session_db_sidecar_reconciliation.py new file mode 100644 index 00000000..631bf227 --- /dev/null +++ b/tests/test_session_db_sidecar_reconciliation.py @@ -0,0 +1,69 @@ +import json +import sqlite3 + +from api.session_recovery import recover_missing_sidecars_from_state_db, audit_session_recovery + + +def _make_state_db(path, *, sid="state_only_001", source="webui", messages=2): + conn = sqlite3.connect(path) + conn.execute( + "CREATE TABLE sessions (id TEXT PRIMARY KEY, source TEXT, title TEXT, model TEXT, started_at REAL, message_count INTEGER, parent_session_id TEXT)" + ) + conn.execute( + "CREATE TABLE messages (id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT, role TEXT, content TEXT, timestamp REAL)" + ) + conn.execute( + "INSERT INTO sessions (id, source, title, model, started_at, message_count, parent_session_id) VALUES (?, ?, ?, ?, ?, ?, ?)", + (sid, source, "Recovered from DB", "openai/gpt-5", 1234.0, messages, "parent-1"), + ) + for i in range(messages): + conn.execute( + "INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)", + (sid, "user" if i % 2 == 0 else "assistant", f"message {i + 1}", 1234.0 + i), + ) + conn.commit() + conn.close() + return sid + + +def test_recover_missing_sidecars_from_state_db_materializes_webui_row(tmp_path): + sid = _make_state_db(tmp_path / "state.db") + + result = recover_missing_sidecars_from_state_db(tmp_path, tmp_path / "state.db") + + assert result["materialized"] == 1 + sidecar = tmp_path / f"{sid}.json" + assert sidecar.exists() + data = json.loads(sidecar.read_text(encoding="utf-8")) + assert data["session_id"] == sid + assert data["title"] == "Recovered from DB" + assert data["model"] == "openai/gpt-5" + assert data["parent_session_id"] == "parent-1" + assert data["source_tag"] == "webui" + assert data["session_source"] == "webui" + assert [m["content"] for m in data["messages"]] == ["message 1", "message 2"] + + +def test_recover_missing_sidecars_from_state_db_skips_existing_sidecar(tmp_path): + sid = _make_state_db(tmp_path / "state.db") + existing = tmp_path / f"{sid}.json" + existing.write_text(json.dumps({"session_id": sid, "messages": [{"role": "user", "content": "keep"}]}), encoding="utf-8") + + result = recover_missing_sidecars_from_state_db(tmp_path, tmp_path / "state.db") + + assert result["materialized"] == 0 + assert json.loads(existing.read_text(encoding="utf-8"))["messages"][0]["content"] == "keep" + + +def test_audit_reports_state_db_row_missing_sidecar(tmp_path): + sid = _make_state_db(tmp_path / "state.db") + + report = audit_session_recovery(tmp_path, state_db_path=tmp_path / "state.db") + + assert any( + item["session_id"] == sid + and item["kind"] == "state_db_missing_sidecar" + and item["category"] == "repairable" + and item["recommendation"] == "materialize_from_state_db" + for item in report["items"] + )