# hermes-webui/api/session_ops.py
"""Session-mutation operations for slash commands (/retry, /undo) and
read-only aggregators (/status, /usage). Operates on the webui's own
JSON Session store (api/models.py), not on hermes-agent's SQLite.
Behavior parity reference: gateway/run.py:_handle_*_command in
the hermes-agent repo.
"""
from __future__ import annotations
import json
import logging
from typing import Any
from api.config import LOCK, _get_session_agent_lock
from api.models import get_session, SESSIONS
logger = logging.getLogger(__name__)
# Title labels treated as "automatic" rather than user-chosen. Stored
# lower-case because apply_session_title_rename compares them against the
# casefolded title; a rename to one of these clears manual_title.
AUTO_TITLE_LABELS = {'untitled', 'new chat'}
def _live_active_stream_id(session) -> str | None:
"""Return session.active_stream_id ONLY if that stream is live in THIS
process; else None.
After a restart/crash the persisted active_stream_id survives in the
session JSON but the in-memory STREAMS / ACTIVE_RUNS that actually drive a
live turn were wiped. Exposing that dead id (e.g. via /api/session/status to
the hidden-tab poller) would make a client attach its renderer to a stream
that never emits — a permanent fake "thinking" state. Liveness test mirrors
routes._clear_stale_stream_state: live iff present in STREAMS (open SSE
channel) or ACTIVE_RUNS (worker bookkeeping).
"""
stream_id = getattr(session, 'active_stream_id', None)
if not stream_id:
return None
try:
from api import config as _cfg
with _cfg.STREAMS_LOCK:
if stream_id in _cfg.STREAMS:
return stream_id
with _cfg.ACTIVE_RUNS_LOCK:
if stream_id in (_cfg.ACTIVE_RUNS or {}):
return stream_id
except Exception:
# On any introspection failure, fail SAFE (report no live stream) rather
# than surfacing a possibly-stale id.
return None
return None
def session_has_manual_title(session) -> bool:
    """Report whether adaptive title refresh must leave this title alone.

    Only a literal ``True`` in ``session.manual_title`` counts; a missing
    attribute or any other value (including truthy non-bools) means the
    title is not protected.
    """
    flag = getattr(session, 'manual_title', False)
    return flag is True
def apply_session_title_rename(session, raw_title) -> str:
    """Apply user-driven rename semantics to a Session object.

    Non-empty custom titles are protected from adaptive refresh. Clearing the
    title, or resetting it to an automatic label, removes that protection so
    the normal auto-title path can run again.

    The raw value is stringified, stripped, and capped at 80 characters. It is
    then stripped again on the right, so a cut that lands just after a space
    cannot leave trailing whitespace in the stored title. An empty result
    becomes ``'Untitled'``.

    Side effects on ``session``:
        title: the normalized title.
        manual_title: False when the title (casefolded) is one of
            ``AUTO_TITLE_LABELS``, True otherwise.
        llm_title_generated: reset to False.

    Returns:
        The title actually stored on the session.
    """
    # rstrip after slicing: truncation can expose whitespace that sat in the
    # middle of the original string (leading whitespace is already gone).
    title = str(raw_title or '').strip()[:80].rstrip()
    if not title:
        title = 'Untitled'
    manual_title = title.casefold() not in AUTO_TITLE_LABELS
    session.title = title
    session.manual_title = manual_title
    session.llm_title_generated = False
    return title
def mark_session_title_generated(session) -> None:
    """Record that the session's current title came from the title model.

    A model-generated title is never considered manual, so it stays eligible
    for adaptive refresh.
    """
    session.manual_title = False
    session.llm_title_generated = True
