Unify concurrency grant + limit chain code path #326

Merged
kirinrastogi merged 15 commits into main from yo/limit-chain-refactor
Jun 1, 2026
udnay commented May 23, 2026

Contributor

Unify concurrency grant + limit chain code path

Previously, three places independently decided what to write when a
concurrency slot was granted: process_grants (background scanner),
process_ticket_request_task (future-scheduled tickets at dequeue), and
enqueue_limit_task_at_index (the canonical chain walker). The first two
short-cut directly to a single-queue RunAttempt, silently dropping any
remaining limits and leaking prior holders when multi-limit chains
deferred past the first concurrency limit.

This change persists the chain state (task_id, limit_index,
held_queues, full limits list) on deferred requests and tickets so
process_grants and the dequeue ticket path can route every grant
through the same enqueue_limit_task_at_index chain walker via a new
LimitChainResumer trait (Weak-shard impl breaks the Arc cycle).
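
As a rough illustration of the "Weak-shard impl breaks the Arc cycle" remark, the sketch below shows a resumer that holds only a `Weak` reference to its shard. The trait and struct names come from this description, but the method signature, the `Shard` type, its `resumed` field, and the `make_resumer` helper are all assumptions made for the example, not the actual code in this PR.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical, simplified stand-in for the `LimitChainResumer` trait.
trait LimitChainResumer: Send + Sync {
    /// Returns true if the chain was resumed, false if the shard is gone.
    fn resume_chain(&self, task_id: &str, limit_index: usize) -> bool;
}

/// Hypothetical shard; it only records resumed chains so the sketch is observable.
struct Shard {
    resumed: Mutex<Vec<(String, usize)>>,
}

/// Holds a Weak reference, so the resumer never keeps the shard alive.
struct ShardChainResumer {
    shard: Weak<Shard>,
}

impl LimitChainResumer for ShardChainResumer {
    fn resume_chain(&self, task_id: &str, limit_index: usize) -> bool {
        match self.shard.upgrade() {
            Some(shard) => {
                shard
                    .resumed
                    .lock()
                    .unwrap()
                    .push((task_id.to_string(), limit_index));
                true
            }
            // The shard has been dropped; there is nothing to resume into.
            None => false,
        }
    }
}

/// Hypothetical helper: build a resumer without bumping the strong count.
fn make_resumer(shard: &Arc<Shard>) -> Box<dyn LimitChainResumer> {
    Box::new(ShardChainResumer {
        shard: Arc::downgrade(shard),
    })
}
```

Because the resumer only downgrades the `Arc`, a component owned by the shard can hold the resumer without the two keeping each other alive.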

Side benefits: dequeue no longer fetches JobInfo for ticket grants
(limits ride on the ticket), the chain's task_id is preserved across
deferred grants so prior-holders stay reachable by the same id, and
follow-up tasks are written at now_ms to avoid the broker tombstone.

Chain walker: self-clean partial grants on error; extend bench + tests

enqueue_limit_task_at_index accumulated in-memory grants via
try_reserve as it walked the limits, but if any interior ? fired
(e.g. a db.get for floating-limit state, handle_enqueue on a
downstream try_reserve, put_task) the already-made reservations were
silently leaked — both call sites (process_grants' chain resumer and
handle_request_ticket) treated Err as "no chain_grants to track."

Split the function into a thin wrapper plus an inner walk_limit_chain.
The inner walks the limits and pushes successful grants into a
caller-owned Vec as it goes. The wrapper rolls those grants back via
rollback_grant if the inner returns Err. Callers now observe the
"all-or-nothing" semantics they assumed: Ok(grants) means every entry
is a live in-memory reservation with a pending batch write, Err(_)
means no leftover in-memory state.
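
The wrapper/inner split can be sketched roughly as follows. The function names mirror the ones in this commit message, but the signatures, the `Counts` type, and the "a limit with max 0 simulates an interior `?` failing" convention are assumptions made to keep the example self-contained.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory holder counts per queue.
#[derive(Default)]
struct Counts {
    held: HashMap<String, usize>,
}

impl Counts {
    /// Reserve one slot on `queue` if it is below `max`.
    fn try_reserve(&mut self, queue: &str, max: usize) -> bool {
        let n = self.held.entry(queue.to_string()).or_insert(0);
        if *n < max {
            *n += 1;
            true
        } else {
            false
        }
    }

    /// Undo one reservation on `queue`.
    fn rollback_grant(&mut self, queue: &str) {
        if let Some(n) = self.held.get_mut(queue) {
            *n = n.saturating_sub(1);
        }
    }
}

/// Inner walk: pushes each successful grant into the caller-owned Vec.
/// Assumption: a limit with max == 0 stands in for an interior `?` firing.
fn walk_limit_chain(
    counts: &mut Counts,
    limits: &[(&str, usize)],
    grants: &mut Vec<String>,
) -> Result<(), String> {
    for (queue, max) in limits {
        if *max == 0 {
            return Err(format!("simulated failure at {}", queue));
        }
        if counts.try_reserve(queue, *max) {
            grants.push(queue.to_string());
        } else {
            break; // this limit would queue; stop walking
        }
    }
    Ok(())
}

/// Thin wrapper: on Err, roll back every grant the inner walk made.
fn enqueue_limit_task_at_index(
    counts: &mut Counts,
    limits: &[(&str, usize)],
) -> Result<Vec<String>, String> {
    let mut grants = Vec::new();
    match walk_limit_chain(counts, limits, &mut grants) {
        Ok(()) => Ok(grants),
        Err(e) => {
            for queue in &grants {
                counts.rollback_grant(queue);
            }
            Err(e)
        }
    }
}
```

Keeping the grants Vec owned by the wrapper is what lets it see the partial progress of a failed walk; a returned `Result<Vec<_>, _>` alone would lose it.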

Also:

  • silo-bench: enqueue jobs with 1 Concurrency + 1-2 FloatingConcurrency
    limits, drive ReportRefreshOutcome with random new max_concurrency
    values, and audit holders across all shards after a 120s drain phase.
  • New test deferred_concurrency_resumes_into_rate_limit pins the
    multi-limit chain resume case: Conc-A grants immediately at enqueue,
    Conc-B defers via grant scanner, and the scanner's chain MUST emit a
    CheckRateLimit (not a RunAttempt) carrying held_queues = [A, B].
  • Added a diagnostic tracing::warn! in try_reserve_internal that
    fires if a grant ever lands past the limit — silent in current
    testing, kept as a tripwire.

Chain resumer: write at task_key(now_ms) to dodge broker tombstones

ShardChainResumer::resume_chain emitted the resumed RunAttempt at
task_key(params.start_at_ms, ...) — the same key the original enqueue
had written (then deleted in-batch) when the job first queued. If the
broker scan momentarily observed that interim RunAttempt before the
delete won in the LSM, a worker could lease and ack-delete it,
installing a tombstone for the key. The resumer's later write at the
same key was then silently suppressed, stranding the holders it had
granted. Mirrors the precedent in handle_request_ticket (dequeue.rs)
and the project_broker_tombstone_chain_continuation memory.

Also extends the tasks SQL view with task_id and held_queues so future
leak audits can directly compare leaked-holder task_ids to the
RunAttempt tasks in the DB.

Regression test pins the invariant: after process_grants resumes a
queued chain, the terminal RunAttempt's task_key.start_time_ms must be >=
now-at-release, strictly greater than the original enqueue's
start_at_ms. The test fails on the prior code (start_time_ms equals
the original enqueue time) and passes with the fix.

cargo fmt

Split LimitTaskParams: scheduled_at_ms + task_key_start_ms

The prior commit overloaded one field (start_at_ms) to mean both "the
job's scheduled start time" and "the time component baked into every
task_key this walk writes." Working but confusing. Splitting clarifies
the contract at the type level:

  • scheduled_at_ms — drives the > now_ms future-scheduling decision
    and is persisted on EnqueueTask records so process_grants can skip
    future-dated requests on resume.
  • task_key_start_ms — the time component used in every task_key /
    concurrency_request_key the walk writes.

Call sites:

  • Original enqueue / import / reimport / retry: both fields equal
    (a fresh attempt_number guarantees no tombstone collision).
  • ShardChainResumer::resume_chain: scheduled_at_ms = the original
    start_at_ms (honest); task_key_start_ms = now_ms (dodges the broker
    tombstone for the same reason as before).
  • handle_request_ticket / handle_check_rate_limit chain continuations:
    both = now_ms (these semantics were already in place).

handle_enqueue's signature gains a parameter so that it also takes both
fields. Internally it uses scheduled_at_ms for the future-scheduling
check, while every key write uses task_key_start_ms.

No behavioural change vs the prior commit; this is purely a naming /
type-level refactor to make the contract explicit.
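
A minimal sketch of the split as this commit describes it. Only the two field names are taken from the text; the trimmed-down struct, the `i64` millisecond type, and the constructor names (`fresh`, `resumed`, `continuation`) are hypothetical.

```rust
/// Hypothetical, trimmed-down version of `LimitTaskParams`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct LimitTaskParams {
    /// Drives the `> now_ms` future-scheduling decision.
    scheduled_at_ms: i64,
    /// Time component used in every key the walk writes.
    task_key_start_ms: i64,
}

impl LimitTaskParams {
    /// Original enqueue / import / reimport / retry: both fields equal.
    fn fresh(start_at_ms: i64) -> Self {
        Self {
            scheduled_at_ms: start_at_ms,
            task_key_start_ms: start_at_ms,
        }
    }

    /// Chain resume: keep the original schedule, write keys at now_ms.
    fn resumed(original_start_at_ms: i64, now_ms: i64) -> Self {
        Self {
            scheduled_at_ms: original_start_at_ms,
            task_key_start_ms: now_ms,
        }
    }

    /// Ticket / rate-limit continuation: both fields are now_ms.
    fn continuation(now_ms: i64) -> Self {
        Self::fresh(now_ms)
    }

    /// True when the job is scheduled strictly after `now_ms`.
    fn is_future(&self, now_ms: i64) -> bool {
        self.scheduled_at_ms > now_ms
    }
}
```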

Stop writing interim RunAttempts in append_grant_edits

append_grant_edits wrote both the holder and a RunAttempt(held=[just this queue]) at task_key on every immediate grant — a leftover from
when handle_enqueue was a standalone "grant a slot and create the
task" combo, predating the chain walker. With the walker in place that
interim is purely redundant: it gets overwritten by the terminal
RunAttempt write at the end of the walk, deleted by the Queued-branch
cleanup when a later limit queues, or overwritten by put_task when a
RateLimit step writes a CheckRateLimit at the same task_key.

Removing it closes a real correctness gap. If the slatedb scan ever
observed an in-flight intermediate state of the chain's WriteBatch, the
broker buffer could capture an interim RunAttempt whose held_queues
contains only one queue — and a worker leasing it would
under-release on completion, leaking the other queues' holders. After
this change the broker can only ever see the terminal RunAttempt with
the full held_queues, regardless of slatedb's batch-vs-scan
visibility semantics.

Also drops the conditional writer.delete(task_key(...)) from
walk_limit_chain's Queued branches — no interim exists to clean up.

No behavioural change for any caller; tests still green (209 across
holder_leak/floating/concurrency/dequeue/enqueue/retry/import/
rate_limit/etc.).

Fix held_queues leaks in cancel/terminal paths; rollback grants on handler error

  • handle_request_ticket: release stored_held_queues when dropping the
    ticket due to terminal/missing/unreadable job status. Without this,
    any concurrency holders accumulated by earlier chain steps leak.
  • cancel.rs: cancelling a FutureRequestTaskWritten ticket now deletes
    its persisted held_queues and tracks them for post-commit
    atomic_release; previously the holders were silently orphaned.
  • dequeue.rs: per-iteration handler Err now rolls back
    state.grants_to_rollback so in-memory try_reserve reservations don't
    leak when a handler bails after granting.
  • Cover the cancel case with a regression test; tighten the existing
    two-concurrency-limits chain test (use A capacity=3 so the
    resumable-chain state is guaranteed) and assert it instead of just
    observing it.
  • Add a debug_assert on the future-RequestTicket branch enforcing
    task_key_start_ms == scheduled_at_ms, plus a comment explaining why
    breaking out of the grant-scan loop on the first future request is
    safe.

Close remaining held-queue leaks in cancel + dequeue paths

  • cancel.rs: new Task::CheckRateLimit arm releases held_queues when
    cancelling a job parked on a rate-limit step.
  • cancel.rs: delete_concurrency_requests_with_prefix decodes each
    request value, deletes the holders its persisted held_queues point
    at, and surfaces them to the caller so the existing post-commit
    atomic_release path covers immediate-deferred requests too.
  • Refactor cancel_job_inner to track holders as a (task_id, queue)
    vec rather than a single (Option<task_id>, Vec) so the
    RunAttempt / RequestTicket / CheckRateLimit / TicketRequested paths
    can all accumulate uniformly.
  • dequeue.rs: handle_check_rate_limit gets the same
    Succeeded/Failed/Cancelled short-circuit as handle_request_ticket.
    Holders are released; Gubernator is not called for doomed jobs.
  • concurrency.rs grant scanner: treat an empty decoded limits on a
    request as corrupt (warn + delete + skip) instead of silently
    granting via the limit=1 fallback. The chain resumer with
    limits=[] would otherwise fabricate a terminal RunAttempt that
    bypasses every gate.
  • limit_chain.rs: ShardChainResumer::resume_chain takes now_ms from
    ResumeChainParams instead of re-capturing it. The scanner and
    resumer can now never disagree about the timestamp baked into the
    same batch.
  • concurrency.rs: switch chain_resumer's storage from OnceLock to
    Mutex<Option<…>> so tests can take_chain_resumer_for_test() to
    exercise the scanner's release-and-bail branch.
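
The `(task_id, queue)` accumulator from the cancel_job_inner bullet above can be pictured along these lines. The `Task` variants are named in this PR, but their fields here are heavily simplified and `collect_holders` is a hypothetical helper, so treat this as a sketch of the shape rather than the real cancel path.

```rust
/// Hypothetical, trimmed-down task shapes; the real variants carry more fields.
enum Task {
    RunAttempt { task_id: String, held_queues: Vec<String> },
    RequestTicket { task_id: String, held_queues: Vec<String> },
    CheckRateLimit { task_id: String, held_queues: Vec<String> },
}

/// Flatten every task's held queues into one (task_id, queue) accumulator,
/// so a single post-commit release loop can cover all task kinds.
fn collect_holders(tasks: &[Task]) -> Vec<(String, String)> {
    let mut released = Vec::new();
    for task in tasks {
        match task {
            Task::RunAttempt { task_id, held_queues }
            | Task::RequestTicket { task_id, held_queues }
            | Task::CheckRateLimit { task_id, held_queues } => {
                for queue in held_queues {
                    released.push((task_id.clone(), queue.clone()));
                }
            }
        }
    }
    released
}
```

Pairing each queue with its own task_id means entries from different tasks can share one Vec, which a single `(Option<task_id>, Vec)` pair cannot represent.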

Regression tests:

  • cancel_check_rate_limit_releases_held_queues
  • cancel_deferred_concurrency_request_releases_held_queues
  • grant_scanner_releases_reservations_when_chain_resumer_missing

Tightened periodic_reconcile_grants_pending_request_without_signal to
inject a well-formed (limits-populated) orphan request; the previous
empty-limits shape would now be rejected as corrupt.

Rate-limit retry preserves chain task_id; harden chain-continuation invariants

Critical fix:

  • schedule_rate_limit_retry now takes a task_id: &str and reuses it
    on the retry CheckRateLimit. Previously it minted a fresh UUID,
    which broke the "chain's task_id is carried forward so prior
    holders remain reachable by the same id" invariant: every
    rate-limit retry leaked one concurrency slot. After enough retries
    the gating queue would stall because in-memory holders == limit.
    Both dequeue.rs call sites pass check_task_id through.
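
To see why a fresh UUID leaks a slot, here is a toy model in which holders are keyed by `(queue, task_id)` and completion releases under whatever id the final task carries. The `Holders` type and `leftover_after_retry` are hypothetical and only model the invariant, not the real storage.

```rust
use std::collections::HashSet;

/// Hypothetical holder set keyed by (queue, task_id).
#[derive(Default)]
struct Holders {
    set: HashSet<(String, String)>,
}

impl Holders {
    fn grant(&mut self, queue: &str, task_id: &str) {
        self.set.insert((queue.to_string(), task_id.to_string()));
    }

    /// Returns true only if a holder with exactly this key existed.
    fn release(&mut self, queue: &str, task_id: &str) -> bool {
        self.set.remove(&(queue.to_string(), task_id.to_string()))
    }

    fn count(&self, queue: &str) -> usize {
        self.set.iter().filter(|(q, _)| q == queue).count()
    }
}

/// Grant A under the chain id, go through one rate-limit retry, then release
/// under the id the retried task carries. Returns the holders left on A.
fn leftover_after_retry(reuse_chain_task_id: bool) -> usize {
    let mut holders = Holders::default();
    let chain_task_id = "chain-1";
    holders.grant("A", chain_task_id);

    // Pre-fix behaviour is modelled by minting a different id on retry.
    let retry_task_id = if reuse_chain_task_id {
        chain_task_id
    } else {
        "fresh-uuid"
    };

    // Completion releases by the id on the final task.
    holders.release("A", retry_task_id);
    holders.count("A")
}
```

With `reuse_chain_task_id = false` the release misses the original key and one holder stays behind, matching the one-slot-per-retry leak described above.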

Defensive fixes around the broker-tombstone dodge:

  • handle_request_ticket, handle_check_rate_limit, and
    ShardChainResumer::resume_chain now compute
    task_key_start_ms = now_ms.max(parent_start_time_ms + 1) (parent
    for the resumer is the chain's start_at_ms). Previously they all
    used plain now_ms, which collides with the just-tombstoned
    parent's task_key when a worker processes a future-scheduled
    ticket exactly at its scheduled millisecond. Costs a single max.

  • walk_limit_chain consumes skip_try_reserve after the first
    handle_enqueue call (std::mem::replace). Today the only call site
    passing true exits the loop on the first iteration, so this is
    defensive — but a future change letting the walker step past the
    first limit with the flag still set would silently bypass every
    downstream reservation.
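
The `skip_try_reserve` consumption can be illustrated with `std::mem::replace`, which returns the flag's current value and leaves `false` behind. The `walk` function below is a hypothetical stand-in that only records, per step, whether the reservation would have been skipped.

```rust
/// Hypothetical stand-in for the walker loop: returns, for each limit step,
/// whether try_reserve would be skipped on that step.
fn walk(limit_count: usize, mut skip_try_reserve: bool) -> Vec<bool> {
    let mut skipped = Vec::with_capacity(limit_count);
    for _ in 0..limit_count {
        // Take the current flag and reset it, so it applies to one step only.
        let skip_now = std::mem::replace(&mut skip_try_reserve, false);
        skipped.push(skip_now);
    }
    skipped
}
```

Calling `walk(3, true)` yields `[true, false, false]`: only the first step honours the flag, and every later limit goes through a normal reservation.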

Regression test:

  • rate_limit_retry_preserves_chain_task_id wires up a stub
    RateLimitClient that fails once then passes; enqueues
    [Conc-A, RateLimit]; drives the chain through retry, completion,
    and holder release; asserts A's holder count is zero both on disk
    and in memory. Verified failing on pre-fix code (left: 1).

Close reimport held_queues leaks; harden rate-limit retry tombstone dodge

Reimport leaks:

  • import.rs Task::RequestTicket arm now folds in with RunAttempt and
    CheckRateLimit: a future RequestTicket can carry upstream chain
    grants on its held_queues, so deleting the task isn't enough — the
    prior holders must be released too. Without this fix every reimport
    of a future-scheduled multi-concurrency job orphaned its prior A
    slot. Mirrors the cancel.rs arm.
  • import.rs delete_concurrency_requests_for_job call now binds the
    returned Vec<(task_id, queue)> and extends it into
    released_holders. Previously the return was discarded, leaking
    every held_queues entry on a TicketRequested chain at reimport time.
  • Unified released_holders to Vec<(task_id, queue)> (same shape
    as cancel.rs) so both task-arm and request-scan paths feed through
    one accumulator and one post-commit atomic_release loop.

Rate-limit retry tombstone:

  • handle_check_rate_limit's parent task_key is ack-deleted, which
    installs a broker tombstone on (task_group, parent_start_time_ms, priority, job_id, attempt_number). The retry shares every
    component except start_time_ms, so a same-millisecond collision
    (zero backoff, or reset_time_ms == parent.start_time_ms) would
    silently suppress the retry and strand every held_queues entry.
    schedule_rate_limit_retry now takes parent_start_time_ms and
    computes task_key_start_ms = retry_at_ms.max(parent + 1) via a
    free helper rate_limit_retry_task_key_start_ms. Both dequeue.rs
    retry call sites now pass the parsed parent start time.
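
The dodge formula is small enough to write out. The helper name and the `retry_at_ms.max(parent + 1)` expression come from the text above; the parameter order and the `i64` millisecond type are assumptions.

```rust
/// Sketch of the free helper named above. Signature is an assumption;
/// the formula is the one quoted in the commit message.
fn rate_limit_retry_task_key_start_ms(retry_at_ms: i64, parent_start_time_ms: i64) -> i64 {
    // Never land on (or before) the parent's millisecond, which carries
    // the broker tombstone after the parent is ack-deleted.
    retry_at_ms.max(parent_start_time_ms + 1)
}
```

The four cases the unit tests are said to cover work out as: retry equal to parent (100, 100) gives 101; retry before parent (90, 100) gives 101; retry strictly after (250, 100) gives 250; retry exactly at parent + 1 (101, 100) gives 101.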

Regression tests:

  • reimport_releases_held_queues_on_future_request_ticket and
    reimport_releases_held_queues_on_deferred_request — confirmed
    failing on pre-fix code.
  • rate_limit_retry_dodges_parent_tombstone_with_zero_backoff — an
    integration test that asserts the persisted retry task_key
    start_time_ms strictly exceeds the parent's.
  • Four unit tests in rate_limit.rs deterministically cover the
    dodge invariant (equal, before, strictly after, exactly +1).

Defer concurrency grant for future-scheduled jobs

handle_enqueue called try_reserve unconditionally, so a future-scheduled
job whose queue had free capacity would grab a holder immediately and
hold it until the RunAttempt fired at scheduled_at_ms. A burst of
future-scheduled work into a quiet queue starved present-time jobs of
every slot.

Roll back the spurious reservation when scheduled_at_ms > now_ms and
fall through to the existing FutureRequestTaskWritten branch, which
writes Task::RequestTicket at scheduled_at_ms; handle_request_ticket
calls try_reserve again at dequeue time. New regression test
future_scheduled_jobs_do_not_starve_immediate_work pins the invariant.
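
A simplified model of the deferral, assuming a single queue with a plain counter. `FutureRequestTaskWritten` is named in this commit; the other outcome variants, the `at_ms` field, and this `handle_enqueue` signature are hypothetical.

```rust
#[derive(Debug, PartialEq)]
enum EnqueueOutcome {
    GrantedImmediately,
    Queued,
    /// A RequestTicket is written at the scheduled time; no holder is kept.
    FutureRequestTaskWritten { at_ms: i64 },
}

/// Hypothetical single-queue model of the enqueue decision.
fn handle_enqueue(
    held: &mut usize,
    max: usize,
    scheduled_at_ms: i64,
    now_ms: i64,
) -> EnqueueOutcome {
    // The reservation is still attempted first, as in the description.
    let reserved = *held < max;
    if reserved {
        *held += 1;
    }

    if scheduled_at_ms > now_ms {
        // Roll back the spurious reservation; the ticket handler will
        // try again when the ticket is dequeued.
        if reserved {
            *held -= 1;
        }
        return EnqueueOutcome::FutureRequestTaskWritten { at_ms: scheduled_at_ms };
    }

    if reserved {
        EnqueueOutcome::GrantedImmediately
    } else {
        EnqueueOutcome::Queued
    }
}
```

In this model a future-scheduled enqueue into an empty queue leaves `held` at 0, so a present-time job arriving next still gets the slot.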

Three pre-existing tests built their setup on the bug (chain accumulating
held_queues on a future Task::RequestTicket / future-scheduled holder
extraction). Adjusted them to present-time enqueues; removed two that
became redundant with their present-time analogs.

Fix webui tenant-detail test for future-grant deferral

test_tenant_detail_shows_queues_and_waiting_jobs enqueued two
future-scheduled concurrency-limited jobs and relied on them grabbing
holders to make detail-queue appear in queue_counts. Post-fix,
future-scheduled jobs don't take holders at enqueue time, so the queue
was absent from the count rollup.

Switch the two jobs to present-time enqueues: job 1 grants the only
slot (holder=1), job 2 defers (requester=1). The queue still shows up
in queue_counts via either source.


Note

High Risk
Touches core concurrency, grant scanning, dequeue, and durable schema fields; incorrect chain resume or holder release can stall queues or leak capacity until restart.

Overview
This PR unifies multi-limit job processing so deferred concurrency grants no longer short-circuit to a single-queue RunAttempt. FlatBuffers and runtime types now carry task_id, limit_index, held_queues, and the full limits list on RequestTicket, EnqueueTask, and related paths. The grant scanner and dequeue RequestTicket handler resume via LimitChainResumer → enqueue_limit_task_at_index (one walker). request_id is replaced by a stable chain task_id so holders stay keyed consistently.

Concurrency behavior: immediate grants write holders only (no interim RunAttempt); the terminal RunAttempt carries the full held_queues. Future-scheduled jobs no longer try_reserve at enqueue—they defer until dequeue. scheduled_at_ms vs task_key_start_ms separate scheduling from task keys to avoid broker tombstone collisions on chain resume, rate-limit retries, and ticket processing.

Leak / cleanup hardening: cancel, reimport, terminal dequeue paths release held_queues on tickets, rate-limit steps, and decoded deferred requests; handler errors roll back in-memory grants; rate-limit retries reuse chain task_id (no new UUID). Dequeue adds an RAII guard so cancelled RPCs don’t strand broker inflight tasks.

Tooling: silo-bench exercises floating limits + refresh outcomes, drain phase, and cross-shard holder audit; tasks SQL view adds task_id / held_queues.

Reviewed by Cursor Bugbot for commit f4244b9. Bugbot is set up for automated code reviews on this repo.

udnay commented May 23, 2026

Contributor Author

cursor (bot) left a comment

Cursor Bugbot has reviewed your changes and found 3 potential issues.

Reviewed by Cursor Bugbot for commit f50746f.

Comment thread src/concurrency.rs
Comment thread src/job_store_shard/dequeue.rs
Comment thread src/job_store_shard/dequeue.rs
kirinrastogi and others added 14 commits May 30, 2026 11:31
udnay force-pushed the yo/limit-chain-refactor branch from 2a695c2 to f4244b9 on May 30, 2026 19:49
Comment thread src/job_store_shard/mod.rs Outdated
kirinrastogi merged commit a1cd9a4 into main Jun 1, 2026
16 checks passed
kirinrastogi deleted the yo/limit-chain-refactor branch June 1, 2026 15:44