From e7cb99431d75f5c5c7e68850a3e495f329290c22 Mon Sep 17 00:00:00 2001 From: ZeroIce Date: Thu, 25 Jun 2026 00:32:52 +0800 Subject: [PATCH 1/2] Bound agentflow token counting fallback --- .../components/nodes/agentflow/Agent/Agent.ts | 8 ++- .../ConditionAgent/ConditionAgent.ts | 13 +++- .../components/nodes/agentflow/LLM/LLM.ts | 8 ++- .../components/nodes/agentflow/utils.test.ts | 67 ++++++++++++++++++- packages/components/nodes/agentflow/utils.ts | 50 ++++++++++++++ 5 files changed, 136 insertions(+), 10 deletions(-) diff --git a/packages/components/nodes/agentflow/Agent/Agent.ts b/packages/components/nodes/agentflow/Agent/Agent.ts index cd031ef4f40..36684ea468d 100644 --- a/packages/components/nodes/agentflow/Agent/Agent.ts +++ b/packages/components/nodes/agentflow/Agent/Agent.ts @@ -32,7 +32,8 @@ import { revertBase64ImagesToFileRefs, normalizeMessagesForStorage, replaceInlineDataWithFileReferences, - updateFlowState + updateFlowState, + createTokenCounter } from '../utils' import { convertMultiOptionsToStringArray, @@ -1806,10 +1807,11 @@ class Agent_Agentflow implements INode { abortController: AbortController ): Promise { const maxTokenLimit = (nodeData.inputs?.agentMemoryMaxTokenLimit as number) || 2000 + const countTokens = createTokenCounter(llmWithoutToolsBind) // Convert past messages to a format suitable for token counting const messagesString = pastMessages.map((msg: any) => `${msg.role}: ${msg.content}`).join('\n') - const tokenCount = await llmWithoutToolsBind.getNumTokens(messagesString) + const tokenCount = await countTokens(messagesString) if (tokenCount > maxTokenLimit) { // Calculate how many messages to summarize (messages that exceed the token limit) @@ -1824,7 +1826,7 @@ class Agent_Agentflow implements INode { messagesToSummarize.push(poppedMessage) // Recalculate token count for remaining messages const remainingMessagesString = remainingMessages.map((msg: any) => `${msg.role}: ${msg.content}`).join('\n') - currBufferLength = await llmWithoutToolsBind.getNumTokens(remainingMessagesString) + currBufferLength = await countTokens(remainingMessagesString) } } diff --git a/packages/components/nodes/agentflow/ConditionAgent/ConditionAgent.ts b/packages/components/nodes/agentflow/ConditionAgent/ConditionAgent.ts index 8c714ea1e61..db2ec323e6f 100644 --- a/packages/components/nodes/agentflow/ConditionAgent/ConditionAgent.ts +++ b/packages/components/nodes/agentflow/ConditionAgent/ConditionAgent.ts @@ -1,7 +1,13 @@ import { AnalyticHandler } from '../../../src/handler' import { ICommonObject, IMessage, INode, INodeData, INodeOptionsValue, INodeOutputsValue, INodeParams } from '../../../src/Interface' import { AIMessageChunk, BaseMessageLike } from '@langchain/core/messages' -import { getPastChatHistoryImageMessages, getUniqueImageMessages, processMessagesWithImages, revertBase64ImagesToFileRefs } from '../utils' +import { + createTokenCounter, + getPastChatHistoryImageMessages, + getUniqueImageMessages, + processMessagesWithImages, + revertBase64ImagesToFileRefs +} from '../utils' import { CONDITION_AGENT_SYSTEM_PROMPT, DEFAULT_SUMMARIZER_TEMPLATE } from '../prompt' import { BaseChatModel } from '@langchain/core/language_models/chat_models' import { findBestScenarioIndex } from './matchScenario' @@ -608,10 +614,11 @@ class ConditionAgent_Agentflow implements INode { abortController: AbortController ): Promise { const maxTokenLimit = (nodeData.inputs?.conditionAgentMemoryMaxTokenLimit as number) || 2000 + const countTokens = createTokenCounter(llmNodeInstance) // Convert past messages to a format suitable for token counting const messagesString = pastMessages.map((msg: any) => `${msg.role}: ${msg.content}`).join('\n') - const tokenCount = await llmNodeInstance.getNumTokens(messagesString) + const tokenCount = await countTokens(messagesString) if (tokenCount > maxTokenLimit) { // Calculate how many messages to summarize (messages that exceed the token limit) @@ -626,7 +633,7 @@ class ConditionAgent_Agentflow implements INode { messagesToSummarize.push(poppedMessage) // Recalculate token count for remaining messages const remainingMessagesString = remainingMessages.map((msg: any) => `${msg.role}: ${msg.content}`).join('\n') - currBufferLength = await llmNodeInstance.getNumTokens(remainingMessagesString) + currBufferLength = await countTokens(remainingMessagesString) } } diff --git a/packages/components/nodes/agentflow/LLM/LLM.ts b/packages/components/nodes/agentflow/LLM/LLM.ts index ccb5d88f594..1ad8f362d97 100644 --- a/packages/components/nodes/agentflow/LLM/LLM.ts +++ b/packages/components/nodes/agentflow/LLM/LLM.ts @@ -13,7 +13,8 @@ import { processMessagesWithImages, revertBase64ImagesToFileRefs, replaceInlineDataWithFileReferences, - updateFlowState + updateFlowState, + createTokenCounter } from '../utils' import { processTemplateVariables, configureStructuredOutput, extractResponseContent } from '../../../src/utils' import { getModelConfigByModelName, MODEL_TYPE } from '../../../src/modelLoader' @@ -797,10 +798,11 @@ class LLM_Agentflow implements INode { abortController: AbortController ): Promise { const maxTokenLimit = (nodeData.inputs?.llmMemoryMaxTokenLimit as number) || 2000 + const countTokens = createTokenCounter(llmNodeInstance) // Convert past messages to a format suitable for token counting const messagesString = pastMessages.map((msg: any) => `${msg.role}: ${msg.content}`).join('\n') - const tokenCount = await llmNodeInstance.getNumTokens(messagesString) + const tokenCount = await countTokens(messagesString) if (tokenCount > maxTokenLimit) { // Calculate how many messages to summarize (messages that exceed the token limit) @@ -815,7 +817,7 @@ class LLM_Agentflow implements INode { messagesToSummarize.push(poppedMessage) // Recalculate token count for remaining messages const remainingMessagesString = remainingMessages.map((msg: any) => `${msg.role}: ${msg.content}`).join('\n') - currBufferLength = await llmNodeInstance.getNumTokens(remainingMessagesString) + currBufferLength = await countTokens(remainingMessagesString) } } diff --git a/packages/components/nodes/agentflow/utils.test.ts b/packages/components/nodes/agentflow/utils.test.ts index 330631aa207..3d2e1263731 100644 --- a/packages/components/nodes/agentflow/utils.test.ts +++ b/packages/components/nodes/agentflow/utils.test.ts @@ -1,4 +1,11 @@ -import { revertBase64ImagesToFileRefs, processMessagesWithImages, addImageArtifactsToMessages, getUniqueImageMessages } from './utils' +import { + revertBase64ImagesToFileRefs, + processMessagesWithImages, + addImageArtifactsToMessages, + getUniqueImageMessages, + createTokenCounter, + getApproximateTokenCount +} from './utils' import { sanitizeFileName } from '../../src/validator' import { IChatMessage, IMultimodalContentItem } from './Interface.Agentflow' import { IFileUpload } from '../../src/Interface' @@ -657,3 +664,61 @@ describe('path traversal prevention in image processing', () => { expect(fileRef?.fileName).not.toContain('..') }) }) + +describe('createTokenCounter', () => { + const originalEnv = process.env + let warnSpy: jest.SpyInstance + + beforeEach(() => { + jest.resetModules() + process.env = { ...originalEnv } + delete process.env.DISABLE_TIKTOKEN + delete process.env.USE_APPROXIMATE_TOKENS + delete process.env.TIKTOKEN_TIMEOUT + warnSpy = jest.spyOn(console, 'warn').mockImplementation(() => undefined) + }) + + afterEach(() => { + warnSpy.mockRestore() + process.env = originalEnv + }) + + it('uses the model token counter when it resolves before timeout', async () => { + const llm = { getNumTokens: jest.fn().mockResolvedValue(42) } + const countTokens = createTokenCounter(llm) + + await expect(countTokens('hello world')).resolves.toBe(42) + expect(llm.getNumTokens).toHaveBeenCalledWith('hello world') + expect(warnSpy).not.toHaveBeenCalled() + }) + + it('can use approximate token counts without calling the model counter', async () => { + process.env.DISABLE_TIKTOKEN = 'true' + const llm = { getNumTokens: jest.fn().mockResolvedValue(42) } + const countTokens = createTokenCounter(llm) + + await expect(countTokens('12345678')).resolves.toBe(2) + expect(llm.getNumTokens).not.toHaveBeenCalled() + }) + + it('falls back to approximate counts after a tokenizer error', async () => { + const llm = { getNumTokens: jest.fn().mockRejectedValue(new Error('fetch failed')) } + const countTokens = createTokenCounter(llm) + + await expect(countTokens('12345678')).resolves.toBe(2) + await expect(countTokens('123456789012')).resolves.toBe(3) + expect(llm.getNumTokens).toHaveBeenCalledTimes(1) + expect(warnSpy).toHaveBeenCalledTimes(1) + }) + + it('falls back to approximate counts after token counting times out', async () => { + process.env.TIKTOKEN_TIMEOUT = '1' + const llm = { getNumTokens: jest.fn(() => new Promise(() => undefined)) } + const countTokens = createTokenCounter(llm) + + await expect(countTokens('123456789')).resolves.toBe(getApproximateTokenCount('123456789')) + await expect(countTokens('1234')).resolves.toBe(1) + expect(llm.getNumTokens).toHaveBeenCalledTimes(1) + expect(warnSpy).toHaveBeenCalledTimes(1) + }) +}) diff --git a/packages/components/nodes/agentflow/utils.ts b/packages/components/nodes/agentflow/utils.ts index 4b673b237ad..36d50265e07 100644 --- a/packages/components/nodes/agentflow/utils.ts +++ b/packages/components/nodes/agentflow/utils.ts @@ -56,8 +56,58 @@ const ARTIFACT_TYPES: Record = { pdf: 'text' } +const DEFAULT_TOKEN_COUNT_TIMEOUT_MS = 1500 + +type TokenCountingModel = { + getNumTokens(text: string): Promise +} + // ─── Shared helpers (used across multiple functions) ───────────────────────── +const isTruthyEnv = (value?: string): boolean => ['1', 'true', 'yes'].includes((value || '').toLowerCase()) + +const getTokenCountTimeoutMs = (): number => { + const timeout = Number(process.env.TIKTOKEN_TIMEOUT) + return Number.isFinite(timeout) && timeout > 0 ? timeout : DEFAULT_TOKEN_COUNT_TIMEOUT_MS +} + +const getNumTokensWithTimeout = async (llm: TokenCountingModel, text: string, timeoutMs: number): Promise => { + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error(`Token counting timed out after ${timeoutMs}ms`)), timeoutMs) + + llm.getNumTokens(text).then( + (count) => { + clearTimeout(timeout) + resolve(count) + }, + (error) => { + clearTimeout(timeout) + reject(error) + } + ) + }) +} + +export const getApproximateTokenCount = (text: string): number => Math.ceil((text || '').length / 4) + +export const createTokenCounter = (llm: TokenCountingModel): ((text: string) => Promise) => { + let useApproximateCount = isTruthyEnv(process.env.DISABLE_TIKTOKEN) || isTruthyEnv(process.env.USE_APPROXIMATE_TOKENS) + + return async (text: string): Promise => { + if (useApproximateCount) { + return getApproximateTokenCount(text) + } + + try { + return await getNumTokensWithTimeout(llm, text, getTokenCountTimeoutMs()) + } catch (error) { + useApproximateCount = true + console.warn('Failed to calculate number of tokens, falling back to approximate count', error) + return getApproximateTokenCount(text) + } + } +} + /** Reads a file from storage and returns a base64 data-URL string. */ const storedFileToBase64 = async (fileName: string, mime: string, options: ICommonObject): Promise => { const contents = await getFileFromStorage(fileName, options.orgId, options.chatflowid, options.chatId) From 58c4985df211515f94e9ab76e3c96cef82918826 Mon Sep 17 00:00:00 2001 From: ZeroIce Date: Fri, 26 Jun 2026 00:01:09 +0800 Subject: [PATCH 2/2] Address token counter review edge cases --- .../components/nodes/agentflow/utils.test.ts | 24 +++++++++++++++++++ packages/components/nodes/agentflow/utils.ts | 18 +++++++++++--- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/packages/components/nodes/agentflow/utils.test.ts b/packages/components/nodes/agentflow/utils.test.ts index 3d2e1263731..b6d0ea9c75e 100644 --- a/packages/components/nodes/agentflow/utils.test.ts +++ b/packages/components/nodes/agentflow/utils.test.ts @@ -711,6 +711,30 @@ describe('createTokenCounter', () => { expect(warnSpy).toHaveBeenCalledTimes(1) }) + it('clears the timeout when token counting throws synchronously', async () => { + const clearTimeoutSpy = jest.spyOn(global, 'clearTimeout') + const llm = { + getNumTokens: jest.fn(() => { + throw new Error('sync tokenizer failure') + }) + } + const countTokens = createTokenCounter(llm) + + await expect(countTokens('12345678')).resolves.toBe(2) + expect(llm.getNumTokens).toHaveBeenCalledTimes(1) + expect(clearTimeoutSpy).toHaveBeenCalledTimes(1) + expect(warnSpy).toHaveBeenCalledTimes(1) + + clearTimeoutSpy.mockRestore() + }) + + it('uses approximate counts when the model token counter is unavailable', async () => { + const countTokens = createTokenCounter(undefined) + + await expect(countTokens('12345678')).resolves.toBe(2) + expect(warnSpy).not.toHaveBeenCalled() + }) + it('falls back to approximate counts after token counting times out', async () => { process.env.TIKTOKEN_TIMEOUT = '1' const llm = { getNumTokens: jest.fn(() => new Promise(() => undefined)) } diff --git a/packages/components/nodes/agentflow/utils.ts b/packages/components/nodes/agentflow/utils.ts index 36d50265e07..a3ba428dded 100644 --- a/packages/components/nodes/agentflow/utils.ts +++ b/packages/components/nodes/agentflow/utils.ts @@ -75,7 +75,16 @@ const getNumTokensWithTimeout = async (llm: TokenCountingModel, text: string, ti return new Promise((resolve, reject) => { const timeout = setTimeout(() => reject(new Error(`Token counting timed out after ${timeoutMs}ms`)), timeoutMs) - llm.getNumTokens(text).then( + let tokenCountPromise: Promise + try { + tokenCountPromise = Promise.resolve(llm.getNumTokens(text)) + } catch (error) { + clearTimeout(timeout) + reject(error) + return + } + + tokenCountPromise.then( (count) => { clearTimeout(timeout) resolve(count) @@ -90,8 +99,11 @@ const getNumTokensWithTimeout = async (llm: TokenCountingModel, text: string, ti export const getApproximateTokenCount = (text: string): number => Math.ceil((text || '').length / 4) -export const createTokenCounter = (llm: TokenCountingModel): ((text: string) => Promise) => { - let useApproximateCount = isTruthyEnv(process.env.DISABLE_TIKTOKEN) || isTruthyEnv(process.env.USE_APPROXIMATE_TOKENS) +export const createTokenCounter = (llm?: TokenCountingModel | null): ((text: string) => Promise) => { + let useApproximateCount = + isTruthyEnv(process.env.DISABLE_TIKTOKEN) || + isTruthyEnv(process.env.USE_APPROXIMATE_TOKENS) || + typeof llm?.getNumTokens !== 'function' return async (text: string): Promise => { if (useApproximateCount) {