From ddbfc6b5ecf52c3f53f70788520afa41fea7592d Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Mon, 27 Apr 2026 17:13:37 -0300 Subject: [PATCH 1/4] snap: schedule per-interval tasks for big accounts at function entry --- crates/networking/p2p/snap/client.rs | 64 +++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 6 deletions(-) diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index 9fede93a267..783ca29f941 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -557,19 +557,71 @@ pub async fn request_storage_ranges( // TODO: Turn this into a stable sort for binary search. accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len()); let chunk_size = STORAGE_BATCH_SIZE; - let chunk_count = (accounts_by_root_hash.len() / chunk_size) + 1; // list of tasks to be executed // Types are (start_index, end_index, starting_hash) // NOTE: end_index is NOT inclusive + // Partition into bulk-path tasks (fresh accounts with empty intervals) and + // per-interval tasks (big accounts marked in a prior call). The previous + // implementation queued every account from `start_hash: zero` and relied + // on branch-3 in the response handler re-firing each call to re-queue + // per-interval tasks for big accounts. That fails when peers cover a big + // account fully without hitting their response limit on it: branch-3 + // doesn't fire, no per-interval tasks get queued, intervals never drain, + // the account is stuck pending forever even after its data is on disk. let mut tasks_queue_not_started = VecDeque::::new(); - for i in 0..chunk_count { - let chunk_start = chunk_size * i; - let chunk_end = (chunk_start + chunk_size).min(accounts_by_root_hash.len()); + let mut bulk_chunk_start: Option = None; + for (i, (_, accounts)) in accounts_by_root_hash.iter().enumerate() { + let first_account = *accounts.first().ok_or_else(|| { + SnapError::InternalError("Empty accounts vector while scheduling tasks".to_owned()) + })?; + let intervals = &account_storage_roots + .accounts_with_storage_root + .get(&first_account) + .ok_or_else(|| { + SnapError::InternalError( + "Could not find intervals for account while scheduling".to_owned(), + ) + })? + .1; + if intervals.is_empty() { + if bulk_chunk_start.is_none() { + bulk_chunk_start = Some(i); + } + if i + 1 - bulk_chunk_start.unwrap_or(i) >= chunk_size { + tasks_queue_not_started.push_back(StorageTask { + start_index: bulk_chunk_start.unwrap_or(i), + end_index: i + 1, + start_hash: H256::zero(), + end_hash: None, + }); + bulk_chunk_start = None; + } + } else { + if let Some(start) = bulk_chunk_start { + tasks_queue_not_started.push_back(StorageTask { + start_index: start, + end_index: i, + start_hash: H256::zero(), + end_hash: None, + }); + bulk_chunk_start = None; + } + for (start_hash, end_hash) in intervals.clone() { + tasks_queue_not_started.push_back(StorageTask { + start_index: i, + end_index: i + 1, + start_hash, + end_hash: Some(end_hash), + }); + } + } + } + if let Some(start) = bulk_chunk_start { tasks_queue_not_started.push_back(StorageTask { - start_index: chunk_start, - end_index: chunk_end, + start_index: start, + end_index: accounts_by_root_hash.len(), start_hash: H256::zero(), end_hash: None, }); From 5d2c088a33320b29a39eb3897cc7bb7da65bb569 Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Wed, 29 Apr 2026 11:55:46 -0300 Subject: [PATCH 2/4] snap: drop per-interval task interval on clean finish --- crates/networking/p2p/snap/client.rs | 36 ++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index 783ca29f941..a3c6151c0ad 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -921,6 +921,42 @@ pub async fn request_storage_ranges( debug!("Split big storage account into {chunk_count} chunks."); } } + } else if let Some(hash_end) = hash_end { + // Per-interval task completed: the peer covered + // [start_hash, hash_end] fully and verify_range reported + // should_continue=false, so the worker returns + // remaining_start == remaining_end and the guard above does + // not fire. Drop the matching interval here so the account + // can finalize across calls; otherwise the partition logic + // at function entry would re-queue the same range forever. + let mut acc_hash: H256 = H256::zero(); + for account in accounts_by_root_hash[start_index].1.iter() { + if let Some((_, old_intervals)) = account_storage_roots + .accounts_with_storage_root + .get(account) + && !old_intervals.is_empty() + { + acc_hash = *account; + } + } + if !acc_hash.is_zero() { + let (_, old_intervals) = account_storage_roots + .accounts_with_storage_root + .get_mut(&acc_hash) + .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; + if let Some(pos) = old_intervals + .iter() + .position(|(_old_start, end)| end == &hash_end) + { + old_intervals.remove(pos); + } + if old_intervals.is_empty() { + for account in accounts_by_root_hash[start_index].1.iter() { + accounts_done.insert(*account, vec![]); + account_storage_roots.healed_accounts.insert(*account); + } + } + } } if account_storages.is_empty() { From 391b03cdd24bea78a9c41b5731e4771b7327b36b Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Wed, 29 Apr 2026 12:19:01 -0300 Subject: [PATCH 3/4] snap: address review nits in scheduling fix --- crates/networking/p2p/snap/client.rs | 36 +++++++++++++--------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index a3c6151c0ad..cc47a700c8d 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -558,18 +558,14 @@ pub async fn request_storage_ranges( accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len()); let chunk_size = STORAGE_BATCH_SIZE; - // list of tasks to be executed - // Types are (start_index, end_index, starting_hash) - // NOTE: end_index is NOT inclusive - // Partition into bulk-path tasks (fresh accounts with empty intervals) and // per-interval tasks (big accounts marked in a prior call). The previous // implementation queued every account from `start_hash: zero` and relied - // on branch-3 in the response handler re-firing each call to re-queue - // per-interval tasks for big accounts. That fails when peers cover a big - // account fully without hitting their response limit on it: branch-3 - // doesn't fire, no per-interval tasks get queued, intervals never drain, - // the account is stuck pending forever even after its data is on disk. + // on the response handler's bulk-task big-account split path to re-queue + // per-interval tasks each call. That fails when peers cover a big account + // fully without hitting their response limit on it: the split path doesn't + // fire, no per-interval tasks get queued, intervals never drain, the + // account is stuck pending forever even after its data is on disk. let mut tasks_queue_not_started = VecDeque::::new(); let mut bulk_chunk_start: Option = None; for (i, (_, accounts)) in accounts_by_root_hash.iter().enumerate() { @@ -586,12 +582,10 @@ pub async fn request_storage_ranges( })? .1; if intervals.is_empty() { - if bulk_chunk_start.is_none() { - bulk_chunk_start = Some(i); - } - if i + 1 - bulk_chunk_start.unwrap_or(i) >= chunk_size { + let chunk_start = *bulk_chunk_start.get_or_insert(i); + if i + 1 - chunk_start >= chunk_size { tasks_queue_not_started.push_back(StorageTask { - start_index: bulk_chunk_start.unwrap_or(i), + start_index: chunk_start, end_index: i + 1, start_hash: H256::zero(), end_hash: None, @@ -608,7 +602,7 @@ pub async fn request_storage_ranges( }); bulk_chunk_start = None; } - for (start_hash, end_hash) in intervals.clone() { + for &(start_hash, end_hash) in intervals.iter() { tasks_queue_not_started.push_back(StorageTask { start_index: i, end_index: i + 1, @@ -939,17 +933,21 @@ pub async fn request_storage_ranges( acc_hash = *account; } } + // acc_hash stays zero when a sibling per-interval task for the + // same account already drained the last interval and finalized + // it earlier in this call's loop — there's nothing left to do. if !acc_hash.is_zero() { let (_, old_intervals) = account_storage_roots .accounts_with_storage_root .get_mut(&acc_hash) .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; - if let Some(pos) = old_intervals + let pos = old_intervals .iter() .position(|(_old_start, end)| end == &hash_end) - { - old_intervals.remove(pos); - } + .ok_or(SnapError::InternalError( + "Could not find an old interval that we were tracking".to_owned(), + ))?; + old_intervals.remove(pos); if old_intervals.is_empty() { for account in accounts_by_root_hash[start_index].1.iter() { accounts_done.insert(*account, vec![]); From ae7c501a8a73af41918824a72aae5fb9fbe87e58 Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Thu, 14 May 2026 11:41:04 -0300 Subject: [PATCH 4/4] snap: address review nits on big-account scheduling --- crates/networking/p2p/snap/client.rs | 217 +++++++++++++++------------ 1 file changed, 121 insertions(+), 96 deletions(-) diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index cc47a700c8d..0556261ebb6 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -69,6 +69,10 @@ struct StorageTaskResult { remaining_start: usize, remaining_end: usize, remaining_hash_range: (H256, Option), + // The start_hash of the original task. Distinct from remaining_hash_range.0, + // which is the worker's advancing pointer (zero on full completion). Needed + // by the response handler to match the completed interval unambiguously. + task_start_hash: H256, } #[derive(Debug)] @@ -80,6 +84,64 @@ struct StorageTask { end_hash: Option, } +/// Removes the completed interval `(start_hash, end_hash)` from whichever +/// account in the group at `accounts_by_root_hash[start_index]` currently +/// holds the interval list, and when the list empties marks every account in +/// the group as done and healed. +/// +/// Returns `true` if an interval was found and removed, `false` if no account +/// in the group has any live intervals (a sibling task already finalized the +/// account earlier in this call). +/// +/// Within a group sharing the same storage root, the split path stores +/// intervals under one canonical account, so only that account's entry holds +/// the live list. We scan the group rather than relying on `accounts.first()` +/// because the canonical account can shift across calls if the set of tracked +/// accounts changes between iterations of `accounts_with_storage_root`. +fn clear_completed_interval( + account_storage_roots: &mut AccountStorageRoots, + accounts_by_root_hash: &[(H256, Vec)], + accounts_done: &mut HashMap>, + start_index: usize, + interval: (H256, H256), +) -> Result { + let accounts = &accounts_by_root_hash[start_index].1; + let acc_hash = accounts.iter().copied().find(|account| { + account_storage_roots + .accounts_with_storage_root + .get(account) + .is_some_and(|(_, ivs)| !ivs.is_empty()) + }); + let Some(acc_hash) = acc_hash else { + return Ok(false); + }; + let (_, old_intervals) = account_storage_roots + .accounts_with_storage_root + .get_mut(&acc_hash) + .ok_or_else(|| { + SnapError::InternalError( + "Tried to get the old download intervals for an account but did not find them" + .to_owned(), + ) + })?; + let pos = old_intervals + .iter() + .position(|iv| *iv == interval) + .ok_or_else(|| { + SnapError::InternalError( + "Could not find an old interval that we were tracking".to_owned(), + ) + })?; + old_intervals.remove(pos); + if old_intervals.is_empty() { + for account in accounts { + accounts_done.insert(*account, vec![]); + account_storage_roots.healed_accounts.insert(*account); + } + } + Ok(true) +} + /// Requests an account range from any suitable peer given the state trie's root and the starting hash and the limit hash. /// Will also return a boolean indicating if there is more state to be fetched towards the right of the trie /// (Note that the boolean will be true even if the remaining state is ouside the boundary set by the limit hash) @@ -553,6 +615,13 @@ pub async fn request_storage_ranges( } } } + // Invariant: within a group sharing the same storage root, the split path + // stores intervals under one canonical account, so at most one account's + // entry in `accounts_with_storage_root` holds a non-empty interval list. + // Scheduling and completion code below scan the group to find that account + // rather than relying on `accounts.first()`, because iteration order of + // `accounts_with_storage_root` can shift between calls when the tracked + // set changes. let mut accounts_by_root_hash = Vec::from_iter(accounts_by_root_hash); // TODO: Turn this into a stable sort for binary search. accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len()); @@ -569,38 +638,20 @@ pub async fn request_storage_ranges( let mut tasks_queue_not_started = VecDeque::::new(); let mut bulk_chunk_start: Option = None; for (i, (_, accounts)) in accounts_by_root_hash.iter().enumerate() { - let first_account = *accounts.first().ok_or_else(|| { - SnapError::InternalError("Empty accounts vector while scheduling tasks".to_owned()) - })?; - let intervals = &account_storage_roots - .accounts_with_storage_root - .get(&first_account) - .ok_or_else(|| { - SnapError::InternalError( - "Could not find intervals for account while scheduling".to_owned(), - ) - })? - .1; - if intervals.is_empty() { - let chunk_start = *bulk_chunk_start.get_or_insert(i); - if i + 1 - chunk_start >= chunk_size { - tasks_queue_not_started.push_back(StorageTask { - start_index: chunk_start, - end_index: i + 1, - start_hash: H256::zero(), - end_hash: None, - }); - bulk_chunk_start = None; - } - } else { - if let Some(start) = bulk_chunk_start { + let intervals = accounts.iter().find_map(|acc| { + account_storage_roots + .accounts_with_storage_root + .get(acc) + .and_then(|(_, ivs)| (!ivs.is_empty()).then_some(ivs)) + }); + if let Some(intervals) = intervals { + if let Some(start) = bulk_chunk_start.take() { tasks_queue_not_started.push_back(StorageTask { start_index: start, end_index: i, start_hash: H256::zero(), end_hash: None, }); - bulk_chunk_start = None; } for &(start_hash, end_hash) in intervals.iter() { tasks_queue_not_started.push_back(StorageTask { @@ -610,6 +661,17 @@ pub async fn request_storage_ranges( end_hash: Some(end_hash), }); } + } else { + let chunk_start = *bulk_chunk_start.get_or_insert(i); + if i + 1 - chunk_start >= chunk_size { + tasks_queue_not_started.push_back(StorageTask { + start_index: chunk_start, + end_index: i + 1, + start_hash: H256::zero(), + end_hash: None, + }); + bulk_chunk_start = None; + } } } if let Some(start) = bulk_chunk_start { @@ -689,6 +751,7 @@ pub async fn request_storage_ranges( remaining_start, remaining_end, remaining_hash_range: (hash_start, hash_end), + task_start_hash, } = result; completed_tasks += 1; @@ -739,52 +802,30 @@ pub async fn request_storage_ranges( .accounts_with_storage_root .get_mut(&acc_hash).ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; for (old_start, end) in old_intervals { - if end == &hash_end { + if *old_start == task_start_hash && *end == hash_end { *old_start = hash_start; + break; } } account_storage_roots .healed_accounts .extend(accounts_by_root_hash[start_index].1.iter().copied()); } else { - let mut acc_hash: H256 = H256::zero(); - // This search could potentially be expensive, but it's something that should happen very - // infrequently (only when we encounter an account we think it's big but it's not). In - // normal cases the vec we are iterating over just has one element (the big account). - for account in accounts_by_root_hash[remaining_start].1.iter() { - if let Some((_, old_intervals)) = account_storage_roots - .accounts_with_storage_root - .get(account) - { - if !old_intervals.is_empty() { - acc_hash = *account; - } - } else { - continue; - } - } - if acc_hash.is_zero() { + // Peer overran the original interval limit; the original + // task's interval is fully covered. Remaining work in + // this chunk still exists, so no sibling task has + // drained the account yet — a missing acc_hash here + // indicates corruption. + let found = clear_completed_interval( + account_storage_roots, + &accounts_by_root_hash, + &mut accounts_done, + remaining_start, + (task_start_hash, hash_end), + )?; + if !found { panic!("Should have found the account hash"); } - let (_, old_intervals) = account_storage_roots - .accounts_with_storage_root - .get_mut(&acc_hash) - .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; - old_intervals.remove( - old_intervals - .iter() - .position(|(_old_start, end)| end == &hash_end) - .ok_or(SnapError::InternalError( - "Could not find an old interval that we were tracking" - .to_owned(), - ))?, - ); - if old_intervals.is_empty() { - for account in accounts_by_root_hash[remaining_start].1.iter() { - accounts_done.insert(*account, vec![]); - account_storage_roots.healed_accounts.insert(*account); - } - } } } else { if remaining_start + 1 < remaining_end { @@ -917,44 +958,27 @@ pub async fn request_storage_ranges( } } else if let Some(hash_end) = hash_end { // Per-interval task completed: the peer covered - // [start_hash, hash_end] fully and verify_range reported + // [task_start_hash, hash_end] fully and verify_range reported // should_continue=false, so the worker returns // remaining_start == remaining_end and the guard above does // not fire. Drop the matching interval here so the account // can finalize across calls; otherwise the partition logic // at function entry would re-queue the same range forever. - let mut acc_hash: H256 = H256::zero(); - for account in accounts_by_root_hash[start_index].1.iter() { - if let Some((_, old_intervals)) = account_storage_roots - .accounts_with_storage_root - .get(account) - && !old_intervals.is_empty() - { - acc_hash = *account; - } - } - // acc_hash stays zero when a sibling per-interval task for the - // same account already drained the last interval and finalized - // it earlier in this call's loop — there's nothing left to do. - if !acc_hash.is_zero() { - let (_, old_intervals) = account_storage_roots - .accounts_with_storage_root - .get_mut(&acc_hash) - .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; - let pos = old_intervals - .iter() - .position(|(_old_start, end)| end == &hash_end) - .ok_or(SnapError::InternalError( - "Could not find an old interval that we were tracking".to_owned(), - ))?; - old_intervals.remove(pos); - if old_intervals.is_empty() { - for account in accounts_by_root_hash[start_index].1.iter() { - accounts_done.insert(*account, vec![]); - account_storage_roots.healed_accounts.insert(*account); - } - } - } + // + // The helper returns false when no live interval is found — + // that happens when a sibling per-interval task for the same + // account already drained the last interval and finalized it + // earlier in this call's loop. Unlike the partial-completion + // path above (which panics on a missing acc_hash because no + // sibling can have drained while work still remains in the + // chunk), here we silently skip. + clear_completed_interval( + account_storage_roots, + &accounts_by_root_hash, + &mut accounts_done, + start_index, + (task_start_hash, hash_end), + )?; } if account_storages.is_empty() { @@ -1478,6 +1502,7 @@ async fn request_storage_ranges_worker( remaining_start, remaining_end, remaining_hash_range, + task_start_hash: start_hash, }; tx.send(task_result).await.ok(); Ok::<(), SnapError>(())