Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions crates/omnigraph-cli/tests/cli_dst_parity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//! CLI cross-backend smoke (DST PR-D follow-up).
//!
//! A DST-flavored op SEQUENCE — init → two `load --merge` (multi-fragment) →
//! insert → edge-free delete → query — run against the EMBEDDED `Omnigraph` SDK
//! and the `omnigraph` CLI SUBPROCESS on twin local graphs must agree on the
//! final person-slug set.
//!
//! `parity_matrix.rs` already covers SINGLE-verb CLI-local-vs-HTTP-server
//! parity; this adds the missing pieces: the **embedded-SDK arm** and a
//! **multi-op sequence**. The CLI wraps the same engine, so this verifies the
//! CLI transport layer (arg parsing, `--json` serialization, `--store`
//! addressing, `--as` actor resolution) faithfully reflects the embedded engine
//! across a realistic sequence — it is NOT a second engine implementation.
//!
//! Oracle is format-tolerant (embedded `to_rust_json` and CLI `--json` differ in
//! wrapping): compare the SET of `smoke-*` slug strings recursively extracted
//! from each arm's output.

mod support;

use std::collections::BTreeSet;

use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_compiler::ir::ParamMap;

const SCHEMA: &str = "node Person { slug: String @key name: String }\nedge Knows: Person -> Person\n";
const ACTOR: &str = "smoke-actor";
const FINAL_Q: &str = "query q() { match { $p: Person } return { $p.slug } }";
const INSERT: &str = "query i() { insert Person { slug: \"smoke-new\", name: \"n\" } }";
// smoke-8 has NO incident edges → a clean single-statement delete (no cascade,
// so it can't trip the RC-1 multi-statement-delete drift) — both arms succeed.
const DELETE: &str = "query d() { delete Person where slug = \"smoke-8\" }";

fn person(slug: &str) -> String {
format!("{{\"type\":\"Person\",\"data\":{{\"slug\":\"{slug}\",\"name\":\"n\"}}}}\n")
}
fn knows(a: &str, b: &str) -> String {
format!("{{\"edge\":\"Knows\",\"from\":\"{a}\",\"to\":\"{b}\",\"data\":{{}}}}\n")
}
fn batch1() -> String {
(0..5).map(|i| person(&format!("smoke-{i}"))).collect()
}
fn batch2() -> String {
let mut s: String = (5..10).map(|i| person(&format!("smoke-{i}"))).collect();
s.push_str(&knows("smoke-0", "smoke-1"));
s.push_str(&knows("smoke-1", "smoke-2"));
s
}

/// Recursively collect every string value beginning with `smoke-`.
fn smoke_slugs(v: &serde_json::Value) -> BTreeSet<String> {
fn walk(v: &serde_json::Value, out: &mut BTreeSet<String>) {
match v {
serde_json::Value::String(s) if s.starts_with("smoke-") => {
out.insert(s.clone());
}
serde_json::Value::Array(a) => a.iter().for_each(|x| walk(x, out)),
serde_json::Value::Object(m) => m.values().for_each(|x| walk(x, out)),
_ => {}
}
}
let mut out = BTreeSet::new();
walk(v, &mut out);
out
}

/// Drive the sequence via the embedded engine; return the final slug set.
async fn embedded_arm(uri: &str) -> BTreeSet<String> {
let db = Omnigraph::init(uri, SCHEMA).await.unwrap();
load_jsonl(&db, &batch1(), LoadMode::Merge).await.unwrap();
load_jsonl(&db, &batch2(), LoadMode::Merge).await.unwrap();
db.mutate("main", INSERT, "i", &ParamMap::new()).await.unwrap();
db.mutate("main", DELETE, "d", &ParamMap::new()).await.unwrap();
let res = db
.query(ReadTarget::branch("main"), FINAL_Q, "q", &ParamMap::new())
.await
.unwrap();
smoke_slugs(&res.to_rust_json())
}

