Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/task-management.ja.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ concurrency が 1 より大きい場合、TAKT はワーカープールを使用
- タスクごとに色分けされたプレフィックス付き出力で読みやすさを確保
- Ctrl+C でのグレースフルシャットダウン(実行中タスクの完了を待機)

### 中断されたタスクの復旧
### 中断されたタスクのクリーンアップ

`takt run` が中断された場合(プロセスクラッシュ、Ctrl+C など)、`running` ステータスのまま残ったタスクは次回の `takt run` または `takt watch` 起動時に自動的に `pending` に復旧されます
`takt run` が中断された場合(プロセスクラッシュ、Ctrl+C など)、`running` ステータスのまま残ったタスクは次回の `takt run` または `takt watch` 起動時に自動的に `failed` にマークされます。再実行する場合は明示的に requeue してください

## タスクの監視(`takt watch`)

Expand All @@ -150,7 +150,7 @@ watch コマンドの動作は次の通りです。
- Ctrl+C(SIGINT)まで実行を継続
- `tasks.yaml` の新しい `pending` タスクを監視
- タスクが現れるたびに実行
- 起動時に中断された `running` タスクを復旧
- 起動時に中断された `running` タスクを `failed` にマーク
- 終了時に合計/成功/失敗タスク数のサマリを表示

これは「プロデューサー-コンシューマー」ワークフローに便利です。一方のターミナルで `takt add` でタスクを追加し、もう一方で `takt watch` がそれらを自動実行します。
Expand Down
6 changes: 3 additions & 3 deletions docs/task-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ When concurrency is greater than 1, TAKT uses a worker pool that:
- Displays color-coded prefixed output per task for readability
- Supports graceful shutdown on Ctrl+C (waits for in-flight tasks to complete)

### Interrupted Task Recovery
### Interrupted Task Cleanup

If `takt run` is interrupted (e.g., process crash, Ctrl+C), tasks left in `running` status are automatically recovered to `pending` on the next `takt run` or `takt watch` invocation.
If `takt run` is interrupted (e.g., process crash, Ctrl+C), tasks left in `running` status are automatically marked as `failed` on the next `takt run` or `takt watch` invocation. Requeue them explicitly to run them again.

## Watching Tasks (`takt watch`)

Expand All @@ -150,7 +150,7 @@ The watch command:
- Stays running until Ctrl+C (SIGINT)
- Monitors `tasks.yaml` for new `pending` tasks
- Executes each task as it appears
- Recovers interrupted `running` tasks on startup
- Marks interrupted `running` tasks as `failed` on startup
- Displays a summary of total/success/failed tasks on exit

