Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions packages/core/src/core/queue-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,18 @@ export class QueueManager {
max: 100, // Max 100 entries
ttl: 1000 * 60, // Default 1 minute TTL
allowStale: false, // Don't return stale entries
updateAgeOnGet: true, // Reset TTL on access
// Do NOT reset TTL on read. These entries back continuously-polled views
// (overview, queue list, metrics). With updateAgeOnGet enabled, an open
// dashboard re-read each entry before its TTL elapsed, so the age kept
// resetting and the entry never expired — the overview showed stale data
// indefinitely, even across hard refreshes (see #25).
updateAgeOnGet: false,
});

private readonly CACHE_TTL = {
metrics: 5 * 60 * 1000, // 5 minutes - metrics are expensive
overview: 2 * 60 * 1000, // 2 minutes
queues: 2 * 60 * 1000, // 2 minutes
overview: 5 * 1000, // 5s - backs the live overview (frontend polls every 5s)
queues: 5 * 1000, // 5s - backs the live queue list (frontend polls every 5s)
flows: 2 * 60 * 1000, // 2 minutes
activity: 5 * 60 * 1000, // 5 minutes - activity timeline
};
Expand Down Expand Up @@ -220,12 +225,24 @@ export class QueueManager {

// Invalidate main cache entries that might be affected
// These are expensive to recompute, so we invalidate them
// to ensure accuracy after mutations
// to ensure accuracy after mutations. "overview" is derived from "queues",
// so both must be cleared together.
this.cache.delete("metrics");
this.cache.delete("queues");
this.cache.delete("overview");
this.cache.delete("activity");
}

/**
* Invalidate caches that reflect queue-level state (pause status).
* Called after pause/resume so the queue list and overview update
* immediately instead of waiting for the cache TTL.
*/
private invalidateQueueStateCache(): void {
this.cache.delete("queues");
this.cache.delete("overview");
}

/**
* Clear cache (useful after mutations)
*/
Expand Down Expand Up @@ -414,6 +431,7 @@ export class QueueManager {
throw new Error(`Queue "${queueName}" not found`);
}
await queue.pause();
this.invalidateQueueStateCache();
}

/**
Expand All @@ -425,6 +443,7 @@ export class QueueManager {
throw new Error(`Queue "${queueName}" not found`);
}
await queue.resume();
this.invalidateQueueStateCache();
}

/**
Expand Down