Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/keiko-model-gateway/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,13 @@ export {
export {
MAX_SPEECH_AUDIO_BYTES,
requestTextToSpeech,
requestTextToSpeechStream,
type SpeechResponseFormat,
type TextToSpeechErrorKind,
type TextToSpeechOutcome,
type TextToSpeechRequest,
type TextToSpeechStreamOutcome,
type TextToSpeechStreamSuccess,
type TextToSpeechSuccess,
} from "./text-to-speech-adapter.js";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { describe, expect, it } from "vitest";
import { MAX_SPEECH_AUDIO_BYTES, requestTextToSpeech } from "./text-to-speech-adapter.js";
import {
MAX_SPEECH_AUDIO_BYTES,
requestTextToSpeech,
requestTextToSpeechStream,
} from "./text-to-speech-adapter.js";
import { OutboundHttpEgressError } from "./http.js";

// A recognizable audio byte marker so a test can assert the adapter returns the provider body verbatim
Expand Down Expand Up @@ -331,3 +335,67 @@ describe("requestTextToSpeech", () => {
expect(seenBody).not.toContain(SECRET_API_KEY);
});
});

async function collect(stream: ReadableStream<Uint8Array>): Promise<Uint8Array> {
const reader = stream.getReader();
const parts: Uint8Array[] = [];
for (;;) {
const { done, value } = await reader.read();
if (done) break;
parts.push(value);
}
const total = parts.reduce((n, p) => n + p.length, 0);
const out = new Uint8Array(total);
let offset = 0;
for (const p of parts) {
out.set(p, offset);
offset += p.length;
}
return out;
}

describe("requestTextToSpeechStream", () => {
it("returns the provider body as a byte stream with the resolved mime type", async () => {
const fetchImpl = mockFetch(() => audioResponse(AUDIO_BYTES, "audio/pcm"));
const outcome = await requestTextToSpeechStream({
endpoint: ENDPOINT,
apiKey: SECRET_API_KEY,
modelId: "keiko-tts",
input: ANSWER,
responseFormat: "pcm",
fetchImpl,
});
expect(outcome.ok).toBe(true);
if (!outcome.ok) return;
expect(outcome.value.mimeType).toBe("audio/pcm");
expect(new TextDecoder().decode(await collect(outcome.value.body))).toBe(AUDIO_MARKER);
});

it("maps a provider error status to a coded kind without streaming a body", async () => {
const fetchImpl = mockFetch(() => new Response("provider error page", { status: 429 }));
const outcome = await requestTextToSpeechStream({
endpoint: ENDPOINT,
apiKey: SECRET_API_KEY,
modelId: "keiko-tts",
input: ANSWER,
fetchImpl,
});
expect(outcome).toEqual({ ok: false, kind: "rate-limited" });
});

it("errors the stream once the audio exceeds the size cap", async () => {
const fetchImpl = mockFetch(() => audioResponse(new Uint8Array(100), "audio/pcm"));
const outcome = await requestTextToSpeechStream({
endpoint: ENDPOINT,
apiKey: SECRET_API_KEY,
modelId: "keiko-tts",
input: ANSWER,
responseFormat: "pcm",
maxAudioBytes: 10,
fetchImpl,
});
expect(outcome.ok).toBe(true);
if (!outcome.ok) return;
await expect(collect(outcome.value.body)).rejects.toThrow();
});
});
74 changes: 74 additions & 0 deletions packages/keiko-model-gateway/src/text-to-speech-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,77 @@ export async function requestTextToSpeech(
}
return decodeSuccess(dispatched, built);
}

export interface TextToSpeechStreamSuccess {
// The synthesized audio bytes as they arrive from the provider, capped incrementally so a hostile or
// misconfigured endpoint cannot stream an unbounded body. The BFF pipes these straight to the browser
// (no whole-clip buffering, no base64 envelope) for start-on-first-chunk playback.
readonly body: ReadableStream<Uint8Array>;
readonly mimeType: string;
}

export type TextToSpeechStreamOutcome =
| { readonly ok: true; readonly value: TextToSpeechStreamSuccess }
| { readonly ok: false; readonly kind: TextToSpeechErrorKind };

// Wraps the provider body in a passthrough that aborts once `maxBytes` is exceeded, so the streamed
// response inherits the same size ceiling as the buffered path without ever holding the whole clip.
function boundBodyStream(
source: ReadableStream<Uint8Array>,
maxBytes: number,
): ReadableStream<Uint8Array> {
const reader = source.getReader();
let total = 0;
return new ReadableStream<Uint8Array>({
async pull(controller): Promise<void> {
try {
const { done, value } = await reader.read();
if (done) {
controller.close();
return;
}
total += value.byteLength;
if (total > maxBytes) {
await reader.cancel();
controller.error(new Error("synthesized audio exceeded the size limit"));
return;
}
controller.enqueue(value);
} catch (error) {
controller.error(error);
}
},
cancel(reason): void {
void reader.cancel(reason);
},
});
}

