diff --git a/Cargo.lock b/Cargo.lock index 96dbf5c3deef..bfe249dadd60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -861,8 +861,8 @@ dependencies = [ [[package]] name = "finality-grandpa" -version = "0.8.0" -source = "git+https://github.com/paritytech/finality-grandpa?rev=f682e3dec54b19c5dd018324028c47f777f3b3a1#f682e3dec54b19c5dd018324028c47f777f3b3a1" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "hashmap_core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4361,7 +4361,7 @@ name = "substrate-finality-grandpa" version = "2.0.0" dependencies = [ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", - "finality-grandpa 0.8.0 (git+https://github.com/paritytech/finality-grandpa?rev=f682e3dec54b19c5dd018324028c47f777f3b3a1)", + "finality-grandpa 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "fork-tree 2.0.0", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5912,7 +5912,7 @@ dependencies = [ "checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" -"checksum finality-grandpa 0.8.0 (git+https://github.com/paritytech/finality-grandpa?rev=f682e3dec54b19c5dd018324028c47f777f3b3a1)" = "" +"checksum finality-grandpa 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b9e7cba2aaadf09932452a4fc054a77451b31eb99bc0b42bf54bd44f01a9daf4" "checksum fixed-hash 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "516877b7b9a1cc2d0293cbce23cd6203f0edbfd4090e6ca4489fecb5aa73050e" "checksum flate2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "550934ad4808d5d39365e5d61727309bf18b3b02c6c56b729cb92e7dd84bc3d8" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" diff --git a/core/finality-grandpa/Cargo.toml b/core/finality-grandpa/Cargo.toml index c7f5e0c03a32..41e4668b5149 100644 --- a/core/finality-grandpa/Cargo.toml +++ b/core/finality-grandpa/Cargo.toml @@ -24,10 +24,11 @@ network = { package = "substrate-network", path = "../network" } service = { package = "substrate-service", path = "../service", optional = true } srml-finality-tracker = { path = "../../srml/finality-tracker" } fg_primitives = { package = "substrate-finality-grandpa-primitives", path = "primitives" } -grandpa = { package = "finality-grandpa", version = "0.8.0", features = ["derive-codec"], git = "https://github.com/paritytech/finality-grandpa", rev = "f682e3dec54b19c5dd018324028c47f777f3b3a1" } +grandpa = { package = "finality-grandpa", version = "0.8.1", features = ["derive-codec"] } [dev-dependencies] consensus_common = { package = "substrate-consensus-common", path = "../consensus/common", features = ["test-helpers"] } +grandpa = { package = "finality-grandpa", version = "0.8.1", features = ["derive-codec", "test-helpers"] } network = { package = "substrate-network", path = "../network", features = ["test-helpers"] } keyring = { package = "substrate-keyring", path = "../keyring" } test-client = { package = "substrate-test-runtime-client", path = "../test-runtime/client"} diff --git a/core/finality-grandpa/src/communication/gossip.rs b/core/finality-grandpa/src/communication/gossip.rs index ca529125ec17..dfaa96628f2d 100644 --- a/core/finality-grandpa/src/communication/gossip.rs +++ b/core/finality-grandpa/src/communication/gossip.rs @@ -46,7 +46,7 @@ //! #### Propose //! //! This is a broadcast by a known voter of the last-round estimate. - +//! //! #### Commit //! //! These are used to announce past agreement of finality. @@ -58,6 +58,21 @@ //! Sending a commit is polite when it may finalize something that the receiving peer //! was not aware of. //! +//! #### Catch Up +//! +//! These allow a peer to request another peer, which they perceive to be in a +//! later round, to provide all the votes necessary to complete a given round +//! `R`. +//! +//! It is impolite to send a catch up request for a round `R` to a peer whose +//! announced view is behind `R`. It is also impolite to send a catch up request +//! to a peer in a new different Set ID. +//! +//! The logic for issuing and tracking pending catch up requests is implemented +//! in the `GossipValidator`. A catch up request is issued anytime we see a +//! neighbor packet from a peer at a round `CATCH_UP_THRESHOLD` higher than at +//! we are. +//! //! ## Expiration //! //! We keep some amount of recent rounds' messages, but do not accept new ones from rounds @@ -78,13 +93,20 @@ use log::{trace, debug, warn}; use futures::prelude::*; use futures::sync::mpsc; -use crate::{CompactCommit, SignedMessage}; +use crate::{environment, CatchUp, CompactCommit, SignedMessage}; use super::{cost, benefit, Round, SetId}; use std::collections::{HashMap, VecDeque}; use std::time::{Duration, Instant}; const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5); +const CATCH_UP_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); +const CATCH_UP_PROCESS_TIMEOUT: Duration = Duration::from_secs(15); +/// Maximum number of rounds we are behind a peer before issuing a +/// catch up request. +const CATCH_UP_THRESHOLD: u64 = 2; + +type Report = (PeerId, i32); /// An outcome of examining a message. #[derive(Debug, PartialEq, Clone, Copy)] @@ -230,6 +252,10 @@ pub(super) enum GossipMessage { Commit(FullCommitMessage), /// A neighbor packet. Not repropagated. Neighbor(VersionedNeighborPacket>), + /// Grandpa catch up request message with round and set info. Not repropagated. + CatchUpRequest(CatchUpRequestMessage), + /// Grandpa catch up message with round and set info. Not repropagated. + CatchUp(FullCatchUpMessage), } impl From>> for GossipMessage { @@ -264,9 +290,12 @@ pub(super) struct FullCommitMessage { /// and are not repropagated. These contain information about the node's state. #[derive(Debug, Encode, Decode, Clone)] pub(super) struct NeighborPacket { - round: Round, - set_id: SetId, - commit_finalized_height: N, + /// The round the node is currently at. + pub(super) round: Round, + /// The set ID the node is currently at. + pub(super) set_id: SetId, + /// The highest finalizing commit observed. + pub(super) commit_finalized_height: N, } /// A versioned neighbor packet. @@ -284,6 +313,24 @@ impl VersionedNeighborPacket { } } +/// A catch up request for a given round (or any further round) localized by set id. +#[derive(Clone, Debug, Encode, Decode)] +pub(super) struct CatchUpRequestMessage { + /// The round that we want to catch up to. + pub(super) round: Round, + /// The voter set ID this message is from. + pub(super) set_id: SetId, +} + +/// Network level catch up message with topic information. +#[derive(Debug, Encode, Decode)] +pub(super) struct FullCatchUpMessage { + /// The voter set ID this message is from. + pub(super) set_id: SetId, + /// The compact commit message. + pub(super) message: CatchUp, +} + /// Misbehavior that peers can perform. /// /// `cost` gives a cost that can be used to perform cost/benefit analysis of a @@ -294,6 +341,10 @@ pub(super) enum Misbehavior { InvalidViewChange, // could not decode neighbor message. bytes-length of the packet. UndecodablePacket(i32), + // Bad catch up message (invalid signatures). + BadCatchUpMessage { + signatures_checked: i32, + }, // Bad commit message BadCommitMessage { signatures_checked: i32, @@ -315,7 +366,9 @@ impl Misbehavior { match *self { InvalidViewChange => cost::INVALID_VIEW_CHANGE, - UndecodablePacket(bytes) => bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE), + UndecodablePacket(bytes) => bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE), + BadCatchUpMessage { signatures_checked } => + cost::PER_SIGNATURE_CHECKED.saturating_mul(signatures_checked), BadCommitMessage { signatures_checked, blocks_loaded, equivocations_caught } => { let cost = cost::PER_SIGNATURE_CHECKED .saturating_mul(signatures_checked) @@ -425,6 +478,23 @@ pub(super) enum Action { Discard(i32), } +/// State of catch up request handling. +#[derive(Debug)] +enum PendingCatchUp { + /// No pending catch up requests. + None, + /// Pending catch up request which has not been answered yet. + Requesting { + who: PeerId, + request: CatchUpRequestMessage, + instant: Instant, + }, + /// Pending catch up request that was answered and is being processed. + Processing { + instant: Instant, + }, +} + struct Inner { local_view: Option>>, peers: Peers>, @@ -432,6 +502,7 @@ struct Inner { authorities: Vec, config: crate::Config, next_rebroadcast: Instant, + pending_catch_up: PendingCatchUp, } type MaybeMessage = Option<(Vec, NeighborPacket>)>; @@ -444,6 +515,7 @@ impl Inner { live_topics: KeepTopics::new(), next_rebroadcast: Instant::now() + REBROADCAST_AFTER, authorities: Vec::new(), + pending_catch_up: PendingCatchUp::None, config, } } @@ -593,18 +665,201 @@ impl Inner { Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_COMMIT) } + fn validate_catch_up_message(&mut self, who: &PeerId, full: &FullCatchUpMessage) + -> Action + { + match &self.pending_catch_up { + PendingCatchUp::Requesting { who: peer, request, instant } => { + if peer != who { + return Action::Discard(Misbehavior::OutOfScopeMessage.cost()); + } + + if request.set_id != full.set_id { + return Action::Discard(cost::MALFORMED_CATCH_UP); + } + + if request.round.0 > full.message.round_number { + return Action::Discard(cost::MALFORMED_CATCH_UP); + } + + if full.message.prevotes.is_empty() || full.message.precommits.is_empty() { + return Action::Discard(cost::MALFORMED_CATCH_UP); + } + + // move request to pending processing state, we won't push out + // any catch up requests until we import this one (either with a + // success or failure). + self.pending_catch_up = PendingCatchUp::Processing { + instant: instant.clone(), + }; + + // always discard catch up messages, they're point-to-point + let topic = super::global_topic::(full.set_id.0); + Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_CATCH_UP) + }, + _ => Action::Discard(Misbehavior::OutOfScopeMessage.cost()), + } + } + + fn note_catch_up_message_processed(&mut self) { + match &self.pending_catch_up { + PendingCatchUp::Processing { .. } => { + self.pending_catch_up = PendingCatchUp::None; + }, + state => trace!(target: "afg", + "Noted processed catch up message when state was: {:?}", + state, + ), + } + } + + fn handle_catch_up_request( + &mut self, + who: &PeerId, + request: CatchUpRequestMessage, + set_state: &environment::SharedVoterSetState, + ) -> (Option>, Action) { + let local_view = match self.local_view { + None => return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())), + Some(ref view) => view, + }; + + if request.set_id != local_view.set_id { + // NOTE: When we're close to a set change there is potentially a + // race where the peer sent us the request before it observed that + // we had transitioned to a new set. In this case we charge a lower + // cost. + if local_view.round.0.saturating_sub(CATCH_UP_THRESHOLD) == 0 { + return (None, Action::Discard(cost::HONEST_OUT_OF_SCOPE_CATCH_UP)); + } + + return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())); + } + + match self.peers.peer(who) { + None => + return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())), + Some(peer) if peer.view.round >= request.round => + return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())), + _ => {}, + } + + let last_completed_round = set_state.read().last_completed_round(); + if last_completed_round.number < request.round.0 { + return (None, Action::Discard(Misbehavior::OutOfScopeMessage.cost())); + } + + trace!(target: "afg", "Replying to catch-up request for round {} from {} with round {}", + request.round.0, + who, + last_completed_round.number, + ); + + let mut prevotes = Vec::new(); + let mut precommits = Vec::new(); + + // NOTE: the set of votes stored in `LastCompletedRound` is a minimal + // set of votes, i.e. at most one equivocation is stored per voter. The + // code below assumes this invariant is maintained when creating the + // catch up reply since peers won't accept catch-up messages that have + // too many equivocations (we exceed the fault-tolerance bound). + for vote in last_completed_round.votes { + match vote.message { + grandpa::Message::Prevote(prevote) => { + prevotes.push(grandpa::SignedPrevote { + prevote, + signature: vote.signature, + id: vote.id, + }); + }, + grandpa::Message::Precommit(precommit) => { + precommits.push(grandpa::SignedPrecommit { + precommit, + signature: vote.signature, + id: vote.id, + }); + }, + _ => {}, + } + } + + let (base_hash, base_number) = last_completed_round.base; + + let catch_up = CatchUp:: { + round_number: last_completed_round.number, + prevotes, + precommits, + base_hash, + base_number, + }; + + let full_catch_up = GossipMessage::CatchUp::(FullCatchUpMessage { + set_id: request.set_id, + message: catch_up, + }); + + (Some(full_catch_up), Action::Discard(cost::CATCH_UP_REPLY)) + } + + fn try_catch_up(&mut self, who: &PeerId) -> (Option>, Option) { + let mut catch_up = None; + let mut report = None; + + // if the peer is on the same set and ahead of us by a margin bigger + // than `CATCH_UP_THRESHOLD` then we should ask it for a catch up + // message. + if let (Some(peer), Some(local_view)) = (self.peers.peer(who), &self.local_view) { + if peer.view.set_id == local_view.set_id && + peer.view.round.0.saturating_sub(CATCH_UP_THRESHOLD) > local_view.round.0 + { + // send catch up request if allowed + let round = peer.view.round.0 - 1; // peer.view.round is > 0 + let request = CatchUpRequestMessage { + set_id: peer.view.set_id, + round: Round(round), + }; + + let (catch_up_allowed, catch_up_report) = self.note_catch_up_request(who, &request); + + if catch_up_allowed { + trace!(target: "afg", "Sending catch-up request for round {} to {}", + round, + who, + ); + + catch_up = Some(GossipMessage::::CatchUpRequest(request)); + } + + report = catch_up_report; + } + } + + (catch_up, report) + } + fn import_neighbor_message(&mut self, who: &PeerId, update: NeighborPacket>) - -> (Vec, Action) + -> (Vec, Action, Option>, Option) { - let (cb, topics) = match self.peers.update_peer_state(who, update) { - Ok(view) => (100i32, view.map(|view| neighbor_topics::(view))), - Err(misbehavior) => (misbehavior.cost(), None) + let update_res = self.peers.update_peer_state(who, update); + + let (cost_benefit, topics) = match update_res { + Ok(view) => + (benefit::NEIGHBOR_MESSAGE, view.map(|view| neighbor_topics::(view))), + Err(misbehavior) => + (misbehavior.cost(), None), + }; + + let (catch_up, report) = match update_res { + Ok(_) => self.try_catch_up(who), + _ => (None, None), }; let neighbor_topics = topics.unwrap_or_default(); - // always discard, it's valid for one hop. - (neighbor_topics, Action::Discard(cb)) + // always discard neighbor messages, it's only valid for one hop. + let action = Action::Discard(cost_benefit); + + (neighbor_topics, action, catch_up, report) } fn multicast_neighbor_packet(&self) -> MaybeMessage { @@ -619,20 +874,55 @@ impl Inner { (peers, packet) }) } + + fn note_catch_up_request( + &mut self, + who: &PeerId, + catch_up_request: &CatchUpRequestMessage, + ) -> (bool, Option) { + let report = match &self.pending_catch_up { + PendingCatchUp::Requesting { who: peer, instant, .. } => + if instant.elapsed() <= CATCH_UP_REQUEST_TIMEOUT { + return (false, None); + } else { + // report peer for timeout + Some((peer.clone(), cost::CATCH_UP_REQUEST_TIMEOUT)) + }, + PendingCatchUp::Processing { instant, .. } => + if instant.elapsed() < CATCH_UP_PROCESS_TIMEOUT { + return (false, None); + } else { + None + }, + _ => None, + }; + + self.pending_catch_up = PendingCatchUp::Requesting { + who: who.clone(), + request: catch_up_request.clone(), + instant: Instant::now(), + }; + + (true, report) + } } /// A validator for GRANDPA gossip messages. pub(super) struct GossipValidator { inner: parking_lot::RwLock>, + set_state: environment::SharedVoterSetState, report_sender: mpsc::UnboundedSender, } impl GossipValidator { /// Create a new gossip-validator. This initialized the current set to 0. - pub(super) fn new(config: crate::Config) -> (GossipValidator, ReportStream) { + pub(super) fn new(config: crate::Config, set_state: environment::SharedVoterSetState) + -> (GossipValidator, ReportStream) + { let (tx, rx) = mpsc::unbounded(); let val = GossipValidator { inner: parking_lot::RwLock::new(Inner::new(config)), + set_state, report_sender: tx, }; @@ -670,26 +960,50 @@ impl GossipValidator { } } + /// Note that we've processed a catch up message. + pub(super) fn note_catch_up_message_processed(&self) { + self.inner.write().note_catch_up_message_processed(); + } + fn report(&self, who: PeerId, cost_benefit: i32) { let _ = self.report_sender.unbounded_send(PeerReport { who, cost_benefit }); } pub(super) fn do_validate(&self, who: &PeerId, mut data: &[u8]) - -> (Action, Vec) + -> (Action, Vec, Option>) { let mut broadcast_topics = Vec::new(); + let mut peer_reply = None; + let action = { match GossipMessage::::decode(&mut data) { Some(GossipMessage::VoteOrPrecommit(ref message)) => self.inner.write().validate_round_message(who, message), Some(GossipMessage::Commit(ref message)) => self.inner.write().validate_commit_message(who, message), Some(GossipMessage::Neighbor(update)) => { - let (topics, action) = self.inner.write().import_neighbor_message( + let (topics, action, catch_up, report) = self.inner.write().import_neighbor_message( who, update.into_neighbor_packet(), ); + if let Some((peer, cost_benefit)) = report { + self.report(peer, cost_benefit); + } + broadcast_topics = topics; + peer_reply = catch_up; + action + } + Some(GossipMessage::CatchUp(ref message)) + => self.inner.write().validate_catch_up_message(who, message), + Some(GossipMessage::CatchUpRequest(request)) => { + let (reply, action) = self.inner.write().handle_catch_up_request( + who, + request, + &self.set_state, + ); + + peer_reply = reply; action } None => { @@ -702,7 +1016,7 @@ impl GossipValidator { } }; - (action, broadcast_topics) + (action, broadcast_topics, peer_reply) } } @@ -734,9 +1048,13 @@ impl network_gossip::Validator for GossipValidator fn validate(&self, context: &mut dyn ValidatorContext, who: &PeerId, data: &[u8]) -> network_gossip::ValidationResult { - let (action, broadcast_topics) = self.do_validate(who, data); + let (action, broadcast_topics, peer_reply) = self.do_validate(who, data); // not with lock held! + if let Some(msg) = peer_reply { + context.send_message(who, msg.encode()); + } + for topic in broadcast_topics { context.send_topic(who, topic, false); } @@ -817,6 +1135,8 @@ impl network_gossip::Validator for GossipValidator && Some(full.message.target_number) > peer_best_commit } Some(GossipMessage::Neighbor(_)) => false, + Some(GossipMessage::CatchUpRequest(_)) => false, + Some(GossipMessage::CatchUp(_)) => false, Some(GossipMessage::VoteOrPrecommit(_)) => false, // should not be the case. } }) @@ -910,6 +1230,7 @@ impl> Future for ReportingTask { #[cfg(test)] mod tests { use super::*; + use super::environment::SharedVoterSetState; use network_gossip::Validator as GossipValidatorT; use network::test::Block; @@ -923,6 +1244,33 @@ mod tests { } } + // dummy voter set state + fn voter_set_state() -> SharedVoterSetState { + use crate::authorities::AuthoritySet; + use crate::environment::{CompletedRound, CompletedRounds, HasVoted, VoterSetState}; + use grandpa::round::State as RoundState; + use substrate_primitives::H256; + + let state = RoundState::genesis((H256::zero(), 0)); + let base = state.prevote_ghost.unwrap(); + let voters = AuthoritySet::genesis(Vec::new()); + let set_state = VoterSetState::Live { + completed_rounds: CompletedRounds::new( + CompletedRound { + state, + number: 0, + votes: Vec::new(), + base, + }, + 0, + &voters, + ), + current_round: HasVoted::No, + }; + + set_state.into() + } + #[test] fn view_vote_rules() { let view = View { round: Round(100), set_id: SetId(1), last_commit: Some(1000u64) }; @@ -1064,7 +1412,10 @@ mod tests { #[test] fn messages_not_expired_immediately() { - let (val, _) = GossipValidator::::new(config()); + let (val, _) = GossipValidator::::new( + config(), + voter_set_state(), + ); let set_id = 1; @@ -1096,7 +1447,10 @@ mod tests { fn message_from_unknown_authority_discarded() { assert!(cost::UNKNOWN_VOTER != cost::BAD_SIGNATURE); - let (val, _) = GossipValidator::::new(config()); + let (val, _) = GossipValidator::::new( + config(), + voter_set_state(), + ); let set_id = 1; let auth = AuthorityId::from_raw([1u8; 32]); let peer = PeerId::random(); @@ -1134,4 +1488,122 @@ mod tests { assert_eq!(unknown_voter, Action::Discard(cost::UNKNOWN_VOTER)); assert_eq!(bad_sig, Action::Discard(cost::BAD_SIGNATURE)); } + + #[test] + fn unsolicited_catch_up_messages_discarded() { + let (val, _) = GossipValidator::::new( + config(), + voter_set_state(), + ); + + let set_id = 1; + let auth = AuthorityId::from_raw([1u8; 32]); + let peer = PeerId::random(); + + val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {}); + val.note_round(Round(0), |_, _| {}); + + let validate_catch_up = || { + let mut inner = val.inner.write(); + inner.validate_catch_up_message(&peer, &FullCatchUpMessage { + set_id: SetId(set_id), + message: grandpa::CatchUp { + round_number: 10, + prevotes: Default::default(), + precommits: Default::default(), + base_hash: Default::default(), + base_number: Default::default(), + } + }) + }; + + // the catch up is discarded because we have no pending request + assert_eq!(validate_catch_up(), Action::Discard(cost::OUT_OF_SCOPE_MESSAGE)); + + let noted = val.inner.write().note_catch_up_request( + &peer, + &CatchUpRequestMessage { + set_id: SetId(set_id), + round: Round(10), + } + ); + + assert!(noted.0); + + // catch up is allowed because we have requested it, but it's rejected + // because it's malformed (empty prevotes and precommits) + assert_eq!(validate_catch_up(), Action::Discard(cost::MALFORMED_CATCH_UP)); + } + + #[test] + fn unanswerable_catch_up_requests_discarded() { + // create voter set state with round 1 completed + let set_state: SharedVoterSetState = { + let mut completed_rounds = voter_set_state().read().completed_rounds(); + + assert!(completed_rounds.push(environment::CompletedRound { + number: 1, + state: grandpa::round::State::genesis(Default::default()), + base: Default::default(), + votes: Default::default(), + })); + + let set_state = environment::VoterSetState::::Live { + completed_rounds, + current_round: environment::HasVoted::No, + }; + + set_state.into() + }; + + let (val, _) = GossipValidator::::new( + config(), + set_state.clone(), + ); + + let set_id = 1; + let auth = AuthorityId::from_raw([1u8; 32]); + let peer = PeerId::random(); + + val.note_set(SetId(set_id), vec![auth.clone()], |_, _| {}); + val.note_round(Round(2), |_, _| {}); + + // add the peer making the request to the validator, + // otherwise it is discarded + let mut inner = val.inner.write(); + inner.peers.new_peer(peer.clone()); + + let res = inner.handle_catch_up_request( + &peer, + CatchUpRequestMessage { + set_id: SetId(set_id), + round: Round(10), + }, + &set_state, + ); + + // we're at round 2, a catch up request for round 10 is out of scope + assert!(res.0.is_none()); + assert_eq!(res.1, Action::Discard(cost::OUT_OF_SCOPE_MESSAGE)); + + let res = inner.handle_catch_up_request( + &peer, + CatchUpRequestMessage { + set_id: SetId(set_id), + round: Round(1), + }, + &set_state, + ); + + // a catch up request for round 1 should be answered successfully + match res.0.unwrap() { + GossipMessage::CatchUp(catch_up) => { + assert_eq!(catch_up.set_id, SetId(set_id)); + assert_eq!(catch_up.message.round_number, 1); + + assert_eq!(res.1, Action::Discard(cost::CATCH_UP_REPLY)); + }, + _ => panic!("expected catch up message"), + }; + } } diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index adfdf0629102..3d60af700a6e 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -29,7 +29,7 @@ use std::sync::Arc; -use grandpa::voter_set::VoterSet; +use grandpa::{voter, voter_set::VoterSet}; use grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use futures::prelude::*; use futures::sync::{oneshot, mpsc}; @@ -42,10 +42,13 @@ use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as Heade use network::{consensus_gossip as network_gossip, NetworkService}; use network_gossip::ConsensusMessage; -use crate::{Error, Message, SignedMessage, Commit, CompactCommit}; +use crate::{ + CatchUp, Commit, CommunicationIn, CommunicationOut, CompactCommit, Error, + Message, SignedMessage, +}; use crate::environment::HasVoted; use gossip::{ - GossipMessage, FullCommitMessage, VoteOrPrecommitMessage, GossipValidator + GossipMessage, FullCatchUpMessage, FullCommitMessage, VoteOrPrecommitMessage, GossipValidator }; use substrate_primitives::ed25519::{Public as AuthorityId, Signature as AuthoritySignature}; @@ -61,6 +64,7 @@ pub use fg_primitives::GRANDPA_ENGINE_ID; mod cost { pub(super) const PAST_REJECTION: i32 = -50; pub(super) const BAD_SIGNATURE: i32 = -100; + pub(super) const MALFORMED_CATCH_UP: i32 = -1000; pub(super) const MALFORMED_COMMIT: i32 = -1000; pub(super) const FUTURE_MESSAGE: i32 = -500; pub(super) const UNKNOWN_VOTER: i32 = -150; @@ -69,13 +73,21 @@ mod cost { pub(super) const PER_UNDECODABLE_BYTE: i32 = -5; pub(super) const PER_SIGNATURE_CHECKED: i32 = -25; pub(super) const PER_BLOCK_LOADED: i32 = -10; + pub(super) const INVALID_CATCH_UP: i32 = -5000; pub(super) const INVALID_COMMIT: i32 = -5000; pub(super) const OUT_OF_SCOPE_MESSAGE: i32 = -500; + pub(super) const CATCH_UP_REQUEST_TIMEOUT: i32 = -200; + + // cost of answering a catch up request + pub(super) const CATCH_UP_REPLY: i32 = -200; + pub(super) const HONEST_OUT_OF_SCOPE_CATCH_UP: i32 = -200; } // benefit scalars for reporting peers. mod benefit { + pub(super) const NEIGHBOR_MESSAGE: i32 = 100; pub(super) const ROUND_MESSAGE: i32 = 100; + pub(super) const BASIC_VALIDATED_CATCH_UP: i32 = 200; pub(super) const BASIC_VALIDATED_COMMIT: i32 = 100; pub(super) const PER_EQUIVOCATION: i32 = 10; } @@ -214,12 +226,6 @@ impl Stream for NetworkStream { } } -/// The result of processing a commit. -pub(crate) enum CommitProcessingOutcome { - Good, - Bad, -} - /// Bridge between the underlying network service, gossiping consensus messages and Grandpa pub(crate) struct NetworkBridge> { service: N, @@ -235,21 +241,21 @@ impl> NetworkBridge { pub(crate) fn new( service: N, config: crate::Config, - set_state: Option<&crate::environment::VoterSetState>, + set_state: crate::environment::SharedVoterSetState, on_exit: impl Future + Clone + Send + 'static, ) -> ( Self, impl futures::Future + Send + 'static, ) { - let (validator, report_stream) = GossipValidator::new(config); + let (validator, report_stream) = GossipValidator::new(config, set_state.clone()); let validator = Arc::new(validator); service.register_validator(validator.clone()); - if let Some(set_state) = set_state { + { // register all previous votes with the gossip service so that they're // available to peers potentially stuck on a previous round. - let completed = set_state.completed_rounds(); + let completed = set_state.read().completed_rounds(); let (set_id, voters) = completed.set_info(); validator.note_set(SetId(set_id), voters.to_vec(), |_, _| {}); for round in completed.iter() { @@ -422,8 +428,8 @@ impl> NetworkBridge { voters: Arc>, is_voter: bool, ) -> ( - impl Stream, impl FnMut(CommitProcessingOutcome)), Error = Error>, - impl Sink), SinkError = Error>, + impl Stream, Error = Error>, + impl Sink, SinkError = Error>, ) { self.validator.note_set( set_id, @@ -442,16 +448,123 @@ impl> NetworkBridge { self.validator.clone(), ); + let outgoing = outgoing.with(|out| { + let voter::CommunicationOut::Commit(round, commit) = out; + Ok((round, commit)) + }); + (incoming, outgoing) } } fn incoming_global>( - service: N, + mut service: N, topic: B::Hash, voters: Arc>, gossip_validator: Arc>, -) -> impl Stream, impl FnMut(CommitProcessingOutcome)), Error = Error> { +) -> impl Stream, Error = Error> { + let process_commit = move | + msg: FullCommitMessage, + mut notification: network_gossip::TopicNotification, + service: &mut N, + gossip_validator: &Arc>, + voters: &VoterSet, + | { + let precommits_signed_by: Vec = + msg.message.auth_data.iter().map(move |(_, a)| { + format!("{}", a) + }).collect(); + + telemetry!(CONSENSUS_INFO; "afg.received_commit"; + "contains_precommits_signed_by" => ?precommits_signed_by, + "target_number" => ?msg.message.target_number.clone(), + "target_hash" => ?msg.message.target_hash.clone(), + ); + + if let Err(cost) = check_compact_commit::( + &msg.message, + voters, + msg.round, + msg.set_id, + ) { + if let Some(who) = notification.sender { + service.report(who, cost); + } + + return None; + } + + let round = msg.round.0; + let commit = msg.message; + let finalized_number = commit.target_number; + let gossip_validator = gossip_validator.clone(); + let service = service.clone(); + let cb = move |outcome| match outcome { + voter::CommitProcessingOutcome::Good(_) => { + // if it checks out, gossip it. not accounting for + // any discrepancy between the actual ghost and the claimed + // finalized number. + gossip_validator.note_commit_finalized( + finalized_number, + |to, neighbor_msg| service.send_message( + to, + GossipMessage::::from(neighbor_msg).encode(), + ), + ); + + service.gossip_message(topic, notification.message.clone(), false); + } + voter::CommitProcessingOutcome::Bad(_) => { + // report peer and do not gossip. + if let Some(who) = notification.sender.take() { + service.report(who, cost::INVALID_COMMIT); + } + } + }; + + let cb = voter::Callback::Work(Box::new(cb)); + + Some(voter::CommunicationIn::Commit(round, commit, cb)) + }; + + let process_catch_up = move | + msg: FullCatchUpMessage, + mut notification: network_gossip::TopicNotification, + service: &mut N, + gossip_validator: &Arc>, + voters: &VoterSet, + | { + let gossip_validator = gossip_validator.clone(); + let service = service.clone(); + + if let Err(cost) = check_catch_up::( + &msg.message, + voters, + msg.set_id, + ) { + if let Some(who) = notification.sender { + service.report(who, cost); + } + + return None; + } + + let cb = move |outcome| { + if let voter::CatchUpProcessingOutcome::Bad(_) = outcome { + // report peer + if let Some(who) = notification.sender.take() { + service.report(who, cost::INVALID_CATCH_UP); + } + } + + gossip_validator.note_catch_up_message_processed(); + }; + + let cb = voter::Callback::Work(Box::new(cb)); + + Some(voter::CommunicationIn::CatchUp(msg.message, cb)) + }; + service.messages_for(topic) .filter_map(|notification| { // this could be optimized by decoding piecewise. @@ -463,66 +576,16 @@ fn incoming_global>( }) .filter_map(move |(notification, msg)| { match msg { - GossipMessage::Commit(msg) => { - let precommits_signed_by: Vec = - msg.message.auth_data.iter().map(move |(_, a)| { - format!("{}", a) - }).collect(); - telemetry!(CONSENSUS_INFO; "afg.received_commit"; - "contains_precommits_signed_by" => ?precommits_signed_by, - "target_number" => ?msg.message.target_number.clone(), - "target_hash" => ?msg.message.target_hash.clone(), - ); - if let Err(cost) = check_compact_commit::( - &msg.message, - &*voters, - msg.round, - msg.set_id, - ) { - if let Some(who) = notification.sender { - service.report(who, cost); - } - None - } else { - Some((msg, notification, service.clone())) - } - }, + GossipMessage::Commit(msg) => + process_commit(msg, notification, &mut service, &gossip_validator, &*voters), + GossipMessage::CatchUp(msg) => + process_catch_up(msg, notification, &mut service, &gossip_validator, &*voters), _ => { debug!(target: "afg", "Skipping unknown message type"); return None; } } }) - .map(move |(msg, mut notification, service)| { - let round = msg.round.0; - let commit = msg.message; - let finalized_number = commit.target_number; - let gossip_validator = gossip_validator.clone(); - let cb = move |outcome| match outcome { - CommitProcessingOutcome::Good => { - // if it checks out, gossip it. not accounting for - // any discrepancy between the actual ghost and the claimed - // finalized number. - gossip_validator.note_commit_finalized( - finalized_number, - |to, neighbor_msg| service.send_message( - to, - GossipMessage::::from(neighbor_msg).encode(), - ), - ); - - service.gossip_message(topic, notification.message.clone(), false); - } - CommitProcessingOutcome::Bad => { - // report peer and do not gossip. - if let Some(who) = notification.sender.take() { - service.report(who, cost::INVALID_COMMIT); - } - } - }; - - (round, commit, cb) - }) .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) } @@ -657,7 +720,8 @@ impl> Sink for OutgoingMessages } } -// checks a compact commit. returns `None` if it was bad and +// checks a compact commit. returns the cost associated with processing it if +// the commit was bad. fn check_compact_commit( msg: &CompactCommit, voters: &VoterSet, @@ -716,6 +780,114 @@ fn check_compact_commit( Ok(()) } +// checks a catch up. returns the cost associated with processing it if +// the catch up was bad. +fn check_catch_up( + msg: &CatchUp, + voters: &VoterSet, + set_id: SetId, +) -> Result<(), i32> { + // 4f + 1 = equivocations from f voters. + let f = voters.total_weight() - voters.threshold(); + let full_threshold = voters.total_weight() + f; + + // check total weight is not out of range for a set of votes. + fn check_weight<'a>( + voters: &'a VoterSet, + votes: impl Iterator, + full_threshold: u64, + ) -> Result<(), i32> { + let mut total_weight = 0; + + for id in votes { + if let Some(weight) = voters.info(&id).map(|info| info.weight()) { + total_weight += weight; + if total_weight > full_threshold { + return Err(cost::MALFORMED_CATCH_UP); + } + } else { + debug!(target: "afg", "Skipping catch up message containing unknown voter {}", id); + return Err(cost::MALFORMED_CATCH_UP); + } + } + + if total_weight < voters.threshold() { + return Err(cost::MALFORMED_CATCH_UP); + } + + Ok(()) + }; + + check_weight( + voters, + msg.prevotes.iter().map(|vote| &vote.id), + full_threshold, + )?; + + check_weight( + voters, + msg.precommits.iter().map(|vote| &vote.id), + full_threshold, + )?; + + fn check_signatures<'a, B, I>( + messages: I, + round: u64, + set_id: u64, + mut signatures_checked: usize, + ) -> Result where + B: BlockT, + I: Iterator, &'a AuthorityId, &'a AuthoritySignature)>, + { + use crate::communication::gossip::Misbehavior; + + for (msg, id, sig) in messages { + signatures_checked += 1; + + if let Err(()) = check_message_sig::( + &msg, + id, + sig, + round, + set_id, + ) { + debug!(target: "afg", "Bad catch up message signature {}", id); + telemetry!(CONSENSUS_DEBUG; "afg.bad_catch_up_msg_signature"; "id" => ?id); + + let cost = Misbehavior::BadCatchUpMessage { + signatures_checked: signatures_checked as i32, + }.cost(); + + return Err(cost); + } + } + + Ok(signatures_checked) + } + + // check signatures on all contained prevotes. + let signatures_checked = check_signatures::( + msg.prevotes.iter().map(|vote| { + (grandpa::Message::Prevote(vote.prevote.clone()), &vote.id, &vote.signature) + }), + msg.round_number, + set_id.0, + 0, + )?; + + // check signatures on all contained precommits. + let _ = check_signatures::( + msg.precommits.iter().map(|vote| { + (grandpa::Message::Precommit(vote.precommit.clone()), &vote.id, &vote.signature) + }), + msg.round_number, + set_id.0, + signatures_checked, + )?; + + Ok(()) +} + /// An output sink for commit messages. struct CommitsOut> { network: N, diff --git a/core/finality-grandpa/src/communication/tests.rs b/core/finality-grandpa/src/communication/tests.rs index f2b50ab80c2b..5760b3936cd9 100644 --- a/core/finality-grandpa/src/communication/tests.rs +++ b/core/finality-grandpa/src/communication/tests.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use keyring::AuthorityKeyring; use parity_codec::Encode; +use crate::environment::SharedVoterSetState; use super::gossip::{self, GossipValidator}; use super::{AuthorityId, VoterSet, Round, SetId}; @@ -92,6 +93,18 @@ impl super::Network for TestNetwork { } } +impl network_gossip::ValidatorContext for TestNetwork { + fn broadcast_topic(&mut self, _: Hash, _: bool) { } + + fn broadcast_message(&mut self, _: Hash, _: Vec, _: bool) { } + + fn send_message(&mut self, who: &network::PeerId, data: Vec) { + >::send_message(self, vec![who.clone()], data); + } + + fn send_topic(&mut self, _: &network::PeerId, _: Hash, _: bool) { } +} + struct Tester { net_handle: super::NetworkBridge, gossip_validator: Arc>, @@ -125,8 +138,38 @@ fn config() -> crate::Config { } } +// dummy voter set state +fn voter_set_state() -> SharedVoterSetState { + use crate::authorities::AuthoritySet; + use crate::environment::{CompletedRound, CompletedRounds, HasVoted, VoterSetState}; + use grandpa::round::State as RoundState; + use substrate_primitives::H256; + + let state = RoundState::genesis((H256::zero(), 0)); + let base = state.prevote_ghost.unwrap(); + let voters = AuthoritySet::genesis(Vec::new()); + let set_state = VoterSetState::Live { + completed_rounds: CompletedRounds::new( + CompletedRound { + state, + number: 0, + votes: Vec::new(), + base, + }, + 0, + &voters, + ), + current_round: HasVoted::No, + }; + + set_state.into() +} + // needs to run in a tokio runtime. -fn make_test_network() -> impl Future { +fn make_test_network() -> ( + impl Future, + TestNetwork, +) { let (tx, rx) = mpsc::unbounded(); let net = TestNetwork { sender: tx }; @@ -145,15 +188,18 @@ fn make_test_network() -> impl Future { let (bridge, startup_work) = super::NetworkBridge::new( net.clone(), config(), - None, + voter_set_state(), Exit, ); - startup_work.map(move |()| Tester { - gossip_validator: bridge.validator.clone(), - net_handle: bridge, - events: rx, - }) + ( + startup_work.map(move |()| Tester { + gossip_validator: bridge.validator.clone(), + net_handle: bridge, + events: rx, + }), + net, + ) } fn make_ids(keys: &[AuthorityKeyring]) -> Vec<(AuthorityId, u64)> { @@ -217,7 +263,7 @@ fn good_commit_leads_to_relay() { let id = network::PeerId::random(); let global_topic = super::global_topic::(set_id); - let test = make_test_network() + let test = make_test_network().0 .and_then(move |tester| { // register a peer. tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL); @@ -228,7 +274,7 @@ fn good_commit_leads_to_relay() { let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false); { - let (action, _) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]); + let (action, ..) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]); match action { gossip::Action::ProcessAndDiscard(t, _) => assert_eq!(t, global_topic), _ => panic!("wrong expected outcome from initial commit validation"), @@ -257,8 +303,12 @@ fn good_commit_leads_to_relay() { // when the commit comes in, we'll tell the callback it was good. let handle_commit = commits_in.into_future() .map(|(item, _)| { - let (_, _, mut callback) = item.unwrap(); - (callback)(super::CommitProcessingOutcome::Good); + match item.unwrap() { + grandpa::voter::CommunicationIn::Commit(_, _, mut callback) => { + callback.run(grandpa::voter::CommitProcessingOutcome::good()); + }, + _ => panic!("commit expected"), + } }) .map_err(|_| panic!("could not process commit")); @@ -328,7 +378,7 @@ fn bad_commit_leads_to_report() { let id = network::PeerId::random(); let global_topic = super::global_topic::(set_id); - let test = make_test_network() + let test = make_test_network().0 .and_then(move |tester| { // register a peer. tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL); @@ -339,7 +389,7 @@ fn bad_commit_leads_to_report() { let (commits_in, _) = tester.net_handle.global_communication(SetId(1), voter_set, false); { - let (action, _) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]); + let (action, ..) = tester.gossip_validator.do_validate(&id, &encoded_commit[..]); match action { gossip::Action::ProcessAndDiscard(t, _) => assert_eq!(t, global_topic), _ => panic!("wrong expected outcome from initial commit validation"), @@ -368,8 +418,12 @@ fn bad_commit_leads_to_report() { // when the commit comes in, we'll tell the callback it was good. let handle_commit = commits_in.into_future() .map(|(item, _)| { - let (_, _, mut callback) = item.unwrap(); - (callback)(super::CommitProcessingOutcome::Bad); + match item.unwrap() { + grandpa::voter::CommunicationIn::Commit(_, _, mut callback) => { + callback.run(grandpa::voter::CommitProcessingOutcome::bad()); + }, + _ => panic!("commit expected"), + } }) .map_err(|_| panic!("could not process commit")); @@ -393,3 +447,61 @@ fn bad_commit_leads_to_report() { current_thread::block_on_all(test).unwrap(); } + +#[test] +fn peer_with_higher_view_leads_to_catch_up_request() { + let id = network::PeerId::random(); + + let (tester, mut net) = make_test_network(); + let test = tester + .and_then(move |tester| { + // register a peer. + tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL); + Ok((tester, id)) + }) + .and_then(move |(tester, id)| { + // send neighbor message at round 10 and height 50 + let result = tester.gossip_validator.validate( + &mut net, + &id, + &gossip::GossipMessage::::from(gossip::NeighborPacket { + set_id: SetId(0), + round: Round(10), + commit_finalized_height: 50, + }).encode(), + ); + + // neighbor packets are always discard + match result { + network_gossip::ValidationResult::Discard => {}, + _ => panic!("wrong expected outcome from neighbor validation"), + } + + // a catch up request should be sent to the peer for round - 1 + tester.filter_network_events(move |event| match event { + Event::SendMessage(peers, message) => { + assert_eq!( + peers, + vec![id.clone()], + ); + + assert_eq!( + message, + gossip::GossipMessage::::CatchUpRequest( + gossip::CatchUpRequestMessage { + set_id: SetId(0), + round: Round(9), + } + ).encode(), + ); + + true + }, + _ => false, + }) + .map_err(|_| panic!("could not watch for peer send message")) + .map(|_| ()) + }); + + current_thread::block_on_all(test).unwrap(); +} diff --git a/core/finality-grandpa/src/environment.rs b/core/finality-grandpa/src/environment.rs index 9d5078116df2..414e3ca0ab43 100644 --- a/core/finality-grandpa/src/environment.rs +++ b/core/finality-grandpa/src/environment.rs @@ -30,7 +30,7 @@ use client::{ }; use grandpa::{ BlockNumberOps, Equivocation, Error as GrandpaError, round::State as RoundState, - voter, voter_set::VoterSet, HistoricalVotes, + voter, voter_set::VoterSet, }; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{ @@ -50,9 +50,17 @@ use crate::authorities::{AuthoritySet, SharedAuthoritySet}; use crate::consensus_changes::SharedConsensusChanges; use crate::justification::GrandpaJustification; use crate::until_imported::UntilVoteTargetImported; -use fg_primitives::AuthorityId; +use fg_primitives::{AuthorityId, AuthoritySignature}; -/// Data about a completed round. +type HistoricalVotes = grandpa::HistoricalVotes< + ::Hash, + NumberFor, + AuthoritySignature, + AuthorityId, +>; + +/// Data about a completed round. The set of votes that is stored must be +/// minimal, i.e. at most one equivocation is stored per voter. #[derive(Debug, Clone, Decode, Encode, PartialEq)] pub struct CompletedRound { /// The round number. @@ -177,6 +185,16 @@ impl VoterSetState { completed_rounds.clone(), } } + + /// Returns the last completed round. + pub(crate) fn last_completed_round(&self) -> CompletedRound { + match self { + VoterSetState::Live { completed_rounds, .. } => + completed_rounds.last().clone(), + VoterSetState::Paused { completed_rounds } => + completed_rounds.last().clone(), + } + } } /// Whether we've voted already during a prior run of the program. @@ -636,7 +654,7 @@ where round: u64, state: RoundState>, base: (Block::Hash, NumberFor), - votes: &HistoricalVotes, Self::Signature, Self::Id>, + historical_votes: &HistoricalVotes, ) -> Result<(), Self::Error> { debug!( target: "afg", "Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}", @@ -650,12 +668,15 @@ where self.update_voter_set_state(|voter_set_state| { let mut completed_rounds = voter_set_state.completed_rounds(); + // TODO: Future integration will store the prevote and precommit index. See #2611. + let votes = historical_votes.seen().clone(); + // NOTE: the Environment assumes that rounds are *always* completed in-order. if !completed_rounds.push(CompletedRound { number: round, state: state.clone(), base, - votes: votes.seen().to_owned(), + votes, }) { let msg = "Voter completed round that is older than the last completed round."; return Err(Error::Safety(msg.to_string())); diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index 7d82c0f2e448..65c5f6330887 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -102,7 +102,7 @@ pub use observer::run_grandpa_observer; use aux_schema::PersistentData; use environment::{CompletedRound, CompletedRounds, Environment, HasVoted, SharedVoterSetState, VoterSetState}; use import::GrandpaBlockImport; -use until_imported::UntilCommitBlocksImported; +use until_imported::UntilGlobalMessageBlocksImported; use communication::NetworkBridge; use service::TelemetryOnConnect; use fg_primitives::AuthoritySignature; @@ -129,19 +129,64 @@ pub type PrimaryPropose = grandpa::PrimaryPropose<::Hash pub type Prevote = grandpa::Prevote<::Hash, NumberFor>; /// A precommit message for this chain's block type. pub type Precommit = grandpa::Precommit<::Hash, NumberFor>; +/// A catch up message for this chain's block type. +pub type CatchUp = grandpa::CatchUp< + ::Hash, + NumberFor, + AuthoritySignature, + AuthorityId, +>; /// A commit message for this chain's block type. pub type Commit = grandpa::Commit< ::Hash, NumberFor, AuthoritySignature, - AuthorityId + AuthorityId, >; /// A compact commit message for this chain's block type. pub type CompactCommit = grandpa::CompactCommit< ::Hash, NumberFor, AuthoritySignature, - AuthorityId + AuthorityId, +>; +/// A global communication input stream for commits and catch up messages. Not +/// exposed publicly, used internally to simplify types in the communication +/// layer. +type CommunicationIn = grandpa::voter::CommunicationIn< + ::Hash, + NumberFor, + AuthoritySignature, + AuthorityId, +>; + +/// Global communication input stream for commits and catch up messages, with +/// the hash type not being derived from the block, useful for forcing the hash +/// to some type (e.g. `H256`) when the compiler can't do the inference. +type CommunicationInH = grandpa::voter::CommunicationIn< + H, + NumberFor, + AuthoritySignature, + AuthorityId, +>; + +/// A global communication sink for commits. Not exposed publicly, used +/// internally to simplify types in the communication layer. +type CommunicationOut = grandpa::voter::CommunicationOut< + ::Hash, + NumberFor, + AuthoritySignature, + AuthorityId, +>; + +/// Global communication sink for commits with the hash type not being derived +/// from the block, useful for forcing the hash to some type (e.g. `H256`) when +/// the compiler can't do the inference. +type CommunicationOutH = grandpa::voter::CommunicationOut< + H, + NumberFor, + AuthoritySignature, + AuthorityId, >; /// Configuration for the GRANDPA service. @@ -358,11 +403,11 @@ fn global_communication, B, E, N, RA>( network: &NetworkBridge, ) -> ( impl Stream< - Item = voter::CommunicationIn, AuthoritySignature, AuthorityId>, + Item = CommunicationInH, Error = CommandOrError>, >, impl Sink< - SinkItem = voter::CommunicationOut, AuthoritySignature, AuthorityId>, + SinkItem = CommunicationOutH, SinkError = CommandOrError>, >, ) where @@ -378,37 +423,21 @@ fn global_communication, B, E, N, RA>( .unwrap_or(false); // verification stream - let (commit_in, commit_out) = network.global_communication( + let (global_in, global_out) = network.global_communication( communication::SetId(set_id), voters.clone(), is_voter, ); - // block commit messages until relevant blocks are imported. - let commit_in = UntilCommitBlocksImported::new( + // block commit and catch up messages until relevant blocks are imported. + let global_in = UntilGlobalMessageBlocksImported::new( client.import_notification_stream(), client.clone(), - commit_in, + global_in, ); - let commits_in = commit_in.map_err(CommandOrError::from); - let commits_out = commit_out.sink_map_err(CommandOrError::from); - - let global_in = commits_in.map(|(round, commit, mut callback)| { - let callback = voter::Callback::Work(Box::new(move |outcome| match outcome { - voter::CommitProcessingOutcome::Good(_) => - callback(communication::CommitProcessingOutcome::Good), - voter::CommitProcessingOutcome::Bad(_) => - callback(communication::CommitProcessingOutcome::Bad), - })); - voter::CommunicationIn::Commit(round, commit, callback) - }); - - // NOTE: eventually this will also handle catch-up requests - let global_out = commits_out.with(|global| match global { - voter::CommunicationOut::Commit(round, commit) => Ok((round, commit)), - _ => unimplemented!(), - }); + let global_in = global_in.map_err(CommandOrError::from); + let global_out = global_out.sink_map_err(CommandOrError::from); (global_in, global_out) } @@ -497,7 +526,7 @@ pub fn run_grandpa_voter, N, RA, SC, X>( let (network, network_startup) = NetworkBridge::new( network, config.clone(), - Some(&set_state.read()), + set_state.clone(), on_exit.clone(), ); diff --git a/core/finality-grandpa/src/observer.rs b/core/finality-grandpa/src/observer.rs index 4738fd3ed633..cd55211cfed8 100644 --- a/core/finality-grandpa/src/observer.rs +++ b/core/finality-grandpa/src/observer.rs @@ -30,7 +30,7 @@ use runtime_primitives::traits::{NumberFor, Block as BlockT}; use substrate_primitives::{H256, Blake2Hasher}; use crate::{ - AuthoritySignature, global_communication, CommandOrError, Config, environment, + global_communication, CommandOrError, CommunicationIn, Config, environment, LinkHalf, Network, aux_schema::PersistentData, VoterCommand, VoterSetState, }; use crate::authorities::SharedAuthoritySet; @@ -70,7 +70,7 @@ fn grandpa_observer, RA, S>( E: CallExecutor + Send + Sync, RA: Send + Sync, S: Stream< - Item = voter::CommunicationIn, AuthoritySignature, AuthorityId>, + Item = CommunicationIn, Error = CommandOrError>, >, { @@ -85,8 +85,8 @@ fn grandpa_observer, RA, S>( let commit = grandpa::Commit::from(commit); (round, commit, callback) }, - voter::CommunicationIn::Auxiliary(_) => { - // ignore aux messages + voter::CommunicationIn::CatchUp(..) => { + // ignore catch up messages return future::ok(last_finalized_number); }, }; @@ -167,15 +167,9 @@ pub fn run_grandpa_observer, N, RA, SC>( } = link; let PersistentData { authority_set, consensus_changes, set_state } = persistent_data; + let initial_state = (authority_set, consensus_changes, set_state.clone(), voter_commands_rx.into_future()); - let (network, network_startup) = NetworkBridge::new( - network, - config.clone(), - None, - on_exit.clone(), - ); - - let initial_state = (authority_set, consensus_changes, set_state, voter_commands_rx.into_future()); + let (network, network_startup) = NetworkBridge::new(network, config.clone(), set_state, on_exit.clone()); let observer_work = future::loop_fn(initial_state, move |state| { let (authority_set, consensus_changes, set_state, voter_commands_rx) = state; diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 8afa49533439..010bd7a091f5 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -1181,6 +1181,7 @@ fn voter_persists_its_votes() { // sender is dropped the voter is stopped. { let net = net.clone(); + let client = client.clone(); let voter = future::loop_fn(voter_rx, move |rx| { let (_block_import, _, _, _, link) = net.lock().make_block_import(client.clone()); @@ -1244,11 +1245,19 @@ fn voter_persists_its_votes() { local_key: Some(Arc::new(peers[1].clone().into())), name: Some(format!("peer#{}", 1)), }; + + let set_state = { + let (_, _, _, _, link) = net.lock().make_block_import(client); + let LinkHalf { persistent_data, .. } = link.lock().take().unwrap(); + let PersistentData { set_state, .. } = persistent_data; + set_state + }; + let routing = MessageRouting::new(net.clone(), 1); let (network, routing_work) = communication::NetworkBridge::new( routing, config.clone(), - None, + set_state, Exit, ); runtime.block_on(routing_work).unwrap(); @@ -1480,3 +1489,111 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ if FORCE_CHANGE { 0 } else { 10 }, ); } + +#[test] +fn voter_catches_up_to_latest_round_when_behind() { + let _ = env_logger::try_init(); + + let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob]; + let voters = make_ids(peers); + + let mut net = GrandpaTestNet::new(TestApi::new(voters), 3); + net.peer(0).push_blocks(50, false); + net.sync(); + + let net = Arc::new(Mutex::new(net)); + let mut finality_notifications = Vec::new(); + + let mut runtime = current_thread::Runtime::new().unwrap(); + + let voter = |local_key, peer_id, link, net| -> Box + Send> { + let grandpa_params = GrandpaParams { + config: Config { + gossip_duration: TEST_GOSSIP_DURATION, + justification_period: 32, + local_key, + name: Some(format!("peer#{}", peer_id)), + }, + link: link, + network: MessageRouting::new(net, peer_id), + inherent_data_providers: InherentDataProviders::new(), + on_exit: Exit, + telemetry_on_connect: None, + }; + + Box::new(run_grandpa_voter(grandpa_params).expect("all in order with client and network")) + }; + + // spawn authorities + for (peer_id, key) in peers.iter().enumerate() { + let (client, link) = { + let net = net.lock(); + let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); + ( + net.peers[peer_id].client().clone(), + link, + ) + }; + + finality_notifications.push( + client.finality_notification_stream() + .take_while(|n| Ok(n.header.number() < &50)) + .for_each(move |_| Ok(())) + ); + + let voter = voter(Some(Arc::new((*key).into())), peer_id, link, net.clone()); + + runtime.spawn(voter); + } + + // wait for them to finalize block 50. since they'll vote on 3/4 of the + // unfinalized chain it will take at least 4 rounds to do it. + let wait_for_finality = ::futures::future::join_all(finality_notifications) + .map(|_| ()) + .map_err(|_| ()); + + // spawn a new voter, it should be behind by at least 4 rounds and should be + // able to catch up to the latest round + let test = { + let net = net.clone(); + let runtime = runtime.handle(); + + wait_for_finality.and_then(move |_| { + let peer_id = 2; + let (client, link) = { + let net = net.lock(); + let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); + ( + net.peers[peer_id].client().clone(), + link, + ) + }; + + let set_state = link.persistent_data.set_state.clone(); + + let wait = client.finality_notification_stream() + .take_while(|n| Ok(n.header.number() < &50)) + .collect() + .map(|_| set_state); + + let voter = voter(None, peer_id, link, net); + + runtime.spawn(voter).unwrap(); + + wait + }) + .and_then(|set_state| { + // the last completed round in the new voter is higher than 4 + // which means it caught up to the voters + assert!(set_state.read().last_completed_round().number >= 4); + Ok(()) + }) + }; + + let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) + .for_each(move |_| { net.lock().sync_without_disconnects(); Ok(()) }) + .map(|_| ()) + .map_err(|_| ()); + + let _ = runtime.block_on(test.select(drive_to_completion).map_err(|_| ())).unwrap(); +} diff --git a/core/finality-grandpa/src/until_imported.rs b/core/finality-grandpa/src/until_imported.rs index daef7cafb1f4..d10c2ac8e7db 100644 --- a/core/finality-grandpa/src/until_imported.rs +++ b/core/finality-grandpa/src/until_imported.rs @@ -20,12 +20,13 @@ //! //! This is used for votes and commit messages currently. -use super::{BlockStatus, Error, SignedMessage, CompactCommit}; +use super::{BlockStatus, CommunicationIn, Error, SignedMessage}; use log::{debug, warn}; use client::ImportNotifications; use futures::prelude::*; use futures::stream::Fuse; +use grandpa::voter; use parking_lot::Mutex; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use tokio_timer::Interval; @@ -253,18 +254,18 @@ impl BlockUntilImported for SignedMessage { /// signed messages are imported. pub(crate) type UntilVoteTargetImported = UntilImported>; -/// This blocks a commit message's import until all blocks -/// referenced in its votes are known. +/// This blocks a global message import, i.e. a commit or catch up messages, +/// until all blocks referenced in its votes are known. /// -/// This is used for compact commits which have already been checked for -/// structural soundness. -pub(crate) struct BlockCommitMessage { - inner: Arc<(AtomicUsize, Mutex, U)>>)>, +/// This is used for compact commits and catch up messages which have already +/// been checked for structural soundness (e.g. valid signatures). +pub(crate) struct BlockGlobalMessage { + inner: Arc<(AtomicUsize, Mutex>>)>, target_number: NumberFor, } -impl BlockUntilImported for BlockCommitMessage { - type Blocked = (u64, CompactCommit, U); +impl BlockUntilImported for BlockGlobalMessage { + type Blocked = CommunicationIn; fn schedule_wait( input: Self::Blocked, @@ -298,7 +299,7 @@ impl BlockUntilImported for BlockCommitMessage Result { - // check integrity: all precommits for same hash have same number. + // check integrity: all votes for same hash have same number. let canon_number = match checked_hashes.entry(target_hash) { Entry::Occupied(entry) => entry.get().number().clone(), Entry::Vacant(entry) => { @@ -315,51 +316,68 @@ impl BlockUntilImported for BlockCommitMessage { + // add known hashes from all precommits. + let precommit_targets = commit.precommits + .iter() + .map(|c| (c.target_number, c.target_hash)); - // add known hashes from the precommits. - for precommit in &commit.precommits { - let target_number = precommit.target_number; - let target_hash = precommit.target_hash; + for (target_number, target_hash) in precommit_targets { + if !query_known(target_hash, target_number)? { + return Ok(()) + } + } + }, + voter::CommunicationIn::CatchUp(ref catch_up, ..) => { + // add known hashes from all prevotes and precommits. + let prevote_targets = catch_up.prevotes + .iter() + .map(|s| (s.prevote.target_number, s.prevote.target_hash)); - if !query_known(target_hash, target_number)? { - return Ok(()) - } - } + let precommit_targets = catch_up.precommits + .iter() + .map(|s| (s.precommit.target_number, s.precommit.target_hash)); - // see if commit target hash is known. - if !query_known(commit.target_hash, commit.target_number)? { - return Ok(()) - } + let targets = prevote_targets.chain(precommit_targets); + + for (target_number, target_hash) in targets { + if !query_known(target_hash, target_number)? { + return Ok(()) + } + } + }, + }; } - // none of the hashes in the commit message were unknown. - // we can just return the commit directly. + // none of the hashes in the global message were unknown. + // we can just return the message directly. if unknown_count == 0 { ready(input); return Ok(()) } - let locked_commit = Arc::new((AtomicUsize::new(unknown_count), Mutex::new(Some(input)))); + let locked_global = Arc::new((AtomicUsize::new(unknown_count), Mutex::new(Some(input)))); // schedule waits for all unknown messages. // when the last one of these has `wait_completed` called on it, - // the commit will be returned. + // the global message will be returned. // // in the future, we may want to issue sync requests to the network // if this is taking a long time. for (hash, is_known) in checked_hashes { if let KnownOrUnknown::Unknown(target_number) = is_known { - wait(hash, BlockCommitMessage { - inner: locked_commit.clone(), + wait(hash, BlockGlobalMessage { + inner: locked_global.clone(), target_number, }) } @@ -398,18 +416,19 @@ impl BlockUntilImported for BlockCommitMessage = UntilImported< +/// A stream which gates off incoming global messages, i.e. commit and catch up +/// messages, until all referenced block hashes have been imported. +pub(crate) type UntilGlobalMessageBlocksImported = UntilImported< Block, Status, I, - BlockCommitMessage, + BlockGlobalMessage, >; #[cfg(test)] mod tests { use super::*; + use crate::{CatchUp, CompactCommit}; use tokio::runtime::current_thread::Runtime; use tokio_timer::Delay; use test_client::runtime::{Block, Hash, Header}; @@ -474,41 +493,73 @@ mod tests { ) } - #[test] - fn blocking_commit_message() { - let h1 = make_header(5); - let h2 = make_header(6); - let h3 = make_header(7); + // unwrap the commit from `CommunicationIn` returning its fields in a tuple, + // panics if the given message isn't a commit + fn unapply_commit(msg: CommunicationIn) -> (u64, CompactCommit::) { + match msg { + voter::CommunicationIn::Commit(round, commit, ..) => (round, commit), + _ => panic!("expected commit"), + } + } + + // unwrap the catch up from `CommunicationIn` returning its inner representation, + // panics if the given message isn't a catch up + fn unapply_catch_up(msg: CommunicationIn) -> CatchUp { + match msg { + voter::CommunicationIn::CatchUp(catch_up, ..) => catch_up, + _ => panic!("expected catch up"), + } + } + fn message_all_dependencies_satisfied( + msg: CommunicationIn, + enact_dependencies: F, + ) -> CommunicationIn where + F: FnOnce(&TestChainState), + { let (chain_state, import_notifications) = TestChainState::new(); let block_status = chain_state.block_status(); - let unknown_commit = CompactCommit:: { - target_hash: h1.hash(), - target_number: 5, - precommits: vec![ - Precommit { - target_hash: h2.hash(), - target_number: 6, - }, - Precommit { - target_hash: h3.hash(), - target_number: 7, - }, - ], - auth_data: Vec::new(), // not used - }; + // enact all dependencies before importing the message + enact_dependencies(&chain_state); - let (commit_tx, commit_rx) = mpsc::unbounded(); + let (global_tx, global_rx) = mpsc::unbounded(); - let until_imported = UntilCommitBlocksImported::new( + let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, block_status, - commit_rx.map_err(|_| panic!("should never error")), + global_rx.map_err(|_| panic!("should never error")), ); - commit_tx.unbounded_send((0, unknown_commit.clone(), ())).unwrap(); + global_tx.unbounded_send(msg).unwrap(); + + let work = until_imported.into_future(); + + let mut runtime = Runtime::new().unwrap(); + runtime.block_on(work).map_err(|(e, _)| e).unwrap().0.unwrap() + } + fn blocking_message_on_dependencies( + msg: CommunicationIn, + enact_dependencies: F, + ) -> CommunicationIn where + F: FnOnce(&TestChainState), + { + let (chain_state, import_notifications) = TestChainState::new(); + let block_status = chain_state.block_status(); + + let (global_tx, global_rx) = mpsc::unbounded(); + + let until_imported = UntilGlobalMessageBlocksImported::new( + import_notifications, + block_status, + global_rx.map_err(|_| panic!("should never error")), + ); + + global_tx.unbounded_send(msg).unwrap(); + + // NOTE: needs to be cloned otherwise it is moved to the stream and + // dropped too early. let inner_chain_state = chain_state.clone(); let work = until_imported .into_future() @@ -518,26 +569,64 @@ mod tests { Ok(Either::A(_)) => panic!("timeout should have fired first"), Ok(Either::B((_, until_imported))) => { // timeout fired. push in the headers. - inner_chain_state.import_header(h1); - inner_chain_state.import_header(h2); - inner_chain_state.import_header(h3); + enact_dependencies(&inner_chain_state); until_imported } }); let mut runtime = Runtime::new().unwrap(); - assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, unknown_commit, ()))); + runtime.block_on(work).map_err(|(e, _)| e).unwrap().0.unwrap() } #[test] - fn commit_message_all_known() { + fn blocking_commit_message() { let h1 = make_header(5); let h2 = make_header(6); let h3 = make_header(7); - let (chain_state, import_notifications) = TestChainState::new(); - let block_status = chain_state.block_status(); + let unknown_commit = CompactCommit:: { + target_hash: h1.hash(), + target_number: 5, + precommits: vec![ + Precommit { + target_hash: h2.hash(), + target_number: 6, + }, + Precommit { + target_hash: h3.hash(), + target_number: 7, + }, + ], + auth_data: Vec::new(), // not used + }; + + let unknown_commit = || voter::CommunicationIn::Commit( + 0, + unknown_commit.clone(), + voter::Callback::Blank, + ); + + let res = blocking_message_on_dependencies( + unknown_commit(), + |chain_state| { + chain_state.import_header(h1); + chain_state.import_header(h2); + chain_state.import_header(h3); + }, + ); + + assert_eq!( + unapply_commit(res), + unapply_commit(unknown_commit()), + ); + } + + #[test] + fn commit_message_all_known() { + let h1 = make_header(5); + let h2 = make_header(6); + let h3 = make_header(7); let known_commit = CompactCommit:: { target_hash: h1.hash(), @@ -555,23 +644,156 @@ mod tests { auth_data: Vec::new(), // not used }; - chain_state.import_header(h1); - chain_state.import_header(h2); - chain_state.import_header(h3); + let known_commit = || voter::CommunicationIn::Commit( + 0, + known_commit.clone(), + voter::Callback::Blank, + ); - let (commit_tx, commit_rx) = mpsc::unbounded(); + let res = message_all_dependencies_satisfied( + known_commit(), + |chain_state| { + chain_state.import_header(h1); + chain_state.import_header(h2); + chain_state.import_header(h3); + }, + ); - let until_imported = UntilCommitBlocksImported::new( - import_notifications, - block_status, - commit_rx.map_err(|_| panic!("should never error")), + assert_eq!( + unapply_commit(res), + unapply_commit(known_commit()), ); + } - commit_tx.unbounded_send((0, known_commit.clone(), ())).unwrap(); + #[test] + fn blocking_catch_up_message() { + let h1 = make_header(5); + let h2 = make_header(6); + let h3 = make_header(7); - let work = until_imported.into_future(); + let signed_prevote = |header: &Header| { + grandpa::SignedPrevote { + id: Default::default(), + signature: Default::default(), + prevote: grandpa::Prevote { + target_hash: header.hash(), + target_number: *header.number(), + }, + } + }; - let mut runtime = Runtime::new().unwrap(); - assert_eq!(runtime.block_on(work).map_err(|(e, _)| e).unwrap().0, Some((0, known_commit, ()))); + let signed_precommit = |header: &Header| { + grandpa::SignedPrecommit { + id: Default::default(), + signature: Default::default(), + precommit: grandpa::Precommit { + target_hash: header.hash(), + target_number: *header.number(), + }, + } + }; + + let prevotes = vec![ + signed_prevote(&h1), + signed_prevote(&h3), + ]; + + let precommits = vec![ + signed_precommit(&h1), + signed_precommit(&h2), + ]; + + let unknown_catch_up = grandpa::CatchUp { + round_number: 1, + prevotes, + precommits, + base_hash: h1.hash(), + base_number: *h1.number(), + }; + + let unknown_catch_up = || voter::CommunicationIn::CatchUp( + unknown_catch_up.clone(), + voter::Callback::Blank, + ); + + let res = blocking_message_on_dependencies( + unknown_catch_up(), + |chain_state| { + chain_state.import_header(h1); + chain_state.import_header(h2); + chain_state.import_header(h3); + }, + ); + + assert_eq!( + unapply_catch_up(res), + unapply_catch_up(unknown_catch_up()), + ); + } + + #[test] + fn catch_up_message_all_known() { + let h1 = make_header(5); + let h2 = make_header(6); + let h3 = make_header(7); + + let signed_prevote = |header: &Header| { + grandpa::SignedPrevote { + id: Default::default(), + signature: Default::default(), + prevote: grandpa::Prevote { + target_hash: header.hash(), + target_number: *header.number(), + }, + } + }; + + let signed_precommit = |header: &Header| { + grandpa::SignedPrecommit { + id: Default::default(), + signature: Default::default(), + precommit: grandpa::Precommit { + target_hash: header.hash(), + target_number: *header.number(), + }, + } + }; + + let prevotes = vec![ + signed_prevote(&h1), + signed_prevote(&h3), + ]; + + let precommits = vec![ + signed_precommit(&h1), + signed_precommit(&h2), + ]; + + let unknown_catch_up = grandpa::CatchUp { + round_number: 1, + prevotes, + precommits, + base_hash: h1.hash(), + base_number: *h1.number(), + }; + + let unknown_catch_up = || voter::CommunicationIn::CatchUp( + unknown_catch_up.clone(), + voter::Callback::Blank, + ); + + let res = message_all_dependencies_satisfied( + unknown_catch_up(), + |chain_state| { + chain_state.import_header(h1); + chain_state.import_header(h2); + chain_state.import_header(h3); + }, + ); + + assert_eq!( + unapply_catch_up(res), + unapply_catch_up(unknown_catch_up()), + ); } } diff --git a/core/network/src/protocol/consensus_gossip.rs b/core/network/src/protocol/consensus_gossip.rs index a1c9783b91be..f1343269596a 100644 --- a/core/network/src/protocol/consensus_gossip.rs +++ b/core/network/src/protocol/consensus_gossip.rs @@ -432,13 +432,14 @@ impl ConsensusGossip { } let engine_id = message.engine_id; - //validate the message + // validate the message let validation = self.validators.get(&engine_id) .cloned() .map(|v| { let mut context = NetworkContext { gossip: self, protocol, engine_id }; v.validate(&mut context, &who, &message.data) }); + let validation_result = match validation { Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)), Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)),