diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 6b35df216..2522d8e42 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -619,6 +619,12 @@ if(DFLASH27B_TESTS) target_include_directories(test_kvflash_placement PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS}) add_test(NAME kvflash_placement COMMAND test_kvflash_placement) endif() + if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/test/test_kvflash_pager.cpp") + add_executable(test_kvflash_pager test/test_kvflash_pager.cpp) + target_include_directories(test_kvflash_pager PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS}) + target_link_libraries(test_kvflash_pager PRIVATE dflash_common) + add_test(NAME kvflash_pager COMMAND test_kvflash_pager) + endif() if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/test/test_bandit_integration.cpp") add_executable(test_bandit_integration test/test_bandit_integration.cpp) target_include_directories(test_bandit_integration PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS}) diff --git a/server/src/common/backend_factory.cpp b/server/src/common/backend_factory.cpp index e93c15df8..c0327446f 100644 --- a/server/src/common/backend_factory.cpp +++ b/server/src/common/backend_factory.cpp @@ -28,7 +28,7 @@ bool arch_supports_remote_draft(const std::string & arch) { } bool arch_supports_pflash_compression(const std::string & arch) { - return arch == "qwen35" || arch == "qwen3"; + return arch == "qwen35" || arch == "qwen3" || arch == "qwen35moe"; } std::unique_ptr create_backend(const BackendArgs & args) { diff --git a/server/src/common/kvflash_pager.h b/server/src/common/kvflash_pager.h index d61707d96..a861a4b3e 100644 --- a/server/src/common/kvflash_pager.h +++ b/server/src/common/kvflash_pager.h @@ -161,6 +161,7 @@ class KvFlashPager { // Drop all mappings and host backing (new request / cache reset). // Cumulative stats are kept; the epoch advances so cached masks refill. + // Pins are cleared: the caller (backend) re-applies them after rebuild. void reset() { #ifdef KVFLASH_HAS_ASYNC_DMA if (page_stream_) { @@ -183,6 +184,7 @@ class KvFlashPager { stats_.host_bytes = 0; cur_chunk_ = 0; epoch_++; + std::fill(pinned_.begin(), pinned_.end(), 0); has_pending_page_in_ = false; } @@ -207,6 +209,55 @@ class KvFlashPager { // Optional external relevance score; higher = keep. Falls back to LRU. std::function score_hook; + // ── Critical-chunk pinning ────────────────────────────────────────── + // Pinned chunks are never chosen as eviction victims (OR-ed on top of the + // sink/tail protections). Empty by default → byte-identical non-pin path. + // Pins are cleared by reset() and re-applied by the backend after each + // prefill/restore rebuild via apply_kvflash_pins(). + + // Pin logical token range [tok_lo, tok_hi) half-open. + // Maps to chunk range [c_lo, c_hi] inclusive where c_hi covers the last + // token in the range (tok_hi - 1), not tok_hi itself, so an exact boundary + // at tok_hi does not pin the next chunk beyond the range. + // Best-effort deadlock guard: if (sink+tail+n_pinned+2) > n_blocks_ the + // span is refused with a one-line warning and the function returns without + // setting any pin. + void pin_range(int64_t tok_lo, int64_t tok_hi) { + if (!attached() || tok_lo >= tok_hi) return; + const int c_lo = (int)(tok_lo / cfg_.chunk_tokens); + const int c_hi = (int)((tok_hi - 1) / cfg_.chunk_tokens); + if (c_lo > c_hi || c_lo < 0) return; + // Count currently-pinned chunks + the new ones. + int currently_pinned = 0; + for (int c = 0; c < (int)pinned_.size(); c++) { + if (pinned_[c]) currently_pinned++; + } + int new_pins = 0; + for (int c = c_lo; c <= c_hi; c++) { + if (c >= (int)pinned_.size() || !pinned_[c]) new_pins++; + } + // Deadlock guard: fixed protections + pinned + 2 (one evictable victim + + // one append-head) must fit in the pool. + if (cfg_.sink_chunks + cfg_.tail_window_chunks + currently_pinned + new_pins + 2 > n_blocks_) { + std::fprintf(stderr, + "[kvflash] pin_range [%lld,%lld] refused: " + "sink=%d tail=%d pinned=%d new=%d pool_blocks=%d — would deadlock eviction\n", + (long long)tok_lo, (long long)tok_hi, + cfg_.sink_chunks, cfg_.tail_window_chunks, + currently_pinned, new_pins, n_blocks_); + return; + } + if (c_hi + 1 > (int)pinned_.size()) pinned_.resize((size_t)c_hi + 1, 0); + for (int c = c_lo; c <= c_hi; c++) pinned_[(size_t)c] = 1; + } + + bool is_pinned(int c) const { + return c >= 0 && c < (int)pinned_.size() && pinned_[(size_t)c]; + } + + // Clear all pins (also called by reset()). + void unpin_all() { std::fill(pinned_.begin(), pinned_.end(), 0); } + // Allocate slots for [kv_start, kv_start + n_tok) ahead of a forward // step (evicting LRU/low-score chunks as needed). False — with a // diagnostic — if the pool has no evictable block left. @@ -397,7 +448,8 @@ class KvFlashPager { const ChunkState & st = chunks_[c]; if (st.block < 0 && !st.on_host) continue; // never materialized const bool prot = c < cfg_.sink_chunks || - c > cur_chunk_ - 1 - cfg_.tail_window_chunks; + c > cur_chunk_ - 1 - cfg_.tail_window_chunks || + is_pinned(c); cands.push_back({c, prot ? 3.4e38f : score_hook(c)}); } std::sort(cands.begin(), cands.end(), @@ -418,6 +470,111 @@ class KvFlashPager { return events; } + // Snapshot all resident+paged-out chunks into a flat byte blob. + // Layout: 8-byte magic, header fields (6×uint32), then for each logical + // chunk c in [0, n_chunks): chunk_bytes_ bytes in fixed segment order + // (layer-major, K then V, head-minor) — matching copy_chunk. + std::vector serialize() const { + static constexpr uint64_t kMagic = 0x4b56464c41534800ULL; // "KVFLASH\0" + const int nc = (int)chunks_.size(); + const size_t hdr = sizeof(uint64_t) + 6 * sizeof(uint32_t); + std::vector out; + out.resize(hdr + (size_t)nc * chunk_bytes_, 0); + uint8_t * p = out.data(); + std::memcpy(p, &kMagic, 8); p += 8; + auto w32 = [&](uint32_t v) { std::memcpy(p, &v, 4); p += 4; }; + w32((uint32_t)nc); + w32((uint32_t)cfg_.chunk_tokens); + w32((uint32_t)n_head_kv_); + w32((uint32_t)k_seg_bytes_); + w32((uint32_t)v_seg_bytes_); + w32((uint32_t)chunk_bytes_); + for (int c = 0; c < nc; ++c) { + uint8_t * dst = out.data() + hdr + (size_t)c * chunk_bytes_; + const ChunkState & st = chunks_[c]; + if (st.block >= 0) { + // Resident: gather from pool tensors in fixed segment order + // (layer-major, K then V, head-minor) — matching copy_chunk. + uint8_t * q = dst; + for (size_t l = 0; l < attn_k_.size(); ++l) { + for (int kv = 0; kv < 2; ++kv) { + ggml_tensor * t = kv == 0 ? attn_k_[l] : attn_v_[l]; + const size_t seg = kv == 0 ? k_seg_bytes_ : v_seg_bytes_; + for (int h = 0; h < n_head_kv_; ++h) { + const size_t off = (size_t)st.block * cfg_.chunk_tokens * t->nb[1] + + (size_t)h * t->nb[2]; + ggml_backend_tensor_get(t, q, off, seg); + q += seg; + } + } + } + } else if (st.on_host) { + // Host-backed: copy verbatim. host_data is a raw pinned pointer + // under async DMA, a std::vector otherwise. +#ifdef KVFLASH_HAS_ASYNC_DMA + std::memcpy(dst, st.host_data, chunk_bytes_); +#else + std::memcpy(dst, st.host_data.data(), chunk_bytes_); +#endif + } + // else: never written — stays zero-filled from the resize above. + } + return out; + } + + // Restore state from a blob produced by serialize(). Returns false on + // header mismatch (layout drift guard). Callers must rebuild slot masks. + // + // Ordering: pre-size chunks_ so each entry exists before slot_for() runs. + // We set host_data + on_host=true, THEN call slot_for(). slot_for's recall + // branch sees on_host==true and calls copy_chunk(to_host=false) itself — + // no extra copy_chunk needed (that would double-write). + bool deserialize(const uint8_t * data, size_t n) { + static constexpr uint64_t kMagic = 0x4b56464c41534800ULL; + const size_t hdr = sizeof(uint64_t) + 6 * sizeof(uint32_t); + if (n < hdr) return false; + const uint8_t * p = data; + uint64_t magic = 0; std::memcpy(&magic, p, 8); p += 8; + if (magic != kMagic) return false; + auto r32 = [&]() { uint32_t v = 0; std::memcpy(&v, p, 4); p += 4; return v; }; + const int nc = (int)r32(); + const int ct = (int)r32(); + const int nhkv = (int)r32(); + const size_t kseg = (size_t)r32(); + const size_t vseg = (size_t)r32(); + const size_t cb = (size_t)r32(); + if (ct != cfg_.chunk_tokens || nhkv != n_head_kv_ || + kseg != k_seg_bytes_ || vseg != v_seg_bytes_ || cb != chunk_bytes_) { + return false; + } + if (n < hdr + (size_t)nc * chunk_bytes_) return false; + reset(); + chunks_.resize(nc); // pre-size so slot_for doesn't resize again + for (int c = 0; c < nc; ++c) { + const uint8_t * src = data + hdr + (size_t)c * chunk_bytes_; + ChunkState & st = chunks_[c]; + // Park bytes in host_data; slot_for's recall branch copies to pool. + // host_data is a raw pinned pointer under async DMA, a vector otherwise. +#ifdef KVFLASH_HAS_ASYNC_DMA + if (!st.host_data) { + if (cudaMallocHost(&st.host_data, chunk_bytes_) != cudaSuccess) return false; + stats_.host_bytes += (int64_t)chunk_bytes_; + } + std::memcpy(st.host_data, src, chunk_bytes_); +#else + st.host_data.assign(src, src + chunk_bytes_); +#endif + st.on_host = true; + // slot_for assigns a block and auto-recalls via copy_chunk(to_host=false). + slot_for((int64_t)c * cfg_.chunk_tokens); + } +#ifdef KVFLASH_HAS_ASYNC_DMA + // Recalls above are async on page_stream_; settle before the pool is read. + if (has_pending_page_in_) synchronize_paging(); +#endif + return true; + } + private: struct ChunkState { int block = -1; // pool block index, -1 = not resident @@ -441,6 +598,7 @@ class KvFlashPager { if (chunks_[c].block < 0) continue; if (c < cfg_.sink_chunks) continue; if (c > cur_chunk_ - 1 - cfg_.tail_window_chunks) continue; + if (is_pinned(c)) continue; if (score_hook) { const float s = score_hook(c); if (victim < 0 || s < v_score) { victim = c; v_score = s; } @@ -566,6 +724,7 @@ class KvFlashPager { std::vector chunks_; std::vector free_blocks_; std::vector zero_buf_; // used by zero_block() in non-CUDA builds + std::vector pinned_; // per-chunk pin flag; empty = no pins KvFlashStats stats_; size_t k_seg_bytes_ = 0, v_seg_bytes_ = 0, chunk_bytes_ = 0; int n_blocks_ = 0, n_head_kv_ = 0, cur_chunk_ = 0; diff --git a/server/src/common/kvflash_qk.h b/server/src/common/kvflash_qk.h index f84bd85b0..a7e3ef445 100644 --- a/server/src/common/kvflash_qk.h +++ b/server/src/common/kvflash_qk.h @@ -47,7 +47,10 @@ inline void kvflash_qk_chunk_scores( const float * query, const KvFlashQkDims & d, std::vector & out, - float missing_score = -2.0f) { + float missing_score = -2.0f, + const float * seeded = nullptr, + float seeded_sentinel = -std::numeric_limits::infinity(), + int seeded_n = -1) { const int group = d.n_q_heads / d.n_kv_heads; const int n_chunks = (int)pooled_keys.size(); out.assign((size_t)n_chunks, missing_score); @@ -83,6 +86,19 @@ inline void kvflash_qk_chunk_scores( } out[(size_t)c] = acc * inv_layers; // layer-MEAN (Phase-0 config) } + // Seeded fallback: for chunks with no pooled key, use the ledger score from + // a prior turn if it is not the sentinel (i.e. it was actually scored). + // seeded_n bounds the valid range of the seeded array; chunks beyond it + // (n_chunks > seeded array length) fall back to missing_score safely. + if (seeded) { + const int seeded_limit = (seeded_n >= 0) ? seeded_n : n_chunks; + for (int c = 0; c < n_chunks; c++) { + if (!pooled_keys[(size_t)c] && c < seeded_limit && + seeded[c] != seeded_sentinel) { + out[(size_t)c] = seeded[c]; + } + } + } } } // namespace dflash::common diff --git a/server/src/common/moe_hybrid_ffn_eval.cpp b/server/src/common/moe_hybrid_ffn_eval.cpp index e8bddebd2..cb3fd85d5 100644 --- a/server/src/common/moe_hybrid_ffn_eval.cpp +++ b/server/src/common/moe_hybrid_ffn_eval.cpp @@ -1009,7 +1009,7 @@ static int mmq_safe_sub_batch() { static const int v = [](){ const char * e = std::getenv("DFLASH_MMQ_SUB_BATCH"); if (e) return std::max(1, std::atoi(e)); - return (query_gpu_compute_sm() >= 80) ? 8 : 1; + return (query_gpu_compute_sm() >= 80) ? 4 : 1; // Q4_K MMVQ cap=4 on sm_86 }(); return v; } @@ -1066,6 +1066,27 @@ static bool eval_moe_hybrid_ffn_batched_core( if (cl >= 0) { cold_sel[i] = cl; cold_wts[i] = selected_weights[i]; fp_has_cold = true; } } } + // Dummy slots (wts==0) may alias a real hot expert's local ID per token → + // ids_to_sorted_host drops entries → ASSERT in slow ggml_mul_mat_id path. + for (int t = 0; t < n_tokens; ++t) { + const int base = t * n_used; + int32_t next = 0; + for (int s = 0; s < n_used; ++s) { + if (hot_wts[base + s] > 0.0f) continue; + // Bounded search: at most n_hot_init probes. If every ID in + // [0, n_hot_init) is already taken by another slot we break and + // keep `next` as-is (duplicate), which is safe — the zero-weight + // slot is ignored by ids_to_sorted_host anyway. + int tries = 0; + while (tries < n_hot_init && + [&]{ for (int k=0; k= n_hot_init) next = 0; + ++tries; + } + hot_sel[base + s] = next++; + if (next >= n_hot_init) next = 0; + } + } CachedHotBatchedGraph & hg = storage.hot_batched_mixed[n_tokens]; const bool hg_ok = (hg.valid() && hg.n_tokens == n_tokens) @@ -1145,6 +1166,23 @@ static bool eval_moe_hybrid_ffn_batched_core( } } } + // Dummy slots (wts==0) may alias a real hot expert's local ID per token → + // ids_to_sorted_host drops entries → ASSERT in slow ggml_mul_mat_id path. + for (int t = 0; t < n_tokens; ++t) { + const int base = t * n_used; + int32_t next = 0; + for (int s = 0; s < n_used; ++s) { + if (hot_wts[base + s] > 0.0f) continue; + int tries = 0; + while (tries < n_hot_init && + [&]{ for (int k=0; k= n_hot_init) next = 0; + ++tries; + } + hot_sel[base + s] = next++; + if (next >= n_hot_init) next = 0; + } + } // ── Step 2: Build and run hot GPU graph (includes shared expert always) ── std::vector hot_partial((size_t)n_embd * (size_t)n_tokens, 0.0f); diff --git a/server/src/draft/draft_gguf_loader.cpp b/server/src/draft/draft_gguf_loader.cpp index 29efc543d..a9db84ca5 100644 --- a/server/src/draft/draft_gguf_loader.cpp +++ b/server/src/draft/draft_gguf_loader.cpp @@ -140,6 +140,76 @@ bool check_shape_2d(const ggml_tensor * t, int64_t ne0, int64_t ne1, } // namespace +// Lightweight metadata-only read of the drafter's capture-layer config. +// Opens the GGUF header (no tensor data loaded), extracts n_target_layers +// and target_layer_ids, and returns them. Called by the backend BEFORE +// load_target_model so that target_feat is allocated with the correct +// fc_in = n_capture_layers * n_embd dimension. +bool read_draft_capture_config(const std::string & path, + int & n_capture, + int * capture_ids, + int max_ids) { + n_capture = 0; + gguf_init_params gip{}; + gip.no_alloc = true; + gip.ctx = nullptr; // metadata-only: no tensor context + gguf_context * gctx = gguf_init_from_file(path.c_str(), gip); + if (!gctx) return false; + + // Resolve the arch prefix for key building. + std::string A = "qwen35-dflash-draft"; + { + int64_t arch_id = gguf_find_key(gctx, "general.architecture"); + if (arch_id >= 0) { + const char * arch = gguf_get_val_str(gctx, arch_id); + if (arch) A = arch; + } + } + + char key[128]; + + // Read n_target_layers. + std::snprintf(key, sizeof(key), "%s.%s", A.c_str(), "dflash.n_target_layers"); + int64_t ntl_id = gguf_find_key(gctx, key); + if (ntl_id >= 0) { + n_capture = (int)gguf_get_val_u32(gctx, ntl_id); + } + + // Zero-initialize the output array so callers get safe values even when + // target_layer_ids is absent (P1-G: uninitialized capture IDs). + for (int k = 0; k < max_ids; k++) capture_ids[k] = 0; + + // Read target_layer_ids array (exact capture positions from training). + std::snprintf(key, sizeof(key), "%s.%s", A.c_str(), "dflash.target_layer_ids"); + int64_t tli_id = gguf_find_key(gctx, key); + if (tli_id >= 0 && gguf_get_kv_type(gctx, tli_id) == GGUF_TYPE_ARRAY) { + // P2-2: verify element type is INT32 before casting (array may be INT64). + if (gguf_get_arr_type(gctx, tli_id) != GGUF_TYPE_INT32) { + std::fprintf(stderr, + "[draft-cfg] target_layer_ids array type is not INT32; skipping\n"); + } else { + const size_t n = std::min((size_t)gguf_get_arr_n(gctx, tli_id), + (size_t)max_ids); + const int32_t * ids = static_cast(gguf_get_arr_data(gctx, tli_id)); + for (size_t k = 0; k < n; k++) { + capture_ids[k] = (int)ids[k]; + } + if (n > 0) n_capture = (int)n; + } + } + + gguf_free(gctx); + + if (n_capture > 0) { + std::fprintf(stderr, "[draft-cfg] early-read: n_capture=%d ids=", n_capture); + for (int k = 0; k < n_capture && k < max_ids; k++) + std::fprintf(stderr, "%s%d", k ? "," : "[", capture_ids[k]); + std::fprintf(stderr, "]\n"); + } + return n_capture > 0; +} + + bool load_draft_gguf(const std::string & path, ggml_backend_t backend, DraftWeights & out, diff --git a/server/src/internal.h b/server/src/internal.h index 8e93532fb..03ae64d92 100644 --- a/server/src/internal.h +++ b/server/src/internal.h @@ -451,6 +451,11 @@ struct PrefixSnapshot { // [HEAD_DIM, kv_end-kv_start, N_HEAD_KV] (smaller than cache). // - ssm_state_snap, conv_state_snap, target_feat_snap are NOT // allocated (THIN snapshots are KV-only). + + // Phase C: pooled snapshots (kvflash evicted state). + // attn_k_snap/attn_v_snap are NOT allocated; KV lives in kvflash_blob. + bool is_pooled = false; + std::vector kvflash_blob; // pager-serialized KV; valid iff is_pooled }; // Snapshot the slim state of `cache` into `snap`. KV tensors are RIGHT-SIZED @@ -461,13 +466,17 @@ struct PrefixSnapshot { bool snapshot_target_cache(const TargetWeights & w, const TargetCache & cache, ggml_backend_t backend, - PrefixSnapshot & snap); + PrefixSnapshot & snap, + bool skip_kv = false, + const std::vector * kvflash_blob = nullptr); // Restore `cache` from `snap`. cache must already exist (created via // create_target_cache) and have matching shapes. Sets cache.cur_pos = // snap.cur_pos. Does NOT touch ssm_intermediate / conv_input_cache — // those will be repopulated by the first decode step's verify forward. -bool restore_target_cache(const PrefixSnapshot & snap, TargetCache & cache); +// skip_kv=true: skip restoring attn_k/attn_v rows (pooled restore path). +bool restore_target_cache(const PrefixSnapshot & snap, TargetCache & cache, + bool skip_kv = false); // Free the snapshot's GPU buffers. void free_prefix_snapshot(PrefixSnapshot & snap); diff --git a/server/src/qwen35/qwen35_backend.cpp b/server/src/qwen35/qwen35_backend.cpp index 4e376950e..a0403422c 100644 --- a/server/src/qwen35/qwen35_backend.cpp +++ b/server/src/qwen35/qwen35_backend.cpp @@ -255,6 +255,28 @@ bool Qwen35Backend::init() { kvflash_qk_policy_ = kvflash_policy_is_qk(); if (std::getenv("DFLASH_KVFLASH") && !kvflash_qk_policy_) { kvflash_drafter_path_ = kvflash_find_drafter(cfg_.target_path); + // Parse DFLASH_KVFLASH_PIN_SPANS="lo:hi,lo:hi" token ranges. + if (const char * ps = std::getenv("DFLASH_KVFLASH_PIN_SPANS")) { + const char * p = ps; + while (*p) { + long lo = 0, hi = 0; + char * end = nullptr; + lo = std::strtol(p, &end, 10); + if (end == p || *end != ':') break; + p = end + 1; + hi = std::strtol(p, &end, 10); + if (end == p) break; + if (lo >= 0 && hi >= lo) { + kvflash_pin_spans_.push_back({(int)lo, (int)hi}); + } + p = end; + if (*p == ',') p++; + } + if (!kvflash_pin_spans_.empty()) { + std::fprintf(stderr, "[kvflash] pin spans: %d range(s) configured\n", + (int)kvflash_pin_spans_.size()); + } + } } // "auto" sizes the pool from the GPU: weights are resident at this // point and the cache is not yet allocated, so device-free minus a @@ -271,8 +293,12 @@ bool Qwen35Backend::init() { if (kvflash_tokens_ > 0) { kvflash_tau_ = std::max(1, env_int_or_default("DFLASH_KVFLASH_TAU", 64)); } - // Subclass gate (e.g. MoE all-hot): may zero kvflash_tokens_ before the KV - // cache is sized, so create_target_cache allocates full max_ctx KV. + // Subclass gate (e.g. MoE all-hot fit check): may zero kvflash_tokens_ to + // disable the pool when it is not needed. Called before create_target_cache + // so the cache is allocated at the right size. + // Stash the budget so the gate can reuse the bytes_per_token/free_bytes without + // re-querying GPU memory. + kvf_budget_ = kvf_budget; if (!post_kvflash_init_gate()) return false; if (!create_target_cache(w_, cfg_.device.max_ctx, max_verify_tokens, target_backend_, cache_, /*prefill_only=*/true, /*ctx_alloc=*/kvflash_tokens_)) { @@ -428,23 +454,17 @@ bool Qwen35Backend::unpark(const std::string & what) { bool Qwen35Backend::snapshot_save(int slot) { if (slot < 0 || slot >= PREFIX_SLOTS) return false; - // kvflash: snapshots right-size to cur_pos, which is a LOGICAL position - // that can exceed the physical pool once decode has paged, and they copy - // rows assuming the identity layout, which pooled prefill / eviction - // breaks. Snapshots of pooled state need page-table serialization - // (follow-up); identity-mapped prefill-time snapshots remain valid. - if (kvflash_active() && - (cache_.cur_pos > kvflash_tokens_ || !kvflash_pager_.is_identity())) { - static bool warned = false; - if (!warned) { - std::fprintf(stderr, "[kvflash] snapshot skipped: cur_pos %d exceeds " - "pool %d (pooled snapshots are a follow-up)\n", - cache_.cur_pos, kvflash_tokens_); - warned = true; - } - return false; - } + const bool pooled = kvflash_active() && + (cache_.cur_pos > kvflash_tokens_ || !kvflash_pager_.is_identity()); PrefixSnapshot & snap = prefix_snapshots_[slot]; + if (pooled) { + std::vector blob = kvflash_pager_.serialize(); + if (!snapshot_target_cache(w_, cache_, snap_backend_, snap, /*skip_kv=*/true, &blob)) return false; + snap.is_pooled = true; + snap.kvflash_blob = std::move(blob); // keep for same-request restore + return true; + } + snap.is_pooled = false; return snapshot_target_cache(w_, cache_, snap_backend_, snap); } @@ -460,7 +480,19 @@ bool Qwen35Backend::snapshot_used(int slot) const { bool Qwen35Backend::restore_target_cache_from_snapshot(int slot) { if (slot < 0 || slot >= PREFIX_SLOTS || !prefix_snapshots_[slot].ctx) return false; - return restore_target_cache(prefix_snapshots_[slot], cache_); + const PrefixSnapshot & snap = prefix_snapshots_[slot]; + return restore_target_cache(snap, cache_, snap.is_pooled); +} + +bool Qwen35Backend::snapshot_is_pooled(int slot) const { + if (slot < 0 || slot >= PREFIX_SLOTS) return false; + return prefix_snapshots_[slot].is_pooled; +} + +static const std::vector s_empty_blob; +const std::vector & Qwen35Backend::snapshot_kvflash_blob(int slot) const { + if (slot < 0 || slot >= PREFIX_SLOTS) return s_empty_blob; + return prefix_snapshots_[slot].kvflash_blob; } int Qwen35Backend::snapshot_cur_pos(int slot) const { @@ -512,16 +544,23 @@ bool Qwen35Backend::snapshot_adopt(int slot, ggml_context * ctx, snap.conv_state_snap[idx] = t; } else if (std::strcmp(t->name, "snap_target_feat") == 0) { snap.target_feat_snap = t; + } else if (std::strcmp(t->name, "snap_kvflash_blob") == 0) { + snap.is_pooled = true; + snap.kvflash_blob.resize(ggml_nbytes(t)); + ggml_backend_tensor_get(t, snap.kvflash_blob.data(), 0, ggml_nbytes(t)); } } // Validate all required tensors are present. - for (int i = 0; i < n_full_attn; ++i) { - if (!snap.attn_k_snap[i] || !snap.attn_v_snap[i]) { - snap.attn_k_snap.clear(); snap.attn_v_snap.clear(); - snap.ssm_state_snap.clear(); snap.conv_state_snap.clear(); - snap.target_feat_snap = nullptr; - return false; + // Pooled snapshots have no attn_k/v (KV lives in kvflash_blob); skip that check. + if (!snap.is_pooled) { + for (int i = 0; i < n_full_attn; ++i) { + if (!snap.attn_k_snap[i] || !snap.attn_v_snap[i]) { + snap.attn_k_snap.clear(); snap.attn_v_snap.clear(); + snap.ssm_state_snap.clear(); snap.conv_state_snap.clear(); + snap.target_feat_snap = nullptr; + return false; + } } } for (int i = 0; i < n_delta; ++i) { @@ -860,8 +899,21 @@ GenerateResult Qwen35Backend::restore_and_generate_impl(int slot, // not leftovers from the previous request. cudaMemset is ~0.2ms. if (cache_.base_buf) ggml_backend_buffer_clear(cache_.base_buf, 0); - // Restore snapshot - restore_target_cache(prefix_snapshots_[slot], cache_); + // Restore snapshot (skip KV copy when pooled; pager handles KV separately). + const PrefixSnapshot & snap_ref = prefix_snapshots_[slot]; + const bool snap_pooled = snap_ref.is_pooled; + restore_target_cache(snap_ref, cache_, snap_pooled); + + // Pooled restore: rebuild pager from blob so KV rows are accessible. + if (snap_pooled && kvflash_active()) { + if (!kvflash_pager_.deserialize(snap_ref.kvflash_blob.data(), + snap_ref.kvflash_blob.size())) { + result.error = "kvflash: pager deserialize failed"; + out_io.emit(-1); + return result; + } + apply_kvflash_pins(); + } // Now generate from restored state sampler_ = req.sampler; @@ -869,7 +921,7 @@ GenerateResult Qwen35Backend::restore_and_generate_impl(int slot, sampler_rng_.seed(sampler_.seed); } - const int snap_pos = prefix_snapshots_[slot].cur_pos; + const int snap_pos = snap_ref.cur_pos; cache_.cur_pos = snap_pos; // FIX(prefix-cache + spec-decode): restore_target_cache brings back KV / @@ -894,12 +946,20 @@ GenerateResult Qwen35Backend::restore_and_generate_impl(int slot, // Daemon receives the FULL prompt; slice off the cached prefix and prefill // only the delta at KV positions [snap_pos, snap_pos + delta.size()). + // Pooled snapshots: do_prefill with kv_offset != 0 is unsupported when + // the pool is already full; fall back to full re-prefill from position 0. int committed = snap_pos; const int prompt_len = (int)req.prompt.size(); if (prompt_len > snap_pos) { auto t_prefill_start = std::chrono::steady_clock::now(); - std::vector delta = restore_prompt_delta(req.prompt, snap_pos); - committed = do_prefill(delta, out_io, req.snap_pos, req.snap_slot, /*kv_offset=*/snap_pos); + if (snap_pooled && kvflash_active()) { + reset_recurrent_state(cache_); + cache_.cur_pos = 0; + committed = do_prefill(req.prompt, out_io, req.snap_pos, req.snap_slot); + } else { + std::vector delta = restore_prompt_delta(req.prompt, snap_pos); + committed = do_prefill(delta, out_io, req.snap_pos, req.snap_slot, /*kv_offset=*/snap_pos); + } if (committed < 0) { result.error = "prefill"; return result; @@ -1020,15 +1080,38 @@ int Qwen35Backend::do_prefill(const std::vector & tokens, // identity-style in the first place). const bool kvf_paged = kvflash_active() && kv_offset + prompt_len > kvflash_tokens_ - kvflash_pager_.chunk_tokens(); - if (kvf_paged && kv_offset != 0) { - std::fprintf(stderr, - "[kvflash] restored prefix (%d) + prompt (%d) exceeds pool %d; " - "pooled prefill requires a fresh request\n", - kv_offset, prompt_len, kvflash_tokens_); - set_last_error("kvflash: restore + pooled prefill unsupported"); - return -1; - } - if (kvf_paged) { + // restore-consume: kv_offset != 0 means the caller passed a suffix after + // deserializing the prefix KV; allow it in the pooled path without reset. + const bool kvf_restore_suffix = kvf_paged && kv_offset != 0; + if (kvf_restore_suffix) { + // Suffix pooled prefill: pager was seeded by deserialize(); just set + // ubatch size and skip reset. Log to distinguish from the cold path. + prefill_ubatch = kvflash_pager_.chunk_tokens(); + // Verify kv_offset is chunk-aligned (snapshot boundary contract). + // On misalignment, fall back to a full cold re-prefill rather than + // aborting the request — the KV restored from the snapshot is stale. + if (kv_offset % prefill_ubatch != 0) { + std::fprintf(stderr, + "[kvflash] restore-consume: kv_offset=%d not chunk-aligned " + "(chunk_tokens=%d) — falling back to full re-prefill\n", + kv_offset, prefill_ubatch); + kv_offset = 0; + kvflash_pager_.reset(); + if (kvflash_qk_policy_) { + kvflash_qk_pool_.reset(kvflash_qk_pool_.dims()); + kvflash_qk_pooled_upto_ = 0; + } + std::printf("[kvflash] pooled prefill (fallback): %d tokens through a %d-token pool " + "(%d-token chunks, evicting)\n", + prompt_len, kvflash_tokens_, prefill_ubatch); + std::fflush(stdout); + } else { + std::printf("[kvflash] restore-consume suffix: offset=%d suffix=%d " + "pool=%d chunk=%d\n", + kv_offset, prompt_len, kvflash_tokens_, prefill_ubatch); + std::fflush(stdout); + } + } else if (kvf_paged) { prefill_ubatch = kvflash_pager_.chunk_tokens(); kvflash_pager_.reset(); if (kvflash_qk_policy_) { @@ -1247,6 +1330,7 @@ int Qwen35Backend::do_prefill(const std::vector & tokens, } else { kvflash_sync_prefill(committed, tokens, kv_offset); } + apply_kvflash_pins(); } // End-of-prefill snapshot: scoped disk-cache saves (auto/fixed policy) @@ -1331,6 +1415,13 @@ void Qwen35Backend::kvflash_upload_mask() { need * sizeof(uint16_t)); } +void Qwen35Backend::apply_kvflash_pins() { + if (kvflash_pin_spans_.empty()) return; + for (const auto & span : kvflash_pin_spans_) { + kvflash_pager_.pin_range((int64_t)span.first, (int64_t)span.second); + } +} + // Attach the drafter as the residency scorer outside the pflash compress // path: with `--kvflash --prefill-drafter ` but compression off, the // drafter would otherwise never load and the pool would silently run @@ -1364,7 +1455,15 @@ void Qwen35Backend::kvflash_maybe_reselect(int generated) { // re-prefill; measured 0.9 s @8K, ~46 s bisected @256K), while decode // produces ~30 tok/s. Capping rescore overhead at ~15% of decode time // gives tau ~= history/45. The configured tau is the floor. - const int tau = std::max(kvflash_tau_, (int)(kvflash_history_.size() / 45)); + // DFLASH_KVFLASH_TAU_EXACT=1 bypasses adaptive scaling so the configured + // value is used exactly (useful for testing / short-decode validation). + static const bool tau_exact = [] { + const char * e = std::getenv("DFLASH_KVFLASH_TAU_EXACT"); + return e && std::strcmp(e, "0") != 0; + }(); + const int tau = tau_exact + ? kvflash_tau_ + : std::max(kvflash_tau_, (int)(kvflash_history_.size() / 45)); if (generated % tau != 0) return; // Lazy-load the drafter only when a rescore is actually due, so the // first tokens of the first request never pay the load. diff --git a/server/src/qwen35/qwen35_backend.h b/server/src/qwen35/qwen35_backend.h index bc4fcdc0b..c76e91c22 100644 --- a/server/src/qwen35/qwen35_backend.h +++ b/server/src/qwen35/qwen35_backend.h @@ -129,13 +129,15 @@ class Qwen35Backend : public ModelBackend { protected: virtual bool load_target_model(ggml_backend_t backend, TargetWeights & out); + // Called after kvflash_tokens_ / kvflash_tau_ / pager are initialized. + // Subclasses may zero kvflash_tokens_ here to gate the pool off when it + // is not needed (e.g. all-hot MoE + full KV fits without the pool). + // Must return true; returning false aborts init(). + virtual bool post_kvflash_init_gate() { return true; } virtual bool run_ar_decode_path(int committed, int n_gen, std::vector & out_tokens, const DaemonIO & io); virtual bool should_capture_moe_router() const { return false; } - // Hook after kvflash pool sizing, before create_target_cache: a subclass - // may disable the pool (kvflash_tokens_=0) when it is redundant. Default no-op. - virtual bool post_kvflash_init_gate() { return true; } virtual void after_target_compute(StepGraph &, int /*kv_start*/, int /*n_tokens*/) {} @@ -152,6 +154,8 @@ class Qwen35Backend : public ModelBackend { bool prefill_logits_valid() const { return prefill_last_logits_valid_; } std::size_t prefill_logits_offset() const { return prefill_last_logits_offset_; } bool restore_target_cache_from_snapshot(int slot); + bool snapshot_is_pooled(int slot) const; + const std::vector & snapshot_kvflash_blob(int slot) const; // Accessors for draft/spec-decode state (needed by hybrid spec-decode in subclass) DraftWeights & draft_weights() { return dw_; } @@ -179,11 +183,16 @@ class Qwen35Backend : public ModelBackend { std::vector kvflash_scores_; // latest chunk scores std::vector kvflash_mask_buf_; // host mirror of slot mask std::string kvflash_drafter_path_; // DFLASH_KVFLASH_DRAFTER + // Token spans to pin (DFLASH_KVFLASH_PIN_SPANS="lo:hi,lo:hi" format). + // Empty when env is unset — no pins, byte-identical to prior behaviour. + std::vector> kvflash_pin_spans_; uint64_t kvflash_mask_epoch_ = (uint64_t)-1; int kvflash_tokens_ = 0; // 0 = off int kvflash_tau_ = 64; bool kvflash_drafter_failed_ = false; // don't retry a failed load bool kvflash_active() const { return kvflash_tokens_ > 0; } + // Budget snapshot stashed by init() for post_kvflash_init_gate() subclasses. + KvFlashAutoBudget kvf_budget_{}; // Pool sizing inputs — shared so MoE placement reserves exactly the pool // runtime allocates (else placement over-reserves KV and starves experts). bool kvflash_scorer_expected() const { @@ -213,6 +222,10 @@ class Qwen35Backend : public ModelBackend { // scorer is missing (lazy-loads the drafter on first need; also heals // after a residency release frees it). No-op without a path. void kvflash_ensure_scorer(); + // Re-apply all configured pin spans to the pager. Call after every + // pager rebuild (reset()+alloc_span or deserialize) so critical chunks + // survive the next eviction cycle. No-op when kvflash_pin_spans_ is empty. + void apply_kvflash_pins(); private: // ── GPU backends ───────────────────────────────────────────────── diff --git a/server/src/qwen35/qwen35_target_graph.cpp b/server/src/qwen35/qwen35_target_graph.cpp index 742aaa329..35c277f50 100644 --- a/server/src/qwen35/qwen35_target_graph.cpp +++ b/server/src/qwen35/qwen35_target_graph.cpp @@ -190,8 +190,24 @@ bool create_target_cache_partial(const TargetWeights & w, } } - constexpr int TARGET_FEAT_CAP_DEFAULT = 4096; - out.target_feat_cap = std::min(max_ctx, TARGET_FEAT_CAP_DEFAULT); + // Feature ring cap. The drafter needs the last `cap` target hidden states + // as context. A cap smaller than the context makes the ring WRAP and the + // drafter reads stale features → acceptance collapses (0.1% at 27K with the + // old 4096 default). Default to the full reserved context so the ring never + // wraps; lower via DFLASH_FEAT_RING_CAP to save VRAM (ring = cap*fc_in*2B). + int target_feat_cap_default = max_ctx; + if (const char * e = std::getenv("DFLASH_FEAT_RING_CAP")) { + char * endp = nullptr; + long v = std::strtol(e, &endp, 10); + if (endp == e || *endp != '\0' || v <= 0) { + std::fprintf(stderr, + "[dflash] DFLASH_FEAT_RING_CAP='%s' is invalid or non-positive; " + "using default max_ctx=%d\n", e, max_ctx); + } else { + target_feat_cap_default = (int)std::min(v, (long)max_ctx); + } + } + out.target_feat_cap = std::min(max_ctx, target_feat_cap_default); if (allocate_target_feat) { const int fc_in = w.n_capture_layers * w.n_embd; out.target_feat = ggml_new_tensor_2d(out.base_ctx, GGML_TYPE_BF16, fc_in, out.target_feat_cap); @@ -1421,7 +1437,9 @@ QwenLayerPrefnOutputs build_qwen35_layer_prefn( bool snapshot_target_cache(const TargetWeights & w, const TargetCache & cache, ggml_backend_t backend, - PrefixSnapshot & snap) { + PrefixSnapshot & snap, + bool skip_kv, + const std::vector * kvflash_blob) { const int n_full_attn = w.n_layer / w.full_attention_interval; // 16 const int n_delta = w.n_layer - n_full_attn; // 48 const int snap_pos = cache.cur_pos; @@ -1431,14 +1449,20 @@ bool snapshot_target_cache(const TargetWeights & w, return false; } - // Reuse existing buffer if shapes match (same cur_pos); otherwise reallocate. + // Reuse existing buffer if shapes match (same cur_pos AND same pooled mode). // Right-sized KV tensors use [head_dim, cur_pos, n_head_kv] — orders of // magnitude smaller than [head_dim, max_ctx, n_head_kv] for short prefixes. - const bool needs_alloc = (snap.ctx == nullptr) || (snap.cur_pos != snap_pos); + // skip_kv: pooled path — KV lives in the pager blob; omit attn_k/v tensors. + // If pooled mode changed (skip_kv != snap.is_pooled) the buffer layout is + // wrong even at the same position — must reallocate. + const bool pooled_mismatch = (snap.ctx != nullptr) && (skip_kv != snap.is_pooled); + const bool needs_alloc = (snap.ctx == nullptr) || (snap.cur_pos != snap_pos) || pooled_mismatch; if (needs_alloc) { free_prefix_snapshot(snap); - const int total_tensors = 2 * n_full_attn + 2 * n_delta + 1; // 65 + const int kv_tensors = skip_kv ? 0 : 2 * n_full_attn; + const bool has_blob = (kvflash_blob != nullptr && !kvflash_blob->empty()); + const int total_tensors = kv_tensors + 2 * n_delta + 1 + (has_blob ? 1 : 0); ggml_init_params ip{}; ip.mem_size = (size_t)(total_tensors + 16) * ggml_tensor_overhead(); ip.mem_buffer = nullptr; @@ -1446,23 +1470,25 @@ bool snapshot_target_cache(const TargetWeights & w, snap.ctx = ggml_init(ip); if (!snap.ctx) { set_last_error("PrefixSnapshot ggml_init failed"); return false; } - snap.attn_k_snap.assign(n_full_attn, nullptr); - snap.attn_v_snap.assign(n_full_attn, nullptr); snap.ssm_state_snap.assign(n_delta, nullptr); snap.conv_state_snap.assign(n_delta, nullptr); - // Right-sized KV: [head_dim, snap_pos, n_head_kv] - for (int i = 0; i < n_full_attn; i++) { - ggml_tensor * sk = cache.attn_k[i]; - ggml_tensor * sv = cache.attn_v[i]; - if (!sk || !sv) continue; - ggml_tensor * K = ggml_new_tensor_3d(snap.ctx, sk->type, sk->ne[0], snap_pos, sk->ne[2]); - ggml_tensor * V = ggml_new_tensor_3d(snap.ctx, sv->type, sv->ne[0], snap_pos, sv->ne[2]); - char name[64]; - std::snprintf(name, sizeof(name), "snap_cache_k_%d", i); ggml_set_name(K, name); - std::snprintf(name, sizeof(name), "snap_cache_v_%d", i); ggml_set_name(V, name); - snap.attn_k_snap[i] = K; - snap.attn_v_snap[i] = V; + if (!skip_kv) { + snap.attn_k_snap.assign(n_full_attn, nullptr); + snap.attn_v_snap.assign(n_full_attn, nullptr); + // Right-sized KV: [head_dim, snap_pos, n_head_kv] + for (int i = 0; i < n_full_attn; i++) { + ggml_tensor * sk = cache.attn_k[i]; + ggml_tensor * sv = cache.attn_v[i]; + if (!sk || !sv) continue; + ggml_tensor * K = ggml_new_tensor_3d(snap.ctx, sk->type, sk->ne[0], snap_pos, sk->ne[2]); + ggml_tensor * V = ggml_new_tensor_3d(snap.ctx, sv->type, sv->ne[0], snap_pos, sv->ne[2]); + char name[64]; + std::snprintf(name, sizeof(name), "snap_cache_k_%d", i); ggml_set_name(K, name); + std::snprintf(name, sizeof(name), "snap_cache_v_%d", i); ggml_set_name(V, name); + snap.attn_k_snap[i] = K; + snap.attn_v_snap[i] = V; + } } // SSM / conv: full-size (position-independent recurrent state). @@ -1489,6 +1515,14 @@ bool snapshot_target_cache(const TargetWeights & w, snap.target_feat_snap = nullptr; } + // Pooled blob: stash pager-serialized KV as a named I8 tensor so the + // disk cache round-trips it without any DiskCacheHeader changes. + ggml_tensor * blob_t = nullptr; + if (has_blob) { + blob_t = ggml_new_tensor_1d(snap.ctx, GGML_TYPE_I8, (int64_t)kvflash_blob->size()); + ggml_set_name(blob_t, "snap_kvflash_blob"); + } + snap.buf = ggml_backend_alloc_ctx_tensors(snap.ctx, backend); if (!snap.buf) { set_last_error("ggml_backend_alloc_ctx_tensors failed for PrefixSnapshot"); @@ -1501,30 +1535,39 @@ bool snapshot_target_cache(const TargetWeights & w, snap.target_feat_snap = nullptr; return false; } - std::fprintf(stderr, "[snap] alloc right-sized: cur_pos=%d buf=%.2f MiB backend=%s\n", + std::fprintf(stderr, "[snap] alloc right-sized: cur_pos=%d buf=%.2f MiB backend=%s skip_kv=%d blob=%zu\n", snap_pos, (double)ggml_backend_buffer_get_size(snap.buf) / 1024.0 / 1024.0, - ggml_backend_name(backend)); + ggml_backend_name(backend), (int)skip_kv, + has_blob ? kvflash_blob->size() : (size_t)0); + + // Upload blob bytes into the allocated tensor. + if (has_blob && blob_t) { + ggml_backend_tensor_set(blob_t, kvflash_blob->data(), 0, kvflash_blob->size()); + } } // Copy KV strip-by-strip (right-sized snapshot is smaller than cache). - for (int i = 0; i < n_full_attn; i++) { - ggml_tensor * sk = cache.attn_k[i]; - ggml_tensor * dk = snap.attn_k_snap[i]; - ggml_tensor * sv = cache.attn_v[i]; - ggml_tensor * dv = snap.attn_v_snap[i]; - if (!sk || !dk || !sv || !dv) continue; - const size_t k_strip = (size_t)snap_pos * sk->nb[1]; - const size_t v_strip = (size_t)snap_pos * sv->nb[1]; - for (int kh = 0; kh < (int)sk->ne[2]; kh++) { - size_t src_off = (size_t)kh * sk->nb[2]; - size_t dst_off = (size_t)kh * dk->nb[2]; - ggml_backend_tensor_get(sk, (char *)dk->data + dst_off, src_off, k_strip); - } - for (int kh = 0; kh < (int)sv->ne[2]; kh++) { - size_t src_off = (size_t)kh * sv->nb[2]; - size_t dst_off = (size_t)kh * dv->nb[2]; - ggml_backend_tensor_get(sv, (char *)dv->data + dst_off, src_off, v_strip); + // skip_kv=true: KV serialized externally via pager; skip the D2H copy here. + if (!skip_kv) { + for (int i = 0; i < n_full_attn; i++) { + ggml_tensor * sk = cache.attn_k[i]; + ggml_tensor * dk = snap.attn_k_snap[i]; + ggml_tensor * sv = cache.attn_v[i]; + ggml_tensor * dv = snap.attn_v_snap[i]; + if (!sk || !dk || !sv || !dv) continue; + const size_t k_strip = (size_t)snap_pos * sk->nb[1]; + const size_t v_strip = (size_t)snap_pos * sv->nb[1]; + for (int kh = 0; kh < (int)sk->ne[2]; kh++) { + size_t src_off = (size_t)kh * sk->nb[2]; + size_t dst_off = (size_t)kh * dk->nb[2]; + ggml_backend_tensor_get(sk, (char *)dk->data + dst_off, src_off, k_strip); + } + for (int kh = 0; kh < (int)sv->ne[2]; kh++) { + size_t src_off = (size_t)kh * sv->nb[2]; + size_t dst_off = (size_t)kh * dv->nb[2]; + ggml_backend_tensor_get(sv, (char *)dv->data + dst_off, src_off, v_strip); + } } } @@ -1544,6 +1587,16 @@ bool snapshot_target_cache(const TargetWeights & w, ggml_backend_tensor_get(cache.target_feat, snap.target_feat_snap->data, 0, feat_nbytes); } + // Blob refresh on reuse path (same cur_pos, alloc skipped, blob content may differ). + if (kvflash_blob && !kvflash_blob->empty()) { + for (ggml_tensor * t = ggml_get_first_tensor(snap.ctx); t; t = ggml_get_next_tensor(snap.ctx, t)) { + if (std::strcmp(t->name, "snap_kvflash_blob") == 0) { + ggml_backend_tensor_set(t, kvflash_blob->data(), 0, kvflash_blob->size()); + break; + } + } + } + snap.cur_pos = snap_pos; snap.last_tok = cache.last_tok; snap.kv_k_type = cache.kv_k_type; @@ -1553,7 +1606,8 @@ bool snapshot_target_cache(const TargetWeights & w, return true; } -bool restore_target_cache(const PrefixSnapshot & snap, TargetCache & cache) { +bool restore_target_cache(const PrefixSnapshot & snap, TargetCache & cache, + bool skip_kv) { if (snap.kv_k_type != cache.kv_k_type) { set_last_error("restore_target_cache: kv_k_type mismatch"); return false; @@ -1562,48 +1616,54 @@ bool restore_target_cache(const PrefixSnapshot & snap, TargetCache & cache) { set_last_error("restore_target_cache: max_ctx mismatch"); return false; } - // Topology: snapshot must describe the same model layout the cache was - // allocated against. A mismatch (stale snapshot from a different daemon - // run, or a snap captured before a model swap) would index past - // cache.attn_k / .ssm_state / .conv_state and silently corrupt memory. - if (snap.attn_k_snap.size() != cache.attn_k.size() || - snap.attn_v_snap.size() != cache.attn_v.size() || - snap.ssm_state_snap.size() != cache.ssm_state.size() || + // Topology check for non-KV vectors (always present). + if (snap.ssm_state_snap.size() != cache.ssm_state.size() || snap.conv_state_snap.size() != cache.conv_state.size()) { set_last_error("restore_target_cache: layer-count mismatch (stale snapshot?)"); return false; } + if (!skip_kv) { + // KV vectors must also match when we intend to restore them. + if (snap.attn_k_snap.size() != cache.attn_k.size() || + snap.attn_v_snap.size() != cache.attn_v.size()) { + set_last_error("restore_target_cache: KV layer-count mismatch (stale snapshot?)"); + return false; + } + } if (snap.cur_pos < 0 || snap.cur_pos > cache.max_ctx) { set_last_error("restore_target_cache: snap.cur_pos out of range"); return false; } - const int n_full_attn = (int)snap.attn_k_snap.size(); - const int n_delta = (int)snap.ssm_state_snap.size(); - const int snap_pos = snap.cur_pos; + const int n_delta = (int)snap.ssm_state_snap.size(); + const int snap_pos = snap.cur_pos; // KV: strip-by-strip copy from right-sized snapshot into full-size cache. - for (int i = 0; i < n_full_attn; i++) { - ggml_tensor * sk = snap.attn_k_snap[i]; - ggml_tensor * dk = cache.attn_k[i]; - ggml_tensor * sv = snap.attn_v_snap[i]; - ggml_tensor * dv = cache.attn_v[i]; - if ((!sk || !sv) != (!dk || !dv)) { - set_last_error("restore_target_cache: KV shard layout mismatch"); - return false; - } - if (!sk || !dk || !sv || !dv) continue; - const size_t k_strip = (size_t)snap_pos * sk->nb[1]; - const size_t v_strip = (size_t)snap_pos * sv->nb[1]; - for (int kh = 0; kh < (int)sk->ne[2]; kh++) { - size_t src_off = (size_t)kh * sk->nb[2]; - size_t dst_off = (size_t)kh * dk->nb[2]; - ggml_backend_tensor_set(dk, (const char *)sk->data + src_off, dst_off, k_strip); - } - for (int kh = 0; kh < (int)sv->ne[2]; kh++) { - size_t src_off = (size_t)kh * sv->nb[2]; - size_t dst_off = (size_t)kh * dv->nb[2]; - ggml_backend_tensor_set(dv, (const char *)sv->data + src_off, dst_off, v_strip); + // skip_kv=true: KV is restored separately via pager deserialize. + if (!skip_kv) { + const int n_full_attn = (int)snap.attn_k_snap.size(); + for (int i = 0; i < n_full_attn; i++) { + ggml_tensor * sk = snap.attn_k_snap[i]; + ggml_tensor * dk = cache.attn_k[i]; + ggml_tensor * sv = snap.attn_v_snap[i]; + ggml_tensor * dv = cache.attn_v[i]; + if ((!sk || !sv) != (!dk || !dv)) { + set_last_error("restore_target_cache: KV shard layout mismatch"); + return false; + } + if (!sk || !dk || !sv || !dv) continue; + const size_t k_strip = (size_t)snap_pos * sk->nb[1]; + const size_t v_strip = (size_t)snap_pos * sv->nb[1]; + for (int kh = 0; kh < (int)sk->ne[2]; kh++) { + size_t src_off = (size_t)kh * sk->nb[2]; + size_t dst_off = (size_t)kh * dk->nb[2]; + ggml_backend_tensor_set(dk, (const char *)sk->data + src_off, dst_off, k_strip); + } + for (int kh = 0; kh < (int)sv->ne[2]; kh++) { + size_t src_off = (size_t)kh * sv->nb[2]; + size_t dst_off = (size_t)kh * dv->nb[2]; + ggml_backend_tensor_set(dv, (const char *)sv->data + src_off, dst_off, v_strip); + } } } @@ -1649,6 +1709,9 @@ void free_prefix_snapshot(PrefixSnapshot & snap) { snap.is_thin = false; snap.kv_start = 0; snap.kv_end = 0; + snap.is_pooled = false; + snap.kvflash_blob.clear(); + snap.kvflash_blob.shrink_to_fit(); } bool snapshot_target_cache_thin(const TargetWeights & w, diff --git a/server/src/qwen35moe/qwen35moe_backend.cpp b/server/src/qwen35moe/qwen35moe_backend.cpp index 48b8c879e..eb42c5da6 100644 --- a/server/src/qwen35moe/qwen35moe_backend.cpp +++ b/server/src/qwen35moe/qwen35moe_backend.cpp @@ -32,11 +32,46 @@ static uint64_t elapsed_us(HybridClock::time_point start, HybridClock::time_poin return (uint64_t) std::chrono::duration_cast(end - start).count(); } +static bool hybrid_spec_profile_enabled() { + static const bool enabled = []() { + const char * v = std::getenv("DFLASH_QWEN35MOE_SPEC_PROFILE"); + return v && std::atoi(v) != 0; + }(); + return enabled; +} + +static bool hybrid_spec_microbench_enabled() { + static const bool enabled = []() { + const char * v = std::getenv("DFLASH_QWEN35MOE_SPEC_MICROBENCH"); + return v && std::atoi(v) != 0; + }(); + return enabled; +} + } // namespace +struct Qwen35MoeBackend::HybridSpecBatchProfile { + uint64_t total_us = 0; + uint64_t prefn_graph_build_us = 0; + uint64_t prefn_compute_us = 0; + uint64_t prefn_ssm_compute_us = 0; + uint64_t prefn_attn_compute_us = 0; + uint64_t position_build_us = 0; + uint64_t mask_build_us = 0; + uint64_t routing_readback_us = 0; + uint64_t moe_ffn_us = 0; + uint64_t feature_capture_us = 0; + uint64_t lm_head_graph_build_us = 0; + uint64_t lm_head_compute_us = 0; +}; + +struct Qwen35MoeBackend::HybridSpecGraphCache {}; + Qwen35MoeBackend::Qwen35MoeBackend(const Qwen35Config & cfg) : Qwen35Backend(cfg) {} +Qwen35MoeBackend::~Qwen35MoeBackend() = default; + bool Qwen35MoeBackend::load_target_model(ggml_backend_t backend, TargetWeights & out) { // Phase 1: Load core model (non-expert tensors) to GPU. // Expert tensors get metadata descriptors but are NOT allocated on GPU. @@ -71,12 +106,13 @@ bool Qwen35MoeBackend::load_target_model(ggml_backend_t backend, TargetWeights & ? std::string("hotness:") + hotness_path : std::string("uniform"); - // If all experts fit on GPU, reload with experts included + // If all experts fit on GPU, reload with experts included. + // Record the placement result so post_kvflash_init_gate() can disable + // the kvflash pool — moe_hybrid will be null on this path (no cold storage + // needed), so the gate cannot detect all-hot from the hybrid pointer alone. if (placement.total_hot >= out.n_layer * out.n_expert) { std::printf("[qwen35moe] all experts fit in VRAM, loading fully to GPU\n"); std::fflush(stdout); - // Record the placement result so post_kvflash_init_gate() can disable - // the KVFlash pool (moe_hybrid is null on this all-hot path). placement_all_hot_ = true; free_target_weights(out); return load_target_gguf(cfg_.target_path, backend, out); @@ -313,6 +349,13 @@ bool Qwen35MoeBackend::rebuild_hybrid_from_placement(const MoeHybridPlacement & return true; } +bool Qwen35MoeBackend::park(const std::string & what) { + // Delegate to base class. The hybrid-logits step-graph is created inline + // (not cached persistently) in this branch, so no additional teardown is + // needed here to avoid use-after-free on unpark. + return Qwen35Backend::park(what); +} + bool Qwen35MoeBackend::spark_bootstrap_finalize(const std::string & profile_path) { if (!spark_wants_bootstrap()) return false; std::string err; @@ -334,22 +377,41 @@ bool Qwen35MoeBackend::spark_bootstrap_finalize(const std::string & profile_path bool Qwen35MoeBackend::post_kvflash_init_gate() { // Gate: disable the KVFlash pool when dynamic placement confirmed all experts - // fit hot even with the FULL max_ctx KV reservation — the pool then reserves - // nothing useful (pure slot-map overhead). placement_all_hot_full_kv_ is set - // in load_dynamic_placement(). When the pool is what KEEPS experts hot - // (placement_all_hot_ true but _full_kv_ false), we must NOT disable it. + // are hot (0 cold experts). When all-hot, load_target_model takes the + // early-return path (load_target_gguf, no moe_hybrid), so moe_hybrid is null. + // The old byte-math fit check cannot fire on that path — it bails on + // !moe_hybrid. Instead we use placement_all_hot_, set by load_target_model + // exactly when placement.total_hot >= n_layer * n_expert. + // + // Secondary path: if moe_hybrid IS set (partial cold), count actual cold + // experts. If somehow 0 cold made it through (shouldn't happen, but be safe), + // also gate off. if (!kvflash_active()) return true; bool should_disable = false; + if (placement_all_hot_full_kv_) { + // KVFlash genuinely redundant: all experts fit hot even with the FULL + // max_ctx KV reservation, so the pool reserves nothing useful — disable + // it (pool would be pure slot-map overhead). NOTE: placement_all_hot_ + // (without _full_kv_) can be true because the POOL freed the VRAM that + // kept experts hot — in that case we must KEEP kvflash, hence the check + // on _full_kv_ here, not placement_all_hot_. should_disable = true; } else if (target_weights().moe_hybrid) { + // Partial placement: count actual cold experts from the hybrid storage. int total_cold = 0; for (const auto & ls : target_weights().moe_hybrid->layers) { total_cold += (int)ls.cold_expert_ids.size(); } - if (total_cold == 0) should_disable = true; // hybrid built but 0 cold + if (total_cold == 0) { + // Hybrid storage built but no cold experts — pool not needed. + should_disable = true; + } + // else: genuine cold experts present → kvflash stays active. } + // else: moe_hybrid null AND placement_all_hot_ false → dense model (no MoE); + // the base class gate is a no-op for dense, leave kvflash as-is. if (should_disable) { std::printf("[kvflash] disabled: placement all-hot at max_ctx %d, pool not needed\n", @@ -358,6 +420,7 @@ bool Qwen35MoeBackend::post_kvflash_init_gate() { kvflash_tokens_ = 0; kvflash_tau_ = 64; kvflash_drafter_path_.clear(); + kvflash_pin_spans_.clear(); } return true; } @@ -448,8 +511,15 @@ bool Qwen35MoeBackend::run_pipelined_decode_path(int committed, int n_gen, // Persistent logits graph (built once, reused per token). // Precision policy (#310): keep the rms-norm input and out_norm in f32. + // GPU argmax: for temp==0 greedy path, argmax is computed on GPU so only + // 4 bytes (token id) are read back instead of the full ~150K-float vocab. + // Escape hatch: DFLASH_GPU_ARGMAX=0 disables and falls back to full D2H. + static const bool kGpuArgmaxMoe = []() { + const char * v = std::getenv("DFLASH_GPU_ARGMAX"); + return v == nullptr || v[0] != '0'; + }(); StepGraph logits_sg; - auto project_logits = [&]() -> bool { + auto project_logits = [&](bool need_full_logits) -> bool { if (!logits_sg.ctx) { ggml_init_params ip{}; ip.mem_size = 64 * 1024 * 1024; @@ -469,6 +539,11 @@ bool Qwen35MoeBackend::run_pipelined_decode_path(int committed, int n_gen, logits_sg.logits = ggml_mul_mat(logits_sg.ctx, target_weights().output, normed); ggml_set_output(logits_sg.logits); ggml_build_forward_expand(logits_sg.gf, logits_sg.logits); + // GPU-side argmax: reduces vocab→1 int32 on GPU, avoids 150K-float D2H. + logits_sg.argmax_tokens = ggml_argmax(logits_sg.ctx, logits_sg.logits); + ggml_set_name(logits_sg.argmax_tokens, "moe_ar_argmax"); + ggml_set_output(logits_sg.argmax_tokens); + ggml_build_forward_expand(logits_sg.gf, logits_sg.argmax_tokens); logits_sg.alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(target_backend())); if (!ggml_gallocr_alloc_graph(logits_sg.alloc, logits_sg.gf)) { step_graph_destroy(logits_sg); @@ -480,7 +555,9 @@ bool Qwen35MoeBackend::run_pipelined_decode_path(int committed, int n_gen, pipe_state_->gpu_state.act_cur, logits_sg.hidden_input); auto st = ggml_backend_graph_compute(target_backend(), logits_sg.gf); if (st != GGML_STATUS_SUCCESS) return false; - ggml_backend_tensor_get(logits_sg.logits, logits_buf.data(), 0, sizeof(float) * (size_t)vocab); + if (need_full_logits) { + ggml_backend_tensor_get(logits_sg.logits, logits_buf.data(), 0, sizeof(float) * (size_t)vocab); + } return true; }; @@ -488,9 +565,17 @@ bool Qwen35MoeBackend::run_pipelined_decode_path(int committed, int n_gen, { int32_t first_tok; if (sampler_config().temp > 0) { - if (!prefill_logits_valid()) return false; - ggml_backend_tensor_get(target_step_graph().logits, logits_buf.data(), - prefill_logits_offset(), sizeof(float) * (size_t)vocab); + // MoE restore path: pipe_state_ holds valid act_cur on GPU from the last + // forward (delta prefill or exact-hit priming). Use project_logits to sample. + // Dense path (pipe_state_ == nullptr): fall back to dense step-graph logits. + if (pipe_state_) { + if (!project_logits(/*need_full_logits=*/true)) return false; + } else if (prefill_logits_valid()) { + ggml_backend_tensor_get(target_step_graph().logits, logits_buf.data(), + prefill_logits_offset(), sizeof(float) * (size_t)vocab); + } else { + return false; + } first_tok = sample_logits(logits_buf.data(), vocab, sampler_config(), out_tokens, sampler_rng_engine()); } else { @@ -541,18 +626,28 @@ bool Qwen35MoeBackend::run_pipelined_decode_path(int committed, int n_gen, } const auto layers_done = DecodeClock::now(); - // act_cur stays on GPU — project_logits reads it via GPU→GPU copy - if (!project_logits()) { + // act_cur stays on GPU — project_logits reads it via GPU→GPU copy. + // For greedy/temp==0, pass need_full_logits=false: full D2H is skipped, + // only the 4-byte GPU argmax result is read back below. + const bool need_full = sampler_config().needs_logit_processing(); + if (!project_logits(need_full)) { step_graph_destroy(logits_sg); return false; } const auto logits_done = DecodeClock::now(); int32_t next_tok; - if (sampler_config().temp > 0) { + if (need_full) { next_tok = sample_logits(logits_buf.data(), vocab, sampler_config(), out_tokens, sampler_rng_engine()); + } else if (kGpuArgmaxMoe && logits_sg.argmax_tokens) { + // GPU argmax: read 4 bytes instead of ~600 KB vocab logits D2H. + int32_t tok_i = 0; + ggml_backend_tensor_get(logits_sg.argmax_tokens, &tok_i, 0, sizeof(int32_t)); + next_tok = tok_i; } else { + // Fallback: full D2H + CPU argmax (should not reach with GPU argmax enabled). + ggml_backend_tensor_get(logits_sg.logits, logits_buf.data(), 0, sizeof(float) * (size_t)vocab); next_tok = 0; float best = logits_buf[0]; for (int j = 1; j < vocab; ++j) { @@ -770,18 +865,33 @@ GenerateResult Qwen35MoeBackend::generate_impl(const GenerateRequest & req, const int prompt_len = (int)req.prompt.size(); const int prefill_chunk = std::min(128, prompt_len); // batch size per GPU compute - // kvflash: hybrid prefill writes rows identity-mapped, so the prompt must - // fit the pool with one chunk of decode headroom (same contract as the - // base do_prefill). - if (kvflash_active() && - prompt_len > kvflash_tokens_ - kvflash_pager_.chunk_tokens()) { - std::fprintf(stderr, - "[kvflash] hybrid prompt (%d) exceeds pool %d; raise --kvflash " - "or enable pflash compression\n", prompt_len, kvflash_tokens_); - result.error = "kvflash: prompt exceeds resident pool"; - cleanup_graphs(); - return result; - } + // kvflash: if the prompt fits the pool, prefill is identity-mapped (normal). + // If it exceeds the pool, switch to pooled chunked prefill: loop + // hybrid_forward_batch over chunk_tokens-sized slices with live eviction. + // Restore + pooled prefill is refused (same contract as the dense do_prefill). + int committed = 0; + const bool kvf_paged = kvflash_active() && + prompt_len > kvflash_tokens_ - kvflash_pager_.chunk_tokens(); + if (kvf_paged) { + kvflash_pager_.reset(); + // Apply pins BEFORE the eviction loop so pinned chunks survive + // paging from the very first chunk allocation. + apply_kvflash_pins(); + std::printf("[kvflash] hybrid pooled prefill: %d tokens " + "(%d-token chunks, evicting)\n", + prompt_len, kvflash_pager_.chunk_tokens()); + std::fflush(stdout); + if (!chunked_prefill(req.prompt.data(), 0, prompt_len, act_cur, committed)) { + result.error = "kvf_paged_prefill"; + cleanup_graphs(); + return result; + } + kvflash_history_.assign(req.prompt.begin(), req.prompt.end()); + kvflash_pager_.zero_free_blocks(); + kvflash_mask_epoch_ = (uint64_t)-1; + // Re-apply pins (zero_free_blocks doesn't touch pinned_ but be explicit) + apply_kvflash_pins(); + } else { // Embed all prompt tokens const int n_expert_used = target_weights().n_expert_used; @@ -830,11 +940,13 @@ GenerateResult Qwen35MoeBackend::generate_impl(const GenerateRequest & req, // Set positions if attention layer if (prefill_sg.positions) { std::vector pos_data((size_t)chunk_len * 4); + // planar layout: pos[i2], pos[i2+ne2], pos[i2+ne2*2], pos[i2+ne2*3] for (int i = 0; i < chunk_len; ++i) { - pos_data[(size_t)i * 4 + 0] = chunk_start + i; - pos_data[(size_t)i * 4 + 1] = chunk_start + i; - pos_data[(size_t)i * 4 + 2] = chunk_start + i; - pos_data[(size_t)i * 4 + 3] = 0; + const int pos = chunk_start + i; + pos_data[(size_t)0 * chunk_len + i] = pos; + pos_data[(size_t)1 * chunk_len + i] = pos; + pos_data[(size_t)2 * chunk_len + i] = pos; + pos_data[(size_t)3 * chunk_len + i] = 0; } ggml_backend_tensor_set(prefill_sg.positions, pos_data.data(), 0, sizeof(int32_t) * pos_data.size()); } @@ -1029,11 +1141,12 @@ GenerateResult Qwen35MoeBackend::generate_impl(const GenerateRequest & req, std::memcpy(act_cur.data(), embed_all.data() + (size_t)(prompt_len - 1) * (size_t)hidden, sizeof(float) * (size_t)hidden); - int committed = prompt_len; + committed = prompt_len; target_cache().cur_pos = committed; if (kvflash_active()) { kvflash_sync_prefill(committed, req.prompt, /*kv_offset=*/0); } + } // end else (non-paged prefill) auto t_prefill_end = std::chrono::steady_clock::now(); result.prefill_s = std::chrono::duration(t_prefill_end - t_prefill_start).count(); @@ -1358,17 +1471,23 @@ GenerateResult Qwen35MoeBackend::restore_and_generate_impl(int slot, return result; } + // Only the delta case (prompt_len > snap_pos) is served by the fast restore + // path: the residual prefill below replays the new tokens and leaves valid + // first-token logits at the correct position for any temperature. When there + // is no delta (prompt_len <= snap_pos: an exact-hit turn that adds no tokens, + // or a snapshot longer than the prompt) the snapshot carries no first-token + // logits, and priming by re-forwarding would double-advance the DeltaNet + // recurrent state (off-by-one corruption). These cases are rare; serve them + // with a correct full generate instead. + if ((int)req.prompt.size() <= snapshot_cur_pos(slot)) { + return generate_impl(req, io); + } + sampler_config() = req.sampler; if (req.do_sample && sampler_config().seed != 0) { sampler_rng_engine().seed(sampler_config().seed); } - if (req.n_gen > 0 && sampler_config().temp > 0) { - std::fprintf(stderr, - "[qwen35moe] hybrid snapshot restore falls back to full generate for temp sampling\n"); - return generate_impl(req, io); - } - pipe_state_.reset(); for (auto & layer : target_weights().moe_hybrid->layers) { layer.hot_graph.free(); @@ -1381,6 +1500,7 @@ GenerateResult Qwen35MoeBackend::restore_and_generate_impl(int slot, return result; } + const bool snap_pooled = snapshot_is_pooled(slot); const int snap_pos = snapshot_cur_pos(slot); int committed = snap_pos; target_cache().cur_pos = committed; @@ -1392,55 +1512,57 @@ GenerateResult Qwen35MoeBackend::restore_and_generate_impl(int slot, return result; } - // kvflash: the restored prefix + delta prefill land identity-mapped, so - // the full prompt must fit the pool (snapshots past the pool are never - // saved, but the delta can still overflow it). - if (kvflash_active() && - prompt_len > kvflash_tokens_ - kvflash_pager_.chunk_tokens()) { - std::fprintf(stderr, - "[kvflash] hybrid restore prompt (%d) exceeds pool %d; raise " - "--kvflash\n", prompt_len, kvflash_tokens_); - result.error = "kvflash: prompt exceeds resident pool"; - out_io.emit(-1); - return result; - } - - // kvflash: the delta prefill below runs the maskless pipelined forward - // over the padded pool span; map the restored prefix identity-style and - // zero stale free slots BEFORE any forward reads them. if (kvflash_active()) { - kvflash_pager_.reset(); - if (!kvflash_pager_.alloc_span(0, snap_pos)) { - result.error = "kvflash_slot"; - out_io.emit(-1); - return result; + if (snap_pooled) { + // Pooled restore: deserialize rebuilds the full page table (page-in + // on demand via slot_for); no reset()+alloc_span needed. + const auto & blob = snapshot_kvflash_blob(slot); + if (!kvflash_pager_.deserialize(blob.data(), blob.size())) { + result.error = "kvflash: pager deserialize failed"; + out_io.emit(-1); + return result; + } + apply_kvflash_pins(); + } else { + // Identity-mapped restore: the prefix fits the pool contiguously. + if (prompt_len > kvflash_tokens_ - kvflash_pager_.chunk_tokens()) { + std::fprintf(stderr, + "[kvflash] hybrid restore prompt (%d) exceeds pool %d; raise " + "--kvflash\n", prompt_len, kvflash_tokens_); + result.error = "kvflash: prompt exceeds resident pool"; + out_io.emit(-1); + return result; + } + kvflash_pager_.reset(); + if (!kvflash_pager_.alloc_span(0, snap_pos)) { + result.error = "kvflash_slot"; + out_io.emit(-1); + return result; + } + kvflash_pager_.zero_free_blocks(); + apply_kvflash_pins(); } - kvflash_pager_.zero_free_blocks(); } const int hidden = target_weights().n_embd; std::vector act_cur((size_t)hidden); if (prompt_len > snap_pos) { auto t_prefill_start = std::chrono::steady_clock::now(); - for (int i = snap_pos; i < prompt_len; ++i) { - int32_t argmax = -1; - if (!hybrid_forward_one_token(req.prompt[(size_t)i], committed, - act_cur, argmax)) { - result.error = "prefill_delta"; - return result; - } - committed++; - target_cache().cur_pos = committed; - target_cache().last_tok = argmax; + // Residual delta-prefill [snap_pos, prompt_len): batched (~1 s vs ~34 s + // per-token). hybrid_forward_batch's alloc_span/slot_of populate the + // page table for decode; [0, snap_pos) is already mapped above. + if (!chunked_prefill(req.prompt.data(), snap_pos, prompt_len, act_cur, committed)) { + result.error = "prefill_delta"; + return result; } result.prefill_s = std::chrono::duration( std::chrono::steady_clock::now() - t_prefill_start).count(); } - if (kvflash_active()) { + if (kvflash_active() && !snap_pooled) { // Rebuild the pager mapping over the identity-mapped [0, committed). - // With the full prompt available the history carries real ids; - // restore-only generates keep an unknown-prefix history. + // Pooled restore: pager state already correct from deserialize; sync + // would reset() and clobber the page table, so skip it. if (prompt_len == committed) { kvflash_sync_prefill(committed, req.prompt, /*kv_offset=*/0); } else { @@ -1448,6 +1570,17 @@ GenerateResult Qwen35MoeBackend::restore_and_generate_impl(int slot, } } + // Re-sync the draft feature mirror so spec-decode after restore sees valid features. + // Mirrors what generate_impl does before entering the spec-decode branch. + if (!is_draft_parked() && feature_mirror().target_feat && target_cache().target_feat) { + const int ring_cap = feature_mirror().cap; + const int n = std::min(committed, ring_cap); + const int start = std::max(0, committed - n); + draft_feature_mirror_sync_range(target_cache().target_feat, + target_cache().target_feat_cap, + feature_mirror(), start, n); + } + if (req.n_gen > 0) { if (target_cache().last_tok < 0) { std::fprintf(stderr, @@ -1455,9 +1588,35 @@ GenerateResult Qwen35MoeBackend::restore_and_generate_impl(int slot, return generate_impl(req, io); } auto t_decode_start = std::chrono::steady_clock::now(); - if (!run_pipelined_decode_path(committed, req.n_gen, result.tokens, out_io)) { - result.error = "decode"; - return result; + + // Note: the no-delta (prompt_len == snap_pos) case is handled by the + // early full-generate fallback above, so here prompt_len > snap_pos and + // the residual prefill loop has left pipe_state_ valid + act_cur populated + // from the last delta token — correct first-token logits for any temp. + + // Dispatch: spec-decode for temp==0 + draft, AR pipelined otherwise. + // Mirrors generate_impl's can_hybrid_spec condition exactly. + const bool can_hybrid_spec = !req.force_ar_decode + && cfg_.draft_path + && !is_draft_parked() + && feature_mirror().target_feat + && sampler_config().temp == 0.0f + && draft_weights().block_size > 0; + + if (can_hybrid_spec) { + result.spec_decode_ran = true; + // last_tok is already set from snapshot (or delta loop); do_hybrid_spec_decode + // uses it as the anchor for drafting. No compute_logits() call needed. + if (!do_hybrid_spec_decode(committed, req.n_gen, result.tokens, out_io, + &result.accept_rate)) { + result.error = "hybrid_spec_decode"; + return result; + } + } else { + if (!run_pipelined_decode_path(committed, req.n_gen, result.tokens, out_io)) { + result.error = "decode"; + return result; + } } result.decode_s = std::chrono::duration( std::chrono::steady_clock::now() - t_decode_start).count(); @@ -1566,6 +1725,29 @@ bool Qwen35MoeBackend::hybrid_forward_one_token(int32_t tok, int kv_pos, return true; } +// Chunked prefill of tokens[start, end) via hybrid_forward_batch slices. +// Advances committed / cur_pos / last_tok. Returns false on forward failure +// (caller owns result.error + any cleanup). Shared by generate_impl's +// kvf_paged branch and restore_and_generate_impl's residual delta. +bool Qwen35MoeBackend::chunked_prefill(const int32_t * tokens, int start_pos, + int end_pos, std::vector & act_cur, + int & committed) { + const int ct = kvflash_active() + ? kvflash_pager_.chunk_tokens() + : std::min(128, end_pos - start_pos); + std::vector chunk_argmax; + for (int start = start_pos; start < end_pos; start += ct) { + const int n = std::min(ct, end_pos - start); + if (!hybrid_forward_batch(tokens + start, n, start, act_cur, chunk_argmax, + /*capture_features=*/true)) + return false; + committed = start + n; + target_cache().cur_pos = committed; + target_cache().last_tok = chunk_argmax.back(); + } + return true; +} + // ── Batched hybrid forward ───────────────────────────────────────────────── // Processes all tokens layer-by-layer using the same approach as prefill: // per layer: build_layer_prefn_step (DeltaNet + router) → MoE FFN (batched) @@ -1581,6 +1763,8 @@ bool Qwen35MoeBackend::hybrid_forward_batch( const int hidden = target_weights().n_embd; const int n_layer = target_weights().n_layer; const int n_expert_used = target_weights().n_expert_used; + HybridSpecBatchProfile prof{}; + const auto batch_t0 = HybridClock::now(); // Embed all tokens std::vector embed_all((size_t)n_tokens * (size_t)hidden); @@ -1619,15 +1803,21 @@ bool Qwen35MoeBackend::hybrid_forward_batch( ggml_gallocr_t ffn_hot_alloc = nullptr; MoeHybridConfig chunk_cfg = make_moe_hybrid_config(target_weights()); + std::vector chunk_residuals((size_t)n_tokens * (size_t)hidden); + std::vector chunk_post((size_t)n_tokens * (size_t)hidden); + std::vector chunk_selected((size_t)n_tokens * (size_t)n_expert_used); + std::vector chunk_weights((size_t)n_tokens * (size_t)n_expert_used); for (int il = 0; il < n_layer; ++il) { auto & storage = target_weights().moe_hybrid->layers[(size_t)il]; + const bool is_attn = (((il + 1) % target_weights().full_attention_interval) == 0); const bool with_mask = kvf || (cfg_.kq_stride_pad > KQ_MASK_PAD) || (n_tokens > 1); // Build pre-FFN graph (DeltaNet/attention + router) for all tokens step_graph_free(prefn_sg); + const auto prefn_build_t0 = HybridClock::now(); if (!build_layer_prefn_step(prefn_sg, target_weights(), target_cache(), target_backend(), il, /*kv_start=*/base_pos, n_tokens, with_mask, /*fa_window=*/0, cfg_.kq_stride_pad, @@ -1636,6 +1826,7 @@ bool Qwen35MoeBackend::hybrid_forward_batch( if (ffn_hot_alloc) ggml_gallocr_free(ffn_hot_alloc); return false; } + prof.prefn_graph_build_us += elapsed_us(prefn_build_t0, HybridClock::now()); if (prefn_sg.kv_write_rows) { ggml_backend_tensor_set(prefn_sg.kv_write_rows, kvf_rows.data(), 0, sizeof(int64_t) * kvf_rows.size()); @@ -1647,19 +1838,24 @@ bool Qwen35MoeBackend::hybrid_forward_batch( // Set positions for attention layers if (prefn_sg.positions) { + const auto pos_t0 = HybridClock::now(); std::vector pos_data((size_t)n_tokens * 4); + // planar layout: pos[i2], pos[i2+ne2], pos[i2+ne2*2], pos[i2+ne2*3] for (int i = 0; i < n_tokens; ++i) { - pos_data[(size_t)i * 4 + 0] = base_pos + i; - pos_data[(size_t)i * 4 + 1] = base_pos + i; - pos_data[(size_t)i * 4 + 2] = base_pos + i; - pos_data[(size_t)i * 4 + 3] = 0; + const int pos = base_pos + i; + pos_data[(size_t)0 * n_tokens + i] = pos; + pos_data[(size_t)1 * n_tokens + i] = pos; + pos_data[(size_t)2 * n_tokens + i] = pos; + pos_data[(size_t)3 * n_tokens + i] = 0; } ggml_backend_tensor_set(prefn_sg.positions, pos_data.data(), 0, sizeof(int32_t) * pos_data.size()); + prof.position_build_us += elapsed_us(pos_t0, HybridClock::now()); } // Set causal mask if (prefn_sg.attn_mask && kvf) { + const auto mask_t0 = HybridClock::now(); // Slot-space mask (verify_batch recipe): committed resident // positions (< base_pos) plus this block's own slots, causal. // Built once, reused for every layer's graph. @@ -1688,7 +1884,9 @@ bool Qwen35MoeBackend::hybrid_forward_batch( } ggml_backend_tensor_set(prefn_sg.attn_mask, kvf_mask.data(), 0, sizeof(uint16_t) * kvf_mask.size()); + prof.mask_build_us += elapsed_us(mask_t0, HybridClock::now()); } else if (prefn_sg.attn_mask) { + const auto mask_t0 = HybridClock::now(); const int kv_len = base_pos + n_tokens; const int kv_pad_override = (int)prefn_sg.attn_mask->ne[0]; std::vector mask_buf; @@ -1696,22 +1894,24 @@ bool Qwen35MoeBackend::hybrid_forward_batch( cfg_.kq_stride_pad, /*win_start=*/0, kv_pad_override); ggml_backend_tensor_set(prefn_sg.attn_mask, mask_buf.data(), 0, sizeof(uint16_t) * mask_buf.size()); + prof.mask_build_us += elapsed_us(mask_t0, HybridClock::now()); } // Compute pre-FFN (DeltaNet + router for all tokens in one dispatch) + const auto prefn_compute_t0 = HybridClock::now(); auto st = ggml_backend_graph_compute(target_backend(), prefn_sg.gf); if (st != GGML_STATUS_SUCCESS) { step_graph_destroy(prefn_sg); if (ffn_hot_alloc) ggml_gallocr_free(ffn_hot_alloc); return false; } + const uint64_t prefn_compute_us = elapsed_us(prefn_compute_t0, HybridClock::now()); + prof.prefn_compute_us += prefn_compute_us; + if (is_attn) prof.prefn_attn_compute_us += prefn_compute_us; + else prof.prefn_ssm_compute_us += prefn_compute_us; // Readback results - std::vector chunk_residuals((size_t)n_tokens * (size_t)hidden); - std::vector chunk_post((size_t)n_tokens * (size_t)hidden); - std::vector chunk_selected((size_t)n_tokens * (size_t)n_expert_used); - std::vector chunk_weights((size_t)n_tokens * (size_t)n_expert_used); - + const auto routing_t0 = HybridClock::now(); ggml_backend_tensor_get(prefn_sg.ffn_residual, chunk_residuals.data(), 0, sizeof(float) * chunk_residuals.size()); ggml_backend_tensor_get(prefn_sg.ffn_post, chunk_post.data(), 0, @@ -1728,11 +1928,13 @@ bool Qwen35MoeBackend::hybrid_forward_batch( sizeof(int32_t) * chunk_selected.size()); ggml_backend_tensor_get(prefn_sg.moe_weights, chunk_weights.data(), 0, sizeof(float) * chunk_weights.size()); + prof.routing_readback_us += elapsed_us(routing_t0, HybridClock::now()); // MoE FFN — batched MoeLayerDesc chunk_desc = make_moe_layer_desc(target_weights().layers[(size_t)il]); std::vector ffn_batch_out; bool ffn_ok = false; + const auto moe_t0 = HybridClock::now(); // Spark expert cache: pull the verify batch's selected cold experts into // spare GPU slots (LRU) so the batched FFN serves them on-die — the SAME @@ -1782,6 +1984,7 @@ bool Qwen35MoeBackend::hybrid_forward_batch( single_out.data(), sizeof(float) * (size_t)hidden); } } + prof.moe_ffn_us += elapsed_us(moe_t0, HybridClock::now()); // Combine FFN + residual → embed_all for next layer for (int i = 0; i < n_tokens; ++i) { @@ -1794,6 +1997,7 @@ bool Qwen35MoeBackend::hybrid_forward_batch( // Feature capture at capture layers if (capture_features && target_cache().target_feat && cfg_.draft_path) { + const auto feat_t0 = HybridClock::now(); int capture_idx = -1; for (int k = 0; k < target_weights().n_capture_layers; k++) { if (target_weights().capture_layer_ids[k] == il) { @@ -1814,6 +2018,7 @@ bool Qwen35MoeBackend::hybrid_forward_batch( ggml_backend_tensor_set(target_cache().target_feat, bf16_tmp.data(), offset, (size_t)hidden * elt); } + prof.feature_capture_us += elapsed_us(feat_t0, HybridClock::now()); } } } @@ -1830,11 +2035,14 @@ bool Qwen35MoeBackend::hybrid_forward_batch( // replay forwards (vocab ~152k x n_tokens x 4B, twice per spec step). argmax_out.resize(n_tokens); StepGraph proj_sg; + const auto lm_build_t0 = HybridClock::now(); if (!build_lm_head_projection_step(proj_sg, target_weights(), target_backend(), n_tokens)) { return false; } + prof.lm_head_graph_build_us += elapsed_us(lm_build_t0, HybridClock::now()); ggml_backend_tensor_set(proj_sg.hidden_input, embed_all.data(), 0, sizeof(float) * (size_t)n_tokens * (size_t)hidden); + const auto lm_compute_t0 = HybridClock::now(); auto proj_st = ggml_backend_graph_compute(target_backend(), proj_sg.gf); if (proj_st != GGML_STATUS_SUCCESS) { step_graph_destroy(proj_sg); @@ -1842,7 +2050,31 @@ bool Qwen35MoeBackend::hybrid_forward_batch( } ggml_backend_tensor_get(proj_sg.argmax_tokens, argmax_out.data(), 0, sizeof(int32_t) * (size_t)n_tokens); + prof.lm_head_compute_us += elapsed_us(lm_compute_t0, HybridClock::now()); step_graph_destroy(proj_sg); + prof.total_us = elapsed_us(batch_t0, HybridClock::now()); + + if (hybrid_spec_profile_enabled()) { + std::fprintf(stderr, + "[hybrid-spec-prof][batch] n_tokens=%d total=%.3fms prefn_build=%.3fms " + "prefn_compute=%.3fms ssm_prefn=%.3fms attn_prefn=%.3fms " + "positions=%.3fms masks=%.3fms routing_readback=%.3fms " + "moe_ffn=%.3fms feature_capture=%.3fms lm_head_build=%.3fms " + "lm_head_compute=%.3fms\n", + n_tokens, + prof.total_us / 1000.0, + prof.prefn_graph_build_us / 1000.0, + prof.prefn_compute_us / 1000.0, + prof.prefn_ssm_compute_us / 1000.0, + prof.prefn_attn_compute_us / 1000.0, + prof.position_build_us / 1000.0, + prof.mask_build_us / 1000.0, + prof.routing_readback_us / 1000.0, + prof.moe_ffn_us / 1000.0, + prof.feature_capture_us / 1000.0, + prof.lm_head_graph_build_us / 1000.0, + prof.lm_head_compute_us / 1000.0); + } return true; } @@ -1984,18 +2216,28 @@ bool Qwen35MoeBackend::do_hybrid_spec_decode(int committed, int n_gen, } draft_tok[0] = last_tok; - // 4. Verify: snapshot recurrent state, then run ALL draft tokens batched + // 4. Verify: snapshot recurrent state, then run draft tokens via pipelined AR path. + // Sequential single-token pipelined forward (~0.9ms/tok) vs batched hybrid (~4.5ms/tok). + // Feature capture suppressed during verify (positions would overwrite valid prefill cache). snapshot_ssm_state(target_cache()); target_tok.resize(verify_width); - bool verify_ok = hybrid_forward_batch( - draft_tok.data(), verify_width, committed, - act_cur, target_tok, /*capture_features=*/false); - if (!verify_ok) { - std::fprintf(stderr, "[hybrid-spec] verify failed\n"); - restore_ssm_state(target_cache()); - step_graph_destroy(draft_sg); - return false; + { + ggml_tensor * saved_feat = target_cache().target_feat; + target_cache().target_feat = nullptr; // suppress feature capture during verify + bool verify_ok = true; + for (int i = 0; i < verify_width && verify_ok; i++) { + int32_t argmax; + verify_ok = hybrid_forward_one_token(draft_tok[i], committed + i, act_cur, argmax); + if (verify_ok) target_tok[i] = argmax; + } + target_cache().target_feat = saved_feat; + if (!verify_ok) { + std::fprintf(stderr, "[hybrid-spec] verify failed\n"); + restore_ssm_state(target_cache()); + step_graph_destroy(draft_sg); + return false; + } } // 5. Acceptance: longest matching prefix @@ -2012,7 +2254,9 @@ bool Qwen35MoeBackend::do_hybrid_spec_decode(int committed, int n_gen, if (commit_n <= accept_n) bonus_tok = -1; } - // 6. Restore and replay accepted tokens + // 6. Restore SSM state and replay accepted tokens via pipelined path. + // Replay overwrites KV at committed..committed+commit_n-1 with correct values + // and captures features for the next draft step. restore_ssm_state(target_cache()); std::vector replay_tok((size_t)commit_n); @@ -2020,15 +2264,17 @@ bool Qwen35MoeBackend::do_hybrid_spec_decode(int committed, int n_gen, replay_tok[i] = (i < accept_n) ? draft_tok[i] : bonus_tok; } - // Replay tokens through batched hybrid forward (captures features for next draft step) - std::vector replay_argmax; - if (!hybrid_forward_batch(replay_tok.data(), commit_n, committed, - act_cur, replay_argmax, /*capture_features=*/true)) { - std::fprintf(stderr, "[hybrid-spec] replay failed\n"); - step_graph_destroy(draft_sg); - return false; + { + int32_t last_argmax = last_tok; + for (int i = 0; i < commit_n; i++) { + if (!hybrid_forward_one_token(replay_tok[i], committed + i, act_cur, last_argmax)) { + std::fprintf(stderr, "[hybrid-spec] replay failed\n"); + step_graph_destroy(draft_sg); + return false; + } + } + last_tok = last_argmax; } - last_tok = replay_argmax[commit_n - 1]; // 7. Sync features to mirror for next draft step if (feature_mirror().target_feat && target_cache().target_feat) { diff --git a/server/src/qwen35moe/qwen35moe_backend.h b/server/src/qwen35moe/qwen35moe_backend.h index a731b4f7a..83aec71eb 100644 --- a/server/src/qwen35moe/qwen35moe_backend.h +++ b/server/src/qwen35moe/qwen35moe_backend.h @@ -20,7 +20,7 @@ namespace dflash::common { class Qwen35MoeBackend : public Qwen35Backend { public: explicit Qwen35MoeBackend(const Qwen35Config & cfg); - ~Qwen35MoeBackend() override = default; + ~Qwen35MoeBackend() override; GenerateResult generate_impl(const GenerateRequest & req, const DaemonIO & io) override; @@ -39,18 +39,15 @@ class Qwen35MoeBackend : public Qwen35Backend { std::vector & out_tokens, const DaemonIO & io) override; bool should_capture_moe_router() const override { return routing_stats_ != nullptr; } + bool park(const std::string & what) override; bool spark_wants_bootstrap() const override; bool spark_bootstrap_finalize(const std::string & profile_path) override; void after_target_compute(StepGraph & sg, int kv_start, int n_tokens) override; private: - // All-hot placement signal for post_kvflash_init_gate(): set when - // load_target_model takes the all-hot early-return (moe_hybrid null). - bool placement_all_hot_ = false; - // True iff all experts fit hot with the FULL max_ctx KV reservation - // (KVFlash redundant). When false but placement_all_hot_ is true, the pool - // is what kept experts hot — the gate must NOT disable KVFlash. - bool placement_all_hot_full_kv_ = false; + struct HybridSpecBatchProfile; + struct HybridSpecGraphCache; + std::shared_ptr routing_stats_; std::string routing_stats_out_path_; std::string placement_out_path_; @@ -62,6 +59,15 @@ class Qwen35MoeBackend : public Qwen35Backend { bool hybrid_telemetry_ = false; MoeHybridStreamEngine stream_engine_; MoeRoutingCollector * routing_collector_ = nullptr; + // Set when load_target_model's dynamic placement fit ALL experts on GPU + // (the all-hot early-return path). post_kvflash_init_gate() uses this + // as the authoritative placement signal instead of the byte-math fit check, + // since moe_hybrid is null on that path (no cold storage needed). + bool placement_all_hot_ = false; + // True iff all experts fit hot with the FULL max_ctx KV reservation (i.e. + // KVFlash is redundant). When false but placement_all_hot_ is true, the + // pool is what kept experts hot — the gate must NOT disable KVFlash. + bool placement_all_hot_full_kv_ = false; void maybe_post_request_swap(); bool load_dynamic_placement(const char * hotness_path, @@ -91,6 +97,11 @@ class Qwen35MoeBackend : public Qwen35Backend { std::vector & argmax_out, bool capture_features); + // Chunked prefill of tokens[start_pos, end_pos) via hybrid_forward_batch + // slices; advances committed/cur_pos/last_tok. False on forward failure. + bool chunked_prefill(const int32_t * tokens, int start_pos, int end_pos, + std::vector & act_cur, int & committed); + // Pipelined decode: uses cached DeltaNet graphs + optimized FFN loop bool run_pipelined_decode_path(int committed, int n_gen, std::vector & out_tokens, @@ -98,6 +109,8 @@ class Qwen35MoeBackend : public Qwen35Backend { // Persistent pipelined state (initialized once, reused across requests) std::unique_ptr pipe_state_; + std::unique_ptr hybrid_spec_graph_cache_; + bool spec_microbench_done_ = false; bool ensure_pipe_state(int kv_start); }; diff --git a/server/src/server/disk_prefix_cache.cpp b/server/src/server/disk_prefix_cache.cpp index 0ab946b72..852d16be5 100644 --- a/server/src/server/disk_prefix_cache.cpp +++ b/server/src/server/disk_prefix_cache.cpp @@ -534,7 +534,8 @@ bool DiskPrefixCache::maybe_store_continued(int slot, int DiskPrefixCache::cold_prefix_boundary(const std::vector & prompt_ids, const std::vector & boundaries) { - if (disabled() || !layout_known_) return 0; + if (disabled()) return 0; + // layout_known_ is bootstrapped by save() internally; don't block cold-save on empty cache. const int prompt_len = (int)prompt_ids.size(); if (prompt_len <= config_.cold_max_tokens) return 0; diff --git a/server/test/test_kvflash_moe_paged.sh b/server/test/test_kvflash_moe_paged.sh new file mode 100755 index 000000000..e4ee1d7d0 --- /dev/null +++ b/server/test/test_kvflash_moe_paged.sh @@ -0,0 +1,83 @@ +#!/usr/bin/env bash +# Integration test (GPU + model): the qwen35moe KVFlash pooled chunked-prefill +# path (generate_impl chunk loop) is correct under real eviction — +# (1) it preserves protected (sink) context: a fact in the first chunk is +# recalled even though the middle is evicted, and +# (2) it is stable across pool sizes: two different pools produce the same +# greedy (temp 0) answer (no pool-size-dependent corruption). +# +# Both arms use --kvflash with prompt >> pool so the chunk loop + eviction run +# (cold experts via DFLASH_EXPERT_BUDGET_MB keep moe_hybrid active so the MoE +# chunk loop is the live path). max_ctx 131072 keeps the gate from disabling +# KVFlash. This is the silent-corruption gate for PR A. +# +# Hardware-gated. TARGET=/path/...Q3_K_M.gguf bash test_kvflash_moe_paged.sh +set -euo pipefail + +REPO="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +SERVER_BIN="${DFLASH_SERVER_BIN:-$REPO/server/build/dflash_server}" +TARGET="${TARGET:-/home/peppi/models/qwen3.6-35b-a3b/Qwen3.6-35B-A3B-UD-Q3_K_M.gguf}" +CHAT_TEMPLATE="${CHAT_TEMPLATE:-/home/peppi/models/qwen3-coder-chat-template.jinja}" +HOST=127.0.0.1; PORT="${PORT:-18080}" +LOCK="${DG_GPU_LOCK:-/tmp/dg_gpu.lock}" +EXPERT_CAP="${DFLASH_EXPERT_BUDGET_MB:-4000}" +NEEDLE="ZEBRA-9" + +[ -x "$SERVER_BIN" ] || { echo "SKIP: server not built: $SERVER_BIN"; exit 0; } +[ -f "$TARGET" ] || { echo "SKIP: target GGUF not found: $TARGET"; exit 0; } +fail() { echo "FAIL: $*" >&2; exit 1; } + +# Needle in the FIRST line (sink chunk, never evicted) + long filler so the +# prompt far exceeds the pool and the chunk loop must evict the middle. +PROMPT_FILE="$(mktemp)" +{ + echo "IMPORTANT: the secret deployment code is $NEEDLE. Remember it." + for i in $(seq 1 400); do + echo "Line $i: lorem ipsum dolor sit amet $i, consectetur $((i*7%97)) adipiscing $((i%13)) elit." + done + echo "What is the secret deployment code? Answer with just the code." +} > "$PROMPT_FILE" +REQ="$(mktemp)" +python3 - "$PROMPT_FILE" "$REQ" <<'PY' +import json,sys +t=open(sys.argv[1]).read() +json.dump({"model":"luce","messages":[{"role":"user","content":t}], + "max_tokens":12,"temperature":0,"stream":False}, open(sys.argv[2],"w")) +PY + +# Start server with the given pool, POST the fixed request, echo the answer. +# Asserts the chunk loop actually ran (pooled prefill + cold experts). +generate() { + local pool="$1" log out; log="$(mktemp)"; out="$(mktemp)" + ( flock "$LOCK" env DFLASH_EXPERT_BUDGET_MB="$EXPERT_CAP" "$SERVER_BIN" "$TARGET" \ + --host "$HOST" --port "$PORT" --max-ctx 131072 --kvflash "$pool" --model-name luce \ + --chat-template-file "$CHAT_TEMPLATE" >"$log" 2>&1 ) & + local pid=$! + for _ in $(seq 1 120); do + curl -fsS "http://$HOST:$PORT/v1/models" >/dev/null 2>&1 && break + kill -0 "$pid" 2>/dev/null || break + sleep 2 + done + curl -fsS "http://$HOST:$PORT/v1/chat/completions" -H 'Content-Type: application/json' \ + --data @"$REQ" 2>/dev/null \ + | python3 -c 'import sys,json; print(json.load(sys.stdin)["choices"][0]["message"]["content"])' \ + >"$out" 2>/dev/null || true + pkill -9 -f "$SERVER_BIN .*--port $PORT" 2>/dev/null || true + wait "$pid" 2>/dev/null || true + grep -qE 'result: [0-9]+ hot experts, [1-9][0-9]* cold experts' "$log" \ + || { cat "$log" >&2; fail "cold experts absent (pool=$pool): chunk loop not exercised"; } + grep -qE 'pooled prefill' "$log" \ + || { cat "$log" >&2; fail "pooled prefill not engaged (pool=$pool): prompt did not exceed pool"; } + cat "$out"; rm -f "$log" "$out" +} + +echo "== Arm 1: --kvflash 2048 (chunk loop, evicting middle) ==" +A="$(generate 2048)"; echo " answer: $A" +echo "== Arm 2: --kvflash 4096 (chunk loop, different pool) ==" +B="$(generate 4096)"; echo " answer: $B" +rm -f "$PROMPT_FILE" "$REQ" + +grep -q "$NEEDLE" <<<"$A" || fail "pool 2048 lost the sink needle ($NEEDLE): paging dropped protected context" +grep -q "$NEEDLE" <<<"$B" || fail "pool 4096 lost the sink needle ($NEEDLE)" +[ "$A" = "$B" ] || fail "pool-size-dependent divergence: '$A' != '$B'" +echo "PASS: pooled prefill preserves sink context ($NEEDLE) and is stable across pool sizes" diff --git a/server/test/test_kvflash_pager.cpp b/server/test/test_kvflash_pager.cpp new file mode 100644 index 000000000..ec8b71c90 --- /dev/null +++ b/server/test/test_kvflash_pager.cpp @@ -0,0 +1,132 @@ +// Unit tests for KvFlashPager serialize/deserialize round-trip and critical- +// chunk pinning, on a CPU ggml backend (no CUDA). +#include "../src/common/kvflash_pager.h" + +#include "ggml.h" +#include "ggml-alloc.h" +#include "ggml-backend.h" +#include "ggml-cpu.h" + +#include +#include +#include +#include + +using namespace dflash::common; + +static void expect(bool cond, const char * msg) { + if (!cond) { std::fprintf(stderr, "FAIL: %s\n", msg); std::exit(1); } +} + +// Small synthetic cache: head_dim=4, pool=512 tok, n_head_kv=2, 1 layer, +// chunk_tokens=64 -> 8 blocks. sink=1 + tail=4 -> min_pool = (1+4+2)*64=448 <= 512. +struct Harness { + ggml_context * ctx = nullptr; + ggml_backend_t backend = nullptr; + ggml_backend_buffer_t buf = nullptr; + std::vector k, v; + + Harness(int head_dim, int pool, int n_head_kv, int n_layer) { + backend = ggml_backend_cpu_init(); + ggml_init_params ip{}; + ip.mem_size = (size_t)(n_layer * 2 + 8) * ggml_tensor_overhead(); + ip.no_alloc = true; + ctx = ggml_init(ip); + for (int l = 0; l < n_layer; l++) { + ggml_tensor * kt = ggml_new_tensor_3d(ctx, GGML_TYPE_F16, head_dim, pool, n_head_kv); + ggml_tensor * vt = ggml_new_tensor_3d(ctx, GGML_TYPE_F16, head_dim, pool, n_head_kv); + k.push_back(kt); v.push_back(vt); + } + buf = ggml_backend_alloc_ctx_tensors(ctx, backend); + } + ~Harness() { + if (buf) ggml_backend_buffer_free(buf); + if (ctx) ggml_free(ctx); + if (backend) ggml_backend_free(backend); + } + KvFlashConfig cfg(int pool, int chunk) const { + KvFlashConfig c; c.pool_tokens = pool; c.chunk_tokens = chunk; + c.sink_chunks = 1; c.tail_window_chunks = 4; return c; + } +}; + +static void test_serialize_roundtrip() { + const int head_dim = 4, pool = 512, nkv = 2, nlayer = 1, chunk = 64; + Harness A(head_dim, pool, nkv, nlayer); + KvFlashPager pa; + expect(pa.attach(A.cfg(pool, chunk), A.k, A.v), "attach A"); + // Map the whole pool and stamp a recognizable ramp into the K/V buffers. + expect(pa.alloc_span(0, pool), "alloc_span fills pool A"); + const size_t kbytes = ggml_nbytes(A.k[0]); + std::vector ramp(kbytes); + for (size_t i = 0; i < kbytes; i++) ramp[i] = (uint8_t)(i * 31 + 7); + ggml_backend_tensor_set(A.k[0], ramp.data(), 0, kbytes); + ggml_backend_tensor_set(A.v[0], ramp.data(), 0, kbytes); + + std::vector blob = pa.serialize(); + expect(!blob.empty(), "serialize produces a blob"); + + // Deserialize into a fresh pager over fresh (zeroed) tensors. + Harness B(head_dim, pool, nkv, nlayer); + KvFlashPager pb; + expect(pb.attach(B.cfg(pool, chunk), B.k, B.v), "attach B"); + expect(pb.deserialize(blob.data(), blob.size()), "deserialize succeeds"); + + // The restored pool must contain the same KV bytes as the source. + std::vector kb(kbytes), vb(kbytes); + ggml_backend_tensor_get(B.k[0], kb.data(), 0, kbytes); + ggml_backend_tensor_get(B.v[0], vb.data(), 0, kbytes); + expect(kb == ramp, "K restored byte-identical after round-trip"); + expect(vb == ramp, "V restored byte-identical after round-trip"); + + // A blob from a mismatched layout must be rejected (header validation). + KvFlashConfig bad = B.cfg(pool, chunk); + blob[8] ^= 0xFF; // corrupt a header byte + KvFlashPager pc; + Harness C(head_dim, pool, nkv, nlayer); + expect(pc.attach(C.cfg(pool, chunk), C.k, C.v), "attach C"); + expect(!pc.deserialize(blob.data(), blob.size()), "corrupt-header blob rejected"); + (void)bad; + std::printf("ok: serialize/deserialize round-trip + header guard\n"); +} + +static void test_pinning() { + const int head_dim = 4, pool = 512, nkv = 2, nlayer = 1, chunk = 64; + Harness H(head_dim, pool, nkv, nlayer); + KvFlashPager p; + expect(p.attach(H.cfg(pool, chunk), H.k, H.v), "attach pin"); + + // No pins -> nothing pinned. + expect(!p.is_pinned(2), "pre: chunk 2 unpinned"); + + // Pin a single mid chunk (tokens 128..191 -> chunk 2). 8 blocks, sink1+tail4 + // +pinned1+2 = 8 <= 8 -> allowed. + p.pin_range(128, 191); + expect(p.is_pinned(2), "chunk 2 pinned"); + expect(!p.is_pinned(1), "chunk 1 not pinned"); + expect(!p.is_pinned(3), "chunk 3 not pinned"); + + // unpin clears it. + p.unpin_all(); + expect(!p.is_pinned(2), "unpin_all clears chunk 2"); + + // Deadlock guard: pinning the whole pool (8 chunks) would leave no evictable + // block (1+4+8+2 = 15 > 8) -> refused, nothing pinned. + p.pin_range(0, pool - 1); + expect(!p.is_pinned(0) && !p.is_pinned(4) && !p.is_pinned(7), + "over-pin refused by deadlock guard"); + + // reset() also clears pins. + p.pin_range(128, 191); + expect(p.is_pinned(2), "re-pin chunk 2"); + p.reset(); + expect(!p.is_pinned(2), "reset clears pins"); + std::printf("ok: pinning + deadlock guard + reset\n"); +} + +int main() { + test_serialize_roundtrip(); + test_pinning(); + std::printf("PASS: kvflash pager (serde + pinning)\n"); + return 0; +} diff --git a/server/test/test_server_unit.cpp b/server/test/test_server_unit.cpp index f4d3b6025..a8166e065 100644 --- a/server/test/test_server_unit.cpp +++ b/server/test/test_server_unit.cpp @@ -2592,15 +2592,10 @@ static void test_disk_cache_cold_prefix_finds_boundary() { cfg.min_tokens = 512; DiskPrefixCache cache(cfg, backend); cache.init(); - // Manually mark layout as known (hack for testing without real snapshots). - // Since cold_prefix_boundary checks layout_known_, and we can't easily - // set it without a real snapshot, the function will return 0. - // This tests that short prompts / bad boundaries correctly return 0. std::vector prompt(10000, 1); std::vector boundaries = {1000, 2000, 3000, 4000, 6000, 8000}; - // Without layout_known_, returns 0. int result = cache.cold_prefix_boundary(prompt, boundaries); - TEST_ASSERT(result == 0); // layout not known yet + TEST_ASSERT(result == 4000); // best eligible boundary, no entry cached yet rm_rf(dir); }