fix(interceptor): CodeGuard AST+heuristic dual-layer scan, HEURISTIC_PASS verdict — closes #9#27
Conversation
…PASS verdict — closes #9 A2A-005: CodeGuard was a regex-only heuristic mislabeled as 'code security verification'. Regex scans that pass prove absence of known signatures, not code safety. This collapses VERIFIED and HEURISTIC_PASS into a single trust claim — downstream consumers cannot tell the difference. Changes: schema.py - Added VerdictStatus.HEURISTIC_PASS = 'heuristic_pass' - Distinct from FORWARDED (deterministic proof) and UNVERIFIABLE telemetry.py - Added total_heuristic_pass counter to InterceptMetrics - record_intercept() routes 'heuristic_pass' correctly (not to errors) interceptor.py — _verify_code() rewritten: Layer 1 — AST structural analysis (primary): - ast.parse() on every payload; unparseable code fails CLOSED (BLOCKED) - ast.walk() checks: dangerous Name calls (eval/exec/compile/__import__), dangerous Attribute calls (.system/.popen/.run/.Popen/.call/ .check_output/.check_call), dangerous module imports (subprocess/importlib/ctypes/pty) - Catches string-concatenation bypass: getattr(__builtins__, 'ev'+'al') is caught by regex heuristic (Layer 2) Layer 2 — Regex heuristic scan (secondary, defense in depth): - getattr(__builtins__,...) — catches dynamic name-lookup bypasses - __builtins__.__dict__[...] — dict-based attribute bypass - base64.b64decode — encoded payload exfiltration - __import__(...) — direct dynamic import - os.system / os.popen — in case AST misses edge cases Return semantics: - BLOCKED → dangerous construct found (either layer) - HEURISTIC_PASS → both layers clean; never 'verified: True' - JWT for HEURISTIC_PASS declares verdict_status='heuristic_pass' so downstream consumers know they received a heuristic result Docstring updated: 'security verification' → 'heuristic scan' interceptor.py — intercept() dispatch: - Added HEURISTIC_PASS branch alongside UNVERIFIABLE - _build_verdict() issues signed JWT for HEURISTIC_PASS (verdict claim = 'heuristic_pass', not 'forwarded') tests/test_code_guard.py (31 new tests): - VerdictStatus enum assertions - Clean code → HEURISTIC_PASS (never FORWARDED) - JWT verdict claim = 'heuristic_pass' - AST: direct calls (eval/exec/compile/__import__) - AST: attribute calls (os.system, subprocess.run/Popen) - AST: imports (subprocess, importlib) - All bypass scenarios from issue #9 - Fail-closed on syntax error - Telemetry counter isolation tests/test_interceptor.py: - Updated safe_code_forwarded → HEURISTIC_PASS - Updated dangerous_code reason check (AST format)
|
Warning Review limit reached
More reviews will be available in 50 minutes and 16 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
📝 WalkthroughWalkthroughThis PR implements heuristic code verification via two-layer AST and regex scanning. It adds a ChangesHeuristic Code Verification with AST Scanning
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related issues
Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3584cf4125
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
There was a problem hiding this comment.
🧹 Nitpick comments (5)
src/qwed_a2a/utils/telemetry.py (1)
120-141: ⚡ Quick winUse
VerdictStatusinstead of raw status strings in metrics routing.
status: strplus literal comparisons can silently misclassify metrics on typos/new enum values (falling intototal_errors). Type this asVerdictStatusand compare against enum members.Proposed refactor
-from typing import Any, Callable, Dict, Optional +from typing import Any, Callable, Dict, Optional +from qwed_a2a.protocol.schema import VerdictStatus ... -def record_intercept( - status: str, +def record_intercept( + status: VerdictStatus, engine: Optional[str], sender_id: str, latency_ms: float, ) -> None: ... - if status == "forwarded": + if status == VerdictStatus.FORWARDED: metrics.total_forwarded += 1 - elif status == "blocked": + elif status == VerdictStatus.BLOCKED: metrics.total_blocked += 1 - elif status == "unverifiable": + elif status == VerdictStatus.UNVERIFIABLE: metrics.total_unverifiable += 1 - elif status == "heuristic_pass": + elif status == VerdictStatus.HEURISTIC_PASS: metrics.total_heuristic_pass += 1 else: metrics.total_errors += 1🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/qwed_a2a/utils/telemetry.py` around lines 120 - 141, The record_intercept function currently types status as str and routes by literal string comparisons; change status's type annotation to VerdictStatus (import the enum) and update each comparison to use the enum members (e.g., VerdictStatus.FORWARDED, VerdictStatus.BLOCKED, VerdictStatus.UNVERIFIABLE, VerdictStatus.HEURISTIC_PASS) instead of raw strings; keep the same increments (metrics.total_forwarded, etc.) and preserve the fallback to metrics.total_errors for any unknown/other enum values.src/qwed_a2a/interceptor.py (1)
326-334: ⚡ Quick winConsider adding
osto_DANGEROUS_IMPORTSfor consistency.The regex patterns explicitly check for
os.systemandos.popen(lines 350-351), butosis not in_DANGEROUS_IMPORTS. This creates a gap: code likegetattr(os, 'system')('cmd')bypasses both layers because:
import ospasses the import checkgetattris not in_DANGEROUS_CALL_NAMES- The regex only matches
getattr(__builtins__|builtins...Adding
osto_DANGEROUS_IMPORTSwould make the AST layer catchimport osand provide defense-in-depth for the regex patterns.Proposed fix
_DANGEROUS_IMPORTS: frozenset = frozenset( { "subprocess", "importlib", "ctypes", "pty", + "os", } )Also applies to: 340-342
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/qwed_a2a/interceptor.py` around lines 326 - 334, The import-layer whitelist is missing "os" which bypasses the AST-level block even though regexes later check for os.system/os.popen; update the frozenset _DANGEROUS_IMPORTS to include the string "os" so that import os is caught early (also apply the same addition to the other identical set at the second occurrence around lines referenced), keeping existing symbols like _DANGEROUS_CALL_NAMES and regex checks unchanged.tests/test_interceptor.py (1)
62-66: 💤 Low valueTest name and docstring are inconsistent with the assertion.
The function is named
test_safe_code_forwardedand the docstring says "should be forwarded", but the assertion expectsVerdictStatus.HEURISTIC_PASS. Consider renaming for clarity.Proposed fix
- async def test_safe_code_forwarded(self, interceptor, safe_code_message): - """Safe code should be forwarded.""" + async def test_safe_code_heuristic_pass(self, interceptor, safe_code_message): + """Safe code should receive HEURISTIC_PASS verdict.""" verdict = await interceptor.intercept(safe_code_message, trace_id="t_code_safe") assert verdict.status == VerdictStatus.HEURISTIC_PASS assert verdict.engine_used == "code_guard"🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_interceptor.py` around lines 62 - 66, The test's name/docstring don't match its assertions: update the test so the name and docstring reflect that a safe code message produces a heuristic pass; e.g., rename test_safe_code_forwarded to test_safe_code_heuristic_pass and change the docstring to something like "Safe code should return HEURISTIC_PASS and be handled by code_guard", keeping the assertions against VerdictStatus.HEURISTIC_PASS and engine_used == "code_guard" that call interceptor.intercept(safe_code_message, trace_id="t_code_safe").tests/test_code_guard.py (2)
310-327: ⚡ Quick winMake telemetry assertions delta-based to avoid suite-order coupling.
These tests assert absolute global values (
total_errors == 0), which can become flaky when metrics are shared across tests. Snapshot before/after and assert deltas instead.🔧 Suggested update
class TestHeuristicPassTelemetry: `@pytest.mark.asyncio` async def test_heuristic_pass_increments_counter(self, code_interceptor): from qwed_a2a.utils.telemetry import get_metrics + before = get_metrics() + before_hp = before.total_heuristic_pass + before_errors = before.total_errors + msg = _code_message("x = 1") await code_interceptor.intercept(msg, trace_id="t_telemetry_hp") metrics = get_metrics() - assert metrics.total_heuristic_pass >= 1 - assert metrics.total_errors == 0 + assert metrics.total_heuristic_pass >= before_hp + 1 + assert metrics.total_errors == before_errors `@pytest.mark.asyncio` async def test_heuristic_pass_not_counted_as_error(self, code_interceptor): from qwed_a2a.utils.telemetry import get_metrics + before_errors = get_metrics().total_errors msg = _code_message("y = 2 + 2") await code_interceptor.intercept(msg, trace_id="t_telemetry_no_err") metrics = get_metrics() - assert metrics.total_errors == 0 + assert metrics.total_errors == before_errors🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_code_guard.py` around lines 310 - 327, Tests read global counters directly (metrics.total_errors / metrics.total_heuristic_pass) causing order-dependent failures; update both tests to snapshot current metrics via get_metrics() before calling code_interceptor.intercept(), then call intercept and re-read get_metrics(), and assert on the delta (e.g., new.total_errors - old.total_errors == 0 and new.total_heuristic_pass - old.total_heuristic_pass >= 1 where appropriate). Modify test_heuristic_pass_increments_counter and test_heuristic_pass_not_counted_as_error to use these before/after comparisons instead of absolute assertions.
108-117: ⚡ Quick winAssert JWT
algintest_heuristic_pass_jwt_declares_correct_verdictThe attestation is decoded with
verify_signature=False, so the test currently checks claims but not the JWT header algorithm; adding analgassertion will catch any future regression away from the ES256 signing path.🔧 Suggested update
async def test_heuristic_pass_jwt_declares_correct_verdict(self, code_interceptor): """The JWT verdict claim must be 'heuristic_pass', not 'forwarded'.""" import jwt as pyjwt msg = _code_message("x = 42") verdict = await code_interceptor.intercept(msg, trace_id="t_jwt_claim") assert verdict.attestation_jwt is not None + header = pyjwt.get_unverified_header(verdict.attestation_jwt) + assert header["alg"] in {"ES256", "ES384", "ES512"} raw = pyjwt.decode(verdict.attestation_jwt, options={"verify_signature": False}) assert raw["qwed_a2a"]["verdict"] == "heuristic_pass" assert raw["qwed_a2a"]["engine"] == "code_guard"🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_code_guard.py` around lines 108 - 117, In test_heuristic_pass_jwt_declares_correct_verdict, after obtaining verdict.attestation_jwt and decoding the payload with pyjwt.decode, also call pyjwt.get_unverified_header(verdict.attestation_jwt) and assert its "alg" equals "ES256" to ensure the attestation uses the expected ES256 signing algorithm; update the test to add this header assertion alongside the existing payload checks referencing verdict.attestation_jwt and pyjwt functions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/qwed_a2a/interceptor.py`:
- Around line 326-334: The import-layer whitelist is missing "os" which bypasses
the AST-level block even though regexes later check for os.system/os.popen;
update the frozenset _DANGEROUS_IMPORTS to include the string "os" so that
import os is caught early (also apply the same addition to the other identical
set at the second occurrence around lines referenced), keeping existing symbols
like _DANGEROUS_CALL_NAMES and regex checks unchanged.
In `@src/qwed_a2a/utils/telemetry.py`:
- Around line 120-141: The record_intercept function currently types status as
str and routes by literal string comparisons; change status's type annotation to
VerdictStatus (import the enum) and update each comparison to use the enum
members (e.g., VerdictStatus.FORWARDED, VerdictStatus.BLOCKED,
VerdictStatus.UNVERIFIABLE, VerdictStatus.HEURISTIC_PASS) instead of raw
strings; keep the same increments (metrics.total_forwarded, etc.) and preserve
the fallback to metrics.total_errors for any unknown/other enum values.
In `@tests/test_code_guard.py`:
- Around line 310-327: Tests read global counters directly (metrics.total_errors
/ metrics.total_heuristic_pass) causing order-dependent failures; update both
tests to snapshot current metrics via get_metrics() before calling
code_interceptor.intercept(), then call intercept and re-read get_metrics(), and
assert on the delta (e.g., new.total_errors - old.total_errors == 0 and
new.total_heuristic_pass - old.total_heuristic_pass >= 1 where appropriate).
Modify test_heuristic_pass_increments_counter and
test_heuristic_pass_not_counted_as_error to use these before/after comparisons
instead of absolute assertions.
- Around line 108-117: In test_heuristic_pass_jwt_declares_correct_verdict,
after obtaining verdict.attestation_jwt and decoding the payload with
pyjwt.decode, also call pyjwt.get_unverified_header(verdict.attestation_jwt) and
assert its "alg" equals "ES256" to ensure the attestation uses the expected
ES256 signing algorithm; update the test to add this header assertion alongside
the existing payload checks referencing verdict.attestation_jwt and pyjwt
functions.
In `@tests/test_interceptor.py`:
- Around line 62-66: The test's name/docstring don't match its assertions:
update the test so the name and docstring reflect that a safe code message
produces a heuristic pass; e.g., rename test_safe_code_forwarded to
test_safe_code_heuristic_pass and change the docstring to something like "Safe
code should return HEURISTIC_PASS and be handled by code_guard", keeping the
assertions against VerdictStatus.HEURISTIC_PASS and engine_used == "code_guard"
that call interceptor.intercept(safe_code_message, trace_id="t_code_safe").
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d68f42f3-999a-4954-9039-8fd6b1419c0e
📒 Files selected for processing (5)
src/qwed_a2a/interceptor.pysrc/qwed_a2a/protocol/schema.pysrc/qwed_a2a/utils/telemetry.pytests/test_code_guard.pytests/test_interceptor.py
… type telemetry
Sentry HIGH / Codex P1 — false positives in _DANGEROUS_ATTR_CALLS:
_DANGEROUS_ATTR_CALLS blocked any method named .run()/.call()/.popen()
regardless of the receiver object. thread.run(), client.call(), and any
custom job.run() would all be incorrectly BLOCKED.
Fix: Replace the flat frozenset with _DANGEROUS_RECEIVER_METHODS — a dict
mapping known dangerous module names to their dangerous methods:
subprocess -> {run, Popen, call, check_output, check_call, popen}
os -> {system, popen}
AST walk now checks isinstance(func.value, ast.Name) and matches
(receiver, method) pairs. thread.run() passes; subprocess.run() is still
blocked. Caveat documented: import aliases (subprocess as sp) evade this
layer — regex heuristics provide secondary coverage.
CodeRabbit — rejected 'add os to _DANGEROUS_IMPORTS':
Blocking all 'import os' would false-positive on os.path, os.environ,
os.getcwd etc. The receiver+method fix correctly handles os.system /
os.popen without this blanket block.
CodeRabbit — VerdictStatus in telemetry (was raw str):
- record_intercept() signature changed: status: str -> status: VerdictStatus
- All comparisons changed to enum members (VerdictStatus.FORWARDED etc.)
- Call site in _record() changed: verdict.status.value -> verdict.status
- Prevents silent misclassification to total_errors on typos or new values
CodeRabbit — delta-based telemetry assertions in test_code_guard.py:
- Before/after snapshot pattern instead of absolute global counter checks
- Prevents suite-order coupling if autouse reset ever changes
CodeRabbit — JWT alg assertion:
- test_heuristic_pass_jwt_declares_correct_verdict now checks header['alg']
- Catches regressions away from ES256 signing path
CodeRabbit — test name inconsistency:
- test_safe_code_forwarded renamed to test_safe_code_heuristic_pass
- Docstring updated to reflect HEURISTIC_PASS semantics
New false positive regression tests (+3):
- test_thread_run_not_blocked (threading.Thread.start())
- test_arbitrary_run_method_not_blocked (custom job.run())
- test_subprocess_run_still_blocked (confirms real threat still caught)
Problem (A2A-005)
_verify_code()was a regex-only heuristic scan — mislabeled as "code securityverification". Regex scans that pass prove absence of known signatures, not
code safety. A clean regex result was returned as
verified: True→FORWARDED→ signed JWT. Downstream consumers had no way to distinguish this from a
deterministic verification.
Bypass examples from the issue all exploited this:
getattr(__builtins__, 'ev'+'al')(...)__builtins__.__dict__['exec'](...)base64.b64decode(...)as payload carrierChanges
schema.pyVerdictStatus.HEURISTIC_PASS = "heuristic_pass"— distinct fromFORWARDEDinterceptor.py—_verify_code()rewritten with two analysis layersLayer 1 — AST structural analysis (primary)
ast.parse()on every payload — unparseable code fails closed (BLOCKED)ast.walk()detects: dangerousNamecalls (eval/exec/compile/__import__),dangerous
.attrcalls (.system/.popen/.run/.Popen/.call/.check_output),dangerous module imports (
subprocess/importlib/ctypes/pty)Layer 2 — Regex heuristic scan (secondary, defense in depth)
Catches dynamic/obfuscated patterns that survive AST parsing:
getattr(__builtins__, ...)— dynamic name-lookup bypass__builtins__.__dict__[...]— dict-based attribute bypassbase64.b64decode(...)— encoded payload exfiltrationReturn semantics:
BLOCKED→ dangerous construct found (either layer)HEURISTIC_PASS→ both layers clean; neververified: Trueverdict_status = "heuristic_pass"— downstreamconsumers can inspect the claim and decide their acceptance threshold
telemetry.pytotal_heuristic_passcounter — not routed tototal_errorsTests (31 new in
test_code_guard.py)VerdictStatus.HEURISTIC_PASSenum assertionsHEURISTIC_PASS, neverFORWARDEDverdictclaim ="heuristic_pass"Acceptance criteria
_verify_code()usesast.parse()+ node walking as primary analysisverified: True—HEURISTIC_PASSat bestVerdictStatus.HEURISTIC_PASSexists and is distinct fromFORWARDEDverdict = heuristic_passSummary by CodeRabbit
New Features
Tests