From 5ae63ddd131ef72561d7a7bbd3f31d1db429ab54 Mon Sep 17 00:00:00 2001 From: Frank Song Date: Wed, 13 May 2026 10:26:09 +0800 Subject: [PATCH] Fix stale stream state in session list --- CHANGELOG.md | 1 + api/routes.py | 30 +++++ ...ue2157_sessions_list_stale_stream_state.py | 104 ++++++++++++++++++ 3 files changed, 135 insertions(+) create mode 100644 tests/test_issue2157_sessions_list_stale_stream_state.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d737b24..4bdcc956 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Fixed +- `/api/sessions` now reconciles stale persisted stream state before serializing sidebar rows, so sessions left with a dead `active_stream_id` after a restart or worker crash stop advertising stale runtime fields in the list response (refs #2157). - **PR #2136** by @LumenYoung — Stale stream writebacks no longer poison the active session transcript. `cancel_stream()` intentionally clears `active_stream_id` early so the UI can accept a follow-up turn while an old worker is unwinding — but the old worker could still return later from `run_conversation()` and persist its stale result over the newer transcript, causing visible transcript / turn journal / `state.db` to disagree (especially around cancel+retry on compressed continuations). Adds a single-line ownership check `_stream_writeback_is_current(session, stream_id)` (token equality against `session.active_stream_id`) and short-circuits both finalize paths: the success path in `_run_agent_streaming` and the cancel-handler path in `cancel_stream()`. When the stream no longer owns the writeback, both paths log `Skipping stale stream/cancel writeback` and return cleanly without persisting. 89-line regression suite in `tests/test_stale_stream_writeback.py`; companion updates to `tests/test_issue1361_cancel_data_loss.py` and `tests/test_sprint42.py` for the new return-without-persist behavior. ### Added diff --git a/api/routes.py b/api/routes.py index 65f12285..bee2bfee 100644 --- a/api/routes.py +++ b/api/routes.py @@ -953,6 +953,32 @@ def _clear_stale_stream_state(session) -> bool: pass return True + +def _reconcile_stale_stream_state_for_session_rows(session_rows) -> bool: + """Clear stale persisted stream fields before /api/sessions serializes rows.""" + changed = False + for row in session_rows: + if not isinstance(row, dict): + continue + sid = row.get("session_id") + if not sid or not row.get("active_stream_id"): + continue + if row.get("is_streaming") is True: + continue + try: + session = get_session(sid, metadata_only=True) + except Exception: + logger.debug( + "Failed to load session %s while reconciling stale stream state", + sid, + exc_info=True, + ) + continue + if session is None: + continue + changed = _clear_stale_stream_state(session) or changed + return changed + # ── CSRF: validate Origin/Referer on POST ──────────────────────────────────── import re as _re @@ -3385,6 +3411,10 @@ def handle_get(handler, parsed) -> bool: try: diag.stage("all_sessions") webui_sessions = all_sessions(diag=diag) + diag.stage("reconcile_stale_stream_state") + if _reconcile_stale_stream_state_for_session_rows(webui_sessions): + diag.stage("all_sessions_after_stale_stream_reconcile") + webui_sessions = all_sessions(diag=diag) diag.stage("load_settings") settings = load_settings() show_cli_sessions = bool(settings.get("show_cli_sessions")) diff --git a/tests/test_issue2157_sessions_list_stale_stream_state.py b/tests/test_issue2157_sessions_list_stale_stream_state.py new file mode 100644 index 00000000..251fabdb --- /dev/null +++ b/tests/test_issue2157_sessions_list_stale_stream_state.py @@ -0,0 +1,104 @@ +import io +import json +from urllib.parse import urlparse + +import api.profiles as profiles +import api.routes as routes + + +class _FakeHandler: + def __init__(self): + self.status = None + self.headers = {} + self.wfile = io.BytesIO() + + def send_response(self, status): + self.status = status + + def send_header(self, key, value): + self.headers[key] = value + + def end_headers(self): + pass + + def json_body(self): + return json.loads(self.wfile.getvalue().decode("utf-8")) + + +def test_sessions_list_reconciles_stale_stream_state_before_serializing(monkeypatch): + repaired = {"value": False} + all_sessions_calls = {"count": 0} + + class _Session: + def __init__(self): + self.session_id = "stale-session" + self.active_stream_id = "stale-stream" + + def fake_all_sessions(diag=None): + all_sessions_calls["count"] += 1 + if repaired["value"]: + active_stream_id = None + is_streaming = False + else: + active_stream_id = "stale-stream" + is_streaming = False + return [ + { + "session_id": "stale-session", + "title": "Stale Session", + "profile": "default", + "active_stream_id": active_stream_id, + "is_streaming": is_streaming, + "updated_at": 1, + "last_message_at": 1, + } + ] + + def fake_get_session(session_id, metadata_only=False): + assert session_id == "stale-session" + assert metadata_only is True + return _Session() + + def fake_clear_stale_stream_state(session): + repaired["value"] = True + session.active_stream_id = None + return True + + monkeypatch.setattr(routes, "all_sessions", fake_all_sessions) + monkeypatch.setattr(routes, "get_session", fake_get_session) + monkeypatch.setattr(routes, "_clear_stale_stream_state", fake_clear_stale_stream_state) + monkeypatch.setattr(routes, "load_settings", lambda: {"show_cli_sessions": False}) + monkeypatch.setattr(profiles, "get_active_profile_name", lambda: "default") + + handler = _FakeHandler() + parsed = urlparse("http://example.com/api/sessions") + routes.handle_get(handler, parsed) + + assert handler.status == 200 + payload = handler.json_body() + sessions = payload["sessions"] + assert all_sessions_calls["count"] == 2 + assert repaired["value"] is True + assert sessions[0]["active_stream_id"] is None + assert sessions[0]["is_streaming"] is False + + +def test_reconcile_stale_stream_state_skips_live_stream_rows(monkeypatch): + loaded = [] + + def fake_get_session(session_id, metadata_only=False): + loaded.append((session_id, metadata_only)) + raise AssertionError("live stream rows should not be loaded for cleanup") + + monkeypatch.setattr(routes, "get_session", fake_get_session) + + changed = routes._reconcile_stale_stream_state_for_session_rows([ + { + "session_id": "live-session", + "active_stream_id": "live-stream", + "is_streaming": True, + } + ]) + + assert changed is False + assert loaded == []