This is useful for a "producer-consumer" workflow where you add tasks with `takt add` in one terminal and let `takt watch` execute them automatically in another.
Expand Down
14 changes: 7 additions & 7 deletions docs/testing/e2e.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,20 @@ E2Eテストを追加・変更した場合は、このドキュメントも更
- `.takt/tasks.yaml` に pending タスクを追加する(`workflow` に `e2e/fixtures/workflows/mock-single-step.yaml` を指定)。
- 出力に `Task "watch-task" completed` が含まれることを確認する。
- `Ctrl+C` で終了する。
- Run recovery and high-priority run flows(`e2e/specs/run-recovery.e2e.ts`)
- 目的: 高優先度ユースケース(異常終了リカバリー、並列実行、初期化〜add〜run)をまとめて確認。
- Run interrupted task cleanup and high-priority run flows(`e2e/specs/run-recovery.e2e.ts`)
- 目的: 高優先度ユースケース(異常終了したrunningタスクのfailed化、並列実行、初期化〜add〜run)をまとめて確認。
- LLM: 呼び出さない(`--provider mock` 固定)
- 手順(ユーザー行動/コマンド):
- 異常終了リカバリー:
- 異常終了したrunningタスクのfailed化:
- `.takt/tasks.yaml` に pending タスク2件を投入し、`takt run --provider mock` 実行中にプロセスを強制終了する。
- 再度 `takt run --provider mock` を実行し、`Recovered 1 interrupted running task(s) to pending.` が出力されることを確認する。
- 復旧対象を含む全タスクが完了し、`.takt/tasks.yaml` が空になることを確認する
- 再度 `takt run --provider mock` を実行し、`Marked 1 interrupted running task(s) as failed.` が出力されることを確認する。
- 異常終了時にrunningだったタスクはfailedで残り、残りのpendingタスクだけが完了することを確認する
- 高並列実行:
- `concurrency: 10` を設定し、pending タスク12件を投入して `takt run --provider mock` を実行する。
- 出力に `Concurrency: 10` と `Tasks Summary` が含まれること、および `.takt/tasks.yaml` が空になることを確認する
- 出力に `Concurrency: 10` と `Tasks Summary` が含まれること、および全タスクが completed 履歴として残ることを確認する
- 初期化〜add〜run:
- グローバル `config.yaml` 不在の環境で `takt add` を2回実行し、`takt run --provider mock` を実行する。
- タスク実行完了後に `.takt/tasks/` 配下の2タスクディレクトリ生成、`.takt/.gitignore` 生成、`.takt/tasks.yaml` の空状態を確認する
- タスク実行完了後に `.takt/tasks/` 配下の2タスクディレクトリ生成、`.takt/.gitignore` 生成、`.takt/tasks.yaml` に2件の completed 履歴が残ることを確認する
- Run tasks graceful shutdown on SIGINT(`e2e/specs/run-sigint-graceful.e2e.ts`)
- 目的: `takt run` を並列実行中に `Ctrl+C` した際、新規クローン投入を止めてグレースフルに終了することを確認。
- LLM: 呼び出さない(`--provider mock` 固定)
Expand Down
154 changes: 101 additions & 53 deletions e2e/specs/run-recovery.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
updateIsolatedConfig,
type IsolatedEnv,
} from '../helpers/isolated-env';
import { runTakt } from '../helpers/takt-runner';
import { formatTaktRunResult, runTakt } from '../helpers/takt-runner';

const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
Expand All @@ -33,6 +33,9 @@ interface TaskRecord {
status: 'pending' | 'running' | 'failed' | 'completed';
owner_pid?: number | null;
workflow?: string;
failure?: {
error?: string;
};
}

