Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions components/src/dynamo/trtllm/request_handlers/handler_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,21 @@ def _restore_gms_weights() -> None:
manager.remap_all_vas()


# DYN_ENABLE_FAST_CANCELLATION: revert of PR #7489 ("prevent KV block leak from
# cancel during disagg KV transfer"). When set, the decode handler does NOT wrap
# abort() in _DeferredAbort — i.e. cancellation fires immediately instead of being
# deferred until the first decode token (KV-transfer-complete signal). This restores
# the pre-#7489 "fast cancellation" behavior, reintroducing the KV-block-leak risk
# that #7489 fixed. Diagnostic flag only — NOT a shippable default. Paired with the
# Rust-side gate in prefill_router (same env var) for a faithful whole-PR revert.
FAST_CANCELLATION = os.environ.get("DYN_ENABLE_FAST_CANCELLATION", "0").lower() in (
"1",
"true",
"yes",
"on",
)


class _Abortable(Protocol):
"""Structural type for objects that support abort(). Satisfied by both
GenerationResult and _DeferredAbort."""
Expand Down Expand Up @@ -1159,10 +1174,14 @@ async def _generate_locally_impl(
)

# In disagg decode mode, wrap abort() to defer until first token
# (KV transfer complete).
# (KV transfer complete). DYN_ENABLE_FAST_CANCELLATION (revert of #7489)
# bypasses the wrapper so abort fires immediately (fast cancellation).
abort_guard = (
_DeferredAbort(generation_result)
if self.disaggregation_mode == DisaggregationMode.DECODE
if (
self.disaggregation_mode == DisaggregationMode.DECODE
and not FAST_CANCELLATION
)
else None
)

Expand Down
53 changes: 53 additions & 0 deletions lib/llm/src/kv_router/prefill_router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,28 @@ use inner::InnerPrefillRouter;
pub use types::{PrefillError, PrefillQueryOutcome};
use types::{PrefillOutcome, PrefillResolveDecision, build_decode_router_override};

/// DYN_ENABLE_FAST_CANCELLATION: revert of PR #7489 ("prevent KV block leak from cancel
/// during disagg KV transfer"). When set, the prefill request is re-linked as a child of
/// the engine context so a client cancel/kill propagates and tears down the in-flight
/// prefill + NIXL KV transfer ("fast cancellation"), and decode routing is aborted when the
/// context is killed. Default off = post-#7489 behavior (prefill not linked; decode routing
/// proceeds so KV transfer can complete + clean up). Diagnostic flag only — fast cancellation
/// reintroduces the KV-block-leak risk #7489 fixed. Mirrors the Python `FAST_CANCELLATION`
/// gate in trtllm `handler_base.py` for a faithful whole-PR revert.
fn fast_cancellation_enabled() -> bool {
static ENABLED: OnceLock<bool> = OnceLock::new();
*ENABLED.get_or_init(|| {
std::env::var("DYN_ENABLE_FAST_CANCELLATION")
.map(|v| {
matches!(
v.trim().to_ascii_lowercase().as_str(),
"1" | "true" | "yes" | "on"
)
})
.unwrap_or(false)
})
}

/// PrefillRouter is a forward-only operator that sits between Migration and the decode router.
/// It optionally calls a prefill worker before routing to decode, extracting disaggregated_params
/// from the prefill response and injecting them into the decode request.
Expand Down Expand Up @@ -181,6 +203,13 @@ impl
metadata.clone(),
);

// DYN_ENABLE_FAST_CANCELLATION (revert #7489): re-link prefill as a child of
// the engine context so cancel/kill propagates and interrupts the in-flight
// prefill + NIXL transfer (fast cancellation). Default off = not linked.
if fast_cancellation_enabled() {
engine_ctx.link_child(prefill_context.context());
}

// Pass the phase barrier to the spawned task. It is released after routing
// completes so worker recording finishes before phase changes to Decode.
// The prefill trace (carrying a/b) is moved into the spawned task, which
Expand Down Expand Up @@ -263,6 +292,11 @@ impl
request_id.clone(),
metadata.clone(),
);
// DYN_ENABLE_FAST_CANCELLATION (revert #7489): re-link prefill as a child of
// the engine context so cancel/kill interrupts the in-flight prefill + transfer.
if fast_cancellation_enabled() {
engine_ctx.link_child(prefill_context.context());
}
let completion = Self::execute_prefill(
self.prefill_router.get().cloned(),
prefill_context,
Expand Down Expand Up @@ -303,6 +337,11 @@ impl
request_id.clone(),
metadata.clone(),
);
// DYN_ENABLE_FAST_CANCELLATION (revert #7489): re-link prefill as a child of
// the engine context so cancel/kill interrupts the in-flight prefill + transfer.
if fast_cancellation_enabled() {
engine_ctx.link_child(prefill_context.context());
}

// In Direct mode, pass preselected_worker so execute_prefill uses
// router.direct() instead of router.generate() (which bails in Direct mode).
Expand Down Expand Up @@ -335,7 +374,21 @@ impl
// and leaks KV blocks permanently. The decode handler's
// kv_transfer_complete_event guard will clean up after KV is received.
// Log-only; decode routing must proceed for KV transfer cleanup.
//
// DYN_ENABLE_FAST_CANCELLATION (revert #7489): restore the pre-#7489 early-abort —
// when set, a stopped/killed context aborts decode routing immediately (fast
// cancellation; reintroduces the orphaned-transfer / block-leak risk #7489 fixed).
if engine_ctx.is_stopped() || engine_ctx.is_killed() {
if fast_cancellation_enabled() {
tracing::debug!(
"FAST_CANCELLATION: aborting decode entry, context {} stopped/killed",
engine_ctx.id()
);
return Err(anyhow::anyhow!(
"Context id {} is stopped or killed",
engine_ctx.id()
));
}
tracing::debug!(
"Context {} killed/stopped after prefill, allowing decode routing for KV transfer",
engine_ctx.id()
Expand Down
Loading