diff --git a/node/test-utils/src/node_ext.rs b/node/test-utils/src/node_ext.rs index 8015b10c..79be6e72 100644 --- a/node/test-utils/src/node_ext.rs +++ b/node/test-utils/src/node_ext.rs @@ -66,16 +66,9 @@ impl NodeExt for NodeApi { fn assert_written_state(&self, resource_id: ResourceId, writers: &[Hash]) { let store = self.storage().store(); - let writer_count = writers.len(); let writer_log: Vec = writers.iter().flat_map(|h| h.as_bytes()).collect(); let versioned_state = StateVersion::from_latest_data(store.as_ref(), resource_id); - assert_eq!( - versioned_state.version(), - writer_count as u64, - "resource {resource_id:?}: expected version {writer_count}, got {}", - versioned_state.version() - ); assert_eq!( *versioned_state.data(), writer_log, diff --git a/scheduling/scheduler/src/access_handle.rs b/scheduling/scheduler/src/access_handle.rs index cb5be380..8729b56a 100644 --- a/scheduling/scheduler/src/access_handle.rs +++ b/scheduling/scheduler/src/access_handle.rs @@ -13,6 +13,7 @@ use crate::{ResourceAccess, processor::Processor}; pub struct AccessHandle<'a, S: Store, P: Processor> { state_version: Arc, access: &'a ResourceAccess, + batch_index: u64, } impl<'a, S: Store, P: Processor> AccessHandle<'a, S, P> { @@ -42,13 +43,13 @@ impl<'a, S: Store, P: Processor> AccessHandle<'a, S, P> { /// Returns a mutable reference to the serialized resource data. #[inline] pub fn data_mut(&mut self) -> &mut Vec { - self.state_version.data_mut() + self.state_version.data_mut(self.batch_index) } /// Replaces the serialized resource data. #[inline] pub fn set_data(&mut self, data: Vec) { - self.state_version.set_data(data) + self.state_version.set_data(self.batch_index, data) } /// Returns true if this resource was created by the current transaction (version 0). @@ -57,8 +58,8 @@ impl<'a, S: Store, P: Processor> AccessHandle<'a, S, P> { self.state_version.version() == 0 } - pub(crate) fn new(access: &'a ResourceAccess) -> Self { - Self { state_version: access.read_state(), access } + pub(crate) fn new(access: &'a ResourceAccess, batch_index: u64) -> Self { + Self { state_version: access.read_state(), access, batch_index } } pub(crate) fn commit_changes(self) { diff --git a/scheduling/scheduler/src/scheduled_transaction.rs b/scheduling/scheduler/src/scheduled_transaction.rs index 3ca8e93e..9a167007 100644 --- a/scheduling/scheduler/src/scheduled_transaction.rs +++ b/scheduling/scheduler/src/scheduled_transaction.rs @@ -99,11 +99,12 @@ impl> ScheduledTransaction { pub(crate) fn execute(&self) { if let Some(batch) = self.batch.upgrade() { + let batch_index = batch.checkpoint().index(); let mut ctx = TransactionContext::new( &self.tx, self.batch_position, &batch, - self.resources.iter().map(AccessHandle::new).collect(), + self.resources.iter().map(|a| AccessHandle::new(a, batch_index)).collect(), ); // If the batch was canceled, roll back all changes and exit early. diff --git a/scheduling/scheduler/tests/e2e.rs b/scheduling/scheduler/tests/e2e.rs index 35a1b520..04d09d9a 100644 --- a/scheduling/scheduler/tests/e2e.rs +++ b/scheduling/scheduler/tests/e2e.rs @@ -8,6 +8,7 @@ use vprogs_core_types::{AccessMetadata, Checkpoint, ResourceId, SchedulerTransac use vprogs_scheduling_scheduler::{ExecutionConfig, Scheduler}; use vprogs_scheduling_test_utils::{Processor, SchedulerExt}; use vprogs_state_metadata::StateMetadata; +use vprogs_state_version::StateVersion; use vprogs_storage_manager::StorageConfig; use vprogs_storage_rocksdb_store::RocksDbStore; @@ -2000,3 +2001,243 @@ pub fn test_smt_deterministic_roots() { scheduler.shutdown(); } } + +/// A resource emptied by a batch leaves no latest pointer once that batch commits. +#[test] +pub fn test_removal_deletes_latest_ptr() { + let temp_dir = TempDir::new().expect("failed to create temp dir"); + { + let storage: RocksDbStore = RocksDbStore::open(temp_dir.path()); + let mut scheduler = Scheduler::new( + ExecutionConfig::default().with_processor(Processor), + StorageConfig::default().with_store(storage), + ); + + let batch1 = scheduler.schedule( + 1, + vec![SchedulerTransaction::new( + 1, + vec![AccessMetadata::write(ResourceId::for_test(1))], + 1, + )], + ); + batch1.wait_committed_blocking(); + scheduler.assert_written_state(ResourceId::for_test(1), vec![1]); + + // Batch 2 empties resource 1. + let batch2 = scheduler.schedule( + 2, + vec![SchedulerTransaction::new( + 2, + vec![AccessMetadata::write(ResourceId::for_test(1))], + Processor::CLEAR_DATA, + )], + ); + batch2.wait_committed_blocking(); + + scheduler.assert_resource_deleted(ResourceId::for_test(1)); + + scheduler.shutdown(); + } +} + +/// Recreating a removed resource takes the recreating batch's index as its version and does not +/// collide with the retained pre-removal version. +#[test] +pub fn test_recreate_after_removal_uses_batch_index_version() { + let temp_dir = TempDir::new().expect("failed to create temp dir"); + { + let storage: RocksDbStore = RocksDbStore::open(temp_dir.path()); + let mut scheduler = Scheduler::new( + ExecutionConfig::default().with_processor(Processor), + StorageConfig::default().with_store(storage), + ); + + // Create (batch 1), remove (batch 2), recreate (batch 3). + scheduler.schedule( + 1, + vec![SchedulerTransaction::new( + 1, + vec![AccessMetadata::write(ResourceId::for_test(1))], + 1, + )], + ); + scheduler.schedule( + 2, + vec![SchedulerTransaction::new( + 2, + vec![AccessMetadata::write(ResourceId::for_test(1))], + Processor::CLEAR_DATA, + )], + ); + let batch3 = scheduler.schedule( + 3, + vec![SchedulerTransaction::new( + 3, + vec![AccessMetadata::write(ResourceId::for_test(1))], + 3, + )], + ); + batch3.wait_committed_blocking(); + + let store = scheduler.state().storage().store(); + let state = StateVersion::from_latest_data(store.as_ref(), ResourceId::for_test(1)); + assert_eq!(state.version(), 3, "recreate takes the batch-index version"); + assert_eq!(*state.data(), 3usize.to_be_bytes().to_vec()); + + // The pre-removal version (batch 1) is retained at its own key, not overwritten. + assert_eq!( + StateVersion::get(store.as_ref(), 1, &ResourceId::for_test(1)), + Some(1usize.to_be_bytes().to_vec()), + "the original version is retained without collision" + ); + + scheduler.shutdown(); + } +} + +/// Rolling back a removal restores the resource's pre-removal version. +#[test] +pub fn test_rollback_removal_restores_prior_version() { + let temp_dir = TempDir::new().expect("failed to create temp dir"); + { + let storage: RocksDbStore = RocksDbStore::open(temp_dir.path()); + let mut scheduler = Scheduler::new( + ExecutionConfig::default().with_processor(Processor), + StorageConfig::default().with_store(storage), + ); + + scheduler.schedule( + 1, + vec![SchedulerTransaction::new( + 1, + vec![AccessMetadata::write(ResourceId::for_test(1))], + 1, + )], + ); + let batch2 = scheduler.schedule( + 2, + vec![SchedulerTransaction::new( + 2, + vec![AccessMetadata::write(ResourceId::for_test(1))], + Processor::CLEAR_DATA, + )], + ); + batch2.wait_committed_blocking(); + scheduler.assert_resource_deleted(ResourceId::for_test(1)); + + // Rolling back the removal batch restores resource 1. + scheduler.rollback_to(1).expect("rollback should succeed"); + scheduler.assert_written_state(ResourceId::for_test(1), vec![1]); + + scheduler.shutdown(); + } +} + +/// Rolling back a recreate (after a removal) leaves the resource absent again. +#[test] +pub fn test_rollback_recreate_leaves_absent() { + let temp_dir = TempDir::new().expect("failed to create temp dir"); + { + let storage: RocksDbStore = RocksDbStore::open(temp_dir.path()); + let mut scheduler = Scheduler::new( + ExecutionConfig::default().with_processor(Processor), + StorageConfig::default().with_store(storage), + ); + + scheduler.schedule( + 1, + vec![SchedulerTransaction::new( + 1, + vec![AccessMetadata::write(ResourceId::for_test(1))], + 1, + )], + ); + scheduler.schedule( + 2, + vec![SchedulerTransaction::new( + 2, + vec![AccessMetadata::write(ResourceId::for_test(1))], + Processor::CLEAR_DATA, + )], + ); + let batch3 = scheduler.schedule( + 3, + vec![SchedulerTransaction::new( + 3, + vec![AccessMetadata::write(ResourceId::for_test(1))], + 3, + )], + ); + batch3.wait_committed_blocking(); + scheduler.assert_written_state(ResourceId::for_test(1), vec![3]); + + // Rolling back to the removal batch (2) undoes the recreate: absent again. + scheduler.rollback_to(2).expect("rollback should succeed"); + scheduler.assert_resource_deleted(ResourceId::for_test(1)); + + scheduler.shutdown(); + } +} + +/// After pruning crosses the removal batch, a removed resource has zero on-disk footprint: no +/// latest pointer, no version rows, and no rollback pointers. +#[test] +pub fn test_pruning_removed_resource_zero_footprint() { + use vprogs_state_ptr_rollback::StatePtrRollback; + + let temp_dir = TempDir::new().expect("failed to create temp dir"); + { + let storage: RocksDbStore = RocksDbStore::open(temp_dir.path()); + let mut scheduler = Scheduler::new( + ExecutionConfig::default().with_processor(Processor), + StorageConfig::default().with_store(storage), + ); + + // Create resource 1 (batch 1), remove it (batch 2), then an unrelated batch 3 so batches + // 1-2 fall below the pruning threshold. + scheduler.schedule( + 1, + vec![SchedulerTransaction::new( + 1, + vec![AccessMetadata::write(ResourceId::for_test(1))], + 1, + )], + ); + scheduler.schedule( + 2, + vec![SchedulerTransaction::new( + 2, + vec![AccessMetadata::write(ResourceId::for_test(1))], + Processor::CLEAR_DATA, + )], + ); + let batch3 = scheduler.schedule( + 3, + vec![SchedulerTransaction::new( + 3, + vec![AccessMetadata::write(ResourceId::for_test(2))], + 3, + )], + ); + batch3.wait_committed_blocking(); + + // Prune batches 1 and 2. + scheduler.pruning().set_threshold(3); + scheduler.wait_pruned(2, Duration::from_secs(10)); + + let store = scheduler.state().storage().store(); + + // No latest pointer, no data row for the pre-removal version, no rollback pointers. + scheduler.assert_resource_deleted(ResourceId::for_test(1)); + assert_eq!( + StateVersion::get(store.as_ref(), 1, &ResourceId::for_test(1)), + None, + "the pre-removal version row should be pruned" + ); + assert_eq!(StatePtrRollback::iter_batch(store.as_ref(), 1).count(), 0); + assert_eq!(StatePtrRollback::iter_batch(store.as_ref(), 2).count(), 0); + + scheduler.shutdown(); + } +} diff --git a/scheduling/test-utils/src/processor.rs b/scheduling/test-utils/src/processor.rs index f0848a01..e8a166e4 100644 --- a/scheduling/test-utils/src/processor.rs +++ b/scheduling/test-utils/src/processor.rs @@ -3,9 +3,17 @@ use vprogs_scheduling_scheduler::TransactionContext; use vprogs_storage_types::Store; /// A minimal processor implementation for testing the scheduler. +/// +/// A write transaction appends its id to each resource it writes, except the sentinel +/// [`Processor::CLEAR_DATA`], which empties (removes) the resource instead. #[derive(Clone)] pub struct Processor; +impl Processor { + /// Transaction id whose writes empty (remove) the resource rather than appending to it. + pub const CLEAR_DATA: usize = usize::MAX; +} + impl vprogs_scheduling_scheduler::Processor for Processor { fn process_transaction( &self, @@ -14,7 +22,11 @@ impl vprogs_scheduling_scheduler::Processor for Processor { let tx_id = ctx.scheduler_tx().tx; for resource in ctx.resources_mut() { if resource.access_metadata().access_type == AccessType::Write { - resource.data_mut().extend_from_slice(&tx_id.to_be_bytes()); + if tx_id == Self::CLEAR_DATA { + resource.set_data(Vec::new()); + } else { + resource.data_mut().extend_from_slice(&tx_id.to_be_bytes()); + } } } Ok(()) diff --git a/scheduling/test-utils/src/scheduler_ext.rs b/scheduling/test-utils/src/scheduler_ext.rs index 52344935..bd66207d 100644 --- a/scheduling/test-utils/src/scheduler_ext.rs +++ b/scheduling/test-utils/src/scheduler_ext.rs @@ -17,7 +17,7 @@ pub trait SchedulerExt { /// Repeatedly processes the eviction queue until the cache is empty or timeout is reached. fn wait_cache_empty(&mut self, timeout: Duration) -> &mut Self; - /// Asserts that a resource has the expected version and writer log. + /// Asserts that a resource's stored data matches the expected writer log. fn assert_written_state(&self, resource_id: ResourceId, writers: Vec) -> &Self; /// Asserts that a resource has been deleted (no latest pointer exists). @@ -56,11 +56,9 @@ impl SchedulerExt for Scheduler { fn assert_written_state(&self, resource_id: ResourceId, writers: Vec) -> &Self { let store = self.state().storage().store(); - let writer_count = writers.len(); let writer_log: Vec = writers.iter().flat_map(|id| id.to_be_bytes()).collect(); let versioned_state = StateVersion::from_latest_data(store.as_ref(), resource_id); - assert_eq!(versioned_state.version(), writer_count as u64); assert_eq!(*versioned_state.data(), writer_log); self } diff --git a/state/version/src/lib.rs b/state/version/src/lib.rs index 61806a52..281661c4 100644 --- a/state/version/src/lib.rs +++ b/state/version/src/lib.rs @@ -7,7 +7,7 @@ use vprogs_state_ptr_rollback::StatePtrRollback; use vprogs_storage_manager::concat_bytes; use vprogs_storage_types::{ReadStore, StateSpace, WriteBatch}; -#[derive(Clone, Debug, Eq, Hash, PartialEq)] +#[derive(Clone, Debug)] pub struct StateVersion { resource_id: ResourceId, version: u64, @@ -19,10 +19,7 @@ impl StateVersion { Self { resource_id: id, version: 0, data: Vec::new() } } - pub fn from_latest_data(store: &S, id: ResourceId) -> Self - where - S: ReadStore, - { + pub fn from_latest_data(store: &S, id: ResourceId) -> Self { match StatePtrLatest::get(store, &id) { None => Self::empty(id), Some(version) => Self { @@ -41,52 +38,45 @@ impl StateVersion { &self.data } - pub fn data_mut(self: &mut Arc) -> &mut Vec { - &mut Arc::make_mut(self).tap_mut(|s| s.version += 1).data + pub fn data_mut(self: &mut Arc, version: u64) -> &mut Vec { + &mut Arc::make_mut(self).tap_mut(|s| s.version = version).data } - pub fn set_data(self: &mut Arc, data: Vec) { + pub fn set_data(self: &mut Arc, version: u64, data: Vec) { if self.data != data { if let Some(s) = Arc::get_mut(self) { - s.version += 1; + s.version = version; s.data = data; } else { - let this = Self { resource_id: self.resource_id, version: self.version + 1, data }; + let this = Self { resource_id: self.resource_id, version, data }; *self = Arc::new(this); } } } - pub fn write_data(&self, wb: &mut W) - where - W: WriteBatch, - { + pub fn write_data(&self, wb: &mut W) { if !self.data.is_empty() { Self::put(wb, self.version, &self.resource_id, &self.data); } } - pub fn write_latest_ptr(&self, wb: &mut W) - where - W: WriteBatch, - { - StatePtrLatest::put(wb, &self.resource_id, self.version); + pub fn write_latest_ptr(&self, wb: &mut W) { + if !self.data.is_empty() { + StatePtrLatest::put(wb, &self.resource_id, self.version); + } else { + StatePtrLatest::delete(wb, &self.resource_id); + } } - pub fn write_rollback_ptr(&self, wb: &mut W, batch_index: u64) - where - W: WriteBatch, - { - StatePtrRollback::put(wb, batch_index, &self.resource_id, self.version); + pub fn write_rollback_ptr(&self, wb: &mut W, batch_index: u64) { + let version = if self.data.is_empty() { 0 } else { self.version }; + StatePtrRollback::put(wb, batch_index, &self.resource_id, version); } /// Gets the data for a specific version of a resource. /// /// Key layout: `version (u64 BE) || resource_id (borsh)` - pub fn get(store: &S, version: u64, resource_id: &ResourceId) -> Option> - where - S: ReadStore, - { + pub fn get(store: &S, version: u64, resource_id: &ResourceId) -> Option> { let rid = borsh::to_vec(resource_id).expect("failed to serialize ResourceId"); let key = concat_bytes!(&version.to_be_bytes(), &rid); store.get(StateSpace::StateVersion, &key) @@ -95,10 +85,7 @@ impl StateVersion { /// Stores data for a specific version of a resource. /// /// Key layout: `version (u64 BE) || resource_id (borsh)` - pub fn put(wb: &mut W, version: u64, resource_id: &ResourceId, data: &[u8]) - where - W: WriteBatch, - { + pub fn put(wb: &mut W, version: u64, resource_id: &ResourceId, data: &[u8]) { let rid = borsh::to_vec(resource_id).expect("failed to serialize ResourceId"); let key = concat_bytes!(&version.to_be_bytes(), &rid); wb.put(StateSpace::StateVersion, &key, data); @@ -107,10 +94,7 @@ impl StateVersion { /// Deletes data for a specific version of a resource. /// /// Key layout: `version (u64 BE) || resource_id (borsh)` - pub fn delete(wb: &mut W, version: u64, resource_id: &ResourceId) - where - W: WriteBatch, - { + pub fn delete(wb: &mut W, version: u64, resource_id: &ResourceId) { let rid = borsh::to_vec(resource_id).expect("failed to serialize ResourceId"); let key = concat_bytes!(&version.to_be_bytes(), &rid); wb.delete(StateSpace::StateVersion, &key);