diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index 79998355611..4bb687c13d6 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -69,6 +69,10 @@ struct StorageTaskResult { remaining_start: usize, remaining_end: usize, remaining_hash_range: (H256, Option), + // The start_hash of the original task. Distinct from remaining_hash_range.0, + // which is the worker's advancing pointer (zero on full completion). Needed + // by the response handler to match the completed interval unambiguously. + task_start_hash: H256, } #[derive(Debug)] @@ -80,6 +84,64 @@ struct StorageTask { end_hash: Option, } +/// Removes the completed interval `(start_hash, end_hash)` from whichever +/// account in the group at `accounts_by_root_hash[start_index]` currently +/// holds the interval list, and when the list empties marks every account in +/// the group as done and healed. +/// +/// Returns `true` if an interval was found and removed, `false` if no account +/// in the group has any live intervals (a sibling task already finalized the +/// account earlier in this call). +/// +/// Within a group sharing the same storage root, the split path stores +/// intervals under one canonical account, so only that account's entry holds +/// the live list. We scan the group rather than relying on `accounts.first()` +/// because the canonical account can shift across calls if the set of tracked +/// accounts changes between iterations of `accounts_with_storage_root`. +fn clear_completed_interval( + account_storage_roots: &mut AccountStorageRoots, + accounts_by_root_hash: &[(H256, Vec)], + accounts_done: &mut HashMap>, + start_index: usize, + interval: (H256, H256), +) -> Result { + let accounts = &accounts_by_root_hash[start_index].1; + let acc_hash = accounts.iter().copied().find(|account| { + account_storage_roots + .accounts_with_storage_root + .get(account) + .is_some_and(|(_, ivs)| !ivs.is_empty()) + }); + let Some(acc_hash) = acc_hash else { + return Ok(false); + }; + let (_, old_intervals) = account_storage_roots + .accounts_with_storage_root + .get_mut(&acc_hash) + .ok_or_else(|| { + SnapError::InternalError( + "Tried to get the old download intervals for an account but did not find them" + .to_owned(), + ) + })?; + let pos = old_intervals + .iter() + .position(|iv| *iv == interval) + .ok_or_else(|| { + SnapError::InternalError( + "Could not find an old interval that we were tracking".to_owned(), + ) + })?; + old_intervals.remove(pos); + if old_intervals.is_empty() { + for account in accounts { + accounts_done.insert(*account, vec![]); + account_storage_roots.healed_accounts.insert(*account); + } + } + Ok(true) +} + /// Requests an account range from any suitable peer given the state trie's root and the starting hash and the limit hash. /// Will also return a boolean indicating if there is more state to be fetched towards the right of the trie /// (Note that the boolean will be true even if the remaining state is ouside the boundary set by the limit hash) @@ -536,23 +598,69 @@ pub async fn request_storage_ranges( } } } + // Invariant: within a group sharing the same storage root, the split path + // stores intervals under one canonical account, so at most one account's + // entry in `accounts_with_storage_root` holds a non-empty interval list. + // Scheduling and completion code below scan the group to find that account + // rather than relying on `accounts.first()`, because iteration order of + // `accounts_with_storage_root` can shift between calls when the tracked + // set changes. let mut accounts_by_root_hash = Vec::from_iter(accounts_by_root_hash); // TODO: Turn this into a stable sort for binary search. accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len()); let chunk_size = STORAGE_BATCH_SIZE; - let chunk_count = (accounts_by_root_hash.len() / chunk_size) + 1; - - // list of tasks to be executed - // Types are (start_index, end_index, starting_hash) - // NOTE: end_index is NOT inclusive + // Partition into bulk-path tasks (fresh accounts with empty intervals) and + // per-interval tasks (big accounts marked in a prior call). The previous + // implementation queued every account from `start_hash: zero` and relied + // on the response handler's bulk-task big-account split path to re-queue + // per-interval tasks each call. That fails when peers cover a big account + // fully without hitting their response limit on it: the split path doesn't + // fire, no per-interval tasks get queued, intervals never drain, the + // account is stuck pending forever even after its data is on disk. let mut tasks_queue_not_started = VecDeque::::new(); - for i in 0..chunk_count { - let chunk_start = chunk_size * i; - let chunk_end = (chunk_start + chunk_size).min(accounts_by_root_hash.len()); + let mut bulk_chunk_start: Option = None; + for (i, (_, accounts)) in accounts_by_root_hash.iter().enumerate() { + let intervals = accounts.iter().find_map(|acc| { + account_storage_roots + .accounts_with_storage_root + .get(acc) + .and_then(|(_, ivs)| (!ivs.is_empty()).then_some(ivs)) + }); + if let Some(intervals) = intervals { + if let Some(start) = bulk_chunk_start.take() { + tasks_queue_not_started.push_back(StorageTask { + start_index: start, + end_index: i, + start_hash: H256::zero(), + end_hash: None, + }); + } + for &(start_hash, end_hash) in intervals.iter() { + tasks_queue_not_started.push_back(StorageTask { + start_index: i, + end_index: i + 1, + start_hash, + end_hash: Some(end_hash), + }); + } + } else { + let chunk_start = *bulk_chunk_start.get_or_insert(i); + if i + 1 - chunk_start >= chunk_size { + tasks_queue_not_started.push_back(StorageTask { + start_index: chunk_start, + end_index: i + 1, + start_hash: H256::zero(), + end_hash: None, + }); + bulk_chunk_start = None; + } + } + } + if let Some(start) = bulk_chunk_start { tasks_queue_not_started.push_back(StorageTask { - start_index: chunk_start, - end_index: chunk_end, + start_index: start, + end_index: accounts_by_root_hash.len(), start_hash: H256::zero(), end_hash: None, }); @@ -619,6 +727,7 @@ pub async fn request_storage_ranges( remaining_start, remaining_end, remaining_hash_range: (hash_start, hash_end), + task_start_hash, } = result; completed_tasks += 1; @@ -669,52 +778,30 @@ pub async fn request_storage_ranges( .accounts_with_storage_root .get_mut(&acc_hash).ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; for (old_start, end) in old_intervals { - if end == &hash_end { + if *old_start == task_start_hash && *end == hash_end { *old_start = hash_start; + break; } } account_storage_roots .healed_accounts .extend(accounts_by_root_hash[start_index].1.iter().copied()); } else { - let mut acc_hash: H256 = H256::zero(); - // This search could potentially be expensive, but it's something that should happen very - // infrequently (only when we encounter an account we think it's big but it's not). In - // normal cases the vec we are iterating over just has one element (the big account). - for account in accounts_by_root_hash[remaining_start].1.iter() { - if let Some((_, old_intervals)) = account_storage_roots - .accounts_with_storage_root - .get(account) - { - if !old_intervals.is_empty() { - acc_hash = *account; - } - } else { - continue; - } - } - if acc_hash.is_zero() { + // Peer overran the original interval limit; the original + // task's interval is fully covered. Remaining work in + // this chunk still exists, so no sibling task has + // drained the account yet — a missing acc_hash here + // indicates corruption. + let found = clear_completed_interval( + account_storage_roots, + &accounts_by_root_hash, + &mut accounts_done, + remaining_start, + (task_start_hash, hash_end), + )?; + if !found { panic!("Should have found the account hash"); } - let (_, old_intervals) = account_storage_roots - .accounts_with_storage_root - .get_mut(&acc_hash) - .ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?; - old_intervals.remove( - old_intervals - .iter() - .position(|(_old_start, end)| end == &hash_end) - .ok_or(SnapError::InternalError( - "Could not find an old interval that we were tracking" - .to_owned(), - ))?, - ); - if old_intervals.is_empty() { - for account in accounts_by_root_hash[remaining_start].1.iter() { - accounts_done.insert(*account, vec![]); - account_storage_roots.healed_accounts.insert(*account); - } - } } } else { if remaining_start + 1 < remaining_end { @@ -845,6 +932,29 @@ pub async fn request_storage_ranges( debug!("Split big storage account into {chunk_count} chunks."); } } + } else if let Some(hash_end) = hash_end { + // Per-interval task completed: the peer covered + // [task_start_hash, hash_end] fully and verify_range reported + // should_continue=false, so the worker returns + // remaining_start == remaining_end and the guard above does + // not fire. Drop the matching interval here so the account + // can finalize across calls; otherwise the partition logic + // at function entry would re-queue the same range forever. + // + // The helper returns false when no live interval is found — + // that happens when a sibling per-interval task for the same + // account already drained the last interval and finalized it + // earlier in this call's loop. Unlike the partial-completion + // path above (which panics on a missing acc_hash because no + // sibling can have drained while work still remains in the + // chunk), here we silently skip. + clear_completed_interval( + account_storage_roots, + &accounts_by_root_hash, + &mut accounts_done, + start_index, + (task_start_hash, hash_end), + )?; } if account_storages.is_empty() { @@ -1363,6 +1473,7 @@ async fn request_storage_ranges_worker( remaining_start, remaining_end, remaining_hash_range, + task_start_hash: start_hash, }; tx.send(task_result).await.ok(); Ok::<(), SnapError>(())