From 9f3f8ea902ed0357ee7d569e41419d727bf38bd1 Mon Sep 17 00:00:00 2001
From: nesquena-hermes
Date: Mon, 11 May 2026 02:44:38 +0000
Subject: [PATCH] fix(recovery): close concurrency hazards in state.db sidecar reconciliation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two concrete data-corruption vectors flagged in Opus review of PR #2041,
both fixed atomically so the new repair-safe endpoint is safe for
production:

1. Shared tmp filename under concurrent calls

   `tmp = target.with_suffix('.json.reconcile.tmp')` produced a fixed
   path per session ID. Two simultaneous repair-safe POSTs would
   interleave bytes in the same tmp file, then both rename → corrupted
   JSON. Now matches the `Session.save()` convention at
   api/models.py:484 with a pid+tid suffix.

2. TOCTOU between target.exists() check and tmp.replace(target)

   `os.replace()` overwrites unconditionally. If a concurrent
   Session.save() for the same SID materialized the live sidecar in the
   microsecond window between the existence check and the rename, the
   reconciliation would silently overwrite a live sidecar with a
   (lossier) state.db reconstruction. Switched to `os.link()` +
   `unlink(tmp)` which is atomic create-or-fail — on FileExistsError we
   record `skipped: sidecar_appeared_during_reconcile` and keep the live
   sidecar untouched.

Plus a round-trip schema-parity test: materialize a sidecar from
state.db, then load it back through `Session.load()` and assert the
messages survive. Catches future schema drift between
`_state_db_row_to_sidecar()` and `Session.__init__()`.

Also adds a guard test confirming the .reconcile.tmp suffix includes
pid+tid (regression guard for hazard #1).

Tests: 23 passing across the recovery suite (was 21; +2 new in this commit).

Co-authored-by: ai-ag2026 <261867348+ai-ag2026@users.noreply.github.com> --- api/session_recovery.py | 34 +++++++++-- .../test_session_db_sidecar_reconciliation.py | 58 +++++++++++++++++++ 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/api/session_recovery.py b/api/session_recovery.py index 6347bdb7..0e93033a 100644 --- a/api/session_recovery.py +++ b/api/session_recovery.py @@ -28,8 +28,10 @@ from __future__ import annotations import argparse import json import logging +import os import shutil import sqlite3 +import threading from pathlib import Path logger = logging.getLogger(__name__) @@ -303,10 +305,13 @@ def recover_missing_sidecars_from_state_db(session_dir: Path, state_db_path: Pat if target.exists(): continue payload = _state_db_row_to_sidecar(row) - tmp = target.with_suffix('.json.reconcile.tmp') + # Per-process/per-thread tmp suffix to avoid corruption under + # concurrent reconciliation calls (matches api/models.py:484 + # Session.save() convention). + tmp_suffix = f".json.reconcile.tmp.{os.getpid()}.{threading.current_thread().ident}" + tmp = target.with_suffix(tmp_suffix) try: tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8') - tmp.replace(target) except OSError as exc: try: tmp.unlink(missing_ok=True) @@ -314,8 +319,29 @@ def recover_missing_sidecars_from_state_db(session_dir: Path, state_db_path: Pat pass details.append({'session_id': sid, 'materialized': False, 'error': str(exc)}) continue - materialized += 1 - details.append({'session_id': sid, 'materialized': True, 'messages': len(payload.get('messages') or [])}) + # Atomic create-or-fail: os.link() refuses to overwrite an existing + # target. Closes the TOCTOU window between the target.exists() check + # above and the rename — a concurrent Session.save() for the same SID + # will win and we silently skip rather than overwrite a live sidecar. 
+ materialized_now = False + try: + os.link(str(tmp), str(target)) + materialized_now = True + except FileExistsError: + # Live sidecar appeared between the check and the link — keep it. + pass + except OSError as exc: + details.append({'session_id': sid, 'materialized': False, 'error': str(exc)}) + finally: + try: + tmp.unlink(missing_ok=True) + except OSError: + pass + if materialized_now: + materialized += 1 + details.append({'session_id': sid, 'materialized': True, 'messages': len(payload.get('messages') or [])}) + elif not any(d.get('session_id') == sid for d in details[-1:]): + details.append({'session_id': sid, 'materialized': False, 'skipped': 'sidecar_appeared_during_reconcile'}) return {'scanned': len(rows), 'materialized': materialized, 'details': details} diff --git a/tests/test_session_db_sidecar_reconciliation.py b/tests/test_session_db_sidecar_reconciliation.py index 631bf227..95b64d97 100644 --- a/tests/test_session_db_sidecar_reconciliation.py +++ b/tests/test_session_db_sidecar_reconciliation.py @@ -67,3 +67,61 @@ def test_audit_reports_state_db_row_missing_sidecar(tmp_path): and item["recommendation"] == "materialize_from_state_db" for item in report["items"] ) + + +def test_materialized_sidecar_round_trips_through_session_load(tmp_path, monkeypatch): + """Schema parity guard: a materialized sidecar must be readable by Session.load + and the resulting Session must have the same messages we put in state.db. + + Catches future schema drift where the hardcoded 35-key dict in + _state_db_row_to_sidecar() falls out of sync with what Session.__init__ + expects. See Opus review on PR #2041 for context. 
+ """ + import api.models as _m + + sid = _make_state_db(tmp_path / "state.db", sid="rt_001", messages=3) + + monkeypatch.setattr(_m, "SESSION_DIR", tmp_path) + + result = recover_missing_sidecars_from_state_db(tmp_path, tmp_path / "state.db") + assert result["materialized"] == 1 + + loaded = _m.Session.load(sid) + assert loaded is not None, "Session.load returned None for materialized sidecar" + assert loaded.session_id == sid + assert len(loaded.messages) == 3 + assert [m["content"] for m in loaded.messages] == [ + "message 1", + "message 2", + "message 3", + ] + assert loaded.model == "openai/gpt-5" + assert loaded.parent_session_id == "parent-1" + + +def test_recover_missing_sidecars_uses_per_process_tmp_suffix(tmp_path): + """The tmp filename used during reconciliation must include pid/tid so + concurrent calls cannot corrupt each other's writes. See Opus review on + PR #2041 (matches Session.save() pattern at api/models.py:484). + """ + import os + import threading + + _make_state_db(tmp_path / "state.db", sid="tmp_suffix_001", messages=1) + + # Snapshot the directory before, run reconciliation, then check no + # generic ".json.reconcile.tmp" residue exists — it must have a + # pid.tid suffix and be cleaned up after. + result = recover_missing_sidecars_from_state_db(tmp_path, tmp_path / "state.db") + assert result["materialized"] == 1 + + # No leftover tmp files + leftover = list(tmp_path.glob("*.reconcile.tmp*")) + assert leftover == [], f"Reconciliation left tmp residue: {leftover}" + + # And the source explicitly references pid + tid in the suffix + from pathlib import Path + src = (Path(__file__).resolve().parent.parent / "api" / "session_recovery.py").read_text(encoding="utf-8") + assert "os.getpid()" in src and "threading.current_thread().ident" in src, ( + ".reconcile.tmp suffix must include pid + tid for concurrency safety" + )