Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions ui/src/bichat/data/MessageTransport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,66 @@ describe('submitQuestionAnswers', () => {
});
});
});

describe('MessageTransport abort vs timeout classification', () => {
it('re-throws an AbortError (user/unmount cancel) instead of yielding an error chunk', async () => {
// A user stop / provider unmount aborts the fetch. The generator must let
// the AbortError propagate so the machine takes its soft-cancel path; a
// yielded error chunk would be re-thrown downstream as a generic Error and
// discard the turn (the bug fixed here).
const abortError = new Error('The operation was aborted');
abortError.name = 'AbortError';
const fetchMock = vi.fn(async () => {
throw abortError;
});
vi.stubGlobal('fetch', fetchMock);

const chunks: unknown[] = [];
const iterate = (async () => {
for await (const chunk of sendMessage(createDeps(), 'session-1', 'hi', [])) {
chunks.push(chunk);
}
})();

await expect(iterate).rejects.toMatchObject({ name: 'AbortError' });
expect(chunks).toEqual([]);
});

it('yields a timeout error chunk when the connection times out', async () => {
vi.useFakeTimers();
try {
const fetchMock = vi.fn(
(_url: string, init: RequestInit) =>
new Promise<Response>((_resolve, reject) => {
init.signal?.addEventListener('abort', () => {
const e = new Error('aborted');
e.name = 'AbortError';
reject(e);
});
}),
);
vi.stubGlobal('fetch', fetchMock);

const chunks: Array<{ type?: string; error?: string }> = [];
const iterate = (async () => {
for await (const chunk of sendMessage(
createDeps({ streamConnectTimeoutMs: 50 }),
'session-1',
'hi',
[],
)) {
chunks.push(chunk as { type?: string; error?: string });
}
})();

await vi.advanceTimersByTimeAsync(60);
await iterate;

expect(chunks).toHaveLength(1);
expect(chunks[0]?.type).toBe('error');
expect(chunks[0]?.error).toContain('timed out');
} finally {
vi.useRealTimers();
}
});
});
20 changes: 14 additions & 6 deletions ui/src/bichat/data/MessageTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,20 @@ export async function* sendMessage(
} catch (err) {
if (err instanceof Error) {
if (err.name === 'AbortError') {
yield {
type: 'error',
error: connectionTimedOut
? `Stream request timed out after ${deps.streamConnectTimeoutMs}ms`
: 'Stream cancelled',
};
// A connection timeout is a genuine failure — surface it as an error
// chunk. A user-initiated stop or a provider unmount, however, must
// stay an AbortError so the machine takes its soft-cancel path (restore
// input, no error banner) instead of treating the send as failed and
// discarding the turn. Re-throw to preserve the error's identity;
// yielding an error chunk here would flatten it into a generic Error.
if (connectionTimedOut) {
yield {
type: 'error',
error: `Stream request timed out after ${deps.streamConnectTimeoutMs}ms`,
};
} else {
throw err;
}
} else {
yield {
type: 'error',
Expand Down
132 changes: 132 additions & 0 deletions ui/src/bichat/machine/ChatMachine.sendError.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { beforeEach, describe, expect, it } from 'vitest';
import type {
Attachment,
ChatDataSource,
SendMessageOptions,
Session,
StreamChunk,
} from '../types';
import type { RateLimiter } from '../utils/RateLimiter';
import { ChatMachine } from './ChatMachine';

const SESSION_ID = 'session-1';

function makeSession(id: string): Session {
const now = new Date().toISOString();
return {
id,
title: 'Test session',
status: 'active',
pinned: false,
createdAt: now,
updatedAt: now,
};
}

function createMemorySessionStorage(): Storage {
const store = new Map<string, string>();
return {
get length() {
return store.size;
},
clear() {
store.clear();
},
getItem(key: string) {
return store.has(key) ? store.get(key)! : null;
},
key(index: number) {
return Array.from(store.keys())[index] ?? null;
},
removeItem(key: string) {
store.delete(key);
},
setItem(key: string, value: string) {
store.set(key, value);
},
};
}

function installWindowWithSessionStorage(): void {
Object.defineProperty(globalThis, 'window', {
value: {
sessionStorage: createMemorySessionStorage(),
dispatchEvent: () => true,
},
configurable: true,
writable: true,
});
}

const rateLimiter = {
canMakeRequest: () => true,
getTimeUntilNextRequest: () => 0,
} as unknown as RateLimiter;

function makeDataSource(
sendImpl: (
sessionId: string,
content: string,
attachments?: Attachment[],
signal?: AbortSignal,
options?: SendMessageOptions,
) => AsyncGenerator<StreamChunk>,
): ChatDataSource {
return {
createSession: async () => makeSession(SESSION_ID),
fetchSession: async (id: string) => ({
session: makeSession(id),
turns: [],
pendingQuestion: null,
}),
sendMessage: sendImpl,
} as unknown as ChatDataSource;
}

describe('ChatMachine send error handling', () => {
beforeEach(() => {
installWindowWithSessionStorage();
});

it('restores the typed prompt and shows a banner on a genuine stream error', async () => {
// The answer streams partially, then a terminal error arrives. The send
// must fail non-destructively: the optimistic turn is removed, the user's
// prompt is returned to the input (never silently lost), and a retryable
// banner is shown.
const dataSource = makeDataSource(async function* () {
yield { type: 'content', content: 'Partial answer' } as StreamChunk;
yield { type: 'error', error: 'boom' } as StreamChunk;
});
const machine = new ChatMachine({ dataSource, rateLimiter });
machine.setSessionId(SESSION_ID);

await machine.sendMessage('Hello world');

const input = machine.getInputSnapshot();
const messaging = machine.getMessagingSnapshot();
expect(input.message).toBe('Hello world');
expect(messaging.streamError).toBeTruthy();
expect(messaging.turns).toHaveLength(0);
});

it('restores the prompt without an error banner when the stream is aborted', async () => {
// A user stop / unmount surfaces as a real AbortError (see
// MessageTransport): the machine takes the soft-cancel path — prompt
// restored, no error banner.
const abortError = new Error('aborted');
abortError.name = 'AbortError';
const dataSource = makeDataSource(async function* () {
yield { type: 'content', content: 'Partial' } as StreamChunk;
throw abortError;
});
const machine = new ChatMachine({ dataSource, rateLimiter });
machine.setSessionId(SESSION_ID);

await machine.sendMessage('Hello again');

const input = machine.getInputSnapshot();
const messaging = machine.getMessagingSnapshot();
expect(input.message).toBe('Hello again');
expect(messaging.streamError).toBeNull();
});
});
8 changes: 7 additions & 1 deletion ui/src/bichat/machine/ChatMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,13 @@ export class ChatMachine {
});

const normalized = normalizeRPCError(err, "Failed to send message");
this._updateInput({ inputError: normalized.userMessage });
// Restore the user's prompt to the input so a failed send never silently
// loses what they typed (mirrors the abort path above). The error banner
// and Retry (lastSendAttempt) remain available for re-sending.
this._updateInput({
message: content,
inputError: normalized.userMessage,
});
this._updateMessaging({
streamError: normalized.userMessage,
streamErrorRetryable: normalized.retryable,
Expand Down
Loading