fix(crypto): JWT replay prevention — validity 24h→5min, jti registry, context binding (closes #8)#26
Conversation
Three compounding replay vulnerabilities fixed:
1. Validity window: 86400s (24h) -> 300s (5min)
One A2A hop completes in <1s. 24-hour validity gave attackers
a 24-hour window to replay any intercepted attestation.
2. JtiRegistry — thread-safe, TTL-based replay prevention
RFC 7519 s4.1.7 requires implementations to reject previously
seen jti values. Added JtiRegistry (OrderedDict + threading.Lock)
with O(1) insertion and O(k) eviction of expired entries.
- Registered at sign time (issuer cannot replay its own tokens)
- Checked at verify time before accepting any token
- TTL aligned with validity_seconds — no unbounded memory growth
3. Context binding — deployment_id + session_id in claims
deployment_id: stable per-process identifier (pid + entropy)
session_id: caller-supplied, propagated into JWT claims
Binds each attestation to the exact deployment context that
issued it, making cross-context replay detectable.
verify_attestation() ordering:
1. Signature + expiry (PyJWT)
2. Required claims check
3. jti replay check (only after crypto is valid — avoids
polluting the registry with tampered tokens)
Tests (25 total, up from 6):
- JtiRegistry: 8 unit tests (eviction, threading, TTL, len)
- JWT round-trip: 6 tests (deployment_id, session_id, exp window)
- Tamper detection: 1 test
- Cross-instance: 1 test
- Hash determinism: 2 tests
- Replay prevention: 7 tests covering all attack vectors
|
Warning Review limit reached
More reviews will be available in 38 minutes and 23 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
📝 WalkthroughWalkthroughImplements ES256 JWT attestation with per-instance JTI replay prevention (TTL-based), binds tokens to a deployment_id and optional session_id, defaults token validity to 300s, registers JTIs at sign time, and adds comprehensive unit and integration tests for replay, tamper, and deployment-context validation. ChangesJWT Attestation with Replay Prevention
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a07ad31119
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
There was a problem hiding this comment.
🧹 Nitpick comments (1)
tests/test_crypto_signing.py (1)
122-140: 💤 Low valueConsider using a
threading.Barrierfor more robust concurrency testing.Without synchronization, threads started in a loop may serialize rather than execute concurrently — especially under the GIL. The test would still catch missing locks, but a Barrier ensures true contention:
♻️ Optional: Use Barrier for concurrent thread start
def test_thread_safety(self): """Concurrent registrations must not cause races or double-accepts.""" registry = JtiRegistry(ttl_seconds=300) results = [] lock = threading.Lock() + barrier = threading.Barrier(20) def register(): + barrier.wait() # all threads start simultaneously result = registry.check_and_register("shared-jti") with lock: results.append(result)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_crypto_signing.py` around lines 122 - 140, The test_thread_safety should coordinate thread start to ensure real concurrency: replace the current start-loop with a threading.Barrier so all worker threads wait at the barrier and then start the registry.check_and_register("shared-jti") call simultaneously; update the worker closure used by the threads to wait on the Barrier before invoking JtiRegistry.check_and_register and appending to results, keeping the existing assertions that exactly one True and nineteen False are recorded.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@tests/test_crypto_signing.py`:
- Around line 122-140: The test_thread_safety should coordinate thread start to
ensure real concurrency: replace the current start-loop with a threading.Barrier
so all worker threads wait at the barrier and then start the
registry.check_and_register("shared-jti") call simultaneously; update the worker
closure used by the threads to wait on the Barrier before invoking
JtiRegistry.check_and_register and appending to results, keeping the existing
assertions that exactly one True and nineteen False are recorded.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: b1f4d8da-4816-4db8-a914-1c95b98ef941
📒 Files selected for processing (2)
src/qwed_a2a/security/crypto.pytests/test_crypto_signing.py
… to thread test Codex P1: verify_attestation() validated signature, expiry, and jti replay — but never checked that the token's deployment_id matched the current deployment. A token issued in deployment A would verify in deployment B if signing keys were shared. Changes: - verify_attestation(): added step 4 — deployment context check - Extracts qwed_a2a.deployment_id from decoded claims - Compares against module-level _DEPLOYMENT_ID of current process - Rejects with 'Deployment context mismatch' before jti check runs - Ordering ensures cross-deployment tokens never pollute jti registry Tests (29 total, +4): - test_valid_token_passes_deployment_check: same-deployment tokens pass - test_cross_deployment_token_rejected: patched _DEPLOYMENT_ID rejected - test_missing_deployment_id_rejected: pre-fix legacy tokens rejected - test_deployment_id_check_runs_before_jti_check: cross-deployment tokens do not register their jti in the verifier registry CodeRabbit: added threading.Barrier(20) to test_thread_safety — all 20 threads now start simultaneously, maximising lock contention and exercising the registry under genuine concurrency.
…_ID env var Sentry MEDIUM: _DEPLOYMENT_ID was generated as pid+random at import time, making it unique to each Python process. verify_attestation() enforces that the token's deployment_id matches the verifier's ID — which means cross-process verification always fails in production, where the issuer and verifier run in separate processes. Fix: read _DEPLOYMENT_ID from QWED_A2A_DEPLOYMENT_ID env var so that all processes in the same logical deployment share a stable identifier. Falls back to a random hex string (not pid-tied) when the env var is unset — safe for testing but explicitly documented as non-functional for cross-process verification without the env var set.
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tests/test_crypto_signing.py (1)
346-356:⚠️ Potential issue | 🟠 Major | ⚡ Quick winSeed the verifier registry before asserting expiry precedence.
This verifier starts with an empty
JtiRegistry, so the test only proves that expired tokens are rejected. A replay-first implementation would still pass because there is no competing replay condition here. Pre-register the samejtion the verifier (or verify once before mutatingexp) so the test actually fails if replay beats expiry.Suggested tightening
verifier._key_pair = key_pair verifier._jti_registry = JtiRegistry(ttl_seconds=300) + assert verifier._jti_registry.check_and_register(raw["jti"]) is True is_valid, _, error = verifier.verify_attestation(expired_token) assert is_valid is False assert error is not None assert "expired" in error.lower()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_crypto_signing.py` around lines 346 - 356, The test currently uses a fresh verifier (A2ACryptoService created via __new__) with an empty JtiRegistry, so it only proves expiry rejection; to ensure replay detection takes precedence you must seed the verifier's registry with the same jti before calling verify_attestation on expired_token. Specifically, locate the test block that creates verifier, its _jti_registry = JtiRegistry(...), and expired_token, then register the token's jti into verifier._jti_registry (or call verifier.verify_attestation once with a non-expired token to populate the registry) so the subsequent call to A2ACryptoService.verify_attestation(expired_token) will fail if replay handling incorrectly beats expiry.
🧹 Nitpick comments (2)
tests/test_crypto_signing.py (2)
416-421: ⚡ Quick winPrefer real
A2ACryptoServiceinstances over__new__here.These verifier doubles bypass
__init__, so any constructor-set invariant added later can make the tests diverge from real deployment behavior. Construct a normal service and then swap in the shared key pair / fresh registry instead.Suggested tightening
- verifier = A2ACryptoService.__new__(A2ACryptoService) - verifier.issuer_id = issuer.issuer_id - verifier.validity_seconds = issuer.validity_seconds + verifier = A2ACryptoService( + issuer_id=issuer.issuer_id, + validity_seconds=issuer.validity_seconds, + ) issuer._ensure_key_pair() verifier._key_pair = issuer._key_pair verifier._jti_registry = JtiRegistry(ttl_seconds=300)Also applies to: 450-454, 474-479
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_crypto_signing.py` around lines 416 - 421, Replace the ad-hoc verifier instance created with A2ACryptoService.__new__ by constructing a real A2ACryptoService via its normal constructor, then swap in the shared key pair and a fresh JtiRegistry; specifically, call A2ACryptoService(...) to obtain verifier (instead of __new__), set verifier.issuer_id and verifier.validity_seconds as needed, ensure issuer._ensure_key_pair() is called and then assign verifier._key_pair = issuer._key_pair and verifier._jti_registry = JtiRegistry(ttl_seconds=300); apply the same change to the other test sites that use __new__ (around the other occurrences noted).
132-144: ⚡ Quick winBound the barrier wait so this test can't deadlock the suite.
If one worker fails before reaching
barrier.wait(), the remaining threads block indefinitely and the subsequentjoin()never returns. Adding a timeout makes this fail fast in CI instead of hanging the run.Suggested tightening
- barrier = threading.Barrier(20) + barrier = threading.Barrier(20, timeout=5) ... - barrier.wait() # hold until all threads are ready + barrier.wait() # hold until all threads are ready ... for t in threads: - t.join() + t.join(timeout=5) + assert not t.is_alive()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_crypto_signing.py` around lines 132 - 144, The barrier.wait() call in the test's register() function can deadlock if any thread fails before waiting; update the use of the barrier (variable barrier in tests/test_crypto_signing.py and the nested register() function) to include a timeout (e.g., barrier.wait(timeout=...)) and handle threading.BrokenBarrierError/Timeout by failing the test fast (raise/assert/append a failure) so the remaining threads don't block indefinitely; ensure the join() still runs for cleanup.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/qwed_a2a/security/crypto.py`:
- Around line 116-126: The module currently falls back to a random value for
_DEPLOYMENT_ID which introduces non-determinism; change the initialization of
_DEPLOYMENT_ID to require the QWED_A2A_DEPLOYMENT_ID environment variable and
fail fast if it's missing: remove the os.urandom() fallback and, when
os.environ.get("QWED_A2A_DEPLOYMENT_ID") is None or empty, raise a clear
RuntimeError (or ValueError) during import with a message instructing the
operator to set QWED_A2A_DEPLOYMENT_ID so verification logic cannot silently use
a random ID.
- Around line 268-279: In A2ACryptoService.verify_attestation, validate the
top-level "qwed_a2a" claim with a Pydantic model before accessing deployment_id:
define a small Pydantic model (e.g., QwedA2AClaims with deployment_id: str) and
call QwedA2AClaims.parse_obj(claims.get("qwed_a2a")) (or .validate) to ensure
it's a mapping with the expected field; on ValidationError return (False, None,
"Invalid qwed_a2a claims" or similar) instead of dereferencing
qwed_claims.get("deployment_id"), then compare the validated.deployment_id to
_DEPLOYMENT_ID as before to enforce deployment context.
---
Outside diff comments:
In `@tests/test_crypto_signing.py`:
- Around line 346-356: The test currently uses a fresh verifier
(A2ACryptoService created via __new__) with an empty JtiRegistry, so it only
proves expiry rejection; to ensure replay detection takes precedence you must
seed the verifier's registry with the same jti before calling verify_attestation
on expired_token. Specifically, locate the test block that creates verifier, its
_jti_registry = JtiRegistry(...), and expired_token, then register the token's
jti into verifier._jti_registry (or call verifier.verify_attestation once with a
non-expired token to populate the registry) so the subsequent call to
A2ACryptoService.verify_attestation(expired_token) will fail if replay handling
incorrectly beats expiry.
---
Nitpick comments:
In `@tests/test_crypto_signing.py`:
- Around line 416-421: Replace the ad-hoc verifier instance created with
A2ACryptoService.__new__ by constructing a real A2ACryptoService via its normal
constructor, then swap in the shared key pair and a fresh JtiRegistry;
specifically, call A2ACryptoService(...) to obtain verifier (instead of
__new__), set verifier.issuer_id and verifier.validity_seconds as needed, ensure
issuer._ensure_key_pair() is called and then assign verifier._key_pair =
issuer._key_pair and verifier._jti_registry = JtiRegistry(ttl_seconds=300);
apply the same change to the other test sites that use __new__ (around the other
occurrences noted).
- Around line 132-144: The barrier.wait() call in the test's register() function
can deadlock if any thread fails before waiting; update the use of the barrier
(variable barrier in tests/test_crypto_signing.py and the nested register()
function) to include a timeout (e.g., barrier.wait(timeout=...)) and handle
threading.BrokenBarrierError/Timeout by failing the test fast
(raise/assert/append a failure) so the remaining threads don't block
indefinitely; ensure the join() still runs for cleanup.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 70502452-0ada-4f6f-a77e-e57a2b851fe1
📒 Files selected for processing (2)
src/qwed_a2a/security/crypto.pytests/test_crypto_signing.py
…c claims, test hardening CodeRabbit MAJOR (fail-closed deployment ID): - Removed random fallback for _DEPLOYMENT_ID - QWED_A2A_DEPLOYMENT_ID env var is now required at import time - Missing env var raises RuntimeError immediately — consistent with fail-closed philosophy; a random fallback silently breaks cross-process verification without any operator signal - tests/conftest.py: sets QWED_A2A_DEPLOYMENT_ID before qwed_a2a imports CodeRabbit MAJOR (Pydantic claims validation): - Added _QwedA2AClaims Pydantic model (version, verdict, engine, sender, receiver, deployment_id, session_id) - verify_attestation() now calls model_validate() on qwed_a2a claims before accessing deployment_id — malformed or non-mapping qwed_a2a values return 'Invalid qwed_a2a claims structure' instead of raising AttributeError Outside diff — expiry ordering test: - Pre-seeds the verifier jti registry before presenting expired token - Test now genuinely proves expiry check runs before replay check; a replay-first implementation would return 'Replay detected' not 'expired' Nitpick — __new__ replaced with A2ACryptoService() constructor: - All verifier doubles now use the normal constructor + key swap - Prevents future constructor-added invariants from diverging Nitpick — Barrier timeout + join timeout: - threading.Barrier(20, timeout=5) prevents CI deadlock if a thread fails before reaching the barrier - t.join(timeout=5) + assert not t.is_alive() fails fast in CI New tests (+2): - test_malformed_qwed_a2a_claim_rejected - test_missing_qwed_a2a_claim_rejected
Problem
Three compounding issues made every issued JWT attestation replay-vulnerable:
there was no registry, so the same JWT could be presented unlimited times
Changes
crypto.pyJtiRegistry— new class, thread-safe, TTL-based replay preventionOrderedDictpreserves insertion order for O(1) eviction of expired entriesthreading.Lockfor concurrent access safetyvalidity_seconds— no unbounded memory growthA2ACryptoServicevalidity_seconds:86400→300(5 minutes — sufficient for one A2A hop)sign_verdict(): addsdeployment_id(stable per-process) andsession_id(caller-supplied) to claims; registers jti immediately after signingverify_attestation(): checks jti registry after signature/expiry validation — tampered tokens never pollute the registryVerification ordering in
verify_attestation():test_crypto_signing.py25 tests (up from 6):
TestJtiRegistry— 8 unit tests: eviction, threading, TTL, bounded growthTestJWTRoundTrip—deployment_id,session_id,expwindowTestReplayPrevention— 7 tests: replay rejection, error message, ordering, tamper isolationAcceptance criteria
verify_attestation()rejects previously seen jti values"Replay detected: jti already seen"deployment_idandsession_idSummary by CodeRabbit
Security Enhancements
Tests