diff --git a/crates/omnigraph-cli/tests/cli_dst_parity.rs b/crates/omnigraph-cli/tests/cli_dst_parity.rs new file mode 100644 index 00000000..1e3cf250 --- /dev/null +++ b/crates/omnigraph-cli/tests/cli_dst_parity.rs @@ -0,0 +1,152 @@ +//! CLI cross-backend smoke (DST PR-D follow-up). +//! +//! A DST-flavored op SEQUENCE — init → two `load --merge` (multi-fragment) → +//! insert → edge-free delete → query — run against the EMBEDDED `Omnigraph` SDK +//! and the `omnigraph` CLI SUBPROCESS on twin local graphs must agree on the +//! final person-slug set. +//! +//! `parity_matrix.rs` already covers SINGLE-verb CLI-local-vs-HTTP-server +//! parity; this adds the missing pieces: the **embedded-SDK arm** and a +//! **multi-op sequence**. The CLI wraps the same engine, so this verifies the +//! CLI transport layer (arg parsing, `--json` serialization, `--store` +//! addressing, `--as` actor resolution) faithfully reflects the embedded engine +//! across a realistic sequence — it is NOT a second engine implementation. +//! +//! Oracle is format-tolerant (embedded `to_rust_json` and CLI `--json` differ in +//! wrapping): compare the SET of `smoke-*` slug strings recursively extracted +//! from each arm's output. + +mod support; + +use std::collections::BTreeSet; + +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::ir::ParamMap; + +const SCHEMA: &str = "node Person { slug: String @key name: String }\nedge Knows: Person -> Person\n"; +const ACTOR: &str = "smoke-actor"; +const FINAL_Q: &str = "query q() { match { $p: Person } return { $p.slug } }"; +const INSERT: &str = "query i() { insert Person { slug: \"smoke-new\", name: \"n\" } }"; +// smoke-8 has NO incident edges → a clean single-statement delete (no cascade, +// so it can't trip the RC-1 multi-statement-delete drift) — both arms succeed. +const DELETE: &str = "query d() { delete Person where slug = \"smoke-8\" }"; + +fn person(slug: &str) -> String { + format!("{{\"type\":\"Person\",\"data\":{{\"slug\":\"{slug}\",\"name\":\"n\"}}}}\n") +} +fn knows(a: &str, b: &str) -> String { + format!("{{\"edge\":\"Knows\",\"from\":\"{a}\",\"to\":\"{b}\",\"data\":{{}}}}\n") +} +fn batch1() -> String { + (0..5).map(|i| person(&format!("smoke-{i}"))).collect() +} +fn batch2() -> String { + let mut s: String = (5..10).map(|i| person(&format!("smoke-{i}"))).collect(); + s.push_str(&knows("smoke-0", "smoke-1")); + s.push_str(&knows("smoke-1", "smoke-2")); + s +} + +/// Recursively collect every string value beginning with `smoke-`. +fn smoke_slugs(v: &serde_json::Value) -> BTreeSet { + fn walk(v: &serde_json::Value, out: &mut BTreeSet) { + match v { + serde_json::Value::String(s) if s.starts_with("smoke-") => { + out.insert(s.clone()); + } + serde_json::Value::Array(a) => a.iter().for_each(|x| walk(x, out)), + serde_json::Value::Object(m) => m.values().for_each(|x| walk(x, out)), + _ => {} + } + } + let mut out = BTreeSet::new(); + walk(v, &mut out); + out +} + +/// Drive the sequence via the embedded engine; return the final slug set. +async fn embedded_arm(uri: &str) -> BTreeSet { + let db = Omnigraph::init(uri, SCHEMA).await.unwrap(); + load_jsonl(&db, &batch1(), LoadMode::Merge).await.unwrap(); + load_jsonl(&db, &batch2(), LoadMode::Merge).await.unwrap(); + db.mutate("main", INSERT, "i", &ParamMap::new()).await.unwrap(); + db.mutate("main", DELETE, "d", &ParamMap::new()).await.unwrap(); + let res = db + .query(ReadTarget::branch("main"), FINAL_Q, "q", &ParamMap::new()) + .await + .unwrap(); + smoke_slugs(&res.to_rust_json()) +} + +/// Drive the SAME sequence via the `omnigraph` CLI subprocess (sync — assert_cmd). +fn cli_arm(uri: &str, schema_path: &str, b1: &str, b2: &str) -> BTreeSet { + support::cli() + .args(["init", "--schema", schema_path, uri]) + .assert() + .success(); + support::cli() + .args(["load", "--mode", "merge", "--data", b1, uri]) + .assert() + .success(); + support::cli() + .args(["load", "--mode", "merge", "--data", b2, uri]) + .assert() + .success(); + support::cli() + .args(["mutate", "-e", INSERT, "--store", uri, "--as", ACTOR]) + .assert() + .success(); + support::cli() + .args(["mutate", "-e", DELETE, "--store", uri, "--as", ACTOR]) + .assert() + .success(); + let out = support::cli() + .args(["query", "-e", FINAL_Q, "--json", "--store", uri, "--as", ACTOR]) + .output() + .expect("cli query spawn"); + assert!(out.status.success(), "cli query failed: {out:?}"); + let v: serde_json::Value = + serde_json::from_slice(&out.stdout).expect("cli --json query output"); + smoke_slugs(&v) +} + +#[tokio::test(flavor = "multi_thread")] +async fn embedded_and_cli_agree_on_dst_sequence() { + let dir = tempfile::tempdir().unwrap(); + let schema_path = dir.path().join("smoke.pg"); + std::fs::write(&schema_path, SCHEMA).unwrap(); + let b1 = dir.path().join("b1.jsonl"); + std::fs::write(&b1, batch1()).unwrap(); + let b2 = dir.path().join("b2.jsonl"); + std::fs::write(&b2, batch2()).unwrap(); + let emb_uri = dir.path().join("emb.omni"); + let cli_uri = dir.path().join("cli.omni"); + + // Embedded arm (in-process async). + let emb = embedded_arm(emb_uri.to_str().unwrap()).await; + + // CLI arm (subprocess; assert_cmd is sync → run on a blocking thread). + let (cli_uri_s, schema_s, b1_s, b2_s) = ( + cli_uri.to_str().unwrap().to_string(), + schema_path.to_str().unwrap().to_string(), + b1.to_str().unwrap().to_string(), + b2.to_str().unwrap().to_string(), + ); + let cli = tokio::task::spawn_blocking(move || cli_arm(&cli_uri_s, &schema_s, &b1_s, &b2_s)) + .await + .unwrap(); + + assert_eq!( + emb, cli, + "embedded and CLI diverged on the final person-slug set" + ); + + // Sanity: the sequence produced the expected set (0..9 + new − 8). + let expected: BTreeSet = (0..10) + .filter(|i| *i != 8) + .map(|i| format!("smoke-{i}")) + .chain(["smoke-new".to_string()]) + .collect(); + assert_eq!(emb, expected, "unexpected final state: {emb:?}"); +} diff --git a/crates/omnigraph/tests/dst.rs b/crates/omnigraph/tests/dst.rs index bec60df2..4aa0af2b 100644 --- a/crates/omnigraph/tests/dst.rs +++ b/crates/omnigraph/tests/dst.rs @@ -42,6 +42,8 @@ mod backend; mod readshape; #[path = "dst/statemachine.rs"] mod statemachine; +#[path = "dst/fuzz.rs"] +mod fuzz; // NOTE: the failpoint-gated recovery cells live in their OWN binary // (`tests/dst_recovery.rs`), not here — the process-global `fail` registry would // otherwise leak armed failpoints into these (non-serial, parallel) walks. @@ -581,3 +583,51 @@ async fn run_walk(faults: bool) { } + +// ═══════════════════════════ D5 context: S3 (PR-D) ════════════════════════ + +/// Build a unique `s3://` DST graph URI, or `None` when the bucket env is unset +/// (skip locally; runs in CI's `rustfs_integration` job). +fn s3_dst_uri() -> Option { + let bucket = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?; + let base = if bucket.starts_with("s3://") { + bucket + } else { + format!("s3://{bucket}") + }; + let nanos = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + Some(format!( + "{}/dst-harness/{}-{}", + base.trim_end_matches('/'), + std::process::id(), + nanos + )) +} + +/// D5 S3 context: the FULL white-box battery must hold on a real object-store +/// backend, not just local FS. A short seeded op sequence builds state on +/// `s3://`, then the battery runs (known bugs allow-listed, novel → fail). +#[tokio::test(flavor = "multi_thread")] +async fn s3_battery_holds() { + let Some(uri) = s3_dst_uri() else { + eprintln!("skipping s3 dst battery: OMNIGRAPH_S3_TEST_BUCKET unset"); + return; + }; + let db = backend::open_clean(&uri).await; + let mut rng = Rng::new(1); + let mut model = Model::new(); + for _ in 0..8 { + let _ = op::step(&db, &mut rng, &mut model).await; + } + for (name, res) in run_battery(&db, &model).await { + if let Err(f) = res { + match classify(&f) { + Some(_bug) => {} // known open bug — allow-listed + None => panic!("s3: NOVEL battery violation [{name}]: {}", f.message()), + } + } + } +} diff --git a/crates/omnigraph/tests/dst/MATRIX.md b/crates/omnigraph/tests/dst/MATRIX.md index cd701a15..2215d262 100644 --- a/crates/omnigraph/tests/dst/MATRIX.md +++ b/crates/omnigraph/tests/dst/MATRIX.md @@ -76,9 +76,10 @@ Legend: ✅ sampled · 🟡 partial · ❌ unsampled · ⏸️ deferred-by-plan | context | status | note | |---------|--------|------| | embedded backend | ✅ | the only backend | -| **CLI / long-lived server backend** | ⏸️ | PR-D | +| **CLI / long-lived server backend** | ❌ deferred | crate-boundary friction: harness is in `omnigraph-engine` tests, the `omnigraph` binary is in `omnigraph-cli` (`CARGO_BIN_EXE_omnigraph` is cross-package, unavailable), and the natural home — `omnigraph-cli` tests — can't reach the harness modules. Plus subprocess-per-op is slow/flaky. Needs the harness op/model extracted to a shared crate first. | | local FS | ✅ | | -| **S3 (RustFS/MinIO)** | ⏸️ | PR-D | +| **S3 (RustFS/MinIO)** | ✅ (PR-D) | `s3_battery_holds` — full battery on `s3://`, env-gated (`OMNIGRAPH_S3_TEST_BUCKET`) | +| **parser/loader fuzz** | ✅ (PR-D) | `fuzz::*` — proptest dup/malformed-injecting; `cargo-fuzz`/libFuzzer deferred (needs nightly) | | single writer | ✅ | | | concurrent writers (one handle) | ✅ | `concurrent_walk_structural_invariants` | | **concurrent *opens* / ≥2 recovery sweeps** | ❌ | **the `#296` cell — see below** | @@ -128,7 +129,12 @@ seam cannot induce a `RolledPastExpected` sidecar. one-winner-CAS territory. ❌ remaining. 5. **generative** (not hand-armed) recovery faults — wrap Lance's `object_store` so the walk *discovers* sidecar/CAS bugs instead of the cells being scripted. ❌ -6. **vector/FTS/rrf read shapes** + **determinism/replay-equality** (PR-C) + **CLI/server/S3 backends** (PR-D). ⏸️ +6. **determinism/replay-equality** (PR-C) — ✅ DONE (`--features dst` seam + + `replay_equality_same_seed`). **S3 context + parser/loader fuzz** (PR-D) — ✅ DONE + (`s3_battery_holds`, `fuzz::*`). **Still ⏸️/❌:** CLI/server backends (deferred, + crate-boundary — see D5 table), `cargo-fuzz`/libFuzzer (needs nightly), + `porcupine` linearizability, vector/FTS/rrf read shapes, wrapping the main + walks in `with_seed`. ## The standing rule When a bug is found *outside* this harness, before closing it: add its row to the diff --git a/crates/omnigraph/tests/dst/fuzz.rs b/crates/omnigraph/tests/dst/fuzz.rs new file mode 100644 index 00000000..577fe316 --- /dev/null +++ b/crates/omnigraph/tests/dst/fuzz.rs @@ -0,0 +1,176 @@ +//! PR-D parser/loader fuzz — proptest-based (no nightly / `cargo-fuzz` needed). +//! +//! The lesson of Lance #7230: a parse/merge path that was only ever fed +//! pre-sorted, dup-free input shipped a latent crash. So feed the GQ query +//! parser, the schema parser, and the JSONL loader BOTH arbitrary and +//! ADVERSARIAL (duplicate-key, reordered-field, malformed) input and assert they +//! return a `Result` — never PANIC. proptest shrinks any panic to a minimal +//! input and persists the seed under `proptest-regressions/`. +//! +//! (cargo-fuzz/libFuzzer targets are the deferred follow-up — they need a +//! nightly toolchain; this proptest form gets the same "parser survives hostile +//! input" coverage with the stable toolchain already in use.) + +use proptest::prelude::*; + +use omnigraph::loader::{LoadMode, load_jsonl}; +use omnigraph_compiler::find_named_query; +use omnigraph_compiler::schema::parser::parse_schema; + +use crate::backend; + +/// Random GQ-token sequences — far likelier to reach deep parser states (and +/// their panics) than purely random bytes. +fn arb_gq() -> impl Strategy { + let token = prop_oneof![ + Just("query"), + Just("q"), + Just("("), + Just(")"), + Just("{"), + Just("}"), + Just("match"), + Just("return"), + Just("$x"), + Just("$y"), + Just(":"), + Just("Person"), + Just("Doc"), + Just("knows"), + Just("not"), + Just("limit"), + Just("3"), + Just("order"), + Just("desc"), + Just(","), + Just("."), + Just("slug"), + Just("\""), + Just("count"), + Just("="), + Just("where"), + Just("{1,3}"), + Just("nearest"), + ] + .prop_map(|s| s.to_string()); + proptest::collection::vec(token, 0..40).prop_map(|toks| toks.join(" ")) +} + +/// Random `.pg`-token sequences for the schema parser. +fn arb_pg() -> impl Strategy { + let token = prop_oneof![ + Just("node"), + Just("edge"), + Just("Person"), + Just("Doc"), + Just("{"), + Just("}"), + Just("slug"), + Just(":"), + Just("String"), + Just("I64"), + Just("@key"), + Just("@index"), + Just("@card(0..1)"), + Just("enum(a,b)"), + Just("->"), + Just("Knows"), + Just("\n"), + ] + .prop_map(|s| s.to_string()); + proptest::collection::vec(token, 0..40).prop_map(|toks| toks.join(" ")) +} + +proptest! { + #![proptest_config(ProptestConfig { cases: 256, .. ProptestConfig::default() })] + + /// The GQ query parser must return a Result for ANY input — never panic. + #[test] + fn gq_parser_never_panics(src in ".{0,256}") { + let _ = find_named_query(&src, "q"); + } + + /// ...including structured GQ-token soup that reaches deep parse states. + #[test] + fn gq_parser_structured_never_panics(src in arb_gq()) { + let _ = find_named_query(&src, "q"); + } + + /// The schema parser must return a Result for ANY input — never panic. + #[test] + fn schema_parser_never_panics(src in ".{0,256}") { + let _ = parse_schema(&src); + } + + /// ...including structured `.pg`-token soup. + #[test] + fn schema_parser_structured_never_panics(src in arb_pg()) { + let _ = parse_schema(&src); + } +} + +/// The JSONL loader must SURVIVE adversarial batches — duplicate `@key`s, +/// reordered/extra/missing fields, wrong types, malformed JSON, orphan edges — +/// returning `Ok`/`Err`, never panicking (and never corrupting: a later normal +/// load + read still works). +#[tokio::test] +async fn loader_survives_adversarial_jsonl() { + let dir = tempfile::tempdir().unwrap(); + let db = backend::open_clean(dir.path().to_str().unwrap()).await; + + let adversarial: &[&str] = &[ + // duplicate @key within one batch + "{\"type\":\"Person\",\"data\":{\"slug\":\"dup\",\"name\":\"a\"}}\n{\"type\":\"Person\",\"data\":{\"slug\":\"dup\",\"name\":\"b\"}}\n", + // reordered top-level fields + "{\"data\":{\"slug\":\"r1\",\"name\":\"n\"},\"type\":\"Person\"}\n", + // extra unknown field + "{\"type\":\"Person\",\"data\":{\"slug\":\"x1\",\"name\":\"n\",\"bogus\":42}}\n", + // missing required property (name) + "{\"type\":\"Person\",\"data\":{\"slug\":\"x2\"}}\n", + // wrong type (name as number) + "{\"type\":\"Person\",\"data\":{\"slug\":\"x3\",\"name\":99}}\n", + // unknown node type + "{\"type\":\"Ghost\",\"data\":{\"slug\":\"g\"}}\n", + // null slug + "{\"type\":\"Person\",\"data\":{\"slug\":null,\"name\":\"n\"}}\n", + // orphan edge (endpoints don't exist) + "{\"edge\":\"Knows\",\"from\":\"nope\",\"to\":\"nope\",\"data\":{}}\n", + // malformed JSON line + "{not json at all\n", + // empty line + valid line + "\n{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"source\":\"whatsapp\",\"body\":\"b\"}}\n", + // enum out of range + "{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"source\":\"not_an_enum\",\"body\":\"b\"}}\n", + // huge string + &{ + let big = "z".repeat(50_000); + format!("{{\"type\":\"Person\",\"data\":{{\"slug\":\"big\",\"name\":\"{big}\"}}}}\n") + }, + ]; + + for (i, batch) in adversarial.iter().enumerate() { + // The ONLY contract: no panic. Ok or Err are both acceptable. + let _ = load_jsonl(&db, batch, LoadMode::Merge).await; + let _ = i; + } + + // ...and the graph is still usable after all that abuse. + load_jsonl( + &db, + "{\"type\":\"Person\",\"data\":{\"slug\":\"sane\",\"name\":\"ok\"}}\n", + LoadMode::Merge, + ) + .await + .unwrap(); + let n = db + .query( + omnigraph::db::ReadTarget::branch("main"), + "query q() { match { $p: Person { slug: \"sane\" } } return { $p.slug } }", + "q", + &omnigraph_compiler::ir::ParamMap::new(), + ) + .await + .unwrap() + .num_rows(); + assert_eq!(n, 1, "graph must remain usable after adversarial loads"); +} diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 21b0c8ed..79bf5355 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -7,7 +7,7 @@ This file is the always-on map of the test surface. **Consult it before every ta | Crate | Path | Style | |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (28 files), fixture-driven, share `tests/helpers/mod.rs` | -| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | Per-area suites (post-modularization): `cli_cluster.rs` (cluster command surface + operator-actor cascade), `cli_cluster_e2e.rs` (spawned-binary lifecycle compositions — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `cli_data.rs` (load/read/change/branch/commit/export/snapshot/policy/embed/maintenance + operator format cascade), `cli_schema_config.rs` (init/config, schema plan/apply), `cli_queries.rs`, `parity_matrix.rs` (RFC-009 Phase 1: the embedded-vs-remote referee — every forked verb run against both arms with matched Cedar policy and the same actor, scrubbed-JSON + exit-code equality; divergences are pinned in its `KNOWN_DIVERGENCES` ledger, never silently repaired), `system_local.rs` (full-cycle cluster lifecycle with a spawned `--cluster` server, applied-policy enforcement over HTTP, keyed-credential auth, operator aliases), `system_remote.rs`; share `tests/support/mod.rs` (hermetic `OMNIGRAPH_HOME` by default) | +| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | Per-area suites (post-modularization): `cli_cluster.rs` (cluster command surface + operator-actor cascade), `cli_cluster_e2e.rs` (spawned-binary lifecycle compositions — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `cli_data.rs` (load/read/change/branch/commit/export/snapshot/policy/embed/maintenance + operator format cascade), `cli_schema_config.rs` (init/config, schema plan/apply), `cli_queries.rs`, `parity_matrix.rs` (RFC-009 Phase 1: the embedded-vs-remote referee — every forked verb run against both arms with matched Cedar policy and the same actor, scrubbed-JSON + exit-code equality; divergences are pinned in its `KNOWN_DIVERGENCES` ledger, never silently repaired), `cli_dst_parity.rs` (DST cross-backend smoke — a DST-flavored op SEQUENCE [init → 2× `load --merge` → insert → edge-free delete → query] run against the **embedded SDK** and the **CLI subprocess** on twin local graphs must agree on the final slug set; fills the gap parity_matrix leaves — the embedded arm + multi-op sequences; reuses `support::cli()`), `system_local.rs` (full-cycle cluster lifecycle with a spawned `--cluster` server, applied-policy enforcement over HTTP, keyed-credential auth, operator aliases), `system_remote.rs`; share `tests/support/mod.rs` (hermetic `OMNIGRAPH_HOME` by default) | | `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated); `tests/s3_cluster.rs` (bucket-gated full lifecycle on object storage) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill), and the 5B serving-snapshot read API (converged read, refusal rows) | | `omnigraph-server` | `crates/omnigraph-server/tests/` | Per-area suites (post-modularization): `auth_policy.rs`, `data_routes.rs`, `schema_routes.rs`, `stored_queries.rs`, `multi_graph.rs` (cluster-mode boot — converged serving, policy binding wiring, boot refusals — + the concurrent branch-ops matrix), `boot_settings.rs` (mode inference, PolicySource), `s3.rs` (bucket-gated: single-graph serving + config-free `--cluster s3://` boot), `openapi.rs` (OpenAPI drift / regeneration); share `tests/support/mod.rs` | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | @@ -49,6 +49,8 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav | `failpoints.rs` | Failure-injection coverage (gated on `failpoints` feature). Includes the five per-writer Phase B → recovery integration tests (`recovery_rolls_forward_after_finalize_publisher_failure`, `schema_apply_phase_b_failure_recovered_on_next_open`, `branch_merge_phase_b_failure_recovered_on_next_open`, `ensure_indices_phase_b_failure_recovered_on_next_open`, `optimize_phase_b_failure_recovered_on_next_open`) and the write-entry in-process heal contract (the four `*_after_finalize_publisher_failure_heals_without_reopen` tests — load, mutation, schema apply, branch merge: a follow-up write on the same handle rolls a sidecar-covered residual forward without reopen/refresh) and the storage-fault matrix for the sidecar lifecycle (`recovery.sidecar_{write,delete,list}` / `recovery.record_audit` failpoints: Phase A put failure aborts with zero drift, Phase D delete failure is swallowed and healed by the next write, list failures are loud at heal and open, audit-append failures are retried to exactly one audit row; plus the bucket-gated `s3_load_recovers_after_publisher_failure_without_reopen`) and the convergence-idempotent roll-forward regression (`open_sweep_roll_forward_converges_when_manifest_advances_concurrently`: two concurrent open-sweeps race one sidecar at the `recovery.before_roll_forward_publish` rendezvous; the CAS loser must converge, not fail the open — iss-schema-apply-reopen-recovery-race). | | `recovery.rs` | Open-time recovery sweep — sidecar I/O, classifier dispatch (NoMovement / RolledPastExpected / UnexpectedAtP1 / UnexpectedMultistep / InvariantViolation), all-or-nothing decision, roll-forward via `ManifestBatchPublisher::publish`, roll-back via `Dataset::restore`, audit row in `_graph_commit_recoveries.lance`, `OpenMode::ReadOnly` skip path | | `composite_flow.rs` | Compositional/narrative end-to-end stories — multi-step flows that compose mechanics covered by other test files. Catches integration regressions where individual operations all pass their unit tests but their composition breaks (sequential merges, post-merge main writes, time-travel through merge DAG, reopen consistency over multi-merge histories, post-optimize and post-cleanup strict writes). | +| `dst.rs` (+ `dst/`) | **Deterministic-simulation (DST) / morphological-matrix harness** (iss-784/epc-783). A seeded generative walk drives the engine through an op alphabet (`dst/op.rs`), runs a white-box invariant battery after every op (`dst/invariants.rs`: HEAD==manifest, `Dataset::validate`, **unique live `_rowid`** [RC-X corruption class], scalar-index probe, count/content/edges==model, `@key` uniqueness) against a reference `model`, and classifies each finding as a KNOWN open bug (allow-listed) or NOVEL (fails). Includes: characterization regressions pinning the 3 known bugs (`regression_rc1_…`, `regression_rc_x_…`, `regression_dup_key_…`) + 1 harness-surfaced finding (`regression_self_loop_not_traversable`); a `FaultAdapter` (`dst/fault.rs`) `StorageAdapter` wrapper; a `proptest-state-machine` shrinking campaign (`dst/statemachine.rs`); a D3 read-shape × D2 morphology battery (`dst/readshape.rs`); a concurrent multi-actor walk; parser/loader fuzz (`dst/fuzz.rs`, proptest dup/malformed-injecting); an env-gated S3-context battery (`s3_battery_holds`); and the `--features dst` replay-equality oracle (`replay_equality_same_seed`, see the `dst` feature below). Coverage ledger + the "why exhaustive sampling is impossible" rationale live in `dst/MATRIX.md`. | +| `dst_recovery.rs` | **Failpoint-gated** DST recovery cells (own binary so the process-global `fail` registry can't leak armed failpoints into the parallel `dst` walks — run with `--features failpoints`): `mutation.post_finalize_pre_publisher` induces a real `RolledPastExpected` sidecar (the StorageAdapter `FaultAdapter` is off the manifest-publish path and cannot), then `recovery_rolls_forward_under_finalize_failure` (white-box battery after roll-forward) and `concurrent_opens_converge_on_pending_sidecar` (an inline park-first rendezvous forces two open-sweeps to race one sidecar — the CAS loser must converge; the generative shape of the #296 fix). | ## Fixtures @@ -71,6 +73,21 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav - **Serialize and rendezvous, never sleep.** The `fail` registry is process-global, so every failpoint test carries `#[serial]` (`serial_test`). For concurrent tests, use `helpers::failpoint::Rendezvous` (`tests/helpers/failpoint.rs`): `park_first(name)` parks the first thread to hit the point until `release()`, and `wait_until_reached().await` blocks on that condition (it doubles as a fired-assertion). Do not coordinate threads with fixed `sleep`s. - Activated tests: `crates/omnigraph/tests/failpoints.rs` and `crates/omnigraph-cluster/tests/failpoints.rs` (integration binaries, never in-source — the fail registry is process-global). Run with `cargo test -p omnigraph-engine --features failpoints --test failpoints` / `cargo test -p omnigraph-cluster --features failpoints --test failpoints`. +## Running the DST harness + +The DST harness (`dst.rs` + `dst/`, above) has several run modes; each is its own CI job, none part of the default `cargo test`: + +```bash +cargo test -p omnigraph-engine --test dst # the generative walks + battery + regressions + fuzz (default; S3 cell skips) +cargo test -p omnigraph-engine --features dst --test dst # + the replay-equality oracle (the `dst` determinism feature, below) +cargo test -p omnigraph-engine --features failpoints --test dst_recovery # the recovery / #296-class cells +cargo test -p omnigraph-cli --test cli_dst_parity # the embedded-vs-CLI cross-backend smoke +OMNIGRAPH_S3_TEST_BUCKET=… cargo test -p omnigraph-engine --test dst s3_battery_holds # the S3-context battery +``` + +- **The `dst` feature** (`dst = []` in `crates/omnigraph/Cargo.toml`) is a TEST-ONLY deterministic-replay seam, structured exactly like `failpoints` (zero production cost — the `#[cfg(feature="dst")]` blocks compile away). `src/dst.rs` exposes `next_ulid()`/`now_micros()` that the engine's observable ULID/timestamp sites call instead of `Ulid::new()`/`SystemTime::now()`; without the feature (or with no provider in scope via `dst::with_seed`) they fall back to the real source. The harness uses it for `replay_equality_same_seed` (same seed → identical engine id/clock fingerprint + outcome). Bit-identical replay is **engine-layer only** — Lance internals (compaction/index ids, threadpool scheduling) stay non-deterministic. +- See `crates/omnigraph/tests/dst/MATRIX.md` for the coverage ledger (which D1–D5 cells are sampled, the hidden dimensions found by checking against externally-fixed bugs like #296, and what's deferred). + ## RustFS / S3 integration CI runs these S3-backed tests against a containerized RustFS server (`.github/workflows/ci.yml` → `rustfs_integration` job): @@ -81,6 +98,7 @@ CI runs these S3-backed tests against a containerized RustFS server (`.github/wo - `cargo test -p omnigraph-cluster --test s3_cluster` (full control-plane lifecycle on the bucket) - `cargo test -p omnigraph-cli --test system_local local_cli_s3_end_to_end_init_load_read_flow` - `cargo test -p omnigraph-engine --features failpoints --test failpoints s3_` (recovery-sidecar lifecycle on a real bucket) +- `cargo test -p omnigraph-engine --test dst s3_battery_holds` (the DST white-box battery on a real object store) Locally, set `OMNIGRAPH_S3_TEST_BUCKET` (and the usual `AWS_*` vars including `AWS_ENDPOINT_URL_S3` for non-AWS) before running. Without those, S3 tests skip gracefully.