Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 223 additions & 46 deletions crates/common/trie/trie_sorted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,39 +156,28 @@ fn flush_nodes_to_write(
Ok(())
}

/// trie_from_sorted_accounts computes and stores into a db a trie from a sorted
/// iterator of H256 paths and values. This function takes a ThreadPool Arc to send
/// the writing task to be done concurrently.
/// To limit the amount of memory this function can use, we use a crossbeam multiproducer
/// multiconsumer queue, which gives the function a buffer to write nodes into before
/// flushing to the db.
pub fn trie_from_sorted_accounts<'scope, T>(
/// Core trie building loop. Processes sorted (H256, Vec<u8>) pairs into a trie structure,
/// writing intermediate nodes to DB via the ThreadPool. Returns the root-level BranchNode
/// (with choices populated) and any unflushed nodes, or None if the iterator was empty.
#[allow(clippy::type_complexity)]
fn trie_from_sorted_accounts_inner<'scope, T>(
db: &'scope dyn TrieDB,
data_iter: &mut T,
scope: Arc<ThreadPool<'scope>>,
buffer_sender: Sender<Vec<(Nibbles, Node)>>,
buffer_receiver: Receiver<Vec<(Nibbles, Node)>>,
) -> Result<H256, TrieGenerationError>
) -> Result<Option<(BranchNode, Vec<(Nibbles, Node)>)>, TrieGenerationError>
where
T: Iterator<Item = (H256, Vec<u8>)> + Send,
{
let Some(initial_value) = data_iter.next() else {
return Ok(*EMPTY_TRIE_HASH);
return Ok(None);
};
let mut nodes_to_write: Vec<(Nibbles, Node)> = buffer_receiver
.recv()
.expect("This channel shouldn't close");
// We have a stack of the parents of the current parent
let mut trie_stack: Vec<StackElement> = Vec::with_capacity(64); // Optimized for H256

// This is the current parent of the first element. We assume that the root node
// is always a parent, and we fix it afterwards if it's not true
// The root is a parent of all nodes
let mut nodehash_buffer = Vec::with_capacity(512);
let mut trie_stack: Vec<StackElement> = Vec::with_capacity(64);
let mut current_parent = StackElement::default();

// The current node that is being used for computing. We compare it with the current
// parent and the next value to see where it should be written
let mut current_node: CenterSide = CenterSide::from_value(initial_value);
let mut next_value_opt: Option<(H256, Vec<u8>)> = data_iter.next();

Expand All @@ -198,17 +187,13 @@ where
scope.execute_priority(Box::new(move || {
let _ = flush_nodes_to_write(nodes_to_write, db, buffer_sender);
}));
// We wait to get a new buffer to avoid writing too much
nodes_to_write = buffer_receiver
.recv()
.expect("This channel shouldn't close");
}

let next_value_path = Nibbles::from_bytes(next_value.0.as_bytes());

// If the current parent isn't a parent of the next value, that means
// that the current value doesn't have a sibling to the right
// As such we write this node and change the current node to the current parent
while !is_child(&next_value_path, &current_parent) {
add_current_to_parent_and_write_queue(
&mut nodes_to_write,
Expand All @@ -222,12 +207,6 @@ where
current_node = temp;
}

// If the "distance" (same prefix count) between the current and next value is equal to the
// parent node, that means that they're both "siblings" of the current parent
// Ex: parent=[05] current=[0567] next=[0589]
// there is not a branch between the parent and current, so we just write the
// current element and change the current with the next value while
// advancing the iterator for our next value
if current_node.path.count_prefix(&current_parent.path)
== current_node.path.count_prefix(&next_value_path)
{
Expand All @@ -236,15 +215,6 @@ where
&current_node,
&mut current_parent,
)?;

// If the "distance" between the current and next value is larger than that to
// the parent node, that means that there is a closer parent for both of them
// Ex: parent=[05] current=[0567] next=[0569]
// This means that there is a branch in [056] and current is a child
// of that parent
// So we create a parent, mark it as current, write the current node to that parent.
// The old parent goes into the stack
// Then we advance the iterator for our next value
} else {
let mut element = create_parent(&current_node, &next_value_path);
add_current_to_parent_and_write_queue(
Expand All @@ -259,8 +229,7 @@ where
next_value_opt = data_iter.next();
}

// We empty the stack, where each node is a child of the one in the stack, so we just keep
// popping and adding to parent
// Empty the stack
add_current_to_parent_and_write_queue(&mut nodes_to_write, &current_node, &mut current_parent)?;
while let Some(mut parent_node) = trie_stack.pop() {
add_current_to_parent_and_write_queue(
Expand All @@ -271,16 +240,25 @@ where
current_parent = parent_node;
}

let hash = if current_parent
.element
Ok(Some((current_parent.element, nodes_to_write)))
}

/// Computes the root hash from the root-level BranchNode, handling the single-choice
/// collapse optimization required by the MPT spec. Appends the root node to nodes_to_write.
fn finalize_root(
root_branch: BranchNode,
nodes_to_write: &mut Vec<(Nibbles, Node)>,
) -> H256 {
let mut nodehash_buffer = Vec::with_capacity(512);

if root_branch
.choices
.iter()
.filter(|choice| choice.is_valid())
.count()
== 1
{
let (index, child) = current_parent
.element
let (index, child) = root_branch
.choices
.into_iter()
.enumerate()
Expand Down Expand Up @@ -323,20 +301,141 @@ where
}
}
} else {
let node: Node = current_parent.element.into();
let node: Node = root_branch.into();
nodes_to_write.push((Nibbles::default(), node));
nodes_to_write
.last()
.expect("we just inserted")
.1
.compute_hash_no_alloc(&mut nodehash_buffer)
.finalize()
};
}
}

