From 979afe8a543ff40d6f88155c47b4626b86424bf3 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 28 Jun 2026 18:01:13 +0200 Subject: [PATCH 1/6] perf(engine): scope the CSR topology index to traversed edges, reuse it cross-branch The in-memory CSR graph index was built over every edge type in the catalog and cache-keyed by the resolved snapshot id, so a single-edge join (`$x identifiesPerson $p`) full-scanned every edge table in the graph (the 40-60s / 428s-first-traversal hang), and a lazy-fork branch cold-rebuilt main's index. Two cuts close that: - Scope (A2): build only the edge types the query traverses (`referenced_edge_types` over Expand/AntiJoin, exhaustive match), not the whole catalog. Threaded through GraphIndexHandle -> RuntimeCache; cache-keyed on the scoped set. - Cross-branch reuse (A1): key RuntimeCache by each edge table's physical identity (table_key, version, table_branch, e_tag) instead of the snapshot id, so a lazy-fork branch whose edge tables physically are main's reuses main's built index. Local-FS (e_tag None) falls back to refresh-invalidation. Adds graph_build_count/graph_edges_built probes for the cost tests. 
--- crates/omnigraph/src/db/omnigraph.rs | 3 +- .../omnigraph/src/db/omnigraph/table_ops.rs | 13 +- crates/omnigraph/src/exec/query.rs | 179 ++++++++++++++++-- crates/omnigraph/src/graph_index/mod.rs | 6 + crates/omnigraph/src/instrumentation.rs | 18 ++ crates/omnigraph/src/runtime_cache.rs | 58 ++++-- 6 files changed, 240 insertions(+), 37 deletions(-) diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index c3860c56..7a6c4512 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -1298,8 +1298,9 @@ impl Omnigraph { pub(crate) async fn graph_index_for_resolved( &self, resolved: &ResolvedTarget, + edge_types: &std::collections::HashMap, ) -> Result> { - table_ops::graph_index_for_resolved(self, resolved).await + table_ops::graph_index_for_resolved(self, resolved, edge_types).await } /// Ensure BTree scalar indices exist on key columns. diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index f73f2a26..d27856d0 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -10,15 +10,22 @@ pub(super) async fn graph_index(db: &Omnigraph) -> Result = catalog + .edge_types + .iter() + .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone()))) + .collect(); + db.runtime_cache.graph_index(&resolved, &edge_types).await } pub(super) async fn graph_index_for_resolved( db: &Omnigraph, resolved: &ResolvedTarget, + edge_types: &std::collections::HashMap, ) -> Result> { - let catalog = db.catalog(); - db.runtime_cache.graph_index(resolved, &catalog).await + db.runtime_cache.graph_index(resolved, edge_types).await } pub(super) async fn ensure_indices(db: &Omnigraph) -> Result> { diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index 23e14349..aaf071d8 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs 
@@ -50,7 +50,11 @@ impl Omnigraph { .any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. })); // Lazy: an index-served query with no AntiJoin never builds the CSR. let graph_index = if needs_graph { - GraphIndexHandle::cached(self, &resolved) + GraphIndexHandle::cached( + self, + &resolved, + referenced_edge_types(&ir.pipeline, &catalog), + ) } else { GraphIndexHandle::none() }; @@ -95,14 +99,9 @@ impl Omnigraph { .any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. })); // Lazy build against this historical snapshot (not the RuntimeCache, // which is keyed to live branch targets); only a CSR-path Expand or an - // AntiJoin triggers it. + // AntiJoin triggers it. Scoped to the edges this query traverses. let graph_index = if needs_graph { - let edge_types = catalog - .edge_types - .iter() - .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone()))) - .collect(); - GraphIndexHandle::direct(&snapshot, edge_types) + GraphIndexHandle::direct(&snapshot, referenced_edge_types(&ir.pipeline, &catalog)) } else { GraphIndexHandle::none() }; @@ -762,6 +761,51 @@ fn execute_pipeline<'a>( }) } +/// The edge types a query's pipeline actually traverses, mapped to their +/// `(from_type, to_type)` endpoints. Recurses through `AntiJoin` inner pipelines +/// (whose bulk fast path consumes the CSR for the inner `Expand`'s edge). The +/// CSR build is scoped to exactly this set instead of every edge type in the +/// catalog — otherwise a single-edge join (`$x identifiesPerson $p`) that lands +/// on the CSR path would scan the whole graph's edge data (every message, +/// relationship, … table), the cause of the cross-edge-join hang. Empty when the +/// only traversal is an `AntiJoin` with no inner `Expand` — that shape never asks +/// the handle for an index, so an empty build is never realized. 
+fn referenced_edge_types( + pipeline: &[IROp], + catalog: &Catalog, +) -> HashMap { + let mut names = std::collections::BTreeSet::new(); + collect_referenced_edge_names(pipeline, &mut names); + names + .into_iter() + .filter_map(|name| { + catalog + .edge_types + .get(&name) + .map(|et| (name, (et.from_type.clone(), et.to_type.clone()))) + }) + .collect() +} + +fn collect_referenced_edge_names( + pipeline: &[IROp], + out: &mut std::collections::BTreeSet, +) { + for op in pipeline { + match op { + IROp::Expand { edge_type, .. } => { + out.insert(edge_type.clone()); + } + IROp::AntiJoin { inner, .. } => collect_referenced_edge_names(inner, out), + // Exhaustive on purpose (no `_` arm): a new edge-referencing IROp must + // force a compile error here rather than silently under-scope the CSR + // build — an omitted edge would fail at runtime with "no adjacency + // index for edge". The non-traversal ops reference no edges. + IROp::NodeScan { .. } | IROp::Filter(_) => {} + } + } +} + /// Lazily provides the in-memory CSR graph index, building it on first use and /// memoizing for the rest of the query. 
Indexed-mode Expand never asks for it, /// so a query that is entirely index-served and has no AntiJoin never pays the @@ -776,7 +820,11 @@ pub struct GraphIndexHandle<'a> { enum GraphIndexBuilder<'a> { None, - Cached(&'a Omnigraph, &'a crate::db::ResolvedTarget), + Cached( + &'a Omnigraph, + &'a crate::db::ResolvedTarget, + HashMap, + ), Direct(&'a Snapshot, HashMap), } @@ -788,10 +836,14 @@ impl<'a> GraphIndexHandle<'a> { } } - fn cached(db: &'a Omnigraph, resolved: &'a crate::db::ResolvedTarget) -> Self { + fn cached( + db: &'a Omnigraph, + resolved: &'a crate::db::ResolvedTarget, + edge_types: HashMap, + ) -> Self { Self { cell: tokio::sync::OnceCell::new(), - builder: GraphIndexBuilder::Cached(db, resolved), + builder: GraphIndexBuilder::Cached(db, resolved, edge_types), } } @@ -810,8 +862,8 @@ impl<'a> GraphIndexHandle<'a> { .get_or_try_init(|| async { match &self.builder { GraphIndexBuilder::None => Ok::>, OmniError>(None), - GraphIndexBuilder::Cached(db, resolved) => { - Ok(Some(db.graph_index_for_resolved(resolved).await?)) + GraphIndexBuilder::Cached(db, resolved, edge_types) => { + Ok(Some(db.graph_index_for_resolved(resolved, edge_types).await?)) } GraphIndexBuilder::Direct(snapshot, edge_types) => { Ok(Some(Arc::new(GraphIndex::build(snapshot, edge_types).await?))) @@ -2460,6 +2512,107 @@ mod expand_chooser_tests { } } +#[cfg(test)] +mod referenced_edge_types_tests { + use super::*; + + fn node_scan(var: &str, ty: &str) -> IROp { + IROp::NodeScan { + variable: var.to_string(), + type_name: ty.to_string(), + filters: Vec::new(), + } + } + + fn expand(edge: &str) -> IROp { + IROp::Expand { + src_var: "a".into(), + dst_var: "b".into(), + edge_type: edge.to_string(), + direction: Direction::Out, + dst_type: "X".into(), + min_hops: 1, + max_hops: Some(1), + dst_filters: Vec::new(), + } + } + + fn names(pipeline: &[IROp]) -> Vec { + let mut set = std::collections::BTreeSet::new(); + collect_referenced_edge_names(pipeline, &mut set); + 
set.into_iter().collect() + } + + #[test] + fn collects_a_single_expand_edge() { + assert_eq!( + names(&[node_scan("x", "ExternalID"), expand("identifiesPerson")]), + vec!["identifiesPerson".to_string()] + ); + } + + #[test] + fn ignores_non_traversal_ops_and_dedups() { + // A pipeline that touches one edge twice references exactly that one edge — + // never the whole catalog (the cross-edge-join hang this scoping fixes). + let pipeline = vec![ + node_scan("x", "ExternalID"), + expand("identifiesPerson"), + IROp::Filter(IRFilter { + left: IRExpr::PropAccess { + variable: "p".into(), + property: "name".into(), + }, + op: omnigraph_compiler::query::ast::CompOp::Eq, + right: IRExpr::Literal(Literal::String("a".into())), + }), + expand("identifiesPerson"), + ]; + assert_eq!(names(&pipeline), vec!["identifiesPerson".to_string()]); + } + + #[test] + fn recurses_through_anti_join_inner_pipeline() { + // The bulk anti-join fast path consumes the CSR for the inner Expand's + // edge, so its edge type must be in scope even though it is nested. + let pipeline = vec![ + node_scan("p", "Person"), + expand("knows"), + IROp::AntiJoin { + outer_var: "p".into(), + inner: vec![expand("worksAt")], + }, + ]; + assert_eq!( + names(&pipeline), + vec!["knows".to_string(), "worksAt".to_string()] + ); + } + + #[test] + fn recurses_through_nested_anti_joins() { + let pipeline = vec![IROp::AntiJoin { + outer_var: "p".into(), + inner: vec![IROp::AntiJoin { + outer_var: "c".into(), + inner: vec![expand("deepEdge")], + }], + }]; + assert_eq!(names(&pipeline), vec!["deepEdge".to_string()]); + } + + #[test] + fn anti_join_with_no_inner_expand_references_no_edges() { + // A predicate-only anti-join never asks the handle for an index, so the + // empty set is correct — no whole-graph build is realized. 
+ let pipeline = vec![IROp::AntiJoin { + outer_var: "p".into(), + inner: vec![node_scan("c", "Company")], + }]; + assert!(names(&pipeline).is_empty()); + } +} + #[cfg(test)] mod literal_lowering_tests { use super::*; diff --git a/crates/omnigraph/src/graph_index/mod.rs b/crates/omnigraph/src/graph_index/mod.rs index ae3173a2..56831dda 100644 --- a/crates/omnigraph/src/graph_index/mod.rs +++ b/crates/omnigraph/src/graph_index/mod.rs @@ -113,6 +113,12 @@ impl GraphIndex { snapshot: &Snapshot, edge_types: &HashMap, // edge_name → (from_type, to_type) ) -> Result { + // INVARIANT (A1 graph-index cache key): the topology is a pure function of + // the edge tables' `src`/`dst` columns and nothing else. `RuntimeCache` + // keys `GraphIndexCacheKey` on each edge table's physical identity + // `(table_key, version, table_branch, e_tag)` so a lazy-fork branch reuses + // main's built index. If you read node tables, schema, or other state here, + // add it to that key or the cache will serve a stale index. let mut type_indices: HashMap = HashMap::new(); let mut csr = HashMap::new(); let mut csc = HashMap::new(); diff --git a/crates/omnigraph/src/instrumentation.rs b/crates/omnigraph/src/instrumentation.rs index 97186869..2c6f71cb 100644 --- a/crates/omnigraph/src/instrumentation.rs +++ b/crates/omnigraph/src/instrumentation.rs @@ -60,6 +60,14 @@ pub struct QueryIoProbes { /// Internal/system-table (`__manifest`, `_graph_commits*`) open CALLS — the /// complement of `data_open_count`, kept for symmetry and debugging. pub internal_open_count: Arc, + /// Counts topology-index builds (the `RuntimeCache::graph_index` cache-miss + /// path). A cost test asserts a fresh branch whose edge tables are unchanged + /// from main reuses main's cached index (0 builds) rather than rebuilding it. + pub graph_build_count: Arc, + /// Edge tables included in topology builds this query (summed over build + /// invocations). 
A cost test asserts a query referencing one edge builds only + /// that edge, not every catalog edge (the cold-build shrink A2 ships). + pub graph_edges_built: Arc, } tokio::task_local! { @@ -130,6 +138,16 @@ pub(crate) fn record_open(uri: &str) { }); } +/// Record one topology-index build over `edges` edge tables (the +/// `RuntimeCache::graph_index` cache-miss path). No-op when no probes are +/// installed (production). +pub(crate) fn record_graph_build(edges: usize) { + let _ = current(|p| { + p.graph_build_count.fetch_add(1, Ordering::Relaxed); + p.graph_edges_built.fetch_add(edges as u64, Ordering::Relaxed); + }); +} + /// Per-operation staged-write counts, installed for a task via /// [`with_merge_write_probes`]. Lets a cost-budget test assert WHICH staged-write /// primitive an operation invokes — e.g. that an append-only fast-forward merge diff --git a/crates/omnigraph/src/runtime_cache.rs b/crates/omnigraph/src/runtime_cache.rs index e85a90a1..d4b99f8b 100644 --- a/crates/omnigraph/src/runtime_cache.rs +++ b/crates/omnigraph/src/runtime_cache.rs @@ -4,16 +4,20 @@ use std::sync::Arc; use lance::Dataset; use lance::session::Session; -use omnigraph_compiler::catalog::Catalog; use tokio::sync::Mutex; use crate::db::ResolvedTarget; use crate::error::Result; use crate::graph_index::GraphIndex; +/// Cache key for a built `GraphIndex`. Keyed (A1) by the physical identity of the +/// edge tables the topology is derived from, NOT by the resolved snapshot id. The +/// topology is a pure function of the edge tables' `src`/`dst`, so two snapshots +/// (e.g. main and a lazy-fork branch whose edge tables physically *are* main's) +/// with identical edge tables share one built index: a fresh branch reuses main's +/// instead of rebuilding it from a cold scan. 
#[derive(Debug, Clone, PartialEq, Eq, Hash)] struct GraphIndexCacheKey { - snapshot_id: String, edge_tables: Vec, } @@ -22,6 +26,13 @@ struct GraphIndexTableState { table_key: String, table_version: u64, table_branch: Option, + /// Lance manifest incarnation token for this edge table version. Preserves the + /// incarnation distinction the dropped synthetic snapshot id used to carry: a + /// branch deleted and recreated at the same version number gets a new e_tag, so + /// the cache rebuilds instead of serving stale topology. `None` only on stores + /// without e_tags (local FS); there a same-branch manifest refresh clears the + /// cache as the fallback (the read-path gap in docs/dev/invariants.md). + e_tag: Option, } #[derive(Debug, Default)] @@ -40,12 +51,18 @@ impl RuntimeCache { cache.entries.invalidate_all(); } + /// Build (or fetch) the CSR/CSC graph index scoped to exactly `edge_types` — + /// the edge types the query actually traverses, not every edge type in the + /// catalog. Scoping is what keeps a single-edge join (`$x identifiesPerson + /// $p`) from scanning the whole graph's edge data; the cache key carries the + /// scoped set, so a `{Knows}` index and a `{Knows, WorksAt}` index are + /// distinct entries and never serve each other. 
pub async fn graph_index( &self, resolved: &ResolvedTarget, - catalog: &Catalog, + edge_types: &HashMap, ) -> Result> { - let key = graph_index_cache_key(resolved, catalog); + let key = graph_index_cache_key(resolved, edge_types); { let mut cache = self.graph_indices.lock().await; if let Some(index) = cache.entries.get(&key).cloned() { @@ -53,13 +70,8 @@ impl RuntimeCache { } } - let edge_types = catalog - .edge_types - .iter() - .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone()))) - .collect(); - - let index = Arc::new(GraphIndex::build(&resolved.snapshot, &edge_types).await?); + crate::instrumentation::record_graph_build(edge_types.len()); + let index = Arc::new(GraphIndex::build(&resolved.snapshot, edge_types).await?); let mut cache = self.graph_indices.lock().await; if let Some(existing) = cache.entries.get(&key).cloned() { return Ok(existing); @@ -151,9 +163,11 @@ impl Default for GraphIndexCache { } } -fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphIndexCacheKey { - let mut edge_tables: Vec = catalog - .edge_types +fn graph_index_cache_key( + resolved: &ResolvedTarget, + edge_types: &HashMap, +) -> GraphIndexCacheKey { + let mut edge_tables: Vec = edge_types .keys() .filter_map(|edge_name| { let table_key = format!("edge:{}", edge_name); @@ -164,15 +178,13 @@ fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphI table_key, table_version: entry.table_version, table_branch: entry.table_branch.clone(), + e_tag: entry.version_metadata.e_tag().map(str::to_string), }) }) .collect(); edge_tables.sort_by(|a, b| a.table_key.cmp(&b.table_key)); - GraphIndexCacheKey { - snapshot_id: resolved.snapshot_id.as_str().to_string(), - edge_tables, - } + GraphIndexCacheKey { edge_tables } } /// Max held `Dataset` handles. 
A handle holds only Arcs (object store + manifest), @@ -290,9 +302,15 @@ mod tests { use super::*; fn key(id: usize) -> GraphIndexCacheKey { + // Distinct keys via a distinct edge table per id (the key no longer carries + // a snapshot id — it is the physical edge-table identity set, A1). GraphIndexCacheKey { - snapshot_id: format!("snap-{id}"), - edge_tables: Vec::new(), + edge_tables: vec![GraphIndexTableState { + table_key: format!("edge:t{id}"), + table_version: 1, + table_branch: None, + e_tag: None, + }], } } From afebf39d304eb97aeb357be35782f5cc1cd71d55 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 28 Jun 2026 18:01:21 +0200 Subject: [PATCH 2/6] test(engine): cost tests for scoped + cross-branch-reused topology index fresh_branch_traversal_reuses_main_graph_index (A1: a lazy-fork branch reuses main's cached CSR index, 0 rebuilds) and single_edge_query_builds_only_referenced_edge (A2: a one-edge query builds only that edge, not the whole catalog), via the graph_build_count/graph_edges_built probes. Forced CSR mode, #[serial]. Updates the recreated-branch incarnation test comment for the physical-identity key. --- crates/omnigraph/tests/helpers/cost.rs | 2 + crates/omnigraph/tests/warm_read_cost.rs | 156 ++++++++++++++++++++++- 2 files changed, 154 insertions(+), 4 deletions(-) diff --git a/crates/omnigraph/tests/helpers/cost.rs b/crates/omnigraph/tests/helpers/cost.rs index 331bfb37..f82e5941 100644 --- a/crates/omnigraph/tests/helpers/cost.rs +++ b/crates/omnigraph/tests/helpers/cost.rs @@ -315,6 +315,8 @@ impl OpProbes { probe_count: Arc::clone(&h.probe_count), data_open_count: Arc::clone(&h.data_open_count), internal_open_count: Arc::clone(&h.internal_open_count), + // graph_build_count / graph_edges_built unused by this harness. 
+ ..Default::default() }; (probes, h) } diff --git a/crates/omnigraph/tests/warm_read_cost.rs b/crates/omnigraph/tests/warm_read_cost.rs index 8f6e74e0..9a745e35 100644 --- a/crates/omnigraph/tests/warm_read_cost.rs +++ b/crates/omnigraph/tests/warm_read_cost.rs @@ -7,9 +7,14 @@ mod helpers; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + use arrow_array::{Array, StringArray}; use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes}; use omnigraph_compiler::result::QueryResult; +use serial_test::serial; use helpers::cost::{cost_harness, measure}; use helpers::{ @@ -474,10 +479,15 @@ async fn recreated_branch_owned_table_handle_uses_table_etag() { ); } -/// The graph-index cache is keyed by synthetic snapshot id plus edge-table -/// state. A recreated branch can reuse the same edge table `(branch, version)`, -/// so the synthetic snapshot id must carry the manifest incarnation or traversal -/// can reuse stale topology. +/// A recreated branch can reuse the same edge table `(branch, version)`. The +/// graph-index cache is keyed (A1) by each edge table's physical identity +/// `(table_key, version, table_branch, e_tag)`; on local FS the e_tag is `None`, +/// so a recreated branch at the same version has the same key — the stale topology +/// is instead evicted by the same-branch manifest refresh (`invalidate_all` on the +/// `version_probes == 2` stale path), the documented e_tag-less fallback. This +/// traversal takes the indexed path (single-source frontier), so it also exercises +/// the table-handle cache incarnation; the assertion is that recreated-branch +/// topology is never stale regardless of path. 
#[tokio::test] async fn recreated_branch_traversal_uses_graph_index_incarnation() { let dir = tempfile::tempdir().unwrap(); @@ -756,3 +766,141 @@ async fn write_invalidates_table_cache_for_changed_table() { "the post-write read observes the new row (no stale handle served)" ); } + +// ─── Topology-index build cost (A1 cross-branch reuse + A2 scoped build) ───── +// +// These force the CSR build path (the indexed path builds no topology), so they +// are `#[serial]` — the env write must not race a concurrent reader in this +// binary. The build-count probes (`graph_build_count` / `graph_edges_built`) are +// read off a directly-constructed `QueryIoProbes`. + +fn set_csr_mode() { + // SAFE: every test that sets this is `#[serial]`, so no thread reads the env + // during the write. + unsafe { std::env::set_var("OMNIGRAPH_TRAVERSAL_MODE", "csr") }; +} + +fn clear_traversal_mode() { + unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") }; +} + +/// A1: a fresh (unwritten) branch reuses main's cached CSR topology index +/// (`graph_build_count == 0`), and the reused index returns correct results for +/// the branch. Before A1 the branch-keyed snapshot id forced a rebuild (count 1). +#[tokio::test] +#[serial] +async fn fresh_branch_traversal_reuses_main_graph_index() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + // A Knows edge on main so there is topology to build and then reuse. + mutate_main( + &mut writer, + MUTATION_QUERIES, + "insert_person_and_friend", + &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), + ) + .await + .unwrap(); + + // Separate reader handle. As in production, the reader never creates the + // branch, so creating it does not invalidate the reader's warm cache. + let reader = Omnigraph::open(uri).await.unwrap(); + + set_csr_mode(); + // Reader warms main on the CSR path: builds and caches the topology index. 
+ let warm = reader + .query( + ReadTarget::branch("main"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ) + .await + .unwrap(); + assert_eq!( + first_column_strings(&warm), + vec!["Alice"], + "test setup: main has the Knows edge" + ); + + // A separate writer creates the branch (lazy fork: feature's edge tables are + // physically main's — same version + e_tag, table_branch=None). + writer.branch_create("feature").await.unwrap(); + + let graph_build = Arc::new(AtomicU64::new(0)); + let probes_in = QueryIoProbes { + graph_build_count: Arc::clone(&graph_build), + ..Default::default() + }; + let on_branch = with_query_io_probes( + probes_in, + reader.query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ), + ) + .await; + clear_traversal_mode(); + let on_branch = on_branch.unwrap(); + + assert_eq!( + first_column_strings(&on_branch), + vec!["Alice"], + "fresh branch sees main's edges (lazy fork) and the reused index is correct" + ); + assert_eq!( + graph_build.load(Ordering::Relaxed), + 0, + "a fresh branch with unchanged edges must reuse main's cached CSR index, not rebuild it" + ); +} + +/// A2: a query referencing one edge type builds the topology for only that edge, +/// not every edge in the catalog. Forces CSR (the build path) and counts edge +/// tables built. Before A2 the build materialized all catalog edges (the fixture +/// defines Knows + WorksAt, so a build-all touches >= 2) — the cold-build cost. +#[tokio::test] +#[serial] +async fn single_edge_query_builds_only_referenced_edge() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + // A Knows edge so the referenced build has topology; the fixture also defines + // WorksAt, so a build-all would touch more than one edge. 
+ mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person_and_friend", + &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), + ) + .await + .unwrap(); + + set_csr_mode(); + let graph_edges = Arc::new(AtomicU64::new(0)); + let probes_in = QueryIoProbes { + graph_edges_built: Arc::clone(&graph_edges), + ..Default::default() + }; + let result = with_query_io_probes( + probes_in, + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ), + ) + .await; + clear_traversal_mode(); + let result = result.unwrap(); + + assert_eq!(first_column_strings(&result), vec!["Alice"]); + assert_eq!( + graph_edges.load(Ordering::Relaxed), + 1, + "a query referencing only `knows` must build only that edge, not all catalog edges" + ); +} From 18de8230485a8dcb9b625791c28fd42f6d790a0b Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 28 Jun 2026 18:01:22 +0200 Subject: [PATCH 3/6] docs(engine): topology-index scoping + physical-identity cache key Document the scoped CSR build and the physical-identity (e_tag) graph-index cache key with its local-FS refresh-invalidation fallback across invariants, testing, execution, and architecture docs. --- docs/dev/architecture.md | 4 ++-- docs/dev/execution.md | 2 +- docs/dev/invariants.md | 13 ++++++++++++- docs/dev/testing.md | 4 ++-- 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/docs/dev/architecture.md b/docs/dev/architecture.md index abdc45bb..2469c84c 100644 --- a/docs/dev/architecture.md +++ b/docs/dev/architecture.md @@ -138,8 +138,8 @@ flowchart TB end subgraph idx[graph index] - gi[GraphIndex<br/>CSR/CSC built per query]:::l2 - rc[RuntimeCache LRU=8]:::l2 + gi[GraphIndex<br/>CSR/CSC built per query<br/>scoped to traversed edges]:::l2 + rc[RuntimeCache LRU=8<br/>keyed by edge-table identity]:::l2 end subgraph io[Lance I/O] diff --git a/docs/dev/execution.md b/docs/dev/execution.md index 2762a3f8..0430f25d 100644 --- a/docs/dev/execution.md +++ b/docs/dev/execution.md @@ -6,7 +6,7 @@ Pipeline: 1. Parse + typecheck via `omnigraph-compiler`. 2. Lower to IR. -3. If `Expand` or `AntiJoin` is present, build (or fetch from `RuntimeCache`) a `GraphIndex`. +3. If `Expand` or `AntiJoin` is present, build (or fetch from `RuntimeCache`) a `GraphIndex` **scoped to the edge types the query actually traverses** (`referenced_edge_types`, recursing through `AntiJoin` inners) — not every edge type in the catalog. The CSR build full-scans each covered edge dataset, so scoping is what keeps a single-edge join (`$x identifiesPerson $p`) from scanning the whole graph's edge data. The `RuntimeCache` key is each covered edge table's **physical identity** `(table_key, version, table_branch, e_tag)` (not the resolved snapshot id), so a `{Knows}` index and a `{Knows, WorksAt}` index are distinct entries AND a lazy-fork branch whose edge tables physically *are* main's reuses main's built index instead of cold-scanning it. 4. Run `execute_query` against the snapshot. ### Read flow — sequence diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index 94a83b0a..5761f489 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -331,7 +331,18 @@ them explicit. deferred — it needs the Q8 cleanup-resurrection watermark first). The commit graph IS now reconcilable from the manifest (RFC-013 Phase 7 — it is a pure projection of the `graph_commit`/`graph_head` rows); the traversal id-map is - still rebuilt. 
The CSR/CSC topology index is now **scoped and cross-branch + reused** (the two cuts that closed the cross-edge-join hang): the build covers + only the edge types a query traverses (`referenced_edge_types` over + `Expand`/`AntiJoin`, not every catalog edge — a single-edge join no longer + scans the whole graph's edge data), and the `RuntimeCache` cache key is each + edge table's physical identity `(table_key, version, table_branch, e_tag)` + rather than the resolved snapshot id, so a lazy-fork branch reuses main's built + index instead of cold-scanning it. Residual: on stores without per-table + e_tags (local FS) a branch deleted and recreated at the same version has the + same key, so the topology distinction falls back to the same-branch manifest + refresh clearing read caches (`invalidate_all`); production object stores carry + real e_tags, so the key alone distinguishes incarnations there. - **Commit-graph parent under concurrency — CLOSED (RFC-013 Phase 7):** the graph commit is now recorded in the manifest publish CAS, and the publisher resolves the new commit's parent INSIDE its retry loop, per attempt, from the just-loaded diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 20388048..61f31170 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -25,7 +25,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `staged_writes.rs` | TableStore staged-write primitives (`stage_append`, `stage_merge_insert`, `commit_staged`, `scan_with_staged`, `count_rows_with_staged`) — primitive-level only; engine code uses the in-memory `MutationStaging` accumulator instead | | `forbidden_apis.rs` | Defense-in-depth source-walk guard: engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) must not reach around the sealed storage trait to Lance inline-commit APIs, nor open datasets directly (`Dataset::open` / `DatasetBuilder::from_uri`/`from_namespace`) — reads route through `Snapshot::open` and the held-handle cache; 
`// forbidden-api-allow: ` sentinel exempts reviewed lines | | `lance_surface_guards.rs` | Pins the Lance API surfaces omnigraph depends on (named runtime + compile-only guards; see [lance.md](lance.md)) — the first smoke check on any Lance version bump; e.g. `compact_files_still_fails_on_blob_columns` turns red when the upstream blob-compaction fix lands | -| `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 0 commit-graph opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by synthetic snapshot-id incarnation or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). 
See "Cost-budget tests" below | +| `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 0 commit-graph opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by per-edge-table e_tag in the graph-index cache key or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). Topology-build cost (CSR path, `#[serial]` + `OMNIGRAPH_TRAVERSAL_MODE=csr`, read off the `graph_build_count`/`graph_edges_built` probes): `fresh_branch_traversal_reuses_main_graph_index` (a lazy-fork branch reuses main's cached CSR index — 0 rebuilds — keyed by physical edge-table identity) and `single_edge_query_builds_only_referenced_edge` (a one-edge query builds only that edge, not all catalog edges). See "Cost-budget tests" below | | `write_cost.rs` | Cost-budget tests for the WRITE path (RFC-013), the latency twin of `warm_read_cost.rs` on the **shared `helpers::cost` harness** (`measure`/`IoCounts`/`assert_flat`/`local_graph`). 
Runs on **local FS**; gates the **internal-table** term (`__manifest`/`_graph_commits` scans flat in commit-history depth — `internal_table_scans_are_flat_in_history`, now **green every-PR** since RFC-013 step 2 brought the internal tables into `optimize`; the test compacts at each depth before measuring) plus green every-PR guards (single-insert `data_writes` bounded, a per-write read-op ceiling that fails the moment a round-trip is added, and a `measure_with_staged` fitness assert that a keyed insert routes through `stage_merge_insert` once with no `stage_append`/vector-index build). The **data-table opener** term is S3-only — see `write_cost_s3.rs` and the backend-split note in "Cost-budget tests" below | | `helpers/cost.rs` | The shared cost-budget harness (not a test): `IoCounts`/`StagedCounts` (counts by table class), `measure`/`measure_with_staged` (the one place the `with_query_io_probes` + `MergeWriteProbes` task-local + `IOTracker` wiring lives; reads per-op deltas via lance's `incremental_stats()`, the upstream per-request idiom from `rust/lance/src/dataset/tests/dataset_io.rs`), `cost_harness`/`GraphIoMeter` (installs ONE `__manifest` `IOTracker` for a whole test body so the graph opens **under** it and `manifest_reads` is **ground truth** — every read regardless of handle age, the warm-coordinator freshness probe included — closing the blind spot where a per-op tracker installed at measure time cannot see a long-lived handle's reads; outside `cost_harness`, `measure` falls back to fresh per-op tracking, so `write_cost_s3.rs` is unaffected), `last_manifest_reads()` (the manifest read log for `assert_io_eq!`-style failure diagnostics), `assert_flat(curve, select, slack, what)`, and store-agnostic `local_graph`/`s3_graph` fixtures. 
`warm_read_cost.rs`, `write_cost.rs`, and `write_cost_s3.rs` all consume it so a cost test body is written once and reads in one vocabulary | | `lifecycle.rs` | Graph lifecycle, schema state | @@ -35,7 +35,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `schema_apply.rs` | Migration plan + apply, schema-apply lock; index materialization deferred to the reconciler (iss-848): `apply_schema_defers_vector_index_on_empty_table` (an empty-table Vector `@index` never aborts the apply) and `index_only_constraint_apply_touches_no_table_data` (adding an `@index` is metadata-only — no table-version bump) | | `search.rs` | FTS / vector / hybrid (`bm25`, `nearest`, `rrf`) | | `traversal.rs` | `Expand`, variable-length hops, anti-join (CSR path — `OMNIGRAPH_TRAVERSAL_MODE` unset) | -| `traversal_indexed.rs` | BTREE-indexed Expand (`execute_expand_indexed`) forced via `OMNIGRAPH_TRAVERSAL_MODE`, asserted semantically equal to the CSR path; own binary, all `#[serial]` so env writes never race | +| `traversal_indexed.rs` | BTREE-indexed Expand (`execute_expand_indexed`) forced via `OMNIGRAPH_TRAVERSAL_MODE`, asserted semantically equal to the CSR path; own binary, all `#[serial]` so env writes never race. (The cross-edge-join scoped-build cost guard lives in `warm_read_cost.rs::single_edge_query_builds_only_referenced_edge`, asserted via the `graph_edges_built` probe.) 
| | `proptest_equivalence.rs` | Property-based query-correctness invariants over generated graphs (shared key alphabet forces cross-type id collisions, cycles, self-loops) — pins Expand-mode equivalence so a future fork divergence fails loudly instead of silently; `#[serial]` | | `ordering.rs` | ORDER BY contract: descending, multi-key precedence, deterministic key-column tie-break (total order, so `ORDER … LIMIT` is deterministic), NULL placement (`nulls_first = !descending`) | | `literal_filters.rs` | Execution goldens for non-string/non-integer scalar literal filters (F64/F32/Bool/Date/DateTime) across both the in-memory comparison arm and the Lance-pushdown arm | From ea71fc5bcc83ed9335bc72d7cdab54827ef1e5dd Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 28 Jun 2026 18:22:50 +0200 Subject: [PATCH 4/6] fix(test): move CSR-forced topology cost tests to the all-serial binary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The two topology-build cost tests force OMNIGRAPH_TRAVERSAL_MODE via process- global env mutation, which query.rs reads. In warm_read_cost.rs (a mixed serial/non-serial binary) a concurrent non-serial traversal test could race the env write (UB under Rust 2024's unsafe set_var contract) and be forced onto CSR. Move them to traversal_indexed.rs — the dedicated all-serial binary with no non-serial env reader (its documented-safe home) — and add a ModeGuard RAII helper so a panic mid-test cannot leak the override. Addresses a PR review (P2). 
--- crates/omnigraph/tests/traversal_indexed.rs | 165 +++++++++++++++++++- crates/omnigraph/tests/warm_read_cost.rs | 143 ----------------- docs/dev/testing.md | 4 +- 3 files changed, 166 insertions(+), 146 deletions(-) diff --git a/crates/omnigraph/tests/traversal_indexed.rs b/crates/omnigraph/tests/traversal_indexed.rs index 2ceed85a..78d68852 100644 --- a/crates/omnigraph/tests/traversal_indexed.rs +++ b/crates/omnigraph/tests/traversal_indexed.rs @@ -11,12 +11,17 @@ mod helpers; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + use arrow_array::{Array, StringArray}; -use omnigraph::db::Omnigraph; +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes}; use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph::table_store::{IndexCoverage, TableStore}; use omnigraph_compiler::ir::ParamMap; +use omnigraph_compiler::result::QueryResult; use serial_test::serial; use helpers::*; @@ -31,6 +36,40 @@ fn clear_mode() { unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") }; } +/// RAII guard that sets `OMNIGRAPH_TRAVERSAL_MODE` and clears it on drop, so a +/// panic mid-test (e.g. a query `unwrap`) cannot leak the forced mode into a +/// later test in this binary. SAFE: every test here is `#[serial]` and this +/// binary has no non-serial env reader, so no thread reads the env during the +/// write. (Mirrors `proptest_equivalence.rs::ModeGuard`.) +struct ModeGuard; +impl ModeGuard { + fn set(mode: &str) -> Self { + set_mode(mode); + ModeGuard + } +} +impl Drop for ModeGuard { + fn drop(&mut self) { + clear_mode(); + } +} + +/// First result column, sorted — for the probe-based topology-build tests below. 
+fn column0(result: &QueryResult) -> Vec<String> { + if result.num_rows() == 0 { + return Vec::new(); + } + let batch = result.concat_batches().unwrap(); + let col = batch + .column(0) + .as_any() + .downcast_ref::<StringArray>() + .unwrap(); + let mut v: Vec<String> = (0..col.len()).map(|i| col.value(i).to_string()).collect(); + v.sort(); + v +} + /// Run a name-returning query and return its first column, sorted. async fn sorted_names(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec<String> { let result = query_main(db, queries, name, params).await.unwrap(); @@ -325,3 +364,127 @@ async fn variable_hops_handle_self_loop() { // a->a hits the seeded source (pruned); only b is reached. assert_eq!(got, vec!["b"]); } + +// ─── Topology-index build cost (A1 cross-branch reuse + A2 scoped build) ───── +// +// These force the CSR build path (the indexed path builds no topology) and read +// the `graph_build_count` / `graph_edges_built` probes. They live HERE — the +// all-serial binary with no non-serial env reader — because they mutate the +// process-global `OMNIGRAPH_TRAVERSAL_MODE`, which `query.rs` reads; in a mixed +// serial/non-serial binary a concurrent non-serial traversal would race the env +// write. The `ModeGuard` clears the override even on a panic. + +/// A1: a fresh (unwritten) branch reuses main's cached CSR topology index +/// (`graph_build_count == 0`), and the reused index returns correct results for +/// the branch. Before A1 the branch-keyed snapshot id forced a rebuild (count 1). +#[tokio::test] +#[serial] +async fn fresh_branch_traversal_reuses_main_graph_index() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + // A Knows edge on main so there is topology to build and then reuse.
+ mutate_main( + &mut writer, + MUTATION_QUERIES, + "insert_person_and_friend", + &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), + ) + .await + .unwrap(); + + // Separate reader handle. As in production, the reader never creates the + // branch, so creating it does not invalidate the reader's warm cache. + let reader = Omnigraph::open(uri).await.unwrap(); + + let _mode = ModeGuard::set("csr"); + // Reader warms main on the CSR path: builds and caches the topology index. + let warm = reader + .query( + ReadTarget::branch("main"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ) + .await + .unwrap(); + assert_eq!(column0(&warm), vec!["Alice"], "test setup: main has the Knows edge"); + + // A separate writer creates the branch (lazy fork: feature's edge tables are + // physically main's — same version + e_tag, table_branch=None). + writer.branch_create("feature").await.unwrap(); + + let graph_build = Arc::new(AtomicU64::new(0)); + let probes_in = QueryIoProbes { + graph_build_count: Arc::clone(&graph_build), + ..Default::default() + }; + let on_branch = with_query_io_probes( + probes_in, + reader.query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ), + ) + .await + .unwrap(); + + assert_eq!( + column0(&on_branch), + vec!["Alice"], + "fresh branch sees main's edges (lazy fork) and the reused index is correct" + ); + assert_eq!( + graph_build.load(Ordering::Relaxed), + 0, + "a fresh branch with unchanged edges must reuse main's cached CSR index, not rebuild it" + ); +} + +/// A2: a query referencing one edge type builds the topology for only that edge, +/// not every edge in the catalog. Forces CSR (the build path) and counts edge +/// tables built. Before A2 the build materialized all catalog edges (the fixture +/// defines Knows + WorksAt, so a build-all touches >= 2) — the cold-build cost.
+#[tokio::test] +#[serial] +async fn single_edge_query_builds_only_referenced_edge() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + // A Knows edge so the referenced build has topology; the fixture also defines + // WorksAt, so a build-all would touch more than one edge. + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person_and_friend", + &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), + ) + .await + .unwrap(); + + let _mode = ModeGuard::set("csr"); + let graph_edges = Arc::new(AtomicU64::new(0)); + let probes_in = QueryIoProbes { + graph_edges_built: Arc::clone(&graph_edges), + ..Default::default() + }; + let result = with_query_io_probes( + probes_in, + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ), + ) + .await + .unwrap(); + + assert_eq!(column0(&result), vec!["Alice"]); + assert_eq!( + graph_edges.load(Ordering::Relaxed), + 1, + "a query referencing only `knows` must build only that edge, not all catalog edges" + ); +} diff --git a/crates/omnigraph/tests/warm_read_cost.rs b/crates/omnigraph/tests/warm_read_cost.rs index 551819f4..4a34ef5f 100644 --- a/crates/omnigraph/tests/warm_read_cost.rs +++ b/crates/omnigraph/tests/warm_read_cost.rs @@ -7,14 +7,9 @@ mod helpers; -use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; - use arrow_array::{Array, StringArray}; use omnigraph::db::{Omnigraph, ReadTarget}; -use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes}; use omnigraph_compiler::result::QueryResult; -use serial_test::serial; use helpers::cost::{cost_harness, measure}; use helpers::{ @@ -738,141 +733,3 @@ async fn write_invalidates_table_cache_for_changed_table() { "the post-write read observes the new row (no stale handle served)" ); } - -// ─── Topology-index build cost (A1 cross-branch reuse + A2 scoped build) ───── -// -// These force the CSR build path (the indexed path builds no
topology), so they -// are `#[serial]` — the env write must not race a concurrent reader in this -// binary. The build-count probes (`graph_build_count` / `graph_edges_built`) are -// read off a directly-constructed `QueryIoProbes`. - -fn set_csr_mode() { - // SAFE: every test that sets this is `#[serial]`, so no thread reads the env - // during the write. - unsafe { std::env::set_var("OMNIGRAPH_TRAVERSAL_MODE", "csr") }; -} - -fn clear_traversal_mode() { - unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") }; -} - -/// A1: a fresh (unwritten) branch reuses main's cached CSR topology index -/// (`graph_build_count == 0`), and the reused index returns correct results for -/// the branch. Before A1 the branch-keyed snapshot id forced a rebuild (count 1). -#[tokio::test] -#[serial] -async fn fresh_branch_traversal_reuses_main_graph_index() { - let dir = tempfile::tempdir().unwrap(); - let mut writer = init_and_load(&dir).await; - let uri = dir.path().to_str().unwrap(); - // A Knows edge on main so there is topology to build and then reuse. - mutate_main( - &mut writer, - MUTATION_QUERIES, - "insert_person_and_friend", - &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), - ) - .await - .unwrap(); - - // Separate reader handle. As in production, the reader never creates the - // branch, so creating it does not invalidate the reader's warm cache. - let reader = Omnigraph::open(uri).await.unwrap(); - - set_csr_mode(); - // Reader warms main on the CSR path: builds and caches the topology index. - let warm = reader - .query( - ReadTarget::branch("main"), - TEST_QUERIES, - "friends_of", - &params(&[("$name", "Walker")]), - ) - .await - .unwrap(); - assert_eq!( - first_column_strings(&warm), - vec!["Alice"], - "test setup: main has the Knows edge" - ); - - // A separate writer creates the branch (lazy fork: feature's edge tables are - // physically main's — same version + e_tag, table_branch=None).
- writer.branch_create("feature").await.unwrap(); - - let graph_build = Arc::new(AtomicU64::new(0)); - let probes_in = QueryIoProbes { - graph_build_count: Arc::clone(&graph_build), - ..Default::default() - }; - let on_branch = with_query_io_probes( - probes_in, - reader.query( - ReadTarget::branch("feature"), - TEST_QUERIES, - "friends_of", - &params(&[("$name", "Walker")]), - ), - ) - .await; - clear_traversal_mode(); - let on_branch = on_branch.unwrap(); - - assert_eq!( - first_column_strings(&on_branch), - vec!["Alice"], - "fresh branch sees main's edges (lazy fork) and the reused index is correct" - ); - assert_eq!( - graph_build.load(Ordering::Relaxed), - 0, - "a fresh branch with unchanged edges must reuse main's cached CSR index, not rebuild it" - ); -} - -/// A2: a query referencing one edge type builds the topology for only that edge, -/// not every edge in the catalog. Forces CSR (the build path) and counts edge -/// tables built. Before A2 the build materialized all catalog edges (the fixture -/// defines Knows + WorksAt, so a build-all touches >= 2) — the cold-build cost. -#[tokio::test] -#[serial] -async fn single_edge_query_builds_only_referenced_edge() { - let dir = tempfile::tempdir().unwrap(); - let mut db = init_and_load(&dir).await; - // A Knows edge so the referenced build has topology; the fixture also defines - // WorksAt, so a build-all would touch more than one edge.
- mutate_main( - &mut db, - MUTATION_QUERIES, - "insert_person_and_friend", - &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), - ) - .await - .unwrap(); - - set_csr_mode(); - let graph_edges = Arc::new(AtomicU64::new(0)); - let probes_in = QueryIoProbes { - graph_edges_built: Arc::clone(&graph_edges), - ..Default::default() - }; - let result = with_query_io_probes( - probes_in, - db.query( - ReadTarget::branch("main"), - TEST_QUERIES, - "friends_of", - &params(&[("$name", "Walker")]), - ), - ) - .await; - clear_traversal_mode(); - let result = result.unwrap(); - - assert_eq!(first_column_strings(&result), vec!["Alice"]); - assert_eq!( - graph_edges.load(Ordering::Relaxed), - 1, - "a query referencing only `knows` must build only that edge, not all catalog edges" - ); -} diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 66c3b2f6..3a1642cc 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -25,7 +25,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `staged_writes.rs` | TableStore staged-write primitives (`stage_append`, `stage_merge_insert`, `commit_staged`, `scan_with_staged`, `count_rows_with_staged`) — primitive-level only; engine code uses the in-memory `MutationStaging` accumulator instead | | `forbidden_apis.rs` | Defense-in-depth source-walk guard: engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) must not reach around the sealed storage trait to Lance inline-commit APIs, nor open datasets directly (`Dataset::open` / `DatasetBuilder::from_uri`/`from_namespace`) — reads route through `Snapshot::open` and the held-handle cache; `// forbidden-api-allow: ` sentinel exempts reviewed lines | | `lance_surface_guards.rs` | Pins the Lance API surfaces omnigraph depends on (named runtime + compile-only guards; see [lance.md](lance.md)) — the first smoke check on any Lance version bump; e.g.
`compact_files_still_fails_on_blob_columns` turns red when the upstream blob-compaction fix lands | -| `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by per-edge-table e_tag in the graph-index cache key or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). Topology-build cost (CSR path, `#[serial]` + `OMNIGRAPH_TRAVERSAL_MODE=csr`, read off the `graph_build_count`/`graph_edges_built` probes): `fresh_branch_traversal_reuses_main_graph_index` (a lazy-fork branch reuses main's cached CSR index — 0 rebuilds — keyed by physical edge-table identity) and `single_edge_query_builds_only_referenced_edge` (a one-edge query builds only that edge, not all catalog edges). 
See "Cost-budget tests" below | +| `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by per-edge-table e_tag in the graph-index cache key or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). See "Cost-budget tests" below. (The CSR topology-build cost guards — physical-identity cache key + scoped build — live in `traversal_indexed.rs`, since they force `OMNIGRAPH_TRAVERSAL_MODE` and must run in an all-serial binary.) | | `write_cost.rs` | Cost-budget tests for the WRITE path (RFC-013), the latency twin of `warm_read_cost.rs` on the **shared `helpers::cost` harness** (`measure`/`IoCounts`/`assert_flat`/`local_graph`). Runs on **local FS**; gates the **internal-table** term (`__manifest` scans flat in commit-history depth, lineage rows included — `internal_table_scans_are_flat_in_history`, now **green every-PR** since RFC-013 step 2 brought the internal tables into `optimize`; the test compacts at each depth before measuring) plus green every-PR guards (single-insert `data_writes` bounded, a per-write read-op ceiling that fails the moment a round-trip is added, and a `measure_with_staged` fitness assert that a keyed insert routes through `stage_merge_insert` once with no `stage_append`/vector-index build). 
The **data-table opener** term is S3-only — see `write_cost_s3.rs` and the backend-split note in "Cost-budget tests" below | | `helpers/cost.rs` | The shared cost-budget harness (not a test): `IoCounts`/`StagedCounts` (counts by table class), `measure`/`measure_with_staged` (the one place the `with_query_io_probes` + `MergeWriteProbes` task-local + `IOTracker` wiring lives; reads per-op deltas via lance's `incremental_stats()`, the upstream per-request idiom from `rust/lance/src/dataset/tests/dataset_io.rs`), `cost_harness`/`GraphIoMeter` (installs ONE `__manifest` `IOTracker` for a whole test body so the graph opens **under** it and `manifest_reads` is **ground truth** — every read regardless of handle age, the warm-coordinator freshness probe included — closing the blind spot where a per-op tracker installed at measure time cannot see a long-lived handle's reads; outside `cost_harness`, `measure` falls back to fresh per-op tracking, so `write_cost_s3.rs` is unaffected), `last_manifest_reads()` (the manifest read log for `assert_io_eq!`-style failure diagnostics), `assert_flat(curve, select, slack, what)`, and store-agnostic `local_graph`/`s3_graph` fixtures. 
`warm_read_cost.rs`, `write_cost.rs`, and `write_cost_s3.rs` all consume it so a cost test body is written once and reads in one vocabulary | | `lifecycle.rs` | Graph lifecycle, schema state | @@ -35,7 +35,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `schema_apply.rs` | Migration plan + apply, schema-apply lock; index materialization deferred to the reconciler (iss-848): `apply_schema_defers_vector_index_on_empty_table` (an empty-table Vector `@index` never aborts the apply) and `index_only_constraint_apply_touches_no_table_data` (adding an `@index` is metadata-only — no table-version bump) | | `search.rs` | FTS / vector / hybrid (`bm25`, `nearest`, `rrf`) | | `traversal.rs` | `Expand`, variable-length hops, anti-join (CSR path — `OMNIGRAPH_TRAVERSAL_MODE` unset) | -| `traversal_indexed.rs` | BTREE-indexed Expand (`execute_expand_indexed`) forced via `OMNIGRAPH_TRAVERSAL_MODE`, asserted semantically equal to the CSR path; own binary, all `#[serial]` so env writes never race. (The cross-edge-join scoped-build cost guard lives in `warm_read_cost.rs::single_edge_query_builds_only_referenced_edge`, asserted via the `graph_edges_built` probe.) | +| `traversal_indexed.rs` | BTREE-indexed Expand (`execute_expand_indexed`) forced via `OMNIGRAPH_TRAVERSAL_MODE`, asserted semantically equal to the CSR path; own binary, all `#[serial]` so env writes never race (the `ModeGuard` RAII helper clears the override even on a panic). Also home to the CSR topology-build cost guards (which force the same env): `fresh_branch_traversal_reuses_main_graph_index` (A1 — a lazy-fork branch reuses main's cached CSR index, 0 rebuilds via the `graph_build_count` probe) and `single_edge_query_builds_only_referenced_edge` (A2 — a one-edge query builds only that edge via the `graph_edges_built` probe). 
| | `proptest_equivalence.rs` | Property-based query-correctness invariants over generated graphs (shared key alphabet forces cross-type id collisions, cycles, self-loops) — pins Expand-mode equivalence so a future fork divergence fails loudly instead of silently; `#[serial]` | | `ordering.rs` | ORDER BY contract: descending, multi-key precedence, deterministic key-column tie-break (total order, so `ORDER … LIMIT` is deterministic), NULL placement (`nulls_first = !descending`) | | `literal_filters.rs` | Execution goldens for non-string/non-integer scalar literal filters (F64/F32/Bool/Date/DateTime) across both the in-memory comparison arm and the Lance-pushdown arm | From 4d8151eb4d1b68a22f1efc9d588742e9128c96c6 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 28 Jun 2026 18:29:10 +0200 Subject: [PATCH 5/6] fix(engine): include edge endpoints in the graph-index cache key The A1 physical-identity key omitted the edge's (from_type, to_type). GraphIndex keys its TypeIndexes by those endpoint names and execute_expand_csr looks them up by the current catalog's names, so a schema repoint of an edge type that leaves the edge table's physical identity unchanged would reuse a stale index built with the old endpoint namespace and fail with "no type index for ". The old snapshot_id (carrying the manifest version) masked this; dropping it exposed it. Adding the endpoints to the key rebuilds on a repoint while preserving lazy-fork cross-branch reuse (same endpoints -> same key). Addresses a PR review (P1). 
--- crates/omnigraph/src/runtime_cache.rs | 41 +++++++++++++++++++++++++-- docs/dev/invariants.md | 20 +++++++++---- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/crates/omnigraph/src/runtime_cache.rs b/crates/omnigraph/src/runtime_cache.rs index d4b99f8b..7023a925 100644 --- a/crates/omnigraph/src/runtime_cache.rs +++ b/crates/omnigraph/src/runtime_cache.rs @@ -33,6 +33,13 @@ struct GraphIndexTableState { /// without e_tags (local FS); there a same-branch manifest refresh clears the /// cache as the fallback (the read-path gap in docs/dev/invariants.md). e_tag: Option, + /// The edge's `(from_type, to_type)` endpoint names at build time. `GraphIndex` + /// keys its `TypeIndex`es by these, and `execute_expand_csr` looks them up by + /// the *current* catalog's endpoint names — so a schema change that repoints an + /// edge type while leaving the edge table's physical identity unchanged must + /// invalidate the entry (else the reused index has the old type-index namespace + /// and the new traversal fails with "no type index for "). 
+ endpoints: (String, String), } #[derive(Debug, Default)] @@ -168,8 +175,8 @@ fn graph_index_cache_key( edge_types: &HashMap, ) -> GraphIndexCacheKey { let mut edge_tables: Vec = edge_types - .keys() - .filter_map(|edge_name| { + .iter() + .filter_map(|(edge_name, endpoints)| { let table_key = format!("edge:{}", edge_name); resolved .snapshot @@ -179,6 +186,7 @@ fn graph_index_cache_key( table_version: entry.table_version, table_branch: entry.table_branch.clone(), e_tag: entry.version_metadata.e_tag().map(str::to_string), + endpoints: endpoints.clone(), }) }) .collect(); @@ -310,6 +318,7 @@ mod tests { table_version: 1, table_branch: None, e_tag: None, + endpoints: ("A".to_string(), "B".to_string()), }], } } @@ -318,6 +327,34 @@ mod tests { Arc::new(GraphIndex::empty_for_test()) } + /// An edge table at the same physical identity but a different `(from_type, + /// to_type)` endpoint mapping (a schema repoint) must NOT share a cache entry + /// — the built index's `TypeIndex` namespace is keyed by those endpoints. + #[test] + fn endpoint_remap_at_same_physical_identity_splits_cache_key() { + let base = GraphIndexTableState { + table_key: "edge:Knows".to_string(), + table_version: 7, + table_branch: None, + e_tag: Some("etag".to_string()), + endpoints: ("Person".to_string(), "Person".to_string()), + }; + let repointed = GraphIndexTableState { + endpoints: ("Person".to_string(), "Account".to_string()), + ..base.clone() + }; + let k_old = GraphIndexCacheKey { + edge_tables: vec![base], + }; + let k_new = GraphIndexCacheKey { + edge_tables: vec![repointed], + }; + assert_ne!( + k_old, k_new, + "a schema endpoint remap must produce a distinct graph-index cache key" + ); + } + #[test] fn graph_index_cache_evicts_oldest_entry() { let mut cache = GraphIndexCache::default(); diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index 9f320dd0..145e5b03 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -342,12 +342,20 @@ them explicit. 
`Expand`/`AntiJoin`, not every catalog edge — a single-edge join no longer scans the whole graph's edge data), and the `RuntimeCache` cache key is each edge table's physical identity `(table_key, version, table_branch, e_tag)` - rather than the resolved snapshot id, so a lazy-fork branch reuses main's built - index instead of cold-scanning it. Residual: on stores without per-table - e_tags (local FS) a branch deleted and recreated at the same version has the - same key, so the topology distinction falls back to the same-branch manifest - refresh clearing read caches (`invalidate_all`); production object stores carry - real e_tags, so the key alone distinguishes incarnations there. + plus the edge's `(from_type, to_type)` endpoint mapping — rather than the + resolved snapshot id — so a lazy-fork branch reuses main's built index instead + of cold-scanning it, while a schema repoint of an edge type (which changes the + built `TypeIndex` namespace) still rebuilds even if the edge table's physical + identity is unchanged. Residual: on stores without per-table e_tags (local FS) + a branch deleted and recreated at the same version with the same endpoints has + the same key, so the incarnation distinction falls back to the same-branch + manifest refresh clearing read caches (`invalidate_all`); production object + stores carry real e_tags, so the key alone distinguishes incarnations there. + Known narrow gap (local FS only): a cold *cross-branch* resolve of a + recreated branch (a long-lived reader bound to another branch) does not trigger + that same-branch refresh, so an e_tag-less recreated branch can still reuse a + stale entry until a same-branch read refreshes — acceptable because local FS is + a dev/test substrate and production carries e_tags. 
- **Commit-graph parent under concurrency — CLOSED (RFC-013 Phase 7):** the graph commit is now recorded in the manifest publish CAS, and the publisher resolves the new commit's parent INSIDE its retry loop, per attempt, from the just-loaded From 89011eb65827aecb2858ccc3394319c657995ad4 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 28 Jun 2026 19:06:22 +0200 Subject: [PATCH 6/6] test(engine): scoped with_traversal_mode seam + e_tag graph-index coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the process-global OMNIGRAPH_TRAVERSAL_MODE env-mutation test hack (which forced #[serial] + dedicated all-serial binaries and was triplicated as ModeGuard + set_mode/clear_mode) with one general abstraction: a task-local `with_traversal_mode` seam mirroring `with_query_io_probes`. It is scope-bound (leak-free even on panic) and process-safe (never touches shared state), so a forced-mode test cannot affect a concurrent test in the same binary. `traversal_indexed_override` consults the seam first, then the env var (which stays the documented ops escape hatch). - Migrate traversal_indexed.rs, proptest_equivalence.rs, and the two topology cost tests (moved back to warm_read_cost.rs) to the seam; drop all ModeGuard / set_mode / clear_mode / #[serial] / per-file column0 helpers. - Consolidate the duplicated first-column extractors into one shared `helpers::first_column_sorted`. - Add `s3_storage.rs::s3_fresh_branch_traversal_reuses_main_graph_index_with_etags`: the CSR cache-key cross-branch reuse path on a REAL per-table e_tag (None on local FS, so local tests can't reach it). Confirmed empirically that RustFS — the CI S3 backend — surfaces ETags into version_metadata.e_tag(). CI path filter now triggers the rustfs job on runtime_cache/graph_index changes. 
--- .github/workflows/ci.yml | 1 + crates/omnigraph/src/exec/query.rs | 7 +- crates/omnigraph/src/instrumentation.rs | 26 ++ crates/omnigraph/tests/helpers/mod.rs | 20 ++ .../omnigraph/tests/proptest_equivalence.rs | 53 +--- crates/omnigraph/tests/s3_storage.rs | 85 +++++- crates/omnigraph/tests/traversal_indexed.rs | 251 +++--------------- crates/omnigraph/tests/warm_read_cost.rs | 165 ++++++++++-- docs/dev/invariants.md | 5 +- docs/dev/testing.md | 8 +- 10 files changed, 328 insertions(+), 293 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1e9249f9..f5462aeb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -90,6 +90,7 @@ jobs: crates/omnigraph/src/db/manifest.rs|crates/omnigraph/src/db/manifest/*) run_rustfs_ci=true ;; crates/omnigraph/tests/s3_storage.rs|crates/omnigraph/tests/write_cost_s3.rs|crates/omnigraph/tests/helpers/*) run_rustfs_ci=true ;; crates/omnigraph/src/table_store.rs|crates/omnigraph/src/instrumentation.rs) run_rustfs_ci=true ;; + crates/omnigraph/src/runtime_cache.rs|crates/omnigraph/src/graph_index/*) run_rustfs_ci=true ;; crates/omnigraph-cluster/src/store.rs|crates/omnigraph-cluster/src/serve.rs) run_rustfs_ci=true ;; crates/omnigraph-cluster/tests/s3_cluster.rs) run_rustfs_ci=true ;; crates/omnigraph-server/tests/s3.rs|crates/omnigraph-server/tests/support/*) run_rustfs_ci=true ;; diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index aaf071d8..b1a834f4 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -886,7 +886,12 @@ impl<'a> GraphIndexHandle<'a> { /// forces the path (ops escape hatch + test hook). Both modes are semantically /// identical, so the override only changes which path runs, never the result. 
fn traversal_indexed_override() -> Option { - match std::env::var("OMNIGRAPH_TRAVERSAL_MODE").ok().as_deref() { + // The scoped test seam (`with_traversal_mode`) takes precedence over the + // process-global `OMNIGRAPH_TRAVERSAL_MODE` ops escape hatch. + let mode = crate::instrumentation::traversal_mode_override() + .map(str::to_string) + .or_else(|| std::env::var("OMNIGRAPH_TRAVERSAL_MODE").ok()); + match mode.as_deref() { Some("indexed") => Some(true), Some("csr") => Some(false), _ => None, diff --git a/crates/omnigraph/src/instrumentation.rs b/crates/omnigraph/src/instrumentation.rs index 597729df..861592ec 100644 --- a/crates/omnigraph/src/instrumentation.rs +++ b/crates/omnigraph/src/instrumentation.rs @@ -86,6 +86,32 @@ fn current(f: impl FnOnce(&QueryIoProbes) -> R) -> Option { QUERY_IO_PROBES.try_with(f).ok() } +tokio::task_local! { + static TRAVERSAL_MODE_OVERRIDE: Option<&'static str>; +} + +/// Force the Expand execution mode (`"indexed"` | `"csr"`) for the scope of `fut` +/// WITHOUT mutating the process-global `OMNIGRAPH_TRAVERSAL_MODE` env var. This is +/// the general traversal-mode test seam: scope-bound (so it cannot leak — the +/// override is gone when `fut` resolves or unwinds) and process-safe (it never +/// touches shared state, so a forced-mode test never affects a concurrent test in +/// the same binary, removing the need for `#[serial]` + a dedicated all-serial +/// binary). Mirrors [`with_query_io_probes`]. The env var stays the production/ops +/// escape hatch; this scoped override takes precedence over it +/// (`exec::query::traversal_indexed_override`). +pub async fn with_traversal_mode(mode: &'static str, fut: F) -> F::Output +where + F: std::future::Future, +{ + TRAVERSAL_MODE_OVERRIDE.scope(Some(mode), fut).await +} + +/// The scoped traversal-mode override active for this task, if any. `None` in +/// production (no scope installed), so the env var is consulted instead. 
+pub(crate) fn traversal_mode_override() -> Option<&'static str> { + TRAVERSAL_MODE_OVERRIDE.try_with(|m| *m).ok().flatten() +} + pub(crate) fn manifest_wrapper() -> Option> { current(|p| p.manifest_wrapper.clone()).flatten() } diff --git a/crates/omnigraph/tests/helpers/mod.rs b/crates/omnigraph/tests/helpers/mod.rs index 13127f28..597a104a 100644 --- a/crates/omnigraph/tests/helpers/mod.rs +++ b/crates/omnigraph/tests/helpers/mod.rs @@ -110,6 +110,26 @@ pub async fn count_rows_branch(db: &Omnigraph, branch: &str, table_key: &str) -> ds.count_rows(None).await.unwrap() } +/// First result column as sorted strings — the shared shape the traversal / +/// cost tests use to compare a query's returned names. Empty for a 0-row result. +pub fn first_column_sorted(result: &QueryResult) -> Vec { + if result.num_rows() == 0 { + return Vec::new(); + } + let batch = result.concat_batches().unwrap(); + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut v: Vec = (0..col.len()) + .filter(|&i| !col.is_null(i)) + .map(|i| col.value(i).to_string()) + .collect(); + v.sort(); + v +} + /// Collect all string values from a named column across batches. pub fn collect_column_strings(batches: &[RecordBatch], col: &str) -> Vec { let mut out = Vec::new(); diff --git a/crates/omnigraph/tests/proptest_equivalence.rs b/crates/omnigraph/tests/proptest_equivalence.rs index 3423a2fa..3b4011fa 100644 --- a/crates/omnigraph/tests/proptest_equivalence.rs +++ b/crates/omnigraph/tests/proptest_equivalence.rs @@ -9,10 +9,10 @@ //! third ExpandMode, the anti-join fast/slow fork) fail loudly instead of //! silently. //! -//! Each test is a sync `#[test]` + `#[serial]`: it builds its own runtime and -//! `block_on`s per generated case (proptest closures are sync), and the -//! mode-equivalence test writes `OMNIGRAPH_TRAVERSAL_MODE`, so serial execution -//! keeps env writes from racing other tests in this binary. +//! 
Each test is a sync `#[test]`: it builds its own runtime and `block_on`s per +//! generated case (proptest closures are sync). The mode-equivalence test forces +//! the Expand mode via the scoped `with_traversal_mode` seam — no env mutation, so +//! no `#[serial]` and no leak across shrink/cases. mod helpers; @@ -21,9 +21,9 @@ use std::collections::HashSet; use arrow_array::{Array, StringArray}; use proptest::prelude::*; use proptest::test_runner::{Config, TestRunner}; -use serial_test::serial; use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::instrumentation::with_traversal_mode; use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph_compiler::ir::ParamMap; use omnigraph_compiler::query::ast::Literal; @@ -138,27 +138,6 @@ fn config() -> Config { } } -fn clear_mode() { - unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") }; -} - -/// RAII guard that sets `OMNIGRAPH_TRAVERSAL_MODE` and clears it on drop — so a -/// panic mid-case (e.g. a query `unwrap`) cannot leak the forced mode into -/// proptest's subsequent shrink/cases and mask the divergence under test. SAFE: -/// every test in this binary is `#[serial]`, so no thread reads the env during -/// the write. -struct ModeGuard; -impl ModeGuard { - fn set(mode: &str) -> Self { - unsafe { std::env::set_var("OMNIGRAPH_TRAVERSAL_MODE", mode) }; - ModeGuard - } -} -impl Drop for ModeGuard { - fn drop(&mut self) { - unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") }; - } -} async fn load_graph(graph: &GenGraph) -> (tempfile::TempDir, Omnigraph) { let dir = tempfile::tempdir().unwrap(); @@ -203,7 +182,6 @@ async fn col0_set(db: &mut Omnigraph, name: &str, params: &ParamMap) -> HashSet< // one (worksAt{1,2}, collision-prone). This is the search-over-the-class version // of the hand-built cross-type-collision fixture. 
#[test] -#[serial] fn prop_expand_indexed_eq_csr() { let rt = tokio::runtime::Runtime::new().unwrap(); let mut runner = TestRunner::new(config()); @@ -214,17 +192,12 @@ fn prop_expand_indexed_eq_csr() { for start in graph.persons.clone() { let p = one_param(&start); for q in ["friends", "employers"] { - // Each guard clears the mode on drop (end of the block, - // or on panic), so a forced mode never leaks across runs. - let csr = { - let _g = ModeGuard::set("csr"); - col0_sorted(&mut db, q, &p).await - }; - let indexed = { - let _g = ModeGuard::set("indexed"); - col0_sorted(&mut db, q, &p).await - }; - // No guard → env unset → auto (cost-based) path. + // The seam is scope-bound: the forced mode is gone when the + // wrapped future resolves, so it never leaks across runs. + let csr = with_traversal_mode("csr", col0_sorted(&mut db, q, &p)).await; + let indexed = + with_traversal_mode("indexed", col0_sorted(&mut db, q, &p)).await; + // No override → auto (cost-based) path. let auto = col0_sorted(&mut db, q, &p).await; if csr != indexed || csr != auto { return Some((start, q, csr, indexed, auto)); @@ -247,9 +220,7 @@ fn prop_expand_indexed_eq_csr() { // destination type's loaded key set — independent of the two-mode comparison, so // it catches over-emission even if both modes are wrong identically. #[test] -#[serial] fn prop_results_subset_of_existing_nodes() { - clear_mode(); let rt = tokio::runtime::Runtime::new().unwrap(); let mut runner = TestRunner::new(config()); runner @@ -282,9 +253,7 @@ fn prop_results_subset_of_existing_nodes() { // INVARIANT 3: anti-join complement. `not { $p worksAt $_ }` and its complement // (persons WITH a worksAt) must be disjoint and together cover all persons. 
#[test] -#[serial] fn prop_antijoin_partitions_persons() { - clear_mode(); let rt = tokio::runtime::Runtime::new().unwrap(); let mut runner = TestRunner::new(config()); runner diff --git a/crates/omnigraph/tests/s3_storage.rs b/crates/omnigraph/tests/s3_storage.rs index 3814600e..eb546f81 100644 --- a/crates/omnigraph/tests/s3_storage.rs +++ b/crates/omnigraph/tests/s3_storage.rs @@ -1,7 +1,11 @@ mod helpers; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + use omnigraph::db::MergeOutcome; -use omnigraph::db::Omnigraph; +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes, with_traversal_mode}; use omnigraph::loader::{LoadMode, load_jsonl}; use helpers::*; @@ -244,3 +248,82 @@ async fn s3_schema_apply_migrates_live_graph() { "live S3 schema must carry the migration" ); } + +/// Graph-index (CSR topology) cross-branch reuse on a real object store, where the +/// cache key's per-table `e_tag` is a genuine non-`None` token (Lance e_tag is +/// `None` on local FS, so the local twin in `warm_read_cost.rs` keys on `None` — +/// this exercises the e_tag-present path production runs). With e_tags present, a +/// fresh lazy-fork branch reuses main's cached index (`graph_build_count == 0`). +/// Forces CSR via the scoped `with_traversal_mode` seam (no env mutation, so no +/// interference with the other tests in this binary). +#[tokio::test(flavor = "multi_thread")] +async fn s3_fresh_branch_traversal_reuses_main_graph_index_with_etags() { + let Some(uri) = s3_test_graph_uri("graph-index-etag") else { + eprintln!("skipping s3 graph-index test: OMNIGRAPH_S3_TEST_BUCKET is not set"); + return; + }; + + let mut writer = Omnigraph::init(&uri, TEST_SCHEMA).await.unwrap(); + // TEST_DATA seeds Alice->Bob and Alice->Charlie Knows edges. 
+ load_jsonl(&mut writer, TEST_DATA, LoadMode::Overwrite) + .await + .unwrap(); + + // Separate reader: it never creates the branch, so branch_create below does + // not invalidate the reader's warm cache. + let reader = Omnigraph::open(&uri).await.unwrap(); + + // Warm main on the CSR path: builds + caches the topology index keyed by the + // edge table's physical identity incl. its real e_tag. + let warm = with_traversal_mode( + "csr", + reader.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Alice")]), + ), + ) + .await + .unwrap(); + assert_eq!( + first_column_sorted(&warm), + vec!["Bob", "Charlie"], + "test setup: Alice knows Bob and Charlie" + ); + + // Lazy fork: feature's edge tables are physically main's (same version + + // e_tag, table_branch = None). 
and assert the indexed path matches the CSR path (both are semantically -//! identical — the indexed path just serves neighbor lookups from the persisted -//! src/dst BTREE instead of an in-memory CSR). They live in their own test -//! binary and are all `#[serial]`, so the env writes never race a concurrent -//! reader: within this process serial execution serializes every env read, and -//! other test binaries (e.g. `traversal.rs`) are separate processes whose env -//! stays unset (→ CSR), validating the shared hydrate/align tail on the CSR path. +//! These tests force the Expand execution mode via the scoped `with_traversal_mode` +//! test seam — NOT the process-global `OMNIGRAPH_TRAVERSAL_MODE` env var — and +//! assert the indexed path matches the CSR path (both are semantically identical: +//! the indexed path serves neighbor lookups from the persisted src/dst BTREE +//! instead of an in-memory CSR). The seam is scope-bound and process-safe, so +//! these tests need no `#[serial]` and no dedicated binary. mod helpers; -use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; - -use arrow_array::{Array, StringArray}; - -use omnigraph::db::{Omnigraph, ReadTarget}; -use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes}; +use omnigraph::db::Omnigraph; +use omnigraph::instrumentation::with_traversal_mode; use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph::table_store::{IndexCoverage, TableStore}; use omnigraph_compiler::ir::ParamMap; -use omnigraph_compiler::result::QueryResult; -use serial_test::serial; use helpers::*; -fn set_mode(mode: &str) { - // SAFE: every test here is #[serial] and this binary has no non-serial - // env reader, so no thread reads the environment during this write. 
- unsafe { std::env::set_var("OMNIGRAPH_TRAVERSAL_MODE", mode) }; -} - -fn clear_mode() { - unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") }; -} - -/// RAII guard that sets `OMNIGRAPH_TRAVERSAL_MODE` and clears it on drop, so a -/// panic mid-test (e.g. a query `unwrap`) cannot leak the forced mode into a -/// later test in this binary. SAFE: every test here is `#[serial]` and this -/// binary has no non-serial env reader, so no thread reads the env during the -/// write. (Mirrors `proptest_equivalence.rs::ModeGuard`.) -struct ModeGuard; -impl ModeGuard { - fn set(mode: &str) -> Self { - set_mode(mode); - ModeGuard - } -} -impl Drop for ModeGuard { - fn drop(&mut self) { - clear_mode(); - } -} - -/// First result column, sorted — for the probe-based topology-build tests below. -fn column0(result: &QueryResult) -> Vec { - if result.num_rows() == 0 { - return Vec::new(); - } - let batch = result.concat_batches().unwrap(); - let col = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let mut v: Vec = (0..col.len()).map(|i| col.value(i).to_string()).collect(); - v.sort(); - v -} - -/// Run a name-returning query and return its first column, sorted. +/// Run `name` on main under the cost-chooser (auto) Expand mode; first column sorted. async fn sorted_names(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec { - let result = query_main(db, queries, name, params).await.unwrap(); - if result.num_rows() == 0 { - return Vec::new(); - } - let batch = result.concat_batches().unwrap(); - let col = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let mut v: Vec = (0..col.len()).map(|i| col.value(i).to_string()).collect(); - v.sort(); - v + first_column_sorted(&query_main(db, queries, name, params).await.unwrap()) } /// Run the same query under CSR, indexed, and auto (cost-chooser) modes; assert -/// all three produce identical results and return them. 
The auto pass exercises -/// `choose_expand_mode` end to end: whichever path it selects, the rows must -/// match the forced paths (the chooser changes which path runs, never the result). +/// all three produce identical results and return them. The forced modes use the +/// scoped `with_traversal_mode` seam; the auto pass exercises `choose_expand_mode` +/// end to end (whichever path it selects, the rows must match the forced paths — +/// the chooser changes which path runs, never the result). async fn both_modes(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec { - set_mode("csr"); - let csr = sorted_names(db, queries, name, params).await; - set_mode("indexed"); - let indexed = sorted_names(db, queries, name, params).await; - clear_mode(); + let csr = first_column_sorted( + &with_traversal_mode("csr", query_main(db, queries, name, params)) + .await + .unwrap(), + ); + let indexed = first_column_sorted( + &with_traversal_mode("indexed", query_main(db, queries, name, params)) + .await + .unwrap(), + ); let auto = sorted_names(db, queries, name, params).await; assert_eq!( indexed, csr, @@ -111,7 +52,6 @@ async fn both_modes(db: &mut Omnigraph, queries: &str, name: &str, params: &Para // The C6 index-coverage guard: `key_column_index_coverage` must report whether // a `key_col IN (...)` scan will use the persisted BTREE or silently full-scan. -// Not #[serial] — it calls the helper directly and reads no env. 
#[tokio::test] async fn key_column_index_coverage_detects_btree_presence() { let dir = tempfile::tempdir().unwrap(); @@ -175,7 +115,6 @@ async fn coverage_degrades_for_appended_unindexed_fragment() { } #[tokio::test] -#[serial] async fn indexed_matches_csr_one_hop_same_type() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; @@ -185,7 +124,6 @@ async fn indexed_matches_csr_one_hop_same_type() { } #[tokio::test] -#[serial] async fn indexed_matches_csr_multi_hop_same_type() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; @@ -204,7 +142,6 @@ query reach($name: String) { } #[tokio::test] -#[serial] async fn indexed_matches_csr_cross_type() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; @@ -222,7 +159,6 @@ query employer($name: String) { } #[tokio::test] -#[serial] async fn indexed_matches_csr_no_match() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; @@ -232,7 +168,6 @@ async fn indexed_matches_csr_no_match() { } #[tokio::test] -#[serial] async fn indexed_finds_unindexed_appended_edge() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; @@ -251,9 +186,14 @@ async fn indexed_finds_unindexed_appended_edge() { .await .unwrap(); - set_mode("indexed"); - let got = sorted_names(&mut db, TEST_QUERIES, "friends_of", &params(&[("$name", "Alice")])).await; - clear_mode(); + let got = first_column_sorted( + &with_traversal_mode( + "indexed", + query_main(&mut db, TEST_QUERIES, "friends_of", &params(&[("$name", "Alice")])), + ) + .await + .unwrap(), + ); assert_eq!( got, @@ -273,7 +213,6 @@ async fn indexed_finds_unindexed_appended_edge() { // CSR path never produces. `both_modes` (csr == indexed == auto) plus the // golden assert catch both the divergence and an over-emitting shared bug. 
#[tokio::test] -#[serial] async fn cross_type_id_collision_does_not_bleed_into_second_hop() { const SCHEMA: &str = r#" node Person { name: String @key } @@ -327,7 +266,6 @@ query reach($name: String) { // bounded range deliberately: an unbounded `{1,}` is a typecheck error, not a // runtime path. `both_modes` also confirms indexed == csr on the cycle. #[tokio::test] -#[serial] async fn variable_hops_terminate_and_dedup_on_cycle() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); @@ -349,7 +287,6 @@ async fn variable_hops_terminate_and_dedup_on_cycle() { // A self-loop a->a plus a->b. Variable-length traversal must not loop forever and // must not re-emit the seeded source. #[tokio::test] -#[serial] async fn variable_hops_handle_self_loop() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); @@ -364,127 +301,3 @@ async fn variable_hops_handle_self_loop() { // a->a hits the seeded source (pruned); only b is reached. assert_eq!(got, vec!["b"]); } - -// ─── Topology-index build cost (A1 cross-branch reuse + A2 scoped build) ───── -// -// These force the CSR build path (the indexed path builds no topology) and read -// the `graph_build_count` / `graph_edges_built` probes. They live HERE — the -// all-serial binary with no non-serial env reader — because they mutate the -// process-global `OMNIGRAPH_TRAVERSAL_MODE`, which `query.rs` reads; in a mixed -// serial/non-serial binary a concurrent non-serial traversal would race the env -// write. The `ModeGuard` clears the override even on a panic. - -/// A1: a fresh (unwritten) branch reuses main's cached CSR topology index -/// (`graph_build_count == 0`), and the reused index returns correct results for -/// the branch. Before A1 the branch-keyed snapshot id forced a rebuild (count 1). 
-#[tokio::test] -#[serial] -async fn fresh_branch_traversal_reuses_main_graph_index() { - let dir = tempfile::tempdir().unwrap(); - let mut writer = init_and_load(&dir).await; - let uri = dir.path().to_str().unwrap(); - // A Knows edge on main so there is topology to build and then reuse. - mutate_main( - &mut writer, - MUTATION_QUERIES, - "insert_person_and_friend", - &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), - ) - .await - .unwrap(); - - // Separate reader handle. As in production, the reader never creates the - // branch, so creating it does not invalidate the reader's warm cache. - let reader = Omnigraph::open(uri).await.unwrap(); - - let _mode = ModeGuard::set("csr"); - // Reader warms main on the CSR path: builds and caches the topology index. - let warm = reader - .query( - ReadTarget::branch("main"), - TEST_QUERIES, - "friends_of", - &params(&[("$name", "Walker")]), - ) - .await - .unwrap(); - assert_eq!(column0(&warm), vec!["Alice"], "test setup: main has the Knows edge"); - - // A separate writer creates the branch (lazy fork: feature's edge tables are - // physically main's — same version + e_tag, table_branch=None). 
- writer.branch_create("feature").await.unwrap(); - - let graph_build = Arc::new(AtomicU64::new(0)); - let probes_in = QueryIoProbes { - graph_build_count: Arc::clone(&graph_build), - ..Default::default() - }; - let on_branch = with_query_io_probes( - probes_in, - reader.query( - ReadTarget::branch("feature"), - TEST_QUERIES, - "friends_of", - &params(&[("$name", "Walker")]), - ), - ) - .await - .unwrap(); - - assert_eq!( - column0(&on_branch), - vec!["Alice"], - "fresh branch sees main's edges (lazy fork) and the reused index is correct" - ); - assert_eq!( - graph_build.load(Ordering::Relaxed), - 0, - "a fresh branch with unchanged edges must reuse main's cached CSR index, not rebuild it" - ); -} - -/// A2: a query referencing one edge type builds the topology for only that edge, -/// not every edge in the catalog. Forces CSR (the build path) and counts edge -/// tables built. Before A2 the build materialized all catalog edges (the fixture -/// defines Knows + WorksAt, so a build-all touches >= 2) — the cold-build cost. -#[tokio::test] -#[serial] -async fn single_edge_query_builds_only_referenced_edge() { - let dir = tempfile::tempdir().unwrap(); - let mut db = init_and_load(&dir).await; - // A Knows edge so the referenced build has topology; the fixture also defines - // WorksAt, so a build-all would touch more than one edge. 
- mutate_main( - &mut db, - MUTATION_QUERIES, - "insert_person_and_friend", - &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), - ) - .await - .unwrap(); - - let _mode = ModeGuard::set("csr"); - let graph_edges = Arc::new(AtomicU64::new(0)); - let probes_in = QueryIoProbes { - graph_edges_built: Arc::clone(&graph_edges), - ..Default::default() - }; - let result = with_query_io_probes( - probes_in, - db.query( - ReadTarget::branch("main"), - TEST_QUERIES, - "friends_of", - &params(&[("$name", "Walker")]), - ), - ) - .await - .unwrap(); - - assert_eq!(column0(&result), vec!["Alice"]); - assert_eq!( - graph_edges.load(Ordering::Relaxed), - 1, - "a query referencing only `knows` must build only that edge, not all catalog edges" - ); -} diff --git a/crates/omnigraph/tests/warm_read_cost.rs b/crates/omnigraph/tests/warm_read_cost.rs index 4a34ef5f..cf0724f7 100644 --- a/crates/omnigraph/tests/warm_read_cost.rs +++ b/crates/omnigraph/tests/warm_read_cost.rs @@ -7,34 +7,18 @@ mod helpers; -use arrow_array::{Array, StringArray}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + use omnigraph::db::{Omnigraph, ReadTarget}; -use omnigraph_compiler::result::QueryResult; +use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes, with_traversal_mode}; use helpers::cost::{cost_harness, measure}; use helpers::{ - MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params, - mutate_branch, mutate_main, params, + MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, first_column_sorted, init_and_load, + mixed_params, mutate_branch, mutate_main, params, }; -fn first_column_strings(result: &QueryResult) -> Vec<String> { - if result.num_rows() == 0 { - return Vec::new(); - } - let batch = result.concat_batches().unwrap(); - let values = batch - .column(0) - .as_any() - .downcast_ref::<StringArray>() - .unwrap(); - let mut out = (0..values.len()) - .filter(|&row| !values.is_null(row)) - .map(|row| 
values.value(row).to_string()) - .collect::>(); - out.sort(); - out -} - /// A warm same-branch read must do ZERO `__manifest` object-store reads and must /// not open the commit graph, even at commit-history depth. Wrapped in /// `cost_harness`, so `manifest_reads` is ground truth: the warm-coordinator @@ -498,7 +482,7 @@ async fn recreated_branch_traversal_uses_graph_index_incarnation() { ) .await .unwrap(); - assert_eq!(first_column_strings(&old_friends), vec!["Alice"]); + assert_eq!(first_column_sorted(&old_friends), vec!["Alice"]); let old_edge_entry = reader .snapshot_of(ReadTarget::branch("feature")) .await @@ -545,7 +529,7 @@ async fn recreated_branch_traversal_uses_graph_index_incarnation() { .await; let new_friends = new_friends.unwrap(); assert_eq!( - first_column_strings(&new_friends), + first_column_sorted(&new_friends), vec!["Bob"], "traversal must use the recreated branch's topology, not stale cached graph index" ); @@ -568,7 +552,7 @@ async fn recreated_branch_traversal_uses_graph_index_incarnation() { .await .unwrap(); assert_eq!( - first_column_strings(&stale_old_friends), + first_column_sorted(&stale_old_friends), Vec::::new(), "old branch topology must not leak after branch recreation" ); @@ -733,3 +717,134 @@ async fn write_invalidates_table_cache_for_changed_table() { "the post-write read observes the new row (no stale handle served)" ); } + +// ─── Topology-index build cost (A1 cross-branch reuse + A2 scoped build) ───── +// +// These force the CSR build path (the indexed path builds no topology) via the +// scoped `with_traversal_mode` seam — no process-global env, so they are safe in +// this mixed serial/non-serial binary and need no `#[serial]`. They read the +// `graph_build_count` / `graph_edges_built` probes off a directly-constructed +// `QueryIoProbes`. + +/// A1: a fresh (unwritten) branch reuses main's cached CSR topology index +/// (`graph_build_count == 0`), and the reused index returns correct results for +/// the branch. 
Before A1 the branch-keyed snapshot id forced a rebuild (count 1). +#[tokio::test] +async fn fresh_branch_traversal_reuses_main_graph_index() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + // A Knows edge on main so there is topology to build and then reuse. + mutate_main( + &mut writer, + MUTATION_QUERIES, + "insert_person_and_friend", + &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), + ) + .await + .unwrap(); + + // Separate reader handle. As in production, the reader never creates the + // branch, so creating it does not invalidate the reader's warm cache. + let reader = Omnigraph::open(uri).await.unwrap(); + + // Reader warms main on the CSR path: builds and caches the topology index. + let warm = with_traversal_mode( + "csr", + reader.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ), + ) + .await + .unwrap(); + assert_eq!( + first_column_sorted(&warm), + vec!["Alice"], + "test setup: main has the Knows edge" + ); + + // A separate writer creates the branch (lazy fork: feature's edge tables are + // physically main's — same version + e_tag, table_branch=None). 
+ writer.branch_create("feature").await.unwrap(); + + let graph_build = Arc::new(AtomicU64::new(0)); + let probes = QueryIoProbes { + graph_build_count: Arc::clone(&graph_build), + ..Default::default() + }; + let on_branch = with_traversal_mode( + "csr", + with_query_io_probes( + probes, + reader.query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ), + ), + ) + .await + .unwrap(); + + assert_eq!( + first_column_sorted(&on_branch), + vec!["Alice"], + "fresh branch sees main's edges (lazy fork) and the reused index is correct" + ); + assert_eq!( + graph_build.load(Ordering::Relaxed), + 0, + "a fresh branch with unchanged edges must reuse main's cached CSR index, not rebuild it" + ); +} + +/// A2: a query referencing one edge type builds the topology for only that edge, +/// not every edge in the catalog. Forces CSR (the build path) and counts edge +/// tables built. Before A2 the build materialized all catalog edges (the fixture +/// defines Knows + WorksAt, so a build-all touches >= 2) — the cold-build cost. +#[tokio::test] +async fn single_edge_query_builds_only_referenced_edge() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + // A Knows edge so the referenced build has topology; the fixture also defines + // WorksAt, so a build-all would touch more than one edge. 
+ mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person_and_friend", + &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), + ) + .await + .unwrap(); + + let graph_edges = Arc::new(AtomicU64::new(0)); + let probes = QueryIoProbes { + graph_edges_built: Arc::clone(&graph_edges), + ..Default::default() + }; + let result = with_traversal_mode( + "csr", + with_query_io_probes( + probes, + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ), + ), + ) + .await + .unwrap(); + + assert_eq!(first_column_sorted(&result), vec!["Alice"]); + assert_eq!( + graph_edges.load(Ordering::Relaxed), + 1, + "a query referencing only `knows` must build only that edge, not all catalog edges" + ); +} diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index 145e5b03..ebad595e 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -350,7 +350,10 @@ them explicit. a branch deleted and recreated at the same version with the same endpoints has the same key, so the incarnation distinction falls back to the same-branch manifest refresh clearing read caches (`invalidate_all`); production object - stores carry real e_tags, so the key alone distinguishes incarnations there. + stores carry real e_tags, so the key alone distinguishes incarnations there + (the e_tag-present cross-branch-reuse path is exercised in CI by + `s3_storage.rs::s3_fresh_branch_traversal_reuses_main_graph_index_with_etags` + against RustFS, which surfaces real ETags — local-FS tests cannot reach it). 
Known narrow gap (local FS only): a cold *cross-branch* resolve of a recreated branch (a long-lived reader bound to another branch) does not trigger that same-branch refresh, so an e_tag-less recreated branch can still reuse a diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 3a1642cc..be8f83fb 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -25,7 +25,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `staged_writes.rs` | TableStore staged-write primitives (`stage_append`, `stage_merge_insert`, `commit_staged`, `scan_with_staged`, `count_rows_with_staged`) — primitive-level only; engine code uses the in-memory `MutationStaging` accumulator instead | | `forbidden_apis.rs` | Defense-in-depth source-walk guard: engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) must not reach around the sealed storage trait to Lance inline-commit APIs, nor open datasets directly (`Dataset::open` / `DatasetBuilder::from_uri`/`from_namespace`) — reads route through `Snapshot::open` and the held-handle cache; `// forbidden-api-allow: ` sentinel exempts reviewed lines | | `lance_surface_guards.rs` | Pins the Lance API surfaces omnigraph depends on (named runtime + compile-only guards; see [lance.md](lance.md)) — the first smoke check on any Lance version bump; e.g. 
`compact_files_still_fails_on_blob_columns` turns red when the upstream blob-compaction fix lands | -| `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by per-edge-table e_tag in the graph-index cache key or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). See "Cost-budget tests" below. (The CSR topology-build cost guards — physical-identity cache key + scoped build — live in `traversal_indexed.rs`, since they force `OMNIGRAPH_TRAVERSAL_MODE` and must run in an all-serial binary.) 
| +| `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by per-edge-table e_tag in the graph-index cache key or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). Also the CSR topology-build cost guards: `fresh_branch_traversal_reuses_main_graph_index` (A1 — a lazy-fork branch reuses main's cached CSR index, 0 rebuilds via `graph_build_count`) and `single_edge_query_builds_only_referenced_edge` (A2 — a one-edge query builds only that edge via `graph_edges_built`); both force CSR via the scoped `with_traversal_mode` seam, so they need no `#[serial]`. See "Cost-budget tests" below. | | `write_cost.rs` | Cost-budget tests for the WRITE path (RFC-013), the latency twin of `warm_read_cost.rs` on the **shared `helpers::cost` harness** (`measure`/`IoCounts`/`assert_flat`/`local_graph`). 
Runs on **local FS**; gates the **internal-table** term (`__manifest` scans flat in commit-history depth, lineage rows included — `internal_table_scans_are_flat_in_history`, now **green every-PR** since RFC-013 step 2 brought the internal tables into `optimize`; the test compacts at each depth before measuring) plus green every-PR guards (single-insert `data_writes` bounded, a per-write read-op ceiling that fails the moment a round-trip is added, and a `measure_with_staged` fitness assert that a keyed insert routes through `stage_merge_insert` once with no `stage_append`/vector-index build). The **data-table opener** term is S3-only — see `write_cost_s3.rs` and the backend-split note in "Cost-budget tests" below | | `helpers/cost.rs` | The shared cost-budget harness (not a test): `IoCounts`/`StagedCounts` (counts by table class), `measure`/`measure_with_staged` (the one place the `with_query_io_probes` + `MergeWriteProbes` task-local + `IOTracker` wiring lives; reads per-op deltas via lance's `incremental_stats()`, the upstream per-request idiom from `rust/lance/src/dataset/tests/dataset_io.rs`), `cost_harness`/`GraphIoMeter` (installs ONE `__manifest` `IOTracker` for a whole test body so the graph opens **under** it and `manifest_reads` is **ground truth** — every read regardless of handle age, the warm-coordinator freshness probe included — closing the blind spot where a per-op tracker installed at measure time cannot see a long-lived handle's reads; outside `cost_harness`, `measure` falls back to fresh per-op tracking, so `write_cost_s3.rs` is unaffected), `last_manifest_reads()` (the manifest read log for `assert_io_eq!`-style failure diagnostics), `assert_flat(curve, select, slack, what)`, and store-agnostic `local_graph`/`s3_graph` fixtures. 
`warm_read_cost.rs`, `write_cost.rs`, and `write_cost_s3.rs` all consume it so a cost test body is written once and reads in one vocabulary | | `lifecycle.rs` | Graph lifecycle, schema state | @@ -35,13 +35,13 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `schema_apply.rs` | Migration plan + apply, schema-apply lock; index materialization deferred to the reconciler (iss-848): `apply_schema_defers_vector_index_on_empty_table` (an empty-table Vector `@index` never aborts the apply) and `index_only_constraint_apply_touches_no_table_data` (adding an `@index` is metadata-only — no table-version bump) | | `search.rs` | FTS / vector / hybrid (`bm25`, `nearest`, `rrf`) | | `traversal.rs` | `Expand`, variable-length hops, anti-join (CSR path — `OMNIGRAPH_TRAVERSAL_MODE` unset) | -| `traversal_indexed.rs` | BTREE-indexed Expand (`execute_expand_indexed`) forced via `OMNIGRAPH_TRAVERSAL_MODE`, asserted semantically equal to the CSR path; own binary, all `#[serial]` so env writes never race (the `ModeGuard` RAII helper clears the override even on a panic). Also home to the CSR topology-build cost guards (which force the same env): `fresh_branch_traversal_reuses_main_graph_index` (A1 — a lazy-fork branch reuses main's cached CSR index, 0 rebuilds via the `graph_build_count` probe) and `single_edge_query_builds_only_referenced_edge` (A2 — a one-edge query builds only that edge via the `graph_edges_built` probe). | +| `traversal_indexed.rs` | BTREE-indexed Expand (`execute_expand_indexed`) forced via the scoped `with_traversal_mode` seam (not the env var), asserted semantically equal to the CSR path. No `#[serial]` needed — the seam is scope-bound and process-safe. (The CSR topology-build cost guards — `fresh_branch_traversal_reuses_main_graph_index` (A1, `graph_build_count`) and `single_edge_query_builds_only_referenced_edge` (A2, `graph_edges_built`) — live in `warm_read_cost.rs`.) 
| | `proptest_equivalence.rs` | Property-based query-correctness invariants over generated graphs (shared key alphabet forces cross-type id collisions, cycles, self-loops) — pins Expand-mode equivalence so a future fork divergence fails loudly instead of silently; `#[serial]` | | `ordering.rs` | ORDER BY contract: descending, multi-key precedence, deterministic key-column tie-break (total order, so `ORDER … LIMIT` is deterministic), NULL placement (`nulls_first = !descending`) | | `literal_filters.rs` | Execution goldens for non-string/non-integer scalar literal filters (F64/F32/Bool/Date/DateTime) across both the in-memory comparison arm and the Lance-pushdown arm | | `aggregation.rs` | `count`, `sum`, `avg`, `min`, `max` | | `export.rs` | NDJSON streaming export filters | -| `s3_storage.rs` | S3-backed graph (skipped unless `OMNIGRAPH_S3_TEST_BUCKET` is set) | +| `s3_storage.rs` | S3-backed graph (skipped unless `OMNIGRAPH_S3_TEST_BUCKET` is set). Includes `s3_fresh_branch_traversal_reuses_main_graph_index_with_etags` — the CSR topology cache-key test on a **real** per-table e_tag (`None` on local FS, so `warm_read_cost.rs` can't reach this path); forces CSR via the scoped `with_traversal_mode` seam | | `lance_version_columns.rs` | Per-row `_row_last_updated_at_version` behavior | | `validators.rs` | Schema constraint enforcement (enum, range, unique, cardinality) across JSONL, insert, update paths | | `policy_engine_chassis.rs` | Engine-layer Cedar enforcement (MR-722): allow + deny through every `_as` writer via the SDK directly — no HTTP — proving embedded and CLI callers hit the same gate as the server, with action × scope shapes matching `authorize_request` | @@ -75,7 +75,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav CI runs these S3-backed tests against a containerized RustFS server (`.github/workflows/ci.yml` → `rustfs_integration` job): -- `cargo test -p omnigraph-engine --test s3_storage` +- `cargo test -p 
omnigraph-engine --test s3_storage` (lifecycle/branching + the e_tag-present CSR topology cache-key reuse test — the path local FS can't reach since its e_tag is `None`) - `cargo test -p omnigraph-engine --test write_cost_s3` (RFC-013 step 3a's data-table opener cost gate — flat across commit depth on S3; the term local FS can't reproduce) - `cargo test -p omnigraph-server --test s3` (single-graph serving + config-free `--cluster s3://` boot) - `cargo test -p omnigraph-cluster --test s3_cluster` (full control-plane lifecycle on the bucket)