Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 45 additions & 40 deletions crates/engine/tree/src/tree/payload_processor/sparse_trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::tree::{
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
use rayon::iter::ParallelIterator;
use reth_primitives_traits::{Account, ParallelBridgeBuffered};
use reth_trie::{
proof_v2::Target, updates::TrieUpdates, DecodedMultiProofV2, HashedPostState, Nibbles,
Expand Down Expand Up @@ -675,6 +675,11 @@ where
/// Invokes `update_leaves` for the accounts trie and collects any new targets.
///
/// Returns whether any updates were drained (applied to the trie).
#[instrument(
level = "debug",
target = "engine::tree::payload_processor::sparse_trie",
skip_all
)]
fn process_account_leaf_updates(&mut self, new: bool) -> SparseTrieResult<bool> {
let account_updates =
if new { &mut self.new_account_updates } else { &mut self.account_updates };
Expand Down Expand Up @@ -718,53 +723,50 @@ where
return Ok(());
}

let span = tracing::Span::current();
let roots = self
let span = debug_span!("compute_storage_roots").entered();
self
.trie
.storage_tries_mut()
.par_iter_mut()
.filter(|(address, _)| {
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty())
.iter_mut()
.filter(|(address, trie)| {
self.storage_updates.get(*address).is_some_and(|updates| updates.is_empty()) &&
!trie.is_root_cached()
})
.map(|(address, trie)| {
.par_bridge_buffered()
.for_each(|(address, trie)| {
let _enter = debug_span!(target: "engine::tree::payload_processor::sparse_trie", parent: &span, "storage root", ?address).entered();
let root =
trie.root().expect("updates are drained, trie should be revealed by now");

(address, root)
})
.collect::<Vec<_>>();

for (addr, storage_root) in roots {
// If the storage root is known and we have a pending update for this account, encode it
// into a proper update.
if let Entry::Occupied(entry) = self.pending_account_updates.entry(*addr) &&
entry.get().is_some()
{
let account = entry.remove().expect("just checked, should be Some");
let encoded = if account.is_none_or(|account| account.is_empty()) &&
storage_root == EMPTY_ROOT_HASH
{
Vec::new()
} else {
self.account_rlp_buf.clear();
account
.unwrap_or_default()
.into_trie_account(storage_root)
.encode(&mut self.account_rlp_buf);
self.account_rlp_buf.clone()
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
}
}
trie.root().expect("updates are drained, trie should be revealed by now");
});
drop(span);

loop {
let span = debug_span!("promote_updates", promoted = tracing::field::Empty).entered();
// Now handle pending account updates that can be upgraded to a proper update.
let account_rlp_buf = &mut self.account_rlp_buf;
let mut num_promoted = 0;
self.pending_account_updates.retain(|addr, account| {
// If account has pending storage updates, it is still pending.
if self.storage_updates.get(addr).is_some_and(|updates| !updates.is_empty()) {
return true;
if let Some(updates) = self.storage_updates.get(addr) {
if !updates.is_empty() {
// If account has pending storage updates, it is still pending.
return true;
} else if let Some(account) = account.take() {
let storage_root = self.trie.storage_root(addr).expect("updates are drained, storage trie should be revealed by now");
let encoded = if account.is_none_or(|account| account.is_empty()) &&
storage_root == EMPTY_ROOT_HASH
{
Vec::new()
} else {
account_rlp_buf.clear();
account
.unwrap_or_default()
.into_trie_account(storage_root)
.encode(account_rlp_buf);
account_rlp_buf.clone()
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
num_promoted += 1;
return false;
}
}

// Get the current account state either from the trie or from latest account update.
Expand Down Expand Up @@ -799,15 +801,18 @@ where
account_rlp_buf.clone()
};
self.account_updates.insert(*addr, LeafUpdate::Changed(encoded));
num_promoted += 1;

false
});
span.record("promoted", num_promoted);
drop(span);

// Only exit when no new updates are processed.
//
// We need to keep iterating if any updates are being drained because that might
// indicate that more pending account updates can be promoted.
if !self.process_account_leaf_updates(false)? {
if num_promoted == 0 || !self.process_account_leaf_updates(false)? {
break
}
}
Expand Down
15 changes: 15 additions & 0 deletions crates/trie/sparse-parallel/src/trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,13 @@ impl SparseTrie for ParallelSparseTrie {
fn root(&mut self) -> B256 {
trace!(target: "trie::parallel_sparse", "Calculating trie root hash");

if self.prefix_set.is_empty() &&
let Some(hash) =
self.upper_subtrie.nodes.get(&Nibbles::default()).and_then(|node| node.hash())
{
return hash;
}

// Update all lower subtrie hashes
self.update_subtrie_hashes();

Expand All @@ -910,6 +917,14 @@ impl SparseTrie for ParallelSparseTrie {
root_rlp.as_hash().unwrap_or(EMPTY_ROOT_HASH)
}

fn is_root_cached(&self) -> bool {
self.prefix_set.is_empty() &&
self.upper_subtrie
.nodes
.get(&Nibbles::default())
.is_some_and(|node| node.hash().is_some())
}

#[instrument(level = "trace", target = "trie::sparse::parallel", skip(self))]
fn update_subtrie_hashes(&mut self) {
trace!(target: "trie::parallel_sparse", "Updating subtrie hashes");
Expand Down
3 changes: 3 additions & 0 deletions crates/trie/sparse/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ pub trait SparseTrie: Sized + Debug + Send + Sync {
/// The root hash of the trie.
fn root(&mut self) -> B256;

/// Returns true if the root node is cached and does not need any recomputation.
fn is_root_cached(&self) -> bool;

/// Recalculates and updates the RLP hashes of subtries deeper than a certain level. The level
/// is defined in the implementation.
///
Expand Down
10 changes: 10 additions & 0 deletions crates/trie/sparse/src/trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ impl<T: SparseTrieTrait> RevealableSparseTrie<T> {
Some(self.as_revealed_mut()?.root())
}

/// Returns true if the root node is cached and does not need any recomputation.
pub fn is_root_cached(&self) -> bool {
self.as_revealed_ref().is_some_and(|trie| trie.is_root_cached())
}

/// Returns the root hash along with any accumulated update information.
///
/// This is useful for when you need both the root hash and information about
Expand Down Expand Up @@ -965,6 +970,11 @@ impl SparseTrieTrait for SerialSparseTrie {
}
}

fn is_root_cached(&self) -> bool {
self.prefix_set.is_empty() &&
self.nodes.get(&Nibbles::default()).is_some_and(|node| node.is_hash())
}

fn update_subtrie_hashes(&mut self) {
self.update_rlp_node_level(SPARSE_TRIE_SUBTRIE_HASHES_LEVEL);
}
Expand Down
Loading