Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 154 additions & 24 deletions src/qwed_a2a/interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
This is the [QWED Core] module — all inter-agent messages flow through here.
"""

import ast
import json
import re
import time
Expand Down Expand Up @@ -128,6 +129,15 @@
message=message,
details=engine_result,
)
elif engine_result.get("status") == "heuristic_pass":
verdict = self._build_verdict(
trace_id=trace_id,
status=VerdictStatus.HEURISTIC_PASS,
reason=engine_result.get("reason"),
engine=engine_result["engine"],
message=message,
details=engine_result,
)
elif engine_result["verified"]:
verdict = self._build_verdict(
trace_id=trace_id,
Expand Down Expand Up @@ -290,56 +300,171 @@
"reason": "No contradictions found in assertions",
}

# Compiled regex patterns for case-insensitive, whitespace-tolerant detection
# ── AST dangerous node types ─────────────────────────────────────────────
# Used by _verify_code_ast() for structural (not textual) analysis.
# These are deterministic: if the AST contains one of these constructs
# the payload is blocked regardless of how obfuscated the source is.
_DANGEROUS_CALL_NAMES: frozenset = frozenset(
{
"eval",
"exec",
"compile",
"__import__",
}
)
# Maps known dangerous module names to the specific method names that are
# dangerous when called on that module. This is intentionally scoped to
# (receiver, method) pairs to avoid false positives:
# thread.run() — safe, receiver is not subprocess/os
# client.call() — safe, receiver is not subprocess/os
# subprocess.run() — dangerous, receiver IS subprocess

Check notice on line 320 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

subprocess invocation detected. Context=COMMENT. Decision reason: Pattern detected in a non-executable context.
# Limitation: import aliasing (e.g., `import subprocess as sp; sp.run()`)
# is not caught here — the regex heuristic layer provides partial coverage.
_DANGEROUS_RECEIVER_METHODS: Dict[str, frozenset] = {
"subprocess": frozenset(
{"run", "Popen", "call", "check_output", "check_call", "popen"}
),
"os": frozenset({"system", "popen"}),
}
# Dangerous module import names (caught at ast.Import / ast.ImportFrom level)
_DANGEROUS_IMPORTS: frozenset = frozenset(
{
"subprocess",
"importlib",
"ctypes",
"pty",
}
)

# ── Regex patterns as secondary heuristic layer ───────────────────────────
# Catch obfuscation patterns that survive AST parsing: encoded strings,
# getattr-based lookups, and dynamic attribute construction.
_DANGEROUS_PATTERNS: Dict[str, re.Pattern] = {
"eval": re.compile(r"\beval\s*\(", re.IGNORECASE),
"exec": re.compile(r"\bexec\s*\(", re.IGNORECASE),
"subprocess": re.compile(
r"\b(?:subprocess\s*\.|import\s+subprocess\b|from\s+subprocess\s+import\b)",
re.IGNORECASE,
"getattr_builtin": re.compile(

Check warning on line 343 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
r"""getattr\s*\(\s*(?:__builtins__|builtins)\s*""", re.IGNORECASE
),
"builtins_dict_access": re.compile(

Check warning on line 346 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
r"""__builtins__\s*\.\s*__dict__\s*\[""", re.IGNORECASE
),
"os.system": re.compile(r"\bos\.system\s*\(", re.IGNORECASE),
"os.popen": re.compile(r"\bos\.popen\s*\(", re.IGNORECASE),
"__import__": re.compile(r"__import__\s*\(", re.IGNORECASE),
"compile": re.compile(r"\bcompile\s*\(", re.IGNORECASE),
"importlib": re.compile(r"\bimportlib\s*\.", re.IGNORECASE),
"base64_exec": re.compile(

Check warning on line 349 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
r"""(?:base64\s*\.\s*b64decode|b64decode)\s*\(""", re.IGNORECASE
),
"dynamic_import": re.compile(r"""__import__\s*\(""", re.IGNORECASE),

Check warning on line 352 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"os_system": re.compile(r"""\bos\.system\s*\(""", re.IGNORECASE),

Check warning on line 353 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"os_popen": re.compile(r"""\bos\.popen\s*\(""", re.IGNORECASE),

Check warning on line 354 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
}

def _verify_code(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
Lightweight code security verification.
Heuristic code security scan: AST structural analysis + regex patterns.

Important: this is a heuristic scan, NOT deterministic verification.
A HEURISTIC_PASS result means no known dangerous constructs were found —
it does NOT mean the code is safe. Obfuscated or novel attack patterns
may not be detected.

Scans for dangerous patterns using case-insensitive regex
to prevent trivial bypass via casing or whitespace.
Analysis layers (run in order):
1. AST parse — catches direct dangerous calls and imports
structurally, before any text-level obfuscation can hide them
2. Regex scan — secondary heuristic for dynamic access patterns
(getattr(__builtins__,...), base64-encoded payloads, etc.)

Returns HEURISTIC_PASS when no threats found, BLOCKED when any found.
"""
code = payload.get("code", "")

if not code:
return {
"verified": True,
"verified": False,
"status": "heuristic_pass",
"engine": "code_guard",
"reason": "No code to analyze",
}

# ── Layer 1: AST structural analysis ──────────────────────────────────
try:
tree = ast.parse(code)
except SyntaxError as exc:
# Unparseable code could indicate obfuscation or raw bytecode;
# fail closed — do not forward what we cannot analyse.
return {
"verified": False,
"engine": "code_guard",
"reason": f"Code failed AST parsing — cannot verify: {exc}",
}

ast_threats: list = []
for node in ast.walk(tree):
# Direct dangerous function calls: eval(...), exec(...), compile(...)

Check notice on line 398 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

Dangerous eval() call can execute untrusted code. Context=COMMENT. Decision reason: Pattern detected in a non-executable context.

Check notice on line 398 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

Dangerous exec() call can execute untrusted code. Context=COMMENT. Decision reason: Pattern detected in a non-executable context.

Check notice on line 398 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=COMMENT. Decision reason: Pattern detected in a non-executable context.
if isinstance(node, ast.Call):
func = node.func
if isinstance(func, ast.Name) and func.id in self._DANGEROUS_CALL_NAMES:
ast_threats.append(f"call:{func.id}()")
# Receiver-scoped attribute calls: subprocess.run(...), os.system(...)

Check notice on line 403 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

os.system() shell execution primitive detected. Context=COMMENT. Decision reason: Pattern detected in a non-executable context.

Check notice on line 403 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

subprocess invocation detected. Context=COMMENT. Decision reason: Pattern detected in a non-executable context.
# Only blocked when called on a known dangerous receiver — this avoids
# false positives from legitimate .run()/.call() on other objects.
elif isinstance(func, ast.Attribute):
if isinstance(func.value, ast.Name):
receiver = func.value.id
dangerous_methods = self._DANGEROUS_RECEIVER_METHODS.get(
receiver, frozenset()
)
if func.attr in dangerous_methods:
ast_threats.append(f"call:{receiver}.{func.attr}()")

# Dangerous imports: import subprocess, from ctypes import ...
elif isinstance(node, ast.Import):
for alias in node.names:
root = alias.name.split(".")[0]
if root in self._DANGEROUS_IMPORTS:
ast_threats.append(f"import:{root}")
elif isinstance(node, ast.ImportFrom):
if node.module:
root = node.module.split(".")[0]
if root in self._DANGEROUS_IMPORTS:
ast_threats.append(f"import:{root}")

if ast_threats:
return {
"verified": False,
"engine": "code_guard",
"reason": "No code to verify",
"reason": (
f"Dangerous constructs detected via AST analysis: "
f"{', '.join(ast_threats)}"
),
"threats": ast_threats,
"analysis": "ast",
}

found_threats = []
# ── Layer 2: Regex heuristic scan ─────────────────────────────────────
regex_threats: list = []
for label, pattern in self._DANGEROUS_PATTERNS.items():
if pattern.search(code):
found_threats.append(label)
regex_threats.append(label)

if found_threats:
if regex_threats:
return {
"verified": False,
"engine": "code_guard",
"reason": (
f"Dangerous code patterns detected: {', '.join(found_threats)}"
f"Suspicious patterns detected via heuristic scan: "
f"{', '.join(regex_threats)}"
),
"threats": found_threats,
"threats": regex_threats,
"analysis": "regex",
}

# ── Both layers clean — heuristic pass, not verified ──────────────────
return {
"verified": True,
"verified": False,
"status": "heuristic_pass",
"engine": "code_guard",
"reason": "No dangerous patterns found in code",
"reason": (
"AST analysis and heuristic scan found no known dangerous constructs. "
"This is a heuristic result — novel or deeply obfuscated attack "
"patterns may not be detected."
),
"analysis": "ast+regex",
}

def _build_verdict(
Expand All @@ -358,6 +483,11 @@
"""
attestation_jwt = None

# UNVERIFIABLE — no JWT (no verification ran, issuing one would be false)
# All other statuses (FORWARDED, BLOCKED, HEURISTIC_PASS) get signed JWTs.
# HEURISTIC_PASS JWTs declare verdict_status="heuristic_pass" in their
# claims so downstream consumers know they received a heuristic result,
# not a deterministic verification proof.
if status != VerdictStatus.UNVERIFIABLE:
try:
payload_hash = A2ACryptoService.hash_content(
Expand Down Expand Up @@ -400,7 +530,7 @@
"""Record telemetry for this intercept."""
latency_ms = (time.perf_counter() - start_time) * 1000
record_intercept(
status=verdict.status.value,
status=verdict.status,
engine=verdict.engine_used,
sender_id=sender_id,
latency_ms=latency_ms,
Expand Down
1 change: 1 addition & 0 deletions src/qwed_a2a/protocol/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class VerdictStatus(str, Enum):
FORWARDED = "forwarded"
BLOCKED = "blocked"
UNVERIFIABLE = "unverifiable"
HEURISTIC_PASS = "heuristic_pass"
ERROR = "error"


Expand Down
16 changes: 12 additions & 4 deletions src/qwed_a2a/utils/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional

# Imported here (not at top) to avoid circular imports —
# schema is a leaf module with no telemetry dependency.
from qwed_a2a.protocol.schema import VerdictStatus

# Conditional Sentry import
try:
import sentry_sdk
Expand All @@ -32,6 +36,7 @@ class InterceptMetrics:
total_forwarded: int = 0
total_blocked: int = 0
total_unverifiable: int = 0
total_heuristic_pass: int = 0
total_errors: int = 0
total_latency_ms: float = 0.0
by_engine: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
Expand All @@ -55,6 +60,7 @@ def to_dict(self) -> Dict[str, Any]:
"total_forwarded": self.total_forwarded,
"total_blocked": self.total_blocked,
"total_unverifiable": self.total_unverifiable,
"total_heuristic_pass": self.total_heuristic_pass,
"total_errors": self.total_errors,
"average_latency_ms": round(self.average_latency_ms, 2),
"block_rate": round(self.block_rate, 4),
Expand Down Expand Up @@ -116,7 +122,7 @@ def init_telemetry(


def record_intercept(
status: str,
status: VerdictStatus,
engine: Optional[str],
sender_id: str,
latency_ms: float,
Expand All @@ -127,12 +133,14 @@ def record_intercept(
metrics.total_latency_ms += latency_ms
metrics.by_sender[sender_id] = metrics.by_sender.get(sender_id, 0) + 1

if status == "forwarded":
if status == VerdictStatus.FORWARDED:
metrics.total_forwarded += 1
elif status == "blocked":
elif status == VerdictStatus.BLOCKED:
metrics.total_blocked += 1
elif status == "unverifiable":
elif status == VerdictStatus.UNVERIFIABLE:
metrics.total_unverifiable += 1
elif status == VerdictStatus.HEURISTIC_PASS:
metrics.total_heuristic_pass += 1
else:
metrics.total_errors += 1

Expand Down
Loading
Loading