Fix build queue slot refilling #66

Closed
Moskize91 wants to merge 1 commit into main from fix/queue-slot-scheduler

Conversation

@Moskize91

Contributor

Summary

  • refactor the build queue worker into independent slot loops that each claim one job at a time
  • keep idle slots alive while any job is still running so newly added jobs can be picked up immediately
  • add a regression test for adding a second job while the first job is still running

Tests

  • pnpm test:run
  • pnpm typecheck
  • pnpm lint
  • pnpm format:check

@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Summary by CodeRabbit

  • Refactor

    • Enhanced build job worker scheduling with improved slot-based job claiming and idle timeout management for better handling of concurrent jobs.
  • Tests

    • Added integration test coverage for concurrent job execution.

Walkthrough

The runBuildJobWorker scheduling logic is refactored from claiming a batch of queued jobs per loop iteration to running one async runSlot loop per concurrency slot. Each slot independently heartbeats, recovers stale jobs, claims exactly one queued job via the new claimQueuedBuildJob (replacing batch claimQueuedBuildJobs), executes it, and loops. Idle-exit tracking changes from a single lastWorkAt timestamp to busySlotCount and idleSince, so the worker exits only after all slots have been idle for the configured timeout. A new executeClaimedBuildJob helper centralizes per-job lifecycle: emitting the started event, calling executeJob, and recording success or failure. A new integration test and withTimeout helper verify that idle slots pick up newly enqueued jobs while other slots are still occupied.
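
The slot model described in the walkthrough can be sketched roughly as follows. This is a simplified illustration, not the code from the PR: it assumes an in-memory queue, and the names `runWorker`, `claimJob`, `pollMs`, and `WorkerOptions` are hypothetical. Only `busySlotCount` and `idleSince` are taken from the walkthrough; heartbeats and stale-job recovery are omitted.

```typescript
// Hypothetical job shape; the real worker reads jobs from a database.
type Job = { id: number };

interface WorkerOptions {
  concurrency: number;
  idleTimeoutMs: number;
  pollMs: number;
  // Claims exactly one queued job, or returns undefined when the queue is empty.
  claimJob: () => Job | undefined;
  executeJob: (job: Job) => Promise<void>;
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runWorker(options: WorkerOptions): Promise<void> {
  let busySlotCount = 0;
  // Time at which ALL slots became idle; undefined while any slot is busy.
  let idleSince: number | undefined = Date.now();

  const runSlot = async (): Promise<void> => {
    for (;;) {
      const job = options.claimJob();
      if (job === undefined) {
        // Exit only once every slot has been idle for the full timeout.
        if (
          busySlotCount === 0 &&
          idleSince !== undefined &&
          Date.now() - idleSince >= options.idleTimeoutMs
        ) {
          return;
        }
        // Otherwise stay alive and poll, so a newly added job is picked up
        // even while a sibling slot is still running.
        await sleep(options.pollMs);
        continue;
      }
      busySlotCount += 1;
      idleSince = undefined;
      try {
        await options.executeJob(job);
      } finally {
        busySlotCount -= 1;
        if (busySlotCount === 0) {
          idleSince = Date.now();
        }
      }
    }
  };

  // One independent loop per concurrency slot.
  await Promise.all(
    Array.from({ length: options.concurrency }, () => runSlot()),
  );
}
```

In this sketch an idle slot keeps polling as long as any sibling is busy, which is the behavior the regression test in this PR is described as checking.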

🚥 Pre-merge checks | ✅ 3 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Title check | ⚠️ Warning | The title does not follow the required format of <type>(<scope>): <subject>. It lacks the type prefix and scope designation. | Reformat the title to match the required format, e.g., fix(build-queue): refill slots while jobs are running or similar.

✅ Passed checks (3 passed)

Check name | Status | Explanation
Description check | ✅ Passed | The description clearly relates to the changeset, detailing the refactoring of slot loops, idle slot improvements, and the new regression test.
Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/facade/build-queue.ts`:
- Around line 540-542: The Promise.all call in the slot execution block rejects
immediately on the first error, causing the finally block to execute and clear
heartbeats, release the lease, and close state while other slots are still
executing jobs. Replace Promise.all with Promise.allSettled when creating the
array of promises from the runSlot() calls to ensure all slots complete
execution before the finally block cleans up resources.
- Around line 559-570: The markBuildJobSucceeded call is currently inside the
same try-catch block as the job execution and start event recording, which means
if success-finalization fails, it gets caught and incorrectly recorded as a job
failure. Move the markBuildJobSucceeded call outside the try-catch block so the
catch block only handles errors from appendBuildJobEvent and executeJob. This
ensures success-finalization errors surface separately instead of being
converted into job failures.
- Around line 781-791: The UPDATE statement in the state.run() call attempts to
claim a job by updating its state from 'queued' to 'running', but the subsequent
call to requireBuildJobById(state, job.jobId) returns the job regardless of
whether the UPDATE actually succeeded (affected zero or one row). Capture the
result from state.run() to verify that at least one row was updated, confirming
the job was actually claimed before it's returned. If no rows were updated, the
job is no longer queued (possibly claimed by another process) and should not be
returned for execution.

In `@test/facade/build-queue.test.ts`:
- Around line 262-275: The cleanup calls to releaseFirst() and releaseSecond()
will not execute if the withTimeout assertion for secondStartedSignal fails,
leaving jobs blocked and the worker running. Wrap the code block containing the
addBuildJob call and the withTimeout assertion in a try/finally block to ensure
releaseFirst() and releaseSecond() are always called in the finally block.
Additionally, add a timeout message to the await firstStartedSignal call for
bounded failure diagnostics similar to the second signal timeout.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0a0b0c7f-36e5-4039-8b2e-b2a810481b0a

📥 Commits

Reviewing files that changed from the base of the PR and between 4a01b5f and 846ee69.

📒 Files selected for processing (2)
  • src/facade/build-queue.ts
  • test/facade/build-queue.test.ts

Comment thread src/facade/build-queue.ts
Comment on lines +540 to +542
await Promise.all(
Array.from({ length: concurrency }, async () => await runSlot()),
);

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Wait for all slots before releasing the worker lease.

Promise.all rejects on the first slot error, so the finally block can clear heartbeats, release the lease, and close state while other slots are still executing jobs. That can make an in-flight job look stale or leave sibling slots using a closed database.

Proposed fix
-    await Promise.all(
-      Array.from({ length: concurrency }, async () => await runSlot()),
-    );
+    const slotErrors: unknown[] = [];
+
+    await Promise.all(
+      Array.from({ length: concurrency }, async () => {
+        try {
+          await runSlot();
+        } catch (error) {
+          stopping = true;
+          slotErrors.push(error);
+        }
+      }),
+    );
+
+    if (slotErrors.length > 0) {
+      throw slotErrors[0];
+    }
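
The agent prompt for this finding mentions Promise.allSettled instead of the try/catch wrapper above. A minimal sketch of that variant is shown here; `runAllSlots` is a hypothetical helper name, and unlike the proposed fix it does not set a `stopping` flag, so sibling slots run until they exit on their own.

```typescript
// Waits for every slot to settle, then rethrows the first failure (if any).
// Cleanup in a surrounding finally block therefore runs only after all
// slots have stopped.
async function runAllSlots(
  concurrency: number,
  runSlot: (index: number) => Promise<void>,
): Promise<void> {
  const results = await Promise.allSettled(
    Array.from({ length: concurrency }, (_, index) => runSlot(index)),
  );

  const firstFailure = results.find(
    (result): result is PromiseRejectedResult => result.status === "rejected",
  );

  if (firstFailure !== undefined) {
    throw firstFailure.reason;
  }
}
```

Whether sibling slots should be asked to stop early after one slot fails is a separate design choice that this sketch leaves open.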

Comment thread src/facade/build-queue.ts
Comment on lines +559 to +570
try {
await appendBuildJobEvent(job, {
at: Date.now(),
jobId: job.jobId,
seq: 0,
state: "running",
type: "started",
});
await options.executeJob(job, reporter);
await markBuildJobSucceeded(job.jobId, ownerId);
} catch (error) {
await markBuildJobFailed(job.jobId, ownerId, error);

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don’t convert success-finalization failures into job failures.

Line 568 is inside the same try as executeJob, so a failure while recording success is caught and then recorded as a job failure. Keep the catch scoped to the actual job execution/start path and let success-finalization errors surface separately.

Proposed fix
   try {
     await appendBuildJobEvent(job, {
       at: Date.now(),
       jobId: job.jobId,
       seq: 0,
       state: "running",
       type: "started",
     });
     await options.executeJob(job, reporter);
-    await markBuildJobSucceeded(job.jobId, ownerId);
   } catch (error) {
     await markBuildJobFailed(job.jobId, ownerId, error);
+    return;
   }
+
+  await markBuildJobSucceeded(job.jobId, ownerId);
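
To make the difference observable, here is a small self-contained sketch of the narrowed catch scope. The `JobStore` interface and `executeClaimed` function are hypothetical stand-ins for the PR's helpers, not the actual code.

```typescript
// Hypothetical stand-in for the persistence calls used by the worker.
interface JobStore {
  markSucceeded: () => Promise<void>;
  markFailed: (error: unknown) => Promise<void>;
}

async function executeClaimed(
  execute: () => Promise<void>,
  store: JobStore,
): Promise<void> {
  try {
    await execute();
  } catch (error) {
    // Only errors thrown by the job itself are recorded as a job failure.
    await store.markFailed(error);
    return;
  }

  // Outside the try block: if recording success throws, the error
  // propagates to the caller instead of being turned into a job failure.
  await store.markSucceeded();
}
```

With this shape, a job that ran to completion is never marked as failed merely because the bookkeeping step threw.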

Comment thread src/facade/build-queue.ts
Comment on lines +781 to +791
  const now = Date.now();
  await state.run(
  `
  UPDATE build_jobs
  SET state = 'running', owner_id = ?, owner_pid = ?, updated_at = ?
  WHERE job_id = ? AND state = 'queued'
  `,
- [ownerId, process.pid, now, job.jobId],
- );
- }
-
- return await Promise.all(
- jobs.map(async (job) => await requireBuildJobById(state, job.jobId)),
+ [ownerId, process.pid, now, job.jobId],
  );
+
+ return await requireBuildJobById(state, job.jobId);

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Verify the conditional claim update before returning the job.

Line 786 allows the UPDATE to affect zero rows if the selected job is no longer queued, but Line 791 still returns that job. Re-read and verify ownerId/state before handing it to executeJob.

Proposed fix
     await state.run(
       `
 UPDATE build_jobs
 SET state = 'running', owner_id = ?, owner_pid = ?, updated_at = ?
 WHERE job_id = ? AND state = 'queued'
 `,
       [ownerId, process.pid, now, job.jobId],
     );
 
-    return await requireBuildJobById(state, job.jobId);
+    const claimedJob = await requireBuildJobById(state, job.jobId);
+
+    if (claimedJob.ownerId !== ownerId || claimedJob.state !== "running") {
+      return undefined;
+    }
+
+    return claimedJob;
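
The agent prompt for this finding suggests checking the number of affected rows rather than re-reading the job. The sketch below illustrates that idea against an in-memory map. It assumes the database layer can report how many rows an UPDATE changed; whether `state.run` exposes this is not shown on this page, and the names `claimIfQueued` and `claimQueuedJob` are hypothetical.

```typescript
type JobRow = {
  jobId: string;
  state: "queued" | "running";
  ownerId?: string;
};

// Hypothetical stand-in for the conditional UPDATE: it changes the row only
// if it is still queued and reports how many rows were changed.
function claimIfQueued(
  rows: Map<string, JobRow>,
  jobId: string,
  ownerId: string,
): { changes: number } {
  const row = rows.get(jobId);
  if (row === undefined || row.state !== "queued") {
    return { changes: 0 };
  }
  row.state = "running";
  row.ownerId = ownerId;
  return { changes: 1 };
}

// Returns the job only when this owner actually claimed it.
function claimQueuedJob(
  rows: Map<string, JobRow>,
  jobId: string,
  ownerId: string,
): JobRow | undefined {
  const { changes } = claimIfQueued(rows, jobId, ownerId);
  if (changes === 0) {
    // Another owner claimed the job first, or it is no longer queued.
    return undefined;
  }
  return rows.get(jobId);
}
```

Either check (affected-row count, or re-reading and comparing owner and state as in the proposed fix) prevents a slot from executing a job it did not claim.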

Comment thread test/facade/build-queue.test.ts
Comment on lines +262 to +275
await firstStartedSignal;
await addBuildJob({
archivePath: `${path}/book.sdpub`,
chapterId: 2,
target: "graph",
});
await withTimeout(
secondStartedSignal,
"Timed out waiting for idle queue slot to claim the second job.",
);

releaseFirst();
releaseSecond();
await worker;

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Release the blocked jobs even when the timeout assertion fails.

If withTimeout(secondStartedSignal, ...) rejects, Lines 273-275 are skipped, leaving the first job blocked and the worker heartbeat running. Wrap the assertion path in try/finally; this also gives the first-start wait a bounded failure message.

Proposed fix
-      await firstStartedSignal;
-      await addBuildJob({
-        archivePath: `${path}/book.sdpub`,
-        chapterId: 2,
-        target: "graph",
-      });
-      await withTimeout(
-        secondStartedSignal,
-        "Timed out waiting for idle queue slot to claim the second job.",
-      );
-
-      releaseFirst();
-      releaseSecond();
-      await worker;
+      try {
+        await withTimeout(
+          firstStartedSignal,
+          "Timed out waiting for the first job to start.",
+        );
+        await addBuildJob({
+          archivePath: `${path}/book.sdpub`,
+          chapterId: 2,
+          target: "graph",
+        });
+        await withTimeout(
+          secondStartedSignal,
+          "Timed out waiting for idle queue slot to claim the second job.",
+        );
+      } finally {
+        releaseFirst();
+        releaseSecond();
+        await withTimeout(
+          worker,
+          "Timed out waiting for the build worker to stop.",
+        );
+      }
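
The withTimeout helper itself is not shown on this page. One possible shape, matching the (promise, message) call sites above, is sketched here; the `timeoutMs` parameter and its default are assumptions made for illustration.

```typescript
// Rejects with `message` if `promise` has not settled within `timeoutMs`.
// The timer is cleared as soon as the promise settles, so a passing test
// does not leave a pending timer behind.
function withTimeout<T>(
  promise: Promise<T>,
  message: string,
  timeoutMs = 1000, // assumed default; not taken from the PR
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(message)), timeoutMs);
    promise.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}
```

A helper like this bounds only the wait. It does not cancel the underlying work, which is why the review asks for the release calls to live in a finally block.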

@Moskize91 Moskize91 closed this Jun 18, 2026
@Moskize91 Moskize91 deleted the fix/queue-slot-scheduler branch June 18, 2026 09:17