Skip to content
Closed
4 changes: 4 additions & 0 deletions crates/omnigraph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.1" }
tokio = { workspace = true }
lance-namespace-impls = { workspace = true }
lance-io = "7.0.0"
arrow-array = { workspace = true }
futures = { workspace = true }
serde_json = { workspace = true }
serial_test = "3"
proptest = "1"
proptest-state-machine = "0.8"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Locked Test Build Breaks

This adds proptest-state-machine without updating Cargo.lock. The repo's documented test gate uses cargo test --workspace --locked, so the first locked build of this test target will fail before any DST tests run.

Context Used: AGENTS.md (source)

Fix in Claude Code

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Commit the lockfile entry for the new test dependency

Adding proptest-state-machine changes dependency resolution, but this commit does not update Cargo.lock (there is no proptest-state-machine entry in the lockfile). The documented CI gate runs cargo test --workspace --locked, so a clean checkout of this commit will fail before building until the lockfile is regenerated and committed.

Useful? React with 👍 / 👎.

async-trait = { workspace = true }
323 changes: 318 additions & 5 deletions crates/omnigraph/tests/dst.rs

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions crates/omnigraph/tests/dst/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ pub async fn open_clean(uri: &str) -> Omnigraph {
Omnigraph::init(uri, SCHEMA).await.expect("init")
}

/// Opens a fresh handle onto a graph that ALREADY exists at `uri`, reading its
/// state back from the bytes on disk. Opening this way runs the open-time
/// recovery sweep in read-write mode, which is what the reopen==pre_state
/// durability oracle relies on: the brand-new handle must agree with the model
/// the walk built.
///
/// # Panics
/// Panics with the message `reopen` if `Omnigraph::open` returns an error.
pub async fn reopen(uri: &str) -> Omnigraph {
    let reopened = Omnigraph::open(uri).await;
    reopened.expect("reopen")
}

/// A graph whose storage injects seeded manifest-layer faults (CAS-lost). The
/// graph + schema are created cleanly first, then reopened through the
/// `FaultAdapter` so only the op workload runs under faults.
Expand Down
2 changes: 1 addition & 1 deletion crates/omnigraph/tests/dst/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Coverage {
}
pub fn report(&self) -> String {
format!(
"ops {}/{}, invariants {}/4, known-bugs exercised {}",
"ops {}/{}, invariants {}/7, known-bugs exercised {}",
self.ops.len(),
OpKind::ALL.len(),
self.invariants.len(),
Expand Down
143 changes: 138 additions & 5 deletions crates/omnigraph/tests/dst/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
//! always a real finding (the named regressions cover the known concurrency
//! cases separately).

use std::collections::HashSet;

use arrow_array::{RecordBatch, UInt64Array};
use futures::TryStreamExt;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::error::OmniError;
use omnigraph_compiler::ir::ParamMap;

#[derive(Debug)]
pub enum Finding {
/// An engine-returned error (variant preserved for structured classification).
Engine(OmniError),
Expand All @@ -37,10 +42,15 @@ pub fn classify(f: &Finding) -> Option<&'static str> {
match f {
Finding::Engine(e) => {
let s = e.to_string();
// RC-1: stale-view manifest CAS — gated on the Manifest variant so a
// coincidental substring in another subsystem cannot mask it.
// RC-1: edge-table HEAD/manifest drift from the node-delete cascade.
// Gated on the Manifest variant so a coincidental substring elsewhere
// can't mask it. Two surfaces of the same root: the write-time CAS
// "stale view", and a LATER write's precondition refusing the
// uncovered drift ("ahead of manifest ... run omnigraph repair").
if matches!(e, OmniError::Manifest(_))
&& (s.contains("stale view") || (s.contains("expected") && s.contains("current")))
&& (s.contains("stale view")
|| (s.contains("expected") && s.contains("current"))
|| s.contains("ahead of manifest version"))
{
return Some("RC-1 stale-view");
}
Expand All @@ -51,11 +61,49 @@ pub fn classify(f: &Finding) -> Option<&'static str> {
}
None
}
// Structural divergences are never allow-listed.
Finding::Logical(_) => None,
// Structural divergences are novel by default, with ONE narrow
// exception: dup-`@key` (MR-714) is a known open bug, so a finding whose
// message carries the `dup-@key` marker is allow-listed like the engine
// known-bugs above. Every other structural divergence stays novel.
Finding::Logical(s) => {
if s.contains("dup-@key") {
return Some("dup-@key MR-714");
}
None
}
}
}

/// Turns a caught panic payload into printable text.
///
/// A `&str` payload or a `String` payload is returned as an owned `String`;
/// any other payload type yields the fixed placeholder
/// `<non-string panic payload>`.
pub fn panic_message(p: &(dyn std::any::Any + Send)) -> String {
    p.downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        // `&str` is tried first; only fall back to `String` when that misses.
        .or_else(|| p.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "<non-string panic payload>".to_string())
}

/// Maps the message of a caught panic to a KNOWN-bug label, mirroring what
/// `classify` does for findings. A panic here is typically a Lance-internal
/// `unwrap`/index crash unwinding through the engine; under fault injection and
/// at walk depth the substrate WILL crash, so the harness records it as a
/// finding instead of letting it abort the suite.
///
/// Returns `Some(label)` for a recognised panic and `None` for a novel one
/// (the caller is expected to re-raise on `None`). Rules are tried in order, so
/// the scalar-BTREE markers win over the out-of-bounds marker when both appear.
///
/// Note on `index out of bounds`: in THIS harness the only source of that panic
/// is Lance's inverted (FTS) index builder — the harness's own code uses checked
/// loops and returns `Logical` findings, never an OOB panic — so the broad match
/// is safe here and would not mask a harness bug.
pub fn classify_panic(msg: &str) -> Option<&'static str> {
    // (substring markers, label): the first rule with any matching marker wins.
    const KNOWN_PANICS: &[(&[&str], &str)] = &[
        (
            &["from_sorted_iter", "non-sorted input"],
            "RC-X/#7230 scalar-BTREE",
        ),
        (&["index out of bounds"], "Lance FTS inverted-builder OOB"),
    ];

    KNOWN_PANICS
        .iter()
        .find(|(markers, _)| markers.iter().any(|marker| msg.contains(*marker)))
        .map(|(_, label)| *label)
}

