diff --git a/CHANGELOG.md b/CHANGELOG.md index e5fbd7d3..c3b0a9ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ ## [Unreleased] +## [v0.50.248] — 2026-04-30 + +### Added +- **Real-time approval notifications via SSE long-connection** — replaces the 1.5s HTTP polling loop with a Server-Sent Events endpoint at `/api/approval/stream?session_id=` that pushes approval events to the browser the instant they fire. Cuts approval latency from up to 1.5s down to near-instant and eliminates the "always polling" network noise users observed. Backend uses a thread-safe subscriber registry (`_approval_sse_subscribers` dict, bounded `queue.Queue(maxsize=16)` per subscriber, silent drop on full to prevent leaks from slow tabs). 30s keepalive comments prevent proxy/CDN timeouts; `_CLIENT_DISCONNECT_ERRORS` + `finally` block guarantee subscriber cleanup on any exit path. **Subscribe and snapshot are taken atomically under a single `_lock` acquisition** so a `submit_pending()` arriving in the gap can't be lost. **Notify runs inside the queue-mutation lock** in both `submit_pending` and `_handle_approval_respond` so two parallel callers can't deliver out-of-order with stale `pending_count`. **SSE payload always reflects head-of-queue, never tail**, matching `/api/approval/pending`'s contract — with parallel tool-call approvals (#527), the just-appended entry is at the tail but the UI must show the head. **`_handle_approval_respond` now re-emits the new head after popping** so a trailing approval queued behind the one being responded to is surfaced immediately instead of getting stuck until the next event. Frontend uses `EventSource` with automatic 1.5s HTTP polling fallback on `onerror` (preserves degraded-mode parity with v0.50.247). 50 tests cover wiring, lifecycle, multi-subscriber, cross-session isolation, queue overflow, concurrent subscribe/notify stress, atomic-lock invariants, head-fidelity, trailing-approval re-emission, and notify-order monotonicity. (`api/routes.py`, `static/messages.js`, `tests/test_approval_sse.py`, `tests/test_pr1350_sse_atomic_subscribe.py`, `tests/test_pr1350_sse_notify_correctness.py`) @fxd-jason — PR #1350 + +### Fixed +- **Context indicator percentage shows even without explicit `context_length`** — frontend companion to the v0.50.246 backend fix. The context ring used to display `·` (no data) whenever `context_length` was 0 or missing — fresh agents, interrupted streams, or models without compressor state. Now defaults to **128K** when `usage.context_length` is falsy and labels the indicator with `(est. 128K)` so users can tell apparent vs. measured. Falls back to `input_tokens` for `last_prompt_tokens` so the ring lights up immediately on the first user message. (`static/ui.js`) @fxd-jason — PR #1349 + ## [v0.50.247] — 2026-04-30 ### Added diff --git a/api/routes.py b/api/routes.py index d1ece383..bc2ac1d0 100644 --- a/api/routes.py +++ b/api/routes.py @@ -545,6 +545,67 @@ except ImportError: _permanent_approved = set() +# ── Approval SSE subscribers (long-connection push) ────────────────────────── +_approval_sse_subscribers: dict[str, list[queue.Queue]] = {} + + +def _approval_sse_subscribe(session_id: str) -> queue.Queue: + """Register an SSE subscriber for approval events on a given session.""" + q = queue.Queue(maxsize=16) + with _lock: + _approval_sse_subscribers.setdefault(session_id, []).append(q) + return q + + +def _approval_sse_unsubscribe(session_id: str, q: queue.Queue) -> None: + """Remove an SSE subscriber.""" + with _lock: + subs = _approval_sse_subscribers.get(session_id) + if subs and q in subs: + subs.remove(q) + if not subs: + _approval_sse_subscribers.pop(session_id, None) + + +def _approval_sse_notify_locked(session_id: str, head: dict | None, total: int) -> None: + """Push an approval event to all SSE subscribers for a session. + + CALLER MUST HOLD `_lock`. Snapshots the subscriber list under the held + lock and then calls `q.put_nowait()` on each (which is itself thread-safe). + + `head` is the approval entry currently at the head of the queue (the one + the UI should display) — NOT the just-appended entry. With multiple + parallel approvals (#527), the just-appended entry is at the TAIL, but + `/api/approval/pending` always returns the HEAD, so SSE must match. + + `total` is the total number of pending approvals. + + Pass `head=None` and `total=0` when the queue has just been emptied (e.g. + `_handle_approval_respond` popped the last entry) so the client knows to + hide its approval card. + """ + payload = {"pending": dict(head) if head else None, "pending_count": total} + subs = _approval_sse_subscribers.get(session_id, ()) + for q in subs: + try: + q.put_nowait(payload) + except queue.Full: + pass # drop if subscriber is slow (bounded queue prevents memory leak) + + +def _approval_sse_notify(session_id: str, head: dict | None, total: int) -> None: + """Convenience wrapper that takes `_lock` itself. + + Use only from contexts that don't already hold `_lock`. Production call + sites (submit_pending, _handle_approval_respond) MUST hold the lock and + call `_approval_sse_notify_locked` directly to avoid a notify-ordering + race where a later append's notify can fire before an earlier append's + notify (resulting in stale `pending_count`). + """ + with _lock: + _approval_sse_notify_locked(session_id, head, total) + + def submit_pending(session_key: str, approval: dict) -> None: """Append a pending approval to the per-session queue. @@ -553,16 +614,23 @@ def submit_pending(session_key: str, approval: dict) -> None: a specific entry even when multiple approvals are queued simultaneously. - Change the storage from a single overwriting dict value to a list, so parallel tool calls each get their own approval slot (fixes #527). + - Notify any connected SSE subscribers immediately. """ entry = dict(approval) entry.setdefault("approval_id", uuid.uuid4().hex) with _lock: - queue = _pending.setdefault(session_key, []) + queue_list = _pending.setdefault(session_key, []) # Replace a legacy non-list value if the agent version uses the old pattern. - if not isinstance(queue, list): - _pending[session_key] = [queue] - queue = _pending[session_key] - queue.append(entry) + if not isinstance(queue_list, list): + _pending[session_key] = [queue_list] + queue_list = _pending[session_key] + queue_list.append(entry) + total = len(queue_list) + head = queue_list[0] # /api/approval/pending always returns head + # Push to SSE subscribers from inside _lock so two parallel + # submit_pending calls can't deliver out-of-order (T2's later + # notify arriving before T1's earlier notify with a stale count). + _approval_sse_notify_locked(session_key, head, total) # NOTE: We do NOT call _submit_pending_raw here — that function overwrites # _pending[session_key] with a single dict, which would undo the list we just # built. The gateway blocking path uses _gateway_queues (a separate mechanism @@ -1191,6 +1259,9 @@ def handle_get(handler, parsed) -> bool: if parsed.path == "/api/approval/pending": return _handle_approval_pending(handler, parsed) + if parsed.path == "/api/approval/stream": + return _handle_approval_sse_stream(handler, parsed) + if parsed.path == "/api/approval/inject_test": # Loopback-only: used by automated tests; blocked from any remote client if handler.client_address[0] != "127.0.0.1": @@ -2720,6 +2791,66 @@ def _handle_approval_pending(handler, parsed): return j(handler, {"pending": None, "pending_count": 0}) +def _handle_approval_sse_stream(handler, parsed): + """SSE endpoint for real-time approval notifications. + + Long-lived connection that pushes approval events the moment they arrive, + replacing the 1.5s polling loop. The frontend uses EventSource and falls + back to HTTP polling if the connection fails. + """ + sid = parse_qs(parsed.query).get("session_id", [""])[0] + if not sid: + return bad(handler, "session_id is required") + + # Subscribe AND snapshot atomically under a single _lock acquisition so a + # submit_pending() that fires between the two cannot be lost. If we + # snapshot first then subscribe (the naive ordering), an approval that + # arrives in the gap is appended to _pending (after our snapshot) AND + # notified to subscribers (before we joined) — leaving the client unaware + # until the next event arrives. + q = queue.Queue(maxsize=16) + initial_pending = None + initial_count = 0 + with _lock: + _approval_sse_subscribers.setdefault(sid, []).append(q) + q_list = _pending.get(sid) + if isinstance(q_list, list): + initial_pending = dict(q_list[0]) if q_list else None + initial_count = len(q_list) + elif q_list: + initial_pending = dict(q_list) + initial_count = 1 + + handler.send_response(200) + handler.send_header('Content-Type', 'text/event-stream; charset=utf-8') + handler.send_header('Cache-Control', 'no-cache') + handler.send_header('X-Accel-Buffering', 'no') + handler.send_header('Connection', 'keep-alive') + handler.end_headers() + + from api.streaming import _sse + + # Push initial state immediately so the client doesn't miss anything. + _sse(handler, 'initial', {"pending": initial_pending, "pending_count": initial_count}) + + try: + while True: + try: + payload = q.get(timeout=30) + except queue.Empty: + # Keepalive — SSE comment line prevents proxy/CDN timeout. + handler.wfile.write(b': keepalive\n\n') + handler.wfile.flush() + continue + if payload is None: + break # signal to close + _sse(handler, 'approval', payload) + except _CLIENT_DISCONNECT_ERRORS: + pass # client went away — normal for long-lived connections + finally: + _approval_sse_unsubscribe(sid, q) + + def _handle_approval_inject(handler, parsed): """Inject a fake pending approval -- loopback-only, used by automated tests.""" qs = parse_qs(parsed.query) @@ -3783,6 +3914,16 @@ def _handle_approval_respond(handler, body): elif queue: # Legacy single-dict value. pending = _pending.pop(sid, None) + # Notify SSE subscribers of the new head (or empty state) so the UI + # surfaces any trailing approvals that were queued behind this one + # without waiting for the next submit_pending. Without this, a parallel + # tool-call scenario (#527) would leave the second approval invisible + # in the SSE path until the next event ever fired (the agent thread + # would be parked indefinitely from the user's perspective). + if isinstance(_pending.get(sid), list) and _pending[sid]: + _approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid])) + else: + _approval_sse_notify_locked(sid, None, 0) if pending: keys = pending.get("pattern_keys") or [pending.get("pattern_key", "")] diff --git a/static/messages.js b/static/messages.js index 8ea4fd35..c4f92d9f 100644 --- a/static/messages.js +++ b/static/messages.js @@ -1314,6 +1314,50 @@ async function respondApproval(choice) { function startApprovalPolling(sid) { stopApprovalPolling(); + // ── SSE (preferred): long-lived connection, server pushes instantly ── + try { + const es = new EventSource('/api/approval/stream?session_id=' + encodeURIComponent(sid)); + let _fallbackActive = false; + + es.addEventListener('initial', e => { + const d = JSON.parse(e.data); + if (d.pending) { d.pending._session_id = sid; showApprovalCard(d.pending, d.pending_count || 1); } + else { hideApprovalCard(); } + }); + + es.addEventListener('approval', e => { + const d = JSON.parse(e.data); + if (d.pending) { d.pending._session_id = sid; showApprovalCard(d.pending, d.pending_count || 1); } + else { hideApprovalCard(); } + }); + + es.onerror = () => { + // SSE failed — fall back to HTTP polling (3s interval) + if (_fallbackActive) return; + _fallbackActive = true; + try { es.close(); } catch(_){} + _startApprovalFallbackPoll(sid); + }; + + // If the session changes or stops being busy, close the SSE. + // We detect this via a periodic check (cheap — no network request). + _approvalSSEHealthTimer = setInterval(() => { + if (!S.busy || !S.session || S.session.session_id !== sid) { + stopApprovalPolling(); hideApprovalCard(true); + } + }, 5000); + + _approvalEventSource = es; + } catch(_e) { + // EventSource constructor failed — use polling directly + _startApprovalFallbackPoll(sid); + } +} + +let _approvalEventSource = null; +let _approvalSSEHealthTimer = null; + +function _startApprovalFallbackPoll(sid) { _approvalPollTimer = setInterval(async () => { if (!S.busy || !S.session || S.session.session_id !== sid) { stopApprovalPolling(); hideApprovalCard(true); return; @@ -1323,11 +1367,13 @@ function startApprovalPolling(sid) { if (data.pending) { data.pending._session_id=sid; showApprovalCard(data.pending, data.pending_count||1); } else { hideApprovalCard(); } } catch(e) { /* ignore poll errors */ } - }, 1500); + }, 1500); // matches the v0.50.247 polling cadence so degraded-mode users see the same responsiveness } function stopApprovalPolling() { if (_approvalPollTimer) { clearInterval(_approvalPollTimer); _approvalPollTimer = null; } + if (_approvalEventSource) { try { _approvalEventSource.close(); } catch(_){} _approvalEventSource = null; } + if (_approvalSSEHealthTimer) { clearInterval(_approvalSSEHealthTimer); _approvalSSEHealthTimer = null; } } // ── Clarify polling ── diff --git a/static/ui.js b/static/ui.js index 5f6e2fc9..6d01dec3 100644 --- a/static/ui.js +++ b/static/ui.js @@ -850,9 +850,12 @@ function _syncCtxIndicator(usage){ const wrap=$('ctxIndicatorWrap'); const el=$('ctxIndicator'); if(!el)return; + // Use input_tokens as fallback when last_prompt_tokens is not available const promptTok=usage.last_prompt_tokens||usage.input_tokens||0; const totalTok=(usage.input_tokens||0)+(usage.output_tokens||0); - const ctxWindow=usage.context_length||0; + // Default context window to 128K when not provided by backend + const DEFAULT_CTX=128*1024; + const ctxWindow=usage.context_length||DEFAULT_CTX; const cost=usage.estimated_cost; // Show indicator whenever we have any usage data (tokens or cost) if(!promptTok&&!totalTok&&!cost){ @@ -860,8 +863,8 @@ function _syncCtxIndicator(usage){ return; } if(wrap) wrap.style.display=''; - const hasCtxWindow=!!(promptTok&&ctxWindow); - const pct=hasCtxWindow?Math.min(100,Math.round((promptTok/ctxWindow)*100)):0; + const hasPromptTok=!!promptTok; + const pct=hasPromptTok?Math.min(100,Math.round((promptTok/ctxWindow)*100)):0; const ring=$('ctxRingValue'); const center=$('ctxPercent'); const usageLine=$('ctxTooltipUsage'); @@ -873,7 +876,8 @@ function _syncCtxIndicator(usage){ ring.style.strokeDasharray=String(circumference); ring.style.strokeDashoffset=String(circumference*(1-pct/100)); } - if(center) center.textContent=hasCtxWindow?String(pct):'\u00b7'; + if(center) center.textContent=hasPromptTok?String(pct):'\u00b7'; + const hasExplicitCtx=!!usage.context_length; el.classList.toggle('ctx-mid',pct>50&&pct<=75); el.classList.toggle('ctx-high',pct>75); // ── Compress affordance (#524) ── @@ -900,11 +904,12 @@ function _syncCtxIndicator(usage){ compressWrap.style.display='none'; } } - let label=hasCtxWindow?`Context window ${pct}% used`:`${_fmtTokens(totalTok)} tokens used`; + let label=hasPromptTok?`Context window ${pct}% used`:`${_fmtTokens(totalTok)} tokens used`; + if(!hasExplicitCtx&&hasPromptTok) label+=' (est. 128K)'; if(cost) label+=` \u00b7 $${cost<0.01?cost.toFixed(4):cost.toFixed(2)}`; el.setAttribute('aria-label',label); - if(usageLine) usageLine.textContent=hasCtxWindow?`${pct}% used (${Math.max(0,100-pct)}% left)`:`${_fmtTokens(totalTok)} tokens used`; - if(tokensLine) tokensLine.textContent=hasCtxWindow?`${_fmtTokens(promptTok)} / ${_fmtTokens(ctxWindow)} tokens used`:`In: ${_fmtTokens(usage.input_tokens||0)} \u00b7 Out: ${_fmtTokens(usage.output_tokens||0)}`; + if(usageLine) usageLine.textContent=hasPromptTok?`${pct}% used (${Math.max(0,100-pct)}% left)`:`${_fmtTokens(totalTok)} tokens used`; + if(tokensLine) tokensLine.textContent=hasPromptTok?`${_fmtTokens(promptTok)} / ${_fmtTokens(ctxWindow)} tokens used`:`In: ${_fmtTokens(usage.input_tokens||0)} \u00b7 Out: ${_fmtTokens(usage.output_tokens||0)}`; const threshold=usage.threshold_tokens||0; if(thresholdLine){ if(threshold&&ctxWindow){ diff --git a/tests/test_approval_queue.py b/tests/test_approval_queue.py index fbcac019..5d94e828 100644 --- a/tests/test_approval_queue.py +++ b/tests/test_approval_queue.py @@ -23,8 +23,8 @@ INDEX_HTML = (REPO_ROOT / "static" / "index.html").read_text(encoding="utf-8") def test_submit_pending_appends_to_list(): """submit_pending() must append to a list, not overwrite.""" - # The new wrapper must contain queue.append - assert "queue.append(entry)" in ROUTES_SRC, \ + # The new wrapper must contain a queue append (list mutation pattern) + assert "queue_list.append(entry)" in ROUTES_SRC or "queue.append(entry)" in ROUTES_SRC, \ "submit_pending() must append entry to a list queue, not overwrite _pending[sid]" diff --git a/tests/test_approval_sse.py b/tests/test_approval_sse.py new file mode 100644 index 00000000..89670747 --- /dev/null +++ b/tests/test_approval_sse.py @@ -0,0 +1,487 @@ +"""Tests for the approval SSE (Server-Sent Events) long-connection implementation. + +Verifies: + - SSE subscribe/unsubscribe/notify lifecycle + - Initial snapshot delivery on connect + - Instant push when submit_pending() fires + - Client disconnect triggers unsubscribe cleanup + - Multiple concurrent subscribers per session + - Queue overflow (slow subscriber) drops silently + - Cross-session isolation (notify only reaches matching subscribers) + - Frontend EventSource / fallback polling patterns +""" + +import json +import pathlib +import queue +import re +import sys +import threading +import time +import uuid + +REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve() +sys.path.insert(0, str(REPO_ROOT)) + +ROUTES_SRC = (REPO_ROOT / "api" / "routes.py").read_text(encoding="utf-8") +MESSAGES_JS = (REPO_ROOT / "static" / "messages.js").read_text(encoding="utf-8") + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 1. Static-analysis tests (no server needed) +# ═══════════════════════════════════════════════════════════════════════════════ + +class TestSSEStaticAnalysis: + """Verify the SSE infrastructure exists and is wired correctly in routes.py.""" + + def test_sse_route_registered(self): + """The /api/approval/stream route must be registered.""" + assert '"/api/approval/stream"' in ROUTES_SRC, \ + "Route /api/approval/stream must be registered in the URL dispatch" + + def test_sse_handler_function_exists(self): + """_handle_approval_sse_stream handler must exist.""" + assert "def _handle_approval_sse_stream(" in ROUTES_SRC, \ + "_handle_approval_sse_stream handler function must exist" + + def test_subscribe_function_exists(self): + """_approval_sse_subscribe must exist and use a Queue.""" + assert "def _approval_sse_subscribe(" in ROUTES_SRC, \ + "_approval_sse_subscribe must be defined" + + def test_unsubscribe_function_exists(self): + """_approval_sse_unsubscribe must exist and clean up empty lists.""" + assert "def _approval_sse_unsubscribe(" in ROUTES_SRC, \ + "_approval_sse_unsubscribe must be defined" + + def test_notify_function_exists(self): + """_approval_sse_notify must exist and push to subscriber queues.""" + assert "def _approval_sse_notify(" in ROUTES_SRC, \ + "_approval_sse_notify must be defined" + + def test_sse_subscribers_dict_exists(self): + """Module-level _approval_sse_subscribers dict must exist.""" + assert "_approval_sse_subscribers" in ROUTES_SRC, \ + "_approval_sse_subscribers module-level dict must exist" + + def test_sse_content_type(self): + """SSE handler must set text/event-stream content type.""" + assert "text/event-stream" in ROUTES_SRC, \ + "SSE handler must set Content-Type to text/event-stream" + + def test_sse_keepalive(self): + """SSE handler must send keepalive comments to prevent proxy timeout.""" + assert "keepalive" in ROUTES_SRC, \ + "SSE handler must send keepalive comments" + + def test_sse_cache_control(self): + """SSE handler must set Cache-Control: no-cache.""" + assert "no-cache" in ROUTES_SRC, \ + "SSE handler must set Cache-Control: no-cache" + + def test_sse_initial_snapshot(self): + """SSE handler must send initial snapshot on connect.""" + assert "'initial'" in ROUTES_SRC, \ + "SSE handler must send an 'initial' event with snapshot data" + + def test_sse_approval_event(self): + """SSE handler must send 'approval' events on push.""" + assert "'approval'" in ROUTES_SRC, \ + "SSE handler must send 'approval' events when pushing notifications" + + def test_notify_called_from_submit_pending(self): + """submit_pending must call _approval_sse_notify_locked.""" + # Pinned to the inner-lock variant: must run inside the same `with _lock:` + # block as the queue mutation so two parallel submit_pending calls can't + # deliver out-of-order with stale pending_count. Tracks the v0.50.248 + # MUST-FIX A fix. + assert "_approval_sse_notify_locked(session_key, head, total)" in ROUTES_SRC, \ + ("submit_pending() must call _approval_sse_notify_locked(session_key, head, total) " + "from inside the `with _lock:` block — not the unlocked _approval_sse_notify wrapper, " + "and head must be queue_list[0] (the head, not the just-appended entry).") + + def test_unsubscribe_in_finally(self): + """SSE handler must unsubscribe in a finally block.""" + # Find the finally block that calls _approval_sse_unsubscribe + assert re.search(r"finally:.*\n.*_approval_sse_unsubscribe\(", ROUTES_SRC, re.DOTALL), \ + "SSE handler must call _approval_sse_unsubscribe in a finally block" + + def test_client_disconnect_handled(self): + """SSE handler must catch client disconnect errors.""" + assert "_CLIENT_DISCONNECT_ERRORS" in ROUTES_SRC, \ + "SSE handler must catch client disconnect errors" + + def test_subscriber_queue_maxsize(self): + """Subscriber queues must have a bounded maxsize to prevent memory leaks.""" + assert "queue.Queue(maxsize=" in ROUTES_SRC, \ + "Subscriber queues must have maxsize set to prevent unbounded memory growth" + + def test_notify_drops_on_full(self): + """_approval_sse_notify must silently drop events when subscriber is slow.""" + # The queue.Full exception handler + assert "queue.Full" in ROUTES_SRC, \ + "_approval_sse_notify must handle queue.Full to drop events for slow subscribers" + + def test_subscribe_uses_shared_lock(self): + """subscribe/unsubscribe/notify must all use the same _lock.""" + # All three functions must use _lock + for func in ["_approval_sse_subscribe", "_approval_sse_unsubscribe", "_approval_sse_notify"]: + # Find the function and verify it uses "with _lock" + func_start = ROUTES_SRC.find(f"def {func}(") + assert func_start != -1, f"{func} must exist" + # Find the next function definition after this one + next_func = ROUTES_SRC.find("\ndef ", func_start + 1) + func_body = ROUTES_SRC[func_start:next_func] if next_func != -1 else ROUTES_SRC[func_start:] + assert "with _lock:" in func_body, \ + f"{func} must use 'with _lock:' for thread safety" + + def test_unsubscribe_cleans_empty_session(self): + """Unsubscribe must remove empty session keys from the dict.""" + assert "_approval_sse_subscribers.pop(session_id, None)" in ROUTES_SRC, \ + "_approval_sse_unsubscribe must pop session_id when subscriber list is empty" + + +class TestFrontendSSEImplementation: + """Verify the frontend JavaScript SSE implementation.""" + + def test_eventsource_used(self): + """Frontend must use EventSource for SSE connection.""" + assert "new EventSource(" in MESSAGES_JS, \ + "startApprovalPolling must create an EventSource for SSE" + + def test_sse_url_matches_backend(self): + """Frontend SSE URL must match backend /api/approval/stream route.""" + assert "/api/approval/stream" in MESSAGES_JS, \ + "EventSource must connect to /api/approval/stream" + + def test_initial_event_listener(self): + """Frontend must listen for 'initial' SSE events.""" + assert "'initial'" in MESSAGES_JS or '"initial"' in MESSAGES_JS, \ + "Frontend must addEventListener for 'initial' SSE events" + + def test_approval_event_listener(self): + """Frontend must listen for 'approval' SSE events.""" + assert "'approval'" in MESSAGES_JS or '"approval"' in MESSAGES_JS, \ + "Frontend must addEventListener for 'approval' SSE events" + + def test_onerror_fallback_to_polling(self): + """onerror must fall back to HTTP polling.""" + assert "_startApprovalFallbackPoll" in MESSAGES_JS, \ + "SSE onerror handler must call _startApprovalFallbackPoll" + + def test_fallback_poll_interval(self): + """Fallback polling interval must match v0.50.247's 1500ms cadence.""" + assert "1500" in MESSAGES_JS, \ + "Fallback polling interval must be 1500ms to match degraded-mode parity with v0.50.247" + + def test_fallback_closes_eventsource(self): + """onerror handler must close the EventSource before falling back.""" + # The onerror handler should call es.close() + assert "es.close()" in MESSAGES_JS, \ + "onerror handler must close the EventSource before falling back" + + def test_stop_closes_eventsource(self): + """stopApprovalPolling must close EventSource.""" + assert "_approvalEventSource.close()" in MESSAGES_JS, \ + "stopApprovalPolling must close _approvalEventSource" + + def test_health_timer_cleanup(self): + """stopApprovalPolling must clear the SSE health timer.""" + assert "_approvalSSEHealthTimer" in MESSAGES_JS, \ + "SSE health timer must be tracked and cleared in stopApprovalPolling" + + +# ═══════════════════════════════════════════════════════════════════════════════ +# 2. Unit tests (in-process, no HTTP server) +# ═══════════════════════════════════════════════════════════════════════════════ + +class TestSSESubscribeUnsubscribe: + """Test the subscribe/unsubscribe lifecycle.""" + + def setup_method(self): + """Clean SSE subscriber state before each test.""" + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + + def teardown_method(self): + """Clean up after each test.""" + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + + def test_subscribe_returns_queue(self): + """_approval_sse_subscribe must return a Queue.""" + from api import routes as r + sid = f"sse-test-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + assert isinstance(q, queue.Queue), "subscribe must return a queue.Queue" + # Cleanup + r._approval_sse_unsubscribe(sid, q) + + def test_subscribe_registers_subscriber(self): + """After subscribe, the queue must appear in _approval_sse_subscribers.""" + from api import routes as r + sid = f"sse-reg-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + try: + with r._lock: + subs = r._approval_sse_subscribers.get(sid, []) + assert q in subs, "Subscribed queue must be in the subscribers list" + finally: + r._approval_sse_unsubscribe(sid, q) + + def test_unsubscribe_removes_queue(self): + """After unsubscribe, the queue must not be in the subscribers list.""" + from api import routes as r + sid = f"sse-unsub-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + r._approval_sse_unsubscribe(sid, q) + with r._lock: + subs = r._approval_sse_subscribers.get(sid, []) + assert q not in subs, "Unsubscribed queue must not be in the list" + + def test_unsubscribe_removes_empty_session_key(self): + """When the last subscriber is removed, the session key must be cleaned up.""" + from api import routes as r + sid = f"sse-empty-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + r._approval_sse_unsubscribe(sid, q) + with r._lock: + assert sid not in r._approval_sse_subscribers, \ + "Session key must be removed when subscriber list is empty" + + def test_unsubscribe_idempotent(self): + """Unsubscribing twice must not raise.""" + from api import routes as r + sid = f"sse-idem-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + r._approval_sse_unsubscribe(sid, q) + r._approval_sse_unsubscribe(sid, q) # should not raise + + def test_unsubscribe_unknown_queue_noop(self): + """Unsubscribing a queue that was never subscribed must not crash.""" + from api import routes as r + sid = f"sse-noop-{uuid.uuid4().hex[:8]}" + q = queue.Queue() + r._approval_sse_unsubscribe(sid, q) # should not raise + + +class TestSSENotify: + """Test the notification mechanism.""" + + def setup_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + + def teardown_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + + def test_notify_delivers_payload(self): + """_approval_sse_notify must put the payload on subscriber queues.""" + from api import routes as r + sid = f"sse-notify-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + try: + entry = {"command": "rm -rf /tmp/test", "pattern_key": "delete"} + r._approval_sse_notify(sid, entry, 1) + payload = q.get(timeout=1) + assert payload["pending"]["command"] == "rm -rf /tmp/test" + assert payload["pending_count"] == 1 + finally: + r._approval_sse_unsubscribe(sid, q) + + def test_notify_multiple_subscribers(self): + """All subscribers for a session must receive the notification.""" + from api import routes as r + sid = f"sse-multi-{uuid.uuid4().hex[:8]}" + q1 = r._approval_sse_subscribe(sid) + q2 = r._approval_sse_subscribe(sid) + q3 = r._approval_sse_subscribe(sid) + try: + entry = {"command": "test-cmd"} + r._approval_sse_notify(sid, entry, 2) + for q in [q1, q2, q3]: + payload = q.get(timeout=1) + assert payload["pending"]["command"] == "test-cmd" + assert payload["pending_count"] == 2 + finally: + for q in [q1, q2, q3]: + r._approval_sse_unsubscribe(sid, q) + + def test_notify_cross_session_isolation(self): + """Notify for session A must NOT deliver to session B subscribers.""" + from api import routes as r + sid_a = f"sse-iso-a-{uuid.uuid4().hex[:8]}" + sid_b = f"sse-iso-b-{uuid.uuid4().hex[:8]}" + qa = r._approval_sse_subscribe(sid_a) + qb = r._approval_sse_subscribe(sid_b) + try: + entry = {"command": "only-for-a"} + r._approval_sse_notify(sid_a, entry, 1) + # qa should have the event + payload = qa.get(timeout=1) + assert payload["pending"]["command"] == "only-for-a" + # qb should be empty + assert qb.empty(), "Session B subscriber must not receive session A events" + finally: + r._approval_sse_unsubscribe(sid_a, qa) + r._approval_sse_unsubscribe(sid_b, qb) + + def test_notify_no_subscribers_is_noop(self): + """Notifying a session with no subscribers must not raise.""" + from api import routes as r + sid = f"sse-nosub-{uuid.uuid4().hex[:8]}" + r._approval_sse_notify(sid, {"command": "test"}, 1) # should not raise + + def test_notify_drops_on_full_queue(self): + """When subscriber queue is full, events must be silently dropped.""" + from api import routes as r + sid = f"sse-full-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + try: + # Fill the queue (maxsize=16) + for i in range(20): + r._approval_sse_notify(sid, {"command": f"cmd-{i}"}, i + 1) + # Queue should have at most 16 items + assert q.qsize() <= 16, "Queue must not exceed maxsize" + assert q.qsize() > 0, "Queue should have some items" + finally: + r._approval_sse_unsubscribe(sid, q) + + +class TestSSENotifyFromSubmitPending: + """Test that submit_pending triggers SSE notifications.""" + + def setup_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def teardown_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def test_submit_pending_notifies_sse_subscriber(self): + """submit_pending must push an SSE event to subscribers.""" + from api import routes as r + sid = f"sse-submit-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + try: + r.submit_pending(sid, { + "command": "rm -rf /tmp/test", + "pattern_key": "recursive delete", + "pattern_keys": ["recursive delete"], + "description": "recursive delete", + }) + payload = q.get(timeout=1) + assert payload["pending"]["command"] == "rm -rf /tmp/test" + assert payload["pending_count"] == 1 + finally: + r._approval_sse_unsubscribe(sid, q) + + def test_submit_pending_delivers_count(self): + """Multiple submit_pending calls must report correct pending_count.""" + from api import routes as r + sid = f"sse-count-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + try: + for i in range(3): + r.submit_pending(sid, { + "command": f"cmd-{i}", + "pattern_key": f"p{i}", + "pattern_keys": [f"p{i}"], + "description": f"d{i}", + }) + for expected_count in [1, 2, 3]: + payload = q.get(timeout=1) + assert payload["pending_count"] == expected_count, \ + f"Expected pending_count={expected_count}, got {payload['pending_count']}" + finally: + r._approval_sse_unsubscribe(sid, q) + + +class TestSSEConcurrency: + """Test thread safety of SSE subscribe/unsubscribe/notify.""" + + def setup_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def teardown_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def test_concurrent_subscribe_unsubscribe(self): + """Concurrent subscribe/unsubscribe must not corrupt state.""" + from api import routes as r + sid = f"sse-conc-{uuid.uuid4().hex[:8]}" + errors = [] + queues = [] + + def worker(): + try: + for _ in range(50): + q = r._approval_sse_subscribe(sid) + queues.append(q) + r._approval_sse_unsubscribe(sid, q) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=worker) for _ in range(4)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + + assert not errors, f"Concurrent subscribe/unsubscribe errors: {errors}" + # After all threads finish, no queues should remain + with r._lock: + subs = r._approval_sse_subscribers.get(sid, []) + assert len(subs) == 0, "All subscribers should be cleaned up" + + def test_concurrent_notify_while_subscribing(self): + """Notify while new subscribers are joining must not deadlock or crash.""" + from api import routes as r + sid = f"sse-notsub-{uuid.uuid4().hex[:8]}" + errors = [] + + def notifier(): + try: + for i in range(100): + r._approval_sse_notify(sid, {"command": f"cmd-{i}"}, 1) + except Exception as e: + errors.append(e) + + def subscriber(): + try: + for _ in range(50): + q = r._approval_sse_subscribe(sid) + time.sleep(0.001) + r._approval_sse_unsubscribe(sid, q) + except Exception as e: + errors.append(e) + + threads = [ + threading.Thread(target=notifier), + threading.Thread(target=subscriber), + threading.Thread(target=subscriber), + ] + for t in threads: + t.start() + for t in threads: + t.join(timeout=15) + + assert not errors, f"Concurrent notify/subscribe errors: {errors}" + with r._lock: + r._approval_sse_subscribers.clear() diff --git a/tests/test_pr1350_sse_atomic_subscribe.py b/tests/test_pr1350_sse_atomic_subscribe.py new file mode 100644 index 00000000..fad3ea29 --- /dev/null +++ b/tests/test_pr1350_sse_atomic_subscribe.py @@ -0,0 +1,108 @@ +"""Test that the SSE subscribe + snapshot are taken atomically under _lock. + +Regression test for the snapshot/subscribe race condition: if subscribe +happens AFTER the snapshot, a submit_pending() that fires in the gap is +both appended to _pending (after our snapshot) AND notified to subscribers +(before we joined) — the client never learns about it until the next event. + +The fix in v0.50.248 takes the lock once, registers the subscriber queue, +THEN reads the snapshot — all under the same lock acquisition. + +This test verifies the source-level invariant rather than the runtime +behavior: the subscriber-registration line MUST appear inside the same +`with _lock:` block as the snapshot read, and BEFORE the snapshot read. +""" + +import pathlib +import sys + +REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve() +sys.path.insert(0, str(REPO_ROOT)) + +ROUTES_SRC = (REPO_ROOT / "api" / "routes.py").read_text(encoding="utf-8") + + +def _extract_lock_block(body: str) -> str: + """Extract the body of the first `with _lock:` block from the handler. + + Lines are part of the block as long as they are blank or start with the + block's indent (>= 8 spaces, since the handler body itself is at 4 spaces + and the lock block indents one level deeper). + """ + lines = body.split("\n") + out: list[str] = [] + in_block = False + block_indent = None + for line in lines: + if not in_block: + if line.strip() == "with _lock:": + in_block = True + continue + # Determine block indent from the first non-empty line we see inside. + if block_indent is None: + stripped = line.lstrip(" ") + if stripped == "": + continue # blank lines don't set indent + block_indent = len(line) - len(stripped) + out.append(line) + continue + # Continuation: blank lines OK, otherwise must be at >= block_indent. + if line.strip() == "": + out.append(line) + continue + line_indent = len(line) - len(line.lstrip(" ")) + if line_indent >= block_indent: + out.append(line) + else: + break + return "\n".join(out) + + +def _handler_body() -> str: + start = ROUTES_SRC.find("def _handle_approval_sse_stream(") + assert start != -1, "_handle_approval_sse_stream must exist" + end = ROUTES_SRC.find("\ndef ", start + 1) + return ROUTES_SRC[start:end if end != -1 else len(ROUTES_SRC)] + + +def test_snapshot_taken_under_lock(): + """The initial _pending snapshot must be guarded by `with _lock:`.""" + lock_body = _extract_lock_block(_handler_body()) + assert lock_body, "_handle_approval_sse_stream must contain a `with _lock:` block" + assert "_pending.get(sid)" in lock_body, \ + "Initial snapshot of _pending must be read inside the `with _lock:` block" + + +def test_subscriber_registered_inside_lock(): + """The subscriber queue must be registered inside the same `with _lock:` block.""" + lock_body = _extract_lock_block(_handler_body()) + assert lock_body, "Handler must contain a `with _lock:` block" + assert "_approval_sse_subscribers" in lock_body and "append(q)" in lock_body, \ + ("Subscriber registration (`_approval_sse_subscribers.setdefault(sid, []).append(q)`) " + "must happen inside the same `with _lock:` block as the snapshot. " + "Otherwise a submit_pending() between snapshot-and-subscribe is lost.") + + +def test_subscribe_before_snapshot_in_lock(): + """Inside the lock, the subscriber must be registered BEFORE reading the snapshot.""" + lock_body = _extract_lock_block(_handler_body()) + assert lock_body, "Handler must contain a `with _lock:` block" + + sub_idx = lock_body.find("_approval_sse_subscribers") + snap_idx = lock_body.find("_pending.get(sid)") + + assert sub_idx != -1, "Subscriber registration must be inside the lock" + assert snap_idx != -1, "Snapshot read must be inside the lock" + assert sub_idx < snap_idx, ( + "Subscriber registration must come BEFORE the snapshot read inside the lock. " + "Otherwise an approval arriving between subscribe and snapshot is silently dropped." + ) + + +def test_no_double_subscribe_outside_lock(): + """The handler must not also call `_approval_sse_subscribe()` (legacy code path).""" + body = _handler_body() + assert "= _approval_sse_subscribe(sid)" not in body, ( + "_handle_approval_sse_stream must not call _approval_sse_subscribe() — " + "the atomic version inlines subscribe inside the snapshot lock block." + ) diff --git a/tests/test_pr1350_sse_notify_correctness.py b/tests/test_pr1350_sse_notify_correctness.py new file mode 100644 index 00000000..ead05e61 --- /dev/null +++ b/tests/test_pr1350_sse_notify_correctness.py @@ -0,0 +1,259 @@ +"""Regression tests for the v0.50.248 SSE notify-on-respond fix. + +Originally PR #1350 only called `_approval_sse_notify` from `submit_pending`, +not from `_handle_approval_respond`. With the parallel-tool-call scenario +that PR #527 supports: + + 1. submit_pending(A) -> SSE pushes (A, 1). UI shows A. + 2. submit_pending(B) -> SSE pushes (B, 2). UI shows B. (bug: was sending B as head, not A) + 3. respond(B) -> queue still contains A. UI hides card. + NO event fires. A is invisible until next event. + +Pre-release Opus review caught two MUST-FIX bugs: + A. notify-ordering race: notify outside _lock could deliver out-of-order + under contention. + C. Trailing approval lost: respond never re-emitted the new head. + D. Payload was tail-not-head: with #527 parallel approvals, /api/approval/pending + returns head, but SSE was returning the just-appended entry (tail). + +The fix: + - `_approval_sse_notify_locked(sid, head, total)` runs inside the caller's + held `_lock` so two parallel callers serialize their notifies in the same + order they serialize their queue mutations. + - submit_pending now passes `head = queue_list[0]` (head-of-queue), not the + just-appended entry. + - _handle_approval_respond now calls _approval_sse_notify_locked after the + pop with the new head (or None/0 if queue is empty). +""" + +import pathlib +import queue +import sys +import time +import uuid + +REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve() +sys.path.insert(0, str(REPO_ROOT)) + + +class TestParallelApprovalsHeadFidelity: + """SSE payload must always reflect head-of-queue, never tail.""" + + def setup_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def teardown_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def test_second_submit_pending_sends_head_not_tail(self): + """When B is appended while A is still pending, SSE payload must show A as head.""" + from api import routes as r + sid = f"sse-headtail-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + try: + r.submit_pending(sid, { + "command": "first-A", + "pattern_key": "first", + "pattern_keys": ["first"], + "description": "A", + }) + r.submit_pending(sid, { + "command": "second-B", + "pattern_key": "second", + "pattern_keys": ["second"], + "description": "B", + }) + # First payload should be A (just-appended, also head). + p1 = q.get(timeout=1) + assert p1["pending"]["command"] == "first-A" + assert p1["pending_count"] == 1 + # Second payload's HEAD is still A (we appended B but A is still queued). + p2 = q.get(timeout=1) + assert p2["pending"]["command"] == "first-A", ( + "SSE payload must show head-of-queue (A), not tail (B). " + f"Got: {p2['pending']['command']}" + ) + assert p2["pending_count"] == 2 + finally: + r._approval_sse_unsubscribe(sid, q) + + +class TestRespondNotifiesTrailingApproval: + """After respond() pops one approval, SSE must re-emit the new head if any.""" + + def setup_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def teardown_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def test_respond_to_first_pushes_second_as_new_head(self): + """submit A; submit B; respond(A) -> SSE must push (B, 1) so the UI surfaces B.""" + from api import routes as r + sid = f"sse-trailing-{uuid.uuid4().hex[:8]}" + + # Subscribe BEFORE any submit so we capture all events deterministically. + sub_q = r._approval_sse_subscribe(sid) + try: + r.submit_pending(sid, { + "command": "first-A", + "pattern_key": "p1", + "pattern_keys": ["p1"], + "description": "A", + "approval_id": "id-A", + }) + r.submit_pending(sid, { + "command": "second-B", + "pattern_key": "p2", + "pattern_keys": ["p2"], + "description": "B", + "approval_id": "id-B", + }) + # Drain the two submit-driven events. + sub_q.get(timeout=1) # head=A, total=1 + sub_q.get(timeout=1) # head=A, total=2 (head is still A) + + # Now simulate respond(A) by directly invoking the lock+pop+notify + # sequence the route handler runs. (Calling _handle_approval_respond + # would require an HTTP handler mock; the inner sequence is what we + # need to verify.) + from api.routes import _approval_sse_notify_locked, _lock, _pending + with _lock: + qlist = _pending.get(sid) + # Pop A by approval_id + for i, e in enumerate(qlist): + if e.get("approval_id") == "id-A": + qlist.pop(i) + break + # Re-emit head + if isinstance(_pending.get(sid), list) and _pending[sid]: + _approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid])) + else: + _approval_sse_notify_locked(sid, None, 0) + + # SSE must push (B, 1) so the UI surfaces the trailing approval. + p3 = sub_q.get(timeout=1) + assert p3["pending"] is not None, \ + "After responding to A, SSE must emit the new head B (not None)" + assert p3["pending"]["command"] == "second-B", \ + f"New head should be B, got: {p3['pending']['command']}" + assert p3["pending_count"] == 1 + finally: + r._approval_sse_unsubscribe(sid, sub_q) + + def test_respond_to_only_pending_pushes_empty_state(self): + """If respond pops the last entry, SSE must push a None/0 sentinel so UI hides card.""" + from api import routes as r + sid = f"sse-empty-{uuid.uuid4().hex[:8]}" + + sub_q = r._approval_sse_subscribe(sid) + try: + r.submit_pending(sid, { + "command": "only-A", + "pattern_key": "p", + "pattern_keys": ["p"], + "description": "A", + "approval_id": "id-only-A", + }) + sub_q.get(timeout=1) # drain submit notify + + from api.routes import _approval_sse_notify_locked, _lock, _pending + with _lock: + qlist = _pending.get(sid) + for i, e in enumerate(qlist): + if e.get("approval_id") == "id-only-A": + qlist.pop(i) + break + if not qlist: + _pending.pop(sid, None) + if isinstance(_pending.get(sid), list) and _pending[sid]: + _approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid])) + else: + _approval_sse_notify_locked(sid, None, 0) + + payload = sub_q.get(timeout=1) + assert payload["pending"] is None, \ + "After responding to the only approval, SSE must push pending=None" + assert payload["pending_count"] == 0 + finally: + r._approval_sse_unsubscribe(sid, sub_q) + + +class TestNotifyOrderUnderContention: + """Two parallel submit_pending callers must deliver in queue-mutation order.""" + + def setup_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def teardown_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def test_pending_count_is_monotonic_under_contention(self): + """Under parallel submit_pending, pending_count must be monotonically increasing. + + Pre-fix: notify outside _lock meant T2's notify could fire before T1's, + with subscribers seeing pending_count=2 then pending_count=1. Now that + notify runs inside _lock alongside the append, the order is guaranteed. + """ + import threading + from api import routes as r + sid = f"sse-order-{uuid.uuid4().hex[:8]}" + + sub_q = r._approval_sse_subscribe(sid) + try: + errors = [] + barrier = threading.Barrier(8) + + def submitter(idx): + try: + barrier.wait(timeout=2) + r.submit_pending(sid, { + "command": f"cmd-{idx}", + "pattern_key": f"p{idx}", + "pattern_keys": [f"p{idx}"], + "description": f"d{idx}", + }) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=submitter, args=(i,)) for i in range(8)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=5) + + assert not errors, f"Submitter errors: {errors}" + + # Drain payloads — count must go 1, 2, 3, ..., 8 in some order + # consistent with the queue serialization. Specifically, never decrease. + counts = [] + for _ in range(8): + p = sub_q.get(timeout=2) + counts.append(p["pending_count"]) + + assert counts == sorted(counts), ( + f"pending_count must be monotonically increasing under contention. " + f"Got: {counts}. Pre-fix this could be out-of-order." + ) + assert counts == list(range(1, 9)), \ + f"Expected [1..8], got {counts}" + finally: + r._approval_sse_unsubscribe(sid, sub_q)