From 6ca63e5815a20cc2b69fe517fe02170dbf2de1f1 Mon Sep 17 00:00:00 2001 From: Lumen Yang Date: Thu, 14 May 2026 21:51:38 +0000 Subject: [PATCH] perf(webui): keep external refresh metadata cheap --- api/models.py | 74 ++++++++++++++++++- api/routes.py | 35 +++++++-- api/streaming.py | 17 ++++- ...t_issue856_background_completion_unread.py | 2 +- tests/test_tars_scroll_reset_regressions.py | 2 +- 5 files changed, 115 insertions(+), 15 deletions(-) diff --git a/api/models.py b/api/models.py index 5fdd9818..82a0ca6c 100644 --- a/api/models.py +++ b/api/models.py @@ -2339,6 +2339,60 @@ def get_state_db_session_messages(sid, *, stitch_continuations: bool = False) -> return msgs +def get_state_db_session_summary(sid) -> dict: + """Return cheap message count/max timestamp for one state.db session. + + This is intentionally narrower than ``get_state_db_session_messages`` for + metadata-only WebUI polling: callers only need a staleness signal, not a + fully materialized transcript with tool/reasoning metadata. + """ + import os + try: + import sqlite3 + except ImportError: + return {} + + try: + from api.profiles import get_active_hermes_home + hermes_home = Path(get_active_hermes_home()).expanduser().resolve() + except Exception: + hermes_home = Path(os.getenv('HERMES_HOME', str(HOME / '.hermes'))).expanduser().resolve() + db_path = hermes_home / 'state.db' + if not sid or not db_path.exists(): + return {} + + try: + with closing(sqlite3.connect(str(db_path))) as conn: + conn.row_factory = sqlite3.Row + cur = conn.cursor() + cur.execute("PRAGMA table_info(messages)") + available = {str(row['name']) for row in cur.fetchall()} + if not {'session_id', 'timestamp'}.issubset(available): + return {} + cur.execute( + """ + SELECT COUNT(*) AS message_count, MAX(timestamp) AS last_message_at + FROM messages + WHERE session_id = ? + """, + (str(sid),), + ) + row = cur.fetchone() + if not row: + return {} + count = int(row['message_count'] or 0) + last_message_at = row['last_message_at'] + result = {'message_count': count} + if last_message_at not in (None, ''): + try: + result['last_message_at'] = float(last_message_at) + except (TypeError, ValueError): + pass + return result + except Exception: + return {} + + def _normalized_message_timestamp_for_key(value): if value is None or value == "": return "" @@ -2408,12 +2462,27 @@ def merge_session_messages_append_only(sidecar_messages: list, state_messages: l continue if key in seen_message_keys: continue + # State rows at or before the newest sidecar timestamp are normally + # assumed to have already been observed by the sidecar. The <= gate + # preserves sidecar-only ordering/metadata for equal timestamps and + # prevents duplicate legacy rows when timestamp precision differs + # between stores. Explicit message ids are authoritative, though: two + # equal-timestamp messages with different ids are distinct retries. + if ( + key[0] != "message_id" + and max_sidecar_timestamp is not None + and timestamp is not None + and timestamp <= max_sidecar_timestamp + ): + continue seen_message_keys.add(key) merged_messages.append(msg) return merged_messages -def reconciled_state_db_messages_for_session(session, *, prefer_context: bool = False) -> list: +def reconciled_state_db_messages_for_session( + session, *, prefer_context: bool = False, state_messages: list | None = None +) -> list: """Return append-only messages reconciled with state.db for a WebUI session.""" if session is None: return [] @@ -2424,7 +2493,8 @@ def reconciled_state_db_messages_for_session(session, *, prefer_context: bool = local_messages = context_messages if not local_messages: local_messages = getattr(session, 'messages', None) or [] - state_messages = get_state_db_session_messages(getattr(session, 'session_id', None)) + if state_messages is None: + state_messages = get_state_db_session_messages(getattr(session, 'session_id', None)) return merge_session_messages_append_only(local_messages, state_messages) diff --git a/api/routes.py b/api/routes.py index df507739..01671c2a 100644 --- a/api/routes.py +++ b/api/routes.py @@ -2221,6 +2221,7 @@ from api.models import ( get_cli_sessions, get_cli_session_messages, get_state_db_session_messages, + get_state_db_session_summary, merge_session_messages_append_only, ensure_cron_project, is_cron_session, @@ -3668,15 +3669,16 @@ def handle_get(handler, parsed) -> bool: is_messaging_session = _is_messaging_session_record(s) or _is_messaging_session_record(cli_meta) cli_messages = [] state_db_messages = [] + state_db_summary = {} if is_messaging_session: cli_messages = get_cli_session_messages(sid) elif load_messages: state_db_messages = get_state_db_session_messages(sid) elif not is_messaging_session: - # Metadata-only callers (frontend refresh polling) still need a - # reconciled count/timestamp so externally appended state.db - # messages can be detected without fetching the full transcript. - state_db_messages = get_state_db_session_messages(sid) + # Metadata-only callers (frontend refresh polling) only need a + # cheap staleness signal. Avoid full transcript materialization + # on the steady-state polling path. + state_db_summary = get_state_db_session_summary(sid) _t2 = _time.monotonic() effective_model = ( _resolve_effective_session_model_for_display(s) @@ -3707,6 +3709,25 @@ def handle_get(handler, parsed) -> bool: _all_msgs = merge_session_messages_append_only(cli_messages, sidecar_messages) else: _all_msgs = merge_session_messages_append_only(getattr(s, "messages", []) or [], state_db_messages) + if not load_messages and state_db_summary: + sidecar_messages = getattr(s, "messages", []) or [] + sidecar_count = len(sidecar_messages) + try: + sidecar_last = max( + float((m or {}).get("timestamp") or 0) + for m in sidecar_messages + if isinstance(m, dict) + ) if sidecar_messages else 0 + except (TypeError, ValueError): + sidecar_last = 0 + state_count = int(state_db_summary.get("message_count") or 0) + state_last = float(state_db_summary.get("last_message_at") or 0) + _all_msgs = sidecar_messages + _summary_message_count = max(sidecar_count, state_count) + _summary_last_message_at = max(sidecar_last, state_last) + else: + _summary_message_count = None + _summary_last_message_at = None if load_messages: if msg_before is not None: # Scroll-to-top paging: msg_before is a 0-based index into @@ -3762,9 +3783,9 @@ def handle_get(handler, parsed) -> bool: # messages already carry per-message tool metadata. Avoid sending # the full historical list with a small tail window. _session_tool_calls = [] - _merged_message_count = len(_all_msgs) - _merged_last_message_at = 0 - if _all_msgs: + _merged_message_count = _summary_message_count if _summary_message_count is not None else len(_all_msgs) + _merged_last_message_at = _summary_last_message_at if _summary_last_message_at is not None else 0 + if _summary_last_message_at is None and _all_msgs: try: _merged_last_message_at = max( float((m or {}).get("timestamp") or 0) diff --git a/api/streaming.py b/api/streaming.py index 71fa2498..a509261c 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -39,7 +39,7 @@ from api.compression_anchor import visible_messages_for_anchor from api.metering import meter from api.run_journal import RunJournalWriter from api.turn_journal import append_turn_journal_event_for_stream -from api.models import reconciled_state_db_messages_for_session +from api.models import get_state_db_session_messages, reconciled_state_db_messages_for_session # Global lock for os.environ writes. Per-session locks (_agent_lock) prevent # concurrent runs of the SAME session, but two DIFFERENT sessions can still @@ -3958,10 +3958,19 @@ def _run_agent_streaming( # or has been zeroed out (e.g. via a buggy migration / manual file edit). # Truthy-check covers None, missing-attr, and 0 uniformly. _turn_started_at = _pending_started_at if _pending_started_at else time.time() - _reconciled_messages = list(reconciled_state_db_messages_for_session(s) or []) - _previous_messages = _reconciled_messages + _external_state_messages = get_state_db_session_messages(getattr(s, 'session_id', None)) + _previous_messages = list( + reconciled_state_db_messages_for_session( + s, + state_messages=_external_state_messages, + ) or [] + ) _previous_context_messages = _drop_checkpointed_current_user_from_context( - reconciled_state_db_messages_for_session(s, prefer_context=True), + reconciled_state_db_messages_for_session( + s, + prefer_context=True, + state_messages=_external_state_messages, + ), msg_text, ) _pre_compression_count = getattr( diff --git a/tests/test_issue856_background_completion_unread.py b/tests/test_issue856_background_completion_unread.py index 62ee9f64..8b654dc6 100644 --- a/tests/test_issue856_background_completion_unread.py +++ b/tests/test_issue856_background_completion_unread.py @@ -389,7 +389,7 @@ def test_focus_visibility_return_marks_active_session_viewed_and_clears_marker() def test_completion_unread_clears_only_when_session_is_opened(): - load_idx = SESSIONS_JS.find("async function loadSession(sid)") + load_idx = SESSIONS_JS.find("async function loadSession(sid") assert load_idx != -1, "loadSession not found" load_block = SESSIONS_JS[load_idx:SESSIONS_JS.find("function _resolveSessionModelForDisplaySoon", load_idx)] diff --git a/tests/test_tars_scroll_reset_regressions.py b/tests/test_tars_scroll_reset_regressions.py index a37abf2e..256ac48d 100644 --- a/tests/test_tars_scroll_reset_regressions.py +++ b/tests/test_tars_scroll_reset_regressions.py @@ -28,7 +28,7 @@ def test_clicking_current_session_is_noop_before_load_session_side_effects(): load_session = _function_body(SESSIONS_JS, "async function loadSession") current_idx = load_session.index("const currentSid = S.session ? S.session.session_id : null") - noop_idx = load_session.index("if(currentSid===sid) return") + noop_idx = load_session.index("if(currentSid===sid && !forceReload) return") loading_idx = load_session.index("_loadingSessionId = sid") stop_idx = load_session.index("stopApprovalPolling")