Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions binaries/cuprated/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ use cuprate_types::{
VerifiedBlockInformation,
};

use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;

mod chain_service;
mod error;
mod fast_sync;
pub mod interface;
mod manager;
pub(crate) mod syncer;
mod types;

pub use error::{BlockManagerError, BlockValidationError, IncomingBlockError};
pub use fast_sync::get_fast_sync_hashes;
pub use interface::BlockchainManagerHandle;
pub use manager::IncomingBlockOk;
pub use syncer::{Syncer, SyncerHandle};
pub use types::ConsensusBlockchainReadHandle;

Expand Down Expand Up @@ -85,17 +86,16 @@ pub async fn check_add_genesis(
blockchain_read_handle: &mut BlockchainReadHandle,
blockchain_write_handle: &mut BlockchainWriteHandle,
network: Network,
) {
) -> anyhow::Result<()> {
// Try to get the chain height, will fail if the genesis block is not in the DB.
if blockchain_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.await?
.call(BlockchainReadRequest::ChainHeight)
.await
.is_ok()
{
return;
return Ok(());
}

let genesis = generate_genesis_block(network);
Expand All @@ -105,8 +105,7 @@ pub async fn check_add_genesis(

blockchain_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.await?
.call(BlockchainWriteRequest::WriteBlock(
VerifiedBlockInformation {
block_blob: genesis.serialize(),
Expand All @@ -123,8 +122,9 @@ pub async fn check_add_genesis(
block: genesis,
},
))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
.await?;

Ok(())
}

/// Initializes the consensus services.
Expand Down
113 changes: 113 additions & 0 deletions binaries/cuprated/src/blockchain/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! Error types for the blockchain manager interface.

use cuprate_blockchain::BlockchainError;
use cuprate_consensus::ExtendedConsensusError;
use cuprate_consensus_rules::{blocks::BlockError, hard_forks::HardForkError, ConsensusError};
use cuprate_txpool::TxPoolError;
use cuprate_types::TxConversionError;

macro_rules! impl_internal_from {
($($t:ty),* $(,)?) => {$(
impl From<$t> for BlockManagerError {
fn from(e: $t) -> Self { Self::Internal(e.into()) }
}
impl From<$t> for IncomingBlockError {
fn from(e: $t) -> Self { BlockManagerError::from(e).into() }
}
)*};
}

/// A validation failure - the peer should be banned.
#[derive(Debug, thiserror::Error)]
pub enum BlockValidationError {
/// Invalid hard-fork rules.
#[error(transparent)]
HardFork(HardForkError),

/// Any other consensus rule violation.
#[error(transparent)]
Other(ExtendedConsensusError),
}

/// An error from the blockchain manager's internal handlers.
#[derive(Debug, thiserror::Error)]
pub enum BlockManagerError {
/// The peer sent us an invalid block; ban them.
#[error(transparent)]
Validation(BlockValidationError),

/// A node-side failure.
#[error(transparent)]
Internal(#[from] tower::BoxError),
}

impl From<ExtendedConsensusError> for BlockManagerError {
fn from(e: ExtendedConsensusError) -> Self {
match e {
ExtendedConsensusError::DBErr(e) => Self::Internal(e),
ExtendedConsensusError::ConErr(ConsensusError::Block(BlockError::HardForkError(e))) => {
Self::Validation(BlockValidationError::HardFork(e))
}

ExtendedConsensusError::ConErr(_)
| ExtendedConsensusError::TxsIncludedWithBlockIncorrect
| ExtendedConsensusError::OneOrMoreBatchVerificationStatementsInvalid
| ExtendedConsensusError::NoBlocksToVerify => {
Self::Validation(BlockValidationError::Other(e))
}
}
}
}

impl From<ConsensusError> for BlockManagerError {
fn from(e: ConsensusError) -> Self {
ExtendedConsensusError::ConErr(e).into()
}
}

/// An error returned from [`BlockchainManagerHandle::handle_incoming_block`](super::interface::BlockchainManagerHandle::handle_incoming_block).
#[derive(Debug, thiserror::Error)]
pub enum IncomingBlockError {
/// The peer sent us an invalid block; ban them.
#[error(transparent)]
Validation(BlockValidationError),

/// A node-side failure.
#[error(transparent)]
Internal(tower::BoxError),

/// We are missing the block's parent.
#[error("The block has an unknown parent.")]
Orphan,

/// Some transactions in the block were unknown.
///
/// The inner values are the block hash and the indexes of the missing txs in the block.
#[error("Unknown transactions in block.")]
UnknownTransactions([u8; 32], Vec<usize>),

/// The block claimed more transactions than it contained.
#[error("Too many transactions given for block.")]
TooManyTxs,

/// The blockchain manager command channel is closed.
#[error("The blockchain manager command channel is closed.")]
ChannelClosed,
}

impl From<BlockManagerError> for IncomingBlockError {
fn from(e: BlockManagerError) -> Self {
match e {
BlockManagerError::Validation(v) => Self::Validation(v),
BlockManagerError::Internal(i) => Self::Internal(i),
}
}
}

impl From<ConsensusError> for IncomingBlockError {
fn from(e: ConsensusError) -> Self {
BlockManagerError::from(e).into()
}
}

impl_internal_from!(BlockchainError, TxPoolError, TxConversionError);
56 changes: 11 additions & 45 deletions binaries/cuprated/src/blockchain/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ use monero_oxide::{block::Block, transaction::Transaction};
use tokio::sync::{mpsc, oneshot};
use tower::{Service, ServiceExt};

use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_blockchain::{service::BlockchainReadHandle, BlockchainError};
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_txpool::service::{
interface::{TxpoolReadRequest, TxpoolReadResponse},
TxpoolReadHandle,
};
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};

use crate::{
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
constants::PANIC_CRITICAL_SERVICE_ERROR,
use crate::blockchain::{
manager::{BlockchainManagerCommand, IncomingBlockOk},
IncomingBlockError,
};

/// Handle for the blockchain manager.
Expand All @@ -42,25 +42,6 @@ pub struct BlockchainManagerHandle {
blocks_being_handled: Arc<Mutex<HashSet<[u8; 32]>>>,
}

/// An error that can be returned from [`BlockchainManagerHandle::handle_incoming_block`].
#[derive(Debug, thiserror::Error)]
pub enum IncomingBlockError {
/// Some transactions in the block were unknown.
///
/// The inner values are the block hash and the indexes of the missing txs in the block.
#[error("Unknown transactions in block.")]
UnknownTransactions([u8; 32], Vec<usize>),
/// We are missing the block's parent.
#[error("The block has an unknown parent.")]
Orphan,
/// The block was invalid.
#[error(transparent)]
InvalidBlock(anyhow::Error),
/// The blockchain manager command channel is closed.
#[error("The blockchain manager command channel is closed.")]
ChannelClosed,
}

impl BlockchainManagerHandle {
/// Create a new handle and command receiver pair.
pub fn new() -> (Self, mpsc::Receiver<BlockchainManagerCommand>) {
Expand Down Expand Up @@ -98,34 +79,24 @@ impl BlockchainManagerHandle {
txpool_read_handle: &mut TxpoolReadHandle,
) -> Result<IncomingBlockOk, IncomingBlockError> {
if given_txs.len() > block.transactions.len() {
return Err(IncomingBlockError::InvalidBlock(anyhow::anyhow!(
"Too many transactions given for block"
)));
return Err(IncomingBlockError::TooManyTxs);
}

if !block_exists(block.header.previous, blockchain_read_handle)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
{
if !block_exists(block.header.previous, blockchain_read_handle).await? {
return Err(IncomingBlockError::Orphan);
}

let block_hash = block.hash();

if block_exists(block_hash, blockchain_read_handle)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
{
if block_exists(block_hash, blockchain_read_handle).await? {
return Ok(IncomingBlockOk::AlreadyHave);
}

let TxpoolReadResponse::TxsForBlock { mut txs, missing } = txpool_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.await?
.call(TxpoolReadRequest::TxsForBlock(block.transactions.clone()))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.await?
else {
unreachable!()
};
Expand All @@ -142,11 +113,7 @@ impl BlockchainManagerHandle {
return Err(IncomingBlockError::UnknownTransactions(block_hash, missing));
};

txs.insert(
needed_hash,
new_tx_verification_data(tx)
.map_err(|e| IncomingBlockError::InvalidBlock(e.into()))?,
);
txs.insert(needed_hash, new_tx_verification_data(tx)?);
}
}

Expand Down Expand Up @@ -185,7 +152,6 @@ impl BlockchainManagerHandle {
response_rx
.await
.map_err(|_| IncomingBlockError::ChannelClosed)?
.map_err(IncomingBlockError::InvalidBlock)
}

/// Pop blocks from the top of the blockchain.
Expand All @@ -211,7 +177,7 @@ impl BlockchainManagerHandle {
async fn block_exists(
block_hash: [u8; 32],
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<bool, anyhow::Error> {
) -> Result<bool, BlockchainError> {
let BlockchainResponse::FindBlock(chain) = blockchain_read_handle
.ready()
.await?
Expand Down
45 changes: 24 additions & 21 deletions binaries/cuprated/src/blockchain/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use cuprate_types::{

use crate::{
blockchain::{chain_service::ChainService, syncer, types::ConsensusBlockchainReadHandle},
constants::PANIC_CRITICAL_SERVICE_ERROR,
txpool::TxpoolManagerHandle,
LaunchContext,
};
Expand Down Expand Up @@ -59,15 +58,18 @@ pub(crate) async fn init_blockchain_manager(
let stop_current_block_downloader = Arc::new(Notify::new());
let fast_sync_hashes = launch_ctx.config.fast_sync_hashes();

launch_ctx.task_executor.spawn(syncer.run(
launch_ctx.blockchain.context_svc(),
ChainService(launch_ctx.blockchain.read(), fast_sync_hashes),
clearnet_interface.clone(),
batch_tx,
Arc::clone(&stop_current_block_downloader),
block_downloader_config,
shutdown_token.clone(),
));
launch_ctx.task_executor.spawn_critical(
"blockchain syncer",
syncer.run(
launch_ctx.blockchain.context_svc(),
ChainService(launch_ctx.blockchain.read(), fast_sync_hashes),
clearnet_interface.clone(),
batch_tx,
Arc::clone(&stop_current_block_downloader),
block_downloader_config,
shutdown_token.clone(),
),
);

let manager = BlockchainManager {
blockchain_write_handle,
Expand All @@ -83,9 +85,10 @@ pub(crate) async fn init_blockchain_manager(
fast_sync_hashes,
};

launch_ctx
.task_executor
.spawn(manager.run(batch_rx, command_rx, shutdown_token));
launch_ctx.task_executor.spawn_critical(
"blockchain manager",
manager.run(batch_rx, command_rx, shutdown_token),
);

Ok(())
}
Expand Down Expand Up @@ -125,29 +128,29 @@ impl BlockchainManager {
mut block_batch_rx: mpsc::Receiver<(BlockBatch, Arc<OwnedSemaphorePermit>)>,
mut command_rx: mpsc::Receiver<BlockchainManagerCommand>,
shutdown_token: CancellationToken,
) {
) -> anyhow::Result<()> {
loop {
tokio::select! {
biased;
() = shutdown_token.cancelled() => {
break;
}
Some((batch, permit)) = block_batch_rx.recv() => {
self.handle_incoming_block_batch(
batch,
).await;
self.handle_incoming_block_batch(batch)
.await
.map_err(anyhow::Error::from_boxed)?;

drop(permit);
}
Some(incoming_command) = command_rx.recv() => {
self.handle_command(incoming_command).await;
}
else => {
break;
self.handle_command(incoming_command)
.await
.map_err(anyhow::Error::from_boxed)?;
}
}
}

tracing::info!("Blockchain manager shut down.");
Ok(())
}
}
Loading
Loading