/// Builds a trie from a sorted iterator of `(H256 path, value)` pairs, stores its
/// nodes in `db`, and returns the root hash.
///
/// The node construction is delegated to `trie_from_sorted_accounts_inner`, which
/// hands write batches to the `ThreadPool` in `scope` so they run concurrently.
/// Memory use is bounded by the crossbeam channel behind `buffer_sender` /
/// `buffer_receiver`: it supplies the buffers that nodes are collected into
/// before being flushed to the db.
///
/// Returns `EMPTY_TRIE_HASH` when `data_iter` yields no items. Errors reported by
/// the inner builder are propagated to the caller.
pub fn trie_from_sorted_accounts<'scope, T>(
    db: &'scope dyn TrieDB,
    data_iter: &mut T,
    scope: Arc<ThreadPool<'scope>>,
    buffer_sender: Sender<Vec<(Nibbles, Node)>>,
    buffer_receiver: Receiver<Vec<(Nibbles, Node)>>,
) -> Result<H256, TrieGenerationError>
where
    T: Iterator<Item = (H256, Vec<u8>)> + Send,
{
    // The inner builder takes its own sender handle; ours is kept for the final flush.
    let built = trie_from_sorted_accounts_inner(
        db,
        data_iter,
        scope,
        buffer_sender.clone(),
        buffer_receiver,
    )?;

    match built {
        // Empty input: nothing was built, so there is nothing to write.
        None => Ok(*EMPTY_TRIE_HASH),
        Some((root_branch, mut pending_nodes)) => {
            // Compute the root hash; `finalize_root` may add nodes to `pending_nodes`.
            let root_hash = finalize_root(root_branch, &mut pending_nodes);
            // NOTE(review): the result of this last flush is discarded, so a failed
            // write would still return `Ok(root_hash)` — confirm this is intentional.
            let _ = flush_nodes_to_write(pending_nodes, db, buffer_sender);
            Ok(root_hash)
        }
    }
}

