diff --git a/apps/desktop/sidecar/src/main.ts b/apps/desktop/sidecar/src/main.ts index 53a19c8..721215b 100644 --- a/apps/desktop/sidecar/src/main.ts +++ b/apps/desktop/sidecar/src/main.ts @@ -61,6 +61,7 @@ function classify(e: unknown): ErrorCode { ) return "REDIS_AUTH"; if (msg.includes("econnrefused")) return "REDIS_REFUSED"; + if (msg.includes("connection is closed")) return "REDIS_TLS"; if (msg.includes("enotfound") || msg.includes("eai_again")) return "REDIS_DNS"; if (msg.includes("tls") || msg.includes("ssl")) return "REDIS_TLS"; @@ -154,6 +155,9 @@ async function main(): Promise { const server = Bun.serve({ hostname: "127.0.0.1", port: 0, + // Flow discovery can legitimately take >10s on large deployments. + // Raise idle timeout so `/api/flows` responses aren't dropped mid-compute. + idleTimeout: 60, fetch: app.fetch, }); diff --git a/apps/desktop/src-tauri/src/redis_ping.rs b/apps/desktop/src-tauri/src/redis_ping.rs index 2c7d5f4..abf788b 100644 --- a/apps/desktop/src-tauri/src/redis_ping.rs +++ b/apps/desktop/src-tauri/src/redis_ping.rs @@ -7,6 +7,24 @@ use crate::AppError; /// Returns the same `AppError.code` taxonomy as the sidecar so the UI has /// one switch statement. pub async fn ping(url: &str) -> Result<(), AppError> { + // Some managed Redis endpoints (notably AWS ElastiCache config endpoints) + // require TLS even when users type redis://. Mirror sidecar behavior: + // if the first ping looks like a TLS/closed-socket failure, retry once + // with rediss:// transparently. + match ping_once(url).await { + Ok(()) => Ok(()), + Err(first) => { + if should_retry_with_tls(url, &first) { + let tls_url = force_tls_url(url); + ping_once(&tls_url).await + } else { + Err(first) + } + } + } +} + +async fn ping_once(url: &str) -> Result<(), AppError> { let client = redis::Client::open(url).map_err(|e| AppError::new("REDIS_URL_INVALID", e.to_string()))?; let connect_fut = client.get_multiplexed_async_connection(); @@ -25,11 +43,24 @@ pub async fn ping(url: &str) -> Result<(), AppError> { } } +fn should_retry_with_tls(url: &str, err: &AppError) -> bool { + url.starts_with("redis://") + && (err.code == "REDIS_TLS" || err.message.to_lowercase().contains("connection is closed")) +} + +fn force_tls_url(url: &str) -> String { + url.replacen("redis://", "rediss://", 1) +} + fn classify(err: redis::RedisError) -> AppError { let kind = err.kind(); let msg = err.to_string(); let lower = msg.to_lowercase(); + if lower.contains("connection is closed") { + return AppError::new("REDIS_TLS", msg); + } + let code = match kind { redis::ErrorKind::AuthenticationFailed => "REDIS_AUTH", redis::ErrorKind::IoError => { diff --git a/apps/desktop/src-tauri/src/sidecar.rs b/apps/desktop/src-tauri/src/sidecar.rs index 1508db2..c7224d3 100644 --- a/apps/desktop/src-tauri/src/sidecar.rs +++ b/apps/desktop/src-tauri/src/sidecar.rs @@ -11,7 +11,7 @@ use tokio::time::timeout; use crate::AppError; -const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(8); +const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(50); /// One JSON-line event emitted by the sidecar on its stdout. Mirrors /// `apps/desktop/sidecar/src/main.ts`. We use `untagged` so we don't have diff --git a/apps/desktop/src/app.tsx b/apps/desktop/src/app.tsx index c70c519..6a6699d 100644 --- a/apps/desktop/src/app.tsx +++ b/apps/desktop/src/app.tsx @@ -117,8 +117,11 @@ export function App(): JSX.Element { try { await tauriPing(url); } catch (e) { - setPhase({ kind: "failed", stage: "ping", error: e as AppError }); - return; + const pingError = e as AppError; + if (!shouldProceedAfterPingFailure(pingError)) { + setPhase({ kind: "failed", stage: "ping", error: pingError }); + return; + } } // Prefer an explicit password override (auto-reconnect path) over the @@ -291,6 +294,20 @@ export function App(): JSX.Element { ); } +function shouldProceedAfterPingFailure(error: AppError): boolean { + const msg = (error.message || "").toLowerCase(); + if (error.code === "REDIS_TLS" || error.code === "REDIS_TIMEOUT") { + return true; + } + if ( + (error.code === "UNKNOWN" || error.code === "REDIS_REFUSED") && + msg.includes("connection is closed") + ) { + return true; + } + return false; +} + function Splash(): JSX.Element { return ( :*:meta` @@ -14,17 +14,8 @@ export async function discoverQueues( prefix = "bull", ): Promise { const normalized = normalizeConnection(connection); - const client = createScanClient(normalized); - - // Surface the underlying connection error (ECONNREFUSED, NOAUTH, EAI_AGAIN, - // ENOTFOUND, TLS errors, etc.) rather than letting ioredis bury it under a - // "max retries per request" wrapper. We swap the first error in via the - // event listener and reject the ping promise with it. - const firstError = captureFirstError(client); - try { - await Promise.race([client.ping(), firstError.promise]); - const names = await scanQueueNames(client, prefix); + const names = await discoverQueueNamesStandalone(normalized, prefix); return names.map( (name) => new Queue(name, { @@ -32,13 +23,44 @@ export async function discoverQueues( prefix, }), ); + } catch (error) { + if (!shouldTryClusterFallback(error, normalized)) { + throw error; + } + + const cluster = await connectClusterWithRetry(normalized); + try { + const names = await scanQueueNamesCluster(cluster, prefix); + return names.map( + (name) => + new Queue(name, { + connection: cluster, + prefix, + }), + ); + } catch (clusterError) { + cluster.disconnect(); + throw clusterError; + } + } +} + +async function discoverQueueNamesStandalone( + normalized: RedisOptions & { url?: string }, + prefix: string, +): Promise { + const client = createScanClient(normalized); + const firstError = captureFirstError(client); + try { + await Promise.race([client.ping(), firstError.promise]); + return scanQueueNames(client, prefix); } finally { firstError.dispose(); client.disconnect(); } } -function captureFirstError(client: Redis): { +function captureFirstError(client: Redis | Cluster): { promise: Promise; dispose: () => void; } { @@ -87,6 +109,84 @@ function createScanClient(opts: RedisOptions & { url?: string }): Redis { return new Redis({ ...rest, lazyConnect: false, maxRetriesPerRequest: 1 }); } +function createClusterClient(opts: RedisOptions & { url?: string }): Cluster { + const { url, ...rest } = opts; + const startupNode = getStartupNode(url, rest); + const parsed = url ? new URL(url) : null; + const usernameFromUrl = parsed?.username + ? decodeURIComponent(parsed.username) + : ""; + const passwordFromUrl = parsed?.password + ? decodeURIComponent(parsed.password) + : ""; + const isTls = (parsed?.protocol ?? "").toLowerCase() === "rediss:"; + + return new Cluster([startupNode], { + // Keep hostnames for TLS cert validation on managed Redis cluster nodes. + dnsLookup: (address, callback) => callback(null, address), + redisOptions: { + ...rest, + username: rest.username ?? (usernameFromUrl || undefined), + password: rest.password ?? (passwordFromUrl || undefined), + tls: rest.tls ?? (isTls ? {} : undefined), + lazyConnect: true, + maxRetriesPerRequest: 1, + }, + }); +} + +function getStartupNode( + url: string | undefined, + opts: RedisOptions, +): { host: string; port: number } { + if (url) { + const parsed = new URL(url); + return { + host: parsed.hostname, + port: parsed.port ? Number(parsed.port) : 6379, + }; + } + return { + host: opts.host ?? "127.0.0.1", + port: opts.port ?? 6379, + }; +} + +async function connectClusterWithRetry( + normalized: RedisOptions & { url?: string }, +): Promise { + let lastError: unknown; + for (let attempt = 0; attempt < 3; attempt++) { + const cluster = createClusterClient(normalized); + cluster.on("error", () => {}); + try { + try { + await withTimeout( + cluster.connect(), + 5000, + "Redis cluster connect timed out", + ); + } catch (connectError) { + if (!isAlreadyConnectingError(connectError)) { + throw connectError; + } + } + await withTimeout(cluster.ping(), 5000, "Redis cluster ping timed out"); + return cluster; + } catch (error) { + lastError = error; + cluster.disconnect(); + if (!isRetryableClusterError(error) || attempt >= 2) { + throw error; + } + await new Promise((resolve) => setTimeout(resolve, 250 * (attempt + 1))); + } + } + throw lastError instanceof Error + ? lastError + : new Error("Failed to connect to Redis cluster"); +} + /** * Cursored SCAN for `:*:meta` keys. BullMQ writes a meta key for each * queue on first use; using that as the discovery signal avoids matching @@ -118,6 +218,67 @@ async function scanQueueNames( return Array.from(names).sort(); } +async function scanQueueNamesCluster( + cluster: Cluster, + prefix: string, +): Promise { + const names = new Set(); + const nodes = cluster.nodes("master"); + for (const node of nodes) { + const fromNode = await scanQueueNames(node, prefix); + for (const name of fromNode) names.add(name); + } + return Array.from(names).sort(); +} + +function shouldTryClusterFallback( + error: unknown, + normalized: RedisOptions & { url?: string }, +): boolean { + if (isMovedError(error) || isRetryableClusterError(error)) { + return true; + } + const host = normalized.url ? new URL(normalized.url).hostname : normalized.host; + return typeof host === "string" && host.toLowerCase().startsWith("clustercfg."); +} + +function isMovedError(error: unknown): boolean { + if (!(error instanceof Error)) return false; + return /^\s*MOVED\s+\d+\s+\S+:\d+/i.test(error.message); +} + +function isRetryableClusterError(error: unknown): boolean { + if (!(error instanceof Error)) return false; + const msg = error.message.trim().toLowerCase(); + return ( + msg.includes("connection is closed") || + msg.includes("failed to refresh slots cache") || + msg.includes("cluster all failed") || + msg.includes("timed out") + ); +} + +function isAlreadyConnectingError(error: unknown): boolean { + if (!(error instanceof Error)) return false; + return /already connecting\/connected/i.test(error.message); +} + +async function withTimeout( + promise: Promise, + timeoutMs: number, + message: string, +): Promise { + let timer: ReturnType | undefined; + const timeoutPromise = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(message)), timeoutMs); + }); + try { + return await Promise.race([promise, timeoutPromise]); + } finally { + if (timer) clearTimeout(timer); + } +} + function parseQueueName(key: string, prefix: string): string | null { const head = `${prefix}:`; const tail = ":meta"; diff --git a/packages/core/src/core/queue-manager.ts b/packages/core/src/core/queue-manager.ts index 69ec811..50a8aa6 100644 --- a/packages/core/src/core/queue-manager.ts +++ b/packages/core/src/core/queue-manager.ts @@ -1783,6 +1783,55 @@ export class QueueManager { return Object.keys(tags).length > 0 ? tags : undefined; } + private extractParentRef(job: Job): { queueName: string; id: string } | null { + if (job.parent?.id) { + const queueName = this.parseQueueNameFromQueueKey(job.parent.queueKey); + if (queueName) { + return { queueName, id: job.parent.id }; + } + } + + if (job.parentKey) { + return this.parseParentKey(job.parentKey); + } + + return null; + } + + private parseQueueNameFromQueueKey(queueKey?: string): string | null { + if (!queueKey) return null; + const firstColon = queueKey.indexOf(":"); + if (firstColon === -1 || firstColon === queueKey.length - 1) { + return null; + } + return queueKey.slice(firstColon + 1); + } + + private parseParentKey(parentKey: string): { queueName: string; id: string } | null { + // BullMQ parentKey format: "::". + // Both queueName and jobId can include ":", so resolve queueName by + // matching against known queue names (longest-first). + const firstColon = parentKey.indexOf(":"); + if (firstColon === -1 || firstColon === parentKey.length - 1) { + return null; + } + + const withoutPrefix = parentKey.slice(firstColon + 1); + const queueNames = Array.from(this.queues.keys()).sort( + (a, b) => b.length - a.length, + ); + for (const queueName of queueNames) { + const prefix = `${queueName}:`; + if (withoutPrefix.startsWith(prefix)) { + const id = withoutPrefix.slice(prefix.length); + if (id) { + return { queueName, id }; + } + } + } + return null; + } + /** * Get unique values for a specific tag field across all jobs */ @@ -2148,10 +2197,16 @@ export class QueueManager { const queueChecks = await Promise.all( queueEntries.map(async ([queueName, queue]) => { const counts = await this.getCachedJobCounts(queue); + // Flow roots can finish quickly and end up only in completed/failed. + // Don't gate discovery to active/waiting states only. const hasRelevantJobs = (counts.waiting || 0) > 0 || (counts["waiting-children"] || 0) > 0 || - (counts.active || 0) > 0; + (counts.active || 0) > 0 || + (counts.completed || 0) > 0 || + (counts.failed || 0) > 0 || + (counts.delayed || 0) > 0 || + (counts.prioritized || 0) > 0; return { queueName, queue, hasRelevantJobs }; }), ); @@ -2161,61 +2216,77 @@ export class QueueManager { return []; } - // Focus on waiting-children first (most likely to be flows) - // Then check other types with reduced limits - const queueResults = await Promise.all( - relevantQueues.map(async ({ queueName, queue }) => { + // Fast path: derive flow roots directly from BullMQ dependencies keys. + // This avoids expensive broad job scans on high-throughput installations. + const dependencyRootRefs = await this.getFlowRootRefsFromDependencies( + limit * 100, + ); + if (dependencyRootRefs.length > 0) { + const dependencyCandidates: { queueName: string; job: Job }[] = []; + for (const { queueName, id } of dependencyRootRefs) { + const queue = this.queues.get(queueName); + if (!queue) continue; try { - // Fetch waiting-children first (most likely flows) with higher limit - const waitingChildrenJobs = await queue.getJobs( - ["waiting-children"], - 0, - 50, - ); - - // If we already have enough flows, skip other types - if (waitingChildrenJobs.length >= limit) { - return { queueName, jobs: waitingChildrenJobs }; - } - - // Fetch other types with reduced limits - const otherTypes = [ - "active", - "waiting", - "prioritized", - "completed", - "failed", - "delayed", - ]; - const otherJobArrays = await Promise.all( - otherTypes.map(async (type) => { - try { - return await queue.getJobs(type as any, 0, 30); // Reduced from 100 - } catch { - return []; - } - }), - ); - - const allJobs = [...waitingChildrenJobs, ...otherJobArrays.flat()]; - return { queueName, jobs: allJobs }; + const job = await queue.getJob(id); + if (job?.id) dependencyCandidates.push({ queueName, job }); } catch { - return { queueName, jobs: [] }; + // Root may have been removed. } - }), - ); + } - // Collect potential root jobs (no parent) - // Early exit when we have enough flows - const seenJobIds = new Set(); - const potentialRoots: { queueName: string; job: Job }[] = []; + if (dependencyCandidates.length > 0) { + return this.summarizeFlowRoots(dependencyCandidates, limit); + } + } - for (const { queueName, jobs } of queueResults) { - // Early exit if we have enough flows - if (potentialRoots.length >= limit * 2) { - break; + // Focus on waiting-children first (most likely to be flows), then include + // other states. Use bounded concurrency to avoid connection churn on + // large Redis clusters. + const fetchQueueJobs = async (queueName: string, queue: Queue) => { + try { + // Keep scan narrow: these statuses carry enough signal for flow + // discovery while staying fast on large queue fleets. + const scanPlan: Array<{ type: JobStatus; limit: number }> = [ + { type: "waiting-children", limit: 100 }, + { type: "waiting", limit: 100 }, + { type: "completed", limit: 100 }, + { type: "failed", limit: 100 }, + { type: "active", limit: 30 }, + ]; + const jobArrays = await Promise.all( + scanPlan.map(async ({ type, limit: perTypeLimit }) => { + try { + return await queue.getJobs(type as any, 0, perTypeLimit); + } catch { + return []; + } + }), + ); + + return { queueName, jobs: jobArrays.flat() }; + } catch { + return { queueName, jobs: [] as Job[] }; } + }; + + const queueResults: { queueName: string; jobs: Job[] }[] = []; + const queueBatchSize = 5; + for (let i = 0; i < relevantQueues.length; i += queueBatchSize) { + const batch = relevantQueues.slice(i, i + queueBatchSize); + const batchResults = await Promise.all( + batch.map(({ queueName, queue }) => fetchQueueJobs(queueName, queue)), + ); + queueResults.push(...batchResults); + } + + // Build root candidates. Prefer deriving roots from child jobs' parent + // pointers (much faster and more reliable than brute-force scanning all + // root jobs on high-throughput queues). + const seenJobIds = new Set(); + const fallbackRoots: { queueName: string; job: Job }[] = []; + const parentRefs = new Map(); + for (const { queueName, jobs } of queueResults) { for (const job of jobs) { if (!job?.id) continue; @@ -2223,72 +2294,176 @@ export class QueueManager { if (seenJobIds.has(jobKey)) continue; seenJobIds.add(jobKey); - // Check if this is a root job (has no parent) - const hasParent = !!job.parent || !!job.parentKey; - if (!hasParent) { - potentialRoots.push({ queueName, job }); + const parentRef = this.extractParentRef(job); + if (parentRef) { + parentRefs.set(`${parentRef.queueName}:${parentRef.id}`, parentRef); + continue; + } - // Early exit if we have enough potential roots - if (potentialRoots.length >= limit * 2) { - break; - } + // Fallback path when we don't see child markers in sampled jobs. + fallbackRoots.push({ queueName, job }); + } + } + + const rootCandidates = new Map(); + + // Fast path: resolve parent refs gathered from child jobs. + for (const { queueName, id } of parentRefs.values()) { + const parentQueue = this.queues.get(queueName); + if (!parentQueue) continue; + try { + const parentJob = await parentQueue.getJob(id); + if (parentJob?.id) { + rootCandidates.set(`${queueName}:${parentJob.id}`, { + queueName, + job: parentJob, + }); } + } catch { + // Parent may have been removed between reads. } } - // Check flows in parallel (batch to avoid overwhelming Redis) - const batchSize = 20; - const flows: FlowSummary[] = []; + // If no child-derived roots were found, fall back to sampled root jobs. + if (rootCandidates.size === 0) { + for (const candidate of fallbackRoots) { + rootCandidates.set(`${candidate.queueName}:${candidate.job.id}`, candidate); + } + } - for ( - let i = 0; - i < potentialRoots.length && flows.length < limit; - i += batchSize - ) { - const batch = potentialRoots.slice(i, i + batchSize); - const batchResults = await Promise.all( - batch.map(async ({ queueName, job }) => { - try { - const flowTree = await this.flowProducer!.getFlow({ - id: job.id!, - queueName, - }); + const potentialRoots = Array.from(rootCandidates.values()); - if (flowTree?.children && flowTree.children.length > 0) { - const stats = this.countFlowStats(flowTree); - const state = await job.getState(); - - return { - id: job.id!, - name: job.name, - queueName, - status: state as JobStatus, - totalJobs: stats.total, - completedJobs: stats.completed, - failedJobs: stats.failed, - timestamp: job.timestamp, - duration: - job.finishedOn && job.processedOn - ? job.finishedOn - job.processedOn - : undefined, - } as FlowSummary; + return this.summarizeFlowRoots(potentialRoots, limit); + }); + } + + private async summarizeFlowRoots( + roots: { queueName: string; job: Job }[], + limit: number, + ): Promise { + const batchSize = 20; + const flows: FlowSummary[] = []; + + for (let i = 0; i < roots.length && flows.length < limit; i += batchSize) { + const batch = roots.slice(i, i + batchSize); + const batchResults = await Promise.all( + batch.map(async ({ queueName, job }) => { + try { + const flowTree = await this.flowProducer!.getFlow({ + id: job.id!, + queueName, + }); + + if (flowTree?.children && flowTree.children.length > 0) { + const stats = this.countFlowStats(flowTree); + let state: JobStatus = "unknown"; + try { + state = (await job.getState()) as JobStatus; + } catch { + // Keep flow visible even when state lookup is flaky on + // high-latency/shared Redis deployments. + if (job.finishedOn) { + state = job.failedReason ? "failed" : "completed"; + } else if (job.processedOn) { + state = "active"; + } else { + state = "waiting"; + } } - } catch { - // Job might not have a flow, skip + + return { + id: job.id!, + name: job.name, + queueName, + status: state as JobStatus, + totalJobs: stats.total, + completedJobs: stats.completed, + failedJobs: stats.failed, + timestamp: job.timestamp, + duration: + job.finishedOn && job.processedOn + ? job.finishedOn - job.processedOn + : undefined, + } as FlowSummary; } - return null; - }), - ); + } catch { + // Job might not have a flow, skip. + } + return null; + }), + ); + + for (const result of batchResults) { + if (result && flows.length < limit) { + flows.push(result); + } + } + } + + return flows.sort((a, b) => b.timestamp - a.timestamp); + } - for (const result of batchResults) { - if (result && flows.length < limit) { - flows.push(result); + private async getFlowRootRefsFromDependencies( + maxKeys: number, + ): Promise<{ queueName: string; id: string }[]> { + const firstQueue = this.queues.values().next().value as Queue | undefined; + if (!firstQueue) return []; + + const client = (firstQueue as any).client as + | { scan: (...args: unknown[]) => Promise<[string, string[]]> } + | undefined; + if (!client?.scan) return []; + + const prefix = firstQueue.opts?.prefix ?? "bull"; + const pattern = `${prefix}:*:dependencies`; + const queueNames = Array.from(this.queues.keys()).sort( + (a, b) => b.length - a.length, + ); + const refs = new Map(); + + let cursor = "0"; + do { + const [next, keys] = await client.scan( + cursor, + "MATCH", + pattern, + "COUNT", + 500, + ); + cursor = next; + + for (const key of keys) { + const ref = this.parseDependencyKey(key, prefix, queueNames); + if (ref) { + refs.set(`${ref.queueName}:${ref.id}`, ref); + if (refs.size >= maxKeys) { + return Array.from(refs.values()); } } } + } while (cursor !== "0"); - return flows.sort((a, b) => b.timestamp - a.timestamp); - }); + return Array.from(refs.values()); + } + + private parseDependencyKey( + key: string, + prefix: string, + queueNames: string[], + ): { queueName: string; id: string } | null { + const head = `${prefix}:`; + const tail = ":dependencies"; + if (!key.startsWith(head) || !key.endsWith(tail)) return null; + + const middle = key.slice(head.length, key.length - tail.length); + for (const queueName of queueNames) { + const queuePrefix = `${queueName}:`; + if (middle.startsWith(queuePrefix)) { + const id = middle.slice(queuePrefix.length); + if (id) return { queueName, id }; + } + } + return null; } /** @@ -2309,7 +2484,19 @@ export class QueueManager { return null; } - return this.convertFlowTree(flowTree); + const nodes = await this.convertFlowChildren(flowTree); + if (nodes.length === 0) { + return null; + } + if (nodes.length === 1) { + return nodes[0] ?? null; + } + + const [first, ...rest] = nodes; + return { + ...first!, + children: [...(first?.children ?? []), ...rest], + }; } catch { return null; } @@ -2353,7 +2540,11 @@ export class QueueManager { /** * Convert BullMQ flow tree to our FlowNode structure */ - private async convertFlowTree(tree: any): Promise { + private async convertFlowTree(tree: any): Promise { + if (!tree?.job) { + return null; + } + const job = tree.job; const state = await job.getState(); const duration = @@ -2391,10 +2582,18 @@ export class QueueManager { const children: FlowNode[] = []; if (tree.children && tree.children.length > 0) { for (const child of tree.children) { - children.push(await this.convertFlowTree(child)); + const childNodes = await this.convertFlowChildren(child); + children.push(...childNodes); } } + // Some BullMQ versions/flow shapes return children placeholders without + // materialized job payloads. Fallback to dependencies to resolve child jobs. + if (children.length === 0) { + const dependencyChildren = await this.buildChildrenFromDependencies(job); + children.push(...dependencyChildren); + } + return { job: jobInfo, queueName: job.queueName || tree.queueName || "", @@ -2402,6 +2601,84 @@ export class QueueManager { }; } + private async convertFlowChildren(tree: any): Promise { + if (!tree) return []; + + // BullMQ can include intermediary dependency nodes without an attached job. + // Flatten them so Flow Details still renders all concrete child jobs. + if (!tree.job) { + if (!tree.children || tree.children.length === 0) return []; + const flattened: FlowNode[] = []; + for (const child of tree.children) { + const nodes = await this.convertFlowChildren(child); + flattened.push(...nodes); + } + return flattened; + } + + const node = await this.convertFlowTree(tree); + return node ? [node] : []; + } + + private async buildChildrenFromDependencies(job: Job): Promise { + let deps: + | { + processed?: Record; + unprocessed?: string[] | Record; + failed?: string[] | Record; + ignored?: string[] | Record; + } + | undefined; + try { + deps = await job.getDependencies(); + } catch { + return []; + } + + if (!deps) return []; + + const depStatusMap = new Map(); + for (const key of Object.keys(deps.processed || {})) { + depStatusMap.set(key, "completed"); + } + if (Array.isArray(deps.failed)) { + for (const key of deps.failed) depStatusMap.set(key, "failed"); + } else { + for (const key of Object.keys(deps.failed || {})) depStatusMap.set(key, "failed"); + } + if (Array.isArray(deps.unprocessed)) { + for (const key of deps.unprocessed) depStatusMap.set(key, "waiting"); + } else { + for (const key of Object.keys(deps.unprocessed || {})) + depStatusMap.set(key, "waiting"); + } + if (Array.isArray(deps.ignored)) { + for (const key of deps.ignored) depStatusMap.set(key, "unknown"); + } else { + for (const key of Object.keys(deps.ignored || {})) + depStatusMap.set(key, "unknown"); + } + + const children: FlowNode[] = []; + for (const [depKey, status] of depStatusMap) { + const ref = this.parseParentKey(depKey); + if (!ref) continue; + const childInfo: JobInfo = { + id: ref.id, + name: ref.id, + data: {}, + opts: {}, + progress: 0, + attemptsMade: 0, + timestamp: job.timestamp ?? Date.now(), + status, + }; + children.push({ job: childInfo, queueName: ref.queueName }); + } + + return children; + } + /** * Count statistics for a flow tree */ @@ -2410,15 +2687,19 @@ export class QueueManager { completed: number; failed: number; } { - let total = 1; + if (!tree) { + return { total: 0, completed: 0, failed: 0 }; + } + + const job = tree.job; + let total = job ? 1 : 0; let completed = 0; let failed = 0; // Check current job status (synchronously from available data) - const job = tree.job; - if (job.finishedOn && !job.failedReason) { + if (job?.finishedOn && !job.failedReason) { completed = 1; - } else if (job.failedReason) { + } else if (job?.failedReason) { failed = 1; } diff --git a/packages/core/src/ui/lib/hooks.ts b/packages/core/src/ui/lib/hooks.ts index f94ec4e..19168d3 100644 --- a/packages/core/src/ui/lib/hooks.ts +++ b/packages/core/src/ui/lib/hooks.ts @@ -124,7 +124,9 @@ export function useMetrics() { return useQuery({ queryKey: queryKeys.metrics, queryFn: ({ signal }) => api.getMetrics(signal), - refetchInterval: 30000, // Refresh every 30 seconds (metrics are compute-heavy) + staleTime: 30_000, + refetchInterval: 60_000, // Refresh every 60 seconds (metrics are compute-heavy) + refetchOnWindowFocus: false, }); } @@ -484,7 +486,9 @@ export function useFlows() { return useQuery({ queryKey: queryKeys.flows, queryFn: ({ signal }) => api.getFlows(undefined, signal), - refetchInterval: 5000, + staleTime: 10_000, + refetchInterval: 15_000, + refetchOnWindowFocus: false, }); } @@ -497,7 +501,9 @@ export function useFlow(queueName: string, jobId: string) { queryFn: () => api.getFlow(queueName, jobId), enabled: !!queueName && !!jobId, retry: false, // Don't retry - flow might not exist - refetchInterval: 5000, + staleTime: 30_000, + refetchOnWindowFocus: false, + refetchInterval: false, }); } diff --git a/packages/core/src/ui/pages/flow.tsx b/packages/core/src/ui/pages/flow.tsx index 3d4e42c..964bfdd 100644 --- a/packages/core/src/ui/pages/flow.tsx +++ b/packages/core/src/ui/pages/flow.tsx @@ -1,6 +1,8 @@ import { useNavigate } from "@tanstack/react-router"; import { AlertCircle, + ChevronDown, + ChevronRight, CheckCircle2, Clock, GitBranch, @@ -8,9 +10,10 @@ import { Network, XCircle, } from "lucide-react"; -import { FlowGraph } from "@/components/flows"; +import * as React from "react"; import { EmptyState } from "@/components/shared/empty-state"; import { StatusBadge } from "@/components/shared/status-badge"; +import { Button } from "@/components/ui/button"; import type { FlowNode } from "@/core/types"; import { useFlow } from "@/lib/hooks"; import { formatDuration } from "@/lib/utils"; @@ -22,6 +25,9 @@ interface FlowPageProps { export function FlowPage({ queueName, jobId }: FlowPageProps) { const navigate = useNavigate(); + const [expandedQueues, setExpandedQueues] = React.useState>( + () => new Set(), + ); const { data: flow, isLoading, error } = useFlow(queueName, jobId); const handleNodeClick = (node: FlowNode) => { @@ -31,6 +37,30 @@ export function FlowPage({ queueName, jobId }: FlowPageProps) { }); }; + const stats = React.useMemo( + () => (flow ? countFlowStats(flow) : { total: 0, completed: 0, failed: 0 }), + [flow], + ); + const childJobs = React.useMemo( + () => collectDescendants(flow?.children ?? []), + [flow?.children], + ); + const childQueueGroups = React.useMemo(() => { + const groups = new Map(); + for (const child of childJobs) { + const nodes = groups.get(child.queueName) ?? []; + nodes.push(child); + groups.set(child.queueName, nodes); + } + + return Array.from(groups.entries()) + .map(([name, jobs]) => ({ + queueName: name, + jobs, + })) + .sort((a, b) => b.jobs.length - a.jobs.length); + }, [childJobs]); + // Loading state if (isLoading) { return ( @@ -71,8 +101,17 @@ export function FlowPage({ queueName, jobId }: FlowPageProps) { ); } - // Count stats from flow tree - const stats = countFlowStats(flow); + const toggleQueue = (queue: string) => { + setExpandedQueues((prev) => { + const next = new Set(prev); + if (next.has(queue)) { + next.delete(queue); + } else { + next.add(queue); + } + return next; + }); + }; return (
@@ -132,14 +171,133 @@ export function FlowPage({ queueName, jobId }: FlowPageProps) {
- {/* Flow Graph - Full Width with dotted background */} -
- +
+
+
+

Parent Job Queue

+
+ +
+ +
+
+

Child Queues

+ + {childJobs.length} child jobs in {childQueueGroups.length} queues + +
+ + {childQueueGroups.length === 0 ? ( +
+ No child queues found for this flow +
+ ) : ( + <> +
+ {childQueueGroups.map((group) => { + const isExpanded = expandedQueues.has(group.queueName); + + return ( +
+
+ + + +
+ + {isExpanded && ( +
+
+ {group.jobs.map((child) => ( + + ))} +
+
+ )} +
+ ); + })} +
+ + )} +
); } +function collectDescendants(nodes: FlowNode[]): FlowNode[] { + const result: FlowNode[] = []; + const stack = [...nodes]; + while (stack.length > 0) { + const current = stack.shift(); + if (!current) continue; + result.push(current); + if (current.children?.length) { + stack.push(...current.children); + } + } + return result; +} + function countFlowStats(node: FlowNode): { total: number; completed: number; diff --git a/packages/core/src/ui/router.tsx b/packages/core/src/ui/router.tsx index 63950f8..4f9a4b0 100644 --- a/packages/core/src/ui/router.tsx +++ b/packages/core/src/ui/router.tsx @@ -8,12 +8,16 @@ import { useParams, useSearch, } from "@tanstack/react-router"; +import { ArrowLeft } from "lucide-react"; import * as React from "react"; import { z } from "zod"; +import { useQueryClient } from "@tanstack/react-query"; import { AppSidebar, type NavItem } from "@/components/app-sidebar"; import { CommandPalette } from "@/components/layout/command-palette"; import { HeaderSearch } from "@/components/layout/header-search"; +import { Button } from "@/components/ui/button"; import { useConfig, useQueueNames, useQueues } from "@/lib/hooks"; +import { api } from "@/lib/api"; import { FlowPage } from "@/pages/flow"; import { FlowsPage } from "@/pages/flows"; import { JobPage } from "@/pages/job"; @@ -314,11 +318,29 @@ function PageLayout({ children: React.ReactNode; }) { const context = useSearchContext(); + const navigate = useNavigate(); + const queryClient = useQueryClient(); + const [refreshing, setRefreshing] = React.useState(false); return ( <>
+

{title}

{subtitle && ( @@ -331,6 +353,22 @@ function PageLayout({ onValueChange={context.setSearchQuery} onFocus={() => context.setCommandOpen(true)} /> +
{children}