From 5f9b9c02b29001578b40ebb2a62c2d419b04a4b7 Mon Sep 17 00:00:00 2001 From: Frank Song Date: Fri, 15 May 2026 08:22:44 +0800 Subject: [PATCH] Fix WebUI stream completion recovery gaps --- CHANGELOG.md | 6 + api/streaming.py | 103 +++++++++++++++++- static/messages.js | 18 +++ static/ui.js | 17 +++ ...issue2262_compression_marker_timeout_ui.py | 39 +++++++ tests/test_notify_on_complete_webui.py | 30 +++++ tests/test_profile_terminal_env.py | 4 + 7 files changed, 216 insertions(+), 1 deletion(-) create mode 100644 tests/test_issue2262_compression_marker_timeout_ui.py create mode 100644 tests/test_notify_on_complete_webui.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 328683ef..7d7e1e4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ - **PR #2165** by @starship-s — Pooled OpenAI Codex quota status surfaced in the Providers panel. Pre-fix, the Providers page presented Codex quota as if there were only one credential/account state, which was misleading when users authenticate through a credential pool with several usable credentials, temporarily exhausted credentials, failed probes, and different reset windows. Now the active provider quota card includes a credential-pool summary (available / exhausted / failed / checked counts), displays the best currently-available pool windows in the collapsed view as "Best of N", and exposes per-credential detail behind an expandable section. Exhausted credentials are intentionally NOT re-probed while their cooldown is active (matches credential-pool selection behavior, avoids generating failed quota calls from a status page). Manual refresh still means "probe now" but transient refresh failures preserve the last known-good snapshot. JWT decode (`_decode_jwt_claims_unverified`) is used only for token-shape classification (Codex OAuth JWT vs raw OpenAI API key), explicitly NOT for authorization — documented in the function docstring. Per-row plan labels only shown when verified account-limit data is available. Concurrent probing capped at `min(_CODEX_POOL_MAX_WORKERS=6, len(probe_items))` so page render time stays bounded on large pools. Transient `None` probe results are NOT cached (only known unavailable/exhausted states are cached); 32-test regression suite covering pool snapshot, concurrent probe, JWT detection, cache invalidation, transient-vs-known cache distinction, and i18n parity across all currently-supported locales. Scoped to OpenAI Codex (the only provider with the credential-pool/account-limit path needed to surface this accurately). +### Fixed + +- WebUI agent turns now inherit `HERMES_SESSION_PLATFORM=webui` and drain matching `notify_on_complete` background-process completions into the next model input. Completion events are filtered by the process session key before delivery, so another tab/session's background process output remains queued for its owner instead of being injected into the wrong conversation. + +- Marker-only preserved-task-list compression sentinels no longer render as standalone assistant responses after stream recovery or timeout paths. If the frontend receives only that internal marker as assistant content, it replaces it with an explicit "No response received after context compression" error and shows an error toast. + ## [v0.51.64] — 2026-05-14 — Release AN (stage-357 — 3-PR small batch — docker_init k8s whoami fallback + PWA manifest session routes (closes #2226) + aux title test coverage) ### Fixed diff --git a/api/streaming.py b/api/streaming.py index 04cba4db..b940e97e 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -583,11 +583,98 @@ def _build_agent_thread_env(profile_runtime_env: dict | None, workspace: str, se 'TERMINAL_CWD': str(workspace), 'HERMES_EXEC_ASK': '1', 'HERMES_SESSION_KEY': session_id, + 'HERMES_SESSION_ID': session_id, + 'HERMES_SESSION_PLATFORM': 'webui', 'HERMES_HOME': profile_home, }) return env +def _format_process_notification(evt: dict) -> str: + """Format a completed background process notification for agent input.""" + if not isinstance(evt, dict): + return '' + if evt.get('type') != 'completion': + return '' + _sid = evt.get('session_id', '') + _cmd = evt.get('command', '') + _exit = evt.get('exit_code', '') + _out = evt.get('output') or '' + if len(_out) > 4000: + _out = _out[:4000] + '\n... (truncated)' + return ( + f"[IMPORTANT: Background process {_sid} completed (exit code {_exit}).\n" + f"Command: {_cmd}\n" + f"Output:\n{_out}]" + ) + + +def _mark_process_completion_consumed(process_registry, process_id: str) -> None: + """Best-effort bridge to the agent registry's private completion marker.""" + try: + with process_registry._lock: + process_registry._completion_consumed.add(process_id) + except Exception: + logger.debug("Failed to mark process completion consumed", exc_info=True) + + +def _drain_webui_process_notifications(session_id: str) -> list[str]: + """Return completion notifications that belong to this WebUI session. + + The agent registry completion queue is process-wide and events do not carry + the WebUI session key directly. Look up the live process session before + delivery so completions from other tabs remain queued for their owners. + """ + if not session_id: + return [] + try: + from tools.process_registry import process_registry + except Exception: + return [] + + notifications: list[str] = [] + skipped_events: list[dict] = [] + completion_queue = getattr(process_registry, 'completion_queue', None) + if completion_queue is None: + return [] + + while True: + try: + evt = completion_queue.get_nowait() + except queue.Empty: + break + except Exception: + logger.debug("Failed to drain process completion queue", exc_info=True) + break + + evt_sid = str(evt.get('session_id') or '') if isinstance(evt, dict) else '' + if not evt_sid: + skipped_events.append(evt) + continue + try: + if process_registry.is_completion_consumed(evt_sid): + continue + proc = process_registry.get(evt_sid) + except Exception: + proc = None + if getattr(proc, 'session_key', None) != session_id: + skipped_events.append(evt) + continue + + notification = _format_process_notification(evt) + if notification: + notifications.append(notification) + _mark_process_completion_consumed(process_registry, evt_sid) + + for evt in skipped_events: + try: + completion_queue.put(evt) + except Exception: + logger.debug("Failed to requeue process completion event", exc_info=True) + break + return notifications + + def _attachment_name(att) -> str: if isinstance(att, dict): return str(att.get('name') or att.get('filename') or att.get('path') or '').strip() @@ -2254,6 +2341,8 @@ def _run_agent_streaming( old_cwd = None old_exec_ask = None old_session_key = None + old_session_id = None + old_session_platform = None old_hermes_home = None old_profile_env = {} @@ -2504,11 +2593,15 @@ def _run_agent_streaming( old_cwd = os.environ.get('TERMINAL_CWD') old_exec_ask = os.environ.get('HERMES_EXEC_ASK') old_session_key = os.environ.get('HERMES_SESSION_KEY') + old_session_id = os.environ.get('HERMES_SESSION_ID') + old_session_platform = os.environ.get('HERMES_SESSION_PLATFORM') old_hermes_home = os.environ.get('HERMES_HOME') os.environ.update(_profile_runtime_env) os.environ['TERMINAL_CWD'] = str(s.workspace) os.environ['HERMES_EXEC_ASK'] = '1' os.environ['HERMES_SESSION_KEY'] = session_id + os.environ['HERMES_SESSION_ID'] = session_id + os.environ['HERMES_SESSION_PLATFORM'] = 'webui' if _profile_home: os.environ['HERMES_HOME'] = _profile_home # Patch module-level caches to match the active profile. @@ -3260,7 +3353,11 @@ def _run_agent_streaming( ) _ckpt_thread.start() - user_message = _build_native_multimodal_message(workspace_ctx, msg_text, attachments, workspace, cfg=_cfg) + _process_notifications = _drain_webui_process_notifications(session_id) + _agent_msg_text = msg_text + if _process_notifications: + _agent_msg_text = "\n\n".join([*_process_notifications, msg_text]).strip() + user_message = _build_native_multimodal_message(workspace_ctx, _agent_msg_text, attachments, workspace, cfg=_cfg) result = agent.run_conversation( user_message=user_message, system_message=workspace_system_msg, @@ -4184,6 +4281,10 @@ def _run_agent_streaming( else: os.environ['HERMES_EXEC_ASK'] = old_exec_ask if old_session_key is None: os.environ.pop('HERMES_SESSION_KEY', None) else: os.environ['HERMES_SESSION_KEY'] = old_session_key + if old_session_id is None: os.environ.pop('HERMES_SESSION_ID', None) + else: os.environ['HERMES_SESSION_ID'] = old_session_id + if old_session_platform is None: os.environ.pop('HERMES_SESSION_PLATFORM', None) + else: os.environ['HERMES_SESSION_PLATFORM'] = old_session_platform if old_hermes_home is None: os.environ.pop('HERMES_HOME', None) else: os.environ['HERMES_HOME'] = old_hermes_home diff --git a/static/messages.js b/static/messages.js index 66a3c90d..8f57d13a 100644 --- a/static/messages.js +++ b/static/messages.js @@ -481,6 +481,20 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ clearInflightState(activeSid); _clearActivePaneInflightIfOwner(); } + function _isMarkerOnlyAssistantMessage(m){ + if(!m||m.role!=='assistant') return false; + const text=String(typeof msgContent==='function'?msgContent(m):(m.content||'')); + return typeof _isPreservedCompressionTaskListMarkerOnlyText==='function' + && _isPreservedCompressionTaskListMarkerOnlyText(text); + } + function _replaceMarkerOnlyAssistantWithStreamError(messages){ + if(!Array.isArray(messages)) return false; + const msg=[...messages].reverse().find(m=>m&&m.role==='assistant'); + if(!_isMarkerOnlyAssistantMessage(msg)) return false; + msg.content='**Error:** No response received after context compression. Please retry.'; + msg.provider_details='The only assistant text returned for this turn was the internal preserved-task-list compression marker, so the WebUI replaced it with an explicit error instead of rendering the marker as a model response.'; + return true; + } function _setActivePaneIdleIfOwner(){ if(_isActiveSession()||!S.session||!INFLIGHT[S.session.session_id]){ setBusy(false); @@ -1358,6 +1372,7 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ localStorage.setItem('hermes-webui-session',S.session.session_id); if(typeof _setActiveSessionUrl==='function') _setActiveSessionUrl(S.session.session_id); } + const _markerOnlyAssistantError=_replaceMarkerOnlyAssistantWithStreamError(S.messages); if( window._compressionUi&&window._compressionUi.automatic&& window._compressionUi.sessionId===activeSid&& @@ -1429,6 +1444,7 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ S.busy=false; // No-reply guard (#373): if agent returned nothing, show inline error if(!S.messages.some(m=>m.role==='assistant'&&String(m.content||'').trim())&&!assistantText){removeThinking();S.messages.push({role:'assistant',content:'**No response received.** Check your API key and model selection.'});} + if(_markerOnlyAssistantError&&typeof showToast==='function') showToast('No response received after context compression. Please retry.',5000,'error'); if(isSessionViewed) _markSessionViewed(completedSid, completedSession.message_count ?? S.messages.length); syncTopbar();renderMessages({preserveScroll:true}); if(shouldFollowOnDone&&typeof scrollToBottom==='function') scrollToBottom(); @@ -1713,6 +1729,8 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ localStorage.setItem('hermes-webui-session',S.session.session_id); if(typeof _setActiveSessionUrl==='function') _setActiveSessionUrl(S.session.session_id); } + const _markerOnlyAssistantError=_replaceMarkerOnlyAssistantWithStreamError(S.messages); + if(_markerOnlyAssistantError&&typeof showToast==='function') showToast('No response received after context compression. Please retry.',5000,'error'); const hasMessageToolMetadata=S.messages.some(m=>{ if(!m||m.role!=='assistant') return false; // Recognize both the standard `tool_calls` (used by completed assistant diff --git a/static/ui.js b/static/ui.js index f270909f..af7fff59 100644 --- a/static/ui.js +++ b/static/ui.js @@ -4755,11 +4755,25 @@ function _isContextCompactionMessage(m){ const text=msgContent(m)||String(m.content||''); return /^\s*\[context compaction/i.test(text) || /^\s*context compaction/i.test(text); } +function _isPreservedCompressionTaskListMarkerText(text){ + return /^\s*\[your active task list was preserved across context compression\]/i.test(String(text||'')); +} +function _isPreservedCompressionTaskListMarkerOnlyText(text){ + return _isPreservedCompressionTaskListMarkerText(text) + && !String(text||'') + .replace(/^\s*\[your active task list was preserved across context compression\]\s*/i,'') + .trim(); +} function _isPreservedCompressionTaskListMessage(m){ if(!m||m.role!=='user') return false; const text=msgContent(m)||String(m.content||''); return /^\s*\[your active task list was preserved across context compression\]/i.test(text); } +function _isMarkerOnlyAssistantCompressionMessage(m){ + if(!m||m.role!=='assistant') return false; + const text=msgContent(m)||String(m.content||''); + return _isPreservedCompressionTaskListMarkerOnlyText(text); +} function _preservedCompressionTaskListPreview(text){ const body=String(text||'') .replace(/^\s*\[your active task list was preserved across context compression\]\s*/i,'') @@ -5338,6 +5352,9 @@ function renderMessages(options){ } } const isUser=m.role==='user'; + if(!isUser&&_isMarkerOnlyAssistantCompressionMessage(m)){ + content='**Error:** No response received after context compression. Please retry.'; + } const displayContent=isUser?_stripWorkspaceDisplayPrefix(content):content; const isLastAssistant=!isUser&&vi===renderVisWithIdx.length-1; let filesHtml=''; diff --git a/tests/test_issue2262_compression_marker_timeout_ui.py b/tests/test_issue2262_compression_marker_timeout_ui.py new file mode 100644 index 00000000..780370d9 --- /dev/null +++ b/tests/test_issue2262_compression_marker_timeout_ui.py @@ -0,0 +1,39 @@ +from pathlib import Path + + +def _read(path: str) -> str: + return Path(path).read_text(encoding="utf-8") + + +def test_preserved_task_list_marker_only_helper_is_strict(): + src = _read("static/ui.js") + + assert "function _isPreservedCompressionTaskListMarkerOnlyText" in src + start = src.find("function _isPreservedCompressionTaskListMarkerOnlyText") + end = src.find("function _isPreservedCompressionTaskListMessage", start) + helper = src[start:end] + + assert "_isPreservedCompressionTaskListMarkerText(text)" in helper + assert ".replace(/^\\s*\\[your active task list was preserved across context compression\\]" in helper + assert ".trim()" in helper + + +def test_marker_only_assistant_message_renders_as_error_not_model_text(): + src = _read("static/ui.js") + + assert "function _isMarkerOnlyAssistantCompressionMessage" in src + assert "m.role!=='assistant'" in src + assert "_isPreservedCompressionTaskListMarkerOnlyText(text)" in src + assert "if(!isUser&&_isMarkerOnlyAssistantCompressionMessage(m))" in src + assert "content='**Error:** No response received after context compression. Please retry.'" in src + + +def test_done_and_restore_replace_marker_only_assistant_with_error_toast(): + src = _read("static/messages.js") + + assert "function _replaceMarkerOnlyAssistantWithStreamError(messages)" in src + assert "_isMarkerOnlyAssistantMessage(msg)" in src + assert "msg.content='**Error:** No response received after context compression. Please retry.'" in src + assert "internal preserved-task-list compression marker" in src + assert "_markerOnlyAssistantError=_replaceMarkerOnlyAssistantWithStreamError(S.messages)" in src + assert "showToast('No response received after context compression. Please retry.',5000,'error')" in src diff --git a/tests/test_notify_on_complete_webui.py b/tests/test_notify_on_complete_webui.py new file mode 100644 index 00000000..8b8e88b6 --- /dev/null +++ b/tests/test_notify_on_complete_webui.py @@ -0,0 +1,30 @@ +from pathlib import Path + + +def test_webui_drains_only_matching_background_completion_events(): + src = Path("api/streaming.py").read_text(encoding="utf-8") + + assert "def _drain_webui_process_notifications(session_id: str)" in src + assert "from tools.process_registry import process_registry" in src + assert "proc = process_registry.get(evt_sid)" in src + assert "getattr(proc, 'session_key', None) != session_id" in src + assert "skipped_events.append(evt)" in src + assert "completion_queue.put(evt)" in src + + +def test_webui_injects_process_notifications_without_persisting_them_as_user_text(): + src = Path("api/streaming.py").read_text(encoding="utf-8") + + assert "_process_notifications = _drain_webui_process_notifications(session_id)" in src + assert "[*_process_notifications, msg_text]" in src + assert "_build_native_multimodal_message(workspace_ctx, _agent_msg_text" in src + assert "persist_user_message=msg_text" in src + + +def test_webui_sets_gateway_session_platform_for_background_watchers(): + src = Path("api/streaming.py").read_text(encoding="utf-8") + + assert "'HERMES_SESSION_PLATFORM': 'webui'" in src + assert "os.environ['HERMES_SESSION_PLATFORM'] = 'webui'" in src + assert "old_session_platform = os.environ.get('HERMES_SESSION_PLATFORM')" in src + assert "os.environ.pop('HERMES_SESSION_PLATFORM', None)" in src diff --git a/tests/test_profile_terminal_env.py b/tests/test_profile_terminal_env.py index ebf72596..1614d6b6 100644 --- a/tests/test_profile_terminal_env.py +++ b/tests/test_profile_terminal_env.py @@ -78,6 +78,8 @@ def test_streaming_thread_env_allows_profile_terminal_cwd_override(): "TERMINAL_CWD": "/profile/config/cwd", "HERMES_EXEC_ASK": "0", "HERMES_SESSION_KEY": "old-session", + "HERMES_SESSION_ID": "old-session", + "HERMES_SESSION_PLATFORM": "cli", "HERMES_HOME": "/old/profile/home", "TERMINAL_ENV": "ssh", }, @@ -89,5 +91,7 @@ def test_streaming_thread_env_allows_profile_terminal_cwd_override(): assert env["TERMINAL_CWD"] == "/active/workspace" assert env["HERMES_EXEC_ASK"] == "1" assert env["HERMES_SESSION_KEY"] == "active-session" + assert env["HERMES_SESSION_ID"] == "active-session" + assert env["HERMES_SESSION_PLATFORM"] == "webui" assert env["HERMES_HOME"] == "/active/profile/home" assert env["TERMINAL_ENV"] == "ssh"