Skip to content

Adding lease_lost field to heartbeat responses to let a worker know if it should stop processing its work.#327

Open
udnay wants to merge 15 commits into
mainfrom
yo/add-lease-lost-heartbeat-response
Open

Adding lease_lost field to heartbeat responses to let a worker know if it should stop processing its work.#327
udnay wants to merge 15 commits into
mainfrom
yo/add-lease-lost-heartbeat-response

Conversation

@udnay

@udnay udnay commented May 28, 2026

Copy link
Copy Markdown
Contributor

Adding lease_lost field to heartbeat responses to let a worker know if it should stop processing its work.

This is distinct from a cancelled task, this means that the worker crashed or had a couple of epic event loop delays that caused silo to think that the worker is no longer responding.

Add lease lost to refresh tasks


Note

Medium Risk
Changes core lease/heartbeat and worker completion paths; proto field is backward compatible but clients must be updated to avoid stale workers reporting after reaps.

Overview
Introduces lease_lost on HeartbeatResponse so workers learn they no longer hold a lease without a failed gRPC heartbeat. Missing leases and worker ID mismatches now return a successful heartbeat with lease_lost: true instead of LeaseNotFound / LeaseOwnerMismatch errors; real infra failures still error for retries.

On the server, expired lease reaping records attempts with error code LEASE_LOST (replacing WORKER_CRASHED) and clearer abort messaging. The TypeScript worker aborts run and refresh tasks when leaseLost is set—via markLostLease, LeaseLostError, and onError—and does not call reportOutcome / reportRefreshOutcome, since the reaper may have already finalized the attempt. Cancellation reporting is unchanged except lease-loss no longer counts as “should report cancelled.”

Tests and turmoil scenarios are updated for the new code and heartbeat behavior.

Reviewed by Cursor Bugbot for commit 5051dbd. Bugbot is set up for automated code reviews on this repo. Configure here.

kirinrastogi and others added 15 commits May 23, 2026 14:05
Previously, three places independently decided what to write when a
concurrency slot was granted: `process_grants` (background scanner),
`process_ticket_request_task` (future-scheduled tickets at dequeue), and
`enqueue_limit_task_at_index` (the canonical chain walker). The first two
short-cut directly to a single-queue `RunAttempt`, silently dropping any
remaining limits and leaking prior holders when multi-limit chains
deferred past the first concurrency limit.

This change persists the chain state (`task_id`, `limit_index`,
`held_queues`, full `limits` list) on deferred requests and tickets so
`process_grants` and the dequeue ticket path can route every grant
through the same `enqueue_limit_task_at_index` chain walker via a new
`LimitChainResumer` trait (Weak-shard impl breaks the Arc cycle).

Side benefits: dequeue no longer fetches `JobInfo` for ticket grants
(limits ride on the ticket), the chain's `task_id` is preserved across
deferred grants so prior-holders stay reachable by the same id, and
follow-up tasks are written at `now_ms` to avoid the broker tombstone.
`enqueue_limit_task_at_index` accumulated in-memory grants via
`try_reserve` as it walked the limits, but if any interior `?` fired
(e.g. a `db.get` for floating-limit state, `handle_enqueue` on a
downstream try_reserve, `put_task`) the already-made reservations were
silently leaked — both call sites (process_grants' chain resumer and
handle_request_ticket) treated Err as "no chain_grants to track."

Split the function into a thin wrapper plus an inner `walk_limit_chain`.
The inner walks the limits and pushes successful grants into a
caller-owned `Vec` as it goes. The wrapper rolls those grants back via
`rollback_grant` if the inner returns Err. Callers now observe the
"all-or-nothing" semantics they assumed: `Ok(grants)` means every entry
is a live in-memory reservation with a pending batch write, `Err(_)`
means no leftover in-memory state.

Also:
- silo-bench: enqueue jobs with 1 Concurrency + 1-2 FloatingConcurrency
  limits, drive `ReportRefreshOutcome` with random new max_concurrency
  values, and audit holders across all shards after a 120s drain phase.
