Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
9e5d9b7
test(dst): port harness to 0.7.2 + harden (coverage ledger, Reopen op…
ragnorc Jun 25, 2026
f23a337
test(dst): close the fault-seam gap + the #296 cell (failpoints variant)
ragnorc Jun 25, 2026
76fa48f
chore: lock new test-only dev-deps (arrow-array, futures, serde_json,…
ragnorc Jun 25, 2026
da7f5f6
test(dst): PR-D parser/loader fuzz (proptest, no nightly)
ragnorc Jun 25, 2026
cc94db2
test(dst): PR-D D5 S3 context — full battery on s3:// (env-gated)
ragnorc Jun 25, 2026
68808ac
docs(dst): MATRIX ledger — PR-C + PR-D (S3, fuzz) done; CLI/server de…
ragnorc Jun 25, 2026
a611f31
test(cli): DST cross-backend smoke — embedded SDK vs CLI subprocess p…
ragnorc Jun 25, 2026
0f09ec3
docs(testing): document the DST harness (dst.rs, dst_recovery.rs, cli…
ragnorc Jun 25, 2026
43a05be
feat(dst): scaffold omnigraph-dst crate + validate the dev-dep cycle …
ragnorc Jun 25, 2026
b5727dc
feat(dst): extract op/model/invariants/fault behind a Backend trait i…
ragnorc Jun 25, 2026
a38516e
refactor(dst): make engine tests/dst.rs a thin consumer of omnigraph-dst
ragnorc Jun 25, 2026
62e9104
test(cli): full generative cross-backend walk (embedded vs CLI) via o…
ragnorc Jun 25, 2026
99abd2a
docs(dst): record the PR-E cross-backend walk + omnigraph-dst crate
ragnorc Jun 25, 2026
2f066dd
fix(dst): stop count==model masking lost-writes; classify swallowed e…
ragnorc Jun 25, 2026
70cd542
ci+docs(dst): run the S3 battery in CI; fix stale recovery row + Faul…
ragnorc Jun 26, 2026
58a7a3c
Merge remote-tracking branch 'origin/main' into dst-extract-crate
ragnorc Jun 26, 2026
9fd1ac7
fix(dst): honor Cli branch; tighten repair parity, CLI classify, acto…
ragnorc Jun 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ jobs:
crates/omnigraph-cluster/tests/s3_cluster.rs) run_rustfs_ci=true ;;
crates/omnigraph-server/tests/s3.rs|crates/omnigraph-server/tests/support/*) run_rustfs_ci=true ;;
crates/omnigraph-cli/tests/system_local.rs) run_rustfs_ci=true ;;
crates/omnigraph/tests/dst.rs|crates/omnigraph/tests/dst/*) run_rustfs_ci=true ;;
crates/omnigraph-dst/*) run_rustfs_ci=true ;;
esac
done

Expand Down Expand Up @@ -403,6 +405,19 @@ jobs:
echo "$output" | grep -Eq "test result: ok\. [1-9][0-9]* passed" \
|| { echo "::error::filter 's3_' matched no tests — vacuous pass"; exit 1; }

- name: Run RustFS DST white-box battery
  # The DST harness's S3-context battery (`s3_battery_holds`) is bucket-
  # gated and lives in the `dst` target, which no other job runs — without
  # this step the battery skips everywhere and MATRIX/testing.md overstate
  # the S3 coverage. Same grep-guard as above: the name filter must match a
  # real test (here the bucket IS set, so it runs rather than skips).
  #
  # The exit status is captured through a `||` list on purpose: a bare
  # `output=$(...)` assignment aborts the script under errexit (`bash -e`)
  # before the captured output can be echoed, hiding the failure log.
  run: |
    status=0
    output=$(cargo test --locked -p omnigraph-engine --test dst s3_battery_holds -- --nocapture 2>&1) || status=$?
    echo "$output"
    [ "$status" -eq 0 ] || exit "$status"
    echo "$output" | grep -Eq "test result: ok\. [1-9][0-9]* passed" \
      || { echo "::error::filter 's3_battery_holds' matched no tests — vacuous pass"; exit 1; }


- name: Dump RustFS logs on failure
if: failure()
run: docker logs rustfs
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"crates/omnigraph-cluster",
"crates/omnigraph-policy",
"crates/omnigraph-server",
"crates/omnigraph-dst",
]
default-members = [
"crates/omnigraph",
Expand Down
2 changes: 2 additions & 0 deletions crates/omnigraph-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ serde_json = { workspace = true }
tempfile = { workspace = true }
lance = { workspace = true }
lance-index = { workspace = true }
omnigraph-dst = { path = "../omnigraph-dst" }
futures = { workspace = true }
117 changes: 117 additions & 0 deletions crates/omnigraph-cli/tests/cli_cross_backend_walk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//! Cross-backend generative walk (PR-E): the SAME seeded DST op sequence driven
//! against the embedded `Omnigraph` SDK AND the `omnigraph` CLI subprocess must
//! agree, per step, on the black-box state (Person/Doc slug sets + traversable
//! edge count).
//!
//! Honest scope: the CLI wraps the SAME engine, so this is NOT a second engine
//! implementation — it verifies the CLI transport layer (arg parsing, output
//! serialization, `--store` addressing, `--as` actor resolution) faithfully
//! reflects the embedded engine across a realistic generative sequence. The
//! white-box invariant battery stays embedded-only (it needs the real handle);
//! this is the black-box half the CLI arm can satisfy. Complements the
//! single-op `parity_matrix.rs` (CLI-local vs CLI-HTTP) and the fixed-sequence
//! `cli_dst_parity.rs` smoke with a generated, model-driven op stream.
//!
//! Lockstep determinism: both walks use the same seed and an identical reference
//! `Model`, and both wrap the same engine, so the op stream matches step-for-
//! step. Any divergence in op-kind, success, or resulting state fails the assert
//! immediately — before the two RNGs/models could desync.

use std::collections::BTreeSet;
use std::path::PathBuf;

use omnigraph_dst::op::OpKind;
use omnigraph_dst::{step, Backend, BackendError, Cli, Embedded, Model, Rng};

/// Queries branch `main` for every node of type `ty` and gathers the slug
/// values into an order-independent set.
///
/// The slug column is located by key suffix (`slug`) so one parse covers both
/// the embedded `to_rust_json` shape (`"x.slug"`) and the CLI `--json` shape.
/// Rows that are not JSON objects, or whose slug is not a string, are skipped.
///
/// Panics if the query itself fails.
async fn slugs<B: Backend>(b: &B, ty: &str) -> BTreeSet<String> {
    let q = format!("query q() {{ match {{ $x: {ty} }} return {{ $x.slug }} }}");
    let rows = b.query("main", &q).await.expect("slug query");

    let mut found = BTreeSet::new();
    for row in rows.iter() {
        let Some(fields) = row.as_object() else {
            continue;
        };
        let slug = fields
            .iter()
            .find(|(key, _)| key.ends_with("slug"))
            .and_then(|(_, value)| value.as_str());
        if let Some(slug) = slug {
            found.insert(slug.to_string());
        }
    }
    found
}


/// Black-box edge oracle: the number of rows matched by the traversal
/// `$a knows $b` on branch `main`, i.e. the traversable `Knows` edge count.
///
/// Panics if the query fails.
async fn edge_count<B: Backend>(b: &B) -> usize {
    const EDGE_QUERY: &str =
        "query q() { match { $a: Person $a knows $b } return { $a.slug, $b.slug } }";

    let matched = b.query("main", EDGE_QUERY).await.expect("edge query");
    matched.len()
}


#[tokio::test]
async fn embedded_and_cli_agree_on_seeded_walk() {
let bin = PathBuf::from(env!("CARGO_BIN_EXE_omnigraph"));

for seed in 0..2u64 {
let emb_dir = tempfile::tempdir().unwrap();
let cli_dir = tempfile::tempdir().unwrap();
let emb = Embedded::open_clean(emb_dir.path().to_str().unwrap()).await;
let cli = Cli::new(bin.clone(), cli_dir.path().to_str().unwrap().to_string());
cli.init().await.expect("cli init");

let mut emb_rng = Rng::new(seed);
let mut cli_rng = Rng::new(seed);
let mut emb_model = Model::new();
let mut cli_model = Model::new();

for i in 0..18 {
let (ek, eres) = step(&emb, &mut emb_rng, &mut emb_model).await;
let (ck, cres) = step(&cli, &mut cli_rng, &mut cli_model).await;

assert_eq!(ek, ck, "seed={seed} step={i}: op-kind diverged");
// Repair's success FLAG legitimately differs in ONE known case:
// embedded `repair(force=false)` returns Ok and leaves suspicious/
// unverifiable drift in place (e.g. the drift RC-1 strands), whereas
// the CLI `repair --confirm` (no --force) EXITS NON-ZERO refusing to
// publish it. Allow-list exactly that (emb Ok + CLI "refused
// suspicious" error); a no-op on logical data either way, so the STATE
// assertions below still match. Any OTHER divergence — including a
// broken repair invocation, or any non-Repair op — must still fail.
let known_repair_divergence = ek == OpKind::Repair
&& eres.is_ok()
&& cres.as_ref().err().is_some_and(|e| {
let m = e.message();
m.contains("repair refused") || m.contains("suspicious")
});
if !known_repair_divergence {
assert_eq!(
eres.is_ok(),
cres.is_ok(),
"seed={seed} step={i} op[{ek:?}]: success diverged (emb={:?} cli={:?})",
eres.as_ref().err().map(BackendError::message),
cres.as_ref().err().map(BackendError::message),
);
}
Comment thread
cursor[bot] marked this conversation as resolved.

assert_eq!(
slugs(&emb, "Person").await,
slugs(&cli, "Person").await,
"seed={seed} step={i} op[{ek:?}]: Person slug set diverged"
);
assert_eq!(
slugs(&emb, "Doc").await,
slugs(&cli, "Doc").await,
"seed={seed} step={i} op[{ek:?}]: Doc slug set diverged"
);
assert_eq!(
edge_count(&emb).await,
edge_count(&cli).await,
"seed={seed} step={i} op[{ek:?}]: traversable edge count diverged"
);
}
}
}
152 changes: 152 additions & 0 deletions crates/omnigraph-cli/tests/cli_dst_parity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//! CLI cross-backend smoke (DST PR-D follow-up).
//!
//! A DST-flavored op SEQUENCE — init → two `load --merge` (multi-fragment) →
//! insert → edge-free delete → query — run against the EMBEDDED `Omnigraph` SDK
//! and the `omnigraph` CLI SUBPROCESS on twin local graphs must agree on the
//! final person-slug set.
//!
//! `parity_matrix.rs` already covers SINGLE-verb CLI-local-vs-HTTP-server
//! parity; this adds the missing pieces: the **embedded-SDK arm** and a
//! **multi-op sequence**. The CLI wraps the same engine, so this verifies the
//! CLI transport layer (arg parsing, `--json` serialization, `--store`
//! addressing, `--as` actor resolution) faithfully reflects the embedded engine
//! across a realistic sequence — it is NOT a second engine implementation.
//!
//! Oracle is format-tolerant (embedded `to_rust_json` and CLI `--json` differ in
//! wrapping): compare the SET of `smoke-*` slug strings recursively extracted
//! from each arm's output.

mod support;

use std::collections::BTreeSet;

use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::ir::ParamMap;

/// Graph schema shared by both arms: handed to `Omnigraph::init` in the
/// embedded arm and written to the schema file the CLI arm passes to
/// `init --schema`.
const SCHEMA: &str = "node Person { slug: String @key name: String }\nedge Knows: Person -> Person\n";
/// Actor name the CLI arm supplies via `--as` on its mutate/query invocations.
const ACTOR: &str = "smoke-actor";
/// Last query of the sequence in both arms: returns the slug of every Person.
const FINAL_Q: &str = "query q() { match { $p: Person } return { $p.slug } }";
/// Mutation that inserts the single extra Person `smoke-new`.
const INSERT: &str = "query i() { insert Person { slug: \"smoke-new\", name: \"n\" } }";
// smoke-8 has NO incident edges → a clean single-statement delete (no cascade,
// so it can't trip the RC-1 multi-statement-delete drift) — both arms succeed.
const DELETE: &str = "query d() { delete Person where slug = \"smoke-8\" }";

/// Builds one newline-terminated JSONL record for a `Person` node with the
/// given slug and the fixed name `n`. The slug is interpolated verbatim (no
/// JSON escaping).
fn person(slug: &str) -> String {
    let mut line = format!(r#"{{"type":"Person","data":{{"slug":"{slug}","name":"n"}}}}"#);
    line.push('\n');
    line
}
/// Builds one newline-terminated JSONL record for a `Knows` edge from slug `a`
/// to slug `b` with an empty data object. Both slugs are interpolated verbatim
/// (no JSON escaping).
fn knows(a: &str, b: &str) -> String {
    let mut line = format!(r#"{{"edge":"Knows","from":"{a}","to":"{b}","data":{{}}}}"#);
    line.push('\n');
    line
}
/// First load batch: the JSONL records for Persons `smoke-0` through `smoke-4`,
/// concatenated in ascending order.
fn batch1() -> String {
    let mut jsonl = String::new();
    for i in 0..5 {
        jsonl.push_str(&person(&format!("smoke-{i}")));
    }
    jsonl
}
/// Second load batch: Persons `smoke-5` through `smoke-9`, followed by the two
/// `Knows` edges `smoke-0 -> smoke-1` and `smoke-1 -> smoke-2`.
fn batch2() -> String {
    let people = (5..10).map(|i| person(&format!("smoke-{i}")));
    let edges = [("smoke-0", "smoke-1"), ("smoke-1", "smoke-2")]
        .iter()
        .map(|&(from, to)| knows(from, to));
    people.chain(edges).collect()
}

/// Collects every string VALUE that begins with `smoke-`, at any nesting depth
/// of arrays and objects. Object keys are never inspected, only their values.
///
/// The traversal uses an explicit work stack; because the result is a set, the
/// visiting order does not affect the outcome.
fn smoke_slugs(v: &serde_json::Value) -> BTreeSet<String> {
    let mut found = BTreeSet::new();
    let mut pending = vec![v];
    while let Some(node) = pending.pop() {
        match node {
            serde_json::Value::String(text) => {
                if text.starts_with("smoke-") {
                    found.insert(text.clone());
                }
            }
            serde_json::Value::Array(items) => pending.extend(items),
            serde_json::Value::Object(fields) => pending.extend(fields.values()),
            _ => {}
        }
    }
    found
}

/// Embedded arm: initialises a graph at `uri` with `SCHEMA`, merge-loads both
/// batches, applies the insert and the delete on `main`, then runs the final
/// query and returns the `smoke-*` slugs found in its JSON rendering.
///
/// Every step unwraps, so any engine error panics the test.
async fn embedded_arm(uri: &str) -> BTreeSet<String> {
    let graph = Omnigraph::init(uri, SCHEMA).await.unwrap();

    load_jsonl(&graph, &batch1(), LoadMode::Merge).await.unwrap();
    load_jsonl(&graph, &batch2(), LoadMode::Merge).await.unwrap();

    graph
        .mutate("main", INSERT, "i", &ParamMap::new())
        .await
        .unwrap();
    graph
        .mutate("main", DELETE, "d", &ParamMap::new())
        .await
        .unwrap();

    let target = ReadTarget::branch("main");
    let rows = graph
        .query(target, FINAL_Q, "q", &ParamMap::new())
        .await
        .unwrap();
    let rendered = rows.to_rust_json();
    smoke_slugs(&rendered)
}

/// CLI arm: replays the same sequence through `omnigraph` subprocesses
/// (synchronous — assert_cmd) and returns the `smoke-*` slugs found in the
/// final `query --json` output.
///
/// `uri` is the graph location, `schema_path` the schema file, and `b1`/`b2`
/// the JSONL files for the two merge loads. Any failing invocation panics.
fn cli_arm(uri: &str, schema_path: &str, b1: &str, b2: &str) -> BTreeSet<String> {
    // Spawn the CLI with `args` and require a successful exit.
    let must_succeed = |args: &[&str]| {
        support::cli().args(args).assert().success();
    };

    must_succeed(&["init", "--schema", schema_path, uri]);
    must_succeed(&["load", "--mode", "merge", "--data", b1, uri]);
    must_succeed(&["load", "--mode", "merge", "--data", b2, uri]);
    must_succeed(&["mutate", "-e", INSERT, "--store", uri, "--as", ACTOR]);
    must_succeed(&["mutate", "-e", DELETE, "--store", uri, "--as", ACTOR]);

    // The final query needs its stdout, so capture the output instead of
    // only asserting on the exit status.
    let out = support::cli()
        .args(["query", "-e", FINAL_Q, "--json", "--store", uri, "--as", ACTOR])
        .output()
        .expect("cli query spawn");
    assert!(out.status.success(), "cli query failed: {out:?}");

    let parsed: serde_json::Value =
        serde_json::from_slice(&out.stdout).expect("cli --json query output");
    smoke_slugs(&parsed)
}

/// Runs the fixed sequence through the embedded arm and the CLI arm on twin
/// graphs inside one temp directory, then asserts that both arms produce the
/// same slug set and that the set is the expected one.
#[tokio::test(flavor = "multi_thread")]
async fn embedded_and_cli_agree_on_dst_sequence() {
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();

    // Write a fixture file under the temp dir and hand back its path.
    let write_fixture = |name: &str, body: &str| {
        let path = root.join(name);
        std::fs::write(&path, body).unwrap();
        path
    };
    let schema_path = write_fixture("smoke.pg", SCHEMA);
    let b1 = write_fixture("b1.jsonl", &batch1());
    let b2 = write_fixture("b2.jsonl", &batch2());
    let emb_uri = root.join("emb.omni");
    let cli_uri = root.join("cli.omni");

    // Embedded arm (in-process async).
    let emb = embedded_arm(emb_uri.to_str().unwrap()).await;

    // CLI arm (subprocess; assert_cmd is sync → run on a blocking thread).
    // The closure must own its arguments, so convert the paths to `String`s.
    let owned = |p: &std::path::Path| p.to_str().unwrap().to_string();
    let cli_uri_s = owned(&cli_uri);
    let schema_s = owned(&schema_path);
    let b1_s = owned(&b1);
    let b2_s = owned(&b2);
    let cli = tokio::task::spawn_blocking(move || cli_arm(&cli_uri_s, &schema_s, &b1_s, &b2_s))
        .await
        .unwrap();

    assert_eq!(
        emb, cli,
        "embedded and CLI diverged on the final person-slug set"
    );

    // Sanity: the sequence produced the expected set (0..9 + new − 8).
    let mut expected: BTreeSet<String> = (0..10).map(|i| format!("smoke-{i}")).collect();
    expected.remove("smoke-8");
    expected.insert("smoke-new".to_string());
    assert_eq!(emb, expected, "unexpected final state: {emb:?}");
}
17 changes: 17 additions & 0 deletions crates/omnigraph-dst/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "omnigraph-dst"
version = "0.7.2"
edition = "2024"
license = "MIT"
description = "Deterministic-simulation (DST) test harness for Omnigraph — shared op alphabet, reference model, and a Backend trait driven across the embedded SDK and the CLI."
publish = false

[dependencies]
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.7.2" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.2" }
tokio = { workspace = true }
async-trait = { workspace = true }
serde_json = { workspace = true }
arrow-array = { workspace = true }
futures = { workspace = true }
tempfile = { workspace = true }
Loading