diff --git a/packages/keiko-model-gateway/src/index.ts b/packages/keiko-model-gateway/src/index.ts index 06219546..6a21fbf4 100644 --- a/packages/keiko-model-gateway/src/index.ts +++ b/packages/keiko-model-gateway/src/index.ts @@ -129,10 +129,13 @@ export { export { MAX_SPEECH_AUDIO_BYTES, requestTextToSpeech, + requestTextToSpeechStream, type SpeechResponseFormat, type TextToSpeechErrorKind, type TextToSpeechOutcome, type TextToSpeechRequest, + type TextToSpeechStreamOutcome, + type TextToSpeechStreamSuccess, type TextToSpeechSuccess, } from "./text-to-speech-adapter.js"; diff --git a/packages/keiko-model-gateway/src/text-to-speech-adapter.test.ts b/packages/keiko-model-gateway/src/text-to-speech-adapter.test.ts index 58835927..d20fc0b6 100644 --- a/packages/keiko-model-gateway/src/text-to-speech-adapter.test.ts +++ b/packages/keiko-model-gateway/src/text-to-speech-adapter.test.ts @@ -1,5 +1,9 @@ import { describe, expect, it } from "vitest"; -import { MAX_SPEECH_AUDIO_BYTES, requestTextToSpeech } from "./text-to-speech-adapter.js"; +import { + MAX_SPEECH_AUDIO_BYTES, + requestTextToSpeech, + requestTextToSpeechStream, +} from "./text-to-speech-adapter.js"; import { OutboundHttpEgressError } from "./http.js"; // A recognizable audio byte marker so a test can assert the adapter returns the provider body verbatim @@ -331,3 +335,67 @@ describe("requestTextToSpeech", () => { expect(seenBody).not.toContain(SECRET_API_KEY); }); }); + +async function collect(stream: ReadableStream): Promise { + const reader = stream.getReader(); + const parts: Uint8Array[] = []; + for (;;) { + const { done, value } = await reader.read(); + if (done) break; + parts.push(value); + } + const total = parts.reduce((n, p) => n + p.length, 0); + const out = new Uint8Array(total); + let offset = 0; + for (const p of parts) { + out.set(p, offset); + offset += p.length; + } + return out; +} + +describe("requestTextToSpeechStream", () => { + it("returns the provider body as a byte stream with the 
resolved mime type", async () => { + const fetchImpl = mockFetch(() => audioResponse(AUDIO_BYTES, "audio/pcm")); + const outcome = await requestTextToSpeechStream({ + endpoint: ENDPOINT, + apiKey: SECRET_API_KEY, + modelId: "keiko-tts", + input: ANSWER, + responseFormat: "pcm", + fetchImpl, + }); + expect(outcome.ok).toBe(true); + if (!outcome.ok) return; + expect(outcome.value.mimeType).toBe("audio/pcm"); + expect(new TextDecoder().decode(await collect(outcome.value.body))).toBe(AUDIO_MARKER); + }); + + it("maps a provider error status to a coded kind without streaming a body", async () => { + const fetchImpl = mockFetch(() => new Response("provider error page", { status: 429 })); + const outcome = await requestTextToSpeechStream({ + endpoint: ENDPOINT, + apiKey: SECRET_API_KEY, + modelId: "keiko-tts", + input: ANSWER, + fetchImpl, + }); + expect(outcome).toEqual({ ok: false, kind: "rate-limited" }); + }); + + it("errors the stream once the audio exceeds the size cap", async () => { + const fetchImpl = mockFetch(() => audioResponse(new Uint8Array(100), "audio/pcm")); + const outcome = await requestTextToSpeechStream({ + endpoint: ENDPOINT, + apiKey: SECRET_API_KEY, + modelId: "keiko-tts", + input: ANSWER, + responseFormat: "pcm", + maxAudioBytes: 10, + fetchImpl, + }); + expect(outcome.ok).toBe(true); + if (!outcome.ok) return; + await expect(collect(outcome.value.body)).rejects.toThrow(); + }); +}); diff --git a/packages/keiko-model-gateway/src/text-to-speech-adapter.ts b/packages/keiko-model-gateway/src/text-to-speech-adapter.ts index 527adc61..67b9f4f8 100644 --- a/packages/keiko-model-gateway/src/text-to-speech-adapter.ts +++ b/packages/keiko-model-gateway/src/text-to-speech-adapter.ts @@ -276,3 +276,77 @@ export async function requestTextToSpeech( } return decodeSuccess(dispatched, built); } + +export interface TextToSpeechStreamSuccess { + // The synthesized audio bytes as they arrive from the provider, capped incrementally so a hostile or + // misconfigured 
endpoint cannot stream an unbounded body. The BFF pipes these straight to the browser
+  // (no whole-clip buffering, no base64 envelope) for start-on-first-chunk playback.
+  readonly body: ReadableStream<Uint8Array>;
+  readonly mimeType: string;
+}
+
+export type TextToSpeechStreamOutcome =
+  | { readonly ok: true; readonly value: TextToSpeechStreamSuccess }
+  | { readonly ok: false; readonly kind: TextToSpeechErrorKind };
+
+// Wraps the provider body in a passthrough that aborts once `maxBytes` is exceeded, so the streamed
+// response inherits the same size ceiling as the buffered path without ever holding the whole clip.
+function boundBodyStream(
+  source: ReadableStream<Uint8Array>,
+  maxBytes: number,
+): ReadableStream<Uint8Array> {
+  const reader = source.getReader();
+  let total = 0;
+  return new ReadableStream<Uint8Array>({
+    async pull(controller): Promise<void> {
+      try {
+        const { done, value } = await reader.read();
+        if (done) {
+          controller.close();
+          return;
+        }
+        total += value.byteLength;
+        if (total > maxBytes) {
+          controller.error(new Error("synthesized audio exceeded the size limit"));
+          await reader.cancel(); // after the error, so a failed cancel cannot mask the size limit
+          return;
+        }
+        controller.enqueue(value);
+      } catch (error) {
+        controller.error(error);
+      }
+    },
+    cancel(reason): Promise<void> {
+      return reader.cancel(reason); // propagate cancel failures to the consumer
+    },
+  });
+}
+
+// Streaming variant of requestTextToSpeech: returns the provider audio as a bounded byte stream instead
+// of a fully-buffered clip, so the BFF can forward it chunk-by-chunk and the browser can start playback
+// on the first chunk. Same provider contract, auth, egress seam, error coding, and size cap; only the
+// delivery shape differs. Raw audio is never persisted here.
+export async function requestTextToSpeechStream( + request: TextToSpeechRequest, +): Promise { + const built = buildRequest(request); + const dispatched = await dispatch(built, request.fetchImpl, request.egress); + if (typeof dispatched === "string") { + return { ok: false, kind: dispatched }; + } + if (!dispatched.ok) { + const kind = classifyStatus(dispatched.status) ?? "transport"; + await discardBody(dispatched); + return { ok: false, kind }; + } + if (dispatched.body === null) { + return { ok: false, kind: "empty-audio" }; + } + return { + ok: true, + value: { + body: boundBodyStream(dispatched.body, built.maxAudioBytes), + mimeType: resolveMimeType(dispatched, built.responseFormat), + }, + }; +} diff --git a/packages/keiko-server/src/deps.ts b/packages/keiko-server/src/deps.ts index a7bb4259..932c2c8c 100644 --- a/packages/keiko-server/src/deps.ts +++ b/packages/keiko-server/src/deps.ts @@ -63,6 +63,7 @@ import type { SpeechToTextRequest, TextToSpeechOutcome, TextToSpeechRequest, + TextToSpeechStreamOutcome, } from "@oscharko-dev/keiko-model-gateway"; import { createRelationshipStorePort, @@ -285,6 +286,12 @@ export interface UiHandlerDeps { readonly voiceSpeechRequest?: | ((request: TextToSpeechRequest) => Promise) | undefined; + // Streaming counterpart of voiceSpeechRequest (Issue #1556). Lets the /api/voice/speak/stream route + // forward provider PCM chunk-by-chunk in tests without touching global fetch. Production leaves it + // undefined and uses requestTextToSpeechStream; raw audio is streamed through, never persisted. + readonly voiceSpeechStreamRequest?: + | ((request: TextToSpeechRequest) => Promise) + | undefined; // Issue #497 (Epic #491) — realtime voice proxied-SDP negotiation seam (ADR-0058 D3/D6). Lets the // WebSocket control plane perform the browser↔provider SDP exchange through the provider-neutral // realtime adapter without touching global fetch in tests. 
Production leaves this undefined and diff --git a/packages/keiko-server/src/routes.ts b/packages/keiko-server/src/routes.ts index 00d00abe..bc91ee56 100644 --- a/packages/keiko-server/src/routes.ts +++ b/packages/keiko-server/src/routes.ts @@ -18,7 +18,11 @@ import { handleEvidenceDetail, } from "./read-handlers.js"; import { handleGetWorkspaceState, handlePutWorkspaceState } from "./workspace-state-handlers.js"; -import { handleVoiceSpeak, handleVoiceTranscribe } from "./voice-handlers.js"; +import { + handleVoiceSpeak, + handleVoiceSpeakStream, + handleVoiceTranscribe, +} from "./voice-handlers.js"; import { handleVoiceRecapBuild } from "./voice-recap.js"; import { handleCreateRun, @@ -279,6 +283,7 @@ export const API_ROUTES: readonly RouteDefinition[] = [ // the visible assistant answer text (inside the JSON + CSRF envelope) and receive synthesized audio // as base64; answers VOICE_UNAVAILABLE when no speech-output capability is configured/enabled. { method: "POST", pattern: "/api/voice/speak", handler: handleVoiceSpeak }, + { method: "POST", pattern: "/api/voice/speak/stream", handler: handleVoiceSpeakStream }, // Issue #504 (Epic #491, ADR-0067) — optional, capability-gated, user-triggered voice session recap. // POST the committed transcript text (content-free counts alongside) and derive memory candidates via // the EXISTING governed capture path; candidates surface in the existing review queue as "proposed". 
diff --git a/packages/keiko-server/src/voice-handlers.speak.test.ts b/packages/keiko-server/src/voice-handlers.speak.test.ts index 25040b9e..45a2c0d4 100644 --- a/packages/keiko-server/src/voice-handlers.speak.test.ts +++ b/packages/keiko-server/src/voice-handlers.speak.test.ts @@ -1,14 +1,15 @@ import { describe, expect, it } from "vitest"; import { Readable } from "node:stream"; import type { IncomingMessage } from "node:http"; -import { handleVoiceSpeak } from "./voice-handlers.js"; +import { handleVoiceSpeak, handleVoiceSpeakStream } from "./voice-handlers.js"; import { buildRedactor, createRunRegistry, type UiHandlerDeps } from "./index.js"; import { createInMemoryUiStore } from "./store/index.js"; -import type { RouteContext } from "./routes.js"; +import { STREAMING, type RouteContext, type RouteResult } from "./routes.js"; import type { GatewayConfig, TextToSpeechOutcome, TextToSpeechRequest, + TextToSpeechStreamOutcome, } from "@oscharko-dev/keiko-model-gateway"; const PROVIDER_SECRET = "voice-tts-secret-token-1234567890"; @@ -360,3 +361,117 @@ describe("POST /api/voice/speak — provider failure mapping (AC4)", () => { expect(serialized).not.toContain(PROVIDER_BASE_URL); }); }); + +// A minimal ServerResponse fake capturing the streaming write path. 
+class FakeRes { + statusCode: number | undefined; + headers: Record | undefined; + readonly chunks: Uint8Array[] = []; + ended = false; + destroyed = false; + writeHead(status: number, headers?: Record): this { + this.statusCode = status; + this.headers = headers; + return this; + } + write(chunk: Uint8Array): boolean { + this.chunks.push(chunk); + return true; + } + end(): void { + this.ended = true; + } + on(): this { + return this; + } + destroy(): void { + this.destroyed = true; + } +} + +function streamOf(chunks: Uint8Array[]): ReadableStream { + let i = 0; + return new ReadableStream({ + pull(controller): void { + const chunk = chunks[i]; + if (chunk !== undefined) { + i += 1; + controller.enqueue(chunk); + } else { + controller.close(); + } + }, + }); +} + +function streamCtx(body: unknown, res: FakeRes): RouteContext { + return { + req: Readable.from([Buffer.from(JSON.stringify(body), "utf8")]) as IncomingMessage, + res: res as unknown as RouteContext["res"], + params: {}, + url: new URL("http://127.0.0.1/api/voice/speak/stream"), + }; +} + +function streamOk( + body: ReadableStream, + mimeType = "audio/pcm", +): TextToSpeechStreamOutcome { + return { ok: true, value: { body, mimeType } }; +} + +describe("POST /api/voice/speak/stream", () => { + it("requests pcm, streams the provider bytes with audio/pcm, and returns STREAMING", async () => { + const seen: TextToSpeechRequest[] = []; + const res = new FakeRes(); + const deps = depsWith({ + config: SPEECH_OUTPUT_CONFIG, + configPresent: true, + voiceSpeechStreamRequest: ( + request: TextToSpeechRequest, + ): Promise => { + seen.push(request); + return Promise.resolve( + streamOk(streamOf([new Uint8Array([1, 2, 3]), new Uint8Array([4, 5])])), + ); + }, + }); + + const outcome = await handleVoiceSpeakStream(streamCtx({ text: "spoken answer" }, res), deps); + expect(outcome).toBe(STREAMING); + expect(res.statusCode).toBe(200); + expect(res.headers?.["Content-Type"]).toBe("audio/pcm"); + 
expect(Buffer.concat(res.chunks.map((c) => Buffer.from(c)))).toEqual( + Buffer.from([1, 2, 3, 4, 5]), + ); + expect(res.ended).toBe(true); + // The streaming path requests raw pcm (fastest to first audio). + expect(seen[0]?.responseFormat).toBe("pcm"); + expect(seen[0]?.signal).toBeDefined(); + }); + + it("returns a coded error RouteResult BEFORE any headers when synthesis fails", async () => { + const res = new FakeRes(); + const deps = depsWith({ + config: SPEECH_OUTPUT_CONFIG, + configPresent: true, + voiceSpeechStreamRequest: (): Promise => + Promise.resolve({ ok: false, kind: "rate-limited" }), + }); + const outcome = await handleVoiceSpeakStream(streamCtx({ text: "spoken answer" }, res), deps); + expect(outcome).not.toBe(STREAMING); + expect((outcome as RouteResult).status).toBe(429); + expect(res.statusCode).toBeUndefined(); // never committed a 200 + audio headers + expect(res.ended).toBe(false); + }); + + it("returns 503 VOICE_UNAVAILABLE for an STT-only deployment (no streaming)", async () => { + const res = new FakeRes(); + const outcome = await handleVoiceSpeakStream( + streamCtx({ text: "x" }, res), + depsWith({ config: STT_ONLY_CONFIG, configPresent: true }), + ); + expect((outcome as RouteResult).status).toBe(503); + expect(res.statusCode).toBeUndefined(); + }); +}); diff --git a/packages/keiko-server/src/voice-handlers.ts b/packages/keiko-server/src/voice-handlers.ts index ae9502b1..71254e13 100644 --- a/packages/keiko-server/src/voice-handlers.ts +++ b/packages/keiko-server/src/voice-handlers.ts @@ -17,6 +17,7 @@ import type { IncomingMessage } from "node:http"; import { requestSpeechToText, requestTextToSpeech, + requestTextToSpeechStream, resolveVoiceCapability, selectSpeechOutputModel, selectSpeechToTextModel, @@ -29,11 +30,12 @@ import { type SpeechToTextSuccess, type TextToSpeechErrorKind, type TextToSpeechRequest, + type TextToSpeechStreamOutcome, type TextToSpeechSuccess, type VoicePersona, } from "@oscharko-dev/keiko-model-gateway"; -import 
type { RouteContext, RouteResult } from "./routes.js";
-import { errorBody } from "./routes.js";
+import type { HandlerOutcome, RouteContext, RouteResult } from "./routes.js";
+import { errorBody, STREAMING } from "./routes.js";
 import type { UiHandlerDeps } from "./deps.js";
 import { currentGatewayConfig, currentGatewayEgressConfig } from "./deps.js";
 import { isVoiceDisabledByPolicy } from "./read-handlers.js";
@@ -604,10 +606,18 @@ function speechResult(value: TextToSpeechSuccess): RouteResult {
   };
 }
 
-export async function handleVoiceSpeak(
+interface ResolvedSpeak {
+  readonly validated: ValidatedSpeech;
+  readonly provider: ModelProviderConfig;
+  readonly target: SpeechTarget;
+}
+
+// Shared front-matter for both speak routes: gate the capability, parse + validate the request, and
+// resolve the provider + voice target. Returns the resolved request, or a RouteResult to return as-is.
+async function resolveSpeakRequest(
   ctx: RouteContext,
   deps: UiHandlerDeps,
-): Promise<RouteResult> {
+): Promise<ResolvedSpeak | RouteResult> {
   const gated = gateSpeechOutput(deps);
   if (isRouteResult(gated)) {
     return gated;
@@ -628,7 +638,122 @@ export async function handleVoiceSpeak(
   if (provider === undefined) {
     return speechUnavailable(deps);
   }
+  return { validated, provider, target };
+}
+
+export async function handleVoiceSpeak(
+  ctx: RouteContext,
+  deps: UiHandlerDeps,
+): Promise<RouteResult> {
+  const resolved = await resolveSpeakRequest(ctx, deps);
+  if (isRouteResult(resolved)) {
+    return resolved;
+  }
   const synthesize = deps.voiceSpeechRequest ?? requestTextToSpeech;
-  const outcome = await synthesize(buildTtsRequest(provider, target, validated, deps));
+  const outcome = await synthesize(
+    buildTtsRequest(resolved.provider, resolved.target, resolved.validated, deps),
+  );
   return outcome.ok ? speechResult(outcome.value) : speechProviderErrorResult(deps, outcome.kind);
 }
+
+// The streaming speak path requests raw PCM (the fastest provider format to first audio) and forwards
+// the bytes to the browser un-buffered (no base64 JSON envelope) for AudioWorklet start-on-first-chunk
+// playback. The buffered /api/voice/speak route stays as the universal fallback.
+const STREAM_SPEECH_FORMAT = "pcm" as const;
+
+function buildStreamTtsRequest(
+  resolved: ResolvedSpeak,
+  deps: UiHandlerDeps,
+  signal: AbortSignal,
+): TextToSpeechRequest {
+  return {
+    ...buildTtsRequest(resolved.provider, resolved.target, resolved.validated, deps),
+    responseFormat: STREAM_SPEECH_FORMAT,
+    signal,
+  };
+}
+
+// Aborts the synthesis when the client disconnects (res "close" is the canonical signal), so a barge-in
+// or navigation stops the provider stream rather than producing audio no one will hear.
+function abortOnResClose(ctx: RouteContext): AbortController {
+  const controller = new AbortController();
+  ctx.res.on("close", () => {
+    controller.abort();
+  });
+  return controller;
+}
+
+// Resolves once the response can take more data ("drain") or the request was aborted (client gone),
+// whichever comes first, and detaches the other listener so repeated waits never pile up listeners.
+function waitForDrain(ctx: RouteContext, signal: AbortSignal): Promise<void> {
+  return new Promise<void>((resolve) => {
+    if (signal.aborted) {
+      resolve();
+      return;
+    }
+    const settle = (): void => {
+      ctx.res.off("drain", settle);
+      signal.removeEventListener("abort", settle);
+      resolve();
+    };
+    ctx.res.once("drain", settle);
+    signal.addEventListener("abort", settle, { once: true });
+  });
+}
+
+// Pipes the provider audio stream to the response honoring backpressure (res.write → false pauses
+// reading until "drain") and client disconnect. Once 200 + audio headers are sent no JSON error is
+// possible, so a mid-stream failure just ends the partial stream — the client falls back to the
+// buffered route on the next turn.
+async function pipeAudioStream(
+  ctx: RouteContext,
+  body: ReadableStream<Uint8Array>,
+  controller: AbortController,
+): Promise<void> {
+  const reader = body.getReader();
+  try {
+    for (;;) {
+      const { done, value } = await reader.read();
+      if (done || controller.signal.aborted) {
+        break;
+      }
+      if (!ctx.res.write(value)) {
+        // false only means the chunk was queued above the high-water mark — it is still delivered —
+        // so wait for the buffer to flush instead of tearing the response down.
+        await waitForDrain(ctx, controller.signal);
+      }
+    }
+  } catch {
+    // partial stream — ended in finally
+  } finally {
+    try {
+      await reader.cancel();
+    } catch {
+      // already released
+    }
+    ctx.res.end();
+  }
+}
+
+export async function handleVoiceSpeakStream(
+  ctx: RouteContext,
+  deps: UiHandlerDeps,
+): Promise<HandlerOutcome> {
+  const resolved = await resolveSpeakRequest(ctx, deps);
+  if (isRouteResult(resolved)) {
+    return resolved;
+  }
+  const controller = abortOnResClose(ctx);
+  const synthesizeStream: (request: TextToSpeechRequest) => Promise<TextToSpeechStreamOutcome> =
+    deps.voiceSpeechStreamRequest ?? requestTextToSpeechStream;
+  const outcome = await synthesizeStream(buildStreamTtsRequest(resolved, deps, controller.signal));
+  if (!outcome.ok) {
+    return speechProviderErrorResult(deps, outcome.kind);
+  }
+  const mimeType = ALLOWED_SPEECH_MIME.has(outcome.value.mimeType)
+    ? outcome.value.mimeType
+    : DEFAULT_SPEECH_MIME;
+  ctx.res.writeHead(200, { "Content-Type": mimeType, "Cache-Control": "no-store" });
+  await pipeAudioStream(ctx, outcome.value.body, controller);
+  return STREAMING;
+}
diff --git a/packages/keiko-ui/public/keiko-playback-worklet.js b/packages/keiko-ui/public/keiko-playback-worklet.js
new file mode 100644
index 00000000..9560c065
--- /dev/null
+++ b/packages/keiko-ui/public/keiko-playback-worklet.js
@@ -0,0 +1,156 @@
+// Keiko gapless PCM playback worklet (Issue #1556). Plays streamed assistant-speech audio sample-
+// accurately on the audio render thread so the buffered whole-clip