diff --git a/otel-collector/cassandra/.env.example b/otel-collector/cassandra/.env.example new file mode 100644 index 0000000..43b3415 --- /dev/null +++ b/otel-collector/cassandra/.env.example @@ -0,0 +1,2 @@ +OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp-aps1.last9.io:443 +OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION=Basic diff --git a/otel-collector/cassandra/.gitignore b/otel-collector/cassandra/.gitignore new file mode 100644 index 0000000..e2a2611 --- /dev/null +++ b/otel-collector/cassandra/.gitignore @@ -0,0 +1,5 @@ +.env +.env.local +*.log +.DS_Store +*.jar diff --git a/otel-collector/cassandra/Dockerfile b/otel-collector/cassandra/Dockerfile new file mode 100644 index 0000000..513e654 --- /dev/null +++ b/otel-collector/cassandra/Dockerfile @@ -0,0 +1,17 @@ +FROM eclipse-temurin:17-jre-jammy AS java +FROM otel/opentelemetry-collector-contrib:0.144.0 AS otelcol + +# Use a Debian slim base so the Temurin JRE (Ubuntu glibc) links correctly. +# The distroless otelcol base lacks some runtime libraries the JRE needs. +FROM debian:12-slim + +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/* + +COPY --from=otelcol /otelcol-contrib /otelcol-contrib +COPY --from=java /opt/java/openjdk /opt/java/openjdk + +ENV JAVA_HOME=/opt/java/openjdk +ENV PATH="${JAVA_HOME}/bin:${PATH}" + +USER nobody +ENTRYPOINT ["/otelcol-contrib"] diff --git a/otel-collector/cassandra/README.md b/otel-collector/cassandra/README.md new file mode 100644 index 0000000..31c0f89 --- /dev/null +++ b/otel-collector/cassandra/README.md @@ -0,0 +1,110 @@ +# Cassandra + OTel Collector + +Collects Cassandra JMX metrics via the `opentelemetry-jmx-metrics` JAR and ships to Last9. 
+ +## Prerequisites + +- Apache Cassandra 4.x with JMX exposed on port 7199 +- Java 17+ on the machine running the JMX metrics collector +- OTel Collector with OTLP receiver +- Last9 OTLP credentials + +## Quick Start (local Docker test) + +```bash +cp .env.example .env +# Fill in OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION +# Download the JMX metrics JAR that docker-compose.yaml bind-mounts into the sidecar (*.jar is git-ignored) +wget -O opentelemetry-jmx-metrics.jar https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.43.0/opentelemetry-jmx-metrics.jar +docker compose up -d +``` + +The Docker Compose setup uses a JMX metrics sidecar container (`eclipse-temurin:17-jre-jammy`) that collects JMX metrics from Cassandra and forwards them to the OTel Collector via OTLP. + +## Production Setup (bare-metal) + +### 1. Download the JMX metrics JAR + +```bash +sudo wget -O /opt/opentelemetry-jmx-metrics.jar \ + https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.43.0/opentelemetry-jmx-metrics.jar +``` + +### 2. Create the JMX config + +Create `/etc/otelcol-contrib/jmx-config.properties`: + +```properties +otel.exporter.otlp.endpoint = http://localhost:4317 +otel.jmx.interval.milliseconds = 60000 +otel.jmx.service.url = service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi +otel.jmx.target.system = cassandra +otel.metrics.exporter = otlp +``` + +### 3. Run the JMX metrics JAR as a service + +Create `/etc/systemd/system/cassandra-jmx-metrics.service`: + +```ini +[Unit] +Description=Cassandra JMX Metrics Collector +After=cassandra.service + +[Service] +ExecStart=java -jar /opt/opentelemetry-jmx-metrics.jar -config /etc/otelcol-contrib/jmx-config.properties +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target +``` + +```bash +sudo systemctl daemon-reload +sudo systemctl enable --now cassandra-jmx-metrics +``` + +### 4. 
Install and configure OTel Collector + +```bash +# AMD64 +wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.144.0/otelcol-contrib_0.144.0_linux_amd64.deb +sudo dpkg -i otelcol-contrib_0.144.0_linux_amd64.deb +# ARM64 +wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.144.0/otelcol-contrib_0.144.0_linux_arm64.deb +sudo dpkg -i otelcol-contrib_0.144.0_linux_arm64.deb +``` + +```bash +sudo cp otel-collector-config.yaml /etc/otelcol-contrib/config.yaml +sudo systemctl enable --now otelcol-contrib +``` + +## Cassandra JMX Configuration + +JMX is enabled on port 7199 by default. To allow remote JMX access: + +Add to `/etc/cassandra/cassandra-env.sh` or JVM options: + +``` +-Dcom.sun.management.jmxremote +-Dcom.sun.management.jmxremote.port=7199 +-Dcom.sun.management.jmxremote.rmi.port=7199 +-Dcom.sun.management.jmxremote.ssl=false +-Dcom.sun.management.jmxremote.authenticate=false +-Djava.rmi.server.hostname=<cassandra-host-ip> +``` + +> **Warning:** these flags disable JMX authentication and TLS. Use them only on a trusted network, or enable `jmxremote.authenticate` and `jmxremote.ssl` in production. + +## Configuration + +| Variable | Description | +|---|---| +| `OTEL_EXPORTER_OTLP_ENDPOINT` | Last9 OTLP endpoint | +| `OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION` | Last9 Basic auth header | + +## Metrics Collected + +- Read/write request latency (p50, p99, max) +- Request counts and error counts by operation +- Pending and completed compaction tasks +- Storage load and hints counts +- System CPU, memory, disk, network via `hostmetrics` diff --git a/otel-collector/cassandra/docker-compose.yaml b/otel-collector/cassandra/docker-compose.yaml new file mode 100644 index 0000000..c9e3e62 --- /dev/null +++ b/otel-collector/cassandra/docker-compose.yaml @@ -0,0 +1,44 @@ +services: + cassandra: + image: cassandra:4 + container_name: cassandra-test + environment: + # Enable remote JMX access for the jmx-metrics sidecar container + - LOCAL_JMX=no + - JVM_EXTRA_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false 
-Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.port=7199 -Dcom.sun.management.jmxremote.rmi.port=7199 -Djava.rmi.server.hostname=cassandra + ports: + - "9042:9042" + - "7199:7199" + healthcheck: + test: ["CMD", "cqlsh", "-e", "describe keyspaces"] + interval: 15s + timeout: 10s + retries: 10 + start_period: 60s + + # JMX metrics sidecar: runs the OTel JMX metrics JAR, sends to otel-collector via OTLP + jmx-metrics: + image: eclipse-temurin:17-jre-jammy + container_name: cassandra-jmx-metrics + command: ["java", "-jar", "/opt/opentelemetry-jmx-metrics.jar", "-config", "/opt/jmx-config.properties"] + volumes: + - ./opentelemetry-jmx-metrics.jar:/opt/opentelemetry-jmx-metrics.jar:ro + - ./jmx-config.properties:/opt/jmx-config.properties:ro + depends_on: + cassandra: + condition: service_healthy + otel-collector: + condition: service_started + + otel-collector: + image: otel/opentelemetry-collector-contrib:0.144.0 + container_name: cassandra-otel-collector + command: ["--config=/etc/otel/config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/otel/config.yaml:ro + environment: + - OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT} + - OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION=${OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION} + depends_on: + cassandra: + condition: service_healthy diff --git a/otel-collector/cassandra/jmx-config.properties b/otel-collector/cassandra/jmx-config.properties new file mode 100644 index 0000000..ec3a7d7 --- /dev/null +++ b/otel-collector/cassandra/jmx-config.properties @@ -0,0 +1,5 @@ +otel.exporter.otlp.endpoint = http://otel-collector:4317 +otel.jmx.interval.milliseconds = 60000 +otel.jmx.service.url = service:jmx:rmi:///jndi/rmi://cassandra:7199/jmxrmi +otel.jmx.target.system = cassandra +otel.metrics.exporter = otlp diff --git a/otel-collector/cassandra/otel-collector-config.yaml b/otel-collector/cassandra/otel-collector-config.yaml new file mode 100644 index 0000000..35e05c8 --- /dev/null +++ 
b/otel-collector/cassandra/otel-collector-config.yaml @@ -0,0 +1,64 @@ +receivers: + # Receives JMX metrics from the opentelemetry-jmx-metrics sidecar (Docker) + # For bare-metal: use the jmxreceiver directly (see README) + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + + hostmetrics: + collection_interval: 60s + scrapers: + cpu: + metrics: + system.cpu.logical.count: + enabled: true + memory: + metrics: + system.memory.utilization: + enabled: true + system.memory.limit: + enabled: true + load: + disk: + filesystem: + metrics: + system.filesystem.utilization: + enabled: true + network: + paging: + +processors: + batch: + timeout: 5s + send_batch_size: 10000 + send_batch_max_size: 10000 + resourcedetection/system: + detectors: ["system"] + system: + hostname_sources: ["os"] + transform/add_db_system: + metric_statements: + - context: datapoint + statements: + - set(attributes["host.name"], resource.attributes["host.name"]) + - set(attributes["db.system"], "cassandra") + +exporters: + otlp/last9: + endpoint: "${env:OTEL_EXPORTER_OTLP_ENDPOINT}" + headers: + "Authorization": "${env:OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION}" + debug: + verbosity: detailed + +service: + pipelines: + metrics/jmx: + receivers: [otlp] + processors: [batch, transform/add_db_system] + exporters: [otlp/last9, debug] + metrics/host: + receivers: [hostmetrics] + processors: [batch, resourcedetection/system, transform/add_db_system] + exporters: [otlp/last9, debug] diff --git a/otel-collector/neo4j/.env.example b/otel-collector/neo4j/.env.example new file mode 100644 index 0000000..43b3415 --- /dev/null +++ b/otel-collector/neo4j/.env.example @@ -0,0 +1,2 @@ +OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp-aps1.last9.io:443 +OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION=Basic diff --git a/otel-collector/neo4j/.gitignore b/otel-collector/neo4j/.gitignore new file mode 100644 index 0000000..3b1636a --- /dev/null +++ b/otel-collector/neo4j/.gitignore @@ -0,0 +1,4 @@ +.env +.env.local +*.log +.DS_Store 
diff --git a/otel-collector/neo4j/README.md b/otel-collector/neo4j/README.md new file mode 100644 index 0000000..8fa93ee --- /dev/null +++ b/otel-collector/neo4j/README.md @@ -0,0 +1,84 @@ +# Neo4j + OTel Collector + +Collects Neo4j metrics via its Prometheus endpoint and ships to Last9. + +> **Note:** Prometheus metrics require Neo4j Enterprise Edition. The `docker-compose.yaml` uses `neo4j:5-enterprise` with a 30-day evaluation license. + +## Prerequisites + +- Neo4j Enterprise 5.x installed and running +- OTel Collector installed as a binary +- Last9 OTLP credentials + +## Neo4j Configuration + +Enable the Prometheus metrics endpoint in `neo4j.conf`: + +``` +server.metrics.prometheus.enabled=true +server.metrics.prometheus.endpoint=0.0.0.0:2004 +``` + +Restart Neo4j after making changes: + +```bash +sudo systemctl restart neo4j +``` + +Verify the endpoint is working: + +```bash +curl http://localhost:2004/metrics | head -20 +``` + +## Quick Start (local Docker test) + +```bash +cp .env.example .env +# Fill in OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION +docker compose up -d +``` + +## Production Setup (bare-metal) + +1. Install OTel Collector: + + ```bash + # AMD64 + wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.144.0/otelcol-contrib_0.144.0_linux_amd64.deb + sudo dpkg -i otelcol-contrib_0.144.0_linux_amd64.deb + # ARM64 + wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.144.0/otelcol-contrib_0.144.0_linux_arm64.deb + sudo dpkg -i otelcol-contrib_0.144.0_linux_arm64.deb + ``` + +2. Copy config: + + ```bash + sudo cp otel-collector-config.yaml /etc/otelcol-contrib/config.yaml + ``` + +3. 
Set credentials in `/etc/otelcol-contrib/otelcol-contrib.conf` and start: + + ```bash + sudo systemctl enable --now otelcol-contrib + ``` + +## Configuration + +| Variable | Description | +|---|---| +| `OTEL_EXPORTER_OTLP_ENDPOINT` | Last9 OTLP endpoint | +| `OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION` | Last9 Basic auth header | +| `NEO4J_HOST` | Neo4j hostname (default: `localhost`) | + +## Metrics Collected + +- Transaction throughput (commits, rollbacks, active read/write) +- Query execution latency (slotted, pipelined, parallel) +- Bolt connection counts and idle sessions +- Page cache hit ratio and fault rates +- Store sizes (database, available) +- Cypher cache hit/miss rates +- JVM heap, GC pause times, thread counts +- System CPU, memory, disk, network via `hostmetrics` diff --git a/otel-collector/neo4j/docker-compose.yaml b/otel-collector/neo4j/docker-compose.yaml new file mode 100644 index 0000000..abd4234 --- /dev/null +++ b/otel-collector/neo4j/docker-compose.yaml @@ -0,0 +1,33 @@ +services: + neo4j: + image: neo4j:5-enterprise + container_name: neo4j-test + environment: + NEO4J_AUTH: none + NEO4J_ACCEPT_LICENSE_AGREEMENT: eval + NEO4J_server_metrics_prometheus_enabled: "true" + NEO4J_server_metrics_prometheus_endpoint: "0.0.0.0:2004" + ports: + - "7474:7474" + - "7687:7687" + - "2004:2004" + healthcheck: + test: ["CMD", "wget", "-q", "--spider", "http://localhost:7474"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 30s + + otel-collector: + image: otel/opentelemetry-collector-contrib:0.144.0 + container_name: neo4j-otel-collector + command: ["--config=/etc/otel/config.yaml"] + volumes: + - ./otel-collector-config.yaml:/etc/otel/config.yaml:ro + environment: + - NEO4J_HOST=neo4j + - OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT} + - OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION=${OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION} + depends_on: + neo4j: + condition: service_healthy diff --git a/otel-collector/neo4j/otel-collector-config.yaml 
b/otel-collector/neo4j/otel-collector-config.yaml new file mode 100644 index 0000000..803f960 --- /dev/null +++ b/otel-collector/neo4j/otel-collector-config.yaml @@ -0,0 +1,62 @@ +receivers: + prometheus: + config: + scrape_configs: + - job_name: neo4j + scrape_interval: 60s + metrics_path: /metrics + static_configs: + - targets: ["${env:NEO4J_HOST:-localhost}:2004"] + + hostmetrics: + collection_interval: 60s + scrapers: + cpu: + metrics: + system.cpu.logical.count: + enabled: true + memory: + metrics: + system.memory.utilization: + enabled: true + system.memory.limit: + enabled: true + load: + disk: + filesystem: + metrics: + system.filesystem.utilization: + enabled: true + network: + paging: + +processors: + batch: + timeout: 5s + send_batch_size: 10000 + send_batch_max_size: 10000 + resourcedetection/system: + detectors: ["system"] + system: + hostname_sources: ["os"] + transform/hostmetrics: + metric_statements: + - context: datapoint + statements: + - set(attributes["host.name"], resource.attributes["host.name"]) + - set(attributes["db.system"], "neo4j") + +exporters: + otlp/last9: + endpoint: "${env:OTEL_EXPORTER_OTLP_ENDPOINT}" + headers: + "Authorization": "${env:OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION}" + debug: + verbosity: detailed + +service: + pipelines: + metrics: + receivers: [prometheus, hostmetrics] + processors: [batch, resourcedetection/system, transform/hostmetrics] + exporters: [otlp/last9, debug] diff --git a/otel-collector/postgres-no-docker/.env.example b/otel-collector/postgres-no-docker/.env.example new file mode 100644 index 0000000..43b3415 --- /dev/null +++ b/otel-collector/postgres-no-docker/.env.example @@ -0,0 +1,2 @@ +OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp-aps1.last9.io:443 +OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION=Basic diff --git a/otel-collector/postgres-no-docker/.gitignore b/otel-collector/postgres-no-docker/.gitignore new file mode 100644 index 0000000..3b1636a --- /dev/null +++ 
b/otel-collector/postgres-no-docker/.gitignore @@ -0,0 +1,4 @@ +.env +.env.local +*.log +.DS_Store diff --git a/otel-collector/postgres-no-docker/README.md b/otel-collector/postgres-no-docker/README.md new file mode 100644 index 0000000..a0013d9 --- /dev/null +++ b/otel-collector/postgres-no-docker/README.md @@ -0,0 +1,116 @@ +# PostgreSQL + OTel Collector (No Docker) + +Collects PostgreSQL metrics and slow query logs using the OTel Collector binary — no Docker required. Runs as a systemd service on the same host as PostgreSQL. + +## Prerequisites + +- PostgreSQL 14+ installed and running +- OTel Collector binary installed +- Last9 OTLP credentials + +## PostgreSQL Configuration + +1. Create a monitoring user: + + ```sql + CREATE USER otel WITH PASSWORD 'your_secure_password'; + GRANT pg_monitor TO otel; + ``` + +2. Enable slow query logging in `postgresql.conf`: + + ```ini + log_min_duration_statement = 1000 # log queries slower than 1s (in ms) + log_line_prefix = '%t [%p] %u@%d ' + logging_collector = on + log_directory = '/var/log/postgresql' + log_filename = 'postgresql-%Y-%m-%d.log' + ``` + +3. Reload PostgreSQL: + + ```bash + sudo systemctl reload postgresql + ``` + +## Install OTel Collector + +```bash +# AMD64 (Debian/Ubuntu) +wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.144.0/otelcol-contrib_0.144.0_linux_amd64.deb +sudo dpkg -i otelcol-contrib_0.144.0_linux_amd64.deb + +# ARM64 (Debian/Ubuntu) +wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.144.0/otelcol-contrib_0.144.0_linux_arm64.deb +sudo dpkg -i otelcol-contrib_0.144.0_linux_arm64.deb + +# RHEL/CentOS AMD64 +wget https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.144.0/otelcol-contrib_0.144.0_linux_amd64.rpm +sudo rpm -ivh otelcol-contrib_0.144.0_linux_amd64.rpm +``` + +## Quick Start + +1. 
Copy `.env.example` to `.env` and fill in credentials: + + ```bash + cp .env.example .env + ``` + +2. Copy the collector config and the bundled systemd unit (the unit passes the `transform.flatten.logs` feature gate that `flatten_data: true` requires): + + ```bash + sudo cp otel-collector-config.yaml /etc/otelcol-contrib/config.yaml + sudo cp otelcol.service /etc/systemd/system/otelcol-contrib.service + ``` + +3. Set environment variables for the service. Create `/etc/otelcol-contrib/otelcol-contrib.env` (the `EnvironmentFile` that `otelcol.service` reads; the .deb/.rpm package itself only ships `otelcol-contrib.conf`) and add: + + ```bash + OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp-aps1.last9.io:443 + OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION=Basic <base64-credentials> + POSTGRESQL_PASSWORD=<otel-user-password> + ``` + +4. Grant the collector read access to PostgreSQL logs: + + ```bash + sudo usermod -aG adm otelcol-contrib + # or explicitly: + sudo chmod o+r /var/log/postgresql/*.log + ``` + +5. Start and enable the service: + + ```bash + sudo systemctl daemon-reload + sudo systemctl enable otelcol-contrib + sudo systemctl start otelcol-contrib + ``` + +## Verification + +```bash +# Check service status +sudo systemctl status otelcol-contrib + +# Watch logs +sudo journalctl -u otelcol-contrib -f +``` + +## Configuration + +| Variable | Description | +|---|---| +| `OTEL_EXPORTER_OTLP_ENDPOINT` | Last9 OTLP endpoint | +| `OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION` | Last9 Basic auth header | +| `POSTGRESQL_PASSWORD` | Password for the `otel` monitoring user | + +## Log Path + +Adjust `include` in `otel-collector-config.yaml` to match your system: + +| OS | Default log path | +|---|---| +| Ubuntu/Debian | `/var/log/postgresql/postgresql-*.log` | +| RHEL/CentOS | `/var/lib/pgsql/<version>/data/log/postgresql-*.log` | +| Custom | Set `log_directory` in `postgresql.conf` | diff --git a/otel-collector/postgres-no-docker/docker-compose.yaml b/otel-collector/postgres-no-docker/docker-compose.yaml new file mode 100644 index 0000000..2561c3b --- /dev/null +++ b/otel-collector/postgres-no-docker/docker-compose.yaml @@ -0,0 +1,31 @@ +services: + postgres: + image: postgres:16 + container_name: postgres-nd-test + environment: + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + ports: + - 
"5434:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 15s + + otel-collector: + image: otel/opentelemetry-collector-contrib:0.144.0 + container_name: postgres-nd-otel-collector + command: ["--config=/etc/otel/config.yaml", "--feature-gates=transform.flatten.logs"] + volumes: + - ./otel-collector-config.yaml:/etc/otel/config.yaml:ro + environment: + - POSTGRES_HOST=postgres + - POSTGRESQL_USERNAME=postgres + - POSTGRESQL_PASSWORD=postgres + - OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT} + - OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION=${OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION} + depends_on: + postgres: + condition: service_healthy diff --git a/otel-collector/postgres-no-docker/otel-collector-config.yaml b/otel-collector/postgres-no-docker/otel-collector-config.yaml new file mode 100644 index 0000000..5cabf2a --- /dev/null +++ b/otel-collector/postgres-no-docker/otel-collector-config.yaml @@ -0,0 +1,103 @@ +receivers: + postgresql: + endpoint: "${env:POSTGRES_HOST:-localhost}:5432" + username: ${env:POSTGRESQL_USERNAME:-otel} + password: ${env:POSTGRESQL_PASSWORD} + databases: + - postgres + collection_interval: 60s + tls: + insecure: true + + filelog: + # Adjust path to match your OS and PostgreSQL version: + # Ubuntu/Debian: /var/log/postgresql/postgresql--main.log + # RHEL/CentOS: /var/lib/pgsql//data/log/postgresql-*.log + include: [/var/log/postgresql/postgresql-*.log] + include_file_path: true + start_at: end + retry_on_failure: + enabled: true + # Log lines start with a timestamp from log_line_prefix + # Format: 2026-03-06 14:30:00 UTC [1234] user@db LOG: duration: 1234.567 ms statement: ... 
+ multiline: + line_start_pattern: '^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}' + operators: + - type: regex_parser + if: body matches "duration:" + regex: '(?P<timestamp>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\s\w+)\s+\[(?P<pid>\d+)\]\s+(?P<user>[^@]+)@(?P<database>\S+)\s+LOG:\s+duration:\s+(?P<duration_ms>[\d.]+)\s+ms\s+(?P<statement_type>\w+):\s+(?P<query>.*)' + on_error: send + + hostmetrics: + collection_interval: 60s + scrapers: + cpu: + metrics: + system.cpu.logical.count: + enabled: true + memory: + metrics: + system.memory.utilization: + enabled: true + system.memory.limit: + enabled: true + load: + disk: + filesystem: + metrics: + system.filesystem.utilization: + enabled: true + network: + paging: + +processors: + batch: + timeout: 5s + send_batch_size: 10000 + send_batch_max_size: 10000 + resourcedetection/system: + detectors: ["system"] + system: + hostname_sources: ["os"] + transform/logs: + flatten_data: true + log_statements: + - context: log + statements: + - set(observed_time, Now()) + - set(time_unix_nano, observed_time_unix_nano) where time_unix_nano == 0 + - set(resource.attributes["service.name"], "postgresql") + - set(resource.attributes["deployment.environment"], "production") + transform/slow_queries: + log_statements: + - context: log + conditions: + - attributes["duration_ms"] != nil + statements: + - set(attributes["db.system"], "postgresql") + - set(attributes["db.operation.duration_ms"], attributes["duration_ms"]) + - set(attributes["db.user"], attributes["user"]) + - set(attributes["db.namespace"], attributes["database"]) + - set(attributes["db.query.text"], attributes["query"]) + - set(attributes["db.pid"], attributes["pid"]) + - set(attributes["slow_query"], true) + - set(severity_text, "WARN") + +exporters: + otlp/last9: + endpoint: "${env:OTEL_EXPORTER_OTLP_ENDPOINT}" + headers: + "Authorization": "${env:OTEL_EXPORTER_OTLP_HEADERS_AUTHORIZATION}" + debug: + verbosity: detailed + +service: + pipelines: + logs: + receivers: [filelog] + processors: [batch, resourcedetection/system, transform/logs, 
transform/slow_queries] + exporters: [otlp/last9, debug] + metrics: + receivers: [postgresql, hostmetrics] + processors: [batch, resourcedetection/system] + exporters: [otlp/last9, debug] diff --git a/otel-collector/postgres-no-docker/otelcol.service b/otel-collector/postgres-no-docker/otelcol.service new file mode 100644 index 0000000..453d6a8 --- /dev/null +++ b/otel-collector/postgres-no-docker/otelcol.service @@ -0,0 +1,17 @@ +[Unit] +Description=OpenTelemetry Collector Contrib +Documentation=https://opentelemetry.io/docs/collector/ +After=network.target postgresql.service + +[Service] +EnvironmentFile=/etc/otelcol-contrib/otelcol-contrib.env +ExecStart=/usr/bin/otelcol-contrib --config /etc/otelcol-contrib/config.yaml --feature-gates transform.flatten.logs +Restart=on-failure +RestartSec=5s +User=otelcol-contrib +Group=otelcol-contrib +# Allow reading PostgreSQL log files +SupplementaryGroups=adm + +[Install] +WantedBy=multi-user.target diff --git a/python/aws-cost-explorer/.env.example b/python/aws-cost-explorer/.env.example new file mode 100644 index 0000000..ca4c158 --- /dev/null +++ b/python/aws-cost-explorer/.env.example @@ -0,0 +1,19 @@ +# AWS credentials — omit if running on EC2/ECS/Lambda with an IAM role +AWS_ACCESS_KEY_ID= +AWS_SECRET_ACCESS_KEY= +AWS_DEFAULT_REGION=us-east-1 + +# How many days back to fetch on each run (default: 30) +DAYS_BACK=30 + +# How often to re-poll (seconds). Cost Explorer data updates ~once/day. 
+POLL_INTERVAL_SECONDS=86400 + +# Last9 OTLP endpoint +OTLP_ENDPOINT=https://otlp.last9.io + +# Last9 auth header: get from Last9 dashboard → Integrations → OTLP +OTLP_HEADERS=Authorization=Basic + +# Service name shown in Last9 +OTEL_SERVICE_NAME=aws-cost-reporter diff --git a/python/aws-cost-explorer/.gitignore b/python/aws-cost-explorer/.gitignore new file mode 100644 index 0000000..92c5b7e --- /dev/null +++ b/python/aws-cost-explorer/.gitignore @@ -0,0 +1,11 @@ +.env +.env.local +.env.*.local +/.venv/ +__pycache__/ +*.pyc +.idea/ +.vscode/ +*.swp +.DS_Store +*.log diff --git a/python/aws-cost-explorer/Dockerfile b/python/aws-cost-explorer/Dockerfile new file mode 100644 index 0000000..3ff08d6 --- /dev/null +++ b/python/aws-cost-explorer/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.13-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY main.py . + +CMD ["python", "main.py"] diff --git a/python/aws-cost-explorer/README.md b/python/aws-cost-explorer/README.md new file mode 100644 index 0000000..7bdf68b --- /dev/null +++ b/python/aws-cost-explorer/README.md @@ -0,0 +1,71 @@ +# AWS Cost Explorer → Last9 + +Sends AWS cost metrics to Last9 using the Cost Explorer API. No S3 bucket or CUR setup required — data flows within minutes. + +## Prerequisites + +- AWS account with billing access +- [Last9 OTLP credentials](https://app.last9.io/integrations) + +## Deploy with CloudFormation (recommended) + +No CLI or local setup needed — deploy directly from the AWS console. + +1. Open [CloudFormation → Create stack](https://console.aws.amazon.com/cloudformation/home#/stacks/create) +2. Upload `cloudformation.yaml` +3. Fill in `OtlpHeaders` with your Last9 auth header +4. Click **Create stack** + +CloudFormation creates the IAM role, Lambda function, and EventBridge daily schedule automatically. 
+ +Test after deploy: +```bash +aws lambda invoke --function-name aws-cost-reporter /tmp/out.json && cat /tmp/out.json +``` + +## Deploy with AWS CLI + +Requires AWS CLI configured locally: + +```bash +OTLP_HEADERS="Authorization=Basic " ./deploy.sh +``` + +## Run with Docker (local testing) + +```bash +cp .env.example .env +# Fill in AWS credentials and OTLP_HEADERS +docker compose up +``` + +## Configuration + +| Variable | Required | Default | Description | +|---|---|---|---| +| `OTLP_HEADERS` | Yes | — | Last9 auth header (`Authorization=Basic `) | +| `AWS_ACCESS_KEY_ID` | No* | — | AWS access key (Docker only) | +| `AWS_SECRET_ACCESS_KEY` | No* | — | AWS secret key (Docker only) | +| `AWS_DEFAULT_REGION` | No | `us-east-1` | AWS region | +| `DAYS_BACK` | No | `30` | Days of history to fetch per run | +| `POLL_INTERVAL_SECONDS` | No | `86400` | Re-poll interval for Docker mode | +| `OTEL_SERVICE_NAME` | No | `aws-cost-reporter` | Service name in Last9 | + +\* Lambda uses the attached IAM role — no credentials needed. + +## Metrics + +| Metric | Unit | Dimensions | +|---|---|---| +| `aws.cost.unblended` | USD | `aws.service`, `aws.account.id`, `aws.region` | +| `aws.cost.amortized` | USD | same — RI and Savings Plan effective rates applied | + +## Verification + +After the Lambda runs, query `aws.cost.unblended` in [Last9 Metrics](https://app.last9.io/metrics) and group by `aws.service`. + +--- + +> **Need resource-level granularity or cost allocation tags?** +> Use the [AWS CUR integration](../aws-cur/) instead — it reads Cost and Usage Report +> parquet files from S3 for line-item detail and custom tag dimensions. diff --git a/python/aws-cost-explorer/cloudformation.yaml b/python/aws-cost-explorer/cloudformation.yaml new file mode 100644 index 0000000..3d3785d --- /dev/null +++ b/python/aws-cost-explorer/cloudformation.yaml @@ -0,0 +1,200 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: > + AWS Cost Explorer → Last9 via OpenTelemetry metrics. 
+  Deploys a Lambda function triggered daily by EventBridge that fetches
+  AWS cost data and exports it to Last9 as OTel gauge metrics.
+
+Parameters:
+  OtlpHeaders:
+    Type: String
+    Description: "Last9 OTLP auth header — get from Last9 dashboard → Integrations → OTLP (e.g. Authorization=Basic xxxx)"
+    NoEcho: true
+
+  OtlpEndpoint:
+    Type: String
+    Default: https://otlp.last9.io
+    Description: Last9 OTLP endpoint
+
+  DaysBack:
+    Type: Number
+    Default: 30
+    Description: Days of cost history to fetch on each run
+
+  Schedule:
+    Type: String
+    Default: rate(1 day)
+    Description: EventBridge schedule expression (e.g. rate(1 day) or cron(0 6 * * ? *))
+
+  ServiceName:
+    Type: String
+    Default: aws-cost-reporter
+    Description: Service name shown in Last9 and used as the Lambda function name
+
+Resources:
+
+  ExecutionRole:
+    Type: AWS::IAM::Role
+    Properties:
+      RoleName: !Sub "${ServiceName}-role"
+      AssumeRolePolicyDocument:
+        Version: "2012-10-17"
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service: lambda.amazonaws.com
+            Action: sts:AssumeRole
+      ManagedPolicyArns:
+        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
+      Policies:
+        - PolicyName: CostExplorerRead
+          PolicyDocument:
+            Version: "2012-10-17"
+            Statement:
+              - Effect: Allow
+                Action: ce:GetCostAndUsage
+                Resource: "*"
+
+  Function:
+    Type: AWS::Lambda::Function
+    Properties:
+      FunctionName: !Ref ServiceName
+      Runtime: python3.13
+      Handler: index.handler
+      Role: !GetAtt ExecutionRole.Arn
+      Timeout: 300
+      MemorySize: 256
+      Environment:
+        Variables:
+          OTLP_ENDPOINT: !Ref OtlpEndpoint
+          OTLP_HEADERS: !Ref OtlpHeaders
+          OTEL_SERVICE_NAME: !Ref ServiceName
+          DAYS_BACK: !Ref DaysBack
+      Code:
+        ZipFile: |
+          import json, os, urllib.request
+          from datetime import datetime, timedelta, timezone
+          import boto3
+
+          OTLP = os.environ.get('OTLP_ENDPOINT', 'https://otlp.last9.io')
+          HDRS = os.environ.get('OTLP_HEADERS', '')
+          SVC = os.environ.get('OTEL_SERVICE_NAME', 'aws-cost-reporter')
+          DAYS = int(os.environ.get('DAYS_BACK', '30'))
+
+          def _parse_headers():
+              """Parse OTLP_HEADERS ('k1=v1,k2=v2') into a dict of HTTP headers."""
+              h = {}
+              for p in HDRS.split(','):
+                  if '=' in p:
+                      # Split on the first '=' only: base64 values may end in '='.
+                      k, v = p.split('=', 1)
+                      h[k.strip()] = v.strip()
+              return h
+
+          def _to_ns(d):
+              """Return noon UTC of the 'YYYY-MM-DD' day d as epoch nanoseconds (str)."""
+              dt = datetime.strptime(d, '%Y-%m-%d').replace(hour=12, tzinfo=timezone.utc)
+              # Scale in integer space; a float product this large loses precision.
+              return str(int(dt.timestamp()) * 1_000_000_000)
+
+          def _pages(ce, **kw):
+              """Yield every ResultsByTime entry of one query, following NextPageToken."""
+              while True:
+                  r = ce.get_cost_and_usage(**kw)
+                  yield from r.get('ResultsByTime', [])
+                  tok = r.get('NextPageToken')
+                  if not tok:
+                      return
+                  kw['NextPageToken'] = tok
+
+          def fetch_costs():
+              """Return (day, service, account, region, unblended, amortized) rows.
+
+              Covers the last DAYS full UTC days (at least one). Rows where both
+              amounts are zero are dropped.
+              """
+              ce = boto3.client('ce', region_name='us-east-1')
+              # End is exclusive and must be later than Start, hence max(DAYS, 1).
+              end = datetime.now(timezone.utc).date()
+              period = {'Start': str(end - timedelta(days=max(DAYS, 1))), 'End': str(end)}
+
+              # GroupBy accepts at most two keys, so the account dimension is covered
+              # by discovering linked accounts first, then one filtered query each.
+              accounts = set()
+              for res in _pages(
+                  ce, TimePeriod=period, Granularity='DAILY',
+                  Metrics=['UnblendedCost'],
+                  GroupBy=[{'Type': 'DIMENSION', 'Key': 'LINKED_ACCOUNT'}],
+              ):
+                  accounts.update(g['Keys'][0] for g in res.get('Groups', []))
+
+              rows = []
+              for acct in sorted(accounts):
+                  for res in _pages(
+                      ce, TimePeriod=period, Granularity='DAILY',
+                      Metrics=['UnblendedCost', 'AmortizedCost'],
+                      Filter={'Dimensions': {'Key': 'LINKED_ACCOUNT', 'Values': [acct]}},
+                      GroupBy=[
+                          {'Type': 'DIMENSION', 'Key': 'SERVICE'},
+                          {'Type': 'DIMENSION', 'Key': 'REGION'},
+                      ],
+                  ):
+                      day = res['TimePeriod']['Start']
+                      for g in res.get('Groups', []):
+                          svc, region = g['Keys']
+                          u = float(g['Metrics']['UnblendedCost']['Amount'])
+                          am = float(g['Metrics']['AmortizedCost']['Amount'])
+                          if u or am:
+                              rows.append((day, svc, acct, region, u, am))
+              return rows
+
+          def send_metrics(rows):
+              """POST rows to <OTLP_ENDPOINT>/v1/metrics as OTLP/JSON gauge points.
+
+              Sends nothing when there are no non-zero amounts. Raises
+              urllib.error.HTTPError / URLError when the export fails.
+              """
+              u_dps, am_dps = [], []
+              for day, svc, acct, region, u, am in rows:
+                  ns = _to_ns(day)
+                  attrs = [
+                      {'key': 'aws.service', 'value': {'stringValue': svc}},
+                      {'key': 'aws.account.id', 'value': {'stringValue': acct}},
+                      {'key': 'aws.region', 'value': {'stringValue': region}},
+                  ]
+                  dp = {'attributes': attrs, 'timeUnixNano': ns}
+                  if u:
+                      u_dps.append({**dp, 'asDouble': u})
+                  if am:
+                      am_dps.append({**dp, 'asDouble': am})
+
+              metrics = []
+              if u_dps:
+                  metrics.append({'name': 'aws.cost.unblended', 'unit': 'USD',
+                                  'description': 'Daily unblended AWS cost',
+                                  'gauge': {'dataPoints': u_dps}})
+              if am_dps:
+                  metrics.append({'name': 'aws.cost.amortized', 'unit': 'USD',
+                                  'description': 'Daily amortized AWS cost (RI/SP effective rates)',
+                                  'gauge': {'dataPoints': am_dps}})
+              if not metrics:
+                  return
+
+              payload = json.dumps({'resourceMetrics': [{'resource': {'attributes': [
+                  {'key': 'service.name', 'value': {'stringValue': SVC}},
+                  {'key': 'cloud.provider', 'value': {'stringValue': 'aws'}},
+              ]}, 'scopeMetrics': [{'scope': {'name': 'aws.cost_explorer'},
+                                    'metrics': metrics}]}]}).encode()
+
+              req = urllib.request.Request(
+                  f"{OTLP.rstrip('/')}/v1/metrics",
+                  data=payload,
+                  headers={**_parse_headers(), 'Content-Type': 'application/json'},
+                  method='POST',
+              )
+              # Close the response so warm Lambda containers do not leak sockets.
+              with urllib.request.urlopen(req, timeout=30) as resp:
+                  resp.read()
+
+          def handler(event, context):
+              """Lambda entry point: fetch costs, export them, report the row count."""
+              rows = fetch_costs()
+              send_metrics(rows)
+              return {'statusCode': 200, 'exported': len(rows)}
+
+  ScheduleRule:
+    Type: AWS::Events::Rule
+    Properties:
+      Name: !Sub "${ServiceName}-schedule"
+      ScheduleExpression: !Ref Schedule
+      State: ENABLED
+      Targets:
+        - Id: CostReporterTarget
+          Arn: !GetAtt Function.Arn
+
+  LambdaInvokePermission:
+    Type: AWS::Lambda::Permission
+    Properties:
+      FunctionName: !Ref Function
+      Action: lambda:InvokeFunction
+      Principal: events.amazonaws.com
+      SourceArn: !GetAtt ScheduleRule.Arn
+
+Outputs:
+  FunctionArn:
+    Description: Lambda function ARN
+    Value: !GetAtt Function.Arn
+
+  ScheduleArn:
+    Description: EventBridge schedule rule ARN
+    Value: !GetAtt ScheduleRule.Arn
+
+  TestCommand:
+    Description: Run this to test immediately after deploy
+    Value: !Sub "aws lambda invoke --function-name ${ServiceName} --region ${AWS::Region} /tmp/out.json && cat /tmp/out.json"
diff --git a/python/aws-cost-explorer/deploy.sh b/python/aws-cost-explorer/deploy.sh
new file mode 100755
index 0000000..8db7c69
--- /dev/null
+++ b/python/aws-cost-explorer/deploy.sh
@@ -0,0 +1,147 @@
+#!/usr/bin/env bash
+# Deploys the AWS Cost Explorer collector as a Lambda function with a daily
+# EventBridge schedule. Requires AWS CLI configured with sufficient permissions.
+# +# Usage: +# OTLP_HEADERS="Authorization=Basic " ./deploy.sh +# +# Optional overrides: +# FUNCTION_NAME=aws-cost-reporter (default) +# AWS_REGION=us-east-1 (default) +# DAYS_BACK=30 (default) +# SCHEDULE=rate(1 day) (default) +# OTEL_SERVICE_NAME=aws-cost-reporter + +set -euo pipefail + +FUNCTION_NAME="${FUNCTION_NAME:-aws-cost-reporter}" +AWS_REGION="${AWS_REGION:-us-east-1}" +DAYS_BACK="${DAYS_BACK:-30}" +SCHEDULE="${SCHEDULE:-rate(1 day)}" +OTEL_SERVICE_NAME="${OTEL_SERVICE_NAME:-aws-cost-reporter}" +OTLP_ENDPOINT="${OTLP_ENDPOINT:-https://otlp.last9.io}" +ROLE_NAME="${FUNCTION_NAME}-role" + +: "${OTLP_HEADERS:?OTLP_HEADERS is required. Set it to: Authorization=Basic }" + +ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) +ROLE_ARN="arn:aws:iam::${ACCOUNT_ID}:role/${ROLE_NAME}" + +echo "==> Deploying ${FUNCTION_NAME} to ${AWS_REGION}" + +# ── 1. IAM role ──────────────────────────────────────────────────────────────── + +if ! aws iam get-role --role-name "${ROLE_NAME}" &>/dev/null; then + echo "--> Creating IAM role ${ROLE_NAME}" + aws iam create-role \ + --role-name "${ROLE_NAME}" \ + --assume-role-policy-document '{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": {"Service": "lambda.amazonaws.com"}, + "Action": "sts:AssumeRole" + }] + }' >/dev/null + + aws iam attach-role-policy \ + --role-name "${ROLE_NAME}" \ + --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + + aws iam put-role-policy \ + --role-name "${ROLE_NAME}" \ + --policy-name cost-explorer-read \ + --policy-document '{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": "ce:GetCostAndUsage", + "Resource": "*" + }] + }' + + echo "--> Waiting for IAM role to propagate…" + sleep 10 +else + echo "--> IAM role ${ROLE_NAME} already exists" +fi + +# ── 2. 
# ── 2. Package Lambda ──────────────────────────────────────────────────────────