- New test `deferred_concurrency_resumes_into_rate_limit` pins the
  multi-limit chain resume case: Conc-A grants immediately at enqueue,
  Conc-B defers via grant scanner, and the scanner's chain MUST emit a
  CheckRateLimit (not a RunAttempt) carrying `held_queues = [A, B]`.
- Added a diagnostic `tracing::warn!` in `try_reserve_internal` that
  fires if a grant ever lands past the limit — silent in current
  testing, kept as a tripwire.
ShardChainResumer::resume_chain emitted the resumed RunAttempt at
task_key(params.start_at_ms, ...) — the same key the original enqueue
had written (then deleted in-batch) when the job first queued. If the
broker scan momentarily observed that interim RunAttempt before the
delete won in the LSM, a worker could lease and ack-delete it,
installing a tombstone for the key. The resumer's later write at the
same key was then silently suppressed, stranding the holders it had
granted. Mirrors the precedent in handle_request_ticket (dequeue.rs)
and the project_broker_tombstone_chain_continuation memory.

Also extends the tasks SQL view with task_id and held_queues so future
leak audits can directly compare leaked-holder task_ids to the
RunAttempt tasks in the DB.

Regression test pins the invariant: after process_grants resumes a
queued chain, the terminal RunAttempt's task_key.start_time_ms must be
>= now-at-release, strictly greater than the original enqueue's
start_at_ms. The test fails on the prior code (start_time_ms equals
the original enqueue time) and passes with the fix.
The prior commit overloaded one field (start_at_ms) to mean both "the
job's scheduled start time" and "the time component baked into every
task_key this walk writes." Working but confusing. Splitting clarifies
the contract at the type level:

- scheduled_at_ms — drives the `> now_ms` future-scheduling decision
  and is persisted on EnqueueTask records so process_grants can skip
  future-dated requests on resume.
- task_key_start_ms — the time component used in every task_key /
  concurrency_request_key the walk writes.

Call sites:
- Original enqueue / import / reimport / retry: both fields equal
  (a fresh attempt_number guarantees no tombstone collision).
- ShardChainResumer::resume_chain: scheduled_at_ms = the original
  start_at_ms (honest); task_key_start_ms = now_ms (dodges the broker
  tombstone for the same reason as before).
- handle_request_ticket / handle_check_rate_limit chain continuations:
  both = now_ms (already had this semantics).

handle_enqueue's signature gains a parameter — also takes both — and
internally uses scheduled_at_ms for the future-scheduling check while
every key write uses task_key_start_ms.

No behavioural change vs the prior commit; this is purely a naming /
type-level refactor to make the contract explicit.
append_grant_edits wrote both the holder *and* a `RunAttempt(held=[just
this queue])` at task_key on every immediate grant — a leftover from
when handle_enqueue was a standalone "grant a slot and create the
task" combo, predating the chain walker. With the walker in place that
interim is purely redundant: it gets overwritten by the terminal
RunAttempt write at the end of the walk, deleted by the Queued-branch
cleanup when a later limit queues, or overwritten by put_task when a
RateLimit step writes a CheckRateLimit at the same task_key.

Removing it closes a real correctness gap. If the slatedb scan ever
observed an in-flight intermediate state of the chain's WriteBatch, the
broker buffer could capture an interim RunAttempt whose `held_queues`
contains only one queue — and a worker leasing it would
under-release on completion, leaking the other queues' holders. After
this change the broker can only ever see the terminal RunAttempt with
the full `held_queues`, regardless of slatedb's batch-vs-scan
visibility semantics.

Also drops the conditional `writer.delete(task_key(...))` from
walk_limit_chain's Queued branches — no interim exists to clean up.

No behavioural change for any caller; tests still green (209 across
holder_leak/floating/concurrency/dequeue/enqueue/retry/import/
rate_limit/etc.).
…ndler error

- handle_request_ticket: release stored_held_queues when dropping the
  ticket due to terminal/missing/unreadable job status. Without this,
  any concurrency holders earlier chain steps accumulated leak.
