From 2a303de2a3aa92f2d43df8cd7a9f121a3ae62db5 Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 20:49:43 +0800 Subject: [PATCH] fix(session): preserve retry budget while journal is still arriving --- api/models.py | 68 +++++++++++++++++-- .../test_session_lost_response_regression.py | 36 +++++++++- 2 files changed, 98 insertions(+), 6 deletions(-) diff --git a/api/models.py b/api/models.py index b9cd532c..91b5fdbf 100644 --- a/api/models.py +++ b/api/models.py @@ -957,6 +957,39 @@ def _run_journal_has_visible_output(session, stream_id: str | None) -> bool: return False +def _journal_is_still_arriving(session, stream_id: str | None) -> bool: + """Return True for journals that may become visible on a later read. + + `read_run_events()` deliberately collapses missing files and empty files + into an empty event list, so the lazy retry path needs a small filesystem + visibility check to avoid burning all retry attempts while WSL2 / network + filesystems are still surfacing the journal. Non-empty journals are treated + as sealed enough for retry-budget accounting; if they contain no visible + output, the normal capped give-up path handles them. + """ + if not stream_id: + return False + try: + from api.run_journal import _run_path, latest_run_summary + + path = _run_path(session.session_id, stream_id) + summary = latest_run_summary(session.session_id, stream_id) + if summary.get('terminal'): + return False + try: + return (not path.exists()) or path.stat().st_size == 0 + except OSError: + return True + except Exception: + logger.debug( + "Session %s: failed to classify journal visibility for stream %s", + getattr(session, 'session_id', '?'), + stream_id, + exc_info=True, + ) + return False + + def _append_journaled_partial_output( session, stream_id: str | None, @@ -1137,7 +1170,10 @@ def _append_journaled_partial_output( # `_append_journaled_partial_output` with `dedupe_existing=True`. On # success the marker is promoted in place to the recovered-output # wording, the journaled rows are reordered to sit above the marker, -# and all retry meta is stripped. On failure attempts is incremented. +# and all retry meta is stripped. If the journal is still missing or +# zero-byte, the retry is a no-op and does not consume attempt budget. +# Terminal/non-useful journals consume attempt budget and can demote +# immediately at the max-attempt cap. # * After `_JOURNAL_RETRY_MAX_ATTEMPTS` failed retries or # `_JOURNAL_RETRY_GIVEUP_SECONDS` of wall-clock age, the marker is # demoted to the neutral wording ("Partial output may have been lost.") @@ -1245,7 +1281,9 @@ def _try_retry_journal_recovery_in_place(session) -> bool: logger.debug("lazy journal-retry already running for session %s", sid) return False try: - return _retry_journal_recovery_in_place(session) + return _retry_journal_recovery_in_place( + session, preserve_arriving_budget=True, + ) finally: lock.release() with _JOURNAL_RETRY_LOCKS_GUARD: @@ -1253,7 +1291,11 @@ def _try_retry_journal_recovery_in_place(session) -> bool: _JOURNAL_RETRY_LOCKS.pop(sid, None) -def _retry_journal_recovery_in_place(session) -> bool: +def _retry_journal_recovery_in_place( + session, + *, + preserve_arriving_budget: bool = False, +) -> bool: """Re-attempt run-journal recovery for the most recent pending marker. Returns True if the marker was promoted to the recovered-output wording. @@ -1338,12 +1380,28 @@ def _retry_journal_recovery_in_place(session) -> bool: attempts, ) return True - msg['_journal_retry_attempts'] = attempts + 1 + if ( + preserve_arriving_budget + and _journal_is_still_arriving(session, stream_id) + ): + logger.debug( + "Session %s: journal for stream %s still arriving; " + "preserving retry budget", + getattr(session, 'session_id', '?'), + stream_id, + ) + return False + next_attempts = attempts + 1 + if next_attempts >= _JOURNAL_RETRY_MAX_ATTEMPTS: + msg['content'] = _INTERRUPTED_NEUTRAL_WORDING + _strip_journal_retry_meta(msg) + else: + msg['_journal_retry_attempts'] = next_attempts try: session.save(touch_updated_at=False) except Exception: logger.debug( - "save() failed while incrementing retry counter for session %s", + "save() failed while updating retry counter for session %s", getattr(session, 'session_id', '?'), exc_info=True, ) diff --git a/tests/test_session_lost_response_regression.py b/tests/test_session_lost_response_regression.py index db129d45..0e8cea6c 100644 --- a/tests/test_session_lost_response_regression.py +++ b/tests/test_session_lost_response_regression.py @@ -240,7 +240,7 @@ def test_concurrent_get_session_serializes_lazy_journal_retry(hermes_home, monke counter_lock = threading.Lock() calls = 0 - def slow_retry(session): + def slow_retry(session, *, preserve_arriving_budget=False): nonlocal calls with counter_lock: calls += 1 @@ -260,3 +260,37 @@ def test_concurrent_get_session_serializes_lazy_journal_retry(hermes_home, monke assert calls == 1 + +def test_still_arriving_journal_does_not_consume_retry_budget(hermes_home, monkeypatch): + sid = "retry_arriving_sid" + stream_id = "retry_arriving_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + models.SESSIONS[sid] = s + + monkeypatch.setattr(models, "_append_journaled_partial_output", lambda *a, **kw: False) + monkeypatch.setattr(models, "_journal_is_still_arriving", lambda *a, **kw: True) + + for _ in range(20): + assert models.get_session(sid) is s + + marker = s.messages[-1] + assert marker["_journal_retry_attempts"] == 0 + assert marker["_pending_journal_recovery"] is True + assert marker["content"] == models._INTERRUPTED_PENDING_RETRY_WORDING + + +def test_sealed_empty_journal_consumes_retry_budget_and_demotes_at_max(hermes_home, monkeypatch): + sid = "retry_sealed_sid" + stream_id = "retry_sealed_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + s.messages[-1]["_journal_retry_attempts"] = models._JOURNAL_RETRY_MAX_ATTEMPTS - 1 + append_run_event(sid, stream_id, "stream_end", {}) + models.SESSIONS[sid] = s + + assert models.get_session(sid) is s + + marker = s.messages[-1] + assert marker["content"] == models._INTERRUPTED_NEUTRAL_WORDING + _assert_retry_meta_removed(marker) + assert not any(m.get("_recovered_from_run_journal") for m in s.messages) +