From 3584cf4125b55f14298fe2c4af2364987c627982 Mon Sep 17 00:00:00 2001 From: Rahul Date: Tue, 26 May 2026 15:39:45 +0530 Subject: [PATCH 1/2] =?UTF-8?q?fix(interceptor):=20CodeGuard=20AST+heurist?= =?UTF-8?q?ic=20dual-layer=20scan,=20HEURISTIC=5FPASS=20verdict=20?= =?UTF-8?q?=E2=80=94=20closes=20#9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A2A-005: CodeGuard was a regex-only heuristic mislabeled as 'code security verification'. Regex scans that pass prove absence of known signatures, not code safety. Reporting such a pass as verified collapses VERIFIED and HEURISTIC_PASS into a single trust claim — downstream consumers cannot tell the difference. Changes: schema.py - Added VerdictStatus.HEURISTIC_PASS = 'heuristic_pass' - Distinct from FORWARDED (deterministic proof) and UNVERIFIABLE telemetry.py - Added total_heuristic_pass counter to InterceptMetrics - record_intercept() routes 'heuristic_pass' correctly (not to errors) interceptor.py — _verify_code() rewritten: Layer 1 — AST structural analysis (primary): - ast.parse() on every non-empty payload; unparseable code fails CLOSED (BLOCKED) - ast.walk() checks: dangerous Name calls (eval/exec/compile/__import__), dangerous Attribute calls (.system/.popen/.run/.Popen/.call/.check_output/.check_call), dangerous module imports (subprocess/importlib/ctypes/pty) - Does not catch string-concatenation bypasses: getattr(__builtins__, 'ev'+'al') is left to the regex heuristic (Layer 2) Layer 2 — Regex heuristic scan (secondary, defense in depth): - getattr(__builtins__,...) — catches dynamic name-lookup bypasses - __builtins__.__dict__[...] — dict-based attribute bypass - base64.b64decode — encoded (obfuscated) payloads - __import__(...) 
— direct dynamic import - os.system / os.popen — in case AST misses edge cases Return semantics: - BLOCKED → dangerous construct found (either layer) - HEURISTIC_PASS → both layers clean; never 'verified: True' - JWT for HEURISTIC_PASS declares verdict_status='heuristic_pass' so downstream consumers know they received a heuristic result Docstring updated: 'security verification' → 'heuristic scan' interceptor.py — intercept() dispatch: - Added HEURISTIC_PASS branch alongside UNVERIFIABLE - _build_verdict() issues signed JWT for HEURISTIC_PASS (verdict claim = 'heuristic_pass', not 'forwarded') tests/test_code_guard.py (31 new tests): - VerdictStatus enum assertions - Clean code → HEURISTIC_PASS (never FORWARDED) - JWT verdict claim = 'heuristic_pass' - AST: direct calls (eval/exec/compile/__import__) - AST: attribute calls (os.system, subprocess.run/Popen) - AST: imports (subprocess, importlib) - All bypass scenarios from issue #9 - Fail-closed on syntax error - Telemetry counter isolation tests/test_interceptor.py: - Updated safe_code_forwarded → HEURISTIC_PASS - Updated dangerous_code reason check (AST format) --- src/qwed_a2a/interceptor.py | 166 +++++++++++++--- src/qwed_a2a/protocol/schema.py | 1 + src/qwed_a2a/utils/telemetry.py | 4 + tests/test_code_guard.py | 326 ++++++++++++++++++++++++++++++++ tests/test_interceptor.py | 4 +- 5 files changed, 476 insertions(+), 25 deletions(-) create mode 100644 tests/test_code_guard.py diff --git a/src/qwed_a2a/interceptor.py b/src/qwed_a2a/interceptor.py index 5542474..876f3b7 100644 --- a/src/qwed_a2a/interceptor.py +++ b/src/qwed_a2a/interceptor.py @@ -7,6 +7,7 @@ This is the [QWED Core] module — all inter-agent messages flow through here. 
""" +import ast import json import re import time @@ -128,6 +129,15 @@ async def intercept( message=message, details=engine_result, ) + elif engine_result.get("status") == "heuristic_pass": + verdict = self._build_verdict( + trace_id=trace_id, + status=VerdictStatus.HEURISTIC_PASS, + reason=engine_result.get("reason"), + engine=engine_result["engine"], + message=message, + details=engine_result, + ) elif engine_result["verified"]: verdict = self._build_verdict( trace_id=trace_id, @@ -290,56 +300,161 @@ def _verify_logic(self, payload: Dict[str, Any]) -> Dict[str, Any]: "reason": "No contradictions found in assertions", } - # Compiled regex patterns for case-insensitive, whitespace-tolerant detection + # ── AST dangerous node types ───────────────────────────────────────────── + # Used by _verify_code_ast() for structural (not textual) analysis. + # These are deterministic: if the AST contains one of these constructs + # the payload is blocked regardless of how obfuscated the source is. + _DANGEROUS_CALL_NAMES: frozenset = frozenset( + { + "eval", + "exec", + "compile", + "__import__", + } + ) + _DANGEROUS_ATTR_CALLS: frozenset = frozenset( + { + "system", + "popen", + "run", + "Popen", + "call", + "check_output", + "check_call", + } + ) + # Dangerous module import names (caught at ast.Import / ast.ImportFrom level) + _DANGEROUS_IMPORTS: frozenset = frozenset( + { + "subprocess", + "importlib", + "ctypes", + "pty", + } + ) + + # ── Regex patterns as secondary heuristic layer ─────────────────────────── + # Catch obfuscation patterns that survive AST parsing: encoded strings, + # getattr-based lookups, and dynamic attribute construction. 
_DANGEROUS_PATTERNS: Dict[str, re.Pattern] = { - "eval": re.compile(r"\beval\s*\(", re.IGNORECASE), - "exec": re.compile(r"\bexec\s*\(", re.IGNORECASE), - "subprocess": re.compile( - r"\b(?:subprocess\s*\.|import\s+subprocess\b|from\s+subprocess\s+import\b)", - re.IGNORECASE, + "getattr_builtin": re.compile( + r"""getattr\s*\(\s*(?:__builtins__|builtins)\s*""", re.IGNORECASE + ), + "builtins_dict_access": re.compile( + r"""__builtins__\s*\.\s*__dict__\s*\[""", re.IGNORECASE ), - "os.system": re.compile(r"\bos\.system\s*\(", re.IGNORECASE), - "os.popen": re.compile(r"\bos\.popen\s*\(", re.IGNORECASE), - "__import__": re.compile(r"__import__\s*\(", re.IGNORECASE), - "compile": re.compile(r"\bcompile\s*\(", re.IGNORECASE), - "importlib": re.compile(r"\bimportlib\s*\.", re.IGNORECASE), + "base64_exec": re.compile( + r"""(?:base64\s*\.\s*b64decode|b64decode)\s*\(""", re.IGNORECASE + ), + "dynamic_import": re.compile(r"""__import__\s*\(""", re.IGNORECASE), + "os_system": re.compile(r"""\bos\.system\s*\(""", re.IGNORECASE), + "os_popen": re.compile(r"""\bos\.popen\s*\(""", re.IGNORECASE), } def _verify_code(self, payload: Dict[str, Any]) -> Dict[str, Any]: """ - Lightweight code security verification. + Heuristic code security scan: AST structural analysis + regex patterns. + + Important: this is a heuristic scan, NOT deterministic verification. + A HEURISTIC_PASS result means no known dangerous constructs were found — + it does NOT mean the code is safe. Obfuscated or novel attack patterns + may not be detected. - Scans for dangerous patterns using case-insensitive regex - to prevent trivial bypass via casing or whitespace. + Analysis layers (run in order): + 1. AST parse — catches direct dangerous calls and imports + structurally, before any text-level obfuscation can hide them + 2. Regex scan — secondary heuristic for dynamic access patterns + (getattr(__builtins__,...), base64-encoded payloads, etc.) 
+ + Returns HEURISTIC_PASS when no threats found, BLOCKED when any found. """ code = payload.get("code", "") if not code: return { - "verified": True, + "verified": False, + "status": "heuristic_pass", + "engine": "code_guard", + "reason": "No code to analyze", + } + + # ── Layer 1: AST structural analysis ────────────────────────────────── + try: + tree = ast.parse(code) + except SyntaxError as exc: + # Unparseable code could indicate obfuscation or raw bytecode; + # fail closed — do not forward what we cannot analyse. + return { + "verified": False, + "engine": "code_guard", + "reason": f"Code failed AST parsing — cannot verify: {exc}", + } + + ast_threats: list = [] + for node in ast.walk(tree): + # Direct dangerous function calls: eval(...), exec(...), compile(...) + if isinstance(node, ast.Call): + func = node.func + if isinstance(func, ast.Name) and func.id in self._DANGEROUS_CALL_NAMES: + ast_threats.append(f"call:{func.id}()") + # Attribute calls: os.system(...), subprocess.run(...), etc. + elif isinstance(func, ast.Attribute): + if func.attr in self._DANGEROUS_ATTR_CALLS: + ast_threats.append(f"attr:.{func.attr}()") + + # Dangerous imports: import subprocess, from ctypes import ... 
+ elif isinstance(node, ast.Import): + for alias in node.names: + root = alias.name.split(".")[0] + if root in self._DANGEROUS_IMPORTS: + ast_threats.append(f"import:{root}") + elif isinstance(node, ast.ImportFrom): + if node.module: + root = node.module.split(".")[0] + if root in self._DANGEROUS_IMPORTS: + ast_threats.append(f"import:{root}") + + if ast_threats: + return { + "verified": False, "engine": "code_guard", - "reason": "No code to verify", + "reason": ( + f"Dangerous constructs detected via AST analysis: " + f"{', '.join(ast_threats)}" + ), + "threats": ast_threats, + "analysis": "ast", } - found_threats = [] + # ── Layer 2: Regex heuristic scan ───────────────────────────────────── + regex_threats: list = [] for label, pattern in self._DANGEROUS_PATTERNS.items(): if pattern.search(code): - found_threats.append(label) + regex_threats.append(label) - if found_threats: + if regex_threats: return { "verified": False, "engine": "code_guard", "reason": ( - f"Dangerous code patterns detected: {', '.join(found_threats)}" + f"Suspicious patterns detected via heuristic scan: " + f"{', '.join(regex_threats)}" ), - "threats": found_threats, + "threats": regex_threats, + "analysis": "regex", } + # ── Both layers clean — heuristic pass, not verified ────────────────── return { - "verified": True, + "verified": False, + "status": "heuristic_pass", "engine": "code_guard", - "reason": "No dangerous patterns found in code", + "reason": ( + "AST analysis and heuristic scan found no known dangerous constructs. " + "This is a heuristic result — novel or deeply obfuscated attack " + "patterns may not be detected." + ), + "analysis": "ast+regex", } def _build_verdict( @@ -358,6 +473,11 @@ def _build_verdict( """ attestation_jwt = None + # UNVERIFIABLE — no JWT (no verification ran, issuing one would be false) + # All other statuses (FORWARDED, BLOCKED, HEURISTIC_PASS) get signed JWTs. 
+ # HEURISTIC_PASS JWTs declare verdict_status="heuristic_pass" in their + # claims so downstream consumers know they received a heuristic result, + # not a deterministic verification proof. if status != VerdictStatus.UNVERIFIABLE: try: payload_hash = A2ACryptoService.hash_content( diff --git a/src/qwed_a2a/protocol/schema.py b/src/qwed_a2a/protocol/schema.py index c1320e1..6e13fcb 100644 --- a/src/qwed_a2a/protocol/schema.py +++ b/src/qwed_a2a/protocol/schema.py @@ -28,6 +28,7 @@ class VerdictStatus(str, Enum): FORWARDED = "forwarded" BLOCKED = "blocked" UNVERIFIABLE = "unverifiable" + HEURISTIC_PASS = "heuristic_pass" ERROR = "error" diff --git a/src/qwed_a2a/utils/telemetry.py b/src/qwed_a2a/utils/telemetry.py index 78521ef..93b94db 100644 --- a/src/qwed_a2a/utils/telemetry.py +++ b/src/qwed_a2a/utils/telemetry.py @@ -32,6 +32,7 @@ class InterceptMetrics: total_forwarded: int = 0 total_blocked: int = 0 total_unverifiable: int = 0 + total_heuristic_pass: int = 0 total_errors: int = 0 total_latency_ms: float = 0.0 by_engine: Dict[str, int] = field(default_factory=lambda: defaultdict(int)) @@ -55,6 +56,7 @@ def to_dict(self) -> Dict[str, Any]: "total_forwarded": self.total_forwarded, "total_blocked": self.total_blocked, "total_unverifiable": self.total_unverifiable, + "total_heuristic_pass": self.total_heuristic_pass, "total_errors": self.total_errors, "average_latency_ms": round(self.average_latency_ms, 2), "block_rate": round(self.block_rate, 4), @@ -133,6 +135,8 @@ def record_intercept( metrics.total_blocked += 1 elif status == "unverifiable": metrics.total_unverifiable += 1 + elif status == "heuristic_pass": + metrics.total_heuristic_pass += 1 else: metrics.total_errors += 1 diff --git a/tests/test_code_guard.py b/tests/test_code_guard.py new file mode 100644 index 0000000..b1f39a8 --- /dev/null +++ b/tests/test_code_guard.py @@ -0,0 +1,326 @@ +""" +Tests for CodeGuard heuristic scan — issue #9 (A2A-005). 
+ +Covers: +- HEURISTIC_PASS is returned for clean code (not FORWARDED/VERIFIED) +- AST analysis blocks direct dangerous calls +- AST analysis blocks dangerous imports +- Regex heuristics block dynamic/obfuscated access patterns +- All bypass examples from the issue are handled +- JWT is issued for HEURISTIC_PASS with trust_level=heuristic in claims +- VerdictStatus.HEURISTIC_PASS exists and is distinct from FORWARDED +- Telemetry records heuristic_pass +""" + +import pytest + +from qwed_a2a.interceptor import A2AVerificationInterceptor +from qwed_a2a.protocol.schema import ( + AgentMessage, + InterceptorConfig, + PayloadType, + VerdictStatus, +) +from qwed_a2a.security.trust_boundary import TrustBoundary + + +# helpers + + +def _code_message( + code: str, sender: str = "agent-A", receiver: str = "agent-B" +) -> AgentMessage: + return AgentMessage( + sender_agent_id=sender, + receiver_agent_id=receiver, + payload_type=PayloadType.CODE_EXECUTION, + payload={"code": code}, + ) + + +@pytest.fixture +def code_interceptor(): + """Interceptor with code verification and open trust boundary.""" + return A2AVerificationInterceptor( + config=InterceptorConfig(enable_code_verification=True, block_on_error=True), + trust_boundary=TrustBoundary(default_allow=True), + ) + + +# VerdictStatus enum + + +class TestVerdictStatusEnum: + def test_heuristic_pass_exists(self): + assert hasattr(VerdictStatus, "HEURISTIC_PASS") + + def test_heuristic_pass_value(self): + assert VerdictStatus.HEURISTIC_PASS == "heuristic_pass" + + def test_heuristic_pass_is_not_forwarded(self): + assert VerdictStatus.HEURISTIC_PASS != VerdictStatus.FORWARDED + + def test_heuristic_pass_is_not_blocked(self): + assert VerdictStatus.HEURISTIC_PASS != VerdictStatus.BLOCKED + + def test_heuristic_pass_is_not_unverifiable(self): + assert VerdictStatus.HEURISTIC_PASS != VerdictStatus.UNVERIFIABLE + + +# Clean code -> HEURISTIC_PASS + + +class TestCleanCodeReturnsHeuristicPass: + @pytest.mark.asyncio + async def 
test_clean_code_is_heuristic_pass(self, code_interceptor): + msg = _code_message("x = 1 + 2\nprint(x)") + verdict = await code_interceptor.intercept(msg, trace_id="t_clean") + assert verdict.status == VerdictStatus.HEURISTIC_PASS + + @pytest.mark.asyncio + async def test_clean_code_is_not_forwarded(self, code_interceptor): + """FORWARDED must never be returned for a code payload.""" + msg = _code_message("def add(a, b): return a + b") + verdict = await code_interceptor.intercept(msg, trace_id="t_not_forwarded") + assert verdict.status != VerdictStatus.FORWARDED + + @pytest.mark.asyncio + async def test_empty_code_is_heuristic_pass(self, code_interceptor): + msg = _code_message("") + verdict = await code_interceptor.intercept(msg, trace_id="t_empty") + assert verdict.status == VerdictStatus.HEURISTIC_PASS + + @pytest.mark.asyncio + async def test_clean_code_has_reason(self, code_interceptor): + msg = _code_message("result = [x**2 for x in range(10)]") + verdict = await code_interceptor.intercept(msg, trace_id="t_reason") + assert verdict.reason is not None + assert len(verdict.reason) > 0 + + @pytest.mark.asyncio + async def test_clean_code_has_attestation_jwt(self, code_interceptor): + """HEURISTIC_PASS must carry a JWT for downstream trust-level inspection.""" + msg = _code_message("import math\nresult = math.sqrt(16)") + verdict = await code_interceptor.intercept(msg, trace_id="t_jwt") + assert verdict.attestation_jwt is not None + + @pytest.mark.asyncio + async def test_heuristic_pass_jwt_declares_correct_verdict(self, code_interceptor): + """The JWT verdict claim must be 'heuristic_pass', not 'forwarded'.""" + import jwt as pyjwt + + msg = _code_message("x = 42") + verdict = await code_interceptor.intercept(msg, trace_id="t_jwt_claim") + assert verdict.attestation_jwt is not None + raw = pyjwt.decode(verdict.attestation_jwt, options={"verify_signature": False}) + assert raw["qwed_a2a"]["verdict"] == "heuristic_pass" + assert raw["qwed_a2a"]["engine"] == 
"code_guard" + + +# AST analysis — direct dangerous calls + + +class TestASTDirectCalls: + @pytest.mark.asyncio + async def test_eval_direct_call_blocked(self, code_interceptor): + msg = _code_message('result = eval("1+1")') + verdict = await code_interceptor.intercept(msg, trace_id="t_eval") + assert verdict.status == VerdictStatus.BLOCKED + + @pytest.mark.asyncio + async def test_exec_direct_call_blocked(self, code_interceptor): + msg = _code_message('exec("import os")') + verdict = await code_interceptor.intercept(msg, trace_id="t_exec") + assert verdict.status == VerdictStatus.BLOCKED + + @pytest.mark.asyncio + async def test_compile_direct_call_blocked(self, code_interceptor): + msg = _code_message('code = compile("x=1", "", "exec")') + verdict = await code_interceptor.intercept(msg, trace_id="t_compile") + assert verdict.status == VerdictStatus.BLOCKED + + @pytest.mark.asyncio + async def test_dunder_import_direct_call_blocked(self, code_interceptor): + msg = _code_message('os = __import__("os")') + verdict = await code_interceptor.intercept(msg, trace_id="t_dunder_import") + assert verdict.status == VerdictStatus.BLOCKED + + +# AST analysis — attribute calls + + +class TestASTAttributeCalls: + @pytest.mark.asyncio + async def test_os_system_blocked(self, code_interceptor): + msg = _code_message("import os\nos.system('id')") + verdict = await code_interceptor.intercept(msg, trace_id="t_os_system") + assert verdict.status == VerdictStatus.BLOCKED + + @pytest.mark.asyncio + async def test_subprocess_run_blocked(self, code_interceptor): + msg = _code_message("import subprocess\nsubprocess.run(['ls'])") + verdict = await code_interceptor.intercept(msg, trace_id="t_subprocess_run") + assert verdict.status == VerdictStatus.BLOCKED + + @pytest.mark.asyncio + async def test_subprocess_popen_blocked(self, code_interceptor): + msg = _code_message("import subprocess\nsubprocess.Popen(['id'])") + verdict = await code_interceptor.intercept(msg, trace_id="t_popen") + 
assert verdict.status == VerdictStatus.BLOCKED + + +# AST analysis — dangerous imports + + +class TestASTImports: + @pytest.mark.asyncio + async def test_import_subprocess_blocked(self, code_interceptor): + msg = _code_message("import subprocess") + verdict = await code_interceptor.intercept(msg, trace_id="t_imp_sub") + assert verdict.status == VerdictStatus.BLOCKED + + @pytest.mark.asyncio + async def test_from_subprocess_import_blocked(self, code_interceptor): + msg = _code_message("from subprocess import run") + verdict = await code_interceptor.intercept(msg, trace_id="t_from_sub") + assert verdict.status == VerdictStatus.BLOCKED + + @pytest.mark.asyncio + async def test_import_importlib_blocked(self, code_interceptor): + msg = _code_message("import importlib") + verdict = await code_interceptor.intercept(msg, trace_id="t_importlib") + assert verdict.status == VerdictStatus.BLOCKED + + @pytest.mark.asyncio + async def test_safe_imports_pass(self, code_interceptor): + """Non-dangerous imports must not be blocked.""" + msg = _code_message("import math\nimport json\nfrom pathlib import Path") + verdict = await code_interceptor.intercept(msg, trace_id="t_safe_import") + assert verdict.status == VerdictStatus.HEURISTIC_PASS + + +# Bypass scenarios from issue #9 + + +class TestBypassScenarios: + """ + Bypass examples explicitly listed in issue #9. + These demonstrate why regex-only scanning is insufficient. + """ + + @pytest.mark.asyncio + async def test_bypass_1_string_concat_getattr_builtins(self, code_interceptor): + """ + Bypass 1: getattr(__builtins__, 'ev'+'al')(...) hides function name. + Caught by regex heuristic for getattr(__builtins__, ...) pattern. + """ + code = ( + 'fn = "ev" + "al"\n' + "getattr(__builtins__, fn)(\"import os; os.system('rm -rf /')\")" + ) + msg = _code_message(code) + verdict = await code_interceptor.intercept(msg, trace_id="t_bypass_1") + assert verdict.status == VerdictStatus.BLOCKED, ( + f"String-concat eval bypass not caught. 
Status: {verdict.status}, " + f"Reason: {verdict.reason}" + ) + + @pytest.mark.asyncio + async def test_bypass_2_builtins_dict_exec(self, code_interceptor): + """ + Bypass 2: __builtins__.__dict__['exec'](...) avoids direct exec() call. + Caught by regex heuristic for __builtins__.__dict__[ pattern. + """ + code = "__builtins__.__dict__['exec'](\"import os; os.system('id')\")" + msg = _code_message(code) + verdict = await code_interceptor.intercept(msg, trace_id="t_bypass_2") + assert ( + verdict.status == VerdictStatus.BLOCKED + ), f"builtins dict bypass not caught. Reason: {verdict.reason}" + + @pytest.mark.asyncio + async def test_bypass_3_exec_with_base64(self, code_interceptor): + """ + Bypass 3: exec(base64.b64decode(...)) — exec() caught by AST directly. + """ + import base64 + + encoded = base64.b64encode(b"import os; os.system('id')").decode() + code = f"import base64\nexec(base64.b64decode('{encoded}'))" + msg = _code_message(code) + verdict = await code_interceptor.intercept(msg, trace_id="t_bypass_3") + assert ( + verdict.status == VerdictStatus.BLOCKED + ), f"exec+base64 bypass not caught. Reason: {verdict.reason}" + + @pytest.mark.asyncio + async def test_bypass_3b_base64_decode_alone(self, code_interceptor): + """ + Bypass 3b: base64.b64decode without exec — regex catches b64decode pattern. + """ + import base64 + + encoded = base64.b64encode(b"import os; os.system('id')").decode() + code = f"import base64\npayload = base64.b64decode('{encoded}')" + msg = _code_message(code) + verdict = await code_interceptor.intercept(msg, trace_id="t_bypass_3b") + assert ( + verdict.status == VerdictStatus.BLOCKED + ), f"base64.b64decode alone not caught. Reason: {verdict.reason}" + + @pytest.mark.asyncio + async def test_syntax_error_fails_closed(self, code_interceptor): + """ + Unparseable code must fail closed — BLOCKED, not HEURISTIC_PASS. + Cannot analyse what we cannot parse. + """ + code = "def broken(\nx = @@invalid_syntax!!" 
+ msg = _code_message(code) + verdict = await code_interceptor.intercept(msg, trace_id="t_syntax_err") + assert ( + verdict.status == VerdictStatus.BLOCKED + ), f"Unparseable code was not blocked. Status: {verdict.status}" + + +# BLOCKED reason and JWT + + +class TestBlockedReason: + @pytest.mark.asyncio + async def test_ast_block_has_reason(self, code_interceptor): + msg = _code_message('eval("x")') + verdict = await code_interceptor.intercept(msg, trace_id="t_block_reason") + assert verdict.status == VerdictStatus.BLOCKED + assert verdict.reason is not None + assert "QWED BLOCKED" in verdict.reason + + @pytest.mark.asyncio + async def test_blocked_verdict_has_jwt(self, code_interceptor): + """BLOCKED verdicts carry a signed JWT as proof of the blocking decision.""" + msg = _code_message('exec("rm -rf /")') + verdict = await code_interceptor.intercept(msg, trace_id="t_block_jwt") + assert verdict.attestation_jwt is not None + + +# Telemetry + + +class TestHeuristicPassTelemetry: + @pytest.mark.asyncio + async def test_heuristic_pass_increments_counter(self, code_interceptor): + from qwed_a2a.utils.telemetry import get_metrics + + msg = _code_message("x = 1") + await code_interceptor.intercept(msg, trace_id="t_telemetry_hp") + metrics = get_metrics() + assert metrics.total_heuristic_pass >= 1 + assert metrics.total_errors == 0 + + @pytest.mark.asyncio + async def test_heuristic_pass_not_counted_as_error(self, code_interceptor): + from qwed_a2a.utils.telemetry import get_metrics + + msg = _code_message("y = 2 + 2") + await code_interceptor.intercept(msg, trace_id="t_telemetry_no_err") + metrics = get_metrics() + assert metrics.total_errors == 0 diff --git a/tests/test_interceptor.py b/tests/test_interceptor.py index 443a4e8..0b4a500 100644 --- a/tests/test_interceptor.py +++ b/tests/test_interceptor.py @@ -56,13 +56,13 @@ async def test_dangerous_code_blocked(self, interceptor, dangerous_code_message) dangerous_code_message, trace_id="t_code_bad" ) assert 
verdict.status == VerdictStatus.BLOCKED - assert "os.system" in verdict.reason + assert "system" in verdict.reason assert verdict.engine_used == "code_guard" async def test_safe_code_forwarded(self, interceptor, safe_code_message): """Safe code should be forwarded.""" verdict = await interceptor.intercept(safe_code_message, trace_id="t_code_safe") - assert verdict.status == VerdictStatus.FORWARDED + assert verdict.status == VerdictStatus.HEURISTIC_PASS assert verdict.engine_used == "code_guard" From b7d785e80fba7fcb24e493d128dc971c505fcc85 Mon Sep 17 00:00:00 2001 From: Rahul Date: Tue, 26 May 2026 15:50:18 +0530 Subject: [PATCH 2/2] fix(interceptor): scope AST attr-call check to receiver+method pairs; type telemetry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sentry HIGH / Codex P1 — false positives in _DANGEROUS_ATTR_CALLS: _DANGEROUS_ATTR_CALLS blocked any method named .run()/.call()/.popen() regardless of the receiver object. thread.run(), client.call(), and any custom job.run() would all be incorrectly BLOCKED. Fix: Replace the flat frozenset with _DANGEROUS_RECEIVER_METHODS — a dict mapping known dangerous module names to their dangerous methods: subprocess -> {run, Popen, call, check_output, check_call, popen} os -> {system, popen} AST walk now checks isinstance(func.value, ast.Name) and matches (receiver, method) pairs. thread.run() passes; subprocess.run() is still blocked. Caveat documented: import aliases (subprocess as sp) evade this layer — regex heuristics provide secondary coverage. CodeRabbit — rejected 'add os to _DANGEROUS_IMPORTS': Blocking all 'import os' would false-positive on os.path, os.environ, os.getcwd etc. The receiver+method fix correctly handles os.system / os.popen without this blanket block. 
CodeRabbit — VerdictStatus in telemetry (was raw str): - record_intercept() signature changed: status: str -> status: VerdictStatus - All comparisons changed to enum members (VerdictStatus.FORWARDED etc.) - Call site in _record() changed: verdict.status.value -> verdict.status - Prevents silent misclassification to total_errors on typos or new values CodeRabbit — delta-based telemetry assertions in test_code_guard.py: - Before/after snapshot pattern instead of absolute global counter checks - Prevents suite-order coupling if autouse reset ever changes CodeRabbit — JWT alg assertion: - test_heuristic_pass_jwt_declares_correct_verdict now checks header['alg'] - Catches regressions away from ES256 signing path CodeRabbit — test name inconsistency: - test_safe_code_forwarded renamed to test_safe_code_heuristic_pass - Docstring updated to reflect HEURISTIC_PASS semantics New false positive regression tests (+3): - test_thread_run_not_blocked (threading.Thread.start()) - test_arbitrary_run_method_not_blocked (custom job.run()) - test_subprocess_run_still_blocked (confirms real threat still caught) --- src/qwed_a2a/interceptor.py | 40 +++++++++++++++++----------- src/qwed_a2a/utils/telemetry.py | 14 ++++++---- tests/test_code_guard.py | 47 +++++++++++++++++++++++++++++---- tests/test_interceptor.py | 4 +-- 4 files changed, 78 insertions(+), 27 deletions(-) diff --git a/src/qwed_a2a/interceptor.py b/src/qwed_a2a/interceptor.py index 876f3b7..2790f0e 100644 --- a/src/qwed_a2a/interceptor.py +++ b/src/qwed_a2a/interceptor.py @@ -312,17 +312,20 @@ def _verify_logic(self, payload: Dict[str, Any]) -> Dict[str, Any]: "__import__", } ) - _DANGEROUS_ATTR_CALLS: frozenset = frozenset( - { - "system", - "popen", - "run", - "Popen", - "call", - "check_output", - "check_call", - } - ) + # Maps known dangerous module names to the specific method names that are + # dangerous when called on that module. 
This is intentionally scoped to + # (receiver, method) pairs to avoid false positives: + # thread.run() — safe, receiver is not subprocess/os + # client.call() — safe, receiver is not subprocess/os + # subprocess.run() — dangerous, receiver IS subprocess + # Limitation: import aliasing (e.g., `import subprocess as sp; sp.run()`) + # is not caught here — the regex heuristic layer provides partial coverage. + _DANGEROUS_RECEIVER_METHODS: Dict[str, frozenset] = { + "subprocess": frozenset( + {"run", "Popen", "call", "check_output", "check_call", "popen"} + ), + "os": frozenset({"system", "popen"}), + } # Dangerous module import names (caught at ast.Import / ast.ImportFrom level) _DANGEROUS_IMPORTS: frozenset = frozenset( { @@ -397,10 +400,17 @@ def _verify_code(self, payload: Dict[str, Any]) -> Dict[str, Any]: func = node.func if isinstance(func, ast.Name) and func.id in self._DANGEROUS_CALL_NAMES: ast_threats.append(f"call:{func.id}()") - # Attribute calls: os.system(...), subprocess.run(...), etc. + # Receiver-scoped attribute calls: subprocess.run(...), os.system(...) + # Only blocked when called on a known dangerous receiver — this avoids + # false positives from legitimate .run()/.call() on other objects. elif isinstance(func, ast.Attribute): - if func.attr in self._DANGEROUS_ATTR_CALLS: - ast_threats.append(f"attr:.{func.attr}()") + if isinstance(func.value, ast.Name): + receiver = func.value.id + dangerous_methods = self._DANGEROUS_RECEIVER_METHODS.get( + receiver, frozenset() + ) + if func.attr in dangerous_methods: + ast_threats.append(f"call:{receiver}.{func.attr}()") # Dangerous imports: import subprocess, from ctypes import ... 
elif isinstance(node, ast.Import): @@ -520,7 +530,7 @@ def _record( """Record telemetry for this intercept.""" latency_ms = (time.perf_counter() - start_time) * 1000 record_intercept( - status=verdict.status.value, + status=verdict.status, engine=verdict.engine_used, sender_id=sender_id, latency_ms=latency_ms, diff --git a/src/qwed_a2a/utils/telemetry.py b/src/qwed_a2a/utils/telemetry.py index 93b94db..a3ae248 100644 --- a/src/qwed_a2a/utils/telemetry.py +++ b/src/qwed_a2a/utils/telemetry.py @@ -12,6 +12,10 @@ from dataclasses import dataclass, field from typing import Any, Callable, Dict, Optional +# Imported here (not at top) to avoid circular imports — +# schema is a leaf module with no telemetry dependency. +from qwed_a2a.protocol.schema import VerdictStatus + # Conditional Sentry import try: import sentry_sdk @@ -118,7 +122,7 @@ def init_telemetry( def record_intercept( - status: str, + status: VerdictStatus, engine: Optional[str], sender_id: str, latency_ms: float, @@ -129,13 +133,13 @@ def record_intercept( metrics.total_latency_ms += latency_ms metrics.by_sender[sender_id] = metrics.by_sender.get(sender_id, 0) + 1 - if status == "forwarded": + if status == VerdictStatus.FORWARDED: metrics.total_forwarded += 1 - elif status == "blocked": + elif status == VerdictStatus.BLOCKED: metrics.total_blocked += 1 - elif status == "unverifiable": + elif status == VerdictStatus.UNVERIFIABLE: metrics.total_unverifiable += 1 - elif status == "heuristic_pass": + elif status == VerdictStatus.HEURISTIC_PASS: metrics.total_heuristic_pass += 1 else: metrics.total_errors += 1 diff --git a/tests/test_code_guard.py b/tests/test_code_guard.py index b1f39a8..6ae2426 100644 --- a/tests/test_code_guard.py +++ b/tests/test_code_guard.py @@ -104,6 +104,34 @@ async def test_clean_code_has_attestation_jwt(self, code_interceptor): verdict = await code_interceptor.intercept(msg, trace_id="t_jwt") assert verdict.attestation_jwt is not None + @pytest.mark.asyncio + async def 
test_thread_run_not_blocked(self, code_interceptor): + """thread.run() must not be blocked -- receiver is not subprocess/os.""" + code = "import threading\nt = threading.Thread(target=lambda: None)\nt.start()" + msg = _code_message(code) + verdict = await code_interceptor.intercept(msg, trace_id="t_thread_run") + assert ( + verdict.status == VerdictStatus.HEURISTIC_PASS + ), f"threading.Thread.start() incorrectly blocked. Reason: {verdict.reason}" + + @pytest.mark.asyncio + async def test_arbitrary_run_method_not_blocked(self, code_interceptor): + """job.run() on arbitrary objects must not be blocked -- no dangerous receiver.""" + code = "class Job:\n def run(self): return 42\njob = Job()\njob.run()" + msg = _code_message(code) + verdict = await code_interceptor.intercept(msg, trace_id="t_job_run") + assert ( + verdict.status == VerdictStatus.HEURISTIC_PASS + ), f"job.run() incorrectly blocked. Reason: {verdict.reason}" + + @pytest.mark.asyncio + async def test_subprocess_run_still_blocked(self, code_interceptor): + """subprocess.run() must still be blocked -- correct receiver+method pair.""" + code = 'import subprocess\nsubprocess.run(["id"])' + msg = _code_message(code) + verdict = await code_interceptor.intercept(msg, trace_id="t_subprocess_run_fp") + assert verdict.status == VerdictStatus.BLOCKED + @pytest.mark.asyncio async def test_heuristic_pass_jwt_declares_correct_verdict(self, code_interceptor): """The JWT verdict claim must be 'heuristic_pass', not 'forwarded'.""" @@ -112,6 +140,8 @@ async def test_heuristic_pass_jwt_declares_correct_verdict(self, code_intercepto msg = _code_message("x = 42") verdict = await code_interceptor.intercept(msg, trace_id="t_jwt_claim") assert verdict.attestation_jwt is not None + header = pyjwt.get_unverified_header(verdict.attestation_jwt) + assert header["alg"] in {"ES256", "ES384", "ES512"} raw = pyjwt.decode(verdict.attestation_jwt, options={"verify_signature": False}) assert raw["qwed_a2a"]["verdict"] == "heuristic_pass" 
assert raw["qwed_a2a"]["engine"] == "code_guard" @@ -310,17 +340,24 @@ class TestHeuristicPassTelemetry: async def test_heuristic_pass_increments_counter(self, code_interceptor): from qwed_a2a.utils.telemetry import get_metrics + before = get_metrics() + before_hp = before.total_heuristic_pass + before_errors = before.total_errors + msg = _code_message("x = 1") await code_interceptor.intercept(msg, trace_id="t_telemetry_hp") - metrics = get_metrics() - assert metrics.total_heuristic_pass >= 1 - assert metrics.total_errors == 0 + + after = get_metrics() + assert after.total_heuristic_pass >= before_hp + 1 + assert after.total_errors == before_errors @pytest.mark.asyncio async def test_heuristic_pass_not_counted_as_error(self, code_interceptor): from qwed_a2a.utils.telemetry import get_metrics + before_errors = get_metrics().total_errors + msg = _code_message("y = 2 + 2") await code_interceptor.intercept(msg, trace_id="t_telemetry_no_err") - metrics = get_metrics() - assert metrics.total_errors == 0 + + assert get_metrics().total_errors == before_errors diff --git a/tests/test_interceptor.py b/tests/test_interceptor.py index 0b4a500..da2b0d7 100644 --- a/tests/test_interceptor.py +++ b/tests/test_interceptor.py @@ -59,8 +59,8 @@ async def test_dangerous_code_blocked(self, interceptor, dangerous_code_message) assert "system" in verdict.reason assert verdict.engine_used == "code_guard" - async def test_safe_code_forwarded(self, interceptor, safe_code_message): - """Safe code should be forwarded.""" + async def test_safe_code_heuristic_pass(self, interceptor, safe_code_message): + """Safe code should return HEURISTIC_PASS — heuristic scan, not deterministic proof.""" verdict = await interceptor.intercept(safe_code_message, trace_id="t_code_safe") assert verdict.status == VerdictStatus.HEURISTIC_PASS assert verdict.engine_used == "code_guard"