- cancel.rs: cancelling a FutureRequestTaskWritten ticket now deletes
  its persisted held_queues and tracks them for post-commit
  atomic_release; previously the holders were silently orphaned.
- dequeue.rs: per-iteration handler Err now rolls back
  state.grants_to_rollback so in-memory try_reserve reservations don't
  leak when a handler bails after granting.
- Cover the cancel case with a regression test; tighten the existing
  two-concurrency-limits chain test (use A capacity=3 so the
  resumable-chain state is guaranteed) and assert it instead of just
  observing it.
- Add a debug_assert on the future-RequestTicket branch enforcing
  task_key_start_ms == scheduled_at_ms, plus a comment explaining why
  breaking out of the grant-scan loop on the first future request is
  safe.
- cancel.rs: new Task::CheckRateLimit arm releases held_queues when
  cancelling a job parked on a rate-limit step.
- cancel.rs: delete_concurrency_requests_with_prefix decodes each
  request value, deletes the holders its persisted held_queues point
  at, and surfaces them to the caller so the existing post-commit
  atomic_release path covers immediate-deferred requests too.
- Refactor cancel_job_inner to track holders as a (task_id, queue)
  vec rather than a single (Option<task_id>, Vec<queue>) so the
  RunAttempt / RequestTicket / CheckRateLimit / TicketRequested paths
  can all accumulate uniformly.
- dequeue.rs: handle_check_rate_limit gets the same
  Succeeded/Failed/Cancelled short-circuit as handle_request_ticket.
  Holders are released; Gubernator is not called for doomed jobs.
- concurrency.rs grant scanner: treat an empty decoded `limits` on a
  request as corrupt (warn + delete + skip) instead of silently
  granting via the limit=1 fallback. The chain resumer with
  `limits=[]` would otherwise fabricate a terminal RunAttempt that
  bypasses every gate.
- limit_chain.rs: ShardChainResumer::resume_chain takes now_ms from
  ResumeChainParams instead of re-capturing it. The scanner and
  resumer can now never disagree about the timestamp baked into the
  same batch.
- concurrency.rs: switch chain_resumer's storage from OnceLock to
  Mutex<Option<…>> so tests can take_chain_resumer_for_test() to
  exercise the scanner's release-and-bail branch.

Regression tests:
- cancel_check_rate_limit_releases_held_queues
- cancel_deferred_concurrency_request_releases_held_queues
- grant_scanner_releases_reservations_when_chain_resumer_missing

Tightened periodic_reconcile_grants_pending_request_without_signal to
inject a well-formed (limits-populated) orphan request; the previous
empty-limits shape would now be rejected as corrupt.
…nvariants

Critical fix:
- schedule_rate_limit_retry now takes a `task_id: &str` and reuses it
  on the retry CheckRateLimit. Previously it minted a fresh UUID,
  which broke the "chain's task_id is carried forward so prior
  holders remain reachable by the same id" invariant: every
  rate-limit retry leaked one concurrency slot. After enough retries
  the gating queue would stall because in-memory holders == limit.
  Both dequeue.rs call sites pass `check_task_id` through.

