diff --git a/CHANGELOG.md b/CHANGELOG.md
index 66177e9..73140d7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -29,6 +29,22 @@
 
 ### Fixed
 
+- **Telemetry: `call_completed` is no longer lost when the process exits right
+  after `disconnect()`.** Found live in an E2E call test: `call_started`
+  reached the collector but `call_completed` — the one event carrying
+  duration/cost/latency/outcome — never did, for every short-lived script
+  ("place call, wait, exit", the main outbound use case). `disconnect()` used
+  a fire-and-forget flush; the loop closed right after and cancelled the POST
+  mid-air, and Python's 0.25 s atexit fallback was too short for a cold TLS
+  handshake. Both SDKs now `await telemetry.drain()` in `disconnect()` — a
+  new bounded flush-and-wait that keeps the client reusable (a subsequent
+  `serve()` still emits) — and the Python atexit timeout is bumped to 1 s as
+  a second line of defense for scripts that never call `disconnect()`.
+  `libraries/python/getpatter/telemetry/client.py`,
+  `libraries/python/getpatter/client.py`,
+  `libraries/typescript/src/telemetry/client.ts`,
+  `libraries/typescript/src/client.ts`.
+
 - **Telemetry: `hermes` / `openclaw` CLI usage is no longer invisible (schema
   v6).** The `cli_command` enum was closed over
   `dashboard/eval/telemetry/none/other`, so the wizard commands — the main
diff --git a/libraries/python/getpatter/client.py b/libraries/python/getpatter/client.py
index bba13d4..47a8580 100644
--- a/libraries/python/getpatter/client.py
+++ b/libraries/python/getpatter/client.py
@@ -2582,10 +2582,14 @@ async def disconnect(self) -> None:
         teardown, and stale entries leak across ``serve`` / ``disconnect``
         cycles. See FIX #93.
         """
-        # Ship any telemetry buffered at construction/agent() before teardown.
-        # flush_pending is cheap and keeps the instance reusable (no close), so a
-        # subsequent serve() still emits.
-        self._telemetry.flush_pending()
+        # Ship buffered telemetry and WAIT for delivery (bounded) before
+        # teardown. A fire-and-forget flush here lost the final events of
+        # short-lived scripts: the process exited right after disconnect(),
+        # the loop closed cancelling the flush task mid-POST, and
+        # call_completed (duration/cost/latency) never reached the wire.
+        # drain() keeps the instance reusable (no close) — a subsequent
+        # serve() still emits.
+        await self._telemetry.drain()
 
         # Cancel and drain any in-flight prewarm work BEFORE tearing the
         # server down so the synth tasks see a clean cancellation point
diff --git a/libraries/python/getpatter/telemetry/client.py b/libraries/python/getpatter/telemetry/client.py
index f1d9d98..9c85ea4 100644
--- a/libraries/python/getpatter/telemetry/client.py
+++ b/libraries/python/getpatter/telemetry/client.py
@@ -41,7 +41,11 @@
 
 _TIMEOUT_S = 3.0
 _FLUSH_TIMEOUT_S = 2.0
-_ATEXIT_TIMEOUT_S = 0.25  # keep process-exit blocking minimal for short-lived runs
+# Bounded process-exit flush. 0.25 s was routinely too short for a cold TLS
+# POST (the final batch of a short-lived script was lost); 1 s delivers it
+# while keeping exit blocking small. Scripts that call disconnect() don't rely
+# on this — disconnect() drains with the loop still alive.
+_ATEXIT_TIMEOUT_S = 1.0
 _BUFFER_MAX = 256
 # The relay rejects batches larger than 64 events per request — a full buffer
 # must ship as multiple POSTs or events 65..256 silently vanish server-side.
@@ -184,6 +188,33 @@ def flush_pending(self) -> None:
         except Exception:
             logger.debug("telemetry flush_pending failed", exc_info=True)
 
+    async def drain(self, timeout: float = _FLUSH_TIMEOUT_S) -> None:
+        """Flush buffered events and wait (bounded) for delivery.
+
+        Unlike :meth:`aclose` the client stays usable afterwards — for teardown
+        paths that may serve again (``Patter.disconnect()``). Without the wait,
+        a process exiting right after ``disconnect()`` closes the event loop and
+        cancels the fire-and-forget flush task mid-POST — the final events of a
+        short-lived script (typically ``call_completed``, the one carrying
+        duration/cost/latency) were routinely lost.
+
+        ``timeout`` is the TOTAL budget in seconds, shared between the
+        in-flight flush and the follow-up flush of whatever is still buffered.
+        Failures and timeouts are logged at debug level, not raised.
+        """
+        if not self._enabled or self._debug or self._closed:
+            return
+        try:
+            loop = asyncio.get_running_loop()
+            deadline = loop.time() + timeout
+            if self._flush_task is not None and not self._flush_task.done():
+                await asyncio.wait_for(self._flush_task, timeout=timeout)
+            remaining = deadline - loop.time()
+            if self._buffer and remaining > 0:
+                await asyncio.wait_for(self._flush(), timeout=remaining)
+        except Exception:
+            logger.debug("telemetry drain failed", exc_info=True)
+
     async def aclose(self) -> None:
         """Flush remaining events and release the HTTP client (graceful shutdown)."""
         if self._closed:
diff --git a/libraries/python/tests/test_telemetry.py b/libraries/python/tests/test_telemetry.py
index 045e612..00990db 100644
--- a/libraries/python/tests/test_telemetry.py
+++ b/libraries/python/tests/test_telemetry.py
@@ -228,6 +228,26 @@ async def test_aclose_awaits_in_flight_flush_started_by_record(enabled, collecto
     assert [e["event"] for e in collector.events] == ["cli_command"]
 
 
+async def test_drain_delivers_final_events_and_keeps_client_usable(enabled, collector):
+    """``Patter.disconnect()`` drains via ``drain()``: the final events of a
+    short-lived script (``call_completed`` carrying duration/cost/latency) must
+    be DELIVERED before the loop closes — a fire-and-forget flush was cancelled
+    at loop teardown and the bounded atexit fallback routinely lost them
+    (observed live: call_started reached Axiom, call_completed never did).
+    Unlike ``aclose``, the client must stay usable for a subsequent serve().
+    """
+    client = TelemetryClient(sdk_version="0.6.7", endpoint=collector.url)
+    client.record("call_completed", outcome="completed", carrier="twilio")
+    await client.drain()
+    assert [e["event"] for e in collector.events] == ["call_completed"]
+
+    # Still usable after drain (disconnect() must not kill a reusable instance).
+    client.record("cli_command", cli_command="dashboard")
+    await client.drain()
+    assert [e["event"] for e in collector.events] == ["call_completed", "cli_command"]
+    await client.aclose()
+
+
 class _GatedCollector(_Collector):
     """A real collector that holds its FIRST response until released.
 
@@ -950,7 +970,7 @@ def test_schema_version_is_6():
     assert build_event("first_run", sdk_version="0.6.5")["schema_version"] == 6
 
 
-def test_build_event_v5_new_events_and_dims():
+def test_build_event_v6_new_events_and_dims():
     cli = build_event(
         "cli_command", sdk_version="0.6.5", dimensions={"cli_command": "dashboard"}
     )
diff --git a/libraries/typescript/src/client.ts b/libraries/typescript/src/client.ts
index 97a3bc2..dac2f96 100644
--- a/libraries/typescript/src/client.ts
+++ b/libraries/typescript/src/client.ts
@@ -1984,9 +1984,11 @@ export class Patter {
    * entries leak across ``serve`` / ``disconnect`` cycles. See FIX #93.
    */
   async disconnect(): Promise<void> {
-    // Ship any telemetry buffered at construction/agent() before teardown.
-    // flushPending is cheap and keeps the instance reusable (no close).
-    this.telemetry.flushPending();
+    // Ship buffered telemetry and WAIT for delivery before teardown —
+    // mirrors Python: a fire-and-forget flush can lose the final events
+    // (call_completed with duration/cost/latency) when the process exits
+    // right after disconnect(). drain() keeps the instance reusable.
+    await this.telemetry.drain();
 
     // Clear pending TTL eviction timers and drain in-flight prewarm
     // synth tasks BEFORE tearing the server down so the synth tasks
diff --git a/libraries/typescript/src/telemetry/client.ts b/libraries/typescript/src/telemetry/client.ts
index 0e81dce..2abe6a4 100644
--- a/libraries/typescript/src/telemetry/client.ts
+++ b/libraries/typescript/src/telemetry/client.ts
@@ -153,6 +153,22 @@ export class TelemetryClient {
     }
   }
 
+  /**
+   * Flush buffered events and wait for delivery. Unlike `close()` the client
+   * stays usable afterwards — for teardown paths that may serve again
+   * (`Patter.disconnect()`). Bounded by the flush's own per-POST abort timer.
+   * Mirrors Python's `drain()`.
+   */
+  async drain(): Promise<void> {
+    if (!this.enabledFlag || this.debug || this.closed) return;
+    try {
+      if (this.inflight) await this.inflight;
+      if (this.buffer.length > 0) await this.flush();
+    } catch (err) {
+      getLogger().debug('telemetry drain failed', err);
+    }
+  }
+
   /** Flush remaining events (graceful shutdown). Never throws. */
   async close(): Promise<void> {
     if (this.closed) return;
diff --git a/libraries/typescript/tests/telemetry.test.ts b/libraries/typescript/tests/telemetry.test.ts
index a04e5a1..d6b3420 100644
--- a/libraries/typescript/tests/telemetry.test.ts
+++ b/libraries/typescript/tests/telemetry.test.ts
@@ -296,6 +296,23 @@ describe('[integration] telemetry — enabled path', () => {
     expect(collector.events.map((e) => e.event)).toEqual(['cli_command']);
   });
 
+  it('drain() delivers the final events and keeps the client usable (disconnect path)', async () => {
+    enableTelemetryEnv();
+    const client = new TelemetryClient({ sdkVersion: '0.6.7', endpoint: collector.url });
+    client.record('call_completed', { outcome: 'completed', carrier: 'twilio' });
+    // Patter.disconnect() awaits drain(): the final events of a short-lived
+    // script (call_completed with duration/cost/latency) must be DELIVERED
+    // before teardown, not left to a fire-and-forget flush racing process exit.
+    await client.drain();
+    expect(collector.events.map((e) => e.event)).toEqual(['call_completed']);
+
+    // Unlike close(), the client stays usable for a subsequent serve().
+    client.record('cli_command', { cli_command: 'dashboard' });
+    await client.drain();
+    expect(collector.events.map((e) => e.event)).toEqual(['call_completed', 'cli_command']);
+    await client.close();
+  });
+
   it('an event recorded during an in-flight flush is chained, not stranded', async () => {
     // Regression: record() saw `inflight` and skipped scheduling, so an event
     // recorded while a flush POST was in flight sat in the buffer with no