feat(chat): replace HTTP+SSE with Socket.IO for chat runs and add context compression

- Replace HTTP POST + SSE streaming with Socket.IO /chat-run namespace
  for decoupled message handling that survives client disconnect/refresh
- Add SQLite-backed context compression with snapshot-based incremental updates
- Unify server-side session state tracking (completedSessions + compressingSessions
  → sessionStates) for reliable state replay on reconnect
- Filter compress_ sessions from session list queries
- Add compression snapshot store with proper snake_case→camelCase column aliases
- Delete temporary compress_ sessions after compression completes
- Change compressed summary role from 'system' to 'user'
- Add compression.started/completed events to frontend chat store

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
ekko
2026-04-28 16:44:34 +08:00
parent 88ca3c865d
commit cacf8a2bac
12 changed files with 1646 additions and 352 deletions
+3 -2
View File
@@ -62,15 +62,16 @@
],
"dependencies": {
"eventsource": "^4.1.0",
"js-tiktoken": "^1.0.21",
"node-pty": "^1.1.0",
"socket.io": "^4.8.3",
"socket.io-client": "^4.8.3"
},
"devDependencies": {
"@multiavatar/multiavatar": "^1.0.7",
"@koa/bodyparser": "^5.0.0",
"@koa/cors": "^5.0.0",
"@koa/router": "^15.4.0",
"@multiavatar/multiavatar": "^1.0.7",
"@pinia/testing": "^1.0.3",
"@types/eventsource": "^1.1.15",
"@types/js-yaml": "^4.0.9",
@@ -117,4 +118,4 @@
"vue-tsc": "^3.2.6",
"ws": "^8.20.0"
}
}
}
+106 -42
View File
@@ -1,3 +1,4 @@
import { io, type Socket } from 'socket.io-client'
import { request, getBaseUrlValue, getApiKey } from '../client'
export interface ChatMessage {
@@ -8,7 +9,6 @@ export interface ChatMessage {
export interface StartRunRequest {
input: string | ChatMessage[]
instructions?: string
conversation_history?: ChatMessage[]
session_id?: string
model?: string
}
@@ -38,70 +38,134 @@ export interface RunEvent {
output_tokens: number
total_tokens: number
}
/** session_id tag added by server for client-side filtering */
session_id?: string
}
export async function startRun(body: StartRunRequest): Promise<StartRunResponse> {
const headers: Record<string, string> = {}
if (body.session_id) {
headers['X-Hermes-Session-Id'] = body.session_id
// ============================
// Socket.IO chat run connection
// ============================
let chatRunSocket: Socket | null = null
/**
 * Returns the module-level chat-run socket as-is, or `null` when no socket
 * has been created yet or it was released by `disconnectChatRun()`.
 * The returned socket is not guaranteed to be connected.
 */
export function getChatRunSocket(): Socket | null {
return chatRunSocket
}
/**
 * Return the shared Socket.IO client for the `/chat-run` namespace, creating
 * it on first use.
 *
 * An existing socket is reused while it is connected OR still active, i.e.
 * the initial handshake or an automatic reconnection is in progress. Tearing
 * such a socket down would call `removeAllListeners()` and silently detach
 * the handlers of every run that is currently streaming on it.
 *
 * Only a dead socket (manually disconnected, or rejected by the server so it
 * will not retry on its own) is replaced. The replacement picks up the
 * current base URL, API key and active profile.
 *
 * @returns The shared socket; it may still be connecting.
 */
export function connectChatRun(): Socket {
  if (chatRunSocket && (chatRunSocket.connected || chatRunSocket.active)) {
    return chatRunSocket
  }

  // Clean up the dead socket so stale listeners cannot fire alongside the new one.
  if (chatRunSocket) {
    chatRunSocket.removeAllListeners()
    chatRunSocket.disconnect()
  }

  const baseUrl = getBaseUrlValue()
  const token = getApiKey()
  const profile = localStorage.getItem('hermes_active_profile_name') || 'default'

  chatRunSocket = io(`${baseUrl}/chat-run`, {
    auth: { token },
    query: { profile },
    transports: ['websocket', 'polling'],
    reconnection: true,
    reconnectionAttempts: Infinity,
    reconnectionDelay: 1000,
    reconnectionDelayMax: 10000,
  })
  return chatRunSocket
}
export function streamRunEvents(
runId: string,
/**
 * Close the shared chat-run socket, if there is one, and forget it so the
 * next `connectChatRun()` builds a fresh connection. No-op when no socket exists.
 */
export function disconnectChatRun(): void {
  const current = chatRunSocket
  if (!current) return
  current.disconnect()
  chatRunSocket = null
}
/**
 * Start a chat run over the shared `/chat-run` Socket.IO connection and
 * forward the streamed events to the caller.
 *
 * Events tagged with a `session_id` different from `body.session_id` are
 * ignored, because several runs can share the same socket.
 *
 * @param body - Run request; sent to the server as the payload of the `run` event.
 * @param onEvent - Called for every streamed event that belongs to this run.
 * @param onDone - Called once, after `run.completed` or `run.failed` was forwarded
 *   and the listeners were detached.
 * @param onError - Called after `onDone` when the run ended with `run.failed`.
 * @param onStarted - Optional; called with the event's `run_id` (or `''`) on `run.started`.
 * @returns A handle whose `abort()` emits `abort` for the session and detaches
 *   every listener registered here. Calling it after the run ended is a no-op.
 */
export function startRunViaSocket(
  body: StartRunRequest,
  onEvent: (event: RunEvent) => void,
  onDone: () => void,
  onError: (err: Error) => void,
  onStarted?: (runId: string) => void,
): { abort: () => void } {
  const socket = connectChatRun()
  let closed = false

  // Single table used for both on() and off(), so every listener that gets
  // attached is detached with the very same function reference.
  const listeners: ReadonlyArray<readonly [string, (data: RunEvent) => void]> = [
    ['run.started', onRunStarted],
    ['run.failed', onRunFailed],
    ['message.delta', onStreamEvent],
    ['reasoning.delta', onStreamEvent],
    ['thinking.delta', onStreamEvent],
    ['reasoning.available', onStreamEvent],
    ['tool.started', onStreamEvent],
    ['tool.completed', onStreamEvent],
    ['run.completed', onStreamEvent],
    ['compression.started', onStreamEvent],
    ['compression.completed', onStreamEvent],
  ]

  /** Detach all listeners; safe to call more than once. */
  function cleanup(): void {
    if (closed) return
    closed = true
    for (const [name, listener] of listeners) {
      socket.off(name, listener)
    }
  }

  /** Untagged events are accepted; tagged ones must match this run's session. */
  function belongsToThisRun(event: RunEvent): boolean {
    return !event.session_id || !body.session_id || event.session_id === body.session_id
  }

  /**
   * Forward one event to the caller and finish the run on a terminal event.
   * Returns false when the event was dropped (run closed or foreign session).
   */
  function forward(event: RunEvent): boolean {
    if (closed || !belongsToThisRun(event)) return false
    onEvent(event)
    if (event.event === 'run.completed' || event.event === 'run.failed') {
      cleanup()
      onDone()
    }
    return true
  }

  function onRunStarted(data: RunEvent): void {
    if (forward(data)) onStarted?.(data.run_id || '')
  }

  function onRunFailed(data: RunEvent): void {
    if (forward(data)) onError(new Error(data.error || 'Run failed'))
  }

  function onStreamEvent(data: RunEvent): void {
    forward(data)
  }

  for (const [name, listener] of listeners) {
    socket.on(name, listener)
  }

  // Kick off the run. No ack callback is used: the run id arrives later with
  // the `run.started` event.
  socket.emit('run', body)

  return {
    abort: () => {
      if (closed) return
      socket.emit('abort', { session_id: body.session_id })
      cleanup()
    },
  }
}
export async function fetchModels(): Promise<{ data: Array<{ id: string }> }> {
@@ -12,6 +12,12 @@ const { t } = useI18n();
const { isDark } = useTheme();
const listRef = ref<HTMLElement>();
/**
 * Format a token count for compact display: plain number below 1,000,
 * one-decimal `K` below 1,000,000, one-decimal `M` from there on.
 *
 * @param n - Token count.
 * @returns e.g. `"999"`, `"1.5K"`, `"2.5M"`.
 */
function formatTokens(n: number): string {
  // Written as a negated >= so NaN falls through to String(n).
  if (!(n >= 1_000)) return String(n)
  const thousands = (n / 1_000).toFixed(1)
  // toFixed rounds, so values just under one million would otherwise render
  // as "1000.0K"; promote them to the M bucket instead.
  if (n >= 1_000_000 || thousands === '1000.0') {
    return (n / 1_000_000).toFixed(1) + 'M'
  }
  return thousands + 'K'
}
// Messages rendered in the list: every chat-store message except those with role "tool".
const displayMessages = computed(() =>
chatStore.messages.filter((m) => m.role !== "tool"),
);
@@ -128,7 +134,48 @@ watch(currentToolCalls, () => {
playsinline
class="thinking-video"
/>
<div v-if="currentToolCalls.length > 0" class="tool-calls-panel">
<div v-if="currentToolCalls.length > 0 || chatStore.compressionState" class="tool-calls-panel">
<!-- Compression indicator -->
<div v-if="chatStore.compressionState" class="tool-call-item compression-item">
<svg
v-if="chatStore.compressionState.compressing"
width="12"
height="12"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="1.5"
class="tool-call-icon"
>
<path d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
</svg>
<svg
v-else-if="chatStore.compressionState.compressed"
width="12"
height="12"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="1.5"
class="tool-call-icon"
>
<path d="M5 13l4 4L19 7" />
</svg>
<span class="tool-call-name">
{{
chatStore.compressionState.compressing
? `Compressing... (${chatStore.compressionState.messageCount} msgs, ~${formatTokens(chatStore.compressionState.beforeTokens)} tokens)`
: chatStore.compressionState.compressed
? `Compressed ${chatStore.compressionState.messageCount} msgs: ~${formatTokens(chatStore.compressionState.beforeTokens)} → ~${formatTokens(chatStore.compressionState.afterTokens)} tokens`
: `Compression skipped`
}}
</span>
<span
v-if="chatStore.compressionState.compressing"
class="tool-call-spinner"
></span>
</div>
<!-- Tool calls -->
<div
v-for="tc in currentToolCalls"
:key="tc.id"
@@ -253,6 +300,11 @@ watch(currentToolCalls, () => {
background: rgba(255, 255, 255, 0.06);
}
&.compression-item {
color: $text-muted;
font-size: 10px;
}
.tool-call-icon {
flex-shrink: 0;
color: $text-muted;
+338 -300
View File
@@ -1,4 +1,4 @@
import { startRun, streamRunEvents, type ChatMessage, type RunEvent } from '@/api/hermes/chat'
import { startRunViaSocket, connectChatRun, type RunEvent } from '@/api/hermes/chat'
import { deleteSession as deleteSessionApi, fetchSession, fetchSessions, fetchSessionUsageSingle, type HermesMessage, type SessionSummary } from '@/api/hermes/sessions'
import { getApiKey } from '@/api/client'
import { defineStore } from 'pinia'
@@ -170,18 +170,10 @@ function mapHermesSession(s: SessionSummary): Session {
}
}
// Cache keys for stale-while-revalidate loading of sessions / messages.
// All keys include the active profile name to isolate cache between profiles.
// Rendering from cache on boot avoids the multi-round-trip wait the user sees
// every time they open the page (esp. noticeable on mobile).
const STORAGE_KEY_PREFIX = 'hermes_active_session_'
const SESSIONS_CACHE_KEY_PREFIX = 'hermes_sessions_cache_v1_'
const LEGACY_STORAGE_KEY = 'hermes_active_session'
const LEGACY_SESSIONS_CACHE_KEY = 'hermes_sessions_cache_v1'
const IN_FLIGHT_TTL_MS = 15 * 60 * 1000 // Give up after 15 minutes
const POLL_INTERVAL_MS = 2000
const POLL_STABLE_EXITS = 3 // 3 × 2s = 6s of no change → assume run finished
const LIVE_BADGE_WINDOW_MS = 5 * 60 * 1000
const IN_FLIGHT_TTL_MS = 15 * 60 * 1000 // Give up after 15 minutes
// 获取当前 profile 名称,用于隔离缓存。
// 从 profiles store 的 activeProfileName(同步 localStorage)读取,
@@ -196,11 +188,9 @@ function getProfileName(): string {
function storageKey(): string { return STORAGE_KEY_PREFIX + getProfileName() }
function sessionsCacheKey(): string { return SESSIONS_CACHE_KEY_PREFIX + getProfileName() }
function msgsCacheKey(sid: string): string { return `hermes_session_msgs_v1_${getProfileName()}_${sid}_` }
function inFlightKey(sid: string): string { return `hermes_in_flight_v1_${getProfileName()}_${sid}` }
function legacyStorageKey(): string | null { return getProfileName() === 'default' ? LEGACY_STORAGE_KEY : null }
function legacySessionsCacheKey(): string | null { return getProfileName() === 'default' ? LEGACY_SESSIONS_CACHE_KEY : null }
function legacyMsgsCacheKey(sid: string): string | null { return getProfileName() === 'default' ? `hermes_session_msgs_v1_${sid}` : null }
function inFlightKey(sid: string): string { return `hermes_in_flight_v1_${getProfileName()}_${sid}` }
function legacyInFlightKey(sid: string): string | null { return getProfileName() === 'default' ? `hermes_in_flight_v1_${sid}` : null }
interface InFlightRun {
@@ -303,61 +293,37 @@ function removeItemWithLegacy(key: string, legacyKey?: string | null) {
// Strip the circular `file: File` reference from attachments before caching —
// File objects don't serialize and we only need name/type/size/url for display.
/**
 * Produce a cache-safe view of `msgs`.
 * Messages without attachments are passed through by reference; messages with
 * attachments are shallow-copied with each attachment reduced to
 * `id`, `name`, `type`, `size` and `url`. The input is not mutated.
 */
function sanitizeForCache(msgs: Message[]): Message[] {
  const cacheable: Message[] = []
  for (const message of msgs) {
    const files = message.attachments
    if (!files?.length) {
      cacheable.push(message)
      continue
    }
    const slim = files.map(({ id, name, type, size, url }) => ({ id, name, type, size, url }))
    cacheable.push({ ...message, attachments: slim })
  }
  return cacheable
}
// Heals assistant messages whose `reasoning` field was polluted by the
// old bug where `reasoning.available` clobbered it with the assistant
// content. Detection heuristic: reasoning is a prefix of content (the
// bug always derived `reasoning` from `content[:500]` with tags stripped).
// Legitimate reasoning is almost never a prefix of the final answer.
/**
 * Return `msgs` with the `reasoning` field removed from every assistant
 * message whose trimmed reasoning is a non-empty prefix of (or equal to) its
 * trimmed, non-empty content. All other messages are returned by reference.
 * `null` / `undefined` input yields an empty array.
 */
function scrubBuggyReasoningInCache(msgs: Message[] | null | undefined): Message[] {
  const scrubbed: Message[] = []
  for (const message of msgs ?? []) {
    let polluted = false
    if (message.role === 'assistant' && message.reasoning && message.content) {
      const reasoningText = message.reasoning.trim()
      const contentText = message.content.trim()
      // Equality is covered by startsWith; blank-after-trim values never match.
      polluted = reasoningText !== '' && contentText !== '' && contentText.startsWith(reasoningText)
    }
    if (polluted) {
      const { reasoning: _dropped, ...withoutReasoning } = message
      scrubbed.push(withoutReasoning as Message)
    } else {
      scrubbed.push(message)
    }
  }
  return scrubbed
}
export const useChatStore = defineStore('chat', () => {
const sessions = ref<Session[]>([])
const activeSessionId = ref<string | null>(null)
const focusMessageId = ref<string | null>(null)
const streamStates = ref<Map<string, AbortController>>(new Map())
const streamStates = ref<Map<string, { abort: () => void }>>(new Map())
const isStreaming = computed(() => activeSessionId.value != null && streamStates.value.has(activeSessionId.value))
const isLoadingSessions = ref(false)
const sessionsLoaded = ref(false)
const isLoadingMessages = ref(false)
// tmux-like resume state: true when we recovered an in-flight run from
// localStorage after a refresh and are polling fetchSession for progress.
// UI shows the thinking indicator while this is set.
const resumingRuns = ref<Set<string>>(new Set())
const isRunActive = computed(() =>
isStreaming.value
|| (activeSessionId.value != null && resumingRuns.value.has(activeSessionId.value))
)
const pollTimers = new Map<string, ReturnType<typeof setInterval>>()
const pollSignatures = new Map<string, { sig: string, stableTicks: number }>()
const isRunActive = computed(() => isStreaming.value)
// Context-compression progress shown in the chat UI; null means there is
// nothing to display.
const compressionState = ref<{
// true once 'compression.started' was handled, false after 'compression.completed'
compressing: boolean
// number of messages reported by the compression event
messageCount: number
// token count reported before compression
beforeTokens: number
// token count reported after compression (0 while compression is running)
afterTokens: number
// null while running; afterwards the event's `compressed` flag (false when absent)
compressed: boolean | null
// optional error text taken from the 'compression.completed' event
error?: string
} | null>(null)
// Replaces the whole compression state; pass null to clear it.
function setCompressionState(state: typeof compressionState.value) {
compressionState.value = state
}
const activeSession = ref<Session | null>(null)
const messages = computed<Message[]>(() => activeSession.value?.messages || [])
function isSessionLive(sessionId: string): boolean {
if (streamStates.value.has(sessionId) || resumingRuns.value.has(sessionId)) return true
if (streamStates.value.has(sessionId)) return true
const session = sessions.value.find(candidate => candidate.id === sessionId)
if (!session?.lastActiveAt || session.endedAt != null) return false
@@ -365,7 +331,6 @@ export const useChatStore = defineStore('chat', () => {
}
function persistSessionsList() {
// Cache lightweight summaries only (messages are cached per-session).
saveJsonWithLegacy(
sessionsCacheKey(),
sessions.value.map(s => ({ ...s, messages: [] })),
@@ -373,13 +338,6 @@ export const useChatStore = defineStore('chat', () => {
)
}
/**
 * Write the active session's messages to the per-session message cache
 * (and its legacy key). Does nothing when no session is active or the active
 * id is not present in `sessions`.
 */
function persistActiveMessages() {
  const activeId = activeSessionId.value
  if (!activeId) return
  const owner = sessions.value.find(candidate => candidate.id === activeId)
  if (!owner) return
  const cacheKey = msgsCacheKey(activeId)
  const cacheable = sanitizeForCache(owner.messages)
  saveJsonWithLegacy(cacheKey, cacheable, legacyMsgsCacheKey(activeId))
}
/**
 * Record that session `sid` has a run in progress: stores the run id together
 * with the current time under the session's in-flight key (and its legacy key).
 */
function markInFlight(sid: string, runId: string) {
  const key = inFlightKey(sid)
  const record = { runId, startedAt: Date.now() } as InFlightRun
  const legacyKey = legacyInFlightKey(sid)
  saveJsonWithLegacy(key, record, legacyKey)
}
@@ -398,122 +356,10 @@ export const useChatStore = defineStore('chat', () => {
return rec
}
/**
 * Compare the locally held messages of a session with the server's view.
 *
 * - More user turns on the server: caught up and ahead.
 * - Fewer user turns on the server, or a different latest user message:
 *   neither caught up nor ahead.
 * - Same latest user turn: caught up; ahead when the total assistant text
 *   after that turn is at least as long on the server as locally.
 *
 * @returns `{ serverIsCaughtUp, serverIsAhead }`
 */
function compareServerMessages(local: Message[], server: Message[]) {
  // Count user turns and remember where the latest one sits (-1 when none).
  const scanUsers = (list: Message[]) => {
    let userTurns = 0
    let lastUserAt = -1
    list.forEach((msg, idx) => {
      if (msg.role === 'user') {
        userTurns += 1
        lastUserAt = idx
      }
    })
    return { userTurns, lastUserAt }
  }

  // Total assistant content length strictly after position `from`.
  const assistantCharsAfter = (list: Message[], from: number) => {
    let total = 0
    for (let i = from + 1; i < list.length; i += 1) {
      const msg = list[i]
      if (msg.role === 'assistant') total += msg.content?.length || 0
    }
    return total
  }

  const mine = scanUsers(local)
  const theirs = scanUsers(server)

  if (theirs.userTurns > mine.userTurns) {
    return { serverIsCaughtUp: true, serverIsAhead: true }
  }
  if (theirs.userTurns < mine.userTurns) {
    return { serverIsCaughtUp: false, serverIsAhead: false }
  }

  const bothHaveUserTurn = mine.lastUserAt >= 0 && theirs.lastUserAt >= 0
  if (bothHaveUserTurn && local[mine.lastUserAt]?.content !== server[theirs.lastUserAt]?.content) {
    return { serverIsCaughtUp: false, serverIsAhead: false }
  }

  const localChars = assistantCharsAfter(local, mine.lastUserAt)
  const serverChars = assistantCharsAfter(server, theirs.lastUserAt)
  return {
    serverIsCaughtUp: true,
    serverIsAhead: serverChars >= localChars,
  }
}
/**
 * Stop recovery polling for session `sid`: clears its interval timer (if any),
 * forgets its stability signature and removes it from `resumingRuns`
 * (assigned as a new Set).
 */
function stopPolling(sid: string) {
  const timer = pollTimers.get(sid)
  if (timer) {
    clearInterval(timer)
    pollTimers.delete(sid)
  }
  pollSignatures.delete(sid)

  const stillResuming = new Set(resumingRuns.value)
  stillResuming.delete(sid)
  resumingRuns.value = stillResuming
}
// Poll fetchSession every POLL_INTERVAL_MS while an in-flight run is
// recovering. At most one timer runs per session. Polling stops when
//   - a live stream exists for the session (streamStates has sid),
//   - readInFlight(sid) no longer returns a record, or
//   - the server's message signature stayed unchanged for POLL_STABLE_EXITS
//     consecutive ticks while the server was caught up (run presumed done).
// The session is added to resumingRuns for the duration of the polling.
function startPolling(sid: string) {
// Already polling this session — keep the existing timer.
if (pollTimers.has(sid)) return
resumingRuns.value = new Set([...resumingRuns.value, sid])
const timer = setInterval(async () => {
// If a fresh SSE stream started for this session, polling is redundant.
if (streamStates.value.has(sid)) {
stopPolling(sid)
return
}
const inFlight = readInFlight(sid)
if (!inFlight) {
stopPolling(sid)
return
}
try {
const detail = await fetchSession(sid)
// No detail or unknown session: skip this tick, the next one retries.
if (!detail) return
const mapped = mapHermesMessages(detail.messages || [])
const target = sessions.value.find(s => s.id === sid)
if (!target) return
// Use the same current-turn comparison as switchSession: server is
// ahead only when it has a newer user turn or the assistant output
// after the current user turn has caught up.
const local = target.messages
const { serverIsAhead, serverIsCaughtUp } = compareServerMessages(local, mapped)
if (serverIsAhead) {
target.messages = mapped
// Only fill in a missing title here; the stable-exit path below overwrites it.
if (detail.title && !target.title) target.title = detail.title
if (sid === activeSessionId.value) persistActiveMessages()
}
// Stability detection ONLY matters when the server has at least as
// many user turns as we do. Otherwise the server is still catching
// up (e.g. the new turn we just sent hasn't been flushed server-side
// yet) and a "stable" signature is a false positive — the stability
// is the server NOT having our latest turn, not the run being done.
if (!serverIsCaughtUp) {
pollSignatures.delete(sid)
} else {
const last = mapped[mapped.length - 1]
// Signature: message count, tail of the last message's content, and its tool status.
const sig = `${mapped.length}|${last?.content?.slice(-40) || ''}|${last?.toolStatus || ''}`
const prev = pollSignatures.get(sid)
if (prev && prev.sig === sig) {
prev.stableTicks += 1
if (prev.stableTicks >= POLL_STABLE_EXITS) {
// The server view has stopped changing. If it is still behind
// the locally streamed assistant reply, end recovery without
// retreating local state; otherwise commit the server view.
if (serverIsAhead) {
target.messages = mapped
if (detail.title) target.title = detail.title
if (sid === activeSessionId.value) persistActiveMessages()
}
clearInFlight(sid)
stopPolling(sid)
}
} else {
// Signature changed (or first observation): restart the stability count.
pollSignatures.set(sid, { sig, stableTicks: 0 })
}
}
} catch {
// transient network error — ignore, next tick tries again
}
}, POLL_INTERVAL_MS)
pollTimers.set(sid, timer)
}
async function loadSessions() {
isLoadingSessions.value = true
try {
// 从 profile 对应的缓存中恢复,实现 instant render
// Restore sessions list from cache (lightweight, no messages)
const cachedSessions = loadJsonWithFallback<Session[]>(sessionsCacheKey(), legacySessionsCacheKey())
if (cachedSessions?.length) {
sessions.value = cachedSessions
@@ -521,8 +367,6 @@ export const useChatStore = defineStore('chat', () => {
if (savedId) {
const cachedActive = cachedSessions.find(s => s.id === savedId) || null
if (cachedActive) {
const cachedMsgs = loadJsonWithFallback<Message[]>(msgsCacheKey(savedId), legacyMsgsCacheKey(savedId))
if (cachedMsgs) cachedActive.messages = scrubBuggyReasoningInCache(cachedMsgs)
activeSession.value = cachedActive
activeSessionId.value = savedId
}
@@ -548,8 +392,6 @@ export const useChatStore = defineStore('chat', () => {
const localOnly = sessions.value.filter(s => {
if (freshIds.has(s.id)) return false
if (readInFlight(s.id)) return true
// Session no longer exists on server and no active run — clean up cache
removeItemWithLegacy(msgsCacheKey(s.id), legacyMsgsCacheKey(s.id))
removeItemWithLegacy(inFlightKey(s.id), legacyInFlightKey(s.id))
return false
})
@@ -572,10 +414,7 @@ export const useChatStore = defineStore('chat', () => {
}
}
// Re-pull active session from server without retreating newer locally
// streamed output. Used on SSE drop and on tab-visible events — mobile
// browsers kill EventSource while backgrounded, but the backend run usually
// completes anyway.
// Re-pull active session from server. Used on tab-visible events.
async function refreshActiveSession(): Promise<boolean> {
const sid = activeSessionId.value
if (!sid) return false
@@ -585,11 +424,7 @@ export const useChatStore = defineStore('chat', () => {
const target = sessions.value.find(s => s.id === sid)
if (!target) return false
const mapped = mapHermesMessages(detail.messages || [])
const { serverIsAhead } = compareServerMessages(target.messages, mapped)
if (serverIsAhead) {
target.messages = mapped
persistActiveMessages()
}
target.messages = mapped
if (detail.title) target.title = detail.title
return true
} catch (err) {
@@ -626,35 +461,12 @@ export const useChatStore = defineStore('chat', () => {
if (!activeSession.value) return
// Hydrate messages from localStorage cache first (instant render), then
// revalidate from server in the background. If no cache exists, show the
// loading state while we fetch.
const hasLocalMessages = activeSession.value.messages.length > 0
if (!hasLocalMessages) {
const cachedMsgs = loadJsonWithFallback<Message[]>(msgsCacheKey(sessionId), legacyMsgsCacheKey(sessionId))
if (cachedMsgs?.length) {
activeSession.value.messages = scrubBuggyReasoningInCache(cachedMsgs)
}
}
const needsBlockingLoad = activeSession.value.messages.length === 0
if (needsBlockingLoad) isLoadingMessages.value = true
isLoadingMessages.value = true
try {
const detail = await fetchSession(sessionId)
if (detail && detail.messages) {
const mapped = mapHermesMessages(detail.messages)
// Pick whichever view has more information for the current turn.
// Simple message-count comparison is wrong because mapHermesMessages
// folds tool_call-only assistant messages; global last-assistant
// comparison is also wrong across turns. Trust server only when it has
// a newer user turn or its assistant output after the current user turn
// has caught up.
const local = activeSession.value.messages
const { serverIsAhead } = compareServerMessages(local, mapped)
if (serverIsAhead) {
activeSession.value.messages = mapped
}
activeSession.value.messages = mapHermesMessages(detail.messages)
// Update title: use Hermes title, or fallback to first user message
if (detail.title) {
activeSession.value.title = detail.title
@@ -665,7 +477,6 @@ export const useChatStore = defineStore('chat', () => {
activeSession.value.title = t + (firstUser.content.length > 40 ? '...' : '')
}
}
persistActiveMessages()
}
} catch (err) {
console.error('Failed to load session messages:', err)
@@ -673,12 +484,9 @@ export const useChatStore = defineStore('chat', () => {
isLoadingMessages.value = false
}
// tmux-like resume: if this session has a recent in-flight run and we're
// not currently streaming, start polling fetchSession to pick up progress
// that happened while we were gone. Exits automatically on stability.
if (readInFlight(sessionId) && !streamStates.value.has(sessionId)) {
startPolling(sessionId)
}
// Always resume via Socket.IO for the active session.
// Server tracks run/compression state per session and replays events.
resumeInFlightRun(sessionId)
// Fetch token usage for this session from web-ui DB
try {
@@ -713,7 +521,6 @@ export const useChatStore = defineStore('chat', () => {
async function deleteSession(sessionId: string) {
await deleteSessionApi(sessionId)
sessions.value = sessions.value.filter(s => s.id !== sessionId)
removeItemWithLegacy(msgsCacheKey(sessionId), legacyMsgsCacheKey(sessionId))
persistSessionsList()
if (activeSessionId.value === sessionId) {
if (sessions.value.length > 0) {
@@ -777,21 +584,10 @@ export const useChatStore = defineStore('chat', () => {
timestamp: Date.now(),
attachments: attachments && attachments.length > 0 ? attachments : undefined,
}
// Build conversation history BEFORE adding the new message, so the
// user's current message appears only in `input` — not duplicated in
// `conversation_history` as well.
const sessionMsgs = getSessionMsgs(sid)
const history: ChatMessage[] = sessionMsgs
.filter(m => (m.role === 'user' || m.role === 'assistant') && m.content.trim())
.map(m => ({ role: m.role as 'user' | 'assistant' | 'system', content: m.content }))
addMessage(sid, userMsg)
updateSessionTitle(sid)
// Persist immediately so a refresh before the first SSE event (e.g. the
// user closes the tab right after sending) still has the user's message
// and session title in the cache.
if (sid === activeSessionId.value) {
persistActiveMessages()
persistSessionsList()
}
@@ -821,71 +617,65 @@ export const useChatStore = defineStore('chat', () => {
const appStore = useAppStore()
const sessionModel = activeSession.value?.model || appStore.selectedModel
const run = await startRun({
const runPayload = {
input: inputText,
conversation_history: history,
session_id: sid,
model: sessionModel || undefined,
})
const runId = (run as any).run_id || (run as any).id
if (!runId) {
addMessage(sid, {
id: uid(),
role: 'system',
content: `Error: startRun returned no run ID. Response: ${JSON.stringify(run)}`,
timestamp: Date.now(),
})
return
}
// tmux-like resume: persist run_id so refresh/reopen can pick up the
// working indicator and poll for progress.
markInFlight(sid, runId)
// If we were already polling (e.g. user re-sent while resume was still
// polling an earlier run), cancel that polling — the new SSE stream is
// the authoritative live source.
stopPolling(sid)
// Helper to clean up this session's stream state
const cleanup = () => {
streamStates.value.delete(sid)
if (persistTimer) {
clearTimeout(persistTimer)
persistTimer = null
}
}
// Throttle in-flight cache writes so a refresh mid-stream still shows
// the partial reply. 800ms keeps quota pressure low while guaranteeing
// at most ~1s of unsaved delta on reload.
let persistTimer: ReturnType<typeof setTimeout> | null = null
// Per-run flags used to detect silently-swallowed errors at run.completed.
// hermes-agent occasionally emits run.completed with empty output and no
// usage when the agent layer caught an upstream error (e.g. invalid API
// key). We need to distinguish: (a) run with assistant text produced,
// (b) run with only tool activity, (c) run with truly nothing visible.
// Reset per send() call — closures captured by SSE callbacks are scoped
// Reset per send() call — closures captured by Socket.IO callbacks are scoped
// to this run, so there is no cross-run contamination.
let runProducedAssistantText = false
let runHadToolActivity = false
const schedulePersist = () => {
if (sid !== activeSessionId.value || persistTimer) return
persistTimer = setTimeout(() => {
persistTimer = null
persistActiveMessages()
}, 800)
}
// Listen to SSE events — all closures capture `sid`
const ctrl = streamRunEvents(
runId,
// Send run via Socket.IO and listen to streamed events — all closures capture `sid`
const ctrl = startRunViaSocket(
runPayload,
// onEvent
(evt: RunEvent) => {
switch (evt.event) {
case 'run.started':
break
case 'compression.started': {
setCompressionState({
compressing: true,
messageCount: (evt as any).message_count || 0,
beforeTokens: (evt as any).token_count || 0,
afterTokens: 0,
compressed: null,
})
break
}
case 'compression.completed': {
setCompressionState({
compressing: false,
messageCount: (evt as any).totalMessages || 0,
beforeTokens: (evt as any).beforeTokens || 0,
afterTokens: (evt as any).afterTokens || 0,
compressed: (evt as any).compressed ?? false,
error: (evt as any).error,
})
// Auto-clear after 5s
setTimeout(() => {
if (compressionState.value && !compressionState.value.compressing) {
setCompressionState(null)
}
}, 5000)
break
}
case 'reasoning.delta':
case 'thinking.delta': {
const text = evt.text || evt.delta || ''
@@ -908,7 +698,7 @@ export const useChatStore = defineStore('chat', () => {
})
noteReasoningStart(newId)
}
schedulePersist()
break
}
@@ -925,7 +715,7 @@ export const useChatStore = defineStore('chat', () => {
// 否则(上游未转发 delta,只发这一次 available)不显示时长。
noteReasoningEnd(last.id)
}
schedulePersist()
break
}
@@ -952,7 +742,7 @@ export const useChatStore = defineStore('chat', () => {
isStreaming: true,
})
}
schedulePersist()
break
}
@@ -972,7 +762,7 @@ export const useChatStore = defineStore('chat', () => {
toolPreview: evt.preview,
toolStatus: 'running',
})
schedulePersist()
break
}
@@ -986,7 +776,7 @@ export const useChatStore = defineStore('chat', () => {
const last = toolMsgs[toolMsgs.length - 1]
updateMessage(sid, last.id, { toolStatus: 'done' })
}
schedulePersist()
break
}
@@ -1048,9 +838,8 @@ export const useChatStore = defineStore('chat', () => {
// the next page load to still see in-flight === true (so
// polling kicks in and recovers) rather than the other way
// around (cleared in-flight + stale streaming cache = UI stuck).
if (sid === activeSessionId.value) persistActiveMessages()
clearInFlight(sid)
stopPolling(sid)
break
}
@@ -1077,9 +866,8 @@ export const useChatStore = defineStore('chat', () => {
}
})
cleanup()
if (sid === activeSessionId.value) persistActiveMessages()
clearInFlight(sid)
stopPolling(sid)
break
}
}
@@ -1095,21 +883,13 @@ export const useChatStore = defineStore('chat', () => {
updateSessionTitle(sid)
},
// onError
// Mobile browsers drop EventSource when the tab backgrounds / screen
// locks / network flips. The backend run usually completes anyway, so
// rather than injecting a stale "SSE connection error" bubble we mark
// streaming as done and silently re-sync from the server, which has
// the real final answer. If the server fetch itself fails, we leave
// whatever text we already streamed in place — no visible error.
(err) => {
console.warn('SSE connection dropped, resyncing from server:', err.message)
console.warn('Socket.IO run stream error:', err.message)
const msgs = getSessionMsgs(sid)
const last = msgs[msgs.length - 1]
if (last?.isStreaming) {
updateMessage(sid, last.id, { isStreaming: false })
}
// Any tool messages still marked 'running' will be replaced by the
// server's view after refresh; clear their spinner state now.
msgs.forEach((m, i) => {
if (m.role === 'tool' && m.toolStatus === 'running') {
msgs[i] = { ...m, toolStatus: 'done' }
@@ -1119,12 +899,10 @@ export const useChatStore = defineStore('chat', () => {
if (sid === activeSessionId.value) {
void refreshActiveSession()
}
// The run might still be going on the server side (SSE drop doesn't
// abort it). If we still have an in-flight record, fall back to
// polling fetchSession to keep the user updated.
if (readInFlight(sid)) {
startPolling(sid)
}
},
// onStarted — called when the server emits 'run.started' (the event carries run_id)
(runId: string) => {
markInFlight(sid, runId)
},
)
@@ -1139,6 +917,264 @@ export const useChatStore = defineStore('chat', () => {
}
}
/**
* Resume an in-flight run after page refresh.
* Emits 'resume' to join the session room on the server,
* then sets up event listeners to receive ongoing events.
*
* The socket comes from connectChatRun(). One listener per run event is
* registered on it, and every listener forwards to a single handleEvent()
* dispatcher. The listeners are removed again by cleanup(), which runs on
* 'run.completed' / 'run.failed' or when the `abort` stored in streamStates
* for this session is invoked.
*
* @param sid - id of the session whose run should be resumed
*/
function resumeInFlightRun(sid: string) {
const socket = connectChatRun()
// Set once by cleanup(); makes cleanup idempotent and mutes late events.
let closed = false
// Track what the run produced so 'run.completed' can decide whether to add
// a fallback assistant message or an error message.
let runProducedAssistantText = false
let runHadToolActivity = false
// Detach every listener registered below and drop the stream-state entry.
const cleanup = () => {
if (closed) return
closed = true
socket.off('run.started', onRunStarted)
socket.off('run.failed', onRunFailed)
socket.off('message.delta', onMessageDelta)
socket.off('reasoning.delta', onReasoningDelta)
socket.off('thinking.delta', onThinkingDelta)
socket.off('reasoning.available', onReasoningAvailable)
socket.off('tool.started', onToolStarted)
socket.off('tool.completed', onToolCompleted)
socket.off('run.completed', onRunCompleted)
socket.off('compression.started', onCompressionStarted)
socket.off('compression.completed', onCompressionCompleted)
streamStates.value.delete(sid)
}
// Shared event handler — filters by session_id tag
function handleEvent(evt: RunEvent) {
if (closed) return
// Filter events for this session (server tags all events with session_id)
// NOTE(review): an event that carries no session_id is NOT filtered out and
// is applied to this session — confirm the server always sets the tag.
if (evt.session_id && evt.session_id !== sid) return
switch (evt.event) {
case 'run.started':
break
case 'compression.started': {
setCompressionState({
compressing: true,
messageCount: (evt as any).message_count || 0,
beforeTokens: (evt as any).token_count || 0,
afterTokens: 0,
compressed: null,
})
break
}
case 'compression.completed': {
setCompressionState({
compressing: false,
messageCount: (evt as any).totalMessages || 0,
beforeTokens: (evt as any).beforeTokens || 0,
afterTokens: (evt as any).afterTokens || 0,
compressed: (evt as any).compressed ?? false,
error: (evt as any).error,
})
// Clear the compression banner after 5s, unless a new compression has
// started in the meantime (state would then have compressing === true).
setTimeout(() => {
if (compressionState.value && !compressionState.value.compressing) {
setCompressionState(null)
}
}, 5000)
break
}
case 'reasoning.delta':
case 'thinking.delta': {
const text = evt.text || evt.delta || ''
if (!text) break
// NOTE(review): reasoning text also sets this flag, so a run that streamed
// only reasoning skips the final-output fallback in 'run.completed' —
// confirm this is intended.
runProducedAssistantText = true
const msgs = getSessionMsgs(sid)
const last = msgs[msgs.length - 1]
if (last?.role === 'assistant' && last.isStreaming) {
// Append to the assistant message that is still streaming.
last.reasoning = (last.reasoning || '') + text
noteReasoningStart(last.id)
} else {
// No streaming assistant message yet: open one that holds only reasoning.
const newId = uid()
addMessage(sid, {
id: newId,
role: 'assistant',
content: '',
timestamp: Date.now(),
isStreaming: true,
reasoning: text,
})
noteReasoningStart(newId)
}
break
}
case 'reasoning.available': {
const msgs = getSessionMsgs(sid)
const last = msgs[msgs.length - 1]
if (last?.role === 'assistant' && last.isStreaming) {
noteReasoningEnd(last.id)
}
break
}
case 'message.delta': {
if (evt.delta) runProducedAssistantText = true
const msgs = getSessionMsgs(sid)
const last = msgs[msgs.length - 1]
if (last?.role === 'assistant' && last.isStreaming) {
const prev = last.content
const next = prev + (evt.delta || '')
noteThinkingDelta(last.id, prev, next)
// Visible content has started, so any reasoning phase is over.
if (last.reasoning) noteReasoningEnd(last.id)
last.content = next
} else {
const newId = uid()
const nextContent = evt.delta || ''
noteThinkingDelta(newId, '', nextContent)
addMessage(sid, {
id: newId,
role: 'assistant',
content: nextContent,
timestamp: Date.now(),
isStreaming: true,
})
}
break
}
case 'tool.started': {
runHadToolActivity = true
const msgs = getSessionMsgs(sid)
const last = msgs[msgs.length - 1]
// Close the currently streaming message before inserting the tool row.
if (last?.isStreaming) {
updateMessage(sid, last.id, { isStreaming: false })
}
addMessage(sid, {
id: uid(),
role: 'tool',
content: '',
timestamp: Date.now(),
toolName: evt.tool || evt.name,
toolPreview: evt.preview,
toolStatus: 'running',
})
break
}
case 'tool.completed': {
runHadToolActivity = true
const msgs = getSessionMsgs(sid)
// The event is not matched by id: the most recent tool message still
// marked 'running' is the one flipped to 'done'.
const toolMsgs = msgs.filter(m => m.role === 'tool' && m.toolStatus === 'running')
if (toolMsgs.length > 0) {
updateMessage(sid, toolMsgs[toolMsgs.length - 1].id, { toolStatus: 'done' })
}
break
}
case 'run.completed': {
const msgs = getSessionMsgs(sid)
const lastMsg = msgs[msgs.length - 1]
if (lastMsg?.isStreaming) {
updateMessage(sid, lastMsg.id, { isStreaming: false })
}
if (evt.usage) {
const target = sessions.value.find(s => s.id === sid)
if (target) {
target.inputTokens = evt.usage.input_tokens
target.outputTokens = evt.usage.output_tokens
}
}
const finalOutput = typeof evt.output === 'string' ? evt.output : ''
const finalOutputTrimmed = finalOutput.trim()
// Nothing was streamed but the run has a final output: show it now.
if (!runProducedAssistantText && finalOutputTrimmed !== '') {
addMessage(sid, {
id: uid(),
role: 'assistant',
content: finalOutput,
timestamp: Date.now(),
})
}
// No text, no tool activity and no final output: surface an error message
// instead of ending the run silently.
const swallowedError = !runProducedAssistantText && !runHadToolActivity && finalOutputTrimmed === ''
if (swallowedError) {
addMessage(sid, {
id: uid(),
role: 'system',
content: 'Error: Agent returned no output. The model call may have failed (e.g. invalid API key, model not supported by provider, or context exceeded). Check the hermes-agent logs for details.',
timestamp: Date.now(),
})
}
cleanup()
updateSessionTitle(sid)
clearInFlight(sid)
break
}
case 'run.failed': {
const msgs = getSessionMsgs(sid)
const lastErr = msgs[msgs.length - 1]
if (lastErr?.isStreaming) {
// Turn the partially streamed message into the error message.
updateMessage(sid, lastErr.id, {
isStreaming: false,
content: evt.error ? `Error: ${evt.error}` : 'Run failed',
role: 'system',
})
} else {
addMessage(sid, {
id: uid(),
role: 'system',
content: evt.error ? `Error: ${evt.error}` : 'Run failed',
timestamp: Date.now(),
})
}
// Any tool still marked 'running' cannot finish any more: mark it as errored.
msgs.forEach((m, i) => {
if (m.role === 'tool' && m.toolStatus === 'running') {
msgs[i] = { ...m, toolStatus: 'error' }
}
})
cleanup()
clearInFlight(sid)
break
}
}
}
// One named wrapper per event so cleanup() can pass the same function
// reference to socket.off() that was given to socket.on().
function onRunStarted(data: RunEvent) { handleEvent(data) }
function onRunFailed(data: RunEvent) { handleEvent(data) }
function onMessageDelta(data: RunEvent) { handleEvent(data) }
function onReasoningDelta(data: RunEvent) { handleEvent(data) }
function onThinkingDelta(data: RunEvent) { handleEvent(data) }
function onReasoningAvailable(data: RunEvent) { handleEvent(data) }
function onToolStarted(data: RunEvent) { handleEvent(data) }
function onToolCompleted(data: RunEvent) { handleEvent(data) }
function onRunCompleted(data: RunEvent) { handleEvent(data) }
function onCompressionStarted(data: RunEvent) { handleEvent(data) }
function onCompressionCompleted(data: RunEvent) { handleEvent(data) }
socket.on('run.started', onRunStarted)
socket.on('run.failed', onRunFailed)
socket.on('message.delta', onMessageDelta)
socket.on('reasoning.delta', onReasoningDelta)
socket.on('thinking.delta', onThinkingDelta)
socket.on('reasoning.available', onReasoningAvailable)
socket.on('tool.started', onToolStarted)
socket.on('tool.completed', onToolCompleted)
socket.on('run.completed', onRunCompleted)
socket.on('compression.started', onCompressionStarted)
socket.on('compression.completed', onCompressionCompleted)
// Emit resume to join the session room
socket.emit('resume', { session_id: sid })
// Mark as streaming so UI shows the indicator
// (the stored abort handle is cleanup, so aborting only detaches listeners)
streamStates.value.set(sid, { abort: cleanup })
}
function stopStreaming() {
const sid = activeSessionId.value
if (!sid) return
@@ -1151,9 +1187,8 @@ export const useChatStore = defineStore('chat', () => {
updateMessage(sid, lastMsg.id, { isStreaming: false })
}
streamStates.value.delete(sid)
clearInFlight(sid)
stopPolling(sid)
}
clearInFlight(sid)
}
// Tab visibility: re-sync when returning to foreground
@@ -1161,8 +1196,10 @@ export const useChatStore = defineStore('chat', () => {
document.addEventListener('visibilitychange', () => {
if (document.visibilityState === 'visible' && activeSessionId.value && !isStreaming.value) {
void refreshActiveSession()
if (readInFlight(activeSessionId.value)) {
startPolling(activeSessionId.value)
// Re-subscribe in case Socket.IO reconnected
const sid = activeSessionId.value
if (sid && !streamStates.value.has(sid)) {
resumeInFlightRun(sid)
}
}
})
@@ -1237,6 +1274,7 @@ export const useChatStore = defineStore('chat', () => {
isStreaming,
isRunActive,
isSessionLive,
compressionState,
isLoadingSessions,
sessionsLoaded,
isLoadingMessages,
@@ -0,0 +1,55 @@
/**
* SQLite-backed compression snapshot store for 1:1 chat sessions.
*
* Stores the latest compression summary and the index of the last
* compressed message, so incremental compression can pick up where
* the previous one left off.
*/
import { isSqliteAvailable, ensureTable, getDb } from '../index'
// Name of the SQLite table that holds one compression snapshot per session.
const TABLE = 'chat_compression_snapshots'
// Column name → SQLite column definition, passed to ensureTable().
// session_id is the primary key, so each session has at most one row.
const SCHEMA: Record<string, string> = {
session_id: 'TEXT PRIMARY KEY',
summary: 'TEXT NOT NULL DEFAULT \'\'',
last_message_index: 'INTEGER NOT NULL DEFAULT 0',
message_count_at_time: 'INTEGER NOT NULL DEFAULT 0',
updated_at: 'INTEGER NOT NULL',
}
/**
 * Ensure the snapshot table exists.
 * Does nothing when SQLite is not available.
 */
export function initCompressionSnapshotStore(): void {
  if (!isSqliteAvailable()) return
  ensureTable(TABLE, SCHEMA)
}
/**
 * Look up the stored compression snapshot for a session.
 *
 * @param sessionId - session whose snapshot should be read
 * @returns the snapshot with camelCase field names, or `null` when SQLite is
 *          unavailable or no row exists for the session
 */
export function getCompressionSnapshot(sessionId: string): { summary: string; lastMessageIndex: number; messageCountAtTime: number } | null {
  if (!isSqliteAvailable()) return null
  const lookup = getDb()!.prepare(
    `SELECT summary, last_message_index AS lastMessageIndex, message_count_at_time AS messageCountAtTime FROM ${TABLE} WHERE session_id = ?`,
  )
  // .get() yields undefined when there is no matching row; normalise to null.
  const row = lookup.get(sessionId) as any
  return row ?? null
}
/**
 * Insert or overwrite the compression snapshot of a session.
 * Does nothing when SQLite is not available.
 *
 * @param sessionId - session the snapshot belongs to (primary key)
 * @param summary - summary text to store
 * @param lastMessageIndex - value stored in last_message_index
 * @param messageCountAtTime - value stored in message_count_at_time
 */
export function saveCompressionSnapshot(
  sessionId: string,
  summary: string,
  lastMessageIndex: number,
  messageCountAtTime: number,
): void {
  if (!isSqliteAvailable()) return
  // Upsert keyed on session_id: an existing row is overwritten in place.
  const upsertSql = `INSERT INTO ${TABLE} (session_id, summary, last_message_index, message_count_at_time, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(session_id) DO UPDATE SET
summary = excluded.summary,
last_message_index = excluded.last_message_index,
message_count_at_time = excluded.message_count_at_time,
updated_at = excluded.updated_at`
  const upsert = getDb()!.prepare(upsertSql)
  upsert.run(sessionId, summary, lastMessageIndex, messageCountAtTime, Date.now())
}
/**
 * Remove the compression snapshot of a session, if one exists.
 * Does nothing when SQLite is not available.
 *
 * @param sessionId - session whose snapshot row should be deleted
 */
export function deleteCompressionSnapshot(sessionId: string): void {
  if (isSqliteAvailable()) {
    const removal = getDb()!.prepare(`DELETE FROM ${TABLE} WHERE session_id = ?`)
    removal.run(sessionId)
  }
}
+4 -4
View File
@@ -242,7 +242,7 @@ function runLiteralContentSearch(
${SESSION_SELECT},
s.parent_session_id AS parent_session_id
FROM sessions s
WHERE s.source != 'tool'
WHERE s.source != 'tool' AND s.id NOT LIKE 'compress_%'
${sourceClause}
)
SELECT
@@ -411,7 +411,7 @@ function loadAllSessions(db: { prepare: (sql: string) => { all: (...params: any[
${SESSION_SELECT},
s.parent_session_id AS parent_session_id
FROM sessions s
WHERE s.source != 'tool'
WHERE s.source != 'tool' AND s.id NOT LIKE 'compress_%'
`).all() as Record<string, unknown>[]
const sessions = rows.map(mapInternalSessionRow)
const byId = new Map(sessions.map(s => [s.id, s]))
@@ -623,7 +623,7 @@ export async function listSessionSummaries(source?: string, limit = 2000): Promi
const db = new DatabaseSync(sessionDbPath(), { open: true, readOnly: true })
try {
const clauses = ["s.parent_session_id IS NULL", "s.source != 'tool'"]
const clauses = ["s.parent_session_id IS NULL", "s.source != 'tool'", "s.id NOT LIKE 'compress_%'"]
const params: any[] = []
if (source) {
clauses.push('s.source = ?')
@@ -689,7 +689,7 @@ export async function searchSessionSummaries(
${SESSION_SELECT},
s.parent_session_id AS parent_session_id
FROM sessions s
WHERE s.source != 'tool'
WHERE s.source != 'tool' AND s.id NOT LIKE 'compress_%'
${sourceClause}
`
+5 -2
View File
@@ -3,7 +3,10 @@ import { mkdirSync, readFileSync, writeFileSync, existsSync } from 'fs'
import { resolve } from 'path'
import { homedir } from 'os'
const DB_DIR = resolve(homedir(), '.hermes-web-ui')
const isDev = process.env.NODE_ENV !== 'production'
const DB_DIR = isDev
? resolve(process.cwd(), 'packages/server/data')
: resolve(homedir(), '.hermes-web-ui')
const DB_PATH = resolve(DB_DIR, 'hermes-web-ui.db')
const JSON_PATH = resolve(DB_DIR, 'hermes-web-ui.json')
@@ -27,7 +30,7 @@ export function getDb(): DatabaseSync | null {
if (!_db) {
mkdirSync(DB_DIR, { recursive: true })
_db = new DatabaseSync(DB_PATH)
_db.exec('PRAGMA journal_mode=WAL')
_db.exec('PRAGMA journal_mode=DELETE')
_db.exec('PRAGMA foreign_keys=ON')
}
return _db
+11
View File
@@ -15,7 +15,9 @@ import { setupTerminalWebSocket } from './routes/hermes/terminal'
import { startVersionCheck } from './routes/health'
import { registerRoutes } from './routes'
import { setGroupChatServer } from './routes/hermes/group-chat'
import { setChatRunServer } from './routes/hermes/chat-run'
import { GroupChatServer } from './services/hermes/group-chat'
import { ChatRunSocket } from './services/hermes/chat-run-socket'
import { logger } from './services/logger'
// Injected by esbuild at build time; fallback to reading package.json in dev mode
@@ -52,6 +54,10 @@ export async function bootstrap() {
initUsageStore()
console.log('[bootstrap] usage store initialized')
const { initCompressionSnapshotStore } = await import('./db/hermes/compression-snapshot')
initCompressionSnapshotStore()
console.log('[bootstrap] compression snapshot store initialized')
app.use(cors({ origin: config.corsOrigins }))
app.use(bodyParser())
console.log('[bootstrap] cors + bodyParser registered')
@@ -92,6 +98,11 @@ export async function bootstrap() {
setGroupChatServer(groupChatServer)
groupChatServer.setGatewayManager(getGatewayManagerInstance())
// Chat run Socket.IO — shares the same Server instance, just adds /chat-run namespace
const chatRunServer = new ChatRunSocket(groupChatServer.getIO(), getGatewayManagerInstance())
setChatRunServer(chatRunServer)
chatRunServer.init()
// Catch-all: destroy upgrade requests not handled by terminal or Socket.IO
server.on('upgrade', (req: any, socket: any) => {
const url = new URL(req.url || '', `http://${req.headers.host}`)
@@ -0,0 +1,592 @@
/**
* Chat Context Compressor
*
* Compresses 1:1 chat conversation history before sending to upstream.
* Uses the Hermes structured summary prompt for LLM-based compression.
*
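* Algorithm (as implemented by ChatContextCompressor.compress):
* 1. Load the session's snapshot from SQLite, if any; with a snapshot the
*    working context is the stored summary plus the messages after its index
* 2. If the working context is at or under the trigger threshold → return it
*    without an LLM call
* 3. Otherwise pre-clean: truncate old tool results (no LLM call)
* 4. Keep the last tailMessageCount messages verbatim (default 20)
* 5. Summarize everything before the tail (incrementally when a snapshot exists)
* 6. Save snapshot: last_message_index = index where compression ends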
*/
import { EventSource } from 'eventsource'
import { encodingForModel, getEncoding } from 'js-tiktoken'
import { logger } from '../../services/logger'
import {
getCompressionSnapshot,
saveCompressionSnapshot,
deleteCompressionSnapshot,
} from '../../db/hermes/compression-snapshot'
// ─── Types ───────────────────────────────────────────────
/**
 * Conversation message as handled by the compressor.
 * Only `role` and `content` are required. `tool_calls` is read on assistant
 * messages and `name` on tool messages when the history is serialized for
 * summarization.
 */
export interface ChatMessage {
role: string
content: string
tool_calls?: Array<{ id: string; type: string; function: { name: string; arguments: string } }>
tool_call_id?: string
name?: string
}
/** Tunables of the chat context compressor; defaults live in DEFAULT_COMPRESSION_CONFIG. */
export interface CompressionConfig {
/** Token threshold to trigger compression (default: 100_000) */
triggerTokens: number
/** Summary token target (default: 8000) */
summaryBudget: number
/** Number of recent messages to keep verbatim (default: 20) */
tailMessageCount: number
/** Timeout for LLM summarization call (default: 120_000ms) */
summarizationTimeoutMs: number
}
/**
 * Defaults merged under any partial config passed to the
 * ChatContextCompressor constructor.
 */
export const DEFAULT_COMPRESSION_CONFIG: CompressionConfig = {
triggerTokens: 100_000,
summaryBudget: 8_000,
tailMessageCount: 20,
summarizationTimeoutMs: 120_000,
}
/** Outcome of ChatContextCompressor.compress(): the messages to send plus bookkeeping. */
export interface CompressedResult {
/** Messages to send upstream; may start with a summary message followed by recent messages */
messages: ChatMessage[]
meta: {
/** Number of messages passed into compress() */
totalMessages: number
/** false only when the input was returned without a summary and without dropping messages */
compressed: boolean
/** true = actually called LLM to summarize; false = assembled from existing snapshot or returned as-is */
llmCompressed: boolean
/** Token count of SUMMARY_PREFIX plus the summary text; 0 when no summary is included */
summaryTokenEstimate: number
/** Number of input messages included in `messages` after the summary */
verbatimCount: number
/** Index of the last input message covered by the summary; -1 when nothing was compressed */
compressedStartIndex: number
}
}
// ─── Token counting ─────────────────────────────────────
// Lazily created cl100k_base encoder, shared by every countTokens() call.
let _encoder: ReturnType<typeof getEncoding> | null = null

/** Return the shared cl100k_base encoder, building it on first use. */
function getEncoder() {
  _encoder ??= getEncoding('cl100k_base')
  return _encoder
}
/**
 * Count the tokens of `text` with the shared cl100k_base encoder.
 *
 * If encoding throws, falls back to a heuristic: 1.5 tokens per character in
 * the CJK/full-width ranges below and one token per four other characters,
 * rounded up.
 *
 * @param text - text to measure
 * @returns token count (exact when the encoder works, estimated otherwise)
 */
export function countTokens(text: string): number {
  let tokens: number
  try {
    tokens = getEncoder().encode(text).length
  } catch {
    const wideChars = text.match(/[\u2e80-\u9fff\uac00-\ud7af\u3000-\u303f\uff00-\uffef]/g)
    const wideCount = wideChars ? wideChars.length : 0
    const narrowCount = text.length - wideCount
    tokens = Math.ceil(wideCount * 1.5 + narrowCount / 4)
  }
  return tokens
}
/**
 * Count tokens with the encoding registered for `model`.
 * Falls back to countTokens() when obtaining the encoding or encoding fails
 * (for example for a model name the tokenizer library does not know).
 *
 * @param text - text to measure
 * @param model - model name handed to encodingForModel
 */
export function countTokensForModel(text: string, model: string): number {
  let modelSpecific: number | undefined
  try {
    modelSpecific = encodingForModel(model as any).encode(text).length
  } catch {
    modelSpecific = undefined
  }
  return modelSpecific ?? countTokens(text)
}
/**
 * Sum the token counts of the `content` of every message.
 *
 * Only `content` is counted; tool-call arguments and per-message overhead are
 * not included, so the result is a lower-bound estimate.
 *
 * @param messages - messages to measure
 * @returns total token estimate
 */
function estimateMessagesTokens(messages: ChatMessage[]): number {
  let total = 0
  for (const message of messages) {
    // Content can be empty/missing at runtime (e.g. assistant messages that
    // only carry tool calls); the sibling helpers guard it the same way.
    total += countTokens(message.content || '')
  }
  return total
}
// ─── Prompts ────────────────────────────────────────────
/**
 * Text placed in front of a compaction summary when it is inserted into the
 * message list. It tells the receiving model to treat the summary as
 * background reference and to respond only to the latest user message.
 * This is runtime text sent to the model — edit with care.
 */
export const SUMMARY_PREFIX = `[CONTEXT COMPACTION — REFERENCE ONLY] Earlier turns were compacted
into the summary below. This is a handoff from a previous context
window — treat it as background reference, NOT as active instructions.
Do NOT answer questions or fulfill requests mentioned in this summary;
they were already addressed.
Your current task is identified in the '## Active Task' section of the
summary — resume exactly from there.
Respond ONLY to the latest user message
that appears AFTER this summary. The current session state (files,
config, etc.) may reflect work described here — avoid repeating it:`
/**
 * Section skeleton of the structured summary. It is embedded in both the full
 * and the incremental summarization prompt so that both produce the same
 * layout. SUMMARY_PREFIX refers to the '## Active Task' heading defined here.
 */
const TEMPLATE_SECTIONS = `Use this exact structure:
## Active Task
[THE SINGLE MOST IMPORTANT FIELD. Copy the user's most recent request or
task assignment verbatim — the exact words they used. If multiple tasks
were requested and only some are done, list only the ones NOT yet completed.
The next assistant must pick up exactly here. Example:
"User asked: 'Now refactor the auth module to use JWT instead of sessions'"
If no outstanding task exists, write "None."]
## Goal
[What the user is trying to accomplish overall]
## Constraints & Preferences
[User preferences, coding style, constraints, important decisions]
## Completed Actions
[Numbered list of concrete actions taken — include tool used, target, and outcome.
Format each as: N. ACTION target — outcome [tool: name]
Example:
1. READ config.py:45 — found == should be != [tool: read_file]
2. PATCH config.py:45 — changed == to != [tool: patch]
3. TEST pytest tests/ — 3/50 failed: test_parse, test_validate, test_edge [tool: terminal]
Be specific with file paths, commands, line numbers, and results.]
## Active State
[Current working state — include:
- Working directory and branch (if applicable)
- Modified/created files with brief note on each
- Test status (X/Y passing)
- Any running processes or servers
- Environment details that matter]
## In Progress
[Work currently underway — what was being done when compaction fired]
## Blocked
[Any blockers, errors, or issues not yet resolved. Include exact error messages.]
## Key Decisions
[Important technical decisions and WHY they were made]
## Resolved Questions
[Questions the user asked that were ALREADY answered — include the answer so the next assistant does not re-answer them]
## Pending User Asks
[Questions or requests from the user that have NOT yet been answered or fulfilled. If none, write "None."]
## Relevant Files
[Files read, modified, or created — with brief note on each]
## Remaining Work
[What remains to be done — framed as context, not instructions]
## Critical Context
[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation]`
/**
 * Build the prompt for a first-time (non-incremental) summarization.
 *
 * @param contentToSummarize - serialized turns inserted under "TURNS TO SUMMARIZE"
 * @param summaryBudget - approximate token target quoted in the prompt
 * @returns the complete prompt text, including the TEMPLATE_SECTIONS skeleton
 */
function buildFullPrompt(contentToSummarize: string, summaryBudget: number): string {
return `You are a summarization agent creating a context checkpoint.
Your output will be injected as reference material for a DIFFERENT
assistant that continues the conversation.
Do NOT respond to any questions or requests in the conversation —
only output the structured summary.
Do NOT include any preamble, greeting, or prefix.
Create a structured handoff summary for a different assistant that will continue
this conversation after earlier turns are compacted. The next assistant should be
able to understand what happened without re-reading the original turns.
TURNS TO SUMMARIZE:
${contentToSummarize}
${TEMPLATE_SECTIONS}
Target ~${summaryBudget} tokens. Be CONCRETE — include file paths, command outputs, error messages, line numbers, and specific values. Avoid vague descriptions like "made some changes" — say exactly what changed.
Write only the summary body. Do not include any preamble or prefix.`
}
/**
 * Build the prompt that updates an existing summary with newer turns.
 *
 * @param previousSummary - summary produced by the previous compaction
 * @param contentToSummarize - serialized turns inserted under "NEW TURNS TO INCORPORATE"
 * @param summaryBudget - approximate token target quoted in the prompt
 * @returns the complete prompt text, including the TEMPLATE_SECTIONS skeleton
 */
function buildIncrementalPrompt(previousSummary: string, contentToSummarize: string, summaryBudget: number): string {
return `You are a summarization agent creating a context checkpoint.
Your output will be injected as reference material for a DIFFERENT
assistant that continues the conversation.
Do NOT respond to any questions or requests in the conversation —
only output the structured summary.
Do NOT include any preamble, greeting, or prefix.
You are updating a context compaction summary. A previous compaction produced the
summary below. New conversation turns have occurred since then and need to be
incorporated.
PREVIOUS SUMMARY:
${previousSummary}
NEW TURNS TO INCORPORATE:
${contentToSummarize}
Update the summary using this exact structure. PRESERVE all existing information
that is still relevant. ADD new completed actions to the numbered list
(continue numbering). Move items from "In Progress" to "Completed Actions" when
done. Move answered questions to "Resolved Questions". Update "Active State"
to reflect current state. Remove information only if it is clearly obsolete.
CRITICAL: Update "## Active Task" to reflect the user's most recent unfulfilled
request — this is the most important field for task continuity.
${TEMPLATE_SECTIONS}
Target ~${summaryBudget} tokens. Be CONCRETE — include file paths, command outputs, error messages, line numbers, and specific values. Avoid vague descriptions like "made some changes" — say exactly what changed.
Write only the summary body. Do not include any preamble or prefix.`
}
// ─── Pre-cleaning ───────────────────────────────────────
/**
 * Render messages as plain text for the summarization prompt.
 *
 * Each message becomes a `label: text` entry; entries are separated by a blank
 * line. Tool messages are labelled `[tool:<name>]` and tool output longer than
 * 5500 characters is cut to its first 4000 and last 1500 characters. An
 * assistant message with tool calls yields one entry listing the calls
 * (arguments cut at 1500 characters) and, when it also has non-blank content,
 * a second entry with that content.
 *
 * @param messages - messages to serialize, in order
 * @returns the serialized conversation text
 */
function serializeForSummary(messages: ChatMessage[]): string {
  const renderToolCall = (call: { function: { name: string; arguments: string } }): string => {
    const rawArgs = call.function.arguments
    const shownArgs = rawArgs.length > 1500 ? rawArgs.slice(0, 1500) + '...' : rawArgs
    return `[tool_call: ${call.function.name}(${shownArgs})]`
  }

  const entries = messages.flatMap((entry): string[] => {
    const isTool = entry.role === 'tool'
    const label = isTool ? `[tool:${entry.name || 'unknown'}]` : entry.role
    const raw = entry.content || ''
    const body = isTool && raw.length > 5500
      ? raw.slice(0, 4000) + '\n... [truncated]\n...' + raw.slice(-1500)
      : raw

    const calls = entry.role === 'assistant' ? entry.tool_calls : undefined
    if (!calls?.length) return [`${label}: ${body}`]

    const rendered = [`${label}: ${calls.map(call => renderToolCall(call)).join('\n')}`]
    if (body.trim()) rendered.push(`${label}: ${body}`)
    return rendered
  })

  return entries.join('\n\n')
}
/**
 * Shorten tool results in all but the most recent messages.
 *
 * The last `keepRecentCount` messages are returned untouched. In the older
 * messages, every `tool` message has its content replaced by
 * `[<name>] <first 100 chars, newlines as spaces>` plus `...` when text was
 * cut. Messages of other roles are passed through unchanged.
 *
 * @param messages - full message list, oldest first (not mutated)
 * @param keepRecentCount - number of trailing messages to leave as-is;
 *        0 (or a negative value) means every tool result is pruned
 * @returns the same array when there is nothing older than the kept tail,
 *          otherwise a new array
 */
function pruneOldToolResults(messages: ChatMessage[], keepRecentCount: number): ChatMessage[] {
  const keep = Math.max(0, Math.trunc(keepRecentCount) || 0)
  if (messages.length <= keep) return messages

  // Split by absolute index. slice(-keep) must not be used here: for keep = 0
  // it becomes slice(-0), which returns the whole array instead of nothing.
  const splitAt = messages.length - keep
  const pruned = messages.slice(0, splitAt).map(msg => {
    if (msg.role !== 'tool') return msg
    const content = msg.content || ''
    const preview = content.slice(0, 100).replace(/\n/g, ' ')
    const truncated = content.length > 100 ? '...' : ''
    return { ...msg, content: `[${msg.name || 'tool'}] ${preview}${truncated}` }
  })
  return [...pruned, ...messages.slice(splitAt)]
}
// ─── LLM Summarization ──────────────────────────────────
/**
 * Run one summarization through the upstream gateway and return its output.
 *
 * Starts a run with POST `${upstream}/v1/runs` under a temporary `compress_…`
 * session id, then listens on the run's SSE event stream until it completes,
 * fails, errors or `timeoutMs` elapses. Whatever the outcome of the stream
 * phase, the temporary session is deleted best-effort, including on timeout.
 *
 * @param upstream - base URL of the gateway
 * @param apiKey - bearer token; also sent as `token` query parameter on the
 *        event stream URL
 * @param prompt - summarization prompt sent as the run input
 * @param history - prior turns sent as conversation history
 * @param timeoutMs - limit applied to the POST and, separately, to the stream
 * @param previousSummary - when set, prepended to the history as a
 *        user/assistant exchange
 * @returns the trimmed, non-empty summary text
 * @throws Error when the POST fails or returns no run id, the run fails, the
 *         stream errors, the output is empty, or the timeout elapses
 */
async function callSummarizer(
  upstream: string,
  apiKey: string | undefined,
  prompt: string,
  history: Array<{ role: string; content: string }>,
  timeoutMs: number,
  previousSummary?: string,
): Promise<string> {
  const sessionId = `compress_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 8)}`

  const convHistory: Array<{ role: string; content: string }> = previousSummary
    ? [
        { role: 'user', content: `[Previous summary]\n${previousSummary}` },
        { role: 'assistant', content: 'Understood, I will update the summary.' },
        ...history,
      ]
    : [...history]

  const headers: Record<string, string> = { 'Content-Type': 'application/json' }
  if (apiKey) headers['Authorization'] = `Bearer ${apiKey}`

  const res = await fetch(`${upstream}/v1/runs`, {
    method: 'POST',
    headers,
    body: JSON.stringify({
      input: prompt,
      conversation_history: convHistory,
      session_id: sessionId,
    }),
    signal: AbortSignal.timeout(timeoutMs),
  })
  if (!res.ok) {
    throw new Error(`Summarization run failed: ${res.status}`)
  }

  const started = await res.json() as { run_id?: unknown }
  const runId = started?.run_id
  if (typeof runId !== 'string' || runId === '') {
    // Without a run id the events URL would point at ".../runs/undefined/events".
    throw new Error('Summarization run failed: no run_id in response')
  }

  return new Promise<string>((resolve, reject) => {
    const eventsUrl = new URL(`${upstream}/v1/runs/${runId}/events`)
    if (apiKey) eventsUrl.searchParams.set('token', apiKey)
    const source = new EventSource(eventsUrl.toString())

    let settled = false
    // Single exit path: stop the timer, close the stream, drop the temporary
    // session, then resolve/reject exactly once.
    function finish(outcome: () => void): void {
      if (settled) return
      settled = true
      clearTimeout(timer)
      source.close()
      deleteCompressSession(upstream, apiKey, sessionId).catch(() => {})
      outcome()
    }

    const timer = setTimeout(() => {
      finish(() => reject(new Error('Summarization timed out')))
    }, timeoutMs)

    source.onmessage = (event: MessageEvent) => {
      let parsed: any
      try {
        parsed = JSON.parse(event.data)
      } catch {
        return // ignore frames that are not JSON
      }
      if (parsed?.event === 'run.completed') {
        const output = parsed.output
        finish(() => {
          if (!output || typeof output !== 'string' || output.trim() === '') {
            reject(new Error('Empty summarization response'))
          } else {
            resolve(output.trim())
          }
        })
      } else if (parsed?.event === 'run.failed') {
        finish(() => reject(new Error(parsed.error || 'Summarization run failed')))
      }
    }

    // Any stream error is treated as fatal rather than waiting for a reconnect.
    source.onerror = () => {
      finish(() => reject(new Error('Summarization SSE connection error')))
    }
  })
}
/**
 * Best-effort removal of a temporary compression session from the gateway.
 *
 * Sends DELETE `${upstream}/api/sessions/${sessionId}` with a 5 second
 * timeout. Failures are deliberately ignored; the returned promise never
 * rejects.
 *
 * @param upstream - base URL of the gateway
 * @param apiKey - optional bearer token
 * @param sessionId - id of the temporary session to delete
 */
async function deleteCompressSession(upstream: string, apiKey: string | undefined, sessionId: string): Promise<void> {
  const headers: Record<string, string> = apiKey ? { Authorization: `Bearer ${apiKey}` } : {}
  try {
    await fetch(`${upstream}/api/sessions/${sessionId}`, {
      method: 'DELETE',
      headers,
      signal: AbortSignal.timeout(5000),
    })
  } catch {
    /* best-effort */
  }
}
// ─── Main Compressor ────────────────────────────────────
/**
 * Assembles, and when necessary compresses, the conversation history that is
 * sent upstream for a 1:1 chat session.
 *
 * A per-session snapshot (summary text plus the index of the last summarized
 * message) is kept in SQLite so that later calls only summarize messages that
 * arrived after the previous compression.
 */
export class ChatContextCompressor {
  private config: CompressionConfig

  /**
   * @param opts - optional overrides; `opts.config` is merged over
   *        DEFAULT_COMPRESSION_CONFIG
   */
  constructor(opts?: {
    config?: Partial<CompressionConfig>
  }) {
    this.config = { ...DEFAULT_COMPRESSION_CONFIG, ...opts?.config }
  }

  /**
   * Assemble and compress conversation history.
   *
   * Flow:
   * 1. Check snapshot → if exists, assemble = summary + new messages after snapshot index
   * 2. If no snapshot → assemble = all messages
   * 3. Count tokens of assembled context
   * 4. Under threshold → return assembled as-is (no LLM call)
   * 5. Over threshold → LLM compress, keep last N messages, save new snapshot
   *
   * @param messages - full session history, oldest first
   * @param upstream - base URL of the gateway used for summarization
   * @param apiKey - optional bearer token for the gateway
   * @param sessionId - session id used to load/save the snapshot; without it
   *        no snapshot is read or written
   * @param contextLength - model context window; the trigger threshold is half
   *        of it. When omitted, `config.triggerTokens` is used (100_000 with
   *        the default config, the same value as before)
   * @returns the messages to send plus metadata describing what was done
   */
  async compress(
    messages: ChatMessage[],
    upstream: string,
    apiKey: string | undefined,
    sessionId?: string,
    contextLength?: number,
  ): Promise<CompressedResult> {
    const triggerTokens = contextLength
      ? Math.floor(contextLength / 2)
      : this.config.triggerTokens
    const total = messages.length

    const makeMeta = (opts: Partial<CompressedResult['meta']> = {}): CompressedResult['meta'] => ({
      totalMessages: total,
      compressed: false,
      llmCompressed: false,
      summaryTokenEstimate: 0,
      verbatimCount: total,
      compressedStartIndex: -1,
      ...opts,
    })

    // ── Step 1: Check snapshot first ─────────────────────
    // NOTE(review): a snapshot whose index lies beyond the current history
    // (e.g. after messages were deleted) is not detected here — confirm
    // callers invalidate snapshots when a session's history is rewritten.
    const snapshot = sessionId ? getCompressionSnapshot(sessionId) : null

    if (snapshot) {
      const { summary: previousSummary, lastMessageIndex } = snapshot
      const newMessages = messages.slice(lastMessageIndex + 1)

      const summaryTokens = countTokens(SUMMARY_PREFIX + previousSummary)
      const newTokens = estimateMessagesTokens(newMessages)
      const assembledTokens = summaryTokens + newTokens

      logger.info(
        '[context-compressor] session=%s: snapshot at %d, %d new messages, assembled ~%d tokens (threshold %d)',
        sessionId, lastMessageIndex, newMessages.length, assembledTokens, triggerTokens,
      )

      // Under threshold → return summary + new messages, no LLM call
      if (assembledTokens <= triggerTokens) {
        const result: ChatMessage[] = [
          { role: 'system', content: SUMMARY_PREFIX + '\n\n' + previousSummary },
          ...newMessages,
        ]
        return {
          messages: result,
          meta: makeMeta({
            compressed: true,
            llmCompressed: false,
            summaryTokenEstimate: summaryTokens,
            verbatimCount: newMessages.length,
            compressedStartIndex: lastMessageIndex,
          }),
        }
      }

      // Over threshold → incremental LLM compress
      return this.incrementalCompress(
        messages, snapshot, upstream, apiKey, sessionId, makeMeta(),
      )
    }

    // ── Step 2: No snapshot — check all messages ──────────
    const totalTokens = estimateMessagesTokens(messages)

    logger.info(
      '[context-compressor] session=%s: no snapshot, %d messages, ~%d tokens (threshold %d)',
      sessionId, total, totalTokens, triggerTokens,
    )

    if (totalTokens <= triggerTokens) {
      return { messages, meta: makeMeta() }
    }

    // Over threshold → full LLM compress
    return this.fullCompress(messages, upstream, apiKey, sessionId, makeMeta())
  }

  /**
   * Fold the messages between the snapshot index and the verbatim tail into
   * the existing summary.
   *
   * The snapshot is advanced only when the LLM call succeeded. On failure the
   * previous summary plus the tail is returned for this request, but the
   * stored snapshot is left untouched so the skipped messages are summarized
   * on a later attempt instead of being lost.
   */
  private async incrementalCompress(
    messages: ChatMessage[],
    snapshot: { summary: string; lastMessageIndex: number },
    upstream: string,
    apiKey: string | undefined,
    sessionId: string | undefined,
    meta: CompressedResult['meta'],
  ): Promise<CompressedResult> {
    const { summary: previousSummary, lastMessageIndex } = snapshot
    const total = messages.length
    const tailCount = this.config.tailMessageCount

    const cleaned = pruneOldToolResults(messages, tailCount)
    const newMessages = cleaned.slice(lastMessageIndex + 1)

    // Keep last N of new messages, compress the rest
    const tailStart = Math.max(0, newMessages.length - tailCount)
    const toCompress = newMessages.slice(0, tailStart)
    const tail = newMessages.slice(tailStart)

    logger.info(
      '[context-compressor] [incremental-llm] compressing %d of %d new messages, keeping %d tail',
      toCompress.length, newMessages.length, tail.length,
    )

    let summary = previousSummary
    let llmCompressed = false

    // Nothing lies before the tail (a few very large messages can exceed the
    // threshold on their own): there is nothing to summarize, so skip the call.
    if (toCompress.length > 0) {
      try {
        const contentToSummarize = serializeForSummary(toCompress)
        const prompt = buildIncrementalPrompt(previousSummary, contentToSummarize, this.config.summaryBudget)
        const history = toCompress
          .filter(m => m.role === 'user' || m.role === 'assistant')
          .map(m => ({ role: m.role, content: m.content }))

        const t0 = Date.now()
        summary = await callSummarizer(upstream, apiKey, prompt, history, this.config.summarizationTimeoutMs, previousSummary)
        llmCompressed = true
        logger.info('[context-compressor] incremental-llm done in %dms, %d chars', Date.now() - t0, summary.length)
      } catch (err: unknown) {
        const reason = err instanceof Error ? err.message : String(err)
        logger.warn('[context-compressor] incremental-llm failed: %s — reusing previous summary', reason)
      }
    }

    const result: ChatMessage[] = [
      { role: 'system', content: SUMMARY_PREFIX + '\n\n' + summary },
      ...tail,
    ]

    const newLastIndex = lastMessageIndex + tailStart
    // Persist only a summary that really covers messages up to newLastIndex.
    if (llmCompressed && sessionId) {
      saveCompressionSnapshot(sessionId, summary, newLastIndex, total)
    }

    return {
      messages: result,
      meta: {
        ...meta,
        compressed: true,
        llmCompressed,
        summaryTokenEstimate: countTokens(SUMMARY_PREFIX + summary),
        verbatimCount: tail.length,
        compressedStartIndex: newLastIndex,
      },
    }
  }

  /**
   * First compression of a session: summarize everything before the verbatim
   * tail and store the snapshot.
   *
   * If the LLM call fails, only the tail is returned and no snapshot is
   * written, so the next call starts over from the full history.
   */
  private async fullCompress(
    messages: ChatMessage[],
    upstream: string,
    apiKey: string | undefined,
    sessionId: string | undefined,
    meta: CompressedResult['meta'],
  ): Promise<CompressedResult> {
    const total = messages.length
    const tailCount = this.config.tailMessageCount
    const cleaned = pruneOldToolResults(messages, tailCount)

    // Everything fits in the protected tail: nothing to summarize.
    if (total <= tailCount) {
      return { messages: cleaned, meta }
    }

    const tailStart = total - tailCount
    const toCompress = cleaned.slice(0, tailStart)
    const tail = cleaned.slice(tailStart)

    logger.info(
      '[context-compressor] [full-llm] compressing messages 0-%d, keeping %d-%d',
      tailStart - 1, tailStart, total - 1,
    )

    const contentToSummarize = serializeForSummary(toCompress)
    const prompt = buildFullPrompt(contentToSummarize, this.config.summaryBudget)
    const history = toCompress
      .filter(m => m.role === 'user' || m.role === 'assistant')
      .map(m => ({ role: m.role, content: m.content }))

    let summary: string | null = null
    try {
      const t0 = Date.now()
      summary = await callSummarizer(upstream, apiKey, prompt, history, this.config.summarizationTimeoutMs)
      logger.info('[context-compressor] full-llm done in %dms, %d chars', Date.now() - t0, summary.length)
    } catch (err: unknown) {
      const reason = err instanceof Error ? err.message : String(err)
      logger.warn('[context-compressor] full-llm failed: %s', reason)
    }

    const result: ChatMessage[] = []
    if (summary) {
      result.push({ role: 'system', content: SUMMARY_PREFIX + '\n\n' + summary })
      if (sessionId) {
        saveCompressionSnapshot(sessionId, summary, tailStart - 1, total)
      }
    }
    result.push(...tail)

    return {
      messages: result,
      meta: {
        ...meta,
        compressed: true,
        llmCompressed: !!summary,
        summaryTokenEstimate: summary ? countTokens(SUMMARY_PREFIX + summary) : 0,
        verbatimCount: tail.length,
        compressedStartIndex: tailStart - 1,
      },
    }
  }

  /** Remove snapshot for a session (e.g. when session is deleted) */
  static invalidateSnapshot(sessionId: string): void {
    deleteCompressionSnapshot(sessionId)
  }
}
@@ -0,0 +1,11 @@
import type { ChatRunSocket } from '../../services/hermes/chat-run-socket'
let chatRunServer: ChatRunSocket | null = null
/**
 * Store the given ChatRunSocket instance in the module-level `chatRunServer`
 * variable, replacing any previously stored instance.
 *
 * @param server Instance to make available through `getChatRunServer()`.
 */
export function setChatRunServer(server: ChatRunSocket): void {
chatRunServer = server
}
/**
 * Return the ChatRunSocket instance last stored with `setChatRunServer()`,
 * or `null` if none has been stored yet.
 */
export function getChatRunServer(): ChatRunSocket | null {
return chatRunServer
}
@@ -15,7 +15,7 @@ export function setRunSession(runId: string, sessionId: string): void {
setTimeout(() => runSessionMap.delete(runId), 30 * 60 * 1000)
}
function getSessionForRun(runId: string): string | undefined {
/**
 * Look up the session id recorded for a run in `runSessionMap`.
 *
 * @param runId Run identifier used as the map key.
 * @returns The mapped session id, or `undefined` when the map has no entry
 *          for `runId`.
 */
export function getSessionForRun(runId: string): string | undefined {
return runSessionMap.get(runId)
}
@@ -0,0 +1,467 @@
/**
* Chat run via Socket.IO — namespace /chat-run.
*
* Replaces HTTP POST + SSE. Socket.IO decouples message handling
* from connection lifecycle: the server continues streaming upstream
* events even after the client disconnects or refreshes.
*
* Uses Socket.IO rooms keyed by session_id. On client reconnect,
* the client emits 'resume' to rejoin its session room.
*/
import type { Server, Socket } from 'socket.io'
import { EventSource } from 'eventsource'
import { setRunSession, getSessionForRun } from '../../routes/hermes/proxy-handler'
import { updateUsage } from '../../db/hermes/usage-store'
import { getSessionDetailFromDb } from '../../db/hermes/sessions-db'
import { getModelContextLength } from './model-context'
import { ChatContextCompressor, countTokens, SUMMARY_PREFIX } from '../../lib/context-compressor'
import { getCompressionSnapshot } from '../../db/hermes/compression-snapshot'
import { logger } from '../logger'
// Single module-level compressor instance, constructed with default arguments
// and used by the run handler to compress conversation history.
const compressor = new ChatContextCompressor()
// --- In-flight run tracking ---
/** Bookkeeping entry stored in `activeRuns` while a session has a run in progress. */
interface InFlightRun {
// run_id taken from the upstream POST /v1/runs response
runId: string
// Controller created when the run starts; `.abort()` is called by the 'abort' event handler
abortController: AbortController
}
// --- ChatRunSocket ---
export class ChatRunSocket {
private nsp: ReturnType<Server['of']>
private gatewayManager: any
/** sessionId → InFlightRun */
private activeRuns = new Map<string, InFlightRun>()
/** sessionId → accumulated state events for reconnecting clients */
private sessionStates = new Map<string, Array<{ event: string; data: any }>>()
/**
 * Bind this handler to the `/chat-run` namespace of the given Socket.IO server.
 * Only stores references; no middleware or listeners are attached here.
 *
 * @param io             Socket.IO server whose `/chat-run` namespace is used.
 * @param gatewayManager Object queried via `getApiKey(profile)` when a run starts (typed `any`).
 */
constructor(io: Server, gatewayManager: any) {
this.nsp = io.of('/chat-run')
this.gatewayManager = gatewayManager
}
/**
 * Attach the auth middleware and the connection listener to the namespace,
 * then log that the endpoint is ready.
 */
init() {
  const namespace = this.nsp
  // Arrow wrappers keep `this` pointing at the ChatRunSocket instance.
  namespace.use((socket, next) => this.authMiddleware(socket, next))
  namespace.on('connection', socket => this.onConnection(socket))
  logger.info('[chat-run-socket] Socket.IO ready at /chat-run')
}
// --- Auth middleware ---
/**
 * Namespace middleware that checks the handshake token against the server token.
 *
 * The check is skipped when the `AUTH_DISABLED` environment variable is set to
 * any non-empty value, and a connection is also accepted when `getToken()`
 * yields a falsy server token. If loading or reading the server token throws,
 * the connection is rejected instead of leaving the handshake pending.
 *
 * @param socket Connecting socket; the token is read from `handshake.auth.token`.
 * @param next   Socket.IO continuation; called with an Error to reject.
 */
private async authMiddleware(socket: Socket, next: (err?: Error) => void) {
  const token = socket.handshake.auth?.token as string | undefined
  let authorized = true

  try {
    // Same truth table as the previous `!X && X !== '1'`: the second operand
    // could never be false once the first was true.
    if (!process.env.AUTH_DISABLED) {
      const { getToken } = await import('../auth')
      const serverToken = await getToken()
      authorized = !serverToken || token === serverToken
    }
  } catch (err) {
    // Fail closed: without this, a rejected import/getToken would never call
    // next() and would surface as an unhandled promise rejection.
    logger.warn(err, '[chat-run-socket] auth check failed for socket %s', socket.id)
    authorized = false
  }

  if (!authorized) {
    return next(new Error('Authentication failed'))
  }
  next()
}
// --- Connection handler ---
/**
 * Register the per-socket event handlers: `run`, `resume` and `abort`.
 *
 * Event payloads come from the client and may be missing entirely, so every
 * handler tolerates an undefined payload instead of throwing inside the
 * listener.
 *
 * @param socket Newly connected socket; `handshake.query.profile` selects the
 *               profile (defaults to `'default'`).
 */
private onConnection(socket: Socket) {
  const profile = (socket.handshake.query?.profile as string) || 'default'

  socket.on('run', (data: {
    input: string
    session_id?: string
    model?: string
    instructions?: string
  }) => {
    // handleRun is async; catch here so a rejection never becomes an
    // unhandled promise rejection.
    this.handleRun(socket, data, profile).catch((err: unknown) => {
      logger.warn(err, '[chat-run-socket] run handler failed for socket %s', socket.id)
    })
  })

  socket.on('resume', (data?: { session_id?: string }) => {
    const sid = data?.session_id
    if (!sid) return

    socket.join(`session:${sid}`)

    // Replay all accumulated state events for this session
    const states = this.sessionStates.get(sid)
    if (states) {
      for (const state of states) {
        socket.emit(state.event, { ...state.data, session_id: sid })
      }
      logger.info('[chat-run-socket] replayed %d state events for reconnecting client on session %s', states.length, sid)
    }
    logger.info('[chat-run-socket] socket %s resumed session %s (active: %s)', socket.id, sid, this.activeRuns.has(sid))
  })

  socket.on('abort', (data?: { session_id?: string }) => {
    const sid = data?.session_id
    if (sid) {
      this.handleAbort(sid)
    }
  })
}
// --- Run handler ---
/**
 * Handle one `run` event: start a run upstream and relay its events.
 *
 * Steps:
 *  1. Validate the payload and join the `session:<id>` room (if a session id is given).
 *  2. Rebuild `conversation_history` from the DB, compressing it when needed.
 *  3. POST `{upstream}/v1/runs`, then stream `{upstream}/v1/runs/<id>/events`
 *     through an EventSource and forward every event.
 *
 * With a session id, events are emitted to the room (so they are delivered to
 * whichever sockets are in it); without one they go to the originating socket
 * while it is connected. Every failure path emits `run.failed` and, for
 * sessions, records it via `markCompleted` so it can be replayed on `resume`.
 *
 * @param socket  Socket that sent the `run` event.
 * @param data    Client payload (untrusted; may be missing at runtime).
 * @param profile Profile name used for the API key and context-length lookup.
 */
private async handleRun(
  socket: Socket,
  data: { input: string; session_id?: string; model?: string; instructions?: string },
  profile: string,
) {
  // The payload comes straight from the client; destructuring undefined would throw.
  if (!data || typeof data !== 'object') {
    if (socket.connected) {
      socket.emit('run.failed', { event: 'run.failed', error: 'Invalid run payload' })
    }
    return
  }

  const { input, session_id, model, instructions } = data
  const upstream = (process.env.UPSTREAM || 'http://127.0.0.1:8642').replace(/\/$/, '')

  // Join session room — events go to room, survives socket disconnect
  if (session_id) {
    socket.join(`session:${session_id}`)
    // Drop replay state left over from a previous, already finished run so a
    // reconnecting client is not told that this new run has completed.
    if (!this.activeRuns.has(session_id)) {
      this.sessionStates.delete(session_id)
    }
  }

  // Emit helper: tag every payload with session_id
  const emit = (event: string, payload: any) => {
    if (session_id) {
      this.nsp.to(`session:${session_id}`).emit(event, { ...payload, session_id })
    } else if (socket.connected) {
      socket.emit(event, payload)
    }
  }

  // Single failure path: notify clients and record the terminal state.
  const fail = (error: string) => {
    emit('run.failed', { event: 'run.failed', error })
    if (session_id) this.markCompleted(session_id, { event: 'run.failed' })
  }

  try {
    const apiKey: string | undefined = this.gatewayManager.getApiKey(profile) || undefined

    // Build upstream request body
    const body: Record<string, any> = { input }
    if (session_id) body.session_id = session_id
    if (model) body.model = model
    if (instructions) body.instructions = instructions

    // Build conversation_history from DB if session_id is provided.
    // Best effort: on failure the run proceeds without history.
    if (session_id) {
      try {
        const history = await this.buildConversationHistory(session_id, profile, upstream, apiKey, emit)
        if (history) body.conversation_history = history
      } catch (err) {
        logger.warn(err, '[chat-run-socket] failed to load conversation history for session %s', session_id)
      }
    }

    const headers: Record<string, string> = { 'Content-Type': 'application/json' }
    if (apiKey) headers['Authorization'] = `Bearer ${apiKey}`

    const res = await fetch(`${upstream}/v1/runs`, {
      method: 'POST',
      headers,
      body: JSON.stringify(body),
      signal: AbortSignal.timeout(120_000),
    })

    if (!res.ok) {
      const text = await res.text().catch(() => '')
      fail(`Upstream ${res.status}: ${text}`)
      return
    }

    const runData = await res.json() as any
    const runId = runData?.run_id
    if (!runId) {
      fail('No run_id in upstream response')
      return
    }

    const abortController = new AbortController()
    const { signal } = abortController
    if (session_id) {
      setRunSession(runId, session_id)
      this.activeRuns.set(session_id, { runId, abortController })
    }

    emit('run.started', { event: 'run.started', run_id: runId, status: runData.status })

    // Stream upstream events via EventSource — survives socket disconnect
    const eventsUrl = new URL(`${upstream}/v1/runs/${runId}/events`)
    if (apiKey) eventsUrl.searchParams.set('token', apiKey)
    const source = new EventSource(eventsUrl.toString())

    source.onmessage = (event: MessageEvent) => {
      let parsed: any
      try {
        parsed = JSON.parse(event.data as string)
      } catch {
        return // not JSON, skip
      }
      if (!parsed || typeof parsed !== 'object') return

      try {
        // Intercept run.completed for usage tracking. This still happens for
        // aborted runs, since the upstream run itself keeps going.
        if (parsed.event === 'run.completed' && parsed.usage && parsed.run_id) {
          const sid = getSessionForRun(parsed.run_id)
          if (sid) {
            updateUsage(sid, parsed.usage.input_tokens, parsed.usage.output_tokens)
          }
        }

        // After an abort the session may already host a newer run: stop
        // forwarding and do not record a second terminal state.
        if (!signal.aborted) {
          emit(parsed.event || 'message', parsed)
        }

        if (parsed.event === 'run.completed' || parsed.event === 'run.failed') {
          source.close()
          if (session_id && !signal.aborted) {
            this.markCompleted(session_id, { event: parsed.event, run_id: parsed.run_id })
          }
        }
      } catch (err) {
        logger.warn(err, '[chat-run-socket] failed to relay event for run %s', runId)
      }
    }

    source.onerror = () => {
      // Close explicitly so the EventSource does not keep retrying.
      source.close()
      if (signal.aborted) return
      fail('EventSource connection lost')
    }
  } catch (err: unknown) {
    fail(err instanceof Error ? err.message : String(err))
  }
}

/**
 * Load a session's messages from the DB and turn them into the
 * `conversation_history` sent upstream, compressing when the estimated token
 * count exceeds half of the model context length.
 *
 * @returns The history to send, or `undefined` when the session has no messages.
 */
private async buildConversationHistory(
  sessionId: string,
  profile: string,
  upstream: string,
  apiKey: string | undefined,
  emit: (event: string, payload: any) => void,
): Promise<unknown[] | undefined> {
  type HistoryMessage = {
    role: string
    content: string
    tool_calls?: any[]
    tool_call_id?: string
    name?: string
  }

  const detail = await getSessionDetailFromDb(sessionId)
  if (!detail?.messages?.length) return undefined

  const history: HistoryMessage[] = detail.messages
    .filter(m => (m.role === 'user' || m.role === 'assistant' || m.role === 'tool') && m.content !== undefined)
    .map(m => {
      const msg: any = { role: m.role, content: m.content || '' }
      if (m.tool_calls?.length) msg.tool_calls = m.tool_calls
      if (m.tool_call_id) msg.tool_call_id = m.tool_call_id
      if (m.tool_name) msg.name = m.tool_name
      return msg
    })

  // Context compression with snapshot awareness
  const contextLength = getModelContextLength(profile)
  const triggerTokens = Math.floor(contextLength / 2)

  const sumTokens = (msgs: ReadonlyArray<{ content: Parameters<typeof countTokens>[0] }>): number =>
    msgs.reduce((sum, m) => sum + countTokens(m.content), 0)

  // Run the LLM compressor over the full history, publishing
  // compression.started / compression.completed both live (emit) and into the
  // replay buffer. Returns `fallback` when compression throws.
  const compressWithEvents = async (
    fallback: HistoryMessage[],
    messageCount: number,
    beforeTokens: number,
    failureLog: string,
  ): Promise<HistoryMessage[]> => {
    const started = {
      event: 'compression.started',
      message_count: messageCount,
      token_count: beforeTokens,
    }
    this.pushState(sessionId, 'compression.started', started)
    emit('compression.started', started)

    let completed: Record<string, unknown>
    let next = fallback
    try {
      const result = await compressor.compress(history, upstream, apiKey, sessionId, contextLength)
      const afterTokens = sumTokens(result.messages)
      completed = {
        event: 'compression.completed',
        compressed: result.meta.compressed,
        llmCompressed: result.meta.llmCompressed,
        totalMessages: result.meta.totalMessages,
        resultMessages: result.messages.length,
        beforeTokens,
        afterTokens,
        summaryTokens: result.meta.summaryTokenEstimate,
        verbatimCount: result.meta.verbatimCount,
        compressedStartIndex: result.meta.compressedStartIndex,
      }
      logger.info('[context-compress] AFTER session=%s: %d messages, ~%d tokens (was %d)', sessionId, result.messages.length, afterTokens, beforeTokens)
      next = result.messages.map(m => ({
        role: m.role,
        content: m.content,
        tool_calls: m.tool_calls,
        tool_call_id: m.tool_call_id,
        name: m.name,
      }))
    } catch (err: unknown) {
      completed = {
        event: 'compression.completed',
        compressed: false,
        totalMessages: messageCount,
        resultMessages: messageCount,
        beforeTokens,
        afterTokens: beforeTokens,
        summaryTokens: 0,
        verbatimCount: messageCount,
        compressedStartIndex: -1,
        error: err instanceof Error ? err.message : String(err),
      }
      logger.warn(err, failureLog, sessionId)
    }

    this.replaceState(sessionId, 'compression.completed', completed)
    emit('compression.completed', completed)
    return next
  }

  // Step 1: Check existing snapshot — if present, assemble summary + new messages
  const snapshot = getCompressionSnapshot(sessionId)
  if (snapshot) {
    const newMessages = history.slice(snapshot.lastMessageIndex + 1)
    const summaryTokens = countTokens(SUMMARY_PREFIX + snapshot.summary)
    const assembledTokens = summaryTokens + sumTokens(newMessages)
    logger.info('[context-compress] session=%s: snapshot at %d, %d new messages, assembled ~%d tokens (threshold %d)',
      sessionId, snapshot.lastMessageIndex, newMessages.length, assembledTokens, triggerTokens)

    const assembled: HistoryMessage[] = [
      { role: 'user', content: SUMMARY_PREFIX + '\n\n' + snapshot.summary },
      ...newMessages,
    ]

    if (assembledTokens <= triggerTokens) {
      // Under threshold — use assembled context directly, no LLM call needed
      return assembled
    }

    // Over threshold — needs incremental LLM compression. If that fails, fall
    // back to the assembled context (as the log message says), not to the
    // much larger raw history.
    return compressWithEvents(
      assembled,
      newMessages.length,
      assembledTokens,
      '[chat-run-socket] compression failed for session %s, using assembled context',
    )
  }

  // No snapshot — very short conversations are never compressed.
  if (history.length <= 4) return history

  const beforeTokens = sumTokens(history)
  if (beforeTokens <= triggerTokens) {
    // Under threshold — use raw history as-is
    logger.info('[context-compress] session=%s: %d messages, ~%d tokens — under threshold, skip', sessionId, history.length, beforeTokens)
    return history
  }

  // Over threshold — full LLM compression
  logger.info('[context-compress] BEFORE session=%s: %d messages, ~%d tokens (threshold %d)', sessionId, history.length, beforeTokens, triggerTokens)
  return compressWithEvents(
    history,
    history.length,
    beforeTokens,
    '[chat-run-socket] compression failed for session %s, using raw history',
  )
}
// --- Abort handler ---
/**
 * Abort the in-flight run of a session, if there is one: signal its
 * AbortController and record a `run.failed` terminal state for the session.
 * Does nothing when the session has no tracked run.
 */
private handleAbort(sessionId: string) {
  const active = this.activeRuns.get(sessionId)
  if (!active) return

  const { abortController, runId } = active
  abortController.abort()
  this.markCompleted(sessionId, { event: 'run.failed', run_id: runId })
}
/**
 * Mark a session run as completed/failed so reconnecting clients get notified.
 *
 * Removes the session from `activeRuns`, appends the terminal event to the
 * session's replay buffer, and schedules that buffer for removal after 30s.
 *
 * @param sessionId Session whose run ended.
 * @param info      Terminal event name and, when known, the run id.
 */
private markCompleted(sessionId: string, info: { event: string; run_id?: string }) {
  this.activeRuns.delete(sessionId)
  this.pushState(sessionId, info.event, { event: info.event, run_id: info.run_id })

  // Auto-cleanup after 30s — enough time for a page refresh.
  // Only remove the buffer this call finished: if it has been replaced by a
  // different array in the meantime, that one is left alone.
  const finished = this.sessionStates.get(sessionId)
  setTimeout(() => {
    if (this.sessionStates.get(sessionId) === finished) {
      this.sessionStates.delete(sessionId)
    }
  }, 30_000)
}
/**
 * Append one `{ event, data }` entry to the session's replay buffer,
 * creating the buffer on first use. Entries are replayed to clients that
 * emit `resume` for the session.
 */
private pushState(sessionId: string, event: string, data: any) {
  let buffer = this.sessionStates.get(sessionId)
  if (buffer === undefined) {
    buffer = []
    this.sessionStates.set(sessionId, buffer)
  }
  buffer.push({ event, data })
}
/**
 * Upsert a state event in the session's replay buffer: overwrite the first
 * buffered entry whose event name equals `event`, keeping its position;
 * if the session has no buffer or no such entry, append via `pushState`.
 */
private replaceState(sessionId: string, event: string, data: any) {
  const buffer = this.sessionStates.get(sessionId)
  const position = buffer ? buffer.findIndex(entry => entry.event === event) : -1

  if (buffer && position >= 0) {
    buffer[position] = { event, data }
  } else {
    this.pushState(sessionId, event, data)
  }
}
}