Files
hermes-webui/tests/test_run_journal_streaming_static.py
T
Hermes Agent b293bf8bc5 stage-364: Opus-caught live SSE event_id fix (side-channel approach)
Replace the earlier frontend-reset approach with a backend side-channel
approach that preserves the queue (event, data) tuple shape.

Problem (Opus catch):
- Live SSE frames emitted by _sse() in api/streaming.py:2296 carried no
  'id:' field. Only journal-replay frames (via _sse_with_id) emitted IDs.
- Frontend's _lastRunJournalSeq cursor stayed at 0 during live streaming.
- Mid-stream error → reconnect-to-replay arrived with after_seq=0.
- Server replayed every journaled event from seq 1.
- assistantText (closure-scoped) had accumulated all live tokens already
  → double-rendered output.

Fix:
- api/config.py: STREAM_LAST_EVENT_ID: dict = {} module-level dict.
- api/streaming.py put(): capture journal event_id, write to
  STREAM_LAST_EVENT_ID[stream_id]. Keep queue tuple as (event, data).
- api/routes.py _handle_sse_stream: read STREAM_LAST_EVENT_ID[stream_id]
  at emit time, use _sse_with_id when set.
- api/streaming.py finally block: pop STREAM_LAST_EVENT_ID for cleanup.

Why side-channel instead of 3-tuple:
- Earlier attempt (queue tuple → (event, data, event_id)) broke 4 existing
  tests: test_cancel_interrupt, test_sprint42, test_sprint51,
  test_issue1857_usage_overwrite. These all unpack 'event, data = q.get()'.
- Frontend-reset approach (reset assistantText before replay) broke 3
  other tests: test_smooth_text_fade, test_streaming_markdown,
  test_streaming_race_fix. _wireSSE must NOT reset accumulators because
  legacy reconnect doesn't replay events; only journal-replay does.

Side-channel preserves both invariants:
- Queue contract stays (event, data) — legacy consumers unbroken.
- Frontend accumulators stay alive on _wireSSE — legacy reconnect unbroken.
- Live SSE emits 'id:' so the journal cursor advances correctly.

6 regression tests added in test_stage364_opus_live_sse_event_id.py.
1 existing test (test_run_journal_streaming_static.test_streaming_journals_sse_events_before_queue_delivery) updated to be tuple-shape-agnostic.

Test results:
- Full pytest: 5713 passed, 10 skipped, 1 xfailed, 2 xpassed, 0 failed
- Previously-failing 5 tests: ALL PASS
- 6 new regression tests: ALL PASS
2026-05-16 03:58:54 +00:00

29 lines
1.3 KiB
Python

from pathlib import Path
def test_streaming_initializes_one_run_journal_writer_per_stream():
src = Path("api/streaming.py").read_text(encoding="utf-8")
register_idx = src.index("register_active_run(")
writer_idx = src.index("RunJournalWriter(session_id, stream_id)", register_idx)
cancel_idx = src.index("cancel_event = threading.Event()", writer_idx)
assert "from api.run_journal import RunJournalWriter" in src
assert register_idx < writer_idx < cancel_idx
def test_streaming_journals_sse_events_before_queue_delivery():
src = Path("api/streaming.py").read_text(encoding="utf-8")
put_idx = src.index("def put(event, data):")
journal_idx = src.index("run_journal.append_sse_event(event, data)", put_idx)
# Stage-364 maintainer fix: put() now pushes 3-tuples (event, data, event_id)
# so the SSE consumer can emit `id:` on live frames. Accept either shape
# so this test survives both the v0.51.71 in-flight fix and a future revert.
try:
queue_idx = src.index("q.put_nowait((event, data, event_id))", put_idx)
except ValueError:
queue_idx = src.index("q.put_nowait((event, data))", put_idx)
block = src[put_idx:queue_idx]
assert put_idx < journal_idx < queue_idx
assert "Failed to append run journal event" in block