mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-25 03:00:23 +00:00
Stage 320: PR #1860 — request wedge diagnostics by @franksong2702
This commit is contained in:
+23
-1
@@ -966,12 +966,24 @@ def _enrich_sidebar_lineage_metadata(sessions: list[dict]) -> None:
|
||||
session.update(metadata[sid])
|
||||
|
||||
|
||||
def all_sessions():
|
||||
def _diag_stage(diag, name: str) -> None:
|
||||
if diag is not None:
|
||||
try:
|
||||
diag.stage(name)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def all_sessions(diag=None):
|
||||
_diag_stage(diag, "all_sessions.active_streams")
|
||||
active_stream_ids = _active_stream_ids()
|
||||
# Phase C: try index first for O(1) read; fall back to full scan
|
||||
_diag_stage(diag, "all_sessions.index_exists")
|
||||
if SESSION_INDEX_FILE.exists():
|
||||
try:
|
||||
_diag_stage(diag, "all_sessions.read_index")
|
||||
index = json.loads(SESSION_INDEX_FILE.read_text(encoding='utf-8'))
|
||||
_diag_stage(diag, "all_sessions.prune_index")
|
||||
index = [
|
||||
s for s in index
|
||||
if _index_entry_exists(s.get('session_id'))
|
||||
@@ -979,21 +991,25 @@ def all_sessions():
|
||||
backfilled = []
|
||||
for i, s in enumerate(index):
|
||||
if 'last_message_at' not in s:
|
||||
_diag_stage(diag, "all_sessions.backfill_load")
|
||||
full = Session.load(s.get('session_id'))
|
||||
if full:
|
||||
index[i] = full.compact()
|
||||
backfilled.append(full)
|
||||
if backfilled:
|
||||
try:
|
||||
_diag_stage(diag, "all_sessions.backfill_write")
|
||||
_write_session_index(updates=backfilled)
|
||||
except Exception:
|
||||
logger.debug("Failed to persist last_message_at backfill")
|
||||
_diag_stage(diag, "all_sessions.mark_streaming")
|
||||
for s in index:
|
||||
s['is_streaming'] = _is_streaming_session(
|
||||
s.get('active_stream_id'),
|
||||
active_stream_ids,
|
||||
)
|
||||
# Overlay any in-memory sessions that may be newer than the index
|
||||
_diag_stage(diag, "all_sessions.overlay_lock")
|
||||
index_map = {s['session_id']: s for s in index}
|
||||
with LOCK:
|
||||
for s in SESSIONS.values():
|
||||
@@ -1001,6 +1017,7 @@ def all_sessions():
|
||||
include_runtime=True,
|
||||
active_stream_ids=active_stream_ids,
|
||||
)
|
||||
_diag_stage(diag, "all_sessions.sort_filter")
|
||||
result = sorted(index_map.values(), key=lambda s: (s.get('pinned', False), _session_sort_timestamp(s)), reverse=True)
|
||||
# Hide empty Untitled sessions from the UI entirely — they are ephemeral
|
||||
# scratch pads that only become real once the first message is sent (#1171).
|
||||
@@ -1025,11 +1042,13 @@ def all_sessions():
|
||||
for s in result:
|
||||
if not s.get('profile'):
|
||||
s['profile'] = 'default'
|
||||
_diag_stage(diag, "all_sessions.lineage_metadata")
|
||||
_enrich_sidebar_lineage_metadata(result)
|
||||
return result
|
||||
except Exception:
|
||||
logger.debug("Failed to load session index, falling back to full scan")
|
||||
# Full scan fallback
|
||||
_diag_stage(diag, "all_sessions.full_scan")
|
||||
out = []
|
||||
for p in SESSION_DIR.glob('*.json'):
|
||||
if p.name.startswith('_'): continue
|
||||
@@ -1038,8 +1057,10 @@ def all_sessions():
|
||||
if s: out.append(s)
|
||||
except Exception:
|
||||
logger.debug("Failed to load session from %s", p)
|
||||
_diag_stage(diag, "all_sessions.full_scan_overlay")
|
||||
for s in SESSIONS.values():
|
||||
if all(s.session_id != x.session_id for x in out): out.append(s)
|
||||
_diag_stage(diag, "all_sessions.full_scan_sort_filter")
|
||||
out.sort(key=lambda s: (getattr(s, 'pinned', False), _session_sort_timestamp(s)), reverse=True)
|
||||
# Hide empty Untitled sessions from the UI entirely — kept consistent with the
|
||||
# index-path filter above. No grace window: a 0-message Untitled session is
|
||||
@@ -1054,6 +1075,7 @@ def all_sessions():
|
||||
for s in result:
|
||||
if not s.get('profile'):
|
||||
s['profile'] = 'default'
|
||||
_diag_stage(diag, "all_sessions.lineage_metadata")
|
||||
_enrich_sidebar_lineage_metadata(result)
|
||||
return result
|
||||
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
"""Slow request diagnostics for latency-sensitive browser API paths."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
from typing import Any
|
||||
|
||||
|
||||
DEFAULT_SLOW_REQUEST_SECONDS = 5.0
|
||||
MAX_STACK_FRAMES_PER_THREAD = 40
|
||||
|
||||
|
||||
def _slow_request_seconds() -> float:
|
||||
raw = os.getenv("HERMES_WEBUI_SLOW_REQUEST_SECONDS", "").strip()
|
||||
if not raw:
|
||||
return DEFAULT_SLOW_REQUEST_SECONDS
|
||||
try:
|
||||
value = float(raw)
|
||||
except ValueError:
|
||||
return DEFAULT_SLOW_REQUEST_SECONDS
|
||||
return max(0.0, value)
|
||||
|
||||
|
||||
class RequestDiagnostics:
    """Track request stages and emit a watchdog record if a request wedges.

    An instance records named stages together with how long each one lasted.
    When ``auto_start`` is set and the threshold is positive, a daemon
    ``threading.Timer`` logs a "still running" warning (including per-thread
    stack traces) if the request has not finished by then.  ``finish()``
    cancels the timer and logs a "completed" warning when the request ran for
    at least the threshold.
    """

    def __init__(
        self,
        method: str,
        path: str,
        *,
        logger: logging.Logger | None = None,
        timeout_seconds: float | None = None,
        auto_start: bool = True,
    ) -> None:
        """Start tracking one request.

        Args:
            method: HTTP method; a falsy value is recorded as ``"-"``.
            path: Request path; any query string is stripped.
            logger: Logger that receives the warnings; defaults to this
                module's logger.
            timeout_seconds: Slow-request threshold in seconds.  ``None``
                means use ``_slow_request_seconds()``.  Negative values are
                clamped to 0, which disables both the watchdog and the
                slow-completion warning.
            auto_start: Start the watchdog timer immediately.
        """
        self.request_id = uuid.uuid4().hex[:10]
        self.method = str(method or "-")
        self.path = str(path or "-").split("?", 1)[0]
        self.logger = logger or logging.getLogger(__name__)
        self.timeout_seconds = (
            _slow_request_seconds()
            if timeout_seconds is None
            else max(0.0, float(timeout_seconds))
        )
        self.started_monotonic = time.monotonic()
        self.started_wall = time.time()
        self._lock = threading.Lock()
        self._stages: list[dict[str, Any]] = []
        self._current_stage = "start"
        self._current_stage_started = self.started_monotonic
        self._finished = False
        self._watchdog_logged = False
        self._timer: threading.Timer | None = None
        # An infinite interval would make the Timer thread die with
        # OverflowError; such a threshold can never be reached, so no watchdog
        # is needed for it.
        if auto_start and 0 < self.timeout_seconds < float("inf"):
            self._timer = threading.Timer(self.timeout_seconds, self._on_timeout)
            self._timer.daemon = True
            self._timer.start()

    @classmethod
    def maybe_start(
        cls,
        method: str,
        path: str,
        *,
        logger: logging.Logger | None = None,
    ) -> "RequestDiagnostics | None":
        """Return a started instance for tracked routes, otherwise ``None``.

        Only ``GET /api/sessions`` and ``POST /api/chat/start`` are tracked.
        The query string is ignored when matching.  A missing (``None``)
        method or path is treated as "not tracked" instead of raising.
        """
        clean_method = str(method or "").upper()
        clean_path = str(path or "").split("?", 1)[0]
        if (clean_method, clean_path) not in {
            ("GET", "/api/sessions"),
            ("POST", "/api/chat/start"),
        }:
            return None
        return cls(method, clean_path, logger=logger)

    def stage(self, name: str) -> None:
        """Close the current stage and begin stage *name*.

        The stage that just ended is appended to the stage list with its
        duration in milliseconds.  Blank names become ``"unknown"``.  Calls
        made after ``finish()`` are ignored.
        """
        now = time.monotonic()
        clean = str(name or "unknown").strip() or "unknown"
        with self._lock:
            if self._finished:
                return
            self._stages.append(
                {
                    "name": self._current_stage,
                    "ms": round((now - self._current_stage_started) * 1000, 1),
                }
            )
            self._current_stage = clean
            self._current_stage_started = now

    def finish(self) -> None:
        """Mark the request as finished; idempotent.

        Cancels the watchdog timer and, when the threshold is positive and
        the total elapsed time reached it, logs a "completed" warning with
        the stage record (without thread stacks).
        """
        with self._lock:
            if self._finished:
                return
            self._finished = True
            timer = self._timer
            record = self._build_record_locked(include_stacks=False)
        # Cancel and log outside the lock so neither can block stage()/timeout.
        if timer is not None:
            timer.cancel()
        if self.timeout_seconds > 0 and record["elapsed_ms"] >= self.timeout_seconds * 1000:
            self.logger.warning(
                "Slow WebUI request completed: %s",
                json.dumps(record, sort_keys=True),
            )

    def _on_timeout(self) -> None:
        """Watchdog callback: log the stage record plus all thread stacks.

        Does nothing if the request already finished or the watchdog already
        logged once.
        """
        with self._lock:
            if self._finished or self._watchdog_logged:
                return
            self._watchdog_logged = True
            record = self._build_record_locked(include_stacks=False)
        # Walk the thread stacks only after releasing the lock: the snapshot
        # touches every thread and must not stall the request thread's own
        # stage()/finish() calls, which need the same lock.
        record["thread_stacks"] = _thread_stack_snapshot()
        self.logger.warning(
            "Slow WebUI request still running: %s",
            json.dumps(record, sort_keys=True),
        )

    def _build_record_locked(self, *, include_stacks: bool) -> dict[str, Any]:
        """Build the JSON-serialisable diagnostics record.

        The caller is expected to hold ``self._lock``.  The still-open
        current stage is included as the last entry of ``stages`` with its
        duration so far.  With ``include_stacks`` the record also carries a
        ``thread_stacks`` snapshot.
        """
        now = time.monotonic()
        stages = list(self._stages)
        stages.append(
            {
                "name": self._current_stage,
                "ms": round((now - self._current_stage_started) * 1000, 1),
            }
        )
        record: dict[str, Any] = {
            "request_id": self.request_id,
            "method": self.method,
            "path": self.path,
            "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(self.started_wall)),
            "elapsed_ms": round((now - self.started_monotonic) * 1000, 1),
            "current_stage": self._current_stage,
            "stages": stages,
        }
        if include_stacks:
            record["thread_stacks"] = _thread_stack_snapshot()
        return record
|
||||
|
||||
|
||||
def _thread_stack_snapshot() -> list[dict[str, Any]]:
    """Return one entry per live interpreter frame, sorted by thread name.

    Each entry holds the thread id, the thread's name and daemon flag (empty
    string / ``None`` when the id is not among ``threading.enumerate()``),
    and the formatted stack limited to ``MAX_STACK_FRAMES_PER_THREAD``
    frames.
    """
    current_frames = sys._current_frames()
    by_ident = {t.ident: t for t in threading.enumerate()}

    entries: list[dict[str, Any]] = []
    for thread_id, top_frame in current_frames.items():
        owner = by_ident.get(thread_id)
        formatted = traceback.format_stack(top_frame, limit=MAX_STACK_FRAMES_PER_THREAD)
        if owner:
            thread_name = owner.name
            is_daemon = bool(owner.daemon)
        else:
            thread_name = ""
            is_daemon = None
        entries.append(
            {
                "thread_id": thread_id,
                "thread_name": thread_name,
                "daemon": is_daemon,
                "stack": [text.rstrip() for text in formatted],
            }
        )

    return sorted(entries, key=lambda entry: str(entry.get("thread_name") or ""))
|
||||
+223
-170
@@ -587,6 +587,7 @@ from api.helpers import (
|
||||
_redact_text,
|
||||
)
|
||||
from api.agent_health import build_agent_health_payload
|
||||
from api.request_diagnostics import RequestDiagnostics
|
||||
from api.system_health import build_system_health_payload
|
||||
|
||||
|
||||
@@ -2972,80 +2973,96 @@ def handle_get(handler, parsed) -> bool:
|
||||
return j(handler, {"results": get_results(sid)})
|
||||
|
||||
if parsed.path == "/api/sessions":
|
||||
webui_sessions = all_sessions()
|
||||
settings = load_settings()
|
||||
show_cli_sessions = bool(settings.get("show_cli_sessions"))
|
||||
if show_cli_sessions:
|
||||
cli = get_cli_sessions()
|
||||
cli_by_id = {s["session_id"]: s for s in cli}
|
||||
for s in webui_sessions:
|
||||
meta = cli_by_id.get(s.get("session_id"))
|
||||
if not meta:
|
||||
continue
|
||||
if _is_messaging_session_record(meta):
|
||||
s.update(_merge_cli_sidebar_metadata(s, meta))
|
||||
if s.get("session_id") != meta.get("session_id"):
|
||||
s["session_id"] = meta.get("session_id")
|
||||
else:
|
||||
for key in ("source_tag", "raw_source", "session_source", "source_label"):
|
||||
if not s.get(key) and meta.get(key):
|
||||
s[key] = meta[key]
|
||||
# Apply the same CLI visibility semantics to imported local copies so
|
||||
# low-value imported artifacts do not leak into the sidebar.
|
||||
webui_sessions = [s for s in webui_sessions if is_cli_session_row_visible(s)]
|
||||
webui_ids = {s["session_id"] for s in webui_sessions}
|
||||
from api.models import _hide_from_default_sidebar as _cron_hide
|
||||
deduped_cli = [s for s in cli if s["session_id"] not in webui_ids and is_cli_session_row_visible(s) and not _cron_hide(s)]
|
||||
else:
|
||||
webui_sessions = [s for s in webui_sessions if not _is_cli_session_for_settings(s)]
|
||||
deduped_cli = []
|
||||
merged = webui_sessions + deduped_cli
|
||||
merged.sort(
|
||||
key=lambda s: s.get("last_message_at") or s.get("updated_at", 0) or 0,
|
||||
reverse=True,
|
||||
)
|
||||
# ── Profile scoping (#1611) ────────────────────────────────────────
|
||||
# Default: filter to the active profile. ?all_profiles=1 opts into
|
||||
# the aggregate view used by the "All profiles" sidebar toggle.
|
||||
# The other_profile_count is always returned so the UI can render
|
||||
# the "Show N from other profiles" affordance without sending the
|
||||
# cross-profile rows by default.
|
||||
#
|
||||
# IMPORTANT: scope BEFORE _keep_latest_messaging_session_per_source.
|
||||
# _messaging_source_key is profile-blind (#1614 follow-up): if the
|
||||
# same Slack/Telegram identity has sessions in profiles A and B, a
|
||||
# profile-blind dedupe would discard the older one even when scoped
|
||||
# to its own profile, leaving that profile with zero rows for that
|
||||
# source. Filter first so the dedupe operates only within the active
|
||||
# profile's rows.
|
||||
from api.profiles import get_active_profile_name
|
||||
active_profile = get_active_profile_name()
|
||||
all_profiles = _all_profiles_query_flag(parsed)
|
||||
if all_profiles:
|
||||
scoped = merged
|
||||
other_profile_count = 0
|
||||
else:
|
||||
scoped = [s for s in merged
|
||||
if _profiles_match(s.get("profile"), active_profile)]
|
||||
other_profile_count = len(merged) - len(scoped)
|
||||
scoped = _keep_latest_messaging_session_per_source(scoped)
|
||||
if show_cli_sessions:
|
||||
scoped = _cap_recent_cli_sessions(scoped, cli_cap=CLI_VISIBLE_SESSION_CAP)
|
||||
safe_merged = []
|
||||
for s in scoped:
|
||||
item = dict(s)
|
||||
if isinstance(item.get("title"), str):
|
||||
item["title"] = _redact_text(item["title"])
|
||||
safe_merged.append(item)
|
||||
return j(handler, {
|
||||
"sessions": safe_merged,
|
||||
"cli_count": len(deduped_cli),
|
||||
"all_profiles": all_profiles,
|
||||
"active_profile": active_profile,
|
||||
"other_profile_count": other_profile_count,
|
||||
"server_time": time.time(),
|
||||
"server_tz": time.strftime("%z"),
|
||||
})
|
||||
diag = RequestDiagnostics.maybe_start("GET", parsed.path, logger=logger)
|
||||
try:
|
||||
diag.stage("all_sessions")
|
||||
webui_sessions = all_sessions(diag=diag)
|
||||
diag.stage("load_settings")
|
||||
settings = load_settings()
|
||||
show_cli_sessions = bool(settings.get("show_cli_sessions"))
|
||||
if show_cli_sessions:
|
||||
diag.stage("get_cli_sessions")
|
||||
cli = get_cli_sessions()
|
||||
diag.stage("merge_cli_sessions")
|
||||
cli_by_id = {s["session_id"]: s for s in cli}
|
||||
for s in webui_sessions:
|
||||
meta = cli_by_id.get(s.get("session_id"))
|
||||
if not meta:
|
||||
continue
|
||||
if _is_messaging_session_record(meta):
|
||||
s.update(_merge_cli_sidebar_metadata(s, meta))
|
||||
if s.get("session_id") != meta.get("session_id"):
|
||||
s["session_id"] = meta.get("session_id")
|
||||
else:
|
||||
for key in ("source_tag", "raw_source", "session_source", "source_label"):
|
||||
if not s.get(key) and meta.get(key):
|
||||
s[key] = meta[key]
|
||||
# Apply the same CLI visibility semantics to imported local copies so
|
||||
# low-value imported artifacts do not leak into the sidebar.
|
||||
webui_sessions = [s for s in webui_sessions if is_cli_session_row_visible(s)]
|
||||
webui_ids = {s["session_id"] for s in webui_sessions}
|
||||
from api.models import _hide_from_default_sidebar as _cron_hide
|
||||
deduped_cli = [s for s in cli if s["session_id"] not in webui_ids and is_cli_session_row_visible(s) and not _cron_hide(s)]
|
||||
else:
|
||||
diag.stage("filter_webui_sessions")
|
||||
webui_sessions = [s for s in webui_sessions if not _is_cli_session_for_settings(s)]
|
||||
deduped_cli = []
|
||||
diag.stage("sort_sessions")
|
||||
merged = webui_sessions + deduped_cli
|
||||
merged.sort(
|
||||
key=lambda s: s.get("last_message_at") or s.get("updated_at", 0) or 0,
|
||||
reverse=True,
|
||||
)
|
||||
# ── Profile scoping (#1611) ────────────────────────────────────────
|
||||
# Default: filter to the active profile. ?all_profiles=1 opts into
|
||||
# the aggregate view used by the "All profiles" sidebar toggle.
|
||||
# The other_profile_count is always returned so the UI can render
|
||||
# the "Show N from other profiles" affordance without sending the
|
||||
# cross-profile rows by default.
|
||||
#
|
||||
# IMPORTANT: scope BEFORE _keep_latest_messaging_session_per_source.
|
||||
# _messaging_source_key is profile-blind (#1614 follow-up): if the
|
||||
# same Slack/Telegram identity has sessions in profiles A and B, a
|
||||
# profile-blind dedupe would discard the older one even when scoped
|
||||
# to its own profile, leaving that profile with zero rows for that
|
||||
# source. Filter first so the dedupe operates only within the active
|
||||
# profile's rows.
|
||||
diag.stage("active_profile")
|
||||
from api.profiles import get_active_profile_name
|
||||
active_profile = get_active_profile_name()
|
||||
all_profiles = _all_profiles_query_flag(parsed)
|
||||
diag.stage("profile_filter")
|
||||
if all_profiles:
|
||||
scoped = merged
|
||||
other_profile_count = 0
|
||||
else:
|
||||
scoped = [s for s in merged
|
||||
if _profiles_match(s.get("profile"), active_profile)]
|
||||
other_profile_count = len(merged) - len(scoped)
|
||||
diag.stage("messaging_dedupe")
|
||||
scoped = _keep_latest_messaging_session_per_source(scoped)
|
||||
if show_cli_sessions:
|
||||
diag.stage("cli_cap")
|
||||
scoped = _cap_recent_cli_sessions(scoped, cli_cap=CLI_VISIBLE_SESSION_CAP)
|
||||
diag.stage("redact_sessions")
|
||||
safe_merged = []
|
||||
for s in scoped:
|
||||
item = dict(s)
|
||||
if isinstance(item.get("title"), str):
|
||||
item["title"] = _redact_text(item["title"])
|
||||
safe_merged.append(item)
|
||||
diag.stage("response_write")
|
||||
return j(handler, {
|
||||
"sessions": safe_merged,
|
||||
"cli_count": len(deduped_cli),
|
||||
"all_profiles": all_profiles,
|
||||
"active_profile": active_profile,
|
||||
"other_profile_count": other_profile_count,
|
||||
"server_time": time.time(),
|
||||
"server_tz": time.strftime("%z"),
|
||||
})
|
||||
finally:
|
||||
diag.finish()
|
||||
|
||||
if parsed.path == "/api/projects":
|
||||
# ── Profile scoping (#1614) ────────────────────────────────────────
|
||||
@@ -3453,9 +3470,16 @@ def handle_get(handler, parsed) -> bool:
|
||||
|
||||
def handle_post(handler, parsed) -> bool:
|
||||
"""Handle all POST routes. Returns True if handled, False for 404."""
|
||||
diag = RequestDiagnostics.maybe_start("POST", parsed.path, logger=logger)
|
||||
# CSRF: reject cross-origin browser requests
|
||||
if diag:
|
||||
diag.stage("csrf")
|
||||
if not _check_csrf(handler):
|
||||
return j(handler, {"error": "Cross-origin request rejected"}, status=403)
|
||||
try:
|
||||
return j(handler, {"error": "Cross-origin request rejected"}, status=403)
|
||||
finally:
|
||||
if diag:
|
||||
diag.finish()
|
||||
|
||||
if parsed.path == "/api/upload":
|
||||
return handle_upload(handler)
|
||||
@@ -3465,7 +3489,14 @@ def handle_post(handler, parsed) -> bool:
|
||||
if parsed.path == "/api/transcribe":
|
||||
return handle_transcribe(handler)
|
||||
|
||||
body = read_body(handler)
|
||||
if diag:
|
||||
diag.stage("read_body")
|
||||
try:
|
||||
body = read_body(handler)
|
||||
except Exception:
|
||||
if diag:
|
||||
diag.finish()
|
||||
raise
|
||||
|
||||
if parsed.path.startswith("/api/kanban/"):
|
||||
from api.kanban_bridge import handle_kanban_post
|
||||
@@ -4002,7 +4033,7 @@ def handle_post(handler, parsed) -> bool:
|
||||
return _handle_background(handler, body)
|
||||
|
||||
if parsed.path == "/api/chat/start":
|
||||
return _handle_chat_start(handler, body)
|
||||
return _handle_chat_start(handler, body, diag=diag)
|
||||
|
||||
if parsed.path == "/api/chat":
|
||||
return _handle_chat_sync(handler, body)
|
||||
@@ -6170,104 +6201,126 @@ def _prepare_chat_start_session_for_stream(
|
||||
s.save()
|
||||
|
||||
|
||||
def _handle_chat_start(handler, body):
|
||||
def _handle_chat_start(handler, body, diag=None):
|
||||
try:
|
||||
require(body, "session_id")
|
||||
except ValueError as e:
|
||||
return bad(handler, str(e))
|
||||
try:
|
||||
s = get_session(body["session_id"])
|
||||
except KeyError:
|
||||
return bad(handler, "Session not found", 404)
|
||||
requested_profile = str(body.get("profile") or "").strip()
|
||||
if requested_profile:
|
||||
diag.stage("validate_session_id") if diag else None
|
||||
try:
|
||||
from api.profiles import _PROFILE_ID_RE
|
||||
require(body, "session_id")
|
||||
except ValueError as e:
|
||||
return bad(handler, str(e))
|
||||
diag.stage("get_session") if diag else None
|
||||
try:
|
||||
s = get_session(body["session_id"])
|
||||
except KeyError:
|
||||
return bad(handler, "Session not found", 404)
|
||||
diag.stage("validate_profile") if diag else None
|
||||
requested_profile = str(body.get("profile") or "").strip()
|
||||
if requested_profile:
|
||||
try:
|
||||
from api.profiles import _PROFILE_ID_RE
|
||||
|
||||
if requested_profile != "default" and not _PROFILE_ID_RE.fullmatch(requested_profile):
|
||||
return bad(handler, "invalid profile", 400)
|
||||
except ImportError:
|
||||
requested_profile = ""
|
||||
if requested_profile and not _profiles_match(getattr(s, "profile", None), requested_profile):
|
||||
has_persisted_turns = bool(
|
||||
getattr(s, "messages", None)
|
||||
or getattr(s, "context_messages", None)
|
||||
or getattr(s, "pending_user_message", None)
|
||||
)
|
||||
if not has_persisted_turns:
|
||||
# Empty sessions are placeholders. If the user switches profiles
|
||||
# before sending the first turn, run the placeholder under the
|
||||
# currently-selected profile instead of the stale one stamped at
|
||||
# creation time.
|
||||
s.profile = requested_profile
|
||||
msg = str(body.get("message", "")).strip()
|
||||
if not msg:
|
||||
return bad(handler, "message is required")
|
||||
attachments = _normalize_chat_attachments(body.get("attachments") or [])[:20]
|
||||
try:
|
||||
workspace = str(resolve_trusted_workspace(body.get("workspace") or s.workspace))
|
||||
except ValueError as e:
|
||||
return bad(handler, str(e))
|
||||
requested_model = body.get("model") or s.model
|
||||
requested_provider = (
|
||||
body.get("model_provider")
|
||||
if "model_provider" in body
|
||||
else getattr(s, "model_provider", None)
|
||||
)
|
||||
model, model_provider, normalized_model = _resolve_compatible_session_model_state(
|
||||
requested_model,
|
||||
requested_provider,
|
||||
)
|
||||
# Prevent duplicate runs in the same session while a stream is still active.
|
||||
# This commonly happens after page refresh/reconnect races and can produce
|
||||
# duplicated clarify cards for what appears to be a single user request.
|
||||
current_stream_id = getattr(s, "active_stream_id", None)
|
||||
if current_stream_id:
|
||||
with STREAMS_LOCK:
|
||||
current_active = current_stream_id in STREAMS
|
||||
if current_active:
|
||||
return j(
|
||||
handler,
|
||||
{
|
||||
"error": "session already has an active stream",
|
||||
"active_stream_id": current_stream_id,
|
||||
},
|
||||
status=409,
|
||||
if requested_profile != "default" and not _PROFILE_ID_RE.fullmatch(requested_profile):
|
||||
return bad(handler, "invalid profile", 400)
|
||||
except ImportError:
|
||||
requested_profile = ""
|
||||
if requested_profile and not _profiles_match(getattr(s, "profile", None), requested_profile):
|
||||
has_persisted_turns = bool(
|
||||
getattr(s, "messages", None)
|
||||
or getattr(s, "context_messages", None)
|
||||
or getattr(s, "pending_user_message", None)
|
||||
)
|
||||
# Stale stream id from a previous run; clear and continue.
|
||||
_clear_stale_stream_state(s)
|
||||
stream_id = uuid.uuid4().hex
|
||||
with _get_session_agent_lock(s.session_id):
|
||||
_prepare_chat_start_session_for_stream(
|
||||
s,
|
||||
msg=msg,
|
||||
attachments=attachments,
|
||||
workspace=workspace,
|
||||
model=model,
|
||||
model_provider=model_provider,
|
||||
stream_id=stream_id,
|
||||
if not has_persisted_turns:
|
||||
# Empty sessions are placeholders. If the user switches profiles
|
||||
# before sending the first turn, run the placeholder under the
|
||||
# currently-selected profile instead of the stale one stamped at
|
||||
# creation time.
|
||||
s.profile = requested_profile
|
||||
diag.stage("normalize_message") if diag else None
|
||||
msg = str(body.get("message", "")).strip()
|
||||
if not msg:
|
||||
return bad(handler, "message is required")
|
||||
diag.stage("normalize_attachments") if diag else None
|
||||
attachments = _normalize_chat_attachments(body.get("attachments") or [])[:20]
|
||||
diag.stage("resolve_workspace") if diag else None
|
||||
try:
|
||||
workspace = str(resolve_trusted_workspace(body.get("workspace") or s.workspace))
|
||||
except ValueError as e:
|
||||
return bad(handler, str(e))
|
||||
requested_model = body.get("model") or s.model
|
||||
requested_provider = (
|
||||
body.get("model_provider")
|
||||
if "model_provider" in body
|
||||
else getattr(s, "model_provider", None)
|
||||
)
|
||||
set_last_workspace(workspace)
|
||||
stream = create_stream_channel()
|
||||
with STREAMS_LOCK:
|
||||
STREAMS[stream_id] = stream
|
||||
thr = threading.Thread(
|
||||
target=_run_agent_streaming,
|
||||
args=(s.session_id, msg, model, workspace, stream_id, attachments),
|
||||
kwargs={"model_provider": model_provider},
|
||||
daemon=True,
|
||||
)
|
||||
thr.start()
|
||||
response = {
|
||||
"stream_id": stream_id,
|
||||
"session_id": s.session_id,
|
||||
"pending_started_at": s.pending_started_at,
|
||||
}
|
||||
if normalized_model:
|
||||
response["effective_model"] = model
|
||||
if model_provider:
|
||||
response["effective_model_provider"] = model_provider
|
||||
return j(handler, response)
|
||||
diag.stage("resolve_model_provider") if diag else None
|
||||
model, model_provider, normalized_model = _resolve_compatible_session_model_state(
|
||||
requested_model,
|
||||
requested_provider,
|
||||
)
|
||||
# Prevent duplicate runs in the same session while a stream is still active.
|
||||
# This commonly happens after page refresh/reconnect races and can produce
|
||||
# duplicated clarify cards for what appears to be a single user request.
|
||||
diag.stage("active_stream_check") if diag else None
|
||||
current_stream_id = getattr(s, "active_stream_id", None)
|
||||
if current_stream_id:
|
||||
diag.stage("active_stream_lock_wait") if diag else None
|
||||
with STREAMS_LOCK:
|
||||
current_active = current_stream_id in STREAMS
|
||||
if current_active:
|
||||
diag.stage("response_write") if diag else None
|
||||
return j(
|
||||
handler,
|
||||
{
|
||||
"error": "session already has an active stream",
|
||||
"active_stream_id": current_stream_id,
|
||||
},
|
||||
status=409,
|
||||
)
|
||||
# Stale stream id from a previous run; clear and continue.
|
||||
diag.stage("stale_stream_cleanup") if diag else None
|
||||
_clear_stale_stream_state(s)
|
||||
stream_id = uuid.uuid4().hex
|
||||
session_lock = _get_session_agent_lock(s.session_id)
|
||||
diag.stage("session_lock_wait") if diag else None
|
||||
with session_lock:
|
||||
diag.stage("save_pending_state") if diag else None
|
||||
_prepare_chat_start_session_for_stream(
|
||||
s,
|
||||
msg=msg,
|
||||
attachments=attachments,
|
||||
workspace=workspace,
|
||||
model=model,
|
||||
model_provider=model_provider,
|
||||
stream_id=stream_id,
|
||||
)
|
||||
diag.stage("set_last_workspace") if diag else None
|
||||
set_last_workspace(workspace)
|
||||
diag.stage("stream_registration") if diag else None
|
||||
stream = create_stream_channel()
|
||||
with STREAMS_LOCK:
|
||||
STREAMS[stream_id] = stream
|
||||
diag.stage("worker_thread_start") if diag else None
|
||||
thr = threading.Thread(
|
||||
target=_run_agent_streaming,
|
||||
args=(s.session_id, msg, model, workspace, stream_id, attachments),
|
||||
kwargs={"model_provider": model_provider},
|
||||
daemon=True,
|
||||
)
|
||||
thr.start()
|
||||
response = {
|
||||
"stream_id": stream_id,
|
||||
"session_id": s.session_id,
|
||||
"pending_started_at": s.pending_started_at,
|
||||
}
|
||||
if normalized_model:
|
||||
response["effective_model"] = model
|
||||
if model_provider:
|
||||
response["effective_model_provider"] = model_provider
|
||||
diag.stage("response_write") if diag else None
|
||||
return j(handler, response)
|
||||
finally:
|
||||
if diag:
|
||||
diag.finish()
|
||||
|
||||
|
||||
def _normalize_chat_attachments(raw_attachments):
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import api.models as models
|
||||
from api.models import Session
|
||||
from api.request_diagnostics import RequestDiagnostics
|
||||
|
||||
|
||||
class _StageRecorder:
|
||||
def __init__(self):
|
||||
self.stages = []
|
||||
|
||||
def stage(self, name):
|
||||
self.stages.append(name)
|
||||
|
||||
|
||||
def test_request_diagnostics_timeout_record_includes_stage_and_thread_stacks(caplog):
    """The watchdog warning carries the request info, stages, and stacks."""
    test_logger = logging.getLogger("test.issue1855.timeout")
    # auto_start=False: no real timer; the watchdog is fired by hand below.
    diag = RequestDiagnostics(
        "GET",
        "/api/sessions?all_profiles=1",
        logger=test_logger,
        timeout_seconds=5,
        auto_start=False,
    )
    diag.stage("all_sessions.read_index")

    with caplog.at_level(logging.WARNING, logger=test_logger.name):
        diag._on_timeout()

    assert len(caplog.records) == 1
    payload = caplog.records[0].args[0]
    record = json.loads(payload)
    assert record["method"] == "GET"
    assert record["path"] == "/api/sessions"
    assert record["current_stage"] == "all_sessions.read_index"
    assert record["elapsed_ms"] >= 0
    stage_names = [entry["name"] for entry in record["stages"]]
    assert "all_sessions.read_index" in stage_names
    assert record["thread_stacks"]
|
||||
|
||||
|
||||
def test_request_diagnostics_maybe_start_is_limited_to_issue1855_paths():
    """Only the two routes from issue 1855 get a diagnostics object."""
    tracked = (
        ("GET", "/api/sessions"),
        ("POST", "/api/chat/start"),
    )
    untracked = (
        ("GET", "/health"),
        ("POST", "/api/session/new"),
    )
    for method, path in tracked:
        assert RequestDiagnostics.maybe_start(method, path) is not None
    for method, path in untracked:
        assert RequestDiagnostics.maybe_start(method, path) is None
|
||||
|
||||
|
||||
def test_all_sessions_reports_internal_index_stages(tmp_path, monkeypatch):
    """``all_sessions`` reports its index-path stages to the given recorder."""
    # Point the models module at a throwaway session directory and index.
    session_dir = tmp_path / "sessions"
    session_dir.mkdir()
    index_file = session_dir / "_index.json"
    monkeypatch.setattr(models, "SESSION_DIR", session_dir)
    monkeypatch.setattr(models, "SESSION_INDEX_FILE", index_file)
    monkeypatch.setattr(models, "_enrich_sidebar_lineage_metadata", lambda sessions: None)
    models.SESSIONS.clear()

    # One persisted session plus a matching index row.
    s = Session(
        session_id="issue1855_indexed",
        title="Indexed",
        messages=[{"role": "user", "content": "hi", "timestamp": 100}],
    )
    serialized_session = json.dumps(s.__dict__, ensure_ascii=False)
    s.path.write_text(serialized_session, encoding="utf-8")
    index_row = {
        "session_id": s.session_id,
        "title": s.title,
        "updated_at": s.updated_at,
        "workspace": s.workspace,
        "model": s.model,
        "message_count": 1,
        "created_at": s.created_at,
        "pinned": False,
        "archived": False,
        "last_message_at": 100,
    }
    index_file.write_text(
        json.dumps([index_row], ensure_ascii=False),
        encoding="utf-8",
    )

    diag = _StageRecorder()
    rows = models.all_sessions(diag=diag)

    assert [row["session_id"] for row in rows] == [s.session_id]
    assert "all_sessions.read_index" in diag.stages
    assert "all_sessions.overlay_lock" in diag.stages
    assert "all_sessions.lineage_metadata" in diag.stages
|
||||
|
||||
|
||||
def test_issue1855_target_routes_are_wired_to_diagnostics():
    """The routes source text contains the diagnostics wiring and stage names."""
    src = Path("api/routes.py").read_text(encoding="utf-8")

    assert 'RequestDiagnostics.maybe_start("GET", parsed.path' in src
    assert "all_sessions(diag=diag)" in src
    assert 'RequestDiagnostics.maybe_start("POST", parsed.path' in src
    assert "_handle_chat_start(handler, body, diag=diag)" in src

    expected_stages = (
        "read_body",
        "resolve_model_provider",
        "session_lock_wait",
        "save_pending_state",
        "stream_registration",
        "worker_thread_start",
        "response_write",
    )
    for stage in expected_stages:
        assert stage in src
|
||||
Reference in New Issue
Block a user