Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
last_output count fixed, other nits addressed
  • Loading branch information
abhinavDhulipala committed Apr 4, 2026
commit a2972b36c73f719220b922143bbb878da88ada7e
2 changes: 1 addition & 1 deletion apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createLogger, type Logger } from '@sim/logger'
import { redactApiKeys } from '@/lib/core/security/redaction'
import { hydrateCacheReferences } from '@/lib/paginated-cache/paginate'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { hydrateCacheReferences } from '@/lib/paginated-cache/paginate'
import {
containsUserFileWithMetadata,
hydrateUserFilesWithBase64,
Expand Down
74 changes: 68 additions & 6 deletions apps/sim/lib/paginated-cache/paginate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ vi.mock('@/lib/paginated-cache/redis-cache', () => ({
import { autoPaginate, hydrateCacheReferences } from '@/lib/paginated-cache/paginate'
import type { ToolResponse } from '@/tools/types'

function makePageResponse(
items: unknown[],
hasMore: boolean,
cursor: string | null
): ToolResponse {
function makePageResponse(items: unknown[], hasMore: boolean, cursor: string | null): ToolResponse {
return {
success: true,
output: {
Expand Down Expand Up @@ -206,7 +202,73 @@ describe('autoPaginate', () => {
})

const storedCacheId = mockStoreMetadata.mock.calls[0][0] as string
expect(storedCacheId).toMatch(/^exec-42:zendesk_get_tickets:tickets:\d+$/)
expect(storedCacheId).toMatch(
/^exec-42:zendesk_get_tickets:tickets:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/
)
})

it('does not inject fields that the tool output does not have', async () => {
const noMetadataConfig = {
...paginationConfig,
pageField: 'items',
getItems: (output: Record<string, unknown>) => (output.items as unknown[]) ?? [],
}
const initialResult: ToolResponse = {
success: true,
output: {
items: [{ id: 1 }],
cursor: 'abc',
},
}

const result = await autoPaginate({
initialResult,
params: {},
paginationConfig: noMetadataConfig,
executeTool: mockExecuteTool,
toolId: 'custom_tool',
executionId: 'exec-1',
})

const outputKeys = Object.keys(result.output)
expect(outputKeys).toContain('items')
expect(outputKeys).toContain('cursor')
expect(outputKeys).not.toContain('metadata')
expect(outputKeys).not.toContain('paging')
})
})

describe('cleanupPaginatedCache', () => {
let mockScan: ReturnType<typeof vi.fn>
let mockDel: ReturnType<typeof vi.fn>

beforeEach(() => {
vi.clearAllMocks()
mockScan = vi.fn().mockResolvedValue(['0', []])
mockDel = vi.fn().mockResolvedValue(1)
mockGetRedisClient.mockReturnValue({ scan: mockScan, del: mockDel })
})

it('scans with prefix-based patterns and deletes matching keys', async () => {
mockScan
.mockResolvedValueOnce(['0', ['pagcache:page:exec-1:tool:field:uuid:0']])
.mockResolvedValueOnce(['0', ['pagcache:meta:exec-1:tool:field:uuid']])

const { cleanupPaginatedCache } = await import('@/lib/paginated-cache/paginate')
await cleanupPaginatedCache('exec-1')

expect(mockScan).toHaveBeenCalledWith('0', 'MATCH', 'pagcache:page:exec-1:*', 'COUNT', 100)
expect(mockScan).toHaveBeenCalledWith('0', 'MATCH', 'pagcache:meta:exec-1:*', 'COUNT', 100)
expect(mockDel).toHaveBeenCalledTimes(2)
})

it('no-ops when Redis is unavailable', async () => {
mockGetRedisClient.mockReturnValue(null)

const { cleanupPaginatedCache } = await import('@/lib/paginated-cache/paginate')
await cleanupPaginatedCache('exec-1')

expect(mockScan).not.toHaveBeenCalled()
})
})

Expand Down
77 changes: 46 additions & 31 deletions apps/sim/lib/paginated-cache/paginate.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import crypto from 'node:crypto'
import { createLogger } from '@sim/logger'
import { getRedisClient } from '@/lib/core/config/redis'
import type { PaginatedCacheStorageAdapter } from '@/lib/paginated-cache/adapter'
import { RedisPaginatedCache } from '@/lib/paginated-cache/redis-cache'
import { isPaginatedCacheReference } from '@/lib/paginated-cache/types'
import type { PaginatedCacheReference, ToolPaginationConfig } from '@/lib/paginated-cache/types'
import { isPaginatedCacheReference } from '@/lib/paginated-cache/types'
import type { ToolResponse } from '@/tools/types'

const logger = createLogger('Paginator')

const DEFAULT_MAX_PAGES = 100
const DEFAULT_MAX_PAGES = 10_000

interface AutoPaginateOptions {
initialResult: ToolResponse
Expand All @@ -23,8 +25,14 @@ interface AutoPaginateOptions {
}

export async function autoPaginate(options: AutoPaginateOptions): Promise<ToolResponse> {
const { initialResult, params, paginationConfig: config, executeTool, toolId, executionId } =
options
const {
initialResult,
params,
paginationConfig: config,
executeTool,
toolId,
executionId,
} = options
const maxPages = config.maxPages ?? DEFAULT_MAX_PAGES

const redis = getRedisClient()
Expand All @@ -33,7 +41,7 @@ export async function autoPaginate(options: AutoPaginateOptions): Promise<ToolRe
}

const cache = new RedisPaginatedCache(redis)
const cacheId = `${executionId}:${toolId}:${config.pageField}:${Date.now()}`
const cacheId = `${executionId}:${toolId}:${config.pageField}:${crypto.randomUUID()}`

let totalItems = 0
let pageIndex = 0
Expand Down Expand Up @@ -97,7 +105,14 @@ export async function hydrateCacheReferences(
if (!containsCacheReference(inputs)) {
return inputs
}
return (await deepHydrate(inputs)) as Record<string, unknown>

const redis = getRedisClient()
if (!redis) {
throw new Error('Redis is required to hydrate paginated cache references but is not available')
}

const adapter = new RedisPaginatedCache(redis)
return (await deepHydrate(inputs, adapter)) as Record<string, unknown>
}

function containsCacheReference(value: unknown): boolean {
Expand All @@ -109,37 +124,35 @@ function containsCacheReference(value: unknown): boolean {
return false
}

async function deepHydrate(value: unknown): Promise<unknown> {
async function deepHydrate(
value: unknown,
adapter: PaginatedCacheStorageAdapter
): Promise<unknown> {
if (isPaginatedCacheReference(value)) {
return hydrateReference(value)
return hydrateReference(value, adapter)
}

if (Array.isArray(value)) {
return Promise.all(value.map(deepHydrate))
return Promise.all(value.map((v) => deepHydrate(v, adapter)))
}

if (typeof value === 'object' && value !== null) {
const entries = Object.entries(value as Record<string, unknown>)
const hydrated: Record<string, unknown> = {}
for (const [key, val] of entries) {
hydrated[key] = await deepHydrate(val)
hydrated[key] = await deepHydrate(val, adapter)
}
return hydrated
}

return value
}

async function hydrateReference(ref: PaginatedCacheReference): Promise<unknown[]> {
const redis = getRedisClient()
if (!redis) {
throw new Error(
`Redis is required to hydrate paginated cache reference (cacheId: ${ref.cacheId}) but is not available`
)
}

const cache = new RedisPaginatedCache(redis)
const pages = await cache.getAllPages(ref.cacheId, ref.totalPages)
async function hydrateReference(
ref: PaginatedCacheReference,
adapter: PaginatedCacheStorageAdapter
): Promise<unknown[]> {
const pages = await adapter.getAllPages(ref.cacheId, ref.totalPages)

const items: unknown[] = []
for (const page of pages) {
Expand All @@ -165,21 +178,23 @@ export async function cleanupPaginatedCache(executionId: string): Promise<void>
return
}

const pattern = `pagcache:*${executionId}:*`
const patterns = [`pagcache:page:${executionId}:*`, `pagcache:meta:${executionId}:*`]

try {
let cursor = '0'
let deletedCount = 0

do {
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100)
cursor = nextCursor

if (keys.length > 0) {
await redis.del(...keys)
deletedCount += keys.length
}
} while (cursor !== '0')
for (const pattern of patterns) {
let cursor = '0'
do {
const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100)
cursor = nextCursor

if (keys.length > 0) {
await redis.del(...keys)
deletedCount += keys.length
}
} while (cursor !== '0')
}

if (deletedCount > 0) {
logger.info(`Cleaned up ${deletedCount} paginated cache entries for execution ${executionId}`)
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/lib/paginated-cache/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export interface ToolPaginationConfig<O = Record<string, unknown>> {
currentParams: Record<string, unknown>,
token: string | number
) => Record<string, unknown>
/** Maximum pages to fetch. Default: 100 */
/** Maximum pages to fetch. Default: 10,000 */
maxPages?: number
}

Expand Down
80 changes: 51 additions & 29 deletions apps/sim/tools/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { generateRequestId } from '@/lib/core/utils/request'
import { getBaseUrl, getInternalApiBaseUrl } from '@/lib/core/utils/urls'
import { SIM_VIA_HEADER, serializeCallChain } from '@/lib/execution/call-chain'
import { parseMcpToolId } from '@/lib/mcp/utils'
import { autoPaginate } from '@/lib/paginated-cache/paginate'
import { isCustomTool, isMcpTool } from '@/executor/constants'
import { resolveSkillContent } from '@/executor/handlers/agent/skills-resolver'
import type { ExecutionContext } from '@/executor/types'
Expand All @@ -26,7 +27,6 @@ import type {
ToolResponse,
ToolRetryConfig,
} from '@/tools/types'
import { autoPaginate } from '@/lib/paginated-cache/paginate'
import { formatRequestParams, getTool, validateRequiredParametersAfterMerge } from '@/tools/utils'
import * as toolsUtilsServer from '@/tools/utils.server'

Expand Down Expand Up @@ -600,6 +600,40 @@ async function processFileOutputs(
}
}

/**
* If the tool has a pagination config and there are more pages, auto-paginate
* and replace the page field with a Redis cache reference.
*/
async function maybeAutoPaginate(
tool: ToolConfig,
finalResult: ToolResponse,
contextParams: Record<string, unknown>,
normalizedToolId: string,
skipPostProcess: boolean,
executionContext?: ExecutionContext
): Promise<ToolResponse> {
if (
!tool.pagination ||
!finalResult.success ||
skipPostProcess ||
!executionContext?.executionId
) {
return finalResult
}
Comment thread
abhinavDhulipala marked this conversation as resolved.
const nextToken = tool.pagination.getNextPageToken(finalResult.output)
if (nextToken === null) {
return finalResult
}
return autoPaginate({
initialResult: finalResult,
params: contextParams,
paginationConfig: tool.pagination,
executeTool,
toolId: normalizedToolId,
executionId: executionContext.executionId,
})
}

/**
* Execute a tool by making the appropriate HTTP request
* All requests go directly - internal routes use regular fetch, external use SSRF-protected fetch
Expand Down Expand Up @@ -820,20 +854,14 @@ export async function executeTool(
// Process file outputs if execution context is available
finalResult = await processFileOutputs(finalResult, tool, executionContext)

// Auto-paginate if tool has pagination config and there are more pages
if (tool.pagination && finalResult.success && !skipPostProcess && executionContext?.executionId) {
const nextToken = tool.pagination.getNextPageToken(finalResult.output)
if (nextToken !== null) {
finalResult = await autoPaginate({
initialResult: finalResult,
params: contextParams,
paginationConfig: tool.pagination,
executeTool,
toolId: normalizedToolId,
executionId: executionContext.executionId,
})
}
}
finalResult = await maybeAutoPaginate(
tool,
finalResult,
contextParams,
normalizedToolId,
skipPostProcess,
executionContext
)

// Add timing data to the result
const endTime = new Date()
Expand Down Expand Up @@ -890,20 +918,14 @@ export async function executeTool(
// Process file outputs if execution context is available
finalResult = await processFileOutputs(finalResult, tool, executionContext)

// Auto-paginate if tool has pagination config and there are more pages
if (tool.pagination && finalResult.success && !skipPostProcess && executionContext?.executionId) {
const nextToken = tool.pagination.getNextPageToken(finalResult.output)
if (nextToken !== null) {
finalResult = await autoPaginate({
initialResult: finalResult,
params: contextParams,
paginationConfig: tool.pagination,
executeTool,
toolId: normalizedToolId,
executionId: executionContext.executionId,
})
}
}
finalResult = await maybeAutoPaginate(
tool,
finalResult,
contextParams,
normalizedToolId,
skipPostProcess,
executionContext
)
Comment thread
cursor[bot] marked this conversation as resolved.

// Add timing data to the result
const endTime = new Date()
Expand Down
2 changes: 1 addition & 1 deletion apps/sim/tools/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ export interface ToolConfig<P = any, R = any> {
* Optional pagination configuration for tools that return paginated data.
* When provided, the executor automatically fetches all pages and caches them in Redis.
*/
pagination?: ToolPaginationConfig
pagination?: ToolPaginationConfig<R extends { output: infer O } ? O : Record<string, unknown>>
}

export interface TableRow {
Expand Down
12 changes: 5 additions & 7 deletions apps/sim/tools/zendesk/get_tickets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,16 @@ export const zendeskGetTicketsTool: ToolConfig<ZendeskGetTicketsParams, ZendeskG

pagination: {
pageField: 'tickets',
getItems: (output: Record<string, unknown>) => (output.tickets as unknown[]) ?? [],
getNextPageToken: (output: Record<string, unknown>) => {
const paging = output.paging as Record<string, unknown> | undefined
if (paging?.has_more && paging?.after_cursor) {
return paging.after_cursor as string
getItems: (output) => output.tickets ?? [],
getNextPageToken: (output) => {
if (output.paging?.has_more && output.paging?.after_cursor) {
return output.paging.after_cursor
}
return null
},
buildNextPageParams: (params: Record<string, unknown>, token: string | number) => ({
buildNextPageParams: (params, token) => ({
...params,
pageAfter: String(token),
}),
maxPages: 100,
},
}
Loading