Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
## Unreleased

### Fixed

- **Telemetry: fire-and-forget events no longer vanish before delivery.** Three
delivery bugs found by end-to-end testing of the published 0.6.6 packages, fixed
in both SDKs: (1) a `TelemetryClient` constructed without holding a reference —
the CLI's `cli_command` pattern — was garbage-collected together with its buffered
events before the exit flush could send them (the registry is a weak set by
design; clients now hold a strong module-level reference from first buffered
event until the buffer drains); (2) `aclose()`/`close()` flushed an already-empty
buffer while the delivery POST started by `record(...)` was still in flight, so a
prompt shutdown killed it mid-air (close now awaits the in-flight flush first);
(3) events recorded *while* a flush POST was in flight stranded in the buffer with
no flush scheduled (a completed flush now chains another when the buffer is
non-empty). Net effect: `getpatter <command>` CLI usage events actually arrive, and
constructor-time events (`sdk_initialized`, `first_run`) can no longer shadow
agent-time events. `libraries/python/getpatter/telemetry/client.py`,
`libraries/typescript/src/telemetry/client.ts`.

### Added

- **Telemetry: Realtime model variant in `feature_used`.** Realtime agents now
report which Realtime model they run (`llm_model: "openai-gpt-realtime-2"`,
`"openai-gpt-realtime-mini"`, …) through the existing sanitized `llm_model`
dimension (custom/fine-tuned names still collapse to `openai-other`), so model mix
is visible for the realtime engine family the way it already was for pipeline
stacks. The per-process dedupe key includes the model, so two agents on different
Realtime models both record. `libraries/python/getpatter/client.py`,
`libraries/typescript/src/client.ts`.

## 0.6.6 (2026-06-10)

