From 9fd867f8f77fedf4d98456f43ef953b7a0e6f7cd Mon Sep 17 00:00:00 2001 From: Hasan Khan Date: Sun, 28 Jun 2026 11:39:09 -0700 Subject: [PATCH 1/3] fix(db): unify state history tables Signed-off-by: Hasan Khan --- crates/api-core/src/handlers/power_shelf.rs | 13 +- crates/api-core/src/handlers/rack.rs | 10 +- crates/api-core/src/handlers/switch.rs | 13 +- crates/api-core/src/tests/machine_history.rs | 2 +- crates/api-core/src/tests/power_shelf.rs | 25 ++ crates/api-core/src/tests/rack_find.rs | 25 ++ crates/api-core/src/tests/switch.rs | 25 ++ crates/api-core/src/tests/vpc_prefix.rs | 4 +- ...60628120000_unify_state_history_tables.sql | 176 ++++++++++++ crates/api-db/src/dpa_interface.rs | 140 +++++++++- crates/api-db/src/network_segment.rs | 8 +- .../sql/machine_snapshot_history_join.snippet | 4 +- .../src/sql/managed_host_history_join.snippet | 4 +- crates/api-db/src/state_history.rs | 252 +++++++++++++----- 14 files changed, 579 insertions(+), 122 deletions(-) create mode 100644 crates/api-db/migrations/20260628120000_unify_state_history_tables.sql diff --git a/crates/api-core/src/handlers/power_shelf.rs b/crates/api-core/src/handlers/power_shelf.rs index 9c556ce90a..81bbee49fb 100644 --- a/crates/api-core/src/handlers/power_shelf.rs +++ b/crates/api-core/src/handlers/power_shelf.rs @@ -229,8 +229,8 @@ pub async fn delete_power_shelf( } /// Force deletes a power shelf and optionally its associated interfaces from the database. -/// Unlike `delete_power_shelf` (soft delete), this immediately hard-deletes the power shelf, -/// its state history, and optionally its machine interfaces. +/// Unlike `delete_power_shelf` (soft delete), this immediately hard-deletes the power shelf +/// while retaining its state history. pub async fn admin_force_delete_power_shelf( api: &Api, request: Request, @@ -275,15 +275,6 @@ pub async fn admin_force_delete_power_shelf( interfaces_deleted = interface_ids.len() as u32; } - // Delete state history. - db::state_history::delete_by_object_id( - &mut txn, - db::state_history::StateHistoryTableId::PowerShelf, - &power_shelf_id, - ) - .await - .map_err(CarbideError::from)?; - // Hard-delete the power shelf. db_power_shelf::final_delete(power_shelf_id, &mut txn) .await diff --git a/crates/api-core/src/handlers/rack.rs b/crates/api-core/src/handlers/rack.rs index 4e4bcf91c8..da3bacbe1d 100644 --- a/crates/api-core/src/handlers/rack.rs +++ b/crates/api-core/src/handlers/rack.rs @@ -213,7 +213,7 @@ pub async fn delete_rack( /// Force deletes a rack from the database. /// Unlike `delete_rack` (soft delete), this immediately hard-deletes the rack -/// and its state history. +/// while retaining its state history. pub async fn admin_force_delete_rack( api: &Api, request: Request, @@ -242,14 +242,6 @@ pub async fn admin_force_delete_rack( .into()); } - db::state_history::delete_by_object_id( - &mut txn, - db::state_history::StateHistoryTableId::Rack, - &rack_id, - ) - .await - .map_err(CarbideError::from)?; - db_rack::final_delete(&mut txn, &rack_id) .await .map_err(CarbideError::from)?; diff --git a/crates/api-core/src/handlers/switch.rs b/crates/api-core/src/handlers/switch.rs index 4bda23fbf5..df4786603f 100644 --- a/crates/api-core/src/handlers/switch.rs +++ b/crates/api-core/src/handlers/switch.rs @@ -314,8 +314,8 @@ pub async fn delete_switch( } /// Force deletes a switch and optionally its associated interfaces from the database. -/// Unlike `delete_switch` (soft delete), this immediately hard-deletes the switch, -/// its state history, and optionally its machine interfaces. +/// Unlike `delete_switch` (soft delete), this immediately hard-deletes the switch +/// while retaining its state history. pub async fn admin_force_delete_switch( api: &Api, request: Request, @@ -359,15 +359,6 @@ pub async fn admin_force_delete_switch( interfaces_deleted = interface_ids.len() as u32; } - // Delete state history. - db::state_history::delete_by_object_id( - &mut txn, - db::state_history::StateHistoryTableId::Switch, - &switch_id, - ) - .await - .map_err(CarbideError::from)?; - // Hard-delete the switch. db_switch::final_delete(switch_id, &mut txn) .await diff --git a/crates/api-core/src/tests/machine_history.rs b/crates/api-core/src/tests/machine_history.rs index 28f57bb283..8d57974aa6 100644 --- a/crates/api-core/src/tests/machine_history.rs +++ b/crates/api-core/src/tests/machine_history.rs @@ -242,7 +242,7 @@ async fn test_old_machine_state_history( let mut txn = env.pool.begin().await?; - let query = "INSERT INTO machine_state_history (machine_id, state, state_version) VALUES ($1, $2::jsonb, $3)"; + let query = "INSERT INTO machine_state_history (object_id, state, state_version) VALUES ($1, $2::jsonb, $3)"; sqlx::query(query) .bind(host_machine_id.to_string()) .bind(r#"{"state": "hostinit", "machine_state": {"state": "nolongerarealstate"}}"#) diff --git a/crates/api-core/src/tests/power_shelf.rs b/crates/api-core/src/tests/power_shelf.rs index dc71217599..d0bebd11cd 100644 --- a/crates/api-core/src/tests/power_shelf.rs +++ b/crates/api-core/src/tests/power_shelf.rs @@ -638,6 +638,17 @@ async fn test_force_delete_power_shelf_success( ) .await?; + let mut txn = env.pool.begin().await?; + db::state_history::persist( + &mut txn, + db::state_history::StateHistoryTableId::PowerShelf, + &power_shelf_id, + &"retained-before-force-delete", + config_version::ConfigVersion::initial(), + ) + .await?; + txn.commit().await?; + // Force delete without deleting interfaces. let response = env .api @@ -666,6 +677,20 @@ async fn test_force_delete_power_shelf_success( "Power shelf should be hard-deleted" ); + let mut conn = env.pool.acquire().await?; + let history = db::state_history::for_object( + &mut conn, + db::state_history::StateHistoryTableId::PowerShelf, + &power_shelf_id, + ) + .await?; + assert!( + history + .iter() + .any(|record| record.state == r#""retained-before-force-delete""#), + "Power shelf state history should be retained", + ); + Ok(()) } diff --git a/crates/api-core/src/tests/rack_find.rs b/crates/api-core/src/tests/rack_find.rs index 8d88d426dd..d3e09f8b4e 100644 --- a/crates/api-core/src/tests/rack_find.rs +++ b/crates/api-core/src/tests/rack_find.rs @@ -126,6 +126,17 @@ async fn test_force_delete_rack_success( .unwrap(); drop(txn); + let mut txn = env.pool.begin().await?; + db::state_history::persist( + &mut txn, + db::state_history::StateHistoryTableId::Rack, + &rack_id, + &"retained-before-force-delete", + config_version::ConfigVersion::initial(), + ) + .await?; + txn.commit().await?; + let response = env .api .admin_force_delete_rack(tonic::Request::new(AdminForceDeleteRackRequest { @@ -147,6 +158,20 @@ async fn test_force_delete_rack_success( assert!(racks.is_empty(), "Rack should be hard-deleted"); + let mut conn = env.pool.acquire().await?; + let history = db::state_history::for_object( + &mut conn, + db::state_history::StateHistoryTableId::Rack, + &rack_id, + ) + .await?; + assert!( + history + .iter() + .any(|record| record.state == r#""retained-before-force-delete""#), + "Rack state history should be retained", + ); + Ok(()) } diff --git a/crates/api-core/src/tests/switch.rs b/crates/api-core/src/tests/switch.rs index db1c9fc9d8..777ad599ab 100644 --- a/crates/api-core/src/tests/switch.rs +++ b/crates/api-core/src/tests/switch.rs @@ -627,6 +627,17 @@ async fn test_force_delete_switch_success( let switch_id = new_switch(&env, None, None).await?; + let mut txn = env.pool.begin().await?; + db::state_history::persist( + &mut txn, + db::state_history::StateHistoryTableId::Switch, + &switch_id, + &"retained-before-force-delete", + config_version::ConfigVersion::initial(), + ) + .await?; + txn.commit().await?; + // Force delete without deleting interfaces. let response = env .api @@ -655,6 +666,20 @@ async fn test_force_delete_switch_success( "Switch should be hard-deleted" ); + let mut conn = env.pool.acquire().await?; + let history = db::state_history::for_object( + &mut conn, + db::state_history::StateHistoryTableId::Switch, + &switch_id, + ) + .await?; + assert!( + history + .iter() + .any(|record| record.state == r#""retained-before-force-delete""#), + "Switch state history should be retained", + ); + Ok(()) } diff --git a/crates/api-core/src/tests/vpc_prefix.rs b/crates/api-core/src/tests/vpc_prefix.rs index ef92445643..0c425097d5 100644 --- a/crates/api-core/src/tests/vpc_prefix.rs +++ b/crates/api-core/src/tests/vpc_prefix.rs @@ -466,9 +466,9 @@ async fn test_deleted_provisioning_vpc_prefix_enters_deleting_on_first_controlle assert_eq!( sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM vpc_prefix_state_history \ - WHERE vpc_prefix_id = $1 AND state->>'state' = 'ready'", + WHERE object_id = $1 AND state->>'state' = 'ready'", ) - .bind(id) + .bind(id.to_string()) .fetch_one(&env.pool) .await .expect("Could not count ready history records"), diff --git a/crates/api-db/migrations/20260628120000_unify_state_history_tables.sql b/crates/api-db/migrations/20260628120000_unify_state_history_tables.sql new file mode 100644 index 0000000000..4b2c78cb81 --- /dev/null +++ b/crates/api-db/migrations/20260628120000_unify_state_history_tables.sql @@ -0,0 +1,176 @@ +-- State history must outlive the object it describes. Remove the remaining +-- cascading foreign keys before normalizing the object ID columns. +ALTER TABLE power_shelf_state_history + DROP CONSTRAINT fk_power_shelf_state_history_power_shelf_id; +ALTER TABLE rack_state_history + DROP CONSTRAINT fk_rack_state_history_rack_id; +ALTER TABLE switch_state_history + DROP CONSTRAINT fk_switch_state_history_switch_id; + +-- Older power-shelf and switch history rows allowed explicit NULL timestamps. +-- Preserve those rows while making the timestamp contract consistent. +UPDATE power_shelf_state_history SET timestamp = NOW() WHERE timestamp IS NULL; +UPDATE switch_state_history SET timestamp = NOW() WHERE timestamp IS NULL; + +ALTER TABLE power_shelf_state_history + ALTER COLUMN timestamp SET NOT NULL; +ALTER TABLE switch_state_history + ALTER COLUMN timestamp SET NOT NULL; + +-- Give every state-history table the same TEXT object key. +ALTER TABLE machine_state_history + RENAME COLUMN machine_id TO object_id; +ALTER TABLE machine_state_history + ALTER COLUMN object_id TYPE TEXT USING object_id::TEXT; + +ALTER TABLE network_segment_state_history + RENAME COLUMN segment_id TO object_id; +ALTER TABLE network_segment_state_history + ALTER COLUMN object_id TYPE TEXT USING object_id::TEXT; + +ALTER TABLE vpc_prefix_state_history + RENAME COLUMN vpc_prefix_id TO object_id; +ALTER TABLE vpc_prefix_state_history + ALTER COLUMN object_id TYPE TEXT USING object_id::TEXT; + +ALTER TABLE dpa_interface_state_history + RENAME COLUMN interface_id TO object_id; +ALTER TABLE dpa_interface_state_history + ALTER COLUMN object_id TYPE TEXT USING object_id::TEXT; + +ALTER TABLE ib_partition_state_history + RENAME COLUMN partition_id TO object_id; +ALTER TABLE ib_partition_state_history + ALTER COLUMN object_id TYPE TEXT USING object_id::TEXT; + +ALTER TABLE power_shelf_state_history + RENAME COLUMN power_shelf_id TO object_id; +ALTER TABLE power_shelf_state_history + ALTER COLUMN object_id TYPE TEXT USING object_id::TEXT; + +ALTER TABLE rack_state_history + RENAME COLUMN rack_id TO object_id; +ALTER TABLE rack_state_history + ALTER COLUMN object_id TYPE TEXT USING object_id::TEXT; + +ALTER TABLE switch_state_history + RENAME COLUMN switch_id TO object_id; +ALTER TABLE switch_state_history + ALTER COLUMN object_id TYPE TEXT USING object_id::TEXT; + +-- Keep object-ID lookups indexed consistently across every history table. +ALTER INDEX machine_state_history_machine_id_idx + RENAME TO machine_state_history_object_id_idx; +CREATE INDEX network_segment_state_history_object_id_idx + ON network_segment_state_history(object_id); +ALTER INDEX vpc_prefix_state_history_vpc_prefix_id_idx + RENAME TO vpc_prefix_state_history_object_id_idx; +CREATE INDEX dpa_interface_state_history_object_id_idx + ON dpa_interface_state_history(object_id); +CREATE INDEX ib_partition_state_history_object_id_idx + ON ib_partition_state_history(object_id); +ALTER INDEX idx_power_shelf_state_history_power_shelf_id + RENAME TO power_shelf_state_history_object_id_idx; +ALTER INDEX idx_rack_state_history_rack_id + RENAME TO rack_state_history_object_id_idx; +ALTER INDEX idx_switch_state_history_switch_id + RENAME TO switch_state_history_object_id_idx; + +-- Recreate the retention functions against the common object_id column. +CREATE OR REPLACE FUNCTION machine_state_history_keep_limit() +RETURNS TRIGGER AS +$body$ +BEGIN + DELETE FROM machine_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM machine_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); + RETURN NULL; +END; +$body$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION network_segment_state_history_keep_limit() +RETURNS TRIGGER AS +$body$ +BEGIN + DELETE FROM network_segment_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM network_segment_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); + RETURN NULL; +END; +$body$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION vpc_prefix_state_history_keep_limit() +RETURNS TRIGGER AS +$body$ +BEGIN + DELETE FROM vpc_prefix_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM vpc_prefix_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); + RETURN NULL; +END; +$body$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION dpa_interface_state_history_keep_limit() +RETURNS TRIGGER AS +$body$ +BEGIN + DELETE FROM dpa_interface_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM dpa_interface_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); + RETURN NULL; +END; +$body$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION ib_partition_state_history_keep_limit() +RETURNS TRIGGER AS +$body$ +BEGIN + DELETE FROM ib_partition_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM ib_partition_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); + RETURN NULL; +END; +$body$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION power_shelf_state_history_keep_limit() +RETURNS TRIGGER AS +$body$ +BEGIN + DELETE FROM power_shelf_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM power_shelf_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); + RETURN NULL; +END; +$body$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION rack_state_history_keep_limit() +RETURNS TRIGGER AS +$body$ +BEGIN + DELETE FROM rack_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM rack_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); + RETURN NULL; +END; +$body$ +LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION switch_state_history_keep_limit() +RETURNS TRIGGER AS +$body$ +BEGIN + DELETE FROM switch_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM switch_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); + RETURN NULL; +END; +$body$ +LANGUAGE plpgsql; + +-- Machine cleanup deletes DPA-interface rows, but their history is durable. +CREATE OR REPLACE PROCEDURE cleanup_machine_by_id(deletion_machine_id VARCHAR(64)) +LANGUAGE plpgsql AS $$ +BEGIN + UPDATE machine_interfaces SET machine_id = NULL WHERE machine_id = deletion_machine_id; + UPDATE machine_interfaces SET attached_dpu_machine_id = NULL WHERE attached_dpu_machine_id = deletion_machine_id; + DELETE FROM measurement_journal WHERE report_id IN (SELECT report_id FROM measurement_reports WHERE machine_id = deletion_machine_id); + DELETE FROM measurement_reports_values WHERE report_id IN (SELECT report_id FROM measurement_reports WHERE machine_id = deletion_machine_id); + DELETE FROM measurement_reports WHERE machine_id = deletion_machine_id; + DELETE FROM measurement_approved_machines WHERE machine_id = deletion_machine_id; + DELETE FROM machine_topologies WHERE machine_id = deletion_machine_id; + DELETE FROM machine_validation WHERE machine_id = deletion_machine_id; + DELETE FROM dpa_interfaces WHERE machine_id = deletion_machine_id; + DELETE FROM applied_dpu_remediations WHERE dpu_machine_id = deletion_machine_id; + DELETE FROM machines WHERE id = deletion_machine_id; +END +$$; diff --git a/crates/api-db/src/dpa_interface.rs b/crates/api-db/src/dpa_interface.rs index bc14f3f84c..bd8f7ed683 100644 --- a/crates/api-db/src/dpa_interface.rs +++ b/crates/api-db/src/dpa_interface.rs @@ -306,8 +306,8 @@ pub async fn find_by_ids( sqlx::QueryBuilder::new("select row_to_json(m.*) from (SELECT si.*, COALESCE(history_agg.json, '[]'::json) AS history FROM dpa_interfaces si LEFT JOIN LATERAL ( - SELECT h.interface_id, json_agg(json_build_object('interface_id', h.interface_id, 'state', h.state::text, 'state_version', h.state_version, - 'timestamp', h.timestamp)) AS json FROM dpa_interface_state_history h WHERE h.interface_id = si.id GROUP BY h.interface_id ) AS history_agg ON true + SELECT h.object_id, json_agg(json_build_object('interface_id', h.object_id, 'state', h.state::text, 'state_version', h.state_version, + 'timestamp', h.timestamp)) AS json FROM dpa_interface_state_history h WHERE h.object_id = si.id::text GROUP BY h.object_id ) AS history_agg ON true WHERE deleted is NULL") } else { sqlx::QueryBuilder::new( @@ -411,13 +411,6 @@ pub async fn update_controller_state_outcome( } pub async fn delete(value: DpaInterface, txn: &mut PgConnection) -> Result<(), DatabaseError> { - let query = "delete from dpa_interface_state_history where interface_id=$1"; - sqlx::query(query) - .bind(value.id) - .execute(&mut *txn) - .await - .map_err(|e| DatabaseError::query(query, e))?; - let query = "delete from dpa_interfaces where id=$1"; sqlx::query(query) .bind(value.id) @@ -548,7 +541,7 @@ mod test { use carbide_libmlx_model::device::info::MlxDeviceInfo; use carbide_uuid::machine::MachineId; use mac_address::MacAddress; - use model::dpa_interface::{DpaInterfaceType, NewDpaInterface}; + use model::dpa_interface::{DpaInterfaceControllerState, DpaInterfaceType, NewDpaInterface}; use model::machine::ManagedHostState; use crate::machine; @@ -586,6 +579,133 @@ mod test { Ok(()) } + #[crate::sqlx_test] + async fn deleting_interface_retains_state_history( + pool: sqlx::PgPool, + ) -> Result<(), Box> { + let mut txn = pool.begin().await?; + let machine_id = + MachineId::from_str("fm100htes3rn1npvbtm5qd57dkilaag7ljugl1llmm7rfuq1ov50i0rpl30")?; + machine::create( + &mut txn, + None, + &machine_id, + ManagedHostState::Ready, + None, + 2, + ) + .await?; + + let interface = crate::dpa_interface::persist( + NewDpaInterface { + mac_address: MacAddress::from_str("00:11:22:33:44:55")?, + machine_id, + device_type: "Bluefield 3".to_string(), + pci_name: "5e:00.0".to_string(), + device_description: None, + interface_type: DpaInterfaceType::Svpc, + }, + &mut txn, + ) + .await?; + crate::state_history::persist( + &mut txn, + crate::state_history::StateHistoryTableId::DpaInterface, + &interface.id, + &DpaInterfaceControllerState::Provisioning, + config_version::ConfigVersion::initial(), + ) + .await?; + + let interfaces_with_history = + crate::dpa_interface::find_by_ids(txn.as_mut(), &[interface.id], true).await?; + assert_eq!(interfaces_with_history.len(), 1); + assert_eq!( + interfaces_with_history[0].history.len(), + 1, + "DPA include-history query should use the shared object_id column", + ); + + crate::dpa_interface::delete(interface.clone(), &mut txn).await?; + + assert!( + crate::dpa_interface::find_by_ids(txn.as_mut(), &[interface.id], false) + .await? + .is_empty(), + "DPA interface should be deleted", + ); + let history = crate::state_history::for_object( + txn.as_mut(), + crate::state_history::StateHistoryTableId::DpaInterface, + &interface.id, + ) + .await?; + assert_eq!(history.len(), 1, "DPA state history should be retained"); + + Ok(()) + } + + #[crate::sqlx_test] + async fn deleting_machine_retains_dpa_interface_state_history( + pool: sqlx::PgPool, + ) -> Result<(), Box> { + let mut txn = pool.begin().await?; + let machine_id = + MachineId::from_str("fm100htes3rn1npvbtm5qd57dkilaag7ljugl1llmm7rfuq1ov50i0rpl30")?; + machine::create( + &mut txn, + None, + &machine_id, + ManagedHostState::Ready, + None, + 2, + ) + .await?; + + let interface = crate::dpa_interface::persist( + NewDpaInterface { + mac_address: MacAddress::from_str("00:11:22:33:44:55")?, + machine_id, + device_type: "Bluefield 3".to_string(), + pci_name: "5e:00.0".to_string(), + device_description: None, + interface_type: DpaInterfaceType::Svpc, + }, + &mut txn, + ) + .await?; + crate::state_history::persist( + &mut txn, + crate::state_history::StateHistoryTableId::DpaInterface, + &interface.id, + &DpaInterfaceControllerState::Provisioning, + config_version::ConfigVersion::initial(), + ) + .await?; + + machine::force_cleanup(&mut txn, &machine_id).await?; + + assert!( + crate::dpa_interface::find_by_ids(txn.as_mut(), &[interface.id], false) + .await? + .is_empty(), + "DPA interface should be deleted with its machine", + ); + let history = crate::state_history::for_object( + txn.as_mut(), + crate::state_history::StateHistoryTableId::DpaInterface, + &interface.id, + ) + .await?; + assert_eq!( + history.len(), + 1, + "DPA state history should survive machine cleanup", + ); + + Ok(()) + } + // test_ensure_idempotent verifies that calling ensure() twice with // the same (machine_id, mac_address) returns the same DpaInterface // both times without error, ensuring that ensure ensures as ensured! diff --git a/crates/api-db/src/network_segment.rs b/crates/api-db/src/network_segment.rs index 469c2e15c2..0cf74d37ba 100644 --- a/crates/api-db/src/network_segment.rs +++ b/crates/api-db/src/network_segment.rs @@ -95,11 +95,11 @@ macro_rules! network_segment_snapshot_with_history_query { GROUP BY np.segment_id ) AS prefixes_agg ON true LEFT JOIN LATERAL ( - SELECT h.segment_id, - json_agg(json_build_object('segment_id', h.segment_id, 'state', h.state::text, 'state_version', h.state_version, 'timestamp', h."timestamp")) AS json + SELECT h.object_id, + json_agg(json_build_object('segment_id', h.object_id, 'state', h.state::text, 'state_version', h.state_version, 'timestamp', h."timestamp")) AS json FROM network_segment_state_history h - WHERE h.segment_id = ns.id - GROUP BY h.segment_id + WHERE h.object_id = ns.id::text + GROUP BY h.object_id ) AS history_agg ON true "# }; diff --git a/crates/api-db/src/sql/machine_snapshot_history_join.snippet b/crates/api-db/src/sql/machine_snapshot_history_join.snippet index 08683d22fa..bbc2d464d5 100644 --- a/crates/api-db/src/sql/machine_snapshot_history_join.snippet +++ b/crates/api-db/src/sql/machine_snapshot_history_join.snippet @@ -1,5 +1,5 @@ LEFT JOIN LATERAL ( - SELECT json_agg(json_build_object('machine_id', mh.machine_id, 'state', mh.state::TEXT, 'state_version', mh.state_version, 'timestamp', mh.timestamp)) AS json + SELECT json_agg(json_build_object('machine_id', mh.object_id, 'state', mh.state::TEXT, 'state_version', mh.state_version, 'timestamp', mh.timestamp)) AS json FROM machine_state_history mh - WHERE mh.machine_id = m.id + WHERE mh.object_id = m.id ) AS h ON true diff --git a/crates/api-db/src/sql/managed_host_history_join.snippet b/crates/api-db/src/sql/managed_host_history_join.snippet index cc704b00dd..c250d7d611 100644 --- a/crates/api-db/src/sql/managed_host_history_join.snippet +++ b/crates/api-db/src/sql/managed_host_history_join.snippet @@ -1,5 +1,5 @@ LEFT JOIN LATERAL ( - SELECT json_agg(json_build_object('machine_id', mh.machine_id, 'state', mh.state::TEXT, 'state_version', mh.state_version, 'timestamp', mh.timestamp)) AS json + SELECT json_agg(json_build_object('machine_id', mh.object_id, 'state', mh.state::TEXT, 'state_version', mh.state_version, 'timestamp', mh.timestamp)) AS json FROM machine_state_history mh - WHERE mh.machine_id = m2.id + WHERE mh.object_id = m2.id ) AS h ON true diff --git a/crates/api-db/src/state_history.rs b/crates/api-db/src/state_history.rs index cf5924c1a4..0fd7240a1e 100644 --- a/crates/api-db/src/state_history.rs +++ b/crates/api-db/src/state_history.rs @@ -20,7 +20,7 @@ use config_version::ConfigVersion; use model::state_history::StateHistoryRecord; use serde::Serialize; use sqlx::postgres::PgRow; -use sqlx::{Encode, FromRow, PgConnection, Postgres, Row, Type}; +use sqlx::{FromRow, PgConnection, Row}; use crate::{DatabaseError, DatabaseResult}; @@ -79,32 +79,6 @@ impl StateHistoryTableId { StateHistoryTableId::Switch => "switch_state_history", } } - - pub fn object_id_column(self) -> &'static str { - match self { - StateHistoryTableId::Machine => "machine_id", - StateHistoryTableId::NetworkSegment => "segment_id", - StateHistoryTableId::VpcPrefix => "vpc_prefix_id", - StateHistoryTableId::DpaInterface => "interface_id", - StateHistoryTableId::IbPartition => "partition_id", - StateHistoryTableId::PowerShelf => "power_shelf_id", - StateHistoryTableId::Rack => "rack_id", - StateHistoryTableId::Switch => "switch_id", - } - } - - fn object_id_sql_type(self) -> &'static str { - match self { - StateHistoryTableId::NetworkSegment - | StateHistoryTableId::VpcPrefix - | StateHistoryTableId::DpaInterface - | StateHistoryTableId::IbPartition => "uuid", - StateHistoryTableId::Machine - | StateHistoryTableId::PowerShelf - | StateHistoryTableId::Rack - | StateHistoryTableId::Switch => "varchar", - } - } } /// Retrieve state history for a list of objects. @@ -120,13 +94,10 @@ pub async fn find_by_object_ids( return Ok(std::collections::HashMap::new()); } - let mut qb = sqlx::QueryBuilder::new("SELECT "); - qb.push(table_id.object_id_column()); - qb.push("::TEXT AS object_id, state::TEXT, state_version, timestamp FROM "); + let mut qb = + sqlx::QueryBuilder::new("SELECT object_id, state::TEXT, state_version, timestamp FROM "); qb.push(table_id.sql_table()); - qb.push(" WHERE "); - qb.push(table_id.object_id_column()); - qb.push("::TEXT IN ("); + qb.push(" WHERE object_id IN ("); let mut separated = qb.separated(", "); for id in ids { @@ -157,9 +128,7 @@ pub async fn for_object( ) -> DatabaseResult> { let mut query = sqlx::QueryBuilder::new("SELECT state::TEXT, state_version, timestamp FROM "); query.push(table_id.sql_table()); - query.push(" WHERE "); - query.push(table_id.object_id_column()); - query.push("::TEXT = "); + query.push(" WHERE object_id = "); query.push_bind(object_id.to_string()); query.push(" ORDER BY id ASC"); query @@ -170,24 +139,20 @@ pub async fn for_object( } /// Store a state history record for an object. -pub async fn persist( +pub async fn persist( txn: &mut PgConnection, table_id: StateHistoryTableId, - object_id: &ID, + object_id: &impl std::fmt::Display, state: &S, state_version: ConfigVersion, ) -> DatabaseResult where - ID: std::fmt::Display + Sync, - for<'q> &'q ID: Encode<'q, Postgres> + Type, S: Serialize + Sync, { let mut query = sqlx::QueryBuilder::new("INSERT INTO "); query.push(table_id.sql_table()); - query.push(" ("); - query.push(table_id.object_id_column()); - query.push(", state, state_version) VALUES ("); - query.push_bind(object_id); + query.push(" (object_id, state, state_version) VALUES ("); + query.push_bind(object_id.to_string()); query.push(", "); query.push_bind(sqlx::types::Json(state)); query.push(", "); @@ -212,15 +177,9 @@ pub async fn update_object_ids( ) -> DatabaseResult<()> { let mut query = sqlx::QueryBuilder::new("UPDATE "); query.push(table_id.sql_table()); - query.push(" SET "); - query.push(table_id.object_id_column()); - query.push(" = "); + query.push(" SET object_id = "); query.push_bind(new_object_id.to_string()); - query.push("::"); - query.push(table_id.object_id_sql_type()); - query.push(" WHERE "); - query.push(table_id.object_id_column()); - query.push("::TEXT = "); + query.push(" WHERE object_id = "); query.push_bind(old_object_id.to_string()); query .build() @@ -231,22 +190,175 @@ pub async fn update_object_ids( Ok(()) } -/// Delete all state history entries for an object. -pub async fn delete_by_object_id( - txn: &mut PgConnection, - table_id: StateHistoryTableId, - object_id: &impl std::fmt::Display, -) -> DatabaseResult { - let mut query = sqlx::QueryBuilder::new("DELETE FROM "); - query.push(table_id.sql_table()); - query.push(" WHERE "); - query.push(table_id.object_id_column()); - query.push("::TEXT = "); - query.push_bind(object_id.to_string()); - let result = query - .build() - .execute(txn) - .await - .map_err(|e| DatabaseError::query("state_history::delete_by_object_id", e))?; - Ok(result.rows_affected()) +#[cfg(test)] +mod tests { + use sqlx::PgPool; + + use super::{StateHistoryTableId, find_by_object_ids, for_object, persist, update_object_ids}; + + const TABLES: [StateHistoryTableId; 8] = [ + StateHistoryTableId::Machine, + StateHistoryTableId::NetworkSegment, + StateHistoryTableId::VpcPrefix, + StateHistoryTableId::DpaInterface, + StateHistoryTableId::IbPartition, + StateHistoryTableId::PowerShelf, + StateHistoryTableId::Rack, + StateHistoryTableId::Switch, + ]; + + #[crate::sqlx_test] + async fn state_history_tables_share_schema_and_retention_behavior( + pool: PgPool, + ) -> Result<(), Box> { + let mut conn = pool.acquire().await?; + let expected_columns = [ + ("id", "bigint", "NO"), + ("object_id", "text", "NO"), + ("state", "jsonb", "NO"), + ("state_version", "character varying", "NO"), + ("timestamp", "timestamp with time zone", "NO"), + ]; + + for table_id in TABLES { + let table_name = table_id.sql_table(); + let columns: Vec<(String, String, String)> = sqlx::query_as( + "SELECT column_name, data_type, is_nullable \ + FROM information_schema.columns \ + WHERE table_schema = 'public' AND table_name = $1 \ + ORDER BY ordinal_position", + ) + .bind(table_name) + .fetch_all(&mut *conn) + .await?; + assert_eq!( + columns, + expected_columns + .iter() + .map(|(name, data_type, nullable)| { + ( + (*name).to_string(), + (*data_type).to_string(), + (*nullable).to_string(), + ) + }) + .collect::>(), + "unexpected schema for {table_name}", + ); + + let primary_key: String = sqlx::query_scalar( + "SELECT key_column_usage.column_name \ + FROM information_schema.table_constraints \ + JOIN information_schema.key_column_usage \ + ON table_constraints.constraint_name = key_column_usage.constraint_name \ + AND table_constraints.constraint_schema = key_column_usage.constraint_schema \ + WHERE table_constraints.table_schema = 'public' \ + AND table_constraints.table_name = $1 \ + AND table_constraints.constraint_type = 'PRIMARY KEY'", + ) + .bind(table_name) + .fetch_one(&mut *conn) + .await?; + assert_eq!(primary_key, "id", "unexpected primary key for {table_name}"); + + let foreign_key_count: i64 = sqlx::query_scalar( + "SELECT COUNT(*) \ + FROM information_schema.table_constraints \ + WHERE table_schema = 'public' \ + AND table_name = $1 \ + AND constraint_type = 'FOREIGN KEY'", + ) + .bind(table_name) + .fetch_one(&mut *conn) + .await?; + assert_eq!( + foreign_key_count, 0, + "{table_name} must not reference the object table", + ); + + let timestamp_default: Option = sqlx::query_scalar( + "SELECT column_default \ + FROM information_schema.columns \ + WHERE table_schema = 'public' \ + AND table_name = $1 \ + AND column_name = 'timestamp'", + ) + .bind(table_name) + .fetch_one(&mut *conn) + .await?; + assert_eq!( + timestamp_default.as_deref(), + Some("now()"), + "unexpected timestamp default for {table_name}", + ); + + let has_object_id_index: bool = sqlx::query_scalar( + "SELECT EXISTS ( \ + SELECT 1 FROM pg_indexes \ + WHERE schemaname = 'public' \ + AND tablename = $1 \ + AND indexdef LIKE '% (object_id)%' \ + )", + ) + .bind(table_name) + .fetch_one(&mut *conn) + .await?; + assert!( + has_object_id_index, + "{table_name} must index object_id lookups", + ); + + // An arbitrary ID proves both that the table no longer has a parent + // foreign key and that every caller uses the common TEXT contract. + let object_id = format!("orphaned-{table_name}-{}", "x".repeat(80)); + let renamed_object_id = format!("renamed-{object_id}"); + let version = config_version::ConfigVersion::new(1); + let inserted = persist(&mut conn, table_id, &object_id, &1_u32, version).await?; + assert_eq!(inserted.state, "1", "unexpected state for {table_name}"); + assert_eq!(inserted.state_version, version); + assert!( + inserted.time.is_some(), + "missing timestamp for {table_name}" + ); + + let history = for_object(&mut conn, table_id, &object_id).await?; + assert_eq!(history.len(), 1, "failed to read {table_name}"); + + update_object_ids(&mut conn, table_id, &object_id, &renamed_object_id).await?; + let histories = find_by_object_ids( + &mut conn, + table_id, + &[renamed_object_id.as_str(), "missing-object"], + ) + .await?; + assert_eq!( + histories.len(), + 1, + "unexpected lookup result for {table_name}" + ); + assert_eq!( + histories[&renamed_object_id].len(), + 1, + "renamed history missing from {table_name}", + ); + + // Exercise the row-level retention trigger in one bulk insert. The + // original row is the oldest of 251 and must be evicted. + let mut insert = sqlx::QueryBuilder::new("INSERT INTO "); + insert.push(table_name); + insert.push(" (object_id, state, state_version) SELECT "); + insert.push_bind(&renamed_object_id); + insert.push(", to_jsonb(sequence), "); + insert.push_bind(config_version::ConfigVersion::new(2)); + insert.push(" FROM generate_series(2, 251) AS sequence"); + insert.build().execute(&mut *conn).await?; + + let retained = for_object(&mut conn, table_id, &renamed_object_id).await?; + assert_eq!(retained.len(), 250, "retention failed for {table_name}"); + assert_eq!(retained.first().unwrap().state, "2"); + assert_eq!(retained.last().unwrap().state, "251"); + } + + Ok(()) + } } From c46d99e551cd1e1dc12e35e62fc6f418dd89ff17 Mon Sep 17 00:00:00 2001 From: Hasan Khan Date: Sun, 28 Jun 2026 12:18:58 -0700 Subject: [PATCH 2/3] fix(db): serialize state history retention Signed-off-by: Hasan Khan --- ...60628120000_unify_state_history_tables.sql | 9 ++ crates/api-db/src/dpa_interface.rs | 32 +++++- crates/api-db/src/state_history.rs | 104 ++++++++++++++++++ 3 files changed, 140 insertions(+), 5 deletions(-) diff --git a/crates/api-db/migrations/20260628120000_unify_state_history_tables.sql b/crates/api-db/migrations/20260628120000_unify_state_history_tables.sql index 4b2c78cb81..2d1ad4653d 100644 --- a/crates/api-db/migrations/20260628120000_unify_state_history_tables.sql +++ b/crates/api-db/migrations/20260628120000_unify_state_history_tables.sql @@ -77,10 +77,12 @@ ALTER INDEX idx_switch_state_history_switch_id RENAME TO switch_state_history_object_id_idx; -- Recreate the retention functions against the common object_id column. +-- Serialize cleanup per table/object so concurrent inserts cannot exceed the limit. CREATE OR REPLACE FUNCTION machine_state_history_keep_limit() RETURNS TRIGGER AS $body$ BEGIN + PERFORM pg_advisory_xact_lock(hashtextextended(NEW.object_id, TG_RELID::bigint)); DELETE FROM machine_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM machine_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); RETURN NULL; END; @@ -91,6 +93,7 @@ CREATE OR REPLACE FUNCTION network_segment_state_history_keep_limit() RETURNS TRIGGER AS $body$ BEGIN + PERFORM pg_advisory_xact_lock(hashtextextended(NEW.object_id, TG_RELID::bigint)); DELETE FROM network_segment_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM network_segment_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); RETURN NULL; END; @@ -101,6 +104,7 @@ CREATE OR REPLACE FUNCTION vpc_prefix_state_history_keep_limit() RETURNS TRIGGER AS $body$ BEGIN + PERFORM pg_advisory_xact_lock(hashtextextended(NEW.object_id, TG_RELID::bigint)); DELETE FROM vpc_prefix_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM vpc_prefix_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); RETURN NULL; END; @@ -111,6 +115,7 @@ CREATE OR REPLACE FUNCTION dpa_interface_state_history_keep_limit() RETURNS TRIGGER AS $body$ BEGIN + PERFORM pg_advisory_xact_lock(hashtextextended(NEW.object_id, TG_RELID::bigint)); DELETE FROM dpa_interface_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM dpa_interface_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); RETURN NULL; END; @@ -121,6 +126,7 @@ CREATE OR REPLACE FUNCTION ib_partition_state_history_keep_limit() RETURNS TRIGGER AS $body$ BEGIN + PERFORM pg_advisory_xact_lock(hashtextextended(NEW.object_id, TG_RELID::bigint)); DELETE FROM ib_partition_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM ib_partition_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); RETURN NULL; END; @@ -131,6 +137,7 @@ CREATE OR REPLACE FUNCTION power_shelf_state_history_keep_limit() RETURNS TRIGGER AS $body$ BEGIN + PERFORM pg_advisory_xact_lock(hashtextextended(NEW.object_id, TG_RELID::bigint)); DELETE FROM power_shelf_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM power_shelf_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); RETURN NULL; END; @@ -141,6 +148,7 @@ CREATE OR REPLACE FUNCTION rack_state_history_keep_limit() RETURNS TRIGGER AS $body$ BEGIN + PERFORM pg_advisory_xact_lock(hashtextextended(NEW.object_id, TG_RELID::bigint)); DELETE FROM rack_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM rack_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); RETURN NULL; END; @@ -151,6 +159,7 @@ CREATE OR REPLACE FUNCTION switch_state_history_keep_limit() RETURNS TRIGGER AS $body$ BEGIN + PERFORM pg_advisory_xact_lock(hashtextextended(NEW.object_id, TG_RELID::bigint)); DELETE FROM switch_state_history WHERE object_id=NEW.object_id AND id NOT IN (SELECT id FROM switch_state_history WHERE object_id=NEW.object_id ORDER BY id DESC LIMIT 250); RETURN NULL; END; diff --git a/crates/api-db/src/dpa_interface.rs b/crates/api-db/src/dpa_interface.rs index bd8f7ed683..6fa291a4e4 100644 --- a/crates/api-db/src/dpa_interface.rs +++ b/crates/api-db/src/dpa_interface.rs @@ -307,7 +307,7 @@ pub async fn find_by_ids( (SELECT si.*, COALESCE(history_agg.json, '[]'::json) AS history FROM dpa_interfaces si LEFT JOIN LATERAL ( SELECT h.object_id, json_agg(json_build_object('interface_id', h.object_id, 'state', h.state::text, 'state_version', h.state_version, - 'timestamp', h.timestamp)) AS json FROM dpa_interface_state_history h WHERE h.object_id = si.id::text GROUP BY h.object_id ) AS history_agg ON true + 'timestamp', h.timestamp) ORDER BY h.id ASC) AS json FROM dpa_interface_state_history h WHERE h.object_id = si.id::text GROUP BY h.object_id ) AS history_agg ON true WHERE deleted is NULL") } else { sqlx::QueryBuilder::new( @@ -616,14 +616,36 @@ mod test { config_version::ConfigVersion::initial(), ) .await?; + crate::state_history::persist( + &mut txn, + crate::state_history::StateHistoryTableId::DpaInterface, + &interface.id, + &DpaInterfaceControllerState::Ready, + config_version::ConfigVersion::new(2), + ) + .await?; + + let expected_history = crate::state_history::for_object( + txn.as_mut(), + crate::state_history::StateHistoryTableId::DpaInterface, + &interface.id, + ) + .await?; let interfaces_with_history = crate::dpa_interface::find_by_ids(txn.as_mut(), &[interface.id], true).await?; assert_eq!(interfaces_with_history.len(), 1); assert_eq!( - interfaces_with_history[0].history.len(), - 1, - "DPA include-history query should use the shared object_id column", + interfaces_with_history[0] + .history + .iter() + .map(|record| record.state.as_str()) + .collect::>(), + expected_history + .iter() + .map(|record| record.state.as_str()) + .collect::>(), + "DPA include-history query should use the shared object_id column and ordering", ); crate::dpa_interface::delete(interface.clone(), &mut txn).await?; @@ -640,7 +662,7 @@ mod test { &interface.id, ) .await?; - assert_eq!(history.len(), 1, "DPA state history should be retained"); + assert_eq!(history.len(), 2, "DPA state history should be retained"); Ok(()) } diff --git a/crates/api-db/src/state_history.rs b/crates/api-db/src/state_history.rs index 0fd7240a1e..0ce128cb00 100644 --- a/crates/api-db/src/state_history.rs +++ b/crates/api-db/src/state_history.rs @@ -207,6 +207,110 @@ mod tests { StateHistoryTableId::Switch, ]; + async fn assert_concurrent_retention( + pool: &PgPool, + table_id: StateHistoryTableId, + ) -> Result<(), Box> { + let table_name = table_id.sql_table(); + let object_id = format!("concurrent-{table_name}"); + + let mut seed = sqlx::QueryBuilder::new("INSERT INTO "); + seed.push(table_name); + seed.push(" (object_id, state, state_version) SELECT "); + seed.push_bind(&object_id); + seed.push(", to_jsonb(sequence), "); + seed.push_bind(config_version::ConfigVersion::new(1)); + seed.push(" FROM generate_series(1, 249) AS sequence"); + seed.build().execute(pool).await?; + + let mut first_txn = pool.begin().await?; + persist( + &mut first_txn, + table_id, + &object_id, + &250_u32, + config_version::ConfigVersion::new(250), + ) + .await?; + + let (pid_sender, pid_receiver) = tokio::sync::oneshot::channel(); + let second_pool = pool.clone(); + let second_object_id = object_id.clone(); + let second_insert = tokio::spawn(async move { + let mut txn = second_pool.begin().await.map_err(|err| err.to_string())?; + let pid = sqlx::query_scalar::<_, i32>("SELECT pg_backend_pid()") + .fetch_one(&mut *txn) + .await + .map_err(|err| err.to_string())?; + pid_sender + .send(pid) + .map_err(|_| "could not report second writer PID".to_string())?; + persist( + &mut txn, + table_id, + &second_object_id, + &251_u32, + config_version::ConfigVersion::new(251), + ) + .await + .map_err(|err| err.to_string())?; + txn.commit().await.map_err(|err| err.to_string()) + }); + + let second_pid = pid_receiver.await?; + let mut observer = pool.acquire().await?; + tokio::time::timeout(std::time::Duration::from_secs(5), async { + loop { + let waiting: bool = sqlx::query_scalar( + "SELECT EXISTS (\ + SELECT 1 FROM pg_locks \ + WHERE pid = $1 AND locktype = 'advisory' AND NOT granted\ + )", + ) + .bind(second_pid) + .fetch_one(&mut *observer) + .await?; + if waiting { + return Ok::<(), sqlx::Error>(()); + } + tokio::task::yield_now().await; + } + }) + .await + .map_err(|_| { + std::io::Error::other(format!( + "second writer did not wait for {table_name} retention lock", + )) + })??; + drop(observer); + + first_txn.commit().await?; + second_insert.await?.map_err(std::io::Error::other)?; + + let mut retained_query = sqlx::QueryBuilder::new("SELECT state::TEXT FROM "); + retained_query.push(table_name); + retained_query.push(" WHERE object_id = "); + retained_query.push_bind(&object_id); + retained_query.push(" ORDER BY id ASC"); + let retained: Vec = retained_query.build_query_scalar().fetch_all(pool).await?; + assert_eq!(retained.len(), 250, "retention failed for {table_name}"); + assert_eq!(retained.first().unwrap(), "2"); + assert_eq!(retained.last().unwrap(), "251"); + + Ok(()) + } + + #[crate::sqlx_test] + async fn concurrent_inserts_are_serialized_per_object( + pool: PgPool, + ) -> Result<(), Box> { + for table_id in TABLES { + assert_concurrent_retention(&pool, table_id).await?; + } + + Ok(()) + } + #[crate::sqlx_test] async fn state_history_tables_share_schema_and_retention_behavior( pool: PgPool, From 219e815781e5c80d6a5827fb4b579c2a38e87f82 Mon Sep 17 00:00:00 2001 From: Hasan Khan Date: Sun, 28 Jun 2026 12:39:26 -0700 Subject: [PATCH 3/3] test(db): clean up retention concurrency test Signed-off-by: Hasan Khan --- crates/api-db/src/state_history.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/api-db/src/state_history.rs b/crates/api-db/src/state_history.rs index 0ce128cb00..f411767f10 100644 --- a/crates/api-db/src/state_history.rs +++ b/crates/api-db/src/state_history.rs @@ -207,6 +207,9 @@ mod tests { StateHistoryTableId::Switch, ]; + // This test helper intentionally keeps the first transaction open while it verifies that the + // per-object advisory lock blocks a concurrent writer. + #[allow(txn_held_across_await)] async fn assert_concurrent_retention( pool: &PgPool, table_id: StateHistoryTableId, @@ -258,8 +261,7 @@ mod tests { }); let second_pid = pid_receiver.await?; - let mut observer = pool.acquire().await?; - tokio::time::timeout(std::time::Duration::from_secs(5), async { + let wait_result = tokio::time::timeout(std::time::Duration::from_secs(5), async { loop { let waiting: bool = sqlx::query_scalar( "SELECT EXISTS (\ @@ -268,7 +270,7 @@ mod tests { )", ) .bind(second_pid) - .fetch_one(&mut *observer) + .fetch_one(pool) .await?; if waiting { return Ok::<(), sqlx::Error>(()); @@ -281,11 +283,12 @@ mod tests { std::io::Error::other(format!( "second writer did not wait for {table_name} retention lock", )) - })??; - drop(observer); + }); first_txn.commit().await?; - second_insert.await?.map_err(std::io::Error::other)?; + let second_result = second_insert.await?.map_err(std::io::Error::other); + wait_result??; + second_result?; let mut retained_query = sqlx::QueryBuilder::new("SELECT state::TEXT FROM "); retained_query.push(table_name);