From 6f6f0cce763bbaa5fb527d8bc3a0eaa10ea7abb5 Mon Sep 17 00:00:00 2001 From: 143672 Date: Wed, 17 Jun 2026 12:57:27 +0400 Subject: [PATCH 1/4] Move ScheduledBundle into the scheduler crate alongside batch and tx Relocate ScheduledBundle and BundleBlocks from the aggregate-prover crate to the scheduler crate, so the bundle handle sits next to ScheduledBatch and ScheduledTransaction. The handle is now generic over its artifact type A (opaque, like ScheduledBatch's P::BatchArtifact) rather than the receipt, which keeps the scheduler free of settlement/proof-system structs and avoids a scheduler -> aggregate-prover dependency cycle. SettlementArtifact stays in aggregate-prover; call sites move from ScheduledBundle to ScheduledBundle>. The scheduler gains kaspa-hashes and vprogs-l1-types for the bundle's metadata fields, and the settler crate gains a scheduler dependency. Neutralize the moved bundle docs so the scheduler crate no longer names downstream crates (aggregate prover, settler, proof-receipt store), matching ScheduledBatch's generic voice. --- Cargo.lock | 3 ++ examples/tn10-flow/src/daemon.rs | 6 +-- scheduling/scheduler/Cargo.toml | 2 + scheduling/scheduler/src/lib.rs | 2 + .../scheduler}/src/scheduled_bundle.rs | 54 +++++++++---------- sim/src/driver/l2_driver.rs | 8 +-- zk/aggregate-prover/src/config.rs | 5 +- zk/aggregate-prover/src/lib.rs | 2 - zk/aggregate-prover/src/worker.rs | 9 ++-- zk/backend/risc0/settler/Cargo.toml | 1 + zk/backend/risc0/settler/src/worker.rs | 5 +- 11 files changed, 50 insertions(+), 47 deletions(-) rename {zk/aggregate-prover => scheduling/scheduler}/src/scheduled_bundle.rs (71%) diff --git a/Cargo.lock b/Cargo.lock index a5e9244a..a32fbc27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8714,6 +8714,7 @@ dependencies = [ "borsh", "crossbeam-deque", "crossbeam-queue", + "kaspa-hashes", "num_cpus", "tap", "tempfile", @@ -8725,6 +8726,7 @@ dependencies = [ "vprogs-core-smt", "vprogs-core-test-utils", "vprogs-core-types", + "vprogs-l1-types", "vprogs-scheduling-execution-workers", "vprogs-scheduling-test-utils", "vprogs-state-batch-metadata", @@ -9146,6 +9148,7 @@ dependencies = [ "vprogs-core-atomics", "vprogs-core-smt", "vprogs-l1-wallet", + "vprogs-scheduling-scheduler", "vprogs-zk-aggregate-prover", "vprogs-zk-backend-risc0-api", "vprogs-zk-backend-risc0-covenant", diff --git a/examples/tn10-flow/src/daemon.rs b/examples/tn10-flow/src/daemon.rs index ab16177b..7b3fa9df 100644 --- a/examples/tn10-flow/src/daemon.rs +++ b/examples/tn10-flow/src/daemon.rs @@ -21,10 +21,10 @@ use kaspa_wrpc_client::prelude::KaspaRpcClient; use vprogs_core_atomics::AsyncQueue; use vprogs_l1_bridge::L1BridgeConfig; use vprogs_node_framework::{Node, NodeConfig}; -use vprogs_scheduling_scheduler::ExecutionConfig; +use vprogs_scheduling_scheduler::{ExecutionConfig, ScheduledBundle}; use vprogs_storage_manager::StorageConfig; use vprogs_storage_rocksdb_store::RocksDbStore; -use vprogs_zk_aggregate_prover::{AggregateProverConfig, ScheduledBundle}; +use vprogs_zk_aggregate_prover::{AggregateProverConfig, SettlementArtifact}; use vprogs_zk_backend_risc0_api::{Backend, ProofType, Receipt}; use vprogs_zk_batch_prover::{BatchProverConfig, LaneProofRequest, LaneProofSource}; use vprogs_zk_vm::{ProvingPipeline, Vm}; @@ -37,7 +37,7 @@ pub type V = Vm; pub type FlowNode = Node; /// Queue of bundle handles the aggregate prover publishes to the settlement worker (one per formed /// bundle). -pub type FlowSettlementQueue = AsyncQueue>; +pub type FlowSettlementQueue = AsyncQueue>>; /// Everything the bridge needs to follow our lane on the remote node. pub struct BridgeParams { diff --git a/scheduling/scheduler/Cargo.toml b/scheduling/scheduler/Cargo.toml index 77e34bac..e67c3c7d 100644 --- a/scheduling/scheduler/Cargo.toml +++ b/scheduling/scheduler/Cargo.toml @@ -8,6 +8,7 @@ arc-swap.workspace = true borsh.workspace = true crossbeam-deque.workspace = true crossbeam-queue.workspace = true +kaspa-hashes.workspace = true num_cpus.workspace = true tap.workspace = true thiserror.workspace = true @@ -17,6 +18,7 @@ vprogs-core-hashing.workspace = true vprogs-core-macros.workspace = true vprogs-core-smt.workspace = true vprogs-core-types.workspace = true +vprogs-l1-types.workspace = true vprogs-scheduling-execution-workers.workspace = true vprogs-state-batch-metadata.workspace = true vprogs-state-metadata.workspace = true diff --git a/scheduling/scheduler/src/lib.rs b/scheduling/scheduler/src/lib.rs index e0c2b6dd..d5c7f0df 100644 --- a/scheduling/scheduler/src/lib.rs +++ b/scheduling/scheduler/src/lib.rs @@ -10,6 +10,7 @@ mod resource; mod resource_access; mod rollback; mod scheduled_batch; +mod scheduled_bundle; mod scheduled_transaction; mod scheduler; mod state; @@ -27,6 +28,7 @@ pub use pruning_worker::PruningWorker; pub(crate) use resource::Resource; pub(crate) use resource_access::ResourceAccess; pub use scheduled_batch::{ScheduledBatch, ScheduledBatchRef}; +pub use scheduled_bundle::{BundleBlocks, ScheduledBundle}; pub use scheduled_transaction::ScheduledTransaction; pub(crate) use scheduled_transaction::ScheduledTransactionRef; pub use scheduler::Scheduler; diff --git a/zk/aggregate-prover/src/scheduled_bundle.rs b/scheduling/scheduler/src/scheduled_bundle.rs similarity index 71% rename from zk/aggregate-prover/src/scheduled_bundle.rs rename to scheduling/scheduler/src/scheduled_bundle.rs index 31e2b1ca..185c2314 100644 --- a/zk/aggregate-prover/src/scheduled_bundle.rs +++ b/scheduling/scheduler/src/scheduled_bundle.rs @@ -6,8 +6,6 @@ use vprogs_core_atomics::AtomicAsyncLatch; use vprogs_core_macros::smart_pointer; use vprogs_l1_types::SettlementInfo; -use crate::SettlementArtifact; - /// The L1 block span a bundle proves over. #[derive(Clone, Copy)] pub struct BundleBlocks { @@ -17,12 +15,12 @@ pub struct BundleBlocks { pub block_prove_to: Hash, } -/// A bundle the aggregate prover has formed and published to the settlement queue. +/// A formed bundle of proved batches, published to a settlement consumer before its artifact +/// exists. /// -/// Mirrors [`ScheduledBatch`](vprogs_scheduling_scheduler::ScheduledBatch)'s artifact mechanism: -/// the handle is pushed onto the settlement queue *before* its proof exists, so a consumer can pop -/// it, read the immediate metadata (e.g. reconcile pacing against `batches`), and then await the -/// proved [`SettlementArtifact`]. The aggregate worker fills the artifact via +/// Mirrors [`ScheduledBatch`](crate::ScheduledBatch)'s artifact mechanism: the handle is published +/// *before* its artifact exists, so a consumer can pop it, read the immediate metadata (e.g. +/// reconcile pacing against `batches`), and then await the proved artifact `A`, filled via /// [`publish_artifact`](Self::publish_artifact) once proving completes. /// /// A no-op bundle (all-empty prefix, or one that advanced no state) carries no artifact: it is @@ -30,13 +28,12 @@ pub struct BundleBlocks { /// formed bundle still produces exactly one handle, so a consumer that paces itself against proving /// can account for all submitted batches by summing each handle's `batches`. #[smart_pointer] -pub struct ScheduledBundle { +pub struct ScheduledBundle { /// Number of scheduled batches this bundle consumed (including empty batches in the ready /// prefix). Readable immediately, before the artifact is published. batches: usize, - /// The bundle's first checkpoint index (bundle-start). With `from_block` it is the - /// bundle-start coordinate that keys the aggregator receipt's prefix in the proof-receipt - /// store. Immediate. + /// The bundle's first checkpoint index (bundle-start). With `from_block`, the coordinate that + /// keys the bundle's aggregate receipt. Immediate. checkpoint_index: u64, /// L1 block at the bundle's first checkpoint (the block it proves *from*), pairing with /// `block_prove_to`. Together with `checkpoint_index` it keys the aggregator receipt and keeps @@ -45,19 +42,18 @@ pub struct ScheduledBundle { /// L1 block the bundle proves through (its final block). Immediate, for logging / pacing. block_prove_to: Hash, /// Most-recent covenant settlement visible on chain as of the bundle's final block, or `None` - /// until one lands. Lets the settler skip a bundle an external settlement already covered: the - /// same redundancy the aggregate prover applies when forming bundles. Immediate. + /// until one lands. Immediate. latest_settlement: Option, - /// The proved settlement, filled via [`publish_artifact`](Self::publish_artifact). `None` for - /// a no-op bundle that advanced no state. - settlement: ArcSwapOption>, + /// The proved artifact, filled via [`publish_artifact`](Self::publish_artifact). `None` for a + /// no-op bundle that advanced no state. + artifact: ArcSwapOption, /// Opens when the artifact has been published (or resolved as a no-op). artifact_published: AtomicAsyncLatch, } -impl ScheduledBundle { - /// Creates an unresolved bundle handle: its metadata is readable immediately, the settlement - /// artifact is filled later via [`publish_artifact`](Self::publish_artifact). +impl ScheduledBundle { + /// Creates an unresolved bundle handle: its metadata is readable immediately, the artifact is + /// filled later via [`publish_artifact`](Self::publish_artifact). pub fn new( batches: usize, checkpoint_index: u64, @@ -71,13 +67,13 @@ impl ScheduledBundle { from_block, block_prove_to, latest_settlement, - settlement: ArcSwapOption::empty(), + artifact: ArcSwapOption::empty(), artifact_published: AtomicAsyncLatch::new(), })) } /// Creates an immediately-resolved no-op bundle: it advanced no state, so it carries no - /// settlement and its artifact latch is already open. + /// artifact and its artifact latch is already open. pub fn resolved_noop( batches: usize, checkpoint_index: u64, @@ -93,7 +89,7 @@ impl ScheduledBundle { from_block, block_prove_to, latest_settlement, - settlement: ArcSwapOption::empty(), + artifact: ArcSwapOption::empty(), artifact_published, })) } @@ -124,22 +120,22 @@ impl ScheduledBundle { self.latest_settlement } - /// Publishes the bundle's settlement artifact and opens the `artifact_published` latch. A - /// `None` artifact resolves the handle as a no-op (the consumer skips it). - pub fn publish_artifact(&self, artifact: Option>) { + /// Publishes the bundle's artifact and opens the `artifact_published` latch. A `None` artifact + /// resolves the handle as a no-op (the consumer skips it). + pub fn publish_artifact(&self, artifact: Option) { if let Some(artifact) = artifact { - self.settlement.store(Some(Arc::new(artifact))); + self.artifact.store(Some(Arc::new(artifact))); } self.artifact_published.open(); } - /// Returns the published settlement artifact, or `None` if the bundle resolved as a no-op. + /// Returns the published artifact, or `None` if the bundle resolved as a no-op. /// /// Must be called after [`wait_artifact_published`](Self::wait_artifact_published): the handle /// is visible to consumers before its artifact exists, so an early call returns `None` for an /// unresolved bundle rather than panicking. - pub fn artifact(&self) -> Option>> { - self.settlement.load_full() + pub fn artifact(&self) -> Option> { + self.artifact.load_full() } /// Waits until the bundle's artifact has been published (or resolved as a no-op). diff --git a/sim/src/driver/l2_driver.rs b/sim/src/driver/l2_driver.rs index db0b20c3..19862066 100644 --- a/sim/src/driver/l2_driver.rs +++ b/sim/src/driver/l2_driver.rs @@ -20,9 +20,9 @@ use vprogs_core_test_utils::ResourceIdExt; use vprogs_core_types::{AccessMetadata, ResourceId, SchedulerTransaction}; use vprogs_l1_types::ChainBlockMetadata; use vprogs_l1_wallet::{build, encode_activity_payload}; -use vprogs_scheduling_scheduler::{Processor, Scheduler}; +use vprogs_scheduling_scheduler::{Processor, ScheduledBundle, Scheduler}; use vprogs_storage_rocksdb_store::RocksDbStore; -use vprogs_zk_aggregate_prover::{AggregateProverConfig, ScheduledBundle, SettlementArtifact}; +use vprogs_zk_aggregate_prover::{AggregateProverConfig, SettlementArtifact}; use vprogs_zk_backend_risc0_api::{Backend, ProofType, Receipt}; use vprogs_zk_backend_risc0_covenant::{ Settlement, SettlementDevInput, build_dev_redeem_script, dev_redeem_script_len, @@ -163,7 +163,7 @@ pub struct L2Driver { /// Queue the in-process aggregate prover publishes each bundle handle onto (real_e2e only). /// The driver pops from it to settle proved bundles. `None` in the other modes and before /// the proving stack is built. - settlement_queue: Option>>, + settlement_queue: Option>>>, /// Batches submitted to the aggregate prover but not yet accounted by a bundle outcome /// (real_e2e only). Gates the settlement back-pressure and is reconciled by each outcome's /// `batches`. @@ -198,7 +198,7 @@ fn build_exec( proving: bool, covenant_id: Option, consensus: Weak, -) -> (Exec, Option>>) { +) -> (Exec, Option>>>) { let db = tempfile::tempdir().expect("temp db dir"); let store = RocksDbStore::open(db.path().join("l2")); let (pipeline, settlement_queue) = if proving { diff --git a/zk/aggregate-prover/src/config.rs b/zk/aggregate-prover/src/config.rs index efce37cb..8dff3b2b 100644 --- a/zk/aggregate-prover/src/config.rs +++ b/zk/aggregate-prover/src/config.rs @@ -1,8 +1,9 @@ use kaspa_hashes::Hash; use vprogs_core_atomics::AsyncQueue; +use vprogs_scheduling_scheduler::ScheduledBundle; use vprogs_zk_batch_prover::LaneProofSource; -use crate::ScheduledBundle; +use crate::SettlementArtifact; /// Static configuration for the aggregate prover. /// @@ -22,5 +23,5 @@ pub struct AggregateProverConfig { /// before its proof exists; the consumer awaits the artifact), for a settlement worker to act /// on. `None` runs the prover without settling (e.g. exec/test paths); proved bundles are /// then only logged. - pub settlement_queue: Option>>, + pub settlement_queue: Option>>>, } diff --git a/zk/aggregate-prover/src/lib.rs b/zk/aggregate-prover/src/lib.rs index eda960f9..c6693187 100644 --- a/zk/aggregate-prover/src/lib.rs +++ b/zk/aggregate-prover/src/lib.rs @@ -2,12 +2,10 @@ mod backend; mod command; mod config; mod prover; -mod scheduled_bundle; mod settlement_artifact; mod worker; pub use backend::Backend; pub use config::AggregateProverConfig; pub use prover::AggregateProver; -pub use scheduled_bundle::{BundleBlocks, ScheduledBundle}; pub use settlement_artifact::SettlementArtifact; diff --git a/zk/aggregate-prover/src/worker.rs b/zk/aggregate-prover/src/worker.rs index 6e7d23cd..432205d4 100644 --- a/zk/aggregate-prover/src/worker.rs +++ b/zk/aggregate-prover/src/worker.rs @@ -9,14 +9,13 @@ use vprogs_core_atomics::AsyncQueue; use vprogs_core_codec::Reader; use vprogs_core_smt::EMPTY_HASH; use vprogs_l1_types::{ChainBlockMetadata, SettlementInfo}; -use vprogs_scheduling_scheduler::{Processor, ScheduledBatch}; +use vprogs_scheduling_scheduler::{BundleBlocks, Processor, ScheduledBatch, ScheduledBundle}; use vprogs_storage_types::Store; use vprogs_zk_abi::batch_aggregator::{Inputs as AggregatorInputs, StateTransition}; use vprogs_zk_batch_prover::{LaneProofRequest, LaneProofSource}; use crate::{ - AggregateProver, AggregateProverConfig, Backend, BundleBlocks, ScheduledBundle, - SettlementArtifact, command::Command, + AggregateProver, AggregateProverConfig, Backend, SettlementArtifact, command::Command, }; /// Background worker that accumulates scheduled batches, forms bundles from the consecutively-ready @@ -34,7 +33,7 @@ pub(crate) struct Worker, B: Backend, L: LaneProofSour lane_source: L, /// Queue each formed bundle's [`ScheduledBundle`] handle is published onto for on-chain /// settlement, or `None` to run without settling. - settlement_queue: Option>>, + settlement_queue: Option>>>, /// Batches accumulated but not yet bundled, in scheduling order. queued: VecDeque>, /// Last settled L1 block (the lower bound a new bundle chains from). `None` at genesis. @@ -277,7 +276,7 @@ where /// Publishes a formed bundle's handle onto the settlement queue, if one is wired. With no queue /// the prover runs without settling and the handle is dropped. - fn emit(&self, bundle: ScheduledBundle) { + fn emit(&self, bundle: ScheduledBundle>) { if let Some(queue) = &self.settlement_queue { queue.push(bundle); } diff --git a/zk/backend/risc0/settler/Cargo.toml b/zk/backend/risc0/settler/Cargo.toml index f003471d..03bd0889 100644 --- a/zk/backend/risc0/settler/Cargo.toml +++ b/zk/backend/risc0/settler/Cargo.toml @@ -16,6 +16,7 @@ tokio = { workspace = true, features = ["ma vprogs-core-atomics.workspace = true vprogs-core-smt.workspace = true vprogs-l1-wallet.workspace = true +vprogs-scheduling-scheduler.workspace = true vprogs-zk-aggregate-prover.workspace = true vprogs-zk-backend-risc0-api = { workspace = true, features = ["host"] } vprogs-zk-backend-risc0-covenant.workspace = true diff --git a/zk/backend/risc0/settler/src/worker.rs b/zk/backend/risc0/settler/src/worker.rs index 4f602525..a7a11a78 100644 --- a/zk/backend/risc0/settler/src/worker.rs +++ b/zk/backend/risc0/settler/src/worker.rs @@ -12,7 +12,8 @@ use kaspa_wrpc_client::prelude::KaspaRpcClient; use secp256k1::Keypair; use vprogs_core_atomics::{AsyncQueue, AtomicAsyncLatch}; use vprogs_l1_wallet::Wallet; -use vprogs_zk_aggregate_prover::{ScheduledBundle, SettlementArtifact}; +use vprogs_scheduling_scheduler::ScheduledBundle; +use vprogs_zk_aggregate_prover::SettlementArtifact; use vprogs_zk_backend_risc0_api::{Backend, Receipt}; use crate::covenant::{CovenantState, build_settlement}; @@ -49,7 +50,7 @@ pub struct SettlementWorkerConfig { /// panicking on a rejected settlement or a confirmation timeout (propagated through its /// `JoinHandle`). pub async fn run( - queue: AsyncQueue>, + queue: AsyncQueue>>, cfg: SettlementWorkerConfig, covenant: CovenantState, shutdown: AtomicAsyncLatch, From 87a355f2d0e4a6c8a37900bbc57d140fe45df73c Mon Sep 17 00:00:00 2001 From: 143672 Date: Wed, 17 Jun 2026 14:52:32 +0400 Subject: [PATCH 2/4] Cache proof receipts across the proving pipeline Wire the proof-receipt store into the transaction, batch, and aggregate provers so a replay (including a flip reorg back onto the same fork) reuses a cached receipt instead of re-proving. - Processor gains tx/batch/aggregator image-id accessors and an AggregatorArtifact type; artifacts are bounded Clone + Borsh so the scheduler can cache and restore them. - ScheduledTransaction and ScheduledBatch read/write their per-tx and per-batch receipts through the storage workers; ScheduledBundle owns the aggregate (settlement) receipt, keyed off its start coordinate with a batch as the storage gateway. - Receipts are keyed by (checkpoint, block_hash, image_id, ...) so competing forks stay distinct; the pruning worker drops a checkpoint's receipts across all forks and programs in one prefix scan. - The proving workers prove on a cache miss, wait for the receipt to be durable, then publish the artifact; the transaction worker proves inline so only one proof occupies the GPU at a time. ChainBlockMetadata supplies the block hash; a test-utils-gated BatchMetadata for u64 stands in for tests. Round-trips the tx/batch/agg receipt paths through the storage workers in a new scheduler test. --- Cargo.lock | 11 + core/types/Cargo.toml | 4 + core/types/src/batch_metadata.rs | 23 +- l1/types/Cargo.toml | 1 + l1/types/src/chain_block_metadata.rs | 7 + node/test-utils/src/vm.rs | 14 + node/vm/src/lib.rs | 14 + scheduling/scheduler/Cargo.toml | 3 + scheduling/scheduler/src/lib.rs | 2 +- scheduling/scheduler/src/processor.rs | 23 +- scheduling/scheduler/src/pruning_worker.rs | 10 + scheduling/scheduler/src/scheduled_batch.rs | 67 ++++- scheduling/scheduler/src/scheduled_bundle.rs | 43 +++ .../scheduler/src/scheduled_transaction.rs | 34 ++- scheduling/scheduler/src/storage_cmd.rs | 250 +++++++++++++++++- .../scheduler/tests/proof_receipt_cache.rs | 62 +++++ scheduling/test-utils/Cargo.toml | 2 +- scheduling/test-utils/src/processor.rs | 19 +- .../transaction-effects/Cargo.toml | 3 + .../transaction-effects/src/lib.rs | 3 + zk/aggregate-prover/Cargo.toml | 1 + zk/aggregate-prover/src/backend.rs | 8 +- zk/aggregate-prover/src/prover.rs | 1 + zk/aggregate-prover/src/worker.rs | 64 +++-- zk/backend/risc0/api/src/backend.rs | 8 +- zk/batch-prover/Cargo.toml | 1 + zk/batch-prover/src/backend.rs | 5 + zk/batch-prover/src/worker.rs | 46 ++-- zk/transaction-prover/Cargo.toml | 3 + zk/transaction-prover/src/backend.rs | 5 +- zk/transaction-prover/src/prover.rs | 6 +- zk/transaction-prover/src/worker.rs | 34 ++- zk/vm/src/backend.rs | 6 +- zk/vm/src/proving_pipeline.rs | 6 +- zk/vm/src/vm.rs | 13 + 35 files changed, 728 insertions(+), 74 deletions(-) create mode 100644 scheduling/scheduler/tests/proof_receipt_cache.rs diff --git a/Cargo.lock b/Cargo.lock index a32fbc27..1c36e236 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8577,6 +8577,7 @@ dependencies = [ "kaspa-rpc-core", "kaspa-txscript", "kaspa-wrpc-client", + "vprogs-core-types", "zerocopy", ] @@ -8731,12 +8732,14 @@ dependencies = [ "vprogs-scheduling-test-utils", "vprogs-state-batch-metadata", "vprogs-state-metadata", + "vprogs-state-proof-receipt", "vprogs-state-ptr-latest", "vprogs-state-ptr-rollback", "vprogs-state-version", "vprogs-storage-manager", "vprogs-storage-rocksdb-store", "vprogs-storage-types", + "zerocopy", ] [[package]] @@ -9043,6 +9046,9 @@ dependencies = [ [[package]] name = "vprogs-transaction-runtime-transaction-effects" version = "0.1.0" +dependencies = [ + "borsh", +] [[package]] name = "vprogs-zk-abi" @@ -9082,6 +9088,7 @@ dependencies = [ "vprogs-core-smt", "vprogs-l1-types", "vprogs-scheduling-scheduler", + "vprogs-state-proof-receipt", "vprogs-storage-types", "vprogs-zk-abi", "vprogs-zk-batch-prover", @@ -9203,6 +9210,7 @@ dependencies = [ "vprogs-core-macros", "vprogs-l1-types", "vprogs-scheduling-scheduler", + "vprogs-state-proof-receipt", "vprogs-storage-types", "vprogs-zk-abi", "vprogs-zk-transaction-prover", @@ -9213,11 +9221,14 @@ dependencies = [ name = "vprogs-zk-transaction-prover" version = "0.1.0" dependencies = [ + "borsh", "tap", "tokio", "vprogs-core-atomics", "vprogs-core-macros", + "vprogs-l1-types", "vprogs-scheduling-scheduler", + "vprogs-state-proof-receipt", "vprogs-storage-types", ] diff --git a/core/types/Cargo.toml b/core/types/Cargo.toml index 8cf9e124..010d2ca4 100644 --- a/core/types/Cargo.toml +++ b/core/types/Cargo.toml @@ -8,3 +8,7 @@ borsh = { workspace = true, default-features = false } rkyv = { workspace = true, default-features = false } vprogs-core-codec = { workspace = true } zerocopy = { workspace = true, features = ["derive"] } + +[features] +# Stand-in trait impls for tests only (e.g. `BatchMetadata for u64`); never enabled by the node. +test-utils = [] diff --git a/core/types/src/batch_metadata.rs b/core/types/src/batch_metadata.rs index 34d3194a..68019d80 100644 --- a/core/types/src/batch_metadata.rs +++ b/core/types/src/batch_metadata.rs @@ -4,14 +4,27 @@ use borsh::{BorshDeserialize, BorshSerialize}; /// Opaque metadata attached to each scheduler batch, supporting serialization. /// -/// Implementors derive `BorshSerialize` and `BorshDeserialize`; the trait is implemented -/// automatically via a blanket impl. +/// Implementors derive `BorshSerialize` and `BorshDeserialize` and supply the chain block hash +/// the batch was formed against via [`block_hash`](Self::block_hash), which keys the batch's +/// proof receipts in the proof-receipt store. pub trait BatchMetadata: BorshSerialize + BorshDeserialize + Clone + Debug + Default + Send + Sync + 'static { + /// The chain block hash this batch was formed against, as raw bytes. + /// + /// The proving workers fold it into each receipt's cache key so the same checkpoint index on + /// two competing chains yields distinct keys. + fn block_hash(&self) -> [u8; 32]; } -impl BatchMetadata for T where - T: BorshSerialize + BorshDeserialize + Clone + Debug + Default + Send + Sync + 'static -{ +/// A `u64` batch index doubles as minimal test metadata: its big-endian bytes stand in for a +/// block hash, keeping per-index receipts distinct without a real chain. Gated behind `test-utils` +/// so the node never treats a bare index as batch metadata. +#[cfg(feature = "test-utils")] +impl BatchMetadata for u64 { + fn block_hash(&self) -> [u8; 32] { + let mut bytes = [0u8; 32]; + bytes[..8].copy_from_slice(&self.to_be_bytes()); + bytes + } } diff --git a/l1/types/Cargo.toml b/l1/types/Cargo.toml index 76fcf0b3..f2d7b47f 100644 --- a/l1/types/Cargo.toml +++ b/l1/types/Cargo.toml @@ -10,4 +10,5 @@ kaspa-hashes.workspace = true kaspa-rpc-core.workspace = true kaspa-txscript.workspace = true kaspa-wrpc-client.workspace = true +vprogs-core-types.workspace = true zerocopy.workspace = true diff --git a/l1/types/src/chain_block_metadata.rs b/l1/types/src/chain_block_metadata.rs index cee11002..8a709460 100644 --- a/l1/types/src/chain_block_metadata.rs +++ b/l1/types/src/chain_block_metadata.rs @@ -1,5 +1,6 @@ use borsh::{BorshDeserialize, BorshSerialize}; use kaspa_rpc_core::{RpcHeader, RpcOptionalHeader}; +use vprogs_core_types::BatchMetadata; use crate::{Hash, SettlementInfo}; @@ -39,6 +40,12 @@ pub struct ChainBlockMetadata { pub last_settlement: Option, } +impl BatchMetadata for ChainBlockMetadata { + fn block_hash(&self) -> [u8; 32] { + self.hash.as_bytes() + } +} + impl From<&RpcHeader> for ChainBlockMetadata { /// Builds metadata from a regular RPC header, populating only the header-derived fields and /// leaving lane / parent-derived state at its default. diff --git a/node/test-utils/src/vm.rs b/node/test-utils/src/vm.rs index d52d22a5..b0baeb87 100644 --- a/node/test-utils/src/vm.rs +++ b/node/test-utils/src/vm.rs @@ -22,9 +22,23 @@ impl Processor for TestNodeVm { Ok(()) } + // This test VM does not prove, so its receipt-cache image ids are unset. + fn tx_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + + fn batch_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + + fn aggregator_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + type Transaction = L1Transaction; type TransactionArtifact = (); type BatchArtifact = (); + type AggregatorArtifact = (); type BatchMetadata = ChainBlockMetadata; type Error = (); } diff --git a/node/vm/src/lib.rs b/node/vm/src/lib.rs index 05be3604..29a9ae74 100644 --- a/node/vm/src/lib.rs +++ b/node/vm/src/lib.rs @@ -18,9 +18,23 @@ impl Processor for VM { todo!("transaction execution from SchedulerTransaction") } + // The node VM does not yet drive proving, so its receipt-cache image ids are unset. + fn tx_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + + fn batch_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + + fn aggregator_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + type Transaction = L1Transaction; type TransactionArtifact = TransactionEffects; type BatchArtifact = (); + type AggregatorArtifact = (); type BatchMetadata = ChainBlockMetadata; type Error = VmError; } diff --git a/scheduling/scheduler/Cargo.toml b/scheduling/scheduler/Cargo.toml index e67c3c7d..ac3b6885 100644 --- a/scheduling/scheduler/Cargo.toml +++ b/scheduling/scheduler/Cargo.toml @@ -22,13 +22,16 @@ vprogs-l1-types.workspace = true vprogs-scheduling-execution-workers.workspace = true vprogs-state-batch-metadata.workspace = true vprogs-state-metadata.workspace = true +vprogs-state-proof-receipt.workspace = true vprogs-state-ptr-latest.workspace = true vprogs-state-ptr-rollback.workspace = true vprogs-state-version.workspace = true vprogs-storage-manager.workspace = true vprogs-storage-types.workspace = true +zerocopy.workspace = true [dev-dependencies] +kaspa-hashes.workspace = true tempfile.workspace = true vprogs-core-test-utils.workspace = true vprogs-core-types.workspace = true diff --git a/scheduling/scheduler/src/lib.rs b/scheduling/scheduler/src/lib.rs index d5c7f0df..84b301bb 100644 --- a/scheduling/scheduler/src/lib.rs +++ b/scheduling/scheduler/src/lib.rs @@ -34,5 +34,5 @@ pub(crate) use scheduled_transaction::ScheduledTransactionRef; pub use scheduler::Scheduler; pub use state::SchedulerState; pub use state_diff::{StateDiff, StateDiffRef}; -pub use storage_cmd::{Read, Write}; +pub use storage_cmd::{Read, ReadReceipt, ReceiptRead, ReceiptValue, StoreReceipt, Write}; pub use transaction_context::TransactionContext; diff --git a/scheduling/scheduler/src/processor.rs b/scheduling/scheduler/src/processor.rs index e3ddeb08..d2c35644 100644 --- a/scheduling/scheduler/src/processor.rs +++ b/scheduling/scheduler/src/processor.rs @@ -1,3 +1,4 @@ +use borsh::{BorshDeserialize, BorshSerialize}; use vprogs_core_types::BatchMetadata; use vprogs_storage_types::Store; @@ -24,13 +25,29 @@ pub trait Processor: Clone + Sized + Send + Sync + 'static { // Default implementation does nothing (override if needed). } + /// Program identifier keying a per-transaction receipt in the proof-receipt store. + fn tx_image_id(&self) -> [u8; 32]; + + /// Program identifier keying a per-batch receipt in the proof-receipt store. + fn batch_image_id(&self) -> [u8; 32]; + + /// Program identifier keying an aggregated-bundle (settlement) receipt in the proof-receipt + /// store. + fn aggregator_image_id(&self) -> [u8; 32]; + /// The transaction payload type (e.g. kaspa `L1Transaction`, `usize` in tests). /// The scheduler wraps it in `SchedulerTransaction`. type Transaction: Send + Sync + 'static; /// Artifact produced by a processed transaction ([`ScheduledTransaction::publish_artifact`]). - type TransactionArtifact: Send + Sync + 'static; - /// Artifact produced by a processed batch ([`ScheduledBatch::publish_artifact`]). - type BatchArtifact: Send + Sync + 'static; + /// The `Borsh` bounds let the scheduler cache it in (and restore it from) the proof-receipt + /// store; `Clone` lets a served lookup hand the cached value back out of its shared slot. + type TransactionArtifact: Clone + BorshSerialize + BorshDeserialize + Send + Sync + 'static; + /// Artifact produced by a processed batch ([`ScheduledBatch::publish_artifact`]). Bounded for + /// the same proof-receipt caching as [`TransactionArtifact`](Self::TransactionArtifact). + type BatchArtifact: Clone + BorshSerialize + BorshDeserialize + Send + Sync + 'static; + /// Receipt produced by aggregating a bundle of per-batch receipts into one settlement receipt. + /// Bounded for the same proof-receipt caching as the other artifacts. + type AggregatorArtifact: Clone + BorshSerialize + BorshDeserialize + Send + Sync + 'static; /// Opaque metadata attached to each batch for persistence. type BatchMetadata: BatchMetadata; /// Error type returned when transaction processing fails. diff --git a/scheduling/scheduler/src/pruning_worker.rs b/scheduling/scheduler/src/pruning_worker.rs index eab77090..0910c618 100644 --- a/scheduling/scheduler/src/pruning_worker.rs +++ b/scheduling/scheduler/src/pruning_worker.rs @@ -12,6 +12,7 @@ use tokio::{runtime::Builder, sync::Notify}; use vprogs_core_types::{Checkpoint, ResourceId}; use vprogs_state_batch_metadata::BatchMetadata as StoredBatchMetadata; use vprogs_state_metadata::StateMetadata; +use vprogs_state_proof_receipt::{Prefix, invalidate_checkpoint}; use vprogs_state_ptr_rollback::StatePtrRollback; use vprogs_state_version::StateVersion; use vprogs_storage_types::Store; @@ -225,6 +226,15 @@ impl> PruningWorker { // Delete batch metadata entries for this batch. StoredBatchMetadata::delete(wb, index); + // Drop every cached proof receipt at this checkpoint index, across all programs + // and all competing forks. A pruned batch can never be rolled back to, so no + // future replay (even a flip reorg) needs its receipts again. + invalidate_checkpoint( + store.as_ref(), + wb, + &Prefix { checkpoint_index: index.into() }, + ); + // Prune stale SMT nodes for this version. store.prune(wb, index); } diff --git a/scheduling/scheduler/src/scheduled_batch.rs b/scheduling/scheduler/src/scheduled_batch.rs index 66020dc1..e391857e 100644 --- a/scheduling/scheduler/src/scheduled_batch.rs +++ b/scheduling/scheduler/src/scheduled_batch.rs @@ -8,15 +8,17 @@ use crossbeam_deque::{Injector, Steal, Worker}; use vprogs_core_atomics::AtomicAsyncLatch; use vprogs_core_macros::smart_pointer; use vprogs_core_smt::Commitment; -use vprogs_core_types::{Checkpoint, ResourceId, SchedulerTransaction}; +use vprogs_core_types::{BatchMetadata, Checkpoint, ResourceId, SchedulerTransaction}; use vprogs_scheduling_execution_workers::Batch; use vprogs_state_batch_metadata::BatchMetadata as StoredBatchMetadata; use vprogs_state_metadata::StateMetadata; +use vprogs_state_proof_receipt::{BatchKey, Prefix}; use vprogs_storage_types::Store; use crate::{ - CancellationContext, ScheduledTransaction, Scheduler, StateDiff, Write, cpu_task::ManagerTask, - processor::Processor, state::SchedulerState, + CancellationContext, Read, ReadReceipt, ReceiptRead, ScheduledTransaction, Scheduler, + StateDiff, StoreReceipt, Write, cpu_task::ManagerTask, processor::Processor, + state::SchedulerState, storage_cmd::ReceiptLookup, }; /// A batch of transactions progressing through the scheduler's lifecycle. @@ -30,6 +32,8 @@ use crate::{ pub struct ScheduledBatch> { /// Cancellation context captured at creation time for rollback detection. cancellation: CancellationContext, + /// Processor handle for deriving the program image ids that key this batch's proof receipts. + processor: P, /// Shared scheduler state for storage access and eviction. state: SchedulerState, /// This batch's sequential index and metadata. @@ -225,6 +229,62 @@ impl> ScheduledBatch { self } + /// Looks up this batch's cached per-batch receipt, returning a handle that resolves to the + /// deserialized receipt, or `None` on a cache miss. Served by the read worker so the caller + /// never blocks its async runtime on a store read. + pub fn read_batch_receipt(&self) -> ReceiptRead { + self.submit_read_receipt(self.batch_key()) + } + + /// Stores this batch's per-batch receipt through the write worker, returning a latch that opens + /// once it commits. Independent of the batch's own persistence latches. + pub fn write_batch_receipt(&self, receipt: P::BatchArtifact) -> AtomicAsyncLatch { + self.submit_store_receipt(self.batch_key(), receipt) + } + + /// The per-batch receipt key at this batch's coordinate. + fn batch_key(&self) -> BatchKey { + BatchKey { + prefix: Prefix { checkpoint_index: self.checkpoint.index().into() }, + block_hash: self.checkpoint.metadata().block_hash(), + image_id: self.processor.batch_image_id(), + } + } + + /// Aggregator program image id, keying a bundle's aggregate receipt (the bundle derives the + /// rest of the key from its own start coordinate). + pub(crate) fn aggregator_image_id(&self) -> [u8; 32] { + self.processor.aggregator_image_id() + } + + /// Submits a proof-receipt lookup for the typed `key` to the read worker, returning the typed + /// [`ReceiptRead`] handle the caller awaits. The key type determines the stored value's kind + /// and how the served value projects back to the concrete receipt. + pub(crate) fn submit_read_receipt>( + &self, + key: K, + ) -> ReceiptRead { + let (cmd, handle) = ReadReceipt::new(key.into_key(), K::extract); + self.state.storage().submit_read(Read::ReadReceipt(cmd)); + handle + } + + /// Submits `receipt` under the typed `key` to the write worker, returning a latch that opens + /// once it commits. The key type pins the receipt to its matching stored variant. + pub(crate) fn submit_store_receipt>( + &self, + key: K, + receipt: K::Artifact, + ) -> AtomicAsyncLatch { + let committed = AtomicAsyncLatch::new(); + self.state.storage().submit_write(Write::StoreReceipt(StoreReceipt::new( + key.into_key(), + K::wrap(receipt), + committed.clone(), + ))); + committed + } + /// Submits this batch for commit on the write worker. No-op if canceled. pub fn schedule_commit(&self) { if !self.canceled() { @@ -257,6 +317,7 @@ impl> ScheduledBatch { ScheduledBatchData { cancellation: scheduler.cancellation().clone(), + processor: scheduler.processor().clone(), state: scheduler.state().clone(), checkpoint, pending_txs: AtomicU64::new(txs.len() as u64), diff --git a/scheduling/scheduler/src/scheduled_bundle.rs b/scheduling/scheduler/src/scheduled_bundle.rs index 185c2314..3dd49949 100644 --- a/scheduling/scheduler/src/scheduled_bundle.rs +++ b/scheduling/scheduler/src/scheduled_bundle.rs @@ -5,6 +5,10 @@ use kaspa_hashes::Hash; use vprogs_core_atomics::AtomicAsyncLatch; use vprogs_core_macros::smart_pointer; use vprogs_l1_types::SettlementInfo; +use vprogs_state_proof_receipt::{AggregatorKey, Prefix}; +use vprogs_storage_types::Store; + +use crate::{ReceiptRead, ScheduledBatch, processor::Processor}; /// The L1 block span a bundle proves over. #[derive(Clone, Copy)] @@ -120,6 +124,45 @@ impl ScheduledBundle { self.latest_settlement } + /// Looks up this bundle's cached aggregate (settlement) receipt, returning a handle that + /// resolves to the deserialized receipt, or `None` on a cache miss. `gateway` is any batch in + /// the bundle: the aggregate prover holds no store of its own, so it reaches the receipt cache + /// through a batch's storage handle. + pub fn read_agg_receipt>( + &self, + gateway: &ScheduledBatch, + seq_commit: [u8; 32], + ) -> ReceiptRead { + gateway.submit_read_receipt(self.agg_key(gateway, seq_commit)) + } + + /// Stores this bundle's aggregate (settlement) receipt through the write worker, returning a + /// latch that opens once it commits. `gateway` supplies the storage handle, as for + /// [`read_agg_receipt`](Self::read_agg_receipt). + pub fn write_agg_receipt>( + &self, + gateway: &ScheduledBatch, + seq_commit: [u8; 32], + receipt: P::AggregatorArtifact, + ) -> AtomicAsyncLatch { + gateway.submit_store_receipt(self.agg_key(gateway, seq_commit), receipt) + } + + /// The aggregate receipt key at this bundle's start coordinate (first checkpoint index + block) + /// and claimed tip `seq_commit`; `gateway` supplies the aggregator image id. + fn agg_key>( + &self, + gateway: &ScheduledBatch, + seq_commit: [u8; 32], + ) -> AggregatorKey { + AggregatorKey { + prefix: Prefix { checkpoint_index: self.checkpoint_index.into() }, + block_hash: self.from_block.as_bytes(), + image_id: gateway.aggregator_image_id(), + seq_commit, + } + } + /// Publishes the bundle's artifact and opens the `artifact_published` latch. A `None` artifact /// resolves the handle as a no-op (the consumer skips it). pub fn publish_artifact(&self, artifact: Option) { diff --git a/scheduling/scheduler/src/scheduled_transaction.rs b/scheduling/scheduler/src/scheduled_transaction.rs index 3ca8e93e..114634b2 100644 --- a/scheduling/scheduler/src/scheduled_transaction.rs +++ b/scheduling/scheduler/src/scheduled_transaction.rs @@ -4,13 +4,15 @@ use std::sync::{ }; use arc_swap::ArcSwapOption; +use vprogs_core_atomics::AtomicAsyncLatch; use vprogs_core_macros::smart_pointer; -use vprogs_core_types::SchedulerTransaction; +use vprogs_core_types::{BatchMetadata, SchedulerTransaction}; +use vprogs_state_proof_receipt::{Prefix, TxKey}; use vprogs_storage_types::Store; use crate::{ - AccessHandle, ResourceAccess, ScheduledBatchRef, Scheduler, StateDiff, TransactionContext, - processor::Processor, + AccessHandle, ReceiptRead, ResourceAccess, ScheduledBatch, ScheduledBatchRef, Scheduler, + StateDiff, TransactionContext, processor::Processor, }; /// A transaction progressing through the scheduler's execution pipeline. @@ -54,6 +56,32 @@ impl> ScheduledTransaction { self.artifact.load_full().expect("artifact not ready") } + /// Looks up this transaction's cached receipt, returning a handle that resolves to the + /// deserialized receipt, or `None` on a cache miss. Served by the read worker through the + /// owning batch's storage handle. + pub fn read_tx_receipt(&self) -> ReceiptRead { + let batch = self.batch.upgrade().expect("owning batch dropped"); + batch.submit_read_receipt(self.tx_key(&batch)) + } + + /// Stores this transaction's receipt through the owning batch's write worker, returning a latch + /// that opens once it commits. + pub fn write_tx_receipt(&self, receipt: P::TransactionArtifact) -> AtomicAsyncLatch { + let batch = self.batch.upgrade().expect("owning batch dropped"); + batch.submit_store_receipt(self.tx_key(&batch), receipt) + } + + /// The per-transaction receipt key at the owning batch's coordinate and this transaction's + /// merge index. + fn tx_key(&self, batch: &ScheduledBatch) -> TxKey { + TxKey { + prefix: Prefix { checkpoint_index: batch.checkpoint().index().into() }, + block_hash: batch.checkpoint().metadata().block_hash(), + image_id: self.processor.tx_image_id(), + merge_idx: self.merge_idx.into(), + } + } + /// Publishes this transaction's artifact. `None` skips without storing an artifact. pub fn publish_artifact(&self, artifact: Option) { if let Some(artifact) = artifact { diff --git a/scheduling/scheduler/src/storage_cmd.rs b/scheduling/scheduler/src/storage_cmd.rs index bb883403..3eaf2d4e 100644 --- a/scheduling/scheduler/src/storage_cmd.rs +++ b/scheduling/scheduler/src/storage_cmd.rs @@ -1,18 +1,262 @@ +use std::sync::Arc; + +use arc_swap::ArcSwapOption; +use vprogs_core_atomics::AtomicAsyncLatch; +use vprogs_state_proof_receipt::{AggregatorKey, BatchKey, ReceiptKey, TxKey}; use vprogs_storage_manager::{ReadCmd, WriteCmd}; -use vprogs_storage_types::{ReadStore, Store}; +use vprogs_storage_types::{ReadStore, StateSpace, Store, WriteBatch}; +use zerocopy::IntoBytes; use crate::{ResourceAccess, ScheduledBatch, StateDiff, processor::Processor, rollback::Rollback}; +/// A typed proof-receipt key: the concrete coordinate for one receipt kind. +/// +/// Each variant carries its actual key struct, so the variant alone pins both the stored key bytes +/// ([`as_bytes`](Self::as_bytes)) and the [`ReceiptValue`] variant a stored blob deserializes into. +/// The storage workers carry a `Key` instead of raw bytes plus a separate kind tag, so the two can +/// never drift apart. +pub(crate) enum Key { + /// A per-transaction receipt key. + Tx(TxKey), + /// A per-batch receipt key. + Batch(BatchKey), + /// An aggregated-bundle (settlement) receipt key. + Agg(AggregatorKey), +} + +impl Key { + /// The stored key bytes, starting with the shared `Prefix`. + fn as_bytes(&self) -> &[u8] { + match self { + Key::Tx(key) => key.as_bytes(), + Key::Batch(key) => key.as_bytes(), + Key::Agg(key) => key.as_bytes(), + } + } +} + +/// A typed proof-receipt value, tagged by the kind of receipt it is. +/// +/// The variant pins the value to the matching [`Processor`] artifact type, so a caller cannot store +/// a transaction receipt under a batch key (or vice versa). The storage workers (de)serialize +/// whichever variant they hold; every receipt kind shares the [`StateSpace::ProofReceipt`] column +/// family. +pub enum ReceiptValue> { + /// A per-transaction receipt. + Tx(P::TransactionArtifact), + /// A per-batch receipt. + Batch(P::BatchArtifact), + /// An aggregated-bundle (settlement) receipt. + Agg(P::AggregatorArtifact), +} + +impl> ReceiptValue { + /// Serializes the held receipt to its stored byte form. + fn to_bytes(&self) -> Vec { + match self { + ReceiptValue::Tx(receipt) => borsh::to_vec(receipt), + ReceiptValue::Batch(receipt) => borsh::to_vec(receipt), + ReceiptValue::Agg(receipt) => borsh::to_vec(receipt), + } + .expect("failed to serialize receipt") + } + + /// Deserializes stored bytes into the variant named by `key`. + fn from_bytes(key: &Key, bytes: &[u8]) -> Self { + const CORRUPT: &str = "corrupted proof-receipt store"; + match key { + Key::Tx(_) => ReceiptValue::Tx(borsh::from_slice(bytes).expect(CORRUPT)), + Key::Batch(_) => ReceiptValue::Batch(borsh::from_slice(bytes).expect(CORRUPT)), + Key::Agg(_) => ReceiptValue::Agg(borsh::from_slice(bytes).expect(CORRUPT)), + } + } + + /// Moves out the per-transaction receipt. Panics if the value is not a [`Tx`](Self::Tx); the + /// issuer's lookup kind guarantees the variant. + pub(crate) fn into_tx(self) -> P::TransactionArtifact { + match self { + ReceiptValue::Tx(receipt) => receipt, + _ => unreachable!("receipt kind mismatch"), + } + } + + /// Moves out the per-batch receipt. Panics if the value is not a [`Batch`](Self::Batch). + pub(crate) fn into_batch(self) -> P::BatchArtifact { + match self { + ReceiptValue::Batch(receipt) => receipt, + _ => unreachable!("receipt kind mismatch"), + } + } + + /// Moves out the aggregated-bundle receipt. Panics if the value is not an [`Agg`](Self::Agg). + pub(crate) fn into_agg(self) -> P::AggregatorArtifact { + match self { + ReceiptValue::Agg(receipt) => receipt, + _ => unreachable!("receipt kind mismatch"), + } + } +} + +/// Ties a proof-receipt key type to the receipt it stores: a single typed key erases into the +/// storage workers' [`Key`] ([`into_key`](Self::into_key)) and drives the handle's projection back +/// to the artifact ([`extract`](Self::extract)) and the write worker's serialization +/// ([`wrap`](Self::wrap)). One impl per key type is the single source of truth pairing a key with +/// its [`ReceiptValue`] variant. +pub(crate) trait ReceiptLookup>: ReceiptKey { + /// The concrete receipt artifact this key stores. + type Artifact; + /// Erases this key into the type-tagged [`Key`] the storage workers carry. + fn into_key(self) -> Key; + /// Projects a served [`ReceiptValue`] to the artifact. + fn extract(value: ReceiptValue) -> Self::Artifact; + /// Wraps an artifact into its [`ReceiptValue`] variant for storage. + fn wrap(artifact: Self::Artifact) -> ReceiptValue; +} + +impl> ReceiptLookup for TxKey { + type Artifact = P::TransactionArtifact; + fn into_key(self) -> Key { + Key::Tx(self) + } + fn extract(value: ReceiptValue) -> Self::Artifact { + value.into_tx() + } + fn wrap(artifact: Self::Artifact) -> ReceiptValue { + ReceiptValue::Tx(artifact) + } +} + +impl> ReceiptLookup for BatchKey { + type Artifact = P::BatchArtifact; + fn into_key(self) -> Key { + Key::Batch(self) + } + fn extract(value: ReceiptValue) -> Self::Artifact { + value.into_batch() + } + fn wrap(artifact: Self::Artifact) -> ReceiptValue { + ReceiptValue::Batch(artifact) + } +} + +impl> ReceiptLookup for AggregatorKey { + type Artifact = P::AggregatorArtifact; + fn into_key(self) -> Key { + Key::Agg(self) + } + fn extract(value: ReceiptValue) -> Self::Artifact { + value.into_agg() + } + fn wrap(artifact: Self::Artifact) -> ReceiptValue { + ReceiptValue::Agg(artifact) + } +} + +/// A proof-receipt lookup served by the read worker, off the issuer's async runtime. +/// +/// The read worker deserializes the stored blob into `slot` as the [`ReceiptValue`] named by `key` +/// (left empty on a cache miss) and opens `ready`, so the proving worker that enqueued it awaits +/// the result via [`ReceiptRead`] instead of blocking its single-threaded runtime on a synchronous +/// store read. +pub struct ReadReceipt> { + /// The typed key identifying the receipt to look up. + key: Key, + /// Filled with the served [`ReceiptValue`], or left empty on a cache miss. + slot: Arc>>, + /// Opens once the lookup has been served. + ready: AtomicAsyncLatch, +} + +/// Handle to an in-flight [`ReadReceipt`]: await [`resolve`](Self::resolve) for the typed receipt +/// `R`, or `None` on a cache miss. `extract` projects the served [`ReceiptValue`] to the issuer's +/// concrete artifact type. +pub struct ReceiptRead, R> { + /// Shared slot the read worker fills with the served [`ReceiptValue`]. + slot: Arc>>, + /// Opens once the lookup has been served. + ready: AtomicAsyncLatch, + /// Projects the served [`ReceiptValue`] to the issuer's concrete artifact type. + extract: fn(ReceiptValue) -> R, +} + +impl> ReadReceipt { + /// Creates a lookup command for the typed `key` together with the typed [`ReceiptRead`] handle + /// its issuer awaits, projecting the served value through `extract`. + pub(crate) fn new( + key: Key, + extract: fn(ReceiptValue) -> R, + ) -> (Self, ReceiptRead) { + let slot = Arc::new(ArcSwapOption::empty()); + let ready = AtomicAsyncLatch::new(); + let handle = ReceiptRead { slot: slot.clone(), ready: ready.clone(), extract }; + (Self { key, slot, ready }, handle) + } + + /// Serves the lookup: fills `slot` from the store (a miss leaves it empty), then opens `ready`. + fn read(&self, store: &RS) { + if let Some(bytes) = store.get(StateSpace::ProofReceipt, self.key.as_bytes()) { + self.slot.store(Some(Arc::new(ReceiptValue::from_bytes(&self.key, &bytes)))); + } + self.ready.open(); + } +} + +impl, R> ReceiptRead { + /// Resolves once the read worker has served the lookup: the typed receipt, or `None` on a cache + /// miss. + pub async fn resolve(self) -> Option { + self.ready.wait().await; + let value = self.slot.swap(None)?; + let value = Arc::into_inner(value).expect("receipt slot uniquely held"); + Some((self.extract)(value)) + } +} + +/// A typed proof-receipt awaiting durable storage in the [`StateSpace::ProofReceipt`] column +/// family. +/// +/// Carries the key bytes, the typed [`ReceiptValue`] (serialized by the write worker), and a latch +/// the worker opens once the write commits, so the proving worker that enqueued it can wait for +/// durability before publishing the matching artifact. +pub struct StoreReceipt> { + /// The typed key the receipt is stored under. + key: Key, + /// The typed receipt, serialized by the write worker. + value: ReceiptValue, + /// Opens once the write commits. + committed: AtomicAsyncLatch, +} + +impl> StoreReceipt { + /// Pairs the typed `key` and `value` with the latch opened once the write commits. + pub(crate) fn new(key: Key, value: ReceiptValue, committed: AtomicAsyncLatch) -> Self { + Self { key, value, committed } + } + + /// Writes the serialized receipt into `wb` under its key. + fn write(&self, wb: &mut W) { + wb.put(StateSpace::ProofReceipt, self.key.as_bytes(), &self.value.to_bytes()); + } + + /// Opens the `committed` latch once the write has landed. + fn done(self) { + self.committed.open(); + } +} + /// Commands dispatched to the storage manager's read worker. pub enum Read> { /// Fetch the latest version data for a resource from disk. LatestData(ResourceAccess), + /// Look up a cached typed proof-receipt by key. + ReadReceipt(ReadReceipt), } impl> ReadCmd for Read { fn exec(&self, store: &RS) { match self { Read::LatestData(resource_access) => resource_access.read_latest_data(store), + Read::ReadReceipt(receipt) => receipt.read(store), } } } @@ -25,6 +269,8 @@ pub enum Write> { CommitBatch(ScheduledBatch), /// Revert all batches after a target checkpoint. Rollback(Rollback), + /// Persist a typed proof-receipt in the proof-receipt column family. + StoreReceipt(StoreReceipt), } impl> WriteCmd for Write { @@ -33,6 +279,7 @@ impl> WriteCmd for Write { Write::StateDiff(state_diff) => state_diff.write(&mut wb), Write::CommitBatch(batch) => batch.commit(store, &mut wb), Write::Rollback(rollback) => return rollback.execute(store, wb), + Write::StoreReceipt(receipt) => receipt.write(&mut wb), } wb } @@ -42,6 +289,7 @@ impl> WriteCmd for Write { Write::StateDiff(state_diff) => state_diff.write_done(), Write::CommitBatch(batch) => batch.commit_done(), Write::Rollback(rollback) => rollback.done(), + Write::StoreReceipt(receipt) => receipt.done(), } } } diff --git a/scheduling/scheduler/tests/proof_receipt_cache.rs b/scheduling/scheduler/tests/proof_receipt_cache.rs new file mode 100644 index 00000000..7f18deaa --- /dev/null +++ b/scheduling/scheduler/tests/proof_receipt_cache.rs @@ -0,0 +1,62 @@ +//! Round-trips proof-receipt blobs through the storage read/write workers via the +//! [`ScheduledBatch`] and [`ScheduledBundle`] handles, covering the cache-hit path the dev-mode +//! proving sims don't reach (they only ever miss-then-store, never replay a fork to read a stored +//! receipt back). + +use kaspa_hashes::Hash; +use tempfile::TempDir; +use vprogs_core_types::BatchMetadata; +use vprogs_scheduling_scheduler::{BundleBlocks, ExecutionConfig, ScheduledBundle, Scheduler}; +use vprogs_scheduling_test_utils::Processor; +use vprogs_storage_manager::StorageConfig; +use vprogs_storage_rocksdb_store::RocksDbStore; + +#[test] +fn proof_receipt_round_trips_through_storage_workers() { + let temp_dir = TempDir::new().expect("failed to create temp dir"); + let storage: RocksDbStore = RocksDbStore::open(temp_dir.path()); + let mut scheduler = Scheduler::new( + ExecutionConfig::default().with_processor(Processor), + StorageConfig::default().with_store(storage), + ); + + // An empty batch yields a live handle whose storage manager runs both workers; the receipt + // column family is independent of the batch's own (empty) state. The batch derives its own + // per-batch receipt key from its checkpoint coordinate and the processor's image id. + let batch = scheduler.schedule(1, vec![]); + + // A single-batch bundle sharing the batch's start coordinate; the batch is its storage gateway + // for the aggregate receipt. + let block = Hash::from_bytes(batch.checkpoint().metadata().block_hash()); + let bundle: ScheduledBundle<()> = ScheduledBundle::new( + 1, + batch.checkpoint().index(), + BundleBlocks { from_block: block, block_prove_to: block }, + None, + ); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build tokio runtime"); + rt.block_on(async { + let batch_receipt = vec![9u8; 64]; + + // Cache miss before anything is stored. + assert!(batch.read_batch_receipt().resolve().await.is_none()); + + // Store the receipt, wait for the write worker to commit it, then read it back: the + // read worker serves the exact receipt (the flip-reorg acceleration path). + batch.write_batch_receipt(batch_receipt.clone()).wait().await; + assert_eq!(batch.read_batch_receipt().resolve().await, Some(batch_receipt)); + + // The aggregate receipt at the same coordinate keys differently (it is a distinct kind), so + // it misses despite the stored batch receipt, then round-trips on its own key. + let agg_receipt = vec![7u8; 32]; + assert!(bundle.read_agg_receipt(&batch, [0u8; 32]).resolve().await.is_none()); + bundle.write_agg_receipt(&batch, [0u8; 32], agg_receipt.clone()).wait().await; + assert_eq!(bundle.read_agg_receipt(&batch, [0u8; 32]).resolve().await, Some(agg_receipt)); + }); + + scheduler.shutdown(); +} diff --git a/scheduling/test-utils/Cargo.toml b/scheduling/test-utils/Cargo.toml index 44fe2235..791d2e8c 100644 --- a/scheduling/test-utils/Cargo.toml +++ b/scheduling/test-utils/Cargo.toml @@ -4,7 +4,7 @@ name = "vprogs-scheduling-test-utils" version.workspace = true [dependencies] -vprogs-core-types.workspace = true +vprogs-core-types = { workspace = true, features = ["test-utils"] } vprogs-scheduling-scheduler.workspace = true vprogs-state-version.workspace = true vprogs-storage-rocksdb-store.workspace = true diff --git a/scheduling/test-utils/src/processor.rs b/scheduling/test-utils/src/processor.rs index f0848a01..896dc189 100644 --- a/scheduling/test-utils/src/processor.rs +++ b/scheduling/test-utils/src/processor.rs @@ -20,9 +20,24 @@ impl vprogs_scheduling_scheduler::Processor for Processor { Ok(()) } + // This processor never proves, so its image ids only need to be stable for receipt-cache + // round-trips, not match any real program. + fn tx_image_id(&self) -> [u8; 32] { + [0u8; 32] + } + + fn batch_image_id(&self) -> [u8; 32] { + [1u8; 32] + } + + fn aggregator_image_id(&self) -> [u8; 32] { + [2u8; 32] + } + type Transaction = usize; - type TransactionArtifact = (); - type BatchArtifact = (); + type TransactionArtifact = Vec; + type BatchArtifact = Vec; + type AggregatorArtifact = Vec; type BatchMetadata = u64; type Error = (); } diff --git a/transaction-runtime/transaction-effects/Cargo.toml b/transaction-runtime/transaction-effects/Cargo.toml index a905f9a0..386d7923 100644 --- a/transaction-runtime/transaction-effects/Cargo.toml +++ b/transaction-runtime/transaction-effects/Cargo.toml @@ -2,3 +2,6 @@ edition.workspace = true name = "vprogs-transaction-runtime-transaction-effects" version.workspace = true + +[dependencies] +borsh.workspace = true diff --git a/transaction-runtime/transaction-effects/src/lib.rs b/transaction-runtime/transaction-effects/src/lib.rs index fc7acb68..a6c91728 100644 --- a/transaction-runtime/transaction-effects/src/lib.rs +++ b/transaction-runtime/transaction-effects/src/lib.rs @@ -1 +1,4 @@ +use borsh::{BorshDeserialize, BorshSerialize}; + +#[derive(Clone, Debug, Default, BorshSerialize, BorshDeserialize)] pub struct TransactionEffects {} diff --git a/zk/aggregate-prover/Cargo.toml b/zk/aggregate-prover/Cargo.toml index c37846ba..8a985315 100644 --- a/zk/aggregate-prover/Cargo.toml +++ b/zk/aggregate-prover/Cargo.toml @@ -15,6 +15,7 @@ vprogs-core-macros.workspace = true vprogs-core-smt.workspace = true vprogs-l1-types.workspace = true vprogs-scheduling-scheduler.workspace = true +vprogs-state-proof-receipt.workspace = true vprogs-storage-types.workspace = true vprogs-zk-abi = { workspace = true, features = ["host"] } vprogs-zk-batch-prover.workspace = true diff --git a/zk/aggregate-prover/src/backend.rs b/zk/aggregate-prover/src/backend.rs index 1a0b720a..02cd84d5 100644 --- a/zk/aggregate-prover/src/backend.rs +++ b/zk/aggregate-prover/src/backend.rs @@ -13,7 +13,9 @@ pub trait Backend: vprogs_zk_batch_prover::Backend { batch_receipts: Vec, ) -> impl Future + Send + 'static; - /// Trusted per-batch (batch-processor) image id, written into the aggregator inputs so the - /// guest can verify each composed batch journal. - fn batch_image_id(&self) -> &[u8; 32]; + /// Aggregator image id: the program identifier that keys a settlement (bundle) receipt in the + /// proof-receipt store. The trusted batch image the aggregator verifies its composed batch + /// journals against is [`batch_image_id`](vprogs_zk_batch_prover::Backend::batch_image_id), + /// inherited from the per-batch backend. + fn aggregator_image_id(&self) -> &[u8; 32]; } diff --git a/zk/aggregate-prover/src/prover.rs b/zk/aggregate-prover/src/prover.rs index a33d8143..b1c7ed54 100644 --- a/zk/aggregate-prover/src/prover.rs +++ b/zk/aggregate-prover/src/prover.rs @@ -39,6 +39,7 @@ impl> AggregateProver { S, TransactionArtifact = B::Receipt, BatchArtifact = B::Receipt, + AggregatorArtifact = B::Receipt, BatchMetadata = ChainBlockMetadata, >, { diff --git a/zk/aggregate-prover/src/worker.rs b/zk/aggregate-prover/src/worker.rs index 432205d4..65d14484 100644 --- a/zk/aggregate-prover/src/worker.rs +++ b/zk/aggregate-prover/src/worker.rs @@ -51,6 +51,7 @@ where S, TransactionArtifact = B::Receipt, BatchArtifact = B::Receipt, + AggregatorArtifact = B::Receipt, BatchMetadata = ChainBlockMetadata, >, { @@ -195,27 +196,48 @@ where ); self.emit(handle.clone()); - // Aggregate the bundle: fetch the final block's lane proof, encode the aggregator inputs - // over the per-batch journals, and prove with the per-batch receipts as composition - // assumptions. - let lane_proof = self - .lane_source - .fetch_lane_proof(LaneProofRequest { block: block_prove_to, lane_key: self.lane_key }) - .await; - let journals: Vec> = receipts.iter().map(|r| B::journal_bytes(r)).collect(); - let inputs = AggregatorInputs::encode( - self.backend.batch_image_id(), - &lane_proof, - journals.iter().map(|j| j.as_slice()), - ); - let receipt = self.backend.prove_aggregator(&inputs, receipts).await; - if self.prover.shutdown.is_open() { - // Shutting down: resolve the published handle as a no-op so a consumer awaiting its - // artifact is released rather than blocked on a latch that never opens, and drop the - // proved bundle (the same discard-on-shutdown behavior as before). - handle.publish_artifact(None); - return true; - } + // The bundle's `from -> to` coordinate (its start checkpoint + block, claimed tip + // commitment) proves to the same settlement receipt, so a replay (including a flip reorg + // back onto this fork) reuses the cached one instead of re-fetching the lane proof and + // re-proving. The bundle keys the receipt off its own start coordinate and the claimed tip + // `seq_commit`; its first batch is the storage gateway (the aggregate prover holds no store + // of its own) and supplies the aggregator image id. + let seq_commit = last_metadata.seq_commit.as_bytes(); + let first_batch = bundle.first().unwrap(); + let receipt = match handle.read_agg_receipt(first_batch, seq_commit).resolve().await { + Some(receipt) => receipt, + None => { + // Aggregate the bundle: fetch the final block's lane proof, encode the aggregator + // inputs over the per-batch journals, and prove with the per-batch receipts as + // composition assumptions. + let lane_proof = self + .lane_source + .fetch_lane_proof(LaneProofRequest { + block: block_prove_to, + lane_key: self.lane_key, + }) + .await; + let journals: Vec> = receipts.iter().map(|r| B::journal_bytes(r)).collect(); + let inputs = AggregatorInputs::encode( + self.backend.batch_image_id(), + &lane_proof, + journals.iter().map(|j| j.as_slice()), + ); + let receipt = self.backend.prove_aggregator(&inputs, receipts).await; + if self.prover.shutdown.is_open() { + // Shutting down: resolve the published handle as a no-op so a consumer awaiting + // its artifact is released rather than blocked on a latch that never opens, and + // drop the proved bundle (the same discard-on-shutdown behavior as before). + handle.publish_artifact(None); + return true; + } + + // Wait for the receipt to be durable before publishing the artifact, so a crash + // never leaves a consumed-but-uncached settlement receipt. + handle.write_agg_receipt(first_batch, seq_commit, receipt.clone()).wait().await; + receipt + } + }; // Parse the settlement journal. let journal = B::journal_bytes(&receipt); diff --git a/zk/backend/risc0/api/src/backend.rs b/zk/backend/risc0/api/src/backend.rs index c29ef26f..05a59565 100644 --- a/zk/backend/risc0/api/src/backend.rs +++ b/zk/backend/risc0/api/src/backend.rs @@ -132,6 +132,10 @@ impl vprogs_zk_batch_prover::Backend for Backend { fn journal_bytes(receipt: &Receipt) -> Vec { receipt.journal.bytes.clone() } + + fn batch_image_id(&self) -> &[u8; 32] { + &self.batch_processor.id + } } impl vprogs_zk_aggregate_prover::Backend for Backend { @@ -163,7 +167,7 @@ impl vprogs_zk_aggregate_prover::Backend for Backend { })) } - fn batch_image_id(&self) -> &[u8; 32] { - &self.batch_processor.id + fn aggregator_image_id(&self) -> &[u8; 32] { + &self.aggregator.id } } diff --git a/zk/batch-prover/Cargo.toml b/zk/batch-prover/Cargo.toml index c05ed8d4..87c8da88 100644 --- a/zk/batch-prover/Cargo.toml +++ b/zk/batch-prover/Cargo.toml @@ -13,6 +13,7 @@ vprogs-core-atomics.workspace = true vprogs-core-macros.workspace = true vprogs-l1-types.workspace = true vprogs-scheduling-scheduler.workspace = true +vprogs-state-proof-receipt.workspace = true vprogs-storage-types.workspace = true vprogs-zk-abi = { workspace = true, features = ["host"] } vprogs-zk-transaction-prover.workspace = true diff --git a/zk/batch-prover/src/backend.rs b/zk/batch-prover/src/backend.rs index d12ce85f..c6de6469 100644 --- a/zk/batch-prover/src/backend.rs +++ b/zk/batch-prover/src/backend.rs @@ -13,4 +13,9 @@ pub trait Backend: vprogs_zk_transaction_prover::Backend { /// Extracts journal bytes from a receipt. fn journal_bytes(receipt: &Self::Receipt) -> Vec; + + /// Trusted batch-processor image id: the program identifier that keys a per-batch receipt in + /// the proof-receipt store, and the trusted image the aggregator verifies each composed batch + /// journal against. + fn batch_image_id(&self) -> &[u8; 32]; } diff --git a/zk/batch-prover/src/worker.rs b/zk/batch-prover/src/worker.rs index 8c3e7c4b..9d6b1dca 100644 --- a/zk/batch-prover/src/worker.rs +++ b/zk/batch-prover/src/worker.rs @@ -79,25 +79,39 @@ where return; } - // Collect SMT proof at the version preceding the batch's checkpoint. - let prev_version = batch.checkpoint().index().saturating_sub(1); - let proof_bytes = self.store.prove(&batch.resource_ids(), prev_version).expect("proof"); + // The (checkpoint, block, batch-image) coordinate proves to the same per-batch receipt, so + // a replay (including a flip reorg back onto this fork) reuses the cached one instead of + // re-collecting the SMT proof and re-proving. + let receipt = match batch.read_batch_receipt().resolve().await { + Some(receipt) => receipt, + None => { + // Collect SMT proof at the version preceding the batch's checkpoint. + let prev_version = batch.checkpoint().index().saturating_sub(1); + let proof_bytes = + self.store.prove(&batch.resource_ids(), prev_version).expect("proof"); - // One pass: per-tx journal bytes (inputs) + receipt clones (proof composition). - let (journals, receipts): (Vec<_>, Vec<_>) = - batch.tx_artifacts().map(|a| (B::journal_bytes(&a), (*a).clone())).unzip(); + // One pass: per-tx journal bytes (inputs) + receipt clones (proof composition). + let (journals, receipts): (Vec<_>, Vec<_>) = + batch.tx_artifacts().map(|a| (B::journal_bytes(&a), (*a).clone())).unzip(); - // Encode the inputs for the proof. - let covenant_id = self.config.covenant_id.map(|h| h.as_bytes()).unwrap_or_default(); - let input_bytes = BatchInputs::encode( - (self.backend.image_id(), &covenant_id, &self.config.lane_key), - &proof_bytes, - batch.checkpoint().metadata(), - &journals, - ); + // Encode the inputs for the proof. + let covenant_id = self.config.covenant_id.map(|h| h.as_bytes()).unwrap_or_default(); + let input_bytes = BatchInputs::encode( + (self.backend.image_id(), &covenant_id, &self.config.lane_key), + &proof_bytes, + batch.checkpoint().metadata(), + &journals, + ); - // Compose the batch proof against those tx receipts. - let receipt = self.backend.prove_batch(&input_bytes, receipts).await; + // Compose the batch proof against those tx receipts. + let receipt = self.backend.prove_batch(&input_bytes, receipts).await; + + // Wait for the receipt to be durable before publishing the artifact, so a crash + // never leaves a consumed-but-uncached receipt. + batch.write_batch_receipt(receipt.clone()).wait().await; + receipt + } + }; // Publish the receipt as the batch's artifact. batch.publish_artifact(Some(receipt)); diff --git a/zk/transaction-prover/Cargo.toml b/zk/transaction-prover/Cargo.toml index 3a3ddc13..c6ec57f1 100644 --- a/zk/transaction-prover/Cargo.toml +++ b/zk/transaction-prover/Cargo.toml @@ -4,9 +4,12 @@ name = "vprogs-zk-transaction-prover" version.workspace = true [dependencies] +borsh.workspace = true tap.workspace = true tokio = { workspace = true, features = ["rt"] } vprogs-core-atomics.workspace = true vprogs-core-macros.workspace = true +vprogs-l1-types.workspace = true vprogs-scheduling-scheduler.workspace = true +vprogs-state-proof-receipt.workspace = true vprogs-storage-types.workspace = true diff --git a/zk/transaction-prover/src/backend.rs b/zk/transaction-prover/src/backend.rs index 8542c673..d9a418b5 100644 --- a/zk/transaction-prover/src/backend.rs +++ b/zk/transaction-prover/src/backend.rs @@ -2,8 +2,9 @@ use std::future::Future; /// ZK backend for transaction proving. pub trait Backend: Clone + Send + Sync + 'static { - /// Proof receipt type produced by this backend. - type Receipt: Clone + Send + Sync + 'static; + /// Proof receipt type produced by this backend. The `Borsh` bounds let the proving workers + /// cache it in (and restore it from) the proof-receipt store. + type Receipt: Clone + Send + Sync + borsh::BorshSerialize + borsh::BorshDeserialize + 'static; /// Returns the guest image ID. fn image_id(&self) -> &[u8; 32]; diff --git a/zk/transaction-prover/src/prover.rs b/zk/transaction-prover/src/prover.rs index df4c1164..5fd155c0 100644 --- a/zk/transaction-prover/src/prover.rs +++ b/zk/transaction-prover/src/prover.rs @@ -5,6 +5,7 @@ use std::{ use vprogs_core_atomics::{AsyncQueue, AtomicAsyncLatch}; use vprogs_core_macros::smart_pointer; +use vprogs_l1_types::ChainBlockMetadata; use vprogs_scheduling_scheduler::{Processor, ScheduledTransaction}; use vprogs_storage_types::Store; @@ -23,7 +24,10 @@ pub struct TransactionProver> { impl> TransactionProver { /// Creates a new transaction prover and spawns its worker thread. - pub fn new>(backend: B) -> Self { + pub fn new>(backend: B) -> Self + where + P: Processor, + { let prover = Self(Arc::new(TransactionProverData { inbox: AsyncQueue::new(), shutdown: AtomicAsyncLatch::new(), diff --git a/zk/transaction-prover/src/worker.rs b/zk/transaction-prover/src/worker.rs index 8e407663..d9fce1f9 100644 --- a/zk/transaction-prover/src/worker.rs +++ b/zk/transaction-prover/src/worker.rs @@ -1,12 +1,13 @@ use std::thread::{JoinHandle, spawn}; use tokio::runtime::Builder; +use vprogs_l1_types::ChainBlockMetadata; use vprogs_scheduling_scheduler::Processor; use vprogs_storage_types::Store; use crate::{Backend, TransactionProver}; -/// Background worker that dispatches transaction proofs to a [`Backend`] concurrently. +/// Background worker that proves queued transactions through a [`Backend`], one at a time. pub(crate) struct Worker, B: Backend> { /// Shared prover state (inbox, shutdown). prover: TransactionProver, @@ -14,7 +15,11 @@ pub(crate) struct Worker, B: Backend> { backend: B, } -impl, B: Backend> Worker { +impl Worker +where + S: Store, + P: Processor, +{ /// Spawns the worker on a new thread with a single-threaded tokio runtime and returns its join /// handle. The prover joins this on shutdown so the worker's GPU prover is torn down (its risc0 /// CUDA context released) before the process exits. @@ -29,15 +34,30 @@ impl, B: Backend> Wo // Register notification before draining so we don't miss pushes. let notified = self.prover.inbox.notified(); - // Dispatch all queued transactions for proving. + // Prove each queued transaction in turn. while let Some((tx, tx_inputs)) = self.prover.inbox.pop() { - if tx.batch().upgrade().is_some_and(|b| !b.canceled()) { - let receipt = self.backend.prove_transaction(tx_inputs); - tokio::spawn(async move { tx.publish_artifact(Some(receipt.await)) }); - } else { + // Hold the batch alive for the iteration: the tx's receipt lookups reach storage + // through it, and dropping it would also drop the only path to the cache. + let Some(_batch) = tx.batch().upgrade().filter(|b| !b.canceled()) else { // Canceled or dropped batch: advance the counter without proving. tx.publish_artifact(None); + continue; + }; + + // Coordinate this receipt with the proof-receipt cache: the same (checkpoint, + // block, image, merge index) coordinate proves to the same receipt, so a replay + // (including a flip reorg back onto this fork) reuses it instead of re-proving. + if let Some(receipt) = tx.read_tx_receipt().resolve().await { + tx.publish_artifact(Some(receipt)); + continue; } + + let receipt = self.backend.prove_transaction(tx_inputs).await; + + // Wait for the receipt to be durable before publishing the artifact, so a crash + // never leaves a consumed-but-uncached receipt. + tx.write_tx_receipt(receipt.clone()).wait().await; + tx.publish_artifact(Some(receipt)); } // Wait for a new submission or shutdown. diff --git a/zk/vm/src/backend.rs b/zk/vm/src/backend.rs index 6921f261..87c8dca3 100644 --- a/zk/vm/src/backend.rs +++ b/zk/vm/src/backend.rs @@ -1,5 +1,7 @@ -/// Full ZK backend: synchronous execution plus transaction and batch proving. -pub trait Backend: vprogs_zk_batch_prover::Backend { +/// Full ZK backend: synchronous execution plus transaction, batch, and settlement-aggregation +/// proving. Extending the aggregate backend lets the VM expose every program image id (including +/// the aggregator's) through its [`Processor`](vprogs_scheduling_scheduler::Processor) impl. +pub trait Backend: vprogs_zk_aggregate_prover::Backend { /// Execute a transaction from pre-encoded wire bytes and return the raw result. fn execute_transaction(&self, wire_bytes: &[u8]) -> Vec; } diff --git a/zk/vm/src/proving_pipeline.rs b/zk/vm/src/proving_pipeline.rs index be285cc4..8ee30a60 100644 --- a/zk/vm/src/proving_pipeline.rs +++ b/zk/vm/src/proving_pipeline.rs @@ -22,7 +22,10 @@ pub enum ProvingPipeline> { impl> ProvingPipeline { /// Creates a transaction-only proving pipeline. - pub fn transaction>(backend: B) -> Self { + pub fn transaction>(backend: B) -> Self + where + P: Processor, + { Self::Transaction(TransactionProver::new(backend)) } @@ -59,6 +62,7 @@ impl> ProvingPipeline { S, TransactionArtifact = B::Receipt, BatchArtifact = B::Receipt, + AggregatorArtifact = B::Receipt, BatchMetadata = ChainBlockMetadata, >, { diff --git a/zk/vm/src/vm.rs b/zk/vm/src/vm.rs index a3d2d435..662c1bd4 100644 --- a/zk/vm/src/vm.rs +++ b/zk/vm/src/vm.rs @@ -65,9 +65,22 @@ impl Processor for Vm { self.proving_pipeline.shutdown(); } + fn tx_image_id(&self) -> [u8; 32] { + *self.backend.image_id() + } + + fn batch_image_id(&self) -> [u8; 32] { + *self.backend.batch_image_id() + } + + fn aggregator_image_id(&self) -> [u8; 32] { + *self.backend.aggregator_image_id() + } + type Transaction = L1Transaction; type TransactionArtifact = B::Receipt; type BatchArtifact = B::Receipt; + type AggregatorArtifact = B::Receipt; type BatchMetadata = ChainBlockMetadata; type Error = Error; } From 98825519e748ad0da79316c918000ca09f2c7820 Mon Sep 17 00:00:00 2001 From: 143672 Date: Wed, 17 Jun 2026 18:45:26 +0400 Subject: [PATCH 3/4] Move ScheduledBundle back into the aggregate prover Return ScheduledBundle (and BundleBlocks) to the aggregate-prover crate so the scheduler stays bundle-agnostic: it owns the per-tx/per-batch/ aggregate receipt store but knows nothing about how bundles are formed. ScheduledBatch gains concrete read_agg_receipt / write_agg_receipt gateway methods that take a bare start coordinate and build the AggregatorKey internally, so the bundle forwards its own coordinate rather than reaching into the receipt store. The generic submit_*_receipt helpers and the ReceiptLookup trait stay private, keeping that machinery off the scheduler's public surface. Drop kaspa-hashes and vprogs-l1-types from the scheduler and the scheduler dependency from the settler (both only pulled in for the bundle), move the receipt-cache round-trip test alongside the bundle, and scrub "bundle" from the scheduler's receipt-store docs. --- Cargo.lock | 8 ++-- examples/tn10-flow/src/daemon.rs | 4 +- scheduling/scheduler/Cargo.toml | 3 -- scheduling/scheduler/src/lib.rs | 2 - scheduling/scheduler/src/processor.rs | 5 +- scheduling/scheduler/src/scheduled_batch.rs | 46 +++++++++++++++++-- scheduling/scheduler/src/storage_cmd.rs | 6 +-- sim/src/driver/l2_driver.rs | 4 +- zk/aggregate-prover/Cargo.toml | 7 +++ zk/aggregate-prover/src/config.rs | 3 +- zk/aggregate-prover/src/lib.rs | 2 + .../aggregate-prover}/src/scheduled_bundle.rs | 35 +++++--------- zk/aggregate-prover/src/worker.rs | 5 +- .../tests/proof_receipt_cache.rs | 3 +- zk/backend/risc0/settler/Cargo.toml | 1 - zk/backend/risc0/settler/src/worker.rs | 3 +- 16 files changed, 82 insertions(+), 55 deletions(-) rename {scheduling/scheduler => zk/aggregate-prover}/src/scheduled_bundle.rs (84%) rename {scheduling/scheduler => zk/aggregate-prover}/tests/proof_receipt_cache.rs (95%) diff --git a/Cargo.lock b/Cargo.lock index 1c36e236..6e1d7a39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8715,7 +8715,6 @@ dependencies = [ "borsh", "crossbeam-deque", "crossbeam-queue", - "kaspa-hashes", "num_cpus", "tap", "tempfile", @@ -8727,7 +8726,6 @@ dependencies = [ "vprogs-core-smt", "vprogs-core-test-utils", "vprogs-core-types", - "vprogs-l1-types", "vprogs-scheduling-execution-workers", "vprogs-scheduling-test-utils", "vprogs-state-batch-metadata", @@ -9081,14 +9079,19 @@ dependencies = [ "kaspa-hashes", "kaspa-rpc-core", "log", + "tempfile", "tokio", "vprogs-core-atomics", "vprogs-core-codec", "vprogs-core-macros", "vprogs-core-smt", + "vprogs-core-types", "vprogs-l1-types", "vprogs-scheduling-scheduler", + "vprogs-scheduling-test-utils", "vprogs-state-proof-receipt", + "vprogs-storage-manager", + "vprogs-storage-rocksdb-store", "vprogs-storage-types", "vprogs-zk-abi", "vprogs-zk-batch-prover", @@ -9155,7 +9158,6 @@ dependencies = [ "vprogs-core-atomics", "vprogs-core-smt", "vprogs-l1-wallet", - "vprogs-scheduling-scheduler", "vprogs-zk-aggregate-prover", "vprogs-zk-backend-risc0-api", "vprogs-zk-backend-risc0-covenant", diff --git a/examples/tn10-flow/src/daemon.rs b/examples/tn10-flow/src/daemon.rs index 7b3fa9df..39e654c4 100644 --- a/examples/tn10-flow/src/daemon.rs +++ b/examples/tn10-flow/src/daemon.rs @@ -21,10 +21,10 @@ use kaspa_wrpc_client::prelude::KaspaRpcClient; use vprogs_core_atomics::AsyncQueue; use vprogs_l1_bridge::L1BridgeConfig; use vprogs_node_framework::{Node, NodeConfig}; -use vprogs_scheduling_scheduler::{ExecutionConfig, ScheduledBundle}; +use vprogs_scheduling_scheduler::ExecutionConfig; use vprogs_storage_manager::StorageConfig; use vprogs_storage_rocksdb_store::RocksDbStore; -use vprogs_zk_aggregate_prover::{AggregateProverConfig, SettlementArtifact}; +use vprogs_zk_aggregate_prover::{AggregateProverConfig, ScheduledBundle, SettlementArtifact}; use vprogs_zk_backend_risc0_api::{Backend, ProofType, Receipt}; use vprogs_zk_batch_prover::{BatchProverConfig, LaneProofRequest, LaneProofSource}; use vprogs_zk_vm::{ProvingPipeline, Vm}; diff --git a/scheduling/scheduler/Cargo.toml b/scheduling/scheduler/Cargo.toml index ac3b6885..eade53c0 100644 --- a/scheduling/scheduler/Cargo.toml +++ b/scheduling/scheduler/Cargo.toml @@ -8,7 +8,6 @@ arc-swap.workspace = true borsh.workspace = true crossbeam-deque.workspace = true crossbeam-queue.workspace = true -kaspa-hashes.workspace = true num_cpus.workspace = true tap.workspace = true thiserror.workspace = true @@ -18,7 +17,6 @@ vprogs-core-hashing.workspace = true vprogs-core-macros.workspace = true vprogs-core-smt.workspace = true vprogs-core-types.workspace = true -vprogs-l1-types.workspace = true vprogs-scheduling-execution-workers.workspace = true vprogs-state-batch-metadata.workspace = true vprogs-state-metadata.workspace = true @@ -31,7 +29,6 @@ vprogs-storage-types.workspace = true zerocopy.workspace = true [dev-dependencies] -kaspa-hashes.workspace = true tempfile.workspace = true vprogs-core-test-utils.workspace = true vprogs-core-types.workspace = true diff --git a/scheduling/scheduler/src/lib.rs b/scheduling/scheduler/src/lib.rs index 84b301bb..7f1e2089 100644 --- a/scheduling/scheduler/src/lib.rs +++ b/scheduling/scheduler/src/lib.rs @@ -10,7 +10,6 @@ mod resource; mod resource_access; mod rollback; mod scheduled_batch; -mod scheduled_bundle; mod scheduled_transaction; mod scheduler; mod state; @@ -28,7 +27,6 @@ pub use pruning_worker::PruningWorker; pub(crate) use resource::Resource; pub(crate) use resource_access::ResourceAccess; pub use scheduled_batch::{ScheduledBatch, ScheduledBatchRef}; -pub use scheduled_bundle::{BundleBlocks, ScheduledBundle}; pub use scheduled_transaction::ScheduledTransaction; pub(crate) use scheduled_transaction::ScheduledTransactionRef; pub use scheduler::Scheduler; diff --git a/scheduling/scheduler/src/processor.rs b/scheduling/scheduler/src/processor.rs index d2c35644..224b72cf 100644 --- a/scheduling/scheduler/src/processor.rs +++ b/scheduling/scheduler/src/processor.rs @@ -31,8 +31,7 @@ pub trait Processor: Clone + Sized + Send + Sync + 'static { /// Program identifier keying a per-batch receipt in the proof-receipt store. fn batch_image_id(&self) -> [u8; 32]; - /// Program identifier keying an aggregated-bundle (settlement) receipt in the proof-receipt - /// store. + /// Program identifier keying an aggregate (settlement) receipt in the proof-receipt store. fn aggregator_image_id(&self) -> [u8; 32]; /// The transaction payload type (e.g. kaspa `L1Transaction`, `usize` in tests). @@ -45,7 +44,7 @@ pub trait Processor: Clone + Sized + Send + Sync + 'static { /// Artifact produced by a processed batch ([`ScheduledBatch::publish_artifact`]). Bounded for /// the same proof-receipt caching as [`TransactionArtifact`](Self::TransactionArtifact). type BatchArtifact: Clone + BorshSerialize + BorshDeserialize + Send + Sync + 'static; - /// Receipt produced by aggregating a bundle of per-batch receipts into one settlement receipt. + /// Receipt produced by aggregating a run of per-batch receipts into one settlement receipt. /// Bounded for the same proof-receipt caching as the other artifacts. type AggregatorArtifact: Clone + BorshSerialize + BorshDeserialize + Send + Sync + 'static; /// Opaque metadata attached to each batch for persistence. diff --git a/scheduling/scheduler/src/scheduled_batch.rs b/scheduling/scheduler/src/scheduled_batch.rs index e391857e..cb68d71c 100644 --- a/scheduling/scheduler/src/scheduled_batch.rs +++ b/scheduling/scheduler/src/scheduled_batch.rs @@ -12,7 +12,7 @@ use vprogs_core_types::{BatchMetadata, Checkpoint, ResourceId, SchedulerTransact use vprogs_scheduling_execution_workers::Batch; use vprogs_state_batch_metadata::BatchMetadata as StoredBatchMetadata; use vprogs_state_metadata::StateMetadata; -use vprogs_state_proof_receipt::{BatchKey, Prefix}; +use vprogs_state_proof_receipt::{AggregatorKey, BatchKey, Prefix}; use vprogs_storage_types::Store; use crate::{ @@ -251,10 +251,46 @@ impl> ScheduledBatch { } } - /// Aggregator program image id, keying a bundle's aggregate receipt (the bundle derives the - /// rest of the key from its own start coordinate). - pub(crate) fn aggregator_image_id(&self) -> [u8; 32] { - self.processor.aggregator_image_id() + /// Looks up the aggregate (settlement) receipt at the start coordinate (`checkpoint_index` / + /// `from_block`) and claimed tip `seq_commit`, with this batch as the storage gateway: the + /// aggregate prover holds no store of its own, so it reaches the receipt cache through a batch's + /// storage handle. Resolves to the receipt, or `None` on a miss. + pub fn read_agg_receipt( + &self, + checkpoint_index: u64, + from_block: [u8; 32], + seq_commit: [u8; 32], + ) -> ReceiptRead { + self.submit_read_receipt(self.agg_key(checkpoint_index, from_block, seq_commit)) + } + + /// Stores the aggregate (settlement) receipt at the start coordinate through the write worker, + /// returning a latch that opens once it commits. This batch is the storage gateway, as for + /// [`read_agg_receipt`](Self::read_agg_receipt). + pub fn write_agg_receipt( + &self, + checkpoint_index: u64, + from_block: [u8; 32], + seq_commit: [u8; 32], + receipt: P::AggregatorArtifact, + ) -> AtomicAsyncLatch { + self.submit_store_receipt(self.agg_key(checkpoint_index, from_block, seq_commit), receipt) + } + + /// The aggregate receipt key at the start coordinate (`checkpoint_index` + `from_block`) and + /// claimed tip `seq_commit`; this batch supplies the aggregator image id. + fn agg_key( + &self, + checkpoint_index: u64, + from_block: [u8; 32], + seq_commit: [u8; 32], + ) -> AggregatorKey { + AggregatorKey { + prefix: Prefix { checkpoint_index: checkpoint_index.into() }, + block_hash: from_block, + image_id: self.processor.aggregator_image_id(), + seq_commit, + } } /// Submits a proof-receipt lookup for the typed `key` to the read worker, returning the typed diff --git a/scheduling/scheduler/src/storage_cmd.rs b/scheduling/scheduler/src/storage_cmd.rs index 3eaf2d4e..b1bcf8a4 100644 --- a/scheduling/scheduler/src/storage_cmd.rs +++ b/scheduling/scheduler/src/storage_cmd.rs @@ -20,7 +20,7 @@ pub(crate) enum Key { Tx(TxKey), /// A per-batch receipt key. Batch(BatchKey), - /// An aggregated-bundle (settlement) receipt key. + /// An aggregate (settlement) receipt key. Agg(AggregatorKey), } @@ -46,7 +46,7 @@ pub enum ReceiptValue> { Tx(P::TransactionArtifact), /// A per-batch receipt. Batch(P::BatchArtifact), - /// An aggregated-bundle (settlement) receipt. + /// An aggregate (settlement) receipt. Agg(P::AggregatorArtifact), } @@ -88,7 +88,7 @@ impl> ReceiptValue { } } - /// Moves out the aggregated-bundle receipt. Panics if the value is not an [`Agg`](Self::Agg). + /// Moves out the aggregate receipt. Panics if the value is not an [`Agg`](Self::Agg). pub(crate) fn into_agg(self) -> P::AggregatorArtifact { match self { ReceiptValue::Agg(receipt) => receipt, diff --git a/sim/src/driver/l2_driver.rs b/sim/src/driver/l2_driver.rs index 19862066..46bb1769 100644 --- a/sim/src/driver/l2_driver.rs +++ b/sim/src/driver/l2_driver.rs @@ -20,9 +20,9 @@ use vprogs_core_test_utils::ResourceIdExt; use vprogs_core_types::{AccessMetadata, ResourceId, SchedulerTransaction}; use vprogs_l1_types::ChainBlockMetadata; use vprogs_l1_wallet::{build, encode_activity_payload}; -use vprogs_scheduling_scheduler::{Processor, ScheduledBundle, Scheduler}; +use vprogs_scheduling_scheduler::{Processor, Scheduler}; use vprogs_storage_rocksdb_store::RocksDbStore; -use vprogs_zk_aggregate_prover::{AggregateProverConfig, SettlementArtifact}; +use vprogs_zk_aggregate_prover::{AggregateProverConfig, ScheduledBundle, SettlementArtifact}; use vprogs_zk_backend_risc0_api::{Backend, ProofType, Receipt}; use vprogs_zk_backend_risc0_covenant::{ Settlement, SettlementDevInput, build_dev_redeem_script, dev_redeem_script_len, diff --git a/zk/aggregate-prover/Cargo.toml b/zk/aggregate-prover/Cargo.toml index 8a985315..cb2f8825 100644 --- a/zk/aggregate-prover/Cargo.toml +++ b/zk/aggregate-prover/Cargo.toml @@ -20,3 +20,10 @@ vprogs-storage-types.workspace = true vprogs-zk-abi = { workspace = true, features = ["host"] } vprogs-zk-batch-prover.workspace = true vprogs-zk-transaction-prover.workspace = true + +[dev-dependencies] +tempfile.workspace = true +vprogs-core-types.workspace = true +vprogs-scheduling-test-utils.workspace = true +vprogs-storage-manager.workspace = true +vprogs-storage-rocksdb-store.workspace = true diff --git a/zk/aggregate-prover/src/config.rs b/zk/aggregate-prover/src/config.rs index 8dff3b2b..034b06ad 100644 --- a/zk/aggregate-prover/src/config.rs +++ b/zk/aggregate-prover/src/config.rs @@ -1,9 +1,8 @@ use kaspa_hashes::Hash; use vprogs_core_atomics::AsyncQueue; -use vprogs_scheduling_scheduler::ScheduledBundle; use vprogs_zk_batch_prover::LaneProofSource; -use crate::SettlementArtifact; +use crate::{ScheduledBundle, SettlementArtifact}; /// Static configuration for the aggregate prover. /// diff --git a/zk/aggregate-prover/src/lib.rs b/zk/aggregate-prover/src/lib.rs index c6693187..eda960f9 100644 --- a/zk/aggregate-prover/src/lib.rs +++ b/zk/aggregate-prover/src/lib.rs @@ -2,10 +2,12 @@ mod backend; mod command; mod config; mod prover; +mod scheduled_bundle; mod settlement_artifact; mod worker; pub use backend::Backend; pub use config::AggregateProverConfig; pub use prover::AggregateProver; +pub use scheduled_bundle::{BundleBlocks, ScheduledBundle}; pub use settlement_artifact::SettlementArtifact; diff --git a/scheduling/scheduler/src/scheduled_bundle.rs b/zk/aggregate-prover/src/scheduled_bundle.rs similarity index 84% rename from scheduling/scheduler/src/scheduled_bundle.rs rename to zk/aggregate-prover/src/scheduled_bundle.rs index 3dd49949..1134e4cf 100644 --- a/scheduling/scheduler/src/scheduled_bundle.rs +++ b/zk/aggregate-prover/src/scheduled_bundle.rs @@ -5,11 +5,9 @@ use kaspa_hashes::Hash; use vprogs_core_atomics::AtomicAsyncLatch; use vprogs_core_macros::smart_pointer; use vprogs_l1_types::SettlementInfo; -use vprogs_state_proof_receipt::{AggregatorKey, Prefix}; +use vprogs_scheduling_scheduler::{Processor, ReceiptRead, ScheduledBatch}; use vprogs_storage_types::Store; -use crate::{ReceiptRead, ScheduledBatch, processor::Processor}; - /// The L1 block span a bundle proves over. #[derive(Clone, Copy)] pub struct BundleBlocks { @@ -22,9 +20,9 @@ pub struct BundleBlocks { /// A formed bundle of proved batches, published to a settlement consumer before its artifact /// exists. /// -/// Mirrors [`ScheduledBatch`](crate::ScheduledBatch)'s artifact mechanism: the handle is published -/// *before* its artifact exists, so a consumer can pop it, read the immediate metadata (e.g. -/// reconcile pacing against `batches`), and then await the proved artifact `A`, filled via +/// Mirrors [`ScheduledBatch`]'s artifact mechanism: the handle is published *before* its artifact +/// exists, so a consumer can pop it, read the immediate metadata (e.g. reconcile pacing against +/// `batches`), and then await the proved artifact `A`, filled via /// [`publish_artifact`](Self::publish_artifact) once proving completes. /// /// A no-op bundle (all-empty prefix, or one that advanced no state) carries no artifact: it is @@ -126,14 +124,13 @@ impl ScheduledBundle { /// Looks up this bundle's cached aggregate (settlement) receipt, returning a handle that /// resolves to the deserialized receipt, or `None` on a cache miss. `gateway` is any batch in - /// the bundle: the aggregate prover holds no store of its own, so it reaches the receipt cache - /// through a batch's storage handle. + /// the bundle, supplying the storage handle the aggregate prover reaches the cache through. pub fn read_agg_receipt>( &self, gateway: &ScheduledBatch, seq_commit: [u8; 32], ) -> ReceiptRead { - gateway.submit_read_receipt(self.agg_key(gateway, seq_commit)) + gateway.read_agg_receipt(self.checkpoint_index, self.from_block.as_bytes(), seq_commit) } /// Stores this bundle's aggregate (settlement) receipt through the write worker, returning a @@ -145,22 +142,12 @@ impl ScheduledBundle { seq_commit: [u8; 32], receipt: P::AggregatorArtifact, ) -> AtomicAsyncLatch { - gateway.submit_store_receipt(self.agg_key(gateway, seq_commit), receipt) - } - - /// The aggregate receipt key at this bundle's start coordinate (first checkpoint index + block) - /// and claimed tip `seq_commit`; `gateway` supplies the aggregator image id. - fn agg_key>( - &self, - gateway: &ScheduledBatch, - seq_commit: [u8; 32], - ) -> AggregatorKey { - AggregatorKey { - prefix: Prefix { checkpoint_index: self.checkpoint_index.into() }, - block_hash: self.from_block.as_bytes(), - image_id: gateway.aggregator_image_id(), + gateway.write_agg_receipt( + self.checkpoint_index, + self.from_block.as_bytes(), seq_commit, - } + receipt, + ) } /// Publishes the bundle's artifact and opens the `artifact_published` latch. A `None` artifact diff --git a/zk/aggregate-prover/src/worker.rs b/zk/aggregate-prover/src/worker.rs index 65d14484..b99a7e83 100644 --- a/zk/aggregate-prover/src/worker.rs +++ b/zk/aggregate-prover/src/worker.rs @@ -9,13 +9,14 @@ use vprogs_core_atomics::AsyncQueue; use vprogs_core_codec::Reader; use vprogs_core_smt::EMPTY_HASH; use vprogs_l1_types::{ChainBlockMetadata, SettlementInfo}; -use vprogs_scheduling_scheduler::{BundleBlocks, Processor, ScheduledBatch, ScheduledBundle}; +use vprogs_scheduling_scheduler::{Processor, ScheduledBatch}; use vprogs_storage_types::Store; use vprogs_zk_abi::batch_aggregator::{Inputs as AggregatorInputs, StateTransition}; use vprogs_zk_batch_prover::{LaneProofRequest, LaneProofSource}; use crate::{ - AggregateProver, AggregateProverConfig, Backend, SettlementArtifact, command::Command, + AggregateProver, AggregateProverConfig, Backend, BundleBlocks, ScheduledBundle, + SettlementArtifact, command::Command, }; /// Background worker that accumulates scheduled batches, forms bundles from the consecutively-ready diff --git a/scheduling/scheduler/tests/proof_receipt_cache.rs b/zk/aggregate-prover/tests/proof_receipt_cache.rs similarity index 95% rename from scheduling/scheduler/tests/proof_receipt_cache.rs rename to zk/aggregate-prover/tests/proof_receipt_cache.rs index 7f18deaa..518d0677 100644 --- a/scheduling/scheduler/tests/proof_receipt_cache.rs +++ b/zk/aggregate-prover/tests/proof_receipt_cache.rs @@ -6,10 +6,11 @@ use kaspa_hashes::Hash; use tempfile::TempDir; use vprogs_core_types::BatchMetadata; -use vprogs_scheduling_scheduler::{BundleBlocks, ExecutionConfig, ScheduledBundle, Scheduler}; +use vprogs_scheduling_scheduler::{ExecutionConfig, Scheduler}; use vprogs_scheduling_test_utils::Processor; use vprogs_storage_manager::StorageConfig; use vprogs_storage_rocksdb_store::RocksDbStore; +use vprogs_zk_aggregate_prover::{BundleBlocks, ScheduledBundle}; #[test] fn proof_receipt_round_trips_through_storage_workers() { diff --git a/zk/backend/risc0/settler/Cargo.toml b/zk/backend/risc0/settler/Cargo.toml index 03bd0889..f003471d 100644 --- a/zk/backend/risc0/settler/Cargo.toml +++ b/zk/backend/risc0/settler/Cargo.toml @@ -16,7 +16,6 @@ tokio = { workspace = true, features = ["ma vprogs-core-atomics.workspace = true vprogs-core-smt.workspace = true vprogs-l1-wallet.workspace = true -vprogs-scheduling-scheduler.workspace = true vprogs-zk-aggregate-prover.workspace = true vprogs-zk-backend-risc0-api = { workspace = true, features = ["host"] } vprogs-zk-backend-risc0-covenant.workspace = true diff --git a/zk/backend/risc0/settler/src/worker.rs b/zk/backend/risc0/settler/src/worker.rs index a7a11a78..53ca50f7 100644 --- a/zk/backend/risc0/settler/src/worker.rs +++ b/zk/backend/risc0/settler/src/worker.rs @@ -12,8 +12,7 @@ use kaspa_wrpc_client::prelude::KaspaRpcClient; use secp256k1::Keypair; use vprogs_core_atomics::{AsyncQueue, AtomicAsyncLatch}; use vprogs_l1_wallet::Wallet; -use vprogs_scheduling_scheduler::ScheduledBundle; -use vprogs_zk_aggregate_prover::SettlementArtifact; +use vprogs_zk_aggregate_prover::{ScheduledBundle, SettlementArtifact}; use vprogs_zk_backend_risc0_api::{Backend, Receipt}; use crate::covenant::{CovenantState, build_settlement}; From 8182f734273ff0ad88cf5a3a3d18b32592b53703 Mon Sep 17 00:00:00 2001 From: 143672 Date: Wed, 17 Jun 2026 18:51:15 +0400 Subject: [PATCH 4/4] Group aggregate-receipt coordinate into AggReceiptCoord read_agg_receipt/write_agg_receipt/agg_key took from_block and seq_commit as adjacent [u8; 32] positional args, which a call site could transpose silently. Replace the trio with a named AggReceiptCoord struct so the fields disambiguate at every call site. --- scheduling/scheduler/src/lib.rs | 2 +- scheduling/scheduler/src/scheduled_batch.rs | 49 ++++++++++----------- zk/aggregate-prover/src/scheduled_bundle.rs | 16 ++++--- 3 files changed, 36 insertions(+), 31 deletions(-) diff --git a/scheduling/scheduler/src/lib.rs b/scheduling/scheduler/src/lib.rs index 7f1e2089..a8bf5996 100644 --- a/scheduling/scheduler/src/lib.rs +++ b/scheduling/scheduler/src/lib.rs @@ -26,7 +26,7 @@ pub use processor::Processor; pub use pruning_worker::PruningWorker; pub(crate) use resource::Resource; pub(crate) use resource_access::ResourceAccess; -pub use scheduled_batch::{ScheduledBatch, ScheduledBatchRef}; +pub use scheduled_batch::{AggReceiptCoord, ScheduledBatch, ScheduledBatchRef}; pub use scheduled_transaction::ScheduledTransaction; pub(crate) use scheduled_transaction::ScheduledTransactionRef; pub use scheduler::Scheduler; diff --git a/scheduling/scheduler/src/scheduled_batch.rs b/scheduling/scheduler/src/scheduled_batch.rs index cb68d71c..8be3487f 100644 --- a/scheduling/scheduler/src/scheduled_batch.rs +++ b/scheduling/scheduler/src/scheduled_batch.rs @@ -21,6 +21,16 @@ use crate::{ state::SchedulerState, storage_cmd::ReceiptLookup, }; +/// The bundle-start coordinate and claimed tip that key an aggregate (settlement) receipt. +pub struct AggReceiptCoord { + /// Bundle-start checkpoint index. + pub checkpoint_index: u64, + /// L1 block at the bundle's first checkpoint (the block it proves from). + pub from_block: [u8; 32], + /// Commitment to the bundle's claimed tip. + pub seq_commit: [u8; 32], +} + /// A batch of transactions progressing through the scheduler's lifecycle. /// /// Each batch moves through three stages: processed (all transactions executed), persisted (all @@ -251,45 +261,34 @@ impl> ScheduledBatch { } } - /// Looks up the aggregate (settlement) receipt at the start coordinate (`checkpoint_index` / - /// `from_block`) and claimed tip `seq_commit`, with this batch as the storage gateway: the - /// aggregate prover holds no store of its own, so it reaches the receipt cache through a batch's - /// storage handle. Resolves to the receipt, or `None` on a miss. + /// Looks up the aggregate (settlement) receipt at `coord`, with this batch as the storage + /// gateway: the aggregate prover holds no store of its own, so it reaches the receipt cache + /// through a batch's storage handle. Resolves to the receipt, or `None` on a miss. pub fn read_agg_receipt( &self, - checkpoint_index: u64, - from_block: [u8; 32], - seq_commit: [u8; 32], + coord: AggReceiptCoord, ) -> ReceiptRead { - self.submit_read_receipt(self.agg_key(checkpoint_index, from_block, seq_commit)) + self.submit_read_receipt(self.agg_key(coord)) } - /// Stores the aggregate (settlement) receipt at the start coordinate through the write worker, - /// returning a latch that opens once it commits. This batch is the storage gateway, as for + /// Stores the aggregate (settlement) receipt at `coord` through the write worker, returning a + /// latch that opens once it commits. This batch is the storage gateway, as for /// [`read_agg_receipt`](Self::read_agg_receipt). pub fn write_agg_receipt( &self, - checkpoint_index: u64, - from_block: [u8; 32], - seq_commit: [u8; 32], + coord: AggReceiptCoord, receipt: P::AggregatorArtifact, ) -> AtomicAsyncLatch { - self.submit_store_receipt(self.agg_key(checkpoint_index, from_block, seq_commit), receipt) + self.submit_store_receipt(self.agg_key(coord), receipt) } - /// The aggregate receipt key at the start coordinate (`checkpoint_index` + `from_block`) and - /// claimed tip `seq_commit`; this batch supplies the aggregator image id. - fn agg_key( - &self, - checkpoint_index: u64, - from_block: [u8; 32], - seq_commit: [u8; 32], - ) -> AggregatorKey { + /// The aggregate receipt key at `coord`; this batch supplies the aggregator image id. + fn agg_key(&self, coord: AggReceiptCoord) -> AggregatorKey { AggregatorKey { - prefix: Prefix { checkpoint_index: checkpoint_index.into() }, - block_hash: from_block, + prefix: Prefix { checkpoint_index: coord.checkpoint_index.into() }, + block_hash: coord.from_block, image_id: self.processor.aggregator_image_id(), - seq_commit, + seq_commit: coord.seq_commit, } } diff --git a/zk/aggregate-prover/src/scheduled_bundle.rs b/zk/aggregate-prover/src/scheduled_bundle.rs index 1134e4cf..24a641e3 100644 --- a/zk/aggregate-prover/src/scheduled_bundle.rs +++ b/zk/aggregate-prover/src/scheduled_bundle.rs @@ -5,7 +5,7 @@ use kaspa_hashes::Hash; use vprogs_core_atomics::AtomicAsyncLatch; use vprogs_core_macros::smart_pointer; use vprogs_l1_types::SettlementInfo; -use vprogs_scheduling_scheduler::{Processor, ReceiptRead, ScheduledBatch}; +use vprogs_scheduling_scheduler::{AggReceiptCoord, Processor, ReceiptRead, ScheduledBatch}; use vprogs_storage_types::Store; /// The L1 block span a bundle proves over. @@ -130,7 +130,11 @@ impl ScheduledBundle { gateway: &ScheduledBatch, seq_commit: [u8; 32], ) -> ReceiptRead { - gateway.read_agg_receipt(self.checkpoint_index, self.from_block.as_bytes(), seq_commit) + gateway.read_agg_receipt(AggReceiptCoord { + checkpoint_index: self.checkpoint_index, + from_block: self.from_block.as_bytes(), + seq_commit, + }) } /// Stores this bundle's aggregate (settlement) receipt through the write worker, returning a @@ -143,9 +147,11 @@ impl ScheduledBundle { receipt: P::AggregatorArtifact, ) -> AtomicAsyncLatch { gateway.write_agg_receipt( - self.checkpoint_index, - self.from_block.as_bytes(), - seq_commit, + AggReceiptCoord { + checkpoint_index: self.checkpoint_index, + from_block: self.from_block.as_bytes(), + seq_commit, + }, receipt, ) }