diff --git a/AGENTS.md b/AGENTS.md
index d6d242d6..c0ff4f0a 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -32,7 +32,7 @@ OmniGraph is a typed property-graph engine built as a coordination layer over ma
- **Languages**: a `.pg` schema language and a `.gq` query language, both Pest-based, with a typed IR.
- **Multi-modal querying**: vector ANN (`nearest`), full-text (`search`/`fuzzy`/`match_text`/`bm25`), Reciprocal Rank Fusion (`rrf`), and graph traversal (`Expand`, anti-join `not { … }`) in one runtime.
- **Branches and commits across the whole graph**: Git-style — every successful publish appends to a commit DAG; merges are three-way at the row level.
-- **Atomic per-query writes**: `mutate_as` and `load` accumulate insert/update batches into an in-memory `MutationStaging.pending` per touched table; one `stage_*` + `commit_staged` per table runs at end-of-query, then `ManifestBatchPublisher::publish` commits the manifest atomically with per-table `expected_table_versions` CAS. A mid-query failure leaves Lance HEAD untouched on staged tables — no drift, no run state machine, no staging branches. Deletes still inline-commit; D₂ at parse time prevents inserts/updates and deletes from coexisting in one query.
+- **Atomic per-query writes**: `mutate_as` and `load` accumulate insert/update batches into an in-memory `MutationStaging.pending` per touched table; one `stage_*` + `commit_staged` per table runs at end-of-query, then `ManifestBatchPublisher::publish` commits the manifest atomically with per-table `expected_table_versions` CAS. A mid-query failure leaves Lance HEAD untouched on staged tables — no drift, no run state machine, no staging branches. Deletes stage through the same path (MR-A: `stage_delete` via Lance 7.0 `DeleteBuilder::execute_uncommitted`), so they no longer advance Lance HEAD inline. D₂ at parse time is a deliberate boundary — one mutation query is constructive (insert/update) XOR destructive (delete) — so read-your-writes within a query stays unambiguous and each table commits at most one version; compose mixed operations via separate mutations, or a branch for single-commit atomicity.
- **HTTP server**: Axum + utoipa OpenAPI, bearer auth (SHA-256 hashed, optional AWS Secrets Manager). Cedar policy enforcement is engine-wide — every `_as` writer calls `Omnigraph::enforce(action, scope, actor)`, so HTTP, CLI, and embedded SDK consumers all hit the same gate. **Cluster-only boot** (RFC-011): the server always boots from a cluster directory (`--cluster
`, RFC-005) and serves N graphs (N ≥ 1) under multi-graph routes (`/graphs/{graph_id}/...` + read-only `GET /graphs` enumeration); there are no single-graph flat routes and no positional-URI boot. Per-graph + server-level Cedar policies. Runtime add/remove (`POST /graphs`, `DELETE /graphs/{id}`) is not exposed — operators run `cluster apply` and restart.
- **CLI** with two-surface config (RFC-007/008): the team-owned cluster directory (`cluster.yaml`) plus the per-operator `~/.omnigraph/config.yaml` (servers, clusters, credentials, actor, profiles, aliases, defaults). Graphs are addressed via `--store`/`--server`/`--cluster`/`--profile`/operator defaults (RFC-011). Multi-format output (json/jsonl/csv/kv/table).
@@ -250,7 +250,7 @@ omnigraph policy explain --cluster ./company-brain --graph knowledge --actor act
| Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing |
| Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables |
| Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering |
-| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). The write entry points (`load_as`, `mutate_as`, `apply_schema_as`, `branch_merge_as`) and `refresh` additionally run an in-process roll-forward-only heal (serialized against live writers via the per-table write queues), so a long-lived server converges on its next write without restart; only rollback-eligible sidecars still defer to the next read-write open (a future background reconciler's goal). Engine writes route through a sealed `TableStorage` trait (`db.storage()`) exposing only `stage_*` + `commit_staged` + reads; the inline-commit residuals (`delete_where`, `create_vector_index`) are split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so the default surface cannot couple a write with a HEAD advance — §1 holds by construction. 
`delete_where` and `create_vector_index` stay inline until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)); `LoadMode::Overwrite` uses Lance `Overwrite` staged transactions. |
+| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). The write entry points (`load_as`, `mutate_as`, `apply_schema_as`, `branch_merge_as`) and `refresh` additionally run an in-process roll-forward-only heal (serialized against live writers via the per-table write queues), so a long-lived server converges on its next write without restart; only rollback-eligible sidecars still defer to the next read-write open (a future background reconciler's goal). Engine writes route through a sealed `TableStorage` trait (`db.storage()`) exposing only `stage_*` + `commit_staged` + reads; the sole inline-commit residual (`create_vector_index`) is split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so the default surface cannot couple a write with a HEAD advance — §1 holds by construction. 
`delete` migrated to the staged path in MR-A (`stage_delete` via Lance 7.0 `DeleteBuilder::execute_uncommitted`, [#6658](https://github.com/lance-format/lance/issues/6658)); `create_vector_index` stays inline until upstream Lance ships a public two-phase API ([#6666](https://github.com/lance-format/lance/issues/6666)); `LoadMode::Overwrite` uses Lance `Overwrite` staged transactions. |
| Compaction (`compact_files`) + reindex (`optimize_indices`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; per table runs `compact_files` **then Lance `optimize_indices`** (folds appended/rewritten fragments back into existing indexes — incremental merge, not retrain) and **publishes the resulting version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe the work and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage spanning both ops; **commits even with no compaction work if index coverage is stale**; **refuses on an unrecovered graph**; **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair`; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent; reindex is skipped for them too today), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) |
| Repair uncovered drift | — | `omnigraph repair` explicitly classifies uncovered table `HEAD > manifest` drift: verified maintenance drift (`ReserveFragments`/`Rewrite`) can be published with `--confirm`; suspicious or unverifiable drift requires `--force --confirm`. Sidecar-covered crash residuals still recover automatically on open. |
| Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy |
diff --git a/Cargo.lock b/Cargo.lock
index 16a18274..01170ec6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4939,6 +4939,7 @@ dependencies = [
"fail",
"futures",
"lance",
+ "lance-core",
"lance-datafusion",
"lance-file",
"lance-index",
diff --git a/Cargo.toml b/Cargo.toml
index c442242c..ee386796 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,6 +32,7 @@ datafusion-expr = "53"
datafusion-functions-aggregate = "53"
lance = { version = "7.0.0", default-features = false, features = ["aws"] }
+lance-core = "7.0.0"
lance-datafusion = "7.0.0"
lance-file = "7.0.0"
lance-index = "7.0.0"
diff --git a/crates/omnigraph/Cargo.toml b/crates/omnigraph/Cargo.toml
index 8801c429..cc750e16 100644
--- a/crates/omnigraph/Cargo.toml
+++ b/crates/omnigraph/Cargo.toml
@@ -19,6 +19,7 @@ failpoints = ["dep:fail", "fail/failpoints"]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.2" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.2" }
lance = { workspace = true }
+lance-core = { workspace = true }
lance-datafusion = { workspace = true }
datafusion = { workspace = true }
lance-file = { workspace = true }
diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs
index 823b157f..c3860c56 100644
--- a/crates/omnigraph/src/db/omnigraph.rs
+++ b/crates/omnigraph/src/db/omnigraph.rs
@@ -154,7 +154,7 @@ pub struct Omnigraph {
/// write-serialization mechanism (the server holds the engine as a
/// lockless `Arc`). Reachable from engine internals
/// (mutation finalize, schema_apply, branch_merge, ensure_indices,
- /// delete_where, the fork path, recovery reconciler).
+ /// the fork path, recovery reconciler).
write_queue: Arc,
/// Process-wide mutex held across the swap → operate → restore window
/// in `branch_merge_impl`. Two concurrent merges with distinct targets
@@ -702,13 +702,14 @@ impl Omnigraph {
&self.table_store
}
- /// Inline-commit residual surface (`delete_where`,
- /// `create_vector_index`) — the writes Lance cannot yet express as a
- /// stage-then-commit pair. Deliberately separate from [`Self::storage`] so
- /// the default storage surface is staged-only and a new writer cannot couple
- /// "write bytes" with "advance HEAD" by reaching for `db.storage()`. Only
- /// the handful of documented residual call sites (mutation/merge deletes,
- /// vector-index build) use this accessor. See
+ /// Inline-commit residual surface (`create_vector_index`) — the sole
+ /// write Lance cannot yet express as a stage-then-commit pair (segment
+ /// commit needs `build_index_metadata_from_segments`, Lance #6666).
+ /// Deliberately separate from [`Self::storage`] so the default storage
+ /// surface is staged-only and a new writer cannot couple "write bytes" with
+ /// "advance HEAD" by reaching for `db.storage()`. Only the vector-index
+ /// build uses this accessor — delete migrated to the staged path
+ /// (`stage_delete`) in MR-A. See
/// `crate::storage_layer::InlineCommitResidual` for the per-method blocker.
pub(crate) fn storage_inline_residual(
&self,
@@ -726,7 +727,7 @@ impl Omnigraph {
/// Per-`(table_key, branch)` writer queues.
///
/// Engine-internal writers (mutation finalize, schema_apply,
- /// branch_merge, ensure_indices, delete_where) and the future MR-870
+ /// branch_merge, ensure_indices) and the future MR-870
/// recovery reconciler reach the queue manager via this accessor.
/// Returns an `Arc` clone so callers can hold the manager across
/// `&mut self` engine API boundaries.
diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs
index a917150e..f73f2a26 100644
--- a/crates/omnigraph/src/db/omnigraph/table_ops.rs
+++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs
@@ -1171,8 +1171,9 @@ async fn prepare_updates_for_commit(
// its just-committed version. When a table's handle is present, the index
// build below reuses it and SKIPS the `reopen_for_mutation` open. Absent
// entries (other writers — schema apply, merge, ensure_indices, tests —
- // pass `HashMap::new()`; inline-committed/delete tables are never staged)
- // keep the byte-identical `reopen_for_mutation` path.
+ // pass `HashMap::new()`) keep the byte-identical `reopen_for_mutation`
+ // path. Delete tables ARE staged now (MR-A), so their handle is present
+ // like any other staged write.
mut committed_handles: std::collections::HashMap,
) -> Result> {
if updates.is_empty() {
diff --git a/crates/omnigraph/src/db/write_queue.rs b/crates/omnigraph/src/db/write_queue.rs
index 18a14d1c..07605662 100644
--- a/crates/omnigraph/src/db/write_queue.rs
+++ b/crates/omnigraph/src/db/write_queue.rs
@@ -5,10 +5,10 @@
//! disjoint-key writes proceed concurrently and only writes to the same
//! `(table_key, branch_ref)` serialize here. This module owns the queue
//! data structure; callers in `MutationStaging::commit_all`, `branch_merge`,
-//! `schema_apply`, `ensure_indices`, `delete_where`, the fork path (first
-//! write to a table on a branch — acquired before the fork, held through the
-//! manifest publish), and the recovery reconciler acquire guards before any
-//! per-table Lance commit. Serialization is in-process only; cross-process
+//! `schema_apply`, `ensure_indices`, the fork path (first write to a table on
+//! a branch — acquired before the fork, held through the manifest publish),
+//! and the recovery reconciler acquire guards before any per-table Lance
+//! commit. Serialization is in-process only; cross-process
//! writers on one graph remain one-winner-CAS at the manifest publish.
//!
//! ## Why exclusive `tokio::sync::Mutex<()>` per key
diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs
index c846894a..0bf37903 100644
--- a/crates/omnigraph/src/exec/merge.rs
+++ b/crates/omnigraph/src/exec/merge.rs
@@ -1065,9 +1065,10 @@ async fn publish_rewritten_merge_table(
staged: &StagedMergeResult,
) -> Result {
// Branch merge's source-rewrite path is Merge-shaped (upsert from
- // source onto target). The inline `delete_where` later in this
- // function operates on rows the rewrite chose to remove, not
- // user-facing predicates, so Merge is the correct policy here.
+ // source onto target). The staged delete later in this function
+ // (`stage_delete` + `commit_staged`) operates on rows the rewrite chose
+ // to remove, not user-facing predicates, so Merge is the correct policy
+ // here.
// `open_for_mutation` is the no-txn entry, so collapse #1's non-strict
// open-skip (gated on `txn.is_some()`) never fires here — the handle is
// always `Some`.
@@ -1130,15 +1131,10 @@ async fn publish_rewritten_merge_table(
// See tests/failpoints.rs::branch_merge_rewrite_partial_after_merge_rolls_back.
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_REWRITE_AFTER_MERGE_PRE_DELETE)?;
- // Phase 2: delete removed rows via deletion vectors.
- //
- // INLINE-COMMIT RESIDUAL: lance-6.0.1 does not expose a public
- // two-phase delete API (DeleteJob is `pub(crate)` —
- // lance-format/lance#6658 is open with no PRs). We deliberately do
- // NOT introduce a `stage_delete` wrapper that would secretly
- // inline-commit (it would create a side-channel between the staged
- // and inline write paths). When the upstream API ships, swap this
- // `delete_where` call for `stage_delete` + `commit_staged`.
+ // Phase 2: delete removed rows via deletion vectors, staged through
+ // `stage_delete` + `commit_staged` (MR-A — Lance 7.0's
+ // `DeleteBuilder::execute_uncommitted`, #6658, made delete a two-phase
+ // staged write, so this no longer inline-commits).
if !staged.deleted_ids.is_empty() {
let escaped: Vec<String> = staged
.deleted_ids
@@ -1146,11 +1142,12 @@ async fn publish_rewritten_merge_table(
.map(|id| format!("'{}'", id.replace('\'', "''")))
.collect();
let filter = format!("id IN ({})", escaped.join(", "));
- let (new_ds, _) = target_db
- .storage_inline_residual()
- .delete_where(&full_path, current_ds, &filter)
- .await?;
- current_ds = new_ds;
+ if let Some(staged_delete) = target_db.storage().stage_delete(&current_ds, &filter).await? {
+ current_ds = target_db
+ .storage()
+ .commit_staged(current_ds, staged_delete)
+ .await?;
+ }
}
// Failpoint: crash after the Phase 2 delete commit, before the index build.
@@ -1310,8 +1307,8 @@ async fn publish_adopted_delta(
// tests/failpoints.rs::branch_merge_adopt_partial_after_upsert_rolls_back.
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_ADOPT_AFTER_UPSERT_PRE_DELETE)?;
- // Phase 2: delete removed rows via deletion vectors (inline-commit residual,
- // same as the three-way path until Lance ships a public two-phase delete).
+ // Phase 2: delete removed rows via deletion vectors, staged through
+ // `stage_delete` + `commit_staged` (same as the three-way path; MR-A).
if !delta.deleted_ids.is_empty() {
let escaped: Vec<String> = delta
.deleted_ids
@@ -1319,11 +1316,12 @@ async fn publish_adopted_delta(
.map(|id| format!("'{}'", id.replace('\'', "''")))
.collect();
let filter = format!("id IN ({})", escaped.join(", "));
- let (new_ds, _) = target_db
- .storage_inline_residual()
- .delete_where(&full_path, current_ds, &filter)
- .await?;
- current_ds = new_ds;
+ if let Some(staged_delete) = target_db.storage().stage_delete(&current_ds, &filter).await? {
+ current_ds = target_db
+ .storage()
+ .commit_staged(current_ds, staged_delete)
+ .await?;
+ }
}
// Phase 4: index coverage is reconciler-owned on the adopt path. Unlike the
@@ -1597,7 +1595,7 @@ impl Omnigraph {
// Pin `RewriteMerged` and `AdoptWithDelta` candidates — both advance
// Lance HEAD before the manifest publish (RewriteMerged via
// publish_rewritten_merge_table; AdoptWithDelta via publish_adopted_delta:
- // stage_append + stage_merge_insert + delete_where + index — multiple
+ // stage_append + stage_merge_insert + stage_delete + index — multiple
// commit_staged calls per table, which the loose classification handles
// as multi-step drift).
//
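Reviewer note on the two merge hunks in `exec/merge.rs`: both build the delete predicate the same way, doubling any single quote inside each ID and joining the quoted IDs into one `id IN (...)` list before handing it to the storage layer. The sketch below isolates that construction so the escaping can be checked on its own. It assumes IDs are plain strings; the helper name `build_id_in_filter` is hypothetical and does not appear in the patch.

```rust
/// Hypothetical helper (not part of the patch). It mirrors the inline filter
/// construction used in `publish_rewritten_merge_table` and
/// `publish_adopted_delta`: quote each ID as a SQL string literal, doubling
/// embedded single quotes, then join the literals into an `IN` list.
fn build_id_in_filter(ids: &[&str]) -> String {
    let escaped: Vec<String> = ids
        .iter()
        // `'` inside a SQL string literal is escaped by writing it twice.
        .map(|id| format!("'{}'", id.replace('\'', "''")))
        .collect();
    format!("id IN ({})", escaped.join(", "))
}
```

Under these assumptions, the IDs `a1` and `o'brien` produce `id IN ('a1', 'o''brien')`. The patch guards both call sites with `is_empty()` on the ID list, so the degenerate `id IN ()` case should never reach the filter.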
diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs
index fe63a0c7..78cedd65 100644
--- a/crates/omnigraph/src/exec/mutation.rs
+++ b/crates/omnigraph/src/exec/mutation.rs
@@ -565,42 +565,28 @@ fn apply_assignments(
use super::staging::{MutationStaging, PendingMode};
-/// Open a sub-table dataset for read or inline-commit-write within the
-/// current mutation query, capturing pre-write metadata in `staging` on
-/// first touch. The captured version is the publisher's CAS fence at
-/// end-of-query (per-table OCC).
+/// Open a sub-table dataset for read or staged write within the current
+/// mutation query, capturing pre-write metadata in `staging` on first touch.
+/// The captured version is the publisher's CAS fence at end-of-query
+/// (per-table OCC).
///
/// On first touch, opens the dataset at HEAD on the requested branch
/// via `open_for_mutation_on_branch`, which compares Lance HEAD against
/// the manifest's pinned version — that fence is the engine's
/// publisher-style OCC catching cross-writer drift before we make any
/// changes. For delete-only queries, this strict open is also the uncovered
-/// drift guard that runs before `delete_where` can inline-commit.
+/// drift guard.
///
-/// On subsequent touches *within the same query*, behavior depends on
-/// whether the table has already been inline-committed by a delete op:
-///
-/// - **Insert / update path (no inline commit between touches).** Lance
-/// HEAD has not moved since first touch, so a fresh
-/// `open_for_mutation_on_branch` would still match the manifest
-/// pinned version. We just go through it again; `ensure_path` is a
-/// no-op (idempotent on the captured `expected_version`).
-/// - **Delete cascade or multi-delete on the same table.** A prior
-/// `delete_where` on this table has already advanced Lance HEAD past
-/// the manifest's pinned version (the manifest doesn't move until
-/// end-of-query). Going through `open_for_mutation_on_branch` again
-/// would trip its `ensure_expected_version` equality check
-/// (`actual = pinned + 1` vs `expected = pinned`). Instead we route
-/// through `reopen_for_mutation` at the post-inline-commit Lance
-/// version captured in `staging.inline_committed[table_key]`, which
-/// is the source of truth for "where is Lance HEAD right now on
-/// this table within this query."
-///
-/// The `inline_committed` reopen branch closes the multi-delete-on-same-table
-/// failure path that pre-staged-write engines inherited. The branch goes
-/// away once Lance exposes a two-phase delete API
-/// ([lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658))
-/// and we can stage deletes on the same path as inserts/updates.
+/// On subsequent touches *within the same query*, Lance HEAD has not moved
+/// since first touch — inserts, updates AND deletes all stage their work and
+/// defer every HEAD advance to the end-of-query commit, so no op inline-commits
+/// between touches. A fresh `open_for_mutation_on_branch` therefore still
+/// matches the manifest pinned version; we go through it again and `ensure_path`
+/// is a no-op (idempotent on the captured `expected_version`). This holds for a
+/// delete cascade or multiple delete statements hitting the same table: each
+/// touch records another predicate (`record_delete`), and `stage_all` combines
+/// them into one staged delete — there is no post-inline-commit reopen to
+/// special-case anymore.
impl Omnigraph {
/// Resolve a LIVE-HEAD read handle for an edge table's committed-state `@card`
/// scan when collapse #1 skipped the accumulation open. The edge-insert path no
@@ -646,28 +632,6 @@ async fn open_table_for_mutation(
op_kind: crate::db::MutationOpKind,
txn: Option<&crate::db::WriteTxn>,
) -> Result<(Option, String, Option)> {
- if let Some(prior) = staging.inline_committed.get(table_key) {
- let path = staging.paths.get(table_key).ok_or_else(|| {
- OmniError::manifest_internal(format!(
- "open_table_for_mutation: inline_committed[{}] without paths entry",
- table_key
- ))
- })?;
- // The inline-committed reopen does NOT validate the schema contract
- // (it reopens at the post-inline-commit Lance version directly), so it
- // takes no `txn` — threading it here would change nothing. Deletes are
- // strict ops, so this always opens (returns `Some`).
- let ds = db
- .reopen_for_mutation(
- table_key,
- &path.full_path,
- path.table_branch.as_deref(),
- prior.table_version,
- op_kind,
- )
- .await?;
- return Ok((Some(ds), path.full_path.clone(), path.table_branch.clone()));
- }
// `open_for_mutation_on_branch` returns the expected version even when it
// skips the open (collapse #1, the non-strict insert/merge path): the version
// is the pinned base's, identical to the opened handle's `.version()`. Use it
@@ -694,17 +658,62 @@ async fn open_table_for_mutation(
Ok((opened.handle, opened.full_path, opened.table_branch))
}
+/// Build the committed-snapshot filter used to COUNT a delete statement's
+/// `affected_*`, excluding rows a prior delete statement on the same table
+/// already scheduled for removal in this query.
+///
+/// Deletes stage — they no longer inline-commit — so every statement in a
+/// delete-only query scans the same unchanged committed snapshot. Counting each
+/// predicate independently would double-count overlapping statements (the old
+/// inline path did not, because each delete committed before the next ran). The
+/// combined staged delete actually removes the UNION `p₁ ∪ p₂ ∪ …`; excluding
+/// the prior predicates here makes each statement contribute `|pₙ \ (p₁ ∪ …)|`,
+/// whose sum is exactly that distinct count. `base` (the original predicate) is
+/// still what gets recorded — only the count uses this exclusion.
+///
+/// LOAD-BEARING on D₂: this exclusion assumes the committed snapshot is
+/// invariant across the query's statements, which holds only because D₂
+/// (`enforce_no_mixed_destructive_constructive`) forbids mixing inserts/updates
+/// with deletes — so a delete-touched table never has pending writes that would
+/// shift what a later statement sees. If D₂ is ever relaxed, this dedup must be
+/// revisited (a later delete would then need to see prior in-query writes).
+///
+/// The exclusion uses `IS NOT TRUE`, not `NOT`, because of SQL three-valued
+/// logic: a prior predicate referencing a column that is NULL for some row
+/// (e.g. `age > 30` on a row with NULL `age`) evaluates to UNKNOWN, and
+/// `NOT UNKNOWN` is still UNKNOWN — which a `WHERE` treats as not-matched, so
+/// the row would be wrongly dropped from this statement's scan even though the
+/// prior delete never matched it (dropping it from `deleted_ids` skips its
+/// cascade, or — if it is the only match — leaves the node undeleted). Only
+/// rows a prior predicate matched as definitely TRUE should be excluded:
+/// `(prior) IS NOT TRUE` keeps both FALSE and UNKNOWN rows.
+fn dedup_delete_filter(base: &str, prior: &[String]) -> String {
+ if prior.is_empty() {
+ base.to_string()
+ } else {
+ let excluded = prior
+ .iter()
+ .map(|p| format!("({p})"))
+ .collect::<Vec<_>>()
+ .join(" OR ");
+ format!("({base}) AND (({excluded}) IS NOT TRUE)")
+ }
+}
+
/// D₂ parse-time check: a single mutation query is either insert/update-only
/// or delete-only. Mixed → reject before any I/O.
///
-/// Reason: under the staged-write writer, inserts and updates
-/// accumulate in memory and commit at end-of-query, while deletes still
-/// inline-commit (Lance lacks a public two-phase delete in 6.0.1).
-/// Mixing creates ordering hazards (same-row insert→delete becomes a no-op
-/// because the staged insert isn't visible to delete; cascading deletes
-/// of just-inserted edges break referential integrity by silent design).
-/// Until Lance exposes `DeleteJob::execute_uncommitted`, the parse-time
-/// rejection keeps both paths atomic and correct.
+/// This is a deliberate semantic boundary, not temporary scaffolding. Inserts
+/// and updates accumulate as pending in-memory batches and deletes accumulate
+/// as predicates; both stage and commit at end-of-query. Keeping a single query
+/// to one kind means read-your-writes stays unambiguous (a read never has to
+/// reconcile pending inserts against same-query delete predicates) and each
+/// touched table commits at most one version per query. Compose mixed
+/// operations by issuing separate atomic mutations (writes, then deletes), or a
+/// branch + merge when one atomic commit is required. Allowing mixing would
+/// instead demand an in-query delete view, pending pruning, and per-table
+/// two-commit ordering in the hot mutation path — complexity this boundary
+/// deliberately avoids.
fn enforce_no_mixed_destructive_constructive(
ir: &omnigraph_compiler::ir::MutationIR,
) -> Result<()> {
@@ -724,8 +733,9 @@ fn enforce_no_mixed_destructive_constructive(
return Err(OmniError::manifest(format!(
"mutation '{}' on the same query mixes inserts/updates and deletes; \
split into separate mutations: (1) inserts and updates, then (2) deletes. \
- This restriction lifts when Lance exposes a two-phase delete API \
- (tracked: lance-format/lance#6658).",
+ A query is deliberately constructive or destructive, not both, so its \
+ read-your-writes stays unambiguous; run the two on a branch and merge \
+ if you need them in one atomic commit.",
ir.name
)));
}
@@ -804,11 +814,11 @@ impl Omnigraph {
let resolved_params = enrich_mutation_params(params)?;
// Per-query staging accumulator. Inserts and updates push batches
- // into `pending`; deletes still inline-commit and record into
- // `inline_committed`. At end-of-query, `finalize` issues one
- // `stage_*` + `commit_staged` per pending table, then the
- // publisher commits the manifest atomically across all touched
- // tables. Branch is threaded explicitly — no coordinator swap.
+ // into `pending`; deletes push predicates into `delete_predicates`. At
+ // end-of-query, `finalize` issues one `stage_*` + `commit_staged` per
+ // touched table (inserts/updates/deletes alike), then the publisher
+ // commits the manifest atomically across all touched tables. Branch is
+ // threaded explicitly — no coordinator swap.
let mut staging = MutationStaging::default();
// Lower + validate up front so the touched-table set is known before
@@ -1365,7 +1375,7 @@ impl Omnigraph {
let pred_sql = predicate_to_sql(predicate, params, false)?;
let table_key = format!("node:{}", type_name);
- let (handle, full_path, table_branch) = open_table_for_mutation(
+ let (handle, _full_path, _table_branch) = open_table_for_mutation(
self,
staging,
branch,
@@ -1376,14 +1386,20 @@ impl Omnigraph {
.await?;
// Delete is a STRICT op, so collapse #1 never skips its open.
let ds = handle.expect("strict Delete op always opens its dataset");
- let initial_version = ds.version();
// Scan matching IDs for cascade. Per D₂ this never overlaps with
// staged inserts (mixed insert/delete in one query is rejected at
- // parse time), so we scan committed only.
+ // parse time), so we scan committed only. Exclude IDs a prior delete
+ // statement on this table already scheduled (deletes stage, so the
+ // committed snapshot is unchanged across statements): without this,
+ // overlapping predicates would double-count `affected_nodes` AND
+ // re-cascade already-deleted nodes' edges. The combined staged delete
+ // still removes the union, so we record the original `pred_sql` below.
+ let scan_filter =
+ dedup_delete_filter(&pred_sql, staging.recorded_delete_predicates(&table_key));
let batches = self
.storage()
- .scan(&ds, Some(&["id"]), Some(&pred_sql), None)
+ .scan(&ds, Some(&["id"]), Some(&scan_filter), None)
.await?;
let deleted_ids: Vec = batches
@@ -1409,33 +1425,15 @@ impl Omnigraph {
let affected_nodes = deleted_ids.len();
- // Delete nodes — still inline-commit (Lance's `Dataset::delete` is
- // not exposed as a two-phase op in 6.0.1). D₂ keeps inserts and
- // deletes from coexisting in one query, so this advance of Lance
- // HEAD is the only HEAD movement during the query and the
- // publisher's CAS captures it intact.
- let ds = self
- .reopen_for_mutation(
- &table_key,
- &full_path,
- table_branch.as_deref(),
- initial_version,
- crate::db::MutationOpKind::Delete,
- )
- .await?;
+ // Record the node delete as a staged predicate. D₂ keeps inserts and
+ // deletes from coexisting in one query, so this table carries no
+ // pending write batches; `stage_all` turns the predicate into one
+ // `stage_delete` (a deletion-vector transaction) that advances Lance
+ // HEAD only at the unified end-of-query commit — no inline residual.
+ // `open_table_for_mutation` above already captured the table's
+ // path/version/op-kind via `ensure_path`.
crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_DELETE_NODE_PRE_PRIMARY_DELETE)?;
- let (_new_ds, delete_state) = self
- .storage_inline_residual()
- .delete_where(&full_path, ds, &pred_sql)
- .await?;
-
- staging.record_inline(crate::db::SubTableUpdate {
- table_key: table_key.clone(),
- table_version: delete_state.version,
- table_branch: table_branch.clone(),
- row_count: delete_state.row_count,
- version_metadata: delete_state.version_metadata,
- });
+ staging.record_delete(&table_key, pred_sql.clone());
let mut affected_edges = 0usize;
let escaped: Vec<String> = deleted_ids
@@ -1465,7 +1463,7 @@ impl Omnigraph {
let edge_table_key = format!("edge:{}", edge_name);
let cascade_filter = cascade_filters.join(" OR ");
- let (edge_handle, edge_full_path, edge_table_branch) = open_table_for_mutation(
+ let (edge_handle, _edge_full_path, _edge_table_branch) = open_table_for_mutation(
self,
staging,
branch,
@@ -1477,21 +1475,26 @@ impl Omnigraph {
// Delete is a STRICT op, so collapse #1 never skips its open.
let edge_ds = edge_handle.expect("strict Delete op always opens its dataset");
- let (_new_edge_ds, edge_delete) = self
- .storage_inline_residual()
- .delete_where(&edge_full_path, edge_ds, &cascade_filter)
+ // `affected_edges` was the post-inline-commit `deleted_rows`; with
+ // staged deletes the rows aren't removed until end-of-query, so
+ // count the matching committed edges now. Exact under D₂ (no staged
+ // inserts can add matches mid-query), and bounded by the cascade
+ // working set. Exclude edges a prior delete statement (a prior
+ // cascade, or an explicit edge delete) on this table already
+ // scheduled, so an edge incident to two deleted nodes — or matched
+ // by both a cascade and an explicit `delete ` — is counted
+ // once. Record the ORIGINAL cascade filter (the combined staged
+ // delete removes the union); skip only when nothing NEW matches.
+ let count_filter =
+ dedup_delete_filter(&cascade_filter, staging.recorded_delete_predicates(&edge_table_key));
+ let matched = self
+ .storage()
+ .count_rows(&edge_ds, Some(count_filter))
.await?;
+ affected_edges += matched;
- affected_edges += edge_delete.deleted_rows;
-
- if edge_delete.deleted_rows > 0 {
- staging.record_inline(crate::db::SubTableUpdate {
- table_key: edge_table_key,
- table_version: edge_delete.version,
- table_branch: edge_table_branch,
- row_count: edge_delete.row_count,
- version_metadata: edge_delete.version_metadata,
- });
+ if matched > 0 {
+ staging.record_delete(&edge_table_key, cascade_filter);
}
}
@@ -1517,7 +1520,7 @@ impl Omnigraph {
let pred_sql = predicate_to_sql(predicate, params, true)?;
let table_key = format!("edge:{}", type_name);
- let (handle, full_path, table_branch) = open_table_for_mutation(
+ let (handle, _full_path, _table_branch) = open_table_for_mutation(
self,
staging,
branch,
@@ -1529,20 +1532,21 @@ impl Omnigraph {
// Delete is a STRICT op, so collapse #1 never skips its open.
let ds = handle.expect("strict Delete op always opens its dataset");
- let (_new_ds, delete_state) = self
- .storage_inline_residual()
- .delete_where(&full_path, ds, &pred_sql)
+ // Count matching committed edges now (the staged delete won't remove
+ // them until end-of-query). Exact under D₂; exclude edges a prior delete
+ // statement on this table (an earlier cascade or edge delete) already
+ // scheduled, so overlapping statements don't double-count. Record the
+ // ORIGINAL predicate below (the combined staged delete removes the
+ // union); only record when something NEW matches.
+ let count_filter =
+ dedup_delete_filter(&pred_sql, staging.recorded_delete_predicates(&table_key));
+ let affected = self
+ .storage()
+ .count_rows(&ds, Some(count_filter))
.await?;
- let affected = delete_state.deleted_rows;
if affected > 0 {
- staging.record_inline(crate::db::SubTableUpdate {
- table_key,
- table_version: delete_state.version,
- table_branch,
- row_count: delete_state.row_count,
- version_metadata: delete_state.version_metadata,
- });
+ staging.record_delete(&table_key, pred_sql.clone());
self.invalidate_graph_index().await;
}
diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs
index ddfaa065..20c7bb28 100644
--- a/crates/omnigraph/src/exec/staging.rs
+++ b/crates/omnigraph/src/exec/staging.rs
@@ -14,9 +14,12 @@
//! This module is shared by the engine's mutation path (`exec/mutation.rs`)
//! and the bulk loader (`loader/mod.rs`); both feed insert/update batches
//! into `pending` and route end-of-query commits through `finalize`.
-//! Deletes follow the inline-commit path and are recorded via
-//! `record_inline` (parse-time D₂ rule prevents mixed insert/delete in a
-//! single query, so no flushing is required).
+//! Deletes accumulate as predicates in `delete_predicates` (via
+//! `record_delete`) and stage through the same `stage_* → commit_staged`
+//! path as writes — `stage_delete` produces a deletion-vector transaction
+//! that advances no Lance HEAD until the end-of-query commit. The parse-time
+//! D₂ rule keeps inserts/updates and deletes from mixing in one query, so
+//! `pending` and `delete_predicates` never overlap on a table.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -78,9 +81,9 @@ pub(crate) struct StagedTablePath {
///
/// Replaces the legacy inline-commit `MutationStaging.latest` map with
/// an in-memory accumulator that defers all Lance HEAD advances to
-/// end-of-query. After this rewire the bug class "Lance HEAD drifts ahead
-/// of `__manifest`" is unreachable in `mutate_as` and `load` for inserts
-/// and updates by construction.
+/// end-of-query. Inserts, updates AND deletes all stage here, so the bug
+/// class "Lance HEAD drifts ahead of `__manifest`" is unreachable in
+/// `mutate_as` and `load` by construction.
#[derive(Default)]
pub(crate) struct MutationStaging {
/// Pre-write manifest version per table — the publisher's CAS fence at
@@ -90,9 +93,11 @@ pub(crate) struct MutationStaging {
pub(crate) paths: HashMap<String, StagedTablePath>,
/// In-memory accumulated batches per table (insert/update path).
pub(crate) pending: HashMap,
- /// Inline-committed updates from delete-touching ops (D₂ guarantees no
- /// pending batches exist on a delete-touched table).
- pub(crate) inline_committed: HashMap<String, SubTableUpdate>,
+ /// Per-table delete predicates from delete-touching ops. D₂ guarantees a
+ /// table is write-XOR-delete within one query, so this never overlaps
+ /// `pending`. Staged as one combined `stage_delete` per table at
+ /// end-of-query (no inline HEAD advance) — see `stage_delete_table`.
+ pub(crate) delete_predicates: HashMap<String, Vec<String>>,
/// Strictest [`MutationOpKind`] seen per table within this query. Drives
/// the op-kind-aware drift check in [`StagedMutation::commit_all`]: for
/// tables whose first or any subsequent touch was a strict op
@@ -210,10 +215,29 @@ impl MutationStaging {
Ok(())
}
- /// Record a delete that already inline-committed at the Lance layer.
- pub(crate) fn record_inline(&mut self, update: SubTableUpdate) {
- self.inline_committed
- .insert(update.table_key.clone(), update);
+ /// Record a delete predicate for `table_key`. The caller must have already
+ /// called `ensure_path` (via `open_table_for_mutation`) so the table's
+ /// path/version/op-kind are captured. D₂ guarantees a delete-touched table
+ /// has no pending write batches, so the predicates are staged as one
+ /// combined `stage_delete` at end-of-query — no inline HEAD advance.
+ pub(crate) fn record_delete(&mut self, table_key: &str, predicate: String) {
+ self.delete_predicates
+ .entry(table_key.to_string())
+ .or_default()
+ .push(predicate);
+ }
+
+ /// Delete predicates already recorded for `table_key` by earlier delete
+ /// statements in this query. Read before recording the current statement's
+ /// predicate so its `affected_*` count can exclude rows a prior statement
+ /// already scheduled for deletion (deletes stage, so the committed snapshot
+ /// is unchanged across statements — without this, overlapping predicates
+ /// would double-count). `&[]` if none.
+ pub(crate) fn recorded_delete_predicates(&self, table_key: &str) -> &[String] {
+ self.delete_predicates
+ .get(table_key)
+ .map(|v| v.as_slice())
+ .unwrap_or(&[])
}
/// Read-your-writes accessor: the accumulated pending batches for
@@ -237,10 +261,10 @@ impl MutationStaging {
self.pending.get(table_key).map(|p| p.schema.clone())
}
- /// `true` if neither pending nor inline_committed has any state — the
- /// query made no observable writes.
+ /// `true` if neither pending writes nor delete predicates have any state —
+ /// the query made no observable writes.
pub(crate) fn is_empty(&self) -> bool {
- self.pending.is_empty() && self.inline_committed.is_empty()
+ self.pending.is_empty() && self.delete_predicates.is_empty()
}
/// Total count of pending rows across all tables. Used by tests and
@@ -282,7 +306,7 @@ impl MutationStaging {
expected_versions,
paths,
pending,
- inline_committed,
+ delete_predicates,
op_kinds,
} = self;
@@ -304,11 +328,13 @@ impl MutationStaging {
stage_inputs.push((table_key, table, path, expected));
}
let concurrency = concurrency.min(stage_inputs.len()).max(1);
- let staged_entries = futures::stream::iter(stage_inputs.into_iter().map(
- |(table_key, table, path, expected)| async move {
- stage_pending_table(db, table_key, table, path, expected).await
- },
- ))
+ let mut staged_entries: Vec<StagedTableEntry> = futures::stream::iter(
+ stage_inputs.into_iter().map(
+ |(table_key, table, path, expected)| async move {
+ stage_pending_table(db, table_key, table, path, expected).await
+ },
+ ),
+ )
.buffered(concurrency)
.collect::<Vec<Result<Option<StagedTableEntry>>>>()
.await
@@ -318,11 +344,48 @@ impl MutationStaging {
.flatten()
.collect();
+ // Second pass: stage deletes through the same staged path. D₂
+ // guarantees a delete-touched table carries no pending write batches,
+ // so `delete_predicates` and `pending` are disjoint — each is a fresh
+ // `StagedTableEntry`, never a merge into a write entry above. Multiple
+ // predicates on one table (a cascade hitting an edge table twice, or
+ // two delete statements) combine into a single `(p₁) OR (p₂) …` staged
+ // delete, so the table advances Lance HEAD exactly once at commit. A
+ // predicate matching zero committed rows yields `None` and is skipped
+ // (the staged equivalent of the old "skip record_inline on 0 rows" —
+ // no inline HEAD advance, closing the zero-row drift class).
+ for (table_key, predicates) in delete_predicates {
+ let path = paths.get(&table_key).cloned().ok_or_else(|| {
+ OmniError::manifest_internal(format!(
+ "MutationStaging::stage_all: missing path for delete table '{}'",
+ table_key
+ ))
+ })?;
+ let expected = *expected_versions.get(&table_key).ok_or_else(|| {
+ OmniError::manifest_internal(format!(
+ "MutationStaging::stage_all: missing expected version for delete table '{}'",
+ table_key
+ ))
+ })?;
+ let combined = if predicates.len() == 1 {
+ predicates.into_iter().next().unwrap()
+ } else {
+ predicates
+ .iter()
+ .map(|p| format!("({})", p))
+ .collect::<Vec<_>>()
+ .join(" OR ")
+ };
+ if let Some(entry) =
+ stage_delete_table(db, table_key, combined, path, expected).await?
+ {
+ staged_entries.push(entry);
+ }
+ }
+
Ok(StagedMutation {
- inline_committed,
staged: staged_entries,
expected_versions,
- paths,
op_kinds,
})
}
@@ -395,6 +458,42 @@ async fn stage_pending_table(
}))
}
+/// Stage a delete on `table_key` from a combined predicate, mirroring
+/// [`stage_pending_table`] for the delete path. Reopens the dataset at the
+/// pinned `expected` version (strict Delete op) and stages a deletion-vector
+/// transaction via `TableStorage::stage_delete` — Phase A writes the deletion
+/// file but advances no Lance HEAD until `commit_all` runs `commit_staged`.
+/// Returns `None` when the predicate matches zero committed rows, so a no-op
+/// delete stages nothing and never moves HEAD (the zero-row drift fix carried
+/// onto the staged path).
+async fn stage_delete_table(
+ db: &crate::db::Omnigraph,
+ table_key: String,
+ predicate: String,
+ path: StagedTablePath,
+ expected: u64,
+) -> Result