def _truncate_at_last_user(messages):
history = messages or []
last_user_idx = None
for i in range(len(history) - 1, -1, -1):
if isinstance(history[i], dict) and history[i].get('role') == 'user':
last_user_idx = i
break
if last_user_idx is None:
return None
return history[:last_user_idx]
def _truncation_watermark_for(messages):
history = list(messages or [])
if not history:
return 0.0
try:
return float(history[-1].get('timestamp') or 0)
except (AttributeError, TypeError, ValueError):
return 0.0
def truncate_context_for_display_keep(
    context_messages: list | None,
    full_messages: list | None,
    keep: int,
) -> list:
    """Align model context with display prefix ``full_messages[:keep]``.

    Returns the slice of ``context_messages`` that corresponds to keeping
    only the first ``keep`` display rows of ``full_messages``.

    - ``keep <= 0``, or an empty / non-list context, yields ``[]``.
    - When the context is no longer than the display list, the result is the
      plain positional prefix ``ctx[:keep]``.
    - Otherwise each display row is matched, in order, to a context row and
      the context is cut at the aligned boundary. If no boundary can be
      aligned, the #5096 fallback treats the extra leading context rows as a
      prefix and keeps ``keep`` rows after it.
    """
    if keep <= 0:
        return []
    # Non-list inputs are treated as empty.
    ctx = context_messages if isinstance(context_messages, list) else []
    msgs = full_messages if isinstance(full_messages, list) else []
    if not ctx:
        return []
    # Context is no longer than display: assume positional correspondence.
    if len(ctx) <= len(msgs):
        return ctx[:keep]
    if len(msgs) == 0:
        return []

    def _row_signature(row: Any) -> tuple[str, ...] | None:
        """Content fingerprint of a message row; None for non-dict rows."""
        if not isinstance(row, dict):
            return None
        tool_calls = row.get('tool_calls')
        # sort_keys gives a key-order-independent string for tool_calls.
        tool_calls_sig = json.dumps(tool_calls, sort_keys=True, default=str) if tool_calls else ''
        return (
            str(row.get('role') or ''),
            str(row.get('content') or ''),
            str(row.get('tool_call_id') or ''),
            str(row.get('tool_use_id') or ''),
            str(row.get('tool_name') or row.get('name') or ''),
            tool_calls_sig,
        )

    def _first_match_from(message: Any, start_idx: int) -> tuple[int | None, int | None]:
        """Locate the context row matching ``message`` at or after ``start_idx``.

        Returns ``(match_idx, ambiguous_idx)``:
        - ``(idx, None)`` for the first strong match (equal ids when both rows
          carry one, or equal signature with equal timestamps when both carry
          one), or for exactly one weak (signature-only) match;
        - ``(None, first_weak_idx)`` when several weak matches exist;
        - ``(None, None)`` when nothing matches or ``message`` is not a dict.
        """
        msg_sig = _row_signature(message)
        if msg_sig is None:
            return None, None
        msg_id = message.get('id')
        msg_ts = message.get('timestamp')
        weak_matches: list[int] = []
        for idx in range(start_idx, len(ctx)):
            context_row = ctx[idx]
            context_sig = _row_signature(context_row)
            if context_sig is None:
                continue
            context_id = context_row.get('id')
            # When both rows carry an id, the id alone decides the match.
            if context_id is not None and msg_id is not None:
                if context_id == msg_id:
                    return idx, None
                continue
            if context_sig != msg_sig:
                continue
            context_ts = context_row.get('timestamp')
            # Same signature: when both rows carry a timestamp it must agree.
            if context_ts is not None and msg_ts is not None:
                if context_ts == msg_ts:
                    return idx, None
                continue
            # Signature matches but nothing stronger is available to compare.
            weak_matches.append(idx)
        if len(weak_matches) > 1:
            return None, weak_matches[0]
        return (weak_matches[0], None) if len(weak_matches) == 1 else (None, None)

    # matches[i] / ambiguous_matches[i]: context index aligned to display row
    # i (or the first ambiguous candidate). The scan start only advances past
    # definite matches, keeping the alignment monotonic.
    matches = [None] * len(msgs)
    ambiguous_matches = [None] * len(msgs)
    next_ctx_idx = 0
    for msg_idx, message in enumerate(msgs):
        match_idx, ambiguous_idx = _first_match_from(message, next_ctx_idx)
        matches[msg_idx] = match_idx
        ambiguous_matches[msg_idx] = ambiguous_idx
        if match_idx is not None:
            next_ctx_idx = match_idx + 1
    # Cut at the first unkept display turn, or fallback to the last kept turn
    # if the boundary is not directly alignable.
    if keep < len(msgs):
        last_kept = None
        # Always true here (keep <= 0 returned above).
        if keep > 0:
            last_kept = matches[keep - 1]
        first_unkept = matches[keep]
        if first_unkept is not None:
            # Last kept display row is an aligned user turn: cut right after
            # it rather than right before the first dropped row.
            if (
                last_kept is not None
                and isinstance(msgs[keep - 1], dict)
                and msgs[keep - 1].get('role') == 'user'
            ):
                return ctx[:last_kept + 1]
            return ctx[:first_unkept]
        if last_kept is not None:
            ambiguous_first_unkept = ambiguous_matches[keep]
            # First dropped row had several candidates: after a non-user
            # turn, cut at the earliest candidate.
            if (
                ambiguous_first_unkept is not None
                and isinstance(msgs[keep - 1], dict)
                and msgs[keep - 1].get('role') != 'user'
            ):
                return ctx[:ambiguous_first_unkept]
            return ctx[:last_kept + 1]
    # Final fallback preserves #5096 behavior when alignment is unreliable.
    prefix_len = max(0, len(ctx) - len(msgs))
    prefix = ctx[:prefix_len]
    suffix = ctx[prefix_len:]
    return prefix + suffix[:keep]
