Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,12 @@ if(DFLASH27B_TESTS)
target_include_directories(test_kvflash_placement PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS})
add_test(NAME kvflash_placement COMMAND test_kvflash_placement)
endif()
# KvFlash pager unit test. Only configured when the test source is present in
# the tree, so checkouts without the file still configure cleanly.
if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/test/test_kvflash_pager.cpp")
add_executable(test_kvflash_pager test/test_kvflash_pager.cpp)
target_include_directories(test_kvflash_pager PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS})
# This test links against dflash_common.
target_link_libraries(test_kvflash_pager PRIVATE dflash_common)
# Registered with CTest under the name "kvflash_pager".
add_test(NAME kvflash_pager COMMAND test_kvflash_pager)
endif()
if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/test/test_bandit_integration.cpp")
add_executable(test_bandit_integration test/test_bandit_integration.cpp)
target_include_directories(test_bandit_integration PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS})
Expand Down
2 changes: 1 addition & 1 deletion server/src/common/backend_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ bool arch_supports_remote_draft(const std::string & arch) {
}

// Returns true when `arch` names a model architecture for which pflash
// compression is supported: "qwen35", "qwen3" or "qwen35moe".
// The comparison is exact (case-sensitive, no trimming); any other string,
// including the empty string, yields false.
bool arch_supports_pflash_compression(const std::string & arch) {
    // Single return: the previous revision's narrower check (without
    // "qwen35moe") must not precede this one, or the MoE case is unreachable.
    return arch == "qwen35" || arch == "qwen3" || arch == "qwen35moe";
}

