diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1e9249f9..965f7bc6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -94,6 +94,8 @@ jobs: crates/omnigraph-cluster/tests/s3_cluster.rs) run_rustfs_ci=true ;; crates/omnigraph-server/tests/s3.rs|crates/omnigraph-server/tests/support/*) run_rustfs_ci=true ;; crates/omnigraph-cli/tests/system_local.rs) run_rustfs_ci=true ;; + crates/omnigraph/tests/dst.rs|crates/omnigraph/tests/dst/*) run_rustfs_ci=true ;; + crates/omnigraph-dst/*) run_rustfs_ci=true ;; esac done @@ -403,6 +405,19 @@ jobs: echo "$output" | grep -Eq "test result: ok\. [1-9][0-9]* passed" \ || { echo "::error::filter 's3_' matched no tests — vacuous pass"; exit 1; } + - name: Run RustFS DST white-box battery + # The DST harness's S3-context battery (`s3_battery_holds`) is bucket- + # gated and lives in the `dst` target, which no other job runs — without + # this step the battery skips everywhere and MATRIX/testing.md overstate + # the S3 coverage. Same grep-guard as above: the name filter must match a + # real test (here the bucket IS set, so it runs rather than skips). + run: | + output=$(cargo test --locked -p omnigraph-engine --test dst s3_battery_holds -- --nocapture 2>&1); status=$? + echo "$output" + [ "$status" -eq 0 ] || exit "$status" + echo "$output" | grep -Eq "test result: ok\. 
[1-9][0-9]* passed" \ + || { echo "::error::filter 's3_battery_holds' matched no tests — vacuous pass"; exit 1; } + - name: Dump RustFS logs on failure if: failure() run: docker logs rustfs diff --git a/Cargo.lock b/Cargo.lock index 16a18274..fb89d7ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4867,11 +4867,13 @@ dependencies = [ "assert_cmd", "clap", "color-eyre", + "futures", "lance", "lance-index", "omnigraph-api-types", "omnigraph-cluster", "omnigraph-compiler", + "omnigraph-dst", "omnigraph-engine", "omnigraph-policy", "omnigraph-server", @@ -4922,6 +4924,20 @@ dependencies = [ "thiserror", ] +[[package]] +name = "omnigraph-dst" +version = "0.7.2" +dependencies = [ + "arrow-array", + "async-trait", + "futures", + "omnigraph-compiler", + "omnigraph-engine", + "serde_json", + "tempfile", + "tokio", +] + [[package]] name = "omnigraph-engine" version = "0.7.2" @@ -4949,8 +4965,10 @@ dependencies = [ "lance-table", "object_store", "omnigraph-compiler", + "omnigraph-dst", "omnigraph-policy", "proptest", + "proptest-state-machine", "regex", "reqwest 0.12.28", "serde", @@ -5689,6 +5707,15 @@ dependencies = [ "unarray", ] +[[package]] +name = "proptest-state-machine" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1675727965d66ff6f335e7d398e477184da08f4bed22f1a7f0dbf2f077f56f2e" +dependencies = [ + "proptest", +] + [[package]] name = "prost" version = "0.14.3" diff --git a/Cargo.toml b/Cargo.toml index c442242c..4f0b8b8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/omnigraph-cluster", "crates/omnigraph-policy", "crates/omnigraph-server", + "crates/omnigraph-dst", ] default-members = [ "crates/omnigraph", diff --git a/crates/omnigraph-cli/Cargo.toml b/crates/omnigraph-cli/Cargo.toml index df4ac8d3..c79286d0 100644 --- a/crates/omnigraph-cli/Cargo.toml +++ b/crates/omnigraph-cli/Cargo.toml @@ -34,3 +34,5 @@ serde_json = { workspace = true } tempfile = { workspace = true } lance = { 
workspace = true } lance-index = { workspace = true } +omnigraph-dst = { path = "../omnigraph-dst" } +futures = { workspace = true } diff --git a/crates/omnigraph-cli/tests/cli_cross_backend_walk.rs b/crates/omnigraph-cli/tests/cli_cross_backend_walk.rs new file mode 100644 index 00000000..ff5f5d2b --- /dev/null +++ b/crates/omnigraph-cli/tests/cli_cross_backend_walk.rs @@ -0,0 +1,117 @@ +//! Cross-backend generative walk (PR-E): the SAME seeded DST op sequence driven +//! against the embedded `Omnigraph` SDK AND the `omnigraph` CLI subprocess must +//! agree, per step, on the black-box state (Person/Doc slug sets + traversable +//! edge count). +//! +//! Honest scope: the CLI wraps the SAME engine, so this is NOT a second engine +//! implementation — it verifies the CLI transport layer (arg parsing, output +//! serialization, `--store` addressing, `--as` actor resolution) faithfully +//! reflects the embedded engine across a realistic generative sequence. The +//! white-box invariant battery stays embedded-only (it needs the real handle); +//! this is the black-box half the CLI arm can satisfy. Complements the +//! single-op `parity_matrix.rs` (CLI-local vs CLI-HTTP) and the fixed-sequence +//! `cli_dst_parity.rs` smoke with a generated, model-driven op stream. +//! +//! Lockstep determinism: both walks use the same seed and an identical reference +//! `Model`, and both wrap the same engine, so the op stream matches step-for- +//! step. Any divergence in op-kind, success, or resulting state fails the assert +//! immediately — before the two RNGs/models could desync. + +use std::collections::BTreeSet; +use std::path::PathBuf; + +use omnigraph_dst::op::OpKind; +use omnigraph_dst::{step, Backend, BackendError, Cli, Embedded, Model, Rng}; + +/// All slug values for a node type, normalized to an order-independent set. The +/// slug key is matched by suffix so the same parse works for the embedded +/// `to_rust_json` (`"x.slug"`) and the CLI `--json` shape. 
+async fn slugs(b: &B, ty: &str) -> BTreeSet { + let q = format!("query q() {{ match {{ $x: {ty} }} return {{ $x.slug }} }}"); + let rows = b.query("main", &q).await.expect("slug query"); + rows.iter() + .filter_map(|r| { + r.as_object()? + .iter() + .find(|(k, _)| k.ends_with("slug")) + .and_then(|(_, v)| v.as_str()) + .map(|s| s.to_string()) + }) + .collect() +} + +/// The traversable `Knows` edge count (`$a knows $b`) — the black-box edge oracle. +async fn edge_count(b: &B) -> usize { + let rows = b + .query( + "main", + "query q() { match { $a: Person $a knows $b } return { $a.slug, $b.slug } }", + ) + .await + .expect("edge query"); + rows.len() +} + +#[tokio::test] +async fn embedded_and_cli_agree_on_seeded_walk() { + let bin = PathBuf::from(env!("CARGO_BIN_EXE_omnigraph")); + + for seed in 0..2u64 { + let emb_dir = tempfile::tempdir().unwrap(); + let cli_dir = tempfile::tempdir().unwrap(); + let emb = Embedded::open_clean(emb_dir.path().to_str().unwrap()).await; + let cli = Cli::new(bin.clone(), cli_dir.path().to_str().unwrap().to_string()); + cli.init().await.expect("cli init"); + + let mut emb_rng = Rng::new(seed); + let mut cli_rng = Rng::new(seed); + let mut emb_model = Model::new(); + let mut cli_model = Model::new(); + + for i in 0..18 { + let (ek, eres) = step(&emb, &mut emb_rng, &mut emb_model).await; + let (ck, cres) = step(&cli, &mut cli_rng, &mut cli_model).await; + + assert_eq!(ek, ck, "seed={seed} step={i}: op-kind diverged"); + // Repair's success FLAG legitimately differs in ONE known case: + // embedded `repair(force=false)` returns Ok and leaves suspicious/ + // unverifiable drift in place (e.g. the drift RC-1 strands), whereas + // the CLI `repair --confirm` (no --force) EXITS NON-ZERO refusing to + // publish it. Allow-list exactly that (emb Ok + CLI "refused + // suspicious" error); a no-op on logical data either way, so the STATE + // assertions below still match. 
Any OTHER divergence — including a + // broken repair invocation, or any non-Repair op — must still fail. + let known_repair_divergence = ek == OpKind::Repair + && eres.is_ok() + && cres.as_ref().err().is_some_and(|e| { + let m = e.message(); + m.contains("repair refused") || m.contains("suspicious") + }); + if !known_repair_divergence { + assert_eq!( + eres.is_ok(), + cres.is_ok(), + "seed={seed} step={i} op[{ek:?}]: success diverged (emb={:?} cli={:?})", + eres.as_ref().err().map(BackendError::message), + cres.as_ref().err().map(BackendError::message), + ); + } + + assert_eq!( + slugs(&emb, "Person").await, + slugs(&cli, "Person").await, + "seed={seed} step={i} op[{ek:?}]: Person slug set diverged" + ); + assert_eq!( + slugs(&emb, "Doc").await, + slugs(&cli, "Doc").await, + "seed={seed} step={i} op[{ek:?}]: Doc slug set diverged" + ); + assert_eq!( + edge_count(&emb).await, + edge_count(&cli).await, + "seed={seed} step={i} op[{ek:?}]: traversable edge count diverged" + ); + } + } +} diff --git a/crates/omnigraph-cli/tests/cli_dst_parity.rs b/crates/omnigraph-cli/tests/cli_dst_parity.rs new file mode 100644 index 00000000..1e3cf250 --- /dev/null +++ b/crates/omnigraph-cli/tests/cli_dst_parity.rs @@ -0,0 +1,152 @@ +//! CLI cross-backend smoke (DST PR-D follow-up). +//! +//! A DST-flavored op SEQUENCE — init → two `load --merge` (multi-fragment) → +//! insert → edge-free delete → query — run against the EMBEDDED `Omnigraph` SDK +//! and the `omnigraph` CLI SUBPROCESS on twin local graphs must agree on the +//! final person-slug set. +//! +//! `parity_matrix.rs` already covers SINGLE-verb CLI-local-vs-HTTP-server +//! parity; this adds the missing pieces: the **embedded-SDK arm** and a +//! **multi-op sequence**. The CLI wraps the same engine, so this verifies the +//! CLI transport layer (arg parsing, `--json` serialization, `--store` +//! addressing, `--as` actor resolution) faithfully reflects the embedded engine +//! 
across a realistic sequence — it is NOT a second engine implementation. +//! +//! Oracle is format-tolerant (embedded `to_rust_json` and CLI `--json` differ in +//! wrapping): compare the SET of `smoke-*` slug strings recursively extracted +//! from each arm's output. + +mod support; + +use std::collections::BTreeSet; + +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::ir::ParamMap; + +const SCHEMA: &str = "node Person { slug: String @key name: String }\nedge Knows: Person -> Person\n"; +const ACTOR: &str = "smoke-actor"; +const FINAL_Q: &str = "query q() { match { $p: Person } return { $p.slug } }"; +const INSERT: &str = "query i() { insert Person { slug: \"smoke-new\", name: \"n\" } }"; +// smoke-8 has NO incident edges → a clean single-statement delete (no cascade, +// so it can't trip the RC-1 multi-statement-delete drift) — both arms succeed. +const DELETE: &str = "query d() { delete Person where slug = \"smoke-8\" }"; + +fn person(slug: &str) -> String { + format!("{{\"type\":\"Person\",\"data\":{{\"slug\":\"{slug}\",\"name\":\"n\"}}}}\n") +} +fn knows(a: &str, b: &str) -> String { + format!("{{\"edge\":\"Knows\",\"from\":\"{a}\",\"to\":\"{b}\",\"data\":{{}}}}\n") +} +fn batch1() -> String { + (0..5).map(|i| person(&format!("smoke-{i}"))).collect() +} +fn batch2() -> String { + let mut s: String = (5..10).map(|i| person(&format!("smoke-{i}"))).collect(); + s.push_str(&knows("smoke-0", "smoke-1")); + s.push_str(&knows("smoke-1", "smoke-2")); + s +} + +/// Recursively collect every string value beginning with `smoke-`. 
+fn smoke_slugs(v: &serde_json::Value) -> BTreeSet { + fn walk(v: &serde_json::Value, out: &mut BTreeSet) { + match v { + serde_json::Value::String(s) if s.starts_with("smoke-") => { + out.insert(s.clone()); + } + serde_json::Value::Array(a) => a.iter().for_each(|x| walk(x, out)), + serde_json::Value::Object(m) => m.values().for_each(|x| walk(x, out)), + _ => {} + } + } + let mut out = BTreeSet::new(); + walk(v, &mut out); + out +} + +/// Drive the sequence via the embedded engine; return the final slug set. +async fn embedded_arm(uri: &str) -> BTreeSet { + let db = Omnigraph::init(uri, SCHEMA).await.unwrap(); + load_jsonl(&db, &batch1(), LoadMode::Merge).await.unwrap(); + load_jsonl(&db, &batch2(), LoadMode::Merge).await.unwrap(); + db.mutate("main", INSERT, "i", &ParamMap::new()).await.unwrap(); + db.mutate("main", DELETE, "d", &ParamMap::new()).await.unwrap(); + let res = db + .query(ReadTarget::branch("main"), FINAL_Q, "q", &ParamMap::new()) + .await + .unwrap(); + smoke_slugs(&res.to_rust_json()) +} + +/// Drive the SAME sequence via the `omnigraph` CLI subprocess (sync — assert_cmd). 
+fn cli_arm(uri: &str, schema_path: &str, b1: &str, b2: &str) -> BTreeSet { + support::cli() + .args(["init", "--schema", schema_path, uri]) + .assert() + .success(); + support::cli() + .args(["load", "--mode", "merge", "--data", b1, uri]) + .assert() + .success(); + support::cli() + .args(["load", "--mode", "merge", "--data", b2, uri]) + .assert() + .success(); + support::cli() + .args(["mutate", "-e", INSERT, "--store", uri, "--as", ACTOR]) + .assert() + .success(); + support::cli() + .args(["mutate", "-e", DELETE, "--store", uri, "--as", ACTOR]) + .assert() + .success(); + let out = support::cli() + .args(["query", "-e", FINAL_Q, "--json", "--store", uri, "--as", ACTOR]) + .output() + .expect("cli query spawn"); + assert!(out.status.success(), "cli query failed: {out:?}"); + let v: serde_json::Value = + serde_json::from_slice(&out.stdout).expect("cli --json query output"); + smoke_slugs(&v) +} + +#[tokio::test(flavor = "multi_thread")] +async fn embedded_and_cli_agree_on_dst_sequence() { + let dir = tempfile::tempdir().unwrap(); + let schema_path = dir.path().join("smoke.pg"); + std::fs::write(&schema_path, SCHEMA).unwrap(); + let b1 = dir.path().join("b1.jsonl"); + std::fs::write(&b1, batch1()).unwrap(); + let b2 = dir.path().join("b2.jsonl"); + std::fs::write(&b2, batch2()).unwrap(); + let emb_uri = dir.path().join("emb.omni"); + let cli_uri = dir.path().join("cli.omni"); + + // Embedded arm (in-process async). + let emb = embedded_arm(emb_uri.to_str().unwrap()).await; + + // CLI arm (subprocess; assert_cmd is sync → run on a blocking thread). 
+ let (cli_uri_s, schema_s, b1_s, b2_s) = ( + cli_uri.to_str().unwrap().to_string(), + schema_path.to_str().unwrap().to_string(), + b1.to_str().unwrap().to_string(), + b2.to_str().unwrap().to_string(), + ); + let cli = tokio::task::spawn_blocking(move || cli_arm(&cli_uri_s, &schema_s, &b1_s, &b2_s)) + .await + .unwrap(); + + assert_eq!( + emb, cli, + "embedded and CLI diverged on the final person-slug set" + ); + + // Sanity: the sequence produced the expected set (0..9 + new − 8). + let expected: BTreeSet = (0..10) + .filter(|i| *i != 8) + .map(|i| format!("smoke-{i}")) + .chain(["smoke-new".to_string()]) + .collect(); + assert_eq!(emb, expected, "unexpected final state: {emb:?}"); +} diff --git a/crates/omnigraph-dst/Cargo.toml b/crates/omnigraph-dst/Cargo.toml new file mode 100644 index 00000000..eb769192 --- /dev/null +++ b/crates/omnigraph-dst/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "omnigraph-dst" +version = "0.7.2" +edition = "2024" +license = "MIT" +description = "Deterministic-simulation (DST) test harness for Omnigraph — shared op alphabet, reference model, and a Backend trait driven across the embedded SDK and the CLI." +publish = false + +[dependencies] +omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.2" } +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.2" } +tokio = { workspace = true } +async-trait = { workspace = true } +serde_json = { workspace = true } +arrow-array = { workspace = true } +futures = { workspace = true } +tempfile = { workspace = true } diff --git a/crates/omnigraph-dst/src/backend.rs b/crates/omnigraph-dst/src/backend.rs new file mode 100644 index 00000000..4ae340be --- /dev/null +++ b/crates/omnigraph-dst/src/backend.rs @@ -0,0 +1,262 @@ +//! The `Backend` abstraction that lets the seeded walk drive any execution +//! context (embedded SDK, CLI subprocess) through one op surface, plus the two +//! concrete backends. +//! +//! 
`Backend` is **black-box** by construction: `load`/`mutate`/`query`/ +//! `optimize`/`repair` plus a normalized `query` row shape — no engine handle. +//! That is the only surface a non-embedded context (the CLI) can satisfy. The +//! white-box invariant battery (`invariants.rs`) needs the real `Omnigraph` +//! handle, so it stays embedded-only via `Embedded::db()`; a cross-backend walk +//! runs the black-box oracles only. + +use std::io::Write; +use std::path::PathBuf; +use std::process::Command; + +use async_trait::async_trait; +use omnigraph::db::{Omnigraph, ReadTarget, RepairOptions}; +use omnigraph::error::OmniError; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::ir::ParamMap; +use serde_json::Value; + +use crate::op::SCHEMA; + +/// An error from any backend op. Embedded ops carry the raw `OmniError` +/// (variant preserved for structured classification); CLI ops carry the exit +/// code + stderr (which surfaces the same engine error text out-of-process). +#[derive(Debug)] +pub enum BackendError { + Engine(OmniError), + Cli { code: Option, stderr: String }, +} + +impl BackendError { + pub fn message(&self) -> String { + match self { + BackendError::Engine(e) => e.to_string(), + BackendError::Cli { code, stderr } => format!("cli exit {code:?}: {stderr}"), + } + } +} + +/// All `query`/`mutate` go through a single query named `q`, so the embedded +/// backend can name it and the CLI can infer it from a single-query `-e` source. +const QNAME: &str = "q"; + +/// The black-box op surface the walk drives. Every gq passed to `mutate`/`query` +/// must contain exactly one query named `q` (see `QNAME`). `query` returns +/// NORMALIZED rows: embedded via `to_rust_json`, CLI via parsed `--json`. +#[async_trait] +pub trait Backend: Send + Sync { + /// Bulk-load JSONL in merge (upsert) mode — the only mode the walk uses. 
+ async fn load(&self, jsonl: &str) -> Result<(), BackendError>; + /// Run a single-statement write (`query q() { insert|update|delete … }`). + async fn mutate(&self, branch: &str, gq: &str) -> Result<(), BackendError>; + /// Run a read (`query q() { match … return … }`) → normalized rows. + async fn query(&self, branch: &str, gq: &str) -> Result, BackendError>; + async fn optimize(&self) -> Result<(), BackendError>; + async fn repair(&self) -> Result<(), BackendError>; +} + +// ───────────────────────────── Embedded ───────────────────────────── + +/// The in-process engine backend. Wraps a real `Omnigraph` (optionally over a +/// `FaultAdapter`) and additionally exposes the handle via `db()` so the +/// white-box battery can run on top of it. +pub struct Embedded { + db: Omnigraph, +} + +impl Embedded { + /// A clean graph (no fault injection). + pub async fn open_clean(uri: &str) -> Self { + Embedded { + db: Omnigraph::init(uri, SCHEMA).await.expect("init"), + } + } + + /// A graph whose storage injects seeded manifest-layer faults (CAS-lost). + /// Created cleanly first, then reopened through the `FaultAdapter` so only + /// the op workload runs under faults. + pub async fn open_faulted(uri: &str, seed: u64, cas_pct: u8) -> Self { + Omnigraph::init(uri, SCHEMA).await.expect("init"); + let base = omnigraph::storage::storage_for_uri(uri).expect("storage_for_uri"); + let faulted = crate::fault::FaultAdapter::new(base, seed, cas_pct); + Embedded { + db: Omnigraph::open_with_storage(uri, faulted) + .await + .expect("open_with_storage"), + } + } + + /// Reopen an EXISTING graph from disk (runs the open-time recovery sweep). + /// Backs the reopen==pre_state durability oracle. + pub async fn reopen(uri: &str) -> Self { + Embedded { + db: Omnigraph::open(uri).await.expect("reopen"), + } + } + + /// White-box access to the underlying handle (embedded-only; the invariant + /// battery reaches `Snapshot::open`→`Dataset`, `_rowid` scans through it). 
+ pub fn db(&self) -> &Omnigraph { + &self.db + } +} + +#[async_trait] +impl Backend for Embedded { + async fn load(&self, jsonl: &str) -> Result<(), BackendError> { + load_jsonl(&self.db, jsonl, LoadMode::Merge) + .await + .map(|_| ()) + .map_err(BackendError::Engine) + } + async fn mutate(&self, branch: &str, gq: &str) -> Result<(), BackendError> { + self.db + .mutate(branch, gq, QNAME, &ParamMap::new()) + .await + .map(|_| ()) + .map_err(BackendError::Engine) + } + async fn query(&self, branch: &str, gq: &str) -> Result, BackendError> { + let res = self + .db + .query(ReadTarget::branch(branch), gq, QNAME, &ParamMap::new()) + .await + .map_err(BackendError::Engine)?; + let json = res.to_rust_json(); + Ok(json.as_array().cloned().unwrap_or_default()) + } + async fn optimize(&self) -> Result<(), BackendError> { + self.db.optimize().await.map(|_| ()).map_err(BackendError::Engine) + } + async fn repair(&self) -> Result<(), BackendError> { + // confirm (not force): heals VERIFIED maintenance drift, leaves + // suspicious/semantic drift (RC-1's) for head_eq_manifest to catch. + let opts = RepairOptions { + confirm: true, + force: false, + }; + self.db.repair(opts).await.map(|_| ()).map_err(BackendError::Engine) + } +} + +// ─────────────────────────────── Cli ──────────────────────────────── + +/// The CLI subprocess backend. Shells out to a built `omnigraph` binary against +/// a local `--store` graph. The binary path is injected by the consumer (e.g. +/// `env!("CARGO_BIN_EXE_omnigraph")`) — no `assert_cmd` in this library. +pub struct Cli { + bin: PathBuf, + store_uri: String, + actor: String, +} + +impl Cli { + pub fn new(bin: PathBuf, store_uri: impl Into) -> Self { + Cli { + bin, + store_uri: store_uri.into(), + actor: "act-dst".to_string(), + } + } + + fn io_err(e: std::io::Error) -> BackendError { + BackendError::Cli { + code: None, + stderr: e.to_string(), + } + } + + /// Run the binary, returning its `Output` (does NOT check the exit status). 
+ fn run(&self, args: &[&str]) -> Result { + Command::new(&self.bin).args(args).output().map_err(|e| BackendError::Cli { + code: None, + stderr: format!("spawn {}: {e}", self.bin.display()), + }) + } + + /// Run and require success, else map to a `Cli` error with stderr. + fn run_ok(&self, args: &[&str]) -> Result { + let out = self.run(args)?; + if !out.status.success() { + return Err(BackendError::Cli { + code: out.status.code(), + stderr: String::from_utf8_lossy(&out.stderr).into_owned(), + }); + } + Ok(out) + } + + fn temp(&self, suffix: &str, contents: &str) -> Result { + let mut f = tempfile::Builder::new() + .suffix(suffix) + .tempfile() + .map_err(Self::io_err)?; + f.write_all(contents.as_bytes()).map_err(Self::io_err)?; + f.flush().map_err(Self::io_err)?; + Ok(f) + } + + /// `init --schema ` — call once before the walk. + pub async fn init(&self) -> Result<(), BackendError> { + let f = self.temp(".pg", SCHEMA)?; + let path = f.path().to_string_lossy().into_owned(); + self.run_ok(&["init", "--schema", &path, &self.store_uri]).map(|_| ()) + } +} + +#[async_trait] +impl Backend for Cli { + async fn load(&self, jsonl: &str) -> Result<(), BackendError> { + let f = self.temp(".jsonl", jsonl)?; + let path = f.path().to_string_lossy().into_owned(); + self.run_ok(&[ + "load", "--data", &path, "--mode", "merge", "--store", &self.store_uri, "--as", + &self.actor, + ]) + .map(|_| ()) + } + async fn mutate(&self, branch: &str, gq: &str) -> Result<(), BackendError> { + // Honor `branch` (the CLI infers the single query from -e), so this + // backend matches the Embedded contract instead of silently pinning main. 
+ self.run_ok(&[ + "mutate", "-e", gq, "--branch", branch, "--store", &self.store_uri, "--as", + &self.actor, + ]) + .map(|_| ()) + } + async fn query(&self, branch: &str, gq: &str) -> Result, BackendError> { + let out = self.run_ok(&[ + "query", "-e", gq, "--json", "--branch", branch, "--store", &self.store_uri, "--as", + &self.actor, + ])?; + let stdout = String::from_utf8_lossy(&out.stdout); + let trimmed = stdout.trim(); + if trimmed.is_empty() { + return Ok(Vec::new()); + } + let v: Value = serde_json::from_str(trimmed).map_err(|e| BackendError::Cli { + code: out.status.code(), + stderr: format!("json parse: {e}: {stdout}"), + })?; + let rows = match &v { + Value::Array(a) => a.clone(), + Value::Object(o) => o + .get("rows") + .and_then(|r| r.as_array()) + .cloned() + .unwrap_or_else(|| vec![v.clone()]), + _ => Vec::new(), + }; + Ok(rows) + } + async fn optimize(&self) -> Result<(), BackendError> { + self.run_ok(&["optimize", "--store", &self.store_uri]).map(|_| ()) + } + async fn repair(&self) -> Result<(), BackendError> { + self.run_ok(&["repair", "--confirm", "--store", &self.store_uri]).map(|_| ()) + } +} diff --git a/crates/omnigraph-dst/src/fault.rs b/crates/omnigraph-dst/src/fault.rs new file mode 100644 index 00000000..5681952d --- /dev/null +++ b/crates/omnigraph-dst/src/fault.rs @@ -0,0 +1,100 @@ +//! A fault-injecting `StorageAdapter` wrapper (SlateDB-style). Wrap the base +//! adapter and inject seeded faults on the storage-adapter conditional-write +//! seam, then open the graph via `Omnigraph::open_with_storage`. +//! +//! The fault is a spurious CAS-lost on `write_text_if_match` — the +//! StorageAdapter conditional TEXT-OBJECT write (recovery sidecars, schema +//! staging, cluster state). SCOPE NOTE: this is **not** the Lance +//! manifest-publish CAS — graph data commits publish `__manifest` through Lance +//! `MergeInsertBuilder`/`ManifestBatchPublisher`, which bypasses this adapter, +//! 
so a pure-data op walk exercises few/no `write_text_if_match` calls and the +//! manifest-publish fault path is covered SEPARATELY by the failpoint recovery +//! cells (`dst_recovery`), not here. Where a conditional write IS lost, the +//! engine must surface/retry it (never silently drop it); `check_counts` +//! (count==model) catches a swallowed loss as a NOVEL violation. + +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use omnigraph::error::Result; +use omnigraph::storage::StorageAdapter; + +#[derive(Debug)] +pub struct FaultAdapter { + inner: Arc, + cas_conflict_pct: u8, + rng: Mutex, +} + +impl FaultAdapter { + /// Wrap `inner` (e.g. `storage_for_uri(uri)?`). `cas_conflict_pct` is the + /// percentage of conditional writes that spuriously report CAS-lost. + pub fn new( + inner: Arc, + seed: u64, + cas_conflict_pct: u8, + ) -> Arc { + Arc::new(Self { + inner, + cas_conflict_pct, + rng: Mutex::new(seed ^ 0x9E37_79B9_7F4A_7C15), + }) + } + + fn roll(&self, pct: u8) -> bool { + let mut g = self.rng.lock().unwrap(); + let mut x = *g; + x ^= x >> 12; + x ^= x << 25; + x ^= x >> 27; + *g = x; + (x.wrapping_mul(0x2545_F491_4F6C_DD1D) % 100) < pct as u64 + } +} + +#[async_trait] +impl StorageAdapter for FaultAdapter { + async fn read_text(&self, uri: &str) -> Result { + self.inner.read_text(uri).await + } + async fn write_text(&self, uri: &str, contents: &str) -> Result<()> { + self.inner.write_text(uri, contents).await + } + async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result { + self.inner.write_text_if_absent(uri, contents).await + } + async fn exists(&self, uri: &str) -> Result { + self.inner.exists(uri).await + } + async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> { + self.inner.rename_text(from_uri, to_uri).await + } + async fn delete(&self, uri: &str) -> Result<()> { + self.inner.delete(uri).await + } + async fn list_dir(&self, dir_uri: &str) -> Result> { + self.inner.list_dir(dir_uri).await + } + 
async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> { + self.inner.read_text_versioned(uri).await + } + async fn write_text_if_match( + &self, + uri: &str, + contents: &str, + expected_version: &str, + ) -> Result> { + // Inject a spurious CAS-lost (precondition failed) — a concurrent-writer + // illusion. The engine must surface/retry this; a swallowed loss is + // caught downstream by count==model. + if self.roll(self.cas_conflict_pct) { + return Ok(None); + } + self.inner + .write_text_if_match(uri, contents, expected_version) + .await + } + async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> { + self.inner.delete_prefix(prefix_uri).await + } +} diff --git a/crates/omnigraph-dst/src/invariants.rs b/crates/omnigraph-dst/src/invariants.rs new file mode 100644 index 00000000..8da44efc --- /dev/null +++ b/crates/omnigraph-dst/src/invariants.rs @@ -0,0 +1,281 @@ +//! D4: the white-box invariant battery + the structured known/novel classifier. +//! +//! A `Finding` is either an `Engine(OmniError)` (an op/query returned an engine +//! error) or a `Logical(String)` (a harness-computed structural mismatch: +//! HEAD!=manifest, count!=model, orphan edge). Classification is structured — +//! NOT free-text substring matching over arbitrary messages: +//! * Engine errors are allow-listed only for the two known bugs, each gated on +//! a narrow signal (RC-1 on the `OmniError::Manifest` variant; RC-X on +//! Lance's specific internal string). +//! * Logical findings are NEVER allow-listed except the dup-`@key` marker — a +//! structural divergence is otherwise always a real finding. +//! +//! The white-box checks take the real `Omnigraph` handle (via `Embedded::db()`), +//! so they are embedded-only; `run_battery` therefore takes `&Embedded`. A +//! cross-backend walk runs the black-box `model::check_*` oracles instead. 
+ +use std::collections::HashSet; + +use arrow_array::{RecordBatch, UInt64Array}; +use futures::TryStreamExt; +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::error::OmniError; +use omnigraph_compiler::ir::ParamMap; + +use crate::backend::{BackendError, Embedded}; +use crate::model::Model; + +#[derive(Debug)] +pub enum Finding { + /// An engine-returned error (variant preserved for structured classification). + Engine(OmniError), + /// A harness-computed structural violation — always novel. + Logical(String), +} + +impl Finding { + pub fn message(&self) -> String { + match self { + Finding::Engine(e) => e.to_string(), + Finding::Logical(s) => s.clone(), + } + } +} + +/// Shared engine-error classifier core: known-bug match on a (variant, message) +/// pair. `is_manifest` gates RC-1 to the `OmniError::Manifest` variant when the +/// variant is known (embedded); the CLI path passes the message only. +fn classify_engine_signal(is_manifest: bool, s: &str) -> Option<&'static str> { + // RC-1: edge-table HEAD/manifest drift from the node-delete cascade. Two + // surfaces of the same root: the write-time CAS "stale view", and a LATER + // write's precondition refusing the uncovered drift ("ahead of manifest …"). + if is_manifest + && (s.contains("stale view") + || (s.contains("expected") && s.contains("current")) + || s.contains("ahead of manifest version")) + { + return Some("RC-1 stale-view"); + } + // RC-X / Lance #7230: scalar-BTREE duplicate row addresses. The Lance + // internal panic string is highly specific (near-zero false positive). + if s.contains("from_sorted_iter") || s.contains("non-sorted input") { + return Some("RC-X/#7230 scalar-BTREE"); + } + None +} + +/// Returns `Some(known-bug-name)` if the finding matches a known open bug we +/// allow-list (so the walk explores past it), else `None` (= NOVEL → fail). 
+pub fn classify(f: &Finding) -> Option<&'static str> { + match f { + Finding::Engine(e) => classify_engine_signal(matches!(e, OmniError::Manifest(_)), &e.to_string()), + // Structural divergences are novel by default, with ONE narrow + // exception: dup-`@key` (MR-714) is a known open bug. + Finding::Logical(s) => { + if s.contains("dup-@key") { + Some("dup-@key MR-714") + } else { + None + } + } + } +} + +/// Classify a backend op error the same KNOWN-vs-NOVEL way as `classify`. The +/// embedded arm keeps the `OmniError::Manifest` variant gate; the CLI arm has +/// only stderr text, so it matches the (distinctive) RC-1/RC-X/dup-@key strings +/// directly — acceptable given how specific those messages are. +pub fn classify_backend(e: &BackendError) -> Option<&'static str> { + match e { + BackendError::Engine(oe) => { + classify_engine_signal(matches!(oe, OmniError::Manifest(_)), &oe.to_string()) + } + BackendError::Cli { stderr, .. } => { + // Out-of-process there's no OmniError variant to gate on, so gate RC-1 + // on its DISTINCTIVE strings only — NOT the generic "expected"+"current" + // that `classify_engine_signal` allows under is_manifest, which could + // appear in unrelated CLI errors and mask a novel finding. + if stderr.contains("stale view") || stderr.contains("ahead of manifest version") { + Some("RC-1 stale-view") + } else if stderr.contains("from_sorted_iter") || stderr.contains("non-sorted input") { + Some("RC-X/#7230 scalar-BTREE") + } else if stderr.contains("dup-@key") { + Some("dup-@key MR-714") + } else { + None + } + } + } +} + +/// Extract a readable message from a caught panic payload. 
+pub fn panic_message(p: &(dyn std::any::Any + Send)) -> String { + if let Some(s) = p.downcast_ref::<&str>() { + (*s).to_string() + } else if let Some(s) = p.downcast_ref::() { + s.clone() + } else { + "".to_string() + } +} + +/// Classify a caught panic (a Lance-internal `unwrap`/index crash that unwinds +/// through the engine) the same KNOWN-vs-NOVEL way as `classify`. Under fault +/// injection and at walk depth the substrate WILL crash, so the harness must +/// treat a panic as a finding, not let it abort the suite. NONE → re-raise. +/// +/// Note on `index out of bounds`: in THIS harness the only source of that panic +/// is Lance's inverted (FTS) index builder — the harness's own code uses checked +/// loops and returns `Logical` findings, never an OOB panic — so the broad match +/// is safe here and would not mask a harness bug. +pub fn classify_panic(msg: &str) -> Option<&'static str> { + if msg.contains("from_sorted_iter") || msg.contains("non-sorted input") { + return Some("RC-X/#7230 scalar-BTREE"); + } + if msg.contains("index out of bounds") { + return Some("Lance FTS inverted-builder OOB"); + } + None +} + +// ── INVARIANT #1: Lance HEAD == manifest table version, per table ── +pub async fn head_eq_manifest(db: &Omnigraph) -> Result<(), Finding> { + let snap = db + .snapshot_of(ReadTarget::branch("main")) + .await + .map_err(Finding::Engine)?; + for entry in snap.entries() { + let ds = snap.open(&entry.table_key).await.map_err(Finding::Engine)?; + let lance_head = ds.version().version; + if lance_head != entry.table_version { + return Err(Finding::Logical(format!( + "HEAD!=manifest on {}: lance_head={lance_head} manifest_pin={}", + entry.table_key, entry.table_version + ))); + } + } + Ok(()) +} + +// ── INVARIANT #2: Lance Dataset::validate() per table (general corruption) ── +pub async fn dataset_validate(db: &Omnigraph) -> Result<(), Finding> { + let snap = db + .snapshot_of(ReadTarget::branch("main")) + .await + .map_err(Finding::Engine)?; + for 
entry in snap.entries() { + let ds = snap.open(&entry.table_key).await.map_err(Finding::Engine)?; + // Dataset::validate returns a Lance error; a validation failure IS a + // structural corruption finding (always novel). + ds.validate() + .await + .map_err(|e| Finding::Logical(format!("Dataset::validate {}: {e}", entry.table_key)))?; + } + Ok(()) +} + +// ── INVARIANT #3a: no duplicate LIVE stable row-id within a table ── +// A stable row id uniquely identifies one live row for the dataset's lifetime; +// the same id appearing on two live rows is exactly the duplicate-row-address +// corruption class behind RC-X / Lance #7230. We read the truth deletion-vector- +// correctly by scanning each table with `with_row_id()` (Lance's `_rowid` +// projection returns only LIVE rows, so a tombstoned id masked by an UPDATE or +// compaction never counts) and asserting the live `_rowid`s are unique. Unlike +// `index_probe` (which only surfaces a bad scalar-BTREE page when a filtered +// READ loads it), this is a direct structural check on every committed row. +pub async fn no_duplicate_live_row_ids(db: &Omnigraph) -> Result<(), Finding> { + let snap = db + .snapshot_of(ReadTarget::branch("main")) + .await + .map_err(Finding::Engine)?; + for entry in snap.entries() { + let ds = snap.open(&entry.table_key).await.map_err(Finding::Engine)?; + let mut scanner = ds.scan(); + scanner.with_row_id(); + let batches: Vec = scanner + .try_into_stream() + .await + .map_err(|e| Finding::Logical(format!("scan {}: {e}", entry.table_key)))? + .try_collect() + .await + .map_err(|e| Finding::Logical(format!("scan collect {}: {e}", entry.table_key)))?; + let mut seen: HashSet = HashSet::new(); + for batch in &batches { + let col = batch + .column_by_name("_rowid") + .ok_or_else(|| Finding::Logical(format!("no _rowid column on {}", entry.table_key)))? 
+ .as_any() + .downcast_ref::() + .ok_or_else(|| Finding::Logical(format!("_rowid not u64 on {}", entry.table_key)))?; + for i in 0..col.len() { + let id = col.value(i); + if !seen.insert(id) { + return Err(Finding::Logical(format!( + "duplicate live stable row-id {id} in {} (RC-X-class duplicate row address)", + entry.table_key + ))); + } + } + } + } + Ok(()) +} + +// ── INVARIANT #3: scalar-index probe (catches RC-X at creation) ── +// Force-loads the Doc.source BTREE flat pages by filtering each enum value. +pub async fn index_probe(db: &Omnigraph) -> Result<(), Finding> { + for src in ["whatsapp", "email", "linkedin", "slack", "telegram"] { + let q = format!("query w() {{ match {{ $d: Doc {{ source: \"{src}\" }} }} return {{ $d.slug }} }}"); + db.query(ReadTarget::branch("main"), &q, "w", &ParamMap::new()) + .await + .map_err(Finding::Engine)?; + } + Ok(()) +} + +// ── @key uniqueness (no sequential model — for the concurrent oracle) ── +// Every `@key` value may appear on at most one live row. Concurrent same-key +// upserts that produce duplicates are MR-714 (dup-`@key`); `classify` allow- +// lists the `dup-@key` marker as that known bug. 
+pub async fn no_duplicate_keys(db: &Omnigraph, ty: &str, branch: &str) -> Result<(), Finding> { + let q = format!("query q() {{ match {{ $x: {ty} }} return {{ $x.slug }} }}"); + let res = db + .query(ReadTarget::branch(branch), &q, "q", &ParamMap::new()) + .await + .map_err(Finding::Engine)?; + let json = res.to_rust_json(); + let rows = json + .as_array() + .ok_or_else(|| Finding::Logical(format!("{ty} key scan not an array")))?; + let total = rows.len(); + let mut seen: HashSet = HashSet::new(); + for row in rows { + let slug = row["x.slug"] + .as_str() + .ok_or_else(|| Finding::Logical(format!("missing x.slug in {ty}")))?; + if !seen.insert(slug.to_string()) { + return Err(Finding::Logical(format!( + "duplicate @key {slug:?} in {ty} ({total} rows, {} distinct) — dup-@key", + seen.len() + ))); + } + } + Ok(()) +} + +/// The full battery as a registry: `(name, result)` per invariant. Adding an +/// invariant is one line here; the walk iterates and the coverage map records. +/// White-box checks use `emb.db()`; the count/content/edges oracles run through +/// the same embedded backend (so they share the handle's view). +pub async fn run_battery(emb: &Embedded, model: &Model) -> Vec<(&'static str, Result<(), Finding>)> { + let db = emb.db(); + vec![ + ("head==manifest", head_eq_manifest(db).await), + ("dataset.validate", dataset_validate(db).await), + ("row-id-unique", no_duplicate_live_row_ids(db).await), + ("index-probe", index_probe(db).await), + ("count==model", crate::model::check_counts(emb, model).await), + ("content==model", crate::model::check_content(emb, model).await), + ("edges==model", crate::model::check_edges(emb, model).await), + ] +} diff --git a/crates/omnigraph-dst/src/lib.rs b/crates/omnigraph-dst/src/lib.rs new file mode 100644 index 00000000..96fc00ed --- /dev/null +++ b/crates/omnigraph-dst/src/lib.rs @@ -0,0 +1,29 @@ +//! Deterministic-simulation (DST) test harness for Omnigraph. +//! +//! 
Shared, dev-only library holding the op alphabet, the reference model, the +//! white-box invariant battery, and a `Backend` trait so the SAME generative +//! walk can run against the embedded `Omnigraph` SDK and the `omnigraph` CLI +//! subprocess. Consumed as a `[dev-dependencies]` by both `omnigraph-engine` +//! (white-box walks + regressions) and `omnigraph-cli` (cross-backend walk). +//! +//! Not a default-member; never built in the normal `cargo build`. +//! +//! Consumers: +//! * `omnigraph-engine` `tests/dst.rs` drives `Embedded` + the white-box +//! battery (the harness's core value — embedded-only by construction). +//! * `omnigraph-cli` `tests/cli_cross_backend_walk.rs` drives the SAME seeded +//! walk against `Embedded` AND `Cli`, asserting per-step black-box agreement. + +pub mod backend; +pub mod fault; +pub mod invariants; +pub mod model; +pub mod op; + +pub use backend::{Backend, BackendError, Cli, Embedded}; +pub use fault::FaultAdapter; +pub use invariants::{ + classify, classify_backend, classify_panic, panic_message, run_battery, Finding, +}; +pub use model::Model; +pub use op::{step, OpKind, Rng}; diff --git a/crates/omnigraph-dst/src/model.rs b/crates/omnigraph-dst/src/model.rs new file mode 100644 index 00000000..16dacd9f --- /dev/null +++ b/crates/omnigraph-dst/src/model.rs @@ -0,0 +1,265 @@ +//! Reference model (D4 oracle source of truth) + the count/content/edges==model +//! oracles. +//! +//! Tracks the SET of live Person/Doc keys (and Knows edges) the harness believes +//! should exist. Updated only AFTER an op succeeds, so a failed op (RC-1 +//! stale-view, or a FaultAdapter-injected CAS loss) leaves the model consistent +//! with reality. A divergence means a lost write (count<model) or a duplicate row (count>model) — both real bugs. +//! +//! The count/content/edge-traversal oracles are **black-box** (`Backend::query`) +//! so they run against any backend. `check_edges` additionally reads the raw +//! 
`edge:Knows` row count white-box (embedded-only) to catch orphan edges a +//! traversal can't see. + +use std::collections::{HashMap, HashSet}; + +use omnigraph::db::ReadTarget; + +use crate::backend::{Backend, BackendError, Embedded}; +use crate::invariants::Finding; + +#[derive(Default)] +pub struct Model { + persons: HashSet, + docs: HashSet, + /// Expected `body` value per live Doc id — the source of truth for the + /// content==model oracle (a lost UPDATE shows up here even when counts match). + doc_body: HashMap, + /// Live `Knows` edges as `from -> to` (the map enforces the schema's + /// `@card(0..1)`: a `from` has at most one outgoing edge). Every endpoint is + /// a live Person by construction — `del_person` cascades both directions, so + /// the model never holds an orphan and the engine producing one is a finding. + knows: HashMap, + next: usize, +} + +impl Model { + pub fn new() -> Self { + Self::default() + } + /// A never-before-used id (so generated inserts never collide). + pub fn fresh_id(&mut self) -> usize { + let i = self.next; + self.next += 1; + i + } + /// Upper bound for picking an existing-ish id for delete/update targets. + pub fn id_high(&self) -> usize { + self.next.max(1) + } + pub fn add_person(&mut self, id: usize) { + self.persons.insert(id); + } + pub fn add_doc(&mut self, id: usize, body: String) { + self.docs.insert(id); + self.doc_body.insert(id, body); + } + /// Record an UPDATE's new body — only for a Doc the model believes exists, + /// so a no-op update (0 rows matched) doesn't desync the model. + pub fn update_doc_body(&mut self, id: usize, body: String) { + if self.docs.contains(&id) { + self.doc_body.insert(id, body); + } + } + pub fn del_person(&mut self, id: usize) { + self.persons.remove(&id); + // Node delete cascades to incident edges, BOTH directions — mirror it so + // the model never references a deleted Person (this is the RC-1 surface). 
+ self.knows.remove(&id); + self.knows.retain(|_, &mut to| to != id); + } + pub fn persons(&self) -> usize { + self.persons.len() + } + pub fn docs(&self) -> usize { + self.docs.len() + } + pub fn doc_bodies(&self) -> &HashMap { + &self.doc_body + } + + // ── edges ── + /// Live person ids (for picking an edge endpoint). + pub fn persons_vec(&self) -> Vec { + self.persons.iter().copied().collect() + } + /// Persons with no outgoing `Knows` — the only legal `from` for a new edge + /// under `@card(0..1)`, so every generated InsertKnows is a valid op. + pub fn free_froms(&self) -> Vec { + self.persons + .iter() + .copied() + .filter(|p| !self.knows.contains_key(p)) + .collect() + } + /// Persons that currently have an outgoing edge (legal DeleteKnows targets). + pub fn knows_froms(&self) -> Vec { + self.knows.keys().copied().collect() + } + pub fn add_edge(&mut self, from: usize, to: usize) { + self.knows.insert(from, to); + } + pub fn del_edge(&mut self, from: usize) { + self.knows.remove(&from); + } + pub fn edges(&self) -> usize { + self.knows.len() + } +} + +/// Map a backend op error encountered mid-oracle into a `Finding` (preserving +/// the engine variant for the embedded case; CLI errors become a loud Logical). +fn finding_from_backend(e: BackendError) -> Finding { + match e { + BackendError::Engine(oe) => Finding::Engine(oe), + BackendError::Cli { code, stderr } => { + Finding::Logical(format!("cli oracle query failed (exit {code:?}): {stderr}")) + } + } +} + +/// Extract a row field by key SUFFIX, so the same parse works across backends +/// (embedded `to_rust_json` yields `"d.slug"`; CLI `--json` may yield `"slug"`). +fn field<'a>(row: &'a serde_json::Value, suffix: &str) -> Option<&'a str> { + row.as_object()? 
+ .iter() + .find(|(k, _)| k.ends_with(suffix)) + .and_then(|(_, v)| v.as_str()) +} + +async fn count(b: &B, ty: &str) -> Result { + let q = format!("query q() {{ match {{ $x: {ty} }} return {{ $x.slug }} }}"); + b.query("main", &q) + .await + .map(|rows| rows.len()) + .map_err(finding_from_backend) +} + +/// count==model: live row counts must equal the model. A divergence is a +/// structural (Logical) finding — lost-write (count<model) or dup-`@key` (count>model — MR-714). +pub async fn check_counts(b: &B, model: &Model) -> Result<(), Finding> { + let p = count(b, "Person").await?; + if p != model.persons() { + // NB: deliberately NOT carrying the `dup-@key` marker. `classify()` + // allow-lists any Logical finding containing it as the known MR-714 bug, + // which would mask a genuine LOST WRITE (count < model) — the exact case + // this oracle exists to catch. Real duplicate-`@key` detection lives in + // `check_content` (value-level) and `invariants::no_duplicate_keys`, + // both of which emit the marker only when a duplicate is actually found. + return Err(Finding::Logical(format!( + "count Person={p} != model={} (lost-write or unexpected duplicate row)", + model.persons() + ))); + } + let d = count(b, "Doc").await?; + if d != model.docs() { + return Err(Finding::Logical(format!( + "count Doc={d} != model={}", + model.docs() + ))); + } + Ok(()) +} + +/// content==model: every live Doc's `body` must equal the model's expected +/// value, and no `@key` may appear twice. A count check passes through a +/// lost UPDATE (the row is still there, just stale) or a silent dup-`@key` +/// (a value-level duplicate the row count would only catch if it changed the +/// total) — this is the oracle that catches both. Slugs are `g{id}`. 
+pub async fn check_content(b: &B, model: &Model) -> Result<(), Finding> { + let rows = b + .query( + "main", + "query q() { match { $d: Doc } return { $d.slug, $d.body } }", + ) + .await + .map_err(finding_from_backend)?; + + let mut seen: HashMap = HashMap::new(); + for row in &rows { + let slug = field(row, "slug") + .ok_or_else(|| Finding::Logical(format!("missing slug in {row}")))?; + let body = field(row, "body") + .ok_or_else(|| Finding::Logical(format!("missing body in {row}")))?; + let id: usize = slug + .strip_prefix('g') + .and_then(|s| s.parse().ok()) + .ok_or_else(|| Finding::Logical(format!("unexpected Doc slug {slug}")))?; + if let Some(prev) = seen.insert(id, body.to_string()) { + return Err(Finding::Logical(format!( + "duplicate Doc @key g{id} (dup-@key): {prev:?} and {body:?}" + ))); + } + } + + for (id, expected) in model.doc_bodies() { + match seen.get(id) { + None => { + return Err(Finding::Logical(format!( + "Doc g{id} missing from engine (model body {expected:?})" + ))); + } + Some(actual) if actual != expected => { + return Err(Finding::Logical(format!( + "Doc g{id} body mismatch: engine {actual:?} != model {expected:?} (lost update)" + ))); + } + _ => {} + } + } + Ok(()) +} + +/// The traversable-edge half of edges==model (black-box): the `$a knows $b` +/// traversal count (which only matches live endpoints) must equal the model's +/// live-edge count. Catches a lost edge write or an orphaned endpoint. 
+pub async fn check_edges_traversable(b: &B, model: &Model) -> Result<(), Finding> { + let rows = b + .query( + "main", + "query q() { match { $a: Person $a knows $b } return { $a.slug, $b.slug } }", + ) + .await + .map_err(finding_from_backend)?; + if rows.len() != model.edges() { + return Err(Finding::Logical(format!( + "traversable Knows edges={} != model={} (orphan endpoint / lost edge)", + rows.len(), + model.edges() + ))); + } + Ok(()) +} + +/// edges==model (referential integrity): two complementary counts must both +/// equal the model's live-edge count. The RAW `edge:Knows` row count (read +/// white-box via the snapshot, so it sees orphans too) catches a lost +/// node-delete cascade that strands an edge pointing at a deleted Person; the +/// TRAVERSAL count (`check_edges_traversable`) catches a lost edge write. Raw > +/// traversal means an orphan exists. White-box → embedded-only. +pub async fn check_edges(emb: &Embedded, model: &Model) -> Result<(), Finding> { + let db = emb.db(); + let snap = db + .snapshot_of(ReadTarget::branch("main")) + .await + .map_err(Finding::Engine)?; + let mut raw: usize = 0; + for entry in snap.entries() { + if entry.table_key == "edge:Knows" { + let ds = snap.open(&entry.table_key).await.map_err(Finding::Engine)?; + raw = ds + .count_rows(None) + .await + .map_err(|e| Finding::Logical(format!("edge:Knows count_rows: {e}")))?; + } + } + if raw != model.edges() { + return Err(Finding::Logical(format!( + "raw edge:Knows rows={raw} != model={} (orphan edge / lost cascade / dup edge)", + model.edges() + ))); + } + check_edges_traversable(emb, model).await +} diff --git a/crates/omnigraph-dst/src/op.rs b/crates/omnigraph-dst/src/op.rs new file mode 100644 index 00000000..1a4dfada --- /dev/null +++ b/crates/omnigraph-dst/src/op.rs @@ -0,0 +1,233 @@ +//! D1: the operation alphabet, as data. `step` picks an op, executes it against +//! a `Backend` (embedded or CLI), and updates the reference `Model` on success. +//! 
Returns the `OpKind` (for coverage) and the raw `BackendError` (for +//! structured classification — never stringified here). +//! +//! Every `mutate`/`query` gq names its single query `q` so the same string +//! drives the embedded backend (named query) and the CLI (single-query `-e`). + +use crate::backend::{Backend, BackendError}; +use crate::model::Model; + +/// One schema exercising both bug surfaces: Person+Knows(@card) for the +/// delete-cascade / stale-view path, and Doc.source (low-cardinality enum +/// @index → scalar BTREE) for the index-corruption path. +pub const SCHEMA: &str = r#" +node Person { + slug: String @key + name: String +} +node Doc { + slug: String @key + source: enum(whatsapp, email, linkedin, slack, telegram) @index + body: String +} +edge Knows: Person -> Person @card(0..1) +"#; + +/// Inline deterministic RNG (xorshift64*, no `rand` dep). +pub struct Rng(u64); +impl Rng { + pub fn new(seed: u64) -> Self { + Rng(seed ^ 0x9E37_79B9_7F4A_7C15) + } + fn next(&mut self) -> u64 { + let mut x = self.0; + x ^= x >> 12; + x ^= x << 25; + x ^= x >> 27; + self.0 = x; + x.wrapping_mul(0x2545_F491_4F6C_DD1D) + } + pub fn below(&mut self, n: usize) -> usize { + (self.next() % n as u64) as usize + } +} + +pub fn person(slug: &str) -> String { + format!("{{\"type\":\"Person\",\"data\":{{\"slug\":\"{slug}\",\"name\":\"n\"}}}}\n") +} +pub fn doc(slug: &str, source: &str) -> String { + format!("{{\"type\":\"Doc\",\"data\":{{\"slug\":\"{slug}\",\"source\":\"{source}\",\"body\":\"needle filler\"}}}}\n") +} +pub fn knows(from: &str, to: &str) -> String { + format!("{{\"edge\":\"Knows\",\"from\":\"{from}\",\"to\":\"{to}\",\"data\":{{}}}}\n") +} + +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub enum OpKind { + InsertPerson, + InsertDoc, + Optimize, + DeletePerson, + UpdateDoc, + InsertKnows, + DeleteKnows, + Repair, + /// Drop + reopen the handle mid-walk (driven by the walk, not `step`): makes + /// the recovery sweep / coordinator re-resolution a 
first-class op, sampled + /// across varied table states instead of only at the end. + Reopen, + Read, +} + +impl OpKind { + pub const ALL: [OpKind; 10] = [ + OpKind::InsertPerson, + OpKind::InsertDoc, + OpKind::Optimize, + OpKind::DeletePerson, + OpKind::UpdateDoc, + OpKind::InsertKnows, + OpKind::DeleteKnows, + OpKind::Repair, + OpKind::Reopen, + OpKind::Read, + ]; +} + +/// Pick and run one op against `b`. The model is updated only on success. +pub async fn step( + b: &B, + rng: &mut Rng, + model: &mut Model, +) -> (OpKind, Result<(), BackendError>) { + match rng.below(9) { + 0 => { + let mut ids = Vec::new(); + let mut data = String::new(); + for _ in 0..(1 + rng.below(10)) { + let id = model.fresh_id(); + ids.push(id); + data.push_str(&person(&format!("g{id}"))); + } + let res = match b.load(&data).await { + Ok(_) => { + for id in ids { + model.add_person(id); + } + Ok(()) + } + Err(e) => Err(e), + }; + (OpKind::InsertPerson, res) + } + 1 => { + let mut ids = Vec::new(); + let mut data = String::new(); + for _ in 0..(1 + rng.below(10)) { + let id = model.fresh_id(); + ids.push(id); + let s = if rng.below(100) < 85 { "whatsapp" } else { "email" }; + data.push_str(&doc(&format!("g{id}"), s)); + } + let res = match b.load(&data).await { + Ok(_) => { + // `doc()` writes body "needle filler" — track it for content==model. + for id in ids { + model.add_doc(id, "needle filler".to_string()); + } + Ok(()) + } + Err(e) => Err(e), + }; + (OpKind::InsertDoc, res) + } + 2 => (OpKind::Optimize, b.optimize().await), + 3 => { + let id = rng.below(model.id_high()); + let q = format!("query q() {{ delete Person where slug = \"g{id}\" }}"); + let res = match b.mutate("main", &q).await { + Ok(_) => { + model.del_person(id); + Ok(()) + } + Err(e) => Err(e), + }; + (OpKind::DeletePerson, res) + } + 4 => { + // UPDATE moves the whole row → scalar-index remap (RC-X morphology). 
+ let id = rng.below(model.id_high()); + let body = format!("u{id} needle"); + let q = format!("query q() {{ update Doc set {{ body: \"{body}\" }} where slug = \"g{id}\" }}"); + let res = match b.mutate("main", &q).await { + Ok(_) => { + // Only mutates the model for a Doc it believes exists, so a + // no-op update (0 rows matched) can't desync content==model. + model.update_doc_body(id, body); + Ok(()) + } + Err(e) => Err(e), + }; + (OpKind::UpdateDoc, res) + } + 5 => { + // InsertKnows — pick a `from` with no outgoing edge (so the @card(0..1) + // insert is always legal) and any live `to`; both endpoints exist, so + // the engine producing an orphan/dup is a finding, not a generated one. + let froms = model.free_froms(); + let persons = model.persons_vec(); + if froms.is_empty() || persons.is_empty() { + // No legal edge to add yet — a no-op (still counts for coverage). + (OpKind::InsertKnows, Ok(())) + } else { + let from = froms[rng.below(froms.len())]; + // Exclude self-loops: a Knows self-loop is committed to the edge + // table but is NOT returned by `$a knows $b` traversal (durable + // across optimize+reopen; the CSR build keeps it, so the drop is + // in Expand). That stored-but-not-traversable divergence is a real + // finding the harness surfaced — kept out of the generic generator + // so the edges==model oracle stays unambiguous; see B3 notes. + let others: Vec = persons.iter().copied().filter(|p| *p != from).collect(); + if others.is_empty() { + return (OpKind::InsertKnows, Ok(())); + } + let to = others[rng.below(others.len())]; + let data = knows(&format!("g{from}"), &format!("g{to}")); + let res = match b.load(&data).await { + Ok(_) => { + model.add_edge(from, to); + Ok(()) + } + Err(e) => Err(e), + }; + (OpKind::InsertKnows, res) + } + } + 6 => { + // DeleteKnows — remove an existing edge by its `from`. 
+ let froms = model.knows_froms(); + if froms.is_empty() { + (OpKind::DeleteKnows, Ok(())) + } else { + let from = froms[rng.below(froms.len())]; + let q = format!("query q() {{ delete Knows where from = \"g{from}\" }}"); + let res = match b.mutate("main", &q).await { + Ok(_) => { + model.del_edge(from); + Ok(()) + } + Err(e) => Err(e), + }; + (OpKind::DeleteKnows, res) + } + } + 7 => { + // Repair in confirm (not force) mode — heals VERIFIED maintenance + // drift but leaves suspicious/semantic drift (e.g. RC-1's) for + // head_eq_manifest to still catch. A no-op on a clean graph; must + // never change logical data, so the model is untouched. + (OpKind::Repair, b.repair().await) + } + _ => ( + OpKind::Read, + b.query( + "main", + "query q() { match { $d: Doc { source: \"whatsapp\" } } return { $d.slug } }", + ) + .await + .map(|_| ()), + ), + } +} diff --git a/crates/omnigraph/Cargo.toml b/crates/omnigraph/Cargo.toml index 5038fd1c..091245b0 100644 --- a/crates/omnigraph/Cargo.toml +++ b/crates/omnigraph/Cargo.toml @@ -53,8 +53,14 @@ arc-swap = { workspace = true } [dev-dependencies] omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.2" } +omnigraph-dst = { path = "../omnigraph-dst" } tokio = { workspace = true } lance-namespace-impls = { workspace = true } lance-io = "7.0.0" +arrow-array = { workspace = true } +futures = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } serial_test = "3" proptest = "1" +proptest-state-machine = "0.8" diff --git a/crates/omnigraph/tests/dst.rs b/crates/omnigraph/tests/dst.rs new file mode 100644 index 00000000..44af4a51 --- /dev/null +++ b/crates/omnigraph/tests/dst.rs @@ -0,0 +1,673 @@ +//! Deterministic-simulation / morphological-matrix test harness (iss-784 / epc-783). +//! +//! A seeded generative walk drives the real engine through the op alphabet +//! (`op`), runs the white-box invariant battery (`invariants`) against a +//! 
reference `model` after every op, and classifies each finding as a KNOWN +//! open bug (allow-listed so the walk explores past it) or NOVEL (fails). The +//! `FaultAdapter` (`fault`) injects storage-adapter CAS-lost faults; `coverage` records +//! which matrix cells were touched. +//! +//! Layout: `op` (D1 alphabet) · `model` (D4 reference + count==model) · +//! `invariants` (D4 battery + structured classifier) · `fault` (StorageAdapter +//! fault injection) · `backend` (D5 embedded context) · `coverage`. +//! +//! Run: `cargo test -p omnigraph-engine --test dst -- --nocapture` +//! +//! The three named regressions ASSERT current buggy behavior on the edge build +//! (Lance 7.0.0) — characterization guards that break (forcing review) when each +//! bug is fixed: RC-X → Lance #7230 (Lance v8.0.0); RC-1 → stale-view manifest +//! CAS on delete op-class combos; dup-@key → MR-714. + +use std::panic::AssertUnwindSafe; +use std::sync::Arc; + +use futures::FutureExt; +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::ir::ParamMap; + +// The reusable harness primitives (op alphabet, reference model, white-box +// battery, fault adapter, the `Backend` trait + `Embedded`/`Cli`) live in the +// shared `omnigraph-dst` crate so the SAME walk can also run cross-backend (see +// `omnigraph-cli/tests/cli_cross_backend_walk.rs`). This binary is the embedded +// consumer: it adds the named regressions + the white-box generative walk. +use omnigraph_dst::backend::Embedded; +use omnigraph_dst::invariants::{self, Finding, classify, classify_backend, run_battery}; +use omnigraph_dst::model::Model; +use omnigraph_dst::op::{self, Rng, doc, knows, person}; + +// Engine-local modules that stay in-tree (they drive the `Omnigraph` handle / +// compiler / loader directly, with no Backend abstraction): the coverage ledger, +// the D2×D3 read-shape battery, the proptest-state-machine shrinking campaign, +// and the parser/loader fuzz. 
+//
+// NOTE: the failpoint-gated recovery cells live in their OWN binary
+// (`tests/dst_recovery.rs`), not here — the process-global `fail` registry would
+// otherwise leak armed failpoints into these (non-serial, parallel) walks.
+#[path = "dst/coverage.rs"]
+mod coverage;
+#[path = "dst/readshape.rs"]
+mod readshape;
+#[path = "dst/statemachine.rs"]
+mod statemachine;
+#[path = "dst/fuzz.rs"]
+mod fuzz;
+
+use coverage::Coverage;
+
+/// Initialise a fresh graph at `uri` with the harness schema (`op::SCHEMA`) and
+/// hand back the raw `Omnigraph` handle. The white-box regressions/scenarios
+/// drive that handle directly; the generative walk goes through `Embedded` (the
+/// Backend impl) instead so the same code can run cross-backend.
+///
+/// Panics with the message `init` when initialisation fails.
+async fn open_clean(uri: &str) -> Omnigraph {
+    let initialised = Omnigraph::init(uri, op::SCHEMA).await;
+    initialised.expect("init")
+}
+
+// ═══════════════════════════ named regressions ════════════════════════════
+
+/// RC-1 — multi-statement `delete Person + delete Knows` fails with a spurious
+/// stale-view manifest-CAS error (the node-delete cascade bumps edge:Knows
+/// under the explicit edge-delete's pinned version).
+#[tokio::test] +async fn regression_rc1_stale_view_on_delete_combo() { + let dir = tempfile::tempdir().unwrap(); + let db = open_clean(dir.path().to_str().unwrap()).await; + let mut seed = String::new(); + for i in 0..50 { + seed.push_str(&person(&format!("p{i}"))); + } + for i in 0..50 { + seed.push_str(&knows(&format!("p{i}"), &format!("p{}", (i + 1) % 50))); + } + load_jsonl(&db, &seed, LoadMode::Merge).await.unwrap(); + + let q = "query m() { delete Person where slug = \"p30\" delete Knows where from = \"p30\" }"; + match db.mutate("main", q, "m", &ParamMap::new()).await { + Err(e) => assert_eq!( + classify(&Finding::Engine(e)), + Some("RC-1 stale-view"), + "RC-1: expected a stale-view manifest error" + ), + Ok(_) => panic!("RC-1 appears FIXED — flip this regression to expect Ok"), + } +} + +/// RC-X — Lance #7230: scalar BTREE corruption (duplicate row addresses) under +/// UPDATE racing optimize; the indexed read crashes with `from_sorted_iter`. +#[tokio::test] +async fn regression_rc_x_btree_from_sorted_iter() { + let dir = tempfile::tempdir().unwrap(); + let db = Arc::new(open_clean(dir.path().to_str().unwrap()).await); + const FRAGS: usize = 3; + const PER: usize = 600; + for frag in 0..FRAGS { + let mut data = String::new(); + for i in 0..PER { + let s = if i % 12 == 0 { "email" } else { "whatsapp" }; + data.push_str(&doc(&format!("d{frag}-{i}"), s)); + } + load_jsonl(&db, &data, LoadMode::Merge).await.unwrap(); + } + let _ = db.optimize().await; + + let upd = { + let db = db.clone(); + tokio::spawn(async move { + for round in 0..3 { + for frag in 0..FRAGS { + for i in (0..PER).step_by(40) { + let q = format!( + "query u() {{ update Doc set {{ body: \"r{round} needle\" }} where slug = \"d{frag}-{i}\" }}" + ); + for _ in 0..4 { + if db.mutate("main", &q, "u", &ParamMap::new()).await.is_ok() { + break; + } + } + } + } + } + }) + }; + let opt = { + let db = db.clone(); + tokio::spawn(async move { + for _ in 0..4 { + let _ = db.optimize().await; + } + }) + 
}; + let _ = upd.await; + let _ = opt.await; + + match db + .query( + ReadTarget::branch("main"), + "query w() { match { $d: Doc { source: \"whatsapp\" } } return { $d.slug } }", + "w", + &ParamMap::new(), + ) + .await + { + Err(e) => assert_eq!( + classify(&Finding::Engine(e)), + Some("RC-X/#7230 scalar-BTREE"), + "RC-X: expected a from_sorted_iter error" + ), + Ok(res) => panic!( + "RC-X appears FIXED ({} rows) — flip to expect Ok (Lance >= 8.0.0)", + res.num_rows() + ), + } +} + +/// dup-@key (MR-714) — concurrent same-key merge-upserts produce duplicate rows. +#[tokio::test] +async fn regression_dup_key_under_concurrency() { + let dir = tempfile::tempdir().unwrap(); + let db = Arc::new(open_clean(dir.path().to_str().unwrap()).await); + let workers = 4usize; + let keys = 500usize; + let mut handles = Vec::new(); + for _ in 0..workers { + let db = db.clone(); + handles.push(tokio::spawn(async move { + let mut data = String::new(); + for k in 0..keys { + data.push_str(&person(&format!("hot-{k}"))); + } + for _ in 0..12 { + if load_jsonl(&db, &data, LoadMode::Merge).await.is_ok() { + break; + } + } + })); + } + for h in handles { + let _ = h.await; + } + let total = db + .query( + ReadTarget::branch("main"), + "query q() { match { $x: Person } return { $x.slug } }", + "q", + &ParamMap::new(), + ) + .await + .unwrap() + .num_rows(); + assert!( + total > keys, + "dup-@key appears FIXED: total={total} == distinct keys={keys}; flip this regression" + ); +} + +/// FINDING (harness-surfaced, distinct from the 3 above): a `Knows` SELF-LOOP is +/// committed to the edge table but is NOT returned by `$a knows $b` traversal — +/// durable across optimize and reopen, and the CSR build keeps it (so the drop +/// is in Expand). `proptest_equivalence` can't catch it: it only asserts +/// CSR-vs-indexed MODE equivalence, and both modes drop the self-loop alike. 
The +/// model-based edges==model oracle caught it; self-loops are kept out of the +/// generic generator so that oracle stays unambiguous. This characterization +/// guard breaks (forcing review) when self-loops become traversable. +#[tokio::test] +async fn regression_self_loop_not_traversable() { + let dir = tempfile::tempdir().unwrap(); + let db = open_clean(dir.path().to_str().unwrap()).await; + load_jsonl(&db, &person("s0"), LoadMode::Merge).await.unwrap(); + load_jsonl(&db, &knows("s0", "s0"), LoadMode::Merge).await.unwrap(); + + // The self-loop row is durably committed to the edge table. + let snap = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + let mut raw = 0; + for entry in snap.entries() { + if entry.table_key == "edge:Knows" { + raw = snap + .open(&entry.table_key) + .await + .unwrap() + .count_rows(None) + .await + .unwrap(); + } + } + assert_eq!(raw, 1, "self-loop edge row should be committed"); + + // ...but traversal does not return it (the finding). + let trav = db + .query( + ReadTarget::branch("main"), + "query e() { match { $a: Person $a knows $b } return { $a.slug, $b.slug } }", + "e", + &ParamMap::new(), + ) + .await + .unwrap() + .num_rows(); + assert_eq!( + trav, 0, + "self-loop appears FIXED (traversable={trav}) — flip this regression to expect 1" + ); +} + +// ═══════════════════════ concurrency (multi-actor) ════════════════════════ + +/// Seeded N-actor concurrent walk over a SHARED graph, with an OVERLAPPING key +/// space so same-key upserts race. Under concurrency the sequential reference +/// model doesn't apply, so the oracle is the interleaving-INVARIANT subset: +/// unique live row-ids, `Dataset::validate`, HEAD==manifest, and `@key` +/// uniqueness. dup-`@key` (MR-714) and RC-1 drift are allow-listed knowns; any +/// other divergence is a novel concurrency bug and fails. 
A Lance panic inside +/// an actor is contained by `tokio::spawn` (surfaces as a JoinError we ignore), +/// so the harness stays up — the post-join battery is the judge. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_walk_structural_invariants() { + let mut reproduced: Vec = Vec::new(); + for seed in 0..3u64 { + let dir = tempfile::tempdir().unwrap(); + let db = Arc::new(open_clean(dir.path().to_str().unwrap()).await); + let actors = 4usize; + let mut handles = Vec::new(); + for a in 0..actors { + let db = db.clone(); + handles.push(tokio::spawn(async move { + let mut rng = Rng::new(seed.wrapping_mul(131) + a as u64); + for _ in 0..20 { + let k = rng.below(30); // shared 0..30 key space → races + match rng.below(5) { + 0 | 1 => { + let _ = load_jsonl(&db, &person(&format!("h{k}")), LoadMode::Merge).await; + } + 2 => { + let q = format!( + "query u() {{ update Person set {{ name: \"x{k}\" }} where slug = \"h{k}\" }}" + ); + let _ = db.mutate("main", &q, "u", &ParamMap::new()).await; + } + 3 => { + let q = format!("query d() {{ delete Person where slug = \"h{k}\" }}"); + let _ = db.mutate("main", &q, "d", &ParamMap::new()).await; + } + _ => { + let _ = db.optimize().await; + } + } + } + })); + } + for h in handles { + // A contained actor panic surfaces as a JoinError. Don't swallow it: + // a known substrate crash (FTS-OOB / RC-X) is recorded, but a NOVEL + // actor panic — one that may not leave durable corruption the battery + // would see — must still fail the test, exactly as the sequential + // walk classifies its panics. + if let Err(join_err) = h.await { + if join_err.is_panic() { + let msg = invariants::panic_message(&*join_err.into_panic()); + match invariants::classify_panic(&msg) { + Some(bug) => reproduced.push(format!("seed={seed} actor PANIC -> {bug}")), + None => panic!("seed={seed}: NOVEL actor panic: {msg}"), + } + } else { + // Not a panic (e.g. 
a cancelled/aborted task) — the actors are + // never cancelled here, so this is unexpected; surface it + // rather than letting it fall through silently. + panic!("seed={seed}: unexpected non-panic actor JoinError: {join_err}"); + } + } + } + + let checks: Vec<(&str, Result<(), Finding>)> = vec![ + ("row-id-unique", invariants::no_duplicate_live_row_ids(&db).await), + ("dataset.validate", invariants::dataset_validate(&db).await), + ("head==manifest", invariants::head_eq_manifest(&db).await), + ("no-dup-@key", invariants::no_duplicate_keys(&db, "Person", "main").await), + ]; + for (name, res) in checks { + if let Err(f) = res { + match classify(&f) { + Some(bug) => reproduced.push(format!("seed={seed} [{name}] -> {bug}")), + None => panic!( + "seed={seed}: NOVEL concurrent violation [{name}]: {}", + f.message() + ), + } + } + } + } + eprintln!( + "[dst] concurrent walk: {} known-bug instance(s), 0 novel violations", + reproduced.len() + ); + for r in &reproduced { + eprintln!(" - {r}"); + } +} + +// ═══════════════════════ branch isolation + merge ═════════════════════════ + +async fn count_persons(db: &omnigraph::db::Omnigraph, branch: &str) -> usize { + db.query( + ReadTarget::branch(branch), + "query q() { match { $x: Person } return { $x.slug } }", + "q", + &ParamMap::new(), + ) + .await + .unwrap() + .num_rows() +} + +async fn person_exists(db: &omnigraph::db::Omnigraph, branch: &str, slug: &str) -> usize { + let q = + format!("query p() {{ match {{ $x: Person {{ slug: \"{slug}\" }} }} return {{ $x.slug }} }}"); + db.query(ReadTarget::branch(branch), &q, "p", &ParamMap::new()) + .await + .unwrap() + .num_rows() +} + +/// D4 oracles for the branch subsystem (the deferred B2 branch_isolation + +/// merge_correctness): a branch must not observe `main` writes made after it +/// forked, `main` must not observe branch-only writes, and a merge must produce +/// the row-level union. 
Kept as a focused scenario (not the generic per-op walk) +/// so the reference model stays single-branch and unambiguous. +#[tokio::test] +async fn branch_isolation_and_merge() { + let dir = tempfile::tempdir().unwrap(); + let db = open_clean(dir.path().to_str().unwrap()).await; + let mut seed = String::new(); + for i in 0..5 { + seed.push_str(&person(&format!("p{i}"))); + } + load_jsonl(&db, &seed, LoadMode::Merge).await.unwrap(); + + db.branch_create("feature").await.unwrap(); + + // Diverge: a post-fork write on each side. + db.mutate( + "main", + "query i() { insert Person { slug: \"m-only\", name: \"n\" } }", + "i", + &ParamMap::new(), + ) + .await + .unwrap(); + db.mutate( + "feature", + "query i() { insert Person { slug: \"f-only\", name: \"n\" } }", + "i", + &ParamMap::new(), + ) + .await + .unwrap(); + + // branch_isolation: neither side sees the other's post-fork write. + assert_eq!( + person_exists(&db, "feature", "m-only").await, + 0, + "isolation: feature observed a post-fork main write" + ); + assert_eq!( + person_exists(&db, "main", "f-only").await, + 0, + "isolation: main observed a feature-only write" + ); + assert_eq!(person_exists(&db, "feature", "f-only").await, 1); + assert_eq!(person_exists(&db, "main", "m-only").await, 1); + + // merge_correctness: main converges to the row-level union (p0..p4 + both). 
+ let outcome = db.branch_merge("feature", "main").await.unwrap(); + let total = count_persons(&db, "main").await; + assert_eq!( + total, 7, + "merge: expected the 7-person union, got {total} (outcome {outcome:?})" + ); + assert_eq!( + person_exists(&db, "main", "f-only").await, + 1, + "merge: feature's row was not merged into main" + ); +} + +// ═══════════════════════ D3 read shapes × D2 morphology ═══════════════════ + +/// Every read shape must execute (no engine error / panic) against every table +/// morphology — the D2×D3 cell sweep — plus the morphology-invariant counts +/// (full-scan == live persons, zero-match == 0) must hold, and the shapes must +/// run against a forked branch too. +#[tokio::test] +async fn read_shape_battery_across_morphologies() { + for morph in readshape::Morph::ALL { + let dir = tempfile::tempdir().unwrap(); + let db = omnigraph::db::Omnigraph::init(dir.path().to_str().unwrap(), readshape::SCHEMA) + .await + .unwrap(); + let expected_persons = readshape::build(&db, morph).await; + for (name, res) in readshape::run(&db, "main").await { + let rows = + res.unwrap_or_else(|e| panic!("morph {morph:?} shape [{name}] errored: {e}")); + if name == "full-scan" { + assert_eq!(rows, expected_persons, "morph {morph:?} full-scan count"); + } + if name == "zero-match" { + assert_eq!(rows, 0, "morph {morph:?} zero-match must be empty"); + } + } + } + + // on-branch morphology: every shape must execute against a forked branch. 
+ let dir = tempfile::tempdir().unwrap(); + let db = omnigraph::db::Omnigraph::init(dir.path().to_str().unwrap(), readshape::SCHEMA) + .await + .unwrap(); + readshape::build(&db, readshape::Morph::MultiFragment).await; + db.branch_create("feature").await.unwrap(); + for (name, res) in readshape::run(&db, "feature").await { + res.unwrap_or_else(|e| panic!("on-branch shape [{name}] errored: {e}")); + } +} + +// ═══════════════════════════ generative walk ══════════════════════════════ + +/// Clean walk: full white-box battery after every op; novel violations fail. +#[tokio::test] +async fn seeded_op_loop_invariants_hold() { + run_walk(false).await; +} + +/// Phase 2: same walk under injected `StorageAdapter` conditional-write CAS-lost +/// faults (`FaultAdapter`). The engine must surface/retry them (never silently +/// lose a write) — count==model catches loss. SCOPE: this seam is the text-object +/// conditional write (sidecars/schema-staging/cluster-state), NOT the Lance +/// manifest-publish CAS — that path is covered by the failpoint cells in +/// `dst_recovery`, so this walk's value here is the recovery/sidecar CAS layer. +#[tokio::test] +async fn seeded_op_loop_with_cas_faults() { + run_walk(true).await; +} + +async fn run_walk(faults: bool) { + let mut cov = Coverage::new(); + let mut reproduced: Vec = Vec::new(); + for seed in 0..4u64 { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut db = if faults { + Embedded::open_faulted(uri, seed, 8).await + } else { + Embedded::open_clean(uri).await + }; + let mut rng = Rng::new(seed); + let mut model = Model::new(); + + 'walk: for step_i in 0..25 { + // Reopen op (walk-driven): drop + reopen the handle so the recovery + // sweep / coordinator re-resolution runs against the current table + // state; the op + battery below then execute on the reopened handle, + // so the existing checks validate post-reopen. 
(A clean reopen, so a + // faulted walk continues fault-free after it — acceptable.) + if step_i > 0 && rng.below(100) < 15 { + drop(db); + db = Embedded::reopen(uri).await; + cov.op(op::OpKind::Reopen); + } + // A fault-injection / depth DST harness must treat a substrate PANIC + // (a Lance `unwrap`/index crash unwinding through the engine) as a + // finding, not a suite abort — so the op and the battery run under + // catch_unwind, and a known crash signature is classified like any + // other known bug (record + break); a novel panic re-raises. + let stepped = AssertUnwindSafe(op::step(&db, &mut rng, &mut model)) + .catch_unwind() + .await; + let (kind, res) = match stepped { + Ok(kr) => kr, + Err(p) => { + let msg = invariants::panic_message(&*p); + match invariants::classify_panic(&msg) { + Some(bug) => { + cov.finding(bug); + reproduced.push(format!("seed={seed} step={step_i} PANIC -> {bug}")); + break 'walk; + } + None => panic!("seed={seed} step={step_i}: NOVEL panic: {msg}"), + } + } + }; + cov.op(kind); + if let Err(e) = res { + match classify_backend(&e) { + Some(bug) => { + cov.finding(bug); + reproduced.push(format!("seed={seed} step={step_i} op[{kind:?}] -> {bug}")); + break 'walk; + } + // Fault injection legitimately fails writes in varied ways; + // the INVARIANT checks below stay strict. 
+ None if faults => {} + None => panic!("seed={seed} step={step_i}: NOVEL op error: {}", e.message()), + } + } + let battery = match AssertUnwindSafe(run_battery(&db, &model)).catch_unwind().await { + Ok(b) => b, + Err(p) => { + let msg = invariants::panic_message(&*p); + match invariants::classify_panic(&msg) { + Some(bug) => { + cov.finding(bug); + reproduced.push(format!("seed={seed} step={step_i} battery PANIC -> {bug}")); + break 'walk; + } + None => panic!("seed={seed} step={step_i}: NOVEL battery panic: {msg}"), + } + } + }; + for (name, res) in battery { + cov.invariant(name); + if let Err(f) = res { + match classify(&f) { + Some(bug) => { + cov.finding(bug); + reproduced.push(format!("seed={seed} step={step_i} [{name}] -> {bug}")); + break 'walk; + } + None => panic!( + "seed={seed} step={step_i}: NOVEL invariant violation [{name}]: {}", + f.message() + ), + } + } + } + } + + // ── reopen==pre_state: durability oracle ── + // A fresh handle on the same bytes (clean adapter; the open-time recovery + // sweep runs) must agree with the model the walk built. count==model / + // content==model prove the data survived; head==manifest / row-id-disjoint + // prove the sweep left no residual drift. Reuses the same battery at + // durability time, so no separate coverage cell. 
+ drop(db); + let reopened = Embedded::reopen(uri).await; + for (name, res) in run_battery(&reopened, &model).await { + if let Err(f) = res { + match classify(&f) { + Some(bug) => { + cov.finding(bug); + reproduced.push(format!("seed={seed} [reopen/{name}] -> {bug}")); + } + None => panic!( + "seed={seed}: NOVEL post-reopen violation [{name}]: {}", + f.message() + ), + } + } + } + } + eprintln!( + "[dst] walk(faults={faults}): coverage [{}]; {} known-bug instance(s), 0 novel violations (+reopen durability gate)", + cov.report(), + reproduced.len() + ); + for r in &reproduced { + eprintln!(" - {r}"); + } +} + + + +// ═══════════════════════════ D5 context: S3 (PR-D) ════════════════════════ + +/// Build a unique `s3://` DST graph URI, or `None` when the bucket env is unset +/// (skip locally; runs in CI's `rustfs_integration` job). +fn s3_dst_uri() -> Option { + let bucket = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?; + let base = if bucket.starts_with("s3://") { + bucket + } else { + format!("s3://{bucket}") + }; + let nanos = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + Some(format!( + "{}/dst-harness/{}-{}", + base.trim_end_matches('/'), + std::process::id(), + nanos + )) +} + +/// D5 S3 context: the FULL white-box battery must hold on a real object-store +/// backend, not just local FS. A short seeded op sequence builds state on +/// `s3://`, then the battery runs (known bugs allow-listed, novel → fail). 
+#[tokio::test(flavor = "multi_thread")] +async fn s3_battery_holds() { + let Some(uri) = s3_dst_uri() else { + eprintln!("skipping s3 dst battery: OMNIGRAPH_S3_TEST_BUCKET unset"); + return; + }; + let db = Embedded::open_clean(&uri).await; + let mut rng = Rng::new(1); + let mut model = Model::new(); + for _ in 0..8 { + // Don't discard op errors: `step` only advances the model on success, so + // a swallowed S3-specific failure would leave db+model in step and let + // the battery pass over a real backend fault. Classify like `run_walk` — + // known open bugs allow-listed, any novel S3 op error fails. + let (_kind, res) = op::step(&db, &mut rng, &mut model).await; + if let Err(e) = res { + match classify_backend(&e) { + Some(_bug) => {} // known open bug — allow-listed + None => panic!("s3: NOVEL op error: {}", e.message()), + } + } + } + for (name, res) in run_battery(&db, &model).await { + if let Err(f) = res { + match classify(&f) { + Some(_bug) => {} // known open bug — allow-listed + None => panic!("s3: NOVEL battery violation [{name}]: {}", f.message()), + } + } + } +} diff --git a/crates/omnigraph/tests/dst/MATRIX.md b/crates/omnigraph/tests/dst/MATRIX.md new file mode 100644 index 00000000..c52005b1 --- /dev/null +++ b/crates/omnigraph/tests/dst/MATRIX.md @@ -0,0 +1,147 @@ +# DST coverage ledger (completeness-critic output) + +This is the harness's honest self-assessment: which cells of the morphological +matrix it actually samples, which it does not, and — most importantly — which +**dimensions it had not named** until a bug found by other means forced them +into view. + +> **Read this first:** "comprehensive" is always *relative to the dimensions +> below*. A 100% mark here means "100% of the distinctions we thought to draw." +> The only way to find an *unnamed* dimension is to check the harness against a +> bug it didn't catch (see the `#296` row). Re-run this critic whenever that +> happens. This ledger is a process artifact, never a proof. 
+ +## Why exhaustive sampling is impossible (so this ledger exists instead) + +1. **Cells are path-spaces, several unbounded.** A "cell" is reached by a + *sequence* of ops; sequence count grows exponentially with length, and + length, history depth, data values, and fragment/version morphology are all + unbounded. The grid is a projection of an infinite tree. +2. **Concurrency is non-enumerable.** A concurrent bug lives in a *schedule* + (interleaving + timing), not a cell. You sample schedules or steer toward a + named hazard; you never cover them. +3. **The model is open-world.** You can only sample dimensions you've named, and + you can't prove you've named them all (`#296` was a dimension we hadn't). + +So the goal is not coverage; it's **maximize bugs-found per unit cost** — +sample with novelty bias, steer to named hazards, and run this critic to surface +new dimensions. + +## Dimension ledger + +Legend: ✅ sampled · 🟡 partial · ❌ unsampled · ⏸️ deferred-by-plan (PR-C/PR-D) + +### D1 — operations +| op | status | where | +|----|--------|-------| +| insert node (Person/Doc) | ✅ | walk, statemachine | +| insert/delete edge (Knows) | ✅ | walk (`InsertKnows`/`DeleteKnows`) | +| update | ✅ | walk, statemachine | +| delete node (cascade) | ✅ | walk (`DeletePerson`) | +| optimize | ✅ | walk | +| repair | ✅ | walk (`Repair`) | +| read | ✅ | walk, readshape | +| branch create/write/merge | 🟡 | `branch_isolation_and_merge` (scenario, not generic walk) | +| **`open` / recovery sweep** | ✅ | walk (`Reopen` op, walk-driven mid-sequence) + failpoint recovery cells (`dst_recovery`: roll-forward under finalize failure + the `#296` concurrent-opens-converge cell) | +| cleanup (version GC) | ❌ | needs `&mut self`; deferred | +| apply-schema mid-sequence | ❌ | forks the single-branch model; deferred | +| overwrite (`LoadMode::Overwrite`) | ❌ | deferred | + +### D2 — latent table morphology +| morphology | status | where | +|------------|--------|-------| +| 1 vs ≥2 fragments | 
✅ | readshape (`Single`/`MultiFragment`), walk | +| deletion vectors | ✅ | readshape (`WithDeletions`), walk | +| compacted/reindexed | ✅ | readshape (`Optimized`), walk | +| on-branch | 🟡 | readshape on-branch + branch scenario | +| HEAD>manifest drift | 🟡 | produced by RC-1; not deliberately steered | +| overlapping row-id ranges | ✅ (as invariant target) | `no_duplicate_live_row_ids` | + +### D3 — read shapes +| shape | status | where | +|-------|--------|-------| +| scan · @key · indexed · non-indexed · range · order+limit · count · numeric-agg · 1-hop · var-hop · negation · zero-match | ✅ | `readshape::shapes()` × 4 morphologies × on-branch | +| **vector (`nearest`)** | ⏸️ | needs vector data | +| **FTS (`bm25`/`search`/`fuzzy`) / `rrf`** | ⏸️ | needs the inverted index whose builder OOB-panics (finding #5) | + +### D4 — oracles +| oracle | status | +|--------|--------| +| HEAD==manifest · `Dataset::validate` · row-id-unique · index-probe · count==model · content==model · edges==model (RI) · @key-unique | ✅ | +| branch isolation · merge correctness · reopen==pre_state | ✅ | +| **replay-equality (bit-identical)** | ⏸️ — blocked on PR-C determinism (Lance internal parallelism unseeded) | +| **linearizability (porcupine)** | ⏸️ | + +### D5 — context *(the dimension with the biggest blind spots)* +| context | status | note | +|---------|--------|------| +| embedded backend | ✅ | drives `Embedded` (the in-process `Backend` impl) + the white-box battery | +| **CLI backend (subprocess)** | ✅ (PR-E) | `cli_cross_backend_walk` (in `omnigraph-cli` tests) — the SAME seeded walk drives `Embedded` AND `Cli` (`omnigraph-dst::Cli`, binary via `CARGO_BIN_EXE_omnigraph`), asserting per-step black-box agreement (slug sets + edge count). Unblocked by extracting op/model/invariants/fault behind the `Backend` trait into the shared `omnigraph-dst` crate. White-box battery stays embedded-only by construction; the CLI arm runs the black-box oracles. 
One known contract difference allow-listed: `repair --confirm` (no `--force`) exits non-zero refusing suspicious drift where embedded `repair(force=false)` returns Ok. | +| **long-lived server backend** | ❌ deferred | `parity_matrix.rs` covers single-op CLI-vs-server; a server-arm generative walk is the remaining D5 gap. | +| local FS | ✅ | | +| **S3 (RustFS/MinIO)** | ✅ (PR-D) | `s3_battery_holds` — full battery on `s3://`, env-gated (`OMNIGRAPH_S3_TEST_BUCKET`) | +| **parser/loader fuzz** | ✅ (PR-D) | `fuzz::*` — proptest dup/malformed-injecting; `cargo-fuzz`/libFuzzer deferred (needs nightly) | +| single writer | ✅ | | +| concurrent writers (one handle) | ✅ | `concurrent_walk_structural_invariants` | +| **concurrent *opens* / ≥2 recovery sweeps** | ❌ | **the `#296` cell — see below** | +| **cross-process writers/opens** | ❌ | documented engine known-gap; harness can't reach it yet | +| cold vs warm coordinator | 🟡 | reopen exercises cold; not steered | + +### Hidden dimensions (named only after a miss) +| dimension | how we learned it | status | +|-----------|-------------------|--------| +| **handle/process multiplicity** | `#296` (concurrent recovery sweeps on one sidecar) | ❌ being closed | +| **`open`/recovery as a first-class op** | `#296` (the sweep is a hidden op) | 🟡 — `Reopen` op added to the walk; concurrent/cross-process still ❌ | +| **schedule/interleaving precision** | `#296` needs one exact classify→publish-CAS race | 🟡 — sampled stochastically (multi-thread), not steered; failpoints would steer it | + +### ⚠️ Fault-injection seam is narrower than documented (critic finding) +The Phase-2 `FaultAdapter` wraps `StorageAdapter::write_text_if_match` claiming to +fault "the conditional manifest write." It does **not**: the `__manifest` publish +is a Lance `MergeInsertBuilder` row-level CAS on `object_id` +(`db/manifest/publisher.rs:377`), which never flows through the StorageAdapter. 
+Verified empirically — a write under `cas_conflict_pct = 100` **succeeds and +leaves no sidecar**. So `seeded_op_loop_with_cas_faults` injects into a *cold* +text-CAS path (schema staging / `omnigraph.rs:2380,2441`), not the manifest hot +path, which is why faults≈no-faults in the walk. **Real manifest-CAS / publish +fault injection needs either (a) wrapping Lance's `object_store` at dataset open +(the deferred "Lance-internal-I/O" seam) or (b) the engine's failpoints** +(`recovery.before_roll_forward_publish`, the per-writer Phase-B publisher +failpoints). This is the prerequisite for the `#296` cell — the StorageAdapter +seam cannot induce a `RolledPastExpected` sidecar. + +## Prioritized gap-closure backlog +1. **Widen the fault seam** — ✅ DONE via the `--features failpoints` variant + (`tests/dst_recovery.rs`, own binary so the process-global `fail` registry can't + leak into the main walks). `mutation.post_finalize_pre_publisher` now induces a + real `RolledPastExpected` sidecar — the thing the StorageAdapter wrapper could + not. *(The StorageAdapter seam is still off the manifest publish; failpoints are + the reach. A Lance `object_store` wrapper remains the option for generative — + not hand-armed — manifest-CAS faults.)* +2. **`#296` cell — concurrent `open` under a pending sidecar** — ✅ DONE + (`concurrent_opens_converge_on_pending_sidecar`): an inline park-first rendezvous + at `recovery.before_roll_forward_publish` forces two sweeps to race one sidecar; + the CAS-loser must converge (not `ExpectedVersionMismatch`). Non-vacuous (the + rendezvous panics if the race never fires) + the white-box battery as oracle. + Guards 0.7.2 (the failpoint instrumentation was added *with* the #296 fix, so it + can't run against true 0.7.1). +3. **`open`/recovery as a generated op** — ✅ DONE (the `Reopen` op): the walk drops + + reopens mid-sequence, sampling the sweep across walk states. +4. 
**cross-process** writer/open scenarios (subprocess backend) — the documented + one-winner-CAS territory. ❌ remaining. +5. **generative** (not hand-armed) recovery faults — wrap Lance's `object_store` so + the walk *discovers* sidecar/CAS bugs instead of the cells being scripted. ❌ +6. **determinism/replay-equality** (PR-C) — ✅ DONE (`--features dst` seam + + `replay_equality_same_seed`). **S3 context + parser/loader fuzz** (PR-D) — ✅ DONE + (`s3_battery_holds`, `fuzz::*`). **CLI cross-backend walk** (PR-E) — ✅ DONE: the + harness op/model/invariants/fault extracted behind a `Backend` trait into the + shared `omnigraph-dst` crate, so the same seeded walk runs embedded AND via the + CLI subprocess (`cli_cross_backend_walk`, see D5 table). **Still ⏸️/❌:** + long-lived *server* backend walk (deferred — `parity_matrix` covers single-op + CLI-vs-server), `cargo-fuzz`/libFuzzer (needs nightly), `porcupine` + linearizability, vector/FTS/rrf read shapes, wrapping the main walks in + `with_seed`. + +## The standing rule +When a bug is found *outside* this harness, before closing it: add its row to the +Hidden-dimensions table if it names a new dimension, then add a sampling cell. +That is the only mechanism that grows the model. The ledger is never "done." diff --git a/crates/omnigraph/tests/dst/coverage.rs b/crates/omnigraph/tests/dst/coverage.rs new file mode 100644 index 00000000..9eed2290 --- /dev/null +++ b/crates/omnigraph/tests/dst/coverage.rs @@ -0,0 +1,38 @@ +//! Coverage capture: which matrix cells the generative walk actually touched. +//! Turns "comprehensive" from a claim into a number (hit / total). + +use std::collections::HashSet; + +use omnigraph_dst::op::OpKind; + +#[derive(Default)] +pub struct Coverage { + ops: HashSet, + invariants: HashSet<&'static str>, + /// op-error and invariant-violation signatures actually exercised. 
+ findings: HashSet<&'static str>, +} + +impl Coverage { + pub fn new() -> Self { + Self::default() + } + pub fn op(&mut self, k: OpKind) { + self.ops.insert(k); + } + pub fn invariant(&mut self, name: &'static str) { + self.invariants.insert(name); + } + pub fn finding(&mut self, name: &'static str) { + self.findings.insert(name); + } + pub fn report(&self) -> String { + format!( + "ops {}/{}, invariants {}/7, known-bugs exercised {}", + self.ops.len(), + OpKind::ALL.len(), + self.invariants.len(), + self.findings.len() + ) + } +} diff --git a/crates/omnigraph/tests/dst/fuzz.rs b/crates/omnigraph/tests/dst/fuzz.rs new file mode 100644 index 00000000..8870f32f --- /dev/null +++ b/crates/omnigraph/tests/dst/fuzz.rs @@ -0,0 +1,174 @@ +//! PR-D parser/loader fuzz — proptest-based (no nightly / `cargo-fuzz` needed). +//! +//! The lesson of Lance #7230: a parse/merge path that was only ever fed +//! pre-sorted, dup-free input shipped a latent crash. So feed the GQ query +//! parser, the schema parser, and the JSONL loader BOTH arbitrary and +//! ADVERSARIAL (duplicate-key, reordered-field, malformed) input and assert they +//! return a `Result` — never PANIC. proptest shrinks any panic to a minimal +//! input and persists the seed under `proptest-regressions/`. +//! +//! (cargo-fuzz/libFuzzer targets are the deferred follow-up — they need a +//! nightly toolchain; this proptest form gets the same "parser survives hostile +//! input" coverage with the stable toolchain already in use.) + +use proptest::prelude::*; + +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::find_named_query; +use omnigraph_compiler::schema::parser::parse_schema; + +/// Random GQ-token sequences — far likelier to reach deep parser states (and +/// their panics) than purely random bytes. 
+fn arb_gq() -> impl Strategy<Value = String> { + let token = prop_oneof![ + Just("query"), + Just("q"), + Just("("), + Just(")"), + Just("{"), + Just("}"), + Just("match"), + Just("return"), + Just("$x"), + Just("$y"), + Just(":"), + Just("Person"), + Just("Doc"), + Just("knows"), + Just("not"), + Just("limit"), + Just("3"), + Just("order"), + Just("desc"), + Just(","), + Just("."), + Just("slug"), + Just("\""), + Just("count"), + Just("="), + Just("where"), + Just("{1,3}"), + Just("nearest"), + ] + .prop_map(|s| s.to_string()); + proptest::collection::vec(token, 0..40).prop_map(|toks| toks.join(" ")) +} + +/// Random `.pg`-token sequences for the schema parser. +fn arb_pg() -> impl Strategy<Value = String> { + let token = prop_oneof![ + Just("node"), + Just("edge"), + Just("Person"), + Just("Doc"), + Just("{"), + Just("}"), + Just("slug"), + Just(":"), + Just("String"), + Just("I64"), + Just("@key"), + Just("@index"), + Just("@card(0..1)"), + Just("enum(a,b)"), + Just("->"), + Just("Knows"), + Just("\n"), + ] + .prop_map(|s| s.to_string()); + proptest::collection::vec(token, 0..40).prop_map(|toks| toks.join(" ")) +} + +proptest! { + #![proptest_config(ProptestConfig { cases: 256, .. ProptestConfig::default() })] + + /// The GQ query parser must return a Result for ANY input — never panic. + #[test] + fn gq_parser_never_panics(src in ".{0,256}") { + let _ = find_named_query(&src, "q"); + } + + /// ...including structured GQ-token soup that reaches deep parse states. + #[test] + fn gq_parser_structured_never_panics(src in arb_gq()) { + let _ = find_named_query(&src, "q"); + } + + /// The schema parser must return a Result for ANY input — never panic. + #[test] + fn schema_parser_never_panics(src in ".{0,256}") { + let _ = parse_schema(&src); + } + + /// ...including structured `.pg`-token soup.
+ #[test] + fn schema_parser_structured_never_panics(src in arb_pg()) { + let _ = parse_schema(&src); + } +} + +/// The JSONL loader must SURVIVE adversarial batches — duplicate `@key`s, +/// reordered/extra/missing fields, wrong types, malformed JSON, orphan edges — +/// returning `Ok`/`Err`, never panicking (and never corrupting: a later normal +/// load + read still works). +#[tokio::test] +async fn loader_survives_adversarial_jsonl() { + let dir = tempfile::tempdir().unwrap(); + let db = crate::open_clean(dir.path().to_str().unwrap()).await; + + let adversarial: &[&str] = &[ + // duplicate @key within one batch + "{\"type\":\"Person\",\"data\":{\"slug\":\"dup\",\"name\":\"a\"}}\n{\"type\":\"Person\",\"data\":{\"slug\":\"dup\",\"name\":\"b\"}}\n", + // reordered top-level fields + "{\"data\":{\"slug\":\"r1\",\"name\":\"n\"},\"type\":\"Person\"}\n", + // extra unknown field + "{\"type\":\"Person\",\"data\":{\"slug\":\"x1\",\"name\":\"n\",\"bogus\":42}}\n", + // missing required property (name) + "{\"type\":\"Person\",\"data\":{\"slug\":\"x2\"}}\n", + // wrong type (name as number) + "{\"type\":\"Person\",\"data\":{\"slug\":\"x3\",\"name\":99}}\n", + // unknown node type + "{\"type\":\"Ghost\",\"data\":{\"slug\":\"g\"}}\n", + // null slug + "{\"type\":\"Person\",\"data\":{\"slug\":null,\"name\":\"n\"}}\n", + // orphan edge (endpoints don't exist) + "{\"edge\":\"Knows\",\"from\":\"nope\",\"to\":\"nope\",\"data\":{}}\n", + // malformed JSON line + "{not json at all\n", + // empty line + valid line + "\n{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"source\":\"whatsapp\",\"body\":\"b\"}}\n", + // enum out of range + "{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"source\":\"not_an_enum\",\"body\":\"b\"}}\n", + // huge string + &{ + let big = "z".repeat(50_000); + format!("{{\"type\":\"Person\",\"data\":{{\"slug\":\"big\",\"name\":\"{big}\"}}}}\n") + }, + ]; + + for (i, batch) in adversarial.iter().enumerate() { + // The ONLY contract: no panic. 
Ok or Err are both acceptable. + let _ = load_jsonl(&db, batch, LoadMode::Merge).await; + let _ = i; + } + + // ...and the graph is still usable after all that abuse. + load_jsonl( + &db, + "{\"type\":\"Person\",\"data\":{\"slug\":\"sane\",\"name\":\"ok\"}}\n", + LoadMode::Merge, + ) + .await + .unwrap(); + let n = db + .query( + omnigraph::db::ReadTarget::branch("main"), + "query q() { match { $p: Person { slug: \"sane\" } } return { $p.slug } }", + "q", + &omnigraph_compiler::ir::ParamMap::new(), + ) + .await + .unwrap() + .num_rows(); + assert_eq!(n, 1, "graph must remain usable after adversarial loads"); +} diff --git a/crates/omnigraph/tests/dst/readshape.rs b/crates/omnigraph/tests/dst/readshape.rs new file mode 100644 index 00000000..910b218c --- /dev/null +++ b/crates/omnigraph/tests/dst/readshape.rs @@ -0,0 +1,169 @@ +//! D3 × D2: the read-shape battery run against deliberately-built table +//! morphologies. The oracle is primarily NO-CRASH — every query shape must +//! execute without an engine error or panic across each latent layout (single +//! fragment, ≥2 fragments, deletion vectors, compacted, on-branch) — plus exact +//! count checks where the answer is morphology-invariant. This is the D2×D3 +//! cell sweep: a planner/execution regression that only bites a specific +//! fragment layout shows up here, where the generic walk's main-branch oracles +//! would miss it. +//! +//! Scope: the non-vector/non-FTS shapes (scan · @key · indexed · non-indexed · +//! range · order+limit · count · numeric aggregates · 1-hop · var-hop · +//! negation · zero-match). Vector (`nearest`) and FTS (`bm25`/`search`) shapes +//! are deferred — they need vector data and the inverted index whose builder +//! OOB-panics at depth (see `regression`-adjacent finding in dst.rs). 
+ +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::ir::ParamMap; + +/// Richer than the walk schema: `age: I64` unlocks range + numeric aggregates. +pub const SCHEMA: &str = r#" +node Person { + slug: String @key + name: String + age: I64 +} +node Doc { + slug: String @key + source: enum(whatsapp, email, linkedin, slack, telegram) @index + body: String +} +edge Knows: Person -> Person @card(0..1) +"#; + +fn person(slug: &str, age: i64) -> String { + format!("{{\"type\":\"Person\",\"data\":{{\"slug\":\"{slug}\",\"name\":\"n\",\"age\":{age}}}}}\n") +} +fn doc(slug: &str, source: &str) -> String { + format!("{{\"type\":\"Doc\",\"data\":{{\"slug\":\"{slug}\",\"source\":\"{source}\",\"body\":\"needle filler\"}}}}\n") +} +fn knows(from: &str, to: &str) -> String { + format!("{{\"edge\":\"Knows\",\"from\":\"{from}\",\"to\":\"{to}\",\"data\":{{}}}}\n") +} + +#[derive(Clone, Copy, Debug)] +pub enum Morph { + /// One load batch → one fragment. + Single, + /// Each row group in its own `Merge` batch → ≥2 fragments. + MultiFragment, + /// Multi-fragment, then a delete → deletion vectors over live rows. + WithDeletions, + /// Multi-fragment, then `optimize` → compacted + reindexed. + Optimized, +} + +impl Morph { + pub const ALL: [Morph; 4] = [ + Morph::Single, + Morph::MultiFragment, + Morph::WithDeletions, + Morph::Optimized, + ]; +} + +/// The base population (before morphology-specific shaping): 10 persons p0..p9 +/// with ages 0..9, a `knows` chain p0→p1→…→p9 (its 9 edges are added by `build`), and 10 docs cycling 4 of the 5 +/// enum sources (never `telegram`, so zero-match is deterministic).
+fn base_rows() -> (Vec<String>, Vec<String>) { + let sources = ["whatsapp", "email", "linkedin", "slack"]; + let mut persons = Vec::new(); + let mut docs = Vec::new(); + for i in 0..10 { + persons.push(person(&format!("p{i}"), i as i64)); + docs.push(doc(&format!("d{i}"), sources[i % 4])); + } + (persons, docs) +} + +/// Build the requested morphology on a fresh graph; returns the live Person +/// count after shaping (the only count the read battery asserts exactly). +pub async fn build(db: &Omnigraph, morph: Morph) -> usize { + let (persons, docs) = base_rows(); + match morph { + Morph::Single => { + let mut all = persons.concat(); + all.push_str(&docs.concat()); + for i in 0..9 { + all.push_str(&knows(&format!("p{i}"), &format!("p{}", i + 1))); + } + load_jsonl(db, &all, LoadMode::Merge).await.unwrap(); + 10 + } + Morph::MultiFragment | Morph::Optimized => { + // Each person + its doc in its own batch → 10 fragments. + for i in 0..10 { + load_jsonl(db, &format!("{}{}", persons[i], docs[i]), LoadMode::Merge) + .await + .unwrap(); + } + for i in 0..9 { + load_jsonl(db, &knows(&format!("p{i}"), &format!("p{}", i + 1)), LoadMode::Merge) + .await + .unwrap(); + } + if matches!(morph, Morph::Optimized) { + db.optimize().await.unwrap(); + } + 10 + } + Morph::WithDeletions => { + for i in 0..10 { + load_jsonl(db, &format!("{}{}", persons[i], docs[i]), LoadMode::Merge) + .await + .unwrap(); + } + for i in 0..9 { + load_jsonl(db, &knows(&format!("p{i}"), &format!("p{}", i + 1)), LoadMode::Merge) + .await + .unwrap(); + } + // Delete p0..p2 → deletion vectors + cascaded edges. + for i in 0..3 { + db.mutate( + "main", + &format!("query d() {{ delete Person where slug = \"p{i}\" }}"), + "d", + &ParamMap::new(), + ) + .await + .unwrap(); + } + 7 + } + } +} + +/// Every read shape as `(name, query)`. Each must execute without an engine +/// error against every morphology.
+pub fn shapes() -> Vec<(&'static str, &'static str)> { + vec![ + ("full-scan", "query q() { match { $p: Person } return { $p.slug } }"), + ("key-filter", "query q() { match { $p: Person { slug: \"p5\" } } return { $p.slug } }"), + ("indexed-filter", "query q() { match { $d: Doc { source: \"whatsapp\" } } return { $d.slug } }"), + ("nonindexed-filter", "query q() { match { $p: Person $p.age >= 5 } return { $p.slug } }"), + ("range", "query q() { match { $p: Person $p.age >= 3 $p.age <= 6 } return { $p.slug } }"), + ("order-limit", "query q() { match { $p: Person } return { $p.slug, $p.age } order { $p.age desc } limit 3 }"), + ("count", "query q() { match { $p: Person } return { count($p) as n } }"), + ("aggregate", "query q() { match { $p: Person } return { sum($p.age) as s, avg($p.age) as a, min($p.age) as mn, max($p.age) as mx } }"), + ("traversal-1hop", "query q() { match { $a: Person $a knows $b } return { $a.slug, $b.slug } }"), + ("var-hop", "query q() { match { $a: Person { slug: \"p3\" } $a knows{1,3} $b } return { $b.slug } }"), + ("negation", "query q() { match { $p: Person not { $p knows $_ } } return { $p.slug } }"), + ("zero-match", "query q() { match { $d: Doc { source: \"telegram\" } } return { $d.slug } }"), + ] +} + +/// Run the whole battery against `branch`; returns `(shape, Ok(row count) | Err(error text))` per shape. +pub async fn run(db: &Omnigraph, branch: &str) -> Vec<(&'static str, Result<usize, String>)> { + let mut out = Vec::new(); + for (name, q) in shapes() { + let r = db + .query(ReadTarget::branch(branch), q, "q", &ParamMap::new()) + .await + .map(|res| res.num_rows()) + .map_err(|e| e.to_string()); + out.push((name, r)); + } + out +} diff --git a/crates/omnigraph/tests/dst/recovery_walk.rs b/crates/omnigraph/tests/dst/recovery_walk.rs new file mode 100644 index 00000000..65ebe96c --- /dev/null +++ b/crates/omnigraph/tests/dst/recovery_walk.rs @@ -0,0 +1,184 @@ +//! Closes the fault-seam gap (see MATRIX.md). The Phase-2 `FaultAdapter` wraps +//! 
`StorageAdapter::write_text_if_match`, which is OFF the manifest-publish hot +//! path (the publish is a Lance `MergeInsertBuilder` CAS), so it cannot induce a +//! pending recovery sidecar. **Failpoints can** — `mutation.post_finalize_pre_publisher` +//! fires after Lance HEAD advances (and the `__recovery/{ulid}.json` sidecar is +//! written) but before the manifest publish, leaving a `RolledPastExpected` +//! sidecar. +//! +//! Gated on `--features failpoints` (the `fail` registry is process-global, so +//! every test here is `#[serial]`). Two cells: +//! 1. recovery rolls forward under a finalize failure — judged by the harness's +//! WHITE-BOX structural battery (additive vs the engine's count-only test). +//! 2. the **#296 cell** — concurrent `Omnigraph::open` racing one pending +//! sidecar must all converge, never `ExpectedVersionMismatch`. This is the +//! named blind spot from MATRIX.md, now sampled. + +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; +use std::time::Duration; + +use fail::FailScenario; +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::failpoints::{ScopedFailPoint, names}; +use omnigraph_compiler::ir::ParamMap; +use serial_test::serial; + +use omnigraph_dst::invariants::{self, Finding, classify}; + +/// The structural (model-free) battery — valid even after a roll-forward +/// changes the visible state, unlike count==model. 
+async fn structural_battery(db: &Omnigraph) -> Vec<(&'static str, Result<(), Finding>)> { + vec![ + ("row-id-unique", invariants::no_duplicate_live_row_ids(db).await), + ("dataset.validate", invariants::dataset_validate(db).await), + ("head==manifest", invariants::head_eq_manifest(db).await), + ("no-dup-@key", invariants::no_duplicate_keys(db, "Person", "main").await), + ] +} + +fn judge(findings: Vec<(&'static str, Result<(), Finding>)>, ctx: &str) { + for (name, res) in findings { + if let Err(f) = res { + match classify(&f) { + Some(_bug) => {} // known open bug — allow-listed + None => panic!("{ctx}: NOVEL violation [{name}]: {}", f.message()), + } + } + } +} + +/// Arm `mutation.post_finalize_pre_publisher`, run one insert that fails after +/// Phase B with that failpoint's error (asserted here; the sidecar count is +/// checked by callers). Leaves the graph at `uri` with a pending `RolledPastExpected` sidecar. +async fn induce_pending_sidecar(uri: &str, slug: &str) { + let db = crate::reopen(uri).await; + let _fp = ScopedFailPoint::new(names::MUTATION_POST_FINALIZE_PRE_PUBLISHER, "return"); + let q = format!("query i() {{ insert Person {{ slug: \"{slug}\", name: \"n\" }} }}"); + let err = db.mutate("main", &q, "i", &ParamMap::new()).await.unwrap_err(); + assert!( + err.to_string().contains("mutation.post_finalize_pre_publisher"), + "expected the finalize failpoint, got: {err}" + ); + // `_fp` + `db` drop at scope end (handle freed for reopen); the sidecar stays on disk +} + +fn sidecar_count(uri: &str) -> usize { + let recov = std::path::Path::new(uri).join("__recovery"); + std::fs::read_dir(&recov) + .map(|rd| rd.filter_map(|e| e.ok()).count()) + .unwrap_or(0) +} + +/// Cell 1: a finalize failure leaves a sidecar; the next open rolls it forward, +/// and the white-box structural battery holds afterward.
+#[tokio::test] +#[serial] +async fn recovery_rolls_forward_under_finalize_failure() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + drop(crate::open_clean(&uri).await); // init the graph + + induce_pending_sidecar(&uri, "eve").await; + assert_eq!(sidecar_count(&uri), 1, "a sidecar must persist for recovery"); + + // Reopen runs the sweep → rolls forward → converges. + let db = crate::reopen(&uri).await; + assert_eq!(sidecar_count(&uri), 0, "sweep must delete the sidecar"); + let n = db + .query( + ReadTarget::branch("main"), + "query q() { match { $x: Person } return { $x.slug } }", + "q", + &ParamMap::new(), + ) + .await + .unwrap() + .num_rows(); + assert_eq!(n, 1, "the rolled-forward insert must be visible"); + judge(structural_battery(&db).await, "post-roll-forward"); +} + +/// Minimal park-first rendezvous (inlined from `tests/helpers/failpoint.rs` so +/// the dst binary doesn't pull the whole helpers module): the FIRST thread to +/// hit `name` parks until `release()`; later arrivals fall through. Bounded so a +/// bug can't hang the suite. 
+struct Rendezvous { + reached: Arc<AtomicBool>, + release: Arc<AtomicBool>, + _fp: ScopedFailPoint, +} +impl Rendezvous { + fn park_first(name: &str) -> Self { + let reached = Arc::new(AtomicBool::new(false)); + let release = Arc::new(AtomicBool::new(false)); + let (r, rl) = (Arc::clone(&reached), Arc::clone(&release)); + let _fp = ScopedFailPoint::with_callback(name, move || { + if r.compare_exchange(false, true, SeqCst, SeqCst).is_ok() { + for _ in 0..6000 { + if rl.load(SeqCst) { + return; + } + std::thread::sleep(Duration::from_millis(5)); + } + } + }); + Self { reached, release, _fp } + } + async fn wait_until_reached(&self) { + for _ in 0..2400 { + if self.reached.load(SeqCst) { + return; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + panic!("rendezvous: failpoint was never reached"); + } + fn release(&self) { + self.release.store(true, SeqCst); + } +} + +/// Cell 2 (the #296 blind spot), forced DETERMINISTICALLY: two concurrent +/// `Omnigraph::open` sweeps race ONE pending sidecar. The first parks at the +/// publish window (after classifying `RolledPastExpected`); the second falls +/// through, rolls the sidecar forward (manifest v→v+1), deletes it. Released, +/// the parked sweep's publish CAS finds the manifest already at goal — it must +/// CONVERGE, not fail the open with `ExpectedVersionMismatch`. This is the +/// generative shape of #296: it fails the open on 0.7.1, converges on 0.7.2.
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn concurrent_opens_converge_on_pending_sidecar() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + drop(crate::open_clean(&uri).await); + + induce_pending_sidecar(&uri, "race").await; + assert_eq!(sidecar_count(&uri), 1); + + let rv = Rendezvous::park_first(names::RECOVERY_BEFORE_ROLL_FORWARD_PUBLISH); + + let uri_parked = uri.clone(); + let parked = tokio::spawn(async move { Omnigraph::open(&uri_parked).await.map(|_| ()) }); + rv.wait_until_reached().await; + + // The second open converges the sidecar, advancing the manifest past the + // parked sweep's pin. + Omnigraph::open(&uri) + .await + .expect("the converging open must roll the sidecar forward and succeed"); + + // Release the parked sweep: its CAS loses at a now-stale expected version and + // must converge, not crash the open. + rv.release(); + parked + .await + .expect("parked open task must not panic") + .expect("the parked sweep must CONVERGE on CAS-loss, not ExpectedVersionMismatch (#296)"); + + assert_eq!(sidecar_count(&uri), 0, "sidecar gone after both converge"); + let db = crate::reopen(&uri).await; + judge(structural_battery(&db).await, "post-concurrent-converge"); +} diff --git a/crates/omnigraph/tests/dst/statemachine.rs b/crates/omnigraph/tests/dst/statemachine.rs new file mode 100644 index 00000000..75b04017 --- /dev/null +++ b/crates/omnigraph/tests/dst/statemachine.rs @@ -0,0 +1,228 @@ +//! B5: proptest-state-machine campaign with automatic SHRINKING + regression +//! persistence, over the CLEAN op subset. +//! +//! proptest-state-machine fails and minimizes on ANY reference↔SUT divergence, +//! so it runs over the ops that have no known open bug — insert-person, +//! insert-doc, update-doc, read (no delete/optimize/edge/concurrency, which +//! trigger RC-1 / RC-X / FTS-OOB / dup-`@key`). 
Over that subset reference and +//! engine must agree exactly; a regression that breaks a clean op auto-minimizes +//! to the shortest failing op sequence and persists its seed under +//! `proptest-regressions/`. The generative walk + the named regressions cover +//! the buggy ops; this layer adds the shrinking machinery. +//! +//! The async engine is bridged the canonical sync way (`proptest_equivalence.rs` +//! pattern): the SUT owns its own `Runtime` (`Runtime::new()`, i.e. tokio's multi-thread scheduler) and every engine call is a +//! `rt.block_on(...)` — the whole campaign is a plain `#[test]`, no ambient +//! runtime, so `block_on` is legal. + +use std::collections::BTreeMap; + +use proptest::prelude::*; +use proptest::strategy::Union; +use proptest::test_runner::Config; +use proptest_state_machine::{ReferenceStateMachine, StateMachineTest}; +use tokio::runtime::Runtime; + +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::ir::ParamMap; + +use omnigraph_dst::op::{doc, person}; + +/// The reference model (also the state-machine `State`): the keys that must +/// exist and each Doc's expected body. `BTreeMap`/`u32` keep `Debug` output +/// stable, which keeps shrink reports deterministic. +#[derive(Clone, Debug, Default)] +pub struct RefModel { + persons: BTreeMap<u32, ()>, + docs: BTreeMap<u32, String>, + next: u32, +} + +/// The clean transitions. Ids are explicit so the reference is a pure function. +#[derive(Clone, Debug)] +pub enum Tr { + InsertPerson(u32), + InsertDoc(u32), + /// (existing doc id, tag) → body `u{id}-{tag}`.
+ UpdateDoc(u32, u32), + Read, +} + +impl ReferenceStateMachine for RefModel { + type State = RefModel; + type Transition = Tr; + + fn init_state() -> BoxedStrategy<RefModel> { + Just(RefModel::default()).boxed() + } + + fn transitions(state: &RefModel) -> BoxedStrategy<Tr> { + let next = state.next; + let mut options: Vec<BoxedStrategy<Tr>> = vec![ + Just(Tr::InsertPerson(next)).boxed(), + Just(Tr::InsertDoc(next)).boxed(), + Just(Tr::Read).boxed(), + ]; + if !state.docs.is_empty() { + let ids: Vec<u32> = state.docs.keys().copied().collect(); + options.push( + (proptest::sample::select(ids), any::<u8>()) // NOTE(review): tag type reconstructed as `u8` — confirm; any unsigned type that `as u32` widens fits + .prop_map(|(id, tag)| Tr::UpdateDoc(id, tag as u32)) + .boxed(), + ); + } + Union::new(options).boxed() + } + + fn apply(mut state: RefModel, transition: &Tr) -> RefModel { + match transition { + Tr::InsertPerson(id) => { + state.persons.insert(*id, ()); + state.next = state.next.max(id + 1); + } + Tr::InsertDoc(id) => { + state.docs.insert(*id, "needle filler".to_string()); + state.next = state.next.max(id + 1); + } + Tr::UpdateDoc(id, tag) => { + // Mirror engine no-op semantics: only an existing doc changes. + if state.docs.contains_key(id) { + state.docs.insert(*id, format!("u{id}-{tag}")); + } + } + Tr::Read => {} + } + state + } +} + +/// The system under test: a real graph plus the runtime that drives it. +pub struct Sut { + rt: Runtime, + db: Omnigraph, + _dir: tempfile::TempDir, +} + +pub struct DstMachine; + +impl Sut { + fn person_count(&self) -> usize { + self.rt.block_on(async { + self.db + .query( + ReadTarget::branch("main"), + "query q() { match { $x: Person } return { $x.slug } }", + "q", + &ParamMap::new(), + ) + .await + .unwrap() + .num_rows() + }) + } + + /// id → body for every live Doc.
+ fn doc_bodies(&self) -> BTreeMap<u32, String> { + self.rt.block_on(async { + let res = self + .db + .query( + ReadTarget::branch("main"), + "query c() { match { $d: Doc } return { $d.slug, $d.body } }", + "c", + &ParamMap::new(), + ) + .await + .unwrap(); + let json = res.to_rust_json(); + let mut out = BTreeMap::new(); + for row in json.as_array().unwrap() { + let slug = row["d.slug"].as_str().unwrap(); + let body = row["d.body"].as_str().unwrap().to_string(); + let id: u32 = slug.strip_prefix('g').unwrap().parse().unwrap(); + out.insert(id, body); + } + out + }) + } +} + +impl StateMachineTest for DstMachine { + type SystemUnderTest = Sut; + type Reference = RefModel; + + fn init_test(_ref_state: &RefModel) -> Sut { + let rt = Runtime::new().unwrap(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let db = rt.block_on(crate::open_clean(&uri)); + Sut { + rt, + db, + _dir: dir, + } + } + + fn apply(sut: Sut, _ref_state: &RefModel, transition: Tr) -> Sut { + sut.rt.block_on(async { + match transition { + Tr::InsertPerson(id) => { + load_jsonl(&sut.db, &person(&format!("g{id}")), LoadMode::Merge) + .await + .unwrap(); + } + Tr::InsertDoc(id) => { + load_jsonl(&sut.db, &doc(&format!("g{id}"), "whatsapp"), LoadMode::Merge) + .await + .unwrap(); + } + Tr::UpdateDoc(id, tag) => { + let q = format!( + "query u() {{ update Doc set {{ body: \"u{id}-{tag}\" }} where slug = \"g{id}\" }}" + ); + sut.db.mutate("main", &q, "u", &ParamMap::new()).await.unwrap(); + } + Tr::Read => { + sut.db + .query( + ReadTarget::branch("main"), + "query w() { match { $d: Doc { source: \"whatsapp\" } } return { $d.slug } }", + "w", + &ParamMap::new(), + ) + .await + .unwrap(); + } + } + }); + sut + } + + fn check_invariants(sut: &Sut, ref_state: &RefModel) { + // count==model and content==model against the reference, plus structural + // row-id uniqueness — all must hold exactly over the clean op subset.
+ assert_eq!( + sut.person_count(), + ref_state.persons.len(), + "Person count diverged from reference" + ); + assert_eq!( + sut.doc_bodies(), + ref_state.docs, + "Doc id→body diverged from reference (lost/dup/stale write)" + ); + sut.rt + .block_on(omnigraph_dst::invariants::no_duplicate_live_row_ids(&sut.db)) + .expect("duplicate live stable row-id"); + } +} + +proptest_state_machine::prop_state_machine! { + #![proptest_config(Config { cases: 24, .. Config::default() })] + + /// Over the clean op subset, the engine must track the reference for any + /// generated sequence of 1..30 ops; a divergence auto-shrinks + persists. + #[test] + fn clean_ops_track_reference(sequential 1..30 => DstMachine); +} diff --git a/crates/omnigraph/tests/dst_recovery.rs b/crates/omnigraph/tests/dst_recovery.rs new file mode 100644 index 00000000..65c31e53 --- /dev/null +++ b/crates/omnigraph/tests/dst_recovery.rs @@ -0,0 +1,27 @@ +//! Failpoint-gated DST cells, in their OWN binary so the process-global `fail` +//! registry cannot leak armed failpoints into the main `dst` suite's (parallel, +//! non-serial) generative walks. Run with `--features failpoints`; empty +//! otherwise. The reusable harness primitives (op alphabet, white-box +//! invariants, …) come from the shared `omnigraph-dst` crate; this binary adds +//! only the raw-handle helpers + the recovery cells. +//! +//! See `dst/recovery_walk.rs` for the cells; see `dst/MATRIX.md` for why this +//! closes the fault-seam gap. +#![cfg(feature = "failpoints")] +#![allow(dead_code)] + +use omnigraph::db::Omnigraph; + +#[path = "dst/recovery_walk.rs"] +mod recovery_walk; + +/// A raw embedded graph — the recovery cells drive the `Omnigraph` handle +/// directly (white-box), so they don't go through the `Backend` abstraction. 
+async fn open_clean(uri: &str) -> Omnigraph { + Omnigraph::init(uri, omnigraph_dst::op::SCHEMA) + .await + .expect("init") +} +async fn reopen(uri: &str) -> Omnigraph { + Omnigraph::open(uri).await.expect("reopen") +} diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 1b480f47..ebf8ae8e 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -7,7 +7,7 @@ This file is the always-on map of the test surface. **Consult it before every ta | Crate | Path | Style | |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (28 files), fixture-driven, share `tests/helpers/mod.rs` | -| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | Per-area suites (post-modularization): `cli_cluster.rs` (cluster command surface + operator-actor cascade), `cli_cluster_e2e.rs` (spawned-binary lifecycle compositions — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `cli_data.rs` (load/read/change/branch/commit/export/snapshot/policy/embed/maintenance + operator format cascade), `cli_schema_config.rs` (init/config, schema plan/apply), `cli_queries.rs`, `parity_matrix.rs` (RFC-009 Phase 1: the embedded-vs-remote referee — every forked verb run against both arms with matched Cedar policy and the same actor, scrubbed-JSON + exit-code equality; divergences are pinned in its `KNOWN_DIVERGENCES` ledger, never silently repaired), `system_local.rs` (full-cycle cluster lifecycle with a spawned `--cluster` server, applied-policy enforcement over HTTP, keyed-credential auth, operator aliases), `system_remote.rs`; share `tests/support/mod.rs` (hermetic `OMNIGRAPH_HOME` by default) | +| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | Per-area suites (post-modularization): `cli_cluster.rs` (cluster command surface + operator-actor cascade), `cli_cluster_e2e.rs` (spawned-binary lifecycle compositions — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph 
mixed-disposition convergence), `cli_data.rs` (load/read/change/branch/commit/export/snapshot/policy/embed/maintenance + operator format cascade), `cli_schema_config.rs` (init/config, schema plan/apply), `cli_queries.rs`, `parity_matrix.rs` (RFC-009 Phase 1: the embedded-vs-remote referee — every forked verb run against both arms with matched Cedar policy and the same actor, scrubbed-JSON + exit-code equality; divergences are pinned in its `KNOWN_DIVERGENCES` ledger, never silently repaired), `cli_dst_parity.rs` (DST cross-backend smoke — a DST-flavored op SEQUENCE [init → 2× `load --merge` → insert → edge-free delete → query] run against the **embedded SDK** and the **CLI subprocess** on twin local graphs must agree on the final slug set; fills the gap parity_matrix leaves — the embedded arm + multi-op sequences; reuses `support::cli()`), `cli_cross_backend_walk.rs` (PR-E — the FULL generative DST walk: the same seeded op stream from the shared `omnigraph-dst` crate drives the embedded `Omnigraph` SDK AND the `omnigraph` CLI subprocess in lockstep, asserting per-step black-box agreement on Person/Doc slug sets + traversable edge count + op-success parity; the white-box battery stays embedded-only, so the CLI arm runs the black-box oracles; one known contract difference allow-listed — `repair --confirm` exits non-zero on suspicious drift where embedded `repair(force=false)` returns Ok), `system_local.rs` (full-cycle cluster lifecycle with a spawned `--cluster` server, applied-policy enforcement over HTTP, keyed-credential auth, operator aliases), `system_remote.rs`; share `tests/support/mod.rs` (hermetic `OMNIGRAPH_HOME` by default) | | `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated); `tests/s3_cluster.rs` (bucket-gated full lifecycle on object storage) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph 
observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill), and the 5B serving-snapshot read API (converged read, refusal rows) | | `omnigraph-server` | `crates/omnigraph-server/tests/` | Per-area suites (post-modularization): `auth_policy.rs`, `data_routes.rs`, `schema_routes.rs`, `stored_queries.rs`, `multi_graph.rs` (cluster-mode boot — converged serving, policy binding wiring, boot refusals — + the concurrent branch-ops matrix), `boot_settings.rs` (mode inference, PolicySource), `s3.rs` (bucket-gated: single-graph serving + config-free `--cluster s3://` boot), `openapi.rs` (OpenAPI drift / regeneration); share `tests/support/mod.rs` | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | @@ -49,6 +49,8 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). 
Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`) and the write-entry in-process heal contract (the four `*_after_finalize_publisher_failure_heals_without_reopen` tests — load, mutation, schema apply, branch merge: a follow-up write on the same handle rolls a sidecar-covered residual forward without reopen/refresh) and the storage-fault matrix for the sidecar lifecycle (`recovery.sidecar_{write,delete,list}` / `recovery.record_audit` failpoints: Phase A put failure aborts with zero drift, Phase D delete failure is swallowed and healed by the next write, list failures are loud at heal and open, audit-append failures are retried to exactly one audit row; plus the bucket-gated `s3_load_recovers_after_publisher_failure_without_reopen`). Also the v3→v4 migration fault-injection test (`transient_legacy_open_failure_aborts_migration_without_stamping_v4`, `migration.v3_to_v4.legacy_open` failpoint): a transient legacy-open failure aborts the migration loudly and leaves it retryable (stamp stays v3, no partial backfill), never stamping v4 over an empty backfill. Also the v4 stamp-bump exhaustion regression (`v4_stamp_exhaustion_returns_retryable_contention`, `migration.v4_stamp.force_incompatible` failpoint): the stamp retry loop surfaces a retryable `RowLevelCasContention` on exhaustion, not a stringified `Lance`. And the convergence-idempotent roll-forward regression (`open_sweep_roll_forward_converges_when_manifest_advances_concurrently`: two concurrent open-sweeps race one sidecar at the `recovery.before_roll_forward_publish` rendezvous; the CAS loser must converge, not fail the open — iss-schema-apply-reopen-recovery-race). 
| | `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path | | `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories, post-optimize and post-cleanup strict writes). | +| `dst.rs` (+ `dst/`) | **Deterministic-simulation (DST) / morphological-matrix harness** (iss-784/epc-783). A seeded generative walk drives the engine through an op alphabet (`dst/op.rs`), runs a white-box invariant battery after every op (`dst/invariants.rs`: HEAD==manifest, `Dataset::validate`, **unique live `_rowid`** [RC-X corruption class], scalar-index probe, count/content/edges==model, `@key` uniqueness) against a reference `model`, and classifies each finding as a KNOWN open bug (allow-listed) or NOVEL (fails). Includes: characterization regressions pinning the 3 known bugs (`regression_rc1_…`, `regression_rc_x_…`, `regression_dup_key_…`) + 1 harness-surfaced finding (`regression_self_loop_not_traversable`); a `FaultAdapter` (`dst/fault.rs`) `StorageAdapter` wrapper; a `proptest-state-machine` shrinking campaign (`dst/statemachine.rs`); a D3 read-shape × D2 morphology battery (`dst/readshape.rs`); a concurrent multi-actor walk; parser/loader fuzz (`dst/fuzz.rs`, proptest dup/malformed-injecting); an env-gated S3-context battery (`s3_battery_holds`); and the `--features dst` replay-equality oracle (`replay_equality_same_seed`, see the `dst` feature below). 
Coverage ledger + the "why exhaustive sampling is impossible" rationale live in `dst/MATRIX.md`. | +| `dst_recovery.rs` | **Failpoint-gated** DST recovery cells (own binary so the process-global `fail` registry can't leak armed failpoints into the parallel `dst` walks — run with `--features failpoints`): the `mutation.post_finalize_pre_publisher` failpoint induces a real `RolledPastExpected` sidecar (the StorageAdapter `FaultAdapter` is off the manifest-publish path and cannot), which is then exercised by `recovery_rolls_forward_under_finalize_failure` (white-box battery after roll-forward) and `concurrent_opens_converge_on_pending_sidecar` (an inline park-first rendezvous forces two open-sweeps to race one sidecar — the CAS loser must converge; the generative shape of the #296 fix). | ## Fixtures @@ -71,6 +73,24 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav - **Serialize and rendezvous, never sleep.** The `fail` registry is process-global, so every failpoint test carries `#[serial]` (`serial_test`). For concurrent tests, use `helpers::failpoint::Rendezvous` (`tests/helpers/failpoint.rs`): `park_first(name)` parks the first thread to hit the point until `release()`, and `wait_until_reached().await` blocks on that condition (it doubles as a fired-assertion). Do not coordinate threads with fixed `sleep`s. - Activated tests: `crates/omnigraph/tests/failpoints.rs` and `crates/omnigraph-cluster/tests/failpoints.rs` (integration binaries, never in-source — the fail registry is process-global). Run with `cargo test -p omnigraph-engine --features failpoints --test failpoints` / `cargo test -p omnigraph-cluster --features failpoints --test failpoints`.
+## Running the DST harness + +The DST harness (`dst.rs` + `dst/`, above) has several run modes; each is its own CI job, none part of the default `cargo test`: + +```bash +cargo test -p omnigraph-engine --test dst # the generative walks + battery + regressions + fuzz (default; S3 cell skips) +cargo test -p omnigraph-engine --features dst --test dst # + the replay-equality oracle (the `dst` determinism feature, below) +cargo test -p omnigraph-engine --features failpoints --test dst_recovery # the recovery / #296-class cells +cargo test -p omnigraph-cli --test cli_dst_parity # the embedded-vs-CLI cross-backend smoke (fixed sequence) +cargo test -p omnigraph-cli --test cli_cross_backend_walk # the FULL generative embedded-vs-CLI walk (PR-E) +OMNIGRAPH_S3_TEST_BUCKET=… cargo test -p omnigraph-engine --test dst s3_battery_holds # the S3-context battery +``` + +- **The `omnigraph-dst` crate** (`crates/omnigraph-dst`, dev-only, NOT a workspace default-member) holds the reusable harness primitives — the op alphabet, reference `model`, white-box invariant battery, the `FaultAdapter`, and the `Backend` trait with its `Embedded` (in-process) and `Cli` (subprocess) impls — so the SAME seeded walk can run embedded (engine `dst.rs`) AND cross-backend (`cli_cross_backend_walk.rs`). It depends on `omnigraph-engine` and is itself listed under `[dev-dependencies]` in both the engine and the CLI: a dev-dependency cycle cargo supports (the lib builds without dev-deps). The white-box battery needs the real `Omnigraph` handle (via `Embedded::db()`), so it is embedded-only; a cross-backend walk runs the black-box oracles (count/content/edges==model via `Backend::query`). + +- **The `dst` feature** (`dst = []` in `crates/omnigraph/Cargo.toml`) is a TEST-ONLY deterministic-replay seam, structured exactly like `failpoints` (zero production cost — the `#[cfg(feature="dst")]` blocks compile away).
`src/dst.rs` exposes `next_ulid()`/`now_micros()` that the engine's observable ULID/timestamp sites call instead of `Ulid::new()`/`SystemTime::now()`; without the feature (or with no provider in scope via `dst::with_seed`) they fall back to the real source. The harness uses it for `replay_equality_same_seed` (same seed → identical engine id/clock fingerprint + outcome). Bit-identical replay is **engine-layer only** — Lance internals (compaction/index ids, threadpool scheduling) stay non-deterministic. +- See `crates/omnigraph/tests/dst/MATRIX.md` for the coverage ledger (which D1–D5 cells are sampled, the hidden dimensions found by checking against externally-fixed bugs like #296, and what's deferred). + ## RustFS / S3 integration CI runs these S3-backed tests against a containerized RustFS server (`.github/workflows/ci.yml` → `rustfs_integration` job): @@ -81,6 +101,7 @@ CI runs these S3-backed tests against a containerized RustFS server (`.github/wo - `cargo test -p omnigraph-cluster --test s3_cluster` (full control-plane lifecycle on the bucket) - `cargo test -p omnigraph-cli --test system_local local_cli_s3_end_to_end_init_load_read_flow` - `cargo test -p omnigraph-engine --features failpoints --test failpoints s3_` (recovery-sidecar lifecycle on a real bucket) +- `cargo test -p omnigraph-engine --test dst s3_battery_holds` (the DST white-box battery on a real object store) Locally, set `OMNIGRAPH_S3_TEST_BUCKET` (and the usual `AWS_*` vars including `AWS_ENDPOINT_URL_S3` for non-AWS) before running. Without those, S3 tests skip gracefully.