echo "--> Packaging Lambda"
BUILD_DIR=$(mktemp -d)
ZIP_PATH=/tmp/aws-cost-reporter.zip

# `zip -r` onto an existing archive only adds/replaces entries, so files removed
# since the previous deploy would otherwise linger in the package.
rm -f "${ZIP_PATH}"

pip install --quiet -r requirements.txt -t "${BUILD_DIR}"
cp main.py "${BUILD_DIR}/"
(cd "${BUILD_DIR}" && zip -qr "${ZIP_PATH}" .)
rm -rf "${BUILD_DIR}"

# ── 3. Deploy Lambda ───────────────────────────────────────────────────────────

ENV_VARS="Variables={OTLP_ENDPOINT=${OTLP_ENDPOINT},OTLP_HEADERS=${OTLP_HEADERS},OTEL_SERVICE_NAME=${OTEL_SERVICE_NAME},DAYS_BACK=${DAYS_BACK}}"

if aws lambda get-function --function-name "${FUNCTION_NAME}" --region "${AWS_REGION}" &>/dev/null; then
  echo "--> Updating Lambda function"
  aws lambda update-function-code \
    --function-name "${FUNCTION_NAME}" \
    --zip-file "fileb://${ZIP_PATH}" \
    --region "${AWS_REGION}" >/dev/null

  # The code update is applied asynchronously; a configuration update issued
  # while it is still in progress is rejected, so block until it has finished.
  aws lambda wait function-updated \
    --function-name "${FUNCTION_NAME}" \
    --region "${AWS_REGION}"

  aws lambda update-function-configuration \
    --function-name "${FUNCTION_NAME}" \
    --environment "${ENV_VARS}" \
    --region "${AWS_REGION}" >/dev/null
