Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@
"@storybook/test": "8.6.15",
"@tailwindcss/cli": "4.1.18",
"@tailwindcss/typography": "^0.5.19",
"@testing-library/dom": "^10.4.1",
"@testing-library/react": "^16.3.2",
"@types/node": "^25.2.1",
"@types/react": "^18.3.3",
"@types/react-dom": "^18.3.0",
Expand All @@ -126,6 +128,7 @@
"eslint-plugin-react": "^7.37.5",
"eslint-plugin-react-hooks": "^5.2.0",
"globals": "^15.15.0",
"jsdom": "^29.0.2",
"storybook": "8.6.15",
"tailwindcss": "4.1.18",
"tsup": "^8.5.0",
Expand Down
357 changes: 355 additions & 2 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions ui/src/bichat/data/HttpDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,47 @@ export class HttpDataSource implements ChatDataSource {
);
}

/**
* Open a native EventSource against GET /stream/events for the given
* run. Used by components that want browser-native auto-reconnect
* with Last-Event-ID (tab close, wifi drop, device switch). Prefer
* over resumeStream when tailing an already-running generation that
* another tab started.
*/
subscribeRunEvents(
sessionId: string,
runId: string,
options: Messages.SubscribeRunEventsOptions
): Promise<void> {
return Messages.subscribeRunEvents(
{
baseUrl: this.config.baseUrl,
streamEndpoint: this.config.streamEndpoint!,
},
sessionId,
runId,
options
);
}

/**
* Subscribe to the per-tenant active-run fan-out
* (GET /stream/active-runs). Never resolves until the caller aborts
* via the signal; use from a top-level component (sidebar container)
* that mounts for the lifetime of the chat app.
*/
subscribeActiveRuns(
options: Messages.SubscribeActiveRunsOptions
): Promise<void> {
return Messages.subscribeActiveRuns(
{
baseUrl: this.config.baseUrl,
streamEndpoint: this.config.streamEndpoint!,
},
options
);
}

async *sendMessage(
sessionId: string,
content: string,
Expand Down
206 changes: 206 additions & 0 deletions ui/src/bichat/data/MessageTransport.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
import {
resumeStream,
RunEventsConnectError,
sendMessage,
subscribeRunEvents,
submitQuestionAnswers,
type MessageTransportDeps,
} from './MessageTransport';
import type { StreamChunk } from '../types';

const encoder = new TextEncoder();

Expand Down Expand Up @@ -92,6 +95,209 @@ describe('MessageTransport stream timeout defaults', () => {
});
});

describe('sendMessage request_id idempotency', () => {
it('includes an auto-generated requestId in the POST body', async () => {
const fetchMock = vi.fn(async () => new Response(
createSSEStream([{ type: 'done' }]),
{ status: 200, headers: { 'Content-Type': 'text/event-stream' } },
));
vi.stubGlobal('fetch', fetchMock);

for await (const _ of sendMessage(createDeps(), 'session-1', 'hello', [])) {
// drain stream
}

expect(fetchMock).toHaveBeenCalledTimes(1);
const firstCall = fetchMock.mock.calls[0] as unknown as [unknown, RequestInit];
const body = JSON.parse(firstCall[1].body as string);
expect(typeof body.requestId).toBe('string');
expect(body.requestId.length).toBeGreaterThan(0);
});

it('preserves a caller-supplied requestId verbatim', async () => {
const fetchMock = vi.fn(async () => new Response(
createSSEStream([{ type: 'done' }]),
{ status: 200, headers: { 'Content-Type': 'text/event-stream' } },
));
vi.stubGlobal('fetch', fetchMock);

const explicit = '11111111-2222-4333-8444-555566667777';
for await (const _ of sendMessage(
createDeps(),
'session-1',
'hello',
[],
undefined,
{ requestId: explicit },
)) {
// drain
}

const firstCall = fetchMock.mock.calls[0] as unknown as [unknown, RequestInit];
const body = JSON.parse(firstCall[1].body as string);
expect(body.requestId).toBe(explicit);
});
});

// ---------------------------------------------------------------------------
// subscribeRunEvents — EventSource listener regression coverage
// ---------------------------------------------------------------------------

class FakeEventSource {
static instances: FakeEventSource[] = [];
listeners = new Map<string, Set<(e: MessageEvent) => void>>();
onerror: ((e: Event) => void) | null = null;
onmessage: ((e: MessageEvent) => void) | null = null;
readyState = 0;
closed = false;
constructor(public url: string, public init?: EventSourceInit) {
FakeEventSource.instances.push(this);
}
addEventListener(name: string, cb: (e: MessageEvent) => void) {
if (!this.listeners.has(name)) {this.listeners.set(name, new Set());}
this.listeners.get(name)!.add(cb);
}
removeEventListener(name: string, cb: (e: MessageEvent) => void) {
this.listeners.get(name)?.delete(cb);
}
close() {
this.readyState = 2;
this.closed = true;
}
emit(name: string, data: unknown) {
const payload = typeof data === 'string' ? data : JSON.stringify(data);
const evt = new MessageEvent(name, { data: payload });
for (const cb of this.listeners.get(name) ?? []) {cb(evt);}
}
emitError() {
this.onerror?.(new Event('error'));
}
}

