fix(approval): close SSE notify-ordering, head-fidelity, and trailing-approval gaps (Opus MUST-FIX A/C/D)

Pre-release Opus review caught three correctness bugs in the original
PR #1350 SSE wiring beyond the snapshot/subscribe race:

A) **Notify-ordering race (MUST-FIX A):** _approval_sse_notify took _lock
   only for the subscriber-list snapshot, then released it before
   put_nowait. With two parallel submit_pending calls, T2's notify
   could fire before T1's, leaving the UI showing pending_count=1 while
   the server actually had 2 queued.

C) **Trailing approval lost (MUST-FIX C):** _handle_approval_respond
   never called _approval_sse_notify after popping. With parallel
   tool-call approvals (#527), a second approval queued behind the one
   being responded to was invisible until the next event ever fired —
   in practice, the agent thread parked on it would appear hung.

D) **Payload showed tail not head (MUST-FIX D):** payload built from
   the just-appended entry instead of queue[0]. /api/approval/pending
   returns the head; SSE returned the tail. Diverging contracts.

Fix:
- Split into _approval_sse_notify_locked (caller holds _lock, no
  internal locking) and _approval_sse_notify (convenience wrapper).
- submit_pending: call _locked variant inside the queue-mutation lock,
  passing queue_list[0] as head.
- _handle_approval_respond: call _locked variant inside the pop lock,
  passing the new head (or None/0 if queue is empty).
- Restore fallback poll to 1500ms (was bumped to 3000ms; degraded-mode
  parity with v0.50.247 is more important than save 1.5s of polling).

New regression tests in tests/test_pr1350_sse_notify_correctness.py:
- test_second_submit_pending_sends_head_not_tail (D)
- test_respond_to_first_pushes_second_as_new_head (C)
- test_respond_to_only_pending_pushes_empty_state (C edge)
- test_pending_count_is_monotonic_under_contention (A)

Updated test_approval_sse.py to pin the new contract:
- _approval_sse_notify_locked(session_key, head, total)
- 1500ms fallback interval

Total: 3411 tests passing.

