Merge pull request #1365 from nesquena/release/v0.50.249

release: v0.50.249
nesquena-hermes
2026-04-30 15:02:27 -07:00
committed by GitHub
13 changed files with 757 additions and 18 deletions
+12
@@ -2,6 +2,18 @@
## [Unreleased]
## [v0.50.249] — 2026-04-30
### Added
- **Real-time clarify notifications via SSE long-connection** — replaces the 1.5s HTTP polling loop for clarify (`/api/clarify/pending`) with a Server-Sent Events endpoint at `/api/clarify/stream?session_id=` that pushes clarify events to the browser the instant they fire. Mirrors the approval-SSE pattern shipped in v0.50.248 (#1350) including all the correctness lessons learned during that release: atomic subscribe + initial snapshot inside a single `with clarify._lock:` block (no snapshot/subscribe race), `_clarify_sse_notify` invoked from inside `_lock` in both `submit_pending` and `resolve_clarify` (no notify-ordering race), payload built from `q[0].data` head-of-queue (not the just-appended entry), and `resolve_clarify` re-emits the new head (or `None`/`0` when empty) so trailing clarify prompts never get stuck. Frontend uses `EventSource` with automatic 3s HTTP polling fallback on `onerror`, plus a 60s reconnect timer to recover from silently-broken connections. Bounded `queue.Queue(maxsize=16)` per subscriber with silent drop on full prevents memory leaks from slow tabs. 29 new static-analysis + unit + concurrency tests. (`api/clarify.py`, `api/routes.py`, `static/messages.js`, `tests/test_clarify_sse.py`) @fxd-jason — PR #1355
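For readers new to the wire format behind this entry: a Server-Sent Events frame is plain text made of an optional `event:` line, one or more `data:` lines, and a terminating blank line. Lines that begin with a colon are comments that `EventSource` clients ignore, which is what a keepalive relies on. The sketch below is illustrative only. `format_sse_frame` is a hypothetical helper, not the project's `_sse` function, whose body does not appear in this diff.

```python
import json


def format_sse_frame(event: str, payload: dict) -> bytes:
    """Encode one Server-Sent Events frame (hypothetical helper for illustration)."""
    # json.dumps escapes newlines inside strings, so a single data: line suffices.
    body = json.dumps(payload)
    return f"event: {event}\ndata: {body}\n\n".encode("utf-8")


# A comment line (leading colon) keeps the connection warm without
# triggering any client-side event listener.
KEEPALIVE_FRAME = b": keepalive\n\n"

frame = format_sse_frame("initial", {"pending": None, "pending_count": 0})
print(frame.decode("utf-8"), end="")
```

The event name chosen here (`initial`) matches the one the frontend listens for. The payload shape follows the `pending` / `pending_count` keys described in this entry.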
### Fixed
- **Context window indicator no longer shows misleading "100% used (0% left)" when context_length is missing from the live SSE payload** — the v0.50.247 / PR #1348 fallback to `agent.model_metadata.get_model_context_length()` was applied to the session-save path but NOT to the live SSE `usage` event. For sessions on large-context models (e.g. claude-sonnet-4.6 via OpenRouter, 1M tokens) where the agent didn't have a compressor configured, `usage.context_length` was omitted from the SSE payload, the JS frontend defaulted to 128K, and cumulative `input_tokens` over multiple turns overflowed against the 128K default — clamping the ring to 100% with a tooltip claiming the context was "0% left." The fix mirrors the session-save fallback exactly: when `usage.context_length` is missing, resolve via `get_model_context_length(model, base_url)` and write it onto the `usage` dict before serialization. Symmetric fallback added for `last_prompt_tokens` (uses `s.last_prompt_tokens` instead of the cumulative `input_tokens` counter). Frontend now tracks `rawPct` separately from the clamped `pct`; when `rawPct > 100` the tooltip shows `${rawPct}% used (context exceeded)` instead of misleading users. (`api/streaming.py`, `static/ui.js`) — PR #1356
- **"Uploading…" composer status persists for the entire stream duration after a file upload** — `setComposerStatus('Uploading…')` was set before `uploadPendingFiles()` but never cleared after the upload completed; only `setBusy(false)` at the end of the agent stream eventually wiped it. Users saw "Uploading…" displayed during the agent response, which is misleading. The fix clears the status unconditionally after the upload await completes. UX defect, no behavior change to upload correctness or message text. (`static/messages.js`) — PR #1356
- **Imported CLI/gateway session metadata survives compact() round-trip** — `Session.load_metadata_only().compact()` was dropping `is_cli_session`, `source_tag`, `session_source`, and `source_label`, so imported agent/Telegram/messaging sessions in the sidebar lost their provenance after the metadata-only fast path. Adds these four fields to `Session.__init__`, the `METADATA_FIELDS` save round-trip, and `compact()` output. Without this, sidebar payloads couldn't distinguish imported sessions from native WebUI ones. (`api/models.py`, `tests/test_gateway_sync.py`) @dso2ng — PR #1357
- **Sidebar collapses compression-lineage segments instead of showing every segment as a separate row** — when an agent session has a compression lineage (`_lineage_root_id` populated by the gateway-import path in `api/agent_sessions.py:169`), the sidebar previously listed each segment as its own top-level conversation, cluttering the list with what the user perceives as a single conversation. Adds a pure client-side helper `_collapseSessionLineageForSidebar()` that groups by `_lineage_root_id`/`lineage_root_id`/`parent_session_id`, keeps only the most recently active tip per group, and stores `_lineage_collapsed_count` on the visible row for future UI affordances. Non-destructive — no session JSON or messages are merged, deleted, or rewritten. Only collapses rows when lineage metadata is present. (`static/sessions.js`, `tests/test_session_lineage_collapse.py`) @dso2ng — PR #1358
- **Active session synchronizes across multiple browser tabs** — multiple WebUI tabs sharing the same `localStorage` would diverge from each other when one tab switched sessions, leaving idle tabs with stale in-memory active-session state until their next user action wrote into the wrong session. Adds a `storage` event listener on the `hermes-webui-session` localStorage key. Idle tabs auto-load the new active session and re-render the sidebar cache. Busy tabs (currently mid-turn) do not auto-switch — they show a brief toast instead, so the user notices but the active turn isn't interrupted. (`static/sessions.js`, `tests/test_session_cross_tab_sync.py`) @dso2ng — PR #1359
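To make the context-indicator entry above concrete, here is a small worked example of the percentage arithmetic. It is a Python sketch of the idea, not the frontend code. The function name `context_usage` is hypothetical, and the token counts are made-up illustration values.

```python
def context_usage(prompt_tokens: int, context_window: int) -> tuple[int, int, bool]:
    """Return (raw_pct, clamped_pct, overflowed) for a context-usage ring."""
    # Note: Python's round() uses banker's rounding while JS Math.round rounds
    # half up; the values below avoid exact .5 cases so both agree.
    raw_pct = round(100 * prompt_tokens / context_window)
    clamped_pct = min(100, raw_pct)
    return raw_pct, clamped_pct, raw_pct > 100


# 300K prompt tokens measured against the 128K default window overflows...
print(context_usage(300_000, 128_000))
# ...but is comfortably inside a 1M-token window once the real size is known.
print(context_usage(300_000, 1_000_000))
```

With the wrong window the ring clamps to 100% even though the raw value is well above it, which is the case the new "context exceeded" tooltip is meant to surface.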
## [v0.50.248] — 2026-04-30
### Added
+50 -11
@@ -6,6 +6,7 @@ clarification string instead of an approval decision.
from __future__ import annotations
import queue
import threading
import time
from typing import Optional
@@ -17,6 +18,9 @@ _pending: dict[str, dict] = {}
_gateway_queues: dict[str, list] = {}
_gateway_notify_cbs: dict[str, object] = {}
# ── SSE subscriber registry ─────────────────────────────────────────────
_clarify_sse_subscribers: dict[str, list[queue.Queue]] = {}
class _ClarifyEntry:
"""One pending clarify request inside a session."""
@@ -70,15 +74,46 @@ def _with_timeout_metadata(data: dict) -> dict:
return item
def _clarify_sse_notify(session_id: str, head: dict | None, total: int) -> None:
"""Push a clarify event to all SSE subscribers for a session."""
payload = {"pending": dict(head) if head else None, "pending_count": total}
for q in _clarify_sse_subscribers.get(session_id, ()):
try:
q.put_nowait(payload)
except queue.Full:
pass # drop if subscriber is slow
def sse_subscribe(session_id: str) -> queue.Queue:
"""Register a bounded Queue for SSE push to a given session."""
q: queue.Queue = queue.Queue(maxsize=16)
with _lock:
_clarify_sse_subscribers.setdefault(session_id, []).append(q)
return q
def sse_unsubscribe(session_id: str, q: queue.Queue) -> None:
"""Remove a subscriber Queue; clean up empty session entries."""
with _lock:
subs = _clarify_sse_subscribers.get(session_id)
if subs:
try:
subs.remove(q)
except ValueError:
pass
if not subs:
_clarify_sse_subscribers.pop(session_id, None)
def submit_pending(session_key: str, data: dict) -> _ClarifyEntry:
"""Queue a pending clarify request and notify the UI callback if registered."""
data = _with_timeout_metadata(data)
with _lock:
queue = _gateway_queues.setdefault(session_key, [])
gw_queue = _gateway_queues.setdefault(session_key, [])
# De-duplicate while unresolved: if the most recent pending clarify is
# semantically identical, reuse it instead of stacking duplicates.
if queue:
last = queue[-1]
if gw_queue:
last = gw_queue[-1]
if (
str(last.data.get("question", "")) == str(data.get("question", ""))
and list(last.data.get("choices_offered") or [])
@@ -87,7 +122,7 @@ def submit_pending(session_key: str, data: dict) -> _ClarifyEntry:
entry = last
cb = _gateway_notify_cbs.get(session_key)
# Keep _pending aligned to the oldest unresolved entry.
_pending[session_key] = queue[0].data
_pending[session_key] = gw_queue[0].data
if cb:
try:
cb(dict(entry.data))
@@ -96,9 +131,11 @@ def submit_pending(session_key: str, data: dict) -> _ClarifyEntry:
return entry
entry = _ClarifyEntry(data)
queue.append(entry)
_pending[session_key] = queue[0].data
gw_queue.append(entry)
_pending[session_key] = gw_queue[0].data
cb = _gateway_notify_cbs.get(session_key)
# Notify SSE subscribers from inside _lock for ordering guarantees.
_clarify_sse_notify(session_key, dict(gw_queue[0].data), len(gw_queue))
if cb:
try:
cb(data)
@@ -125,15 +162,17 @@ def has_pending(session_key: str) -> bool:
def resolve_clarify(session_key: str, response: str, resolve_all: bool = False) -> int:
"""Resolve the oldest pending clarify request for a session."""
with _lock:
queue = _gateway_queues.get(session_key)
if not queue:
q = _gateway_queues.get(session_key)
if not q:
_pending.pop(session_key, None)
return 0
entries = list(queue) if resolve_all else [queue.pop(0)]
if queue:
_pending[session_key] = queue[0].data
entries = list(q) if resolve_all else [q.pop(0)]
if q:
_pending[session_key] = q[0].data
_clarify_sse_notify(session_key, dict(q[0].data), len(q))
else:
_clear_queue_locked(session_key)
_clarify_sse_notify(session_key, None, 0)
count = 0
for entry in entries:
entry.result = response
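The ordering property in the hunks above (notify from inside the lock, always with the head of the queue) can be seen in a stripped-down model. This is a simplified sketch, not the module itself: it omits de-duplication, timeouts, and gateway callbacks, and every name in it (`submit`, `resolve`, `_notify_locked`, and so on) is hypothetical.

```python
import queue
import threading

_lock = threading.Lock()
_pending_by_session: dict[str, list[dict]] = {}
_subscribers: dict[str, list[queue.Queue]] = {}


def _notify_locked(session_id: str) -> None:
    """Push the current head and count. The caller must already hold _lock."""
    pending = _pending_by_session.get(session_id) or []
    head = dict(pending[0]) if pending else None
    payload = {"pending": head, "pending_count": len(pending)}
    for sub in _subscribers.get(session_id, ()):
        try:
            sub.put_nowait(payload)
        except queue.Full:
            pass  # slow subscriber: drop rather than block the producer


def subscribe(session_id: str) -> queue.Queue:
    sub: queue.Queue = queue.Queue(maxsize=16)
    with _lock:
        _subscribers.setdefault(session_id, []).append(sub)
    return sub


def submit(session_id: str, data: dict) -> None:
    with _lock:
        _pending_by_session.setdefault(session_id, []).append(data)
        _notify_locked(session_id)  # head of queue, not the new entry


def resolve(session_id: str) -> None:
    with _lock:
        pending = _pending_by_session.get(session_id)
        if pending:
            pending.pop(0)
        _notify_locked(session_id)  # re-emit the new head, or None when empty


sub = subscribe("s1")
submit("s1", {"question": "A"})
submit("s1", {"question": "B"})
resolve("s1")
resolve("s1")
events = [sub.get_nowait() for _ in range(4)]
heads = [(e["pending"] and e["pending"]["question"], e["pending_count"]) for e in events]
print(heads)
```

The second event still carries question A because the head has not changed, and the third event is what brings B to the subscriber once A is resolved.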
+9
@@ -347,6 +347,10 @@ class Session:
self.context_length = context_length
self.threshold_tokens = threshold_tokens
self.last_prompt_tokens = last_prompt_tokens
self.is_cli_session = bool(kwargs.get('is_cli_session', False))
self.source_tag = kwargs.get('source_tag')
self.session_source = kwargs.get('session_source')
self.source_label = kwargs.get('source_label')
self._metadata_message_count = None
@property
@@ -367,6 +371,7 @@ class Session:
'pending_user_message', 'pending_attachments', 'pending_started_at',
'compression_anchor_visible_idx', 'compression_anchor_message_key',
'context_length', 'threshold_tokens', 'last_prompt_tokens',
'is_cli_session', 'source_tag', 'session_source', 'source_label',
]
meta = {k: getattr(self, k, None) for k in METADATA_FIELDS}
meta['messages'] = self.messages
@@ -462,6 +467,10 @@ class Session:
'threshold_tokens': self.threshold_tokens,
'last_prompt_tokens': self.last_prompt_tokens,
'active_stream_id': self.active_stream_id,
'is_cli_session': self.is_cli_session,
'source_tag': self.source_tag,
'session_source': self.session_source,
'source_label': self.source_label,
'is_streaming': _is_streaming_session(
self.active_stream_id, active_stream_ids
) if include_runtime else False,
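The three hunks above only work together: a field has to be accepted by the constructor, listed in the saved metadata, and emitted by `compact()`, or it is lost somewhere along the round trip. A minimal sketch of that chain follows, using a hypothetical `MiniSession` class and JSON strings in place of the real `Session` and its on-disk format.

```python
import json

# Hypothetical, shortened field list for illustration only.
METADATA_FIELDS = [
    "session_id", "title",
    "is_cli_session", "source_tag", "session_source", "source_label",
]


class MiniSession:
    """Stand-in for Session with just enough surface to show the round trip."""

    def __init__(self, session_id: str, title: str = "", **kwargs):
        self.session_id = session_id
        self.title = title
        # Provenance fields arrive through kwargs, as in the constructor hunk.
        self.is_cli_session = bool(kwargs.get("is_cli_session", False))
        self.source_tag = kwargs.get("source_tag")
        self.session_source = kwargs.get("session_source")
        self.source_label = kwargs.get("source_label")

    def to_json(self) -> str:
        return json.dumps({k: getattr(self, k, None) for k in METADATA_FIELDS})

    @classmethod
    def from_json(cls, raw: str) -> "MiniSession":
        return cls(**json.loads(raw))

    def compact(self) -> dict:
        return {k: getattr(self, k) for k in METADATA_FIELDS}


original = MiniSession("gw_001", "Imported Telegram Chat",
                       is_cli_session=True, source_tag="telegram",
                       session_source="messaging", source_label="Telegram")
restored = MiniSession.from_json(original.to_json())
print(restored.compact())
```

Removing a name from `METADATA_FIELDS` in this sketch reproduces the reported symptom: the value silently falls back to its default after reload.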
+78
@@ -643,10 +643,13 @@ try:
submit_pending as submit_clarify_pending,
get_pending as get_clarify_pending,
resolve_clarify,
sse_subscribe as clarify_sse_subscribe,
sse_unsubscribe as clarify_sse_unsubscribe,
)
except ImportError:
submit_clarify_pending = lambda *a, **k: None
get_clarify_pending = lambda *a, **k: None
clarify_sse_subscribe = None
resolve_clarify = lambda *a, **k: 0
@@ -1271,6 +1274,9 @@ def handle_get(handler, parsed) -> bool:
if parsed.path == "/api/clarify/pending":
return _handle_clarify_pending(handler, parsed)
if parsed.path == "/api/clarify/stream":
return _handle_clarify_sse_stream(handler, parsed)
if parsed.path == "/api/clarify/inject_test":
# Loopback-only: used by automated tests; blocked from any remote client
if handler.client_address[0] != "127.0.0.1":
@@ -2879,6 +2885,78 @@ def _handle_clarify_pending(handler, parsed):
return j(handler, {"pending": None})
def _handle_clarify_sse_stream(handler, parsed):
"""SSE endpoint for real-time clarify notifications.
Long-lived connection that pushes clarify events the moment they arrive,
replacing the 1.5s polling loop. The frontend uses EventSource and falls
back to HTTP polling if the connection fails.
"""
if clarify_sse_subscribe is None:
return bad(handler, "clarify SSE not available")
sid = parse_qs(parsed.query).get("session_id", [""])[0]
if not sid:
return bad(handler, "session_id is required")
# Subscribe AND snapshot atomically. We import clarify's _lock so that
# subscribe and the snapshot read happen under the same mutex — same
# pattern as the approval SSE handler.
#
# NOTE: We must NOT call clarify.get_pending() here — it acquires _lock
# internally, which would deadlock since clarify._lock is a non-reentrant
# threading.Lock. Instead, read _gateway_queues / _pending inline under
# the lock we already hold.
from api.clarify import (
_lock as _clarify_lock,
_clarify_sse_subscribers as _clarify_subs,
_gateway_queues as _clarify_gateway_queues,
_pending as _clarify_pending,
)
q = queue.Queue(maxsize=16)
initial_pending = None
initial_count = 0
with _clarify_lock:
_clarify_subs.setdefault(sid, []).append(q)
gw_q = _clarify_gateway_queues.get(sid) or []
if gw_q:
initial_pending = dict(gw_q[0].data)
initial_count = len(gw_q)
else:
_legacy = _clarify_pending.get(sid)
if _legacy:
initial_pending = dict(_legacy)
initial_count = 1
handler.send_response(200)
handler.send_header('Content-Type', 'text/event-stream; charset=utf-8')
handler.send_header('Cache-Control', 'no-cache')
handler.send_header('X-Accel-Buffering', 'no')
handler.send_header('Connection', 'keep-alive')
handler.end_headers()
from api.streaming import _sse
# Push initial state immediately so the client doesn't miss anything.
_sse(handler, 'initial', {"pending": initial_pending, "pending_count": initial_count})
try:
while True:
try:
payload = q.get(timeout=30)
except queue.Empty:
handler.wfile.write(b': keepalive\n\n')
handler.wfile.flush()
continue
if payload is None:
break
_sse(handler, 'clarify', payload)
except _CLIENT_DISCONNECT_ERRORS:
pass
finally:
clarify_sse_unsubscribe(sid, q)
def _handle_clarify_inject(handler, parsed):
"""Inject a fake pending clarify prompt -- loopback-only, used by automated tests."""
qs = parse_qs(parsed.query)
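The NOTE in the handler above depends on `threading.Lock` being non-reentrant. The short standard-library demonstration below shows that behavior in isolation. The timeout is only there so the example returns instead of hanging. It is not something the handler itself uses.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

with plain:
    # A second acquire from the same thread cannot succeed. The timeout turns
    # what would otherwise be a permanent hang into a False return value.
    second_plain = plain.acquire(timeout=0.1)

with reentrant:
    # An RLock tracks its owner, so the same thread may acquire it again.
    second_reentrant = reentrant.acquire(timeout=0.1)
    if second_reentrant:
        reentrant.release()

print(second_plain, second_reentrant)
```

This is why the snapshot is read inline under the already-held lock: calling a helper that takes the same plain lock would block forever.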
+22
@@ -2241,6 +2241,28 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
usage['context_length'] = getattr(_cc, 'context_length', 0) or 0
usage['threshold_tokens'] = getattr(_cc, 'threshold_tokens', 0) or 0
usage['last_prompt_tokens'] = getattr(_cc, 'last_prompt_tokens', 0) or 0
# Fallback: when the compressor is absent or reports context_length=0,
# resolve the model's context window from metadata so the UI indicator
# shows the correct percentage rather than overflowing against the 128K
# JS default. Mirrors the session-save fallback above (lines ~2205-2217).
if not usage.get('context_length'):
try:
from agent.model_metadata import get_model_context_length as _get_cl
_fb_cl = _get_cl(
getattr(agent, 'model', resolved_model or '') or '',
getattr(agent, 'base_url', '') or '',
)
if _fb_cl:
usage['context_length'] = _fb_cl
except Exception:
pass
# Fallback: when last_prompt_tokens is missing (no compressor), use the
# session-persisted value rather than letting the frontend fall back to
# the cumulative input_tokens counter, which overflows for long sessions.
if not usage.get('last_prompt_tokens'):
_sess_lpt = getattr(s, 'last_prompt_tokens', 0) or 0
if _sess_lpt:
usage['last_prompt_tokens'] = _sess_lpt
# (reasoning trace already attached + saved above, before s.save())
# Leftover-steer delivery: if a /steer was queued (via
# api/chat/steer) but the agent finished its turn before
+55 -4
@@ -141,6 +141,10 @@ async function send(){
let uploaded=[];
try{uploaded=await uploadPendingFiles();}
catch(e){if(!text){setComposerStatus(`Upload error: ${e.message}`);return;}}
// Clear the uploading status now that upload is done — if we don't clear here
// it stays visible for the entire duration of the agent stream, since
// setComposerStatus('') is only called in setBusy(false), not setBusy(true).
setComposerStatus('');
const uploadedNames=uploaded.map(u=>u.name||u);
const uploadedPaths=uploaded.map(u=>u&&u.is_image?(u.name||u.filename||u):(u.path||u.name||u));
@@ -1668,10 +1672,56 @@ async function respondClarify(response) {
} catch(e) { setStatus(t("clarify_responding") + " " + e.message); }
}
var _clarifyEventSource = null;
var _clarifyFallbackTimer = null;
var _clarifyHealthTimer = null;
function startClarifyPolling(sid) {
stopClarifyPolling();
_clarifyMissingEndpointWarned = false;
_clarifyPollTimer = setInterval(async () => {
// SSE primary path: long-lived connection pushes events instantly.
try {
_clarifyEventSource = new EventSource('/api/clarify/stream?session_id=' + encodeURIComponent(sid));
} catch(e) {
_startClarifyFallbackPoll(sid);
return;
}
_clarifyEventSource.addEventListener('initial', function(ev) {
try {
var d = JSON.parse(ev.data);
if (d.pending) { d.pending._session_id = sid; showClarifyCard(d.pending); }
else { hideClarifyCard(false, 'expired'); }
} catch(e) {}
});
_clarifyEventSource.addEventListener('clarify', function(ev) {
try {
var d = JSON.parse(ev.data);
if (d.pending) { d.pending._session_id = sid; showClarifyCard(d.pending); }
else { hideClarifyCard(false, 'expired'); }
} catch(e) {}
});
_clarifyEventSource.onerror = function() {
stopClarifyPolling();
_startClarifyFallbackPoll(sid);
};
// Health timer: tear down and reconnect every 60s to recover from silently broken connections.
_clarifyHealthTimer = setInterval(function() {
if (_clarifyEventSource) {
try { _clarifyEventSource.close(); } catch(_){}
_clarifyEventSource = null;
}
clearInterval(_clarifyHealthTimer); _clarifyHealthTimer = null;
startClarifyPolling(sid);
}, 60000);
}
function _startClarifyFallbackPoll(sid) {
_clarifyFallbackTimer = setInterval(async () => {
if (!S.session || S.session.session_id !== sid) {
stopClarifyPolling(); hideClarifyCard(true, 'session'); return;
}
@@ -1689,13 +1739,14 @@ function startClarifyPolling(sid) {
}
stopClarifyPolling();
}
// Ignore transient poll errors; SSE clarify event still provides a fast path.
}
}, 1500);
}, 3000);
}
function stopClarifyPolling() {
if (_clarifyPollTimer) { clearInterval(_clarifyPollTimer); _clarifyPollTimer = null; }
if (_clarifyEventSource) { try { _clarifyEventSource.close(); } catch(_){} _clarifyEventSource = null; }
if (_clarifyFallbackTimer) { clearInterval(_clarifyFallbackTimer); _clarifyFallbackTimer = null; }
if (_clarifyHealthTimer) { clearInterval(_clarifyHealthTimer); _clarifyHealthTimer = null; }
}
// ── Notifications and Sound ──────────────────────────────────────────────────
+40 -1
@@ -1211,6 +1211,28 @@ function _sessionTimeBucketLabel(timestampMs, nowMs) {
return t('session_time_bucket_older');
}
function _sessionLineageKey(s){
if(!s||!s.session_id) return null;
return s._lineage_root_id || s.lineage_root_id || s.parent_session_id || null;
}
function _collapseSessionLineageForSidebar(sessions){
const result=[];
const groups=new Map();
for(const s of sessions||[]){
const key=_sessionLineageKey(s);
if(!key){result.push(s);continue;}
if(!groups.has(key)) groups.set(key,[]);
groups.get(key).push(s);
}
for(const items of groups.values()){
if(items.length<=1){result.push(items[0]);continue;}
const chosen=[...items].sort((a,b)=>_sessionTimestampMs(b)-_sessionTimestampMs(a))[0];
result.push({...chosen,_lineage_collapsed_count:items.length});
}
return result;
}
function renderSessionListFromCache(){
// Don't re-render while user is actively renaming a session (would destroy the input)
if(_renamingSid) return;
@@ -1232,7 +1254,8 @@ function renderSessionListFromCache(){
// Filter by active project
const projectFiltered=_activeProject?profileFiltered.filter(s=>s.project_id===_activeProject):profileFiltered;
// Filter archived unless toggle is on
const sessions=_showArchived?projectFiltered:projectFiltered.filter(s=>!s.archived);
const sessionsRaw=_showArchived?projectFiltered:projectFiltered.filter(s=>!s.archived);
const sessions=_collapseSessionLineageForSidebar(sessionsRaw);
const archivedCount=projectFiltered.filter(s=>s.archived).length;
const list=$('sessionList');list.innerHTML='';
// Batch select bar (when in select mode)
@@ -1605,6 +1628,22 @@ function renderSessionListFromCache(){
}
}
async function _handleActiveSessionStorageEvent(e){
if(!e || e.key !== 'hermes-webui-session') return;
const sid = e.newValue;
if(!sid || (S.session && S.session.session_id === sid)) return;
if(S.busy){
if(typeof showToast==='function') showToast('Active session changed in another tab. Finish the current turn before switching.',3000);
return;
}
await loadSession(sid);
renderSessionListFromCache();
}
if(typeof window!=='undefined'){
window.addEventListener('storage', (e) => { void _handleActiveSessionStorageEvent(e); });
}
async function deleteSession(sid){
const ok=await showConfirmDialog({
message:'Delete this conversation?',
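For readers who want to reason about the grouping rule outside the browser, here is a Python rendition of the lineage-collapse helper from this hunk. It is a sketch under assumptions: sessions are plain dicts, and recency is read from an `updated_at` key, which stands in for `_sessionTimestampMs` (not shown in this diff).

```python
def lineage_key(session: dict):
    """Return the lineage group key, or None when the session has no lineage."""
    if not session or not session.get("session_id"):
        return None
    return (session.get("_lineage_root_id")
            or session.get("lineage_root_id")
            or session.get("parent_session_id")
            or None)


def collapse_lineage(sessions: list[dict]) -> list[dict]:
    """Keep one row per lineage group: the most recently active segment."""
    result: list[dict] = []
    groups: dict[str, list[dict]] = {}
    for s in sessions or []:
        key = lineage_key(s)
        if key is None:
            result.append(s)  # no lineage metadata: pass through untouched
            continue
        groups.setdefault(key, []).append(s)
    for items in groups.values():
        if len(items) == 1:
            result.append(items[0])
            continue
        # 'updated_at' is an assumed recency field for this sketch.
        chosen = max(items, key=lambda s: s.get("updated_at", 0))
        result.append({**chosen, "_lineage_collapsed_count": len(items)})
    return result


rows = collapse_lineage([
    {"session_id": "a", "updated_at": 1},
    {"session_id": "b", "_lineage_root_id": "r", "updated_at": 2},
    {"session_id": "c", "_lineage_root_id": "r", "updated_at": 5},
])
print([r["session_id"] for r in rows])
```

As in the JavaScript version, ungrouped rows come first and the chosen row is copied rather than mutated, so the cached session objects are left as they were.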
+4 -2
@@ -864,7 +864,9 @@ function _syncCtxIndicator(usage){
}
if(wrap) wrap.style.display='';
const hasPromptTok=!!promptTok;
const pct=hasPromptTok?Math.min(100,Math.round((promptTok/ctxWindow)*100)):0;
const rawPct=hasPromptTok?Math.round((promptTok/ctxWindow)*100):0;
const pct=Math.min(100,rawPct);
const overflowed=rawPct>100;
const ring=$('ctxRingValue');
const center=$('ctxPercent');
const usageLine=$('ctxTooltipUsage');
@@ -908,7 +910,7 @@ function _syncCtxIndicator(usage){
if(!hasExplicitCtx&&hasPromptTok) label+=' (est. 128K)';
if(cost) label+=` \u00b7 $${cost<0.01?cost.toFixed(4):cost.toFixed(2)}`;
el.setAttribute('aria-label',label);
if(usageLine) usageLine.textContent=hasPromptTok?`${pct}% used (${Math.max(0,100-pct)}% left)`:`${_fmtTokens(totalTok)} tokens used`;
if(usageLine) usageLine.textContent=hasPromptTok?(overflowed?`${rawPct}% used (context exceeded)`:`${pct}% used (${100-pct}% left)`):`${_fmtTokens(totalTok)} tokens used`;
if(tokensLine) tokensLine.textContent=hasPromptTok?`${_fmtTokens(promptTok)} / ${_fmtTokens(ctxWindow)} tokens used`:`In: ${_fmtTokens(usage.input_tokens||0)} \u00b7 Out: ${_fmtTokens(usage.output_tokens||0)}`;
const threshold=usage.threshold_tokens||0;
if(thresholdLine){
+238
@@ -0,0 +1,238 @@
"""Tests for clarify SSE long-connection (mirrors test_approval_sse.py structure).
Covers:
- Static analysis of backend and frontend code
- Unit tests for subscribe/unsubscribe/notify lifecycle
- Concurrency safety of subscriber registry
"""
import os
import queue
import threading
import textwrap
import pytest
# ── Paths ────────────────────────────────────────────────────────────────────
_ROUTES = os.path.join(os.path.dirname(__file__), "..", "api", "routes.py")
_CLARIFY = os.path.join(os.path.dirname(__file__), "..", "api", "clarify.py")
_MESSAGES = os.path.join(os.path.dirname(__file__), "..", "static", "messages.js")
def _read(path):
with open(path) as f:
return f.read()
# ══════════════════════════════════════════════════════════════════════════════
# 1. Static analysis — verify code structure without importing the server
# ══════════════════════════════════════════════════════════════════════════════
@pytest.mark.parametrize("marker", [
"_clarify_sse_subscribers",
"def sse_subscribe",
"def sse_unsubscribe",
"_clarify_sse_notify",
])
class TestClarifySSEBackendMarkers:
def test_clarify_module_has_marker(self, marker):
src = _read(_CLARIFY)
assert marker in src, f"api/clarify.py missing: {marker}"
class TestClarifySSEBackendCode:
def test_submit_pending_calls_notify(self):
src = _read(_CLARIFY)
assert "_clarify_sse_notify(" in src, (
"submit_pending should call _clarify_sse_notify inside _lock"
)
def test_resolve_clarify_calls_notify(self):
src = _read(_CLARIFY)
# Count occurrences — resolve should notify on both branches
assert src.count("_clarify_sse_notify(") >= 2, (
"resolve_clarify should call _clarify_sse_notify for both queue-has-more and empty cases"
)
class TestClarifySSERoutesCode:
def test_route_registered(self):
src = _read(_ROUTES)
assert '"/api/clarify/stream"' in src, "Missing /api/clarify/stream route"
def test_handler_function_exists(self):
src = _read(_ROUTES)
assert "def _handle_clarify_sse_stream(" in src
def test_imports_sse_subscribe(self):
src = _read(_ROUTES)
assert "clarify_sse_subscribe" in src
def test_imports_sse_unsubscribe(self):
src = _read(_ROUTES)
assert "clarify_sse_unsubscribe" in src
class TestClarifySSEFrontendCode:
@pytest.fixture(autouse=True)
def _load_js(self):
self.js = _read(_MESSAGES)
def test_uses_event_source(self):
assert "new EventSource" in self.js
assert "/api/clarify/stream" in self.js
def test_frontend_listens_initial_event(self):
assert "'initial'" in self.js or '"initial"' in self.js
def test_frontend_listens_clarify_event(self):
assert "'clarify'" in self.js or '"clarify"' in self.js
def test_frontend_has_fallback_poll(self):
assert "_startClarifyFallbackPoll" in self.js or "clarifyFallbackTimer" in self.js
def test_frontend_fallback_interval_3s(self):
# Fallback poll interval should be 3000ms
assert "3000" in self.js
def test_frontend_stop_closes_event_source(self):
assert "_clarifyEventSource" in self.js
assert ".close()" in self.js
def test_frontend_has_health_timer(self):
assert "_clarifyHealthTimer" in self.js
# ══════════════════════════════════════════════════════════════════════════════
# 2. Unit tests — import clarify module directly
# ══════════════════════════════════════════════════════════════════════════════
@pytest.fixture()
def clarify_mod():
"""Return the api.clarify module (module-level state is shared across tests)."""
from api import clarify
return clarify
@pytest.fixture(autouse=True)
def _cleanup_subscribers(clarify_mod):
"""Clear SSE subscribers between tests to avoid leakage."""
yield
clarify_mod._clarify_sse_subscribers.clear()
class TestClarifySSEUnit:
def test_subscribe_returns_queue(self, clarify_mod):
q = clarify_mod.sse_subscribe("s1")
assert isinstance(q, queue.Queue)
assert q.maxsize == 16
def test_unsubscribe_removes_queue(self, clarify_mod):
q = clarify_mod.sse_subscribe("s1")
clarify_mod.sse_unsubscribe("s1", q)
assert "s1" not in clarify_mod._clarify_sse_subscribers
def test_unsubscribe_cleans_empty_session(self, clarify_mod):
q = clarify_mod.sse_subscribe("s1")
clarify_mod.sse_unsubscribe("s1", q)
assert "s1" not in clarify_mod._clarify_sse_subscribers
def test_unsubscribe_unknown_queue_no_error(self, clarify_mod):
q = queue.Queue()
clarify_mod.sse_unsubscribe("s1", q) # should not raise
def test_multiple_subscribers_same_session(self, clarify_mod):
q1 = clarify_mod.sse_subscribe("s1")
q2 = clarify_mod.sse_subscribe("s1")
assert len(clarify_mod._clarify_sse_subscribers["s1"]) == 2
clarify_mod.sse_unsubscribe("s1", q1)
assert len(clarify_mod._clarify_sse_subscribers["s1"]) == 1
def test_notify_delivers_to_all_subscribers(self, clarify_mod):
q1 = clarify_mod.sse_subscribe("s1")
q2 = clarify_mod.sse_subscribe("s1")
clarify_mod._clarify_sse_notify("s1", {"question": "test?"}, 1)
assert q1.get(timeout=1)["pending"]["question"] == "test?"
assert q2.get(timeout=1)["pending"]["question"] == "test?"
def test_cross_session_isolation(self, clarify_mod):
q1 = clarify_mod.sse_subscribe("s1")
q2 = clarify_mod.sse_subscribe("s2")
clarify_mod._clarify_sse_notify("s1", {"question": "q1"}, 1)
assert q1.get(timeout=1)["pending"]["question"] == "q1"
assert q2.empty()
def test_queue_overflow_drops_silently(self, clarify_mod):
q = clarify_mod.sse_subscribe("s1")
for i in range(20): # maxsize=16
clarify_mod._clarify_sse_notify("s1", {"q": i}, i + 1)
# Should not raise; some messages dropped
count = 0
while not q.empty():
q.get_nowait()
count += 1
assert count <= 16
def test_submit_pending_triggers_notify(self, clarify_mod):
q = clarify_mod.sse_subscribe("s1")
clarify_mod.submit_pending("s1", {"question": "hello?", "choices_offered": []})
payload = q.get(timeout=1)
assert payload["pending"] is not None
assert payload["pending"]["question"] == "hello?"
assert payload["pending_count"] == 1
def test_unsubscribe_mid_notify_safe(self, clarify_mod):
q1 = clarify_mod.sse_subscribe("s1")
q2 = clarify_mod.sse_subscribe("s1")
clarify_mod.sse_unsubscribe("s1", q2)
clarify_mod._clarify_sse_notify("s1", {"question": "safe?"}, 1)
assert q1.get(timeout=1)["pending"]["question"] == "safe?"
# ══════════════════════════════════════════════════════════════════════════════
# 3. Concurrency tests
# ══════════════════════════════════════════════════════════════════════════════
class TestClarifySSEConcurrency:
def test_concurrent_subscribe_unsubscribe(self, clarify_mod):
errors = []
def worker():
try:
for _ in range(50):
q = clarify_mod.sse_subscribe("s1")
clarify_mod.sse_unsubscribe("s1", q)
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=5)
assert not errors
def test_concurrent_notify_and_subscribe(self, clarify_mod):
received = []
lock = threading.Lock()
q = clarify_mod.sse_subscribe("s1")
def notifier():
for i in range(20):
clarify_mod._clarify_sse_notify("s1", {"q": i}, i + 1)
def subscriber():
for _ in range(20):
q2 = clarify_mod.sse_subscribe("s1")
clarify_mod.sse_unsubscribe("s1", q2)
t1 = threading.Thread(target=notifier)
t2 = threading.Thread(target=subscriber)
t1.start()
t2.start()
t1.join(timeout=5)
t2.join(timeout=5)
# Just verify no crash — some events may have been dropped
while not q.empty():
payload = q.get_nowait()
with lock:
received.append(payload)
# Should have received at least some events
assert len(received) > 0
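The overflow test above asserts only an upper bound. The standalone snippet below shows precisely which events a bounded `queue.Queue` keeps when `put_nowait` is used with a drop-on-full policy. It uses only the standard library and does not import the module under test. The `seq` key is an illustration value.

```python
import queue

subscriber: queue.Queue = queue.Queue(maxsize=16)
dropped = 0
for i in range(20):
    try:
        subscriber.put_nowait({"seq": i})
    except queue.Full:
        dropped += 1  # the queue is full, so this newest event is discarded

kept = []
while True:
    try:
        kept.append(subscriber.get_nowait()["seq"])
    except queue.Empty:
        break

print(kept, dropped)
```

One consequence worth keeping in mind is that the oldest sixteen events survive and the newest are the ones lost, so a subscriber that falls this far behind may end on a stale state until the next event or reconnect.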
+27
@@ -724,6 +724,33 @@ def test_gateway_watcher_uses_normalized_source_metadata(monkeypatch):
pass
def test_imported_cli_session_metadata_survives_compact(cleanup_test_sessions):
"""Imported agent sessions should remain distinguishable in compact sidebar payloads."""
from api.models import Session
sid = 'gw_imported_metadata_001'
cleanup_test_sessions.append(sid)
s = Session(
session_id=sid,
title='Imported Telegram Chat',
messages=[{'role': 'user', 'content': 'hello from telegram', 'timestamp': time.time()}],
model='openai/gpt-5',
)
s.is_cli_session = True
s.source_tag = 'telegram'
s.session_source = 'messaging'
s.source_label = 'Telegram'
s.save(touch_updated_at=False)
loaded = Session.load_metadata_only(sid)
compact = loaded.compact()
assert compact['is_cli_session'] is True
assert compact['source_tag'] == 'telegram'
assert compact['session_source'] == 'messaging'
assert compact['source_label'] == 'Telegram'
def test_imported_cron_sessions_hidden_from_sidebar_by_default(cleanup_test_sessions):
"""Cron sessions already imported into the WebUI store should stay hidden from the sidebar."""
from api.models import Session
@@ -0,0 +1,146 @@
"""Regression test: _handle_clarify_sse_stream must not deadlock on its own lock.
The naive implementation called clarify.get_pending(sid) inside a
`with _clarify_lock:` block, but get_pending also acquires _lock. Because
clarify._lock is a non-reentrant threading.Lock(), the second acquisition
would deadlock the SSE handler thread the moment any client connected to
/api/clarify/stream.
This test runs the inlined snapshot logic under the lock and verifies it
completes both with an empty queue and with a pending entry within a
short timeout. If the regression returns (someone re-introduces the
recursive get_pending() call), this test will hang and the timeout will
fire.
"""
from __future__ import annotations
import pathlib
import queue
import sys
import threading
import time
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
sys.path.insert(0, str(REPO_ROOT))


def _run_handler_snapshot(sid: str, timeout: float = 2.0):
    """Replicate the snapshot logic from _handle_clarify_sse_stream and
    return ``(initial_pending, initial_count)`` or raise on deadlock."""
    from api.clarify import (
        _lock as _clarify_lock,
        _clarify_sse_subscribers as _clarify_subs,
        _gateway_queues as _clarify_gateway_queues,
        _pending as _clarify_pending,
    )

    result: list = []

    def worker():
        try:
            q: queue.Queue = queue.Queue(maxsize=16)
            initial_pending = None
            initial_count = 0
            with _clarify_lock:
                _clarify_subs.setdefault(sid, []).append(q)
                gw_q = _clarify_gateway_queues.get(sid) or []
                if gw_q:
                    initial_pending = dict(gw_q[0].data)
                    initial_count = len(gw_q)
                else:
                    legacy = _clarify_pending.get(sid)
                    if legacy:
                        initial_pending = dict(legacy)
                        initial_count = 1
            # Cleanup the subscriber so we don't leak between tests.
            with _clarify_lock:
                subs = _clarify_subs.get(sid)
                if subs and q in subs:
                    subs.remove(q)
                    if not subs:
                        _clarify_subs.pop(sid, None)
            result.append((initial_pending, initial_count))
        except BaseException as exc:  # noqa: BLE001
            result.append(exc)

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    t.join(timeout=timeout)
    if t.is_alive():
        raise AssertionError(
            f"_handle_clarify_sse_stream snapshot deadlocked (>{timeout}s). "
            "Did someone re-introduce a recursive _lock acquisition? "
            "The handler must NOT call clarify.get_pending() — read "
            "_gateway_queues / _pending inline under the same _lock."
        )
    if isinstance(result[0], BaseException):
        raise result[0]
    return result[0]


def test_handler_snapshot_does_not_deadlock_when_queue_is_empty():
    sid = f"clarify-sse-empty-{time.time_ns()}"
    initial_pending, initial_count = _run_handler_snapshot(sid)
    assert initial_pending is None
    assert initial_count == 0


def test_handler_snapshot_does_not_deadlock_when_queue_has_entry():
    """With a real pending entry, the snapshot must capture it without deadlock."""
    from api import clarify

    sid = f"clarify-sse-populated-{time.time_ns()}"
    try:
        clarify.submit_pending(sid, {
            "question": "Pick one?",
            "choices_offered": ["a", "b"],
        })
        initial_pending, initial_count = _run_handler_snapshot(sid)
        assert initial_count == 1
        assert initial_pending is not None
        assert initial_pending.get("question") == "Pick one?"
        assert initial_pending.get("choices_offered") == ["a", "b"]
    finally:
        clarify.resolve_clarify(sid, "a")


def test_routes_handler_does_not_call_get_pending_under_lock():
    """Source-level invariant: routes.py must not call get_clarify_pending()
    inside the `with _clarify_lock:` block that would re-acquire _lock and
    deadlock."""
    src = (REPO_ROOT / "api" / "routes.py").read_text(encoding="utf-8")
    # Find _handle_clarify_sse_stream
    start = src.find("def _handle_clarify_sse_stream(")
    assert start != -1, "_handle_clarify_sse_stream must exist"
    end = src.find("\ndef ", start + 1)
    body = src[start:end if end != -1 else len(src)]
    # Find the lock block
    lock_start = body.find("with _clarify_lock:")
    assert lock_start != -1, "Handler must use `with _clarify_lock:`"
    # Walk forward by indentation to find the end of the lock block
    lines = body[lock_start:].split("\n")
    lock_block_lines = [lines[0]]  # The `with` line itself
    base_indent = None
    for line in lines[1:]:
        if not line.strip():
            lock_block_lines.append(line)
            continue
        indent = len(line) - len(line.lstrip(" "))
        if base_indent is None:
            base_indent = indent
        if indent < base_indent and line.strip():
            break
        lock_block_lines.append(line)
    lock_body = "\n".join(lock_block_lines)
    assert "get_clarify_pending(" not in lock_body, (
        "_handle_clarify_sse_stream must not call get_clarify_pending() "
        "inside `with _clarify_lock:` — it acquires _lock internally and "
        "would deadlock the handler thread (non-reentrant Lock)."
    )
    assert "clarify.get_pending(" not in lock_body, (
        "Same — clarify.get_pending() acquires _lock internally."
    )
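
The deadlock this test file guards against comes from `threading.Lock` being non-reentrant: a thread that already holds the lock blocks forever if it tries to take it again. The following is a minimal standalone sketch of that behaviour, not code from the repository. The helper name `reacquire_succeeds` is hypothetical, and the second acquisition uses a timeout so the demo cannot hang.

```python
import threading


def reacquire_succeeds(lock, timeout=0.2):
    """Return True if a thread that already holds `lock` can take it again.

    Hypothetical helper for illustration only.
    """
    outcome = []

    def worker():
        with lock:
            # Second acquisition from the same thread. A plain Lock would
            # block forever here, so the attempt is bounded by a timeout.
            got = lock.acquire(timeout=timeout)
            if got:
                lock.release()
            outcome.append(got)

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    t.join(timeout=timeout + 2.0)
    return bool(outcome) and outcome[0]


# A plain Lock refuses the nested acquisition; an RLock allows it.
plain_reacquire = reacquire_succeeds(threading.Lock())
reentrant_reacquire = reacquire_succeeds(threading.RLock())
```

Reading the shared state inline under a single `with` block, as the tests above require, avoids the nested acquisition entirely. Switching to `threading.RLock` would be a different design choice, and nothing in this release indicates it was made.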
+16
@@ -0,0 +1,16 @@
"""Regression tests for cross-tab active session synchronization."""
from pathlib import Path

REPO_ROOT = Path(__file__).parent.parent.resolve()
SESSIONS_JS = (REPO_ROOT / "static" / "sessions.js").read_text(encoding="utf-8")


def test_sessions_js_listens_for_active_session_storage_changes():
    assert "addEventListener('storage'" in SESSIONS_JS or 'addEventListener("storage"' in SESSIONS_JS
    assert "hermes-webui-session" in SESSIONS_JS
    assert "_handleActiveSessionStorageEvent" in SESSIONS_JS


def test_storage_sync_does_not_switch_while_busy():
    marker = "if(S.busy)"
    assert marker in SESSIONS_JS, "cross-tab storage sync must not switch sessions during an active turn"
+60
@@ -0,0 +1,60 @@
"""Regression tests for sidebar lineage collapse helpers."""
import shutil
import subprocess
from pathlib import Path

import pytest

REPO_ROOT = Path(__file__).parent.parent.resolve()
SESSIONS_JS_PATH = REPO_ROOT / "static" / "sessions.js"
NODE = shutil.which("node")

pytestmark = pytest.mark.skipif(NODE is None, reason="node not on PATH")


def _run_node(source: str) -> str:
    result = subprocess.run(
        [NODE, "-e", source],
        cwd=str(REPO_ROOT),
        capture_output=True,
        text=True,
        timeout=10,
    )
    if result.returncode != 0:
        raise RuntimeError(result.stderr)
    return result.stdout.strip()


def test_sidebar_lineage_collapse_keeps_latest_tip_and_counts_segments():
    js = SESSIONS_JS_PATH.read_text(encoding="utf-8")
    source = f"""
const src = {js!r};
function extractFunc(name) {{
  const re = new RegExp('function\\\\s+' + name + '\\\\s*\\\\(');
  const start = src.search(re);
  if (start < 0) throw new Error(name + ' not found');
  let i = src.indexOf('{{', start);
  let depth = 1; i++;
  while (depth > 0 && i < src.length) {{
    if (src[i] === '{{') depth++;
    else if (src[i] === '}}') depth--;
    i++;
  }}
  return src.slice(start, i);
}}
eval(extractFunc('_sessionTimestampMs'));
eval(extractFunc('_sessionLineageKey'));
eval(extractFunc('_collapseSessionLineageForSidebar'));
const sessions = [
  {{session_id:'root', title:'Hermes WebUI', message_count:10, updated_at:10, last_message_at:10, _lineage_root_id:'root', _lineage_tip_id:'root'}},
  {{session_id:'tip', title:'Hermes WebUI', message_count:20, updated_at:20, last_message_at:20, _lineage_root_id:'root', _lineage_tip_id:'tip'}},
  {{session_id:'solo', title:'Other', message_count:5, updated_at:15, last_message_at:15}},
];
const collapsed = _collapseSessionLineageForSidebar(sessions);
console.log(JSON.stringify(collapsed));
"""
    collapsed = _run_node(source)
    assert '"session_id":"tip"' in collapsed
    assert '"session_id":"root"' not in collapsed
    assert '"_lineage_collapsed_count":2' in collapsed
    assert '"session_id":"solo"' in collapsed
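
The `extractFunc` helper embedded in the test above pulls a single function out of `sessions.js` by locating its `function name(` header and then counting brace depth until the body closes. As a rough illustration, the same technique might look like this in Python. The name `extract_function` and the sample source are hypothetical. Like the original, the sketch assumes no braces appear in the parameter list, in string literals, or in comments.

```python
import re


def extract_function(src: str, name: str) -> str:
    """Return the text of `function name(...) {...}` found in `src`.

    Hypothetical helper: a plain brace counter, not a JavaScript parser.
    """
    match = re.search(r"function\s+" + re.escape(name) + r"\s*\(", src)
    if match is None:
        raise ValueError(f"{name} not found")
    # Start just past the first opening brace after the header.
    i = src.index("{", match.start()) + 1
    depth = 1
    while depth > 0 and i < len(src):
        if src[i] == "{":
            depth += 1
        elif src[i] == "}":
            depth -= 1
        i += 1
    return src[match.start():i]


sample = (
    "function a(x) { if (x) { return 1; } return 2; }\n"
    "function b() { return 3; }"
)
extracted = extract_function(sample, "a")
```

Extracting only the helpers under test keeps the Node snippet free of browser globals that the rest of the file may reference. The trade-off is that a brace inside a string or regex literal in one of those helpers would confuse the counter.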