Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ uv run protocols/aave/main.py

In production the scripts run on a schedule via supercronic on a VPS, defined in [`automation/jobs.yaml`](./automation/jobs.yaml). See [`deploy/`](./deploy/) — [`install.sh`](./deploy/install.sh) provisions a host and [`runbook.md`](./deploy/runbook.md) covers operations.

The optional read-only alerts API exposes persisted alert history from SQLite. See [`deploy/alerts-api.md`](./deploy/alerts-api.md) for endpoint examples and pagination.

## Code Style

Format and lint code with ruff:
Expand Down
1 change: 1 addition & 0 deletions api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Read-only monitoring alerts API."""
4 changes: 4 additions & 0 deletions api/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from api.server import main

# Start the alerts API only when the package is executed as a script
# (``python -m api``); a plain import triggers nothing.
if __name__ == "__main__":
    main()
194 changes: 194 additions & 0 deletions api/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
from __future__ import annotations

import json
import os
from dataclasses import dataclass
from datetime import UTC, datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse

from utils.logging import get_logger
from utils.store import AlertEvent, get_alert, query_alerts

# Module-level logger shared by the request handler and the server bootstrap.
logger = get_logger("api.server")

# Values accepted for the `severity` query parameter; parse_alert_query rejects
# anything else with BadRequest("invalid severity").
ALLOWED_SEVERITIES = {"LOW", "MEDIUM", "HIGH", "CRITICAL"}
# Ceiling for the `limit` query parameter; larger requested values are clamped
# down to this number rather than rejected.
MAX_LIMIT = 500


class BadRequest(ValueError):
    """Signal that a request's query parameters failed validation.

    The GET handler converts this into an HTTP 400 response whose ``message``
    field is ``str(exc)``.
    """


@dataclass(frozen=True)
class AlertQuery:
    """Immutable, validated filter set for ``GET /v1/alerts``.

    Every filter is optional; ``None`` means "do not filter on this field".
    Instances are produced by ``parse_alert_query`` and forwarded field by
    field to the store's ``query_alerts``.
    """

    # Exact-match filters taken straight from the query string.
    protocol: str | None = None
    severity: str | None = None
    source: str | None = None

    # Time window bounds as normalized UTC timestamp strings.
    from_ts: str | None = None
    to_ts: str | None = None

    # Pagination: cursor value from a previous response and page size.
    cursor: int | None = None
    limit: int = 100


def _one(params: dict[str, list[str]], key: str) -> str | None:
values = params.get(key)
if not values:
return None
return values[-1]


def parse_timestamp(value: str, name: str) -> str:
raw = value
if raw.endswith("Z"):
raw = f"{raw[:-1]}+00:00"
try:
parsed = datetime.fromisoformat(raw)
except ValueError as exc:
raise BadRequest(f"invalid {name} timestamp") from exc
if parsed.tzinfo is None:
raise BadRequest(f"{name} timestamp must include timezone")
return parsed.astimezone(UTC).isoformat().replace("+00:00", "Z")


def _parse_positive_int(raw: str, name: str) -> int:
    """Parse *raw* as an integer >= 1 or raise ``BadRequest("invalid <name>")``."""
    try:
        number = int(raw)
    except ValueError as exc:
        raise BadRequest(f"invalid {name}") from exc
    if number < 1:
        raise BadRequest(f"invalid {name}")
    return number


def _to_instant(timestamp: str) -> datetime:
    """Convert a ``Z``-suffixed UTC string from ``parse_timestamp`` to a datetime."""
    return datetime.fromisoformat(f"{timestamp.removesuffix('Z')}+00:00")


def parse_alert_query(query: str) -> AlertQuery:
    """Validate the query string of ``GET /v1/alerts`` and build an AlertQuery.

    Args:
        query: The raw query component of the request URL (no leading ``?``).

    Returns:
        An ``AlertQuery`` holding the validated filters. ``limit`` defaults to
        100 and is clamped to ``MAX_LIMIT``; ``since`` is accepted as an alias
        for ``from``.

    Raises:
        BadRequest: For an unknown severity, a non-integer or non-positive
            ``limit``/``cursor``, an unparseable timestamp, or a ``to`` that is
            not strictly after ``from``.
    """
    params = parse_qs(query, keep_blank_values=True)

    severity = _one(params, "severity")
    if severity is not None and severity not in ALLOWED_SEVERITIES:
        raise BadRequest("invalid severity")

    # Blank values (``limit=``) are treated the same as an absent parameter.
    limit_raw = _one(params, "limit")
    limit = min(_parse_positive_int(limit_raw, "limit"), MAX_LIMIT) if limit_raw else 100

    cursor_raw = _one(params, "cursor")
    cursor = _parse_positive_int(cursor_raw, "cursor") if cursor_raw else None

    from_raw = _one(params, "from") or _one(params, "since")
    to_raw = _one(params, "to")
    from_ts = parse_timestamp(from_raw, "from") if from_raw else None
    to_ts = parse_timestamp(to_raw, "to") if to_raw else None
    # Compare instants rather than strings: the normalized text is not
    # guaranteed to be fixed-width, so lexicographic order can disagree with
    # chronological order when only one bound has fractional seconds.
    if from_ts and to_ts and _to_instant(to_ts) <= _to_instant(from_ts):
        raise BadRequest("to must be after from")

    return AlertQuery(
        protocol=_one(params, "protocol"),
        severity=severity,
        source=_one(params, "source"),
        from_ts=from_ts,
        to_ts=to_ts,
        cursor=cursor,
        limit=limit,
    )


# Columns exposed by the API, in the order they appear in the JSON object.
_ALERT_JSON_FIELDS = (
    "id",
    "created_at",
    "source",
    "protocol",
    "channel",
    "severity",
    "message",
    "plain_text",
    "silent",
    "delivery_status",
    "delivered_at",
    "delivery_error",
    "metadata",
)


def alert_to_json(row: AlertEvent) -> dict[str, object]:
    """Project a stored alert row onto the fields exposed by the API.

    Only the whitelisted columns are copied; any other keys on *row* are
    ignored. A missing column raises ``KeyError``.
    """
    return {field: row[field] for field in _ALERT_JSON_FIELDS}


def write_json(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, object]) -> None:
    """Send *payload* as a complete JSON HTTP response with the given status.

    The body is compact JSON (no whitespace after separators). Responses are
    marked ``Cache-Control: no-store`` and carry an exact ``Content-Length``.
    """
    encoded = json.dumps(payload, separators=(",", ":")).encode()
    handler.send_response(status)
    response_headers = (
        ("Content-Type", "application/json"),
        ("Cache-Control", "no-store"),
        ("Content-Length", str(len(encoded))),
    )
    for header_name, header_value in response_headers:
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(encoded)


def write_error(handler: BaseHTTPRequestHandler, status: int, code: str, message: str) -> None:
    """Send a JSON error response shaped as ``{"error": code, "message": message}``."""
    error_body: dict[str, object] = {"error": code, "message": message}
    write_json(handler, status, error_body)


class AlertsHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the read-only alerts API.

    Routes:
        ``GET /healthz``          -> ``{"status": "ok"}``
        ``GET /v1/alerts``        -> filtered, paginated list of alerts
        ``GET /v1/alerts/{id}``   -> a single alert, or 404

    POST, PUT, PATCH and DELETE are answered with 405.
    """

    def do_GET(self) -> None:
        """Dispatch a GET request and translate failures into JSON errors."""
        # Used for the failure log line; narrowed to the path component once
        # the request target has been parsed successfully.
        request_path = self.path
        try:
            # urlparse() raises ValueError for malformed targets (for example
            # "//[" with an unbalanced IPv6 bracket). Parsing inside the try
            # block answers those with a 400 instead of killing the handler
            # thread without sending any response.
            try:
                parsed = urlparse(self.path)
            except ValueError as exc:
                raise BadRequest("invalid request target") from exc
            request_path = parsed.path

            if parsed.path == "/healthz":
                write_json(self, 200, {"status": "ok"})
            elif parsed.path == "/v1/alerts":
                self._send_alert_list(parsed.query)
            elif parsed.path.startswith("/v1/alerts/"):
                self._send_alert_detail(parsed.path.removeprefix("/v1/alerts/"))
            else:
                write_error(self, 404, "not_found", "unknown path")
        except BadRequest as exc:
            write_error(self, 400, "bad_request", str(exc))
        except Exception:
            logger.exception("API request failed for path %s", request_path)
            write_error(self, 500, "server_error", "unexpected server error")

    def _send_alert_list(self, query: str) -> None:
        """Answer ``GET /v1/alerts`` for the given raw query string."""
        alert_query = parse_alert_query(query)
        rows = query_alerts(
            protocol=alert_query.protocol,
            severity=alert_query.severity,
            source=alert_query.source,
            from_ts=alert_query.from_ts,
            to_ts=alert_query.to_ts,
            cursor=alert_query.cursor,
            limit=alert_query.limit,
        )
        data = [alert_to_json(row) for row in rows]
        # A full page suggests more rows may exist; hand back the smallest id
        # on this page as the cursor. A short page ends pagination.
        next_cursor = str(min(row["id"] for row in rows)) if len(rows) == alert_query.limit else None
        write_json(self, 200, {"data": data, "next_cursor": next_cursor, "limit": alert_query.limit})

    def _send_alert_detail(self, alert_id_raw: str) -> None:
        """Answer ``GET /v1/alerts/{id}``; non-integer ids are unknown paths."""
        try:
            alert_id = int(alert_id_raw)
        except ValueError:
            write_error(self, 404, "not_found", "unknown path")
            return
        row = get_alert(alert_id)
        if row is None:
            write_error(self, 404, "not_found", "alert not found")
            return
        write_json(self, 200, alert_to_json(row))

    def do_POST(self) -> None:
        """Reject mutating requests; the API is read-only."""
        write_error(self, 405, "method_not_allowed", "only GET is supported")

    # Every other mutating verb gets the same 405 answer.
    do_PUT = do_POST
    do_PATCH = do_POST
    do_DELETE = do_POST

    def log_message(self, format: str, *args: Any) -> None:
        """Send http.server's access-log lines to the module logger."""
        logger.info("%s - %s", self.address_string(), format % args)


def run(host: str, port: int) -> None:
    """Serve the alerts API on ``host:port`` until the process is stopped.

    The server is used as a context manager so that the listening socket is
    closed whenever ``serve_forever()`` returns or raises (for example on
    ``KeyboardInterrupt``), instead of being left open.

    Args:
        host: Interface address to bind.
        port: TCP port to bind.
    """
    with ThreadingHTTPServer((host, port), AlertsHandler) as server:
        logger.info("monitoring API listening on %s:%d", host, port)
        server.serve_forever()


def main() -> None:
    """Read the bind address from the environment and start the server.

    ``MONITORING_API_HOST`` defaults to ``127.0.0.1`` and
    ``MONITORING_API_PORT`` to ``8923``.
    """
    bind_host = os.getenv("MONITORING_API_HOST", "127.0.0.1")
    port_text = os.getenv("MONITORING_API_PORT", "8923")
    bind_port = int(port_text)
    run(bind_host, bind_port)
1 change: 1 addition & 0 deletions automation/jobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ profiles:
- { name: "compound", script: protocols/compound/main.py }
- { name: "yearn-check-endorsed", script: protocols/yearn/check_endorsed.py }
- { name: "yearn-check-timelock-delay", script: protocols/yearn/check_timelock_delay.py }
- { name: "prune-alerts", script: utils/prune_alerts.py }

multisig:
cron: "0/10 * * * *"
Expand Down
9 changes: 8 additions & 1 deletion automation/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ def _run_task(task: Task, *, profile: Profile, repo_root: Path, dry_run: bool) -
def _send_failure_digest(result: ProfileResult) -> None:
message = result.telegram_summary()
try:
send_telegram_message(message, protocol=TELEGRAM_PROTOCOL, plain_text=False)
send_telegram_message(
message,
protocol=TELEGRAM_PROTOCOL,
plain_text=False,
source="automation_digest",
origin_protocol=TELEGRAM_PROTOCOL,
channel=TELEGRAM_PROTOCOL,
)
except TelegramError as exc:
logger.error("failed to send automation digest for %s: %s", result.profile, exc)
111 changes: 111 additions & 0 deletions deploy/alerts-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Alerts API

The alerts API exposes persisted monitoring alerts from the local SQLite
database at `$CACHE_DIR/monitoring.db`. It is read-only.

Production runs it as `monitoring-api.service`, bound to localhost:

```sh
sudo systemctl enable --now monitoring-api
curl http://127.0.0.1:8923/healthz
```

For local testing:

```sh
CACHE_DIR=/tmp/monitoring-cache uv run python -m api
```

## Endpoints

### `GET /healthz`

Returns:

```json
{"status":"ok"}
```

### `GET /v1/alerts`

Returns alert rows ordered newest first.

Common examples:

```sh
curl 'http://127.0.0.1:8923/v1/alerts?limit=10'
curl 'http://127.0.0.1:8923/v1/alerts?source=protocol&limit=50'
curl 'http://127.0.0.1:8923/v1/alerts?protocol=aave&severity=HIGH'
curl 'http://127.0.0.1:8923/v1/alerts?from=2026-06-11T00:00:00Z&to=2026-06-12T00:00:00Z'
```

Query parameters:

- `limit`: rows to return, default `100`. Values above `500` are capped at `500`; values below `1` are rejected with `400`.
- `cursor`: pagination cursor from the previous response.
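- `from`: inclusive lower bound, an ISO 8601 timestamp with timezone (for example `2026-06-11T00:00:00Z`).
- `to`: exclusive upper bound, an ISO 8601 timestamp with timezone; must be later than `from`.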
- `since`: alias for `from`.
- `protocol`: exact protocol filter, for example `aave`.
- `severity`: one of `LOW`, `MEDIUM`, `HIGH`, `CRITICAL`.
- `source`: alert source, commonly `protocol`, `ops_error`, `crash`, or `automation_digest`.

Example response:

```json
{
"data": [
{
"id": 5021,
"created_at": "2026-06-11T10:23:45.123456Z",
"source": "protocol",
"protocol": "aave",
"channel": "aave",
"severity": "LOW",
"message": "message text",
"plain_text": false,
"silent": true,
"delivery_status": "delivered",
"delivered_at": "2026-06-11T10:23:45.456789Z",
"delivery_error": null,
"metadata": {}
}
],
"next_cursor": "5021",
"limit": 100
}
```

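For the next page, pass `cursor=<next_cursor>`. `next_cursor` is `null` when a page contains fewer than `limit` rows, which means there is nothing more to fetch: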

```sh
curl 'http://127.0.0.1:8923/v1/alerts?cursor=5021&limit=100'
```

### `GET /v1/alerts/{id}`

Returns one alert:

```sh
curl http://127.0.0.1:8923/v1/alerts/5021
```

Missing alerts return `404`.

## Delivery Status

An alert row means a monitor generated an alert. Telegram delivery is tracked
separately:

- `generated`: row inserted before Telegram delivery completed.
- `delivered`: Telegram API call succeeded.
- `failed`: Telegram API call failed.
- `skipped_debug`: `LOG_LEVEL=DEBUG` skipped Telegram delivery.
- `skipped_missing_credentials`: Telegram credentials were missing.
- `not_attempted`: no Telegram attempt was made.

## Public Access

Do not expose the Python service directly. Keep it bound to `127.0.0.1` and put
a reverse proxy in front of it. The proxy should enforce authentication, rate
limits, and request/response timeouts.
Loading