Skip to content

Commit b6a5532

Browse files
committed
fix(observability): harden phase usage exporter plumbing
1 parent 1cafa58 commit b6a5532

9 files changed

Lines changed: 146 additions & 134 deletions

File tree

src/__tests__/logging-contracts.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ describe('logging contracts', () => {
7272
);
7373

7474
expect(record.usage.cached_input_tokens).toBe(4);
75+
expect(record.usage.cache_creation_input_tokens).toBe(2);
76+
expect(record.usage.cache_read_input_tokens).toBe(2);
7577
});
7678

7779
it('should reject usage_missing records with unknown reason values', () => {

src/__tests__/otelFoundation.test.ts

Lines changed: 15 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -339,66 +339,38 @@ describe('otel foundation', () => {
339339
}
340340
});
341341

342-
it('should release the shared SDK when run exporter registration fails', async () => {
342+
it('should reject mismatched run-scoped exporter ids before starting the SDK', async () => {
343343
const foundation = await loadFoundationWithMockedSdk();
344344
const tempDir = mkdtempSync(join(tmpdir(), 'takt-otel-registration-failure-'));
345-
const firstPhaseUsageLogPath = join(tempDir, 'first-usage-events.phase.jsonl');
346-
const secondShadowLogPath = join(tempDir, 'second-otel-session-shadow.jsonl');
347-
const secondPhaseUsageLogPath = join(tempDir, 'second-usage-events.phase.jsonl');
348-
let first: { shutdown(): Promise<void> } | undefined;
345+
const shadowLogPath = join(tempDir, 'session-otel-session-shadow.jsonl');
346+
const phaseUsageLogPath = join(tempDir, 'session-usage-events.phase.jsonl');
347+
const monitorPath = join(tempDir, 'monitor.json');
349348

350349
try {
351-
first = await foundation.initializeOtelFoundation(
352-
enabledUsageEventsPhaseObservability,
353-
{
354-
usageEventsExporter: {
355-
runId: 'run-1',
356-
sessionId: 'session-1',
357-
phaseUsageLogPath: firstPhaseUsageLogPath,
358-
},
359-
},
360-
);
361-
362350
await expect(foundation.initializeOtelFoundation(
363351
enabledAllObservability,
364352
{
365353
sessionLogExporter: {
366354
runId: 'run-2',
367-
shadowLogPath: secondShadowLogPath,
368-
sanitizedTask: 'second task',
355+
shadowLogPath,
356+
sanitizedTask: 'task',
369357
workflowName: 'default',
370358
},
371359
usageEventsExporter: {
372360
runId: 'run-1',
373-
sessionId: 'session-duplicate',
374-
phaseUsageLogPath: secondPhaseUsageLogPath,
361+
sessionId: 'session-1',
362+
phaseUsageLogPath,
363+
},
364+
monitorJsonExporter: {
365+
runId: 'run-2',
366+
monitorPath,
375367
},
376368
},
377-
)).rejects.toThrow('Phase usage event exporter is already registered for runId: run-1');
378-
379-
const sessionLogProcessor = foundation.constructedOptions[0]?.spanProcessors?.[0] as {
380-
onStart(span: unknown, parentContext: unknown): void;
381-
};
382-
sessionLogProcessor.onStart({
383-
name: 'step.implement',
384-
startTime: [1_778_777_205, 0],
385-
attributes: {
386-
'takt.run.id': 'run-2',
387-
'takt.step.name': 'implement',
388-
'takt.step.persona': 'coder',
389-
'takt.step.iteration': 1,
390-
},
391-
}, {});
392-
const secondShadowRecords = readFileSync(secondShadowLogPath, 'utf-8').trim().split('\n');
393-
expect(secondShadowRecords).toHaveLength(1);
369+
)).rejects.toThrow('Run-scoped OpenTelemetry exporters must share the same runId');
394370

395-
await first.shutdown();
396-
397-
expect(foundation.shutdownMock).toHaveBeenCalledOnce();
371+
expect(foundation.startMock).not.toHaveBeenCalled();
372+
expect(foundation.constructedOptions).toEqual([]);
398373
} finally {
399-
if (first) {
400-
await first.shutdown();
401-
}
402374
rmSync(tempDir, { recursive: true, force: true });
403375
}
404376
});

src/__tests__/usageEventsSpanProcessor.test.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ describe('UsageEventsSpanProcessor', () => {
6363
]);
6464
});
6565

