Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/omnigraph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ name = "omnigraph"
[features]
default = []
failpoints = ["dep:fail", "fail/failpoints"]
# Deterministic-replay seam for the DST test harness (seeds the engine's
# observable ULID/timestamp sites via a task-local). Zero cost when off.
dst = []

[dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.2" }
Expand Down
14 changes: 7 additions & 7 deletions crates/omnigraph/src/db/commit_graph.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use arrow_array::{
Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray, UInt64Array,
Expand Down Expand Up @@ -42,7 +41,7 @@ impl CommitGraph {
let root = root_uri.trim_end_matches('/');
let uri = graph_commits_uri(root);
let genesis = GraphCommit {
graph_commit_id: ulid::Ulid::new().to_string(),
graph_commit_id: crate::dst::next_ulid().to_string(),
manifest_branch: None,
manifest_version,
parent_commit_id: None,
Expand Down Expand Up @@ -259,7 +258,7 @@ impl CommitGraph {
merged_parent_commit_id: Option<&str>,
actor_id: Option<&str>,
) -> Result<String> {
let graph_commit_id = ulid::Ulid::new().to_string();
let graph_commit_id = crate::dst::next_ulid().to_string();
let commit = GraphCommit {
graph_commit_id: graph_commit_id.clone(),
manifest_branch: manifest_branch.map(|s| s.to_string()),
Expand Down Expand Up @@ -695,10 +694,11 @@ async fn open_for_branch(root_uri: &str, branch: Option<&str>) -> Result<CommitG
}

fn now_micros() -> Result<i64> {
let duration = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| OmniError::manifest(format!("system clock before UNIX_EPOCH: {}", e)))?;
Ok(duration.as_micros() as i64)
// Routed through the DST seam (deterministic under the `dst` feature; the
// real clock otherwise). The `_result` variant PROPAGATES a clock-before-
// epoch error (the pre-seam contract) instead of persisting a 0 timestamp.
// Covers all commit-graph timestamp callers.
crate::dst::now_micros_result()
}

#[cfg(test)]
Expand Down
9 changes: 3 additions & 6 deletions crates/omnigraph/src/db/manifest/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1920,12 +1920,9 @@ pub(crate) fn new_sidecar(
actor_id: Option<String>,
tables: Vec<SidecarTablePin>,
) -> RecoverySidecar {
use std::time::{SystemTime, UNIX_EPOCH};
let operation_id = ulid::Ulid::new().to_string();
let started_at = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(d) => format!("{}", d.as_micros()),
Err(_) => "0".to_string(),
};
let operation_id = crate::dst::next_ulid().to_string();
// Routed through the DST seam (deterministic under the `dst` feature).
let started_at = format!("{}", crate::dst::now_micros());
RecoverySidecar {
schema_version: SIDECAR_SCHEMA_VERSION,
operation_id,
Expand Down
9 changes: 4 additions & 5 deletions crates/omnigraph/src/db/recovery_audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
//! audit append is retried).

use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use arrow_array::{
Array, RecordBatch, RecordBatchIterator, StringArray, TimestampMicrosecondArray,
Expand Down Expand Up @@ -277,10 +276,10 @@ fn decode_row(batch: &RecordBatch, row: usize) -> Result<RecoveryAuditRecord> {
}

pub(crate) fn now_micros() -> Result<i64> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_micros() as i64)
.map_err(|e| OmniError::manifest_internal(format!("system clock before unix epoch: {}", e)))
// Routed through the DST seam (deterministic under the `dst` feature). The
// `_result` variant PROPAGATES a clock-before-epoch error (the pre-seam
// contract) instead of persisting a 0 timestamp.
crate::dst::now_micros_result()
}

#[cfg(test)]
Expand Down
155 changes: 155 additions & 0 deletions crates/omnigraph/src/dst.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
//! Deterministic-replay seam for the DST test harness (Cargo feature `dst`).
//!
//! The engine mints ULIDs and microsecond timestamps at scattered, OBSERVABLE
//! sites (commit-graph ids/timestamps, keyless node/edge ids, recovery sidecar
//! op-ids). For a seeded harness run to be bit-reproducible, those sites call
//! [`next_ulid`] / [`now_micros`] here instead of `Ulid::new()` /
//! `SystemTime::now()`.
//!
//! Mirrors `failpoints.rs`: WITHOUT the `dst` feature (or with no provider in
//! scope) both functions fall back to the real source, and the feature block
//! compiles away — **zero production cost**. Under the feature, a per-task
//! seeded provider (`with_seed`) makes ids/timestamps deterministic and exposes
//! a rolling fingerprint for the replay-equality oracle.

use ulid::Ulid;

/// Production fallback: real micros since the Unix epoch, propagating a
/// clock-before-epoch failure (mirrors the pre-seam helper's `OmniError`).
fn real_now_micros_result() -> crate::error::Result<i64> {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_micros() as i64)
.map_err(|e| crate::error::OmniError::manifest(format!("system clock before UNIX_EPOCH: {e}")))
}

/// Infallible variant: a clock-before-epoch failure collapses to `0`. Only for
/// callers that aren't fallible (the sidecar `started_at` diagnostic string and
/// the seeded ULID clock); the fallible timestamp helpers use
/// [`now_micros_result`] so a bad clock still surfaces, never persists as `0`.
fn real_now_micros() -> i64 {
real_now_micros_result().unwrap_or(0)
}

/// A ULID for an observable id. Deterministic under [`with_seed`]; otherwise a
/// real `Ulid::new()`.
pub(crate) fn next_ulid() -> Ulid {
#[cfg(feature = "dst")]
{
if let Ok(u) = imp::DST.try_with(|s| s.next_ulid()) {
return u;
}
}
Ulid::new()
Comment on lines +40 to +44

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Spawned Tasks Escape

with_seed only scopes a Tokio task-local provider, and this branch silently falls back to Ulid::new() when no provider is present. If a seeded replay wraps a workload that calls the engine from a spawned task, that task does not have this task-local provider, so generated commit IDs, sidecar IDs, or keyless row IDs come from the real clock/random source and are not included in the fingerprint. The replay can then contain nondeterministic persisted IDs while the oracle misses those calls. Consider making the seeded path fail loudly when the feature is enabled and deterministic mode was expected, or providing a way to propagate the seeded provider into spawned work.

Context Used: AGENTS.md (source)

Fix in Claude Code

}

/// Micros since the epoch for an observable timestamp. Deterministic +
/// monotonic under [`with_seed`]; otherwise the real clock.
pub(crate) fn now_micros() -> i64 {
#[cfg(feature = "dst")]
{
if let Ok(t) = imp::DST.try_with(|s| s.now_micros()) {
return t;
}
}
real_now_micros()
}

/// Fallible timestamp for the production write path: deterministic + monotonic
/// under [`with_seed`]; otherwise the real clock, PROPAGATING a clock-before-
/// epoch error (the pre-seam contract). Use this — not [`now_micros`] — wherever
/// the caller is `Result`-returning, so a broken clock fails loudly (deny-list:
/// no silent failures) instead of persisting a `0` timestamp.
pub(crate) fn now_micros_result() -> crate::error::Result<i64> {
#[cfg(feature = "dst")]
{
if let Ok(t) = imp::DST.try_with(|s| s.now_micros()) {
return Ok(t);
}
}
real_now_micros_result()
}

#[cfg(feature = "dst")]
mod imp {
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering::SeqCst};

use ulid::Ulid;

pub(super) fn splitmix64(mut x: u64) -> u64 {
x = x.wrapping_add(0x9E37_79B9_7F4A_7C15);
x = (x ^ (x >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
x = (x ^ (x >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
x ^ (x >> 31)
}

/// Seeded, monotonic id/clock source + a rolling fingerprint of everything
/// it has minted — the replay-equality signal.
pub struct DstState {
seed: u64,
ids: AtomicU64,
clock_micros: AtomicI64,
fingerprint: AtomicU64,
}

impl DstState {
pub fn new(seed: u64) -> Arc<Self> {
Arc::new(Self {
seed,
ids: AtomicU64::new(0),
// Fixed seed-derived epoch (~2023 in micros), advanced
// monotonically so ULID timestamps stay sortable. The seed-
// derived offset is bounded to < 1 day (86_400_000_000 micros)
// via splitmix64 so an arbitrary u64 seed (e.g. a fuzzer seed)
// can't overflow i64 — `(seed as i64) * 1_000_000` would panic in
// debug / wrap in release for large seeds and mint bogus times.
clock_micros: AtomicI64::new(
1_700_000_000_000_000 + (splitmix64(seed) % 86_400_000_000) as i64,
),
fingerprint: AtomicU64::new(splitmix64(seed)),
})
}
fn mix(&self, v: u64) {
let cur = self.fingerprint.load(SeqCst);
self.fingerprint.store(splitmix64(cur ^ v), SeqCst);
}
pub fn next_ulid(&self) -> Ulid {
let n = self.ids.fetch_add(1, SeqCst);
let ms = (self.clock_micros.load(SeqCst) / 1000) as u64;
// 80-bit random part derived from (seed, counter) — unique + deterministic.
let random = ((splitmix64(self.seed ^ n) as u128) << 16) | (n as u128 & 0xFFFF);
self.mix(n ^ ms);
Ulid::from_parts(ms, random)
}
pub fn now_micros(&self) -> i64 {
// 1ms per call → monotonic, deterministic.
let t = self.clock_micros.fetch_add(1000, SeqCst);
self.mix(t as u64);
t
}
pub fn fingerprint(&self) -> u64 {
self.fingerprint.load(SeqCst)
}
}

tokio::task_local! {
pub static DST: Arc<DstState>;
}
}

/// Run `fut` with a seeded deterministic id/clock provider in scope; returns the
/// future's output and the engine's id/clock FINGERPRINT for that run. Two runs
/// of the same seeded workload must produce the same fingerprint — the
/// replay-equality oracle. Only available under the `dst` feature.
#[cfg(feature = "dst")]
pub async fn with_seed<F>(seed: u64, fut: F) -> (F::Output, u64)
where
F: std::future::Future,
{
let state = imp::DstState::new(seed);
let handle = std::sync::Arc::clone(&state);
let out = imp::DST.scope(state, fut).await;
(out, handle.fingerprint())
}
4 changes: 2 additions & 2 deletions crates/omnigraph/src/exec/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ impl Omnigraph {
}
}
} else {
ulid::Ulid::new().to_string()
crate::dst::next_ulid().to_string()
};

let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
Expand Down Expand Up @@ -1132,7 +1132,7 @@ impl Omnigraph {
let edge_type = &self.catalog().edge_types[type_name];
let schema = edge_type.arrow_schema.clone();
let blob_props = edge_type.blob_properties.clone();
let id = ulid::Ulid::new().to_string();
let id = crate::dst::next_ulid().to_string();

let batch = build_insert_batch(&schema, &id, &resolved, &blob_props)?;
validate_edge_insert_endpoints(self, staging, branch, type_name, &resolved).await?;
Expand Down
1 change: 1 addition & 0 deletions crates/omnigraph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

pub mod changes;
pub mod db;
pub mod dst;
pub mod embedding;
pub mod error;
mod exec;
Expand Down
2 changes: 1 addition & 1 deletion crates/omnigraph/src/loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1177,7 +1177,7 @@ fn load_write_concurrency() -> usize {
}

fn generate_id() -> String {
ulid::Ulid::new().to_string()
crate::dst::next_ulid().to_string()
}

pub(crate) fn parse_date32_literal(value: &str) -> Result<i32> {
Expand Down
56 changes: 56 additions & 0 deletions crates/omnigraph/tests/dst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,59 @@ async fn run_walk(faults: bool) {
}



// ═══════════════════════════ determinism (PR-C) ═══════════════════════════

/// Replay-equality oracle (feature `dst`): the same seeded clean op sequence,
/// run twice under the same DST seed, must mint the IDENTICAL engine id/clock
/// fingerprint AND produce the identical outcome — a determinism tripwire (green
/// while the engine is reproducible, red the moment non-determinism leaks into
/// the id/timestamp seam). Uses insert/read only (no optimize/delete), so
/// neither the Lance FTS-builder non-determinism nor RC-1 perturbs the
/// comparison; the engine still mints a commit id + timestamp per load, so the
/// seam IS exercised. A different seed must diverge (proves seed-sensitivity).
#[cfg(feature = "dst")]
#[tokio::test]
async fn replay_equality_same_seed() {
async fn run(seed: u64) -> (usize, usize) {
let dir = tempfile::tempdir().unwrap();
let db = backend::open_clean(dir.path().to_str().unwrap()).await;
let mut rng = Rng::new(seed);
let (mut np, mut nd) = (0usize, 0usize);
for i in 0..15 {
match rng.below(3) {
0 => {
load_jsonl(&db, &person(&format!("p{i}")), LoadMode::Merge)
.await
.unwrap();
np += 1;
}
1 => {
load_jsonl(&db, &doc(&format!("d{i}"), "whatsapp"), LoadMode::Merge)
.await
.unwrap();
nd += 1;
}
_ => {
let _ = db
.query(
ReadTarget::branch("main"),
"query q() { match { $x: Person } return { $x.slug } }",
"q",
&ParamMap::new(),
)
.await;
}
}
}
(np, nd)
}

let (out1, fp1) = omnigraph::dst::with_seed(7, run(7)).await;
let (out2, fp2) = omnigraph::dst::with_seed(7, run(7)).await;
assert_eq!(fp1, fp2, "engine id/clock fingerprint must match for the same seed");
assert_eq!(out1, out2, "harness outcome must match for the same seed");

let (_out3, fp3) = omnigraph::dst::with_seed(99, run(99)).await;
assert_ne!(fp1, fp3, "different seeds should yield different fingerprints");
}