mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-25 03:00:23 +00:00
82 lines
2.9 KiB
Python
82 lines
2.9 KiB
Python
"""Regression coverage for capped CLI/agent session sidebar scans (#2628)."""
|
|
|
|
import pathlib
|
|
import sqlite3
|
|
import time
|
|
|
|
import api.agent_sessions as agent_sessions
|
|
|
|
REPO_ROOT = pathlib.Path(__file__).resolve().parents[1]
|
|
|
|
|
|
def _make_state_db(path, *, sessions=80, messages_per_session=3):
|
|
conn = sqlite3.connect(str(path))
|
|
conn.executescript(
|
|
"""
|
|
CREATE TABLE sessions (
|
|
id TEXT PRIMARY KEY,
|
|
source TEXT,
|
|
session_source TEXT,
|
|
title TEXT,
|
|
model TEXT,
|
|
started_at REAL NOT NULL,
|
|
message_count INTEGER DEFAULT 0,
|
|
parent_session_id TEXT,
|
|
ended_at REAL,
|
|
end_reason TEXT
|
|
);
|
|
CREATE INDEX idx_sessions_started ON sessions(started_at);
|
|
CREATE TABLE messages (
|
|
id TEXT PRIMARY KEY,
|
|
session_id TEXT,
|
|
role TEXT,
|
|
content TEXT,
|
|
timestamp REAL
|
|
);
|
|
CREATE INDEX idx_messages_session ON messages(session_id, timestamp);
|
|
"""
|
|
)
|
|
base = time.time() - sessions
|
|
for i in range(sessions):
|
|
sid = f"cli_perf_{i:04d}"
|
|
started = base + i
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO sessions
|
|
(id, source, session_source, title, model, started_at, message_count, parent_session_id, ended_at, end_reason)
|
|
VALUES (?, 'cli', 'cli', ?, 'openai/gpt-5', ?, ?, NULL, NULL, NULL)
|
|
""",
|
|
(sid, sid, started, messages_per_session),
|
|
)
|
|
for j in range(messages_per_session):
|
|
conn.execute(
|
|
"INSERT INTO messages (id, session_id, role, content, timestamp) VALUES (?, ?, ?, 'hello', ?)",
|
|
(f"msg_{i:04d}_{j:02d}", sid, "user" if j == 0 else "assistant", started + j / 10),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def test_importable_agent_rows_push_sidebar_limit_into_sql(tmp_path):
|
|
"""A capped sidebar scan should not aggregate the entire state.db first."""
|
|
db = tmp_path / "state.db"
|
|
_make_state_db(db, sessions=120, messages_per_session=5)
|
|
|
|
rows = agent_sessions.read_importable_agent_session_rows(db, limit=20, exclude_sources=("webui",))
|
|
|
|
assert len(rows) == 20
|
|
assert [row["id"] for row in rows][:3] == ["cli_perf_0119", "cli_perf_0118", "cli_perf_0117"]
|
|
assert {row["actual_message_count"] for row in rows} == {5}
|
|
|
|
src = (REPO_ROOT / "api" / "agent_sessions.py").read_text()
|
|
assert "WITH candidates AS" in src
|
|
assert "JOIN candidates c ON c.id = s.id" in src
|
|
assert "candidate_limit = max(result_limit * 8, result_limit)" in src
|
|
|
|
|
|
def test_importable_agent_rows_zero_limit_skips_query_work(tmp_path):
|
|
db = tmp_path / "state.db"
|
|
_make_state_db(db, sessions=5, messages_per_session=1)
|
|
|
|
assert agent_sessions.read_importable_agent_session_rows(db, limit=0, exclude_sources=("webui",)) == []
|