From 873ca778930c4a250c299167fc070d7ea81b5f52 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 12 Jun 2026 23:45:04 +0100 Subject: [PATCH 1/2] add get_blocks.bin --- binaries/cuprated/src/rpc/handlers/bin.rs | 132 +++++++++----- binaries/cuprated/src/rpc/handlers/helper.rs | 14 +- .../cuprated/src/rpc/handlers/json_rpc.rs | 23 ++- .../cuprated/src/rpc/handlers/other_json.rs | 5 +- .../cuprated/src/rpc/service/blockchain.rs | 66 +++++-- binaries/cuprated/src/rpc/service/txpool.rs | 64 ++++++- binaries/cuprated/src/txpool.rs | 2 +- binaries/cuprated/src/txpool/incoming_tx.rs | 6 +- binaries/cuprated/src/txpool/manager.rs | 153 ++++++++++++++-- constants/src/rpc.rs | 4 - rpc/types/src/bin.rs | 17 +- storage/blockchain/src/service/read.rs | 168 +++++++++++++++++- storage/txpool/src/service/read.rs | 31 +++- types/types/src/blockchain.rs | 29 +++ 14 files changed, 599 insertions(+), 115 deletions(-) diff --git a/binaries/cuprated/src/rpc/handlers/bin.rs b/binaries/cuprated/src/rpc/handlers/bin.rs index 667a8b26a..92b8464e0 100644 --- a/binaries/cuprated/src/rpc/handlers/bin.rs +++ b/binaries/cuprated/src/rpc/handlers/bin.rs @@ -9,7 +9,9 @@ use std::num::NonZero; use anyhow::{anyhow, Error}; use bytes::Bytes; -use cuprate_constants::rpc::{RESTRICTED_BLOCK_COUNT, RESTRICTED_TRANSACTIONS_COUNT}; +use cuprate_constants::rpc::{ + GET_BLOCKS_BIN_MAX_BLOCK_COUNT, RESTRICTED_BLOCK_COUNT, RESTRICTED_TRANSACTIONS_COUNT, +}; use cuprate_fixed_bytes::ByteArrayVec; use cuprate_helper::cast::{u64_to_usize, usize_to_u64}; use cuprate_rpc_interface::RpcHandler; @@ -22,15 +24,15 @@ use cuprate_rpc_types::{ GetTransactionPoolHashesRequest, GetTransactionPoolHashesResponse, }, json::{GetOutputDistributionRequest, GetOutputDistributionResponse}, - misc::RequestedInfo, + misc::{RequestedInfo, Status}, }; use cuprate_types::{ - rpc::{PoolInfo, PoolInfoExtent}, + rpc::{BlockOutputIndices, PoolInfoExtent, TxOutputIndices}, BlockCompleteEntry, }; use crate::rpc::{ - handlers::{helper, shared, shared::not_available}, + handlers::{helper, shared}, service::{blockchain, txpool}, CupratedRpcHandler, }; @@ -44,7 +46,7 @@ pub async fn map_request( use BinResponse as Resp; Ok(match request { - Req::GetBlocks(r) => Resp::GetBlocks(not_available()?), + Req::GetBlocks(r) => Resp::GetBlocks(get_blocks(state, r).await?), Req::GetBlocksByHeight(r) => Resp::GetBlocksByHeight(get_blocks_by_height(state, r).await?), Req::GetHashes(r) => Resp::GetHashes(get_hashes(state, r).await?), Req::GetOutputIndexes(r) => Resp::GetOutputIndexes(get_output_indexes(state, r).await?), @@ -74,12 +76,13 @@ async fn get_blocks( prune, no_miner_tx, pool_info_since, + max_block_count, } = request; let block_hashes: Vec<[u8; 32]> = (&block_ids).into(); drop(block_ids); - let Some(requested_info) = RequestedInfo::from_u8(request.requested_info) else { + let Some(requested_info) = RequestedInfo::from_u8(requested_info) else { return Err(anyhow!("Wrong requested info")); }; @@ -89,71 +92,120 @@ async fn get_blocks( RequestedInfo::PoolOnly => (false, true), }; - let pool_info_extent = PoolInfoExtent::None; + let (pool_info_extent, added_pool_txs, remaining_added_pool_txids, removed_pool_txids); - let pool_info = if get_pool { + if get_pool { let is_restricted = state.is_restricted(); - let include_sensitive_txs = !is_restricted; - let max_tx_count = if is_restricted { RESTRICTED_TRANSACTIONS_COUNT } else { usize::MAX }; - txpool::pool_info( - &mut state.txpool_read, - include_sensitive_txs, - max_tx_count, - NonZero::new(u64_to_usize(request.pool_info_since)), - ) - .await? + let pool_since = + txpool::pool_info_since(&state.tx_handler.txpool_manager, pool_info_since).await?; + + let (to_send, remaining): (&[[u8; 32]], &[[u8; 32]]) = + if pool_since.added.len() > max_tx_count { + pool_since.added.split_at(max_tx_count) + } else { + (&pool_since.added, &[]) + }; + + added_pool_txs = txpool::tx_blobs_by_hash(&mut state.txpool_read, to_send, prune).await?; + remaining_added_pool_txids = remaining.to_vec().into(); + removed_pool_txids = pool_since.removed.into(); + + pool_info_extent = if pool_since.full_required { + PoolInfoExtent::Full + } else { + PoolInfoExtent::Incremental + }; } else { - PoolInfo::None - }; + pool_info_extent = PoolInfoExtent::None; + added_pool_txs = vec![]; + remaining_added_pool_txids = ByteArrayVec::default(); + removed_pool_txids = ByteArrayVec::default(); + } let resp = GetBlocksResponse { base: helper::access_response_base(false), blocks: vec![], start_height: 0, current_height: 0, + top_block_hash: [0; 32], output_indices: vec![], daemon_time, - pool_info, + pool_info_extent, + added_pool_txs, + remaining_added_pool_txids, + removed_pool_txids, }; if !get_blocks { return Ok(resp); } - if let Some(block_id) = block_hashes.first() { - let (height, hash) = helper::top_height(&mut state).await?; - - if hash == *block_id { - return Ok(GetBlocksResponse { - current_height: height + 1, - ..resp - }); - } - } + let len = u64_to_usize(if max_block_count > 0 { + max_block_count.min(GET_BLOCKS_BIN_MAX_BLOCK_COUNT) + } else { + GET_BLOCKS_BIN_MAX_BLOCK_COUNT + }); - let (block_hashes, start_height, _) = - blockchain::next_chain_entry(&mut state.blockchain_read, block_hashes).await?; + let req_start_height = if start_height > 0 { + Some(u64_to_usize(start_height)) + } else { + None + }; - if start_height.is_none() { - return Err(anyhow!("Block IDs were not sorted properly")); + if req_start_height.is_none() && block_hashes.is_empty() { + return Ok(GetBlocksResponse { + base: AccessResponseBase { + response_base: ResponseBase { + status: Status::Failed, + untrusted: false, + }, + credits: 0, + top_hash: String::new(), + }, + ..resp + }); } - let (blocks, missing_hashes, height) = - blockchain::block_complete_entries(&mut state.blockchain_read, block_hashes).await?; - - if !missing_hashes.is_empty() { - return Err(anyhow!("Missing blocks")); + let (top_height, top_hash) = helper::top_height(&mut state); + if start_height > top_height || block_hashes.first() == Some(&top_hash) { + return Ok(GetBlocksResponse { + current_height: top_height + 1, + top_block_hash: top_hash, + ..resp + }); } + let (blocks, chain_height, actual_start_height, output_indices, top_hash) = + blockchain::block_complete_entries_above_split_point( + &mut state.blockchain_read, + block_hashes, + req_start_height, + no_miner_tx, + len, + prune, + ) + .await?; + Ok(GetBlocksResponse { blocks, - current_height: usize_to_u64(height), + current_height: usize_to_u64(chain_height), + start_height: usize_to_u64(actual_start_height), + top_block_hash: top_hash, + output_indices: output_indices + .into_iter() + .map(|block| BlockOutputIndices { + indices: block + .into_iter() + .map(|indices| TxOutputIndices { indices }) + .collect(), + }) + .collect(), ..resp }) } diff --git a/binaries/cuprated/src/rpc/handlers/helper.rs b/binaries/cuprated/src/rpc/handlers/helper.rs index 920c551bc..7b5314617 100644 --- a/binaries/cuprated/src/rpc/handlers/helper.rs +++ b/binaries/cuprated/src/rpc/handlers/helper.rs @@ -35,7 +35,7 @@ pub(super) async fn block_header( let block = blockchain::block(&mut state.blockchain_read, height).await?; let header = blockchain::block_extended_header(&mut state.blockchain_read, height).await?; let hardfork = HardFork::from_vote(header.vote); - let (top_height, _) = top_height(state).await?; + let (top_height, _) = top_height(state); // TODO: if the request block is not on the main chain, // we must get the alt block and this variable will be `true`. @@ -132,11 +132,11 @@ pub(super) async fn block_header_by_hash( /// # Errors /// This returns the [`top_height`] on [`Ok`] and /// returns [`Error`] if `height` is greater than [`top_height`]. -pub(super) async fn check_height( +pub(super) fn check_height( state: &mut CupratedRpcHandler, height: u64, ) -> Result { - let (top_height, _) = top_height(state).await?; + let (top_height, _) = top_height(state); if height > top_height { return Err(anyhow!( @@ -164,10 +164,10 @@ pub(super) fn hex_to_hash(hex: String) -> Result<[u8; 32], Error> { } /// [`cuprate_types::blockchain::BlockchainResponse::ChainHeight`] minus 1. -pub(super) async fn top_height(state: &mut CupratedRpcHandler) -> Result<(u64, [u8; 32]), Error> { - let (chain_height, hash) = blockchain::chain_height(&mut state.blockchain_read).await?; - let height = chain_height.checked_sub(1).unwrap(); - Ok((height, hash)) +pub(super) fn top_height(state: &mut CupratedRpcHandler) -> (u64, [u8; 32]) { + let context = state.blockchain_context.blockchain_context(); + let height = context.chain_height.checked_sub(1).unwrap(); + (usize_to_u64(height), context.top_hash) } /// TODO: impl bootstrap diff --git a/binaries/cuprated/src/rpc/handlers/json_rpc.rs b/binaries/cuprated/src/rpc/handlers/json_rpc.rs index c471cbc78..abcac60d8 100644 --- a/binaries/cuprated/src/rpc/handlers/json_rpc.rs +++ b/binaries/cuprated/src/rpc/handlers/json_rpc.rs @@ -258,9 +258,7 @@ async fn get_block_count( Ok(GetBlockCountResponse { base: helper::response_base(false), // Block count starts at 1 - count: blockchain::chain_height(&mut state.blockchain_read) - .await? - .0, + count: usize_to_u64(state.blockchain_context.blockchain_context().chain_height), }) } @@ -345,7 +343,7 @@ async fn get_last_block_header( mut state: CupratedRpcHandler, request: GetLastBlockHeaderRequest, ) -> Result { - let (height, _) = helper::top_height(&mut state).await?; + let (height, _) = helper::top_height(&mut state); let block_header = helper::block_header(&mut state, height, request.fill_pow_hash).await?; Ok(GetLastBlockHeaderResponse { @@ -387,7 +385,8 @@ async fn get_block_header_by_height( mut state: CupratedRpcHandler, request: GetBlockHeaderByHeightRequest, ) -> Result { - helper::check_height(&mut state, request.height).await?; + helper::check_height(&mut state, request.height)?; + let block_header = helper::block_header(&mut state, request.height, request.fill_pow_hash).await?; @@ -402,7 +401,7 @@ async fn get_block_headers_range( mut state: CupratedRpcHandler, request: GetBlockHeadersRangeRequest, ) -> Result { - let (top_height, _) = helper::top_height(&mut state).await?; + let (top_height, _) = helper::top_height(&mut state); if request.start_height >= top_height || request.end_height >= top_height @@ -457,7 +456,8 @@ async fn get_block( request: GetBlockRequest, ) -> Result { let (block, block_header) = if request.hash.is_empty() { - helper::check_height(&mut state, request.height).await?; + helper::check_height(&mut state, request.height)?; + let block = blockchain::block(&mut state.blockchain_read, request.height).await?; let block_header = helper::block_header(&mut state, request.height, request.fill_pow_hash).await?; @@ -871,9 +871,8 @@ async fn get_coinbase_tx_sum( mut state: CupratedRpcHandler, request: GetCoinbaseTxSumRequest, ) -> Result { - let chain_height = blockchain::chain_height(&mut state.blockchain_read) - .await? - .0; + let chain_height = usize_to_u64(state.blockchain_context.blockchain_context().chain_height); + if request.height >= chain_height || request.count > chain_height - request.height { return Err(anyhow!("requested range exceeds blockchain height")); } @@ -906,9 +905,7 @@ async fn get_version( mut state: CupratedRpcHandler, _: GetVersionRequest, ) -> Result { - let current_height = blockchain::chain_height(&mut state.blockchain_read) - .await? - .0; + let current_height = usize_to_u64(state.blockchain_context.blockchain_context().chain_height); let target_height = state.syncer_handle.target_height(); let mut hard_forks: Vec = diff --git a/binaries/cuprated/src/rpc/handlers/other_json.rs b/binaries/cuprated/src/rpc/handlers/other_json.rs index 4c4a65749..a13d58e2b 100644 --- a/binaries/cuprated/src/rpc/handlers/other_json.rs +++ b/binaries/cuprated/src/rpc/handlers/other_json.rs @@ -114,8 +114,9 @@ async fn get_height( mut state: CupratedRpcHandler, _: GetHeightRequest, ) -> Result { - let (height, hash) = blockchain::chain_height(&mut state.blockchain_read).await?; - let hash = Hex(hash); + let context = state.blockchain_context.blockchain_context(); + let height = usize_to_u64(context.chain_height); + let hash = Hex(context.top_hash); Ok(GetHeightResponse { base: helper::response_base(false), diff --git a/binaries/cuprated/src/rpc/service/blockchain.rs b/binaries/cuprated/src/rpc/service/blockchain.rs index 909703866..b70e66df5 100644 --- a/binaries/cuprated/src/rpc/service/blockchain.rs +++ b/binaries/cuprated/src/rpc/service/blockchain.rs @@ -181,22 +181,6 @@ pub async fn block_extended_header_in_range( Ok(output) } -/// [`BlockchainReadRequest::ChainHeight`]. -pub async fn chain_height( - blockchain_read: &mut BlockchainReadHandle, -) -> Result<(u64, [u8; 32]), Error> { - let BlockchainResponse::ChainHeight(height, hash) = blockchain_read - .ready() - .await? - .call(BlockchainReadRequest::ChainHeight) - .await? - else { - unreachable!(); - }; - - Ok((usize_to_u64(height), hash)) -} - /// [`BlockchainReadRequest::GeneratedCoins`]. pub async fn generated_coins( blockchain_read: &mut BlockchainReadHandle, @@ -516,6 +500,56 @@ pub async fn block_complete_entries( Ok((blocks, missing_hashes, blockchain_height)) } +/// [`BlockchainReadRequest::BlockCompleteEntriesAboveSplitPoint`]. +/// +/// Returns `(blocks, blockchain_height, start_height, output_indices, top_hash)`. +pub async fn block_complete_entries_above_split_point( + blockchain_read: &mut BlockchainReadHandle, + chain: Vec<[u8; 32]>, + start_height: Option, + no_miner_tx: bool, + len: usize, + pruned: bool, +) -> Result< + ( + Vec, + usize, + usize, + Vec>>, + [u8; 32], + ), + Error, +> { + let BlockchainResponse::BlockCompleteEntriesAboveSplitPoint { + blocks, + output_indices, + blockchain_height, + start_height, + top_hash, + } = blockchain_read + .ready() + .await? + .call(BlockchainReadRequest::BlockCompleteEntriesAboveSplitPoint { + chain, + start_height, + no_miner_tx, + len, + pruned, + }) + .await? + else { + unreachable!(); + }; + + Ok(( + blocks, + blockchain_height, + start_height, + output_indices, + top_hash, + )) +} + /// [`BlockchainReadRequest::BlockCompleteEntriesByHeight`]. pub async fn block_complete_entries_by_height( blockchain_read: &mut BlockchainReadHandle, diff --git a/binaries/cuprated/src/rpc/service/txpool.rs b/binaries/cuprated/src/rpc/service/txpool.rs index 6f91989ed..0b07d537b 100644 --- a/binaries/cuprated/src/rpc/service/txpool.rs +++ b/binaries/cuprated/src/rpc/service/txpool.rs @@ -7,7 +7,7 @@ use std::{ }; use anyhow::{anyhow, Error}; -use monero_oxide::transaction::Transaction; +use monero_oxide::transaction::{Pruned, Transaction}; use tower::{Service, ServiceExt}; use cuprate_helper::cast::usize_to_u64; @@ -63,6 +63,68 @@ pub async fn size( Ok(usize_to_u64(size)) } +/// [`TxpoolReadRequest::TxsByHash`]/ +pub async fn tx_blobs_by_hash( + txpool_read: &mut TxpoolReadHandle, + tx_hashes: &[[u8; 32]], + prune: bool, +) -> Result, Error> { + let TxpoolReadResponse::TxsByHash(txs) = txpool_read + .ready() + .await + .map_err(|e| anyhow!(e))? + .call(TxpoolReadRequest::TxsByHash { + tx_hashes: tx_hashes.to_vec(), + // TODO: allow sending private txs on restricted. + include_sensitive_txs: false, + }) + .await + .map_err(|e| anyhow!(e))? + else { + unreachable!() + }; + + txs.into_iter() + .map(|t| { + let mut tx_blob = t.tx_blob; + + if prune { + // Instead of reading then writing the pruned part to get the pruned blob + // we can read and then just use the number of bytes read. + let mut slice = tx_blob.as_slice(); + Transaction::::read(&mut slice) + .map_err(|e| anyhow!("failed to parse pool tx blob: {e}"))?; + let pruned_len = tx_blob.len() - slice.len(); + tx_blob.truncate(pruned_len); + } + + Ok(PoolTxInfo { + tx_hash: t.tx_hash, + tx_blob, + double_spend_seen: t.double_spend_seen, + }) + }) + .collect() +} + +/// Query the txpool manager for public-pool transactions added/removed since `since`. +pub async fn pool_info_since( + txpool_manager: &crate::txpool::TxpoolManagerHandle, + since: u64, +) -> Result { + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + + txpool_manager + .command_tx + .send(crate::txpool::TxpoolManagerCommand::PoolInfoSince { since, response_tx }) + .await + .map_err(|_| anyhow!("txpool manager stopped"))?; + + response_rx + .await + .map_err(|_| anyhow!("txpool manager stopped")) +} + /// [`TxpoolReadRequest::PoolInfo`] pub async fn pool_info( txpool_read: &mut TxpoolReadHandle, diff --git a/binaries/cuprated/src/txpool.rs b/binaries/cuprated/src/txpool.rs index 8a9962567..f70ad92ed 100644 --- a/binaries/cuprated/src/txpool.rs +++ b/binaries/cuprated/src/txpool.rs @@ -13,5 +13,5 @@ mod relay_rules; mod txs_being_handled; pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs}; -pub use manager::TxpoolManagerHandle; +pub use manager::{PoolInfoSinceResponse, TxpoolManagerCommand, TxpoolManagerHandle}; pub use relay_rules::RelayRuleError; diff --git a/binaries/cuprated/src/txpool/incoming_tx.rs b/binaries/cuprated/src/txpool/incoming_tx.rs index ada4dd9af..6104db032 100644 --- a/binaries/cuprated/src/txpool/incoming_tx.rs +++ b/binaries/cuprated/src/txpool/incoming_tx.rs @@ -43,7 +43,7 @@ use crate::{ dandelion::{ self, AnonTxService, ConcreteDandelionRouter, DiffuseService, MainDandelionRouter, }, - manager::{start_txpool_manager, TxpoolManagerHandle}, + manager::{start_txpool_manager, TxpoolManagerCommand, TxpoolManagerHandle}, relay_rules::{check_tx_relay_rules, RelayRuleError}, txs_being_handled::{TxsBeingHandled, TxsBeingHandledLocally}, }, @@ -243,8 +243,8 @@ async fn handle_incoming_txs( // TODO: take into account `do_not_relay` in the tx-pool manager. if txpool_manager_handle - .tx_tx - .send((tx, state.clone())) + .command_tx + .send(TxpoolManagerCommand::IncomingTx(tx, state.clone())) .await .is_err() { diff --git a/binaries/cuprated/src/txpool/manager.rs b/binaries/cuprated/src/txpool/manager.rs index fd7f4e5b4..b16920886 100644 --- a/binaries/cuprated/src/txpool/manager.rs +++ b/binaries/cuprated/src/txpool/manager.rs @@ -1,5 +1,6 @@ use std::{ cmp::min, + collections::BTreeSet, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; @@ -38,6 +39,11 @@ use crate::{ const INCOMING_TX_QUEUE_SIZE: usize = 100; +/// The maximum number of recently-removed public transactions to remember. +/// +/// When this limit is reached, the oldest entry (by removal timestamp) is dropped. +const MAX_RECENTLY_REMOVED_TXS: usize = 5000; + /// Starts the transaction pool manager service. /// /// # Panics @@ -68,7 +74,7 @@ pub async fn start_txpool_manager( let mut stem_txs = Vec::new(); let mut tx_timeouts = DelayQueue::with_capacity(backlog.len()); - let current_txs = backlog + let current_txs: IndexMap<[u8; 32], TxInfo> = backlog .into_iter() .map(|tx| { let timeout_key = if tx.private { @@ -92,9 +98,18 @@ pub async fn start_txpool_manager( }) .collect(); + let public_pool_timestamps: BTreeSet<(u64, [u8; 32])> = current_txs + .iter() + .filter(|(_, info)| !info.private) + .map(|(id, info)| (info.received_at, *id)) + .collect(); + let mut manager = TxpoolManager { current_txs, tx_timeouts, + public_pool_timestamps, + recently_removed_txs: BTreeSet::new(), + removed_txs_start_time: current_unix_timestamp(), txpool_write_handle, txpool_read_handle, dandelion_pool_manager, @@ -109,26 +124,55 @@ pub async fn start_txpool_manager( manager.promote_tx(tx).await; } - let (tx_tx, tx_rx) = mpsc::channel(INCOMING_TX_QUEUE_SIZE); + let (command_tx, command_rx) = mpsc::channel(INCOMING_TX_QUEUE_SIZE); let (spent_kis_tx, spent_kis_rx) = mpsc::channel(1); let shutdown_token = task_executor.cancellation_token(); - task_executor.spawn(manager.run(tx_rx, spent_kis_rx, shutdown_token)); + task_executor.spawn(manager.run(command_rx, spent_kis_rx, shutdown_token)); TxpoolManagerHandle { - tx_tx, + command_tx, spent_kis_tx, } } +/// Commands sent to the [`TxpoolManager`] via [`TxpoolManagerHandle`]. +#[expect( + clippy::large_enum_variant, + reason = "`IncomingTx` is the most common command" +)] +pub enum TxpoolManagerCommand { + /// An incoming transaction to add to the pool. + IncomingTx( + TransactionVerificationData, + TxState, + ), + + /// Request hashes of public-pool transactions added/removed at or after a UNIX timestamp. + PoolInfoSince { + since: u64, + response_tx: oneshot::Sender, + }, +} + +/// Response to [`TxpoolManagerCommand::PoolInfoSince`]. +pub struct PoolInfoSinceResponse { + /// `true` if the manager's incremental tracking does not reach back to the + /// requested timestamp, so the caller must send a full pool snapshot + /// + /// When set, `added` contains the entire public pool and `removed` is empty. + pub full_required: bool, + /// Hashes of public-pool txs that entered the pool at or after `since`. + pub added: Vec<[u8; 32]>, + /// Hashes of public-pool txs that were removed from the pool at or after `since`. + pub removed: Vec<[u8; 32]>, +} + /// A handle to the tx-pool manager. #[derive(Clone)] pub struct TxpoolManagerHandle { - /// The incoming tx channel. - pub tx_tx: mpsc::Sender<( - TransactionVerificationData, - TxState, - )>, + /// Channel for sending commands to the manager. + pub command_tx: mpsc::Sender, /// The spent key images in a new block tx. spent_kis_tx: mpsc::Sender<(Vec<[u8; 32]>, oneshot::Sender<()>)>, @@ -141,7 +185,7 @@ impl TxpoolManagerHandle { #[expect(clippy::let_underscore_must_use)] pub fn mock() -> Self { let (spent_kis_tx, mut spent_kis_rx) = mpsc::channel(1); - let (tx_tx, mut tx_rx) = mpsc::channel(100); + let (command_tx, mut command_rx) = mpsc::channel(100); tokio::spawn(async move { loop { @@ -155,14 +199,14 @@ impl TxpoolManagerHandle { tokio::spawn(async move { loop { - if tx_rx.recv().await.is_none() { + if command_rx.recv().await.is_none() { return; } } }); Self { - tx_tx, + command_tx, spent_kis_tx, } } @@ -203,6 +247,21 @@ struct TxpoolManager { /// Timeouts can be for re-relaying or removal from the pool. tx_timeouts: DelayQueue<[u8; 32]>, + /// Sorted `(public_timestamp, tx_hash)` for every tx currently in the public pool. + /// + /// Stem txs are never present here. + public_pool_timestamps: BTreeSet<(u64, [u8; 32])>, + + /// Sorted `(removal_timestamp, tx_hash)` for recently removed public-pool transactions. + /// + /// Bounded by [`MAX_RECENTLY_REMOVED_TXS`], when the limit is exceeded the entry with the + /// lowest removal timestamp is dropped. Only public (non-stem) txs are tracked here. + recently_removed_txs: BTreeSet<(u64, [u8; 32])>, + + /// The earliest timestamp incremental removed-tx tracking covers. Advanced when old entries are + /// evicted from `recently_removed_txs`. + removed_txs_start_time: u64, + txpool_write_handle: TxpoolWriteHandle, txpool_read_handle: TxpoolReadHandle, @@ -234,6 +293,20 @@ impl TxpoolManager { .timeout_key .and_then(|key| self.tx_timeouts.try_remove(&key)); + if !tx_info.private { + self.public_pool_timestamps + .remove(&(tx_info.received_at, tx)); + + let removal_timestamp = current_unix_timestamp(); + self.recently_removed_txs.insert((removal_timestamp, tx)); + if self.recently_removed_txs.len() > MAX_RECENTLY_REMOVED_TXS { + if let Some((evicted_timestamp, _)) = self.recently_removed_txs.pop_first() { + self.removed_txs_start_time = + self.removed_txs_start_time.max(evicted_timestamp); + } + } + } + if remove_from_db { self.txpool_write_handle .ready() @@ -320,6 +393,8 @@ impl TxpoolManager { // The dandelion pool handles stem tx embargo. None } else { + self.public_pool_timestamps.insert((now, tx)); + let timeout = calculate_next_timeout(now, self.config.maximum_age_secs); tracing::trace!(in_secs = timeout, "setting next tx timeout"); @@ -411,6 +486,8 @@ impl TxpoolManager { // It's now in the public pool, pretend we just saw it. tx_info.received_at = current_unix_timestamp(); + self.public_pool_timestamps + .insert((tx_info.received_at, tx)); let next_timeout = calculate_next_timeout(tx_info.received_at, self.config.maximum_age_secs); @@ -429,6 +506,24 @@ impl TxpoolManager { .expect(PANIC_CRITICAL_SERVICE_ERROR); } + /// Returns the hashes of all public-pool transactions that entered the public pool at or + /// after `timestamp`. + fn public_txs_from(&self, timestamp: u64) -> Vec<[u8; 32]> { + self.public_pool_timestamps + .range((timestamp, [0_u8; 32])..) + .map(|(_, hash)| *hash) + .collect() + } + + /// Returns the hashes of recently removed public-pool transactions that were removed at or + /// after `timestamp` (inclusive, UNIX seconds). + fn removed_txs_from(&self, timestamp: u64) -> Vec<[u8; 32]> { + self.recently_removed_txs + .range((timestamp, [0_u8; 32])..) + .map(|(_, hash)| *hash) + .collect() + } + /// Handles removing all transactions that have been included/double spent in an incoming block. #[instrument(level = "debug", skip_all)] async fn new_block(&mut self, spent_key_images: Vec<[u8; 32]>) { @@ -454,10 +549,7 @@ impl TxpoolManager { #[expect(clippy::let_underscore_must_use)] async fn run( mut self, - mut tx_rx: mpsc::Receiver<( - TransactionVerificationData, - TxState, - )>, + mut command_rx: mpsc::Receiver, mut block_rx: mpsc::Receiver<(Vec<[u8; 32]>, oneshot::Sender<()>)>, shutdown_token: CancellationToken, ) { @@ -474,8 +566,33 @@ impl TxpoolManager { Some(tx) = self.tx_timeouts.next() => { self.handle_tx_timeout(tx.into_inner()).await; } - Some((tx, state)) = tx_rx.recv() => { - self.handle_incoming_tx(tx, state).await; + Some(command) = command_rx.recv() => { + match command { + TxpoolManagerCommand::IncomingTx(tx, state) => { + self.handle_incoming_tx(tx, state).await; + } + TxpoolManagerCommand::PoolInfoSince { since, response_tx } => { + // If `since` is 0 the requester wants the full pool. If `since` is older than `removed_txs_start_time`, + // then we need to send a full pool as the requester might have missed some removals. + let full_required = + since == 0 || since <= self.removed_txs_start_time; + + let response = if full_required { + PoolInfoSinceResponse { + full_required: true, + added: self.public_txs_from(0), + removed: vec![], + } + } else { + PoolInfoSinceResponse { + full_required: false, + added: self.public_txs_from(since), + removed: self.removed_txs_from(since), + } + }; + let _ = response_tx.send(response); + } + } } Some(tx) = self.promote_tx_channel.recv() => { self.promote_tx(tx).await; diff --git a/constants/src/rpc.rs b/constants/src/rpc.rs index 1130eb7b9..fa446b8ce 100644 --- a/constants/src/rpc.rs +++ b/constants/src/rpc.rs @@ -79,10 +79,6 @@ pub const OUTPUT_HISTOGRAM_RECENT_CUTOFF_RESTRICTION: Duration = Duration::from_ #[doc = monero_definition_link!(a1dc85c5373a30f14aaf7dcfdd95f5a7375d3623, "/src/cryptonote_config.h", 128)] pub const GET_BLOCKS_BIN_MAX_BLOCK_COUNT: u64 = 1000; -/// Maximum amount of requestable transactions in `/get_blocks.bin`. -#[doc = monero_definition_link!(a1dc85c5373a30f14aaf7dcfdd95f5a7375d3623, "/src/cryptonote_config.h", 129)] -pub const GET_BLOCKS_BIN_MAX_TX_COUNT: u64 = 20_000; - /// Max message content length in the RPC server. #[doc = monero_definition_link!(a1dc85c5373a30f14aaf7dcfdd95f5a7375d3623, "/src/cryptonote_config.h", 130)] /// diff --git a/rpc/types/src/bin.rs b/rpc/types/src/bin.rs index 519951e2f..67a747b26 100644 --- a/rpc/types/src/bin.rs +++ b/rpc/types/src/bin.rs @@ -9,7 +9,7 @@ use cuprate_fixed_bytes::ByteArrayVec; use serde::{Deserialize, Serialize}; use cuprate_types::{ - rpc::{BlockOutputIndices, PoolInfo}, + rpc::{BlockOutputIndices, PoolInfoExtent, PoolTxInfo}, BlockCompleteEntry, }; @@ -102,21 +102,24 @@ define_request_and_response! { Request { requested_info: u8 = default::(), "default", block_ids: ByteArrayVec<32> = default::>(), "default", - start_height: u64, - prune: bool, - no_miner_tx: bool, + start_height: u64 = default::(), "default", + prune: bool = default::(), "default", + no_miner_tx: bool = default::(), "default", pool_info_since: u64 = default::(), "default", + max_block_count: u64 = default::(), "default", }, - // TODO: add `top_block_hash` field - // AccessResponseBase { blocks: Vec = default::>(), "default", start_height: u64, current_height: u64, + top_block_hash: [u8; 32] = default::<[u8; 32]>(), "default", output_indices: Vec = default::>(), "default", daemon_time: u64 = default::(), "default", - pool_info: PoolInfo = default::(), "default", + pool_info_extent: PoolInfoExtent = default::(), "default", + added_pool_txs: Vec, + remaining_added_pool_txids: ByteArrayVec<32>, + removed_pool_txids: ByteArrayVec<32>, } } diff --git a/storage/blockchain/src/service/read.rs b/storage/blockchain/src/service/read.rs index 7a2c3707d..faa069ec8 100644 --- a/storage/blockchain/src/service/read.rs +++ b/storage/blockchain/src/service/read.rs @@ -17,6 +17,7 @@ use std::{ task::{Context, Poll}, }; +use bytes::Bytes; use fjall::Readable; use futures::channel::oneshot; use indexmap::{IndexMap, IndexSet}; @@ -40,7 +41,7 @@ use cuprate_types::{ ChainInfo, CoinbaseTxSum, OutputDistributionData, OutputHistogramEntry, OutputHistogramInput, }, - Chain, ChainId, ExtendedBlockHeader, OutputDistributionInput, TxsInBlock, + Chain, ChainId, ExtendedBlockHeader, OutputDistributionInput, TransactionBlobs, TxsInBlock, }; use crate::{ @@ -128,6 +129,20 @@ fn map_request( match request { R::BlockCompleteEntries(block_hashes) => block_complete_entries(env, block_hashes), R::BlockCompleteEntriesByHeight(heights) => block_complete_entries_by_height(env, heights), + R::BlockCompleteEntriesAboveSplitPoint { + chain, + start_height, + no_miner_tx, + len, + pruned, + } => block_complete_entries_above_split_point( + env, + chain, + start_height, + no_miner_tx, + len, + pruned, + ), R::BlockExtendedHeader(block) => block_extended_header(env, block), R::BlockHash(block, chain) => block_hash(env, block, chain), R::BlockHashInRange(blocks, chain) => block_hash_in_range(env, blocks, chain), @@ -218,6 +233,157 @@ fn block_complete_entries(db: &BlockchainDatabase, block_hashes: Vec) }) } +/// [`BlockchainReadRequest::BlockCompleteEntriesAboveSplitPoint`]. +fn block_complete_entries_above_split_point( + db: &BlockchainDatabase, + chain: Vec<[u8; 32]>, + start_height: Option, + no_miner_tx: bool, + len: usize, + pruned: bool, +) -> ResponseResult { + /// Total size of all block/tx blobs to return before stopping early. + /// + /// This is lower than monerod, as monerod packs too close to the epee size limit. + const MAX_TOTAL_SIZE: usize = 50 * 1024 * 1024; + /// Total tx count to return before stopping early. + /// + /// This is lower than monerod, as monerod packs too close to the epee size limit. + const MAX_TOTAL_TXS: usize = 10_000; + + let tx_ro = db.fjall.snapshot(); + let tapes = db.linear_tapes.reader(); + + let blockchain_height = crate::ops::blockchain::chain_height(db, &tapes)?; + let top_hash = tapes + .read_entry(&db.block_infos, usize_to_u64(blockchain_height - 1))? + .ok_or(BlockchainError::NotFound)? + .block_hash; + + // If a specific start height was requested, use it directly. Otherwise, scan to find the split point. + let height = if let Some(h) = start_height { + if h >= blockchain_height { + return Ok(BlockchainResponse::BlockCompleteEntriesAboveSplitPoint { + blocks: vec![], + output_indices: vec![], + blockchain_height, + start_height: h, + top_hash, + }); + } + h + } else { + let split = find_split_point(db, &chain, false, false, &tx_ro)?; + + if split == chain.len() { + return Err(BlockchainError::NotFound); + } + + block_height(db, &tx_ro, &chain[split])?.ok_or(BlockchainError::NotFound)? + }; + + if height == blockchain_height { + return Ok(BlockchainResponse::BlockCompleteEntriesAboveSplitPoint { + blocks: vec![], + output_indices: vec![], + blockchain_height, + start_height: height, + top_hash, + }); + } + + let mut tx_count = 0; + let mut total_size = 0; + + let blocks: Vec<_> = (height..min(height + len, blockchain_height)) + .map_while(|height| { + if total_size >= MAX_TOTAL_SIZE || tx_count >= MAX_TOTAL_TXS { + return None; + } + + let block = match get_block_complete_entry_from_height(height, pruned, &tapes, db) { + Ok(v) => v, + Err(e) => return Some(Err(e)), + }; + + tx_count += block.txs.len() + 1; + + let tx_blobs_size = match &block.txs { + TransactionBlobs::None => 0, + TransactionBlobs::Normal(b) => b.iter().map(Bytes::len).sum(), + TransactionBlobs::Pruned(p) => p.iter().map(|p| p.blob.len() + 32).sum(), + }; + + total_size += block.block.len() + tx_blobs_size; + + Some(Ok(block)) + }) + .collect::>()?; + + let first_tx_idx = tapes + .read_entry(&db.block_infos, usize_to_u64(height))? + .ok_or(BlockchainError::NotFound)? + .mining_tx_index; + + let mut output_indices = Vec::with_capacity(blocks.len()); + output_indices.push(Vec::with_capacity(8)); + + let mut last_height = height; + let mut miner_tx = true; + + for (i, tx_info) in tapes.iter_from(&db.tx_infos, first_tx_idx)?.enumerate() { + let tx_info = tx_info?; + + if tx_info.height != last_height { + if tx_info.height == height + blocks.len() { + // We have gone past all txs in the blocks we need + break; + } + last_height = tx_info.height; + miner_tx = true; + output_indices.push(Vec::with_capacity(8)); + } + + // monerod replaces the miner tx's indices with an empty + // placeholder when `no_miner_tx` is set. + if no_miner_tx && miner_tx { + miner_tx = false; + output_indices.last_mut().unwrap().push(vec![]); + continue; + } + miner_tx = false; + + let o_indexes = if tx_info.is_v1_tx() { + // For v1 txs we need to look up indexes. + let res = tx_ro + .get( + &db.v1_tx_outputs, + (first_tx_idx + usize_to_u64(i)).to_le_bytes(), + )? + .ok_or(BlockchainError::NotFound)?; + + res.chunks(8) + .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap())) + .collect::>() + } else { + // For v2 we can use the data in the tx_info. + (0..tx_info.numb_rct_outputs) + .map(|i| usize_to_u64(i) + tx_info.rct_output_start_idx) + .collect() + }; + + output_indices.last_mut().unwrap().push(o_indexes); + } + + Ok(BlockchainResponse::BlockCompleteEntriesAboveSplitPoint { + blocks, + output_indices, + blockchain_height, + start_height: height, + top_hash, + }) +} + /// [`BlockchainReadRequest::BlockCompleteEntriesByHeight`]. fn block_complete_entries_by_height( db: &BlockchainDatabase, diff --git a/storage/txpool/src/service/read.rs b/storage/txpool/src/service/read.rs index 28ac9f983..38eb284e0 100644 --- a/storage/txpool/src/service/read.rs +++ b/storage/txpool/src/service/read.rs @@ -18,13 +18,14 @@ use rayon::ThreadPool; use tower::Service; use cuprate_helper::asynch::InfallibleOneshotReceiver; +use cuprate_types::TxInPool; use crate::{ error::TxPoolError, ops::{get_transaction_verification_data, in_stem_pool}, service::interface::{TxpoolReadRequest, TxpoolReadResponse}, txpool::TxpoolDatabase, - types::{TransactionBlobHash, TransactionHash, TransactionInfo}, + types::{TransactionBlobHash, TransactionHash, TransactionInfo, TxStateFlags}, TxEntry, }; @@ -294,7 +295,33 @@ fn txs_by_hash( tx_hashes: Vec<[u8; 32]>, include_sensitive_txs: bool, ) -> Result { - Ok(TxpoolReadResponse::TxsByHash(todo!())) + let snapshot = db.fjall_database.snapshot(); + let mut txs = Vec::with_capacity(tx_hashes.len()); + + for tx_hash in tx_hashes { + let Some(info_bytes) = snapshot.get(&db.tx_infos, tx_hash)? else { + continue; + }; + let tx_info: TransactionInfo = bytemuck::pod_read_unaligned(info_bytes.as_ref()); + + if !include_sensitive_txs && tx_info.flags.private() { + continue; + } + + let Some(blob) = snapshot.get(&db.tx_blobs, tx_hash)? else { + continue; + }; + + txs.push(TxInPool { + tx_hash, + tx_blob: blob.to_vec(), + double_spend_seen: tx_info.flags.contains(TxStateFlags::DOUBLE_SPENT), + received_timestamp: tx_info.received_at, + relayed: !tx_info.flags.private(), + }); + } + + Ok(TxpoolReadResponse::TxsByHash(txs)) } /// Returns whether a key image is spent by a transaction in the pool. diff --git a/types/types/src/blockchain.rs b/types/types/src/blockchain.rs index ab61c49a1..65c963505 100644 --- a/types/types/src/blockchain.rs +++ b/types/types/src/blockchain.rs @@ -42,6 +42,19 @@ pub enum BlockchainReadRequest { /// The input is the block heights. BlockCompleteEntriesByHeight(Vec), + /// Request [`BlockCompleteEntry`]s (and output indices) for the + /// blocks above the split point between our chain and the given chain. + BlockCompleteEntriesAboveSplitPoint { + chain: Vec<[u8; 32]>, + /// If `Some`, skip the chain scan and start serving from this height directly. + start_height: Option, + /// If `true`, each block's miner-tx entry in the output indices is an + /// empty placeholder. + no_miner_tx: bool, + len: usize, + pruned: bool, + }, + /// Request a block's extended header. /// /// The input is the block's height. @@ -265,6 +278,22 @@ pub enum BlockchainResponse { /// Response to [`BlockchainReadRequest::BlockCompleteEntriesByHeight`]. BlockCompleteEntriesByHeight(Vec), + /// Response to [`BlockchainReadRequest::BlockCompleteEntriesAboveSplitPoint`]. + BlockCompleteEntriesAboveSplitPoint { + /// The [`BlockCompleteEntry`]s that we had. + blocks: Vec, + /// The output indices of all transaction outputs across all blocks. + /// + /// `output_indices[block][tx][output]`, including the miner tx (miner tx will be empty if not requested). + output_indices: Vec>>, + /// Our blockchain height. + blockchain_height: usize, + /// The height the returned blocks start from. + start_height: usize, + /// Hash of the current top block. + top_hash: [u8; 32], + }, + /// Response to [`BlockchainReadRequest::BlockExtendedHeader`]. /// /// Inner value is the extended headed of the requested block. From b38e8d878b65b990897218ed31deb54f7a967c87 Mon Sep 17 00:00:00 2001 From: Boog900 <54e72d8a-345f-4599-bd90-c6b9bc7d0ec5@aleeas.com> Date: Fri, 12 Jun 2026 23:53:02 +0100 Subject: [PATCH 2/2] fmt --- binaries/cuprated/src/rpc/handlers/helper.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/binaries/cuprated/src/rpc/handlers/helper.rs b/binaries/cuprated/src/rpc/handlers/helper.rs index 7b5314617..31111b97e 100644 --- a/binaries/cuprated/src/rpc/handlers/helper.rs +++ b/binaries/cuprated/src/rpc/handlers/helper.rs @@ -132,10 +132,7 @@ pub(super) async fn block_header_by_hash( /// # Errors /// This returns the [`top_height`] on [`Ok`] and /// returns [`Error`] if `height` is greater than [`top_height`]. -pub(super) fn check_height( - state: &mut CupratedRpcHandler, - height: u64, -) -> Result { +pub(super) fn check_height(state: &mut CupratedRpcHandler, height: u64) -> Result { let (top_height, _) = top_height(state); if height > top_height {