Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 42 additions & 30 deletions crates/omnigraph/src/db/omnigraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,7 @@ impl Omnigraph {
}

for run in self.list_runs().await? {
if run.target_branch == branch
&& matches!(run.status, RunStatus::Running | RunStatus::Failed)
{
if run.target_branch == branch && matches!(run.status, RunStatus::Running) {
return Err(OmniError::manifest_conflict(format!(
"cannot delete branch '{}' while run '{}' targeting it is {}",
branch,
Expand Down Expand Up @@ -591,12 +589,21 @@ impl Omnigraph {
.into_iter()
.filter(|run| {
run.target_branch == branch
&& matches!(run.status, RunStatus::Published | RunStatus::Aborted)
&& matches!(
run.status,
RunStatus::Published | RunStatus::Aborted | RunStatus::Failed
)
})
.map(|run| run.run_branch)
.collect::<Vec<_>>();

let live_branches: HashSet<String> =
self.coordinator.all_branches().await?.into_iter().collect();

for run_branch in terminal_run_branches {
if !live_branches.contains(&run_branch) {
continue;
}
match self.delete_branch_storage_only(&run_branch).await {
Ok(()) => {}
Err(OmniError::Manifest(err)) if err.kind == ManifestErrorKind::NotFound => {}
Expand Down Expand Up @@ -776,9 +783,7 @@ impl Omnigraph {
let run = self.get_run(run_id).await?;
match run.status {
RunStatus::Running | RunStatus::Failed => {
let updated = run.with_status(RunStatus::Aborted, None)?;
self.coordinator.append_run_record(&updated).await?;
Ok(updated)
self.terminate_run(&run, RunStatus::Aborted, None).await
}
RunStatus::Published => Err(OmniError::manifest_conflict(format!(
"run '{}' is already published",
Expand All @@ -795,11 +800,7 @@ impl Omnigraph {
self.ensure_schema_state_valid().await?;
let run = self.get_run(run_id).await?;
match run.status {
RunStatus::Running => {
let updated = run.with_status(RunStatus::Failed, None)?;
self.coordinator.append_run_record(&updated).await?;
Ok(updated)
}
RunStatus::Running => self.terminate_run(&run, RunStatus::Failed, None).await,
RunStatus::Failed => Ok(run),
RunStatus::Published => Err(OmniError::manifest_conflict(format!(
"run '{}' is already published",
Expand All @@ -812,6 +813,22 @@ impl Omnigraph {
}
}

/// Record a terminal status for `run` and tear down its run branch.
///
/// The run record is rewritten with `status` (and `published_snapshot_id`)
/// and appended through the coordinator; that record is the source of
/// truth. The run branch is only scaffolding, so a failure while deleting
/// it is deliberately ignored — `cleanup_terminal_run_branches_for_target`
/// gets another chance when the target branch is deleted later.
///
/// # Errors
///
/// Propagates any error from `with_status` or `append_run_record`. Errors
/// from the branch deletion are never returned.
async fn terminate_run(
    &mut self,
    run: &RunRecord,
    status: RunStatus,
    published_snapshot_id: Option<String>,
) -> Result<RunRecord> {
    let terminal_record = run.with_status(status, published_snapshot_id)?;

    // Persist the status first: it must be durable even if cleanup fails.
    self.coordinator
        .append_run_record(&terminal_record)
        .await?;

    // Best-effort cleanup; the outcome is intentionally discarded.
    let _ = self
        .delete_branch_storage_only(&terminal_record.run_branch)
        .await;

    Ok(terminal_record)
}

/// Publish the run identified by `run_id`.
///
/// Thin convenience wrapper: forwards to `publish_run_as(run_id, None)` and
/// returns its result unchanged.
///
/// NOTE(review): the `None` presumably means "no explicit actor override" —
/// confirm against `publish_run_as`.
pub async fn publish_run(&mut self, run_id: &RunId) -> Result<SnapshotId> {
self.publish_run_as(run_id, None).await
}
Expand Down Expand Up @@ -869,11 +886,12 @@ impl Omnigraph {
self.audit_actor_id = previous_actor;
publish_result?;
let published_snapshot_id = self.resolve_snapshot(&run.target_branch).await?;
let updated = run.with_status(
self.terminate_run(
&run,
RunStatus::Published,
Some(published_snapshot_id.as_str().to_string()),
)?;
self.coordinator.append_run_record(&updated).await?;
)
.await?;
Ok(published_snapshot_id)
}

Expand Down Expand Up @@ -1723,19 +1741,17 @@ edge WorksAt: Person -> Company
}

#[tokio::test]
async fn test_apply_schema_succeeds_after_load_creates_published_run_branch() {
// Regression for MR-670: schema apply used to fail after any load or
// change because published __run__ branches count as "non-main" in
// the blocking-branch check, and there is no CLI path to clean them
// up (branch_delete rejects internal refs; run abort rejects
// Published runs). Published run branches are intentionally retained
// for post-publish inspection — schema apply now filters them out
// instead of requiring their deletion.
async fn test_apply_schema_succeeds_after_load() {
// MR-670 + MR-674: schema apply used to be blocked by leftover
// __run__ branches. MR-670 added a defense-in-depth filter that
// skips internal system branches. MR-674 made run branches
// ephemeral on every terminal state, so in practice no __run__
// branch survives publish — but the filter still guards the
// invariant.
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

// A load goes through a __run__ branch which remains after publish.
crate::loader::load_jsonl(
&mut db,
r#"{"type": "Person", "data": {"name": "Alice", "age": 30}}"#,
Expand All @@ -1744,17 +1760,13 @@ edge WorksAt: Person -> Company
.await
.unwrap();

// Confirm at the coordinator level that a published run branch did
// get created and persists after publish.
let all_branches = db.coordinator.all_branches().await.unwrap();
assert!(
all_branches.iter().any(|b| is_internal_run_branch(b)),
"expected at least one internal run branch after load, got: {:?}",
!all_branches.iter().any(|b| is_internal_run_branch(b)),
"MR-674: run branch should be deleted after publish, got: {:?}",
all_branches
);

// Schema apply should succeed — the filter skips internal system
// branches, including __run__ ones.
let desired = TEST_SCHEMA.replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
Expand Down
116 changes: 92 additions & 24 deletions crates/omnigraph/tests/runs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use lance::Dataset;

use omnigraph::db::commit_graph::CommitGraph;
use omnigraph::db::{Omnigraph, ReadTarget, RunStatus};
use omnigraph::error::OmniError;
use omnigraph::error::{ManifestErrorKind, OmniError};
use omnigraph::loader::{LoadMode, load_jsonl};

use helpers::*;
Expand Down Expand Up @@ -170,7 +170,7 @@ async fn publish_run_merges_internal_branch_into_target_and_marks_record() {
}

#[tokio::test]
async fn abort_run_keeps_target_unchanged_and_preserves_hidden_branch_for_inspection() {
async fn abort_run_leaves_target_unchanged_and_deletes_run_branch() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let run = db.begin_run("main", Some("abort-test")).await.unwrap();
Expand All @@ -197,16 +197,21 @@ async fn abort_run_keeps_target_unchanged_and_preserves_hidden_branch_for_inspec
.unwrap();
assert_eq!(main_qr.num_rows(), 0);

let run_qr = db
let err = db
.query(
ReadTarget::branch(run.run_branch.as_str()),
TEST_QUERIES,
"get_person",
&params(&[("$name", "Eve")]),
)
.await
.unwrap();
assert_eq!(run_qr.num_rows(), 1);
.unwrap_err();
assert!(
matches!(err, OmniError::Manifest(ref e) if e.kind == ManifestErrorKind::NotFound)
|| matches!(err, OmniError::Lance(_)),
"run branch should be gone after abort, got: {}",
err
);
}

#[tokio::test]
Expand Down Expand Up @@ -292,21 +297,22 @@ async fn public_load_preserves_staged_edge_ids_on_publish() {
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
let run = db.begin_run("main", Some("preserve-ids-load")).await.unwrap();
db.load(&run.run_branch, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();

let runs = latest_runs(uri).await;
let run_branch = runs[0].run_branch.clone();

let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id");
let mut run_ids = collect_column_strings(
&read_table_branch(&db, run_branch.as_str(), "edge:Knows").await,
let mut staged_ids = collect_column_strings(
&read_table_branch(&db, run.run_branch.as_str(), "edge:Knows").await,
"id",
);
staged_ids.sort();

db.publish_run(&run.run_id).await.unwrap();

let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id");
main_ids.sort();
run_ids.sort();
assert_eq!(main_ids, run_ids);
assert_eq!(main_ids, staged_ids);
}

#[tokio::test]
Expand Down Expand Up @@ -381,29 +387,32 @@ async fn public_mutation_uses_hidden_transactional_run_and_publishes_it() {
#[tokio::test]
async fn public_mutation_preserves_staged_edge_ids_on_publish() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = init_and_load(&dir).await;

let run = db
.begin_run("main", Some("preserve-ids-mutation"))
.await
.unwrap();
db.mutate(
"main",
run.run_branch.as_str(),
MUTATION_QUERIES,
"add_friend",
&params(&[("$from", "Alice"), ("$to", "Diana")]),
)
.await
.unwrap();

let runs = latest_runs(uri).await;
let latest = runs.last().unwrap();

let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id");
let mut run_ids = collect_column_strings(
&read_table_branch(&db, latest.run_branch.as_str(), "edge:Knows").await,
let mut staged_ids = collect_column_strings(
&read_table_branch(&db, run.run_branch.as_str(), "edge:Knows").await,
"id",
);
staged_ids.sort();

db.publish_run(&run.run_id).await.unwrap();

let mut main_ids = collect_column_strings(&read_table(&db, "edge:Knows").await, "id");
main_ids.sort();
run_ids.sort();
assert_eq!(main_ids, run_ids);
assert_eq!(main_ids, staged_ids);
}

#[tokio::test]
Expand Down Expand Up @@ -531,3 +540,62 @@ async fn public_mutation_records_actor_on_run_and_published_commit() {
.unwrap();
assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
}

#[tokio::test]
async fn run_branches_do_not_accumulate_across_repeated_loads() {
    // MR-674: a run branch is transactional scaffolding and is expected to
    // be deleted once its run reaches a terminal state. This test drives
    // ten loads plus one explicit abort and then checks that the branch
    // listing contains nothing but `main`, both on the live handle and on a
    // freshly opened one.
    //
    // NOTE(review): if `branch_list()` hides internal run branches, these
    // assertions would pass even when run branches leak — confirm that the
    // listing used here would actually surface a leftover run branch.
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

    // Ten independent single-record appends, one per load.
    let payloads = (0..10).map(|i| {
        format!(
            r#"{{"type":"Person","data":{{"name":"p{}","age":{}}}}}"#,
            i, i
        )
    });
    for payload in payloads {
        load_jsonl(&mut db, &payload, LoadMode::Append)
            .await
            .unwrap();
    }

    // One run that is started and immediately aborted.
    let aborted_run = db.begin_run("main", Some("abort-me")).await.unwrap();
    db.abort_run(&aborted_run.run_id).await.unwrap();

    let only_main = vec!["main".to_string()];

    // View through the handle that performed the work.
    assert_eq!(db.branch_list().await.unwrap(), only_main);

    // View through a brand-new handle opened on the same location.
    let branches_after_reopen = Omnigraph::open(uri)
        .await
        .unwrap()
        .branch_list()
        .await
        .unwrap();
    assert_eq!(branches_after_reopen, only_main);
}

#[tokio::test]
async fn failed_load_deletes_run_branch() {
    // A load whose run ends up `failed` must not leave its run branch
    // behind: resolving a snapshot on that branch afterwards has to error.
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();

    // The edge points at "Missing", which this payload never defines;
    // presumably that is what makes the load fail.
    let bad = r#"{"type":"Person","data":{"name":"Alice","age":30}}
{"edge":"Knows","from":"Alice","to":"Missing"}"#;
    // The load result itself is not inspected; the run record is.
    let _ = load_jsonl(&mut db, bad, LoadMode::Overwrite).await;

    let runs = latest_runs(uri).await;
    assert_eq!(runs.len(), 1);
    let failed_run = &runs[0];
    assert_eq!(failed_run.status, "failed");

    let err = db
        .snapshot_of(ReadTarget::branch(failed_run.run_branch.as_str()))
        .await
        .unwrap_err();

    // Either a manifest-level NotFound or any Lance error is accepted as
    // evidence that the branch no longer exists.
    let is_manifest_not_found =
        matches!(&err, OmniError::Manifest(e) if e.kind == ManifestErrorKind::NotFound);
    let is_lance_error = matches!(err, OmniError::Lance(_));
    assert!(
        is_manifest_not_found || is_lance_error,
        "failed run's branch should be gone, got: {}",
        err
    );
}