Co-authored-by: jasonjcwu <jasonjcwu@users.noreply.github.com>
This commit is contained in:
nesquena-hermes
2026-04-30 18:45:15 +00:00
parent d6b9cfac23
commit e68f74ac99
6 changed files with 329 additions and 25 deletions
+1 -1
View File
@@ -5,7 +5,7 @@
## [v0.50.248] — 2026-04-30
### Added
- **Real-time approval notifications via SSE long-connection** — replaces the 1.5s HTTP polling loop with a Server-Sent Events endpoint at `/api/approval/stream?session_id=` that pushes approval events to the browser the instant they fire. Cuts approval latency from up to 1.5s down to near-instant and eliminates the "always polling" network noise users observed. Backend uses a thread-safe subscriber registry (`_approval_sse_subscribers` dict, bounded `queue.Queue(maxsize=16)` per subscriber, silent drop on full to prevent leaks from slow tabs). 30s keepalive comments prevent proxy/CDN timeouts; `_CLIENT_DISCONNECT_ERRORS` + `finally` block guarantee subscriber cleanup on any exit path. **Subscribe and snapshot are taken atomically under a single `_lock` acquisition** so a `submit_pending()` arriving in the gap can't be lost. Frontend uses `EventSource` with automatic 3s HTTP polling fallback on `onerror`. 42 new tests cover wiring, lifecycle, multi-subscriber, cross-session isolation, queue overflow, and concurrent subscribe/notify stress. (`api/routes.py`, `static/messages.js`, `tests/test_approval_sse.py`) @fxd-jason — PR #1350
- **Real-time approval notifications via SSE long-connection** — replaces the 1.5s HTTP polling loop with a Server-Sent Events endpoint at `/api/approval/stream?session_id=` that pushes approval events to the browser the instant they fire. Cuts approval latency from up to 1.5s down to near-instant and eliminates the "always polling" network noise users observed. Backend uses a thread-safe subscriber registry (`_approval_sse_subscribers` dict, bounded `queue.Queue(maxsize=16)` per subscriber, silent drop on full to prevent leaks from slow tabs). 30s keepalive comments prevent proxy/CDN timeouts; `_CLIENT_DISCONNECT_ERRORS` + `finally` block guarantee subscriber cleanup on any exit path. **Subscribe and snapshot are taken atomically under a single `_lock` acquisition** so a `submit_pending()` arriving in the gap can't be lost. **Notify runs inside the queue-mutation lock** in both `submit_pending` and `_handle_approval_respond` so two parallel callers can't deliver out-of-order with stale `pending_count`. **SSE payload always reflects head-of-queue, never tail**, matching `/api/approval/pending`'s contract — with parallel tool-call approvals (#527), the just-appended entry is at the tail but the UI must show the head. **`_handle_approval_respond` now re-emits the new head after popping** so a trailing approval queued behind the one being responded to is surfaced immediately instead of getting stuck until the next event. Frontend uses `EventSource` with automatic 1.5s HTTP polling fallback on `onerror` (preserves degraded-mode parity with v0.50.247). 50 tests cover wiring, lifecycle, multi-subscriber, cross-session isolation, queue overflow, concurrent subscribe/notify stress, atomic-lock invariants, head-fidelity, trailing-approval re-emission, and notify-order monotonicity. (`api/routes.py`, `static/messages.js`, `tests/test_approval_sse.py`, `tests/test_pr1350_sse_atomic_subscribe.py`, `tests/test_pr1350_sse_notify_correctness.py`) @fxd-jason — PR #1350
### Fixed
- **Context indicator percentage shows even without explicit `context_length`** — frontend companion to the v0.50.246 backend fix. The context ring used to display `·` (no data) whenever `context_length` was 0 or missing — fresh agents, interrupted streams, or models without compressor state. Now defaults to **128K** when `usage.context_length` is falsy and labels the indicator with `(est. 128K)` so users can tell apparent vs. measured. Falls back to `input_tokens` for `last_prompt_tokens` so the ring lights up immediately on the first user message. (`static/ui.js`) @fxd-jason — PR #1349
+54 -15
View File
@@ -567,16 +567,43 @@ def _approval_sse_unsubscribe(session_id: str, q: queue.Queue) -> None:
_approval_sse_subscribers.pop(session_id, None)
def _approval_sse_notify(session_id: str, entry: dict, total: int) -> None:
"""Push an approval event to all SSE subscribers for a session."""
payload = {"pending": dict(entry), "pending_count": total}
with _lock:
subs = list(_approval_sse_subscribers.get(session_id, []))
def _approval_sse_notify_locked(session_id: str, head: dict | None, total: int) -> None:
"""Push an approval event to all SSE subscribers for a session.
CALLER MUST HOLD `_lock`. Snapshots the subscriber list under the held
lock and then calls `q.put_nowait()` on each (which is itself thread-safe).
`head` is the approval entry currently at the head of the queue (the one
the UI should display) — NOT the just-appended entry. With multiple
parallel approvals (#527), the just-appended entry is at the TAIL, but
`/api/approval/pending` always returns the HEAD, so SSE must match.
`total` is the total number of pending approvals.
Pass `head=None` and `total=0` when the queue has just been emptied (e.g.
`_handle_approval_respond` popped the last entry) so the client knows to
hide its approval card.
"""
payload = {"pending": dict(head) if head else None, "pending_count": total}
subs = _approval_sse_subscribers.get(session_id, ())
for q in subs:
try:
q.put_nowait(payload)
except queue.Full:
pass # drop if subscriber is slow
pass # drop if subscriber is slow (bounded queue prevents memory leak)
def _approval_sse_notify(session_id: str, head: dict | None, total: int) -> None:
"""Convenience wrapper that takes `_lock` itself.
Use only from contexts that don't already hold `_lock`. Production call
sites (submit_pending, _handle_approval_respond) MUST hold the lock and
call `_approval_sse_notify_locked` directly to avoid a notify-ordering
race where a later append's notify can fire before an earlier append's
notify (resulting in stale `pending_count`).
"""
with _lock:
_approval_sse_notify_locked(session_id, head, total)
def submit_pending(session_key: str, approval: dict) -> None:
@@ -591,22 +618,24 @@ def submit_pending(session_key: str, approval: dict) -> None:
"""
entry = dict(approval)
entry.setdefault("approval_id", uuid.uuid4().hex)
total = 0
with _lock:
queue = _pending.setdefault(session_key, [])
queue_list = _pending.setdefault(session_key, [])
# Replace a legacy non-list value if the agent version uses the old pattern.
if not isinstance(queue, list):
_pending[session_key] = [queue]
queue = _pending[session_key]
queue.append(entry)
total = len(queue)
if not isinstance(queue_list, list):
_pending[session_key] = [queue_list]
queue_list = _pending[session_key]
queue_list.append(entry)
total = len(queue_list)
head = queue_list[0] # /api/approval/pending always returns head
# Push to SSE subscribers from inside _lock so two parallel
# submit_pending calls can't deliver out-of-order (T2's later
# notify arriving before T1's earlier notify with a stale count).
_approval_sse_notify_locked(session_key, head, total)
# NOTE: We do NOT call _submit_pending_raw here — that function overwrites
# _pending[session_key] with a single dict, which would undo the list we just
# built. The gateway blocking path uses _gateway_queues (a separate mechanism
# managed by check_all_command_guards / register_gateway_notify), which is
# unaffected by _pending. The _pending dict is only used for UI polling.
# Push to SSE subscribers so long-connected clients get notified instantly.
_approval_sse_notify(session_key, entry, total)
# Clarify prompts (optional -- graceful fallback if agent not available)
try:
@@ -3885,6 +3914,16 @@ def _handle_approval_respond(handler, body):
elif queue:
# Legacy single-dict value.
pending = _pending.pop(sid, None)
# Notify SSE subscribers of the new head (or empty state) so the UI
# surfaces any trailing approvals that were queued behind this one
# without waiting for the next submit_pending. Without this, a parallel
# tool-call scenario (#527) would leave the second approval invisible
# in the SSE path until the next event ever fired (the agent thread
# would be parked indefinitely from the user's perspective).
if isinstance(_pending.get(sid), list) and _pending[sid]:
_approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid]))
else:
_approval_sse_notify_locked(sid, None, 0)
if pending:
keys = pending.get("pattern_keys") or [pending.get("pattern_key", "")]
+1 -1
View File
@@ -1367,7 +1367,7 @@ function _startApprovalFallbackPoll(sid) {
if (data.pending) { data.pending._session_id=sid; showApprovalCard(data.pending, data.pending_count||1); }
else { hideApprovalCard(); }
} catch(e) { /* ignore poll errors */ }
}, 3000); // 3s fallback interval (was 1.5s when it was the primary path)
}, 1500); // matches the v0.50.247 polling cadence so degraded-mode users see the same responsiveness
}
function stopApprovalPolling() {
+2 -2
View File
@@ -23,8 +23,8 @@ INDEX_HTML = (REPO_ROOT / "static" / "index.html").read_text(encoding="utf-8")
def test_submit_pending_appends_to_list():
"""submit_pending() must append to a list, not overwrite."""
# The new wrapper must contain queue.append
assert "queue.append(entry)" in ROUTES_SRC, \
# The new wrapper must contain a queue append (list mutation pattern)
assert "queue_list.append(entry)" in ROUTES_SRC or "queue.append(entry)" in ROUTES_SRC, \
"submit_pending() must append entry to a list queue, not overwrite _pending[sid]"
+12 -6
View File
@@ -90,9 +90,15 @@ class TestSSEStaticAnalysis:
"SSE handler must send 'approval' events when pushing notifications"
def test_notify_called_from_submit_pending(self):
"""submit_pending must call _approval_sse_notify."""
assert "_approval_sse_notify(session_key, entry, total)" in ROUTES_SRC, \
"submit_pending() must call _approval_sse_notify() to push SSE events"
"""submit_pending must call _approval_sse_notify_locked."""
# Pinned to the inner-lock variant: must run inside the same `with _lock:`
# block as the queue mutation so two parallel submit_pending calls can't
# deliver out-of-order with stale pending_count. Tracks the v0.50.248
# MUST-FIX A fix.
assert "_approval_sse_notify_locked(session_key, head, total)" in ROUTES_SRC, \
("submit_pending() must call _approval_sse_notify_locked(session_key, head, total) "
"from inside the `with _lock:` block — not the unlocked _approval_sse_notify wrapper, "
"and head must be queue_list[0] (the head, not the just-appended entry).")
def test_unsubscribe_in_finally(self):
"""SSE handler must unsubscribe in a finally block."""
@@ -164,9 +170,9 @@ class TestFrontendSSEImplementation:
"SSE onerror handler must call _startApprovalFallbackPoll"
def test_fallback_poll_interval(self):
"""Fallback polling interval must be reasonable (3s)."""
assert "3000" in MESSAGES_JS, \
"Fallback polling interval must be set (3000ms)"
"""Fallback polling interval must match v0.50.247's 1500ms cadence."""
assert "1500" in MESSAGES_JS, \
"Fallback polling interval must be 1500ms to match degraded-mode parity with v0.50.247"
def test_fallback_closes_eventsource(self):
"""onerror handler must close the EventSource before falling back."""
+259
View File
@@ -0,0 +1,259 @@
"""Regression tests for the v0.50.248 SSE notify-on-respond fix.
Originally PR #1350 only called `_approval_sse_notify` from `submit_pending`,
not from `_handle_approval_respond`. With the parallel-tool-call scenario
that PR #527 supports:
1. submit_pending(A) -> SSE pushes (A, 1). UI shows A.
2. submit_pending(B) -> SSE pushes (B, 2). UI shows B. (bug: was sending B as head, not A)
3. respond(B) -> queue still contains A. UI hides card.
NO event fires. A is invisible until next event.
Pre-release Opus review caught two MUST-FIX bugs:
A. notify-ordering race: notify outside _lock could deliver out-of-order
under contention.
C. Trailing approval lost: respond never re-emitted the new head.
D. Payload was tail-not-head: with #527 parallel approvals, /api/approval/pending
returns head, but SSE was returning the just-appended entry (tail).
The fix:
- `_approval_sse_notify_locked(sid, head, total)` runs inside the caller's
held `_lock` so two parallel callers serialize their notifies in the same
order they serialize their queue mutations.
- submit_pending now passes `head = queue_list[0]` (head-of-queue), not the
just-appended entry.
- _handle_approval_respond now calls _approval_sse_notify_locked after the
pop with the new head (or None/0 if queue is empty).
"""
import pathlib
import queue
import sys
import time
import uuid
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
sys.path.insert(0, str(REPO_ROOT))
class TestParallelApprovalsHeadFidelity:
"""SSE payload must always reflect head-of-queue, never tail."""
def setup_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def teardown_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def test_second_submit_pending_sends_head_not_tail(self):
"""When B is appended while A is still pending, SSE payload must show A as head."""
from api import routes as r
sid = f"sse-headtail-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
try:
r.submit_pending(sid, {
"command": "first-A",
"pattern_key": "first",
"pattern_keys": ["first"],
"description": "A",
})
r.submit_pending(sid, {
"command": "second-B",
"pattern_key": "second",
"pattern_keys": ["second"],
"description": "B",
})
# First payload should be A (just-appended, also head).
p1 = q.get(timeout=1)
assert p1["pending"]["command"] == "first-A"
assert p1["pending_count"] == 1
# Second payload's HEAD is still A (we appended B but A is still queued).
p2 = q.get(timeout=1)
assert p2["pending"]["command"] == "first-A", (
"SSE payload must show head-of-queue (A), not tail (B). "
f"Got: {p2['pending']['command']}"
)
assert p2["pending_count"] == 2
finally:
r._approval_sse_unsubscribe(sid, q)
class TestRespondNotifiesTrailingApproval:
"""After respond() pops one approval, SSE must re-emit the new head if any."""
def setup_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def teardown_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def test_respond_to_first_pushes_second_as_new_head(self):
"""submit A; submit B; respond(A) -> SSE must push (B, 1) so the UI surfaces B."""
from api import routes as r
sid = f"sse-trailing-{uuid.uuid4().hex[:8]}"
# Subscribe BEFORE any submit so we capture all events deterministically.
sub_q = r._approval_sse_subscribe(sid)
try:
r.submit_pending(sid, {
"command": "first-A",
"pattern_key": "p1",
"pattern_keys": ["p1"],
"description": "A",
"approval_id": "id-A",
})
r.submit_pending(sid, {
"command": "second-B",
"pattern_key": "p2",
"pattern_keys": ["p2"],
"description": "B",
"approval_id": "id-B",
})
# Drain the two submit-driven events.
sub_q.get(timeout=1) # head=A, total=1
sub_q.get(timeout=1) # head=A, total=2 (head is still A)
# Now simulate respond(A) by directly invoking the lock+pop+notify
# sequence the route handler runs. (Calling _handle_approval_respond
# would require an HTTP handler mock; the inner sequence is what we
# need to verify.)
from api.routes import _approval_sse_notify_locked, _lock, _pending
with _lock:
qlist = _pending.get(sid)
# Pop A by approval_id
for i, e in enumerate(qlist):
if e.get("approval_id") == "id-A":
qlist.pop(i)
break
# Re-emit head
if isinstance(_pending.get(sid), list) and _pending[sid]:
_approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid]))
else:
_approval_sse_notify_locked(sid, None, 0)
# SSE must push (B, 1) so the UI surfaces the trailing approval.
p3 = sub_q.get(timeout=1)
assert p3["pending"] is not None, \
"After responding to A, SSE must emit the new head B (not None)"
assert p3["pending"]["command"] == "second-B", \
f"New head should be B, got: {p3['pending']['command']}"
assert p3["pending_count"] == 1
finally:
r._approval_sse_unsubscribe(sid, sub_q)
def test_respond_to_only_pending_pushes_empty_state(self):
"""If respond pops the last entry, SSE must push a None/0 sentinel so UI hides card."""
from api import routes as r
sid = f"sse-empty-{uuid.uuid4().hex[:8]}"
sub_q = r._approval_sse_subscribe(sid)
try:
r.submit_pending(sid, {
"command": "only-A",
"pattern_key": "p",
"pattern_keys": ["p"],
"description": "A",
"approval_id": "id-only-A",
})
sub_q.get(timeout=1) # drain submit notify
from api.routes import _approval_sse_notify_locked, _lock, _pending
with _lock:
qlist = _pending.get(sid)
for i, e in enumerate(qlist):
if e.get("approval_id") == "id-only-A":
qlist.pop(i)
break
if not qlist:
_pending.pop(sid, None)
if isinstance(_pending.get(sid), list) and _pending[sid]:
_approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid]))
else:
_approval_sse_notify_locked(sid, None, 0)
payload = sub_q.get(timeout=1)
assert payload["pending"] is None, \
"After responding to the only approval, SSE must push pending=None"
assert payload["pending_count"] == 0
finally:
r._approval_sse_unsubscribe(sid, sub_q)
class TestNotifyOrderUnderContention:
"""Two parallel submit_pending callers must deliver in queue-mutation order."""
def setup_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def teardown_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def test_pending_count_is_monotonic_under_contention(self):
"""Under parallel submit_pending, pending_count must be monotonically increasing.
Pre-fix: notify outside _lock meant T2's notify could fire before T1's,
with subscribers seeing pending_count=2 then pending_count=1. Now that
notify runs inside _lock alongside the append, the order is guaranteed.
"""
import threading
from api import routes as r
sid = f"sse-order-{uuid.uuid4().hex[:8]}"
sub_q = r._approval_sse_subscribe(sid)
try:
errors = []
barrier = threading.Barrier(8)
def submitter(idx):
try:
barrier.wait(timeout=2)
r.submit_pending(sid, {
"command": f"cmd-{idx}",
"pattern_key": f"p{idx}",
"pattern_keys": [f"p{idx}"],
"description": f"d{idx}",
})
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=submitter, args=(i,)) for i in range(8)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=5)
assert not errors, f"Submitter errors: {errors}"
# Drain payloads — count must go 1, 2, 3, ..., 8 in some order
# consistent with the queue serialization. Specifically, never decrease.
counts = []
for _ in range(8):
p = sub_q.get(timeout=2)
counts.append(p["pending_count"])
assert counts == sorted(counts), (
f"pending_count must be monotonically increasing under contention. "
f"Got: {counts}. Pre-fix this could be out-of-order."
)
assert counts == list(range(1, 9)), \
f"Expected [1..8], got {counts}"
finally:
r._approval_sse_unsubscribe(sid, sub_q)