Python 3.10+ | MIT License

PolicyForge

Local policy engine for AI agent tool-call gating with optional multi-cloud sync.

PolicyForge evaluates every AI agent tool call against YAML-defined policies locally — zero network hops, zero cloud dependencies for security decisions. Policies are version-controlled, human-readable, and sync across AWS S3, Azure Blob Storage, and OCI Object Storage.

Agent calls tool → PolicyForge evaluates locally → ALLOW / DENY / LOG_ONLY
                                                        ↓
                                              HMAC-signed audit trail

Why PolicyForge?

Most agent security tools delegate decisions to a remote API. That means your security posture depends on someone else's uptime, latency, and infrastructure. PolicyForge takes a different approach:

  • All evaluation happens locally — no network calls in the decision path
  • Fail-closed by default — if something goes wrong, the tool call is denied
  • Framework-agnostic — works with MS Foundry Agents, LangChain, OpenAI, or any callable
  • HMAC-signed audit trail — tamper-evident logs with hash chaining
  • Multi-cloud sync — distribute policies from S3, Azure Blob, or OCI Object Storage
  • Single dependency — just PyYAML for the core engine

Quick Start

Install

pip install policyforge

# Optional cloud sync providers
pip install policyforge[aws]          # S3
pip install policyforge[azure]        # Azure Blob Storage
pip install policyforge[oci]          # OCI Object Storage
pip install policyforge[all-clouds]   # All three

Define a Policy

# policies/security.yaml
name: default-security
fail_mode: closed
default_verdict: DENY

rules:
  - name: block-shell-exec
    priority: 10
    verdict: DENY
    message: "Shell execution is blocked by policy."
    match_strategy: any
    conditions:
      - field: tool_name
        operator: in
        value: ["run_shell", "bash", "exec"]

  - name: block-internal-network
    priority: 20
    verdict: DENY
    message: "Requests to internal networks are blocked."
    conditions:
      - field: tool_name
        operator: eq
        value: http_request
      - field: args.url
        operator: regex
        value: "https?://(10\\.|172\\.(1[6-9]|2[0-9]|3[01])\\.|192\\.168\\.)"

Gate Your Tools

from policyforge import PolicyEngine
from policyforge.decorators import PolicyDeniedError, PolicyGateWrapper, policy_gate

engine = PolicyEngine(policy_paths=["./policies"])

# Decorator approach (`search` is a placeholder for your own implementation)
@policy_gate(engine, tool_name="web_search")
def web_search(query: str) -> list[str]:
    return search(query)

# Wrapper approach (for framework tool registries);
# search_fn, read_fn, and write_fn are your existing tool callables
wrapper = PolicyGateWrapper(engine)
safe_tools = wrapper.wrap_dict({
    "search": search_fn,
    "read_file": read_fn,
    "write_file": write_fn,
})

# Direct evaluation
decision = engine.evaluate("delete_records", args={"count": 500})
if decision.verdict.value == "DENY":
    print(f"Blocked: {decision.message}")
    print(engine.render_share_receipt(decision))
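
For intuition, the decorator approach can be sketched with the standard library alone. This is not PolicyForge's implementation: `ToyEngine`, `toy_gate`, and `ToyDenied` are hypothetical stand-ins, and the real `policy_gate` may bind arguments or report denials differently.

```python
import functools
import inspect


class ToyDenied(Exception):
    """Stand-in for PolicyDeniedError (hypothetical, for illustration only)."""


class ToyEngine:
    """Hypothetical engine: denies any tool whose name is in a blocklist."""

    def __init__(self, blocked):
        self.blocked = set(blocked)

    def evaluate(self, tool_name, args):
        return "DENY" if tool_name in self.blocked else "ALLOW"


def toy_gate(engine, tool_name):
    """Evaluate the call before running the wrapped function."""

    def decorator(fn):
        sig = inspect.signature(fn)

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Bind positional and keyword arguments to parameter names so the
            # engine sees a single dict, e.g. {"query": "weather"}.
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            if engine.evaluate(tool_name, dict(bound.arguments)) == "DENY":
                raise ToyDenied(f"{tool_name} denied by policy")
            return fn(*args, **kwargs)

        return wrapper

    return decorator


toy_engine = ToyEngine(blocked={"run_shell"})


@toy_gate(toy_engine, tool_name="web_search")
def toy_web_search(query: str) -> list[str]:
    return [f"result for {query}"]


@toy_gate(toy_engine, tool_name="run_shell")
def toy_run_shell(command: str) -> str:
    return "never reached"
```

The gate runs before the function body, so a denied tool never executes, and `functools.wraps` keeps the original name and docstring visible to framework tool registries.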

Share a Policy Receipt

When a tool call is denied or flagged as LOG_ONLY, render a sanitized Markdown receipt for Slack, tickets, or PRs without exposing raw tool arguments:

decision = engine.evaluate("run_shell", {"command": "rm -rf /tmp/demo"})
receipt = engine.render_share_receipt(decision)
print(receipt)

The receipt includes the verdict, tool name, policy, matched rule, request ID, agent ID, args hash, and message. If an AuditLogger is attached, PolicyForge also writes a share_receipt_generated event, so you can track how often blocked decisions are shared or escalated.
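
The idea of sharing a hash instead of raw arguments can be sketched as follows. The canonical-JSON encoding, the `args_hash` and `render_receipt` helpers, and the receipt layout are assumptions for illustration; PolicyForge's actual hashing scheme and receipt format are not specified here.

```python
import hashlib
import json


def args_hash(args: dict) -> str:
    """SHA-256 over a canonical JSON encoding (sorted keys, no extra spaces)."""
    canonical = json.dumps(args, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def render_receipt(verdict: str, tool_name: str, rule: str, args: dict, message: str) -> str:
    """Render a Markdown receipt that never includes the raw arguments."""
    return "\n".join([
        "### Policy receipt",
        f"- Verdict: {verdict}",
        f"- Tool: {tool_name}",
        f"- Matched rule: {rule}",
        f"- Args hash: {args_hash(args)}",
        f"- Message: {message}",
    ])


sample_receipt = render_receipt(
    "DENY",
    "run_shell",
    "block-shell-exec",
    {"command": "rm -rf /tmp/demo"},
    "Shell execution is blocked by policy.",
)
```

Sorting keys before hashing means two calls with the same arguments in a different order produce the same hash, which makes receipts comparable with audit entries.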


Architecture

┌─────────────────────────────────────────────────────┐
│                    Your AI Agent                     │
│  (MS Foundry / LangChain / OpenAI / Custom)         │
└──────────────────────┬──────────────────────────────┘
                       │ tool call
                       ▼
┌─────────────────────────────────────────────────────┐
│                   PolicyForge                        │
│                                                      │
│  ┌─────────────┐  ┌──────────────┐  ┌────────────┐ │
│  │ YAML Loader  │→│ Policy Engine │→│ Audit Logger│ │
│  │ + Validation │  │ (local eval)  │  │ (HMAC+chain)│ │
│  └──────┬──────┘  └──────────────┘  └────────────┘ │
│         │                                            │
│  ┌──────┴──────────────────────────────────────────┐│
│  │          Cloud Sync (optional)                   ││
│  │  ┌─────┐  ┌────────────┐  ┌──────────────────┐ ││
│  │  │ S3  │  │ Azure Blob │  │ OCI Obj. Storage │ ││
│  │  └─────┘  └────────────┘  └──────────────────┘ ││
│  └─────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────┘

Policy Reference

Policy Structure

Field            Type                     Default   Description
name             string                   required  Unique policy identifier
description      string                   ""        Human-readable summary
rules            list                     []        Ordered evaluation rules
default_verdict  ALLOW | DENY | LOG_ONLY  DENY      Verdict when no rule matches
fail_mode        closed | open | log      closed    Behavior on evaluation error
version          string                   "1.0.0"   Policy version for tracking
enabled          bool                     true      Master on/off switch
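
One way to picture fail_mode is as a mapping from "evaluation raised an error" to a verdict. The sketch below assumes closed maps to DENY, open to ALLOW, and log to LOG_ONLY; the last mapping in particular is an assumption, and `evaluate_with_fail_mode` is a hypothetical helper rather than part of PolicyForge.

```python
def evaluate_with_fail_mode(evaluate, fail_mode="closed"):
    """Run `evaluate()` and map any exception to a verdict based on fail_mode."""
    on_error = {"closed": "DENY", "open": "ALLOW", "log": "LOG_ONLY"}
    if fail_mode not in on_error:
        raise ValueError(f"unknown fail_mode: {fail_mode!r}")
    try:
        return evaluate()
    except Exception:
        # Assumption: "log" lets the call proceed but records it (LOG_ONLY).
        return on_error[fail_mode]


def broken_rule():
    """Simulates a rule that fails during evaluation."""
    raise RuntimeError("malformed condition")
```

With the default, `evaluate_with_fail_mode(broken_rule)` returns "DENY", which is the fail-closed behavior described above.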

Rule Structure

Field           Type                     Default   Description
name            string                   required  Rule identifier
conditions      list                     required  At least one condition
verdict         ALLOW | DENY | LOG_ONLY  DENY      Verdict when rule matches
match_strategy  all | any                all       AND vs OR for conditions
priority        int                      100       Lower = evaluated first
message         string                   ""        Explanation on match
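
The interaction of priority, match_strategy, and default_verdict can be sketched like this. It assumes the first matching rule wins, which the table implies but does not state outright; `Rule` and `decide` are hypothetical names, and conditions are modeled as plain callables for brevity.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Rule:
    name: str
    conditions: list[Callable[[dict], bool]]
    verdict: str = "DENY"
    match_strategy: str = "all"  # "all" = AND, "any" = OR
    priority: int = 100          # lower = evaluated first

    def matches(self, call: dict) -> bool:
        combine = all if self.match_strategy == "all" else any
        return combine(cond(call) for cond in self.conditions)


def decide(rules: list[Rule], call: dict, default_verdict: str = "DENY") -> tuple[str, Optional[str]]:
    """Return (verdict, matched rule name). Assumes the first match wins."""
    for rule in sorted(rules, key=lambda r: r.priority):
        if rule.matches(call):
            return rule.verdict, rule.name
    return default_verdict, None


example_rules = [
    Rule("allow-search", [lambda c: c["tool_name"] == "web_search"], verdict="ALLOW", priority=50),
    Rule("block-shell", [lambda c: c["tool_name"] in ("run_shell", "bash", "exec")], priority=10),
]
```

A call that matches no rule falls through to the default verdict, so an unknown tool is denied unless a rule explicitly allows it.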

Condition Operators

Operator   Description              Example
eq         Equals                   tool_name eq "bash"
neq        Not equals               env neq "production"
in         Value in list            tool_name in ["bash", "exec"]
not_in     Value not in list        role not_in ["admin"]
contains   String contains          args.sql contains "DROP"
regex      Regex match              args.url regex "https?://10\\."
gt / lt    Greater / less than      args.count gt 100
gte / lte  Greater / less or equal  args.amount lte 1000
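
The operators above could be evaluated with a small dispatch table, sketched here under several assumptions: dotted fields such as args.url resolve against nested dicts, a missing field never matches, and regex uses an unanchored search. `lookup`, `check`, and `OPERATORS` are hypothetical names, not PolicyForge's API.

```python
import operator
import re

_MISSING = object()


def lookup(call: dict, dotted: str):
    """Resolve a dotted field such as "args.url" against nested dicts."""
    current = call
    for part in dotted.split("."):
        if not isinstance(current, dict) or part not in current:
            return _MISSING
        current = current[part]
    return current


OPERATORS = {
    "eq": operator.eq,
    "neq": operator.ne,
    "in": lambda actual, expected: actual in expected,
    "not_in": lambda actual, expected: actual not in expected,
    "contains": lambda actual, expected: expected in actual,
    "regex": lambda actual, expected: re.search(expected, actual) is not None,
    "gt": operator.gt,
    "lt": operator.lt,
    "gte": operator.ge,
    "lte": operator.le,
}


def check(call: dict, field: str, op: str, value) -> bool:
    """Evaluate one condition against a tool call."""
    actual = lookup(call, field)
    if actual is _MISSING:
        return False  # assumption: a missing field never matches
    # Type errors (e.g. "gt" on a string vs. a number) propagate to the
    # caller, where a fail_mode-style handler can turn them into a verdict.
    return bool(OPERATORS[op](actual, value))
```

Letting errors propagate rather than returning False keeps evaluation failures visible, which fits a fail-closed design.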

Audit Trail

PolicyForge writes every decision to an HMAC-SHA256 signed JSON-lines log with hash chaining for tamper detection.

from policyforge import AuditLogger

audit = AuditLogger(
    log_dir="./audit_logs",
    hmac_key="your-secret-key",     # or set POLICYFORGE_HMAC_KEY env var
    chain_hashes=True,               # blockchain-style tamper detection
)

engine = PolicyEngine(
    policy_paths=["./policies"],
    audit_logger=audit,
)

# Verify log integrity
valid, tampered = audit.verify_log()
print(f"{valid} valid, {tampered} tampered")

Each log entry contains: timestamp, request ID, tool name, agent ID, args hash (SHA-256, not raw args), verdict, matched rule, policy name, evaluation time, and HMAC signature.

Share receipt generation is logged as an event record with event="share_receipt_generated" and metadata that includes the verdict and receipt format. This gives you a lightweight funnel from denied decisions to internal sharing without adding a separate analytics dependency.
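
To make "HMAC-signed with hash chaining" concrete, here is a minimal standard-library sketch. It is not PolicyForge's log format: the field names `prev_hash` and `hmac`, the all-zero genesis value, and the helper functions are assumptions chosen for illustration.

```python
import hashlib
import hmac
import json

GENESIS = "0" * 64  # assumed placeholder hash for the first entry


def _canonical(entry: dict) -> bytes:
    return json.dumps(entry, sort_keys=True, separators=(",", ":")).encode("utf-8")


def entry_hash(entry: dict) -> str:
    """Hash of the full signed entry; the next entry stores it as prev_hash."""
    return hashlib.sha256(_canonical(entry)).hexdigest()


def append(log: list[str], key: bytes, record: dict) -> None:
    """Sign a record, link it to the previous entry, and append one JSON line."""
    prev = entry_hash(json.loads(log[-1])) if log else GENESIS
    entry = dict(record, prev_hash=prev)
    entry["hmac"] = hmac.new(key, _canonical(entry), hashlib.sha256).hexdigest()
    log.append(json.dumps(entry, sort_keys=True))


def verify(log: list[str], key: bytes) -> tuple[int, int]:
    """Return (valid, tampered) counts, checking signature and chain link."""
    valid = tampered = 0
    prev = GENESIS
    for line in log:
        entry = json.loads(line)
        claimed = entry.pop("hmac", "")
        expected = hmac.new(key, _canonical(entry), hashlib.sha256).hexdigest()
        if hmac.compare_digest(claimed, expected) and entry.get("prev_hash") == prev:
            valid += 1
        else:
            tampered += 1
        entry["hmac"] = claimed
        prev = entry_hash(entry)
    return valid, tampered
```

The signature detects edits to a single entry, while the chain detects deleted or reordered entries, because each entry commits to the hash of the one before it.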


Threat Model

PolicyForge gates agent tool calls against three attack classes, each addressed by a dedicated subsystem. This release ships the first.

Tool Fingerprint Pinning (this release)

Defends against MCP tool poisoning, rug-pull (tool definitions changing after approval), typosquatting, and cross-server name shadowing.

Every call's (server_id, name, schema_hash, description_hash) tuple is compared against a project-local, HMAC-chained approvals ledger at .policyforge/approvals.jsonl. Drift or Unicode shadowing (Cyrillic/Greek homoglyphs, NFKC collisions) short-circuits evaluation with DENY before any rule runs. The approvals ledger is itself tamper-evident: when it is opened for writing, the full chain is verified, and PolicyForge refuses to continue if any entry has been altered.

tool_trust:
  mode: enforce
  ledger_path: .policyforge/approvals.jsonl
  on_mismatch: DENY
  on_unknown: DENY
  detect_shadowing:
    nfkc: true
    confusables: true

from policyforge import PolicyEngine, ToolMetadata, TrustConfig, TrustManager, TrustMode

trust = TrustManager(
    TrustConfig(mode=TrustMode.ENFORCE),
    hmac_key="your-secret",
)
engine = PolicyEngine(policy_paths=["policies/"], trust_manager=trust)

decision = engine.evaluate(
    tool_name="create_issue",
    args={"title": "..."},
    context={
        "tool": ToolMetadata(
            server_id="mcp://github",
            schema_hash="<sha256 of the tool's input schema>",
            description_hash="<sha256 of the tool's description>",
        )
    },
)

See policyforge/policies/tool_trust_example.yaml for a complete annotated configuration.
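
The two checks described in this section, fingerprint drift and name shadowing, can be sketched as follows. Everything here is an assumption for illustration: the canonical-JSON hashing, the `fingerprint` and `shadows` helpers, and the mixed-script heuristic, which is much cruder than a full confusables table such as the one in Unicode UTS #39.

```python
import hashlib
import json
import unicodedata


def sha256_json(value) -> str:
    """Hash a JSON-serializable value using a canonical encoding."""
    canonical = json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def fingerprint(server_id: str, name: str, schema: dict, description: str) -> tuple:
    """Build the tuple that would be compared against an approvals ledger."""
    return (server_id, name, sha256_json(schema), sha256_json(description))


def scripts(name: str) -> set[str]:
    """Rough script detection from Unicode character names (letters only)."""
    found = set()
    for ch in name:
        if ch.isalpha():
            # First word of the character name, e.g. LATIN or CYRILLIC.
            found.add(unicodedata.name(ch, "UNKNOWN").split(" ")[0])
    return found


def shadows(candidate: str, approved: set[str]) -> bool:
    """True if candidate is not approved but looks like an approved name."""
    if candidate in approved:
        return False
    if unicodedata.normalize("NFKC", candidate) in approved:
        return True  # NFKC collision, e.g. fullwidth letters
    return len(scripts(candidate)) > 1  # mixed scripts, e.g. Latin + Cyrillic
```

A changed description yields a different description hash even when the server, name, and schema are identical, which is how a rug pull after approval would show up as drift.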

Provenance-Tagged Args (next release)

Will defend against indirect prompt injection and confused-deputy attacks by letting rules deny based on the origin of an argument (user, web, rag, tool output) rather than its content.
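
Since this feature has not shipped, the following is only a guess at the shape of the idea: arguments carry an origin tag, and a rule can deny when a guarded field comes from an untrusted origin. `Tagged`, `UNTRUSTED`, and `deny_untrusted` are hypothetical names, and the released design may look quite different.

```python
from dataclasses import dataclass

# Assumed set of origins treated as untrusted in this sketch.
UNTRUSTED = {"web", "rag", "tool_output"}


@dataclass(frozen=True)
class Tagged:
    value: str
    origin: str  # "user", "web", "rag", or "tool_output"


def deny_untrusted(args: dict, guarded_fields: set) -> bool:
    """True if any guarded argument came from an untrusted origin."""
    return any(
        isinstance(args.get(name), Tagged) and args[name].origin in UNTRUSTED
        for name in guarded_fields
    )
```

The decision depends only on where the value came from, so an injected recipient address is blocked even if its content looks harmless.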

Lethal-Trifecta Detector (future)

Will defend against exfiltration chains (read private data → ingest untrusted content → post externally) by maintaining per-session capability state and denying the call that would close the trifecta.
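
As a rough illustration of per-session capability state, the sketch below denies whichever call would be the third distinct capability in a session. The capability names and the `SessionState` class are hypothetical; the planned detector may classify tools and track state differently.

```python
CAPABILITIES = {"reads_private", "ingests_untrusted", "posts_external"}


class SessionState:
    """Tracks which capability classes a session has already exercised."""

    def __init__(self):
        self.seen = set()

    def check(self, capability: str) -> str:
        if capability not in CAPABILITIES:
            raise ValueError(f"unknown capability: {capability!r}")
        if self.seen | {capability} == CAPABILITIES:
            # This call would complete the trifecta, so deny it and do not
            # record the capability as exercised.
            return "DENY"
        self.seen.add(capability)
        return "ALLOW"
```

Any two of the three capabilities remain usable in a session; only the call that would complete the set is denied, regardless of the order in which the calls arrive.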


Cloud Sync

Sync policies across your multi-cloud environment. The sync layer is strictly for policy distribution; security decisions are always made locally. Nested policy directories are preserved on both pull and push, so teams can organize policies by environment or business unit without filename collisions. Unchanged files are skipped using provider-specific checksums when the backend exposes one.

from policyforge.sync import SyncManager
from policyforge.sync.s3 import S3SyncProvider
from policyforge.sync.azure_blob import AzureBlobSyncProvider
from policyforge.sync.oci_os import OCISyncProvider

sync = SyncManager(local_dir="./policies")

sync.add_provider(S3SyncProvider(
    bucket="corp-ai-policies",
    prefix="agents/prod/",
    region="us-east-1",
))

sync.add_provider(AzureBlobSyncProvider(
    container="policies",
    account_url="https://corpstore.blob.core.windows.net",
))

sync.add_provider(OCISyncProvider(
    namespace="corp-tenancy",
    bucket="ai-policies",
    prefix="prod/",
))

# Pull latest policies from all providers
results = sync.pull()
for r in results:
    print(f"{r.provider}: {r.downloaded} updated, errors={r.errors}")

# Reload the engine with fresh policies
engine.reload(["./policies"])
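
Two details of the pull step, keeping remote subdirectories and skipping unchanged files, can be sketched without any cloud SDK. The helpers `local_path_for` and `needs_download` are hypothetical, and SHA-256 stands in for whatever checksum a given provider exposes.

```python
import hashlib
import tempfile
from pathlib import Path, PurePosixPath
from typing import Optional


def local_path_for(key: str, prefix: str, local_dir: Path) -> Path:
    """Map a remote object key to a local path, keeping subdirectories."""
    if not key.startswith(prefix):
        raise ValueError(f"{key!r} is outside prefix {prefix!r}")
    relative = PurePosixPath(key[len(prefix):])
    if not relative.parts or relative.is_absolute() or ".." in relative.parts:
        raise ValueError(f"unsafe object key: {key!r}")
    return local_dir.joinpath(*relative.parts)


def needs_download(path: Path, remote_sha256: Optional[str]) -> bool:
    """Skip the download only when a checksum exists and matches."""
    if remote_sha256 is None or not path.is_file():
        return True
    return hashlib.sha256(path.read_bytes()).hexdigest() != remote_sha256


with tempfile.TemporaryDirectory() as tmp:
    target = local_path_for("agents/prod/eu/security.yaml", "agents/prod/", Path(tmp))
    first_pull = needs_download(target, None)       # nothing local yet
    target.parent.mkdir(parents=True)
    target.write_bytes(b"name: default-security\n")
    checksum = hashlib.sha256(target.read_bytes()).hexdigest()
    second_pull = needs_download(target, checksum)  # unchanged, so skipped
    relative_path = target.relative_to(tmp).as_posix()
```

Rejecting keys that contain `..` keeps a misconfigured or malicious bucket from writing outside the local policy directory.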

MS Foundry Agents Integration

from policyforge import PolicyEngine
from policyforge.decorators import PolicyGateWrapper

engine = PolicyEngine(policy_paths=["./policies"])
gate = PolicyGateWrapper(engine, extra_context={"environment": "production"})

# Wrap your Foundry Agent tool functions
gated_tools = gate.wrap_dict({
    "search_reservations": search_reservations,
    "send_guest_email": send_guest_email,
    "adjust_loyalty_points": adjust_loyalty_points,
})

# Register gated_tools with your Foundry Agent instead of the originals.
# Denied calls raise PolicyDeniedError — catch it in your tool-execution
# loop and return a safe response to the agent.
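
The tool-execution loop mentioned in the comments might handle denials along these lines. The `PolicyDeniedError` class below is a local stand-in so the sketch runs without PolicyForge installed, and `run_tool_call` and the response shape are assumptions, not part of any Foundry or PolicyForge API.

```python
class PolicyDeniedError(Exception):
    """Local stand-in so the sketch runs without PolicyForge installed."""


def run_tool_call(tools: dict, name: str, args: dict) -> dict:
    """Execute one tool call and convert a denial into a structured result."""
    try:
        return {"ok": True, "result": tools[name](**args)}
    except PolicyDeniedError as exc:
        # Return a short, non-sensitive explanation the agent can act on.
        return {"ok": False, "error": f"Tool call blocked by policy: {exc}"}


def denied_tool(**kwargs):
    """Simulates a gated tool whose call is denied."""
    raise PolicyDeniedError("adjust_loyalty_points requires approval")


demo_tools = {
    "search_reservations": lambda guest: [f"{guest}: room 12"],
    "adjust_loyalty_points": denied_tool,
}
```

Returning a structured error instead of letting the exception escape lets the agent continue the conversation and explain the refusal to the user.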

Development

git clone https://github.com/tblakex01/policyforge.git
cd policyforge
pip install -e ".[dev]"
pytest -v

License

MIT