function installFakeEventSource(): typeof FakeEventSource {
FakeEventSource.instances = [];
vi.stubGlobal('EventSource', FakeEventSource as unknown as typeof EventSource);
return FakeEventSource;
}

describe('subscribeRunEvents — terminal event handling', () => {
it('settles when backend emits a `cancelled` event (regression)', async () => {
installFakeEventSource();
const chunks: StreamChunk[] = [];
const promise = subscribeRunEvents(
{ baseUrl: '', streamEndpoint: '/stream' },
'session-1',
'run-1',
{ onChunk: (c) => chunks.push(c) },
);
// Flush microtasks so the constructor + addEventListener calls
// complete before we emit.
await Promise.resolve();
const es = FakeEventSource.instances[0];
expect(es).toBeDefined();
es.emit('cancelled', { type: 'cancelled', reason: 'user_stop' });
await expect(promise).resolves.toBeUndefined();
expect(es.closed).toBe(true);
expect(chunks).toEqual([{ type: 'cancelled', reason: 'user_stop' }]);
});

it('settles when backend emits a `failed` event (regression)', async () => {
installFakeEventSource();
const chunks: StreamChunk[] = [];
const promise = subscribeRunEvents(
{ baseUrl: '', streamEndpoint: '/stream' },
'session-1',
'run-1',
{ onChunk: (c) => chunks.push(c) },
);
await Promise.resolve();
const es = FakeEventSource.instances[0];
es.emit('failed', { type: 'failed', error: 'reaper_stale' });
await expect(promise).resolves.toBeUndefined();
expect(es.closed).toBe(true);
expect(chunks).toHaveLength(1);
expect(chunks[0].type).toBe('failed');
});

it('surfaces malformed JSON as an error chunk without tearing down the subscription', async () => {
installFakeEventSource();
const chunks: StreamChunk[] = [];
const promise = subscribeRunEvents(
{ baseUrl: '', streamEndpoint: '/stream' },
'session-1',
'run-1',
{ onChunk: (c) => chunks.push(c) },
);
await Promise.resolve();
const es = FakeEventSource.instances[0];
es.emit('content', '{bad');
// Subscription remains open — the first chunk was an injected error
// chunk. A subsequent valid `done` settles the promise.
expect(es.closed).toBe(false);
es.emit('done', { type: 'done' });
await expect(promise).resolves.toBeUndefined();
expect(es.closed).toBe(true);
expect(chunks).toHaveLength(2);
expect(chunks[0].type).toBe('error');
expect(chunks[1]).toEqual({ type: 'done' });
});

it('resolves and closes EventSource when the AbortSignal fires', async () => {
installFakeEventSource();
const ctrl = new AbortController();
const promise = subscribeRunEvents(
{ baseUrl: '', streamEndpoint: '/stream' },
'session-1',
'run-1',
{ onChunk: () => {}, signal: ctrl.signal },
);
await Promise.resolve();
const es = FakeEventSource.instances[0];
expect(es.closed).toBe(false);
ctrl.abort();
await expect(promise).resolves.toBeUndefined();
expect(es.closed).toBe(true);
});

it('rejects with RunEventsConnectError when onerror fires within the initial-connect grace', async () => {
installFakeEventSource();
const promise = subscribeRunEvents(
{ baseUrl: '', streamEndpoint: '/stream' },
'session-1',
'run-1',
{ onChunk: () => {} },
);
await Promise.resolve();
const es = FakeEventSource.instances[0];
es.emitError();
await expect(promise).rejects.toBeInstanceOf(RunEventsConnectError);
expect(es.closed).toBe(true);
});
});

describe('sendMessage request_id fallback', () => {
it('produces a valid UUID v4 when crypto.randomUUID is unavailable', async () => {
const fetchMock = vi.fn(async () => new Response(
createSSEStream([{ type: 'done' }]),
{ status: 200, headers: { 'Content-Type': 'text/event-stream' } },
));
vi.stubGlobal('fetch', fetchMock);
// Stub crypto with a shape that is defined but lacks randomUUID —
// forces the Math.random fallback path in generateRequestId.
vi.stubGlobal('crypto', { randomUUID: undefined } as unknown as Crypto);

for await (const _ of sendMessage(createDeps(), 'session-1', 'hello', [])) {
// drain
}

const firstCall = fetchMock.mock.calls[0] as unknown as [unknown, RequestInit];
const body = JSON.parse(firstCall[1].body as string);
expect(body.requestId).toMatch(
/^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/,
);
});
});

describe('submitQuestionAnswers', () => {
it('flattens custom text answers for RPC submission', async () => {
const callRPC = vi.fn(async () => ({
Expand Down
Loading
Loading