/// Builds the account trie in parallel by partitioning sorted data into 16 buckets
/// (one per first nibble of the key hash) and building subtries concurrently.
/// Each subtrie writes its nodes at correct full paths, then a root BranchNode
/// is created from the 16 subtrie root hashes.
pub fn trie_from_sorted_accounts_parallel<T>(
db: &dyn TrieDB,
accounts_iter: &mut T,
) -> Result<H256, TrieGenerationError>
where
T: Iterator<Item = (H256, Vec<u8>)> + Send,
{
// Partition data into 16 buckets by first nibble
let mut buckets: [Vec<(H256, Vec<u8>)>; 16] = Default::default();
for (key, value) in accounts_iter {
let first_nibble = (key.0[0] >> 4) as usize;
buckets[first_nibble].push((key, value));
}

let non_empty_count = buckets.iter().filter(|b| !b.is_empty()).count();
if non_empty_count == 0 {
return Ok(*EMPTY_TRIE_HASH);
}
// Single bucket: fall back to sequential (parallel overhead not worth it)
if non_empty_count == 1 {
let items: Vec<_> = buckets.into_iter().flatten().collect();
return trie_from_sorted_accounts_wrap(db, &mut items.into_iter());
}

// Build 16 subtries in parallel using scoped threads + shared ThreadPool for DB writes
scope(|s| {
let pool = Arc::new(ThreadPool::new(16, s));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates a ThreadPool with 16 threads for DB writes, and up to 16 more scoped threads (one per non-empty nibble bucket) are spawned at line 388. That's up to 32 threads competing for CPU. On the benchmark machine (12 cores) this is fine, but on smaller machines the over-subscription could hurt.

Consider sizing the pool based on available parallelism, e.g.:

let num_threads = std::thread::available_parallelism()
    .map(|n| n.get())
    .unwrap_or(4);
let pool = Arc::new(ThreadPool::new(num_threads, s));


let handles: Vec<_> = buckets
.into_iter()
.enumerate()
.filter_map(|(nibble, bucket)| {
if bucket.is_empty() {
return None;
}
let pool = pool.clone();
let handle = std::thread::Builder::new()
.name(format!("subtrie-{nibble}"))
.spawn_scoped(s, move || -> Result<Option<BranchNode>, TrieGenerationError> {
let (buf_tx, buf_rx) =
bounded::<Vec<(Nibbles, Node)>>(BUFFER_COUNT as usize);
for _ in 0..BUFFER_COUNT {
let _ = buf_tx.send(Vec::with_capacity(SIZE_TO_WRITE_DB as usize));
}
let result = trie_from_sorted_accounts_inner(
db,
&mut bucket.into_iter(),
pool,
buf_tx.clone(),
buf_rx,
)?;
if let Some((branch, nodes_to_write)) = result {
let _ = flush_nodes_to_write(nodes_to_write, db, buf_tx);
Ok(Some(branch))
} else {
Ok(None)
}
})
.expect("Failed to spawn subtrie thread");
Some((nibble, handle))
})
.collect();

// Collect subtrie roots and merge into root BranchNode
let mut root_choices = BranchNode::EMPTY_CHOICES;
for (nibble, handle) in handles {
let branch_opt = handle
.join()
.map_err(|_| TrieGenerationError::ThreadJoinError())??;
if let Some(branch) = branch_opt {
root_choices[nibble] = branch.choices[nibble].clone();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incorrect branch choice selection

Each subtrie builds nodes for keys starting with nibble N, so the root choice at index N should reference the subtrie's root. However, this line uses branch.choices[nibble] which reads the Nth choice from the subtrie's root branch, not the subtrie's own root hash.

Suggested change
root_choices[nibble] = branch.choices[nibble].clone();
root_choices[nibble] = branch.compute_hash_no_alloc(&mut Vec::new()).into();

This will cause incorrect trie root hashes when accounts span multiple nibbles.

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/common/trie/trie_sorted.rs
Line: 420:420

Comment:
incorrect branch choice selection

Each subtrie builds nodes for keys starting with nibble N, so the root choice at index N should reference the subtrie's root. However, this line uses `branch.choices[nibble]` which reads the Nth choice from the subtrie's root branch, not the subtrie's own root hash.

```suggestion
                root_choices[nibble] = branch.compute_hash_no_alloc(&mut Vec::new()).into();
```

This will cause incorrect trie root hashes when accounts span multiple nibbles.

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line relies on the invariant that trie_from_sorted_accounts_inner, when fed keys all sharing the same first nibble N, returns a BranchNode where only choices[N] is populated. That's true today because _inner processes full H256 paths (not stripped keys), but it's subtle and fragile — a future caller passing pre-stripped keys would silently produce a wrong trie.

Consider adding a debug assertion:

if let Some(branch) = branch_opt {
    debug_assert!(
        branch.choices.iter().enumerate()
            .filter(|(_, c)| c.is_valid())
            .all(|(i, _)| i == nibble),
        "subtrie for nibble {nibble} has unexpected populated choices"
    );
    root_choices[nibble] = branch.choices[nibble].clone();
}

}
}

// Write root BranchNode to DB
let root_node: Node = BranchNode {
choices: root_choices,
value: vec![],
}
.into();
let mut buf = Vec::with_capacity(512);
let hash = root_node.compute_hash_no_alloc(&mut buf).finalize();
db.put_batch_no_alloc(&[(Nibbles::default(), root_node)])
.map_err(TrieGenerationError::FlushToDbError)?;

Ok(hash)
})
}

