Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions node/test-utils/src/node_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,9 @@ impl NodeExt for NodeApi<RocksDbStore, TestNodeVm> {

fn assert_written_state(&self, resource_id: ResourceId, writers: &[Hash]) {
let store = self.storage().store();
let writer_count = writers.len();
let writer_log: Vec<u8> = writers.iter().flat_map(|h| h.as_bytes()).collect();

let versioned_state = StateVersion::from_latest_data(store.as_ref(), resource_id);
assert_eq!(
versioned_state.version(),
writer_count as u64,
"resource {resource_id:?}: expected version {writer_count}, got {}",
versioned_state.version()
);
assert_eq!(
*versioned_state.data(),
writer_log,
Expand Down
9 changes: 5 additions & 4 deletions scheduling/scheduler/src/access_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{ResourceAccess, processor::Processor};
pub struct AccessHandle<'a, S: Store, P: Processor<S>> {
state_version: Arc<StateVersion>,
access: &'a ResourceAccess<S, P>,
batch_index: u64,
}

impl<'a, S: Store, P: Processor<S>> AccessHandle<'a, S, P> {
Expand Down Expand Up @@ -42,13 +43,13 @@ impl<'a, S: Store, P: Processor<S>> AccessHandle<'a, S, P> {
/// Returns a mutable reference to the serialized resource data.
#[inline]
pub fn data_mut(&mut self) -> &mut Vec<u8> {
self.state_version.data_mut()
self.state_version.data_mut(self.batch_index)
}

/// Replaces the serialized resource data.
#[inline]
pub fn set_data(&mut self, data: Vec<u8>) {
self.state_version.set_data(data)
self.state_version.set_data(self.batch_index, data)
}

/// Returns true if this resource was created by the current transaction (version 0).
Expand All @@ -57,8 +58,8 @@ impl<'a, S: Store, P: Processor<S>> AccessHandle<'a, S, P> {
self.state_version.version() == 0
}

pub(crate) fn new(access: &'a ResourceAccess<S, P>) -> Self {
Self { state_version: access.read_state(), access }
pub(crate) fn new(access: &'a ResourceAccess<S, P>, batch_index: u64) -> Self {
Self { state_version: access.read_state(), access, batch_index }
}

pub(crate) fn commit_changes(self) {
Expand Down
3 changes: 2 additions & 1 deletion scheduling/scheduler/src/scheduled_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ impl<S: Store, P: Processor<S>> ScheduledTransaction<S, P> {

pub(crate) fn execute(&self) {
if let Some(batch) = self.batch.upgrade() {
let batch_index = batch.checkpoint().index();
let mut ctx = TransactionContext::new(
&self.tx,
self.batch_position,
&batch,
self.resources.iter().map(AccessHandle::new).collect(),
self.resources.iter().map(|a| AccessHandle::new(a, batch_index)).collect(),
);

// If the batch was canceled, roll back all changes and exit early.
Expand Down
241 changes: 241 additions & 0 deletions scheduling/scheduler/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use vprogs_core_types::{AccessMetadata, Checkpoint, ResourceId, SchedulerTransac
use vprogs_scheduling_scheduler::{ExecutionConfig, Scheduler};
use vprogs_scheduling_test_utils::{Processor, SchedulerExt};
use vprogs_state_metadata::StateMetadata;
use vprogs_state_version::StateVersion;
use vprogs_storage_manager::StorageConfig;
use vprogs_storage_rocksdb_store::RocksDbStore;

Expand Down Expand Up @@ -2000,3 +2001,243 @@ pub fn test_smt_deterministic_roots() {
scheduler.shutdown();
}
}

/// A resource emptied by a batch leaves no latest pointer once that batch commits.
#[test]
pub fn test_removal_deletes_latest_ptr() {
let temp_dir = TempDir::new().expect("failed to create temp dir");
{
let storage: RocksDbStore = RocksDbStore::open(temp_dir.path());
let mut scheduler = Scheduler::new(
ExecutionConfig::default().with_processor(Processor),
StorageConfig::default().with_store(storage),
);

let batch1 = scheduler.schedule(
1,
vec![SchedulerTransaction::new(
1,
vec![AccessMetadata::write(ResourceId::for_test(1))],
1,
)],
);
batch1.wait_committed_blocking();
scheduler.assert_written_state(ResourceId::for_test(1), vec![1]);

// Batch 2 empties resource 1.
let batch2 = scheduler.schedule(
2,
vec![SchedulerTransaction::new(
2,
vec![AccessMetadata::write(ResourceId::for_test(1))],
Processor::CLEAR_DATA,
)],
);
batch2.wait_committed_blocking();

scheduler.assert_resource_deleted(ResourceId::for_test(1));

scheduler.shutdown();
}
}

/// Recreating a removed resource takes the recreating batch's index as its version and does not
/// collide with the retained pre-removal version.
#[test]
pub fn test_recreate_after_removal_uses_batch_index_version() {
let temp_dir = TempDir::new().expect("failed to create temp dir");
{
let storage: RocksDbStore = RocksDbStore::open(temp_dir.path());
let mut scheduler = Scheduler::new(
ExecutionConfig::default().with_processor(Processor),
StorageConfig::default().with_store(storage),
);

// Create (batch 1), remove (batch 2), recreate (batch 3).
scheduler.schedule(
1,
vec![SchedulerTransaction::new(
1,
vec![AccessMetadata::write(ResourceId::for_test(1))],
1,
)],
);
scheduler.schedule(
2,
vec![SchedulerTransaction::new(
2,
vec![AccessMetadata::write(ResourceId::for_test(1))],
Processor::CLEAR_DATA,
)],
);
let batch3 = scheduler.schedule(
3,
vec![SchedulerTransaction::new(
3,
vec![AccessMetadata::write(ResourceId::for_test(1))],
3,
)],
);
batch3.wait_committed_blocking();

let store = scheduler.state().storage().store();
let state = StateVersion::from_latest_data(store.as_ref(), ResourceId::for_test(1));
assert_eq!(state.version(), 3, "recreate takes the batch-index version");
assert_eq!(*state.data(), 3usize.to_be_bytes().to_vec());

// The pre-removal version (batch 1) is retained at its own key, not overwritten.
assert_eq!(
StateVersion::get(store.as_ref(), 1, &ResourceId::for_test(1)),
Some(1usize.to_be_bytes().to_vec()),
"the original version is retained without collision"
);

scheduler.shutdown();
}
}

/// Rolling back a removal restores the resource's pre-removal version.
#[test]
pub fn test_rollback_removal_restores_prior_version() {
let temp_dir = TempDir::new().expect("failed to create temp dir");
{
let storage: RocksDbStore = RocksDbStore::open(temp_dir.path());
let mut scheduler = Scheduler::new(
ExecutionConfig::default().with_processor(Processor),
StorageConfig::default().with_store(storage),
);

scheduler.schedule(
1,
vec![SchedulerTransaction::new(
1,
vec![AccessMetadata::write(ResourceId::for_test(1))],
1,
)],
);
let batch2 = scheduler.schedule(
2,
vec![SchedulerTransaction::new(
2,
vec![AccessMetadata::write(ResourceId::for_test(1))],
Processor::CLEAR_DATA,
)],
);
batch2.wait_committed_blocking();
scheduler.assert_resource_deleted(ResourceId::for_test(1));

// Rolling back the removal batch restores resource 1.
scheduler.rollback_to(1).expect("rollback should succeed");
scheduler.assert_written_state(ResourceId::for_test(1), vec![1]);

scheduler.shutdown();
}
}

/// Rolling back a recreate (after a removal) leaves the resource absent again.
#[test]
pub fn test_rollback_recreate_leaves_absent() {
let temp_dir = TempDir::new().expect("failed to create temp dir");
{
let storage: RocksDbStore = RocksDbStore::open(temp_dir.path());
let mut scheduler = Scheduler::new(
ExecutionConfig::default().with_processor(Processor),
StorageConfig::default().with_store(storage),
);

scheduler.schedule(
1,
vec![SchedulerTransaction::new(
1,
vec![AccessMetadata::write(ResourceId::for_test(1))],
1,
)],
);
scheduler.schedule(
2,
vec![SchedulerTransaction::new(
2,
vec![AccessMetadata::write(ResourceId::for_test(1))],
Processor::CLEAR_DATA,
)],
);
let batch3 = scheduler.schedule(
3,
vec![SchedulerTransaction::new(
3,
vec![AccessMetadata::write(ResourceId::for_test(1))],
3,
)],
);
batch3.wait_committed_blocking();
scheduler.assert_written_state(ResourceId::for_test(1), vec![3]);

// Rolling back to the removal batch (2) undoes the recreate: absent again.
scheduler.rollback_to(2).expect("rollback should succeed");
scheduler.assert_resource_deleted(ResourceId::for_test(1));

scheduler.shutdown();
}
}

/// After pruning crosses the removal batch, a removed resource has zero on-disk footprint: no
/// latest pointer, no version rows, and no rollback pointers.
#[test]
pub fn test_pruning_removed_resource_zero_footprint() {
use vprogs_state_ptr_rollback::StatePtrRollback;

let temp_dir = TempDir::new().expect("failed to create temp dir");
{
let storage: RocksDbStore = RocksDbStore::open(temp_dir.path());
let mut scheduler = Scheduler::new(
ExecutionConfig::default().with_processor(Processor),
StorageConfig::default().with_store(storage),
);

// Create resource 1 (batch 1), remove it (batch 2), then an unrelated batch 3 so batches
// 1-2 fall below the pruning threshold.
scheduler.schedule(
1,
vec![SchedulerTransaction::new(
1,
vec![AccessMetadata::write(ResourceId::for_test(1))],
1,
)],
);
scheduler.schedule(
2,
vec![SchedulerTransaction::new(
2,
vec![AccessMetadata::write(ResourceId::for_test(1))],
Processor::CLEAR_DATA,
)],
);
let batch3 = scheduler.schedule(
3,
vec![SchedulerTransaction::new(
3,
vec![AccessMetadata::write(ResourceId::for_test(2))],
3,
)],
);
batch3.wait_committed_blocking();

// Prune batches 1 and 2.
scheduler.pruning().set_threshold(3);
scheduler.wait_pruned(2, Duration::from_secs(10));

let store = scheduler.state().storage().store();

// No latest pointer, no data row for the pre-removal version, no rollback pointers.
scheduler.assert_resource_deleted(ResourceId::for_test(1));
assert_eq!(
StateVersion::get(store.as_ref(), 1, &ResourceId::for_test(1)),
None,
"the pre-removal version row should be pruned"
);
assert_eq!(StatePtrRollback::iter_batch(store.as_ref(), 1).count(), 0);
assert_eq!(StatePtrRollback::iter_batch(store.as_ref(), 2).count(), 0);

scheduler.shutdown();
}
}
14 changes: 13 additions & 1 deletion scheduling/test-utils/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@ use vprogs_scheduling_scheduler::TransactionContext;
use vprogs_storage_types::Store;

/// A minimal processor implementation for testing the scheduler.
///
/// A write transaction appends its id to each resource it writes, except the sentinel
/// [`Processor::CLEAR_DATA`], which empties (removes) the resource instead.
#[derive(Clone)]
pub struct Processor;

impl Processor {
/// Transaction id whose writes empty (remove) the resource rather than appending to it.
pub const CLEAR_DATA: usize = usize::MAX;
}

impl<S: Store> vprogs_scheduling_scheduler::Processor<S> for Processor {
fn process_transaction(
&self,
Expand All @@ -14,7 +22,11 @@ impl<S: Store> vprogs_scheduling_scheduler::Processor<S> for Processor {
let tx_id = ctx.scheduler_tx().tx;
for resource in ctx.resources_mut() {
if resource.access_metadata().access_type == AccessType::Write {
resource.data_mut().extend_from_slice(&tx_id.to_be_bytes());
if tx_id == Self::CLEAR_DATA {
resource.set_data(Vec::new());
} else {
resource.data_mut().extend_from_slice(&tx_id.to_be_bytes());
}
}
}
Ok(())
Expand Down
4 changes: 1 addition & 3 deletions scheduling/test-utils/src/scheduler_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub trait SchedulerExt {
/// Repeatedly processes the eviction queue until the cache is empty or timeout is reached.
fn wait_cache_empty(&mut self, timeout: Duration) -> &mut Self;

/// Asserts that a resource has the expected version and writer log.
/// Asserts that a resource's stored data matches the expected writer log.
fn assert_written_state(&self, resource_id: ResourceId, writers: Vec<usize>) -> &Self;

/// Asserts that a resource has been deleted (no latest pointer exists).
Expand Down Expand Up @@ -56,11 +56,9 @@ impl SchedulerExt for Scheduler<RocksDbStore, Processor> {

fn assert_written_state(&self, resource_id: ResourceId, writers: Vec<usize>) -> &Self {
let store = self.state().storage().store();
let writer_count = writers.len();
let writer_log: Vec<u8> = writers.iter().flat_map(|id| id.to_be_bytes()).collect();

let versioned_state = StateVersion::from_latest_data(store.as_ref(), resource_id);
assert_eq!(versioned_state.version(), writer_count as u64);
assert_eq!(*versioned_state.data(), writer_log);
self
}
Expand Down
Loading
Loading