Skip to content

Commit f14c374

Browse files
committed
fix(observability): align phase shadow log parity
1 parent 319bda9 commit f14c374

12 files changed

Lines changed: 213 additions & 23 deletions

src/__tests__/sessionLogSpanProcessor.test.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,4 +160,53 @@ describe('SessionLogSpanProcessor', () => {
160160
}),
161161
]);
162162
});
163+
164+
it('writes phase start and complete records from the completed phase span snapshot', () => {
165+
const shadowLogPath = createTempLogPath();
166+
const processor = new SessionLogSpanProcessor({
167+
runId: 'run-1',
168+
shadowLogPath,
169+
sanitizedTask: 'task',
170+
workflowName: 'default',
171+
});
172+
const phaseSpan = {
173+
name: 'phase.implement.execute',
174+
startTime: [1_778_777_200, 0],
175+
endTime: [1_778_777_205, 0],
176+
attributes: {
177+
'takt.run.id': 'run-1',
178+
'takt.step.name': 'implement',
179+
'takt.step.iteration': 1,
180+
'takt.phase.number': 1,
181+
'takt.phase.name': 'execute',
182+
'takt.phase.execution_id': 'implement:1:1:1',
183+
'takt.phase.instruction': 'Implement it',
184+
'takt.phase.system_prompt': 'System prompt',
185+
'takt.phase.user_instruction': 'User instruction',
186+
'takt.phase.status': 'done',
187+
'takt.phase.result.content': 'implemented',
188+
},
189+
};
190+
191+
processor.onStart(phaseSpan as unknown as Span, {} as Context);
192+
processor.onEnd(phaseSpan as unknown as ReadableSpan);
193+
194+
expect(readRecords(shadowLogPath)).toEqual([
195+
expect.objectContaining({
196+
type: 'workflow_start',
197+
}),
198+
expect.objectContaining({
199+
type: 'phase_start',
200+
phaseExecutionId: 'implement:1:1:1',
201+
systemPrompt: 'System prompt',
202+
userInstruction: 'User instruction',
203+
}),
204+
expect.objectContaining({
205+
type: 'phase_complete',
206+
phaseExecutionId: 'implement:1:1:1',
207+
status: 'done',
208+
content: 'implemented',
209+
}),
210+
]);
211+
});
163212
});

