From 37c1f6f08f5bc3ae7d7bd1d8f99ff04bea6d8966 Mon Sep 17 00:00:00 2001 From: redsh4de <25299353+redsh4de@users.noreply.github.com> Date: Wed, 20 May 2026 12:27:49 +0300 Subject: [PATCH 1/5] typed service errors and spawn_critical --- binaries/cuprated/src/blockchain.rs | 20 +- binaries/cuprated/src/blockchain/error.rs | 92 ++++++ binaries/cuprated/src/blockchain/interface.rs | 56 +--- binaries/cuprated/src/blockchain/manager.rs | 45 +-- .../src/blockchain/manager/commands.rs | 4 +- .../src/blockchain/manager/handler.rs | 305 +++++++++--------- .../cuprated/src/blockchain/manager/tests.rs | 60 ++-- binaries/cuprated/src/constants.rs | 4 +- binaries/cuprated/src/lib.rs | 278 ++++++++-------- binaries/cuprated/src/monitor.rs | 48 ++- binaries/cuprated/src/p2p.rs | 113 ++++--- binaries/cuprated/src/p2p/request_handler.rs | 6 +- binaries/cuprated/src/rpc/server.rs | 28 +- .../cuprated/src/rpc/service/tx_handler.rs | 27 +- binaries/cuprated/src/tor.rs | 33 +- binaries/cuprated/src/txpool.rs | 4 +- binaries/cuprated/src/txpool/error.rs | 72 +++++ binaries/cuprated/src/txpool/incoming_tx.rs | 97 +++--- binaries/cuprated/src/txpool/manager.rs | 171 +++++----- types/types/src/lib.rs | 2 +- 20 files changed, 860 insertions(+), 605 deletions(-) create mode 100644 binaries/cuprated/src/blockchain/error.rs create mode 100644 binaries/cuprated/src/txpool/error.rs diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index d20e44631..edb46d037 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -19,17 +19,18 @@ use cuprate_types::{ VerifiedBlockInformation, }; -use crate::constants::PANIC_CRITICAL_SERVICE_ERROR; - mod chain_service; +mod error; mod fast_sync; pub mod interface; mod manager; pub(crate) mod syncer; mod types; +pub use error::{BlockManagerError, BlockValidationError, IncomingBlockError}; pub use fast_sync::get_fast_sync_hashes; pub use interface::BlockchainManagerHandle; +pub use manager::IncomingBlockOk; pub use syncer::{Syncer, SyncerHandle}; pub use types::ConsensusBlockchainReadHandle; @@ -85,17 +86,16 @@ pub async fn check_add_genesis( blockchain_read_handle: &mut BlockchainReadHandle, blockchain_write_handle: &mut BlockchainWriteHandle, network: Network, -) { +) -> anyhow::Result<()> { // Try to get the chain height, will fail if the genesis block is not in the DB. if blockchain_read_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainReadRequest::ChainHeight) .await .is_ok() { - return; + return Ok(()); } let genesis = generate_genesis_block(network); @@ -105,8 +105,7 @@ pub async fn check_add_genesis( blockchain_write_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainWriteRequest::WriteBlock( VerifiedBlockInformation { block_blob: genesis.serialize(), @@ -123,8 +122,9 @@ pub async fn check_add_genesis( block: genesis, }, )) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + .await?; + + Ok(()) } /// Initializes the consensus services. diff --git a/binaries/cuprated/src/blockchain/error.rs b/binaries/cuprated/src/blockchain/error.rs new file mode 100644 index 000000000..7df4fa90c --- /dev/null +++ b/binaries/cuprated/src/blockchain/error.rs @@ -0,0 +1,92 @@ +//! Error types for the blockchain manager interface. + +use cuprate_blockchain::BlockchainError; +use cuprate_consensus::ExtendedConsensusError; +use cuprate_consensus_rules::ConsensusError; +use cuprate_txpool::TxPoolError; +use cuprate_types::TxConversionError; + +macro_rules! impl_internal_from { + ($($t:ty),* $(,)?) => {$( + impl From<$t> for BlockManagerError { + fn from(e: $t) -> Self { Self::Internal(e.into()) } + } + impl From<$t> for IncomingBlockError { + fn from(e: $t) -> Self { BlockManagerError::from(e).into() } + } + )*}; +} + +/// A validation failure - the peer should be banned. +#[derive(Debug, thiserror::Error)] +pub enum BlockValidationError { + /// The block was received as an alt block but already exists on the + /// main chain. + #[error("Alt block already in main chain.")] + AlreadyInMainChain, + + /// The block failed consensus validation. + #[error(transparent)] + Consensus(ExtendedConsensusError), +} + +/// An error from the blockchain manager's internal handlers. +#[derive(Debug, thiserror::Error)] +pub enum BlockManagerError { + /// The peer sent us an invalid block; ban them. + #[error(transparent)] + Validation(#[from] BlockValidationError), + + /// A node-side failure. + #[error(transparent)] + Internal(#[from] tower::BoxError), +} + +impl From for BlockManagerError { + fn from(e: ExtendedConsensusError) -> Self { + if let ExtendedConsensusError::DBErr(e) = e { + return Self::Internal(e); + } + BlockValidationError::Consensus(e).into() + } +} + +impl From for BlockManagerError { + fn from(e: ConsensusError) -> Self { + BlockValidationError::Consensus(e.into()).into() + } +} + +/// An error returned from [`BlockchainManagerHandle::handle_incoming_block`](super::interface::BlockchainManagerHandle::handle_incoming_block). +#[derive(Debug, thiserror::Error)] +pub enum IncomingBlockError { + /// Surfaced from the blockchain manager. + #[error(transparent)] + Manager(#[from] BlockManagerError), + + /// We are missing the block's parent. + #[error("The block has an unknown parent.")] + Orphan, + + /// Some transactions in the block were unknown. + /// + /// The inner values are the block hash and the indexes of the missing txs in the block. + #[error("Unknown transactions in block.")] + UnknownTransactions([u8; 32], Vec), + + /// The block claimed more transactions than it contained. + #[error("Too many transactions given for block.")] + TooManyTxs, + + /// The blockchain manager command channel is closed. + #[error("The blockchain manager command channel is closed.")] + ChannelClosed, +} + +impl From for IncomingBlockError { + fn from(e: ConsensusError) -> Self { + Self::Manager(e.into()) + } +} + +impl_internal_from!(BlockchainError, TxPoolError, TxConversionError); diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index d2dacae6b..80270b3c9 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -11,7 +11,7 @@ use monero_oxide::{block::Block, transaction::Transaction}; use tokio::sync::{mpsc, oneshot}; use tower::{Service, ServiceExt}; -use cuprate_blockchain::service::BlockchainReadHandle; +use cuprate_blockchain::{service::BlockchainReadHandle, BlockchainError}; use cuprate_consensus::transactions::new_tx_verification_data; use cuprate_txpool::service::{ interface::{TxpoolReadRequest, TxpoolReadResponse}, @@ -19,9 +19,9 @@ use cuprate_txpool::service::{ }; use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; -use crate::{ - blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk}, - constants::PANIC_CRITICAL_SERVICE_ERROR, +use crate::blockchain::{ + manager::{BlockchainManagerCommand, IncomingBlockOk}, + IncomingBlockError, }; /// Handle for the blockchain manager. @@ -42,25 +42,6 @@ pub struct BlockchainManagerHandle { blocks_being_handled: Arc>>, } -/// An error that can be returned from [`BlockchainManagerHandle::handle_incoming_block`]. -#[derive(Debug, thiserror::Error)] -pub enum IncomingBlockError { - /// Some transactions in the block were unknown. - /// - /// The inner values are the block hash and the indexes of the missing txs in the block. - #[error("Unknown transactions in block.")] - UnknownTransactions([u8; 32], Vec), - /// We are missing the block's parent. - #[error("The block has an unknown parent.")] - Orphan, - /// The block was invalid. - #[error(transparent)] - InvalidBlock(anyhow::Error), - /// The blockchain manager command channel is closed. - #[error("The blockchain manager command channel is closed.")] - ChannelClosed, -} - impl BlockchainManagerHandle { /// Create a new handle and command receiver pair. pub fn new() -> (Self, mpsc::Receiver) { @@ -98,34 +79,24 @@ impl BlockchainManagerHandle { txpool_read_handle: &mut TxpoolReadHandle, ) -> Result { if given_txs.len() > block.transactions.len() { - return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!( - "Too many transactions given for block" - ))); + return Err(IncomingBlockError::TooManyTxs); } - if !block_exists(block.header.previous, blockchain_read_handle) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - { + if !block_exists(block.header.previous, blockchain_read_handle).await? { return Err(IncomingBlockError::Orphan); } let block_hash = block.hash(); - if block_exists(block_hash, blockchain_read_handle) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - { + if block_exists(block_hash, blockchain_read_handle).await? { return Ok(IncomingBlockOk::AlreadyHave); } let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone())) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? else { unreachable!() }; @@ -142,11 +113,7 @@ impl BlockchainManagerHandle { return Err(IncomingBlockError::UnknownTransactions(block_hash, missing)); }; - txs.insert( - needed_hash, - new_tx_verification_data(tx) - .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?, - ); + txs.insert(needed_hash, new_tx_verification_data(tx)?); } } @@ -185,7 +152,6 @@ impl BlockchainManagerHandle { response_rx .await .map_err(|_| IncomingBlockError::ChannelClosed)? - .map_err(IncomingBlockError::InvalidBlock) } /// Pop blocks from the top of the blockchain. @@ -211,7 +177,7 @@ impl BlockchainManagerHandle { async fn block_exists( block_hash: [u8; 32], blockchain_read_handle: &mut BlockchainReadHandle, -) -> Result { +) -> Result { let BlockchainResponse::FindBlock(chain) = blockchain_read_handle .ready() .await? diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 6d014e529..ad13d9990 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -25,7 +25,6 @@ use cuprate_types::{ use crate::{ blockchain::{chain_service::ChainService, syncer, types::ConsensusBlockchainReadHandle}, - constants::PANIC_CRITICAL_SERVICE_ERROR, txpool::TxpoolManagerHandle, LaunchContext, }; @@ -59,15 +58,18 @@ pub(crate) async fn init_blockchain_manager( let stop_current_block_downloader = Arc::new(Notify::new()); let fast_sync_hashes = launch_ctx.config.fast_sync_hashes(); - launch_ctx.task_executor.spawn(syncer.run( - launch_ctx.blockchain.context_svc(), - ChainService(launch_ctx.blockchain.read(), fast_sync_hashes), - clearnet_interface.clone(), - batch_tx, - Arc::clone(&stop_current_block_downloader), - block_downloader_config, - shutdown_token.clone(), - )); + launch_ctx.task_executor.spawn_critical( + "blockchain syncer", + syncer.run( + launch_ctx.blockchain.context_svc(), + ChainService(launch_ctx.blockchain.read(), fast_sync_hashes), + clearnet_interface.clone(), + batch_tx, + Arc::clone(&stop_current_block_downloader), + block_downloader_config, + shutdown_token.clone(), + ), + ); let manager = BlockchainManager { blockchain_write_handle, @@ -83,9 +85,10 @@ pub(crate) async fn init_blockchain_manager( fast_sync_hashes, }; - launch_ctx - .task_executor - .spawn(manager.run(batch_rx, command_rx, shutdown_token)); + launch_ctx.task_executor.spawn_critical( + "blockchain manager", + manager.run(batch_rx, command_rx, shutdown_token), + ); Ok(()) } @@ -125,7 +128,7 @@ impl BlockchainManager { mut block_batch_rx: mpsc::Receiver<(BlockBatch, Arc)>, mut command_rx: mpsc::Receiver, shutdown_token: CancellationToken, - ) { + ) -> anyhow::Result<()> { loop { tokio::select! { biased; @@ -133,21 +136,21 @@ impl BlockchainManager { break; } Some((batch, permit)) = block_batch_rx.recv() => { - self.handle_incoming_block_batch( - batch, - ).await; + self.handle_incoming_block_batch(batch) + .await + .map_err(anyhow::Error::from_boxed)?; drop(permit); } Some(incoming_command) = command_rx.recv() => { - self.handle_command(incoming_command).await; - } - else => { - break; + self.handle_command(incoming_command) + .await + .map_err(anyhow::Error::from_boxed)?; } } } tracing::info!("Blockchain manager shut down."); + Ok(()) } } diff --git a/binaries/cuprated/src/blockchain/manager/commands.rs b/binaries/cuprated/src/blockchain/manager/commands.rs index fe9a922e3..e8ed3e8e1 100644 --- a/binaries/cuprated/src/blockchain/manager/commands.rs +++ b/binaries/cuprated/src/blockchain/manager/commands.rs @@ -6,6 +6,8 @@ use tokio::sync::oneshot; use cuprate_types::TransactionVerificationData; +use crate::blockchain::IncomingBlockError; + /// The blockchain manager commands. #[expect(clippy::large_enum_variant)] pub enum BlockchainManagerCommand { @@ -16,7 +18,7 @@ pub enum BlockchainManagerCommand { /// All the transactions defined in [`Block::transactions`]. prepped_txs: HashMap<[u8; 32], TransactionVerificationData>, /// The channel to send the response down. - response_tx: oneshot::Sender>, + response_tx: oneshot::Sender>, }, /// Pop blocks from the top of the blockchain. PopBlocks { diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 6a6c06c52..c0d0e073c 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -27,51 +27,52 @@ use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRe use cuprate_txpool::service::interface::TxpoolWriteRequest; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest}, - AltBlockInformation, Chain, ChainId, HardFork, TransactionVerificationData, + AltBlockInformation, Chain, ChainId, HardFork, TransactionVerificationData, TxConversionError, VerifiedBlockInformation, VerifiedTransactionInformation, }; -use crate::{ - blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk}, - constants::PANIC_CRITICAL_SERVICE_ERROR, +use crate::blockchain::{ + manager::commands::{BlockchainManagerCommand, IncomingBlockOk}, + BlockManagerError, BlockValidationError, }; impl super::BlockchainManager { /// Handle an incoming command from another part of Cuprate. /// - /// # Panics + /// # Errors /// - /// This function will panic if any internal service returns an unexpected error that we cannot - /// recover from. - pub async fn handle_command(&mut self, command: BlockchainManagerCommand) { + /// This function will return an [`Err`] if any internal service returns an unexpected error. + pub async fn handle_command( + &mut self, + command: BlockchainManagerCommand, + ) -> Result<(), tower::BoxError> { match command { BlockchainManagerCommand::AddBlock { block, prepped_txs, response_tx, - } => { - let res = self.handle_incoming_block(block, prepped_txs).await; - - drop(response_tx.send(res)); - } + } => match self.handle_incoming_block(block, prepped_txs).await { + Err(BlockManagerError::Internal(e)) => return Err(e), + res => { + let _ = response_tx.send(res.map_err(Into::into)); + } + }, BlockchainManagerCommand::PopBlocks { numb_blocks, response_tx, } => { let reorg_lock = Arc::clone(&self.reorg_lock); let _guard = reorg_lock.write().await; - self.pop_blocks(numb_blocks).await; + self.pop_blocks(numb_blocks).await?; self.blockchain_write_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainWriteRequest::FlushAltBlocks) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); - #[expect(clippy::let_underscore_must_use)] + .await?; let _ = response_tx.send(()); } } + Ok(()) } /// Broadcast a valid block to the network. @@ -95,9 +96,9 @@ impl super::BlockchainManager { /// /// Otherwise, this function will validate and add the block to the main chain. /// - /// # Panics + /// # Errors /// - /// This function will panic if any internal service returns an unexpected error that we cannot + /// This function will return an [`Err`] if any internal service returns an unexpected error that we cannot /// recover from. #[instrument( name = "incoming_block", @@ -112,7 +113,7 @@ impl super::BlockchainManager { &mut self, block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, - ) -> Result { + ) -> Result { if block.header.previous != self .blockchain_context_service @@ -149,7 +150,7 @@ impl super::BlockchainManager { .await?; self.add_valid_block_to_main_chain(verified_block, BlockSource::Incoming) - .await; + .await?; info!( hash = hex::encode( @@ -168,10 +169,14 @@ impl super::BlockchainManager { /// This function will route to [`Self::handle_incoming_block_batch_main_chain`] or [`Self::handle_incoming_block_batch_alt_chain`] /// depending on if the first block in the batch follows from the top of our chain. /// + /// # Errors + /// + /// This function will return an [`Err`] if any internal service returns an unexpected + /// error that we cannot recover from. + /// /// # Panics /// - /// This function will panic if the batch is empty or if any internal service returns an unexpected - /// error that we cannot recover from or if the incoming batch contains no blocks. + /// This function will panic if the incoming batch contains no blocks. #[instrument( name = "incoming_block_batch", skip_all, @@ -181,7 +186,10 @@ impl super::BlockchainManager { len = batch.blocks.len() ) )] - pub async fn handle_incoming_block_batch(&mut self, batch: BlockBatch) { + pub async fn handle_incoming_block_batch( + &mut self, + batch: BlockBatch, + ) -> Result<(), tower::BoxError> { let (first_block, _) = batch .blocks .first() @@ -193,9 +201,9 @@ impl super::BlockchainManager { .blockchain_context() .top_hash { - self.handle_incoming_block_batch_main_chain(batch).await; + self.handle_incoming_block_batch_main_chain(batch).await } else { - self.handle_incoming_block_batch_alt_chain(batch).await; + self.handle_incoming_block_batch_alt_chain(batch).await } } @@ -207,26 +215,42 @@ impl super::BlockchainManager { /// This function will also handle banning the peer and canceling the block downloader if the /// block is invalid. /// + /// # Errors + /// + /// This function will return an [`Err`] if any internal service returns an unexpected error that we cannot + /// recover from. + /// /// # Panics /// - /// This function will panic if any internal service returns an unexpected error that we cannot - /// recover from or if the incoming batch contains no blocks. - async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) { - if batch.blocks.last().unwrap().0.number() < fast_sync_stop_height(self.fast_sync_hashes) { - self.handle_incoming_block_batch_fast_sync(batch).await; - return; + /// This function will panic if the incoming batch contains no blocks. + async fn handle_incoming_block_batch_main_chain( + &mut self, + batch: BlockBatch, + ) -> Result<(), tower::BoxError> { + let (last_block, _) = batch + .blocks + .last() + .expect("Block batch should not be empty"); + + if last_block.number() < fast_sync_stop_height(self.fast_sync_hashes) { + return self.handle_incoming_block_batch_fast_sync(batch).await; } - let Ok((prepped_blocks, mut output_cache)) = batch_prepare_main_chain_blocks( + let (prepped_blocks, mut output_cache) = match batch_prepare_main_chain_blocks( batch.blocks, &mut self.blockchain_context_service, self.blockchain_read_handle.clone(), ) .await - else { - batch.peer_handle.ban_peer(LONG_BAN); - self.stop_current_block_downloader.notify_waiters(); - return; + .map_err(BlockManagerError::from) + { + Ok(v) => v, + Err(BlockManagerError::Internal(e)) => return Err(e), + Err(BlockManagerError::Validation(_)) => { + batch.peer_handle.ban_peer(LONG_BAN); + self.stop_current_block_downloader.notify_waiters(); + return Ok(()); + } }; for (block, txs) in prepped_blocks { @@ -239,9 +263,11 @@ impl super::BlockchainManager { Some(&mut output_cache), ) .await + .map_err(BlockManagerError::from) { Ok(block) => block, - Err(e) => { + Err(BlockManagerError::Internal(e)) => return Err(e), + Err(BlockManagerError::Validation(e)) => { warn!( "Failed to verify block: {}, error {}, banning peer.", hex::encode(hash), @@ -249,23 +275,27 @@ impl super::BlockchainManager { ); batch.peer_handle.ban_peer(LONG_BAN); self.stop_current_block_downloader.notify_waiters(); - return; + return Ok(()); } }; self.add_valid_block_to_main_chain(verified_block, BlockSource::BatchSync) - .await; + .await?; } info!(fast_sync = false, "Successfully added block batch"); + Ok(()) } /// Handles an incoming block batch while we are under the fast sync height. /// - /// # Panics + /// # Errors /// - /// This function will panic if any internal service returns an unexpected error that we cannot + /// This function will return an [`Err`] if any internal service returns an unexpected error that we cannot /// recover from. - async fn handle_incoming_block_batch_fast_sync(&mut self, batch: BlockBatch) { + async fn handle_incoming_block_batch_fast_sync( + &mut self, + batch: BlockBatch, + ) -> Result<(), tower::BoxError> { let mut valid_blocks = Vec::with_capacity(batch.blocks.len()); for (block, txs) in batch.blocks { let block = block_to_verified_block_information( @@ -273,15 +303,16 @@ impl super::BlockchainManager { txs, self.blockchain_context_service.blockchain_context(), ); - self.add_valid_block_to_blockchain_cache(&block).await; + self.add_valid_block_to_blockchain_cache(&block).await?; valid_blocks.push(block); } self.batch_add_valid_block_to_blockchain_database(valid_blocks) - .await; + .await?; info!(fast_sync = true, "Successfully added block batch"); + Ok(()) } /// Handles an incoming [`BlockBatch`] that does not follow the main-chain. @@ -292,11 +323,14 @@ impl super::BlockchainManager { /// This function will also handle banning the peer and canceling the block downloader if the /// alt block is invalid or if a reorg fails. /// - /// # Panics + /// # Errors /// - /// This function will panic if any internal service returns an unexpected error that we cannot + /// This function will return an [`Err`] if any internal service returns an unexpected error that we cannot /// recover from. - async fn handle_incoming_block_batch_alt_chain(&mut self, mut batch: BlockBatch) { + async fn handle_incoming_block_batch_alt_chain( + &mut self, + mut batch: BlockBatch, + ) -> Result<(), tower::BoxError> { let mut blocks = batch.blocks.into_iter(); while let Some((block, txs)) = blocks.next() { @@ -310,16 +344,17 @@ impl super::BlockchainManager { let tx = new_tx_verification_data(tx)?; Ok((tx.tx_hash, tx)) }) - .collect::>()?; + .collect::>()?; let reorged = self.handle_incoming_alt_block(block, txs).await?; - Ok::<_, anyhow::Error>(reorged) + Ok::<_, BlockManagerError>(reorged) } .await; match res { - Err(e) => { + Err(BlockManagerError::Internal(e)) => return Err(e), + Err(BlockManagerError::Validation(e)) => { warn!( "Failed to verify block: {}, error {}, banning peer.", hex::encode(hash), @@ -327,18 +362,17 @@ impl super::BlockchainManager { ); batch.peer_handle.ban_peer(LONG_BAN); self.stop_current_block_downloader.notify_waiters(); - return; + return Ok(()); } Ok(AddAltBlock::Reorged) => { // Collect the remaining blocks and add them to the main chain instead. batch.blocks = blocks.collect(); if batch.blocks.is_empty() { - return; + return Ok(()); } - self.handle_incoming_block_batch_main_chain(batch).await; - return; + return self.handle_incoming_block_batch_main_chain(batch).await; } // continue adding alt blocks. Ok(AddAltBlock::NewlyCached(_) | AddAltBlock::AlreadyCached) => (), @@ -346,6 +380,7 @@ impl super::BlockchainManager { } info!(alt_chain = true, "Successfully added block batch"); + Ok(()) } /// Handles an incoming alt [`Block`]. @@ -359,32 +394,26 @@ impl super::BlockchainManager { /// This will return an [`Err`] if: /// - The alt block was invalid. /// - An attempt to reorg the chain failed. - /// - /// # Panics - /// - /// This function will panic if any internal service returns an unexpected error that we cannot - /// recover from. + /// - Any internal service returns an unexpected error. async fn handle_incoming_alt_block( &mut self, block: Block, prepared_txs: HashMap<[u8; 32], TransactionVerificationData>, - ) -> Result { + ) -> Result { // Check if a block already exists. let BlockchainResponse::FindBlock(chain) = self .blockchain_read_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainReadRequest::FindBlock(block.hash())) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? else { unreachable!(); }; match chain { Some((Chain::Alt(_), _)) => return Ok(AddAltBlock::AlreadyCached), - Some((Chain::Main, _)) => anyhow::bail!("Alt block already in main chain"), + Some((Chain::Main, _)) => return Err(BlockValidationError::AlreadyInMainChain.into()), None => (), } @@ -406,8 +435,7 @@ impl super::BlockchainManager { let block_blob = Bytes::copy_from_slice(&alt_block_info.block_blob); self.blockchain_write_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainWriteRequest::WriteAltBlock(alt_block_info)) .await?; @@ -422,31 +450,25 @@ impl super::BlockchainManager { /// /// # Errors /// - /// This function will return an [`Err`] if the re-org was unsuccessful, if this happens the chain - /// will be returned back into its state it was at when then function was called. - /// - /// # Panics - /// - /// This function will panic if any internal service returns an unexpected error that we cannot - /// recover from. + /// This function will return an [`Err`] if any internal service returns an unexpected error, + /// or if the re-org was unsuccessful. If this happens the chain + /// will be returned to the state it was in when the function was called. #[instrument(name = "try_do_reorg", skip_all, level = "info")] async fn try_do_reorg( &mut self, top_alt_block: AltBlockInformation, - ) -> Result<(), anyhow::Error> { + ) -> Result<(), BlockManagerError> { let reorg_lock = Arc::clone(&self.reorg_lock); let _guard = reorg_lock.write().await; let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self .blockchain_read_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainReadRequest::AltBlocksInChain( top_alt_block.chain_id, )) - .await - .map_err(|e| anyhow::anyhow!(e))? + .await? else { unreachable!(); }; @@ -463,7 +485,7 @@ impl super::BlockchainManager { let old_main_chain_id = self .pop_blocks(current_main_chain_height - split_height) - .await; + .await?; let reorg_res = self.verify_add_alt_blocks_to_main_chain(alt_blocks).await; @@ -480,7 +502,7 @@ impl super::BlockchainManager { Ok(()) } Err(e) => { - self.reverse_reorg(old_main_chain_id).await; + self.reverse_reorg(old_main_chain_id).await?; Err(e) } } @@ -491,22 +513,20 @@ impl super::BlockchainManager { /// This function takes the old chain's [`ChainId`] and reverts the chain state to back to before /// the reorg was attempted. /// - /// # Panics + /// # Errors /// - /// This function will panic if any internal service returns an unexpected error that we cannot + /// This function will return an [`Err`] if any internal service returns an unexpected error that we cannot /// recover from. #[instrument(name = "reverse_reorg", skip_all, level = "info")] - async fn reverse_reorg(&mut self, old_main_chain_id: ChainId) { + async fn reverse_reorg(&mut self, old_main_chain_id: ChainId) -> Result<(), tower::BoxError> { warn!("Reorg failed, reverting to old chain."); let BlockchainResponse::AltBlocksInChain(mut blocks) = self .blockchain_read_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainReadRequest::AltBlocksInChain(old_main_chain_id)) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? else { unreachable!(); }; @@ -521,7 +541,7 @@ impl super::BlockchainManager { if numb_blocks > 0 { self.pop_blocks(current_main_chain_height - split_height) - .await; + .await?; } for block in blocks { @@ -530,51 +550,46 @@ impl super::BlockchainManager { self.blockchain_context_service.blockchain_context(), ); self.add_valid_block_to_main_chain(verified_block, BlockSource::Reorg) - .await; + .await?; } self.blockchain_write_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainWriteRequest::FlushAltBlocks) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + .await?; info!("Successfully reversed reorg"); + Ok(()) } /// Pop blocks from the main chain, moving them to alt-blocks. This function will flush all other alt-blocks. /// /// This returns the [`ChainId`] of the blocks that were popped. /// - /// # Panics + /// # Errors /// - /// This function will panic if any internal service returns an unexpected error that we cannot + /// This function will return an [`Err`] if any internal service returns an unexpected error that we cannot /// recover from. #[instrument(name = "pop_blocks", skip(self), level = "info")] - async fn pop_blocks(&mut self, numb_blocks: usize) -> ChainId { + async fn pop_blocks(&mut self, numb_blocks: usize) -> Result { let BlockchainResponse::PopBlocks(old_main_chain_id) = self .blockchain_write_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainWriteRequest::PopBlocks(numb_blocks)) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? else { unreachable!(); }; self.blockchain_context_service .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockChainContextRequest::PopBlocks { numb_blocks }) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + .await?; - old_main_chain_id + Ok(old_main_chain_id) } /// Verify and add a list of [`AltBlockInformation`]s to the main-chain. @@ -585,23 +600,19 @@ impl super::BlockchainManager { /// /// # Errors /// - /// This function will return an [`Err`] if the alt-blocks were invalid, in this case the re-org should - /// be aborted and the chain should be returned to its previous state. - /// - /// # Panics - /// - /// This function will panic if any internal service returns an unexpected error that we cannot - /// recover from. + /// This function will return an [`Err`] if any internal service returns an unexpected error, + /// or if the alt-blocks are invalid. In this case the re-org should be aborted and the chain + /// returned to its previous state. async fn verify_add_alt_blocks_to_main_chain( &mut self, alt_blocks: Vec, - ) -> Result<(), anyhow::Error> { + ) -> Result<(), BlockManagerError> { for mut alt_block in alt_blocks { let prepped_txs = alt_block .txs .drain(..) - .map(|tx| Ok(tx.try_into()?)) - .collect::>()?; + .map(TryInto::try_into) + .collect::>()?; let prepped_block = PreparedBlock::new_alt_block(alt_block)?; @@ -615,7 +626,7 @@ impl super::BlockchainManager { .await?; self.add_valid_block_to_main_chain(verified_block, BlockSource::Reorg) - .await; + .await?; } Ok(()) @@ -626,15 +637,15 @@ impl super::BlockchainManager { /// This function will update the blockchain database and the context cache, /// and announce the block to peers if `source` is [`BlockSource::Incoming`]. /// - /// # Panics + /// # Errors /// - /// This function will panic if any internal service returns an unexpected error that we cannot + /// This function will return an [`Err`] if any internal service returns an unexpected error that we cannot /// recover from. async fn add_valid_block_to_main_chain( &mut self, verified_block: VerifiedBlockInformation, source: BlockSource, - ) { + ) -> Result<(), tower::BoxError> { // FIXME: this is pretty inefficient, we should probably return the KI map created in the consensus crate. let spent_key_images = verified_block .txs @@ -651,10 +662,10 @@ impl super::BlockchainManager { .then(|| Bytes::copy_from_slice(&verified_block.block_blob)); self.add_valid_block_to_blockchain_cache(&verified_block) - .await; + .await?; self.add_valid_block_to_blockchain_database(verified_block) - .await; + .await?; if let Some(block_blob) = block_blob { let chain_height = self @@ -667,24 +678,24 @@ impl super::BlockchainManager { self.txpool_manager_handle .new_block(spent_key_images) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + .await?; + + Ok(()) } /// Adds a [`VerifiedBlockInformation`] to the blockchain context cache. /// - /// # Panics + /// # Errors /// - /// This function will panic if any internal service returns an unexpected error that we cannot + /// This function will return an [`Err`] if any internal service returns an unexpected error that we cannot /// recover from. async fn add_valid_block_to_blockchain_cache( &mut self, verified_block: &VerifiedBlockInformation, - ) { + ) -> Result<(), tower::BoxError> { self.blockchain_context_service .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockChainContextRequest::Update(NewBlockData { block_hash: verified_block.block_hash, height: verified_block.height, @@ -695,48 +706,46 @@ impl super::BlockchainManager { vote: HardFork::from_vote(verified_block.block.header.hardfork_signal), cumulative_difficulty: verified_block.cumulative_difficulty, })) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + .await?; + Ok(()) } /// Writes a [`VerifiedBlockInformation`] to the blockchain database. /// - /// # Panics + /// # Errors /// - /// This function will panic if any internal service returns an unexpected error that we cannot + /// This function will return an [`Err`] if any internal service returns an unexpected error that we cannot /// recover from. async fn add_valid_block_to_blockchain_database( &mut self, verified_block: VerifiedBlockInformation, - ) { + ) -> Result<(), tower::BoxError> { self.blockchain_write_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainWriteRequest::WriteBlock(verified_block)) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + .await?; + Ok(()) } /// Batch writes the [`VerifiedBlockInformation`]s to the database. /// /// The blocks must be sequential. /// - /// # Panics + /// # Errors /// - /// This function will panic if any internal service returns an unexpected error that we cannot + /// This function will return an [`Err`] if any internal service returns an unexpected error that we cannot /// recover from. async fn batch_add_valid_block_to_blockchain_database( &mut self, blocks: Vec, - ) { + ) -> Result<(), tower::BoxError> { self.blockchain_write_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(BlockchainWriteRequest::BatchWriteBlocks(blocks)) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + .await?; + Ok(()) } } diff --git a/binaries/cuprated/src/blockchain/manager/tests.rs b/binaries/cuprated/src/blockchain/manager/tests.rs index 6f6727ec7..e8f446e9b 100644 --- a/binaries/cuprated/src/blockchain/manager/tests.rs +++ b/binaries/cuprated/src/blockchain/manager/tests.rs @@ -50,7 +50,8 @@ async fn mock_manager(data_dir: PathBuf) -> BlockchainManager { &mut blockchain_write_handle, Network::Mainnet, ) - .await; + .await + .unwrap(); let mut context_config = ContextConfig::main_net(); context_config.difficulty_cfg.fixed_difficulty = Some(1); @@ -129,7 +130,8 @@ async fn simple_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); manager_2 .handle_command(BlockchainManagerCommand::AddBlock { @@ -137,7 +139,8 @@ async fn simple_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); assert_eq!( manager_1.blockchain_context_service.blockchain_context(), @@ -154,7 +157,8 @@ async fn simple_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); manager_2 .handle_command(BlockchainManagerCommand::AddBlock { @@ -162,7 +166,8 @@ async fn simple_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); let manager_1_context = manager_1 .blockchain_context_service @@ -181,7 +186,8 @@ async fn simple_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); // make sure this didn't change the context assert_eq!( &manager_1_context, @@ -197,7 +203,8 @@ async fn simple_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); manager_2 .handle_command(BlockchainManagerCommand::AddBlock { @@ -205,7 +212,8 @@ async fn simple_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); // make sure manager 1 reorged. assert_eq!( @@ -242,7 +250,8 @@ async fn simple_reorg_block_batch() { size: 0, peer_handle: handle.1.clone(), }) - .await; + .await + .unwrap(); manager_2 .handle_incoming_block_batch(BlockBatch { @@ -250,7 +259,8 @@ async fn simple_reorg_block_batch() { size: 0, peer_handle: handle.1.clone(), }) - .await; + .await + .unwrap(); assert_eq!( manager_1.blockchain_context_service.blockchain_context(), @@ -267,7 +277,8 @@ async fn simple_reorg_block_batch() { size: 0, peer_handle: handle.1.clone(), }) - .await; + .await + .unwrap(); manager_2 .handle_incoming_block_batch(BlockBatch { @@ -275,7 +286,8 @@ async fn simple_reorg_block_batch() { size: 0, peer_handle: handle.1.clone(), }) - .await; + .await + .unwrap(); let manager_1_context = manager_1 .blockchain_context_service @@ -294,7 +306,8 @@ async fn simple_reorg_block_batch() { size: 0, peer_handle: handle.1.clone(), }) - .await; + .await + .unwrap(); // make sure this didn't change the context assert_eq!( &manager_1_context, @@ -310,7 +323,8 @@ async fn simple_reorg_block_batch() { size: 0, peer_handle: handle.1.clone(), }) - .await; + .await + .unwrap(); manager_2 .handle_incoming_block_batch(BlockBatch { @@ -318,7 +332,8 @@ async fn simple_reorg_block_batch() { size: 0, peer_handle: handle.1.clone(), }) - .await; + .await + .unwrap(); // make sure manager 1 reorged. assert_eq!( @@ -353,7 +368,8 @@ async fn recover_bad_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); let context_2 = manager_1 .blockchain_context_service @@ -368,7 +384,8 @@ async fn recover_bad_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); // Save this context for later to check the reorg gets reversed correctly. let context = manager_1 @@ -385,7 +402,8 @@ async fn recover_bad_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); // This tx is invalid and will make the reorg fail. let tx = Transaction::V2 { @@ -418,7 +436,8 @@ async fn recover_bad_reorg() { prepped_txs: HashMap::from([(tx.tx_hash, tx)]), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); let mut block_3_alt = generate_block(manager_1.blockchain_context_service.blockchain_context()); block_3_alt.header.previous = block_2_alt.hash(); @@ -434,7 +453,8 @@ async fn recover_bad_reorg() { prepped_txs: HashMap::new(), response_tx: oneshot::channel().0, }) - .await; + .await + .unwrap(); // make sure the reorg failed. assert_eq!( diff --git a/binaries/cuprated/src/constants.rs b/binaries/cuprated/src/constants.rs index 0374371fb..88607459e 100644 --- a/binaries/cuprated/src/constants.rs +++ b/binaries/cuprated/src/constants.rs @@ -20,8 +20,8 @@ pub const PATCH_VERSION: &str = env!("CARGO_PKG_VERSION_PATCH"); /// If a debug build, the suffix is `-debug`, else it is `-release`. pub const VERSION_BUILD: &str = formatcp!("{VERSION}-{}", cuprate_constants::build::BUILD); -/// The panic message used when cuprated encounters a critical service error. -pub const PANIC_CRITICAL_SERVICE_ERROR: &str = +/// The message used when cuprated encounters a critical service error. +pub const CRITICAL_SERVICE_ERROR: &str = "A service critical to Cuprate's function returned an unexpected error."; pub const DEFAULT_CONFIG_WARNING: &str = formatcp!( diff --git a/binaries/cuprated/src/lib.rs b/binaries/cuprated/src/lib.rs index 91588c157..b8522b8db 100644 --- a/binaries/cuprated/src/lib.rs +++ b/binaries/cuprated/src/lib.rs @@ -128,7 +128,7 @@ pub struct Node { impl Drop for Node { fn drop(&mut self) { - self.task_executor.trigger_shutdown(); + self.shutdown(); } } @@ -149,151 +149,161 @@ impl Node { /// or `target_max_memory` is unresolved. pub async fn launch(config: impl Into>) -> Result { let config: Arc = config.into(); + let task_executor = TaskExecutor::new(); + let shutdown_token = task_executor.cancellation_token(); + + let node: Result = async move { + // Initialize the database thread pool. + let db_thread_pool = Arc::new( + rayon::ThreadPoolBuilder::new() + .num_threads(config.storage.reader_threads) + .build() + .context("failed to build rayon database thread pool")?, + ); - // Initialize the database thread pool. - let db_thread_pool = Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(config.storage.reader_threads) - .build() - .context("failed to build rayon database thread pool")?, - ); - - // Start the blockchain & tx-pool databases. - let fjall_db = fjall::Database::builder(config.fjall_directory()) - .cache_size(config.fjall_cache_size()) - .open() - .context(DATABASE_CORRUPT_MSG)?; - - let (mut blockchain_read_handle, mut blockchain_write_handle, _) = - cuprate_blockchain::service::init_with_pool( - &config.blockchain_config(), - fjall_db.clone(), - Arc::clone(&db_thread_pool), - ) - .context(DATABASE_CORRUPT_MSG)?; + // Start the blockchain & tx-pool databases. + let fjall_db = fjall::Database::builder(config.fjall_directory()) + .cache_size(config.fjall_cache_size()) + .open() + .context(DATABASE_CORRUPT_MSG)?; - let (txpool_read_handle, txpool_write_handle) = - cuprate_txpool::service::init_with_pool(fjall_db, db_thread_pool) + let (mut blockchain_read_handle, mut blockchain_write_handle, _) = + cuprate_blockchain::service::init_with_pool( + &config.blockchain_config(), + fjall_db.clone(), + Arc::clone(&db_thread_pool), + ) .context(DATABASE_CORRUPT_MSG)?; - // TODO: Add an argument/option for keeping alt blocks between restart. - blockchain_write_handle - .ready() - .await? - .call(BlockchainWriteRequest::FlushAltBlocks) + let (txpool_read_handle, txpool_write_handle) = + cuprate_txpool::service::init_with_pool(fjall_db, db_thread_pool) + .context(DATABASE_CORRUPT_MSG)?; + + // TODO: Add an argument/option for keeping alt blocks between restart. + blockchain_write_handle + .ready() + .await? + .call(BlockchainWriteRequest::FlushAltBlocks) + .await?; + + // Check add the genesis block to the blockchain. + blockchain::check_add_genesis( + &mut blockchain_read_handle, + &mut blockchain_write_handle, + config.network(), + ) .await?; - // Check add the genesis block to the blockchain. - blockchain::check_add_genesis( - &mut blockchain_read_handle, - &mut blockchain_write_handle, - config.network(), - ) - .await; + // Start the context service and the block/tx verifier. + let context_svc = + blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config()) + .await + .map_err(anyhow::Error::from_boxed)?; - // Start the context service and the block/tx verifier. - let context_svc = - blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config()) - .await - .map_err(anyhow::Error::from_boxed)?; - - // Create the syncer and handle. - let (syncer, syncer_handle) = Syncer::new(); - - // Create the blockchain manager handle and command receiver. - let (blockchain_manager_handle, command_rx) = BlockchainManagerHandle::new(); - - // Create the blockchain interface. - let blockchain_interface = BlockchainInterface::new( - blockchain_read_handle.clone(), - context_svc.clone(), - blockchain_manager_handle.clone(), - ); - - // Create the launch context. - let launch_ctx = LaunchContext { - config, - reorg_lock: Arc::new(RwLock::new(())), - blockchain: blockchain_interface, - txpool_read: txpool_read_handle.clone(), - syncer: syncer_handle, - task_executor: TaskExecutor::new(), - }; - - // Bootstrap or configure Tor if enabled. - let tor_enabled = launch_ctx.config.p2p.tor_net.enabled; - let tor_context = initialize_tor_if_enabled(&launch_ctx).await; - - // Start clearnet P2P zone - let (clearnet_interface, clearnet_tx_handler_subscriber) = - p2p::initialize_clearnet_p2p(&launch_ctx, &tor_context).await; - - // Create Tor router delivery channel. - let (tor_router_tx, tor_router_rx) = tor_enabled.then(oneshot::channel).unzip(); - - // Create the incoming tx handler service. - let tx_handler = IncomingTxHandler::init( - &launch_ctx, - clearnet_interface.clone(), - tor_router_rx, - txpool_write_handle.clone(), - ) - .await; + // Create the syncer and handle. + let (syncer, syncer_handle) = Syncer::new(); - // Send tx handler sender to clearnet zone - if clearnet_tx_handler_subscriber - .send(tx_handler.clone()) - .is_err() - { - unreachable!() - } + // Create the blockchain manager handle and command receiver. + let (blockchain_manager_handle, command_rx) = BlockchainManagerHandle::new(); - // Tor interface channel - populated when Tor starts after sync. - let (tor_tx, tor_rx) = oneshot::channel(); - - // Initialize the blockchain manager. - blockchain::init_blockchain_manager( - &launch_ctx, - clearnet_interface.clone(), - blockchain_write_handle, - tx_handler.txpool_manager.clone(), - syncer, - command_rx, - ) - .await?; - - // Initialize the RPC server(s). - rpc::init_rpc_servers(&launch_ctx, tx_handler.clone()); - - // Start Tor P2P zone after sync completes. - if tor_enabled { - p2p::initialize_tor_p2p( - launch_ctx.clone(), - tor_context, - tx_handler, - tor_tx, - tor_router_tx, + // Create the blockchain interface. + let blockchain_interface = BlockchainInterface::new( + blockchain_read_handle.clone(), + context_svc.clone(), + blockchain_manager_handle.clone(), ); + + // Create the launch context. + let launch_ctx = LaunchContext { + config, + reorg_lock: Arc::new(RwLock::new(())), + blockchain: blockchain_interface, + txpool_read: txpool_read_handle.clone(), + syncer: syncer_handle, + task_executor, + }; + + // Bootstrap or configure Tor if enabled. + let tor_enabled = launch_ctx.config.p2p.tor_net.enabled; + let tor_context = initialize_tor_if_enabled(&launch_ctx).await; + + // Start clearnet P2P zone + let (clearnet_interface, clearnet_tx_handler_subscriber) = + p2p::initialize_clearnet_p2p(&launch_ctx, &tor_context).await?; + + // Create Tor router delivery channel. + let (tor_router_tx, tor_router_rx) = tor_enabled.then(oneshot::channel).unzip(); + + // Create the incoming tx handler service. + let tx_handler = IncomingTxHandler::init( + &launch_ctx, + clearnet_interface.clone(), + tor_router_rx, + txpool_write_handle.clone(), + ) + .await?; + + // Send tx handler sender to clearnet zone + if clearnet_tx_handler_subscriber + .send(tx_handler.clone()) + .is_err() + { + unreachable!() + } + + // Tor interface channel - populated when Tor starts after sync. + let (tor_tx, tor_rx) = oneshot::channel(); + + // Initialize the blockchain manager. + blockchain::init_blockchain_manager( + &launch_ctx, + clearnet_interface.clone(), + blockchain_write_handle, + tx_handler.txpool_manager.clone(), + syncer, + command_rx, + ) + .await?; + + // Initialize the RPC server(s). + rpc::init_rpc_servers(&launch_ctx, tx_handler.clone())?; + + // Start Tor P2P zone after sync completes. + if tor_enabled { + p2p::initialize_tor_p2p( + launch_ctx.clone(), + tor_context, + tx_handler, + tor_tx, + tor_router_tx, + ); + } + + let LaunchContext { + blockchain, + txpool_read, + syncer, + config, + task_executor, + .. + } = launch_ctx; + + Ok(Self { + blockchain, + txpool: txpool_read, + clearnet: clearnet_interface, + tor: if tor_enabled { Some(tor_rx) } else { None }, + syncer, + config, + task_executor, + }) } + .await; - let LaunchContext { - blockchain, - txpool_read, - syncer, - config, - task_executor, - .. - } = launch_ctx; - - Ok(Self { - blockchain, - txpool: txpool_read, - clearnet: clearnet_interface, - tor: if tor_enabled { Some(tor_rx) } else { None }, - syncer, - config, - task_executor, - }) + if node.is_err() { + shutdown_token.cancel(); + } + node } /// Trigger a graceful shutdown. diff --git a/binaries/cuprated/src/monitor.rs b/binaries/cuprated/src/monitor.rs index 75849d104..eccb70140 100644 --- a/binaries/cuprated/src/monitor.rs +++ b/binaries/cuprated/src/monitor.rs @@ -1,10 +1,13 @@ //! Task spawning and shutdown coordination. -use std::future::Future; +use std::{future::Future, panic::AssertUnwindSafe}; +use futures::FutureExt; use tokio::task::JoinHandle; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use tracing::info; +use tracing::{error, info}; + +use crate::constants::CRITICAL_SERVICE_ERROR; /// A handle for task spawning and shutdown coordination. #[derive(Clone, Default)] @@ -28,6 +31,38 @@ impl TaskExecutor { self.tracker.spawn(future) } + /// Spawn a tracked task that triggers shutdown if the future returns + /// early or panics. + pub fn spawn_critical(&self, name: &'static str, future: F) -> JoinHandle<()> + where + F: Future> + Send + 'static, + E: Into + Send + 'static, + { + let executor = self.clone(); + self.tracker + .spawn(AssertUnwindSafe(future).catch_unwind().map(move |result| { + match result { + Ok(Ok(())) => { + if executor.token.is_cancelled() { + // Node is shutting down, so an early exit is expected + return; + } + error!( + subsystem = name, + "critical task exited before shutdown was requested" + ); + } + Ok(Err(e)) => error!(subsystem = name, "{:#}", e.into()), + Err(payload) => error!( + subsystem = name, + err = panic_message(&payload), + "{CRITICAL_SERVICE_ERROR}", + ), + } + executor.trigger_shutdown(); + })) + } + /// Get a clone of the cancellation token. pub fn cancellation_token(&self) -> CancellationToken { self.token.clone() @@ -48,3 +83,12 @@ impl TaskExecutor { self.tracker.wait().await; } } + +/// Extracts a printable message from a `catch_unwind` panic payload. +fn panic_message(payload: &(dyn std::any::Any + Send)) -> &str { + payload + .downcast_ref::() + .map(String::as_str) + .or_else(|| payload.downcast_ref::<&'static str>().copied()) + .unwrap_or("") +} diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs index 965df201f..a2d539087 100644 --- a/binaries/cuprated/src/p2p.rs +++ b/binaries/cuprated/src/p2p.rs @@ -25,7 +25,6 @@ use cuprate_types::blockchain::BlockchainWriteRequest; use crate::{ blockchain::BlockchainInterface, - constants::PANIC_CRITICAL_SERVICE_ERROR, tor::{transport_clearnet_daemon_config, transport_daemon_config, TorContext, TorMode}, txpool::{self, IncomingTxHandler}, LaunchContext, @@ -126,7 +125,7 @@ impl NetworkInterfaces { pub async fn initialize_clearnet_p2p( launch_ctx: &LaunchContext, tor_ctx: &TorContext, -) -> (NetworkInterface, Sender) { +) -> Result<(NetworkInterface, Sender), anyhow::Error> { let config = launch_ctx.config.as_ref(); let peer_sync_callback = launch_ctx.syncer.callback(&launch_ctx.blockchain); @@ -139,45 +138,48 @@ pub async fn initialize_clearnet_p2p( &launch_ctx.blockchain, launch_ctx.txpool_read.clone(), config.clearnet_p2p_config(), - transport_clearnet_arti_config(tor_ctx), + transport_clearnet_arti_config(tor_ctx)?, Some(peer_sync_callback), ) .await - .unwrap() } - TorMode::Daemon => start_zone_p2p::( + TorMode::Daemon => { + start_zone_p2p::( + &launch_ctx.blockchain, + launch_ctx.txpool_read.clone(), + config.clearnet_p2p_config(), + transport_clearnet_daemon_config(config), + Some(peer_sync_callback), + ) + .await + } + TorMode::Auto => unreachable!("Auto mode should be resolved before this point"), + }, + ProxySettings::Disabled => { + start_zone_p2p::( &launch_ctx.blockchain, launch_ctx.txpool_read.clone(), config.clearnet_p2p_config(), - transport_clearnet_daemon_config(config), + config.p2p.clear_net.tcp_transport_config(config.network), Some(peer_sync_callback), ) .await - .unwrap(), - TorMode::Auto => unreachable!("Auto mode should be resolved before this point"), - }, - ProxySettings::Disabled => start_zone_p2p::( - &launch_ctx.blockchain, - launch_ctx.txpool_read.clone(), - config.clearnet_p2p_config(), - config.p2p.clear_net.tcp_transport_config(config.network), - Some(peer_sync_callback), - ) - .await - .unwrap(), - ProxySettings::Socks(socks_config) => start_zone_p2p::( - &launch_ctx.blockchain, - launch_ctx.txpool_read.clone(), - config.clearnet_p2p_config(), - TransportConfig { - client_config: socks_config.clone(), - server_config: None, - }, - Some(peer_sync_callback), - ) - .await - .unwrap(), + } + ProxySettings::Socks(socks_config) => { + start_zone_p2p::( + &launch_ctx.blockchain, + launch_ctx.txpool_read.clone(), + config.clearnet_p2p_config(), + TransportConfig { + client_config: socks_config.clone(), + server_config: None, + }, + Some(peer_sync_callback), + ) + .await + } } + .map_err(anyhow::Error::from_boxed) } /// Initialize the Tor P2P network zone after the node has synced with the network. @@ -210,27 +212,36 @@ pub fn initialize_tor_p2p( tracing::info!("Starting Tor P2P zone."); let config = launch_ctx.config.as_ref(); - let (tor_interface, tor_tx_handler_tx) = match tor_context.mode { - TorMode::Daemon => start_zone_p2p::( - &launch_ctx.blockchain, - launch_ctx.txpool_read.clone(), - config.tor_p2p_config(&tor_context), - transport_daemon_config(config), - None, - ) - .await - .unwrap(), - #[cfg(feature = "arti")] - TorMode::Arti => start_zone_p2p::( - &launch_ctx.blockchain, - launch_ctx.txpool_read.clone(), - config.tor_p2p_config(&tor_context), - transport_arti_config(config, tor_context), - None, - ) - .await - .unwrap(), - TorMode::Auto => unreachable!("Auto mode should be resolved before this point"), + let Ok((tor_interface, tor_tx_handler_tx)) = async { + match tor_context.mode { + TorMode::Daemon => { + start_zone_p2p::( + &launch_ctx.blockchain, + launch_ctx.txpool_read.clone(), + config.tor_p2p_config(&tor_context), + transport_daemon_config(config), + None, + ) + .await + } + #[cfg(feature = "arti")] + TorMode::Arti => { + start_zone_p2p::( + &launch_ctx.blockchain, + launch_ctx.txpool_read.clone(), + config.tor_p2p_config(&tor_context), + transport_arti_config(config, tor_context)?, + None, + ) + .await + } + TorMode::Auto => unreachable!("Auto mode should be resolved before this point"), + } + } + .await + .map_err(anyhow::Error::from_boxed) + .inspect_err(|e| tracing::error!("Failed to start Tor P2P zone: {e:#}")) else { + return; }; // Publish the Tor interface for consumers diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 5d58b5429..25599af4c 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -46,8 +46,7 @@ use cuprate_wire::protocol::{ }; use crate::{ - blockchain::interface::{BlockchainManagerHandle, IncomingBlockError}, - constants::PANIC_CRITICAL_SERVICE_ERROR, + blockchain::{interface::BlockchainManagerHandle, IncomingBlockError}, p2p::CrossNetworkInternalPeerId, txpool::{IncomingTxError, IncomingTxHandler, IncomingTxs}, }; @@ -431,8 +430,7 @@ where let res = incoming_tx_handler .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(IncomingTxs { txs, state, diff --git a/binaries/cuprated/src/rpc/server.rs b/binaries/cuprated/src/rpc/server.rs index 97d7f308e..a8edc8b43 100644 --- a/binaries/cuprated/src/rpc/server.rs +++ b/binaries/cuprated/src/rpc/server.rs @@ -10,7 +10,7 @@ use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; use tower::limit::rate::RateLimitLayer; use tower_http::limit::RequestBodyLimitLayer; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use cuprate_rpc_interface::{RouterBuilder, RpcHandler}; @@ -23,12 +23,14 @@ use crate::{ /// Initialize the RPC server(s). /// -/// # Panics -/// This function will panic if: -/// - the server(s) could not be started -/// - unrestricted RPC is started on non-local -/// address without override option -pub fn init_rpc_servers(launch_ctx: &LaunchContext, tx_handler: IncomingTxHandler) { +/// # Errors +/// +/// This function will return an [`Err`] if unrestricted RPC is started on a +/// non-local address without the override option. +pub fn init_rpc_servers( + launch_ctx: &LaunchContext, + tx_handler: IncomingTxHandler, +) -> Result<(), Error> { let config = &launch_ctx.config.rpc; for ((enable, addr, port, request_byte_limit), restricted) in [ ( @@ -65,7 +67,7 @@ pub fn init_rpc_servers(launch_ctx: &LaunchContext, tx_handler: IncomingTxHandle "Starting unrestricted RPC on non-local address, this is dangerous!" ); } else { - panic!("Refusing to start unrestricted RPC on a non-local address ({addr})"); + anyhow::bail!("Refusing to start unrestricted RPC on a non-local address ({addr})"); } } @@ -73,7 +75,7 @@ pub fn init_rpc_servers(launch_ctx: &LaunchContext, tx_handler: IncomingTxHandle let shutdown_token = launch_ctx.task_executor.cancellation_token(); launch_ctx.task_executor.spawn(async move { - run_rpc_server( + if let Err(e) = run_rpc_server( rpc_handler, restricted, SocketAddr::new(addr, port), @@ -81,10 +83,13 @@ pub fn init_rpc_servers(launch_ctx: &LaunchContext, tx_handler: IncomingTxHandle shutdown_token, ) .await - .unwrap(); - info!(restricted, "RPC server shut down."); + { + error!(restricted, "Failed to start RPC server: {e:#}"); + } }); } + + Ok(()) } /// This initializes and runs an RPC server. @@ -127,5 +132,6 @@ async fn run_rpc_server( .with_graceful_shutdown(shutdown_token.cancelled_owned()) .await?; + info!(restricted, "RPC server shut down."); Ok(()) } diff --git a/binaries/cuprated/src/rpc/service/tx_handler.rs b/binaries/cuprated/src/rpc/service/tx_handler.rs index 683822c17..b9f2a0baa 100644 --- a/binaries/cuprated/src/rpc/service/tx_handler.rs +++ b/binaries/cuprated/src/rpc/service/tx_handler.rs @@ -5,7 +5,9 @@ use tower::{Service, ServiceExt}; use cuprate_types::TxRelayChecks; -use crate::txpool::{IncomingTxError, IncomingTxHandler, IncomingTxs, RelayRuleError}; +use crate::txpool::{ + IncomingTxError, IncomingTxHandler, IncomingTxs, RelayRuleError, TxValidationError, +}; pub async fn handle_incoming_txs( tx_handler: &mut IncomingTxHandler, @@ -20,10 +22,11 @@ pub async fn handle_incoming_txs( Ok(match resp { Ok(()) => TxRelayChecks::empty(), - Err(e) => match e { - IncomingTxError::Consensus(ExtendedConsensusError::ConErr( - ConsensusError::Transaction(e), - )) => match e { + Err(IncomingTxError::Internal(e)) => return Err(anyhow!(e)), + Err(IncomingTxError::Validation(e)) => match e { + TxValidationError::Consensus(ExtendedConsensusError::ConErr( + ConsensusError::Transaction(ref tx_err), + )) => match tx_err { TransactionError::TooBig => TxRelayChecks::TOO_BIG, TransactionError::KeyImageSpent => TxRelayChecks::DOUBLE_SPEND, @@ -49,21 +52,21 @@ pub async fn handle_incoming_txs( | TransactionError::RingMemberNotFoundOrInvalid | TransactionError::RingSignatureIncorrect | TransactionError::TransactionVersionInvalid - | TransactionError::RingCTError(_) => return Err(anyhow!("unreachable")), + | TransactionError::RingCTError(_) => return Err(anyhow!(e)), }, - IncomingTxError::Parse(_) | IncomingTxError::Consensus(_) => { - return Err(anyhow!("unreachable")) + TxValidationError::Parse(_) | TxValidationError::Consensus(_) => { + return Err(anyhow!(e)) } - IncomingTxError::RelayRule(RelayRuleError::NonZeroTimelock) => { + TxValidationError::RelayRule(RelayRuleError::NonZeroTimelock) => { TxRelayChecks::NONZERO_UNLOCK_TIME } - IncomingTxError::RelayRule(RelayRuleError::ExtraFieldTooLarge) => { + TxValidationError::RelayRule(RelayRuleError::ExtraFieldTooLarge) => { TxRelayChecks::TX_EXTRA_TOO_BIG } - IncomingTxError::RelayRule(RelayRuleError::FeeBelowMinimum) => { + TxValidationError::RelayRule(RelayRuleError::FeeBelowMinimum) => { TxRelayChecks::FEE_TOO_LOW } - IncomingTxError::DuplicateTransaction => TxRelayChecks::DOUBLE_SPEND, + TxValidationError::DuplicateTransaction => TxRelayChecks::DOUBLE_SPEND, }, }) } diff --git a/binaries/cuprated/src/tor.rs b/binaries/cuprated/src/tor.rs index 872e55750..ca8ac4542 100644 --- a/binaries/cuprated/src/tor.rs +++ b/binaries/cuprated/src/tor.rs @@ -172,47 +172,54 @@ fn initialize_arti_onion_service(client_config: &TorClientConfig, config: &Confi //---------------------------------------------------------------------------------------------------- Transport configuration #[cfg(feature = "arti")] -pub fn transport_arti_config(config: &Config, ctx: TorContext) -> TransportConfig { +pub fn transport_arti_config( + config: &Config, + ctx: TorContext, +) -> Result, anyhow::Error> { // Extracting let (Some(bootstrapped_client), Some(client_config)) = (ctx.bootstrapped_client, ctx.arti_client_config) else { - panic!("Arti client should be initialized"); + anyhow::bail!("Arti client should be initialized"); }; - let server_config = config.p2p.tor_net.inbound_onion.then(|| { + let server_config = if config.p2p.tor_net.inbound_onion { let Some(onion_svc) = ctx.arti_onion_service else { - panic!("inbound onion enabled, but no onion service initialized!"); + anyhow::bail!("inbound onion enabled, but no onion service initialized!"); }; - ArtiServerConfig::new( + Some(ArtiServerConfig::new( onion_svc, p2p_port(config.p2p.tor_net.p2p_port, config.network), &bootstrapped_client, &client_config, - ) - }); + )) + } else { + None + }; - TransportConfig:: { + Ok(TransportConfig:: { client_config: ArtiClientConfig { client: bootstrapped_client, }, server_config, - } + }) } #[cfg(feature = "arti")] -pub fn transport_clearnet_arti_config(ctx: &TorContext) -> TransportConfig { +pub fn transport_clearnet_arti_config( + ctx: &TorContext, +) -> Result, anyhow::Error> { let Some(bootstrapped_client) = &ctx.bootstrapped_client else { - panic!("Arti enabled but no TorClient initialized!"); + anyhow::bail!("Arti enabled but no TorClient initialized!"); }; - TransportConfig:: { + Ok(TransportConfig:: { client_config: ArtiClientConfig { client: Arc::clone(bootstrapped_client), }, server_config: None, - } + }) } pub fn transport_daemon_config(config: &Config) -> TransportConfig { diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index f70ad92ed..2a6807497 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -7,11 +7,13 @@ use cuprate_p2p_core::ClearNet; use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; mod dandelion; +mod error; mod incoming_tx; mod manager; mod relay_rules; mod txs_being_handled; -pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs}; +pub use error::{IncomingTxError, TxValidationError}; +pub use incoming_tx::{IncomingTxHandler, IncomingTxs}; pub use manager::{PoolInfoSinceResponse, TxpoolManagerCommand, TxpoolManagerHandle}; pub use relay_rules::RelayRuleError; diff --git a/binaries/cuprated/src/txpool/error.rs b/binaries/cuprated/src/txpool/error.rs new file mode 100644 index 000000000..42bf1a911 --- /dev/null +++ b/binaries/cuprated/src/txpool/error.rs @@ -0,0 +1,72 @@ +//! Error types for incoming transaction handling. + +use cuprate_consensus::ExtendedConsensusError; +use cuprate_consensus_rules::ConsensusError; +use cuprate_txpool::TxPoolError; + +use crate::txpool::relay_rules::RelayRuleError; + +/// A validation failure - the tx should be dropped. +#[derive(Debug, thiserror::Error)] +pub enum TxValidationError { + /// The transaction could not be parsed. + #[error("Error parsing tx: {0}")] + Parse(#[from] std::io::Error), + + /// The transaction violated a consensus rule. + #[error(transparent)] + Consensus(ExtendedConsensusError), + + /// A duplicate transaction appeared in the incoming batch. + #[error("Duplicate tx in message.")] + DuplicateTransaction, + + /// A relay rule was broken. + #[error("Relay rule was broken: {0}")] + RelayRule(#[from] RelayRuleError), +} + +/// An error returned while handling an incoming transaction. +#[derive(Debug, thiserror::Error)] +pub enum IncomingTxError { + /// The tx was rejected by validation; drop it. + #[error(transparent)] + Validation(#[from] TxValidationError), + + /// An inner tower service returned an error. + #[error(transparent)] + Internal(#[from] tower::BoxError), +} + +impl From for IncomingTxError { + fn from(e: ExtendedConsensusError) -> Self { + if let ExtendedConsensusError::DBErr(e) = e { + return Self::Internal(e); + } + TxValidationError::Consensus(e).into() + } +} + +impl From for IncomingTxError { + fn from(e: ConsensusError) -> Self { + TxValidationError::Consensus(e.into()).into() + } +} + +impl From for IncomingTxError { + fn from(e: std::io::Error) -> Self { + TxValidationError::from(e).into() + } +} + +impl From for IncomingTxError { + fn from(e: RelayRuleError) -> Self { + TxValidationError::from(e).into() + } +} + +impl From for IncomingTxError { + fn from(e: TxPoolError) -> Self { + Self::Internal(e.into()) + } +} diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index 6104db032..e85d0cbeb 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -15,7 +15,6 @@ use cuprate_blockchain::service::BlockchainReadHandle; use cuprate_consensus::{ transactions::{new_tx_verification_data, start_tx_verification, PrepTransactions}, BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService, - ExtendedConsensusError, }; use cuprate_dandelion_tower::{ pool::{DandelionPoolService, IncomingTxBuilder}, @@ -31,38 +30,25 @@ use cuprate_txpool::{ }, TxpoolReadHandle, TxpoolWriteHandle, }, - transaction_blob_hash, + transaction_blob_hash, TxPoolError, }; use cuprate_types::TransactionVerificationData; use crate::{ blockchain::ConsensusBlockchainReadHandle, - constants::PANIC_CRITICAL_SERVICE_ERROR, p2p::CrossNetworkInternalPeerId, txpool::{ dandelion::{ self, AnonTxService, ConcreteDandelionRouter, DiffuseService, MainDandelionRouter, }, manager::{start_txpool_manager, TxpoolManagerCommand, TxpoolManagerHandle}, - relay_rules::{check_tx_relay_rules, RelayRuleError}, + relay_rules::check_tx_relay_rules, txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally}, + IncomingTxError, TxValidationError, }, LaunchContext, }; -/// An error that can happen handling an incoming tx. -#[derive(Debug, thiserror::Error)] -pub enum IncomingTxError { - #[error("Error parsing tx: {0}")] - Parse(std::io::Error), - #[error(transparent)] - Consensus(ExtendedConsensusError), - #[error("Duplicate tx in message")] - DuplicateTransaction, - #[error("Relay rule was broken: {0}")] - RelayRule(RelayRuleError), -} - /// Incoming transactions. pub struct IncomingTxs { /// The raw bytes of the transactions. @@ -115,7 +101,7 @@ impl IncomingTxHandler { clear_net: NetworkInterface, tor_net_rx: Option>>, txpool_write_handle: TxpoolWriteHandle, - ) -> Self { + ) -> anyhow::Result { let txpool_config = launch_ctx.config.storage.txpool.clone(); let diffuse_service = DiffuseService { @@ -142,9 +128,9 @@ impl IncomingTxHandler { txpool_config, launch_ctx.task_executor.clone(), ) - .await; + .await?; - Self { + Ok(Self { txs_being_handled: TxsBeingHandled::new(), blockchain_context_cache: launch_ctx.blockchain.context_svc(), dandelion_pool_manager, @@ -155,7 +141,7 @@ impl IncomingTxHandler { BoxError::from, ), reorg_lock: Arc::clone(&launch_ctx.reorg_lock), - } + }) } } @@ -209,8 +195,7 @@ async fn handle_incoming_txs( let txs = start_tx_verification() .append_prepped_txs(txs) - .prepare() - .map_err(|e| IncomingTxError::Consensus(e.into()))? + .prepare()? .full( context.chain_height, context.top_hash, @@ -220,8 +205,7 @@ async fn handle_incoming_txs( None, ) .verify() - .await - .map_err(IncomingTxError::Consensus)?; + .await?; for tx in txs { // TODO: this could be a DoS, if someone spams us with txs that violate these rules? @@ -232,7 +216,7 @@ async fn handle_incoming_txs( continue; } - return Err(IncomingTxError::RelayRule(e)); + return Err(TxValidationError::RelayRule(e).into()); } tracing::debug!( @@ -299,7 +283,9 @@ async fn prepare_incoming_txs( // If a duplicate is in here the incoming tx batch contained the same tx twice. if !tx_blob_hashes.insert(tx_blob_hash) { tracing::debug!("peer sent duplicate tx in batch, ignoring batch."); - return Some(Err(IncomingTxError::DuplicateTransaction)); + return Some(Err(IncomingTxError::Validation( + TxValidationError::DuplicateTransaction, + ))); } // If a duplicate is here it is being handled in another batch. @@ -318,11 +304,9 @@ async fn prepare_incoming_txs( stem_pool_hashes, } = txpool_read_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(TxpoolReadRequest::FilterKnownTxBlobHashes(tx_blob_hashes)) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? else { unreachable!() }; @@ -339,10 +323,9 @@ async fn prepare_incoming_txs( } }) .map(|bytes| { - let tx = Transaction::read(&mut bytes.as_ref()).map_err(IncomingTxError::Parse)?; + let tx = Transaction::read(&mut bytes.as_ref())?; - let tx = new_tx_verification_data(tx) - .map_err(|e| IncomingTxError::Consensus(e.into()))?; + let tx = new_tx_verification_data(tx)?; Ok(tx) }) @@ -364,15 +347,25 @@ async fn rerelay_stem_tx( CrossNetworkInternalPeerId, >, ) { - let Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) = txpool_read_handle - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(TxpoolReadRequest::TxBlob(*tx_hash)) - .await - else { - // The tx could have been dropped from the pool. - return; + let svc = match txpool_read_handle.ready().await { + Ok(svc) => svc, + Err(e) => { + tracing::warn!("Failed to acquire txpool read service for stem re-relay: {e}"); + return; + } + }; + let tx_blob = match svc.call(TxpoolReadRequest::TxBlob(*tx_hash)).await { + Ok(TxpoolReadResponse::TxBlob { tx_blob, .. }) => tx_blob, + Ok(_) => unreachable!(), + Err(TxPoolError::NotFound) => { + // The tx was dropped from the pool + return; + } + Err(e) => { + // The service returned an error + tracing::warn!("Failed to fetch stem tx for re-relay: {e}"); + return; + } }; let incoming_tx = @@ -384,11 +377,15 @@ async fn rerelay_stem_tx( .build() .unwrap(); - dandelion_pool_manager - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(incoming_tx) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + if let Err(e) = async { + dandelion_pool_manager + .ready() + .await? + .call(incoming_tx) + .await + } + .await + { + tracing::warn!("Dandelion pool manager failed for stem re-relay: {e}"); + } } diff --git a/binaries/cuprated/src/txpool/manager.rs b/binaries/cuprated/src/txpool/manager.rs index b16920886..9905ade82 100644 --- a/binaries/cuprated/src/txpool/manager.rs +++ b/binaries/cuprated/src/txpool/manager.rs @@ -20,15 +20,19 @@ use cuprate_dandelion_tower::{ }; use cuprate_helper::time::current_unix_timestamp; use cuprate_p2p_core::ClearNet; -use cuprate_txpool::service::{ - interface::{TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse}, - TxpoolReadHandle, TxpoolWriteHandle, +use cuprate_txpool::{ + service::{ + interface::{ + TxpoolReadRequest, TxpoolReadResponse, TxpoolWriteRequest, TxpoolWriteResponse, + }, + TxpoolReadHandle, TxpoolWriteHandle, + }, + TxPoolError, }; use cuprate_types::TransactionVerificationData; use crate::{ config::TxpoolConfig, - constants::PANIC_CRITICAL_SERVICE_ERROR, monitor::TaskExecutor, p2p::{CrossNetworkInternalPeerId, NetworkInterfaces}, txpool::{ @@ -46,9 +50,9 @@ const MAX_RECENTLY_REMOVED_TXS: usize = 5000; /// Starts the transaction pool manager service. /// -/// # Panics +/// # Errors /// -/// This function may panic if any inner service has an unrecoverable error. +/// This function will return an [`Err`] if any inner service has an unrecoverable error. pub async fn start_txpool_manager( mut txpool_write_handle: TxpoolWriteHandle, mut txpool_read_handle: TxpoolReadHandle, @@ -57,14 +61,12 @@ pub async fn start_txpool_manager( dandelion_pool_manager: DandelionPoolService, config: TxpoolConfig, task_executor: TaskExecutor, -) -> TxpoolManagerHandle { +) -> anyhow::Result { let TxpoolReadResponse::Backlog(backlog) = txpool_read_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(TxpoolReadRequest::Backlog) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? else { unreachable!() }; @@ -121,19 +123,22 @@ pub async fn start_txpool_manager( tracing::info!(stem_txs = stem_txs.len(), "promoting stem txs"); for tx in stem_txs { - manager.promote_tx(tx).await; + manager.promote_tx(tx).await?; } let (command_tx, command_rx) = mpsc::channel(INCOMING_TX_QUEUE_SIZE); let (spent_kis_tx, spent_kis_rx) = mpsc::channel(1); let shutdown_token = task_executor.cancellation_token(); - task_executor.spawn(manager.run(command_rx, spent_kis_rx, shutdown_token)); + task_executor.spawn_critical( + "txpool manager", + manager.run(command_rx, spent_kis_rx, shutdown_token), + ); - TxpoolManagerHandle { + Ok(TxpoolManagerHandle { command_tx, spent_kis_tx, - } + }) } /// Commands sent to the [`TxpoolManager`] via [`TxpoolManagerHandle`]. @@ -212,13 +217,15 @@ impl TxpoolManagerHandle { } /// Tell the tx-pool about spent key images in an incoming block. - pub async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) -> anyhow::Result<()> { + pub async fn new_block( + &mut self, + spent_key_images: Vec<[u8; 32]>, + ) -> Result<(), tower::BoxError> { let (tx, rx) = oneshot::channel(); drop(self.spent_kis_tx.send((spent_key_images, tx)).await); - rx.await - .map_err(|_| anyhow::anyhow!("txpool manager stopped")) + rx.await.map_err(|_| "txpool manager stopped".into()) } } @@ -284,9 +291,21 @@ impl TxpoolManager { /// /// This function will panic if the tx is not in the tx-pool manager. #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))] - async fn remove_tx_from_pool(&mut self, tx: [u8; 32], remove_from_db: bool) { + async fn remove_tx_from_pool( + &mut self, + tx: [u8; 32], + remove_from_db: bool, + ) -> Result<(), TxPoolError> { tracing::debug!("removing tx from pool"); + if remove_from_db { + self.txpool_write_handle + .ready() + .await? + .call(TxpoolWriteRequest::RemoveTransaction(tx)) + .await?; + } + let tx_info = self.current_txs.swap_remove(&tx).unwrap(); tx_info @@ -307,15 +326,7 @@ impl TxpoolManager { } } - if remove_from_db { - self.txpool_write_handle - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(TxpoolWriteRequest::RemoveTransaction(tx)) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); - } + Ok(()) } /// Re-relay a tx to the network. @@ -324,20 +335,15 @@ impl TxpoolManager { /// /// This function will panic if the tx is not in the tx-pool. #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))] - async fn rerelay_tx(&mut self, tx: [u8; 32]) { + async fn rerelay_tx(&mut self, tx: [u8; 32]) -> Result<(), TxPoolError> { tracing::debug!("re-relaying tx to network"); - let TxpoolReadResponse::TxBlob { - tx_blob, - state_stem: _, - } = self + let TxpoolReadResponse::TxBlob { tx_blob, .. } = self .txpool_read_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(TxpoolReadRequest::TxBlob(tx)) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? else { unreachable!() }; @@ -345,16 +351,18 @@ impl TxpoolManager { self.diffuse_service .call(DiffuseRequest(DandelionTx(Bytes::from(tx_blob)))) .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + .expect("Diffuse service should not return an error"); + + Ok(()) } /// Handles a transaction timeout, be either rebroadcasting or dropping the tx from the pool. /// If a rebroadcast happens, this function will handle adding another timeout to the queue. #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))] - async fn handle_tx_timeout(&mut self, tx: [u8; 32]) { + async fn handle_tx_timeout(&mut self, tx: [u8; 32]) -> Result<(), TxPoolError> { let Some(tx_info) = self.current_txs.get(&tx) else { tracing::warn!("tx timed out, but tx not in pool"); - return; + return Ok(()); }; let time_in_pool = current_unix_timestamp() - tx_info.received_at; @@ -363,15 +371,15 @@ impl TxpoolManager { // slightly off. if time_in_pool + 10 > self.config.maximum_age_secs { tracing::warn!("tx has been in pool too long, removing from pool"); - self.remove_tx_from_pool(tx, true).await; - return; + self.remove_tx_from_pool(tx, true).await?; + return Ok(()); } let received_at = tx_info.received_at; tracing::debug!(time_in_pool, "tx timed out, resending to network"); - self.rerelay_tx(tx).await; + self.rerelay_tx(tx).await?; let tx_info = self.current_txs.get_mut(&tx).unwrap(); @@ -382,6 +390,8 @@ impl TxpoolManager { self.tx_timeouts .insert(tx, Duration::from_secs(next_timeout)), ); + + Ok(()) } /// Adds a tx to the tx-pool manager. @@ -420,7 +430,7 @@ impl TxpoolManager { &mut self, tx: TransactionVerificationData, state: TxState, - ) { + ) -> Result<(), TxPoolError> { tracing::debug!("handling new tx"); let incoming_tx = @@ -431,14 +441,12 @@ impl TxpoolManager { let TxpoolWriteResponse::AddTransaction(double_spend) = self .txpool_write_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(TxpoolWriteRequest::AddTransaction { tx: Box::new(tx), state_stem: state.is_stem_stage(), }) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? else { unreachable!() }; @@ -448,7 +456,7 @@ impl TxpoolManager { double_spent = hex::encode(tx_hash), "transaction is a double spend, ignoring" ); - return; + return Ok(()); } self.track_tx(tx_hash, tx_weight, tx_fee, state.is_stem_stage()); @@ -459,31 +467,43 @@ impl TxpoolManager { .build() .unwrap(); - self.dandelion_pool_manager - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(incoming_tx) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + if let Err(e) = async { + self.dandelion_pool_manager + .ready() + .await? + .call(incoming_tx) + .await + } + .await + { + tracing::warn!("Dandelion pool manager failed for incoming tx: {e}"); + } + + Ok(()) } /// Promote a tx to the public pool. #[instrument(level = "debug", skip_all, fields(tx_id = hex::encode(tx)))] - async fn promote_tx(&mut self, tx: [u8; 32]) { + async fn promote_tx(&mut self, tx: [u8; 32]) -> Result<(), TxPoolError> { let Some(tx_info) = self.current_txs.get_mut(&tx) else { tracing::debug!("not promoting tx, tx not in pool"); - return; + return Ok(()); }; if !tx_info.private { tracing::trace!("not promoting tx, tx is already public"); - return; + return Ok(()); } - tx_info.private = false; tracing::debug!("promoting tx"); + self.txpool_write_handle + .ready() + .await? + .call(TxpoolWriteRequest::Promote(tx)) + .await?; + + tx_info.private = false; // It's now in the public pool, pretend we just saw it. tx_info.received_at = current_unix_timestamp(); self.public_pool_timestamps @@ -497,13 +517,7 @@ impl TxpoolManager { .insert(tx, Duration::from_secs(next_timeout)), ); - self.txpool_write_handle - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(TxpoolWriteRequest::Promote(tx)) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + Ok(()) } /// Returns the hashes of all public-pool transactions that entered the public pool at or @@ -526,33 +540,31 @@ impl TxpoolManager { /// Handles removing all transactions that have been included/double spent in an incoming block. #[instrument(level = "debug", skip_all)] - async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) { + async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) -> Result<(), TxPoolError> { tracing::debug!("handling new block"); let TxpoolWriteResponse::NewBlock(removed_txs) = self .txpool_write_handle .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? .call(TxpoolWriteRequest::NewBlock { spent_key_images }) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) + .await? else { unreachable!() }; for tx in removed_txs { - self.remove_tx_from_pool(tx, false).await; + self.remove_tx_from_pool(tx, false).await?; } + Ok(()) } - #[expect(clippy::let_underscore_must_use)] async fn run( mut self, mut command_rx: mpsc::Receiver, mut block_rx: mpsc::Receiver<(Vec<[u8; 32]>, oneshot::Sender<()>)>, shutdown_token: CancellationToken, - ) { + ) -> Result<(), TxPoolError> { loop { tokio::select! { biased; @@ -560,16 +572,16 @@ impl TxpoolManager { break; } Some((spent_kis, tx)) = block_rx.recv() => { - self.new_block(spent_kis).await; + self.new_block(spent_kis).await?; let _ = tx.send(()); } Some(tx) = self.tx_timeouts.next() => { - self.handle_tx_timeout(tx.into_inner()).await; + self.handle_tx_timeout(tx.into_inner()).await?; } Some(command) = command_rx.recv() => { match command { TxpoolManagerCommand::IncomingTx(tx, state) => { - self.handle_incoming_tx(tx, state).await; + self.handle_incoming_tx(tx, state).await?; } TxpoolManagerCommand::PoolInfoSince { since, response_tx } => { // If `since` is 0 the requester wants the full pool. If `since` is older than `removed_txs_start_time`, @@ -595,12 +607,13 @@ impl TxpoolManager { } } Some(tx) = self.promote_tx_channel.recv() => { - self.promote_tx(tx).await; + self.promote_tx(tx).await?; } } } tracing::info!("Txpool manager shut down."); + Ok(()) } } diff --git a/types/types/src/lib.rs b/types/types/src/lib.rs index da76352cc..5bc94902d 100644 --- a/types/types/src/lib.rs +++ b/types/types/src/lib.rs @@ -21,7 +21,7 @@ pub use block_complete_entry::{BlockCompleteEntry, PrunedTxBlobEntry, Transactio pub use connection_state::ConnectionState; pub use hard_fork::{HardFork, HardForkError}; pub use transaction_verification_data::{ - CachedVerificationState, TransactionVerificationData, TxVersion, + CachedVerificationState, TransactionVerificationData, TxConversionError, TxVersion, }; pub use types::{ AltBlockInformation, BlockTemplate, Chain, ChainId, ExtendedBlockHeader, From 93a227bf6fdfd95156a0c3ce426f6e2b961228ac Mon Sep 17 00:00:00 2001 From: redsh4de <25299353+redsh4de@users.noreply.github.com> Date: Wed, 20 May 2026 11:25:50 -0700 Subject: [PATCH 2/5] keep log guard alive until shutdown --- binaries/cuprated/src/logging.rs | 13 +++++++------ binaries/cuprated/src/main.rs | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/binaries/cuprated/src/logging.rs b/binaries/cuprated/src/logging.rs index 9666f91cf..2ae795199 100644 --- a/binaries/cuprated/src/logging.rs +++ b/binaries/cuprated/src/logging.rs @@ -5,13 +5,15 @@ use std::ops::BitAnd; use std::{ fmt::{Display, Formatter}, io::IsTerminal, - mem::forget, sync::OnceLock, }; use tracing::{ instrument::WithSubscriber, level_filters::LevelFilter, subscriber::Interest, Metadata, }; -use tracing_appender::{non_blocking::NonBlocking, rolling::Rotation}; +use tracing_appender::{ + non_blocking::{NonBlocking, WorkerGuard}, + rolling::Rotation, +}; use tracing_subscriber::{ filter::Filtered, fmt::{ @@ -84,7 +86,7 @@ impl Filter for CupratedTracingFilter { } /// Initialize [`tracing`] for logging to stdout and to a file. -pub fn init_logging(config: &Config) { +pub fn init_logging(config: &Config) -> WorkerGuard { // initialize the stdout filter, set `STDOUT_FILTER_HANDLE` and create the layer. let (stdout_filter, stdout_handle) = ReloadLayer::new(CupratedTracingFilter { level: config.tracing.stdout.level, @@ -107,9 +109,6 @@ pub fn init_logging(config: &Config) { .unwrap(), ); - // TODO: drop this when we shutdown. - forget(guard); - // initialize the appender filter, set `FILE_WRITER_FILTER_HANDLE` and create the layer. let (appender_filter, appender_handle) = ReloadLayer::new(CupratedTracingFilter { level: appender_config.level, @@ -127,6 +126,8 @@ pub fn init_logging(config: &Config) { .with(appender_layer) .with(stdout_layer) .init(); + + guard } /// Modify the stdout [`CupratedTracingFilter`]. diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index d4776323e..93fbda8f1 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -55,7 +55,7 @@ fn main() { let mut config = load_config(&args); // Initialize logging. - cuprated::logging::init_logging(&config); + let _log_guard = cuprated::logging::init_logging(&config); // Resolve available memory. resolve_max_memory(&mut config); From 8cc214568abfaaf23da20619f3d5545bec90330c Mon Sep 17 00:00:00 2001 From: redsh4de <25299353+redsh4de@users.noreply.github.com> Date: Mon, 25 May 2026 22:08:22 +0300 Subject: [PATCH 3/5] ban peer on validation errors in new_fluffy_block --- binaries/cuprated/src/blockchain/error.rs | 19 +++++++++++++--- binaries/cuprated/src/p2p/request_handler.rs | 23 ++++++++++++++++++-- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/binaries/cuprated/src/blockchain/error.rs b/binaries/cuprated/src/blockchain/error.rs index 7df4fa90c..4b35bbb93 100644 --- a/binaries/cuprated/src/blockchain/error.rs +++ b/binaries/cuprated/src/blockchain/error.rs @@ -60,9 +60,13 @@ impl From for BlockManagerError { /// An error returned from [`BlockchainManagerHandle::handle_incoming_block`](super::interface::BlockchainManagerHandle::handle_incoming_block). #[derive(Debug, thiserror::Error)] pub enum IncomingBlockError { - /// Surfaced from the blockchain manager. + /// The peer sent us an invalid block; ban them. + #[error(transparent)] + Validation(BlockValidationError), + + /// A node-side failure. #[error(transparent)] - Manager(#[from] BlockManagerError), + Internal(tower::BoxError), /// We are missing the block's parent. #[error("The block has an unknown parent.")] @@ -83,9 +87,18 @@ pub enum IncomingBlockError { ChannelClosed, } +impl From for IncomingBlockError { + fn from(e: BlockManagerError) -> Self { + match e { + BlockManagerError::Validation(v) => Self::Validation(v), + BlockManagerError::Internal(i) => Self::Internal(i), + } + } +} + impl From for IncomingBlockError { fn from(e: ConsensusError) -> Self { - Self::Manager(e.into()) + BlockManagerError::from(e).into() } } diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 25599af4c..3849b3d19 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -29,7 +29,7 @@ use cuprate_helper::{ map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits}, }; use cuprate_p2p::constants::{ - MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, MEDIUM_BAN, + LONG_BAN, MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, }; use cuprate_p2p_core::{ client::{InternalPeerID, PeerInformation}, @@ -354,6 +354,7 @@ async fn new_fluffy_block( return Ok(ProtocolResponse::NA); } + let block_hash = block.hash(); let res = blockchain_manager .handle_incoming_block( block, @@ -380,7 +381,21 @@ async fn new_fluffy_block( // Manager has exited (likely shutdown); drop silently. Ok(ProtocolResponse::NA) } - Err(e) => Err(e.into()), + Err(IncomingBlockError::Internal(e)) => { + tracing::error!("Failed to handle incoming block: {e}"); + Ok(ProtocolResponse::NA) + } + Err(e) => { + if matches!(e, IncomingBlockError::Validation(_)) { + tracing::warn!( + "Failed to verify block: {}, error {}, banning peer.", + hex::encode(block_hash), + e + ); + peer_information.handle.ban_peer(LONG_BAN); + } + Err(e.into()) + } } } @@ -441,6 +456,10 @@ where match res { Ok(()) => Ok(ProtocolResponse::NA), + Err(IncomingTxError::Internal(e)) => { + tracing::error!("Failed to handle incoming txs: {e}"); + Ok(ProtocolResponse::NA) + } Err(e) => Err(e.into()), } } From 2c11f0bb0906d380831d46119965212973ab5705 Mon Sep 17 00:00:00 2001 From: redsh4de <25299353+redsh4de@users.noreply.github.com> Date: Tue, 26 May 2026 21:55:22 +0300 Subject: [PATCH 4/5] reclassify AlreadyInMainChain as Ok in alt block path --- binaries/cuprated/src/blockchain/error.rs | 5 ----- binaries/cuprated/src/blockchain/manager/handler.rs | 10 ++++++++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/binaries/cuprated/src/blockchain/error.rs b/binaries/cuprated/src/blockchain/error.rs index 4b35bbb93..3dce55eb1 100644 --- a/binaries/cuprated/src/blockchain/error.rs +++ b/binaries/cuprated/src/blockchain/error.rs @@ -20,11 +20,6 @@ macro_rules! impl_internal_from { /// A validation failure - the peer should be banned. #[derive(Debug, thiserror::Error)] pub enum BlockValidationError { - /// The block was received as an alt block but already exists on the - /// main chain. - #[error("Alt block already in main chain.")] - AlreadyInMainChain, - /// The block failed consensus validation. #[error(transparent)] Consensus(ExtendedConsensusError), diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index c0d0e073c..23e5e1663 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -375,7 +375,11 @@ impl super::BlockchainManager { return self.handle_incoming_block_batch_main_chain(batch).await; } // continue adding alt blocks. - Ok(AddAltBlock::NewlyCached(_) | AddAltBlock::AlreadyCached) => (), + Ok( + AddAltBlock::NewlyCached(_) + | AddAltBlock::AlreadyCached + | AddAltBlock::AlreadyInMainChain, + ) => (), } } @@ -413,7 +417,7 @@ impl super::BlockchainManager { match chain { Some((Chain::Alt(_), _)) => return Ok(AddAltBlock::AlreadyCached), - Some((Chain::Main, _)) => return Err(BlockValidationError::AlreadyInMainChain.into()), + Some((Chain::Main, _)) => return Ok(AddAltBlock::AlreadyInMainChain), None => (), } @@ -753,6 +757,8 @@ impl super::BlockchainManager { enum AddAltBlock { /// We already had this alt-block cached. AlreadyCached, + /// The block already exists on the main chain. + AlreadyInMainChain, /// The alt-block was newly cached. Contains the block blob. NewlyCached(Bytes), /// The chain was reorged. From c37f3b9e5f7feba8699ed8afaea9f65deba09e50 Mon Sep 17 00:00:00 2001 From: redsh4de <25299353+redsh4de@users.noreply.github.com> Date: Tue, 26 May 2026 21:55:22 +0300 Subject: [PATCH 5/5] classify validation errors by category, MEDIUM_BAN on hard-fork errors --- binaries/cuprated/src/blockchain/error.rs | 29 +++++--- .../src/blockchain/manager/handler.rs | 70 +++++++++++++++---- binaries/cuprated/src/p2p/request_handler.rs | 30 +++++--- 3 files changed, 98 insertions(+), 31 deletions(-) diff --git a/binaries/cuprated/src/blockchain/error.rs b/binaries/cuprated/src/blockchain/error.rs index 3dce55eb1..1df62d227 100644 --- a/binaries/cuprated/src/blockchain/error.rs +++ b/binaries/cuprated/src/blockchain/error.rs @@ -2,7 +2,7 @@ use cuprate_blockchain::BlockchainError; use cuprate_consensus::ExtendedConsensusError; -use cuprate_consensus_rules::ConsensusError; +use cuprate_consensus_rules::{blocks::BlockError, hard_forks::HardForkError, ConsensusError}; use cuprate_txpool::TxPoolError; use cuprate_types::TxConversionError; @@ -20,9 +20,13 @@ macro_rules! impl_internal_from { /// A validation failure - the peer should be banned. #[derive(Debug, thiserror::Error)] pub enum BlockValidationError { - /// The block failed consensus validation. + /// Invalid hard-fork rules. #[error(transparent)] - Consensus(ExtendedConsensusError), + HardFork(HardForkError), + + /// Any other consensus rule violation. + #[error(transparent)] + Other(ExtendedConsensusError), } /// An error from the blockchain manager's internal handlers. @@ -30,7 +34,7 @@ pub enum BlockValidationError { pub enum BlockManagerError { /// The peer sent us an invalid block; ban them. #[error(transparent)] - Validation(#[from] BlockValidationError), + Validation(BlockValidationError), /// A node-side failure. #[error(transparent)] @@ -39,16 +43,25 @@ pub enum BlockManagerError { impl From for BlockManagerError { fn from(e: ExtendedConsensusError) -> Self { - if let ExtendedConsensusError::DBErr(e) = e { - return Self::Internal(e); + match e { + ExtendedConsensusError::DBErr(e) => Self::Internal(e), + ExtendedConsensusError::ConErr(ConsensusError::Block(BlockError::HardForkError(e))) => { + Self::Validation(BlockValidationError::HardFork(e)) + } + + ExtendedConsensusError::ConErr(_) + | ExtendedConsensusError::TxsIncludedWithBlockIncorrect + | ExtendedConsensusError::OneOrMoreBatchVerificationStatementsInvalid + | ExtendedConsensusError::NoBlocksToVerify => { + Self::Validation(BlockValidationError::Other(e)) + } } - BlockValidationError::Consensus(e).into() } } impl From for BlockManagerError { fn from(e: ConsensusError) -> Self { - BlockValidationError::Consensus(e.into()).into() + ExtendedConsensusError::ConErr(e).into() } } diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index 23e5e1663..a35d6508f 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -23,7 +23,11 @@ use cuprate_consensus::{ use cuprate_consensus_context::{BlockchainContext, NewBlockData}; use cuprate_fast_sync::{block_to_verified_block_information, fast_sync_stop_height}; use cuprate_helper::cast::usize_to_u64; -use cuprate_p2p::{block_downloader::BlockBatch, constants::LONG_BAN, BroadcastRequest}; +use cuprate_p2p::{ + block_downloader::BlockBatch, + constants::{LONG_BAN, MEDIUM_BAN}, + BroadcastRequest, +}; use cuprate_txpool::service::interface::TxpoolWriteRequest; use cuprate_types::{ blockchain::{BlockchainReadRequest, BlockchainResponse, BlockchainWriteRequest}, @@ -246,8 +250,12 @@ impl super::BlockchainManager { { Ok(v) => v, Err(BlockManagerError::Internal(e)) => return Err(e), - Err(BlockManagerError::Validation(_)) => { - batch.peer_handle.ban_peer(LONG_BAN); + Err(BlockManagerError::Validation(e)) => { + let duration = match e { + BlockValidationError::HardFork(_) => MEDIUM_BAN, + BlockValidationError::Other(_) => LONG_BAN, + }; + batch.peer_handle.ban_peer(duration); self.stop_current_block_downloader.notify_waiters(); return Ok(()); } @@ -255,6 +263,7 @@ impl super::BlockchainManager { for (block, txs) in prepped_blocks { let hash = block.block_hash; + let block_version = block.hf_version.as_u8(); let verified_block = match verify_prepped_main_chain_block( block, txs, @@ -268,12 +277,27 @@ impl super::BlockchainManager { Ok(block) => block, Err(BlockManagerError::Internal(e)) => return Err(e), Err(BlockManagerError::Validation(e)) => { - warn!( - "Failed to verify block: {}, error {}, banning peer.", - hex::encode(hash), - e - ); - batch.peer_handle.ban_peer(LONG_BAN); + let duration = match e { + BlockValidationError::HardFork(e) => { + warn!( + "Failed to verify block: {}, error {} (block v{}, current v{}), banning peer.", + hex::encode(hash), + e, + block_version, + self.blockchain_context_service.blockchain_context().current_hf.as_u8() + ); + MEDIUM_BAN + } + BlockValidationError::Other(e) => { + warn!( + "Failed to verify block: {}, error {}, banning peer.", + hex::encode(hash), + e + ); + LONG_BAN + } + }; + batch.peer_handle.ban_peer(duration); self.stop_current_block_downloader.notify_waiters(); return Ok(()); } @@ -335,6 +359,7 @@ impl super::BlockchainManager { while let Some((block, txs)) = blocks.next() { let hash = block.hash(); + let block_version = block.header.hardfork_version; // async blocks work as try blocks. let res = async { @@ -355,12 +380,27 @@ impl super::BlockchainManager { match res { Err(BlockManagerError::Internal(e)) => return Err(e), Err(BlockManagerError::Validation(e)) => { - warn!( - "Failed to verify block: {}, error {}, banning peer.", - hex::encode(hash), - e - ); - batch.peer_handle.ban_peer(LONG_BAN); + let duration = match e { + BlockValidationError::HardFork(e) => { + warn!( + "Failed to verify block: {}, error {} (block v{}, current v{}), banning peer.", + hex::encode(hash), + e, + block_version, + self.blockchain_context_service.blockchain_context().current_hf.as_u8() + ); + MEDIUM_BAN + } + BlockValidationError::Other(e) => { + warn!( + "Failed to verify block: {}, error {}, banning peer.", + hex::encode(hash), + e + ); + LONG_BAN + } + }; + batch.peer_handle.ban_peer(duration); self.stop_current_block_downloader.notify_waiters(); return Ok(()); } diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 3849b3d19..3309e8496 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -30,6 +30,7 @@ use cuprate_helper::{ }; use cuprate_p2p::constants::{ LONG_BAN, MAX_BLOCKS_IDS_IN_CHAIN_ENTRY, MAX_BLOCK_BATCH_LEN, MAX_TRANSACTION_BLOB_SIZE, + MEDIUM_BAN, }; use cuprate_p2p_core::{ client::{InternalPeerID, PeerInformation}, @@ -46,7 +47,7 @@ use cuprate_wire::protocol::{ }; use crate::{ - blockchain::{interface::BlockchainManagerHandle, IncomingBlockError}, + blockchain::{interface::BlockchainManagerHandle, BlockValidationError, IncomingBlockError}, p2p::CrossNetworkInternalPeerId, txpool::{IncomingTxError, IncomingTxHandler, IncomingTxs}, }; @@ -355,6 +356,7 @@ async fn new_fluffy_block( } let block_hash = block.hash(); + let block_version = block.header.hardfork_version; let res = blockchain_manager .handle_incoming_block( block, @@ -381,21 +383,33 @@ async fn new_fluffy_block( // Manager has exited (likely shutdown); drop silently. Ok(ProtocolResponse::NA) } - Err(IncomingBlockError::Internal(e)) => { - tracing::error!("Failed to handle incoming block: {e}"); - Ok(ProtocolResponse::NA) - } - Err(e) => { - if matches!(e, IncomingBlockError::Validation(_)) { + Err(IncomingBlockError::Validation(e)) => match e { + BlockValidationError::HardFork(e) => { + tracing::warn!( + "Failed to verify block: {}, error {} (block v{}, current v{}), banning peer.", + hex::encode(block_hash), + e, + block_version, + context.current_hf.as_u8() + ); + peer_information.handle.ban_peer(MEDIUM_BAN); + Err(e.into()) + } + BlockValidationError::Other(e) => { tracing::warn!( "Failed to verify block: {}, error {}, banning peer.", hex::encode(block_hash), e ); peer_information.handle.ban_peer(LONG_BAN); + Err(e.into()) } - Err(e.into()) + }, + Err(IncomingBlockError::Internal(e)) => { + tracing::error!("Failed to handle incoming block: {e}"); + Ok(ProtocolResponse::NA) } + Err(e) => Err(e.into()), } }