Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,12 @@ Every verdict includes a signed **ES256 JWT attestation**:
| **Expiry** | 24 hours default |
| **Cross-service** | Each instance has its own key pair |

### Graceful Degradation
### Fail-Closed Attestations

If `cryptography` and `PyJWT` are not installed, the interceptor operates **without attestations**. Verdicts are still generated, but `attestation_jwt` will be `None`.
`cryptography` and `PyJWT` are required dependencies. If they are unavailable,
the interceptor raises at startup instead of returning unsigned verdicts. Signing
failures also fail closed so `attestation_jwt=None` is never emitted as a normal
verdict.

---

Expand Down
55 changes: 31 additions & 24 deletions src/qwed_a2a/interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
1. Validate incoming message schema (Pydantic)
2. Enforce trust boundary (allowlist/blocklist/rate limit)
3. Route payload to appropriate verification engine
4. Sign the verdict with JWT attestation (if crypto available)
4. Sign the verdict with JWT attestation
5. Return structured VerificationVerdict
"""

Expand All @@ -52,15 +52,17 @@
for agent in self.config.trusted_agents:
self.trust.trust_agent(agent)

# Graceful crypto degradation — attestations disabled if deps missing
# Fail closed if the attestation stack is unavailable. Unsigned verdicts
# would look operational while dropping QWED's core trust guarantee.
if crypto_service is not None:
self.crypto: Optional[A2ACryptoService] = crypto_service
self.crypto: A2ACryptoService = crypto_service
elif HAS_CRYPTO:
self.crypto = A2ACryptoService()
else:
self.crypto = None
logger.warning(
"Crypto dependencies unavailable; attestation JWTs will be disabled"
raise RuntimeError(
"QWED-A2A requires 'cryptography' and 'PyJWT' packages. "
"Install with: pip install qwed-a2a. "
"Operating without JWT attestations violates fail-closed policy."
)

@trace_intercept
Expand Down Expand Up @@ -286,17 +288,17 @@

# Compiled regex patterns for case-insensitive, whitespace-tolerant detection
_DANGEROUS_PATTERNS: Dict[str, re.Pattern] = {
"eval": re.compile(r"\beval\s*\(", re.IGNORECASE),

Check warning on line 291 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"exec": re.compile(r"\bexec\s*\(", re.IGNORECASE),

Check warning on line 292 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"subprocess": re.compile(

Check warning on line 293 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
r"\b(?:subprocess\s*\.|import\s+subprocess\b|from\s+subprocess\s+import\b)",
re.IGNORECASE,
),
"os.system": re.compile(r"\bos\.system\s*\(", re.IGNORECASE),

Check warning on line 297 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"os.popen": re.compile(r"\bos\.popen\s*\(", re.IGNORECASE),

Check warning on line 298 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"__import__": re.compile(r"__import__\s*\(", re.IGNORECASE),

Check warning on line 299 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"compile": re.compile(r"\bcompile\s*\(", re.IGNORECASE),

Check warning on line 300 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"importlib": re.compile(r"\bimportlib\s*\.", re.IGNORECASE),

Check warning on line 301 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
}

def _verify_code(self, payload: Dict[str, Any]) -> Dict[str, Any]:
Expand Down Expand Up @@ -345,24 +347,29 @@
message: AgentMessage,
details: Optional[Dict[str, Any]] = None,
) -> VerificationVerdict:
"""Build a VerificationVerdict with optional JWT attestation."""
attestation_jwt = None

if self.crypto is not None:
try:
payload_hash = A2ACryptoService.hash_content(
json.dumps(message.payload, sort_keys=True, default=str)
)
attestation_jwt = self.crypto.sign_verdict(
trace_id=trace_id,
verdict_status=status.value,
engine=engine,
sender_id=message.sender_agent_id,
receiver_id=message.receiver_agent_id,
payload_hash=payload_hash,
)
except Exception as exc:
logger.warning("Failed to sign attestation: %s", exc)
"""Build a VerificationVerdict with a required JWT attestation."""
try:
payload_hash = A2ACryptoService.hash_content(
json.dumps(message.payload, sort_keys=True, default=str)
)
attestation_jwt = self.crypto.sign_verdict(
trace_id=trace_id,
verdict_status=status.value,
engine=engine,
sender_id=message.sender_agent_id,
Comment on lines +355 to +359

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Validate signer output before returning a verdict

The fail-closed change still accepts any non-exception return from sign_verdict, so an injected crypto_service that returns None or an empty string on soft failure will emit a normal VerificationVerdict with a missing/invalid attestation. This directly violates the new “never emit unsigned verdicts” guarantee and can happen in real deployments that provide a custom signer wrapper. Treat falsy/invalid token output as a signing failure and raise instead of returning the verdict.

Useful? React with 👍 / 👎.

receiver_id=message.receiver_agent_id,
payload_hash=payload_hash,
)
except Exception as exc:
logger.error("Failed to sign attestation: %s", exc)
raise RuntimeError(
"Failed to sign attestation; refusing to return an unsigned verdict"
) from exc

if not attestation_jwt:
raise RuntimeError(
"sign_verdict returned an empty token — refusing to emit unsigned verdict"
)

return VerificationVerdict(
status=status,
Expand Down
41 changes: 41 additions & 0 deletions tests/test_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,47 @@ async def test_contradiction_blocked(self, interceptor, contradictory_logic_mess
class TestInterceptorGeneral:
"""General message handling tests."""

async def test_missing_crypto_dependencies_fail_startup(self, monkeypatch):
"""Missing crypto dependencies should fail closed during startup."""
monkeypatch.setattr("qwed_a2a.interceptor.HAS_CRYPTO", False)

with pytest.raises(RuntimeError, match="requires 'cryptography' and 'PyJWT'"):
A2AVerificationInterceptor()

async def test_attestation_signing_failure_is_not_returned_as_normal_verdict(
self, trust_boundary, general_message
):
"""Signing failures must not produce normal verdicts without attestations."""

class FailingCryptoService:
def sign_verdict(self, **_kwargs):
raise RuntimeError("signer unavailable")

interceptor = A2AVerificationInterceptor(
crypto_service=FailingCryptoService(),
trust_boundary=trust_boundary,
)

with pytest.raises(RuntimeError, match="Failed to sign attestation"):
await interceptor.intercept(general_message, trace_id="t_gen_sign_fail")

async def test_empty_attestation_is_not_returned_as_normal_verdict(
self, trust_boundary, general_message
):
"""Falsy attestation tokens must fail closed like signing errors."""

class EmptyTokenCryptoService:
def sign_verdict(self, **_kwargs):
return None

interceptor = A2AVerificationInterceptor(
crypto_service=EmptyTokenCryptoService(),
trust_boundary=trust_boundary,
)

with pytest.raises(RuntimeError, match="sign_verdict returned an empty token"):
await interceptor.intercept(general_message, trace_id="t_gen_empty_token")

async def test_general_passthrough(self, interceptor, general_message):
"""General messages should pass through without verification."""
verdict = await interceptor.intercept(general_message, trace_id="t_gen_pass")
Expand Down
Loading