Skip to content
36 changes: 36 additions & 0 deletions lib/kv-router/src/scheduling/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ impl<
/// Run the full scheduling pipeline for a single request:
/// compute projected load -> select worker -> book tracked state -> respond.
fn admit_one(&self, mut request: SchedulingRequest, decay_now: Instant) {
let op_start = Instant::now();
request.worker_loads = self
.slots
.project_worker_loads(request.token_seq.as_deref(), decay_now);
Expand All @@ -588,6 +589,41 @@ impl<
.select_worker(&workers, &request, eligibility, self.block_size)
};

// Per-op BUSY-time attribution (DYN_STALL_OP_TRACE=1): project_worker_loads + select_worker run
// synchronously on the scheduler-actor task (which lives on the frontend runtime), so this is
// on-event-loop busy time (no await above). WARN past DYN_STALL_OP_WARN_MS so a residual
// frontend stall can be attributed to scheduler admission vs request-path hashing. Zero-cost off.
{
static STALL_OP_WARN_MS: std::sync::OnceLock<Option<u128>> = std::sync::OnceLock::new();
let warn = *STALL_OP_WARN_MS.get_or_init(|| {
if std::env::var("DYN_STALL_OP_TRACE")
.ok()
.is_some_and(|v| v == "1" || v == "true")
{
Some(
std::env::var("DYN_STALL_OP_WARN_MS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(50u128),
)
} else {
None
}
});
if let Some(warn_ms) = warn {
let ms = op_start.elapsed().as_millis();
if ms >= warn_ms {
tracing::warn!(
target: "dynamo_stall_op",
op = "admit_select",
busy_ms = ms as u64,
isl_tokens = request.isl_tokens,
"scheduler admission busy on actor task"
);
}
}
}

let selection = match selection {
Ok(s) => s,
Err(e) => {
Expand Down
52 changes: 52 additions & 0 deletions lib/llm/src/kv_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,58 @@ where
);
}

// Per-op BUSY-time attribution (DYN_STALL_OP_TRACE=1): WARN when a synchronous frontend
// request-path op exceeds DYN_STALL_OP_WARN_MS (default 50ms) of on-event-loop busy time, so a
// residual frontend event-loop stall (after the tokenize offload) is attributed to a NAMED op
// (block_hash / seq_hash / indexer / schedule), not inferred. block/seq-hash deltas are pure
// busy (no await between `start` and here); `indexer` is busy for the inline radix path;
// `schedule` is wall (includes the actor oneshot) so treat it as an upper bound. Target
// dynamo_stall_op correlates by timestamp with the dynamo_stall canary. Zero-cost when off.
{
static STALL_OP_WARN_MS: std::sync::OnceLock<Option<u128>> = std::sync::OnceLock::new();
let warn = *STALL_OP_WARN_MS.get_or_init(|| {
if std::env::var("DYN_STALL_OP_TRACE")
.ok()
.is_some_and(|v| v == "1" || v == "true")
{
Some(
std::env::var("DYN_STALL_OP_WARN_MS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(50u128),
)
} else {
None
}
});
if let Some(warn_ms) = warn {
let ops = [
("block_hash", hash_elapsed.as_millis()),
(
"seq_hash",
seq_hash_elapsed.saturating_sub(hash_elapsed).as_millis(),
),
("indexer", indexer_duration.as_millis()),
(
"schedule",
total_elapsed.saturating_sub(find_matches_elapsed).as_millis(),
),
];
for (op, ms) in ops {
if ms >= warn_ms {
tracing::warn!(
target: "dynamo_stall_op",
op = op,
busy_ms = ms as u64,
isl_tokens = isl_tokens,
num_blocks = num_blocks,
"frontend request-path op busy on event loop"
);
}
}
}
}

// Observe per-request shared cache metrics.
if let Some(hits) = sc_hits_for_metrics
&& let Some(m) = metrics::RouterRequestMetrics::get()
Expand Down
85 changes: 61 additions & 24 deletions lib/llm/src/kv_router/prefill_router/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use dynamo_runtime::{
protocols::maybe_error::MaybeError,
};

use super::trace::{self, PrefillTrace};
use super::{
InnerPrefillRouter, PrefillError, PrefillQueryOutcome, PrefillResolveDecision, PrefillRouter,
};
Expand Down Expand Up @@ -217,6 +218,7 @@ impl PrefillRouter {
request: SingleIn<PreprocessedRequest>,
target_worker: Option<u64>,
phase_transition_permit: Option<OwnedSemaphorePermit>,
mut prefill_trace: Option<PrefillTrace>,
) -> Result<PrefillCompletion, PrefillError> {
let router = router.ok_or(PrefillError::NotActivated)?;
// Clone tracker before request is consumed by generate_to_worker.
Expand All @@ -228,33 +230,24 @@ impl PrefillRouter {
InnerPrefillRouter::SimpleRouter(_) => target_worker.map(|worker_id| (worker_id, None)),
InnerPrefillRouter::KvRouter(_) => None,
};

// (c) just before generate_to_worker dispatch. Take an in-flight guard for
// the target CTX worker so the trace shows whether the frontend keeps each
// CTX densely fed (inflight ≫ 1) or trickles (≈ 1–2). The guard increments
// now, WARNs the new depth, and decrements on drop — leak-proof across all
// return paths below. Both guard + trace are no-ops when DYN_PREFILL_TRACE
// is off. The gauge keys on `target_worker` (the resolved CTX worker on the
// spawned and NoBootstrapEndpoint paths); on the Unavailable fallback path
// the router picks internally and target_worker is None — gauge skipped.
if let Some(t) = prefill_trace.as_mut() {
t.mark_dispatched();
}
let _inflight_guard = trace::inflight_guard(target_worker);

let mut prefill_response = router
.generate_to_worker(request, target_worker)
.await
.map_err(|e| {
// A shed prefill worker returns ResourceExhausted. Carry it as the
// source so the chain stays downcastable to 503; boxing the raw
// anyhow error instead would hide that identity.
if match_error_chain(e.as_ref(), &[ErrorType::ResourceExhausted], &[]) {
tracing::warn!(
worker_error = %e,
"Request rejected by prefill worker (at capacity) — returning HTTP 503"
);
return PrefillError::PrefillError(
"prefill worker overloaded".to_string(),
Some(Box::new(
DynamoError::builder()
.error_type(ErrorType::ResourceExhausted)
.message(e.to_string())
.build(),
)),
);
}
PrefillError::PrefillError(
"failed to route to prefill worker".to_string(),
Some(e.into()),
)
})?;
.map_err(map_generate_to_worker_error)?;

// Release the phase barrier now that routing completed and worker recording already ran.
// Decode may proceed without waiting for prefill output streaming to finish.
Expand All @@ -267,6 +260,11 @@ impl PrefillRouter {
));
};

// (d) first response item from the CTX worker (= prefill compute + KV-ready).
if let Some(t) = prefill_trace.as_mut() {
t.mark_first_response();
}

// Record when prefill result arrived at the router (for KV transfer latency metric).
// This is after drop(phase_transition_permit) and after first_output is received.
if let Some(ref tracker) = tracker {
Expand Down Expand Up @@ -297,6 +295,14 @@ impl PrefillRouter {
}
}

// (e) prefill stream ended. Emit the single per-request lifecycle summary
// here — this is the point where the prefill output stream is fully drained
// and we have a clean, successful lifecycle (a→e all observed). Error
// returns above drop the trace without emitting (incomplete lifecycle).
if let Some(t) = prefill_trace.take() {
t.emit();
}

let Some(output) = &first_output.data else {
return Err(PrefillError::NoDisaggregatedParams(
"Prefill router output has no data field".to_string(),
Expand Down Expand Up @@ -337,6 +343,7 @@ impl PrefillRouter {
prefill_request: SingleIn<PreprocessedRequest>,
target_worker: Option<u64>,
phase_transition_permit: OwnedSemaphorePermit,
prefill_trace: Option<PrefillTrace>,
) {
let router = self.prefill_router.get().cloned();
// Capture current span to propagate trace context to the spawned task
Expand All @@ -349,6 +356,7 @@ impl PrefillRouter {
prefill_request,
target_worker,
Some(phase_transition_permit),
prefill_trace,
)
.await
{
Expand Down Expand Up @@ -459,6 +467,35 @@ impl PrefillRouter {
}
}

/// Map a `generate_to_worker` error into a `PrefillError`. Extracted verbatim
/// from the former inline `.map_err(...)` closure so the dispatch site can use
/// `.map_err(map_generate_to_worker_error)?` while the in-flight RAII guard
/// cleans up the gauge on the error path.
fn map_generate_to_worker_error(e: anyhow::Error) -> PrefillError {
// A shed prefill worker returns ResourceExhausted. Carry it as the
// source so the chain stays downcastable to 503; boxing the raw
// anyhow error instead would hide that identity.
if match_error_chain(e.as_ref(), &[ErrorType::ResourceExhausted], &[]) {
tracing::warn!(
worker_error = %e,
"Request rejected by prefill worker (at capacity) — returning HTTP 503"
);
return PrefillError::PrefillError(
"prefill worker overloaded".to_string(),
Some(Box::new(
DynamoError::builder()
.error_type(ErrorType::ResourceExhausted)
.message(e.to_string())
.build(),
)),
);
}
PrefillError::PrefillError(
"failed to route to prefill worker".to_string(),
Some(e.into()),
)
}

fn prefill_worker_info(
tracker: Option<&RequestTracker>,
routing_data: Option<&RoutingData>,
Expand Down
36 changes: 35 additions & 1 deletion lib/llm/src/kv_router/prefill_router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
mod activation;
mod execution;
mod inner;
mod trace;
mod types;

use inner::InnerPrefillRouter;
Expand Down Expand Up @@ -84,12 +85,22 @@ impl
request: SingleIn<PreprocessedRequest>,
next: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>>,
) -> Result<ManyOut<Annotated<LLMEngineOutput>>> {
// (a) arrival: capture as early as possible, before any work. Zero-cost
// when DYN_PREFILL_TRACE is off (the Instant is cheap; the trace handle
// below is None and short-circuits everything downstream).
let trace_arrival = std::time::Instant::now();

// Extract request data while preserving context
let (mut req, context) = request.into_parts();
let request_id = context.id().to_string();
let metadata = context.metadata().clone();
let engine_ctx = context.context();

// Per-request prefill trace (None unless DYN_PREFILL_TRACE on). isl_tokens
// = prompt token count of the original request.
let mut prefill_trace =
trace::PrefillTrace::new(&request_id, req.token_ids.len(), trace_arrival);

// Save original max_tokens for decode
let original_max_tokens = req.stop_conditions.max_tokens;

Expand Down Expand Up @@ -139,6 +150,10 @@ impl
dp_rank,
bootstrap_info,
} => {
// (b) worker resolved/selected.
if let Some(t) = prefill_trace.as_mut() {
t.mark_selected(Some(worker_id));
}
let topology_constraints =
self.preflight_kv_transfer_constraints(endpoint_id, Some(worker_id))?;

Expand Down Expand Up @@ -168,7 +183,14 @@ impl

// Pass the phase barrier to the spawned task. It is released after routing
// completes so worker recording finishes before phase changes to Decode.
self.spawn_prefill_task(prefill_context, Some(worker_id), prefill_phase_barrier);
// The prefill trace (carrying a/b) is moved into the spawned task, which
// is the path that runs under load; it records c/d/e and emits there.
self.spawn_prefill_task(
prefill_context,
Some(worker_id),
prefill_phase_barrier,
prefill_trace.take(),
);

(
Ok(PrefillOutcome::Bootstrap {
Expand Down Expand Up @@ -215,6 +237,10 @@ impl
worker_id: resolved_wid,
dp_rank: resolved_dp_rank,
} => {
// (b) worker resolved/selected.
if let Some(t) = prefill_trace.as_mut() {
t.mark_selected(Some(resolved_wid));
}
let topology_constraints =
self.preflight_kv_transfer_constraints(endpoint_id, Some(resolved_wid))?;

Expand Down Expand Up @@ -242,6 +268,7 @@ impl
prefill_context,
Some(resolved_wid),
None,
prefill_trace.take(),
)
.await?;
(
Expand All @@ -254,6 +281,12 @@ impl
)
}
PrefillResolveDecision::Unavailable | PrefillResolveDecision::NotActivated => {
// (b) no worker resolved upfront here; the router selects one inside
// execute_prefill. Mark select wall now with the preselected pin (may
// be None — worker attribution for this path is best-effort).
if let Some(t) = prefill_trace.as_mut() {
t.mark_selected(preselected_worker);
}
let topology_constraints =
self.preflight_kv_transfer_constraints(endpoint_id, None)?;

Expand All @@ -278,6 +311,7 @@ impl
prefill_context,
preselected_worker,
None,
prefill_trace.take(),
)
.await?;
let prefill_worker_id = completion
Expand Down
Loading
Loading