diff --git a/crates/omnigraph/Cargo.toml b/crates/omnigraph/Cargo.toml index 2fe85886..371c4a93 100644 --- a/crates/omnigraph/Cargo.toml +++ b/crates/omnigraph/Cargo.toml @@ -56,6 +56,10 @@ omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.1" } tokio = { workspace = true } lance-namespace-impls = { workspace = true } lance-io = "7.0.0" +arrow-array = { workspace = true } +futures = { workspace = true } +serde_json = { workspace = true } serial_test = "3" proptest = "1" +proptest-state-machine = "0.8" async-trait = { workspace = true } diff --git a/crates/omnigraph/tests/dst.rs b/crates/omnigraph/tests/dst.rs index bfdfb88e..f3feb164 100644 --- a/crates/omnigraph/tests/dst.rs +++ b/crates/omnigraph/tests/dst.rs @@ -18,8 +18,10 @@ //! bug is fixed: RC-X → Lance #7230 (Lance v8.0.0); RC-1 → stale-view manifest //! CAS on delete op-class combos; dup-@key → MR-714. +use std::panic::AssertUnwindSafe; use std::sync::Arc; +use futures::FutureExt; use omnigraph::db::ReadTarget; use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph_compiler::ir::ParamMap; @@ -36,6 +38,10 @@ mod invariants; mod coverage; #[path = "dst/backend.rs"] mod backend; +#[path = "dst/readshape.rs"] +mod readshape; +#[path = "dst/statemachine.rs"] +mod statemachine; use coverage::Coverage; use invariants::{Finding, classify, run_battery}; @@ -181,6 +187,254 @@ async fn regression_dup_key_under_concurrency() { ); } +/// FINDING (harness-surfaced, distinct from the 3 above): a `Knows` SELF-LOOP is +/// committed to the edge table but is NOT returned by `$a knows $b` traversal — +/// durable across optimize and reopen, and the CSR build keeps it (so the drop +/// is in Expand). `proptest_equivalence` can't catch it: it only asserts +/// CSR-vs-indexed MODE equivalence, and both modes drop the self-loop alike. The +/// model-based edges==model oracle caught it; self-loops are kept out of the +/// generic generator so that oracle stays unambiguous. This characterization +/// guard breaks (forcing review) when self-loops become traversable. +#[tokio::test] +async fn regression_self_loop_not_traversable() { + let dir = tempfile::tempdir().unwrap(); + let db = backend::open_clean(dir.path().to_str().unwrap()).await; + load_jsonl(&db, &person("s0"), LoadMode::Merge).await.unwrap(); + load_jsonl(&db, &knows("s0", "s0"), LoadMode::Merge).await.unwrap(); + + // The self-loop row is durably committed to the edge table. + let snap = db.snapshot_of(ReadTarget::branch("main")).await.unwrap(); + let mut raw = 0; + for entry in snap.entries() { + if entry.table_key == "edge:Knows" { + raw = snap + .open(&entry.table_key) + .await + .unwrap() + .count_rows(None) + .await + .unwrap(); + } + } + assert_eq!(raw, 1, "self-loop edge row should be committed"); + + // ...but traversal does not return it (the finding). + let trav = db + .query( + ReadTarget::branch("main"), + "query e() { match { $a: Person $a knows $b } return { $a.slug, $b.slug } }", + "e", + &ParamMap::new(), + ) + .await + .unwrap() + .num_rows(); + assert_eq!( + trav, 0, + "self-loop appears FIXED (traversable={trav}) — flip this regression to expect 1" + ); +} + +// ═══════════════════════ concurrency (multi-actor) ════════════════════════ + +/// Seeded N-actor concurrent walk over a SHARED graph, with an OVERLAPPING key +/// space so same-key upserts race. Under concurrency the sequential reference +/// model doesn't apply, so the oracle is the interleaving-INVARIANT subset: +/// unique live row-ids, `Dataset::validate`, HEAD==manifest, and `@key` +/// uniqueness. dup-`@key` (MR-714) and RC-1 drift are allow-listed knowns; any +/// other divergence is a novel concurrency bug and fails. A Lance panic inside +/// an actor is contained by `tokio::spawn` (surfaces as a JoinError we ignore), +/// so the harness stays up — the post-join battery is the judge. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_walk_structural_invariants() { + let mut reproduced: Vec = Vec::new(); + for seed in 0..3u64 { + let dir = tempfile::tempdir().unwrap(); + let db = Arc::new(backend::open_clean(dir.path().to_str().unwrap()).await); + let actors = 4usize; + let mut handles = Vec::new(); + for a in 0..actors { + let db = db.clone(); + handles.push(tokio::spawn(async move { + let mut rng = Rng::new(seed.wrapping_mul(131) + a as u64); + for _ in 0..20 { + let k = rng.below(30); // shared 0..30 key space → races + match rng.below(5) { + 0 | 1 => { + let _ = load_jsonl(&db, &person(&format!("h{k}")), LoadMode::Merge).await; + } + 2 => { + let q = format!( + "query u() {{ update Person set {{ name: \"x{k}\" }} where slug = \"h{k}\" }}" + ); + let _ = db.mutate("main", &q, "u", &ParamMap::new()).await; + } + 3 => { + let q = format!("query d() {{ delete Person where slug = \"h{k}\" }}"); + let _ = db.mutate("main", &q, "d", &ParamMap::new()).await; + } + _ => { + let _ = db.optimize().await; + } + } + } + })); + } + for h in handles { + let _ = h.await; // ignore JoinError: a contained actor panic is judged by the battery + } + + let checks: Vec<(&str, Result<(), Finding>)> = vec![ + ("row-id-unique", invariants::no_duplicate_live_row_ids(&db).await), + ("dataset.validate", invariants::dataset_validate(&db).await), + ("head==manifest", invariants::head_eq_manifest(&db).await), + ("no-dup-@key", invariants::no_duplicate_keys(&db, "Person", "main").await), + ]; + for (name, res) in checks { + if let Err(f) = res { + match classify(&f) { + Some(bug) => reproduced.push(format!("seed={seed} [{name}] -> {bug}")), + None => panic!( + "seed={seed}: NOVEL concurrent violation [{name}]: {}", + f.message() + ), + } + } + } + } + eprintln!( + "[dst] concurrent walk: {} known-bug instance(s), 0 novel violations", + reproduced.len() + ); + for r in &reproduced { + eprintln!(" - {r}"); + } +} + +// ═══════════════════════ branch isolation + merge ═════════════════════════ + +async fn count_persons(db: &omnigraph::db::Omnigraph, branch: &str) -> usize { + db.query( + ReadTarget::branch(branch), + "query q() { match { $x: Person } return { $x.slug } }", + "q", + &ParamMap::new(), + ) + .await + .unwrap() + .num_rows() +} + +async fn person_exists(db: &omnigraph::db::Omnigraph, branch: &str, slug: &str) -> usize { + let q = + format!("query p() {{ match {{ $x: Person {{ slug: \"{slug}\" }} }} return {{ $x.slug }} }}"); + db.query(ReadTarget::branch(branch), &q, "p", &ParamMap::new()) + .await + .unwrap() + .num_rows() +} + +/// D4 oracles for the branch subsystem (the deferred B2 branch_isolation + +/// merge_correctness): a branch must not observe `main` writes made after it +/// forked, `main` must not observe branch-only writes, and a merge must produce +/// the row-level union. Kept as a focused scenario (not the generic per-op walk) +/// so the reference model stays single-branch and unambiguous. +#[tokio::test] +async fn branch_isolation_and_merge() { + let dir = tempfile::tempdir().unwrap(); + let db = backend::open_clean(dir.path().to_str().unwrap()).await; + let mut seed = String::new(); + for i in 0..5 { + seed.push_str(&person(&format!("p{i}"))); + } + load_jsonl(&db, &seed, LoadMode::Merge).await.unwrap(); + + db.branch_create("feature").await.unwrap(); + + // Diverge: a post-fork write on each side. + db.mutate( + "main", + "query i() { insert Person { slug: \"m-only\", name: \"n\" } }", + "i", + &ParamMap::new(), + ) + .await + .unwrap(); + db.mutate( + "feature", + "query i() { insert Person { slug: \"f-only\", name: \"n\" } }", + "i", + &ParamMap::new(), + ) + .await + .unwrap(); + + // branch_isolation: neither side sees the other's post-fork write. + assert_eq!( + person_exists(&db, "feature", "m-only").await, + 0, + "isolation: feature observed a post-fork main write" + ); + assert_eq!( + person_exists(&db, "main", "f-only").await, + 0, + "isolation: main observed a feature-only write" + ); + assert_eq!(person_exists(&db, "feature", "f-only").await, 1); + assert_eq!(person_exists(&db, "main", "m-only").await, 1); + + // merge_correctness: main converges to the row-level union (p0..p4 + both). + let outcome = db.branch_merge("feature", "main").await.unwrap(); + let total = count_persons(&db, "main").await; + assert_eq!( + total, 7, + "merge: expected the 7-person union, got {total} (outcome {outcome:?})" + ); + assert_eq!( + person_exists(&db, "main", "f-only").await, + 1, + "merge: feature's row was not merged into main" + ); +} + +// ═══════════════════════ D3 read shapes × D2 morphology ═══════════════════ + +/// Every read shape must execute (no engine error / panic) against every table +/// morphology — the D2×D3 cell sweep — plus the morphology-invariant counts +/// (full-scan == live persons, zero-match == 0) must hold, and the shapes must +/// run against a forked branch too. +#[tokio::test] +async fn read_shape_battery_across_morphologies() { + for morph in readshape::Morph::ALL { + let dir = tempfile::tempdir().unwrap(); + let db = omnigraph::db::Omnigraph::init(dir.path().to_str().unwrap(), readshape::SCHEMA) + .await + .unwrap(); + let expected_persons = readshape::build(&db, morph).await; + for (name, res) in readshape::run(&db, "main").await { + let rows = + res.unwrap_or_else(|e| panic!("morph {morph:?} shape [{name}] errored: {e}")); + if name == "full-scan" { + assert_eq!(rows, expected_persons, "morph {morph:?} full-scan count"); + } + if name == "zero-match" { + assert_eq!(rows, 0, "morph {morph:?} zero-match must be empty"); + } + } + } + + // on-branch morphology: every shape must execute against a forked branch. + let dir = tempfile::tempdir().unwrap(); + let db = omnigraph::db::Omnigraph::init(dir.path().to_str().unwrap(), readshape::SCHEMA) + .await + .unwrap(); + readshape::build(&db, readshape::Morph::MultiFragment).await; + db.branch_create("feature").await.unwrap(); + for (name, res) in readshape::run(&db, "feature").await { + res.unwrap_or_else(|e| panic!("on-branch shape [{name}] errored: {e}")); + } +} + // ═══════════════════════════ generative walk ══════════════════════════════ /// Clean walk: full white-box battery after every op; novel violations fail. @@ -199,7 +453,7 @@ async fn seeded_op_loop_with_cas_faults() { async fn run_walk(faults: bool) { let mut cov = Coverage::new(); let mut reproduced: Vec = Vec::new(); - for seed in 0..2u64 { + for seed in 0..4u64 { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); let db = if faults { @@ -210,8 +464,29 @@ async fn run_walk(faults: bool) { let mut rng = Rng::new(seed); let mut model = Model::new(); - 'walk: for step_i in 0..15 { - let (kind, res) = op::step(&db, &mut rng, &mut model).await; + 'walk: for step_i in 0..25 { + // A fault-injection / depth DST harness must treat a substrate PANIC + // (a Lance `unwrap`/index crash unwinding through the engine) as a + // finding, not a suite abort — so the op and the battery run under + // catch_unwind, and a known crash signature is classified like any + // other known bug (record + break); a novel panic re-raises. + let stepped = AssertUnwindSafe(op::step(&db, &mut rng, &mut model)) + .catch_unwind() + .await; + let (kind, res) = match stepped { + Ok(kr) => kr, + Err(p) => { + let msg = invariants::panic_message(&*p); + match invariants::classify_panic(&msg) { + Some(bug) => { + cov.finding(bug); + reproduced.push(format!("seed={seed} step={step_i} PANIC -> {bug}")); + break 'walk; + } + None => panic!("seed={seed} step={step_i}: NOVEL panic: {msg}"), + } + } + }; cov.op(kind); if let Err(e) = res { let f = Finding::Engine(e); @@ -227,7 +502,21 @@ async fn run_walk(faults: bool) { None => panic!("seed={seed} step={step_i}: NOVEL op error: {}", f.message()), } } - for (name, res) in run_battery(&db, &model).await { + let battery = match AssertUnwindSafe(run_battery(&db, &model)).catch_unwind().await { + Ok(b) => b, + Err(p) => { + let msg = invariants::panic_message(&*p); + match invariants::classify_panic(&msg) { + Some(bug) => { + cov.finding(bug); + reproduced.push(format!("seed={seed} step={step_i} battery PANIC -> {bug}")); + break 'walk; + } + None => panic!("seed={seed} step={step_i}: NOVEL battery panic: {msg}"), + } + } + }; + for (name, res) in battery { cov.invariant(name); if let Err(f) = res { match classify(&f) { @@ -244,9 +533,32 @@ async fn run_walk(faults: bool) { } } } + + // ── reopen==pre_state: durability oracle ── + // A fresh handle on the same bytes (clean adapter; the open-time recovery + // sweep runs) must agree with the model the walk built. count==model / + // content==model prove the data survived; head==manifest / row-id-disjoint + // prove the sweep left no residual drift. Reuses the same battery at + // durability time, so no separate coverage cell. + drop(db); + let reopened = backend::reopen(uri).await; + for (name, res) in run_battery(&reopened, &model).await { + if let Err(f) = res { + match classify(&f) { + Some(bug) => { + cov.finding(bug); + reproduced.push(format!("seed={seed} [reopen/{name}] -> {bug}")); + } + None => panic!( + "seed={seed}: NOVEL post-reopen violation [{name}]: {}", + f.message() + ), + } + } + } } eprintln!( - "[dst] walk(faults={faults}): coverage [{}]; {} known-bug instance(s), 0 novel violations", + "[dst] walk(faults={faults}): coverage [{}]; {} known-bug instance(s), 0 novel violations (+reopen durability gate)", cov.report(), reproduced.len() ); @@ -254,3 +566,4 @@ async fn run_walk(faults: bool) { eprintln!(" - {r}"); } } + diff --git a/crates/omnigraph/tests/dst/backend.rs b/crates/omnigraph/tests/dst/backend.rs index e1d54d59..91638dc7 100644 --- a/crates/omnigraph/tests/dst/backend.rs +++ b/crates/omnigraph/tests/dst/backend.rs @@ -11,6 +11,13 @@ pub async fn open_clean(uri: &str) -> Omnigraph { Omnigraph::init(uri, SCHEMA).await.expect("init") } +/// Reopen an EXISTING graph from its bytes on disk (runs the open-time recovery +/// sweep in read-write mode). Backs the reopen==pre_state durability oracle: a +/// fresh handle must agree with the model the walk built. +pub async fn reopen(uri: &str) -> Omnigraph { + Omnigraph::open(uri).await.expect("reopen") +} + /// A graph whose storage injects seeded manifest-layer faults (CAS-lost). The /// graph + schema are created cleanly first, then reopened through the /// `FaultAdapter` so only the op workload runs under faults. diff --git a/crates/omnigraph/tests/dst/coverage.rs b/crates/omnigraph/tests/dst/coverage.rs index eb1ed65a..4fea7ab0 100644 --- a/crates/omnigraph/tests/dst/coverage.rs +++ b/crates/omnigraph/tests/dst/coverage.rs @@ -28,7 +28,7 @@ impl Coverage { } pub fn report(&self) -> String { format!( - "ops {}/{}, invariants {}/4, known-bugs exercised {}", + "ops {}/{}, invariants {}/7, known-bugs exercised {}", self.ops.len(), OpKind::ALL.len(), self.invariants.len(), diff --git a/crates/omnigraph/tests/dst/invariants.rs b/crates/omnigraph/tests/dst/invariants.rs index 455f12b1..8f1d7c75 100644 --- a/crates/omnigraph/tests/dst/invariants.rs +++ b/crates/omnigraph/tests/dst/invariants.rs @@ -11,10 +11,15 @@ //! always a real finding (the named regressions cover the known concurrency //! cases separately). +use std::collections::HashSet; + +use arrow_array::{RecordBatch, UInt64Array}; +use futures::TryStreamExt; use omnigraph::db::{Omnigraph, ReadTarget}; use omnigraph::error::OmniError; use omnigraph_compiler::ir::ParamMap; +#[derive(Debug)] pub enum Finding { /// An engine-returned error (variant preserved for structured classification). Engine(OmniError), @@ -37,10 +42,15 @@ pub fn classify(f: &Finding) -> Option<&'static str> { match f { Finding::Engine(e) => { let s = e.to_string(); - // RC-1: stale-view manifest CAS — gated on the Manifest variant so a - // coincidental substring in another subsystem cannot mask it. + // RC-1: edge-table HEAD/manifest drift from the node-delete cascade. + // Gated on the Manifest variant so a coincidental substring elsewhere + // can't mask it. Two surfaces of the same root: the write-time CAS + // "stale view", and a LATER write's precondition refusing the + // uncovered drift ("ahead of manifest ... run omnigraph repair"). if matches!(e, OmniError::Manifest(_)) - && (s.contains("stale view") || (s.contains("expected") && s.contains("current"))) + && (s.contains("stale view") + || (s.contains("expected") && s.contains("current")) + || s.contains("ahead of manifest version")) { return Some("RC-1 stale-view"); } @@ -51,11 +61,49 @@ pub fn classify(f: &Finding) -> Option<&'static str> { } None } - // Structural divergences are never allow-listed. - Finding::Logical(_) => None, + // Structural divergences are novel by default, with ONE narrow + // exception: dup-`@key` (MR-714) is a known open bug, so a finding whose + // message carries the `dup-@key` marker is allow-listed like the engine + // known-bugs above. Every other structural divergence stays novel. + Finding::Logical(s) => { + if s.contains("dup-@key") { + return Some("dup-@key MR-714"); + } + None + } } } +/// Extract a readable message from a caught panic payload. +pub fn panic_message(p: &(dyn std::any::Any + Send)) -> String { + if let Some(s) = p.downcast_ref::<&str>() { + (*s).to_string() + } else if let Some(s) = p.downcast_ref::() { + s.clone() + } else { + "".to_string() + } +} + +/// Classify a caught panic (a Lance-internal `unwrap`/index crash that unwinds +/// through the engine) the same KNOWN-vs-NOVEL way as `classify`. Under fault +/// injection and at walk depth the substrate WILL crash, so the harness must +/// treat a panic as a finding, not let it abort the suite. NONE → re-raise. +/// +/// Note on `index out of bounds`: in THIS harness the only source of that panic +/// is Lance's inverted (FTS) index builder — the harness's own code uses checked +/// loops and returns `Logical` findings, never an OOB panic — so the broad match +/// is safe here and would not mask a harness bug. +pub fn classify_panic(msg: &str) -> Option<&'static str> { + if msg.contains("from_sorted_iter") || msg.contains("non-sorted input") { + return Some("RC-X/#7230 scalar-BTREE"); + } + if msg.contains("index out of bounds") { + return Some("Lance FTS inverted-builder OOB"); + } + None +} + // ── INVARIANT #1: Lance HEAD == manifest table version, per table ── pub async fn head_eq_manifest(db: &Omnigraph) -> Result<(), Finding> { let snap = db @@ -92,6 +140,58 @@ pub async fn dataset_validate(db: &Omnigraph) -> Result<(), Finding> { Ok(()) } +// ── INVARIANT #3a: no duplicate LIVE stable row-id within a table ── +// A stable row id uniquely identifies one live row for the dataset's lifetime; +// the same id appearing on two live rows is exactly the duplicate-row-address +// corruption class behind RC-X / Lance #7230. We read the truth deletion-vector- +// correctly by scanning each table with `with_row_id()` (Lance's `_rowid` +// projection returns only LIVE rows, so a tombstoned id masked by an UPDATE or +// compaction never counts) and asserting the live `_rowid`s are unique. Unlike +// `index_probe` (which only surfaces a bad scalar-BTREE page when a filtered +// READ loads it), this is a direct structural check on every committed row. +// +// (An earlier version compared raw `row_id_meta` fragment ranges, but those +// ranges include tombstoned ids — after an UPDATE+compaction an old fragment's +// range legitimately overlaps the new fragment's, so that check false-positived. +// Scanning live `_rowid`s is the deletion-vector-aware form.) +pub async fn no_duplicate_live_row_ids(db: &Omnigraph) -> Result<(), Finding> { + let snap = db + .snapshot_of(ReadTarget::branch("main")) + .await + .map_err(Finding::Engine)?; + for entry in snap.entries() { + let ds = snap.open(&entry.table_key).await.map_err(Finding::Engine)?; + let mut scanner = ds.scan(); + scanner.with_row_id(); + let batches: Vec = scanner + .try_into_stream() + .await + .map_err(|e| Finding::Logical(format!("scan {}: {e}", entry.table_key)))? + .try_collect() + .await + .map_err(|e| Finding::Logical(format!("scan collect {}: {e}", entry.table_key)))?; + let mut seen: HashSet = HashSet::new(); + for batch in &batches { + let col = batch + .column_by_name("_rowid") + .ok_or_else(|| Finding::Logical(format!("no _rowid column on {}", entry.table_key)))? + .as_any() + .downcast_ref::() + .ok_or_else(|| Finding::Logical(format!("_rowid not u64 on {}", entry.table_key)))?; + for i in 0..col.len() { + let id = col.value(i); + if !seen.insert(id) { + return Err(Finding::Logical(format!( + "duplicate live stable row-id {id} in {} (RC-X-class duplicate row address)", + entry.table_key + ))); + } + } + } + } + Ok(()) +} + // ── INVARIANT #3: scalar-index probe (catches RC-X at creation) ── // Force-loads the Doc.source BTREE flat pages by filtering each enum value. pub async fn index_probe(db: &Omnigraph) -> Result<(), Finding> { @@ -104,13 +204,46 @@ pub async fn index_probe(db: &Omnigraph) -> Result<(), Finding> { Ok(()) } +// ── @key uniqueness (no sequential model — for the concurrent oracle) ── +// Every `@key` value may appear on at most one live row. Concurrent same-key +// upserts that produce duplicates are MR-714 (dup-`@key`); `classify` allow- +// lists the `dup-@key` marker as that known bug. +pub async fn no_duplicate_keys(db: &Omnigraph, ty: &str, branch: &str) -> Result<(), Finding> { + let q = format!("query q() {{ match {{ $x: {ty} }} return {{ $x.slug }} }}"); + let res = db + .query(ReadTarget::branch(branch), &q, "q", &ParamMap::new()) + .await + .map_err(Finding::Engine)?; + let json = res.to_rust_json(); + let rows = json + .as_array() + .ok_or_else(|| Finding::Logical(format!("{ty} key scan not an array")))?; + let total = rows.len(); + let mut seen: HashSet = HashSet::new(); + for row in rows { + let slug = row["x.slug"] + .as_str() + .ok_or_else(|| Finding::Logical(format!("missing x.slug in {ty}")))?; + if !seen.insert(slug.to_string()) { + return Err(Finding::Logical(format!( + "duplicate @key {slug:?} in {ty} ({total} rows, {} distinct) — dup-@key", + seen.len() + ))); + } + } + Ok(()) +} + /// The full battery as a registry: `(name, result)` per invariant. Adding an /// invariant is one line here; the walk iterates and the coverage map records. pub async fn run_battery(db: &Omnigraph, model: &crate::model::Model) -> Vec<(&'static str, Result<(), Finding>)> { vec![ ("head==manifest", head_eq_manifest(db).await), ("dataset.validate", dataset_validate(db).await), + ("row-id-unique", no_duplicate_live_row_ids(db).await), ("index-probe", index_probe(db).await), ("count==model", crate::model::check_counts(db, model).await), + ("content==model", crate::model::check_content(db, model).await), + ("edges==model", crate::model::check_edges(db, model).await), ] } diff --git a/crates/omnigraph/tests/dst/model.rs b/crates/omnigraph/tests/dst/model.rs index f699792a..5b71b71b 100644 --- a/crates/omnigraph/tests/dst/model.rs +++ b/crates/omnigraph/tests/dst/model.rs @@ -6,7 +6,7 @@ //! A divergence means a lost write (countmodel) — both real bugs. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use omnigraph::db::{Omnigraph, ReadTarget}; use omnigraph_compiler::ir::ParamMap; @@ -17,6 +17,14 @@ use crate::invariants::Finding; pub struct Model { persons: HashSet, docs: HashSet, + /// Expected `body` value per live Doc id — the source of truth for the + /// content==model oracle (a lost UPDATE shows up here even when counts match). + doc_body: HashMap, + /// Live `Knows` edges as `from -> to` (the map enforces the schema's + /// `@card(0..1)`: a `from` has at most one outgoing edge). Every endpoint is + /// a live Person by construction — `del_person` cascades both directions, so + /// the model never holds an orphan and the engine producing one is a finding. + knows: HashMap, next: usize, } @@ -37,11 +45,23 @@ impl Model { pub fn add_person(&mut self, id: usize) { self.persons.insert(id); } - pub fn add_doc(&mut self, id: usize) { + pub fn add_doc(&mut self, id: usize, body: String) { self.docs.insert(id); + self.doc_body.insert(id, body); + } + /// Record an UPDATE's new body — only for a Doc the model believes exists, + /// so a no-op update (0 rows matched) doesn't desync the model. + pub fn update_doc_body(&mut self, id: usize, body: String) { + if self.docs.contains(&id) { + self.doc_body.insert(id, body); + } } pub fn del_person(&mut self, id: usize) { self.persons.remove(&id); + // Node delete cascades to incident edges, BOTH directions — mirror it so + // the model never references a deleted Person (this is the RC-1 surface). + self.knows.remove(&id); + self.knows.retain(|_, &mut to| to != id); } pub fn persons(&self) -> usize { self.persons.len() @@ -49,6 +69,37 @@ impl Model { pub fn docs(&self) -> usize { self.docs.len() } + pub fn doc_bodies(&self) -> &HashMap { + &self.doc_body + } + + // ── edges ── + /// Live person ids (for picking an edge endpoint). + pub fn persons_vec(&self) -> Vec { + self.persons.iter().copied().collect() + } + /// Persons with no outgoing `Knows` — the only legal `from` for a new edge + /// under `@card(0..1)`, so every generated InsertKnows is a valid op. + pub fn free_froms(&self) -> Vec { + self.persons + .iter() + .copied() + .filter(|p| !self.knows.contains_key(p)) + .collect() + } + /// Persons that currently have an outgoing edge (legal DeleteKnows targets). + pub fn knows_froms(&self) -> Vec { + self.knows.keys().copied().collect() + } + pub fn add_edge(&mut self, from: usize, to: usize) { + self.knows.insert(from, to); + } + pub fn del_edge(&mut self, from: usize) { + self.knows.remove(&from); + } + pub fn edges(&self) -> usize { + self.knows.len() + } } async fn count(db: &Omnigraph, ty: &str) -> Result { @@ -79,3 +130,106 @@ pub async fn check_counts(db: &Omnigraph, model: &Model) -> Result<(), Finding> } Ok(()) } + +/// content==model: every live Doc's `body` must equal the model's expected +/// value, and no `@key` may appear twice. A count check passes through a +/// lost UPDATE (the row is still there, just stale) or a silent dup-`@key` +/// (a value-level duplicate the row count would only catch if it changed the +/// total) — this is the oracle that catches both. Slugs are `g{id}`. +pub async fn check_content(db: &Omnigraph, model: &Model) -> Result<(), Finding> { + let res = db + .query( + ReadTarget::branch("main"), + "query c() { match { $d: Doc } return { $d.slug, $d.body } }", + "c", + &ParamMap::new(), + ) + .await + .map_err(Finding::Engine)?; + let json = res.to_rust_json(); + let rows = json + .as_array() + .ok_or_else(|| Finding::Logical("Doc content query did not return an array".into()))?; + + let mut seen: HashMap = HashMap::new(); + for row in rows { + let slug = row["d.slug"] + .as_str() + .ok_or_else(|| Finding::Logical(format!("missing d.slug in {row}")))?; + let body = row["d.body"] + .as_str() + .ok_or_else(|| Finding::Logical(format!("missing d.body in {row}")))?; + let id: usize = slug + .strip_prefix('g') + .and_then(|s| s.parse().ok()) + .ok_or_else(|| Finding::Logical(format!("unexpected Doc slug {slug}")))?; + if let Some(prev) = seen.insert(id, body.to_string()) { + return Err(Finding::Logical(format!( + "duplicate Doc @key g{id} (dup-@key): {prev:?} and {body:?}" + ))); + } + } + + for (id, expected) in model.doc_bodies() { + match seen.get(id) { + None => { + return Err(Finding::Logical(format!( + "Doc g{id} missing from engine (model body {expected:?})" + ))); + } + Some(actual) if actual != expected => { + return Err(Finding::Logical(format!( + "Doc g{id} body mismatch: engine {actual:?} != model {expected:?} (lost update)" + ))); + } + _ => {} + } + } + Ok(()) +} + +/// edges==model (referential integrity): two complementary counts must both +/// equal the model's live-edge count. The RAW `edge:Knows` row count (read +/// white-box via the snapshot, so it sees orphans too) catches a lost +/// node-delete cascade that strands an edge pointing at a deleted Person; the +/// TRAVERSAL count (`$a knows $b`, which only matches live endpoints) catches a +/// lost edge write. Raw > traversal means an orphan exists. +pub async fn check_edges(db: &Omnigraph, model: &Model) -> Result<(), Finding> { + let snap = db + .snapshot_of(ReadTarget::branch("main")) + .await + .map_err(Finding::Engine)?; + let mut raw: usize = 0; + for entry in snap.entries() { + if entry.table_key == "edge:Knows" { + let ds = snap.open(&entry.table_key).await.map_err(Finding::Engine)?; + raw = ds + .count_rows(None) + .await + .map_err(|e| Finding::Logical(format!("edge:Knows count_rows: {e}")))?; + } + } + if raw != model.edges() { + return Err(Finding::Logical(format!( + "raw edge:Knows rows={raw} != model={} (orphan edge / lost cascade / dup edge)", + model.edges() + ))); + } + let res = db + .query( + ReadTarget::branch("main"), + "query e() { match { $a: Person $a knows $b } return { $a.slug, $b.slug } }", + "e", + &ParamMap::new(), + ) + .await + .map_err(Finding::Engine)?; + if res.num_rows() != model.edges() { + return Err(Finding::Logical(format!( + "traversable Knows edges={} != model={} (orphan endpoint / lost edge)", + res.num_rows(), + model.edges() + ))); + } + Ok(()) +} diff --git a/crates/omnigraph/tests/dst/op.rs b/crates/omnigraph/tests/dst/op.rs index 7f1b5d71..52bfccc5 100644 --- a/crates/omnigraph/tests/dst/op.rs +++ b/crates/omnigraph/tests/dst/op.rs @@ -3,7 +3,7 @@ //! `OpKind` (for coverage) and the raw `OmniError` (for structured //! classification — never stringified here). -use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::db::{Omnigraph, ReadTarget, RepairOptions}; use omnigraph::error::OmniError; use omnigraph::loader::{LoadMode, load_jsonl}; use omnigraph_compiler::ir::ParamMap; @@ -62,23 +62,29 @@ pub enum OpKind { Optimize, DeletePerson, UpdateDoc, + InsertKnows, + DeleteKnows, + Repair, Read, } impl OpKind { - pub const ALL: [OpKind; 6] = [ + pub const ALL: [OpKind; 9] = [ OpKind::InsertPerson, OpKind::InsertDoc, OpKind::Optimize, OpKind::DeletePerson, OpKind::UpdateDoc, + OpKind::InsertKnows, + OpKind::DeleteKnows, + OpKind::Repair, OpKind::Read, ]; } /// Pick and run one op. The model is updated only on success. pub async fn step(db: &Omnigraph, rng: &mut Rng, model: &mut Model) -> (OpKind, Result<(), OmniError>) { - match rng.below(6) { + match rng.below(9) { 0 => { let mut ids = Vec::new(); let mut data = String::new(); @@ -109,8 +115,9 @@ pub async fn step(db: &Omnigraph, rng: &mut Rng, model: &mut Model) -> (OpKind, } let res = match load_jsonl(db, &data, LoadMode::Merge).await { Ok(_) => { + // `doc()` writes body "needle filler" — track it for content==model. for id in ids { - model.add_doc(id); + model.add_doc(id, "needle filler".to_string()); } Ok(()) } @@ -134,8 +141,80 @@ pub async fn step(db: &Omnigraph, rng: &mut Rng, model: &mut Model) -> (OpKind, 4 => { // UPDATE moves the whole row → scalar-index remap (RC-X morphology). let id = rng.below(model.id_high()); - let q = format!("query u() {{ update Doc set {{ body: \"u{id} needle\" }} where slug = \"g{id}\" }}"); - (OpKind::UpdateDoc, db.mutate("main", &q, "u", &ParamMap::new()).await.map(|_| ())) + let body = format!("u{id} needle"); + let q = format!("query u() {{ update Doc set {{ body: \"{body}\" }} where slug = \"g{id}\" }}"); + let res = match db.mutate("main", &q, "u", &ParamMap::new()).await { + Ok(_) => { + // Only mutates the model for a Doc it believes exists, so a + // no-op update (0 rows matched) can't desync content==model. + model.update_doc_body(id, body); + Ok(()) + } + Err(e) => Err(e), + }; + (OpKind::UpdateDoc, res) + } + 5 => { + // InsertKnows — pick a `from` with no outgoing edge (so the @card(0..1) + // insert is always legal) and any live `to`; both endpoints exist, so + // the engine producing an orphan/dup is a finding, not a generated one. + let froms = model.free_froms(); + let persons = model.persons_vec(); + if froms.is_empty() || persons.is_empty() { + // No legal edge to add yet — a no-op (still counts for coverage). + (OpKind::InsertKnows, Ok(())) + } else { + let from = froms[rng.below(froms.len())]; + // Exclude self-loops: a Knows self-loop is committed to the edge + // table but is NOT returned by `$a knows $b` traversal (durable + // across optimize+reopen; the CSR build keeps it, so the drop is + // in Expand). That stored-but-not-traversable divergence is a real + // finding the harness surfaced — kept out of the generic generator + // so the edges==model oracle stays unambiguous; see B3 notes. + let others: Vec = persons.iter().copied().filter(|p| *p != from).collect(); + if others.is_empty() { + return (OpKind::InsertKnows, Ok(())); + } + let to = others[rng.below(others.len())]; + let data = knows(&format!("g{from}"), &format!("g{to}")); + let res = match load_jsonl(db, &data, LoadMode::Merge).await { + Ok(_) => { + model.add_edge(from, to); + Ok(()) + } + Err(e) => Err(e), + }; + (OpKind::InsertKnows, res) + } + } + 6 => { + // DeleteKnows — remove an existing edge by its `from`. + let froms = model.knows_froms(); + if froms.is_empty() { + (OpKind::DeleteKnows, Ok(())) + } else { + let from = froms[rng.below(froms.len())]; + let q = format!("query dk() {{ delete Knows where from = \"g{from}\" }}"); + let res = match db.mutate("main", &q, "dk", &ParamMap::new()).await { + Ok(_) => { + model.del_edge(from); + Ok(()) + } + Err(e) => Err(e), + }; + (OpKind::DeleteKnows, res) + } + } + 7 => { + // Repair in confirm (not force) mode — heals VERIFIED maintenance + // drift but leaves suspicious/semantic drift (e.g. RC-1's) for + // head_eq_manifest to still catch. A no-op on a clean graph; must + // never change logical data, so the model is untouched. + let opts = RepairOptions { + confirm: true, + force: false, + }; + (OpKind::Repair, db.repair(opts).await.map(|_| ())) } _ => ( OpKind::Read, diff --git a/crates/omnigraph/tests/dst/readshape.rs b/crates/omnigraph/tests/dst/readshape.rs new file mode 100644 index 00000000..910b218c --- /dev/null +++ b/crates/omnigraph/tests/dst/readshape.rs @@ -0,0 +1,169 @@ +//! D3 × D2: the read-shape battery run against deliberately-built table +//! morphologies. The oracle is primarily NO-CRASH — every query shape must +//! execute without an engine error or panic across each latent layout (single +//! fragment, ≥2 fragments, deletion vectors, compacted, on-branch) — plus exact +//! count checks where the answer is morphology-invariant. This is the D2×D3 +//! cell sweep: a planner/execution regression that only bites a specific +//! fragment layout shows up here, where the generic walk's main-branch oracles +//! would miss it. +//! +//! Scope: the non-vector/non-FTS shapes (scan · @key · indexed · non-indexed · +//! range · order+limit · count · numeric aggregates · 1-hop · var-hop · +//! negation · zero-match). Vector (`nearest`) and FTS (`bm25`/`search`) shapes +//! are deferred — they need vector data and the inverted index whose builder +//! OOB-panics at depth (see `regression`-adjacent finding in dst.rs). + +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::ir::ParamMap; + +/// Richer than the walk schema: `age: I64` unlocks range + numeric aggregates. +pub const SCHEMA: &str = r#" +node Person { + slug: String @key + name: String + age: I64 +} +node Doc { + slug: String @key + source: enum(whatsapp, email, linkedin, slack, telegram) @index + body: String +} +edge Knows: Person -> Person @card(0..1) +"#; + +fn person(slug: &str, age: i64) -> String { + format!("{{\"type\":\"Person\",\"data\":{{\"slug\":\"{slug}\",\"name\":\"n\",\"age\":{age}}}}}\n") +} +fn doc(slug: &str, source: &str) -> String { + format!("{{\"type\":\"Doc\",\"data\":{{\"slug\":\"{slug}\",\"source\":\"{source}\",\"body\":\"needle filler\"}}}}\n") +} +fn knows(from: &str, to: &str) -> String { + format!("{{\"edge\":\"Knows\",\"from\":\"{from}\",\"to\":\"{to}\",\"data\":{{}}}}\n") +} + +#[derive(Clone, Copy, Debug)] +pub enum Morph { + /// One load batch → one fragment. + Single, + /// Each row group in its own `Merge` batch → ≥2 fragments. + MultiFragment, + /// Multi-fragment, then a delete → deletion vectors over live rows. + WithDeletions, + /// Multi-fragment, then `optimize` → compacted + reindexed. + Optimized, +} + +impl Morph { + pub const ALL: [Morph; 4] = [ + Morph::Single, + Morph::MultiFragment, + Morph::WithDeletions, + Morph::Optimized, + ]; +} + +/// The base population (before morphology-specific shaping): 10 persons p0..p9 +/// with ages 0..9, a `knows` chain p0→p1→…→p8, and 10 docs cycling 4 of the 5 +/// enum sources (never `telegram`, so zero-match is deterministic). +fn base_rows() -> (Vec, Vec) { + let sources = ["whatsapp", "email", "linkedin", "slack"]; + let mut persons = Vec::new(); + let mut docs = Vec::new(); + for i in 0..10 { + persons.push(person(&format!("p{i}"), i as i64)); + docs.push(doc(&format!("d{i}"), sources[i % 4])); + } + (persons, docs) +} + +/// Build the requested morphology on a fresh graph; returns the live Person +/// count after shaping (the only count the read battery asserts exactly). +pub async fn build(db: &Omnigraph, morph: Morph) -> usize { + let (persons, docs) = base_rows(); + match morph { + Morph::Single => { + let mut all = persons.concat(); + all.push_str(&docs.concat()); + for i in 0..9 { + all.push_str(&knows(&format!("p{i}"), &format!("p{}", i + 1))); + } + load_jsonl(db, &all, LoadMode::Merge).await.unwrap(); + 10 + } + Morph::MultiFragment | Morph::Optimized => { + // Each person + its doc in its own batch → 10 fragments. + for i in 0..10 { + load_jsonl(db, &format!("{}{}", persons[i], docs[i]), LoadMode::Merge) + .await + .unwrap(); + } + for i in 0..9 { + load_jsonl(db, &knows(&format!("p{i}"), &format!("p{}", i + 1)), LoadMode::Merge) + .await + .unwrap(); + } + if matches!(morph, Morph::Optimized) { + db.optimize().await.unwrap(); + } + 10 + } + Morph::WithDeletions => { + for i in 0..10 { + load_jsonl(db, &format!("{}{}", persons[i], docs[i]), LoadMode::Merge) + .await + .unwrap(); + } + for i in 0..9 { + load_jsonl(db, &knows(&format!("p{i}"), &format!("p{}", i + 1)), LoadMode::Merge) + .await + .unwrap(); + } + // Delete p0..p2 → deletion vectors + cascaded edges. + for i in 0..3 { + db.mutate( + "main", + &format!("query d() {{ delete Person where slug = \"p{i}\" }}"), + "d", + &ParamMap::new(), + ) + .await + .unwrap(); + } + 7 + } + } +} + +/// Every read shape as `(name, query)`. Each must execute without an engine +/// error against every morphology. +pub fn shapes() -> Vec<(&'static str, &'static str)> { + vec![ + ("full-scan", "query q() { match { $p: Person } return { $p.slug } }"), + ("key-filter", "query q() { match { $p: Person { slug: \"p5\" } } return { $p.slug } }"), + ("indexed-filter", "query q() { match { $d: Doc { source: \"whatsapp\" } } return { $d.slug } }"), + ("nonindexed-filter", "query q() { match { $p: Person $p.age >= 5 } return { $p.slug } }"), + ("range", "query q() { match { $p: Person $p.age >= 3 $p.age <= 6 } return { $p.slug } }"), + ("order-limit", "query q() { match { $p: Person } return { $p.slug, $p.age } order { $p.age desc } limit 3 }"), + ("count", "query q() { match { $p: Person } return { count($p) as n } }"), + ("aggregate", "query q() { match { $p: Person } return { sum($p.age) as s, avg($p.age) as a, min($p.age) as mn, max($p.age) as mx } }"), + ("traversal-1hop", "query q() { match { $a: Person $a knows $b } return { $a.slug, $b.slug } }"), + ("var-hop", "query q() { match { $a: Person { slug: \"p3\" } $a knows{1,3} $b } return { $b.slug } }"), + ("negation", "query q() { match { $p: Person not { $p knows $_ } } return { $p.slug } }"), + ("zero-match", "query q() { match { $d: Doc { source: \"telegram\" } } return { $d.slug } }"), + ] +} + +/// Run the whole battery against `branch`; returns `(shape, Result)`. +pub async fn run(db: &Omnigraph, branch: &str) -> Vec<(&'static str, Result)> { + let mut out = Vec::new(); + for (name, q) in shapes() { + let r = db + .query(ReadTarget::branch(branch), q, "q", &ParamMap::new()) + .await + .map(|res| res.num_rows()) + .map_err(|e| e.to_string()); + out.push((name, r)); + } + out +} diff --git a/crates/omnigraph/tests/dst/statemachine.rs b/crates/omnigraph/tests/dst/statemachine.rs new file mode 100644 index 00000000..527ea48f --- /dev/null +++ b/crates/omnigraph/tests/dst/statemachine.rs @@ -0,0 +1,229 @@ +//! B5: proptest-state-machine campaign with automatic SHRINKING + regression +//! persistence, over the CLEAN op subset. +//! +//! proptest-state-machine fails and minimizes on ANY reference↔SUT divergence, +//! so it runs over the ops that have no known open bug — insert-person, +//! insert-doc, update-doc, read (no delete/optimize/edge/concurrency, which +//! trigger RC-1 / RC-X / FTS-OOB / dup-`@key`). Over that subset reference and +//! engine must agree exactly; a regression that breaks a clean op auto-minimizes +//! to the shortest failing op sequence and persists its seed under +//! `proptest-regressions/`. The generative walk + the named regressions cover +//! the buggy ops; this layer adds the shrinking machinery. +//! +//! The async engine is bridged the canonical sync way (`proptest_equivalence.rs` +//! pattern): the SUT owns a current-thread `Runtime` and every engine call is a +//! `rt.block_on(...)` — the whole campaign is a plain `#[test]`, no ambient +//! runtime, so `block_on` is legal. + +use std::collections::BTreeMap; + +use proptest::prelude::*; +use proptest::strategy::Union; +use proptest::test_runner::Config; +use proptest_state_machine::{ReferenceStateMachine, StateMachineTest}; +use tokio::runtime::Runtime; + +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::ir::ParamMap; + +use crate::backend; +use crate::op::{doc, person}; + +/// The reference model (also the state-machine `State`): the keys that must +/// exist and each Doc's expected body. `BTreeMap`/`u32` keep `Debug` output +/// stable, which keeps shrink reports deterministic. +#[derive(Clone, Debug, Default)] +pub struct RefModel { + persons: BTreeMap, + docs: BTreeMap, + next: u32, +} + +/// The clean transitions. Ids are explicit so the reference is a pure function. +#[derive(Clone, Debug)] +pub enum Tr { + InsertPerson(u32), + InsertDoc(u32), + /// (existing doc id, tag) → body `u{id}-{tag}`. + UpdateDoc(u32, u32), + Read, +} + +impl ReferenceStateMachine for RefModel { + type State = RefModel; + type Transition = Tr; + + fn init_state() -> BoxedStrategy { + Just(RefModel::default()).boxed() + } + + fn transitions(state: &RefModel) -> BoxedStrategy { + let next = state.next; + let mut options: Vec> = vec![ + Just(Tr::InsertPerson(next)).boxed(), + Just(Tr::InsertDoc(next)).boxed(), + Just(Tr::Read).boxed(), + ]; + if !state.docs.is_empty() { + let ids: Vec = state.docs.keys().copied().collect(); + options.push( + (proptest::sample::select(ids), any::()) + .prop_map(|(id, tag)| Tr::UpdateDoc(id, tag as u32)) + .boxed(), + ); + } + Union::new(options).boxed() + } + + fn apply(mut state: RefModel, transition: &Tr) -> RefModel { + match transition { + Tr::InsertPerson(id) => { + state.persons.insert(*id, ()); + state.next = state.next.max(id + 1); + } + Tr::InsertDoc(id) => { + state.docs.insert(*id, "needle filler".to_string()); + state.next = state.next.max(id + 1); + } + Tr::UpdateDoc(id, tag) => { + // Mirror engine no-op semantics: only an existing doc changes. + if state.docs.contains_key(id) { + state.docs.insert(*id, format!("u{id}-{tag}")); + } + } + Tr::Read => {} + } + state + } +} + +/// The system under test: a real graph plus the runtime that drives it. +pub struct Sut { + rt: Runtime, + db: Omnigraph, + _dir: tempfile::TempDir, +} + +pub struct DstMachine; + +impl Sut { + fn person_count(&self) -> usize { + self.rt.block_on(async { + self.db + .query( + ReadTarget::branch("main"), + "query q() { match { $x: Person } return { $x.slug } }", + "q", + &ParamMap::new(), + ) + .await + .unwrap() + .num_rows() + }) + } + + /// id → body for every live Doc. + fn doc_bodies(&self) -> BTreeMap { + self.rt.block_on(async { + let res = self + .db + .query( + ReadTarget::branch("main"), + "query c() { match { $d: Doc } return { $d.slug, $d.body } }", + "c", + &ParamMap::new(), + ) + .await + .unwrap(); + let json = res.to_rust_json(); + let mut out = BTreeMap::new(); + for row in json.as_array().unwrap() { + let slug = row["d.slug"].as_str().unwrap(); + let body = row["d.body"].as_str().unwrap().to_string(); + let id: u32 = slug.strip_prefix('g').unwrap().parse().unwrap(); + out.insert(id, body); + } + out + }) + } +} + +impl StateMachineTest for DstMachine { + type SystemUnderTest = Sut; + type Reference = RefModel; + + fn init_test(_ref_state: &RefModel) -> Sut { + let rt = Runtime::new().unwrap(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + let db = rt.block_on(backend::open_clean(&uri)); + Sut { + rt, + db, + _dir: dir, + } + } + + fn apply(sut: Sut, _ref_state: &RefModel, transition: Tr) -> Sut { + sut.rt.block_on(async { + match transition { + Tr::InsertPerson(id) => { + load_jsonl(&sut.db, &person(&format!("g{id}")), LoadMode::Merge) + .await + .unwrap(); + } + Tr::InsertDoc(id) => { + load_jsonl(&sut.db, &doc(&format!("g{id}"), "whatsapp"), LoadMode::Merge) + .await + .unwrap(); + } + Tr::UpdateDoc(id, tag) => { + let q = format!( + "query u() {{ update Doc set {{ body: \"u{id}-{tag}\" }} where slug = \"g{id}\" }}" + ); + sut.db.mutate("main", &q, "u", &ParamMap::new()).await.unwrap(); + } + Tr::Read => { + sut.db + .query( + ReadTarget::branch("main"), + "query w() { match { $d: Doc { source: \"whatsapp\" } } return { $d.slug } }", + "w", + &ParamMap::new(), + ) + .await + .unwrap(); + } + } + }); + sut + } + + fn check_invariants(sut: &Sut, ref_state: &RefModel) { + // count==model and content==model against the reference, plus structural + // row-id uniqueness — all must hold exactly over the clean op subset. + assert_eq!( + sut.person_count(), + ref_state.persons.len(), + "Person count diverged from reference" + ); + assert_eq!( + sut.doc_bodies(), + ref_state.docs, + "Doc id→body diverged from reference (lost/dup/stale write)" + ); + sut.rt + .block_on(crate::invariants::no_duplicate_live_row_ids(&sut.db)) + .expect("duplicate live stable row-id"); + } +} + +proptest_state_machine::prop_state_machine! { + #![proptest_config(Config { cases: 24, .. Config::default() })] + + /// Over the clean op subset, the engine must track the reference for any + /// generated sequence of 1..30 ops; a divergence auto-shrinks + persists. + #[test] + fn clean_ops_track_reference(sequential 1..30 => DstMachine); +}