else
  echo "--> Creating Lambda function"
  aws lambda create-function \
    --function-name "${FUNCTION_NAME}" \
    --runtime python3.13 \
    --role "${ROLE_ARN}" \
    --handler main.lambda_handler \
    --zip-file "fileb://${ZIP_PATH}" \
    --timeout 300 \
    --memory-size 256 \
    --environment "${ENV_VARS}" \
    --region "${AWS_REGION}" >/dev/null
fi

FUNCTION_ARN=$(aws lambda get-function \
  --function-name "${FUNCTION_NAME}" \
  --region "${AWS_REGION}" \
  --query Configuration.FunctionArn \
  --output text)
EventBridge schedule ──────────────────────────────────────────────────── + +RULE_NAME="${FUNCTION_NAME}-schedule" + +echo "--> Creating EventBridge rule: ${SCHEDULE}" +RULE_ARN=$(aws events put-rule \ + --name "${RULE_NAME}" \ + --schedule-expression "${SCHEDULE}" \ + --state ENABLED \ + --region "${AWS_REGION}" \ + --query RuleArn \ + --output text) + +aws lambda add-permission \ + --function-name "${FUNCTION_NAME}" \ + --statement-id "${RULE_NAME}" \ + --action lambda:InvokeFunction \ + --principal events.amazonaws.com \ + --source-arn "${RULE_ARN}" \ + --region "${AWS_REGION}" 2>/dev/null || true + +aws events put-targets \ + --rule "${RULE_NAME}" \ + --targets "Id=1,Arn=${FUNCTION_ARN}" \ + --region "${AWS_REGION}" >/dev/null + +# ── Done ─────────────────────────────────────────────────────────────────────── + +echo "" +echo "✓ Deployed ${FUNCTION_NAME}" +echo " Function : ${FUNCTION_ARN}" +echo " Schedule : ${SCHEDULE}" +echo " Next run : $(aws events describe-rule --name "${RULE_NAME}" --region "${AWS_REGION}" --query ScheduleExpression --output text)" +echo "" +echo "Test now:" +echo " aws lambda invoke --function-name ${FUNCTION_NAME} --region ${AWS_REGION} /tmp/out.json && cat /tmp/out.json" diff --git a/python/aws-cost-explorer/docker-compose.yaml b/python/aws-cost-explorer/docker-compose.yaml new file mode 100644 index 0000000..b2896a8 --- /dev/null +++ b/python/aws-cost-explorer/docker-compose.yaml @@ -0,0 +1,6 @@ +services: + aws-cost-explorer-collector: + build: . + env_file: + - .env + restart: unless-stopped diff --git a/python/aws-cost-explorer/main.py b/python/aws-cost-explorer/main.py new file mode 100644 index 0000000..747ba3f --- /dev/null +++ b/python/aws-cost-explorer/main.py @@ -0,0 +1,224 @@ +""" +AWS Cost Explorer → OpenTelemetry Pipeline + +Polls the AWS Cost Explorer API and exports cost metrics to Last9. +No CUR setup or S3 bucket required — data is available within minutes. 
def _iter_cost_pages(ce: object, request: dict):
    """Yield every GetCostAndUsage response page for *request*.

    Follows ``NextPageToken`` until the service stops returning one.
    """
    params = dict(request)  # copy so the caller's request template is not mutated
    while True:
        resp = ce.get_cost_and_usage(**params)
        yield resp
        token = resp.get("NextPageToken")
        if not token:
            return
        params["NextPageToken"] = token