function createLocalRepo(): LocalRepo {
Expand Down Expand Up @@ -118,6 +121,7 @@ function createEnvWithoutGlobalConfig(): {
TAKT_CONFIG_DIR: globalConfigDir,
GIT_CONFIG_GLOBAL: globalGitConfigPath,
TAKT_NO_TTY: '1',
TAKT_NOTIFY_WEBHOOK: undefined,
},
globalConfigPath,
cleanup: () => {
Expand All @@ -127,7 +131,7 @@ function createEnvWithoutGlobalConfig(): {
}

// E2E更新時は docs/testing/e2e.md も更新すること
describe('E2E: Run interrupted task recovery and high-priority run flows', () => {
describe('E2E: Run interrupted task cleanup and high-priority run flows', () => {
let isolatedEnv: IsolatedEnv;
let repo: LocalRepo;

Expand All @@ -141,13 +145,13 @@ describe('E2E: Run interrupted task recovery and high-priority run flows', () =>
isolatedEnv.cleanup();
});

it('should recover stale running task generated by forced process termination', async () => {
it('should fail stale running task generated by forced process termination', async () => {
// Given: 2 pending tasks exist, then first run is force-killed while task is running
updateIsolatedConfig(isolatedEnv.taktDir, {
provider: 'mock',
model: 'mock-model',
concurrency: 1,
task_poll_interval_ms: 50,
task_poll_interval_ms: 100,
});

const workflowPath = resolve(__dirname, '../fixtures/workflows/mock-slow-multi-step.yaml');
Expand Down Expand Up @@ -176,49 +180,71 @@ describe('E2E: Run interrupted task recovery and high-priority run flows', () =>
firstStderr += chunk.toString();
});

const runningObserved = await waitFor(() => {
if (!existsSync(tasksFile)) {
return false;
}
const tasks = readTasks(tasksFile);
return tasks.some((task) => task.status === 'running');
}, 30_000, 20);

expect(runningObserved, `stdout:\n${firstStdout}\n\nstderr:\n${firstStderr}`).toBe(true);

child.kill('SIGKILL');

await new Promise<void>((resolvePromise) => {
let childClosed = false;
const childClosedPromise = new Promise<void>((resolvePromise) => {
child.once('close', () => {
childClosed = true;
resolvePromise();
});
});

const staleTasks = readTasks(tasksFile);
const runningTask = staleTasks.find((task) => task.status === 'running');
expect(runningTask).toBeDefined();
expect(runningTask?.owner_pid).toBeTypeOf('number');

// When: run is executed again
const rerunResult = runTakt({
args: ['run', '--provider', 'mock'],
cwd: repo.path,
env: {
...isolatedEnv.env,
TAKT_MOCK_SCENARIO: scenarioPath,
},
timeout: 240_000,
});

// Then: stale running task is recovered and all tasks complete
expect(rerunResult.exitCode).toBe(0);
const combined = rerunResult.stdout + rerunResult.stderr;
expect(combined).toContain('Recovered 1 interrupted running task(s) to pending.');
expect(combined).toContain('recovery-target-1');
expect(combined).toContain('recovery-target-2');
try {
const runningObserved = await waitFor(() => {
if (!existsSync(tasksFile)) {
return false;
}
const tasks = readTasks(tasksFile);
return tasks.some((task) => task.status === 'running');
}, 30_000, 20);

expect(runningObserved, `stdout:\n${firstStdout}\n\nstderr:\n${firstStderr}`).toBe(true);

child.kill('SIGKILL');
await childClosedPromise;

const staleTasks = readTasks(tasksFile);
const runningTask = staleTasks.find((task) => task.status === 'running');
expect(runningTask).toBeDefined();
expect(runningTask?.owner_pid).toBeTypeOf('number');

const rerunResult = runTakt({
args: ['run', '--provider', 'mock'],
cwd: repo.path,
env: {
...isolatedEnv.env,
TAKT_MOCK_SCENARIO: scenarioPath,
},
timeout: 240_000,
});

const finalTasks = readTasks(tasksFile);
expect(finalTasks).toEqual([]);
expect(rerunResult.exitCode, formatTaktRunResult(rerunResult)).toBe(0);
const combined = rerunResult.stdout + rerunResult.stderr;
expect(combined).toContain('Marked 1 interrupted running task(s) as failed.');
expect(combined).toContain('recovery-target-2');

const finalTasks = readTasks(tasksFile);
expect(finalTasks).toEqual(expect.arrayContaining([
expect.objectContaining({
name: 'recovery-target-1',
status: 'failed',
owner_pid: null,
failure: {
error: 'Task was interrupted before this TAKT run started. Requeue it explicitly to run again.',
},
}),
expect.objectContaining({
name: 'recovery-target-2',
status: 'completed',
owner_pid: null,
}),
]));
expect(finalTasks).toHaveLength(2);
} finally {
if (!childClosed) {
child.kill('SIGKILL');
await childClosedPromise;
}
}
}, 240_000);

it('should process high-concurrency batch without leaving inconsistent task state', () => {
Expand All @@ -227,7 +253,7 @@ describe('E2E: Run interrupted task recovery and high-priority run flows', () =>
provider: 'mock',
model: 'mock-model',
concurrency: 10,
task_poll_interval_ms: 50,
task_poll_interval_ms: 100,
});

const workflowPath = resolve(__dirname, '../fixtures/workflows/mock-single-step.yaml');
Expand All @@ -248,46 +274,55 @@ describe('E2E: Run interrupted task recovery and high-priority run flows', () =>
timeout: 240_000,
});

// Then: all tasks complete and queue becomes empty
expect(result.exitCode).toBe(0);
expect(result.exitCode, formatTaktRunResult(result)).toBe(0);
expect(result.stdout).toContain('Concurrency: 10');
expect(result.stdout).toContain('Tasks Summary');
const finalTasks = readTasks(tasksFile);
expect(finalTasks).toEqual([]);
expect(finalTasks).toHaveLength(12);
expect(finalTasks).toEqual(
expect.arrayContaining(
Array.from({ length: 12 }, (_, index) => expect.objectContaining({
name: `parallel-load-${String(index + 1)}`,
status: 'completed',
owner_pid: null,
})),
),
);
}, 240_000);

it('should initialize project dirs and execute tasks after add+run when global config is absent', () => {
const envWithoutConfig = createEnvWithoutGlobalConfig();

try {
// Given: global config.yaml is absent and project config points to a mock workflow path
const workflowPath = resolve(__dirname, '../fixtures/workflows/mock-single-step.yaml');
const scenarioPath = resolve(__dirname, '../fixtures/scenarios/execute-done.json');
const projectConfigDir = join(repo.path, '.takt');
const projectConfigPath = join(projectConfigDir, 'config.yaml');
mkdirSync(projectConfigDir, { recursive: true });
writeFileSync(projectConfigPath, `workflow: ${workflowPath}\npermissionMode: default\n`, 'utf-8');
writeFileSync(projectConfigPath, 'provider: mock\nmodel: mock-model\n', 'utf-8');

expect(existsSync(envWithoutConfig.globalConfigPath)).toBe(false);

// When: add 2 tasks and run once
const addResult1 = runTakt({
args: ['--provider', 'mock', 'add', 'Initialize flow task 1'],
args: ['--provider', 'mock', '--workflow', workflowPath, 'add', 'Initialize flow task 1'],
cwd: repo.path,
env: {
...envWithoutConfig.env,
TAKT_MOCK_SCENARIO: scenarioPath,
},
input: 'n\n',
timeout: 240_000,
});

const addResult2 = runTakt({
args: ['--provider', 'mock', 'add', 'Initialize flow task 2'],
args: ['--provider', 'mock', '--workflow', workflowPath, 'add', 'Initialize flow task 2'],
cwd: repo.path,
env: {
...envWithoutConfig.env,
TAKT_MOCK_SCENARIO: scenarioPath,
},
input: 'n\n',
timeout: 240_000,
});

Expand All @@ -302,13 +337,26 @@ describe('E2E: Run interrupted task recovery and high-priority run flows', () =>
});

// Then: tasks are persisted/executed correctly and project init artifacts exist
expect(addResult1.exitCode).toBe(0);
expect(addResult2.exitCode).toBe(0);
expect(runResult.exitCode).toBe(0);
expect(addResult1.exitCode, formatTaktRunResult(addResult1)).toBe(0);
expect(addResult2.exitCode, formatTaktRunResult(addResult2)).toBe(0);
expect(runResult.exitCode, formatTaktRunResult(runResult)).toBe(0);

const tasksFile = join(repo.path, '.takt', 'tasks.yaml');
const parsedFinal = parseYaml(readFileSync(tasksFile, 'utf-8')) as { tasks?: TaskRecord[] };
expect(parsedFinal.tasks).toEqual([]);
expect(parsedFinal.tasks).toEqual([
expect.objectContaining({
name: 'initialize-flow-task-1',
summary: 'Initialize flow task 1',
status: 'completed',
owner_pid: null,
}),
expect.objectContaining({
name: 'initialize-flow-task-2',
summary: 'Initialize flow task 2',
status: 'completed',
owner_pid: null,
}),
]);

const taskDirsRoot = join(repo.path, '.takt', 'tasks');
const taskDirs = readdirSync(taskDirsRoot, { withFileTypes: true })
Expand Down
8 changes: 7 additions & 1 deletion src/__tests__/clone.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1412,11 +1412,17 @@ describe('autoFetch: true — fetch, rev-parse origin/<branch>, reset --hard', (
taskSlug: 'autofetch-task',
});

expect(fetchCalls).toHaveLength(2);
expect(fetchCalls).toHaveLength(3);
expect(fetchCalls[0]![0]).toBe('fetch');
expect(fetchCalls[0]![1]).toBe('origin');
expect(fetchCalls[0]![2]).toMatch(/^takt\/\d{8}T\d{4}-autofetch-task$/);
expect(fetchCalls[1]).toEqual(['fetch', 'origin']);
expect(fetchCalls[2]).toEqual([
'fetch',
'--no-write-fetch-head',
'/project-autofetch-test',
'refs/remotes/origin/main:refs/takt/base/main',
]);

expect(revParseOriginCalls).toHaveLength(1);
expect(revParseOriginCalls[0]).toEqual(['rev-parse', 'origin/main']);
Expand Down
Loading
Loading