/// Wrapper function for `trie_from_sorted_accounts` that handles concurrency
/// and memory limits
pub fn trie_from_sorted_accounts_wrap<T>(
Expand Down Expand Up @@ -508,6 +607,48 @@ mod test {
assert!(tested_trie_hash == trie_hash)
}

/// Builds the same account set with the sequential and the parallel builder,
/// then checks that both produce the same root hash and that every entry the
/// sequential build stored is also present, with the same value, in the DB
/// filled by the parallel build.
fn run_test_parallel(accounts: BTreeMap<H256, Vec<u8>>) {
    // Reference build: sequential algorithm into its own in-memory DB.
    let seq_store = Arc::new(Mutex::new(BTreeMap::new()));
    let seq_trie = Trie::new(Box::new(InMemoryTrieDB::new(seq_store.clone())));
    let mut seq_input = accounts
        .clone()
        .into_iter()
        .map(|(hash, state)| (hash, state.encode_to_vec()));
    let seq_hash: H256 = trie_from_sorted_accounts_wrap(seq_trie.db(), &mut seq_input)
        .expect("Sequential build failed");

    // Build under test: parallel algorithm into a separate in-memory DB.
    let par_store = Arc::new(Mutex::new(BTreeMap::new()));
    let par_trie = Trie::new(Box::new(InMemoryTrieDB::new(par_store.clone())));
    let mut par_input = accounts
        .into_iter()
        .map(|(hash, state)| (hash, state.encode_to_vec()));
    let par_hash: H256 = trie_from_sorted_accounts_parallel(par_trie.db(), &mut par_input)
        .expect("Parallel build failed");

    assert_eq!(seq_hash, par_hash, "Root hashes differ");

    // Only the sequential -> parallel direction is compared: each sequential
    // entry must exist in the parallel DB with an identical value.
    let seq_entries = seq_store.lock().unwrap();
    let par_entries = par_store.lock().unwrap();
    for (k, expected) in seq_entries.iter() {
        assert!(
            par_entries.contains_key(k),
            "Parallel DB missing key: {k:?}"
        );
        assert_eq!(
            *expected, par_entries[k],
            "Value mismatch for key: {k:?}"
        );
    }
}

#[test]
fn test_1() {
run_test_account_state(generate_input_1());
Expand Down Expand Up @@ -537,4 +678,40 @@ mod test {
fn test_slots_1() {
run_test_storage_slots(generate_input_slots_1());
}

#[test]
fn test_parallel_multi_nibble() {
    // Entries of generate_input_2 span several first nibbles (0, 1, 6, 9, a, b, c, f),
    // so more than one bucket is non-empty.
    let accounts = generate_input_2();
    run_test_parallel(accounts);
}

#[test]
fn test_parallel_single_nibble() {
    // Every entry of generate_input_1 starts with nibble 6, so only one bucket is
    // non-empty and the parallel builder falls back to the sequential one.
    let accounts = generate_input_1();
    run_test_parallel(accounts);
}

#[test]
fn test_parallel_same_prefix() {
    // Every entry of generate_input_3 starts with 05, so only one bucket is
    // non-empty and the parallel builder falls back to the sequential one.
    let accounts = generate_input_3();
    run_test_parallel(accounts);
}

#[test]
fn test_parallel_single_item() {
    // Presumably generate_input_4 yields a single entry (per the test name) —
    // verify against the generator.
    let accounts = generate_input_4();
    run_test_parallel(accounts);
}

#[test]
fn test_parallel_empty() {
    // With no accounts at all, the parallel builder must return the empty-trie hash.
    let store = Arc::new(Mutex::new(BTreeMap::new()));
    let trie = Trie::new(Box::new(InMemoryTrieDB::new(store)));
    let no_accounts: BTreeMap<H256, Vec<u8>> = BTreeMap::new();
    let mut input = no_accounts.into_iter();
    let root = trie_from_sorted_accounts_parallel(trie.db(), &mut input)
        .expect("Parallel build failed");
    assert_eq!(root, *EMPTY_TRIE_HASH);
}
}
3 changes: 3 additions & 0 deletions crates/networking/p2p/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
message::Message as RLPxMessage,
p2p::{Capability, SUPPORTED_ETH_CAPABILITIES},
},
snap::RequestSizerMap,
};
use ethrex_common::{
H256,
Expand Down Expand Up @@ -40,6 +41,7 @@ pub use crate::snap::{DumpError, RequestMetadata, RequestStorageTrieNodesError,
pub struct PeerHandler {
pub peer_table: PeerTable,
pub initiator: GenServerHandle<RLPxInitiator>,
pub request_sizer: RequestSizerMap,
}

pub enum BlockRequestOrder {
Expand Down Expand Up @@ -100,6 +102,7 @@ impl PeerHandler {
Self {
peer_table,
initiator,
request_sizer: RequestSizerMap::new(),
}
}

Expand Down
Loading
Loading