From 359eb603afdf9701acbc97be9fc5ed74641bff4b Mon Sep 17 00:00:00 2001 From: nicolotognoni Date: Wed, 10 Jun 2026 09:02:09 +0200 Subject: [PATCH 1/2] fix(pipeline): arm barge-in for the whole carrier-buffered playback window MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Agent-runtime LLMs (HermesLLM/OpenClawLLM) deliver the whole reply at once after a long thinking pause, so TTS outruns realtime and the carrier ends up holding tens of seconds of queued audio. The speaking state ended a fixed 1.5s grace after the last *push*, not the last *playback* — for most of the audible reply isSpeaking was already false, VAD/transcript events were treated as a calm next turn, send_clear never fired, and the agent 'detected the barge-in but kept talking'. Track an estimated playback cursor (_playback_buffered_until / playbackBufferedUntil) advanced per pushed chunk at its real byte rate (PCM16@16kHz = 32 B/ms, carrier-native mulaw@8kHz = 8 B/ms), and split end-speaking-with-grace into two phases: phase 1 holds isSpeaking=true with tailGraceActive=false for the whole estimated backlog (barge-in stays armed and takes the full cancel + send_clear path, dropping the carrier buffer); phase 2 is the unchanged echo-tail grace. Barge-in cancels reset the cursor. No new config; token-paced LLMs (no backlog) behave identically to before, and PATTER_TTS_TAIL_GRACE_MS=0 still forces the legacy synchronous flip. Full Python/TS parity with mirrored unit tests (RED without the fix). --- CHANGELOG.md | 1 + libraries/python/getpatter/stream_handler.py | 100 +++++++- .../unit/test_pipeline_bargein_buffered.py | 224 ++++++++++++++++ libraries/typescript/src/stream-handler.ts | 121 +++++++-- .../unit/pipeline-bargein-buffered.test.ts | 239 ++++++++++++++++++ 5 files changed, 653 insertions(+), 32 deletions(-) create mode 100644 libraries/python/tests/unit/test_pipeline_bargein_buffered.py create mode 100644 libraries/typescript/tests/unit/pipeline-bargein-buffered.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 56590b2..6d5b400 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,7 @@ - **Back-to-back dedup fix** — a final within 500 ms of the previous is now dropped only when it is a *near-duplicate* (Deepgram emitting `speech_final` then `is_final` for the same utterance). A genuinely different fast follow-up (e.g. the real interruption right after a suppressed phantom) is kept instead of being silently swallowed into an empty turn. - **Interrupted-turn context rewrite** — on a confirmed mid-turn barge-in the spoken prefix is recorded in history with an `[interrupted by caller]` marker (instead of an ungrounded full reply), so a stateful agent runtime (Hermes/OpenClaw, keyed by `X-Hermes-Session-Id`) sees on the next turn that it was cut off and what the caller actually heard. `libraries/python/getpatter/stream_handler.py`, `libraries/typescript/src/stream-handler.ts`. - **Forward-STT-without-AEC no longer self-interrupts on its own echo.** The remaining live Hermes/OpenClaw barge-in failure: with `PATTER_FORWARD_STT_WHILE_SPEAKING` on, no AEC, and no `barge_in_strategies`, a VAD `speech_start` during TTS cancelled the turn immediately — but on a no-AEC link that `speech_start` is very often the agent's *own* TTS echo (or pre-first-token line noise during a long tool-running turn). The result was a cascade of false-positive interruptions: a short normal reply like "bene bene" produced `agent_text='[interrupted]'` with `bargein_ms≈0`, and the next turn's LLM ran for seconds but emitted `tts_characters=0` because it was torn down before its first token. The echo guard existed only on the *transcript* path, so the raw VAD-energy cancel had no protection. The VAD-energy cancel is now **deferred to transcript confirmation** whenever audio is forwarded during TTS without AEC (`forward_stt_while_speaking && aec is None`), exactly as it already was when `barge_in_strategies` are configured: the `speech_start` marks the barge-in *pending* (the agent keeps talking) and the cancel only fires once `_handle_barge_in` / `handleBargeIn` sees a real transcript that survives the echo guard; if none confirms within `barge_in_confirm_ms` (default 1500 ms) the agent resumes its sentence. The default VAD path and forward-STT *with* AEC keep the responsive immediate cancel — no behaviour change for existing configs. For the cleanest short-echo handling, still pair with `echo_cancellation=True` or `barge_in_strategies`. `libraries/python/getpatter/stream_handler.py`, `libraries/typescript/src/stream-handler.ts`. +- **Barge-in now works while the carrier is still PLAYING a long buffered reply — the "Hermes detects the interruption but keeps talking" bug.** The pipeline pushes TTS audio to the carrier as fast as the provider synthesizes it (no pacing) while the carrier buffers and plays at realtime. With a token-paced LLM the two stay roughly in sync, but an agent-runtime LLM (`HermesLLM` / `OpenClawLLM`) delivers its whole — often long — reply at once after the thinking pause: TTS outruns realtime and the carrier ends up holding tens of seconds of queued audio. The handler's speaking state ended a fixed `PATTER_TTS_TAIL_GRACE_MS` (1.5 s) after the last *push*, not the last *playback* — so for most of the audible reply `_is_speaking` was already false, every VAD `speech_start` / transcript was treated as a calm next turn instead of a barge-in, `send_clear` was never sent, and the buffered audio kept playing over the caller (with the next turn's reply queued behind it). The handler now tracks an **estimated playback cursor** (`_playback_buffered_until` / `playbackBufferedUntil`, advanced per pushed chunk at the chunk's real byte rate — PCM16@16kHz or carrier-native μ-law@8kHz) and `_end_speaking_with_grace` waits in two phases: phase 1 keeps `_is_speaking=true` with `_tail_grace_active=false` for the whole estimated backlog (barge-in stays armed and takes the full cancel + `send_clear` path, which drops the carrier buffer instantly); phase 2 is the unchanged echo-tail grace. Barge-in cancels reset the cursor (the buffer was just cleared). No new config; token-paced LLMs (no backlog) behave byte-identically to before, and `PATTER_TTS_TAIL_GRACE_MS=0` still forces the legacy synchronous flip. This is the industry-standard semantics (stop + flush client-side regardless of LLM state — cf. Twilio media-stream `clear`). `libraries/python/getpatter/stream_handler.py`, `libraries/typescript/src/stream-handler.ts`. - **(Python) Twilio/Plivo mark frames now carry the caller-supplied name — first-message pacing no longer burns the mark-await timeout on every call.** `TwilioAudioSender.send_mark` (and the Plivo checkpoint equivalent) discarded the `mark_name` argument and sent a locally generated `audio_N` instead, so the `fm_N` echo the first-message pacer waited for never matched and every mark resolved via the 0.5 s fallback timeout (~1.5 s of guaranteed extra latency in the barge-in window of every Twilio call). The wire name is now the caller's, matching the TypeScript behaviour. `libraries/python/getpatter/telephony/twilio.py`, `.../telephony/plivo.py`. - **(TypeScript) Inbound audio frames are now awaited — a transient audio-path error can no longer kill the whole server.** All three carrier WS message handlers called `handler.handleAudio(...)` without `await`, so a rejection inside the audio path (VAD, resampler, STT send) escaped the surrounding `try/catch` and became an unhandled rejection, which terminates the Node process (Node 15+) together with every active call. `libraries/typescript/src/server.ts`. - **(TypeScript) Telnyx calls no longer leak `activeCallIds` entries.** The Telnyx WS close handler was the only one of the three carriers that never deleted its `ws → call_control_id` map entry, so the map grew for the server's lifetime and graceful shutdown issued hangup REST calls for long-dead calls. `libraries/typescript/src/server.ts`. diff --git a/libraries/python/getpatter/stream_handler.py b/libraries/python/getpatter/stream_handler.py index b9c6daf..68fa4b5 100644 --- a/libraries/python/getpatter/stream_handler.py +++ b/libraries/python/getpatter/stream_handler.py @@ -2474,6 +2474,22 @@ def __init__( # before any audio went out, exit the TTS loop on # ``_is_speaking=False``, and silently drop the agent's first turn. self._first_audio_sent_at: float | None = None + # Estimated wall-clock (``time.time()`` units) when the LAST audio + # byte pushed to the carrier finishes PLAYING on the phone. The + # pipeline pushes TTS audio as fast as the provider synthesizes it + # (no pacing) while the carrier buffers + plays at realtime, so "we + # finished pushing" and "the caller finished hearing" can diverge by + # tens of seconds — especially with agent-runtime LLMs (Hermes / + # OpenClaw) that deliver a long reply all at once after a thinking + # pause. ``_end_speaking_with_grace`` holds ``_is_speaking=True`` + # (with ``_tail_grace_active=False``) until this cursor passes, so a + # barge-in during the audible backlog still takes the cancel path + # (``send_clear`` drops the carrier buffer) instead of being treated + # as a calm next turn. Advanced by ``_track_outbound_playback``; + # reset by the barge-in cancel paths and + # ``_end_tail_grace_for_new_turn``. Mirrors TS + # ``playbackBufferedUntil``. + self._playback_buffered_until: float = 0.0 # Optional barge-in confirmation strategies (see # ``getpatter.services.barge_in_strategies``). With an empty tuple # the SDK uses the legacy "cancel on first VAD speech_start" @@ -3202,6 +3218,7 @@ async def _synthesize_sentence( if self._aec is not None: self._aec.push_far_end(processed_audio) await self.audio_sender.send_audio(processed_audio) + self._track_outbound_playback(len(processed_audio)) self._mark_first_audio_sent() finally: await gen.aclose() @@ -3636,6 +3653,12 @@ async def _do_cancel_for_barge_in(self, transcript_text: str) -> None: self._speaking_started_at = None self._first_audio_sent_at = None self._last_cancel_at = time.time() + # The ``send_clear`` below drops whatever the carrier had + # buffered ahead — snap the playback cursor back and kill any + # pending grace task so its phase-1 wait (carrier backlog) / + # tail-grace flag cannot fire against the cancelled turn. + self._playback_buffered_until = 0.0 + self._clear_grace_task() # Unblock any firstMessage paced-send loop that's sitting in # ``_wait_for_mark_window`` — without this the loop keeps # awaiting echoes for up to ``_MARK_AWAIT_TIMEOUT_S`` per @@ -4244,6 +4267,10 @@ async def on_audio_received(self, audio_bytes: bytes) -> None: self._speaking_generation += 1 self._last_cancel_at = time.time() self._suppressed_speech_pending = False + # ``send_clear`` above dropped the carrier's + # buffered audio — reset the playback cursor. + self._playback_buffered_until = 0.0 + self._clear_grace_task() # Tear down the in-flight LLM stream too. The # consumption loop polls ``_llm_cancel_event`` # per chunk, but a turn parked PRE-first-token @@ -4427,6 +4454,30 @@ def _mark_first_audio_sent(self) -> None: if self._first_audio_sent_at is None: self._first_audio_sent_at = time.time() + def _track_outbound_playback(self, num_bytes: int) -> None: + """Advance ``_playback_buffered_until`` by the playout duration of an + outbound TTS chunk. + + ``num_bytes`` is the chunk size BEFORE carrier encoding (the exact + buffer handed to ``audio_sender.send_audio``): PCM16 @ 16 kHz in the + default path (32 bytes/ms), or the carrier's native μ-law @ 8 kHz + (8 bytes/ms) when the TTS adapter emits wire format directly + (``audio_sender._input_is_mulaw_8k`` — Twilio/Plivo ``ulaw_8000``; + Telnyx native is PCM16 16 kHz so it stays at 32 bytes/ms). Mirrors TS + ``trackOutboundPlayback``. + """ + if num_bytes <= 0: + return + bytes_per_s = ( + 8_000.0 + if getattr(self.audio_sender, "_input_is_mulaw_8k", False) + else 32_000.0 + ) + now = time.time() + buffered_until = getattr(self, "_playback_buffered_until", 0.0) + base = buffered_until if buffered_until > now else now + self._playback_buffered_until = base + (num_bytes / bytes_per_s) + def _can_barge_in(self) -> bool: """Whether barge-in is allowed to fire right now. @@ -4471,6 +4522,12 @@ async def _end_speaking_with_grace(self) -> None: ``PATTER_TTS_TAIL_GRACE_MS`` (default 1500 ms) — keeps the flag set while the trailing audio actually plays out. Setting the env var to ``0`` keeps the legacy synchronous behaviour for tests / soak runs. + + When the carrier still holds an audio backlog we pushed faster than + realtime (``_playback_buffered_until`` in the future), the grace is + preceded by a phase-1 wait that keeps the agent formally "speaking" + — with barge-in armed — for the whole audible window. See the inline + comments below. """ try: grace_ms = int(os.environ.get("PATTER_TTS_TAIL_GRACE_MS", "1500")) @@ -4501,17 +4558,47 @@ async def _end_speaking_with_grace(self) -> None: return gen = self._speaking_generation - # The agent has finished pushing audio; we now hold ``_is_speaking`` - # only to suppress the fading echo tail. Mark this as the tail-grace - # window so fast next-turn speech is rescued as a new turn rather - # than mis-detected as a barge-in. - self._tail_grace_active = True # Cancel any still-pending flip from a previous turn so at most one # grace task is ever in flight (parity with TS ``clearGraceTimer``). self._clear_grace_task() + # Phase 1 — the carrier is still PLAYING audio we already pushed. + # Agent-runtime LLMs (Hermes/OpenClaw) deliver the whole reply at + # once, TTS outruns realtime, and the carrier buffers tens of + # seconds of audio that keeps playing long after this method runs. + # For that whole audible window the agent IS still speaking from the + # caller's perspective: keep ``_is_speaking=True`` with + # ``_tail_grace_active=False`` so VAD/transcript barge-in takes the + # cancel path (``send_clear`` drops the carrier buffer) instead of + # the next-turn rescue — without this, "the agent detects the + # interruption but keeps talking". A barge-in meanwhile cancels this + # task (``_clear_grace_task`` in the cancel paths). With no backlog + # (token-paced LLMs) the tail grace starts immediately, exactly as + # before. Phase 2 — the existing echo-tail grace. + buffered_s = getattr(self, "_playback_buffered_until", 0.0) - time.time() + if buffered_s <= 0: + # The agent has finished pushing audio and the carrier played it + # out; ``_is_speaking`` is now held only to suppress the fading + # echo tail. Mark this as the tail-grace window so fast next-turn + # speech is rescued as a new turn rather than mis-detected as a + # barge-in. + self._tail_grace_active = True async def _flip_after_grace() -> None: try: + # Phase 1: wait out the estimated carrier-side backlog. + while True: + remaining = ( + getattr(self, "_playback_buffered_until", 0.0) - time.time() + ) + if remaining <= 0: + break + await asyncio.sleep(remaining) + if self._speaking_generation != gen: + return + if self._speaking_generation != gen: + return + # Phase 2: the echo-tail grace window. + self._tail_grace_active = True await asyncio.sleep(grace_ms / 1000) # Only reset if no newer turn started while we slept; a # newer turn would have bumped ``_speaking_generation``. @@ -4567,6 +4654,9 @@ async def _end_tail_grace_for_new_turn(self) -> None: self._tail_grace_active = False self._speaking_started_at = None self._first_audio_sent_at = None + # Tail grace only starts after the playback cursor drained (phase 1 + # of ``_end_speaking_with_grace``), so no carrier backlog is left. + self._playback_buffered_until = 0.0 # Invalidate the pending grace-flip task scheduled by # ``_end_speaking_with_grace`` so it cannot later flip state on a turn # that has already moved on (bump the generation AND cancel the task — diff --git a/libraries/python/tests/unit/test_pipeline_bargein_buffered.py b/libraries/python/tests/unit/test_pipeline_bargein_buffered.py new file mode 100644 index 0000000..0a56c87 --- /dev/null +++ b/libraries/python/tests/unit/test_pipeline_bargein_buffered.py @@ -0,0 +1,224 @@ +"""Unit tests for barge-in while the carrier still plays buffered audio. + +The pipeline pushes TTS audio to the carrier as fast as the provider +synthesizes it; the carrier buffers and plays at realtime. With an +agent-runtime LLM (Hermes / OpenClaw) the whole — often long — reply +arrives at once, so the SDK finishes *pushing* tens of seconds before the +caller finishes *hearing*. The handler must keep ``_is_speaking=True`` +(with ``_tail_grace_active=False``) for that whole audible backlog so a +barge-in still takes the cancel path (``send_clear`` drops the carrier +buffer) instead of being mis-read as a calm next turn — previously the +fixed 1.5 s grace expired mid-reply and "the agent detected the barge-in +but kept talking". + +State estimation lives in ``_track_outbound_playback`` / +``_playback_buffered_until``; the two-phase wait lives in +``_end_speaking_with_grace``. +""" + +from __future__ import annotations + +import asyncio +import time +from collections import deque +from typing import AsyncIterator, Iterable +from unittest.mock import AsyncMock + +import pytest + +from getpatter.providers.base import Transcript +from getpatter.stream_handler import PipelineStreamHandler + +from tests.conftest import make_agent + + +class _StubSTT: + def __init__(self, transcripts: Iterable[Transcript]) -> None: + self._transcripts = list(transcripts) + + async def receive_transcripts(self) -> AsyncIterator[Transcript]: + for t in self._transcripts: + yield t + await asyncio.sleep(0) + + +def _make_handler(audio_sender: AsyncMock) -> PipelineStreamHandler: + handler = PipelineStreamHandler( + agent=make_agent(), + audio_sender=audio_sender, + call_id="call-buffered", + caller="+15551110000", + callee="+15552220000", + resolved_prompt="p", + metrics=None, + for_twilio=True, + on_transcript=None, + conversation_history=deque(maxlen=10), + transcript_entries=deque(maxlen=10), + ) + handler.on_message = None + handler._llm_loop = None + return handler + + +def _make_audio_sender(*, mulaw_native: bool = False) -> AsyncMock: + sender = AsyncMock() + # AsyncMock auto-creates truthy attributes — pin the format flag so + # ``_track_outbound_playback`` sees the real default (PCM16 @ 16 kHz). + sender._input_is_mulaw_8k = mulaw_native + return sender + + +# --------------------------------------------------------------------------- +# _track_outbound_playback — cursor math +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestTrackOutboundPlayback: + def test_pcm16_16k_chunk_advances_cursor_by_chunk_duration(self) -> None: + handler = _make_handler(_make_audio_sender()) + before = time.time() + handler._track_outbound_playback(3200) # 100 ms at 32 bytes/ms + assert handler._playback_buffered_until == pytest.approx(before + 0.1, abs=0.05) + + def test_mulaw_8k_native_chunk_uses_8_bytes_per_ms(self) -> None: + handler = _make_handler(_make_audio_sender(mulaw_native=True)) + before = time.time() + handler._track_outbound_playback(800) # 100 ms at 8 bytes/ms + assert handler._playback_buffered_until == pytest.approx(before + 0.1, abs=0.05) + + def test_back_to_back_chunks_accumulate(self) -> None: + handler = _make_handler(_make_audio_sender()) + before = time.time() + handler._track_outbound_playback(3200) + handler._track_outbound_playback(3200) + assert handler._playback_buffered_until == pytest.approx(before + 0.2, abs=0.05) + + def test_cursor_rebases_to_now_after_idle_gap(self) -> None: + handler = _make_handler(_make_audio_sender()) + handler._playback_buffered_until = time.time() - 10.0 # long drained + before = time.time() + handler._track_outbound_playback(3200) + assert handler._playback_buffered_until == pytest.approx(before + 0.1, abs=0.05) + + def test_empty_chunk_is_a_no_op(self) -> None: + handler = _make_handler(_make_audio_sender()) + handler._track_outbound_playback(0) + assert handler._playback_buffered_until == 0.0 + + +# --------------------------------------------------------------------------- +# _end_speaking_with_grace — two-phase wait +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestBufferedBacklogHoldsSpeaking: + async def test_backlog_keeps_speaking_armed_not_tail_grace( + self, monkeypatch + ) -> None: + """While the carrier plays buffered audio the agent is still + speaking — NOT in the tail-grace window — so barge-in stays armed.""" + monkeypatch.setenv("PATTER_TTS_TAIL_GRACE_MS", "50") + handler = _make_handler(_make_audio_sender()) + handler._is_speaking = True + handler._playback_buffered_until = time.time() + 0.5 + + await handler._end_speaking_with_grace() + await asyncio.sleep(0.1) # well inside the backlog window + + assert handler._is_speaking is True + assert handler._tail_grace_active is False + handler._clear_grace_task() + + async def test_backlog_drains_then_tail_grace_then_flip(self, monkeypatch) -> None: + monkeypatch.setenv("PATTER_TTS_TAIL_GRACE_MS", "50") + handler = _make_handler(_make_audio_sender()) + handler._is_speaking = True + handler._playback_buffered_until = time.time() + 0.15 + + await handler._end_speaking_with_grace() + await asyncio.sleep(0.4) # backlog (150 ms) + grace (50 ms) + margin + + assert handler._is_speaking is False + assert handler._tail_grace_active is False + + async def test_no_backlog_starts_tail_grace_immediately(self, monkeypatch) -> None: + """Token-paced LLMs (no carrier backlog) keep today's behaviour.""" + monkeypatch.setenv("PATTER_TTS_TAIL_GRACE_MS", "50") + handler = _make_handler(_make_audio_sender()) + handler._is_speaking = True + assert handler._playback_buffered_until == 0.0 + + await handler._end_speaking_with_grace() + + assert handler._tail_grace_active is True + await asyncio.sleep(0.15) + assert handler._is_speaking is False + + +# --------------------------------------------------------------------------- +# Barge-in during the buffered backlog — the Hermes/OpenClaw regression +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestBargeInDuringBufferedBacklog: + async def test_transcript_during_backlog_cancels_and_clears( + self, monkeypatch + ) -> None: + """A transcript while the carrier still plays buffered audio must run + the FULL cancel path: flip ``_is_speaking`` and ``send_clear`` the + carrier so the buffered reply actually stops.""" + monkeypatch.setenv("PATTER_TTS_TAIL_GRACE_MS", "50") + audio_sender = _make_audio_sender() + handler = _make_handler(audio_sender) + handler._stt = _StubSTT( + [Transcript(text="aspetta", is_final=False, confidence=0.5)] + ) + handler._is_speaking = True + # Turn finished pushing; carrier still has seconds of audio queued. + handler._playback_buffered_until = time.time() + 5.0 + await handler._end_speaking_with_grace() + + assert handler._is_speaking is True # backlog holds the floor + await asyncio.wait_for(handler._stt_loop(), timeout=2.0) + + audio_sender.send_clear.assert_awaited_once() + assert handler._is_speaking is False + assert handler._playback_buffered_until == 0.0 + + async def test_cancel_resets_cursor_and_grace_task(self) -> None: + handler = _make_handler(_make_audio_sender()) + handler._is_speaking = True + handler._playback_buffered_until = time.time() + 5.0 + + await handler._do_cancel_for_barge_in("stop") + + assert handler._playback_buffered_until == 0.0 + assert handler._grace_task is None + + async def test_synthesize_sentence_tracks_pushed_audio(self) -> None: + """The pipeline TTS path must advance the playback cursor for every + chunk it pushes to the carrier.""" + + class _StubTTS: + async def synthesize(self, _text: str): + yield b"\x00" * 6400 # 200 ms of PCM16 @ 16 kHz + + handler = _make_handler(_make_audio_sender()) + handler._tts = _StubTTS() + handler._is_speaking = True + + from getpatter.services.pipeline_hooks import PipelineHookExecutor + + before = time.time() + ok = await handler._synthesize_sentence( + "ciao", PipelineHookExecutor(None), handler._build_hook_context(), [True] + ) + + assert ok is True + assert handler._playback_buffered_until == pytest.approx(before + 0.2, abs=0.1) diff --git a/libraries/typescript/src/stream-handler.ts b/libraries/typescript/src/stream-handler.ts index ea35ba1..c556e3a 100644 --- a/libraries/typescript/src/stream-handler.ts +++ b/libraries/typescript/src/stream-handler.ts @@ -554,6 +554,21 @@ export class StreamHandler { * ``isSpeaking=false``, and silently cut the agent's first turn. */ private firstAudioSentAt: number | null = null; + /** + * Estimated wall-clock (ms) when the LAST audio byte pushed to the carrier + * finishes PLAYING on the phone. The pipeline pushes TTS audio as fast as + * the provider synthesizes it (no pacing) and the carrier buffers + plays + * at realtime, so "we finished pushing" and "the caller finished hearing" + * can diverge by tens of seconds — especially with agent-runtime LLMs + * (Hermes/OpenClaw) that deliver a long reply all at once after a thinking + * pause. ``endSpeakingWithGrace`` holds ``isSpeaking=true`` (with + * ``tailGraceActive=false``) until this cursor passes, so a barge-in during + * the audible backlog still takes the cancel path (``sendClear`` drops the + * carrier buffer) instead of being treated as a calm next turn. Advanced by + * ``trackOutboundPlayback``; reset by ``cancelSpeaking`` (the buffer is + * cleared) and ``endTailGraceForNewTurn``. + */ + private playbackBufferedUntil = 0; /** * Optional barge-in confirmation strategies. With an empty array the * SDK falls back to the legacy "cancel on first VAD speech_start" @@ -729,6 +744,28 @@ export class StreamHandler { } } + /** + * Advance ``playbackBufferedUntil`` by the playout duration of an outbound + * TTS chunk. ``numBytes`` is the size of the chunk BEFORE carrier encoding + * (the same buffer handed to ``encodePipelineAudio``): PCM16 @ 16 kHz in + * the default path (32 bytes/ms), or the carrier's native μ-law @ 8 kHz + * (8 bytes/ms) when the TTS adapter emits wire format directly + * (``ttsOutputFormatNativeForCarrier`` — Twilio/Plivo ``ulaw_8000``; + * Telnyx native is ``pcm_16000`` so it stays at 32 bytes/ms). + */ + private trackOutboundPlayback(numBytes: number): void { + if (numBytes <= 0) return; + const bytesPerMs = + this.ttsOutputFormatNativeForCarrier && + this.deps.bridge.telephonyProvider !== 'telnyx' + ? 8 + : 32; + const now = Date.now(); + const base = + this.playbackBufferedUntil > now ? this.playbackBufferedUntil : now; + this.playbackBufferedUntil = base + numBytes / bytesPerMs; + } + /** * Atomically end speaking AND invalidate any pending grace timer. * Use instead of ``this.isSpeaking = false`` at barge-in sites. @@ -744,6 +781,10 @@ export class StreamHandler { this.firstAudioSentAt = null; this.lastCancelAt = Date.now(); this.suppressedSpeechPending = false; + // The barge-in paths that call this also ``sendClear`` the carrier — + // whatever audio was buffered ahead is dropped, so the playback cursor + // snaps back to "nothing pending". + this.playbackBufferedUntil = 0; // Drain any firstMessage mark waiters so a loop blocked on // ``waitForMarkWindow`` exits on the next tick and observes // ``!isSpeaking``. Without this the loop would stay blocked until @@ -852,34 +893,56 @@ export class StreamHandler { if (grace > 0) { const gen = this.speakingGeneration; this.clearGraceTimer(); - // The agent has finished pushing audio; ``isSpeaking`` is now held only - // to suppress the fading echo tail. Mark the tail-grace window so fast - // next-turn speech is rescued as a new turn rather than mis-detected as - // a barge-in. - this.tailGraceActive = true; - this.graceTimer = setTimeout(() => { - this.graceTimer = null; - if (this.speakingGeneration === gen) { - this.isSpeaking = false; - this.tailGraceActive = false; - this.speakingStartedAt = null; - this.firstAudioSentAt = null; - this.clearPendingBargeIn(); - void this.resetBargeInStrategies(); - // If VAD detected speech during the agent's turn but it was - // gate-suppressed (agent hadn't been speaking long enough for - // barge-in to fire), flush the ring buffer to STT now so the - // user's words aren't silently lost. - if (this.suppressedSpeechPending) { - this.suppressedSpeechPending = false; - this.flushInboundAudioRing(); + const startTailGrace = (): void => { + // The carrier has (estimatedly) finished playing everything we + // pushed; ``isSpeaking`` is now held only to suppress the fading + // echo tail. Mark the tail-grace window so fast next-turn speech is + // rescued as a new turn rather than mis-detected as a barge-in. + this.tailGraceActive = true; + this.graceTimer = setTimeout(() => { + this.graceTimer = null; + if (this.speakingGeneration === gen) { + this.isSpeaking = false; + this.tailGraceActive = false; + this.speakingStartedAt = null; + this.firstAudioSentAt = null; + this.clearPendingBargeIn(); + void this.resetBargeInStrategies(); + // If VAD detected speech during the agent's turn but it was + // gate-suppressed (agent hadn't been speaking long enough for + // barge-in to fire), flush the ring buffer to STT now so the + // user's words aren't silently lost. + if (this.suppressedSpeechPending) { + this.suppressedSpeechPending = false; + this.flushInboundAudioRing(); + } + // Reset VAD so any stuck SPEECH state from echo / loopback during + // the agent's turn does not block the next user utterance from + // emitting ``speech_start``. + this.resetVad(); } - // Reset VAD so any stuck SPEECH state from echo / loopback during - // the agent's turn does not block the next user utterance from - // emitting ``speech_start``. - this.resetVad(); - } - }, grace); + }, grace); + }; + // Phase 1 — the carrier is still PLAYING audio we already pushed. + // Agent-runtime LLMs (Hermes/OpenClaw) deliver the whole reply at + // once, TTS outruns realtime, and the carrier buffers tens of seconds + // of audio that keeps playing long after this method runs. For that + // whole audible window the agent IS still speaking from the caller's + // perspective: keep ``isSpeaking=true`` with ``tailGraceActive=false`` + // so VAD/transcript barge-in takes the cancel path (``sendClear`` + // drops the carrier buffer) instead of the next-turn rescue — without + // this, "the agent detects the interruption but keeps talking". + // A barge-in meanwhile bumps ``speakingGeneration`` (cancelSpeaking), + // which no-ops this timer. Phase 2 — the existing echo-tail grace. + const bufferedMs = Math.max(0, this.playbackBufferedUntil - Date.now()); + if (bufferedMs <= 0) { + startTailGrace(); + } else { + this.graceTimer = setTimeout(() => { + this.graceTimer = null; + if (this.speakingGeneration === gen) startTailGrace(); + }, bufferedMs); + } } else { this.isSpeaking = false; this.tailGraceActive = false; @@ -916,6 +979,9 @@ export class StreamHandler { this.tailGraceActive = false; this.speakingStartedAt = null; this.firstAudioSentAt = null; + // Tail grace only starts after the playback cursor drained (phase 1 of + // ``endSpeakingWithGrace``), so there is no carrier backlog left here. + this.playbackBufferedUntil = 0; this.speakingGeneration++; // invalidates the pending grace timer this.clearGraceTimer(); this.clearPendingBargeIn(); @@ -2540,6 +2606,7 @@ export class StreamHandler { } const encoded = this.encodePipelineAudio(processedAudio); this.deps.bridge.sendAudio(this.ws, encoded, this.streamSid); + this.trackOutboundPlayback(processedAudio.length); this.markFirstAudioSent(); } } catch (e) { diff --git a/libraries/typescript/tests/unit/pipeline-bargein-buffered.test.ts b/libraries/typescript/tests/unit/pipeline-bargein-buffered.test.ts new file mode 100644 index 0000000..c4ccd97 --- /dev/null +++ b/libraries/typescript/tests/unit/pipeline-bargein-buffered.test.ts @@ -0,0 +1,239 @@ +/** + * [unit] Barge-in while the carrier still plays buffered audio. + * + * The pipeline pushes TTS audio to the carrier as fast as the provider + * synthesizes it; the carrier buffers and plays at realtime. With an + * agent-runtime LLM (Hermes / OpenClaw) the whole — often long — reply + * arrives at once, so the SDK finishes *pushing* tens of seconds before the + * caller finishes *hearing*. The handler must keep ``isSpeaking=true`` (with + * ``tailGraceActive=false``) for that whole audible backlog so a barge-in + * still takes the cancel path (``sendClear`` drops the carrier buffer) + * instead of being mis-read as a calm next turn — previously the fixed 1.5 s + * grace expired mid-reply and "the agent detected the barge-in but kept + * talking". + * + * Parity with Python tests/unit/test_pipeline_bargein_buffered.py. + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import type { TelephonyBridge, StreamHandlerDeps } from '../../src/stream-handler'; +import { StreamHandler } from '../../src/stream-handler'; +import { MetricsStore } from '../../src/dashboard/store'; +import { RemoteMessageHandler } from '../../src/remote-message'; +import { PipelineHookExecutor } from '../../src/pipeline-hooks'; +import type { WebSocket as WSWebSocket } from 'ws'; + +function makeMockBridge(overrides?: Partial): TelephonyBridge { + return { + label: 'TestBridge', + telephonyProvider: 'twilio', + sendAudio: vi.fn(), + sendMark: vi.fn(), + sendClear: vi.fn(), + transferCall: vi.fn().mockResolvedValue(undefined), + endCall: vi.fn().mockResolvedValue(undefined), + createStt: vi.fn().mockReturnValue(null), + queryTelephonyCost: vi.fn().mockResolvedValue(undefined), + ...overrides, + }; +} + +function makeMockWs(): WSWebSocket { + return { + send: vi.fn(), + close: vi.fn(), + on: vi.fn(), + once: vi.fn(), + readyState: 1, + removeListener: vi.fn(), + addEventListener: vi.fn(), + removeEventListener: vi.fn(), + } as unknown as WSWebSocket; +} + +function makeDeps(overrides?: Partial): StreamHandlerDeps { + return { + config: { openaiKey: 'test-oai-key' }, + agent: { systemPrompt: 'Test agent', provider: 'pipeline' }, + bridge: makeMockBridge(), + metricsStore: new MetricsStore(), + pricing: null, + remoteHandler: new RemoteMessageHandler(), + recording: false, + buildAIAdapter: vi.fn().mockReturnValue(null), + sanitizeVariables: vi.fn(() => ({})), + resolveVariables: vi.fn((tpl: string) => tpl), + ...overrides, + } as StreamHandlerDeps; +} + +function makeHandler(deps = makeDeps()): StreamHandler { + return new StreamHandler(deps, makeMockWs(), '+15551111111', '+15552222222'); +} + +describe('[unit] trackOutboundPlayback — cursor math', () => { + it('advances the cursor by chunk duration for PCM16 @ 16 kHz', () => { + const h = makeHandler() as any; + const before = Date.now(); + h.trackOutboundPlayback(3200); // 100 ms at 32 bytes/ms + expect(h.playbackBufferedUntil).toBeGreaterThanOrEqual(before + 95); + expect(h.playbackBufferedUntil).toBeLessThanOrEqual(before + 150); + }); + + it('uses 8 bytes/ms when the TTS emits carrier-native μ-law 8 kHz', () => { + const h = makeHandler() as any; + h.ttsOutputFormatNativeForCarrier = true; // Twilio bridge in makeDeps + const before = Date.now(); + h.trackOutboundPlayback(800); // 100 ms at 8 bytes/ms + expect(h.playbackBufferedUntil).toBeGreaterThanOrEqual(before + 95); + expect(h.playbackBufferedUntil).toBeLessThanOrEqual(before + 150); + }); + + it('keeps 32 bytes/ms for Telnyx native pcm_16000', () => { + const h = makeHandler( + makeDeps({ bridge: makeMockBridge({ telephonyProvider: 'telnyx' }) }), + ) as any; + h.ttsOutputFormatNativeForCarrier = true; + const before = Date.now(); + h.trackOutboundPlayback(3200); // still 100 ms at 32 bytes/ms + expect(h.playbackBufferedUntil).toBeGreaterThanOrEqual(before + 95); + expect(h.playbackBufferedUntil).toBeLessThanOrEqual(before + 150); + }); + + it('accumulates back-to-back chunks', () => { + const h = makeHandler() as any; + const before = Date.now(); + h.trackOutboundPlayback(3200); + h.trackOutboundPlayback(3200); + expect(h.playbackBufferedUntil).toBeGreaterThanOrEqual(before + 195); + expect(h.playbackBufferedUntil).toBeLessThanOrEqual(before + 250); + }); + + it('rebases to now after an idle gap', () => { + const h = makeHandler() as any; + h.playbackBufferedUntil = Date.now() - 10_000; + const before = Date.now(); + h.trackOutboundPlayback(3200); + expect(h.playbackBufferedUntil).toBeGreaterThanOrEqual(before + 95); + expect(h.playbackBufferedUntil).toBeLessThanOrEqual(before + 150); + }); + + it('treats an empty chunk as a no-op', () => { + const h = makeHandler() as any; + h.trackOutboundPlayback(0); + expect(h.playbackBufferedUntil).toBe(0); + }); +}); + +describe('[unit] endSpeakingWithGrace — two-phase wait', () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.stubEnv('PATTER_TTS_TAIL_GRACE_MS', '50'); + }); + afterEach(() => { + vi.useRealTimers(); + vi.unstubAllEnvs(); + }); + + it('holds isSpeaking (NOT tail grace) while the carrier backlog plays', () => { + const h = makeHandler() as any; + h.isSpeaking = true; + h.playbackBufferedUntil = Date.now() + 500; + + h.endSpeakingWithGrace(); + vi.advanceTimersByTime(100); // well inside the backlog window + + expect(h.isSpeaking).toBe(true); + expect(h.tailGraceActive).toBe(false); + h.clearGraceTimer(); + }); + + it('drains the backlog, then tail grace, then flips to idle', () => { + const h = makeHandler() as any; + h.isSpeaking = true; + h.playbackBufferedUntil = Date.now() + 150; + + h.endSpeakingWithGrace(); + vi.advanceTimersByTime(160); // backlog drained + expect(h.isSpeaking).toBe(true); + expect(h.tailGraceActive).toBe(true); + + vi.advanceTimersByTime(60); // grace elapsed + expect(h.isSpeaking).toBe(false); + expect(h.tailGraceActive).toBe(false); + }); + + it('starts tail grace immediately when there is no backlog (legacy path)', () => { + const h = makeHandler() as any; + h.isSpeaking = true; + expect(h.playbackBufferedUntil).toBe(0); + + h.endSpeakingWithGrace(); + expect(h.tailGraceActive).toBe(true); + + vi.advanceTimersByTime(60); + expect(h.isSpeaking).toBe(false); + }); +}); + +describe('[unit] barge-in during the buffered backlog — Hermes/OpenClaw regression', () => { + beforeEach(() => { + vi.stubEnv('PATTER_TTS_TAIL_GRACE_MS', '50'); + }); + afterEach(() => { + vi.unstubAllEnvs(); + }); + + it('a transcript during the backlog runs the FULL cancel path', () => { + const deps = makeDeps(); + const h = makeHandler(deps) as any; + h.isSpeaking = true; + // Turn finished pushing; carrier still has seconds of audio queued. + h.playbackBufferedUntil = Date.now() + 5_000; + h.endSpeakingWithGrace(); + + expect(h.isSpeaking).toBe(true); // backlog holds the floor + expect(h.tailGraceActive).toBe(false); + + const interrupted = h.handleBargeIn({ text: 'aspetta', isFinal: false }); + + expect(interrupted).toBe(true); + expect(deps.bridge.sendClear).toHaveBeenCalledTimes(1); + expect(h.isSpeaking).toBe(false); + expect(h.playbackBufferedUntil).toBe(0); + h.clearGraceTimer(); + }); + + it('cancelSpeaking resets the playback cursor', () => { + const h = makeHandler() as any; + h.isSpeaking = true; + h.playbackBufferedUntil = Date.now() + 5_000; + + h.cancelSpeaking(); + + expect(h.playbackBufferedUntil).toBe(0); + }); + + it('synthesizeSentence advances the cursor for every pushed chunk', async () => { + const deps = makeDeps(); + const h = makeHandler(deps) as any; + h.isSpeaking = true; + h.tts = { + synthesizeStream: async function* () { + yield Buffer.alloc(6400); // 200 ms of PCM16 @ 16 kHz + }, + }; + + const before = Date.now(); + await h.synthesizeSentence( + 'ciao', + new PipelineHookExecutor(undefined), + h.buildHookContext(), + { value: false }, + ); + + expect(deps.bridge.sendAudio).toHaveBeenCalled(); + expect(h.playbackBufferedUntil).toBeGreaterThanOrEqual(before + 195); + expect(h.playbackBufferedUntil).toBeLessThanOrEqual(before + 300); + }); +}); From 24184a05d98e70a8126bd6cdf27113f38967f22a Mon Sep 17 00:00:00 2001 From: nicolotognoni Date: Wed, 10 Jun 2026 10:32:17 +0200 Subject: [PATCH 2/2] feat(pipeline): record only the heard reply prefix on barge-in (LiveKit-style truncation) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two gaps with agent-runtime LLMs (Hermes/OpenClaw), building on the playback cursor from the previous commit: - Mid-turn barge-in: the whole reply was already synthesized into the carrier buffer, so the '[interrupted by caller]' marker was appended to the FULL text — a stateful runtime believed the caller heard everything. - Post-complete barge-in (during the buffered tail): no marker at all — history kept the full reply the caller never finished hearing. Track per-turn (sentence, playback_start) segments at each sentence's first audible chunk (filler and llm_error_message audio advance the clock but add no segment), map heard = total_pushed - carrier_backlog to a sentence-granular prefix, and: (a) the streaming path records ' [interrupted by caller]'; (b) the barge-in cancel paths rewrite the last assistant history entry the same way before clearing the buffer. Legacy full-text marker preserved when no segments were tracked. Full Python/TS parity with mirrored unit tests. --- CHANGELOG.md | 1 + libraries/python/getpatter/stream_handler.py | 163 +++++++++++++- .../unit/test_pipeline_bargein_buffered.py | 198 ++++++++++++++++++ libraries/typescript/src/stream-handler.ts | 137 +++++++++++- .../tests/llm-error-fallback.mocked.test.ts | 2 + .../tests/long-turn-filler.mocked.test.ts | 2 + .../unit/pipeline-bargein-buffered.test.ts | 135 ++++++++++++ 7 files changed, 628 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d5b400..aeee18b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,7 @@ - **Interrupted-turn context rewrite** — on a confirmed mid-turn barge-in the spoken prefix is recorded in history with an `[interrupted by caller]` marker (instead of an ungrounded full reply), so a stateful agent runtime (Hermes/OpenClaw, keyed by `X-Hermes-Session-Id`) sees on the next turn that it was cut off and what the caller actually heard. `libraries/python/getpatter/stream_handler.py`, `libraries/typescript/src/stream-handler.ts`. - **Forward-STT-without-AEC no longer self-interrupts on its own echo.** The remaining live Hermes/OpenClaw barge-in failure: with `PATTER_FORWARD_STT_WHILE_SPEAKING` on, no AEC, and no `barge_in_strategies`, a VAD `speech_start` during TTS cancelled the turn immediately — but on a no-AEC link that `speech_start` is very often the agent's *own* TTS echo (or pre-first-token line noise during a long tool-running turn). The result was a cascade of false-positive interruptions: a short normal reply like "bene bene" produced `agent_text='[interrupted]'` with `bargein_ms≈0`, and the next turn's LLM ran for seconds but emitted `tts_characters=0` because it was torn down before its first token. The echo guard existed only on the *transcript* path, so the raw VAD-energy cancel had no protection. The VAD-energy cancel is now **deferred to transcript confirmation** whenever audio is forwarded during TTS without AEC (`forward_stt_while_speaking && aec is None`), exactly as it already was when `barge_in_strategies` are configured: the `speech_start` marks the barge-in *pending* (the agent keeps talking) and the cancel only fires once `_handle_barge_in` / `handleBargeIn` sees a real transcript that survives the echo guard; if none confirms within `barge_in_confirm_ms` (default 1500 ms) the agent resumes its sentence. The default VAD path and forward-STT *with* AEC keep the responsive immediate cancel — no behaviour change for existing configs. For the cleanest short-echo handling, still pair with `echo_cancellation=True` or `barge_in_strategies`. `libraries/python/getpatter/stream_handler.py`, `libraries/typescript/src/stream-handler.ts`. - **Barge-in now works while the carrier is still PLAYING a long buffered reply — the "Hermes detects the interruption but keeps talking" bug.** The pipeline pushes TTS audio to the carrier as fast as the provider synthesizes it (no pacing) while the carrier buffers and plays at realtime. With a token-paced LLM the two stay roughly in sync, but an agent-runtime LLM (`HermesLLM` / `OpenClawLLM`) delivers its whole — often long — reply at once after the thinking pause: TTS outruns realtime and the carrier ends up holding tens of seconds of queued audio. The handler's speaking state ended a fixed `PATTER_TTS_TAIL_GRACE_MS` (1.5 s) after the last *push*, not the last *playback* — so for most of the audible reply `_is_speaking` was already false, every VAD `speech_start` / transcript was treated as a calm next turn instead of a barge-in, `send_clear` was never sent, and the buffered audio kept playing over the caller (with the next turn's reply queued behind it). The handler now tracks an **estimated playback cursor** (`_playback_buffered_until` / `playbackBufferedUntil`, advanced per pushed chunk at the chunk's real byte rate — PCM16@16kHz or carrier-native μ-law@8kHz) and `_end_speaking_with_grace` waits in two phases: phase 1 keeps `_is_speaking=true` with `_tail_grace_active=false` for the whole estimated backlog (barge-in stays armed and takes the full cancel + `send_clear` path, which drops the carrier buffer instantly); phase 2 is the unchanged echo-tail grace. Barge-in cancels reset the cursor (the buffer was just cleared). No new config; token-paced LLMs (no backlog) behave byte-identically to before, and `PATTER_TTS_TAIL_GRACE_MS=0` still forces the legacy synchronous flip. This is the industry-standard semantics (stop + flush client-side regardless of LLM state — cf. Twilio media-stream `clear`). `libraries/python/getpatter/stream_handler.py`, `libraries/typescript/src/stream-handler.ts`. +- **Interrupted-turn history now records the reply prefix the caller actually HEARD (LiveKit-style truncation), not everything the LLM generated.** Builds on the playback cursor above. Two gaps closed: (a) on a **mid-turn** barge-in with an agent-runtime LLM, the whole reply had already been synthesized into the carrier buffer, so the `[interrupted by caller]` marker was appended to the FULL text — a stateful runtime (Hermes/OpenClaw) believed the caller heard everything; (b) on a barge-in landing **after the turn completed** (while the carrier still played the buffered tail) no marker was applied at all. The handler now tracks per-turn `(sentence, playback_start)` segments (`_turn_spoken_segments` / `turnSpokenSegments`; filler and `llm_error_message` audio advance the clock but add no segment) and maps `heard = total_pushed − carrier_backlog` to a sentence-granular prefix: the streaming path records ` [interrupted by caller]`, and the post-complete cancel paths rewrite the last assistant history entry the same way before clearing the buffer. No new config; with no tracked segments (e.g. no TTS) the legacy full-text marker is preserved. `libraries/python/getpatter/stream_handler.py`, `libraries/typescript/src/stream-handler.ts`. - **(Python) Twilio/Plivo mark frames now carry the caller-supplied name — first-message pacing no longer burns the mark-await timeout on every call.** `TwilioAudioSender.send_mark` (and the Plivo checkpoint equivalent) discarded the `mark_name` argument and sent a locally generated `audio_N` instead, so the `fm_N` echo the first-message pacer waited for never matched and every mark resolved via the 0.5 s fallback timeout (~1.5 s of guaranteed extra latency in the barge-in window of every Twilio call). The wire name is now the caller's, matching the TypeScript behaviour. `libraries/python/getpatter/telephony/twilio.py`, `.../telephony/plivo.py`. - **(TypeScript) Inbound audio frames are now awaited — a transient audio-path error can no longer kill the whole server.** All three carrier WS message handlers called `handler.handleAudio(...)` without `await`, so a rejection inside the audio path (VAD, resampler, STT send) escaped the surrounding `try/catch` and became an unhandled rejection, which terminates the Node process (Node 15+) together with every active call. `libraries/typescript/src/server.ts`. - **(TypeScript) Telnyx calls no longer leak `activeCallIds` entries.** The Telnyx WS close handler was the only one of the three carriers that never deleted its `ws → call_control_id` map entry, so the map grew for the server's lifetime and graceful shutdown issued hangup REST calls for long-dead calls. `libraries/typescript/src/server.ts`. diff --git a/libraries/python/getpatter/stream_handler.py b/libraries/python/getpatter/stream_handler.py index 68fa4b5..b510808 100644 --- a/libraries/python/getpatter/stream_handler.py +++ b/libraries/python/getpatter/stream_handler.py @@ -2490,6 +2490,19 @@ def __init__( # ``_end_tail_grace_for_new_turn``. Mirrors TS # ``playbackBufferedUntil``. self._playback_buffered_until: float = 0.0 + # Per-turn playback timeline used to estimate the response prefix the + # caller actually HEARD when a barge-in lands. ``_turn_playback_total_s`` + # accumulates the playout duration of every chunk pushed this turn + # (including filler audio, which keeps the timeline aligned); + # ``_turn_spoken_segments`` records ``(sentence_text, + # cumulative_start_s)`` for each RESPONSE sentence at its first audible + # chunk (filler / error-fallback audio advances the clock but adds no + # segment). ``heard = total - remaining_backlog`` then maps to a + # sentence-granular prefix — see ``_heard_response_prefix``. Both reset + # at ``_begin_speaking``. Mirrors TS ``turnPlaybackTotalMs`` / + # ``turnSpokenSegments``. + self._turn_playback_total_s: float = 0.0 + self._turn_spoken_segments: list[tuple[str, float]] = [] # Optional barge-in confirmation strategies (see # ``getpatter.services.barge_in_strategies``). With an empty tuple # the SDK uses the legacy "cancel on first VAD speech_start" @@ -3140,8 +3153,14 @@ async def _synthesize_sentence( hook_executor: PipelineHookExecutor, hook_ctx: HookContext, first_tts_chunk: list, + record_segment: bool = True, ) -> bool: - """Synthesize a single sentence through TTS with hooks. Returns False if interrupted.""" + """Synthesize a single sentence through TTS with hooks. Returns False if interrupted. + + ``record_segment=False`` (filler / error-fallback audio) advances the + playback clock without adding a heard-prefix segment — that audio is + not part of the LLM's reply. See ``_heard_response_prefix``. + """ if self._tts is None: return True @@ -3217,6 +3236,18 @@ async def _synthesize_sentence( # very fast carrier echo is still seen by the next mic frame. if self._aec is not None: self._aec.push_far_end(processed_audio) + if record_segment: + # First audible chunk of this sentence — stamp its start + # on the per-turn playback timeline so a barge-in can + # estimate the heard prefix at sentence granularity. + # ``getattr`` is defensive against test fixtures built + # via ``object.__new__`` (no ``__init__``). + segments = getattr(self, "_turn_spoken_segments", None) + if segments is not None: + segments.append( + (processed, getattr(self, "_turn_playback_total_s", 0.0)) + ) + record_segment = False await self.audio_sender.send_audio(processed_audio) self._track_outbound_playback(len(processed_audio)) self._mark_first_audio_sent() @@ -3260,8 +3291,14 @@ async def _filler() -> None: # turn and we still hold the floor (no concurrent barge-in). if first_tts_chunk[0] and self._is_speaking: try: + # Filler audio is not part of the LLM's reply — advance + # the playback clock without a heard-prefix segment. await self._synthesize_sentence( - message, hook_executor, hook_ctx, first_tts_chunk + message, + hook_executor, + hook_ctx, + first_tts_chunk, + record_segment=False, ) except asyncio.CancelledError: raise @@ -3429,8 +3466,14 @@ async def _process_streaming_response(self, result, call_id: str) -> str: fallback = getattr(self.agent, "llm_error_message", None) if fallback and first_tts_chunk[0] and self._is_speaking: try: + # Error-fallback audio is not part of the LLM's reply + # — no heard-prefix segment. await self._synthesize_sentence( - fallback, hook_executor, hook_ctx, first_tts_chunk + fallback, + hook_executor, + hook_ctx, + first_tts_chunk, + record_segment=False, ) except Exception: # pragma: no cover - defensive logger.exception("llm_error_message fallback synthesis failed") @@ -3500,7 +3543,23 @@ async def _process_streaming_response(self, result, call_id: str) -> str: # ungrounded full reply that pollutes its context. self._last_response_interrupted = interrupted if interrupted and response_text: - response_text = f"{response_text} [interrupted by caller]" + # Truncate to what the caller actually HEARD, not everything the + # LLM generated. An agent-runtime LLM delivers the full reply at + # once, so by barge-in time ``full_response_parts`` can hold tens + # of seconds of text the caller never listened to — recording it + # would make a stateful runtime believe it was all said. Falls + # back to the legacy full-text marker when no playback segments + # were tracked (e.g. no TTS configured). + heard = self._heard_response_prefix() + if heard is not None: + heard_text, _heard_everything = heard + response_text = ( + f"{heard_text} [interrupted by caller]" + if heard_text + else "[interrupted by caller]" + ) + else: + response_text = f"{response_text} [interrupted by caller]" return response_text async def _process_regular_response(self, response_text: str, call_id: str) -> None: @@ -3653,6 +3712,11 @@ async def _do_cancel_for_barge_in(self, transcript_text: str) -> None: self._speaking_started_at = None self._first_audio_sent_at = None self._last_cancel_at = time.time() + # A barge-in landing AFTER the turn completed (carrier still + # draining the buffered tail) — rewrite the history to the heard + # prefix FIRST, while the playback cursor still measures what + # was left unheard. + self._maybe_truncate_completed_turn_history() # The ``send_clear`` below drops whatever the carrier had # buffered ahead — snap the playback cursor back and kill any # pending grace task so its phase-1 wait (carrier backlog) / @@ -4267,6 +4331,10 @@ async def on_audio_received(self, audio_bytes: bytes) -> None: self._speaking_generation += 1 self._last_cancel_at = time.time() self._suppressed_speech_pending = False + # Post-complete barge-in during the buffered + # tail — rewrite history to the heard prefix + # BEFORE resetting the playback cursor. + self._maybe_truncate_completed_turn_history() # ``send_clear`` above dropped the carrier's # buffered audio — reset the playback cursor. self._playback_buffered_until = 0.0 @@ -4433,6 +4501,9 @@ async def _begin_speaking(self, is_first_message: bool = False) -> None: # Fresh turn — reset the echo-guard reference so this turn's barge-in # checks compare against THIS turn's spoken text, not the last turn's. self._current_agent_spoken_text = "" + # Fresh turn — reset the heard-prefix playback timeline. + self._turn_playback_total_s = 0.0 + self._turn_spoken_segments = [] # Reset the VAD detector so the next user utterance triggers a clean # SILENCE→SPEECH transition. Without this, PSTN echo from the # previous turn can keep the smoothed probability above the @@ -4474,9 +4545,91 @@ def _track_outbound_playback(self, num_bytes: int) -> None: else 32_000.0 ) now = time.time() + chunk_s = num_bytes / bytes_per_s buffered_until = getattr(self, "_playback_buffered_until", 0.0) base = buffered_until if buffered_until > now else now - self._playback_buffered_until = base + (num_bytes / bytes_per_s) + self._playback_buffered_until = base + chunk_s + # Per-turn playout total — the time axis for the heard-prefix + # estimate (see ``_heard_response_prefix``). Reset at + # ``_begin_speaking``. + self._turn_playback_total_s = ( + getattr(self, "_turn_playback_total_s", 0.0) + chunk_s + ) + + def _heard_response_prefix(self) -> tuple[str, bool] | None: + """Estimate the response prefix the caller actually HEARD this turn. + + The pipeline pushes audio faster than realtime, so at barge-in time + ``heard = total_pushed - carrier_backlog`` seconds of audio have + actually played. Mapped at sentence granularity against + ``_turn_spoken_segments``: a sentence counts as heard once its + playback has STARTED (``start <= heard``), so the sentence playing at + the moment of interruption is included. + + Returns ``None`` when no segments were tracked this turn (nothing + synthesized through the tracked path — callers fall back to the + legacy full-text behaviour). Otherwise ``(heard_text, + heard_everything)``. Mirrors TS ``heardResponsePrefix``. + """ + segments = getattr(self, "_turn_spoken_segments", None) + if not segments: + return None + total_s = getattr(self, "_turn_playback_total_s", 0.0) + remaining_s = max( + 0.0, getattr(self, "_playback_buffered_until", 0.0) - time.time() + ) + heard_s = max(0.0, total_s - remaining_s) + heard = [text for text, start_s in segments if start_s <= heard_s] + return " ".join(heard), len(heard) == len(segments) + + def _rewrite_last_assistant_entry(self, text: str) -> None: + """Replace the text of the most recent assistant entry in the + conversation history and the dashboard transcript. No-op when the + last entry is not an assistant turn (e.g. the caller's next turn was + already committed).""" + for entries in ( + getattr(self, "conversation_history", None), + getattr(self, "transcript_entries", None), + ): + if not entries: + continue + last = entries[-1] + if isinstance(last, dict) and last.get("role") == "assistant": + last["text"] = text + + def _maybe_truncate_completed_turn_history(self) -> None: + """LiveKit-style "heard prefix" semantics for a barge-in that lands + AFTER the turn completed, while the carrier is still playing the + buffered tail. + + The completed turn already recorded its FULL reply in history, but + the caller only heard part of it before interrupting — a stateful + agent runtime (Hermes / OpenClaw) would otherwise "remember saying" + things the caller never heard. Rewrites the last assistant entry to + the heard prefix + ``[interrupted by caller]``. + + MUST run BEFORE the cancel path resets ``_playback_buffered_until`` + (the backlog is the heard-prefix input). No-op when a turn is still + in flight (the streaming path applies its own marker), when there is + no backlog, or when everything was already heard. + """ + dispatch = getattr(self, "_dispatch_task", None) + if dispatch is not None and not dispatch.done(): + return + remaining_s = getattr(self, "_playback_buffered_until", 0.0) - time.time() + if remaining_s <= 0: + return + heard = self._heard_response_prefix() + if heard is None: + return + heard_text, heard_everything = heard + if heard_everything: + return + self._rewrite_last_assistant_entry( + f"{heard_text} [interrupted by caller]" + if heard_text + else "[interrupted by caller]" + ) def _can_barge_in(self) -> bool: """Whether barge-in is allowed to fire right now. diff --git a/libraries/python/tests/unit/test_pipeline_bargein_buffered.py b/libraries/python/tests/unit/test_pipeline_bargein_buffered.py index 0a56c87..0f7c480 100644 --- a/libraries/python/tests/unit/test_pipeline_bargein_buffered.py +++ b/libraries/python/tests/unit/test_pipeline_bargein_buffered.py @@ -201,6 +201,58 @@ async def test_cancel_resets_cursor_and_grace_task(self) -> None: assert handler._playback_buffered_until == 0.0 assert handler._grace_task is None + async def test_synthesize_sentence_records_heard_prefix_segment(self) -> None: + """Each response sentence is stamped on the per-turn playback + timeline at its first audible chunk.""" + + class _StubTTS: + async def synthesize(self, _text: str): + yield b"\x00" * 6400 # 200 ms of PCM16 @ 16 kHz + + handler = _make_handler(_make_audio_sender()) + handler._tts = _StubTTS() + handler._is_speaking = True + + from getpatter.services.pipeline_hooks import PipelineHookExecutor + + ctx = handler._build_hook_context() + await handler._synthesize_sentence( + "Frase uno.", PipelineHookExecutor(None), ctx, [True] + ) + await handler._synthesize_sentence( + "Frase due.", PipelineHookExecutor(None), ctx, [False] + ) + + assert handler._turn_spoken_segments == [ + ("Frase uno.", 0.0), + ("Frase due.", pytest.approx(0.2)), + ] + + async def test_filler_audio_advances_clock_without_segment(self) -> None: + """record_segment=False (filler / error fallback) advances the + playback clock but adds no heard-prefix segment.""" + + class _StubTTS: + async def synthesize(self, _text: str): + yield b"\x00" * 6400 + + handler = _make_handler(_make_audio_sender()) + handler._tts = _StubTTS() + handler._is_speaking = True + + from getpatter.services.pipeline_hooks import PipelineHookExecutor + + await handler._synthesize_sentence( + "One moment.", + PipelineHookExecutor(None), + handler._build_hook_context(), + [True], + record_segment=False, + ) + + assert handler._turn_spoken_segments == [] + assert handler._turn_playback_total_s == pytest.approx(0.2) + async def test_synthesize_sentence_tracks_pushed_audio(self) -> None: """The pipeline TTS path must advance the playback cursor for every chunk it pushes to the carrier.""" @@ -222,3 +274,149 @@ async def synthesize(self, _text: str): assert ok is True assert handler._playback_buffered_until == pytest.approx(before + 0.2, abs=0.1) + + +# --------------------------------------------------------------------------- +# Heard-prefix estimation — what did the caller actually listen to? +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestHeardResponsePrefix: + def test_maps_backlog_to_sentence_prefix(self) -> None: + handler = _make_handler(_make_audio_sender()) + handler._turn_spoken_segments = [ + ("Frase uno.", 0.0), + ("Frase due.", 2.0), + ("Frase tre.", 4.0), + ] + handler._turn_playback_total_s = 6.0 + # 4 s still buffered → only the first 2 s actually played. + handler._playback_buffered_until = time.time() + 4.0 + + text, heard_everything = handler._heard_response_prefix() + + assert text == "Frase uno. Frase due." + assert heard_everything is False + + def test_no_segments_returns_none(self) -> None: + handler = _make_handler(_make_audio_sender()) + assert handler._heard_response_prefix() is None + + def test_drained_backlog_means_everything_heard(self) -> None: + handler = _make_handler(_make_audio_sender()) + handler._turn_spoken_segments = [("Frase uno.", 0.0), ("Frase due.", 2.0)] + handler._turn_playback_total_s = 4.0 + handler._playback_buffered_until = 0.0 # long drained + + text, heard_everything = handler._heard_response_prefix() + + assert text == "Frase uno. Frase due." + assert heard_everything is True + + +# --------------------------------------------------------------------------- +# Post-complete barge-in — rewrite history to the heard prefix +# --------------------------------------------------------------------------- + + +def _completed_turn_handler(full_text: str) -> PipelineStreamHandler: + """Handler in the post-complete state: reply recorded in history, carrier + still playing the buffered tail.""" + handler = _make_handler(_make_audio_sender()) + handler._is_speaking = True + handler.conversation_history.append( + {"role": "assistant", "text": full_text, "timestamp": time.time()} + ) + handler.transcript_entries.append({"role": "assistant", "text": full_text}) + handler._turn_spoken_segments = [ + ("Frase uno.", 0.0), + ("Frase due.", 2.0), + ("Frase tre.", 4.0), + ] + handler._turn_playback_total_s = 6.0 + handler._playback_buffered_until = time.time() + 4.0 + return handler + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestPostCompleteHeardPrefixRewrite: + async def test_bargein_during_tail_rewrites_history(self) -> None: + """A barge-in after turn-complete must truncate the recorded reply to + the heard prefix so a stateful runtime doesn't 'remember saying' + sentences the caller never heard.""" + full = "Frase uno. Frase due. Frase tre." + handler = _completed_turn_handler(full) + + await handler._do_cancel_for_barge_in("aspetta") + + expected = "Frase uno. Frase due. [interrupted by caller]" + assert handler.conversation_history[-1]["text"] == expected + assert handler.transcript_entries[-1]["text"] == expected + + async def test_no_backlog_no_rewrite(self) -> None: + full = "Frase uno. Frase due. Frase tre." + handler = _completed_turn_handler(full) + handler._playback_buffered_until = 0.0 # everything already played + + await handler._do_cancel_for_barge_in("ok") + + assert handler.conversation_history[-1]["text"] == full + + async def test_in_flight_turn_is_not_rewritten(self) -> None: + """While a turn is still in flight the streaming path owns the + marker — the post-complete rewrite must not double-apply.""" + full = "Frase uno. Frase due. Frase tre." + handler = _completed_turn_handler(full) + handler._dispatch_task = asyncio.create_task(asyncio.sleep(0.5)) + + try: + await handler._do_cancel_for_barge_in("aspetta") + assert handler.conversation_history[-1]["text"] == full + finally: + handler._dispatch_task.cancel() + try: + await handler._dispatch_task + except asyncio.CancelledError: + pass + + +# --------------------------------------------------------------------------- +# Mid-turn barge-in — the marker records only the heard prefix +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestMidTurnHeardPrefixMarker: + async def test_interrupted_response_truncates_to_heard_prefix( + self, monkeypatch + ) -> None: + """An agent-runtime LLM delivers the full reply at once: every + sentence is synthesized into the carrier buffer within ms, but the + caller has only HEARD the first one when the barge-in lands. The + history marker must record that prefix, not the whole reply.""" + monkeypatch.setenv("PATTER_TTS_TAIL_GRACE_MS", "0") + handler = _make_handler(_make_audio_sender()) + + class _BigChunkTTS: + output_format = "pcm_16000" + + async def synthesize(self, _text: str): + yield b"\x00" * 64000 # 2 s of PCM16 @ 16 kHz per sentence + + handler._tts = _BigChunkTTS() + + async def _result(): + yield "Frase uno. " + yield "Frase due. " + # Barge-in lands after both sentences were PUSHED (4 s buffered) + # but before any further token. + handler._llm_cancel_event.set() + yield "Frase tre." + + text = await handler._process_streaming_response(_result(), "call-heard") + + assert handler._last_response_interrupted is True + assert text == "Frase uno. [interrupted by caller]" diff --git a/libraries/typescript/src/stream-handler.ts b/libraries/typescript/src/stream-handler.ts index c556e3a..2526f1c 100644 --- a/libraries/typescript/src/stream-handler.ts +++ b/libraries/typescript/src/stream-handler.ts @@ -569,6 +569,20 @@ export class StreamHandler { * cleared) and ``endTailGraceForNewTurn``. */ private playbackBufferedUntil = 0; + /** + * Per-turn playback timeline used to estimate the response prefix the + * caller actually HEARD when a barge-in lands. ``turnPlaybackTotalMs`` + * accumulates the playout duration of every chunk pushed this turn + * (including filler audio, which keeps the timeline aligned); + * ``turnSpokenSegments`` records ``{text, startMs}`` for each RESPONSE + * sentence at its first audible chunk (filler / error-fallback audio + * advances the clock but adds no segment). ``heard = total - backlog`` + * then maps to a sentence-granular prefix — see ``heardResponsePrefix``. + * Both reset at ``beginSpeaking``. Mirrors Python + * ``_turn_playback_total_s`` / ``_turn_spoken_segments``. + */ + private turnPlaybackTotalMs = 0; + private turnSpokenSegments: Array<{ readonly text: string; readonly startMs: number }> = []; /** * Optional barge-in confirmation strategies. With an empty array the * SDK falls back to the legacy "cancel on first VAD speech_start" @@ -719,6 +733,9 @@ export class StreamHandler { // Fresh turn — reset the echo-guard reference so barge-in checks compare // against THIS turn's spoken text, not the last turn's. this.currentAgentSpokenText = ''; + // Fresh turn — reset the heard-prefix playback timeline. + this.turnPlaybackTotalMs = 0; + this.turnSpokenSegments = []; // Reset the VAD detector so the next user utterance triggers a clean // SILENCE→SPEECH transition. Without this, PSTN echo from the previous // turn can keep the detector's smoothed probability above the @@ -761,9 +778,79 @@ export class StreamHandler { ? 8 : 32; const now = Date.now(); + const chunkMs = numBytes / bytesPerMs; const base = this.playbackBufferedUntil > now ? this.playbackBufferedUntil : now; - this.playbackBufferedUntil = base + numBytes / bytesPerMs; + this.playbackBufferedUntil = base + chunkMs; + // Per-turn playout total — the time axis for the heard-prefix estimate + // (see ``heardResponsePrefix``). Reset at ``beginSpeaking``. + this.turnPlaybackTotalMs += chunkMs; + } + + /** + * Estimate the response prefix the caller actually HEARD this turn. + * + * The pipeline pushes audio faster than realtime, so at barge-in time + * ``heard = totalPushed - carrierBacklog`` ms of audio have actually + * played. Mapped at sentence granularity against ``turnSpokenSegments``: + * a sentence counts as heard once its playback has STARTED + * (``startMs <= heardMs``), so the sentence playing at the moment of + * interruption is included. + * + * Returns ``null`` when no segments were tracked this turn (nothing + * synthesized through the tracked path — callers fall back to the legacy + * full-text behaviour). Mirrors Python ``_heard_response_prefix``. + */ + private heardResponsePrefix(): { text: string; heardEverything: boolean } | null { + if (this.turnSpokenSegments.length === 0) return null; + const remainingMs = Math.max(0, this.playbackBufferedUntil - Date.now()); + const heardMs = Math.max(0, this.turnPlaybackTotalMs - remainingMs); + const heard = this.turnSpokenSegments.filter((s) => s.startMs <= heardMs); + return { + text: heard.map((s) => s.text).join(' '), + heardEverything: heard.length === this.turnSpokenSegments.length, + }; + } + + /** + * Replace the text of the most recent assistant entry in the conversation + * history. No-op when the last entry is not an assistant turn (e.g. the + * caller's next turn was already committed). + */ + private rewriteLastAssistantEntry(text: string): void { + const entries = this.history.entries; + const last = entries[entries.length - 1]; + if (last && last.role === 'assistant') { + entries[entries.length - 1] = { ...last, text }; + } + } + + /** + * LiveKit-style "heard prefix" semantics for a barge-in that lands AFTER + * the turn completed, while the carrier is still playing the buffered + * tail. + * + * The completed turn already recorded its FULL reply in history, but the + * caller only heard part of it before interrupting — a stateful agent + * runtime (Hermes / OpenClaw) would otherwise "remember saying" things + * the caller never heard. Rewrites the last assistant entry to the heard + * prefix + ``[interrupted by caller]``. + * + * MUST run BEFORE ``cancelSpeaking`` resets ``playbackBufferedUntil`` + * (the backlog is the heard-prefix input). No-op when a turn is still in + * flight (the streaming path applies its own marker), when there is no + * backlog, or when everything was already heard. Mirrors Python + * ``_maybe_truncate_completed_turn_history``. + */ + private maybeTruncateCompletedTurnHistory(): void { + if (this.dispatchTask !== null) return; // turn still in flight + const remainingMs = this.playbackBufferedUntil - Date.now(); + if (remainingMs <= 0) return; + const heard = this.heardResponsePrefix(); + if (heard === null || heard.heardEverything) return; + this.rewriteLastAssistantEntry( + heard.text ? `${heard.text} [interrupted by caller]` : '[interrupted by caller]', + ); } /** @@ -1718,6 +1805,10 @@ export class StreamHandler { this.metricsAcc.recordBargeinDetected(); const bargeinSpan = startSpan(SPAN_BARGEIN, { 'patter.call.id': this.callId }); try { + // Post-complete barge-in during the buffered tail — rewrite + // history to the heard prefix BEFORE cancelSpeaking resets + // the playback cursor. + this.maybeTruncateCompletedTurnHistory(); this.cancelSpeaking(); try { this.deps.bridge.sendClear(this.ws, this.streamSid); @@ -2558,7 +2649,11 @@ export class StreamHandler { hookExecutor: PipelineHookExecutor, hookCtx: HookContext, ttsFirstByteSent: { value: boolean }, + recordSegment = true, ): Promise { + // ``recordSegment=false`` (filler / error-fallback audio) advances the + // playback clock without adding a heard-prefix segment — that audio is + // not part of the LLM's reply. See ``heardResponsePrefix``. if (!this.tts || !this.isSpeaking) return; // Apply text transforms before the beforeSynthesize hook @@ -2604,6 +2699,16 @@ export class StreamHandler { if (this.aec) { this.aec.pushFarEnd(processedAudio); } + if (recordSegment) { + // First audible chunk of this sentence — stamp its start on the + // per-turn playback timeline so a barge-in can estimate the heard + // prefix at sentence granularity. + this.turnSpokenSegments.push({ + text: processedText, + startMs: this.turnPlaybackTotalMs, + }); + recordSegment = false; + } const encoded = this.encodePipelineAudio(processedAudio); this.deps.bridge.sendAudio(this.ws, encoded, this.streamSid); this.trackOutboundPlayback(processedAudio.length); @@ -2835,9 +2940,23 @@ export class StreamHandler { // Marker goes to the history/transcript ONLY (so a stateful agent // runtime sees it was interrupted); metrics use the PLAIN text and are // gated on !interrupted — mirrors Python. - const spokenText = interrupted - ? `${responseText} [interrupted by caller]` - : responseText; + let spokenText = responseText; + if (interrupted) { + // Truncate to what the caller actually HEARD, not everything the + // LLM generated — an agent-runtime LLM delivers the full reply at + // once, so by barge-in time ``responseText`` can hold tens of + // seconds of text the caller never listened to. Falls back to the + // legacy full-text marker when no playback segments were tracked + // (e.g. no TTS configured). Mirrors Python + // ``_process_streaming_response``. + const heard = this.heardResponsePrefix(); + spokenText = + heard === null + ? `${responseText} [interrupted by caller]` + : heard.text + ? `${heard.text} [interrupted by caller]` + : '[interrupted by caller]'; + } await this.emitAssistantTranscript(spokenText); if (!interrupted) this.metricsAcc.recordTtsComplete(responseText); } else { @@ -2998,6 +3117,9 @@ export class StreamHandler { this.metricsAcc.recordBargeinDetected(); const bargeinSpan = startSpan(SPAN_BARGEIN, { 'patter.call.id': this.callId }); try { + // Post-complete barge-in during the buffered tail — rewrite history to + // the heard prefix BEFORE cancelSpeaking resets the playback cursor. + this.maybeTruncateCompletedTurnHistory(); this.cancelSpeaking(); try { this.deps.bridge.sendClear(this.ws, this.streamSid); @@ -3139,11 +3261,14 @@ export class StreamHandler { if (cancelled || ttsFirstByteSent.value || !this.isSpeaking) return; // Track the in-flight synthesis so clear() can await it — serializing the // filler before the real sentence so their audio can never interleave. + // Filler audio is not part of the LLM's reply — advance the playback + // clock without a heard-prefix segment (recordSegment=false). inFlight = this.synthesizeSentence( message, hookExecutor, hookCtx, ttsFirstByteSent, + false, ).catch((err) => { getLogger().error( `longTurnMessage filler synthesis failed (${label}):`, @@ -3291,7 +3416,9 @@ export class StreamHandler { const fallback = this.deps.agent.llmErrorMessage; if (fallback && !ttsFirstByteSent.value && this.isSpeaking) { try { - await this.synthesizeSentence(fallback, hookExecutor, hookCtx, ttsFirstByteSent); + // Error-fallback audio is not part of the LLM's reply — no + // heard-prefix segment (recordSegment=false). + await this.synthesizeSentence(fallback, hookExecutor, hookCtx, ttsFirstByteSent, false); } catch (err) { getLogger().error(`llmErrorMessage fallback synthesis failed (${label}):`, err); } diff --git a/libraries/typescript/tests/llm-error-fallback.mocked.test.ts b/libraries/typescript/tests/llm-error-fallback.mocked.test.ts index 80c7c45..dcb5915 100644 --- a/libraries/typescript/tests/llm-error-fallback.mocked.test.ts +++ b/libraries/typescript/tests/llm-error-fallback.mocked.test.ts @@ -477,11 +477,13 @@ describe('[mocked] pipeline LLM-error spoken fallback (llmErrorMessage)', () => await new Promise((r) => setTimeout(r, 300)); + // Error-fallback audio is not part of the LLM reply — recordSegment=false. expect(synthSpy).toHaveBeenCalledWith( FALLBACK, expect.anything(), expect.anything(), expect.anything(), + false, ); expect(bridge.sendAudio as ReturnType).not.toHaveBeenCalled(); }, 10000); diff --git a/libraries/typescript/tests/long-turn-filler.mocked.test.ts b/libraries/typescript/tests/long-turn-filler.mocked.test.ts index bca84d6..bf2a3d1 100644 --- a/libraries/typescript/tests/long-turn-filler.mocked.test.ts +++ b/libraries/typescript/tests/long-turn-filler.mocked.test.ts @@ -331,11 +331,13 @@ describe('[mocked] pipeline long-turn filler (longTurnMessage)', () => { await stt.emitTranscript('What is the weather?'); // The real reply still plays; the broken filler degraded to silence. + // Filler audio is not part of the LLM reply — recordSegment=false. await vi.waitFor(() => expect(synthSpy).toHaveBeenCalledWith( FILLER, expect.anything(), expect.anything(), expect.anything(), + false, ), { timeout: 5000 }); await vi.waitFor(() => expect(synthSpy).toHaveBeenCalledWith( 'Here is your answer.', diff --git a/libraries/typescript/tests/unit/pipeline-bargein-buffered.test.ts b/libraries/typescript/tests/unit/pipeline-bargein-buffered.test.ts index c4ccd97..b09aecc 100644 --- a/libraries/typescript/tests/unit/pipeline-bargein-buffered.test.ts +++ b/libraries/typescript/tests/unit/pipeline-bargein-buffered.test.ts @@ -237,3 +237,138 @@ describe('[unit] barge-in during the buffered backlog — Hermes/OpenClaw regres expect(h.playbackBufferedUntil).toBeLessThanOrEqual(before + 300); }); }); + +describe('[unit] heardResponsePrefix — what did the caller actually listen to?', () => { + it('maps the backlog to a sentence-granular heard prefix', () => { + const h = makeHandler() as any; + h.turnSpokenSegments = [ + { text: 'Frase uno.', startMs: 0 }, + { text: 'Frase due.', startMs: 2000 }, + { text: 'Frase tre.', startMs: 4000 }, + ]; + h.turnPlaybackTotalMs = 6000; + // 4 s still buffered → only the first 2 s actually played. + h.playbackBufferedUntil = Date.now() + 4000; + + const heard = h.heardResponsePrefix(); + + expect(heard.text).toBe('Frase uno. Frase due.'); + expect(heard.heardEverything).toBe(false); + }); + + it('returns null when no segments were tracked', () => { + const h = makeHandler() as any; + expect(h.heardResponsePrefix()).toBeNull(); + }); + + it('reports everything heard once the backlog drained', () => { + const h = makeHandler() as any; + h.turnSpokenSegments = [ + { text: 'Frase uno.', startMs: 0 }, + { text: 'Frase due.', startMs: 2000 }, + ]; + h.turnPlaybackTotalMs = 4000; + h.playbackBufferedUntil = 0; // long drained + + const heard = h.heardResponsePrefix(); + + expect(heard.text).toBe('Frase uno. Frase due.'); + expect(heard.heardEverything).toBe(true); + }); + + it('synthesizeSentence records a heard-prefix segment per sentence', async () => { + const deps = makeDeps(); + const h = makeHandler(deps) as any; + h.isSpeaking = true; + h.tts = { + synthesizeStream: async function* () { + yield Buffer.alloc(6400); // 200 ms of PCM16 @ 16 kHz + }, + }; + + const hookExecutor = new PipelineHookExecutor(undefined); + await h.synthesizeSentence('Frase uno.', hookExecutor, h.buildHookContext(), { + value: false, + }); + await h.synthesizeSentence('Frase due.', hookExecutor, h.buildHookContext(), { + value: true, + }); + + expect(h.turnSpokenSegments).toEqual([ + { text: 'Frase uno.', startMs: 0 }, + { text: 'Frase due.', startMs: 200 }, + ]); + }); + + it('filler audio advances the clock without adding a segment', async () => { + const deps = makeDeps(); + const h = makeHandler(deps) as any; + h.isSpeaking = true; + h.tts = { + synthesizeStream: async function* () { + yield Buffer.alloc(6400); + }, + }; + + await h.synthesizeSentence( + 'One moment.', + new PipelineHookExecutor(undefined), + h.buildHookContext(), + { value: false }, + false, // recordSegment=false — filler / error fallback + ); + + expect(h.turnSpokenSegments).toEqual([]); + expect(h.turnPlaybackTotalMs).toBe(200); + }); +}); + +describe('[unit] post-complete barge-in — history rewritten to the heard prefix', () => { + const FULL = 'Frase uno. Frase due. Frase tre.'; + + function completedTurnHandler(): { h: any; deps: StreamHandlerDeps } { + const deps = makeDeps(); + const h = makeHandler(deps) as any; + h.isSpeaking = true; + h.history.push({ role: 'assistant', text: FULL, timestamp: Date.now() }); + h.turnSpokenSegments = [ + { text: 'Frase uno.', startMs: 0 }, + { text: 'Frase due.', startMs: 2000 }, + { text: 'Frase tre.', startMs: 4000 }, + ]; + h.turnPlaybackTotalMs = 6000; + h.playbackBufferedUntil = Date.now() + 4000; + return { h, deps }; + } + + it('a barge-in during the buffered tail truncates the recorded reply', () => { + const { h, deps } = completedTurnHandler(); + + h.runBargeInCancel('aspetta'); + + const last = h.history.entries[h.history.entries.length - 1]; + expect(last.text).toBe('Frase uno. Frase due. [interrupted by caller]'); + expect(deps.bridge.sendClear).toHaveBeenCalledTimes(1); + }); + + it('no backlog → no rewrite', () => { + const { h } = completedTurnHandler(); + h.playbackBufferedUntil = 0; // everything already played + + h.runBargeInCancel('ok'); + + const last = h.history.entries[h.history.entries.length - 1]; + expect(last.text).toBe(FULL); + }); + + it('a turn still in flight is owned by the streaming marker — no rewrite', () => { + const { h } = completedTurnHandler(); + h.dispatchTask = new Promise(() => {}); // in flight, never settles + + h.runBargeInCancel('aspetta'); + + const last = h.history.entries[h.history.entries.length - 1]; + expect(last.text).toBe(FULL); + h.dispatchTask = null; + }); +});