From cacf8a2bac38d701bb7af02425e002aacdfc2539 Mon Sep 17 00:00:00 2001 From: ekko Date: Tue, 28 Apr 2026 16:44:34 +0800 Subject: [PATCH] feat(chat): replace HTTP+SSE with Socket.IO for chat runs and add context compression MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace HTTP POST + SSE streaming with Socket.IO /chat-run namespace for decoupled message handling that survives client disconnect/refresh - Add SQLite-backed context compression with snapshot-based incremental updates - Unify server-side session state tracking (completedSessions + compressingSessions → sessionStates) for reliable state replay on reconnect - Filter compress_ sessions from session list queries - Add compression snapshot store with proper snake_case→camelCase column aliases - Delete temporary compress_ sessions after compression completes - Change compressed summary role from 'system' to 'user' - Add compression.started/completed events to frontend chat store Co-Authored-By: Claude Opus 4.6 --- package.json | 5 +- packages/client/src/api/hermes/chat.ts | 148 ++-- .../components/hermes/chat/MessageList.vue | 54 +- packages/client/src/stores/hermes/chat.ts | 638 ++++++++++-------- .../src/db/hermes/compression-snapshot.ts | 55 ++ packages/server/src/db/hermes/sessions-db.ts | 8 +- packages/server/src/db/index.ts | 7 +- packages/server/src/index.ts | 11 + .../src/lib/context-compressor/index.ts | 592 ++++++++++++++++ packages/server/src/routes/hermes/chat-run.ts | 11 + .../server/src/routes/hermes/proxy-handler.ts | 2 +- .../src/services/hermes/chat-run-socket.ts | 467 +++++++++++++ 12 files changed, 1646 insertions(+), 352 deletions(-) create mode 100644 packages/server/src/db/hermes/compression-snapshot.ts create mode 100644 packages/server/src/lib/context-compressor/index.ts create mode 100644 packages/server/src/routes/hermes/chat-run.ts create mode 100644 packages/server/src/services/hermes/chat-run-socket.ts diff --git a/package.json 
b/package.json index ed3b2134..27c7b2bf 100644 --- a/package.json +++ b/package.json @@ -62,15 +62,16 @@ ], "dependencies": { "eventsource": "^4.1.0", + "js-tiktoken": "^1.0.21", "node-pty": "^1.1.0", "socket.io": "^4.8.3", "socket.io-client": "^4.8.3" }, "devDependencies": { - "@multiavatar/multiavatar": "^1.0.7", "@koa/bodyparser": "^5.0.0", "@koa/cors": "^5.0.0", "@koa/router": "^15.4.0", + "@multiavatar/multiavatar": "^1.0.7", "@pinia/testing": "^1.0.3", "@types/eventsource": "^1.1.15", "@types/js-yaml": "^4.0.9", @@ -117,4 +118,4 @@ "vue-tsc": "^3.2.6", "ws": "^8.20.0" } -} \ No newline at end of file +} diff --git a/packages/client/src/api/hermes/chat.ts b/packages/client/src/api/hermes/chat.ts index 48e706fe..d4fbfc65 100644 --- a/packages/client/src/api/hermes/chat.ts +++ b/packages/client/src/api/hermes/chat.ts @@ -1,3 +1,4 @@ +import { io, type Socket } from 'socket.io-client' import { request, getBaseUrlValue, getApiKey } from '../client' export interface ChatMessage { @@ -8,7 +9,6 @@ export interface ChatMessage { export interface StartRunRequest { input: string | ChatMessage[] instructions?: string - conversation_history?: ChatMessage[] session_id?: string model?: string } @@ -38,70 +38,134 @@ export interface RunEvent { output_tokens: number total_tokens: number } + /** session_id tag added by server for client-side filtering */ + session_id?: string } -export async function startRun(body: StartRunRequest): Promise { - const headers: Record = {} - if (body.session_id) { - headers['X-Hermes-Session-Id'] = body.session_id +// ============================ +// Socket.IO chat run connection +// ============================ + +let chatRunSocket: Socket | null = null + +export function getChatRunSocket(): Socket | null { + return chatRunSocket +} + +export function connectChatRun(): Socket { + if (chatRunSocket?.connected) return chatRunSocket + + // Clean up old socket to prevent duplicate event listeners + if (chatRunSocket) { + 
chatRunSocket.removeAllListeners() + chatRunSocket.disconnect() } - return request('/api/hermes/v1/runs', { - method: 'POST', - body: JSON.stringify(body), - headers, + + const baseUrl = getBaseUrlValue() + const token = getApiKey() + const profile = localStorage.getItem('hermes_active_profile_name') || 'default' + + chatRunSocket = io(`${baseUrl}/chat-run`, { + auth: { token }, + query: { profile }, + transports: ['websocket', 'polling'], + reconnection: true, + reconnectionAttempts: Infinity, + reconnectionDelay: 1000, + reconnectionDelayMax: 10000, }) + + return chatRunSocket } -export function streamRunEvents( - runId: string, +export function disconnectChatRun(): void { + if (chatRunSocket) { + chatRunSocket.disconnect() + chatRunSocket = null + } +} + +/** + * Start a chat run via Socket.IO and stream events back. + * Returns an AbortController-compatible handle for cancellation. + */ +export function startRunViaSocket( + body: StartRunRequest, onEvent: (event: RunEvent) => void, onDone: () => void, onError: (err: Error) => void, -) { - const baseUrl = getBaseUrlValue() - const token = getApiKey() - const profile = localStorage.getItem('hermes_active_profile_name') - const params = new URLSearchParams() - if (token) params.set('token', token) - if (profile && profile !== 'default') params.set('profile', profile) - const qs = params.toString() - const url = `${baseUrl}/api/hermes/v1/runs/${runId}/events${qs ? 
`?${qs}` : ''}` - + onStarted?: (runId: string) => void, +): { abort: () => void } { + const socket = connectChatRun() let closed = false - const source = new EventSource(url) - source.onmessage = (e) => { + function cleanup() { if (closed) return - try { - const parsed = JSON.parse(e.data) - onEvent(parsed) + closed = true + socket.off('run.started', onRunStarted) + socket.off('run.failed', onRunFailed) + socket.off('message.delta', onMessageDelta) + socket.off('reasoning.delta', onReasoningDelta) + socket.off('thinking.delta', onThinkingDelta) + socket.off('reasoning.available', onReasoningAvailable) + socket.off('tool.started', onToolStarted) + socket.off('tool.completed', onToolCompleted) + socket.off('run.completed', onRunCompleted) + socket.off('compression.started', onCompressionStarted) + socket.off('compression.completed', onCompressionCompleted) + } - if (parsed.event === 'run.completed' || parsed.event === 'run.failed') { - closed = true - source.close() - onDone() - } - } catch { - onEvent({ event: 'message', delta: e.data }) + // All event handlers share the same cleanup logic + const handleEvent = (event: RunEvent) => { + if (closed) return + onEvent(event) + if (event.event === 'run.completed' || event.event === 'run.failed') { + cleanup() + onDone() } } - source.onerror = () => { - if (closed) return - closed = true - source.close() - onError(new Error('SSE connection error')) + function onRunStarted(data: RunEvent) { + handleEvent(data) + onStarted?.(data.run_id || '') } + function onRunFailed(data: RunEvent) { + handleEvent(data) + onError?.(new Error(data.error || 'Run failed')) + } + function onMessageDelta(data: RunEvent) { handleEvent(data) } + function onReasoningDelta(data: RunEvent) { handleEvent(data) } + function onThinkingDelta(data: RunEvent) { handleEvent(data) } + function onReasoningAvailable(data: RunEvent) { handleEvent(data) } + function onToolStarted(data: RunEvent) { handleEvent(data) } + function onToolCompleted(data: 
RunEvent) { handleEvent(data) } + function onRunCompleted(data: RunEvent) { handleEvent(data) } + function onCompressionStarted(data: RunEvent) { handleEvent(data) } + function onCompressionCompleted(data: RunEvent) { handleEvent(data) } + + socket.on('run.started', onRunStarted) + socket.on('run.failed', onRunFailed) + socket.on('message.delta', onMessageDelta) + socket.on('reasoning.delta', onReasoningDelta) + socket.on('thinking.delta', onThinkingDelta) + socket.on('reasoning.available', onReasoningAvailable) + socket.on('tool.started', onToolStarted) + socket.on('tool.completed', onToolCompleted) + socket.on('run.completed', onRunCompleted) + socket.on('compression.started', onCompressionStarted) + socket.on('compression.completed', onCompressionCompleted) + + // Emit 'run' (no ack callback); the run_id arrives later via the 'run.started' event + socket.emit('run', body) - // Return AbortController-compatible object return { abort: () => { if (!closed) { - closed = true - source.close() + socket.emit('abort', { session_id: body.session_id }) + cleanup() } }, - } as unknown as AbortController + } } export async function fetchModels(): Promise<{ data: Array<{ id: string }> }> { diff --git a/packages/client/src/components/hermes/chat/MessageList.vue b/packages/client/src/components/hermes/chat/MessageList.vue index e3f02fef..a7e4541b 100644 --- a/packages/client/src/components/hermes/chat/MessageList.vue +++ b/packages/client/src/components/hermes/chat/MessageList.vue @@ -12,6 +12,12 @@ const { t } = useI18n(); const { isDark } = useTheme(); const listRef = ref(); +function formatTokens(n: number): string { + if (n >= 1_000_000) return (n / 1_000_000).toFixed(1) + 'M' + if (n >= 1_000) return (n / 1_000).toFixed(1) + 'K' + return String(n) +} + const displayMessages = computed(() => chatStore.messages.filter((m) => m.role !== "tool"), ); @@ -128,7 +134,48 @@ watch(currentToolCalls, () => { playsinline class="thinking-video" /> -
+
+ +
+ + + + + + + + {{ + chatStore.compressionState.compressing + ? `Compressing... (${chatStore.compressionState.messageCount} msgs, ~${formatTokens(chatStore.compressionState.beforeTokens)} tokens)` + : chatStore.compressionState.compressed + ? `Compressed ${chatStore.compressionState.messageCount} msgs: ~${formatTokens(chatStore.compressionState.beforeTokens)} → ~${formatTokens(chatStore.compressionState.afterTokens)} tokens` + : `Compression skipped` + }} + + +
+
{ background: rgba(255, 255, 255, 0.06); } + &.compression-item { + color: $text-muted; + font-size: 10px; + } + .tool-call-icon { flex-shrink: 0; color: $text-muted; diff --git a/packages/client/src/stores/hermes/chat.ts b/packages/client/src/stores/hermes/chat.ts index 6fc403ef..3fb81b1f 100644 --- a/packages/client/src/stores/hermes/chat.ts +++ b/packages/client/src/stores/hermes/chat.ts @@ -1,4 +1,4 @@ -import { startRun, streamRunEvents, type ChatMessage, type RunEvent } from '@/api/hermes/chat' +import { startRunViaSocket, connectChatRun, type RunEvent } from '@/api/hermes/chat' import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, fetchSessionUsageSingle, type HermesMessage, type SessionSummary } from '@/api/hermes/sessions' import { getApiKey } from '@/api/client' import { defineStore } from 'pinia' @@ -170,18 +170,10 @@ function mapHermesSession(s: SessionSummary): Session { } } -// Cache keys for stale-while-revalidate loading of sessions / messages. -// All keys include the active profile name to isolate cache between profiles. -// Rendering from cache on boot avoids the multi-round-trip wait the user sees -// every time they open the page (esp. noticeable on mobile). 
const STORAGE_KEY_PREFIX = 'hermes_active_session_' -const SESSIONS_CACHE_KEY_PREFIX = 'hermes_sessions_cache_v1_' const LEGACY_STORAGE_KEY = 'hermes_active_session' -const LEGACY_SESSIONS_CACHE_KEY = 'hermes_sessions_cache_v1' -const IN_FLIGHT_TTL_MS = 15 * 60 * 1000 // Give up after 15 minutes -const POLL_INTERVAL_MS = 2000 -const POLL_STABLE_EXITS = 3 // 3 × 2s = 6s of no change → assume run finished const LIVE_BADGE_WINDOW_MS = 5 * 60 * 1000 +const IN_FLIGHT_TTL_MS = 15 * 60 * 1000 // Give up after 15 minutes // 获取当前 profile 名称,用于隔离缓存。 // 从 profiles store 的 activeProfileName(同步 localStorage)读取, @@ -196,11 +188,9 @@ function getProfileName(): string { function storageKey(): string { return STORAGE_KEY_PREFIX + getProfileName() } function sessionsCacheKey(): string { return SESSIONS_CACHE_KEY_PREFIX + getProfileName() } -function msgsCacheKey(sid: string): string { return `hermes_session_msgs_v1_${getProfileName()}_${sid}_` } -function inFlightKey(sid: string): string { return `hermes_in_flight_v1_${getProfileName()}_${sid}` } function legacyStorageKey(): string | null { return getProfileName() === 'default' ? LEGACY_STORAGE_KEY : null } function legacySessionsCacheKey(): string | null { return getProfileName() === 'default' ? LEGACY_SESSIONS_CACHE_KEY : null } -function legacyMsgsCacheKey(sid: string): string | null { return getProfileName() === 'default' ? `hermes_session_msgs_v1_${sid}` : null } +function inFlightKey(sid: string): string { return `hermes_in_flight_v1_${getProfileName()}_${sid}` } function legacyInFlightKey(sid: string): string | null { return getProfileName() === 'default' ? `hermes_in_flight_v1_${sid}` : null } interface InFlightRun { @@ -303,61 +293,37 @@ function removeItemWithLegacy(key: string, legacyKey?: string | null) { // Strip the circular `file: File` reference from attachments before caching — // File objects don't serialize and we only need name/type/size/url for display. 
-function sanitizeForCache(msgs: Message[]): Message[] { - return msgs.map(m => { - if (!m.attachments?.length) return m - return { - ...m, - attachments: m.attachments.map(a => ({ id: a.id, name: a.name, type: a.type, size: a.size, url: a.url })), - } - }) -} - -// Heals assistant messages whose `reasoning` field was polluted by the -// old bug where `reasoning.available` clobbered it with the assistant -// content. Detection heuristic: reasoning is a prefix of content (the -// bug always derived `reasoning` from `content[:500]` with tags stripped). -// Legitimate reasoning is almost never a prefix of the final answer. -function scrubBuggyReasoningInCache(msgs: Message[] | null | undefined): Message[] { - if (!msgs) return [] - return msgs.map(m => { - if (m.role !== 'assistant' || !m.reasoning || !m.content) return m - const r = m.reasoning.trim() - const c = m.content.trim() - if (!r || !c) return m - if (c === r || c.startsWith(r)) { - const { reasoning: _drop, ...rest } = m - return rest as Message - } - return m - }) -} export const useChatStore = defineStore('chat', () => { const sessions = ref([]) const activeSessionId = ref(null) const focusMessageId = ref(null) - const streamStates = ref>(new Map()) + const streamStates = ref void }>>(new Map()) const isStreaming = computed(() => activeSessionId.value != null && streamStates.value.has(activeSessionId.value)) const isLoadingSessions = ref(false) const sessionsLoaded = ref(false) const isLoadingMessages = ref(false) - // tmux-like resume state: true when we recovered an in-flight run from - // localStorage after a refresh and are polling fetchSession for progress. - // UI shows the thinking indicator while this is set. 
- const resumingRuns = ref>(new Set()) - const isRunActive = computed(() => - isStreaming.value - || (activeSessionId.value != null && resumingRuns.value.has(activeSessionId.value)) - ) - const pollTimers = new Map>() - const pollSignatures = new Map() + const isRunActive = computed(() => isStreaming.value) + + // Compression state + const compressionState = ref<{ + compressing: boolean + messageCount: number + beforeTokens: number + afterTokens: number + compressed: boolean | null + error?: string + } | null>(null) + + function setCompressionState(state: typeof compressionState.value) { + compressionState.value = state + } const activeSession = ref(null) const messages = computed(() => activeSession.value?.messages || []) function isSessionLive(sessionId: string): boolean { - if (streamStates.value.has(sessionId) || resumingRuns.value.has(sessionId)) return true + if (streamStates.value.has(sessionId)) return true const session = sessions.value.find(candidate => candidate.id === sessionId) if (!session?.lastActiveAt || session.endedAt != null) return false @@ -365,7 +331,6 @@ export const useChatStore = defineStore('chat', () => { } function persistSessionsList() { - // Cache lightweight summaries only (messages are cached per-session). 
saveJsonWithLegacy( sessionsCacheKey(), sessions.value.map(s => ({ ...s, messages: [] })), @@ -373,13 +338,6 @@ export const useChatStore = defineStore('chat', () => { ) } - function persistActiveMessages() { - const sid = activeSessionId.value - if (!sid) return - const s = sessions.value.find(sess => sess.id === sid) - if (s) saveJsonWithLegacy(msgsCacheKey(sid), sanitizeForCache(s.messages), legacyMsgsCacheKey(sid)) - } - function markInFlight(sid: string, runId: string) { saveJsonWithLegacy(inFlightKey(sid), { runId, startedAt: Date.now() } as InFlightRun, legacyInFlightKey(sid)) } @@ -398,122 +356,10 @@ export const useChatStore = defineStore('chat', () => { return rec } - function compareServerMessages(local: Message[], server: Message[]) { - const localUserIndexes = local.map((m, i) => (m.role === 'user' ? i : -1)).filter(i => i >= 0) - const serverUserIndexes = server.map((m, i) => (m.role === 'user' ? i : -1)).filter(i => i >= 0) - const localUsers = localUserIndexes.length - const serverUsers = serverUserIndexes.length - - if (serverUsers > localUsers) return { serverIsCaughtUp: true, serverIsAhead: true } - if (serverUsers < localUsers) return { serverIsCaughtUp: false, serverIsAhead: false } - - const localLastUserIndex = localUserIndexes[localUserIndexes.length - 1] ?? -1 - const serverLastUserIndex = serverUserIndexes[serverUserIndexes.length - 1] ?? 
-1 - const sameCurrentTurn = - localLastUserIndex < 0 - || serverLastUserIndex < 0 - || local[localLastUserIndex]?.content === server[serverLastUserIndex]?.content - - if (!sameCurrentTurn) return { serverIsCaughtUp: false, serverIsAhead: false } - - const localCurrentAssistantLen = local - .slice(localLastUserIndex + 1) - .filter(m => m.role === 'assistant') - .reduce((total, m) => total + (m.content?.length || 0), 0) - const serverCurrentAssistantLen = server - .slice(serverLastUserIndex + 1) - .filter(m => m.role === 'assistant') - .reduce((total, m) => total + (m.content?.length || 0), 0) - - return { - serverIsCaughtUp: true, - serverIsAhead: serverCurrentAssistantLen >= localCurrentAssistantLen, - } - } - - function stopPolling(sid: string) { - const t = pollTimers.get(sid) - if (t) { - clearInterval(t) - pollTimers.delete(sid) - } - pollSignatures.delete(sid) - resumingRuns.value = new Set([...resumingRuns.value].filter(x => x !== sid)) - } - - // Poll fetchSession while an in-flight run is recovering. Exits when the - // server's message signature is stable for POLL_STABLE_EXITS ticks (run - // presumed done), TTL elapses, or the user explicitly starts streaming. - function startPolling(sid: string) { - if (pollTimers.has(sid)) return - resumingRuns.value = new Set([...resumingRuns.value, sid]) - const timer = setInterval(async () => { - // If a fresh SSE stream started for this session, polling is redundant. - if (streamStates.value.has(sid)) { - stopPolling(sid) - return - } - const inFlight = readInFlight(sid) - if (!inFlight) { - stopPolling(sid) - return - } - try { - const detail = await fetchSession(sid) - if (!detail) return - const mapped = mapHermesMessages(detail.messages || []) - const target = sessions.value.find(s => s.id === sid) - if (!target) return - // Use the same current-turn comparison as switchSession: server is - // ahead only when it has a newer user turn or the assistant output - // after the current user turn has caught up. 
- const local = target.messages - const { serverIsAhead, serverIsCaughtUp } = compareServerMessages(local, mapped) - if (serverIsAhead) { - target.messages = mapped - if (detail.title && !target.title) target.title = detail.title - if (sid === activeSessionId.value) persistActiveMessages() - } - // Stability detection ONLY matters when the server has at least as - // many user turns as we do. Otherwise the server is still catching - // up (e.g. the new turn we just sent hasn't been flushed server-side - // yet) and a "stable" signature is a false positive — the stability - // is the server NOT having our latest turn, not the run being done. - if (!serverIsCaughtUp) { - pollSignatures.delete(sid) - } else { - const last = mapped[mapped.length - 1] - const sig = `${mapped.length}|${last?.content?.slice(-40) || ''}|${last?.toolStatus || ''}` - const prev = pollSignatures.get(sid) - if (prev && prev.sig === sig) { - prev.stableTicks += 1 - if (prev.stableTicks >= POLL_STABLE_EXITS) { - // The server view has stopped changing. If it is still behind - // the locally streamed assistant reply, end recovery without - // retreating local state; otherwise commit the server view. 
- if (serverIsAhead) { - target.messages = mapped - if (detail.title) target.title = detail.title - if (sid === activeSessionId.value) persistActiveMessages() - } - clearInFlight(sid) - stopPolling(sid) - } - } else { - pollSignatures.set(sid, { sig, stableTicks: 0 }) - } - } - } catch { - // transient network error — ignore, next tick tries again - } - }, POLL_INTERVAL_MS) - pollTimers.set(sid, timer) - } - async function loadSessions() { isLoadingSessions.value = true try { - // 从 profile 对应的缓存中恢复,实现 instant render + // Restore sessions list from cache (lightweight, no messages) const cachedSessions = loadJsonWithFallback(sessionsCacheKey(), legacySessionsCacheKey()) if (cachedSessions?.length) { sessions.value = cachedSessions @@ -521,8 +367,6 @@ export const useChatStore = defineStore('chat', () => { if (savedId) { const cachedActive = cachedSessions.find(s => s.id === savedId) || null if (cachedActive) { - const cachedMsgs = loadJsonWithFallback(msgsCacheKey(savedId), legacyMsgsCacheKey(savedId)) - if (cachedMsgs) cachedActive.messages = scrubBuggyReasoningInCache(cachedMsgs) activeSession.value = cachedActive activeSessionId.value = savedId } @@ -548,8 +392,6 @@ export const useChatStore = defineStore('chat', () => { const localOnly = sessions.value.filter(s => { if (freshIds.has(s.id)) return false if (readInFlight(s.id)) return true - // Session no longer exists on server and no active run — clean up cache - removeItemWithLegacy(msgsCacheKey(s.id), legacyMsgsCacheKey(s.id)) removeItemWithLegacy(inFlightKey(s.id), legacyInFlightKey(s.id)) return false }) @@ -572,10 +414,7 @@ export const useChatStore = defineStore('chat', () => { } } - // Re-pull active session from server without retreating newer locally - // streamed output. Used on SSE drop and on tab-visible events — mobile - // browsers kill EventSource while backgrounded, but the backend run usually - // completes anyway. + // Re-pull active session from server. Used on tab-visible events. 
async function refreshActiveSession(): Promise { const sid = activeSessionId.value if (!sid) return false @@ -585,11 +424,7 @@ export const useChatStore = defineStore('chat', () => { const target = sessions.value.find(s => s.id === sid) if (!target) return false const mapped = mapHermesMessages(detail.messages || []) - const { serverIsAhead } = compareServerMessages(target.messages, mapped) - if (serverIsAhead) { - target.messages = mapped - persistActiveMessages() - } + target.messages = mapped if (detail.title) target.title = detail.title return true } catch (err) { @@ -626,35 +461,12 @@ export const useChatStore = defineStore('chat', () => { if (!activeSession.value) return - // Hydrate messages from localStorage cache first (instant render), then - // revalidate from server in the background. If no cache exists, show the - // loading state while we fetch. - const hasLocalMessages = activeSession.value.messages.length > 0 - if (!hasLocalMessages) { - const cachedMsgs = loadJsonWithFallback(msgsCacheKey(sessionId), legacyMsgsCacheKey(sessionId)) - if (cachedMsgs?.length) { - activeSession.value.messages = scrubBuggyReasoningInCache(cachedMsgs) - } - } - - const needsBlockingLoad = activeSession.value.messages.length === 0 - if (needsBlockingLoad) isLoadingMessages.value = true + isLoadingMessages.value = true try { const detail = await fetchSession(sessionId) if (detail && detail.messages) { - const mapped = mapHermesMessages(detail.messages) - // Pick whichever view has more information for the current turn. - // Simple message-count comparison is wrong because mapHermesMessages - // folds tool_call-only assistant messages; global last-assistant - // comparison is also wrong across turns. Trust server only when it has - // a newer user turn or its assistant output after the current user turn - // has caught up. 
- const local = activeSession.value.messages - const { serverIsAhead } = compareServerMessages(local, mapped) - if (serverIsAhead) { - activeSession.value.messages = mapped - } + activeSession.value.messages = mapHermesMessages(detail.messages) // Update title: use Hermes title, or fallback to first user message if (detail.title) { activeSession.value.title = detail.title @@ -665,7 +477,6 @@ export const useChatStore = defineStore('chat', () => { activeSession.value.title = t + (firstUser.content.length > 40 ? '...' : '') } } - persistActiveMessages() } } catch (err) { console.error('Failed to load session messages:', err) @@ -673,12 +484,9 @@ export const useChatStore = defineStore('chat', () => { isLoadingMessages.value = false } - // tmux-like resume: if this session has a recent in-flight run and we're - // not currently streaming, start polling fetchSession to pick up progress - // that happened while we were gone. Exits automatically on stability. - if (readInFlight(sessionId) && !streamStates.value.has(sessionId)) { - startPolling(sessionId) - } + // Always resume via Socket.IO for the active session. + // Server tracks run/compression state per session and replays events. + resumeInFlightRun(sessionId) // Fetch token usage for this session from web-ui DB try { @@ -713,7 +521,6 @@ export const useChatStore = defineStore('chat', () => { async function deleteSession(sessionId: string) { await deleteSessionApi(sessionId) sessions.value = sessions.value.filter(s => s.id !== sessionId) - removeItemWithLegacy(msgsCacheKey(sessionId), legacyMsgsCacheKey(sessionId)) persistSessionsList() if (activeSessionId.value === sessionId) { if (sessions.value.length > 0) { @@ -777,21 +584,10 @@ export const useChatStore = defineStore('chat', () => { timestamp: Date.now(), attachments: attachments && attachments.length > 0 ? 
attachments : undefined, } - // Build conversation history BEFORE adding the new message, so the - // user's current message appears only in `input` — not duplicated in - // `conversation_history` as well. - const sessionMsgs = getSessionMsgs(sid) - const history: ChatMessage[] = sessionMsgs - .filter(m => (m.role === 'user' || m.role === 'assistant') && m.content.trim()) - .map(m => ({ role: m.role as 'user' | 'assistant' | 'system', content: m.content })) addMessage(sid, userMsg) updateSessionTitle(sid) - // Persist immediately so a refresh before the first SSE event (e.g. the - // user closes the tab right after sending) still has the user's message - // and session title in the cache. if (sid === activeSessionId.value) { - persistActiveMessages() persistSessionsList() } @@ -821,71 +617,65 @@ export const useChatStore = defineStore('chat', () => { const appStore = useAppStore() const sessionModel = activeSession.value?.model || appStore.selectedModel - const run = await startRun({ + const runPayload = { input: inputText, - conversation_history: history, session_id: sid, model: sessionModel || undefined, - }) - - const runId = (run as any).run_id || (run as any).id - if (!runId) { - addMessage(sid, { - id: uid(), - role: 'system', - content: `Error: startRun returned no run ID. Response: ${JSON.stringify(run)}`, - timestamp: Date.now(), - }) - return } - // tmux-like resume: persist run_id so refresh/reopen can pick up the - // working indicator and poll for progress. - markInFlight(sid, runId) - // If we were already polling (e.g. user re-sent while resume was still - // polling an earlier run), cancel that polling — the new SSE stream is - // the authoritative live source. 
- stopPolling(sid) - // Helper to clean up this session's stream state const cleanup = () => { streamStates.value.delete(sid) - if (persistTimer) { - clearTimeout(persistTimer) - persistTimer = null - } } - // Throttle in-flight cache writes so a refresh mid-stream still shows - // the partial reply. 800ms keeps quota pressure low while guaranteeing - // at most ~1s of unsaved delta on reload. - let persistTimer: ReturnType | null = null // Per-run flags used to detect silently-swallowed errors at run.completed. // hermes-agent occasionally emits run.completed with empty output and no // usage when the agent layer caught an upstream error (e.g. invalid API // key). We need to distinguish: (a) run with assistant text produced, // (b) run with only tool activity, (c) run with truly nothing visible. - // Reset per send() call — closures captured by SSE callbacks are scoped + // Reset per send() call — closures captured by Socket.IO callbacks are scoped // to this run, so there is no cross-run contamination. 
let runProducedAssistantText = false let runHadToolActivity = false - const schedulePersist = () => { - if (sid !== activeSessionId.value || persistTimer) return - persistTimer = setTimeout(() => { - persistTimer = null - persistActiveMessages() - }, 800) - } - // Listen to SSE events — all closures capture `sid` - const ctrl = streamRunEvents( - runId, + // Send run via Socket.IO and listen to streamed events — all closures capture `sid` + const ctrl = startRunViaSocket( + runPayload, // onEvent (evt: RunEvent) => { switch (evt.event) { case 'run.started': break + case 'compression.started': { + setCompressionState({ + compressing: true, + messageCount: (evt as any).message_count || 0, + beforeTokens: (evt as any).token_count || 0, + afterTokens: 0, + compressed: null, + }) + break + } + + case 'compression.completed': { + setCompressionState({ + compressing: false, + messageCount: (evt as any).totalMessages || 0, + beforeTokens: (evt as any).beforeTokens || 0, + afterTokens: (evt as any).afterTokens || 0, + compressed: (evt as any).compressed ?? 
false, + error: (evt as any).error, + }) + // Auto-clear after 5s + setTimeout(() => { + if (compressionState.value && !compressionState.value.compressing) { + setCompressionState(null) + } + }, 5000) + break + } + case 'reasoning.delta': case 'thinking.delta': { const text = evt.text || evt.delta || '' @@ -908,7 +698,7 @@ export const useChatStore = defineStore('chat', () => { }) noteReasoningStart(newId) } - schedulePersist() + break } @@ -925,7 +715,7 @@ export const useChatStore = defineStore('chat', () => { // 否则(上游未转发 delta,只发这一次 available)不显示时长。 noteReasoningEnd(last.id) } - schedulePersist() + break } @@ -952,7 +742,7 @@ export const useChatStore = defineStore('chat', () => { isStreaming: true, }) } - schedulePersist() + break } @@ -972,7 +762,7 @@ export const useChatStore = defineStore('chat', () => { toolPreview: evt.preview, toolStatus: 'running', }) - schedulePersist() + break } @@ -986,7 +776,7 @@ export const useChatStore = defineStore('chat', () => { const last = toolMsgs[toolMsgs.length - 1] updateMessage(sid, last.id, { toolStatus: 'done' }) } - schedulePersist() + break } @@ -1048,9 +838,8 @@ export const useChatStore = defineStore('chat', () => { // the next page load to still see in-flight === true (so // polling kicks in and recovers) rather than the other way // around (cleared in-flight + stale streaming cache = UI stuck). - if (sid === activeSessionId.value) persistActiveMessages() + clearInFlight(sid) - stopPolling(sid) break } @@ -1077,9 +866,8 @@ export const useChatStore = defineStore('chat', () => { } }) cleanup() - if (sid === activeSessionId.value) persistActiveMessages() + clearInFlight(sid) - stopPolling(sid) break } } @@ -1095,21 +883,13 @@ export const useChatStore = defineStore('chat', () => { updateSessionTitle(sid) }, // onError - // Mobile browsers drop EventSource when the tab backgrounds / screen - // locks / network flips. 
The backend run usually completes anyway, so - // rather than injecting a stale "SSE connection error" bubble we mark - // streaming as done and silently re-sync from the server, which has - // the real final answer. If the server fetch itself fails, we leave - // whatever text we already streamed in place — no visible error. (err) => { - console.warn('SSE connection dropped, resyncing from server:', err.message) + console.warn('Socket.IO run stream error:', err.message) const msgs = getSessionMsgs(sid) const last = msgs[msgs.length - 1] if (last?.isStreaming) { updateMessage(sid, last.id, { isStreaming: false }) } - // Any tool messages still marked 'running' will be replaced by the - // server's view after refresh; clear their spinner state now. msgs.forEach((m, i) => { if (m.role === 'tool' && m.toolStatus === 'running') { msgs[i] = { ...m, toolStatus: 'done' } @@ -1119,12 +899,10 @@ export const useChatStore = defineStore('chat', () => { if (sid === activeSessionId.value) { void refreshActiveSession() } - // The run might still be going on the server side (SSE drop doesn't - // abort it). If we still have an in-flight record, fall back to - // polling fetchSession to keep the user updated. - if (readInFlight(sid)) { - startPolling(sid) - } + }, + // onStarted — called when server acks with run_id + (runId: string) => { + markInFlight(sid, runId) }, ) @@ -1139,6 +917,264 @@ export const useChatStore = defineStore('chat', () => { } } + /** + * Resume an in-flight run after page refresh. + * Emits 'resume' to join the session room on the server, + * then sets up event listeners to receive ongoing events. 
+ */ + function resumeInFlightRun(sid: string) { + const socket = connectChatRun() + let closed = false + let runProducedAssistantText = false + let runHadToolActivity = false + + const cleanup = () => { + if (closed) return + closed = true + socket.off('run.started', onRunStarted) + socket.off('run.failed', onRunFailed) + socket.off('message.delta', onMessageDelta) + socket.off('reasoning.delta', onReasoningDelta) + socket.off('thinking.delta', onThinkingDelta) + socket.off('reasoning.available', onReasoningAvailable) + socket.off('tool.started', onToolStarted) + socket.off('tool.completed', onToolCompleted) + socket.off('run.completed', onRunCompleted) + socket.off('compression.started', onCompressionStarted) + socket.off('compression.completed', onCompressionCompleted) + streamStates.value.delete(sid) + } + + // Shared event handler — filters by session_id tag + function handleEvent(evt: RunEvent) { + if (closed) return + // Filter events for this session (server tags all events with session_id) + if (evt.session_id && evt.session_id !== sid) return + switch (evt.event) { + case 'run.started': + break + + case 'compression.started': { + setCompressionState({ + compressing: true, + messageCount: (evt as any).message_count || 0, + beforeTokens: (evt as any).token_count || 0, + afterTokens: 0, + compressed: null, + }) + break + } + + case 'compression.completed': { + setCompressionState({ + compressing: false, + messageCount: (evt as any).totalMessages || 0, + beforeTokens: (evt as any).beforeTokens || 0, + afterTokens: (evt as any).afterTokens || 0, + compressed: (evt as any).compressed ?? 
false, + error: (evt as any).error, + }) + setTimeout(() => { + if (compressionState.value && !compressionState.value.compressing) { + setCompressionState(null) + } + }, 5000) + break + } + + case 'reasoning.delta': + case 'thinking.delta': { + const text = evt.text || evt.delta || '' + if (!text) break + runProducedAssistantText = true + const msgs = getSessionMsgs(sid) + const last = msgs[msgs.length - 1] + if (last?.role === 'assistant' && last.isStreaming) { + last.reasoning = (last.reasoning || '') + text + noteReasoningStart(last.id) + } else { + const newId = uid() + addMessage(sid, { + id: newId, + role: 'assistant', + content: '', + timestamp: Date.now(), + isStreaming: true, + reasoning: text, + }) + noteReasoningStart(newId) + } + + break + } + + case 'reasoning.available': { + const msgs = getSessionMsgs(sid) + const last = msgs[msgs.length - 1] + if (last?.role === 'assistant' && last.isStreaming) { + noteReasoningEnd(last.id) + } + + break + } + + case 'message.delta': { + if (evt.delta) runProducedAssistantText = true + const msgs = getSessionMsgs(sid) + const last = msgs[msgs.length - 1] + if (last?.role === 'assistant' && last.isStreaming) { + const prev = last.content + const next = prev + (evt.delta || '') + noteThinkingDelta(last.id, prev, next) + if (last.reasoning) noteReasoningEnd(last.id) + last.content = next + } else { + const newId = uid() + const nextContent = evt.delta || '' + noteThinkingDelta(newId, '', nextContent) + addMessage(sid, { + id: newId, + role: 'assistant', + content: nextContent, + timestamp: Date.now(), + isStreaming: true, + }) + } + + break + } + + case 'tool.started': { + runHadToolActivity = true + const msgs = getSessionMsgs(sid) + const last = msgs[msgs.length - 1] + if (last?.isStreaming) { + updateMessage(sid, last.id, { isStreaming: false }) + } + addMessage(sid, { + id: uid(), + role: 'tool', + content: '', + timestamp: Date.now(), + toolName: evt.tool || evt.name, + toolPreview: evt.preview, + toolStatus: 
'running', + }) + + break + } + + case 'tool.completed': { + runHadToolActivity = true + const msgs = getSessionMsgs(sid) + const toolMsgs = msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running') + if (toolMsgs.length > 0) { + updateMessage(sid, toolMsgs[toolMsgs.length - 1].id, { toolStatus: 'done' }) + } + + break + } + + case 'run.completed': { + const msgs = getSessionMsgs(sid) + const lastMsg = msgs[msgs.length - 1] + if (lastMsg?.isStreaming) { + updateMessage(sid, lastMsg.id, { isStreaming: false }) + } + if (evt.usage) { + const target = sessions.value.find(s => s.id === sid) + if (target) { + target.inputTokens = evt.usage.input_tokens + target.outputTokens = evt.usage.output_tokens + } + } + const finalOutput = typeof evt.output === 'string' ? evt.output : '' + const finalOutputTrimmed = finalOutput.trim() + if (!runProducedAssistantText && finalOutputTrimmed !== '') { + addMessage(sid, { + id: uid(), + role: 'assistant', + content: finalOutput, + timestamp: Date.now(), + }) + } + const swallowedError = !runProducedAssistantText && !runHadToolActivity && finalOutputTrimmed === '' + if (swallowedError) { + addMessage(sid, { + id: uid(), + role: 'system', + content: 'Error: Agent returned no output. The model call may have failed (e.g. invalid API key, model not supported by provider, or context exceeded). Check the hermes-agent logs for details.', + timestamp: Date.now(), + }) + } + cleanup() + updateSessionTitle(sid) + + clearInFlight(sid) + break + } + + case 'run.failed': { + const msgs = getSessionMsgs(sid) + const lastErr = msgs[msgs.length - 1] + if (lastErr?.isStreaming) { + updateMessage(sid, lastErr.id, { + isStreaming: false, + content: evt.error ? `Error: ${evt.error}` : 'Run failed', + role: 'system', + }) + } else { + addMessage(sid, { + id: uid(), + role: 'system', + content: evt.error ? 
`Error: ${evt.error}` : 'Run failed', + timestamp: Date.now(), + }) + } + msgs.forEach((m, i) => { + if (m.role === 'tool' && m.toolStatus === 'running') { + msgs[i] = { ...m, toolStatus: 'error' } + } + }) + cleanup() + + clearInFlight(sid) + break + } + } + } + + function onRunStarted(data: RunEvent) { handleEvent(data) } + function onRunFailed(data: RunEvent) { handleEvent(data) } + function onMessageDelta(data: RunEvent) { handleEvent(data) } + function onReasoningDelta(data: RunEvent) { handleEvent(data) } + function onThinkingDelta(data: RunEvent) { handleEvent(data) } + function onReasoningAvailable(data: RunEvent) { handleEvent(data) } + function onToolStarted(data: RunEvent) { handleEvent(data) } + function onToolCompleted(data: RunEvent) { handleEvent(data) } + function onRunCompleted(data: RunEvent) { handleEvent(data) } + function onCompressionStarted(data: RunEvent) { handleEvent(data) } + function onCompressionCompleted(data: RunEvent) { handleEvent(data) } + + socket.on('run.started', onRunStarted) + socket.on('run.failed', onRunFailed) + socket.on('message.delta', onMessageDelta) + socket.on('reasoning.delta', onReasoningDelta) + socket.on('thinking.delta', onThinkingDelta) + socket.on('reasoning.available', onReasoningAvailable) + socket.on('tool.started', onToolStarted) + socket.on('tool.completed', onToolCompleted) + socket.on('run.completed', onRunCompleted) + socket.on('compression.started', onCompressionStarted) + socket.on('compression.completed', onCompressionCompleted) + + // Emit resume to join the session room + socket.emit('resume', { session_id: sid }) + + // Mark as streaming so UI shows the indicator + streamStates.value.set(sid, { abort: cleanup }) + } + function stopStreaming() { const sid = activeSessionId.value if (!sid) return @@ -1151,9 +1187,8 @@ export const useChatStore = defineStore('chat', () => { updateMessage(sid, lastMsg.id, { isStreaming: false }) } streamStates.value.delete(sid) - clearInFlight(sid) - stopPolling(sid) 
} + clearInFlight(sid) } // Tab visibility: re-sync when returning to foreground @@ -1161,8 +1196,10 @@ export const useChatStore = defineStore('chat', () => { document.addEventListener('visibilitychange', () => { if (document.visibilityState === 'visible' && activeSessionId.value && !isStreaming.value) { void refreshActiveSession() - if (readInFlight(activeSessionId.value)) { - startPolling(activeSessionId.value) + // Re-subscribe in case Socket.IO reconnected + const sid = activeSessionId.value + if (sid && !streamStates.value.has(sid)) { + resumeInFlightRun(sid) } } }) @@ -1237,6 +1274,7 @@ export const useChatStore = defineStore('chat', () => { isStreaming, isRunActive, isSessionLive, + compressionState, isLoadingSessions, sessionsLoaded, isLoadingMessages, diff --git a/packages/server/src/db/hermes/compression-snapshot.ts b/packages/server/src/db/hermes/compression-snapshot.ts new file mode 100644 index 00000000..f57d7163 --- /dev/null +++ b/packages/server/src/db/hermes/compression-snapshot.ts @@ -0,0 +1,55 @@ +/** + * SQLite-backed compression snapshot store for 1:1 chat sessions. + * + * Stores the latest compression summary and the index of the last + * compressed message, so incremental compression can pick up where + * the previous one left off. 
+ */ + +import { isSqliteAvailable, ensureTable, getDb } from '../index' + +const TABLE = 'chat_compression_snapshots' + +const SCHEMA: Record = { + session_id: 'TEXT PRIMARY KEY', + summary: 'TEXT NOT NULL DEFAULT \'\'', + last_message_index: 'INTEGER NOT NULL DEFAULT 0', + message_count_at_time: 'INTEGER NOT NULL DEFAULT 0', + updated_at: 'INTEGER NOT NULL', +} + +export function initCompressionSnapshotStore(): void { + if (isSqliteAvailable()) { + ensureTable(TABLE, SCHEMA) + } +} + +export function getCompressionSnapshot(sessionId: string): { summary: string; lastMessageIndex: number; messageCountAtTime: number } | null { + if (!isSqliteAvailable()) return null + return getDb()!.prepare( + `SELECT summary, last_message_index AS lastMessageIndex, message_count_at_time AS messageCountAtTime FROM ${TABLE} WHERE session_id = ?`, + ).get(sessionId) as any ?? null +} + +export function saveCompressionSnapshot( + sessionId: string, + summary: string, + lastMessageIndex: number, + messageCountAtTime: number, +): void { + if (!isSqliteAvailable()) return + getDb()!.prepare( + `INSERT INTO ${TABLE} (session_id, summary, last_message_index, message_count_at_time, updated_at) + VALUES (?, ?, ?, ?, ?) 
+ ON CONFLICT(session_id) DO UPDATE SET + summary = excluded.summary, + last_message_index = excluded.last_message_index, + message_count_at_time = excluded.message_count_at_time, + updated_at = excluded.updated_at`, + ).run(sessionId, summary, lastMessageIndex, messageCountAtTime, Date.now()) +} + +export function deleteCompressionSnapshot(sessionId: string): void { + if (!isSqliteAvailable()) return + getDb()!.prepare(`DELETE FROM ${TABLE} WHERE session_id = ?`).run(sessionId) +} diff --git a/packages/server/src/db/hermes/sessions-db.ts b/packages/server/src/db/hermes/sessions-db.ts index c512e0cd..2204ed21 100644 --- a/packages/server/src/db/hermes/sessions-db.ts +++ b/packages/server/src/db/hermes/sessions-db.ts @@ -242,7 +242,7 @@ function runLiteralContentSearch( ${SESSION_SELECT}, s.parent_session_id AS parent_session_id FROM sessions s - WHERE s.source != 'tool' + WHERE s.source != 'tool' AND s.id NOT LIKE 'compress_%' ${sourceClause} ) SELECT @@ -411,7 +411,7 @@ function loadAllSessions(db: { prepare: (sql: string) => { all: (...params: any[ ${SESSION_SELECT}, s.parent_session_id AS parent_session_id FROM sessions s - WHERE s.source != 'tool' + WHERE s.source != 'tool' AND s.id NOT LIKE 'compress_%' `).all() as Record[] const sessions = rows.map(mapInternalSessionRow) const byId = new Map(sessions.map(s => [s.id, s])) @@ -623,7 +623,7 @@ export async function listSessionSummaries(source?: string, limit = 2000): Promi const db = new DatabaseSync(sessionDbPath(), { open: true, readOnly: true }) try { - const clauses = ["s.parent_session_id IS NULL", "s.source != 'tool'"] + const clauses = ["s.parent_session_id IS NULL", "s.source != 'tool'", "s.id NOT LIKE 'compress_%'"] const params: any[] = [] if (source) { clauses.push('s.source = ?') @@ -689,7 +689,7 @@ export async function searchSessionSummaries( ${SESSION_SELECT}, s.parent_session_id AS parent_session_id FROM sessions s - WHERE s.source != 'tool' + WHERE s.source != 'tool' AND s.id NOT LIKE 
'compress_%' ${sourceClause} ` diff --git a/packages/server/src/db/index.ts b/packages/server/src/db/index.ts index 7a9b910b..1a9feaf6 100644 --- a/packages/server/src/db/index.ts +++ b/packages/server/src/db/index.ts @@ -3,7 +3,10 @@ import { mkdirSync, readFileSync, writeFileSync, existsSync } from 'fs' import { resolve } from 'path' import { homedir } from 'os' -const DB_DIR = resolve(homedir(), '.hermes-web-ui') +const isDev = process.env.NODE_ENV !== 'production' +const DB_DIR = isDev + ? resolve(process.cwd(), 'packages/server/data') + : resolve(homedir(), '.hermes-web-ui') const DB_PATH = resolve(DB_DIR, 'hermes-web-ui.db') const JSON_PATH = resolve(DB_DIR, 'hermes-web-ui.json') @@ -27,7 +30,7 @@ export function getDb(): DatabaseSync | null { if (!_db) { mkdirSync(DB_DIR, { recursive: true }) _db = new DatabaseSync(DB_PATH) - _db.exec('PRAGMA journal_mode=WAL') + _db.exec('PRAGMA journal_mode=DELETE') _db.exec('PRAGMA foreign_keys=ON') } return _db diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index c40eae97..6a42ceca 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -15,7 +15,9 @@ import { setupTerminalWebSocket } from './routes/hermes/terminal' import { startVersionCheck } from './routes/health' import { registerRoutes } from './routes' import { setGroupChatServer } from './routes/hermes/group-chat' +import { setChatRunServer } from './routes/hermes/chat-run' import { GroupChatServer } from './services/hermes/group-chat' +import { ChatRunSocket } from './services/hermes/chat-run-socket' import { logger } from './services/logger' // Injected by esbuild at build time; fallback to reading package.json in dev mode @@ -52,6 +54,10 @@ export async function bootstrap() { initUsageStore() console.log('[bootstrap] usage store initialized') + const { initCompressionSnapshotStore } = await import('./db/hermes/compression-snapshot') + initCompressionSnapshotStore() + console.log('[bootstrap] compression snapshot 
store initialized') + app.use(cors({ origin: config.corsOrigins })) app.use(bodyParser()) console.log('[bootstrap] cors + bodyParser registered') @@ -92,6 +98,11 @@ export async function bootstrap() { setGroupChatServer(groupChatServer) groupChatServer.setGatewayManager(getGatewayManagerInstance()) + // Chat run Socket.IO — shares the same Server instance, just adds /chat-run namespace + const chatRunServer = new ChatRunSocket(groupChatServer.getIO(), getGatewayManagerInstance()) + setChatRunServer(chatRunServer) + chatRunServer.init() + // Catch-all: destroy upgrade requests not handled by terminal or Socket.IO server.on('upgrade', (req: any, socket: any) => { const url = new URL(req.url || '', `http://${req.headers.host}`) diff --git a/packages/server/src/lib/context-compressor/index.ts b/packages/server/src/lib/context-compressor/index.ts new file mode 100644 index 00000000..4a6c6231 --- /dev/null +++ b/packages/server/src/lib/context-compressor/index.ts @@ -0,0 +1,592 @@ +/** + * Chat Context Compressor + * + * Compresses 1:1 chat conversation history before sending to upstream. + * Uses the Hermes structured summary prompt for LLM-based compression. + * + * Algorithm: + * 1. If total tokens < trigger threshold → return as-is + * 2. Pre-clean: truncate old tool results (no LLM call) + * 3. Load snapshot from SQLite for incremental update + * 4. Keep last 20 messages verbatim (tail protection by message count) + * 5. Summarize everything before the tail + * 6. 
Save snapshot: last_message_index = index where compression ends + */ + +import { EventSource } from 'eventsource' +import { encodingForModel, getEncoding } from 'js-tiktoken' +import { logger } from '../../services/logger' +import { + getCompressionSnapshot, + saveCompressionSnapshot, + deleteCompressionSnapshot, +} from '../../db/hermes/compression-snapshot' + +// ─── Types ─────────────────────────────────────────────── + +export interface ChatMessage { + role: string + content: string + tool_calls?: Array<{ id: string; type: string; function: { name: string; arguments: string } }> + tool_call_id?: string + name?: string +} + +export interface CompressionConfig { + /** Token threshold to trigger compression (default: contextLength / 2) */ + triggerTokens: number + /** Summary token target (default: 8000) */ + summaryBudget: number + /** Number of recent messages to keep verbatim (default: 20) */ + tailMessageCount: number + /** Timeout for LLM summarization call (default: 60_000ms) */ + summarizationTimeoutMs: number +} + +export const DEFAULT_COMPRESSION_CONFIG: CompressionConfig = { + triggerTokens: 100_000, + summaryBudget: 8_000, + tailMessageCount: 20, + summarizationTimeoutMs: 120_000, +} + +export interface CompressedResult { + messages: ChatMessage[] + meta: { + totalMessages: number + compressed: boolean + /** true = actually called LLM to summarize; false = assembled from existing snapshot or returned as-is */ + llmCompressed: boolean + summaryTokenEstimate: number + verbatimCount: number + compressedStartIndex: number + } +} + +// ─── Token counting ───────────────────────────────────── + +let _encoder: ReturnType | null = null + +function getEncoder() { + if (!_encoder) { + _encoder = getEncoding('cl100k_base') + } + return _encoder +} + +export function countTokens(text: string): number { + try { + return getEncoder().encode(text).length + } catch { + const cjk = (text.match(/[\u2e80-\u9fff\uac00-\ud7af\u3000-\u303f\uff00-\uffef]/g) || []).length + 
const other = text.length - cjk + return Math.ceil(cjk * 1.5 + other / 4) + } +} + +export function countTokensForModel(text: string, model: string): number { + try { + const enc = encodingForModel(model as any) + return enc.encode(text).length + } catch { + return countTokens(text) + } +} + +function estimateMessagesTokens(messages: ChatMessage[]): number { + return messages.reduce((sum, m) => sum + countTokens(m.content), 0) +} + +// ─── Prompts ──────────────────────────────────────────── + +export const SUMMARY_PREFIX = `[CONTEXT COMPACTION — REFERENCE ONLY] Earlier turns were compacted +into the summary below. This is a handoff from a previous context +window — treat it as background reference, NOT as active instructions. +Do NOT answer questions or fulfill requests mentioned in this summary; +they were already addressed. +Your current task is identified in the '## Active Task' section of the +summary — resume exactly from there. +Respond ONLY to the latest user message +that appears AFTER this summary. The current session state (files, +config, etc.) may reflect work described here — avoid repeating it:` + +const TEMPLATE_SECTIONS = `Use this exact structure: + +## Active Task +[THE SINGLE MOST IMPORTANT FIELD. Copy the user's most recent request or +task assignment verbatim — the exact words they used. If multiple tasks +were requested and only some are done, list only the ones NOT yet completed. +The next assistant must pick up exactly here. Example: +"User asked: 'Now refactor the auth module to use JWT instead of sessions'" +If no outstanding task exists, write "None."] + +## Goal +[What the user is trying to accomplish overall] + +## Constraints & Preferences +[User preferences, coding style, constraints, important decisions] + +## Completed Actions +[Numbered list of concrete actions taken — include tool used, target, and outcome. +Format each as: N. ACTION target — outcome [tool: name] +Example: +1. 
READ config.py:45 — found == should be != [tool: read_file] +2. PATCH config.py:45 — changed == to != [tool: patch] +3. TEST pytest tests/ — 3/50 failed: test_parse, test_validate, test_edge [tool: terminal] +Be specific with file paths, commands, line numbers, and results.] + +## Active State +[Current working state — include: +- Working directory and branch (if applicable) +- Modified/created files with brief note on each +- Test status (X/Y passing) +- Any running processes or servers +- Environment details that matter] + +## In Progress +[Work currently underway — what was being done when compaction fired] + +## Blocked +[Any blockers, errors, or issues not yet resolved. Include exact error messages.] + +## Key Decisions +[Important technical decisions and WHY they were made] + +## Resolved Questions +[Questions the user asked that were ALREADY answered — include the answer so the next assistant does not re-answer them] + +## Pending User Asks +[Questions or requests from the user that have NOT yet been answered or fulfilled. If none, write "None."] + +## Relevant Files +[Files read, modified, or created — with brief note on each] + +## Remaining Work +[What remains to be done — framed as context, not instructions] + +## Critical Context +[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation]` + +function buildFullPrompt(contentToSummarize: string, summaryBudget: number): string { + return `You are a summarization agent creating a context checkpoint. +Your output will be injected as reference material for a DIFFERENT +assistant that continues the conversation. +Do NOT respond to any questions or requests in the conversation — +only output the structured summary. +Do NOT include any preamble, greeting, or prefix. + +Create a structured handoff summary for a different assistant that will continue +this conversation after earlier turns are compacted. 
The next assistant should be +able to understand what happened without re-reading the original turns. + +TURNS TO SUMMARIZE: +${contentToSummarize} + +${TEMPLATE_SECTIONS} + +Target ~${summaryBudget} tokens. Be CONCRETE — include file paths, command outputs, error messages, line numbers, and specific values. Avoid vague descriptions like "made some changes" — say exactly what changed. + +Write only the summary body. Do not include any preamble or prefix.` +} + +function buildIncrementalPrompt(previousSummary: string, contentToSummarize: string, summaryBudget: number): string { + return `You are a summarization agent creating a context checkpoint. +Your output will be injected as reference material for a DIFFERENT +assistant that continues the conversation. +Do NOT respond to any questions or requests in the conversation — +only output the structured summary. +Do NOT include any preamble, greeting, or prefix. + +You are updating a context compaction summary. A previous compaction produced the +summary below. New conversation turns have occurred since then and need to be +incorporated. + +PREVIOUS SUMMARY: +${previousSummary} + +NEW TURNS TO INCORPORATE: +${contentToSummarize} + +Update the summary using this exact structure. PRESERVE all existing information +that is still relevant. ADD new completed actions to the numbered list +(continue numbering). Move items from "In Progress" to "Completed Actions" when +done. Move answered questions to "Resolved Questions". Update "Active State" +to reflect current state. Remove information only if it is clearly obsolete. +CRITICAL: Update "## Active Task" to reflect the user's most recent unfulfilled +request — this is the most important field for task continuity. + +${TEMPLATE_SECTIONS} + +Target ~${summaryBudget} tokens. Be CONCRETE — include file paths, command outputs, error messages, line numbers, and specific values. Avoid vague descriptions like "made some changes" — say exactly what changed. 
+ +Write only the summary body. Do not include any preamble or prefix.` +} + +// ─── Pre-cleaning ─────────────────────────────────────── + +function serializeForSummary(messages: ChatMessage[]): string { + const parts: string[] = [] + for (const msg of messages) { + const role = msg.role === 'tool' ? `[tool:${msg.name || 'unknown'}]` : msg.role + let content = msg.content || '' + + if (msg.role === 'tool' && content.length > 5500) { + content = content.slice(0, 4000) + '\n... [truncated]\n...' + content.slice(-1500) + } + + if (msg.role === 'assistant' && msg.tool_calls?.length) { + const toolsInfo = msg.tool_calls.map(tc => { + let args = tc.function.arguments + if (args.length > 1500) args = args.slice(0, 1500) + '...' + return `[tool_call: ${tc.function.name}(${args})]` + }).join('\n') + parts.push(`${role}: ${toolsInfo}`) + if (content.trim()) parts.push(`${role}: ${content}`) + } else { + parts.push(`${role}: ${content}`) + } + } + return parts.join('\n\n') +} + +function pruneOldToolResults(messages: ChatMessage[], keepRecentCount: number): ChatMessage[] { + if (messages.length <= keepRecentCount) return messages + + const tail = messages.slice(-keepRecentCount) + const head = messages.slice(0, -keepRecentCount) + + const pruned = head.map(msg => { + if (msg.role !== 'tool') return msg + const content = msg.content || '' + const preview = content.slice(0, 100).replace(/\n/g, ' ') + const truncated = content.length > 100 ? '...' 
: '' + return { ...msg, content: `[${msg.name || 'tool'}] ${preview}${truncated}` } + }) + + return [...pruned, ...tail] +} + +// ─── LLM Summarization ────────────────────────────────── + +async function callSummarizer( + upstream: string, + apiKey: string | undefined, + prompt: string, + history: Array<{ role: string; content: string }>, + timeoutMs: number, + previousSummary?: string, +): Promise { + const sessionId = `compress_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}` + + const convHistory: Array<{ role: string; content: string }> = [...history] + + if (previousSummary) { + convHistory.unshift( + { role: 'user', content: `[Previous summary]\n${previousSummary}` }, + { role: 'assistant', content: 'Understood, I will update the summary.' }, + ) + } + + const headers: Record = { 'Content-Type': 'application/json' } + if (apiKey) headers['Authorization'] = `Bearer ${apiKey}` + + const res = await fetch(`${upstream}/v1/runs`, { + method: 'POST', + headers, + body: JSON.stringify({ + input: prompt, + conversation_history: convHistory, + session_id: sessionId, + }), + signal: AbortSignal.timeout(timeoutMs), + }) + + if (!res.ok) { + throw new Error(`Summarization run failed: ${res.status}`) + } + + const { run_id } = await res.json() as { run_id: string } + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + source.close() + reject(new Error('Summarization timed out')) + }, timeoutMs) + + const eventsUrl = new URL(`${upstream}/v1/runs/${run_id}/events`) + if (apiKey) eventsUrl.searchParams.set('token', apiKey) + + const source = new EventSource(eventsUrl.toString()) + + source.onmessage = (event: MessageEvent) => { + try { + const parsed = JSON.parse(event.data) + if (parsed.event === 'run.completed') { + clearTimeout(timer) + source.close() + deleteCompressSession(upstream, apiKey, sessionId).catch(() => {}) + const output = parsed.output + if (!output || typeof output !== 'string' || output.trim() === '') { + 
reject(new Error('Empty summarization response')) + return + } + resolve(output.trim()) + } else if (parsed.event === 'run.failed') { + clearTimeout(timer) + source.close() + deleteCompressSession(upstream, apiKey, sessionId).catch(() => {}) + reject(new Error(parsed.error || 'Summarization run failed')) + } + } catch { /* ignore parse errors */ } + } + + source.onerror = () => { + clearTimeout(timer) + source.close() + deleteCompressSession(upstream, apiKey, sessionId).catch(() => {}) + reject(new Error('Summarization SSE connection error')) + } + }) +} + +/** Best-effort delete the temporary compression session from the gateway */ +async function deleteCompressSession(upstream: string, apiKey: string | undefined, sessionId: string): Promise { + try { + const headers: Record = {} + if (apiKey) headers['Authorization'] = `Bearer ${apiKey}` + await fetch(`${upstream}/api/sessions/${sessionId}`, { + method: 'DELETE', + headers, + signal: AbortSignal.timeout(5000), + }) + } catch { /* best-effort */ } +} + +// ─── Main Compressor ──────────────────────────────────── + +export class ChatContextCompressor { + private config: CompressionConfig + + constructor(opts?: { + config?: Partial + }) { + this.config = { ...DEFAULT_COMPRESSION_CONFIG, ...opts?.config } + } + + /** + * Assemble and compress conversation history. + * + * Flow: + * 1. Check snapshot → if exists, assemble = summary + new messages after snapshot index + * 2. If no snapshot → assemble = all messages + * 3. Count tokens of assembled context + * 4. Under threshold → return assembled as-is (no LLM call) + * 5. 
/**
 * Assembles the conversation history for a run and, when it exceeds half of the
 * model context length, compresses the older part into an LLM-written summary.
 * Summaries are persisted per session as snapshots so later turns can reuse them.
 */
export class ChatContextCompressor {
  private config: CompressionConfig

  constructor(opts?: { config?: Partial<CompressionConfig> }) {
    this.config = { ...DEFAULT_COMPRESSION_CONFIG, ...opts?.config }
  }

  /**
   * Build the message that carries a summary.
   * The role is `user`, matching how the chat-run socket layer injects a stored
   * snapshot summary, so the same summary is not sent under different roles
   * from one turn to the next.
   */
  private static summaryMessage(summary: string): ChatMessage {
    return { role: 'user', content: SUMMARY_PREFIX + '\n\n' + summary }
  }

  /** Extract a printable message from an unknown thrown value. */
  private static describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err)
  }

  /**
   * Assemble and compress conversation history.
   *
   * Flow:
   *   1. Snapshot exists → assembled = summary + messages after the snapshot index.
   *   2. No snapshot → assembled = all messages.
   *   3. Count tokens of the assembled context.
   *   4. At or under the threshold (half the context length) → return it, no LLM call.
   *   5. Over the threshold → LLM compress, keep the last N messages, save a snapshot.
   *
   * @param messages      Full message list of the session, oldest first.
   * @param upstream      Gateway base URL used for the summarization run.
   * @param apiKey        Optional gateway bearer token.
   * @param sessionId     Session whose snapshot is read/written; without it no
   *                      snapshot is used or saved.
   * @param contextLength Model context length in tokens; defaults to 200 000 when
   *                      missing or not a positive number.
   */
  async compress(
    messages: ChatMessage[],
    upstream: string,
    apiKey: string | undefined,
    sessionId?: string,
    contextLength?: number,
  ): Promise<CompressedResult> {
    const cl = contextLength !== undefined && contextLength > 0 ? contextLength : 200_000
    const triggerTokens = Math.floor(cl / 2)
    const total = messages.length

    const makeMeta = (opts: Partial<CompressedResult['meta']> = {}): CompressedResult['meta'] => ({
      totalMessages: total,
      compressed: false,
      llmCompressed: false,
      summaryTokenEstimate: 0,
      verbatimCount: total,
      compressedStartIndex: -1,
      ...opts,
    })

    // ── Step 1: Check snapshot first ─────────────────────
    if (sessionId) {
      const snapshot = getCompressionSnapshot(sessionId)

      if (snapshot) {
        const { summary: previousSummary, lastMessageIndex } = snapshot
        const newMessages = messages.slice(lastMessageIndex + 1)
        const summaryTokens = countTokens(SUMMARY_PREFIX + previousSummary)
        const assembledTokens = summaryTokens + estimateMessagesTokens(newMessages)

        logger.info(
          '[context-compressor] session=%s: snapshot at %d, %d new messages, assembled ~%d tokens (threshold %d)',
          sessionId, lastMessageIndex, newMessages.length, assembledTokens, triggerTokens,
        )

        // Under threshold → return summary + new messages, no LLM call
        if (assembledTokens <= triggerTokens) {
          return {
            messages: [ChatContextCompressor.summaryMessage(previousSummary), ...newMessages],
            meta: makeMeta({
              compressed: true,
              llmCompressed: false,
              summaryTokenEstimate: summaryTokens,
              verbatimCount: newMessages.length,
              compressedStartIndex: lastMessageIndex,
            }),
          }
        }

        // Over threshold → incremental LLM compress
        return this.incrementalCompress(messages, snapshot, upstream, apiKey, sessionId, makeMeta())
      }
    }

    // ── Step 2: No snapshot — check all messages ──────────
    const totalTokens = estimateMessagesTokens(messages)

    logger.info(
      '[context-compressor] session=%s: no snapshot, %d messages, ~%d tokens (threshold %d)',
      sessionId, total, totalTokens, triggerTokens,
    )

    if (totalTokens <= triggerTokens) {
      return { messages, meta: makeMeta() }
    }

    // Over threshold → full LLM compress
    return this.fullCompress(messages, upstream, apiKey, sessionId, makeMeta())
  }

  /**
   * Fold the messages added since the snapshot (except the last
   * `tailMessageCount`) into the existing summary.
   *
   * The snapshot is only advanced when the summarizer actually produced a new
   * summary. On failure the previous summary is reused for this request but the
   * stored snapshot is left alone, so the skipped messages are retried next time
   * instead of being lost.
   */
  private async incrementalCompress(
    messages: ChatMessage[],
    snapshot: { summary: string; lastMessageIndex: number },
    upstream: string,
    apiKey: string | undefined,
    sessionId: string,
    meta: CompressedResult['meta'],
  ): Promise<CompressedResult> {
    const { summary: previousSummary, lastMessageIndex } = snapshot
    const total = messages.length
    const tailCount = this.config.tailMessageCount
    const cleaned = pruneOldToolResults(messages, tailCount)
    const newMessages = cleaned.slice(lastMessageIndex + 1)

    // Keep last N of new messages, compress the rest
    const tailStart = Math.max(0, newMessages.length - tailCount)
    const toCompress = newMessages.slice(0, tailStart)
    const tail = newMessages.slice(tailStart)

    logger.info(
      '[context-compressor] [incremental-llm] compressing %d of %d new messages, keeping %d tail',
      toCompress.length, newMessages.length, tail.length,
    )

    let summary = previousSummary
    let llmCompressed = false

    // Nothing outside the tail → an LLM call could not shrink anything; skip it.
    if (toCompress.length > 0) {
      try {
        const contentToSummarize = serializeForSummary(toCompress)
        const prompt = buildIncrementalPrompt(previousSummary, contentToSummarize, this.config.summaryBudget)
        const history = toCompress
          .filter(m => m.role === 'user' || m.role === 'assistant')
          .map(m => ({ role: m.role, content: m.content }))

        const t0 = Date.now()
        summary = await callSummarizer(
          upstream, apiKey, prompt, history, this.config.summarizationTimeoutMs, previousSummary,
        )
        llmCompressed = true
        logger.info('[context-compressor] incremental-llm done in %dms, %d chars', Date.now() - t0, summary.length)
      } catch (err: unknown) {
        logger.warn(
          '[context-compressor] incremental-llm failed: %s — reusing previous summary',
          ChatContextCompressor.describeError(err),
        )
      }
    }

    // Index (in the original message list) of the last message covered by `summary`.
    const newLastIndex = lastMessageIndex + tailStart
    if (llmCompressed && sessionId) {
      saveCompressionSnapshot(sessionId, summary, newLastIndex, total)
    }

    return {
      messages: [ChatContextCompressor.summaryMessage(summary), ...tail],
      meta: {
        ...meta,
        compressed: true,
        llmCompressed,
        summaryTokenEstimate: countTokens(SUMMARY_PREFIX + summary),
        verbatimCount: tail.length,
        compressedStartIndex: newLastIndex,
      },
    }
  }

  /**
   * Summarize everything except the last `tailMessageCount` messages.
   * If the summarizer fails, only the tail is returned and no snapshot is saved.
   */
  private async fullCompress(
    messages: ChatMessage[],
    upstream: string,
    apiKey: string | undefined,
    sessionId: string | undefined,
    meta: CompressedResult['meta'],
  ): Promise<CompressedResult> {
    const total = messages.length
    const tailCount = this.config.tailMessageCount
    const cleaned = pruneOldToolResults(messages, tailCount)

    if (total <= tailCount) {
      return { messages: cleaned, meta }
    }

    const tailStart = total - tailCount
    const toCompress = cleaned.slice(0, tailStart)
    const tail = cleaned.slice(tailStart)

    logger.info(
      '[context-compressor] [full-llm] compressing messages 0-%d, keeping %d-%d',
      tailStart - 1, tailStart, total - 1,
    )

    const contentToSummarize = serializeForSummary(toCompress)
    const prompt = buildFullPrompt(contentToSummarize, this.config.summaryBudget)
    const history = toCompress
      .filter(m => m.role === 'user' || m.role === 'assistant')
      .map(m => ({ role: m.role, content: m.content }))

    let summary: string | null = null
    try {
      const t0 = Date.now()
      summary = await callSummarizer(upstream, apiKey, prompt, history, this.config.summarizationTimeoutMs)
      logger.info('[context-compressor] full-llm done in %dms, %d chars', Date.now() - t0, summary.length)
    } catch (err: unknown) {
      logger.warn('[context-compressor] full-llm failed: %s', ChatContextCompressor.describeError(err))
    }

    const result: ChatMessage[] = []

    if (summary) {
      result.push(ChatContextCompressor.summaryMessage(summary))
      if (sessionId) {
        saveCompressionSnapshot(sessionId, summary, tailStart - 1, total)
      }
    }

    result.push(...tail)

    return {
      messages: result,
      meta: {
        ...meta,
        compressed: true,
        llmCompressed: !!summary,
        summaryTokenEstimate: summary ? countTokens(SUMMARY_PREFIX + summary) : 0,
        verbatimCount: tail.length,
        compressedStartIndex: tailStart - 1,
      },
    }
  }

  /** Remove snapshot for a session (e.g. when session is deleted) */
  static invalidateSnapshot(sessionId: string): void {
    deleteCompressionSnapshot(sessionId)
  }
}
/** Message shape forwarded to the gateway as `conversation_history`. */
type HistoryMessage = {
  role: string
  content: string
  tool_calls?: any[]
  tool_call_id?: string
  name?: string
}

/** Sends an event to the audience of a run (session room, or the originating socket). */
type RunEmitter = (event: string, payload: any) => void

/**
 * Socket.IO front-end for chat runs on the `/chat-run` namespace.
 *
 * Clients emit `run` to start a run, `resume` to rejoin a session room after a
 * reconnect (accumulated state events are replayed), and `abort` to stop the
 * run of a session. Upstream events are read over SSE and re-emitted to the
 * room `session:<session_id>`.
 */
export class ChatRunSocket {
  private nsp: ReturnType<Server['of']>
  private gatewayManager: any
  /** sessionId → InFlightRun */
  private activeRuns = new Map<string, InFlightRun>()
  /** sessionId → accumulated state events for reconnecting clients */
  private sessionStates = new Map<string, Array<{ event: string; data: any }>>()
  /** sessionId → pending timer that will drop the session's replay states */
  private cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>()

  constructor(io: Server, gatewayManager: any) {
    this.nsp = io.of('/chat-run')
    this.gatewayManager = gatewayManager
  }

  /** Register the auth middleware and the connection handler on the namespace. */
  init() {
    this.nsp.use(this.authMiddleware.bind(this))
    this.nsp.on('connection', this.onConnection.bind(this))
    logger.info('[chat-run-socket] Socket.IO ready at /chat-run')
  }

  // --- Auth middleware ---

  /**
   * Reject the handshake when a server token exists and the client token differs.
   * The check is skipped whenever AUTH_DISABLED is set to any non-empty value.
   */
  private async authMiddleware(socket: Socket, next: (err?: Error) => void) {
    const token = socket.handshake.auth?.token as string | undefined
    if (!process.env.AUTH_DISABLED) {
      const { getToken } = await import('../auth')
      const serverToken = await getToken()
      if (serverToken && token !== serverToken) {
        return next(new Error('Authentication failed'))
      }
    }
    next()
  }

  // --- Connection handler ---

  /** Wire the `run`, `resume` and `abort` events of a freshly connected socket. */
  private onConnection(socket: Socket) {
    const profile = (socket.handshake.query?.profile as string) || 'default'

    socket.on('run', async (data: {
      input: string
      session_id?: string
      model?: string
      instructions?: string
    }) => {
      if (!data) {
        // A client emitting `run` without a payload would otherwise crash the handler.
        socket.emit('run.failed', { event: 'run.failed', error: 'Missing run payload' })
        return
      }
      await this.handleRun(socket, data, profile)
    })

    socket.on('resume', (data: { session_id?: string }) => {
      const sid = data?.session_id
      if (!sid) return

      socket.join(`session:${sid}`)

      // Replay all accumulated state events for this session
      const states = this.sessionStates.get(sid)
      if (states) {
        for (const state of states) {
          socket.emit(state.event, { ...state.data, session_id: sid })
        }
        logger.info('[chat-run-socket] replayed %d state events for reconnecting client on session %s', states.length, sid)
      }

      logger.info('[chat-run-socket] socket %s resumed session %s (active: %s)', socket.id, sid, this.activeRuns.has(sid))
    })

    socket.on('abort', (data: { session_id?: string }) => {
      if (data?.session_id) {
        this.handleAbort(data.session_id)
      }
    })
  }

  // --- Run handler ---

  /**
   * Start an upstream run for `data` and relay its events.
   * Failures at any stage are reported to the client as `run.failed`.
   */
  private async handleRun(
    socket: Socket,
    data: { input: string; session_id?: string; model?: string; instructions?: string },
    profile: string,
  ) {
    const { input, session_id, model, instructions } = data
    const upstream = (process.env.UPSTREAM || 'http://127.0.0.1:8642').replace(/\/$/, '')
    const apiKey = this.gatewayManager.getApiKey(profile) || undefined

    if (session_id) {
      // Join session room — events go to room, survives socket disconnect
      socket.join(`session:${session_id}`)
      // Drop replay states (and the pending cleanup timer) of the previous run so
      // they neither get replayed as part of this run nor wipe this run's states.
      this.resetSessionState(session_id)
    }

    // Emit helper: tag every payload with session_id
    const emit: RunEmitter = (event, payload) => {
      if (session_id) {
        this.nsp.to(`session:${session_id}`).emit(event, { ...payload, session_id })
      } else if (socket.connected) {
        socket.emit(event, payload)
      }
    }

    try {
      // Build upstream request body
      const body: Record<string, any> = { input }
      if (session_id) body.session_id = session_id
      if (model) body.model = model
      if (instructions) body.instructions = instructions

      // Build conversation_history from DB if session_id is provided
      if (session_id) {
        try {
          const history = await this.buildConversationHistory(session_id, profile, upstream, apiKey, emit)
          if (history) body.conversation_history = history
        } catch (err) {
          logger.warn(err, '[chat-run-socket] failed to load conversation history for session %s', session_id)
        }
      }

      const headers: Record<string, string> = { 'Content-Type': 'application/json' }
      if (apiKey) headers['Authorization'] = `Bearer ${apiKey}`

      const res = await fetch(`${upstream}/v1/runs`, {
        method: 'POST',
        headers,
        body: JSON.stringify(body),
        signal: AbortSignal.timeout(120_000),
      })

      if (!res.ok) {
        const text = await res.text().catch(() => '')
        emit('run.failed', { event: 'run.failed', error: `Upstream ${res.status}: ${text}` })
        return
      }

      const runData = (await res.json()) as any
      const runId = runData.run_id
      if (!runId) {
        emit('run.failed', { event: 'run.failed', error: 'No run_id in upstream response' })
        return
      }

      const abortController = new AbortController()
      if (session_id) {
        setRunSession(runId, session_id)
        this.activeRuns.set(session_id, { runId, abortController })
      }

      emit('run.started', { event: 'run.started', run_id: runId, status: runData.status })

      this.streamRunEvents(session_id, runId, upstream, apiKey, emit, abortController.signal)
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err)
      emit('run.failed', { event: 'run.failed', error: message })
      if (session_id) this.markCompleted(session_id, { event: 'run.failed' })
    }
  }

  /**
   * Load the session's messages and turn them into `conversation_history`,
   * compressing when the estimated token count exceeds half the context length.
   *
   * @returns The history to send, or `undefined` when the session has no messages.
   */
  private async buildConversationHistory(
    sessionId: string,
    profile: string,
    upstream: string,
    apiKey: string | undefined,
    emit: RunEmitter,
  ): Promise<HistoryMessage[] | undefined> {
    const detail = await getSessionDetailFromDb(sessionId)
    if (!detail?.messages?.length) return undefined

    const history: HistoryMessage[] = detail.messages
      .filter(m => (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined)
      .map(m => {
        const msg: HistoryMessage = { role: m.role, content: m.content || '' }
        if (m.tool_calls?.length) msg.tool_calls = m.tool_calls
        if (m.tool_call_id) msg.tool_call_id = m.tool_call_id
        if (m.tool_name) msg.name = m.tool_name
        return msg
      })

    // Context compression with snapshot awareness
    const contextLength = getModelContextLength(profile)
    const triggerTokens = Math.floor(contextLength / 2)

    // Step 1: Check existing snapshot — if present, assemble summary + new messages
    const snapshot = getCompressionSnapshot(sessionId)
    if (snapshot) {
      const newMessages = history.slice(snapshot.lastMessageIndex + 1)
      const summaryTokens = countTokens(SUMMARY_PREFIX + snapshot.summary)
      const assembledTokens = summaryTokens + ChatRunSocket.sumTokens(newMessages)
      logger.info('[context-compress] session=%s: snapshot at %d, %d new messages, assembled ~%d tokens (threshold %d)',
        sessionId, snapshot.lastMessageIndex, newMessages.length, assembledTokens, triggerTokens)

      const assembled: HistoryMessage[] = [
        { role: 'user', content: SUMMARY_PREFIX + '\n\n' + snapshot.summary },
        ...newMessages,
      ]

      // Under threshold — use assembled context directly, no LLM call needed
      if (assembledTokens <= triggerTokens) return assembled

      // Over threshold — needs incremental LLM compression. If that throws, fall
      // back to the assembled context (what the warning below announces), not to
      // the full raw history.
      return this.compressWithEvents({
        sessionId,
        history,
        fallback: assembled,
        messageCount: newMessages.length,
        beforeTokens: assembledTokens,
        fallbackLabel: 'using assembled context',
        upstream,
        apiKey,
        contextLength,
        emit,
      })
    }

    // No snapshot — very short histories are never compressed
    if (history.length <= 4) return history

    const beforeTokens = ChatRunSocket.sumTokens(history)
    if (beforeTokens <= triggerTokens) {
      // Under threshold — use raw history as-is
      logger.info('[context-compress] session=%s: %d messages, ~%d tokens — under threshold, skip', sessionId, history.length, beforeTokens)
      return history
    }

    // Over threshold — full LLM compression
    logger.info('[context-compress] BEFORE session=%s: %d messages, ~%d tokens (threshold %d)', sessionId, history.length, beforeTokens, triggerTokens)
    return this.compressWithEvents({
      sessionId,
      history,
      fallback: history,
      messageCount: history.length,
      beforeTokens,
      fallbackLabel: 'using raw history',
      upstream,
      apiKey,
      contextLength,
      emit,
    })
  }

  /**
   * Run the compressor and bracket it with `compression.started` and
   * `compression.completed`, both emitted live and recorded for replay.
   *
   * @returns The compressed history, or `fallback` when compression throws.
   */
  private async compressWithEvents(args: {
    sessionId: string
    history: HistoryMessage[]
    fallback: HistoryMessage[]
    messageCount: number
    beforeTokens: number
    fallbackLabel: string
    upstream: string
    apiKey: string | undefined
    contextLength: number
    emit: RunEmitter
  }): Promise<HistoryMessage[]> {
    const { sessionId, history, fallback, messageCount, beforeTokens, emit } = args

    const started = {
      event: 'compression.started',
      message_count: messageCount,
      token_count: beforeTokens,
    }
    this.pushState(sessionId, 'compression.started', started)
    emit('compression.started', started)

    try {
      const result = await compressor.compress(
        history, args.upstream, args.apiKey, sessionId, args.contextLength,
      )
      const afterTokens = ChatRunSocket.sumTokens(result.messages)

      const completed = {
        event: 'compression.completed',
        compressed: result.meta.compressed,
        llmCompressed: result.meta.llmCompressed,
        totalMessages: result.meta.totalMessages,
        resultMessages: result.messages.length,
        beforeTokens,
        afterTokens,
        summaryTokens: result.meta.summaryTokenEstimate,
        verbatimCount: result.meta.verbatimCount,
        compressedStartIndex: result.meta.compressedStartIndex,
      }
      this.replaceState(sessionId, 'compression.completed', completed)
      logger.info('[context-compress] AFTER session=%s: %d messages, ~%d tokens (was %d)', sessionId, result.messages.length, afterTokens, beforeTokens)
      emit('compression.completed', completed)

      return result.messages.map(m => ({
        role: m.role,
        content: m.content,
        tool_calls: m.tool_calls,
        tool_call_id: m.tool_call_id,
        name: m.name,
      }))
    } catch (err: unknown) {
      const failed = {
        event: 'compression.completed',
        compressed: false,
        totalMessages: messageCount,
        resultMessages: messageCount,
        beforeTokens,
        afterTokens: beforeTokens,
        summaryTokens: 0,
        verbatimCount: messageCount,
        compressedStartIndex: -1,
        error: err instanceof Error ? err.message : String(err),
      }
      this.replaceState(sessionId, 'compression.completed', failed)
      logger.warn(err, `[chat-run-socket] compression failed for session %s, ${args.fallbackLabel}`, sessionId)
      emit('compression.completed', failed)
      return fallback
    }
  }

  /**
   * Follow the upstream SSE stream of a run and relay every event.
   * The stream is closed on a terminal event, on a connection error, or when
   * `signal` is aborted (which is how `abort` requests stop relaying).
   */
  private streamRunEvents(
    sessionId: string | undefined,
    runId: string,
    upstream: string,
    apiKey: string | undefined,
    emit: RunEmitter,
    signal: AbortSignal,
  ): void {
    // Stream upstream events via EventSource — survives socket disconnect
    const eventsUrl = new URL(`${upstream}/v1/runs/${runId}/events`)
    if (apiKey) eventsUrl.searchParams.set('token', apiKey)

    const source = new EventSource(eventsUrl.toString())
    signal.addEventListener('abort', () => source.close(), { once: true })

    source.onmessage = (event: MessageEvent) => {
      if (signal.aborted) return

      let parsed: any
      try {
        parsed = JSON.parse(event.data as string)
      } catch {
        return // not JSON, skip
      }
      if (parsed === null || typeof parsed !== 'object') return

      try {
        // Intercept run.completed for usage tracking
        if (parsed.event === 'run.completed' && parsed.usage && parsed.run_id) {
          const sid = getSessionForRun(parsed.run_id)
          if (sid) {
            updateUsage(sid, parsed.usage.input_tokens, parsed.usage.output_tokens)
          }
        }

        emit(parsed.event || 'message', parsed)

        if (parsed.event === 'run.completed' || parsed.event === 'run.failed') {
          source.close()
          if (sessionId) this.markCompleted(sessionId, { event: parsed.event, run_id: parsed.run_id })
        }
      } catch (err) {
        logger.warn(err, '[chat-run-socket] failed to relay upstream event for run %s', runId)
      }
    }

    source.onerror = () => {
      if (signal.aborted) return
      source.close()
      emit('run.failed', { event: 'run.failed', error: 'EventSource connection lost' })
      if (sessionId) this.markCompleted(sessionId, { event: 'run.failed' })
    }
  }

  // --- Abort handler ---

  /** Stop relaying the active run of a session and record it as failed. */
  private handleAbort(sessionId: string) {
    const run = this.activeRuns.get(sessionId)
    if (run) {
      run.abortController.abort()
      this.markCompleted(sessionId, { event: 'run.failed', run_id: run.runId })
    }
  }

  /** Mark a session run as completed/failed so reconnecting clients get notified */
  private markCompleted(sessionId: string, info: { event: string; run_id?: string }) {
    this.activeRuns.delete(sessionId)
    this.pushState(sessionId, info.event, { event: info.event, run_id: info.run_id })

    // Auto-cleanup after 30s — enough time for a page refresh.
    // Only one timer per session is kept so an older one cannot fire early.
    const pending = this.cleanupTimers.get(sessionId)
    if (pending) clearTimeout(pending)
    this.cleanupTimers.set(
      sessionId,
      setTimeout(() => {
        this.cleanupTimers.delete(sessionId)
        this.sessionStates.delete(sessionId)
      }, 30_000),
    )
  }

  /** Forget replay states and any pending cleanup timer of a session. */
  private resetSessionState(sessionId: string) {
    const pending = this.cleanupTimers.get(sessionId)
    if (pending) {
      clearTimeout(pending)
      this.cleanupTimers.delete(sessionId)
    }
    this.sessionStates.delete(sessionId)
  }

  /** Append a state event for a session (used for replay on reconnect) */
  private pushState(sessionId: string, event: string, data: any) {
    const states = this.sessionStates.get(sessionId)
    if (states) {
      states.push({ event, data })
    } else {
      this.sessionStates.set(sessionId, [{ event, data }])
    }
  }

  /** Replace the last state with the same event name, or append if there is none */
  private replaceState(sessionId: string, event: string, data: any) {
    const states = this.sessionStates.get(sessionId)
    if (states) {
      for (let i = states.length - 1; i >= 0; i--) {
        if (states[i].event === event) {
          states[i] = { event, data }
          return
        }
      }
    }
    this.pushState(sessionId, event, data)
  }

  /** Sum of `countTokens(content)` over a list of messages. */
  private static sumTokens(messages: ReadonlyArray<{ content: string }>): number {
    return messages.reduce((sum, m) => sum + countTokens(m.content), 0)
  }
}