mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-28 04:30:18 +00:00
v0.50.207: batch of 10 PRs — TPS stat, SSE guard, session polish, cron UX, folder create, model errors, session speed, title gen (#1031)
* fix: remove orphaned i18n keys from top-level LOCALES object Three Traditional Chinese translation keys (cmd_status, memory_saved, profile_delete_title) were placed outside any locale block between the en and ru blocks in static/i18n.js. They became top-level properties of the LOCALES object, causing them to appear as invalid language options in the Settings > Preferences dropdown. The correct translations already exist in the zh-Hant locale block. Fixes #1008 * fix: block stale SSE events from polluting new session's DOM - appendThinking(): guard with !S.session||!S.activeStreamId to drop events from a previous session's SSE stream during a session switch - appendLiveToolCard(): same guard for consistency - finalizeThinkingCard(): scroll thinking-card-body to top when scroll is pinned, so completed response is immediately visible - appendThinking(): auto-scroll thinking card body to bottom while streaming if user is watching (scroll pinned) * Fix empty agent sessions in sidebar * fix: resolve cron UI UX issues — icon ambiguity, toast overlap, running status Fixes #995 — three sub-issues in the Cron Jobs UI: 1. Dual play icons ambiguous: Resume button now shows a distinct play+bar icon (play triangle + vertical line) instead of the identical triangle used by Run now. 2. Toast notification overlapping header buttons: Added position:relative; z-index:10 to .main-view-header so it stacks above the fixed toast (z-index:100 within its layer). 3. No running status after trigger: After triggering a job, the status badge immediately shows 'running…' with a CSS spinner animation, and polls the cron list every 3s (up to 30s) to refresh when the job completes. - Added cron_status_running i18n key in all 5 locales (en, es, de, ru, zh, zh-Hant) - Added .detail-badge.running CSS class with spinner animation - New functions: _setCronDetailStatus(), _startCronRunningPoll() * fix(#1011): address review feedback — poll cleanup, badge persistence, 30s fallback - _clearCronDetail() now clears _cronRunningPoll interval on navigation - Poll re-applies 'running' badge after loadCrons() re-render (prevents flicker) - When poll ends (30s max), detail re-renders with actual status as fallback * feat: create folder and add space directly from UI (#782) - After creating a folder via the file tree New folder button, offer to add it as a space via confirm dialog - Add Create folder if it doesnt exist checkbox in the New Space form - Backend: support create flag in /api/workspaces/add to mkdir before validation - i18n: 4 new keys (folder_add_as_space_title/msg/btn, workspace_auto_create_folder) in all 6 locales * fix: validate workspace path before mkdir to prevent orphan directories Review feedback (critical): the previous code called mkdir() before validate_workspace_to_add(), which meant a rejected path (e.g. system dir) would leave an orphan directory on disk. New flow: 1. Resolve path and check against blocked system roots BEFORE any mutation 2. mkdir() only if path passes the blocklist check 3. Full validation (exists, is_dir) after mkdir Also imports _workspace_blocked_roots for the pre-mutation blocklist check. * fix(#1014): classify model-not-found errors with helpful message - Add model_not_found error type to streaming.py exception classifier - Detect 404, 'not found', 'does not exist', 'invalid model' patterns - Strip HTML tags from provider error messages (nginx 404 pages, etc.) - Add model_not_found branch to apperror handler in messages.js - Add i18n key model_not_found_label in all 6 locales - 15 tests covering detection, sanitization, frontend, and i18n * feat(ui): add live TPS stat to header Adds a TPS (Tokens Per Second) chip to the right of the header title bar that updates live while AI output is streaming. Metering (api/metering.py) - Tracks per-session output + reasoning tokens via GlobalMeter singleton - Per-session TPS = total_tokens / elapsed_time - Global TPS = average of active sessions' TPS values - HIGH/LOW are max/min of global_tps snapshots over a 60-minute rolling window (only recorded when > 0, so idle periods are excluded) - Thread-safe with a single lock Metering events emitted from streaming.py - Throttled at 100ms from token/reasoning/tool callbacks so the display updates rapidly during fast token streams - 1Hz ticker as fallback for slow streams (exits when no active sessions) - Final stats emitted on stream end Routes (api/routes.py) - Removed POST /api/metering/interval endpoint (dynamic interval via focus/blur was replaced with simple always-1s-when-active approach) UI (static/messages.js, index.html, style.css) - TPS chip in titlebar: shows 'N.N t/s . N.N high . N.N low' - Default: '0.0 t/s . 0.0 high' when idle - Display updates on every metering SSE event (throttled to 100ms) * feat: session restore speed + title gen reasoning hardening (#1025, #1026) PR #1025 (@franksong2702): Speed up large session restore paths - GET /api/session?messages=0 now parses only metadata before the messages array - Metadata-only loads no longer populate the full-session LRU cache - Frontend lazy fetch uses resolve_model=0 to avoid cold model-catalog lookup - Hard reload no longer waits for populateModelDropdown() before restoring session PR #1026 (@franksong2702): Harden auto title generation for reasoning models - Raises title-gen completion budget to 512 tokens (reasoning-safe) - Retries once with 1024 tokens on empty content / finish_reason:length - Applies retry to both auxiliary and active-agent fallback routes - Preserves underlying failure reason in title_status on local fallback Co-authored-by: Frank Song <franksong2702@gmail.com> * feat: session attention indicators in right slot + last_message_at timestamps (#1024) PR #1024 (@franksong2702): Polish session attention indicators - Streaming spinners and unread dots now reuse the right-side actions slot - Running/unread rows hide timestamps; idle/read rows keep right-aligned timestamps - Date group carets point down when expanded, right when collapsed - Pinned group no longer repeats pinned-star icon per row - Running indicators appear immediately after send (local busy state while /api/sessions catches up) - Sidebar sorting/grouping/timestamps now prefer last_message_at (derived from last real message) so metadata-only saves don't make old sessions appear under Today Co-authored-by: Frank Song <franksong2702@gmail.com> * docs: v0.50.207 release notes — 10 PRs, 2169 tests (+36) --------- Co-authored-by: bergeouss <bergeouss@users.noreply.github.com> Co-authored-by: Josh <josh@fyul.link> Co-authored-by: Frank Song <franksong2702@gmail.com> Co-authored-by: nesquena-hermes <nesquena-hermes@users.noreply.github.com>
This commit is contained in:
@@ -0,0 +1,55 @@
|
||||
"""Shared helpers for reading Hermes Agent sessions from state.db."""
|
||||
import logging
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def read_importable_agent_session_rows(db_path: Path, limit: int = 200, log=None) -> list[dict]:
|
||||
"""Return non-WebUI agent sessions that have readable message rows.
|
||||
|
||||
Hermes Agent can create rows in ``state.db.sessions`` before a session has
|
||||
any messages. WebUI cannot import those rows, so both the regular
|
||||
``/api/sessions`` path and the gateway SSE watcher must filter them the
|
||||
same way.
|
||||
"""
|
||||
db_path = Path(db_path)
|
||||
if not db_path.exists():
|
||||
return []
|
||||
|
||||
log = log or logger
|
||||
with sqlite3.connect(str(db_path)) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cur = conn.cursor()
|
||||
|
||||
# Older Hermes Agent versions may not have source tracking. Without a
|
||||
# source column we cannot safely distinguish WebUI rows from agent rows.
|
||||
cur.execute("PRAGMA table_info(sessions)")
|
||||
session_cols = {row[1] for row in cur.fetchall()}
|
||||
if 'source' not in session_cols:
|
||||
log.warning(
|
||||
"agent session listing skipped: state.db at %s has no 'source' column "
|
||||
"(older hermes-agent?). Agent sessions unavailable. "
|
||||
"Upgrade hermes-agent to fix this.",
|
||||
db_path,
|
||||
)
|
||||
return []
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT s.id, s.title, s.model, s.message_count,
|
||||
s.started_at, s.source,
|
||||
COUNT(m.id) AS actual_message_count,
|
||||
MAX(m.timestamp) AS last_activity
|
||||
FROM sessions s
|
||||
LEFT JOIN messages m ON m.session_id = s.id
|
||||
WHERE s.source IS NOT NULL AND s.source != 'webui'
|
||||
GROUP BY s.id
|
||||
HAVING COUNT(m.id) > 0
|
||||
ORDER BY COALESCE(MAX(m.timestamp), s.started_at) DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(int(limit),),
|
||||
)
|
||||
return [dict(row) for row in cur.fetchall()]
|
||||
+13
-28
@@ -13,12 +13,12 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import queue
|
||||
import sqlite3
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from api.config import HOME
|
||||
from api.agent_sessions import read_importable_agent_session_rows
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -55,33 +55,18 @@ def _get_agent_sessions_from_db() -> list:
|
||||
return []
|
||||
|
||||
try:
|
||||
with sqlite3.connect(str(db_path)) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
SELECT s.id, s.title, s.model, s.message_count,
|
||||
s.started_at, s.source,
|
||||
MAX(m.timestamp) AS last_activity
|
||||
FROM sessions s
|
||||
LEFT JOIN messages m ON m.session_id = s.id
|
||||
WHERE s.source IS NOT NULL AND s.source != 'webui'
|
||||
GROUP BY s.id
|
||||
HAVING COUNT(m.id) > 0
|
||||
ORDER BY COALESCE(MAX(m.timestamp), s.started_at) DESC
|
||||
LIMIT 200
|
||||
""")
|
||||
sessions = []
|
||||
for row in cur.fetchall():
|
||||
sessions.append({
|
||||
'session_id': row['id'],
|
||||
'title': row['title'] or 'Agent Session',
|
||||
'model': row['model'] or None,
|
||||
'message_count': row['message_count'] or 0,
|
||||
'created_at': row['started_at'],
|
||||
'updated_at': row['last_activity'] or row['started_at'],
|
||||
'source': row['source'] or 'cli',
|
||||
})
|
||||
return sessions
|
||||
sessions = []
|
||||
for row in read_importable_agent_session_rows(db_path, limit=200, log=logger):
|
||||
sessions.append({
|
||||
'session_id': row['id'],
|
||||
'title': row['title'] or 'Agent Session',
|
||||
'model': row['model'] or None,
|
||||
'message_count': row['message_count'] or row['actual_message_count'] or 0,
|
||||
'created_at': row['started_at'],
|
||||
'updated_at': row['last_activity'] or row['started_at'],
|
||||
'source': row['source'] or 'cli',
|
||||
})
|
||||
return sessions
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
+187
@@ -0,0 +1,187 @@
|
||||
"""
|
||||
Hermes Web UI -- Streaming performance metering.
|
||||
|
||||
Tracks Tokens Per Second (TPS) across all active WebUI sessions, and the
|
||||
HIGH/LOW TPS values observed over the past 60 minutes. Metering data is
|
||||
emitted via SSE events so the header label can update live during a stream.
|
||||
|
||||
Architecture
|
||||
────────────
|
||||
Each streaming session is tracked independently. TPS per session is:
|
||||
|
||||
session_tps = total_tokens / (last_token_ts - first_token_ts)
|
||||
|
||||
The global tps is the average of all currently active sessions' TPS values.
|
||||
This correctly represents the system's real-time capacity regardless of how
|
||||
many sessions are running or how long each has been streaming.
|
||||
|
||||
For HIGH/LOW tracking, every stats snapshot records the current global tps
|
||||
(only when > 0 — idle periods are skipped) into a rolling 60-minute history.
|
||||
The max/min of that history gives the peak throughput observed over the past hour.
|
||||
|
||||
The ticker in streaming.py calls get_interval() — it returns 1.0 when sessions
|
||||
are actively receiving tokens so the header updates at 1 Hz, and 10.0 when idle
|
||||
so the ticker exits and no idle readings are emitted.
|
||||
|
||||
Usage from api/streaming.py
|
||||
─────────────────────────────
|
||||
from api.metering import meter
|
||||
|
||||
meter().begin_session(stream_id) # stream starts
|
||||
meter().record_token(stream_id, running_output) # per output token
|
||||
meter().record_reasoning(stream_id, running_reasoning_len) # per reasoning token
|
||||
|
||||
The SSE `metering` event payload:
|
||||
{
|
||||
"tps": 47.3, # average TPS across active sessions (real-time)
|
||||
"high": 52.1, # highest average TPS observed in the past 60 minutes
|
||||
"low": 31.4, # lowest average TPS (excl. readings < 1 tps, to ignore idle)
|
||||
"active": 1, # sessions currently streaming
|
||||
}
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
|
||||
_HOUR_SECS = 3600.0 # rolling window for HIGH/LOW tracking
|
||||
_STALE_SECS = 60.0 # consider a session inactive after this
|
||||
|
||||
|
||||
@dataclass
|
||||
class _SessionMeter:
|
||||
output_tokens: int = 0
|
||||
reasoning_tokens: int = 0
|
||||
first_token_ts: float = 0.0 # time.monotonic() of first token received
|
||||
last_token_ts: float = 0.0 # time.monotonic() of last token received
|
||||
|
||||
def total_tokens(self) -> int:
|
||||
return self.output_tokens + self.reasoning_tokens
|
||||
|
||||
def tps(self) -> float:
|
||||
if self.first_token_ts == 0.0 or self.last_token_ts <= self.first_token_ts:
|
||||
return 0.0
|
||||
return self.total_tokens() / (self.last_token_ts - self.first_token_ts)
|
||||
|
||||
|
||||
class GlobalMeter:
|
||||
"""Thread-safe global streaming meter.
|
||||
|
||||
Tracks per-session TPS, averages them for a global tps, and maintains a
|
||||
60-minute rolling history of global tps snapshots for HIGH/LOW reporting.
|
||||
"""
|
||||
|
||||
__slots__ = (
|
||||
'_lock',
|
||||
'_sessions', # stream_id -> _SessionMeter
|
||||
'_readings', # [(monotonic_ts, tps), ...] rolling 60-minute history
|
||||
'_window_start', # monotonic ts of current window
|
||||
)
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._lock = threading.Lock()
|
||||
self._sessions: dict[str, _SessionMeter] = {}
|
||||
self._readings: list[tuple[float, float]] = []
|
||||
self._window_start: float = time.monotonic()
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────────
|
||||
|
||||
def begin_session(self, stream_id: str) -> None:
|
||||
with self._lock:
|
||||
self._sessions[stream_id] = _SessionMeter()
|
||||
|
||||
def get_interval(self) -> float:
|
||||
"""Return 1.0 when sessions are actively receiving tokens, 10.0 when idle.
|
||||
|
||||
Used by the streaming ticker to run at 1 Hz during work and exit when
|
||||
there is nothing to measure.
|
||||
"""
|
||||
now = time.monotonic()
|
||||
with self._lock:
|
||||
# Only count sessions that have received at least one token recently.
|
||||
active_sids = {
|
||||
sid for sid, s in self._sessions.items()
|
||||
if s.first_token_ts > 0 and (now - s.last_token_ts) <= _STALE_SECS
|
||||
}
|
||||
return 1.0 if active_sids else 10.0
|
||||
|
||||
def record_token(self, stream_id: str, running_output_tokens: int) -> None:
|
||||
now = time.monotonic()
|
||||
with self._lock:
|
||||
s = self._sessions.get(stream_id)
|
||||
if s is None:
|
||||
return
|
||||
if s.first_token_ts == 0.0:
|
||||
s.first_token_ts = now
|
||||
s.last_token_ts = now
|
||||
s.output_tokens = running_output_tokens
|
||||
|
||||
def record_reasoning(self, stream_id: str, running_reasoning_tokens: int) -> None:
|
||||
now = time.monotonic()
|
||||
with self._lock:
|
||||
s = self._sessions.get(stream_id)
|
||||
if s is None:
|
||||
return
|
||||
if s.first_token_ts == 0.0:
|
||||
s.first_token_ts = now
|
||||
s.last_token_ts = now
|
||||
s.reasoning_tokens = running_reasoning_tokens
|
||||
|
||||
def end_session(self, stream_id: str, final_output_tokens: int, input_tokens: int = 0) -> None:
|
||||
with self._lock:
|
||||
self._sessions.pop(stream_id, None)
|
||||
|
||||
def get_stats(self) -> dict:
|
||||
now = time.monotonic()
|
||||
with self._lock:
|
||||
# Prune stale sessions
|
||||
stale = [
|
||||
sid for sid, s in self._sessions.items()
|
||||
if s.first_token_ts > 0 and (now - s.last_token_ts) > _STALE_SECS
|
||||
]
|
||||
for sid in stale:
|
||||
self._sessions.pop(sid, None)
|
||||
|
||||
# Reset window if everything went stale
|
||||
if not self._sessions:
|
||||
self._window_start = now
|
||||
|
||||
# Compute global tps: average of per-session TPS values
|
||||
active = [s for s in self._sessions.values() if s.first_token_ts > 0]
|
||||
if active:
|
||||
global_tps = sum(s.tps() for s in active) / len(active)
|
||||
else:
|
||||
global_tps = 0.0
|
||||
|
||||
# Prune readings older than 1 hour
|
||||
cutoff = now - _HOUR_SECS
|
||||
self._readings = [(ts, v) for ts, v in self._readings if ts > cutoff]
|
||||
|
||||
# Only record this snapshot for HIGH/LOW if there is active work.
|
||||
# This prevents idle periods from flooding the history and keeps
|
||||
# HIGH/LOW meaningful for the past hour of actual throughput.
|
||||
if global_tps > 0:
|
||||
self._readings.append((now, global_tps))
|
||||
|
||||
# HIGH/LOW from the past hour (skip near-zero idle readings)
|
||||
active_readings = [v for _, v in self._readings if v >= 1.0]
|
||||
high = max(active_readings) if active_readings else 0.0
|
||||
low = min(active_readings) if active_readings else 0.0
|
||||
|
||||
return {
|
||||
'tps': round(global_tps, 1),
|
||||
'high': round(high, 1),
|
||||
'low': round(low, 1),
|
||||
'active': len(self._sessions),
|
||||
}
|
||||
|
||||
|
||||
# ── Module-level singleton ─────────────────────────────────────────────────────
|
||||
|
||||
_meter = GlobalMeter()
|
||||
|
||||
|
||||
def meter() -> GlobalMeter:
|
||||
return _meter
|
||||
+172
-84
@@ -15,6 +15,7 @@ from api.config import (
|
||||
get_effective_default_model,
|
||||
)
|
||||
from api.workspace import get_last_workspace
|
||||
from api.agent_sessions import read_importable_agent_session_rows
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -193,6 +194,114 @@ def _active_stream_ids():
|
||||
def _is_streaming_session(active_stream_id, active_stream_ids):
|
||||
return bool(active_stream_id and active_stream_id in active_stream_ids)
|
||||
|
||||
def _session_sort_timestamp(session):
|
||||
if isinstance(session, dict):
|
||||
return session.get('last_message_at') or session.get('updated_at') or 0
|
||||
return _last_message_timestamp(getattr(session, 'messages', None)) or getattr(session, 'updated_at', 0) or 0
|
||||
|
||||
|
||||
def _message_timestamp(message):
|
||||
if not isinstance(message, dict):
|
||||
return None
|
||||
raw = message.get('_ts') or message.get('timestamp')
|
||||
try:
|
||||
return float(raw) if raw is not None else None
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def _last_message_timestamp(messages):
|
||||
if not isinstance(messages, list):
|
||||
return None
|
||||
for message in reversed(messages):
|
||||
if isinstance(message, dict) and message.get('role') == 'tool':
|
||||
continue
|
||||
ts = _message_timestamp(message)
|
||||
if ts:
|
||||
return ts
|
||||
return None
|
||||
|
||||
|
||||
def _find_top_level_json_key(text, key):
|
||||
"""Return the byte offset of a top-level JSON object key, if present."""
|
||||
depth = 0
|
||||
i = 0
|
||||
n = len(text)
|
||||
while i < n:
|
||||
ch = text[i]
|
||||
if ch == '"':
|
||||
start = i
|
||||
i += 1
|
||||
escaped = False
|
||||
chars = []
|
||||
while i < n:
|
||||
c = text[i]
|
||||
if escaped:
|
||||
chars.append(c)
|
||||
escaped = False
|
||||
elif c == '\\':
|
||||
escaped = True
|
||||
elif c == '"':
|
||||
break
|
||||
else:
|
||||
chars.append(c)
|
||||
i += 1
|
||||
if i >= n:
|
||||
return None
|
||||
if depth == 1 and ''.join(chars) == key:
|
||||
j = i + 1
|
||||
while j < n and text[j] in ' \t\r\n':
|
||||
j += 1
|
||||
if j < n and text[j] == ':':
|
||||
return start
|
||||
elif ch in '{[':
|
||||
depth += 1
|
||||
elif ch in '}]':
|
||||
depth -= 1
|
||||
i += 1
|
||||
return None
|
||||
|
||||
|
||||
def _read_metadata_json_prefix(path, max_prefix_bytes=65536):
|
||||
"""Read only the metadata portion before the top-level messages array."""
|
||||
buf = ''
|
||||
with open(path, 'r', encoding='utf-8') as f:
|
||||
while len(buf.encode('utf-8')) < max_prefix_bytes:
|
||||
chunk = f.read(4096)
|
||||
if not chunk:
|
||||
return None
|
||||
buf += chunk
|
||||
messages_pos = _find_top_level_json_key(buf, 'messages')
|
||||
if messages_pos is None:
|
||||
continue
|
||||
prefix = buf[:messages_pos].rstrip()
|
||||
if prefix.endswith(','):
|
||||
prefix = prefix[:-1].rstrip()
|
||||
return f'{prefix}\n}}'
|
||||
return None
|
||||
|
||||
|
||||
def _lookup_index_message_count(session_id):
|
||||
"""Return the indexed message count without loading the full session file."""
|
||||
try:
|
||||
entries = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8'))
|
||||
except Exception:
|
||||
return None
|
||||
if not isinstance(entries, list):
|
||||
return None
|
||||
for entry in entries:
|
||||
if entry.get('session_id') != session_id:
|
||||
continue
|
||||
count = entry.get('message_count')
|
||||
if isinstance(count, int) and count >= 0:
|
||||
return count
|
||||
try:
|
||||
count = int(count)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
return count if count >= 0 else None
|
||||
return None
|
||||
|
||||
|
||||
class Session:
|
||||
def __init__(self, session_id: str=None, title: str='Untitled',
|
||||
@@ -231,6 +340,7 @@ class Session:
|
||||
self.pending_started_at = pending_started_at
|
||||
self.compression_anchor_visible_idx = compression_anchor_visible_idx
|
||||
self.compression_anchor_message_key = compression_anchor_message_key
|
||||
self._metadata_message_count = None
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
@@ -255,7 +365,8 @@ class Session:
|
||||
meta['tool_calls'] = self.tool_calls
|
||||
# Fields not in METADATA_FIELDS (e.g. last_usage, message_count) go at the end
|
||||
extra = {k: v for k, v in self.__dict__.items()
|
||||
if k not in METADATA_FIELDS and k not in ('messages', 'tool_calls')}
|
||||
if k not in METADATA_FIELDS and k not in ('messages', 'tool_calls')
|
||||
and not k.startswith('_')}
|
||||
payload = json.dumps({**meta, **extra}, ensure_ascii=False, indent=2)
|
||||
tmp = self.path.with_suffix(f'.tmp.{os.getpid()}.{threading.current_thread().ident}')
|
||||
try:
|
||||
@@ -288,10 +399,9 @@ class Session:
|
||||
"""Load only the compact metadata fields, skipping the messages array.
|
||||
|
||||
Session JSON files have metadata fields (session_id, title, model, etc.)
|
||||
at the top level, before the large messages array. We read only the
|
||||
first ~1KB — enough to capture all compact() fields — then parse just
|
||||
that prefix. Falls back to load() if the prefix doesn't contain enough
|
||||
fields or if the file is unexpectedly small.
|
||||
at the top level, before the large messages array. Read only up to the
|
||||
top-level "messages" field and synthesize a small metadata-only object.
|
||||
Falls back to load() for legacy or unexpected file layouts.
|
||||
"""
|
||||
if not sid or not all(c in '0123456789abcdefghijklmnopqrstuvwxyz_' for c in sid):
|
||||
return None
|
||||
@@ -299,26 +409,18 @@ class Session:
|
||||
if not p.exists():
|
||||
return None
|
||||
try:
|
||||
# Read just the first 1 KB — metadata comes before messages array
|
||||
with open(p, 'r', encoding='utf-8') as f:
|
||||
prefix = f.read(1024)
|
||||
prefix = _read_metadata_json_prefix(p)
|
||||
if not prefix:
|
||||
return cls.load(sid)
|
||||
parsed = json.loads(prefix)
|
||||
# Verify we got the essential fields.
|
||||
# With metadata-first save() ordering, messages appears at byte ~567.
|
||||
# For sessions <= ~512 bytes total the entire messages array fits in the
|
||||
# first 1 KB and we get a valid list. For larger sessions json.loads
|
||||
# fails on the truncated buffer (unterminated string), so we fall back
|
||||
# to full load. The one exception is a truncation inside a string value
|
||||
# that happens to produce valid JSON with a truncated string — guard
|
||||
# against that by requiring messages to be a list.
|
||||
needed = {'session_id', 'title', 'created_at', 'updated_at'}
|
||||
if not needed.issubset(parsed.keys()):
|
||||
return cls.load(sid)
|
||||
if not isinstance(parsed.get('messages'), list):
|
||||
return cls.load(sid)
|
||||
return cls(**parsed)
|
||||
parsed['messages'] = []
|
||||
parsed['tool_calls'] = []
|
||||
session = cls(**parsed)
|
||||
session._metadata_message_count = _lookup_index_message_count(sid)
|
||||
return session
|
||||
except Exception:
|
||||
# Corrupt prefix or decode error — fall back to full load
|
||||
return cls.load(sid)
|
||||
@@ -330,9 +432,14 @@ class Session:
|
||||
'title': self.title,
|
||||
'workspace': self.workspace,
|
||||
'model': self.model,
|
||||
'message_count': len(self.messages),
|
||||
'message_count': (
|
||||
self._metadata_message_count
|
||||
if self._metadata_message_count is not None
|
||||
else len(self.messages)
|
||||
),
|
||||
'created_at': self.created_at,
|
||||
'updated_at': self.updated_at,
|
||||
'last_message_at': _last_message_timestamp(self.messages) or self.updated_at,
|
||||
'pinned': self.pinned,
|
||||
'archived': self.archived,
|
||||
'project_id': self.project_id,
|
||||
@@ -352,9 +459,10 @@ class Session:
|
||||
def get_session(sid, metadata_only=False):
|
||||
"""Load a session, optionally with metadata only (skipping the messages array).
|
||||
|
||||
When metadata_only=True the session is still cached so the full load on the
|
||||
next access is fast. Use this when you only need compact() metadata and not
|
||||
the actual message history (e.g., for fast sidebar switching).
|
||||
Metadata-only loads intentionally do not populate the full-session cache.
|
||||
Otherwise a later full load could return a compact object with an empty
|
||||
messages list. Use this when you only need compact() metadata and not the
|
||||
actual message history (e.g., for fast sidebar switching).
|
||||
"""
|
||||
with LOCK:
|
||||
if sid in SESSIONS:
|
||||
@@ -362,6 +470,8 @@ def get_session(sid, metadata_only=False):
|
||||
return SESSIONS[sid]
|
||||
if metadata_only:
|
||||
s = Session.load_metadata_only(sid)
|
||||
if s:
|
||||
return s
|
||||
else:
|
||||
s = Session.load(sid)
|
||||
if s:
|
||||
@@ -413,6 +523,18 @@ def all_sessions():
|
||||
s for s in index
|
||||
if _index_entry_exists(s.get('session_id'))
|
||||
]
|
||||
backfilled = []
|
||||
for i, s in enumerate(index):
|
||||
if 'last_message_at' not in s:
|
||||
full = Session.load(s.get('session_id'))
|
||||
if full:
|
||||
index[i] = full.compact()
|
||||
backfilled.append(full)
|
||||
if backfilled:
|
||||
try:
|
||||
_write_session_index(updates=backfilled)
|
||||
except Exception:
|
||||
logger.debug("Failed to persist last_message_at backfill")
|
||||
for s in index:
|
||||
s['is_streaming'] = _is_streaming_session(
|
||||
s.get('active_stream_id'),
|
||||
@@ -426,7 +548,7 @@ def all_sessions():
|
||||
include_runtime=True,
|
||||
active_stream_ids=active_stream_ids,
|
||||
)
|
||||
result = sorted(index_map.values(), key=lambda s: (s.get('pinned', False), s['updated_at']), reverse=True)
|
||||
result = sorted(index_map.values(), key=lambda s: (s.get('pinned', False), _session_sort_timestamp(s)), reverse=True)
|
||||
# Hide empty Untitled sessions from the UI (created by tests, page refreshes, etc.)
|
||||
# Exempt sessions younger than 60 s so a brand-new session stays visible (#789)
|
||||
_now = time.time()
|
||||
@@ -454,7 +576,7 @@ def all_sessions():
|
||||
logger.debug("Failed to load session from %s", p)
|
||||
for s in SESSIONS.values():
|
||||
if all(s.session_id != x.session_id for x in out): out.append(s)
|
||||
out.sort(key=lambda s: (getattr(s, 'pinned', False), s.updated_at), reverse=True)
|
||||
out.sort(key=lambda s: (getattr(s, 'pinned', False), _session_sort_timestamp(s)), reverse=True)
|
||||
_now = time.time()
|
||||
result = [s.compact(include_runtime=True, active_stream_ids=active_stream_ids) for s in out if not (
|
||||
s.title == 'Untitled'
|
||||
@@ -528,16 +650,11 @@ def get_cli_sessions() -> list:
|
||||
"""Read CLI sessions from the agent's SQLite store and return them as
|
||||
dicts in a format the WebUI sidebar can render alongside local sessions.
|
||||
|
||||
Returns empty list if the SQLite DB is missing, the sqlite3 module is
|
||||
unavailable, or any error occurs -- the bridge is purely additive and never
|
||||
crashes the WebUI.
|
||||
Returns empty list if the SQLite DB is missing or any error occurs -- the
|
||||
bridge is purely additive and never crashes the WebUI.
|
||||
"""
|
||||
import os
|
||||
cli_sessions = []
|
||||
try:
|
||||
import sqlite3
|
||||
except ImportError:
|
||||
return cli_sessions
|
||||
|
||||
# Use the active WebUI profile's HERMES_HOME to find state.db.
|
||||
# The active profile is determined by what the user has selected in the UI
|
||||
@@ -566,59 +683,30 @@ def get_cli_sessions() -> list:
|
||||
_cli_profile = None # older agent -- fall back to no profile
|
||||
|
||||
try:
|
||||
with sqlite3.connect(str(db_path)) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cur = conn.cursor()
|
||||
# Introspect schema to handle older hermes-agent versions that
|
||||
# may not have a 'source' column. Without this check the query raises
|
||||
# OperationalError which is silently swallowed, causing the empty-list bug.
|
||||
cur.execute("PRAGMA table_info(sessions)")
|
||||
_session_cols = {row[1] for row in cur.fetchall()}
|
||||
if 'source' not in _session_cols:
|
||||
import logging as _logging
|
||||
_logging.getLogger(__name__).warning(
|
||||
"get_cli_sessions(): state.db at %s has no 'source' column "
|
||||
"(older hermes-agent?). CLI sessions unavailable. "
|
||||
"Upgrade hermes-agent to fix this.",
|
||||
db_path,
|
||||
)
|
||||
return cli_sessions
|
||||
for row in read_importable_agent_session_rows(db_path, limit=200, log=logger):
|
||||
sid = row['id']
|
||||
raw_ts = row['last_activity'] or row['started_at']
|
||||
# Prefer the CLI session's own profile from the DB; fall back to
|
||||
# the active CLI profile so sidebar filtering works either way.
|
||||
profile = _cli_profile # CLI DB has no profile column; use active profile
|
||||
|
||||
cur.execute("""
|
||||
SELECT s.id, s.title, s.model, s.message_count,
|
||||
s.started_at, s.source,
|
||||
MAX(m.timestamp) AS last_activity
|
||||
FROM sessions s
|
||||
LEFT JOIN messages m ON m.session_id = s.id
|
||||
WHERE s.source IS NOT NULL AND s.source != 'webui'
|
||||
GROUP BY s.id
|
||||
ORDER BY COALESCE(MAX(m.timestamp), s.started_at) DESC
|
||||
LIMIT 200
|
||||
""")
|
||||
for row in cur.fetchall():
|
||||
sid = row['id']
|
||||
raw_ts = row['last_activity'] or row['started_at']
|
||||
# Prefer the CLI session's own profile from the DB; fall back to
|
||||
# the active CLI profile so sidebar filtering works either way.
|
||||
profile = _cli_profile # CLI DB has no profile column; use active profile
|
||||
|
||||
_source = row['source'] or 'cli'
|
||||
_display_title = row['title'] or f'{_source.title()} Session'
|
||||
cli_sessions.append({
|
||||
'session_id': sid,
|
||||
'title': _display_title,
|
||||
'workspace': str(get_last_workspace()),
|
||||
'model': row['model'] or None,
|
||||
'message_count': row['message_count'] or 0,
|
||||
'created_at': row['started_at'],
|
||||
'updated_at': raw_ts,
|
||||
'pinned': False,
|
||||
'archived': False,
|
||||
'project_id': None,
|
||||
'profile': profile,
|
||||
'source_tag': _source,
|
||||
'is_cli_session': True,
|
||||
})
|
||||
_source = row['source'] or 'cli'
|
||||
_display_title = row['title'] or f'{_source.title()} Session'
|
||||
cli_sessions.append({
|
||||
'session_id': sid,
|
||||
'title': _display_title,
|
||||
'workspace': str(get_last_workspace()),
|
||||
'model': row['model'] or None,
|
||||
'message_count': row['message_count'] or row['actual_message_count'] or 0,
|
||||
'created_at': row['started_at'],
|
||||
'updated_at': raw_ts,
|
||||
'pinned': False,
|
||||
'archived': False,
|
||||
'project_id': None,
|
||||
'profile': profile,
|
||||
'source_tag': _source,
|
||||
'is_cli_session': True,
|
||||
})
|
||||
except Exception as _cli_err:
|
||||
# DB schema changed, locked, or corrupted -- log warning so admins can diagnose.
|
||||
# Still degrade gracefully (don't crash the WebUI).
|
||||
|
||||
+34
-4
@@ -329,6 +329,7 @@ from api.workspace import (
|
||||
safe_resolve_ws,
|
||||
resolve_trusted_workspace,
|
||||
validate_workspace_to_add,
|
||||
_workspace_blocked_roots,
|
||||
)
|
||||
from api.upload import handle_upload, handle_transcribe
|
||||
from api.streaming import _sse, _run_agent_streaming, cancel_stream
|
||||
@@ -680,19 +681,26 @@ def handle_get(handler, parsed) -> bool:
|
||||
import time as _time
|
||||
_t0 = _time.monotonic()
|
||||
_debug_slow = os.environ.get("HERMES_DEBUG_SLOW", "")
|
||||
sid = parse_qs(parsed.query).get("session_id", [""])[0]
|
||||
query = parse_qs(parsed.query)
|
||||
sid = query.get("session_id", [""])[0]
|
||||
if not sid:
|
||||
return j(handler, {"error": "session_id is required"}, status=400)
|
||||
# ?messages=0 skips the message payload for fast session switching.
|
||||
# The frontend uses this when switching conversations in the sidebar
|
||||
# (only needs metadata). The full message array is loaded lazily
|
||||
# via ?messages=1 when the message panel opens.
|
||||
load_messages = parse_qs(parsed.query).get("messages", ["1"])[0] != "0"
|
||||
load_messages = query.get("messages", ["1"])[0] != "0"
|
||||
resolve_model_default = "1" if load_messages else "0"
|
||||
resolve_model = query.get("resolve_model", [resolve_model_default])[0] != "0"
|
||||
try:
|
||||
_t1 = _time.monotonic()
|
||||
s = get_session(sid, metadata_only=(not load_messages))
|
||||
_t2 = _time.monotonic()
|
||||
effective_model = _resolve_effective_session_model_for_display(s)
|
||||
effective_model = (
|
||||
_resolve_effective_session_model_for_display(s)
|
||||
if resolve_model
|
||||
else None
|
||||
)
|
||||
_t3 = _time.monotonic()
|
||||
raw = s.compact() | {
|
||||
"messages": s.messages if load_messages else [],
|
||||
@@ -735,6 +743,8 @@ def handle_get(handler, parsed) -> bool:
|
||||
"message_count": len(msgs),
|
||||
"created_at": (cli_meta or {}).get("created_at", 0),
|
||||
"updated_at": (cli_meta or {}).get("updated_at", 0),
|
||||
"last_message_at": (cli_meta or {}).get("last_message_at")
|
||||
or (cli_meta or {}).get("updated_at", 0),
|
||||
"pinned": False,
|
||||
"archived": False,
|
||||
"project_id": None,
|
||||
@@ -783,7 +793,10 @@ def handle_get(handler, parsed) -> bool:
|
||||
else:
|
||||
deduped_cli = []
|
||||
merged = webui_sessions + deduped_cli
|
||||
merged.sort(key=lambda s: s.get("updated_at", 0) or 0, reverse=True)
|
||||
merged.sort(
|
||||
key=lambda s: s.get("last_message_at") or s.get("updated_at", 0) or 0,
|
||||
reverse=True,
|
||||
)
|
||||
safe_merged = []
|
||||
for s in merged:
|
||||
item = dict(s)
|
||||
@@ -3027,8 +3040,25 @@ def _handle_create_dir(handler, body):
|
||||
def _handle_workspace_add(handler, body):
|
||||
path_str = body.get("path", "").strip()
|
||||
name = body.get("name", "").strip()
|
||||
auto_create = body.get("create", False)
|
||||
if not path_str:
|
||||
return bad(handler, "path is required")
|
||||
# Validate the path is NOT a blocked system root BEFORE any filesystem mutation.
|
||||
# This prevents creating orphan directories on rejected paths (#782 review).
|
||||
candidate = Path(path_str).expanduser().resolve()
|
||||
for blocked in _workspace_blocked_roots():
|
||||
try:
|
||||
candidate.relative_to(blocked)
|
||||
return bad(handler, f"Path points to a system directory: {candidate}")
|
||||
except ValueError:
|
||||
pass
|
||||
# Now safe to create the directory if requested
|
||||
if auto_create:
|
||||
try:
|
||||
candidate.mkdir(parents=True, exist_ok=True)
|
||||
except (OSError, PermissionError) as e:
|
||||
return bad(handler, f"Could not create directory: {_sanitize_error(e)}")
|
||||
# Full validation (exists, is_dir) — should pass now that dir exists
|
||||
try:
|
||||
p = validate_workspace_to_add(path_str)
|
||||
except ValueError as e:
|
||||
|
||||
+232
-75
@@ -25,6 +25,7 @@ from api.config import (
|
||||
resolve_model_provider,
|
||||
)
|
||||
from api.helpers import redact_session_data
|
||||
from api.metering import meter
|
||||
|
||||
# Global lock for os.environ writes. Per-session locks (_agent_lock) prevent
|
||||
# concurrent runs of the SAME session, but two DIFFERENT sessions can still
|
||||
@@ -292,9 +293,71 @@ def _aux_title_timeout(default: float = 15.0) -> float:
|
||||
return default
|
||||
|
||||
def _title_completion_budget(provider: str = '', model: str = '', base_url: str = '') -> int:
|
||||
if _is_minimax_route(provider, model, base_url):
|
||||
return 384
|
||||
return 160
|
||||
# Title generation is a small auxiliary task, but reasoning models may
|
||||
# spend a surprising amount of the completion budget before emitting final
|
||||
# content. Keep the budget high enough for MiniMax/Kimi-style reasoning
|
||||
# responses without making title generation depend on provider-specific
|
||||
# one-off branches.
|
||||
return 512
|
||||
|
||||
|
||||
def _title_retry_completion_budget(provider: str = '', model: str = '', base_url: str = '') -> int:
|
||||
return max(1024, _title_completion_budget(provider, model, base_url) * 2)
|
||||
|
||||
|
||||
def _title_retry_status(status: str) -> bool:
|
||||
return status in {
|
||||
'llm_length',
|
||||
'llm_length_aux',
|
||||
'llm_empty_reasoning',
|
||||
'llm_empty_reasoning_aux',
|
||||
}
|
||||
|
||||
|
||||
def _safe_obj_value(obj, key: str):
|
||||
if obj is None:
|
||||
return None
|
||||
if isinstance(obj, dict):
|
||||
return obj.get(key)
|
||||
value = getattr(obj, key, None)
|
||||
# Missing MagicMock attrs stringify as mock reprs and look truthy. Treat
|
||||
# them as absent so tests model real provider objects accurately.
|
||||
if value.__class__.__module__.startswith('unittest.mock'):
|
||||
return None
|
||||
return value
|
||||
|
||||
|
||||
def _safe_text_value(value) -> str:
|
||||
if value is None:
|
||||
return ''
|
||||
if value.__class__.__module__.startswith('unittest.mock'):
|
||||
return ''
|
||||
return str(value or '').strip()
|
||||
|
||||
|
||||
def _extract_title_response(resp, *, aux: bool = False) -> tuple[str, str]:
|
||||
"""Return (content, empty_status) from an OpenAI-compatible response."""
|
||||
suffix = '_aux' if aux else ''
|
||||
try:
|
||||
choices = _safe_obj_value(resp, 'choices') or []
|
||||
choice = choices[0] if choices else None
|
||||
message = _safe_obj_value(choice, 'message')
|
||||
content = _safe_text_value(_safe_obj_value(message, 'content'))
|
||||
if content:
|
||||
return content, ''
|
||||
finish_reason = _safe_text_value(_safe_obj_value(choice, 'finish_reason')).lower()
|
||||
reasoning = (
|
||||
_safe_text_value(_safe_obj_value(message, 'reasoning'))
|
||||
or _safe_text_value(_safe_obj_value(message, 'reasoning_content'))
|
||||
or _safe_text_value(_safe_obj_value(message, 'thinking'))
|
||||
)
|
||||
if finish_reason == 'length':
|
||||
return '', f'llm_length{suffix}'
|
||||
if reasoning:
|
||||
return '', f'llm_empty_reasoning{suffix}'
|
||||
return '', f'llm_empty{suffix}'
|
||||
except Exception:
|
||||
return '', f'llm_empty{suffix}'
|
||||
|
||||
|
||||
def generate_title_raw_via_aux(
|
||||
@@ -308,41 +371,43 @@ def generate_title_raw_via_aux(
|
||||
if not user_text or not assistant_text:
|
||||
return None, 'missing_exchange'
|
||||
qa, prompts = _title_prompts(user_text, assistant_text)
|
||||
max_tokens = _title_completion_budget(provider, model, base_url)
|
||||
base_max_tokens = _title_completion_budget(provider, model, base_url)
|
||||
reasoning_extra = {"reasoning": {"enabled": False}}
|
||||
if _is_minimax_route(provider, model, base_url):
|
||||
reasoning_extra["reasoning_split"] = True
|
||||
try:
|
||||
_timeout = _aux_title_timeout()
|
||||
from agent.auxiliary_client import call_llm
|
||||
last_status = 'llm_error_aux'
|
||||
for idx, prompt in enumerate(prompts):
|
||||
messages = [
|
||||
{"role": "system", "content": prompt},
|
||||
{"role": "user", "content": qa},
|
||||
]
|
||||
budgets = [base_max_tokens]
|
||||
try:
|
||||
resp = call_llm(
|
||||
task='title_generation',
|
||||
provider=provider or None,
|
||||
model=model or None,
|
||||
base_url=base_url or None,
|
||||
messages=messages,
|
||||
max_tokens=max_tokens,
|
||||
temperature=0.2,
|
||||
timeout=_timeout,
|
||||
extra_body=reasoning_extra,
|
||||
)
|
||||
raw = ''
|
||||
try:
|
||||
raw = resp.choices[0].message.content or ''
|
||||
except Exception:
|
||||
raw = ''
|
||||
raw = str(raw or '').strip()
|
||||
if raw:
|
||||
return raw, ('llm_aux' if idx == 0 else 'llm_aux_retry')
|
||||
for budget_idx, max_tokens in enumerate(budgets):
|
||||
resp = call_llm(
|
||||
task='title_generation',
|
||||
provider=provider or None,
|
||||
model=model or None,
|
||||
base_url=base_url or None,
|
||||
messages=messages,
|
||||
max_tokens=max_tokens,
|
||||
temperature=0.2,
|
||||
timeout=_timeout,
|
||||
extra_body=reasoning_extra,
|
||||
)
|
||||
raw, empty_status = _extract_title_response(resp, aux=True)
|
||||
if raw:
|
||||
return raw, ('llm_aux' if idx == 0 and budget_idx == 0 else 'llm_aux_retry')
|
||||
last_status = empty_status or 'llm_empty_aux'
|
||||
if budget_idx == 0 and _title_retry_status(last_status):
|
||||
budgets.append(_title_retry_completion_budget(provider, model, base_url))
|
||||
except Exception as e:
|
||||
last_status = 'llm_error_aux'
|
||||
logger.debug("Aux title generation attempt %s failed: %s", idx + 1, e)
|
||||
return None, 'llm_error_aux'
|
||||
return None, last_status
|
||||
except Exception as e:
|
||||
logger.debug("Aux title generation failed: %s", e)
|
||||
return None, 'llm_error_aux'
|
||||
@@ -356,7 +421,7 @@ def generate_title_raw_via_agent(agent, user_text: str, assistant_text: str) ->
|
||||
return None, 'missing_agent'
|
||||
|
||||
qa, prompts = _title_prompts(user_text, assistant_text)
|
||||
max_tokens = _title_completion_budget(
|
||||
base_max_tokens = _title_completion_budget(
|
||||
getattr(agent, 'provider', ''),
|
||||
getattr(agent, 'model', ''),
|
||||
getattr(agent, 'base_url', ''),
|
||||
@@ -370,57 +435,70 @@ def generate_title_raw_via_agent(agent, user_text: str, assistant_text: str) ->
|
||||
{"role": "system", "content": prompt},
|
||||
{"role": "user", "content": qa},
|
||||
]
|
||||
budgets = [base_max_tokens]
|
||||
try:
|
||||
raw = ""
|
||||
if getattr(agent, 'api_mode', '') == 'codex_responses':
|
||||
codex_kwargs = agent._build_api_kwargs(api_messages)
|
||||
codex_kwargs.pop('tools', None)
|
||||
if 'max_output_tokens' in codex_kwargs:
|
||||
codex_kwargs['max_output_tokens'] = max_tokens
|
||||
resp = agent._run_codex_stream(codex_kwargs)
|
||||
assistant_message, _ = agent._normalize_codex_response(resp)
|
||||
raw = (assistant_message.content or '') if assistant_message else ''
|
||||
elif getattr(agent, 'api_mode', '') == 'anthropic_messages':
|
||||
from agent.anthropic_adapter import build_anthropic_kwargs, normalize_anthropic_response
|
||||
ant_kwargs = build_anthropic_kwargs(
|
||||
model=agent.model,
|
||||
messages=api_messages,
|
||||
tools=None,
|
||||
max_tokens=max_tokens,
|
||||
reasoning_config=disabled_reasoning,
|
||||
is_oauth=getattr(agent, '_is_anthropic_oauth', False),
|
||||
preserve_dots=agent._anthropic_preserve_dots(),
|
||||
base_url=getattr(agent, '_anthropic_base_url', None),
|
||||
)
|
||||
resp = agent._anthropic_messages_create(ant_kwargs)
|
||||
assistant_message, _ = normalize_anthropic_response(
|
||||
resp, strip_tool_prefix=getattr(agent, '_is_anthropic_oauth', False)
|
||||
)
|
||||
raw = (assistant_message.content or '') if assistant_message else ''
|
||||
else:
|
||||
api_kwargs = agent._build_api_kwargs(api_messages)
|
||||
api_kwargs.pop('tools', None)
|
||||
api_kwargs['temperature'] = 0.1
|
||||
api_kwargs['timeout'] = 15.0
|
||||
if _is_minimax_route(getattr(agent, 'provider', ''), getattr(agent, 'model', ''), getattr(agent, 'base_url', '')):
|
||||
extra_body = dict(api_kwargs.get('extra_body') or {})
|
||||
extra_body['reasoning_split'] = True
|
||||
api_kwargs['extra_body'] = extra_body
|
||||
if 'max_completion_tokens' in api_kwargs:
|
||||
api_kwargs['max_completion_tokens'] = max_tokens
|
||||
last_status = 'llm_empty'
|
||||
for budget_idx, max_tokens in enumerate(budgets):
|
||||
raw = ""
|
||||
empty_status = ''
|
||||
if getattr(agent, 'api_mode', '') == 'codex_responses':
|
||||
codex_kwargs = agent._build_api_kwargs(api_messages)
|
||||
codex_kwargs.pop('tools', None)
|
||||
if 'max_output_tokens' in codex_kwargs:
|
||||
codex_kwargs['max_output_tokens'] = max_tokens
|
||||
resp = agent._run_codex_stream(codex_kwargs)
|
||||
assistant_message, _ = agent._normalize_codex_response(resp)
|
||||
raw = (assistant_message.content or '') if assistant_message else ''
|
||||
if not raw:
|
||||
empty_status = 'llm_empty'
|
||||
elif getattr(agent, 'api_mode', '') == 'anthropic_messages':
|
||||
from agent.anthropic_adapter import build_anthropic_kwargs, normalize_anthropic_response
|
||||
ant_kwargs = build_anthropic_kwargs(
|
||||
model=agent.model,
|
||||
messages=api_messages,
|
||||
tools=None,
|
||||
max_tokens=max_tokens,
|
||||
reasoning_config=disabled_reasoning,
|
||||
is_oauth=getattr(agent, '_is_anthropic_oauth', False),
|
||||
preserve_dots=agent._anthropic_preserve_dots(),
|
||||
base_url=getattr(agent, '_anthropic_base_url', None),
|
||||
)
|
||||
resp = agent._anthropic_messages_create(ant_kwargs)
|
||||
assistant_message, _ = normalize_anthropic_response(
|
||||
resp, strip_tool_prefix=getattr(agent, '_is_anthropic_oauth', False)
|
||||
)
|
||||
raw = (assistant_message.content or '') if assistant_message else ''
|
||||
if not raw:
|
||||
empty_status = 'llm_empty'
|
||||
else:
|
||||
api_kwargs['max_tokens'] = max_tokens
|
||||
resp = agent._ensure_primary_openai_client(reason='title_generation').chat.completions.create(
|
||||
**api_kwargs,
|
||||
)
|
||||
try:
|
||||
raw = resp.choices[0].message.content or ""
|
||||
except Exception:
|
||||
raw = ""
|
||||
raw = str(raw or '').strip()
|
||||
if raw:
|
||||
return raw, ('llm' if idx == 0 else 'llm_retry')
|
||||
api_kwargs = agent._build_api_kwargs(api_messages)
|
||||
api_kwargs.pop('tools', None)
|
||||
api_kwargs['temperature'] = 0.1
|
||||
api_kwargs['timeout'] = 15.0
|
||||
if _is_minimax_route(getattr(agent, 'provider', ''), getattr(agent, 'model', ''), getattr(agent, 'base_url', '')):
|
||||
extra_body = dict(api_kwargs.get('extra_body') or {})
|
||||
extra_body['reasoning_split'] = True
|
||||
api_kwargs['extra_body'] = extra_body
|
||||
if 'max_completion_tokens' in api_kwargs:
|
||||
api_kwargs['max_completion_tokens'] = max_tokens
|
||||
else:
|
||||
api_kwargs['max_tokens'] = max_tokens
|
||||
resp = agent._ensure_primary_openai_client(reason='title_generation').chat.completions.create(
|
||||
**api_kwargs,
|
||||
)
|
||||
raw, empty_status = _extract_title_response(resp)
|
||||
raw = str(raw or '').strip()
|
||||
if raw:
|
||||
return raw, ('llm' if idx == 0 and budget_idx == 0 else 'llm_retry')
|
||||
last_status = empty_status or 'llm_empty'
|
||||
if budget_idx == 0 and _title_retry_status(last_status):
|
||||
budgets.append(_title_retry_completion_budget(
|
||||
getattr(agent, 'provider', ''),
|
||||
getattr(agent, 'model', ''),
|
||||
getattr(agent, 'base_url', ''),
|
||||
))
|
||||
except Exception as e:
|
||||
last_status = 'llm_error'
|
||||
logger.debug(
|
||||
"Agent title generation attempt %s failed: provider=%s model=%s error=%s",
|
||||
idx + 1,
|
||||
@@ -428,7 +506,7 @@ def generate_title_raw_via_agent(agent, user_text: str, assistant_text: str) ->
|
||||
getattr(agent, 'model', None),
|
||||
e,
|
||||
)
|
||||
return None, 'llm_error'
|
||||
return None, last_status
|
||||
except Exception as e:
|
||||
logger.debug("Agent title generation failed: %s", e)
|
||||
return None, 'llm_error'
|
||||
@@ -611,6 +689,11 @@ def _run_background_title_update(session_id: str, user_text: str, assistant_text
|
||||
if next_title:
|
||||
logger.debug("Using local fallback for session title generation")
|
||||
source = 'fallback'
|
||||
fallback_reason = (
|
||||
f'local_summary:{llm_status}'
|
||||
if source == 'fallback' and llm_status
|
||||
else 'local_summary'
|
||||
)
|
||||
wrote_title = False
|
||||
effective_title = current
|
||||
if next_title:
|
||||
@@ -638,7 +721,7 @@ def _run_background_title_update(session_id: str, user_text: str, assistant_text
|
||||
|
||||
if wrote_title:
|
||||
if source == 'fallback':
|
||||
_put_title_status(put_event, session_id, source, 'local_summary', effective_title, raw_preview)
|
||||
_put_title_status(put_event, session_id, source, fallback_reason, effective_title, raw_preview)
|
||||
else:
|
||||
_put_title_status(put_event, session_id, source, llm_status, effective_title, raw_preview)
|
||||
put_event('title', {'session_id': session_id, 'title': effective_title})
|
||||
@@ -919,6 +1002,28 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
CANCEL_FLAGS[stream_id] = cancel_event
|
||||
STREAM_PARTIAL_TEXT[stream_id] = '' # start accumulating partial text (#893)
|
||||
|
||||
# Register this stream with the global streaming meter
|
||||
meter().begin_session(stream_id)
|
||||
|
||||
# Metering ticker — emits a metering event at 1 Hz while sessions are active.
|
||||
# When get_interval() returns >= 10.0 (no active sessions), the ticker exits
|
||||
# so no idle readings are emitted and the SSE consumer sees nothing.
|
||||
_metering_stop = threading.Event()
|
||||
|
||||
def _metering_ticker():
|
||||
while True:
|
||||
interval = meter().get_interval()
|
||||
if interval >= 10.0:
|
||||
break # nothing active — stop the ticker
|
||||
if _metering_stop.wait(interval):
|
||||
break # stream was cancelled or ended — exit
|
||||
stats = meter().get_stats()
|
||||
stats['session_id'] = stream_id
|
||||
put('metering', stats)
|
||||
|
||||
_metering_thread = threading.Thread(target=_metering_ticker, daemon=True)
|
||||
_metering_thread.start()
|
||||
|
||||
def put(event, data):
|
||||
# If cancelled, drop all further events except the cancel event itself
|
||||
if cancel_event.is_set() and event not in ('cancel', 'error'):
|
||||
@@ -1061,6 +1166,19 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
_reasoning_text = '' # accumulates reasoning/thinking trace for persistence
|
||||
_live_tool_calls = [] # tool progress fallback when final messages omit tool IDs
|
||||
|
||||
# Throttle: emit metering events at most every 100 ms so the TPS label
|
||||
# feels live during fast token streams without flooding the SSE channel.
|
||||
_metering_last_emit = [time.monotonic() - 1] # fire immediately on first token
|
||||
|
||||
def _emit_metering():
|
||||
now = time.monotonic()
|
||||
if now - _metering_last_emit[0] < 0.1:
|
||||
return
|
||||
_metering_last_emit[0] = now
|
||||
stats = meter().get_stats()
|
||||
stats['session_id'] = stream_id
|
||||
put('metering', stats)
|
||||
|
||||
def on_token(text):
|
||||
nonlocal _token_sent
|
||||
if text is None:
|
||||
@@ -1070,6 +1188,9 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
if stream_id in STREAM_PARTIAL_TEXT:
|
||||
STREAM_PARTIAL_TEXT[stream_id] += str(text)
|
||||
put('token', {'text': text})
|
||||
# Update global throughput meter
|
||||
meter().record_token(stream_id, len(STREAM_PARTIAL_TEXT[stream_id]))
|
||||
_emit_metering()
|
||||
|
||||
def on_reasoning(text):
|
||||
nonlocal _reasoning_text
|
||||
@@ -1077,6 +1198,9 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
return
|
||||
_reasoning_text += str(text)
|
||||
put('reasoning', {'text': str(text)})
|
||||
# Track reasoning tokens in the meter so TPS reflects all AI output
|
||||
meter().record_reasoning(stream_id, len(_reasoning_text))
|
||||
_emit_metering()
|
||||
|
||||
# Pre-initialise the activity counter here so on_tool (which
|
||||
# closes over it) never captures an unbound name even if this
|
||||
@@ -1084,6 +1208,7 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
_checkpoint_activity = [0]
|
||||
|
||||
def on_tool(*cb_args, **cb_kwargs):
|
||||
nonlocal _reasoning_text
|
||||
event_type = None
|
||||
name = None
|
||||
preview = None
|
||||
@@ -1103,7 +1228,10 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
if event_type in ('reasoning.available', '_thinking'):
|
||||
reason_text = preview if event_type == 'reasoning.available' else name
|
||||
if reason_text:
|
||||
_reasoning_text += str(reason_text)
|
||||
put('reasoning', {'text': str(reason_text)})
|
||||
meter().record_reasoning(stream_id, len(_reasoning_text))
|
||||
_emit_metering()
|
||||
return
|
||||
|
||||
args_snap = {}
|
||||
@@ -1623,6 +1751,10 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
# (reasoning trace already attached + saved above, before s.save())
|
||||
raw_session = s.compact() | {'messages': s.messages, 'tool_calls': tool_calls}
|
||||
put('done', {'session': redact_session_data(raw_session), 'usage': usage})
|
||||
# Emit metering stats for the header TPS label
|
||||
meter_stats = meter().get_stats()
|
||||
meter_stats['session_id'] = session_id
|
||||
put('metering', meter_stats)
|
||||
if _should_bg_title and _u0 and _a0:
|
||||
threading.Thread(
|
||||
target=_run_background_title_update,
|
||||
@@ -1635,6 +1767,8 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
# activeSid = original session_id so they must match for stream_end to close.
|
||||
put('stream_end', {'session_id': session_id})
|
||||
finally:
|
||||
# Stop the live metering ticker
|
||||
_metering_stop.set()
|
||||
# Unregister the gateway approval callback and unblock any threads
|
||||
# still waiting on approval (e.g. stream cancelled mid-approval).
|
||||
if _approval_registered and _unreg_notify is not None:
|
||||
@@ -1660,6 +1794,13 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
except Exception as e:
|
||||
print('[webui] stream error:\n' + traceback.format_exc(), flush=True)
|
||||
err_str = str(e)
|
||||
# Sanitize HTML from provider error responses — some providers return
|
||||
# full HTML pages (e.g. nginx "404 page not found") instead of JSON errors.
|
||||
# Strip HTML tags to avoid rendering raw markup in the chat message.
|
||||
_stripped = re.sub(r'<[^>]+>', ' ', err_str)
|
||||
_stripped = re.sub(r'\s+', ' ', _stripped).strip()
|
||||
if _stripped != err_str:
|
||||
err_str = _stripped
|
||||
_exc_lower = err_str.lower()
|
||||
# Classify before saving so the error message can be persisted to the session.
|
||||
# Check quota exhaustion first — OpenAI billing 429s use insufficient_quota which
|
||||
@@ -1683,6 +1824,16 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
or 'invalid api key' in _exc_lower
|
||||
or 'no cookie auth credentials' in _exc_lower
|
||||
)
|
||||
_exc_is_not_found = (
|
||||
'404' in err_str
|
||||
or 'not found' in _exc_lower
|
||||
or 'does not exist' in _exc_lower
|
||||
or 'model not found' in _exc_lower
|
||||
or 'model_not_found' in _exc_lower
|
||||
or 'invalid model' in _exc_lower
|
||||
or 'does not match any known model' in _exc_lower
|
||||
or 'unknown model' in _exc_lower
|
||||
)
|
||||
if _exc_is_quota:
|
||||
_exc_label, _exc_type, _exc_hint = (
|
||||
'Out of credits', 'quota_exhausted',
|
||||
@@ -1699,6 +1850,12 @@ def _run_agent_streaming(session_id, msg_text, model, workspace, stream_id, atta
|
||||
'The selected model may not be supported by your configured provider. '
|
||||
'Run `hermes model` in your terminal to switch providers, then restart the WebUI.',
|
||||
)
|
||||
elif _exc_is_not_found:
|
||||
_exc_label, _exc_type, _exc_hint = (
|
||||
'Model not found', 'model_not_found',
|
||||
'The selected model was not found by the provider. '
|
||||
'Check the model ID in Settings or run `hermes model` to verify it exists for your provider.',
|
||||
)
|
||||
else:
|
||||
_exc_label, _exc_type, _exc_hint = 'Error', 'error', ''
|
||||
if s is not None:
|
||||
|
||||
Reference in New Issue
Block a user