def fetch_costs(ce: object) -> list[dict]:
    """
    Fetch daily costs broken down by service, linked account, and region.

    GetCostAndUsage accepts at most two ``GroupBy`` definitions, so the
    three-way breakdown is assembled in two passes:

      1. one query grouped by LINKED_ACCOUNT to discover the accounts that
         have cost in the window;
      2. one query per account, filtered to that account and grouped by
         SERVICE and REGION.

    NOTE(review): Cost Explorer API requests are billed per call, so this
    issues (1 + number of accounts) paginated queries per run — confirm that
    is acceptable for large organizations.

    Args:
        ce: A boto3 Cost Explorer client (anything exposing
            ``get_cost_and_usage(**kwargs)``).

    Returns:
        Flat list of
        ``{date, service, account_id, region, unblended, amortized}`` dicts.
        Groups whose unblended and amortized amounts are both zero are skipped.
    """
    end = date.today()
    start = end - timedelta(days=DAYS_BACK)
    base_request = {
        "TimePeriod": {"Start": str(start), "End": str(end)},
        "Granularity": "DAILY",
        "Metrics": ["UnblendedCost", "AmortizedCost"],
    }

    # Pass 1: which linked accounts appear in the window?
    account_ids: set[str] = set()
    discovery = {
        **base_request,
        "GroupBy": [{"Type": "DIMENSION", "Key": "LINKED_ACCOUNT"}],
    }
    for page in _iter_cost_pages(ce, discovery):
        for result in page.get("ResultsByTime", []):
            for group in result.get("Groups", []):
                account_ids.add(group["Keys"][0])

    # Pass 2: per-account SERVICE x REGION breakdown (two group-bys only).
    rows: list[dict] = []
    for account_id in sorted(account_ids):
        per_account = {
            **base_request,
            "Filter": {
                "Dimensions": {"Key": "LINKED_ACCOUNT", "Values": [account_id]}
            },
            "GroupBy": [
                {"Type": "DIMENSION", "Key": "SERVICE"},
                {"Type": "DIMENSION", "Key": "REGION"},
            ],
        }
        for page in _iter_cost_pages(ce, per_account):
            for result in page.get("ResultsByTime", []):
                day = result["TimePeriod"]["Start"]
                for group in result.get("Groups", []):
                    service, region = group["Keys"]
                    unblended = float(group["Metrics"]["UnblendedCost"]["Amount"])
                    amortized = float(group["Metrics"]["AmortizedCost"]["Amount"])
                    if unblended == 0.0 and amortized == 0.0:
                        continue
                    rows.append({
                        "date": day,
                        "service": service,
                        "account_id": account_id,
                        "region": region,
                        "unblended": unblended,
                        "amortized": amortized,
                    })

    log.info("Fetched %d cost rows (%s → %s)", len(rows), start, end)
    return rows
