From 5efae33a05cf1f77f1396b42b71d72e68844f2d3 Mon Sep 17 00:00:00 2001 From: ilitteri Date: Wed, 11 Feb 2026 12:56:34 -0300 Subject: [PATCH 1/2] Add adaptive request sizing and storage range bisection Adaptive request sizing: add per-peer response_bytes tuning based on response latency. Start at 128KB, scale up 1.5x when latency < 2s, scale down 1.5x when > 10s, clamped to [50KB, 2MB]. Applied to all snap protocol request types (account ranges, storage ranges, trie nodes, bytecodes). Includes RequestSizerMap in PeerHandler with 7 unit tests. Storage range bisection: when a big-account sub-range returns incomplete, bisect the remaining range into two sub-tasks dispatched to different peers, doubling download bandwidth for large contracts. Falls back to single continuation for ranges smaller than 2. --- crates/networking/p2p/peer_handler.rs | 3 + crates/networking/p2p/snap/client.rs | 122 ++++++++++--- crates/networking/p2p/snap/constants.rs | 11 ++ crates/networking/p2p/snap/mod.rs | 4 + crates/networking/p2p/snap/request_sizer.rs | 169 ++++++++++++++++++ crates/networking/p2p/sync/healing/state.rs | 2 + crates/networking/p2p/sync/healing/storage.rs | 11 +- 7 files changed, 293 insertions(+), 29 deletions(-) create mode 100644 crates/networking/p2p/snap/request_sizer.rs diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index ec2af80136b..ee1c37ebd3d 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -12,6 +12,7 @@ use crate::{ message::Message as RLPxMessage, p2p::{Capability, SUPPORTED_ETH_CAPABILITIES}, }, + snap::RequestSizerMap, }; use ethrex_common::{ H256, @@ -40,6 +41,7 @@ pub use crate::snap::{DumpError, RequestMetadata, RequestStorageTrieNodesError, pub struct PeerHandler { pub peer_table: PeerTable, pub initiator: GenServerHandle, + pub request_sizer: RequestSizerMap, } pub enum BlockRequestOrder { @@ -100,6 +102,7 @@ impl PeerHandler { Self { peer_table, initiator, + request_sizer: RequestSizerMap::new(), } } diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index 0757f73a434..6c4958b75dc 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -16,7 +16,7 @@ use crate::{ GetStorageRanges, GetTrieNodes, StorageRanges, TrieNodes, }, }, - snap::{constants::*, encodable_to_proof, error::SnapError}, + snap::{constants::*, encodable_to_proof, error::SnapError, request_sizer::RequestSizerMap}, sync::{AccountStorageRoots, SnapBlockSyncState, block_is_stale, update_pivot}, utils::{ AccountsWithStorage, dump_accounts_to_file, dump_storages_to_file, @@ -36,7 +36,7 @@ use std::{ collections::{BTreeMap, HashMap, VecDeque}, path::Path, sync::atomic::Ordering, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use tracing::{debug, error, info, trace, warn}; @@ -259,6 +259,7 @@ pub async fn request_account_range( } let peer_table = peers.peer_table.clone(); + let request_sizer = peers.request_sizer.clone(); tokio::spawn(request_account_range_worker( peer_id, @@ -268,6 +269,7 @@ pub async fn request_account_range( chunk_end, pivot_header.state_root, tx, + request_sizer, )); } @@ -448,6 +450,7 @@ pub async fn request_bytecodes( .collect(); let mut peer_table = peers.peer_table.clone(); + let request_sizer = peers.request_sizer.clone(); tokio::spawn(async move { let empty_task_result = TaskResult { @@ -460,12 +463,14 @@ pub async fn request_bytecodes( debug!( "Requesting bytecode from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}" ); + let response_bytes = request_sizer.response_bytes_for_peer(&peer_id); let request_id = rand::random(); let request = RLPxMessage::GetByteCodes(GetByteCodes { id: request_id, hashes: hashes_to_request.clone(), - bytes: MAX_RESPONSE_BYTES, + bytes: response_bytes, }); + let request_start = Instant::now(); if let Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) = PeerHandler::make_request( &mut peer_table, @@ -476,6 +481,7 @@ pub async fn request_bytecodes( ) .await { + request_sizer.record_response(&peer_id, request_start.elapsed()); if codes.is_empty() { tx.send(empty_task_result).await.ok(); // Too spammy @@ -684,15 +690,6 @@ pub async fn request_storage_ranges( } else if let Some(hash_end) = hash_end { // Task was a big storage account result if hash_start <= hash_end { - let task = StorageTask { - start_index: remaining_start, - end_index: remaining_end, - start_hash: hash_start, - end_hash: Some(hash_end), - }; - tasks_queue_not_started.push_back(task); - task_count += 1; - let acc_hash = *accounts_by_root_hash[remaining_start] .1 .first() @@ -700,11 +697,65 @@ pub async fn request_storage_ranges( let (_, old_intervals) = account_storage_roots .accounts_with_storage_root .get_mut(&acc_hash).ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; - for (old_start, end) in old_intervals { - if end == &hash_end { - *old_start = hash_start; + + // Bisect: split remaining range into two sub-tasks for parallel download + let start_u256 = U256::from_big_endian(&hash_start.0); + let end_u256 = U256::from_big_endian(&hash_end.0); + let range_size = end_u256.saturating_sub(start_u256); + + if range_size >= U256::from(2) { + let midpoint_u256 = start_u256 + range_size / 2; + let midpoint = H256::from_uint(&midpoint_u256); + let midpoint_next_u256 = midpoint_u256 + 1; + let midpoint_next = H256::from_uint(&midpoint_next_u256); + + tasks_queue_not_started.push_back(StorageTask { + start_index: remaining_start, + end_index: remaining_end, + start_hash: hash_start, + end_hash: Some(midpoint), + }); + tasks_queue_not_started.push_back(StorageTask { + start_index: remaining_start, + end_index: remaining_end, + start_hash: midpoint_next, + end_hash: Some(hash_end), + }); + task_count += 2; + + // Replace the old interval [_, hash_end] with two: + // [hash_start, midpoint] and [midpoint+1, hash_end] + for (old_start, end) in old_intervals.iter_mut() { + if *end == hash_end { + *old_start = hash_start; + *end = midpoint; + break; + } + } + old_intervals.push((midpoint_next, hash_end)); + + debug!( + "Bisected storage range for account {:x}: [{:x}..{:x}] -> [{:x}..{:x}] + [{:x}..{:x}]", + acc_hash, hash_start, hash_end, + hash_start, midpoint, midpoint_next, hash_end + ); + } else { + // Range too small to bisect, create single continuation task + tasks_queue_not_started.push_back(StorageTask { + start_index: remaining_start, + end_index: remaining_end, + start_hash: hash_start, + end_hash: Some(hash_end), + }); + task_count += 1; + + for (old_start, end) in old_intervals { + if end == &hash_end { + *old_start = hash_start; + } } } + account_storage_roots .healed_accounts .extend(accounts_by_root_hash[start_index].1.iter().copied()); @@ -988,6 +1039,7 @@ pub async fn request_storage_ranges( ); } let peer_table = peers.peer_table.clone(); + let request_sizer = peers.request_sizer.clone(); tokio::spawn(request_storage_ranges_worker( task, @@ -998,6 +1050,7 @@ pub async fn request_storage_ranges( chunk_account_hashes, chunk_storage_roots, tx, + request_sizer, )); } @@ -1049,11 +1102,13 @@ pub async fn request_state_trienodes( mut peer_table: PeerTable, state_root: H256, paths: Vec, + request_sizer: RequestSizerMap, ) -> Result, SnapError> { let expected_nodes = paths.len(); // Keep track of peers we requested from so we can penalize unresponsive peers when we get a response // This is so we avoid penalizing peers due to requesting stale data + let response_bytes = request_sizer.response_bytes_for_peer(&peer_id); let request_id = rand::random(); let request = RLPxMessage::GetTrieNodes(GetTrieNodes { id: request_id, @@ -1063,8 +1118,9 @@ pub async fn request_state_trienodes( .iter() .map(|vec| vec![Bytes::from(vec.path.encode_compact())]) .collect(), - bytes: MAX_RESPONSE_BYTES, + bytes: response_bytes, }); + let request_start = Instant::now(); let nodes = match PeerHandler::make_request( &mut peer_table, peer_id, @@ -1074,12 +1130,15 @@ pub async fn request_state_trienodes( ) .await { - Ok(RLPxMessage::TrieNodes(trie_nodes)) => trie_nodes - .nodes - .iter() - .map(|node| Node::decode(node)) - .collect::, _>>() - .map_err(SnapError::from), + Ok(RLPxMessage::TrieNodes(trie_nodes)) => { + request_sizer.record_response(&peer_id, request_start.elapsed()); + trie_nodes + .nodes + .iter() + .map(|node| Node::decode(node)) + .collect::, _>>() + .map_err(SnapError::from) + } Ok(other_msg) => Err(SnapError::Protocol( PeerConnectionError::UnexpectedResponse("TrieNodes".to_string(), other_msg.to_string()), )), @@ -1113,11 +1172,13 @@ pub async fn request_storage_trienodes( mut connection: PeerConnection, mut peer_table: PeerTable, get_trie_nodes: GetTrieNodes, + request_sizer: RequestSizerMap, ) -> Result { // Keep track of peers we requested from so we can penalize unresponsive peers when we get a response // This is so we avoid penalizing peers due to requesting stale data let request_id = get_trie_nodes.id; let request = RLPxMessage::GetTrieNodes(get_trie_nodes); + let request_start = Instant::now(); match PeerHandler::make_request( &mut peer_table, peer_id, @@ -1127,7 +1188,10 @@ pub async fn request_storage_trienodes( ) .await { - Ok(RLPxMessage::TrieNodes(trie_nodes)) => Ok(trie_nodes), + Ok(RLPxMessage::TrieNodes(trie_nodes)) => { + request_sizer.record_response(&peer_id, request_start.elapsed()); + Ok(trie_nodes) + } Ok(other_msg) => Err(RequestStorageTrieNodesError { request_id, source: SnapError::Protocol(PeerConnectionError::UnexpectedResponse( @@ -1151,16 +1215,19 @@ async fn request_account_range_worker( chunk_end: H256, state_root: H256, tx: tokio::sync::mpsc::Sender<(Vec, H256, Option<(H256, H256)>)>, + request_sizer: RequestSizerMap, ) -> Result<(), SnapError> { debug!("Requesting account range from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}"); + let response_bytes = request_sizer.response_bytes_for_peer(&peer_id); let request_id = rand::random(); let request = RLPxMessage::GetAccountRange(GetAccountRange { id: request_id, root_hash: state_root, starting_hash: chunk_start, limit_hash: chunk_end, - response_bytes: MAX_RESPONSE_BYTES, + response_bytes, }); + let request_start = Instant::now(); if let Ok(RLPxMessage::AccountRange(AccountRange { id: _, accounts, @@ -1174,6 +1241,7 @@ async fn request_account_range_worker( ) .await { + request_sizer.record_response(&peer_id, request_start.elapsed()); if accounts.is_empty() { tx.send((Vec::new(), peer_id, Some((chunk_start, chunk_end)))) .await @@ -1253,6 +1321,7 @@ async fn request_storage_ranges_worker( chunk_account_hashes: Vec, chunk_storage_roots: Vec, tx: tokio::sync::mpsc::Sender, + request_sizer: RequestSizerMap, ) -> Result<(), SnapError> { let start = task.start_index; let end = task.end_index; @@ -1266,6 +1335,7 @@ async fn request_storage_ranges_worker( remaining_end: task.end_index, remaining_hash_range: (start_hash, task.end_hash), }; + let response_bytes = request_sizer.response_bytes_for_peer(&peer_id); let request_id = rand::random(); let request = RLPxMessage::GetStorageRanges(GetStorageRanges { id: request_id, @@ -1273,8 +1343,9 @@ async fn request_storage_ranges_worker( account_hashes: chunk_account_hashes, starting_hash: start_hash, limit_hash: task.end_hash.unwrap_or(HASH_MAX), - response_bytes: MAX_RESPONSE_BYTES, + response_bytes, }); + let request_start = Instant::now(); let Ok(RLPxMessage::StorageRanges(StorageRanges { id: _, slots, @@ -1292,6 +1363,7 @@ async fn request_storage_ranges_worker( tx.send(empty_task_result).await.ok(); return Ok(()); }; + request_sizer.record_response(&peer_id, request_start.elapsed()); if slots.is_empty() && proof.is_empty() { tx.send(empty_task_result).await.ok(); tracing::debug!("Received empty storage range"); diff --git a/crates/networking/p2p/snap/constants.rs b/crates/networking/p2p/snap/constants.rs index 81bdc59365e..64108531cb6 100644 --- a/crates/networking/p2p/snap/constants.rs +++ b/crates/networking/p2p/snap/constants.rs @@ -14,8 +14,19 @@ use std::time::Duration; /// /// This limits the amount of data a peer can return in a single response, /// preventing memory exhaustion and ensuring reasonable response times. +/// Used on the server side for response capping. Client-side requests use +/// adaptive sizing via [`super::request_sizer::RequestSizerMap`]. pub const MAX_RESPONSE_BYTES: u64 = 512 * 1024; +/// Initial response bytes budget for adaptive request sizing (128 KB). +pub const INITIAL_RESPONSE_BYTES: u64 = 128 * 1024; + +/// Minimum response bytes for adaptive sizing (50 KB). +pub const MIN_RESPONSE_BYTES_ADAPTIVE: u64 = 50 * 1024; + +/// Maximum response bytes for adaptive sizing (2 MB). +pub const MAX_RESPONSE_BYTES_ADAPTIVE: u64 = 2 * 1024 * 1024; + /// Maximum number of accounts/items to request in a single snap request. /// /// This magic number is not part of the protocol specification and is taken diff --git a/crates/networking/p2p/snap/mod.rs b/crates/networking/p2p/snap/mod.rs index 1b0fee1005d..7dfe60b2519 100644 --- a/crates/networking/p2p/snap/mod.rs +++ b/crates/networking/p2p/snap/mod.rs @@ -15,6 +15,7 @@ pub mod client; pub mod constants; pub mod error; +pub mod request_sizer; mod server; use bytes::Bytes; @@ -28,6 +29,9 @@ pub use server::{ // Re-export error types pub use error::{DumpError, SnapError}; +// Re-export adaptive request sizer +pub use request_sizer::RequestSizerMap; + // Re-export client types and functions pub use client::{ RequestMetadata, RequestStorageTrieNodesError, request_account_range, request_bytecodes, diff --git a/crates/networking/p2p/snap/request_sizer.rs b/crates/networking/p2p/snap/request_sizer.rs new file mode 100644 index 00000000000..1d3492ae7a7 --- /dev/null +++ b/crates/networking/p2p/snap/request_sizer.rs @@ -0,0 +1,169 @@ +//! Adaptive request sizing for snap sync protocol. +//! +//! Adjusts the `response_bytes` budget per peer based on observed response latency, +//! similar to Nethermind's adaptive approach. Fast peers get larger requests, +//! slow peers get smaller ones. + +use super::constants::{ + INITIAL_RESPONSE_BYTES, MAX_RESPONSE_BYTES_ADAPTIVE, MIN_RESPONSE_BYTES_ADAPTIVE, +}; +use ethrex_common::H256; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + time::Duration, +}; + +/// Latency threshold below which the budget is scaled up. +const LATENCY_LOW: Duration = Duration::from_secs(2); + +/// Latency threshold above which the budget is scaled down. +const LATENCY_HIGH: Duration = Duration::from_secs(10); + +/// Scale factor: multiply by 3/2 (1.5x) for scale-up, multiply by 2/3 for scale-down. +const SCALE_UP_NUM: u64 = 3; +const SCALE_UP_DEN: u64 = 2; + +/// Per-peer adaptive request sizer state. +#[derive(Debug, Clone)] +struct PeerSizer { + response_bytes: u64, +} + +impl PeerSizer { + fn new() -> Self { + Self { + response_bytes: INITIAL_RESPONSE_BYTES, + } + } + + fn response_bytes(&self) -> u64 { + self.response_bytes + } + + fn record_response(&mut self, latency: Duration) { + if latency < LATENCY_LOW { + // Scale up: multiply by 1.5, cap at max + self.response_bytes = (self.response_bytes * SCALE_UP_NUM / SCALE_UP_DEN) + .min(MAX_RESPONSE_BYTES_ADAPTIVE); + } else if latency > LATENCY_HIGH { + // Scale down: divide by 1.5, floor at min + self.response_bytes = (self.response_bytes * SCALE_UP_DEN / SCALE_UP_NUM) + .max(MIN_RESPONSE_BYTES_ADAPTIVE); + } + } +} + +/// Thread-safe map of per-peer adaptive request sizers. +/// +/// Cheaply cloneable (Arc-backed). Pass clones to spawned tasks. +#[derive(Debug, Clone, Default)] +pub struct RequestSizerMap { + inner: Arc>>, +} + +impl RequestSizerMap { + pub fn new() -> Self { + Self::default() + } + + /// Returns the current response bytes budget for a peer. + /// Creates a new sizer at the default budget if the peer is not yet tracked. + pub fn response_bytes_for_peer(&self, peer_id: &H256) -> u64 { + self.inner + .lock() + .expect("RequestSizerMap lock poisoned") + .entry(*peer_id) + .or_insert_with(PeerSizer::new) + .response_bytes() + } + + /// Records a response latency for a peer, adjusting its future request budget. + pub fn record_response(&self, peer_id: &H256, latency: Duration) { + self.inner + .lock() + .expect("RequestSizerMap lock poisoned") + .entry(*peer_id) + .or_insert_with(PeerSizer::new) + .record_response(latency); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn starts_at_initial_budget() { + let sizer = RequestSizerMap::new(); + let peer = H256::zero(); + assert_eq!(sizer.response_bytes_for_peer(&peer), INITIAL_RESPONSE_BYTES); + } + + #[test] + fn scales_up_on_fast_response() { + let sizer = RequestSizerMap::new(); + let peer = H256::zero(); + sizer.record_response(&peer, Duration::from_secs(1)); + let expected = INITIAL_RESPONSE_BYTES * 3 / 2; + assert_eq!(sizer.response_bytes_for_peer(&peer), expected); + } + + #[test] + fn scales_down_on_slow_response() { + let sizer = RequestSizerMap::new(); + let peer = H256::zero(); + sizer.record_response(&peer, Duration::from_secs(11)); + let expected = INITIAL_RESPONSE_BYTES * 2 / 3; + assert_eq!(sizer.response_bytes_for_peer(&peer), expected); + } + + #[test] + fn no_change_in_normal_range() { + let sizer = RequestSizerMap::new(); + let peer = H256::zero(); + sizer.record_response(&peer, Duration::from_secs(5)); + assert_eq!(sizer.response_bytes_for_peer(&peer), INITIAL_RESPONSE_BYTES); + } + + #[test] + fn clamps_to_max() { + let sizer = RequestSizerMap::new(); + let peer = H256::zero(); + // Scale up many times + for _ in 0..50 { + sizer.record_response(&peer, Duration::from_millis(100)); + } + assert_eq!( + sizer.response_bytes_for_peer(&peer), + MAX_RESPONSE_BYTES_ADAPTIVE + ); + } + + #[test] + fn clamps_to_min() { + let sizer = RequestSizerMap::new(); + let peer = H256::zero(); + // Scale down many times + for _ in 0..50 { + sizer.record_response(&peer, Duration::from_secs(30)); + } + assert_eq!( + sizer.response_bytes_for_peer(&peer), + MIN_RESPONSE_BYTES_ADAPTIVE + ); + } + + #[test] + fn independent_per_peer() { + let sizer = RequestSizerMap::new(); + let fast_peer = H256::from_low_u64_be(1); + let slow_peer = H256::from_low_u64_be(2); + + sizer.record_response(&fast_peer, Duration::from_millis(500)); + sizer.record_response(&slow_peer, Duration::from_secs(15)); + + assert!(sizer.response_bytes_for_peer(&fast_peer) > INITIAL_RESPONSE_BYTES); + assert!(sizer.response_bytes_for_peer(&slow_peer) < INITIAL_RESPONSE_BYTES); + } +} diff --git a/crates/networking/p2p/sync/healing/state.rs b/crates/networking/p2p/sync/healing/state.rs index 3cb0924da15..95303abbdb3 100644 --- a/crates/networking/p2p/sync/healing/state.rs +++ b/crates/networking/p2p/sync/healing/state.rs @@ -335,6 +335,7 @@ async fn dispatch_state_healing_batches( let tx = task_sender.clone(); *inflight_tasks += 1; let peer_table = peers.peer_table.clone(); + let request_sizer = peers.request_sizer.clone(); tokio::spawn(async move { let response = request_state_trienodes( @@ -343,6 +344,7 @@ async fn dispatch_state_healing_batches( peer_table, state_root, batch.clone(), + request_sizer, ) .await; let _ = tx diff --git a/crates/networking/p2p/sync/healing/storage.rs b/crates/networking/p2p/sync/healing/storage.rs index e9d8922cb3d..29146acd4bb 100644 --- a/crates/networking/p2p/sync/healing/storage.rs +++ b/crates/networking/p2p/sync/healing/storage.rs @@ -8,8 +8,7 @@ use crate::{ snap::{ RequestStorageTrieNodesError, constants::{ - MAX_IN_FLIGHT_REQUESTS, MAX_RESPONSE_BYTES, SHOW_PROGRESS_INTERVAL_DURATION, - STORAGE_BATCH_SIZE, + MAX_IN_FLIGHT_REQUESTS, SHOW_PROGRESS_INTERVAL_DURATION, STORAGE_BATCH_SIZE, }, request_storage_trienodes, }, @@ -359,11 +358,13 @@ async fn ask_peers_for_nodes( peer_id, }, ); + let request_sizer = peers.request_sizer.clone(); + let response_bytes = request_sizer.response_bytes_for_peer(&peer_id); let gtn = GetTrieNodes { id: req_id, root_hash: state_root, paths, - bytes: MAX_RESPONSE_BYTES, + bytes: response_bytes, }; let tx = task_sender.clone(); @@ -372,7 +373,9 @@ async fn ask_peers_for_nodes( requests_task_joinset.spawn(async move { let req_id = gtn.id; - let response = request_storage_trienodes(peer_id, connection, peer_table, gtn).await; + let response = + request_storage_trienodes(peer_id, connection, peer_table, gtn, request_sizer) + .await; // TODO: add error handling tx.try_send(response).inspect_err( |err| debug!(error=?err, "Failed to send state trie nodes response"), From fb91a34620a324488826f3bd9f0ae7323091a758 Mon Sep 17 00:00:00 2001 From: ilitteri Date: Wed, 11 Feb 2026 12:56:51 -0300 Subject: [PATCH 2/2] Parallelize account trie construction by first-nibble partitioning Partition the sorted account iterator into 16 buckets by first nibble of H256 key and build each subtrie in parallel using std::thread::scope. Merge the 16 subtrie root hashes into a single root BranchNode. Refactor trie_from_sorted_accounts into inner + finalize layers so the core loop can be reused for subtrie construction. Falls back to sequential build when only 0-1 nibble buckets are non-empty. Includes 5 new tests for parallel construction. Call site in snap_sync.rs updated to use trie_from_sorted_accounts_parallel. --- crates/common/trie/trie_sorted.rs | 269 ++++++++++++++++++++---- crates/networking/p2p/sync/snap_sync.rs | 4 +- 2 files changed, 225 insertions(+), 48 deletions(-) diff --git a/crates/common/trie/trie_sorted.rs b/crates/common/trie/trie_sorted.rs index 32e47da806a..06854e50913 100644 --- a/crates/common/trie/trie_sorted.rs +++ b/crates/common/trie/trie_sorted.rs @@ -156,39 +156,28 @@ fn flush_nodes_to_write( Ok(()) } -/// trie_from_sorted_accounts computes and stores into a db a trie from a sorted -/// iterator of H256 paths and values. This function takes a ThreadPool Arc to send -/// the writing task to be done concurrently. -/// To limit the amount of memory this function can use, we use a crossbeam multiproducer -/// multiconsumer queue, which gives the function a buffer to write nodes into before -/// flushing to the db. -pub fn trie_from_sorted_accounts<'scope, T>( +/// Core trie building loop. Processes sorted (H256, Vec) pairs into a trie structure, +/// writing intermediate nodes to DB via the ThreadPool. Returns the root-level BranchNode +/// (with choices populated) and any unflushed nodes, or None if the iterator was empty. +#[allow(clippy::type_complexity)] +fn trie_from_sorted_accounts_inner<'scope, T>( db: &'scope dyn TrieDB, data_iter: &mut T, scope: Arc>, buffer_sender: Sender>, buffer_receiver: Receiver>, -) -> Result +) -> Result)>, TrieGenerationError> where T: Iterator)> + Send, { let Some(initial_value) = data_iter.next() else { - return Ok(*EMPTY_TRIE_HASH); + return Ok(None); }; let mut nodes_to_write: Vec<(Nibbles, Node)> = buffer_receiver .recv() .expect("This channel shouldn't close"); - // We have a stack of the parents of the current parent - let mut trie_stack: Vec = Vec::with_capacity(64); // Optimized for H256 - - // This is the current parent of the first element. We assume that the root node - // is always a parent, and we fix it afterwards if it's not true - // The root is a parent of all nodes - let mut nodehash_buffer = Vec::with_capacity(512); + let mut trie_stack: Vec = Vec::with_capacity(64); let mut current_parent = StackElement::default(); - - // The current node that is being used for computing. We compare it with the current - // parent and the next value to see where it should be written let mut current_node: CenterSide = CenterSide::from_value(initial_value); let mut next_value_opt: Option<(H256, Vec)> = data_iter.next(); @@ -198,7 +187,6 @@ where scope.execute_priority(Box::new(move || { let _ = flush_nodes_to_write(nodes_to_write, db, buffer_sender); })); - // We wait to get a new buffer to avoid writing too much nodes_to_write = buffer_receiver .recv() .expect("This channel shouldn't close"); @@ -206,9 +194,6 @@ where let next_value_path = Nibbles::from_bytes(next_value.0.as_bytes()); - // If the current parent isn't a parent of the next value, that means - // that the current value doesn't have a sibling to the right - // As such we write this node and change the current node to the current parent while !is_child(&next_value_path, ¤t_parent) { add_current_to_parent_and_write_queue( &mut nodes_to_write, @@ -222,12 +207,6 @@ where current_node = temp; } - // If the "distance" (same prefix count) between the current and next value is equal to the - // parent node, that means that they're both "siblings" of the current parent - // Ex: parent=[05] current=[0567] next=[0589] - // there is not a branch between the parent and current, so we just write the - // current element and change the current with the next value while - // advancing the iterator for our next value if current_node.path.count_prefix(¤t_parent.path) == current_node.path.count_prefix(&next_value_path) { @@ -236,15 +215,6 @@ where ¤t_node, &mut current_parent, )?; - - // If the "distance" between the current and next value is larger than that to - // the parent node, that means that there is a closer parent for both of them - // Ex: parent=[05] current=[0567] next=[0569] - // This means that there is a branch in [056] and current is a child - // of that parent - // So we create a parent, mark it as current, write the current node to that parent. - // The old parent goes into the stack - // Then we advance the iterator for our next value } else { let mut element = create_parent(¤t_node, &next_value_path); add_current_to_parent_and_write_queue( @@ -259,8 +229,7 @@ where next_value_opt = data_iter.next(); } - // We empty the stack, where each node is a child of the one in the stack, so we just keep - // popping and adding to parent + // Empty the stack add_current_to_parent_and_write_queue(&mut nodes_to_write, ¤t_node, &mut current_parent)?; while let Some(mut parent_node) = trie_stack.pop() { add_current_to_parent_and_write_queue( @@ -271,16 +240,25 @@ where current_parent = parent_node; } - let hash = if current_parent - .element + Ok(Some((current_parent.element, nodes_to_write))) +} + +/// Computes the root hash from the root-level BranchNode, handling the single-choice +/// collapse optimization required by the MPT spec. Appends the root node to nodes_to_write. +fn finalize_root( + root_branch: BranchNode, + nodes_to_write: &mut Vec<(Nibbles, Node)>, +) -> H256 { + let mut nodehash_buffer = Vec::with_capacity(512); + + if root_branch .choices .iter() .filter(|choice| choice.is_valid()) .count() == 1 { - let (index, child) = current_parent - .element + let (index, child) = root_branch .choices .into_iter() .enumerate() @@ -323,7 +301,7 @@ where } } } else { - let node: Node = current_parent.element.into(); + let node: Node = root_branch.into(); nodes_to_write.push((Nibbles::default(), node)); nodes_to_write .last() @@ -331,12 +309,133 @@ where .1 .compute_hash_no_alloc(&mut nodehash_buffer) .finalize() - }; + } +} +/// trie_from_sorted_accounts computes and stores into a db a trie from a sorted +/// iterator of H256 paths and values. This function takes a ThreadPool Arc to send +/// the writing task to be done concurrently. +/// To limit the amount of memory this function can use, we use a crossbeam multiproducer +/// multiconsumer queue, which gives the function a buffer to write nodes into before +/// flushing to the db. +pub fn trie_from_sorted_accounts<'scope, T>( + db: &'scope dyn TrieDB, + data_iter: &mut T, + scope: Arc>, + buffer_sender: Sender>, + buffer_receiver: Receiver>, +) -> Result +where + T: Iterator)> + Send, +{ + let Some((root_branch, mut nodes_to_write)) = trie_from_sorted_accounts_inner( + db, + data_iter, + scope, + buffer_sender.clone(), + buffer_receiver, + )? + else { + return Ok(*EMPTY_TRIE_HASH); + }; + let hash = finalize_root(root_branch, &mut nodes_to_write); let _ = flush_nodes_to_write(nodes_to_write, db, buffer_sender); Ok(hash) } +/// Builds the account trie in parallel by partitioning sorted data into 16 buckets +/// (one per first nibble of the key hash) and building subtries concurrently. +/// Each subtrie writes its nodes at correct full paths, then a root BranchNode +/// is created from the 16 subtrie root hashes. +pub fn trie_from_sorted_accounts_parallel( + db: &dyn TrieDB, + accounts_iter: &mut T, +) -> Result +where + T: Iterator)> + Send, +{ + // Partition data into 16 buckets by first nibble + let mut buckets: [Vec<(H256, Vec)>; 16] = Default::default(); + for (key, value) in accounts_iter { + let first_nibble = (key.0[0] >> 4) as usize; + buckets[first_nibble].push((key, value)); + } + + let non_empty_count = buckets.iter().filter(|b| !b.is_empty()).count(); + if non_empty_count == 0 { + return Ok(*EMPTY_TRIE_HASH); + } + // Single bucket: fall back to sequential (parallel overhead not worth it) + if non_empty_count == 1 { + let items: Vec<_> = buckets.into_iter().flatten().collect(); + return trie_from_sorted_accounts_wrap(db, &mut items.into_iter()); + } + + // Build 16 subtries in parallel using scoped threads + shared ThreadPool for DB writes + scope(|s| { + let pool = Arc::new(ThreadPool::new(16, s)); + + let handles: Vec<_> = buckets + .into_iter() + .enumerate() + .filter_map(|(nibble, bucket)| { + if bucket.is_empty() { + return None; + } + let pool = pool.clone(); + let handle = std::thread::Builder::new() + .name(format!("subtrie-{nibble}")) + .spawn_scoped(s, move || -> Result, TrieGenerationError> { + let (buf_tx, buf_rx) = + bounded::>(BUFFER_COUNT as usize); + for _ in 0..BUFFER_COUNT { + let _ = buf_tx.send(Vec::with_capacity(SIZE_TO_WRITE_DB as usize)); + } + let result = trie_from_sorted_accounts_inner( + db, + &mut bucket.into_iter(), + pool, + buf_tx.clone(), + buf_rx, + )?; + if let Some((branch, nodes_to_write)) = result { + let _ = flush_nodes_to_write(nodes_to_write, db, buf_tx); + Ok(Some(branch)) + } else { + Ok(None) + } + }) + .expect("Failed to spawn subtrie thread"); + Some((nibble, handle)) + }) + .collect(); + + // Collect subtrie roots and merge into root BranchNode + let mut root_choices = BranchNode::EMPTY_CHOICES; + for (nibble, handle) in handles { + let branch_opt = handle + .join() + .map_err(|_| TrieGenerationError::ThreadJoinError())??; + if let Some(branch) = branch_opt { + root_choices[nibble] = branch.choices[nibble].clone(); + } + } + + // Write root BranchNode to DB + let root_node: Node = BranchNode { + choices: root_choices, + value: vec![], + } + .into(); + let mut buf = Vec::with_capacity(512); + let hash = root_node.compute_hash_no_alloc(&mut buf).finalize(); + db.put_batch_no_alloc(&[(Nibbles::default(), root_node)]) + .map_err(TrieGenerationError::FlushToDbError)?; + + Ok(hash) + }) +} + /// Wrapper function for `trie_from_sorted_accounts` that handles concurrency /// and memory limits pub fn trie_from_sorted_accounts_wrap( @@ -508,6 +607,48 @@ mod test { assert!(tested_trie_hash == trie_hash) } + /// Verifies that the parallel trie builder produces the same root hash and + /// DB contents as the sequential builder + fn run_test_parallel(accounts: BTreeMap>) { + // Build with sequential algorithm + let seq_data = Arc::new(Mutex::new(BTreeMap::new())); + let seq_trie = Trie::new(Box::new(InMemoryTrieDB::new(seq_data.clone()))); + let seq_hash: H256 = trie_from_sorted_accounts_wrap( + seq_trie.db(), + &mut accounts + .clone() + .into_iter() + .map(|(hash, state)| (hash, state.encode_to_vec())), + ) + .expect("Sequential build failed"); + + // Build with parallel algorithm + let par_data = Arc::new(Mutex::new(BTreeMap::new())); + let par_trie = Trie::new(Box::new(InMemoryTrieDB::new(par_data.clone()))); + let par_hash: H256 = trie_from_sorted_accounts_parallel( + par_trie.db(), + &mut accounts + .into_iter() + .map(|(hash, state)| (hash, state.encode_to_vec())), + ) + .expect("Parallel build failed"); + + assert_eq!(seq_hash, par_hash, "Root hashes differ"); + + let seq_data = seq_data.lock().unwrap(); + let par_data = par_data.lock().unwrap(); + for (k, v) in seq_data.iter() { + assert!( + par_data.contains_key(k), + "Parallel DB missing key: {k:?}" + ); + assert_eq!( + *v, par_data[k], + "Value mismatch for key: {k:?}" + ); + } + } + #[test] fn test_1() { run_test_account_state(generate_input_1()); @@ -537,4 +678,40 @@ mod test { fn test_slots_1() { run_test_storage_slots(generate_input_slots_1()); } + + #[test] + fn test_parallel_multi_nibble() { + // generate_input_2 has entries spanning multiple first nibbles (0, 1, 6, 9, a, b, c, f) + run_test_parallel(generate_input_2()); + } + + #[test] + fn test_parallel_single_nibble() { + // generate_input_1 has all entries starting with nibble 6 — falls back to sequential + run_test_parallel(generate_input_1()); + } + + #[test] + fn test_parallel_same_prefix() { + // generate_input_3 has all entries starting with 05 — falls back to sequential + run_test_parallel(generate_input_3()); + } + + #[test] + fn test_parallel_single_item() { + run_test_parallel(generate_input_4()); + } + + #[test] + fn test_parallel_empty() { + let accounts: BTreeMap> = BTreeMap::new(); + let par_data = Arc::new(Mutex::new(BTreeMap::new())); + let par_trie = Trie::new(Box::new(InMemoryTrieDB::new(par_data))); + let hash = trie_from_sorted_accounts_parallel( + par_trie.db(), + &mut accounts.into_iter(), + ) + .expect("Parallel build failed"); + assert_eq!(hash, *EMPTY_TRIE_HASH); + } } diff --git a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs index 9d7f13c0761..19946596211 100644 --- a/crates/networking/p2p/sync/snap_sync.rs +++ b/crates/networking/p2p/sync/snap_sync.rs @@ -940,7 +940,7 @@ async fn insert_accounts( code_hash_collector: &mut CodeHashCollector, ) -> Result<(H256, BTreeSet), SyncError> { use crate::utils::get_rocksdb_temp_accounts_dir; - use ethrex_trie::trie_sorted::trie_from_sorted_accounts_wrap; + use ethrex_trie::trie_sorted::trie_from_sorted_accounts_parallel; let trie = store.open_direct_state_trie_no_wal(*EMPTY_TRIE_HASH)?; let mut db_options = rocksdb::Options::default(); @@ -967,7 +967,7 @@ async fn insert_accounts( } let iter = db.full_iterator(rocksdb::IteratorMode::Start); - let compute_state_root = trie_from_sorted_accounts_wrap( + let compute_state_root = trie_from_sorted_accounts_parallel( trie.db(), &mut iter .map(|k| k.expect("We shouldn't have a rocksdb error here")) // TODO: remove unwrap