Defensive fixes around the broker-tombstone dodge:
- handle_request_ticket, handle_check_rate_limit, and
  ShardChainResumer::resume_chain now compute
  task_key_start_ms = now_ms.max(parent_start_time_ms + 1) (parent
  for the resumer is the chain's `start_at_ms`). Previously they all
  used plain `now_ms`, which collides with the just-tombstoned
  parent's task_key when a worker processes a future-scheduled
  ticket exactly at its scheduled millisecond. Costs a single max.

- walk_limit_chain consumes `skip_try_reserve` after the first
  handle_enqueue call (std::mem::replace). Today the only call site
  passing `true` exits the loop on the first iteration, so this is
  defensive — but a future change letting the walker step past the
  first limit with the flag still set would silently bypass every
  downstream reservation.

Regression test:
- rate_limit_retry_preserves_chain_task_id wires up a stub
  RateLimitClient that fails once then passes; enqueues
  `[Conc-A, RateLimit]`; drives the chain through retry, completion,
  and holder release; asserts A's holder count is zero both on disk
  and in memory. Verified failing on pre-fix code (left: 1).
…odge

Reimport leaks:
- import.rs `Task::RequestTicket` arm now folds in with RunAttempt and
  CheckRateLimit: a future RequestTicket can carry upstream chain
  grants on its held_queues, so deleting the task isn't enough — the
  prior holders must be released too. Without this fix every reimport
  of a future-scheduled multi-concurrency job orphaned its prior A
  slot. Mirrors the cancel.rs arm.
- import.rs `delete_concurrency_requests_for_job` call now binds the
  returned `Vec<(task_id, queue)>` and extends it into
  `released_holders`. Previously the return was discarded, leaking
  every held_queues entry on a TicketRequested chain at reimport time.
- Unified `released_holders` to `Vec<(task_id, queue)>` (same shape
  as cancel.rs) so both task-arm and request-scan paths feed through
  one accumulator and one post-commit `atomic_release` loop.

Rate-limit retry tombstone:
- `handle_check_rate_limit`'s parent task_key is ack-deleted, which
  installs a broker tombstone on `(task_group, parent_start_time_ms,
  priority, job_id, attempt_number)`. The retry shares every
  component except start_time_ms, so a same-millisecond collision
  (zero backoff, or `reset_time_ms == parent.start_time_ms`) would
  silently suppress the retry and strand every held_queues entry.
  `schedule_rate_limit_retry` now takes `parent_start_time_ms` and
  computes `task_key_start_ms = retry_at_ms.max(parent + 1)` via a
  free helper `rate_limit_retry_task_key_start_ms`. Both dequeue.rs
  retry call sites now pass the parsed parent start time.

Regression tests:
- `reimport_releases_held_queues_on_future_request_ticket` and
  `reimport_releases_held_queues_on_deferred_request` — confirmed
  failing on pre-fix code.
- `rate_limit_retry_dodges_parent_tombstone_with_zero_backoff` — an
  integration test that asserts the persisted retry task_key
  start_time_ms strictly exceeds the parent's.
- Four unit tests in `rate_limit.rs` deterministically cover the
  dodge invariant (equal, before, strictly after, exactly +1).
handle_enqueue called try_reserve unconditionally, so a future-scheduled
job whose queue had free capacity would grab a holder immediately and
hold it until the RunAttempt fired at scheduled_at_ms. A burst of
future-scheduled work into a quiet queue starved present-time jobs of
every slot.

Rollback the spurious reservation when scheduled_at_ms > now_ms and
fall through to the existing FutureRequestTaskWritten branch, which
writes Task::RequestTicket at scheduled_at_ms; handle_request_ticket
calls try_reserve again at dequeue time. New regression test
future_scheduled_jobs_do_not_starve_immediate_work pins the invariant.

Three pre-existing tests built their setup on the bug (chain accumulating
held_queues on a future Task::RequestTicket / future-scheduled holder
extraction). Adjusted them to present-time enqueues; removed two that
became redundant with their present-time analogs.
test_tenant_detail_shows_queues_and_waiting_jobs enqueued two
future-scheduled concurrency-limited jobs and relied on them grabbing
holders to make detail-queue appear in queue_counts. Post-fix
future-scheduled jobs don't take holders at enqueue time, so the queue
was absent from the count rollup.

Switch the two jobs to present-time enqueues: job 1 grants the only
slot (holder=1), job 2 defers (requester=1). The queue still shows up
in queue_counts via either source.
… if it should stop processing its work.

This is distinct from a cancelled task, this means that the worker crashed or had a couple of epic event loop delays that caused silo to think that the worker is no longer responding.
@udnay

udnay commented May 28, 2026

Copy link
Copy Markdown
Contributor Author

@udnay udnay force-pushed the yo/limit-chain-refactor branch from 2a695c2 to f4244b9 Compare May 30, 2026 19:49
Base automatically changed from yo/limit-chain-refactor to main June 1, 2026 15:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants