Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@

### Fixed

- **Telemetry: `call_completed` is no longer lost when the process exits right
after `disconnect()`.** Found live in an E2E call test: `call_started`
reached the collector but `call_completed` — the one event carrying
duration/cost/latency/outcome — never did, for every short-lived script
("place call, wait, exit", the main outbound use case). `disconnect()` used
a fire-and-forget flush; the loop closed right after and cancelled the POST
mid-air, and Python's 0.25 s atexit fallback was too short for a cold TLS
handshake. Both SDKs now `await telemetry.drain()` in `disconnect()` — a
new bounded flush-and-wait that keeps the client reusable (a subsequent
`serve()` still emits) — and the Python atexit timeout is bumped to 1 s as
a second line of defense for scripts that never call `disconnect()`.
`libraries/python/getpatter/telemetry/client.py`,
`libraries/python/getpatter/client.py`,
`libraries/typescript/src/telemetry/client.ts`,
`libraries/typescript/src/client.ts`.

- **Telemetry: `hermes` / `openclaw` CLI usage is no longer invisible (schema
v6).** The `cli_command` enum was closed over
`dashboard/eval/telemetry/none/other`, so the wizard commands — the main
Expand Down
12 changes: 8 additions & 4 deletions libraries/python/getpatter/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2582,10 +2582,14 @@ async def disconnect(self) -> None:
teardown, and stale entries leak across ``serve`` /
``disconnect`` cycles. See FIX #93.
"""
# Ship any telemetry buffered at construction/agent() before teardown.
# flush_pending is cheap and keeps the instance reusable (no close), so a
# subsequent serve() still emits.
self._telemetry.flush_pending()
# Ship buffered telemetry and WAIT for delivery (bounded) before
# teardown. A fire-and-forget flush here lost the final events of
# short-lived scripts: the process exited right after disconnect(),
# the loop closed cancelling the flush task mid-POST, and
# call_completed (duration/cost/latency) never reached the wire.
# drain() keeps the instance reusable (no close) — a subsequent
# serve() still emits.
await self._telemetry.drain()

# Cancel and drain any in-flight prewarm work BEFORE tearing the
# server down so the synth tasks see a clean cancellation point
Expand Down
26 changes: 25 additions & 1 deletion libraries/python/getpatter/telemetry/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@

_TIMEOUT_S = 3.0
_FLUSH_TIMEOUT_S = 2.0
_ATEXIT_TIMEOUT_S = 0.25 # keep process-exit blocking minimal for short-lived runs
# Bounded process-exit flush. 0.25 s was routinely too short for a cold TLS
# POST (the final batch of a short-lived script was lost); 1 s delivers it
# while keeping exit blocking small. Scripts that call disconnect() don't rely
# on this — disconnect() drains with the loop still alive.
_ATEXIT_TIMEOUT_S = 1.0
_BUFFER_MAX = 256
# The relay rejects batches larger than 64 events per request — a full buffer
# must ship as multiple POSTs or events 65..256 silently vanish server-side.
Expand Down Expand Up @@ -184,6 +188,26 @@ def flush_pending(self) -> None:
except Exception:
logger.debug("telemetry flush_pending failed", exc_info=True)

async def drain(self, timeout: float = _FLUSH_TIMEOUT_S) -> None:
"""Flush buffered events and wait (bounded) for delivery.

Unlike :meth:`aclose` the client stays usable afterwards — for teardown
paths that may serve again (``Patter.disconnect()``). Without the wait,
a process exiting right after ``disconnect()`` closes the event loop and
cancels the fire-and-forget flush task mid-POST — the final events of a
short-lived script (typically ``call_completed``, the one carrying
duration/cost/latency) were routinely lost.
"""
if not self._enabled or self._debug or self._closed:
return
try:
if self._flush_task is not None and not self._flush_task.done():
await asyncio.wait_for(self._flush_task, timeout=timeout)
if self._buffer:
await asyncio.wait_for(self._flush(), timeout=timeout)
except Exception:
logger.debug("telemetry drain failed", exc_info=True)

async def aclose(self) -> None:
"""Flush remaining events and release the HTTP client (graceful shutdown)."""
if self._closed:
Expand Down
22 changes: 21 additions & 1 deletion libraries/python/tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,26 @@ async def test_aclose_awaits_in_flight_flush_started_by_record(enabled, collecto
assert [e["event"] for e in collector.events] == ["cli_command"]


async def test_drain_delivers_final_events_and_keeps_client_usable(enabled, collector):
"""``Patter.disconnect()`` drains via ``drain()``: the final events of a
short-lived script (``call_completed`` carrying duration/cost/latency) must
be DELIVERED before the loop closes — a fire-and-forget flush was cancelled
at loop teardown and the bounded atexit fallback routinely lost them
(observed live: call_started reached Axiom, call_completed never did).
Unlike ``aclose``, the client must stay usable for a subsequent serve().
"""
client = TelemetryClient(sdk_version="0.6.7", endpoint=collector.url)
client.record("call_completed", outcome="completed", carrier="twilio")
await client.drain()
assert [e["event"] for e in collector.events] == ["call_completed"]

# Still usable after drain (disconnect() must not kill a reusable instance).
client.record("cli_command", cli_command="dashboard")
await client.drain()
assert [e["event"] for e in collector.events] == ["call_completed", "cli_command"]
await client.aclose()


class _GatedCollector(_Collector):
"""A real collector that holds its FIRST response until released.

