diff --git a/.github/assets/kurtosis_op_network_params.yaml b/.github/assets/kurtosis_op_network_params.yaml index 87670587395..5dcc418fe08 100644 --- a/.github/assets/kurtosis_op_network_params.yaml +++ b/.github/assets/kurtosis_op_network_params.yaml @@ -4,7 +4,6 @@ ethereum_package: el_extra_params: - "--rpc.eth-proof-window=100" cl_type: teku - cl_image: "consensys/teku:25.4.0" network_params: preset: minimal genesis_delay: 5 diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 9b261cd805a..6aacd576be2 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -73,6 +73,8 @@ mod metrics; mod payload_processor; mod persistence_state; pub mod precompile_cache; +#[cfg(test)] +mod tests; // TODO(alexey): compare trie updates in `insert_block_inner` #[expect(unused)] mod trie_updates; @@ -2891,1415 +2893,3 @@ impl PersistingKind { matches!(self, Self::PersistingDescendant) } } -#[cfg(test)] -mod tests { - use super::*; - use crate::persistence::PersistenceAction; - use alloy_consensus::Header; - use alloy_primitives::{ - map::{HashMap, HashSet}, - Bytes, B256, - }; - use alloy_rlp::Decodable; - use alloy_rpc_types_engine::{ - CancunPayloadFields, ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1, - ExecutionPayloadV3, - }; - use assert_matches::assert_matches; - use reth_chain_state::{test_utils::TestBlockBuilder, BlockState}; - use reth_chainspec::{ChainSpec, HOLESKY, MAINNET}; - use reth_engine_primitives::ForkchoiceStatus; - use reth_ethereum_consensus::EthBeaconConsensus; - use reth_ethereum_engine_primitives::EthEngineTypes; - use reth_ethereum_primitives::{Block, EthPrimitives}; - use reth_evm_ethereum::MockEvmConfig; - use reth_node_ethereum::EthereumEngineValidator; - use reth_primitives_traits::Block as _; - use reth_provider::test_utils::MockEthProvider; - use reth_trie::HashedPostState; - use std::{ - collections::BTreeMap, - str::FromStr, - sync::mpsc::{channel, Sender}, - }; - - /// This is a test channel that allows you to `release` any value that is in the channel. - /// - /// If nothing has been sent, then the next value will be immediately sent. - struct TestChannel { - /// If an item is sent to this channel, an item will be released in the wrapped channel - release: Receiver<()>, - /// The sender channel - tx: Sender, - /// The receiver channel - rx: Receiver, - } - - impl TestChannel { - /// Creates a new test channel - fn spawn_channel() -> (Sender, Receiver, TestChannelHandle) { - let (original_tx, original_rx) = channel(); - let (wrapped_tx, wrapped_rx) = channel(); - let (release_tx, release_rx) = channel(); - let handle = TestChannelHandle::new(release_tx); - let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx }; - // spawn the task that listens and releases stuff - std::thread::spawn(move || test_channel.intercept_loop()); - (original_tx, wrapped_rx, handle) - } - - /// Runs the intercept loop, waiting for the handle to release a value - fn intercept_loop(&self) { - while self.release.recv() == Ok(()) { - let Ok(value) = self.rx.recv() else { return }; - - let _ = self.tx.send(value); - } - } - } - - struct TestChannelHandle { - /// The sender to use for releasing values - release: Sender<()>, - } - - impl TestChannelHandle { - /// Returns a [`TestChannelHandle`] - const fn new(release: Sender<()>) -> Self { - Self { release } - } - - /// Signals to the channel task that a value should be released - #[expect(dead_code)] - fn release(&self) { - let _ = self.release.send(()); - } - } - - struct TestHarness { - tree: EngineApiTreeHandler< - EthPrimitives, - MockEthProvider, - EthEngineTypes, - EthereumEngineValidator, - MockEvmConfig, - >, - to_tree_tx: Sender, Block>>, - from_tree_rx: UnboundedReceiver, - blocks: Vec, - action_rx: Receiver, - evm_config: MockEvmConfig, - block_builder: TestBlockBuilder, - provider: MockEthProvider, - } - - impl TestHarness { - fn new(chain_spec: Arc) -> Self { - let (action_tx, action_rx) = channel(); - Self::with_persistence_channel(chain_spec, action_tx, action_rx) - } - - #[expect(dead_code)] - fn with_test_channel(chain_spec: Arc) -> (Self, TestChannelHandle) { - let (action_tx, action_rx, handle) = TestChannel::spawn_channel(); - (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle) - } - - fn with_persistence_channel( - chain_spec: Arc, - action_tx: Sender, - action_rx: Receiver, - ) -> Self { - let persistence_handle = PersistenceHandle::new(action_tx); - - let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone())); - - let provider = MockEthProvider::default(); - - let payload_validator = EthereumEngineValidator::new(chain_spec.clone()); - - let (from_tree_tx, from_tree_rx) = unbounded_channel(); - - let header = chain_spec.genesis_header().clone(); - let header = SealedHeader::seal_slow(header); - let engine_api_tree_state = - EngineApiTreeState::new(10, 10, header.num_hash(), EngineApiKind::Ethereum); - let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None); - - let (to_payload_service, _payload_command_rx) = unbounded_channel(); - let payload_builder = PayloadBuilderHandle::new(to_payload_service); - - let evm_config = MockEvmConfig::default(); - - let tree = EngineApiTreeHandler::new( - provider.clone(), - consensus, - payload_validator, - from_tree_tx, - engine_api_tree_state, - canonical_in_memory_state, - persistence_handle, - PersistenceState::default(), - payload_builder, - // TODO: fix tests for state root task https://github.com/paradigmxyz/reth/issues/14376 - // always assume enough parallelism for tests - TreeConfig::default() - .with_legacy_state_root(true) - .with_has_enough_parallelism(true), - EngineApiKind::Ethereum, - evm_config.clone(), - ); - - let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone()); - Self { - to_tree_tx: tree.incoming_tx.clone(), - tree, - from_tree_rx, - blocks: vec![], - action_rx, - evm_config, - block_builder, - provider, - } - } - - fn with_blocks(mut self, blocks: Vec) -> Self { - let mut blocks_by_hash = HashMap::default(); - let mut blocks_by_number = BTreeMap::new(); - let mut state_by_hash = HashMap::default(); - let mut hash_by_number = BTreeMap::new(); - let mut parent_to_child: HashMap> = HashMap::default(); - let mut parent_hash = B256::ZERO; - - for block in &blocks { - let sealed_block = block.recovered_block(); - let hash = sealed_block.hash(); - let number = sealed_block.number; - blocks_by_hash.insert(hash, block.clone()); - blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone()); - state_by_hash.insert(hash, Arc::new(BlockState::new(block.clone()))); - hash_by_number.insert(number, hash); - parent_to_child.entry(parent_hash).or_default().insert(hash); - parent_hash = hash; - } - - self.tree.state.tree_state = TreeState { - blocks_by_hash, - blocks_by_number, - current_canonical_head: blocks.last().unwrap().recovered_block().num_hash(), - parent_to_child, - persisted_trie_updates: HashMap::default(), - engine_kind: EngineApiKind::Ethereum, - }; - - let last_executed_block = blocks.last().unwrap().clone(); - let pending = Some(BlockState::new(last_executed_block)); - self.tree.canonical_in_memory_state = - CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None); - - self.blocks = blocks.clone(); - - let recovered_blocks = - blocks.iter().map(|b| b.recovered_block().clone()).collect::>(); - - self.persist_blocks(recovered_blocks); - - self - } - - const fn with_backfill_state(mut self, state: BackfillSyncState) -> Self { - self.tree.backfill_sync_state = state; - self - } - - fn extend_execution_outcome( - &self, - execution_outcomes: impl IntoIterator>, - ) { - self.evm_config.extend(execution_outcomes); - } - - fn insert_block( - &mut self, - block: RecoveredBlock, - ) -> Result> { - let execution_outcome = self.block_builder.get_execution_outcome(block.clone()); - self.extend_execution_outcome([execution_outcome]); - self.tree.provider.add_state_root(block.state_root); - self.tree.insert_block(block) - } - - async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into) { - let fcu_status = fcu_status.into(); - - self.send_fcu(block_hash, fcu_status).await; - - self.check_fcu(block_hash, fcu_status).await; - } - - async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into) { - let fcu_state = self.fcu_state(block_hash); - - let (tx, rx) = oneshot::channel(); - self.tree - .on_engine_message(FromEngine::Request( - BeaconEngineMessage::ForkchoiceUpdated { - state: fcu_state, - payload_attrs: None, - tx, - version: EngineApiMessageVersion::default(), - } - .into(), - )) - .unwrap(); - - let response = rx.await.unwrap().unwrap().await.unwrap(); - match fcu_status.into() { - ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()), - ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()), - ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()), - } - } - - async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into) { - let fcu_state = self.fcu_state(block_hash); - - // check for ForkchoiceUpdated event - let event = self.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated( - state, - status, - )) => { - assert_eq!(state, fcu_state); - assert_eq!(status, fcu_status.into()); - } - _ => panic!("Unexpected event: {event:#?}"), - } - } - - const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState { - ForkchoiceState { - head_block_hash: block_hash, - safe_block_hash: block_hash, - finalized_block_hash: block_hash, - } - } - - async fn send_new_payload( - &mut self, - block: RecoveredBlock, - ) { - let payload = ExecutionPayloadV3::from_block_unchecked( - block.hash(), - &block.clone_sealed_block().into_block(), - ); - self.tree - .on_new_payload(ExecutionData { - payload: payload.into(), - sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields { - parent_beacon_block_root: block.parent_beacon_block_root.unwrap(), - versioned_hashes: vec![], - }), - }) - .unwrap(); - } - - async fn insert_chain( - &mut self, - chain: impl IntoIterator> + Clone, - ) { - for block in chain.clone() { - self.insert_block(block.clone()).unwrap(); - } - self.check_canon_chain_insertion(chain).await; - } - - async fn check_canon_commit(&mut self, hash: B256) { - let event = self.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus( - BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _), - ) => { - assert_eq!(header.hash(), hash); - } - _ => panic!("Unexpected event: {event:#?}"), - } - } - - async fn check_fork_chain_insertion( - &mut self, - chain: impl IntoIterator> + Clone, - ) { - for block in chain { - self.check_fork_block_added(block.hash()).await; - } - } - - async fn check_canon_chain_insertion( - &mut self, - chain: impl IntoIterator> + Clone, - ) { - for block in chain.clone() { - self.check_canon_block_added(block.hash()).await; - } - } - - async fn check_canon_block_added(&mut self, expected_hash: B256) { - let event = self.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus( - BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, _), - ) => { - assert_eq!(executed.recovered_block.hash(), expected_hash); - } - _ => panic!("Unexpected event: {event:#?}"), - } - } - - async fn check_fork_block_added(&mut self, expected_hash: B256) { - let event = self.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded( - executed, - _, - )) => { - assert_eq!(executed.recovered_block.hash(), expected_hash); - } - _ => panic!("Unexpected event: {event:#?}"), - } - } - - async fn check_invalid_block(&mut self, expected_hash: B256) { - let event = self.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock( - block, - )) => { - assert_eq!(block.hash(), expected_hash); - } - _ => panic!("Unexpected event: {event:#?}"), - } - } - - fn persist_blocks(&self, blocks: Vec>) { - let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len()); - let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len()); - - for block in &blocks { - block_data.push((block.hash(), block.clone_block())); - headers_data.push((block.hash(), block.header().clone())); - } - - self.provider.extend_blocks(block_data); - self.provider.extend_headers(headers_data); - } - - fn setup_range_insertion_for_valid_chain( - &mut self, - chain: Vec>, - ) { - self.setup_range_insertion_for_chain(chain, None) - } - - fn setup_range_insertion_for_invalid_chain( - &mut self, - chain: Vec>, - index: usize, - ) { - self.setup_range_insertion_for_chain(chain, Some(index)) - } - - fn setup_range_insertion_for_chain( - &mut self, - chain: Vec>, - invalid_index: Option, - ) { - // setting up execution outcomes for the chain, the blocks will be - // executed starting from the oldest, so we need to reverse. - let mut chain_rev = chain; - chain_rev.reverse(); - - let mut execution_outcomes = Vec::with_capacity(chain_rev.len()); - for (index, block) in chain_rev.iter().enumerate() { - let execution_outcome = self.block_builder.get_execution_outcome(block.clone()); - let state_root = if invalid_index.is_some() && invalid_index.unwrap() == index { - B256::random() - } else { - block.state_root - }; - self.tree.provider.add_state_root(state_root); - execution_outcomes.push(execution_outcome); - } - self.extend_execution_outcome(execution_outcomes); - } - - fn check_canon_head(&self, head_hash: B256) { - assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash); - } - } - - #[test] - fn test_tree_persist_block_batch() { - let tree_config = TreeConfig::default(); - let chain_spec = MAINNET.clone(); - let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone()); - - // we need more than tree_config.persistence_threshold() +1 blocks to - // trigger the persistence task. - let blocks: Vec<_> = test_block_builder - .get_executed_blocks(1..tree_config.persistence_threshold() + 2) - .collect(); - let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks); - - let mut blocks = vec![]; - for idx in 0..tree_config.max_execute_block_batch_size() * 2 { - blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random())); - } - - test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap(); - - // process the message - let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap(); - test_harness.tree.on_engine_message(msg).unwrap(); - - // we now should receive the other batch - let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap(); - match msg { - FromEngine::DownloadedBlocks(blocks) => { - assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size()); - } - _ => panic!("unexpected message: {msg:#?}"), - } - } - - #[tokio::test] - async fn test_tree_persist_blocks() { - let tree_config = TreeConfig::default(); - let chain_spec = MAINNET.clone(); - let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone()); - - // we need more than tree_config.persistence_threshold() +1 blocks to - // trigger the persistence task. - let blocks: Vec<_> = test_block_builder - .get_executed_blocks(1..tree_config.persistence_threshold() + 2) - .collect(); - let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone()); - std::thread::Builder::new() - .name("Tree Task".to_string()) - .spawn(|| test_harness.tree.run()) - .unwrap(); - - // send a message to the tree to enter the main loop. - test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap(); - - let received_action = - test_harness.action_rx.recv().expect("Failed to receive save blocks action"); - if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action { - // only blocks.len() - tree_config.memory_block_buffer_target() will be - // persisted - let expected_persist_len = - blocks.len() - tree_config.memory_block_buffer_target() as usize; - assert_eq!(saved_blocks.len(), expected_persist_len); - assert_eq!(saved_blocks, blocks[..expected_persist_len]); - } else { - panic!("unexpected action received {received_action:?}"); - } - } - - #[tokio::test] - async fn test_in_memory_state_trait_impl() { - let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(0..10).collect(); - let test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone()); - - for executed_block in blocks { - let sealed_block = executed_block.recovered_block(); - - let expected_state = BlockState::new(executed_block.clone()); - - let actual_state_by_hash = test_harness - .tree - .canonical_in_memory_state - .state_by_hash(sealed_block.hash()) - .unwrap(); - assert_eq!(expected_state, *actual_state_by_hash); - - let actual_state_by_number = test_harness - .tree - .canonical_in_memory_state - .state_by_number(sealed_block.number) - .unwrap(); - assert_eq!(expected_state, *actual_state_by_number); - } - } - - #[tokio::test] - async fn test_engine_request_during_backfill() { - let tree_config = TreeConfig::default(); - let blocks: Vec<_> = TestBlockBuilder::eth() - .get_executed_blocks(0..tree_config.persistence_threshold()) - .collect(); - let mut test_harness = TestHarness::new(MAINNET.clone()) - .with_blocks(blocks) - .with_backfill_state(BackfillSyncState::Active); - - let (tx, rx) = oneshot::channel(); - test_harness - .tree - .on_engine_message(FromEngine::Request( - BeaconEngineMessage::ForkchoiceUpdated { - state: ForkchoiceState { - head_block_hash: B256::random(), - safe_block_hash: B256::random(), - finalized_block_hash: B256::random(), - }, - payload_attrs: None, - tx, - version: EngineApiMessageVersion::default(), - } - .into(), - )) - .unwrap(); - - let resp = rx.await.unwrap().unwrap().await.unwrap(); - assert!(resp.payload_status.is_syncing()); - } - - #[test] - fn test_disconnected_payload() { - let s = include_str!("../../test-data/holesky/2.rlp"); - let data = Bytes::from_str(s).unwrap(); - let block = Block::decode(&mut data.as_ref()).unwrap(); - let sealed = block.seal_slow(); - let hash = sealed.hash(); - let payload = ExecutionPayloadV1::from_block_unchecked(hash, &sealed.clone().into_block()); - - let mut test_harness = TestHarness::new(HOLESKY.clone()); - - let outcome = test_harness - .tree - .on_new_payload(ExecutionData { - payload: payload.into(), - sidecar: ExecutionPayloadSidecar::none(), - }) - .unwrap(); - assert!(outcome.outcome.is_syncing()); - - // ensure block is buffered - let buffered = test_harness.tree.state.buffer.block(&hash).unwrap(); - assert_eq!(buffered.clone_sealed_block(), sealed); - } - - #[test] - fn test_disconnected_block() { - let s = include_str!("../../test-data/holesky/2.rlp"); - let data = Bytes::from_str(s).unwrap(); - let block = Block::decode(&mut data.as_ref()).unwrap(); - let sealed = block.seal_slow().try_recover().unwrap(); - - let mut test_harness = TestHarness::new(HOLESKY.clone()); - - let outcome = test_harness.tree.insert_block(sealed.clone()).unwrap(); - assert_eq!( - outcome, - InsertPayloadOk::Inserted(BlockStatus::Disconnected { - head: test_harness.tree.state.tree_state.current_canonical_head, - missing_ancestor: sealed.parent_num_hash() - }) - ); - } - - #[tokio::test] - async fn test_holesky_payload() { - let s = include_str!("../../test-data/holesky/1.rlp"); - let data = Bytes::from_str(s).unwrap(); - let block: Block = Block::decode(&mut data.as_ref()).unwrap(); - let sealed = block.seal_slow(); - let payload = - ExecutionPayloadV1::from_block_unchecked(sealed.hash(), &sealed.clone().into_block()); - - let mut test_harness = - TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active); - - let (tx, rx) = oneshot::channel(); - test_harness - .tree - .on_engine_message(FromEngine::Request( - BeaconEngineMessage::NewPayload { - payload: ExecutionData { - payload: payload.clone().into(), - sidecar: ExecutionPayloadSidecar::none(), - }, - tx, - } - .into(), - )) - .unwrap(); - - let resp = rx.await.unwrap().unwrap(); - assert!(resp.is_syncing()); - } - - #[tokio::test] - async fn test_tree_state_on_new_head_reorg() { - reth_tracing::init_test_tracing(); - let chain_spec = MAINNET.clone(); - - // Set persistence_threshold to 1 - let mut test_harness = TestHarness::new(chain_spec); - test_harness.tree.config = test_harness - .tree - .config - .with_persistence_threshold(1) - .with_memory_block_buffer_target(1); - let mut test_block_builder = TestBlockBuilder::eth(); - let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect(); - - for block in &blocks { - test_harness.tree.state.tree_state.insert_executed(block.clone()); - } - - // set block 3 as the current canonical head - test_harness - .tree - .state - .tree_state - .set_canonical_head(blocks[2].recovered_block().num_hash()); - - // create a fork from block 2 - let fork_block_3 = test_block_builder - .get_executed_block_with_number(3, blocks[1].recovered_block().hash()); - let fork_block_4 = test_block_builder - .get_executed_block_with_number(4, fork_block_3.recovered_block().hash()); - let fork_block_5 = test_block_builder - .get_executed_block_with_number(5, fork_block_4.recovered_block().hash()); - - test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone()); - test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone()); - test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone()); - - // normal (non-reorg) case - let result = test_harness.tree.on_new_head(blocks[4].recovered_block().hash()).unwrap(); - assert!(matches!(result, Some(NewCanonicalChain::Commit { .. }))); - if let Some(NewCanonicalChain::Commit { new }) = result { - assert_eq!(new.len(), 2); - assert_eq!(new[0].recovered_block().hash(), blocks[3].recovered_block().hash()); - assert_eq!(new[1].recovered_block().hash(), blocks[4].recovered_block().hash()); - } - - // should be a None persistence action before we advance persistence - let current_action = test_harness.tree.persistence_state.current_action(); - assert_eq!(current_action, None); - - // let's attempt to persist and check that it attempts to save blocks - // - // since in-memory block buffer target and persistence_threshold are both 1, this should - // save all but the current tip of the canonical chain (up to blocks[1]) - test_harness.tree.advance_persistence().unwrap(); - let current_action = test_harness.tree.persistence_state.current_action().cloned(); - assert_eq!( - current_action, - Some(CurrentPersistenceAction::SavingBlocks { - highest: blocks[1].recovered_block().num_hash() - }) - ); - - // get rid of the prev action - let received_action = test_harness.action_rx.recv().unwrap(); - let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else { - panic!("received wrong action"); - }; - assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]); - - // send the response so we can advance again - sender.send(Some(blocks[1].recovered_block().num_hash())).unwrap(); - - // we should be persisting blocks[1] because we threw out the prev action - let current_action = test_harness.tree.persistence_state.current_action().cloned(); - assert_eq!( - current_action, - Some(CurrentPersistenceAction::SavingBlocks { - highest: blocks[1].recovered_block().num_hash() - }) - ); - - // after advancing persistence, we should be at `None` for the next action - test_harness.tree.advance_persistence().unwrap(); - let current_action = test_harness.tree.persistence_state.current_action().cloned(); - assert_eq!(current_action, None); - - // reorg case - let result = test_harness.tree.on_new_head(fork_block_5.recovered_block().hash()).unwrap(); - assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. }))); - - if let Some(NewCanonicalChain::Reorg { new, old }) = result { - assert_eq!(new.len(), 3); - assert_eq!(new[0].recovered_block().hash(), fork_block_3.recovered_block().hash()); - assert_eq!(new[1].recovered_block().hash(), fork_block_4.recovered_block().hash()); - assert_eq!(new[2].recovered_block().hash(), fork_block_5.recovered_block().hash()); - - assert_eq!(old.len(), 1); - assert_eq!(old[0].recovered_block().hash(), blocks[2].recovered_block().hash()); - } - - // The canonical block has not changed, so we will not get any active persistence action - test_harness.tree.advance_persistence().unwrap(); - let current_action = test_harness.tree.persistence_state.current_action().cloned(); - assert_eq!(current_action, None); - - // Let's change the canonical head and advance persistence - test_harness - .tree - .state - .tree_state - .set_canonical_head(fork_block_5.recovered_block().num_hash()); - - // The canonical block has changed now, we should get fork_block_4 due to the persistence - // threshold and in memory block buffer target - test_harness.tree.advance_persistence().unwrap(); - let current_action = test_harness.tree.persistence_state.current_action().cloned(); - assert_eq!( - current_action, - Some(CurrentPersistenceAction::SavingBlocks { - highest: fork_block_4.recovered_block().num_hash() - }) - ); - } - - #[test] - fn test_tree_state_on_new_head_deep_fork() { - reth_tracing::init_test_tracing(); - - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec); - let mut test_block_builder = TestBlockBuilder::eth(); - - let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect(); - - for block in &blocks { - test_harness.tree.state.tree_state.insert_executed(block.clone()); - } - - // set last block as the current canonical head - let last_block = blocks.last().unwrap().recovered_block().clone(); - - test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash()); - - // create a fork chain from last_block - let chain_a = test_block_builder.create_fork(&last_block, 10); - let chain_b = test_block_builder.create_fork(&last_block, 10); - - for block in &chain_a { - test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates { - block: ExecutedBlock { - recovered_block: Arc::new(block.clone()), - execution_output: Arc::new(ExecutionOutcome::default()), - hashed_state: Arc::new(HashedPostState::default()), - }, - trie: ExecutedTrieUpdates::empty(), - }); - } - test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash()); - - for block in &chain_b { - test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates { - block: ExecutedBlock { - recovered_block: Arc::new(block.clone()), - execution_output: Arc::new(ExecutionOutcome::default()), - hashed_state: Arc::new(HashedPostState::default()), - }, - trie: ExecutedTrieUpdates::empty(), - }); - } - - // for each block in chain_b, reorg to it and then back to canonical - let mut expected_new = Vec::new(); - for block in &chain_b { - // reorg to chain from block b - let result = test_harness.tree.on_new_head(block.hash()).unwrap(); - assert_matches!(result, Some(NewCanonicalChain::Reorg { .. })); - - expected_new.push(block); - if let Some(NewCanonicalChain::Reorg { new, old }) = result { - assert_eq!(new.len(), expected_new.len()); - for (index, block) in expected_new.iter().enumerate() { - assert_eq!(new[index].recovered_block().hash(), block.hash()); - } - - assert_eq!(old.len(), chain_a.len()); - for (index, block) in chain_a.iter().enumerate() { - assert_eq!(old[index].recovered_block().hash(), block.hash()); - } - } - - // set last block of chain a as canonical head - test_harness.tree.on_new_head(chain_a.last().unwrap().hash()).unwrap(); - } - } - - #[tokio::test] - async fn test_get_canonical_blocks_to_persist() { - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec); - let mut test_block_builder = TestBlockBuilder::eth(); - - let canonical_head_number = 9; - let blocks: Vec<_> = - test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect(); - test_harness = test_harness.with_blocks(blocks.clone()); - - let last_persisted_block_number = 3; - test_harness.tree.persistence_state.last_persisted_block = - blocks[last_persisted_block_number as usize].recovered_block.num_hash(); - - let persistence_threshold = 4; - let memory_block_buffer_target = 3; - test_harness.tree.config = TreeConfig::default() - .with_persistence_threshold(persistence_threshold) - .with_memory_block_buffer_target(memory_block_buffer_target); - - let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist().unwrap(); - - let expected_blocks_to_persist_length: usize = - (canonical_head_number - memory_block_buffer_target - last_persisted_block_number) - .try_into() - .unwrap(); - - assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length); - for (i, item) in - blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length) - { - assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1); - } - - // make sure only canonical blocks are included - let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random()); - let fork_block_hash = fork_block.recovered_block().hash(); - test_harness.tree.state.tree_state.insert_executed(fork_block); - - assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some()); - - let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist().unwrap(); - assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length); - - // check that the fork block is not included in the blocks to persist - assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash)); - - // check that the original block 4 is still included - assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 && - b.recovered_block().hash() == blocks[4].recovered_block().hash())); - - // check that if we advance persistence, the persistence action is the correct value - test_harness.tree.advance_persistence().expect("advancing persistence should succeed"); - assert_eq!( - test_harness.tree.persistence_state.current_action().cloned(), - Some(CurrentPersistenceAction::SavingBlocks { - highest: blocks_to_persist.last().unwrap().recovered_block().num_hash() - }) - ); - } - - #[tokio::test] - async fn test_engine_tree_fcu_missing_head() { - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - - let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone()); - - let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect(); - test_harness = test_harness.with_blocks(blocks); - - let missing_block = test_block_builder - .generate_random_block(6, test_harness.blocks.last().unwrap().recovered_block().hash()); - - test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await; - - // after FCU we receive an EngineApiEvent::Download event to get the missing block. - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => { - let expected_block_set = HashSet::from_iter([missing_block.hash()]); - assert_eq!(actual_block_set, expected_block_set); - } - _ => panic!("Unexpected event: {event:#?}"), - } - } - - #[tokio::test] - async fn test_engine_tree_fcu_canon_chain_insertion() { - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - - let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); - test_harness = test_harness.with_blocks(base_chain.clone()); - - test_harness - .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid) - .await; - - // extend main chain - let main_chain = test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 3); - - test_harness.insert_chain(main_chain).await; - } - - #[tokio::test] - async fn test_engine_tree_fcu_reorg_with_all_blocks() { - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - - let main_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect(); - test_harness = test_harness.with_blocks(main_chain.clone()); - - let fork_chain = test_harness.block_builder.create_fork(main_chain[2].recovered_block(), 3); - let fork_chain_last_hash = fork_chain.last().unwrap().hash(); - - // add fork blocks to the tree - for block in &fork_chain { - test_harness.insert_block(block.clone()).unwrap(); - } - - test_harness.send_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await; - - // check for ForkBlockAdded events, we expect fork_chain.len() blocks added - test_harness.check_fork_chain_insertion(fork_chain.clone()).await; - - // check for CanonicalChainCommitted event - test_harness.check_canon_commit(fork_chain_last_hash).await; - - test_harness.check_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await; - - // new head is the tip of the fork chain - test_harness.check_canon_head(fork_chain_last_hash); - } - - #[tokio::test] - async fn test_engine_tree_live_sync_transition_required_blocks_requested() { - reth_tracing::init_test_tracing(); - - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - - let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); - test_harness = test_harness.with_blocks(base_chain.clone()); - - test_harness - .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid) - .await; - - // extend main chain with enough blocks to trigger pipeline run but don't insert them - let main_chain = test_harness - .block_builder - .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10); - - let main_chain_last_hash = main_chain.last().unwrap().hash(); - test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await; - - test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await; - - // create event for backfill finished - let backfill_finished_block_number = MIN_BLOCKS_FOR_PIPELINE_RUN + 1; - let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue { - block_number: backfill_finished_block_number, - }); - - let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone(); - // add block to mock provider to enable persistence clean up. - test_harness.provider.add_block(backfill_tip_block.hash(), backfill_tip_block.into_block()); - test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap(); - - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => { - assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash])); - } - _ => panic!("Unexpected event: {event:#?}"), - } - - test_harness - .tree - .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain - .last() - .unwrap() - .clone()])) - .unwrap(); - - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => { - assert_eq!( - total_blocks, - (main_chain.len() - backfill_finished_block_number as usize - 1) as u64 - ); - assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash); - } - _ => panic!("Unexpected event: {event:#?}"), - } - } - - #[tokio::test] - async fn test_engine_tree_live_sync_transition_eventually_canonical() { - reth_tracing::init_test_tracing(); - - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100); - - // create base chain and setup test harness with it - let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); - test_harness = test_harness.with_blocks(base_chain.clone()); - - // fcu to the tip of base chain - test_harness - .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid) - .await; - - // create main chain, extension of base chain, with enough blocks to - // trigger backfill sync - let main_chain = test_harness - .block_builder - .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10); - - let main_chain_last = main_chain.last().unwrap(); - let main_chain_last_hash = main_chain_last.hash(); - let main_chain_backfill_target = - main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap(); - let main_chain_backfill_target_hash = main_chain_backfill_target.hash(); - - // fcu to the element of main chain that should trigger backfill sync - test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await; - test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await; - - // check download request for target - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => { - assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash])); - } - _ => panic!("Unexpected event: {event:#?}"), - } - - // send message to tell the engine the requested block was downloaded - test_harness - .tree - .on_engine_message(FromEngine::DownloadedBlocks(vec![ - main_chain_backfill_target.clone() - ])) - .unwrap(); - - // check that backfill is triggered - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BackfillAction(BackfillAction::Start( - reth_stages::PipelineTarget::Sync(target_hash), - )) => { - assert_eq!(target_hash, main_chain_backfill_target_hash); - } - _ => panic!("Unexpected event: {event:#?}"), - } - - // persist blocks of main chain, same as the backfill operation would do - let backfilled_chain: Vec<_> = - main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect(); - test_harness.persist_blocks(backfilled_chain.clone()); - - test_harness.setup_range_insertion_for_valid_chain(backfilled_chain); - - // send message to mark backfill finished - test_harness - .tree - .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished( - ControlFlow::Continue { block_number: main_chain_backfill_target.number }, - ))) - .unwrap(); - - // send fcu to the tip of main - test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await; - - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => { - assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash])); - } - _ => panic!("Unexpected event: {event:#?}"), - } - - // tell engine main chain tip downloaded - test_harness - .tree - .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()])) - .unwrap(); - - // check download range request - let event = test_harness.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => { - assert_eq!( - total_blocks, - (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64 - ); - assert_eq!(initial_hash, main_chain_last.parent_hash); - } - _ => panic!("Unexpected event: {event:#?}"), - } - - let remaining: Vec<_> = main_chain - .clone() - .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len()) - .collect(); - - test_harness.setup_range_insertion_for_valid_chain(remaining.clone()); - - // tell engine block range downloaded - test_harness - .tree - .on_engine_message(FromEngine::DownloadedBlocks(remaining.clone())) - .unwrap(); - - test_harness.check_canon_chain_insertion(remaining).await; - - // check canonical chain committed event with the hash of the latest block - test_harness.check_canon_commit(main_chain_last_hash).await; - - // new head is the tip of the main chain - test_harness.check_canon_head(main_chain_last_hash); - } - - #[tokio::test] - async fn test_engine_tree_live_sync_fcu_extends_canon_chain() { - reth_tracing::init_test_tracing(); - - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - - // create base chain and setup test harness with it - let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); - test_harness = test_harness.with_blocks(base_chain.clone()); - - // fcu to the tip of base chain - test_harness - .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid) - .await; - - // create main chain, extension of base chain - let main_chain = - test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 10); - // determine target in the middle of main hain - let target = main_chain.get(5).unwrap(); - let target_hash = target.hash(); - let main_last = main_chain.last().unwrap(); - let main_last_hash = main_last.hash(); - - // insert main chain - test_harness.insert_chain(main_chain).await; - - // send fcu to target - test_harness.send_fcu(target_hash, ForkchoiceStatus::Valid).await; - - test_harness.check_canon_commit(target_hash).await; - test_harness.check_fcu(target_hash, ForkchoiceStatus::Valid).await; - - // send fcu to main tip - test_harness.send_fcu(main_last_hash, ForkchoiceStatus::Valid).await; - - test_harness.check_canon_commit(main_last_hash).await; - test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await; - test_harness.check_canon_head(main_last_hash); - } - - #[tokio::test] - async fn test_engine_tree_valid_forks_with_older_canonical_head() { - reth_tracing::init_test_tracing(); - - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - - // create base chain and setup test harness with it - let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); - test_harness = test_harness.with_blocks(base_chain.clone()); - - let old_head = base_chain.first().unwrap().recovered_block(); - - // extend base chain - let extension_chain = test_harness.block_builder.create_fork(old_head, 5); - let fork_block = extension_chain.last().unwrap().clone_sealed_block(); - - test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone()); - test_harness.insert_chain(extension_chain).await; - - // fcu to old_head - test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await; - - // create two competing chains starting from fork_block - let chain_a = test_harness.block_builder.create_fork(&fork_block, 10); - let chain_b = test_harness.block_builder.create_fork(&fork_block, 10); - - // insert chain A blocks using newPayload - test_harness.setup_range_insertion_for_valid_chain(chain_a.clone()); - for block in &chain_a { - test_harness.send_new_payload(block.clone()).await; - } - - test_harness.check_canon_chain_insertion(chain_a.clone()).await; - - // insert chain B blocks using newPayload - test_harness.setup_range_insertion_for_valid_chain(chain_b.clone()); - for block in &chain_b { - test_harness.send_new_payload(block.clone()).await; - } - - test_harness.check_canon_chain_insertion(chain_b.clone()).await; - - // send FCU to make the tip of chain B the new head - let chain_b_tip_hash = chain_b.last().unwrap().hash(); - test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await; - - // check for CanonicalChainCommitted event - test_harness.check_canon_commit(chain_b_tip_hash).await; - - // verify FCU was processed - test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await; - - // verify the new canonical head - test_harness.check_canon_head(chain_b_tip_hash); - - // verify that chain A is now considered a fork - assert!(test_harness.tree.is_fork(chain_a.last().unwrap().sealed_header()).unwrap()); - } - - #[tokio::test] - async fn test_engine_tree_buffered_blocks_are_eventually_connected() { - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - - let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); - test_harness = test_harness.with_blocks(base_chain.clone()); - - // side chain consisting of two blocks, the last will be inserted first - // so that we force it to be buffered - let side_chain = - test_harness.block_builder.create_fork(base_chain.last().unwrap().recovered_block(), 2); - - // buffer last block of side chain - let buffered_block = side_chain.last().unwrap(); - let buffered_block_hash = buffered_block.hash(); - - test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]); - test_harness.send_new_payload(buffered_block.clone()).await; - - assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some()); - - let non_buffered_block = side_chain.first().unwrap(); - let non_buffered_block_hash = non_buffered_block.hash(); - - // insert block that continues the canon chain, should not be buffered - test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]); - test_harness.send_new_payload(non_buffered_block.clone()).await; - assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none()); - - // the previously buffered block should be connected now - assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none()); - - // both blocks are added to the canon chain in order - test_harness.check_canon_block_added(non_buffered_block_hash).await; - test_harness.check_canon_block_added(buffered_block_hash).await; - } - - #[tokio::test] - async fn test_engine_tree_valid_and_invalid_forks_with_older_canonical_head() { - reth_tracing::init_test_tracing(); - - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - - // create base chain and setup test harness with it - let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); - test_harness = test_harness.with_blocks(base_chain.clone()); - - let old_head = base_chain.first().unwrap().recovered_block(); - - // extend base chain - let extension_chain = test_harness.block_builder.create_fork(old_head, 5); - let fork_block = extension_chain.last().unwrap().clone_sealed_block(); - test_harness.insert_chain(extension_chain).await; - - // fcu to old_head - test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await; - - // create two competing chains starting from fork_block, one of them invalid - let total_fork_elements = 10; - let chain_a = test_harness.block_builder.create_fork(&fork_block, total_fork_elements); - let chain_b = test_harness.block_builder.create_fork(&fork_block, total_fork_elements); - - // insert chain B blocks using newPayload - test_harness.setup_range_insertion_for_valid_chain(chain_b.clone()); - for block in &chain_b { - test_harness.send_new_payload(block.clone()).await; - test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await; - test_harness.check_canon_block_added(block.hash()).await; - test_harness.check_canon_commit(block.hash()).await; - test_harness.check_fcu(block.hash(), ForkchoiceStatus::Valid).await; - } - - // insert chain A blocks using newPayload, one of the blocks will be invalid - let invalid_index = 3; - test_harness.setup_range_insertion_for_invalid_chain(chain_a.clone(), invalid_index); - for block in &chain_a { - test_harness.send_new_payload(block.clone()).await; - } - - // check canon chain insertion up to the invalid index and taking into - // account reversed ordering - test_harness - .check_fork_chain_insertion( - chain_a[..chain_a.len() - invalid_index - 1].iter().cloned(), - ) - .await; - for block in &chain_a[chain_a.len() - invalid_index - 1..] { - test_harness.check_invalid_block(block.hash()).await; - } - - // send FCU to make the tip of chain A, expect invalid - let chain_a_tip_hash = chain_a.last().unwrap().hash(); - test_harness.fcu_to(chain_a_tip_hash, ForkchoiceStatus::Invalid).await; - - // send FCU to make the tip of chain B the new head - let chain_b_tip_hash = chain_b.last().unwrap().hash(); - - // verify the new canonical head - test_harness.check_canon_head(chain_b_tip_hash); - - // verify the canonical head didn't change - test_harness.check_canon_head(chain_b_tip_hash); - } - - #[tokio::test] - async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid() { - reth_tracing::init_test_tracing(); - let chain_spec = MAINNET.clone(); - let mut test_harness = TestHarness::new(chain_spec.clone()); - - let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..6).collect(); - test_harness = test_harness.with_blocks(base_chain.clone()); - - // create a side chain with an invalid block - let side_chain = test_harness - .block_builder - .create_fork(base_chain.last().unwrap().recovered_block(), 15); - let invalid_index = 9; - - test_harness.setup_range_insertion_for_invalid_chain(side_chain.clone(), invalid_index); - - for (index, block) in side_chain.iter().enumerate() { - test_harness.send_new_payload(block.clone()).await; - - if index < side_chain.len() - invalid_index - 1 { - test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await; - } - } - - // Try to do a forkchoice update to a block after the invalid one - let fork_tip_hash = side_chain.last().unwrap().hash(); - test_harness.send_fcu(fork_tip_hash, ForkchoiceStatus::Invalid).await; - } -} diff --git a/crates/engine/tree/src/tree/tests.rs b/crates/engine/tree/src/tree/tests.rs new file mode 100644 index 00000000000..caa385a6993 --- /dev/null +++ b/crates/engine/tree/src/tree/tests.rs @@ -0,0 +1,1377 @@ +use super::*; +use crate::persistence::PersistenceAction; +use alloy_consensus::Header; +use alloy_primitives::{ + map::{HashMap, HashSet}, + Bytes, B256, +}; +use alloy_rlp::Decodable; +use alloy_rpc_types_engine::{ + CancunPayloadFields, ExecutionData, ExecutionPayloadSidecar, ExecutionPayloadV1, + ExecutionPayloadV3, +}; +use assert_matches::assert_matches; +use reth_chain_state::{test_utils::TestBlockBuilder, BlockState}; +use reth_chainspec::{ChainSpec, HOLESKY, MAINNET}; +use reth_engine_primitives::ForkchoiceStatus; +use reth_ethereum_consensus::EthBeaconConsensus; +use reth_ethereum_engine_primitives::EthEngineTypes; +use reth_ethereum_primitives::{Block, EthPrimitives}; +use reth_evm_ethereum::MockEvmConfig; +use reth_node_ethereum::EthereumEngineValidator; +use reth_primitives_traits::Block as _; +use reth_provider::test_utils::MockEthProvider; +use reth_trie::HashedPostState; +use std::{ + collections::BTreeMap, + str::FromStr, + sync::mpsc::{channel, Sender}, +}; + +/// This is a test channel that allows you to `release` any value that is in the channel. +/// +/// If nothing has been sent, then the next value will be immediately sent. +struct TestChannel { + /// If an item is sent to this channel, an item will be released in the wrapped channel + release: Receiver<()>, + /// The sender channel + tx: Sender, + /// The receiver channel + rx: Receiver, +} + +impl TestChannel { + /// Creates a new test channel + fn spawn_channel() -> (Sender, Receiver, TestChannelHandle) { + let (original_tx, original_rx) = channel(); + let (wrapped_tx, wrapped_rx) = channel(); + let (release_tx, release_rx) = channel(); + let handle = TestChannelHandle::new(release_tx); + let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx }; + // spawn the task that listens and releases stuff + std::thread::spawn(move || test_channel.intercept_loop()); + (original_tx, wrapped_rx, handle) + } + + /// Runs the intercept loop, waiting for the handle to release a value + fn intercept_loop(&self) { + while self.release.recv() == Ok(()) { + let Ok(value) = self.rx.recv() else { return }; + + let _ = self.tx.send(value); + } + } +} + +struct TestChannelHandle { + /// The sender to use for releasing values + release: Sender<()>, +} + +impl TestChannelHandle { + /// Returns a [`TestChannelHandle`] + const fn new(release: Sender<()>) -> Self { + Self { release } + } + + /// Signals to the channel task that a value should be released + #[expect(dead_code)] + fn release(&self) { + let _ = self.release.send(()); + } +} + +struct TestHarness { + tree: EngineApiTreeHandler< + EthPrimitives, + MockEthProvider, + EthEngineTypes, + EthereumEngineValidator, + MockEvmConfig, + >, + to_tree_tx: Sender, Block>>, + from_tree_rx: UnboundedReceiver, + blocks: Vec, + action_rx: Receiver, + evm_config: MockEvmConfig, + block_builder: TestBlockBuilder, + provider: MockEthProvider, +} + +impl TestHarness { + fn new(chain_spec: Arc) -> Self { + let (action_tx, action_rx) = channel(); + Self::with_persistence_channel(chain_spec, action_tx, action_rx) + } + + #[expect(dead_code)] + fn with_test_channel(chain_spec: Arc) -> (Self, TestChannelHandle) { + let (action_tx, action_rx, handle) = TestChannel::spawn_channel(); + (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle) + } + + fn with_persistence_channel( + chain_spec: Arc, + action_tx: Sender, + action_rx: Receiver, + ) -> Self { + let persistence_handle = PersistenceHandle::new(action_tx); + + let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone())); + + let provider = MockEthProvider::default(); + + let payload_validator = EthereumEngineValidator::new(chain_spec.clone()); + + let (from_tree_tx, from_tree_rx) = unbounded_channel(); + + let header = chain_spec.genesis_header().clone(); + let header = SealedHeader::seal_slow(header); + let engine_api_tree_state = + EngineApiTreeState::new(10, 10, header.num_hash(), EngineApiKind::Ethereum); + let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None); + + let (to_payload_service, _payload_command_rx) = unbounded_channel(); + let payload_builder = PayloadBuilderHandle::new(to_payload_service); + + let evm_config = MockEvmConfig::default(); + + let tree = EngineApiTreeHandler::new( + provider.clone(), + consensus, + payload_validator, + from_tree_tx, + engine_api_tree_state, + canonical_in_memory_state, + persistence_handle, + PersistenceState::default(), + payload_builder, + // TODO: fix tests for state root task https://github.com/paradigmxyz/reth/issues/14376 + // always assume enough parallelism for tests + TreeConfig::default().with_legacy_state_root(true).with_has_enough_parallelism(true), + EngineApiKind::Ethereum, + evm_config.clone(), + ); + + let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone()); + Self { + to_tree_tx: tree.incoming_tx.clone(), + tree, + from_tree_rx, + blocks: vec![], + action_rx, + evm_config, + block_builder, + provider, + } + } + + fn with_blocks(mut self, blocks: Vec) -> Self { + let mut blocks_by_hash = HashMap::default(); + let mut blocks_by_number = BTreeMap::new(); + let mut state_by_hash = HashMap::default(); + let mut hash_by_number = BTreeMap::new(); + let mut parent_to_child: HashMap> = HashMap::default(); + let mut parent_hash = B256::ZERO; + + for block in &blocks { + let sealed_block = block.recovered_block(); + let hash = sealed_block.hash(); + let number = sealed_block.number; + blocks_by_hash.insert(hash, block.clone()); + blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone()); + state_by_hash.insert(hash, Arc::new(BlockState::new(block.clone()))); + hash_by_number.insert(number, hash); + parent_to_child.entry(parent_hash).or_default().insert(hash); + parent_hash = hash; + } + + self.tree.state.tree_state = TreeState { + blocks_by_hash, + blocks_by_number, + current_canonical_head: blocks.last().unwrap().recovered_block().num_hash(), + parent_to_child, + persisted_trie_updates: HashMap::default(), + engine_kind: EngineApiKind::Ethereum, + }; + + let last_executed_block = blocks.last().unwrap().clone(); + let pending = Some(BlockState::new(last_executed_block)); + self.tree.canonical_in_memory_state = + CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None); + + self.blocks = blocks.clone(); + + let recovered_blocks = + blocks.iter().map(|b| b.recovered_block().clone()).collect::>(); + + self.persist_blocks(recovered_blocks); + + self + } + + const fn with_backfill_state(mut self, state: BackfillSyncState) -> Self { + self.tree.backfill_sync_state = state; + self + } + + fn extend_execution_outcome( + &self, + execution_outcomes: impl IntoIterator>, + ) { + self.evm_config.extend(execution_outcomes); + } + + fn insert_block( + &mut self, + block: RecoveredBlock, + ) -> Result> { + let execution_outcome = self.block_builder.get_execution_outcome(block.clone()); + self.extend_execution_outcome([execution_outcome]); + self.tree.provider.add_state_root(block.state_root); + self.tree.insert_block(block) + } + + async fn fcu_to(&mut self, block_hash: B256, fcu_status: impl Into) { + let fcu_status = fcu_status.into(); + + self.send_fcu(block_hash, fcu_status).await; + + self.check_fcu(block_hash, fcu_status).await; + } + + async fn send_fcu(&mut self, block_hash: B256, fcu_status: impl Into) { + let fcu_state = self.fcu_state(block_hash); + + let (tx, rx) = oneshot::channel(); + self.tree + .on_engine_message(FromEngine::Request( + BeaconEngineMessage::ForkchoiceUpdated { + state: fcu_state, + payload_attrs: None, + tx, + version: EngineApiMessageVersion::default(), + } + .into(), + )) + .unwrap(); + + let response = rx.await.unwrap().unwrap().await.unwrap(); + match fcu_status.into() { + ForkchoiceStatus::Valid => assert!(response.payload_status.is_valid()), + ForkchoiceStatus::Syncing => assert!(response.payload_status.is_syncing()), + ForkchoiceStatus::Invalid => assert!(response.payload_status.is_invalid()), + } + } + + async fn check_fcu(&mut self, block_hash: B256, fcu_status: impl Into) { + let fcu_state = self.fcu_state(block_hash); + + // check for ForkchoiceUpdated event + let event = self.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkchoiceUpdated( + state, + status, + )) => { + assert_eq!(state, fcu_state); + assert_eq!(status, fcu_status.into()); + } + _ => panic!("Unexpected event: {event:#?}"), + } + } + + const fn fcu_state(&self, block_hash: B256) -> ForkchoiceState { + ForkchoiceState { + head_block_hash: block_hash, + safe_block_hash: block_hash, + finalized_block_hash: block_hash, + } + } + + async fn send_new_payload(&mut self, block: RecoveredBlock) { + let payload = ExecutionPayloadV3::from_block_unchecked( + block.hash(), + &block.clone_sealed_block().into_block(), + ); + self.tree + .on_new_payload(ExecutionData { + payload: payload.into(), + sidecar: ExecutionPayloadSidecar::v3(CancunPayloadFields { + parent_beacon_block_root: block.parent_beacon_block_root.unwrap(), + versioned_hashes: vec![], + }), + }) + .unwrap(); + } + + async fn insert_chain( + &mut self, + chain: impl IntoIterator> + Clone, + ) { + for block in chain.clone() { + self.insert_block(block.clone()).unwrap(); + } + self.check_canon_chain_insertion(chain).await; + } + + async fn check_canon_commit(&mut self, hash: B256) { + let event = self.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BeaconConsensus( + BeaconConsensusEngineEvent::CanonicalChainCommitted(header, _), + ) => { + assert_eq!(header.hash(), hash); + } + _ => panic!("Unexpected event: {event:#?}"), + } + } + + async fn check_fork_chain_insertion( + &mut self, + chain: impl IntoIterator> + Clone, + ) { + for block in chain { + self.check_fork_block_added(block.hash()).await; + } + } + + async fn check_canon_chain_insertion( + &mut self, + chain: impl IntoIterator> + Clone, + ) { + for block in chain.clone() { + self.check_canon_block_added(block.hash()).await; + } + } + + async fn check_canon_block_added(&mut self, expected_hash: B256) { + let event = self.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::CanonicalBlockAdded( + executed, + _, + )) => { + assert_eq!(executed.recovered_block.hash(), expected_hash); + } + _ => panic!("Unexpected event: {event:#?}"), + } + } + + async fn check_fork_block_added(&mut self, expected_hash: B256) { + let event = self.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded( + executed, + _, + )) => { + assert_eq!(executed.recovered_block.hash(), expected_hash); + } + _ => panic!("Unexpected event: {event:#?}"), + } + } + + async fn check_invalid_block(&mut self, expected_hash: B256) { + let event = self.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::InvalidBlock(block)) => { + assert_eq!(block.hash(), expected_hash); + } + _ => panic!("Unexpected event: {event:#?}"), + } + } + + fn persist_blocks(&self, blocks: Vec>) { + let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len()); + let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len()); + + for block in &blocks { + block_data.push((block.hash(), block.clone_block())); + headers_data.push((block.hash(), block.header().clone())); + } + + self.provider.extend_blocks(block_data); + self.provider.extend_headers(headers_data); + } + + fn setup_range_insertion_for_valid_chain( + &mut self, + chain: Vec>, + ) { + self.setup_range_insertion_for_chain(chain, None) + } + + fn setup_range_insertion_for_invalid_chain( + &mut self, + chain: Vec>, + index: usize, + ) { + self.setup_range_insertion_for_chain(chain, Some(index)) + } + + fn setup_range_insertion_for_chain( + &mut self, + chain: Vec>, + invalid_index: Option, + ) { + // setting up execution outcomes for the chain, the blocks will be + // executed starting from the oldest, so we need to reverse. + let mut chain_rev = chain; + chain_rev.reverse(); + + let mut execution_outcomes = Vec::with_capacity(chain_rev.len()); + for (index, block) in chain_rev.iter().enumerate() { + let execution_outcome = self.block_builder.get_execution_outcome(block.clone()); + let state_root = if invalid_index.is_some() && invalid_index.unwrap() == index { + B256::random() + } else { + block.state_root + }; + self.tree.provider.add_state_root(state_root); + execution_outcomes.push(execution_outcome); + } + self.extend_execution_outcome(execution_outcomes); + } + + fn check_canon_head(&self, head_hash: B256) { + assert_eq!(self.tree.state.tree_state.canonical_head().hash, head_hash); + } +} + +#[test] +fn test_tree_persist_block_batch() { + let tree_config = TreeConfig::default(); + let chain_spec = MAINNET.clone(); + let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone()); + + // we need more than tree_config.persistence_threshold() +1 blocks to + // trigger the persistence task. + let blocks: Vec<_> = test_block_builder + .get_executed_blocks(1..tree_config.persistence_threshold() + 2) + .collect(); + let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks); + + let mut blocks = vec![]; + for idx in 0..tree_config.max_execute_block_batch_size() * 2 { + blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random())); + } + + test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap(); + + // process the message + let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap(); + test_harness.tree.on_engine_message(msg).unwrap(); + + // we now should receive the other batch + let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap(); + match msg { + FromEngine::DownloadedBlocks(blocks) => { + assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size()); + } + _ => panic!("unexpected message: {msg:#?}"), + } +} + +#[tokio::test] +async fn test_tree_persist_blocks() { + let tree_config = TreeConfig::default(); + let chain_spec = MAINNET.clone(); + let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone()); + + // we need more than tree_config.persistence_threshold() +1 blocks to + // trigger the persistence task. + let blocks: Vec<_> = test_block_builder + .get_executed_blocks(1..tree_config.persistence_threshold() + 2) + .collect(); + let test_harness = TestHarness::new(chain_spec).with_blocks(blocks.clone()); + std::thread::Builder::new() + .name("Tree Task".to_string()) + .spawn(|| test_harness.tree.run()) + .unwrap(); + + // send a message to the tree to enter the main loop. + test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap(); + + let received_action = + test_harness.action_rx.recv().expect("Failed to receive save blocks action"); + if let PersistenceAction::SaveBlocks(saved_blocks, _) = received_action { + // only blocks.len() - tree_config.memory_block_buffer_target() will be + // persisted + let expected_persist_len = blocks.len() - tree_config.memory_block_buffer_target() as usize; + assert_eq!(saved_blocks.len(), expected_persist_len); + assert_eq!(saved_blocks, blocks[..expected_persist_len]); + } else { + panic!("unexpected action received {received_action:?}"); + } +} + +#[tokio::test] +async fn test_in_memory_state_trait_impl() { + let blocks: Vec<_> = TestBlockBuilder::eth().get_executed_blocks(0..10).collect(); + let test_harness = TestHarness::new(MAINNET.clone()).with_blocks(blocks.clone()); + + for executed_block in blocks { + let sealed_block = executed_block.recovered_block(); + + let expected_state = BlockState::new(executed_block.clone()); + + let actual_state_by_hash = + test_harness.tree.canonical_in_memory_state.state_by_hash(sealed_block.hash()).unwrap(); + assert_eq!(expected_state, *actual_state_by_hash); + + let actual_state_by_number = test_harness + .tree + .canonical_in_memory_state + .state_by_number(sealed_block.number) + .unwrap(); + assert_eq!(expected_state, *actual_state_by_number); + } +} + +#[tokio::test] +async fn test_engine_request_during_backfill() { + let tree_config = TreeConfig::default(); + let blocks: Vec<_> = TestBlockBuilder::eth() + .get_executed_blocks(0..tree_config.persistence_threshold()) + .collect(); + let mut test_harness = TestHarness::new(MAINNET.clone()) + .with_blocks(blocks) + .with_backfill_state(BackfillSyncState::Active); + + let (tx, rx) = oneshot::channel(); + test_harness + .tree + .on_engine_message(FromEngine::Request( + BeaconEngineMessage::ForkchoiceUpdated { + state: ForkchoiceState { + head_block_hash: B256::random(), + safe_block_hash: B256::random(), + finalized_block_hash: B256::random(), + }, + payload_attrs: None, + tx, + version: EngineApiMessageVersion::default(), + } + .into(), + )) + .unwrap(); + + let resp = rx.await.unwrap().unwrap().await.unwrap(); + assert!(resp.payload_status.is_syncing()); +} + +#[test] +fn test_disconnected_payload() { + let s = include_str!("../../test-data/holesky/2.rlp"); + let data = Bytes::from_str(s).unwrap(); + let block = Block::decode(&mut data.as_ref()).unwrap(); + let sealed = block.seal_slow(); + let hash = sealed.hash(); + let payload = ExecutionPayloadV1::from_block_unchecked(hash, &sealed.clone().into_block()); + + let mut test_harness = TestHarness::new(HOLESKY.clone()); + + let outcome = test_harness + .tree + .on_new_payload(ExecutionData { + payload: payload.into(), + sidecar: ExecutionPayloadSidecar::none(), + }) + .unwrap(); + assert!(outcome.outcome.is_syncing()); + + // ensure block is buffered + let buffered = test_harness.tree.state.buffer.block(&hash).unwrap(); + assert_eq!(buffered.clone_sealed_block(), sealed); +} + +#[test] +fn test_disconnected_block() { + let s = include_str!("../../test-data/holesky/2.rlp"); + let data = Bytes::from_str(s).unwrap(); + let block = Block::decode(&mut data.as_ref()).unwrap(); + let sealed = block.seal_slow().try_recover().unwrap(); + + let mut test_harness = TestHarness::new(HOLESKY.clone()); + + let outcome = test_harness.tree.insert_block(sealed.clone()).unwrap(); + assert_eq!( + outcome, + InsertPayloadOk::Inserted(BlockStatus::Disconnected { + head: test_harness.tree.state.tree_state.current_canonical_head, + missing_ancestor: sealed.parent_num_hash() + }) + ); +} + +#[tokio::test] +async fn test_holesky_payload() { + let s = include_str!("../../test-data/holesky/1.rlp"); + let data = Bytes::from_str(s).unwrap(); + let block: Block = Block::decode(&mut data.as_ref()).unwrap(); + let sealed = block.seal_slow(); + let payload = + ExecutionPayloadV1::from_block_unchecked(sealed.hash(), &sealed.clone().into_block()); + + let mut test_harness = + TestHarness::new(HOLESKY.clone()).with_backfill_state(BackfillSyncState::Active); + + let (tx, rx) = oneshot::channel(); + test_harness + .tree + .on_engine_message(FromEngine::Request( + BeaconEngineMessage::NewPayload { + payload: ExecutionData { + payload: payload.clone().into(), + sidecar: ExecutionPayloadSidecar::none(), + }, + tx, + } + .into(), + )) + .unwrap(); + + let resp = rx.await.unwrap().unwrap(); + assert!(resp.is_syncing()); +} + +#[tokio::test] +async fn test_tree_state_on_new_head_reorg() { + reth_tracing::init_test_tracing(); + let chain_spec = MAINNET.clone(); + + // Set persistence_threshold to 1 + let mut test_harness = TestHarness::new(chain_spec); + test_harness.tree.config = + test_harness.tree.config.with_persistence_threshold(1).with_memory_block_buffer_target(1); + let mut test_block_builder = TestBlockBuilder::eth(); + let blocks: Vec<_> = test_block_builder.get_executed_blocks(1..6).collect(); + + for block in &blocks { + test_harness.tree.state.tree_state.insert_executed(block.clone()); + } + + // set block 3 as the current canonical head + test_harness.tree.state.tree_state.set_canonical_head(blocks[2].recovered_block().num_hash()); + + // create a fork from block 2 + let fork_block_3 = + test_block_builder.get_executed_block_with_number(3, blocks[1].recovered_block().hash()); + let fork_block_4 = + test_block_builder.get_executed_block_with_number(4, fork_block_3.recovered_block().hash()); + let fork_block_5 = + test_block_builder.get_executed_block_with_number(5, fork_block_4.recovered_block().hash()); + + test_harness.tree.state.tree_state.insert_executed(fork_block_3.clone()); + test_harness.tree.state.tree_state.insert_executed(fork_block_4.clone()); + test_harness.tree.state.tree_state.insert_executed(fork_block_5.clone()); + + // normal (non-reorg) case + let result = test_harness.tree.on_new_head(blocks[4].recovered_block().hash()).unwrap(); + assert!(matches!(result, Some(NewCanonicalChain::Commit { .. }))); + if let Some(NewCanonicalChain::Commit { new }) = result { + assert_eq!(new.len(), 2); + assert_eq!(new[0].recovered_block().hash(), blocks[3].recovered_block().hash()); + assert_eq!(new[1].recovered_block().hash(), blocks[4].recovered_block().hash()); + } + + // should be a None persistence action before we advance persistence + let current_action = test_harness.tree.persistence_state.current_action(); + assert_eq!(current_action, None); + + // let's attempt to persist and check that it attempts to save blocks + // + // since in-memory block buffer target and persistence_threshold are both 1, this should + // save all but the current tip of the canonical chain (up to blocks[1]) + test_harness.tree.advance_persistence().unwrap(); + let current_action = test_harness.tree.persistence_state.current_action().cloned(); + assert_eq!( + current_action, + Some(CurrentPersistenceAction::SavingBlocks { + highest: blocks[1].recovered_block().num_hash() + }) + ); + + // get rid of the prev action + let received_action = test_harness.action_rx.recv().unwrap(); + let PersistenceAction::SaveBlocks(saved_blocks, sender) = received_action else { + panic!("received wrong action"); + }; + assert_eq!(saved_blocks, vec![blocks[0].clone(), blocks[1].clone()]); + + // send the response so we can advance again + sender.send(Some(blocks[1].recovered_block().num_hash())).unwrap(); + + // we should be persisting blocks[1] because we threw out the prev action + let current_action = test_harness.tree.persistence_state.current_action().cloned(); + assert_eq!( + current_action, + Some(CurrentPersistenceAction::SavingBlocks { + highest: blocks[1].recovered_block().num_hash() + }) + ); + + // after advancing persistence, we should be at `None` for the next action + test_harness.tree.advance_persistence().unwrap(); + let current_action = test_harness.tree.persistence_state.current_action().cloned(); + assert_eq!(current_action, None); + + // reorg case + let result = test_harness.tree.on_new_head(fork_block_5.recovered_block().hash()).unwrap(); + assert!(matches!(result, Some(NewCanonicalChain::Reorg { .. }))); + + if let Some(NewCanonicalChain::Reorg { new, old }) = result { + assert_eq!(new.len(), 3); + assert_eq!(new[0].recovered_block().hash(), fork_block_3.recovered_block().hash()); + assert_eq!(new[1].recovered_block().hash(), fork_block_4.recovered_block().hash()); + assert_eq!(new[2].recovered_block().hash(), fork_block_5.recovered_block().hash()); + + assert_eq!(old.len(), 1); + assert_eq!(old[0].recovered_block().hash(), blocks[2].recovered_block().hash()); + } + + // The canonical block has not changed, so we will not get any active persistence action + test_harness.tree.advance_persistence().unwrap(); + let current_action = test_harness.tree.persistence_state.current_action().cloned(); + assert_eq!(current_action, None); + + // Let's change the canonical head and advance persistence + test_harness + .tree + .state + .tree_state + .set_canonical_head(fork_block_5.recovered_block().num_hash()); + + // The canonical block has changed now, we should get fork_block_4 due to the persistence + // threshold and in memory block buffer target + test_harness.tree.advance_persistence().unwrap(); + let current_action = test_harness.tree.persistence_state.current_action().cloned(); + assert_eq!( + current_action, + Some(CurrentPersistenceAction::SavingBlocks { + highest: fork_block_4.recovered_block().num_hash() + }) + ); +} + +#[test] +fn test_tree_state_on_new_head_deep_fork() { + reth_tracing::init_test_tracing(); + + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec); + let mut test_block_builder = TestBlockBuilder::eth(); + + let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect(); + + for block in &blocks { + test_harness.tree.state.tree_state.insert_executed(block.clone()); + } + + // set last block as the current canonical head + let last_block = blocks.last().unwrap().recovered_block().clone(); + + test_harness.tree.state.tree_state.set_canonical_head(last_block.num_hash()); + + // create a fork chain from last_block + let chain_a = test_block_builder.create_fork(&last_block, 10); + let chain_b = test_block_builder.create_fork(&last_block, 10); + + for block in &chain_a { + test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates { + block: ExecutedBlock { + recovered_block: Arc::new(block.clone()), + execution_output: Arc::new(ExecutionOutcome::default()), + hashed_state: Arc::new(HashedPostState::default()), + }, + trie: ExecutedTrieUpdates::empty(), + }); + } + test_harness.tree.state.tree_state.set_canonical_head(chain_a.last().unwrap().num_hash()); + + for block in &chain_b { + test_harness.tree.state.tree_state.insert_executed(ExecutedBlockWithTrieUpdates { + block: ExecutedBlock { + recovered_block: Arc::new(block.clone()), + execution_output: Arc::new(ExecutionOutcome::default()), + hashed_state: Arc::new(HashedPostState::default()), + }, + trie: ExecutedTrieUpdates::empty(), + }); + } + + // for each block in chain_b, reorg to it and then back to canonical + let mut expected_new = Vec::new(); + for block in &chain_b { + // reorg to chain from block b + let result = test_harness.tree.on_new_head(block.hash()).unwrap(); + assert_matches!(result, Some(NewCanonicalChain::Reorg { .. })); + + expected_new.push(block); + if let Some(NewCanonicalChain::Reorg { new, old }) = result { + assert_eq!(new.len(), expected_new.len()); + for (index, block) in expected_new.iter().enumerate() { + assert_eq!(new[index].recovered_block().hash(), block.hash()); + } + + assert_eq!(old.len(), chain_a.len()); + for (index, block) in chain_a.iter().enumerate() { + assert_eq!(old[index].recovered_block().hash(), block.hash()); + } + } + + // set last block of chain a as canonical head + test_harness.tree.on_new_head(chain_a.last().unwrap().hash()).unwrap(); + } +} + +#[tokio::test] +async fn test_get_canonical_blocks_to_persist() { + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec); + let mut test_block_builder = TestBlockBuilder::eth(); + + let canonical_head_number = 9; + let blocks: Vec<_> = + test_block_builder.get_executed_blocks(0..canonical_head_number + 1).collect(); + test_harness = test_harness.with_blocks(blocks.clone()); + + let last_persisted_block_number = 3; + test_harness.tree.persistence_state.last_persisted_block = + blocks[last_persisted_block_number as usize].recovered_block.num_hash(); + + let persistence_threshold = 4; + let memory_block_buffer_target = 3; + test_harness.tree.config = TreeConfig::default() + .with_persistence_threshold(persistence_threshold) + .with_memory_block_buffer_target(memory_block_buffer_target); + + let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist().unwrap(); + + let expected_blocks_to_persist_length: usize = + (canonical_head_number - memory_block_buffer_target - last_persisted_block_number) + .try_into() + .unwrap(); + + assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length); + for (i, item) in blocks_to_persist.iter().enumerate().take(expected_blocks_to_persist_length) { + assert_eq!(item.recovered_block().number, last_persisted_block_number + i as u64 + 1); + } + + // make sure only canonical blocks are included + let fork_block = test_block_builder.get_executed_block_with_number(4, B256::random()); + let fork_block_hash = fork_block.recovered_block().hash(); + test_harness.tree.state.tree_state.insert_executed(fork_block); + + assert!(test_harness.tree.state.tree_state.block_by_hash(fork_block_hash).is_some()); + + let blocks_to_persist = test_harness.tree.get_canonical_blocks_to_persist().unwrap(); + assert_eq!(blocks_to_persist.len(), expected_blocks_to_persist_length); + + // check that the fork block is not included in the blocks to persist + assert!(!blocks_to_persist.iter().any(|b| b.recovered_block().hash() == fork_block_hash)); + + // check that the original block 4 is still included + assert!(blocks_to_persist.iter().any(|b| b.recovered_block().number == 4 && + b.recovered_block().hash() == blocks[4].recovered_block().hash())); + + // check that if we advance persistence, the persistence action is the correct value + test_harness.tree.advance_persistence().expect("advancing persistence should succeed"); + assert_eq!( + test_harness.tree.persistence_state.current_action().cloned(), + Some(CurrentPersistenceAction::SavingBlocks { + highest: blocks_to_persist.last().unwrap().recovered_block().num_hash() + }) + ); +} + +#[tokio::test] +async fn test_engine_tree_fcu_missing_head() { + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + let mut test_block_builder = TestBlockBuilder::eth().with_chain_spec((*chain_spec).clone()); + + let blocks: Vec<_> = test_block_builder.get_executed_blocks(0..5).collect(); + test_harness = test_harness.with_blocks(blocks); + + let missing_block = test_block_builder + .generate_random_block(6, test_harness.blocks.last().unwrap().recovered_block().hash()); + + test_harness.fcu_to(missing_block.hash(), PayloadStatusEnum::Syncing).await; + + // after FCU we receive an EngineApiEvent::Download event to get the missing block. + let event = test_harness.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::Download(DownloadRequest::BlockSet(actual_block_set)) => { + let expected_block_set = HashSet::from_iter([missing_block.hash()]); + assert_eq!(actual_block_set, expected_block_set); + } + _ => panic!("Unexpected event: {event:#?}"), + } +} + +#[tokio::test] +async fn test_engine_tree_fcu_canon_chain_insertion() { + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + test_harness + .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid) + .await; + + // extend main chain + let main_chain = test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 3); + + test_harness.insert_chain(main_chain).await; +} + +#[tokio::test] +async fn test_engine_tree_fcu_reorg_with_all_blocks() { + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + let main_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..5).collect(); + test_harness = test_harness.with_blocks(main_chain.clone()); + + let fork_chain = test_harness.block_builder.create_fork(main_chain[2].recovered_block(), 3); + let fork_chain_last_hash = fork_chain.last().unwrap().hash(); + + // add fork blocks to the tree + for block in &fork_chain { + test_harness.insert_block(block.clone()).unwrap(); + } + + test_harness.send_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await; + + // check for ForkBlockAdded events, we expect fork_chain.len() blocks added + test_harness.check_fork_chain_insertion(fork_chain.clone()).await; + + // check for CanonicalChainCommitted event + test_harness.check_canon_commit(fork_chain_last_hash).await; + + test_harness.check_fcu(fork_chain_last_hash, ForkchoiceStatus::Valid).await; + + // new head is the tip of the fork chain + test_harness.check_canon_head(fork_chain_last_hash); +} + +#[tokio::test] +async fn test_engine_tree_live_sync_transition_required_blocks_requested() { + reth_tracing::init_test_tracing(); + + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + test_harness + .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid) + .await; + + // extend main chain with enough blocks to trigger pipeline run but don't insert them + let main_chain = test_harness + .block_builder + .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10); + + let main_chain_last_hash = main_chain.last().unwrap().hash(); + test_harness.send_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await; + + test_harness.check_fcu(main_chain_last_hash, ForkchoiceStatus::Syncing).await; + + // create event for backfill finished + let backfill_finished_block_number = MIN_BLOCKS_FOR_PIPELINE_RUN + 1; + let backfill_finished = FromOrchestrator::BackfillSyncFinished(ControlFlow::Continue { + block_number: backfill_finished_block_number, + }); + + let backfill_tip_block = main_chain[(backfill_finished_block_number - 1) as usize].clone(); + // add block to mock provider to enable persistence clean up. + test_harness.provider.add_block(backfill_tip_block.hash(), backfill_tip_block.into_block()); + test_harness.tree.on_engine_message(FromEngine::Event(backfill_finished)).unwrap(); + + let event = test_harness.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => { + assert_eq!(hash_set, HashSet::from_iter([main_chain_last_hash])); + } + _ => panic!("Unexpected event: {event:#?}"), + } + + test_harness + .tree + .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain.last().unwrap().clone()])) + .unwrap(); + + let event = test_harness.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => { + assert_eq!( + total_blocks, + (main_chain.len() - backfill_finished_block_number as usize - 1) as u64 + ); + assert_eq!(initial_hash, main_chain.last().unwrap().parent_hash); + } + _ => panic!("Unexpected event: {event:#?}"), + } +} + +#[tokio::test] +async fn test_engine_tree_live_sync_transition_eventually_canonical() { + reth_tracing::init_test_tracing(); + + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100); + + // create base chain and setup test harness with it + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + // fcu to the tip of base chain + test_harness + .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid) + .await; + + // create main chain, extension of base chain, with enough blocks to + // trigger backfill sync + let main_chain = test_harness + .block_builder + .create_fork(base_chain[0].recovered_block(), MIN_BLOCKS_FOR_PIPELINE_RUN + 10); + + let main_chain_last = main_chain.last().unwrap(); + let main_chain_last_hash = main_chain_last.hash(); + let main_chain_backfill_target = main_chain.get(MIN_BLOCKS_FOR_PIPELINE_RUN as usize).unwrap(); + let main_chain_backfill_target_hash = main_chain_backfill_target.hash(); + + // fcu to the element of main chain that should trigger backfill sync + test_harness.send_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await; + test_harness.check_fcu(main_chain_backfill_target_hash, ForkchoiceStatus::Syncing).await; + + // check download request for target + let event = test_harness.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::Download(DownloadRequest::BlockSet(hash_set)) => { + assert_eq!(hash_set, HashSet::from_iter([main_chain_backfill_target_hash])); + } + _ => panic!("Unexpected event: {event:#?}"), + } + + // send message to tell the engine the requested block was downloaded + test_harness + .tree + .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_backfill_target.clone()])) + .unwrap(); + + // check that backfill is triggered + let event = test_harness.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BackfillAction(BackfillAction::Start( + reth_stages::PipelineTarget::Sync(target_hash), + )) => { + assert_eq!(target_hash, main_chain_backfill_target_hash); + } + _ => panic!("Unexpected event: {event:#?}"), + } + + // persist blocks of main chain, same as the backfill operation would do + let backfilled_chain: Vec<_> = + main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect(); + test_harness.persist_blocks(backfilled_chain.clone()); + + test_harness.setup_range_insertion_for_valid_chain(backfilled_chain); + + // send message to mark backfill finished + test_harness + .tree + .on_engine_message(FromEngine::Event(FromOrchestrator::BackfillSyncFinished( + ControlFlow::Continue { block_number: main_chain_backfill_target.number }, + ))) + .unwrap(); + + // send fcu to the tip of main + test_harness.fcu_to(main_chain_last_hash, ForkchoiceStatus::Syncing).await; + + let event = test_harness.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::Download(DownloadRequest::BlockSet(target_hash)) => { + assert_eq!(target_hash, HashSet::from_iter([main_chain_last_hash])); + } + _ => panic!("Unexpected event: {event:#?}"), + } + + // tell engine main chain tip downloaded + test_harness + .tree + .on_engine_message(FromEngine::DownloadedBlocks(vec![main_chain_last.clone()])) + .unwrap(); + + // check download range request + let event = test_harness.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::Download(DownloadRequest::BlockRange(initial_hash, total_blocks)) => { + assert_eq!( + total_blocks, + (main_chain.len() - MIN_BLOCKS_FOR_PIPELINE_RUN as usize - 2) as u64 + ); + assert_eq!(initial_hash, main_chain_last.parent_hash); + } + _ => panic!("Unexpected event: {event:#?}"), + } + + let remaining: Vec<_> = main_chain + .clone() + .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len()) + .collect(); + + test_harness.setup_range_insertion_for_valid_chain(remaining.clone()); + + // tell engine block range downloaded + test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone())).unwrap(); + + test_harness.check_canon_chain_insertion(remaining).await; + + // check canonical chain committed event with the hash of the latest block + test_harness.check_canon_commit(main_chain_last_hash).await; + + // new head is the tip of the main chain + test_harness.check_canon_head(main_chain_last_hash); +} + +#[tokio::test] +async fn test_engine_tree_live_sync_fcu_extends_canon_chain() { + reth_tracing::init_test_tracing(); + + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + // create base chain and setup test harness with it + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + // fcu to the tip of base chain + test_harness + .fcu_to(base_chain.last().unwrap().recovered_block().hash(), ForkchoiceStatus::Valid) + .await; + + // create main chain, extension of base chain + let main_chain = test_harness.block_builder.create_fork(base_chain[0].recovered_block(), 10); + // determine target in the middle of main hain + let target = main_chain.get(5).unwrap(); + let target_hash = target.hash(); + let main_last = main_chain.last().unwrap(); + let main_last_hash = main_last.hash(); + + // insert main chain + test_harness.insert_chain(main_chain).await; + + // send fcu to target + test_harness.send_fcu(target_hash, ForkchoiceStatus::Valid).await; + + test_harness.check_canon_commit(target_hash).await; + test_harness.check_fcu(target_hash, ForkchoiceStatus::Valid).await; + + // send fcu to main tip + test_harness.send_fcu(main_last_hash, ForkchoiceStatus::Valid).await; + + test_harness.check_canon_commit(main_last_hash).await; + test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await; + test_harness.check_canon_head(main_last_hash); +} + +#[tokio::test] +async fn test_engine_tree_valid_forks_with_older_canonical_head() { + reth_tracing::init_test_tracing(); + + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + // create base chain and setup test harness with it + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + let old_head = base_chain.first().unwrap().recovered_block(); + + // extend base chain + let extension_chain = test_harness.block_builder.create_fork(old_head, 5); + let fork_block = extension_chain.last().unwrap().clone_sealed_block(); + + test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone()); + test_harness.insert_chain(extension_chain).await; + + // fcu to old_head + test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await; + + // create two competing chains starting from fork_block + let chain_a = test_harness.block_builder.create_fork(&fork_block, 10); + let chain_b = test_harness.block_builder.create_fork(&fork_block, 10); + + // insert chain A blocks using newPayload + test_harness.setup_range_insertion_for_valid_chain(chain_a.clone()); + for block in &chain_a { + test_harness.send_new_payload(block.clone()).await; + } + + test_harness.check_canon_chain_insertion(chain_a.clone()).await; + + // insert chain B blocks using newPayload + test_harness.setup_range_insertion_for_valid_chain(chain_b.clone()); + for block in &chain_b { + test_harness.send_new_payload(block.clone()).await; + } + + test_harness.check_canon_chain_insertion(chain_b.clone()).await; + + // send FCU to make the tip of chain B the new head + let chain_b_tip_hash = chain_b.last().unwrap().hash(); + test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await; + + // check for CanonicalChainCommitted event + test_harness.check_canon_commit(chain_b_tip_hash).await; + + // verify FCU was processed + test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await; + + // verify the new canonical head + test_harness.check_canon_head(chain_b_tip_hash); + + // verify that chain A is now considered a fork + assert!(test_harness.tree.is_fork(chain_a.last().unwrap().sealed_header()).unwrap()); +} + +#[tokio::test] +async fn test_engine_tree_buffered_blocks_are_eventually_connected() { + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + // side chain consisting of two blocks, the last will be inserted first + // so that we force it to be buffered + let side_chain = + test_harness.block_builder.create_fork(base_chain.last().unwrap().recovered_block(), 2); + + // buffer last block of side chain + let buffered_block = side_chain.last().unwrap(); + let buffered_block_hash = buffered_block.hash(); + + test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]); + test_harness.send_new_payload(buffered_block.clone()).await; + + assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some()); + + let non_buffered_block = side_chain.first().unwrap(); + let non_buffered_block_hash = non_buffered_block.hash(); + + // insert block that continues the canon chain, should not be buffered + test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]); + test_harness.send_new_payload(non_buffered_block.clone()).await; + assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none()); + + // the previously buffered block should be connected now + assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none()); + + // both blocks are added to the canon chain in order + test_harness.check_canon_block_added(non_buffered_block_hash).await; + test_harness.check_canon_block_added(buffered_block_hash).await; +} + +#[tokio::test] +async fn test_engine_tree_valid_and_invalid_forks_with_older_canonical_head() { + reth_tracing::init_test_tracing(); + + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + // create base chain and setup test harness with it + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + let old_head = base_chain.first().unwrap().recovered_block(); + + // extend base chain + let extension_chain = test_harness.block_builder.create_fork(old_head, 5); + let fork_block = extension_chain.last().unwrap().clone_sealed_block(); + test_harness.insert_chain(extension_chain).await; + + // fcu to old_head + test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await; + + // create two competing chains starting from fork_block, one of them invalid + let total_fork_elements = 10; + let chain_a = test_harness.block_builder.create_fork(&fork_block, total_fork_elements); + let chain_b = test_harness.block_builder.create_fork(&fork_block, total_fork_elements); + + // insert chain B blocks using newPayload + test_harness.setup_range_insertion_for_valid_chain(chain_b.clone()); + for block in &chain_b { + test_harness.send_new_payload(block.clone()).await; + test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await; + test_harness.check_canon_block_added(block.hash()).await; + test_harness.check_canon_commit(block.hash()).await; + test_harness.check_fcu(block.hash(), ForkchoiceStatus::Valid).await; + } + + // insert chain A blocks using newPayload, one of the blocks will be invalid + let invalid_index = 3; + test_harness.setup_range_insertion_for_invalid_chain(chain_a.clone(), invalid_index); + for block in &chain_a { + test_harness.send_new_payload(block.clone()).await; + } + + // check canon chain insertion up to the invalid index and taking into + // account reversed ordering + test_harness + .check_fork_chain_insertion(chain_a[..chain_a.len() - invalid_index - 1].iter().cloned()) + .await; + for block in &chain_a[chain_a.len() - invalid_index - 1..] { + test_harness.check_invalid_block(block.hash()).await; + } + + // send FCU to make the tip of chain A, expect invalid + let chain_a_tip_hash = chain_a.last().unwrap().hash(); + test_harness.fcu_to(chain_a_tip_hash, ForkchoiceStatus::Invalid).await; + + // send FCU to make the tip of chain B the new head + let chain_b_tip_hash = chain_b.last().unwrap().hash(); + + // verify the new canonical head + test_harness.check_canon_head(chain_b_tip_hash); + + // verify the canonical head didn't change + test_harness.check_canon_head(chain_b_tip_hash); +} + +#[tokio::test] +async fn test_engine_tree_reorg_with_missing_ancestor_expecting_valid() { + reth_tracing::init_test_tracing(); + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..6).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + // create a side chain with an invalid block + let side_chain = + test_harness.block_builder.create_fork(base_chain.last().unwrap().recovered_block(), 15); + let invalid_index = 9; + + test_harness.setup_range_insertion_for_invalid_chain(side_chain.clone(), invalid_index); + + for (index, block) in side_chain.iter().enumerate() { + test_harness.send_new_payload(block.clone()).await; + + if index < side_chain.len() - invalid_index - 1 { + test_harness.send_fcu(block.hash(), ForkchoiceStatus::Valid).await; + } + } + + // Try to do a forkchoice update to a block after the invalid one + let fork_tip_hash = side_chain.last().unwrap().hash(); + test_harness.send_fcu(fork_tip_hash, ForkchoiceStatus::Invalid).await; +} diff --git a/crates/era-downloader/src/client.rs b/crates/era-downloader/src/client.rs index 03a4d975977..2fae9f96f80 100644 --- a/crates/era-downloader/src/client.rs +++ b/crates/era-downloader/src/client.rs @@ -67,16 +67,30 @@ impl EraClient { let number = self.file_name_to_number(file_name).ok_or_eyre("Cannot parse number from file name")?; - let mut stream = client.get(url).await?; - let mut file = File::create(&path).await?; - let mut hasher = Sha256::new(); - while let Some(item) = stream.next().await.transpose()? { - io::copy(&mut item.as_ref(), &mut file).await?; - hasher.update(item); + let mut tries = 1..3; + let mut actual_checksum: eyre::Result<_>; + loop { + actual_checksum = async { + let mut file = File::create(&path).await?; + let mut stream = client.get(url.clone()).await?; + let mut hasher = Sha256::new(); + + while let Some(item) = stream.next().await.transpose()? { + io::copy(&mut item.as_ref(), &mut file).await?; + hasher.update(item); + } + + Ok(hasher.finalize().to_vec()) + } + .await; + + if actual_checksum.is_ok() || tries.next().is_none() { + break; + } } - let actual_checksum = hasher.finalize().to_vec(); + let actual_checksum = actual_checksum?; let file = File::open(self.folder.join(Self::CHECKSUMS)).await?; let reader = io::BufReader::new(file); diff --git a/crates/net/network/src/lib.rs b/crates/net/network/src/lib.rs index 38ed94ad94e..1072d526fbb 100644 --- a/crates/net/network/src/lib.rs +++ b/crates/net/network/src/lib.rs @@ -168,7 +168,6 @@ pub use manager::NetworkManager; pub use metrics::TxTypesCounter; pub use network::{NetworkHandle, NetworkProtocols}; pub use swarm::NetworkConnectionState; -pub use transactions::MessageFilter; /// re-export p2p interfaces pub use reth_network_p2p as p2p; diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 43dc1715fb5..c1fdf0e1064 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -28,14 +28,12 @@ use super::{ config::TransactionFetcherConfig, constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST}, - MessageFilter, PeerMetadata, PooledTransactions, - SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, + PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, }; use crate::{ cache::{LruCache, LruMap}, duration_metered_exec, metrics::TransactionFetcherMetrics, - transactions::{validation, PartiallyFilterMessage}, }; use alloy_consensus::transaction::PooledTransaction; use alloy_primitives::TxHash; @@ -60,7 +58,6 @@ use std::{ }; use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}; use tracing::trace; -use validation::FilterOutcome; /// The type responsible for fetching missing transactions from peers. /// @@ -85,8 +82,6 @@ pub struct TransactionFetcher { pub hashes_pending_fetch: LruCache, /// Tracks all hashes in the transaction fetcher. pub hashes_fetch_inflight_and_pending_fetch: LruMap, - /// Filter for valid announcement and response data. - pub(super) filter_valid_message: MessageFilter, /// Info on capacity of the transaction fetcher. pub info: TransactionFetcherInfo, #[doc(hidden)] @@ -919,20 +914,19 @@ impl TransactionFetcher { // let unvalidated_payload_len = verified_payload.len(); - let (validation_outcome, valid_payload) = - self.filter_valid_message.partially_filter_valid_entries(verified_payload); + let valid_payload = verified_payload.dedup(); // todo: validate based on announced tx size/type and report peer for sending // invalid response . requires // passing the rlp encoded length down from active session along with the decoded // tx. - if validation_outcome == FilterOutcome::ReportPeer { + if valid_payload.len() != unvalidated_payload_len { trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - unvalidated_payload_len, - valid_payload_len=valid_payload.len(), - "received invalid `PooledTransactions` response from peer, filtered out duplicate entries" + peer_id=format!("{peer_id:#}"), + unvalidated_payload_len, + valid_payload_len=valid_payload.len(), + "received `PooledTransactions` response from peer with duplicate entries, filtered them out" ); } // valid payload will have at least one transaction at this point. even if the tx @@ -1014,7 +1008,6 @@ impl Default for TransactionFetcher { hashes_fetch_inflight_and_pending_fetch: LruMap::new( DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH, ), - filter_valid_message: Default::default(), info: TransactionFetcherInfo::default(), metrics: Default::default(), } diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index ce9e00129dd..0fdee4a915f 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -8,7 +8,6 @@ pub mod constants; pub mod fetcher; /// Defines the [`TransactionPolicies`] trait for aggregating transaction-related policies. pub mod policy; -pub mod validation; pub use self::constants::{ tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, @@ -20,7 +19,6 @@ pub use config::{ TransactionPropagationPolicy, TransactionsManagerConfig, }; use policy::{NetworkPolicies, TransactionPolicies}; -pub use validation::*; pub(crate) use fetcher::{FetchEvent, TransactionFetcher}; @@ -596,10 +594,15 @@ impl } // 1. filter out spam - let (validation_outcome, mut partially_valid_msg) = - self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg); + if msg.is_empty() { + self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); + return; + } + + let original_len = msg.len(); + let mut partially_valid_msg = msg.dedup(); - if validation_outcome == FilterOutcome::ReportPeer { + if partially_valid_msg.len() != original_len { self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); } diff --git a/crates/net/network/src/transactions/validation.rs b/crates/net/network/src/transactions/validation.rs deleted file mode 100644 index 0f4900c0489..00000000000 --- a/crates/net/network/src/transactions/validation.rs +++ /dev/null @@ -1,164 +0,0 @@ -//! Validation of [`NewPooledTransactionHashes66`](reth_eth_wire::NewPooledTransactionHashes66) -//! and [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68) -//! announcements. Validation and filtering of announcements is network dependent. - -use alloy_primitives::Signature; -use derive_more::{Deref, DerefMut}; -use reth_eth_wire::{DedupPayload, HandleMempoolData, PartiallyValidData}; -use std::{fmt, fmt::Display, mem}; -use tracing::trace; - -/// The size of a decoded signature in bytes. -pub const SIGNATURE_DECODED_SIZE_BYTES: usize = mem::size_of::(); - -/// Outcomes from validating a `(ty, hash, size)` entry from a -/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68). Signals to the -/// caller how to deal with an announcement entry and the peer who sent the announcement. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ValidationOutcome { - /// Tells the caller to keep the entry in the announcement for fetch. - Fetch, - /// Tells the caller to filter out the entry from the announcement. - Ignore, - /// Tells the caller to filter out the entry from the announcement and penalize the peer. On - /// this outcome, caller can drop the announcement, that is up to each implementation. - ReportPeer, -} - -/// Generic filter for announcements and responses. Checks for empty message and unique hashes/ -/// transactions in message. -pub trait PartiallyFilterMessage { - /// Removes duplicate entries from a mempool message. Returns [`FilterOutcome::ReportPeer`] if - /// the caller should penalize the peer, otherwise [`FilterOutcome::Ok`]. - fn partially_filter_valid_entries( - &self, - msg: impl DedupPayload + fmt::Debug, - ) -> (FilterOutcome, PartiallyValidData) { - // 1. checks if the announcement is empty - if msg.is_empty() { - trace!(target: "net::tx", - msg=?msg, - "empty payload" - ); - return (FilterOutcome::ReportPeer, PartiallyValidData::empty_eth66()) - } - - // 2. checks if announcement is spam packed with duplicate hashes - let original_len = msg.len(); - let partially_valid_data = msg.dedup(); - - ( - if partially_valid_data.len() == original_len { - FilterOutcome::Ok - } else { - FilterOutcome::ReportPeer - }, - partially_valid_data, - ) - } -} - -/// Outcome from filtering -/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68). Signals to caller -/// whether to penalize the sender of the announcement or not. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum FilterOutcome { - /// Peer behaves appropriately. - Ok, - /// A penalty should be flagged for the peer. Peer sent an announcement with unacceptably - /// invalid entries. - ReportPeer, -} - -/// A generic wrapper for types that provide message filtering capabilities. -/// -/// This struct is typically used with types implementing traits like [`PartiallyFilterMessage`], -/// which perform initial stateless validation on network messages, such as checking for empty -/// payloads or removing duplicate entries. -#[derive(Debug, Default, Deref, DerefMut)] -pub struct MessageFilter(N); - -/// Filter for announcements containing EIP [`reth_ethereum_primitives::TxType`]s. -#[derive(Debug, Default)] -pub struct EthMessageFilter; - -impl Display for EthMessageFilter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "EthMessageFilter") - } -} - -impl PartiallyFilterMessage for EthMessageFilter {} - -#[cfg(test)] -mod test { - use super::*; - use alloy_primitives::B256; - use reth_eth_wire::{NewPooledTransactionHashes66, NewPooledTransactionHashes68}; - use std::{collections::HashMap, str::FromStr}; - - #[test] - fn eth68_empty_announcement() { - let types = vec![]; - let sizes = vec![]; - let hashes = vec![]; - - let announcement = NewPooledTransactionHashes68 { types, sizes, hashes }; - - let filter = EthMessageFilter; - - let (outcome, _partially_valid_data) = filter.partially_filter_valid_entries(announcement); - - assert_eq!(outcome, FilterOutcome::ReportPeer); - } - - #[test] - fn eth66_empty_announcement() { - let hashes = vec![]; - - let announcement = NewPooledTransactionHashes66(hashes); - - let filter: MessageFilter = MessageFilter::default(); - - let (outcome, _partially_valid_data) = filter.partially_filter_valid_entries(announcement); - - assert_eq!(outcome, FilterOutcome::ReportPeer); - } - - #[test] - fn eth66_announcement_duplicate_tx_hash() { - // first three or the same - let hashes = vec![ - B256::from_str("0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefbbbb") // dup1 - .unwrap(), - B256::from_str("0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafa") // dup2 - .unwrap(), - B256::from_str("0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafa") // removed dup2 - .unwrap(), - B256::from_str("0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafa") // removed dup2 - .unwrap(), - B256::from_str("0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefbbbb") // removed dup1 - .unwrap(), - ]; - - let announcement = NewPooledTransactionHashes66(hashes.clone()); - - let filter: MessageFilter = MessageFilter::default(); - - let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement); - - assert_eq!(outcome, FilterOutcome::ReportPeer); - - let mut expected_data = HashMap::default(); - expected_data.insert(hashes[1], None); - expected_data.insert(hashes[0], None); - - assert_eq!(expected_data, partially_valid_data.into_data()) - } - - #[test] - fn test_display_for_zst() { - let filter = EthMessageFilter; - assert_eq!("EthMessageFilter", &filter.to_string()); - } -} diff --git a/crates/optimism/txpool/src/supervisor/client.rs b/crates/optimism/txpool/src/supervisor/client.rs index 5b6c65eeb28..4cc67685b59 100644 --- a/crates/optimism/txpool/src/supervisor/client.rs +++ b/crates/optimism/txpool/src/supervisor/client.rs @@ -113,6 +113,7 @@ impl SupervisorClient { ) .await { + self.inner.metrics.increment_metrics_for_error(&err); trace!(target: "txpool", hash=%hash, err=%err, "Cross chain transaction invalid"); return Some(Err(InvalidCrossTx::ValidationError(err))); } diff --git a/crates/optimism/txpool/src/supervisor/metrics.rs b/crates/optimism/txpool/src/supervisor/metrics.rs index 1ccb2178916..0c66d0039ac 100644 --- a/crates/optimism/txpool/src/supervisor/metrics.rs +++ b/crates/optimism/txpool/src/supervisor/metrics.rs @@ -1,6 +1,11 @@ //! Optimism supervisor and sequencer metrics -use reth_metrics::{metrics::Histogram, Metrics}; +use crate::supervisor::InteropTxValidatorError; +use op_alloy_rpc_types::InvalidInboxEntry; +use reth_metrics::{ + metrics::{Counter, Histogram}, + Metrics, +}; use std::time::Duration; /// Optimism supervisor metrics @@ -9,6 +14,29 @@ use std::time::Duration; pub struct SupervisorMetrics { /// How long it takes to query the supervisor in the Optimism transaction pool pub(crate) supervisor_query_latency: Histogram, + + /// Counter for the number of times data was skipped + pub(crate) skipped_data_count: Counter, + /// Counter for the number of times an unknown chain was encountered + pub(crate) unknown_chain_count: Counter, + /// Counter for the number of times conflicting data was encountered + pub(crate) conflicting_data_count: Counter, + /// Counter for the number of times ineffective data was encountered + pub(crate) ineffective_data_count: Counter, + /// Counter for the number of times data was out of order + pub(crate) out_of_order_count: Counter, + /// Counter for the number of times data was awaiting replacement + pub(crate) awaiting_replacement_count: Counter, + /// Counter for the number of times data was out of scope + pub(crate) out_of_scope_count: Counter, + /// Counter for the number of times there was no parent for the first block + pub(crate) no_parent_for_first_block_count: Counter, + /// Counter for the number of times future data was encountered + pub(crate) future_data_count: Counter, + /// Counter for the number of times data was missed + pub(crate) missed_data_count: Counter, + /// Counter for the number of times data corruption was encountered + pub(crate) data_corruption_count: Counter, } impl SupervisorMetrics { @@ -17,6 +45,30 @@ impl SupervisorMetrics { pub fn record_supervisor_query(&self, duration: Duration) { self.supervisor_query_latency.record(duration.as_secs_f64()); } + + /// Increments the metrics for the given error + pub fn increment_metrics_for_error(&self, error: &InteropTxValidatorError) { + if let InteropTxValidatorError::InvalidEntry(inner) = error { + match inner { + InvalidInboxEntry::SkippedData => self.skipped_data_count.increment(1), + InvalidInboxEntry::UnknownChain => self.unknown_chain_count.increment(1), + InvalidInboxEntry::ConflictingData => self.conflicting_data_count.increment(1), + InvalidInboxEntry::IneffectiveData => self.ineffective_data_count.increment(1), + InvalidInboxEntry::OutOfOrder => self.out_of_order_count.increment(1), + InvalidInboxEntry::AwaitingReplacement => { + self.awaiting_replacement_count.increment(1) + } + InvalidInboxEntry::OutOfScope => self.out_of_scope_count.increment(1), + InvalidInboxEntry::NoParentForFirstBlock => { + self.no_parent_for_first_block_count.increment(1) + } + InvalidInboxEntry::FutureData => self.future_data_count.increment(1), + InvalidInboxEntry::MissedData => self.missed_data_count.increment(1), + InvalidInboxEntry::DataCorruption => self.data_corruption_count.increment(1), + InvalidInboxEntry::UninitializedChainDatabase => {} + } + } + } } /// Optimism sequencer metrics diff --git a/crates/rpc/rpc-eth-api/src/helpers/call.rs b/crates/rpc/rpc-eth-api/src/helpers/call.rs index 37aa9714513..dda235ffaf3 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/call.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/call.rs @@ -215,7 +215,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA overrides: EvmOverrides, ) -> impl Future> + Send { async move { - let (res, _env) = + let res = self.transact_call_at(request, block_number.unwrap_or_default(), overrides).await?; ensure_success(res.result) @@ -288,7 +288,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA let block_transactions = block.transactions_recovered().take(num_txs); for tx in block_transactions { let tx_env = RpcNodeCore::evm_config(&this).tx_env(tx); - let (res, _) = this.transact(&mut db, evm_env.clone(), tx_env)?; + let res = this.transact(&mut db, evm_env.clone(), tx_env)?; db.commit(res.state); } } @@ -313,7 +313,7 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA let (current_evm_env, prepared_tx) = this.prepare_call_env(evm_env.clone(), tx, &mut db, overrides)?; - let (res, _) = this.transact(&mut db, current_evm_env, prepared_tx)?; + let res = this.transact(&mut db, current_evm_env, prepared_tx)?; match ensure_success::<_, Self::Error>(res.result) { Ok(output) => { @@ -426,11 +426,11 @@ pub trait EthCall: EstimateCall + Call + LoadPendingBlock + LoadBlock + FullEthA }; // transact again to get the exact gas used - let (result, (_, tx_env)) = self.transact(&mut db, evm_env, tx_env)?; + let gas_limit = tx_env.gas_limit(); + let result = self.transact(&mut db, evm_env, tx_env)?; let res = match result.result { ExecutionResult::Halt { reason, gas_used } => { - let error = - Some(Self::Error::from_evm_halt(reason, tx_env.gas_limit()).to_string()); + let error = Some(Self::Error::from_evm_halt(reason, gas_limit).to_string()); AccessListResult { access_list, gas_used: U256::from(gas_used), error } } ExecutionResult::Revert { output, gas_used } => { @@ -477,61 +477,47 @@ pub trait Call: /// Executes the `TxEnv` against the given [Database] without committing state /// changes. - #[expect(clippy::type_complexity)] fn transact( &self, db: DB, evm_env: EvmEnvFor, tx_env: TxEnvFor, - ) -> Result< - (ResultAndState>, (EvmEnvFor, TxEnvFor)), - Self::Error, - > + ) -> Result>, Self::Error> where DB: Database, { - let mut evm = self.evm_config().evm_with_env(db, evm_env.clone()); - let res = evm.transact(tx_env.clone()).map_err(Self::Error::from_evm_err)?; + let mut evm = self.evm_config().evm_with_env(db, evm_env); + let res = evm.transact(tx_env).map_err(Self::Error::from_evm_err)?; - Ok((res, (evm_env, tx_env))) + Ok(res) } /// Executes the [`EvmEnv`] against the given [Database] without committing state /// changes. - #[expect(clippy::type_complexity)] fn transact_with_inspector( &self, db: DB, evm_env: EvmEnvFor, tx_env: TxEnvFor, inspector: I, - ) -> Result< - (ResultAndState>, (EvmEnvFor, TxEnvFor)), - Self::Error, - > + ) -> Result>, Self::Error> where DB: Database, I: InspectorFor, { - let mut evm = self.evm_config().evm_with_env_and_inspector(db, evm_env.clone(), inspector); - let res = evm.transact(tx_env.clone()).map_err(Self::Error::from_evm_err)?; + let mut evm = self.evm_config().evm_with_env_and_inspector(db, evm_env, inspector); + let res = evm.transact(tx_env).map_err(Self::Error::from_evm_err)?; - Ok((res, (evm_env, tx_env))) + Ok(res) } /// Executes the call request at the given [`BlockId`]. - #[expect(clippy::type_complexity)] fn transact_call_at( &self, request: TransactionRequest, at: BlockId, overrides: EvmOverrides, - ) -> impl Future< - Output = Result< - (ResultAndState>, (EvmEnvFor, TxEnvFor)), - Self::Error, - >, - > + Send + ) -> impl Future>, Self::Error>> + Send where Self: LoadPendingBlock, { @@ -655,7 +641,7 @@ pub trait Call: let tx_env = RpcNodeCore::evm_config(&this).tx_env(tx); - let (res, _) = this.transact(&mut db, evm_env, tx_env)?; + let res = this.transact(&mut db, evm_env, tx_env)?; f(tx_info, res, db) }) .await diff --git a/crates/rpc/rpc-eth-api/src/helpers/estimate.rs b/crates/rpc/rpc-eth-api/src/helpers/estimate.rs index ecef2270d42..297559fbabf 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/estimate.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/estimate.rs @@ -94,7 +94,7 @@ pub trait EstimateCall: Call { // with the minimum gas limit to make sure. let mut tx_env = tx_env.clone(); tx_env.set_gas_limit(MIN_TRANSACTION_GAS); - if let Ok((res, _)) = self.transact(&mut db, evm_env.clone(), tx_env) { + if let Ok(res) = self.transact(&mut db, evm_env.clone(), tx_env) { if res.result.is_success() { return Ok(U256::from(MIN_TRANSACTION_GAS)) } @@ -119,36 +119,30 @@ pub trait EstimateCall: Call { trace!(target: "rpc::eth::estimate", ?evm_env, ?tx_env, "Starting gas estimation"); // Execute the transaction with the highest possible gas limit. - let (mut res, (mut evm_env, mut tx_env)) = - match self.transact(&mut db, evm_env.clone(), tx_env.clone()) { - // Handle the exceptional case where the transaction initialization uses too much - // gas. If the gas price or gas limit was specified in the request, - // retry the transaction with the block's gas limit to determine if - // the failure was due to insufficient gas. - Err(err) - if err.is_gas_too_high() && - (tx_request_gas_limit.is_some() || tx_request_gas_price.is_some()) => - { - return Err(self.map_out_of_gas_err( - block_env_gas_limit, - evm_env, - tx_env, - &mut db, - )) - } - Err(err) if err.is_gas_too_low() => { - // This failed because the configured gas cost of the tx was lower than what - // actually consumed by the tx This can happen if the - // request provided fee values manually and the resulting gas cost exceeds the - // sender's allowance, so we return the appropriate error here - return Err(RpcInvalidTransactionError::GasRequiredExceedsAllowance { - gas_limit: tx_env.gas_limit(), - } - .into_eth_err()) + let mut res = match self.transact(&mut db, evm_env.clone(), tx_env.clone()) { + // Handle the exceptional case where the transaction initialization uses too much + // gas. If the gas price or gas limit was specified in the request, + // retry the transaction with the block's gas limit to determine if + // the failure was due to insufficient gas. + Err(err) + if err.is_gas_too_high() && + (tx_request_gas_limit.is_some() || tx_request_gas_price.is_some()) => + { + return Err(self.map_out_of_gas_err(block_env_gas_limit, evm_env, tx_env, &mut db)) + } + Err(err) if err.is_gas_too_low() => { + // This failed because the configured gas cost of the tx was lower than what + // actually consumed by the tx This can happen if the + // request provided fee values manually and the resulting gas cost exceeds the + // sender's allowance, so we return the appropriate error here + return Err(RpcInvalidTransactionError::GasRequiredExceedsAllowance { + gas_limit: tx_env.gas_limit(), } - // Propagate other results (successful or other errors). - ethres => ethres?, - }; + .into_eth_err()) + } + // Propagate other results (successful or other errors). + ethres => ethres?, + }; let gas_refund = match res.result { ExecutionResult::Success { gas_refunded, .. } => gas_refunded, @@ -194,7 +188,7 @@ pub trait EstimateCall: Call { tx_env.set_gas_limit(optimistic_gas_limit); // Re-execute the transaction with the new gas limit and update the result and // environment. - (res, (evm_env, tx_env)) = self.transact(&mut db, evm_env, tx_env)?; + res = self.transact(&mut db, evm_env.clone(), tx_env.clone())?; // Update the gas used based on the new result. gas_used = res.result.gas_used(); // Update the gas limit estimates (highest and lowest) based on the execution result. @@ -241,7 +235,7 @@ pub trait EstimateCall: Call { // Handle other cases, including successful transactions. ethres => { // Unpack the result and environment if the transaction was successful. - (res, (evm_env, tx_env)) = ethres?; + res = ethres?; // Update the estimated gas range based on the transaction result. update_estimated_gas_range( res.result, @@ -296,7 +290,7 @@ pub trait EstimateCall: Call { { let req_gas_limit = tx_env.gas_limit(); tx_env.set_gas_limit(env_gas_limit); - let (res, _) = match self.transact(db, evm_env, tx_env) { + let res = match self.transact(db, evm_env, tx_env) { Ok(res) => res, Err(err) => return err, }; diff --git a/crates/rpc/rpc-eth-types/src/gas_oracle.rs b/crates/rpc/rpc-eth-types/src/gas_oracle.rs index 18e4e14aa2b..27b23b54e40 100644 --- a/crates/rpc/rpc-eth-types/src/gas_oracle.rs +++ b/crates/rpc/rpc-eth-types/src/gas_oracle.rs @@ -272,8 +272,44 @@ where Ok(Some((parent_hash, prices))) } -} + /// Get the median tip value for the given block. This is useful for determining + /// tips when a block is at capacity. + /// + /// If the block cannot be found or has no transactions, this will return `None`. + pub async fn get_block_median_tip(&self, block_hash: B256) -> EthResult> { + // check the cache (this will hit the disk if the block is not cached) + let Some(block) = self.cache.get_recovered_block(block_hash).await? else { + return Ok(None) + }; + + let base_fee_per_gas = block.base_fee_per_gas(); + + // Filter, sort and collect the prices + let prices = block + .transactions_recovered() + .filter_map(|tx| { + if let Some(base_fee) = base_fee_per_gas { + (*tx).effective_tip_per_gas(base_fee) + } else { + Some((*tx).priority_fee_or_price()) + } + }) + .sorted() + .collect::>(); + + let median = if prices.is_empty() { + // if there are no prices, return `None` + None + } else if prices.len() % 2 == 1 { + Some(U256::from(prices[prices.len() / 2])) + } else { + Some(U256::from((prices[prices.len() / 2 - 1] + prices[prices.len() / 2]) / 2)) + }; + + Ok(median) + } +} /// Container type for mutable inner state of the [`GasPriceOracle`] #[derive(Debug)] struct GasPriceOracleInner { diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index c5a35ea47a6..4ca8317f5c3 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -528,7 +528,7 @@ where // Execute all transactions until index for tx in transactions { let tx_env = this.eth_api().evm_config().tx_env(tx); - let (res, _) = this.eth_api().transact(&mut db, evm_env.clone(), tx_env)?; + let res = this.eth_api().transact(&mut db, evm_env.clone(), tx_env)?; db.commit(res.state); } } diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index f44d5b7c572..d769ca90074 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -7,7 +7,7 @@ use alloy_rpc_types_eth::{ PendingTransactionFilterKind, }; use async_trait::async_trait; -use futures::future::TryFutureExt; +use futures::{future::TryFutureExt, stream, StreamExt}; use jsonrpsee::{core::RpcResult, server::IdProvider}; use reth_errors::ProviderError; use reth_primitives_traits::NodePrimitives; @@ -566,62 +566,153 @@ where to_block: u64, limits: QueryLimits, ) -> Result, EthFilterError> { + // generate all the block ranges we need to process + let ranges: Vec<_> = + BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range).collect(); let mut all_logs = Vec::new(); - // loop over the range of new blocks and check logs if the filter matches the log's bloom - // filter - for (from, to) in - BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range) + // for small ranges (less than 2 chunks) + if ranges.len() <= 1 { + for (from, to) in ranges { + let chunk_logs = self + .process_block_range(filter, from, to, from_block, to_block, limits) + .await?; + all_logs.extend(chunk_logs); + } + + return Ok(all_logs); + } + + // adjust based on system capabilities + //TODO: configurable, 4 is a good default for both I/O and CPU balance + let concurrency = std::cmp::min(ranges.len(), 4); + + // process block ranges in parallel while preserving order + let results = stream::iter(ranges.into_iter().enumerate()) + .map(|(range_idx, (from, to))| { + let self_inner = self; + let filter = &filter; + + async move { + let chunk_logs = self_inner + .process_block_range(filter, from, to, from_block, to_block, limits) + .await?; + + // return chunk index with logs to preserve order + Ok::<(usize, Vec), EthFilterError>((range_idx, chunk_logs)) + } + }) + .buffer_unordered(concurrency) + .collect::>() + .await; + + let mut ordered_results = vec![Vec::new(); results.len()]; + + for result in results { + match result { + Ok((idx, logs)) => { + ordered_results[idx] = logs; + } + Err(e) => return Err(e), + } + } + + // combine logs + for (range_idx, logs) in ordered_results.into_iter().enumerate() { + if let Some(max_logs) = limits.max_logs_per_response { + let is_multi_block_range = from_block != to_block; + if is_multi_block_range && all_logs.len() + logs.len() > max_logs { + // If we're going to exceed the limit, return error with processed range + // Calculate the block number we've processed up to this point + let processed_to_block = if range_idx > 0 { + // If we've processed at least one full range, use the end of the previous + // range + let (_prev_from, prev_to) = BlockRangeInclusiveIter::new( + from_block..=to_block, + self.max_headers_range, + ) + .nth(range_idx - 1) + .unwrap_or((from_block, from_block)); + prev_to + } else { + // Otherwise, we're still in the first range + from_block + }; + + return Err(EthFilterError::QueryExceedsMaxResults { + max_logs, + from_block, + to_block: processed_to_block, + }); + } + } + + all_logs.extend(logs); + } + + Ok(all_logs) + } + + /// Processes a single block range + async fn process_block_range( + &self, + filter: &Filter, + from: u64, + to: u64, + overall_from_block: u64, + overall_to_block: u64, + limits: QueryLimits, + ) -> Result, EthFilterError> { + let mut chunk_logs = Vec::new(); + + let headers = self.provider().headers_range(from..=to)?; + // these are consecutive headers, so we can use the parent hash of the next + // block to get the current header's hash + for (idx, header) in headers + .iter() + .enumerate() + .filter(|(_, header)| filter.matches_bloom(header.logs_bloom())) { - let headers = self.provider().headers_range(from..=to)?; - for (idx, header) in headers - .iter() - .enumerate() - .filter(|(_, header)| filter.matches_bloom(header.logs_bloom())) - { - // these are consecutive headers, so we can use the parent hash of the next - // block to get the current header's hash - let block_hash = match headers.get(idx + 1) { - Some(child) => child.parent_hash(), - None => self - .provider() - .block_hash(header.number())? - .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?, - }; + let block_hash = match headers.get(idx + 1) { + Some(child) => child.parent_hash(), + None => self + .provider() + .block_hash(header.number())? + .ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?, + }; - let num_hash = BlockNumHash::new(header.number(), block_hash); - if let Some((receipts, maybe_block)) = - self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await? - { - append_matching_block_logs( - &mut all_logs, - maybe_block - .map(ProviderOrBlock::Block) - .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())), - filter, - num_hash, - &receipts, - false, - header.timestamp(), - )?; + let num_hash = BlockNumHash::new(header.number(), block_hash); + if let Some((receipts, maybe_block)) = + self.eth_cache().get_receipts_and_maybe_block(num_hash.hash).await? + { + append_matching_block_logs( + &mut chunk_logs, + maybe_block + .map(ProviderOrBlock::Block) + .unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())), + filter, + num_hash, + &receipts, + false, + header.timestamp(), + )?; - // size check but only if range is multiple blocks, so we always return all - // logs of a single block - let is_multi_block_range = from_block != to_block; - if let Some(max_logs_per_response) = limits.max_logs_per_response { - if is_multi_block_range && all_logs.len() > max_logs_per_response { - return Err(EthFilterError::QueryExceedsMaxResults { - max_logs: max_logs_per_response, - from_block, - to_block: num_hash.number.saturating_sub(1), - }); - } + // size check but only if range is multiple blocks, so we always return all + // logs of a single block + let is_multi_block_range = overall_from_block != overall_to_block; + if let Some(max_logs_per_response) = limits.max_logs_per_response { + if is_multi_block_range && chunk_logs.len() > max_logs_per_response { + return Err(EthFilterError::QueryExceedsMaxResults { + max_logs: max_logs_per_response, + from_block: from, + to_block: num_hash.number.saturating_sub(1), + }); } } } } - Ok(all_logs) + Ok(chunk_logs) } }