// ── INVARIANT #1: Lance HEAD == manifest table version, per table ──
pub async fn head_eq_manifest(db: &Omnigraph) -> Result<(), Finding> {
let snap = db
Expand Down Expand Up @@ -92,6 +140,58 @@ pub async fn dataset_validate(db: &Omnigraph) -> Result<(), Finding> {
Ok(())
}

// ── INVARIANT #3a: no duplicate LIVE stable row-id within a table ──
// A stable row id names exactly one live row for the dataset's lifetime, so the
// same id on two live rows is the duplicate-row-address corruption class behind
// RC-X / Lance #7230. For every table in the `main` snapshot we scan with
// `with_row_id()` and require the returned `_rowid` values to be unique. The
// `_rowid` projection returns only LIVE rows, so an id tombstoned by an UPDATE
// or by compaction never counts. Unlike `index_probe` (which only surfaces a bad
// scalar-BTREE page when a filtered READ loads it), this checks every committed
// row directly.
//
// (History: an earlier version compared raw `row_id_meta` fragment ranges. Those
// ranges include tombstoned ids, so after an UPDATE+compaction an old fragment's
// range legitimately overlaps the new one and the check false-positived. Scanning
// live `_rowid`s is the deletion-vector-aware form.)
//
// Errors: engine failures (snapshot / open) come back as `Finding::Engine`; scan
// failures, a missing or non-u64 `_rowid` column, and an actual duplicate come
// back as `Finding::Logical`.
pub async fn no_duplicate_live_row_ids(db: &Omnigraph) -> Result<(), Finding> {
    let snapshot = db
        .snapshot_of(ReadTarget::branch("main"))
        .await
        .map_err(Finding::Engine)?;

    for entry in snapshot.entries() {
        let dataset = snapshot
            .open(&entry.table_key)
            .await
            .map_err(Finding::Engine)?;

        // Project `_rowid` alongside the data columns.
        let mut scan = dataset.scan();
        scan.with_row_id();
        let stream = scan
            .try_into_stream()
            .await
            .map_err(|e| Finding::Logical(format!("scan {}: {e}", entry.table_key)))?;
        let batches: Vec<RecordBatch> = stream
            .try_collect()
            .await
            .map_err(|e| Finding::Logical(format!("scan collect {}: {e}", entry.table_key)))?;

        // Ids are tracked per table: uniqueness is a within-table property.
        let mut live_ids: HashSet<u64> = HashSet::new();
        for batch in &batches {
            let Some(column) = batch.column_by_name("_rowid") else {
                return Err(Finding::Logical(format!(
                    "no _rowid column on {}",
                    entry.table_key
                )));
            };
            let Some(ids) = column.as_any().downcast_ref::<UInt64Array>() else {
                return Err(Finding::Logical(format!(
                    "_rowid not u64 on {}",
                    entry.table_key
                )));
            };

            // `insert` returns false for an id already seen; stop at the first one.
            let repeated = (0..ids.len())
                .map(|i| ids.value(i))
                .find(|id| !live_ids.insert(*id));
            if let Some(id) = repeated {
                return Err(Finding::Logical(format!(
                    "duplicate live stable row-id {id} in {} (RC-X-class duplicate row address)",
                    entry.table_key
                )));
            }
        }
    }

    Ok(())
}

// ── INVARIANT #3: scalar-index probe (catches RC-X at creation) ──
// Force-loads the Doc.source BTREE flat pages by filtering each enum value.
pub async fn index_probe(db: &Omnigraph) -> Result<(), Finding> {
Expand All @@ -104,13 +204,46 @@ pub async fn index_probe(db: &Omnigraph) -> Result<(), Finding> {
Ok(())
}

// ── @key uniqueness (no sequential model — for the concurrent oracle) ──
// Each `@key` value may sit on at most one live row. This queries `$x.slug` for
// every row of type `ty` on `branch` and fails on the first slug seen twice.
// Concurrent same-key upserts that leave duplicates are MR-714 (dup-`@key`); the
// failure message ends with the `dup-@key` marker that `classify` allow-lists as
// that known bug.
//
// Errors: a failed query is `Finding::Engine`; a non-array result, a row without
// a string `x.slug`, and a duplicate key are `Finding::Logical`.
pub async fn no_duplicate_keys(db: &Omnigraph, ty: &str, branch: &str) -> Result<(), Finding> {
    let query_text = format!("query q() {{ match {{ $x: {ty} }} return {{ $x.slug }} }}");
    let result = db
        .query(ReadTarget::branch(branch), &query_text, "q", &ParamMap::new())
        .await
        .map_err(Finding::Engine)?;

    let json = result.to_rust_json();
    let Some(rows) = json.as_array() else {
        return Err(Finding::Logical(format!("{ty} key scan not an array")));
    };
    let total = rows.len();

    let mut distinct: HashSet<String> = HashSet::new();
    for row in rows {
        let Some(slug) = row["x.slug"].as_str() else {
            return Err(Finding::Logical(format!("missing x.slug in {ty}")));
        };
        // A rejected insert means this slug was already on an earlier row.
        let first_sighting = distinct.insert(slug.to_string());
        if !first_sighting {
            return Err(Finding::Logical(format!(
                "duplicate @key {slug:?} in {ty} ({total} rows, {} distinct) — dup-@key",
                distinct.len()
            )));
        }
    }

    Ok(())
}

/// Runs every invariant against `db` and returns one `(name, result)` pair per
/// invariant, in a fixed order. All checks run even when an earlier one fails,
/// so the caller sees the complete picture. The model-backed checks
/// (`count`/`content`/`edges`) compare the database against `model`.
///
/// To add an invariant, append one more entry here.
pub async fn run_battery(db: &Omnigraph, model: &crate::model::Model) -> Vec<(&'static str, Result<(), Finding>)> {
    let mut outcomes = Vec::with_capacity(7);

    // Structural checks that need only the database.
    outcomes.push(("head==manifest", head_eq_manifest(db).await));
    outcomes.push(("dataset.validate", dataset_validate(db).await));
    outcomes.push(("row-id-unique", no_duplicate_live_row_ids(db).await));
    outcomes.push(("index-probe", index_probe(db).await));

    // Checks that compare the database against the model.
    outcomes.push(("count==model", crate::model::check_counts(db, model).await));
    outcomes.push(("content==model", crate::model::check_content(db, model).await));
    outcomes.push(("edges==model", crate::model::check_edges(db, model).await));

    outcomes
}
Loading
Loading