diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1e9249f9..f5462aeb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -90,6 +90,7 @@ jobs: crates/omnigraph/src/db/manifest.rs|crates/omnigraph/src/db/manifest/*) run_rustfs_ci=true ;; crates/omnigraph/tests/s3_storage.rs|crates/omnigraph/tests/write_cost_s3.rs|crates/omnigraph/tests/helpers/*) run_rustfs_ci=true ;; crates/omnigraph/src/table_store.rs|crates/omnigraph/src/instrumentation.rs) run_rustfs_ci=true ;; + crates/omnigraph/src/runtime_cache.rs|crates/omnigraph/src/graph_index/*) run_rustfs_ci=true ;; crates/omnigraph-cluster/src/store.rs|crates/omnigraph-cluster/src/serve.rs) run_rustfs_ci=true ;; crates/omnigraph-cluster/tests/s3_cluster.rs) run_rustfs_ci=true ;; crates/omnigraph-server/tests/s3.rs|crates/omnigraph-server/tests/support/*) run_rustfs_ci=true ;; diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 92025d85..1e6be426 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -1296,8 +1296,9 @@ impl Omnigraph { pub(crate) async fn graph_index_for_resolved( &self, resolved: &ResolvedTarget, + edge_types: &std::collections::HashMap, ) -> Result> { - table_ops::graph_index_for_resolved(self, resolved).await + table_ops::graph_index_for_resolved(self, resolved, edge_types).await } /// Ensure BTree scalar indices exist on key columns. 
diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index f28a1767..26146728 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -10,15 +10,22 @@ pub(super) async fn graph_index(db: &Omnigraph) -> Result = catalog + .edge_types + .iter() + .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone()))) + .collect(); + db.runtime_cache.graph_index(&resolved, &edge_types).await } pub(super) async fn graph_index_for_resolved( db: &Omnigraph, resolved: &ResolvedTarget, + edge_types: &std::collections::HashMap, ) -> Result> { - let catalog = db.catalog(); - db.runtime_cache.graph_index(resolved, &catalog).await + db.runtime_cache.graph_index(resolved, edge_types).await } pub(super) async fn ensure_indices(db: &Omnigraph) -> Result> { diff --git a/crates/omnigraph/src/exec/query.rs b/crates/omnigraph/src/exec/query.rs index 23e14349..b1a834f4 100644 --- a/crates/omnigraph/src/exec/query.rs +++ b/crates/omnigraph/src/exec/query.rs @@ -50,7 +50,11 @@ impl Omnigraph { .any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. })); // Lazy: an index-served query with no AntiJoin never builds the CSR. let graph_index = if needs_graph { - GraphIndexHandle::cached(self, &resolved) + GraphIndexHandle::cached( + self, + &resolved, + referenced_edge_types(&ir.pipeline, &catalog), + ) } else { GraphIndexHandle::none() }; @@ -95,14 +99,9 @@ impl Omnigraph { .any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. })); // Lazy build against this historical snapshot (not the RuntimeCache, // which is keyed to live branch targets); only a CSR-path Expand or an - // AntiJoin triggers it. + // AntiJoin triggers it. Scoped to the edges this query traverses. 
let graph_index = if needs_graph { - let edge_types = catalog - .edge_types - .iter() - .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone()))) - .collect(); - GraphIndexHandle::direct(&snapshot, edge_types) + GraphIndexHandle::direct(&snapshot, referenced_edge_types(&ir.pipeline, &catalog)) } else { GraphIndexHandle::none() }; @@ -762,6 +761,51 @@ fn execute_pipeline<'a>( }) } +/// The edge types a query's pipeline actually traverses, mapped to their +/// `(from_type, to_type)` endpoints. Recurses through `AntiJoin` inner pipelines +/// (whose bulk fast path consumes the CSR for the inner `Expand`'s edge). The +/// CSR build is scoped to exactly this set instead of every edge type in the +/// catalog — otherwise a single-edge join (`$x identifiesPerson $p`) that lands +/// on the CSR path would scan the whole graph's edge data (every message, +/// relationship, … table), the cause of the cross-edge-join hang. Empty when the +/// only traversal is an `AntiJoin` with no inner `Expand` — that shape never asks +/// the handle for an index, so an empty build is never realized. +fn referenced_edge_types( + pipeline: &[IROp], + catalog: &Catalog, +) -> HashMap { + let mut names = std::collections::BTreeSet::new(); + collect_referenced_edge_names(pipeline, &mut names); + names + .into_iter() + .filter_map(|name| { + catalog + .edge_types + .get(&name) + .map(|et| (name, (et.from_type.clone(), et.to_type.clone()))) + }) + .collect() +} + +fn collect_referenced_edge_names( + pipeline: &[IROp], + out: &mut std::collections::BTreeSet, +) { + for op in pipeline { + match op { + IROp::Expand { edge_type, .. } => { + out.insert(edge_type.clone()); + } + IROp::AntiJoin { inner, .. 
} => collect_referenced_edge_names(inner, out), + // Exhaustive on purpose (no `_` arm): a new edge-referencing IROp must + // force a compile error here rather than silently under-scope the CSR + // build — an omitted edge would fail at runtime with "no adjacency + // index for edge". The non-traversal ops reference no edges. + IROp::NodeScan { .. } | IROp::Filter(_) => {} + } + } +} + /// Lazily provides the in-memory CSR graph index, building it on first use and /// memoizing for the rest of the query. Indexed-mode Expand never asks for it, /// so a query that is entirely index-served and has no AntiJoin never pays the @@ -776,7 +820,11 @@ pub struct GraphIndexHandle<'a> { enum GraphIndexBuilder<'a> { None, - Cached(&'a Omnigraph, &'a crate::db::ResolvedTarget), + Cached( + &'a Omnigraph, + &'a crate::db::ResolvedTarget, + HashMap, + ), Direct(&'a Snapshot, HashMap), } @@ -788,10 +836,14 @@ impl<'a> GraphIndexHandle<'a> { } } - fn cached(db: &'a Omnigraph, resolved: &'a crate::db::ResolvedTarget) -> Self { + fn cached( + db: &'a Omnigraph, + resolved: &'a crate::db::ResolvedTarget, + edge_types: HashMap, + ) -> Self { Self { cell: tokio::sync::OnceCell::new(), - builder: GraphIndexBuilder::Cached(db, resolved), + builder: GraphIndexBuilder::Cached(db, resolved, edge_types), } } @@ -810,8 +862,8 @@ impl<'a> GraphIndexHandle<'a> { .get_or_try_init(|| async { match &self.builder { GraphIndexBuilder::None => Ok::>, OmniError>(None), - GraphIndexBuilder::Cached(db, resolved) => { - Ok(Some(db.graph_index_for_resolved(resolved).await?)) + GraphIndexBuilder::Cached(db, resolved, edge_types) => { + Ok(Some(db.graph_index_for_resolved(resolved, edge_types).await?)) } GraphIndexBuilder::Direct(snapshot, edge_types) => { Ok(Some(Arc::new(GraphIndex::build(snapshot, edge_types).await?))) @@ -834,7 +886,12 @@ impl<'a> GraphIndexHandle<'a> { /// forces the path (ops escape hatch + test hook). 
Both modes are semantically /// identical, so the override only changes which path runs, never the result. fn traversal_indexed_override() -> Option { - match std::env::var("OMNIGRAPH_TRAVERSAL_MODE").ok().as_deref() { + // The scoped test seam (`with_traversal_mode`) takes precedence over the + // process-global `OMNIGRAPH_TRAVERSAL_MODE` ops escape hatch. + let mode = crate::instrumentation::traversal_mode_override() + .map(str::to_string) + .or_else(|| std::env::var("OMNIGRAPH_TRAVERSAL_MODE").ok()); + match mode.as_deref() { Some("indexed") => Some(true), Some("csr") => Some(false), _ => None, @@ -2460,6 +2517,107 @@ mod expand_chooser_tests { } } +#[cfg(test)] +mod referenced_edge_types_tests { + use super::*; + + fn node_scan(var: &str, ty: &str) -> IROp { + IROp::NodeScan { + variable: var.to_string(), + type_name: ty.to_string(), + filters: Vec::new(), + } + } + + fn expand(edge: &str) -> IROp { + IROp::Expand { + src_var: "a".into(), + dst_var: "b".into(), + edge_type: edge.to_string(), + direction: Direction::Out, + dst_type: "X".into(), + min_hops: 1, + max_hops: Some(1), + dst_filters: Vec::new(), + } + } + + fn names(pipeline: &[IROp]) -> Vec { + let mut set = std::collections::BTreeSet::new(); + collect_referenced_edge_names(pipeline, &mut set); + set.into_iter().collect() + } + + #[test] + fn collects_a_single_expand_edge() { + assert_eq!( + names(&[node_scan("x", "ExternalID"), expand("identifiesPerson")]), + vec!["identifiesPerson".to_string()] + ); + } + + #[test] + fn ignores_non_traversal_ops_and_dedups() { + // A pipeline that touches one edge twice references exactly that one edge — + // never the whole catalog (the cross-edge-join hang this scoping fixes). 
+ let pipeline = vec![ + node_scan("x", "ExternalID"), + expand("identifiesPerson"), + IROp::Filter(IRFilter { + left: IRExpr::PropAccess { + variable: "p".into(), + property: "name".into(), + }, + op: omnigraph_compiler::query::ast::CompOp::Eq, + right: IRExpr::Literal(Literal::String("a".into())), + }), + expand("identifiesPerson"), + ]; + assert_eq!(names(&pipeline), vec!["identifiesPerson".to_string()]); + } + + #[test] + fn recurses_through_anti_join_inner_pipeline() { + // The bulk anti-join fast path consumes the CSR for the inner Expand's + // edge, so its edge type must be in scope even though it is nested. + let pipeline = vec![ + node_scan("p", "Person"), + expand("knows"), + IROp::AntiJoin { + outer_var: "p".into(), + inner: vec![expand("worksAt")], + }, + ]; + assert_eq!( + names(&pipeline), + vec!["knows".to_string(), "worksAt".to_string()] + ); + } + + #[test] + fn recurses_through_nested_anti_joins() { + let pipeline = vec![IROp::AntiJoin { + outer_var: "p".into(), + inner: vec![IROp::AntiJoin { + outer_var: "c".into(), + inner: vec![expand("deepEdge")], + }], + }]; + assert_eq!(names(&pipeline), vec!["deepEdge".to_string()]); + } + + #[test] + fn anti_join_with_no_inner_expand_references_no_edges() { + // A predicate-only anti-join never asks the handle for an index, so the + // empty set is correct — no whole-graph build is realized. 
+ let pipeline = vec![IROp::AntiJoin { + outer_var: "p".into(), + inner: vec![node_scan("c", "Company")], + }]; + assert!(names(&pipeline).is_empty()); + } +} + #[cfg(test)] mod literal_lowering_tests { use super::*; diff --git a/crates/omnigraph/src/graph_index/mod.rs b/crates/omnigraph/src/graph_index/mod.rs index ae3173a2..56831dda 100644 --- a/crates/omnigraph/src/graph_index/mod.rs +++ b/crates/omnigraph/src/graph_index/mod.rs @@ -113,6 +113,12 @@ impl GraphIndex { snapshot: &Snapshot, edge_types: &HashMap, // edge_name → (from_type, to_type) ) -> Result { + // INVARIANT (A1 graph-index cache key): the topology is a pure function of + // the edge tables' `src`/`dst` columns and nothing else. `RuntimeCache` + // keys `GraphIndexCacheKey` on each edge table's physical identity + // `(table_key, version, table_branch, e_tag)` so a lazy-fork branch reuses + // main's built index. If you read node tables, schema, or other state here, + // add it to that key or the cache will serve a stale index. let mut type_indices: HashMap = HashMap::new(); let mut csr = HashMap::new(); let mut csc = HashMap::new(); diff --git a/crates/omnigraph/src/instrumentation.rs b/crates/omnigraph/src/instrumentation.rs index 011c787c..861592ec 100644 --- a/crates/omnigraph/src/instrumentation.rs +++ b/crates/omnigraph/src/instrumentation.rs @@ -59,6 +59,14 @@ pub struct QueryIoProbes { /// Internal/system-table (`__manifest`) open CALLS — the complement of /// `data_open_count`, kept for symmetry and debugging. pub internal_open_count: Arc, + /// Counts topology-index builds (the `RuntimeCache::graph_index` cache-miss + /// path). A cost test asserts a fresh branch whose edge tables are unchanged + /// from main reuses main's cached index (0 builds) rather than rebuilding it. + pub graph_build_count: Arc, + /// Edge tables included in topology builds this query (summed over build + /// invocations). 
A cost test asserts a query referencing one edge builds only + /// that edge, not every catalog edge (the cold-build shrink A2 ships). + pub graph_edges_built: Arc, } tokio::task_local! { @@ -78,6 +86,32 @@ fn current(f: impl FnOnce(&QueryIoProbes) -> R) -> Option { QUERY_IO_PROBES.try_with(f).ok() } +tokio::task_local! { + static TRAVERSAL_MODE_OVERRIDE: Option<&'static str>; +} + +/// Force the Expand execution mode (`"indexed"` | `"csr"`) for the scope of `fut` +/// WITHOUT mutating the process-global `OMNIGRAPH_TRAVERSAL_MODE` env var. This is +/// the general traversal-mode test seam: scope-bound (so it cannot leak — the +/// override is gone when `fut` resolves or unwinds) and process-safe (it never +/// touches shared state, so a forced-mode test never affects a concurrent test in +/// the same binary, removing the need for `#[serial]` + a dedicated all-serial +/// binary). Mirrors [`with_query_io_probes`]. The env var stays the production/ops +/// escape hatch; this scoped override takes precedence over it +/// (`exec::query::traversal_indexed_override`). +pub async fn with_traversal_mode(mode: &'static str, fut: F) -> F::Output +where + F: std::future::Future, +{ + TRAVERSAL_MODE_OVERRIDE.scope(Some(mode), fut).await +} + +/// The scoped traversal-mode override active for this task, if any. `None` in +/// production (no scope installed), so the env var is consulted instead. +pub(crate) fn traversal_mode_override() -> Option<&'static str> { + TRAVERSAL_MODE_OVERRIDE.try_with(|m| *m).ok().flatten() +} + pub(crate) fn manifest_wrapper() -> Option> { current(|p| p.manifest_wrapper.clone()).flatten() } @@ -119,6 +153,16 @@ pub(crate) fn record_open(uri: &str) { }); } +/// Record one topology-index build over `edges` edge tables (the +/// `RuntimeCache::graph_index` cache-miss path). No-op when no probes are +/// installed (production). 
+pub(crate) fn record_graph_build(edges: usize) { + let _ = current(|p| { + p.graph_build_count.fetch_add(1, Ordering::Relaxed); + p.graph_edges_built.fetch_add(edges as u64, Ordering::Relaxed); + }); +} + /// Per-operation staged-write counts, installed for a task via /// [`with_merge_write_probes`]. Lets a cost-budget test assert WHICH staged-write /// primitive an operation invokes — e.g. that an append-only fast-forward merge diff --git a/crates/omnigraph/src/runtime_cache.rs b/crates/omnigraph/src/runtime_cache.rs index e85a90a1..7023a925 100644 --- a/crates/omnigraph/src/runtime_cache.rs +++ b/crates/omnigraph/src/runtime_cache.rs @@ -4,16 +4,20 @@ use std::sync::Arc; use lance::Dataset; use lance::session::Session; -use omnigraph_compiler::catalog::Catalog; use tokio::sync::Mutex; use crate::db::ResolvedTarget; use crate::error::Result; use crate::graph_index::GraphIndex; +/// Cache key for a built `GraphIndex`. Keyed (A1) by the physical identity of the +/// edge tables the topology is derived from, NOT by the resolved snapshot id. The +/// topology is a pure function of the edge tables' `src`/`dst`, so two snapshots +/// (e.g. main and a lazy-fork branch whose edge tables physically *are* main's) +/// with identical edge tables share one built index: a fresh branch reuses main's +/// instead of rebuilding it from a cold scan. #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct GraphIndexCacheKey { - snapshot_id: String, edge_tables: Vec, } @@ -22,6 +26,20 @@ struct GraphIndexTableState { table_key: String, table_version: u64, table_branch: Option, + /// Lance manifest incarnation token for this edge table version. Preserves the + /// incarnation distinction the dropped synthetic snapshot id used to carry: a + /// branch deleted and recreated at the same version number gets a new e_tag, so + /// the cache rebuilds instead of serving stale topology. 
`None` only on stores + /// without e_tags (local FS); there a same-branch manifest refresh clears the + /// cache as the fallback (the read-path gap in docs/dev/invariants.md). + e_tag: Option, + /// The edge's `(from_type, to_type)` endpoint names at build time. `GraphIndex` + /// keys its `TypeIndex`es by these, and `execute_expand_csr` looks them up by + /// the *current* catalog's endpoint names — so a schema change that repoints an + /// edge type while leaving the edge table's physical identity unchanged must + /// invalidate the entry (else the reused index has the old type-index namespace + /// and the new traversal fails with "no type index for "). + endpoints: (String, String), } #[derive(Debug, Default)] @@ -40,12 +58,18 @@ impl RuntimeCache { cache.entries.invalidate_all(); } + /// Build (or fetch) the CSR/CSC graph index scoped to exactly `edge_types` — + /// the edge types the query actually traverses, not every edge type in the + /// catalog. Scoping is what keeps a single-edge join (`$x identifiesPerson + /// $p`) from scanning the whole graph's edge data; the cache key carries the + /// scoped set, so a `{Knows}` index and a `{Knows, WorksAt}` index are + /// distinct entries and never serve each other. 
pub async fn graph_index( &self, resolved: &ResolvedTarget, - catalog: &Catalog, + edge_types: &HashMap, ) -> Result> { - let key = graph_index_cache_key(resolved, catalog); + let key = graph_index_cache_key(resolved, edge_types); { let mut cache = self.graph_indices.lock().await; if let Some(index) = cache.entries.get(&key).cloned() { @@ -53,13 +77,8 @@ impl RuntimeCache { } } - let edge_types = catalog - .edge_types - .iter() - .map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone()))) - .collect(); - - let index = Arc::new(GraphIndex::build(&resolved.snapshot, &edge_types).await?); + crate::instrumentation::record_graph_build(edge_types.len()); + let index = Arc::new(GraphIndex::build(&resolved.snapshot, edge_types).await?); let mut cache = self.graph_indices.lock().await; if let Some(existing) = cache.entries.get(&key).cloned() { return Ok(existing); @@ -151,11 +170,13 @@ impl Default for GraphIndexCache { } } -fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphIndexCacheKey { - let mut edge_tables: Vec = catalog - .edge_types - .keys() - .filter_map(|edge_name| { +fn graph_index_cache_key( + resolved: &ResolvedTarget, + edge_types: &HashMap, +) -> GraphIndexCacheKey { + let mut edge_tables: Vec = edge_types + .iter() + .filter_map(|(edge_name, endpoints)| { let table_key = format!("edge:{}", edge_name); resolved .snapshot @@ -164,15 +185,14 @@ fn graph_index_cache_key(resolved: &ResolvedTarget, catalog: &Catalog) -> GraphI table_key, table_version: entry.table_version, table_branch: entry.table_branch.clone(), + e_tag: entry.version_metadata.e_tag().map(str::to_string), + endpoints: endpoints.clone(), }) }) .collect(); edge_tables.sort_by(|a, b| a.table_key.cmp(&b.table_key)); - GraphIndexCacheKey { - snapshot_id: resolved.snapshot_id.as_str().to_string(), - edge_tables, - } + GraphIndexCacheKey { edge_tables } } /// Max held `Dataset` handles. 
A handle holds only Arcs (object store + manifest), @@ -290,9 +310,16 @@ mod tests { use super::*; fn key(id: usize) -> GraphIndexCacheKey { + // Distinct keys via a distinct edge table per id (the key no longer carries + // a snapshot id — it is the physical edge-table identity set, A1). GraphIndexCacheKey { - snapshot_id: format!("snap-{id}"), - edge_tables: Vec::new(), + edge_tables: vec![GraphIndexTableState { + table_key: format!("edge:t{id}"), + table_version: 1, + table_branch: None, + e_tag: None, + endpoints: ("A".to_string(), "B".to_string()), + }], } } @@ -300,6 +327,34 @@ mod tests { Arc::new(GraphIndex::empty_for_test()) } + /// An edge table at the same physical identity but a different `(from_type, + /// to_type)` endpoint mapping (a schema repoint) must NOT share a cache entry + /// — the built index's `TypeIndex` namespace is keyed by those endpoints. + #[test] + fn endpoint_remap_at_same_physical_identity_splits_cache_key() { + let base = GraphIndexTableState { + table_key: "edge:Knows".to_string(), + table_version: 7, + table_branch: None, + e_tag: Some("etag".to_string()), + endpoints: ("Person".to_string(), "Person".to_string()), + }; + let repointed = GraphIndexTableState { + endpoints: ("Person".to_string(), "Account".to_string()), + ..base.clone() + }; + let k_old = GraphIndexCacheKey { + edge_tables: vec![base], + }; + let k_new = GraphIndexCacheKey { + edge_tables: vec![repointed], + }; + assert_ne!( + k_old, k_new, + "a schema endpoint remap must produce a distinct graph-index cache key" + ); + } + #[test] fn graph_index_cache_evicts_oldest_entry() { let mut cache = GraphIndexCache::default(); diff --git a/crates/omnigraph/tests/helpers/cost.rs b/crates/omnigraph/tests/helpers/cost.rs index 37ac9e8e..05efe080 100644 --- a/crates/omnigraph/tests/helpers/cost.rs +++ b/crates/omnigraph/tests/helpers/cost.rs @@ -309,6 +309,8 @@ impl OpProbes { probe_count: Arc::clone(&h.probe_count), data_open_count: Arc::clone(&h.data_open_count), 
internal_open_count: Arc::clone(&h.internal_open_count), + // graph_build_count / graph_edges_built unused by this harness. + ..Default::default() }; (probes, h) } diff --git a/crates/omnigraph/tests/helpers/mod.rs b/crates/omnigraph/tests/helpers/mod.rs index 13127f28..597a104a 100644 --- a/crates/omnigraph/tests/helpers/mod.rs +++ b/crates/omnigraph/tests/helpers/mod.rs @@ -110,6 +110,26 @@ pub async fn count_rows_branch(db: &Omnigraph, branch: &str, table_key: &str) -> ds.count_rows(None).await.unwrap() } +/// First result column as sorted strings — the shared shape the traversal / +/// cost tests use to compare a query's returned names. Empty for a 0-row result. +pub fn first_column_sorted(result: &QueryResult) -> Vec { + if result.num_rows() == 0 { + return Vec::new(); + } + let batch = result.concat_batches().unwrap(); + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let mut v: Vec = (0..col.len()) + .filter(|&i| !col.is_null(i)) + .map(|i| col.value(i).to_string()) + .collect(); + v.sort(); + v +} + /// Collect all string values from a named column across batches. pub fn collect_column_strings(batches: &[RecordBatch], col: &str) -> Vec { let mut out = Vec::new(); diff --git a/crates/omnigraph/tests/proptest_equivalence.rs b/crates/omnigraph/tests/proptest_equivalence.rs index 3423a2fa..3b4011fa 100644 --- a/crates/omnigraph/tests/proptest_equivalence.rs +++ b/crates/omnigraph/tests/proptest_equivalence.rs @@ -9,10 +9,10 @@ //! third ExpandMode, the anti-join fast/slow fork) fail loudly instead of //! silently. //! -//! Each test is a sync `#[test]` + `#[serial]`: it builds its own runtime and -//! `block_on`s per generated case (proptest closures are sync), and the -//! mode-equivalence test writes `OMNIGRAPH_TRAVERSAL_MODE`, so serial execution -//! keeps env writes from racing other tests in this binary. +//! Each test is a sync `#[test]`: it builds its own runtime and `block_on`s per +//! 
generated case (proptest closures are sync). The mode-equivalence test forces +//! the Expand mode via the scoped `with_traversal_mode` seam — no env mutation, so +//! no `#[serial]` and no leak across shrink/cases. mod helpers; @@ -21,9 +21,9 @@ use std::collections::HashSet; use arrow_array::{Array, StringArray}; use proptest::prelude::*; use proptest::test_runner::{Config, TestRunner}; -use serial_test::serial; use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::instrumentation::with_traversal_mode; use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph_compiler::ir::ParamMap; use omnigraph_compiler::query::ast::Literal; @@ -138,27 +138,6 @@ fn config() -> Config { } } -fn clear_mode() { - unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") }; -} - -/// RAII guard that sets `OMNIGRAPH_TRAVERSAL_MODE` and clears it on drop — so a -/// panic mid-case (e.g. a query `unwrap`) cannot leak the forced mode into -/// proptest's subsequent shrink/cases and mask the divergence under test. SAFE: -/// every test in this binary is `#[serial]`, so no thread reads the env during -/// the write. -struct ModeGuard; -impl ModeGuard { - fn set(mode: &str) -> Self { - unsafe { std::env::set_var("OMNIGRAPH_TRAVERSAL_MODE", mode) }; - ModeGuard - } -} -impl Drop for ModeGuard { - fn drop(&mut self) { - unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") }; - } -} async fn load_graph(graph: &GenGraph) -> (tempfile::TempDir, Omnigraph) { let dir = tempfile::tempdir().unwrap(); @@ -203,7 +182,6 @@ async fn col0_set(db: &mut Omnigraph, name: &str, params: &ParamMap) -> HashSet< // one (worksAt{1,2}, collision-prone). This is the search-over-the-class version // of the hand-built cross-type-collision fixture. 
#[test] -#[serial] fn prop_expand_indexed_eq_csr() { let rt = tokio::runtime::Runtime::new().unwrap(); let mut runner = TestRunner::new(config()); @@ -214,17 +192,12 @@ fn prop_expand_indexed_eq_csr() { for start in graph.persons.clone() { let p = one_param(&start); for q in ["friends", "employers"] { - // Each guard clears the mode on drop (end of the block, - // or on panic), so a forced mode never leaks across runs. - let csr = { - let _g = ModeGuard::set("csr"); - col0_sorted(&mut db, q, &p).await - }; - let indexed = { - let _g = ModeGuard::set("indexed"); - col0_sorted(&mut db, q, &p).await - }; - // No guard → env unset → auto (cost-based) path. + // The seam is scope-bound: the forced mode is gone when the + // wrapped future resolves, so it never leaks across runs. + let csr = with_traversal_mode("csr", col0_sorted(&mut db, q, &p)).await; + let indexed = + with_traversal_mode("indexed", col0_sorted(&mut db, q, &p)).await; + // No override → auto (cost-based) path. let auto = col0_sorted(&mut db, q, &p).await; if csr != indexed || csr != auto { return Some((start, q, csr, indexed, auto)); @@ -247,9 +220,7 @@ fn prop_expand_indexed_eq_csr() { // destination type's loaded key set — independent of the two-mode comparison, so // it catches over-emission even if both modes are wrong identically. #[test] -#[serial] fn prop_results_subset_of_existing_nodes() { - clear_mode(); let rt = tokio::runtime::Runtime::new().unwrap(); let mut runner = TestRunner::new(config()); runner @@ -282,9 +253,7 @@ fn prop_results_subset_of_existing_nodes() { // INVARIANT 3: anti-join complement. `not { $p worksAt $_ }` and its complement // (persons WITH a worksAt) must be disjoint and together cover all persons. 
#[test] -#[serial] fn prop_antijoin_partitions_persons() { - clear_mode(); let rt = tokio::runtime::Runtime::new().unwrap(); let mut runner = TestRunner::new(config()); runner diff --git a/crates/omnigraph/tests/s3_storage.rs b/crates/omnigraph/tests/s3_storage.rs index 3814600e..eb546f81 100644 --- a/crates/omnigraph/tests/s3_storage.rs +++ b/crates/omnigraph/tests/s3_storage.rs @@ -1,7 +1,11 @@ mod helpers; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + use omnigraph::db::MergeOutcome; -use omnigraph::db::Omnigraph; +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes, with_traversal_mode}; use omnigraph::loader::{LoadMode, load_jsonl}; use helpers::*; @@ -244,3 +248,82 @@ async fn s3_schema_apply_migrates_live_graph() { "live S3 schema must carry the migration" ); } + +/// Graph-index (CSR topology) cross-branch reuse on a real object store, where the +/// cache key's per-table `e_tag` is a genuine non-`None` token (Lance e_tag is +/// `None` on local FS, so the local twin in `warm_read_cost.rs` keys on `None` — +/// this exercises the e_tag-present path production runs). With e_tags present, a +/// fresh lazy-fork branch reuses main's cached index (`graph_build_count == 0`). +/// Forces CSR via the scoped `with_traversal_mode` seam (no env mutation, so no +/// interference with the other tests in this binary). +#[tokio::test(flavor = "multi_thread")] +async fn s3_fresh_branch_traversal_reuses_main_graph_index_with_etags() { + let Some(uri) = s3_test_graph_uri("graph-index-etag") else { + eprintln!("skipping s3 graph-index test: OMNIGRAPH_S3_TEST_BUCKET is not set"); + return; + }; + + let mut writer = Omnigraph::init(&uri, TEST_SCHEMA).await.unwrap(); + // TEST_DATA seeds Alice->Bob and Alice->Charlie Knows edges. 
+ load_jsonl(&mut writer, TEST_DATA, LoadMode::Overwrite) + .await + .unwrap(); + + // Separate reader: it never creates the branch, so branch_create below does + // not invalidate the reader's warm cache. + let reader = Omnigraph::open(&uri).await.unwrap(); + + // Warm main on the CSR path: builds + caches the topology index keyed by the + // edge table's physical identity incl. its real e_tag. + let warm = with_traversal_mode( + "csr", + reader.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "friends_of", + ¶ms(&[("$name", "Alice")]), + ), + ) + .await + .unwrap(); + assert_eq!( + first_column_sorted(&warm), + vec!["Bob", "Charlie"], + "test setup: Alice knows Bob and Charlie" + ); + + // Lazy fork: feature's edge tables are physically main's (same version + + // e_tag, table_branch = None). + writer.branch_create("feature").await.unwrap(); + + let graph_build = Arc::new(AtomicU64::new(0)); + let probes = QueryIoProbes { + graph_build_count: Arc::clone(&graph_build), + ..Default::default() + }; + let on_branch = with_traversal_mode( + "csr", + with_query_io_probes( + probes, + reader.query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "friends_of", + ¶ms(&[("$name", "Alice")]), + ), + ), + ) + .await + .unwrap(); + + assert_eq!( + first_column_sorted(&on_branch), + vec!["Bob", "Charlie"], + "fresh branch sees main's edges (lazy fork) and the reused index is correct" + ); + assert_eq!( + graph_build.load(Ordering::Relaxed), + 0, + "with real e_tags, a fresh lazy-fork branch must reuse main's cached CSR index, not rebuild" + ); +} diff --git a/crates/omnigraph/tests/traversal_indexed.rs b/crates/omnigraph/tests/traversal_indexed.rs index 2ceed85a..cadd3d23 100644 --- a/crates/omnigraph/tests/traversal_indexed.rs +++ b/crates/omnigraph/tests/traversal_indexed.rs @@ -1,63 +1,43 @@ //! BTREE-indexed Expand path (`execute_expand_indexed`) coverage. //! -//! These tests force the Expand execution mode via `OMNIGRAPH_TRAVERSAL_MODE` -//! 
and assert the indexed path matches the CSR path (both are semantically -//! identical — the indexed path just serves neighbor lookups from the persisted -//! src/dst BTREE instead of an in-memory CSR). They live in their own test -//! binary and are all `#[serial]`, so the env writes never race a concurrent -//! reader: within this process serial execution serializes every env read, and -//! other test binaries (e.g. `traversal.rs`) are separate processes whose env -//! stays unset (→ CSR), validating the shared hydrate/align tail on the CSR path. +//! These tests force the Expand execution mode via the scoped `with_traversal_mode` +//! test seam — NOT the process-global `OMNIGRAPH_TRAVERSAL_MODE` env var — and +//! assert the indexed path matches the CSR path (both are semantically identical: +//! the indexed path serves neighbor lookups from the persisted src/dst BTREE +//! instead of an in-memory CSR). The seam is scope-bound and process-safe, so +//! these tests need no `#[serial]` and no dedicated binary. mod helpers; -use arrow_array::{Array, StringArray}; - use omnigraph::db::Omnigraph; +use omnigraph::instrumentation::with_traversal_mode; use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph::table_store::{IndexCoverage, TableStore}; use omnigraph_compiler::ir::ParamMap; -use serial_test::serial; use helpers::*; -fn set_mode(mode: &str) { - // SAFE: every test here is #[serial] and this binary has no non-serial - // env reader, so no thread reads the environment during this write. - unsafe { std::env::set_var("OMNIGRAPH_TRAVERSAL_MODE", mode) }; -} - -fn clear_mode() { - unsafe { std::env::remove_var("OMNIGRAPH_TRAVERSAL_MODE") }; -} - -/// Run a name-returning query and return its first column, sorted. +/// Run `name` on main under the cost-chooser (auto) Expand mode; first column sorted. 
async fn sorted_names(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec { - let result = query_main(db, queries, name, params).await.unwrap(); - if result.num_rows() == 0 { - return Vec::new(); - } - let batch = result.concat_batches().unwrap(); - let col = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let mut v: Vec = (0..col.len()).map(|i| col.value(i).to_string()).collect(); - v.sort(); - v + first_column_sorted(&query_main(db, queries, name, params).await.unwrap()) } /// Run the same query under CSR, indexed, and auto (cost-chooser) modes; assert -/// all three produce identical results and return them. The auto pass exercises -/// `choose_expand_mode` end to end: whichever path it selects, the rows must -/// match the forced paths (the chooser changes which path runs, never the result). +/// all three produce identical results and return them. The forced modes use the +/// scoped `with_traversal_mode` seam; the auto pass exercises `choose_expand_mode` +/// end to end (whichever path it selects, the rows must match the forced paths — +/// the chooser changes which path runs, never the result). 
async fn both_modes(db: &mut Omnigraph, queries: &str, name: &str, params: &ParamMap) -> Vec { - set_mode("csr"); - let csr = sorted_names(db, queries, name, params).await; - set_mode("indexed"); - let indexed = sorted_names(db, queries, name, params).await; - clear_mode(); + let csr = first_column_sorted( + &with_traversal_mode("csr", query_main(db, queries, name, params)) + .await + .unwrap(), + ); + let indexed = first_column_sorted( + &with_traversal_mode("indexed", query_main(db, queries, name, params)) + .await + .unwrap(), + ); let auto = sorted_names(db, queries, name, params).await; assert_eq!( indexed, csr, @@ -72,7 +52,6 @@ async fn both_modes(db: &mut Omnigraph, queries: &str, name: &str, params: &Para // The C6 index-coverage guard: `key_column_index_coverage` must report whether // a `key_col IN (...)` scan will use the persisted BTREE or silently full-scan. -// Not #[serial] — it calls the helper directly and reads no env. #[tokio::test] async fn key_column_index_coverage_detects_btree_presence() { let dir = tempfile::tempdir().unwrap(); @@ -136,7 +115,6 @@ async fn coverage_degrades_for_appended_unindexed_fragment() { } #[tokio::test] -#[serial] async fn indexed_matches_csr_one_hop_same_type() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; @@ -146,7 +124,6 @@ async fn indexed_matches_csr_one_hop_same_type() { } #[tokio::test] -#[serial] async fn indexed_matches_csr_multi_hop_same_type() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; @@ -165,7 +142,6 @@ query reach($name: String) { } #[tokio::test] -#[serial] async fn indexed_matches_csr_cross_type() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; @@ -183,7 +159,6 @@ query employer($name: String) { } #[tokio::test] -#[serial] async fn indexed_matches_csr_no_match() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; @@ -193,7 +168,6 @@ async fn 
indexed_matches_csr_no_match() { } #[tokio::test] -#[serial] async fn indexed_finds_unindexed_appended_edge() { let dir = tempfile::tempdir().unwrap(); let mut db = init_and_load(&dir).await; @@ -212,9 +186,14 @@ async fn indexed_finds_unindexed_appended_edge() { .await .unwrap(); - set_mode("indexed"); - let got = sorted_names(&mut db, TEST_QUERIES, "friends_of", &params(&[("$name", "Alice")])).await; - clear_mode(); + let got = first_column_sorted( + &with_traversal_mode( + "indexed", + query_main(&mut db, TEST_QUERIES, "friends_of", &params(&[("$name", "Alice")])), + ) + .await + .unwrap(), + ); assert_eq!( got, @@ -234,7 +213,6 @@ async fn indexed_finds_unindexed_appended_edge() { // CSR path never produces. `both_modes` (csr == indexed == auto) plus the // golden assert catch both the divergence and an over-emitting shared bug. #[tokio::test] -#[serial] async fn cross_type_id_collision_does_not_bleed_into_second_hop() { const SCHEMA: &str = r#" node Person { name: String @key } @@ -288,7 +266,6 @@ query reach($name: String) { // bounded range deliberately: an unbounded `{1,}` is a typecheck error, not a // runtime path. `both_modes` also confirms indexed == csr on the cycle. #[tokio::test] -#[serial] async fn variable_hops_terminate_and_dedup_on_cycle() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); @@ -310,7 +287,6 @@ async fn variable_hops_terminate_and_dedup_on_cycle() { // A self-loop a->a plus a->b. Variable-length traversal must not loop forever and // must not re-emit the seeded source. 
#[tokio::test] -#[serial] async fn variable_hops_handle_self_loop() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); diff --git a/crates/omnigraph/tests/warm_read_cost.rs b/crates/omnigraph/tests/warm_read_cost.rs index 0d36940b..cf0724f7 100644 --- a/crates/omnigraph/tests/warm_read_cost.rs +++ b/crates/omnigraph/tests/warm_read_cost.rs @@ -7,34 +7,18 @@ mod helpers; -use arrow_array::{Array, StringArray}; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + use omnigraph::db::{Omnigraph, ReadTarget}; -use omnigraph_compiler::result::QueryResult; +use omnigraph::instrumentation::{QueryIoProbes, with_query_io_probes, with_traversal_mode}; use helpers::cost::{cost_harness, measure}; use helpers::{ - MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, init_and_load, mixed_params, - mutate_branch, mutate_main, params, + MUTATION_QUERIES, TEST_QUERIES, commit_many, count_rows, first_column_sorted, init_and_load, + mixed_params, mutate_branch, mutate_main, params, }; -fn first_column_strings(result: &QueryResult) -> Vec { - if result.num_rows() == 0 { - return Vec::new(); - } - let batch = result.concat_batches().unwrap(); - let values = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let mut out = (0..values.len()) - .filter(|&row| !values.is_null(row)) - .map(|row| values.value(row).to_string()) - .collect::>(); - out.sort(); - out -} - /// A warm same-branch read must do ZERO `__manifest` object-store reads and must /// not open the commit graph, even at commit-history depth. Wrapped in /// `cost_harness`, so `manifest_reads` is ground truth: the warm-coordinator @@ -458,10 +442,15 @@ async fn recreated_branch_owned_table_handle_uses_table_etag() { ); } -/// The graph-index cache is keyed by synthetic snapshot id plus edge-table -/// state. 
A recreated branch can reuse the same edge table `(branch, version)`, -/// so the synthetic snapshot id must carry the manifest incarnation or traversal -/// can reuse stale topology. +/// A recreated branch can reuse the same edge table `(branch, version)`. The +/// graph-index cache is keyed (A1) by each edge table's physical identity +/// `(table_key, version, table_branch, e_tag)`; on local FS the e_tag is `None`, +/// so a recreated branch at the same version has the same key — the stale topology +/// is instead evicted by the same-branch manifest refresh (`invalidate_all` on the +/// `version_probes == 2` stale path), the documented e_tag-less fallback. This +/// traversal takes the indexed path (single-source frontier), so it also exercises +/// the table-handle cache incarnation; the assertion is that recreated-branch +/// topology is never stale regardless of path. #[tokio::test] async fn recreated_branch_traversal_uses_graph_index_incarnation() { let dir = tempfile::tempdir().unwrap(); @@ -493,7 +482,7 @@ async fn recreated_branch_traversal_uses_graph_index_incarnation() { ) .await .unwrap(); - assert_eq!(first_column_strings(&old_friends), vec!["Alice"]); + assert_eq!(first_column_sorted(&old_friends), vec!["Alice"]); let old_edge_entry = reader .snapshot_of(ReadTarget::branch("feature")) .await @@ -540,7 +529,7 @@ async fn recreated_branch_traversal_uses_graph_index_incarnation() { .await; let new_friends = new_friends.unwrap(); assert_eq!( - first_column_strings(&new_friends), + first_column_sorted(&new_friends), vec!["Bob"], "traversal must use the recreated branch's topology, not stale cached graph index" ); @@ -563,7 +552,7 @@ async fn recreated_branch_traversal_uses_graph_index_incarnation() { .await .unwrap(); assert_eq!( - first_column_strings(&stale_old_friends), + first_column_sorted(&stale_old_friends), Vec::::new(), "old branch topology must not leak after branch recreation" ); @@ -728,3 +717,134 @@ async fn 
write_invalidates_table_cache_for_changed_table() { "the post-write read observes the new row (no stale handle served)" ); } + +// ─── Topology-index build cost (A1 cross-branch reuse + A2 scoped build) ───── +// +// These force the CSR build path (the indexed path builds no topology) via the +// scoped `with_traversal_mode` seam — no process-global env, so they are safe in +// this mixed serial/non-serial binary and need no `#[serial]`. They read the +// `graph_build_count` / `graph_edges_built` probes off a directly-constructed +// `QueryIoProbes`. + +/// A1: a fresh (unwritten) branch reuses main's cached CSR topology index +/// (`graph_build_count == 0`), and the reused index returns correct results for +/// the branch. Before A1 the branch-keyed snapshot id forced a rebuild (count 1). +#[tokio::test] +async fn fresh_branch_traversal_reuses_main_graph_index() { + let dir = tempfile::tempdir().unwrap(); + let mut writer = init_and_load(&dir).await; + let uri = dir.path().to_str().unwrap(); + // A Knows edge on main so there is topology to build and then reuse. + mutate_main( + &mut writer, + MUTATION_QUERIES, + "insert_person_and_friend", + &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), + ) + .await + .unwrap(); + + // Separate reader handle. As in production, the reader never creates the + // branch, so creating it does not invalidate the reader's warm cache. + let reader = Omnigraph::open(uri).await.unwrap(); + + // Reader warms main on the CSR path: builds and caches the topology index. + let warm = with_traversal_mode( + "csr", + reader.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ), + ) + .await + .unwrap(); + assert_eq!( + first_column_sorted(&warm), + vec!["Alice"], + "test setup: main has the Knows edge" + ); + + // A separate writer creates the branch (lazy fork: feature's edge tables are + // physically main's — same version + e_tag, table_branch=None). 
+ writer.branch_create("feature").await.unwrap(); + + let graph_build = Arc::new(AtomicU64::new(0)); + let probes = QueryIoProbes { + graph_build_count: Arc::clone(&graph_build), + ..Default::default() + }; + let on_branch = with_traversal_mode( + "csr", + with_query_io_probes( + probes, + reader.query( + ReadTarget::branch("feature"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ), + ), + ) + .await + .unwrap(); + + assert_eq!( + first_column_sorted(&on_branch), + vec!["Alice"], + "fresh branch sees main's edges (lazy fork) and the reused index is correct" + ); + assert_eq!( + graph_build.load(Ordering::Relaxed), + 0, + "a fresh branch with unchanged edges must reuse main's cached CSR index, not rebuild it" + ); +} + +/// A2: a query referencing one edge type builds the topology for only that edge, +/// not every edge in the catalog. Forces CSR (the build path) and counts edge +/// tables built. Before A2 the build materialized all catalog edges (the fixture +/// defines Knows + WorksAt, so a build-all touches >= 2) — the cold-build cost. +#[tokio::test] +async fn single_edge_query_builds_only_referenced_edge() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + // A Knows edge so the referenced build has topology; the fixture also defines + // WorksAt, so a build-all would touch more than one edge. 
+ mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person_and_friend", + &mixed_params(&[("$name", "Walker"), ("$friend", "Alice")], &[("$age", 41)]), + ) + .await + .unwrap(); + + let graph_edges = Arc::new(AtomicU64::new(0)); + let probes = QueryIoProbes { + graph_edges_built: Arc::clone(&graph_edges), + ..Default::default() + }; + let result = with_traversal_mode( + "csr", + with_query_io_probes( + probes, + db.query( + ReadTarget::branch("main"), + TEST_QUERIES, + "friends_of", + &params(&[("$name", "Walker")]), + ), + ), + ) + .await + .unwrap(); + + assert_eq!(first_column_sorted(&result), vec!["Alice"]); + assert_eq!( + graph_edges.load(Ordering::Relaxed), + 1, + "a query referencing only `knows` must build only that edge, not all catalog edges" + ); +} diff --git a/docs/dev/architecture.md b/docs/dev/architecture.md index abdc45bb..2469c84c 100644 --- a/docs/dev/architecture.md +++ b/docs/dev/architecture.md @@ -138,8 +138,8 @@ flowchart TB end subgraph idx[graph index] - gi[GraphIndex
<br/>CSR/CSC built per query]:::l2 - rc[RuntimeCache LRU=8]:::l2 + gi[GraphIndex<br/>CSR/CSC built per query<br/>scoped to traversed edges]:::l2 + rc[RuntimeCache LRU=8<br/>
keyed by edge-table identity]:::l2 end subgraph io[Lance I/O] diff --git a/docs/dev/execution.md b/docs/dev/execution.md index 2762a3f8..0430f25d 100644 --- a/docs/dev/execution.md +++ b/docs/dev/execution.md @@ -6,7 +6,7 @@ Pipeline: 1. Parse + typecheck via `omnigraph-compiler`. 2. Lower to IR. -3. If `Expand` or `AntiJoin` is present, build (or fetch from `RuntimeCache`) a `GraphIndex`. +3. If `Expand` or `AntiJoin` is present, build (or fetch from `RuntimeCache`) a `GraphIndex` **scoped to the edge types the query actually traverses** (`referenced_edge_types`, recursing through `AntiJoin` inners) — not every edge type in the catalog. The CSR build full-scans each covered edge dataset, so scoping is what keeps a single-edge join (`$x identifiesPerson $p`) from scanning the whole graph's edge data. The `RuntimeCache` key is each covered edge table's **physical identity** `(table_key, version, table_branch, e_tag)` (not the resolved snapshot id), so a `{Knows}` index and a `{Knows, WorksAt}` index are distinct entries AND a lazy-fork branch whose edge tables physically *are* main's reuses main's built index instead of cold-scanning it. 4. Run `execute_query` against the snapshot. ### Read flow — sequence diff --git a/docs/dev/invariants.md b/docs/dev/invariants.md index f7a6443a..ebad595e 100644 --- a/docs/dev/invariants.md +++ b/docs/dev/invariants.md @@ -336,7 +336,29 @@ them explicit. deferred — it needs the Q8 cleanup-resurrection watermark first). The commit graph IS now reconcilable from the manifest (RFC-013 Phase 7 — it is a pure projection of the `graph_commit`/`graph_head` rows); the traversal id-map is - still rebuilt. + still rebuilt. 
The CSR/CSC topology index is now **scoped and cross-branch + reused** (the two cuts that closed the cross-edge-join hang): the build covers + only the edge types a query traverses (`referenced_edge_types` over + `Expand`/`AntiJoin`, not every catalog edge — a single-edge join no longer + scans the whole graph's edge data), and the `RuntimeCache` cache key is each + edge table's physical identity `(table_key, version, table_branch, e_tag)` + plus the edge's `(from_type, to_type)` endpoint mapping — rather than the + resolved snapshot id — so a lazy-fork branch reuses main's built index instead + of cold-scanning it, while a schema repoint of an edge type (which changes the + built `TypeIndex` namespace) still rebuilds even if the edge table's physical + identity is unchanged. Residual: on stores without per-table e_tags (local FS) + a branch deleted and recreated at the same version with the same endpoints has + the same key, so the incarnation distinction falls back to the same-branch + manifest refresh clearing read caches (`invalidate_all`); production object + stores carry real e_tags, so the key alone distinguishes incarnations there + (the e_tag-present cross-branch-reuse path is exercised in CI by + `s3_storage.rs::s3_fresh_branch_traversal_reuses_main_graph_index_with_etags` + against RustFS, which surfaces real ETags — local-FS tests cannot reach it). + Known narrow gap (local FS only): a cold *cross-branch* resolve of a + recreated branch (a long-lived reader bound to another branch) does not trigger + that same-branch refresh, so an e_tag-less recreated branch can still reuse a + stale entry until a same-branch read refreshes — acceptable because local FS is + a dev/test substrate and production carries e_tags. 
- **Commit-graph parent under concurrency — CLOSED (RFC-013 Phase 7):** the graph commit is now recorded in the manifest publish CAS, and the publisher resolves the new commit's parent INSIDE its retry loop, per attempt, from the just-loaded diff --git a/docs/dev/testing.md b/docs/dev/testing.md index eb8323ef..be8f83fb 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -25,7 +25,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `staged_writes.rs` | TableStore staged-write primitives (`stage_append`, `stage_merge_insert`, `commit_staged`, `scan_with_staged`, `count_rows_with_staged`) — primitive-level only; engine code uses the in-memory `MutationStaging` accumulator instead | | `forbidden_apis.rs` | Defense-in-depth source-walk guard: engine code (`exec/`, `db/omnigraph/`, `loader/`, `changes/`) must not reach around the sealed storage trait to Lance inline-commit APIs, nor open datasets directly (`Dataset::open` / `DatasetBuilder::from_uri`/`from_namespace`) — reads route through `Snapshot::open` and the held-handle cache; `// forbidden-api-allow: ` sentinel exempts reviewed lines | | `lance_surface_guards.rs` | Pins the Lance API surfaces omnigraph depends on (named runtime + compile-only guards; see [lance.md](lance.md)) — the first smoke check on any Lance version bump; e.g. 
`compact_files_still_fails_on_blob_columns` turns red when the upstream blob-compaction fix lands | -| `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 0 commit-graph opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by synthetic snapshot-id incarnation or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). See "Cost-budget tests" below | +| `warm_read_cost.rs` | Cost-budget tests for the warm read path (query-latency work), measured at the object-store boundary with Lance `IOTracker` (the LanceDB IO-counted pattern): a warm same-branch read does 0 manifest opens, 1 version probe, validates the schema once (Fix 1 / finding A / Fix 2 at commit-history depth); stale same-branch reads perform exactly 2 probes and refresh manifest-only; recreated non-main branches with the same Lance version refresh by incarnation; recreated branch-owned table handles are distinguished by table e_tag or refresh-time cache clearing; recreated traversal topology is protected by per-edge-table e_tag in the graph-index cache key or refresh-time cache clearing; a warm *repeat* read does 0 table opens via the held-handle cache and a write re-opens only the changed table at its new version/e_tag (Fix 3/6A). 
Also the CSR topology-build cost guards: `fresh_branch_traversal_reuses_main_graph_index` (A1 — a lazy-fork branch reuses main's cached CSR index, 0 rebuilds via `graph_build_count`) and `single_edge_query_builds_only_referenced_edge` (A2 — a one-edge query builds only that edge via `graph_edges_built`); both force CSR via the scoped `with_traversal_mode` seam, so they need no `#[serial]`. See "Cost-budget tests" below. | | `write_cost.rs` | Cost-budget tests for the WRITE path (RFC-013), the latency twin of `warm_read_cost.rs` on the **shared `helpers::cost` harness** (`measure`/`IoCounts`/`assert_flat`/`local_graph`). Runs on **local FS**; gates the **internal-table** term (`__manifest` scans flat in commit-history depth, lineage rows included — `internal_table_scans_are_flat_in_history`, now **green every-PR** since RFC-013 step 2 brought the internal tables into `optimize`; the test compacts at each depth before measuring) plus green every-PR guards (single-insert `data_writes` bounded, a per-write read-op ceiling that fails the moment a round-trip is added, and a `measure_with_staged` fitness assert that a keyed insert routes through `stage_merge_insert` once with no `stage_append`/vector-index build). 
The **data-table opener** term is S3-only — see `write_cost_s3.rs` and the backend-split note in "Cost-budget tests" below | | `helpers/cost.rs` | The shared cost-budget harness (not a test): `IoCounts`/`StagedCounts` (counts by table class), `measure`/`measure_with_staged` (the one place the `with_query_io_probes` + `MergeWriteProbes` task-local + `IOTracker` wiring lives; reads per-op deltas via lance's `incremental_stats()`, the upstream per-request idiom from `rust/lance/src/dataset/tests/dataset_io.rs`), `cost_harness`/`GraphIoMeter` (installs ONE `__manifest` `IOTracker` for a whole test body so the graph opens **under** it and `manifest_reads` is **ground truth** — every read regardless of handle age, the warm-coordinator freshness probe included — closing the blind spot where a per-op tracker installed at measure time cannot see a long-lived handle's reads; outside `cost_harness`, `measure` falls back to fresh per-op tracking, so `write_cost_s3.rs` is unaffected), `last_manifest_reads()` (the manifest read log for `assert_io_eq!`-style failure diagnostics), `assert_flat(curve, select, slack, what)`, and store-agnostic `local_graph`/`s3_graph` fixtures. 
`warm_read_cost.rs`, `write_cost.rs`, and `write_cost_s3.rs` all consume it so a cost test body is written once and reads in one vocabulary | | `lifecycle.rs` | Graph lifecycle, schema state | @@ -35,13 +35,13 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `schema_apply.rs` | Migration plan + apply, schema-apply lock; index materialization deferred to the reconciler (iss-848): `apply_schema_defers_vector_index_on_empty_table` (an empty-table Vector `@index` never aborts the apply) and `index_only_constraint_apply_touches_no_table_data` (adding an `@index` is metadata-only — no table-version bump) | | `search.rs` | FTS / vector / hybrid (`bm25`, `nearest`, `rrf`) | | `traversal.rs` | `Expand`, variable-length hops, anti-join (CSR path — `OMNIGRAPH_TRAVERSAL_MODE` unset) | -| `traversal_indexed.rs` | BTREE-indexed Expand (`execute_expand_indexed`) forced via `OMNIGRAPH_TRAVERSAL_MODE`, asserted semantically equal to the CSR path; own binary, all `#[serial]` so env writes never race | +| `traversal_indexed.rs` | BTREE-indexed Expand (`execute_expand_indexed`) forced via the scoped `with_traversal_mode` seam (not the env var), asserted semantically equal to the CSR path. No `#[serial]` needed — the seam is scope-bound and process-safe. (The CSR topology-build cost guards — `fresh_branch_traversal_reuses_main_graph_index` (A1, `graph_build_count`) and `single_edge_query_builds_only_referenced_edge` (A2, `graph_edges_built`) — live in `warm_read_cost.rs`.) 
| | `proptest_equivalence.rs` | Property-based query-correctness invariants over generated graphs (shared key alphabet forces cross-type id collisions, cycles, self-loops) — pins Expand-mode equivalence so a future fork divergence fails loudly instead of silently; `#[serial]` | | `ordering.rs` | ORDER BY contract: descending, multi-key precedence, deterministic key-column tie-break (total order, so `ORDER … LIMIT` is deterministic), NULL placement (`nulls_first = !descending`) | | `literal_filters.rs` | Execution goldens for non-string/non-integer scalar literal filters (F64/F32/Bool/Date/DateTime) across both the in-memory comparison arm and the Lance-pushdown arm | | `aggregation.rs` | `count`, `sum`, `avg`, `min`, `max` | | `export.rs` | NDJSON streaming export filters | -| `s3_storage.rs` | S3-backed graph (skipped unless `OMNIGRAPH_S3_TEST_BUCKET` is set) | +| `s3_storage.rs` | S3-backed graph (skipped unless `OMNIGRAPH_S3_TEST_BUCKET` is set). Includes `s3_fresh_branch_traversal_reuses_main_graph_index_with_etags` — the CSR topology cache-key test on a **real** per-table e_tag (`None` on local FS, so `warm_read_cost.rs` can't reach this path); forces CSR via the scoped `with_traversal_mode` seam | | `lance_version_columns.rs` | Per-row `_row_last_updated_at_version` behavior | | `validators.rs` | Schema constraint enforcement (enum, range, unique, cardinality) across JSONL, insert, update paths | | `policy_engine_chassis.rs` | Engine-layer Cedar enforcement (MR-722): allow + deny through every `_as` writer via the SDK directly — no HTTP — proving embedded and CLI callers hit the same gate as the server, with action × scope shapes matching `authorize_request` | @@ -75,7 +75,7 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav CI runs these S3-backed tests against a containerized RustFS server (`.github/workflows/ci.yml` → `rustfs_integration` job): -- `cargo test -p omnigraph-engine --test s3_storage` +- `cargo test -p 
omnigraph-engine --test s3_storage` (lifecycle/branching + the e_tag-present CSR topology cache-key reuse test — the path local FS can't reach since its e_tag is `None`) - `cargo test -p omnigraph-engine --test write_cost_s3` (RFC-013 step 3a's data-table opener cost gate — flat across commit depth on S3; the term local FS can't reproduce) - `cargo test -p omnigraph-server --test s3` (single-graph serving + config-free `--cluster s3://` boot) - `cargo test -p omnigraph-cluster --test s3_cluster` (full control-plane lifecycle on the bucket)