Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ OmniGraph is a typed property-graph engine built as a coordination layer over ma
- **Languages**: a `.pg` schema language and a `.gq` query language, both Pest-based, with a typed IR.
- **Multi-modal querying**: vector ANN (`nearest`), full-text (`search`/`fuzzy`/`match_text`/`bm25`), Reciprocal Rank Fusion (`rrf`), and graph traversal (`Expand`, anti-join `not { … }`) in one runtime.
- **Branches and commits across the whole graph**: Git-style — every successful publish appends to a commit DAG; merges are three-way at the row level.
- **Atomic per-query writes**: `mutate_as` and `load` accumulate insert/update batches into an in-memory `MutationStaging.pending` per touched table; one `stage_*` + `commit_staged` per table runs at end-of-query, then `ManifestBatchPublisher::publish` commits the manifest atomically with per-table `expected_table_versions` CAS. A mid-query failure leaves Lance HEAD untouched on staged tables — no drift, no run state machine, no staging branches. Deletes still inline-commit; D₂ at parse time prevents inserts/updates and deletes from coexisting in one query.
- **Atomic per-query writes**: `mutate_as` and `load` accumulate insert/update batches into an in-memory `MutationStaging.pending` per touched table; one `stage_*` + `commit_staged` per table runs at end-of-query, then `ManifestBatchPublisher::publish` commits the manifest atomically with per-table `expected_table_versions` CAS. A mid-query failure leaves Lance HEAD untouched on staged tables — no drift, no run state machine, no staging branches. Deletes stage through the same path (MR-A: `stage_delete` via Lance 7.0 `DeleteBuilder::execute_uncommitted`), so they no longer advance Lance HEAD inline. D₂ at parse time is a deliberate boundary — one mutation query is constructive (insert/update) XOR destructive (delete) — so read-your-writes within a query stays unambiguous and each table commits at most one version; compose mixed operations via separate mutations, or a branch for single-commit atomicity.
- **HTTP server**: Axum + utoipa OpenAPI, bearer auth (SHA-256 hashed, optional AWS Secrets Manager). Cedar policy enforcement is engine-wide — every `_as` writer calls `Omnigraph::enforce(action, scope, actor)`, so HTTP, CLI, and embedded SDK consumers all hit the same gate. **Cluster-only boot** (RFC-011): the server always boots from a cluster directory (`--cluster <dir | s3://…>`, RFC-005) and serves N graphs (N ≥ 1) under multi-graph routes (`/graphs/{graph_id}/...` + read-only `GET /graphs` enumeration); there are no single-graph flat routes and no positional-URI boot. Per-graph + server-level Cedar policies. Runtime add/remove (`POST /graphs`, `DELETE /graphs/{id}`) is not exposed — operators run `cluster apply` and restart.
- **CLI** with two-surface config (RFC-007/008): the team-owned cluster directory (`cluster.yaml`) plus the per-operator `~/.omnigraph/config.yaml` (servers, clusters, credentials, actor, profiles, aliases, defaults). Graphs are addressed via `--store`/`--server`/`--cluster`/`--profile`/operator defaults (RFC-011). Multi-format output (json/jsonl/csv/kv/table).

