From 6a78d7c7de5a405ceb461f2337878f1f8676e6e4 Mon Sep 17 00:00:00 2001 From: nrslib <38722970+nrslib@users.noreply.github.com> Date: Thu, 28 May 2026 18:43:24 +0900 Subject: [PATCH] takt: fix-codex-reconnect-retry --- src/__tests__/codex-client-retry.test.ts | 339 ++++++++++++++++++ src/__tests__/codex-stream-handler.test.ts | 94 ++++- src/__tests__/shared-utils-exports.test.ts | 8 + .../tasks/execute/traceReportRedaction.ts | 13 +- src/infra/codex/CodexStreamHandler.ts | 96 ++++- src/infra/codex/client.ts | 59 ++- src/shared/utils/sensitiveText.ts | 48 +++ 7 files changed, 618 insertions(+), 39 deletions(-) create mode 100644 src/__tests__/shared-utils-exports.test.ts create mode 100644 src/shared/utils/sensitiveText.ts diff --git a/src/__tests__/codex-client-retry.test.ts b/src/__tests__/codex-client-retry.test.ts index 2eaf3e2b7..706ec4f6d 100644 --- a/src/__tests__/codex-client-retry.test.ts +++ b/src/__tests__/codex-client-retry.test.ts @@ -11,6 +11,12 @@ let runPlanIndex = 0; let startThreadCalls: Array | undefined> = []; let resumeThreadCalls: Array<{ threadId: string; options?: Record }> = []; const CODEX_STREAM_IDLE_TIMEOUT_MS = 10 * 60 * 1000; +const CODEX_RECONNECT_FAILURE_MESSAGE = 'Reconnecting... 2/5 (timeout waiting for child process to exit)'; +const CODEX_RECONNECT_RETRYABLE_MESSAGES = [ + 'Reconnecting... 2/5', + 'timeout waiting for child process to exit', + CODEX_RECONNECT_FAILURE_MESSAGE, +]; function createEvents(events: MockEvent[]) { return (async function* () { @@ -47,6 +53,21 @@ function createIdleTimeoutPlan(onThreadStarted?: () => void): RunPlan { }; } +function createReconnectCommandFailureEvents(message: string, command: string): MockEvent[] { + return [ + { type: 'thread.started', thread_id: 'thread-1' }, + { + type: 'item.started', + item: { + id: 'cmd-e2e', + type: 'command_execution', + command, + }, + }, + { type: 'turn.failed', error: { message } }, + ]; +} + function createThread(id: string) { return { id, @@ -96,6 +117,7 @@ describe('CodexClient retry', () => { }); afterEach(() => { + vi.clearAllTimers(); vi.useRealTimers(); }); @@ -255,6 +277,323 @@ describe('CodexClient retry', () => { expect(result.content).toBe('Selected model is at capacity. Please try a different model.'); }); + it.each(CODEX_RECONNECT_RETRYABLE_MESSAGES)( + 'turn.failed の %s provider_error を 1 秒後に retry して成功を返す', + async (reconnectMessage) => { + vi.useFakeTimers(); + + runPlans = [ + { + type: 'events', + events: [ + { type: 'thread.started', thread_id: 'thread-1' }, + { type: 'turn.failed', error: { message: reconnectMessage } }, + ], + }, + { + type: 'events', + events: [ + { type: 'thread.started', thread_id: 'thread-1' }, + { type: 'item.completed', item: { id: 'msg-reconnect', type: 'agent_message', text: 'reconnect retry succeeded' } }, + { type: 'turn.completed', usage: { input_tokens: 1, output_tokens: 2 } }, + ], + }, + ]; + + const client = new CodexClient(); + + const resultPromise = client.call('coder', 'prompt', { cwd: '/tmp' }); + + await vi.advanceTimersByTimeAsync(999); + expect(resumeThreadCalls).toHaveLength(0); + + await vi.advanceTimersByTimeAsync(1); + const result = await resultPromise; + + expect(startThreadCalls).toHaveLength(1); + expect(resumeThreadCalls).toEqual([ + { + threadId: 'thread-1', + options: expect.objectContaining({ workingDirectory: '/tmp' }), + }, + ]); + expect(result.status).toBe('done'); + expect(result.content).toBe('reconnect retry succeeded'); + }, + ); + + it('stream error event の Reconnecting provider_error を 1 秒後に retry して成功を返す', async () => { + vi.useFakeTimers(); + + runPlans = [ + { + type: 'events', + events: [ + { type: 'thread.started', thread_id: 'thread-1' }, + { type: 'error', message: CODEX_RECONNECT_FAILURE_MESSAGE }, + ], + }, + { + type: 'events', + events: [ + { type: 'thread.started', thread_id: 'thread-1' }, + { type: 'item.completed', item: { id: 'msg-reconnect-event-error', type: 'agent_message', text: 'stream error retry succeeded' } }, + { type: 'turn.completed', usage: { input_tokens: 1, output_tokens: 2 } }, + ], + }, + ]; + + const client = new CodexClient(); + + const resultPromise = client.call('coder', 'prompt', { cwd: '/tmp' }); + + await vi.advanceTimersByTimeAsync(999); + expect(resumeThreadCalls).toHaveLength(0); + + await vi.advanceTimersByTimeAsync(1); + const result = await resultPromise; + + expect(startThreadCalls).toHaveLength(1); + expect(resumeThreadCalls).toEqual([ + { + threadId: 'thread-1', + options: expect.objectContaining({ workingDirectory: '/tmp' }), + }, + ]); + expect(result.status).toBe('done'); + expect(result.content).toBe('stream error retry succeeded'); + }); + + it.each(CODEX_RECONNECT_RETRYABLE_MESSAGES)( + '例外経路の %s provider_error を 1 秒後に retry して成功を返す', + async (reconnectMessage) => { + vi.useFakeTimers(); + + runPlans = [ + { type: 'throw', error: new Error(reconnectMessage) }, + { + type: 'events', + events: [ + { type: 'thread.started', thread_id: 'thread-1' }, + { type: 'item.completed', item: { id: 'msg-reconnect-exception', type: 'agent_message', text: 'exception retry succeeded' } }, + { type: 'turn.completed', usage: { input_tokens: 2, output_tokens: 3 } }, + ], + }, + ]; + + const client = new CodexClient(); + + const resultPromise = client.call('coder', 'prompt', { cwd: '/tmp' }); + + await vi.advanceTimersByTimeAsync(999); + expect(resumeThreadCalls).toHaveLength(0); + + await vi.advanceTimersByTimeAsync(1); + const result = await resultPromise; + + expect(startThreadCalls).toHaveLength(1); + expect(resumeThreadCalls).toEqual([ + { + threadId: 'thread-1', + options: expect.objectContaining({ workingDirectory: '/tmp' }), + }, + ]); + expect(result.status).toBe('done'); + expect(result.content).toBe('exception retry succeeded'); + }, + ); + + it('Reconnecting 系 provider_error の retry を使い切った場合は実行中 command の結果不明を診断する', async () => { + vi.useFakeTimers(); + + runPlans = Array.from({ length: 9 }, () => ({ + type: 'events' as const, + events: [ + ...createReconnectCommandFailureEvents( + CODEX_RECONNECT_FAILURE_MESSAGE, + 'npm run test:e2e:mock', + ), + ], + })); + + const client = new CodexClient(); + const onStream = vi.fn(); + const resultPromise = client.call('coder', 'prompt', { cwd: '/tmp', onStream }); + await vi.runAllTimersAsync(); + const result = await resultPromise; + + expect(startThreadCalls).toHaveLength(1); + expect(resumeThreadCalls).toHaveLength(8); + expect(result.status).toBe('error'); + expect(result.failureCategory).toBe('provider_error'); + expect(result.content).toContain('provider reconnect failure'); + expect(result.content).toContain(CODEX_RECONNECT_FAILURE_MESSAGE); + expect(result.content).toContain('Active tool: Bash'); + expect(result.content).toContain('Bash command: npm run test:e2e:mock'); + expect(result.content).toContain('Command result: unknown'); + expect(onStream).toHaveBeenCalledWith({ + type: 'result', + data: expect.objectContaining({ + success: false, + error: expect.stringContaining('provider reconnect failure'), + failureCategory: 'provider_error', + }), + }); + }); + + it('例外経路の Reconnecting 系 provider_error の retry を使い切った場合も実行中 command の結果不明を診断する', async () => { + vi.useFakeTimers(); + + runPlans = Array.from({ length: 9 }, () => ({ + type: 'stream' as const, + createEvents: async function* () { + yield { type: 'thread.started', thread_id: 'thread-1' }; + yield { + type: 'item.started', + item: { + id: 'cmd-exception', + type: 'command_execution', + command: 'npm run test:e2e:mock', + }, + }; + throw new Error(CODEX_RECONNECT_FAILURE_MESSAGE); + }, + })); + + const client = new CodexClient(); + const resultPromise = client.call('coder', 'prompt', { cwd: '/tmp' }); + await vi.runAllTimersAsync(); + const result = await resultPromise; + + expect(startThreadCalls).toHaveLength(1); + expect(resumeThreadCalls).toHaveLength(8); + expect(result.status).toBe('error'); + expect(result.failureCategory).toBe('provider_error'); + expect(result.content).toContain('provider reconnect failure'); + expect(result.content).toContain(CODEX_RECONNECT_FAILURE_MESSAGE); + expect(result.content).toContain('Active tool: Bash'); + expect(result.content).toContain('Bash command: npm run test:e2e:mock'); + expect(result.content).toContain('Command result: unknown'); + }); + + it('Reconnecting 系 provider_error の command 診断は機密値をマスクする', async () => { + vi.useFakeTimers(); + + const secretCommand = [ + 'curl -H "Authorization: Bearer sk-abcdefghijklmnopqrstuvwxyz"', + 'curl -H "Authorization: Basic dXNlcjpwYXNz"', + 'curl -H "Authorization: Bearer leading-space-token"', + 'curl -H "Cookie: sessionid=plain-session-id; theme=dark"', + 'curl -H "Set-Cookie: sessionid=set-cookie-secret; Path=/"', + 'curl -u user:plain-password', + 'curl -uuser:compact-password', + 'curl --user other:other-password', + 'curl --user=third:third-password', + 'curl --proxy-user proxy:proxy-password', + 'curl --proxy-user=proxy-eq:proxy-eq-password', + 'curl https://url-user:url-password@example.test/path', + 'https://example.test?api_key=query-secret', + '--token ghp_abcdefghijklmnopqrstuvwxyz1234567890', + 'OPENAI_API_KEY=sk-proj-secret_1234567890', + "PASSWORD='correct horse battery staple'", + "AWS_SECRET_ACCESS_KEY='aws secret phrase with spaces'", + 'SERVICE_PRIVATE_KEY="private key phrase with spaces"', + '--aws-access-key-id access-key-secret', + 'PASSWORD="abc\\" double assignment leaked tail"', + "SECRET='abc\\' single assignment leaked tail'", + '--token "abc\\" double option leaked tail"', + "--private-key 'abc\\' single option leaked tail'", + ].join(' '); + + runPlans = Array.from({ length: 9 }, () => ({ + type: 'events' as const, + events: [ + { type: 'thread.started', thread_id: 'thread-1' }, + { + type: 'item.started', + item: { + id: 'cmd-secret', + type: 'command_execution', + command: secretCommand, + }, + }, + { type: 'turn.failed', error: { message: CODEX_RECONNECT_FAILURE_MESSAGE } }, + ], + })); + + const client = new CodexClient(); + const resultPromise = client.call('coder', 'prompt', { cwd: '/tmp' }); + await vi.runAllTimersAsync(); + const result = await resultPromise; + + expect(result.status).toBe('error'); + expect(result.content).toContain('Bash command:'); + expect(result.content).toContain('Authorization: Bearer [REDACTED]'); + expect(result.content).toContain('Authorization: Basic [REDACTED]'); + expect(result.content).toContain('Authorization: Bearer [REDACTED]'); + expect(result.content).toContain('Cookie: [REDACTED]'); + expect(result.content).toContain('Set-Cookie: [REDACTED]'); + expect(result.content).toContain('-u [REDACTED]'); + expect(result.content).toContain('-u[REDACTED]'); + expect(result.content).toContain('--user [REDACTED]'); + expect(result.content).toContain('--user=[REDACTED]'); + expect(result.content).toContain('--proxy-user [REDACTED]'); + expect(result.content).toContain('--proxy-user=[REDACTED]'); + expect(result.content).toContain('https://[REDACTED]@example.test/path'); + expect(result.content).toContain('api_key=[REDACTED]'); + expect(result.content).toContain('--token [REDACTED]'); + expect(result.content).toContain('OPENAI_API_KEY=[REDACTED]'); + expect(result.content).toContain("PASSWORD='[REDACTED]'"); + expect(result.content).toContain("AWS_SECRET_ACCESS_KEY='[REDACTED]'"); + expect(result.content).toContain('SERVICE_PRIVATE_KEY="[REDACTED]"'); + expect(result.content).toContain('--aws-access-key-id [REDACTED]'); + expect(result.content).toContain('PASSWORD="[REDACTED]"'); + expect(result.content).toContain("SECRET='[REDACTED]'"); + expect(result.content).toContain('--private-key [REDACTED]'); + expect(result.content).not.toContain('sk-abcdefghijklmnopqrstuvwxyz'); + expect(result.content).not.toContain('dXNlcjpwYXNz'); + expect(result.content).not.toContain('leading-space-token'); + expect(result.content).not.toContain('plain-session-id'); + expect(result.content).not.toContain('set-cookie-secret'); + expect(result.content).not.toContain('plain-password'); + expect(result.content).not.toContain('compact-password'); + expect(result.content).not.toContain('other-password'); + expect(result.content).not.toContain('third-password'); + expect(result.content).not.toContain('proxy-password'); + expect(result.content).not.toContain('proxy-eq-password'); + expect(result.content).not.toContain('url-password'); + expect(result.content).not.toContain('query-secret'); + expect(result.content).not.toContain('ghp_abcdefghijklmnopqrstuvwxyz1234567890'); + expect(result.content).not.toContain('sk-proj-secret_1234567890'); + expect(result.content).not.toContain('correct horse battery staple'); + expect(result.content).not.toContain('aws secret phrase with spaces'); + expect(result.content).not.toContain('private key phrase with spaces'); + expect(result.content).not.toContain('access-key-secret'); + expect(result.content).not.toContain('double assignment leaked tail'); + expect(result.content).not.toContain('single assignment leaked tail'); + expect(result.content).not.toContain('double option leaked tail'); + expect(result.content).not.toContain('single option leaked tail'); + expect(result.error).not.toContain('correct horse battery staple'); + expect(result.error).not.toContain('dXNlcjpwYXNz'); + expect(result.error).not.toContain('leading-space-token'); + expect(result.error).not.toContain('plain-session-id'); + expect(result.error).not.toContain('set-cookie-secret'); + expect(result.error).not.toContain('plain-password'); + expect(result.error).not.toContain('compact-password'); + expect(result.error).not.toContain('other-password'); + expect(result.error).not.toContain('third-password'); + expect(result.error).not.toContain('proxy-password'); + expect(result.error).not.toContain('proxy-eq-password'); + expect(result.error).not.toContain('url-password'); + expect(result.error).not.toContain('aws secret phrase with spaces'); + expect(result.error).not.toContain('private key phrase with spaces'); + expect(result.error).not.toContain('access-key-secret'); + expect(result.error).not.toContain('double assignment leaked tail'); + expect(result.error).not.toContain('single assignment leaked tail'); + expect(result.error).not.toContain('double option leaked tail'); + expect(result.error).not.toContain('single option leaked tail'); + }); + it('ストリームの idle timeout を 1 回 retry して成功を返す', async () => { vi.useFakeTimers(); diff --git a/src/__tests__/codex-stream-handler.test.ts b/src/__tests__/codex-stream-handler.test.ts index 48b49ee01..c76cd418a 100644 --- a/src/__tests__/codex-stream-handler.test.ts +++ b/src/__tests__/codex-stream-handler.test.ts @@ -1,5 +1,10 @@ import { describe, expect, it, vi } from 'vitest'; -import { emitResult } from '../infra/codex/CodexStreamHandler.js'; +import { + createStreamTrackingState, + emitCodexItemCompleted, + emitCodexItemStart, + emitResult, +} from '../infra/codex/CodexStreamHandler.js'; import type { StreamCallback } from '../core/workflow/types.js'; describe('CodexStreamHandler emitResult', () => { @@ -20,3 +25,90 @@ describe('CodexStreamHandler emitResult', () => { }); }); }); + +describe('CodexStreamHandler active tool tracking', () => { + it('user-facing stream callback がなくても command_execution の active tool state を更新する', () => { + const state = createStreamTrackingState(); + + emitCodexItemStart( + { id: 'cmd-1', type: 'command_execution', command: 'npm run test:e2e:mock' }, + undefined, + state, + ); + + expect(state.activeTool).toEqual({ + id: 'cmd-1', + tool: 'Bash', + input: { command: 'npm run test:e2e:mock' }, + }); + + emitCodexItemCompleted( + { + id: 'cmd-1', + type: 'command_execution', + status: 'completed', + exit_code: 0, + aggregated_output: 'done', + }, + undefined, + state, + ); + + expect(state.activeTool).toBeUndefined(); + }); + + it('id が無い command_execution completed でも active tool state を残さない', () => { + const state = createStreamTrackingState(); + + emitCodexItemStart( + { type: 'command_execution', command: 'npm run test:e2e:mock' }, + undefined, + state, + ); + + expect(state.activeTool).toEqual({ + id: expect.stringMatching(/^item_/), + tool: 'Bash', + input: { command: 'npm run test:e2e:mock' }, + }); + + emitCodexItemCompleted( + { + type: 'command_execution', + status: 'completed', + exit_code: 0, + aggregated_output: 'done', + }, + undefined, + state, + ); + + expect(state.activeTool).toBeUndefined(); + }); + + it('id が無い command_execution 完了後の次の command を active tool state に記録する', () => { + const state = createStreamTrackingState(); + + emitCodexItemStart({ type: 'command_execution', command: 'npm test' }, undefined, state); + const firstId = state.activeTool?.id; + + emitCodexItemCompleted( + { + type: 'command_execution', + status: 'completed', + exit_code: 0, + aggregated_output: 'done', + }, + undefined, + state, + ); + emitCodexItemStart({ type: 'command_execution', command: 'npm run lint' }, undefined, state); + + expect(state.activeTool).toEqual({ + id: expect.stringMatching(/^item_/), + tool: 'Bash', + input: { command: 'npm run lint' }, + }); + expect(state.activeTool?.id).not.toBe(firstId); + }); +}); diff --git a/src/__tests__/shared-utils-exports.test.ts b/src/__tests__/shared-utils-exports.test.ts new file mode 100644 index 000000000..4aa71036b --- /dev/null +++ b/src/__tests__/shared-utils-exports.test.ts @@ -0,0 +1,8 @@ +import { describe, expect, it } from 'vitest'; + +describe('shared utils exports', () => { + it('should keep reconnect redaction helper out of the shared utils barrel', async () => { + const utils = await import('../shared/utils/index.js'); + expect('sanitizeSensitiveText' in utils).toBe(false); + }); +}); diff --git a/src/features/tasks/execute/traceReportRedaction.ts b/src/features/tasks/execute/traceReportRedaction.ts index 027da81e9..a0a783ce9 100644 --- a/src/features/tasks/execute/traceReportRedaction.ts +++ b/src/features/tasks/execute/traceReportRedaction.ts @@ -3,18 +3,7 @@ import type { TraceReportMode, TraceReportParams, } from './traceReportTypes.js'; - -export function sanitizeSensitiveText(text: string): string { - if (!text) return text; - return text - .replace(/(Authorization\s*:\s*Bearer\s+)([^\s]+)/gi, '$1[REDACTED]') - .replace( - /(["']?(?:api[_-]?key|token|password|secret|access[_-]?token|refresh[_-]?token)["']?\s*[:=]\s*["']?)([^"',\s}\]]+)(["']?)/gi, - '$1[REDACTED]$3', - ) - .replace(/([?&](?:api[_-]?key|token|password|secret)=)([^&\s]+)/gi, '$1[REDACTED]') - .replace(/\b(?:sk-[A-Za-z0-9]{8,}|ghp_[A-Za-z0-9]{8,}|xox[baprs]-[A-Za-z0-9-]{8,})\b/g, '[REDACTED]'); -} +import { sanitizeSensitiveText } from '../../../shared/utils/sensitiveText.js'; function transformText(text: string, mode: TraceReportMode): string { if (!text) { diff --git a/src/infra/codex/CodexStreamHandler.ts b/src/infra/codex/CodexStreamHandler.ts index ca16695d3..bbd6fec6b 100644 --- a/src/infra/codex/CodexStreamHandler.ts +++ b/src/infra/codex/CodexStreamHandler.ts @@ -6,7 +6,7 @@ */ import type { AgentFailureCategory } from '../../shared/types/agent-failure.js'; -import type { StreamCallback } from '../../shared/types/provider.js'; +import type { StreamCallback, StreamToolUseEventData } from '../../shared/types/provider.js'; export type CodexEvent = { type: string; @@ -25,6 +25,8 @@ export interface StreamTrackingState { outputOffsets: Map; textOffsets: Map; thinkingOffsets: Map; + anonymousItemIds: Map; + activeTool?: StreamToolUseEventData; } export function createStreamTrackingState(): StreamTrackingState { @@ -33,6 +35,7 @@ export function createStreamTrackingState(): StreamTrackingState { outputOffsets: new Map(), textOffsets: new Map(), thinkingOffsets: new Map(), + anonymousItemIds: new Map(), }; } @@ -80,6 +83,46 @@ export function emitToolUse( onStream({ type: 'tool_use', data: { tool, input, id } }); } +function recordToolUse( + state: StreamTrackingState, + tool: string, + input: Record, + id: string, +): void { + state.activeTool = { tool, input, id }; +} + +function clearActiveTool(state: StreamTrackingState, id: string): void { + if (state.activeTool?.id === id) { + state.activeTool = undefined; + } +} + +function resolveCodexItemId(item: CodexItem, state: StreamTrackingState): string { + if (item.id) { + return item.id; + } + + const existingId = state.anonymousItemIds.get(item.type); + if (existingId) { + return existingId; + } + + const id = `item_${Math.random().toString(36).slice(2, 10)}`; + state.anonymousItemIds.set(item.type, id); + return id; +} + +function releaseAnonymousItemId(item: CodexItem, state: StreamTrackingState, id: string): void { + if (item.id) { + return; + } + + if (state.anonymousItemIds.get(item.type) === id) { + state.anonymousItemIds.delete(item.type); + } +} + export function emitToolResult( onStream: StreamCallback | undefined, content: string, @@ -132,37 +175,48 @@ export function formatFileChangeSummary(changes: Array<{ path?: string; kind?: s export function emitCodexItemStart( item: CodexItem, onStream: StreamCallback | undefined, - startedItems: Set, + state: StreamTrackingState, ): void { - if (!onStream) return; - const id = item.id || `item_${Math.random().toString(36).slice(2, 10)}`; - if (startedItems.has(id)) return; + emitCodexItemStartWithId(item, onStream, state, resolveCodexItemId(item, state)); +} + +function emitCodexItemStartWithId( + item: CodexItem, + onStream: StreamCallback | undefined, + state: StreamTrackingState, + id: string, +): void { + if (state.startedItems.has(id)) return; switch (item.type) { case 'command_execution': { const command = typeof item.command === 'string' ? item.command : ''; + recordToolUse(state, 'Bash', { command }, id); emitToolUse(onStream, 'Bash', { command }, id); - startedItems.add(id); + state.startedItems.add(id); break; } case 'mcp_tool_call': { const tool = typeof item.tool === 'string' ? item.tool : 'Tool'; const args = (item.arguments ?? {}) as Record; + recordToolUse(state, tool, args, id); emitToolUse(onStream, tool, args, id); - startedItems.add(id); + state.startedItems.add(id); break; } case 'web_search': { const query = typeof item.query === 'string' ? item.query : ''; + recordToolUse(state, 'WebSearch', { query }, id); emitToolUse(onStream, 'WebSearch', { query }, id); - startedItems.add(id); + state.startedItems.add(id); break; } case 'file_change': { const changes = Array.isArray(item.changes) ? item.changes : []; const summary = formatFileChangeSummary(changes as Array<{ path?: string; kind?: string }>); + recordToolUse(state, 'Edit', { file_path: summary || 'patch' }, id); emitToolUse(onStream, 'Edit', { file_path: summary || 'patch' }, id); - startedItems.add(id); + state.startedItems.add(id); break; } default: @@ -175,8 +229,7 @@ export function emitCodexItemCompleted( onStream: StreamCallback | undefined, state: StreamTrackingState, ): void { - if (!onStream) return; - const id = item.id || `item_${Math.random().toString(36).slice(2, 10)}`; + const id = resolveCodexItemId(item, state); switch (item.type) { case 'reasoning': { @@ -203,7 +256,7 @@ export function emitCodexItemCompleted( } case 'command_execution': { if (!state.startedItems.has(id)) { - emitCodexItemStart(item, onStream, state.startedItems); + emitCodexItemStartWithId(item, onStream, state, id); } const output = typeof item.aggregated_output === 'string' ? item.aggregated_output : ''; if (output) { @@ -218,11 +271,12 @@ export function emitCodexItemCompleted( const isError = status === 'failed' || (exitCode !== undefined && exitCode !== 0); const content = output || (exitCode !== undefined ? `Exit code: ${exitCode}` : ''); emitToolResult(onStream, content, isError); + clearActiveTool(state, id); break; } case 'mcp_tool_call': { if (!state.startedItems.has(id)) { - emitCodexItemStart(item, onStream, state.startedItems); + emitCodexItemStartWithId(item, onStream, state, id); } const status = typeof item.status === 'string' ? item.status : ''; const isError = status === 'failed' || !!item.error; @@ -239,29 +293,34 @@ export function emitCodexItemCompleted( } } emitToolResult(onStream, content, isError); + clearActiveTool(state, id); break; } case 'web_search': { if (!state.startedItems.has(id)) { - emitCodexItemStart(item, onStream, state.startedItems); + emitCodexItemStartWithId(item, onStream, state, id); } emitToolResult(onStream, 'Search completed', false); + clearActiveTool(state, id); break; } case 'file_change': { if (!state.startedItems.has(id)) { - emitCodexItemStart(item, onStream, state.startedItems); + emitCodexItemStartWithId(item, onStream, state, id); } const status = typeof item.status === 'string' ? item.status : ''; const isError = status === 'failed'; const changes = Array.isArray(item.changes) ? item.changes : []; const summary = formatFileChangeSummary(changes as Array<{ path?: string; kind?: string }>); emitToolResult(onStream, summary || 'Applied patch', isError); + clearActiveTool(state, id); break; } default: break; } + + releaseAnonymousItemId(item, state, id); } export function emitCodexItemUpdate( @@ -269,13 +328,12 @@ export function emitCodexItemUpdate( onStream: StreamCallback | undefined, state: StreamTrackingState, ): void { - if (!onStream) return; - const id = item.id || `item_${Math.random().toString(36).slice(2, 10)}`; + const id = resolveCodexItemId(item, state); switch (item.type) { case 'command_execution': { if (!state.startedItems.has(id)) { - emitCodexItemStart(item, onStream, state.startedItems); + emitCodexItemStartWithId(item, onStream, state, id); } const output = typeof item.aggregated_output === 'string' ? item.aggregated_output : ''; if (output) { @@ -313,7 +371,7 @@ export function emitCodexItemUpdate( case 'mcp_tool_call': case 'web_search': { if (!state.startedItems.has(id)) { - emitCodexItemStart(item, onStream, state.startedItems); + emitCodexItemStartWithId(item, onStream, state, id); } break; } diff --git a/src/infra/codex/client.ts b/src/infra/codex/client.ts index b1db98ee9..f1ae79d17 100644 --- a/src/infra/codex/client.ts +++ b/src/infra/codex/client.ts @@ -8,6 +8,7 @@ import { Codex, type TurnOptions } from '@openai/codex-sdk'; import { USAGE_MISSING_REASONS } from '../../core/logging/contracts.js'; import type { AgentResponse, ProviderUsageSnapshot } from '../../core/models/index.js'; import { createLogger, getErrorMessage, createStreamDiagnostics, parseStructuredOutput, type StreamDiagnostics } from '../../shared/utils/index.js'; +import { sanitizeSensitiveText } from '../../shared/utils/sensitiveText.js'; import { AGENT_FAILURE_CATEGORIES, classifyAbortSignalReason, @@ -17,6 +18,7 @@ import { type AgentFailureCategory, type AgentFailureDetail, } from '../../shared/types/agent-failure.js'; +import type { StreamToolUseEventData } from '../../shared/types/provider.js'; import { mapToCodexSandboxMode, type CodexCallOptions } from './types.js'; import { type CodexEvent, @@ -39,6 +41,10 @@ const CODEX_STREAM_ABORTED_MESSAGE = 'Codex execution aborted'; const CODEX_TIMEOUT_MAX_RETRIES = 2; const CODEX_RETRY_MAX_RETRIES = 8; const CODEX_RETRY_BASE_DELAY_MS = 1000; +const CODEX_RECONNECT_ERROR_PATTERNS = [ + 'reconnecting...', + 'timeout waiting for child process to exit', +]; const CODEX_RETRYABLE_ERROR_PATTERNS = [ 'stream disconnected before completion', 'transport error', @@ -49,6 +55,7 @@ const CODEX_RETRYABLE_ERROR_PATTERNS = [ 'eai_again', 'fetch failed', 'at capacity', + ...CODEX_RECONNECT_ERROR_PATTERNS, ]; function toNumber(value: unknown): number | undefined { @@ -106,6 +113,43 @@ export class CodexClient { return CODEX_RETRYABLE_ERROR_PATTERNS.some((pattern) => lower.includes(pattern)); } + private isReconnectFailure(message: string): boolean { + const lower = message.toLowerCase(); + return CODEX_RECONNECT_ERROR_PATTERNS.some((pattern) => lower.includes(pattern)); + } + + private withReconnectFailureDiagnostics( + failure: AgentFailureDetail, + activeTool: StreamToolUseEventData | undefined, + ): AgentFailureDetail { + if ( + failure.category !== AGENT_FAILURE_CATEGORIES.PROVIDER_ERROR + || !this.isReconnectFailure(failure.reason) + ) { + return failure; + } + + const lines = [ + 'provider reconnect failure', + `Original error: ${failure.reason}`, + ]; + if (activeTool) { + lines.push(`Active tool: ${activeTool.tool}`); + const command = activeTool.tool === 'Bash' && typeof activeTool.input.command === 'string' + ? activeTool.input.command + : undefined; + if (command) { + lines.push(`Bash command: ${sanitizeSensitiveText(command)}`); + } + } + lines.push('Command result: unknown'); + + return { + ...failure, + reason: lines.join('\n'), + }; + } + private async waitForRetryDelay(attempt: number, signal?: AbortSignal): Promise { const delayMs = CODEX_RETRY_BASE_DELAY_MS * (2 ** Math.max(0, attempt - 1)); await new Promise((resolve, reject) => { @@ -256,6 +300,7 @@ export class CodexClient { const timeoutMessage = `Codex stream timed out after ${Math.floor(CODEX_STREAM_IDLE_TIMEOUT_MS / 60000)} minutes of inactivity`; let abortCause: 'timeout' | 'external' | undefined; let diagRef: StreamDiagnostics | undefined; + const state = createStreamTrackingState(); const resetIdleTimeout = (): void => { if (idleTimeoutId !== undefined) { @@ -281,7 +326,6 @@ export class CodexClient { options.abortSignal.addEventListener('abort', onExternalAbort, { once: true }); } } - try { log.debug('Executing Codex thread', { agentType, @@ -307,7 +351,6 @@ export class CodexClient { let success = true; let failureMessage = ''; let providerUsage: ProviderUsageSnapshot | undefined; - const state = createStreamTrackingState(); for await (const event of events as AsyncGenerator) { resetIdleTimeout(); @@ -344,7 +387,7 @@ export class CodexClient { if (event.type === 'item.started') { const item = event.item as CodexItem | undefined; if (item) { - emitCodexItemStart(item, options.onStream, state.startedItems); + emitCodexItemStart(item, options.onStream, state); } continue; } @@ -426,13 +469,14 @@ export class CodexClient { continue; } - const errorResponse = this.buildErrorResponse(agentType, currentThreadId, failure); + const finalFailure = this.withReconnectFailureDiagnostics(failure, state.activeTool); + const errorResponse = this.buildErrorResponse(agentType, currentThreadId, finalFailure); emitResult( options.onStream, false, errorResponse.error ?? errorResponse.content, currentThreadId, - failure.category, + finalFailure.category, ); return errorResponse; } @@ -491,13 +535,14 @@ export class CodexClient { continue; } - const errorResponse = this.buildErrorResponse(agentType, currentThreadId, failure); + const finalFailure = this.withReconnectFailureDiagnostics(failure, state.activeTool); + const errorResponse = this.buildErrorResponse(agentType, currentThreadId, finalFailure); emitResult( options.onStream, false, errorResponse.error ?? errorResponse.content, currentThreadId, - failure.category, + finalFailure.category, ); return errorResponse; diff --git a/src/shared/utils/sensitiveText.ts b/src/shared/utils/sensitiveText.ts new file mode 100644 index 000000000..c05eea0c6 --- /dev/null +++ b/src/shared/utils/sensitiveText.ts @@ -0,0 +1,48 @@ +const SENSITIVE_KEY_PATTERN = String.raw`[A-Za-z0-9_.-]*(?:api[_-]?key|token|password|secret|access[_-]?key|access[_-]?token|refresh[_-]?token|private[_-]?key)[A-Za-z0-9_.-]*`; +const SENSITIVE_QUOTED_VALUE_PATTERN = String.raw`"(?:\\.|[^"\\])*"|'(?:\\.|[^'\\])*'`; +const SENSITIVE_ASSIGNMENT_QUOTED_VALUE_REGEX = new RegExp( + String.raw`(["']?(?:${SENSITIVE_KEY_PATTERN})["']?\s*[:=]\s*)(${SENSITIVE_QUOTED_VALUE_PATTERN})`, + 'gi', +); +const SENSITIVE_ASSIGNMENT_BARE_VALUE_REGEX = new RegExp( + String.raw`(["']?(?:${SENSITIVE_KEY_PATTERN})["']?\s*[:=]\s*["']?)([^"'[,\s}\]]+)(["']?)`, + 'gi', +); +const SENSITIVE_OPTION_VALUE_REGEX = new RegExp( + String.raw`(--(?:${SENSITIVE_KEY_PATTERN})(?:=|\s+))(${SENSITIVE_QUOTED_VALUE_PATTERN}|[^\s]+)`, + 'gi', +); +const HTTP_AUTHORIZATION_HEADER_REGEX = /\b(Authorization\s*:\s*)([^"'\r\n]+)/gi; +const HTTP_COOKIE_HEADER_REGEX = /\b((?:Set-)?Cookie\s*:\s*)([^"'\r\n]+)/gi; +const URL_USERINFO_CREDENTIALS_REGEX = /(\b[A-Za-z][A-Za-z0-9+.-]*:\/\/)([^/?#\s@]+:[^/?#\s@]+)@/g; +const CURL_USER_EQUALS_REGEX = /(\B--(?:proxy-)?user=)(?:"(?:\\.|[^"\\])*"|'(?:\\.|[^'\\])*'|[^\s]+)/gi; +const CURL_USER_SPACE_REGEX = /(\B(?:-u|--(?:proxy-)?user)\s+)(?:"(?:\\.|[^"\\])*"|'(?:\\.|[^'\\])*'|[^\s]+)/gi; +const CURL_SHORT_USER_COMPACT_REGEX = /(^|\s)(-u)(?:"(?:\\.|[^"\\])*"|'(?:\\.|[^'\\])*'|[^\s]+)/gi; + +export function sanitizeSensitiveText(text: string): string { + if (!text) return text; + return text + .replace(SENSITIVE_ASSIGNMENT_QUOTED_VALUE_REGEX, (_match, prefix: string, quotedValue: string) => { + const quote = quotedValue[0]; + return `${prefix}${quote}[REDACTED]${quote}`; + }) + .replace(URL_USERINFO_CREDENTIALS_REGEX, '$1[REDACTED]@') + .replace(HTTP_AUTHORIZATION_HEADER_REGEX, (_match, prefix: string, value: string) => { + const leadingWhitespaceLength = value.length - value.trimStart().length; + const leadingWhitespace = value.slice(0, leadingWhitespaceLength); + const trimmed = value.slice(leadingWhitespaceLength); + const authScheme = /^([A-Za-z]+)\s+/.exec(trimmed); + if (authScheme) { + return `${prefix}${leadingWhitespace}${authScheme[1]} [REDACTED]`; + } + return `${prefix}${leadingWhitespace}[REDACTED]`; + }) + .replace(HTTP_COOKIE_HEADER_REGEX, '$1[REDACTED]') + .replace(CURL_USER_EQUALS_REGEX, '$1[REDACTED]') + .replace(CURL_USER_SPACE_REGEX, '$1[REDACTED]') + .replace(CURL_SHORT_USER_COMPACT_REGEX, '$1$2[REDACTED]') + .replace(SENSITIVE_ASSIGNMENT_BARE_VALUE_REGEX, '$1[REDACTED]$3') + .replace(SENSITIVE_OPTION_VALUE_REGEX, '$1[REDACTED]') + .replace(/([?&](?:api[_-]?key|token|password|secret)=)([^&\s]+)/gi, '$1[REDACTED]') + .replace(/\b(?:sk-[A-Za-z0-9_-]{8,}|ghp_[A-Za-z0-9_]{8,}|xox[baprs]-[A-Za-z0-9-]{8,})\b/g, '[REDACTED]'); +}