Skip to content
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 158 additions & 47 deletions crates/networking/p2p/snap/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ struct StorageTaskResult {
remaining_start: usize,
remaining_end: usize,
remaining_hash_range: (H256, Option<H256>),
// The start_hash of the original task. Distinct from remaining_hash_range.0,
// which is the worker's advancing pointer (zero on full completion). Needed
// by the response handler to match the completed interval unambiguously.
task_start_hash: H256,
}

#[derive(Debug)]
Expand All @@ -80,6 +84,64 @@ struct StorageTask {
end_hash: Option<H256>,
}

/// Removes the completed interval `(start_hash, end_hash)` from whichever
/// account in the group at `accounts_by_root_hash[start_index]` currently
/// holds the interval list, and when the list empties marks every account in
/// the group as done and healed.
///
/// Returns `true` if an interval was found and removed, `false` if no account
/// in the group has any live intervals (a sibling task already finalized the
/// account earlier in this call).
///
/// Within a group sharing the same storage root, the split path stores
/// intervals under one canonical account, so only that account's entry holds
/// the live list. We scan the group rather than relying on `accounts.first()`
/// because the canonical account can shift across calls if the set of tracked
/// accounts changes between iterations of `accounts_with_storage_root`.
fn clear_completed_interval(
account_storage_roots: &mut AccountStorageRoots,
accounts_by_root_hash: &[(H256, Vec<H256>)],
accounts_done: &mut HashMap<H256, Vec<(H256, H256)>>,
start_index: usize,
interval: (H256, H256),
) -> Result<bool, SnapError> {
let accounts = &accounts_by_root_hash[start_index].1;
let acc_hash = accounts.iter().copied().find(|account| {
account_storage_roots
.accounts_with_storage_root
.get(account)
.is_some_and(|(_, ivs)| !ivs.is_empty())
});
let Some(acc_hash) = acc_hash else {
return Ok(false);
};
let (_, old_intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&acc_hash)
.ok_or_else(|| {
SnapError::InternalError(
"Tried to get the old download intervals for an account but did not find them"
.to_owned(),
)
})?;
let pos = old_intervals
.iter()
.position(|iv| *iv == interval)
.ok_or_else(|| {
SnapError::InternalError(
"Could not find an old interval that we were tracking".to_owned(),
)
})?;
old_intervals.remove(pos);
if old_intervals.is_empty() {
for account in accounts {
accounts_done.insert(*account, vec![]);
account_storage_roots.healed_accounts.insert(*account);
}
}
Ok(true)
}

/// Requests an account range from any suitable peer given the state trie's root and the starting hash and the limit hash.
/// Will also return a boolean indicating if there is more state to be fetched towards the right of the trie
/// (Note that the boolean will be true even if the remaining state is ouside the boundary set by the limit hash)
Expand Down Expand Up @@ -541,23 +603,69 @@ pub async fn request_storage_ranges(
}
}
}
// Invariant: within a group sharing the same storage root, the split path
// stores intervals under one canonical account, so at most one account's
// entry in `accounts_with_storage_root` holds a non-empty interval list.
// Scheduling and completion code below scan the group to find that account
// rather than relying on `accounts.first()`, because iteration order of
// `accounts_with_storage_root` can shift between calls when the tracked
// set changes.
let mut accounts_by_root_hash = Vec::from_iter(accounts_by_root_hash);
// TODO: Turn this into a stable sort for binary search.
accounts_by_root_hash.sort_unstable_by_key(|(_, accounts)| !accounts.len());
let chunk_size = STORAGE_BATCH_SIZE;
let chunk_count = (accounts_by_root_hash.len() / chunk_size) + 1;

// list of tasks to be executed
// Types are (start_index, end_index, starting_hash)
// NOTE: end_index is NOT inclusive

