diff --git a/Cargo.lock b/Cargo.lock index 17e53c81377..450ab2bc263 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7774,6 +7774,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "reth-chain" +version = "1.10.0" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives", + "arbitrary", + "bincode 1.3.3", + "metrics", + "parking_lot", + "rand 0.9.2", + "reth-ethereum-primitives", + "reth-execution-types", + "reth-metrics", + "reth-primitives-traits", + "reth-trie", + "reth-trie-common", + "revm", + "serde", + "serde_with", + "tracing", +] + [[package]] name = "reth-chain-state" version = "1.10.0" @@ -7789,6 +7813,7 @@ dependencies = [ "parking_lot", "pin-project", "rand 0.9.2", + "reth-chain", "reth-chainspec", "reth-errors", "reth-ethereum-primitives", @@ -8927,7 +8952,6 @@ dependencies = [ name = "reth-execution-types" version = "1.10.0" dependencies = [ - "alloy-consensus", "alloy-eips", "alloy-evm", "alloy-primitives", @@ -8981,6 +9005,7 @@ dependencies = [ "reth-trie-common", "rmp-serde", "secp256k1 0.30.0", + "serde_with", "tempfile", "thiserror 2.0.17", "tokio", @@ -8995,6 +9020,7 @@ dependencies = [ "alloy-eips", "eyre", "futures-util", + "reth-chain", "reth-chainspec", "reth-config", "reth-consensus", @@ -9002,7 +9028,6 @@ dependencies = [ "reth-db-common", "reth-ethereum-primitives", "reth-evm-ethereum", - "reth-execution-types", "reth-exex", "reth-network", "reth-node-api", @@ -9028,9 +9053,9 @@ dependencies = [ "arbitrary", "bincode 1.3.3", "rand 0.9.2", + "reth-chain", "reth-chain-state", "reth-ethereum-primitives", - "reth-execution-types", "reth-primitives-traits", "serde", "serde_with", @@ -9773,6 +9798,7 @@ dependencies = [ "op-alloy-consensus", "op-alloy-rpc-types-engine", "op-revm", + "reth-chain", "reth-chainspec", "reth-evm", "reth-execution-errors", @@ -10223,6 +10249,7 @@ dependencies = [ "parking_lot", "rand 0.9.2", "rayon", + "reth-chain", "reth-chain-state", "reth-chainspec", "reth-codecs", @@ -10712,12 +10739,12 @@ dependencies = [ "metrics", "rand 0.9.2", "reqwest", + "reth-chain", "reth-chain-state", "reth-chainspec", "reth-errors", "reth-ethereum-primitives", "reth-evm", - "reth-execution-types", "reth-metrics", "reth-primitives-traits", "reth-revm", @@ -10790,6 +10817,7 @@ dependencies = [ "rand 0.9.2", "rayon", "reqwest", + "reth-chain", "reth-chainspec", "reth-codecs", "reth-config", @@ -10805,7 +10833,6 @@ dependencies = [ "reth-etl", "reth-evm", "reth-evm-ethereum", - "reth-execution-types", "reth-exex", "reth-fs-util", "reth-network-p2p", @@ -10952,6 +10979,7 @@ dependencies = [ "alloy-primitives", "alloy-rpc-types-engine", "auto_impl", + "reth-chain", "reth-chainspec", "reth-db-api", "reth-db-models", @@ -11110,6 +11138,7 @@ dependencies = [ "proptest", "proptest-arbitrary-interop", "rand 0.9.2", + "reth-chain", "reth-chain-state", "reth-chainspec", "reth-eth-wire-types", diff --git a/Cargo.toml b/Cargo.toml index c9a3ba0d93c..1efe7985da1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ members = [ "crates/ethereum/primitives/", "crates/ethereum/reth/", "crates/etl/", + "crates/evm/chain", "crates/evm/evm", "crates/evm/execution-errors", "crates/evm/execution-types", @@ -387,6 +388,7 @@ reth-etl = { path = "crates/etl" } reth-evm = { path = "crates/evm/evm", default-features = false } reth-evm-ethereum = { path = "crates/ethereum/evm", default-features = false } reth-optimism-evm = { path = "crates/optimism/evm", default-features = false } +reth-chain = { path = "crates/evm/chain" } reth-execution-errors = { path = "crates/evm/execution-errors", default-features = false } reth-execution-types = { path = "crates/evm/execution-types", default-features = false } reth-exex = { path = "crates/exex/exex" } diff --git a/crates/chain-state/Cargo.toml b/crates/chain-state/Cargo.toml index d21c83ae7c4..b3fbe487311 100644 --- a/crates/chain-state/Cargo.toml +++ b/crates/chain-state/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] # reth +reth-chain.workspace = true reth-chainspec.workspace = true reth-errors.workspace = true reth-execution-types.workspace = true @@ -65,6 +66,7 @@ serde = [ "alloy-primitives/serde", "parking_lot/serde", "rand?/serde", + "reth-chain/serde", "reth-ethereum-primitives/serde", "reth-execution-types/serde", "reth-primitives-traits/serde", diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs index 7f2f328b191..1cf281395e7 100644 --- a/crates/chain-state/src/in_memory.rs +++ b/crates/chain-state/src/in_memory.rs @@ -8,9 +8,10 @@ use alloy_consensus::{transaction::TransactionMeta, BlockHeader}; use alloy_eips::{BlockHashOrNumber, BlockNumHash}; use alloy_primitives::{map::HashMap, BlockNumber, TxHash, B256}; use parking_lot::RwLock; +use reth_chain::Chain; use reth_chainspec::ChainInfo; use reth_ethereum_primitives::EthPrimitives; -use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome}; +use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome}; use reth_metrics::{metrics::Gauge, Metrics}; use reth_primitives_traits::{ BlockBody as _, IndexedTx, NodePrimitives, RecoveredBlock, SealedBlock, SealedHeader, @@ -944,6 +945,9 @@ impl> NewCanonicalChain { } /// Converts a slice of executed blocks into a [`Chain`]. + /// + /// Uses [`ExecutedBlock::trie_data_handle`] to avoid blocking on deferred trie computations. + /// The trie data will be computed lazily when actually needed by consumers. fn blocks_to_chain(blocks: &[ExecutedBlock]) -> Chain { match blocks { [] => Chain::default(), @@ -954,8 +958,7 @@ impl> NewCanonicalChain { first.execution_outcome().clone(), first.block_number(), )), - first.trie_updates(), - first.hashed_state(), + first.trie_data_handle(), ); for exec in rest { chain.append_block( @@ -964,8 +967,7 @@ impl> NewCanonicalChain { exec.execution_outcome().clone(), exec.block_number(), )), - exec.trie_updates(), - exec.hashed_state(), + exec.trie_data_handle(), ); } chain @@ -1560,17 +1562,30 @@ mod tests { ..Default::default() }; - assert_eq!( - chain_commit.to_chain_notification(), - CanonStateNotification::Commit { - new: Arc::new(Chain::new( - vec![block0.recovered_block().clone(), block1.recovered_block().clone()], - commit_execution_outcome, - expected_trie_updates, - expected_hashed_state - )) - } - ); + // Get the notification and verify + let notification = chain_commit.to_chain_notification(); + let CanonStateNotification::Commit { new } = notification else { + panic!("Expected Commit notification"); + }; + + // Compare blocks + let expected_blocks: Vec<_> = + vec![block0.recovered_block().clone(), block1.recovered_block().clone()]; + let actual_blocks: Vec<_> = new.blocks().values().cloned().collect(); + assert_eq!(actual_blocks, expected_blocks); + + // Compare execution outcome + assert_eq!(*new.execution_outcome(), commit_execution_outcome); + + // Compare trie data by waiting on deferred data + for (block_num, expected_updates) in &expected_trie_updates { + let actual = new.trie_data_at(*block_num).unwrap().wait_cloned(); + assert_eq!(actual.trie_updates, *expected_updates); + } + for (block_num, expected_state) in &expected_hashed_state { + let actual = new.trie_data_at(*block_num).unwrap().wait_cloned(); + assert_eq!(actual.hashed_state, *expected_state); + } // Test reorg notification let chain_reorg = NewCanonicalChain::Reorg { @@ -1607,22 +1622,48 @@ mod tests { ..Default::default() }; - assert_eq!( - chain_reorg.to_chain_notification(), - CanonStateNotification::Reorg { - old: Arc::new(Chain::new( - vec![block1.recovered_block().clone(), block2.recovered_block().clone()], - reorg_execution_outcome.clone(), - old_trie_updates, - old_hashed_state - )), - new: Arc::new(Chain::new( - vec![block1a.recovered_block().clone(), block2a.recovered_block().clone()], - reorg_execution_outcome, - new_trie_updates, - new_hashed_state - )) - } - ); + // Get the notification and verify + let notification = chain_reorg.to_chain_notification(); + let CanonStateNotification::Reorg { old, new } = notification else { + panic!("Expected Reorg notification"); + }; + + // Compare old chain blocks + let expected_old_blocks: Vec<_> = + vec![block1.recovered_block().clone(), block2.recovered_block().clone()]; + let actual_old_blocks: Vec<_> = old.blocks().values().cloned().collect(); + assert_eq!(actual_old_blocks, expected_old_blocks); + + // Compare old chain execution outcome + assert_eq!(*old.execution_outcome(), reorg_execution_outcome); + + // Compare old chain trie data + for (block_num, expected_updates) in &old_trie_updates { + let actual = old.trie_data_at(*block_num).unwrap().wait_cloned(); + assert_eq!(actual.trie_updates, *expected_updates); + } + for (block_num, expected_state) in &old_hashed_state { + let actual = old.trie_data_at(*block_num).unwrap().wait_cloned(); + assert_eq!(actual.hashed_state, *expected_state); + } + + // Compare new chain blocks + let expected_new_blocks: Vec<_> = + vec![block1a.recovered_block().clone(), block2a.recovered_block().clone()]; + let actual_new_blocks: Vec<_> = new.blocks().values().cloned().collect(); + assert_eq!(actual_new_blocks, expected_new_blocks); + + // Compare new chain execution outcome + assert_eq!(*new.execution_outcome(), reorg_execution_outcome); + + // Compare new chain trie data + for (block_num, expected_updates) in &new_trie_updates { + let actual = new.trie_data_at(*block_num).unwrap().wait_cloned(); + assert_eq!(actual.trie_updates, *expected_updates); + } + for (block_num, expected_state) in &new_hashed_state { + let actual = new.trie_data_at(*block_num).unwrap().wait_cloned(); + assert_eq!(actual.hashed_state, *expected_state); + } } } diff --git a/crates/chain-state/src/lib.rs b/crates/chain-state/src/lib.rs index f6abed91467..d32b131e0ee 100644 --- a/crates/chain-state/src/lib.rs +++ b/crates/chain-state/src/lib.rs @@ -11,8 +11,8 @@ mod in_memory; pub use in_memory::*; -mod deferred_trie; -pub use deferred_trie::*; +// Re-export deferred_trie types from reth_chain +pub use reth_chain::{AnchoredTrieInput, ComputedTrieData, DeferredTrieData}; mod lazy_overlay; pub use lazy_overlay::*; diff --git a/crates/chain-state/src/notifications.rs b/crates/chain-state/src/notifications.rs index 18676ce2005..88152edc6ef 100644 --- a/crates/chain-state/src/notifications.rs +++ b/crates/chain-state/src/notifications.rs @@ -2,7 +2,7 @@ use alloy_eips::{eip2718::Encodable2718, BlockNumHash}; use derive_more::{Deref, DerefMut}; -use reth_execution_types::{BlockReceipts, Chain}; +use reth_chain::{BlockReceipts, Chain}; use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader}; use reth_storage_api::NodePrimitivesProvider; use std::{ @@ -80,7 +80,7 @@ impl Stream for CanonStateNotificationStream { /// /// The notification contains at least one [`Chain`] with the imported segment. If some blocks were /// reverted (e.g. during a reorg), the old chain is also returned. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "serde", serde(bound = ""))] pub enum CanonStateNotification { @@ -280,14 +280,13 @@ mod tests { vec![block1.clone(), block2.clone()], ExecutionOutcome::default(), BTreeMap::new(), - BTreeMap::new(), )); // Create a commit notification let notification = CanonStateNotification::Commit { new: chain.clone() }; - // Test that `committed` returns the correct chain - assert_eq!(notification.committed(), chain); + // Test that `committed` returns the correct chain (compare Arc pointers) + assert!(Arc::ptr_eq(¬ification.committed(), &chain)); // Test that `reverted` returns None for `Commit` assert!(notification.reverted().is_none()); @@ -319,24 +318,22 @@ mod tests { vec![block1.clone()], ExecutionOutcome::default(), BTreeMap::new(), - BTreeMap::new(), )); let new_chain = Arc::new(Chain::new( vec![block2.clone(), block3.clone()], ExecutionOutcome::default(), BTreeMap::new(), - BTreeMap::new(), )); // Create a reorg notification let notification = CanonStateNotification::Reorg { old: old_chain.clone(), new: new_chain.clone() }; - // Test that `reverted` returns the old chain - assert_eq!(notification.reverted(), Some(old_chain)); + // Test that `reverted` returns the old chain (compare Arc pointers) + assert!(Arc::ptr_eq(¬ification.reverted().unwrap(), &old_chain)); - // Test that `committed` returns the new chain - assert_eq!(notification.committed(), new_chain); + // Test that `committed` returns the new chain (compare Arc pointers) + assert!(Arc::ptr_eq(¬ification.committed(), &new_chain)); // Test that `tip` returns the tip of the new chain (last block in the new chain) assert_eq!(*notification.tip(), block3); @@ -391,7 +388,6 @@ mod tests { vec![block1.clone(), block2.clone()], execution_outcome, BTreeMap::new(), - BTreeMap::new(), )); // Create a commit notification containing the new chain segment. @@ -449,12 +445,8 @@ mod tests { ExecutionOutcome { receipts: old_receipts, ..Default::default() }; // Create an old chain segment to be reverted, containing `old_block1`. - let old_chain: Arc = Arc::new(Chain::new( - vec![old_block1.clone()], - old_execution_outcome, - BTreeMap::new(), - BTreeMap::new(), - )); + let old_chain: Arc = + Arc::new(Chain::new(vec![old_block1.clone()], old_execution_outcome, BTreeMap::new())); // Define block2 for the new chain segment, which will be committed. let mut body = BlockBody::::default(); @@ -482,12 +474,8 @@ mod tests { ExecutionOutcome { receipts: new_receipts, ..Default::default() }; // Create a new chain segment to be committed, containing `new_block1`. - let new_chain = Arc::new(Chain::new( - vec![new_block1.clone()], - new_execution_outcome, - BTreeMap::new(), - BTreeMap::new(), - )); + let new_chain = + Arc::new(Chain::new(vec![new_block1.clone()], new_execution_outcome, BTreeMap::new())); // Create a reorg notification with both reverted (old) and committed (new) chain segments. let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain }; diff --git a/crates/chain-state/src/test_utils.rs b/crates/chain-state/src/test_utils.rs index 73bad27d79f..81baf10be65 100644 --- a/crates/chain-state/src/test_utils.rs +++ b/crates/chain-state/src/test_utils.rs @@ -9,11 +9,12 @@ use alloy_signer::SignerSync; use alloy_signer_local::PrivateKeySigner; use core::marker::PhantomData; use rand::Rng; +use reth_chain::Chain; use reth_chainspec::{ChainSpec, EthereumHardfork, MIN_TRANSACTION_GAS}; use reth_ethereum_primitives::{ Block, BlockBody, EthPrimitives, Receipt, Transaction, TransactionSigned, }; -use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome}; +use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome}; use reth_primitives_traits::{ proofs::{calculate_receipt_root, calculate_transaction_root, calculate_withdrawals_root}, Account, NodePrimitives, Recovered, RecoveredBlock, SealedBlock, SealedHeader, diff --git a/crates/evm/chain/Cargo.toml b/crates/evm/chain/Cargo.toml new file mode 100644 index 00000000000..67e2a73d0e9 --- /dev/null +++ b/crates/evm/chain/Cargo.toml @@ -0,0 +1,69 @@ +[package] +name = "reth-chain" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Chain and deferred trie data types for reth." + +[lints] +workspace = true + +[dependencies] +reth-ethereum-primitives.workspace = true +reth-execution-types = { workspace = true, features = ["std"] } +reth-metrics.workspace = true +reth-primitives-traits.workspace = true +reth-trie.workspace = true +reth-trie-common.workspace = true + +# alloy +alloy-consensus.workspace = true +alloy-primitives.workspace = true +alloy-eips.workspace = true + +serde = { workspace = true, optional = true } +serde_with = { workspace = true, optional = true } + +metrics.workspace = true +parking_lot.workspace = true +tracing.workspace = true + +[dev-dependencies] +reth-primitives-traits = { workspace = true, features = ["test-utils", "arbitrary"] } +reth-ethereum-primitives = { workspace = true, features = ["arbitrary"] } +alloy-primitives = { workspace = true, features = ["rand", "arbitrary"] } +alloy-consensus = { workspace = true, features = ["arbitrary"] } +arbitrary.workspace = true +bincode.workspace = true +rand.workspace = true +revm.workspace = true + +[features] +default = [] +serde = [ + "dep:serde", + "alloy-eips/serde", + "alloy-primitives/serde", + "reth-primitives-traits/serde", + "alloy-consensus/serde", + "reth-trie/serde", + "reth-trie-common/serde", + "reth-ethereum-primitives/serde", + "reth-execution-types/serde", + "rand/serde", + "revm/serde", + "parking_lot/serde", +] +serde-bincode-compat = [ + "serde", + "reth-trie-common/serde-bincode-compat", + "reth-primitives-traits/serde-bincode-compat", + "serde_with", + "alloy-eips/serde-bincode-compat", + "alloy-consensus/serde-bincode-compat", + "reth-ethereum-primitives/serde-bincode-compat", + "reth-execution-types/serde-bincode-compat", +] diff --git a/crates/evm/execution-types/src/chain.rs b/crates/evm/chain/src/chain.rs similarity index 74% rename from crates/evm/execution-types/src/chain.rs rename to crates/evm/chain/src/chain.rs index 1592cf78e05..7cd3c4a88cd 100644 --- a/crates/evm/execution-types/src/chain.rs +++ b/crates/evm/chain/src/chain.rs @@ -1,16 +1,16 @@ //! Contains [Chain], a chain of blocks and their final state. -use crate::ExecutionOutcome; -use alloc::{borrow::Cow, collections::BTreeMap, sync::Arc, vec::Vec}; +use crate::DeferredTrieData; use alloy_consensus::{transaction::Recovered, BlockHeader}; use alloy_eips::{eip1898::ForkBlock, eip2718::Encodable2718, BlockNumHash}; use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash}; -use core::{fmt, ops::RangeInclusive}; +use reth_execution_types::ExecutionOutcome; use reth_primitives_traits::{ transaction::signed::SignedTransaction, Block, BlockBody, IndexedTx, NodePrimitives, RecoveredBlock, SealedHeader, }; use reth_trie_common::{updates::TrieUpdatesSorted, HashedPostStateSorted}; +use std::{borrow::Cow, collections::BTreeMap, fmt, ops::RangeInclusive, sync::Arc, vec::Vec}; /// A chain of blocks and their final state. /// @@ -22,8 +22,7 @@ use reth_trie_common::{updates::TrieUpdatesSorted, HashedPostStateSorted}; /// # Warning /// /// A chain of blocks should not be empty. -#[derive(Clone, Debug, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[derive(Clone, Debug)] pub struct Chain { /// All blocks in this chain. blocks: BTreeMap>, @@ -34,10 +33,12 @@ pub struct Chain { /// /// Additionally, it includes the individual state changes that led to the current state. execution_outcome: ExecutionOutcome, - /// State trie updates for each block in the chain, keyed by block number. - trie_updates: BTreeMap>, - /// Hashed post state for each block in the chain, keyed by block number. - hashed_state: BTreeMap>, + /// Deferred trie data for each block in the chain, keyed by block number. + /// + /// Contains handles to lazily-computed sorted trie updates and hashed state. + /// This allows Chain to be constructed without blocking on expensive trie + /// computations - the data is only materialized when actually needed. + trie_data: BTreeMap, } type ChainTxReceiptMeta<'a, N> = ( @@ -52,8 +53,7 @@ impl Default for Chain { Self { blocks: Default::default(), execution_outcome: Default::default(), - trie_updates: Default::default(), - hashed_state: Default::default(), + trie_data: Default::default(), } } } @@ -67,27 +67,24 @@ impl Chain { pub fn new( blocks: impl IntoIterator>, execution_outcome: ExecutionOutcome, - trie_updates: BTreeMap>, - hashed_state: BTreeMap>, + trie_data: BTreeMap, ) -> Self { let blocks = blocks.into_iter().map(|b| (b.header().number(), b)).collect::>(); debug_assert!(!blocks.is_empty(), "Chain should have at least one block"); - Self { blocks, execution_outcome, trie_updates, hashed_state } + Self { blocks, execution_outcome, trie_data } } /// Create new Chain from a single block and its state. pub fn from_block( block: RecoveredBlock, execution_outcome: ExecutionOutcome, - trie_updates: Arc, - hashed_state: Arc, + trie_data: DeferredTrieData, ) -> Self { let block_number = block.header().number(); - let trie_updates_map = BTreeMap::from([(block_number, trie_updates)]); - let hashed_state_map = BTreeMap::from([(block_number, hashed_state)]); - Self::new([block], execution_outcome, trie_updates_map, hashed_state_map) + let trie_data_map = BTreeMap::from([(block_number, trie_data)]); + Self::new([block], execution_outcome, trie_data_map) } /// Get the blocks in this chain. @@ -105,37 +102,62 @@ impl Chain { self.blocks.values().map(|block| block.clone_sealed_header()) } + /// Get all deferred trie data for this chain. + /// + /// Returns handles to lazily-computed sorted trie updates and hashed state. + /// [`DeferredTrieData`] allows `Chain` to be constructed without blocking on + /// expensive trie computations - the data is only materialized when actually needed + /// via [`DeferredTrieData::wait_cloned`] or similar methods. + /// + /// This method does **not** block. To access the computed trie data, call + /// [`DeferredTrieData::wait_cloned`] on individual entries, which will block + /// if the background computation has not yet completed. + pub const fn trie_data(&self) -> &BTreeMap { + &self.trie_data + } + + /// Get deferred trie data for a specific block number. + /// + /// Returns a handle to the lazily-computed trie data. This method does **not** block. + /// Call [`DeferredTrieData::wait_cloned`] on the result to wait for and retrieve + /// the computed data, which will block if computation is still in progress. + pub fn trie_data_at(&self, block_number: BlockNumber) -> Option<&DeferredTrieData> { + self.trie_data.get(&block_number) + } + /// Get all trie updates for this chain. - pub const fn trie_updates(&self) -> &BTreeMap> { - &self.trie_updates + /// + /// Note: This blocks on deferred trie data for all blocks in the chain. + /// Prefer using [`trie_data`](Self::trie_data) when possible to avoid blocking. + pub fn trie_updates(&self) -> BTreeMap> { + self.trie_data.iter().map(|(num, data)| (*num, data.wait_cloned().trie_updates)).collect() } /// Get trie updates for a specific block number. - pub fn trie_updates_at(&self, block_number: BlockNumber) -> Option<&Arc> { - self.trie_updates.get(&block_number) + /// + /// Note: This waits for deferred trie data if not already computed. + pub fn trie_updates_at(&self, block_number: BlockNumber) -> Option> { + self.trie_data.get(&block_number).map(|data| data.wait_cloned().trie_updates) } - /// Remove all trie updates for this chain. - pub fn clear_trie_updates(&mut self) { - self.trie_updates.clear(); + /// Remove all trie data for this chain. + pub fn clear_trie_data(&mut self) { + self.trie_data.clear(); } /// Get all hashed states for this chain. - pub const fn hashed_state(&self) -> &BTreeMap> { - &self.hashed_state + /// + /// Note: This blocks on deferred trie data for all blocks in the chain. + /// Prefer using [`trie_data`](Self::trie_data) when possible to avoid blocking. + pub fn hashed_state(&self) -> BTreeMap> { + self.trie_data.iter().map(|(num, data)| (*num, data.wait_cloned().hashed_state)).collect() } /// Get hashed state for a specific block number. - pub fn hashed_state_at( - &self, - block_number: BlockNumber, - ) -> Option<&Arc> { - self.hashed_state.get(&block_number) - } - - /// Remove all hashed states for this chain. - pub fn clear_hashed_state(&mut self) { - self.hashed_state.clear(); + /// + /// Note: This waits for deferred trie data if not already computed. + pub fn hashed_state_at(&self, block_number: BlockNumber) -> Option> { + self.trie_data.get(&block_number).map(|data| data.wait_cloned().hashed_state) } /// Get execution outcome of this chain @@ -183,23 +205,16 @@ impl Chain { /// Destructure the chain into its inner components: /// 1. The blocks contained in the chain. /// 2. The execution outcome representing the final state. - /// 3. The trie updates map. - /// 4. The hashed state map. + /// 3. The deferred trie data map. #[allow(clippy::type_complexity)] pub fn into_inner( self, ) -> ( ChainBlocks<'static, N::Block>, ExecutionOutcome, - BTreeMap>, - BTreeMap>, + BTreeMap, ) { - ( - ChainBlocks { blocks: Cow::Owned(self.blocks) }, - self.execution_outcome, - self.trie_updates, - self.hashed_state, - ) + (ChainBlocks { blocks: Cow::Owned(self.blocks) }, self.execution_outcome, self.trie_data) } /// Destructure the chain into its inner components: @@ -329,14 +344,12 @@ impl Chain { &mut self, block: RecoveredBlock, execution_outcome: ExecutionOutcome, - trie_updates: Arc, - hashed_state: Arc, + trie_data: DeferredTrieData, ) { let block_number = block.header().number(); self.blocks.insert(block_number, block); self.execution_outcome.extend(execution_outcome); - self.trie_updates.insert(block_number, trie_updates); - self.hashed_state.insert(block_number, hashed_state); + self.trie_data.insert(block_number, trie_data); } /// Merge two chains by appending the given chain into the current one. @@ -355,8 +368,7 @@ impl Chain { // Insert blocks from other chain self.blocks.extend(other.blocks); self.execution_outcome.extend(other.execution_outcome); - self.trie_updates.extend(other.trie_updates); - self.hashed_state.extend(other.hashed_state); + self.trie_data.extend(other.trie_data); Ok(()) } @@ -459,7 +471,7 @@ impl>> ChainBlocks<'_, impl IntoIterator for ChainBlocks<'_, B> { type Item = (BlockNumber, RecoveredBlock); - type IntoIter = alloc::collections::btree_map::IntoIter>; + type IntoIter = std::collections::btree_map::IntoIter>; fn into_iter(self) -> Self::IntoIter { self.blocks.into_owned().into_iter() @@ -477,25 +489,95 @@ pub struct BlockReceipts { pub timestamp: u64, } +#[cfg(feature = "serde")] +mod chain_serde { + use super::*; + use crate::ComputedTrieData; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + /// Serializable representation of Chain that waits for deferred trie data. + #[derive(Serialize, Deserialize)] + #[serde(bound = "")] + struct ChainRepr { + blocks: BTreeMap>, + execution_outcome: ExecutionOutcome, + #[serde(default)] + trie_updates: BTreeMap>, + #[serde(default)] + hashed_state: BTreeMap>, + } + + impl Serialize for Chain { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // Wait for deferred trie data for serialization + let trie_updates: BTreeMap<_, _> = self + .trie_data + .iter() + .map(|(num, data)| (*num, data.wait_cloned().trie_updates)) + .collect(); + let hashed_state: BTreeMap<_, _> = self + .trie_data + .iter() + .map(|(num, data)| (*num, data.wait_cloned().hashed_state)) + .collect(); + + let repr = ChainRepr:: { + blocks: self.blocks.clone(), + execution_outcome: self.execution_outcome.clone(), + trie_updates, + hashed_state, + }; + repr.serialize(serializer) + } + } + + impl<'de, N: NodePrimitives> Deserialize<'de> for Chain { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let repr = ChainRepr::::deserialize(deserializer)?; + + // Convert to ready DeferredTrieData handles + let trie_data = repr + .trie_updates + .into_iter() + .map(|(num, trie_updates)| { + let hashed_state = repr.hashed_state.get(&num).cloned().unwrap_or_default(); + let computed = ComputedTrieData::without_trie_input(hashed_state, trie_updates); + (num, DeferredTrieData::ready(computed)) + }) + .collect(); + + Ok(Self { blocks: repr.blocks, execution_outcome: repr.execution_outcome, trie_data }) + } + } +} + /// Bincode-compatible [`Chain`] serde implementation. #[cfg(feature = "serde-bincode-compat")] pub(super) mod serde_bincode_compat { - use crate::{serde_bincode_compat, ExecutionOutcome}; - use alloc::{borrow::Cow, collections::BTreeMap, sync::Arc}; use alloy_primitives::BlockNumber; use reth_ethereum_primitives::EthPrimitives; + use reth_execution_types::{ + serde_bincode_compat as exec_serde_bincode_compat, ExecutionOutcome, + }; use reth_primitives_traits::{ serde_bincode_compat::{RecoveredBlock, SerdeBincodeCompat}, Block, NodePrimitives, }; use serde::{ser::SerializeMap, Deserialize, Deserializer, Serialize, Serializer}; use serde_with::{DeserializeAs, SerializeAs}; + use std::{borrow::Cow, collections::BTreeMap, sync::Arc}; /// Bincode-compatible [`super::Chain`] serde implementation. /// /// Intended to use with the [`serde_with::serde_as`] macro in the following way: /// ```rust - /// use reth_execution_types::{serde_bincode_compat, Chain}; + /// use reth_chain::{serde_bincode_compat, Chain}; /// use serde::{Deserialize, Serialize}; /// use serde_with::serde_as; /// @@ -515,7 +597,7 @@ pub(super) mod serde_bincode_compat { >, { blocks: RecoveredBlocks<'a, N::Block>, - execution_outcome: serde_bincode_compat::ExecutionOutcome<'a, N::Receipt>, + execution_outcome: exec_serde_bincode_compat::ExecutionOutcome<'a, N::Receipt>, #[serde(default, rename = "trie_updates_legacy")] _trie_updates_legacy: Option>, @@ -571,31 +653,6 @@ pub(super) mod serde_bincode_compat { } } - impl<'a, N> From<&'a super::Chain> for Chain<'a, N> - where - N: NodePrimitives< - Block: Block + 'static, - >, - { - fn from(value: &'a super::Chain) -> Self { - Self { - blocks: RecoveredBlocks(Cow::Borrowed(&value.blocks)), - execution_outcome: value.execution_outcome.as_repr(), - _trie_updates_legacy: None, - trie_updates: value - .trie_updates - .iter() - .map(|(k, v)| (*k, v.as_ref().into())) - .collect(), - hashed_state: value - .hashed_state - .iter() - .map(|(k, v)| (*k, v.as_ref().into())) - .collect(), - } - } - } - impl<'a, N> From> for super::Chain where N: NodePrimitives< @@ -603,19 +660,26 @@ pub(super) mod serde_bincode_compat { >, { fn from(value: Chain<'a, N>) -> Self { + use crate::{ComputedTrieData, DeferredTrieData}; + + let trie_updates: BTreeMap<_, _> = + value.trie_updates.into_iter().map(|(k, v)| (k, Arc::new(v.into()))).collect(); + let hashed_state: BTreeMap<_, _> = + value.hashed_state.into_iter().map(|(k, v)| (k, Arc::new(v.into()))).collect(); + + let trie_data = trie_updates + .into_iter() + .map(|(num, trie_updates)| { + let hashed_state = hashed_state.get(&num).cloned().unwrap_or_default(); + let computed = ComputedTrieData::without_trie_input(hashed_state, trie_updates); + (num, DeferredTrieData::ready(computed)) + }) + .collect(); + Self { blocks: value.blocks.0.into_owned(), execution_outcome: ExecutionOutcome::from_repr(value.execution_outcome), - trie_updates: value - .trie_updates - .into_iter() - .map(|(k, v)| (k, Arc::new(v.into()))) - .collect(), - hashed_state: value - .hashed_state - .into_iter() - .map(|(k, v)| (k, Arc::new(v.into()))) - .collect(), + trie_data, } } } @@ -630,7 +694,31 @@ pub(super) mod serde_bincode_compat { where S: Serializer, { - Chain::from(source).serialize(serializer) + use reth_trie_common::serde_bincode_compat as trie_serde; + + // Wait for deferred trie data and collect into maps we can borrow from + let trie_updates_data: BTreeMap = + source.trie_data.iter().map(|(k, v)| (*k, v.wait_cloned().trie_updates)).collect(); + let hashed_state_data: BTreeMap = + source.trie_data.iter().map(|(k, v)| (*k, v.wait_cloned().hashed_state)).collect(); + + // Now create the serde-compatible struct borrowing from the collected data + let chain: Chain<'_, N> = Chain { + blocks: RecoveredBlocks(Cow::Borrowed(&source.blocks)), + execution_outcome: source.execution_outcome.as_repr(), + _trie_updates_legacy: None, + trie_updates: trie_updates_data + .iter() + .map(|(k, v)| (*k, trie_serde::updates::TrieUpdatesSorted::from(v.as_ref()))) + .collect(), + hashed_state: hashed_state_data + .iter() + .map(|(k, v)| { + (*k, trie_serde::hashed_state::HashedPostStateSorted::from(v.as_ref())) + }) + .collect(), + }; + chain.serialize(serializer) } } @@ -659,10 +747,10 @@ pub(super) mod serde_bincode_compat { #[test] fn test_chain_bincode_roundtrip() { - use alloc::collections::BTreeMap; + use std::collections::BTreeMap; #[serde_as] - #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] + #[derive(Debug, Serialize, Deserialize)] struct Data { #[serde_as(as = "serde_bincode_compat::Chain")] chain: Chain, @@ -676,13 +764,14 @@ pub(super) mod serde_bincode_compat { .unwrap()], Default::default(), BTreeMap::new(), - BTreeMap::new(), ), }; let encoded = bincode::serialize(&data).unwrap(); let decoded: Data = bincode::deserialize(&encoded).unwrap(); - assert_eq!(decoded, data); + // Note: Can't compare directly because DeferredTrieData doesn't implement PartialEq + assert_eq!(decoded.chain.blocks, data.chain.blocks); + assert_eq!(decoded.chain.execution_outcome, data.chain.execution_outcome); } } } @@ -776,12 +865,8 @@ mod tests { let mut block_state_extended = execution_outcome1; block_state_extended.extend(execution_outcome2); - let chain: Chain = Chain::new( - vec![block1.clone(), block2.clone()], - block_state_extended, - BTreeMap::new(), - BTreeMap::new(), - ); + let chain: Chain = + Chain::new(vec![block1.clone(), block2.clone()], block_state_extended, BTreeMap::new()); // return tip state assert_eq!( diff --git a/crates/chain-state/src/deferred_trie.rs b/crates/evm/chain/src/deferred_trie.rs similarity index 99% rename from crates/chain-state/src/deferred_trie.rs rename to crates/evm/chain/src/deferred_trie.rs index efe23a2ded3..9c870d02a40 100644 --- a/crates/chain-state/src/deferred_trie.rs +++ b/crates/evm/chain/src/deferred_trie.rs @@ -8,6 +8,7 @@ use reth_trie::{ use std::{ fmt, sync::{Arc, LazyLock}, + vec::Vec, }; use tracing::instrument; diff --git a/crates/evm/chain/src/lib.rs b/crates/evm/chain/src/lib.rs new file mode 100644 index 00000000000..38e7485de10 --- /dev/null +++ b/crates/evm/chain/src/lib.rs @@ -0,0 +1,30 @@ +//! Chain and deferred trie data types for reth. +//! +//! This crate contains the [`Chain`] type representing a chain of blocks and their final state, +//! as well as [`DeferredTrieData`] for handling asynchronously computed trie data. + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", + html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", + issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![cfg_attr(docsrs, feature(doc_cfg))] + +mod chain; +pub use chain::*; + +mod deferred_trie; +pub use deferred_trie::*; + +/// Bincode-compatible serde implementations for chain types. +/// +/// `bincode` crate doesn't work with optionally serializable serde fields, but some of the +/// chain types require optional serialization for RPC compatibility. This module makes so that +/// all fields are serialized. +/// +/// Read more: +#[cfg(feature = "serde-bincode-compat")] +pub mod serde_bincode_compat { + pub use super::chain::serde_bincode_compat::*; +} diff --git a/crates/evm/execution-types/Cargo.toml b/crates/evm/execution-types/Cargo.toml index 6c53e315b32..982408226f3 100644 --- a/crates/evm/execution-types/Cargo.toml +++ b/crates/evm/execution-types/Cargo.toml @@ -19,7 +19,6 @@ revm.workspace = true # alloy alloy-evm.workspace = true -alloy-consensus.workspace = true alloy-primitives.workspace = true alloy-eips.workspace = true @@ -45,7 +44,6 @@ serde = [ "alloy-eips/serde", "alloy-primitives/serde", "reth-primitives-traits/serde", - "alloy-consensus/serde", "reth-trie-common/serde", "reth-ethereum-primitives/serde", ] @@ -55,7 +53,6 @@ serde-bincode-compat = [ "reth-primitives-traits/serde-bincode-compat", "serde_with", "alloy-eips/serde-bincode-compat", - "alloy-consensus/serde-bincode-compat", "reth-ethereum-primitives/serde-bincode-compat", ] std = [ @@ -64,7 +61,6 @@ std = [ "revm/std", "serde?/std", "reth-primitives-traits/std", - "alloy-consensus/std", "serde_with?/std", "derive_more/std", "reth-ethereum-primitives/std", diff --git a/crates/evm/execution-types/src/execution_outcome.rs b/crates/evm/execution-types/src/execution_outcome.rs index 6df354219ea..9c2842899e6 100644 --- a/crates/evm/execution-types/src/execution_outcome.rs +++ b/crates/evm/execution-types/src/execution_outcome.rs @@ -564,8 +564,8 @@ pub(super) mod serde_bincode_compat { #[cfg(test)] mod tests { use super::*; - use alloy_consensus::TxType; use alloy_primitives::{bytes, Address, LogData, B256}; + use reth_ethereum_primitives::TxType; #[test] fn test_initialization() { diff --git a/crates/evm/execution-types/src/lib.rs b/crates/evm/execution-types/src/lib.rs index 8b795981fb5..f3dcc166eb3 100644 --- a/crates/evm/execution-types/src/lib.rs +++ b/crates/evm/execution-types/src/lib.rs @@ -11,9 +11,6 @@ extern crate alloc; -mod chain; -pub use chain::*; - mod execute; pub use execute::*; @@ -29,5 +26,5 @@ pub use execution_outcome::*; /// Read more: #[cfg(feature = "serde-bincode-compat")] pub mod serde_bincode_compat { - pub use super::{chain::serde_bincode_compat::*, execution_outcome::serde_bincode_compat::*}; + pub use super::execution_outcome::serde_bincode_compat::*; } diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 189cd509655..8a550db8a73 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -48,6 +48,7 @@ itertools = { workspace = true, features = ["use_std"] } metrics.workspace = true parking_lot.workspace = true rmp-serde.workspace = true +serde_with.workspace = true thiserror.workspace = true tracing.workspace = true diff --git a/crates/exex/exex/src/backfill/job.rs b/crates/exex/exex/src/backfill/job.rs index 57b180eb30b..2d8d699d737 100644 --- a/crates/exex/exex/src/backfill/job.rs +++ b/crates/exex/exex/src/backfill/job.rs @@ -149,7 +149,7 @@ where executor.into_state().take_bundle(), results, ); - let chain = Chain::new(blocks, outcome, BTreeMap::new(), BTreeMap::new()); + let chain = Chain::new(blocks, outcome, BTreeMap::new()); Ok(chain) } } diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index b28aef51246..eadb7b81979 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -796,21 +796,20 @@ mod tests { block1.set_block_number(10); let notification1 = ExExNotification::ChainCommitted { - new: Arc::new(Chain::new( - vec![block1.clone()], - Default::default(), - Default::default(), - Default::default(), - )), + new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())), }; // Push the first notification - exex_manager.push_notification(notification1.clone()); + exex_manager.push_notification(notification1); // Verify the buffer contains the notification with the correct ID assert_eq!(exex_manager.buffer.len(), 1); assert_eq!(exex_manager.buffer.front().unwrap().0, 0); - assert_eq!(exex_manager.buffer.front().unwrap().1, notification1); + // Compare by tip block since ExExNotification doesn't implement PartialEq + assert_eq!( + *exex_manager.buffer.front().unwrap().1.committed_chain().unwrap().tip(), + block1 + ); assert_eq!(exex_manager.next_id, 1); // Push another notification @@ -819,22 +818,20 @@ mod tests { block2.set_block_number(20); let notification2 = ExExNotification::ChainCommitted { - new: Arc::new(Chain::new( - vec![block2.clone()], - Default::default(), - Default::default(), - Default::default(), - )), + new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())), }; - exex_manager.push_notification(notification2.clone()); + exex_manager.push_notification(notification2); // Verify the buffer contains both notifications with correct IDs assert_eq!(exex_manager.buffer.len(), 2); assert_eq!(exex_manager.buffer.front().unwrap().0, 0); - assert_eq!(exex_manager.buffer.front().unwrap().1, notification1); + assert_eq!( + *exex_manager.buffer.front().unwrap().1.committed_chain().unwrap().tip(), + block1 + ); assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1); - assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2); + assert_eq!(*exex_manager.buffer.get(1).unwrap().1.committed_chain().unwrap().tip(), block2); assert_eq!(exex_manager.next_id, 2); } @@ -867,12 +864,7 @@ mod tests { block1.set_block_number(10); let notification1 = ExExNotification::ChainCommitted { - new: Arc::new(Chain::new( - vec![block1.clone()], - Default::default(), - Default::default(), - Default::default(), - )), + new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())), }; exex_manager.push_notification(notification1.clone()); @@ -1100,7 +1092,6 @@ mod tests { vec![Default::default()], Default::default(), Default::default(), - Default::default(), )), }; @@ -1166,10 +1157,10 @@ mod tests { block2.set_block_number(11); // Setup a notification + let expected_block: RecoveredBlock = Default::default(); let notification = ExExNotification::ChainCommitted { new: Arc::new(Chain::new( - vec![Default::default()], - Default::default(), + vec![expected_block.clone()], Default::default(), Default::default(), )), @@ -1181,7 +1172,8 @@ mod tests { match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { let received_notification = notifications.next().await.unwrap().unwrap(); - assert_eq!(received_notification, notification); + // Compare by tip block since ExExNotification doesn't implement PartialEq + assert_eq!(*received_notification.committed_chain().unwrap().tip(), expected_block); } Poll::Pending => panic!("Notification send is pending"), Poll::Ready(Err(e)) => panic!("Failed to send notification: {e:?}"), @@ -1216,12 +1208,7 @@ mod tests { block1.set_block_number(10); let notification = ExExNotification::ChainCommitted { - new: Arc::new(Chain::new( - vec![block1.clone()], - Default::default(), - Default::default(), - Default::default(), - )), + new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())), }; let mut cx = Context::from_waker(futures::task::noop_waker_ref()); @@ -1278,7 +1265,9 @@ mod tests { match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { let received_notification = notifications.next().await.unwrap().unwrap(); - assert_eq!(received_notification, notification); + // Compare by checking that both are reorgs with empty chains + assert!(received_notification.committed_chain().is_some()); + assert!(received_notification.reverted_chain().is_some()); } Poll::Pending | Poll::Ready(Err(_)) => { panic!("Notification should not be pending or fail") @@ -1318,7 +1307,9 @@ mod tests { match exex_handle.send(&mut cx, &(22, notification.clone())) { Poll::Ready(Ok(())) => { let received_notification = notifications.next().await.unwrap().unwrap(); - assert_eq!(received_notification, notification); + // Compare by checking that it's a revert with empty chain + assert!(received_notification.reverted_chain().is_some()); + assert!(received_notification.committed_chain().is_none()); } Poll::Pending | Poll::Ready(Err(_)) => { panic!("Notification should not be pending or fail") @@ -1371,16 +1362,10 @@ mod tests { vec![genesis_block.clone()], Default::default(), BTreeMap::new(), - BTreeMap::new(), )), }; let notification = ExExNotification::ChainCommitted { - new: Arc::new(Chain::new( - vec![block.clone()], - Default::default(), - BTreeMap::new(), - BTreeMap::new(), - )), + new: Arc::new(Chain::new(vec![block.clone()], Default::default(), BTreeMap::new())), }; let (finalized_headers_tx, rx) = watch::channel(None); @@ -1397,34 +1382,38 @@ mod tests { let mut cx = Context::from_waker(futures::task::noop_waker_ref()); - exex_manager - .handle() - .send(ExExNotificationSource::Pipeline, genesis_notification.clone())?; - exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?; + exex_manager.handle().send(ExExNotificationSource::Pipeline, genesis_notification)?; + exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification)?; assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending()); - assert_eq!( - notifications.try_poll_next_unpin(&mut cx)?, - Poll::Ready(Some(genesis_notification)) - ); + // Check genesis notification received + let poll_result = notifications.try_poll_next_unpin(&mut cx)?; + if let Poll::Ready(Some(n)) = poll_result { + assert_eq!(*n.committed_chain().unwrap().tip(), genesis_block); + } else { + panic!("Expected genesis notification"); + } assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending()); - assert_eq!( - notifications.try_poll_next_unpin(&mut cx)?, - Poll::Ready(Some(notification.clone())) - ); + // Check block notification received + let poll_result = notifications.try_poll_next_unpin(&mut cx)?; + if let Poll::Ready(Some(n)) = poll_result { + assert_eq!(*n.committed_chain().unwrap().tip(), block); + } else { + panic!("Expected block notification"); + } // WAL shouldn't contain the genesis notification, because it's finalized - assert_eq!( - exex_manager.wal.iter_notifications()?.collect::>>()?, - std::slice::from_ref(¬ification) - ); + let wal_notifications = + exex_manager.wal.iter_notifications()?.collect::>>()?; + assert_eq!(wal_notifications.len(), 1); + assert_eq!(*wal_notifications[0].committed_chain().unwrap().tip(), block); finalized_headers_tx.send(Some(block.clone_sealed_header()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event - assert_eq!( - exex_manager.wal.iter_notifications()?.collect::>>()?, - std::slice::from_ref(¬ification) - ); + let wal_notifications = + exex_manager.wal.iter_notifications()?.collect::>>()?; + assert_eq!(wal_notifications.len(), 1); + assert_eq!(*wal_notifications[0].committed_chain().unwrap().tip(), block); // Send a `FinishedHeight` event with a non-canonical block events_tx @@ -1435,10 +1424,10 @@ mod tests { assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a // non-canonical block - assert_eq!( - exex_manager.wal.iter_notifications()?.collect::>>()?, - std::slice::from_ref(¬ification) - ); + let wal_notifications = + exex_manager.wal.iter_notifications()?.collect::>>()?; + assert_eq!(wal_notifications.len(), 1); + assert_eq!(*wal_notifications[0].committed_chain().unwrap().tip(), block); // Send a `FinishedHeight` event with a canonical block events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap(); @@ -1446,7 +1435,7 @@ mod tests { finalized_headers_tx.send(Some(block.clone_sealed_header()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL is finalized - assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None); + assert!(exex_manager.wal.iter_notifications()?.next().is_none()); Ok(()) } @@ -1492,12 +1481,7 @@ mod tests { let mut make_notif = |id: u64| { let block = random_block(&mut rng, id, BlockParams::default()).try_recover().unwrap(); ExExNotification::ChainCommitted { - new: Arc::new(Chain::new( - vec![block], - Default::default(), - Default::default(), - Default::default(), - )), + new: Arc::new(Chain::new(vec![block], Default::default(), Default::default())), } }; diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index e32f065aba0..2b5d6d93d18 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -449,7 +449,7 @@ mod tests { use crate::Wal; use alloy_consensus::Header; use alloy_eips::BlockNumHash; - use eyre::OptionExt; + use futures::StreamExt; use reth_db_common::init::init_genesis; use reth_ethereum_primitives::Block; @@ -491,17 +491,17 @@ mod tests { let exex_head = ExExHead { block: BlockNumHash { number: genesis_block.number, hash: genesis_hash } }; + let expected_block = random_block( + &mut rng, + node_head.number + 1, + BlockParams { parent: Some(node_head.hash), ..Default::default() }, + ) + .try_recover()?; let notification = ExExNotification::ChainCommitted { new: Arc::new(Chain::new( - vec![random_block( - &mut rng, - node_head.number + 1, - BlockParams { parent: Some(node_head.hash), ..Default::default() }, - ) - .try_recover()?], + vec![expected_block.clone()], Default::default(), BTreeMap::new(), - BTreeMap::new(), )), }; @@ -519,23 +519,16 @@ mod tests { .with_head(exex_head); // First notification is the backfill of missing blocks from the canonical chain - assert_eq!( - notifications.next().await.transpose()?, - Some(ExExNotification::ChainCommitted { - new: Arc::new( - BackfillJobFactory::new( - notifications.evm_config.clone(), - notifications.provider.clone() - ) - .backfill(1..=1) - .next() - .ok_or_eyre("failed to backfill")?? - ) - }) - ); + let backfill_notification = notifications.next().await.transpose()?; + assert!(backfill_notification.is_some()); + // Verify it's a commit notification with the expected block range + let backfill_chain = backfill_notification.unwrap().committed_chain().unwrap(); + assert_eq!(backfill_chain.first().header().number(), 1); // Second notification is the actual notification that we sent before - assert_eq!(notifications.next().await.transpose()?, Some(notification)); + let received = notifications.next().await.transpose()?; + assert!(received.is_some()); + assert_eq!(*received.unwrap().committed_chain().unwrap().tip(), expected_block); Ok(()) } @@ -556,21 +549,21 @@ mod tests { let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash }; let exex_head = ExExHead { block: node_head }; + let expected_block = Block { + header: Header { + parent_hash: node_head.hash, + number: node_head.number + 1, + ..Default::default() + }, + ..Default::default() + } + .seal_slow() + .try_recover()?; let notification = ExExNotification::ChainCommitted { new: Arc::new(Chain::new( - vec![Block { - header: Header { - parent_hash: node_head.hash, - number: node_head.number + 1, - ..Default::default() - }, - ..Default::default() - } - .seal_slow() - .try_recover()?], + vec![expected_block.clone()], Default::default(), BTreeMap::new(), - BTreeMap::new(), )), }; @@ -588,7 +581,8 @@ mod tests { .with_head(exex_head); let new_notification = notifications.next().await.transpose()?; - assert_eq!(new_notification, Some(notification)); + assert!(new_notification.is_some()); + assert_eq!(*new_notification.unwrap().committed_chain().unwrap().tip(), expected_block); Ok(()) } @@ -618,7 +612,7 @@ mod tests { let provider_rw = provider.database_provider_rw()?; provider_rw.insert_block(&node_head_block)?; provider_rw.commit()?; - let node_head_notification = ExExNotification::ChainCommitted { + let _node_head_notification = ExExNotification::ChainCommitted { new: Arc::new( BackfillJobFactory::new(EthEvmConfig::mainnet(), provider.clone()) .backfill(node_head.number..=node_head.number) @@ -633,28 +627,24 @@ mod tests { BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() }, ); let exex_head = ExExHead { block: exex_head_block.num_hash() }; + let exex_head_recovered = exex_head_block.clone().try_recover()?; let exex_head_notification = ExExNotification::ChainCommitted { new: Arc::new(Chain::new( - vec![exex_head_block.clone().try_recover()?], + vec![exex_head_recovered.clone()], Default::default(), BTreeMap::new(), - BTreeMap::new(), )), }; wal.commit(&exex_head_notification)?; + let new_block = random_block( + &mut rng, + node_head.number + 1, + BlockParams { parent: Some(node_head.hash), ..Default::default() }, + ) + .try_recover()?; let new_notification = ExExNotification::ChainCommitted { - new: Arc::new(Chain::new( - vec![random_block( - &mut rng, - node_head.number + 1, - BlockParams { parent: Some(node_head.hash), ..Default::default() }, - ) - .try_recover()?], - Default::default(), - BTreeMap::new(), - BTreeMap::new(), - )), + new: Arc::new(Chain::new(vec![new_block.clone()], Default::default(), BTreeMap::new())), }; let (notifications_tx, notifications_rx) = mpsc::channel(1); @@ -672,15 +662,25 @@ mod tests { // First notification is the revert of the ExEx head block to get back to the canonical // chain + let revert_notification = notifications.next().await.transpose()?; + assert!(revert_notification.is_some()); + // Verify it's a revert with the exex_head block assert_eq!( - notifications.next().await.transpose()?, - Some(exex_head_notification.into_inverted()) + *revert_notification.unwrap().reverted_chain().unwrap().tip(), + exex_head_recovered ); // Second notification is the backfilled block from the canonical chain to get back to the // canonical tip - assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification)); + let backfill_notification = notifications.next().await.transpose()?; + assert!(backfill_notification.is_some()); + assert_eq!( + backfill_notification.unwrap().committed_chain().unwrap().tip().header().number(), + node_head.number + ); // Third notification is the actual notification that we sent before - assert_eq!(notifications.next().await.transpose()?, Some(new_notification)); + let received = notifications.next().await.transpose()?; + assert!(received.is_some()); + assert_eq!(*received.unwrap().committed_chain().unwrap().tip(), new_block); Ok(()) } @@ -706,12 +706,12 @@ mod tests { genesis_block.number + 1, BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() }, ); + let exex_head_recovered = exex_head_block.clone().try_recover()?; let exex_head_notification = ExExNotification::ChainCommitted { new: Arc::new(Chain::new( - vec![exex_head_block.clone().try_recover()?], + vec![exex_head_recovered.clone()], Default::default(), BTreeMap::new(), - BTreeMap::new(), )), }; wal.commit(&exex_head_notification)?; @@ -721,18 +721,14 @@ mod tests { block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() }, }; + let new_block = random_block( + &mut rng, + genesis_block.number + 1, + BlockParams { parent: Some(genesis_hash), ..Default::default() }, + ) + .try_recover()?; let new_notification = ExExNotification::ChainCommitted { - new: Arc::new(Chain::new( - vec![random_block( - &mut rng, - genesis_block.number + 1, - BlockParams { parent: Some(genesis_hash), ..Default::default() }, - ) - .try_recover()?], - Default::default(), - BTreeMap::new(), - BTreeMap::new(), - )), + new: Arc::new(Chain::new(vec![new_block.clone()], Default::default(), BTreeMap::new())), }; let (notifications_tx, notifications_rx) = mpsc::channel(1); @@ -750,13 +746,17 @@ mod tests { // First notification is the revert of the ExEx head block to get back to the canonical // chain + let revert_notification = notifications.next().await.transpose()?; + assert!(revert_notification.is_some()); assert_eq!( - notifications.next().await.transpose()?, - Some(exex_head_notification.into_inverted()) + *revert_notification.unwrap().reverted_chain().unwrap().tip(), + exex_head_recovered ); // Second notification is the actual notification that we sent before - assert_eq!(notifications.next().await.transpose()?, Some(new_notification)); + let received = notifications.next().await.transpose()?; + assert!(received.is_some()); + assert_eq!(*received.unwrap().committed_chain().unwrap().tip(), new_block); Ok(()) } diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 7ab49b6e0de..a59c7202b14 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -255,6 +255,36 @@ mod tests { }) } + fn notifications_equal(a: &[ExExNotification], b: &[ExExNotification]) -> bool { + if a.len() != b.len() { + return false; + } + a.iter().zip(b.iter()).all(|(n1, n2)| { + let committed_eq = match (n1.committed_chain(), n2.committed_chain()) { + (Some(c1), Some(c2)) => { + c1.tip().hash() == c2.tip().hash() && c1.blocks() == c2.blocks() + } + (None, None) => true, + _ => false, + }; + let reverted_eq = match (n1.reverted_chain(), n2.reverted_chain()) { + (Some(c1), Some(c2)) => { + c1.tip().hash() == c2.tip().hash() && c1.blocks() == c2.blocks() + } + (None, None) => true, + _ => false, + }; + committed_eq && reverted_eq + }) + } + + fn assert_notifications_eq(actual: Vec, expected: Vec) { + assert!( + notifications_equal(&actual, &expected), + "notifications mismatch:\nactual: {actual:?}\nexpected: {expected:?}" + ); + } + fn sort_committed_blocks( committed_blocks: Vec<(B256, u32, CachedBlock)>, ) -> Vec<(B256, u32, CachedBlock)> { @@ -304,37 +334,24 @@ mod tests { vec![blocks[0].clone(), blocks[1].clone()], Default::default(), BTreeMap::new(), - BTreeMap::new(), )), }; let reverted_notification = ExExNotification::ChainReverted { - old: Arc::new(Chain::new( - vec![blocks[1].clone()], - Default::default(), - BTreeMap::new(), - BTreeMap::new(), - )), + old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), BTreeMap::new())), }; let committed_notification_2 = ExExNotification::ChainCommitted { new: Arc::new(Chain::new( vec![block_1_reorged.clone(), blocks[2].clone()], Default::default(), BTreeMap::new(), - BTreeMap::new(), )), }; let reorged_notification = ExExNotification::ChainReorged { - old: Arc::new(Chain::new( - vec![blocks[2].clone()], - Default::default(), - BTreeMap::new(), - BTreeMap::new(), - )), + old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), BTreeMap::new())), new: Arc::new(Chain::new( vec![block_2_reorged.clone(), blocks[3].clone()], Default::default(), BTreeMap::new(), - BTreeMap::new(), )), }; @@ -371,7 +388,7 @@ mod tests { wal.inner.block_cache().committed_blocks_sorted(), committed_notification_1_cache_committed_blocks ); - assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]); + assert_notifications_eq(read_notifications(&wal)?, vec![committed_notification_1.clone()]); // Second notification (revert block 1) wal.commit(&reverted_notification)?; @@ -385,9 +402,9 @@ mod tests { wal.inner.block_cache().committed_blocks_sorted(), committed_notification_1_cache_committed_blocks ); - assert_eq!( + assert_notifications_eq( read_notifications(&wal)?, - vec![committed_notification_1.clone(), reverted_notification.clone()] + vec![committed_notification_1.clone(), reverted_notification.clone()], ); // Third notification (commit block 1, 2) @@ -430,13 +447,13 @@ mod tests { .concat() ) ); - assert_eq!( + assert_notifications_eq( read_notifications(&wal)?, vec![ committed_notification_1.clone(), reverted_notification.clone(), - committed_notification_2.clone() - ] + committed_notification_2.clone(), + ], ); // Fourth notification (revert block 2, commit block 2, 3) @@ -481,14 +498,14 @@ mod tests { .concat() ) ); - assert_eq!( + assert_notifications_eq( read_notifications(&wal)?, vec![ committed_notification_1, reverted_notification, committed_notification_2.clone(), - reorged_notification.clone() - ] + reorged_notification.clone(), + ], ); // Now, finalize the WAL up to the block 1. Block 1 was in the third notification that also @@ -510,9 +527,9 @@ mod tests { .concat() ) ); - assert_eq!( + assert_notifications_eq( read_notifications(&wal)?, - vec![committed_notification_2.clone(), reorged_notification.clone()] + vec![committed_notification_2.clone(), reorged_notification.clone()], ); // Re-open the WAL and verify that the cache population works correctly @@ -531,7 +548,10 @@ mod tests { .concat() ) ); - assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]); + assert_notifications_eq( + read_notifications(&wal)?, + vec![committed_notification_2, reorged_notification], + ); Ok(()) } diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index af58eba7e0b..bb118c8a98a 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -163,12 +163,16 @@ where let file_path = self.file_path(file_id); debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL"); - // Serialize using the bincode- and msgpack-compatible serde wrapper - let notification = - reth_exex_types::serde_bincode_compat::ExExNotification::::from(notification); - + // Serialize using the bincode- and msgpack-compatible serde wrapper via SerializeAs reth_fs_util::atomic_write_file(&file_path, |file| { - rmp_serde::encode::write(file, ¬ification) + use serde_with::SerializeAs; + let mut buf = Vec::new(); + reth_exex_types::serde_bincode_compat::ExExNotification::<'_, N>::serialize_as( + notification, + &mut rmp_serde::Serializer::new(&mut buf), + ) + .map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidData, err))?; + std::io::Write::write_all(file, &buf) })?; Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len()) @@ -224,8 +228,10 @@ mod tests { // Get expected data let expected_notification = get_test_notification_data().unwrap(); + // Compare by tip block since ExExNotification doesn't implement PartialEq assert_eq!( - ¬ification, &expected_notification, + *notification.committed_chain().unwrap().tip(), + *expected_notification.committed_chain().unwrap().tip(), "Decoded notification should match expected static data" ); } @@ -241,28 +247,18 @@ mod tests { let new_block = random_block(&mut rng, 0, Default::default()).try_recover()?; let notification = ExExNotification::ChainReorged { - new: Arc::new(Chain::new( - vec![new_block], - Default::default(), - BTreeMap::new(), - BTreeMap::new(), - )), - old: Arc::new(Chain::new( - vec![old_block], - Default::default(), - BTreeMap::new(), - BTreeMap::new(), - )), + new: Arc::new(Chain::new(vec![new_block.clone()], Default::default(), BTreeMap::new())), + old: Arc::new(Chain::new(vec![old_block.clone()], Default::default(), BTreeMap::new())), }; // Do a round trip serialization and deserialization let file_id = 0; storage.write_notification(file_id, ¬ification)?; let deserialized_notification = storage.read_notification(file_id)?; - assert_eq!( - deserialized_notification.map(|(notification, _)| notification), - Some(notification) - ); + // Compare by chain tips since ExExNotification doesn't implement PartialEq + let deserialized = deserialized_notification.map(|(n, _)| n).unwrap(); + assert_eq!(*deserialized.committed_chain().unwrap().tip(), new_block); + assert_eq!(*deserialized.reverted_chain().unwrap().tip(), old_block); Ok(()) } @@ -280,10 +276,14 @@ mod tests { let notification = get_test_notification_data()?; - // Serialize the notification - let notification_compat = - reth_exex_types::serde_bincode_compat::ExExNotification::from(¬ification); - let encoded = rmp_serde::encode::to_vec(¬ification_compat)?; + // Create a temp storage and write the notification using the existing serialization path + let temp_dir = tempfile::tempdir()?; + let storage = Storage::new(&temp_dir)?; + storage.write_notification(0, ¬ification)?; + + // Read it back as raw bytes + let temp_path = temp_dir.path().join("0.wal"); + let encoded = std::fs::read(&temp_path)?; // Write to test-data directory let test_data_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("test-data"); @@ -346,13 +346,18 @@ mod tests { )]), }; + let trie_data = + reth_chain_state::DeferredTrieData::ready(reth_chain_state::ComputedTrieData { + hashed_state: Arc::new(hashed_state.into_sorted()), + trie_updates: Arc::new(trie_updates.into_sorted()), + anchored_trie_input: None, + }); let notification: ExExNotification = ExExNotification::ChainCommitted { new: Arc::new(Chain::new( vec![block], Default::default(), - BTreeMap::from([(block_number, Arc::new(trie_updates.into_sorted()))]), - BTreeMap::from([(block_number, Arc::new(hashed_state.into_sorted()))]), + BTreeMap::from([(block_number, trie_data)]), )), }; Ok(notification) diff --git a/crates/exex/test-utils/Cargo.toml b/crates/exex/test-utils/Cargo.toml index 80ce4167e46..39b116e6786 100644 --- a/crates/exex/test-utils/Cargo.toml +++ b/crates/exex/test-utils/Cargo.toml @@ -12,13 +12,13 @@ workspace = true [dependencies] ## reth +reth-chain.workspace = true reth-chainspec.workspace = true reth-config.workspace = true reth-consensus = { workspace = true, features = ["test-utils"] } reth-db = { workspace = true, features = ["test-utils"] } reth-db-common.workspace = true reth-evm-ethereum = { workspace = true, features = ["test-utils"] } -reth-execution-types.workspace = true reth-exex.workspace = true reth-payload-builder.workspace = true reth-network.workspace = true diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index 8430ea5d91f..d6d112bf88f 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -17,6 +17,7 @@ use std::{ use alloy_eips::BlockNumHash; use futures_util::FutureExt; +use reth_chain::Chain; use reth_chainspec::{ChainSpec, MAINNET}; use reth_consensus::test_utils::TestConsensus; use reth_db::{ @@ -28,7 +29,6 @@ use reth_db::{ use reth_db_common::init::init_genesis; use reth_ethereum_primitives::{EthPrimitives, TransactionSigned}; use reth_evm_ethereum::MockEvmConfig; -use reth_execution_types::Chain; use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications, Wal}; use reth_network::{config::rng_secret_key, NetworkConfigBuilder, NetworkManager}; use reth_node_api::{ diff --git a/crates/exex/types/Cargo.toml b/crates/exex/types/Cargo.toml index 11dec0246fe..fadbf9c2abd 100644 --- a/crates/exex/types/Cargo.toml +++ b/crates/exex/types/Cargo.toml @@ -13,8 +13,8 @@ workspace = true [dependencies] # reth +reth-chain.workspace = true reth-chain-state.workspace = true -reth-execution-types.workspace = true reth-primitives-traits.workspace = true # reth @@ -36,7 +36,7 @@ rand.workspace = true default = [] serde = [ "dep:serde", - "reth-execution-types/serde", + "reth-chain/serde", "alloy-eips/serde", "alloy-primitives/serde", "rand/serde", @@ -45,7 +45,7 @@ serde = [ "reth-chain-state/serde", ] serde-bincode-compat = [ - "reth-execution-types/serde-bincode-compat", + "reth-chain/serde-bincode-compat", "serde_with", "alloy-eips/serde-bincode-compat", "reth-primitives-traits/serde-bincode-compat", diff --git a/crates/exex/types/src/notification.rs b/crates/exex/types/src/notification.rs index 4813450a010..e076540aec7 100644 --- a/crates/exex/types/src/notification.rs +++ b/crates/exex/types/src/notification.rs @@ -1,12 +1,13 @@ use std::sync::Arc; +use reth_chain::Chain; use reth_chain_state::CanonStateNotification; -use reth_execution_types::Chain; use reth_primitives_traits::NodePrimitives; /// Notifications sent to an `ExEx`. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "serde", serde(bound = ""))] pub enum ExExNotification { /// Chain got committed without a reorg, and only the new chain is returned. ChainCommitted { @@ -73,7 +74,7 @@ impl From> for ExExNotification

