Skip to content
Merged
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ jobs:
crates/omnigraph/src/db/manifest.rs|crates/omnigraph/src/db/manifest/*) run_rustfs_ci=true ;;
crates/omnigraph/tests/s3_storage.rs|crates/omnigraph/tests/write_cost_s3.rs|crates/omnigraph/tests/helpers/*) run_rustfs_ci=true ;;
crates/omnigraph/src/table_store.rs|crates/omnigraph/src/instrumentation.rs) run_rustfs_ci=true ;;
crates/omnigraph/src/runtime_cache.rs|crates/omnigraph/src/graph_index/*) run_rustfs_ci=true ;;
crates/omnigraph-cluster/src/store.rs|crates/omnigraph-cluster/src/serve.rs) run_rustfs_ci=true ;;
crates/omnigraph-cluster/tests/s3_cluster.rs) run_rustfs_ci=true ;;
crates/omnigraph-server/tests/s3.rs|crates/omnigraph-server/tests/support/*) run_rustfs_ci=true ;;
Expand Down
3 changes: 2 additions & 1 deletion crates/omnigraph/src/db/omnigraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1296,8 +1296,9 @@ impl Omnigraph {
/// Returns the graph index for an already-resolved target, covering only
/// `edge_types`.
///
/// `edge_types` maps each edge name to its `(from_type, to_type)` endpoints.
/// This is a thin crate-internal forwarder to
/// `table_ops::graph_index_for_resolved`; the caller decides which edges are
/// in scope.
pub(crate) async fn graph_index_for_resolved(
    &self,
    resolved: &ResolvedTarget,
    edge_types: &std::collections::HashMap<String, (String, String)>,
) -> Result<Arc<crate::graph_index::GraphIndex>> {
    // Single tail expression: the pre-change two-argument call is gone, since
    // `table_ops::graph_index_for_resolved` now requires the edge scope.
    table_ops::graph_index_for_resolved(self, resolved, edge_types).await
}

/// Ensure BTree scalar indices exist on key columns.
Expand Down
13 changes: 10 additions & 3 deletions crates/omnigraph/src/db/omnigraph/table_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,22 @@ pub(super) async fn graph_index(db: &Omnigraph) -> Result<Arc<crate::graph_index
.await?;
drop(coord);
let catalog = db.catalog();
db.runtime_cache.graph_index(&resolved, &catalog).await
// Whole-graph entry point: cover every edge type. Query execution scopes to
// the edges it actually traverses (see `referenced_edge_types`).
let edge_types: std::collections::HashMap<String, (String, String)> = catalog
.edge_types
.iter()
.map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone())))
.collect();
db.runtime_cache.graph_index(&resolved, &edge_types).await
}

/// Fetches the graph index for `resolved` through `db.runtime_cache`, scoped
/// to the caller-supplied `edge_types`.
///
/// `edge_types` maps each edge name to its `(from_type, to_type)` endpoints.
/// Unlike the whole-graph `graph_index` entry point, this function does not
/// consult the catalog: the caller chooses which edges the index must cover.
pub(super) async fn graph_index_for_resolved(
    db: &Omnigraph,
    resolved: &ResolvedTarget,
    edge_types: &std::collections::HashMap<String, (String, String)>,
) -> Result<Arc<crate::graph_index::GraphIndex>> {
    // The former `db.catalog()` lookup and catalog-keyed call are dropped; the
    // runtime cache is keyed by the explicit edge scope instead.
    db.runtime_cache.graph_index(resolved, edge_types).await
}

pub(super) async fn ensure_indices(db: &Omnigraph) -> Result<Vec<PendingIndex>> {
Expand Down
186 changes: 172 additions & 14 deletions crates/omnigraph/src/exec/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ impl Omnigraph {
.any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. }));
// Lazy: an index-served query with no AntiJoin never builds the CSR.
let graph_index = if needs_graph {
GraphIndexHandle::cached(self, &resolved)
GraphIndexHandle::cached(
self,
&resolved,
referenced_edge_types(&ir.pipeline, &catalog),
)
} else {
GraphIndexHandle::none()
};
Expand Down Expand Up @@ -95,14 +99,9 @@ impl Omnigraph {
.any(|op| matches!(op, IROp::Expand { .. } | IROp::AntiJoin { .. }));
// Lazy build against this historical snapshot (not the RuntimeCache,
// which is keyed to live branch targets); only a CSR-path Expand or an
// AntiJoin triggers it.
// AntiJoin triggers it. Scoped to the edges this query traverses.
let graph_index = if needs_graph {
let edge_types = catalog
.edge_types
.iter()
.map(|(name, et)| (name.clone(), (et.from_type.clone(), et.to_type.clone())))
.collect();
GraphIndexHandle::direct(&snapshot, edge_types)
GraphIndexHandle::direct(&snapshot, referenced_edge_types(&ir.pipeline, &catalog))
} else {
GraphIndexHandle::none()
};
Expand Down Expand Up @@ -762,6 +761,51 @@ fn execute_pipeline<'a>(
})
}

/// Resolves the edge types that `pipeline` traverses to their
/// `(from_type, to_type)` endpoints, as declared in `catalog`.
///
/// Edge names are gathered by `collect_referenced_edge_names`, which also
/// descends into `AntiJoin` inner pipelines (their bulk fast path consumes the
/// CSR for the inner `Expand`'s edge). The CSR build is scoped to exactly this
/// set rather than to every catalog edge type: otherwise a single-edge join
/// (`$x identifiesPerson $p`) that lands on the CSR path would scan the whole
/// graph's edge data, which was the cause of the cross-edge-join hang.
///
/// Names that are not present in `catalog.edge_types` are skipped. The result
/// is empty when the only traversal is an `AntiJoin` with no inner `Expand`;
/// that shape never asks the handle for an index, so an empty build is never
/// realized.
fn referenced_edge_types(
    pipeline: &[IROp],
    catalog: &Catalog,
) -> HashMap<String, (String, String)> {
    let mut traversed = std::collections::BTreeSet::new();
    collect_referenced_edge_names(pipeline, &mut traversed);

    let mut scoped = HashMap::new();
    for edge_name in traversed {
        // Only edges the catalog knows about contribute endpoints.
        if let Some(edge) = catalog.edge_types.get(&edge_name) {
            let endpoints = (edge.from_type.clone(), edge.to_type.clone());
            scoped.insert(edge_name, endpoints);
        }
    }
    scoped
}

/// Adds the name of every edge type traversed by `pipeline` to `out`,
/// descending into `AntiJoin` inner pipelines. Repeated references collapse
/// because `out` is a set.
fn collect_referenced_edge_names(
    pipeline: &[IROp],
    out: &mut std::collections::BTreeSet<String>,
) {
    // Deliberately exhaustive (no `_` arm): a new edge-referencing IROp must
    // break the build here instead of silently under-scoping the CSR build,
    // where an omitted edge would fail at runtime with "no adjacency index for
    // edge".
    pipeline.iter().for_each(|op| match op {
        IROp::Expand { edge_type, .. } => {
            out.insert(edge_type.clone());
        }
        IROp::AntiJoin { inner, .. } => collect_referenced_edge_names(inner, out),
        // Non-traversal ops reference no edges.
        IROp::NodeScan { .. } | IROp::Filter(_) => {}
    });
}

/// Lazily provides the in-memory CSR graph index, building it on first use and
/// memoizing for the rest of the query. Indexed-mode Expand never asks for it,
/// so a query that is entirely index-served and has no AntiJoin never pays the
Expand All @@ -776,7 +820,11 @@ pub struct GraphIndexHandle<'a> {

/// How a `GraphIndexHandle` obtains its index the first time one is requested.
enum GraphIndexBuilder<'a> {
    /// No index is available; the handle resolves to `None`.
    None,
    /// Fetch through `Omnigraph::graph_index_for_resolved` for the given
    /// resolved target, scoped to the edge-name → `(from_type, to_type)` map.
    Cached(
        &'a Omnigraph,
        &'a crate::db::ResolvedTarget,
        HashMap<String, (String, String)>,
    ),
    /// Build directly from a snapshot with `GraphIndex::build`, scoped to the
    /// edge-name → `(from_type, to_type)` map.
    Direct(&'a Snapshot, HashMap<String, (String, String)>),
}

Expand All @@ -788,10 +836,14 @@ impl<'a> GraphIndexHandle<'a> {
}
}

fn cached(db: &'a Omnigraph, resolved: &'a crate::db::ResolvedTarget) -> Self {
fn cached(
db: &'a Omnigraph,
resolved: &'a crate::db::ResolvedTarget,
edge_types: HashMap<String, (String, String)>,
) -> Self {
Self {
cell: tokio::sync::OnceCell::new(),
builder: GraphIndexBuilder::Cached(db, resolved),
builder: GraphIndexBuilder::Cached(db, resolved, edge_types),
}
}

Expand All @@ -810,8 +862,8 @@ impl<'a> GraphIndexHandle<'a> {
.get_or_try_init(|| async {
match &self.builder {
GraphIndexBuilder::None => Ok::<Option<Arc<GraphIndex>>, OmniError>(None),
GraphIndexBuilder::Cached(db, resolved) => {
Ok(Some(db.graph_index_for_resolved(resolved).await?))
GraphIndexBuilder::Cached(db, resolved, edge_types) => {
Ok(Some(db.graph_index_for_resolved(resolved, edge_types).await?))
}
GraphIndexBuilder::Direct(snapshot, edge_types) => {
Ok(Some(Arc::new(GraphIndex::build(snapshot, edge_types).await?)))
Expand All @@ -834,7 +886,12 @@ impl<'a> GraphIndexHandle<'a> {
/// forces the path (ops escape hatch + test hook). Both modes are semantically
/// identical, so the override only changes which path runs, never the result.
fn traversal_indexed_override() -> Option<bool> {
match std::env::var("OMNIGRAPH_TRAVERSAL_MODE").ok().as_deref() {
// The scoped test seam (`with_traversal_mode`) takes precedence over the
// process-global `OMNIGRAPH_TRAVERSAL_MODE` ops escape hatch.
let mode = crate::instrumentation::traversal_mode_override()
.map(str::to_string)
.or_else(|| std::env::var("OMNIGRAPH_TRAVERSAL_MODE").ok());
match mode.as_deref() {
Some("indexed") => Some(true),
Some("csr") => Some(false),
_ => None,
Expand Down Expand Up @@ -2460,6 +2517,107 @@ mod expand_chooser_tests {
}
}

#[cfg(test)]
mod referenced_edge_types_tests {
    use super::*;

    /// An unfiltered `NodeScan` binding `var` to nodes of type `ty`.
    fn node_scan(var: &str, ty: &str) -> IROp {
        IROp::NodeScan {
            variable: String::from(var),
            type_name: String::from(ty),
            filters: Vec::new(),
        }
    }

    /// A single-hop outbound `Expand` over `edge`; the other fields are fixed
    /// placeholders that the edge-name collection never inspects.
    fn expand(edge: &str) -> IROp {
        IROp::Expand {
            src_var: "a".into(),
            dst_var: "b".into(),
            edge_type: String::from(edge),
            direction: Direction::Out,
            dst_type: "X".into(),
            min_hops: 1,
            max_hops: Some(1),
            dst_filters: Vec::new(),
        }
    }

    /// Edge names referenced by `pipeline`, in the set's sorted order.
    fn edge_names(pipeline: &[IROp]) -> Vec<String> {
        let mut seen = std::collections::BTreeSet::new();
        collect_referenced_edge_names(pipeline, &mut seen);
        seen.into_iter().collect::<Vec<_>>()
    }

    #[test]
    fn collects_a_single_expand_edge() {
        let pipeline = [node_scan("x", "ExternalID"), expand("identifiesPerson")];
        assert_eq!(edge_names(&pipeline), ["identifiesPerson"]);
    }

    #[test]
    fn ignores_non_traversal_ops_and_dedups() {
        // Touching one edge twice still references exactly that one edge, never
        // the whole catalog (the cross-edge-join hang this scoping fixes).
        let name_filter = IROp::Filter(IRFilter {
            left: IRExpr::PropAccess {
                variable: "p".into(),
                property: "name".into(),
            },
            op: omnigraph_compiler::query::ast::CompOp::Eq,
            right: IRExpr::Literal(Literal::String("a".into())),
        });
        let pipeline = vec![
            node_scan("x", "ExternalID"),
            expand("identifiesPerson"),
            name_filter,
            expand("identifiesPerson"),
        ];
        assert_eq!(edge_names(&pipeline), ["identifiesPerson"]);
    }

    #[test]
    fn recurses_through_anti_join_inner_pipeline() {
        // The bulk anti-join fast path consumes the CSR for the inner Expand's
        // edge, so a nested edge type must still be in scope.
        let anti_join = IROp::AntiJoin {
            outer_var: "p".into(),
            inner: vec![expand("worksAt")],
        };
        let pipeline = vec![node_scan("p", "Person"), expand("knows"), anti_join];
        assert_eq!(edge_names(&pipeline), ["knows", "worksAt"]);
    }

    #[test]
    fn recurses_through_nested_anti_joins() {
        let innermost = IROp::AntiJoin {
            outer_var: "c".into(),
            inner: vec![expand("deepEdge")],
        };
        let pipeline = vec![IROp::AntiJoin {
            outer_var: "p".into(),
            inner: vec![innermost],
        }];
        assert_eq!(edge_names(&pipeline), ["deepEdge"]);
    }

    #[test]
    fn anti_join_with_no_inner_expand_references_no_edges() {
        // A predicate-only anti-join never asks the handle for an index, so the
        // empty set is correct and no whole-graph build is realized.
        let pipeline = vec![IROp::AntiJoin {
            outer_var: "p".into(),
            inner: vec![node_scan("c", "Company")],
        }];
        assert_eq!(edge_names(&pipeline), Vec::<String>::new());
    }
}

#[cfg(test)]
mod literal_lowering_tests {
use super::*;
Expand Down
6 changes: 6 additions & 0 deletions crates/omnigraph/src/graph_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ impl GraphIndex {
snapshot: &Snapshot,
edge_types: &HashMap<String, (String, String)>, // edge_name → (from_type, to_type)
) -> Result<Self> {
// INVARIANT (A1 graph-index cache key): the topology is a pure function of
// the edge tables' `src`/`dst` columns and nothing else. `RuntimeCache`
// keys `GraphIndexCacheKey` on each edge table's physical identity
// `(table_key, version, table_branch, e_tag)` so a lazy-fork branch reuses
// main's built index. If you read node tables, schema, or other state here,
// add it to that key or the cache will serve a stale index.
let mut type_indices: HashMap<String, TypeIndex> = HashMap::new();
let mut csr = HashMap::new();
let mut csc = HashMap::new();
Expand Down
44 changes: 44 additions & 0 deletions crates/omnigraph/src/instrumentation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ pub struct QueryIoProbes {
/// Internal/system-table (`__manifest`) open CALLS — the complement of
/// `data_open_count`, kept for symmetry and debugging.
pub internal_open_count: Arc<AtomicU64>,
/// Counts topology-index builds (the `RuntimeCache::graph_index` cache-miss
/// path). A cost test asserts a fresh branch whose edge tables are unchanged
/// from main reuses main's cached index (0 builds) rather than rebuilding it.
pub graph_build_count: Arc<AtomicU64>,
/// Edge tables included in topology builds this query (summed over build
/// invocations). A cost test asserts a query referencing one edge builds only
/// that edge, not every catalog edge (the cold-build shrink A2 ships).
pub graph_edges_built: Arc<AtomicU64>,
}

tokio::task_local! {
Expand All @@ -78,6 +86,32 @@ fn current<R>(f: impl FnOnce(&QueryIoProbes) -> R) -> Option<R> {
QUERY_IO_PROBES.try_with(f).ok()
}

// Task-local slot holding the forced traversal mode, if any. It is installed
// by `with_traversal_mode` and read by `traversal_mode_override`.
tokio::task_local! {
    static TRAVERSAL_MODE_OVERRIDE: Option<&'static str>;
}

/// Runs `fut` with the Expand execution mode forced to `mode` (`"indexed"` or
/// `"csr"`), leaving the process-global `OMNIGRAPH_TRAVERSAL_MODE` env var
/// untouched.
///
/// This is the general traversal-mode test seam. It is scope-bound, so the
/// override cannot leak: it is gone once `fut` resolves or unwinds. It is also
/// process-safe: no shared state is mutated, so a forced-mode test never
/// affects a concurrent test in the same binary and needs neither `#[serial]`
/// nor a dedicated all-serial binary. Mirrors [`with_query_io_probes`].
///
/// The env var remains the production/ops escape hatch; this scoped override
/// takes precedence over it (`exec::query::traversal_indexed_override`).
pub async fn with_traversal_mode<F: std::future::Future>(
    mode: &'static str,
    fut: F,
) -> F::Output {
    let scoped = TRAVERSAL_MODE_OVERRIDE.scope(Some(mode), fut);
    scoped.await
}

/// Returns the traversal-mode override scoped to the current task, if one is
/// installed. In production no scope is installed, so this yields `None` and
/// the env var is consulted instead.
pub(crate) fn traversal_mode_override() -> Option<&'static str> {
    match TRAVERSAL_MODE_OVERRIDE.try_with(|mode| *mode) {
        Ok(active) => active,
        // Reading outside any `with_traversal_mode` scope is not an error here.
        Err(_) => None,
    }
}

pub(crate) fn manifest_wrapper() -> Option<Arc<dyn WrappingObjectStore>> {
current(|p| p.manifest_wrapper.clone()).flatten()
}
Expand Down Expand Up @@ -119,6 +153,16 @@ pub(crate) fn record_open(uri: &str) {
});
}

/// Notes a single topology-index build spanning `edges` edge tables (the
/// `RuntimeCache::graph_index` cache-miss path): bumps the build counter by one
/// and adds `edges` to the edges-built counter. Does nothing when no probes are
/// installed (production).
pub(crate) fn record_graph_build(edges: usize) {
    let edge_total = edges as u64;
    let _ = current(|probes| {
        probes.graph_build_count.fetch_add(1, Ordering::Relaxed);
        probes
            .graph_edges_built
            .fetch_add(edge_total, Ordering::Relaxed);
    });
}

/// Per-operation staged-write counts, installed for a task via
/// [`with_merge_write_probes`]. Lets a cost-budget test assert WHICH staged-write
/// primitive an operation invokes — e.g. that an append-only fast-forward merge
Expand Down
Loading