Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 156 additions & 59 deletions src/__tests__/engine-parallel-failure.test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
/**
* WorkflowEngine integration tests: parallel step partial failure handling.
*
* Covers:
* - One sub-step fails while another succeeds → workflow continues
* - All sub-steps fail → workflow aborts
* - Failed sub-step is recorded as error with error message
*/

import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { existsSync, rmSync } from 'node:fs';

// --- Mock setup (must be before imports that use these modules) ---
import { existsSync, readFileSync, rmSync } from 'node:fs';
import { join } from 'node:path';

vi.mock('../agents/runner.js', () => ({
runAgent: vi.fn(),
Expand All @@ -31,12 +21,11 @@ vi.mock('../shared/utils/index.js', async (importOriginal) => ({
generateReportDir: vi.fn().mockReturnValue('test-report-dir'),
}));

// --- Imports (after mocks) ---

import { WorkflowEngine } from '../core/workflow/index.js';
import { runAgent } from '../agents/runner.js';
import { detectMatchedRule } from '../core/workflow/evaluation/index.js';
import { needsStatusJudgmentPhase, runStatusJudgmentPhase } from '../core/workflow/phase-runner.js';
import { initDebugLogger, resetDebugLogger } from '../shared/utils/index.js';
import {
makeResponse,
makeStep,
Expand All @@ -47,10 +36,6 @@ import {
} from './engine-test-helpers.js';
import type { WorkflowConfig } from '../core/models/index.js';

/**
* Build a workflow config that goes directly to a parallel step:
* parallel-step (arch-review + security-review) → done
*/
function buildParallelOnlyConfig(): WorkflowConfig {
return {
name: 'test-parallel-failure',
Expand All @@ -62,26 +47,26 @@ function buildParallelOnlyConfig(): WorkflowConfig {
parallel: [
makeStep('arch-review', {
rules: [
makeRule('done', 'COMPLETE'),
makeRule('approved', 'COMPLETE'),
makeRule('needs_fix', 'fix'),
],
}),
makeStep('security-review', {
rules: [
makeRule('done', 'COMPLETE'),
makeRule('approved', 'COMPLETE'),
makeRule('needs_fix', 'fix'),
],
}),
],
rules: [
makeRule('any("done")', 'done', {
makeRule('all("approved")', 'done', {
isAggregateCondition: true,
aggregateType: 'any',
aggregateConditionText: 'done',
aggregateType: 'all',
aggregateConditionText: 'approved',
}),
makeRule('all("needs_fix")', 'fix', {
makeRule('any("needs_fix")', 'fix', {
isAggregateCondition: true,
aggregateType: 'all',
aggregateType: 'any',
aggregateConditionText: 'needs_fix',
}),
],
Expand Down Expand Up @@ -110,64 +95,67 @@ describe('WorkflowEngine Integration: Parallel Step Partial Failure', () => {
});

afterEach(() => {
resetDebugLogger();
if (existsSync(tmpDir)) {
rmSync(tmpDir, { recursive: true, force: true });
}
});

it('should continue when one sub-step fails but another succeeds', async () => {
it('should abort with parent error when one sub-step rejects and another approves', async () => {
const config = buildParallelOnlyConfig();
const engine = new WorkflowEngine(config, tmpDir, 'test task', { projectCwd: tmpDir });

const mock = vi.mocked(runAgent);
// arch-review fails (exit code 1)
mock.mockRejectedValueOnce(new Error('Claude Code process exited with code 1'));
// security-review succeeds
mock.mockImplementationOnce(async (persona, task, options) => {
options?.onPromptResolved?.({
systemPrompt: typeof persona === 'string' ? persona : '',
userInstruction: task,
});
return makeResponse({ persona: 'security-review', content: 'Security review passed' });
});
// done step
mock.mockImplementationOnce(async (persona, task, options) => {
options?.onPromptResolved?.({
systemPrompt: typeof persona === 'string' ? persona : '',
userInstruction: task,
});
return makeResponse({ persona: 'done', content: 'Completed' });
return makeResponse({ persona: 'security-review', content: '[SECURITY-REVIEW:1] approved' });
});

mockDetectMatchedRuleSequence([
// security-review sub-step rule match (arch-review has no match — it failed)
{ index: 0, method: 'phase1_tag' }, // security-review → done
{ index: 0, method: 'aggregate' }, // reviewers → any("done") matches
{ index: 0, method: 'phase1_tag' }, // done → COMPLETE
{ index: 0, method: 'phase1_tag' },
]);

const abortFn = vi.fn();
engine.on('workflow:abort', abortFn);

const state = await engine.run();

expect(state.status).toBe('completed');
expect(state.status).toBe('aborted');
expect(abortFn).toHaveBeenCalledOnce();
const reason = abortFn.mock.calls[0]![1] as string;
expect(reason).toContain('Step "reviewers" failed');
expect(reason).toContain('arch-review');
expect(reason).toContain('Claude Code process exited with code 1');
expect(reason).not.toContain('Status not found for step "reviewers"');

const reviewersOutput = state.stepOutputs.get('reviewers');
expect(reviewersOutput).toBeDefined();
expect(reviewersOutput!.status).toBe('error');
expect(reviewersOutput!.content).toContain('arch-review');
expect(reviewersOutput!.content).toContain('status: error');
expect(reviewersOutput!.content).toContain('failureCategory: none');
expect(reviewersOutput!.content).toContain('Claude Code process exited with code 1');
expect(reviewersOutput!.content).toContain('aggregate');

// arch-review should be recorded as error
const archReviewOutput = state.stepOutputs.get('arch-review');
expect(archReviewOutput).toBeDefined();
expect(archReviewOutput!.status).toBe('error');
expect(archReviewOutput!.error).toContain('exit');

// security-review should be recorded as done
const securityReviewOutput = state.stepOutputs.get('security-review');
expect(securityReviewOutput).toBeDefined();
expect(securityReviewOutput!.status).toBe('done');
});

it('should abort when all sub-steps fail', async () => {
it('should report all rejected sub-step errors through the parent error response', async () => {
const config = buildParallelOnlyConfig();
const engine = new WorkflowEngine(config, tmpDir, 'test task', { projectCwd: tmpDir });

const mock = vi.mocked(runAgent);
// Both fail
mock.mockRejectedValueOnce(new Error('Claude Code process exited with code 1'));
mock.mockRejectedValueOnce(new Error('Claude Code process exited with code 1'));

Expand All @@ -179,10 +167,20 @@ describe('WorkflowEngine Integration: Parallel Step Partial Failure', () => {
expect(state.status).toBe('aborted');
expect(abortFn).toHaveBeenCalledOnce();
const reason = abortFn.mock.calls[0]![1] as string;
expect(reason).toContain('All parallel sub-steps failed');
expect(reason).toContain('Step "reviewers" failed');
expect(reason).toContain('arch-review');
expect(reason).toContain('security-review');
expect(reason).not.toContain('All parallel sub-steps failed');

const reviewersOutput = state.stepOutputs.get('reviewers');
expect(reviewersOutput).toBeDefined();
expect(reviewersOutput!.status).toBe('error');
expect(reviewersOutput!.content).toContain('arch-review');
expect(reviewersOutput!.content).toContain('security-review');
expect(reviewersOutput!.content).toContain('status: error');
});

it('should preserve the rate limit message in workflow abort reason when all sub-steps fail with it', async () => {
it('should preserve rejected sub-step error detail in the parent diagnostic', async () => {
const config = buildParallelOnlyConfig();
const engine = new WorkflowEngine(config, tmpDir, 'test task', { projectCwd: tmpDir });

Expand All @@ -198,8 +196,12 @@ describe('WorkflowEngine Integration: Parallel Step Partial Failure', () => {
expect(state.status).toBe('aborted');
expect(abortFn).toHaveBeenCalledOnce();
const reason = abortFn.mock.calls[0]![1] as string;
expect(reason).toContain('All parallel sub-steps failed');
expect(reason).toContain('Rate limit exceeded. Please try again later.');
expect(reason).not.toContain('Status not found for step "reviewers"');

const reviewersOutput = state.stepOutputs.get('reviewers');
expect(reviewersOutput).toBeDefined();
expect(reviewersOutput!.content).toContain('Rate limit exceeded. Please try again later.');
});

it('should record failed sub-step error message in stepOutputs', async () => {
Expand All @@ -213,28 +215,123 @@ describe('WorkflowEngine Integration: Parallel Step Partial Failure', () => {
systemPrompt: typeof persona === 'string' ? persona : '',
userInstruction: task,
});
return makeResponse({ persona: 'security-review', content: 'OK' });
return makeResponse({ persona: 'security-review', content: '[SECURITY-REVIEW:1] approved' });
});

mockDetectMatchedRuleSequence([
{ index: 0, method: 'phase1_tag' },
]);

const state = await engine.run();

const archReviewOutput = state.stepOutputs.get('arch-review');
expect(archReviewOutput).toBeDefined();
expect(archReviewOutput!.error).toBe('Session resume failed');
expect(archReviewOutput!.content).toBe('');

const reviewersOutput = state.stepOutputs.get('reviewers');
expect(reviewersOutput).toBeDefined();
expect(reviewersOutput!.status).toBe('error');
expect(reviewersOutput!.error).toContain('Session resume failed');
});

it('should redact sensitive rejected sub-step error detail from parent abort reason', async () => {
const config = buildParallelOnlyConfig();
const engine = new WorkflowEngine(config, tmpDir, 'test task', { projectCwd: tmpDir });
const debugLogFile = join(tmpDir, 'parallel-debug.log');
initDebugLogger({ enabled: true, logFile: debugLogFile }, tmpDir);

const mock = vi.mocked(runAgent);
mock.mockRejectedValueOnce(new Error('Provider failed with api_key=top-secret and Authorization: Bearer sk-secret123456'));
mock.mockImplementationOnce(async (persona, task, options) => {
options?.onPromptResolved?.({
systemPrompt: typeof persona === 'string' ? persona : '',
userInstruction: task,
});
return makeResponse({ persona: 'done', content: 'Done' });
return makeResponse({ persona: 'security-review', content: '[SECURITY-REVIEW:1] approved' });
});

mockDetectMatchedRuleSequence([
{ index: 0, method: 'phase1_tag' },
{ index: 0, method: 'aggregate' },
]);

const abortFn = vi.fn();
engine.on('workflow:abort', abortFn);

const state = await engine.run();

expect(state.status).toBe('aborted');
expect(abortFn).toHaveBeenCalledOnce();
const reason = abortFn.mock.calls[0]![1] as string;
expect(reason).toContain('api_key=[REDACTED]');
expect(reason).toContain('Authorization: Bearer [REDACTED]');
expect(reason).not.toContain('top-secret');
expect(reason).not.toContain('sk-secret123456');

const reviewersOutput = state.stepOutputs.get('reviewers');
expect(reviewersOutput?.error).toBe(reviewersOutput?.content);
expect(reviewersOutput?.content).not.toContain('top-secret');
expect(reviewersOutput?.content).not.toContain('sk-secret123456');

const debugLog = readFileSync(debugLogFile, 'utf-8');
expect(debugLog).toContain('api_key=[REDACTED]');
expect(debugLog).toContain('Authorization: Bearer [REDACTED]');
expect(debugLog).not.toContain('top-secret');
expect(debugLog).not.toContain('sk-secret123456');
});

it('should promote a blocked sub-step to blocked parent response', async () => {
const config = buildParallelOnlyConfig();
const engine = new WorkflowEngine(config, tmpDir, 'test task', { projectCwd: tmpDir });

const mock = vi.mocked(runAgent);
mock.mockImplementationOnce(async (persona, task, options) => {
options?.onPromptResolved?.({
systemPrompt: typeof persona === 'string' ? persona : '',
userInstruction: task,
});
return makeResponse({
persona: 'arch-review',
status: 'blocked',
content: 'Need user clarification before review can continue',
});
});
mock.mockImplementationOnce(async (persona, task, options) => {
options?.onPromptResolved?.({
systemPrompt: typeof persona === 'string' ? persona : '',
userInstruction: task,
});
return makeResponse({ persona: 'security-review', content: '[SECURITY-REVIEW:1] approved' });
});

mockDetectMatchedRuleSequence([
{ index: 0, method: 'phase1_tag' },
]);

const blockedFn = vi.fn();
const abortFn = vi.fn();
engine.on('step:blocked', blockedFn);
engine.on('workflow:abort', abortFn);

const state = await engine.run();

const archReviewOutput = state.stepOutputs.get('arch-review');
expect(archReviewOutput).toBeDefined();
expect(archReviewOutput!.error).toBe('Session resume failed');
expect(archReviewOutput!.content).toBe('');
expect(state.status).toBe('aborted');
expect(blockedFn).toHaveBeenCalledOnce();
expect(abortFn).toHaveBeenCalledOnce();

const reviewersOutput = state.stepOutputs.get('reviewers');
expect(reviewersOutput).toBeDefined();
expect(reviewersOutput!.status).toBe('blocked');
expect(reviewersOutput!.content).toContain('arch-review');
expect(reviewersOutput!.content).toContain('status: blocked');
expect(reviewersOutput!.content).toContain('failureCategory: none');
expect(reviewersOutput!.content).toContain('Need user clarification before review can continue');
expect(reviewersOutput!.content).toContain('aggregate');
expect(state.previousResponseSourcePath).toMatch(
/^\.takt\/runs\/test-report-dir\/context\/previous_responses\/reviewers\.1\.\d{8}T\d{6}Z\.md$/,
);
const snapshot = readFileSync(join(tmpDir, state.previousResponseSourcePath!), 'utf-8');
expect(snapshot).toBe(reviewersOutput!.content);
});

it('should fallback to phase1 rule evaluation when sub-step phase3 throws', async () => {
Expand Down Expand Up @@ -275,9 +372,9 @@ describe('WorkflowEngine Integration: Parallel Step Partial Failure', () => {
});

mockDetectMatchedRuleSequence([
{ index: 0, method: 'phase1_tag' }, // arch-review fallback
{ index: 0, method: 'aggregate' }, // reviewers aggregate
{ index: 0, method: 'phase1_tag' }, // done -> COMPLETE
{ index: 0, method: 'phase1_tag' },
{ index: 0, method: 'aggregate' },
{ index: 0, method: 'phase1_tag' },
]);

const state = await engine.run();
Expand Down
13 changes: 9 additions & 4 deletions src/__tests__/engine-rate-limit-fallback.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ function makeRateLimitedResponse(
): AgentResponse {
return makeResponse({
persona: 'plan',
status: 'rate_limited' as never,
status: 'rate_limited',
content: '',
error: 'Rate limit exceeded. Please try again later.',
errorKind: 'rate_limit',
Expand Down Expand Up @@ -639,7 +639,6 @@ describe('WorkflowEngine rate limit fallback', () => {
});

it('parallel sub-step が rate_limited の場合は all-failed abort ではなく fallback provider で再実行する', async () => {
// Given
const engine = new WorkflowEngine(parallelStepConfig(), tmpDir, 'test task', createEngineOptions(tmpDir, {
rateLimitFallback: {
switchChain: [{ provider: 'codex', model: 'gpt-5' }],
Expand All @@ -657,10 +656,8 @@ describe('WorkflowEngine rate limit fallback', () => {
{ index: 0, method: 'aggregate' },
]);

// When
const state = await engine.run();

// Then
expect(state.status).toBe('completed');
expect(state.iteration).toBe(1);
expect(state.stepIterations.get('reviewers')).toBe(1);
Expand All @@ -673,9 +670,17 @@ describe('WorkflowEngine rate limit fallback', () => {
expect(prompts[2]).toContain('Fallback Execution');
expect(prompts[2]).toContain('claude');
expect(prompts[2]).toContain('codex');
expect(prompts[2]).toContain('arch-review');
expect(prompts[2]).toContain('security-review');
expect(prompts[2]).toContain('Aggregate rules were not evaluated');
expect(prompts[2]).toContain('Rate limit exceeded. Please try again later.');
expect(prompts[3]).toContain('Fallback Execution');
expect(prompts[3]).toContain('claude');
expect(prompts[3]).toContain('codex');
expect(prompts[3]).toContain('arch-review');
expect(prompts[3]).toContain('security-review');
expect(prompts[3]).toContain('Aggregate rules were not evaluated');
expect(prompts[3]).toContain('Rate limit exceeded. Please try again later.');
expect(runAgent).toHaveBeenCalledTimes(4);
expect(detectMatchedRule).toHaveBeenCalledTimes(3);
});
Expand Down
Loading
Loading