66-
it('rejects duplicate runId registrations', () => {
66+
it('ignores duplicate runId registrations without redirecting existing output', () => {
6767
const firstLogPath = createTempLogPath('first-usage-events.phase.jsonl');
6868
const secondLogPath = createTempLogPath('second-usage-events.phase.jsonl');
6969
const processor = new UsageEventsSpanProcessor();
@@ -77,7 +77,17 @@ describe('UsageEventsSpanProcessor', () => {
7777
runId: 'run-1',
7878
sessionId: 'session-duplicate',
7979
phaseUsageLogPath: secondLogPath,
80-
})).toThrow('Phase usage event exporter is already registered for runId: run-1');
80+
})).not.toThrow();
81+
82+
processor.onEnd(makePhaseSpan('run-1') as unknown as ReadableSpan);
83+
84+
expect(readRecords(firstLogPath)).toEqual([
85+
expect.objectContaining({
86+
run_id: 'run-1',
87+
session_id: 'session-1',
88+
phase: 'phase1_execute',
89+
}),
90+
]);
8191
expect(existsSync(secondLogPath)).toBe(false);
8292
});
8393

src/agents/judge-status-usecase.ts

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,24 @@ export interface JudgeStatusOptions {
1616
language?: Language;
1717
interactive?: boolean;
1818
onStream?: StreamCallback;
19-
onJudgeStage?: (entry: {
20-
stage: 1 | 2 | 3;
21-
method: 'structured_output' | 'phase3_tag' | 'ai_judge';
22-
status: 'done' | 'error' | 'skipped';
23-
instruction: string;
24-
response: string;
25-
providerUsage?: ProviderUsageSnapshot;
26-
}) => void;
19+
onJudgeStage?: (entry: JudgeStageLogEntry) => void;
2720
onStructuredPromptResolved?: (promptParts: {
2821
systemPrompt: string;
2922
userInstruction: string;
3023
}) => void;
3124
}
3225

