This guide covers common integration patterns and extension points for Agent-as-Code.
Implement a custom model provider:
from agent_as_code.models import ModelProvider, ModelResponse
class CustomModelProvider(ModelProvider):
def __init__(self, config: dict):
super().__init__(config)
self.api_key = config.get("api_key")
self.base_url = config.get("base_url")
async def generate(self, prompt: str) -> ModelResponse:
# Implement model generation
response = await self._call_api(prompt)
return ModelResponse(
text=response["text"],
usage=response["usage"]
)
async def stream(self, prompt: str) -> AsyncIterator[str]:
# Implement streaming
async for chunk in self._stream_api(prompt):
yield chunk["text"]Configuration:
spec:
model:
provider: custom
name: my-model
config:
api_key: ${MY_MODEL_API_KEY}
base_url: https://api.mymodel.comIntegrate with local models:
from agent_as_code.models import LocalModelProvider
class OllamaProvider(LocalModelProvider):
async def setup(self):
# Initialize Ollama client
self.client = OllamaClient(self.config["base_url"])
async def generate(self, prompt: str) -> str:
return await self.client.generate(
model=self.config["model_name"],
prompt=prompt
)Configuration:
spec:
model:
provider: local
name: llama2
config:
base_url: http://localhost:11434Create a custom runtime:
from agent_as_code.runtime import Runtime, RuntimeConfig
class CustomRuntime(Runtime):
def __init__(self, config: RuntimeConfig):
super().__init__(config)
async def start(self):
# Initialize runtime
await self._setup_environment()
async def stop(self):
# Cleanup
await self._cleanup()
async def execute(self, input: dict) -> dict:
# Execute agent logic
return await self._process(input)Configuration:
spec:
runtime: custom:1.0
config:
custom_setting: valueExtend container runtime:
type CustomRuntime struct {
runtime.Runtime
customConfig map[string]interface{}
}
func (r *CustomRuntime) Run(options *runtime.RunOptions) (*runtime.ContainerInfo, error) {
// Add custom logic before running container
if err := r.preRun(options); err != nil {
return nil, err
}
// Run container
info, err := r.Runtime.Run(options)
if err != nil {
return nil, err
}
// Add custom logic after running container
if err := r.postRun(info); err != nil {
return nil, err
}
return info, nil
}Add custom API endpoints:
from fastapi import FastAPI, APIRouter
from agent_as_code.api import AgentAPI
router = APIRouter()
@router.post("/custom")
async def custom_endpoint(request: dict):
# Custom endpoint logic
return {"result": "success"}
class CustomAPI(AgentAPI):
def __init__(self):
super().__init__()
self.app.include_router(router, prefix="/api/v1")Add custom middleware:
from starlette.middleware.base import BaseHTTPMiddleware
class CustomMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
# Pre-processing
response = await call_next(request)
# Post-processing
return response
app.add_middleware(CustomMiddleware)Implement custom storage:
from agent_as_code.storage import StorageBackend
class CustomStorage(StorageBackend):
async def save(self, key: str, data: bytes) -> str:
# Save data
location = await self._store(key, data)
return location
async def load(self, key: str) -> bytes:
# Load data
return await self._retrieve(key)
async def delete(self, key: str):
# Delete data
await self._remove(key)Configuration:
spec:
storage:
provider: custom
config:
endpoint: https://storage.example.com
credentials: ${STORAGE_CREDENTIALS}Add custom metrics:
from prometheus_client import Counter, Histogram
from agent_as_code.monitoring import Metrics
class CustomMetrics(Metrics):
def __init__(self):
self.requests = Counter(
'custom_requests_total',
'Total custom requests'
)
self.latency = Histogram(
'custom_latency_seconds',
'Request latency'
)
def record_request(self):
self.requests.inc()
def record_latency(self, duration: float):
self.latency.observe(duration)Implement custom health checks:
from agent_as_code.health import HealthCheck
class CustomHealthCheck(HealthCheck):
async def check(self) -> bool:
# Perform health check
status = await self._check_dependencies()
return status.is_healthy()
async def get_metrics(self) -> dict:
return {
"custom_metric": await self._get_metric()
}Implement custom authentication:
from agent_as_code.security import AuthProvider
class CustomAuth(AuthProvider):
async def authenticate(self, credentials: dict) -> bool:
# Verify credentials
return await self._verify(credentials)
async def authorize(self, token: str, resource: str) -> bool:
# Check authorization
return await self._check_access(token, resource)Configuration:
spec:
security:
auth_provider: custom
config:
auth_url: https://auth.example.com
client_id: ${CLIENT_ID}Implement custom event handling:
from agent_as_code.events import EventHandler
class CustomEventHandler(EventHandler):
async def handle(self, event: dict):
# Process event
if event["type"] == "custom":
await self._process_custom_event(event)
async def emit(self, event: dict):
# Emit event
await self._publish(event)Configuration:
spec:
events:
handler: custom
config:
broker_url: amqp://localhost- Implement proper error handling
- Use custom error types
- Provide detailed error messages
- Use environment variables
- Support configuration files
- Validate configuration
- Write unit tests
- Implement integration tests
- Use mocks for external services
- Document integration points
- Provide examples
- Include configuration options
- Implement authentication
- Use secure communication
- Handle sensitive data properly