def truncate_session_at_keep(session, keep: int) -> tuple[int, int]:
    """Truncate display + context; set watermark/boundary. Returns old counts.

    Keeps the first ``keep`` rows of ``session.messages``. When
    ``session.context_messages`` is a list, it is cut to the rows aligned
    with that display prefix via ``truncate_context_for_display_keep``.
    ``truncation_watermark`` and ``truncation_boundary`` are both set to the
    timestamp of the new last display row (0.0 when none).

    A negative ``keep`` is clamped to 0. Otherwise ``messages[:keep]`` would
    drop rows from the END while the context helper returns ``[]`` for
    ``keep <= 0``, leaving display and model context out of sync.

    Returns:
        ``(old_message_count, old_context_count)`` measured before truncation.
    """
    keep = max(0, keep)
    full_messages = list(session.messages or [])
    old_msg_count = len(full_messages)
    old_ctx_count = len(getattr(session, 'context_messages', None) or [])
    session.messages = full_messages[:keep]
    if isinstance(getattr(session, 'context_messages', None), list):
        session.context_messages = truncate_context_for_display_keep(
            session.context_messages,
            full_messages,
            keep,
        )
    session.truncation_watermark = _truncation_watermark_for(session.messages)
    session.truncation_boundary = session.truncation_watermark
    return old_msg_count, old_ctx_count