/// Bincode-compatible [`ExExNotification`] serde implementation. #[cfg(all(feature = "serde", feature = "serde-bincode-compat"))] pub(super) mod serde_bincode_compat { - use reth_execution_types::serde_bincode_compat::Chain; + use reth_chain::serde_bincode_compat::Chain; use reth_primitives_traits::NodePrimitives; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::{DeserializeAs, SerializeAs}; @@ -124,28 +125,6 @@ pub(super) mod serde_bincode_compat { }, } - impl<'a, N> From<&'a super::ExExNotification> for ExExNotification<'a, N> - where - N: NodePrimitives, - { - fn from(value: &'a super::ExExNotification) -> Self { - match value { - super::ExExNotification::ChainCommitted { new } => { - ExExNotification::ChainCommitted { new: Chain::from(new.as_ref()) } - } - super::ExExNotification::ChainReorged { old, new } => { - ExExNotification::ChainReorged { - old: Chain::from(old.as_ref()), - new: Chain::from(new.as_ref()), - } - } - super::ExExNotification::ChainReverted { old } => { - ExExNotification::ChainReverted { old: Chain::from(old.as_ref()) } - } - } - } - } - impl<'a, N> From> for super::ExExNotification where N: NodePrimitives, @@ -176,7 +155,41 @@ pub(super) mod serde_bincode_compat { where S: Serializer, { - ExExNotification::from(source).serialize(serializer) + // Helper that uses Chain's SerializeAs for bincode-compatible serialization + struct ChainWrapper<'a, N: NodePrimitives>(&'a reth_chain::Chain); + + impl Serialize for ChainWrapper<'_, N> { + fn serialize(&self, serializer: S2) -> Result + where + S2: Serializer, + { + Chain::<'_, N>::serialize_as(self.0, serializer) + } + } + + // Create an enum that matches the ExExNotification structure but uses ChainWrapper + #[derive(Serialize)] + #[serde(bound = "")] + #[allow(clippy::enum_variant_names)] + enum Repr<'a, N: NodePrimitives> { + ChainCommitted { new: ChainWrapper<'a, N> }, + ChainReorged { old: ChainWrapper<'a, N>, new: ChainWrapper<'a, N> }, + ChainReverted { old: ChainWrapper<'a, N> }, + } + + match source { + super::ExExNotification::ChainCommitted { new } => { + Repr::ChainCommitted { new: ChainWrapper(new.as_ref()) }.serialize(serializer) + } + super::ExExNotification::ChainReorged { old, new } => Repr::ChainReorged { + old: ChainWrapper(old.as_ref()), + new: ChainWrapper(new.as_ref()), + } + .serialize(serializer), + super::ExExNotification::ChainReverted { old } => { + Repr::ChainReverted { old: ChainWrapper(old.as_ref()) }.serialize(serializer) + } + } } } @@ -197,7 +210,7 @@ pub(super) mod serde_bincode_compat { use super::super::{serde_bincode_compat, ExExNotification}; use arbitrary::Arbitrary; use rand::Rng; - use reth_execution_types::Chain; + use reth_chain::Chain; use reth_primitives_traits::RecoveredBlock; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -206,7 +219,7 @@ pub(super) mod serde_bincode_compat { #[test] fn test_exex_notification_bincode_roundtrip() { #[serde_as] - #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] + #[derive(Debug, Serialize, Deserialize)] struct Data { #[serde_as( as = "serde_bincode_compat::ExExNotification<'_, reth_ethereum_primitives::EthPrimitives>" @@ -216,28 +229,34 @@ pub(super) mod serde_bincode_compat { let mut bytes = [0u8; 1024]; rand::rng().fill(bytes.as_mut_slice()); + let old_block: reth_primitives_traits::RecoveredBlock = + RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes)).unwrap(); + let new_block: reth_primitives_traits::RecoveredBlock = + RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes)).unwrap(); + let data = Data { notification: ExExNotification::ChainReorged { - old: Arc::new(Chain::new( - vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes)) - .unwrap()], - Default::default(), - BTreeMap::new(), - BTreeMap::new(), - )), - new: Arc::new(Chain::new( - vec![RecoveredBlock::arbitrary(&mut arbitrary::Unstructured::new(&bytes)) - .unwrap()], - Default::default(), - BTreeMap::new(), - BTreeMap::new(), - )), + old: Arc::new(Chain::new(vec![old_block], Default::default(), BTreeMap::new())), + new: Arc::new(Chain::new(vec![new_block], Default::default(), BTreeMap::new())), }, }; let encoded = bincode::serialize(&data).unwrap(); let decoded: Data = bincode::deserialize(&encoded).unwrap(); - assert_eq!(decoded, data); + + // Compare fields individually since Chain doesn't implement PartialEq + match (&decoded.notification, &data.notification) { + ( + ExExNotification::ChainReorged { old: decoded_old, new: decoded_new }, + ExExNotification::ChainReorged { old: expected_old, new: expected_new }, + ) => { + assert_eq!(decoded_old.blocks(), expected_old.blocks()); + assert_eq!(decoded_old.execution_outcome(), expected_old.execution_outcome()); + assert_eq!(decoded_new.blocks(), expected_new.blocks()); + assert_eq!(decoded_new.execution_outcome(), expected_new.execution_outcome()); + } + _ => panic!("Expected ChainReorged variant"), + } } } } diff --git a/crates/optimism/evm/Cargo.toml b/crates/optimism/evm/Cargo.toml index 724f8555e09..4bbd87c6dfb 100644 --- a/crates/optimism/evm/Cargo.toml +++ b/crates/optimism/evm/Cargo.toml @@ -44,6 +44,7 @@ op-revm.workspace = true thiserror.workspace = true [dev-dependencies] +reth-chain.workspace = true reth-evm = { workspace = true, features = ["test-utils"] } reth-revm = { workspace = true, features = ["test-utils"] } alloy-genesis.workspace = true diff --git a/crates/optimism/evm/src/lib.rs b/crates/optimism/evm/src/lib.rs index 1dbd8c7e385..f8cde24b5ed 100644 --- a/crates/optimism/evm/src/lib.rs +++ b/crates/optimism/evm/src/lib.rs @@ -295,11 +295,10 @@ mod tests { use alloy_genesis::Genesis; use alloy_primitives::{bytes, map::HashMap, Address, LogData, B256}; use op_revm::OpSpecId; + use reth_chain::Chain; use reth_chainspec::ChainSpec; use reth_evm::execute::ProviderError; - use reth_execution_types::{ - AccountRevertInit, BundleStateInit, Chain, ExecutionOutcome, RevertsInit, - }; + use reth_execution_types::{AccountRevertInit, BundleStateInit, ExecutionOutcome, RevertsInit}; use reth_optimism_chainspec::{OpChainSpec, BASE_MAINNET}; use reth_optimism_primitives::{OpBlock, OpPrimitives, OpReceipt}; use reth_primitives_traits::{Account, RecoveredBlock}; @@ -529,12 +528,8 @@ mod tests { // Create a Chain object with a BTreeMap of blocks mapped to their block numbers, // including block1_hash and block2_hash, and the execution_outcome - let chain: Chain = Chain::new( - [block1, block2], - execution_outcome.clone(), - BTreeMap::new(), - BTreeMap::new(), - ); + let chain: Chain = + Chain::new([block1, block2], execution_outcome.clone(), BTreeMap::new()); // Assert that the proper receipt vector is returned for block1_hash assert_eq!(chain.receipts_by_block_hash(block1_hash), Some(vec![&receipt1])); diff --git a/crates/rpc/rpc-eth-types/Cargo.toml b/crates/rpc/rpc-eth-types/Cargo.toml index ab0855bf4f6..222440539c4 100644 --- a/crates/rpc/rpc-eth-types/Cargo.toml +++ b/crates/rpc/rpc-eth-types/Cargo.toml @@ -12,11 +12,11 @@ description = "Types supporting implementation of 'eth' namespace RPC server API workspace = true [dependencies] +reth-chain.workspace = true reth-chainspec.workspace = true reth-chain-state.workspace = true reth-errors.workspace = true reth-evm.workspace = true -reth-execution-types.workspace = true reth-metrics.workspace = true reth-ethereum-primitives = { workspace = true, features = ["rpc"] } reth-primitives-traits = { workspace = true, features = ["rpc-compat"] } diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 73d8072e6d8..16d35028295 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -5,9 +5,9 @@ use alloy_consensus::BlockHeader; use alloy_eips::BlockHashOrNumber; use alloy_primitives::B256; use futures::{stream::FuturesOrdered, Stream, StreamExt}; +use reth_chain::Chain; use reth_chain_state::CanonStateNotification; use reth_errors::{ProviderError, ProviderResult}; -use reth_execution_types::Chain; use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock}; use reth_storage_api::{BlockReader, TransactionVariant}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; diff --git a/crates/stages/stages/Cargo.toml b/crates/stages/stages/Cargo.toml index 470a84a825b..45e1e5ff5fc 100644 --- a/crates/stages/stages/Cargo.toml +++ b/crates/stages/stages/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] # reth +reth-chain.workspace = true reth-chainspec = { workspace = true, optional = true } reth-codecs.workspace = true reth-config.workspace = true @@ -29,7 +30,6 @@ reth-fs-util.workspace = true reth-network-p2p.workspace = true reth-primitives-traits = { workspace = true, features = ["serde-bincode-compat"] } reth-provider.workspace = true -reth-execution-types.workspace = true reth-ethereum-primitives = { workspace = true, optional = true } reth-prune.workspace = true reth-prune-types.workspace = true diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 29adf3b2d3f..a00b0780f74 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -2,11 +2,11 @@ use crate::stages::MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD; use alloy_consensus::BlockHeader; use alloy_primitives::BlockNumber; use num_traits::Zero; +use reth_chain::Chain; use reth_config::config::ExecutionConfig; use reth_consensus::FullConsensus; use reth_db::{static_file::HeaderMask, tables}; use reth_evm::{execute::Executor, metrics::ExecutorMetrics, ConfigureEvm}; -use reth_execution_types::Chain; use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource}; use reth_primitives_traits::{format_gas_throughput, BlockBody, NodePrimitives}; use reth_provider::{ @@ -423,7 +423,6 @@ where blocks, state.clone(), BTreeMap::new(), - BTreeMap::new(), )); if previous_input.is_some() { @@ -525,7 +524,6 @@ where blocks, bundle_state_with_receipts, BTreeMap::new(), - BTreeMap::new(), )); debug_assert!( diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 0199b6d2fc4..7227d618cf9 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] # reth +reth-chain.workspace = true reth-chainspec.workspace = true reth-execution-types.workspace = true reth-ethereum-primitives = { workspace = true, features = ["reth-codec"] } @@ -86,6 +87,15 @@ tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] } [features] rocksdb = ["dep:rocksdb"] +serde-bincode-compat = [ + "reth-chain/serde-bincode-compat", + "alloy-consensus/serde-bincode-compat", + "alloy-eips/serde-bincode-compat", + "reth-ethereum-primitives/serde-bincode-compat", + "reth-execution-types/serde-bincode-compat", + "reth-primitives-traits/serde-bincode-compat", + "reth-storage-api/serde-bincode-compat", +] test-utils = [ "reth-db/test-utils", "reth-nippy-jar/test-utils", diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index bfab44cb2ac..6c587cc2bed 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -35,11 +35,20 @@ pub mod test_utils; pub mod either_writer; pub use either_writer::*; +#[cfg(feature = "serde-bincode-compat")] +pub use reth_chain::serde_bincode_compat; +pub use reth_chain::{ + AnchoredTrieInput, BlockReceipts, Chain, ChainBlocks, ComputedTrieData, DeferredTrieData, + DisplayBlocksChain, +}; pub use reth_chain_state::{ CanonStateNotification, CanonStateNotificationSender, CanonStateNotificationStream, CanonStateNotifications, CanonStateSubscriptions, }; -pub use reth_execution_types::*; +pub use reth_execution_types::{ + AccountRevertInit, BlockExecutionOutput, BlockExecutionResult, BundleStateInit, ChangedAccount, + ExecutionOutcome, RevertsInit, +}; /// Re-export `OriginalValuesKnown` pub use revm_database::states::OriginalValuesKnown; // reexport traits to avoid breaking changes diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 58ec1e25571..dcd95f3de23 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -782,6 +782,7 @@ mod tests { use alloy_primitives::{BlockNumber, TxNumber, B256}; use itertools::Itertools; use rand::Rng; + use reth_chain::Chain; use reth_chain_state::{ test_utils::TestBlockBuilder, CanonStateNotification, CanonStateSubscriptions, CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain, @@ -790,9 +791,7 @@ mod tests { use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices}; use reth_errors::ProviderError; use reth_ethereum_primitives::{Block, Receipt}; - use reth_execution_types::{ - BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome, - }; + use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome}; use reth_primitives_traits::{RecoveredBlock, SealedBlock, SignerRecoverable}; use reth_storage_api::{ BlockBodyIndicesProvider, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, @@ -1348,33 +1347,33 @@ mod tests { // Send and receive commit notifications. let block_2 = test_block_builder.generate_random_block(1, block_hash_1).try_recover()?; - let chain = Chain::new( - vec![block_2], - ExecutionOutcome::default(), - BTreeMap::new(), - BTreeMap::new(), - ); + let chain = Chain::new(vec![block_2.clone()], ExecutionOutcome::default(), BTreeMap::new()); let commit = CanonStateNotification::Commit { new: Arc::new(chain.clone()) }; in_memory_state.notify_canon_state(commit.clone()); let (notification_1, notification_2) = tokio::join!(rx_1.recv(), rx_2.recv()); - assert_eq!(notification_1, Ok(commit.clone())); - assert_eq!(notification_2, Ok(commit.clone())); + // Verify both subscribers received commit notifications with matching tip + let n1 = notification_1.unwrap(); + let n2 = notification_2.unwrap(); + assert_eq!(*n1.tip(), block_2); + assert_eq!(*n2.tip(), block_2); // Send and receive re-org notifications. let block_3 = test_block_builder.generate_random_block(1, block_hash_1).try_recover()?; let block_4 = test_block_builder.generate_random_block(2, block_3.hash()).try_recover()?; let new_chain = Chain::new( - vec![block_3, block_4], + vec![block_3, block_4.clone()], ExecutionOutcome::default(), BTreeMap::new(), - BTreeMap::new(), ); let re_org = CanonStateNotification::Reorg { old: Arc::new(chain), new: Arc::new(new_chain) }; in_memory_state.notify_canon_state(re_org.clone()); let (notification_1, notification_2) = tokio::join!(rx_1.recv(), rx_2.recv()); - assert_eq!(notification_1, Ok(re_org.clone())); - assert_eq!(notification_2, Ok(re_org.clone())); + // Verify both subscribers received reorg notifications with matching tip + let n1 = notification_1.unwrap(); + let n2 = notification_2.unwrap(); + assert_eq!(*n1.tip(), block_4); + assert_eq!(*n2.tip(), block_4); Ok(()) } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index af644a47a9b..b2d911bc5a4 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -33,6 +33,7 @@ use alloy_primitives::{ use itertools::Itertools; use parking_lot::RwLock; use rayon::slice::ParallelSliceMut; +use reth_chain::Chain; use reth_chain_state::{ComputedTrieData, ExecutedBlock}; use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec}; use reth_db_api::{ @@ -47,7 +48,7 @@ use reth_db_api::{ transaction::{DbTx, DbTxMut}, BlockNumberList, PlainAccountState, PlainStorageState, }; -use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, Chain, ExecutionOutcome}; +use reth_execution_types::{BlockExecutionOutput, BlockExecutionResult, ExecutionOutcome}; use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy}; use reth_primitives_traits::{ Account, Block as _, BlockBody as _, Bytecode, RecoveredBlock, SealedHeader, StorageEntry, @@ -3076,7 +3077,7 @@ impl BlockExecutionWriter // Update pipeline progress self.update_pipeline_stages(block, true)?; - Ok(Chain::new(blocks, execution_state, BTreeMap::new(), BTreeMap::new())) + Ok(Chain::new(blocks, execution_state, BTreeMap::new())) } fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> { diff --git a/crates/storage/storage-api/Cargo.toml b/crates/storage/storage-api/Cargo.toml index 83cbbbd714e..0786e55aa45 100644 --- a/crates/storage/storage-api/Cargo.toml +++ b/crates/storage/storage-api/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] # reth +reth-chain.workspace = true reth-db-models.workspace = true reth-chainspec.workspace = true reth-db-api = { workspace = true, optional = true } @@ -60,6 +61,7 @@ db-api = [ ] serde = [ + "reth-chain/serde", "reth-ethereum-primitives/serde", "reth-db-models/serde", "reth-execution-types/serde", @@ -78,6 +80,7 @@ serde-bincode-compat = [ "reth-execution-types/serde-bincode-compat", "reth-primitives-traits/serde-bincode-compat", "reth-trie-common/serde-bincode-compat", + "reth-chain/serde-bincode-compat", "reth-ethereum-primitives/serde-bincode-compat", "alloy-eips/serde-bincode-compat", "alloy-consensus/serde-bincode-compat", diff --git a/crates/storage/storage-api/src/block_writer.rs b/crates/storage/storage-api/src/block_writer.rs index 233e9898d11..5124ff1676a 100644 --- a/crates/storage/storage-api/src/block_writer.rs +++ b/crates/storage/storage-api/src/block_writer.rs @@ -1,8 +1,9 @@ use crate::NodePrimitivesProvider; use alloc::vec::Vec; use alloy_primitives::BlockNumber; +use reth_chain::Chain; use reth_db_models::StoredBlockBodyIndices; -use reth_execution_types::{Chain, ExecutionOutcome}; +use reth_execution_types::ExecutionOutcome; use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock}; use reth_storage_errors::provider::ProviderResult; use reth_trie_common::HashedPostStateSorted; diff --git a/crates/transaction-pool/Cargo.toml b/crates/transaction-pool/Cargo.toml index 02030719840..bb4f9ba310b 100644 --- a/crates/transaction-pool/Cargo.toml +++ b/crates/transaction-pool/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] # reth +reth-chain.workspace = true reth-chain-state.workspace = true reth-ethereum-primitives.workspace = true reth-chainspec.workspace = true @@ -90,6 +91,7 @@ serde = [ "revm-primitives/serde", "reth-primitives-traits/serde", "reth-ethereum-primitives/serde", + "reth-chain/serde", "reth-chain-state/serde", "reth-storage-api/serde", ] diff --git a/crates/transaction-pool/src/blobstore/tracker.rs b/crates/transaction-pool/src/blobstore/tracker.rs index 44bd772cf1d..6edb41a6a23 100644 --- a/crates/transaction-pool/src/blobstore/tracker.rs +++ b/crates/transaction-pool/src/blobstore/tracker.rs @@ -3,7 +3,7 @@ use alloy_consensus::Typed2718; use alloy_eips::eip2718::Encodable2718; use alloy_primitives::{BlockNumber, B256}; -use reth_execution_types::ChainBlocks; +use reth_chain::ChainBlocks; use reth_primitives_traits::{Block, BlockBody, SignedTransaction}; use std::collections::BTreeMap; @@ -91,8 +91,8 @@ mod tests { use super::*; use alloy_consensus::{Header, Signed}; use alloy_primitives::Signature; + use reth_chain::Chain; use reth_ethereum_primitives::Transaction; - use reth_execution_types::Chain; use reth_primitives_traits::{RecoveredBlock, SealedBlock, SealedHeader}; #[test] @@ -175,8 +175,7 @@ mod tests { ); // Extract blocks from the chain - let chain: Chain = - Chain::new(vec![block1, block2], Default::default(), BTreeMap::new(), BTreeMap::new()); + let chain: Chain = Chain::new(vec![block1, block2], Default::default(), BTreeMap::new()); let blocks = chain.into_inner().0; // Add new chain blocks to the tracker