std::unique_ptr<ModelBackend> create_backend(const BackendArgs & args) {
Expand Down
161 changes: 160 additions & 1 deletion server/src/common/kvflash_pager.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class KvFlashPager {

// Drop all mappings and host backing (new request / cache reset).
// Cumulative stats are kept; the epoch advances so cached masks refill.
// Pins are cleared: the caller (backend) re-applies them after rebuild.
void reset() {
#ifdef KVFLASH_HAS_ASYNC_DMA
if (page_stream_) {
Expand All @@ -183,6 +184,7 @@ class KvFlashPager {
stats_.host_bytes = 0;
cur_chunk_ = 0;
epoch_++;
std::fill(pinned_.begin(), pinned_.end(), 0);
has_pending_page_in_ = false;
}

Expand All @@ -207,6 +209,55 @@ class KvFlashPager {
// Optional external relevance score; higher = keep. Falls back to LRU.
std::function<float(int /*chunk*/)> score_hook;

// ── Critical-chunk pinning ──────────────────────────────────────────
// Pinned chunks are never chosen as eviction victims (OR-ed on top of the
// sink/tail protections). Empty by default → byte-identical non-pin path.
// Pins are cleared by reset() and re-applied by the backend after each
// prefill/restore rebuild via apply_kvflash_pins().

// Pin logical token range [tok_lo, tok_hi) half-open.
// Maps to chunk range [c_lo, c_hi] inclusive where c_hi covers the last
// token in the range (tok_hi - 1), not tok_hi itself, so an exact boundary
// at tok_hi does not pin the next chunk beyond the range.
// Best-effort deadlock guard: if (sink+tail+n_pinned+2) > n_blocks_ the
// span is refused with a one-line warning and the function returns without
// setting any pin.
void pin_range(int64_t tok_lo, int64_t tok_hi) {
    // No-op when the pager is not attached or the half-open range is empty.
    // Negative starts are rejected up front: integer division truncates
    // toward zero, so a tok_lo in (-chunk_tokens, 0) would otherwise map to
    // chunk 0 and slip past the c_lo < 0 check below.
    if (!attached() || tok_lo < 0 || tok_lo >= tok_hi) return;

    // Half-open token range -> inclusive chunk range. c_hi is derived from the
    // last token actually covered (tok_hi - 1), so a range that ends exactly
    // on a chunk boundary does not pin the following chunk.
    const int c_lo = (int)(tok_lo / cfg_.chunk_tokens);
    const int c_hi = (int)((tok_hi - 1) / cfg_.chunk_tokens);
    if (c_lo > c_hi || c_lo < 0) return;

    // Count currently-pinned chunks + the new ones.
    int currently_pinned = 0;
    for (int c = 0; c < (int)pinned_.size(); c++) {
        if (pinned_[c]) currently_pinned++;
    }
    // Chunks past the end of pinned_ have no flag yet, so they count as new.
    int new_pins = 0;
    for (int c = c_lo; c <= c_hi; c++) {
        if (c >= (int)pinned_.size() || !pinned_[c]) new_pins++;
    }

    // Deadlock guard: fixed protections + pinned + 2 (one evictable victim +
    // one append-head) must fit in the pool. The whole span is refused; no
    // partial pinning happens.
    if (cfg_.sink_chunks + cfg_.tail_window_chunks + currently_pinned + new_pins + 2 > n_blocks_) {
        std::fprintf(stderr,
                     "[kvflash] pin_range [%lld,%lld] refused: "
                     "sink=%d tail=%d pinned=%d new=%d pool_blocks=%d — would deadlock eviction\n",
                     (long long)tok_lo, (long long)tok_hi,
                     cfg_.sink_chunks, cfg_.tail_window_chunks,
                     currently_pinned, new_pins, n_blocks_);
        return;
    }

    // Grow the flag vector on demand (new entries unpinned), then set the span.
    if (c_hi + 1 > (int)pinned_.size()) pinned_.resize((size_t)c_hi + 1, 0);
    for (int c = c_lo; c <= c_hi; c++) pinned_[(size_t)c] = 1;
}

// True when chunk `c` currently carries a pin flag. Chunk ids outside the
// flag vector (negative, or at/after its end) are reported as not pinned.
bool is_pinned(int c) const {
    if (c < 0) return false;
    if (c >= (int)pinned_.size()) return false;
    const size_t idx = (size_t)c;
    return pinned_[idx] != 0;
}

// Clear all pins. reset() clears them the same way, via its own std::fill.
void unpin_all() {
    // Zero every flag in place; the vector keeps its current length.
    for (uint8_t & flag : pinned_) flag = 0;
}

// Allocate slots for [kv_start, kv_start + n_tok) ahead of a forward
// step (evicting LRU/low-score chunks as needed). False — with a
// diagnostic — if the pool has no evictable block left.
Expand Down Expand Up @@ -397,7 +448,8 @@ class KvFlashPager {
const ChunkState & st = chunks_[c];
if (st.block < 0 && !st.on_host) continue; // never materialized
const bool prot = c < cfg_.sink_chunks ||
c > cur_chunk_ - 1 - cfg_.tail_window_chunks;
c > cur_chunk_ - 1 - cfg_.tail_window_chunks ||
is_pinned(c);
cands.push_back({c, prot ? 3.4e38f : score_hook(c)});
}
std::sort(cands.begin(), cands.end(),
Expand All @@ -418,6 +470,111 @@ class KvFlashPager {
return events;
}

// Snapshot all resident+paged-out chunks into a flat byte blob.
// Layout: 8-byte magic, header fields (6×uint32), then for each logical
// chunk c in [0, n_chunks): chunk_bytes_ bytes in fixed segment order
// (layer-major, K then V, head-minor) — matching copy_chunk.
std::vector<uint8_t> serialize() const {
static constexpr uint64_t kMagic = 0x4b56464c41534800ULL; // "KVFLASH\0"
const int nc = (int)chunks_.size();
const size_t hdr = sizeof(uint64_t) + 6 * sizeof(uint32_t);
std::vector<uint8_t> out;
out.resize(hdr + (size_t)nc * chunk_bytes_, 0);
uint8_t * p = out.data();
std::memcpy(p, &kMagic, 8); p += 8;
auto w32 = [&](uint32_t v) { std::memcpy(p, &v, 4); p += 4; };
w32((uint32_t)nc);
w32((uint32_t)cfg_.chunk_tokens);
w32((uint32_t)n_head_kv_);
w32((uint32_t)k_seg_bytes_);
w32((uint32_t)v_seg_bytes_);
w32((uint32_t)chunk_bytes_);
for (int c = 0; c < nc; ++c) {
uint8_t * dst = out.data() + hdr + (size_t)c * chunk_bytes_;
const ChunkState & st = chunks_[c];
if (st.block >= 0) {
// Resident: gather from pool tensors in fixed segment order
// (layer-major, K then V, head-minor) — matching copy_chunk.
uint8_t * q = dst;
for (size_t l = 0; l < attn_k_.size(); ++l) {
for (int kv = 0; kv < 2; ++kv) {
ggml_tensor * t = kv == 0 ? attn_k_[l] : attn_v_[l];
const size_t seg = kv == 0 ? k_seg_bytes_ : v_seg_bytes_;
for (int h = 0; h < n_head_kv_; ++h) {
const size_t off = (size_t)st.block * cfg_.chunk_tokens * t->nb[1]
+ (size_t)h * t->nb[2];
ggml_backend_tensor_get(t, q, off, seg);
q += seg;
}
}
}
} else if (st.on_host) {
// Host-backed: copy verbatim. host_data is a raw pinned pointer
// under async DMA, a std::vector otherwise.
#ifdef KVFLASH_HAS_ASYNC_DMA
std::memcpy(dst, st.host_data, chunk_bytes_);
#else
std::memcpy(dst, st.host_data.data(), chunk_bytes_);
#endif
}
// else: never written — stays zero-filled from the resize above.
}
return out;
}

// Restore state from a blob produced by serialize(). Returns false on
// header mismatch (layout drift guard). Callers must rebuild slot masks.
//
// Ordering: pre-size chunks_ so each entry exists before slot_for() runs.
// We set host_data + on_host=true, THEN call slot_for(). slot_for's recall
// branch sees on_host==true and calls copy_chunk(to_host=false) itself —
// no extra copy_chunk needed (that would double-write).
bool deserialize(const uint8_t * data, size_t n) {
    // Restores pager state from a blob laid out as: 8-byte magic, six uint32
    // header words, then one chunk_bytes_ record per chunk.
    //
    // Returns false, leaving the pager untouched, when the blob is null, too
    // short, has the wrong magic, declares an out-of-range chunk count, or
    // its layout fields differ from this pager's. Returns false AFTER reset()
    // (state partially rebuilt) only if a pinned-host allocation fails in the
    // async-DMA build.
    static constexpr uint64_t kMagic = 0x4b56464c41534800ULL;
    const size_t hdr = sizeof(uint64_t) + 6 * sizeof(uint32_t);
    if (!data || n < hdr) return false;

    const uint8_t * p = data;
    uint64_t magic = 0; std::memcpy(&magic, p, 8); p += 8;
    if (magic != kMagic) return false;

    auto r32 = [&]() { uint32_t v = 0; std::memcpy(&v, p, 4); p += 4; return v; };
    const uint32_t nc_raw = r32();
    const int    ct   = (int)r32();
    const int    nhkv = (int)r32();
    const size_t kseg = (size_t)r32();
    const size_t vseg = (size_t)r32();
    const size_t cb   = (size_t)r32();

    // The chunk count comes from the blob: reject values that do not fit in
    // a non-negative int before it is used for sizing or indexing.
    if (nc_raw > 0x7fffffffu) return false;
    const int nc = (int)nc_raw;

    // Layout drift guard: every geometry field must match this pager.
    if (ct != cfg_.chunk_tokens || nhkv != n_head_kv_ ||
        kseg != k_seg_bytes_ || vseg != v_seg_bytes_ || cb != chunk_bytes_) {
        return false;
    }

    // Payload length check in division form so nc * chunk_bytes_ cannot wrap.
    if (chunk_bytes_ != 0 && (n - hdr) / chunk_bytes_ < (size_t)nc) return false;

    // Validation is complete; only now discard the current state.
    reset();
    chunks_.resize((size_t)nc); // pre-size so slot_for doesn't resize again
    for (int c = 0; c < nc; ++c) {
        const uint8_t * src = data + hdr + (size_t)c * chunk_bytes_;
        ChunkState & st = chunks_[c];
        // Park bytes in host_data; slot_for's recall branch copies to pool.
        // host_data is a raw pinned pointer under async DMA, a vector otherwise.
#ifdef KVFLASH_HAS_ASYNC_DMA
        if (!st.host_data) {
            if (cudaMallocHost(&st.host_data, chunk_bytes_) != cudaSuccess) return false;
            stats_.host_bytes += (int64_t)chunk_bytes_;
        }
        std::memcpy(st.host_data, src, chunk_bytes_);
#else
        st.host_data.assign(src, src + chunk_bytes_);
#endif
        // on_host must be set BEFORE slot_for(): per the notes on this
        // function, slot_for then assigns a block and recalls the bytes itself
        // via copy_chunk(to_host=false); a second explicit copy would
        // double-write.
        st.on_host = true;
        // NOTE(review): slot_for's result is not inspected here — confirm it
        // cannot fail when nc exceeds the pool size.
        slot_for((int64_t)c * cfg_.chunk_tokens);
    }
#ifdef KVFLASH_HAS_ASYNC_DMA
    // Recalls above are async on page_stream_; settle before the pool is read.
    if (has_pending_page_in_) synchronize_paging();
#endif
    return true;
}

private:
struct ChunkState {
int block = -1; // pool block index, -1 = not resident
Expand All @@ -441,6 +598,7 @@ class KvFlashPager {
if (chunks_[c].block < 0) continue;
if (c < cfg_.sink_chunks) continue;
if (c > cur_chunk_ - 1 - cfg_.tail_window_chunks) continue;
if (is_pinned(c)) continue;
if (score_hook) {
const float s = score_hook(c);
if (victim < 0 || s < v_score) { victim = c; v_score = s; }
Expand Down Expand Up @@ -566,6 +724,7 @@ class KvFlashPager {
std::vector<ChunkState> chunks_;
std::vector<int> free_blocks_;
std::vector<uint8_t> zero_buf_; // used by zero_block() in non-CUDA builds
std::vector<uint8_t> pinned_; // per-chunk pin flag; empty = no pins
KvFlashStats stats_;
size_t k_seg_bytes_ = 0, v_seg_bytes_ = 0, chunk_bytes_ = 0;
int n_blocks_ = 0, n_head_kv_ = 0, cur_chunk_ = 0;
Expand Down
18 changes: 17 additions & 1 deletion server/src/common/kvflash_qk.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ inline void kvflash_qk_chunk_scores(
const float * query,
const KvFlashQkDims & d,
std::vector<float> & out,
float missing_score = -2.0f) {
float missing_score = -2.0f,
const float * seeded = nullptr,
float seeded_sentinel = -std::numeric_limits<float>::infinity(),
int seeded_n = -1) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Default seeded_n = -1 creates an out-of-bounds read risk when a caller passes a seeded buffer shorter than n_chunks without setting seeded_n.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At server/src/common/kvflash_qk.h, line 53:

<comment>Default `seeded_n = -1` creates an out-of-bounds read risk when a caller passes a `seeded` buffer shorter than `n_chunks` without setting `seeded_n`.</comment>

<file context>
@@ -47,7 +47,10 @@ inline void kvflash_qk_chunk_scores(
+    float missing_score = -2.0f,
+    const float * seeded = nullptr,
+    float seeded_sentinel = -std::numeric_limits<float>::infinity(),
+    int seeded_n = -1) {
     const int group = d.n_q_heads / d.n_kv_heads;
     const int n_chunks = (int)pooled_keys.size();
</file context>

const int group = d.n_q_heads / d.n_kv_heads;
const int n_chunks = (int)pooled_keys.size();
out.assign((size_t)n_chunks, missing_score);
Expand Down Expand Up @@ -83,6 +86,19 @@ inline void kvflash_qk_chunk_scores(
}
out[(size_t)c] = acc * inv_layers; // layer-MEAN (Phase-0 config)
}
// Seeded fallback: for chunks with no pooled key, use the ledger score from
// a prior turn if it is not the sentinel (i.e. it was actually scored).
// seeded_n bounds the valid range of the seeded array; chunks beyond it
// (n_chunks > seeded array length) fall back to missing_score safely.
if (seeded) {
const int seeded_limit = (seeded_n >= 0) ? seeded_n : n_chunks;
for (int c = 0; c < n_chunks; c++) {
if (!pooled_keys[(size_t)c] && c < seeded_limit &&
seeded[c] != seeded_sentinel) {
out[(size_t)c] = seeded[c];
}
}
}
}

} // namespace dflash::common
Expand Down
40 changes: 39 additions & 1 deletion server/src/common/moe_hybrid_ffn_eval.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1009,7 +1009,7 @@ static int mmq_safe_sub_batch() {
static const int v = [](){
const char * e = std::getenv("DFLASH_MMQ_SUB_BATCH");
if (e) return std::max(1, std::atoi(e));
return (query_gpu_compute_sm() >= 80) ? 8 : 1;
return (query_gpu_compute_sm() >= 80) ? 4 : 1; // Q4_K MMVQ cap=4 on sm_86
}();
return v;
}
Expand Down Expand Up @@ -1066,6 +1066,27 @@ static bool eval_moe_hybrid_ffn_batched_core(
if (cl >= 0) { cold_sel[i] = cl; cold_wts[i] = selected_weights[i]; fp_has_cold = true; }
}
}
// Dummy slots (wts==0) may alias a real hot expert's local ID per token →
// ids_to_sorted_host drops entries → ASSERT in slow ggml_mul_mat_id path.
for (int t = 0; t < n_tokens; ++t) {
const int base = t * n_used;
int32_t next = 0;
for (int s = 0; s < n_used; ++s) {
if (hot_wts[base + s] > 0.0f) continue;
// Bounded search: at most n_hot_init probes. If every ID in
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
// [0, n_hot_init) is already taken by another slot we break and
// keep `next` as-is (duplicate), which is safe — the zero-weight
// slot is ignored by ids_to_sorted_host anyway.
int tries = 0;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Complex dummy-slot normalization logic is duplicated between the cached fast path and the inline rebuild path in the same function. This increases maintenance risk: a future bug fix or behavioral tweak to one loop can be missed in the other, producing path-dependent behavior for the same routing inputs. Extract a shared helper and call it from both paths.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At server/src/common/moe_hybrid_ffn_eval.cpp, line 1080:

<comment>Complex dummy-slot normalization logic is duplicated between the cached fast path and the inline rebuild path in the same function. This increases maintenance risk: a future bug fix or behavioral tweak to one loop can be missed in the other, producing path-dependent behavior for the same routing inputs. Extract a shared helper and call it from both paths.</comment>

<file context>
@@ -1073,8 +1073,16 @@ static bool eval_moe_hybrid_ffn_batched_core(
+                // [0, n_hot_init) is already taken by another slot we break and
+                // keep `next` as-is (duplicate), which is safe — the zero-weight
+                // slot is ignored by ids_to_sorted_host anyway.
+                int tries = 0;
+                while (tries < n_hot_init &&
+                       [&]{ for (int k=0; k<n_used; ++k) if (k!=s && hot_sel[base+k]==next) return true; return false; }()) {
</file context>

while (tries < n_hot_init &&
[&]{ for (int k=0; k<n_used; ++k) if (k!=s && hot_sel[base+k]==next) return true; return false; }()) {
if (++next >= n_hot_init) next = 0;
++tries;
}
hot_sel[base + s] = next++;
if (next >= n_hot_init) next = 0;
}
}

CachedHotBatchedGraph & hg = storage.hot_batched_mixed[n_tokens];
const bool hg_ok = (hg.valid() && hg.n_tokens == n_tokens)
Expand Down Expand Up @@ -1145,6 +1166,23 @@ static bool eval_moe_hybrid_ffn_batched_core(
}
}
}
// Dummy slots (wts==0) may alias a real hot expert's local ID per token →
// ids_to_sorted_host drops entries → ASSERT in slow ggml_mul_mat_id path.
for (int t = 0; t < n_tokens; ++t) {
const int base = t * n_used;
int32_t next = 0;
for (int s = 0; s < n_used; ++s) {
if (hot_wts[base + s] > 0.0f) continue;
int tries = 0;
while (tries < n_hot_init &&
[&]{ for (int k=0; k<n_used; ++k) if (k!=s && hot_sel[base+k]==next) return true; return false; }()) {
if (++next >= n_hot_init) next = 0;
++tries;
}
hot_sel[base + s] = next++;
if (next >= n_hot_init) next = 0;
}
}

// ── Step 2: Build and run hot GPU graph (includes shared expert always) ──
std::vector<float> hot_partial((size_t)n_embd * (size_t)n_tokens, 0.0f);
Expand Down
Loading