From 0aa3a51ef671fb7408ea37a4fe3937f62911a14e Mon Sep 17 00:00:00 2001 From: 49 Date: Mon, 6 Apr 2026 15:41:01 +0800 Subject: [PATCH 1/5] refactor(bridge): extract CallbackRouter from BridgeManager Move all button callback handling (~180 lines) into a dedicated CallbackRouter class. Uses constructor DI with callback functions, matching existing CommandRouter pattern. --- bridge/src/engine/bridge-manager.ts | 199 +++----------------------- bridge/src/engine/callback-router.ts | 204 +++++++++++++++++++++++++++ 2 files changed, 220 insertions(+), 183 deletions(-) create mode 100644 bridge/src/engine/callback-router.ts diff --git a/bridge/src/engine/bridge-manager.ts b/bridge/src/engine/bridge-manager.ts index b69b6c8f..ec9956d4 100644 --- a/bridge/src/engine/bridge-manager.ts +++ b/bridge/src/engine/bridge-manager.ts @@ -16,6 +16,8 @@ import { getToolCommand } from './tool-registry.js'; import { SessionStateManager } from './session-state.js'; import { PermissionCoordinator } from './permission-coordinator.js'; import { CommandRouter } from './command-router.js'; +import { CallbackRouter } from './callback-router.js'; +import type { SdkQuestionState } from './callback-router.js'; import type { FeishuStreamingSession } from '../channels/feishu-streaming.js'; import { CostTracker } from './cost-tracker.js'; import { readFileSync, writeFileSync, mkdirSync } from 'node:fs'; @@ -79,6 +81,7 @@ export class BridgeManager { private pendingAttachments = new Map(); private commands: CommandRouter; + private callbackRouter: CallbackRouter; private chatIdFile: string; /** Cached LLM providers keyed by runtime name */ private providerCache = new Map(); @@ -111,6 +114,17 @@ export class BridgeManager { this.activeControls, this.permissions, ); + const sdkState: SdkQuestionState = { + sdkQuestionData: this.sdkQuestionData, + sdkQuestionAnswers: this.sdkQuestionAnswers, + sdkQuestionTextAnswers: this.sdkQuestionTextAnswers, + }; + this.callbackRouter = new CallbackRouter( + this.permissions, + sdkState, + () => this.coreAvailable, + (adapter, msg) => this.handleInboundMessage(adapter, msg), + ); } /** Expose coreAvailable flag for main.ts polling loop */ @@ -545,190 +559,9 @@ export class BridgeManager { return true; } - // Callback data + // Callback data — delegate to CallbackRouter if (msg.callbackData) { - // Prompt suggestion callback — re-inject as a normal user message - if (msg.callbackData.startsWith('suggest:')) { - const suggestion = msg.callbackData.slice('suggest:'.length); - // Re-process as a regular text message - msg.text = suggestion; - msg.callbackData = undefined; - return this.handleInboundMessage(adapter, msg); - } - - // AskUserQuestion answer callbacks (askq:{hookId}:{optionIndex}:{sessionId}) - // NOTE: check toggle/submit/skip BEFORE this — they also start with "askq" - if (msg.callbackData.startsWith('askq:') && !msg.callbackData.startsWith('askq_')) { - const parts = msg.callbackData.split(':'); - const hookId = parts[1]; - const optionIndex = parseInt(parts[2], 10); - const sessionId = parts[3] || ''; - await this.permissions.resolveAskQuestion( - hookId, optionIndex, sessionId, - msg.messageId, adapter, msg.chatId, this.coreAvailable, - ); - return true; - } - - // AskUserQuestion multi-select toggle (askq_toggle:{hookId}:{idx}:{sessionId}) - if (msg.callbackData.startsWith('askq_toggle:')) { - const parts = msg.callbackData.split(':'); - const hookId = parts[1]; - const optionIndex = parseInt(parts[2], 10); - const sessionId = parts[3] || ''; - const selected = this.permissions.toggleMultiSelectOption(hookId, optionIndex); - if (selected === null) return true; - - // Re-render the card with updated checkboxes - const card = this.permissions.buildMultiSelectCard(hookId, sessionId, selected, adapter.channelType); - if (card) { - await adapter.editMessage(msg.chatId, msg.messageId, { - chatId: msg.chatId, - text: card.text, - html: card.html, - buttons: card.buttons, - feishuHeader: adapter.channelType === 'feishu' ? { template: 'blue', title: '❓ Terminal' } : undefined, - }); - } - return true; - } - - // AskUserQuestion multi-select submit (askq_submit:{hookId}:{sessionId}) - if (msg.callbackData.startsWith('askq_submit:')) { - const parts = msg.callbackData.split(':'); - const hookId = parts[1]; - const sessionId = parts[2] || ''; - await this.permissions.resolveMultiSelect( - hookId, sessionId, - msg.messageId, adapter, msg.chatId, this.coreAvailable, - ); - return true; - } - - // AskUserQuestion skip callback — resolve with allow + empty answers (askq_skip:{hookId}:{sessionId}) - if (msg.callbackData.startsWith('askq_skip:')) { - const parts = msg.callbackData.split(':'); - const hookId = parts[1]; - const sessionId = parts[2] || ''; - await this.permissions.resolveAskQuestionSkip( - hookId, sessionId, - msg.messageId, adapter, msg.chatId, this.coreAvailable, - ); - return true; - } - - // SDK AskUserQuestion multi-select submit (askq_submit_sdk:{permId}) - if (msg.callbackData.startsWith('askq_submit_sdk:')) { - const permId = msg.callbackData.split(':')[1]; - const selected = this.permissions.getToggledSelections(permId); - if (selected.size === 0) { - await adapter.send({ chatId: msg.chatId, text: '⚠️ No options selected' }); - return true; - } - const qData = this.sdkQuestionData.get(permId); - if (qData) { - const q = qData.questions[0]; - const selectedLabels = [...selected].sort((a, b) => a - b).map(i => q.options[i]?.label).filter(Boolean); - const answerText = selectedLabels.join(','); - this.sdkQuestionTextAnswers.set(permId, answerText); - // Edit card to show selection - adapter.editMessage(msg.chatId, msg.messageId, { - chatId: msg.chatId, - text: `✅ Selected: ${selectedLabels.join(', ')}`, - buttons: [], - feishuHeader: msg.channelType === 'feishu' ? { template: 'green', title: '✅ Answered' } : undefined, - }).catch(() => {}); - } - this.permissions.cleanupQuestion(permId); - this.permissions.getGateway().resolve(permId, 'allow'); - return true; - } - - // Hook permission callbacks (hook:allow:ID:sessionId, hook:allow_always:ID:sessionId, hook:deny:ID:sessionId) - if (msg.callbackData.startsWith('hook:')) { - const parts = msg.callbackData.split(':'); - const decision = parts[1]; // allow, allow_always, or deny - const hookId = parts[2]; - const sessionId = parts[3] || ''; - await this.permissions.resolveHookCallback(hookId, decision, sessionId, msg.messageId, adapter, msg.chatId, this.coreAvailable); - return true; - } - - // Graduated permission callbacks — resolve gateway, no message edit - // (renderer.onPermissionResolved() handles the visual transition) - if (msg.callbackData.startsWith('perm:allow_edits:')) { - const permId = msg.callbackData.split(':').slice(2).join(':'); - this.permissions.getGateway().resolve(permId, 'allow'); - return true; - } - - if (msg.callbackData.startsWith('perm:allow_tool:')) { - const parts = msg.callbackData.split(':'); - const permId = parts[2]; - const toolName = parts.slice(3).join(':'); - this.permissions.getGateway().resolve(permId, 'allow'); - this.permissions.addAllowedTool(toolName); - console.log(`[bridge] Added ${toolName} to session whitelist`); - return true; - } - - if (msg.callbackData.startsWith('perm:allow_bash:')) { - const parts = msg.callbackData.split(':'); - const permId = parts[2]; - const prefix = parts.slice(3).join(':'); - this.permissions.getGateway().resolve(permId, 'allow'); - this.permissions.addAllowedBashPrefix(prefix); - console.log(`[bridge] Added Bash(${prefix} *) to session whitelist`); - return true; - } - - // SDK AskUserQuestion answer callbacks (perm:allow:permId:askq:optionIndex) - if (msg.callbackData.includes(':askq:')) { - const parts = msg.callbackData.split(':'); - const askqIdx = parts.indexOf('askq'); - if (askqIdx >= 0) { - const permId = parts.slice(2, askqIdx).join(':'); - const optionIndex = parseInt(parts[askqIdx + 1], 10); - const qData = this.sdkQuestionData.get(permId); - const selected = qData?.questions?.[0]?.options?.[optionIndex]; - if (!selected) { - // Invalid option index (stale button or tampered data) — ignore - return true; - } - this.sdkQuestionAnswers.set(permId, optionIndex); - this.permissions.getGateway().resolve(permId, 'allow'); - adapter.editMessage(msg.chatId, msg.messageId, { - chatId: msg.chatId, - text: `✅ Selected: ${selected.label}`, - buttons: [], - feishuHeader: { template: 'green', title: `✅ ${selected.label}` }, - }).catch(() => {}); - return true; - } - } - - // SDK AskUserQuestion skip (perm:allow:permId:askq_skip) — resolve with deny so handler returns empty answers - if (msg.callbackData.includes(':askq_skip')) { - const parts = msg.callbackData.split(':'); - const skipIdx = parts.indexOf('askq_skip'); - if (skipIdx >= 0) { - const permId = parts.slice(2, skipIdx).join(':'); - this.permissions.getGateway().resolve(permId, 'deny', 'Skipped'); - adapter.editMessage(msg.chatId, msg.messageId, { - chatId: msg.chatId, - text: '⏭ Skipped', - buttons: [], - feishuHeader: { template: 'grey', title: '⏭ Skipped' }, - }).catch(() => {}); - return true; - } - } - - // Regular permission broker callbacks (perm:allow:ID, perm:deny:ID) - console.log(`[bridge] Perm callback: ${msg.callbackData}, gateway pending: ${this.permissions.getGateway().pendingCount()}`); - this.permissions.handleBrokerCallback(msg.callbackData); - // No message edit — renderer.onPermissionResolved() morphs back to status line - return true; + return this.callbackRouter.handle(adapter, msg); } // Bridge commands — only intercept known commands, pass others to Claude Code diff --git a/bridge/src/engine/callback-router.ts b/bridge/src/engine/callback-router.ts new file mode 100644 index 00000000..fe748dae --- /dev/null +++ b/bridge/src/engine/callback-router.ts @@ -0,0 +1,204 @@ +import type { BaseChannelAdapter } from '../channels/base.js'; +import type { InboundMessage } from '../channels/types.js'; +import type { PermissionCoordinator } from './permission-coordinator.js'; + +/** Shared SDK question state — owned by SDKEngine, read/written by CallbackRouter */ +export interface SdkQuestionState { + sdkQuestionData: Map; multiSelect: boolean }>; chatId: string }>; + sdkQuestionAnswers: Map; + sdkQuestionTextAnswers: Map; +} + +/** + * Routes all button callback interactions from IM platforms. + * + * Handles: prompt suggestions, AskUserQuestion buttons (single/multi-select), + * hook permission callbacks, SDK permission callbacks, and broker callbacks. + */ +export class CallbackRouter { + constructor( + private permissions: PermissionCoordinator, + private sdkState: SdkQuestionState, + private coreAvailable: () => boolean, + private handleInboundMessage: (adapter: BaseChannelAdapter, msg: InboundMessage) => Promise, + ) {} + + async handle(adapter: BaseChannelAdapter, msg: InboundMessage): Promise { + if (!msg.callbackData) return false; + + // Prompt suggestion callback — re-inject as a normal user message + if (msg.callbackData.startsWith('suggest:')) { + const suggestion = msg.callbackData.slice('suggest:'.length); + msg.text = suggestion; + msg.callbackData = undefined; + return this.handleInboundMessage(adapter, msg); + } + + // AskUserQuestion answer callbacks (askq:{hookId}:{optionIndex}:{sessionId}) + // NOTE: check toggle/submit/skip BEFORE this — they also start with "askq" + if (msg.callbackData.startsWith('askq:') && !msg.callbackData.startsWith('askq_')) { + const parts = msg.callbackData.split(':'); + const hookId = parts[1]; + const optionIndex = parseInt(parts[2], 10); + const sessionId = parts[3] || ''; + await this.permissions.resolveAskQuestion( + hookId, optionIndex, sessionId, + msg.messageId, adapter, msg.chatId, this.coreAvailable(), + ); + return true; + } + + // AskUserQuestion multi-select toggle (askq_toggle:{hookId}:{idx}:{sessionId}) + if (msg.callbackData.startsWith('askq_toggle:')) { + const parts = msg.callbackData.split(':'); + const hookId = parts[1]; + const optionIndex = parseInt(parts[2], 10); + const selected = this.permissions.toggleMultiSelectOption(hookId, optionIndex); + if (selected === null) return true; + + const sessionId = parts[3] || ''; + const card = this.permissions.buildMultiSelectCard(hookId, sessionId, selected, adapter.channelType); + if (card) { + await adapter.editMessage(msg.chatId, msg.messageId, { + chatId: msg.chatId, + text: card.text, + html: card.html, + buttons: card.buttons, + feishuHeader: adapter.channelType === 'feishu' ? { template: 'blue', title: '❓ Terminal' } : undefined, + }); + } + return true; + } + + // AskUserQuestion multi-select submit (askq_submit:{hookId}:{sessionId}) + if (msg.callbackData.startsWith('askq_submit:')) { + const parts = msg.callbackData.split(':'); + const hookId = parts[1]; + const sessionId = parts[2] || ''; + await this.permissions.resolveMultiSelect( + hookId, sessionId, + msg.messageId, adapter, msg.chatId, this.coreAvailable(), + ); + return true; + } + + // AskUserQuestion skip callback (askq_skip:{hookId}:{sessionId}) + if (msg.callbackData.startsWith('askq_skip:')) { + const parts = msg.callbackData.split(':'); + const hookId = parts[1]; + const sessionId = parts[2] || ''; + await this.permissions.resolveAskQuestionSkip( + hookId, sessionId, + msg.messageId, adapter, msg.chatId, this.coreAvailable(), + ); + return true; + } + + // SDK AskUserQuestion multi-select submit (askq_submit_sdk:{permId}) + if (msg.callbackData.startsWith('askq_submit_sdk:')) { + const permId = msg.callbackData.split(':')[1]; + const selected = this.permissions.getToggledSelections(permId); + if (selected.size === 0) { + await adapter.send({ chatId: msg.chatId, text: '⚠️ No options selected' }); + return true; + } + const qData = this.sdkState.sdkQuestionData.get(permId); + if (qData) { + const q = qData.questions[0]; + const selectedLabels = [...selected].sort((a, b) => a - b).map(i => q.options[i]?.label).filter(Boolean); + const answerText = selectedLabels.join(','); + this.sdkState.sdkQuestionTextAnswers.set(permId, answerText); + adapter.editMessage(msg.chatId, msg.messageId, { + chatId: msg.chatId, + text: `✅ Selected: ${selectedLabels.join(', ')}`, + buttons: [], + feishuHeader: msg.channelType === 'feishu' ? { template: 'green', title: '✅ Answered' } : undefined, + }).catch(() => {}); + } + this.permissions.cleanupQuestion(permId); + this.permissions.getGateway().resolve(permId, 'allow'); + return true; + } + + // Hook permission callbacks (hook:allow:ID:sessionId, hook:allow_always:ID:sessionId, hook:deny:ID:sessionId) + if (msg.callbackData.startsWith('hook:')) { + const parts = msg.callbackData.split(':'); + const decision = parts[1]; + const hookId = parts[2]; + const sessionId = parts[3] || ''; + await this.permissions.resolveHookCallback(hookId, decision, sessionId, msg.messageId, adapter, msg.chatId, this.coreAvailable()); + return true; + } + + // Graduated permission callbacks — resolve gateway, no message edit + if (msg.callbackData.startsWith('perm:allow_edits:')) { + const permId = msg.callbackData.split(':').slice(2).join(':'); + this.permissions.getGateway().resolve(permId, 'allow'); + return true; + } + + if (msg.callbackData.startsWith('perm:allow_tool:')) { + const parts = msg.callbackData.split(':'); + const permId = parts[2]; + const toolName = parts.slice(3).join(':'); + this.permissions.getGateway().resolve(permId, 'allow'); + this.permissions.addAllowedTool(toolName); + console.log(`[bridge] Added ${toolName} to session whitelist`); + return true; + } + + if (msg.callbackData.startsWith('perm:allow_bash:')) { + const parts = msg.callbackData.split(':'); + const permId = parts[2]; + const prefix = parts.slice(3).join(':'); + this.permissions.getGateway().resolve(permId, 'allow'); + this.permissions.addAllowedBashPrefix(prefix); + console.log(`[bridge] Added Bash(${prefix} *) to session whitelist`); + return true; + } + + // SDK AskUserQuestion answer callbacks (perm:allow:permId:askq:optionIndex) + if (msg.callbackData.includes(':askq:')) { + const parts = msg.callbackData.split(':'); + const askqIdx = parts.indexOf('askq'); + if (askqIdx >= 0) { + const permId = parts.slice(2, askqIdx).join(':'); + const optionIndex = parseInt(parts[askqIdx + 1], 10); + const qData = this.sdkState.sdkQuestionData.get(permId); + const selected = qData?.questions?.[0]?.options?.[optionIndex]; + if (!selected) return true; + this.sdkState.sdkQuestionAnswers.set(permId, optionIndex); + this.permissions.getGateway().resolve(permId, 'allow'); + adapter.editMessage(msg.chatId, msg.messageId, { + chatId: msg.chatId, + text: `✅ Selected: ${selected.label}`, + buttons: [], + feishuHeader: { template: 'green', title: `✅ ${selected.label}` }, + }).catch(() => {}); + return true; + } + } + + // SDK AskUserQuestion skip (perm:allow:permId:askq_skip) + if (msg.callbackData.includes(':askq_skip')) { + const parts = msg.callbackData.split(':'); + const skipIdx = parts.indexOf('askq_skip'); + if (skipIdx >= 0) { + const permId = parts.slice(2, skipIdx).join(':'); + this.permissions.getGateway().resolve(permId, 'deny', 'Skipped'); + adapter.editMessage(msg.chatId, msg.messageId, { + chatId: msg.chatId, + text: '⏭ Skipped', + buttons: [], + feishuHeader: { template: 'grey', title: '⏭ Skipped' }, + }).catch(() => {}); + return true; + } + } + + // Regular permission broker callbacks (perm:allow:ID, perm:deny:ID) + console.log(`[bridge] Perm callback: ${msg.callbackData}, gateway pending: ${this.permissions.getGateway().pendingCount()}`); + this.permissions.handleBrokerCallback(msg.callbackData); + return true; + } +} From 9d2bd7fb28e1103affa6b07b19454f60b6ca972e Mon Sep 17 00:00:00 2001 From: 49 Date: Mon, 6 Apr 2026 15:49:55 +0800 Subject: [PATCH 2/5] refactor(bridge): extract SDKEngine from BridgeManager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move SDK conversation flow (~400 lines) into a dedicated SDKEngine class. Owns ConversationEngine, activeControls, SDK question state maps. Provider-agnostic — works with both Claude SDK and Codex via LLMProvider. --- bridge/src/engine/bridge-manager.ts | 444 +------------------------- bridge/src/engine/sdk-engine.ts | 463 ++++++++++++++++++++++++++++ 2 files changed, 476 insertions(+), 431 deletions(-) create mode 100644 bridge/src/engine/sdk-engine.ts diff --git a/bridge/src/engine/bridge-manager.ts b/bridge/src/engine/bridge-manager.ts index ec9956d4..06fb6e7c 100644 --- a/bridge/src/engine/bridge-manager.ts +++ b/bridge/src/engine/bridge-manager.ts @@ -1,25 +1,17 @@ import { BaseChannelAdapter, createAdapter } from '../channels/base.js'; -import type { InboundMessage, OutboundMessage } from '../channels/types.js'; -import { ConversationEngine } from './conversation.js'; +import type { InboundMessage } from '../channels/types.js'; import { ChannelRouter } from './router.js'; import { PermissionBroker } from '../permissions/broker.js'; import { PendingPermissions } from '../permissions/gateway.js'; -import { DeliveryLayer, chunkByParagraph } from '../delivery/delivery.js'; import { getBridgeContext } from '../context.js'; import { resolveProvider } from '../providers/index.js'; import type { LLMProvider } from '../providers/base.js'; import { loadConfig } from '../config.js'; -import { markdownToTelegram } from '../markdown/index.js'; -import { downgradeHeadings } from '../markdown/feishu.js'; -import { MessageRenderer } from './message-renderer.js'; -import { getToolCommand } from './tool-registry.js'; import { SessionStateManager } from './session-state.js'; import { PermissionCoordinator } from './permission-coordinator.js'; import { CommandRouter } from './command-router.js'; import { CallbackRouter } from './callback-router.js'; -import type { SdkQuestionState } from './callback-router.js'; -import type { FeishuStreamingSession } from '../channels/feishu-streaming.js'; -import { CostTracker } from './cost-tracker.js'; +import { SDKEngine } from './sdk-engine.js'; import { readFileSync, writeFileSync, mkdirSync } from 'node:fs'; import { join, basename } from 'node:path'; import { homedir, networkInterfaces } from 'node:os'; @@ -66,29 +58,22 @@ export interface HookNotificationData { export class BridgeManager { private adapters = new Map(); private running = false; - private engine = new ConversationEngine(); private router = new ChannelRouter(); - private delivery = new DeliveryLayer(); private coreUrl: string; private token: string; private coreAvailable = false; private state = new SessionStateManager(); private permissions: PermissionCoordinator; - /** Active query controls per chat — for /stop command */ - private activeControls = new Map(); private lastChatId = new Map(); /** Pending image attachments waiting for a text message to merge with (key: channelType:chatId) */ private pendingAttachments = new Map(); private commands: CommandRouter; private callbackRouter: CallbackRouter; + private sdkEngine: SDKEngine; private chatIdFile: string; /** Cached LLM providers keyed by runtime name */ private providerCache = new Map(); - /** SDK AskUserQuestion: store question data and selected option index */ - private sdkQuestionData = new Map; multiSelect: boolean }>; chatId: string }>(); - private sdkQuestionAnswers = new Map(); - private sdkQuestionTextAnswers = new Map(); constructor() { const config = loadConfig(); @@ -106,22 +91,18 @@ export class BridgeManager { if (typeof v === 'string') this.lastChatId.set(k, v); } } catch { /* no saved chat IDs yet */ } + this.sdkEngine = new SDKEngine(this.state, this.router, this.permissions); this.commands = new CommandRouter( this.state, () => this.adapters, this.router, () => this.coreAvailable, - this.activeControls, + this.sdkEngine.getActiveControls(), this.permissions, ); - const sdkState: SdkQuestionState = { - sdkQuestionData: this.sdkQuestionData, - sdkQuestionAnswers: this.sdkQuestionAnswers, - sdkQuestionTextAnswers: this.sdkQuestionTextAnswers, - }; this.callbackRouter = new CallbackRouter( this.permissions, - sdkState, + this.sdkEngine.getQuestionState(), () => this.coreAvailable, (adapter, msg) => this.handleInboundMessage(adapter, msg), ); @@ -177,14 +158,8 @@ export class BridgeManager { } /** Find a pending SDK AskUserQuestion for numeric text reply */ - private findPendingSdkQuestion(_channelType: string, chatId: string): { permId: string } | null { - // Find the most recent pending SDK askq permission scoped to this chat - for (const [permId, data] of this.sdkQuestionData) { - if (data.chatId === chatId && this.permissions.getGateway().isPending(permId)) { - return { permId }; - } - } - return null; + private findPendingSdkQuestion(channelType: string, chatId: string): { permId: string } | null { + return this.sdkEngine.findPendingQuestion(channelType, chatId); } registerAdapter(adapter: BaseChannelAdapter): void { @@ -430,7 +405,7 @@ export class BridgeManager { // Validate against actual options count to avoid "Selected: ?" for out-of-range numbers const qData = pendingHookQ ? this.permissions.getQuestionData(pendingHookQ.hookId) - : pendingSdkQ ? this.sdkQuestionData.get(pendingSdkQ.permId) : null; + : pendingSdkQ ? this.sdkEngine.getQuestionState().sdkQuestionData.get(pendingSdkQ.permId) : null; const optionsCount = qData?.questions?.[0]?.options?.length ?? 0; if (idx < optionsCount) validOptionIndex = idx; } @@ -446,7 +421,7 @@ export class BridgeManager { return true; } if (pendingSdkQ) { - this.sdkQuestionAnswers.set(pendingSdkQ.permId, validOptionIndex); + this.sdkEngine.getQuestionState().sdkQuestionAnswers.set(pendingSdkQ.permId, validOptionIndex); this.permissions.getGateway().resolve(pendingSdkQ.permId, 'allow'); return true; } @@ -460,7 +435,7 @@ export class BridgeManager { return true; } if (pendingSdkQ) { - this.sdkQuestionTextAnswers.set(pendingSdkQ.permId, trimmed); + this.sdkEngine.getQuestionState().sdkQuestionTextAnswers.set(pendingSdkQ.permId, trimmed); this.permissions.getGateway().resolve(pendingSdkQ.permId, 'allow'); return true; } @@ -571,401 +546,8 @@ export class BridgeManager { // Unrecognized slash command → fall through to Claude Code } - // Check for session expiry (>30 min inactivity) and auto-create new session - const expired = this.state.checkAndUpdateLastActive(msg.channelType, msg.chatId); - if (expired) { - const newSessionId = `session-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; - await this.router.rebind(msg.channelType, msg.chatId, newSessionId); - this.state.clearThread(msg.channelType, msg.chatId); - this.permissions.clearSessionWhitelist(); - } - - const binding = await this.router.resolve(msg.channelType, msg.chatId); - - // Resolve threadId: use existing thread if message came from one, or reuse session thread - let threadId = msg.threadId; - if (!threadId && adapter.channelType === 'discord') { - threadId = this.state.getThread(msg.channelType, msg.chatId); - } - // For Telegram topics, always pass threadId through - if (!threadId && msg.threadId) { - threadId = msg.threadId; - } - - // Reaction target: for Discord threads, reaction goes on the original channel message - const reactionChatId = msg.chatId; - - // Start typing heartbeat (in thread if available) - const typingTarget = threadId && adapter.channelType === 'discord' ? threadId : msg.chatId; - const typingInterval = setInterval(() => { - adapter.sendTyping(typingTarget).catch(() => {}); - }, 4000); - adapter.sendTyping(typingTarget).catch(() => {}); - - const costTracker = new CostTracker(); - costTracker.start(); - - // Add processing reaction - const reactionEmojis: Record = { - telegram: { processing: '\u{1F914}', done: '\u{1F44D}', error: '\u{1F631}' }, - feishu: { processing: 'Typing', done: 'OK', error: 'FACEPALM' }, - discord: { processing: '\u{1F914}', done: '\u{1F44D}', error: '\u{274C}' }, - }; - const reactions = reactionEmojis[adapter.channelType] || reactionEmojis.telegram; - adapter.addReaction(reactionChatId, msg.messageId, reactions.processing).catch(() => {}); - - // Feishu streaming disabled — new renderer uses short status lines - // that don't benefit from streaming, and streaming cards can't be - // edited with im.message.patch (needed for permission buttons) - let feishuSession: import('../channels/feishu-streaming.js').FeishuStreamingSession | null = null; - - const platformLimits: Record = { telegram: 4096, discord: 2000, feishu: 30000 }; - let permissionReminderMsgId: string | undefined; - let permissionReminderTool: string | undefined; - let permissionReminderInput: string | undefined; - const renderer = new MessageRenderer({ - platformLimit: platformLimits[adapter.channelType] ?? 4096, - throttleMs: 300, - onPermissionTimeout: async (toolName, input, buttons) => { - permissionReminderTool = toolName; - permissionReminderInput = input; - const text = `⚠️ Permission pending — ${toolName}: ${permissionReminderInput}`; - const targetChatId = threadId && adapter.channelType === 'discord' ? threadId : msg.chatId; - const outMsg: OutboundMessage = adapter.channelType === 'telegram' - ? { chatId: targetChatId, html: markdownToTelegram(text) } - : { chatId: targetChatId, text }; - outMsg.buttons = buttons.map(b => ({ ...b, style: b.style as 'primary' | 'danger' | 'default' })); - if (threadId) outMsg.threadId = threadId; - try { - const result = await adapter.send(outMsg); - permissionReminderMsgId = result.messageId; - } catch { /* non-fatal */ } - }, - flushCallback: async (content, isEdit, buttons) => { - // Feishu streaming path — skip when buttons needed (streaming doesn't support buttons) - if (feishuSession && !buttons?.length) { - if (!isEdit) { - try { - const messageId = await feishuSession.start(downgradeHeadings(content)); - clearInterval(typingInterval); - return messageId; - } catch { - feishuSession = null; - } - } else { - feishuSession.update(downgradeHeadings(content)).catch(() => {}); - return; - } - } - // Non-streaming path - let outMsg: OutboundMessage; - if (adapter.channelType === 'telegram') { - outMsg = { chatId: msg.chatId, html: markdownToTelegram(content), threadId }; - } else if (adapter.channelType === 'discord') { - outMsg = { chatId: msg.chatId, text: content, threadId }; - } else { - outMsg = { chatId: msg.chatId, text: content }; - } - if (buttons?.length) { - outMsg.buttons = buttons.map(b => ({ ...b, style: b.style as 'primary' | 'danger' | 'default' })); - } - if (!isEdit) { - if (adapter.channelType === 'discord' && !threadId && 'createThread' in adapter) { - const result = await adapter.send(outMsg); - clearInterval(typingInterval); - const preview = (msg.text || 'Claude').slice(0, 80); - const newThreadId = await (adapter as any).createThread(msg.chatId, result.messageId, `💬 ${preview}`); - if (newThreadId) { - threadId = newThreadId; - this.state.setThread(msg.channelType, msg.chatId, newThreadId); - } - return result.messageId; - } - const result = await adapter.send(outMsg); - clearInterval(typingInterval); - return result.messageId; - } else { - const limit = platformLimits[adapter.channelType] ?? 4096; - if (content.length > limit) { - // Overflow: edit first chunk into existing message, send rest as new messages - const chunks = chunkByParagraph(content, limit); - const firstOutMsg: OutboundMessage = adapter.channelType === 'telegram' - ? { chatId: msg.chatId, html: markdownToTelegram(chunks[0]), threadId } - : adapter.channelType === 'discord' - ? { chatId: msg.chatId, text: chunks[0], threadId } - : { chatId: msg.chatId, text: chunks[0] }; - await adapter.editMessage(msg.chatId, renderer.messageId!, firstOutMsg); - const target = threadId && adapter.channelType === 'discord' ? threadId : msg.chatId; - for (let i = 1; i < chunks.length; i++) { - const overflowMsg: OutboundMessage = adapter.channelType === 'telegram' - ? { chatId: target, html: markdownToTelegram(chunks[i]) } - : { chatId: target, text: chunks[i] }; - await adapter.send(overflowMsg); - } - } else { - await adapter.editMessage(msg.chatId, renderer.messageId!, outMsg); - } - } - }, - }); - - let completedStats: import('./cost-tracker.js').UsageStats | undefined; - - // When an AskUserQuestion is approved, auto-allow the next permission request - // to avoid redundant confirmation (e.g. "delete this?" → yes → Bash permission) - let askQuestionApproved = false; - - // Build SDK-level permission handler based on /perm mode - const permMode = this.state.getPermMode(msg.channelType, msg.chatId); - const sdkPermissionHandler = permMode === 'on' - ? async (toolName: string, toolInput: Record, promptSentence: string, _signal?: AbortSignal) => { - // Check dynamic whitelist — auto-allow if previously approved - if (this.permissions.isToolAllowed(toolName, toolInput)) { - console.log(`[bridge] Auto-allowed ${toolName} via session whitelist`); - return 'allow' as const; - } - - // Auto-allow if user just approved an AskUserQuestion - if (askQuestionApproved) { - askQuestionApproved = false; - console.log(`[bridge] Auto-allowed ${toolName} after AskUserQuestion approval`); - return 'allow' as const; - } - - const permId = `sdk-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; - const chatKey = this.state.stateKey(msg.channelType, msg.chatId); - this.permissions.setPendingSdkPerm(chatKey, permId); - console.log(`[bridge] Permission request: ${toolName} (${permId}) for ${chatKey}`); - - // NOTE: We intentionally ignore the SDK abort signal for IM permissions. - // IM users may respond hours later — the abort signal should not auto-deny - // a permission the user hasn't seen yet. No timeout either. - - // Render permission inline in the terminal card - const inputStr = getToolCommand(toolName, toolInput) - || JSON.stringify(toolInput, null, 2); - const buttons: Array<{ label: string; callbackData: string; style: string }> = [ - { label: '✅ Allow', callbackData: `perm:allow:${permId}`, style: 'primary' }, - { label: '❌ Deny', callbackData: `perm:deny:${permId}`, style: 'danger' }, - ]; - renderer.onPermissionNeeded(toolName, inputStr, permId, buttons); - - // Wait for user response — no timeout, IM users may respond much later - const result = await this.permissions.getGateway().waitFor(permId); - renderer.onPermissionResolved(permId); - - // Update timeout reminder message if it was sent - if (permissionReminderMsgId) { - const icon = result.behavior === 'deny' ? '❌' : '✅'; - const label = `${permissionReminderTool}: ${permissionReminderInput} ${icon}`; - adapter.editMessage(msg.chatId, permissionReminderMsgId, { - chatId: msg.chatId, - text: label, - }).catch(() => {}); - permissionReminderMsgId = undefined; - } - - this.permissions.clearPendingSdkPerm(chatKey); - console.log(`[bridge] Permission resolved: ${toolName} (${permId}) → ${result.behavior}`); - return result.behavior as 'allow' | 'allow_always' | 'deny'; - } - : undefined; - - // Build SDK-level AskUserQuestion handler - const sdkAskQuestionHandler = async ( - questions: Array<{ question: string; header: string; options: Array<{ label: string; description?: string }>; multiSelect: boolean }>, - _signal?: AbortSignal, - ): Promise> => { - if (!questions.length) return {}; - const q = questions[0]; - const permId = `askq-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; - - // Build question text - const header = q.header ? `📋 **${q.header}**\n\n` : ''; - const optionsList = q.options - .map((opt, i) => `${i + 1}. **${opt.label}**${opt.description ? ` — ${opt.description}` : ''}`) - .join('\n'); - const questionText = `${header}${q.question}\n\n${optionsList}`; - - // Build option buttons: multiSelect uses toggle+submit, singleSelect uses direct select - const isMulti = q.multiSelect; - const buttons: Array<{ label: string; callbackData: string; style: 'primary' | 'danger'; row?: number }> = isMulti - ? [ - ...q.options.map((opt, idx) => ({ - label: `☐ ${opt.label}`, - callbackData: `askq_toggle:${permId}:${idx}:sdk`, - style: 'primary' as const, - row: idx, - })), - { label: '✅ Submit', callbackData: `askq_submit_sdk:${permId}`, style: 'primary' as const, row: q.options.length }, - { label: '❌ Skip', callbackData: `perm:allow:${permId}:askq_skip`, style: 'danger' as const, row: q.options.length }, - ] - : [ - ...q.options.map((opt, idx) => ({ - label: `${idx + 1}. ${opt.label}`, - callbackData: `perm:allow:${permId}:askq:${idx}`, - style: 'primary' as const, - })), - { label: '❌ Skip', callbackData: `perm:allow:${permId}:askq_skip`, style: 'danger' as const }, - ]; - - // Store question data for answer resolution (also needed for toggle state) - this.sdkQuestionData.set(permId, { questions, chatId: msg.chatId }); - // Store in permission coordinator for toggle tracking (reuse hookQuestionData) - if (isMulti) { - this.permissions.storeQuestionData(permId, questions); - } - - // Create gateway entry BEFORE sending — prevents race condition where user - // replies before waitFor is called, causing isPending() to return false - // NOTE: We intentionally ignore the abort signal for AskUserQuestion. - // IM users may respond hours later — questions must wait for user response. - const waitPromise = this.permissions.getGateway().waitFor(permId); - - // Send question card AFTER gateway entry exists — user replies are now safe - const hint = isMulti - ? (msg.channelType === 'feishu' ? '\n\n💬 点击选项切换选中,然后按 Submit 确认' : '\n\n💬 Tap options to toggle, then Submit') - : (msg.channelType === 'feishu' ? '\n\n💬 回复数字选择,或直接输入内容' : '\n\n💬 Reply with number to select, or type your answer'); - - const outMsg: import('../channels/types.js').OutboundMessage = { - chatId: msg.chatId, - text: msg.channelType !== 'telegram' ? questionText + hint : undefined, - html: msg.channelType === 'telegram' ? questionText.replace(/\*\*(.*?)\*\*/g, '$1') + hint : undefined, - buttons, - feishuHeader: msg.channelType === 'feishu' ? { template: 'blue', title: '❓ Question' } : undefined, - }; - const sendResult = await adapter.send(outMsg); - this.permissions.trackPermissionMessage(sendResult.messageId, permId, binding.sessionId, msg.channelType); - - // Await user answer — waits indefinitely until user responds via IM - const result = await waitPromise; - - if (result.behavior === 'deny') { - this.sdkQuestionData.delete(permId); - // Throw so provider returns { behavior: 'deny' } — Claude stops asking - adapter.editMessage(msg.chatId, sendResult.messageId, { - chatId: msg.chatId, - text: '⏭ Skipped', - buttons: [], - feishuHeader: msg.channelType === 'feishu' ? { template: 'grey', title: '⏭ Skipped' } : undefined, - }).catch(() => {}); - throw new Error('User skipped question'); - } - - // User answered — auto-allow the next tool permission in this query - askQuestionApproved = true; - - // Check for free text answer first, then option index - const textAnswer = this.sdkQuestionTextAnswers.get(permId); - this.sdkQuestionTextAnswers.delete(permId); - this.sdkQuestionData.delete(permId); - - if (textAnswer !== undefined) { - // Free text reply - adapter.editMessage(msg.chatId, sendResult.messageId, { - chatId: msg.chatId, - text: `✅ Answer: ${textAnswer.length > 50 ? textAnswer.slice(0, 47) + '...' : textAnswer}`, - buttons: [], - feishuHeader: msg.channelType === 'feishu' ? { template: 'green', title: '✅ Answered' } : undefined, - }).catch(() => {}); - return { [q.question]: textAnswer }; - } - - // Option index reply (button callback already edited the message — skip redundant edit) - const optionIndex = this.sdkQuestionAnswers.get(permId); - this.sdkQuestionAnswers.delete(permId); - const selected = optionIndex !== undefined ? q.options[optionIndex] : undefined; - const answerLabel = selected?.label ?? ''; - - if (!selected) { - // Button callback already edited the card; only update if we somehow have no answer - adapter.editMessage(msg.chatId, sendResult.messageId, { - chatId: msg.chatId, - text: '✅ Answered', - buttons: [], - feishuHeader: msg.channelType === 'feishu' ? { template: 'green', title: '✅ Answered' } : undefined, - }).catch(() => {}); - } - - return { [q.question]: answerLabel }; - }; - - try { - const result = await this.engine.processMessage({ - sessionId: binding.sessionId, - text: msg.text, - attachments: msg.attachments, - llm: this.getProvider(msg.channelType, msg.chatId), - sdkPermissionHandler, - sdkAskQuestionHandler, - effort: this.state.getEffort(msg.channelType, msg.chatId), - model: this.state.getModel(msg.channelType, msg.chatId), - onControls: (ctrl) => { - const chatKey = this.state.stateKey(msg.channelType, msg.chatId); - this.activeControls.set(chatKey, ctrl); - }, - onTextDelta: (delta) => renderer.onTextDelta(delta), - onToolStart: (event) => { - renderer.onToolStart(event.name); - }, - onToolResult: (_event) => { - // No-op — MessageRenderer counts on start, not complete - }, - onAgentStart: (_data) => { - renderer.onToolStart('Agent'); - }, - onAgentProgress: (_data) => { - // No-op — flat display - }, - onAgentComplete: (_data) => { - // No-op — flat display - }, - onToolProgress: (_data) => { - // No-op — flat display - }, - onRateLimit: (data) => { - if (data.status === 'rejected') { - renderer.onTextDelta('\n⚠️ Rate limited. Retrying...\n'); - } else if (data.status === 'allowed_warning' && data.utilization) { - renderer.onTextDelta(`\n⚠️ Rate limit: ${Math.round(data.utilization * 100)}% used\n`); - } - }, - onQueryResult: (event) => { - if (event.permissionDenials?.length) { - console.warn(`[bridge] Permission denials: ${event.permissionDenials.map(d => d.toolName).join(', ')}`); - } - const usage = { input_tokens: event.usage.inputTokens, output_tokens: event.usage.outputTokens, cost_usd: event.usage.costUsd }; - completedStats = costTracker.finish(usage); - renderer.onComplete(completedStats); - }, - onPromptSuggestion: (suggestion) => { - // Send as a quick-reply button after the response completes - const chatId = threadId && adapter.channelType === 'discord' ? threadId : msg.chatId; - const truncated = suggestion.length > 60 ? suggestion.slice(0, 57) + '...' : suggestion; - adapter.send({ - chatId, - text: `💡 ${truncated}`, - buttons: [{ label: '💡 ' + truncated, callbackData: `suggest:${suggestion.slice(0, 200)}`, style: 'default' as const }], - }).catch(() => {}); - }, - onError: (err) => renderer.onError(err), - }); - - // Success: change to done reaction - adapter.addReaction(reactionChatId, msg.messageId, reactions.done).catch(() => {}); - } catch (err) { - // Error: change to error reaction - adapter.addReaction(reactionChatId, msg.messageId, reactions.error).catch(() => {}); - throw err; - } finally { - clearInterval(typingInterval); - renderer.dispose(); - this.activeControls.delete(this.state.stateKey(msg.channelType, msg.chatId)); - // Close Feishu streaming card (no-op: streaming disabled) - // if (feishuSession) { feishuSession.close().catch(() => {}); } - } - - return true; + // SDK conversation — delegate to SDKEngine + return this.sdkEngine.handleMessage(adapter, msg, this.getProvider(msg.channelType, msg.chatId)); } } diff --git a/bridge/src/engine/sdk-engine.ts b/bridge/src/engine/sdk-engine.ts new file mode 100644 index 00000000..d2963fd8 --- /dev/null +++ b/bridge/src/engine/sdk-engine.ts @@ -0,0 +1,463 @@ +import type { BaseChannelAdapter } from '../channels/base.js'; +import type { InboundMessage, OutboundMessage } from '../channels/types.js'; +import type { LLMProvider, QueryControls } from '../providers/base.js'; +import type { PermissionCoordinator } from './permission-coordinator.js'; +import type { SessionStateManager } from './session-state.js'; +import type { ChannelRouter } from './router.js'; +import type { SdkQuestionState } from './callback-router.js'; +import { ConversationEngine } from './conversation.js'; +import { MessageRenderer } from './message-renderer.js'; +import { CostTracker } from './cost-tracker.js'; +import type { UsageStats } from './cost-tracker.js'; +import { getToolCommand } from './tool-registry.js'; +import { markdownToTelegram } from '../markdown/index.js'; +import { downgradeHeadings } from '../markdown/feishu.js'; +import { chunkByParagraph } from '../delivery/delivery.js'; +import type { FeishuStreamingSession } from '../channels/feishu-streaming.js'; + +/** + * Handles the full SDK conversation flow: session management, renderer setup, + * permission handler construction, AskUserQuestion handling, and processMessage call. + * + * Provider-agnostic — works with both Claude SDK and Codex via the LLMProvider interface. + */ +export class SDKEngine { + private engine = new ConversationEngine(); + private activeControls = new Map(); + + // SDK AskUserQuestion state — shared with CallbackRouter via SdkQuestionState interface + private sdkQuestionData = new Map; multiSelect: boolean }>; chatId: string }>(); + private sdkQuestionAnswers = new Map(); + private sdkQuestionTextAnswers = new Map(); + + constructor( + private state: SessionStateManager, + private router: ChannelRouter, + private permissions: PermissionCoordinator, + ) {} + + /** Expose question state for CallbackRouter */ + getQuestionState(): SdkQuestionState { + return { + sdkQuestionData: this.sdkQuestionData, + sdkQuestionAnswers: this.sdkQuestionAnswers, + sdkQuestionTextAnswers: this.sdkQuestionTextAnswers, + }; + } + + /** Expose active controls for /stop command */ + getActiveControls(): Map { + return this.activeControls; + } + + /** Find pending SDK question for text reply routing */ + findPendingQuestion(_channelType: string, chatId: string): { permId: string } | null { + for (const [permId, data] of this.sdkQuestionData) { + if (data.chatId === chatId && this.permissions.getGateway().isPending(permId)) { + return { permId }; + } + } + return null; + } + + /** Run a full SDK conversation turn */ + async handleMessage( + adapter: BaseChannelAdapter, + msg: InboundMessage, + provider: LLMProvider, + ): Promise { + // Check for session expiry (>30 min inactivity) and auto-create new session + const expired = this.state.checkAndUpdateLastActive(msg.channelType, msg.chatId); + if (expired) { + const newSessionId = `session-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + await this.router.rebind(msg.channelType, msg.chatId, newSessionId); + this.state.clearThread(msg.channelType, msg.chatId); + this.permissions.clearSessionWhitelist(); + } + + const binding = await this.router.resolve(msg.channelType, msg.chatId); + + // Resolve threadId: use existing thread if message came from one, or reuse session thread + let threadId = msg.threadId; + if (!threadId && adapter.channelType === 'discord') { + threadId = this.state.getThread(msg.channelType, msg.chatId); + } + // For Telegram topics, always pass threadId through + if (!threadId && msg.threadId) { + threadId = msg.threadId; + } + + // Reaction target: for Discord threads, reaction goes on the original channel message + const reactionChatId = msg.chatId; + + // Start typing heartbeat (in thread if available) + const typingTarget = threadId && adapter.channelType === 'discord' ? threadId : msg.chatId; + const typingInterval = setInterval(() => { + adapter.sendTyping(typingTarget).catch(() => {}); + }, 4000); + adapter.sendTyping(typingTarget).catch(() => {}); + + const costTracker = new CostTracker(); + costTracker.start(); + + // Add processing reaction + const reactionEmojis: Record = { + telegram: { processing: '\u{1F914}', done: '\u{1F44D}', error: '\u{1F631}' }, + feishu: { processing: 'Typing', done: 'OK', error: 'FACEPALM' }, + discord: { processing: '\u{1F914}', done: '\u{1F44D}', error: '\u{274C}' }, + }; + const reactions = reactionEmojis[adapter.channelType] || reactionEmojis.telegram; + adapter.addReaction(reactionChatId, msg.messageId, reactions.processing).catch(() => {}); + + // Feishu streaming disabled — new renderer uses short status lines + // that don't benefit from streaming, and streaming cards can't be + // edited with im.message.patch (needed for permission buttons) + let feishuSession: FeishuStreamingSession | null = null; + + const platformLimits: Record = { telegram: 4096, discord: 2000, feishu: 30000 }; + let permissionReminderMsgId: string | undefined; + let permissionReminderTool: string | undefined; + let permissionReminderInput: string | undefined; + const renderer = new MessageRenderer({ + platformLimit: platformLimits[adapter.channelType] ?? 4096, + throttleMs: 300, + onPermissionTimeout: async (toolName, input, buttons) => { + permissionReminderTool = toolName; + permissionReminderInput = input; + const text = `⚠️ Permission pending — ${toolName}: ${permissionReminderInput}`; + const targetChatId = threadId && adapter.channelType === 'discord' ? threadId : msg.chatId; + const outMsg: OutboundMessage = adapter.channelType === 'telegram' + ? { chatId: targetChatId, html: markdownToTelegram(text) } + : { chatId: targetChatId, text }; + outMsg.buttons = buttons.map(b => ({ ...b, style: b.style as 'primary' | 'danger' | 'default' })); + if (threadId) outMsg.threadId = threadId; + try { + const result = await adapter.send(outMsg); + permissionReminderMsgId = result.messageId; + } catch { /* non-fatal */ } + }, + flushCallback: async (content, isEdit, buttons) => { + // Feishu streaming path — skip when buttons needed (streaming doesn't support buttons) + if (feishuSession && !buttons?.length) { + if (!isEdit) { + try { + const messageId = await feishuSession.start(downgradeHeadings(content)); + clearInterval(typingInterval); + return messageId; + } catch { + feishuSession = null; + } + } else { + feishuSession.update(downgradeHeadings(content)).catch(() => {}); + return; + } + } + // Non-streaming path + let outMsg: OutboundMessage; + if (adapter.channelType === 'telegram') { + outMsg = { chatId: msg.chatId, html: markdownToTelegram(content), threadId }; + } else if (adapter.channelType === 'discord') { + outMsg = { chatId: msg.chatId, text: content, threadId }; + } else { + outMsg = { chatId: msg.chatId, text: content }; + } + if (buttons?.length) { + outMsg.buttons = buttons.map(b => ({ ...b, style: b.style as 'primary' | 'danger' | 'default' })); + } + if (!isEdit) { + if (adapter.channelType === 'discord' && !threadId && 'createThread' in adapter) { + const result = await adapter.send(outMsg); + clearInterval(typingInterval); + const preview = (msg.text || 'Claude').slice(0, 80); + const newThreadId = await (adapter as any).createThread(msg.chatId, result.messageId, `💬 ${preview}`); + if (newThreadId) { + threadId = newThreadId; + this.state.setThread(msg.channelType, msg.chatId, newThreadId); + } + return result.messageId; + } + const result = await adapter.send(outMsg); + clearInterval(typingInterval); + return result.messageId; + } else { + const limit = platformLimits[adapter.channelType] ?? 4096; + if (content.length > limit) { + // Overflow: edit first chunk into existing message, send rest as new messages + const chunks = chunkByParagraph(content, limit); + const firstOutMsg: OutboundMessage = adapter.channelType === 'telegram' + ? { chatId: msg.chatId, html: markdownToTelegram(chunks[0]), threadId } + : adapter.channelType === 'discord' + ? { chatId: msg.chatId, text: chunks[0], threadId } + : { chatId: msg.chatId, text: chunks[0] }; + await adapter.editMessage(msg.chatId, renderer.messageId!, firstOutMsg); + const target = threadId && adapter.channelType === 'discord' ? threadId : msg.chatId; + for (let i = 1; i < chunks.length; i++) { + const overflowMsg: OutboundMessage = adapter.channelType === 'telegram' + ? { chatId: target, html: markdownToTelegram(chunks[i]) } + : { chatId: target, text: chunks[i] }; + await adapter.send(overflowMsg); + } + } else { + await adapter.editMessage(msg.chatId, renderer.messageId!, outMsg); + } + } + }, + }); + + let completedStats: UsageStats | undefined; + + // When an AskUserQuestion is approved, auto-allow the next permission request + // to avoid redundant confirmation (e.g. "delete this?" → yes → Bash permission) + let askQuestionApproved = false; + + // Build SDK-level permission handler based on /perm mode + const permMode = this.state.getPermMode(msg.channelType, msg.chatId); + const sdkPermissionHandler = permMode === 'on' + ? async (toolName: string, toolInput: Record, promptSentence: string, _signal?: AbortSignal) => { + // Check dynamic whitelist — auto-allow if previously approved + if (this.permissions.isToolAllowed(toolName, toolInput)) { + console.log(`[bridge] Auto-allowed ${toolName} via session whitelist`); + return 'allow' as const; + } + + // Auto-allow if user just approved an AskUserQuestion + if (askQuestionApproved) { + askQuestionApproved = false; + console.log(`[bridge] Auto-allowed ${toolName} after AskUserQuestion approval`); + return 'allow' as const; + } + + const permId = `sdk-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + const chatKey = this.state.stateKey(msg.channelType, msg.chatId); + this.permissions.setPendingSdkPerm(chatKey, permId); + console.log(`[bridge] Permission request: ${toolName} (${permId}) for ${chatKey}`); + + // NOTE: We intentionally ignore the SDK abort signal for IM permissions. + // IM users may respond hours later — the abort signal should not auto-deny + // a permission the user hasn't seen yet. No timeout either. + + // Render permission inline in the terminal card + const inputStr = getToolCommand(toolName, toolInput) + || JSON.stringify(toolInput, null, 2); + const buttons: Array<{ label: string; callbackData: string; style: string }> = [ + { label: '✅ Allow', callbackData: `perm:allow:${permId}`, style: 'primary' }, + { label: '❌ Deny', callbackData: `perm:deny:${permId}`, style: 'danger' }, + ]; + renderer.onPermissionNeeded(toolName, inputStr, permId, buttons); + + // Wait for user response — no timeout, IM users may respond much later + const result = await this.permissions.getGateway().waitFor(permId); + renderer.onPermissionResolved(permId); + + // Update timeout reminder message if it was sent + if (permissionReminderMsgId) { + const icon = result.behavior === 'deny' ? '❌' : '✅'; + const label = `${permissionReminderTool}: ${permissionReminderInput} ${icon}`; + adapter.editMessage(msg.chatId, permissionReminderMsgId, { + chatId: msg.chatId, + text: label, + }).catch(() => {}); + permissionReminderMsgId = undefined; + } + + this.permissions.clearPendingSdkPerm(chatKey); + console.log(`[bridge] Permission resolved: ${toolName} (${permId}) → ${result.behavior}`); + return result.behavior as 'allow' | 'allow_always' | 'deny'; + } + : undefined; + + // Build SDK-level AskUserQuestion handler + const sdkAskQuestionHandler = async ( + questions: Array<{ question: string; header: string; options: Array<{ label: string; description?: string }>; multiSelect: boolean }>, + _signal?: AbortSignal, + ): Promise> => { + if (!questions.length) return {}; + const q = questions[0]; + const permId = `askq-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; + + // Build question text + const header = q.header ? `📋 **${q.header}**\n\n` : ''; + const optionsList = q.options + .map((opt, i) => `${i + 1}. **${opt.label}**${opt.description ? ` — ${opt.description}` : ''}`) + .join('\n'); + const questionText = `${header}${q.question}\n\n${optionsList}`; + + // Build option buttons: multiSelect uses toggle+submit, singleSelect uses direct select + const isMulti = q.multiSelect; + const buttons: Array<{ label: string; callbackData: string; style: 'primary' | 'danger'; row?: number }> = isMulti + ? [ + ...q.options.map((opt, idx) => ({ + label: `☐ ${opt.label}`, + callbackData: `askq_toggle:${permId}:${idx}:sdk`, + style: 'primary' as const, + row: idx, + })), + { label: '✅ Submit', callbackData: `askq_submit_sdk:${permId}`, style: 'primary' as const, row: q.options.length }, + { label: '❌ Skip', callbackData: `perm:allow:${permId}:askq_skip`, style: 'danger' as const, row: q.options.length }, + ] + : [ + ...q.options.map((opt, idx) => ({ + label: `${idx + 1}. ${opt.label}`, + callbackData: `perm:allow:${permId}:askq:${idx}`, + style: 'primary' as const, + })), + { label: '❌ Skip', callbackData: `perm:allow:${permId}:askq_skip`, style: 'danger' as const }, + ]; + + // Store question data for answer resolution (also needed for toggle state) + this.sdkQuestionData.set(permId, { questions, chatId: msg.chatId }); + // Store in permission coordinator for toggle tracking (reuse hookQuestionData) + if (isMulti) { + this.permissions.storeQuestionData(permId, questions); + } + + // Create gateway entry BEFORE sending — prevents race condition where user + // replies before waitFor is called, causing isPending() to return false + // NOTE: We intentionally ignore the abort signal for AskUserQuestion. + // IM users may respond hours later — questions must wait for user response. + const waitPromise = this.permissions.getGateway().waitFor(permId); + + // Send question card AFTER gateway entry exists — user replies are now safe + const hint = isMulti + ? (msg.channelType === 'feishu' ? '\n\n💬 点击选项切换选中,然后按 Submit 确认' : '\n\n💬 Tap options to toggle, then Submit') + : (msg.channelType === 'feishu' ? '\n\n💬 回复数字选择,或直接输入内容' : '\n\n💬 Reply with number to select, or type your answer'); + + const outMsg: OutboundMessage = { + chatId: msg.chatId, + text: msg.channelType !== 'telegram' ? questionText + hint : undefined, + html: msg.channelType === 'telegram' ? questionText.replace(/\*\*(.*?)\*\*/g, '$1') + hint : undefined, + buttons, + feishuHeader: msg.channelType === 'feishu' ? { template: 'blue', title: '❓ Question' } : undefined, + }; + const sendResult = await adapter.send(outMsg); + this.permissions.trackPermissionMessage(sendResult.messageId, permId, binding.sessionId, msg.channelType); + + // Await user answer — waits indefinitely until user responds via IM + const result = await waitPromise; + + if (result.behavior === 'deny') { + this.sdkQuestionData.delete(permId); + // Throw so provider returns { behavior: 'deny' } — Claude stops asking + adapter.editMessage(msg.chatId, sendResult.messageId, { + chatId: msg.chatId, + text: '⏭ Skipped', + buttons: [], + feishuHeader: msg.channelType === 'feishu' ? { template: 'grey', title: '⏭ Skipped' } : undefined, + }).catch(() => {}); + throw new Error('User skipped question'); + } + + // User answered — auto-allow the next tool permission in this query + askQuestionApproved = true; + + // Check for free text answer first, then option index + const textAnswer = this.sdkQuestionTextAnswers.get(permId); + this.sdkQuestionTextAnswers.delete(permId); + this.sdkQuestionData.delete(permId); + + if (textAnswer !== undefined) { + // Free text reply + adapter.editMessage(msg.chatId, sendResult.messageId, { + chatId: msg.chatId, + text: `✅ Answer: ${textAnswer.length > 50 ? textAnswer.slice(0, 47) + '...' : textAnswer}`, + buttons: [], + feishuHeader: msg.channelType === 'feishu' ? { template: 'green', title: '✅ Answered' } : undefined, + }).catch(() => {}); + return { [q.question]: textAnswer }; + } + + // Option index reply (button callback already edited the message — skip redundant edit) + const optionIndex = this.sdkQuestionAnswers.get(permId); + this.sdkQuestionAnswers.delete(permId); + const selected = optionIndex !== undefined ? q.options[optionIndex] : undefined; + const answerLabel = selected?.label ?? ''; + + if (!selected) { + // Button callback already edited the card; only update if we somehow have no answer + adapter.editMessage(msg.chatId, sendResult.messageId, { + chatId: msg.chatId, + text: '✅ Answered', + buttons: [], + feishuHeader: msg.channelType === 'feishu' ? { template: 'green', title: '✅ Answered' } : undefined, + }).catch(() => {}); + } + + return { [q.question]: answerLabel }; + }; + + try { + const result = await this.engine.processMessage({ + sessionId: binding.sessionId, + text: msg.text, + attachments: msg.attachments, + llm: provider, + sdkPermissionHandler, + sdkAskQuestionHandler, + effort: this.state.getEffort(msg.channelType, msg.chatId), + model: this.state.getModel(msg.channelType, msg.chatId), + onControls: (ctrl) => { + const chatKey = this.state.stateKey(msg.channelType, msg.chatId); + this.activeControls.set(chatKey, ctrl); + }, + onTextDelta: (delta) => renderer.onTextDelta(delta), + onToolStart: (event) => { + renderer.onToolStart(event.name); + }, + onToolResult: (_event) => { + // No-op — MessageRenderer counts on start, not complete + }, + onAgentStart: (_data) => { + renderer.onToolStart('Agent'); + }, + onAgentProgress: (_data) => { + // No-op — flat display + }, + onAgentComplete: (_data) => { + // No-op — flat display + }, + onToolProgress: (_data) => { + // No-op — flat display + }, + onRateLimit: (data) => { + if (data.status === 'rejected') { + renderer.onTextDelta('\n⚠️ Rate limited. Retrying...\n'); + } else if (data.status === 'allowed_warning' && data.utilization) { + renderer.onTextDelta(`\n⚠️ Rate limit: ${Math.round(data.utilization * 100)}% used\n`); + } + }, + onQueryResult: (event) => { + if (event.permissionDenials?.length) { + console.warn(`[bridge] Permission denials: ${event.permissionDenials.map(d => d.toolName).join(', ')}`); + } + const usage = { input_tokens: event.usage.inputTokens, output_tokens: event.usage.outputTokens, cost_usd: event.usage.costUsd }; + completedStats = costTracker.finish(usage); + renderer.onComplete(completedStats); + }, + onPromptSuggestion: (suggestion) => { + // Send as a quick-reply button after the response completes + const chatId = threadId && adapter.channelType === 'discord' ? threadId : msg.chatId; + const truncated = suggestion.length > 60 ? suggestion.slice(0, 57) + '...' : suggestion; + adapter.send({ + chatId, + text: `💡 ${truncated}`, + buttons: [{ label: '💡 ' + truncated, callbackData: `suggest:${suggestion.slice(0, 200)}`, style: 'default' as const }], + }).catch(() => {}); + }, + onError: (err) => renderer.onError(err), + }); + + // Success: change to done reaction + adapter.addReaction(reactionChatId, msg.messageId, reactions.done).catch(() => {}); + } catch (err) { + // Error: change to error reaction + adapter.addReaction(reactionChatId, msg.messageId, reactions.error).catch(() => {}); + throw err; + } finally { + clearInterval(typingInterval); + renderer.dispose(); + this.activeControls.delete(this.state.stateKey(msg.channelType, msg.chatId)); + } + + return true; + } +} From 675de1d9813ca3a73141df41309c07cad8c937cf Mon Sep 17 00:00:00 2001 From: 49 Date: Mon, 6 Apr 2026 15:52:55 +0800 Subject: [PATCH 3/5] refactor(bridge): extract HookEngine from BridgeManager Move hook notification formatting and delivery (~80 lines) into a dedicated HookEngine class. Re-exports HookNotificationData for backward compatibility with existing consumers. --- bridge/src/engine/bridge-manager.ts | 76 +++--------------------- bridge/src/engine/hook-engine.ts | 89 +++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 68 deletions(-) create mode 100644 bridge/src/engine/hook-engine.ts diff --git a/bridge/src/engine/bridge-manager.ts b/bridge/src/engine/bridge-manager.ts index 06fb6e7c..dedee0f7 100644 --- a/bridge/src/engine/bridge-manager.ts +++ b/bridge/src/engine/bridge-manager.ts @@ -12,8 +12,10 @@ import { PermissionCoordinator } from './permission-coordinator.js'; import { CommandRouter } from './command-router.js'; import { CallbackRouter } from './callback-router.js'; import { SDKEngine } from './sdk-engine.js'; +import { HookEngine } from './hook-engine.js'; +export type { HookNotificationData } from './hook-engine.js'; import { readFileSync, writeFileSync, mkdirSync } from 'node:fs'; -import { join, basename } from 'node:path'; +import { join } from 'node:path'; import { homedir, networkInterfaces } from 'node:os'; /** Bridge commands handled synchronously (don't block adapter loop) */ @@ -43,18 +45,6 @@ function getLocalIP(): string { return 'localhost'; } -/** Data shape for hook notifications (stop, idle_prompt, etc.) from Go Core */ -export interface HookNotificationData { - tlive_hook_type?: string; - tlive_session_id?: string; - tlive_cwd?: string; - notification_type?: string; - message?: string; - last_assistant_message?: string; - last_output?: string; - [key: string]: unknown; -} - export class BridgeManager { private adapters = new Map(); private running = false; @@ -71,6 +61,7 @@ export class BridgeManager { private commands: CommandRouter; private callbackRouter: CallbackRouter; private sdkEngine: SDKEngine; + private hookEngine: HookEngine; private chatIdFile: string; /** Cached LLM providers keyed by runtime name */ private providerCache = new Map(); @@ -92,6 +83,7 @@ export class BridgeManager { } } catch { /* no saved chat IDs yet */ } this.sdkEngine = new SDKEngine(this.state, this.router, this.permissions); + this.hookEngine = new HookEngine(this.permissions, () => this.coreAvailable, this.token, getLocalIP); this.commands = new CommandRouter( this.state, () => this.adapters, @@ -186,61 +178,9 @@ export class BridgeManager { } } - /** Send a hook notification to IM with [Local] prefix and track for reply routing */ - async sendHookNotification(adapter: BaseChannelAdapter, chatId: string, hook: HookNotificationData, receiveIdType?: string): Promise { - const { formatNotification } = await import('../formatting/index.js'); - const hookType = hook.tlive_hook_type || ''; - - let title: string; - let type: 'stop' | 'idle_prompt' | 'generic'; - let summary: string | undefined; - - // Build context suffix: project name + short session ID - const contextParts: string[] = []; - if (hook.tlive_cwd) { - const projectName = basename(hook.tlive_cwd || '') || ''; - if (projectName) contextParts.push(projectName); - } - if (hook.tlive_session_id) { - const shortId = hook.tlive_session_id.slice(-6); - contextParts.push(`#${shortId}`); - } - const contextSuffix = contextParts.length > 0 ? ' · ' + contextParts.join(' · ') : ''; - - if (hookType === 'stop') { - type = 'stop'; - const raw = (hook.last_assistant_message || hook.last_output || '').trim(); - summary = raw ? (raw.length > 3000 ? raw.slice(0, 2997) + '...' : raw) : undefined; - title = `Terminal${contextSuffix}`; - } else if (hook.notification_type === 'idle_prompt') { - title = `Terminal${contextSuffix} · ` + (hook.message || 'Waiting for input...'); - type = 'idle_prompt'; - } else { - title = hook.message || 'Notification'; - type = 'generic'; - } - - let terminalUrl: string | undefined; - if (this.coreAvailable && hook.tlive_session_id) { - const config = loadConfig(); - const baseUrl = config.publicUrl || `http://${getLocalIP()}:${config.port || 4590}`; - terminalUrl = `${baseUrl}/terminal.html?id=${hook.tlive_session_id}&token=${this.token}`; - } - - const formatted = formatNotification({ type, title, summary, terminalUrl }, adapter.channelType as any); - - const outMsg: import('../channels/types.js').OutboundMessage = { - chatId, - text: formatted.text, - html: formatted.html, - embed: formatted.embed, - buttons: (formatted as any).buttons, - feishuHeader: formatted.feishuHeader, - feishuElements: (formatted as any).feishuElements, - receiveIdType, - }; - const result = await adapter.send(outMsg); - this.permissions.trackHookMessage(result.messageId, hook.tlive_session_id || ''); + /** Send a hook notification to IM — delegates to HookEngine */ + async sendHookNotification(adapter: BaseChannelAdapter, chatId: string, hook: import('./hook-engine.js').HookNotificationData, receiveIdType?: string): Promise { + return this.hookEngine.sendNotification(adapter, chatId, hook, receiveIdType); } private async runAdapterLoop(adapter: BaseChannelAdapter): Promise { diff --git a/bridge/src/engine/hook-engine.ts b/bridge/src/engine/hook-engine.ts new file mode 100644 index 00000000..cda74f38 --- /dev/null +++ b/bridge/src/engine/hook-engine.ts @@ -0,0 +1,89 @@ +import type { BaseChannelAdapter } from '../channels/base.js'; +import type { OutboundMessage } from '../channels/types.js'; +import type { PermissionCoordinator } from './permission-coordinator.js'; +import { loadConfig } from '../config.js'; +import { basename } from 'node:path'; + +/** Data shape for hook notifications (stop, idle_prompt, etc.) from Go Core */ +export interface HookNotificationData { + tlive_hook_type?: string; + tlive_session_id?: string; + tlive_cwd?: string; + notification_type?: string; + message?: string; + last_assistant_message?: string; + last_output?: string; + [key: string]: unknown; +} + +/** + * Handles hook notification delivery to IM platforms and hook reply routing. + * + * Receives notifications from Go Core (stop, idle_prompt, generic) and + * formats/sends them to the appropriate IM channel. + */ +export class HookEngine { + constructor( + private permissions: PermissionCoordinator, + private coreAvailable: () => boolean, + private token: string, + private getLocalIP: () => string, + ) {} + + /** Send hook notification to IM with context suffix and track for reply routing */ + async sendNotification(adapter: BaseChannelAdapter, chatId: string, hook: HookNotificationData, receiveIdType?: string): Promise { + const { formatNotification } = await import('../formatting/index.js'); + const hookType = hook.tlive_hook_type || ''; + + let title: string; + let type: 'stop' | 'idle_prompt' | 'generic'; + let summary: string | undefined; + + // Build context suffix: project name + short session ID + const contextParts: string[] = []; + if (hook.tlive_cwd) { + const projectName = basename(hook.tlive_cwd || '') || ''; + if (projectName) contextParts.push(projectName); + } + if (hook.tlive_session_id) { + const shortId = hook.tlive_session_id.slice(-6); + contextParts.push(`#${shortId}`); + } + const contextSuffix = contextParts.length > 0 ? ' · ' + contextParts.join(' · ') : ''; + + if (hookType === 'stop') { + type = 'stop'; + const raw = (hook.last_assistant_message || hook.last_output || '').trim(); + summary = raw ? (raw.length > 3000 ? raw.slice(0, 2997) + '...' : raw) : undefined; + title = `Terminal${contextSuffix}`; + } else if (hook.notification_type === 'idle_prompt') { + title = `Terminal${contextSuffix} · ` + (hook.message || 'Waiting for input...'); + type = 'idle_prompt'; + } else { + title = hook.message || 'Notification'; + type = 'generic'; + } + + let terminalUrl: string | undefined; + if (this.coreAvailable() && hook.tlive_session_id) { + const config = loadConfig(); + const baseUrl = config.publicUrl || `http://${this.getLocalIP()}:${config.port || 4590}`; + terminalUrl = `${baseUrl}/terminal.html?id=${hook.tlive_session_id}&token=${this.token}`; + } + + const formatted = formatNotification({ type, title, summary, terminalUrl }, adapter.channelType as any); + + const outMsg: OutboundMessage = { + chatId, + text: formatted.text, + html: formatted.html, + embed: formatted.embed, + buttons: (formatted as any).buttons, + feishuHeader: formatted.feishuHeader, + feishuElements: (formatted as any).feishuElements, + receiveIdType, + }; + const result = await adapter.send(outMsg); + this.permissions.trackHookMessage(result.messageId, hook.tlive_session_id || ''); + } +} From 8d1037578c1640936e976e3cec86a2fa3f96cfaf Mon Sep 17 00:00:00 2001 From: 49 Date: Mon, 6 Apr 2026 15:59:00 +0800 Subject: [PATCH 4/5] refactor(bridge): extract MessageRouter from BridgeManager Move auth check, attachment buffering, permission text resolution, AskQuestion text replies, and hook reply routing (~250 lines) into a dedicated MessageRouter class. BridgeManager.handleInboundMessage is now a thin orchestrator delegating to MessageRouter, CallbackRouter, CommandRouter, and SDKEngine. --- bridge/src/engine/bridge-manager.ts | 293 ++------------------------- bridge/src/engine/message-router.ts | 301 ++++++++++++++++++++++++++++ 2 files changed, 316 insertions(+), 278 deletions(-) create mode 100644 bridge/src/engine/message-router.ts diff --git a/bridge/src/engine/bridge-manager.ts b/bridge/src/engine/bridge-manager.ts index dedee0f7..ba7ebf71 100644 --- a/bridge/src/engine/bridge-manager.ts +++ b/bridge/src/engine/bridge-manager.ts @@ -13,10 +13,9 @@ import { CommandRouter } from './command-router.js'; import { CallbackRouter } from './callback-router.js'; import { SDKEngine } from './sdk-engine.js'; import { HookEngine } from './hook-engine.js'; +import { MessageRouter } from './message-router.js'; export type { HookNotificationData } from './hook-engine.js'; -import { readFileSync, writeFileSync, mkdirSync } from 'node:fs'; -import { join } from 'node:path'; -import { homedir, networkInterfaces } from 'node:os'; +import { networkInterfaces } from 'node:os'; /** Bridge commands handled synchronously (don't block adapter loop) */ const QUICK_COMMANDS = new Set(['/new', '/status', '/verbose', '/hooks', '/sessions', '/session', '/help', '/perm', '/effort', '/stop', '/approve', '/pairings', '/runtime', '/settings', '/model']); @@ -54,15 +53,12 @@ export class BridgeManager { private coreAvailable = false; private state = new SessionStateManager(); private permissions: PermissionCoordinator; - private lastChatId = new Map(); - /** Pending image attachments waiting for a text message to merge with (key: channelType:chatId) */ - private pendingAttachments = new Map(); private commands: CommandRouter; private callbackRouter: CallbackRouter; private sdkEngine: SDKEngine; private hookEngine: HookEngine; - private chatIdFile: string; + private messageRouter: MessageRouter; /** Cached LLM providers keyed by runtime name */ private providerCache = new Map(); @@ -74,16 +70,13 @@ export class BridgeManager { this.coreUrl = config.coreUrl; this.token = config.token; this.permissions = new PermissionCoordinator(gateway, broker, this.coreUrl, this.token); - // Load persisted chatIds (so hook routing works without needing a message first) - this.chatIdFile = join(homedir(), '.tlive', 'runtime', 'chat-ids.json'); - try { - const data = JSON.parse(readFileSync(this.chatIdFile, 'utf-8')); - for (const [k, v] of Object.entries(data)) { - if (typeof v === 'string') this.lastChatId.set(k, v); - } - } catch { /* no saved chat IDs yet */ } this.sdkEngine = new SDKEngine(this.state, this.router, this.permissions); this.hookEngine = new HookEngine(this.permissions, () => this.coreAvailable, this.token, getLocalIP); + this.messageRouter = new MessageRouter( + this.permissions, this.state, this.sdkEngine, + () => this.coreAvailable, this.coreUrl, this.token, + ); + this.messageRouter.loadChatIds(); this.commands = new CommandRouter( this.state, () => this.adapters, @@ -112,7 +105,7 @@ export class BridgeManager { /** Get the last active chatId for a given channel type (for hook routing) */ getLastChatId(channelType: string): string { - return this.lastChatId.get(channelType) ?? ''; + return this.messageRouter.getLastChatId(channelType); } /** Resolve LLM provider for a chat — uses per-chat runtime if set, else global default */ @@ -149,11 +142,6 @@ export class BridgeManager { this.permissions.storeQuestionData(hookId, questions, contextSuffix); } - /** Find a pending SDK AskUserQuestion for numeric text reply */ - private findPendingSdkQuestion(channelType: string, chatId: string): { permId: string } | null { - return this.sdkEngine.findPendingQuestion(channelType, chatId); - } - registerAdapter(adapter: BaseChannelAdapter): void { this.adapters.set(adapter.channelType, adapter); } @@ -192,7 +180,7 @@ export class BridgeManager { // Regular messages (Claude queries) are fire-and-forget so they don't // block the loop while waiting for LLM responses or permission approvals. const hasPendingQuestion = this.permissions.getLatestPendingQuestion(adapter.channelType) !== null - || this.findPendingSdkQuestion(adapter.channelType, msg.chatId) !== null; + || this.sdkEngine.findPendingQuestion(adapter.channelType, msg.chatId) !== null; const isQuickMessage = !!msg.callbackData || (msg.text && QUICK_COMMANDS.has(msg.text.split(' ')[0].toLowerCase())) || this.permissions.parsePermissionText(msg.text || '') !== null @@ -219,260 +207,10 @@ export class BridgeManager { } async handleInboundMessage(adapter: BaseChannelAdapter, msg: InboundMessage): Promise { - // Auth check — with pairing mode for Telegram - if (!adapter.isAuthorized(msg.userId, msg.chatId)) { - // Telegram pairing mode: generate code for unknown user (DM only) - if (adapter.channelType === 'telegram' && 'requestPairing' in adapter && msg.text) { - const tgAdapter = adapter as any; - const username = msg.userId; // userId as fallback - const code = tgAdapter.requestPairing(msg.userId, msg.chatId, username); - if (code) { - await adapter.send({ - chatId: msg.chatId, - html: [ - `🔐 Pairing Required`, - '', - `Your pairing code: ${code}`, - '', - `Ask an admin to run /approve ${code} in an authorized channel.`, - `Code expires in 1 hour.`, - ].join('\n'), - }); - } - } - return false; - } - - // Track last active chatId per channel type (used for hook notification routing) - if (msg.chatId) { - this.lastChatId.set(adapter.channelType, msg.chatId); - // Persist so hooks work even after Bridge restart - try { - mkdirSync(join(homedir(), '.tlive', 'runtime'), { recursive: true }); - writeFileSync(this.chatIdFile, JSON.stringify(Object.fromEntries(this.lastChatId))); - } catch { /* non-fatal */ } - } - - // Image buffering: cache image-only messages, merge into next text message - const attachKey = `${msg.channelType}:${msg.chatId}`; - if (msg.attachments?.length && !msg.text && !msg.callbackData) { - // Image-only message: buffer attachments and wait for text - // Limit: max 5 attachments, max 10MB total - const MAX_ATTACHMENTS = 5; - const MAX_TOTAL_BYTES = 10 * 1024 * 1024; - let attachments = msg.attachments.slice(0, MAX_ATTACHMENTS); - const totalBytes = attachments.reduce((sum, a) => sum + a.base64Data.length, 0); - if (totalBytes > MAX_TOTAL_BYTES) { - // Keep only attachments that fit within budget - let budget = MAX_TOTAL_BYTES; - attachments = attachments.filter(a => { - if (a.base64Data.length <= budget) { - budget -= a.base64Data.length; - return true; - } - return false; - }); - console.warn(`[${msg.channelType}] Attachment buffer exceeded 10MB limit, kept ${attachments.length}`); - } - if (attachments.length > 0) { - this.pendingAttachments.set(attachKey, { - attachments, - timestamp: Date.now(), - }); - console.log(`[${msg.channelType}] Buffered ${attachments.length} attachment(s), waiting for text`); - } - return true; - } - // Merge pending attachments into current text message - if (msg.text && !msg.callbackData) { - const pending = this.pendingAttachments.get(attachKey); - if (pending && Date.now() - pending.timestamp < 60_000) { - msg.attachments = [...(msg.attachments || []), ...pending.attachments]; - console.log(`[${msg.channelType}] Merged ${pending.attachments.length} buffered attachment(s) with text`); - } - this.pendingAttachments.delete(attachKey); - } - - // Text-based permission resolution (all platforms — fallback when buttons expire) - if (msg.text) { - const decision = this.permissions.parsePermissionText(msg.text); - if (decision) { - // 1. Try SDK permission gateway — scoped to THIS chat only - const chatKey = this.state.stateKey(msg.channelType, msg.chatId); - if (this.permissions.tryResolveByText(chatKey, decision)) { - // Brief reaction instead of a full card — avoids flooding - const emoji = decision === 'deny' ? 'NO' : decision === 'allow_always' ? 'DONE' : 'OK'; - adapter.addReaction(msg.chatId, msg.messageId, emoji).catch(() => {}); - return true; - } - - // 2. Try hook permission (via Go Core) - if (this.permissions.pendingPermissionCount() > 1 && !msg.replyToMessageId) { - const hint = adapter.channelType === 'feishu' - ? '⚠️ 多个权限待审批,请引用回复具体的权限消息' - : '⚠️ Multiple permissions pending — reply to the specific permission message'; - await adapter.send({ chatId: msg.chatId, text: hint }); - return true; - } - const permEntry = this.permissions.findHookPermission(msg.replyToMessageId, adapter.channelType); - if (permEntry && this.coreAvailable) { - try { - await this.permissions.resolveHookPermission(permEntry.permissionId, decision, adapter.channelType, this.coreAvailable); - const label = decision === 'deny' ? '❌ Denied' : decision === 'allow_always' ? '📌 Always allowed' : '✅ Allowed'; - await adapter.send({ chatId: msg.chatId, text: label }); - } catch (err) { - await adapter.send({ chatId: msg.chatId, text: `❌ Failed to resolve: ${err}` }); - } - return true; - } - } - } - - // Text reply to pending AskUserQuestion — numeric (select option) or free text (direct input) - if (msg.text) { - const trimmed = msg.text.trim(); - // Check for any pending AskUserQuestion (hook or SDK mode) - const pendingHookQ = this.permissions.getLatestPendingQuestion(adapter.channelType); - const pendingSdkQ = this.findPendingSdkQuestion(adapter.channelType, msg.chatId); - - if (pendingHookQ || pendingSdkQ) { - // Check if input is a valid in-range numeric option selection - let validOptionIndex = -1; - const numMatch = trimmed.match(/^(\d+)$/); - if (numMatch) { - const idx = parseInt(numMatch[1], 10) - 1; - if (idx >= 0) { - // Validate against actual options count to avoid "Selected: ?" for out-of-range numbers - const qData = pendingHookQ - ? this.permissions.getQuestionData(pendingHookQ.hookId) - : pendingSdkQ ? this.sdkEngine.getQuestionState().sdkQuestionData.get(pendingSdkQ.permId) : null; - const optionsCount = qData?.questions?.[0]?.options?.length ?? 0; - if (idx < optionsCount) validOptionIndex = idx; - } - } - - if (validOptionIndex >= 0) { - // Numeric reply — select option by validated index - if (pendingHookQ) { - await this.permissions.resolveAskQuestion( - pendingHookQ.hookId, validOptionIndex, pendingHookQ.sessionId, - pendingHookQ.messageId, adapter, msg.chatId, this.coreAvailable, - ); - return true; - } - if (pendingSdkQ) { - this.sdkEngine.getQuestionState().sdkQuestionAnswers.set(pendingSdkQ.permId, validOptionIndex); - this.permissions.getGateway().resolve(pendingSdkQ.permId, 'allow'); - return true; - } - } else { - // Free text reply (including out-of-range numbers) — use text as direct answer - if (pendingHookQ) { - await this.permissions.resolveAskQuestionWithText( - pendingHookQ.hookId, trimmed, pendingHookQ.sessionId, - pendingHookQ.messageId, adapter, msg.chatId, this.coreAvailable, - ); - return true; - } - if (pendingSdkQ) { - this.sdkEngine.getQuestionState().sdkQuestionTextAnswers.set(pendingSdkQ.permId, trimmed); - this.permissions.getGateway().resolve(pendingSdkQ.permId, 'allow'); - return true; - } - } - } - } - - // Reply routing: quote-reply to a hook message → send to PTY stdin - if ((msg.text || msg.attachments?.length) && msg.replyToMessageId && this.permissions.isHookMessage(msg.replyToMessageId)) { - // Before forwarding to PTY, check Core for a pending AskUserQuestion that - // the bridge hasn't polled yet (race condition: hook creates perm, bridge - // polls every 2s, user replies before the next poll cycle). - if (msg.text && this.coreAvailable) { - try { - const pendingResp = await fetch(`${this.coreUrl}/api/hooks/pending`, { - headers: { Authorization: `Bearer ${this.token}` }, - signal: AbortSignal.timeout(2000), - }); - if (pendingResp.ok) { - const pending = await pendingResp.json() as Array<{ id: string; tool_name: string; input: unknown; session_id?: string }>; - const askq = pending.find((p: { tool_name: string }) => p.tool_name === 'AskUserQuestion'); - if (askq) { - // There's a pending AskUserQuestion — handle text as question answer - const inputData = (typeof askq.input === 'string' - ? (() => { try { return JSON.parse(askq.input as string); } catch { return {}; } })() - : askq.input) as Record; - const questions = (inputData?.questions ?? []) as Array<{ - question: string; header: string; - options: Array<{ label: string; description?: string }>; multiSelect: boolean; - }>; - if (questions.length > 0) { - const q = questions[0]; - const trimmed = msg.text.trim(); - // Store question data if not already stored - if (!this.permissions.getQuestionData(askq.id)) { - this.permissions.storeQuestionData(askq.id, questions); - this.permissions.trackPermissionMessage(msg.replyToMessageId, askq.id, askq.session_id || '', adapter.channelType); - } - // Numeric → option selection; else → free text - const numMatch = trimmed.match(/^(\d+)$/); - const idx = numMatch ? parseInt(numMatch[1], 10) - 1 : -1; - if (idx >= 0 && idx < q.options.length) { - await this.permissions.resolveAskQuestion( - askq.id, idx, askq.session_id || '', - msg.replyToMessageId, adapter, msg.chatId, this.coreAvailable, - ); - } else { - await this.permissions.resolveAskQuestionWithText( - askq.id, trimmed, askq.session_id || '', - msg.replyToMessageId, adapter, msg.chatId, this.coreAvailable, - ); - } - return true; - } - } - } - } catch { /* non-fatal: fall through to normal PTY routing */ } - } - - const entry = this.permissions.getHookMessage(msg.replyToMessageId)!; - if (entry.sessionId && this.coreAvailable) { - try { - // If images attached, save as temp files and include paths in the text - let inputText = msg.text || ''; - if (msg.attachments?.length) { - const { writeFileSync, mkdirSync } = await import('node:fs'); - const { join } = await import('node:path'); - const { tmpdir } = await import('node:os'); - const imgDir = join(tmpdir(), 'tlive-images'); - mkdirSync(imgDir, { recursive: true }); - for (const att of msg.attachments) { - if (att.type === 'image') { - const ext = att.mimeType === 'image/png' ? '.png' : '.jpg'; - const filePath = join(imgDir, `img-${Date.now()}${ext}`); - writeFileSync(filePath, Buffer.from(att.base64Data, 'base64')); - inputText = inputText ? `${inputText}\n${filePath}` : filePath; - } - } - } - await fetch(`${this.coreUrl}/api/sessions/${entry.sessionId}/input`, { - method: 'POST', - headers: { - Authorization: `Bearer ${this.token}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ text: inputText + '\r' }), - signal: AbortSignal.timeout(5000), - }); - await adapter.send({ chatId: msg.chatId, text: '✓ Sent to local session' }); - } catch (err) { - await adapter.send({ chatId: msg.chatId, text: `❌ Failed to send: ${err}` }); - } - } else { - await adapter.send({ chatId: msg.chatId, text: '⚠️ Local session not available (no session ID)' }); - } - return true; - } + // Text routing: auth, attachments, permissions, AskQuestion replies, hook replies + const result = await this.messageRouter.route(adapter, msg); + if (result.action === 'handled') return true; + if (result.action === 'unauthorized') return false; // Callback data — delegate to CallbackRouter if (msg.callbackData) { @@ -480,10 +218,9 @@ export class BridgeManager { } // Bridge commands — only intercept known commands, pass others to Claude Code - if (msg.text.startsWith('/')) { + if (msg.text?.startsWith('/')) { const handled = await this.commands.handle(adapter, msg); if (handled) return true; - // Unrecognized slash command → fall through to Claude Code } // SDK conversation — delegate to SDKEngine diff --git a/bridge/src/engine/message-router.ts b/bridge/src/engine/message-router.ts new file mode 100644 index 00000000..5f7761f9 --- /dev/null +++ b/bridge/src/engine/message-router.ts @@ -0,0 +1,301 @@ +import type { BaseChannelAdapter } from '../channels/base.js'; +import type { InboundMessage, FileAttachment } from '../channels/types.js'; +import type { PermissionCoordinator } from './permission-coordinator.js'; +import type { SessionStateManager } from './session-state.js'; +import type { SDKEngine } from './sdk-engine.js'; +import { writeFileSync, mkdirSync } from 'node:fs'; +import { join } from 'node:path'; +import { homedir, tmpdir } from 'node:os'; + +export type RouteResult = + | { action: 'handled' } + | { action: 'pass' } // pass to commands + SDK engine + | { action: 'unauthorized' }; + +/** + * Routes inbound text messages through auth, attachment buffering, + * permission resolution, AskQuestion text replies, and hook reply routing. + * + * Returns a RouteResult indicating whether the message was fully handled + * or should be passed through to commands and the SDK engine. + */ +export class MessageRouter { + /** Pending image attachments waiting for a text message to merge with (key: channelType:chatId) */ + private pendingAttachments = new Map(); + private lastChatId = new Map(); + private chatIdFile: string; + + constructor( + private permissions: PermissionCoordinator, + private state: SessionStateManager, + private sdkEngine: SDKEngine, + private coreAvailable: () => boolean, + private coreUrl: string, + private token: string, + ) { + this.chatIdFile = join(homedir(), '.tlive', 'runtime', 'chat-ids.json'); + } + + /** Load persisted chatIds from disk (called once at startup) */ + loadChatIds(): void { + try { + const { readFileSync } = require('node:fs'); + const data = JSON.parse(readFileSync(this.chatIdFile, 'utf-8')); + for (const [k, v] of Object.entries(data)) { + if (typeof v === 'string') this.lastChatId.set(k, v); + } + } catch { /* no saved chat IDs yet */ } + } + + /** Get the last active chatId for a given channel type (for hook routing) */ + getLastChatId(channelType: string): string { + return this.lastChatId.get(channelType) ?? ''; + } + + /** Route an inbound message. Returns what happened. */ + async route(adapter: BaseChannelAdapter, msg: InboundMessage): Promise { + // 1. Auth check — with pairing mode for Telegram + if (!adapter.isAuthorized(msg.userId, msg.chatId)) { + if (adapter.channelType === 'telegram' && 'requestPairing' in adapter && msg.text) { + const tgAdapter = adapter as any; + const username = msg.userId; + const code = tgAdapter.requestPairing(msg.userId, msg.chatId, username); + if (code) { + await adapter.send({ + chatId: msg.chatId, + html: [ + `🔐 Pairing Required`, + '', + `Your pairing code: ${code}`, + '', + `Ask an admin to run /approve ${code} in an authorized channel.`, + `Code expires in 1 hour.`, + ].join('\n'), + }); + } + } + return { action: 'unauthorized' }; + } + + // 2. Track last active chatId per channel type (used for hook notification routing) + if (msg.chatId) { + this.lastChatId.set(adapter.channelType, msg.chatId); + try { + mkdirSync(join(homedir(), '.tlive', 'runtime'), { recursive: true }); + writeFileSync(this.chatIdFile, JSON.stringify(Object.fromEntries(this.lastChatId))); + } catch { /* non-fatal */ } + } + + // 3. Image buffering: cache image-only messages, merge into next text message + const attachKey = `${msg.channelType}:${msg.chatId}`; + if (msg.attachments?.length && !msg.text && !msg.callbackData) { + const MAX_ATTACHMENTS = 5; + const MAX_TOTAL_BYTES = 10 * 1024 * 1024; + let attachments = msg.attachments.slice(0, MAX_ATTACHMENTS); + const totalBytes = attachments.reduce((sum, a) => sum + a.base64Data.length, 0); + if (totalBytes > MAX_TOTAL_BYTES) { + let budget = MAX_TOTAL_BYTES; + attachments = attachments.filter(a => { + if (a.base64Data.length <= budget) { + budget -= a.base64Data.length; + return true; + } + return false; + }); + console.warn(`[${msg.channelType}] Attachment buffer exceeded 10MB limit, kept ${attachments.length}`); + } + if (attachments.length > 0) { + this.pendingAttachments.set(attachKey, { + attachments, + timestamp: Date.now(), + }); + console.log(`[${msg.channelType}] Buffered ${attachments.length} attachment(s), waiting for text`); + } + return { action: 'handled' }; + } + // Merge pending attachments into current text message + if (msg.text && !msg.callbackData) { + const pending = this.pendingAttachments.get(attachKey); + if (pending && Date.now() - pending.timestamp < 60_000) { + msg.attachments = [...(msg.attachments || []), ...pending.attachments]; + console.log(`[${msg.channelType}] Merged ${pending.attachments.length} buffered attachment(s) with text`); + } + this.pendingAttachments.delete(attachKey); + } + + // 4. Text-based permission resolution (all platforms — fallback when buttons expire) + if (msg.text) { + const decision = this.permissions.parsePermissionText(msg.text); + if (decision) { + const chatKey = this.state.stateKey(msg.channelType, msg.chatId); + if (this.permissions.tryResolveByText(chatKey, decision)) { + const emoji = decision === 'deny' ? 'NO' : decision === 'allow_always' ? 'DONE' : 'OK'; + adapter.addReaction(msg.chatId, msg.messageId, emoji).catch(() => {}); + return { action: 'handled' }; + } + + if (this.permissions.pendingPermissionCount() > 1 && !msg.replyToMessageId) { + const hint = adapter.channelType === 'feishu' + ? '⚠️ 多个权限待审批,请引用回复具体的权限消息' + : '⚠️ Multiple permissions pending — reply to the specific permission message'; + await adapter.send({ chatId: msg.chatId, text: hint }); + return { action: 'handled' }; + } + const permEntry = this.permissions.findHookPermission(msg.replyToMessageId, adapter.channelType); + if (permEntry && this.coreAvailable()) { + try { + await this.permissions.resolveHookPermission(permEntry.permissionId, decision, adapter.channelType, this.coreAvailable()); + const label = decision === 'deny' ? '❌ Denied' : decision === 'allow_always' ? '📌 Always allowed' : '✅ Allowed'; + await adapter.send({ chatId: msg.chatId, text: label }); + } catch (err) { + await adapter.send({ chatId: msg.chatId, text: `❌ Failed to resolve: ${err}` }); + } + return { action: 'handled' }; + } + } + } + + // 5. Text reply to pending AskUserQuestion — numeric (select option) or free text + if (msg.text) { + const trimmed = msg.text.trim(); + const pendingHookQ = this.permissions.getLatestPendingQuestion(adapter.channelType); + const pendingSdkQ = this.sdkEngine.findPendingQuestion(adapter.channelType, msg.chatId); + + if (pendingHookQ || pendingSdkQ) { + let validOptionIndex = -1; + const numMatch = trimmed.match(/^(\d+)$/); + if (numMatch) { + const idx = parseInt(numMatch[1], 10) - 1; + if (idx >= 0) { + const qData = pendingHookQ + ? this.permissions.getQuestionData(pendingHookQ.hookId) + : pendingSdkQ ? this.sdkEngine.getQuestionState().sdkQuestionData.get(pendingSdkQ.permId) : null; + const optionsCount = qData?.questions?.[0]?.options?.length ?? 0; + if (idx < optionsCount) validOptionIndex = idx; + } + } + + if (validOptionIndex >= 0) { + if (pendingHookQ) { + await this.permissions.resolveAskQuestion( + pendingHookQ.hookId, validOptionIndex, pendingHookQ.sessionId, + pendingHookQ.messageId, adapter, msg.chatId, this.coreAvailable(), + ); + return { action: 'handled' }; + } + if (pendingSdkQ) { + this.sdkEngine.getQuestionState().sdkQuestionAnswers.set(pendingSdkQ.permId, validOptionIndex); + this.permissions.getGateway().resolve(pendingSdkQ.permId, 'allow'); + return { action: 'handled' }; + } + } else { + if (pendingHookQ) { + await this.permissions.resolveAskQuestionWithText( + pendingHookQ.hookId, trimmed, pendingHookQ.sessionId, + pendingHookQ.messageId, adapter, msg.chatId, this.coreAvailable(), + ); + return { action: 'handled' }; + } + if (pendingSdkQ) { + this.sdkEngine.getQuestionState().sdkQuestionTextAnswers.set(pendingSdkQ.permId, trimmed); + this.permissions.getGateway().resolve(pendingSdkQ.permId, 'allow'); + return { action: 'handled' }; + } + } + } + } + + // 6. Reply routing: quote-reply to a hook message → send to PTY stdin + if ((msg.text || msg.attachments?.length) && msg.replyToMessageId && this.permissions.isHookMessage(msg.replyToMessageId)) { + await this.routeHookReply(adapter, msg); + return { action: 'handled' }; + } + + // 7. Not handled — pass through to callbacks, commands, SDK engine + return { action: 'pass' }; + } + + /** Route a quote-reply to a hook message → PTY stdin or pending AskUserQuestion */ + private async routeHookReply(adapter: BaseChannelAdapter, msg: InboundMessage): Promise { + // Before forwarding to PTY, check Core for a pending AskUserQuestion that + // the bridge hasn't polled yet (race condition: hook creates perm, bridge + // polls every 2s, user replies before the next poll cycle). + if (msg.text && this.coreAvailable()) { + try { + const pendingResp = await fetch(`${this.coreUrl}/api/hooks/pending`, { + headers: { Authorization: `Bearer ${this.token}` }, + signal: AbortSignal.timeout(2000), + }); + if (pendingResp.ok) { + const pending = await pendingResp.json() as Array<{ id: string; tool_name: string; input: unknown; session_id?: string }>; + const askq = pending.find((p: { tool_name: string }) => p.tool_name === 'AskUserQuestion'); + if (askq) { + const inputData = (typeof askq.input === 'string' + ? (() => { try { return JSON.parse(askq.input as string); } catch { return {}; } })() + : askq.input) as Record; + const questions = (inputData?.questions ?? []) as Array<{ + question: string; header: string; + options: Array<{ label: string; description?: string }>; multiSelect: boolean; + }>; + if (questions.length > 0) { + const q = questions[0]; + const trimmed = msg.text.trim(); + if (!this.permissions.getQuestionData(askq.id)) { + this.permissions.storeQuestionData(askq.id, questions); + this.permissions.trackPermissionMessage(msg.replyToMessageId!, askq.id, askq.session_id || '', adapter.channelType); + } + const numMatch = trimmed.match(/^(\d+)$/); + const idx = numMatch ? parseInt(numMatch[1], 10) - 1 : -1; + if (idx >= 0 && idx < q.options.length) { + await this.permissions.resolveAskQuestion( + askq.id, idx, askq.session_id || '', + msg.replyToMessageId!, adapter, msg.chatId, this.coreAvailable(), + ); + } else { + await this.permissions.resolveAskQuestionWithText( + askq.id, trimmed, askq.session_id || '', + msg.replyToMessageId!, adapter, msg.chatId, this.coreAvailable(), + ); + } + return; + } + } + } + } catch { /* non-fatal: fall through to normal PTY routing */ } + } + + const entry = this.permissions.getHookMessage(msg.replyToMessageId!)!; + if (entry.sessionId && this.coreAvailable()) { + try { + // If images attached, save as temp files and include paths in the text + let inputText = msg.text || ''; + if (msg.attachments?.length) { + const imgDir = join(tmpdir(), 'tlive-images'); + mkdirSync(imgDir, { recursive: true }); + for (const att of msg.attachments) { + if (att.type === 'image') { + const ext = att.mimeType === 'image/png' ? '.png' : '.jpg'; + const filePath = join(imgDir, `img-${Date.now()}${ext}`); + writeFileSync(filePath, Buffer.from(att.base64Data, 'base64')); + inputText = inputText ? `${inputText}\n${filePath}` : filePath; + } + } + } + await fetch(`${this.coreUrl}/api/sessions/${entry.sessionId}/input`, { + method: 'POST', + headers: { + Authorization: `Bearer ${this.token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ text: inputText + '\r' }), + signal: AbortSignal.timeout(5000), + }); + await adapter.send({ chatId: msg.chatId, text: '✓ Sent to local session' }); + } catch (err) { + await adapter.send({ chatId: msg.chatId, text: `❌ Failed to send: ${err}` }); + } + } else { + await adapter.send({ chatId: msg.chatId, text: '⚠️ Local session not available (no session ID)' }); + } + } +} From 0904ee11d7178df384cac39e84a3ea73e19a8b32 Mon Sep 17 00:00:00 2001 From: 49 Date: Mon, 6 Apr 2026 16:07:08 +0800 Subject: [PATCH 5/5] test(bridge): add unit tests for extracted engine modules - callback-router.test.ts: 24 tests covering all 13+ callback types - message-router.test.ts: 22 tests for auth, buffering, permissions, hook routing - hook-engine.test.ts: 17 tests for notification formatting, context, URLs Total: 506 tests (up from 443) --- bridge/src/__tests__/callback-router.test.ts | 391 +++++++++++++++ bridge/src/__tests__/hook-engine.test.ts | 351 ++++++++++++++ bridge/src/__tests__/message-router.test.ts | 477 +++++++++++++++++++ 3 files changed, 1219 insertions(+) create mode 100644 bridge/src/__tests__/callback-router.test.ts create mode 100644 bridge/src/__tests__/hook-engine.test.ts create mode 100644 bridge/src/__tests__/message-router.test.ts diff --git a/bridge/src/__tests__/callback-router.test.ts b/bridge/src/__tests__/callback-router.test.ts new file mode 100644 index 00000000..d3c6308a --- /dev/null +++ b/bridge/src/__tests__/callback-router.test.ts @@ -0,0 +1,391 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { CallbackRouter } from '../engine/callback-router.js'; +import type { SdkQuestionState } from '../engine/callback-router.js'; +import type { BaseChannelAdapter } from '../channels/base.js'; +import type { InboundMessage } from '../channels/types.js'; + +function mockAdapter(channelType = 'telegram'): BaseChannelAdapter { + const messageQueue: any[] = []; + return { + channelType, + start: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + consumeOne: vi.fn().mockImplementation(() => messageQueue.shift() ?? null), + send: vi.fn().mockResolvedValue({ messageId: '1', success: true }), + editMessage: vi.fn().mockResolvedValue(undefined), + sendTyping: vi.fn().mockResolvedValue(undefined), + addReaction: vi.fn().mockResolvedValue(undefined), + removeReaction: vi.fn().mockResolvedValue(undefined), + validateConfig: vi.fn().mockReturnValue(null), + isAuthorized: vi.fn().mockReturnValue(true), + _pushMessage: (msg: any) => messageQueue.push(msg), + } as any; +} + +function createMockGateway() { + return { + resolve: vi.fn(), + pendingCount: vi.fn().mockReturnValue(0), + }; +} + +function createMockPermissions(gateway = createMockGateway()) { + return { + resolveAskQuestion: vi.fn().mockResolvedValue(undefined), + toggleMultiSelectOption: vi.fn().mockReturnValue(new Set()), + buildMultiSelectCard: vi.fn().mockReturnValue({ + text: 'card text', + html: 'card', + buttons: [{ text: 'Submit', callbackData: 'askq_submit:h1:s1' }], + }), + resolveMultiSelect: vi.fn().mockResolvedValue(undefined), + resolveAskQuestionSkip: vi.fn().mockResolvedValue(undefined), + getToggledSelections: vi.fn().mockReturnValue(new Set()), + cleanupQuestion: vi.fn(), + getGateway: vi.fn().mockReturnValue(gateway), + resolveHookCallback: vi.fn().mockResolvedValue(undefined), + addAllowedTool: vi.fn(), + addAllowedBashPrefix: vi.fn(), + handleBrokerCallback: vi.fn(), + }; +} + +function makeMsg(overrides: Partial = {}): InboundMessage { + return { + channelType: 'telegram', + chatId: 'c1', + userId: 'u1', + text: '', + messageId: 'm1', + ...overrides, + }; +} + +describe('CallbackRouter', () => { + let adapter: BaseChannelAdapter; + let permissions: ReturnType; + let gateway: ReturnType; + let sdkState: SdkQuestionState; + let handleInboundMessage: ReturnType; + let router: CallbackRouter; + + beforeEach(() => { + adapter = mockAdapter(); + gateway = createMockGateway(); + permissions = createMockPermissions(gateway); + sdkState = { + sdkQuestionData: new Map(), + sdkQuestionAnswers: new Map(), + sdkQuestionTextAnswers: new Map(), + }; + handleInboundMessage = vi.fn().mockResolvedValue(true); + router = new CallbackRouter( + permissions as any, + sdkState, + () => true, + handleInboundMessage, + ); + }); + + it('returns false when no callbackData', async () => { + const result = await router.handle(adapter, makeMsg({ callbackData: undefined })); + expect(result).toBe(false); + }); + + describe('suggest:', () => { + it('re-injects suggestion as text message and calls handleInboundMessage', async () => { + const msg = makeMsg({ callbackData: 'suggest:Hello world' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(handleInboundMessage).toHaveBeenCalledWith(adapter, expect.objectContaining({ + text: 'Hello world', + callbackData: undefined, + })); + }); + }); + + describe('askq:{hookId}:{idx}:{sessionId}', () => { + it('resolves single-select hook answer', async () => { + const msg = makeMsg({ callbackData: 'askq:h1:2:sess1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(permissions.resolveAskQuestion).toHaveBeenCalledWith( + 'h1', 2, 'sess1', 'm1', adapter, 'c1', true, + ); + }); + + it('handles missing sessionId gracefully', async () => { + const msg = makeMsg({ callbackData: 'askq:h1:0' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(permissions.resolveAskQuestion).toHaveBeenCalledWith( + 'h1', 0, '', 'm1', adapter, 'c1', true, + ); + }); + }); + + describe('askq_toggle:{hookId}:{idx}:{sessionId}', () => { + it('toggles multi-select option and rebuilds card', async () => { + permissions.toggleMultiSelectOption.mockReturnValue(new Set([0, 2])); + const msg = makeMsg({ callbackData: 'askq_toggle:h1:2:sess1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(permissions.toggleMultiSelectOption).toHaveBeenCalledWith('h1', 2); + expect(permissions.buildMultiSelectCard).toHaveBeenCalledWith('h1', 'sess1', new Set([0, 2]), 'telegram'); + expect(adapter.editMessage).toHaveBeenCalledWith('c1', 'm1', expect.objectContaining({ + text: 'card text', + buttons: expect.any(Array), + })); + }); + + it('returns true without editing when toggle returns null', async () => { + permissions.toggleMultiSelectOption.mockReturnValue(null); + const msg = makeMsg({ callbackData: 'askq_toggle:h1:0:sess1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(adapter.editMessage).not.toHaveBeenCalled(); + }); + + it('returns true without editing when buildMultiSelectCard returns null', async () => { + permissions.toggleMultiSelectOption.mockReturnValue(new Set([1])); + permissions.buildMultiSelectCard.mockReturnValue(null); + const msg = makeMsg({ callbackData: 'askq_toggle:h1:1:sess1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(adapter.editMessage).not.toHaveBeenCalled(); + }); + + it('passes feishuHeader when adapter is feishu', async () => { + const feishuAdapter = mockAdapter('feishu'); + permissions.toggleMultiSelectOption.mockReturnValue(new Set([0])); + const msg = makeMsg({ callbackData: 'askq_toggle:h1:0:sess1', channelType: 'feishu' }); + await router.handle(feishuAdapter, msg); + + expect(feishuAdapter.editMessage).toHaveBeenCalledWith('c1', 'm1', expect.objectContaining({ + feishuHeader: { template: 'blue', title: '❓ Terminal' }, + })); + }); + }); + + describe('askq_submit:{hookId}:{sessionId}', () => { + it('resolves multi-select submit', async () => { + const msg = makeMsg({ callbackData: 'askq_submit:h1:sess1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(permissions.resolveMultiSelect).toHaveBeenCalledWith( + 'h1', 'sess1', 'm1', adapter, 'c1', true, + ); + }); + }); + + describe('askq_skip:{hookId}:{sessionId}', () => { + it('resolves skip handler', async () => { + const msg = makeMsg({ callbackData: 'askq_skip:h1:sess1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(permissions.resolveAskQuestionSkip).toHaveBeenCalledWith( + 'h1', 'sess1', 'm1', adapter, 'c1', true, + ); + }); + }); + + describe('askq_submit_sdk:{permId}', () => { + it('sends warning when no options selected', async () => { + permissions.getToggledSelections.mockReturnValue(new Set()); + const msg = makeMsg({ callbackData: 'askq_submit_sdk:p1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(adapter.send).toHaveBeenCalledWith(expect.objectContaining({ + text: '⚠️ No options selected', + })); + expect(gateway.resolve).not.toHaveBeenCalled(); + }); + + it('resolves gateway with selected labels when options exist', async () => { + permissions.getToggledSelections.mockReturnValue(new Set([0, 2])); + sdkState.sdkQuestionData.set('p1', { + questions: [{ + question: 'Pick tools', + header: 'Tools', + options: [ + { label: 'Alpha' }, + { label: 'Beta' }, + { label: 'Gamma' }, + ], + multiSelect: true, + }], + chatId: 'c1', + }); + + const msg = makeMsg({ callbackData: 'askq_submit_sdk:p1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(sdkState.sdkQuestionTextAnswers.get('p1')).toBe('Alpha,Gamma'); + expect(permissions.cleanupQuestion).toHaveBeenCalledWith('p1'); + expect(gateway.resolve).toHaveBeenCalledWith('p1', 'allow'); + expect(adapter.editMessage).toHaveBeenCalledWith('c1', 'm1', expect.objectContaining({ + text: '✅ Selected: Alpha, Gamma', + buttons: [], + })); + }); + }); + + describe('hook:allow:{hookId}:{sessionId}', () => { + it('resolves hook permission allow', async () => { + const msg = makeMsg({ callbackData: 'hook:allow:h1:sess1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(permissions.resolveHookCallback).toHaveBeenCalledWith( + 'h1', 'allow', 'sess1', 'm1', adapter, 'c1', true, + ); + }); + }); + + describe('hook:deny:{hookId}:{sessionId}', () => { + it('resolves hook permission deny', async () => { + const msg = makeMsg({ callbackData: 'hook:deny:h1:sess1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(permissions.resolveHookCallback).toHaveBeenCalledWith( + 'h1', 'deny', 'sess1', 'm1', adapter, 'c1', true, + ); + }); + }); + + describe('perm:allow_edits:{permId}', () => { + it('resolves gateway for graduated permission', async () => { + const msg = makeMsg({ callbackData: 'perm:allow_edits:p1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(gateway.resolve).toHaveBeenCalledWith('p1', 'allow'); + }); + }); + + describe('perm:allow_tool:{permId}:{toolName}', () => { + it('resolves gateway and adds tool to whitelist', async () => { + const msg = makeMsg({ callbackData: 'perm:allow_tool:p1:WriteFile' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(gateway.resolve).toHaveBeenCalledWith('p1', 'allow'); + expect(permissions.addAllowedTool).toHaveBeenCalledWith('WriteFile'); + }); + + it('handles tool names with colons', async () => { + const msg = makeMsg({ callbackData: 'perm:allow_tool:p1:mcp:some:tool' }); + await router.handle(adapter, msg); + + expect(gateway.resolve).toHaveBeenCalledWith('p1', 'allow'); + expect(permissions.addAllowedTool).toHaveBeenCalledWith('mcp:some:tool'); + }); + }); + + describe('perm:allow_bash:{permId}:{prefix}', () => { + it('resolves gateway and adds bash prefix to whitelist', async () => { + const msg = makeMsg({ callbackData: 'perm:allow_bash:p1:npm run' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(gateway.resolve).toHaveBeenCalledWith('p1', 'allow'); + expect(permissions.addAllowedBashPrefix).toHaveBeenCalledWith('npm run'); + }); + + it('handles prefixes with colons', async () => { + const msg = makeMsg({ callbackData: 'perm:allow_bash:p1:docker:compose' }); + await router.handle(adapter, msg); + + expect(permissions.addAllowedBashPrefix).toHaveBeenCalledWith('docker:compose'); + }); + }); + + describe('perm:allow:{permId}:askq:{idx} — SDK answer callback', () => { + it('resolves gateway with selected option and edits message', async () => { + sdkState.sdkQuestionData.set('p1', { + questions: [{ + question: 'Pick one', + header: 'Choice', + options: [ + { label: 'Option A' }, + { label: 'Option B' }, + ], + multiSelect: false, + }], + chatId: 'c1', + }); + + const msg = makeMsg({ callbackData: 'perm:allow:p1:askq:1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(sdkState.sdkQuestionAnswers.get('p1')).toBe(1); + expect(gateway.resolve).toHaveBeenCalledWith('p1', 'allow'); + expect(adapter.editMessage).toHaveBeenCalledWith('c1', 'm1', expect.objectContaining({ + text: '✅ Selected: Option B', + buttons: [], + })); + }); + + it('returns true without resolving when option index is out of range', async () => { + sdkState.sdkQuestionData.set('p1', { + questions: [{ + question: 'Pick one', + header: 'Choice', + options: [{ label: 'Only' }], + multiSelect: false, + }], + chatId: 'c1', + }); + + const msg = makeMsg({ callbackData: 'perm:allow:p1:askq:5' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(gateway.resolve).not.toHaveBeenCalled(); + }); + }); + + describe('perm:allow:{permId}:askq_skip — SDK skip callback', () => { + it('resolves gateway with deny/Skipped and edits message', async () => { + const msg = makeMsg({ callbackData: 'perm:allow:p1:askq_skip' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(gateway.resolve).toHaveBeenCalledWith('p1', 'deny', 'Skipped'); + expect(adapter.editMessage).toHaveBeenCalledWith('c1', 'm1', expect.objectContaining({ + text: '⏭ Skipped', + buttons: [], + })); + }); + }); + + describe('perm:allow:p1 / perm:deny:p1 — regular broker callback (fallback)', () => { + it('delegates perm:allow to handleBrokerCallback', async () => { + const msg = makeMsg({ callbackData: 'perm:allow:p1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(permissions.handleBrokerCallback).toHaveBeenCalledWith('perm:allow:p1'); + }); + + it('delegates perm:deny to handleBrokerCallback', async () => { + const msg = makeMsg({ callbackData: 'perm:deny:p1' }); + const result = await router.handle(adapter, msg); + + expect(result).toBe(true); + expect(permissions.handleBrokerCallback).toHaveBeenCalledWith('perm:deny:p1'); + }); + }); +}); diff --git a/bridge/src/__tests__/hook-engine.test.ts b/bridge/src/__tests__/hook-engine.test.ts new file mode 100644 index 00000000..6e7b625b --- /dev/null +++ b/bridge/src/__tests__/hook-engine.test.ts @@ -0,0 +1,351 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { HookEngine } from '../engine/hook-engine.js'; +import type { BaseChannelAdapter } from '../channels/base.js'; +import type { PermissionCoordinator } from '../engine/permission-coordinator.js'; + +vi.mock('../formatting/index.js', () => ({ + formatNotification: vi.fn().mockImplementation(({ type, title, summary, terminalUrl }) => ({ + text: title, + html: title, + embed: undefined, + feishuHeader: undefined, + })), +})); + +vi.mock('../config.js', () => ({ + loadConfig: vi.fn().mockReturnValue({ publicUrl: '', port: 4590 }), +})); + +function mockAdapter(channelType = 'telegram'): BaseChannelAdapter { + return { + channelType, + start: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + consumeOne: vi.fn().mockReturnValue(null), + send: vi.fn().mockResolvedValue({ messageId: 'msg-42', success: true }), + editMessage: vi.fn().mockResolvedValue(undefined), + sendTyping: vi.fn().mockResolvedValue(undefined), + addReaction: vi.fn().mockResolvedValue(undefined), + removeReaction: vi.fn().mockResolvedValue(undefined), + validateConfig: vi.fn().mockReturnValue(null), + isAuthorized: vi.fn().mockReturnValue(true), + } as any; +} + +function mockPermissions(): PermissionCoordinator { + return { + trackHookMessage: vi.fn(), + } as any; +} + +describe('HookEngine', () => { + let engine: HookEngine; + let permissions: PermissionCoordinator; + let adapter: BaseChannelAdapter; + let formatNotification: ReturnType; + + beforeEach(async () => { + vi.clearAllMocks(); + permissions = mockPermissions(); + adapter = mockAdapter(); + + const formatting = await import('../formatting/index.js'); + formatNotification = vi.mocked(formatting.formatNotification); + formatNotification.mockImplementation(({ type, title, summary, terminalUrl }: any) => ({ + text: title, + html: title, + embed: undefined, + feishuHeader: undefined, + })); + }); + + function createEngine(opts: { coreAvailable?: boolean; token?: string; localIP?: string } = {}) { + return new HookEngine( + permissions, + () => opts.coreAvailable ?? false, + opts.token ?? 'test-token', + () => opts.localIP ?? '127.0.0.1', + ); + } + + describe('stop notification', () => { + it('builds title with context suffix and truncates summary at 3000 chars', async () => { + engine = createEngine(); + const longMessage = 'x'.repeat(4000); + + await engine.sendNotification(adapter, 'c1', { + tlive_hook_type: 'stop', + tlive_session_id: 'session-abc123', + tlive_cwd: '/home/user/my-project', + last_assistant_message: longMessage, + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'stop', + title: 'Terminal · my-project · #abc123', + summary: 'x'.repeat(2997) + '...', + }), + 'telegram', + ); + }); + + it('uses last_output as fallback when last_assistant_message is missing', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + tlive_hook_type: 'stop', + last_output: 'some output', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'stop', + summary: 'some output', + }), + 'telegram', + ); + }); + + it('sets summary to undefined when no assistant message or output', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + tlive_hook_type: 'stop', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'stop', + summary: undefined, + }), + 'telegram', + ); + }); + }); + + describe('idle_prompt notification', () => { + it('builds title with message', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + notification_type: 'idle_prompt', + message: 'Claude is waiting for your input', + tlive_session_id: 'sess-xyz789', + tlive_cwd: '/home/user/app', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'idle_prompt', + title: 'Terminal · app · #xyz789 · Claude is waiting for your input', + }), + 'telegram', + ); + }); + + it('uses default message when hook.message is missing', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + notification_type: 'idle_prompt', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'idle_prompt', + title: 'Terminal · Waiting for input...', + }), + 'telegram', + ); + }); + }); + + describe('generic notification', () => { + it('uses hook.message as title', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + message: 'Something happened', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'generic', + title: 'Something happened', + }), + 'telegram', + ); + }); + + it('falls back to "Notification" when message is missing', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', {}); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'generic', + title: 'Notification', + }), + 'telegram', + ); + }); + }); + + describe('context suffix', () => { + it('includes project name from cwd and short session ID', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + tlive_hook_type: 'stop', + tlive_cwd: '/home/user/my-awesome-project', + tlive_session_id: 'abcdef123456', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + title: 'Terminal · my-awesome-project · #123456', + }), + 'telegram', + ); + }); + + it('omits suffix when cwd and session_id are both missing', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + tlive_hook_type: 'stop', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + title: 'Terminal', + }), + 'telegram', + ); + }); + }); + + describe('terminal URL', () => { + it('generates URL when coreAvailable is true and session_id exists', async () => { + engine = createEngine({ coreAvailable: true, token: 'my-token', localIP: '192.168.1.10' }); + + await engine.sendNotification(adapter, 'c1', { + tlive_hook_type: 'stop', + tlive_session_id: 'sess-001', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + terminalUrl: 'http://192.168.1.10:4590/terminal.html?id=sess-001&token=my-token', + }), + 'telegram', + ); + }); + + it('does not generate URL when coreAvailable is false', async () => { + engine = createEngine({ coreAvailable: false }); + + await engine.sendNotification(adapter, 'c1', { + tlive_hook_type: 'stop', + tlive_session_id: 'sess-001', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + terminalUrl: undefined, + }), + 'telegram', + ); + }); + + it('does not generate URL when session_id is missing', async () => { + engine = createEngine({ coreAvailable: true }); + + await engine.sendNotification(adapter, 'c1', { + tlive_hook_type: 'stop', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + terminalUrl: undefined, + }), + 'telegram', + ); + }); + + it('uses publicUrl from config when available', async () => { + const { loadConfig } = await import('../config.js'); + vi.mocked(loadConfig).mockReturnValue({ publicUrl: 'https://my.domain.com', port: 4590 } as any); + + engine = createEngine({ coreAvailable: true, token: 'tk' }); + + await engine.sendNotification(adapter, 'c1', { + tlive_hook_type: 'stop', + tlive_session_id: 'sess-002', + }); + + expect(formatNotification).toHaveBeenCalledWith( + expect.objectContaining({ + terminalUrl: 'https://my.domain.com/terminal.html?id=sess-002&token=tk', + }), + 'telegram', + ); + }); + }); + + describe('message tracking', () => { + it('calls trackHookMessage with message ID and session ID', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + tlive_hook_type: 'stop', + tlive_session_id: 'sess-track', + }); + + expect(permissions.trackHookMessage).toHaveBeenCalledWith('msg-42', 'sess-track'); + }); + + it('passes empty string for session ID when missing', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + message: 'test', + }); + + expect(permissions.trackHookMessage).toHaveBeenCalledWith('msg-42', ''); + }); + }); + + describe('receiveIdType', () => { + it('passes receiveIdType through to outbound message', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + message: 'test', + }, 'open_id'); + + expect(adapter.send).toHaveBeenCalledWith( + expect.objectContaining({ + chatId: 'c1', + receiveIdType: 'open_id', + }), + ); + }); + + it('sets receiveIdType to undefined when not provided', async () => { + engine = createEngine(); + + await engine.sendNotification(adapter, 'c1', { + message: 'test', + }); + + expect(adapter.send).toHaveBeenCalledWith( + expect.objectContaining({ + receiveIdType: undefined, + }), + ); + }); + }); +}); diff --git a/bridge/src/__tests__/message-router.test.ts b/bridge/src/__tests__/message-router.test.ts new file mode 100644 index 00000000..63e9ea0d --- /dev/null +++ b/bridge/src/__tests__/message-router.test.ts @@ -0,0 +1,477 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { MessageRouter } from '../engine/message-router.js'; +import type { BaseChannelAdapter } from '../channels/base.js'; +import type { InboundMessage, FileAttachment } from '../channels/types.js'; + +// ── Mocks ────────────────────────────────────────────────────────────── + +vi.mock('node:fs', () => ({ + writeFileSync: vi.fn(), + mkdirSync: vi.fn(), + readFileSync: vi.fn(() => { throw new Error('not found'); }), +})); + +function mockAdapter(channelType = 'telegram'): BaseChannelAdapter & { requestPairing?: ReturnType } { + return { + channelType, + start: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + consumeOne: vi.fn().mockReturnValue(null), + send: vi.fn().mockResolvedValue({ messageId: 'sent-1', success: true }), + editMessage: vi.fn().mockResolvedValue(undefined), + sendTyping: vi.fn().mockResolvedValue(undefined), + addReaction: vi.fn().mockResolvedValue(undefined), + removeReaction: vi.fn().mockResolvedValue(undefined), + validateConfig: vi.fn().mockReturnValue(null), + isAuthorized: vi.fn().mockReturnValue(true), + } as any; +} + +function mockPermissions() { + return { + parsePermissionText: vi.fn().mockReturnValue(null), + tryResolveByText: vi.fn().mockReturnValue(false), + pendingPermissionCount: vi.fn().mockReturnValue(0), + findHookPermission: vi.fn().mockReturnValue(null), + resolveHookPermission: vi.fn().mockResolvedValue(undefined), + getLatestPendingQuestion: vi.fn().mockReturnValue(null), + getQuestionData: vi.fn().mockReturnValue(null), + resolveAskQuestion: vi.fn().mockResolvedValue(undefined), + resolveAskQuestionWithText: vi.fn().mockResolvedValue(undefined), + isHookMessage: vi.fn().mockReturnValue(false), + getHookMessage: vi.fn().mockReturnValue(null), + storeQuestionData: vi.fn(), + trackPermissionMessage: vi.fn(), + getGateway: vi.fn().mockReturnValue({ + isPending: vi.fn().mockReturnValue(false), + resolve: vi.fn(), + }), + }; +} + +function mockState() { + return { + stateKey: vi.fn((channelType: string, chatId: string) => `${channelType}:${chatId}`), + }; +} + +function mockSdkEngine() { + return { + findPendingQuestion: vi.fn().mockReturnValue(null), + getQuestionState: vi.fn().mockReturnValue({ + sdkQuestionData: new Map(), + sdkQuestionAnswers: new Map(), + sdkQuestionTextAnswers: new Map(), + }), + }; +} + +function makeMsg(overrides: Partial = {}): InboundMessage { + return { + channelType: 'telegram', + chatId: 'c1', + userId: 'u1', + text: 'hello', + messageId: 'm1', + ...overrides, + }; +} + +function makeAttachment(sizeBytes = 100): FileAttachment { + return { + type: 'image', + name: 'img.png', + mimeType: 'image/png', + base64Data: 'A'.repeat(sizeBytes), + }; +} + +// ── Tests ────────────────────────────────────────────────────────────── + +describe('MessageRouter', () => { + let router: MessageRouter; + let permissions: ReturnType; + let state: ReturnType; + let sdkEngine: ReturnType; + let adapter: ReturnType; + let coreAvailable: ReturnType; + + beforeEach(() => { + permissions = mockPermissions(); + state = mockState(); + sdkEngine = mockSdkEngine(); + coreAvailable = vi.fn().mockReturnValue(true); + adapter = mockAdapter(); + + router = new MessageRouter( + permissions as any, + state as any, + sdkEngine as any, + coreAvailable, + 'http://localhost:4590', + 'test-token', + ); + }); + + // ── 1. Auth ────────────────────────────────────────────────────────── + + describe('auth', () => { + it('returns unauthorized when adapter rejects user', async () => { + (adapter.isAuthorized as any).mockReturnValue(false); + const result = await router.route(adapter, makeMsg()); + expect(result).toEqual({ action: 'unauthorized' }); + }); + + it('triggers Telegram pairing flow for unauthorized user', async () => { + const tgAdapter = mockAdapter('telegram') as any; + tgAdapter.isAuthorized.mockReturnValue(false); + tgAdapter.requestPairing = vi.fn().mockReturnValue('ABC123'); + + const result = await router.route(tgAdapter, makeMsg({ text: 'hi' })); + + expect(result).toEqual({ action: 'unauthorized' }); + expect(tgAdapter.requestPairing).toHaveBeenCalledWith('u1', 'c1', 'u1'); + expect(tgAdapter.send).toHaveBeenCalledWith( + expect.objectContaining({ html: expect.stringContaining('ABC123') }), + ); + }); + }); + + // ── 2. ChatId tracking ────────────────────────────────────────────── + + describe('chatId tracking', () => { + it('updates lastChatId and returns it via getLastChatId', async () => { + await router.route(adapter, makeMsg({ chatId: 'chat-42' })); + expect(router.getLastChatId('telegram')).toBe('chat-42'); + }); + + it('returns empty string for unknown channel type', () => { + expect(router.getLastChatId('slack')).toBe(''); + }); + }); + + // ── 3. Attachment buffering ────────────────────────────────────────── + + describe('attachment buffering', () => { + it('buffers image-only message and returns handled', async () => { + const msg = makeMsg({ text: '', attachments: [makeAttachment()] }); + const result = await router.route(adapter, msg); + expect(result).toEqual({ action: 'handled' }); + }); + + it('merges buffered attachments into subsequent text message', async () => { + const att = makeAttachment(); + await router.route(adapter, makeMsg({ text: '', attachments: [att], messageId: 'm-img' })); + + const textMsg = makeMsg({ text: 'describe this', messageId: 'm-txt' }); + const result = await router.route(adapter, textMsg); + + // Message should pass through with attachments merged + expect(result).toEqual({ action: 'pass' }); + expect(textMsg.attachments).toHaveLength(1); + expect(textMsg.attachments![0].name).toBe('img.png'); + }); + }); + + // ── 4. Attachment limits ───────────────────────────────────────────── + + describe('attachment limits', () => { + it('enforces max 5 attachments', async () => { + const atts = Array.from({ length: 7 }, () => makeAttachment(50)); + const msg = makeMsg({ text: '', attachments: atts }); + await router.route(adapter, msg); + + // Now send text to merge + const textMsg = makeMsg({ text: 'describe' }); + await router.route(adapter, textMsg); + expect(textMsg.attachments).toHaveLength(5); + }); + + it('trims attachments exceeding 10MB total', async () => { + const bigSize = 4 * 1024 * 1024; // 4MB each → only 2 fit in 10MB + const atts = [makeAttachment(bigSize), makeAttachment(bigSize), makeAttachment(bigSize)]; + const msg = makeMsg({ text: '', attachments: atts }); + await router.route(adapter, msg); + + const textMsg = makeMsg({ text: 'describe' }); + await router.route(adapter, textMsg); + expect(textMsg.attachments!.length).toBe(2); + }); + }); + + // ── 5. Attachment expiry ───────────────────────────────────────────── + + describe('attachment expiry', () => { + it('discards buffered attachments after 60s', async () => { + const att = makeAttachment(); + const now = 1000000; + vi.spyOn(Date, 'now').mockReturnValue(now); + + await router.route(adapter, makeMsg({ text: '', attachments: [att] })); + + // Advance past 60s + (Date.now as any).mockReturnValue(now + 61_000); + + const textMsg = makeMsg({ text: 'describe' }); + await router.route(adapter, textMsg); + + expect(textMsg.attachments ?? []).toHaveLength(0); + + vi.restoreAllMocks(); + }); + }); + + // ── 6. Permission text: allow ──────────────────────────────────────── + + describe('permission text resolution', () => { + it('"allow" resolves SDK permission and returns handled', async () => { + permissions.parsePermissionText.mockReturnValue('allow'); + permissions.tryResolveByText.mockReturnValue(true); + + const result = await router.route(adapter, makeMsg({ text: 'allow' })); + + expect(result).toEqual({ action: 'handled' }); + expect(adapter.addReaction).toHaveBeenCalledWith('c1', 'm1', 'OK'); + }); + + // ── 7. Permission text: deny ───────────────────────────────────── + + it('"deny" adds NO reaction emoji', async () => { + permissions.parsePermissionText.mockReturnValue('deny'); + permissions.tryResolveByText.mockReturnValue(true); + + await router.route(adapter, makeMsg({ text: 'deny' })); + + expect(adapter.addReaction).toHaveBeenCalledWith('c1', 'm1', 'NO'); + }); + }); + + // ── 8. Multiple pending permissions ────────────────────────────────── + + describe('multiple pending permissions', () => { + it('warns user to quote-reply when >1 pending and no replyToMessageId', async () => { + permissions.parsePermissionText.mockReturnValue('allow'); + permissions.tryResolveByText.mockReturnValue(false); + permissions.pendingPermissionCount.mockReturnValue(2); + + const result = await router.route(adapter, makeMsg({ text: 'allow' })); + + expect(result).toEqual({ action: 'handled' }); + expect(adapter.send).toHaveBeenCalledWith( + expect.objectContaining({ text: expect.stringContaining('Multiple permissions pending') }), + ); + }); + }); + + // ── 9. Hook permission text ────────────────────────────────────────── + + describe('hook permission text', () => { + it('resolves via hook permission on quote-reply', async () => { + permissions.parsePermissionText.mockReturnValue('allow'); + permissions.tryResolveByText.mockReturnValue(false); + permissions.pendingPermissionCount.mockReturnValue(1); + permissions.findHookPermission.mockReturnValue({ permissionId: 'hp1' }); + + const result = await router.route(adapter, makeMsg({ + text: 'allow', + replyToMessageId: 'perm-msg-1', + })); + + expect(result).toEqual({ action: 'handled' }); + expect(permissions.resolveHookPermission).toHaveBeenCalledWith('hp1', 'allow', 'telegram', true); + expect(adapter.send).toHaveBeenCalledWith( + expect.objectContaining({ text: expect.stringContaining('Allowed') }), + ); + }); + }); + + // ── 10-14. AskQuestion text reply ──────────────────────────────────── + + describe('AskQuestion text reply', () => { + it('numeric reply selects option (hook)', async () => { + permissions.getLatestPendingQuestion.mockReturnValue({ + hookId: 'hq1', + sessionId: 'sess1', + messageId: 'qmsg1', + }); + permissions.getQuestionData.mockReturnValue({ + questions: [{ question: 'Pick', options: [{ label: 'A' }, { label: 'B' }] }], + }); + + const result = await router.route(adapter, makeMsg({ text: '2' })); + + expect(result).toEqual({ action: 'handled' }); + expect(permissions.resolveAskQuestion).toHaveBeenCalledWith( + 'hq1', 1, 'sess1', 'qmsg1', adapter, 'c1', true, + ); + }); + + it('numeric reply selects option (SDK)', async () => { + const sdkAnswers = new Map(); + sdkEngine.findPendingQuestion.mockReturnValue({ permId: 'sp1' }); + sdkEngine.getQuestionState.mockReturnValue({ + sdkQuestionData: new Map([['sp1', { questions: [{ question: 'Pick', options: [{ label: 'X' }, { label: 'Y' }] }] }]]), + sdkQuestionAnswers: sdkAnswers, + sdkQuestionTextAnswers: new Map(), + }); + + const gateway = { isPending: vi.fn(), resolve: vi.fn() }; + permissions.getGateway.mockReturnValue(gateway); + + const result = await router.route(adapter, makeMsg({ text: '1' })); + + expect(result).toEqual({ action: 'handled' }); + expect(sdkAnswers.get('sp1')).toBe(0); + expect(gateway.resolve).toHaveBeenCalledWith('sp1', 'allow'); + }); + + it('free text answer (hook)', async () => { + permissions.getLatestPendingQuestion.mockReturnValue({ + hookId: 'hq2', + sessionId: 'sess2', + messageId: 'qmsg2', + }); + permissions.getQuestionData.mockReturnValue({ + questions: [{ question: 'What?', options: [] }], + }); + + const result = await router.route(adapter, makeMsg({ text: 'my answer' })); + + expect(result).toEqual({ action: 'handled' }); + expect(permissions.resolveAskQuestionWithText).toHaveBeenCalledWith( + 'hq2', 'my answer', 'sess2', 'qmsg2', adapter, 'c1', true, + ); + }); + + it('free text answer (SDK)', async () => { + const sdkTextAnswers = new Map(); + sdkEngine.findPendingQuestion.mockReturnValue({ permId: 'sp2' }); + sdkEngine.getQuestionState.mockReturnValue({ + sdkQuestionData: new Map([['sp2', { questions: [{ question: 'What?', options: [] }] }]]), + sdkQuestionAnswers: new Map(), + sdkQuestionTextAnswers: sdkTextAnswers, + }); + + const gateway = { isPending: vi.fn(), resolve: vi.fn() }; + permissions.getGateway.mockReturnValue(gateway); + + const result = await router.route(adapter, makeMsg({ text: 'free text' })); + + expect(result).toEqual({ action: 'handled' }); + expect(sdkTextAnswers.get('sp2')).toBe('free text'); + expect(gateway.resolve).toHaveBeenCalledWith('sp2', 'allow'); + }); + + it('out-of-range number falls through to free text', async () => { + permissions.getLatestPendingQuestion.mockReturnValue({ + hookId: 'hq3', + sessionId: 'sess3', + messageId: 'qmsg3', + }); + permissions.getQuestionData.mockReturnValue({ + questions: [{ question: 'Pick', options: [{ label: 'A' }] }], + }); + + const result = await router.route(adapter, makeMsg({ text: '99' })); + + expect(result).toEqual({ action: 'handled' }); + // Should call free text, not numeric resolve + expect(permissions.resolveAskQuestion).not.toHaveBeenCalled(); + expect(permissions.resolveAskQuestionWithText).toHaveBeenCalledWith( + 'hq3', '99', 'sess3', 'qmsg3', adapter, 'c1', true, + ); + }); + }); + + // ── 15. Hook reply routing ─────────────────────────────────────────── + + describe('hook reply routing', () => { + let originalFetch: typeof global.fetch; + + beforeEach(() => { + originalFetch = global.fetch; + }); + + afterEach(() => { + global.fetch = originalFetch; + }); + + it('routes reply to hook message via fetch and confirms', async () => { + permissions.isHookMessage.mockReturnValue(true); + permissions.getHookMessage.mockReturnValue({ sessionId: 'sess-x' }); + + global.fetch = vi.fn() + .mockResolvedValueOnce({ ok: false }) // pending endpoint fails → skip AskQ check + .mockResolvedValueOnce({ ok: true }) as any; // session input succeeds + + const result = await router.route(adapter, makeMsg({ + text: 'some input', + replyToMessageId: 'hook-msg-1', + })); + + expect(result).toEqual({ action: 'handled' }); + expect(global.fetch).toHaveBeenCalledWith( + 'http://localhost:4590/api/sessions/sess-x/input', + expect.objectContaining({ method: 'POST' }), + ); + expect(adapter.send).toHaveBeenCalledWith( + expect.objectContaining({ text: '✓ Sent to local session' }), + ); + }); + + it('shows error when fetch to session input fails', async () => { + permissions.isHookMessage.mockReturnValue(true); + permissions.getHookMessage.mockReturnValue({ sessionId: 'sess-y' }); + + global.fetch = vi.fn() + .mockResolvedValueOnce({ ok: false }) // pending endpoint + .mockRejectedValueOnce(new Error('connection refused')) as any; + + const result = await router.route(adapter, makeMsg({ + text: 'some input', + replyToMessageId: 'hook-msg-2', + })); + + expect(result).toEqual({ action: 'handled' }); + expect(adapter.send).toHaveBeenCalledWith( + expect.objectContaining({ text: expect.stringContaining('Failed to send') }), + ); + }); + + it('resolves AskUserQuestion found in pending hooks', async () => { + permissions.isHookMessage.mockReturnValue(true); + permissions.getHookMessage.mockReturnValue({ sessionId: 'sess-z' }); + permissions.getQuestionData.mockReturnValue(null); + + global.fetch = vi.fn().mockResolvedValue({ + ok: true, + json: async () => [{ + id: 'ask-1', + tool_name: 'AskUserQuestion', + input: { questions: [{ question: 'Pick one', header: '', options: [{ label: 'A' }, { label: 'B' }], multiSelect: false }] }, + session_id: 'sess-z', + }], + }) as any; + + const result = await router.route(adapter, makeMsg({ + text: '1', + replyToMessageId: 'hook-msg-3', + })); + + expect(result).toEqual({ action: 'handled' }); + expect(permissions.storeQuestionData).toHaveBeenCalledWith('ask-1', expect.any(Array)); + expect(permissions.resolveAskQuestion).toHaveBeenCalledWith( + 'ask-1', 0, 'sess-z', 'hook-msg-3', adapter, 'c1', true, + ); + }); + }); + + // ── 16. Pass-through ───────────────────────────────────────────────── + + describe('pass-through', () => { + it('regular text message returns pass', async () => { + const result = await router.route(adapter, makeMsg({ text: 'just chatting' })); + expect(result).toEqual({ action: 'pass' }); + }); + }); +});