diff --git a/crates/engine/tree/src/tree/payload_processor/configured_sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/configured_sparse_trie.rs index d59f14c796a..176cffcd8fa 100644 --- a/crates/engine/tree/src/tree/payload_processor/configured_sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/configured_sparse_trie.rs @@ -14,7 +14,7 @@ use std::borrow::Cow; /// This type allows runtime selection between different sparse trie implementations, /// providing flexibility in choosing the appropriate implementation based on workload /// characteristics. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) enum ConfiguredSparseTrie { /// Serial implementation of the sparse trie. Serial(Box), diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 6c298d76255..e7ccc47e1b4 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -33,8 +33,9 @@ use reth_trie_parallel::{ }; use reth_trie_sparse::{ provider::{TrieNodeProvider, TrieNodeProviderFactory}, - ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie, + ClearedSparseStateTrie, SparseStateTrie, SparseTrie, }; +use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds}; use std::sync::{ atomic::AtomicBool, mpsc::{self, channel, Sender}, @@ -51,6 +52,14 @@ pub mod sparse_trie; use configured_sparse_trie::ConfiguredSparseTrie; +/// Default parallelism thresholds to use with the [`ParallelSparseTrie`]. +/// +/// These values were determined by performing benchmarks using gradually increasing values to judge +/// the affects. Below 100 throughput would generally be equal or slightly less, while above 150 it +/// would deteriorate to the point where PST might as well not be used. +pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds = + ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 }; + /// Entrypoint for executing the payload. #[derive(Debug)] pub struct PayloadProcessor @@ -76,7 +85,9 @@ where /// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so /// that allocations can be minimized. sparse_state_trie: Arc< - parking_lot::Mutex>>, + parking_lot::Mutex< + Option>, + >, >, /// Whether to use the parallel sparse trie. disable_parallel_sparse_trie: bool, @@ -363,20 +374,24 @@ where // there's none to reuse. let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie); let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| { - let accounts_trie = if self.disable_parallel_sparse_trie { + let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie { ConfiguredSparseTrie::Serial(Default::default()) } else { - ConfiguredSparseTrie::Parallel(Default::default()) - }; + ConfiguredSparseTrie::Parallel(Box::new( + ParallelSparseTrie::default() + .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS), + )) + }); ClearedSparseStateTrie::from_state_trie( SparseStateTrie::new() - .with_accounts_trie(SparseTrie::Blind(Some(Box::new(accounts_trie)))) + .with_accounts_trie(default_trie.clone()) + .with_default_storage_trie(default_trie) .with_updates(true), ) }); let task = - SparseTrieTask::<_, ConfiguredSparseTrie, SerialSparseTrie>::new_with_cleared_trie( + SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie( sparse_trie_rx, proof_task_handle, self.trie_metrics.clone(), diff --git a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs index b458d7d58ea..65101ca7f0e 100644 --- a/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs +++ b/crates/engine/tree/src/tree/payload_processor/sparse_trie.rs @@ -39,7 +39,7 @@ where BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync, BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync, A: SparseTrieInterface + Send + Sync + Default, - S: SparseTrieInterface + Send + Sync + Default, + S: SparseTrieInterface + Send + Sync + Default + Clone, { /// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`]. pub(super) fn new_with_cleared_trie( @@ -140,7 +140,7 @@ where BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync, BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync, A: SparseTrieInterface + Send + Sync + Default, - S: SparseTrieInterface + Send + Sync + Default, + S: SparseTrieInterface + Send + Sync + Default + Clone, { trace!(target: "engine::root::sparse", "Updating sparse trie"); let started_at = Instant::now(); diff --git a/crates/trie/sparse-parallel/src/trie.rs b/crates/trie/sparse-parallel/src/trie.rs index 7523baf9a4b..908253c7a3e 100644 --- a/crates/trie/sparse-parallel/src/trie.rs +++ b/crates/trie/sparse-parallel/src/trie.rs @@ -30,6 +30,18 @@ pub const UPPER_TRIE_MAX_DEPTH: usize = 2; /// Number of lower subtries which are managed by the [`ParallelSparseTrie`]. pub const NUM_LOWER_SUBTRIES: usize = 16usize.pow(UPPER_TRIE_MAX_DEPTH as u32); +/// Configuration for controlling when parallelism is enabled in [`ParallelSparseTrie`] operations. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct ParallelismThresholds { + /// Minimum number of nodes to reveal before parallel processing is enabled. + /// When `reveal_nodes` has fewer nodes than this threshold, they will be processed serially. + pub min_revealed_nodes: usize, + /// Minimum number of changed keys (prefix set length) before parallel processing is enabled + /// for hash updates. When updating subtrie hashes with fewer changed keys than this threshold, + /// the updates will be processed serially. + pub min_updated_nodes: usize, +} + /// A revealed sparse trie with subtries that can be updated in parallel. /// /// ## Structure @@ -109,6 +121,8 @@ pub struct ParallelSparseTrie { /// Reusable buffer pool used for collecting [`SparseTrieUpdatesAction`]s during hash /// computations. update_actions_buffers: Vec>, + /// Thresholds controlling when parallelism is enabled for different operations. + parallelism_thresholds: ParallelismThresholds, /// Metrics for the parallel sparse trie. #[cfg(feature = "metrics")] metrics: crate::metrics::ParallelSparseTrieMetrics, @@ -127,6 +141,7 @@ impl Default for ParallelSparseTrie { branch_node_tree_masks: HashMap::default(), branch_node_hash_masks: HashMap::default(), update_actions_buffers: Vec::default(), + parallelism_thresholds: Default::default(), #[cfg(feature = "metrics")] metrics: Default::default(), } @@ -200,19 +215,20 @@ impl SparseTrieInterface for ParallelSparseTrie { self.reveal_upper_node(node.path, &node.node, node.masks)?; } - #[cfg(not(feature = "std"))] - // Reveal lower subtrie nodes serially if nostd - { + if !self.is_reveal_parallelism_enabled(lower_nodes.len()) { for node in lower_nodes { if let Some(subtrie) = self.lower_subtrie_for_path_mut(&node.path) { - subtrie.reveal_node(node.path, &node.node, &node.masks)?; + subtrie.reveal_node(node.path, &node.node, node.masks)?; } else { panic!("upper subtrie node {node:?} found amongst lower nodes"); } } - Ok(()) + return Ok(()) } + #[cfg(not(feature = "std"))] + unreachable!("nostd is checked by is_reveal_parallelism_enabled"); + #[cfg(feature = "std")] // Reveal lower subtrie nodes in parallel { @@ -725,76 +741,62 @@ impl SparseTrieInterface for ParallelSparseTrie { // Take changed subtries according to the prefix set let mut prefix_set = core::mem::take(&mut self.prefix_set).freeze(); - let (subtries, unchanged_prefix_set) = self.take_changed_lower_subtries(&mut prefix_set); + let num_changed_keys = prefix_set.len(); + let (mut changed_subtries, unchanged_prefix_set) = + self.take_changed_lower_subtries(&mut prefix_set); // update metrics #[cfg(feature = "metrics")] - self.metrics.subtries_updated.record(subtries.len() as f64); + self.metrics.subtries_updated.record(changed_subtries.len() as f64); // Update the prefix set with the keys that didn't have matching subtries self.prefix_set = unchanged_prefix_set; - let (tx, rx) = mpsc::channel(); + // Update subtrie hashes serially parallelism is not enabled + if !self.is_update_parallelism_enabled(num_changed_keys) { + for changed_subtrie in &mut changed_subtries { + changed_subtrie.subtrie.update_hashes( + &mut changed_subtrie.prefix_set, + &mut changed_subtrie.update_actions_buf, + &self.branch_node_tree_masks, + &self.branch_node_hash_masks, + ); + } - #[cfg(not(feature = "std"))] - // Update subtrie hashes serially if nostd - for ChangedSubtrie { index, mut subtrie, mut prefix_set, mut update_actions_buf } in - subtries - { - subtrie.update_hashes( - &mut prefix_set, - &mut update_actions_buf, - &self.branch_node_tree_masks, - &self.branch_node_hash_masks, - ); - tx.send((index, subtrie, update_actions_buf)).unwrap(); + self.insert_changed_subtries(changed_subtries); + return } + #[cfg(not(feature = "std"))] + unreachable!("nostd is checked by is_update_parallelism_enabled"); + #[cfg(feature = "std")] // Update subtrie hashes in parallel { use rayon::iter::{IntoParallelIterator, ParallelIterator}; + let (tx, rx) = mpsc::channel(); + let branch_node_tree_masks = &self.branch_node_tree_masks; let branch_node_hash_masks = &self.branch_node_hash_masks; - subtries + changed_subtries .into_par_iter() - .map( - |ChangedSubtrie { - index, - mut subtrie, - mut prefix_set, - mut update_actions_buf, - }| { - #[cfg(feature = "metrics")] - let start = std::time::Instant::now(); - subtrie.update_hashes( - &mut prefix_set, - &mut update_actions_buf, - branch_node_tree_masks, - branch_node_hash_masks, - ); - #[cfg(feature = "metrics")] - self.metrics.subtrie_hash_update_latency.record(start.elapsed()); - (index, subtrie, update_actions_buf) - }, - ) + .map(|mut changed_subtrie| { + #[cfg(feature = "metrics")] + let start = std::time::Instant::now(); + changed_subtrie.subtrie.update_hashes( + &mut changed_subtrie.prefix_set, + &mut changed_subtrie.update_actions_buf, + branch_node_tree_masks, + branch_node_hash_masks, + ); + #[cfg(feature = "metrics")] + self.metrics.subtrie_hash_update_latency.record(start.elapsed()); + changed_subtrie + }) .for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap()); - } - drop(tx); - - // Return updated subtries back to the trie after executing any actions required on the - // top-level `SparseTrieUpdates`. - for (index, subtrie, update_actions_buf) in rx { - if let Some(mut update_actions_buf) = update_actions_buf { - self.apply_subtrie_update_actions( - #[allow(clippy::iter_with_drain)] - update_actions_buf.drain(..), - ); - self.update_actions_buffers.push(update_actions_buf); - } - - self.lower_subtries[index] = LowerSparseSubtrie::Revealed(subtrie); + drop(tx); + self.insert_changed_subtries(rx); } } @@ -896,11 +898,35 @@ impl SparseTrieInterface for ParallelSparseTrie { } impl ParallelSparseTrie { + /// Sets the thresholds that control when parallelism is used during operations. + pub const fn with_parallelism_thresholds(mut self, thresholds: ParallelismThresholds) -> Self { + self.parallelism_thresholds = thresholds; + self + } + /// Returns true if retaining updates is enabled for the overall trie. const fn updates_enabled(&self) -> bool { self.updates.is_some() } + /// Returns true if parallelism should be enabled for revealing the given number of nodes. + /// Will always return false in nostd builds. + const fn is_reveal_parallelism_enabled(&self, num_nodes: usize) -> bool { + #[cfg(not(feature = "std"))] + return false; + + num_nodes >= self.parallelism_thresholds.min_revealed_nodes + } + + /// Returns true if parallelism should be enabled for updating hashes with the given number + /// of changed keys. Will always return false in nostd builds. + const fn is_update_parallelism_enabled(&self, num_changed_keys: usize) -> bool { + #[cfg(not(feature = "std"))] + return false; + + num_changed_keys >= self.parallelism_thresholds.min_updated_nodes + } + /// Creates a new revealed sparse trie from the given root node. /// /// This function initializes the internal structures and then reveals the root. @@ -1310,6 +1336,12 @@ impl ParallelSparseTrie { &mut self, prefix_set: &mut PrefixSet, ) -> (Vec, PrefixSetMut) { + // Fast-path: If the prefix set is empty then no subtries can have been changed. Just return + // empty values. + if prefix_set.is_empty() && !prefix_set.all() { + return Default::default(); + } + // Clone the prefix set to iterate over its keys. Cloning is cheap, it's just an Arc. let prefix_set_clone = prefix_set.clone(); let mut prefix_set_iter = prefix_set_clone.into_iter().copied().peekable(); @@ -1453,6 +1485,25 @@ impl ParallelSparseTrie { Ok(()) } + + /// Return updated subtries back to the trie after executing any actions required on the + /// top-level `SparseTrieUpdates`. + fn insert_changed_subtries( + &mut self, + changed_subtries: impl IntoIterator, + ) { + for ChangedSubtrie { index, subtrie, update_actions_buf, .. } in changed_subtries { + if let Some(mut update_actions_buf) = update_actions_buf { + self.apply_subtrie_update_actions( + #[allow(clippy::iter_with_drain)] + update_actions_buf.drain(..), + ); + self.update_actions_buffers.push(update_actions_buf); + } + + self.lower_subtries[index] = LowerSparseSubtrie::Revealed(subtrie); + } + } } /// This is a subtrie of the [`ParallelSparseTrie`] that contains a map from path to sparse trie diff --git a/crates/trie/sparse/src/state.rs b/crates/trie/sparse/src/state.rs index 0071811f9bc..fde4810da57 100644 --- a/crates/trie/sparse/src/state.rs +++ b/crates/trie/sparse/src/state.rs @@ -30,8 +30,8 @@ pub struct ClearedSparseStateTrie< impl ClearedSparseStateTrie where - A: SparseTrieInterface + Default, - S: SparseTrieInterface + Default, + A: SparseTrieInterface, + S: SparseTrieInterface, { /// Creates a [`ClearedSparseStateTrie`] by clearing all the existing internal state of a /// [`SparseStateTrie`] and then storing that instance for later re-use. @@ -108,12 +108,18 @@ impl SparseStateTrie { self.state = trie; self } + + /// Set the default trie which will be cloned when creating new storage [`SparseTrie`]s. + pub fn with_default_storage_trie(mut self, trie: SparseTrie) -> Self { + self.storage.default_trie = trie; + self + } } impl SparseStateTrie where A: SparseTrieInterface + Default, - S: SparseTrieInterface + Default, + S: SparseTrieInterface + Default + Clone, { /// Create new [`SparseStateTrie`] pub fn new() -> Self { @@ -801,9 +807,11 @@ struct StorageTries { revealed_paths: B256Map>, /// Cleared revealed storage trie path collections, kept for re-use. cleared_revealed_paths: Vec>, + /// A default cleared trie instance, which will be cloned when creating new tries. + default_trie: SparseTrie, } -impl StorageTries { +impl StorageTries { /// Returns all fields to a cleared state, equivalent to the default state, keeping cleared /// collections for re-use later when possible. fn clear(&mut self) { @@ -813,7 +821,9 @@ impl StorageTries { set })); } +} +impl StorageTries { /// Returns the set of already revealed trie node paths for an account's storage, creating the /// set if it didn't previously exist. fn get_revealed_paths_mut(&mut self, account: B256) -> &mut HashSet { @@ -828,10 +838,9 @@ impl StorageTries { &mut self, account: B256, ) -> (&mut SparseTrie, &mut HashSet) { - let trie = self - .tries - .entry(account) - .or_insert_with(|| self.cleared_tries.pop().unwrap_or_default()); + let trie = self.tries.entry(account).or_insert_with(|| { + self.cleared_tries.pop().unwrap_or_else(|| self.default_trie.clone()) + }); let revealed_paths = self .revealed_paths @@ -845,7 +854,9 @@ impl StorageTries { /// doesn't already exist. #[cfg(feature = "std")] fn take_or_create_trie(&mut self, account: &B256) -> SparseTrie { - self.tries.remove(account).unwrap_or_else(|| self.cleared_tries.pop().unwrap_or_default()) + self.tries.remove(account).unwrap_or_else(|| { + self.cleared_tries.pop().unwrap_or_else(|| self.default_trie.clone()) + }) } /// Takes the revealed paths set from the account from the internal `HashMap`, creating one if diff --git a/crates/trie/sparse/src/trie.rs b/crates/trie/sparse/src/trie.rs index ca356e060e2..d0bd94b28dc 100644 --- a/crates/trie/sparse/src/trie.rs +++ b/crates/trie/sparse/src/trie.rs @@ -42,7 +42,7 @@ const SPARSE_TRIE_SUBTRIE_HASHES_LEVEL: usize = 2; /// 2. Update tracking - changes to the trie structure can be tracked and selectively persisted /// 3. Incremental operations - nodes can be revealed as needed without loading the entire trie. /// This is what gives rise to the notion of a "sparse" trie. -#[derive(PartialEq, Eq, Debug)] +#[derive(PartialEq, Eq, Debug, Clone)] pub enum SparseTrie { /// The trie is blind -- no nodes have been revealed /// @@ -132,6 +132,13 @@ impl SparseTrie { Self::Blind(None) } + /// Creates a new blind sparse trie, clearing and later reusing the given + /// [`SparseTrieInterface`]. + pub fn blind_from(mut trie: T) -> Self { + trie.clear(); + Self::Blind(Some(Box::new(trie))) + } + /// Returns `true` if the sparse trie has no revealed nodes. pub const fn is_blind(&self) -> bool { matches!(self, Self::Blind(_))