Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::borrow::Cow;
/// This type allows runtime selection between different sparse trie implementations,
/// providing flexibility in choosing the appropriate implementation based on workload
/// characteristics.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) enum ConfiguredSparseTrie {
/// Serial implementation of the sparse trie.
Serial(Box<SerialSparseTrie>),
Expand Down
29 changes: 22 additions & 7 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use reth_trie_parallel::{
};
use reth_trie_sparse::{
provider::{TrieNodeProvider, TrieNodeProviderFactory},
ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie,
ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
};
use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
use std::sync::{
atomic::AtomicBool,
mpsc::{self, channel, Sender},
Expand All @@ -51,6 +52,14 @@ pub mod sparse_trie;

use configured_sparse_trie::ConfiguredSparseTrie;

/// Default parallelism thresholds to use with the [`ParallelSparseTrie`].
///
/// These values were determined by performing benchmarks using gradually increasing values to
/// judge the effects. Below 100, throughput would generally be equal or slightly less, while
/// above 150 it would deteriorate to the point where PST might as well not be used.
pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
    ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };

/// Entrypoint for executing the payload.
#[derive(Debug)]
pub struct PayloadProcessor<Evm>
Expand All @@ -76,7 +85,9 @@ where
/// A cleared `SparseStateTrie`, kept around to be reused for the state root computation so
/// that allocations can be minimized.
sparse_state_trie: Arc<
parking_lot::Mutex<Option<ClearedSparseStateTrie<ConfiguredSparseTrie, SerialSparseTrie>>>,
parking_lot::Mutex<
Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
>,
>,
/// Whether to use the parallel sparse trie.
disable_parallel_sparse_trie: bool,
Expand Down Expand Up @@ -363,20 +374,24 @@ where
// there's none to reuse.
let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
let accounts_trie = if self.disable_parallel_sparse_trie {
let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
ConfiguredSparseTrie::Serial(Default::default())
} else {
ConfiguredSparseTrie::Parallel(Default::default())
};
ConfiguredSparseTrie::Parallel(Box::new(
ParallelSparseTrie::default()
.with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
))
});
ClearedSparseStateTrie::from_state_trie(
SparseStateTrie::new()
.with_accounts_trie(SparseTrie::Blind(Some(Box::new(accounts_trie))))
.with_accounts_trie(default_trie.clone())
.with_default_storage_trie(default_trie)
.with_updates(true),
)
});

