diff --git a/lib/crewai/src/crewai/governance/__init__.py b/lib/crewai/src/crewai/governance/__init__.py new file mode 100644 index 0000000000..e505d92508 --- /dev/null +++ b/lib/crewai/src/crewai/governance/__init__.py @@ -0,0 +1,3 @@ +from crewai.governance.governance_decision import GovernanceDecision, GovernanceOutcome + +__all__ = ["GovernanceDecision", "GovernanceOutcome"] diff --git a/lib/crewai/src/crewai/governance/governance_decision.py b/lib/crewai/src/crewai/governance/governance_decision.py new file mode 100644 index 0000000000..2032bb0d30 --- /dev/null +++ b/lib/crewai/src/crewai/governance/governance_decision.py @@ -0,0 +1,430 @@ +""" +GovernanceDecision -- Vendor-neutral governance hook return type for CrewAI. + +This module defines the serialized contract that crew-level governance hooks +(before_tool_call / after_tool_call) can optionally return. External governance +engines (TealTiger, Neura Relay, Vaara, agent-guard, AlgoVoi, etc.) implement +this contract without requiring CrewAI to depend on any vendor package. + +The GovernanceDecision is the pre-execution authorization record. +The GovernanceOutcome is the post-execution result record, linked back +to the decision via decision_id and intent_ref. +The GovernanceSeal is the terminal record that pins the run's final count +for tail-drop detection. + +Vendor-specific evidence (signed receipts, Merkle proofs, etc.) lives +under the `extensions` dict and is never validated by CrewAI core. + +Canonicalization: All hash fields (params_hash, intent_digest, intent_ref, +receipt_ref, decision_context_hash) MUST be computed over RFC 8785 (JCS) +canonicalized JSON. json.dumps(sort_keys=True) is NOT JCS and diverges on +Unicode and non-integer fields. Use a compliant JCS library. + +Index base: 0-indexed. The first decision in a run is seq=0, running_count=1. +This matches the Vaara reference implementation (vaara.receipt/v1 SPEC.md 5.3). +""" + +from __future__ import annotations + +from typing import Any, Literal, TypedDict + + +class GovernanceDecision(TypedDict, total=False): + """Pre-execution authorization record returned by a governance hook. + + All fields are optional (total=False) to allow governance engines to + populate only the fields they support. However, route-specific validation + (via validate_governance_decision()) enforces that executable decisions + carry the binding fields needed for safe verification. + + Extensions are pass-through: CrewAI will serialize/deserialize them + without validation, allowing any governance engine to attach its own + evidence format (e.g., extensions["tealtiger"], extensions["vaara"], + extensions["agent_guard"], extensions["algovoi"]). + """ + + # --- Identity --- + + decision_id: str + """Unique identifier for this decision record (runtime-local UUID). + Used by GovernanceOutcome to link back.""" + + intent_ref: str + """Stable semantic identity of the authorized intent. + SHA-256(JCS({agent_id, tool, normalized_scope, intent_digest, idempotency_key})). + No timestamp — retries of the same authorized intent produce the same hash. + This is the normative cross-runtime join key between GovernanceDecision + and GovernanceOutcome. Idempotency checks bind to intent_ref equality.""" + + receipt_ref: str + """Per-record unique identity for audit enumeration. + SHA-256(JCS({...intent_ref_fields, issued_at})). + Includes timestamp — distinct records always have distinct receipt_ref. + Used for record counting and de-duplication across retries.""" + + # --- Agent context --- + + agent_id: str + """Identifier of the agent requesting the tool call.""" + + agent_role: str + """Role of the agent (e.g., 'Researcher', 'Admin').""" + + # --- Action context --- + + tool: str + """Name of the tool being invoked.""" + + request_id: str + """Unique identifier for the specific tool call request.""" + + params_hash: str + """SHA-256 hash of the RFC 8785 (JCS) canonicalized tool call parameters. + This is the hash of the requested form. See also intent_digest for the + normalized executable form.""" + + target: str + """Target resource or entity the tool operates on, if known.""" + + normalized_scope: str + """Explicit scope of the action (e.g., 'customers/eu', 'prod/deploy'). + Must not fall back to tool name alone — missing scope fails closed.""" + + # --- Intent binding (TOCTOU closure) --- + + intent_digest: str + """SHA-256 over the normalized executable action envelope: + (agent_id, tool, params_hash, target_state_digest). + The executor MUST recompute this immediately before the side effect. + Mismatch = fail closed (reason: INTENT_BINDING_MISMATCH).""" + + target_state_digest: str | None + """Hash of the target resource state at authorization time. If the target + state changed between authorization and execution, revalidation is required + (reason: TARGET_STATE_DRIFT).""" + + continuation_id: str | None + """For DEFER/REQUIRE_APPROVAL decisions: a resumption token. The deferred + action can only execute with this specific continuation_id, which binds to + the original intent. Mismatch = deny (reason: CONTINUATION_MISMATCH).""" + + normalization_id: str + """Identifies which normalization was applied before computing params_hash + and intent_digest. Examples: + - 'jcs-sha256' (structured tool args, RFC 8785 canonical) + - 'agent-guard-unwrap-v1' (shell command unwrapping) + - 'sql-normalize-v1' (SQL query normalization) + A verifier uses this to know how to recompute the digest.""" + + idempotency_key: str + """Unique key for this specific action attempt. A second execution with the + same decision_id + idempotency_key is denied (IDEMPOTENCY_VIOLATION).""" + + # --- Policy evaluation --- + + policy_refs: list[str] + """List of policy rule identifiers that were evaluated.""" + + retrieved_policy_refs: list[str] + """Stable refs to policy or memory records consulted (for adaptive governance).""" + + policy_digest: str + """Hash of the actual policy version evaluated.""" + + decision: Literal["allow", "deny", "require_approval", "revise"] + """The governance verdict for this tool call. + + Semantics: + - allow: executable, binding fields required + - deny: non-executable, recorded as first-class positive record + - require_approval: blocked until approval produces a valid decision + - revise: advisory feedback only; NO side effect; revised action requires + a new normalized envelope, new digest, and new decision_id. + Engines that don't implement revise simply never emit it. + """ + + reason: str + """Human-readable explanation of why this decision was made.""" + + # --- Lifecycle --- + + issued_at: str + """ISO 8601 timestamp of when this decision was issued.""" + + expires_at: str | None + """ISO 8601 timestamp after which this decision is invalid (fail-closed to deny).""" + + supersedes: str | None + """decision_id of a prior decision that this one explicitly overrides.""" + + revalidate_if: list[str] + """Conditions that require re-evaluation before execution. + Examples: ['argument_change', 'target_state_change', 'budget_change', + 'policy_version_change', 'scope_expansion', 'agent_identity_rotation']""" + + # --- Context --- + + decision_context_hash: str + """SHA-256 digest over JCS-canonicalized: + {agent_id, tool, params_hash, intent_digest, seq, retrieved_policy_refs, + policy_digest, credential_scope, credential_tier, expires_at, revalidate_if}. + Enables drift detection: if any input changed, the hash changes.""" + + credential_scope: str + """Authority scope available to the agent (e.g., 'read-only', 'production-write').""" + + credential_tier: str + """Credential tier level (e.g., 'service-account', 'human-delegated').""" + + # --- Evidence --- + + evidence_refs: list[str] + """References to external evidence artifacts (URIs, hashes, receipt IDs).""" + + extensions: dict[str, Any] + """Vendor-specific evidence. CrewAI passes this through without validation. + + Examples: + extensions["tealtiger"] = {"receipt_id": "...", "merkle_proof": "..."} + extensions["vaara"] = {"chain_hash": "...", "contiguity_report": "..."} + extensions["agent_guard"] = {"decision_code": "DENIED_BY_RULE", "attestation": "..."} + extensions["algovoi"] = {"keystone_ref": "...", "jcs_vectors": "..."} + extensions["neura"] = {"relay_id": "...", "action_card": "..."} + """ + + # --- Completeness evidence (omission detection) --- + + seq: int + """0-indexed monotonic position of this decision within the crew run. + First decision is seq=0. No gaps allowed. + + A verifier holding N records can detect internal gaps: if seq values do not + form a contiguous 0..N-1 range, records were dropped. Note: seq + running_count + detect INTERNAL gaps only. Tail-truncation detection requires a GovernanceSeal. + """ + + running_count: int + """Total number of decisions emitted in this run so far (including this one). + MUST equal seq + 1 for every record. + + If running_count != seq + 1, the record is malformed. If max(running_count) + across held records exceeds the number of held records, at least one record + was dropped. + """ + + +class GovernanceOutcome(TypedDict, total=False): + """Post-execution result record linked to a GovernanceDecision. + + Emitted after the tool call completes (or fails). The intent_ref + links this outcome back to the authorization record that preceded it. + decision_id provides backward compatibility. + """ + + decision_id: str + """Links back to the GovernanceDecision that authorized this execution.""" + + intent_ref: str + """Same intent_ref as the GovernanceDecision. This is the normative join key. + A verifier recomputes intent_ref from the decision-side fields and confirms + the outcome references the same authorized intent.""" + + receipt_ref: str + """Unique per-record identity for this outcome (includes timestamp).""" + + outcome: Literal["executed", "blocked", "error", "timeout"] + """What actually happened after the governance decision.""" + + tool_output_hash: str | None + """SHA-256 hash of the tool output (not the raw output itself).""" + + error_type: str | None + """Error class name if outcome is 'error'.""" + + error_message: str | None + """Error message if outcome is 'error'.""" + + completed_at: str + """ISO 8601 timestamp of when execution completed.""" + + extensions: dict[str, Any] + """Vendor-specific post-execution evidence.""" + + # Completeness back-reference + seq: int + """Same seq value as the GovernanceDecision this outcome links to. + Enables omission detection for outcomes: a missing outcome for a known + decision seq is a provable gap.""" + + +class GovernanceSeal(TypedDict, total=False): + """Terminal record emitted at the end of a governed run/session. + + Pins the run's final decision count so that tail-truncation (records + dropped from the end) is detectable — which per-record running_count + alone cannot catch. + + Layering: + 1. seq → ordering (0-indexed) + 2. running_count == seq + 1 → per-record consistency + 3. hash chain (extensions) → tamper-evidence + 4. GovernanceSeal → tail-drop detection (total pins expected count) + 5. RFC 3161 external anchor (extensions, optional) → residual closure + + Honest residual: a suffix drop that ALSO suppresses the seal stays + invisible from the held set alone. No field can close that. An external + anchor is required for full residual closure. + """ + + boundary_id: str + """Identifier for the run/session this seal covers (e.g., crew_run_id).""" + + sealed: Literal[True] + """Always True. Distinguishes seal records from decision records.""" + + total: int + """Total number of GovernanceDecision records emitted in this run. + A verifier expects exactly this many seq-bearing records (0..total-1). + If len(held_records) < total, at least (total - len) were dropped.""" + + final_seq: int + """The last seq value emitted before this seal. Should equal total - 1.""" + + sealed_at: str + """ISO 8601 timestamp of when the session was finalized.""" + + seal_hash: str | None + """Optional: SHA-256 digest of the concatenation of all decision_ids + in sequence order. Provides tamper-evidence for the seal itself.""" + + extensions: dict[str, Any] + """Vendor-specific seal evidence (e.g., RFC 3161 timestamp token).""" + + +# --- Validation --- + + +def validate_governance_decision(d: GovernanceDecision) -> tuple[bool, list[str]]: + """Validate a GovernanceDecision has the required fields for its route. + + The TypedDict is total=False (wire format flexibility), but this validator + enforces route-specific minimums so that executable decisions carry the + binding fields needed for safe verification. + + Returns: + (is_valid, list_of_errors) + """ + errors: list[str] = [] + + decision = d.get("decision") + if not decision: + errors.append("'decision' field is required") + return (False, errors) + + # All routes need at minimum: + if not d.get("decision_id"): + errors.append(f"'{decision}' requires 'decision_id'") + + if decision in ("allow", "require_approval"): + # Executable decisions need binding fields + required = ["agent_id", "tool", "issued_at"] + for field in required: + if not d.get(field): + errors.append(f"'{decision}' requires '{field}'") + + # Need at least one of intent_ref or params_hash for binding + if not d.get("intent_ref") and not d.get("params_hash"): + errors.append( + f"'{decision}' requires 'intent_ref' or 'params_hash' for intent binding" + ) + + # Need at least one policy reference + if not d.get("policy_refs"): + errors.append(f"'{decision}' requires at least one entry in 'policy_refs'") + + elif decision == "deny": + if not d.get("tool"): + errors.append("'deny' requires 'tool'") + if not d.get("reason"): + errors.append("'deny' requires 'reason'") + + elif decision == "revise": + if not d.get("tool"): + errors.append("'revise' requires 'tool'") + if not d.get("reason"): + errors.append("'revise' requires 'reason'") + if not d.get("revalidate_if"): + errors.append("'revise' requires 'revalidate_if' conditions") + + return (len(errors) == 0, errors) + + +# --- Contiguity Verification --- + + +def verify_contiguity( + records: list[dict[str, Any]], + seal: dict[str, Any] | None = None, +) -> bool: + """Verify that records form a complete, gap-free 0-indexed sequence. + + Checks: + 1. seq values form a contiguous 0..N-1 range (no gaps, no duplicates) + 2. running_count == seq + 1 for every record + 3. len(seq_records) == expected count + + When a seal is provided, additionally checks that len(seq_records) == + seal["total"]. This catches tail-truncation that per-record fields alone + cannot detect. + + Returns True if complete. Returns False if any gap, duplicate, count + mismatch, or seal violation exists. + + NOTE: This detects internal gaps and (with seal) tail drops. It CANNOT + detect a suffix drop that also suppresses the seal — that requires an + external anchor (RFC 3161 timestamp or equivalent). + """ + from collections import Counter + + # Separate seal records from decision records + seq_records = [r for r in records if not r.get("sealed")] + sealed_records = [r for r in records if r.get("sealed")] + + # Determine expected count + sealed_total = max( + (int(r["total"]) for r in sealed_records), default=0 + ) + + if seal is not None: + sealed_total = max(sealed_total, int(seal.get("total", 0))) + + if not seq_records: + return sealed_total == 0 + + seqs = [int(r["seq"]) for r in seq_records] + counts = [int(r["running_count"]) for r in seq_records] + + expected = max(max(seqs) + 1, max(counts), sealed_total) + + # Check for duplicates + duplicates = [s for s, n in Counter(seqs).items() if n > 1] + if duplicates: + return False + + # Check for gaps + missing = sorted(set(range(expected)) - set(seqs)) + if missing: + return False + + # Check running_count consistency (must == seq + 1) + count_mismatch = [ + r for r in seq_records if int(r["running_count"]) != int(r["seq"]) + 1 + ] + if count_mismatch: + return False + + # Check record count matches expected + if len(seq_records) != expected: + return False + + return True diff --git a/lib/crewai/tests/governance/test_governance_decision_fail_closed_contract.py b/lib/crewai/tests/governance/test_governance_decision_fail_closed_contract.py new file mode 100644 index 0000000000..1c87746282 --- /dev/null +++ b/lib/crewai/tests/governance/test_governance_decision_fail_closed_contract.py @@ -0,0 +1,162 @@ +""" +Fail-closed contract fixtures for GovernanceDecision. + +These tests are deliberately contract-level. They do not depend on a concrete +middleware hook implementation. Instead, they pin the expected behavior a +runtime/evaluator must preserve when binding an authorization record to an +executable candidate. + +Invariant: + authorization binds exact action + exact target state + exact continuation + + non-duplicate outcome +""" + +from __future__ import annotations + +from typing import Any, Literal + +from crewai.governance.governance_decision import GovernanceDecision, GovernanceOutcome + +BindingVerdict = Literal["allow", "deny", "revalidate"] + + +def evaluate_contract_binding( + decision: GovernanceDecision, + candidate: dict[str, Any], + existing_outcomes: list[GovernanceOutcome] | None = None, +) -> tuple[BindingVerdict, str]: + """Small test oracle for the fail-closed GovernanceDecision contract.""" + existing_outcomes = existing_outcomes or [] + + if decision.get("decision") != "allow": + return "deny", "decision_not_allow" + + for field in ("agent_id", "tool", "target", "normalized_scope"): + if decision.get(field) != candidate.get(field): + return "deny", f"{field}_mismatch" + + for field in ("intent_ref", "intent_digest", "params_hash"): + if decision.get(field) and decision.get(field) != candidate.get(field): + return "deny", "exact_intent_mismatch" + + if decision.get("continuation_id") != candidate.get("continuation_id"): + return "deny", "continuation_mismatch" + + if decision.get("target_state_digest") != candidate.get("target_state_digest"): + return "revalidate", "target_state_drift" + + for outcome in existing_outcomes: + same_decision = outcome.get("decision_id") == decision.get("decision_id") + same_intent = outcome.get("intent_ref") == decision.get("intent_ref") + same_idempotency = ( + outcome.get("extensions", {}).get("idempotency_key") + == decision.get("idempotency_key") + ) + terminal = outcome.get("outcome") in {"executed", "blocked", "error", "timeout"} + if terminal and same_decision and same_intent and same_idempotency: + return "deny", "duplicate_outcome" + + return "allow", "contract_binding_ok" + + +def base_allow_decision() -> GovernanceDecision: + return { + "decision_id": "d-fail-closed-001", + "intent_ref": "sha256:intent-ref-approved", + "receipt_ref": "sha256:receipt-ref-approved", + "agent_id": "support-bot", + "tool": "send_email", + "request_id": "req-fail-closed-001", + "target": "email:user@example.com", + "normalized_scope": "email/outbound/user-summary", + "params_hash": "sha256:params-approved", + "intent_digest": "sha256:intent-digest-approved", + "target_state_digest": "sha256:target-state-at-authorization", + "continuation_id": "cont:original-thread", + "normalization_id": "jcs-sha256", + "idempotency_key": "idem:send-summary:user@example.com:001", + "policy_refs": ["allow-user-summary-email-v1"], + "decision": "allow", + "reason": "Authorized exact outbound summary email.", + "issued_at": "2026-06-25T14:00:00Z", + "seq": 0, + "running_count": 1, + } + + +def matching_candidate() -> dict[str, Any]: + return { + "agent_id": "support-bot", + "tool": "send_email", + "target": "email:user@example.com", + "normalized_scope": "email/outbound/user-summary", + "params_hash": "sha256:params-approved", + "intent_ref": "sha256:intent-ref-approved", + "intent_digest": "sha256:intent-digest-approved", + "target_state_digest": "sha256:target-state-at-authorization", + "continuation_id": "cont:original-thread", + "idempotency_key": "idem:send-summary:user@example.com:001", + } + + +def test_exact_intent_mismatch_denies() -> None: + """Changed executable intent must deny, even if actor/tool/target match.""" + decision = base_allow_decision() + candidate = matching_candidate() + candidate["intent_digest"] = "sha256:intent-digest-mutated" + + verdict, reason = evaluate_contract_binding(decision, candidate) + + assert verdict == "deny" + assert reason == "exact_intent_mismatch" + + +def test_target_state_drift_revalidates() -> None: + """Same action against changed target state requires revalidation.""" + decision = base_allow_decision() + candidate = matching_candidate() + candidate["target_state_digest"] = "sha256:target-state-drifted" + + verdict, reason = evaluate_contract_binding(decision, candidate) + + assert verdict == "revalidate" + assert reason == "target_state_drift" + + +def test_continuation_mismatch_denies() -> None: + """Approved action cannot be replayed under another continuation.""" + decision = base_allow_decision() + candidate = matching_candidate() + candidate["continuation_id"] = "cont:different-thread" + + verdict, reason = evaluate_contract_binding(decision, candidate) + + assert verdict == "deny" + assert reason == "continuation_mismatch" + + +def test_duplicate_outcome_idempotency_collision_denies() -> None: + """A terminal outcome for the same idempotency key blocks re-execution.""" + decision = base_allow_decision() + candidate = matching_candidate() + existing_outcome: GovernanceOutcome = { + "decision_id": "d-fail-closed-001", + "intent_ref": "sha256:intent-ref-approved", + "receipt_ref": "sha256:outcome-receipt-001", + "outcome": "executed", + "tool_output_hash": "sha256:tool-output-001", + "completed_at": "2026-06-25T14:00:02Z", + "seq": 0, + "extensions": { + "idempotency_key": "idem:send-summary:user@example.com:001", + }, + } + + verdict, reason = evaluate_contract_binding( + decision, + candidate, + existing_outcomes=[existing_outcome], + ) + + assert verdict == "deny" + assert reason == "duplicate_outcome" diff --git a/lib/crewai/tests/test_governance_decision_contract.py b/lib/crewai/tests/test_governance_decision_contract.py new file mode 100644 index 0000000000..f445ede305 --- /dev/null +++ b/lib/crewai/tests/test_governance_decision_contract.py @@ -0,0 +1,748 @@ +""" +Contract tests for GovernanceDecision, GovernanceOutcome, and GovernanceSeal. + +These tests validate that: +1. All four decision routes produce valid GovernanceDecision dicts +2. Extensions round-trip through JSON without validation failures +3. GovernanceOutcome links back to a decision via intent_ref +4. Unknown extension payloads are preserved without modification +5. Error outcomes carry error_type and error_message +6. seq/running_count enable omission detection (0-indexed) +7. GovernanceSeal detects tail-truncation +8. validate_governance_decision enforces route-specific required fields +9. Intent binding (TOCTOU closure) tests +10. intent_ref / receipt_ref identity split + +No vendor imports. No external dependencies beyond stdlib. +""" + +import json +from typing import Any + +from crewai.governance.governance_decision import ( + GovernanceDecision, + GovernanceOutcome, + GovernanceSeal, + validate_governance_decision, + verify_contiguity, +) + + +# ============================================================================= +# Section 1: Core Decision Route Fixtures (0-indexed) +# ============================================================================= + +FIXTURE_ALLOW: GovernanceDecision = { + "decision_id": "d-001", + "intent_ref": "sha256:a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2", + "receipt_ref": "sha256:f0e1d2c3b4a5f6e7d8c9b0a1f2e3d4c5b6a7f8e9d0c1b2a3f4e5d6c7b8a9f0e1", + "agent_id": "support-bot", + "agent_role": "Support Agent", + "tool": "search_docs", + "request_id": "req-abc-001", + "params_hash": "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + "normalized_scope": "docs/public", + "intent_digest": "sha256:1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2d3e4f5a6b7c8d9e0f1a2b", + "normalization_id": "jcs-sha256", + "policy_refs": ["allow-read-tools-v1"], + "policy_digest": "sha256:policy-v1-hash", + "decision": "allow", + "reason": "Tool is in the agent's read allowlist", + "issued_at": "2026-06-25T14:00:00Z", + "seq": 0, + "running_count": 1, +} + +FIXTURE_DENY: GovernanceDecision = { + "decision_id": "d-002", + "intent_ref": "sha256:b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3", + "receipt_ref": "sha256:e1d2c3b4a5f6e7d8c9b0a1f2e3d4c5b6a7f8e9d0c1b2a3f4e5d6c7b8a9f0e1d2", + "agent_id": "finance-agent", + "agent_role": "Finance Analyst", + "tool": "delete_customer", + "request_id": "req-abc-002", + "params_hash": "sha256:a8f3c91e4b2d7f6a1e9c3b5d8f2a4c6e0b7d9f1a3c5e7b9d1f3a5c7e9b0d2f4a", + "normalized_scope": "customers/all", + "policy_refs": ["deny-destructive-v1"], + "decision": "deny", + "reason": "Tool not in allowlist for Finance Analyst role", + "issued_at": "2026-06-25T14:01:00Z", + "seq": 1, + "running_count": 2, +} + +FIXTURE_REQUIRE_APPROVAL: GovernanceDecision = { + "decision_id": "d-003", + "intent_ref": "sha256:c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d4", + "receipt_ref": "sha256:d2c3b4a5f6e7d8c9b0a1f2e3d4c5b6a7f8e9d0c1b2a3f4e5d6c7b8a9f0e1d2c3", + "agent_id": "admin-agent", + "agent_role": "Admin", + "tool": "export_data", + "request_id": "req-abc-003", + "target": "customer_database", + "normalized_scope": "customers/eu", + "params_hash": "sha256:d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2d3e4f5a6b7c8d9e0f1a2b3c4d5e6", + "intent_digest": "sha256:3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2d3e4f5a6b7c8d9e0f1a2b3c4d", + "continuation_id": "cont-003-approval-pending", + "normalization_id": "jcs-sha256", + "policy_refs": ["require-approval-exports-v1"], + "decision": "require_approval", + "reason": "Data export requires human sign-off", + "issued_at": "2026-06-25T14:05:00Z", + "expires_at": "2026-06-25T14:10:00Z", + "revalidate_if": ["target_state_change", "policy_version_change"], + "seq": 2, + "running_count": 3, +} + +FIXTURE_REVISE: GovernanceDecision = { + "decision_id": "d-005", + "intent_ref": "sha256:e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d4e5f6", + "receipt_ref": "sha256:c3b4a5f6e7d8c9b0a1f2e3d4c5b6a7f8e9d0c1b2a3f4e5d6c7b8a9f0e1d2c3b4", + "agent_id": "finance-agent", + "agent_role": "Finance Analyst", + "tool": "stripe.refund", + "request_id": "req-abc-005", + "params_hash": "sha256:c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2d3e4f5a6b7c8d9e0f1a2b3c4d5", + "target": "payment_pmt_123", + "normalized_scope": "payments/refund", + "normalization_id": "jcs-sha256", + "policy_refs": ["refund-limit-v1"], + "decision": "revise", + "reason": "Refund amount exceeds $1000 limit. Reduce amount and re-submit.", + "issued_at": "2026-06-25T14:15:00Z", + "revalidate_if": ["amount_changed"], + "seq": 3, + "running_count": 4, +} + +FIXTURE_ALLOW_WITH_EXTENSION: GovernanceDecision = { + "decision_id": "d-004", + "intent_ref": "sha256:d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d4e5", + "receipt_ref": "sha256:b4a5f6e7d8c9b0a1f2e3d4c5b6a7f8e9d0c1b2a3f4e5d6c7b8a9f0e1d2c3b4a5", + "agent_id": "ops-agent", + "agent_role": "Operations", + "tool": "deploy_service", + "request_id": "req-abc-004", + "params_hash": "sha256:b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2d3e4f5a6b7c8d9e0f1a2b3c4", + "normalized_scope": "infra/deploy", + "intent_digest": "sha256:4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2d3e4f5a6b7c8d9e0f1a2b3c4d5e", + "normalization_id": "jcs-sha256", + "policy_refs": ["allow-deploy-with-evidence-v1"], + "decision": "allow", + "reason": "Policy: scoped token and audit receipt present", + "issued_at": "2026-06-25T14:10:00Z", + "evidence_refs": ["tealtiger-receipt-004"], + "extensions": { + "tealtiger": { + "receipt_id": "tt-004", + "merkle_proof": "sha256:proof-hash-here", + "prev_hash": "sha256:f7a8b9c0d1e2f3a4b5c6d7e8", + "verifier_contract_version": "2.1.0", + }, + "vaara": { + "chain_hash": "sha256:vaara-chain-hash", + "contiguity_verified": True, + }, + }, + "seq": 4, + "running_count": 5, +} + +FIXTURE_UNKNOWN_EXTENSION: GovernanceDecision = { + "decision_id": "d-006", + "intent_ref": "sha256:f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d4e5f6a7", + "receipt_ref": "sha256:a5f6e7d8c9b0a1f2e3d4c5b6a7f8e9d0c1b2a3f4e5d6c7b8a9f0e1d2c3b4a5f6", + "agent_id": "test-agent", + "tool": "any_tool", + "normalized_scope": "test/scope", + "params_hash": "sha256:test-params-hash", + "normalization_id": "jcs-sha256", + "policy_refs": ["test-policy"], + "decision": "allow", + "reason": "Testing unknown extension round-trip", + "issued_at": "2026-06-25T14:20:00Z", + "extensions": { + "custom_vendor": { + "arbitrary_field": True, + "nested": {"deep": [1, 2, 3]}, + "unicode": "\u65e5\u672c\u8a9e\u30c6\u30b9\u30c8", + } + }, + "seq": 5, + "running_count": 6, +} + + +# ============================================================================= +# Section 2: Outcome Fixtures +# ============================================================================= + +FIXTURE_OUTCOME: GovernanceOutcome = { + "decision_id": "d-004", + "intent_ref": "sha256:d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d4e5", + "receipt_ref": "sha256:outcome-receipt-004", + "outcome": "executed", + "tool_output_hash": "sha256:d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d4e5", + "completed_at": "2026-06-25T14:10:02Z", + "seq": 4, +} + +FIXTURE_OUTCOME_ERROR: GovernanceOutcome = { + "decision_id": "d-002", + "intent_ref": "sha256:b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3", + "receipt_ref": "sha256:outcome-receipt-002-error", + "outcome": "error", + "error_type": "ToolExecutionError", + "error_message": "Connection refused: database host unreachable", + "completed_at": "2026-06-25T14:01:03Z", + "seq": 1, +} + + +# ============================================================================= +# Section 3: Core Contract Tests +# ============================================================================= + + +def test_allow_fixture_is_valid_governance_decision() -> None: + """ALLOW decision contains required binding fields.""" + assert FIXTURE_ALLOW["decision"] == "allow" + assert "decision_id" in FIXTURE_ALLOW + assert "agent_id" in FIXTURE_ALLOW + assert "tool" in FIXTURE_ALLOW + assert "intent_ref" in FIXTURE_ALLOW + assert "params_hash" in FIXTURE_ALLOW + assert "normalization_id" in FIXTURE_ALLOW + assert "issued_at" in FIXTURE_ALLOW + is_valid, errors = validate_governance_decision(FIXTURE_ALLOW) + assert is_valid, f"Validation failed: {errors}" + + +def test_deny_fixture_is_valid_governance_decision() -> None: + """DENY decision contains policy reference explaining the denial.""" + assert FIXTURE_DENY["decision"] == "deny" + assert len(FIXTURE_DENY["policy_refs"]) > 0 + assert "reason" in FIXTURE_DENY + is_valid, errors = validate_governance_decision(FIXTURE_DENY) + assert is_valid, f"Validation failed: {errors}" + + +def test_require_approval_fixture_has_expiry() -> None: + """REQUIRE_APPROVAL decision includes expires_at and continuation_id.""" + assert FIXTURE_REQUIRE_APPROVAL["decision"] == "require_approval" + assert FIXTURE_REQUIRE_APPROVAL["expires_at"] is not None + assert FIXTURE_REQUIRE_APPROVAL["continuation_id"] is not None + is_valid, errors = validate_governance_decision(FIXTURE_REQUIRE_APPROVAL) + assert is_valid, f"Validation failed: {errors}" + + +def test_revise_fixture_has_revalidate_if() -> None: + """REVISE decision includes revalidate_if conditions (advisory only).""" + assert FIXTURE_REVISE["decision"] == "revise" + assert len(FIXTURE_REVISE["revalidate_if"]) > 0 + is_valid, errors = validate_governance_decision(FIXTURE_REVISE) + assert is_valid, f"Validation failed: {errors}" + + +def test_extension_round_trips_through_json() -> None: + """Extensions serialize to JSON and deserialize without data loss.""" + original = FIXTURE_ALLOW_WITH_EXTENSION + serialized = json.dumps(original) + deserialized = json.loads(serialized) + + assert deserialized["extensions"]["tealtiger"]["receipt_id"] == "tt-004" + assert deserialized["extensions"]["vaara"]["contiguity_verified"] is True + + +def test_unknown_extension_round_trips_without_validation_failure() -> None: + """Unknown vendor extensions pass through JSON round-trip unchanged.""" + original = FIXTURE_UNKNOWN_EXTENSION + serialized = json.dumps(original) + deserialized = json.loads(serialized) + + assert deserialized["extensions"]["custom_vendor"]["arbitrary_field"] is True + assert deserialized["extensions"]["custom_vendor"]["nested"]["deep"] == [1, 2, 3] + assert deserialized["extensions"]["custom_vendor"]["unicode"] == "\u65e5\u672c\u8a9e\u30c6\u30b9\u30c8" + + +def test_outcome_links_back_via_intent_ref() -> None: + """GovernanceOutcome references the authorizing decision via intent_ref.""" + assert FIXTURE_OUTCOME["intent_ref"] == FIXTURE_ALLOW_WITH_EXTENSION["intent_ref"] + assert FIXTURE_OUTCOME["outcome"] == "executed" + assert "completed_at" in FIXTURE_OUTCOME + + +def test_error_outcome_has_error_fields() -> None: + """Error outcome carries error_type and error_message.""" + assert FIXTURE_OUTCOME_ERROR["outcome"] == "error" + assert FIXTURE_OUTCOME_ERROR["error_type"] is not None + assert FIXTURE_OUTCOME_ERROR["error_message"] is not None + assert FIXTURE_OUTCOME_ERROR["intent_ref"] == FIXTURE_DENY["intent_ref"] + + +def test_all_fixtures_json_serializable() -> None: + """Every fixture round-trips through JSON without error.""" + fixtures: list[dict[str, Any]] = [ + FIXTURE_ALLOW, FIXTURE_DENY, FIXTURE_REQUIRE_APPROVAL, + FIXTURE_ALLOW_WITH_EXTENSION, FIXTURE_REVISE, + FIXTURE_OUTCOME, FIXTURE_OUTCOME_ERROR, FIXTURE_UNKNOWN_EXTENSION, + ] + for fixture in fixtures: + serialized = json.dumps(fixture) + deserialized = json.loads(serialized) + assert deserialized == fixture + + +# ============================================================================= +# Section 4: Validation Tests (Route-Specific Required Fields) +# ============================================================================= + + +def test_allow_missing_binding_fields_fails_validation() -> None: + """An ALLOW without binding fields fails validation.""" + minimal_allow: GovernanceDecision = { + "decision_id": "d-bad-001", + "decision": "allow", + "reason": "no binding fields", + } + is_valid, errors = validate_governance_decision(minimal_allow) + assert is_valid is False + assert any("agent_id" in e for e in errors) + assert any("tool" in e for e in errors) + assert any("intent_ref" in e or "params_hash" in e for e in errors) + + +def test_deny_missing_reason_fails_validation() -> None: + """A DENY without reason fails validation.""" + bad_deny: GovernanceDecision = { + "decision_id": "d-bad-002", + "tool": "some_tool", + "decision": "deny", + } + is_valid, errors = validate_governance_decision(bad_deny) + assert is_valid is False + assert any("reason" in e for e in errors) + + +def test_revise_missing_revalidate_if_fails_validation() -> None: + """A REVISE without revalidate_if fails validation.""" + bad_revise: GovernanceDecision = { + "decision_id": "d-bad-003", + "tool": "some_tool", + "decision": "revise", + "reason": "needs revision", + } + is_valid, errors = validate_governance_decision(bad_revise) + assert is_valid is False + assert any("revalidate_if" in e for e in errors) + + +def test_missing_decision_field_fails_validation() -> None: + """A record with no decision field fails validation.""" + no_decision: GovernanceDecision = { + "decision_id": "d-bad-004", + "tool": "some_tool", + "reason": "no decision field", + } + is_valid, errors = validate_governance_decision(no_decision) + assert is_valid is False + assert any("'decision'" in e for e in errors) + + +# ============================================================================= +# Section 5: Completeness / Omission Detection Tests (0-indexed) +# ============================================================================= + +FIXTURE_CONTIGUOUS_RUN: list[GovernanceDecision] = [ + {"decision_id": "d-101", "tool": "search", "decision": "allow", "reason": "ok", + "issued_at": "2026-06-25T10:00:00Z", "seq": 0, "running_count": 1}, + {"decision_id": "d-102", "tool": "calc", "decision": "allow", "reason": "ok", + "issued_at": "2026-06-25T10:00:01Z", "seq": 1, "running_count": 2}, + {"decision_id": "d-103", "tool": "write", "decision": "deny", "reason": "blocked", + "issued_at": "2026-06-25T10:00:02Z", "seq": 2, "running_count": 3}, +] + +FIXTURE_SEQ_GAP: list[GovernanceDecision] = [ + {"decision_id": "d-201", "tool": "search", "decision": "allow", "reason": "ok", + "issued_at": "2026-06-25T10:00:00Z", "seq": 0, "running_count": 1}, + {"decision_id": "d-202", "tool": "calc", "decision": "allow", "reason": "ok", + "issued_at": "2026-06-25T10:00:01Z", "seq": 1, "running_count": 2}, + # seq 2 missing -- provable interior gap + {"decision_id": "d-204", "tool": "deploy", "decision": "allow", "reason": "ok", + "issued_at": "2026-06-25T10:00:03Z", "seq": 3, "running_count": 4}, +] + +FIXTURE_RUNNING_COUNT_MISMATCH: list[GovernanceDecision] = [ + {"decision_id": "d-301", "tool": "search", "decision": "allow", "reason": "ok", + "issued_at": "2026-06-25T10:00:00Z", "seq": 0, "running_count": 1}, + # running_count 4 at seq 1 means running_count != seq + 1 -- malformed + {"decision_id": "d-302", "tool": "calc", "decision": "allow", "reason": "ok", + "issued_at": "2026-06-25T10:00:01Z", "seq": 1, "running_count": 4}, +] + + +def test_contiguous_run_passes_verification() -> None: + """A complete 0-indexed run with no gaps passes contiguity verification.""" + assert verify_contiguity(FIXTURE_CONTIGUOUS_RUN) is True + + +def test_gap_in_seq_fails_verification() -> None: + """A gap in seq (dropped record) is detected as incomplete.""" + assert verify_contiguity(FIXTURE_SEQ_GAP) is False + + +def test_running_count_mismatch_fails() -> None: + """running_count != seq + 1 is detected as malformed.""" + assert verify_contiguity(FIXTURE_RUNNING_COUNT_MISMATCH) is False + + +def test_seq_starts_at_zero() -> None: + """First decision in a run has seq=0, running_count=1.""" + first = FIXTURE_CONTIGUOUS_RUN[0] + assert first["seq"] == 0 + assert first["running_count"] == 1 + + +# ============================================================================= +# Section 6: GovernanceSeal and Tail-Drop Detection Tests +# ============================================================================= + +FIXTURE_SEAL: GovernanceSeal = { + "boundary_id": "crew-run-001", + "sealed": True, + "total": 3, + "final_seq": 2, + "sealed_at": "2026-06-25T10:00:05Z", + "seal_hash": "sha256:concat-of-d101-d102-d103", +} + +FIXTURE_TAIL_DROP_SEALED: list[dict[str, Any]] = [ + {"decision_id": "d-401", "tool": "search", "decision": "allow", + "reason": "ok", "seq": 0, "running_count": 1}, + {"decision_id": "d-402", "tool": "calc", "decision": "allow", + "reason": "ok", "seq": 1, "running_count": 2}, + {"decision_id": "d-403", "tool": "write", "decision": "allow", + "reason": "ok", "seq": 2, "running_count": 3}, + # Seal says 4 total, but only 3 held — tail drop detected + {"boundary_id": "crew-run-1", "sealed": True, "total": 4}, +] + +FIXTURE_TAIL_DROP_NO_SEAL: list[dict[str, Any]] = [ + {"decision_id": "d-401", "tool": "search", "decision": "allow", + "reason": "ok", "seq": 0, "running_count": 1}, + {"decision_id": "d-402", "tool": "calc", "decision": "allow", + "reason": "ok", "seq": 1, "running_count": 2}, + {"decision_id": "d-403", "tool": "write", "decision": "allow", + "reason": "ok", "seq": 2, "running_count": 3}, + # No seal — tail drop is invisible (the irreducible residual) +] + +FIXTURE_SEALED_WHOLE: list[dict[str, Any]] = [ + {"decision_id": "d-501", "tool": "search", "decision": "allow", + "reason": "ok", "seq": 0, "running_count": 1}, + {"decision_id": "d-502", "tool": "calc", "decision": "allow", + "reason": "ok", "seq": 1, "running_count": 2}, + {"decision_id": "d-503", "tool": "write", "decision": "deny", + "reason": "blocked", "seq": 2, "running_count": 3}, + {"boundary_id": "crew-run-2", "sealed": True, "total": 3}, +] + + +def test_sealed_whole_run_passes() -> None: + """A complete run with matching seal passes verification.""" + assert verify_contiguity(FIXTURE_SEALED_WHOLE) is True + + +def test_tail_drop_caught_by_seal() -> None: + """Seal pins total=4 but only 3 records held — tail drop detected.""" + assert verify_contiguity(FIXTURE_TAIL_DROP_SEALED) is False + + +def test_tail_drop_without_seal_is_the_residual() -> None: + """Without a seal, tail drop is invisible — this is the honest residual.""" + assert verify_contiguity(FIXTURE_TAIL_DROP_NO_SEAL) is True + + +def test_seal_with_external_parameter() -> None: + """verify_contiguity accepts an external seal parameter.""" + records = FIXTURE_CONTIGUOUS_RUN + seal = {"total": 3} + assert verify_contiguity(records, seal=seal) is True + + # Seal claims 5 but only 3 held + bad_seal = {"total": 5} + assert verify_contiguity(records, seal=bad_seal) is False + + +# ============================================================================= +# Section 7: Intent Binding / TOCTOU Closure Tests +# ============================================================================= + + +def test_intent_ref_stable_across_retries() -> None: + """Same authorized intent with different timestamps produces same intent_ref. + + intent_ref = SHA-256(JCS({agent_id, tool, normalized_scope, intent_digest, + idempotency_key})) — no timestamp. Retries don't change it. + """ + # Two decisions for the same intent, different issued_at + decision_1: GovernanceDecision = { + "decision_id": "d-retry-001", + "intent_ref": "sha256:same-intent-hash", + "receipt_ref": "sha256:receipt-attempt-1", + "agent_id": "bot-1", + "tool": "search", + "normalized_scope": "docs/public", + "intent_digest": "sha256:intent-abc", + "normalization_id": "jcs-sha256", + "policy_refs": ["allow-v1"], + "decision": "allow", + "reason": "ok", + "issued_at": "2026-06-25T10:00:00Z", + "seq": 0, "running_count": 1, + } + decision_2: GovernanceDecision = { + **decision_1, + "decision_id": "d-retry-002", + "receipt_ref": "sha256:receipt-attempt-2", # different + "issued_at": "2026-06-25T10:00:05Z", # different timestamp + "seq": 1, "running_count": 2, + } + # Same intent_ref despite different timestamps + assert decision_1["intent_ref"] == decision_2["intent_ref"] + # Different receipt_ref (per-record uniqueness) + assert decision_1["receipt_ref"] != decision_2["receipt_ref"] + + +def test_intent_digest_mismatch_means_different_intent_ref() -> None: + """Changed args produce a different intent_digest → different intent_ref.""" + original_intent_ref = "sha256:original-intent" + mutated_intent_ref = "sha256:mutated-intent" + # If intent_digest changes, intent_ref MUST change + assert original_intent_ref != mutated_intent_ref + + +def test_target_state_digest_drift_requires_revalidation() -> None: + """If target_state_digest changes, executor must revalidate.""" + decision: GovernanceDecision = { + "decision_id": "d-drift-001", + "intent_ref": "sha256:drift-intent", + "agent_id": "bot-1", + "tool": "update_customer", + "target": "customer/123", + "target_state_digest": "sha256:state-at-auth-time", + "normalized_scope": "customers/write", + "params_hash": "sha256:params-hash", + "normalization_id": "jcs-sha256", + "policy_refs": ["allow-update-v1"], + "decision": "allow", + "reason": "authorized", + "issued_at": "2026-06-25T10:00:00Z", + "seq": 0, "running_count": 1, + } + # Simulated current state at execution time + current_state_digest = "sha256:state-has-drifted" + # Contract invariant: mismatch requires revalidation + assert decision["target_state_digest"] != current_state_digest + # Executor should NOT proceed — must revalidate + + +def test_continuation_id_mismatch_denies_resume() -> None: + """A deferred action resumed with wrong continuation_id is denied.""" + original_decision: GovernanceDecision = { + "decision_id": "d-defer-001", + "intent_ref": "sha256:defer-intent", + "agent_id": "bot-1", + "tool": "export_data", + "normalized_scope": "data/export", + "params_hash": "sha256:params", + "normalization_id": "jcs-sha256", + "policy_refs": ["require-approval-v1"], + "decision": "require_approval", + "reason": "needs human sign-off", + "issued_at": "2026-06-25T10:00:00Z", + "continuation_id": "cont-original-abc", + "seq": 0, "running_count": 1, + } + # Attempt to resume with a different continuation_id + resume_continuation = "cont-WRONG-xyz" + assert original_decision["continuation_id"] != resume_continuation + # Contract invariant: executor must deny (CONTINUATION_MISMATCH) + + +def test_idempotency_prevents_double_execution() -> None: + """Same decision_id + intent_ref with existing terminal outcome = deny.""" + # Decision already has a terminal outcome + executed_outcome: GovernanceOutcome = { + "decision_id": "d-idem-001", + "intent_ref": "sha256:idem-intent", + "outcome": "executed", + "completed_at": "2026-06-25T10:00:02Z", + "seq": 0, + } + # A second execution attempt against the same authorization + # Contract invariant: must be denied (IDEMPOTENCY_VIOLATION) + assert executed_outcome["outcome"] == "executed" + # Any system seeing an existing terminal outcome for this intent_ref + # MUST deny a second execution attempt + + +def test_expired_authorization_denies() -> None: + """An authorization past its expires_at must be denied.""" + decision: GovernanceDecision = { + "decision_id": "d-expired-001", + "intent_ref": "sha256:expired-intent", + "agent_id": "bot-1", + "tool": "deploy", + "normalized_scope": "infra/deploy", + "params_hash": "sha256:params", + "normalization_id": "jcs-sha256", + "policy_refs": ["allow-deploy-v1"], + "decision": "allow", + "reason": "authorized", + "issued_at": "2026-06-25T10:00:00Z", + "expires_at": "2026-06-25T10:05:00Z", + "seq": 0, "running_count": 1, + } + # Simulated current time is past expiry + current_time = "2026-06-25T10:06:00Z" + assert decision["expires_at"] < current_time + # Contract invariant: fail closed (AUTHORIZATION_EXPIRED) + + +# ============================================================================= +# Section 8: intent_ref / receipt_ref Identity Split Tests +# ============================================================================= + + +def test_intent_ref_is_join_key_between_decision_and_outcome() -> None: + """GovernanceDecision and GovernanceOutcome join via intent_ref.""" + assert FIXTURE_ALLOW_WITH_EXTENSION["intent_ref"] == FIXTURE_OUTCOME["intent_ref"] + + +def test_receipt_ref_unique_per_record() -> None: + """Every record has a unique receipt_ref (includes timestamp).""" + all_receipt_refs = [ + FIXTURE_ALLOW["receipt_ref"], + FIXTURE_DENY["receipt_ref"], + FIXTURE_REQUIRE_APPROVAL["receipt_ref"], + FIXTURE_ALLOW_WITH_EXTENSION["receipt_ref"], + FIXTURE_REVISE["receipt_ref"], + ] + assert len(all_receipt_refs) == len(set(all_receipt_refs)) + + +def test_same_intent_different_audit_timestamps_same_intent_ref() -> None: + """Audit timestamp changes must not alter semantic intent identity.""" + # This is the key invariant: intent_ref excludes timestamp + # Two records for the same intent at different times + intent_ref_a = "sha256:stable-semantic-identity" + intent_ref_b = "sha256:stable-semantic-identity" + assert intent_ref_a == intent_ref_b + + +def test_different_scope_different_intent_ref() -> None: + """Changed normalized_scope produces a different intent_ref.""" + # If scope changes, intent_ref MUST change — otherwise it's a bypass + scope_a_ref = "sha256:intent-with-scope-a" + scope_b_ref = "sha256:intent-with-scope-b" + assert scope_a_ref != scope_b_ref + + +# ============================================================================= +# Section 9: Deny is a First-Class Record Test +# ============================================================================= + + +def test_deny_is_a_positive_record() -> None: + """A DENY produces a full decision record, not an absence. + + This is the deny-as-record property: a blocked call leaves a + recomputable record that reads differently from a call that was + simply never made. A reviewer can tell 'denied and recorded' from + 'never observed'. + """ + assert FIXTURE_DENY["decision"] == "deny" + assert "decision_id" in FIXTURE_DENY + assert "intent_ref" in FIXTURE_DENY + assert "receipt_ref" in FIXTURE_DENY + assert "seq" in FIXTURE_DENY + assert "running_count" in FIXTURE_DENY + # Deny records participate in the completeness sequence + # just like allow records + + +# ============================================================================= +# Section 10: normalization_id Tests +# ============================================================================= + + +def test_normalization_id_identifies_hash_scheme() -> None: + """normalization_id tells a verifier how to recompute params_hash.""" + assert FIXTURE_ALLOW["normalization_id"] == "jcs-sha256" + # Other valid values: "agent-guard-unwrap-v1", "sql-normalize-v1" + + +def test_all_fixtures_carry_normalization_id() -> None: + """All decision fixtures include normalization_id.""" + fixtures = [ + FIXTURE_ALLOW, FIXTURE_DENY, FIXTURE_REQUIRE_APPROVAL, + FIXTURE_ALLOW_WITH_EXTENSION, FIXTURE_REVISE, FIXTURE_UNKNOWN_EXTENSION, + ] + for f in fixtures: + assert "normalization_id" in f, f"Missing normalization_id in {f['decision_id']}" + + +# ============================================================================= +# Section 11: Revise is Advisory-Only Tests +# ============================================================================= + + +def test_revise_is_non_executable() -> None: + """REVISE emits feedback and creates no side effect. + + Executing a revised action requires a fresh decision_id and digest. + revise is advisory-only: no outcome with executed=true should exist + for a revise decision without a new decision being issued first. + """ + assert FIXTURE_REVISE["decision"] == "revise" + # A revise decision should NEVER have an outcome with "executed" + # without a subsequent allow decision being issued + + +# ============================================================================= +# Section 12: seq and running_count Round-Trip Tests +# ============================================================================= + + +def test_seq_and_running_count_round_trip() -> None: + """seq and running_count fields survive JSON serialization.""" + for record in FIXTURE_CONTIGUOUS_RUN: + deserialized = json.loads(json.dumps(record)) + assert deserialized["seq"] == record["seq"] + assert deserialized["running_count"] == record["running_count"] + + +def test_running_count_equals_seq_plus_one() -> None: + """For every record, running_count == seq + 1 (0-indexed invariant).""" + all_records = FIXTURE_CONTIGUOUS_RUN + [ + FIXTURE_ALLOW, FIXTURE_DENY, FIXTURE_REQUIRE_APPROVAL, + FIXTURE_ALLOW_WITH_EXTENSION, FIXTURE_REVISE, FIXTURE_UNKNOWN_EXTENSION, + ] + for record in all_records: + assert record["running_count"] == record["seq"] + 1, ( + f"Record {record.get('decision_id')}: " + f"running_count={record['running_count']} != seq+1={record['seq'] + 1}" + ) + + +def test_outcome_carries_seq_back_reference() -> None: + """GovernanceOutcome carries the same seq as its linked decision.""" + assert FIXTURE_OUTCOME["seq"] == FIXTURE_ALLOW_WITH_EXTENSION["seq"] + assert FIXTURE_OUTCOME_ERROR["seq"] == FIXTURE_DENY["seq"] + + +def test_empty_records_passes_verification() -> None: + """An empty record list passes verification (vacuously true).""" + assert verify_contiguity([]) is True