diff --git a/binaries/cuprated/Cargo.toml b/binaries/cuprated/Cargo.toml index a078bc2f2..fe4907599 100644 --- a/binaries/cuprated/Cargo.toml +++ b/binaries/cuprated/Cargo.toml @@ -7,6 +7,12 @@ license = "AGPL-3.0-only" authors = ["Boog900", "hinto-janai", "SyntheticBird45"] repository = "https://github.com/Cuprate/cuprate/tree/main/binaries/cuprated" +[lib] +name = "cuprated" + +[[bin]] +name = "cuprated" + [features] default = ["arti"] arti = ["arti-client", "tor-hsservice", "tor-persist", "tor-rtcompat", "cuprate-p2p-transport/arti"] diff --git a/binaries/cuprated/src/config/args.rs b/binaries/cuprated/src/args.rs similarity index 98% rename from binaries/cuprated/src/config/args.rs rename to binaries/cuprated/src/args.rs index ee3a1674e..6792b1d64 100644 --- a/binaries/cuprated/src/config/args.rs +++ b/binaries/cuprated/src/args.rs @@ -5,7 +5,7 @@ use serde_json::Value; use cuprate_helper::network::Network; -use crate::{config::Config, version::CupratedVersionInfo}; +use cuprated::{config::Config, version::CupratedVersionInfo}; /// Cuprate Args. #[derive(clap::Parser, Debug)] diff --git a/binaries/cuprated/src/blockchain.rs b/binaries/cuprated/src/blockchain.rs index b7b976430..ba62ac2fe 100644 --- a/binaries/cuprated/src/blockchain.rs +++ b/binaries/cuprated/src/blockchain.rs @@ -8,7 +8,9 @@ use tokio::sync::{mpsc, Notify}; use tower::{BoxError, Service, ServiceExt}; use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; -use cuprate_consensus::{generate_genesis_block, BlockchainContextService, ContextConfig}; +use cuprate_consensus::{ + generate_genesis_block, BlockchainContext, BlockchainContextService, ContextConfig, +}; use cuprate_cryptonight::cryptonight_hash_v0; use cuprate_p2p::{block_downloader::BlockDownloaderConfig, NetworkInterface}; use cuprate_p2p_core::{ClearNet, Network}; @@ -26,11 +28,58 @@ mod manager; mod syncer; mod types; -pub use fast_sync::set_fast_sync_hashes; -pub use manager::init_blockchain_manager; -pub use syncer::SyncNotify; +pub use fast_sync::get_fast_sync_hashes; +pub use interface::BlockchainManagerHandle; +pub use syncer::{Syncer, SyncerHandle}; pub use types::ConsensusBlockchainReadHandle; +pub(crate) use manager::init_blockchain_manager; + +/// The interface to the blockchain. +#[derive(Clone)] +pub struct BlockchainInterface { + /// A read handle to the blockchain database. + read: BlockchainReadHandle, + /// The blockchain context service. + context_svc: BlockchainContextService, + /// A handle to the blockchain manager. + manager: BlockchainManagerHandle, +} + +impl BlockchainInterface { + pub(crate) const fn new( + read: BlockchainReadHandle, + context_svc: BlockchainContextService, + manager: BlockchainManagerHandle, + ) -> Self { + Self { + read, + context_svc, + manager, + } + } + + /// Returns a read handle to the blockchain database. + pub fn read(&self) -> BlockchainReadHandle { + self.read.clone() + } + + /// Returns the current [`BlockchainContext`]. + pub fn context(&mut self) -> &BlockchainContext { + self.context_svc.blockchain_context() + } + + /// Returns a handle to the blockchain manager. + pub fn manager(&self) -> BlockchainManagerHandle { + self.manager.clone() + } + + /// Returns the blockchain context service. + pub(crate) fn context_svc(&self) -> BlockchainContextService { + self.context_svc.clone() + } +} + /// Checks if the genesis block is in the blockchain and adds it if not. pub async fn check_add_genesis( blockchain_read_handle: &mut BlockchainReadHandle, diff --git a/binaries/cuprated/src/blockchain/chain_service.rs b/binaries/cuprated/src/blockchain/chain_service.rs index 12b507370..67dd2e33f 100644 --- a/binaries/cuprated/src/blockchain/chain_service.rs +++ b/binaries/cuprated/src/blockchain/chain_service.rs @@ -14,7 +14,7 @@ use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse}; /// /// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier. #[derive(Clone)] -pub struct ChainService(pub BlockchainReadHandle); +pub struct ChainService(pub BlockchainReadHandle, pub &'static [[u8; 32]]); impl Service> for ChainService { type Response = ChainSvcResponse; @@ -75,11 +75,16 @@ impl Service> for ChainService { .boxed(), ChainSvcRequest::ValidateEntries(entries, start_height) => { let mut blockchain_read_handle = self.0.clone(); + let fast_sync_hashes = self.1; async move { - let (valid, unknown) = - validate_entries(entries, start_height, &mut blockchain_read_handle) - .await?; + let (valid, unknown) = validate_entries( + entries, + start_height, + &mut blockchain_read_handle, + fast_sync_hashes, + ) + .await?; Ok(ChainSvcResponse::ValidateEntries { valid, unknown }) } diff --git a/binaries/cuprated/src/blockchain/fast_sync.rs b/binaries/cuprated/src/blockchain/fast_sync.rs index eeba4be19..50fc263f9 100644 --- a/binaries/cuprated/src/blockchain/fast_sync.rs +++ b/binaries/cuprated/src/blockchain/fast_sync.rs @@ -7,13 +7,15 @@ use cuprate_helper::network::Network; /// See `build.rs` for how this file is generated. static FAST_SYNC_HASHES: &[[u8; 32]] = &include!(concat!(env!("OUT_DIR"), "/fast_sync_hashes.rs")); -/// Set the fast-sync hashes according to the provided values. -pub fn set_fast_sync_hashes(fast_sync: bool, network: Network) { - cuprate_fast_sync::set_fast_sync_hashes(if fast_sync && network == Network::Mainnet { +/// Returns the fast-sync hashes for the given configuration. +/// +/// Returns a non-empty slice only for mainnet with fast sync enabled. +pub fn get_fast_sync_hashes(fast_sync: bool, network: Network) -> &'static [[u8; 32]] { + if fast_sync && network == Network::Mainnet { FAST_SYNC_HASHES } else { &[] - }); + } } #[cfg(test)] diff --git a/binaries/cuprated/src/blockchain/interface.rs b/binaries/cuprated/src/blockchain/interface.rs index c4058d1d6..d2dacae6b 100644 --- a/binaries/cuprated/src/blockchain/interface.rs +++ b/binaries/cuprated/src/blockchain/interface.rs @@ -4,7 +4,7 @@ //! blockchain manager. use std::{ collections::{HashMap, HashSet}, - sync::{LazyLock, Mutex, OnceLock}, + sync::{Arc, Mutex}, }; use monero_oxide::{block::Block, transaction::Transaction}; @@ -24,29 +24,25 @@ use crate::{ constants::PANIC_CRITICAL_SERVICE_ERROR, }; -/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager. +/// Handle for the blockchain manager. /// -/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions -/// in this file document what happens if this is not initialized when they are called. -pub(super) static COMMAND_TX: OnceLock> = OnceLock::new(); - -/// A [`HashSet`] of block hashes that the blockchain manager is currently handling. -/// -/// This lock prevents sending the same block to the blockchain manager from multiple connections -/// before one of them actually gets added to the chain, allowing peers to do other things. -/// -/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of -/// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks` -/// which are also more expensive than `Mutex`s. -static BLOCKS_BEING_HANDLED: LazyLock>> = - LazyLock::new(|| Mutex::new(HashSet::new())); - -/// Returns `true` if the given block hash is currently being handled. -pub fn is_block_being_handled(hash: &[u8; 32]) -> bool { - BLOCKS_BEING_HANDLED.lock().unwrap().contains(hash) +/// Created by `init_blockchain_manager`. +#[derive(Clone)] +pub struct BlockchainManagerHandle { + /// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager. + command_tx: mpsc::Sender, + /// A [`HashSet`] of block hashes that the blockchain manager is currently handling. + /// + /// This prevents sending the same block to the blockchain manager from multiple connections + /// before one of them actually gets added to the chain, allowing peers to do other things. + /// + /// This is used over something like a dashmap as we expect a lot of collisions in a short amount of + /// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks` + /// which are also more expensive than `Mutex`s. + blocks_being_handled: Arc>>, } -/// An error that can be returned from [`handle_incoming_block`]. +/// An error that can be returned from [`BlockchainManagerHandle::handle_incoming_block`]. #[derive(Debug, thiserror::Error)] pub enum IncomingBlockError { /// Some transactions in the block were unknown. @@ -60,142 +56,155 @@ pub enum IncomingBlockError { /// The block was invalid. #[error(transparent)] InvalidBlock(anyhow::Error), + /// The blockchain manager command channel is closed. + #[error("The blockchain manager command channel is closed.")] + ChannelClosed, } -/// Try to add a new block to the blockchain. -/// -/// On success returns [`IncomingBlockOk`]. -/// -/// # Errors -/// -/// This function will return an error if: -/// - the block was invalid -/// - we are missing transactions -/// - the block's parent is unknown -pub async fn handle_incoming_block( - block: Block, - mut given_txs: HashMap<[u8; 32], Transaction>, - blockchain_read_handle: &mut BlockchainReadHandle, - txpool_read_handle: &mut TxpoolReadHandle, -) -> Result { - if given_txs.len() > block.transactions.len() { - return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!( - "Too many transactions given for block" - ))); +impl BlockchainManagerHandle { + /// Create a new handle and command receiver pair. + pub fn new() -> (Self, mpsc::Receiver) { + let (command_tx, command_rx) = mpsc::channel(3); + ( + Self { + command_tx, + blocks_being_handled: Arc::new(Mutex::new(HashSet::new())), + }, + command_rx, + ) } - if !block_exists(block.header.previous, blockchain_read_handle) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - { - return Err(IncomingBlockError::Orphan); + /// Returns `true` if the given block hash is currently being handled. + pub fn is_block_being_handled(&self, hash: &[u8; 32]) -> bool { + self.blocks_being_handled.lock().unwrap().contains(hash) } - let block_hash = block.hash(); - - if block_exists(block_hash, blockchain_read_handle) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - { - return Ok(IncomingBlockOk::AlreadyHave); - } - - let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone())) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - else { - unreachable!() - }; - - if !missing.is_empty() { - let needed_hashes = missing.iter().map(|index| block.transactions[*index]); - - for needed_hash in needed_hashes { - let Some(tx) = given_txs.remove(&needed_hash) else { - // We return back the indexes of all txs missing from our pool, not taking into account the txs - // that were given with the block, as these txs will be dropped. It is not worth it to try to add - // these txs to the pool as this will only happen with a misbehaving peer or if the txpool reaches - // the size limit. - return Err(IncomingBlockError::UnknownTransactions(block_hash, missing)); - }; - - txs.insert( - needed_hash, - new_tx_verification_data(tx) - .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?, - ); + /// Try to add a new block to the blockchain. + /// + /// On success returns `IncomingBlockOk`. + /// + /// # Errors + /// + /// This function will return an error if: + /// - the block was invalid + /// - we are missing transactions + /// - the block's parent is unknown + /// - the blockchain manager command channel is closed + pub async fn handle_incoming_block( + &self, + block: Block, + mut given_txs: HashMap<[u8; 32], Transaction>, + blockchain_read_handle: &mut BlockchainReadHandle, + txpool_read_handle: &mut TxpoolReadHandle, + ) -> Result { + if given_txs.len() > block.transactions.len() { + return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!( + "Too many transactions given for block" + ))); } - } - let Some(incoming_block_tx) = COMMAND_TX.get() else { - // We could still be starting up the blockchain manager. - return Ok(IncomingBlockOk::NotReady); - }; + if !block_exists(block.header.previous, blockchain_read_handle) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + { + return Err(IncomingBlockError::Orphan); + } - // Add the blocks hash to the blocks being handled. - if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) { - // If another place is already adding this block then we can stop. - return Ok(IncomingBlockOk::AlreadyHave); - } + let block_hash = block.hash(); - // We must remove the block hash from `BLOCKS_BEING_HANDLED`. - let _guard = { - struct RemoveFromBlocksBeingHandled { - block_hash: [u8; 32], + if block_exists(block_hash, blockchain_read_handle) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + { + return Ok(IncomingBlockOk::AlreadyHave); } - impl Drop for RemoveFromBlocksBeingHandled { - fn drop(&mut self) { - BLOCKS_BEING_HANDLED - .lock() - .unwrap() - .remove(&self.block_hash); + + let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(TxpoolReadRequest::TxsForBlock(block.transactions.clone())) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + else { + unreachable!() + }; + + if !missing.is_empty() { + let needed_hashes = missing.iter().map(|index| block.transactions[*index]); + + for needed_hash in needed_hashes { + let Some(tx) = given_txs.remove(&needed_hash) else { + // We return back the indexes of all txs missing from our pool, not taking into account the txs + // that were given with the block, as these txs will be dropped. It is not worth it to try to add + // these txs to the pool as this will only happen with a misbehaving peer or if the txpool reaches + // the size limit. + return Err(IncomingBlockError::UnknownTransactions(block_hash, missing)); + }; + + txs.insert( + needed_hash, + new_tx_verification_data(tx) + .map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?, + ); } } - RemoveFromBlocksBeingHandled { block_hash } - }; - - let (response_tx, response_rx) = oneshot::channel(); - - incoming_block_tx - .send(BlockchainManagerCommand::AddBlock { - block, - prepped_txs: txs, - response_tx, - }) - .await - .expect("TODO: don't actually panic here, an err means we are shutting down"); - - response_rx - .await - .expect("The blockchain manager will always respond") - .map_err(IncomingBlockError::InvalidBlock) -} -/// Pop blocks from the top of the blockchain. -/// -/// # Errors -/// -/// Will error if the blockchain manager is not set up yet. -pub async fn pop_blocks(numb_blocks: usize) -> Result<(), anyhow::Error> { - let Some(incoming_block_tx) = COMMAND_TX.get() else { - // We could still be starting up the blockchain manager. - return anyhow::bail!("The blockchain manager is not running yet"); - }; - - let (response_tx, response_rx) = oneshot::channel(); + // Add the blocks hash to the blocks being handled. + if !self.blocks_being_handled.lock().unwrap().insert(block_hash) { + // If another place is already adding this block then we can stop. + return Ok(IncomingBlockOk::AlreadyHave); + } - incoming_block_tx - .send(BlockchainManagerCommand::PopBlocks { - numb_blocks, - response_tx, - }) - .await?; + // We must remove the block hash from `blocks_being_handled`. + let blocks = Arc::clone(&self.blocks_being_handled); + let _guard = { + struct RemoveFromBlocksBeingHandled { + block_hash: [u8; 32], + blocks: Arc>>, + } + impl Drop for RemoveFromBlocksBeingHandled { + fn drop(&mut self) { + self.blocks.lock().unwrap().remove(&self.block_hash); + } + } + RemoveFromBlocksBeingHandled { block_hash, blocks } + }; + + let (response_tx, response_rx) = oneshot::channel(); + + self.command_tx + .send(BlockchainManagerCommand::AddBlock { + block, + prepped_txs: txs, + response_tx, + }) + .await + .map_err(|_| IncomingBlockError::ChannelClosed)?; + + response_rx + .await + .map_err(|_| IncomingBlockError::ChannelClosed)? + .map_err(IncomingBlockError::InvalidBlock) + } - Ok(response_rx.await?) + /// Pop blocks from the top of the blockchain. + /// + /// # Errors + /// + /// Will error if the blockchain manager channel is closed. + pub async fn pop_blocks(&self, numb_blocks: usize) -> Result<(), anyhow::Error> { + let (response_tx, response_rx) = oneshot::channel(); + + self.command_tx + .send(BlockchainManagerCommand::PopBlocks { + numb_blocks, + response_tx, + }) + .await?; + + Ok(response_rx.await?) + } } /// Check if we have a block with the given hash. diff --git a/binaries/cuprated/src/blockchain/manager.rs b/binaries/cuprated/src/blockchain/manager.rs index 057e08384..c8071b480 100644 --- a/binaries/cuprated/src/blockchain/manager.rs +++ b/binaries/cuprated/src/blockchain/manager.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use futures::StreamExt; use monero_oxide::block::Block; -use tokio::sync::{mpsc, oneshot, Notify, OwnedSemaphorePermit}; +use tokio::sync::{mpsc, oneshot, Notify, OwnedSemaphorePermit, RwLock}; use tower::{BoxError, Service, ServiceExt}; use tracing::error; @@ -12,7 +12,7 @@ use cuprate_consensus::{ ExtendedConsensusError, }; use cuprate_p2p::{ - block_downloader::{BlockBatch, BlockDownloaderConfig}, + block_downloader::{self, BlockBatch}, BroadcastSvc, NetworkInterface, }; use cuprate_p2p_core::ClearNet; @@ -23,12 +23,10 @@ use cuprate_types::{ }; use crate::{ - blockchain::{ - chain_service::ChainService, interface::COMMAND_TX, syncer, - types::ConsensusBlockchainReadHandle, - }, + blockchain::{chain_service::ChainService, syncer, types::ConsensusBlockchainReadHandle}, constants::PANIC_CRITICAL_SERVICE_ERROR, txpool::TxpoolManagerHandle, + LaunchContext, }; mod commands; @@ -38,48 +36,47 @@ mod handler; mod tests; pub use commands::{BlockchainManagerCommand, IncomingBlockOk}; -use syncer::SyncerHandle; +use syncer::Syncer; /// Initialize the blockchain manager. /// -/// This function sets up the [`BlockchainManager`] and the [`syncer`] so that the functions in [`interface`](super::interface) +/// This function sets up the `BlockchainManager` and the [`Syncer`] so that the functions in [`interface`](super::interface) /// can be called. -pub async fn init_blockchain_manager( +pub(crate) async fn init_blockchain_manager( + launch_ctx: &LaunchContext, clearnet_interface: NetworkInterface, blockchain_write_handle: BlockchainWriteHandle, - blockchain_read_handle: BlockchainReadHandle, txpool_manager_handle: TxpoolManagerHandle, - mut blockchain_context_service: BlockchainContextService, - block_downloader_config: BlockDownloaderConfig, - syncer_handle: SyncerHandle, + syncer: Syncer, + command_rx: mpsc::Receiver, ) { + let block_downloader_config = launch_ctx.config.block_downloader_config(); // TODO: find good values for these size limits let (batch_tx, batch_rx) = mpsc::channel(1); let stop_current_block_downloader = Arc::new(Notify::new()); - let (command_tx, command_rx) = mpsc::channel(3); + let fast_sync_hashes = launch_ctx.config.fast_sync_hashes(); - COMMAND_TX.set(command_tx).unwrap(); - - tokio::spawn(syncer::syncer( - blockchain_context_service.clone(), - ChainService(blockchain_read_handle.clone()), + tokio::spawn(syncer.run( + launch_ctx.blockchain.context_svc(), + ChainService(launch_ctx.blockchain.read(), fast_sync_hashes), clearnet_interface.clone(), batch_tx, Arc::clone(&stop_current_block_downloader), block_downloader_config, - syncer_handle, )); let manager = BlockchainManager { blockchain_write_handle, blockchain_read_handle: ConsensusBlockchainReadHandle::new( - blockchain_read_handle, + launch_ctx.blockchain.read(), BoxError::from, ), txpool_manager_handle, - blockchain_context_service, + blockchain_context_service: launch_ctx.blockchain.context_svc(), stop_current_block_downloader, broadcast_svc: clearnet_interface.broadcast_svc(), + reorg_lock: Arc::clone(&launch_ctx.reorg_lock), + fast_sync_hashes, }; tokio::spawn(manager.run(batch_rx, command_rx)); @@ -102,11 +99,15 @@ pub struct BlockchainManager { /// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve /// values without needing to go to a [`BlockchainReadHandle`]. blockchain_context_service: BlockchainContextService, - /// A [`Notify`] to tell the [`syncer`](syncer::syncer) that we want to cancel this current download + /// A [`Notify`] to tell the [`Syncer`] that we want to cancel this current download /// attempt. stop_current_block_downloader: Arc, /// The broadcast service, to broadcast new blocks. broadcast_svc: BroadcastSvc, + /// Reorg lock. + reorg_lock: Arc>, + /// Fast-sync hashes for this node's network. + fast_sync_hashes: &'static [[u8; 32]], } impl BlockchainManager { diff --git a/binaries/cuprated/src/blockchain/manager/commands.rs b/binaries/cuprated/src/blockchain/manager/commands.rs index ad63a2f70..fe9a922e3 100644 --- a/binaries/cuprated/src/blockchain/manager/commands.rs +++ b/binaries/cuprated/src/blockchain/manager/commands.rs @@ -30,8 +30,6 @@ pub enum BlockchainManagerCommand { pub enum IncomingBlockOk { /// The block was added to the main-chain. AddedToMainChain, - /// The blockchain manager is not ready yet. - NotReady, /// The block was added to an alt-chain. AddedToAltChain, /// We already have the block. diff --git a/binaries/cuprated/src/blockchain/manager/handler.rs b/binaries/cuprated/src/blockchain/manager/handler.rs index a523481e5..6a6c06c52 100644 --- a/binaries/cuprated/src/blockchain/manager/handler.rs +++ b/binaries/cuprated/src/blockchain/manager/handler.rs @@ -34,7 +34,6 @@ use cuprate_types::{ use crate::{ blockchain::manager::commands::{BlockchainManagerCommand, IncomingBlockOk}, constants::PANIC_CRITICAL_SERVICE_ERROR, - signals::REORG_LOCK, }; impl super::BlockchainManager { @@ -59,7 +58,8 @@ impl super::BlockchainManager { numb_blocks, response_tx, } => { - let _guard = REORG_LOCK.write().await; + let reorg_lock = Arc::clone(&self.reorg_lock); + let _guard = reorg_lock.write().await; self.pop_blocks(numb_blocks).await; self.blockchain_write_handle .ready() @@ -212,7 +212,7 @@ impl super::BlockchainManager { /// This function will panic if any internal service returns an unexpected error that we cannot /// recover from or if the incoming batch contains no blocks. async fn handle_incoming_block_batch_main_chain(&mut self, batch: BlockBatch) { - if batch.blocks.last().unwrap().0.number() < fast_sync_stop_height() { + if batch.blocks.last().unwrap().0.number() < fast_sync_stop_height(self.fast_sync_hashes) { self.handle_incoming_block_batch_fast_sync(batch).await; return; } @@ -416,9 +416,9 @@ impl super::BlockchainManager { /// Attempt a re-org with the given top block of the alt-chain. /// - /// This function will take a write lock on [`REORG_LOCK`] and then set up the blockchain database + /// This function will take a write lock on `reorg_lock` and then set up the blockchain database /// and context cache to verify the alt-chain. It will then attempt to verify and add each block - /// in the alt-chain to the main-chain. Releasing the lock on [`REORG_LOCK`] when finished. + /// in the alt-chain to the main-chain. Releasing the lock on `reorg_lock` when finished. /// /// # Errors /// @@ -434,7 +434,8 @@ impl super::BlockchainManager { &mut self, top_alt_block: AltBlockInformation, ) -> Result<(), anyhow::Error> { - let _guard = REORG_LOCK.write().await; + let reorg_lock = Arc::clone(&self.reorg_lock); + let _guard = reorg_lock.write().await; let BlockchainResponse::AltBlocksInChain(mut alt_blocks) = self .blockchain_read_handle diff --git a/binaries/cuprated/src/blockchain/manager/tests.rs b/binaries/cuprated/src/blockchain/manager/tests.rs index de28ce99a..6f6727ec7 100644 --- a/binaries/cuprated/src/blockchain/manager/tests.rs +++ b/binaries/cuprated/src/blockchain/manager/tests.rs @@ -73,6 +73,8 @@ async fn mock_manager(data_dir: PathBuf) -> BlockchainManager { blockchain_context_service, stop_current_block_downloader: Arc::new(Default::default()), broadcast_svc: BroadcastSvc::mock(), + reorg_lock: Arc::new(Default::default()), + fast_sync_hashes: &[], } } @@ -222,8 +224,6 @@ async fn simple_reorg() { /// Same as [`simple_reorg`] but uses block batches instead. #[tokio::test] async fn simple_reorg_block_batch() { - cuprate_fast_sync::set_fast_sync_hashes(&[]); - let handle = HandleBuilder::new().build(); // create 2 managers diff --git a/binaries/cuprated/src/blockchain/syncer.rs b/binaries/cuprated/src/blockchain/syncer.rs index fd7e40910..179e3b4cd 100644 --- a/binaries/cuprated/src/blockchain/syncer.rs +++ b/binaries/cuprated/src/blockchain/syncer.rs @@ -1,4 +1,10 @@ -use std::{future::Future, sync::Arc}; +use std::{ + future::Future, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; use futures::{FutureExt, StreamExt}; use tokio::sync::{mpsc, Notify, OwnedSemaphorePermit, Semaphore}; @@ -7,215 +13,237 @@ use tracing::instrument; use cuprate_consensus::{BlockChainContextRequest, BlockChainContextResponse, BlockchainContext}; use cuprate_consensus_context::BlockchainContextService; +use cuprate_helper::cast::usize_to_u64; use cuprate_p2p::{ block_downloader::{BlockBatch, BlockDownloaderConfig, ChainSvcRequest, ChainSvcResponse}, NetworkInterface, PeerSetRequest, PeerSetResponse, }; use cuprate_p2p_core::{client::PeerSyncCallback, ClearNet, CoreSyncData, NetworkZone}; -use super::interface::is_block_being_handled; +use super::BlockchainInterface; -/// An error returned from the [`syncer`]. +/// An error returned from the [`Syncer`]. #[derive(Debug, thiserror::Error)] pub enum SyncerError { #[error("Incoming block channel closed.")] IncomingBlockChannelClosed, #[error("One of our services returned an error: {0}.")] ServiceError(#[from] tower::BoxError), + #[error("Sync permit semaphore closed unexpectedly: {0}.")] + SemaphoreClosed(#[from] tokio::sync::AcquireError), } -/// The syncer tasks that makes sure we are fully synchronised with our connected peers. -#[instrument(level = "debug", skip_all)] -#[expect(clippy::significant_drop_tightening)] -pub async fn syncer( - mut context_svc: BlockchainContextService, - our_chain: CN, - mut clearnet_interface: NetworkInterface, - incoming_block_batch_tx: mpsc::Sender<(BlockBatch, Arc)>, - stop_current_block_downloader: Arc, - block_downloader_config: BlockDownloaderConfig, - mut syncer_handle: SyncerHandle, -) -> Result<(), SyncerError> -where - CN: Service< - ChainSvcRequest, - Response = ChainSvcResponse, - Error = tower::BoxError, - > + Clone - + Send - + 'static, - CN::Future: Send + 'static, -{ - tracing::info!("Starting blockchain syncer"); - tracing::debug!("Waiting for new sync info in top sync channel"); - - let semaphore = Arc::new(Semaphore::new(1)); - let mut sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap()); - - loop { - syncer_handle.notify_syncer.notified().await; - - tracing::trace!("Checking connected peers to see if we are behind",); - - let blockchain_context = context_svc.blockchain_context(); - - match check_sync_status(blockchain_context, &mut clearnet_interface).await? { - SyncStatus::BehindPeers => {} - SyncStatus::NoPeers => continue, - SyncStatus::Synced => { - if let Some(synced) = syncer_handle.synced_tx.take() { - tracing::info!("Synchronised with the network."); - #[expect(clippy::let_underscore_must_use)] - let _ = synced.send(()); - } - continue; - } - } +#[derive(Debug, PartialEq)] +enum SyncStatus { + NoPeers, + BehindPeers, + Synced, +} + +/// The syncer that makes sure we are fully synchronised with our connected peers. +pub struct Syncer { + notify_syncer: Arc, + synced_tx: Option>, + target_height: Arc, +} + +impl Syncer { + /// Create a new [`Syncer`] and its [`SyncerHandle`]. + pub fn new() -> (Self, SyncerHandle) { + let notify_syncer = Arc::new(Notify::new()); + let (synced_tx, synced_rx) = futures::channel::oneshot::channel(); + let target_height = Arc::new(AtomicU64::new(0)); - tracing::debug!( - "We are behind peers claimed cumulative difficulty, starting block downloader" - ); - let mut block_batch_stream = - clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config); + let syncer_handle = SyncerHandle { + notify_syncer: Arc::clone(¬ify_syncer), + synced: synced_rx.shared(), + target_height: Arc::clone(&target_height), + }; + + let syncer = Self { + notify_syncer, + synced_tx: Some(synced_tx), + target_height, + }; + + (syncer, syncer_handle) + } + + /// Run the syncer. + #[instrument(level = "debug", skip_all)] + #[expect(clippy::significant_drop_tightening)] + pub async fn run( + mut self, + mut context_svc: BlockchainContextService, + our_chain: CN, + mut clearnet_interface: NetworkInterface, + incoming_block_batch_tx: mpsc::Sender<(BlockBatch, Arc)>, + stop_current_block_downloader: Arc, + block_downloader_config: BlockDownloaderConfig, + ) -> Result<(), SyncerError> + where + CN: Service< + ChainSvcRequest, + Response = ChainSvcResponse, + Error = tower::BoxError, + > + Clone + + Send + + 'static, + CN::Future: Send + 'static, + { + tracing::info!("Starting blockchain syncer"); + tracing::debug!("Waiting for new sync info in top sync channel"); + + let semaphore = Arc::new(Semaphore::new(1)); + let mut sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await?); loop { - tokio::select! { - () = stop_current_block_downloader.notified() => { - tracing::info!("Received stop signal, stopping block downloader"); + self.notify_syncer.notified().await; - drop(sync_permit); - sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap()); + tracing::trace!("Checking connected peers to see if we are behind",); - break; + match self + .check_sync_status(&mut context_svc, &mut clearnet_interface) + .await? + { + SyncStatus::BehindPeers => {} + SyncStatus::NoPeers => continue, + SyncStatus::Synced => { + if let Some(synced) = self.synced_tx.take() { + tracing::info!("Synchronised with the network."); + #[expect(clippy::let_underscore_must_use)] + let _ = synced.send(()); + } + continue; } - batch = block_batch_stream.next() => { - let Some(batch) = batch else { - // Wait for all references to the permit have been dropped (which means all blocks in the queue - // have been handled before checking if we are synced. - drop(sync_permit); - sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await.unwrap()); + } - let blockchain_context = context_svc.blockchain_context(); + tracing::debug!( + "We are behind peers claimed cumulative difficulty, starting block downloader" + ); + let mut block_batch_stream = + clearnet_interface.block_downloader(our_chain.clone(), block_downloader_config); - if check_sync_status(blockchain_context, &mut clearnet_interface).await? == SyncStatus::Synced { - tracing::info!("Synchronised with the network."); - if let Some(synced) = syncer_handle.synced_tx.take() { - #[expect(clippy::let_underscore_must_use)] - let _ = synced.send(()); - } - } + loop { + tokio::select! { + () = stop_current_block_downloader.notified() => { + tracing::info!("Received stop signal, stopping block downloader"); + + drop(sync_permit); + sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await?); break; - }; + } + batch = block_batch_stream.next() => { + let Some(batch) = batch else { + // Wait for all references to the permit have been dropped (which means all blocks in the queue + // have been handled before checking if we are synced. + drop(sync_permit); + sync_permit = Arc::new(Arc::clone(&semaphore).acquire_owned().await?); + + if self.check_sync_status(&mut context_svc, &mut clearnet_interface).await? == SyncStatus::Synced { + tracing::info!("Synchronised with the network."); + if let Some(synced) = self.synced_tx.take() { + #[expect(clippy::let_underscore_must_use)] + let _ = synced.send(()); + } + } + + break; + }; - tracing::debug!("Got batch, len: {}", batch.blocks.len()); - if incoming_block_batch_tx.send((batch, Arc::clone(&sync_permit))).await.is_err() { - return Err(SyncerError::IncomingBlockChannelClosed); + tracing::debug!("Got batch, len: {}", batch.blocks.len()); + if incoming_block_batch_tx.send((batch, Arc::clone(&sync_permit))).await.is_err() { + return Err(SyncerError::IncomingBlockChannelClosed); + } } } } } } -} -#[derive(Debug, PartialEq)] -enum SyncStatus { - NoPeers, - BehindPeers, - Synced, -} + /// Checks if we are behind the connected peers. + async fn check_sync_status( + &mut self, + context_svc: &mut BlockchainContextService, + clearnet_interface: &mut NetworkInterface, + ) -> Result { + let PeerSetResponse::MostPoWSeen { + cumulative_difficulty, + height, + .. + } = clearnet_interface + .peer_set() + .ready() + .await? + .call(PeerSetRequest::MostPoWSeen) + .await? + else { + unreachable!(); + }; + + if cumulative_difficulty == 0 { + self.target_height.store(0, Ordering::Relaxed); + return Ok(SyncStatus::NoPeers); + } -/// Checks if we are behind the connected peers. -async fn check_sync_status( - blockchain_context: &BlockchainContext, - clearnet_interface: &mut NetworkInterface, -) -> Result { - let PeerSetResponse::MostPoWSeen { - cumulative_difficulty, - .. - } = clearnet_interface - .peer_set() - .ready() - .await? - .call(PeerSetRequest::MostPoWSeen) - .await? - else { - unreachable!(); - }; - - if cumulative_difficulty == 0 { - return Ok(SyncStatus::NoPeers); - } + if cumulative_difficulty > context_svc.blockchain_context().cumulative_difficulty { + self.target_height + .store(usize_to_u64(height), Ordering::Relaxed); + return Ok(SyncStatus::BehindPeers); + } - if cumulative_difficulty > blockchain_context.cumulative_difficulty { - return Ok(SyncStatus::BehindPeers); + self.target_height.store(0, Ordering::Relaxed); + Ok(SyncStatus::Synced) } - - Ok(SyncStatus::Synced) -} - -/// The handle for the blockchain syncer. -pub struct SyncerHandle { - /// The syncer notify channel, used to wake the syncer. - notify_syncer: Arc, - /// The synced notify channel, used to wake the tasks waiting on cuprate to be synced. - synced_tx: Option>, } -/// Notifications for sync state. +/// Handle for the [`Syncer`]. #[derive(Clone)] -pub struct SyncNotify { +pub struct SyncerHandle { /// The syncer notify channel, used to wake the syncer. notify_syncer: Arc, /// The synced notify channel, used to wake the tasks waiting on cuprate to be synced. synced: futures::future::Shared>, + /// The target height we are syncing to, 0 if not syncing. + target_height: Arc, } -impl SyncNotify { - /// Creates a new [`SyncNotify`] with the corresponding handle for the syncer. - pub fn new() -> (Self, SyncerHandle) { - let notify_syncer = Arc::new(Notify::new()); - let (synced_tx, synced_rx) = futures::channel::oneshot::channel(); +impl SyncerHandle { + /// Returns the target sync height. 0 if not syncing. + pub fn target_height(&self) -> u64 { + self.target_height.load(Ordering::Relaxed) + } - ( - Self { - notify_syncer: Arc::clone(¬ify_syncer), - synced: synced_rx.shared(), - }, - SyncerHandle { - notify_syncer, - synced_tx: Some(synced_tx), - }, - ) + /// A future that resolves when cuprate has synced with the network. + pub fn wait_for_synced( + &self, + ) -> impl Future> + 'static { + self.synced.clone() } /// Creates a [`PeerSyncCallback`] that filters and wakes the syncer. - pub fn callback(&self, context_svc: BlockchainContextService) -> PeerSyncCallback { - let this = self.clone(); + pub fn callback(&self, blockchain: &BlockchainInterface) -> PeerSyncCallback { + let context_svc = blockchain.context_svc(); + let blockchain_manager = blockchain.manager(); + let handle = self.clone(); PeerSyncCallback::new(move |peer_csd: &CoreSyncData| { let ctx = context_svc.blockchain_context_snapshot(); // If we are synced and the syncer hasn't yet set the node to synced, wake the syncer. if peer_csd.cumulative_difficulty() == ctx.cumulative_difficulty - && this.synced.peek().is_none() + && handle.synced.peek().is_none() { - this.notify_syncer.notify_one(); + handle.notify_syncer.notify_one(); } // If we are behind the peer, and we aren't just one block behind with the blockchain manager handling the block, wake the syncer. if peer_csd.cumulative_difficulty() > ctx.cumulative_difficulty && !(peer_csd.current_height.saturating_sub(1) == ctx.chain_height as u64 - && is_block_being_handled(&peer_csd.top_id)) + && blockchain_manager.is_block_being_handled(&peer_csd.top_id)) { - this.notify_syncer.notify_one(); + handle.notify_syncer.notify_one(); } }) } - - /// A future that resolves when cuprate has synced with the network. - pub fn wait_for_synced( - &self, - ) -> impl Future> + 'static { - self.synced.clone() - } } diff --git a/binaries/cuprated/src/commands.rs b/binaries/cuprated/src/commands.rs index 421584ce3..6de9755cd 100644 --- a/binaries/cuprated/src/commands.rs +++ b/binaries/cuprated/src/commands.rs @@ -1,7 +1,11 @@ //! Commands //! //! `cuprated` [`Command`] definition and handling. -use std::{io, thread::sleep, time::Duration}; +use std::{ + io, + thread::sleep, + time::{Duration, Instant}, +}; use clap::{builder::TypedValueParser, Parser, ValueEnum}; use tokio::sync::mpsc; @@ -13,11 +17,7 @@ use cuprate_consensus_context::{ }; use cuprate_helper::time::secs_to_hms; -use crate::{ - constants::PANIC_CRITICAL_SERVICE_ERROR, - logging::{self, CupratedTracingFilter}, - statics, -}; +use cuprated::logging::{self, CupratedTracingFilter}; /// A command received from [`io::stdin`]. #[derive(Debug, Parser)] @@ -91,10 +91,9 @@ pub fn command_listener(incoming_commands: mpsc::Sender) -> ! { } /// The [`Command`] handler loop. -pub async fn io_loop( - mut incoming_commands: mpsc::Receiver, - mut context_service: BlockchainContextService, -) { +pub async fn io_loop(mut incoming_commands: mpsc::Receiver, mut node: cuprated::Node) { + let start_instant = Instant::now(); + loop { let Some(command) = incoming_commands.recv().await else { tracing::warn!("Shutting down io_loop command channel closed."); @@ -119,9 +118,9 @@ pub async fn io_loop( } } Command::Status => { - let context = context_service.blockchain_context(); + let context = node.blockchain.context(); - let uptime = statics::START_INSTANT.elapsed().unwrap_or_default(); + let uptime = start_instant.elapsed(); let (h, m, s) = secs_to_hms(uptime.as_secs()); let height = context.chain_height; @@ -130,13 +129,14 @@ pub async fn io_loop( println!("STATUS:\n uptime: {h}h {m}m {s}s,\n height: {height},\n top_hash: {top_hash}"); } Command::FastSyncStopHeight => { - let stop_height = cuprate_fast_sync::fast_sync_stop_height(); + let stop_height = + cuprate_fast_sync::fast_sync_stop_height(node.config.fast_sync_hashes()); println!("{stop_height}"); } Command::PopBlocks { numb_blocks } => { tracing::info!("Popping {numb_blocks} blocks."); - let res = crate::blockchain::interface::pop_blocks(numb_blocks).await; + let res = node.blockchain.manager().pop_blocks(numb_blocks).await; match res { Ok(()) => println!("Popped {numb_blocks} blocks."), diff --git a/binaries/cuprated/src/config.rs b/binaries/cuprated/src/config.rs index 0288e80c5..4f87712fb 100644 --- a/binaries/cuprated/src/config.rs +++ b/binaries/cuprated/src/config.rs @@ -6,12 +6,10 @@ use std::{ net::{IpAddr, TcpListener}, path::{Path, PathBuf}, str::FromStr, - sync::LazyLock, time::Duration, }; -use anyhow::bail; -use clap::Parser; +use anyhow::{bail, Context}; use cuprate_blockchain::config::CacheSizes; use serde::{Deserialize, Serialize}; @@ -25,7 +23,6 @@ use cuprate_p2p_core::{ClearNet, Tor}; use cuprate_wire::OnionAddr; use crate::{ - constants::{DEFAULT_CONFIG_STARTUP_DELAY, DEFAULT_CONFIG_WARNING}, logging::eprintln_red, tor::{TorContext, TorMode}, }; @@ -33,7 +30,6 @@ use crate::{ #[cfg(feature = "arti")] use {arti_client::KeystoreSelector, safelog::DisplayRedacted}; -mod args; mod default; mod fs; mod p2p; @@ -57,6 +53,14 @@ use tokio::TokioConfig; use tor::TorConfig; use tracing_config::TracingConfig; +/// Result of a single check from [`Config::dry_run_check`]. +pub struct DryRunResult { + /// Description of the check. + pub description: String, + /// The result of the check. + pub result: Result<(), anyhow::Error>, +} + /// Header to put at the start of the generated config file. const HEADER: &str = r"## ____ _ ## / ___| _ _ __ _ __ __ _| |_ ___ @@ -75,66 +79,50 @@ const HEADER: &str = r"## ____ _ "; -/// A lazy-lock that reads and stores total system memory. -static MEMORY: LazyLock = LazyLock::new(|| { - tracing::info!("Attempting to read total memory from system"); +/// Resolves `target_max_memory` from system RAM if unset. +pub fn resolve_max_memory(config: &mut Config) { + // TODO: don't use `DefaultOrCustom` for target_max_memory. + if matches!(config.target_max_memory, DefaultOrCustom::Default) { + tracing::info!("Attempting to read total memory from system"); - let mut info = sysinfo::System::new(); - info.refresh_memory(); + let mut info = sysinfo::System::new(); + info.refresh_memory(); + let memory = info.total_memory(); - let memory = info.total_memory(); + if memory == 0 { + eprintln_red("Unable to read total memory, please manually set the `target_max_memory` value in the config file."); + std::process::exit(1); + } - if memory == 0 { - eprintln_red("Unable to read total memory, please manually set the `target_max_memory` value in the config file."); - std::process::exit(1); + config.target_max_memory = DefaultOrCustom::Custom(memory); } +} - memory -}); - -/// Reads the args & config file, returning a [`Config`]. -pub fn read_config_and_args() -> Config { - let args = args::Args::parse(); - args.do_quick_requests(); +/// Finds and reads a config file from the default locations. +/// +/// Tries the current directory first, then the config directory. +/// Returns `None` if no config file is found in either location. +/// +/// # Errors +/// +/// Returns an error if a config file is found but cannot be parsed. +pub fn find_config() -> Result, anyhow::Error> { + let paths = [ + std::env::current_dir() + .ok() + .map(|p| p.join(DEFAULT_CONFIG_FILE_NAME)), + Some(CUPRATE_CONFIG_DIR.join(DEFAULT_CONFIG_FILE_NAME)), + ]; - let config: Config = if let Some(config_file) = &args.config_file { - // If a config file was set in the args try to read it and exit if we can't. - match Config::read_from_path(config_file) { - Ok(config) => config, - Err(e) => { - eprintln_red(&format!("Failed to read config from file: {e}")); - std::process::exit(1); - } + for path in paths.into_iter().flatten() { + if !path.exists() { + continue; } - } else { - // First attempt to read the config file from the current directory. - std::env::current_dir() - .map(|path| path.join(DEFAULT_CONFIG_FILE_NAME)) - .map_err(Into::into) - .and_then(Config::read_from_path) - .inspect_err(|e| tracing::debug!("Failed to read config from current dir: {e}")) - // otherwise try the main config directory. - .or_else(|_| { - let file = CUPRATE_CONFIG_DIR.join(DEFAULT_CONFIG_FILE_NAME); - Config::read_from_path(file) - }) - .inspect_err(|e| { - tracing::debug!("Failed to read config from config dir: {e}"); - if !args.skip_config_warning { - eprintln_red(DEFAULT_CONFIG_WARNING); - std::thread::sleep(DEFAULT_CONFIG_STARTUP_DELAY); - } - }) - .unwrap_or_default() - }; - - let config = args.apply_args(config); - - if args.dry_run { - config.dry_run_check(); + + return Config::read_from_path(&path).map(Some); } - config + Ok(None) } config_struct! { @@ -252,19 +240,19 @@ impl Config { /// # Errors /// /// Will return an [`Err`] if the file cannot be read or if the file is not a valid [`toml`] config. - fn read_from_path(file: impl AsRef) -> Result { + pub fn read_from_path(file: impl AsRef) -> Result { let file_text = read_to_string(file.as_ref())?; - Ok(toml::from_str(&file_text) - .inspect(|_| println!("Using config at: {}", file.as_ref().to_string_lossy())) - .inspect_err(|e| { - eprintln_red(&format!( - "Failed to parse config file at: {}", - file.as_ref().to_string_lossy() - )); - eprintln_red(&format!("{e}")); - std::process::exit(1); - })?) + let config: Self = toml::from_str(&file_text).with_context(|| { + format!( + "Failed to parse config file at: {}", + file.as_ref().to_string_lossy() + ) + })?; + + println!("Using config at: {}", file.as_ref().to_string_lossy()); + + Ok(config) } /// Returns the current [`Network`] we are running on. @@ -272,6 +260,12 @@ impl Config { self.network } + /// Returns the fast-sync validation hashes for this config's network, + /// or `&[]` if fast sync is disabled. + pub fn fast_sync_hashes(&self) -> &'static [[u8; 32]] { + crate::blockchain::get_fast_sync_hashes(self.fast_sync, self.network) + } + /// The [`ClearNet`], [`cuprate_p2p::P2PConfig`]. pub fn clearnet_p2p_config(&self) -> cuprate_p2p::P2PConfig { cuprate_p2p::P2PConfig { @@ -378,7 +372,9 @@ impl Config { /// Returns the target maximum memory usage. pub fn target_max_memory(&self) -> u64 { match self.target_max_memory { - DefaultOrCustom::Default => *MEMORY, + DefaultOrCustom::Default => { + panic!("`target_max_memory` is unresolved; call `resolve_max_memory` first") + } DefaultOrCustom::Custom(size) => size, } } @@ -435,130 +431,95 @@ impl Config { Ok(()) } - pub fn dry_run_check(self) -> ! { - let mut error = false; + pub fn dry_run_check(&self) -> Vec { + let mut results = Vec::new(); if self.p2p.clear_net.enable_inbound { let port = p2p_port(self.p2p.clear_net.p2p_port, self.network); let ip = self.p2p.clear_net.listen_on; - match Self::check_port(IpAddr::V4(ip), port) { - Ok(()) => println!("P2P clearnet {ip}:{port} available."), - Err(e) => { - eprintln_red(&format!("Error: {e}")); - error = true; - } - } + results.push(DryRunResult { + description: format!("P2P clearnet {ip}:{port} available."), + result: Self::check_port(IpAddr::V4(ip), port), + }); } if self.p2p.clear_net.enable_inbound_v6 { let port = p2p_port(self.p2p.clear_net.p2p_port, self.network); let ip = self.p2p.clear_net.listen_on_v6; - match Self::check_port(IpAddr::V6(ip), port) { - Ok(()) => println!("P2P clearnet {ip}:{port} available."), - Err(e) => { - eprintln_red(&format!("Error: {e}")); - error = true; - } - } + results.push(DryRunResult { + description: format!("P2P clearnet {ip}:{port} available."), + result: Self::check_port(IpAddr::V6(ip), port), + }); } if self.rpc.restricted.enable { let port = restricted_rpc_port(self.rpc.restricted.port, self.network); let ip = self.rpc.restricted.address; - match Self::check_port(ip, port) { - Ok(()) => println!("RPC restricted {ip}:{port} available."), - Err(e) => { - eprintln_red(&format!("Error: {e}")); - error = true; - } - } + results.push(DryRunResult { + description: format!("RPC restricted {ip}:{port} available."), + result: Self::check_port(ip, port), + }); } if self.rpc.unrestricted.enable { let port = unrestricted_rpc_port(self.rpc.unrestricted.port, self.network); let ip = self.rpc.unrestricted.address; - match Self::check_port(ip, port) { - Ok(()) => println!("RPC unrestricted {ip}:{port} available."), - Err(e) => { - eprintln_red(&format!("Error: {e}")); - error = true; - } - } + results.push(DryRunResult { + description: format!("RPC unrestricted {ip}:{port} available."), + result: Self::check_port(ip, port), + }); } if self.tor.mode == TorMode::Daemon { let port = self.tor.daemon.listening_addr.port(); let ip = self.tor.daemon.listening_addr.ip(); - match Self::check_port(ip, port) { - Ok(()) => println!("Tor daemon {ip}:{port} available."), - Err(e) => { - eprintln_red(&format!("Error: {e}")); - error = true; - } - } + results.push(DryRunResult { + description: format!("Tor daemon {ip}:{port} available."), + result: Self::check_port(ip, port), + }); } - match Self::check_dir_permissions(&self.fs.fast_data_directory) { - Ok(()) => println!( - "Permissions are ok at {}", + results.push(DryRunResult { + description: format!( + "File permissions are valid at {}", self.fs.fast_data_directory.display() ), - Err(e) => { - eprintln_red(&format!("Error: {e}")); - error = true; - } - } + result: Self::check_dir_permissions(&self.fs.fast_data_directory), + }); - match Self::check_dir_permissions(&self.fs.slow_data_directory) { - Ok(()) => println!( - "Permissions are ok at {}", + results.push(DryRunResult { + description: format!( + "File permissions are valid at {}", self.fs.slow_data_directory.display() ), - Err(e) => { - eprintln_red(&format!("Error: {e}")); - error = true; - } - } + result: Self::check_dir_permissions(&self.fs.slow_data_directory), + }); - match Self::check_dir_permissions(&self.fs.cache_directory) { - Ok(()) => println!( - "Permissions are ok at {}", + results.push(DryRunResult { + description: format!( + "File permissions are valid at {}", self.fs.cache_directory.display() ), - Err(e) => { - eprintln_red(&format!("Error {e}")); - error = true; - } - } + result: Self::check_dir_permissions(&self.fs.cache_directory), + }); #[cfg(feature = "arti")] if matches!(self.tor.mode, TorMode::Arti | TorMode::Auto) { - match Self::check_dir_permissions(&self.tor.arti.directory_path) { - Ok(()) => println!( - "Permissions are ok at {}", + results.push(DryRunResult { + description: format!( + "File permissions are valid at {}", self.tor.arti.directory_path.display() ), - Err(e) => { - eprintln_red(&format!("Error: {e}")); - error = true; - } - } + result: Self::check_dir_permissions(&self.tor.arti.directory_path), + }); } - let code = if error { - eprintln_red("Checks failed."); - 1 - } else { - println!("All checks passed successfully!"); - 0 - }; - - std::process::exit(code) + results } } diff --git a/binaries/cuprated/src/lib.rs b/binaries/cuprated/src/lib.rs new file mode 100644 index 000000000..bab46b6a7 --- /dev/null +++ b/binaries/cuprated/src/lib.rs @@ -0,0 +1,284 @@ +//! `cuprated` library. +//! +//! Call [`Node::launch`] to initialize and run the node. Returns a [`Node`] +//! with handles to node services. +//! +//! # Example +//! +//! ```ignore +//! use cuprated::{config::Config, Node}; +//! +//! let config = Config::read_from_path("cuprated.toml")?; +//! cuprated::logging::init_logging(&config); +//! +//! let mut node = Node::launch(config).await; +//! let height = node.blockchain.context().chain_height; +//! ``` + +#![doc = include_str!("../README.md")] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![allow( + unused_imports, + unreachable_pub, + unreachable_code, + unused_crate_dependencies, + dead_code, + unused_variables, + clippy::needless_pass_by_value, + clippy::unused_async, + clippy::diverging_sub_expression, + unused_mut, + clippy::let_unit_value, + clippy::needless_pass_by_ref_mut, + reason = "TODO: remove after v1.0.0" +)] + +pub mod blockchain; +pub mod config; +pub mod constants; +pub mod logging; +pub mod version; + +mod p2p; +mod rpc; +mod tor; +mod txpool; + +use std::sync::Arc; + +use tokio::sync::{oneshot, RwLock}; +use tower::{Service, ServiceExt}; +use tracing::error; + +use cuprate_p2p::NetworkInterface; +use cuprate_p2p_core::{ClearNet, Tor}; +use cuprate_txpool::service::TxpoolReadHandle; +use cuprate_types::blockchain::BlockchainWriteRequest; + +use crate::{ + blockchain::{BlockchainInterface, BlockchainManagerHandle, Syncer, SyncerHandle}, + config::Config, + constants::{DATABASE_CORRUPT_MSG, PANIC_CRITICAL_SERVICE_ERROR}, + tor::initialize_tor_if_enabled, + txpool::IncomingTxHandler, +}; + +/// Captures the necessary context for launching the node. +/// +/// A field belongs here if it is `Clone + Send + Sync`, used by +/// multiple subsystems, and available before subsystem init begins. +/// Write handles, single-consumer channels, `!Sync` types, +/// and late-constructed services do _not_ belong here. +#[derive(Clone)] +pub(crate) struct LaunchContext { + /// The configuration this node was launched with. + pub config: Arc, + + /// Reorg lock. + /// + /// A [`RwLock`] where a write lock is taken during a reorg and a read lock can be taken + /// for any operation which must complete without a reorg happening. + /// + /// Currently, the only operation that needs to take a read lock is adding txs to the tx-pool, + /// this can potentially be removed in the future, see: + pub reorg_lock: Arc>, + + /// Interface to the blockchain (database reads, cached state, mutations). + pub blockchain: BlockchainInterface, + + /// Read handle to the transaction pool. + pub txpool_read: TxpoolReadHandle, + + /// Syncer handle. + pub syncer: SyncerHandle, +} + +/// An active `cuprated` node. +/// +/// Returned by [`Node::launch`]. Use this to interact with the running node. +#[must_use] +pub struct Node { + /// Interface to the blockchain. + pub blockchain: BlockchainInterface, + + /// Transaction pool queries. + pub txpool: TxpoolReadHandle, + + /// Clearnet P2P interface. + pub clearnet: NetworkInterface, + + /// Tor P2P interface (available after sync). + pub tor: Option>>, + + /// Syncer handle. + pub syncer: SyncerHandle, + + /// The configuration this node was launched with. + pub config: Arc, +} + +impl Node { + /// Launch a new `cuprated` process. + /// + /// Sets up thread pools, databases, P2P networking, the blockchain manager, + /// and RPC servers. + /// + /// The caller should set up the following before calling this: + /// - Tracing/logging (the node emits tracing events during initialization) + /// - Global rayon thread pool (optional, uses rayon defaults if not set) + /// - Memory resolution (call [`resolve_max_memory`](crate::config::resolve_max_memory)) + /// + /// # Panics + /// + /// Panics if the database is corrupt, critical services fail to start, + /// or `target_max_memory` is unresolved. + pub async fn launch(config: impl Into>) -> Self { + let config: Arc = config.into(); + + // Initialize the database thread pool. + let db_thread_pool = Arc::new( + rayon::ThreadPoolBuilder::new() + .num_threads(config.storage.reader_threads) + .build() + .unwrap(), + ); + + // Start the blockchain & tx-pool databases. + let fjall_db = fjall::Database::builder(config.fjall_directory()) + .cache_size(config.fjall_cache_size()) + .open() + .unwrap(); + + let (mut blockchain_read_handle, mut blockchain_write_handle, _) = + cuprate_blockchain::service::init_with_pool( + &config.blockchain_config(), + fjall_db.clone(), + Arc::clone(&db_thread_pool), + ) + .inspect_err(|e| error!("Blockchain database error: {e}")) + .expect(DATABASE_CORRUPT_MSG); + + let (txpool_read_handle, txpool_write_handle) = + cuprate_txpool::service::init_with_pool(fjall_db, db_thread_pool) + .inspect_err(|e| error!("Txpool database error: {e}")) + .expect(DATABASE_CORRUPT_MSG); + + // TODO: Add an argument/option for keeping alt blocks between restart. + blockchain_write_handle + .ready() + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR) + .call(BlockchainWriteRequest::FlushAltBlocks) + .await + .expect(PANIC_CRITICAL_SERVICE_ERROR); + + // Check add the genesis block to the blockchain. + blockchain::check_add_genesis( + &mut blockchain_read_handle, + &mut blockchain_write_handle, + config.network(), + ) + .await; + + // Start the context service and the block/tx verifier. + let context_svc = + blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config()) + .await + .unwrap(); + + // Create the syncer and handle. + let (syncer, syncer_handle) = Syncer::new(); + + // Create the blockchain manager handle and command receiver. + let (blockchain_manager_handle, command_rx) = BlockchainManagerHandle::new(); + + // Create the blockchain interface. + let blockchain_interface = BlockchainInterface::new( + blockchain_read_handle.clone(), + context_svc.clone(), + blockchain_manager_handle.clone(), + ); + + // Create the launch context. + let launch_ctx = LaunchContext { + config, + reorg_lock: Arc::new(RwLock::new(())), + blockchain: blockchain_interface, + txpool_read: txpool_read_handle.clone(), + syncer: syncer_handle, + }; + + // Bootstrap or configure Tor if enabled. + let tor_enabled = launch_ctx.config.p2p.tor_net.enabled; + let tor_context = initialize_tor_if_enabled(&launch_ctx).await; + + // Start clearnet P2P zone + let (clearnet_interface, clearnet_tx_handler_subscriber) = + p2p::initialize_clearnet_p2p(&launch_ctx, &tor_context).await; + + // Create Tor router delivery channel. + let (tor_router_tx, tor_router_rx) = tor_enabled.then(oneshot::channel).unzip(); + + // Create the incoming tx handler service. + let tx_handler = IncomingTxHandler::init( + &launch_ctx, + clearnet_interface.clone(), + tor_router_rx, + txpool_write_handle.clone(), + ) + .await; + + // Send tx handler sender to clearnet zone + if clearnet_tx_handler_subscriber + .send(tx_handler.clone()) + .is_err() + { + unreachable!() + } + + // Tor interface channel - populated when Tor starts after sync. + let (tor_tx, tor_rx) = oneshot::channel(); + + // Initialize the blockchain manager. + blockchain::init_blockchain_manager( + &launch_ctx, + clearnet_interface.clone(), + blockchain_write_handle, + tx_handler.txpool_manager.clone(), + syncer, + command_rx, + ) + .await; + + // Initialize the RPC server(s). + rpc::init_rpc_servers(&launch_ctx, tx_handler.clone()); + + // Start Tor P2P zone after sync completes. + if tor_enabled { + p2p::initialize_tor_p2p( + launch_ctx.clone(), + tor_context, + tx_handler, + tor_tx, + tor_router_tx, + ); + } + + let LaunchContext { + blockchain, + txpool_read, + syncer, + config, + .. + } = launch_ctx; + + Self { + blockchain, + txpool: txpool_read, + clearnet: clearnet_interface, + tor: if tor_enabled { Some(tor_rx) } else { None }, + syncer, + config, + } + } +} diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index ae2e16b25..5bea5fabb 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -1,5 +1,8 @@ -#![doc = include_str!("../README.md")] -#![cfg_attr(docsrs, feature(doc_cfg))] +//! `cuprated` CLI binary. +//! +//! Wrapper around [`cuprated::Node::launch`] that handles argument parsing, +//! logging setup, and the interactive command listener. + #![allow( unused_imports, unreachable_pub, @@ -16,55 +19,45 @@ reason = "TODO: remove after v1.0.0" )] -use std::{mem, sync::Arc}; +use std::{ + io::{self, IsTerminal}, + thread::sleep, + time::Duration, +}; -use tokio::sync::{mpsc, oneshot}; -use tower::{Service, ServiceExt}; -use tracing::{error, info, level_filters::LevelFilter}; -use tracing_subscriber::{layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, Registry}; +use clap::Parser; +use tokio::sync::mpsc; +use tracing::info; -use cuprate_consensus_context::{ - BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService, +use cuprated::{ + config::{find_config, resolve_max_memory, Config}, + constants::{DEFAULT_CONFIG_STARTUP_DELAY, DEFAULT_CONFIG_WARNING}, + logging::eprintln_red, }; -use cuprate_helper::time::secs_to_hms; -use cuprate_p2p_core::{transports::Tcp, ClearNet}; -use cuprate_types::blockchain::BlockchainWriteRequest; -use txpool::IncomingTxHandler; + +mod args; +mod commands; use crate::{ - blockchain::SyncNotify, - config::Config, - constants::{DATABASE_CORRUPT_MSG, PANIC_CRITICAL_SERVICE_ERROR}, - logging::CupratedTracingFilter, - tor::initialize_tor_if_enabled, + args::Args, + commands::{command_listener, Command}, }; -mod blockchain; -mod commands; -mod config; -mod constants; -mod logging; -mod p2p; -mod rpc; -mod signals; -mod statics; -mod tor; -mod txpool; -mod version; - fn main() { // Set global private permissions for created files. cuprate_helper::fs::set_private_global_file_permissions(); - // Initialize global static `LazyLock` data. - statics::init_lazylock_statics(); + // Parse CLI args and read config. + let args = Args::parse(); + args.do_quick_requests(); - let config = config::read_config_and_args(); - - blockchain::set_fast_sync_hashes(config.fast_sync, config.network()); + let mut config = load_config(&args); // Initialize logging. - logging::init_logging(&config); + cuprated::logging::init_logging(&config); + + // Resolve available memory. + resolve_max_memory(&mut config); //Printing configuration info!("{config}"); @@ -75,175 +68,75 @@ fn main() { let rt = init_tokio_rt(&config); - let db_thread_pool = Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(config.storage.reader_threads) - .build() - .unwrap(), - ); - - // Start the blockchain & tx-pool databases. - - let fjall_db = fjall::Database::builder(config.fjall_directory()) - .cache_size(config.fjall_cache_size()) - .open() - .unwrap(); - - let (mut blockchain_read_handle, mut blockchain_write_handle, _) = - cuprate_blockchain::service::init_with_pool( - &config.blockchain_config(), - fjall_db.clone(), - Arc::clone(&db_thread_pool), - ) - .inspect_err(|e| error!("Blockchain database error: {e}")) - .expect(DATABASE_CORRUPT_MSG); - - let (txpool_read_handle, txpool_write_handle) = - cuprate_txpool::service::init_with_pool(fjall_db, db_thread_pool) - .inspect_err(|e| error!("Txpool database error: {e}")) - .expect(DATABASE_CORRUPT_MSG); - - // Initialize async tasks. - + #[expect(clippy::significant_drop_tightening)] rt.block_on(async move { - // TODO: Add an argument/option for keeping alt blocks between restart. - blockchain_write_handle - .ready() - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR) - .call(BlockchainWriteRequest::FlushAltBlocks) - .await - .expect(PANIC_CRITICAL_SERVICE_ERROR); + // Start the node. + let node = cuprated::Node::launch(config).await; - // Check add the genesis block to the blockchain. - blockchain::check_add_genesis( - &mut blockchain_read_handle, - &mut blockchain_write_handle, - config.network(), - ) - .await; + // If STDIN is a terminal, spawn a blocking thread for user input. + if io::stdin().is_terminal() { + let (command_tx, command_rx) = mpsc::channel(1); + std::thread::spawn(|| commands::command_listener(command_tx)); - // Start the context service and the block/tx verifier. - let context_svc = - blockchain::init_consensus(blockchain_read_handle.clone(), config.context_config()) + // Wait on the io_loop, spawned on a separate task as this improves performance. + tokio::spawn(commands::io_loop(command_rx, node)) .await .unwrap(); - - // Bootstrap or configure Tor if enabled. - let tor_context = initialize_tor_if_enabled(&config).await; - let tor_enabled = config.p2p.tor_net.enabled; - - // Create the sync notifier and handle. - let (sync_notify, syncer_handle) = SyncNotify::new(); - - // Start clearnet P2P zone - let (clearnet_interface, clearnet_tx_handler_subscriber) = p2p::initialize_clearnet_p2p( - &config, - context_svc.clone(), - blockchain_read_handle.clone(), - txpool_read_handle.clone(), - &tor_context, - sync_notify.callback(context_svc.clone()), - ) - .await; - - // Create Tor router delivery channel. - let (tor_router_tx, tor_router_rx) = tor_enabled.then(oneshot::channel).unzip(); - - // Create the incoming tx handler service. - let tx_handler = IncomingTxHandler::init( - config.storage.txpool.clone(), - clearnet_interface.clone(), - tor_router_rx, - txpool_write_handle.clone(), - txpool_read_handle.clone(), - context_svc.clone(), - blockchain_read_handle.clone(), - ) - .await; - - // Send tx handler sender to clearnet zone - if clearnet_tx_handler_subscriber - .send(tx_handler.clone()) - .is_err() - { - unreachable!() + } else { + // If no STDIN, await OS exit signal. + info!("Terminal/TTY not detected, disabling STDIN commands"); + tokio::signal::ctrl_c().await.unwrap(); } + }); +} - // Initialize the blockchain manager. - blockchain::init_blockchain_manager( - clearnet_interface, - blockchain_write_handle, - blockchain_read_handle.clone(), - tx_handler.txpool_manager.clone(), - context_svc.clone(), - config.block_downloader_config(), - syncer_handle, - ) - .await; - - // Initialize the RPC server(s). - rpc::init_rpc_servers( - config.rpc.clone(), - config.network, - blockchain_read_handle.clone(), - context_svc.clone(), - txpool_read_handle.clone(), - tx_handler.clone(), - ); - - // Start Tor P2P zone after sync completes. - if tor_enabled { - info!("Tor P2P zone will start after sync."); - let context_svc = context_svc.clone(); +/// Load config: explicit path from `--config-file`, auto-detect from default +/// locations, or fall back to defaults with a warning. +fn load_config(args: &Args) -> Config { + let config = if let Some(config_file) = &args.config_file { + Config::read_from_path(config_file).unwrap_or_else(|e| { + eprintln_red(&format!("Failed to read config from file: {e}")); + std::process::exit(1); + }) + } else if let Some(config) = find_config().unwrap_or_else(|e| { + eprintln_red(&format!("Failed to read config: {e}")); + std::process::exit(1); + }) { + config + } else { + if !args.skip_config_warning { + eprintln_red(DEFAULT_CONFIG_WARNING); + sleep(DEFAULT_CONFIG_STARTUP_DELAY); + } + Config::default() + }; - tokio::spawn(async move { - // Wait for the node to synchronize with the network - if sync_notify.wait_for_synced().await.is_err() { - tracing::info!("Not starting Tor P2P zone, syncer stopped"); - return; - } - tracing::info!("Starting Tor P2P zone."); + let config = args.apply_args(config); - let (tor_interface, tor_tx_handler_tx) = p2p::start_tor_p2p( - &config, - context_svc, - blockchain_read_handle, - txpool_read_handle, - tor_context, - ) - .await; + if args.dry_run { + let results = config.dry_run_check(); + let mut has_error = false; - // Send the tx handler to the Tor zone - if tor_tx_handler_tx.send(tx_handler).is_err() { - tracing::warn!("Failed to send tx handler to Tor zone."); - return; + for check in &results { + match &check.result { + Ok(()) => println!("{}", check.description), + Err(e) => { + eprintln_red(&format!("Error: {e}")); + has_error = true; } + } + } - // Deliver the Tor network interface to the dandelion router. - if let Some(tx) = tor_router_tx { - if tx.send(tor_interface).is_err() { - tracing::warn!("Failed to deliver Tor router to dandelion pool."); - } - } - }); + if has_error { + eprintln_red("Checks failed."); + std::process::exit(1); } - // Start the command listener. - if std::io::IsTerminal::is_terminal(&std::io::stdin()) { - let (command_tx, command_rx) = mpsc::channel(1); - std::thread::spawn(|| commands::command_listener(command_tx)); + println!("All checks passed successfully!"); + std::process::exit(0); + } - // Wait on the io_loop, spawned on a separate task as this improves performance. - tokio::spawn(commands::io_loop(command_rx, context_svc)) - .await - .unwrap(); - } else { - // If no STDIN, await OS exit signal. - info!("Terminal/TTY not detected, disabling STDIN commands"); - tokio::signal::ctrl_c().await.unwrap(); - } - }); + config } /// Initialize the [`tokio`] runtime. diff --git a/binaries/cuprated/src/p2p.rs b/binaries/cuprated/src/p2p.rs index 42a97634f..377c70d00 100644 --- a/binaries/cuprated/src/p2p.rs +++ b/binaries/cuprated/src/p2p.rs @@ -13,8 +13,6 @@ use tokio::sync::{ }; use tower::{Service, ServiceExt}; -use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle}; -use cuprate_consensus::BlockchainContextService; use cuprate_p2p::{config::TransportConfig, NetworkInterface, P2PConfig}; use cuprate_p2p_core::{ client::{InternalPeerID, PeerSyncCallback}, @@ -26,10 +24,11 @@ use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle}; use cuprate_types::blockchain::BlockchainWriteRequest; use crate::{ - config::Config, + blockchain::BlockchainInterface, constants::PANIC_CRITICAL_SERVICE_ERROR, tor::{transport_clearnet_daemon_config, transport_daemon_config, TorContext, TorMode}, txpool::{self, IncomingTxHandler}, + LaunchContext, }; #[cfg(feature = "arti")] @@ -125,100 +124,121 @@ impl NetworkInterfaces { /// Initialize the clearnet P2P network zone. Returns [`NetworkInterface`] and /// [`Sender`] for propagating the tx handler. pub async fn initialize_clearnet_p2p( - config: &Config, - context_svc: BlockchainContextService, - blockchain_read_handle: BlockchainReadHandle, - txpool_read_handle: TxpoolReadHandle, + launch_ctx: &LaunchContext, tor_ctx: &TorContext, - peer_sync_callback: PeerSyncCallback, ) -> (NetworkInterface, Sender) { + let config = launch_ctx.config.as_ref(); + let peer_sync_callback = launch_ctx.syncer.callback(&launch_ctx.blockchain); + match &config.p2p.clear_net.proxy { ProxySettings::Tor => match tor_ctx.mode { #[cfg(feature = "arti")] TorMode::Arti => { tracing::info!("Anonymizing clearnet connections through Arti."); start_zone_p2p::( - blockchain_read_handle, - context_svc, - txpool_read_handle, + &launch_ctx.blockchain, + launch_ctx.txpool_read.clone(), config.clearnet_p2p_config(), transport_clearnet_arti_config(tor_ctx), - Some(peer_sync_callback.clone()), + Some(peer_sync_callback), ) .await .unwrap() } TorMode::Daemon => start_zone_p2p::( - blockchain_read_handle, - context_svc, - txpool_read_handle, + &launch_ctx.blockchain, + launch_ctx.txpool_read.clone(), config.clearnet_p2p_config(), transport_clearnet_daemon_config(config), - Some(peer_sync_callback.clone()), + Some(peer_sync_callback), ) .await .unwrap(), TorMode::Auto => unreachable!("Auto mode should be resolved before this point"), }, ProxySettings::Disabled => start_zone_p2p::( - blockchain_read_handle, - context_svc, - txpool_read_handle, + &launch_ctx.blockchain, + launch_ctx.txpool_read.clone(), config.clearnet_p2p_config(), config.p2p.clear_net.tcp_transport_config(config.network), - Some(peer_sync_callback.clone()), + Some(peer_sync_callback), ) .await .unwrap(), ProxySettings::Socks(socks_config) => start_zone_p2p::( - blockchain_read_handle, - context_svc, - txpool_read_handle, + &launch_ctx.blockchain, + launch_ctx.txpool_read.clone(), config.clearnet_p2p_config(), TransportConfig { client_config: socks_config.clone(), server_config: None, }, - Some(peer_sync_callback.clone()), + Some(peer_sync_callback), ) .await .unwrap(), } } -/// Start the Tor P2P network zone. Returns [`NetworkInterface`] and -/// a [`Sender`] for propagating the tx handler. -pub async fn start_tor_p2p( - config: &Config, - context_svc: BlockchainContextService, - blockchain_read_handle: BlockchainReadHandle, - txpool_read_handle: TxpoolReadHandle, - tor_ctx: TorContext, -) -> (NetworkInterface, Sender) { - match tor_ctx.mode { - TorMode::Daemon => start_zone_p2p::( - blockchain_read_handle, - context_svc, - txpool_read_handle, - config.tor_p2p_config(&tor_ctx), - transport_daemon_config(config), - None, - ) - .await - .unwrap(), - #[cfg(feature = "arti")] - TorMode::Arti => start_zone_p2p::( - blockchain_read_handle, - context_svc, - txpool_read_handle, - config.tor_p2p_config(&tor_ctx), - transport_arti_config(config, tor_ctx), - None, - ) - .await - .unwrap(), - TorMode::Auto => unreachable!("Auto mode should be resolved before this point"), - } +/// Initialize the Tor P2P network zone after the node has synced with the network. +/// Publishes [`NetworkInterface`] and forwards the [`IncomingTxHandler`] to the Tor zone. +pub fn initialize_tor_p2p( + launch_ctx: LaunchContext, + tor_context: TorContext, + tx_handler: IncomingTxHandler, + interface_publisher: Sender>, + dandelion_router: Option>>, +) { + tracing::info!("Tor P2P zone will start after sync."); + + tokio::spawn(async move { + // Wait for the node to synchronize with the network + if launch_ctx.syncer.wait_for_synced().await.is_err() { + tracing::info!("Not starting Tor P2P zone, syncer stopped"); + return; + } + tracing::info!("Starting Tor P2P zone."); + + let config = launch_ctx.config.as_ref(); + let (tor_interface, tor_tx_handler_tx) = match tor_context.mode { + TorMode::Daemon => start_zone_p2p::( + &launch_ctx.blockchain, + launch_ctx.txpool_read.clone(), + config.tor_p2p_config(&tor_context), + transport_daemon_config(config), + None, + ) + .await + .unwrap(), + #[cfg(feature = "arti")] + TorMode::Arti => start_zone_p2p::( + &launch_ctx.blockchain, + launch_ctx.txpool_read.clone(), + config.tor_p2p_config(&tor_context), + transport_arti_config(config, tor_context), + None, + ) + .await + .unwrap(), + TorMode::Auto => unreachable!("Auto mode should be resolved before this point"), + }; + + // Publish the Tor interface for consumers + drop(interface_publisher.send(tor_interface.clone())); + + // Send the tx handler to the Tor zone + if tor_tx_handler_tx.send(tx_handler).is_err() { + tracing::warn!("Failed to send tx handler to Tor zone."); + return; + } + + // Deliver the Tor network interface to the dandelion router. + if let Some(tx) = dandelion_router { + if tx.send(tor_interface).is_err() { + tracing::warn!("Failed to deliver Tor router to dandelion pool."); + } + } + }); } /// Starts the P2P network zone, returning a [`NetworkInterface`] to interact with it. @@ -226,8 +246,7 @@ pub async fn start_tor_p2p( /// A [`oneshot::Sender`] is also returned to provide the [`IncomingTxHandler`], until this is provided network /// handshakes can not be completed. pub async fn start_zone_p2p( - blockchain_read_handle: BlockchainReadHandle, - blockchain_context_service: BlockchainContextService, + blockchain: &BlockchainInterface, txpool_read_handle: TxpoolReadHandle, config: P2PConfig, transport_config: TransportConfig, @@ -239,20 +258,22 @@ where N::Addr: borsh::BorshDeserialize + borsh::BorshSerialize, CrossNetworkInternalPeerId: From::Addr>>, { + let context_svc = blockchain.context_svc(); let (incoming_tx_handler_tx, incoming_tx_handler_rx) = oneshot::channel(); let request_handler_maker = request_handler::P2pProtocolRequestHandlerMaker { - blockchain_read_handle, - blockchain_context_service: blockchain_context_service.clone(), + blockchain_read_handle: blockchain.read(), + blockchain_context_service: context_svc.clone(), txpool_read_handle, incoming_tx_handler: None, incoming_tx_handler_fut: incoming_tx_handler_rx.shared(), + blockchain_manager: blockchain.manager(), }; Ok(( cuprate_p2p::initialize_network::( request_handler_maker.map_response(|s| s.map_err(Into::into)), - core_sync_service::CoreSyncService(blockchain_context_service), + core_sync_service::CoreSyncService(context_svc), config, transport_config, peer_sync_callback, diff --git a/binaries/cuprated/src/p2p/request_handler.rs b/binaries/cuprated/src/p2p/request_handler.rs index 2bef3d11a..5d58b5429 100644 --- a/binaries/cuprated/src/p2p/request_handler.rs +++ b/binaries/cuprated/src/p2p/request_handler.rs @@ -46,7 +46,7 @@ use cuprate_wire::protocol::{ }; use crate::{ - blockchain::interface::{self as blockchain_interface, IncomingBlockError}, + blockchain::interface::{BlockchainManagerHandle, IncomingBlockError}, constants::PANIC_CRITICAL_SERVICE_ERROR, p2p::CrossNetworkInternalPeerId, txpool::{IncomingTxError, IncomingTxHandler, IncomingTxs}, @@ -65,6 +65,9 @@ pub struct P2pProtocolRequestHandlerMaker { /// A [`Future`](std::future::Future) that produces the [`IncomingTxHandler`]. pub incoming_tx_handler_fut: Shared>, + + /// Handle for the blockchain manager. + pub blockchain_manager: BlockchainManagerHandle, } impl Service> for P2pProtocolRequestHandlerMaker @@ -105,6 +108,7 @@ where blockchain_context_service: self.blockchain_context_service.clone(), txpool_read_handle, incoming_tx_handler, + blockchain_manager: self.blockchain_manager.clone(), })) } } @@ -117,6 +121,7 @@ pub struct P2pProtocolRequestHandler { blockchain_context_service: BlockchainContextService, txpool_read_handle: TxpoolReadHandle, incoming_tx_handler: IncomingTxHandler, + blockchain_manager: BlockchainManagerHandle, } impl Service for P2pProtocolRequestHandler @@ -152,6 +157,7 @@ where self.blockchain_read_handle.clone(), self.blockchain_context_service.clone(), self.txpool_read_handle.clone(), + self.blockchain_manager.clone(), ) .boxed(), ProtocolRequest::NewTransactions(r) => new_transactions( @@ -301,6 +307,7 @@ async fn new_fluffy_block( mut blockchain_read_handle: BlockchainReadHandle, mut blockchain_context_service: BlockchainContextService, mut txpool_read_handle: TxpoolReadHandle, + blockchain_manager: BlockchainManagerHandle, ) -> anyhow::Result { let current_blockchain_height = request.current_blockchain_height; @@ -348,13 +355,14 @@ async fn new_fluffy_block( return Ok(ProtocolResponse::NA); } - let res = blockchain_interface::handle_incoming_block( - block, - txs, - &mut blockchain_read_handle, - &mut txpool_read_handle, - ) - .await; + let res = blockchain_manager + .handle_incoming_block( + block, + txs, + &mut blockchain_read_handle, + &mut txpool_read_handle, + ) + .await; match res { Ok(_) => Ok(ProtocolResponse::NA), @@ -369,6 +377,10 @@ async fn new_fluffy_block( // Block's parent was unknown, could be syncing? Ok(ProtocolResponse::NA) } + Err(IncomingBlockError::ChannelClosed) => { + // Manager has exited (likely shutdown); drop silently. + Ok(ProtocolResponse::NA) + } Err(e) => Err(e.into()), } } diff --git a/binaries/cuprated/src/rpc/handlers/json_rpc.rs b/binaries/cuprated/src/rpc/handlers/json_rpc.rs index c06da1163..905b7d1a7 100644 --- a/binaries/cuprated/src/rpc/handlers/json_rpc.rs +++ b/binaries/cuprated/src/rpc/handlers/json_rpc.rs @@ -60,7 +60,6 @@ use cuprate_types::{ }; use crate::{ - blockchain::interface as blockchain_interface, constants::VERSION_BUILD, rpc::{ constants::{FIELD_NOT_SUPPORTED, UNSUPPORTED_RPC_CALL}, @@ -68,7 +67,6 @@ use crate::{ service::{address_book, blockchain, blockchain_context, blockchain_manager, txpool}, CupratedRpcHandler, }, - statics::START_INSTANT_UNIX, }; /// Map a [`JsonRpcRequest`] to the function that will lead to a [`JsonRpcResponse`]. @@ -247,13 +245,15 @@ async fn submit_block( let block_id = Hex(block.hash()); // Attempt to relay the block. - blockchain_interface::handle_incoming_block( - block, - HashMap::new(), // this function reads the txpool - &mut state.blockchain_read, - &mut state.txpool_read, - ) - .await?; + state + .blockchain_manager + .handle_incoming_block( + block, + HashMap::new(), // this function reads the txpool + &mut state.blockchain_read, + &mut state.txpool_read, + ) + .await?; Ok(SubmitBlockResponse { base: helper::response_base(false), @@ -535,7 +535,11 @@ async fn get_info( )] let rpc_connections_count = if restricted { 0 } else { 0 }; - let start_time = if restricted { 0 } else { *START_INSTANT_UNIX }; + let start_time = if restricted { + 0 + } else { + state.start_instant_unix + }; let synchronized = blockchain_manager::synced(todo!()).await?; let target_height = blockchain_manager::target_height(todo!()).await?; diff --git a/binaries/cuprated/src/rpc/handlers/other_json.rs b/binaries/cuprated/src/rpc/handlers/other_json.rs index 7be7fe4ed..79d41fd54 100644 --- a/binaries/cuprated/src/rpc/handlers/other_json.rs +++ b/binaries/cuprated/src/rpc/handlers/other_json.rs @@ -59,7 +59,6 @@ use crate::{ }, CupratedRpcHandler, }, - statics::START_INSTANT_UNIX, txpool::IncomingTxs, }; @@ -639,7 +638,7 @@ async fn get_net_stats( Ok(GetNetStatsResponse { base: helper::response_base(false), - start_time: *START_INSTANT_UNIX, + start_time: state.start_instant_unix, total_packets_in: todo!(), total_bytes_in: todo!(), total_packets_out: todo!(), diff --git a/binaries/cuprated/src/rpc/rpc_handler.rs b/binaries/cuprated/src/rpc/rpc_handler.rs index e87ff53ba..0d0d2ae05 100644 --- a/binaries/cuprated/src/rpc/rpc_handler.rs +++ b/binaries/cuprated/src/rpc/rpc_handler.rs @@ -21,7 +21,9 @@ use cuprate_types::BlockTemplate; use cuprate_helper::network::Network; -use crate::{rpc::handlers, txpool::IncomingTxHandler}; +use crate::{ + blockchain::BlockchainManagerHandle, rpc::handlers, txpool::IncomingTxHandler, LaunchContext, +}; /// TODO: use real type when public. #[derive(Clone)] @@ -149,8 +151,8 @@ pub enum BlockchainManagerResponse { CreateBlockTemplate(Box), } -/// TODO: use real type when public. -pub type BlockchainManagerHandle = +/// TODO: replace with [`BlockchainManagerHandle`] when RPC operations are implemented. +pub type BlockchainManagerRpcHandle = tower::util::BoxService; /// cuprated's RPC handler service. @@ -174,25 +176,35 @@ pub struct CupratedRpcHandler { pub txpool_read: TxpoolReadHandle, pub tx_handler: IncomingTxHandler, + + /// Command channel to the blockchain manager. + pub blockchain_manager: BlockchainManagerHandle, + + /// The time this node was launched as a UNIX timestamp. + pub start_instant_unix: u64, } impl CupratedRpcHandler { /// Create a new [`Self`]. - pub const fn new( + pub fn new( restricted: bool, - network: Network, - blockchain_read: BlockchainReadHandle, - blockchain_context: BlockchainContextService, - txpool_read: TxpoolReadHandle, tx_handler: IncomingTxHandler, + launch_ctx: &LaunchContext, ) -> Self { + let start_instant_unix = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + Self { restricted, - network, - blockchain_read, - blockchain_context, - txpool_read, + network: launch_ctx.config.network, tx_handler, + blockchain_read: launch_ctx.blockchain.read(), + blockchain_context: launch_ctx.blockchain.context_svc(), + txpool_read: launch_ctx.txpool_read.clone(), + blockchain_manager: launch_ctx.blockchain.manager(), + start_instant_unix, } } } diff --git a/binaries/cuprated/src/rpc/server.rs b/binaries/cuprated/src/rpc/server.rs index 8c53e02f3..b1c19a829 100644 --- a/binaries/cuprated/src/rpc/server.rs +++ b/binaries/cuprated/src/rpc/server.rs @@ -11,16 +11,13 @@ use tower::limit::rate::RateLimitLayer; use tower_http::limit::RequestBodyLimitLayer; use tracing::{info, warn}; -use cuprate_blockchain::service::BlockchainReadHandle; -use cuprate_consensus::BlockchainContextService; -use cuprate_helper::network::Network; use cuprate_rpc_interface::{RouterBuilder, RpcHandler}; -use cuprate_txpool::service::TxpoolReadHandle; use crate::{ - config::{restricted_rpc_port, unrestricted_rpc_port, RpcConfig}, - rpc::{rpc_handler::BlockchainManagerHandle, CupratedRpcHandler}, + config::{restricted_rpc_port, unrestricted_rpc_port}, + rpc::CupratedRpcHandler, txpool::IncomingTxHandler, + LaunchContext, }; /// Initialize the RPC server(s). @@ -30,20 +27,14 @@ use crate::{ /// - the server(s) could not be started /// - unrestricted RPC is started on non-local /// address without override option -pub fn init_rpc_servers( - config: RpcConfig, - network: Network, - blockchain_read: BlockchainReadHandle, - blockchain_context: BlockchainContextService, - txpool_read: TxpoolReadHandle, - tx_handler: IncomingTxHandler, -) { +pub fn init_rpc_servers(launch_ctx: &LaunchContext, tx_handler: IncomingTxHandler) { + let config = &launch_ctx.config.rpc; for ((enable, addr, port, request_byte_limit), restricted) in [ ( ( config.unrestricted.enable, config.unrestricted.address, - unrestricted_rpc_port(config.unrestricted.port, network), + unrestricted_rpc_port(config.unrestricted.port, launch_ctx.config.network), config.unrestricted.request_byte_limit, ), false, @@ -52,7 +43,7 @@ pub fn init_rpc_servers( ( config.restricted.enable, config.restricted.address, - restricted_rpc_port(config.restricted.port, network), + restricted_rpc_port(config.restricted.port, launch_ctx.config.network), config.restricted.request_byte_limit, ), true, @@ -77,14 +68,7 @@ pub fn init_rpc_servers( } } - let rpc_handler = CupratedRpcHandler::new( - restricted, - network, - blockchain_read.clone(), - blockchain_context.clone(), - txpool_read.clone(), - tx_handler.clone(), - ); + let rpc_handler = CupratedRpcHandler::new(restricted, tx_handler.clone(), launch_ctx); tokio::task::spawn(async move { run_rpc_server( diff --git a/binaries/cuprated/src/rpc/service/blockchain_manager.rs b/binaries/cuprated/src/rpc/service/blockchain_manager.rs index 75db7ade3..ddc5f1eec 100644 --- a/binaries/cuprated/src/rpc/service/blockchain_manager.rs +++ b/binaries/cuprated/src/rpc/service/blockchain_manager.rs @@ -11,7 +11,8 @@ use cuprate_rpc_types::misc::Span; use cuprate_types::BlockTemplate; use crate::rpc::rpc_handler::{ - BlockchainManagerHandle, BlockchainManagerRequest, BlockchainManagerResponse, + BlockchainManagerRequest, BlockchainManagerResponse, + BlockchainManagerRpcHandle as BlockchainManagerHandle, }; /// [`BlockchainManagerRequest::PopBlocks`] diff --git a/binaries/cuprated/src/signals.rs b/binaries/cuprated/src/signals.rs deleted file mode 100644 index 42148ca83..000000000 --- a/binaries/cuprated/src/signals.rs +++ /dev/null @@ -1,12 +0,0 @@ -//! Signals for Cuprate state used throughout the binary. - -use tokio::sync::RwLock; - -/// Reorg lock. -/// -/// A [`RwLock`] where a write lock is taken during a reorg and a read lock can be taken -/// for any operation which must complete without a reorg happening. -/// -/// Currently, the only operation that needs to take a read lock is adding txs to the tx-pool, -/// this can potentially be removed in the future, see: -pub static REORG_LOCK: RwLock<()> = RwLock::const_new(()); diff --git a/binaries/cuprated/src/statics.rs b/binaries/cuprated/src/statics.rs deleted file mode 100644 index 2d7338da6..000000000 --- a/binaries/cuprated/src/statics.rs +++ /dev/null @@ -1,53 +0,0 @@ -//! Global `static`s used throughout `cuprated`. - -use std::{ - sync::LazyLock, - time::{SystemTime, UNIX_EPOCH}, -}; - -/// Define all the `static`s that should be always be initialized early on. -/// -/// This wraps all `static`s inside a `LazyLock` and generates -/// a [`init_lazylock_statics`] function that must/should be -/// used by `main()` early on. -macro_rules! define_init_lazylock_statics { - ($( - $( #[$attr:meta] )* - $name:ident: $t:ty = $init_fn:expr_2021; - )*) => { - /// Initialize global static `LazyLock` data. - pub fn init_lazylock_statics() { - $( - LazyLock::force(&$name); - )* - } - - $( - $(#[$attr])* - pub static $name: LazyLock<$t> = LazyLock::new(|| $init_fn); - )* - }; -} - -define_init_lazylock_statics! { - /// The start time of `cuprated`. - START_INSTANT: SystemTime = SystemTime::now(); - - /// Start time of `cuprated` as a UNIX timestamp. - START_INSTANT_UNIX: u64 = START_INSTANT - .duration_since(UNIX_EPOCH) - .expect("Failed to set `cuprated` startup time.") - .as_secs(); -} - -#[cfg(test)] -mod test { - use super::*; - - /// Sanity check for startup UNIX time. - #[test] - fn start_instant_unix() { - // Fri Sep 27 01:07:13 AM UTC 2024 - assert!(*START_INSTANT_UNIX > 1727399233); - } -} diff --git a/binaries/cuprated/src/tor.rs b/binaries/cuprated/src/tor.rs index af4e57217..8ba7c12a8 100644 --- a/binaries/cuprated/src/tor.rs +++ b/binaries/cuprated/src/tor.rs @@ -19,6 +19,7 @@ use cuprate_wire::OnionAddr; use crate::{ config::{p2p_port, Config}, p2p::ProxySettings, + LaunchContext, }; #[cfg(feature = "arti")] @@ -71,7 +72,8 @@ pub struct TorContext { /// /// This function will bootstrap Arti if needed by Tor network zone or /// clearnet as a proxy. -pub async fn initialize_tor_if_enabled(config: &Config) -> TorContext { +pub async fn initialize_tor_if_enabled(launch_ctx: &LaunchContext) -> TorContext { + let config = launch_ctx.config.as_ref(); let anonymize_clearnet = matches!(config.p2p.clear_net.proxy, ProxySettings::Tor); let tor_enabled = config.p2p.tor_net.enabled || anonymize_clearnet; diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index d0d15df1a..2e27d2e0c 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -7,7 +7,7 @@ use std::{ use bytes::Bytes; use futures::{future::BoxFuture, FutureExt}; use monero_oxide::transaction::Transaction; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, RwLock}; use tower::{BoxError, Service, ServiceExt}; use tracing::instrument; @@ -37,10 +37,8 @@ use cuprate_types::TransactionVerificationData; use crate::{ blockchain::ConsensusBlockchainReadHandle, - config::TxpoolConfig, constants::PANIC_CRITICAL_SERVICE_ERROR, p2p::CrossNetworkInternalPeerId, - signals::REORG_LOCK, txpool::{ dandelion::{ self, AnonTxService, ConcreteDandelionRouter, DiffuseService, MainDandelionRouter, @@ -49,6 +47,7 @@ use crate::{ relay_rules::{check_tx_relay_rules, RelayRuleError}, txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally}, }, + LaunchContext, }; /// An error that can happen handling an incoming tx. @@ -103,6 +102,8 @@ pub struct IncomingTxHandler { pub(super) txpool_read_handle: TxpoolReadHandle, /// The blockchain read handle. pub(super) blockchain_read_handle: ConsensusBlockchainReadHandle, + /// Reorg lock. + reorg_lock: Arc>, } impl IncomingTxHandler { @@ -110,14 +111,13 @@ impl IncomingTxHandler { #[expect(clippy::significant_drop_tightening)] #[instrument(level = "info", skip_all, name = "start_txpool")] pub async fn init( - txpool_config: TxpoolConfig, + launch_ctx: &LaunchContext, clear_net: NetworkInterface, tor_net_rx: Option>>, txpool_write_handle: TxpoolWriteHandle, - txpool_read_handle: TxpoolReadHandle, - blockchain_context_cache: BlockchainContextService, - blockchain_read_handle: BlockchainReadHandle, ) -> Self { + let txpool_config = launch_ctx.config.storage.txpool.clone(); + let diffuse_service = DiffuseService { clear_net_broadcast_service: clear_net.broadcast_svc(), }; @@ -129,13 +129,13 @@ impl IncomingTxHandler { let dandelion_pool_manager = dandelion::start_dandelion_pool_manager( dandelion_router, - txpool_read_handle.clone(), + launch_ctx.txpool_read.clone(), promote_tx, ); let txpool_manager = start_txpool_manager( txpool_write_handle, - txpool_read_handle.clone(), + launch_ctx.txpool_read.clone(), promote_rx, diffuse_service, dandelion_pool_manager.clone(), @@ -145,14 +145,15 @@ impl IncomingTxHandler { Self { txs_being_handled: TxsBeingHandled::new(), - blockchain_context_cache, + blockchain_context_cache: launch_ctx.blockchain.context_svc(), dandelion_pool_manager, txpool_manager, - txpool_read_handle, + txpool_read_handle: launch_ctx.txpool_read.clone(), blockchain_read_handle: ConsensusBlockchainReadHandle::new( - blockchain_read_handle, + launch_ctx.blockchain.read(), BoxError::from, ), + reorg_lock: Arc::clone(&launch_ctx.reorg_lock), } } } @@ -175,12 +176,14 @@ impl Service for IncomingTxHandler { self.txpool_read_handle.clone(), self.txpool_manager.clone(), self.dandelion_pool_manager.clone(), + Arc::clone(&self.reorg_lock), ) .boxed() } } /// Handles the incoming txs. +#[expect(clippy::too_many_arguments)] async fn handle_incoming_txs( IncomingTxs { txs, @@ -194,8 +197,9 @@ async fn handle_incoming_txs( mut txpool_read_handle: TxpoolReadHandle, mut txpool_manager_handle: TxpoolManagerHandle, mut dandelion_pool_manager: DandelionPoolService, + reorg_lock: Arc>, ) -> Result<(), IncomingTxError> { - let _reorg_guard = REORG_LOCK.read().await; + let _reorg_guard = reorg_lock.read().await; let (txs, stem_pool_txs, txs_being_handled_guard) = prepare_incoming_txs(txs, txs_being_handled, &mut txpool_read_handle).await?; diff --git a/binaries/cuprated/src/version.rs b/binaries/cuprated/src/version.rs index ab41af8f5..25c025fa3 100644 --- a/binaries/cuprated/src/version.rs +++ b/binaries/cuprated/src/version.rs @@ -56,6 +56,12 @@ impl CupratedVersionInfo { } } +impl Default for CupratedVersionInfo { + fn default() -> Self { + Self::new() + } +} + #[cfg(test)] mod tests { use super::CupratedVersionInfo; diff --git a/consensus/fast-sync/src/fast_sync.rs b/consensus/fast-sync/src/fast_sync.rs index d3371eacb..52029e864 100644 --- a/consensus/fast-sync/src/fast_sync.rs +++ b/consensus/fast-sync/src/fast_sync.rs @@ -1,7 +1,6 @@ use std::{ cmp::min, collections::{HashMap, VecDeque}, - sync::OnceLock, }; use blake3::Hasher; @@ -21,34 +20,19 @@ use cuprate_types::{ Chain, VerifiedBlockInformation, VerifiedTransactionInformation, }; -/// A [`OnceLock`] representing the fast sync hashes. -static FAST_SYNC_HASHES: OnceLock<&[[u8; 32]]> = OnceLock::new(); - /// The size of a batch of block hashes to hash to create a fast sync hash. pub const FAST_SYNC_BATCH_LEN: usize = 512; -/// Returns the height of the first block not included in the embedded hashes. -/// -/// # Panics -/// -/// This function will panic if [`set_fast_sync_hashes`] has not been called. -pub fn fast_sync_stop_height() -> usize { - FAST_SYNC_HASHES.get().unwrap().len() * FAST_SYNC_BATCH_LEN -} - -/// Sets the hashes to use for fast-sync. -/// -/// # Panics -/// -/// This will panic if this is called more than once. -pub fn set_fast_sync_hashes(hashes: &'static [[u8; 32]]) { - FAST_SYNC_HASHES.set(hashes).unwrap(); +/// Returns the height of the first block not included in the given hashes. +pub const fn fast_sync_stop_height(hashes: &[[u8; 32]]) -> usize { + hashes.len() * FAST_SYNC_BATCH_LEN } /// Validates that the given [`ChainEntry`]s are in the fast-sync hashes. /// /// `entries` should be a list of sequential entries. /// `start_height` should be the height of the first block in the first entry. +/// `fast_sync_hashes` should be the hashes to validate against (empty to skip validation). /// /// Returns a tuple, the first element being the entries that are valid* the second /// the entries we do not know are valid and should be passed in again when we have more entries. @@ -57,17 +41,16 @@ pub fn set_fast_sync_hashes(hashes: &'static [[u8; 32]]) { /// we can not check their validity here. /// /// There may be more entries returned than passed in as entries could be split. -/// -/// # Panics -/// -/// This will panic if [`set_fast_sync_hashes`] has not been called. pub async fn validate_entries( mut entries: VecDeque>, start_height: usize, blockchain_read_handle: &mut BlockchainReadHandle, + fast_sync_hashes: &[[u8; 32]], ) -> Result<(VecDeque>, VecDeque>), tower::BoxError> { + let stop_height = fast_sync_stop_height(fast_sync_hashes); + // if we are past the top fast sync block return all entries as valid. - if start_height >= fast_sync_stop_height() { + if start_height >= stop_height { return Ok((entries, VecDeque::new())); } @@ -83,7 +66,7 @@ pub async fn validate_entries( for, we will split a batch if it can only be partially validated. With the remaining hashes from the blockchain and the hashes in the batches we can validate we - work on calculating the fast sync hashes and comparing them to the ones in [`FAST_SYNC_HASHES`]. + work on calculating the fast sync hashes and comparing them to the ones provided. */ // First calculate the start and stop for this range of hashes. @@ -93,7 +76,7 @@ pub async fn validate_entries( let hashes_stop_height = min( (last_height / FAST_SYNC_BATCH_LEN) * FAST_SYNC_BATCH_LEN, - fast_sync_stop_height(), + stop_height, ); let mut hashes_stop_diff_last_height = last_height - hashes_stop_height; @@ -162,10 +145,7 @@ pub async fn validate_entries( if (i + 1) % FAST_SYNC_BATCH_LEN == 0 { let got_hash = hasher.finalize(); - if got_hash - != FAST_SYNC_HASHES.get().unwrap() - [get_hash_index_for_height(hashes_start_height + i)] - { + if got_hash != fast_sync_hashes[get_hash_index_for_height(hashes_start_height + i)] { return Err("Hashes do not match".into()); } hasher.reset(); @@ -274,22 +254,21 @@ mod tests { use cuprate_p2p::block_downloader::ChainEntry; use cuprate_p2p_core::{client::InternalPeerID, handles::HandleBuilder, ClearNet}; - use crate::{ - fast_sync_stop_height, set_fast_sync_hashes, validate_entries, FAST_SYNC_BATCH_LEN, - }; + use crate::{fast_sync_stop_height, validate_entries, FAST_SYNC_BATCH_LEN}; static HASHES: LazyLock<&[[u8; 32]]> = LazyLock::new(|| { - let hashes = (0..FAST_SYNC_BATCH_LEN * 2000) + (0..FAST_SYNC_BATCH_LEN * 2000) .map(|i| { let mut ret = [0; 32]; ret[..8].copy_from_slice(&i.to_le_bytes()); ret }) - .collect::>(); - - let hashes = hashes.leak(); + .collect::>() + .leak() + }); - let fast_sync_hashes = hashes + static FAST_SYNC_HASHES: LazyLock<&[[u8; 32]]> = LazyLock::new(|| { + HASHES .chunks(FAST_SYNC_BATCH_LEN) .map(|chunk| { let len = chunk.len() * 32; @@ -300,11 +279,8 @@ mod tests { // within the [[u8; 32]]'s lifetime. unsafe { blake3::hash(slice::from_raw_parts(bytes, len)).into() } }) - .collect::>(); - - set_fast_sync_hashes(fast_sync_hashes.leak()); - - hashes + .collect::>() + .leak() }); fn test_db(path: PathBuf) -> BlockchainReadHandle { @@ -343,14 +319,14 @@ mod tests { tokio_test::block_on(async move { let mut blockchain_read_handle= test_db(data_dir.path().to_path_buf()); - let ret = validate_entries::(VecDeque::from([entry]), 0, &mut blockchain_read_handle).await.unwrap(); + let ret = validate_entries::(VecDeque::from([entry]), 0, &mut blockchain_read_handle, *FAST_SYNC_HASHES).await.unwrap(); let len_left = ret.0.iter().map(|e| e.ids.len()).sum::(); let len_right = ret.1.iter().map(|e| e.ids.len()).sum::(); assert_eq!(len_left + len_right, len); - assert!(len_left <= fast_sync_stop_height()); - assert!(len_right < FAST_SYNC_BATCH_LEN || len > fast_sync_stop_height()); + assert!(len_left <= fast_sync_stop_height(*FAST_SYNC_HASHES)); + assert!(len_right < FAST_SYNC_BATCH_LEN || len > fast_sync_stop_height(*FAST_SYNC_HASHES)); }); } @@ -370,14 +346,14 @@ mod tests { tokio_test::block_on(async move { let mut blockchain_read_handle= test_db(data_dir.path().to_path_buf()); - let ret = validate_entries::(entries, 0, &mut blockchain_read_handle).await.unwrap(); + let ret = validate_entries::(entries, 0, &mut blockchain_read_handle, *FAST_SYNC_HASHES).await.unwrap(); let len_left = ret.0.iter().map(|e| e.ids.len()).sum::(); let len_right = ret.1.iter().map(|e| e.ids.len()).sum::(); assert_eq!(len_left + len_right, len); - assert!(len_left <= fast_sync_stop_height()); - assert!(len_right < FAST_SYNC_BATCH_LEN || len > fast_sync_stop_height()); + assert!(len_left <= fast_sync_stop_height(*FAST_SYNC_HASHES)); + assert!(len_right < FAST_SYNC_BATCH_LEN || len > fast_sync_stop_height(*FAST_SYNC_HASHES)); }); } @@ -397,7 +373,7 @@ mod tests { tokio_test::block_on(async move { let mut blockchain_read_handle= test_db(data_dir.path().to_path_buf()); - let ret = validate_entries::(VecDeque::from([entry]), 0, &mut blockchain_read_handle).await.unwrap(); + let ret = validate_entries::(VecDeque::from([entry]), 0, &mut blockchain_read_handle, *FAST_SYNC_HASHES).await.unwrap(); let len_left = ret.0.iter().map(|e| e.ids.len()).sum::(); let len_right = ret.1.iter().map(|e| e.ids.len()).sum::(); diff --git a/consensus/fast-sync/src/lib.rs b/consensus/fast-sync/src/lib.rs index 1e195df50..5fc69f3b3 100644 --- a/consensus/fast-sync/src/lib.rs +++ b/consensus/fast-sync/src/lib.rs @@ -14,6 +14,6 @@ use tracing_subscriber as _; mod fast_sync; pub use fast_sync::{ - block_to_verified_block_information, fast_sync_stop_height, set_fast_sync_hashes, - validate_entries, FAST_SYNC_BATCH_LEN, + block_to_verified_block_information, fast_sync_stop_height, validate_entries, + FAST_SYNC_BATCH_LEN, };