src/__tests__/span-to-ndjson-mapper.test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ describe('span-to-ndjson mapper', () => {
145145
'takt.phase.name': 'execute',
146146
'takt.phase.execution_id': 'implement:1:1',
147147
'takt.phase.instruction': 'Implement it',
148+
'takt.phase.system_prompt': 'System prompt',
149+
'takt.phase.user_instruction': 'User instruction',
148150
'takt.phase.status': 'done',
149151
'takt.phase.result.content': 'implemented',
150152
},
@@ -161,6 +163,8 @@ describe('span-to-ndjson mapper', () => {
161163
phaseExecutionId: 'implement:1:1',
162164
timestamp: '2026-05-14T16:46:40.000Z',
163165
instruction: 'Implement it',
166+
systemPrompt: 'System prompt',
167+
userInstruction: 'User instruction',
164168
});
165169
expect(mapSpanEndToNdjson(phaseSpan)).toEqual({
166170
type: 'phase_complete',
@@ -177,6 +181,37 @@ describe('span-to-ndjson mapper', () => {
177181
});
178182
});
179183

184+
it('skips phase spans without execution ids to avoid shadow-only phase records', () => {
185+
const span: SpanSnapshot = {
186+
name: 'phase.implement.execute',
187+
attributes: {
188+
'takt.step.name': 'implement',
189+
'takt.phase.number': 1,
190+
'takt.phase.name': 'execute',
191+
'takt.phase.status': 'done',
192+
},
193+
};
194+
195+
expect(mapSpanStartToNdjson(span)).toBeUndefined();
196+
expect(mapSpanEndToNdjson(span)).toBeUndefined();
197+
});
198+
199+
it('skips phase spans without resolved prompt parts to avoid shadow-only phase records', () => {
200+
const span: SpanSnapshot = {
201+
name: 'phase.implement.execute',
202+
attributes: {
203+
'takt.step.name': 'implement',
204+
'takt.phase.number': 1,
205+
'takt.phase.name': 'execute',
206+
'takt.phase.execution_id': 'implement:1:1:1',
207+
'takt.phase.status': 'done',
208+
},
209+
};
210+
211+
expect(mapSpanStartToNdjson(span)).toBeUndefined();
212+
expect(mapSpanEndToNdjson(span)).toBeUndefined();
213+
});
214+
180215
it('maps judge stage spans into session log compatible judge records', () => {
181216
expect(mapSpanEndToNdjson({
182217
name: 'judge_stage.implement.1.structured_output',

src/__tests__/workflowSpans.test.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,13 +307,18 @@ describe('workflow OpenTelemetry spans', () => {
307307
phase: 1,
308308
phaseName: 'execute',
309309
instruction: 'secret execute',
310+
phaseExecutionId: 'implement:3:1:1',
310311
sanitizeText: (text: string) => text.replaceAll('secret', '[REDACTED]'),
311312
providerInfo: {
312313
provider: 'codex',
313314
model: 'gpt-5',
314315
providerSource: 'project',
315316
modelSource: 'global',
316317
},
318+
getPromptParts: () => ({
319+
systemPrompt: 'secret system',
320+
userInstruction: 'secret user',
321+
}),
317322
}, async () => ({ status: 'done', content: 'secret content' }), (result: { status: string; content: string }) => ({
318323
status: result.status,
319324
content: result.content,
@@ -333,7 +338,10 @@ describe('workflow OpenTelemetry spans', () => {
333338
'takt.step.iteration': 3,
334339
'takt.phase.number': 1,
335340
'takt.phase.name': 'execute',
341+
'takt.phase.execution_id': 'implement:3:1:1',
336342
'takt.phase.instruction': '[REDACTED] execute',
343+
'takt.phase.system_prompt': '[REDACTED] system',
344+
'takt.phase.user_instruction': '[REDACTED] user',
337345
'takt.phase.status': 'done',
338346
'takt.phase.result.content': '[REDACTED] content',
339347
'takt.provider.name': 'codex',

src/core/logging/span-to-ndjson-mapper.ts

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,17 @@ function mapPhaseStart(span: SpanSnapshot): NdjsonPhaseStart | undefined {
134134
const step = getString(span.attributes, 'takt.step.name');
135135
const phase = getPhaseNumber(span.attributes, 'takt.phase.number');
136136
const phaseName = getPhaseName(span.attributes, 'takt.phase.name');
137-
if (!step || phase === undefined || phaseName === undefined) {
137+
const phaseExecutionId = getString(span.attributes, 'takt.phase.execution_id');
138+
const systemPrompt = getString(span.attributes, 'takt.phase.system_prompt');
139+
const userInstruction = getString(span.attributes, 'takt.phase.user_instruction');
140+
if (
141+
!step
142+
|| phase === undefined
143+
|| phaseName === undefined
144+
|| !phaseExecutionId
145+
|| systemPrompt === undefined
146+
|| userInstruction === undefined
147+
) {
138148
return undefined;
139149
}
140150

@@ -145,20 +155,31 @@ function mapPhaseStart(span: SpanSnapshot): NdjsonPhaseStart | undefined {
145155
...getWorkflowStack(span.attributes),
146156
phase,
147157
phaseName,
148-
...optionalString('phaseExecutionId', getString(span.attributes, 'takt.phase.execution_id')),
158+
phaseExecutionId,
149159
timestamp: getTimestamp(span.startTime),
150160
...optionalString('instruction', getString(span.attributes, 'takt.phase.instruction')),
151-
...optionalString('systemPrompt', getString(span.attributes, 'takt.phase.system_prompt')),
152-
...optionalString('userInstruction', getString(span.attributes, 'takt.phase.user_instruction')),
161+
systemPrompt,
162+
userInstruction,
153163
};
154164
}
155165

156166
function mapPhaseComplete(span: SpanSnapshot): NdjsonPhaseComplete | undefined {
157167
const step = getString(span.attributes, 'takt.step.name');
158168
const phase = getPhaseNumber(span.attributes, 'takt.phase.number');
159169
const phaseName = getPhaseName(span.attributes, 'takt.phase.name');
170+
const phaseExecutionId = getString(span.attributes, 'takt.phase.execution_id');
171+
const systemPrompt = getString(span.attributes, 'takt.phase.system_prompt');
172+
const userInstruction = getString(span.attributes, 'takt.phase.user_instruction');
160173
const status = getString(span.attributes, 'takt.phase.status');
161-
if (!step || phase === undefined || phaseName === undefined || !status) {
174+
if (
175+
!step
176+
|| phase === undefined
177+
|| phaseName === undefined
178+
|| !phaseExecutionId
179+
|| systemPrompt === undefined
180+
|| userInstruction === undefined
181+
|| !status
182+
) {
162183
return undefined;
163184
}
164185

@@ -169,7 +190,7 @@ function mapPhaseComplete(span: SpanSnapshot): NdjsonPhaseComplete | undefined {
169190
...getWorkflowStack(span.attributes),
170191
phase,
171192
phaseName,
172-
...optionalString('phaseExecutionId', getString(span.attributes, 'takt.phase.execution_id')),
193+
phaseExecutionId,
173194
status,
174195
...optionalString('content', getString(span.attributes, 'takt.phase.result.content')),
175196
timestamp: getTimestamp(span.endTime),

src/core/workflow/engine/ArpeggioRunner.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ interface ArpeggioBatchObservability {
101101
readonly phaseExecutionId: string;
102102
readonly sanitizeText?: (text: string) => string;
103103
readonly providerInfo?: StepProviderInfo;
104+
readonly getPromptParts?: () => PhasePromptParts | undefined;
104105
}
105106

106107
/** Execute a single batch with retry logic */
@@ -136,6 +137,7 @@ async function executeBatchWithRetry(
136137
phaseExecutionId: observability.phaseExecutionId,
137138
sanitizeText: observability.sanitizeText,
138139
providerInfo: observability.providerInfo,
140+
getPromptParts: observability.getPromptParts,
139141
}, async () => {
140142
for (let attempt = 0; attempt <= maxRetries; attempt++) {
141143
try {
@@ -362,11 +364,13 @@ export class ArpeggioRunner {
362364
await semaphore.acquire();
363365
try {
364366
let didEmitPhaseStart = false;
367+
let resolvedPromptParts: PhasePromptParts | undefined;
365368
const phaseExecutionId = `${step.name}:1:${stepIteration}:${batch.batchIndex}`;
366369
const batchAgentOptions: RunAgentOptions = {
367370
...agentOptions,
368371
onPromptResolved: (promptParts) => {
369372
if (didEmitPhaseStart) return;
373+
resolvedPromptParts = promptParts;
370374
this.deps.onPhaseStart?.(step, 1, 'execute', promptParts.userInstruction, promptParts, phaseExecutionId, iteration);
371375
didEmitPhaseStart = true;
372376
},
@@ -388,6 +392,7 @@ export class ArpeggioRunner {
388392
phaseExecutionId,
389393
sanitizeText: this.deps.sanitizeObservabilityText,
390394
providerInfo,
395+
getPromptParts: () => resolvedPromptParts,
391396
},
392397
runtime,
393398
);

src/core/workflow/engine/ParallelRunner.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import type { ParallelLoggerOptions } from './parallel-logger.js';
2727
import type { StructuredCaller } from '../../../agents/structured-caller.js';
2828
import { runWithPhaseSpan } from '../observability/workflowSpans.js';
2929
import type { QualityGateRunResult } from '../quality-gates/types.js';
30+
import { buildPhaseExecutionId } from '../../../shared/utils/phaseExecutionId.js';
3031

3132
const log = createLogger('parallel-runner');
3233

@@ -199,13 +200,21 @@ export class ParallelRunner {
199200
// Phase 1: main execution (Write excluded if sub-step has report)
200201
const baseOptions = this.deps.optionsBuilder.buildAgentOptions(subStep, runtime);
201202
let didEmitPhaseStart = false;
203+
let resolvedPromptParts: PhasePromptParts | undefined;
204+
const phaseExecutionId = buildPhaseExecutionId({
205+
step: subStep.name,
206+
iteration: parentIteration,
207+
phase: 1,
208+
sequence: 1,
209+
});
202210

203211
// Override onStream with parallel logger's prefixed handler (immutable)
204212
const agentOptions = parallelLogger
205213
? { ...baseOptions, onStream: parallelLogger.createStreamHandler(subStep.name, index) }
206214
: { ...baseOptions };
207215
agentOptions.onPromptResolved = (promptParts: PhasePromptParts) => {
208-
this.deps.onPhaseStart?.(subStep, 1, 'execute', subInstruction, promptParts, undefined, parentIteration);
216+
resolvedPromptParts = promptParts;
217+
this.deps.onPhaseStart?.(subStep, 1, 'execute', subInstruction, promptParts, phaseExecutionId, parentIteration);
209218
didEmitPhaseStart = true;
210219
};
211220
const subResponse = await runWithPhaseSpan({
@@ -217,8 +226,10 @@ export class ParallelRunner {
217226
phase: 1,
218227
phaseName: 'execute',
219228
instruction: subInstruction,
229+
phaseExecutionId,
220230
sanitizeText: this.deps.sanitizeObservabilityText,
221231
providerInfo: subPm,
232+
getPromptParts: () => resolvedPromptParts,
222233
}, () => executeAgent(subStep.persona, subInstruction, agentOptions), (result) => ({
223234
status: result.status,
224235
content: result.content,
@@ -228,7 +239,7 @@ export class ParallelRunner {
228239
throw new Error(`Missing prompt parts for phase start: ${subStep.name}:1`);
229240
}
230241
updatePersonaSession(subSessionKey, subResponse.sessionId);
231-
this.deps.onPhaseComplete?.(subStep, 1, 'execute', subResponse.content, subResponse.status, subResponse.error, undefined, parentIteration);
242+
this.deps.onPhaseComplete?.(subStep, 1, 'execute', subResponse.content, subResponse.status, subResponse.error, phaseExecutionId, parentIteration);
232243
if (subResponse.status === 'error' || subResponse.status === 'blocked' || subResponse.status === 'rate_limited') {
233244
state.stepOutputs.set(subStep.name, subResponse);
234245
return { subStep, response: subResponse, instruction: subInstruction, providerInfo: subPm };

src/core/workflow/engine/StepExecutor.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import { validateStructuredOutputAgainstSchema } from './structured-output-schem
3636
import { providerSupportsStructuredOutput } from '../../../infra/providers/provider-capabilities.js';
3737
import { resolveReportHandles } from '../instruction/report-handles.js';
3838
import { AGENT_FAILURE_CATEGORIES } from '../../../shared/types/agent-failure.js';
39+
import { buildPhaseExecutionId } from '../../../shared/utils/phaseExecutionId.js';
3940
import type {
4041
StructuredOutputFailureReason,
4142
StructuredOutputNormalizerRegistry,
@@ -535,12 +536,20 @@ export class StepExecutor {
535536

536537
// Phase 1: main execution (Write excluded if step has report)
537538
let didEmitPhaseStart = false;
539+
let resolvedPromptParts: PhasePromptParts | undefined;
540+
const phaseExecutionId = buildPhaseExecutionId({
541+
step: step.name,
542+
iteration: state.iteration,
543+
phase: 1,
544+
sequence: 1,
545+
});
538546
const providerInfo = this.deps.optionsBuilder.resolveStepProviderModel(step, runtime);
539547
const baseAgentOptions = this.deps.optionsBuilder.buildAgentOptions(step, runtime);
540548
const agentOptions = {
541549
...baseAgentOptions,
542550
onPromptResolved: (promptParts: PhasePromptParts) => {
543-
this.deps.onPhaseStart?.(step, 1, 'execute', phase1Instruction, promptParts, undefined, state.iteration);
551+
resolvedPromptParts = promptParts;
552+
this.deps.onPhaseStart?.(step, 1, 'execute', phase1Instruction, promptParts, phaseExecutionId, state.iteration);
544553
didEmitPhaseStart = true;
545554
},
546555
};
@@ -553,8 +562,10 @@ export class StepExecutor {
553562
phase: 1,
554563
phaseName: 'execute',
555564
instruction: phase1Instruction,
565+
phaseExecutionId,
556566
sanitizeText: this.deps.sanitizeObservabilityText,
557567
providerInfo,
568+
getPromptParts: () => resolvedPromptParts,
558569
}, () => executeAgent(step.persona, phase1Instruction, agentOptions), (result) => ({
559570
status: result.status,
560571
content: result.content,
@@ -565,7 +576,7 @@ export class StepExecutor {
565576
throw new Error(`Missing prompt parts for phase start: ${step.name}:1`);
566577
}
567578
updatePersonaSession(sessionKey, response.sessionId);
568-
this.deps.onPhaseComplete?.(step, 1, 'execute', response.content, response.status, response.error, undefined, state.iteration);
579+
this.deps.onPhaseComplete?.(step, 1, 'execute', response.content, response.status, response.error, phaseExecutionId, state.iteration);
569580

570581
// Provider failures should abort immediately.
571582
if (response.status === 'error' || response.status === 'rate_limited') {

src/core/workflow/engine/TeamLeaderRunner.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import type { WorkflowEngineOptions, PhaseName, PhasePromptParts } from '../type
2525
import type { RuntimeStepResolution, StepRunResult } from '../types.js';
2626
import { buildTeamLeaderErrorPartResult, runTeamLeaderPart } from './team-leader-part-runner.js';
2727
import { runWithPhaseSpan } from '../observability/workflowSpans.js';
28+
import { buildPhaseExecutionId } from '../../../shared/utils/phaseExecutionId.js';
2829

2930
const log = createLogger('team-leader-runner');
3031
const MAX_TOTAL_PARTS = 20;
@@ -103,6 +104,13 @@ export class TeamLeaderRunner {
103104

104105
emitTeamLeaderProgressHint(this.deps.engineOptions, 'decompose');
105106
let didEmitPhaseStart = false;
107+
let resolvedPromptParts: PhasePromptParts | undefined;
108+
const phaseExecutionId = buildPhaseExecutionId({
109+
step: leaderStep.name,
110+
iteration: parentIteration,
111+
phase: 1,
112+
sequence: 1,
113+
});
106114
const structuredCaller = this.deps.engineOptions.structuredCaller;
107115
if (!structuredCaller) {
108116
throw new Error('structuredCaller is required for team leader execution');
@@ -117,8 +125,10 @@ export class TeamLeaderRunner {
117125
phase: 1,
118126
phaseName: 'execute',
119127
instruction,
128+
phaseExecutionId,
120129
sanitizeText: this.deps.sanitizeObservabilityText,
121130
providerInfo: leaderProviderInfo,
131+
getPromptParts: () => resolvedPromptParts,
122132
},
123133
() => structuredCaller.decomposeTask(instruction, teamLeaderConfig.maxParts, {
124134
cwd: this.deps.getCwd(),
@@ -132,7 +142,8 @@ export class TeamLeaderRunner {
132142
onStream: this.deps.engineOptions.onStream,
133143
onPromptResolved: (promptParts) => {
134144
if (didEmitPhaseStart) return;
135-
this.deps.onPhaseStart?.(leaderStep, 1, 'execute', promptParts.userInstruction, promptParts, undefined, parentIteration);
145+
resolvedPromptParts = promptParts;
146+
this.deps.onPhaseStart?.(leaderStep, 1, 'execute', promptParts.userInstruction, promptParts, phaseExecutionId, parentIteration);
136147
didEmitPhaseStart = true;
137148
},
138149
}), (result) => ({
@@ -149,7 +160,7 @@ export class TeamLeaderRunner {
149160
content: JSON.stringify({ parts }, null, 2),
150161
timestamp: new Date(),
151162
};
152-
this.deps.onPhaseComplete?.(leaderStep, 1, 'execute', leaderResponse.content, leaderResponse.status, leaderResponse.error, undefined, parentIteration);
163+
this.deps.onPhaseComplete?.(leaderStep, 1, 'execute', leaderResponse.content, leaderResponse.status, leaderResponse.error, phaseExecutionId, parentIteration);
153164
log.debug('Team leader decomposed parts', {
154165
step: step.name,
155166
partCount: parts.length,

0 commit comments

Comments
 (0)