/// Drive the SAME sequence via the `omnigraph` CLI subprocess (sync — assert_cmd).
fn cli_arm(uri: &str, schema_path: &str, b1: &str, b2: &str) -> BTreeSet<String> {
support::cli()
.args(["init", "--schema", schema_path, uri])
.assert()
.success();
support::cli()
.args(["load", "--mode", "merge", "--data", b1, uri])
.assert()
.success();
support::cli()
.args(["load", "--mode", "merge", "--data", b2, uri])
.assert()
.success();
support::cli()
.args(["mutate", "-e", INSERT, "--store", uri, "--as", ACTOR])
.assert()
.success();
support::cli()
.args(["mutate", "-e", DELETE, "--store", uri, "--as", ACTOR])
.assert()
.success();
let out = support::cli()
.args(["query", "-e", FINAL_Q, "--json", "--store", uri, "--as", ACTOR])
.output()
.expect("cli query spawn");
assert!(out.status.success(), "cli query failed: {out:?}");
let v: serde_json::Value =
serde_json::from_slice(&out.stdout).expect("cli --json query output");
smoke_slugs(&v)
}

#[tokio::test(flavor = "multi_thread")]
async fn embedded_and_cli_agree_on_dst_sequence() {
let dir = tempfile::tempdir().unwrap();
let schema_path = dir.path().join("smoke.pg");
std::fs::write(&schema_path, SCHEMA).unwrap();
let b1 = dir.path().join("b1.jsonl");
std::fs::write(&b1, batch1()).unwrap();
let b2 = dir.path().join("b2.jsonl");
std::fs::write(&b2, batch2()).unwrap();
let emb_uri = dir.path().join("emb.omni");
let cli_uri = dir.path().join("cli.omni");

// Embedded arm (in-process async).
let emb = embedded_arm(emb_uri.to_str().unwrap()).await;

// CLI arm (subprocess; assert_cmd is sync → run on a blocking thread).
let (cli_uri_s, schema_s, b1_s, b2_s) = (
cli_uri.to_str().unwrap().to_string(),
schema_path.to_str().unwrap().to_string(),
b1.to_str().unwrap().to_string(),
b2.to_str().unwrap().to_string(),
);
let cli = tokio::task::spawn_blocking(move || cli_arm(&cli_uri_s, &schema_s, &b1_s, &b2_s))
.await
.unwrap();

assert_eq!(
emb, cli,
"embedded and CLI diverged on the final person-slug set"
);

// Sanity: the sequence produced the expected set (0..9 + new − 8).
let expected: BTreeSet<String> = (0..10)
.filter(|i| *i != 8)
.map(|i| format!("smoke-{i}"))
.chain(["smoke-new".to_string()])
.collect();
assert_eq!(emb, expected, "unexpected final state: {emb:?}");
}
50 changes: 50 additions & 0 deletions crates/omnigraph/tests/dst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ mod backend;
mod readshape;
#[path = "dst/statemachine.rs"]
mod statemachine;
#[path = "dst/fuzz.rs"]
mod fuzz;
// NOTE: the failpoint-gated recovery cells live in their OWN binary
// (`tests/dst_recovery.rs`), not here — the process-global `fail` registry would
// otherwise leak armed failpoints into these (non-serial, parallel) walks.
Expand Down Expand Up @@ -581,3 +583,51 @@ async fn run_walk(faults: bool) {
}



// ═══════════════════════════ D5 context: S3 (PR-D) ════════════════════════

/// Build a unique `s3://` DST graph URI, or `None` when the bucket env is unset
/// (skip locally; runs in CI's `rustfs_integration` job).
fn s3_dst_uri() -> Option<String> {
let bucket = std::env::var("OMNIGRAPH_S3_TEST_BUCKET").ok()?;
let base = if bucket.starts_with("s3://") {
bucket
} else {
format!("s3://{bucket}")
};
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
Some(format!(
"{}/dst-harness/{}-{}",
base.trim_end_matches('/'),
std::process::id(),
nanos
))
}

