diff --git a/examples/agentsec/2-agent-frameworks/google-adk-agent/agent.py b/examples/agentsec/2-agent-frameworks/google-adk-agent/agent.py new file mode 100644 index 0000000..6d63639 --- /dev/null +++ b/examples/agentsec/2-agent-frameworks/google-adk-agent/agent.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +# Copyright 2026 Cisco Systems, Inc. and its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +""" +Google ADK agent with Cisco AI Defense (agentsec) protection. + +This example demonstrates how to use agentsec with the Google Agent Development +Kit (ADK). ADK uses the google-genai SDK internally for Gemini model calls and +the mcp SDK for MCP tool interactions — both are automatically patched by +agentsec.protect(). + +agentsec intercepts: + - LLM calls: AsyncModels.generate_content / generate_content_stream + - MCP calls: ClientSession.call_tool / get_prompt / read_resource + +Usage: + python agent.py + +Gemini authentication (one of the two): + Option A — Gemini Developer API: + GOOGLE_API_KEY: Gemini API key + + Option B — Vertex AI (current .env setup): + GOOGLE_CLOUD_PROJECT: GCP project ID (e.g. gcp-aiteamgcp-nprd-22046) + GOOGLE_CLOUD_LOCATION: GCP region (e.g. us-central1) + + Application Default Credentials (run: gcloud auth application-default login) + +Environment variables (loaded from ../../.env): + AGENTSEC_API_MODE_LLM: LLM inspection mode (monitor | enforce | off) + AGENTSEC_API_MODE_MCP: MCP inspection mode (monitor | enforce | off) + AI_DEFENSE_API_MODE_LLM_API_KEY: Cisco AI Defense API key + AI_DEFENSE_API_MODE_LLM_ENDPOINT: AI Defense API endpoint + MCP_SERVER_URL: Remote MCP server URL (StreamableHTTP) +""" + +import asyncio +import logging +import os +from pathlib import Path + +from dotenv import load_dotenv + +env_file = Path(__file__).resolve().parent.parent.parent / ".env" +if env_file.exists(): + load_dotenv(env_file) + print(f"Loaded environment from {env_file}", flush=True) + +# Auto-enable Vertex AI mode for google-genai SDK when GCP project is configured +if os.environ.get("GOOGLE_CLOUD_PROJECT") and not os.environ.get("GOOGLE_API_KEY"): + os.environ.setdefault("GOOGLE_GENAI_USE_VERTEXAI", "1") + +# ── Enable protection BEFORE importing ADK or any LLM / MCP clients ── +from aidefense.runtime import agentsec +from aidefense.runtime.agentsec.exceptions import SecurityPolicyError + +llm_mode = os.environ.get("AGENTSEC_API_MODE_LLM", "monitor") +mcp_mode = os.environ.get("AGENTSEC_API_MODE_MCP", "monitor") +agentsec.protect( + api_mode={ + "llm": {"mode": llm_mode}, + "mcp": {"mode": mcp_mode}, + }, +) + +# ── Now import ADK (google-genai and mcp are already patched) ── +from google.adk.agents import LlmAgent +from google.adk.runners import Runner +from google.adk.sessions import InMemorySessionService +from google.adk.tools.mcp_tool import McpToolset +from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPConnectionParams +from google.genai import types + +logging.basicConfig( + level=logging.DEBUG, + format="[%(name)s] %(levelname)s: %(message)s", +) +logger = logging.getLogger(__name__) + +APP_NAME = "adk_agentsec_demo" + + +def _build_mcp_toolset() -> McpToolset | None: + """Build an MCP toolset from MCP_SERVER_URL if configured.""" + mcp_url = os.environ.get("MCP_SERVER_URL") + if not mcp_url: + logger.debug("MCP_SERVER_URL not set — running without MCP tools") + return None + + return McpToolset( + connection_params=StreamableHTTPConnectionParams(url=mcp_url), + ) + + +async def main() -> None: + """Run a single-turn ADK agent with agentsec protection.""" + patched = agentsec.get_patched_clients() + print(f"Patched clients: {patched}", flush=True) + + if os.environ.get("GOOGLE_API_KEY"): + print("Gemini backend: Developer API (GOOGLE_API_KEY)", flush=True) + elif os.environ.get("GOOGLE_CLOUD_PROJECT"): + print( + f"Gemini backend: Vertex AI " + f"(project={os.environ['GOOGLE_CLOUD_PROJECT']}, " + f"location={os.environ.get('GOOGLE_CLOUD_LOCATION', 'us-central1')})", + flush=True, + ) + else: + print( + "WARNING: Neither GOOGLE_API_KEY nor GOOGLE_CLOUD_PROJECT is set. " + "ADK will fail to initialize the Gemini client.", + flush=True, + ) + + tools: list = [] + mcp_toolset = _build_mcp_toolset() + if mcp_toolset is not None: + tools.append(mcp_toolset) + + model = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash") + logger.info(f"Using model: {model}") + + agent = LlmAgent( + model=model, + name="secure_assistant", + instruction=( + "You are a helpful assistant protected by Cisco AI Defense. " + "Answer questions concisely. If you have access to tools, use them " + "when the user's request would benefit from external data." + ), + tools=tools, + ) + + session_service = InMemorySessionService() + runner = Runner( + app_name=APP_NAME, + agent=agent, + session_service=session_service, + ) + + session = await session_service.create_session( + state={}, app_name=APP_NAME, user_id="demo_user" + ) + + query = "Summarize the benefits of zero-trust security in two sentences." + print(f"\nUser: {query}", flush=True) + + content = types.Content(role="user", parts=[types.Part(text=query)]) + + try: + events = runner.run_async( + session_id=session.id, + user_id=session.user_id, + new_message=content, + ) + async for event in events: + if event.content and event.content.parts: + for part in event.content.parts: + if hasattr(part, "text") and part.text: + print(f"Assistant: {part.text}", flush=True) + except SecurityPolicyError as exc: + logger.warning(f"Blocked by AI Defense policy: {exc}") + print(f"\n⛔ Request blocked by Cisco AI Defense: {exc}", flush=True) + + if mcp_toolset is not None: + await mcp_toolset.close() + + print("\nDone — all calls were inspected by Cisco AI Defense.", flush=True) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/agentsec/2-agent-frameworks/google-adk-agent/pyproject.toml b/examples/agentsec/2-agent-frameworks/google-adk-agent/pyproject.toml new file mode 100644 index 0000000..7fd5bdc --- /dev/null +++ b/examples/agentsec/2-agent-frameworks/google-adk-agent/pyproject.toml @@ -0,0 +1,41 @@ +# Copyright 2026 Cisco Systems, Inc. and its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry] +name = "google-adk-agentsec-example" +version = "0.1.0" +description = "Google ADK agent example with agentsec protection" +authors = ["Cisco "] + +[tool.poetry.dependencies] +python = ">=3.10,<3.14" +cisco-aidefense-sdk = { path = "../../../..", develop = true } + +# Google Agent Development Kit +google-adk = ">=1.0.0" + +# Google GenAI SDK (used by ADK for Gemini model calls) +google-genai = ">=1.14.0" + +# MCP client library for tool integration +mcp = ">=1.6.0" + +# Environment variable loading from .env files +python-dotenv = ">=1.0.0" diff --git a/examples/agentsec/2-agent-frameworks/google-adk-agent/scripts/run.sh b/examples/agentsec/2-agent-frameworks/google-adk-agent/scripts/run.sh new file mode 100755 index 0000000..031faca --- /dev/null +++ b/examples/agentsec/2-agent-frameworks/google-adk-agent/scripts/run.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Copyright 2026 Cisco Systems, Inc. and its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +# Run the Google ADK agentsec example. +# +# Usage: +# ./scripts/run.sh # defaults to monitor mode +# AGENTSEC_API_MODE_LLM=enforce ./scripts/run.sh +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +AGENT_DIR="$(dirname "$SCRIPT_DIR")" + +cd "$AGENT_DIR" +python agent.py "$@" diff --git a/examples/agentsec/2-agent-frameworks/google-adk-agent/tests/__init__.py b/examples/agentsec/2-agent-frameworks/google-adk-agent/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/agentsec/2-agent-frameworks/google-adk-agent/tests/unit/__init__.py b/examples/agentsec/2-agent-frameworks/google-adk-agent/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/agentsec/2-agent-frameworks/google-adk-agent/tests/unit/test_google_adk_example.py b/examples/agentsec/2-agent-frameworks/google-adk-agent/tests/unit/test_google_adk_example.py new file mode 100644 index 0000000..dd75100 --- /dev/null +++ b/examples/agentsec/2-agent-frameworks/google-adk-agent/tests/unit/test_google_adk_example.py @@ -0,0 +1,302 @@ +# Copyright 2026 Cisco Systems, Inc. and its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +""" +Tests for the Google ADK Agent Example. + +This module tests: +- Basic example structure and imports +- agentsec protection and patching +- Google ADK agent setup (LlmAgent, Runner) +- MCP tool integration via McpToolset +- Error handling (SecurityPolicyError) +""" + +import ast +import os + +import pytest + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +EXAMPLE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +"""Path to the google-adk-agent example directory.""" + + +@pytest.fixture(scope="module") +def example_file(): + """Path to the example agent.py file.""" + return os.path.join(EXAMPLE_DIR, "agent.py") + + +@pytest.fixture(scope="module") +def example_code(example_file): + """Read the example file source code.""" + with open(example_file) as f: + return f.read() + + +@pytest.fixture(scope="module") +def example_ast(example_code): + """Parse the example file into an AST.""" + return ast.parse(example_code) + + +# --------------------------------------------------------------------------- +# File structure +# --------------------------------------------------------------------------- + +class TestFileStructure: + """Tests for file structure and basic setup.""" + + def test_agent_py_exists(self): + """Test that agent.py exists.""" + assert os.path.exists(os.path.join(EXAMPLE_DIR, "agent.py")), \ + "agent.py should exist" + + def test_pyproject_toml_exists(self): + """Test that pyproject.toml exists with correct dependencies.""" + pyproject_file = os.path.join(EXAMPLE_DIR, "pyproject.toml") + assert os.path.exists(pyproject_file), "pyproject.toml should exist" + + with open(pyproject_file) as f: + content = f.read() + + assert "cisco-aidefense-sdk" in content, "Should require cisco-aidefense-sdk" + assert "google-adk" in content, "Should require google-adk" + assert "google-genai" in content, "Should require google-genai" + assert "python-dotenv" in content, "Should require python-dotenv" + assert "mcp" in content, "Should require mcp" + + def test_run_script_exists(self): + """Test that scripts/run.sh exists.""" + script_file = os.path.join(EXAMPLE_DIR, "scripts", "run.sh") + assert os.path.exists(script_file), "scripts/run.sh should exist" + + +# --------------------------------------------------------------------------- +# Import ordering +# --------------------------------------------------------------------------- + +class TestImportOrder: + """Tests for correct import ordering in the example.""" + + def test_dotenv_before_agentsec(self, example_code): + """Test that dotenv is imported and called early.""" + lines = example_code.splitlines() + + dotenv_line = next( + (i for i, l in enumerate(lines) if "load_dotenv" in l and "import" in l), None + ) + agentsec_line = next( + (i for i, l in enumerate(lines) if "from aidefense.runtime import agentsec" in l), + None, + ) + assert dotenv_line is not None, "Should call load_dotenv()" + assert agentsec_line is not None, "Should import agentsec" + assert dotenv_line < agentsec_line, \ + "load_dotenv() should be called before agentsec import" + + def test_agentsec_before_adk(self, example_code): + """Test that agentsec is imported before ADK.""" + lines = example_code.splitlines() + + agentsec_line = next( + (i for i, l in enumerate(lines) if "from aidefense.runtime import agentsec" in l), + None, + ) + adk_line = next( + (i for i, l in enumerate(lines) + if "from google.adk" in l or "from google.genai" in l), + None, + ) + assert agentsec_line is not None, "Should import agentsec" + assert adk_line is not None, "Should import google.adk" + assert agentsec_line < adk_line, \ + "agentsec should be imported before Google ADK" + + def test_protect_before_adk(self, example_code): + """Test that agentsec.protect() is called before ADK import.""" + lines = example_code.splitlines() + + protect_line = next( + (i for i, l in enumerate(lines) if "agentsec.protect" in l), None + ) + adk_line = next( + (i for i, l in enumerate(lines) + if "from google.adk" in l or "from google.genai" in l), + None, + ) + assert protect_line is not None, "Should call agentsec.protect()" + assert adk_line is not None, "Should import Google ADK" + assert protect_line < adk_line, \ + "agentsec.protect() should be called before ADK import" + + +# --------------------------------------------------------------------------- +# agentsec integration +# --------------------------------------------------------------------------- + +class TestAgentsecIntegration: + """Tests for agentsec SDK integration.""" + + def test_protect_is_called(self, example_code): + """Test that agentsec.protect() is called.""" + assert "agentsec.protect(" in example_code, \ + "Should call agentsec.protect()" + + def test_mode_from_env(self, example_code): + """Test that mode is read from environment variable.""" + assert "AGENTSEC_API_MODE_LLM" in example_code, \ + "Should read AGENTSEC_API_MODE_LLM from env" + + def test_security_policy_error_handled(self, example_code): + """Test that SecurityPolicyError is imported and handled.""" + assert "SecurityPolicyError" in example_code, \ + "Should import SecurityPolicyError" + assert "except SecurityPolicyError" in example_code, \ + "Should catch SecurityPolicyError" + + def test_patched_clients_logged(self, example_code): + """Test that patched clients are logged.""" + assert "get_patched_clients" in example_code, \ + "Should call get_patched_clients()" + + +# --------------------------------------------------------------------------- +# ADK agent setup +# --------------------------------------------------------------------------- + +class TestADKAgent: + """Tests for Google ADK agent implementation.""" + + def test_llm_agent_used(self, example_code): + """Test that LlmAgent is imported and used.""" + assert "from google.adk.agents import LlmAgent" in example_code or \ + "from google.adk.agents.llm_agent import LlmAgent" in example_code, \ + "Should import LlmAgent from google.adk" + assert "LlmAgent(" in example_code, "Should instantiate LlmAgent" + + def test_runner_used(self, example_code): + """Test that Runner is imported and used.""" + assert "from google.adk.runners import Runner" in example_code, \ + "Should import Runner" + assert "Runner(" in example_code, "Should instantiate Runner" + + def test_session_service_used(self, example_code): + """Test that InMemorySessionService is used.""" + assert "InMemorySessionService" in example_code, \ + "Should use InMemorySessionService" + + def test_gemini_model_specified(self, example_code): + """Test that a Gemini model is specified.""" + assert "gemini" in example_code.lower(), \ + "Should specify a Gemini model" + + def test_run_async_used(self, example_code): + """Test that runner.run_async is used for agent execution.""" + assert "run_async" in example_code, \ + "Should use run_async() for agent execution" + + def test_content_created(self, example_code): + """Test that google.genai types.Content is used for user input.""" + assert "types.Content(" in example_code, \ + "Should create Content for user message" + + +# --------------------------------------------------------------------------- +# MCP integration +# --------------------------------------------------------------------------- + +class TestMCPIntegration: + """Tests for MCP tool integration.""" + + def test_mcp_toolset_imported(self, example_code): + """Test that McpToolset is imported.""" + assert "McpToolset" in example_code, \ + "Should import McpToolset" + + def test_streamable_http_params(self, example_code): + """Test that StreamableHTTPConnectionParams is used.""" + assert "StreamableHTTPConnectionParams" in example_code, \ + "Should import StreamableHTTPConnectionParams" + + def test_mcp_url_from_env(self, example_code): + """Test that MCP URL is read from environment.""" + assert "MCP_SERVER_URL" in example_code, \ + "Should read MCP_SERVER_URL from env" + + def test_mcp_toolset_cleanup(self, example_code): + """Test that MCP toolset is properly closed.""" + assert "toolset.close()" in example_code or "mcp_toolset.close()" in example_code, \ + "Should close MCP toolset on exit" + + +# --------------------------------------------------------------------------- +# Debug logging +# --------------------------------------------------------------------------- + +class TestDebugLogging: + """Tests for debug logging implementation.""" + + def test_logger_debug_used(self, example_code): + """Test that logger.debug is used for debug messages.""" + assert "logger.debug" in example_code, \ + "Should use logger.debug for debug messages" + + def test_flush_true_used(self, example_code): + """Test that flush=True is used for immediate output.""" + assert "flush=True" in example_code, \ + "Should use flush=True for immediate output" + + def test_logging_configured(self, example_code): + """Test that logging is properly configured.""" + assert "logging.basicConfig" in example_code or \ + "logging.getLogger" in example_code, \ + "Should configure logging" + + +# --------------------------------------------------------------------------- +# Syntax and main +# --------------------------------------------------------------------------- + +class TestSyntaxAndMain: + """Tests for code syntax and main function.""" + + def test_syntax_valid(self, example_code): + """Test that the example code parses without syntax errors.""" + try: + ast.parse(example_code) + except SyntaxError as e: + pytest.fail(f"Syntax error in example code: {e}") + + def test_has_docstring(self, example_ast): + """Test that the module has a docstring.""" + docstring = ast.get_docstring(example_ast) + assert docstring, "Module should have a docstring" + + def test_main_function_defined(self, example_code): + """Test that main() function is defined.""" + assert "async def main()" in example_code or "def main()" in example_code, \ + "Should define main() function" + + def test_main_guard(self, example_code): + """Test that if __name__ == '__main__' guard is present.""" + assert '__name__ == "__main__"' in example_code or \ + "__name__ == '__main__'" in example_code, \ + "Should have main guard"