Fix stale stream state in session list

This commit is contained in:
Frank Song
2026-05-13 10:26:09 +08:00
parent 9268f411d8
commit 5ae63ddd13
3 changed files with 135 additions and 0 deletions
+1
View File
@@ -4,6 +4,7 @@
### Fixed
- `/api/sessions` now reconciles stale persisted stream state before serializing sidebar rows, so sessions left with a dead `active_stream_id` after a restart or worker crash stop advertising stale runtime fields in the list response (refs #2157).
- **PR #2136** by @LumenYoung — Stale stream writebacks no longer poison the active session transcript. `cancel_stream()` intentionally clears `active_stream_id` early so the UI can accept a follow-up turn while an old worker is unwinding — but the old worker could still return later from `run_conversation()` and persist its stale result over the newer transcript, causing visible transcript / turn journal / `state.db` to disagree (especially around cancel+retry on compressed continuations). Adds a single-line ownership check `_stream_writeback_is_current(session, stream_id)` (token equality against `session.active_stream_id`) and short-circuits both finalize paths: the success path in `_run_agent_streaming` and the cancel-handler path in `cancel_stream()`. When the stream no longer owns the writeback, both paths log `Skipping stale stream/cancel writeback` and return cleanly without persisting. 89-line regression suite in `tests/test_stale_stream_writeback.py`; companion updates to `tests/test_issue1361_cancel_data_loss.py` and `tests/test_sprint42.py` for the new return-without-persist behavior.
### Added
+30
View File
@@ -953,6 +953,32 @@ def _clear_stale_stream_state(session) -> bool:
pass
return True
def _reconcile_stale_stream_state_for_session_rows(session_rows) -> bool:
"""Clear stale persisted stream fields before /api/sessions serializes rows."""
changed = False
for row in session_rows:
if not isinstance(row, dict):
continue
sid = row.get("session_id")
if not sid or not row.get("active_stream_id"):
continue
if row.get("is_streaming") is True:
continue
try:
session = get_session(sid, metadata_only=True)
except Exception:
logger.debug(
"Failed to load session %s while reconciling stale stream state",
sid,
exc_info=True,
)
continue
if session is None:
continue
changed = _clear_stale_stream_state(session) or changed
return changed
# ── CSRF: validate Origin/Referer on POST ────────────────────────────────────
import re as _re
@@ -3385,6 +3411,10 @@ def handle_get(handler, parsed) -> bool:
try:
diag.stage("all_sessions")
webui_sessions = all_sessions(diag=diag)
diag.stage("reconcile_stale_stream_state")
if _reconcile_stale_stream_state_for_session_rows(webui_sessions):
diag.stage("all_sessions_after_stale_stream_reconcile")
webui_sessions = all_sessions(diag=diag)
diag.stage("load_settings")
settings = load_settings()
show_cli_sessions = bool(settings.get("show_cli_sessions"))
@@ -0,0 +1,104 @@
import io
import json
from urllib.parse import urlparse
import api.profiles as profiles
import api.routes as routes
class _FakeHandler:
def __init__(self):
self.status = None
self.headers = {}
self.wfile = io.BytesIO()
def send_response(self, status):
self.status = status
def send_header(self, key, value):
self.headers[key] = value
def end_headers(self):
pass
def json_body(self):
return json.loads(self.wfile.getvalue().decode("utf-8"))
def test_sessions_list_reconciles_stale_stream_state_before_serializing(monkeypatch):
    """GET /api/sessions repairs a stale row and serves the re-read listing.

    The fake listing reports a dead ``active_stream_id`` until the fake clear
    helper has run. The handler is therefore expected to list, reconcile, and
    list again, returning the repaired row.
    """
    repaired = False
    listing_calls = 0

    class _Session:
        def __init__(self):
            self.session_id = "stale-session"
            self.active_stream_id = "stale-stream"

    def fake_all_sessions(diag=None):
        nonlocal listing_calls
        listing_calls += 1
        # Only the stream id depends on the repair; the row is never streaming.
        return [
            {
                "session_id": "stale-session",
                "title": "Stale Session",
                "profile": "default",
                "active_stream_id": None if repaired else "stale-stream",
                "is_streaming": False,
                "updated_at": 1,
                "last_message_at": 1,
            }
        ]

    def fake_get_session(session_id, metadata_only=False):
        # A failure here is swallowed by the reconciler's broad except; it
        # would surface indirectly through the call-count/repair asserts below.
        assert session_id == "stale-session"
        assert metadata_only is True
        return _Session()

    def fake_clear_stale_stream_state(session):
        nonlocal repaired
        repaired = True
        session.active_stream_id = None
        return True

    route_patches = (
        ("all_sessions", fake_all_sessions),
        ("get_session", fake_get_session),
        ("_clear_stale_stream_state", fake_clear_stale_stream_state),
        ("load_settings", lambda: {"show_cli_sessions": False}),
    )
    for attr_name, replacement in route_patches:
        monkeypatch.setattr(routes, attr_name, replacement)
    monkeypatch.setattr(profiles, "get_active_profile_name", lambda: "default")

    handler = _FakeHandler()
    routes.handle_get(handler, urlparse("http://example.com/api/sessions"))

    assert handler.status == 200
    first_row = handler.json_body()["sessions"][0]
    assert listing_calls == 2
    assert repaired is True
    assert first_row["active_stream_id"] is None
    assert first_row["is_streaming"] is False
def test_reconcile_stale_stream_state_skips_live_stream_rows(monkeypatch):
    """Rows with ``is_streaming`` set to True are skipped without a session load."""
    load_attempts = []

    def fake_get_session(session_id, metadata_only=False):
        load_attempts.append((session_id, metadata_only))
        raise AssertionError("live stream rows should not be loaded for cleanup")

    monkeypatch.setattr(routes, "get_session", fake_get_session)

    live_row = {
        "session_id": "live-session",
        "active_stream_id": "live-stream",
        "is_streaming": True,
    }
    changed = routes._reconcile_stale_stream_state_for_session_rows([live_row])

    assert changed is False
    # The reconciler catches Exception around get_session, so the
    # AssertionError above would not surface on its own; the recorded call
    # list is the reliable signal that no load was attempted.
    assert load_attempts == []