def retry_last(session_id: str) -> dict[str, Any]:
    """Truncate the session to before the last user message, return its text.

    Mirrors gateway/run.py:_handle_retry_command. Caller (webui frontend)
    is expected to put the returned text back in the composer and call
    send() to resume the conversation -- the agent's gateway calls its own
    _handle_message; the webui has no equivalent in-process pipeline.

    Returns:
        ``{'last_user_text': str, 'removed_count': int}`` where removed_count
        is the number of display rows dropped (the user row and everything
        after it).

    Raises:
        KeyError: session not found
        ValueError: no user message in transcript
    """
    # Acquire the per-session agent lock as the outermost lock so that the
    # read-modify-write of s.messages is serialised with the periodic
    # checkpoint thread, cancel_stream, and all other session writers.
    # Lock ordering: _agent_lock → LOCK → _write_session_index (LOCK).
    with _get_session_agent_lock(session_id):
        # get_session() and Session.save() both acquire the module-level LOCK
        # internally (the latter via _write_session_index()), and LOCK is a
        # non-reentrant threading.Lock — so they MUST be called outside our
        # own `with LOCK:` block to avoid self-deadlocking.
        #
        # The race we close is the read-modify-write of s.messages: two
        # concurrent /api/session/retry calls could otherwise both compute the
        # same last_user_idx from the same history and double-truncate. We
        # serialize just the in-memory mutation; persistence happens inside
        # the per-session lock so the checkpoint thread cannot race us.
        #
        # Stale-object guard: on a cache miss, two concurrent get_session()
        # calls can each load and cache a *different* Session instance for the
        # same session_id (the second store clobbers the first). Re-bind to
        # the canonical cached instance inside the lock so the mutation lands
        # on the object the next reader will see, not a stale parallel copy.
        s = get_session(session_id)  # raises KeyError if missing
        with LOCK:
            s = SESSIONS.get(session_id, s)
            history = s.messages or []
            # Shared newest-first search; non-dict rows are skipped instead of
            # raising AttributeError on ``.get``.
            kept = _truncate_at_last_user(history)
            if kept is None:
                raise ValueError('No previous message to retry.')
            last_user_idx = len(kept)
            # ``or ''``: a user row with content None must yield '' rather
            # than the literal text 'None' in the composer.
            last_user_text = _extract_text(history[last_user_idx].get('content') or '')
            removed_count = len(history) - last_user_idx
            s.messages = kept
            s.truncation_watermark = _truncation_watermark_for(s.messages)
            # Persist the original truncate cutoff so empty-sidecar recovery
            # can distinguish legitimate prefix from deleted suffix.
            s.truncation_boundary = s.truncation_watermark
            if isinstance(getattr(s, 'context_messages', None), list) and s.context_messages:
                truncated_context = _truncate_at_last_user(s.context_messages)
                if truncated_context is not None:
                    s.context_messages = truncated_context
        s.save()  # outside LOCK -- save() re-acquires LOCK via _write_session_index()
    return {'last_user_text': last_user_text, 'removed_count': removed_count}
def undo_last(session_id: str) -> dict[str, Any]:
    """Remove the most recent user message and everything after it.

    Mirrors gateway/run.py:_handle_undo_command. Returns a preview of the
    removed text so the UI can confirm to the user.

    Returns:
        ``{'removed_count': int, 'removed_preview': str}``. The preview is the
        removed user text, cut to 40 characters plus ``'...'`` when longer.

    Raises:
        KeyError: session not found
        ValueError: no user message in transcript
    """
    # Acquire the per-session agent lock as the outermost lock so that the
    # read-modify-write of s.messages is serialised with the periodic
    # checkpoint thread, cancel_stream, and all other session writers.
    # Lock ordering: _agent_lock → LOCK → _write_session_index (LOCK).
    with _get_session_agent_lock(session_id):
        s = get_session(session_id)  # acquires LOCK transiently
        with LOCK:
            # Stale-object guard — see retry_last for the rationale.
            s = SESSIONS.get(session_id, s)
            history = s.messages or []
            # Shared newest-first search; non-dict rows are skipped instead of
            # raising AttributeError on ``.get``.
            kept = _truncate_at_last_user(history)
            if kept is None:
                raise ValueError('Nothing to undo.')
            last_user_idx = len(kept)
            # ``or ''``: content None must preview as '' rather than 'None'.
            removed_text = _extract_text(history[last_user_idx].get('content') or '')
            removed_count = len(history) - last_user_idx
            s.messages = kept
            s.truncation_watermark = _truncation_watermark_for(s.messages)
            # Persist the original truncate cutoff.
            s.truncation_boundary = s.truncation_watermark
            if isinstance(getattr(s, 'context_messages', None), list) and s.context_messages:
                truncated_context = _truncate_at_last_user(s.context_messages)
                if truncated_context is not None:
                    s.context_messages = truncated_context
        s.save()  # outside LOCK -- save() re-acquires LOCK via _write_session_index()
    preview = (removed_text[:40] + '...') if len(removed_text) > 40 else removed_text
    return {
        'removed_count': removed_count,
        'removed_preview': preview,
    }
