diff --git a/Cargo.lock b/Cargo.lock index a5e9244a..6e1d7a39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8577,6 +8577,7 @@ dependencies = [ "kaspa-rpc-core", "kaspa-txscript", "kaspa-wrpc-client", + "vprogs-core-types", "zerocopy", ] @@ -8729,12 +8730,14 @@ dependencies = [ "vprogs-scheduling-test-utils", "vprogs-state-batch-metadata", "vprogs-state-metadata", + "vprogs-state-proof-receipt", "vprogs-state-ptr-latest", "vprogs-state-ptr-rollback", "vprogs-state-version", "vprogs-storage-manager", "vprogs-storage-rocksdb-store", "vprogs-storage-types", + "zerocopy", ] [[package]] @@ -9041,6 +9044,9 @@ dependencies = [ [[package]] name = "vprogs-transaction-runtime-transaction-effects" version = "0.1.0" +dependencies = [ + "borsh", +] [[package]] name = "vprogs-zk-abi" @@ -9073,13 +9079,19 @@ dependencies = [ "kaspa-hashes", "kaspa-rpc-core", "log", + "tempfile", "tokio", "vprogs-core-atomics", "vprogs-core-codec", "vprogs-core-macros", "vprogs-core-smt", + "vprogs-core-types", "vprogs-l1-types", "vprogs-scheduling-scheduler", + "vprogs-scheduling-test-utils", + "vprogs-state-proof-receipt", + "vprogs-storage-manager", + "vprogs-storage-rocksdb-store", "vprogs-storage-types", "vprogs-zk-abi", "vprogs-zk-batch-prover", @@ -9200,6 +9212,7 @@ dependencies = [ "vprogs-core-macros", "vprogs-l1-types", "vprogs-scheduling-scheduler", + "vprogs-state-proof-receipt", "vprogs-storage-types", "vprogs-zk-abi", "vprogs-zk-transaction-prover", @@ -9210,11 +9223,14 @@ dependencies = [ name = "vprogs-zk-transaction-prover" version = "0.1.0" dependencies = [ + "borsh", "tap", "tokio", "vprogs-core-atomics", "vprogs-core-macros", + "vprogs-l1-types", "vprogs-scheduling-scheduler", + "vprogs-state-proof-receipt", "vprogs-storage-types", ] diff --git a/core/types/Cargo.toml b/core/types/Cargo.toml index 8cf9e124..010d2ca4 100644 --- a/core/types/Cargo.toml +++ b/core/types/Cargo.toml @@ -8,3 +8,7 @@ borsh = { workspace = true, default-features = false } rkyv = { workspace = true, default-features = false } vprogs-core-codec = { workspace = true } zerocopy = { workspace = true, features = ["derive"] } + +[features] +# Stand-in trait impls for tests only (e.g. `BatchMetadata for u64`); never enabled by the node. +test-utils = [] diff --git a/core/types/src/batch_metadata.rs b/core/types/src/batch_metadata.rs index 34d3194a..68019d80 100644 --- a/core/types/src/batch_metadata.rs +++ b/core/types/src/batch_metadata.rs @@ -4,14 +4,27 @@ use borsh::{BorshDeserialize, BorshSerialize}; /// Opaque metadata attached to each scheduler batch, supporting serialization. /// -/// Implementors derive `BorshSerialize` and `BorshDeserialize`; the trait is implemented -/// automatically via a blanket impl. +/// Implementors derive `BorshSerialize` and `BorshDeserialize` and supply the chain block hash +/// the batch was formed against via [`block_hash`](Self::block_hash), which keys the batch's +/// proof receipts in the proof-receipt store. pub trait BatchMetadata: BorshSerialize + BorshDeserialize + Clone + Debug + Default + Send + Sync + 'static { + /// The chain block hash this batch was formed against, as raw bytes. + /// + /// The proving workers fold it into each receipt's cache key so the same checkpoint index on + /// two competing chains yields distinct keys. + fn block_hash(&self) -> [u8; 32]; } -impl BatchMetadata for T where - T: BorshSerialize + BorshDeserialize + Clone + Debug + Default + Send + Sync + 'static -{ +/// A `u64` batch index doubles as minimal test metadata: its big-endian bytes stand in for a +/// block hash, keeping per-index receipts distinct without a real chain. Gated behind `test-utils` +/// so the node never treats a bare index as batch metadata. +#[cfg(feature = "test-utils")] +impl BatchMetadata for u64 { + fn block_hash(&self) -> [u8; 32] { + let mut bytes = [0u8; 32]; + bytes[..8].copy_from_slice(&self.to_be_bytes()); + bytes + } } diff --git a/examples/tn10-flow/src/daemon.rs b/examples/tn10-flow/src/daemon.rs index ab16177b..39e654c4 100644 --- a/examples/tn10-flow/src/daemon.rs +++ b/examples/tn10-flow/src/daemon.rs @@ -24,7 +24,7 @@ use vprogs_node_framework::{Node, NodeConfig}; use vprogs_scheduling_scheduler::ExecutionConfig; use vprogs_storage_manager::StorageConfig; use vprogs_storage_rocksdb_store::RocksDbStore; -use vprogs_zk_aggregate_prover::{AggregateProverConfig, ScheduledBundle}; +use vprogs_zk_aggregate_prover::{AggregateProverConfig, ScheduledBundle, SettlementArtifact}; use vprogs_zk_backend_risc0_api::{Backend, ProofType, Receipt}; use vprogs_zk_batch_prover::{BatchProverConfig, LaneProofRequest, LaneProofSource}; use vprogs_zk_vm::{ProvingPipeline, Vm}; @@ -37,7 +37,7 @@ pub type V = Vm; pub type FlowNode = Node; /// Queue of bundle handles the aggregate prover publishes to the settlement worker (one per formed /// bundle). -pub type FlowSettlementQueue = AsyncQueue>; +pub type FlowSettlementQueue = AsyncQueue>>; /// Everything the bridge needs to follow our lane on the remote node. pub struct BridgeParams { diff --git a/l1/types/Cargo.toml b/l1/types/Cargo.toml index 76fcf0b3..f2d7b47f 100644 --- a/l1/types/Cargo.toml +++ b/l1/types/Cargo.toml @@ -10,4 +10,5 @@ kaspa-hashes.workspace = true kaspa-rpc-core.workspace = true kaspa-txscript.workspace = true kaspa-wrpc-client.workspace = true +vprogs-core-types.workspace = true zerocopy.workspace = true diff --git a/l1/types/src/chain_block_metadata.rs b/l1/types/src/chain_block_metadata.rs index cee11002..8a709460 100644 --- a/l1/types/src/chain_block_metadata.rs +++ b/l1/types/src/chain_block_metadata.rs @@ -1,5 +1,6 @@ use borsh::{BorshDeserialize, BorshSerialize}; use kaspa_rpc_core::{RpcHeader, RpcOptionalHeader}; +use vprogs_core_types::BatchMetadata; use crate::{Hash, SettlementInfo}; @@ -39,6 +40,12 @@ pub struct ChainBlockMetadata { pub last_settlement: Option, } +impl BatchMetadata for ChainBlockMetadata { + fn block_hash(&self) -> [u8; 32] { + self.hash.as_bytes() + } +} + impl From<&RpcHeader> for ChainBlockMetadata { /// Builds metadata from a regular RPC header, populating only the header-derived fields and /// leaving lane / parent-derived state at its default. diff --git a/node/test-utils/src/vm.rs b/node/test-utils/src/vm.rs index d52d22a5..b0baeb87 100644 --- a/node/test-utils/src/vm.rs +++ b/node/test-utils/src/vm.rs @@ -22,9 +22,23 @@ impl Processor for TestNodeVm { Ok(()) } + // This test VM does not prove, so its receipt-cache image ids are unset. + fn tx_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + + fn batch_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + + fn aggregator_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + type Transaction = L1Transaction; type TransactionArtifact = (); type BatchArtifact = (); + type AggregatorArtifact = (); type BatchMetadata = ChainBlockMetadata; type Error = (); } diff --git a/node/vm/src/lib.rs b/node/vm/src/lib.rs index 05be3604..29a9ae74 100644 --- a/node/vm/src/lib.rs +++ b/node/vm/src/lib.rs @@ -18,9 +18,23 @@ impl Processor for VM { todo!("transaction execution from SchedulerTransaction") } + // The node VM does not yet drive proving, so its receipt-cache image ids are unset. + fn tx_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + + fn batch_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + + fn aggregator_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + type Transaction = L1Transaction; type TransactionArtifact = TransactionEffects; type BatchArtifact = (); + type AggregatorArtifact = (); type BatchMetadata = ChainBlockMetadata; type Error = VmError; } diff --git a/scheduling/scheduler/Cargo.toml b/scheduling/scheduler/Cargo.toml index 77e34bac..eade53c0 100644 --- a/scheduling/scheduler/Cargo.toml +++ b/scheduling/scheduler/Cargo.toml @@ -20,11 +20,13 @@ vprogs-core-types.workspace = true vprogs-scheduling-execution-workers.workspace = true vprogs-state-batch-metadata.workspace = true vprogs-state-metadata.workspace = true +vprogs-state-proof-receipt.workspace = true vprogs-state-ptr-latest.workspace = true vprogs-state-ptr-rollback.workspace = true vprogs-state-version.workspace = true vprogs-storage-manager.workspace = true vprogs-storage-types.workspace = true +zerocopy.workspace = true [dev-dependencies] tempfile.workspace = true diff --git a/scheduling/scheduler/src/lib.rs b/scheduling/scheduler/src/lib.rs index e0c2b6dd..a8bf5996 100644 --- a/scheduling/scheduler/src/lib.rs +++ b/scheduling/scheduler/src/lib.rs @@ -26,11 +26,11 @@ pub use processor::Processor; pub use pruning_worker::PruningWorker; pub(crate) use resource::Resource; pub(crate) use resource_access::ResourceAccess; -pub use scheduled_batch::{ScheduledBatch, ScheduledBatchRef}; +pub use scheduled_batch::{AggReceiptCoord, ScheduledBatch, ScheduledBatchRef}; pub use scheduled_transaction::ScheduledTransaction; pub(crate) use scheduled_transaction::ScheduledTransactionRef; pub use scheduler::Scheduler; pub use state::SchedulerState; pub use state_diff::{StateDiff, StateDiffRef}; -pub use storage_cmd::{Read, Write}; +pub use storage_cmd::{Read, ReadReceipt, ReceiptRead, ReceiptValue, StoreReceipt, Write}; pub use transaction_context::TransactionContext; diff --git a/scheduling/scheduler/src/processor.rs b/scheduling/scheduler/src/processor.rs index e3ddeb08..224b72cf 100644 --- a/scheduling/scheduler/src/processor.rs +++ b/scheduling/scheduler/src/processor.rs @@ -1,3 +1,4 @@ +use borsh::{BorshDeserialize, BorshSerialize}; use vprogs_core_types::BatchMetadata; use vprogs_storage_types::Store; @@ -24,13 +25,28 @@ pub trait Processor: Clone + Sized + Send + Sync + 'static { // Default implementation does nothing (override if needed). } + /// Program identifier keying a per-transaction receipt in the proof-receipt store. + fn tx_image_id(&self) -> [u8; 32]; + + /// Program identifier keying a per-batch receipt in the proof-receipt store. + fn batch_image_id(&self) -> [u8; 32]; + + /// Program identifier keying an aggregate (settlement) receipt in the proof-receipt store. + fn aggregator_image_id(&self) -> [u8; 32]; + /// The transaction payload type (e.g. kaspa `L1Transaction`, `usize` in tests). /// The scheduler wraps it in `SchedulerTransaction`. type Transaction: Send + Sync + 'static; /// Artifact produced by a processed transaction ([`ScheduledTransaction::publish_artifact`]). - type TransactionArtifact: Send + Sync + 'static; - /// Artifact produced by a processed batch ([`ScheduledBatch::publish_artifact`]). - type BatchArtifact: Send + Sync + 'static; + /// The `Borsh` bounds let the scheduler cache it in (and restore it from) the proof-receipt + /// store; `Clone` lets a served lookup hand the cached value back out of its shared slot. + type TransactionArtifact: Clone + BorshSerialize + BorshDeserialize + Send + Sync + 'static; + /// Artifact produced by a processed batch ([`ScheduledBatch::publish_artifact`]). Bounded for + /// the same proof-receipt caching as [`TransactionArtifact`](Self::TransactionArtifact). + type BatchArtifact: Clone + BorshSerialize + BorshDeserialize + Send + Sync + 'static; + /// Receipt produced by aggregating a run of per-batch receipts into one settlement receipt. + /// Bounded for the same proof-receipt caching as the other artifacts. + type AggregatorArtifact: Clone + BorshSerialize + BorshDeserialize + Send + Sync + 'static; /// Opaque metadata attached to each batch for persistence. type BatchMetadata: BatchMetadata; /// Error type returned when transaction processing fails. diff --git a/scheduling/scheduler/src/pruning_worker.rs b/scheduling/scheduler/src/pruning_worker.rs index eab77090..0910c618 100644 --- a/scheduling/scheduler/src/pruning_worker.rs +++ b/scheduling/scheduler/src/pruning_worker.rs @@ -12,6 +12,7 @@ use tokio::{runtime::Builder, sync::Notify}; use vprogs_core_types::{Checkpoint, ResourceId}; use vprogs_state_batch_metadata::BatchMetadata as StoredBatchMetadata; use vprogs_state_metadata::StateMetadata; +use vprogs_state_proof_receipt::{Prefix, invalidate_checkpoint}; use vprogs_state_ptr_rollback::StatePtrRollback; use vprogs_state_version::StateVersion; use vprogs_storage_types::Store; @@ -225,6 +226,15 @@ impl> PruningWorker { // Delete batch metadata entries for this batch. StoredBatchMetadata::delete(wb, index); + // Drop every cached proof receipt at this checkpoint index, across all programs + // and all competing forks. A pruned batch can never be rolled back to, so no + // future replay (even a flip reorg) needs its receipts again. + invalidate_checkpoint( + store.as_ref(), + wb, + &Prefix { checkpoint_index: index.into() }, + ); + // Prune stale SMT nodes for this version. store.prune(wb, index); } diff --git a/scheduling/scheduler/src/scheduled_batch.rs b/scheduling/scheduler/src/scheduled_batch.rs index 66020dc1..8be3487f 100644 --- a/scheduling/scheduler/src/scheduled_batch.rs +++ b/scheduling/scheduler/src/scheduled_batch.rs @@ -8,17 +8,29 @@ use crossbeam_deque::{Injector, Steal, Worker}; use vprogs_core_atomics::AtomicAsyncLatch; use vprogs_core_macros::smart_pointer; use vprogs_core_smt::Commitment; -use vprogs_core_types::{Checkpoint, ResourceId, SchedulerTransaction}; +use vprogs_core_types::{BatchMetadata, Checkpoint, ResourceId, SchedulerTransaction}; use vprogs_scheduling_execution_workers::Batch; use vprogs_state_batch_metadata::BatchMetadata as StoredBatchMetadata; use vprogs_state_metadata::StateMetadata; +use vprogs_state_proof_receipt::{AggregatorKey, BatchKey, Prefix}; use vprogs_storage_types::Store; use crate::{ - CancellationContext, ScheduledTransaction, Scheduler, StateDiff, Write, cpu_task::ManagerTask, - processor::Processor, state::SchedulerState, + CancellationContext, Read, ReadReceipt, ReceiptRead, ScheduledTransaction, Scheduler, + StateDiff, StoreReceipt, Write, cpu_task::ManagerTask, processor::Processor, + state::SchedulerState, storage_cmd::ReceiptLookup, }; +/// The bundle-start coordinate and claimed tip that key an aggregate (settlement) receipt. +pub struct AggReceiptCoord { + /// Bundle-start checkpoint index. + pub checkpoint_index: u64, + /// L1 block at the bundle's first checkpoint (the block it proves from). + pub from_block: [u8; 32], + /// Commitment to the bundle's claimed tip. + pub seq_commit: [u8; 32], +} + /// A batch of transactions progressing through the scheduler's lifecycle. /// /// Each batch moves through three stages: processed (all transactions executed), persisted (all @@ -30,6 +42,8 @@ use crate::{ pub struct ScheduledBatch> { /// Cancellation context captured at creation time for rollback detection. cancellation: CancellationContext, + /// Processor handle for deriving the program image ids that key this batch's proof receipts. + processor: P, /// Shared scheduler state for storage access and eviction. state: SchedulerState, /// This batch's sequential index and metadata. @@ -225,6 +239,87 @@ impl> ScheduledBatch { self } + /// Looks up this batch's cached per-batch receipt, returning a handle that resolves to the + /// deserialized receipt, or `None` on a cache miss. Served by the read worker so the caller + /// never blocks its async runtime on a store read. + pub fn read_batch_receipt(&self) -> ReceiptRead { + self.submit_read_receipt(self.batch_key()) + } + + /// Stores this batch's per-batch receipt through the write worker, returning a latch that opens + /// once it commits. Independent of the batch's own persistence latches. + pub fn write_batch_receipt(&self, receipt: P::BatchArtifact) -> AtomicAsyncLatch { + self.submit_store_receipt(self.batch_key(), receipt) + } + + /// The per-batch receipt key at this batch's coordinate. + fn batch_key(&self) -> BatchKey { + BatchKey { + prefix: Prefix { checkpoint_index: self.checkpoint.index().into() }, + block_hash: self.checkpoint.metadata().block_hash(), + image_id: self.processor.batch_image_id(), + } + } + + /// Looks up the aggregate (settlement) receipt at `coord`, with this batch as the storage + /// gateway: the aggregate prover holds no store of its own, so it reaches the receipt cache + /// through a batch's storage handle. Resolves to the receipt, or `None` on a miss. + pub fn read_agg_receipt( + &self, + coord: AggReceiptCoord, + ) -> ReceiptRead { + self.submit_read_receipt(self.agg_key(coord)) + } + + /// Stores the aggregate (settlement) receipt at `coord` through the write worker, returning a + /// latch that opens once it commits. This batch is the storage gateway, as for + /// [`read_agg_receipt`](Self::read_agg_receipt). + pub fn write_agg_receipt( + &self, + coord: AggReceiptCoord, + receipt: P::AggregatorArtifact, + ) -> AtomicAsyncLatch { + self.submit_store_receipt(self.agg_key(coord), receipt) + } + + /// The aggregate receipt key at `coord`; this batch supplies the aggregator image id. + fn agg_key(&self, coord: AggReceiptCoord) -> AggregatorKey { + AggregatorKey { + prefix: Prefix { checkpoint_index: coord.checkpoint_index.into() }, + block_hash: coord.from_block, + image_id: self.processor.aggregator_image_id(), + seq_commit: coord.seq_commit, + } + } + + /// Submits a proof-receipt lookup for the typed `key` to the read worker, returning the typed + /// [`ReceiptRead`] handle the caller awaits. The key type determines the stored value's kind + /// and how the served value projects back to the concrete receipt. + pub(crate) fn submit_read_receipt>( + &self, + key: K, + ) -> ReceiptRead { + let (cmd, handle) = ReadReceipt::new(key.into_key(), K::extract); + self.state.storage().submit_read(Read::ReadReceipt(cmd)); + handle + } + + /// Submits `receipt` under the typed `key` to the write worker, returning a latch that opens + /// once it commits. The key type pins the receipt to its matching stored variant. + pub(crate) fn submit_store_receipt>( + &self, + key: K, + receipt: K::Artifact, + ) -> AtomicAsyncLatch { + let committed = AtomicAsyncLatch::new(); + self.state.storage().submit_write(Write::StoreReceipt(StoreReceipt::new( + key.into_key(), + K::wrap(receipt), + committed.clone(), + ))); + committed + } + /// Submits this batch for commit on the write worker. No-op if canceled. pub fn schedule_commit(&self) { if !self.canceled() { @@ -257,6 +352,7 @@ impl> ScheduledBatch { ScheduledBatchData { cancellation: scheduler.cancellation().clone(), + processor: scheduler.processor().clone(), state: scheduler.state().clone(), checkpoint, pending_txs: AtomicU64::new(txs.len() as u64), diff --git a/scheduling/scheduler/src/scheduled_transaction.rs b/scheduling/scheduler/src/scheduled_transaction.rs index 3ca8e93e..114634b2 100644 --- a/scheduling/scheduler/src/scheduled_transaction.rs +++ b/scheduling/scheduler/src/scheduled_transaction.rs @@ -4,13 +4,15 @@ use std::sync::{ }; use arc_swap::ArcSwapOption; +use vprogs_core_atomics::AtomicAsyncLatch; use vprogs_core_macros::smart_pointer; -use vprogs_core_types::SchedulerTransaction; +use vprogs_core_types::{BatchMetadata, SchedulerTransaction}; +use vprogs_state_proof_receipt::{Prefix, TxKey}; use vprogs_storage_types::Store; use crate::{ - AccessHandle, ResourceAccess, ScheduledBatchRef, Scheduler, StateDiff, TransactionContext, - processor::Processor, + AccessHandle, ReceiptRead, ResourceAccess, ScheduledBatch, ScheduledBatchRef, Scheduler, + StateDiff, TransactionContext, processor::Processor, }; /// A transaction progressing through the scheduler's execution pipeline. @@ -54,6 +56,32 @@ impl> ScheduledTransaction { self.artifact.load_full().expect("artifact not ready") } + /// Looks up this transaction's cached receipt, returning a handle that resolves to the + /// deserialized receipt, or `None` on a cache miss. Served by the read worker through the + /// owning batch's storage handle. + pub fn read_tx_receipt(&self) -> ReceiptRead { + let batch = self.batch.upgrade().expect("owning batch dropped"); + batch.submit_read_receipt(self.tx_key(&batch)) + } + + /// Stores this transaction's receipt through the owning batch's write worker, returning a latch + /// that opens once it commits. + pub fn write_tx_receipt(&self, receipt: P::TransactionArtifact) -> AtomicAsyncLatch { + let batch = self.batch.upgrade().expect("owning batch dropped"); + batch.submit_store_receipt(self.tx_key(&batch), receipt) + } + + /// The per-transaction receipt key at the owning batch's coordinate and this transaction's + /// merge index. + fn tx_key(&self, batch: &ScheduledBatch) -> TxKey { + TxKey { + prefix: Prefix { checkpoint_index: batch.checkpoint().index().into() }, + block_hash: batch.checkpoint().metadata().block_hash(), + image_id: self.processor.tx_image_id(), + merge_idx: self.merge_idx.into(), + } + } + /// Publishes this transaction's artifact. `None` skips without storing an artifact. pub fn publish_artifact(&self, artifact: Option) { if let Some(artifact) = artifact { diff --git a/scheduling/scheduler/src/storage_cmd.rs b/scheduling/scheduler/src/storage_cmd.rs index bb883403..b1bcf8a4 100644 --- a/scheduling/scheduler/src/storage_cmd.rs +++ b/scheduling/scheduler/src/storage_cmd.rs @@ -1,18 +1,262 @@ +use std::sync::Arc; + +use arc_swap::ArcSwapOption; +use vprogs_core_atomics::AtomicAsyncLatch; +use vprogs_state_proof_receipt::{AggregatorKey, BatchKey, ReceiptKey, TxKey}; use vprogs_storage_manager::{ReadCmd, WriteCmd}; -use vprogs_storage_types::{ReadStore, Store}; +use vprogs_storage_types::{ReadStore, StateSpace, Store, WriteBatch}; +use zerocopy::IntoBytes; use crate::{ResourceAccess, ScheduledBatch, StateDiff, processor::Processor, rollback::Rollback}; +/// A typed proof-receipt key: the concrete coordinate for one receipt kind. +/// +/// Each variant carries its actual key struct, so the variant alone pins both the stored key bytes +/// ([`as_bytes`](Self::as_bytes)) and the [`ReceiptValue`] variant a stored blob deserializes into. +/// The storage workers carry a `Key` instead of raw bytes plus a separate kind tag, so the two can +/// never drift apart. +pub(crate) enum Key { + /// A per-transaction receipt key. + Tx(TxKey), + /// A per-batch receipt key. + Batch(BatchKey), + /// An aggregate (settlement) receipt key. + Agg(AggregatorKey), +} + +impl Key { + /// The stored key bytes, starting with the shared `Prefix`. + fn as_bytes(&self) -> &[u8] { + match self { + Key::Tx(key) => key.as_bytes(), + Key::Batch(key) => key.as_bytes(), + Key::Agg(key) => key.as_bytes(), + } + } +} + +/// A typed proof-receipt value, tagged by the kind of receipt it is. +/// +/// The variant pins the value to the matching [`Processor`] artifact type, so a caller cannot store +/// a transaction receipt under a batch key (or vice versa). The storage workers (de)serialize +/// whichever variant they hold; every receipt kind shares the [`StateSpace::ProofReceipt`] column +/// family. +pub enum ReceiptValue> { + /// A per-transaction receipt. + Tx(P::TransactionArtifact), + /// A per-batch receipt. + Batch(P::BatchArtifact), + /// An aggregate (settlement) receipt. + Agg(P::AggregatorArtifact), +} + +impl> ReceiptValue { + /// Serializes the held receipt to its stored byte form. + fn to_bytes(&self) -> Vec { + match self { + ReceiptValue::Tx(receipt) => borsh::to_vec(receipt), + ReceiptValue::Batch(receipt) => borsh::to_vec(receipt), + ReceiptValue::Agg(receipt) => borsh::to_vec(receipt), + } + .expect("failed to serialize receipt") + } + + /// Deserializes stored bytes into the variant named by `key`. + fn from_bytes(key: &Key, bytes: &[u8]) -> Self { + const CORRUPT: &str = "corrupted proof-receipt store"; + match key { + Key::Tx(_) => ReceiptValue::Tx(borsh::from_slice(bytes).expect(CORRUPT)), + Key::Batch(_) => ReceiptValue::Batch(borsh::from_slice(bytes).expect(CORRUPT)), + Key::Agg(_) => ReceiptValue::Agg(borsh::from_slice(bytes).expect(CORRUPT)), + } + } + + /// Moves out the per-transaction receipt. Panics if the value is not a [`Tx`](Self::Tx); the + /// issuer's lookup kind guarantees the variant. + pub(crate) fn into_tx(self) -> P::TransactionArtifact { + match self { + ReceiptValue::Tx(receipt) => receipt, + _ => unreachable!("receipt kind mismatch"), + } + } + + /// Moves out the per-batch receipt. Panics if the value is not a [`Batch`](Self::Batch). + pub(crate) fn into_batch(self) -> P::BatchArtifact { + match self { + ReceiptValue::Batch(receipt) => receipt, + _ => unreachable!("receipt kind mismatch"), + } + } + + /// Moves out the aggregate receipt. Panics if the value is not an [`Agg`](Self::Agg). + pub(crate) fn into_agg(self) -> P::AggregatorArtifact { + match self { + ReceiptValue::Agg(receipt) => receipt, + _ => unreachable!("receipt kind mismatch"), + } + } +} + +/// Ties a proof-receipt key type to the receipt it stores: a single typed key erases into the +/// storage workers' [`Key`] ([`into_key`](Self::into_key)) and drives the handle's projection back +/// to the artifact ([`extract`](Self::extract)) and the write worker's serialization +/// ([`wrap`](Self::wrap)). One impl per key type is the single source of truth pairing a key with +/// its [`ReceiptValue`] variant. +pub(crate) trait ReceiptLookup>: ReceiptKey { + /// The concrete receipt artifact this key stores. + type Artifact; + /// Erases this key into the type-tagged [`Key`] the storage workers carry. + fn into_key(self) -> Key; + /// Projects a served [`ReceiptValue`] to the artifact. + fn extract(value: ReceiptValue) -> Self::Artifact; + /// Wraps an artifact into its [`ReceiptValue`] variant for storage. + fn wrap(artifact: Self::Artifact) -> ReceiptValue; +} + +impl> ReceiptLookup for TxKey { + type Artifact = P::TransactionArtifact; + fn into_key(self) -> Key { + Key::Tx(self) + } + fn extract(value: ReceiptValue) -> Self::Artifact { + value.into_tx() + } + fn wrap(artifact: Self::Artifact) -> ReceiptValue { + ReceiptValue::Tx(artifact) + } +} + +impl> ReceiptLookup for BatchKey { + type Artifact = P::BatchArtifact; + fn into_key(self) -> Key { + Key::Batch(self) + } + fn extract(value: ReceiptValue) -> Self::Artifact { + value.into_batch() + } + fn wrap(artifact: Self::Artifact) -> ReceiptValue { + ReceiptValue::Batch(artifact) + } +} + +impl> ReceiptLookup for AggregatorKey { + type Artifact = P::AggregatorArtifact; + fn into_key(self) -> Key { + Key::Agg(self) + } + fn extract(value: ReceiptValue) -> Self::Artifact { + value.into_agg() + } + fn wrap(artifact: Self::Artifact) -> ReceiptValue { + ReceiptValue::Agg(artifact) + } +} + +/// A proof-receipt lookup served by the read worker, off the issuer's async runtime. +/// +/// The read worker deserializes the stored blob into `slot` as the [`ReceiptValue`] named by `key` +/// (left empty on a cache miss) and opens `ready`, so the proving worker that enqueued it awaits +/// the result via [`ReceiptRead`] instead of blocking its single-threaded runtime on a synchronous +/// store read. +pub struct ReadReceipt> { + /// The typed key identifying the receipt to look up. + key: Key, + /// Filled with the served [`ReceiptValue`], or left empty on a cache miss. + slot: Arc>>, + /// Opens once the lookup has been served. + ready: AtomicAsyncLatch, +} + +/// Handle to an in-flight [`ReadReceipt`]: await [`resolve`](Self::resolve) for the typed receipt +/// `R`, or `None` on a cache miss. `extract` projects the served [`ReceiptValue`] to the issuer's +/// concrete artifact type. +pub struct ReceiptRead, R> { + /// Shared slot the read worker fills with the served [`ReceiptValue`]. + slot: Arc>>, + /// Opens once the lookup has been served. + ready: AtomicAsyncLatch, + /// Projects the served [`ReceiptValue`] to the issuer's concrete artifact type. + extract: fn(ReceiptValue) -> R, +} + +impl> ReadReceipt { + /// Creates a lookup command for the typed `key` together with the typed [`ReceiptRead`] handle + /// its issuer awaits, projecting the served value through `extract`. + pub(crate) fn new( + key: Key, + extract: fn(ReceiptValue) -> R, + ) -> (Self, ReceiptRead) { + let slot = Arc::new(ArcSwapOption::empty()); + let ready = AtomicAsyncLatch::new(); + let handle = ReceiptRead { slot: slot.clone(), ready: ready.clone(), extract }; + (Self { key, slot, ready }, handle) + } + + /// Serves the lookup: fills `slot` from the store (a miss leaves it empty), then opens `ready`. + fn read(&self, store: &RS) { + if let Some(bytes) = store.get(StateSpace::ProofReceipt, self.key.as_bytes()) { + self.slot.store(Some(Arc::new(ReceiptValue::from_bytes(&self.key, &bytes)))); + } + self.ready.open(); + } +} + +impl, R> ReceiptRead { + /// Resolves once the read worker has served the lookup: the typed receipt, or `None` on a cache + /// miss. + pub async fn resolve(self) -> Option { + self.ready.wait().await; + let value = self.slot.swap(None)?; + let value = Arc::into_inner(value).expect("receipt slot uniquely held"); + Some((self.extract)(value)) + } +} + +/// A typed proof-receipt awaiting durable storage in the [`StateSpace::ProofReceipt`] column +/// family. +/// +/// Carries the key bytes, the typed [`ReceiptValue`] (serialized by the write worker), and a latch +/// the worker opens once the write commits, so the proving worker that enqueued it can wait for +/// durability before publishing the matching artifact. +pub struct StoreReceipt> { + /// The typed key the receipt is stored under. + key: Key, + /// The typed receipt, serialized by the write worker. + value: ReceiptValue, + /// Opens once the write commits. + committed: AtomicAsyncLatch, +} + +impl> StoreReceipt { + /// Pairs the typed `key` and `value` with the latch opened once the write commits. + pub(crate) fn new(key: Key, value: ReceiptValue, committed: AtomicAsyncLatch) -> Self { + Self { key, value, committed } + } + + /// Writes the serialized receipt into `wb` under its key. + fn write(&self, wb: &mut W) { + wb.put(StateSpace::ProofReceipt, self.key.as_bytes(), &self.value.to_bytes()); + } + + /// Opens the `committed` latch once the write has landed. + fn done(self) { + self.committed.open(); + } +} + /// Commands dispatched to the storage manager's read worker. pub enum Read> { /// Fetch the latest version data for a resource from disk. LatestData(ResourceAccess), + /// Look up a cached typed proof-receipt by key. + ReadReceipt(ReadReceipt), } impl> ReadCmd for Read { fn exec(&self, store: &RS) { match self { Read::LatestData(resource_access) => resource_access.read_latest_data(store), + Read::ReadReceipt(receipt) => receipt.read(store), } } } @@ -25,6 +269,8 @@ pub enum Write> { CommitBatch(ScheduledBatch), /// Revert all batches after a target checkpoint. Rollback(Rollback), + /// Persist a typed proof-receipt in the proof-receipt column family. + StoreReceipt(StoreReceipt), } impl> WriteCmd for Write { @@ -33,6 +279,7 @@ impl> WriteCmd for Write { Write::StateDiff(state_diff) => state_diff.write(&mut wb), Write::CommitBatch(batch) => batch.commit(store, &mut wb), Write::Rollback(rollback) => return rollback.execute(store, wb), + Write::StoreReceipt(receipt) => receipt.write(&mut wb), } wb } @@ -42,6 +289,7 @@ impl> WriteCmd for Write { Write::StateDiff(state_diff) => state_diff.write_done(), Write::CommitBatch(batch) => batch.commit_done(), Write::Rollback(rollback) => rollback.done(), + Write::StoreReceipt(receipt) => receipt.done(), } } } diff --git a/scheduling/test-utils/Cargo.toml b/scheduling/test-utils/Cargo.toml index 44fe2235..791d2e8c 100644 --- a/scheduling/test-utils/Cargo.toml +++ b/scheduling/test-utils/Cargo.toml @@ -4,7 +4,7 @@ name = "vprogs-scheduling-test-utils" version.workspace = true [dependencies] -vprogs-core-types.workspace = true +vprogs-core-types = { workspace = true, features = ["test-utils"] } vprogs-scheduling-scheduler.workspace = true vprogs-state-version.workspace = true vprogs-storage-rocksdb-store.workspace = true diff --git a/scheduling/test-utils/src/processor.rs b/scheduling/test-utils/src/processor.rs index f0848a01..896dc189 100644 --- a/scheduling/test-utils/src/processor.rs +++ b/scheduling/test-utils/src/processor.rs @@ -20,9 +20,24 @@ impl vprogs_scheduling_scheduler::Processor for Processor { Ok(()) } + // This processor never proves, so its image ids only need to be stable for receipt-cache + // round-trips, not match any real program. + fn tx_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + + fn batch_image_id(&self) -> [u8; 32] { + [1u8; 32] + } + + fn aggregator_image_id(&self) -> [u8; 32] { + [2u8; 32] + } + type Transaction = usize; - type TransactionArtifact = (); - type BatchArtifact = (); + type TransactionArtifact = Vec; + type BatchArtifact = Vec; + type AggregatorArtifact = Vec; type BatchMetadata = u64; type Error = (); } diff --git a/sim/src/driver/l2_driver.rs b/sim/src/driver/l2_driver.rs index db0b20c3..46bb1769 100644 --- a/sim/src/driver/l2_driver.rs +++ b/sim/src/driver/l2_driver.rs @@ -163,7 +163,7 @@ pub struct L2Driver { /// Queue the in-process aggregate prover publishes each bundle handle onto (real_e2e only). /// The driver pops from it to settle proved bundles. `None` in the other modes and before /// the proving stack is built. - settlement_queue: Option>>, + settlement_queue: Option>>>, /// Batches submitted to the aggregate prover but not yet accounted by a bundle outcome /// (real_e2e only). Gates the settlement back-pressure and is reconciled by each outcome's /// `batches`. @@ -198,7 +198,7 @@ fn build_exec( proving: bool, covenant_id: Option, consensus: Weak, -) -> (Exec, Option>>) { +) -> (Exec, Option>>>) { let db = tempfile::tempdir().expect("temp db dir"); let store = RocksDbStore::open(db.path().join("l2")); let (pipeline, settlement_queue) = if proving { diff --git a/transaction-runtime/transaction-effects/Cargo.toml b/transaction-runtime/transaction-effects/Cargo.toml index a905f9a0..386d7923 100644 --- a/transaction-runtime/transaction-effects/Cargo.toml +++ b/transaction-runtime/transaction-effects/Cargo.toml @@ -2,3 +2,6 @@ edition.workspace = true name = "vprogs-transaction-runtime-transaction-effects" version.workspace = true + +[dependencies] +borsh.workspace = true diff --git a/transaction-runtime/transaction-effects/src/lib.rs b/transaction-runtime/transaction-effects/src/lib.rs index fc7acb68..a6c91728 100644 --- a/transaction-runtime/transaction-effects/src/lib.rs +++ b/transaction-runtime/transaction-effects/src/lib.rs @@ -1 +1,4 @@ +use borsh::{BorshDeserialize, BorshSerialize}; + +#[derive(Clone, Debug, Default, BorshSerialize, BorshDeserialize)] pub struct TransactionEffects {} diff --git a/zk/aggregate-prover/Cargo.toml b/zk/aggregate-prover/Cargo.toml index c37846ba..cb2f8825 100644 --- a/zk/aggregate-prover/Cargo.toml +++ b/zk/aggregate-prover/Cargo.toml @@ -15,7 +15,15 @@ vprogs-core-macros.workspace = true vprogs-core-smt.workspace = true vprogs-l1-types.workspace = true vprogs-scheduling-scheduler.workspace = true +vprogs-state-proof-receipt.workspace = true vprogs-storage-types.workspace = true vprogs-zk-abi = { workspace = true, features = ["host"] } vprogs-zk-batch-prover.workspace = true vprogs-zk-transaction-prover.workspace = true + +[dev-dependencies] +tempfile.workspace = true +vprogs-core-types.workspace = true +vprogs-scheduling-test-utils.workspace = true +vprogs-storage-manager.workspace = true +vprogs-storage-rocksdb-store.workspace = true diff --git a/zk/aggregate-prover/src/backend.rs b/zk/aggregate-prover/src/backend.rs index 1a0b720a..02cd84d5 100644 --- a/zk/aggregate-prover/src/backend.rs +++ b/zk/aggregate-prover/src/backend.rs @@ -13,7 +13,9 @@ pub trait Backend: vprogs_zk_batch_prover::Backend { batch_receipts: Vec, ) -> impl Future + Send + 'static; - /// Trusted per-batch (batch-processor) image id, written into the aggregator inputs so the - /// guest can verify each composed batch journal. - fn batch_image_id(&self) -> &[u8; 32]; + /// Aggregator image id: the program identifier that keys a settlement (bundle) receipt in the + /// proof-receipt store. The trusted batch image the aggregator verifies its composed batch + /// journals against is [`batch_image_id`](vprogs_zk_batch_prover::Backend::batch_image_id), + /// inherited from the per-batch backend. + fn aggregator_image_id(&self) -> &[u8; 32]; } diff --git a/zk/aggregate-prover/src/config.rs b/zk/aggregate-prover/src/config.rs index efce37cb..034b06ad 100644 --- a/zk/aggregate-prover/src/config.rs +++ b/zk/aggregate-prover/src/config.rs @@ -2,7 +2,7 @@ use kaspa_hashes::Hash; use vprogs_core_atomics::AsyncQueue; use vprogs_zk_batch_prover::LaneProofSource; -use crate::ScheduledBundle; +use crate::{ScheduledBundle, SettlementArtifact}; /// Static configuration for the aggregate prover. /// @@ -22,5 +22,5 @@ pub struct AggregateProverConfig { /// before its proof exists; the consumer awaits the artifact), for a settlement worker to act /// on. `None` runs the prover without settling (e.g. exec/test paths); proved bundles are /// then only logged. - pub settlement_queue: Option>>, + pub settlement_queue: Option>>>, } diff --git a/zk/aggregate-prover/src/prover.rs b/zk/aggregate-prover/src/prover.rs index a33d8143..b1c7ed54 100644 --- a/zk/aggregate-prover/src/prover.rs +++ b/zk/aggregate-prover/src/prover.rs @@ -39,6 +39,7 @@ impl> AggregateProver { S, TransactionArtifact = B::Receipt, BatchArtifact = B::Receipt, + AggregatorArtifact = B::Receipt, BatchMetadata = ChainBlockMetadata, >, { diff --git a/zk/aggregate-prover/src/scheduled_bundle.rs b/zk/aggregate-prover/src/scheduled_bundle.rs index 31e2b1ca..24a641e3 100644 --- a/zk/aggregate-prover/src/scheduled_bundle.rs +++ b/zk/aggregate-prover/src/scheduled_bundle.rs @@ -5,8 +5,8 @@ use kaspa_hashes::Hash; use vprogs_core_atomics::AtomicAsyncLatch; use vprogs_core_macros::smart_pointer; use vprogs_l1_types::SettlementInfo; - -use crate::SettlementArtifact; +use vprogs_scheduling_scheduler::{AggReceiptCoord, Processor, ReceiptRead, ScheduledBatch}; +use vprogs_storage_types::Store; /// The L1 block span a bundle proves over. #[derive(Clone, Copy)] @@ -17,12 +17,12 @@ pub struct BundleBlocks { pub block_prove_to: Hash, } -/// A bundle the aggregate prover has formed and published to the settlement queue. +/// A formed bundle of proved batches, published to a settlement consumer before its artifact +/// exists. /// -/// Mirrors [`ScheduledBatch`](vprogs_scheduling_scheduler::ScheduledBatch)'s artifact mechanism: -/// the handle is pushed onto the settlement queue *before* its proof exists, so a consumer can pop -/// it, read the immediate metadata (e.g. reconcile pacing against `batches`), and then await the -/// proved [`SettlementArtifact`]. The aggregate worker fills the artifact via +/// Mirrors [`ScheduledBatch`]'s artifact mechanism: the handle is published *before* its artifact +/// exists, so a consumer can pop it, read the immediate metadata (e.g. reconcile pacing against +/// `batches`), and then await the proved artifact `A`, filled via /// [`publish_artifact`](Self::publish_artifact) once proving completes. /// /// A no-op bundle (all-empty prefix, or one that advanced no state) carries no artifact: it is @@ -30,13 +30,12 @@ pub struct BundleBlocks { /// formed bundle still produces exactly one handle, so a consumer that paces itself against proving /// can account for all submitted batches by summing each handle's `batches`. #[smart_pointer] -pub struct ScheduledBundle { +pub struct ScheduledBundle { /// Number of scheduled batches this bundle consumed (including empty batches in the ready /// prefix). Readable immediately, before the artifact is published. batches: usize, - /// The bundle's first checkpoint index (bundle-start). With `from_block` it is the - /// bundle-start coordinate that keys the aggregator receipt's prefix in the proof-receipt - /// store. Immediate. + /// The bundle's first checkpoint index (bundle-start). With `from_block`, the coordinate that + /// keys the bundle's aggregate receipt. Immediate. checkpoint_index: u64, /// L1 block at the bundle's first checkpoint (the block it proves *from*), pairing with /// `block_prove_to`. Together with `checkpoint_index` it keys the aggregator receipt and keeps @@ -45,19 +44,18 @@ pub struct ScheduledBundle { /// L1 block the bundle proves through (its final block). Immediate, for logging / pacing. block_prove_to: Hash, /// Most-recent covenant settlement visible on chain as of the bundle's final block, or `None` - /// until one lands. Lets the settler skip a bundle an external settlement already covered: the - /// same redundancy the aggregate prover applies when forming bundles. Immediate. + /// until one lands. Immediate. latest_settlement: Option, - /// The proved settlement, filled via [`publish_artifact`](Self::publish_artifact). `None` for - /// a no-op bundle that advanced no state. - settlement: ArcSwapOption>, + /// The proved artifact, filled via [`publish_artifact`](Self::publish_artifact). `None` for a + /// no-op bundle that advanced no state. + artifact: ArcSwapOption, /// Opens when the artifact has been published (or resolved as a no-op). artifact_published: AtomicAsyncLatch, } -impl ScheduledBundle { - /// Creates an unresolved bundle handle: its metadata is readable immediately, the settlement - /// artifact is filled later via [`publish_artifact`](Self::publish_artifact). +impl ScheduledBundle { + /// Creates an unresolved bundle handle: its metadata is readable immediately, the artifact is + /// filled later via [`publish_artifact`](Self::publish_artifact). pub fn new( batches: usize, checkpoint_index: u64, @@ -71,13 +69,13 @@ impl ScheduledBundle { from_block, block_prove_to, latest_settlement, - settlement: ArcSwapOption::empty(), + artifact: ArcSwapOption::empty(), artifact_published: AtomicAsyncLatch::new(), })) } /// Creates an immediately-resolved no-op bundle: it advanced no state, so it carries no - /// settlement and its artifact latch is already open. + /// artifact and its artifact latch is already open. pub fn resolved_noop( batches: usize, checkpoint_index: u64, @@ -93,7 +91,7 @@ impl ScheduledBundle { from_block, block_prove_to, latest_settlement, - settlement: ArcSwapOption::empty(), + artifact: ArcSwapOption::empty(), artifact_published, })) } @@ -124,22 +122,56 @@ impl ScheduledBundle { self.latest_settlement } - /// Publishes the bundle's settlement artifact and opens the `artifact_published` latch. A - /// `None` artifact resolves the handle as a no-op (the consumer skips it). - pub fn publish_artifact(&self, artifact: Option>) { + /// Looks up this bundle's cached aggregate (settlement) receipt, returning a handle that + /// resolves to the deserialized receipt, or `None` on a cache miss. `gateway` is any batch in + /// the bundle, supplying the storage handle the aggregate prover reaches the cache through. + pub fn read_agg_receipt>( + &self, + gateway: &ScheduledBatch, + seq_commit: [u8; 32], + ) -> ReceiptRead { + gateway.read_agg_receipt(AggReceiptCoord { + checkpoint_index: self.checkpoint_index, + from_block: self.from_block.as_bytes(), + seq_commit, + }) + } + + /// Stores this bundle's aggregate (settlement) receipt through the write worker, returning a + /// latch that opens once it commits. `gateway` supplies the storage handle, as for + /// [`read_agg_receipt`](Self::read_agg_receipt). + pub fn write_agg_receipt>( + &self, + gateway: &ScheduledBatch, + seq_commit: [u8; 32], + receipt: P::AggregatorArtifact, + ) -> AtomicAsyncLatch { + gateway.write_agg_receipt( + AggReceiptCoord { + checkpoint_index: self.checkpoint_index, + from_block: self.from_block.as_bytes(), + seq_commit, + }, + receipt, + ) + } + + /// Publishes the bundle's artifact and opens the `artifact_published` latch. A `None` artifact + /// resolves the handle as a no-op (the consumer skips it). + pub fn publish_artifact(&self, artifact: Option) { if let Some(artifact) = artifact { - self.settlement.store(Some(Arc::new(artifact))); + self.artifact.store(Some(Arc::new(artifact))); } self.artifact_published.open(); } - /// Returns the published settlement artifact, or `None` if the bundle resolved as a no-op. + /// Returns the published artifact, or `None` if the bundle resolved as a no-op. /// /// Must be called after [`wait_artifact_published`](Self::wait_artifact_published): the handle /// is visible to consumers before its artifact exists, so an early call returns `None` for an /// unresolved bundle rather than panicking. - pub fn artifact(&self) -> Option>> { - self.settlement.load_full() + pub fn artifact(&self) -> Option> { + self.artifact.load_full() } /// Waits until the bundle's artifact has been published (or resolved as a no-op). diff --git a/zk/aggregate-prover/src/worker.rs b/zk/aggregate-prover/src/worker.rs index 6e7d23cd..b99a7e83 100644 --- a/zk/aggregate-prover/src/worker.rs +++ b/zk/aggregate-prover/src/worker.rs @@ -34,7 +34,7 @@ pub(crate) struct Worker, B: Backend, L: LaneProofSour lane_source: L, /// Queue each formed bundle's [`ScheduledBundle`] handle is published onto for on-chain /// settlement, or `None` to run without settling. - settlement_queue: Option>>, + settlement_queue: Option>>>, /// Batches accumulated but not yet bundled, in scheduling order. queued: VecDeque>, /// Last settled L1 block (the lower bound a new bundle chains from). `None` at genesis. @@ -52,6 +52,7 @@ where S, TransactionArtifact = B::Receipt, BatchArtifact = B::Receipt, + AggregatorArtifact = B::Receipt, BatchMetadata = ChainBlockMetadata, >, { @@ -196,27 +197,48 @@ where ); self.emit(handle.clone()); - // Aggregate the bundle: fetch the final block's lane proof, encode the aggregator inputs - // over the per-batch journals, and prove with the per-batch receipts as composition - // assumptions. - let lane_proof = self - .lane_source - .fetch_lane_proof(LaneProofRequest { block: block_prove_to, lane_key: self.lane_key }) - .await; - let journals: Vec> = receipts.iter().map(|r| B::journal_bytes(r)).collect(); - let inputs = AggregatorInputs::encode( - self.backend.batch_image_id(), - &lane_proof, - journals.iter().map(|j| j.as_slice()), - ); - let receipt = self.backend.prove_aggregator(&inputs, receipts).await; - if self.prover.shutdown.is_open() { - // Shutting down: resolve the published handle as a no-op so a consumer awaiting its - // artifact is released rather than blocked on a latch that never opens, and drop the - // proved bundle (the same discard-on-shutdown behavior as before). - handle.publish_artifact(None); - return true; - } + // The bundle's `from -> to` coordinate (its start checkpoint + block, claimed tip + // commitment) proves to the same settlement receipt, so a replay (including a flip reorg + // back onto this fork) reuses the cached one instead of re-fetching the lane proof and + // re-proving. The bundle keys the receipt off its own start coordinate and the claimed tip + // `seq_commit`; its first batch is the storage gateway (the aggregate prover holds no store + // of its own) and supplies the aggregator image id. + let seq_commit = last_metadata.seq_commit.as_bytes(); + let first_batch = bundle.first().unwrap(); + let receipt = match handle.read_agg_receipt(first_batch, seq_commit).resolve().await { + Some(receipt) => receipt, + None => { + // Aggregate the bundle: fetch the final block's lane proof, encode the aggregator + // inputs over the per-batch journals, and prove with the per-batch receipts as + // composition assumptions. + let lane_proof = self + .lane_source + .fetch_lane_proof(LaneProofRequest { + block: block_prove_to, + lane_key: self.lane_key, + }) + .await; + let journals: Vec> = receipts.iter().map(|r| B::journal_bytes(r)).collect(); + let inputs = AggregatorInputs::encode( + self.backend.batch_image_id(), + &lane_proof, + journals.iter().map(|j| j.as_slice()), + ); + let receipt = self.backend.prove_aggregator(&inputs, receipts).await; + if self.prover.shutdown.is_open() { + // Shutting down: resolve the published handle as a no-op so a consumer awaiting + // its artifact is released rather than blocked on a latch that never opens, and + // drop the proved bundle (the same discard-on-shutdown behavior as before). + handle.publish_artifact(None); + return true; + } + + // Wait for the receipt to be durable before publishing the artifact, so a crash + // never leaves a consumed-but-uncached settlement receipt. + handle.write_agg_receipt(first_batch, seq_commit, receipt.clone()).wait().await; + receipt + } + }; // Parse the settlement journal. let journal = B::journal_bytes(&receipt); @@ -277,7 +299,7 @@ where /// Publishes a formed bundle's handle onto the settlement queue, if one is wired. With no queue /// the prover runs without settling and the handle is dropped. - fn emit(&self, bundle: ScheduledBundle) { + fn emit(&self, bundle: ScheduledBundle>) { if let Some(queue) = &self.settlement_queue { queue.push(bundle); } diff --git a/zk/aggregate-prover/tests/proof_receipt_cache.rs b/zk/aggregate-prover/tests/proof_receipt_cache.rs new file mode 100644 index 00000000..518d0677 --- /dev/null +++ b/zk/aggregate-prover/tests/proof_receipt_cache.rs @@ -0,0 +1,63 @@ +//! Round-trips proof-receipt blobs through the storage read/write workers via the +//! [`ScheduledBatch`] and [`ScheduledBundle`] handles, covering the cache-hit path the dev-mode +//! proving sims don't reach (they only ever miss-then-store, never replay a fork to read a stored +//! receipt back). + +use kaspa_hashes::Hash; +use tempfile::TempDir; +use vprogs_core_types::BatchMetadata; +use vprogs_scheduling_scheduler::{ExecutionConfig, Scheduler}; +use vprogs_scheduling_test_utils::Processor; +use vprogs_storage_manager::StorageConfig; +use vprogs_storage_rocksdb_store::RocksDbStore; +use vprogs_zk_aggregate_prover::{BundleBlocks, ScheduledBundle}; + +#[test] +fn proof_receipt_round_trips_through_storage_workers() { + let temp_dir = TempDir::new().expect("failed to create temp dir"); + let storage: RocksDbStore = RocksDbStore::open(temp_dir.path()); + let mut scheduler = Scheduler::new( + ExecutionConfig::default().with_processor(Processor), + StorageConfig::default().with_store(storage), + ); + + // An empty batch yields a live handle whose storage manager runs both workers; the receipt + // column family is independent of the batch's own (empty) state. The batch derives its own + // per-batch receipt key from its checkpoint coordinate and the processor's image id. + let batch = scheduler.schedule(1, vec![]); + + // A single-batch bundle sharing the batch's start coordinate; the batch is its storage gateway + // for the aggregate receipt. + let block = Hash::from_bytes(batch.checkpoint().metadata().block_hash()); + let bundle: ScheduledBundle<()> = ScheduledBundle::new( + 1, + batch.checkpoint().index(), + BundleBlocks { from_block: block, block_prove_to: block }, + None, + ); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build tokio runtime"); + rt.block_on(async { + let batch_receipt = vec![9u8; 64]; + + // Cache miss before anything is stored. + assert!(batch.read_batch_receipt().resolve().await.is_none()); + + // Store the receipt, wait for the write worker to commit it, then read it back: the + // read worker serves the exact receipt (the flip-reorg acceleration path). + batch.write_batch_receipt(batch_receipt.clone()).wait().await; + assert_eq!(batch.read_batch_receipt().resolve().await, Some(batch_receipt)); + + // The aggregate receipt at the same coordinate keys differently (it is a distinct kind), so + // it misses despite the stored batch receipt, then round-trips on its own key. + let agg_receipt = vec![7u8; 32]; + assert!(bundle.read_agg_receipt(&batch, [0u8; 32]).resolve().await.is_none()); + bundle.write_agg_receipt(&batch, [0u8; 32], agg_receipt.clone()).wait().await; + assert_eq!(bundle.read_agg_receipt(&batch, [0u8; 32]).resolve().await, Some(agg_receipt)); + }); + + scheduler.shutdown(); +} diff --git a/zk/backend/risc0/api/src/backend.rs b/zk/backend/risc0/api/src/backend.rs index c29ef26f..05a59565 100644 --- a/zk/backend/risc0/api/src/backend.rs +++ b/zk/backend/risc0/api/src/backend.rs @@ -132,6 +132,10 @@ impl vprogs_zk_batch_prover::Backend for Backend { fn journal_bytes(receipt: &Receipt) -> Vec { receipt.journal.bytes.clone() } + + fn batch_image_id(&self) -> &[u8; 32] { + &self.batch_processor.id + } } impl vprogs_zk_aggregate_prover::Backend for Backend { @@ -163,7 +167,7 @@ impl vprogs_zk_aggregate_prover::Backend for Backend { })) } - fn batch_image_id(&self) -> &[u8; 32] { - &self.batch_processor.id + fn aggregator_image_id(&self) -> &[u8; 32] { + &self.aggregator.id } } diff --git a/zk/backend/risc0/settler/src/worker.rs b/zk/backend/risc0/settler/src/worker.rs index 4f602525..53ca50f7 100644 --- a/zk/backend/risc0/settler/src/worker.rs +++ b/zk/backend/risc0/settler/src/worker.rs @@ -49,7 +49,7 @@ pub struct SettlementWorkerConfig { /// panicking on a rejected settlement or a confirmation timeout (propagated through its /// `JoinHandle`). pub async fn run( - queue: AsyncQueue>, + queue: AsyncQueue>>, cfg: SettlementWorkerConfig, covenant: CovenantState, shutdown: AtomicAsyncLatch, diff --git a/zk/batch-prover/Cargo.toml b/zk/batch-prover/Cargo.toml index c05ed8d4..87c8da88 100644 --- a/zk/batch-prover/Cargo.toml +++ b/zk/batch-prover/Cargo.toml @@ -13,6 +13,7 @@ vprogs-core-atomics.workspace = true vprogs-core-macros.workspace = true vprogs-l1-types.workspace = true vprogs-scheduling-scheduler.workspace = true +vprogs-state-proof-receipt.workspace = true vprogs-storage-types.workspace = true vprogs-zk-abi = { workspace = true, features = ["host"] } vprogs-zk-transaction-prover.workspace = true diff --git a/zk/batch-prover/src/backend.rs b/zk/batch-prover/src/backend.rs index d12ce85f..c6de6469 100644 --- a/zk/batch-prover/src/backend.rs +++ b/zk/batch-prover/src/backend.rs @@ -13,4 +13,9 @@ pub trait Backend: vprogs_zk_transaction_prover::Backend { /// Extracts journal bytes from a receipt. fn journal_bytes(receipt: &Self::Receipt) -> Vec; + + /// Trusted batch-processor image id: the program identifier that keys a per-batch receipt in + /// the proof-receipt store, and the trusted image the aggregator verifies each composed batch + /// journal against. + fn batch_image_id(&self) -> &[u8; 32]; } diff --git a/zk/batch-prover/src/worker.rs b/zk/batch-prover/src/worker.rs index 8c3e7c4b..9d6b1dca 100644 --- a/zk/batch-prover/src/worker.rs +++ b/zk/batch-prover/src/worker.rs @@ -79,25 +79,39 @@ where return; } - // Collect SMT proof at the version preceding the batch's checkpoint. - let prev_version = batch.checkpoint().index().saturating_sub(1); - let proof_bytes = self.store.prove(&batch.resource_ids(), prev_version).expect("proof"); + // The (checkpoint, block, batch-image) coordinate proves to the same per-batch receipt, so + // a replay (including a flip reorg back onto this fork) reuses the cached one instead of + // re-collecting the SMT proof and re-proving. + let receipt = match batch.read_batch_receipt().resolve().await { + Some(receipt) => receipt, + None => { + // Collect SMT proof at the version preceding the batch's checkpoint. + let prev_version = batch.checkpoint().index().saturating_sub(1); + let proof_bytes = + self.store.prove(&batch.resource_ids(), prev_version).expect("proof"); - // One pass: per-tx journal bytes (inputs) + receipt clones (proof composition). - let (journals, receipts): (Vec<_>, Vec<_>) = - batch.tx_artifacts().map(|a| (B::journal_bytes(&a), (*a).clone())).unzip(); + // One pass: per-tx journal bytes (inputs) + receipt clones (proof composition). + let (journals, receipts): (Vec<_>, Vec<_>) = + batch.tx_artifacts().map(|a| (B::journal_bytes(&a), (*a).clone())).unzip(); - // Encode the inputs for the proof. - let covenant_id = self.config.covenant_id.map(|h| h.as_bytes()).unwrap_or_default(); - let input_bytes = BatchInputs::encode( - (self.backend.image_id(), &covenant_id, &self.config.lane_key), - &proof_bytes, - batch.checkpoint().metadata(), - &journals, - ); + // Encode the inputs for the proof. + let covenant_id = self.config.covenant_id.map(|h| h.as_bytes()).unwrap_or_default(); + let input_bytes = BatchInputs::encode( + (self.backend.image_id(), &covenant_id, &self.config.lane_key), + &proof_bytes, + batch.checkpoint().metadata(), + &journals, + ); - // Compose the batch proof against those tx receipts. - let receipt = self.backend.prove_batch(&input_bytes, receipts).await; + // Compose the batch proof against those tx receipts. + let receipt = self.backend.prove_batch(&input_bytes, receipts).await; + + // Wait for the receipt to be durable before publishing the artifact, so a crash + // never leaves a consumed-but-uncached receipt. + batch.write_batch_receipt(receipt.clone()).wait().await; + receipt + } + }; // Publish the receipt as the batch's artifact. batch.publish_artifact(Some(receipt)); diff --git a/zk/transaction-prover/Cargo.toml b/zk/transaction-prover/Cargo.toml index 3a3ddc13..c6ec57f1 100644 --- a/zk/transaction-prover/Cargo.toml +++ b/zk/transaction-prover/Cargo.toml @@ -4,9 +4,12 @@ name = "vprogs-zk-transaction-prover" version.workspace = true [dependencies] +borsh.workspace = true tap.workspace = true tokio = { workspace = true, features = ["rt"] } vprogs-core-atomics.workspace = true vprogs-core-macros.workspace = true +vprogs-l1-types.workspace = true vprogs-scheduling-scheduler.workspace = true +vprogs-state-proof-receipt.workspace = true vprogs-storage-types.workspace = true diff --git a/zk/transaction-prover/src/backend.rs b/zk/transaction-prover/src/backend.rs index 8542c673..d9a418b5 100644 --- a/zk/transaction-prover/src/backend.rs +++ b/zk/transaction-prover/src/backend.rs @@ -2,8 +2,9 @@ use std::future::Future; /// ZK backend for transaction proving. pub trait Backend: Clone + Send + Sync + 'static { - /// Proof receipt type produced by this backend. - type Receipt: Clone + Send + Sync + 'static; + /// Proof receipt type produced by this backend. The `Borsh` bounds let the proving workers + /// cache it in (and restore it from) the proof-receipt store. + type Receipt: Clone + Send + Sync + borsh::BorshSerialize + borsh::BorshDeserialize + 'static; /// Returns the guest image ID. fn image_id(&self) -> &[u8; 32]; diff --git a/zk/transaction-prover/src/prover.rs b/zk/transaction-prover/src/prover.rs index df4c1164..5fd155c0 100644 --- a/zk/transaction-prover/src/prover.rs +++ b/zk/transaction-prover/src/prover.rs @@ -5,6 +5,7 @@ use std::{ use vprogs_core_atomics::{AsyncQueue, AtomicAsyncLatch}; use vprogs_core_macros::smart_pointer; +use vprogs_l1_types::ChainBlockMetadata; use vprogs_scheduling_scheduler::{Processor, ScheduledTransaction}; use vprogs_storage_types::Store; @@ -23,7 +24,10 @@ pub struct TransactionProver> { impl> TransactionProver { /// Creates a new transaction prover and spawns its worker thread. - pub fn new>(backend: B) -> Self { + pub fn new>(backend: B) -> Self + where + P: Processor, + { let prover = Self(Arc::new(TransactionProverData { inbox: AsyncQueue::new(), shutdown: AtomicAsyncLatch::new(), diff --git a/zk/transaction-prover/src/worker.rs b/zk/transaction-prover/src/worker.rs index 8e407663..d9fce1f9 100644 --- a/zk/transaction-prover/src/worker.rs +++ b/zk/transaction-prover/src/worker.rs @@ -1,12 +1,13 @@ use std::thread::{JoinHandle, spawn}; use tokio::runtime::Builder; +use vprogs_l1_types::ChainBlockMetadata; use vprogs_scheduling_scheduler::Processor; use vprogs_storage_types::Store; use crate::{Backend, TransactionProver}; -/// Background worker that dispatches transaction proofs to a [`Backend`] concurrently. +/// Background worker that proves queued transactions through a [`Backend`], one at a time. pub(crate) struct Worker, B: Backend> { /// Shared prover state (inbox, shutdown). prover: TransactionProver, @@ -14,7 +15,11 @@ pub(crate) struct Worker, B: Backend> { backend: B, } -impl, B: Backend> Worker { +impl Worker +where + S: Store, + P: Processor, +{ /// Spawns the worker on a new thread with a single-threaded tokio runtime and returns its join /// handle. The prover joins this on shutdown so the worker's GPU prover is torn down (its risc0 /// CUDA context released) before the process exits. @@ -29,15 +34,30 @@ impl, B: Backend> Wo // Register notification before draining so we don't miss pushes. let notified = self.prover.inbox.notified(); - // Dispatch all queued transactions for proving. + // Prove each queued transaction in turn. while let Some((tx, tx_inputs)) = self.prover.inbox.pop() { - if tx.batch().upgrade().is_some_and(|b| !b.canceled()) { - let receipt = self.backend.prove_transaction(tx_inputs); - tokio::spawn(async move { tx.publish_artifact(Some(receipt.await)) }); - } else { + // Hold the batch alive for the iteration: the tx's receipt lookups reach storage + // through it, and dropping it would also drop the only path to the cache. + let Some(_batch) = tx.batch().upgrade().filter(|b| !b.canceled()) else { // Canceled or dropped batch: advance the counter without proving. tx.publish_artifact(None); + continue; + }; + + // Coordinate this receipt with the proof-receipt cache: the same (checkpoint, + // block, image, merge index) coordinate proves to the same receipt, so a replay + // (including a flip reorg back onto this fork) reuses it instead of re-proving. + if let Some(receipt) = tx.read_tx_receipt().resolve().await { + tx.publish_artifact(Some(receipt)); + continue; } + + let receipt = self.backend.prove_transaction(tx_inputs).await; + + // Wait for the receipt to be durable before publishing the artifact, so a crash + // never leaves a consumed-but-uncached receipt. + tx.write_tx_receipt(receipt.clone()).wait().await; + tx.publish_artifact(Some(receipt)); } // Wait for a new submission or shutdown. diff --git a/zk/vm/src/backend.rs b/zk/vm/src/backend.rs index 6921f261..87c8dca3 100644 --- a/zk/vm/src/backend.rs +++ b/zk/vm/src/backend.rs @@ -1,5 +1,7 @@ -/// Full ZK backend: synchronous execution plus transaction and batch proving. -pub trait Backend: vprogs_zk_batch_prover::Backend { +/// Full ZK backend: synchronous execution plus transaction, batch, and settlement-aggregation +/// proving. Extending the aggregate backend lets the VM expose every program image id (including +/// the aggregator's) through its [`Processor`](vprogs_scheduling_scheduler::Processor) impl. +pub trait Backend: vprogs_zk_aggregate_prover::Backend { /// Execute a transaction from pre-encoded wire bytes and return the raw result. fn execute_transaction(&self, wire_bytes: &[u8]) -> Vec; } diff --git a/zk/vm/src/proving_pipeline.rs b/zk/vm/src/proving_pipeline.rs index be285cc4..8ee30a60 100644 --- a/zk/vm/src/proving_pipeline.rs +++ b/zk/vm/src/proving_pipeline.rs @@ -22,7 +22,10 @@ pub enum ProvingPipeline> { impl> ProvingPipeline { /// Creates a transaction-only proving pipeline. - pub fn transaction>(backend: B) -> Self { + pub fn transaction>(backend: B) -> Self + where + P: Processor, + { Self::Transaction(TransactionProver::new(backend)) } @@ -59,6 +62,7 @@ impl> ProvingPipeline { S, TransactionArtifact = B::Receipt, BatchArtifact = B::Receipt, + AggregatorArtifact = B::Receipt, BatchMetadata = ChainBlockMetadata, >, { diff --git a/zk/vm/src/vm.rs b/zk/vm/src/vm.rs index a3d2d435..662c1bd4 100644 --- a/zk/vm/src/vm.rs +++ b/zk/vm/src/vm.rs @@ -65,9 +65,22 @@ impl Processor for Vm { self.proving_pipeline.shutdown(); } + fn tx_image_id(&self) -> [u8; 32] { + *self.backend.image_id() + } + + fn batch_image_id(&self) -> [u8; 32] { + *self.backend.batch_image_id() + } + + fn aggregator_image_id(&self) -> [u8; 32] { + *self.backend.aggregator_image_id() + } + type Transaction = L1Transaction; type TransactionArtifact = B::Receipt; type BatchArtifact = B::Receipt; + type AggregatorArtifact = B::Receipt; type BatchMetadata = ChainBlockMetadata; type Error = Error; }