Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
bd4b596
refactor(bridge): add ProviderCapabilities interface for provider-bas…
y49 Apr 6, 2026
8fbfd3c
fix(bridge): handle multiple questions in AskUserQuestion
y49 Apr 6, 2026
7ebc38c
feat(bridge): passthrough unrecognized slash commands to Claude Code
y49 Apr 6, 2026
7fa4f78
feat(bridge): support streaming input — send messages while AI is wor…
y49 Apr 6, 2026
df6ec4f
feat(bridge): display TodoWrite progress cards in IM
y49 Apr 6, 2026
ddc27c1
feat(bridge): enhanced cost tracking with session totals
y49 Apr 6, 2026
f0f9aa7
fix(bridge): align AskUserQuestion with official SDK docs
y49 Apr 6, 2026
9c20c23
fix(bridge): align with SDK TypeScript reference for modelUsage and c…
y49 Apr 6, 2026
ff8dd01
fix(bridge): only inject streaming input on reply-to, queue on direct…
y49 Apr 6, 2026
203d96c
feat(bridge): message queue + reply-to injection refinement
y49 Apr 6, 2026
8ca76e0
docs: add LiveSession design — long-lived query with Thread/Turn model
y49 Apr 6, 2026
e727a63
refactor(bridge): define LiveSession interface and TurnParams
y49 Apr 6, 2026
194d3fd
feat(bridge): implement ClaudeLiveSession with Thread/Turn model
y49 Apr 6, 2026
a6a77fa
feat(bridge): add SessionRegistry with LiveSession integration
y49 Apr 6, 2026
44d01f1
refactor(bridge): remove old MessageInjector streaming input from str…
y49 Apr 6, 2026
8fcf40b
fix(bridge): address code review issues
y49 Apr 6, 2026
c96304f
fix(bridge): close LiveSession on /new to prevent memory leak
y49 Apr 6, 2026
52f71ef
fix(bridge): prevent memory leaks in long-running LiveSessions
y49 Apr 6, 2026
367dedc
fix(bridge): coalesce rapid-fire messages to handle Telegram 4096 cha…
y49 Apr 6, 2026
3643213
chore(bridge): unify log prefixes to tlive:module format
y49 Apr 6, 2026
1781fb3
fix(bridge): address review — effort/model passthrough, queue iterati…
y49 Apr 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bridge/src/__tests__/bridge-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ describe('BridgeManager', () => {
}),
controls: undefined,
}),
capabilities: () => ({ slashCommands: true, askUserQuestion: true, liveSession: true, todoTracking: true, costInUsd: true, skills: true, sessionResume: true }),
} as any,
permissions: { resolvePendingPermission: vi.fn() } as any,
core: { isHealthy: () => true } as any,
Expand Down
2 changes: 1 addition & 1 deletion bridge/src/__tests__/callback-router.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ describe('CallbackRouter', () => {
const result = await router.handle(adapter, msg);

expect(result).toBe(true);
expect(sdkState.sdkQuestionTextAnswers.get('p1')).toBe('Alpha,Gamma');
expect(sdkState.sdkQuestionTextAnswers.get('p1')).toBe('Alpha, Gamma');
expect(permissions.cleanupQuestion).toHaveBeenCalledWith('p1');
expect(gateway.resolve).toHaveBeenCalledWith('p1', 'allow');
expect(adapter.editMessage).toHaveBeenCalledWith('c1', 'm1', expect.objectContaining({
Expand Down
2 changes: 1 addition & 1 deletion bridge/src/context.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export type { BridgeStore } from './store/interface.js';
export type { LLMProvider } from './providers/base.js';
export type { LLMProvider, ProviderCapabilities, LiveSession } from './providers/base.js';

export interface PermissionGateway {}
export interface CoreClient {}
Expand Down
92 changes: 87 additions & 5 deletions bridge/src/engine/bridge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export class BridgeManager {
() => this.coreAvailable,
this.sdkEngine.getActiveControls(),
this.permissions,
(channelType, chatId) => this.sdkEngine.closeSession(channelType, chatId),
);
this.callbackRouter = new CallbackRouter(
this.permissions,
Expand Down Expand Up @@ -155,11 +156,13 @@ export class BridgeManager {
this.runAdapterLoop(adapter);
}
this.permissions.startPruning();
this.sdkEngine.startSessionPruning();
}

async stop(): Promise<void> {
this.running = false;
this.permissions.stopPruning();
this.sdkEngine.stopSessionPruning();
this.permissions.getGateway().denyAll();
for (const adapter of this.adapters.values()) {
await adapter.stop();
Expand All @@ -171,9 +174,66 @@ export class BridgeManager {
return this.hookEngine.sendNotification(adapter, chatId, hook, receiveIdType);
}

/** Process queued messages iteratively after current turn completes */
private async drainQueue(adapter: BaseChannelAdapter, channelType: string, chatId: string): Promise<void> {
let next: InboundMessage | undefined;
while ((next = this.sdkEngine.dequeueMessage(channelType, chatId))) {
console.log(`[${adapter.channelType}] Processing queued message`);
try {
await this.handleInboundMessage(adapter, next);
} catch (err) {
console.error(`[${adapter.channelType}] Error processing queued message:`, err);
break;
}
}
}

/** Wait briefly for follow-up messages from the same user, merge text if they arrive quickly.
* Handles Telegram splitting long messages at 4096 chars. */
/** Telegram message length limit — only coalesce if text is near this boundary */
private static TG_MSG_LIMIT = 4096;

private async coalesceMessages(adapter: BaseChannelAdapter, first: InboundMessage): Promise<InboundMessage> {
if (!first.text || first.callbackData) return first;

// Only wait for follow-up parts if message is near Telegram's 4096 char limit
if (first.text.length < BridgeManager.TG_MSG_LIMIT - 200) return first;

// Wait up to 500ms for follow-up parts
const parts: string[] = [first.text];
const deadline = Date.now() + 500;

while (Date.now() < deadline) {
await new Promise(r => setTimeout(r, 100));
const next = await adapter.consumeOne();
if (!next) continue;

// Only merge if same user, same chat, text-only (no callback/command), arrives quickly
if (next.userId === first.userId && next.chatId === first.chatId
&& next.text && !next.callbackData && !next.text.startsWith('/')) {
parts.push(next.text);
console.log(`[${adapter.channelType}] Coalesced message part (${next.text.length} chars)`);
} else {
// Different message — put it back by re-processing later
// We can't "unget" so we handle it inline
// For simplicity, process it in the next loop iteration by pushing to a buffer
this.coalescePushback.set(adapter.channelType, next);
break;
}
}

if (parts.length === 1) return first;
console.log(`[${adapter.channelType}] Merged ${parts.length} message parts (${parts.reduce((s, p) => s + p.length, 0)} chars total)`);
return { ...first, text: parts.join('\n') };
}

private coalescePushback = new Map<string, InboundMessage>();

private async runAdapterLoop(adapter: BaseChannelAdapter): Promise<void> {
while (this.running) {
const msg = await adapter.consumeOne();
// Check pushback from coalescing first
let msg = this.coalescePushback.get(adapter.channelType) ?? await adapter.consumeOne();
this.coalescePushback.delete(adapter.channelType);
if (!msg) { await new Promise(r => setTimeout(r, 100)); continue; }
console.log(`[${adapter.channelType}] Message from ${msg.userId}: ${msg.text || '(callback)'}`);
// Callbacks, commands, and permission text are fast — await them.
Expand All @@ -192,14 +252,29 @@ export class BridgeManager {
console.error(`[${adapter.channelType}] Error handling message:`, err);
}
} else {
// Guard: if this chat is already processing a message, tell the user
const chatKey = this.state.stateKey(msg.channelType, msg.chatId);
// Coalesce rapid-fire messages (e.g. Telegram splits long text at 4096 chars)
// Wait briefly and merge any follow-up messages from the same user/chat
const coalesced = await this.coalesceMessages(adapter, msg);

// Guard: if this chat is already processing a message
const chatKey = this.state.stateKey(coalesced.channelType, coalesced.chatId);
if (this.state.isProcessing(chatKey)) {
await adapter.send({ chatId: msg.chatId, text: '⏳ Previous message still processing, please wait...' }).catch(() => {});
if (coalesced.text && this.sdkEngine.canSteer(coalesced.channelType, coalesced.chatId, coalesced.replyToMessageId)) {
this.sdkEngine.steer(coalesced.channelType, coalesced.chatId, coalesced.text);
await adapter.send({ chatId: coalesced.chatId, text: '💬 Message sent to active session' }).catch(() => {});
} else if (coalesced.text) {
const queued = this.sdkEngine.queueMessage(coalesced.channelType, coalesced.chatId, coalesced);
if (queued) {
await adapter.send({ chatId: coalesced.chatId, text: '📥 Queued — will process after current task' }).catch(() => {});
} else {
await adapter.send({ chatId: coalesced.chatId, text: '⚠️ Queue full — please wait for current tasks to finish' }).catch(() => {});
}
}
continue;
}
this.state.setProcessing(chatKey, true);
this.handleInboundMessage(adapter, msg)
this.handleInboundMessage(adapter, coalesced)
.then(() => this.drainQueue(adapter, coalesced.channelType, coalesced.chatId))
.catch(err => console.error(`[${adapter.channelType}] Error handling message:`, err))
.finally(() => this.state.setProcessing(chatKey, false));
}
Expand All @@ -221,6 +296,13 @@ export class BridgeManager {
if (msg.text?.startsWith('/')) {
const handled = await this.commands.handle(adapter, msg);
if (handled) return true;

// Unrecognized slash command — check if provider supports passthrough
const provider = this.getProvider(msg.channelType, msg.chatId);
if (!provider.capabilities().slashCommands) {
await adapter.send({ chatId: msg.chatId, text: '⚠️ Slash commands not supported by current runtime' });
return true;
}
}

// SDK conversation — delegate to SDKEngine
Expand Down
10 changes: 5 additions & 5 deletions bridge/src/engine/callback-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { PermissionCoordinator } from './permission-coordinator.js';

/** Shared SDK question state — owned by SDKEngine, read/written by CallbackRouter */
export interface SdkQuestionState {
sdkQuestionData: Map<string, { questions: Array<{ question: string; header: string; options: Array<{ label: string; description?: string }>; multiSelect: boolean }>; chatId: string }>;
sdkQuestionData: Map<string, { questions: Array<{ question: string; header: string; options: Array<{ label: string; description?: string; preview?: string }>; multiSelect: boolean }>; chatId: string }>;
sdkQuestionAnswers: Map<string, number>;
sdkQuestionTextAnswers: Map<string, string>;
}
Expand Down Expand Up @@ -106,7 +106,7 @@ export class CallbackRouter {
if (qData) {
const q = qData.questions[0];
const selectedLabels = [...selected].sort((a, b) => a - b).map(i => q.options[i]?.label).filter(Boolean);
const answerText = selectedLabels.join(',');
const answerText = selectedLabels.join(', ');
this.sdkState.sdkQuestionTextAnswers.set(permId, answerText);
adapter.editMessage(msg.chatId, msg.messageId, {
chatId: msg.chatId,
Expand Down Expand Up @@ -143,7 +143,7 @@ export class CallbackRouter {
const toolName = parts.slice(3).join(':');
this.permissions.getGateway().resolve(permId, 'allow');
this.permissions.addAllowedTool(toolName);
console.log(`[bridge] Added ${toolName} to session whitelist`);
console.log(`[tlive:engine] Added ${toolName} to session whitelist`);
return true;
}

Expand All @@ -153,7 +153,7 @@ export class CallbackRouter {
const prefix = parts.slice(3).join(':');
this.permissions.getGateway().resolve(permId, 'allow');
this.permissions.addAllowedBashPrefix(prefix);
console.log(`[bridge] Added Bash(${prefix} *) to session whitelist`);
console.log(`[tlive:engine] Added Bash(${prefix} *) to session whitelist`);
return true;
}

Expand Down Expand Up @@ -197,7 +197,7 @@ export class CallbackRouter {
}

// Regular permission broker callbacks (perm:allow:ID, perm:deny:ID)
console.log(`[bridge] Perm callback: ${msg.callbackData}, gateway pending: ${this.permissions.getGateway().pendingCount()}`);
console.log(`[tlive:engine] Perm callback: ${msg.callbackData}, gateway pending: ${this.permissions.getGateway().pendingCount()}`);
this.permissions.handleBrokerCallback(msg.callbackData);
return true;
}
Expand Down
3 changes: 3 additions & 0 deletions bridge/src/engine/command-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class CommandRouter {
private coreAvailable: () => boolean,
private activeControls: Map<string, QueryControls>,
private permissions: { clearSessionWhitelist(): void },
private onNewSession?: (channelType: string, chatId: string) => void,
) {}

async handle(adapter: BaseChannelAdapter, msg: InboundMessage): Promise<boolean> {
Expand Down Expand Up @@ -65,6 +66,8 @@ export class CommandRouter {
return true;
}
case '/new': {
// Close any active LiveSession(s) for this chat before creating new session
this.onNewSession?.(msg.channelType, msg.chatId);
const newSessionId = `session-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
await this.router.rebind(msg.channelType, msg.chatId, newSessionId);
this.state.clearLastActive(msg.channelType, msg.chatId);
Expand Down
14 changes: 10 additions & 4 deletions bridge/src/engine/conversation.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { getBridgeContext } from '../context.js';
import type { CanonicalEvent } from '../messages/schema.js';
import type { LLMProvider, FileAttachment, PermissionRequestHandler, QueryControls } from '../providers/base.js';
import type { LLMProvider, FileAttachment, PermissionRequestHandler, QueryControls, StreamChatResult } from '../providers/base.js';
import type { AskUserQuestionHandler } from '../messages/types.js';

const TEXT_MIME_PREFIXES = ['text/', 'application/json', 'application/xml', 'application/javascript', 'application/typescript', 'application/x-yaml', 'application/toml'];
Expand Down Expand Up @@ -35,13 +35,14 @@ interface ProcessMessageParams {
onTextDelta?: (delta: string) => void;
onToolStart?: (event: { id: string; name: string; input: Record<string, unknown> }) => void;
onToolResult?: (event: { toolUseId: string; content: string; isError: boolean }) => void;
onQueryResult?: (event: { sessionId: string; isError: boolean; usage: { inputTokens: number; outputTokens: number; costUsd?: number }; permissionDenials?: Array<{ toolName: string; toolUseId: string }> }) => void;
onQueryResult?: (event: { sessionId: string; isError: boolean; usage: { inputTokens: number; outputTokens: number; costUsd?: number; modelUsage?: Record<string, { inputTokens: number; outputTokens: number; cacheReadInputTokens?: number; cacheCreationInputTokens?: number; costUSD?: number }> }; permissionDenials?: Array<{ toolName: string; toolUseId: string }> }) => void;
onError?: (error: string) => void;
onAgentStart?: (data: { description: string; taskId?: string }) => void;
onAgentProgress?: (data: { description: string; lastTool?: string; usage?: { toolUses: number; durationMs: number } }) => void;
onAgentComplete?: (data: { summary: string; status: string }) => void;
onPromptSuggestion?: (suggestion: string) => void;
onToolProgress?: (data: { toolName: string; elapsed: number }) => void;
onTodoUpdate?: (todos: Array<{ content: string; status: 'pending' | 'in_progress' | 'completed' }>) => void;
onRateLimit?: (data: { status: string; utilization?: number; resetsAt?: number }) => void;
/** Receives query controls (interrupt, stopTask) when available */
onControls?: (controls: QueryControls) => void;
Expand All @@ -54,6 +55,8 @@ interface ProcessMessageParams {
model?: string;
/** Override LLM provider (for per-chat runtime selection) */
llm?: LLMProvider;
/** Pre-built stream from LiveSession.startTurn() — skips llm.streamChat() */
streamResult?: StreamChatResult;
}

interface ProcessMessageResult {
Expand Down Expand Up @@ -90,8 +93,8 @@ export class ConversationEngine {
const session = await store.getSession(params.sessionId);
const workDir = session?.workingDirectory ?? defaultWorkdir;

// 5. Stream LLM response (pass images as attachments for vision)
const result = llm.streamChat({
// 5. Stream LLM response — use pre-built stream from LiveSession or call streamChat
const result = params.streamResult ?? llm.streamChat({
prompt,
workingDirectory: workDir,
model: params.model,
Expand Down Expand Up @@ -156,6 +159,9 @@ export class ConversationEngine {
case 'tool_progress':
params.onToolProgress?.(value);
break;
case 'todo_update':
params.onTodoUpdate?.(value.todos);
break;
case 'rate_limit':
params.onRateLimit?.(value);
break;
Expand Down
48 changes: 45 additions & 3 deletions bridge/src/engine/cost-tracker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
export interface ModelUsageEntry {
inputTokens: number;
outputTokens: number;
cacheReadInputTokens?: number;
cacheCreationInputTokens?: number;
costUSD?: number;
}

export interface UsageStats {
inputTokens: number;
outputTokens: number;
costUsd: number;
durationMs: number;
sessionTotalUsd?: number;
queryCount?: number;
modelUsage?: Record<string, ModelUsageEntry>;
}

function formatTokens(n: number): string {
Expand All @@ -17,33 +28,64 @@ function formatDuration(ms: number): string {
return sec > 0 ? `${min}m ${sec}s` : `${min}m`;
}

/** Format per-model cost breakdown. Returns null if only one model or no data. */
function formatModelBreakdown(modelUsage?: Record<string, ModelUsageEntry>): string | null {
if (!modelUsage) return null;
const entries = Object.entries(modelUsage).filter(([, u]) => u.costUSD && u.costUSD > 0);
if (entries.length <= 1) return null;
// Short model names: "claude-sonnet-4-20250514" → "sonnet-4"
return entries.map(([model, u]) => {
const short = model.replace(/^claude-/, '').replace(/-\d{8}$/, '');
return `${short} $${u.costUSD!.toFixed(2)}`;
}).join(' + ');
}

export class CostTracker {
private startTime = 0;
private sessionTotal = 0;
private _queryCount = 0;

start(): void {
this.startTime = Date.now();
}

finish(usage: { input_tokens: number; output_tokens: number; cost_usd?: number }): UsageStats {
finish(usage: { input_tokens: number; output_tokens: number; cost_usd?: number; model_usage?: Record<string, ModelUsageEntry> }): UsageStats {
const durationMs = Date.now() - this.startTime;
const costUsd = usage.cost_usd ?? this.estimateCost(usage.input_tokens, usage.output_tokens);
this._queryCount++;
this.sessionTotal += costUsd;
return {
inputTokens: usage.input_tokens,
outputTokens: usage.output_tokens,
costUsd,
durationMs,
sessionTotalUsd: this.sessionTotal,
queryCount: this._queryCount,
...(usage.model_usage ? { modelUsage: usage.model_usage } : {}),
};
}

get queryCount(): number { return this._queryCount; }

static format(stats: UsageStats): string {
const duration = formatDuration(stats.durationMs);
// When tokens are 0 (e.g. Codex SDK doesn't expose token counts), show only duration
if (stats.inputTokens === 0 && stats.outputTokens === 0) {
return `📊 ${duration}`;
}
const tokens = `${formatTokens(stats.inputTokens)}/${formatTokens(stats.outputTokens)} tok`;
const cost = `$${stats.costUsd.toFixed(2)}`;
return `📊 ${tokens} | ${cost} | ${duration}`;
// Only show cost when non-zero (providers without cost_usd report 0)
if (stats.costUsd > 0) {
const cost = `$${stats.costUsd.toFixed(2)}`;
// Per-model breakdown when multiple models used
const modelBreakdown = formatModelBreakdown(stats.modelUsage);
const costPart = modelBreakdown || cost;
if (stats.queryCount && stats.queryCount > 1 && stats.sessionTotalUsd != null) {
return `📊 ${tokens} | ${costPart} (Σ $${stats.sessionTotalUsd.toFixed(2)}) | ${duration}`;
}
return `📊 ${tokens} | ${costPart} | ${duration}`;
}
return `📊 ${tokens} | ${duration}`;
}

private estimateCost(inputTokens: number, outputTokens: number): number {
Expand Down
Loading
Loading