Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 51 additions & 34 deletions core/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,42 @@ struct TopicTracker {
set_id: u64,
}

impl TopicTracker {
fn is_expired(&self, round: u64, set_id: u64) -> bool {
if set_id < self.set_id {
trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, self.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_set_id";
"set_id" => ?set_id, "ours" => ?self.set_id
);
return true;
} else if set_id == self.set_id + 1 {
// allow a few first rounds of future set.
if round > MESSAGE_ROUND_TOLERANCE {
trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, self.set_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we tag messages within the tolerance as Future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, because we have enough information to validate them

telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set";
"round" => ?round, "ours" => ?self.set_id
);
return true;
}
} else if set_id == self.set_id {
if round < self.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) {
trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, self.min_live_round, self.max_round);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob";
"round" => ?round, "our_min_live_round" => ?self.min_live_round, "our_max_round" => ?self.max_round
);
return true;
}
} else {
trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, self.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set";
"set_id" => ?set_id, "ours" => ?self.set_id
);
return true;
}
false
}
}

struct GossipValidator<Block: BlockT> {
rounds: parking_lot::RwLock<TopicTracker>,
_marker: ::std::marker::PhantomData<Block>,
Expand Down Expand Up @@ -294,38 +330,7 @@ impl<Block: BlockT> GossipValidator<Block> {
}

fn is_expired(&self, round: u64, set_id: u64) -> bool {
let rounds = self.rounds.read();
if set_id < rounds.set_id {
trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_set_id";
"set_id" => ?set_id, "ours" => ?rounds.set_id
);
return true;
} else if set_id == rounds.set_id + 1 {
// allow a few first rounds of future set.
if round > MESSAGE_ROUND_TOLERANCE {
trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set";
"round" => ?round, "ours" => ?rounds.set_id
);
return true;
}
} else if set_id == rounds.set_id {
if round < rounds.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) {
trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, rounds.min_live_round, rounds.max_round);
telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob";
"round" => ?round, "our_min_live_round" => ?rounds.min_live_round, "our_max_round" => ?rounds.max_round
);
return true;
}
} else {
trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, rounds.set_id);
telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set";
"set_id" => ?set_id, "ours" => ?rounds.set_id
);
return true;
}
false
self.rounds.read().is_expired(round, set_id)
}

fn validate_round_message(&self, full: VoteOrPrecommitMessage<Block>)
Expand Down Expand Up @@ -401,6 +406,18 @@ impl<Block: BlockT> network_gossip::Validator<Block::Hash> for GossipValidator<B
}
}
}

fn message_expired<'a>(&'a self) -> Box<FnMut(Block::Hash, &[u8]) -> bool + 'a> {
let rounds = self.rounds.read();
Box::new(move |_topic, mut data| {
match GossipMessage::<Block>::decode(&mut data) {
None => true,
Some(GossipMessage::Commit(full)) => rounds.is_expired(full.round, full.set_id),
Some(GossipMessage::VoteOrPrecommit(full)) =>
rounds.is_expired(full.round, full.set_id),
}
})
}
}

/// A handle to the network. This is generally implemented by providing some
Expand Down Expand Up @@ -476,7 +493,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B
self.validator.note_round(round, set_id);
let (tx, rx) = sync::oneshot::channel();
self.service.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(message_topic::<B>(round, set_id));
let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, message_topic::<B>(round, set_id));
let _ = tx.send(inner_rx);
});
NetworkStream { outer: rx, inner: None }
Expand All @@ -501,7 +518,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B
self.validator.note_set(set_id);
let (tx, rx) = sync::oneshot::channel();
self.service.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(commit_topic::<B>(set_id));
let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, commit_topic::<B>(set_id));
let _ = tx.send(inner_rx);
});
NetworkStream { outer: rx, inner: None }
Expand Down
10 changes: 8 additions & 2 deletions core/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ impl Network<Block> for MessageRouting {
self.validator.note_round(round, set_id);
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let messages = peer.consensus_gossip_messages_for(make_topic(round, set_id));
let messages = peer.consensus_gossip_messages_for(
GRANDPA_ENGINE_ID,
make_topic(round, set_id),
);

let messages = messages.map_err(
move |_| panic!("Messages for round {} dropped too early", round)
Expand Down Expand Up @@ -211,7 +214,10 @@ impl Network<Block> for MessageRouting {
self.validator.note_set(set_id);
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let messages = peer.consensus_gossip_messages_for(make_commit_topic(set_id));
let messages = peer.consensus_gossip_messages_for(
GRANDPA_ENGINE_ID,
make_commit_topic(set_id),
);

let messages = messages.map_err(
move |_| panic!("Commit messages for set {} dropped too early", set_id)
Expand Down
Loading