Expand Down Expand Up @@ -950,7 +970,7 @@ def test_schema_version_is_6():
assert build_event("first_run", sdk_version="0.6.5")["schema_version"] == 6


def test_build_event_v5_new_events_and_dims():
def test_build_event_v6_new_events_and_dims():
cli = build_event(
"cli_command", sdk_version="0.6.5", dimensions={"cli_command": "dashboard"}
)
Expand Down
8 changes: 5 additions & 3 deletions libraries/typescript/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1984,9 +1984,11 @@ export class Patter {
* entries leak across ``serve`` / ``disconnect`` cycles. See FIX #93.
*/
async disconnect(): Promise<void> {
// Ship any telemetry buffered at construction/agent() before teardown.
// flushPending is cheap and keeps the instance reusable (no close).
this.telemetry.flushPending();
// Ship buffered telemetry and WAIT for delivery before teardown —
// mirrors Python: a fire-and-forget flush can lose the final events
// (call_completed with duration/cost/latency) when the process exits
// right after disconnect(). drain() keeps the instance reusable.
await this.telemetry.drain();

// Clear pending TTL eviction timers and drain in-flight prewarm
// synth tasks BEFORE tearing the server down so the synth tasks
Expand Down
16 changes: 16 additions & 0 deletions libraries/typescript/src/telemetry/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,22 @@ export class TelemetryClient {
}
}

/**
* Flush buffered events and wait for delivery. Unlike `close()` the client
* stays usable afterwards — for teardown paths that may serve again
* (`Patter.disconnect()`). Bounded by the flush's own per-POST abort timer.
* Mirrors Python's `drain()`.
*/
async drain(): Promise<void> {
if (!this.enabledFlag || this.debug || this.closed) return;
try {
if (this.inflight) await this.inflight;
if (this.buffer.length > 0) await this.flush();
} catch (err) {
getLogger().debug('telemetry drain failed', err);
}
}

/** Flush remaining events (graceful shutdown). Never throws. */
async close(): Promise<void> {
if (this.closed) return;
Expand Down
17 changes: 17 additions & 0 deletions libraries/typescript/tests/telemetry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,23 @@ describe('[integration] telemetry — enabled path', () => {
expect(collector.events.map((e) => e.event)).toEqual(['cli_command']);
});

it('drain() delivers the final events and keeps the client usable (disconnect path)', async () => {
enableTelemetryEnv();
const client = new TelemetryClient({ sdkVersion: '0.6.7', endpoint: collector.url });
client.record('call_completed', { outcome: 'completed', carrier: 'twilio' });
// Patter.disconnect() awaits drain(): the final events of a short-lived
// script (call_completed with duration/cost/latency) must be DELIVERED
// before teardown, not left to a fire-and-forget flush racing process exit.
await client.drain();
expect(collector.events.map((e) => e.event)).toEqual(['call_completed']);

// Unlike close(), the client stays usable for a subsequent serve().
client.record('cli_command', { cli_command: 'dashboard' });
await client.drain();
expect(collector.events.map((e) => e.event)).toEqual(['call_completed', 'cli_command']);
await client.close();
});

it('an event recorded during an in-flight flush is chained, not stranded', async () => {
// Regression: record() saw `inflight` and skipped scheduling, so an event
// recorded while a flush POST was in flight sat in the buffer with no
Expand Down
Loading