// Streaming variant of requestTextToSpeech: returns the provider audio as a bounded byte stream instead
// of a fully-buffered clip, so the BFF can forward it chunk-by-chunk and the browser can start playback
// on the first chunk. Same provider contract, auth, egress seam, error coding, and size cap; only the
// delivery shape differs. Raw audio is never persisted here.
export async function requestTextToSpeechStream(
request: TextToSpeechRequest,
): Promise<TextToSpeechStreamOutcome> {
const built = buildRequest(request);
const dispatched = await dispatch(built, request.fetchImpl, request.egress);
if (typeof dispatched === "string") {
return { ok: false, kind: dispatched };
}
if (!dispatched.ok) {
const kind = classifyStatus(dispatched.status) ?? "transport";
await discardBody(dispatched);
return { ok: false, kind };
}
if (dispatched.body === null) {
return { ok: false, kind: "empty-audio" };
}
return {
ok: true,
value: {
body: boundBodyStream(dispatched.body, built.maxAudioBytes),
mimeType: resolveMimeType(dispatched, built.responseFormat),
},
};
}
7 changes: 7 additions & 0 deletions packages/keiko-server/src/deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import type {
SpeechToTextRequest,
TextToSpeechOutcome,
TextToSpeechRequest,
TextToSpeechStreamOutcome,
} from "@oscharko-dev/keiko-model-gateway";
import {
createRelationshipStorePort,
Expand Down Expand Up @@ -285,6 +286,12 @@ export interface UiHandlerDeps {
readonly voiceSpeechRequest?:
| ((request: TextToSpeechRequest) => Promise<TextToSpeechOutcome>)
| undefined;
// Streaming counterpart of voiceSpeechRequest (Issue #1556). Lets the /api/voice/speak/stream route
// forward provider PCM chunk-by-chunk in tests without touching global fetch. Production leaves it
// undefined and uses requestTextToSpeechStream; raw audio is streamed through, never persisted.
readonly voiceSpeechStreamRequest?:
| ((request: TextToSpeechRequest) => Promise<TextToSpeechStreamOutcome>)
| undefined;
// Issue #497 (Epic #491) — realtime voice proxied-SDP negotiation seam (ADR-0058 D3/D6). Lets the
// WebSocket control plane perform the browser↔provider SDP exchange through the provider-neutral
// realtime adapter without touching global fetch in tests. Production leaves this undefined and
Expand Down
7 changes: 6 additions & 1 deletion packages/keiko-server/src/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import {
handleEvidenceDetail,
} from "./read-handlers.js";
import { handleGetWorkspaceState, handlePutWorkspaceState } from "./workspace-state-handlers.js";
import { handleVoiceSpeak, handleVoiceTranscribe } from "./voice-handlers.js";
import {
handleVoiceSpeak,
handleVoiceSpeakStream,
handleVoiceTranscribe,
} from "./voice-handlers.js";
import { handleVoiceRecapBuild } from "./voice-recap.js";
import {
handleCreateRun,
Expand Down Expand Up @@ -279,6 +283,7 @@ export const API_ROUTES: readonly RouteDefinition[] = [
// the visible assistant answer text (inside the JSON + CSRF envelope) and receive synthesized audio
// as base64; answers VOICE_UNAVAILABLE when no speech-output capability is configured/enabled.
{ method: "POST", pattern: "/api/voice/speak", handler: handleVoiceSpeak },
{ method: "POST", pattern: "/api/voice/speak/stream", handler: handleVoiceSpeakStream },
// Issue #504 (Epic #491, ADR-0067) — optional, capability-gated, user-triggered voice session recap.
// POST the committed transcript text (content-free counts alongside) and derive memory candidates via
// the EXISTING governed capture path; candidates surface in the existing review queue as "proposed".
Expand Down
119 changes: 117 additions & 2 deletions packages/keiko-server/src/voice-handlers.speak.test.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { describe, expect, it } from "vitest";
import { Readable } from "node:stream";
import type { IncomingMessage } from "node:http";
import { handleVoiceSpeak } from "./voice-handlers.js";
import { handleVoiceSpeak, handleVoiceSpeakStream } from "./voice-handlers.js";
import { buildRedactor, createRunRegistry, type UiHandlerDeps } from "./index.js";
import { createInMemoryUiStore } from "./store/index.js";
import type { RouteContext } from "./routes.js";
import { STREAMING, type RouteContext, type RouteResult } from "./routes.js";
import type {
GatewayConfig,
TextToSpeechOutcome,
TextToSpeechRequest,
TextToSpeechStreamOutcome,
} from "@oscharko-dev/keiko-model-gateway";

const PROVIDER_SECRET = "voice-tts-secret-token-1234567890";
Expand Down Expand Up @@ -360,3 +361,117 @@ describe("POST /api/voice/speak — provider failure mapping (AC4)", () => {
expect(serialized).not.toContain(PROVIDER_BASE_URL);
});
});

// A minimal ServerResponse fake capturing the streaming write path.
class FakeRes {
statusCode: number | undefined;
headers: Record<string, string> | undefined;
readonly chunks: Uint8Array[] = [];
ended = false;
destroyed = false;
writeHead(status: number, headers?: Record<string, string>): this {
this.statusCode = status;
this.headers = headers;
return this;
}
write(chunk: Uint8Array): boolean {
this.chunks.push(chunk);
return true;
}
end(): void {
this.ended = true;
}
on(): this {
return this;
}
destroy(): void {
this.destroyed = true;
}
}

function streamOf(chunks: Uint8Array[]): ReadableStream<Uint8Array> {
let i = 0;
return new ReadableStream<Uint8Array>({
pull(controller): void {
const chunk = chunks[i];
if (chunk !== undefined) {
i += 1;
controller.enqueue(chunk);
} else {
controller.close();
}
},
});
}

function streamCtx(body: unknown, res: FakeRes): RouteContext {
return {
req: Readable.from([Buffer.from(JSON.stringify(body), "utf8")]) as IncomingMessage,
res: res as unknown as RouteContext["res"],
params: {},
url: new URL("http://127.0.0.1/api/voice/speak/stream"),
};
}

function streamOk(
body: ReadableStream<Uint8Array>,
mimeType = "audio/pcm",
): TextToSpeechStreamOutcome {
return { ok: true, value: { body, mimeType } };
}

describe("POST /api/voice/speak/stream", () => {
it("requests pcm, streams the provider bytes with audio/pcm, and returns STREAMING", async () => {
const seen: TextToSpeechRequest[] = [];
const res = new FakeRes();
const deps = depsWith({
config: SPEECH_OUTPUT_CONFIG,
configPresent: true,
voiceSpeechStreamRequest: (
request: TextToSpeechRequest,
): Promise<TextToSpeechStreamOutcome> => {
seen.push(request);
return Promise.resolve(
streamOk(streamOf([new Uint8Array([1, 2, 3]), new Uint8Array([4, 5])])),
);
},
});

const outcome = await handleVoiceSpeakStream(streamCtx({ text: "spoken answer" }, res), deps);
expect(outcome).toBe(STREAMING);
expect(res.statusCode).toBe(200);
expect(res.headers?.["Content-Type"]).toBe("audio/pcm");
expect(Buffer.concat(res.chunks.map((c) => Buffer.from(c)))).toEqual(
Buffer.from([1, 2, 3, 4, 5]),
);
expect(res.ended).toBe(true);
// The streaming path requests raw pcm (fastest to first audio).
expect(seen[0]?.responseFormat).toBe("pcm");
expect(seen[0]?.signal).toBeDefined();
});

it("returns a coded error RouteResult BEFORE any headers when synthesis fails", async () => {
const res = new FakeRes();
const deps = depsWith({
config: SPEECH_OUTPUT_CONFIG,
configPresent: true,
voiceSpeechStreamRequest: (): Promise<TextToSpeechStreamOutcome> =>
Promise.resolve({ ok: false, kind: "rate-limited" }),
});
const outcome = await handleVoiceSpeakStream(streamCtx({ text: "spoken answer" }, res), deps);
expect(outcome).not.toBe(STREAMING);
expect((outcome as RouteResult).status).toBe(429);
expect(res.statusCode).toBeUndefined(); // never committed a 200 + audio headers
expect(res.ended).toBe(false);
});

it("returns 503 VOICE_UNAVAILABLE for an STT-only deployment (no streaming)", async () => {
const res = new FakeRes();
const outcome = await handleVoiceSpeakStream(
streamCtx({ text: "x" }, res),
depsWith({ config: STT_ONLY_CONFIG, configPresent: true }),
);
expect((outcome as RouteResult).status).toBe(503);
expect(res.statusCode).toBeUndefined();
});
});
Loading
Loading