def main() -> None:
    """Run the collector as a long-lived polling process (Docker mode).

    Logs the effective configuration, installs SIGTERM/SIGINT handlers that
    exit cleanly, then calls ``poll`` every ``POLL_INTERVAL_SECONDS`` seconds.

    A failing poll is logged with its traceback and retried after the regular
    interval instead of terminating the process. Letting the exception escape
    would hand control to the container restart policy, which re-runs the poll
    immediately and repeatedly while the fault persists.
    """
    log.info("AWS Cost Explorer collector starting")
    log.info("Days back : %d", DAYS_BACK)
    log.info("Poll interval : %ds", POLL_INTERVAL_SECONDS)
    log.info("OTLP endpoint : %s", OTLP_ENDPOINT)

    ce = boto3.client("ce", region_name="us-east-1")  # Cost Explorer is a global service

    def _shutdown(sig, _frame):
        log.info("Shutting down…")
        raise SystemExit(0)

    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)

    while True:
        try:
            poll(ce)
        except Exception:  # noqa: BLE001 - top-level boundary of the poll loop
            # SystemExit raised by _shutdown is not an Exception subclass, so
            # signal-driven shutdown still propagates out of the loop.
            log.exception("Poll failed; will retry after the regular interval")
        log.info("Sleeping %ds…", POLL_INTERVAL_SECONDS)
        time.sleep(POLL_INTERVAL_SECONDS)
