From b8df1c8e73633b225b2838df3540bc54fed493da Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Thu, 25 Jun 2026 13:48:38 +0200 Subject: [PATCH 1/2] feat(dst): deterministic-replay seam (feature `dst`) + replay-equality oracle (PR-C) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The engine mints observable ULIDs/timestamps at scattered sites; a seeded harness run was not bit-reproducible. New src/dst.rs (mirrors failpoints.rs's no-op-fallback shape): next_ulid()/now_micros() consult a tokio::task_local provider when the `dst` feature is on + a provider is in scope, else fall back to real Ulid::new()/SystemTime — ZERO production cost (cfg block compiles away; verified default suite byte-unchanged, 10 passed, no warnings). Routed the OBSERVABLE sites only: 6 Ulid::new() (mutation x2, loader generate_id, commit_graph genesis+append, recovery sidecar op_id) and the timestamp seam (the 2 now_micros helpers + the raw sidecar started_at). Skipped ephemeral cleanup-cutoff/metrics sites. with_seed(seed, fut) -> (Output, fingerprint) scopes the provider and returns a rolling id/clock fingerprint. New replay_equality_same_seed test (--features dst): same seed -> identical fingerprint + outcome; different seed -> divergent. A determinism tripwire — green while the engine is reproducible, red if non-determinism leaks into the seam. Honest scope: gives bit-identical ENGINE-layer seed replay; Lance internals (compaction/index ids, threadpool) stay non-deterministic, so full on-disk equality is out of scope and the flaky FTS-panic is NOT fixed by this. Main walks not yet wrapped in with_seed (follow-up). 
--- crates/omnigraph/Cargo.toml | 3 + crates/omnigraph/src/db/commit_graph.rs | 12 +- crates/omnigraph/src/db/manifest/recovery.rs | 9 +- crates/omnigraph/src/db/recovery_audit.rs | 7 +- crates/omnigraph/src/dst.rs | 125 +++++++++++++++++++ crates/omnigraph/src/exec/mutation.rs | 4 +- crates/omnigraph/src/lib.rs | 1 + crates/omnigraph/src/loader/mod.rs | 2 +- crates/omnigraph/tests/dst.rs | 56 +++++++++ 9 files changed, 198 insertions(+), 21 deletions(-) create mode 100644 crates/omnigraph/src/dst.rs diff --git a/crates/omnigraph/Cargo.toml b/crates/omnigraph/Cargo.toml index aad8c243..ab5547c4 100644 --- a/crates/omnigraph/Cargo.toml +++ b/crates/omnigraph/Cargo.toml @@ -14,6 +14,9 @@ name = "omnigraph" [features] default = [] failpoints = ["dep:fail", "fail/failpoints"] +# Deterministic-replay seam for the DST test harness (seeds the engine's +# observable ULID/timestamp sites via a task-local). Zero cost when off. +dst = [] [dependencies] omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.2" } diff --git a/crates/omnigraph/src/db/commit_graph.rs b/crates/omnigraph/src/db/commit_graph.rs index 181d1d88..c207f786 100644 --- a/crates/omnigraph/src/db/commit_graph.rs +++ b/crates/omnigraph/src/db/commit_graph.rs @@ -1,6 +1,5 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; use arrow_array::{ Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, UInt64Array, @@ -42,7 +41,7 @@ impl CommitGraph { let root = root_uri.trim_end_matches('/'); let uri = graph_commits_uri(root); let genesis = GraphCommit { - graph_commit_id: ulid::Ulid::new().to_string(), + graph_commit_id: crate::dst::next_ulid().to_string(), manifest_branch: None, manifest_version, parent_commit_id: None, @@ -259,7 +258,7 @@ impl CommitGraph { merged_parent_commit_id: Option<&str>, actor_id: Option<&str>, ) -> Result { - let graph_commit_id = ulid::Ulid::new().to_string(); + let graph_commit_id = 
crate::dst::next_ulid().to_string(); let commit = GraphCommit { graph_commit_id: graph_commit_id.clone(), manifest_branch: manifest_branch.map(|s| s.to_string()), @@ -695,10 +694,9 @@ async fn open_for_branch(root_uri: &str, branch: Option<&str>) -> Result Result { - let duration = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_err(|e| OmniError::manifest(format!("system clock before UNIX_EPOCH: {}", e)))?; - Ok(duration.as_micros() as i64) + // Routed through the DST seam (deterministic under the `dst` feature; the + // real clock otherwise). Covers all commit-graph timestamp callers. + Ok(crate::dst::now_micros()) } #[cfg(test)] diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 86483d46..f073f5e1 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -1920,12 +1920,9 @@ pub(crate) fn new_sidecar( actor_id: Option, tables: Vec, ) -> RecoverySidecar { - use std::time::{SystemTime, UNIX_EPOCH}; - let operation_id = ulid::Ulid::new().to_string(); - let started_at = match SystemTime::now().duration_since(UNIX_EPOCH) { - Ok(d) => format!("{}", d.as_micros()), - Err(_) => "0".to_string(), - }; + let operation_id = crate::dst::next_ulid().to_string(); + // Routed through the DST seam (deterministic under the `dst` feature). + let started_at = format!("{}", crate::dst::now_micros()); RecoverySidecar { schema_version: SIDECAR_SCHEMA_VERSION, operation_id, diff --git a/crates/omnigraph/src/db/recovery_audit.rs b/crates/omnigraph/src/db/recovery_audit.rs index 05d84b8f..12fd0e13 100644 --- a/crates/omnigraph/src/db/recovery_audit.rs +++ b/crates/omnigraph/src/db/recovery_audit.rs @@ -22,7 +22,6 @@ //! audit append is retried). 
use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; use arrow_array::{ Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, @@ -277,10 +276,8 @@ fn decode_row(batch: &RecordBatch, row: usize) -> Result { } pub(crate) fn now_micros() -> Result { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_micros() as i64) - .map_err(|e| OmniError::manifest_internal(format!("system clock before unix epoch: {}", e))) + // Routed through the DST seam (deterministic under the `dst` feature). + Ok(crate::dst::now_micros()) } #[cfg(test)] diff --git a/crates/omnigraph/src/dst.rs b/crates/omnigraph/src/dst.rs new file mode 100644 index 00000000..d948853f --- /dev/null +++ b/crates/omnigraph/src/dst.rs @@ -0,0 +1,125 @@ +//! Deterministic-replay seam for the DST test harness (Cargo feature `dst`). +//! +//! The engine mints ULIDs and microsecond timestamps at scattered, OBSERVABLE +//! sites (commit-graph ids/timestamps, keyless node/edge ids, recovery sidecar +//! op-ids). For a seeded harness run to be bit-reproducible, those sites call +//! [`next_ulid`] / [`now_micros`] here instead of `Ulid::new()` / +//! `SystemTime::now()`. +//! +//! Mirrors `failpoints.rs`: WITHOUT the `dst` feature (or with no provider in +//! scope) both functions fall back to the real source, and the feature block +//! compiles away — **zero production cost**. Under the feature, a per-task +//! seeded provider (`with_seed`) makes ids/timestamps deterministic and exposes +//! a rolling fingerprint for the replay-equality oracle. + +use ulid::Ulid; + +/// Production fallback: real micros since the Unix epoch. +fn real_now_micros() -> i64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_micros() as i64) + .unwrap_or(0) +} + +/// A ULID for an observable id. Deterministic under [`with_seed`]; otherwise a +/// real `Ulid::new()`. 
+pub(crate) fn next_ulid() -> Ulid { + #[cfg(feature = "dst")] + { + if let Ok(u) = imp::DST.try_with(|s| s.next_ulid()) { + return u; + } + } + Ulid::new() +} + +/// Micros since the epoch for an observable timestamp. Deterministic + +/// monotonic under [`with_seed`]; otherwise the real clock. +pub(crate) fn now_micros() -> i64 { + #[cfg(feature = "dst")] + { + if let Ok(t) = imp::DST.try_with(|s| s.now_micros()) { + return t; + } + } + real_now_micros() +} + +#[cfg(feature = "dst")] +mod imp { + use std::sync::Arc; + use std::sync::atomic::{AtomicI64, AtomicU64, Ordering::SeqCst}; + + use ulid::Ulid; + + pub(super) fn splitmix64(mut x: u64) -> u64 { + x = x.wrapping_add(0x9E37_79B9_7F4A_7C15); + x = (x ^ (x >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9); + x = (x ^ (x >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB); + x ^ (x >> 31) + } + + /// Seeded, monotonic id/clock source + a rolling fingerprint of everything + /// it has minted — the replay-equality signal. + pub struct DstState { + seed: u64, + ids: AtomicU64, + clock_micros: AtomicI64, + fingerprint: AtomicU64, + } + + impl DstState { + pub fn new(seed: u64) -> Arc { + Arc::new(Self { + seed, + ids: AtomicU64::new(0), + // Fixed seed-derived epoch (~2023 in micros), advanced + // monotonically so ULID timestamps stay sortable. + clock_micros: AtomicI64::new(1_700_000_000_000_000 + (seed as i64) * 1_000_000), + fingerprint: AtomicU64::new(splitmix64(seed)), + }) + } + fn mix(&self, v: u64) { + let cur = self.fingerprint.load(SeqCst); + self.fingerprint.store(splitmix64(cur ^ v), SeqCst); + } + pub fn next_ulid(&self) -> Ulid { + let n = self.ids.fetch_add(1, SeqCst); + let ms = (self.clock_micros.load(SeqCst) / 1000) as u64; + // 80-bit random part derived from (seed, counter) — unique + deterministic. 
+ let random = ((splitmix64(self.seed ^ n) as u128) << 16) | (n as u128 & 0xFFFF); + self.mix(n ^ ms); + Ulid::from_parts(ms, random) + } + pub fn now_micros(&self) -> i64 { + // 1ms per call → monotonic, deterministic. + let t = self.clock_micros.fetch_add(1000, SeqCst); + self.mix(t as u64); + t + } + pub fn fingerprint(&self) -> u64 { + self.fingerprint.load(SeqCst) + } + } + + tokio::task_local! { + pub static DST: Arc; + } +} + +/// Run `fut` with a seeded deterministic id/clock provider in scope; returns the +/// future's output and the engine's id/clock FINGERPRINT for that run. Two runs +/// of the same seeded workload must produce the same fingerprint — the +/// replay-equality oracle. Only available under the `dst` feature. +#[cfg(feature = "dst")] +pub async fn with_seed(seed: u64, fut: F) -> (F::Output, u64) +where + F: std::future::Future, +{ + let state = imp::DstState::new(seed); + let handle = std::sync::Arc::clone(&state); + let out = imp::DST.scope(state, fut).await; + (out, handle.fingerprint()) +} diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index fe63a0c7..2fa884da 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -1086,7 +1086,7 @@ impl Omnigraph { } } } else { - ulid::Ulid::new().to_string() + crate::dst::next_ulid().to_string() }; let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; @@ -1132,7 +1132,7 @@ impl Omnigraph { let edge_type = &self.catalog().edge_types[type_name]; let schema = edge_type.arrow_schema.clone(); let blob_props = edge_type.blob_properties.clone(); - let id = ulid::Ulid::new().to_string(); + let id = crate::dst::next_ulid().to_string(); let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?; validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?; diff --git a/crates/omnigraph/src/lib.rs b/crates/omnigraph/src/lib.rs index 7dd71353..b32f9dd9 100644 --- 
a/crates/omnigraph/src/lib.rs +++ b/crates/omnigraph/src/lib.rs @@ -9,6 +9,7 @@ pub mod changes; pub mod db; +pub mod dst; pub mod embedding; pub mod error; mod exec; diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index c3d48d0f..a46811f3 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -1177,7 +1177,7 @@ fn load_write_concurrency() -> usize { } fn generate_id() -> String { - ulid::Ulid::new().to_string() + crate::dst::next_ulid().to_string() } pub(crate) fn parse_date32_literal(value: &str) -> Result { diff --git a/crates/omnigraph/tests/dst.rs b/crates/omnigraph/tests/dst.rs index bec60df2..4152fc47 100644 --- a/crates/omnigraph/tests/dst.rs +++ b/crates/omnigraph/tests/dst.rs @@ -581,3 +581,59 @@ async fn run_walk(faults: bool) { } + +// ═══════════════════════════ determinism (PR-C) ═══════════════════════════ + +/// Replay-equality oracle (feature `dst`): the same seeded clean op sequence, +/// run twice under the same DST seed, must mint the IDENTICAL engine id/clock +/// fingerprint AND produce the identical outcome — a determinism tripwire (green +/// while the engine is reproducible, red the moment non-determinism leaks into +/// the id/timestamp seam). Uses insert/read only (no optimize/delete), so +/// neither the Lance FTS-builder non-determinism nor RC-1 perturbs the +/// comparison; the engine still mints a commit id + timestamp per load, so the +/// seam IS exercised. A different seed must diverge (proves seed-sensitivity). 
+#[cfg(feature = "dst")] +#[tokio::test] +async fn replay_equality_same_seed() { + async fn run(seed: u64) -> (usize, usize) { + let dir = tempfile::tempdir().unwrap(); + let db = backend::open_clean(dir.path().to_str().unwrap()).await; + let mut rng = Rng::new(seed); + let (mut np, mut nd) = (0usize, 0usize); + for i in 0..15 { + match rng.below(3) { + 0 => { + load_jsonl(&db, &person(&format!("p{i}")), LoadMode::Merge) + .await + .unwrap(); + np += 1; + } + 1 => { + load_jsonl(&db, &doc(&format!("d{i}"), "whatsapp"), LoadMode::Merge) + .await + .unwrap(); + nd += 1; + } + _ => { + let _ = db + .query( + ReadTarget::branch("main"), + "query q() { match { $x: Person } return { $x.slug } }", + "q", + &ParamMap::new(), + ) + .await; + } + } + } + (np, nd) + } + + let (out1, fp1) = omnigraph::dst::with_seed(7, run(7)).await; + let (out2, fp2) = omnigraph::dst::with_seed(7, run(7)).await; + assert_eq!(fp1, fp2, "engine id/clock fingerprint must match for the same seed"); + assert_eq!(out1, out2, "harness outcome must match for the same seed"); + + let (_out3, fp3) = omnigraph::dst::with_seed(99, run(99)).await; + assert_ne!(fp1, fp3, "different seeds should yield different fingerprints"); +} From 626ca8de810e443be5e0fcb28fe904af73596491 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sat, 27 Jun 2026 13:05:21 +0200 Subject: [PATCH 2/2] fix(dst): bound seeded clock init; keep production timestamp path fallible #5: bound the seed-derived clock_micros offset via splitmix64(seed) % 86_400_000_000 so a large u64 seed can't overflow i64 and mint bogus ULID times. #3: add a fallible now_micros_result() (deterministic under with_seed; otherwise the real clock, PROPAGATING a clock-before-epoch error) and route the commit-graph + recovery-audit helpers through it, so a bad clock surfaces OmniError::manifest again instead of silently persisting 0 (deny-list: no silent failures). Verified: prod build clean; --features dst --test dst -> 11 passed (incl. 
replay_equality_same_seed). --- crates/omnigraph/src/db/commit_graph.rs | 6 ++-- crates/omnigraph/src/db/recovery_audit.rs | 6 ++-- crates/omnigraph/src/dst.rs | 40 ++++++++++++++++++++--- 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/crates/omnigraph/src/db/commit_graph.rs b/crates/omnigraph/src/db/commit_graph.rs index c207f786..daf20c5c 100644 --- a/crates/omnigraph/src/db/commit_graph.rs +++ b/crates/omnigraph/src/db/commit_graph.rs @@ -695,8 +695,10 @@ async fn open_for_branch(root_uri: &str, branch: Option<&str>) -> Result Result { // Routed through the DST seam (deterministic under the `dst` feature; the - // real clock otherwise). Covers all commit-graph timestamp callers. - Ok(crate::dst::now_micros()) + // real clock otherwise). The `_result` variant PROPAGATES a clock-before- + // epoch error (the pre-seam contract) instead of persisting a 0 timestamp. + // Covers all commit-graph timestamp callers. + crate::dst::now_micros_result() } #[cfg(test)] diff --git a/crates/omnigraph/src/db/recovery_audit.rs b/crates/omnigraph/src/db/recovery_audit.rs index 12fd0e13..607f4790 100644 --- a/crates/omnigraph/src/db/recovery_audit.rs +++ b/crates/omnigraph/src/db/recovery_audit.rs @@ -276,8 +276,10 @@ fn decode_row(batch: &RecordBatch, row: usize) -> Result { } pub(crate) fn now_micros() -> Result { - // Routed through the DST seam (deterministic under the `dst` feature). - Ok(crate::dst::now_micros()) + // Routed through the DST seam (deterministic under the `dst` feature). The + // `_result` variant PROPAGATES a clock-before-epoch error (the pre-seam + // contract) instead of persisting a 0 timestamp. + crate::dst::now_micros_result() } #[cfg(test)] diff --git a/crates/omnigraph/src/dst.rs b/crates/omnigraph/src/dst.rs index d948853f..7dc159c0 100644 --- a/crates/omnigraph/src/dst.rs +++ b/crates/omnigraph/src/dst.rs @@ -14,13 +14,22 @@ use ulid::Ulid; -/// Production fallback: real micros since the Unix epoch. 
-fn real_now_micros() -> i64 { +/// Production fallback: real micros since the Unix epoch, propagating a +/// clock-before-epoch failure (mirrors the pre-seam helper's `OmniError`). +fn real_now_micros_result() -> crate::error::Result { use std::time::{SystemTime, UNIX_EPOCH}; SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.as_micros() as i64) - .unwrap_or(0) + .map_err(|e| crate::error::OmniError::manifest(format!("system clock before UNIX_EPOCH: {e}"))) +} + +/// Infallible variant: a clock-before-epoch failure collapses to `0`. Only for +/// callers that aren't fallible (the sidecar `started_at` diagnostic string, +/// reached through [`now_micros`]); the fallible timestamp helpers use +/// [`now_micros_result`] so a bad clock still surfaces, never persists as `0`. +fn real_now_micros() -> i64 { + real_now_micros_result().unwrap_or(0) } /// A ULID for an observable id. Deterministic under [`with_seed`]; otherwise a @@ -47,6 +56,21 @@ pub(crate) fn now_micros() -> i64 { real_now_micros() } +/// Fallible timestamp for the production write path: deterministic + monotonic +/// under [`with_seed`]; otherwise the real clock, PROPAGATING a clock-before- +/// epoch error (the pre-seam contract). Use this — not [`now_micros`] — wherever +/// the caller is `Result`-returning, so a broken clock fails loudly (deny-list: +/// no silent failures) instead of persisting a `0` timestamp. +pub(crate) fn now_micros_result() -> crate::error::Result { + #[cfg(feature = "dst")] + { + if let Ok(t) = imp::DST.try_with(|s| s.now_micros()) { + return Ok(t); + } + } + real_now_micros_result() +} + #[cfg(feature = "dst")] mod imp { use std::sync::Arc; @@ -76,8 +100,14 @@ mod imp { seed, ids: AtomicU64::new(0), // Fixed seed-derived epoch (~2023 in micros), advanced - // monotonically so ULID timestamps stay sortable. - clock_micros: AtomicI64::new(1_700_000_000_000_000 + (seed as i64) * 1_000_000), + // monotonically so ULID timestamps stay sortable.
The seed- + // derived offset is bounded to < 1 day (86_400_000_000 micros) + // via splitmix64 so an arbitrary u64 seed (e.g. a fuzzer seed) + // can't overflow i64 — `(seed as i64) * 1_000_000` would panic in + // debug / wrap in release for large seeds and mint bogus times. + clock_micros: AtomicI64::new( + 1_700_000_000_000_000 + (splitmix64(seed) % 86_400_000_000) as i64, + ), fingerprint: AtomicU64::new(splitmix64(seed)), }) }