From 30f05ace6586061e5f8e7312a223db698948d6b3 Mon Sep 17 00:00:00 2001 From: Vincent Corleone Date: Sun, 14 Jun 2026 10:47:40 +0800 Subject: [PATCH] add weixin-bridge leverage the benefits of feishu-bridge and @tencent-weixin/openclaw-weixin --- integrations/weixin-bridge/.env.example | 32 + integrations/weixin-bridge/README.md | 90 ++ integrations/weixin-bridge/package.json | 16 + integrations/weixin-bridge/src/index.mjs | 1006 ++++++++++++++++++++++ integrations/weixin-bridge/src/lib.mjs | 463 ++++++++++ 5 files changed, 1607 insertions(+) create mode 100644 integrations/weixin-bridge/.env.example create mode 100644 integrations/weixin-bridge/README.md create mode 100644 integrations/weixin-bridge/package.json create mode 100644 integrations/weixin-bridge/src/index.mjs create mode 100644 integrations/weixin-bridge/src/lib.mjs diff --git a/integrations/weixin-bridge/.env.example b/integrations/weixin-bridge/.env.example new file mode 100644 index 000000000..e5ff14cc6 --- /dev/null +++ b/integrations/weixin-bridge/.env.example @@ -0,0 +1,32 @@ +# ============================================================================= +# Weixin Bot Bridge — 环境变量模板 +# ============================================================================= +# +# 此 bridge 通过扫码登录微信个人账号(iLink Bot 协议), +# 而非微信公众号。登录后 bot_token 会自动保存到 WEXIN_STATE_DIR。 + +# CodeWhale Runtime 连接 +CODEWHALE_RUNTIME_URL=http://127.0.0.1:7878 +CODEWHALE_RUNTIME_TOKEN=replace-with-long-random-token +CODEWHALE_WORKSPACE=/opt/whalebro +CODEWHALE_MODEL=auto +CODEWHALE_MODE=agent +CODEWHALE_ALLOW_SHELL=true +CODEWHALE_TRUST_MODE=false +CODEWHALE_AUTO_APPROVE=false + +# Comma-separated WeChat user IDs (from_user_id) allowed to control the runtime. +# Leave empty only during first pairing, with WEXIN_ALLOW_UNLISTED=true. +WEXIN_CHAT_ALLOWLIST= +WEXIN_ALLOW_UNLISTED=false + +# 状态持久化路径 +WEXIN_STATE_DIR=/var/lib/codewhale-weixin-bot-bridge +WEXIN_THREAD_MAP_PATH=/var/lib/codewhale-weixin-bot-bridge/thread-map.json + +# 消息配置 +WEXIN_MAX_REPLY_CHARS=3500 +CODEWHALE_TURN_TIMEOUT_MS=900000 + +# 长轮询配置 +WEXIN_LONGPOLL_TIMEOUT_MS=35000 diff --git a/integrations/weixin-bridge/README.md b/integrations/weixin-bridge/README.md new file mode 100644 index 000000000..af7c96914 --- /dev/null +++ b/integrations/weixin-bridge/README.md @@ -0,0 +1,90 @@ +# Weixin Bot Bridge + +此 bridge 让微信个人账号通过扫码登录控制本地 `codewhale serve --http` runtime。 +使用腾讯 iLink Bot 协议(参考 `@tencent-weixin/openclaw-weixin`), +无需公众号注册即可工作。 + +与现有的 `integrations/wechat-bridge`(公众号客服消息模式)不同, +此 bridge 直接登录**个人微信账号**,通过长轮询 `getUpdates` 收发消息。 + +## 安全模型 + +- `codewhale serve --http` 绑定于 `127.0.0.1`。 +- `/v1/*` runtime 调用使用 `CODEWHALE_RUNTIME_TOKEN`。 +- 微信用户必须加入白名单,除非首次配对时设置 `WEXIN_ALLOW_UNLISTED=true`。 +- 仅支持私聊;暂不支持群聊。 +- 工具审批通过文本命令:`/allow ` 或 `/deny `。 +- bridge 主动向微信服务器发起长轮询请求,无需公网端口。 + +## 设置 + +```bash +cd /opt/codewhale/weixin-bot-bridge +npm install --omit=dev +cp .env.example /etc/codewhale/weixin-bot-bridge.env +sudoedit /etc/codewhale/weixin-bot-bridge.env +node src/index.mjs +``` + +首次启动时会显示一个二维码,用微信扫描以完成登录授权。 +登录凭证会自动保存,后续启动无需重新扫码。 + +## 命令 + +- `/status` +- `/threads` +- `/new` +- `/resume ` +- `/model ` +- `/interrupt` +- `/compact` +- `/allow [remember]` +- `/deny ` + +其他所有内容均作为 CodeWhale 提示发送。 + +## 首次配对 + +1. 设置 `WEXIN_ALLOW_UNLISTED=true` 启动 bridge。 +2. 扫码登录后,在微信中发送 `/status`。 +3. Bridge 会将你的 `user_id` 返回给你(若白名单为空则显示在拒绝消息中)。 +4. 将 `user_id` 加入 `WEXIN_CHAT_ALLOWLIST`。 +5. 将 `WEXIN_ALLOW_UNLISTED` 改回 `false` 并重启 bridge。 + +## 环境变量 + +| 变量 | 必填 | 说明 | +|------|------|------| +| `CODEWHALE_RUNTIME_URL` | 否 | Runtime HTTP 地址(默认 `http://127.0.0.1:7878`) | +| `CODEWHALE_RUNTIME_TOKEN` | **是** | Runtime Bearer 令牌 | +| `CODEWHALE_WORKSPACE` | 否 | 工作区路径(默认 cwd) | +| `CODEWHALE_MODEL` | 否 | 模型名称(默认 `auto`) | +| `CODEWHALE_MODE` | 否 | 运行模式(默认 `agent`) | +| `WEXIN_CHAT_ALLOWLIST` | 否 | 逗号分隔的允许用户 ID | +| `WEXIN_ALLOW_UNLISTED` | 否 | 首次配对模式(默认 `false`) | +| `WEXIN_STATE_DIR` | 否 | 状态持久化目录 | +| `WEXIN_THREAD_MAP_PATH` | 否 | 线程映射文件路径 | +| `WEXIN_MAX_REPLY_CHARS` | 否 | 单条回复最大字符数(默认 `3500`) | +| `CODEWHALE_TURN_TIMEOUT_MS` | 否 | Turn 超时(默认 `900000`) | +| `WEXIN_LONGPOLL_TIMEOUT_MS` | 否 | 长轮询超时(默认 `35000`) | + +## 架构 + +``` +微信客户端 ──getUpdates 长轮询──▶ Weixin Bot Bridge ──HTTP──▶ codewhale serve --http + ◀──sendMessage── (127.0.0.1:7878) +``` + +Bridge 通过扫码获取 `bot_token`,然后长轮询 `POST /ilink/bot/getupdates` +以接收消息,并通过 `POST /ilink/bot/sendmessage` 发送回复。 +所有消息均带有 `context_token` 以维持会话上下文。 + +## 与 wechat-bridge 的区别 + +| 特性 | wechat-bridge | weixin-bot-bridge | +|------|---------------|-------------------| +| 账号类型 | 微信公众号 | 个人微信 | +| 登录方式 | App ID + Secret 配置 | 扫码登录 | +| 消息协议 | 公众号回调 + 客服消息 | iLink Bot 长轮询 + sendMessage | +| 公网需求 | 需要(回调 URL) | 不需要 | +| 消息类型 | 仅文本 | 文本/图片/语音/视频/文件(MVP仅文本) | diff --git a/integrations/weixin-bridge/package.json b/integrations/weixin-bridge/package.json new file mode 100644 index 000000000..5556b3351 --- /dev/null +++ b/integrations/weixin-bridge/package.json @@ -0,0 +1,16 @@ +{ + "name": "@codewhale/weixin-bot-bridge", + "version": "0.1.0", + "private": true, + "type": "module", + "description": "Weixin iLink Bot bridge for a local codewhale serve --http runtime. Scan QR to login.", + "main": "src/index.mjs", + "scripts": { + "start": "node src/index.mjs", + "check": "node --check src/index.mjs && node --check src/lib.mjs", + "test": "node --test test/*.test.mjs" + }, + "engines": { + "node": ">=18" + } +} diff --git a/integrations/weixin-bridge/src/index.mjs b/integrations/weixin-bridge/src/index.mjs new file mode 100644 index 000000000..0759d2be0 --- /dev/null +++ b/integrations/weixin-bridge/src/index.mjs @@ -0,0 +1,1006 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import crypto from "node:crypto"; + +import { + getLoginQR, + waitForLogin, + getUpdates, + sendMessage, + getConfig, + notifyStart, + notifyStop, + ILinkLoginBase, + parseList, + parseBool, + envFirst, + extractText, + parseCommand, + commandAction, + preservedChatStateFields, + splitMessage, + compactRuntimeError, + latestRunningTurn, + activeTurnBlock, + helpText, +} from "./lib.mjs"; + +// ============================================================================ +// ThreadStore — JSON 文件持久化(与 feishu/telegram/wechat bridge 一致) +// ============================================================================ + +class ThreadStore { + static async open(filePath) { + const store = new ThreadStore(filePath); + await store.load(); + return store; + } + + constructor(filePath) { + this.filePath = filePath; + this.data = { chats: {}, messages: [] }; + } + + async load() { + try { + const raw = await fs.readFile(this.filePath, "utf8"); + this.data = JSON.parse(raw); + if (!this.data.chats) this.data.chats = {}; + if (!Array.isArray(this.data.messages)) this.data.messages = []; + } catch (error) { + if (error.code !== "ENOENT") throw error; + } + } + + async recordMessage(messageKey) { + if (!messageKey) return false; + if (!Array.isArray(this.data.messages)) this.data.messages = []; + if (this.data.messages.includes(messageKey)) return true; + this.data.messages.push(messageKey); + this.data.messages = this.data.messages.slice(-500); + await this.save(); + return false; + } + + async getChat(chatId) { + return this.data.chats[chatId] || null; + } + + async setChat(chatId, state) { + this.data.chats[chatId] = state; + await this.save(); + return state; + } + + async patchChat(chatId, patch) { + const current = this.data.chats[chatId] || {}; + this.data.chats[chatId] = { ...current, ...patch }; + await this.save(); + return this.data.chats[chatId]; + } + + async save() { + const dir = path.dirname(this.filePath); + await fs.mkdir(dir, { recursive: true, mode: 0o700 }); + const tmp = `${this.filePath}.tmp`; + await fs.writeFile(tmp, `${JSON.stringify(this.data, null, 2)}\n`, { + mode: 0o600, + }); + await fs.rename(tmp, this.filePath); + } +} + +// ============================================================================ +// 账号持久化 +// ============================================================================ + +function resolveAccountPath(stateDir) { + return path.join(stateDir, "account.json"); +} + +async function loadAccount(stateDir) { + const p = resolveAccountPath(stateDir); + try { + const raw = await fs.readFile(p, "utf8"); + return JSON.parse(raw); + } catch (error) { + if (error.code !== "ENOENT") throw error; + return null; + } +} + +async function saveAccount(stateDir, account) { + const p = resolveAccountPath(stateDir); + await fs.mkdir(path.dirname(p), { recursive: true, mode: 0o700 }); + const tmp = `${p}.tmp`; + await fs.writeFile(tmp, `${JSON.stringify(account, null, 2)}\n`, { + mode: 0o600, + }); + await fs.rename(tmp, p); +} + +// ============================================================================ +// 配置 +// ============================================================================ + +function requiredEnv(name) { + const value = process.env[name]; + if (!value || !value.trim()) { + console.error(`Missing required env: ${name}`); + process.exit(1); + } + return value.trim(); +} + +function requiredEnvFirst(...names) { + const value = envFirst(process.env, ...names); + if (!value) { + console.error(`Missing required env: one of ${names.join(", ")}`); + process.exit(1); + } + return value; +} + +const config = { + runtimeUrl: ( + envFirst(process.env, "CODEWHALE_RUNTIME_URL", "DEEPSEEK_RUNTIME_URL") || + "http://127.0.0.1:7878" + ).replace(/\/+$/, ""), + runtimeToken: requiredEnvFirst( + "CODEWHALE_RUNTIME_TOKEN", + "DEEPSEEK_RUNTIME_TOKEN" + ), + workspace: + envFirst(process.env, "CODEWHALE_WORKSPACE", "DEEPSEEK_WORKSPACE") || + process.cwd(), + model: + envFirst(process.env, "CODEWHALE_MODEL", "DEEPSEEK_MODEL") || "auto", + mode: + envFirst(process.env, "CODEWHALE_MODE", "DEEPSEEK_MODE") || "agent", + allowShell: parseBool( + envFirst( + process.env, + "CODEWHALE_ALLOW_SHELL", + "DEEPSEEK_ALLOW_SHELL" + ), + true + ), + trustMode: parseBool( + envFirst( + process.env, + "CODEWHALE_TRUST_MODE", + "DEEPSEEK_TRUST_MODE" + ), + false + ), + autoApprove: parseBool( + envFirst( + process.env, + "CODEWHALE_AUTO_APPROVE", + "DEEPSEEK_AUTO_APPROVE" + ), + false + ), + allowlist: parseList( + envFirst( + process.env, + "WEXIN_CHAT_ALLOWLIST", + "CODEWHALE_CHAT_ALLOWLIST", + "DEEPSEEK_CHAT_ALLOWLIST" + ) + ), + allowUnlisted: parseBool( + envFirst( + process.env, + "WEXIN_ALLOW_UNLISTED", + "CODEWHALE_ALLOW_UNLISTED", + "DEEPSEEK_ALLOW_UNLISTED" + ), + false + ), + stateDir: + process.env.WEXIN_STATE_DIR || + "/var/lib/codewhale-weixin-bot-bridge", + threadMapPath: + process.env.WEXIN_THREAD_MAP_PATH || + "/var/lib/codewhale-weixin-bot-bridge/thread-map.json", + maxReplyChars: Number(process.env.WEXIN_MAX_REPLY_CHARS || 3500), + longPollTimeoutMs: Number( + process.env.WEXIN_LONGPOLL_TIMEOUT_MS || 35000 + ), + turnTimeoutMs: Number( + envFirst( + process.env, + "CODEWHALE_TURN_TIMEOUT_MS", + "DEEPSEEK_TURN_TIMEOUT_MS" + ) || 900000 + ), +}; + +// ============================================================================ +// Runtime API 工具 +// ============================================================================ + +function authHeaders() { + return { + Authorization: `Bearer ${config.runtimeToken}`, + "Content-Type": "application/json", + }; +} + +async function readJsonSafe(response) { + try { + return await response.json(); + } catch { + return null; + } +} + +async function runtimeJson(subPath, { method = "GET", body = null, auth = true } = {}) { + const url = `${config.runtimeUrl}${subPath}`; + const options = { method, headers: auth ? authHeaders() : {} }; + if (body) options.body = JSON.stringify(body); + const response = await fetch(url, options); + const result = await readJsonSafe(response); + if (!response.ok) { + throw new Error(compactRuntimeError(response.status, result)); + } + return result; +} + +async function* readSse(response) { + let buffer = ""; + for await (const chunk of response.body) { + buffer += new TextDecoder().decode(chunk, { stream: true }); + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + if (trimmed.startsWith("data:")) { + yield { data: trimmed.slice(5).trim() }; + } else if (trimmed.startsWith("event:")) { + yield { event: trimmed.slice(6).trim() }; + } else if (trimmed.startsWith("id:")) { + yield { id: trimmed.slice(3).trim() }; + } + } + } +} + +// ============================================================================ +// 消息发送 — 通过 iLink sendMessage +// ============================================================================ + +async function sendText(chatId, text) { + if (!botAccount) { + console.error("sendText: bot not logged in"); + return; + } + const chunks = splitMessage(text, config.maxReplyChars); + for (const chunk of chunks) { + await sendMessage({ + baseUrl: botAccount.baseUrl, + token: botAccount.token, + body: { + msg: { + to_user_id: chatId, + client_id: `${Date.now().toString(36)}${Math.random().toString(36).slice(2, 8)}`, + message_type: 2, // BOT + message_state: 2, // FINISH + item_list: [{ type: 1, text_item: { text: chunk } }], + context_token: await getContextToken(chatId), + }, + }, + }); + } +} + +async function getContextToken(chatId) { + const state = await threadStore.getChat(chatId); + return state?.contextToken || undefined; +} + +// ============================================================================ +// 命令处理(与 feishu/telegram/wechat bridge 一致) +// ============================================================================ + +async function handleCommand(chatId, command) { + const action = commandAction(command); + switch (action.kind) { + case "help": + await sendText(chatId, helpText()); + return; + case "status": + await sendStatus(chatId); + return; + case "threads": + await sendThreads(chatId); + return; + case "new_thread": { + const state = await ensureThread(chatId, { forceNew: true }); + await sendText(chatId, `Created thread ${state.threadId}`); + return; + } + case "resume": + await resumeThread(chatId, action.threadId); + return; + case "interrupt": + await interruptActiveTurn(chatId); + return; + case "compact": + await compactThread(chatId); + return; + case "approval": + await decideApproval(chatId, action); + return; + case "set_model": + await setChatModel(chatId, action.modelName); + return; + case "prompt": + await runPrompt(chatId, action.prompt); + return; + default: + await sendText(chatId, helpText()); + } +} + +async function ensureThread(chatId, { forceNew = false } = {}) { + const existing = await threadStore.getChat(chatId); + if (existing?.threadId && !forceNew) return existing; + + const effectiveModel = existing?.model || config.model; + + const thread = await runtimeJson("/v1/threads", { + method: "POST", + body: { + model: effectiveModel, + workspace: config.workspace, + mode: config.mode, + allow_shell: config.allowShell, + trust_mode: config.trustMode, + auto_approve: config.autoApprove, + archived: false, + system_prompt: + "You are being controlled from a WeChat phone chat via iLink Bot. Keep status updates concise. Ask for tool approvals when needed; do not assume mobile messages imply blanket approval.", + }, + }); + + const state = { + ...preservedChatStateFields(existing), + threadId: thread.id, + lastSeq: 0, + activeTurnId: null, + updatedAt: new Date().toISOString(), + }; + await threadStore.setChat(chatId, state); + return state; +} + +async function runPrompt(chatId, prompt) { + if (!prompt.trim()) { + await sendText(chatId, helpText()); + return; + } + const state = await ensureThread(chatId); + const effectiveModel = state?.model || config.model; + const detail = await runtimeJson( + `/v1/threads/${encodeURIComponent(state.threadId)}` + ); + const activeBlock = activeTurnBlock(detail, state); + if (activeBlock) { + await threadStore.patchChat(chatId, { + activeTurnId: activeBlock.turnId, + updatedAt: new Date().toISOString(), + }); + await sendText(chatId, activeBlock.message); + return; + } + if (state.activeTurnId) { + await threadStore.patchChat(chatId, { activeTurnId: null }); + } + const sinceSeq = Number(detail.latest_seq || state.lastSeq || 0); + + const turnResponse = await runtimeJson( + `/v1/threads/${encodeURIComponent(state.threadId)}/turns`, + { + method: "POST", + body: { + prompt, + input_summary: prompt.slice(0, 200), + model: effectiveModel, + mode: config.mode, + allow_shell: config.allowShell, + trust_mode: config.trustMode, + auto_approve: config.autoApprove, + }, + } + ); + + const turnId = turnResponse.turn?.id; + await threadStore.patchChat(chatId, { + activeTurnId: turnId || null, + lastSeq: sinceSeq, + updatedAt: new Date().toISOString(), + }); + await sendText(chatId, `Started turn ${turnId || "(unknown)"}`); + + try { + await streamTurnEvents(chatId, state.threadId, turnId, sinceSeq); + } finally { + await threadStore.patchChat(chatId, { + activeTurnId: null, + updatedAt: new Date().toISOString(), + }); + } +} + +async function streamTurnEvents(chatId, threadId, turnId, sinceSeq) { + const controller = new AbortController(); + const timeout = setTimeout( + () => controller.abort(), + config.turnTimeoutMs + ); + let responseText = ""; + let latestSeq = sinceSeq; + let sentProgressAt = Date.now(); + + try { + const response = await fetch( + `${config.runtimeUrl}/v1/threads/${encodeURIComponent(threadId)}/events?since_seq=${sinceSeq}`, + { + headers: authHeaders(), + signal: controller.signal, + } + ); + if (!response.ok) { + const body = await readJsonSafe(response); + throw new Error(compactRuntimeError(response.status, body)); + } + + for await (const event of readSse(response)) { + if (!event.data) continue; + const record = JSON.parse(event.data); + latestSeq = Math.max(latestSeq, Number(record.seq || 0)); + await threadStore.patchChat(chatId, { lastSeq: latestSeq }); + + if (turnId && record.turn_id && record.turn_id !== turnId) continue; + + if ( + record.event === "item.delta" && + record.payload?.kind === "agent_message" + ) { + responseText += record.payload.delta || ""; + const now = Date.now(); + if ( + responseText.length > config.maxReplyChars && + now - sentProgressAt > 15000 + ) { + await sendText(chatId, responseText.slice(0, config.maxReplyChars)); + responseText = responseText.slice(config.maxReplyChars); + sentProgressAt = now; + } + } + + if (record.event === "approval.required") { + const approval = record.payload || {}; + const approvalId = approval.approval_id || approval.id; + if (!approvalId) { + await sendText( + chatId, + [ + "Approval required", + `tool=${approval.tool_name || "unknown"}`, + approval.description || "", + "", + "No approval_id was provided by the runtime; use /status and retry from the TUI.", + ] + .filter(Boolean) + .join("\n") + ); + } else { + await sendText( + chatId, + [ + "Approval required", + `tool=${approval.tool_name || "unknown"}`, + `approval_id=${approvalId}`, + approval.description || "", + "", + `Reply /allow ${approvalId} or /deny ${approvalId}`, + ] + .filter(Boolean) + .join("\n") + ); + } + } + + if (record.event === "turn.completed") { + const turn = record.payload?.turn || {}; + const status = turn.status || "completed"; + const error = turn.error ? `\n${turn.error}` : ""; + if (status !== "completed") { + await sendText(chatId, `Turn ${status}.${error}`.trim()); + } else { + await sendText( + chatId, + responseText.trim() || "Turn completed." + ); + } + return; + } + + if (record.event === "turn.lifecycle") { + const status = + record.payload?.turn?.status || record.payload?.status; + if (["failed", "canceled", "interrupted"].includes(status)) { + await sendText(chatId, `Turn ${status}.`); + return; + } + } + } + } catch (error) { + if (error.name === "AbortError") { + await sendText( + chatId, + `Turn timed out after ${Math.round(config.turnTimeoutMs / 1000)}s.` + ); + return; + } + throw error; + } finally { + clearTimeout(timeout); + } +} + +async function sendStatus(chatId) { + try { + const [health, runtimeInfo, workspace] = await Promise.all([ + runtimeJson("/health", { auth: false }), + runtimeJson("/v1/runtime/info"), + runtimeJson("/v1/workspace/status"), + ]); + await sendText( + chatId, + [ + `runtime=${health.status || "unknown"}`, + `version=${runtimeInfo.version || "unknown"}`, + `bind=${runtimeInfo.bind_host}:${runtimeInfo.port}`, + `auth_required=${runtimeInfo.auth_required}`, + `workspace=${workspace.workspace}`, + `git_repo=${workspace.git_repo}`, + workspace.branch ? `branch=${workspace.branch}` : "", + `staged=${workspace.staged} unstaged=${workspace.unstaged} untracked=${workspace.untracked}`, + ] + .filter(Boolean) + .join("\n") + ); + } catch (error) { + await sendText(chatId, `Status check failed: ${error.message}`); + } +} + +async function sendThreads(chatId) { + try { + const threads = await runtimeJson( + "/v1/threads/summary?limit=8&include_archived=true" + ); + if (!threads.length) { + await sendText(chatId, "No runtime threads yet."); + return; + } + await sendText( + chatId, + threads + .map((thread) => { + const status = thread.latest_turn_status || "none"; + return `${thread.id} [${status}] ${thread.title || thread.preview || ""}`; + }) + .join("\n") + ); + } catch (error) { + await sendText(chatId, `Thread listing failed: ${error.message}`); + } +} + +async function resumeThread(chatId, args) { + const threadId = args.trim(); + if (!threadId) { + await sendText(chatId, "Usage: /resume "); + return; + } + try { + const detail = await runtimeJson( + `/v1/threads/${encodeURIComponent(threadId)}` + ); + const existing = await threadStore.getChat(chatId); + await threadStore.setChat(chatId, { + ...preservedChatStateFields(existing), + threadId, + lastSeq: Number(detail.latest_seq || 0), + activeTurnId: null, + updatedAt: new Date().toISOString(), + }); + await sendText(chatId, `Resumed thread ${threadId}`); + } catch (error) { + await sendText(chatId, `Resume failed: ${error.message}`); + } +} + +async function interruptActiveTurn(chatId) { + const state = await threadStore.getChat(chatId); + if (!state?.threadId) { + await sendText(chatId, "No runtime thread recorded for this chat."); + return; + } + try { + const detail = await runtimeJson( + `/v1/threads/${encodeURIComponent(state.threadId)}` + ); + const runningTurn = latestRunningTurn(detail); + const turnId = state.activeTurnId || runningTurn?.id; + if (!turnId) { + await sendText(chatId, "No active turn recorded for this chat."); + return; + } + await runtimeJson( + `/v1/threads/${encodeURIComponent(state.threadId)}/turns/${encodeURIComponent(turnId)}/interrupt`, + { method: "POST" } + ); + await threadStore.patchChat(chatId, { + activeTurnId: turnId, + updatedAt: new Date().toISOString(), + }); + await sendText(chatId, `Interrupt requested for ${turnId}`); + } catch (error) { + await sendText(chatId, `Interrupt failed: ${error.message}`); + } +} + +async function compactThread(chatId) { + try { + const state = await ensureThread(chatId); + const result = await runtimeJson( + `/v1/threads/${encodeURIComponent(state.threadId)}/compact`, + { + method: "POST", + body: { reason: "weixin-bot bridge request" }, + } + ); + await sendText( + chatId, + `Compaction started: ${result.turn?.id || "unknown turn"}` + ); + } catch (error) { + await sendText(chatId, `Compact failed: ${error.message}`); + } +} + +async function decideApproval(chatId, action) { + const decision = action.decision; + const { approvalId, remember } = action; + if (!approvalId) { + await sendText( + chatId, + `Usage: /${decision} ${decision === "allow" ? " [remember]" : ""}` + ); + return; + } + try { + await runtimeJson( + `/v1/approvals/${encodeURIComponent(approvalId)}`, + { + method: "POST", + body: { decision, remember }, + } + ); + await sendText( + chatId, + `Approval ${approvalId}: ${decision}${remember ? " and remember" : ""}` + ); + } catch (error) { + await sendText(chatId, `Approval failed: ${error.message}`); + } +} + +async function setChatModel(chatId, modelName) { + if (!modelName || modelName === "default") { + await threadStore.patchChat(chatId, { + model: null, + updatedAt: new Date().toISOString(), + }); + await sendText( + chatId, + `Reset per-chat model. Using bridge default: ${config.model}` + ); + return; + } + await threadStore.patchChat(chatId, { + model: modelName, + updatedAt: new Date().toISOString(), + }); + await sendText(chatId, `Per-chat model set to: ${modelName}`); +} + +// ============================================================================ +// 主循环 — 长轮询 getUpdates +// ============================================================================ + +let botAccount = null; +let stopping = false; +let threadStore; +let stopSignal = null; + +function resolveSyncBufPath(stateDir) { + return path.join(stateDir, "sync-buf.txt"); +} + +async function loadSyncBuf(stateDir) { + const p = resolveSyncBufPath(stateDir); + try { + return await fs.readFile(p, "utf8"); + } catch { + return ""; + } +} + +async function saveSyncBuf(stateDir, buf) { + const p = resolveSyncBufPath(stateDir); + const tmp = `${p}.tmp`; + await fs.writeFile(tmp, buf, { mode: 0o600 }); + await fs.rename(tmp, p); +} + +async function monitorLoop() { + const { baseUrl, token } = botAccount; + let getUpdatesBuf = await loadSyncBuf(config.stateDir); + let nextTimeoutMs = config.longPollTimeoutMs; + let consecutiveFailures = 0; + + console.log(`Monitor started: baseUrl=${baseUrl} timeoutMs=${nextTimeoutMs}`); + + while (!stopping) { + try { + const abortController = new AbortController(); + const timer = setTimeout( + () => abortController.abort(), + nextTimeoutMs + 5000 + ); + + const resp = await getUpdates({ + baseUrl, + token, + get_updates_buf: getUpdatesBuf, + timeoutMs: nextTimeoutMs, + signal: abortController.signal, + }); + + clearTimeout(timer); + + if (resp.longpolling_timeout_ms) { + nextTimeoutMs = resp.longpolling_timeout_ms; + } + + // 检查错误 + const isApiError = + (resp.ret !== undefined && resp.ret !== 0) || + (resp.errcode !== undefined && resp.errcode !== 0); + + if (isApiError) { + consecutiveFailures += 1; + console.error( + `getUpdates error: ret=${resp.ret} errcode=${resp.errcode} errmsg=${resp.errmsg}` + ); + if (consecutiveFailures >= 3) { + console.error("3 consecutive failures, backing off 30s"); + await sleep(30000); + consecutiveFailures = 0; + } else { + await sleep(2000); + } + continue; + } + + consecutiveFailures = 0; + + // 保存游标 + if (resp.get_updates_buf) { + getUpdatesBuf = resp.get_updates_buf; + await saveSyncBuf(config.stateDir, getUpdatesBuf); + } + + // 处理消息 + const msgs = resp.msgs || []; + for (const msg of msgs) { + const fromUser = msg.from_user_id || ""; + const messageId = String(msg.message_id || ""); + + if (!fromUser) continue; + + const msgKey = `${fromUser}:${messageId}`; + if (await threadStore.recordMessage(msgKey)) continue; + + // 保存 context_token + if (msg.context_token) { + await threadStore.patchChat(fromUser, { + contextToken: msg.context_token, + updatedAt: new Date().toISOString(), + }); + } + + // 提取文本 + const text = extractText(msg.item_list); + + if (!text) { + await sendText( + fromUser, + "仅支持文本消息。图片/语音/视频/文件暂不支持。" + ); + continue; + } + + console.log( + `[inbound] from=${fromUser} text=${text.slice(0, 100)}` + ); + + // 白名单检查 + if (!isAllowed(fromUser)) { + await sendText( + fromUser, + [ + "This WeChat user is not in WEXIN_CHAT_ALLOWLIST.", + `user_id=${fromUser}`, + "", + "For first pairing, add this user_id to WEXIN_CHAT_ALLOWLIST, or temporarily set WEXIN_ALLOW_UNLISTED=true.", + ].join("\n") + ); + continue; + } + + // 命令路由 + const command = parseCommand(text); + await handleCommand(fromUser, command).catch((error) => { + console.error( + `failed to handle command from=${fromUser} text=${text.slice(0, 100)}`, + error + ); + }); + } + } catch (error) { + if (error.name === "AbortError" || error.message?.includes("abort")) { + // 长轮询超时是正常的,立即重试 + continue; + } + if (stopping) break; + + consecutiveFailures += 1; + console.error( + `getUpdates exception (${consecutiveFailures}/3):`, + error.message + ); + if (consecutiveFailures >= 3) { + console.error("3 consecutive exceptions, backing off 30s"); + await sleep(30000); + consecutiveFailures = 0; + } else { + await sleep(2000); + } + } + } +} + +function isAllowed(fromUser) { + if (config.allowUnlisted) return true; + const allowed = new Set(config.allowlist); + return allowed.has(fromUser); +} + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +// ============================================================================ +// 启动流程 — QR 登录 → 长轮询 +// ============================================================================ + +async function main() { + console.log("Starting CodeWhale Weixin Bot Bridge"); + console.log(`Runtime: ${config.runtimeUrl}`); + console.log(`Workspace: ${config.workspace}`); + console.log(`State dir: ${config.stateDir}`); + + // 初始化 ThreadStore + threadStore = await ThreadStore.open(config.threadMapPath); + + // 尝试加载已有账号 + botAccount = await loadAccount(config.stateDir); + + if (botAccount?.token) { + console.log("Loaded existing bot account, trying to resume..."); + console.log(` accountId: ${botAccount.accountId}`); + console.log(` baseUrl: ${botAccount.baseUrl}`); + } else { + // QR 登录 + console.log("No bot account found. Starting QR login..."); + console.log(""); + + const { qrcodeUrl, sessionKey } = await getLoginQR(); + console.log("请用微信扫描以下二维码登录:"); + console.log(qrcodeUrl); + console.log(""); + + const result = await waitForLogin({ sessionKey, timeoutMs: 300_000 }); + + if (!result.connected) { + console.error(`Login failed: ${result.message}`); + process.exit(1); + } + + botAccount = { + accountId: result.accountId, + token: result.botToken, + baseUrl: result.baseUrl, + userId: result.userId, + }; + + await saveAccount(config.stateDir, botAccount); + console.log(`✅ Login successful! accountId=${botAccount.accountId}`); + } + + // 通知上线 + try { + const startResp = await notifyStart({ + baseUrl: botAccount.baseUrl, + token: botAccount.token, + }); + if (startResp.ret && startResp.ret !== 0) { + console.warn(`notifyStart: ret=${startResp.ret} errmsg=${startResp.errmsg}`); + } else { + console.log("notifyStart: OK"); + } + } catch (error) { + console.error("notifyStart failed:", error.message); + } + + // 信号处理 + process.once("SIGINT", shutdown); + process.once("SIGTERM", shutdown); + + if (!config.allowlist.length && !config.allowUnlisted) { + console.log( + "No allowlist configured. Incoming chats will receive their user IDs and be refused." + ); + } + + // 进入长轮询循环 + await monitorLoop(); + + console.log("Bridge stopped."); +} + +async function shutdown() { + if (stopping) return; + stopping = true; + console.log("Shutting down..."); + + if (botAccount?.token) { + try { + const stopResp = await notifyStop({ + baseUrl: botAccount.baseUrl, + token: botAccount.token, + }); + console.log( + `notifyStop: ret=${stopResp.ret} errmsg=${stopResp.errmsg ?? "OK"}` + ); + } catch (error) { + console.error("notifyStop failed:", error.message); + } + } + + setTimeout(() => process.exit(0), 2000); +} + +main().catch((error) => { + console.error("Fatal error:", error); + process.exit(1); +}); diff --git a/integrations/weixin-bridge/src/lib.mjs b/integrations/weixin-bridge/src/lib.mjs new file mode 100644 index 000000000..c9fb7bbb2 --- /dev/null +++ b/integrations/weixin-bridge/src/lib.mjs @@ -0,0 +1,463 @@ +import crypto from "node:crypto"; + +// ============================================================================ +// iLink Bot API 协议层 — 参考 @tencent-weixin/openclaw-weixin +// ============================================================================ + +const DEFAULT_API_TIMEOUT_MS = 30_000; +const LONGPOLL_DEFAULT_TIMEOUT_MS = 35_000; + +// --------------------------------------------------------------------------- +// 通用工具 +// --------------------------------------------------------------------------- + +export function parseList(raw) { + return String(raw || "") + .split(",") + .map((item) => item.trim()) + .filter(Boolean); +} + +export function parseBool(raw, fallback = false) { + if (raw == null || raw === "") return fallback; + return ["1", "true", "yes", "on"].includes(String(raw).trim().toLowerCase()); +} + +export function envFirst(env, ...names) { + for (const name of names) { + const value = env?.[name]; + if (value != null && String(value).trim()) return String(value).trim(); + } + return ""; +} + +export function randomUin() { + const uint32 = crypto.randomBytes(4).readUInt32BE(0); + return Buffer.from(String(uint32), "utf-8").toString("base64"); +} + +// iLink-App-Id: 从 openclaw-weixin 的 package.json 已知为 "bot" +const ILINK_APP_ID = "bot"; + +// iLink-App-ClientVersion: 0x00MMNNPP (major<<16 | minor<<8 | patch) +function buildClientVersion(version) { + const [major = 0, minor = 0, patch = 0] = version.split(".").map(Number); + return ((major & 0xff) << 16) | ((minor & 0xff) << 8) | (patch & 0xff); +} +const ILINK_APP_CLIENT_VERSION = String(buildClientVersion("2.4.4")); + +// --------------------------------------------------------------------------- +// 消息提取 +// --------------------------------------------------------------------------- + +export const MessageItemType = { + NONE: 0, + TEXT: 1, + IMAGE: 2, + VOICE: 3, + FILE: 4, + VIDEO: 5, + TOOL_CALL_START: 11, + TOOL_CALL_RESULT: 12, +}; + +export function extractText(itemList) { + if (!Array.isArray(itemList) || !itemList.length) return ""; + for (const item of itemList) { + if (item.type === MessageItemType.TEXT && item.text_item?.text != null) { + return String(item.text_item.text); + } + if (item.type === MessageItemType.VOICE && item.voice_item?.text) { + return item.voice_item.text; + } + } + return ""; +} + +// --------------------------------------------------------------------------- +// 命令解析(与 feishu/telegram/wechat bridge 一致) +// --------------------------------------------------------------------------- + +export function parseCommand(text) { + const trimmed = String(text || "").trim(); + if (!trimmed.startsWith("/")) return { name: "prompt", args: trimmed }; + const [head, ...rest] = trimmed.split(/\s+/); + return { + name: head.slice(1).toLowerCase(), + args: rest.join(" ").trim(), + }; +} + +export function parseApprovalDecisionArgs(args) { + const parts = String(args || "").split(/\s+/).filter(Boolean); + return { + approvalId: parts[0] || "", + remember: parts.slice(1).includes("remember"), + }; +} + +export function commandAction(command) { + switch (command.name) { + case "help": + return { kind: "help" }; + case "status": + return { kind: "status" }; + case "threads": + return { kind: "threads" }; + case "new": + return { kind: "new_thread" }; + case "resume": + return { kind: "resume", threadId: command.args }; + case "model": + return { kind: "set_model", modelName: command.args }; + case "interrupt": + return { kind: "interrupt" }; + case "compact": + return { kind: "compact" }; + case "allow": + return { + kind: "approval", + decision: "allow", + ...parseApprovalDecisionArgs(command.args), + }; + case "deny": + return { + kind: "approval", + decision: "deny", + ...parseApprovalDecisionArgs(command.args), + }; + case "prompt": + return { kind: "prompt", prompt: command.args }; + default: + return { + kind: "prompt", + prompt: `/${command.name}${command.args ? ` ${command.args}` : ""}`, + }; + } +} + +export function preservedChatStateFields(state = {}) { + const preserved = {}; + if (Object.prototype.hasOwnProperty.call(state || {}, "model")) { + preserved.model = state.model || null; + } + return preserved; +} + +// --------------------------------------------------------------------------- +// 消息拆分(微信 iLink 单条消息无明确上限,保守 3500 字符) +// --------------------------------------------------------------------------- + +export function splitMessage(text, maxChars = 3500) { + const value = String(text || ""); + const chars = Array.from(value); + if (chars.length <= maxChars) return value ? [value] : []; + const chunks = []; + let cursor = 0; + while (cursor < chars.length) { + chunks.push(chars.slice(cursor, cursor + maxChars).join("")); + cursor += maxChars; + } + return chunks; +} + +// --------------------------------------------------------------------------- +// Runtime 工具 +// --------------------------------------------------------------------------- + +export function compactRuntimeError(status, body) { + const message = + body?.error?.message || + body?.message || + (typeof body === "string" ? body : JSON.stringify(body)); + return `Runtime API request failed (${status}): ${message}`; +} + +export function latestRunningTurn(detail) { + const turns = Array.isArray(detail?.turns) ? detail.turns : []; + for (let index = turns.length - 1; index >= 0; index -= 1) { + const turn = turns[index]; + if (["queued", "in_progress"].includes(turn?.status)) return turn; + } + return null; +} + +export function activeTurnBlock(detail, state = {}) { + const runningTurn = latestRunningTurn(detail); + if (!runningTurn) return null; + return { + turnId: runningTurn.id || state.activeTurnId || "", + message: `Thread already has active turn ${ + runningTurn.id || state.activeTurnId || "(unknown)" + }. Wait for it to finish or send /interrupt.`, + }; +} + +// --------------------------------------------------------------------------- +// 帮助文本 +// --------------------------------------------------------------------------- + +export function helpText() { + return [ + "CodeWhale Weixin Bot 命令:", + "/help - 显示此帮助", + "/status - 运行时和工作区状态", + "/threads - 最近运行时线程", + "/new - 为此聊天创建新线程", + "/resume - 绑定到已有线程", + "/model - 设置或重置此聊天的模型", + "/interrupt - 中断活跃 turn", + "/compact - 压缩当前线程", + "/allow [remember] - 批准工具调用", + "/deny - 拒绝工具调用", + "", + "其他所有内容均作为 CodeWhale 提示发送。", + ].join("\n"); +} + +// ============================================================================ +// iLink Bot HTTP API 调用 +// ============================================================================ + +export const ILinkLoginBase = "https://ilinkai.weixin.qq.com"; + +function authHeaders({ token } = {}) { + const headers = { + "Content-Type": "application/json", + AuthorizationType: "ilink_bot_token", + "X-WECHAT-UIN": randomUin(), + "iLink-App-Id": ILINK_APP_ID, + "iLink-App-ClientVersion": ILINK_APP_CLIENT_VERSION, + }; + if (token) { + headers["Authorization"] = `Bearer ${token}`; + } + return headers; +} + +/** + * 通用 POST 到 iLink API。 + */ +export async function apiPost({ baseUrl, endpoint, body, token, timeoutMs, signal }) { + const url = `${baseUrl.replace(/\/+$/, "")}/${endpoint}`; + const ms = timeoutMs || DEFAULT_API_TIMEOUT_MS; + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), ms); + signal?.addEventListener("abort", () => controller.abort(), { once: true }); + + try { + const response = await fetch(url, { + method: "POST", + headers: authHeaders({ token }), + body, + signal: controller.signal, + }); + const text = await response.text(); + if (!response.ok) { + throw new Error( + `iLink API ${endpoint} failed: HTTP ${response.status} — ${text.slice(0, 200)}` + ); + } + return text; + } finally { + clearTimeout(timer); + } +} + +/** + * 通用 GET 到 iLink API(用于轮询扫码状态等)。 + */ +export async function apiGet({ baseUrl, endpoint, token, timeoutMs, signal }) { + const url = `${baseUrl.replace(/\/+$/, "")}/${endpoint}`; + const ms = timeoutMs || DEFAULT_API_TIMEOUT_MS; + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), ms); + signal?.addEventListener("abort", () => controller.abort(), { once: true }); + + try { + const response = await fetch(url, { + method: "GET", + headers: authHeaders({ token }), + signal: controller.signal, + }); + const text = await response.text(); + if (!response.ok) { + // 长轮询超时是正常的,不抛错 + if (response.status === 524 || text.includes("timeout")) { + return text; + } + throw new Error( + `iLink API ${endpoint} failed: HTTP ${response.status} — ${text.slice(0, 200)}` + ); + } + return text; + } finally { + clearTimeout(timer); + } +} + +// ============================================================================ +// 扫码登录 +// ============================================================================ + +/** + * 获取登录二维码。 + * 参考 login-qr.ts:79-90 — 端点 get_bot_qrcode,bot_type 是 query 参数。 + * @returns {Promise<{qrcode: string, qrcodeUrl: string, sessionKey: string}>} + */ +export async function getLoginQR({ botType = "3" } = {}) { + const raw = await apiPost({ + baseUrl: ILinkLoginBase, + endpoint: `ilink/bot/get_bot_qrcode?bot_type=${encodeURIComponent(botType)}`, + body: JSON.stringify({ local_token_list: [] }), + }); + const data = JSON.parse(raw); + const qrcodeUrl = data.qrcode_img_content || ""; + const sessionKey = data.qrcode || crypto.randomUUID(); + return { qrcode: data.qrcode, qrcodeUrl, sessionKey }; +} + +/** + * 轮询扫码状态直到确认或超时。 + * 参考 login-qr.ts:112-136 — 端点 get_qrcode_status,GET 方法。 + * @returns {Promise<{connected: boolean, botToken?: string, accountId?: string, baseUrl?: string, userId?: string, message: string}>} + */ +export async function waitForLogin({ sessionKey, timeoutMs = 300_000 } = {}) { + const deadline = Date.now() + timeoutMs; + + while (Date.now() < deadline) { + let raw; + try { + raw = await apiGet({ + baseUrl: ILinkLoginBase, + endpoint: `ilink/bot/get_qrcode_status?qrcode=${encodeURIComponent(sessionKey)}`, + timeoutMs: 35_000, // 长轮询超时 + }); + } catch { + // 超时等网络错误 → 视为 wait,继续轮询 + await new Promise((r) => setTimeout(r, 1000)); + continue; + } + + const data = JSON.parse(raw); + const status = data.status; + + if (status === "confirmed") { + return { + connected: true, + botToken: data.bot_token, + accountId: data.ilink_bot_id, + baseUrl: data.baseurl || ILinkLoginBase, + userId: data.ilink_user_id, + message: "已连接微信。", + }; + } + + if (status === "expired" || status === "binded_redirect") { + return { connected: false, message: status === "expired" ? "二维码已过期,请重试。" : "已连接过此桥接,无需重复连接。" }; + } + + // 等 1 秒再轮询 + await new Promise((r) => setTimeout(r, 1000)); + } + + return { connected: false, message: "登录超时,请重试。" }; +} + +// ============================================================================ +// 消息 API +// ============================================================================ + +/** + * 长轮询获取新消息。 + */ +export async function getUpdates({ baseUrl, token, get_updates_buf = "", timeoutMs = LONGPOLL_DEFAULT_TIMEOUT_MS, signal }) { + const raw = await apiPost({ + baseUrl, + endpoint: "ilink/bot/getupdates", + body: JSON.stringify({ + get_updates_buf, + base_info: { bot_agent: "CodeWhale/1.0" }, + }), + token, + timeoutMs, + signal, + }); + return JSON.parse(raw); +} + +/** + * 发送消息。 + */ +export async function sendMessage({ baseUrl, token, body, timeoutMs }) { + await apiPost({ + baseUrl, + endpoint: "ilink/bot/sendmessage", + body: JSON.stringify({ + ...body, + base_info: { bot_agent: "CodeWhale/1.0" }, + }), + token, + timeoutMs, + }); +} + +/** + * 发送/取消输入状态。 + */ +export async function sendTyping({ baseUrl, token, ilinkUserId, typingTicket, status = 1 }) { + await apiPost({ + baseUrl, + endpoint: "ilink/bot/sendtyping", + body: JSON.stringify({ + ilink_user_id: ilinkUserId, + typing_ticket: typingTicket, + status, + base_info: { bot_agent: "CodeWhale/1.0" }, + }), + token, + }); +} + +/** + * 获取账号配置(含 typing_ticket)。 + */ +export async function getConfig({ baseUrl, token, ilinkUserId, contextToken }) { + const raw = await apiPost({ + baseUrl, + endpoint: "ilink/bot/getconfig", + body: JSON.stringify({ + ilink_user_id: ilinkUserId, + context_token: contextToken, + base_info: { bot_agent: "CodeWhale/1.0" }, + }), + token, + }); + return JSON.parse(raw); +} + +/** + * 通知上线。 + */ +export async function notifyStart({ baseUrl, token }) { + const raw = await apiPost({ + baseUrl, + endpoint: "ilink/bot/msg/notifystart", + body: JSON.stringify({ base_info: { bot_agent: "CodeWhale/1.0" } }), + token, + }); + return JSON.parse(raw); +} + +/** + * 通知下线。 + */ +export async function notifyStop({ baseUrl, token }) { + const raw = await apiPost({ + baseUrl, + endpoint: "ilink/bot/msg/notifystop", + body: JSON.stringify({ base_info: { bot_agent: "CodeWhale/1.0" } }), + token, + }); + return JSON.parse(raw); +}