### Added
Expand Down
2 changes: 1 addition & 1 deletion docs/telemetry.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ value is replaced with `other` before sending):
| `sdk_initialized` | `carrier` (twilio/telnyx/plivo/none), `tunnel` (static/configured/none), and anonymous deploy-shape (presence-only env/file probes — **never the value of an env var**): `invoked_by_agent` (claude/cursor/copilot/…/none), `container` (bool), `serverless` (lambda/cloud_run/vercel/azure_functions/none), `cloud` (aws/gcp/azure/fly/none), `package_manager`, `days_since_install_bucket`, and `previous_sdk_version` (for the upgrade funnel) |
| `first_run` | Sent **once per install** (the run that creates the local state), to mark activation. Carries the same anonymous deploy-shape dimensions as `sdk_initialized`. Never sent if you have opted out (opting out never touches the filesystem) |
| `cli_command` | `cli_command` — which `getpatter` CLI command was invoked (`dashboard` / `eval` / `telemetry` / `other`). **Only the command name** — never arguments or flag values. The `getpatter telemetry` control command never emits telemetry itself |
| `feature_used` | `engine` (realtime/convai/pipeline), `provider` (vendor family), and for **pipeline** the composed stack: `stt_provider` / `tts_provider` / `llm_provider` (vendor) plus `stt_model` / `tts_model` / `llm_model` (e.g. `deepgram-nova-3`, `anthropic-claude-opus-4-8`). A fine-tuned, self-hosted, or custom model collapses to `{vendor}-other` — a custom model name is **never** sent |
| `feature_used` | `engine` (realtime/convai/pipeline), `provider` (vendor family), and the models in use: for **pipeline** the composed stack`stt_provider` / `tts_provider` / `llm_provider` (vendor) plus `stt_model` / `tts_model` / `llm_model` (e.g. `deepgram-nova-3`, `anthropic-claude-opus-4-8`) — and for **realtime** the Realtime model as `llm_model` (e.g. `openai-gpt-realtime-2`). A fine-tuned, self-hosted, or custom model collapses to `{vendor}-other` — a custom model name is **never** sent |
| `agent_configured` | `builtin_tool_count` (0–3), `custom_tool_count_bucket` (a range like `2_3` — **never the tool names**), `integration` (openclaw/mcp/hermes/other/none), `integration_kind` (consult/mcp/none), `mcp_server_count_bucket`, and feature-adoption flags: `noise_reduction` (near_field/far_field/none), `turn_detection` (default/custom/none), `preambles_used` (bool), `per_tool_timeouts_set` (bool), `llm_fallback_configured` (bool) |
| `call_started` | Sent when a call connects (media stream begins), to pair with `call_completed` for a connect→complete funnel: `engine`, `provider`, `carrier`, and `direction` (inbound/outbound). No metrics exist yet at connect, so no latency/cost/duration |
| `call_completed` | `outcome` (completed / error / no_answer / busy / failed), `direction` (inbound/outbound), `error_code` (a closed code like `rate_limit` / `timeout` / `connection` — **never the error message**), `engine`, `provider`, `carrier`, `latency_ms` (whole milliseconds), `duration_seconds` (whole seconds), `cost_usd` (total call cost in USD), `turn_count_bucket` — sent once per call. **No call content, no per-call identifier.** |
Expand Down
10 changes: 9 additions & 1 deletion libraries/python/getpatter/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1679,10 +1679,18 @@ def agent(
# --- Anonymous telemetry: engine family + the composed stack, deduped ---
# Deduped by the *whole* stack signature (not just the engine family) so a
# second agent with a different STT/TTS/LLM still records its composition.
from getpatter.telemetry.stack import stack_dimensions
from getpatter.telemetry.stack import model_token, stack_dimensions

_family = _telemetry_engine_family(engine, provider, stt, tts)
_stack = stack_dimensions(stt, tts, llm)
if _family == "realtime":
# Realtime has no composed stack, so the model variant is the whole
# story — resolve it the same way the engine unpacking below does
# (an explicit model= kwarg wins; otherwise the engine's model).
_rt_model = model
if _rt_model == "gpt-realtime-mini" and getattr(engine, "model", None):
_rt_model = engine.model
_stack = {**_stack, "llm_model": model_token("openai", _rt_model)}
_feature_key = (
_family + "|" + ",".join(f"{k}={v}" for k, v in sorted(_stack.items()))
)
Expand Down
29 changes: 27 additions & 2 deletions libraries/python/getpatter/telemetry/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@
_LIVE_CLIENTS: "weakref.WeakSet[TelemetryClient]" = weakref.WeakSet()
_ATEXIT_REGISTERED = False

# Strong references to clients that still hold undelivered events. The registry
# above is deliberately weak so a discarded ``Patter``'s client can be collected —
# but a client constructed fire-and-forget (the CLI pattern:
# ``TelemetryClient(...).record(...)`` with no reference held) must not take its
# buffered events to the grave before a flush delivers them. A client is held
# strongly from its first buffered event until the buffer drains, so the lifetime
# is bounded by the next flush (async task, ``aclose``, or the atexit hook).
_PENDING_FLUSH: "set[TelemetryClient]" = set()

# One-time "telemetry is on" notice, shown once per process.
_NOTICE_SHOWN = False

Expand Down Expand Up @@ -143,6 +152,7 @@ def record(self, name: str, **dimensions: Any) -> None:

try:
self._buffer.append(event) # maxlen deque drops oldest when full
_PENDING_FLUSH.add(self) # survive GC until the buffer drains
self._schedule_flush()
except Exception:
logger.debug("telemetry enqueue failed", exc_info=True)
Expand All @@ -169,11 +179,21 @@ async def aclose(self) -> None:
self._closed = True
_LIVE_CLIENTS.discard(self)
if not self._enabled or self._debug:
_PENDING_FLUSH.discard(self)
return
try:
# A flush task scheduled by ``record`` drains the buffer immediately,
# so flushing again below would see nothing — await the in-flight
# delivery first or shutdown kills its POST mid-air.
if self._flush_task is not None and not self._flush_task.done():
await asyncio.wait_for(self._flush_task, timeout=_FLUSH_TIMEOUT_S)
except Exception:
logger.debug("telemetry aclose in-flight flush failed", exc_info=True)
try:
await asyncio.wait_for(self._flush(), timeout=_FLUSH_TIMEOUT_S)
except Exception:
logger.debug("telemetry aclose flush failed", exc_info=True)
_PENDING_FLUSH.discard(self)
if self._http is not None:
try:
await self._http.aclose()
Expand All @@ -193,17 +213,22 @@ def _schedule_flush(self) -> None:
self._flush_task = loop.create_task(self._flush())
self._flush_task.add_done_callback(self._on_flush_done)

@staticmethod
def _on_flush_done(task: "asyncio.Task[None]") -> None:
def _on_flush_done(self, task: "asyncio.Task[None]") -> None:
if task.cancelled():
return
exc = task.exception()
if exc is not None:
logger.debug("telemetry flush task raised", exc_info=exc)
# Events recorded while the POST was in flight are sitting in the buffer
# with no flush scheduled (``record`` saw the live task and skipped) —
# chain another flush or they strand until aclose()/process exit.
if self._buffer and not self._closed:
self._schedule_flush()

def _drain(self) -> list[dict[str, Any]]:
events = list(self._buffer)
self._buffer.clear()
_PENDING_FLUSH.discard(self) # nothing buffered — GC may reclaim us again
return events

async def _flush(self) -> None:
Expand Down
77 changes: 77 additions & 0 deletions libraries/python/tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from __future__ import annotations

import asyncio
import gc
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
Expand Down Expand Up @@ -151,6 +152,82 @@ async def test_denylisted_dimensions_are_dropped(enabled, collector):
assert "sk-secret" not in blob


# --- realtime model capture --------------------------------------------------


async def test_agent_records_realtime_model_in_feature_used(
enabled, collector, monkeypatch
):
"""The Realtime engine's model variant ships in ``feature_used`` (pipeline
already carries per-layer models; realtime previously sent only the engine
family). The dedupe key includes the model, so a second agent on a different
Realtime model records again.
"""
monkeypatch.setenv("PATTER_TELEMETRY_ENDPOINT", collector.url)
from getpatter import OpenAIRealtime, Patter

phone = Patter()
phone.agent(
system_prompt="hi",
engine=OpenAIRealtime(api_key="sk-test", model="gpt-realtime-2"),
)
phone.agent(
system_prompt="hi",
engine=OpenAIRealtime(api_key="sk-test", model="gpt-realtime-mini"),
)

deadline = asyncio.get_event_loop().time() + 2.0
while (
sum(1 for e in collector.events if e["event"] == "feature_used") < 2
and asyncio.get_event_loop().time() < deadline
):
await asyncio.sleep(0.01)

feature_events = [e for e in collector.events if e["event"] == "feature_used"]
assert sorted(e.get("llm_model") for e in feature_events) == [
"openai-gpt-realtime-2",
"openai-gpt-realtime-mini",
]
assert all(e["engine"] == "realtime" for e in feature_events)


# --- pending-buffer survival (fire-and-forget clients) ----------------------


def test_unreferenced_client_buffered_event_survives_gc_until_atexit(
enabled, collector
):
"""The CLI records via ``TelemetryClient(...).record(...)`` without keeping a
reference. The buffered event must survive garbage collection and ship via
the atexit flush — a client holding undelivered events may not die with its
last reference (regression: the WeakSet registry let CPython collect it,
silently losing every ``cli_command`` event).
"""
from getpatter.telemetry import client as client_mod

TelemetryClient(sdk_version="0.6.6", endpoint=collector.url).record(
"cli_command", cli_command="dashboard"
)
gc.collect()

client_mod._atexit_flush_all()

assert [e["event"] for e in collector.events] == ["cli_command"]


async def test_aclose_awaits_in_flight_flush_started_by_record(enabled, collector):
"""``record`` schedules a flush task that drains the buffer immediately;
``aclose`` must await that in-flight delivery — not just its own (now-empty)
flush — or a graceful shutdown right after recording kills the POST mid-air.
"""
client = TelemetryClient(sdk_version="0.6.6", endpoint=collector.url)
client.record("cli_command", cli_command="dashboard")
await asyncio.sleep(0) # let the scheduled flush task start and drain
await client.aclose()

assert [e["event"] for e in collector.events] == ["cli_command"]


# --- disabled paths: zero egress -------------------------------------------


Expand Down
17 changes: 15 additions & 2 deletions libraries/typescript/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import { validateAllToolSchemas } from "./tools/schema-validation";
import type { ToolDefinition } from "./types";
import { getLogger } from "./logger";
import { TelemetryClient } from "./telemetry";
import { stackDimensions } from "./telemetry/stack";
import { modelToken, stackDimensions } from "./telemetry/stack";
import {
invokedByAgent,
inContainer,
Expand Down Expand Up @@ -589,7 +589,20 @@ export class Patter {
// *whole* stack signature so a second agent with a different STT/TTS/LLM still
// records its composition.
const family = telemetryEngineFamily(opts);
const stack = stackDimensions(opts.stt, opts.tts, opts.llm);
let stack: Record<string, string> = { ...stackDimensions(opts.stt, opts.tts, opts.llm) };
if (family === "realtime") {
// Realtime has no composed stack, so the model variant is the whole
// story — resolve it the same way the engine merge below does (an
// explicit model wins; otherwise the engine's model; else the default).
const engineModel = (opts.engine as { model?: string } | undefined)?.model;
stack = {
...stack,
llm_model: modelToken(
"openai",
(opts.model as string | undefined) ?? engineModel ?? "gpt-realtime-mini",
),
};
}
const featureKey =
family +
"|" +
Expand Down
35 changes: 29 additions & 6 deletions libraries/typescript/src/telemetry/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ let noticeShown = false;
const liveClients = new Set<WeakRef<TelemetryClient>>();
let exitHookRegistered = false;

// Strong references to clients that still hold undelivered events (mirrors the
// Python `_PENDING_FLUSH` set). The registry above is deliberately weak so a
// discarded `Patter`'s client can be collected — but a client constructed
// fire-and-forget (`new TelemetryClient(...).record(...)` with no reference
// held) must not take its buffered events to the grave before a flush delivers
// them. A client is held strongly from its first buffered event until the
// buffer drains, so the lifetime is bounded by the next flush.
const pendingFlush = new Set<TelemetryClient>();

function showNoticeOnce(): void {
if (noticeShown) return;
noticeShown = true;
Expand Down Expand Up @@ -73,7 +82,7 @@ export class TelemetryClient {
private readonly endpoint: string;
private readonly debug: boolean;
private readonly buffer: TelemetryEvent[] = [];
private flushing = false;
private inflight: Promise<void> | null = null;
private closed = false;
private readonly selfRef: WeakRef<TelemetryClient> = new WeakRef(this);

Expand Down Expand Up @@ -120,6 +129,7 @@ export class TelemetryClient {
try {
if (this.buffer.length >= BUFFER_MAX) this.buffer.shift(); // drop oldest
this.buffer.push(event);
pendingFlush.add(this); // survive GC until the buffer drains
this.scheduleFlush();
} catch (err) {
getLogger().debug('telemetry enqueue failed', err);
Expand All @@ -145,25 +155,38 @@ export class TelemetryClient {
if (this.closed) return;
this.closed = true;
liveClients.delete(this.selfRef);
if (!this.enabledFlag || this.debug) return;
if (!this.enabledFlag || this.debug) {
pendingFlush.delete(this);
return;
}
try {
// A flush scheduled by `record()` drains the buffer immediately, so
// flushing again below would see nothing — await the in-flight delivery
// first or a CLI that exits right after close() kills the POST mid-air.
if (this.inflight) await this.inflight;
await this.flush();
} catch (err) {
getLogger().debug('telemetry close flush failed', err);
}
pendingFlush.delete(this);
}

private scheduleFlush(): void {
if (this.flushing) return;
this.flushing = true;
void this.flush().finally(() => {
this.flushing = false;
if (this.inflight) return;
this.inflight = this.flush().finally(() => {
this.inflight = null;
// Events recorded while the POST was in flight are sitting in the buffer
// with no flush scheduled (record() saw `inflight` and skipped) — chain
// another flush or they strand until close()/process exit.
if (this.buffer.length > 0) this.scheduleFlush();
});
void this.inflight;
}

private async flush(): Promise<void> {
if (this.buffer.length === 0) return;
const events = this.buffer.splice(0, this.buffer.length);
pendingFlush.delete(this); // nothing buffered — GC may reclaim us again

const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), TIMEOUT_MS);
Expand Down
50 changes: 50 additions & 0 deletions libraries/typescript/tests/telemetry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,56 @@ describe('[integration] telemetry — enabled path', () => {
expect(typeof event.run_id).toBe('string');
});

it('agent() records the realtime model in feature_used, deduped per model', async () => {
enableTelemetryEnv();
process.env.PATTER_TELEMETRY_ENDPOINT = collector.url;
const { Patter, Twilio, OpenAIRealtime } = await import('../src');

const phone = new Patter({
phoneNumber: '+15555550100',
carrier: new Twilio({
accountSid: 'ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
authToken: 'test-token',
}),
});
phone.agent({
systemPrompt: 'hi',
engine: new OpenAIRealtime({ apiKey: 'sk-test', model: 'gpt-realtime-2' }),
});
phone.agent({
systemPrompt: 'hi',
engine: new OpenAIRealtime({ apiKey: 'sk-test', model: 'gpt-realtime-mini' }),
});

const deadline = Date.now() + 2000;
while (
collector.events.filter((e) => e.event === 'feature_used').length < 2 &&
Date.now() < deadline
) {
await new Promise((r) => setTimeout(r, 10));
}

const featureEvents = collector.events.filter((e) => e.event === 'feature_used');
expect(featureEvents.map((e) => e.llm_model).sort()).toEqual([
'openai-gpt-realtime-2',
'openai-gpt-realtime-mini',
]);
for (const e of featureEvents) expect(e.engine).toBe('realtime');
});

it('close() delivers an event whose flush was already started by record()', async () => {
enableTelemetryEnv();
const client = new TelemetryClient({ sdkVersion: '0.6.6', endpoint: collector.url });
client.record('cli_command', { cli_command: 'eval' });
// record() schedules an async flush that drains the buffer immediately;
// close() must await that in-flight delivery, not just its own (now-empty)
// flush — otherwise a CLI that exits right after close() kills the POST
// mid-air and every cli_command event is silently lost.
await client.close();

expect(collector.events.map((e) => e.event)).toEqual(['cli_command']);
});

it('drops denylisted dimensions', async () => {
enableTelemetryEnv();
const client = new TelemetryClient({ sdkVersion: '0.6.3', endpoint: collector.url });
Expand Down
Loading