Files
hermes-webui/tests/test_session_lineage_report.py
T
2026-05-11 17:15:22 +08:00

266 lines
10 KiB
Python

"""Read-only session lineage report endpoint tests."""
import json
import sqlite3
import time
from types import SimpleNamespace
from urllib.parse import urlparse
from unittest.mock import patch
import api.agent_sessions as agent_sessions
import api.routes as routes
def _ensure_state_db(path):
conn = sqlite3.connect(str(path))
conn.executescript(
"""
CREATE TABLE sessions (
id TEXT PRIMARY KEY,
source TEXT,
session_source TEXT,
title TEXT,
model TEXT,
started_at REAL NOT NULL,
message_count INTEGER DEFAULT 0,
parent_session_id TEXT,
ended_at REAL,
end_reason TEXT
);
CREATE TABLE messages (
id TEXT PRIMARY KEY,
session_id TEXT,
role TEXT,
content TEXT,
timestamp REAL
);
"""
)
return conn
def _insert_state_row(conn, sid, *, parent=None, ended_at=None, end_reason=None, started_at=None, source="webui", session_source=None):
conn.execute(
"""
INSERT INTO sessions
(id, source, session_source, title, model, started_at, message_count, parent_session_id, ended_at, end_reason)
VALUES (?, ?, ?, ?, 'openai/gpt-5', ?, 2, ?, ?, ?)
""",
(sid, source, session_source, sid.replace("_", " "), started_at or time.time(), parent, ended_at, end_reason),
)
conn.commit()
def _insert_message(conn, sid, *, timestamp=None, role="user"):
conn.execute(
"INSERT INTO messages (id, session_id, role, content, timestamp) VALUES (?, ?, ?, 'hello', ?)",
(f"msg_{sid}_{role}", sid, role, timestamp or time.time()),
)
conn.commit()
def test_lineage_report_returns_bounded_read_only_tip_and_hidden_segments(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(conn, "lineage_report_root", started_at=t0, ended_at=t0 + 5, end_reason="compression")
_insert_state_row(conn, "lineage_report_mid", parent="lineage_report_root", started_at=t0 + 6, ended_at=t0 + 12, end_reason="cli_close")
_insert_state_row(conn, "lineage_report_tip", parent="lineage_report_mid", started_at=t0 + 13)
report = agent_sessions.read_session_lineage_report(tmp_path / "state.db", "lineage_report_tip")
assert report["mutation"] is False
assert report["session_id"] == "lineage_report_tip"
assert report["lineage_key"] == "lineage_report_root"
assert report["tip_session_id"] == "lineage_report_tip"
assert report["total_segments"] == 3
assert report["materialized_segments"] == 3
assert [s["session_id"] for s in report["segments"]] == [
"lineage_report_tip",
"lineage_report_mid",
"lineage_report_root",
]
assert [s["role"] for s in report["segments"]] == ["tip", "hidden_segment", "hidden_segment"]
assert report["children"] == []
assert report["manual_review"] is False
assert "archive_candidates" not in report
assert "delete_candidates" not in report
finally:
conn.close()
def test_lineage_report_keeps_cross_surface_parent_out_of_hidden_segments(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(
conn,
"lineage_report_telegram_parent",
source="telegram",
started_at=t0,
ended_at=t0 + 5,
end_reason="compression",
)
_insert_state_row(
conn,
"lineage_report_webui_tip",
source="webui",
parent="lineage_report_telegram_parent",
started_at=t0 + 6,
)
report = agent_sessions.read_session_lineage_report(tmp_path / "state.db", "lineage_report_webui_tip")
assert report["lineage_key"] == "lineage_report_webui_tip"
assert report["total_segments"] == 1
assert [s["session_id"] for s in report["segments"]] == ["lineage_report_webui_tip"]
assert report["segments"][0]["role"] == "tip"
assert report["children"] == []
finally:
conn.close()
def test_lineage_report_keeps_explicit_forks_out_of_hidden_segments(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(conn, "lineage_report_root", started_at=t0, ended_at=t0 + 5, end_reason="compression")
_insert_state_row(
conn,
"lineage_report_fork",
parent="lineage_report_root",
started_at=t0 + 6,
session_source="fork",
)
report = agent_sessions.read_session_lineage_report(tmp_path / "state.db", "lineage_report_fork")
assert report["lineage_key"] == "lineage_report_fork"
assert report["tip_session_id"] == "lineage_report_fork"
assert report["total_segments"] == 1
assert [s["session_id"] for s in report["segments"]] == ["lineage_report_fork"]
assert report["segments"][0]["role"] == "tip"
assert report["children"] == []
assert report["manual_review"] is False
finally:
conn.close()
def test_importable_agent_projection_keeps_explicit_forks_out_of_compression_lineage(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(conn, "lineage_report_root", started_at=t0, ended_at=t0 + 5, end_reason="compression")
_insert_state_row(
conn,
"lineage_report_fork",
parent="lineage_report_root",
started_at=t0 + 6,
session_source="fork",
)
_insert_message(conn, "lineage_report_fork", timestamp=t0 + 7)
rows = agent_sessions.read_importable_agent_session_rows(tmp_path / "state.db", exclude_sources=())
assert [row["id"] for row in rows] == ["lineage_report_fork"]
fork = rows[0]
assert fork.get("relationship_type") == "child_session"
assert fork.get("parent_session_id") == "lineage_report_root"
assert fork.get("_parent_lineage_root_id") == "lineage_report_root"
assert "_lineage_root_id" not in fork
assert "_compression_segment_count" not in fork
finally:
conn.close()
def test_lineage_report_surfaces_non_continuation_children_without_mutation(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(conn, "lineage_report_root", started_at=t0, ended_at=t0 + 5, end_reason="compression")
_insert_state_row(conn, "lineage_report_tip", parent="lineage_report_root", started_at=t0 + 6, ended_at=t0 + 15, end_reason="user_stop")
_insert_state_row(conn, "lineage_report_child", parent="lineage_report_tip", started_at=t0 + 8)
report = agent_sessions.read_session_lineage_report(tmp_path / "state.db", "lineage_report_tip")
assert report["lineage_key"] == "lineage_report_root"
assert [s["session_id"] for s in report["segments"]] == ["lineage_report_tip", "lineage_report_root"]
assert report["children"] == [
{
"session_id": "lineage_report_child",
"role": "child_session",
"title": "lineage report child",
"source": "webui",
"started_at": t0 + 8,
"updated_at": t0 + 8,
"end_reason": None,
"active": True,
"archived": False,
}
]
assert report["mutation"] is False
finally:
conn.close()
def test_lineage_report_marks_bounded_parent_walk_for_manual_review(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(conn, "lineage_report_root", started_at=t0, ended_at=t0 + 5, end_reason="compression")
_insert_state_row(conn, "lineage_report_mid", parent="lineage_report_root", started_at=t0 + 6, ended_at=t0 + 12, end_reason="compression")
_insert_state_row(conn, "lineage_report_tip", parent="lineage_report_mid", started_at=t0 + 13)
report = agent_sessions.read_session_lineage_report(tmp_path / "state.db", "lineage_report_tip", max_hops=1)
assert report["mutation"] is False
assert report["manual_review"] is True
assert [s["session_id"] for s in report["segments"]] == ["lineage_report_tip", "lineage_report_mid"]
assert report["total_segments"] == 2
finally:
conn.close()
def test_lineage_report_endpoint_is_read_only_and_uses_active_state_db(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
t0 = time.time() - 100
try:
_insert_state_row(conn, "lineage_report_root", started_at=t0, ended_at=t0 + 5, end_reason="compression")
_insert_state_row(conn, "lineage_report_tip", parent="lineage_report_root", started_at=t0 + 6)
captured = {}
def fake_j(handler, data, status=200, **_kwargs):
captured["status"] = status
captured["data"] = data
return data
handler = SimpleNamespace()
parsed = urlparse("/api/session/lineage/report?session_id=lineage_report_tip")
with patch.object(routes, "_active_state_db_path", return_value=tmp_path / "state.db"), patch.object(routes, "j", side_effect=fake_j):
routes.handle_get(handler, parsed)
assert captured["status"] == 200
assert captured["data"]["mutation"] is False
assert captured["data"]["lineage_key"] == "lineage_report_root"
assert captured["data"]["total_segments"] == 2
finally:
conn.close()
def test_lineage_report_endpoint_returns_404_for_unknown_session(tmp_path):
conn = _ensure_state_db(tmp_path / "state.db")
conn.close()
captured = {}
def fake_bad(handler, message, status=400):
captured["status"] = status
captured["message"] = message
return {"error": message}
handler = SimpleNamespace()
parsed = urlparse("/api/session/lineage/report?session_id=missing_lineage_report_session")
with patch.object(routes, "_active_state_db_path", return_value=tmp_path / "state.db"), patch.object(routes, "bad", side_effect=fake_bad):
routes.handle_get(handler, parsed)
assert captured == {"status": 404, "message": "Session not found"}