Merge pull request #1351 from nesquena/release/v0.50.248

release: v0.50.248
This commit is contained in:
nesquena-hermes
2026-04-30 11:50:17 -07:00
committed by GitHub
8 changed files with 1069 additions and 15 deletions
+8
View File
@@ -2,6 +2,14 @@
## [Unreleased]
## [v0.50.248] — 2026-04-30
### Added
- **Real-time approval notifications via SSE long-connection** — replaces the 1.5s HTTP polling loop with a Server-Sent Events endpoint at `/api/approval/stream?session_id=` that pushes approval events to the browser the instant they fire. Cuts approval latency from up to 1.5s down to near-instant and eliminates the "always polling" network noise users observed. Backend uses a thread-safe subscriber registry (`_approval_sse_subscribers` dict, bounded `queue.Queue(maxsize=16)` per subscriber, silent drop on full to prevent leaks from slow tabs). 30s keepalive comments prevent proxy/CDN timeouts; `_CLIENT_DISCONNECT_ERRORS` + `finally` block guarantee subscriber cleanup on any exit path. **Subscribe and snapshot are taken atomically under a single `_lock` acquisition** so a `submit_pending()` arriving in the gap can't be lost. **Notify runs inside the queue-mutation lock** in both `submit_pending` and `_handle_approval_respond` so two parallel callers can't deliver out-of-order with stale `pending_count`. **SSE payload always reflects head-of-queue, never tail**, matching `/api/approval/pending`'s contract — with parallel tool-call approvals (#527), the just-appended entry is at the tail but the UI must show the head. **`_handle_approval_respond` now re-emits the new head after popping** so a trailing approval queued behind the one being responded to is surfaced immediately instead of getting stuck until the next event. Frontend uses `EventSource` with automatic 1.5s HTTP polling fallback on `onerror` (preserves degraded-mode parity with v0.50.247). 50 tests cover wiring, lifecycle, multi-subscriber, cross-session isolation, queue overflow, concurrent subscribe/notify stress, atomic-lock invariants, head-fidelity, trailing-approval re-emission, and notify-order monotonicity. (`api/routes.py`, `static/messages.js`, `tests/test_approval_sse.py`, `tests/test_pr1350_sse_atomic_subscribe.py`, `tests/test_pr1350_sse_notify_correctness.py`) @fxd-jason — PR #1350
### Fixed
- **Context indicator percentage shows even without explicit `context_length`** — frontend companion to the v0.50.246 backend fix. The context ring used to display `·` (no data) whenever `context_length` was 0 or missing — fresh agents, interrupted streams, or models without compressor state. Now defaults to **128K** when `usage.context_length` is falsy and labels the indicator with `(est. 128K)` so users can tell apparent vs. measured. Falls back to `input_tokens` for `last_prompt_tokens` so the ring lights up immediately on the first user message. (`static/ui.js`) @fxd-jason — PR #1349
## [v0.50.247] — 2026-04-30
### Added
+146 -5
View File
@@ -545,6 +545,67 @@ except ImportError:
_permanent_approved = set()
# ── Approval SSE subscribers (long-connection push) ──────────────────────────
_approval_sse_subscribers: dict[str, list[queue.Queue]] = {}
def _approval_sse_subscribe(session_id: str) -> queue.Queue:
"""Register an SSE subscriber for approval events on a given session."""
q = queue.Queue(maxsize=16)
with _lock:
_approval_sse_subscribers.setdefault(session_id, []).append(q)
return q
def _approval_sse_unsubscribe(session_id: str, q: queue.Queue) -> None:
"""Remove an SSE subscriber."""
with _lock:
subs = _approval_sse_subscribers.get(session_id)
if subs and q in subs:
subs.remove(q)
if not subs:
_approval_sse_subscribers.pop(session_id, None)
def _approval_sse_notify_locked(session_id: str, head: dict | None, total: int) -> None:
"""Push an approval event to all SSE subscribers for a session.
CALLER MUST HOLD `_lock`. Snapshots the subscriber list under the held
lock and then calls `q.put_nowait()` on each (which is itself thread-safe).
`head` is the approval entry currently at the head of the queue (the one
the UI should display) — NOT the just-appended entry. With multiple
parallel approvals (#527), the just-appended entry is at the TAIL, but
`/api/approval/pending` always returns the HEAD, so SSE must match.
`total` is the total number of pending approvals.
Pass `head=None` and `total=0` when the queue has just been emptied (e.g.
`_handle_approval_respond` popped the last entry) so the client knows to
hide its approval card.
"""
payload = {"pending": dict(head) if head else None, "pending_count": total}
subs = _approval_sse_subscribers.get(session_id, ())
for q in subs:
try:
q.put_nowait(payload)
except queue.Full:
pass # drop if subscriber is slow (bounded queue prevents memory leak)
def _approval_sse_notify(session_id: str, head: dict | None, total: int) -> None:
"""Convenience wrapper that takes `_lock` itself.
Use only from contexts that don't already hold `_lock`. Production call
sites (submit_pending, _handle_approval_respond) MUST hold the lock and
call `_approval_sse_notify_locked` directly to avoid a notify-ordering
race where a later append's notify can fire before an earlier append's
notify (resulting in stale `pending_count`).
"""
with _lock:
_approval_sse_notify_locked(session_id, head, total)
def submit_pending(session_key: str, approval: dict) -> None:
"""Append a pending approval to the per-session queue.
@@ -553,16 +614,23 @@ def submit_pending(session_key: str, approval: dict) -> None:
a specific entry even when multiple approvals are queued simultaneously.
- Change the storage from a single overwriting dict value to a list, so
parallel tool calls each get their own approval slot (fixes #527).
- Notify any connected SSE subscribers immediately.
"""
entry = dict(approval)
entry.setdefault("approval_id", uuid.uuid4().hex)
with _lock:
queue = _pending.setdefault(session_key, [])
queue_list = _pending.setdefault(session_key, [])
# Replace a legacy non-list value if the agent version uses the old pattern.
if not isinstance(queue, list):
_pending[session_key] = [queue]
queue = _pending[session_key]
queue.append(entry)
if not isinstance(queue_list, list):
_pending[session_key] = [queue_list]
queue_list = _pending[session_key]
queue_list.append(entry)
total = len(queue_list)
head = queue_list[0] # /api/approval/pending always returns head
# Push to SSE subscribers from inside _lock so two parallel
# submit_pending calls can't deliver out-of-order (T2's later
# notify arriving before T1's earlier notify with a stale count).
_approval_sse_notify_locked(session_key, head, total)
# NOTE: We do NOT call _submit_pending_raw here — that function overwrites
# _pending[session_key] with a single dict, which would undo the list we just
# built. The gateway blocking path uses _gateway_queues (a separate mechanism
@@ -1191,6 +1259,9 @@ def handle_get(handler, parsed) -> bool:
if parsed.path == "/api/approval/pending":
return _handle_approval_pending(handler, parsed)
if parsed.path == "/api/approval/stream":
return _handle_approval_sse_stream(handler, parsed)
if parsed.path == "/api/approval/inject_test":
# Loopback-only: used by automated tests; blocked from any remote client
if handler.client_address[0] != "127.0.0.1":
@@ -2720,6 +2791,66 @@ def _handle_approval_pending(handler, parsed):
return j(handler, {"pending": None, "pending_count": 0})
def _handle_approval_sse_stream(handler, parsed):
"""SSE endpoint for real-time approval notifications.
Long-lived connection that pushes approval events the moment they arrive,
replacing the 1.5s polling loop. The frontend uses EventSource and falls
back to HTTP polling if the connection fails.
"""
sid = parse_qs(parsed.query).get("session_id", [""])[0]
if not sid:
return bad(handler, "session_id is required")
# Subscribe AND snapshot atomically under a single _lock acquisition so a
# submit_pending() that fires between the two cannot be lost. If we
# snapshot first then subscribe (the naive ordering), an approval that
# arrives in the gap is appended to _pending (after our snapshot) AND
# notified to subscribers (before we joined) — leaving the client unaware
# until the next event arrives.
q = queue.Queue(maxsize=16)
initial_pending = None
initial_count = 0
with _lock:
_approval_sse_subscribers.setdefault(sid, []).append(q)
q_list = _pending.get(sid)
if isinstance(q_list, list):
initial_pending = dict(q_list[0]) if q_list else None
initial_count = len(q_list)
elif q_list:
initial_pending = dict(q_list)
initial_count = 1
handler.send_response(200)
handler.send_header('Content-Type', 'text/event-stream; charset=utf-8')
handler.send_header('Cache-Control', 'no-cache')
handler.send_header('X-Accel-Buffering', 'no')
handler.send_header('Connection', 'keep-alive')
handler.end_headers()
from api.streaming import _sse
# Push initial state immediately so the client doesn't miss anything.
_sse(handler, 'initial', {"pending": initial_pending, "pending_count": initial_count})
try:
while True:
try:
payload = q.get(timeout=30)
except queue.Empty:
# Keepalive — SSE comment line prevents proxy/CDN timeout.
handler.wfile.write(b': keepalive\n\n')
handler.wfile.flush()
continue
if payload is None:
break # signal to close
_sse(handler, 'approval', payload)
except _CLIENT_DISCONNECT_ERRORS:
pass # client went away — normal for long-lived connections
finally:
_approval_sse_unsubscribe(sid, q)
def _handle_approval_inject(handler, parsed):
"""Inject a fake pending approval -- loopback-only, used by automated tests."""
qs = parse_qs(parsed.query)
@@ -3783,6 +3914,16 @@ def _handle_approval_respond(handler, body):
elif queue:
# Legacy single-dict value.
pending = _pending.pop(sid, None)
# Notify SSE subscribers of the new head (or empty state) so the UI
# surfaces any trailing approvals that were queued behind this one
# without waiting for the next submit_pending. Without this, a parallel
# tool-call scenario (#527) would leave the second approval invisible
# in the SSE path until the next event ever fired (the agent thread
# would be parked indefinitely from the user's perspective).
if isinstance(_pending.get(sid), list) and _pending[sid]:
_approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid]))
else:
_approval_sse_notify_locked(sid, None, 0)
if pending:
keys = pending.get("pattern_keys") or [pending.get("pattern_key", "")]
+47 -1
View File
@@ -1314,6 +1314,50 @@ async function respondApproval(choice) {
function startApprovalPolling(sid) {
stopApprovalPolling();
// ── SSE (preferred): long-lived connection, server pushes instantly ──
try {
const es = new EventSource('/api/approval/stream?session_id=' + encodeURIComponent(sid));
let _fallbackActive = false;
es.addEventListener('initial', e => {
const d = JSON.parse(e.data);
if (d.pending) { d.pending._session_id = sid; showApprovalCard(d.pending, d.pending_count || 1); }
else { hideApprovalCard(); }
});
es.addEventListener('approval', e => {
const d = JSON.parse(e.data);
if (d.pending) { d.pending._session_id = sid; showApprovalCard(d.pending, d.pending_count || 1); }
else { hideApprovalCard(); }
});
es.onerror = () => {
// SSE failed — fall back to HTTP polling (3s interval)
if (_fallbackActive) return;
_fallbackActive = true;
try { es.close(); } catch(_){}
_startApprovalFallbackPoll(sid);
};
// If the session changes or stops being busy, close the SSE.
// We detect this via a periodic check (cheap — no network request).
_approvalSSEHealthTimer = setInterval(() => {
if (!S.busy || !S.session || S.session.session_id !== sid) {
stopApprovalPolling(); hideApprovalCard(true);
}
}, 5000);
_approvalEventSource = es;
} catch(_e) {
// EventSource constructor failed — use polling directly
_startApprovalFallbackPoll(sid);
}
}
let _approvalEventSource = null;
let _approvalSSEHealthTimer = null;
function _startApprovalFallbackPoll(sid) {
_approvalPollTimer = setInterval(async () => {
if (!S.busy || !S.session || S.session.session_id !== sid) {
stopApprovalPolling(); hideApprovalCard(true); return;
@@ -1323,11 +1367,13 @@ function startApprovalPolling(sid) {
if (data.pending) { data.pending._session_id=sid; showApprovalCard(data.pending, data.pending_count||1); }
else { hideApprovalCard(); }
} catch(e) { /* ignore poll errors */ }
}, 1500);
}, 1500); // matches the v0.50.247 polling cadence so degraded-mode users see the same responsiveness
}
function stopApprovalPolling() {
if (_approvalPollTimer) { clearInterval(_approvalPollTimer); _approvalPollTimer = null; }
if (_approvalEventSource) { try { _approvalEventSource.close(); } catch(_){} _approvalEventSource = null; }
if (_approvalSSEHealthTimer) { clearInterval(_approvalSSEHealthTimer); _approvalSSEHealthTimer = null; }
}
// ── Clarify polling ──
+12 -7
View File
@@ -850,9 +850,12 @@ function _syncCtxIndicator(usage){
const wrap=$('ctxIndicatorWrap');
const el=$('ctxIndicator');
if(!el)return;
// Use input_tokens as fallback when last_prompt_tokens is not available
const promptTok=usage.last_prompt_tokens||usage.input_tokens||0;
const totalTok=(usage.input_tokens||0)+(usage.output_tokens||0);
const ctxWindow=usage.context_length||0;
// Default context window to 128K when not provided by backend
const DEFAULT_CTX=128*1024;
const ctxWindow=usage.context_length||DEFAULT_CTX;
const cost=usage.estimated_cost;
// Show indicator whenever we have any usage data (tokens or cost)
if(!promptTok&&!totalTok&&!cost){
@@ -860,8 +863,8 @@ function _syncCtxIndicator(usage){
return;
}
if(wrap) wrap.style.display='';
const hasCtxWindow=!!(promptTok&&ctxWindow);
const pct=hasCtxWindow?Math.min(100,Math.round((promptTok/ctxWindow)*100)):0;
const hasPromptTok=!!promptTok;
const pct=hasPromptTok?Math.min(100,Math.round((promptTok/ctxWindow)*100)):0;
const ring=$('ctxRingValue');
const center=$('ctxPercent');
const usageLine=$('ctxTooltipUsage');
@@ -873,7 +876,8 @@ function _syncCtxIndicator(usage){
ring.style.strokeDasharray=String(circumference);
ring.style.strokeDashoffset=String(circumference*(1-pct/100));
}
if(center) center.textContent=hasCtxWindow?String(pct):'\u00b7';
if(center) center.textContent=hasPromptTok?String(pct):'\u00b7';
const hasExplicitCtx=!!usage.context_length;
el.classList.toggle('ctx-mid',pct>50&&pct<=75);
el.classList.toggle('ctx-high',pct>75);
// ── Compress affordance (#524) ──
@@ -900,11 +904,12 @@ function _syncCtxIndicator(usage){
compressWrap.style.display='none';
}
}
let label=hasCtxWindow?`Context window ${pct}% used`:`${_fmtTokens(totalTok)} tokens used`;
let label=hasPromptTok?`Context window ${pct}% used`:`${_fmtTokens(totalTok)} tokens used`;
if(!hasExplicitCtx&&hasPromptTok) label+=' (est. 128K)';
if(cost) label+=` \u00b7 $${cost<0.01?cost.toFixed(4):cost.toFixed(2)}`;
el.setAttribute('aria-label',label);
if(usageLine) usageLine.textContent=hasCtxWindow?`${pct}% used (${Math.max(0,100-pct)}% left)`:`${_fmtTokens(totalTok)} tokens used`;
if(tokensLine) tokensLine.textContent=hasCtxWindow?`${_fmtTokens(promptTok)} / ${_fmtTokens(ctxWindow)} tokens used`:`In: ${_fmtTokens(usage.input_tokens||0)} \u00b7 Out: ${_fmtTokens(usage.output_tokens||0)}`;
if(usageLine) usageLine.textContent=hasPromptTok?`${pct}% used (${Math.max(0,100-pct)}% left)`:`${_fmtTokens(totalTok)} tokens used`;
if(tokensLine) tokensLine.textContent=hasPromptTok?`${_fmtTokens(promptTok)} / ${_fmtTokens(ctxWindow)} tokens used`:`In: ${_fmtTokens(usage.input_tokens||0)} \u00b7 Out: ${_fmtTokens(usage.output_tokens||0)}`;
const threshold=usage.threshold_tokens||0;
if(thresholdLine){
if(threshold&&ctxWindow){
+2 -2
View File
@@ -23,8 +23,8 @@ INDEX_HTML = (REPO_ROOT / "static" / "index.html").read_text(encoding="utf-8")
def test_submit_pending_appends_to_list():
"""submit_pending() must append to a list, not overwrite."""
# The new wrapper must contain queue.append
assert "queue.append(entry)" in ROUTES_SRC, \
# The new wrapper must contain a queue append (list mutation pattern)
assert "queue_list.append(entry)" in ROUTES_SRC or "queue.append(entry)" in ROUTES_SRC, \
"submit_pending() must append entry to a list queue, not overwrite _pending[sid]"
+487
View File
@@ -0,0 +1,487 @@
"""Tests for the approval SSE (Server-Sent Events) long-connection implementation.
Verifies:
- SSE subscribe/unsubscribe/notify lifecycle
- Initial snapshot delivery on connect
- Instant push when submit_pending() fires
- Client disconnect triggers unsubscribe cleanup
- Multiple concurrent subscribers per session
- Queue overflow (slow subscriber) drops silently
- Cross-session isolation (notify only reaches matching subscribers)
- Frontend EventSource / fallback polling patterns
"""
import json
import pathlib
import queue
import re
import sys
import threading
import time
import uuid
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
sys.path.insert(0, str(REPO_ROOT))
ROUTES_SRC = (REPO_ROOT / "api" / "routes.py").read_text(encoding="utf-8")
MESSAGES_JS = (REPO_ROOT / "static" / "messages.js").read_text(encoding="utf-8")
# ═══════════════════════════════════════════════════════════════════════════════
# 1. Static-analysis tests (no server needed)
# ═══════════════════════════════════════════════════════════════════════════════
class TestSSEStaticAnalysis:
"""Verify the SSE infrastructure exists and is wired correctly in routes.py."""
def test_sse_route_registered(self):
"""The /api/approval/stream route must be registered."""
assert '"/api/approval/stream"' in ROUTES_SRC, \
"Route /api/approval/stream must be registered in the URL dispatch"
def test_sse_handler_function_exists(self):
"""_handle_approval_sse_stream handler must exist."""
assert "def _handle_approval_sse_stream(" in ROUTES_SRC, \
"_handle_approval_sse_stream handler function must exist"
def test_subscribe_function_exists(self):
"""_approval_sse_subscribe must exist and use a Queue."""
assert "def _approval_sse_subscribe(" in ROUTES_SRC, \
"_approval_sse_subscribe must be defined"
def test_unsubscribe_function_exists(self):
"""_approval_sse_unsubscribe must exist and clean up empty lists."""
assert "def _approval_sse_unsubscribe(" in ROUTES_SRC, \
"_approval_sse_unsubscribe must be defined"
def test_notify_function_exists(self):
"""_approval_sse_notify must exist and push to subscriber queues."""
assert "def _approval_sse_notify(" in ROUTES_SRC, \
"_approval_sse_notify must be defined"
def test_sse_subscribers_dict_exists(self):
"""Module-level _approval_sse_subscribers dict must exist."""
assert "_approval_sse_subscribers" in ROUTES_SRC, \
"_approval_sse_subscribers module-level dict must exist"
def test_sse_content_type(self):
"""SSE handler must set text/event-stream content type."""
assert "text/event-stream" in ROUTES_SRC, \
"SSE handler must set Content-Type to text/event-stream"
def test_sse_keepalive(self):
"""SSE handler must send keepalive comments to prevent proxy timeout."""
assert "keepalive" in ROUTES_SRC, \
"SSE handler must send keepalive comments"
def test_sse_cache_control(self):
"""SSE handler must set Cache-Control: no-cache."""
assert "no-cache" in ROUTES_SRC, \
"SSE handler must set Cache-Control: no-cache"
def test_sse_initial_snapshot(self):
"""SSE handler must send initial snapshot on connect."""
assert "'initial'" in ROUTES_SRC, \
"SSE handler must send an 'initial' event with snapshot data"
def test_sse_approval_event(self):
"""SSE handler must send 'approval' events on push."""
assert "'approval'" in ROUTES_SRC, \
"SSE handler must send 'approval' events when pushing notifications"
def test_notify_called_from_submit_pending(self):
"""submit_pending must call _approval_sse_notify_locked."""
# Pinned to the inner-lock variant: must run inside the same `with _lock:`
# block as the queue mutation so two parallel submit_pending calls can't
# deliver out-of-order with stale pending_count. Tracks the v0.50.248
# MUST-FIX A fix.
assert "_approval_sse_notify_locked(session_key, head, total)" in ROUTES_SRC, \
("submit_pending() must call _approval_sse_notify_locked(session_key, head, total) "
"from inside the `with _lock:` block — not the unlocked _approval_sse_notify wrapper, "
"and head must be queue_list[0] (the head, not the just-appended entry).")
def test_unsubscribe_in_finally(self):
"""SSE handler must unsubscribe in a finally block."""
# Find the finally block that calls _approval_sse_unsubscribe
assert re.search(r"finally:.*\n.*_approval_sse_unsubscribe\(", ROUTES_SRC, re.DOTALL), \
"SSE handler must call _approval_sse_unsubscribe in a finally block"
def test_client_disconnect_handled(self):
"""SSE handler must catch client disconnect errors."""
assert "_CLIENT_DISCONNECT_ERRORS" in ROUTES_SRC, \
"SSE handler must catch client disconnect errors"
def test_subscriber_queue_maxsize(self):
"""Subscriber queues must have a bounded maxsize to prevent memory leaks."""
assert "queue.Queue(maxsize=" in ROUTES_SRC, \
"Subscriber queues must have maxsize set to prevent unbounded memory growth"
def test_notify_drops_on_full(self):
"""_approval_sse_notify must silently drop events when subscriber is slow."""
# The queue.Full exception handler
assert "queue.Full" in ROUTES_SRC, \
"_approval_sse_notify must handle queue.Full to drop events for slow subscribers"
def test_subscribe_uses_shared_lock(self):
"""subscribe/unsubscribe/notify must all use the same _lock."""
# All three functions must use _lock
for func in ["_approval_sse_subscribe", "_approval_sse_unsubscribe", "_approval_sse_notify"]:
# Find the function and verify it uses "with _lock"
func_start = ROUTES_SRC.find(f"def {func}(")
assert func_start != -1, f"{func} must exist"
# Find the next function definition after this one
next_func = ROUTES_SRC.find("\ndef ", func_start + 1)
func_body = ROUTES_SRC[func_start:next_func] if next_func != -1 else ROUTES_SRC[func_start:]
assert "with _lock:" in func_body, \
f"{func} must use 'with _lock:' for thread safety"
def test_unsubscribe_cleans_empty_session(self):
"""Unsubscribe must remove empty session keys from the dict."""
assert "_approval_sse_subscribers.pop(session_id, None)" in ROUTES_SRC, \
"_approval_sse_unsubscribe must pop session_id when subscriber list is empty"
class TestFrontendSSEImplementation:
"""Verify the frontend JavaScript SSE implementation."""
def test_eventsource_used(self):
"""Frontend must use EventSource for SSE connection."""
assert "new EventSource(" in MESSAGES_JS, \
"startApprovalPolling must create an EventSource for SSE"
def test_sse_url_matches_backend(self):
"""Frontend SSE URL must match backend /api/approval/stream route."""
assert "/api/approval/stream" in MESSAGES_JS, \
"EventSource must connect to /api/approval/stream"
def test_initial_event_listener(self):
"""Frontend must listen for 'initial' SSE events."""
assert "'initial'" in MESSAGES_JS or '"initial"' in MESSAGES_JS, \
"Frontend must addEventListener for 'initial' SSE events"
def test_approval_event_listener(self):
"""Frontend must listen for 'approval' SSE events."""
assert "'approval'" in MESSAGES_JS or '"approval"' in MESSAGES_JS, \
"Frontend must addEventListener for 'approval' SSE events"
def test_onerror_fallback_to_polling(self):
"""onerror must fall back to HTTP polling."""
assert "_startApprovalFallbackPoll" in MESSAGES_JS, \
"SSE onerror handler must call _startApprovalFallbackPoll"
def test_fallback_poll_interval(self):
"""Fallback polling interval must match v0.50.247's 1500ms cadence."""
assert "1500" in MESSAGES_JS, \
"Fallback polling interval must be 1500ms to match degraded-mode parity with v0.50.247"
def test_fallback_closes_eventsource(self):
"""onerror handler must close the EventSource before falling back."""
# The onerror handler should call es.close()
assert "es.close()" in MESSAGES_JS, \
"onerror handler must close the EventSource before falling back"
def test_stop_closes_eventsource(self):
"""stopApprovalPolling must close EventSource."""
assert "_approvalEventSource.close()" in MESSAGES_JS, \
"stopApprovalPolling must close _approvalEventSource"
def test_health_timer_cleanup(self):
"""stopApprovalPolling must clear the SSE health timer."""
assert "_approvalSSEHealthTimer" in MESSAGES_JS, \
"SSE health timer must be tracked and cleared in stopApprovalPolling"
# ═══════════════════════════════════════════════════════════════════════════════
# 2. Unit tests (in-process, no HTTP server)
# ═══════════════════════════════════════════════════════════════════════════════
class TestSSESubscribeUnsubscribe:
"""Test the subscribe/unsubscribe lifecycle."""
def setup_method(self):
"""Clean SSE subscriber state before each test."""
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
def teardown_method(self):
"""Clean up after each test."""
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
def test_subscribe_returns_queue(self):
"""_approval_sse_subscribe must return a Queue."""
from api import routes as r
sid = f"sse-test-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
assert isinstance(q, queue.Queue), "subscribe must return a queue.Queue"
# Cleanup
r._approval_sse_unsubscribe(sid, q)
def test_subscribe_registers_subscriber(self):
"""After subscribe, the queue must appear in _approval_sse_subscribers."""
from api import routes as r
sid = f"sse-reg-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
try:
with r._lock:
subs = r._approval_sse_subscribers.get(sid, [])
assert q in subs, "Subscribed queue must be in the subscribers list"
finally:
r._approval_sse_unsubscribe(sid, q)
def test_unsubscribe_removes_queue(self):
"""After unsubscribe, the queue must not be in the subscribers list."""
from api import routes as r
sid = f"sse-unsub-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
r._approval_sse_unsubscribe(sid, q)
with r._lock:
subs = r._approval_sse_subscribers.get(sid, [])
assert q not in subs, "Unsubscribed queue must not be in the list"
def test_unsubscribe_removes_empty_session_key(self):
"""When the last subscriber is removed, the session key must be cleaned up."""
from api import routes as r
sid = f"sse-empty-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
r._approval_sse_unsubscribe(sid, q)
with r._lock:
assert sid not in r._approval_sse_subscribers, \
"Session key must be removed when subscriber list is empty"
def test_unsubscribe_idempotent(self):
"""Unsubscribing twice must not raise."""
from api import routes as r
sid = f"sse-idem-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
r._approval_sse_unsubscribe(sid, q)
r._approval_sse_unsubscribe(sid, q) # should not raise
def test_unsubscribe_unknown_queue_noop(self):
"""Unsubscribing a queue that was never subscribed must not crash."""
from api import routes as r
sid = f"sse-noop-{uuid.uuid4().hex[:8]}"
q = queue.Queue()
r._approval_sse_unsubscribe(sid, q) # should not raise
class TestSSENotify:
"""Test the notification mechanism."""
def setup_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
def teardown_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
def test_notify_delivers_payload(self):
"""_approval_sse_notify must put the payload on subscriber queues."""
from api import routes as r
sid = f"sse-notify-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
try:
entry = {"command": "rm -rf /tmp/test", "pattern_key": "delete"}
r._approval_sse_notify(sid, entry, 1)
payload = q.get(timeout=1)
assert payload["pending"]["command"] == "rm -rf /tmp/test"
assert payload["pending_count"] == 1
finally:
r._approval_sse_unsubscribe(sid, q)
def test_notify_multiple_subscribers(self):
"""All subscribers for a session must receive the notification."""
from api import routes as r
sid = f"sse-multi-{uuid.uuid4().hex[:8]}"
q1 = r._approval_sse_subscribe(sid)
q2 = r._approval_sse_subscribe(sid)
q3 = r._approval_sse_subscribe(sid)
try:
entry = {"command": "test-cmd"}
r._approval_sse_notify(sid, entry, 2)
for q in [q1, q2, q3]:
payload = q.get(timeout=1)
assert payload["pending"]["command"] == "test-cmd"
assert payload["pending_count"] == 2
finally:
for q in [q1, q2, q3]:
r._approval_sse_unsubscribe(sid, q)
def test_notify_cross_session_isolation(self):
"""Notify for session A must NOT deliver to session B subscribers."""
from api import routes as r
sid_a = f"sse-iso-a-{uuid.uuid4().hex[:8]}"
sid_b = f"sse-iso-b-{uuid.uuid4().hex[:8]}"
qa = r._approval_sse_subscribe(sid_a)
qb = r._approval_sse_subscribe(sid_b)
try:
entry = {"command": "only-for-a"}
r._approval_sse_notify(sid_a, entry, 1)
# qa should have the event
payload = qa.get(timeout=1)
assert payload["pending"]["command"] == "only-for-a"
# qb should be empty
assert qb.empty(), "Session B subscriber must not receive session A events"
finally:
r._approval_sse_unsubscribe(sid_a, qa)
r._approval_sse_unsubscribe(sid_b, qb)
def test_notify_no_subscribers_is_noop(self):
"""Notifying a session with no subscribers must not raise."""
from api import routes as r
sid = f"sse-nosub-{uuid.uuid4().hex[:8]}"
r._approval_sse_notify(sid, {"command": "test"}, 1) # should not raise
def test_notify_drops_on_full_queue(self):
"""When subscriber queue is full, events must be silently dropped."""
from api import routes as r
sid = f"sse-full-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
try:
# Fill the queue (maxsize=16)
for i in range(20):
r._approval_sse_notify(sid, {"command": f"cmd-{i}"}, i + 1)
# Queue should have at most 16 items
assert q.qsize() <= 16, "Queue must not exceed maxsize"
assert q.qsize() > 0, "Queue should have some items"
finally:
r._approval_sse_unsubscribe(sid, q)
class TestSSENotifyFromSubmitPending:
"""Test that submit_pending triggers SSE notifications."""
def setup_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def teardown_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def test_submit_pending_notifies_sse_subscriber(self):
"""submit_pending must push an SSE event to subscribers."""
from api import routes as r
sid = f"sse-submit-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
try:
r.submit_pending(sid, {
"command": "rm -rf /tmp/test",
"pattern_key": "recursive delete",
"pattern_keys": ["recursive delete"],
"description": "recursive delete",
})
payload = q.get(timeout=1)
assert payload["pending"]["command"] == "rm -rf /tmp/test"
assert payload["pending_count"] == 1
finally:
r._approval_sse_unsubscribe(sid, q)
def test_submit_pending_delivers_count(self):
"""Multiple submit_pending calls must report correct pending_count."""
from api import routes as r
sid = f"sse-count-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
try:
for i in range(3):
r.submit_pending(sid, {
"command": f"cmd-{i}",
"pattern_key": f"p{i}",
"pattern_keys": [f"p{i}"],
"description": f"d{i}",
})
for expected_count in [1, 2, 3]:
payload = q.get(timeout=1)
assert payload["pending_count"] == expected_count, \
f"Expected pending_count={expected_count}, got {payload['pending_count']}"
finally:
r._approval_sse_unsubscribe(sid, q)
class TestSSEConcurrency:
"""Test thread safety of SSE subscribe/unsubscribe/notify."""
def setup_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def teardown_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def test_concurrent_subscribe_unsubscribe(self):
"""Concurrent subscribe/unsubscribe must not corrupt state."""
from api import routes as r
sid = f"sse-conc-{uuid.uuid4().hex[:8]}"
errors = []
queues = []
def worker():
try:
for _ in range(50):
q = r._approval_sse_subscribe(sid)
queues.append(q)
r._approval_sse_unsubscribe(sid, q)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
assert not errors, f"Concurrent subscribe/unsubscribe errors: {errors}"
# After all threads finish, no queues should remain
with r._lock:
subs = r._approval_sse_subscribers.get(sid, [])
assert len(subs) == 0, "All subscribers should be cleaned up"
def test_concurrent_notify_while_subscribing(self):
"""Notify while new subscribers are joining must not deadlock or crash."""
from api import routes as r
sid = f"sse-notsub-{uuid.uuid4().hex[:8]}"
errors = []
def notifier():
try:
for i in range(100):
r._approval_sse_notify(sid, {"command": f"cmd-{i}"}, 1)
except Exception as e:
errors.append(e)
def subscriber():
try:
for _ in range(50):
q = r._approval_sse_subscribe(sid)
time.sleep(0.001)
r._approval_sse_unsubscribe(sid, q)
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=notifier),
threading.Thread(target=subscriber),
threading.Thread(target=subscriber),
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=15)
assert not errors, f"Concurrent notify/subscribe errors: {errors}"
with r._lock:
r._approval_sse_subscribers.clear()
+108
View File
@@ -0,0 +1,108 @@
"""Test that the SSE subscribe + snapshot are taken atomically under _lock.
Regression test for the snapshot/subscribe race condition: if subscribe
happens AFTER the snapshot, a submit_pending() that fires in the gap is
both appended to _pending (after our snapshot) AND notified to subscribers
(before we joined) the client never learns about it until the next event.
The fix in v0.50.248 takes the lock once, registers the subscriber queue,
THEN reads the snapshot all under the same lock acquisition.
This test verifies the source-level invariant rather than the runtime
behavior: the subscriber-registration line MUST appear inside the same
`with _lock:` block as the snapshot read, and BEFORE the snapshot read.
"""
import pathlib
import sys
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
sys.path.insert(0, str(REPO_ROOT))
ROUTES_SRC = (REPO_ROOT / "api" / "routes.py").read_text(encoding="utf-8")
def _extract_lock_block(body: str) -> str:
"""Extract the body of the first `with _lock:` block from the handler.
Lines are part of the block as long as they are blank or start with the
block's indent (>= 8 spaces, since the handler body itself is at 4 spaces
and the lock block indents one level deeper).
"""
lines = body.split("\n")
out: list[str] = []
in_block = False
block_indent = None
for line in lines:
if not in_block:
if line.strip() == "with _lock:":
in_block = True
continue
# Determine block indent from the first non-empty line we see inside.
if block_indent is None:
stripped = line.lstrip(" ")
if stripped == "":
continue # blank lines don't set indent
block_indent = len(line) - len(stripped)
out.append(line)
continue
# Continuation: blank lines OK, otherwise must be at >= block_indent.
if line.strip() == "":
out.append(line)
continue
line_indent = len(line) - len(line.lstrip(" "))
if line_indent >= block_indent:
out.append(line)
else:
break
return "\n".join(out)
def _handler_body() -> str:
start = ROUTES_SRC.find("def _handle_approval_sse_stream(")
assert start != -1, "_handle_approval_sse_stream must exist"
end = ROUTES_SRC.find("\ndef ", start + 1)
return ROUTES_SRC[start:end if end != -1 else len(ROUTES_SRC)]
def test_snapshot_taken_under_lock():
"""The initial _pending snapshot must be guarded by `with _lock:`."""
lock_body = _extract_lock_block(_handler_body())
assert lock_body, "_handle_approval_sse_stream must contain a `with _lock:` block"
assert "_pending.get(sid)" in lock_body, \
"Initial snapshot of _pending must be read inside the `with _lock:` block"
def test_subscriber_registered_inside_lock():
"""The subscriber queue must be registered inside the same `with _lock:` block."""
lock_body = _extract_lock_block(_handler_body())
assert lock_body, "Handler must contain a `with _lock:` block"
assert "_approval_sse_subscribers" in lock_body and "append(q)" in lock_body, \
("Subscriber registration (`_approval_sse_subscribers.setdefault(sid, []).append(q)`) "
"must happen inside the same `with _lock:` block as the snapshot. "
"Otherwise a submit_pending() between snapshot-and-subscribe is lost.")
def test_subscribe_before_snapshot_in_lock():
"""Inside the lock, the subscriber must be registered BEFORE reading the snapshot."""
lock_body = _extract_lock_block(_handler_body())
assert lock_body, "Handler must contain a `with _lock:` block"
sub_idx = lock_body.find("_approval_sse_subscribers")
snap_idx = lock_body.find("_pending.get(sid)")
assert sub_idx != -1, "Subscriber registration must be inside the lock"
assert snap_idx != -1, "Snapshot read must be inside the lock"
assert sub_idx < snap_idx, (
"Subscriber registration must come BEFORE the snapshot read inside the lock. "
"Otherwise an approval arriving between subscribe and snapshot is silently dropped."
)
def test_no_double_subscribe_outside_lock():
"""The handler must not also call `_approval_sse_subscribe()` (legacy code path)."""
body = _handler_body()
assert "= _approval_sse_subscribe(sid)" not in body, (
"_handle_approval_sse_stream must not call _approval_sse_subscribe() — "
"the atomic version inlines subscribe inside the snapshot lock block."
)
+259
View File
@@ -0,0 +1,259 @@
"""Regression tests for the v0.50.248 SSE notify-on-respond fix.
Originally PR #1350 only called `_approval_sse_notify` from `submit_pending`,
not from `_handle_approval_respond`. With the parallel-tool-call scenario
that PR #527 supports:
1. submit_pending(A) -> SSE pushes (A, 1). UI shows A.
2. submit_pending(B) -> SSE pushes (B, 2). UI shows B. (bug: was sending B as head, not A)
3. respond(B) -> queue still contains A. UI hides card.
NO event fires. A is invisible until next event.
Pre-release Opus review caught two MUST-FIX bugs:
A. notify-ordering race: notify outside _lock could deliver out-of-order
under contention.
C. Trailing approval lost: respond never re-emitted the new head.
D. Payload was tail-not-head: with #527 parallel approvals, /api/approval/pending
returns head, but SSE was returning the just-appended entry (tail).
The fix:
- `_approval_sse_notify_locked(sid, head, total)` runs inside the caller's
held `_lock` so two parallel callers serialize their notifies in the same
order they serialize their queue mutations.
- submit_pending now passes `head = queue_list[0]` (head-of-queue), not the
just-appended entry.
- _handle_approval_respond now calls _approval_sse_notify_locked after the
pop with the new head (or None/0 if queue is empty).
"""
import pathlib
import queue
import sys
import time
import uuid
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
sys.path.insert(0, str(REPO_ROOT))
class TestParallelApprovalsHeadFidelity:
"""SSE payload must always reflect head-of-queue, never tail."""
def setup_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def teardown_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def test_second_submit_pending_sends_head_not_tail(self):
"""When B is appended while A is still pending, SSE payload must show A as head."""
from api import routes as r
sid = f"sse-headtail-{uuid.uuid4().hex[:8]}"
q = r._approval_sse_subscribe(sid)
try:
r.submit_pending(sid, {
"command": "first-A",
"pattern_key": "first",
"pattern_keys": ["first"],
"description": "A",
})
r.submit_pending(sid, {
"command": "second-B",
"pattern_key": "second",
"pattern_keys": ["second"],
"description": "B",
})
# First payload should be A (just-appended, also head).
p1 = q.get(timeout=1)
assert p1["pending"]["command"] == "first-A"
assert p1["pending_count"] == 1
# Second payload's HEAD is still A (we appended B but A is still queued).
p2 = q.get(timeout=1)
assert p2["pending"]["command"] == "first-A", (
"SSE payload must show head-of-queue (A), not tail (B). "
f"Got: {p2['pending']['command']}"
)
assert p2["pending_count"] == 2
finally:
r._approval_sse_unsubscribe(sid, q)
class TestRespondNotifiesTrailingApproval:
"""After respond() pops one approval, SSE must re-emit the new head if any."""
def setup_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def teardown_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def test_respond_to_first_pushes_second_as_new_head(self):
"""submit A; submit B; respond(A) -> SSE must push (B, 1) so the UI surfaces B."""
from api import routes as r
sid = f"sse-trailing-{uuid.uuid4().hex[:8]}"
# Subscribe BEFORE any submit so we capture all events deterministically.
sub_q = r._approval_sse_subscribe(sid)
try:
r.submit_pending(sid, {
"command": "first-A",
"pattern_key": "p1",
"pattern_keys": ["p1"],
"description": "A",
"approval_id": "id-A",
})
r.submit_pending(sid, {
"command": "second-B",
"pattern_key": "p2",
"pattern_keys": ["p2"],
"description": "B",
"approval_id": "id-B",
})
# Drain the two submit-driven events.
sub_q.get(timeout=1) # head=A, total=1
sub_q.get(timeout=1) # head=A, total=2 (head is still A)
# Now simulate respond(A) by directly invoking the lock+pop+notify
# sequence the route handler runs. (Calling _handle_approval_respond
# would require an HTTP handler mock; the inner sequence is what we
# need to verify.)
from api.routes import _approval_sse_notify_locked, _lock, _pending
with _lock:
qlist = _pending.get(sid)
# Pop A by approval_id
for i, e in enumerate(qlist):
if e.get("approval_id") == "id-A":
qlist.pop(i)
break
# Re-emit head
if isinstance(_pending.get(sid), list) and _pending[sid]:
_approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid]))
else:
_approval_sse_notify_locked(sid, None, 0)
# SSE must push (B, 1) so the UI surfaces the trailing approval.
p3 = sub_q.get(timeout=1)
assert p3["pending"] is not None, \
"After responding to A, SSE must emit the new head B (not None)"
assert p3["pending"]["command"] == "second-B", \
f"New head should be B, got: {p3['pending']['command']}"
assert p3["pending_count"] == 1
finally:
r._approval_sse_unsubscribe(sid, sub_q)
def test_respond_to_only_pending_pushes_empty_state(self):
"""If respond pops the last entry, SSE must push a None/0 sentinel so UI hides card."""
from api import routes as r
sid = f"sse-empty-{uuid.uuid4().hex[:8]}"
sub_q = r._approval_sse_subscribe(sid)
try:
r.submit_pending(sid, {
"command": "only-A",
"pattern_key": "p",
"pattern_keys": ["p"],
"description": "A",
"approval_id": "id-only-A",
})
sub_q.get(timeout=1) # drain submit notify
from api.routes import _approval_sse_notify_locked, _lock, _pending
with _lock:
qlist = _pending.get(sid)
for i, e in enumerate(qlist):
if e.get("approval_id") == "id-only-A":
qlist.pop(i)
break
if not qlist:
_pending.pop(sid, None)
if isinstance(_pending.get(sid), list) and _pending[sid]:
_approval_sse_notify_locked(sid, _pending[sid][0], len(_pending[sid]))
else:
_approval_sse_notify_locked(sid, None, 0)
payload = sub_q.get(timeout=1)
assert payload["pending"] is None, \
"After responding to the only approval, SSE must push pending=None"
assert payload["pending_count"] == 0
finally:
r._approval_sse_unsubscribe(sid, sub_q)
class TestNotifyOrderUnderContention:
"""Two parallel submit_pending callers must deliver in queue-mutation order."""
def setup_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def teardown_method(self):
from api import routes as r
with r._lock:
r._approval_sse_subscribers.clear()
r._pending.clear()
def test_pending_count_is_monotonic_under_contention(self):
"""Under parallel submit_pending, pending_count must be monotonically increasing.
Pre-fix: notify outside _lock meant T2's notify could fire before T1's,
with subscribers seeing pending_count=2 then pending_count=1. Now that
notify runs inside _lock alongside the append, the order is guaranteed.
"""
import threading
from api import routes as r
sid = f"sse-order-{uuid.uuid4().hex[:8]}"
sub_q = r._approval_sse_subscribe(sid)
try:
errors = []
barrier = threading.Barrier(8)
def submitter(idx):
try:
barrier.wait(timeout=2)
r.submit_pending(sid, {
"command": f"cmd-{idx}",
"pattern_key": f"p{idx}",
"pattern_keys": [f"p{idx}"],
"description": f"d{idx}",
})
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=submitter, args=(i,)) for i in range(8)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=5)
assert not errors, f"Submitter errors: {errors}"
# Drain payloads — count must go 1, 2, 3, ..., 8 in some order
# consistent with the queue serialization. Specifically, never decrease.
counts = []
for _ in range(8):
p = sub_q.get(timeout=2)
counts.append(p["pending_count"])
assert counts == sorted(counts), (
f"pending_count must be monotonically increasing under contention. "
f"Got: {counts}. Pre-fix this could be out-of-order."
)
assert counts == list(range(1, 9)), \
f"Expected [1..8], got {counts}"
finally:
r._approval_sse_unsubscribe(sid, sub_q)