Stage 374: PR #2427 — fix(streaming): recover journaled partial assistant output after WebUI restart by @franksong2702 (fixes #2423)

Co-authored-by: Frank Song <franksong2702@gmail.com>
This commit is contained in:
nesquena-hermes
2026-05-17 02:49:35 +00:00
parent a33cd4195b
commit 8f98465024
4 changed files with 390 additions and 16 deletions
+180 -10
View File
@@ -685,26 +685,188 @@ def _get_profile_home(profile) -> Path:
return Path(os.environ.get('HERMES_HOME') or '~/.hermes').expanduser()
def _interrupted_recovery_marker() -> dict:
return {
'role': 'assistant',
'content': (
def _interrupted_recovery_marker(*, recovered_output: bool = False) -> dict:
if recovered_output:
content = (
'**Response interrupted.**\n\n'
'The WebUI process restarted before this turn finished. '
'The partial output above was recovered from the run journal, '
'but the interrupted agent process could not continue.'
)
else:
content = (
'**Response interrupted.**\n\n'
'The WebUI process restarted before this turn finished. '
'The user message above was preserved, but no agent output was recovered.'
),
)
return {
'role': 'assistant',
'content': content,
'timestamp': int(time.time()),
'_error': True,
'type': 'interrupted',
}
def _truncate_journal_tool_args(args, limit: int = 4) -> dict:
if not isinstance(args, dict):
return {}
out = {}
for key, value in list(args.items())[:limit]:
text = str(value)
out[str(key)] = text[:120] + ('...' if len(text) > 120 else '')
return out
def _append_journaled_partial_output(session, stream_id: str | None) -> bool:
"""Recover already-emitted visible output from a dead stream journal.
This repair path is intentionally conservative: it restores user-visible
assistant text and tool-card metadata that had already been emitted over
SSE before the WebUI process died. It does not restore hidden reasoning and
it does not try to continue execution.
"""
if not stream_id:
return False
try:
from api.run_journal import read_run_events
journal = read_run_events(session.session_id, stream_id)
except Exception:
logger.debug(
"Session %s: failed to read run journal for stream %s",
getattr(session, 'session_id', '?'),
stream_id,
exc_info=True,
)
return False
events = [event for event in journal.get('events') or [] if isinstance(event, dict)]
if not events:
return False
appended_any = False
assistant_parts: list[str] = []
assistant_started_at: float | None = None
current_assistant_idx: int | None = None
recovered_tool_calls: list[dict] = []
def flush_assistant() -> int | None:
nonlocal appended_any, assistant_parts, assistant_started_at, current_assistant_idx
content = ''.join(assistant_parts).strip()
assistant_parts = []
if not content:
return current_assistant_idx
timestamp = int(assistant_started_at or time.time())
session.messages.append({
'role': 'assistant',
'content': content,
'timestamp': timestamp,
'_recovered_from_run_journal': True,
'_recovered_stream_id': stream_id,
})
current_assistant_idx = len(session.messages) - 1
assistant_started_at = None
appended_any = True
return current_assistant_idx
def ensure_assistant_anchor(created_at: float | None = None) -> int:
nonlocal appended_any, current_assistant_idx
idx = flush_assistant()
if idx is not None:
return idx
# A stream can start with tools before any text. Keep those tools
# visible after restart with an empty recovered assistant anchor instead
# of inventing synthetic progress prose.
session.messages.append({
'role': 'assistant',
'content': '',
'timestamp': int(created_at or time.time()),
'_recovered_from_run_journal': True,
'_recovered_stream_id': stream_id,
})
current_assistant_idx = len(session.messages) - 1
appended_any = True
return current_assistant_idx
for event in events:
event_name = str(event.get('event') or event.get('type') or '')
payload = event.get('payload') if isinstance(event.get('payload'), dict) else {}
created_at = event.get('created_at') if isinstance(event.get('created_at'), (int, float)) else None
if event_name == 'token':
text = str(payload.get('text') or '')
if not text:
continue
if not assistant_parts and assistant_started_at is None:
assistant_started_at = created_at or time.time()
assistant_parts.append(text)
continue
if event_name == 'interim_assistant':
if payload.get('already_streamed'):
flush_assistant()
continue
text = str(payload.get('text') or '').strip()
if not text:
continue
if not assistant_parts and assistant_started_at is None:
assistant_started_at = created_at or time.time()
if assistant_parts and not ''.join(assistant_parts).endswith(('\n', ' ')):
assistant_parts.append('\n\n')
assistant_parts.append(text)
flush_assistant()
continue
if event_name == 'tool':
anchor_idx = flush_assistant()
if anchor_idx is None:
anchor_idx = ensure_assistant_anchor(created_at)
name = str(payload.get('name') or 'tool')
preview = str(payload.get('preview') or '')
recovered_tool_calls.append({
'name': name,
'preview': preview,
'snippet': preview,
'tid': f"journal-{event.get('seq') or len(recovered_tool_calls) + 1}",
'assistant_msg_idx': anchor_idx,
'args': _truncate_journal_tool_args(payload.get('args') or {}),
'done': False,
'_recovered_from_run_journal': True,
'_recovered_stream_id': stream_id,
})
appended_any = True
current_assistant_idx = anchor_idx
continue
if event_name == 'tool_complete':
name = str(payload.get('name') or '')
for tool_call in reversed(recovered_tool_calls):
if tool_call.get('done'):
continue
if not name or tool_call.get('name') == name:
tool_call['done'] = True
if payload.get('preview'):
tool_call['preview'] = str(payload.get('preview') or '')
tool_call['snippet'] = str(payload.get('preview') or '')
if payload.get('duration') is not None:
tool_call['duration'] = payload.get('duration')
tool_call['is_error'] = bool(payload.get('is_error', False))
break
continue
if event_name in {'done', 'stream_end', 'cancel', 'apperror', 'error'}:
flush_assistant()
flush_assistant()
if recovered_tool_calls:
session.tool_calls = list(session.tool_calls or []) + recovered_tool_calls
appended_any = True
return appended_any
def _apply_core_sync_or_error_marker(
session,
core_path,
stream_id_for_recheck=None,
*,
require_stream_dead=True,
touch_updated_at=True,
) -> bool:
"""Inner repair logic. Must be called with the per-session lock already held.
@@ -761,12 +923,16 @@ def _apply_core_sync_or_error_marker(
if session.pending_attachments:
recovered['attachments'] = list(session.pending_attachments)
_append_recovered_turn_to_context(session, recovered)
recovered_output = _append_journaled_partial_output(
session,
stream_id_for_recheck or session.active_stream_id,
)
session.active_stream_id = None
session.pending_user_message = None
session.pending_attachments = []
session.pending_started_at = None
session.messages.append(_interrupted_recovery_marker())
session.save()
session.messages.append(_interrupted_recovery_marker(recovered_output=recovered_output))
session.save(touch_updated_at=touch_updated_at)
logger.info(
"Session %s: recovered pending user turn (messages non-empty), added error marker",
sid,
@@ -789,7 +955,7 @@ def _apply_core_sync_or_error_marker(
session.pending_user_message = None
session.pending_attachments = []
session.pending_started_at = None
session.save()
session.save(touch_updated_at=touch_updated_at)
logger.info(
"Session %s: synced %d messages from core transcript",
sid, len(core_messages),
@@ -805,12 +971,16 @@ def _apply_core_sync_or_error_marker(
if isinstance(session.pending_started_at, (int, float)) and session.pending_started_at > 0:
_recovered_ts = int(session.pending_started_at)
_append_recovered_pending_turn(session, timestamp=_recovered_ts)
recovered_output = _append_journaled_partial_output(
session,
stream_id_for_recheck or session.active_stream_id,
)
session.active_stream_id = None
session.pending_user_message = None
session.pending_attachments = []
session.pending_started_at = None
session.messages.append(_interrupted_recovery_marker())
session.save()
session.messages.append(_interrupted_recovery_marker(recovered_output=recovered_output))
session.save(touch_updated_at=touch_updated_at)
logger.info("Session %s: no core transcript found, added error marker", sid)
return True
+33
View File
@@ -1004,6 +1004,39 @@ def _clear_stale_stream_state(session) -> bool:
with _get_session_agent_lock(session.session_id):
if getattr(session, "active_stream_id", None) != stream_id:
return False
if getattr(session, "pending_user_message", None):
try:
from api.models import _apply_core_sync_or_error_marker, _get_profile_home
profile_home = _get_profile_home(getattr(session, "profile", None))
core_path = profile_home / "sessions" / f"session_{session.session_id}.json"
repaired = _apply_core_sync_or_error_marker(
session,
core_path,
stream_id_for_recheck=stream_id,
touch_updated_at=False,
)
except Exception:
logger.exception(
"_clear_stale_stream_state: failed to repair stale pending stream %s "
"for session %s",
stream_id, getattr(session, "session_id", "?"),
)
repaired = False
if repaired:
if original_stub is not session:
try:
original_stub.active_stream_id = None
if hasattr(original_stub, "pending_user_message"):
original_stub.pending_user_message = None
if hasattr(original_stub, "pending_attachments"):
original_stub.pending_attachments = []
if hasattr(original_stub, "pending_started_at"):
original_stub.pending_started_at = None
except Exception:
pass
return True
if getattr(session, "active_stream_id", None) != stream_id:
return False
_materialize_pending_user_turn_before_error(session)
session.active_stream_id = None
if hasattr(session, "pending_user_message"):
+53 -6
View File
@@ -25,6 +25,7 @@ import api.config as config
import api.models as models
import api.streaming as streaming
from api.models import Session
from api.run_journal import append_run_event
from api.streaming import cancel_stream
REPO_ROOT = pathlib.Path(__file__).parent.parent.resolve()
@@ -391,14 +392,60 @@ def test_stale_stream_cleanup_materializes_pending_turn_before_clearing_state():
assert cleared is True
assert s.active_stream_id is None
assert s.pending_user_message is None
assert s.messages[-1]["role"] == "user"
assert s.messages[-1]["content"] == "please make the GUI fully usable"
assert s.messages[-1]["timestamp"] == 1778187755
assert s.messages[-1]["attachments"] == [{"name": "visible-state.png"}]
assert s.messages[-2]["role"] == "user"
assert s.messages[-2]["content"] == "please make the GUI fully usable"
assert s.messages[-2]["timestamp"] == 1778187755
assert s.messages[-2]["attachments"] == [{"name": "visible-state.png"}]
assert s.messages[-1]["role"] == "assistant"
assert s.messages[-1].get("_error") is True
assert s.messages[-1].get("type") == "interrupted"
reloaded = models.get_session(sid, metadata_only=False)
assert reloaded.messages[-1]["role"] == "user"
assert reloaded.messages[-1]["content"] == "please make the GUI fully usable"
assert reloaded.messages[-2]["role"] == "user"
assert reloaded.messages[-2]["content"] == "please make the GUI fully usable"
assert reloaded.messages[-1]["role"] == "assistant"
assert reloaded.messages[-1].get("type") == "interrupted"
def test_stale_stream_cleanup_recovers_journaled_visible_output():
    """Stale-stream cleanup must keep journaled partial output.

    The /api/session stale cleanup path can run before a full chat reload, so
    clearing the runtime flags alone is not enough: text and tool cards that
    were journaled for the stream have to end up in the session as well.
    """
    from api.routes import _clear_stale_stream_state

    sid = "test_pending_error_d4_journal"
    stream_id = "stream_1361"
    session = _make_session(
        session_id=sid,
        pending_msg="please check maintainer activity",
        messages=[{"role": "assistant", "content": "previous answer"}],
    )

    # One visible sentence followed by a tool call that finished.
    journaled = (
        ("token", {"text": "I will check GitHub first."}),
        (
            "tool",
            {"name": "terminal", "preview": "gh issue view 2423", "args": {"command": "gh issue view 2423"}},
        ),
        ("tool_complete", {"name": "terminal", "duration": 0.4, "is_error": False}),
    )
    for event_name, payload in journaled:
        append_run_event(sid, stream_id, event_name, payload)

    cleared = _clear_stale_stream_state(session)

    assert cleared is True
    recovered_texts = [(message.get("content") or "") for message in session.messages]
    assert any("I will check GitHub first." in text for text in recovered_texts)
    assert session.tool_calls
    assert session.tool_calls[0]["name"] == "terminal"
    marker = session.messages[-1]
    assert marker.get("type") == "interrupted"
    assert "partial output above was recovered" in marker["content"]
# ── Structural guard: pin call sites of the materialize helper at error branches ──
+124
View File
@@ -21,6 +21,7 @@ from api.models import (
import api.config as config
import api.streaming as streaming
import api.profiles as profiles
from api.run_journal import append_run_event
# ── Fixtures ────────────────────────────────────────────────────────────────
@@ -617,6 +618,129 @@ class TestNonEmptyMessagesPendingCleared:
assert s.pending_started_at is None
assert s.active_stream_id is None
def test_journaled_partial_output_is_recovered_before_interrupted_marker(self, hermes_home, monkeypatch):
    """Repair must surface journaled partial output, not just a generic marker.

    A dead stream left behind by a WebUI restart has text and a completed tool
    call in its journal; after repair both must be visible and the single
    error marker must use the "recovered" wording.
    """
    stream_id = "journaled_stream"
    command = "gh pr list --repo nesquena/hermes-webui"

    session = _make_session(messages=[{"role": "user", "content": "existing turn"}])
    session.pending_user_message = "Check maintainer activity"
    session.pending_started_at = time.time() - 120
    session.active_stream_id = stream_id
    session.save()

    # Text, a tool that starts and completes, then more text.
    journaled = [
        ("token", {"text": "I will check GitHub first."}),
        ("tool", {"name": "terminal", "preview": command, "args": {"command": command}}),
        ("tool_complete", {"name": "terminal", "duration": 1.2, "is_error": False}),
        ("token", {"text": "The first check finished before the restart."}),
    ]
    for event_name, payload in journaled:
        append_run_event(session.session_id, stream_id, event_name, payload)

    core_path = hermes_home / "sessions" / f"session_{session.session_id}.json"
    result = _apply_core_sync_or_error_marker(
        session,
        core_path,
        stream_id_for_recheck=stream_id,
    )

    assert result is True
    contents = [message.get("content", "") for message in session.messages]
    assert any("I will check GitHub first." in text for text in contents)
    assert any("The first check finished before the restart." in text for text in contents)

    assert session.tool_calls, "journaled tool starts should become visible settled tool cards"
    first_tool = session.tool_calls[0]
    assert first_tool["name"] == "terminal"
    assert first_tool["done"] is True
    assert first_tool["assistant_msg_idx"] < len(session.messages)

    error_msgs = [message for message in session.messages if message.get("_error")]
    assert len(error_msgs) == 1
    marker_text = error_msgs[0]["content"]
    assert "partial output above was recovered" in marker_text
    assert "no agent output was recovered" not in marker_text
def test_journal_recovery_does_not_materialize_reasoning_only_events(self, hermes_home, monkeypatch):
    """Hidden reasoning in the run journal must never become chat content.

    With only a ``reasoning`` event journaled, repair still succeeds, but the
    scratchpad text stays out of the transcript and the error marker reports
    that no agent output was recovered.
    """
    stream_id = "reasoning_only_stream"

    session = _make_session(messages=[{"role": "user", "content": "existing turn"}])
    session.pending_user_message = "Keep going"
    session.pending_started_at = time.time() - 120
    session.active_stream_id = stream_id
    session.save()

    append_run_event(
        session.session_id,
        stream_id,
        "reasoning",
        {"text": "private scratchpad text"},
    )

    core_path = hermes_home / "sessions" / f"session_{session.session_id}.json"
    result = _apply_core_sync_or_error_marker(
        session,
        core_path,
        stream_id_for_recheck=stream_id,
    )

    assert result is True
    contents = [message.get("content", "") for message in session.messages]
    assert not any("private scratchpad text" in text for text in contents)

    error_msgs = [message for message in session.messages if message.get("_error")]
    assert len(error_msgs) == 1
    assert "no agent output was recovered" in error_msgs[0]["content"]
def test_journal_recovery_keeps_consecutive_tools_on_one_anchor(self, hermes_home, monkeypatch):
    """Back-to-back journaled tools share one recovered assistant anchor.

    Two tool events with no visible text between them should recover as one
    activity group rather than each getting its own empty anchor message.
    """
    stream_id = "tool_burst_stream"

    session = _make_session(messages=[{"role": "user", "content": "existing turn"}])
    session.pending_user_message = "Inspect files"
    session.pending_started_at = time.time() - 120
    session.active_stream_id = stream_id
    session.save()

    # One sentence of visible text, then two tool starts in a row.
    journaled = [("token", {"text": "I will inspect the relevant files first."})]
    journaled.extend(
        ("tool", {"name": tool_name, "preview": tool_name, "args": {"query": "stream recovery"}})
        for tool_name in ("search_files", "read_file")
    )
    for event_name, payload in journaled:
        append_run_event(session.session_id, stream_id, event_name, payload)

    core_path = hermes_home / "sessions" / f"session_{session.session_id}.json"
    result = _apply_core_sync_or_error_marker(
        session,
        core_path,
        stream_id_for_recheck=stream_id,
    )

    assert result is True
    assert len(session.tool_calls) == 2
    first_anchor = session.tool_calls[0]["assistant_msg_idx"]
    second_anchor = session.tool_calls[1]["assistant_msg_idx"]
    assert first_anchor == second_anchor
class TestLastResortSyncDelegation:
"""_last_resort_sync_from_core delegates to the shared helpers