diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cadce58..c3b0a9ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ ## [v0.50.248] — 2026-04-30 ### Added -- **Real-time approval notifications via SSE long-connection** — replaces the 1.5s HTTP polling loop with a Server-Sent Events endpoint at `/api/approval/stream?session_id=` that pushes approval events to the browser the instant they fire. Cuts approval latency from up to 1.5s down to near-instant and eliminates the "always polling" network noise users observed. Backend uses a thread-safe subscriber registry (`_approval_sse_subscribers` dict, bounded `queue.Queue(maxsize=16)` per subscriber, silent drop on full to prevent leaks from slow tabs). 30s keepalive comments prevent proxy/CDN timeouts; `_CLIENT_DISCONNECT_ERRORS` + `finally` block guarantee subscriber cleanup on any exit path. **Subscribe and snapshot are taken atomically under a single `_lock` acquisition** so a `submit_pending()` arriving in the gap can't be lost. Frontend uses `EventSource` with automatic 3s HTTP polling fallback on `onerror`. 42 new tests cover wiring, lifecycle, multi-subscriber, cross-session isolation, queue overflow, and concurrent subscribe/notify stress. (`api/routes.py`, `static/messages.js`, `tests/test_approval_sse.py`) @fxd-jason — PR #1350 +- **Real-time approval notifications via SSE long-connection** — replaces the 1.5s HTTP polling loop with a Server-Sent Events endpoint at `/api/approval/stream?session_id=` that pushes approval events to the browser the instant they fire. Cuts approval latency from up to 1.5s down to near-instant and eliminates the "always polling" network noise users observed. Backend uses a thread-safe subscriber registry (`_approval_sse_subscribers` dict, bounded `queue.Queue(maxsize=16)` per subscriber, silent drop on full to prevent leaks from slow tabs). 30s keepalive comments prevent proxy/CDN timeouts; `_CLIENT_DISCONNECT_ERRORS` + `finally` block guarantee subscriber cleanup on any exit path. **Subscribe and snapshot are taken atomically under a single `_lock` acquisition** so a `submit_pending()` arriving in the gap can't be lost. **Notify runs inside the queue-mutation lock** in both `submit_pending` and `_handle_approval_respond` so two parallel callers can't deliver out-of-order with stale `pending_count`. **SSE payload always reflects head-of-queue, never tail**, matching `/api/approval/pending`'s contract — with parallel tool-call approvals (#527), the just-appended entry is at the tail but the UI must show the head. **`_handle_approval_respond` now re-emits the new head after popping** so a trailing approval queued behind the one being responded to is surfaced immediately instead of getting stuck until the next event. Frontend uses `EventSource` with automatic 1.5s HTTP polling fallback on `onerror` (preserves degraded-mode parity with v0.50.247). 50 tests cover wiring, lifecycle, multi-subscriber, cross-session isolation, queue overflow, concurrent subscribe/notify stress, atomic-lock invariants, head-fidelity, trailing-approval re-emission, and notify-order monotonicity. (`api/routes.py`, `static/messages.js`, `tests/test_approval_sse.py`, `tests/test_pr1350_sse_atomic_subscribe.py`, `tests/test_pr1350_sse_notify_correctness.py`) @fxd-jason — PR #1350 ### Fixed - **Context indicator percentage shows even without explicit `context_length`** — frontend companion to the v0.50.246 backend fix. The context ring used to display `·` (no data) whenever `context_length` was 0 or missing — fresh agents, interrupted streams, or models without compressor state. Now defaults to **128K** when `usage.context_length` is falsy and labels the indicator with `(est. 128K)` so users can tell apparent vs. measured. Falls back to `input_tokens` for `last_prompt_tokens` so the ring lights up immediately on the first user message. (`static/ui.js`) @fxd-jason — PR #1349 diff --git a/api/routes.py b/api/routes.py index 83f439d2..bc2ac1d0 100644 --- a/api/routes.py +++ b/api/routes.py @@ -567,16 +567,43 @@ def _approval_sse_unsubscribe(session_id: str, q: queue.Queue) -> None: _approval_sse_subscribers.pop(session_id, None) -def _approval_sse_notify(session_id: str, entry: dict, total: int) -> None: - """Push an approval event to all SSE subscribers for a session.""" - payload = {"pending": dict(entry), "pending_count": total} - with _lock: - subs = list(_approval_sse_subscribers.get(session_id, [])) +def _approval_sse_notify_locked(session_id: str, head: dict | None, total: int) -> None: + """Push an approval event to all SSE subscribers for a session. + + CALLER MUST HOLD `_lock`. Snapshots the subscriber list under the held + lock and then calls `q.put_nowait()` on each (which is itself thread-safe). + + `head` is the approval entry currently at the head of the queue (the one + the UI should display) — NOT the just-appended entry. With multiple + parallel approvals (#527), the just-appended entry is at the TAIL, but + `/api/approval/pending` always returns the HEAD, so SSE must match. + + `total` is the total number of pending approvals. + + Pass `head=None` and `total=0` when the queue has just been emptied (e.g. + `_handle_approval_respond` popped the last entry) so the client knows to + hide its approval card. + """ + payload = {"pending": dict(head) if head else None, "pending_count": total} + subs = _approval_sse_subscribers.get(session_id, ()) for q in subs: try: q.put_nowait(payload) except queue.Full: - pass # drop if subscriber is slow + pass # drop if subscriber is slow (bounded queue prevents memory leak) + + +def _approval_sse_notify(session_id: str, head: dict | None, total: int) -> None: + """Convenience wrapper that takes `_lock` itself. + + Use only from contexts that don't already hold `_lock`. Production call + sites (submit_pending, _handle_approval_respond) MUST hold the lock and + call `_approval_sse_notify_locked` directly to avoid a notify-ordering + race where a later append's notify can fire before an earlier append's + notify (resulting in stale `pending_count`). + """ + with _lock: + _approval_sse_notify_locked(session_id, head, total) def submit_pending(session_key: str, approval: dict) -> None: @@ -591,22 +618,24 @@ def submit_pending(session_key: str, approval: dict) -> None: """ entry = dict(approval) entry.setdefault("approval_id", uuid.uuid4().hex) - total = 0 with _lock: - queue = _pending.setdefault(session_key, []) + queue_list = _pending.setdefault(session_key, []) # Replace a legacy non-list value if the agent version uses the old pattern. - if not isinstance(queue, list): - _pending[session_key] = [queue] - queue = _pending[session_key] - queue.append(entry) - total = len(queue) + if not isinstance(queue_list, list): + _pending[session_key] = [queue_list] + queue_list = _pending[session_key] + queue_list.append(entry) + total = len(queue_list) + head = queue_list[0] # /api/approval/pending always returns head + # Push to SSE subscribers from inside _lock so two parallel + # submit_pending calls can't deliver out-of-order (T2's later + # notify arriving before T1's earlier notify with a stale count). + _approval_sse_notify_locked(session_key, head, total) # NOTE: We do NOT call _submit_pending_raw here — that function overwrites # _pending[session_key] with a single dict, which would undo the list we just # built. The gateway blocking path uses _gateway_queues (a separate mechanism # managed by check_all_command_guards / register_gateway_notify), which is # unaffected by _pending. The _pending dict is only used for UI polling. - # Push to SSE subscribers so long-connected clients get notified instantly. - _approval_sse_notify(session_key, entry, total) # Clarify prompts (optional -- graceful fallback if agent not available) try: @@ -3885,6 +3914,16 @@ def _handle_approval_respond(handler, body): elif queue: # Legacy single-dict value. pending = _pending.pop(sid, None) + # Notify SSE subscribers of the new head (or empty state) so the UI + # surfaces any trailing approvals that were queued behind this one + # without waiting for the next submit_pending. Without this, a parallel + # tool-call scenario (#527) would leave the second approval invisible + # in the SSE path until the next event ever fired (the agent thread + # would be parked indefinitely from the user's perspective). + if isinstance(_pending.get(sid), list) and _pending[sid]: + _approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid])) + else: + _approval_sse_notify_locked(sid, None, 0) if pending: keys = pending.get("pattern_keys") or [pending.get("pattern_key", "")] diff --git a/static/messages.js b/static/messages.js index 3dffbf0d..c4f92d9f 100644 --- a/static/messages.js +++ b/static/messages.js @@ -1367,7 +1367,7 @@ function _startApprovalFallbackPoll(sid) { if (data.pending) { data.pending._session_id=sid; showApprovalCard(data.pending, data.pending_count||1); } else { hideApprovalCard(); } } catch(e) { /* ignore poll errors */ } - }, 3000); // 3s fallback interval (was 1.5s when it was the primary path) + }, 1500); // matches the v0.50.247 polling cadence so degraded-mode users see the same responsiveness } function stopApprovalPolling() { diff --git a/tests/test_approval_queue.py b/tests/test_approval_queue.py index fbcac019..5d94e828 100644 --- a/tests/test_approval_queue.py +++ b/tests/test_approval_queue.py @@ -23,8 +23,8 @@ INDEX_HTML = (REPO_ROOT / "static" / "index.html").read_text(encoding="utf-8") def test_submit_pending_appends_to_list(): """submit_pending() must append to a list, not overwrite.""" - # The new wrapper must contain queue.append - assert "queue.append(entry)" in ROUTES_SRC, \ + # The new wrapper must contain a queue append (list mutation pattern) + assert "queue_list.append(entry)" in ROUTES_SRC or "queue.append(entry)" in ROUTES_SRC, \ "submit_pending() must append entry to a list queue, not overwrite _pending[sid]" diff --git a/tests/test_approval_sse.py b/tests/test_approval_sse.py index a3f9e233..89670747 100644 --- a/tests/test_approval_sse.py +++ b/tests/test_approval_sse.py @@ -90,9 +90,15 @@ class TestSSEStaticAnalysis: "SSE handler must send 'approval' events when pushing notifications" def test_notify_called_from_submit_pending(self): - """submit_pending must call _approval_sse_notify.""" - assert "_approval_sse_notify(session_key, entry, total)" in ROUTES_SRC, \ - "submit_pending() must call _approval_sse_notify() to push SSE events" + """submit_pending must call _approval_sse_notify_locked.""" + # Pinned to the inner-lock variant: must run inside the same `with _lock:` + # block as the queue mutation so two parallel submit_pending calls can't + # deliver out-of-order with stale pending_count. Tracks the v0.50.248 + # MUST-FIX A fix. + assert "_approval_sse_notify_locked(session_key, head, total)" in ROUTES_SRC, \ + ("submit_pending() must call _approval_sse_notify_locked(session_key, head, total) " + "from inside the `with _lock:` block — not the unlocked _approval_sse_notify wrapper, " + "and head must be queue_list[0] (the head, not the just-appended entry).") def test_unsubscribe_in_finally(self): """SSE handler must unsubscribe in a finally block.""" @@ -164,9 +170,9 @@ class TestFrontendSSEImplementation: "SSE onerror handler must call _startApprovalFallbackPoll" def test_fallback_poll_interval(self): - """Fallback polling interval must be reasonable (3s).""" - assert "3000" in MESSAGES_JS, \ - "Fallback polling interval must be set (3000ms)" + """Fallback polling interval must match v0.50.247's 1500ms cadence.""" + assert "1500" in MESSAGES_JS, \ + "Fallback polling interval must be 1500ms to match degraded-mode parity with v0.50.247" def test_fallback_closes_eventsource(self): """onerror handler must close the EventSource before falling back.""" diff --git a/tests/test_pr1350_sse_notify_correctness.py b/tests/test_pr1350_sse_notify_correctness.py new file mode 100644 index 00000000..ead05e61 --- /dev/null +++ b/tests/test_pr1350_sse_notify_correctness.py @@ -0,0 +1,259 @@ +"""Regression tests for the v0.50.248 SSE notify-on-respond fix. + +Originally PR #1350 only called `_approval_sse_notify` from `submit_pending`, +not from `_handle_approval_respond`. With the parallel-tool-call scenario +that PR #527 supports: + + 1. submit_pending(A) -> SSE pushes (A, 1). UI shows A. + 2. submit_pending(B) -> SSE pushes (B, 2). UI shows B. (bug: was sending B as head, not A) + 3. respond(B) -> queue still contains A. UI hides card. + NO event fires. A is invisible until next event. + +Pre-release Opus review caught two MUST-FIX bugs: + A. notify-ordering race: notify outside _lock could deliver out-of-order + under contention. + C. Trailing approval lost: respond never re-emitted the new head. + D. Payload was tail-not-head: with #527 parallel approvals, /api/approval/pending + returns head, but SSE was returning the just-appended entry (tail). + +The fix: + - `_approval_sse_notify_locked(sid, head, total)` runs inside the caller's + held `_lock` so two parallel callers serialize their notifies in the same + order they serialize their queue mutations. + - submit_pending now passes `head = queue_list[0]` (head-of-queue), not the + just-appended entry. + - _handle_approval_respond now calls _approval_sse_notify_locked after the + pop with the new head (or None/0 if queue is empty). +""" + +import pathlib +import queue +import sys +import time +import uuid + +REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve() +sys.path.insert(0, str(REPO_ROOT)) + + +class TestParallelApprovalsHeadFidelity: + """SSE payload must always reflect head-of-queue, never tail.""" + + def setup_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def teardown_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def test_second_submit_pending_sends_head_not_tail(self): + """When B is appended while A is still pending, SSE payload must show A as head.""" + from api import routes as r + sid = f"sse-headtail-{uuid.uuid4().hex[:8]}" + q = r._approval_sse_subscribe(sid) + try: + r.submit_pending(sid, { + "command": "first-A", + "pattern_key": "first", + "pattern_keys": ["first"], + "description": "A", + }) + r.submit_pending(sid, { + "command": "second-B", + "pattern_key": "second", + "pattern_keys": ["second"], + "description": "B", + }) + # First payload should be A (just-appended, also head). + p1 = q.get(timeout=1) + assert p1["pending"]["command"] == "first-A" + assert p1["pending_count"] == 1 + # Second payload's HEAD is still A (we appended B but A is still queued). + p2 = q.get(timeout=1) + assert p2["pending"]["command"] == "first-A", ( + "SSE payload must show head-of-queue (A), not tail (B). " + f"Got: {p2['pending']['command']}" + ) + assert p2["pending_count"] == 2 + finally: + r._approval_sse_unsubscribe(sid, q) + + +class TestRespondNotifiesTrailingApproval: + """After respond() pops one approval, SSE must re-emit the new head if any.""" + + def setup_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def teardown_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def test_respond_to_first_pushes_second_as_new_head(self): + """submit A; submit B; respond(A) -> SSE must push (B, 1) so the UI surfaces B.""" + from api import routes as r + sid = f"sse-trailing-{uuid.uuid4().hex[:8]}" + + # Subscribe BEFORE any submit so we capture all events deterministically. + sub_q = r._approval_sse_subscribe(sid) + try: + r.submit_pending(sid, { + "command": "first-A", + "pattern_key": "p1", + "pattern_keys": ["p1"], + "description": "A", + "approval_id": "id-A", + }) + r.submit_pending(sid, { + "command": "second-B", + "pattern_key": "p2", + "pattern_keys": ["p2"], + "description": "B", + "approval_id": "id-B", + }) + # Drain the two submit-driven events. + sub_q.get(timeout=1) # head=A, total=1 + sub_q.get(timeout=1) # head=A, total=2 (head is still A) + + # Now simulate respond(A) by directly invoking the lock+pop+notify + # sequence the route handler runs. (Calling _handle_approval_respond + # would require an HTTP handler mock; the inner sequence is what we + # need to verify.) + from api.routes import _approval_sse_notify_locked, _lock, _pending + with _lock: + qlist = _pending.get(sid) + # Pop A by approval_id + for i, e in enumerate(qlist): + if e.get("approval_id") == "id-A": + qlist.pop(i) + break + # Re-emit head + if isinstance(_pending.get(sid), list) and _pending[sid]: + _approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid])) + else: + _approval_sse_notify_locked(sid, None, 0) + + # SSE must push (B, 1) so the UI surfaces the trailing approval. + p3 = sub_q.get(timeout=1) + assert p3["pending"] is not None, \ + "After responding to A, SSE must emit the new head B (not None)" + assert p3["pending"]["command"] == "second-B", \ + f"New head should be B, got: {p3['pending']['command']}" + assert p3["pending_count"] == 1 + finally: + r._approval_sse_unsubscribe(sid, sub_q) + + def test_respond_to_only_pending_pushes_empty_state(self): + """If respond pops the last entry, SSE must push a None/0 sentinel so UI hides card.""" + from api import routes as r + sid = f"sse-empty-{uuid.uuid4().hex[:8]}" + + sub_q = r._approval_sse_subscribe(sid) + try: + r.submit_pending(sid, { + "command": "only-A", + "pattern_key": "p", + "pattern_keys": ["p"], + "description": "A", + "approval_id": "id-only-A", + }) + sub_q.get(timeout=1) # drain submit notify + + from api.routes import _approval_sse_notify_locked, _lock, _pending + with _lock: + qlist = _pending.get(sid) + for i, e in enumerate(qlist): + if e.get("approval_id") == "id-only-A": + qlist.pop(i) + break + if not qlist: + _pending.pop(sid, None) + if isinstance(_pending.get(sid), list) and _pending[sid]: + _approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid])) + else: + _approval_sse_notify_locked(sid, None, 0) + + payload = sub_q.get(timeout=1) + assert payload["pending"] is None, \ + "After responding to the only approval, SSE must push pending=None" + assert payload["pending_count"] == 0 + finally: + r._approval_sse_unsubscribe(sid, sub_q) + + +class TestNotifyOrderUnderContention: + """Two parallel submit_pending callers must deliver in queue-mutation order.""" + + def setup_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def teardown_method(self): + from api import routes as r + with r._lock: + r._approval_sse_subscribers.clear() + r._pending.clear() + + def test_pending_count_is_monotonic_under_contention(self): + """Under parallel submit_pending, pending_count must be monotonically increasing. + + Pre-fix: notify outside _lock meant T2's notify could fire before T1's, + with subscribers seeing pending_count=2 then pending_count=1. Now that + notify runs inside _lock alongside the append, the order is guaranteed. + """ + import threading + from api import routes as r + sid = f"sse-order-{uuid.uuid4().hex[:8]}" + + sub_q = r._approval_sse_subscribe(sid) + try: + errors = [] + barrier = threading.Barrier(8) + + def submitter(idx): + try: + barrier.wait(timeout=2) + r.submit_pending(sid, { + "command": f"cmd-{idx}", + "pattern_key": f"p{idx}", + "pattern_keys": [f"p{idx}"], + "description": f"d{idx}", + }) + except Exception as e: + errors.append(e) + + threads = [threading.Thread(target=submitter, args=(i,)) for i in range(8)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=5) + + assert not errors, f"Submitter errors: {errors}" + + # Drain payloads — count must go 1, 2, 3, ..., 8 in some order + # consistent with the queue serialization. Specifically, never decrease. + counts = [] + for _ in range(8): + p = sub_q.get(timeout=2) + counts.append(p["pending_count"]) + + assert counts == sorted(counts), ( + f"pending_count must be monotonically increasing under contention. " + f"Got: {counts}. Pre-fix this could be out-of-order." + ) + assert counts == list(range(1, 9)), \ + f"Expected [1..8], got {counts}" + finally: + r._approval_sse_unsubscribe(sid, sub_q)