From 6c5bc95b3b9b7108bcb004f76c39d49fbef5a24a Mon Sep 17 00:00:00 2001 From: Michael Lam Date: Sun, 3 May 2026 22:43:11 -0700 Subject: [PATCH 01/14] fix: broadcast SSE events to all tabs --- api/config.py | 48 ++++++++++++++++ api/routes.py | 26 ++++++--- tests/test_issue_1584_multitab_sse.py | 83 +++++++++++++++++++++++++++ 3 files changed, 148 insertions(+), 9 deletions(-) create mode 100644 tests/test_issue_1584_multitab_sse.py diff --git a/api/config.py b/api/config.py index 5f0ffc06..36ac1806 100644 --- a/api/config.py +++ b/api/config.py @@ -14,6 +14,7 @@ import copy import json import logging import os +import queue import sys import threading import time @@ -2745,6 +2746,53 @@ _INDEX_HTML_PATH = REPO_ROOT / "static" / "index.html" LOCK = threading.Lock() SESSIONS_MAX = 100 CHAT_LOCK = threading.Lock() + + +class StreamChannel: + """Broadcast SSE events to every connected browser tab for a stream. + + While no tab is connected, events are buffered so the first/reconnected + subscriber still receives the stream tail that arrived during the gap. + Once one or more subscribers are attached, new events are broadcast to all + of them instead of being consumed destructively by a single queue reader. + """ + + def __init__(self): + self._lock = threading.Lock() + self._subscribers: list[queue.Queue] = [] + self._offline_buffer: list[tuple[str, object]] = [] + + def subscribe(self) -> queue.Queue: + q: queue.Queue = queue.Queue() + with self._lock: + snapshot = list(self._offline_buffer) + self._subscribers.append(q) + for item in snapshot: + q.put_nowait(item) + return q + + def unsubscribe(self, q: queue.Queue) -> None: + with self._lock: + try: + self._subscribers.remove(q) + except ValueError: + pass + + def put_nowait(self, item: tuple[str, object]) -> None: + with self._lock: + subscribers = list(self._subscribers) + if not subscribers: + self._offline_buffer.append(item) + return + self._offline_buffer.clear() + for q in subscribers: + q.put_nowait(item) + + +def create_stream_channel() -> StreamChannel: + return StreamChannel() + + STREAMS: dict = {} STREAMS_LOCK = threading.Lock() CANCEL_FLAGS: dict = {} diff --git a/api/routes.py b/api/routes.py index 3c8b8086..c642678f 100644 --- a/api/routes.py +++ b/api/routes.py @@ -332,6 +332,7 @@ from api.config import ( get_reasoning_status, set_reasoning_display, set_reasoning_effort, + create_stream_channel, ) from api.helpers import ( require, @@ -3649,9 +3650,10 @@ def _handle_list_dir(handler, parsed): def _handle_sse_stream(handler, parsed): stream_id = parse_qs(parsed.query).get("stream_id", [""])[0] - q = STREAMS.get(stream_id) - if q is None: + stream = STREAMS.get(stream_id) + if stream is None: return j(handler, {"error": "stream not found"}, status=404) + subscriber = stream.subscribe() if hasattr(stream, "subscribe") else stream handler.send_response(200) handler.send_header("Content-Type", "text/event-stream; charset=utf-8") handler.send_header("Cache-Control", "no-cache") @@ -3661,7 +3663,7 @@ def _handle_sse_stream(handler, parsed): try: while True: try: - event, data = q.get(timeout=30) + event, data = subscriber.get(timeout=30) except queue.Empty: handler.wfile.write(b": heartbeat\n\n") handler.wfile.flush() @@ -3671,6 +3673,12 @@ def _handle_sse_stream(handler, parsed): break except _CLIENT_DISCONNECT_ERRORS: pass + finally: + if subscriber is not stream and hasattr(stream, "unsubscribe"): + try: + stream.unsubscribe(subscriber) + except Exception: + pass return True @@ -4812,9 +4820,9 @@ def _handle_btw(handler, body): stream_id = uuid.uuid4().hex ephemeral.active_stream_id = stream_id ephemeral.save() - q = queue.Queue() + stream = create_stream_channel() with STREAMS_LOCK: - STREAMS[stream_id] = q + STREAMS[stream_id] = stream from api.background import track_btw track_btw(body["session_id"], ephemeral.session_id, stream_id, question) thr = threading.Thread( @@ -4858,9 +4866,9 @@ def _handle_background(handler, body): stream_id = uuid.uuid4().hex bg.active_stream_id = stream_id bg.save() - q = queue.Queue() + stream = create_stream_channel() with STREAMS_LOCK: - STREAMS[stream_id] = q + STREAMS[stream_id] = stream task_id = uuid.uuid4().hex[:8] from api.background import track_background, complete_background parent_sid = body["session_id"] @@ -4974,9 +4982,9 @@ def _handle_chat_start(handler, body): s.pending_started_at = time.time() s.save() set_last_workspace(workspace) - q = queue.Queue() + stream = create_stream_channel() with STREAMS_LOCK: - STREAMS[stream_id] = q + STREAMS[stream_id] = stream thr = threading.Thread( target=_run_agent_streaming, args=(s.session_id, msg, model, workspace, stream_id, attachments), diff --git a/tests/test_issue_1584_multitab_sse.py b/tests/test_issue_1584_multitab_sse.py new file mode 100644 index 00000000..8341d317 --- /dev/null +++ b/tests/test_issue_1584_multitab_sse.py @@ -0,0 +1,83 @@ +import io +import threading +from types import SimpleNamespace + +from api.config import STREAMS, STREAMS_LOCK, create_stream_channel +from api.routes import _handle_sse_stream + + +class _FakeHandler: + def __init__(self): + self.status = None + self.headers = [] + self.wfile = io.BytesIO() + + def send_response(self, status): + self.status = status + + def send_header(self, key, value): + self.headers.append((key, value)) + + def end_headers(self): + return None + + +def test_stream_channel_broadcasts_each_event_to_every_subscriber(): + stream = create_stream_channel() + q1 = stream.subscribe() + q2 = stream.subscribe() + + try: + stream.put_nowait(("token", {"text": "H"})) + stream.put_nowait(("token", {"text": "allo"})) + stream.put_nowait(("stream_end", {"status": "done"})) + + assert q1.get(timeout=1) == ("token", {"text": "H"}) + assert q1.get(timeout=1) == ("token", {"text": "allo"}) + assert q1.get(timeout=1) == ("stream_end", {"status": "done"}) + + assert q2.get(timeout=1) == ("token", {"text": "H"}) + assert q2.get(timeout=1) == ("token", {"text": "allo"}) + assert q2.get(timeout=1) == ("stream_end", {"status": "done"}) + finally: + stream.unsubscribe(q1) + stream.unsubscribe(q2) + + +def test_same_stream_in_two_tabs_receives_identical_token_sequence(): + stream_id = "multitab-stream" + stream = create_stream_channel() + with STREAMS_LOCK: + STREAMS[stream_id] = stream + + handlers = [_FakeHandler(), _FakeHandler()] + threads = [ + threading.Thread( + target=_handle_sse_stream, + args=(handler, SimpleNamespace(query=f"stream_id={stream_id}")), + daemon=True, + ) + for handler in handlers + ] + + try: + for thread in threads: + thread.start() + + stream.put_nowait(("token", {"text": "H"})) + stream.put_nowait(("token", {"text": "allo"})) + stream.put_nowait(("stream_end", {"status": "done"})) + + for thread in threads: + thread.join(timeout=1) + assert not thread.is_alive(), "every tab should finish the same SSE stream" + + for handler in handlers: + payload = handler.wfile.getvalue().decode("utf-8") + assert handler.status == 200 + assert '"text": "H"' in payload + assert '"text": "allo"' in payload + assert "event: stream_end" in payload + finally: + with STREAMS_LOCK: + STREAMS.pop(stream_id, None) From ad46d820609d4dd7656170f29fbf051aa0937ff2 Mon Sep 17 00:00:00 2001 From: Michael Lam Date: Sun, 3 May 2026 22:33:54 -0700 Subject: [PATCH 02/14] fix: isolate pytest Hermes config path --- tests/conftest.py | 5 +++++ tests/test_pytest_config_isolation.py | 15 +++++++++++++++ 2 files changed, 20 insertions(+) create mode 100644 tests/test_pytest_config_isolation.py diff --git a/tests/conftest.py b/tests/conftest.py index 386c3fb8..b1a09580 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -69,6 +69,10 @@ os.environ['HERMES_WEBUI_STATE_DIR'] = str(TEST_STATE_DIR) os.environ['HERMES_WEBUI_DEFAULT_WORKSPACE'] = str(TEST_WORKSPACE) os.environ['HERMES_HOME'] = str(TEST_STATE_DIR) os.environ['HERMES_BASE_HOME'] = str(TEST_STATE_DIR) +# Hermes Agent sessions may inherit HERMES_CONFIG_PATH pointing at the live +# ~/.hermes/config.yaml. Override it before any product modules are imported so +# tests that read/write config.yaml stay inside the isolated test home. +os.environ['HERMES_CONFIG_PATH'] = str(TEST_STATE_DIR / 'config.yaml') # ── Server script: always relative to repo root ─────────────────────────── SERVER_SCRIPT = REPO_ROOT / 'server.py' @@ -297,6 +301,7 @@ def test_server(): "HERMES_WEBUI_DEFAULT_WORKSPACE": str(TEST_WORKSPACE), "HERMES_WEBUI_DEFAULT_MODEL": "openai/gpt-5.4-mini", "HERMES_HOME": str(TEST_STATE_DIR), + "HERMES_CONFIG_PATH": str(TEST_STATE_DIR / 'config.yaml'), # Belt-and-suspenders: HERMES_BASE_HOME hard-locks _DEFAULT_HERMES_HOME # in api/profiles.py to the test state dir regardless of profile switching # or any os.environ mutation that happens inside the server process. diff --git a/tests/test_pytest_config_isolation.py b/tests/test_pytest_config_isolation.py new file mode 100644 index 00000000..00775c91 --- /dev/null +++ b/tests/test_pytest_config_isolation.py @@ -0,0 +1,15 @@ +"""Regression coverage for pytest isolation of Hermes config paths.""" +import os +from pathlib import Path + + +def test_pytest_overrides_inherited_hermes_config_path(): + """A live-agent HERMES_CONFIG_PATH must never leak into WebUI tests. + + Hermes agents commonly run with HERMES_CONFIG_PATH pointing at the real + ~/.hermes/config.yaml. The test harness must replace it with the isolated + test home before product modules are imported, otherwise provider/onboarding + tests can mutate the user's real config. + """ + test_state_dir = Path(os.environ["HERMES_WEBUI_TEST_STATE_DIR"]) + assert Path(os.environ["HERMES_CONFIG_PATH"]) == test_state_dir / "config.yaml" From 22187d2b4ca2ed066c4e2023c7a2fb609401c370 Mon Sep 17 00:00:00 2001 From: Michael Lam Date: Sun, 3 May 2026 23:13:10 -0700 Subject: [PATCH 03/14] fix: resolve provider config cleanup path --- api/providers.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/api/providers.py b/api/providers.py index 86825774..546f675c 100644 --- a/api/providers.py +++ b/api/providers.py @@ -593,7 +593,13 @@ def _clean_provider_key_from_config(provider_id: str) -> None: from api.config import _cfg_lock try: - config_path = _get_config_path() + # Resolve through api.config at call time instead of the function imported + # at module load. Several tests (and some profile flows) monkeypatch the + # config module's path resolver after api.providers has already been + # imported; using the stale imported reference can clean the wrong + # config.yaml. + import api.config as _config + config_path = _config._get_config_path() except Exception: return From 14fac05dc9ac3e3f7632e2295cbc082a0dbfea26 Mon Sep 17 00:00:00 2001 From: Sanjay Santhanam <51058514+Sanjays2402@users.noreply.github.com> Date: Sun, 3 May 2026 23:21:19 -0700 Subject: [PATCH 04/14] fix(streaming): use truthy-check for _pending_started_at fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switch the per-turn duration fallback from `is not None` to a truthy check so None, missing-attr, and an explicit 0 all uniformly fall back to time.time(). Without this, a 0 timestamp (e.g. via a buggy migration or manual file edit) would yield `time.time() - 0` ≈ wall-clock-since-epoch, displaying nonsense like 'Done in 56 years 4 months ...'. In practice pending_started_at is always set via int(time.time()) so this is a hardening fix, not a live-bug fix. Also drop the brittle source-string assertion in the regression test that pinned the literal expression. The behavioural test test_done_handler_persists_duration_on_last_assistant_message already proves the duration field is set; pinning the source line broke twice during the v0.50.290 release pipeline alone (Opus tightening + maintainer revert). Fixes #1595 Signed-off-by: Sanjay Santhanam <51058514+Sanjays2402@users.noreply.github.com> --- api/streaming.py | 6 ++++-- tests/test_turn_duration_display.py | 4 ---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/api/streaming.py b/api/streaming.py index 9f079409..1ff71227 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -2054,8 +2054,10 @@ def _run_agent_streaming( agent.ephemeral_system_prompt = _personality_prompt _pending_started_at = getattr(s, 'pending_started_at', None) # Normal chat-start sets pending_started_at before spawning this thread; - # fallback to now only for recovered/legacy flows where that marker is absent. - _turn_started_at = _pending_started_at if _pending_started_at is not None else time.time() + # fallback to now only for recovered/legacy flows where that marker is absent + # or has been zeroed out (e.g. via a buggy migration / manual file edit). + # Truthy-check covers None, missing-attr, and 0 uniformly. + _turn_started_at = _pending_started_at if _pending_started_at else time.time() _previous_messages = list(s.messages or []) _previous_context_messages = list(_session_context_messages(s)) _pre_compression_count = getattr( diff --git a/tests/test_turn_duration_display.py b/tests/test_turn_duration_display.py index 8fc38c25..b87deb74 100644 --- a/tests/test_turn_duration_display.py +++ b/tests/test_turn_duration_display.py @@ -21,10 +21,6 @@ def test_streaming_done_payload_includes_backend_turn_duration(): "Turn duration should be measured from the persisted pending_started_at " "start time, not only from browser-local state." ) - assert "if _pending_started_at is not None else time.time()" in STREAMING_PY, ( - "The fallback should preserve explicit timestamp values and only use now " - "when pending_started_at is absent." - ) assert "recovered/legacy flows" in STREAMING_PY, ( "The missing-start fallback should be documented so it is not mistaken " "for the primary timing path." From 032b680e2615861314ff75d20f4669abab6a859f Mon Sep 17 00:00:00 2001 From: Michael Lam Date: Sun, 3 May 2026 23:55:45 -0700 Subject: [PATCH 05/14] fix: render streaming markdown on subpath mounts --- static/index.html | 2 +- static/messages.js | 7 +++++-- tests/test_streaming_markdown.py | 24 ++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/static/index.html b/static/index.html index 2d22a9f1..3fe0c6aa 100644 --- a/static/index.html +++ b/static/index.html @@ -28,7 +28,7 @@