// Partition into bulk-path tasks (fresh accounts with empty intervals) and
// per-interval tasks (big accounts marked in a prior call). The previous
// implementation queued every account from `start_hash: zero` and relied
// on the response handler's bulk-task big-account split path to re-queue
// per-interval tasks each call. That fails when peers cover a big account
// fully without hitting their response limit on it: the split path doesn't
// fire, no per-interval tasks get queued, intervals never drain, the
// account is stuck pending forever even after its data is on disk.
let mut tasks_queue_not_started = VecDeque::<StorageTask>::new();
for i in 0..chunk_count {
let chunk_start = chunk_size * i;
let chunk_end = (chunk_start + chunk_size).min(accounts_by_root_hash.len());
let mut bulk_chunk_start: Option<usize> = None;
for (i, (_, accounts)) in accounts_by_root_hash.iter().enumerate() {
let intervals = accounts.iter().find_map(|acc| {
account_storage_roots
.accounts_with_storage_root
.get(acc)
.and_then(|(_, ivs)| (!ivs.is_empty()).then_some(ivs))
});
if let Some(intervals) = intervals {
if let Some(start) = bulk_chunk_start.take() {
tasks_queue_not_started.push_back(StorageTask {
start_index: start,
end_index: i,
start_hash: H256::zero(),
end_hash: None,
});
}
for &(start_hash, end_hash) in intervals.iter() {
tasks_queue_not_started.push_back(StorageTask {
start_index: i,
end_index: i + 1,
start_hash,
end_hash: Some(end_hash),
});
}
} else {
let chunk_start = *bulk_chunk_start.get_or_insert(i);
if i + 1 - chunk_start >= chunk_size {
tasks_queue_not_started.push_back(StorageTask {
start_index: chunk_start,
end_index: i + 1,
start_hash: H256::zero(),
end_hash: None,
});
bulk_chunk_start = None;
}
}
}
if let Some(start) = bulk_chunk_start {
tasks_queue_not_started.push_back(StorageTask {
start_index: chunk_start,
end_index: chunk_end,
start_index: start,
end_index: accounts_by_root_hash.len(),
start_hash: H256::zero(),
end_hash: None,
});
Expand Down Expand Up @@ -624,6 +732,7 @@ pub async fn request_storage_ranges(
remaining_start,
remaining_end,
remaining_hash_range: (hash_start, hash_end),
task_start_hash,
} = result;
completed_tasks += 1;

Expand Down Expand Up @@ -674,52 +783,30 @@ pub async fn request_storage_ranges(
.accounts_with_storage_root
.get_mut(&acc_hash).ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
for (old_start, end) in old_intervals {
if end == &hash_end {
if *old_start == task_start_hash && *end == hash_end {
*old_start = hash_start;
break;
}
}
account_storage_roots
.healed_accounts
.extend(accounts_by_root_hash[start_index].1.iter().copied());
} else {
let mut acc_hash: H256 = H256::zero();
// This search could potentially be expensive, but it's something that should happen very
// infrequently (only when we encounter an account we think it's big but it's not). In
// normal cases the vec we are iterating over just has one element (the big account).
for account in accounts_by_root_hash[remaining_start].1.iter() {
if let Some((_, old_intervals)) = account_storage_roots
.accounts_with_storage_root
.get(account)
{
if !old_intervals.is_empty() {
acc_hash = *account;
}
} else {
continue;
}
}
if acc_hash.is_zero() {
// Peer overran the original interval limit; the original
// task's interval is fully covered. Remaining work in
// this chunk still exists, so no sibling task has
// drained the account yet — a missing acc_hash here
// indicates corruption.
let found = clear_completed_interval(
account_storage_roots,
&accounts_by_root_hash,
&mut accounts_done,
remaining_start,
(task_start_hash, hash_end),
)?;
if !found {
panic!("Should have found the account hash");
}
let (_, old_intervals) = account_storage_roots
.accounts_with_storage_root
.get_mut(&acc_hash)
.ok_or(SnapError::InternalError("Tried to get the old download intervals for an account but did not find them".to_owned()))?;
old_intervals.remove(
old_intervals
.iter()
.position(|(_old_start, end)| end == &hash_end)
.ok_or(SnapError::InternalError(
"Could not find an old interval that we were tracking"
.to_owned(),
))?,
);
if old_intervals.is_empty() {
for account in accounts_by_root_hash[remaining_start].1.iter() {
accounts_done.insert(*account, vec![]);
account_storage_roots.healed_accounts.insert(*account);
}
}
}
} else {
if remaining_start + 1 < remaining_end {
Expand Down Expand Up @@ -850,6 +937,29 @@ pub async fn request_storage_ranges(
debug!("Split big storage account into {chunk_count} chunks.");
}
}
} else if let Some(hash_end) = hash_end {
// Per-interval task completed: the peer covered
// [task_start_hash, hash_end] fully and verify_range reported
// should_continue=false, so the worker returns
// remaining_start == remaining_end and the guard above does
// not fire. Drop the matching interval here so the account
// can finalize across calls; otherwise the partition logic
// at function entry would re-queue the same range forever.
//
// The helper returns false when no live interval is found —
// that happens when a sibling per-interval task for the same
// account already drained the last interval and finalized it
// earlier in this call's loop. Unlike the partial-completion
// path above (which panics on a missing acc_hash because no
// sibling can have drained while work still remains in the
// chunk), here we silently skip.
clear_completed_interval(
account_storage_roots,
&accounts_by_root_hash,
&mut accounts_done,
start_index,
(task_start_hash, hash_end),
)?;
}

if account_storages.is_empty() {
Expand Down Expand Up @@ -1368,6 +1478,7 @@ async fn request_storage_ranges_worker(
remaining_start,
remaining_end,
remaining_hash_range,
task_start_hash: start_hash,
};
tx.send(task_result).await.ok();
Ok::<(), SnapError>(())
Expand Down
Loading