/// D5 S3 context: the FULL white-box battery must hold on a real object-store
/// backend, not just local FS. A short seeded op sequence builds state on
/// `s3://`, then the battery runs (known bugs allow-listed, novel → fail).
#[tokio::test(flavor = "multi_thread")]
async fn s3_battery_holds() {
Comment on lines +613 to +614

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Wire the S3 DST test into RustFS CI

This new test is bucket-gated, so the normal workspace job always returns early because .github/workflows/ci.yml:193-200 runs cargo test --workspace --locked without OMNIGRAPH_S3_TEST_BUCKET. The only job that sets the bucket, rustfs_integration, runs s3_storage, write_cost_s3, server/cluster/CLI S3 tests, and failpoint S3 tests, but never cargo test -p omnigraph-engine --test dst (ci.yml:373-400), and the PR change classifier does not include crates/omnigraph/tests/dst.rs for RustFS (ci.yml:87-96). As a result this claimed S3 coverage is skipped in CI unless a developer runs it manually.

Useful? React with 👍 / 👎.

let Some(uri) = s3_dst_uri() else {
eprintln!("skipping s3 dst battery: OMNIGRAPH_S3_TEST_BUCKET unset");
return;
};
let db = backend::open_clean(&uri).await;
let mut rng = Rng::new(1);
let mut model = Model::new();
for _ in 0..8 {
let _ = op::step(&db, &mut rng, &mut model).await;
}
Comment on lines +622 to +624

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Fail on unexpected S3 op errors

In the S3 context, every op::step error is discarded. Since step only updates the model after successful operations, an S3-specific failure in a load, mutation, optimize, or read can leave both the database and model at the previous state, letting run_battery pass even though the backend operation failed. This should classify known open bugs like run_walk does and fail on any other S3 operation error.

Useful? React with 👍 / 👎.

for (name, res) in run_battery(&db, &model).await {
if let Err(f) = res {
match classify(&f) {
Some(_bug) => {} // known open bug — allow-listed
None => panic!("s3: NOVEL battery violation [{name}]: {}", f.message()),
}
}
}
}
Comment thread
cursor[bot] marked this conversation as resolved.
12 changes: 9 additions & 3 deletions crates/omnigraph/tests/dst/MATRIX.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ Legend: ✅ sampled · 🟡 partial · ❌ unsampled · ⏸️ deferred-by-plan
| context | status | note |
|---------|--------|------|
| embedded backend | ✅ | the only backend |
| **CLI / long-lived server backend** | ⏸️ | PR-D |
| **CLI / long-lived server backend** | ❌ deferred | crate-boundary friction: harness is in `omnigraph-engine` tests, the `omnigraph` binary is in `omnigraph-cli` (`CARGO_BIN_EXE_omnigraph` is cross-package, unavailable), and the natural home — `omnigraph-cli` tests — can't reach the harness modules. Plus subprocess-per-op is slow/flaky. Needs the harness op/model extracted to a shared crate first. |
| local FS | ✅ | |
| **S3 (RustFS/MinIO)** | ⏸️ | PR-D |
| **S3 (RustFS/MinIO)** | ✅ (PR-D) | `s3_battery_holds` — full battery on `s3://`, env-gated (`OMNIGRAPH_S3_TEST_BUCKET`) |

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 S3 Battery Never Runs

This row marks the S3 context complete via s3_battery_holds, but the RustFS CI path does not run the dst integration test target. When CI provisions OMNIGRAPH_S3_TEST_BUCKET, the new battery is still never executed, so an S3 regression can pass while the matrix reports the coverage as done.

Context Used: AGENTS.md (source)

Fix in Claude Code

| **parser/loader fuzz** | ✅ (PR-D) | `fuzz::*` — proptest dup/malformed-injecting; `cargo-fuzz`/libFuzzer deferred (needs nightly) |
| single writer | ✅ | |
| concurrent writers (one handle) | ✅ | `concurrent_walk_structural_invariants` |
| **concurrent *opens* / ≥2 recovery sweeps** | ❌ | **the `#296` cell — see below** |
Expand Down Expand Up @@ -128,7 +129,12 @@ seam cannot induce a `RolledPastExpected` sidecar.
one-winner-CAS territory. ❌ remaining.
5. **generative** (not hand-armed) recovery faults — wrap Lance's `object_store` so
the walk *discovers* sidecar/CAS bugs instead of the cells being scripted. ❌
6. **vector/FTS/rrf read shapes** + **determinism/replay-equality** (PR-C) + **CLI/server/S3 backends** (PR-D). ⏸️
6. **determinism/replay-equality** (PR-C) — ✅ DONE (`--features dst` seam +
`replay_equality_same_seed`). **S3 context + parser/loader fuzz** (PR-D) — ✅ DONE
(`s3_battery_holds`, `fuzz::*`). **Still ⏸️/❌:** CLI/server backends (deferred,
crate-boundary — see D5 table), `cargo-fuzz`/libFuzzer (needs nightly),
`porcupine` linearizability, vector/FTS/rrf read shapes, wrapping the main
walks in `with_seed`.

## The standing rule
When a bug is found *outside* this harness, before closing it: add its row to the
Expand Down
Loading