Expand Down Expand Up @@ -250,7 +250,7 @@ omnigraph policy explain --cluster ./company-brain --graph knowledge --actor act
| Columnar storage on object store | ✅ Arrow/Lance | URI normalization, S3 env-var plumbing |
| Per-dataset versioning + time travel | ✅ | `snapshot_at_version`, `entity_at`, snapshot-pinned reads across many tables |
| Per-dataset branches | ✅ | **Graph-level** branches (atomic across all sub-tables), lazy fork, system branch filtering |
| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). The write entry points (`load_as`, `mutate_as`, `apply_schema_as`, `branch_merge_as`) and `refresh` additionally run an in-process roll-forward-only heal (serialized against live writers via the per-table write queues), so a long-lived server converges on its next write without restart; only rollback-eligible sidecars still defer to the next read-write open (a future background reconciler's goal). Engine writes route through a sealed `TableStorage` trait (`db.storage()`) exposing only `stage_*` + `commit_staged` + reads; the inline-commit residuals (`delete_where`, `create_vector_index`) are split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so the default surface cannot couple a write with a HEAD advance — §1 holds by construction. `delete_where` and `create_vector_index` stay inline until upstream Lance ships a public two-phase API ([#6658](https://github.com/lance-format/lance/issues/6658), [#6666](https://github.com/lance-format/lance/issues/6666)); `LoadMode::Overwrite` uses Lance `Overwrite` staged transactions. |
| Atomic single-dataset commits | ✅ | **Multi-table publish via three layers**, NOT a single Lance primitive: (1) per-table Lance `commit_staged` for the data write, (2) `__manifest` row-level CAS via `ManifestBatchPublisher` for cross-table ordering, (3) the open-time recovery sweep for the residual gap between (1) and (2). All three layers ship; the five migrated writers (`MutationStaging::finalize`, `schema_apply`, `branch_merge`, `ensure_indices`, `optimize_all_tables`) write a `__recovery/{ulid}.json` sidecar before Phase B and delete it after Phase C. The next `Omnigraph::open` (gated on `OpenMode::ReadWrite`) runs the sweep in `db/manifest/recovery.rs`: classify, decide all-or-nothing per sidecar, roll forward via single `ManifestBatchPublisher::publish` or roll back via `Dataset::restore` followed by a manifest publish of the restored version (so both directions converge to `manifest == HEAD` — no residual drift), and record an audit row in `_graph_commit_recoveries.lance` (queryable via `omnigraph commit list --filter actor=omnigraph:recovery`). The write entry points (`load_as`, `mutate_as`, `apply_schema_as`, `branch_merge_as`) and `refresh` additionally run an in-process roll-forward-only heal (serialized against live writers via the per-table write queues), so a long-lived server converges on its next write without restart; only rollback-eligible sidecars still defer to the next read-write open (a future background reconciler's goal). Engine writes route through a sealed `TableStorage` trait (`db.storage()`) exposing only `stage_*` + `commit_staged` + reads; the sole inline-commit residual (`create_vector_index`) is split onto a separate sealed `InlineCommitResidual` trait reached via `db.storage_inline_residual()` (MR-854), so the default surface cannot couple a write with a HEAD advance — §1 holds by construction. `delete` migrated to the staged path in MR-A (`stage_delete` via Lance 7.0 `DeleteBuilder::execute_uncommitted`, [#6658](https://github.com/lance-format/lance/issues/6658)); `create_vector_index` stays inline until upstream Lance ships a public two-phase API ([#6666](https://github.com/lance-format/lance/issues/6666)); `LoadMode::Overwrite` uses Lance `Overwrite` staged transactions. |
| Compaction (`compact_files`) + reindex (`optimize_indices`) | ✅ | `omnigraph optimize` orchestrates over all node/edge tables, bounded concurrency; per table runs `compact_files` **then Lance `optimize_indices`** (folds appended/rewritten fragments back into existing indexes — incremental merge, not retrain) and **publishes the resulting version to `__manifest`** (so the manifest tracks the Lance HEAD — required for reads to observe the work and for schema apply / strict writes to pass their HEAD-vs-manifest precondition), under the per-`(table, main)` write queue with `SidecarKind::Optimize` recovery coverage spanning both ops; **commits even with no compaction work if index coverage is stale**; **refuses on an unrecovered graph**; **skips uncovered HEAD > manifest drift** with `DriftNeedsRepair`; **skips blob-bearing tables** (reported via `TableOptimizeStats.skipped`, not silent; reindex is skipped for them too today), gated on `LANCE_SUPPORTS_BLOB_COMPACTION` until the upstream blob-v2 compaction-decode bug is fixed (see [docs/dev/invariants.md](docs/dev/invariants.md) Known Gaps) |
| Repair uncovered drift | — | `omnigraph repair` explicitly classifies uncovered table `HEAD > manifest` drift: verified maintenance drift (`ReserveFragments`/`Rewrite`) can be published with `--confirm`; suspicious or unverifiable drift requires `--force --confirm`. Sidecar-covered crash residuals still recover automatically on open. |
| Cleanup (`cleanup_old_versions`) | ✅ | `omnigraph cleanup` with `--keep` / `--older-than` policy |
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ datafusion-expr = "53"
datafusion-functions-aggregate = "53"

lance = { version = "7.0.0", default-features = false, features = ["aws"] }
lance-core = "7.0.0"
lance-datafusion = "7.0.0"
lance-file = "7.0.0"
lance-index = "7.0.0"
Expand Down
1 change: 1 addition & 0 deletions crates/omnigraph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ failpoints = ["dep:fail", "fail/failpoints"]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.2" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.2" }
lance = { workspace = true }
lance-core = { workspace = true }
lance-datafusion = { workspace = true }
datafusion = { workspace = true }
lance-file = { workspace = true }
Expand Down
19 changes: 10 additions & 9 deletions crates/omnigraph/src/db/omnigraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub struct Omnigraph {
/// write-serialization mechanism (the server holds the engine as a
/// lockless `Arc<Omnigraph>`). Reachable from engine internals
/// (mutation finalize, schema_apply, branch_merge, ensure_indices,
/// delete_where, the fork path, recovery reconciler).
/// the fork path, recovery reconciler).
write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
/// Process-wide mutex held across the swap → operate → restore window
/// in `branch_merge_impl`. Two concurrent merges with distinct targets
Expand Down Expand Up @@ -702,13 +702,14 @@ impl Omnigraph {
&self.table_store
}

/// Inline-commit residual surface (`delete_where`,
/// `create_vector_index`) — the writes Lance cannot yet express as a
/// stage-then-commit pair. Deliberately separate from [`Self::storage`] so
/// the default storage surface is staged-only and a new writer cannot couple
/// "write bytes" with "advance HEAD" by reaching for `db.storage()`. Only
/// the handful of documented residual call sites (mutation/merge deletes,
/// vector-index build) use this accessor. See
/// Inline-commit residual surface (`create_vector_index`) — the sole
/// write Lance cannot yet express as a stage-then-commit pair (segment
/// commit needs `build_index_metadata_from_segments`, Lance #6666).
/// Deliberately separate from [`Self::storage`] so the default storage
/// surface is staged-only and a new writer cannot couple "write bytes" with
/// "advance HEAD" by reaching for `db.storage()`. Only the vector-index
/// build uses this accessor — delete migrated to the staged path
/// (`stage_delete`) in MR-A. See
/// `crate::storage_layer::InlineCommitResidual` for the per-method blocker.
pub(crate) fn storage_inline_residual(
&self,
Expand All @@ -726,7 +727,7 @@ impl Omnigraph {
/// Per-`(table_key, branch)` writer queues.
///
/// Engine-internal writers (mutation finalize, schema_apply,
/// branch_merge, ensure_indices, delete_where) and the future MR-870
/// branch_merge, ensure_indices) and the future MR-870
/// recovery reconciler reach the queue manager via this accessor.
/// Returns an `Arc` clone so callers can hold the manager across
/// `&mut self` engine API boundaries.
Expand Down
5 changes: 3 additions & 2 deletions crates/omnigraph/src/db/omnigraph/table_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1171,8 +1171,9 @@ async fn prepare_updates_for_commit(
// its just-committed version. When a table's handle is present, the index
// build below reuses it and SKIPS the `reopen_for_mutation` open. Absent
// entries (other writers — schema apply, merge, ensure_indices, tests —
// pass `HashMap::new()`; inline-committed/delete tables are never staged)
// keep the byte-identical `reopen_for_mutation` path.
// pass `HashMap::new()`) keep the byte-identical `reopen_for_mutation`
// path. Delete tables ARE staged now (MR-A), so their handle is present
// like any other staged write.
mut committed_handles: std::collections::HashMap<String, SnapshotHandle>,
) -> Result<Vec<crate::db::SubTableUpdate>> {
if updates.is_empty() {
Expand Down
8 changes: 4 additions & 4 deletions crates/omnigraph/src/db/write_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
//! disjoint-key writes proceed concurrently and only writes to the same
//! `(table_key, branch_ref)` serialize here. This module owns the queue
//! data structure; callers in `MutationStaging::commit_all`, `branch_merge`,
//! `schema_apply`, `ensure_indices`, `delete_where`, the fork path (first
//! write to a table on a branch — acquired before the fork, held through the
//! manifest publish), and the recovery reconciler acquire guards before any
//! per-table Lance commit. Serialization is in-process only; cross-process
//! `schema_apply`, `ensure_indices`, the fork path (first write to a table on
//! a branch — acquired before the fork, held through the manifest publish),
//! and the recovery reconciler acquire guards before any per-table Lance
//! commit. Serialization is in-process only; cross-process
//! writers on one graph remain one-winner-CAS at the manifest publish.
//!
//! ## Why exclusive `tokio::sync::Mutex<()>` per key
Expand Down
48 changes: 23 additions & 25 deletions crates/omnigraph/src/exec/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1065,9 +1065,10 @@ async fn publish_rewritten_merge_table(
staged: &StagedMergeResult,
) -> Result<crate::db::SubTableUpdate> {
// Branch merge's source-rewrite path is Merge-shaped (upsert from
// source onto target). The inline `delete_where` later in this
// function operates on rows the rewrite chose to remove, not
// user-facing predicates, so Merge is the correct policy here.
// source onto target). The staged delete later in this function
// (`stage_delete` + `commit_staged`) operates on rows the rewrite chose
// to remove, not user-facing predicates, so Merge is the correct policy
// here.
// `open_for_mutation` is the no-txn entry, so collapse #1's non-strict
// open-skip (gated on `txn.is_some()`) never fires here — the handle is
// always `Some`.
Expand Down Expand Up @@ -1130,27 +1131,23 @@ async fn publish_rewritten_merge_table(
// See tests/failpoints.rs::branch_merge_rewrite_partial_after_merge_rolls_back.
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_REWRITE_AFTER_MERGE_PRE_DELETE)?;

// Phase 2: delete removed rows via deletion vectors.
//
// INLINE-COMMIT RESIDUAL: lance-6.0.1 does not expose a public
// two-phase delete API (DeleteJob is `pub(crate)` —
// lance-format/lance#6658 is open with no PRs). We deliberately do
// NOT introduce a `stage_delete` wrapper that would secretly
// inline-commit (it would create a side-channel between the staged
// and inline write paths). When the upstream API ships, swap this
// `delete_where` call for `stage_delete` + `commit_staged`.
// Phase 2: delete removed rows via deletion vectors, staged through
// `stage_delete` + `commit_staged` (MR-A — Lance 7.0's
// `DeleteBuilder::execute_uncommitted`, #6658, made delete a two-phase
// staged write, so this no longer inline-commits).
if !staged.deleted_ids.is_empty() {
let escaped: Vec<String> = staged
.deleted_ids
.iter()
.map(|id| format!("'{}'", id.replace('\'', "''")))
.collect();
let filter = format!("id IN ({})", escaped.join(", "));
let (new_ds, _) = target_db
.storage_inline_residual()
.delete_where(&full_path, current_ds, &filter)
.await?;
current_ds = new_ds;
if let Some(staged_delete) = target_db.storage().stage_delete(&current_ds, &filter).await? {
current_ds = target_db
.storage()
.commit_staged(current_ds, staged_delete)
.await?;
}
}

// Failpoint: crash after the Phase 2 delete commit, before the index build.
Expand Down Expand Up @@ -1310,20 +1307,21 @@ async fn publish_adopted_delta(
// tests/failpoints.rs::branch_merge_adopt_partial_after_upsert_rolls_back.
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_ADOPT_AFTER_UPSERT_PRE_DELETE)?;

// Phase 2: delete removed rows via deletion vectors (inline-commit residual,
// same as the three-way path until Lance ships a public two-phase delete).
// Phase 2: delete removed rows via deletion vectors, staged through
// `stage_delete` + `commit_staged` (same as the three-way path; MR-A).
if !delta.deleted_ids.is_empty() {
let escaped: Vec<String> = delta
.deleted_ids
.iter()
.map(|id| format!("'{}'", id.replace('\'', "''")))
.collect();
let filter = format!("id IN ({})", escaped.join(", "));
let (new_ds, _) = target_db
.storage_inline_residual()
.delete_where(&full_path, current_ds, &filter)
.await?;
current_ds = new_ds;
if let Some(staged_delete) = target_db.storage().stage_delete(&current_ds, &filter).await? {
current_ds = target_db
.storage()
.commit_staged(current_ds, staged_delete)
.await?;
}
}

// Phase 4: index coverage is reconciler-owned on the adopt path. Unlike the
Expand Down Expand Up @@ -1597,7 +1595,7 @@ impl Omnigraph {
// Pin `RewriteMerged` and `AdoptWithDelta` candidates — both advance
// Lance HEAD before the manifest publish (RewriteMerged via
// publish_rewritten_merge_table; AdoptWithDelta via publish_adopted_delta:
// stage_append + stage_merge_insert + delete_where + index — multiple
// stage_append + stage_merge_insert + stage_delete + index — multiple
// commit_staged calls per table, which the loose classification handles
// as multi-step drift).
//
Expand Down
Loading