Merged
10 changes: 10 additions & 0 deletions prdoc/pr_11085.prdoc
@@ -0,0 +1,10 @@
title: 'Sync: Gracefully handle blocks from an unknown fork'
doc:
- audience: Node Dev
description: |-
There is the possibility that node A connects to node B. Both are at the same best block (20). Shortly after this, node B announces a block 21 that is from a completely different fork (started at e.g. block 15). Right now this leads to node A downloading this block 21 and then failing to import it because it doesn't have the parent block.

This pull request solves this situation by putting the peer into ancestry search when it detects a fork that is "unknown".
crates:
- name: sc-network-sync
bump: patch
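The failure mode in the description can be sketched with a minimal model. This is a hypothetical reduction (simplified types, toy hash strings), not the actual `sc-network-sync` API, but it captures the `continues_known_fork` check the PR adds:

```rust
use std::collections::HashSet;

/// Simplified peer sync state (hypothetical; mirrors the idea of
/// `PeerSyncState`, not the real type).
#[derive(Debug, PartialEq)]
enum PeerState {
    Available,
    AncestorSearch { current: u64, start: u64 },
}

/// React to a best-block announcement given the block hashes we already know.
/// A fork is "known" if the announced block itself is known, its parent is
/// known, or it builds on the peer's previously announced best.
fn on_best_announce(
    known_blocks: &HashSet<&str>,
    announced_hash: &str,
    announced_parent: &str,
    peer_best_hash: &str,
    announced_number: u64,
    best_queued_number: u64,
) -> PeerState {
    let known = known_blocks.contains(announced_hash);
    let known_parent = known_blocks.contains(announced_parent);
    let continues_known_fork = known || known_parent || announced_parent == peer_best_hash;
    if continues_known_fork {
        PeerState::Available
    } else {
        // Unknown fork: search backwards for the real common ancestor instead
        // of blindly downloading a block whose parent we cannot import.
        PeerState::AncestorSearch {
            current: announced_number.min(best_queued_number),
            start: best_queued_number,
        }
    }
}

fn main() {
    // Node A knows the canonical chain up to block 20.
    let known: HashSet<&str> = ["h18", "h19", "h20"].into_iter().collect();

    // Node B announces block 21 on a fork that split at block 15: neither the
    // block nor its parent is known, and the parent is not B's previous best.
    let state = on_best_announce(&known, "f21", "f20", "h20", 21, 20);
    assert_eq!(state, PeerState::AncestorSearch { current: 20, start: 20 });

    // A child of our best block continues a known fork: no search needed.
    let state = on_best_announce(&known, "h21", "h20", "h20", 21, 20);
    assert_eq!(state, PeerState::Available);
}
```

The real implementation additionally skips the search during major sync and clamps the peer's common number to the finalized block, as the diff below shows.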
44 changes: 39 additions & 5 deletions substrate/client/network/sync/src/strategy/chain_sync.rs
@@ -348,7 +348,14 @@ pub(crate) enum PeerSyncState<B: BlockT> {
/// Available for sync requests.
Available,
/// Searching for ancestors the Peer has in common with us.
AncestorSearch {
/// The best queued number when starting the ancestor search.
start: NumberFor<B>,
/// The current block that is being downloaded.
current: NumberFor<B>,
/// The state of the search.
state: AncestorSearchState<B>,
},
/// Actively downloading new blocks, starting from the given Number.
DownloadingNew(NumberFor<B>),
/// Downloading a stale block with given Hash. Stale means that it is a
@@ -497,6 +504,7 @@ where
let ancient_parent = parent_status == BlockStatus::InChainPruned;

let known = self.is_known(&hash);
let is_major_syncing = self.is_major_syncing();
let peer = if let Some(peer) = self.peers.get_mut(&peer_id) {
peer
} else {
@@ -509,6 +517,11 @@ where
return None;
}

// The node is continuing a known fork if either the block itself is known, the
// parent is known or the block references the previously announced `best_hash`.
let continues_known_fork =
Contributor:

How about moving this inside

if is_best {
    let continues_known_fork =
        known || known_parent || announce.header.parent_hash() == &peer.best_hash;
    (...)
}

where it is only used?

Member Author:

This doesn't work, because then `peer.best_hash` may already be updated.

known || known_parent || announce.header.parent_hash() == &peer.best_hash;
Contributor:

Can we run into a race condition here? Something like: a previous block triggered ancestor search and `peer.best_hash` is already set, but then the next block gets announced and this condition would be true.

Member Author:

When the peer is in ancestry search mode, this method aborts early (check above).
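The early abort the author refers to can be sketched as follows. A hypothetical reduction of the guard (not the actual method): while a peer is in ancestor search, its announcements are not processed, so a stale `best_hash` cannot make `continues_known_fork` spuriously true.

```rust
/// Simplified peer state for illustrating the guard (hypothetical).
#[derive(Debug, PartialEq)]
enum PeerState {
    Available,
    AncestorSearch,
}

/// Returns whether the announce was processed. `best_hash` is only updated
/// when the peer is not mid-search, which rules out the race described above.
fn handle_announce(state: &PeerState, best_hash: &mut &'static str, announced: &'static str) -> bool {
    if matches!(state, PeerState::AncestorSearch) {
        // Abort early: the running search will determine the common block.
        return false;
    }
    *best_hash = announced;
    true
}

fn main() {
    let mut best_hash = "h20";

    // Mid-search announcements are ignored and leave `best_hash` untouched.
    assert!(!handle_announce(&PeerState::AncestorSearch, &mut best_hash, "f21"));
    assert_eq!(best_hash, "h20");

    // Once the peer is available again, announcements are processed normally.
    assert!(handle_announce(&PeerState::Available, &mut best_hash, "h21"));
    assert_eq!(best_hash, "h21");
}
```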


let peer_info = is_best.then(|| {
// update their best block
peer.best_number = number;
@@ -520,12 +533,33 @@ where
// If the announced block is the best they have and is not ahead of us, our common number
// is either one further ahead or it's the one they just announced, if we know about it.
if is_best {
let best_queued_number = self.best_queued_number;

if known && best_queued_number >= number {
peer.update_common_number(number);
} else if announce.header.parent_hash() == &self.best_queued_hash ||
known_parent && best_queued_number >= number
{
peer.update_common_number(number.saturating_sub(One::one()));
}

// If this announced block isn't following any known fork, we have to start an
// ancestor search to find out our real common block. However, we skip this during
// major sync to avoid pulling peers out of the download pool.
if !continues_known_fork && !is_major_syncing {
let current = number.min(best_queued_number);
peer.common_number = peer.common_number.min(self.client.info().finalized_number);
peer.state = PeerSyncState::AncestorSearch {
Contributor:

Could we get this peer stuck in an AncestorSearch? What would be the worst case if peer B is maliciously advertising a block 21? Could it force us to go back to genesis (e.g., if the block is 1M and on a malicious fork)?

Member Author:

The node cannot go back to block 21, especially if this block is below the last finalized block. Ancestry search is always a state with one peer, not with all peers together. So, if we are doing ancestry search with B, we can still import blocks from other peers.
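The bound discussed here can be illustrated with a toy driver for the downward probing. This is a hypothetical sketch of the exponential-backoff phase only (the real `AncestorSearchState` also narrows back in once a common block is found); the point is the clamp to the finalized number:

```rust
/// Probe numbers an ancestry search would request when walking down from
/// `start` with exponential backoff, never dropping below `finalized`
/// (hypothetical driver; mirrors the `min(finalized_number)` clamp in the PR).
fn probe_sequence(start: u64, finalized: u64) -> Vec<u64> {
    let mut probes = Vec::new();
    let mut current = start;
    let mut step = 1u64;
    while current > finalized {
        probes.push(current);
        // Back off exponentially, but clamp at the finalized block so a
        // malicious fork cannot drag the search towards genesis.
        current = current.saturating_sub(step).max(finalized);
        step *= 2;
    }
    probes
}

fn main() {
    // Searching down from block 20 with block 15 finalized: probes are
    // 20, 19, 17, and the search can never go below 15.
    assert_eq!(probe_sequence(20, 15), vec![20, 19, 17]);

    // Already at the finalized block: nothing to probe.
    assert_eq!(probe_sequence(10, 10), Vec::<u64>::new());
}
```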

current,
start: best_queued_number,
state: AncestorSearchState::ExponentialBackoff(One::one()),
};

let request = ancestry_request::<B>(current);
let action = self.create_block_request_action(peer_id, request);
self.actions.push(action);

return peer_info;
}
}
self.allowed_requests.add(&peer_id);
179 changes: 153 additions & 26 deletions substrate/client/network/sync/src/strategy/chain_sync/test.rs
@@ -359,13 +359,16 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() {
let client = Arc::new(TestClientBuilder::new().build());
let info = client.info();

let protocol_name = ProtocolName::Static("");
let proxy_block_downloader = Arc::new(ProxyBlockDownloader::new(protocol_name.clone()));

let mut sync = ChainSync::new(
ChainSyncMode::Full,
client.clone(),
5,
64,
protocol_name,
proxy_block_downloader.clone(),
false,
None,
std::iter::empty(),
@@ -442,27 +445,43 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() {
// Let peer2 announce that it finished syncing
send_block_announce(best_block.header().clone(), peer_id2, &mut sync);

// Populate actions with block requests from `block_requests()` (peer1) alongside the
// ancestry search already queued for peer2.
let block_requests = sync
.block_requests()
.into_iter()
.map(|(peer_id, request)| sync.create_block_request_action(peer_id, request))
.collect::<Vec<_>>();
sync.actions.extend(block_requests);

let actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 2);

let (mut peer1_req, mut peer2_req) = (None, None);
for action in actions {
match action {
SyncingAction::StartRequest { peer_id, request, .. } => {
block_on(request).unwrap().unwrap();
let req = proxy_block_downloader.next_request();
if peer_id == peer_id1 {
peer1_req = Some(req);
} else if peer_id == peer_id2 {
peer2_req = Some(req);
} else {
panic!("Unexpected peer: {peer_id}");
}
},
action => panic!("Unexpected action: {}", action.name()),
}
}

// We should now do an ancestor search to find the correct common block.
let peer2_req = peer2_req.unwrap();
assert_eq!(FromBlock::Number(best_block_num as u64), peer2_req.from);
assert_eq!(Some(1), peer2_req.max);

let response = create_block_response(vec![blocks[(best_block_num - 1) as usize].clone()]);

// Clear old actions to not deal with them
let _ = sync.take_actions();

sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap();

let actions = sync.take_actions().collect::<Vec<_>>();
@@ -544,11 +563,18 @@ fn can_sync_huge_fork() {

send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync);

// The announce triggers an ancestry search via actions
let mut actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
let mut request = match actions.pop().unwrap() {
SyncingAction::StartRequest { request, .. } => {
block_on(request).unwrap().unwrap();
proxy_block_downloader.next_request()
},
action => panic!("Unexpected action: {}", action.name()),
};
assert_eq!(FromBlock::Number(info.best_number), request.from);
assert_eq!(Some(1), request.max);

// Do the ancestor search
loop {
@@ -693,11 +719,18 @@ fn syncs_fork_without_duplicate_requests() {

send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync);

// The announce triggers an ancestry search via actions
let mut actions = sync.take_actions().collect::<Vec<_>>();
assert_eq!(actions.len(), 1);
let mut request = match actions.pop().unwrap() {
SyncingAction::StartRequest { request, .. } => {
block_on(request).unwrap().unwrap();
proxy_block_downloader.next_request()
},
action => panic!("Unexpected action: {}", action.name()),
};
assert_eq!(FromBlock::Number(info.best_number), request.from);
assert_eq!(Some(1), request.max);

// Do the ancestor search
loop {
@@ -1471,3 +1504,97 @@ fn regular_sync_always_requests_bodies_regardless_of_pruning() {
}
}
}

/// During major sync, peers that announce blocks on unknown forks should NOT be put into
/// `AncestorSearch`. The scenario:
///
/// 1. We import some blocks, then peers connect that are far ahead.
/// 2. The import queue fills up, so `add_peer_inner` skips ancestry search and adds new peers as
/// `Available` with `common_number = best_queued_number`.
/// 3. One of those peers announces a new best block on an unknown fork.
/// 4. `continues_known_fork` is false, which without the major-sync guard would trigger ancestry
///    search.
/// 5. The peer would lose its `allowed_requests` token and could no longer serve block downloads.
///
/// If this happens to enough peers, sync stalls.
#[test]
fn no_ancestry_search_during_major_sync() {
sp_tracing::try_init_simple();

let (blocks, fork_block) = {
let client = TestClientBuilder::new().build();
let blocks = (0..MAX_DOWNLOAD_AHEAD * 2)
.map(|_| build_block(&client, None, false))
.collect::<Vec<_>>();

let fork_block = build_block(&client, Some(blocks[blocks.len() - 2].hash()), true);
(blocks, fork_block)
};

let client = Arc::new(TestClientBuilder::new().build());

// Import a few blocks so we're NOT at genesis (add_peer skips ancestry search at genesis).
for b in &blocks[..10] {
block_on(client.import(BlockOrigin::Own, b.clone())).unwrap();
}

let mut sync = ChainSync::new(
ChainSyncMode::Full,
client.clone(),
5,
64,
ProtocolName::Static(""),
Arc::new(MockBlockDownloader::new()),
false,
None,
std::iter::empty(),
)
.unwrap();

let peer_id1 = PeerId::random();
let peer_id2 = PeerId::random();

let best_block = blocks.last().unwrap().clone();
let best_block_num = *best_block.header().number();

// peer1 is far ahead — triggers ancestry search since queue is empty.
sync.add_peer(peer_id1, best_block.hash(), best_block_num);
assert!(matches!(
sync.peers.get(&peer_id1).unwrap().state,
PeerSyncState::AncestorSearch { .. }
));

// Fill the import queue to trigger the "skip ancestry search" path in add_peer_inner.
for block in &blocks[..MAJOR_SYNC_BLOCKS as usize + 1] {
sync.queue_blocks.insert(block.hash());
}

// peer2 is added — should be `Available` because queue_blocks > MAJOR_SYNC_BLOCKS.
sync.add_peer(peer_id2, best_block.hash(), best_block_num);
assert_eq!(sync.peers.get(&peer_id2).unwrap().state, PeerSyncState::Available);

// peer2 announces a new best block whose parent is NOT peer.best_hash.
// Sanity: the fork block's parent is NOT best_block.hash()
assert_ne!(fork_block.header().parent_hash(), &best_block.hash());

let announce = BlockAnnounce {
header: fork_block.header().clone(),
state: Some(BlockState::Best),
data: Some(Vec::new()),
};

let _ = sync.on_validated_block_announce(true, peer_id2, &announce);

// peer2 should NOT be in AncestorSearch during major sync — it should stay Available.
assert!(
!matches!(sync.peers.get(&peer_id2).unwrap().state, PeerSyncState::AncestorSearch { .. }),
"Peer should not be in AncestorSearch during major sync — this would stall sync!",
);

// No ancestry search action should be queued for peer2.
let actions = sync.take_actions().collect::<Vec<_>>();
for action in &actions {
if let SyncingAction::StartRequest { peer_id, .. } = action {
assert_ne!(*peer_id, peer_id2, "No request should be sent to peer2 during major sync",);
}
}
}
33 changes: 29 additions & 4 deletions substrate/client/network/test/src/lib.rs
@@ -31,7 +31,10 @@ mod sync
use std::{
collections::HashMap,
pin::Pin,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
task::{Context as FutureContext, Poll},
time::Duration,
};
@@ -586,6 +589,11 @@ where
self.verifier.failed_verifications.lock().clone()
}

/// Returns the number of errors while importing blocks.
pub fn import_error_count(&self) -> u32 {
self.block_import.import_error_count()
}

pub fn has_block(&self, hash: H256) -> bool {
self.backend
.as_ref()
@@ -619,12 +627,18 @@ impl<T> BlockImportAdapterFull for T where
#[derive(Clone)]
pub struct BlockImportAdapter<I> {
inner: I,
import_errors: Arc<AtomicU32>,
}

impl<I> BlockImportAdapter<I> {
/// Create a new instance of `Self::Full`.
pub fn new(inner: I) -> Self {
Self { inner, import_errors: Default::default() }
}

/// Returns the number of errors while importing blocks.
pub fn import_error_count(&self) -> u32 {
self.import_errors.load(Ordering::Relaxed)
}
}

@@ -639,14 +653,25 @@ where
&self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
let result = self.inner.check_block(block).await;
if !matches!(
result,
Ok(ImportResult::Imported(_) | ImportResult::AlreadyInChain | ImportResult::KnownBad)
) {
self.import_errors.fetch_add(1, Ordering::Relaxed);
}
result
}

async fn import_block(
&self,
block: BlockImportParams<Block>,
) -> Result<ImportResult, Self::Error> {
let result = self.inner.import_block(block).await;
if !matches!(result, Ok(ImportResult::Imported(_) | ImportResult::AlreadyInChain)) {
self.import_errors.fetch_add(1, Ordering::Relaxed);
}
result
}
}

2 changes: 1 addition & 1 deletion substrate/client/network/test/src/service.rs
@@ -61,7 +61,7 @@ impl TestNetwork {

pub fn start_network(
self,
) -> (Arc<TestNetworkService>, impl Stream<Item = Event> + std::marker::Unpin) {
let worker = self.network;
let service = worker.service().clone();
let event_stream = service.event_stream("test");