fix(recovery): close concurrency hazards in state.db sidecar reconciliation

Two concrete data-corruption vectors flagged in Opus review of PR #2041,
both fixed atomically so the new repair-safe endpoint is safe for production:

1. Shared tmp filename under concurrent calls
   `tmp = target.with_suffix('.json.reconcile.tmp')` produced a fixed path
   per session ID. Two simultaneous repair-safe POSTs would interleave bytes
   in the same tmp file, then both rename → corrupted JSON. Now matches the
   `Session.save()` convention at api/models.py:484 with a pid+tid suffix.

2. TOCTOU between target.exists() check and tmp.replace(target)
   `os.replace()` overwrites unconditionally. If a concurrent Session.save()
   for the same SID materialized the live sidecar in the microsecond window
   between the existence check and the rename, the reconciliation would
   silently overwrite a live sidecar with a (lossier) state.db reconstruction.
   Switched to `os.link()` + `unlink(tmp)` which is atomic create-or-fail —
   on FileExistsError we record `skipped: sidecar_appeared_during_reconcile`
   and keep the live sidecar untouched.

Plus a round-trip schema-parity test: materialize a sidecar from state.db,
then load it back through `Session.load()` and assert the messages survive.
Catches future schema drift between `_state_db_row_to_sidecar()` and
`Session.__init__()`. Also adds a guard test confirming the .reconcile.tmp
suffix includes pid+tid (regression guard for hazard #1).

Tests: 23 passing across the recovery suite (was 21; +2 new in this commit).

Co-authored-by: ai-ag2026 <261867348+ai-ag2026@users.noreply.github.com>
This commit is contained in:
nesquena-hermes
2026-05-11 02:44:38 +00:00
parent c710efb463
commit 9f3f8ea902
2 changed files with 88 additions and 4 deletions
+30 -4
View File
@@ -28,8 +28,10 @@ from __future__ import annotations
import argparse
import json
import logging
import os
import shutil
import sqlite3
import threading
from pathlib import Path
logger = logging.getLogger(__name__)
@@ -303,10 +305,13 @@ def recover_missing_sidecars_from_state_db(session_dir: Path, state_db_path: Pat
if target.exists():
continue
payload = _state_db_row_to_sidecar(row)
tmp = target.with_suffix('.json.reconcile.tmp')
# Per-process/per-thread tmp suffix to avoid corruption under
# concurrent reconciliation calls (matches api/models.py:484
# Session.save() convention).
tmp_suffix = f".json.reconcile.tmp.{os.getpid()}.{threading.current_thread().ident}"
tmp = target.with_suffix(tmp_suffix)
try:
tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8')
tmp.replace(target)
except OSError as exc:
try:
tmp.unlink(missing_ok=True)
@@ -314,8 +319,29 @@ def recover_missing_sidecars_from_state_db(session_dir: Path, state_db_path: Pat
pass
details.append({'session_id': sid, 'materialized': False, 'error': str(exc)})
continue
materialized += 1
details.append({'session_id': sid, 'materialized': True, 'messages': len(payload.get('messages') or [])})
# Atomic create-or-fail: os.link() refuses to overwrite an existing
# target. Closes the TOCTOU window between the target.exists() check
# above and the rename — a concurrent Session.save() for the same SID
# will win and we skip (recording a 'skipped' detail) rather than overwrite a live sidecar.
materialized_now = False
try:
os.link(str(tmp), str(target))
materialized_now = True
except FileExistsError:
# Live sidecar appeared between the check and the link — keep it.
pass
except OSError as exc:
details.append({'session_id': sid, 'materialized': False, 'error': str(exc)})
finally:
try:
tmp.unlink(missing_ok=True)
except OSError:
pass
if materialized_now:
materialized += 1
details.append({'session_id': sid, 'materialized': True, 'messages': len(payload.get('messages') or [])})
elif not any(d.get('session_id') == sid for d in details[-1:]):
details.append({'session_id': sid, 'materialized': False, 'skipped': 'sidecar_appeared_during_reconcile'})
return {'scanned': len(rows), 'materialized': materialized, 'details': details}
@@ -67,3 +67,61 @@ def test_audit_reports_state_db_row_missing_sidecar(tmp_path):
and item["recommendation"] == "materialize_from_state_db"
for item in report["items"]
)
def test_materialized_sidecar_round_trips_through_session_load(tmp_path, monkeypatch):
    """Round-trip a reconstructed sidecar through ``Session.load``.

    A sidecar materialized from state.db must be loadable by ``Session.load``,
    and the loaded session must carry the same messages that were stored in
    state.db. This guards against schema drift between the dict produced by
    ``_state_db_row_to_sidecar()`` and what ``Session.__init__`` expects.
    See Opus review on PR #2041 for context.
    """
    import api.models as models

    db_path = tmp_path / "state.db"
    sid = _make_state_db(db_path, sid="rt_001", messages=3)
    # Point the session store at the temp directory so Session.load reads the
    # sidecar that reconciliation is about to write there.
    monkeypatch.setattr(models, "SESSION_DIR", tmp_path)

    outcome = recover_missing_sidecars_from_state_db(tmp_path, db_path)
    assert outcome["materialized"] == 1

    session = models.Session.load(sid)
    assert session is not None, "Session.load returned None for materialized sidecar"
    assert session.session_id == sid
    assert len(session.messages) == 3

    contents = [message["content"] for message in session.messages]
    assert contents == ["message 1", "message 2", "message 3"]

    assert session.model == "openai/gpt-5"
    assert session.parent_session_id == "parent-1"
def test_recover_missing_sidecars_uses_per_process_tmp_suffix(tmp_path):
    """Regression guard for the pid/tid-qualified reconciliation tmp filename.

    The tmp filename used during reconciliation must include pid/tid so
    concurrent calls cannot corrupt each other's writes. See Opus review on
    PR #2041 (matches Session.save() pattern at api/models.py:484).

    Two things are checked: a successful run leaves no ``.reconcile.tmp``
    residue in the session directory, and the implementation source still
    references both the process id and the thread id.
    """
    from pathlib import Path

    db_path = tmp_path / "state.db"
    _make_state_db(db_path, sid="tmp_suffix_001", messages=1)

    result = recover_missing_sidecars_from_state_db(tmp_path, db_path)
    assert result["materialized"] == 1

    # After a successful run no tmp file of any suffix shape may remain.
    leftover = list(tmp_path.glob("*.reconcile.tmp*"))
    assert leftover == [], f"Reconciliation left tmp residue: {leftover}"

    # The tmp file is gone by now, so its name cannot be observed directly;
    # instead confirm the implementation source still builds the suffix from
    # pid + tid.
    src_path = Path(__file__).resolve().parent.parent / "api" / "session_recovery.py"
    src = src_path.read_text(encoding="utf-8")
    assert "os.getpid()" in src and "threading.current_thread().ident" in src, (
        ".reconcile.tmp suffix must include pid + tid for concurrency safety"
    )