Merge pull request #2020 from nesquena/stage-331

Release M — v0.51.37 — Compression / lineage backend (6 PRs)
This commit is contained in:
nesquena-hermes
2026-05-10 11:07:11 -07:00
committed by GitHub
14 changed files with 800 additions and 12 deletions
+2
View File
@@ -50,3 +50,5 @@ docs/*
graphify-out/
.graphify_cached.json
.graphify_uncached.txt
.venv/
+27
View File
@@ -1,5 +1,32 @@
# Hermes Web UI -- Changelog
## [v0.51.37] — 2026-05-10 — Release M (compression / lineage backend)
### Fixed
- **PR #2004** by @franksong2702 — Persisted compression boundary summary for reload UI. Both manual `/session/compress` and auto-compression paths now persist `compression_anchor_summary`, `compression_anchor_visible_idx`, and `compression_anchor_message_key` so the compression card renders correctly after a page reload. Closes #1833.
- **PR #2006** by @qxxaa — Stamp profile on continuation session after context compression. In multi-profile deployments, memory writes after auto-compression silently targeted the **default profile's** `MEMORY.md`, regardless of which profile the browser session was using. Root cause: the compression migration block in `_periodic_checkpoint` did not carry `s.profile` across to the continuation session, so subsequent requests fell back to the default profile's `HERMES_HOME`. Fix resolves the profile name from `s.profile` (or `get_active_profile_name()` while TLS still holds) at streaming-thread start, then stamps `s.profile = _resolved_profile_name` on the continuation session. Verified evidence: session `0dfefb` had read the wrong profile's `MEMORY.md` (16% / 4 entries) instead of the troubleshooting profile's bank (72-77% / 5000+ chars).
- **PR #2011** by @ai-ag2026 — Sidebar lineage collapse: prefer the latest compressed segment when a parent row is touched. Previously the sidebar collapse helper picked representatives by timestamp only, which could surface a touched-parent row instead of the newer compressed tip. Now keys on `_compression_segment_count` so the highest-count segment wins. Regression test added.
- **PR #2014** by @ai-ag2026 — Keep explicit `/api/session/branch` forks out of compression-lineage collapse. Forked sessions now mark `session_source="fork"` on creation, and the sidebar lineage helper guards against folding fork rows into the compression-collapse path even when the parent isn't currently in the rendered window. Backend marker test + sidebar guard test added.
- **PR #2015** by @Jellypowered — Stitch continuation-lineage transcripts in WebUI. Sessions split by continuation events (compression boundary, CLI-close) could show only the latest segment in the WebUI message history. `get_cli_session_messages()` now walks the valid continuation lineage and stitches messages across sessions so the full conversation is visible.
### Added
- **PR #2012** by @dso2ng — New read-only `/api/session/lineage-report/<sid>` endpoint exposing a bounded JSON diagnostic of a session's compression/branching lineage. Pure backend probe — no client UI changes. The sidebar lineage UI (#1906/#1943) already covers user-facing affordances; this fills the bounded backend probe gap for CLI/scripting use.
### Tests
5049 → **5058 collected, 5058 passing, 0 regressions** (+9 net new across `test_session_lineage_collapse.py`, `test_session_lineage_full_transcript.py`, `test_session_lineage_report.py`, `test_465_session_branching.py`, `test_auto_compression_card.py`, `test_sprint46.py`). Full suite 157s on Python 3.11 with `HERMES_HOME` isolation.
### Notes
- `api/routes.py` (4 PRs touched it) and `api/streaming.py` (2 PRs) were the multi-PR files. All hunks at distinct anchors; stage merge clean with no conflicts.
- Theme coherence: every PR in this batch addresses session compression, lineage, or continuation-stitching — the same conceptual surface from different angles.
## [v0.51.36] — 2026-05-10 — Release L (locale + provider + cross-cutting)
### Fixed
+157
View File
@@ -439,6 +439,163 @@ def read_importable_agent_session_rows(
def _lineage_report_row(row: dict, role: str) -> dict:
updated_at = row.get('ended_at') if row.get('ended_at') is not None else row.get('started_at')
return {
'session_id': row.get('id'),
'role': role,
'title': row.get('title'),
'source': row.get('source'),
'started_at': row.get('started_at'),
'updated_at': updated_at,
'end_reason': row.get('end_reason'),
'active': row.get('ended_at') is None,
'archived': False,
}
def _empty_lineage_report(session_id: str, *, found: bool = False) -> dict:
return {
'mutation': False,
'found': found,
'session_id': session_id,
'lineage_key': session_id,
'tip_session_id': session_id,
'total_segments': 0,
'materialized_segments': 0,
'segments': [],
'children': [],
'manual_review': False,
}
def read_session_lineage_report(db_path: Path, session_id: str | None, max_hops: int = 20) -> dict:
"""Return a bounded, read-only lifecycle report for a session lineage.
This helper intentionally reports only facts that can be derived from
``state.db.sessions`` without mutating WebUI JSON, archiving rows, or
deleting historical segments. It mirrors the sidebar continuation rules so
a future UI/PR can explain which rows are hidden compression/cli-close
segments and which child-session branches remain distinct.
"""
sid = str(session_id or '').strip()
if not sid:
return _empty_lineage_report('')
db_path = Path(db_path)
if not db_path.exists():
return _empty_lineage_report(sid)
try:
with closing(sqlite3.connect(str(db_path))) as conn:
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute("PRAGMA table_info(sessions)")
session_cols = {row[1] for row in cur.fetchall()}
required = {'id', 'parent_session_id', 'end_reason'}
if not required.issubset(session_cols):
return _empty_lineage_report(sid)
source_expr = _optional_col('source', session_cols)
title_expr = _optional_col('title', session_cols)
started_expr = _optional_col('started_at', session_cols, '0')
ended_expr = _optional_col('ended_at', session_cols)
end_reason_expr = _optional_col('end_reason', session_cols)
parent_expr = _optional_col('parent_session_id', session_cols)
def fetch_one(row_id: str | None) -> dict | None:
if not row_id:
return None
cur.execute(
f"""
SELECT s.id,
{source_expr},
{title_expr},
{started_expr},
{parent_expr},
{ended_expr},
{end_reason_expr}
FROM sessions s
WHERE s.id = ?
""",
(row_id,),
)
row = cur.fetchone()
return dict(row) if row else None
target = fetch_one(sid)
if not target:
return _empty_lineage_report(sid)
segments = [target]
current = target
seen = {sid}
manual_review = False
for _hop in range(max(0, int(max_hops))):
parent_id = current.get('parent_session_id')
parent = fetch_one(parent_id)
if not parent or parent_id in seen:
manual_review = bool(parent_id and parent_id in seen)
break
if not _is_continuation_session(parent, current):
break
segments.append(parent)
seen.add(parent_id)
current = parent
else:
manual_review = True
segment_ids = {row['id'] for row in segments}
child_rows: list[dict] = []
for parent in segments:
cur.execute(
f"""
SELECT s.id,
{source_expr},
{title_expr},
{started_expr},
{parent_expr},
{ended_expr},
{end_reason_expr}
FROM sessions s
WHERE s.parent_session_id = ?
ORDER BY s.started_at DESC
""",
(parent['id'],),
)
for child_row in cur.fetchall():
child = dict(child_row)
if child['id'] in segment_ids:
continue
if _is_continuation_session(parent, child):
# A continuation outside the selected path means the
# lineage is branched or the caller selected an older
# segment. Report manual review rather than proposing
# destructive cleanup candidates.
manual_review = True
continue
child_rows.append(child)
except Exception:
return _empty_lineage_report(sid)
root_id = segments[-1]['id'] if segments else sid
tip_id = segments[0]['id'] if segments else sid
return {
'mutation': False,
'found': True,
'session_id': sid,
'lineage_key': root_id,
'tip_session_id': tip_id,
'total_segments': len(segments),
'materialized_segments': len(segments),
'segments': [
_lineage_report_row(row, 'tip' if idx == 0 else 'hidden_segment')
for idx, row in enumerate(segments)
],
'children': [_lineage_report_row(row, 'child_session') for row in child_rows],
'manual_review': manual_review,
}
def read_session_lineage_metadata(db_path: Path, session_ids: list[str] | set[str]) -> dict[str, dict]:
"""Return compression-lineage metadata for known WebUI sidebar sessions.
+55 -5
View File
@@ -329,6 +329,7 @@ class Session:
context_messages=None,
compression_anchor_visible_idx=None,
compression_anchor_message_key=None,
compression_anchor_summary=None,
context_length=None, threshold_tokens=None,
last_prompt_tokens=None,
gateway_routing=None, gateway_routing_history=None,
@@ -361,6 +362,7 @@ class Session:
self.context_messages = context_messages if isinstance(context_messages, list) else []
self.compression_anchor_visible_idx = compression_anchor_visible_idx
self.compression_anchor_message_key = compression_anchor_message_key
self.compression_anchor_summary = compression_anchor_summary
self.context_length = context_length
self.threshold_tokens = threshold_tokens
self.last_prompt_tokens = last_prompt_tokens
@@ -411,6 +413,7 @@ class Session:
'personality', 'active_stream_id',
'pending_user_message', 'pending_attachments', 'pending_started_at',
'compression_anchor_visible_idx', 'compression_anchor_message_key',
'compression_anchor_summary',
'context_length', 'threshold_tokens', 'last_prompt_tokens',
'gateway_routing', 'gateway_routing_history', 'llm_title_generated',
'parent_session_id',
@@ -572,6 +575,7 @@ class Session:
'personality': self.personality,
'compression_anchor_visible_idx': self.compression_anchor_visible_idx,
'compression_anchor_message_key': self.compression_anchor_message_key,
'compression_anchor_summary': self.compression_anchor_summary,
'context_length': self.context_length,
'threshold_tokens': self.threshold_tokens,
'last_prompt_tokens': self.last_prompt_tokens,
@@ -1662,7 +1666,9 @@ def get_cli_session_messages(sid) -> list:
Preserve tool-call/result and reasoning metadata from the agent state.db so
CLI-origin transcripts render with the same tool cards as WebUI-native
sessions. Returns empty list on any error.
sessions. When the requested session is the tip of a compression/CLI-close
continuation chain, return the stitched full transcript across all segments
in chronological order. Returns empty list on any error.
"""
import os
if str(sid or '').startswith(f'{CLAUDE_CODE_SOURCE}_'):
@@ -1701,12 +1707,56 @@ def get_cli_session_messages(sid) -> list:
'codex_message_items',
]
selected = ['role', 'content', 'timestamp'] + [c for c in optional if c in available]
cur.execute("PRAGMA table_info(sessions)")
session_cols = {str(row['name']) for row in cur.fetchall()}
session_chain = [str(sid)]
if {'parent_session_id', 'end_reason', 'started_at', 'source'}.issubset(session_cols):
cur.execute(
"""
SELECT id, source, started_at, parent_session_id, ended_at, end_reason
FROM sessions
WHERE id = ?
""",
(sid,),
)
rows_by_id = {}
row = cur.fetchone()
if row:
rows_by_id[str(row['id'])] = dict(row)
current_id = str(row['id'])
seen = {current_id}
for _ in range(20):
current = rows_by_id.get(current_id)
parent_id = current.get('parent_session_id') if current else None
if not parent_id or parent_id in seen:
break
cur.execute(
"""
SELECT id, source, started_at, parent_session_id, ended_at, end_reason
FROM sessions
WHERE id = ?
""",
(parent_id,),
)
parent_row = cur.fetchone()
if not parent_row:
break
parent_dict = dict(parent_row)
rows_by_id[str(parent_row['id'])] = parent_dict
if not _is_continuation_session(parent_dict, current):
break
session_chain.insert(0, str(parent_row['id']))
current_id = str(parent_row['id'])
seen.add(current_id)
placeholders = ', '.join('?' for _ in session_chain)
cur.execute(f"""
SELECT {', '.join(selected)}
SELECT {', '.join(selected)}, session_id
FROM messages
WHERE session_id = ?
ORDER BY timestamp ASC
""", (sid,))
WHERE session_id IN ({placeholders})
ORDER BY timestamp ASC, id ASC
""", session_chain)
msgs = []
for row in cur.fetchall():
msg = {
+74 -2
View File
@@ -26,6 +26,7 @@ from api.agent_sessions import (
MESSAGING_SOURCES,
is_cli_session_row,
is_cli_session_row_visible,
read_session_lineage_report,
)
logger = logging.getLogger(__name__)
@@ -3028,8 +3029,31 @@ def handle_get(handler, parsed) -> bool:
# longer visible conversation than the single state.db
# segment for this messaging session id. Prefer the longer
# sidecar so repaired WebUI history is not hidden behind the
# canonical per-segment transcript.
_all_msgs = sidecar_messages if len(sidecar_messages) > len(cli_messages) else cli_messages
# canonical per-segment transcript. When both sources carry
# different slices of the same stitched conversation, merge
# them chronologically and dedupe exact repeats.
if sidecar_messages and sidecar_messages != cli_messages:
merged_messages = []
seen_message_keys = set()
for msg in sorted(list(cli_messages) + list(sidecar_messages), key=lambda m: (
float(m.get("timestamp") or 0),
str(m.get("role") or ""),
str(m.get("content") or ""),
)):
key = (
str(msg.get("role") or ""),
str(msg.get("content") or ""),
str(msg.get("timestamp") or ""),
str(msg.get("tool_call_id") or ""),
str(msg.get("tool_name") or msg.get("name") or ""),
)
if key in seen_message_keys:
continue
seen_message_keys.add(key)
merged_messages.append(msg)
_all_msgs = merged_messages
else:
_all_msgs = sidecar_messages if len(sidecar_messages) > len(cli_messages) else cli_messages
else:
_all_msgs = s.messages
else:
@@ -3184,6 +3208,15 @@ def handle_get(handler, parsed) -> bool:
return j(handler, {"session": redact_session_data(sess)})
return bad(handler, "Session not found", 404)
if parsed.path == "/api/session/lineage/report":
sid = parse_qs(parsed.query).get("session_id", [""])[0]
if not sid:
return bad(handler, "session_id required", 400)
report = read_session_lineage_report(_active_state_db_path(), sid)
if not report.get("found"):
return bad(handler, "Session not found", 404)
return j(handler, report)
if parsed.path == "/api/session/status":
sid = parse_qs(parsed.query).get("session_id", [""])[0]
if not sid:
@@ -4232,6 +4265,7 @@ def handle_post(handler, parsed) -> bool:
title=branch_title,
messages=forked_messages,
parent_session_id=source.session_id,
session_source="fork",
)
with LOCK:
SESSIONS[branch.session_id] = branch
@@ -7505,6 +7539,38 @@ def _handle_session_compress(handler, body):
return None
return {"role": role, "ts": ts, "text": norm, "attachments": attach_count}
def _compression_summary_from_messages(messages):
text = None
for m in reversed(messages or []):
if not isinstance(m, dict):
continue
role = str(m.get("role") or "").lower()
if role != "assistant":
continue
if not isinstance(m.get("content"), str):
continue
content = str(m.get("content") or "").strip()
if not content:
continue
norm = re.sub(r"\s+", " ", content).strip()
if (
"context compaction" in norm.lower()
or "context compression" in norm.lower()
):
return norm
return None
def _compact_summary_text(raw_text):
if not isinstance(raw_text, str):
return None
txt = raw_text.strip()
if not txt:
return None
txt = re.sub(r"\s+", " ", txt)
if len(txt) > 320:
txt = f"{txt[:314]}"
return txt
try:
require(body, "session_id")
except ValueError as e:
@@ -7691,6 +7757,12 @@ def _handle_session_compress(handler, body):
visible_after = _visible_messages_for_anchor(compressed)
s.compression_anchor_visible_idx = max(0, len(visible_after) - 1) if visible_after else None
s.compression_anchor_message_key = _anchor_message_key(visible_after[-1]) if visible_after else None
summary_text = None
if isinstance(summary, dict):
summary_text = summary.get("reference_message") or summary.get("token_line") or summary.get("headline")
s.compression_anchor_summary = _compact_summary_text(
summary_text or _compression_summary_from_messages(compressed) or ""
)
s.save()
session_payload = redact_session_data(
+126 -1
View File
@@ -1550,6 +1550,87 @@ def _is_context_compression_marker(msg):
)
def _compact_summary_text(raw_text: str | None, limit: int = 320) -> str | None:
"""Normalize a text blob used in compression summary cards."""
if not isinstance(raw_text, str):
return None
txt = raw_text.strip()
if not txt:
return None
txt = re.sub(r"\s+", " ", txt).strip()
if len(txt) > limit:
txt = f"{txt[: limit - 6]}"
return txt
def _compression_anchor_message_key(message):
if not isinstance(message, dict):
return None
role = str(message.get('role') or '')
if not role or role == 'tool':
return None
content = message.get('content', '')
text = _message_text(content)
if len(text) > 160:
text = text[:160]
ts = message.get('_ts') or message.get('timestamp')
attachments = message.get('attachments')
attach_count = len(attachments) if isinstance(attachments, list) else 0
if not text and not attach_count and not ts:
return None
return {'role': role, 'ts': ts, 'text': text, 'attachments': attach_count}
def _visible_messages_for_compression_anchor(messages):
out = []
for m in messages or []:
if not isinstance(m, dict):
continue
role = m.get('role')
if not role or role == 'tool':
continue
content = m.get('content', '')
has_attachments = bool(m.get('attachments'))
has_tool_calls = bool(isinstance(m.get('tool_calls'), list) and m.get('tool_calls'))
has_tool_use = False
has_reasoning = bool(m.get('reasoning'))
if isinstance(content, list):
text = '\n'.join(
str(p.get('text') or p.get('content') or '')
for p in content
if isinstance(p, dict)
and p.get('type') in {'text', 'input_text', 'output_text'}
).strip()
for part in content:
if not isinstance(part, dict):
continue
if part.get('type') == 'tool_use':
has_tool_use = True
if not text:
has_reasoning = has_reasoning or any(
isinstance(part, dict)
and part.get('type') in {'thinking', 'reasoning'}
for part in content
)
else:
text = str(content or '').strip()
if text or has_attachments or has_tool_calls or has_tool_use or has_reasoning:
out.append(m)
return out
def _compression_summary_from_messages(messages):
for m in reversed(messages or []):
if not isinstance(m, dict):
continue
if not _is_context_compression_marker(m):
continue
text = _message_text(m.get('content'))
if text:
return text
return None
def _find_current_user_turn(messages, msg_text):
needle = " ".join(str(msg_text or '').split())
fallback = None
@@ -2036,7 +2117,24 @@ def _run_agent_streaming(
except ImportError:
_profile_home = os.environ.get('HERMES_HOME', '')
_profile_runtime_env = {}
# Capture the resolved profile name now, while profile context is
# reliable. Used in the compression migration block to stamp s.profile
# on the continuation session. We resolve it here rather than calling
# get_active_profile_name() at compression time because that function
# reads thread-local storage (_tls.profile) set by set_request_profile()
# on the HTTP handler thread. The streaming thread is a separate
# threading.Thread and does not inherit TLS. At compression time,
# get_active_profile_name() would fall back to the process-global
# _active_profile, which may belong to a different concurrent tab.
_resolved_profile_name = getattr(s, 'profile', None)
if not _resolved_profile_name:
try:
from api.profiles import get_active_profile_name
_resolved_profile_name = get_active_profile_name()
except Exception:
_resolved_profile_name = None
_thread_env = _build_agent_thread_env(
_profile_runtime_env,
str(s.workspace),
@@ -3004,6 +3102,22 @@ def _run_agent_streaming(
old_path = SESSION_DIR / f'{old_sid}.json'
new_path = SESSION_DIR / f'{new_sid}.json'
s.session_id = new_sid
# Carry profile identity across the compression boundary.
# Without this, s.profile stays None on the continuation
# session. On the next request, _run_agent_streaming calls
# get_hermes_home_for_profile(getattr(s, 'profile', None))
# which falls back to the default profile's HERMES_HOME.
# Memory writes then land in the wrong profile's MEMORY.md.
# Stamping here also ensures s.save() persists a non-null
# profile field to the continuation session's JSON file,
# covering the case where the session is later evicted from
# SESSIONS and reconstructed from disk via Session.load().
if not s.profile and _resolved_profile_name:
s.profile = _resolved_profile_name
logger.info(
"Stamped profile=%r on continuation session %s after compression",
_resolved_profile_name, new_sid,
)
with LOCK:
if old_sid in SESSIONS:
SESSIONS[new_sid] = SESSIONS.pop(old_sid)
@@ -3033,6 +3147,17 @@ def _run_agent_streaming(
_compressed = True
# Notify the frontend that compression happened
if _compressed:
visible_after = _visible_messages_for_compression_anchor(s.messages)
s.compression_anchor_visible_idx = (
max(0, len(visible_after) - 1) if visible_after else None
)
s.compression_anchor_message_key = (
_compression_anchor_message_key(visible_after[-1]) if visible_after else None
)
s.compression_anchor_summary = _compact_summary_text(
_compression_summary_from_messages(s.messages)
or _compression_summary_from_messages(s.context_messages)
)
put('compressed', {
'message': 'Context auto-compressed to continue the conversation',
})
+9 -1
View File
@@ -1978,6 +1978,7 @@ function _isChildSession(s){
function _sessionLineageKey(s, sessionIdsInList){
if(!s||!s.session_id) return null;
if(_isChildSession(s)) return null;
if(s.session_source==='fork') return null;
const lineageKey=s._lineage_root_id||s.lineage_root_id||null;
if(lineageKey) return lineageKey;
// If parent_session_id points to another session in the current list,
@@ -2102,7 +2103,14 @@ function _collapseSessionLineageForSidebar(sessions){
}
for(const [key,items] of groups.entries()){
if(items.length<=1){result.push(items[0]);continue;}
const sorted=[...items].sort((a,b)=>_sessionTimestampMs(b)-_sessionTimestampMs(a));
const sorted=[...items].sort((a,b)=>{
const bSeg=Number(b&&b._compression_segment_count||0);
const aSeg=Number(a&&a._compression_segment_count||0);
if(bSeg||aSeg){
if(bSeg!==aSeg) return bSeg-aSeg;
}
return _sessionTimestampMs(b)-_sessionTimestampMs(a);
});
const chosen=sorted[0];
result.push({...chosen,_lineage_key:key,_lineage_collapsed_count:items.length,_lineage_segments:sorted});
}
+7 -2
View File
@@ -4759,6 +4759,9 @@ function renderMessages(options){
const sessionCompressionAnchorKey=(
S.session && S.session.compression_anchor_message_key && typeof S.session.compression_anchor_message_key==='object'
) ? S.session.compression_anchor_message_key : null;
const sessionCompressionSummary=(
S.session && typeof S.session.compression_anchor_summary==='string'
) ? S.session.compression_anchor_summary.trim() : '';
const preservedCompressionTaskMessages=_latestPreservedCompressionTaskListMessages(S.messages);
const vis=S.messages.filter(m=>{
if(!m||!m.role||m.role==='tool')return false;
@@ -4775,8 +4778,10 @@ function renderMessages(options){
inner.innerHTML='';
const compressionNode=compressionState?_compressionCardsNode(compressionState):null;
const referenceMessage=S.messages.find(m=>_isContextCompactionMessage(m));
const referenceText=referenceMessage?msgContent(referenceMessage)||String(referenceMessage.content||''):'';
const referenceNode=(!compressionState && referenceMessage && (sessionCompressionAnchor!==null || sessionCompressionAnchorKey))
const referenceText=referenceMessage
? msgContent(referenceMessage)||String(referenceMessage.content||'')
: sessionCompressionSummary;
const referenceNode=(!compressionState && !!referenceText && (sessionCompressionAnchor!==null || sessionCompressionAnchorKey || sessionCompressionSummary))
? (()=>{const row=document.createElement('div');row.innerHTML=`<div class="compression-turn"><div class="compression-turn-blocks">${_compressionReferenceCardHtml(referenceText,false)}${_preservedCompressionTaskListCardsHtml(preservedCompressionTaskMessages)}</div></div>`;return row.firstElementChild;})()
: null;
let preservedCompressionTaskCardsAttached=!!referenceNode;
+26
View File
@@ -68,6 +68,32 @@ def test_branch_creates_session_with_parent():
"Branch handler should set parent_session_id to source session"
def test_branch_marks_explicit_forks_as_fork_sessions():
"""Explicit branches must not be mistaken for compression lineage rows."""
with open('api/routes.py') as f:
src = f.read()
branch_match = re.search(
r'parsed\.path == "/api/session/branch"(.*?)(?=\n if parsed\.path|$)',
src, re.DOTALL
)
assert branch_match
block = branch_match.group(1)
assert 'session_source="fork"' in block, \
"Branch handler should mark explicit forks with session_source='fork'"
def test_branch_fork_sessions_do_not_collapse_into_parent_lineage():
"""Forks remain selectable rows even if their parent is not in the current list."""
with open('static/sessions.js') as f:
src = f.read()
fn = re.search(r'function _sessionLineageKey\(.*?\n\}', src, re.DOTALL)
assert fn, "Could not find _sessionLineageKey"
block = fn.group(0)
assert "if(s.session_source==='fork') return null;" in block, \
"Explicit fork sessions should not collapse via parent_session_id"
assert block.index("if(s.session_source==='fork') return null;") < block.index('return s.parent_session_id || null')
def test_branch_keep_count_support():
"""Verify the branch endpoint supports keep_count parameter."""
with open('api/routes.py') as f:
+10
View File
@@ -192,6 +192,16 @@ def test_preserved_task_list_renders_through_compression_card_path():
assert "_contextCompactionMessageHtml(m, tsTitle, preservedForThisCard)" in src
def test_context_anchor_reference_uses_session_summary_fallback():
src = _read("static/ui.js")
assert "sessionCompressionSummary" in src
assert "const sessionCompressionSummary" in src
assert "referenceText=referenceMessage" in src
assert ": sessionCompressionSummary" in src
assert "!!referenceText && (sessionCompressionAnchor!==null || sessionCompressionAnchorKey || sessionCompressionSummary)" in src
def test_preserved_task_list_attaches_once_per_render():
src = _read("static/ui.js")
+41
View File
@@ -170,6 +170,47 @@ console.log(JSON.stringify(collapsed));
assert [seg["session_id"] for seg in collapsed[0]["_lineage_segments"]] == ["seg10", "seg9", "seg8", "seg7"]
def test_sidebar_lineage_collapse_prefers_highest_compression_segment_over_touched_parent():
"""A touched parent segment must not hide the newer compressed tip.
Opening or polling an older segment can refresh its updated_at without adding
messages. The collapsed sidebar row must still pick the highest compression
segment, otherwise the visible chat jumps back to a parent that lacks the
completed assistant answer.
"""
js = SESSIONS_JS_PATH.read_text(encoding="utf-8")
source = f"""
const src = {js!r};
function extractFunc(name) {{
const re = new RegExp('function\\\\s+' + name + '\\\\s*\\\\(');
const start = src.search(re);
if (start < 0) throw new Error(name + ' not found');
let i = src.indexOf('{{', start);
let depth = 1; i++;
while (depth > 0 && i < src.length) {{
if (src[i] === '{{') depth++;
else if (src[i] === '}}') depth--;
i++;
}}
return src.slice(start, i);
}}
eval(extractFunc('_sessionTimestampMs'));
eval(extractFunc('_isChildSession'));
eval(extractFunc('_sessionLineageKey'));
eval(extractFunc('_collapseSessionLineageForSidebar'));
const sessions = [
{{session_id:'seg13', title:'Schaue dir die Release (fork)', message_count:2490, updated_at:200, last_message_at:200, _lineage_root_id:'root', _compression_segment_count:13}},
{{session_id:'seg14', title:'Schaue dir die Release (fork)', message_count:2532, updated_at:150, last_message_at:150, _lineage_root_id:'root', _compression_segment_count:14}},
];
const collapsed = _collapseSessionLineageForSidebar(sessions);
console.log(JSON.stringify(collapsed));
"""
collapsed = json.loads(_run_node(source))
assert [row["session_id"] for row in collapsed] == ["seg14"]
assert collapsed[0]["_lineage_collapsed_count"] == 2
assert [seg["session_id"] for seg in collapsed[0]["_lineage_segments"]] == ["seg14", "seg13"]
def test_sidebar_attaches_child_sessions_to_collapsed_hidden_parent_lineage():
js = SESSIONS_JS_PATH.read_text(encoding="utf-8")
@@ -0,0 +1,61 @@
"""Regression coverage for stitched full-transcript loading across session segments."""
from __future__ import annotations
import api.routes as routes
def test_session_endpoint_merges_sidecar_and_lineage_messages_for_cli_sessions(monkeypatch):
class DummySession:
def __init__(self):
self.messages = [{"role": "assistant", "content": "sidecar tail", "timestamp": 10.0}]
self.tool_calls = []
self.active_stream_id = None
self.pending_user_message = None
self.pending_attachments = []
self.pending_started_at = None
self.context_length = 0
self.threshold_tokens = 0
self.last_prompt_tokens = 0
self.model = "openai/gpt-5"
self.session_id = "tip"
def compact(self):
return {"session_id": "tip", "title": "Tip", "model": "openai/gpt-5"}
captured = {}
monkeypatch.setattr(routes, "get_session", lambda sid, metadata_only=False: DummySession())
monkeypatch.setattr(routes, "_clear_stale_stream_state", lambda s: None)
monkeypatch.setattr(routes, "_lookup_cli_session_metadata", lambda sid: {"session_source": "messaging"})
monkeypatch.setattr(routes, "_is_messaging_session_record", lambda s: True)
monkeypatch.setattr(
routes,
"get_cli_session_messages",
lambda sid: [
{"role": "user", "content": "root user", "timestamp": 1.0},
{"role": "assistant", "content": "tip assistant", "timestamp": 2.0},
],
)
monkeypatch.setattr(routes, "_resolve_effective_session_model_for_display", lambda s: getattr(s, "model", None))
monkeypatch.setattr(routes, "_resolve_effective_session_model_provider_for_display", lambda s: None)
monkeypatch.setattr(routes, "_merge_cli_sidebar_metadata", lambda raw, meta: raw)
monkeypatch.setattr(routes, "redact_session_data", lambda raw: raw)
monkeypatch.setattr(routes, "j", lambda handler, payload, status=200: captured.setdefault("payload", payload))
class Handler:
pass
class Parsed:
path = "/api/session"
query = "session_id=tip"
routes.handle_get(Handler(), Parsed())
session = captured["payload"]["session"]
assert [m["content"] for m in session["messages"]] == [
"root user",
"tip assistant",
"sidecar tail",
]
+196
View File
@@ -0,0 +1,196 @@
"""Read-only session lineage report endpoint tests."""
import json
import sqlite3
import time
from types import SimpleNamespace
from urllib.parse import urlparse
from unittest.mock import patch
import api.agent_sessions as agent_sessions
import api.routes as routes
def _ensure_state_db(path):
conn = sqlite3.connect(str(path))
conn.executescript(
"""
CREATE TABLE sessions (
id TEXT PRIMARY KEY,
source TEXT,
title TEXT,
model TEXT,
started_at REAL NOT NULL,
message_count INTEGER DEFAULT 0,
parent_session_id TEXT,
ended_at REAL,
end_reason TEXT
);
"""
)
return conn
def _insert_state_row(conn, sid, *, parent=None, ended_at=None, end_reason=None, started_at=None, source="webui"):
conn.execute(
"""
INSERT INTO sessions
(id, source, title, model, started_at, message_count, parent_session_id, ended_at, end_reason)
VALUES (?, ?, ?, 'openai/gpt-5', ?, 2, ?, ?, ?)
""",
(sid, source, sid.replace("_", " "), started_at or time.time(), parent, ended_at, end_reason),
)
conn.commit()
def test_lineage_report_returns_bounded_read_only_tip_and_hidden_segments(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(conn, "lineage_report_root", started_at=t0, ended_at=t0 + 5, end_reason="compression")
_insert_state_row(conn, "lineage_report_mid", parent="lineage_report_root", started_at=t0 + 6, ended_at=t0 + 12, end_reason="cli_close")
_insert_state_row(conn, "lineage_report_tip", parent="lineage_report_mid", started_at=t0 + 13)
report = agent_sessions.read_session_lineage_report(tmp_path / "state.db", "lineage_report_tip")
assert report["mutation"] is False
assert report["session_id"] == "lineage_report_tip"
assert report["lineage_key"] == "lineage_report_root"
assert report["tip_session_id"] == "lineage_report_tip"
assert report["total_segments"] == 3
assert report["materialized_segments"] == 3
assert [s["session_id"] for s in report["segments"]] == [
"lineage_report_tip",
"lineage_report_mid",
"lineage_report_root",
]
assert [s["role"] for s in report["segments"]] == ["tip", "hidden_segment", "hidden_segment"]
assert report["children"] == []
assert report["manual_review"] is False
assert "archive_candidates" not in report
assert "delete_candidates" not in report
finally:
conn.close()
def test_lineage_report_keeps_cross_surface_parent_out_of_hidden_segments(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(
conn,
"lineage_report_telegram_parent",
source="telegram",
started_at=t0,
ended_at=t0 + 5,
end_reason="compression",
)
_insert_state_row(
conn,
"lineage_report_webui_tip",
source="webui",
parent="lineage_report_telegram_parent",
started_at=t0 + 6,
)
report = agent_sessions.read_session_lineage_report(tmp_path / "state.db", "lineage_report_webui_tip")
assert report["lineage_key"] == "lineage_report_webui_tip"
assert report["total_segments"] == 1
assert [s["session_id"] for s in report["segments"]] == ["lineage_report_webui_tip"]
assert report["segments"][0]["role"] == "tip"
assert report["children"] == []
finally:
conn.close()
def test_lineage_report_surfaces_non_continuation_children_without_mutation(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(conn, "lineage_report_root", started_at=t0, ended_at=t0 + 5, end_reason="compression")
_insert_state_row(conn, "lineage_report_tip", parent="lineage_report_root", started_at=t0 + 6, ended_at=t0 + 15, end_reason="user_stop")
_insert_state_row(conn, "lineage_report_child", parent="lineage_report_tip", started_at=t0 + 8)
report = agent_sessions.read_session_lineage_report(tmp_path / "state.db", "lineage_report_tip")
assert report["lineage_key"] == "lineage_report_root"
assert [s["session_id"] for s in report["segments"]] == ["lineage_report_tip", "lineage_report_root"]
assert report["children"] == [
{
"session_id": "lineage_report_child",
"role": "child_session",
"title": "lineage report child",
"source": "webui",
"started_at": t0 + 8,
"updated_at": t0 + 8,
"end_reason": None,
"active": True,
"archived": False,
}
]
assert report["mutation"] is False
finally:
conn.close()
def test_lineage_report_marks_bounded_parent_walk_for_manual_review(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(conn, "lineage_report_root", started_at=t0, ended_at=t0 + 5, end_reason="compression")
_insert_state_row(conn, "lineage_report_mid", parent="lineage_report_root", started_at=t0 + 6, ended_at=t0 + 12, end_reason="compression")
_insert_state_row(conn, "lineage_report_tip", parent="lineage_report_mid", started_at=t0 + 13)
report = agent_sessions.read_session_lineage_report(tmp_path / "state.db", "lineage_report_tip", max_hops=1)
assert report["mutation"] is False
assert report["manual_review"] is True
assert [s["session_id"] for s in report["segments"]] == ["lineage_report_tip", "lineage_report_mid"]
assert report["total_segments"] == 2
finally:
conn.close()
def test_lineage_report_endpoint_is_read_only_and_uses_active_state_db(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(conn, "lineage_report_root", started_at=t0, ended_at=t0 + 5, end_reason="compression")
_insert_state_row(conn, "lineage_report_tip", parent="lineage_report_root", started_at=t0 + 6)
captured = {}
def fake_j(handler, data, status=200, **_kwargs):
captured["status"] = status
captured["data"] = data
return data
handler = SimpleNamespace()
parsed = urlparse("/api/session/lineage/report?session_id=lineage_report_tip")
with patch.object(routes, "_active_state_db_path", return_value=tmp_path / "state.db"), patch.object(routes, "j", side_effect=fake_j):
routes.handle_get(handler, parsed)
assert captured["status"] == 200
assert captured["data"]["mutation"] is False
assert captured["data"]["lineage_key"] == "lineage_report_root"
assert captured["data"]["total_segments"] == 2
finally:
conn.close()
def test_lineage_report_endpoint_returns_404_for_unknown_session(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
conn.close()
captured = {}
def fake_bad(handler, message, status=400):
captured["status"] = status
captured["message"] = message
return {"error": message}
handler = SimpleNamespace()
parsed = urlparse("/api/session/lineage/report?session_id=missing_lineage_report_session")
with patch.object(routes, "_active_state_db_path", return_value=tmp_path / "state.db"), patch.object(routes, "bad", side_effect=fake_bad):
routes.handle_get(handler, parsed)
assert captured == {"status": 404, "message": "Session not found"}
+9 -1
View File
@@ -10,7 +10,7 @@ import types
from api.models import Session
from api.config import SESSION_DIR
from api.routes import _handle_session_compress
from api.routes import _handle_session_compress, get_session
from tests._pytest_port import BASE
@@ -141,6 +141,14 @@ def test_session_compress_roundtrip(monkeypatch, cleanup_test_sessions):
{"role": "user", "content": "one"},
{"role": "assistant", "content": "four"},
]
assert payload["session"]["compression_anchor_summary"] is not None
assert payload["session"]["compression_anchor_visible_idx"] == 1
assert isinstance(payload["session"]["compression_anchor_message_key"], dict)
assert payload["session"]["compression_anchor_message_key"].get("role") == "assistant"
loaded = get_session(sid)
assert loaded.compression_anchor_summary == payload["session"]["compression_anchor_summary"]
assert loaded.compression_anchor_visible_idx == payload["session"]["compression_anchor_visible_idx"]
assert loaded.compression_anchor_message_key == payload["session"]["compression_anchor_message_key"]
assert _FakeAgent.last_instance is not None
assert _FakeAgent.last_instance.context_compressor.calls[0]["focus_topic"] == "database schema"