diff --git a/api/routes.py b/api/routes.py index 01671c2a..097de890 100644 --- a/api/routes.py +++ b/api/routes.py @@ -2277,6 +2277,7 @@ try: _pending, _lock, _permanent_approved, + _gateway_queues, resolve_gateway_approval, enable_session_yolo, disable_session_yolo, @@ -2295,6 +2296,7 @@ except ImportError: _pending = {} _lock = threading.Lock() _permanent_approved = set() + _gateway_queues = {} # ── Approval SSE subscribers (long-connection push) ────────────────────────── @@ -8739,6 +8741,7 @@ def _resolve_approval_legacy(sid: str, approval_id: str, choice: str) -> bool: # that omit approval_id still resolve the oldest entry for compatibility. pending = None found_target = False + gateway_keys = [] with _lock: queue = _pending.get(sid) if isinstance(queue, list): @@ -8764,6 +8767,25 @@ def _resolve_approval_legacy(sid: str, approval_id: str, choice: str) -> bool: if not approval_id or queue.get("approval_id") == approval_id: pending = _pending.pop(sid, None) found_target = pending is not None + # When no _pending entry found, peek into _gateway_queues for + # pattern_keys so session-level approval still works. The gateway + # path is the primary mechanism during active streaming; _pending + # is only used for UI polling/SSE notification. + # NOTE: Gateway queue entries don't carry approval_id, so when + # approval_id is given and _pending is empty, we assume the gateway + # entry at the head of the queue corresponds. This is safe because + # gateway entries are consumed synchronously with _pending entries + # under the same lock — there is no interleaving where a stale + # approval_id could match a different gateway entry. + if not pending: + gw_queue = _gateway_queues.get(sid) + if gw_queue and len(gw_queue) > 0: + gw_entry = gw_queue[0] + # _gateway_queues stores _ApprovalEntry objects; their + # .data dict carries command, pattern_key, pattern_keys. + gw_data = getattr(gw_entry, 'data', None) or {} + gateway_keys = gw_data.get("pattern_keys") or [gw_data.get("pattern_key", "")] if gw_data else [] + found_target = True # Notify SSE subscribers of the new head (or empty state) so the UI # surfaces any trailing approvals that were queued behind this one # without waiting for the next submit_pending. Without this, a parallel @@ -8775,16 +8797,17 @@ def _resolve_approval_legacy(sid: str, approval_id: str, choice: str) -> bool: else: _approval_sse_notify_locked(sid, None, 0) - if pending: - keys = pending.get("pattern_keys") or [pending.get("pattern_key", "")] - if choice in ("once", "session"): - for k in keys: - approve_session(sid, k) - elif choice == "always": - for k in keys: - approve_session(sid, k) - approve_permanent(k) - save_permanent_allowlist(_permanent_approved) + # Collect keys from both _pending and _gateway_queues + keys_from_pending = pending.get("pattern_keys") or [pending.get("pattern_key", "")] if pending else [] + all_keys = [k for k in keys_from_pending if k] + [k for k in gateway_keys if k] + if choice in ("once", "session"): + for k in all_keys: + approve_session(sid, k) + elif choice == "always": + for k in all_keys: + approve_session(sid, k) + approve_permanent(k) + save_permanent_allowlist(_permanent_approved) # Unblock the agent thread waiting in the gateway approval queue. # This is the primary signal when streaming is active — the agent # thread is parked in entry.event.wait() and needs to be woken up. diff --git a/tests/test_runtime_adapter_seam.py b/tests/test_runtime_adapter_seam.py index 1b6910c6..f24bac67 100644 --- a/tests/test_runtime_adapter_seam.py +++ b/tests/test_runtime_adapter_seam.py @@ -1,6 +1,8 @@ import importlib import queue +from tests.conftest import requires_agent_modules + def test_runtime_adapter_interface_and_legacy_journal_methods_exist(): runtime = importlib.import_module("api.runtime_adapter") @@ -271,6 +273,78 @@ def test_approval_respond_does_not_fallback_to_oldest_when_explicit_id_is_stale( assert "queue.pop(0)" not in stale_branch +def test_approval_respond_peeks_gateway_queues_when_pending_empty() -> None: + """When _pending has no matching entry but _gateway_queues does, the + helper should extract pattern_keys from the gateway queue and call + approve_session even though pending is None. + """ + routes = importlib.import_module("api.routes") + src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8") + helper_idx = src.index("def _resolve_approval_legacy") + helper_body = src[helper_idx:src.index("def _handle_approval_respond", helper_idx)] + + assert "_gateway_queues" in helper_body, ( + "_resolve_approval_legacy must reference _gateway_queues " + "to read pattern_keys when _pending is empty" + ) + assert "gateway_keys" in helper_body, ( + "Must extract pattern_keys from _gateway_queues into a gateway_keys variable" + ) + assert "approve_session" in helper_body[helper_body.index("all_keys"):], ( + "Must call approve_session for keys extracted from _gateway_queues" + ) + + +@requires_agent_modules +def test_approval_respond_approves_from_gateway_queues_when_pending_empty() -> None: + """Verify _resolve_approval_legacy peeks into _gateway_queues for + pattern_keys when _pending has no matching entry, and calls + approve_session() even though pending is None (the real streaming case). + """ + import threading + from api.routes import _resolve_approval_legacy + + routes = importlib.import_module("api.routes") + approval_mod = importlib.import_module("tools.approval") + + test_sid = "__test_gateway_approval_sid__" + test_key = "__test_pattern_key__" + + # 1. Ensure _pending is empty for this sid + with approval_mod._lock: + approval_mod._pending.pop(test_sid, None) + + # 2. Populate _gateway_queues with a real entry + entry = approval_mod._ApprovalEntry({ + "command": "test_cmd", + "pattern_key": test_key, + "pattern_keys": [test_key], + "description": "test dangerous cmd", + }) + with approval_mod._lock: + approval_mod._gateway_queues.setdefault(test_sid, []).append(entry) + + try: + # 3. Run the helper with empty _pending but populated _gateway_queues + result = _resolve_approval_legacy(test_sid, "", "session") + + # 4. Verify approve_session was called (is_approved must return True) + assert approval_mod.is_approved(test_sid, test_key), ( + "approve_session should have been called for the pattern_key " + "extracted from _gateway_queues" + ) + assert result is True, ( + "_resolve_approval_legacy should return True when it finds " + "and resolves the gateway entry" + ) + finally: + # 5. Cleanup + with approval_mod._lock: + approval_mod._gateway_queues.pop(test_sid, None) + approval_mod._session_approved.pop(test_sid, None) + approval_mod._pending.pop(test_sid, None) + + def test_chat_start_route_selects_adapter_only_when_flag_enabled(): routes = importlib.import_module("api.routes") src = (routes.Path(__file__).parent.parent / "api" / "routes.py").read_text(encoding="utf-8")