"""Session-mutation operations for slash commands (/retry, /undo) and read-only aggregators (/status, /usage). Operates on the webui's own JSON Session store (api/models.py), not on hermes-agent's SQLite. Behavior parity reference: gateway/run.py:_handle_*_command in the hermes-agent repo. """ from __future__ import annotations import json import logging from typing import Any from api.config import LOCK, _get_session_agent_lock from api.models import get_session, SESSIONS logger = logging.getLogger(__name__) AUTO_TITLE_LABELS = {'untitled', 'new chat'} def _live_active_stream_id(session) -> str | None: """Return session.active_stream_id ONLY if that stream is live in THIS process; else None. After a restart/crash the persisted active_stream_id survives in the session JSON but the in-memory STREAMS / ACTIVE_RUNS that actually drive a live turn were wiped. Exposing that dead id (e.g. via /api/session/status to the hidden-tab poller) would make a client attach its renderer to a stream that never emits — a permanent fake "thinking" state. Liveness test mirrors routes._clear_stale_stream_state: live iff present in STREAMS (open SSE channel) or ACTIVE_RUNS (worker bookkeeping). """ stream_id = getattr(session, 'active_stream_id', None) if not stream_id: return None try: from api import config as _cfg with _cfg.STREAMS_LOCK: if stream_id in _cfg.STREAMS: return stream_id with _cfg.ACTIVE_RUNS_LOCK: if stream_id in (_cfg.ACTIVE_RUNS or {}): return stream_id except Exception: # On any introspection failure, fail SAFE (report no live stream) rather # than surfacing a possibly-stale id. return None return None def session_has_manual_title(session) -> bool: """Return whether adaptive title refresh should leave this title alone.""" return getattr(session, 'manual_title', False) is True def apply_session_title_rename(session, raw_title) -> str: """Apply user-driven rename semantics to a Session object. Non-empty custom titles are protected from adaptive refresh. Clearing the title, or resetting it to an automatic label, removes that protection so the normal auto-title path can run again. """ title = str(raw_title or '').strip()[:80] if not title: title = 'Untitled' manual_title = title.strip().casefold() not in AUTO_TITLE_LABELS session.title = title session.manual_title = manual_title session.llm_title_generated = False return title def mark_session_title_generated(session) -> None: """Mark a session title as generated by the title model.""" session.llm_title_generated = True session.manual_title = False def _truncate_at_last_user(messages): history = messages or [] last_user_idx = None for i in range(len(history) - 1, -1, -1): if isinstance(history[i], dict) and history[i].get('role') == 'user': last_user_idx = i break if last_user_idx is None: return None return history[:last_user_idx] def _truncation_watermark_for(messages): history = list(messages or []) if not history: return 0.0 try: return float(history[-1].get('timestamp') or 0) except (AttributeError, TypeError, ValueError): return 0.0 def truncate_context_for_display_keep( context_messages: list | None, full_messages: list | None, keep: int, ) -> list: """Align model context with display prefix ``full_messages[:keep]``.""" if keep <= 0: return [] ctx = context_messages if isinstance(context_messages, list) else [] msgs = full_messages if isinstance(full_messages, list) else [] if not ctx: return [] if len(ctx) <= len(msgs): return ctx[:keep] if len(msgs) == 0: return [] def _row_signature(row: Any) -> tuple[str, ...] | None: if not isinstance(row, dict): return None tool_calls = row.get('tool_calls') tool_calls_sig = json.dumps(tool_calls, sort_keys=True, default=str) if tool_calls else '' return ( str(row.get('role') or ''), str(row.get('content') or ''), str(row.get('tool_call_id') or ''), str(row.get('tool_use_id') or ''), str(row.get('tool_name') or row.get('name') or ''), tool_calls_sig, ) def _first_match_from(message: Any, start_idx: int) -> tuple[int | None, int | None]: msg_sig = _row_signature(message) if msg_sig is None: return None, None msg_id = message.get('id') msg_ts = message.get('timestamp') weak_matches: list[int] = [] for idx in range(start_idx, len(ctx)): context_row = ctx[idx] context_sig = _row_signature(context_row) if context_sig is None: continue context_id = context_row.get('id') if context_id is not None and msg_id is not None: if context_id == msg_id: return idx, None continue if context_sig != msg_sig: continue context_ts = context_row.get('timestamp') if context_ts is not None and msg_ts is not None: if context_ts == msg_ts: return idx, None continue weak_matches.append(idx) if len(weak_matches) > 1: return None, weak_matches[0] return (weak_matches[0], None) if len(weak_matches) == 1 else (None, None) matches = [None] * len(msgs) ambiguous_matches = [None] * len(msgs) next_ctx_idx = 0 for msg_idx, message in enumerate(msgs): match_idx, ambiguous_idx = _first_match_from(message, next_ctx_idx) matches[msg_idx] = match_idx ambiguous_matches[msg_idx] = ambiguous_idx if match_idx is not None: next_ctx_idx = match_idx + 1 # Cut at the first unkept display turn, or fallback to the last kept turn # if the boundary is not directly alignable. if keep < len(msgs): last_kept = None if keep > 0: last_kept = matches[keep - 1] first_unkept = matches[keep] if first_unkept is not None: if ( last_kept is not None and isinstance(msgs[keep - 1], dict) and msgs[keep - 1].get('role') == 'user' ): return ctx[:last_kept + 1] return ctx[:first_unkept] if last_kept is not None: ambiguous_first_unkept = ambiguous_matches[keep] if ( ambiguous_first_unkept is not None and isinstance(msgs[keep - 1], dict) and msgs[keep - 1].get('role') != 'user' ): return ctx[:ambiguous_first_unkept] return ctx[:last_kept + 1] # Final fallback preserves #5096 behavior when alignment is unreliable. prefix_len = max(0, len(ctx) - len(msgs)) prefix = ctx[:prefix_len] suffix = ctx[prefix_len:] return prefix + suffix[:keep] def truncate_session_at_keep(session, keep: int) -> tuple[int, int]: """Truncate display + context; set watermark/boundary. Returns old counts.""" full_messages = list(session.messages or []) old_msg_count = len(full_messages) old_ctx_count = len(getattr(session, 'context_messages', None) or []) session.messages = full_messages[:keep] if isinstance(getattr(session, 'context_messages', None), list): session.context_messages = truncate_context_for_display_keep( session.context_messages, full_messages, keep, ) session.truncation_watermark = _truncation_watermark_for(session.messages) session.truncation_boundary = session.truncation_watermark return old_msg_count, old_ctx_count def retry_last(session_id: str) -> dict[str, Any]: """Truncate the session to before the last user message, return its text. Mirrors gateway/run.py:_handle_retry_command. Caller (webui frontend) is expected to put the returned text back in the composer and call send() to resume the conversation -- the agent's gateway calls its own _handle_message; the webui has no equivalent in-process pipeline. Raises: KeyError: session not found ValueError: no user message in transcript """ # Acquire the per-session agent lock as the outermost lock so that the # read-modify-write of s.messages is serialised with the periodic # checkpoint thread, cancel_stream, and all other session writers. # Lock ordering: _agent_lock → LOCK → _write_session_index (LOCK). with _get_session_agent_lock(session_id): # get_session() and Session.save() both acquire the module-level LOCK # internally (the latter via _write_session_index()), and LOCK is a # non-reentrant threading.Lock — so they MUST be called outside our # own `with LOCK:` block to avoid self-deadlocking. # # The race we close is the read-modify-write of s.messages: two # concurrent /api/session/retry calls could otherwise both compute the # same last_user_idx from the same history and double-truncate. We # serialize just the in-memory mutation; persistence happens inside # the per-session lock so the checkpoint thread cannot race us. # # Stale-object guard: on a cache miss, two concurrent get_session() # calls can each load and cache a *different* Session instance for the # same session_id (the second store clobbers the first). Re-bind to # the canonical cached instance inside the lock so the mutation lands # on the object the next reader will see, not a stale parallel copy. s = get_session(session_id) # raises KeyError if missing with LOCK: s = SESSIONS.get(session_id, s) history = s.messages or [] last_user_idx = None for i in range(len(history) - 1, -1, -1): if history[i].get('role') == 'user': last_user_idx = i break if last_user_idx is None: raise ValueError('No previous message to retry.') last_user_text = _extract_text(history[last_user_idx].get('content', '')) removed_count = len(history) - last_user_idx s.messages = history[:last_user_idx] s.truncation_watermark = _truncation_watermark_for(s.messages) # Persist the original truncate cutoff so empty-sidecar recovery # can distinguish legitimate prefix from deleted suffix. s.truncation_boundary = s.truncation_watermark if isinstance(getattr(s, 'context_messages', None), list) and s.context_messages: truncated_context = _truncate_at_last_user(s.context_messages) if truncated_context is not None: s.context_messages = truncated_context s.save() return {'last_user_text': last_user_text, 'removed_count': removed_count} def undo_last(session_id: str) -> dict[str, Any]: """Remove the most recent user message and everything after it. Mirrors gateway/run.py:_handle_undo_command. Returns a preview of the removed text so the UI can confirm to the user. Raises: KeyError: session not found ValueError: no user message in transcript """ # Acquire the per-session agent lock as the outermost lock so that the # read-modify-write of s.messages is serialised with the periodic # checkpoint thread, cancel_stream, and all other session writers. # Lock ordering: _agent_lock → LOCK → _write_session_index (LOCK). with _get_session_agent_lock(session_id): s = get_session(session_id) # acquires LOCK transiently with LOCK: # Stale-object guard — see retry_last for the rationale. s = SESSIONS.get(session_id, s) history = s.messages or [] last_user_idx = None for i in range(len(history) - 1, -1, -1): if history[i].get('role') == 'user': last_user_idx = i break if last_user_idx is None: raise ValueError('Nothing to undo.') removed_text = _extract_text(history[last_user_idx].get('content', '')) removed_count = len(history) - last_user_idx s.messages = history[:last_user_idx] s.truncation_watermark = _truncation_watermark_for(s.messages) # Persist the original truncate cutoff. s.truncation_boundary = s.truncation_watermark if isinstance(getattr(s, 'context_messages', None), list) and s.context_messages: truncated_context = _truncate_at_last_user(s.context_messages) if truncated_context is not None: s.context_messages = truncated_context s.save() # outside LOCK -- save() re-acquires LOCK via _write_session_index() preview = (removed_text[:40] + '...') if len(removed_text) > 40 else removed_text return { 'removed_count': removed_count, 'removed_preview': preview, } def session_status(session_id: str) -> dict[str, Any]: """Return a snapshot of session state for /status. Webui equivalent of gateway/run.py:_handle_status_command. The agent's "agent_running" comes from `session_key in self._running_agents`; the webui equivalent is whether the session has an active stream (active_stream_id is set). """ s = get_session(session_id) inp = int(s.input_tokens or 0) out = int(s.output_tokens or 0) profile = getattr(s, 'profile', None) or 'default' try: from api.profiles import get_hermes_home_for_profile hermes_home = str(get_hermes_home_for_profile(profile)) except Exception: hermes_home = '' return { 'session_id': s.session_id, 'title': s.title, 'model': s.model, 'profile': profile, 'hermes_home': hermes_home, 'workspace': s.workspace, 'personality': s.personality, 'message_count': len(s.messages or []), 'created_at': s.created_at, 'updated_at': s.updated_at, 'agent_running': bool(getattr(s, 'active_stream_id', None)), # Expose the stream id itself (not just the agent_running bool) so a # hidden-tab poller can attach the live renderer to a server-initiated # turn (self-wake / cron / restart hook) without opening the persistent # per-session SSE while the tab is hidden. See messages.js hidden-tab # active-stream poll. Additive field — existing consumers ignore it. # # CRITICAL: only expose a stream id that is actually LIVE in this # process. After a restart/crash the persisted active_stream_id is stale # (the in-memory STREAMS/ACTIVE_RUNS were wiped) — handing that dead id # to the poller would make it attach a renderer to a stream that never # produces tokens (a permanent fake "thinking" state). Mirror # _clear_stale_stream_state's liveness test: a stream counts as live # only if it's in STREAMS (SSE channel open) or ACTIVE_RUNS (worker # bookkeeping). Otherwise report None so the poller waits for a REAL # server_turn_started instead of latching a ghost. 'active_stream_id': _live_active_stream_id(s), 'input_tokens': inp, 'output_tokens': out, 'total_tokens': inp + out, 'estimated_cost': s.estimated_cost, } def session_usage(session_id: str) -> dict[str, Any]: """Return token usage and cost for /usage. Mirrors gateway/run.py:_handle_usage_command's basic counters. The agent shows additional fields (rate-limit headroom etc.) that depend on provider API responses we don't have in webui -- those are deferred. """ s = get_session(session_id) inp = int(s.input_tokens or 0) out = int(s.output_tokens or 0) return { 'input_tokens': inp, 'output_tokens': out, 'total_tokens': inp + out, 'estimated_cost': s.estimated_cost, 'model': s.model, } def _extract_text(content: Any) -> str: """Flatten message content to plain text. Agent stores either a string or a list of {type, text|...} parts; webui needs the user-typed text.""" if isinstance(content, str): return content if isinstance(content, list): parts = [] for p in content: if isinstance(p, dict) and p.get('type') == 'text': parts.append(p.get('text', '')) return ' '.join(parts) return str(content)