diff --git a/client/consensus/common/src/import_queue.rs b/client/consensus/common/src/import_queue.rs index a7b456191b000..c71e21ccd4b00 100644 --- a/client/consensus/common/src/import_queue.rs +++ b/client/consensus/common/src/import_queue.rs @@ -160,6 +160,16 @@ pub enum BlockImportStatus { ImportedUnknown(N, ImportedAux, Option), } +impl BlockImportStatus { + /// Returns the imported block number. + pub fn number(&self) -> &N { + match self { + BlockImportStatus::ImportedKnown(n, _) | + BlockImportStatus::ImportedUnknown(n, _, _) => n, + } + } +} + /// Block import error. #[derive(Debug, thiserror::Error)] pub enum BlockImportError { diff --git a/client/network/sync/src/blocks.rs b/client/network/sync/src/blocks.rs index b897184e7a44c..2cca13bfd7cc7 100644 --- a/client/network/sync/src/blocks.rs +++ b/client/network/sync/src/blocks.rs @@ -18,7 +18,7 @@ use crate::message; use libp2p::PeerId; -use log::trace; +use log::{debug, trace}; use sp_runtime::traits::{Block as BlockT, NumberFor, One}; use std::{ cmp, @@ -39,6 +39,7 @@ pub struct BlockData { enum BlockRangeState { Downloading { len: NumberFor, downloading: u32 }, Complete(Vec>), + Queued { len: NumberFor }, } impl BlockRangeState { @@ -46,6 +47,7 @@ impl BlockRangeState { match *self { Self::Downloading { len, .. } => len, Self::Complete(ref blocks) => (blocks.len() as u32).into(), + Self::Queued { len } => len, } } } @@ -56,12 +58,19 @@ pub struct BlockCollection { /// Downloaded blocks. blocks: BTreeMap, BlockRangeState>, peer_requests: HashMap>, + /// Block ranges downloaded and queued for import. + /// Maps start_hash => (start_num, end_num). + queued_blocks: HashMap, NumberFor)>, } impl BlockCollection { /// Create a new instance. pub fn new() -> Self { - Self { blocks: BTreeMap::new(), peer_requests: HashMap::new() } + Self { + blocks: BTreeMap::new(), + peer_requests: HashMap::new(), + queued_blocks: HashMap::new(), + } } /// Clear everything. 
@@ -170,29 +179,52 @@ impl BlockCollection { } /// Get a valid chain of blocks ordered in descending order and ready for importing into - /// blockchain. - pub fn drain(&mut self, from: NumberFor) -> Vec> { - let mut drained = Vec::new(); - let mut ranges = Vec::new(); + /// the blockchain. + pub fn ready_blocks(&mut self, from: NumberFor) -> Vec> { + let mut ready = Vec::new(); let mut prev = from; - for (start, range_data) in &mut self.blocks { - match range_data { - BlockRangeState::Complete(blocks) if *start <= prev => { - prev = *start + (blocks.len() as u32).into(); - // Remove all elements from `blocks` and add them to `drained` - drained.append(blocks); - ranges.push(*start); + for (&start, range_data) in &mut self.blocks { + if start > prev { + break + } + let len = match range_data { + BlockRangeState::Complete(blocks) => { + let len = (blocks.len() as u32).into(); + prev = start + len; + // Remove all elements from `blocks` and add them to `ready` + ready.append(blocks); + len }, + BlockRangeState::Queued { .. } => continue, _ => break, - } + }; + *range_data = BlockRangeState::Queued { len }; } - for r in ranges { - self.blocks.remove(&r); + if let Some(BlockData { block, .. 
}) = ready.first() { + self.queued_blocks + .insert(block.hash, (from, from + (ready.len() as u32).into())); + } + + trace!(target: "sync", "{} blocks ready for import", ready.len()); + ready + } + + pub fn clear_queued(&mut self, from_hash: &B::Hash) { + match self.queued_blocks.remove(from_hash) { + None => { + debug!(target: "sync", "Can't clear unknown queued blocks from {:?}", from_hash); + }, + Some((from, to)) => { + let mut block_num = from; + while block_num < to { + self.blocks.remove(&block_num); + block_num += One::one(); + } + trace!(target: "sync", "Cleared blocks from {:?} to {:?}", from, to); + }, } - trace!(target: "sync", "Drained {} blocks from {:?}", drained.len(), from); - drained } pub fn clear_peer_download(&mut self, who: &PeerId) { @@ -268,14 +300,14 @@ mod test { bc.clear_peer_download(&peer1); bc.insert(41, blocks[41..81].to_vec(), peer1.clone()); - assert_eq!(bc.drain(1), vec![]); + assert_eq!(bc.ready_blocks(1), vec![]); assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1, 200), Some(121..151)); bc.clear_peer_download(&peer0); bc.insert(1, blocks[1..11].to_vec(), peer0.clone()); assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1, 200), Some(11..41)); assert_eq!( - bc.drain(1), + bc.ready_blocks(1), blocks[1..11] .iter() .map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }) @@ -285,16 +317,16 @@ mod test { bc.clear_peer_download(&peer0); bc.insert(11, blocks[11..41].to_vec(), peer0.clone()); - let drained = bc.drain(12); + let ready = bc.ready_blocks(12); assert_eq!( - drained[..30], + ready[..30], blocks[11..41] .iter() .map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }) .collect::>()[..] 
); assert_eq!( - drained[30..], + ready[30..], blocks[41..81] .iter() .map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }) @@ -308,17 +340,17 @@ mod test { bc.clear_peer_download(&peer1); bc.insert(121, blocks[121..150].to_vec(), peer1.clone()); - assert_eq!(bc.drain(80), vec![]); - let drained = bc.drain(81); + assert_eq!(bc.ready_blocks(80), vec![]); + let ready = bc.ready_blocks(81); assert_eq!( - drained[..40], + ready[..40], blocks[81..121] .iter() .map(|b| BlockData { block: b.clone(), origin: Some(peer2.clone()) }) .collect::>()[..] ); assert_eq!( - drained[40..], + ready[40..], blocks[121..150] .iter() .map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }) @@ -344,4 +376,32 @@ mod test { Some(100 + 128..100 + 128 + 128) ); } + + #[test] + fn no_duplicate_requests_on_fork() { + let mut bc = BlockCollection::new(); + assert!(is_empty(&bc)); + let peer = PeerId::random(); + + let blocks = generate_blocks(10); + + // count = 5, peer_best = 50, common = 39, max_parallel = 0, max_ahead = 200 + assert_eq!(bc.needed_blocks(peer.clone(), 5, 50, 39, 0, 200), Some(40..45)); + + // got a response on the request for `40..45` + bc.clear_peer_download(&peer); + bc.insert(40, blocks[..5].to_vec(), peer.clone()); + + // our "node" started on a fork, with its current best = 47, which is > common + let ready = bc.ready_blocks(48); + assert_eq!( + ready, + blocks[..5] + .iter() + .map(|b| BlockData { block: b.clone(), origin: Some(peer.clone()) }) + .collect::>() + ); + + assert_eq!(bc.needed_blocks(peer.clone(), 5, 50, 39, 0, 200), Some(45..50)); + } } diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index bc0ed46c3f068..dccb08a9f3a92 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1084,7 +1084,7 @@ where { self.blocks.insert(start_block, blocks, *who); } - self.drain_blocks() + self.ready_blocks() }, PeerSyncState::DownloadingGap(_) => { peer.state = PeerSyncState::Available; @@ 
-1098,7 +1098,7 @@ where gap = true; let blocks: Vec<_> = gap_sync .blocks - .drain(gap_sync.best_queued_number + One::one()) + .ready_blocks(gap_sync.best_queued_number + One::one()) .into_iter() .map(|block_data| { let justifications = @@ -1420,6 +1420,12 @@ where OnBlockData::Import(origin, new_blocks) } + fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor) { + if let Some(peer) = self.peers.get_mut(peer_id) { + peer.update_common_number(new_common); + } + } + /// Handle a response from the remote to a justification request that we made. /// /// `request` must be the original request that triggered `response`. @@ -1501,9 +1507,12 @@ where for (_, hash) in &results { self.queue_blocks.remove(hash); } + if let Some(from_hash) = results.first().map(|(_, h)| h) { + self.blocks.clear_queued(from_hash); + } for (result, hash) in results { if has_error { - continue + break } if result.is_err() { @@ -1511,11 +1520,10 @@ where } match result { - Ok(BlockImportStatus::ImportedKnown(number, who)) => { - if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) { - peer.update_common_number(number); - } - }, + Ok(BlockImportStatus::ImportedKnown(number, who)) => + if let Some(peer) = who { + self.update_peer_common_number(&peer, number); + }, Ok(BlockImportStatus::ImportedUnknown(number, aux, who)) => { if aux.clear_justification_requests { trace!( @@ -1544,8 +1552,8 @@ where } } - if let Some(peer) = who.and_then(|p| self.peers.get_mut(&p)) { - peer.update_common_number(number); + if let Some(peer) = who { + self.update_peer_common_number(&peer, number); } let state_sync_complete = self.state_sync.as_ref().map_or(false, |s| s.target() == hash); @@ -1979,11 +1987,11 @@ where // is either one further ahead or it's the one they just announced, if we know about it. 
if is_best { if known && self.best_queued_number >= number { - peer.update_common_number(number); + self.update_peer_common_number(&who, number); } else if announce.header.parent_hash() == &self.best_queued_hash || known_parent && self.best_queued_number >= number { - peer.update_common_number(number - One::one()); + self.update_peer_common_number(&who, number - One::one()); } } self.allowed_requests.add(&who); @@ -2057,7 +2065,7 @@ where target.peers.remove(who); !target.peers.is_empty() }); - let blocks = self.drain_blocks(); + let blocks = self.ready_blocks(); if !blocks.is_empty() { Some(self.validate_and_queue_blocks(blocks, false)) } else { @@ -2177,10 +2185,10 @@ where } } - /// Drain the downloaded block set up to the first gap. - fn drain_blocks(&mut self) -> Vec> { + /// Get the set of downloaded blocks that are ready to be queued for import. + fn ready_blocks(&mut self) -> Vec> { self.blocks - .drain(self.best_queued_number + One::one()) + .ready_blocks(self.best_queued_number + One::one()) .into_iter() .map(|block_data| { let justifications = block_data @@ -2967,6 +2975,25 @@ mod test { best_block_num += MAX_BLOCKS_TO_REQUEST as u32; + let _ = sync.on_blocks_processed( + MAX_BLOCKS_TO_REQUEST as usize, + MAX_BLOCKS_TO_REQUEST as usize, + resp_blocks + .iter() + .rev() + .map(|b| { + ( + Ok(BlockImportStatus::ImportedUnknown( + b.header().number().clone(), + Default::default(), + Some(peer_id1.clone()), + )), + b.hash(), + ) + }) + .collect(), + ); + resp_blocks .into_iter() .rev() @@ -3151,6 +3178,147 @@ mod test { ); } + #[test] + fn syncs_fork_without_duplicate_requests() { + sp_tracing::try_init_simple(); + + let mut client = Arc::new(TestClientBuilder::new().build()); + let blocks = (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 4) + .map(|_| build_block(&mut client, None, false)) + .collect::>(); + + let fork_blocks = { + let mut client = Arc::new(TestClientBuilder::new().build()); + let fork_blocks = blocks[..MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2] + 
.into_iter() + .inspect(|b| block_on(client.import(BlockOrigin::Own, (*b).clone())).unwrap()) + .cloned() + .collect::>(); + + fork_blocks + .into_iter() + .chain( + (0..MAX_BLOCKS_TO_LOOK_BACKWARDS * 2 + 1) + .map(|_| build_block(&mut client, None, true)), + ) + .collect::>() + }; + + let info = client.info(); + + let mut sync = ChainSync::new( + SyncMode::Full, + client.clone(), + Box::new(DefaultBlockAnnounceValidator), + 5, + None, + ) + .unwrap(); + + let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); + let just = (*b"TEST", Vec::new()); + client + .finalize_block(BlockId::Hash(finalized_block.hash()), Some(just)) + .unwrap(); + sync.update_chain_info(&info.best_hash, info.best_number); + + let peer_id1 = PeerId::random(); + + let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone(); + // Connect the node we will sync from + sync.new_peer(peer_id1.clone(), common_block.hash(), *common_block.header().number()) + .unwrap(); + + send_block_announce(fork_blocks.last().unwrap().header().clone(), &peer_id1, &mut sync); + + let mut request = + get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1); + + // Do the ancestor search + loop { + let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; + let response = create_block_response(vec![block.clone()]); + + let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + request = match on_block_data.into_request() { + Some(req) => req.1, + // We found the ancestor + None => break, + }; + + log::trace!(target: "sync", "Request: {:?}", request); + } + + // Now request and import the fork.
+ let mut best_block_num = finalized_block.header().number().clone() as u32; + let mut request = get_block_request( + &mut sync, + FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64), + MAX_BLOCKS_TO_REQUEST as u32, + &peer_id1, + ); + let last_block_num = *fork_blocks.last().unwrap().header().number() as u32 - 1; + while best_block_num < last_block_num { + let from = unwrap_from_block_number(request.from.clone()); + + let mut resp_blocks = fork_blocks[best_block_num as usize..from as usize].to_vec(); + resp_blocks.reverse(); + + let response = create_block_response(resp_blocks.clone()); + + let res = sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap(); + assert!(matches!( + res, + OnBlockData::Import(_, blocks) if blocks.len() == MAX_BLOCKS_TO_REQUEST + ),); + + best_block_num += MAX_BLOCKS_TO_REQUEST as u32; + + if best_block_num < last_block_num { + // make sure we're not getting a duplicate request in the time before the blocks are + // processed + request = get_block_request( + &mut sync, + FromBlock::Number(MAX_BLOCKS_TO_REQUEST as u64 + best_block_num as u64), + MAX_BLOCKS_TO_REQUEST as u32, + &peer_id1, + ); + } + + let _ = sync.on_blocks_processed( + MAX_BLOCKS_TO_REQUEST as usize, + MAX_BLOCKS_TO_REQUEST as usize, + resp_blocks + .iter() + .rev() + .map(|b| { + ( + Ok(BlockImportStatus::ImportedUnknown( + b.header().number().clone(), + Default::default(), + Some(peer_id1.clone()), + )), + b.hash(), + ) + }) + .collect(), + ); + + resp_blocks + .into_iter() + .rev() + .for_each(|b| block_on(client.import(BlockOrigin::Own, b)).unwrap()); + } + + // Request the tip + get_block_request( + &mut sync, + FromBlock::Hash(fork_blocks.last().unwrap().hash()), + 1, + &peer_id1, + ); + } + #[test] fn removes_target_fork_on_disconnect() { sp_tracing::try_init_simple();