26+
export interface JudgeStageLogEntry {
27+
stage: 1 | 2 | 3;
28+
method: 'structured_output' | 'phase3_tag' | 'ai_judge';
29+
status: 'done' | 'error' | 'skipped';
30+
instruction: string;
31+
response: string;
32+
providerUsage?: ProviderUsageSnapshot;
33+
}
34+
35+
type JudgeResponseEntry = Pick<JudgeStageLogEntry, 'instruction' | 'status' | 'response' | 'providerUsage'>;
36+
3337
export interface TagJudgeRunOptions {
3438
cwd: string;
3539
provider?: ProviderType;
@@ -133,6 +137,28 @@ export async function evaluateCondition(
133137
return detectJudgeIndex(response.content);
134138
}
135139

140+
export function createJudgeStageRecorder(): {
141+
capture(entry: JudgeResponseEntry): void;
142+
stage(entry: Pick<JudgeStageLogEntry, 'stage' | 'method'>): JudgeStageLogEntry;
143+
} {
144+
let latest: JudgeResponseEntry = {
145+
status: 'skipped',
146+
instruction: '',
147+
response: '',
148+
};
149+
return {
150+
capture(entry): void {
151+
latest = entry;
152+
},
153+
stage(entry): JudgeStageLogEntry {
154+
return {
155+
...entry,
156+
...latest,
157+
};
158+
},
159+
};
160+
}
161+
136162
export async function judgeStatus(
137163
structuredInstruction: string,
138164
tagInstruction: string,
@@ -207,32 +233,20 @@ export async function judgeStatus(
207233
const conditions = buildJudgeConditions(rules, interactiveEnabled);
208234

209235
if (conditions.length > 0) {
210-
let stage3Status: 'done' | 'error' | 'skipped' = 'skipped';
211-
let stage3Instruction = '';
212-
let stage3Response = '';
213-
let stage3ProviderUsage: ProviderUsageSnapshot | undefined;
236+
const stage3 = createJudgeStageRecorder();
214237
const normalizedConditions = conditions.map((c, pos) => ({ index: pos, text: c.text }));
215238
const fallbackPosition = await evaluateCondition(structuredInstruction, normalizedConditions, {
216239
cwd: options.cwd,
217240
provider: options.provider,
218241
resolvedProvider: options.resolvedProvider,
219242
resolvedModel: options.resolvedModel,
220-
onJudgeResponse: (entry) => {
221-
stage3Status = entry.status;
222-
stage3Instruction = entry.instruction;
223-
stage3Response = entry.response;
224-
stage3ProviderUsage = entry.providerUsage;
225-
},
243+
onJudgeResponse: stage3.capture,
226244
});
227245

228-
options.onJudgeStage?.({
246+
options.onJudgeStage?.(stage3.stage({
229247
stage: 3,
230248
method: 'ai_judge',
231-
status: stage3Status,
232-
instruction: stage3Instruction,
233-
response: stage3Response,
234-
providerUsage: stage3ProviderUsage,
235-
});
249+
}));
236250

237251
if (fallbackPosition >= 0 && fallbackPosition < conditions.length) {
238252
const originalIndex = conditions[fallbackPosition]?.index;

src/agents/structured-caller/prompt-based-structured-caller.ts

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import type { WorkflowRule, PartDefinition } from '../../core/models/types.js';
2-
import type { ProviderUsageSnapshot } from '../../core/models/response.js';
32
import {
43
buildPromptBasedDecomposePrompt,
54
buildPromptBasedMorePartsPrompt,
@@ -8,6 +7,7 @@ import {
87
import { buildJudgePrompt, detectJudgeIndex, isValidRuleIndex, buildJudgeConditions } from '../judge-utils.js';
98
import { runAgent } from '../runner.js';
109
import {
10+
createJudgeStageRecorder,
1111
runTagJudgeStage,
1212
type EvaluateConditionOptions,
1313
type JudgeStatusOptions,
@@ -102,31 +102,19 @@ export class PromptBasedStructuredCaller implements StructuredCaller {
102102
const conditions = buildJudgeConditions(rules, interactiveEnabled);
103103

104104
if (conditions.length > 0) {
105-
let stage3Status: 'done' | 'error' | 'skipped' = 'skipped';
106-
let stage3Instruction = '';
107-
let stage3Response = '';
108-
let stage3ProviderUsage: ProviderUsageSnapshot | undefined;
105+
const stage3 = createJudgeStageRecorder();
109106
const fallbackIndex = await this.evaluateCondition(structuredInstruction, conditions, {
110107
cwd: options.cwd,
111108
provider: options.provider,
112109
resolvedProvider: options.resolvedProvider,
113110
resolvedModel: options.resolvedModel,
114-
onJudgeResponse: (entry) => {
115-
stage3Status = entry.status;
116-
stage3Instruction = entry.instruction;
117-
stage3Response = entry.response;
118-
stage3ProviderUsage = entry.providerUsage;
119-
},
111+
onJudgeResponse: stage3.capture,
120112
});
121113

122-
options.onJudgeStage?.({
114+
options.onJudgeStage?.(stage3.stage({
123115
stage: 3,
124116
method: 'ai_judge',
125-
status: stage3Status,
126-
instruction: stage3Instruction,
127-
response: stage3Response,
128-
providerUsage: stage3ProviderUsage,
129-
});
117+
}));
130118

131119
if (isValidRuleIndex(fallbackIndex, rules, interactiveEnabled)) {
132120
return { ruleIndex: fallbackIndex, method: 'ai_judge' };

src/core/logging/phaseUsageEvent.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import {
33
USAGE_MISSING_REASONS,
44
type UsageMissingReason,
55
} from './contracts.js';
6+
import { buildUsageEventPayload } from './providerEvent.js';
7+
import type { ProviderUsageSnapshot } from '../models/response.js';
68
import { isProviderType, type ProviderType } from '../../shared/types/provider.js';
79

810
export type PhaseUsageType =
@@ -171,12 +173,19 @@ function buildRecord(
171173
function extractUsage(attributes: Record<string, unknown>): Pick<PhaseUsageEventLogRecord, 'usage_missing' | 'reason' | 'usage'> & {
172174
missing: boolean;
173175
} {
176+
const snapshot = usageSnapshotFromAttributes(attributes);
177+
const payload = buildUsageEventPayload(snapshot);
178+
return {
179+
missing: payload.usage_missing,
180+
...payload,
181+
};
182+
}
183+
184+
function usageSnapshotFromAttributes(attributes: Record<string, unknown>): ProviderUsageSnapshot {
174185
if (attributes['takt.usage.missing'] === true) {
175186
return {
176-
missing: true,
177-
usage_missing: true,
187+
usageMissing: true,
178188
reason: getUsageMissingReason(attributes['takt.usage.missing_reason']),
179-
usage: {},
180189
};
181190
}
182191

@@ -187,26 +196,21 @@ function extractUsage(attributes: Record<string, unknown>): Pick<PhaseUsageEvent
187196

188197
if (inputTokens === undefined || outputTokens === undefined || totalTokens === undefined) {
189198
return {
190-
missing: true,
191-
usage_missing: true,
199+
usageMissing: true,
192200
reason: hasAnyUsageAttribute(attributes)
193201
? USAGE_MISSING_REASONS.TOKENS_MISSING
194202
: USAGE_MISSING_REASONS.NOT_AVAILABLE,
195-
usage: {},
196203
};
197204
}
198205

199206
return {
200-
missing: false,
201-
usage_missing: false,
202-
usage: {
203-
input_tokens: inputTokens,
204-
output_tokens: outputTokens,
205-
total_tokens: totalTokens,
206-
...optionalNumber('cached_input_tokens', getNumber(attributes, 'gen_ai.usage.cached_input_tokens')),
207-
...optionalNumber('cache_creation_input_tokens', getNumber(attributes, 'gen_ai.usage.cache_creation_input_tokens')),
208-
...optionalNumber('cache_read_input_tokens', getNumber(attributes, 'gen_ai.usage.cache_read_input_tokens')),
209-
},
207+
usageMissing: false,
208+
inputTokens,
209+
outputTokens,
210+
totalTokens,
211+
cachedInputTokens: getNumber(attributes, 'gen_ai.usage.cached_input_tokens'),
212+
cacheCreationInputTokens: getNumber(attributes, 'gen_ai.usage.cache_creation_input_tokens'),
213+
cacheReadInputTokens: getNumber(attributes, 'gen_ai.usage.cache_read_input_tokens'),
210214
};
211215
}
212216

@@ -287,10 +291,6 @@ function getUsageMissingReason(value: unknown): UsageMissingReason {
287291
: USAGE_MISSING_REASONS.NOT_AVAILABLE;
288292
}
289293

290-
function optionalNumber<K extends string>(key: K, value: number | undefined): Partial<Record<K, number>> {
291-
return value !== undefined ? { [key]: value } as Partial<Record<K, number>> : {};
292-
}
293-
294294
function hrTimeToIso(time: readonly [number, number] | undefined): string {
295295
if (!time) {
296296
return new Date().toISOString();

0 commit comments

Comments
 (0)