Files
hermes-webui/api/session_lifecycle.py
T
nesquena-hermes 3ca1188f64 Release v0.51.256 — Release HX (stage-r4) (#3596)
## Release v0.51.256 — Release HX (stage-r4)

Performance — bound WebUI memory growth & idle CPU on large installs.

### Fixed
| Issue | Author | Fix |
|-------|--------|-----|
| #3506 | @nesquena-hermes (reported w/ profiling by @djenttleman) | On a large install (~615 sessions / 40k messages / 454 MB state.db) the WebUI process climbed ~100 MB → ~1.5 GB RSS over days and held high idle CPU. Three root causes fixed: (1) `session_lifecycle._sessions` grew unbounded → new `discard_session()` drops the entry at agent-eviction boundaries, only when no in-flight commit / no uncommitted memory work (retry invariant preserved); (2) cache caps now operator-tunable (`HERMES_WEBUI_AGENT_CACHE_MAX` default 50→25, `HERMES_WEBUI_SESSIONS_MAX`); (3) GatewayWatcher computes a cheap fingerprint before the expensive per-session `MAX(messages.timestamp)` projection and only re-projects on change. |

### Rebase + review notes
- Rebased onto current master; the code diff was verified **byte-identical to the nesquena-APPROVED head** at rebase time (only CHANGELOG re-resolved).
- The Codex regression gate then surfaced **two correctness gaps** the approval didn't catch, both fixed here with regression tests:
  1. **Watcher fingerprint missed same-count transcript rewrites.** `/retry`,`/undo`,`/compress` (`SessionDB.replace_messages`) rewrite messages with new timestamps but can leave `message_count` unchanged → stale sidebar `last_activity`. Fixed with a **per-session** grouped message aggregate (`id, count, user_count, MAX(timestamp)`) over the same non-excluded sessions (a global MAX would miss a rewrite of an older, non-newest session); cron/webui stay excluded so idle churn still doesn't re-project.
  2. **LRU agent-cache eviction could close a live worker's agent** (`popitem(last=False)`, liveness-blind — pre-existing, but the lower 50→25 cap made it more likely). Eviction now snapshots `ACTIVE_RUNS` session_ids (before the cache lock — no nested lock) and skips live sessions, deferring (temporarily exceeding cap) rather than closing a live agent.

### Gate
- Full pytest suite: **7612 passed, 0 failed** (one boot-cascade flake re-run; clean on re-run)
- ruff: CLEAN · Codex (regression): 4 rounds → both gaps + a stale test fixed → **SAFE TO SHIP**

Co-authored-by: nesquena <nesquena@users.noreply.github.com>
2026-06-04 12:28:06 -07:00

242 lines
8.7 KiB
Python

"""
Hermes WebUI memory-provider session lifecycle.
Batch-extraction memory providers (OpenViking, Holographic) only extract memories
when AIAgent.commit_memory_session() invokes provider on_session_end(). WebUI
sessions can be reopened and continued many times, so the lifecycle must guarantee:
1. Only completed, non-ephemeral turns are committable.
2. A commit finishing late must not erase work completed while it was in flight.
3. A failed commit preserves the uncommitted generation and owning agent handle.
4. Replacement/reopened agents cannot steal older dirty generations.
5. Overlapping commits are serialised via a per-session in-flight guard.
CLI-parity semantics — post-turn marking, boundary extraction/commit:
- Completed turn: Hermes core still mirrors the exchange through
run_agent.py::_sync_external_memory_for_turn(), MemoryManager sync_all(), and
provider sync_turn() WITHOUT triggering extraction. WebUI then calls
mark_turn_completed() after the saved/completed-turn boundary so later drains
know the synced session has uncommitted work and which agent owns it.
- Session boundary: commit_session_memory() triggers
AIAgent.commit_memory_session(), which calls provider on_session_end(),
posting /api/v1/sessions/<sid>/commit and triggering extraction. This is
called only at boundaries — /api/session/new with prev_session_id, explicit
agent eviction, LRU cache eviction, and shutdown drain — matching the CLI's
AIAgent.commit_memory_session()/shutdown_memory_provider() boundary.
The design uses a monotonic generation counter per session plus per-generation
agent ownership segments. mark_turn_completed() records which agent owns the new
generation. commit_session_memory() commits the earliest uncommitted segment and
compare-and-clears only that captured segment after success.
"""
from __future__ import annotations
import logging
import threading
import time
logger = logging.getLogger(__name__)
_lock = threading.Lock()
_condition = threading.Condition(_lock)
_sessions: dict[str, dict] = {}
def _new_entry() -> dict:
return {
"generation": 0,
"committed_generation": 0,
"agent": None,
"in_flight": False,
"segments": [],
}
def _reset_for_tests() -> None:
with _condition:
_sessions.clear()
_condition.notify_all()
def register_agent(session_id: str, agent) -> None:
"""Register the current agent handle for future completed generations.
Existing dirty generations keep their original segment owner. This prevents
a rebuilt/reopened agent from overwriting the handle needed to retry older
failed memory-provider work.
"""
if not session_id:
return
with _condition:
entry = _sessions.setdefault(session_id, _new_entry())
entry["agent"] = agent
_condition.notify_all()
def unregister_agent(session_id: str) -> None:
"""Clear the current future-generation agent handle.
Dirty segment owners are intentionally preserved so failed work remains
retryable even if the cache drops the current agent reference.
"""
if not session_id:
return
with _condition:
entry = _sessions.get(session_id)
if entry is not None:
entry["agent"] = None
_condition.notify_all()
def discard_session(session_id: str) -> bool:
"""Permanently drop a session's lifecycle entry to bound memory growth.
The ``_sessions`` dict is process-global and historically only ever grew:
``register_agent`` / ``mark_turn_completed`` insert keys but no runtime path
ever removed them, so every unique ``session_id`` the WebUI touched leaked a
permanent entry (issue #3506). Over days of use on a large install this is a
monotonic, unbounded climb.
This removes the entry, but only when it is provably safe to do so: no commit
is in flight and there is no uncommitted memory work that still needs the
retained agent handle. If the entry is busy or dirty it is left untouched so
failed batch-extraction memory work stays retryable -- exactly the invariant
``unregister_agent`` and ``_evict_session_agent`` already preserve.
Returns True when the entry was removed (or was already absent), False when
it was retained because work is still pending.
"""
if not session_id:
return False
with _condition:
entry = _sessions.get(session_id)
if entry is None:
return True
if entry["in_flight"]:
return False
if entry["generation"] > entry["committed_generation"]:
return False
del _sessions[session_id]
_condition.notify_all()
return True
def mark_turn_completed(session_id: str, *, agent=None) -> int:
if not session_id:
return 0
with _condition:
entry = _sessions.setdefault(session_id, _new_entry())
if agent is not None:
entry["agent"] = agent
owner = agent if agent is not None else entry.get("agent")
entry["generation"] += 1
generation = entry["generation"]
segments = entry["segments"]
if segments and not entry["in_flight"] and segments[-1].get("agent") is owner:
segments[-1]["end"] = generation
else:
segments.append({"start": generation, "end": generation, "agent": owner})
_condition.notify_all()
return generation
def has_uncommitted_work(session_id: str) -> bool:
if not session_id:
return False
with _lock:
entry = _sessions.get(session_id)
if entry is None:
return False
return entry["generation"] > entry["committed_generation"]
def _first_uncommitted_segment(entry: dict) -> dict | None:
committed = entry["committed_generation"]
for segment in entry["segments"]:
if segment["end"] > committed:
return segment
return None
def commit_session_memory(session_id: str, agent=None, *, wait: bool = False, timeout: float | None = None) -> bool:
if not session_id:
return False
deadline = time.monotonic() + timeout if timeout is not None else None
with _condition:
entry = _sessions.get(session_id)
if entry is None:
return False
while entry["in_flight"]:
if not wait:
return False
if deadline is None:
_condition.wait()
else:
remaining = deadline - time.monotonic()
if remaining <= 0:
return False
_condition.wait(remaining)
entry = _sessions.get(session_id)
if entry is None:
return False
if entry["generation"] <= entry["committed_generation"]:
return False
segment = _first_uncommitted_segment(entry)
if segment is None:
return False
effective_agent = segment.get("agent")
if effective_agent is None:
effective_agent = agent if agent is not None else entry.get("agent")
if effective_agent is not None:
segment["agent"] = effective_agent
if effective_agent is None:
return False
captured_generation = segment["end"]
entry["in_flight"] = True
try:
effective_agent.commit_memory_session()
except Exception:
logger.exception("commit_memory_session() failed for session %s", session_id)
with _condition:
re_entry = _sessions.get(session_id)
if re_entry is not None:
re_entry["in_flight"] = False
_condition.notify_all()
return False
with _condition:
re_entry = _sessions.get(session_id)
if re_entry is not None:
re_entry["in_flight"] = False
if captured_generation > re_entry["committed_generation"]:
re_entry["committed_generation"] = captured_generation
committed = re_entry["committed_generation"]
segments = re_entry["segments"]
while segments and segments[0]["end"] <= committed:
segments.pop(0)
if segments and segments[0]["start"] <= committed:
segments[0]["start"] = committed + 1
_condition.notify_all()
return True
def drain_all_on_shutdown() -> None:
while True:
with _lock:
snapshot = [sid for sid, entry in _sessions.items() if entry["generation"] > entry["committed_generation"]]
if not snapshot:
return
made_progress = False
for sid in snapshot:
if commit_session_memory(sid, wait=True):
made_progress = True
if not made_progress:
logger.debug("drain_all_on_shutdown: stopped with uncommitted sessions: %s", sorted(snapshot))
return