Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions client/network/src/protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,26 @@ pub struct RemoteReadResponse {
pub proof: StorageProof,
}

/// Announcement summary used for debug logging.
#[derive(Debug)]
pub struct AnnouncementSummary<H: HeaderT> {
block_hash: H::Hash,
number: H::Number,
parent_hash: H::Hash,
state: Option<BlockState>,
}

impl<H: HeaderT> generic::BlockAnnounce<H> {
pub fn summary(&self) -> AnnouncementSummary<H> {
AnnouncementSummary {
block_hash: self.header.hash(),
number: *self.header.number(),
parent_hash: self.header.parent_hash().clone(),
state: self.state,
}
}
}

/// Generic types.
pub mod generic {
use bitflags::bitflags;
Expand Down
67 changes: 39 additions & 28 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,10 @@ impl<B: BlockT> ChainSync<B> {
}
}

/// Number of active sync requests.
/// Number of active forks requests. This includes
/// requests that are pending or could be issued right away.
pub fn num_sync_requests(&self) -> usize {
self.fork_targets.len()
self.fork_targets.values().filter(|f| f.number <= self.best_queued_number).count()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to filter here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is used in tests to detect when there's no activity (block_unti_idle). Since now fork_targets might contain some future blocks, that are not requested until we reach that block number it will stall tests. Basically only forks that can be requested are considered as active.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh okay, ty. Maybe we should add some comment to the function to explain this :D

}

/// Number of downloaded blocks.
Expand Down Expand Up @@ -1421,23 +1422,36 @@ impl<B: BlockT> ChainSync<B> {
&mut self,
pre_validation_result: PreValidateBlockAnnounce<B::Header>,
) -> PollBlockAnnounceValidation<B::Header> {
trace!(
target: "sync",
"Finished block announce validation: {:?}",
pre_validation_result,
);

let (announce, is_best, who) = match pre_validation_result {
PreValidateBlockAnnounce::Failure { who, disconnect } => {
debug!(
target: "sync",
"Failed announce validation: {:?}, disconnect: {}",
who,
disconnect,
);
return PollBlockAnnounceValidation::Failure { who, disconnect }
},
PreValidateBlockAnnounce::Process { announce, is_new_best, who } => {
(announce, is_new_best, who)
},
PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip =>
return PollBlockAnnounceValidation::Skip,
PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip => {
debug!(
target: "sync",
"Ignored announce validation",
);
return PollBlockAnnounceValidation::Skip
},
};

trace!(
target: "sync",
"Finished block announce validation: from {:?}: {:?}. local_best={}",
who,
announce.summary(),
is_best,
);

let number = *announce.header.number();
let hash = announce.header.hash();
let parent_status = self.block_status(announce.header.parent_hash()).unwrap_or(BlockStatus::Unknown);
Expand Down Expand Up @@ -1508,25 +1522,22 @@ impl<B: BlockT> ChainSync<B> {
return PollBlockAnnounceValidation::ImportHeader { is_best, announce, who }
}

if number <= self.best_queued_number {
trace!(
target: "sync",
"Added sync target for block announced from {}: {} {:?}",
who,
hash,
announce.header,
);
self.fork_targets
.entry(hash.clone())
.or_insert_with(|| ForkTarget {
number,
parent_hash: Some(*announce.header.parent_hash()),
peers: Default::default(),
})
.peers.insert(who.clone());
}
trace!(
target: "sync",
"Added sync target for block announced from {}: {} {:?}",
who,
hash,
announce.summary(),
);
self.fork_targets
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, the problem was that we only synced forks if the block number was below our best number?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the block was below best, sync assumed it will be downloaded by simply advancing from peers common number. fork_targets was intended for old forks that are behind our current best block, but it is clear now this mechanism should be used for parallel current forks as well.

.entry(hash.clone())
.or_insert_with(|| ForkTarget {
number,
parent_hash: Some(*announce.header.parent_hash()),
peers: Default::default(),
})
.peers.insert(who.clone());

trace!(target: "sync", "Announce validation result is nothing");
PollBlockAnnounceValidation::Nothing { is_best, who, announce }
}

Expand Down
11 changes: 8 additions & 3 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,11 @@ impl<D, B> Peer<D, B> where
&self.network.service()
}

