mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-25 03:00:23 +00:00
204 lines
6.8 KiB
Python
204 lines
6.8 KiB
Python
import json
|
|
|
|
import api.turn_journal as turn_journal
|
|
from api.session_recovery import audit_session_recovery
|
|
from api.turn_journal import (
|
|
append_turn_journal_event,
|
|
derive_turn_journal_states,
|
|
read_turn_journal,
|
|
)
|
|
|
|
|
|
def _write_session(session_dir, sid, messages=None):
|
|
payload = {
|
|
"session_id": sid,
|
|
"title": "Turn journal test",
|
|
"messages": messages or [],
|
|
}
|
|
(session_dir / f"{sid}.json").write_text(json.dumps(payload), encoding="utf-8")
|
|
|
|
|
|
def test_append_turn_journal_event_fsyncs_jsonl_and_preserves_payload(tmp_path):
|
|
event = append_turn_journal_event(
|
|
"sid-1",
|
|
{
|
|
"event": "submitted",
|
|
"turn_id": "turn-1",
|
|
"stream_id": "stream-1",
|
|
"role": "user",
|
|
"content": "hello",
|
|
"attachments": [{"name": "a.png", "path": "/tmp/a.png"}],
|
|
},
|
|
session_dir=tmp_path,
|
|
)
|
|
|
|
assert event["version"] == 1
|
|
assert event["session_id"] == "sid-1"
|
|
assert event["created_at"] > 0
|
|
journal_path = tmp_path / "_turn_journal" / "sid-1.jsonl"
|
|
assert journal_path.exists()
|
|
lines = journal_path.read_text(encoding="utf-8").splitlines()
|
|
assert len(lines) == 1
|
|
assert json.loads(lines[0])["content"] == "hello"
|
|
|
|
|
|
def test_read_turn_journal_tolerates_malformed_lines(tmp_path):
|
|
journal_dir = tmp_path / "_turn_journal"
|
|
journal_dir.mkdir()
|
|
(journal_dir / "sid-1.jsonl").write_text(
|
|
'{"event":"submitted","turn_id":"turn-1","session_id":"sid-1"}\n'
|
|
'not-json\n'
|
|
'{"event":"completed","turn_id":"turn-1","session_id":"sid-1"}\n',
|
|
encoding="utf-8",
|
|
)
|
|
|
|
result = read_turn_journal("sid-1", session_dir=tmp_path)
|
|
|
|
assert [event["event"] for event in result["events"]] == ["submitted", "completed"]
|
|
assert result["malformed"] == [{"line": 2, "raw": "not-json"}]
|
|
|
|
|
|
def test_append_turn_journal_event_locks_around_write_and_fsync(tmp_path, monkeypatch):
|
|
calls = []
|
|
|
|
class FakeFcntl:
|
|
LOCK_EX = 1
|
|
LOCK_UN = 2
|
|
|
|
@staticmethod
|
|
def flock(fd, flag):
|
|
calls.append((fd, flag))
|
|
|
|
monkeypatch.setattr(turn_journal, "_fcntl", FakeFcntl)
|
|
|
|
append_turn_journal_event(
|
|
"sid-1",
|
|
{"event": "submitted", "turn_id": "turn-locked", "content": "x" * 5000},
|
|
session_dir=tmp_path,
|
|
)
|
|
|
|
assert [flag for _, flag in calls] == [FakeFcntl.LOCK_EX, FakeFcntl.LOCK_UN]
|
|
|
|
|
|
def test_append_turn_journal_event_still_writes_when_fcntl_unavailable(tmp_path, monkeypatch):
|
|
monkeypatch.setattr(turn_journal, "_fcntl", None)
|
|
|
|
append_turn_journal_event(
|
|
"sid-1",
|
|
{"event": "submitted", "turn_id": "turn-no-fcntl", "content": "hello"},
|
|
session_dir=tmp_path,
|
|
)
|
|
|
|
result = read_turn_journal("sid-1", session_dir=tmp_path)
|
|
assert result["events"][0]["turn_id"] == "turn-no-fcntl"
|
|
|
|
|
|
def test_derive_turn_journal_states_keeps_latest_event_per_turn():
|
|
states, _ = derive_turn_journal_states([
|
|
{"event": "submitted", "turn_id": "turn-1", "created_at": 1},
|
|
{"event": "worker_started", "turn_id": "turn-1", "created_at": 2},
|
|
{"event": "submitted", "turn_id": "turn-2", "created_at": 3},
|
|
{"event": "completed", "turn_id": "turn-1", "created_at": 4},
|
|
])
|
|
|
|
assert states["turn-1"]["event"] == "completed"
|
|
assert states["turn-2"]["event"] == "submitted"
|
|
|
|
|
|
def test_derive_turn_journal_states_uses_created_at_not_file_order():
|
|
states, _ = derive_turn_journal_states([
|
|
{"event": "completed", "turn_id": "turn-1", "created_at": 20},
|
|
{"event": "submitted", "turn_id": "turn-1", "created_at": 10},
|
|
])
|
|
|
|
assert states["turn-1"]["event"] == "completed"
|
|
|
|
|
|
def test_audit_reports_pending_turn_journal_entry_when_user_message_absent(tmp_path):
|
|
_write_session(tmp_path, "sid-1", messages=[])
|
|
append_turn_journal_event(
|
|
"sid-1",
|
|
{
|
|
"event": "submitted",
|
|
"turn_id": "turn-1",
|
|
"stream_id": "stream-1",
|
|
"role": "user",
|
|
"content": "recover me",
|
|
"attachments": [],
|
|
},
|
|
session_dir=tmp_path,
|
|
)
|
|
|
|
report = audit_session_recovery(tmp_path)
|
|
|
|
assert report["status"] == "warn"
|
|
assert report["summary"]["repairable"] == 1
|
|
assert report["items"] == [
|
|
{
|
|
"session_id": "sid-1",
|
|
"kind": "turn_journal_pending_turn",
|
|
"category": "repairable",
|
|
"recommendation": "audit_only_pending_turn_journal",
|
|
"live_messages": 0,
|
|
"bak_messages": -1,
|
|
"turn_id": "turn-1",
|
|
"event": "submitted",
|
|
}
|
|
]
|
|
|
|
|
|
def test_audit_ignores_completed_or_already_materialized_turn_journal_entry(tmp_path):
|
|
_write_session(tmp_path, "sid-1", messages=[{"role": "user", "content": "already there"}])
|
|
append_turn_journal_event(
|
|
"sid-1",
|
|
{
|
|
"event": "submitted",
|
|
"turn_id": "turn-1",
|
|
"role": "user",
|
|
"content": "already there",
|
|
},
|
|
session_dir=tmp_path,
|
|
)
|
|
append_turn_journal_event(
|
|
"sid-1",
|
|
{"event": "completed", "turn_id": "turn-1"},
|
|
session_dir=tmp_path,
|
|
)
|
|
|
|
report = audit_session_recovery(tmp_path)
|
|
|
|
assert report["status"] == "ok"
|
|
assert report["items"] == []
|
|
|
|
|
|
def test_derive_turn_journal_states_reports_terminal_collision_when_both_completed_and_interrupted():
|
|
# A turn that recorded both completed and interrupted terminal events should
|
|
# not silently collapse to one winner — the collision must be reported.
|
|
events = [
|
|
{'event': 'submitted', 'turn_id': 'turn-double-terminal', 'created_at': 1},
|
|
{'event': 'worker_started', 'turn_id': 'turn-double-terminal', 'created_at': 2},
|
|
{'event': 'completed', 'turn_id': 'turn-double-terminal', 'created_at': 3},
|
|
{'event': 'interrupted', 'turn_id': 'turn-double-terminal', 'created_at': 4, 'reason': 'server_restart'},
|
|
]
|
|
states, collisions = derive_turn_journal_states(events)
|
|
|
|
# Derived state still picks the latest by timestamp (interrupted)
|
|
assert states['turn-double-terminal']['event'] == 'interrupted'
|
|
# But the collision is explicitly reported so callers can audit it
|
|
assert len(collisions) == 1
|
|
assert collisions[0]['turn_id'] == 'turn-double-terminal'
|
|
assert [e['event'] for e in collisions[0]['events']] == ['completed', 'interrupted']
|
|
|
|
|
|
def test_derive_turn_journal_states_no_collision_when_single_terminal():
|
|
# A normal turn with only one terminal event must not produce a collision.
|
|
events = [
|
|
{'event': 'submitted', 'turn_id': 'turn-normal', 'created_at': 1},
|
|
{'event': 'worker_started', 'turn_id': 'turn-normal', 'created_at': 2},
|
|
{'event': 'completed', 'turn_id': 'turn-normal', 'created_at': 3},
|
|
]
|
|
states, collisions = derive_turn_journal_states(events)
|
|
|
|
assert states['turn-normal']['event'] == 'completed'
|
|
assert collisions == []
|