diff --git a/CHANGELOG.md b/CHANGELOG.md index 0faf2bcc..429553e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ ## [Unreleased] +### Fixed + +- **Sidecar repair / streaming recovery** — Lazily retry run-journal recovery when sidecar repair runs before the journaled tokens are visible on disk (mainly affects WSL2 / network FS setups where the run-journal `.jsonl` is written by the dead worker but the WebUI process reads it through a cache that has not yet seen the writes). The interrupted-turn marker now self-heals on the next session read: it carries a `_pending_journal_recovery` flag that re-runs `_append_journaled_partial_output` from `get_session()`, promotes the marker to the recovered-output wording on success, and demotes to neutral wording after 12 failed retries or 24h, so users no longer see a permanent "no agent output was recovered" banner when the journaled tokens are present on disk. New regression test `tests/test_session_lost_response_regression.py` and additional coverage in `tests/test_session_sidecar_repair.py`. + ## [v0.51.99] — 2026-05-20 — Release BW (stage-392 — 5-PR batch — compact tool activity grouping + CLI sidebar scan cap + title-generation API key forwarding + post-compression replay dedup + clarify popup stability) diff --git a/api/models.py b/api/models.py index cdaa1d24..c5a65350 100644 --- a/api/models.py +++ b/api/models.py @@ -741,27 +741,69 @@ def _get_profile_home(profile) -> Path: return Path(os.environ.get('HERMES_HOME') or '~/.hermes').expanduser() -def _interrupted_recovery_marker(*, recovered_output: bool = False) -> dict: +_INTERRUPTED_RECOVERED_WORDING = ( + '**Response interrupted.**\n\n' + 'The WebUI process restarted before this turn finished. ' + 'The partial output above was recovered from the run journal, ' + 'but the interrupted agent process could not continue.' +) +_INTERRUPTED_NO_OUTPUT_WORDING = ( + '**Response interrupted.**\n\n' + 'The WebUI process restarted before this turn finished. ' + 'The user message above was preserved, but no agent output was recovered.' +) +_INTERRUPTED_PENDING_RETRY_WORDING = ( + '**Response interrupted.**\n\n' + 'The WebUI process restarted before this turn finished. ' + 'Recovering the partial output from the run journal — ' + 'reload this session to retry.' +) +# Neutral wording used when the lazy retry path gives up (max attempts reached +# or the marker has been pending longer than _JOURNAL_RETRY_GIVEUP_SECONDS). +_INTERRUPTED_NEUTRAL_WORDING = ( + '**Response interrupted.**\n\n' + 'The WebUI process restarted before this turn finished. ' + 'Partial output may have been lost.' +) + + +def _interrupted_recovery_marker( + *, + recovered_output: bool = False, + pending_retry: bool = False, +) -> dict: + """Build the standard interrupted-turn marker. + + ``recovered_output=True`` means the run journal already yielded visible + text on this repair pass — the marker advertises that the partial output + has been recovered. + + ``pending_retry=True`` is the lazy-retry hook: the journal was unreadable + on this pass (page-cache loss, un-fsynced writes on slow FS, etc.). The + marker carries a ``_pending_journal_recovery`` flag so a later + ``get_session()`` can re-attempt recovery without baking a permanent + "no output" claim into the transcript. + + The two are mutually exclusive; ``recovered_output`` wins if both are + set so the caller cannot accidentally re-arm retry on a successful + repair. + """ if recovered_output: - content = ( - '**Response interrupted.**\n\n' - 'The WebUI process restarted before this turn finished. ' - 'The partial output above was recovered from the run journal, ' - 'but the interrupted agent process could not continue.' - ) + content = _INTERRUPTED_RECOVERED_WORDING + elif pending_retry: + content = _INTERRUPTED_PENDING_RETRY_WORDING else: - content = ( - '**Response interrupted.**\n\n' - 'The WebUI process restarted before this turn finished. ' - 'The user message above was preserved, but no agent output was recovered.' - ) - return { + content = _INTERRUPTED_NO_OUTPUT_WORDING + marker = { 'role': 'assistant', 'content': content, 'timestamp': int(time.time()), '_error': True, 'type': 'interrupted', } + if pending_retry and not recovered_output: + marker['_pending_journal_recovery'] = True + return marker def _truncate_journal_tool_args(args, limit: int = 4) -> dict: @@ -848,9 +890,33 @@ def _find_existing_assistant_for_journal_content(session, content: str) -> int | return None -def _journal_tool_already_present(session, name: str, preview: str) -> bool: +def _journal_tool_already_present( + session, + name: str, + preview: str, + *, + stream_id: str | None = None, +) -> bool: + """Return True when an equivalent tool card already exists. + + Matching rule: + + * If the existing tool card carries ``_recovered_stream_id``, that means a + previous journal-recovery run materialized it. The retry can safely + collapse against it only when both stream ids match — otherwise a + legitimately-repeated tool (e.g. a second ``terminal: ls`` in a + different turn) would be dropped. + * If the existing tool card has no ``_recovered_stream_id`` (a live tool + card, or a tool card carried over from a core transcript that pre-dates + stream-id tagging), the legacy name+preview match still wins. This + preserves the "core transcript already has this tool, don't duplicate + it" invariant the original repair path established. + * When ``stream_id`` is omitted, the helper degrades cleanly to its + pre-fix session-wide behaviour. + """ candidate_name = str(name or '') candidate_preview = _normalize_journal_recovery_text(preview) + candidate_stream = str(stream_id) if stream_id else None for tool_call in session.tool_calls or []: if not isinstance(tool_call, dict): continue @@ -859,8 +925,17 @@ def _journal_tool_already_present(session, name: str, preview: str) -> bool: existing_preview = _normalize_journal_recovery_text( tool_call.get('preview') or tool_call.get('snippet') or '' ) - if existing_preview == candidate_preview: - return True + if existing_preview != candidate_preview: + continue + if candidate_stream is not None: + existing_stream = tool_call.get('_recovered_stream_id') + # A tool card explicitly tagged with a recovered_stream_id that + # differs from ours belongs to another retry's turn — don't let + # it pre-empt this retry. Untagged tool cards (live or carried + # over from the core transcript) still match. + if existing_stream and str(existing_stream) != candidate_stream: + continue + return True return False @@ -889,6 +964,39 @@ def _run_journal_has_visible_output(session, stream_id: str | None) -> bool: return False +def _journal_is_still_arriving(session, stream_id: str | None) -> bool: + """Return True for journals that may become visible on a later read. + + `read_run_events()` deliberately collapses missing files and empty files + into an empty event list, so the lazy retry path needs a small filesystem + visibility check to avoid burning all retry attempts while WSL2 / network + filesystems are still surfacing the journal. Non-empty journals are treated + as sealed enough for retry-budget accounting; if they contain no visible + output, the normal capped give-up path handles them. + """ + if not stream_id: + return False + try: + from api.run_journal import _run_path, latest_run_summary + + path = _run_path(session.session_id, stream_id) + summary = latest_run_summary(session.session_id, stream_id) + if summary.get('terminal'): + return False + try: + return (not path.exists()) or path.stat().st_size == 0 + except OSError: + return True + except Exception: + logger.debug( + "Session %s: failed to classify journal visibility for stream %s", + getattr(session, 'session_id', '?'), + stream_id, + exc_info=True, + ) + return False + + def _append_journaled_partial_output( session, stream_id: str | None, @@ -1003,7 +1111,9 @@ def _append_journaled_partial_output( anchor_idx = ensure_assistant_anchor(created_at) name = str(payload.get('name') or 'tool') preview = str(payload.get('preview') or '') - if dedupe_existing and _journal_tool_already_present(session, name, preview): + if dedupe_existing and _journal_tool_already_present( + session, name, preview, stream_id=stream_id, + ): current_assistant_idx = anchor_idx continue recovered_tool_calls.append({ @@ -1045,6 +1155,273 @@ def _append_journaled_partial_output( return appended_any +# ── Lazy run-journal recovery (read-side self-heal) ───────────────────────── +# +# When sidecar repair runs before the run-journal for the dead stream is +# visible on disk (page-cache loss on WSL2 9p / DrvFs, an un-fsynced journal +# tail, a slow network FS, …), `_append_journaled_partial_output` returns +# False even though the journaled events will appear on disk shortly. Without +# the helpers below the repair path baked a permanent "no agent output was +# recovered" claim into the marker, and a later session read could never +# correct it. +# +# The contract is: +# +# * Sidecar repair (`_apply_core_sync_or_error_marker`) writes a marker +# with `_pending_journal_recovery=True` whenever it could not recover +# visible output AND the stream id is known. Three retry-meta keys go +# onto the marker: `_journal_retry_stream_id`, `_journal_retry_attempts`, +# `_journal_retry_first_seen_ts`. +# * Every `get_session()` call that returns the full session checks the +# latest assistant marker; if the flag is set it re-runs +# `_append_journaled_partial_output` with `dedupe_existing=True`. On +# success the marker is promoted in place to the recovered-output +# wording, the journaled rows are reordered to sit above the marker, +# and all retry meta is stripped. If the journal is still missing or +# zero-byte, the retry is a no-op and does not consume attempt budget. +# Terminal/non-useful journals consume attempt budget and can demote +# immediately at the max-attempt cap. +# * After `_JOURNAL_RETRY_MAX_ATTEMPTS` failed retries or +# `_JOURNAL_RETRY_GIVEUP_SECONDS` of wall-clock age, the marker is +# demoted to the neutral wording ("Partial output may have been lost.") +# so users do not see "reload to retry" prompts forever. +_JOURNAL_RETRY_MAX_ATTEMPTS = 12 +_JOURNAL_RETRY_GIVEUP_SECONDS = 24 * 3600 +_JOURNAL_RETRY_LOCKS: dict[str, threading.Lock] = {} +_JOURNAL_RETRY_LOCKS_GUARD = threading.Lock() + + +def _journal_retry_lock_for_sid(sid: str) -> threading.Lock: + with _JOURNAL_RETRY_LOCKS_GUARD: + return _JOURNAL_RETRY_LOCKS.setdefault(str(sid), threading.Lock()) + + +def _build_recovery_marker_with_retry_hook( + *, recovered_output: bool, stream_id: str | None, +) -> dict: + """Build an interrupted-turn marker, arming the lazy-retry hook when + visible output was not recovered yet but a stream id is available.""" + if recovered_output: + return _interrupted_recovery_marker(recovered_output=True) + if not stream_id: + return _interrupted_recovery_marker(recovered_output=False) + marker = _interrupted_recovery_marker(pending_retry=True) + marker['_journal_retry_stream_id'] = str(stream_id) + marker['_journal_retry_attempts'] = 0 + marker['_journal_retry_first_seen_ts'] = int(time.time()) + return marker + + +def _session_has_pending_journal_retry(session) -> bool: + """Cheap short-circuit: scan from the tail until the most recent normal + assistant turn. Any `_pending_journal_recovery` flag found before then + means a retry is queued. + """ + messages = getattr(session, 'messages', None) or [] + for msg in reversed(messages): + if not isinstance(msg, dict): + continue + if msg.get('_pending_journal_recovery'): + return True + if msg.get('role') == 'assistant' and not msg.get('_error'): + # A normal assistant turn after any pending marker — nothing to + # retry above this point. + return False + return False + + +def _strip_journal_retry_meta(marker: dict) -> None: + marker.pop('_pending_journal_recovery', None) + marker.pop('_journal_retry_stream_id', None) + marker.pop('_journal_retry_attempts', None) + marker.pop('_journal_retry_first_seen_ts', None) + + +def _reorder_journal_tail_above_marker(session, marker_idx: int) -> None: + """Move `_recovered_from_run_journal=True` rows appended *after* + ``marker_idx`` to sit immediately above the marker so chronological + order is preserved (journaled output happened during the turn, marker + annotates its end). + """ + messages = session.messages + if marker_idx < 0 or marker_idx >= len(messages): + return + tail = messages[marker_idx + 1 :] + if not tail: + return + journaled = [ + m for m in tail + if isinstance(m, dict) and m.get('_recovered_from_run_journal') + ] + if not journaled: + return + rest = [ + m for m in tail + if not (isinstance(m, dict) and m.get('_recovered_from_run_journal')) + ] + marker = messages[marker_idx] + new_messages = ( + messages[:marker_idx] + + journaled + + [marker] + + rest + ) + # Rebase any tool_calls.assistant_msg_idx values that pointed into the + # journaled rows when they were appended at the tail. + old_journaled_idx_base = marker_idx + 1 + new_journaled_idx_base = marker_idx + shift = new_journaled_idx_base - old_journaled_idx_base # = -1 + for tool_call in session.tool_calls or []: + if not isinstance(tool_call, dict): + continue + idx = tool_call.get('assistant_msg_idx') + if isinstance(idx, int) and idx >= old_journaled_idx_base \ + and idx < old_journaled_idx_base + len(journaled): + tool_call['assistant_msg_idx'] = idx + shift + session.messages = new_messages + + +def _try_retry_journal_recovery_in_place(session) -> bool: + sid = str(getattr(session, 'session_id', '') or '') + lock = _journal_retry_lock_for_sid(sid) + if not lock.acquire(blocking=False): + logger.debug("lazy journal-retry already running for session %s", sid) + return False + try: + return _retry_journal_recovery_in_place( + session, preserve_arriving_budget=True, + ) + finally: + lock.release() + with _JOURNAL_RETRY_LOCKS_GUARD: + if _JOURNAL_RETRY_LOCKS.get(sid) is lock: + _JOURNAL_RETRY_LOCKS.pop(sid, None) + + +def _retry_journal_recovery_in_place( + session, + *, + preserve_arriving_budget: bool = False, +) -> bool: + """Re-attempt run-journal recovery for the most recent pending marker. + + Returns True if the marker was promoted to the recovered-output wording. + Never raises — caller is best-effort. + """ + try: + messages = session.messages or [] + for idx in range(len(messages) - 1, -1, -1): + msg = messages[idx] + if not isinstance(msg, dict): + continue + if msg.get('role') == 'assistant' and not msg.get('_error') \ + and not msg.get('_pending_journal_recovery'): + # Walked past the pending marker without finding it. + return False + if not ( + msg.get('type') == 'interrupted' + and msg.get('_pending_journal_recovery') + ): + continue + stream_id = msg.get('_journal_retry_stream_id') + first_seen = msg.get('_journal_retry_first_seen_ts') or 0 + attempts = int(msg.get('_journal_retry_attempts') or 0) + now = time.time() + give_up = ( + attempts >= _JOURNAL_RETRY_MAX_ATTEMPTS + or ( + first_seen + and now - float(first_seen) > _JOURNAL_RETRY_GIVEUP_SECONDS + ) + ) + if not stream_id: + # No stream id to retry against; demote immediately. + msg['content'] = _INTERRUPTED_NEUTRAL_WORDING + _strip_journal_retry_meta(msg) + try: + session.save(touch_updated_at=False) + except Exception: + logger.debug( + "save() failed while demoting marker for session %s", + getattr(session, 'session_id', '?'), + exc_info=True, + ) + return False + if give_up: + msg['content'] = _INTERRUPTED_NEUTRAL_WORDING + _strip_journal_retry_meta(msg) + try: + session.save(touch_updated_at=False) + except Exception: + logger.debug( + "save() failed while demoting marker for session %s", + getattr(session, 'session_id', '?'), + exc_info=True, + ) + return False + tail_len_before = len(session.messages) + ok = _append_journaled_partial_output( + session, stream_id, dedupe_existing=True, + ) + if ok: + msg['content'] = _INTERRUPTED_RECOVERED_WORDING + _strip_journal_retry_meta(msg) + # The journaled rows were appended at the end of messages; + # only the rows past the previous tail count as "newly + # journaled" and need to move above the marker. + _ = tail_len_before # informational; helper below scans + _reorder_journal_tail_above_marker(session, idx) + try: + session.save(touch_updated_at=False) + except Exception: + logger.debug( + "save() failed while promoting marker for session %s", + getattr(session, 'session_id', '?'), + exc_info=True, + ) + logger.info( + "Session %s: lazy journal-recovery promoted marker for " + "stream %s after %d attempts", + getattr(session, 'session_id', '?'), + stream_id, + attempts, + ) + return True + if ( + preserve_arriving_budget + and _journal_is_still_arriving(session, stream_id) + ): + logger.debug( + "Session %s: journal for stream %s still arriving; " + "preserving retry budget", + getattr(session, 'session_id', '?'), + stream_id, + ) + return False + next_attempts = attempts + 1 + if next_attempts >= _JOURNAL_RETRY_MAX_ATTEMPTS: + msg['content'] = _INTERRUPTED_NEUTRAL_WORDING + _strip_journal_retry_meta(msg) + else: + msg['_journal_retry_attempts'] = next_attempts + try: + session.save(touch_updated_at=False) + except Exception: + logger.debug( + "save() failed while updating retry counter for session %s", + getattr(session, 'session_id', '?'), + exc_info=True, + ) + return False + return False + except Exception: + logger.exception( + "_retry_journal_recovery_in_place failed for session %s", + getattr(session, 'session_id', '?'), + ) + return False + + def _apply_core_sync_or_error_marker( session, core_path, @@ -1112,11 +1489,16 @@ def _apply_core_sync_or_error_marker( session, stream_id_for_recheck or session.active_stream_id, ) + _stream_id = stream_id_for_recheck or session.active_stream_id session.active_stream_id = None session.pending_user_message = None session.pending_attachments = [] session.pending_started_at = None - session.messages.append(_interrupted_recovery_marker(recovered_output=recovered_output)) + session.messages.append( + _build_recovery_marker_with_retry_hook( + recovered_output=recovered_output, stream_id=_stream_id, + ) + ) session.save(touch_updated_at=touch_updated_at) logger.info( "Session %s: recovered pending user turn (messages non-empty), added error marker", @@ -1167,6 +1549,17 @@ def _apply_core_sync_or_error_marker( session.messages.append( _interrupted_recovery_marker(recovered_output=True) ) + # NOTE: when the core transcript was synced in but the run journal + # is not yet visible, intentionally do NOT append a lazy-retry + # marker here. In this branch the canonical history is the core + # transcript itself (which has already been written to s.messages + # above) and the marker is purely advisory — the existing contract + # is "marker only when there is a recovered partial turn to + # annotate". Adding a pending-retry marker on every empty-journal + # core-sync would surface a spurious "reload to retry" banner on + # sessions whose journal is legitimately absent (e.g. archived + # streams). The first and third branches handle the lost-response + # case where the marker is the only signal the user gets. session.save(touch_updated_at=touch_updated_at) logger.info( "Session %s: synced %d messages from core transcript%s", @@ -1189,11 +1582,16 @@ def _apply_core_sync_or_error_marker( session, stream_id_for_recheck or session.active_stream_id, ) + _stream_id = stream_id_for_recheck or session.active_stream_id session.active_stream_id = None session.pending_user_message = None session.pending_attachments = [] session.pending_started_at = None - session.messages.append(_interrupted_recovery_marker(recovered_output=recovered_output)) + session.messages.append( + _build_recovery_marker_with_retry_hook( + recovered_output=recovered_output, stream_id=_stream_id, + ) + ) session.save(touch_updated_at=touch_updated_at) logger.info("Session %s: no core transcript found, added error marker", sid) return True @@ -1313,9 +1711,19 @@ def get_session(sid, metadata_only=False): actual message history (e.g., for fast sidebar switching). """ with LOCK: - if sid in SESSIONS: + cached = SESSIONS.get(sid) + if cached is not None: SESSIONS.move_to_end(sid) # LRU: mark as recently used - return SESSIONS[sid] + if cached is not None: + if not metadata_only and _session_has_pending_journal_retry(cached): + try: + _try_retry_journal_recovery_in_place(cached) + except Exception: + logger.debug( + "lazy journal-retry failed on cache hit for session %s", + sid, exc_info=True, + ) + return cached if metadata_only: s = Session.load_metadata_only(sid) if s: @@ -1331,6 +1739,18 @@ def get_session(sid, metadata_only=False): if not metadata_only: try: repaired = _repair_stale_pending(s) + # If the stale-pending repair did not fire but the session + # already carries a pending-journal-retry marker (e.g. set on + # a previous repair pass), give the lazy-retry path one + # chance to self-heal on this read. + if not repaired and _session_has_pending_journal_retry(s): + try: + _try_retry_journal_recovery_in_place(s) + except Exception: + logger.debug( + "lazy journal-retry failed on cold load for session %s", + sid, exc_info=True, + ) # If repair had to bail because the per-session lock was held, # do not pin the still-stale sidecar in the LRU cache forever. # Leaving it cached would prevent future get_session() calls from diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index 3617aeba..78f5f4ec 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -85,6 +85,34 @@ If after running steps 1-4 the import still fails *and* `pip install -e .` succe --- +## "Response interrupted." marker keeps saying "no agent output was recovered" + +**Symptom.** After the WebUI process restarts mid-turn (manual restart, OOM, crash, …), the affected chat shows an `**Response interrupted.**` marker with the wording *"The user message above was preserved, but no agent output was recovered."*, even though the run-journal for that turn is present on disk and contains the partial tokens the agent had already streamed. + +**Why.** Sidecar repair re-checks the run-journal at restart and uses the result as a one-shot signal. On WSL2 (9p / DrvFs) and on some network-backed setups, the run-journal `.jsonl` is written by the dead worker but the WebUI process reads it through a page-cache state that has not yet seen those writes — recovery returns "empty" and the marker is baked permanently. The fix introduces a *lazy* retry path: when sidecar repair cannot read visible output but knows the stream id, it stores a `_pending_journal_recovery` flag on the marker and re-attempts recovery from `get_session()` until the journal becomes readable (or the retry budget is exhausted). + +**Diagnostic.** + +The on-disk locations below assume the default `~/.hermes/webui` state directory. If you override it via `HERMES_WEBUI_STATE_DIR`, substitute that path for `~/.hermes/webui` in every step. + +1. Identify the affected session id and stream id from the marker. The marker JSON lives at `~/.hermes/webui/sessions/.json`; after the fix it shows them on the `_journal_retry_stream_id` key. Pre-fix sessions only carry the legacy wording, with no retry meta. +2. Check whether the run-journal contains real events: + ```bash + ls -la ~/.hermes/webui/sessions/_run_journal//.jsonl + head -2 ~/.hermes/webui/sessions/_run_journal//.jsonl + ``` + If the file exists and contains `token` / `tool` events, the lazy-retry path will pick them up the next time the session is opened. + +**Fix.** Reload the session in the browser. On the next `get_session()` call the marker is re-evaluated; if the journaled events are visible on disk the marker promotes to *"The partial output above was recovered from the run journal …"* wording and the journaled assistant text + tool cards land above the marker in chronological order. No manual sidecar editing is required. + +**Trigger.** Sidebar metadata polling is intentionally not enough to run this self-heal. Requests such as `/api/session?messages=0&resolve_model=0` load the session with `metadata_only=True`, skip the full messages array, and therefore skip the lazy journal retry helper. Click/open the affected conversation so the message panel performs a full `messages=1` load; that full render is what re-checks the journal and can promote the marker. + +**Caps.** The lazy retry path gives up after 12 failed attempts or 24h of wall-clock age, at which point the marker is demoted to a neutral *"Partial output may have been lost."* wording so the "reload to retry" prompt doesn't linger forever for genuinely lost journals. + +**When to file a bug.** If, after the fix, you see the lazy-retry wording (*"Recovering the partial output from the run journal — reload this session to retry."*) but reloading the session never promotes it to the recovered wording even though the `.jsonl` clearly contains `token` events, capture the marker JSON and the run-journal file and file a bug. + +--- + ## Other troubleshooting This document grows over time. If a recurring failure mode isn't covered here yet, add it via PR. The format for each entry: **Symptom → Why → Diagnostic commands → Fix → When to file a bug**. diff --git a/tests/test_session_lost_response_regression.py b/tests/test_session_lost_response_regression.py new file mode 100644 index 00000000..4b85ebb9 --- /dev/null +++ b/tests/test_session_lost_response_regression.py @@ -0,0 +1,339 @@ +"""Regression: lazy-retry run-journal recovery across multiple session reads. + +The scenario this test pins down: + +1. A WebUI process restarts mid-stream. On the first sidecar repair attempt + the run-journal for the dead stream is NOT visible yet (page-cache loss, + un-fsynced writes, slow network FS, etc.) so + `_append_journaled_partial_output` returns False. +2. Pre-fix the repair path baked a permanent "no agent output was recovered" + marker into the session and never looked at the journal again — even + after the journaled tokens appeared on disk on a later read. +3. With the fix, the repair instead leaves a `_pending_journal_recovery` + flag on the marker; the next `get_session()` call lazily re-runs the + recovery, promotes the marker wording, and threads the journaled + assistant text/tools into the transcript in the correct chronological + position. +""" +from concurrent.futures import ThreadPoolExecutor +import threading + +import pytest + +import api.models as models +import api.config as config +import api.profiles as profiles +import api.streaming as streaming # noqa: F401 imported for fixture parity +from api.models import ( + Session, + _apply_core_sync_or_error_marker, +) +from api.run_journal import append_run_event + + +# ── Fixtures (shape mirrors test_session_sidecar_repair.py) ──────────────── + + +@pytest.fixture(autouse=True) +def _isolate_session_dir(tmp_path, monkeypatch): + session_dir = tmp_path / "sessions" + session_dir.mkdir() + index_file = session_dir / "_index.json" + monkeypatch.setattr(models, "SESSION_DIR", session_dir) + monkeypatch.setattr(models, "SESSION_INDEX_FILE", index_file) + models.SESSIONS.clear() + yield session_dir, index_file + models.SESSIONS.clear() + + +@pytest.fixture(autouse=True) +def _isolate_stream_state(): + config.STREAMS.clear() + config.CANCEL_FLAGS.clear() + config.AGENT_INSTANCES.clear() + config.STREAM_PARTIAL_TEXT.clear() + yield + config.STREAMS.clear() + config.CANCEL_FLAGS.clear() + config.AGENT_INSTANCES.clear() + config.STREAM_PARTIAL_TEXT.clear() + + +@pytest.fixture(autouse=True) +def _isolate_agent_locks(): + config.SESSION_AGENT_LOCKS.clear() + models._JOURNAL_RETRY_LOCKS.clear() + yield + config.SESSION_AGENT_LOCKS.clear() + models._JOURNAL_RETRY_LOCKS.clear() + + +@pytest.fixture() +def hermes_home(tmp_path, monkeypatch): + home = tmp_path / "hermes_home" + home.mkdir() + (home / "sessions").mkdir() + monkeypatch.setenv("HERMES_HOME", str(home)) + monkeypatch.setattr(profiles, "_DEFAULT_HERMES_HOME", home) + return home + + +def _make_dead_stream_session( + session_id: str, + *, + stream_id: str, + existing_msgs_count: int = 96, + pending_text: str = ( + "[IMPORTANT: Background process polling. " + "Continue the user's prior request.]" + ), +): + """Build a session that mirrors the production bug: lots of prior history, + pending_user_message set, an active_stream_id pointing at a dead stream, + and pending_started_at populated.""" + messages = [] + for i in range(existing_msgs_count // 2): + messages.append({"role": "user", "content": f"q{i}"}) + messages.append({"role": "assistant", "content": f"a{i}"}) + s = Session(session_id=session_id, title="Lost-response repro", messages=messages) + s.pending_user_message = pending_text + s.pending_started_at = 1779237637 # production-shaped value + s.active_stream_id = stream_id + return s + + +def _make_pending_retry_session(session_id: str, *, stream_id: str): + s = Session(session_id=session_id, title="Pending retry", messages=[ + {"role": "user", "content": "q", "timestamp": 1}, + { + "role": "assistant", + "content": models._INTERRUPTED_PENDING_RETRY_WORDING, + "timestamp": 2, + "_error": True, + "type": "interrupted", + "_pending_journal_recovery": True, + "_journal_retry_stream_id": stream_id, + "_journal_retry_attempts": 0, + "_journal_retry_first_seen_ts": int(models.time.time()), + }, + ]) + s.save() + return s + + +def _assert_retry_meta_removed(marker): + assert "_pending_journal_recovery" not in marker + assert "_journal_retry_stream_id" not in marker + assert "_journal_retry_attempts" not in marker + assert "_journal_retry_first_seen_ts" not in marker + + +# ── The regression test ──────────────────────────────────────────────────── + + +def test_lost_response_recovered_on_second_read(hermes_home): + sid = "9f14583f0e4e4444aaaa111122223333" + stream_id = "7c8b4108d52b4aba9af362d3a54f47ac" + + # ── Stage 1: simulate page-cache loss — sidecar repair runs while the + # run-journal for this stream is empty/absent on disk. + s = _make_dead_stream_session(sid, stream_id=stream_id) + s.save() + core_path = hermes_home / "sessions" / f"session_{sid}.json" + + result = _apply_core_sync_or_error_marker( + s, core_path, stream_id_for_recheck=stream_id, + ) + assert result is True + + # Marker should carry the lazy-retry flag and *not* the permanent + # "no agent output was recovered" wording yet. + last = s.messages[-1] + assert last.get("_error") is True + assert last.get("type") == "interrupted" + assert last.get("_pending_journal_recovery") is True, ( + "First repair pass should defer the recovery decision via a " + "_pending_journal_recovery flag so a later read can self-heal." + ) + assert last.get("_journal_retry_stream_id") == stream_id + assert last.get("_journal_retry_attempts") == 0 + assert isinstance(last.get("_journal_retry_first_seen_ts"), int) + assert "no agent output was recovered" not in last["content"] + # pending fields cleared regardless of journal visibility + assert s.pending_user_message is None + assert s.active_stream_id is None + assert s.pending_started_at is None + + # ── Stage 2: the journaled events become visible on disk. + append_run_event(sid, stream_id, "token", {"text": "Checking GitHub first."}) + append_run_event( + sid, + stream_id, + "tool", + { + "name": "terminal", + "preview": "gh pr list --repo nesquena/hermes-webui", + "args": {"command": "gh pr list --repo nesquena/hermes-webui"}, + }, + ) + append_run_event( + sid, + stream_id, + "tool_complete", + {"name": "terminal", "duration": 1.2, "is_error": False}, + ) + append_run_event( + sid, stream_id, "token", {"text": " The first PR scan completed."}, + ) + + # Pin session into the LRU cache and call get_session — this is the + # production path that triggers lazy retry. + models.SESSIONS[sid] = s + reloaded = models.get_session(sid) + assert reloaded is s + + contents = [m.get("content", "") for m in s.messages] + # The marker self-healed: + assert any("recovered from the run journal" in c for c in contents), ( + "After journaled tokens become readable, the marker must promote to " + "the recovered-output wording." + ) + assert not any("no agent output was recovered" in c for c in contents) + + # The journaled assistant text and tool card landed BEFORE the marker + # so chronological order in the transcript is preserved. + marker_idx = next( + i for i, m in enumerate(s.messages) + if m.get("type") == "interrupted" and m.get("_error") + ) + recovered_msgs = [ + m for m in s.messages[:marker_idx] + if m.get("_recovered_from_run_journal") is True + ] + assert recovered_msgs, "recovered assistant content must sit above the marker" + recovered_text = " ".join(m.get("content", "") for m in recovered_msgs) + assert "Checking GitHub first." in recovered_text + assert "first PR scan completed" in recovered_text + + # Tool card lives in session.tool_calls and points at one of the + # recovered assistant indices. + assert s.tool_calls, "journaled tool should be materialized" + assert s.tool_calls[-1]["name"] == "terminal" + assert s.tool_calls[-1]["done"] is True + + # Flag and meta cleaned up after promotion. + promoted = s.messages[marker_idx] + assert "_pending_journal_recovery" not in promoted + assert "_journal_retry_stream_id" not in promoted + assert "_journal_retry_attempts" not in promoted + assert "_journal_retry_first_seen_ts" not in promoted + + +def test_concurrent_get_session_serializes_lazy_journal_retry(hermes_home, monkeypatch): + sid = "retry_lock_sid" + stream_id = "retry_lock_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + models.SESSIONS[sid] = s + + entered = threading.Event() + release = threading.Event() + counter_lock = threading.Lock() + calls = 0 + + def slow_retry(session, *, preserve_arriving_budget=False): + nonlocal calls + with counter_lock: + calls += 1 + entered.set() + assert release.wait(timeout=2), "test timed out waiting to release retry body" + return False + + monkeypatch.setattr(models, "_retry_journal_recovery_in_place", slow_retry) + + with ThreadPoolExecutor(max_workers=2) as executor: + first = executor.submit(models.get_session, sid) + assert entered.wait(timeout=2), "first caller did not enter retry helper" + second = executor.submit(models.get_session, sid) + assert second.result(timeout=2) is s + release.set() + assert first.result(timeout=2) is s + + assert calls == 1 + + +def test_still_arriving_journal_does_not_consume_retry_budget(hermes_home, monkeypatch): + sid = "retry_arriving_sid" + stream_id = "retry_arriving_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + models.SESSIONS[sid] = s + + monkeypatch.setattr(models, "_append_journaled_partial_output", lambda *a, **kw: False) + monkeypatch.setattr(models, "_journal_is_still_arriving", lambda *a, **kw: True) + + for _ in range(20): + assert models.get_session(sid) is s + + marker = s.messages[-1] + assert marker["_journal_retry_attempts"] == 0 + assert marker["_pending_journal_recovery"] is True + assert marker["content"] == models._INTERRUPTED_PENDING_RETRY_WORDING + + +def test_sealed_empty_journal_consumes_retry_budget_and_demotes_at_max(hermes_home, monkeypatch): + sid = "retry_sealed_sid" + stream_id = "retry_sealed_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + s.messages[-1]["_journal_retry_attempts"] = models._JOURNAL_RETRY_MAX_ATTEMPTS - 1 + append_run_event(sid, stream_id, "stream_end", {}) + models.SESSIONS[sid] = s + + assert models.get_session(sid) is s + + marker = s.messages[-1] + assert marker["content"] == models._INTERRUPTED_NEUTRAL_WORDING + _assert_retry_meta_removed(marker) + assert not any(m.get("_recovered_from_run_journal") for m in s.messages) + + +def test_marker_demotes_after_max_attempts_with_sealed_empty_journal(hermes_home, monkeypatch): + sid = "retry_max_sid" + stream_id = "retry_max_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + s.messages[-1]["_journal_retry_attempts"] = models._JOURNAL_RETRY_MAX_ATTEMPTS - 1 + append_run_event(sid, stream_id, "stream_end", {}) + models.SESSIONS[sid] = s + + assert models.get_session(sid) is s + + marker = s.messages[-1] + assert marker["content"] == models._INTERRUPTED_NEUTRAL_WORDING + _assert_retry_meta_removed(marker) + assert not any(m.get("_recovered_from_run_journal") for m in s.messages) + + +def test_marker_demotes_after_giveup_seconds(hermes_home, monkeypatch): + base = 1_779_000_000 + monkeypatch.setattr(models.time, "time", lambda: base) + sid = "retry_age_sid" + stream_id = "retry_age_stream" + s = _make_pending_retry_session(sid, stream_id=stream_id) + s.messages[-1]["_journal_retry_first_seen_ts"] = ( + base - models._JOURNAL_RETRY_GIVEUP_SECONDS - 1 + ) + models.SESSIONS[sid] = s + + append_calls = 0 + + def append_should_not_run(*args, **kwargs): + nonlocal append_calls + append_calls += 1 + return False + + monkeypatch.setattr(models, "_append_journaled_partial_output", append_should_not_run) + + assert models.get_session(sid) is s + + marker = s.messages[-1] + assert marker["content"] == models._INTERRUPTED_NEUTRAL_WORDING + _assert_retry_meta_removed(marker) + assert append_calls == 0 diff --git a/tests/test_session_sidecar_repair.py b/tests/test_session_sidecar_repair.py index ca2bda45..369a0309 100644 --- a/tests/test_session_sidecar_repair.py +++ b/tests/test_session_sidecar_repair.py @@ -313,7 +313,15 @@ class TestDraftRecovery: ) assert "Response interrupted" in content assert "WebUI process restarted" in content - assert "user message above was preserved" in content + # The marker now arms the lazy-retry hook when a stream id is known + # ("Recovering the partial output… reload to retry."). The legacy + # "user message above was preserved" wording is reserved for the + # no-stream-id repair case; the post-retry-give-up case demotes to + # the neutral "Partial output may have been lost." wording instead. + assert ( + "user message above was preserved" in content + or "Recovering the partial output" in content + ) assert error_msgs[0].get("type") == "interrupted" def test_pending_attachments_recovered(self, hermes_home, monkeypatch): @@ -705,7 +713,14 @@ class TestNonEmptyMessagesPendingCleared: assert not any("private scratchpad text" in c for c in contents) error_msgs = [m for m in s.messages if m.get("_error")] assert len(error_msgs) == 1 - assert "no agent output was recovered" in error_msgs[0]["content"] + # Reasoning-only events do not count as visible output, so the marker + # arms the lazy-retry hook (stream id is known, journal may have more + # events appear on a later read). The legacy "no agent output was + # recovered" wording is now reserved for the no-stream-id case. + assert error_msgs[0].get("_pending_journal_recovery") is True + assert error_msgs[0].get("_journal_retry_stream_id") == "reasoning_only_stream" + assert "no agent output was recovered" not in error_msgs[0]["content"] + assert "Recovering the partial output" in error_msgs[0]["content"] def test_journal_recovery_keeps_consecutive_tools_on_one_anchor(self, hermes_home, monkeypatch): """Consecutive journaled tools without an intervening visible update @@ -1153,3 +1168,536 @@ class TestCoreSyncMetadata: assert len(user_msgs) == 1 assert user_msgs[0]["content"] == "My question" assert user_msgs[0].get("_recovered") is True + + +# ── Lazy run-journal recovery (read-side self-heal) ───────────────────────── + +class TestInterruptedRecoveryMarker: + """Pure-function tests for _interrupted_recovery_marker(pending_retry=…).""" + + def test_marker_recovered_output_excludes_pending_retry_flag(self): + marker = models._interrupted_recovery_marker(recovered_output=True) + assert marker["_error"] is True + assert marker["type"] == "interrupted" + assert "_pending_journal_recovery" not in marker + assert "recovered from the run journal" in marker["content"] + + def test_marker_pending_retry_sets_flag_and_wording(self): + marker = models._interrupted_recovery_marker(pending_retry=True) + assert marker.get("_pending_journal_recovery") is True + assert "Recovering the partial output" in marker["content"] + assert "no agent output was recovered" not in marker["content"] + + def test_marker_recovered_output_beats_pending_retry(self): + marker = models._interrupted_recovery_marker( + recovered_output=True, pending_retry=True, + ) + assert "_pending_journal_recovery" not in marker + assert "recovered from the run journal" in marker["content"] + + def test_marker_default_wording_unchanged_for_no_output_no_retry(self): + marker = models._interrupted_recovery_marker() + assert "_pending_journal_recovery" not in marker + assert "no agent output was recovered" in marker["content"] + + +class TestRetryJournalRecoveryInPlace: + """In-place retry helper: promote / increment / give up.""" + + def _make_pending_session(self, hermes_home, stream_id="lazy_stream", + attempts=0, first_seen_ts=None): + s = _make_session(messages=[ + {"role": "user", "content": "earlier turn"}, + {"role": "assistant", "content": "earlier reply"}, + {"role": "user", "content": "Stuck draft", "_recovered": True}, + ]) + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = stream_id + marker["_journal_retry_attempts"] = attempts + marker["_journal_retry_first_seen_ts"] = ( + first_seen_ts if first_seen_ts is not None else int(time.time()) + ) + s.messages.append(marker) + s.save() + return s + + def test_promotes_marker_when_journal_now_available(self, hermes_home, monkeypatch): + stream_id = "lazy_stream_promote" + s = self._make_pending_session(hermes_home, stream_id=stream_id) + marker_before = s.messages[-1] + + append_run_event(s.session_id, stream_id, "token", {"text": "Late tokens."}) + append_run_event( + s.session_id, stream_id, "tool", + {"name": "terminal", "preview": "echo", "args": {"cmd": "echo"}}, + ) + + ok = models._retry_journal_recovery_in_place(s) + assert ok is True + + # Marker promoted in place and meta stripped + marker_idx = next( + i for i, m in enumerate(s.messages) + if m.get("type") == "interrupted" and m.get("_error") + ) + promoted = s.messages[marker_idx] + assert promoted is marker_before + assert "recovered from the run journal" in promoted["content"] + assert "_pending_journal_recovery" not in promoted + assert "_journal_retry_stream_id" not in promoted + assert "_journal_retry_attempts" not in promoted + assert "_journal_retry_first_seen_ts" not in promoted + + # Journaled rows reordered ABOVE the marker, preserving order + before_marker = s.messages[:marker_idx] + recovered = [m for m in before_marker if m.get("_recovered_from_run_journal")] + assert recovered, "journaled assistant rows must sit above the marker" + assert any("Late tokens." in m.get("content", "") for m in recovered) + # tool_call.assistant_msg_idx still points into the valid range + for tc in s.tool_calls or []: + idx = tc.get("assistant_msg_idx") + if isinstance(idx, int): + assert 0 <= idx < len(s.messages) + + def test_increments_attempts_when_journal_still_empty(self, hermes_home, monkeypatch): + stream_id = "lazy_stream_increment" + s = self._make_pending_session(hermes_home, stream_id=stream_id) + # No append_run_event — journal stays empty. + ok = models._retry_journal_recovery_in_place(s) + assert ok is False + marker = s.messages[-1] + assert marker.get("_pending_journal_recovery") is True + assert marker.get("_journal_retry_attempts") == 1 + assert "Recovering the partial output" in marker["content"] + + def test_demotes_to_neutral_after_max_attempts(self, hermes_home, monkeypatch): + stream_id = "lazy_stream_giveup_attempts" + s = self._make_pending_session( + hermes_home, stream_id=stream_id, + attempts=models._JOURNAL_RETRY_MAX_ATTEMPTS, + ) + ok = models._retry_journal_recovery_in_place(s) + assert ok is False + marker = s.messages[-1] + assert "_pending_journal_recovery" not in marker + assert "_journal_retry_stream_id" not in marker + assert "_journal_retry_attempts" not in marker + assert "_journal_retry_first_seen_ts" not in marker + assert "Partial output may have been lost" in marker["content"] + assert "Recovering the partial output" not in marker["content"] + + def test_demotes_to_neutral_after_giveup_seconds(self, hermes_home, monkeypatch): + stream_id = "lazy_stream_giveup_age" + first_seen = int(time.time()) - (models._JOURNAL_RETRY_GIVEUP_SECONDS + 60) + s = self._make_pending_session( + hermes_home, stream_id=stream_id, first_seen_ts=first_seen, + ) + ok = models._retry_journal_recovery_in_place(s) + assert ok is False + marker = s.messages[-1] + assert "_pending_journal_recovery" not in marker + assert "Partial output may have been lost" in marker["content"] + + def test_noop_when_no_pending_marker(self, hermes_home, monkeypatch): + s = _make_session(messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + ]) + s.save() + spy = [] + original = models._append_journaled_partial_output + + def _spy(*a, **kw): + spy.append(1) + return original(*a, **kw) + + monkeypatch.setattr(models, "_append_journaled_partial_output", _spy) + ok = models._retry_journal_recovery_in_place(s) + assert ok is False + assert spy == [], "no pending marker → must not call recovery" + + def test_short_circuit_helper_detects_pending_marker(self, hermes_home, monkeypatch): + s = _make_session(messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + ]) + assert models._session_has_pending_journal_retry(s) is False + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = "abc" + marker["_journal_retry_attempts"] = 0 + marker["_journal_retry_first_seen_ts"] = int(time.time()) + s.messages.append(marker) + assert models._session_has_pending_journal_retry(s) is True + + def test_short_circuit_helper_stops_at_normal_assistant(self, hermes_home, monkeypatch): + s = _make_session(messages=[ + {"role": "user", "content": "x"}, + ]) + # An old, already-promoted marker followed by a normal assistant turn — + # the pending flag belongs to a prior turn that was healed; helper must + # not loop back into it. + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = "abc" + marker["_journal_retry_attempts"] = 0 + marker["_journal_retry_first_seen_ts"] = int(time.time()) + s.messages.append(marker) + s.messages.append({"role": "user", "content": "later"}) + s.messages.append({"role": "assistant", "content": "later reply"}) + assert models._session_has_pending_journal_retry(s) is False + + +class TestGetSessionLazyRetryHook: + """get_session() must trigger _retry_journal_recovery_in_place on both + cache-hit and cold-load paths when a pending marker exists, and skip + quickly when nothing is pending.""" + + def _make_session_with_pending_marker(self, sid="lazy_get", stream_id="st"): + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "u"}, + {"role": "assistant", "content": "a"}, + {"role": "user", "content": "u2", "_recovered": True}, + ]) + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = stream_id + marker["_journal_retry_attempts"] = 0 + marker["_journal_retry_first_seen_ts"] = int(time.time()) + s.messages.append(marker) + return s + + def test_triggers_retry_on_cache_hit(self, hermes_home, monkeypatch): + sid = "lazy_get_cache" + stream_id = "stream_cache" + s = self._make_session_with_pending_marker(sid=sid, stream_id=stream_id) + s.save() + models.SESSIONS[sid] = s + append_run_event(sid, stream_id, "token", {"text": "Late."}) + + reloaded = models.get_session(sid) + assert reloaded is s + marker = s.messages[-1] if "Recovering" in s.messages[-1]["content"] else next( + m for m in s.messages + if m.get("type") == "interrupted" and m.get("_error") + ) + assert "recovered from the run journal" in marker["content"] + assert "_pending_journal_recovery" not in marker + + def test_triggers_retry_on_cold_load(self, hermes_home, monkeypatch): + sid = "lazy_get_cold" + stream_id = "stream_cold" + s = self._make_session_with_pending_marker(sid=sid, stream_id=stream_id) + s.save() + models.SESSIONS.pop(sid, None) + append_run_event(sid, stream_id, "token", {"text": "Late."}) + + reloaded = models.get_session(sid) + marker = next( + m for m in reloaded.messages + if m.get("type") == "interrupted" and m.get("_error") + ) + assert "recovered from the run journal" in marker["content"] + assert "_pending_journal_recovery" not in marker + + def test_short_circuit_when_no_pending_marker(self, hermes_home, monkeypatch): + sid = "lazy_get_no_pending" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + ]) + s.save() + models.SESSIONS[sid] = s + spy = [] + monkeypatch.setattr( + models, "_retry_journal_recovery_in_place", + lambda session: spy.append(1) or False, + ) + models.get_session(sid) + assert spy == [] + + def test_metadata_only_skips_retry(self, hermes_home, monkeypatch): + sid = "lazy_get_meta" + stream_id = "stream_meta" + s = self._make_session_with_pending_marker(sid=sid, stream_id=stream_id) + s.save() + models.SESSIONS[sid] = s + spy = [] + monkeypatch.setattr( + models, "_retry_journal_recovery_in_place", + lambda session: spy.append(1) or False, + ) + models.get_session(sid, metadata_only=True) + assert spy == [], "metadata_only must skip the lazy-retry helper" + + +class TestLazyRetryBackwardsCompat: + """Pre-fix session shapes must continue to work.""" + + def test_legacy_marker_without_flag_unchanged(self, hermes_home, monkeypatch): + """An old session whose marker carries the legacy 'no agent output' + wording (no flag) must not be touched by get_session().""" + sid = "legacy_marker_sid" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + {"role": "user", "content": "later", "_recovered": True}, + ]) + legacy = models._interrupted_recovery_marker(recovered_output=False) + s.messages.append(legacy) + s.save() + models.SESSIONS.pop(sid, None) + spy = [] + original = models._append_journaled_partial_output + monkeypatch.setattr( + models, "_append_journaled_partial_output", + lambda *a, **kw: spy.append(1) or original(*a, **kw), + ) + models.get_session(sid) + assert spy == [], "legacy marker (no flag) must not re-trigger recovery" + + def test_pending_retry_marker_round_trips_through_session_save_and_load( + self, hermes_home, monkeypatch): + """All four retry meta keys must survive Session.save() / Session.load().""" + sid = "round_trip_sid" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + ]) + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = "abc" + marker["_journal_retry_attempts"] = 3 + marker["_journal_retry_first_seen_ts"] = 1779200000 + s.messages.append(marker) + s.save() + models.SESSIONS.pop(sid, None) + + reloaded = Session.load(sid) + last = reloaded.messages[-1] + assert last.get("_pending_journal_recovery") is True + assert last.get("_journal_retry_stream_id") == "abc" + assert last.get("_journal_retry_attempts") == 3 + assert last.get("_journal_retry_first_seen_ts") == 1779200000 + + +class TestJournalToolDedupeScoping: + """`_journal_tool_already_present` must only collapse against tool cards + recovered from the same stream — a repeated tool (e.g. ``terminal: ls``) + in a previous turn must NOT pre-empt this turn's recovery.""" + + def test_repeated_tool_in_earlier_turn_does_not_block_recovery(self, hermes_home, monkeypatch): + sid = "dedupe_scope_sid" + stream_id = "dedupe_scope_stream" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "earlier turn"}, + {"role": "assistant", "content": "earlier reply"}, + {"role": "user", "content": "later turn", "_recovered": True}, + ]) + # Pre-existing tool card from an earlier turn with same (name, preview) + # but a different recovered-stream-id. The retry must NOT see this as a + # hit when dedupe_existing is asked to scope to ``stream_id``. + s.tool_calls = [ + { + "name": "terminal", + "preview": "ls", + "snippet": "ls", + "tid": "old-1", + "_recovered_from_run_journal": True, + "_recovered_stream_id": "earlier_stream", + "done": True, + } + ] + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = stream_id + marker["_journal_retry_attempts"] = 0 + marker["_journal_retry_first_seen_ts"] = int(time.time()) + s.messages.append(marker) + s.save() + append_run_event(sid, stream_id, "token", {"text": "Listing."}) + append_run_event( + sid, stream_id, "tool", + {"name": "terminal", "preview": "ls", "args": {"cmd": "ls"}}, + ) + + ok = models._retry_journal_recovery_in_place(s) + assert ok is True + + # Two tool cards now: the old one untouched, plus a new one for this + # stream. If dedupe were session-wide the new one would be dropped. + scoped = [ + tc for tc in s.tool_calls + if tc.get("_recovered_stream_id") == stream_id + ] + assert len(scoped) == 1 + assert scoped[0]["name"] == "terminal" + assert scoped[0]["preview"] == "ls" + # Old tool card is preserved. + assert any( + tc.get("_recovered_stream_id") == "earlier_stream" + for tc in s.tool_calls + ) + + def test_untagged_tool_still_matches_for_core_transcript_invariant(self, hermes_home, monkeypatch): + """Tool cards without ``_recovered_stream_id`` (live tools, or tools + carried over from the core transcript) match regardless of the + ``stream_id`` argument. This preserves the "core transcript already + contains this tool, don't duplicate it" invariant the original repair + path relies on.""" + s = _make_session(messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + ]) + s.tool_calls = [ + {"name": "terminal", "preview": "ls", "snippet": "ls"}, + ] + # No stream_id → legacy session-wide check returns True. + assert models._journal_tool_already_present(s, "terminal", "ls") is True + # With a stream_id, untagged tool cards still match (different stream + # ids only override the match decision when the existing card itself + # is tagged with a stream id that disagrees). + assert models._journal_tool_already_present( + s, "terminal", "ls", stream_id="some_stream", + ) is True + + def test_tagged_tool_with_different_stream_does_not_match(self, hermes_home, monkeypatch): + """A tool card tagged with a different recovered_stream_id must NOT + be considered a duplicate when the retry is scoped to a different + stream.""" + s = _make_session(messages=[ + {"role": "user", "content": "x"}, + ]) + s.tool_calls = [ + { + "name": "terminal", + "preview": "ls", + "snippet": "ls", + "_recovered_stream_id": "other_stream", + }, + ] + assert models._journal_tool_already_present( + s, "terminal", "ls", stream_id="this_stream", + ) is False + # But scoping to the same stream id matches. + assert models._journal_tool_already_present( + s, "terminal", "ls", stream_id="other_stream", + ) is True + + +class TestWslPageCacheRace: + """Cover the WSL2 / network-FS shape: read_run_events returns empty / errors + first, recovers on a later call.""" + + def test_first_read_raises_oserror_second_read_succeeds(self, hermes_home, monkeypatch): + sid = "wsl_race_sid" + stream_id = "wsl_race_stream" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + ]) + s.pending_user_message = "Keep going" + s.pending_started_at = time.time() - 120 + s.active_stream_id = stream_id + + # Simulate first read raising IOError, then succeeding. + import api.run_journal as run_journal + real = run_journal.read_run_events + attempts = {"n": 0} + + def flaky_read(sid_, run_id, **kw): + attempts["n"] += 1 + if attempts["n"] == 1: + raise OSError("EIO simulated by test") + return real(sid_, run_id, **kw) + + monkeypatch.setattr(run_journal, "read_run_events", flaky_read) + + core_path = hermes_home / "sessions" / f"session_{sid}.json" + result = _apply_core_sync_or_error_marker( + s, core_path, stream_id_for_recheck=stream_id, + ) + assert result is True + marker = next(m for m in s.messages if m.get("type") == "interrupted") + assert marker.get("_pending_journal_recovery") is True + + # Now write journal events; the next retry call will read them. + append_run_event(sid, stream_id, "token", {"text": "Came back."}) + ok = models._retry_journal_recovery_in_place(s) + assert ok is True + marker_after = next(m for m in s.messages if m.get("type") == "interrupted") + assert "recovered from the run journal" in marker_after["content"] + assert "_pending_journal_recovery" not in marker_after + + def test_journal_grows_between_reads(self, hermes_home, monkeypatch): + sid = "wsl_grow_sid" + stream_id = "wsl_grow_stream" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + ]) + s.pending_user_message = "Keep going" + s.pending_started_at = time.time() - 120 + s.active_stream_id = stream_id + + # First repair pass: nothing visible yet. + core_path = hermes_home / "sessions" / f"session_{sid}.json" + result = _apply_core_sync_or_error_marker( + s, core_path, stream_id_for_recheck=stream_id, + ) + assert result is True + marker = next(m for m in s.messages if m.get("type") == "interrupted") + assert marker.get("_pending_journal_recovery") is True + + # Journal grows. + append_run_event(sid, stream_id, "token", {"text": "Partial 1."}) + append_run_event(sid, stream_id, "token", {"text": " Partial 2."}) + append_run_event( + sid, stream_id, "tool", + {"name": "terminal", "preview": "ls", "args": {"cmd": "ls"}}, + ) + + ok = models._retry_journal_recovery_in_place(s) + assert ok is True + marker_after = next(m for m in s.messages if m.get("type") == "interrupted") + assert "recovered from the run journal" in marker_after["content"] + # Both tokens recovered, in order, before the marker. + marker_idx = s.messages.index(marker_after) + recovered_text = " ".join( + m.get("content", "") for m in s.messages[:marker_idx] + if m.get("_recovered_from_run_journal") + ) + assert "Partial 1." in recovered_text and "Partial 2." in recovered_text + + def test_concurrent_get_session_calls_idempotent(self, hermes_home, monkeypatch): + sid = "wsl_concurrent_sid" + stream_id = "wsl_concurrent_stream" + s = _make_session(session_id=sid, messages=[ + {"role": "user", "content": "x"}, + {"role": "assistant", "content": "y"}, + {"role": "user", "content": "later", "_recovered": True}, + ]) + marker = models._interrupted_recovery_marker(pending_retry=True) + marker["_journal_retry_stream_id"] = stream_id + marker["_journal_retry_attempts"] = 0 + marker["_journal_retry_first_seen_ts"] = int(time.time()) + s.messages.append(marker) + s.save() + append_run_event(sid, stream_id, "token", {"text": "ConcurrentTokens"}) + models.SESSIONS[sid] = s + + results = [] + + def _worker(): + try: + models.get_session(sid) + results.append("ok") + except Exception as exc: # noqa: BLE001 + results.append(exc) + + threads = [threading.Thread(target=_worker, daemon=True) for _ in range(2)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=5) + + assert results.count("ok") == 2, f"both workers must succeed: {results}" + + # Exactly one interrupted marker remains, exactly one journal-recovered + # body (deduped by dedupe_existing=True). + interrupted_markers = [m for m in s.messages if m.get("type") == "interrupted"] + assert len(interrupted_markers) == 1 + recovered = [m for m in s.messages if m.get("_recovered_from_run_journal")] + assert sum(1 for m in recovered if "ConcurrentTokens" in m.get("content", "")) == 1