/// Get a reference to the network worker.
pub fn network(&self) -> &NetworkWorker<Block, <Block as BlockT>::Hash> {
&self.network
}

/// Test helper to compare the blockchain state of multiple (networked)
/// clients.
pub fn blockchain_canon_equals(&self, other: &Self) -> bool {
Expand Down Expand Up @@ -985,12 +990,12 @@ pub trait TestNetFactory: Sized where <Self::BlockImport as BlockImport<Block>>:
/// Polls the testnet. Processes all the pending actions.
fn poll(&mut self, cx: &mut FutureContext) {
self.mut_peers(|peers| {
for peer in peers {
trace!(target: "sync", "-- Polling {}", peer.id());
for (i, peer) in peers.into_iter().enumerate() {
trace!(target: "sync", "-- Polling {}: {}", i, peer.id());
if let Poll::Ready(()) = peer.network.poll_unpin(cx) {
panic!("NetworkWorker has terminated unexpectedly.")
}
trace!(target: "sync", "-- Polling complete {}", peer.id());
trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id());

// We poll `imported_blocks_stream`.
while let Poll::Ready(Some(notification)) = peer.imported_blocks_stream.as_mut().poll_next(cx) {
Expand Down
77 changes: 77 additions & 0 deletions client/network/test/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,27 @@ impl BlockAnnounceValidator<Block> for NewBestBlockAnnounceValidator {
}
}

/// Returns `Validation::Failure` for specified block number
struct FailingBlockAnnounceValidator(u64);

impl BlockAnnounceValidator<Block> for FailingBlockAnnounceValidator {
fn validate(
&mut self,
header: &Header,
_: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, Box<dyn std::error::Error + Send>>> + Send>> {
let number = *header.number();
let target_number = self.0;
async move { Ok(
if number == target_number {
Validation::Failure { disconnect: false }
} else {
Validation::Success { is_new_best: true }
}
) }.boxed()
}
}

#[test]
fn sync_blocks_when_block_announce_validator_says_it_is_new_best() {
sp_tracing::try_init_simple();
Expand Down Expand Up @@ -1010,3 +1031,59 @@ fn multiple_requests_are_accepted_as_long_as_they_are_not_fulfilled() {
Poll::Ready(())
}));
}

#[test]
fn syncs_all_forks_from_single_peer() {
sp_tracing::try_init_simple();
let mut net = TestNet::new(2);
net.peer(0).push_blocks(10, false);
net.peer(1).push_blocks(10, false);

// poll until the two nodes connect, otherwise announcing the block will not work
net.block_until_connected();

// Peer 0 produces new blocks and announces.
let branch1 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, true);

// Wait till peer 1 starts downloading
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).network().best_seen_block() != Some(12) {
return Poll::Pending
}
Poll::Ready(())
}));

// Peer 0 produces and announces another fork
let branch2 = net.peer(0).push_blocks_at(BlockId::Number(10), 2, false);

net.block_until_sync();

// Peer 1 should have both branches,
assert!(net.peer(1).client().header(&BlockId::Hash(branch1)).unwrap().is_some());
assert!(net.peer(1).client().header(&BlockId::Hash(branch2)).unwrap().is_some());
}

#[test]
fn syncs_after_missing_announcement() {
sp_tracing::try_init_simple();
let mut net = TestNet::new(0);
net.add_full_peer_with_config(Default::default());
// Set peer 1 to ignore announcement
net.add_full_peer_with_config(FullPeerConfig {
block_announce_validator: Some(Box::new(FailingBlockAnnounceValidator(11))),
..Default::default()
});
net.peer(0).push_blocks(10, false);
net.peer(1).push_blocks(10, false);

net.block_until_connected();

// Peer 0 produces a new block and announces. Peer 1 ignores announcement.
net.peer(0).push_blocks_at(BlockId::Number(10), 1, false);
// Peer 0 produces another block and announces.
let final_block = net.peer(0).push_blocks_at(BlockId::Number(11), 1, false);
net.peer(1).push_blocks_at(BlockId::Number(10), 1, true);
net.block_until_sync();
assert!(net.peer(1).client().header(&BlockId::Hash(final_block)).unwrap().is_some());
}