From 75a26174aaf63be1633128783cd98490a43f96e7 Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 11:58:26 +0800 Subject: [PATCH 01/10] fix(session): lazily retry run-journal recovery so the interrupted-turn marker self-heals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the WebUI process restarts mid-stream and sidecar repair runs while the run-journal for the dead stream is not yet visible on disk (WSL2 9p / DrvFs page-cache loss, un-fsynced journal tail on network FS, …), `_append_journaled_partial_output()` returns False and the marker is permanently baked with the "no agent output was recovered" wording even though the journaled tokens appear on disk shortly afterwards. This commit reframes the recovery contract so the read side can self-heal: * `_interrupted_recovery_marker` gains a `pending_retry=True` mode that produces a third wording ("Recovering the partial output … reload this session to retry.") and stamps a `_pending_journal_recovery` flag. * `_apply_core_sync_or_error_marker` now writes that pending-retry marker (with `_journal_retry_stream_id`, `_journal_retry_attempts`, `_journal_retry_first_seen_ts` meta) whenever it cannot recover visible output AND the stream id is known. The legacy "no output" wording is reserved for the no-stream-id case. The core-sync branch leaves marker emission to the existing visible-output check (the core transcript itself is the canonical history in that branch). * A new `_retry_journal_recovery_in_place(session)` helper re-runs `_append_journaled_partial_output(…, dedupe_existing=True)` for the latest pending marker. On success the marker is promoted in place to the recovered-output wording, the journaled rows are reordered to sit above the marker (preserving chronological order), and all retry meta is stripped. On failure attempts is incremented; after _JOURNAL_RETRY_MAX_ATTEMPTS (12) or _JOURNAL_RETRY_GIVEUP_SECONDS (24h) the marker is demoted to a neutral "Partial output may have been lost." wording. * `get_session()` cheaply short-circuits via `_session_has_pending_journal_retry()` and invokes the helper on both cache-hit and cold-load paths when a pending marker is found. `metadata_only=True` skips the helper to keep sidebar refresh cheap. The retry call runs OUTSIDE the SESSIONS LOCK to avoid a deadlock with `session.save()` write paths. No streaming write path or run_journal fsync behaviour is changed — the fix is read-side only. --- api/models.py | 339 +++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 322 insertions(+), 17 deletions(-) diff --git a/api/models.py b/api/models.py index 233af200..7bb831e9 100644 --- a/api/models.py +++ b/api/models.py @@ -734,27 +734,69 @@ def _get_profile_home(profile) -> Path: return Path(os.environ.get('HERMES_HOME') or '~/.hermes').expanduser() -def _interrupted_recovery_marker(*, recovered_output: bool = False) -> dict: +_INTERRUPTED_RECOVERED_WORDING = ( + '**Response interrupted.**\n\n' + 'The WebUI process restarted before this turn finished. ' + 'The partial output above was recovered from the run journal, ' + 'but the interrupted agent process could not continue.' +) +_INTERRUPTED_NO_OUTPUT_WORDING = ( + '**Response interrupted.**\n\n' + 'The WebUI process restarted before this turn finished. ' + 'The user message above was preserved, but no agent output was recovered.' +) +_INTERRUPTED_PENDING_RETRY_WORDING = ( + '**Response interrupted.**\n\n' + 'The WebUI process restarted before this turn finished. ' + 'Recovering the partial output from the run journal — ' + 'reload this session to retry.' +) +# Neutral wording used when the lazy retry path gives up (max attempts reached +# or the marker has been pending longer than _JOURNAL_RETRY_GIVEUP_SECONDS). +_INTERRUPTED_NEUTRAL_WORDING = ( + '**Response interrupted.**\n\n' + 'The WebUI process restarted before this turn finished. ' + 'Partial output may have been lost.' +) + + +def _interrupted_recovery_marker( + *, + recovered_output: bool = False, + pending_retry: bool = False, +) -> dict: + """Build the standard interrupted-turn marker. + + ``recovered_output=True`` means the run journal already yielded visible + text on this repair pass — the marker advertises that the partial output + has been recovered. + + ``pending_retry=True`` is the lazy-retry hook: the journal was unreadable + on this pass (page-cache loss, un-fsynced writes on slow FS, etc.). The + marker carries a ``_pending_journal_recovery`` flag so a later + ``get_session()`` can re-attempt recovery without baking a permanent + "no output" claim into the transcript. + + The two are mutually exclusive; ``recovered_output`` wins if both are + set so the caller cannot accidentally re-arm retry on a successful + repair. + """ if recovered_output: - content = ( - '**Response interrupted.**\n\n' - 'The WebUI process restarted before this turn finished. ' - 'The partial output above was recovered from the run journal, ' - 'but the interrupted agent process could not continue.' - ) + content = _INTERRUPTED_RECOVERED_WORDING + elif pending_retry: + content = _INTERRUPTED_PENDING_RETRY_WORDING else: - content = ( - '**Response interrupted.**\n\n' - 'The WebUI process restarted before this turn finished. ' - 'The user message above was preserved, but no agent output was recovered.' - ) - return { + content = _INTERRUPTED_NO_OUTPUT_WORDING + marker = { 'role': 'assistant', 'content': content, 'timestamp': int(time.time()), '_error': True, 'type': 'interrupted', } + if pending_retry and not recovered_output: + marker['_pending_journal_recovery'] = True + return marker def _truncate_journal_tool_args(args, limit: int = 4) -> dict: @@ -1038,6 +1080,226 @@ def _append_journaled_partial_output( return appended_any +# ── Lazy run-journal recovery (read-side self-heal) ───────────────────────── +# +# When sidecar repair runs before the run-journal for the dead stream is +# visible on disk (page-cache loss on WSL2 9p / DrvFs, an un-fsynced journal +# tail, a slow network FS, …), `_append_journaled_partial_output` returns +# False even though the journaled events will appear on disk shortly. Without +# the helpers below the repair path baked a permanent "no agent output was +# recovered" claim into the marker, and a later session read could never +# correct it. +# +# The contract is: +# +# * Sidecar repair (`_apply_core_sync_or_error_marker`) writes a marker +# with `_pending_journal_recovery=True` whenever it could not recover +# visible output AND the stream id is known. Three retry-meta keys go +# onto the marker: `_journal_retry_stream_id`, `_journal_retry_attempts`, +# `_journal_retry_first_seen_ts`. +# * Every `get_session()` call that returns the full session checks the +# latest assistant marker; if the flag is set it re-runs +# `_append_journaled_partial_output` with `dedupe_existing=True`. On +# success the marker is promoted in place to the recovered-output +# wording, the journaled rows are reordered to sit above the marker, +# and all retry meta is stripped. On failure attempts is incremented. +# * After `_JOURNAL_RETRY_MAX_ATTEMPTS` failed retries or +# `_JOURNAL_RETRY_GIVEUP_SECONDS` of wall-clock age, the marker is +# demoted to the neutral wording ("Partial output may have been lost.") +# so users do not see "reload to retry" prompts forever. +_JOURNAL_RETRY_MAX_ATTEMPTS = 12 +_JOURNAL_RETRY_GIVEUP_SECONDS = 24 * 3600 + + +def _build_recovery_marker_with_retry_hook( + *, recovered_output: bool, stream_id: str | None, +) -> dict: + """Build an interrupted-turn marker, arming the lazy-retry hook when + visible output was not recovered yet but a stream id is available.""" + if recovered_output: + return _interrupted_recovery_marker(recovered_output=True) + if not stream_id: + return _interrupted_recovery_marker(recovered_output=False) + marker = _interrupted_recovery_marker(pending_retry=True) + marker['_journal_retry_stream_id'] = str(stream_id) + marker['_journal_retry_attempts'] = 0 + marker['_journal_retry_first_seen_ts'] = int(time.time()) + return marker + + +def _session_has_pending_journal_retry(session) -> bool: + """Cheap short-circuit: scan from the tail until the most recent normal + assistant turn. Any `_pending_journal_recovery` flag found before then + means a retry is queued. + """ + messages = getattr(session, 'messages', None) or [] + for msg in reversed(messages): + if not isinstance(msg, dict): + continue + if msg.get('_pending_journal_recovery'): + return True + if msg.get('role') == 'assistant' and not msg.get('_error'): + # A normal assistant turn after any pending marker — nothing to + # retry above this point. + return False + return False + + +def _strip_journal_retry_meta(marker: dict) -> None: + marker.pop('_pending_journal_recovery', None) + marker.pop('_journal_retry_stream_id', None) + marker.pop('_journal_retry_attempts', None) + marker.pop('_journal_retry_first_seen_ts', None) + + +def _reorder_journal_tail_above_marker(session, marker_idx: int) -> None: + """Move `_recovered_from_run_journal=True` rows appended *after* + ``marker_idx`` to sit immediately above the marker so chronological + order is preserved (journaled output happened during the turn, marker + annotates its end). + """ + messages = session.messages + if marker_idx < 0 or marker_idx >= len(messages): + return + tail = messages[marker_idx + 1 :] + if not tail: + return + journaled = [ + m for m in tail + if isinstance(m, dict) and m.get('_recovered_from_run_journal') + ] + if not journaled: + return + rest = [ + m for m in tail + if not (isinstance(m, dict) and m.get('_recovered_from_run_journal')) + ] + marker = messages[marker_idx] + new_messages = ( + messages[:marker_idx] + + journaled + + [marker] + + rest + ) + # Rebase any tool_calls.assistant_msg_idx values that pointed into the + # journaled rows when they were appended at the tail. + old_journaled_idx_base = marker_idx + 1 + new_journaled_idx_base = marker_idx + shift = new_journaled_idx_base - old_journaled_idx_base # = -1 + for tool_call in session.tool_calls or []: + if not isinstance(tool_call, dict): + continue + idx = tool_call.get('assistant_msg_idx') + if isinstance(idx, int) and idx >= old_journaled_idx_base \ + and idx < old_journaled_idx_base + len(journaled): + tool_call['assistant_msg_idx'] = idx + shift + session.messages = new_messages + + +def _retry_journal_recovery_in_place(session) -> bool: + """Re-attempt run-journal recovery for the most recent pending marker. + + Returns True if the marker was promoted to the recovered-output wording. + Never raises — caller is best-effort. + """ + try: + messages = session.messages or [] + for idx in range(len(messages) - 1, -1, -1): + msg = messages[idx] + if not isinstance(msg, dict): + continue + if msg.get('role') == 'assistant' and not msg.get('_error') \ + and not msg.get('_pending_journal_recovery'): + # Walked past the pending marker without finding it. + return False + if not ( + msg.get('type') == 'interrupted' + and msg.get('_pending_journal_recovery') + ): + continue + stream_id = msg.get('_journal_retry_stream_id') + first_seen = msg.get('_journal_retry_first_seen_ts') or 0 + attempts = int(msg.get('_journal_retry_attempts') or 0) + now = time.time() + give_up = ( + attempts >= _JOURNAL_RETRY_MAX_ATTEMPTS + or ( + first_seen + and now - float(first_seen) > _JOURNAL_RETRY_GIVEUP_SECONDS + ) + ) + if not stream_id: + # No stream id to retry against; demote immediately. + msg['content'] = _INTERRUPTED_NEUTRAL_WORDING + _strip_journal_retry_meta(msg) + try: + session.save(touch_updated_at=False) + except Exception: + logger.debug( + "save() failed while demoting marker for session %s", + getattr(session, 'session_id', '?'), + exc_info=True, + ) + return False + if give_up: + msg['content'] = _INTERRUPTED_NEUTRAL_WORDING + _strip_journal_retry_meta(msg) + try: + session.save(touch_updated_at=False) + except Exception: + logger.debug( + "save() failed while demoting marker for session %s", + getattr(session, 'session_id', '?'), + exc_info=True, + ) + return False + tail_len_before = len(session.messages) + ok = _append_journaled_partial_output( + session, stream_id, dedupe_existing=True, + ) + if ok: + msg['content'] = _INTERRUPTED_RECOVERED_WORDING + _strip_journal_retry_meta(msg) + # The journaled rows were appended at the end of messages; + # only the rows past the previous tail count as "newly + # journaled" and need to move above the marker. + _ = tail_len_before # informational; helper below scans + _reorder_journal_tail_above_marker(session, idx) + try: + session.save(touch_updated_at=False) + except Exception: + logger.debug( + "save() failed while promoting marker for session %s", + getattr(session, 'session_id', '?'), + exc_info=True, + ) + logger.info( + "Session %s: lazy journal-recovery promoted marker for " + "stream %s after %d attempts", + getattr(session, 'session_id', '?'), + stream_id, + attempts, + ) + return True + msg['_journal_retry_attempts'] = attempts + 1 + try: + session.save(touch_updated_at=False) + except Exception: + logger.debug( + "save() failed while incrementing retry counter for session %s", + getattr(session, 'session_id', '?'), + exc_info=True, + ) + return False + return False + except Exception: + logger.exception( + "_retry_journal_recovery_in_place failed for session %s", + getattr(session, 'session_id', '?'), + ) + return False + + def _apply_core_sync_or_error_marker( session, core_path, @@ -1105,11 +1367,16 @@ def _apply_core_sync_or_error_marker( session, stream_id_for_recheck or session.active_stream_id, ) + _stream_id = stream_id_for_recheck or session.active_stream_id session.active_stream_id = None session.pending_user_message = None session.pending_attachments = [] session.pending_started_at = None - session.messages.append(_interrupted_recovery_marker(recovered_output=recovered_output)) + session.messages.append( + _build_recovery_marker_with_retry_hook( + recovered_output=recovered_output, stream_id=_stream_id, + ) + ) session.save(touch_updated_at=touch_updated_at) logger.info( "Session %s: recovered pending user turn (messages non-empty), added error marker", @@ -1160,6 +1427,17 @@ def _apply_core_sync_or_error_marker( session.messages.append( _interrupted_recovery_marker(recovered_output=True) ) + # NOTE: when the core transcript was synced in but the run journal + # is not yet visible, intentionally do NOT append a lazy-retry + # marker here. In this branch the canonical history is the core + # transcript itself (which has already been written to s.messages + # above) and the marker is purely advisory — the existing contract + # is "marker only when there is a recovered partial turn to + # annotate". Adding a pending-retry marker on every empty-journal + # core-sync would surface a spurious "reload to retry" banner on + # sessions whose journal is legitimately absent (e.g. archived + # streams). The first and third branches handle the lost-response + # case where the marker is the only signal the user gets. session.save(touch_updated_at=touch_updated_at) logger.info( "Session %s: synced %d messages from core transcript%s", @@ -1182,11 +1460,16 @@ def _apply_core_sync_or_error_marker( session, stream_id_for_recheck or session.active_stream_id, ) + _stream_id = stream_id_for_recheck or session.active_stream_id session.active_stream_id = None session.pending_user_message = None session.pending_attachments = [] session.pending_started_at = None - session.messages.append(_interrupted_recovery_marker(recovered_output=recovered_output)) + session.messages.append( + _build_recovery_marker_with_retry_hook( + recovered_output=recovered_output, stream_id=_stream_id, + ) + ) session.save(touch_updated_at=touch_updated_at) logger.info("Session %s: no core transcript found, added error marker", sid) return True @@ -1306,9 +1589,19 @@ def get_session(sid, metadata_only=False): actual message history (e.g., for fast sidebar switching). """ with LOCK: - if sid in SESSIONS: + cached = SESSIONS.get(sid) + if cached is not None: SESSIONS.move_to_end(sid) # LRU: mark as recently used - return SESSIONS[sid] + if cached is not None: + if not metadata_only and _session_has_pending_journal_retry(cached): + try: + _retry_journal_recovery_in_place(cached) + except Exception: + logger.debug( + "lazy journal-retry failed on cache hit for session %s", + sid, exc_info=True, + ) + return cached if metadata_only: s = Session.load_metadata_only(sid) if s: @@ -1324,6 +1617,18 @@ def get_session(sid, metadata_only=False): if not metadata_only: try: repaired = _repair_stale_pending(s) + # If the stale-pending repair did not fire but the session + # already carries a pending-journal-retry marker (e.g. set on + # a previous repair pass), give the lazy-retry path one + # chance to self-heal on this read. + if not repaired and _session_has_pending_journal_retry(s): + try: + _retry_journal_recovery_in_place(s) + except Exception: + logger.debug( + "lazy journal-retry failed on cold load for session %s", + sid, exc_info=True, + ) # If repair had to bail because the per-session lock was held, # do not pin the still-stale sidecar in the LRU cache forever. # Leaving it cached would prevent future get_session() calls from From e8cd0bcc66838fe5abeb6933cd9da60f13c14856 Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 11:58:37 +0800 Subject: [PATCH 02/10] test(session): end-to-end regression for lost-response self-heal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reproduces the production failure mode: 1. Stage 1 — sidecar repair runs while the run-journal for the dead stream is empty on disk. Assert the marker arms the lazy-retry hook (`_pending_journal_recovery=True`, `_journal_retry_stream_id`, `_journal_retry_attempts=0`, `_journal_retry_first_seen_ts`) and does NOT carry the legacy "no agent output was recovered" wording. Pending sidecar fields are cleared regardless. 2. Stage 2 — journaled token / tool / tool_complete / token events appear on disk. Call `get_session(sid)` and assert the marker self-heals: wording promotes to "recovered from the run journal", journaled assistant rows + tool card land above the marker in chronological order, all retry meta is stripped. Without the lazy-retry path this test fails at the very first assertion (marker still carries the legacy no-output wording). --- .../test_session_lost_response_regression.py | 202 ++++++++++++++++++ 1 file changed, 202 insertions(+) create mode 100644 tests/test_session_lost_response_regression.py diff --git a/tests/test_session_lost_response_regression.py b/tests/test_session_lost_response_regression.py new file mode 100644 index 00000000..aad6a17d --- /dev/null +++ b/tests/test_session_lost_response_regression.py @@ -0,0 +1,202 @@ +"""Regression: lazy-retry run-journal recovery across multiple session reads. + +The scenario this test pins down: + +1. A WebUI process restarts mid-stream. On the first sidecar repair attempt + the run-journal for the dead stream is NOT visible yet (page-cache loss, + un-fsynced writes, slow network FS, etc.) so + `_append_journaled_partial_output` returns False. +2. Pre-fix the repair path baked a permanent "no agent output was recovered" + marker into the session and never looked at the journal again — even + after the journaled tokens appeared on disk on a later read. +3. With the fix, the repair instead leaves a `_pending_journal_recovery` + flag on the marker; the next `get_session()` call lazily re-runs the + recovery, promotes the marker wording, and threads the journaled + assistant text/tools into the transcript in the correct chronological + position. +""" +import json +import queue +from pathlib import Path + +import pytest + +import api.models as models +import api.config as config +import api.profiles as profiles +import api.streaming as streaming # noqa: F401 imported for fixture parity +from api.models import ( + Session, + _apply_core_sync_or_error_marker, +) +from api.run_journal import append_run_event + + +# ── Fixtures (shape mirrors test_session_sidecar_repair.py) ──────────────── + + +@pytest.fixture(autouse=True) +def _isolate_session_dir(tmp_path, monkeypatch): + session_dir = tmp_path / "sessions" + session_dir.mkdir() + index_file = session_dir / "_index.json" + monkeypatch.setattr(models, "SESSION_DIR", session_dir) + monkeypatch.setattr(models, "SESSION_INDEX_FILE", index_file) + models.SESSIONS.clear() + yield session_dir, index_file + models.SESSIONS.clear() + + +@pytest.fixture(autouse=True) +def _isolate_stream_state(): + config.STREAMS.clear() + config.CANCEL_FLAGS.clear() + config.AGENT_INSTANCES.clear() + config.STREAM_PARTIAL_TEXT.clear() + yield + config.STREAMS.clear() + config.CANCEL_FLAGS.clear() + config.AGENT_INSTANCES.clear() + config.STREAM_PARTIAL_TEXT.clear() + + +@pytest.fixture(autouse=True) +def _isolate_agent_locks(): + config.SESSION_AGENT_LOCKS.clear() + yield + config.SESSION_AGENT_LOCKS.clear() + + +@pytest.fixture() +def hermes_home(tmp_path, monkeypatch): + home = tmp_path / "hermes_home" + home.mkdir() + (home / "sessions").mkdir() + monkeypatch.setenv("HERMES_HOME", str(home)) + monkeypatch.setattr(profiles, "_DEFAULT_HERMES_HOME", home) + return home + + +def _make_dead_stream_session( + session_id: str, + *, + stream_id: str, + existing_msgs_count: int = 96, + pending_text: str = ( + "[IMPORTANT: Background process polling. " + "Continue the user's prior request.]" + ), +): + """Build a session that mirrors the production bug: lots of prior history, + pending_user_message set, an active_stream_id pointing at a dead stream, + and pending_started_at populated.""" + messages = [] + for i in range(existing_msgs_count // 2): + messages.append({"role": "user", "content": f"q{i}"}) + messages.append({"role": "assistant", "content": f"a{i}"}) + s = Session(session_id=session_id, title="Lost-response repro", messages=messages) + s.pending_user_message = pending_text + s.pending_started_at = 1779237637 # production-shaped value + s.active_stream_id = stream_id + return s + + +# ── The regression test ──────────────────────────────────────────────────── + + +def test_lost_response_recovered_on_second_read(hermes_home, monkeypatch): + sid = "9f14583f0e4e4444aaaa111122223333" + stream_id = "7c8b4108d52b4aba9af362d3a54f47ac" + + # ── Stage 1: simulate page-cache loss — sidecar repair runs while the + # run-journal for this stream is empty/absent on disk. + s = _make_dead_stream_session(sid, stream_id=stream_id) + s.save() + core_path = hermes_home / "sessions" / f"session_{sid}.json" + + result = _apply_core_sync_or_error_marker( + s, core_path, stream_id_for_recheck=stream_id, + ) + assert result is True + + # Marker should carry the lazy-retry flag and *not* the permanent + # "no agent output was recovered" wording yet. + last = s.messages[-1] + assert last.get("_error") is True + assert last.get("type") == "interrupted" + assert last.get("_pending_journal_recovery") is True, ( + "First repair pass should defer the recovery decision via a " + "_pending_journal_recovery flag so a later read can self-heal." + ) + assert last.get("_journal_retry_stream_id") == stream_id + assert last.get("_journal_retry_attempts") == 0 + assert isinstance(last.get("_journal_retry_first_seen_ts"), int) + assert "no agent output was recovered" not in last["content"] + # pending fields cleared regardless of journal visibility + assert s.pending_user_message is None + assert s.active_stream_id is None + assert s.pending_started_at is None + + # ── Stage 2: the journaled events become visible on disk. + append_run_event(sid, stream_id, "token", {"text": "Checking GitHub first."}) + append_run_event( + sid, + stream_id, + "tool", + { + "name": "terminal", + "preview": "gh pr list --repo nesquena/hermes-webui", + "args": {"command": "gh pr list --repo nesquena/hermes-webui"}, + }, + ) + append_run_event( + sid, + stream_id, + "tool_complete", + {"name": "terminal", "duration": 1.2, "is_error": False}, + ) + append_run_event( + sid, stream_id, "token", {"text": " The first PR scan completed."}, + ) + + # Pin session into the LRU cache and call get_session — this is the + # production path that triggers lazy retry. + models.SESSIONS[sid] = s + reloaded = models.get_session(sid) + assert reloaded is s + + contents = [m.get("content", "") for m in s.messages] + # The marker self-healed: + assert any("recovered from the run journal" in c for c in contents), ( + "After journaled tokens become readable, the marker must promote to " + "the recovered-output wording." + ) + assert not any("no agent output was recovered" in c for c in contents) + + # The journaled assistant text and tool card landed BEFORE the marker + # so chronological order in the transcript is preserved. + marker_idx = next( + i for i, m in enumerate(s.messages) + if m.get("type") == "interrupted" and m.get("_error") + ) + recovered_msgs = [ + m for m in s.messages[:marker_idx] + if m.get("_recovered_from_run_journal") is True + ] + assert recovered_msgs, "recovered assistant content must sit above the marker" + recovered_text = " ".join(m.get("content", "") for m in recovered_msgs) + assert "Checking GitHub first." in recovered_text + assert "first PR scan completed" in recovered_text + + # Tool card lives in session.tool_calls and points at one of the + # recovered assistant indices. + assert s.tool_calls, "journaled tool should be materialized" + assert s.tool_calls[-1]["name"] == "terminal" + assert s.tool_calls[-1]["done"] is True + + # Flag and meta cleaned up after promotion. + promoted = s.messages[marker_idx] + assert "_pending_journal_recovery" not in promoted + assert "_journal_retry_stream_id" not in promoted + assert "_journal_retry_attempts" not in promoted + assert "_journal_retry_first_seen_ts" not in promoted From 2387720068857f7be69cb14652ac081c81a222f7 Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 11:58:54 +0800 Subject: [PATCH 03/10] test(session): unit + backwards-compat + WSL race coverage for lazy-retry path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds five test classes that together pin down the contract added in the previous commit and protect pre-fix session shapes: * `TestInterruptedRecoveryMarker` — pure-function tests for the new `pending_retry=True` keyword and the mutual-exclusion rule between `recovered_output=True` and `pending_retry=True`. * `TestRetryJournalRecoveryInPlace` — promote-on-success, increment-on-failure, demote-after-max-attempts, demote-after-giveup-seconds, no-op when no pending marker, and the `_session_has_pending_journal_retry` short-circuit (which stops at the most recent normal assistant turn). * `TestGetSessionLazyRetryHook` — both `get_session()` entry paths (cache-hit and cold-load) trigger the helper when a pending marker is present; the short-circuit avoids the helper when nothing is pending; and `metadata_only=True` skips the helper to keep sidebar refresh free. * `TestLazyRetryBackwardsCompat` — pre-fix sessions whose markers use the legacy "no agent output" wording (no flag) are not touched by `get_session()`. The four retry-meta keys round-trip cleanly through `Session.save()` / `Session.load()`. * `TestWslPageCacheRace` — covers the WSL2 / network-FS shape: a first `read_run_events` raising IOError followed by a successful read; a journal that grows visible tokens between sidecar repair and retry; and two concurrent `get_session(sid)` calls converging on a single promoted marker with a single recovered body (deduped by `dedupe_existing=True`). Two pre-existing assertions had to be relaxed because they encoded the buggy contract (permanent "no agent output was recovered" / "user message above was preserved" wording in the journal-empty + stream-id known case). Both tests now accept either the legacy wording or the new "Recovering the partial output…" wording with the pending-retry flag, reflecting the broader fact that the old wording was the bug. --- tests/test_session_sidecar_repair.py | 448 ++++++++++++++++++++++++++- 1 file changed, 446 insertions(+), 2 deletions(-) diff --git a/tests/test_session_sidecar_repair.py b/tests/test_session_sidecar_repair.py index ca2bda45..34377e78 100644 --- a/tests/test_session_sidecar_repair.py +++ b/tests/test_session_sidecar_repair.py @@ -313,7 +313,14 @@ class TestDraftRecovery: ) assert "Response interrupted" in content assert "WebUI process restarted" in content - assert "user message above was preserved" in content + # The marker now arms the lazy-retry hook when a stream id is known + # (Recovering the partial output... reload to retry). The legacy + # "user message above was preserved" wording is reserved for the + # no-stream-id and post-retry-give-up cases. + assert ( + "user message above was preserved" in content + or "Recovering the partial output" in content + ) assert error_msgs[0].get("type") == "interrupted" def test_pending_attachments_recovered(self, hermes_home, monkeypatch): @@ -705,7 +712,14 @@ class TestNonEmptyMessagesPendingCleared: assert not any("private scratchpad text" in c for c in contents) error_msgs = [m for m in s.messages if m.get("_error")] assert len(error_msgs) == 1 - assert "no agent output was recovered" in error_msgs[0]["content"] + # Reasoning-only events do not count as visible output, so the marker + # arms the lazy-retry hook (stream id is known, journal may have more + # events appear on a later read). The legacy "no agent output was + # recovered" wording is now reserved for the no-stream-id case. + assert error_msgs[0].get("_pending_journal_recovery") is True + assert error_msgs[0].get("_journal_retry_stream_id") == "reasoning_only_stream" + assert "no agent output was recovered" not in error_msgs[0]["content"] + assert "Recovering the partial output" in error_msgs[0]["content"] def test_journal_recovery_keeps_consecutive_tools_on_one_anchor(self, hermes_home, monkeypatch): """Consecutive journaled tools without an intervening visible update @@ -1153,3 +1167,433 @@ class TestCoreSyncMetadata: assert len(user_msgs) == 1 assert user_msgs[0]["content"] == "My question" assert user_msgs[0].get("_recovered") is True + + +# ── Lazy run-journal recovery (read-side self-heal) ───────────────────────── + +class TestInterruptedRecoveryMarker: + """Pure-function tests for _interrupted_recovery_marker(pending_retry=…).""" + + def test_marker_recovered_output_excludes_pending_retry_flag(self): + marker = models._interrupted_recovery_marker(recovered_output=True) + assert marker["_error"] is True + assert marker["type"] == "interrupted" + assert "_pending_journal_recovery" not in marker + assert "recovered from the run journal" in marker["content"] + + def test_marker_pending_retry_sets_flag_and_wording(self): + marker = models._interrupted_recovery_marker(pending_retry=True) + assert marker.get("_pending_journal_recovery") is True + assert "Recovering the partial output" in marker["content"] + assert "no agent output was recovered" not in marker["content"] + + def test_marker_recovered_output_beats_pending_retry(self): + marker = models._interrupted_recovery_marker( + recovered_output=True, pending_retry=True, + ) + assert "_pending_journal_recovery" not in marker + assert "recovered from the run journal" in marker["content"] + + def test_marker_default_wording_unchanged_for_no_output_no_retry(self): + marker = models._interrupted_recovery_marker() + assert "_pending_journal_recovery" not in marker + assert "no agent output was recovered" in marker["content"] + + +class TestRetryJournalRecoveryInPlace: + """In-place retry helper: promote / increment / give up.""" + + def _make_pending_session(self, hermes_home, stream_id="lazy_stream", + attempts=0, first_seen_ts=None): + s = _make_session(messages=[ + {"role": "user", "content": "earlier turn"}, + {"role": "assistant", "content": "earlier reply"}, + {"role": "user", "content": "Stuck draft", "_recovered": True}, + ]) + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = stream_id + marker["_journal_retry_attempts"] = attempts + marker["_journal_retry_first_seen_ts"] = ( + first_seen_ts if first_seen_ts is not None else int(time.time()) + ) + s.messages.append(marker) + s.save() + return s + + def test_promotes_marker_when_journal_now_available(self, hermes_home, monkeypatch): + stream_id = "lazy_stream_promote" + s = self._make_pending_session(hermes_home, stream_id=stream_id) + marker_before = s.messages[-1] + + append_run_event(s.session_id, stream_id, "token", {"text": "Late tokens."}) + append_run_event( + s.session_id, stream_id, "tool", + {"name": "terminal", "preview": "echo", "args": {"cmd": "echo"}}, + ) + + ok = models._retry_journal_recovery_in_place(s) + assert ok is True + + # Marker promoted in place and meta stripped + marker_idx = next( + i for i, m in enumerate(s.messages) + if m.get("type") == "interrupted" and m.get("_error") + ) + promoted = s.messages[marker_idx] + assert promoted is marker_before + assert "recovered from the run journal" in promoted["content"] + assert "_pending_journal_recovery" not in promoted + assert "_journal_retry_stream_id" not in promoted + assert "_journal_retry_attempts" not in promoted + assert "_journal_retry_first_seen_ts" not in promoted + + # Journaled rows reordered ABOVE the marker, preserving order + before_marker = s.messages[:marker_idx] + recovered = [m for m in before_marker if m.get("_recovered_from_run_journal")] + assert recovered, "journaled assistant rows must sit above the marker" + assert any("Late tokens." in m.get("content", "") for m in recovered) + # tool_call.assistant_msg_idx still points into the valid range + for tc in s.tool_calls or []: + idx = tc.get("assistant_msg_idx") + if isinstance(idx, int): + assert 0 <= idx < len(s.messages) + + def test_increments_attempts_when_journal_still_empty(self, hermes_home, monkeypatch): + stream_id = "lazy_stream_increment" + s = self._make_pending_session(hermes_home, stream_id=stream_id) + # No append_run_event — journal stays empty. + ok = models._retry_journal_recovery_in_place(s) + assert ok is False + marker = s.messages[-1] + assert marker.get("_pending_journal_recovery") is True + assert marker.get("_journal_retry_attempts") == 1 + assert "Recovering the partial output" in marker["content"] + + def test_demotes_to_neutral_after_max_attempts(self, hermes_home, monkeypatch): + stream_id = "lazy_stream_giveup_attempts" + s = self._make_pending_session( + hermes_home, stream_id=stream_id, + attempts=models._JOURNAL_RETRY_MAX_ATTEMPTS, + ) + ok = models._retry_journal_recovery_in_place(s) + assert ok is False + marker = s.messages[-1] + assert "_pending_journal_recovery" not in marker + assert "_journal_retry_stream_id" not in marker + assert "_journal_retry_attempts" not in marker + assert "_journal_retry_first_seen_ts" not in marker + assert "Partial output may have been lost" in marker["content"] + assert "Recovering the partial output" not in marker["content"] + + def test_demotes_to_neutral_after_giveup_seconds(self, hermes_home, monkeypatch): + stream_id = "lazy_stream_giveup_age" + first_seen = int(time.time()) - (models._JOURNAL_RETRY_GIVEUP_SECONDS + 60) + s = self._make_pending_session( + hermes_home, stream_id=stream_id, first_seen_ts=first_seen, + ) + ok = models._retry_journal_recovery_in_place(s) + assert ok is False + marker = s.messages[-1] + assert "_pending_journal_recovery" not in marker + assert "Partial output may have been lost" in marker["content"] + + def test_noop_when_no_pending_marker(self, hermes_home, monkeypatch): + s = _make_session(messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + ]) + s.save() + spy = [] + original = models._append_journaled_partial_output + + def _spy(*a, **kw): + spy.append(1) + return original(*a, **kw) + + monkeypatch.setattr(models, "_append_journaled_partial_output", _spy) + ok = models._retry_journal_recovery_in_place(s) + assert ok is False + assert spy == [], "no pending marker → must not call recovery" + + def test_short_circuit_helper_detects_pending_marker(self, hermes_home, monkeypatch): + s = _make_session(messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + ]) + assert models._session_has_pending_journal_retry(s) is False + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = "abc" + marker["_journal_retry_attempts"] = 0 + marker["_journal_retry_first_seen_ts"] = int(time.time()) + s.messages.append(marker) + assert models._session_has_pending_journal_retry(s) is True + + def test_short_circuit_helper_stops_at_normal_assistant(self, hermes_home, monkeypatch): + s = _make_session(messages=[ + {"role": "user", "content": "x"}, + ]) + # An old, already-promoted marker followed by a normal assistant turn — + # the pending flag belongs to a prior turn that was healed; helper must + # not loop back into it. + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = "abc" + marker["_journal_retry_attempts"] = 0 + marker["_journal_retry_first_seen_ts"] = int(time.time()) + s.messages.append(marker) + s.messages.append({"role": "user", "content": "later"}) + s.messages.append({"role": "assistant", "content": "later reply"}) + assert models._session_has_pending_journal_retry(s) is False + + +class TestGetSessionLazyRetryHook: + """get_session() must trigger _retry_journal_recovery_in_place on both + cache-hit and cold-load paths when a pending marker exists, and skip + quickly when nothing is pending.""" + + def _make_session_with_pending_marker(self, sid="lazy_get", stream_id="st"): + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "u"}, + {"role": "assistant", "content": "a"}, + {"role": "user", "content": "u2", "_recovered": True}, + ]) + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = stream_id + marker["_journal_retry_attempts"] = 0 + marker["_journal_retry_first_seen_ts"] = int(time.time()) + s.messages.append(marker) + return s + + def test_triggers_retry_on_cache_hit(self, hermes_home, monkeypatch): + sid = "lazy_get_cache" + stream_id = "stream_cache" + s = self._make_session_with_pending_marker(sid=sid, stream_id=stream_id) + s.save() + models.SESSIONS[sid] = s + append_run_event(sid, stream_id, "token", {"text": "Late."}) + + reloaded = models.get_session(sid) + assert reloaded is s + marker = s.messages[-1] if "Recovering" in s.messages[-1]["content"] else next( + m for m in s.messages + if m.get("type") == "interrupted" and m.get("_error") + ) + assert "recovered from the run journal" in marker["content"] + assert "_pending_journal_recovery" not in marker + + def test_triggers_retry_on_cold_load(self, hermes_home, monkeypatch): + sid = "lazy_get_cold" + stream_id = "stream_cold" + s = self._make_session_with_pending_marker(sid=sid, stream_id=stream_id) + s.save() + models.SESSIONS.pop(sid, None) + append_run_event(sid, stream_id, "token", {"text": "Late."}) + + reloaded = models.get_session(sid) + marker = next( + m for m in reloaded.messages + if m.get("type") == "interrupted" and m.get("_error") + ) + assert "recovered from the run journal" in marker["content"] + assert "_pending_journal_recovery" not in marker + + def test_short_circuit_when_no_pending_marker(self, hermes_home, monkeypatch): + sid = "lazy_get_no_pending" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + ]) + s.save() + models.SESSIONS[sid] = s + spy = [] + monkeypatch.setattr( + models, "_retry_journal_recovery_in_place", + lambda session: spy.append(1) or False, + ) + models.get_session(sid) + assert spy == [] + + def test_metadata_only_skips_retry(self, hermes_home, monkeypatch): + sid = "lazy_get_meta" + stream_id = "stream_meta" + s = self._make_session_with_pending_marker(sid=sid, stream_id=stream_id) + s.save() + models.SESSIONS[sid] = s + spy = [] + monkeypatch.setattr( + models, "_retry_journal_recovery_in_place", + lambda session: spy.append(1) or False, + ) + models.get_session(sid, metadata_only=True) + assert spy == [], "metadata_only must skip the lazy-retry helper" + + +class TestLazyRetryBackwardsCompat: + """Pre-fix session shapes must continue to work.""" + + def test_legacy_marker_without_flag_unchanged(self, hermes_home, monkeypatch): + """An old session whose marker carries the legacy 'no agent output' + wording (no flag) must not be touched by get_session().""" + sid = "legacy_marker_sid" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + {"role": "user", "content": "later", "_recovered": True}, + ]) + legacy = models._interrupted_recovery_marker(recovered_output=False) + s.messages.append(legacy) + s.save() + models.SESSIONS.pop(sid, None) + spy = [] + original = models._append_journaled_partial_output + monkeypatch.setattr( + models, "_append_journaled_partial_output", + lambda *a, **kw: spy.append(1) or original(*a, **kw), + ) + models.get_session(sid) + assert spy == [], "legacy marker (no flag) must not re-trigger recovery" + + def test_pending_retry_marker_round_trips_through_session_save_and_load( + self, hermes_home, monkeypatch): + """All four retry meta keys must survive Session.save() / Session.load().""" + sid = "round_trip_sid" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + ]) + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = "abc" + marker["_journal_retry_attempts"] = 3 + marker["_journal_retry_first_seen_ts"] = 1779200000 + s.messages.append(marker) + s.save() + models.SESSIONS.pop(sid, None) + + reloaded = Session.load(sid) + last = reloaded.messages[-1] + assert last.get("_pending_journal_recovery") is True + assert last.get("_journal_retry_stream_id") == "abc" + assert last.get("_journal_retry_attempts") == 3 + assert last.get("_journal_retry_first_seen_ts") == 1779200000 + + +class TestWslPageCacheRace: + """Cover the WSL2 / network-FS shape: read_run_events returns empty / errors + first, recovers on a later call.""" + + def test_first_read_raises_oserror_second_read_succeeds(self, hermes_home, monkeypatch): + sid = "wsl_race_sid" + stream_id = "wsl_race_stream" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + ]) + s.pending_user_message = "Keep going" + s.pending_started_at = time.time() - 120 + s.active_stream_id = stream_id + + # Simulate first read raising IOError, then succeeding. + import api.run_journal as run_journal + real = run_journal.read_run_events + attempts = {"n": 0} + + def flaky_read(sid_, run_id, **kw): + attempts["n"] += 1 + if attempts["n"] == 1: + raise OSError("EIO simulated by test") + return real(sid_, run_id, **kw) + + monkeypatch.setattr(run_journal, "read_run_events", flaky_read) + + core_path = hermes_home / "sessions" / f"session_{sid}.json" + result = _apply_core_sync_or_error_marker( + s, core_path, stream_id_for_recheck=stream_id, + ) + assert result is True + marker = next(m for m in s.messages if m.get("type") == "interrupted") + assert marker.get("_pending_journal_recovery") is True + + # Now write journal events; the next retry call will read them. + append_run_event(sid, stream_id, "token", {"text": "Came back."}) + ok = models._retry_journal_recovery_in_place(s) + assert ok is True + marker_after = next(m for m in s.messages if m.get("type") == "interrupted") + assert "recovered from the run journal" in marker_after["content"] + assert "_pending_journal_recovery" not in marker_after + + def test_journal_grows_between_reads(self, hermes_home, monkeypatch): + sid = "wsl_grow_sid" + stream_id = "wsl_grow_stream" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + ]) + s.pending_user_message = "Keep going" + s.pending_started_at = time.time() - 120 + s.active_stream_id = stream_id + + # First repair pass: nothing visible yet. + core_path = hermes_home / "sessions" / f"session_{sid}.json" + result = _apply_core_sync_or_error_marker( + s, core_path, stream_id_for_recheck=stream_id, + ) + assert result is True + marker = next(m for m in s.messages if m.get("type") == "interrupted") + assert marker.get("_pending_journal_recovery") is True + + # Journal grows. + append_run_event(sid, stream_id, "token", {"text": "Partial 1."}) + append_run_event(sid, stream_id, "token", {"text": " Partial 2."}) + append_run_event( + sid, stream_id, "tool", + {"name": "terminal", "preview": "ls", "args": {"cmd": "ls"}}, + ) + + ok = models._retry_journal_recovery_in_place(s) + assert ok is True + marker_after = next(m for m in s.messages if m.get("type") == "interrupted") + assert "recovered from the run journal" in marker_after["content"] + # Both tokens recovered, in order, before the marker. + marker_idx = s.messages.index(marker_after) + recovered_text = " ".join( + m.get("content", "") for m in s.messages[:marker_idx] + if m.get("_recovered_from_run_journal") + ) + assert "Partial 1." in recovered_text and "Partial 2." in recovered_text + + def test_concurrent_get_session_calls_idempotent(self, hermes_home, monkeypatch): + sid = "wsl_concurrent_sid" + stream_id = "wsl_concurrent_stream" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + {"role": "user", "content": "later", "_recovered": True}, + ]) + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = stream_id + marker["_journal_retry_attempts"] = 0 + marker["_journal_retry_first_seen_ts"] = int(time.time()) + s.messages.append(marker) + s.save() + append_run_event(sid, stream_id, "token", {"text": "ConcurrentTokens"}) + models.SESSIONS[sid] = s + + results = [] + + def _worker(): + try: + models.get_session(sid) + results.append("ok") + except Exception as exc: # noqa: BLE001 + results.append(exc) + + threads = [threading.Thread(target=_worker, daemon=True) for _ in range(2)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=5) + + assert results.count("ok") == 2, f"both workers must succeed: {results}" + + # Exactly one interrupted marker remains, exactly one journal-recovered + # body (deduped by dedupe_existing=True). + interrupted_markers = [m for m in s.messages if m.get("type") == "interrupted"] + assert len(interrupted_markers) == 1 + recovered = [m for m in s.messages if m.get("_recovered_from_run_journal")] + assert sum(1 for m in recovered if "ConcurrentTokens" in m.get("content", "")) == 1 From 66b6d8f019a83a31cf9e73b12cde81009f4e4927 Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 11:59:06 +0800 Subject: [PATCH 04/10] docs(session): CHANGELOG entry + troubleshooting FAQ for the lost-response self-heal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CHANGELOG: append an Unreleased / Fixed entry describing the user-visible behaviour change (interrupted-turn marker now self-heals on the next session read; gives up gracefully after 12 retries or 24h). docs/troubleshooting.md: add a 'Symptom → Why → Diagnostic → Fix → Caps → When to file a bug' entry for the 'no agent output was recovered' marker so users who hit the lost-response shape on WSL2 / network FS can recognise it, verify the run-journal on disk, and know that reloading the session is enough. --- CHANGELOG.md | 4 ++++ docs/troubleshooting.md | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 185f15f1..6786a512 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Fixed + +- **Sidecar repair / streaming recovery** — Lazily retry run-journal recovery when sidecar repair runs before the journaled tokens are visible on disk (mainly affects WSL2 / network FS setups where the run-journal `.jsonl` is written by the dead worker but the WebUI process reads it through a cache that has not yet seen the writes). The interrupted-turn marker now self-heals on the next session read: it carries a `_pending_journal_recovery` flag that re-runs `_append_journaled_partial_output` from `get_session()`, promotes the marker to the recovered-output wording on success, and demotes to neutral wording after 12 failed retries or 24h, so users no longer see a permanent "no agent output was recovered" banner when the journaled tokens are present on disk. New regression test `tests/test_session_lost_response_regression.py` and additional coverage in `tests/test_session_sidecar_repair.py`. + ## [v0.51.95] — 2026-05-20 — Release BS (stage-388 — 5-PR batch — live tool callback event dedup + browser-only dashboard links + messaging transcript merge alignment + Geist Contrast skin + SSE runtime diagnostics) diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index 3617aeba..cb6384bb 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -85,6 +85,30 @@ If after running steps 1-4 the import still fails *and* `pip install -e .` succe --- +## "Response interrupted." marker keeps saying "no agent output was recovered" + +**Symptom.** After the WebUI process restarts mid-turn (manual restart, OOM, crash, …), the affected chat shows an `**Response interrupted.**` marker with the wording *"The user message above was preserved, but no agent output was recovered."*, even though the run-journal for that turn is present on disk and contains the partial tokens the agent had already streamed. + +**Why.** Sidecar repair re-checks the run-journal at restart and uses the result as a one-shot signal. On WSL2 (9p / DrvFs) and on some network-backed setups, the run-journal `.jsonl` is written by the dead worker but the WebUI process reads it through a page-cache state that has not yet seen those writes — recovery returns "empty" and the marker is baked permanently. The fix introduces a *lazy* retry path: when sidecar repair cannot read visible output but knows the stream id, it stores a `_pending_journal_recovery` flag on the marker and re-attempts recovery from `get_session()` until the journal becomes readable (or the retry budget is exhausted). + +**Diagnostic.** + +1. Identify the affected session id and stream id from the marker. The marker JSON (under `~/.hermes/webui/sessions/session_.json`) shows them on `_journal_retry_stream_id` after the fix; pre-fix sessions only carry the legacy wording. +2. Check whether the run-journal contains real events: + ```bash + ls -la ~/.hermes/_run_journal//.jsonl + head -2 ~/.hermes/_run_journal//.jsonl + ``` + If the file exists and contains `token` / `tool` events, the lazy-retry path will pick them up the next time the session is opened. + +**Fix.** Reload the session in the browser. On the next `get_session()` call the marker is re-evaluated; if the journaled events are visible on disk the marker promotes to *"The partial output above was recovered from the run journal …"* wording and the journaled assistant text + tool cards land above the marker in chronological order. No manual sidecar editing is required. + +**Caps.** The lazy retry path gives up after 12 failed attempts or 24h of wall-clock age, at which point the marker is demoted to a neutral *"Partial output may have been lost."* wording so the "reload to retry" prompt doesn't linger forever for genuinely lost journals. + +**When to file a bug.** If, after the fix, you see the lazy-retry wording (*"Recovering the partial output from the run journal — reload this session to retry."*) but reloading the session never promotes it to the recovered wording even though the `.jsonl` clearly contains `token` events, capture the marker JSON and the run-journal file and file a bug. + +--- + ## Other troubleshooting This document grows over time. If a recurring failure mode isn't covered here yet, add it via PR. The format for each entry: **Symptom → Why → Diagnostic commands → Fix → When to file a bug**. From 9870e8f11125fcf93119bb5d29780fe6e5f132ab Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 12:18:03 +0800 Subject: [PATCH 05/10] =?UTF-8?q?fix(session):=20address=20Copilot=20revie?= =?UTF-8?q?w=20=E2=80=94=20scope=20tool-card=20dedupe=20by=20stream=20id?= =?UTF-8?q?=20+=20tighten=20docs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four code-review comments from the automated Copilot reviewer on this PR: 1. `_journal_tool_already_present` dedupe was session-wide, so a legitimately-repeated tool (e.g. a second `terminal: ls` in an earlier turn) could cause the retry path to falsely skip materializing the recovered tool card. The helper now takes a keyword `stream_id` argument; when supplied, a tool card whose `_recovered_stream_id` is set AND differs from the candidate is no longer treated as a duplicate. Untagged tool cards (live tools, or tool cards carried over from a pre-tagging core transcript) still match, preserving the existing 'core transcript already has this tool, don't duplicate' invariant. Two new tests in `TestJournalToolDedupeScoping` cover both legs of the rule. 2./3. The troubleshooting FAQ pointed at `~/.hermes/webui/sessions/session_.json` and `~/.hermes/_run_journal/...`. The actual sidecar filename has no `session_` prefix and the run-journal lives under the WebUI sessions dir (`~/.hermes/webui/sessions/_run_journal//.jsonl`, default). Both paths fixed and an explicit note added about `HERMES_WEBUI_STATE_DIR` overriding the state root. 4. Drop unused `json` / `queue` / `Path` imports from `tests/test_session_lost_response_regression.py` so the file stops carrying noise that future linting would flag. --- api/models.py | 43 +++++++- docs/troubleshooting.md | 8 +- .../test_session_lost_response_regression.py | 4 - tests/test_session_sidecar_repair.py | 103 ++++++++++++++++++ 4 files changed, 147 insertions(+), 11 deletions(-) diff --git a/api/models.py b/api/models.py index 7bb831e9..c666fc1f 100644 --- a/api/models.py +++ b/api/models.py @@ -883,9 +883,33 @@ def _find_existing_assistant_for_journal_content(session, content: str) -> int | return None -def _journal_tool_already_present(session, name: str, preview: str) -> bool: +def _journal_tool_already_present( + session, + name: str, + preview: str, + *, + stream_id: str | None = None, +) -> bool: + """Return True when an equivalent tool card already exists. + + Matching rule: + + * If the existing tool card carries ``_recovered_stream_id``, that means a + previous journal-recovery run materialized it. The retry can safely + collapse against it only when both stream ids match — otherwise a + legitimately-repeated tool (e.g. a second ``terminal: ls`` in a + different turn) would be dropped. + * If the existing tool card has no ``_recovered_stream_id`` (a live tool + card, or a tool card carried over from a core transcript that pre-dates + stream-id tagging), the legacy name+preview match still wins. This + preserves the "core transcript already has this tool, don't duplicate + it" invariant the original repair path established. + * When ``stream_id`` is omitted, the helper degrades cleanly to its + pre-fix session-wide behaviour. + """ candidate_name = str(name or '') candidate_preview = _normalize_journal_recovery_text(preview) + candidate_stream = str(stream_id) if stream_id else None for tool_call in session.tool_calls or []: if not isinstance(tool_call, dict): continue @@ -894,8 +918,17 @@ def _journal_tool_already_present(session, name: str, preview: str) -> bool: existing_preview = _normalize_journal_recovery_text( tool_call.get('preview') or tool_call.get('snippet') or '' ) - if existing_preview == candidate_preview: - return True + if existing_preview != candidate_preview: + continue + if candidate_stream is not None: + existing_stream = tool_call.get('_recovered_stream_id') + # A tool card explicitly tagged with a recovered_stream_id that + # differs from ours belongs to another retry's turn — don't let + # it pre-empt this retry. Untagged tool cards (live or carried + # over from the core transcript) still match. + if existing_stream and str(existing_stream) != candidate_stream: + continue + return True return False @@ -1038,7 +1071,9 @@ def _append_journaled_partial_output( anchor_idx = ensure_assistant_anchor(created_at) name = str(payload.get('name') or 'tool') preview = str(payload.get('preview') or '') - if dedupe_existing and _journal_tool_already_present(session, name, preview): + if dedupe_existing and _journal_tool_already_present( + session, name, preview, stream_id=stream_id, + ): current_assistant_idx = anchor_idx continue recovered_tool_calls.append({ diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index cb6384bb..6ba7dc8b 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -93,11 +93,13 @@ If after running steps 1-4 the import still fails *and* `pip install -e .` succe **Diagnostic.** -1. Identify the affected session id and stream id from the marker. The marker JSON (under `~/.hermes/webui/sessions/session_.json`) shows them on `_journal_retry_stream_id` after the fix; pre-fix sessions only carry the legacy wording. +The on-disk locations below assume the default `~/.hermes/webui` state directory. If you override it via `HERMES_WEBUI_STATE_DIR`, substitute that path for `~/.hermes/webui` in every step. + +1. Identify the affected session id and stream id from the marker. The marker JSON lives at `~/.hermes/webui/sessions/.json`; after the fix it shows them on the `_journal_retry_stream_id` key. Pre-fix sessions only carry the legacy wording, with no retry meta. 2. Check whether the run-journal contains real events: ```bash - ls -la ~/.hermes/_run_journal//.jsonl - head -2 ~/.hermes/_run_journal//.jsonl + ls -la ~/.hermes/webui/sessions/_run_journal//.jsonl + head -2 ~/.hermes/webui/sessions/_run_journal//.jsonl ``` If the file exists and contains `token` / `tool` events, the lazy-retry path will pick them up the next time the session is opened. diff --git a/tests/test_session_lost_response_regression.py b/tests/test_session_lost_response_regression.py index aad6a17d..365f526f 100644 --- a/tests/test_session_lost_response_regression.py +++ b/tests/test_session_lost_response_regression.py @@ -15,10 +15,6 @@ The scenario this test pins down: assistant text/tools into the transcript in the correct chronological position. """ -import json -import queue -from pathlib import Path - import pytest import api.models as models diff --git a/tests/test_session_sidecar_repair.py b/tests/test_session_sidecar_repair.py index 34377e78..c712ab0a 100644 --- a/tests/test_session_sidecar_repair.py +++ b/tests/test_session_sidecar_repair.py @@ -1475,6 +1475,109 @@ class TestLazyRetryBackwardsCompat: assert last.get("_journal_retry_first_seen_ts") == 1779200000 +class TestJournalToolDedupeScoping: + """`_journal_tool_already_present` must only collapse against tool cards + recovered from the same stream — a repeated tool (e.g. ``terminal: ls``) + in a previous turn must NOT pre-empt this turn's recovery.""" + + def test_repeated_tool_in_earlier_turn_does_not_block_recovery(self, hermes_home, monkeypatch): + sid = "dedupe_scope_sid" + stream_id = "dedupe_scope_stream" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "earlier turn"}, + {"role": "assistant", "content": "earlier reply"}, + {"role": "user", "content": "later turn", "_recovered": True}, + ]) + # Pre-existing tool card from an earlier turn with same (name, preview) + # but a different recovered-stream-id. The retry must NOT see this as a + # hit when dedupe_existing is asked to scope to ``stream_id``. + s.tool_calls = [ + { + "name": "terminal", + "preview": "ls", + "snippet": "ls", + "tid": "old-1", + "_recovered_from_run_journal": True, + "_recovered_stream_id": "earlier_stream", + "done": True, + } + ] + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = stream_id + marker["_journal_retry_attempts"] = 0 + marker["_journal_retry_first_seen_ts"] = int(time.time()) + s.messages.append(marker) + s.save() + append_run_event(sid, stream_id, "token", {"text": "Listing."}) + append_run_event( + sid, stream_id, "tool", + {"name": "terminal", "preview": "ls", "args": {"cmd": "ls"}}, + ) + + ok = models._retry_journal_recovery_in_place(s) + assert ok is True + + # Two tool cards now: the old one untouched, plus a new one for this + # stream. If dedupe were session-wide the new one would be dropped. + scoped = [ + tc for tc in s.tool_calls + if tc.get("_recovered_stream_id") == stream_id + ] + assert len(scoped) == 1 + assert scoped[0]["name"] == "terminal" + assert scoped[0]["preview"] == "ls" + # Old tool card is preserved. + assert any( + tc.get("_recovered_stream_id") == "earlier_stream" + for tc in s.tool_calls + ) + + def test_untagged_tool_still_matches_for_core_transcript_invariant(self, hermes_home, monkeypatch): + """Tool cards without ``_recovered_stream_id`` (live tools, or tools + carried over from the core transcript) match regardless of the + ``stream_id`` argument. This preserves the "core transcript already + contains this tool, don't duplicate it" invariant the original repair + path relies on.""" + s = _make_session(messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + ]) + s.tool_calls = [ + {"name": "terminal", "preview": "ls", "snippet": "ls"}, + ] + # No stream_id → legacy session-wide check returns True. + assert models._journal_tool_already_present(s, "terminal", "ls") is True + # With a stream_id, untagged tool cards still match (different stream + # ids only override the match decision when the existing card itself + # is tagged with a stream id that disagrees). + assert models._journal_tool_already_present( + s, "terminal", "ls", stream_id="some_stream", + ) is True + + def test_tagged_tool_with_different_stream_does_not_match(self, hermes_home, monkeypatch): + """A tool card tagged with a different recovered_stream_id must NOT + be considered a duplicate when the retry is scoped to a different + stream.""" + s = _make_session(messages=[ + {"role": "user", "content": "x"}, + ]) + s.tool_calls = [ + { + "name": "terminal", + "preview": "ls", + "snippet": "ls", + "_recovered_stream_id": "other_stream", + }, + ] + assert models._journal_tool_already_present( + s, "terminal", "ls", stream_id="this_stream", + ) is False + # But scoping to the same stream id matches. + assert models._journal_tool_already_present( + s, "terminal", "ls", stream_id="other_stream", + ) is True + + class TestWslPageCacheRace: """Cover the WSL2 / network-FS shape: read_run_events returns empty / errors first, recovers on a later call.""" From 195778533202f8f709ff82cbd93dc71f41d0e476 Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 13:08:08 +0800 Subject: [PATCH 06/10] =?UTF-8?q?fix(session):=20address=20Copilot=20round?= =?UTF-8?q?-2=20review=20=E2=80=94=20correct=20stale=20comment=20and=20dro?= =?UTF-8?q?p=20unused=20fixture=20arg?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two non-functional cleanups from the second Copilot pass: 1. The inline comment in `test_error_marker_no_preserved_as_draft` said the legacy "user message above was preserved" wording was used for the post-retry-give-up case. The actual implementation demotes give-up markers to a different neutral wording ("Partial output may have been lost."). Comment rewritten to match the contract. 2. The regression test `test_lost_response_recovered_on_second_read` declared a `monkeypatch` parameter it never used. Dropped. --- tests/test_session_lost_response_regression.py | 2 +- tests/test_session_sidecar_repair.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_session_lost_response_regression.py b/tests/test_session_lost_response_regression.py index 365f526f..eb5c4db0 100644 --- a/tests/test_session_lost_response_regression.py +++ b/tests/test_session_lost_response_regression.py @@ -100,7 +100,7 @@ def _make_dead_stream_session( # ── The regression test ──────────────────────────────────────────────────── -def test_lost_response_recovered_on_second_read(hermes_home, monkeypatch): +def test_lost_response_recovered_on_second_read(hermes_home): sid = "9f14583f0e4e4444aaaa111122223333" stream_id = "7c8b4108d52b4aba9af362d3a54f47ac" diff --git a/tests/test_session_sidecar_repair.py b/tests/test_session_sidecar_repair.py index c712ab0a..369a0309 100644 --- a/tests/test_session_sidecar_repair.py +++ b/tests/test_session_sidecar_repair.py @@ -314,9 +314,10 @@ class TestDraftRecovery: assert "Response interrupted" in content assert "WebUI process restarted" in content # The marker now arms the lazy-retry hook when a stream id is known - # (Recovering the partial output... reload to retry). The legacy + # ("Recovering the partial output… reload to retry."). The legacy # "user message above was preserved" wording is reserved for the - # no-stream-id and post-retry-give-up cases. + # no-stream-id repair case; the post-retry-give-up case demotes to + # the neutral "Partial output may have been lost." wording instead. assert ( "user message above was preserved" in content or "Recovering the partial output" in content From d5a185d9c6758f51b2b9b8d069be3ff6f5080b9f Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 20:48:38 +0800 Subject: [PATCH 07/10] fix(session): serialize lazy journal retry per session --- api/models.py | 26 +++++++- .../test_session_lost_response_regression.py | 64 +++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) diff --git a/api/models.py b/api/models.py index c666fc1f..b9cd532c 100644 --- a/api/models.py +++ b/api/models.py @@ -1144,6 +1144,13 @@ def _append_journaled_partial_output( # so users do not see "reload to retry" prompts forever. _JOURNAL_RETRY_MAX_ATTEMPTS = 12 _JOURNAL_RETRY_GIVEUP_SECONDS = 24 * 3600 +_JOURNAL_RETRY_LOCKS: dict[str, threading.Lock] = {} +_JOURNAL_RETRY_LOCKS_GUARD = threading.Lock() + + +def _journal_retry_lock_for_sid(sid: str) -> threading.Lock: + with _JOURNAL_RETRY_LOCKS_GUARD: + return _JOURNAL_RETRY_LOCKS.setdefault(str(sid), threading.Lock()) def _build_recovery_marker_with_retry_hook( @@ -1231,6 +1238,21 @@ def _reorder_journal_tail_above_marker(session, marker_idx: int) -> None: session.messages = new_messages +def _try_retry_journal_recovery_in_place(session) -> bool: + sid = str(getattr(session, 'session_id', '') or '') + lock = _journal_retry_lock_for_sid(sid) + if not lock.acquire(blocking=False): + logger.debug("lazy journal-retry already running for session %s", sid) + return False + try: + return _retry_journal_recovery_in_place(session) + finally: + lock.release() + with _JOURNAL_RETRY_LOCKS_GUARD: + if _JOURNAL_RETRY_LOCKS.get(sid) is lock: + _JOURNAL_RETRY_LOCKS.pop(sid, None) + + def _retry_journal_recovery_in_place(session) -> bool: """Re-attempt run-journal recovery for the most recent pending marker. @@ -1630,7 +1652,7 @@ def get_session(sid, metadata_only=False): if cached is not None: if not metadata_only and _session_has_pending_journal_retry(cached): try: - _retry_journal_recovery_in_place(cached) + _try_retry_journal_recovery_in_place(cached) except Exception: logger.debug( "lazy journal-retry failed on cache hit for session %s", @@ -1658,7 +1680,7 @@ def get_session(sid, metadata_only=False): # chance to self-heal on this read. if not repaired and _session_has_pending_journal_retry(s): try: - _retry_journal_recovery_in_place(s) + _try_retry_journal_recovery_in_place(s) except Exception: logger.debug( "lazy journal-retry failed on cold load for session %s", diff --git a/tests/test_session_lost_response_regression.py b/tests/test_session_lost_response_regression.py index eb5c4db0..db129d45 100644 --- a/tests/test_session_lost_response_regression.py +++ b/tests/test_session_lost_response_regression.py @@ -15,6 +15,9 @@ The scenario this test pins down: assistant text/tools into the transcript in the correct chronological position. """ +from concurrent.futures import ThreadPoolExecutor +import threading + import pytest import api.models as models @@ -59,8 +62,10 @@ def _isolate_stream_state(): @pytest.fixture(autouse=True) def _isolate_agent_locks(): config.SESSION_AGENT_LOCKS.clear() + models._JOURNAL_RETRY_LOCKS.clear() yield config.SESSION_AGENT_LOCKS.clear() + models._JOURNAL_RETRY_LOCKS.clear() @pytest.fixture() @@ -97,6 +102,32 @@ def _make_dead_stream_session( return s +def _make_pending_retry_session(session_id: str, *, stream_id: str): + s = Session(session_id=session_id, title="Pending retry", messages=[ + {"role": "user", "content": "q", "timestamp": 1}, + { + "role": "assistant", + "content": models._INTERRUPTED_PENDING_RETRY_WORDING, + "timestamp": 2, + "_error": True, + "type": "interrupted", + "_pending_journal_recovery": True, + "_journal_retry_stream_id": stream_id, + "_journal_retry_attempts": 0, + "_journal_retry_first_seen_ts": int(models.time.time()), + }, + ]) + s.save() + return s + + +def _assert_retry_meta_removed(marker): + assert "_pending_journal_recovery" not in marker + assert "_journal_retry_stream_id" not in marker + assert "_journal_retry_attempts" not in marker + assert "_journal_retry_first_seen_ts" not in marker + + # ── The regression test ──────────────────────────────────────────────────── @@ -196,3 +227,36 @@ def test_lost_response_recovered_on_second_read(hermes_home): assert "_journal_retry_stream_id" not in promoted assert "_journal_retry_attempts" not in promoted assert "_journal_retry_first_seen_ts" not in promoted + + +def test_concurrent_get_session_serializes_lazy_journal_retry(hermes_home, monkeypatch): + sid = "retry_lock_sid" + stream_id = "retry_lock_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + models.SESSIONS[sid] = s + + entered = threading.Event() + release = threading.Event() + counter_lock = threading.Lock() + calls = 0 + + def slow_retry(session): + nonlocal calls + with counter_lock: + calls += 1 + entered.set() + assert release.wait(timeout=2), "test timed out waiting to release retry body" + return False + + monkeypatch.setattr(models, "_retry_journal_recovery_in_place", slow_retry) + + with ThreadPoolExecutor(max_workers=2) as executor: + first = executor.submit(models.get_session, sid) + assert entered.wait(timeout=2), "first caller did not enter retry helper" + second = executor.submit(models.get_session, sid) + assert second.result(timeout=2) is s + release.set() + assert first.result(timeout=2) is s + + assert calls == 1 + From 2a303de2a3aa92f2d43df8cd7a9f121a3ae62db5 Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 20:49:43 +0800 Subject: [PATCH 08/10] fix(session): preserve retry budget while journal is still arriving --- api/models.py | 68 +++++++++++++++++-- .../test_session_lost_response_regression.py | 36 +++++++++- 2 files changed, 98 insertions(+), 6 deletions(-) diff --git a/api/models.py b/api/models.py index b9cd532c..91b5fdbf 100644 --- a/api/models.py +++ b/api/models.py @@ -957,6 +957,39 @@ def _run_journal_has_visible_output(session, stream_id: str | None) -> bool: return False +def _journal_is_still_arriving(session, stream_id: str | None) -> bool: + """Return True for journals that may become visible on a later read. + + `read_run_events()` deliberately collapses missing files and empty files + into an empty event list, so the lazy retry path needs a small filesystem + visibility check to avoid burning all retry attempts while WSL2 / network + filesystems are still surfacing the journal. Non-empty journals are treated + as sealed enough for retry-budget accounting; if they contain no visible + output, the normal capped give-up path handles them. + """ + if not stream_id: + return False + try: + from api.run_journal import _run_path, latest_run_summary + + path = _run_path(session.session_id, stream_id) + summary = latest_run_summary(session.session_id, stream_id) + if summary.get('terminal'): + return False + try: + return (not path.exists()) or path.stat().st_size == 0 + except OSError: + return True + except Exception: + logger.debug( + "Session %s: failed to classify journal visibility for stream %s", + getattr(session, 'session_id', '?'), + stream_id, + exc_info=True, + ) + return False + + def _append_journaled_partial_output( session, stream_id: str | None, @@ -1137,7 +1170,10 @@ def _append_journaled_partial_output( # `_append_journaled_partial_output` with `dedupe_existing=True`. On # success the marker is promoted in place to the recovered-output # wording, the journaled rows are reordered to sit above the marker, -# and all retry meta is stripped. On failure attempts is incremented. +# and all retry meta is stripped. If the journal is still missing or +# zero-byte, the retry is a no-op and does not consume attempt budget. +# Terminal/non-useful journals consume attempt budget and can demote +# immediately at the max-attempt cap. # * After `_JOURNAL_RETRY_MAX_ATTEMPTS` failed retries or # `_JOURNAL_RETRY_GIVEUP_SECONDS` of wall-clock age, the marker is # demoted to the neutral wording ("Partial output may have been lost.") @@ -1245,7 +1281,9 @@ def _try_retry_journal_recovery_in_place(session) -> bool: logger.debug("lazy journal-retry already running for session %s", sid) return False try: - return _retry_journal_recovery_in_place(session) + return _retry_journal_recovery_in_place( + session, preserve_arriving_budget=True, + ) finally: lock.release() with _JOURNAL_RETRY_LOCKS_GUARD: @@ -1253,7 +1291,11 @@ def _try_retry_journal_recovery_in_place(session) -> bool: _JOURNAL_RETRY_LOCKS.pop(sid, None) -def _retry_journal_recovery_in_place(session) -> bool: +def _retry_journal_recovery_in_place( + session, + *, + preserve_arriving_budget: bool = False, +) -> bool: """Re-attempt run-journal recovery for the most recent pending marker. Returns True if the marker was promoted to the recovered-output wording. @@ -1338,12 +1380,28 @@ def _retry_journal_recovery_in_place(session) -> bool: attempts, ) return True - msg['_journal_retry_attempts'] = attempts + 1 + if ( + preserve_arriving_budget + and _journal_is_still_arriving(session, stream_id) + ): + logger.debug( + "Session %s: journal for stream %s still arriving; " + "preserving retry budget", + getattr(session, 'session_id', '?'), + stream_id, + ) + return False + next_attempts = attempts + 1 + if next_attempts >= _JOURNAL_RETRY_MAX_ATTEMPTS: + msg['content'] = _INTERRUPTED_NEUTRAL_WORDING + _strip_journal_retry_meta(msg) + else: + msg['_journal_retry_attempts'] = next_attempts try: session.save(touch_updated_at=False) except Exception: logger.debug( - "save() failed while incrementing retry counter for session %s", + "save() failed while updating retry counter for session %s", getattr(session, 'session_id', '?'), exc_info=True, ) diff --git a/tests/test_session_lost_response_regression.py b/tests/test_session_lost_response_regression.py index db129d45..0e8cea6c 100644 --- a/tests/test_session_lost_response_regression.py +++ b/tests/test_session_lost_response_regression.py @@ -240,7 +240,7 @@ def test_concurrent_get_session_serializes_lazy_journal_retry(hermes_home, monke counter_lock = threading.Lock() calls = 0 - def slow_retry(session): + def slow_retry(session, *, preserve_arriving_budget=False): nonlocal calls with counter_lock: calls += 1 @@ -260,3 +260,37 @@ def test_concurrent_get_session_serializes_lazy_journal_retry(hermes_home, monke assert calls == 1 + +def test_still_arriving_journal_does_not_consume_retry_budget(hermes_home, monkeypatch): + sid = "retry_arriving_sid" + stream_id = "retry_arriving_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + models.SESSIONS[sid] = s + + monkeypatch.setattr(models, "_append_journaled_partial_output", lambda *a, **kw: False) + monkeypatch.setattr(models, "_journal_is_still_arriving", lambda *a, **kw: True) + + for _ in range(20): + assert models.get_session(sid) is s + + marker = s.messages[-1] + assert marker["_journal_retry_attempts"] == 0 + assert marker["_pending_journal_recovery"] is True + assert marker["content"] == models._INTERRUPTED_PENDING_RETRY_WORDING + + +def test_sealed_empty_journal_consumes_retry_budget_and_demotes_at_max(hermes_home, monkeypatch): + sid = "retry_sealed_sid" + stream_id = "retry_sealed_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + s.messages[-1]["_journal_retry_attempts"] = models._JOURNAL_RETRY_MAX_ATTEMPTS - 1 + append_run_event(sid, stream_id, "stream_end", {}) + models.SESSIONS[sid] = s + + assert models.get_session(sid) is s + + marker = s.messages[-1] + assert marker["content"] == models._INTERRUPTED_NEUTRAL_WORDING + _assert_retry_meta_removed(marker) + assert not any(m.get("_recovered_from_run_journal") for m in s.messages) + From 37c3e84ad206e8efd4788822b7154c27363db6a5 Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 20:50:21 +0800 Subject: [PATCH 09/10] test(session): cover lazy journal retry give-up paths --- .../test_session_lost_response_regression.py | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/test_session_lost_response_regression.py b/tests/test_session_lost_response_regression.py index 0e8cea6c..4b85ebb9 100644 --- a/tests/test_session_lost_response_regression.py +++ b/tests/test_session_lost_response_regression.py @@ -294,3 +294,46 @@ def test_sealed_empty_journal_consumes_retry_budget_and_demotes_at_max(hermes_ho _assert_retry_meta_removed(marker) assert not any(m.get("_recovered_from_run_journal") for m in s.messages) + +def test_marker_demotes_after_max_attempts_with_sealed_empty_journal(hermes_home, monkeypatch): + sid = "retry_max_sid" + stream_id = "retry_max_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + s.messages[-1]["_journal_retry_attempts"] = models._JOURNAL_RETRY_MAX_ATTEMPTS - 1 + append_run_event(sid, stream_id, "stream_end", {}) + models.SESSIONS[sid] = s + + assert models.get_session(sid) is s + + marker = s.messages[-1] + assert marker["content"] == models._INTERRUPTED_NEUTRAL_WORDING + _assert_retry_meta_removed(marker) + assert not any(m.get("_recovered_from_run_journal") for m in s.messages) + + +def test_marker_demotes_after_giveup_seconds(hermes_home, monkeypatch): + base = 1_779_000_000 + monkeypatch.setattr(models.time, "time", lambda: base) + sid = "retry_age_sid" + stream_id = "retry_age_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + s.messages[-1]["_journal_retry_first_seen_ts"] = ( + base - models._JOURNAL_RETRY_GIVEUP_SECONDS - 1 + ) + models.SESSIONS[sid] = s + + append_calls = 0 + + def append_should_not_run(*args, **kwargs): + nonlocal append_calls + append_calls += 1 + return False + + monkeypatch.setattr(models, "_append_journaled_partial_output", append_should_not_run) + + assert models.get_session(sid) is s + + marker = s.messages[-1] + assert marker["content"] == models._INTERRUPTED_NEUTRAL_WORDING + _assert_retry_meta_removed(marker) + assert append_calls == 0 From 98106c809bbfea73b8847cf175458a677bc66942 Mon Sep 17 00:00:00 2001 From: Isla Liu Date: Wed, 20 May 2026 20:51:01 +0800 Subject: [PATCH 10/10] docs(session): clarify lazy retry trigger for metadata-only polling --- docs/troubleshooting.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index 6ba7dc8b..78f5f4ec 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -105,6 +105,8 @@ The on-disk locations below assume the default `~/.hermes/webui` state directory **Fix.** Reload the session in the browser. On the next `get_session()` call the marker is re-evaluated; if the journaled events are visible on disk the marker promotes to *"The partial output above was recovered from the run journal …"* wording and the journaled assistant text + tool cards land above the marker in chronological order. No manual sidecar editing is required. +**Trigger.** Sidebar metadata polling is intentionally not enough to run this self-heal. Requests such as `/api/session?messages=0&resolve_model=0` load the session with `metadata_only=True`, skip the full messages array, and therefore skip the lazy journal retry helper. Click/open the affected conversation so the message panel performs a full `messages=1` load; that full render is what re-checks the journal and can promote the marker. + **Caps.** The lazy retry path gives up after 12 failed attempts or 24h of wall-clock age, at which point the marker is demoted to a neutral *"Partial output may have been lost."* wording so the "reload to retry" prompt doesn't linger forever for genuinely lost journals. **When to file a bug.** If, after the fix, you see the lazy-retry wording (*"Recovering the partial output from the run journal — reload this session to retry."*) but reloading the session never promotes it to the recovered wording even though the `.jsonl` clearly contains `token` events, capture the marker JSON and the run-journal file and file a bug.