Add interim_assistant streaming path to WebUI

This commit is contained in:
Frank Song
2026-05-08 10:28:45 +08:00
committed by nesquena-hermes
parent 773857d159
commit 82c7367cef
4 changed files with 212 additions and 5 deletions
+15
View File
@@ -2102,6 +2102,17 @@ def _run_agent_streaming(
meter().record_reasoning(stream_id, _metering_reasoning_deltas[0])
_emit_metering()
def on_interim_assistant(text, **cb_kwargs):
    """Forward interim assistant commentary to the client as an SSE event.

    Emits an ``interim_assistant`` event through ``put`` carrying the
    whitespace-trimmed text plus an ``already_streamed`` flag read from the
    callback's keyword arguments (False when absent). ``None`` or blank
    text emits nothing.
    """
    trimmed = '' if text is None else str(text).strip()
    if not trimmed:
        return
    payload = {
        'text': trimmed,
        'already_streamed': bool(cb_kwargs.get('already_streamed', False)),
    }
    put('interim_assistant', payload)
# Pre-initialise the activity counter here so on_tool (which
# closes over it) never captures an unbound name even if this
# block is reordered later (Issue #765).
@@ -2353,6 +2364,8 @@ def _run_agent_streaming(
# but guard defensively to avoid TypeError on an older agent build.
if 'reasoning_config' in _agent_params and _reasoning_config is not None:
_agent_kwargs['reasoning_config'] = _reasoning_config
if 'interim_assistant_callback' in _agent_params:
_agent_kwargs['interim_assistant_callback'] = on_interim_assistant
if 'status_callback' in _agent_params:
_agent_kwargs['status_callback'] = _agent_status_callback
if 'max_tokens' in _agent_params and _max_tokens_cfg is not None:
@@ -2410,6 +2423,8 @@ def _run_agent_streaming(
agent.tool_progress_callback = _agent_kwargs.get('tool_progress_callback')
if hasattr(agent, 'status_callback'):
agent.status_callback = _agent_kwargs.get('status_callback')
if hasattr(agent, 'interim_assistant_callback'):
agent.interim_assistant_callback = _agent_kwargs.get('interim_assistant_callback')
if hasattr(agent, 'reasoning_callback'):
agent.reasoning_callback = _agent_kwargs.get('reasoning_callback')
if hasattr(agent, 'clarify_callback'):
+29 -5
View File
@@ -645,6 +645,14 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
if(!_SMD_SAFE_URL_RE.test(v)){n.removeAttribute('src');n.setAttribute('data-blocked-scheme','1');}
}
}
// Begin a fresh live assistant segment so text arriving afterwards is
// rendered in a new row instead of updating the row already in the DOM.
function _resetAssistantSegment(){
  // Drop the live row/body references so the old DOM node is not reused.
  assistantRow=assistantBody=null;
  // The new segment starts at the current end of the accumulated text.
  segmentStart=assistantText.length;
  _freshSegment=true;
  // Finalize the current smd parser; a new one is created on the next token.
  _smdEndParser();
}
let _lastRenderMs=0;
function _scheduleRender(){
if(_renderPending) return;
@@ -725,6 +733,26 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
_scheduleRender();
});
// Handle `interim_assistant` SSE events: completed assistant commentary that
// arrives mid-turn. Text that was already streamed only closes the current
// live segment; text that was not streamed is appended to the live answer.
source.addEventListener('interim_assistant',evt=>{
  // True while the session this stream belongs to is still the one on screen.
  const onActiveSession=()=>!!S.session&&S.session.session_id===activeSid;
  if(!onActiveSession()) return;

  const payload=JSON.parse(evt.data);
  const text=String(payload&&payload.text?payload.text:'').trim();
  if(!text) return;

  if(payload&&payload.already_streamed){
    // The text already reached the UI; just start a new segment.
    _resetAssistantSegment();
    return;
  }

  assistantText+=text;
  syncInflightAssistantMessage();
  // Re-check after syncing, before touching the DOM.
  if(!onActiveSession()) return;

  const state=_parseStreamState();
  const shown=String((state&&state.displayText)||'').trim();
  if(shown||assistantRow) ensureAssistantRow();
  _scheduleRender();
});
source.addEventListener('reasoning',e=>{
const d=JSON.parse(e.data);
reasoningText += d.text || '';
@@ -768,11 +796,7 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){
// Reset the live assistant row reference so that any text tokens arriving
// after this tool call create a NEW segment appended below the tool card,
// rather than updating the old segment that sits above it in the DOM.
assistantRow=null;
assistantBody=null;
segmentStart=assistantText.length; // new segment starts at current text length
_freshSegment=true; // prevent reuse of old DOM node
_smdEndParser(); // finalize current smd parser; new one created on next token
_resetAssistantSegment();
scrollIfPinned();
});
+17
View File
@@ -693,6 +693,23 @@ def test_messages_js_supports_live_reasoning_and_tool_completion(cleanup_test_se
"messages.js must parse live stream state into reasoning + visible answer"
def test_messages_js_supports_interim_assistant_events(cleanup_test_sessions):
    """R18b: messages.js must render live interim assistant commentary when
    `interim_assistant` SSE events arrive.

    AIAgent emits completed mid-turn commentary through an interim callback.
    Without a dedicated SSE handler, Codex-style interim status text disappears
    from the live answer and users only see the final response after tool calls.
    """
    # Decode explicitly as UTF-8 so the check does not depend on the
    # platform's default locale encoding.
    src = (REPO_ROOT / "static/messages.js").read_text(encoding="utf-8")

    # Accept either quoting style for the event name.
    listener_forms = (
        "source.addEventListener('interim_assistant'",
        'source.addEventListener("interim_assistant"',
    )
    assert any(form in src for form in listener_forms), \
        "messages.js must listen for interim_assistant SSE events"
    assert "function _resetAssistantSegment()" in src, \
        "messages.js should share live-segment reset logic between interim assistant updates and tool events"
    # The trailing ';' matches a call site, not the function definition.
    assert "_resetAssistantSegment();" in src, \
        "messages.js should apply segment reset when tool or interim assistant events require it"
def test_ui_js_can_upgrade_thinking_spinner_into_live_reasoning_card(cleanup_test_sessions):
"""R19: ui.js must be able to replace the placeholder thinking spinner with
streamed reasoning text while a turn is in progress.
+151
View File
@@ -251,6 +251,157 @@ class TestRuntimeRouteInjection(unittest.TestCase):
self.assertEqual(init_kwargs["api_key"], "rt-key")
self.assertIs(init_kwargs["session_db"], fake_session_db)
def test_runtime_provider_forwards_interim_assistant_callback(self):
"""WebUI must pass interim_assistant_callback to AIAgent and emit SSE events."""
import api.streaming as streaming
# Filled in by the fakes below so the assertions can inspect what was passed.
captured = {}
# Stand-in for the agent class (returned by the patched
# streaming._get_ai_agent): records its constructor arguments and fires the
# interim callback once from run_conversation.
class CapturingAgent:
def __init__(
self,
model=None,
provider=None,
base_url=None,
api_key=None,
platform=None,
quiet_mode=False,
enabled_toolsets=None,
fallback_model=None,
session_id=None,
session_db=None,
stream_delta_callback=None,
reasoning_callback=None,
tool_progress_callback=None,
interim_assistant_callback=None,
clarify_callback=None,
**kwargs,
):
# Record the named constructor arguments; anything else lands in **kwargs
# and is ignored.
captured["init_kwargs"] = dict(
model=model, provider=provider, base_url=base_url, api_key=api_key,
platform=platform, quiet_mode=quiet_mode,
enabled_toolsets=enabled_toolsets, fallback_model=fallback_model,
session_id=session_id, session_db=session_db,
stream_delta_callback=stream_delta_callback,
reasoning_callback=reasoning_callback,
tool_progress_callback=tool_progress_callback,
interim_assistant_callback=interim_assistant_callback,
clarify_callback=clarify_callback,
)
# Attributes below are presumably read by _run_agent_streaming after
# construction -- confirm against api.streaming.
self.session_id = session_id
self.context_compressor = None
self.session_prompt_tokens = 0
self.session_completion_tokens = 0
self.session_estimated_cost_usd = None
self.reasoning_config = None
self.ephemeral_system_prompt = None
self._last_error = None
# Kept on the instance so run_conversation can invoke it.
self.interim_assistant_callback = interim_assistant_callback
# Fires one interim message (already_streamed=False) when a callback is set,
# then returns a minimal user/assistant transcript.
def run_conversation(self, **kwargs):
if self.interim_assistant_callback:
self.interim_assistant_callback("Inspecting repo structure.", already_streamed=False)
return {
"messages": [
{"role": "user", "content": kwargs.get("persist_user_message", "")},
{"role": "assistant", "content": "ok"},
]
}
# Records that an interrupt was requested; the message itself is ignored.
def interrupt(self, _message):
captured["interrupted"] = True
# Minimal session stub returned by the patched streaming.get_session.
class FakeSession:
session_id = "sess-interim-test"
title = "Test"
workspace = "/tmp"
model = "gpt-4o"
messages = []
personality = None
input_tokens = 0
output_tokens = 0
estimated_cost = None
tool_calls = []
active_stream_id = None
pending_user_message = None
pending_attachments = []
pending_started_at = None
# No-op: nothing is persisted by this stub.
def save(self, touch_updated_at=True, skip_index=True):
pass
# Returns a fixed summary dict for the session.
def compact(self):
return {
"session_id": self.session_id, "title": self.title,
"workspace": self.workspace, "model": self.model,
"created_at": 0, "updated_at": 0, "pinned": False,
"archived": False, "project_id": None, "profile": None,
"input_tokens": 0, "output_tokens": 0,
"estimated_cost": None, "personality": None,
}
@property
def path(self):
return "/tmp/fake.json"
fake_stream_id = "stream-interim-callback"
fake_queue = queue.Queue()
# Fake hermes_cli.runtime_provider module whose resolve_runtime_provider
# returns a fixed codex-style runtime description.
fake_rt_module = types.ModuleType("hermes_cli.runtime_provider")
fake_rt_module.resolve_runtime_provider = mock.Mock(return_value={
"provider": "openai-codex",
"base_url": "https://api.openai.com/v1",
"api_key": "rt-key",
"api_mode": "codex_responses",
"command": "codex",
"args": ["exec", "--json"],
"credential_pool": object(),
})
fake_hermes_cli = types.ModuleType("hermes_cli")
fake_hermes_cli.runtime_provider = fake_rt_module
# Fake hermes_state module; SessionDB() yields a plain placeholder object.
fake_hermes_state = types.ModuleType("hermes_state")
fake_hermes_state.SessionDB = mock.Mock(return_value=object())
# Patch session lookup, agent-class lookup, model resolution and config
# access, and place the fake modules in sys.modules so imports of those
# names resolve to the fakes while the patches are active.
with mock.patch.object(streaming, "get_session", return_value=FakeSession()), \
mock.patch.object(streaming, "_get_ai_agent", return_value=CapturingAgent), \
mock.patch.object(streaming, "resolve_model_provider", return_value=("gpt-4o", "openai-codex", None)), \
mock.patch("api.config.get_config", return_value={}), \
mock.patch("api.config._resolve_cli_toolsets", return_value=[]), \
mock.patch.dict(sys.modules, {
"hermes_cli": fake_hermes_cli,
"hermes_cli.runtime_provider": fake_rt_module,
"hermes_state": fake_hermes_state,
}):
# Register the queue under the stream id before running; presumably the
# streaming code delivers events to STREAMS[stream_id] -- confirm.
streaming.STREAMS[fake_stream_id] = fake_queue
streaming._run_agent_streaming(
session_id="sess-interim-test",
msg_text="hello",
model="gpt-4o",
workspace="/tmp",
stream_id=fake_stream_id,
)
# The agent must have been constructed with a callable interim callback.
init_kwargs = captured["init_kwargs"]
self.assertIsNotNone(init_kwargs["interim_assistant_callback"])
self.assertTrue(callable(init_kwargs["interim_assistant_callback"]))
# Drain everything that was queued during the run.
interim_events = []
while not fake_queue.empty():
try:
interim_events.append(fake_queue.get_nowait())
except queue.Empty:
break
# Queue items are unpacked as (event_name, payload) pairs.
self.assertTrue(
any(event == "interim_assistant" for event, _ in interim_events),
"interim_assistant callback should emit interim_assistant SSE events",
)
self.assertTrue(
any(
event == "interim_assistant" and event_data.get("text") == "Inspecting repo structure."
for event, event_data in interim_events
),
"interim_assistant event should carry the assistant commentary text"
)
class TestSessionDBAST(unittest.TestCase):
"""AST-level checks: verify the try/except is not inside _ENV_LOCK (deadlock guard)."""