diff --git a/api/compression_anchor.py b/api/compression_anchor.py new file mode 100644 index 00000000..98799efc --- /dev/null +++ b/api/compression_anchor.py @@ -0,0 +1,77 @@ +""" +Shared helpers for session compression anchor metadata. +""" + + +def _content_text(content, *, part_types): + if isinstance(content, list): + return "\n".join( + str(part.get("text") or part.get("content") or "") + for part in content + if isinstance(part, dict) and part.get("type") in part_types + ).strip() + return str(content or "").strip() + + +def _content_has_part_type(content, part_types): + if not isinstance(content, list): + return False + return any( + isinstance(part, dict) and part.get("type") in part_types + for part in content + ) + + +def visible_messages_for_anchor(messages, *, auto_compression: bool = False): + """Return transcript messages that can anchor compression UI metadata. + + Manual compression historically only counted plain ``text`` content parts + for non-assistant messages, while the streaming auto-compression path also + accepted provider-style ``input_text`` / ``output_text`` parts and metadata + markers on any non-tool role. Keep that difference explicit at the call site + instead of carrying two near-identical helper implementations. + """ + out = [] + text_part_types = {"text", "input_text", "output_text"} if auto_compression else {"text"} + for message in messages or []: + if not isinstance(message, dict): + continue + role = message.get("role") + if not role or role == "tool": + continue + + content = message.get("content", "") + has_attachments = bool(message.get("attachments")) + text = _content_text(content, part_types=text_part_types) + + if auto_compression: + has_tool_calls = bool( + isinstance(message.get("tool_calls"), list) and message.get("tool_calls") + ) + has_tool_use = _content_has_part_type(content, {"tool_use"}) + has_reasoning = bool(message.get("reasoning")) + if not text: + has_reasoning = has_reasoning or _content_has_part_type( + content, + {"thinking", "reasoning"}, + ) + if text or has_attachments or has_tool_calls or has_tool_use or has_reasoning: + out.append(message) + continue + + if role == "assistant": + has_tool_calls = bool( + isinstance(message.get("tool_calls"), list) and message.get("tool_calls") + ) + has_tool_use = _content_has_part_type(content, {"tool_use"}) + has_reasoning = bool(message.get("reasoning")) or _content_has_part_type( + content, + {"thinking", "reasoning"}, + ) + if text or has_attachments or has_tool_calls or has_tool_use or has_reasoning: + out.append(message) + continue + + if text or has_attachments: + out.append(message) + return out diff --git a/api/routes.py b/api/routes.py index 59f2f326..9c08268b 100644 --- a/api/routes.py +++ b/api/routes.py @@ -28,6 +28,7 @@ from api.agent_sessions import ( is_cli_session_row_visible, read_session_lineage_report, ) +from api.compression_anchor import visible_messages_for_anchor logger = logging.getLogger(__name__) @@ -7563,51 +7564,6 @@ def _handle_clarify_respond(handler, body): def _handle_session_compress(handler, body): - def _visible_messages_for_anchor(messages): - out = [] - for m in messages or []: - if not isinstance(m, dict): - continue - role = m.get("role") - if not role or role == "tool": - continue - content = m.get("content", "") - has_attachments = bool(m.get("attachments")) - if role == "assistant": - tool_calls = m.get("tool_calls") - has_tool_calls = isinstance(tool_calls, list) and len(tool_calls) > 0 - has_tool_use = False - has_reasoning = bool(m.get("reasoning")) - if isinstance(content, list): - for p in content: - if not isinstance(p, dict): - continue - if p.get("type") == "tool_use": - has_tool_use = True - if p.get("type") in {"thinking", "reasoning"}: - has_reasoning = True - text = "\n".join( - str(p.get("text") or p.get("content") or "") - for p in content - if isinstance(p, dict) and p.get("type") == "text" - ).strip() - else: - text = str(content or "").strip() - if text or has_attachments or has_tool_calls or has_tool_use or has_reasoning: - out.append(m) - continue - if isinstance(content, list): - text = "\n".join( - str(p.get("text") or p.get("content") or "") - for p in content - if isinstance(p, dict) and p.get("type") == "text" - ).strip() - else: - text = str(content or "").strip() - if text or has_attachments: - out.append(m) - return out - def _anchor_message_key(m): if not isinstance(m, dict): return None @@ -7846,7 +7802,7 @@ def _handle_session_compress(handler, body): s.pending_user_message = None s.pending_attachments = [] s.pending_started_at = None - visible_after = _visible_messages_for_anchor(compressed) + visible_after = visible_messages_for_anchor(compressed, auto_compression=False) s.compression_anchor_visible_idx = max(0, len(visible_after) - 1) if visible_after else None s.compression_anchor_message_key = _anchor_message_key(visible_after[-1]) if visible_after else None summary_text = None diff --git a/api/streaming.py b/api/streaming.py index ece26fa4..8d5865d8 100644 --- a/api/streaming.py +++ b/api/streaming.py @@ -33,6 +33,7 @@ from api.config import ( model_with_provider_context, ) from api.helpers import redact_session_data, _redact_text +from api.compression_anchor import visible_messages_for_anchor from api.metering import meter # Global lock for os.environ writes. Per-session locks (_agent_lock) prevent @@ -1606,44 +1607,6 @@ def _compression_anchor_message_key(message): return {'role': role, 'ts': ts, 'text': text, 'attachments': attach_count} -def _visible_messages_for_compression_anchor(messages): - out = [] - for m in messages or []: - if not isinstance(m, dict): - continue - role = m.get('role') - if not role or role == 'tool': - continue - content = m.get('content', '') - has_attachments = bool(m.get('attachments')) - has_tool_calls = bool(isinstance(m.get('tool_calls'), list) and m.get('tool_calls')) - has_tool_use = False - has_reasoning = bool(m.get('reasoning')) - if isinstance(content, list): - text = '\n'.join( - str(p.get('text') or p.get('content') or '') - for p in content - if isinstance(p, dict) - and p.get('type') in {'text', 'input_text', 'output_text'} - ).strip() - for part in content: - if not isinstance(part, dict): - continue - if part.get('type') == 'tool_use': - has_tool_use = True - if not text: - has_reasoning = has_reasoning or any( - isinstance(part, dict) - and part.get('type') in {'thinking', 'reasoning'} - for part in content - ) - else: - text = str(content or '').strip() - if text or has_attachments or has_tool_calls or has_tool_use or has_reasoning: - out.append(m) - return out - - def _compression_summary_from_messages(messages): for m in reversed(messages or []): if not isinstance(m, dict): @@ -3360,7 +3323,7 @@ def _run_agent_streaming( _compressed = True # Notify the frontend that compression happened if _compressed: - visible_after = _visible_messages_for_compression_anchor(s.messages) + visible_after = visible_messages_for_anchor(s.messages, auto_compression=True) s.compression_anchor_visible_idx = ( max(0, len(visible_after) - 1) if visible_after else None ) diff --git a/tests/test_issue2028_compression_anchor_helpers.py b/tests/test_issue2028_compression_anchor_helpers.py new file mode 100644 index 00000000..1fcb4f6a --- /dev/null +++ b/tests/test_issue2028_compression_anchor_helpers.py @@ -0,0 +1,59 @@ +""" +Regression coverage for shared compression-anchor visibility helpers (#2028). +""" + +from pathlib import Path + +from api.compression_anchor import visible_messages_for_anchor + + +def test_legacy_duplicate_anchor_helpers_are_removed(): + routes_src = Path("api/routes.py").read_text(encoding="utf-8") + streaming_src = Path("api/streaming.py").read_text(encoding="utf-8") + + assert "def _visible_messages_for_anchor" not in routes_src + assert "def _visible_messages_for_compression_anchor" not in streaming_src + assert "visible_messages_for_anchor(compressed, auto_compression=False)" in routes_src + assert "visible_messages_for_anchor(s.messages, auto_compression=True)" in streaming_src + + +def test_visible_messages_for_anchor_preserves_manual_text_part_filter(): + text_only = {"role": "assistant", "content": [{"type": "text", "text": "Visible"}]} + input_only = {"role": "assistant", "content": [{"type": "input_text", "text": "Model input"}]} + reasoning_only = {"role": "assistant", "content": [{"type": "thinking", "text": "hidden"}]} + tool_use_only = {"role": "assistant", "content": [{"type": "tool_use", "id": "call_1"}]} + tool_message = {"role": "tool", "content": "tool output"} + + assert visible_messages_for_anchor( + [text_only, input_only, reasoning_only, tool_use_only, tool_message], + auto_compression=False, + ) == [text_only, reasoning_only, tool_use_only] + + +def test_visible_messages_for_anchor_preserves_auto_compression_text_part_filter(): + text_only = {"role": "assistant", "content": [{"type": "text", "text": "Visible"}]} + input_only = {"role": "assistant", "content": [{"type": "input_text", "text": "Model input"}]} + output_only = {"role": "assistant", "content": [{"type": "output_text", "text": "Model output"}]} + reasoning_only = {"role": "assistant", "content": [{"type": "reasoning", "text": "hidden"}]} + tool_message = {"role": "tool", "content": "tool output"} + + assert visible_messages_for_anchor( + [text_only, input_only, output_only, reasoning_only, tool_message], + auto_compression=True, + ) == [text_only, input_only, output_only, reasoning_only] + + +def test_visible_messages_for_anchor_keeps_manual_user_messages_simple(): + user_tool_metadata = {"role": "user", "content": [], "tool_calls": [{"id": "call_1"}]} + user_attachment = {"role": "user", "content": [], "attachments": [{"name": "screenshot.png"}]} + assistant_tool_metadata = {"role": "assistant", "content": [], "tool_calls": [{"id": "call_2"}]} + + assert visible_messages_for_anchor( + [user_tool_metadata, user_attachment, assistant_tool_metadata], + auto_compression=False, + ) == [user_attachment, assistant_tool_metadata] + + assert visible_messages_for_anchor( + [user_tool_metadata, user_attachment, assistant_tool_metadata], + auto_compression=True, + ) == [user_tool_metadata, user_attachment, assistant_tool_metadata]