def session_status(session_id: str) -> dict[str, Any]:
    """Return a snapshot of session state for /status.

    Webui equivalent of gateway/run.py:_handle_status_command. The agent's
    "agent_running" comes from `session_key in self._running_agents`, an
    in-memory record of that process. The webui equivalent is therefore
    whether the session's active_stream_id is LIVE in this process (see
    _live_active_stream_id), not merely whether a persisted id is set. A
    persisted id can be stale after a restart/crash.

    Raises:
        KeyError: session not found (propagated from get_session).
    """
    s = get_session(session_id)
    inp = int(s.input_tokens or 0)
    out = int(s.output_tokens or 0)
    profile = getattr(s, 'profile', None) or 'default'
    try:
        from api.profiles import get_hermes_home_for_profile
        hermes_home = str(get_hermes_home_for_profile(profile))
    except Exception:
        # Best-effort display field: keep /status working, but leave a trace.
        logger.debug('Could not resolve hermes_home for profile %r', profile, exc_info=True)
        hermes_home = ''
    # Resolve liveness once so 'agent_running' and 'active_stream_id' agree.
    #
    # The stream id itself (not just the agent_running bool) is exposed so a
    # hidden-tab poller can attach the live renderer to a server-initiated
    # turn (self-wake / cron / restart hook) without opening the persistent
    # per-session SSE while the tab is hidden. See messages.js hidden-tab
    # active-stream poll. Additive field — existing consumers ignore it.
    #
    # CRITICAL: only a stream id that is actually LIVE in this process is
    # exposed. After a restart/crash the persisted active_stream_id is stale
    # (the in-memory STREAMS/ACTIVE_RUNS were wiped). Handing that dead id to
    # the poller would make it attach a renderer to a stream that never
    # produces tokens (a permanent fake "thinking" state), and reporting
    # agent_running=True for it would be equally wrong. A stream counts as
    # live only if it's in STREAMS (SSE channel open) or ACTIVE_RUNS (worker
    # bookkeeping); otherwise None is reported so the poller waits for a REAL
    # server_turn_started instead of latching a ghost.
    live_stream_id = _live_active_stream_id(s)
    return {
        'session_id': s.session_id,
        'title': s.title,
        'model': s.model,
        'profile': profile,
        'hermes_home': hermes_home,
        'workspace': s.workspace,
        'personality': s.personality,
        'message_count': len(s.messages or []),
        'created_at': s.created_at,
        'updated_at': s.updated_at,
        'agent_running': live_stream_id is not None,
        'active_stream_id': live_stream_id,
        'input_tokens': inp,
        'output_tokens': out,
        'total_tokens': inp + out,
        'estimated_cost': s.estimated_cost,
    }
def session_usage(session_id: str) -> dict[str, Any]:
    """Report token counters and estimated cost for the /usage command.

    Covers the basic counters of gateway/run.py:_handle_usage_command. Extra
    agent-side fields (rate-limit headroom and similar) rely on provider API
    responses the webui does not receive, so they are not reported here.
    Missing/None token counters are reported as 0.
    """
    sess = get_session(session_id)
    tokens_in = int(sess.input_tokens or 0)
    tokens_out = int(sess.output_tokens or 0)
    usage: dict[str, Any] = {}
    usage['input_tokens'] = tokens_in
    usage['output_tokens'] = tokens_out
    usage['total_tokens'] = tokens_in + tokens_out
    usage['estimated_cost'] = sess.estimated_cost
    usage['model'] = sess.model
    return usage
def _extract_text(content: Any) -> str:
"""Flatten message content to plain text. Agent stores either a string
or a list of {type, text|...} parts; webui needs the user-typed text."""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for p in content:
if isinstance(p, dict) and p.get('type') == 'text':
parts.append(p.get('text', ''))
return ' '.join(parts)
return str(content)