+# Must match tags activated in AWS Billing → Cost allocation tags. +# Leave empty to omit tags (lower cardinality). Separate with commas. +# Example: COST_ALLOCATION_TAGS=team,environment,project +COST_ALLOCATION_TAGS= + +# Last9 OTLP endpoint (get from Last9 dashboard → Integrations → OTLP) +OTLP_ENDPOINT=https://otlp.last9.io + +# Last9 auth header: Authorization=Basic +# Get from Last9 dashboard → Integrations → Copy OTLP headers +OTLP_HEADERS=Authorization=Basic + +# Service name shown in Last9 +OTEL_SERVICE_NAME=aws-cost-reporter diff --git a/python/aws-cur/.gitignore b/python/aws-cur/.gitignore new file mode 100644 index 0000000..1126860 --- /dev/null +++ b/python/aws-cur/.gitignore @@ -0,0 +1,26 @@ +# Environment/secrets +.env +.env.local +.env.*.local + +# Dependencies +/.venv/ +__pycache__/ +*.pyc +*.pyo + +# IDE +.idea/ +.vscode/ +*.swp + +# OS +.DS_Store +Thumbs.db + +# Logs +*.log + +# Build artifacts +/dist/ +/build/ diff --git a/python/aws-cur/Dockerfile b/python/aws-cur/Dockerfile new file mode 100644 index 0000000..3ff08d6 --- /dev/null +++ b/python/aws-cur/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.13-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY main.py . + +CMD ["python", "main.py"] diff --git a/python/aws-cur/README.md b/python/aws-cur/README.md new file mode 100644 index 0000000..38b44c5 --- /dev/null +++ b/python/aws-cur/README.md @@ -0,0 +1,96 @@ +# AWS Cost and Usage Report → Last9 + +Collects AWS billing data from Cost and Usage Reports (CUR) and sends it to Last9 as OpenTelemetry metrics. + +## Prerequisites + +- AWS account with billing access +- S3 bucket for CUR delivery +- [Last9 OTLP credentials](https://app.last9.io/integrations) + +## Setup + +### 1. 
Enable CUR in AWS + +Go to [AWS Billing → Cost & Usage Reports](https://us-east-1.console.aws.amazon.com/billing/home#/reports) → **Create report**: + +| Setting | Value | +|---|---| +| Time granularity | Daily | +| File format | **Parquet** | +| Report content | Include resource IDs | + +Note the **bucket name**, **S3 prefix**, and **report name** — you'll need them below. + +> CUR files appear in S3 within 24 hours of first setup. + +### 2. Grant S3 read access + +Attach this policy to the IAM user or role running the collector: + +```json +{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": ["s3:GetObject", "s3:ListBucket"], + "Resource": [ + "arn:aws:s3:::", + "arn:aws:s3:::/*" + ] + }] +} +``` + +### 3. Configure and run + +```bash +cp .env.example .env +# Fill in CUR_S3_BUCKET, CUR_REPORT_NAME, AWS credentials, and OTLP_HEADERS +docker compose up +``` + +## Configuration + +| Variable | Required | Default | Description | +|---|---|---|---| +| `CUR_S3_BUCKET` | Yes | — | S3 bucket name | +| `CUR_REPORT_NAME` | Yes | — | Report name from AWS Billing | +| `CUR_S3_PREFIX` | No | `""` | S3 path prefix before the report name | +| `AWS_ACCESS_KEY_ID` | No* | — | AWS access key | +| `AWS_SECRET_ACCESS_KEY` | No* | — | AWS secret key | +| `AWS_DEFAULT_REGION` | No | `us-east-1` | AWS region | +| `MONTHS_BACK` | No | `3` | Billing periods to process per run | +| `COST_ALLOCATION_TAGS` | No | `""` | Tag keys to add as `aws.tag.*` dimensions (e.g. `team,environment`) | +| `OTLP_HEADERS` | Yes | — | Last9 auth header (e.g. `Authorization=Basic `) | +| `OTEL_SERVICE_NAME` | No | `aws-cost-reporter` | Service name in Last9 | + +\* Skip on EC2/ECS/Lambda — the attached IAM role is used automatically. 
+ +## Metrics + +| Metric | Unit | Dimensions | +|---|---|---| +| `aws.cost.unblended` | USD | `aws.service`, `aws.account.id`, `aws.region`, `aws.usage.type`, `aws.tag.*` | +| `aws.cost.amortized` | USD | same — RI and Savings Plan effective rates applied | +| `aws.usage.quantity` | 1 | `aws.service`, `aws.account.id`, `aws.region`, `aws.usage.type`, `aws.tag.*` | + +`aws.tag.*` dimensions appear only when `COST_ALLOCATION_TAGS` is set and those tags are activated in [AWS Billing → Cost allocation tags](https://us-east-1.console.aws.amazon.com/billing/home#/tags). + +## Verification + +Confirm data is flowing: + +``` +Exported N unblended + N amortized + N usage data points to Last9 +``` + +To test without a real AWS account: + +```bash +OTLP_HEADERS="Authorization=Basic " \ +COST_ALLOCATION_TAGS=team,environment \ +python test_local.py +``` + +Then query `aws.cost.unblended` in [Last9 Metrics](https://app.last9.io/metrics) and filter by `aws.service`. diff --git a/python/aws-cur/docker-compose.yaml b/python/aws-cur/docker-compose.yaml new file mode 100644 index 0000000..571762d --- /dev/null +++ b/python/aws-cur/docker-compose.yaml @@ -0,0 +1,6 @@ +services: + aws-cur-collector: + build: . + env_file: + - .env + restart: unless-stopped diff --git a/python/aws-cur/main.py b/python/aws-cur/main.py new file mode 100644 index 0000000..1354b26 --- /dev/null +++ b/python/aws-cur/main.py @@ -0,0 +1,421 @@ +""" +AWS Cost and Usage Report (CUR) → OpenTelemetry Pipeline + +Reads CUR parquet files from S3, aggregates daily costs by service/account/region, +and exports as OTLP gauge metrics to Last9. + +Metrics exported: + aws.cost.unblended (USD) — daily unblended cost per service/account/region + aws.cost.amortized (USD) — daily amortized cost (includes RI/SP effective cost) + aws.usage.quantity — daily usage amount per service/account/region/usage_type + +Cost allocation tags from CUR (resource_tags_user_*) are forwarded as aws.tag.* +metric attributes. 
Configure which tags to include via COST_ALLOCATION_TAGS. + +CUR setup required: + 1. Enable Cost and Usage Reports in AWS Billing → Cost & Usage Reports + 2. Format: Parquet, Time granularity: Daily, S3 destination configured + 3. Enable cost allocation tags you want in AWS Billing → Cost allocation tags + 4. Set CUR_S3_BUCKET, CUR_S3_PREFIX, CUR_REPORT_NAME env vars + +Historical timestamps are written directly via OTLP/HTTP JSON so each data +point carries the actual billing date, not the current time. +""" + +from __future__ import annotations + +import io +import json +import logging +import os +import signal +import time +from datetime import datetime, timezone +from typing import Any + +import boto3 +import pandas as pd +import pyarrow.parquet as pq +import requests + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +log = logging.getLogger(__name__) + +# ── Configuration ────────────────────────────────────────────────────────────── + +CUR_S3_BUCKET = os.environ["CUR_S3_BUCKET"] +CUR_S3_PREFIX = os.environ.get("CUR_S3_PREFIX", "").strip("/") +CUR_REPORT_NAME = os.environ["CUR_REPORT_NAME"] +AWS_REGION = os.environ.get("AWS_DEFAULT_REGION", "us-east-1") + +POLL_INTERVAL_SECONDS = int(os.environ.get("POLL_INTERVAL_SECONDS", "3600")) +MONTHS_BACK = int(os.environ.get("MONTHS_BACK", "3")) + +OTLP_ENDPOINT = os.environ.get("OTLP_ENDPOINT", "https://otlp.last9.io") +OTLP_HEADERS_RAW = os.environ.get("OTLP_HEADERS", "") +OTEL_SERVICE_NAME = os.environ.get("OTEL_SERVICE_NAME", "aws-cost-reporter") + +# Line item types to include — excludes Tax, Credit, Refund, etc. +INCLUDE_LINE_ITEM_TYPES = frozenset( + os.environ.get( + "INCLUDE_LINE_ITEM_TYPES", + "Usage,SavingsPlanCoveredUsage,DiscountedUsage,SavingsPlanNegation,BundledDiscount", + ).split(",") +) + +# Cost allocation tag keys to include as metric dimensions (aws.tag.). +# Must match tags enabled in AWS Billing → Cost allocation tags. 
def _is_billing_period_prefix(prefix: str) -> bool:
    """Return True when the last path segment of *prefix* is ``YYYYMMDD-YYYYMMDD``."""
    folder = prefix.rstrip("/").rsplit("/", 1)[-1]
    start, sep, end = folder.partition("-")
    return (
        bool(sep)
        and len(start) == 8
        and len(end) == 8
        and (start + end).isascii()
        and (start + end).isdigit()
    )


def list_billing_periods(s3: Any) -> list[str]:
    """Return S3 key prefixes for billing period folders, newest first.

    Only folders named ``YYYYMMDD-YYYYMMDD`` are returned. Other folders under
    the report prefix are skipped: a name starting with a letter sorts ahead
    of every date-range folder in reverse order and would otherwise use up one
    of the ``MONTHS_BACK`` slots without ever yielding a manifest.
    NOTE(review): Parquet/Athena-style exports are understood to place a
    ``<report-name>/`` data folder alongside the date-range folders — confirm
    against the actual bucket layout.

    Args:
        s3: A boto3 S3 client (needs ``get_paginator("list_objects_v2")``).

    Returns:
        Billing-period key prefixes (each ending in ``/``), sorted descending
        so the most recent period comes first.
    """
    prefix = _report_prefix()
    paginator = s3.get_paginator("list_objects_v2")
    folders: set[str] = set()
    for page in paginator.paginate(Bucket=CUR_S3_BUCKET, Prefix=prefix, Delimiter="/"):
        for cp in page.get("CommonPrefixes", []):
            if _is_billing_period_prefix(cp["Prefix"]):
                folders.add(cp["Prefix"])
    return sorted(folders, reverse=True)
# CUR parquet may use either "line_item/ProductCode" or "line_item_product_code"
# as column names depending on the CUR export version. Normalize to snake_case.
_COL_ALIASES: dict[str, str] = {
    "line_item/productcode": "line_item_product_code",
    "line_item/usagestartdate": "line_item_usage_start_date",
    "line_item/usageaccountid": "line_item_usage_account_id",
    "line_item/lineitemtype": "line_item_line_item_type",
    "line_item/unblendedcost": "line_item_unblended_cost",
    "line_item/usageamount": "line_item_usage_amount",
    "line_item/usagetype": "line_item_usage_type",
    "product/region": "product_region",
}


def _squash(name: str) -> str:
    """Lower-case *name* and drop ``_`` and ``/`` so spelling variants compare equal."""
    return name.lower().replace("_", "").replace("/", "")


# Squashed alias -> canonical column name, built once at import time instead of
# re-squashing every alias key for every column of every DataFrame.
_ALIAS_LOOKUP: dict[str, str] = {_squash(src): dst for src, dst in _COL_ALIASES.items()}


def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with CUR column names in canonical snake_case.

    A column whose squashed name matches a known alias gets that alias's
    canonical name; any other column is lower-cased with ``/`` replaced by
    ``_``. The input frame is never modified, and a new frame is returned even
    when no column needs renaming, so callers may safely add columns to it.
    """
    rename: dict[str, str] = {}
    for col in df.columns:
        target = _ALIAS_LOOKUP.get(_squash(col), col.lower().replace("/", "_"))
        if target != col:
            rename[col] = target
    return df.rename(columns=rename)
# Per-type column that holds the true effective cost for amortized calculation.
# SavingsPlanCoveredUsage → SP effective cost; DiscountedUsage → RI effective cost.
_AMORTIZED_COL_BY_TYPE: dict[str, str] = {
    "SavingsPlanCoveredUsage": "savings_plan_savings_plan_effective_cost",
    "DiscountedUsage": "reservation_effective_cost",
}


def _compute_amortized(df: pd.DataFrame) -> pd.Series:
    """Return per-row amortized cost using SP/RI effective cost where applicable.

    Starts from ``line_item_unblended_cost`` (coerced to float, unparseable
    values become 0.0) and, for each line item type in
    ``_AMORTIZED_COL_BY_TYPE`` whose effective-cost column is present,
    substitutes that column's value.

    ``SavingsPlanNegation`` rows carry a negative unblended cost that offsets
    the covered usage. Once covered usage has been replaced by its effective
    cost, that offset must not be counted again, so those rows contribute 0.
    When the Savings Plan effective-cost column is absent, no substitution
    happens and the negation rows are left untouched.

    Args:
        df: Frame with ``line_item_unblended_cost`` and
            ``line_item_line_item_type`` columns; effective-cost columns are
            optional.

    Returns:
        A new float Series aligned to ``df.index``; *df* is not modified.
    """
    # Coerce up front so assigning float effective costs never hits an
    # integer- or object-typed Series.
    result = (
        pd.to_numeric(df["line_item_unblended_cost"], errors="coerce")
        .fillna(0.0)
        .astype("float64")
    )
    item_type = df["line_item_line_item_type"]

    for kind, col in _AMORTIZED_COL_BY_TYPE.items():
        if col in df.columns:
            mask = item_type == kind
            result.loc[mask] = pd.to_numeric(df.loc[mask, col], errors="coerce").fillna(0.0)

    if _AMORTIZED_COL_BY_TYPE["SavingsPlanCoveredUsage"] in df.columns:
        result.loc[item_type == "SavingsPlanNegation"] = 0.0

    return result
# OTLP attribute key -> aggregated-row column it is read from.
_BASE_ATTR_COLUMNS: tuple[tuple[str, str], ...] = (
    ("aws.service", "service"),
    ("aws.account.id", "account_id"),
    ("aws.region", "region"),
    ("aws.usage.type", "usage_type"),
)


def _has_value(val: object) -> bool:
    """Return True unless *val* is None, NaN/NaT/pd.NA, or an empty string."""
    if val is None:
        return False
    if isinstance(val, str):
        return val != ""
    return not pd.isna(val)


def _row_attrs(row: dict) -> list[dict]:
    """Build the OTLP attribute list for one aggregated cost row.

    Emits ``aws.service``, ``aws.account.id``, ``aws.region`` and
    ``aws.usage.type`` from the matching row columns, followed by one
    ``aws.tag.<name>`` attribute for every ``aws_tag_<name>`` column. Missing
    values (None, NaN, pd.NA, empty string) are omitted.

    Missing values are detected by a null check rather than by comparing
    ``str(val)`` against ``"nan"``/``"None"``, so a tag whose real value is
    the text "None" or "nan" is still exported.

    Args:
        row: One aggregated row as a plain dict (column name -> value).

    Returns:
        List of ``{"key": ..., "value": {"stringValue": ...}}`` dicts.
    """
    attrs = [
        {"key": key, "value": {"stringValue": str(row[col])}}
        for key, col in _BASE_ATTR_COLUMNS
        if _has_value(row.get(col, ""))
    ]
    # Forward cost allocation tags as aws.tag.<name>.
    for col, val in row.items():
        if col.startswith("aws_tag_") and _has_value(val):
            attr_key = "aws.tag." + col[len("aws_tag_"):]
            attrs.append({"key": attr_key, "value": {"stringValue": str(val)}})
    return attrs
{**_parse_headers(OTLP_HEADERS_RAW), "Content-Type": "application/json"} + resp = requests.post( + f"{OTLP_ENDPOINT.rstrip('/')}/v1/metrics", + json=payload, + headers=hdrs, + timeout=60, + ) + if resp.status_code not in (200, 204): + log.warning("OTLP export failed: HTTP %s — %s", resp.status_code, resp.text[:200]) + else: + log.info("Exported %d unblended + %d amortized + %d usage data points to Last9", + len(cost_dps), len(amortized_dps), len(usage_dps)) + + +# ── Poll loop ────────────────────────────────────────────────────────────────── + + +def poll(s3: Any) -> None: + periods = list_billing_periods(s3) + if not periods: + log.warning("No billing period folders found at s3://%s/%s", CUR_S3_BUCKET, _report_prefix()) + return + + for period_prefix in periods[:MONTHS_BACK]: + try: + manifest = load_manifest(s3, period_prefix) + billing_start = manifest.get("billingPeriod", {}).get("start", period_prefix) + log.info("Billing period: %s (%d file(s))", + billing_start, len(manifest.get("reportKeys", []))) + + df = read_cur_parquet(s3, manifest) + if df.empty: + log.info(" No data in period %s", period_prefix) + continue + + log.info(" Loaded %d rows, aggregating…", len(df)) + agg = aggregate_costs(df) + log.info(" Aggregated to %d rows", len(agg)) + send_otlp_metrics(agg) + + except s3.exceptions.NoSuchKey: + log.warning("Manifest not found for %s — CUR may still be generating", period_prefix) + except Exception as exc: # noqa: BLE001 + log.error("Error processing %s: %s", period_prefix, exc) + + +def main() -> None: + log.info("AWS CUR collector starting") + log.info("Bucket : %s", CUR_S3_BUCKET) + log.info("Prefix : %s", CUR_S3_PREFIX or "(root)") + log.info("Report name : %s", CUR_REPORT_NAME) + log.info("Months back : %d", MONTHS_BACK) + log.info("Poll interval : %ds", POLL_INTERVAL_SECONDS) + log.info("OTLP endpoint : %s", OTLP_ENDPOINT) + + s3 = boto3.client("s3", region_name=AWS_REGION) + + def _shutdown(sig, _frame): + log.info("Shutting down…") + raise 
def main() -> None:
    """Collector entry point: log the configuration, then poll forever.

    Logs the effective settings, creates the S3 client, installs SIGTERM and
    SIGINT handlers that exit the process with status 0, and then alternates
    between ``poll`` and sleeping ``POLL_INTERVAL_SECONDS``.
    """
    log.info("AWS CUR collector starting")

    # (format, value) pairs, logged in this order.
    banner = (
        ("Bucket : %s", CUR_S3_BUCKET),
        ("Prefix : %s", CUR_S3_PREFIX or "(root)"),
        ("Report name : %s", CUR_REPORT_NAME),
        ("Months back : %d", MONTHS_BACK),
        ("Poll interval : %ds", POLL_INTERVAL_SECONDS),
        ("OTLP endpoint : %s", OTLP_ENDPOINT),
    )
    for fmt, value in banner:
        log.info(fmt, value)

    s3 = boto3.client("s3", region_name=AWS_REGION)

    def _handle_stop(_signum, _frame):
        # Raising SystemExit unwinds out of poll() or sleep() and ends the loop below.
        log.info("Shutting down…")
        raise SystemExit(0)

    for stop_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(stop_signal, _handle_stop)

    while True:
        poll(s3)
        log.info("Sleeping %ds…", POLL_INTERVAL_SECONDS)
        time.sleep(POLL_INTERVAL_SECONDS)
usage_date.strftime("%Y-%m-%d") + + for service, region, usage_type in services: + base = { + "line_item_usage_start_date": date_str, + "line_item_product_code": service, + "line_item_usage_account_id": "123456789012", + "product_region": region, + "line_item_usage_type": usage_type, + "resource_tags_user_team": "platform", + "resource_tags_user_environment": "production", + } + + # Regular on-demand usage + rows.append({**base, + "line_item_line_item_type": "Usage", + "line_item_unblended_cost": 12.50, + "line_item_usage_amount": 24.0, + "savings_plan_savings_plan_effective_cost": 0.0, + "reservation_effective_cost": 0.0, + }) + + # Savings Plan covered usage (EC2) — amortized should differ from unblended + rows.append({ + "line_item_usage_start_date": date_str, + "line_item_product_code": "AmazonEC2", + "line_item_usage_account_id": "123456789012", + "product_region": "us-east-1", + "line_item_usage_type": "BoxUsage:m5.xlarge", + "line_item_line_item_type": "SavingsPlanCoveredUsage", + "line_item_unblended_cost": 4.608, # on-demand rate + "line_item_usage_amount": 24.0, + "savings_plan_savings_plan_effective_cost": 3.312, # SP effective (cheaper) + "reservation_effective_cost": 0.0, + "resource_tags_user_team": "backend", + "resource_tags_user_environment": "production", + }) + + # Reserved Instance usage + rows.append({ + "line_item_usage_start_date": date_str, + "line_item_product_code": "AmazonRDS", + "line_item_usage_account_id": "123456789012", + "product_region": "us-east-1", + "line_item_usage_type": "InstanceUsage:db.r5.large", + "line_item_line_item_type": "DiscountedUsage", + "line_item_unblended_cost": 0.0, # RI shows $0 unblended + "line_item_usage_amount": 24.0, + "savings_plan_savings_plan_effective_cost": 0.0, + "reservation_effective_cost": 2.88, # RI amortized cost + "resource_tags_user_team": "data", + "resource_tags_user_environment": "production", + }) + + # Tax row — should be excluded by INCLUDE_LINE_ITEM_TYPES filter + rows.append({ + 
"line_item_usage_start_date": date_str, + "line_item_product_code": "AmazonEC2", + "line_item_usage_account_id": "123456789012", + "product_region": "us-east-1", + "line_item_usage_type": "Tax", + "line_item_line_item_type": "Tax", + "line_item_unblended_cost": 1.50, + "line_item_usage_amount": 0.0, + "savings_plan_savings_plan_effective_cost": 0.0, + "reservation_effective_cost": 0.0, + "resource_tags_user_team": "", + "resource_tags_user_environment": "", + }) + + return pd.DataFrame(rows) + + +def main() -> None: + df = _make_cur_df(days=7) + print(f"Generated {len(df)} synthetic CUR rows") + + agg = aggregate_costs(df) + print(f"\nAggregated to {len(agg)} rows") + print("\nSample rows:") + print(agg.to_string(max_rows=10)) + + # Verify amortized < unblended for SP/RI rows (discount is working) + sp_rows = agg[agg["usage_type"].str.contains("m5.xlarge", na=False)] + if not sp_rows.empty: + row = sp_rows.iloc[0] + assert row["amortized_cost"] < row["unblended_cost"], ( + f"SP amortized ({row['amortized_cost']}) should be < unblended ({row['unblended_cost']})" + ) + print(f"\n✓ SP amortized cost ({row['amortized_cost']:.4f}) < unblended ({row['unblended_cost']:.4f})") + + # Verify RI rows have non-zero amortized despite $0 unblended + ri_rows = agg[agg["usage_type"].str.contains("db.r5.large", na=False)] + if not ri_rows.empty: + row = ri_rows.iloc[0] + assert row["amortized_cost"] > 0, "RI amortized cost should be non-zero" + assert row["unblended_cost"] == 0.0, "RI unblended cost should be $0" + print(f"✓ RI amortized cost ({row['amortized_cost']:.4f}) > 0 despite $0 unblended") + + # Verify Tax rows were excluded + tax_rows = agg[agg["usage_type"] == "Tax"] + assert tax_rows.empty, "Tax line items should be filtered out" + print("✓ Tax line items filtered out") + + # Verify tag columns present + tag_cols = [c for c in agg.columns if c.startswith("aws_tag_")] + if tag_cols: + print(f"✓ Tag columns forwarded: {tag_cols}") + else: + print(" (No 
def main() -> None:
    """Run the local smoke test end to end.

    Generates seven days of synthetic CUR rows, aggregates them with
    ``aggregate_costs``, checks the aggregated frame (Savings Plan amortized
    cost below unblended, Reserved Instance amortized cost non-zero with $0
    unblended, Tax rows filtered out), reports any ``aws_tag_*`` columns and
    finally sends the result through ``send_otlp_metrics``.

    Raises:
        AssertionError: If one of the checks on the aggregated frame fails.
    """
    df = _make_cur_df(days=7)
    print(f"Generated {len(df)} synthetic CUR rows")

    agg = aggregate_costs(df)
    print(f"\nAggregated to {len(agg)} rows")
    print("\nSample rows:")
    print(agg.to_string(max_rows=10))

    # Verify amortized < unblended for SP/RI rows (discount is working).
    # regex=False: the needle is a literal instance type; as a regex the "."
    # would match any character.
    sp_rows = agg[agg["usage_type"].str.contains("m5.xlarge", regex=False, na=False)]
    if not sp_rows.empty:
        row = sp_rows.iloc[0]
        assert row["amortized_cost"] < row["unblended_cost"], (
            f"SP amortized ({row['amortized_cost']}) should be < unblended ({row['unblended_cost']})"
        )
        print(f"\n✓ SP amortized cost ({row['amortized_cost']:.4f}) < unblended ({row['unblended_cost']:.4f})")

    # Verify RI rows have non-zero amortized despite $0 unblended
    ri_rows = agg[agg["usage_type"].str.contains("db.r5.large", regex=False, na=False)]
    if not ri_rows.empty:
        row = ri_rows.iloc[0]
        assert row["amortized_cost"] > 0, "RI amortized cost should be non-zero"
        assert row["unblended_cost"] == 0.0, "RI unblended cost should be $0"
        print(f"✓ RI amortized cost ({row['amortized_cost']:.4f}) > 0 despite $0 unblended")

    # Verify Tax rows were excluded
    tax_rows = agg[agg["usage_type"] == "Tax"]
    assert tax_rows.empty, "Tax line items should be filtered out"
    print("✓ Tax line items filtered out")

    # Verify tag columns present
    tag_cols = [c for c in agg.columns if c.startswith("aws_tag_")]
    if tag_cols:
        print(f"✓ Tag columns forwarded: {tag_cols}")
    else:
        print(" (No COST_ALLOCATION_TAGS configured — tag columns not present)")

    print("\nSending to Last9 OTLP endpoint…")
    send_otlp_metrics(agg)
    print("Done. Query aws.cost.unblended and aws.cost.amortized in Last9.")