fix: preserve session history during compression rotation (#2223)

The previous implementation renamed old_sid.json → new_sid.json during
context compression, destroying the only persistent copy of the full
conversation history. If the summarisation LLM call also failed, the
user was left with zero recoverable messages.

Fix:
- Remove the destructive old_path.rename(new_path) call
- Preserve old_sid.json as an immutable pre-compression archive
- Create new_sid.json as a fresh file via s.save()
- Set parent_session_id on the continuation session for lineage
- Save in-memory messages to old_sid.json if they're newer than disk

Test: test_issue2223_compression_no_rename.py (6 tests, all passing)
This commit is contained in:
RØG3R L!M4
2026-05-14 03:02:44 +00:00
parent 3f4e82b4d1
commit 5bbf18324c
3 changed files with 157 additions and 6 deletions
+2
View File
@@ -4,6 +4,8 @@
### Fixed
- **Compression session rotation preserves history (#2223)** — Context compression no longer destroys the original session file. The previous implementation renamed `old_sid.json``new_sid.json` before saving the compressed session, destroying the only persistent copy of the full conversation history. If the summarisation LLM call also failed, the user was left with zero recoverable messages. The fix: leave `old_sid.json` intact, create `new_sid.json` as a fresh file, and set `parent_session_id` on the continuation session to link the lineage. The old file retains the complete pre-compression transcript as an immutable archive.
- **PR #2217** by @franksong2702 (refs #2215 Fix B) — Drops the leftover `re.MULTILINE` flag from the "the user is asking" pre-amble strip pattern in `api/streaming.py:695`. PR #2213 removed `re.MULTILINE` from the three sibling wrapper-strip patterns (`<think>`, MiniMax, Gemma) but missed this one instance. With `re.MULTILINE`, `^` matched the start of any line in the response, so a mid-response line that legitimately started with "The user is asking us to wait" could be stripped silently. Now the pattern only matches when the entire response leads with that wrapper, consistent with the other strips. One-flag, two-character change + regression test pinning the new behavior.
- **PR #2216** by @franksong2702 (closes #2215 Fix A) — Caps the `_summary_cache` for per-target update summaries with an `OrderedDict`-backed LRU bounded at 16 entries. Pre-fix the cache was an unbounded plain dict introduced in PR #2207; cardinality is small in practice (0-2 active update ranges per server lifetime) so this is defensive future-proofing rather than a leak being hit today. Cache hits call `move_to_end()` to refresh recency; cache writes call `popitem(last=False)` to evict the oldest entry when at capacity. Overwrites of existing keys bypass eviction. Both operations run under the existing `_cache_lock` for thread safety. With Fix A and Fix B both shipped, issue #2215 is closed.
+49 -6
View File
@@ -3567,7 +3567,6 @@ def _run_agent_streaming(
if _agent_sid and _agent_sid != session_id:
old_sid = session_id
new_sid = _agent_sid
# Rename the session file
old_path = SESSION_DIR / f'{old_sid}.json'
new_path = SESSION_DIR / f'{new_sid}.json'
s.session_id = new_sid
@@ -3587,6 +3586,55 @@ def _run_agent_streaming(
"Stamped profile=%r on continuation session %s after compression",
_resolved_profile_name, new_sid,
)
# Preserve the original session file so the full pre-compression
# history survives even when summarisation fails. The previous
# implementation renamed old_sid.json → new_sid.json, which
# destroyed the only persistent copy of the uncompressed history
# before the new (possibly summary-only) session had been saved.
# If the LLM summariser also failed, the user was left with zero
# recoverable messages. (#2223)
# ---
# Archive the old session: write its current state to disk so
# the full conversation history survives even when context
# compression removes messages from the model's context. Skip
# the write when the file already contains up-to-date data
# (i.e. it was just saved by a checkpoint).
if old_path.exists():
try:
existing_text = old_path.read_text(encoding='utf-8')
try:
existing = json.loads(existing_text)
existing_msgs = len(existing.get('messages') or [])
except (json.JSONDecodeError, ValueError):
existing_msgs = -1
if len(s.messages) > existing_msgs:
# In-memory messages are newer than the file — save.
saved_sid = s.session_id
s.session_id = old_sid
try:
# Temporarily restore old sid so save()
# writes to old_path. Also stamp a
# parent_session_id linking back so lineage
# can be traced from the OLD session forward
# to the continuation session.
_old_parent = s.parent_session_id
s.parent_session_id = None
s.save(touch_updated_at=False, skip_index=True)
s.parent_session_id = _old_parent
logger.info(
"Preserved pre-compression session %s (%d messages) to disk",
old_sid, len(s.messages),
)
except Exception:
logger.debug("Failed to preserve pre-compression session file", exc_info=True)
finally:
s.session_id = saved_sid
except OSError:
logger.debug("Could not read old session file before preservation")
# Set parent_session_id on the continuation session so the
# frontend can traverse the lineage chain (old → new).
if not s.parent_session_id:
s.parent_session_id = old_sid
with LOCK:
if old_sid in SESSIONS:
SESSIONS[new_sid] = SESSIONS.pop(old_sid)
@@ -3603,11 +3651,6 @@ def _run_agent_streaming(
_cached_entry = SESSION_AGENT_CACHE.pop(old_sid, None)
if _cached_entry:
SESSION_AGENT_CACHE[new_sid] = _cached_entry
if old_path.exists() and not new_path.exists():
try:
old_path.rename(new_path)
except OSError:
logger.debug("Failed to rename session file during compression")
_compressed = True
# Also detect compression via the result dict or compressor state
if not _compressed:
@@ -0,0 +1,106 @@
"""
Tests for #2223: compression session rotation must not destroy session history.
The previous implementation renamed old_sid.json → new_sid.json during context
compression, destroying the only persistent copy of the uncompressed history
before the new session had been saved. If the summariser also failed, the user
was left with zero recoverable messages.
The fix preserves old_sid.json and creates new_sid.json as a fresh file, setting
parent_session_id to link the lineage.
"""
import json
import pathlib
import textwrap
import threading
import pytest
STREAMING = pathlib.Path(__file__).resolve().parents[1] / "api" / "streaming.py"
streaming_src = STREAMING.read_text(encoding="utf-8")
# ── Structural checks ────────────────────────────────────────────────────────
class TestNoRenameDuringCompression:
"""The destructive old_path.rename(new_path) call must be removed."""
def test_rename_call_removed(self):
"""old_path.rename(new_path) must not appear in the compression rotation block."""
# The old code had: if old_path.exists() and not new_path.exists(): old_path.rename(new_path)
# That line must be gone.
assert "old_path.rename(new_path)" not in streaming_src, (
"old_path.rename(new_path) still present — compression rotation "
"still destroys session history (#2223)"
)
def test_parent_session_id_stamped_on_continuation(self):
"""The continuation session must carry parent_session_id linking to old_sid."""
assert "s.parent_session_id = old_sid" in streaming_src, (
"parent_session_id not stamped on continuation session (#2223)"
)
def test_old_session_preservation_logic_exists(self):
"""There must be logic to preserve the pre-compression session file."""
assert "Preserved pre-compression session" in streaming_src, (
"Pre-compression session preservation logging not found (#2223)"
)
class TestMergePreservesHistory:
"""_merge_display_messages_after_agent_result must preserve all previous
display messages when compression returns only a marker."""
@pytest.fixture
def merge(self):
from api.streaming import _merge_display_messages_after_agent_result
return _merge_display_messages_after_agent_result
def test_marker_only_preserves_all_previous(self, merge):
"""When result is just a compression-failure marker, previous display survives."""
previous_display = [
{"role": "user", "content": f"msg{i}"} for i in range(100)
] + [
{"role": "assistant", "content": f"reply{i}"} for i in range(100)
]
previous_context = list(previous_display)
marker = {
"role": "user",
"content": (
"Summary generation was unavailable. 200 message(s) were removed "
"to free context space but could not be summarized."
),
}
result = [marker, {"role": "user", "content": "continue"}, {"role": "assistant", "content": "ok"}]
merged = merge(previous_display, previous_context, result, "continue")
# All 200 original messages must survive.
assert len(merged) >= 200
for i in range(100):
assert merged[i]["content"] == f"msg{i}"
def test_empty_result_preserves_all_previous(self, merge):
"""If result_messages is empty, previous display is returned unchanged."""
previous_display = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi"},
]
previous_context = list(previous_display)
merged = merge(previous_display, previous_context, [], "test")
assert merged == previous_display
def test_none_result_preserves_all_previous(self, merge):
"""If result_messages is None, previous display is returned unchanged."""
previous_display = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi"},
]
previous_context = list(previous_display)
merged = merge(previous_display, previous_context, None, "test")
assert merged == previous_display