Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions client/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,18 @@ enum PreValidateBlockAnnounce<H> {
/// The announcement.
announce: BlockAnnounce<H>,
},
/// The announcement validation returned an error.
///
/// An error means that *this* node failed to validate it because some internal error happened.
/// If the block announcement was invalid, [`Self::Failure`] is the correct variant to express
/// this.
Error {
who: PeerId,
},
/// The block announcement should be skipped.
///
/// This should *only* be returned when there wasn't a slot registered
/// for this block announcement validation.
Skip,
}

Expand Down Expand Up @@ -1223,6 +1234,11 @@ impl<B: BlockT> ChainSync<B> {
/// is capped.
///
/// Returns [`HasSlotForBlockAnnounceValidation`] to inform about the result.
///
/// # Note
///
/// It is *required* to call [`Self::peer_block_announce_validation_finished`] when the
/// validation is finished to clear the slot.
fn has_slot_for_block_announce_validation(&mut self, peer: &PeerId) -> HasSlotForBlockAnnounceValidation {
if self.block_announce_validation.len() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
return HasSlotForBlockAnnounceValidation::TotalMaximumSlotsReached
Expand Down Expand Up @@ -1324,15 +1340,20 @@ impl<B: BlockT> ChainSync<B> {
Ok(Validation::Failure { disconnect }) => {
debug!(
target: "sync",
"Block announcement validation of block {} from {} failed",
"Block announcement validation of block {:?} from {} failed",
hash,
who,
);
PreValidateBlockAnnounce::Failure { who, disconnect }
}
Err(e) => {
error!(target: "sync", "💔 Block announcement validation errored: {}", e);
PreValidateBlockAnnounce::Skip
error!(
target: "sync",
"💔 Block announcement validation of block {:?} errored: {}",
hash,
e,
);
PreValidateBlockAnnounce::Error { who }
}
}
}.boxed());
Expand All @@ -1352,14 +1373,27 @@ impl<B: BlockT> ChainSync<B> {
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<B::Header>> {
match self.block_announce_validation.poll_next_unpin(cx) {
Poll::Ready(Some(res)) => Poll::Ready(self.finish_block_announce_validation(res)),
Poll::Ready(Some(res)) => {
self.peer_block_announce_validation_finished(&res);
Poll::Ready(self.finish_block_announce_validation(res))
},
_ => Poll::Pending,
}
}

/// Should be called when a block announce validation was finished, to update the stats
/// of the given peer.
fn peer_block_announce_validation_finished(&mut self, peer: &PeerId) {
/// Should be called when a block announce validation is finished, to update the slots
/// of the peer that send the block announce.
fn peer_block_announce_validation_finished(
&mut self,
res: &PreValidateBlockAnnounce<B::Header>,
) {
let peer = match res {
PreValidateBlockAnnounce::Failure { who, .. } |
PreValidateBlockAnnounce::Process { who, .. } |
PreValidateBlockAnnounce::Error { who } => who,
PreValidateBlockAnnounce::Skip => return,
};

match self.block_announce_validation_per_peer_stats.entry(peer.clone()) {
Entry::Vacant(_) => {
error!(
Expand All @@ -1369,7 +1403,8 @@ impl<B: BlockT> ChainSync<B> {
);
},
Entry::Occupied(mut entry) => {
if entry.get_mut().saturating_sub(1) == 0 {
*entry.get_mut() = entry.get().saturating_sub(1);
if *entry.get() == 0 {
entry.remove();
}
}
Expand All @@ -1389,14 +1424,13 @@ impl<B: BlockT> ChainSync<B> {

let (announce, is_best, who) = match pre_validation_result {
PreValidateBlockAnnounce::Failure { who, disconnect } => {
self.peer_block_announce_validation_finished(&who);
return PollBlockAnnounceValidation::Failure { who, disconnect }
},
PreValidateBlockAnnounce::Process { announce, is_new_best, who } => {
self.peer_block_announce_validation_finished(&who);
(announce, is_new_best, who)
},
PreValidateBlockAnnounce::Skip => return PollBlockAnnounceValidation::Skip,
PreValidateBlockAnnounce::Error { .. } | PreValidateBlockAnnounce::Skip =>
return PollBlockAnnounceValidation::Skip,
};

let number = *announce.header.number();
Expand Down
38 changes: 38 additions & 0 deletions client/network/test/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,3 +897,41 @@ fn block_announce_data_is_propagated() {
net.block_until_idle();
}
}

#[test]
fn continue_to_sync_after_some_block_announcement_verifications_failed() {
struct TestBlockAnnounceValidator;

impl BlockAnnounceValidator<Block> for TestBlockAnnounceValidator {
fn validate(
&mut self,
header: &Header,
_: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Validation, Box<dyn std::error::Error + Send>>> + Send>> {
let number = *header.number();
async move {
if number < 100 {
Err(Box::<dyn std::error::Error + Send + Sync>::from(String::from("error")) as Box<_>)
} else {
Ok(Validation::Success { is_new_best: false })
}
}.boxed()
}
}

sp_tracing::try_init_simple();
let mut net = TestNet::new(1);

net.add_full_peer_with_config(FullPeerConfig {
block_announce_validator: Some(Box::new(TestBlockAnnounceValidator)),
..Default::default()
});

net.block_until_connected();
net.block_until_idle();

let block_hash = net.peer(0).push_blocks(500, true);

net.block_until_sync();
assert!(net.peer(1).has_block(&block_hash));
}
4 changes: 4 additions & 0 deletions primitives/consensus/common/src/block_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ pub trait BlockAnnounceValidator<B: Block> {
///
/// Returning [`Validation::Failure`] will lead to a decrease of the
/// peers reputation as it sent us invalid data.
///
/// The returned future should only resolve to an error iff there was an internal error validating
/// the block announcement. If the block announcement itself is invalid, this should *always*
/// return [`Validation::Failure`].
fn validate(
&mut self,
header: &B::Header,
Expand Down