let task =
SparseTrieTask::<_, ConfiguredSparseTrie, SerialSparseTrie>::new_with_cleared_trie(
SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
sparse_trie_rx,
proof_task_handle,
self.trie_metrics.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ where
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrieInterface + Send + Sync + Default,
S: SparseTrieInterface + Send + Sync + Default,
S: SparseTrieInterface + Send + Sync + Default + Clone,
{
/// Creates a new sparse trie, pre-populating with a [`ClearedSparseStateTrie`].
pub(super) fn new_with_cleared_trie(
Expand Down Expand Up @@ -140,7 +140,7 @@ where
BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
A: SparseTrieInterface + Send + Sync + Default,
S: SparseTrieInterface + Send + Sync + Default,
S: SparseTrieInterface + Send + Sync + Default + Clone,
{
trace!(target: "engine::root::sparse", "Updating sparse trie");
let started_at = Instant::now();
Expand Down
163 changes: 107 additions & 56 deletions crates/trie/sparse-parallel/src/trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ pub const UPPER_TRIE_MAX_DEPTH: usize = 2;
/// Number of lower subtries which are managed by the [`ParallelSparseTrie`].
pub const NUM_LOWER_SUBTRIES: usize = 16usize.pow(UPPER_TRIE_MAX_DEPTH as u32);

/// Configuration for controlling when parallelism is enabled in [`ParallelSparseTrie`] operations.
///
/// The derived [`Default`] sets both thresholds to zero, so by default any non-empty workload
/// meets the thresholds (actual parallel execution still depends on the build supporting it).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct ParallelismThresholds {
    /// Minimum number of nodes to reveal before parallel processing is enabled.
    /// When `reveal_nodes` has fewer nodes than this threshold, they will be processed serially.
    pub min_revealed_nodes: usize,
    /// Minimum number of changed keys (prefix set length) before parallel processing is enabled
    /// for hash updates. When updating subtrie hashes with fewer changed keys than this threshold,
    /// the updates will be processed serially.
    pub min_updated_nodes: usize,
}

/// A revealed sparse trie with subtries that can be updated in parallel.
///
/// ## Structure
Expand Down Expand Up @@ -109,6 +121,8 @@ pub struct ParallelSparseTrie {
/// Reusable buffer pool used for collecting [`SparseTrieUpdatesAction`]s during hash
/// computations.
update_actions_buffers: Vec<Vec<SparseTrieUpdatesAction>>,
/// Thresholds controlling when parallelism is enabled for different operations.
parallelism_thresholds: ParallelismThresholds,
/// Metrics for the parallel sparse trie.
#[cfg(feature = "metrics")]
metrics: crate::metrics::ParallelSparseTrieMetrics,
Expand All @@ -127,6 +141,7 @@ impl Default for ParallelSparseTrie {
branch_node_tree_masks: HashMap::default(),
branch_node_hash_masks: HashMap::default(),
update_actions_buffers: Vec::default(),
parallelism_thresholds: Default::default(),
#[cfg(feature = "metrics")]
metrics: Default::default(),
}
Expand Down Expand Up @@ -200,19 +215,20 @@ impl SparseTrieInterface for ParallelSparseTrie {
self.reveal_upper_node(node.path, &node.node, node.masks)?;
}

#[cfg(not(feature = "std"))]
// Reveal lower subtrie nodes serially if nostd
{
if !self.is_reveal_parallelism_enabled(lower_nodes.len()) {
for node in lower_nodes {
if let Some(subtrie) = self.lower_subtrie_for_path_mut(&node.path) {
subtrie.reveal_node(node.path, &node.node, &node.masks)?;
subtrie.reveal_node(node.path, &node.node, node.masks)?;
} else {
panic!("upper subtrie node {node:?} found amongst lower nodes");
}
}
Ok(())
return Ok(())
}

#[cfg(not(feature = "std"))]
unreachable!("nostd is checked by is_reveal_parallelism_enabled");

#[cfg(feature = "std")]
// Reveal lower subtrie nodes in parallel
{
Expand Down Expand Up @@ -725,76 +741,62 @@ impl SparseTrieInterface for ParallelSparseTrie {

// Take changed subtries according to the prefix set
let mut prefix_set = core::mem::take(&mut self.prefix_set).freeze();
let (subtries, unchanged_prefix_set) = self.take_changed_lower_subtries(&mut prefix_set);
let num_changed_keys = prefix_set.len();
let (mut changed_subtries, unchanged_prefix_set) =
self.take_changed_lower_subtries(&mut prefix_set);

// update metrics
#[cfg(feature = "metrics")]
self.metrics.subtries_updated.record(subtries.len() as f64);
self.metrics.subtries_updated.record(changed_subtries.len() as f64);

// Update the prefix set with the keys that didn't have matching subtries
self.prefix_set = unchanged_prefix_set;

let (tx, rx) = mpsc::channel();
// Update subtrie hashes serially if parallelism is not enabled
if !self.is_update_parallelism_enabled(num_changed_keys) {
for changed_subtrie in &mut changed_subtries {
changed_subtrie.subtrie.update_hashes(
&mut changed_subtrie.prefix_set,
&mut changed_subtrie.update_actions_buf,
&self.branch_node_tree_masks,
&self.branch_node_hash_masks,
);
}

#[cfg(not(feature = "std"))]
// Update subtrie hashes serially if nostd
for ChangedSubtrie { index, mut subtrie, mut prefix_set, mut update_actions_buf } in
subtries
{
subtrie.update_hashes(
&mut prefix_set,
&mut update_actions_buf,
&self.branch_node_tree_masks,
&self.branch_node_hash_masks,
);
tx.send((index, subtrie, update_actions_buf)).unwrap();
self.insert_changed_subtries(changed_subtries);
return
}

#[cfg(not(feature = "std"))]
unreachable!("nostd is checked by is_update_parallelism_enabled");

#[cfg(feature = "std")]
// Update subtrie hashes in parallel
{
use rayon::iter::{IntoParallelIterator, ParallelIterator};
let (tx, rx) = mpsc::channel();

let branch_node_tree_masks = &self.branch_node_tree_masks;
let branch_node_hash_masks = &self.branch_node_hash_masks;
subtries
changed_subtries
.into_par_iter()
.map(
|ChangedSubtrie {
index,
mut subtrie,
mut prefix_set,
mut update_actions_buf,
}| {
#[cfg(feature = "metrics")]
let start = std::time::Instant::now();
subtrie.update_hashes(
&mut prefix_set,
&mut update_actions_buf,
branch_node_tree_masks,
branch_node_hash_masks,
);
#[cfg(feature = "metrics")]
self.metrics.subtrie_hash_update_latency.record(start.elapsed());
(index, subtrie, update_actions_buf)
},
)
.map(|mut changed_subtrie| {
#[cfg(feature = "metrics")]
let start = std::time::Instant::now();
changed_subtrie.subtrie.update_hashes(
&mut changed_subtrie.prefix_set,
&mut changed_subtrie.update_actions_buf,
branch_node_tree_masks,
branch_node_hash_masks,
);
#[cfg(feature = "metrics")]
self.metrics.subtrie_hash_update_latency.record(start.elapsed());
changed_subtrie
})
.for_each_init(|| tx.clone(), |tx, result| tx.send(result).unwrap());
}

drop(tx);

// Return updated subtries back to the trie after executing any actions required on the
// top-level `SparseTrieUpdates`.
for (index, subtrie, update_actions_buf) in rx {
if let Some(mut update_actions_buf) = update_actions_buf {
self.apply_subtrie_update_actions(
#[allow(clippy::iter_with_drain)]
update_actions_buf.drain(..),
);
self.update_actions_buffers.push(update_actions_buf);
}

self.lower_subtries[index] = LowerSparseSubtrie::Revealed(subtrie);
drop(tx);
self.insert_changed_subtries(rx);
}
}

Expand Down Expand Up @@ -896,11 +898,35 @@ impl SparseTrieInterface for ParallelSparseTrie {
}

impl ParallelSparseTrie {
/// Sets the thresholds that control when parallelism is used during operations.
///
/// Builder-style setter: consumes `self` and returns it, so it can be chained onto
/// construction (e.g. `ParallelSparseTrie::default().with_parallelism_thresholds(..)`).
pub const fn with_parallelism_thresholds(mut self, thresholds: ParallelismThresholds) -> Self {
    self.parallelism_thresholds = thresholds;
    self
}

/// Returns true if retaining updates is enabled for the overall trie
/// (i.e. the `updates` accumulator is present).
const fn updates_enabled(&self) -> bool {
    self.updates.is_some()
}

/// Returns true if parallelism should be enabled for revealing the given number of nodes.
/// Will always return false in nostd builds.
const fn is_reveal_parallelism_enabled(&self, num_nodes: usize) -> bool {
#[cfg(not(feature = "std"))]
return false;

num_nodes >= self.parallelism_thresholds.min_revealed_nodes
}

/// Returns true if parallelism should be enabled for updating hashes with the given number
/// of changed keys. Will always return false in nostd builds.
const fn is_update_parallelism_enabled(&self, num_changed_keys: usize) -> bool {
#[cfg(not(feature = "std"))]
return false;

num_changed_keys >= self.parallelism_thresholds.min_updated_nodes
}

/// Creates a new revealed sparse trie from the given root node.
///
/// This function initializes the internal structures and then reveals the root.
Expand Down Expand Up @@ -1310,6 +1336,12 @@ impl ParallelSparseTrie {
&mut self,
prefix_set: &mut PrefixSet,
) -> (Vec<ChangedSubtrie>, PrefixSetMut) {
// Fast-path: If the prefix set is empty then no subtries can have been changed. Just return
// empty values.
if prefix_set.is_empty() && !prefix_set.all() {
return Default::default();
}

// Clone the prefix set to iterate over its keys. Cloning is cheap, it's just an Arc.
let prefix_set_clone = prefix_set.clone();
let mut prefix_set_iter = prefix_set_clone.into_iter().copied().peekable();
Expand Down Expand Up @@ -1453,6 +1485,25 @@ impl ParallelSparseTrie {

Ok(())
}

/// Return updated subtries back to the trie after executing any actions required on the
/// top-level `SparseTrieUpdates`.
fn insert_changed_subtries(
    &mut self,
    changed_subtries: impl IntoIterator<Item = ChangedSubtrie>,
) {
    for ChangedSubtrie { index, subtrie, update_actions_buf, .. } in changed_subtries {
        // `update_actions_buf` is `None` when no actions were collected for this subtrie.
        if let Some(mut update_actions_buf) = update_actions_buf {
            self.apply_subtrie_update_actions(
                // `drain(..)` (rather than `into_iter()`) empties the Vec while keeping its
                // allocation alive, so it can be recycled into the buffer pool below.
                #[allow(clippy::iter_with_drain)]
                update_actions_buf.drain(..),
            );
            // Recycle the now-empty buffer for reuse by later hash computations.
            self.update_actions_buffers.push(update_actions_buf);
        }

        // Slot the subtrie back into its position, marking it revealed.
        self.lower_subtries[index] = LowerSparseSubtrie::Revealed(subtrie);
    }
}
}

/// This is a subtrie of the [`ParallelSparseTrie`] that contains a map from path to sparse trie
Expand Down
Loading