Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 122 additions & 9 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1510,20 +1510,38 @@ impl<B: BlockT> ChainSync<B> {
self.pending_requests.set_all();
}

/// Restart the sync process.
fn restart<'a>(&'a mut self) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
/// Restart the sync process. This will reset all pending block requests and return an iterator
/// of new block requests to make to peers. Peers that were downloading finality data (i.e.
/// their state was `DownloadingJustification` or `DownloadingFinalityProof`) are unaffected and
/// will stay in the same state.
fn restart<'a>(
&'a mut self,
) -> impl Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>> + 'a {
self.blocks.clear();
let info = self.client.info();
self.best_queued_hash = info.best_hash;
self.best_queued_number = std::cmp::max(info.best_number, self.best_imported_number);
self.pending_requests.set_all();
debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
let old_peers = std::mem::take(&mut self.peers);

old_peers.into_iter().filter_map(move |(id, p)| {
// peers that were downloading justifications or finality proofs
// should be kept in that state.
match p.state {
PeerSyncState::DownloadingJustification(_)
| PeerSyncState::DownloadingFinalityProof(_) => {
self.peers.insert(id, p);
return None;
}
_ => {}
}

// handle peers that were in other states.
match self.new_peer(id.clone(), p.best_hash, p.best_number) {
Ok(None) => None,
Ok(Some(x)) => Some(Ok((id, x))),
Err(e) => Some(Err(e))
Err(e) => Some(Err(e)),
}
})
}
Expand Down Expand Up @@ -1792,15 +1810,15 @@ fn validate_blocks<Block: BlockT>(blocks: &Vec<message::BlockData<Block>>, who:

#[cfg(test)]
mod test {
use super::*;
use super::message::FromBlock;
use substrate_test_runtime_client::{
runtime::Block,
DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
};
use sp_blockchain::HeaderBackend;
use super::*;
use sc_block_builder::BlockBuilderProvider;
use sp_blockchain::HeaderBackend;
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use substrate_test_runtime_client::{
runtime::{Block, Hash},
ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
};

#[test]
fn processes_empty_response_on_justification_request_for_unknown_block() {
Expand Down Expand Up @@ -1879,4 +1897,99 @@ mod test {
})
);
}

#[test]
fn restart_doesnt_affect_peers_downloading_finality_data() {
let mut client = Arc::new(TestClientBuilder::new().build());
let info = client.info();

let mut sync = ChainSync::new(
Roles::AUTHORITY,
client.clone(),
&info,
None,
Box::new(DefaultBlockAnnounceValidator),
1,
);

let peer_id1 = PeerId::random();
let peer_id2 = PeerId::random();
let peer_id3 = PeerId::random();
let peer_id4 = PeerId::random();

let mut new_blocks = |n| {
for _ in 0..n {
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block.clone()).unwrap();
}

let info = client.info();
(info.best_hash, info.best_number)
};

let (b1_hash, b1_number) = new_blocks(50);
let (b2_hash, b2_number) = new_blocks(10);

// add 2 peers at blocks that we don't have locally
sync.new_peer(peer_id1.clone(), Hash::random(), 42).unwrap();
sync.new_peer(peer_id2.clone(), Hash::random(), 10).unwrap();

// we wil send block requests to these peers
// for these blocks we don't know about
assert!(sync.block_requests().all(|(p, _)| { *p == peer_id1 || *p == peer_id2 }));

// add a new peer at a known block
sync.new_peer(peer_id3.clone(), b1_hash, b1_number).unwrap();

// we request a justification for a block we have locally
sync.request_justification(&b1_hash, b1_number);

// the justification request should be scheduled to the
// new peer which is at the given block
assert!(sync.justification_requests().any(|(p, r)| {
p == peer_id3
&& r.fields == BlockAttributes::JUSTIFICATION
&& r.from == message::FromBlock::Hash(b1_hash)
&& r.to == None
}));

assert_eq!(
sync.peers.get(&peer_id3).unwrap().state,
PeerSyncState::DownloadingJustification(b1_hash),
);

// add another peer at a known later block
sync.new_peer(peer_id4.clone(), b2_hash, b2_number).unwrap();

// we request a finality proof for a block we have locally
sync.request_finality_proof(&b2_hash, b2_number);

// the finality proof request should be scheduled to peer 4
// which is at that block
assert!(
sync.finality_proof_requests().any(|(p, r)| { p == peer_id4 && r.block == b2_hash })
);

assert_eq!(
sync.peers.get(&peer_id4).unwrap().state,
PeerSyncState::DownloadingFinalityProof(b2_hash),
);

// we restart the sync state
let block_requests = sync.restart();

// which should make us send out block requests to the first two peers
assert!(block_requests.map(|r| r.unwrap()).all(|(p, _)| { p == peer_id1 || p == peer_id2 }));

// peer 3 and 4 should be unaffected as they were downloading finality data
assert_eq!(
sync.peers.get(&peer_id3).unwrap().state,
PeerSyncState::DownloadingJustification(b1_hash),
);

assert_eq!(
sync.peers.get(&peer_id4).unwrap().state,
PeerSyncState::DownloadingFinalityProof(b2_hash),
);
}
}