diff --git a/prdoc/pr_11085.prdoc b/prdoc/pr_11085.prdoc new file mode 100644 index 0000000000000..032a54e010623 --- /dev/null +++ b/prdoc/pr_11085.prdoc @@ -0,0 +1,10 @@ +title: 'Sync: Gracefully handle blocks from an unknown fork' +doc: +- audience: Node Dev + description: |- + There is the possibility that node A connects to node B. Both are at the same best block (20). Shortly after this, node B announces a block 21 that is from a completely different fork (started at e.g. block 15). Right now this leads to node A downloading this block 21 and then failing to import it because it doesn't have the parent block. + + This pull request solves this situation by putting the peer into ancestry search when it detects a fork that is "unknown". +crates: +- name: sc-network-sync + bump: patch diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index 96aeb3786f61a..9f38929a42729 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -348,7 +348,14 @@ pub(crate) enum PeerSyncState { /// Available for sync requests. Available, /// Searching for ancestors the Peer has in common with us. - AncestorSearch { start: NumberFor, current: NumberFor, state: AncestorSearchState }, + AncestorSearch { + /// The best queued number when starting the ancestor search. + start: NumberFor, + /// The current block that is being downloaded. + current: NumberFor, + /// The state of the search. + state: AncestorSearchState, + }, /// Actively downloading new blocks, starting from the given Number. DownloadingNew(NumberFor), /// Downloading a stale block with given Hash. 
Stale means that it is a @@ -497,6 +504,7 @@ where let ancient_parent = parent_status == BlockStatus::InChainPruned; let known = self.is_known(&hash); + let is_major_syncing = self.is_major_syncing(); let peer = if let Some(peer) = self.peers.get_mut(&peer_id) { peer } else { @@ -509,6 +517,11 @@ where return None; } + // The node is continuing a known fork if either the block itself is known, the + // parent is known or the block references the previously announced `best_hash`. + let continues_known_fork = + known || known_parent || announce.header.parent_hash() == &peer.best_hash; + let peer_info = is_best.then(|| { // update their best block peer.best_number = number; @@ -520,12 +533,33 @@ where // If the announced block is the best they have and is not ahead of us, our common number // is either one further ahead or it's the one they just announced, if we know about it. if is_best { - if known && self.best_queued_number >= number { - self.update_peer_common_number(&peer_id, number); + let best_queued_number = self.best_queued_number; + + if known && best_queued_number >= number { + peer.update_common_number(number); } else if announce.header.parent_hash() == &self.best_queued_hash || - known_parent && self.best_queued_number >= number + known_parent && best_queued_number >= number { - self.update_peer_common_number(&peer_id, number.saturating_sub(One::one())); + peer.update_common_number(number.saturating_sub(One::one())); + } + + // If this announced block isn't following any known fork, we have to start an + // ancestor search to find out our real common block. However, we skip this during + // major sync to avoid pulling peers out of the download pool. 
+ if !continues_known_fork && !is_major_syncing { + let current = number.min(best_queued_number); + peer.common_number = peer.common_number.min(self.client.info().finalized_number); + peer.state = PeerSyncState::AncestorSearch { + current, + start: best_queued_number, + state: AncestorSearchState::ExponentialBackoff(One::one()), + }; + + let request = ancestry_request::(current); + let action = self.create_block_request_action(peer_id, request); + self.actions.push(action); + + return peer_info; } } self.allowed_requests.add(&peer_id); diff --git a/substrate/client/network/sync/src/strategy/chain_sync/test.rs b/substrate/client/network/sync/src/strategy/chain_sync/test.rs index 34a529f6de3be..d46ea2807d9f8 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync/test.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync/test.rs @@ -359,13 +359,16 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() { let client = Arc::new(TestClientBuilder::new().build()); let info = client.info(); + let protocol_name = ProtocolName::Static(""); + let proxy_block_downloader = Arc::new(ProxyBlockDownloader::new(protocol_name.clone())); + let mut sync = ChainSync::new( ChainSyncMode::Full, client.clone(), 5, 64, - ProtocolName::Static(""), - Arc::new(MockBlockDownloader::new()), + protocol_name, + proxy_block_downloader.clone(), false, None, std::iter::empty(), @@ -442,27 +445,43 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() { // Let peer2 announce that it finished syncing send_block_announce(best_block.header().clone(), peer_id2, &mut sync); - let (peer1_req, peer2_req) = - sync.block_requests().into_iter().fold((None, None), |res, req| { - if req.0 == peer_id1 { - (Some(req.1), res.1) - } else if req.0 == peer_id2 { - (res.0, Some(req.1)) - } else { - panic!("Unexpected req: {:?}", req) - } - }); + // Populate actions with block requests from `block_requests()` (peer1) alongside the + // ancestry search already 
queued for peer2. + let block_requests = sync + .block_requests() + .into_iter() + .map(|(peer_id, request)| sync.create_block_request_action(peer_id, request)) + .collect::>(); + sync.actions.extend(block_requests); + + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 2); + + let (mut peer1_req, mut peer2_req) = (None, None); + for action in actions { + match action { + SyncingAction::StartRequest { peer_id, request, .. } => { + block_on(request).unwrap().unwrap(); + let req = proxy_block_downloader.next_request(); + if peer_id == peer_id1 { + peer1_req = Some(req); + } else if peer_id == peer_id2 { + peer2_req = Some(req); + } else { + panic!("Unexpected peer: {peer_id}"); + } + }, + action => panic!("Unexpected action: {}", action.name()), + } + } // We should now do an ancestor search to find the correct common block. let peer2_req = peer2_req.unwrap(); - assert_eq!(Some(1), peer2_req.max); assert_eq!(FromBlock::Number(best_block_num as u64), peer2_req.from); + assert_eq!(Some(1), peer2_req.max); let response = create_block_response(vec![blocks[(best_block_num - 1) as usize].clone()]); - // Clear old actions to not deal with them - let _ = sync.take_actions(); - sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap(); let actions = sync.take_actions().collect::>(); @@ -544,11 +563,18 @@ fn can_sync_huge_fork() { send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync); - let mut request = - get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1); - - // Discard old actions we are not interested in - let _ = sync.take_actions(); + // The announce triggers an ancestry search via actions + let mut actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); + let mut request = match actions.pop().unwrap() { + SyncingAction::StartRequest { request, .. 
} => { + block_on(request).unwrap().unwrap(); + proxy_block_downloader.next_request() + }, + action => panic!("Unexpected action: {}", action.name()), + }; + assert_eq!(FromBlock::Number(info.best_number), request.from); + assert_eq!(Some(1), request.max); // Do the ancestor search loop { @@ -693,11 +719,18 @@ fn syncs_fork_without_duplicate_requests() { send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync); - let mut request = - get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1); - - // Discard pending actions - let _ = sync.take_actions(); + // The announce triggers an ancestry search via actions + let mut actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); + let mut request = match actions.pop().unwrap() { + SyncingAction::StartRequest { request, .. } => { + block_on(request).unwrap().unwrap(); + proxy_block_downloader.next_request() + }, + action => panic!("Unexpected action: {}", action.name()), + }; + assert_eq!(FromBlock::Number(info.best_number), request.from); + assert_eq!(Some(1), request.max); // Do the ancestor search loop { @@ -1471,3 +1504,97 @@ fn regular_sync_always_requests_bodies_regardless_of_pruning() { } } } + +/// During major sync, peers that announce blocks on unknown forks should NOT be put into +/// `AncestorSearch`. The scenario: +/// +/// 1. We import some blocks, then peers connect that are far ahead. +/// 2. The import queue fills up, so `add_peer_inner` skips ancestry search and adds new peers as +/// `Available` with `common_number = best_queued_number`. +/// 3. One of those peers announces a new best block on an unknown fork. +/// 4. `continues_known_fork` is false, triggering ancestry search. +/// 5. The peer loses its `allowed_requests` token and can no longer serve block downloads. +/// +/// If this happens to enough peers, sync stalls. 
+#[test] +fn no_ancestry_search_during_major_sync() { + sp_tracing::try_init_simple(); + + let (blocks, fork_block) = { + let client = TestClientBuilder::new().build(); + let blocks = (0..MAX_DOWNLOAD_AHEAD * 2) + .map(|_| build_block(&client, None, false)) + .collect::>(); + + let fork_block = build_block(&client, Some(blocks[blocks.len() - 2].hash()), true); + (blocks, fork_block) + }; + + let client = Arc::new(TestClientBuilder::new().build()); + + // Import a few blocks so we're NOT at genesis (add_peer skips ancestry search at genesis). + for b in &blocks[..10] { + block_on(client.import(BlockOrigin::Own, b.clone())).unwrap(); + } + + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + ProtocolName::Static(""), + Arc::new(MockBlockDownloader::new()), + false, + None, + std::iter::empty(), + ) + .unwrap(); + + let peer_id1 = PeerId::random(); + let peer_id2 = PeerId::random(); + + let best_block = blocks.last().unwrap().clone(); + let best_block_num = *best_block.header().number(); + + // peer1 is far ahead — triggers ancestry search since queue is empty. + sync.add_peer(peer_id1, best_block.hash(), best_block_num); + assert!(matches!( + sync.peers.get(&peer_id1).unwrap().state, + PeerSyncState::AncestorSearch { .. } + )); + + // Fill the import queue to trigger the "skip ancestry search" path in add_peer_inner. + for block in &blocks[..MAJOR_SYNC_BLOCKS as usize + 1] { + sync.queue_blocks.insert(block.hash()); + } + + // peer2 is added — should be `Available` because queue_blocks > MAJOR_SYNC_BLOCKS. + sync.add_peer(peer_id2, best_block.hash(), best_block_num); + assert_eq!(sync.peers.get(&peer_id2).unwrap().state, PeerSyncState::Available); + + // peer2 announces a new best block whose parent is NOT peer.best_hash. 
+ // Sanity: the fork block's parent is NOT best_block.hash() + assert_ne!(fork_block.header().parent_hash(), &best_block.hash()); + + let announce = BlockAnnounce { + header: fork_block.header().clone(), + state: Some(BlockState::Best), + data: Some(Vec::new()), + }; + + let _ = sync.on_validated_block_announce(true, peer_id2, &announce); + + // peer2 should NOT be in AncestorSearch during major sync — it should stay Available. + assert!( + !matches!(sync.peers.get(&peer_id2).unwrap().state, PeerSyncState::AncestorSearch { .. }), + "Peer should not be in AncestorSearch during major sync — this would stall sync!", + ); + + // No ancestry search action should be queued for peer2. + let actions = sync.take_actions().collect::>(); + for action in &actions { + if let SyncingAction::StartRequest { peer_id, .. } = action { + assert_ne!(*peer_id, peer_id2, "No request should be sent to peer2 during major sync",); + } + } +} diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 3694803f95e7d..c17fc2651b7a4 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -31,7 +31,10 @@ mod sync; use std::{ collections::HashMap, pin::Pin, - sync::Arc, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, task::{Context as FutureContext, Poll}, time::Duration, }; @@ -586,6 +589,11 @@ where self.verifier.failed_verifications.lock().clone() } + /// Returns the number of errors while importing blocks. + pub fn import_error_count(&self) -> u32 { + self.block_import.import_error_count() + } + pub fn has_block(&self, hash: H256) -> bool { self.backend .as_ref() @@ -619,12 +627,18 @@ impl BlockImportAdapterFull for T where #[derive(Clone)] pub struct BlockImportAdapter { inner: I, + import_errors: Arc, } impl BlockImportAdapter { /// Create a new instance of `Self::Full`. 
pub fn new(inner: I) -> Self { - Self { inner } + Self { inner, import_errors: Default::default() } + } + + /// Returns the number of errors while importing blocks. + pub fn import_error_count(&self) -> u32 { + self.import_errors.load(Ordering::Relaxed) } } @@ -639,14 +653,25 @@ where &self, block: BlockCheckParams, ) -> Result { - self.inner.check_block(block).await + let result = self.inner.check_block(block).await; + if !matches!( + result, + Ok(ImportResult::Imported(_) | ImportResult::AlreadyInChain | ImportResult::KnownBad) + ) { + self.import_errors.fetch_add(1, Ordering::Relaxed); + } + result } async fn import_block( &self, block: BlockImportParams, ) -> Result { - self.inner.import_block(block).await + let result = self.inner.import_block(block).await; + if !matches!(result, Ok(ImportResult::Imported(_) | ImportResult::AlreadyInChain)) { + self.import_errors.fetch_add(1, Ordering::Relaxed); + } + result } } diff --git a/substrate/client/network/test/src/service.rs b/substrate/client/network/test/src/service.rs index c30a3bdb3d10f..30153d92395f3 100644 --- a/substrate/client/network/test/src/service.rs +++ b/substrate/client/network/test/src/service.rs @@ -61,7 +61,7 @@ impl TestNetwork { pub fn start_network( self, - ) -> (Arc, (impl Stream + std::marker::Unpin)) { + ) -> (Arc, impl Stream + std::marker::Unpin) { let worker = self.network; let service = worker.service().clone(); let event_stream = service.event_stream("test"); diff --git a/substrate/client/network/test/src/sync.rs b/substrate/client/network/test/src/sync.rs index 80d95c66c9571..78b63f69d46ba 100644 --- a/substrate/client/network/test/src/sync.rs +++ b/substrate/client/network/test/src/sync.rs @@ -1018,31 +1018,26 @@ async fn syncs_all_forks_from_single_peer() { net.peer(0).push_blocks(10, false); net.peer(1).push_blocks(10, false); - // poll until the two nodes connect, otherwise announcing the block will not work net.run_until_connected().await; - // Peer 0 produces new blocks and 
announces. - let branch1 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, true).pop().unwrap(); - - // Wait till peer 1 starts downloading - loop { - futures::future::poll_fn::<(), _>(|cx| { - net.poll(cx); - Poll::Ready(()) - }) - .await; - - if net.peer(1).sync_service().status().await.unwrap().best_seen_block == Some(12) { - break; - } + let mut branch1 = None; + for i in 0..2 { + let at = if i == 0 { BlockId::Number(10) } else { BlockId::Hash(branch1.unwrap()) }; + branch1 = net.peer(0).push_blocks_at(at, 1, i == 0).pop(); + net.run_until_idle().await; } + let branch1 = branch1.unwrap(); - // Peer 0 produces and announces another fork - let branch2 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, false).pop().unwrap(); + let mut branch2 = None; + for i in 0..2 { + let at = if i == 0 { BlockId::Number(10) } else { BlockId::Hash(branch2.unwrap()) }; + branch2 = net.peer(0).push_blocks_at(at, 1, false).pop(); + net.run_until_idle().await; + } + let branch2 = branch2.unwrap(); net.run_until_sync().await; - // Peer 1 should have both branches, assert!(net.peer(1).client().header(branch1).unwrap().is_some()); assert!(net.peer(1).client().header(branch2).unwrap().is_some()); } @@ -1443,3 +1438,52 @@ async fn syncs_blocks_with_large_headers() { assert_eq!(net.peer(2).client.info().best_number, 512); assert!(net.peers()[0].blockchain_canon_equals(&net.peers()[2])); } + +/// It sometimes happens that a node is producing a fork, then connects to some other node. Both of +/// these nodes share the same best block. Then the node produces a block that is his new best block +/// on this fork. The other node sees this best block and tries to download it, but fails on import +/// because it has not imported all the fork blocks. This test ensures that this can not happen +/// again. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn fork_block_announcement_does_not_cause_unknown_parent() { + sp_tracing::try_init_simple(); + let mut net = TestNet::new(2); + + // Both peers build the same canonical chain of 20 blocks. + net.peer(0).push_blocks(20, false); + net.peer(1).push_blocks(20, false); + + let best_hash_20 = net.peer(0).client().info().best_hash; + assert_eq!(best_hash_20, net.peer(1).client().info().best_hash); + assert_eq!(net.peer(1).client().info().best_number, 20); + assert!(net.peers()[0].blockchain_canon_equals(&net.peers()[1])); + + // Peer 0 creates a fork from block 10 (fork blocks 11..20) without making it the best. + let fork_hashes = + net.peer(0) + .push_blocks_at_without_informing_sync(BlockId::Number(10), 10, true, false); + let fork_tip = *fork_hashes.last().unwrap(); + + // Peer 0's best is still canonical block 20. + assert_eq!(net.peer(0).client().info().best_hash, best_hash_20); + + // Connect the peers. Both handshake with best=20 (canonical). + net.run_until_connected().await; + net.run_until_idle().await; + + // Peer 0 produces block 21 on the fork. This makes the fork the new best chain + // (height 21 > 20) and announces it. + net.peer(0).push_blocks_at(BlockId::Hash(fork_tip), 1, false); + assert_eq!(net.peer(0).client().info().best_number, 21); + + // Wait for peer 1 to sync. It should resolve the fork via ancestor search back to + // common block 10, download fork blocks 11..21, and switch to the fork as best. + net.run_until_sync().await; + + assert_eq!(net.peer(1).client().info().best_number, 21); + assert!(net.peers()[0].blockchain_canon_equals(&net.peers()[1])); + + // Ensure it did not first try to import the fork block without having downloaded the full + // fork. + assert_eq!(net.peer(1).import_error_count(), 0); +}