From 6d319cd775905b09e56d1fee0a2094afd3254726 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 6 Nov 2019 16:20:53 +0000 Subject: [PATCH 1/9] grandpa: stricter gossip message filtering --- .../src/communication/gossip.rs | 78 ++++++++++++++++++- core/network/src/protocol/consensus_gossip.rs | 71 +++++++++++++---- 2 files changed, 134 insertions(+), 15 deletions(-) diff --git a/core/finality-grandpa/src/communication/gossip.rs b/core/finality-grandpa/src/communication/gossip.rs index efcd1d48c67f7..f10d5382ac819 100644 --- a/core/finality-grandpa/src/communication/gossip.rs +++ b/core/finality-grandpa/src/communication/gossip.rs @@ -92,6 +92,7 @@ use substrate_telemetry::{telemetry, CONSENSUS_DEBUG}; use log::{trace, debug, warn}; use futures::prelude::*; use futures::sync::mpsc; +use rand::Rng; use crate::{environment, CatchUp, CompactCommit, SignedMessage}; use super::{cost, benefit, Round, SetId}; @@ -483,6 +484,14 @@ impl Peers { fn peer<'a>(&'a self, who: &PeerId) -> Option<&'a PeerInfo> { self.inner.get(who) } + + fn authorities(&self) -> usize { + self.inner.iter().filter(|(_, info)| info.roles.is_authority()).count() + } + + fn non_authorities(&self) -> usize { + self.inner.iter().filter(|(_, info)| !info.roles.is_authority()).count() + } } #[derive(Debug, PartialEq)] @@ -980,6 +989,64 @@ impl Inner { (true, report) } + + /// The initial logic for filtering messages follows the given state + /// transitions. + /// + /// - State 0: not allowed to anyone (only if our local node is not an authority) + /// - State 1: allowed to random `sqrt(authorities)` + /// - State 2: allowed to all authorities + /// - State 3: allowed to random `sqrt(non-authorities)` + /// - State 4: allowed to all non-authorities + /// + /// Transitions will be triggered on repropagation attempts by the + /// underlying gossip layer, which should happen every 30 seconds. 
+ fn message_allowed(&self, peer: &PeerInfo, mut previous_attempts: usize) -> bool { + if !self.config.is_authority && previous_attempts == 0 { + // non-authority nodes don't gossip any messages right away. we + // assume that authorities (and sentries) are strongly connected, so + // it should be unnecessary for non-authorities to gossip all + // messages right away. + return false; + } + + if !self.config.is_authority { + // since the node is not an authority we skipped the initial attempt + // to gossip the message, therefore we decrement `previous_attempts` + // so that the state machine below works the same way it does for + // authority nodes. + previous_attempts -= 1; + } + + if peer.roles.is_authority() { + // the target node is an authority, on the first attempt we start by + // sending the message to only `sqrt(authorities)`. + if previous_attempts == 0 { + let authorities = self.peers.authorities() as f64; + let p = authorities.sqrt() / authorities; + rand::thread_rng().gen_bool(p) + } else { + // otherwise we already went through the step above, so + // we won't filter the message and send it to all + // authorities for whom it is polite to do so + true + } + } else { + // the node is not an authority so we apply stricter filters + if previous_attempts >= 3 { + // if we previously tried to send this message 3 (or more) + // times, then it is allowed to be sent to all peers. + true + } else if previous_attempts == 2 { + // otherwise we only send it to `sqrt(non-authorities)`. + let non_authorities = self.peers.non_authorities() as f64; + let p = non_authorities.sqrt() / non_authorities; + rand::thread_rng().gen_bool(p) + } else { + false + } + } + } } /// A validator for GRANDPA gossip messages. @@ -1183,6 +1250,13 @@ impl network_gossip::Validator for GossipValidator Some(x) => x, }; + if let MessageIntent::Broadcast { previous_attempts } = intent { + // early return if the message isn't allowed at this stage. 
+ if !inner.message_allowed(peer, previous_attempts) { + return false; + } + } + // if the topic is not something we're keeping at the moment, // do not send. let (maybe_round, set_id) = match inner.live_topics.topic_info(&topic) { @@ -1209,8 +1283,8 @@ impl network_gossip::Validator for GossipValidator Ok(GossipMessage::Commit(full)) => { // we only broadcast our best commit and only if it's // better than last received by peer. - Some(full.message.target_number) == our_best_commit - && Some(full.message.target_number) > peer_best_commit + Some(full.message.target_number) == our_best_commit && + Some(full.message.target_number) > peer_best_commit } Ok(GossipMessage::Neighbor(_)) => false, Ok(GossipMessage::CatchUpRequest(_)) => false, diff --git a/core/network/src/protocol/consensus_gossip.rs b/core/network/src/protocol/consensus_gossip.rs index f3d4e536a788d..23644d3ec11ba 100644 --- a/core/network/src/protocol/consensus_gossip.rs +++ b/core/network/src/protocol/consensus_gossip.rs @@ -73,6 +73,7 @@ const UNREGISTERED_TOPIC_REPUTATION_CHANGE: i32 = -(1 << 10); struct PeerConsensus { known_messages: HashSet, + filtered_messages: HashMap, roles: Roles, } @@ -105,8 +106,12 @@ pub enum MessageRecipient { /// The reason for sending out the message. #[derive(Eq, PartialEq, Copy, Clone)] pub enum MessageIntent { - /// Requested broadcast - Broadcast, + /// Requested broadcast. + Broadcast { + /// How many times this message was previously filtered by the gossip + /// validator when trying to propagate to a given peer. + previous_attempts: usize + }, /// Requested broadcast to all peers. ForcedBroadcast, /// Periodic rebroadcast of all messages to all peers. @@ -123,6 +128,12 @@ pub enum ValidationResult { Discard, } +impl MessageIntent { + fn broadcast() -> MessageIntent { + MessageIntent::Broadcast { previous_attempts: 0 } + } +} + /// Validation context. Allows reacting to incoming messages by sending out further messages. 
pub trait ValidatorContext { /// Broadcast all messages with given topic to peers that do not have it yet. @@ -196,12 +207,17 @@ fn propagate<'a, B: BlockT, I>( for (message_hash, topic, message) in messages { for (id, ref mut peer) in peers.iter_mut() { + let previous_attempts = peer.filtered_messages + .get(&message_hash) + .cloned() + .unwrap_or(0); + let intent = match intent { - MessageIntent::Broadcast => + MessageIntent::Broadcast { .. } => if peer.known_messages.contains(&message_hash) { - continue + continue; } else { - MessageIntent::Broadcast + MessageIntent::Broadcast { previous_attempts } }, MessageIntent::PeriodicRebroadcast => if peer.known_messages.contains(&message_hash) { @@ -209,15 +225,24 @@ fn propagate<'a, B: BlockT, I>( } else { // peer doesn't know message, so the logic should treat it as an // initial broadcast. - MessageIntent::Broadcast + MessageIntent::Broadcast { previous_attempts } }, other => other, }; if !message_allowed(id, intent, &topic, &message) { - continue + let count = peer.filtered_messages + .entry(message_hash.clone()) + .or_insert(0); + + *count += 1; + + continue; } + + peer.filtered_messages.remove(message_hash); peer.known_messages.insert(message_hash.clone()); + trace!(target: "gossip", "Propagating to {}: {:?}", id, message); protocol.send_consensus(id.clone(), message.clone()); } @@ -310,6 +335,7 @@ impl ConsensusGossip { trace!(target:"gossip", "Registering {:?} {}", roles, who); self.peers.insert(who.clone(), PeerConsensus { known_messages: HashSet::new(), + filtered_messages: HashMap::new(), roles, }); for (engine_id, v) in self.validators.clone() { @@ -379,7 +405,7 @@ impl ConsensusGossip { .filter_map(|entry| if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None } ); - let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; + let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() }; 
propagate(protocol, messages, intent, &mut self.peers, &self.validators); } @@ -527,17 +553,36 @@ impl ConsensusGossip { Some(validator) => validator.message_allowed(), }; - let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; - if let Some(ref mut peer) = self.peers.get_mut(who) { for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) { + let intent = if force { + MessageIntent::ForcedBroadcast + } else { + let previous_attempts = peer.filtered_messages + .get(&entry.message_hash) + .cloned() + .unwrap_or(0); + + MessageIntent::Broadcast { previous_attempts } + }; + if !force && peer.known_messages.contains(&entry.message_hash) { - continue + continue; } + if !message_allowed(who, intent, &entry.topic, &entry.message.data) { - continue + let count = peer.filtered_messages + .entry(entry.message_hash) + .or_insert(0); + + *count += 1; + + continue; } + + peer.filtered_messages.remove(&entry.message_hash); peer.known_messages.insert(entry.message_hash.clone()); + trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message); protocol.send_consensus(who.clone(), ConsensusMessage { engine_id: engine_id.clone(), @@ -557,7 +602,7 @@ impl ConsensusGossip { ) { let message_hash = HashFor::::hash(&message.data); self.register_message_hashed(message_hash, topic, message.clone(), None); - let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; + let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() }; propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators); } From d10b43121b823a70c3a08a865df4266203420082 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 6 Nov 2019 16:29:46 +0000 Subject: [PATCH 2/9] gossip: remove filtered message on send_message --- core/network/src/protocol/consensus_gossip.rs | 2 ++ 1 file changed, 2 
insertions(+) diff --git a/core/network/src/protocol/consensus_gossip.rs b/core/network/src/protocol/consensus_gossip.rs index 23644d3ec11ba..81d6b80cc346e 100644 --- a/core/network/src/protocol/consensus_gossip.rs +++ b/core/network/src/protocol/consensus_gossip.rs @@ -623,7 +623,9 @@ impl ConsensusGossip { trace!(target: "gossip", "Sending direct to {}: {:?}", who, message); + peer.filtered_messages.remove(&message_hash); peer.known_messages.insert(message_hash); + protocol.send_consensus(who.clone(), message.clone()); } } From 1f42b389eb435715111c36b65244346c07c63176 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Thu, 7 Nov 2019 13:56:38 +0000 Subject: [PATCH 3/9] gossip: add test for tracking of broadcast attempts --- core/network/src/protocol/consensus_gossip.rs | 110 +++++++++++++++++- 1 file changed, 109 insertions(+), 1 deletion(-) diff --git a/core/network/src/protocol/consensus_gossip.rs b/core/network/src/protocol/consensus_gossip.rs index 81d6b80cc346e..10dae4a18e13e 100644 --- a/core/network/src/protocol/consensus_gossip.rs +++ b/core/network/src/protocol/consensus_gossip.rs @@ -105,6 +105,7 @@ pub enum MessageRecipient { /// The reason for sending out the message. #[derive(Eq, PartialEq, Copy, Clone)] +#[cfg_attr(test, derive(Debug))] pub enum MessageIntent { /// Requested broadcast. 
Broadcast { @@ -654,6 +655,8 @@ impl Validator for DiscardAll { #[cfg(test)] mod tests { + use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; + use parking_lot::Mutex; use sr_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; use futures03::executor::block_on_stream; @@ -704,7 +707,7 @@ mod tests { } fn message_expired<'a>(&'a self) -> Box bool + 'a> { - Box::new(move |_topic, data| data[0] != 1 ) + Box::new(move |_topic, data| data[0] != 1) } } @@ -802,4 +805,109 @@ mod tests { let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic)); assert_eq!(stream.next(), None); } + + #[test] + fn keeps_track_of_broadcast_attempts() { + struct DummyNetworkContext; + impl Context for DummyNetworkContext { + fn report_peer(&mut self, _who: PeerId, _reputation: i32) {} + fn disconnect_peer(&mut self, _who: PeerId) {} + fn send_consensus(&mut self, _who: PeerId, _consensus: ConsensusMessage) {} + fn send_chain_specific(&mut self, _who: PeerId, _message: Vec) {} + } + + // A mock gossip validator that never expires any message, allows + // setting whether messages should be allowed and keeps track of any + // messages passed to `message_allowed`. 
+ struct MockValidator { + allow: AtomicBool, + messages: Arc, MessageIntent)>>>, + } + + impl MockValidator { + fn new() -> MockValidator { + MockValidator { + allow: AtomicBool::new(false), + messages: Arc::new(Mutex::new(Vec::new())), + } + } + } + + impl Validator for MockValidator { + fn validate( + &self, + _context: &mut dyn ValidatorContext, + _sender: &PeerId, + _data: &[u8], + ) -> ValidationResult { + ValidationResult::ProcessAndKeep(H256::default()) + } + + fn message_expired<'a>(&'a self) -> Box bool + 'a> { + Box::new(move |_topic, _data| false) + } + + fn message_allowed<'a>(&'a self) -> Box bool + 'a> { + let messages = self.messages.clone(); + Box::new(move |_, intent, _, data| { + messages.lock().push((data.to_vec(), intent)); + self.allow.load(Ordering::SeqCst) + }) + } + } + + // we set up an instance of the mock gossip validator, add a new peer to + // it and register a message. + let mut consensus = ConsensusGossip::::new(); + let validator = Arc::new(MockValidator::new()); + consensus.register_validator_internal([0, 0, 0, 0], validator.clone()); + consensus.new_peer( + &mut DummyNetworkContext, + PeerId::random(), + Roles::AUTHORITY, + ); + + let data = vec![1, 2, 3]; + let msg = ConsensusMessage { data: data.clone(), engine_id: [0, 0, 0, 0] }; + consensus.register_message(H256::default(), msg); + + // tick the gossip handler and make sure it triggers a message rebroadcast + let mut tick = || { + consensus.next_broadcast = std::time::Instant::now(); + consensus.tick(&mut DummyNetworkContext); + }; + + // by default we won't allow the message we registered, so every time we + // tick the gossip handler, the message intent should be kept as + // `Broadcast` but the previous attempts should be incremented. 
+ tick(); + assert_eq!( + validator.messages.lock().pop().unwrap(), + (data.clone(), MessageIntent::Broadcast { previous_attempts: 0 }), + ); + + tick(); + assert_eq!( + validator.messages.lock().pop().unwrap(), + (data.clone(), MessageIntent::Broadcast { previous_attempts: 1 }), + ); + + // we set the validator to allow the message to go through + validator.allow.store(true, Ordering::SeqCst); + + // we still get the same message intent but it should be delivered now + tick(); + assert_eq!( + validator.messages.lock().pop().unwrap(), + (data.clone(), MessageIntent::Broadcast { previous_attempts: 2 }), + ); + + // ticking the gossip handler again the message intent should change to + // `PeriodicRebroadcast` since it was sent. + tick(); + assert_eq!( + validator.messages.lock().pop().unwrap(), + (data.clone(), MessageIntent::PeriodicRebroadcast), + ); + } } From c2dcba2a370b230dc85cde31b1186a522298ca5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 8 Nov 2019 13:10:46 +0000 Subject: [PATCH 4/9] grandpa: only restrict gossip if we're connected to more than 5 authorities --- core/finality-grandpa/src/communication/gossip.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/finality-grandpa/src/communication/gossip.rs b/core/finality-grandpa/src/communication/gossip.rs index f10d5382ac819..205733ec2b705 100644 --- a/core/finality-grandpa/src/communication/gossip.rs +++ b/core/finality-grandpa/src/communication/gossip.rs @@ -1019,10 +1019,12 @@ impl Inner { } if peer.roles.is_authority() { + let authorities = self.peers.authorities(); + // the target node is an authority, on the first attempt we start by // sending the message to only `sqrt(authorities)`. 
- if previous_attempts == 0 { - let authorities = self.peers.authorities() as f64; + if previous_attempts == 0 && authorities > 5 { + let authorities = authorities as f64; let p = authorities.sqrt() / authorities; rand::thread_rng().gen_bool(p) } else { From bb622012b30ee21ffceea1701cc78157d25669bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 8 Nov 2019 13:24:40 +0000 Subject: [PATCH 5/9] grandpa: add test for progressive gossip --- .../src/communication/gossip.rs | 123 +++++++++++++++++- 1 file changed, 117 insertions(+), 6 deletions(-) diff --git a/core/finality-grandpa/src/communication/gossip.rs b/core/finality-grandpa/src/communication/gossip.rs index 205733ec2b705..fc832a2c7c73c 100644 --- a/core/finality-grandpa/src/communication/gossip.rs +++ b/core/finality-grandpa/src/communication/gossip.rs @@ -1002,6 +1002,8 @@ impl Inner { /// Transitions will be triggered on repropagation attempts by the /// underlying gossip layer, which should happen every 30 seconds. fn message_allowed(&self, peer: &PeerInfo, mut previous_attempts: usize) -> bool { + const MIN_AUTHORITIES: usize = 5; + if !self.config.is_authority && previous_attempts == 0 { // non-authority nodes don't gossip any messages right away. we // assume that authorities (and sentries) are strongly connected, so @@ -1022,10 +1024,11 @@ impl Inner { let authorities = self.peers.authorities(); // the target node is an authority, on the first attempt we start by - // sending the message to only `sqrt(authorities)`. - if previous_attempts == 0 && authorities > 5 { + // sending the message to only `sqrt(authorities)` (if we're + // connected to at least `MIN_AUTHORITIES`). 
+ if previous_attempts == 0 && authorities > MIN_AUTHORITIES { let authorities = authorities as f64; - let p = authorities.sqrt() / authorities; + let p = (authorities.sqrt()).max(MIN_AUTHORITIES as f64) / authorities; rand::thread_rng().gen_bool(p) } else { // otherwise we already went through the step above, so @@ -1387,7 +1390,7 @@ mod tests { use super::environment::SharedVoterSetState; use network_gossip::Validator as GossipValidatorT; use network::test::Block; - use primitives::crypto::Public; + use primitives::{crypto::Public, H256}; // some random config (not really needed) fn config() -> crate::Config { @@ -1405,9 +1408,8 @@ mod tests { fn voter_set_state() -> SharedVoterSetState { use crate::authorities::AuthoritySet; use crate::environment::VoterSetState; - use primitives::H256; - let base = (H256::zero(), 0); + let base = (H256::zero(), 0); let voters = AuthoritySet::genesis(Vec::new()); let set_state = VoterSetState::live( 0, @@ -2067,4 +2069,113 @@ mod tests { ) } } + + #[test] + fn progressively_gossips_to_more_peers() { + let (val, _) = GossipValidator::::new( + config(), + voter_set_state(), + ); + + // the validator start at set id 0 + val.note_set(SetId(0), Vec::new(), |_, _| {}); + + // add 60 peers, 30 authorities and 30 full nodes + let mut authorities = Vec::new(); + authorities.resize_with(30, || PeerId::random()); + + let mut full_nodes = Vec::new(); + full_nodes.resize_with(30, || PeerId::random()); + + for i in 0..30 { + val.inner.write().peers.new_peer(authorities[i].clone(), Roles::AUTHORITY); + val.inner.write().peers.new_peer(full_nodes[i].clone(), Roles::FULL); + } + + let test = |previous_attempts, peers| { + let mut message_allowed = val.message_allowed(); + + move || { + let mut allowed = 0; + for peer in peers { + if message_allowed( + peer, + MessageIntent::Broadcast { previous_attempts }, + &crate::communication::round_topic::(1, 0), + &[], + ) { + allowed += 1; + } + } + allowed + } + }; + + fn trial usize>(mut test: F) -> 
usize { + let mut results = Vec::new(); + let n = 1000; + + for _ in 0..n { + results.push(test()); + } + + let n = results.len(); + let sum: usize = results.iter().sum(); + + sum / n + } + + // on the first attempt we will only gossip to `sqrt(authorities)`, + // which should average out to 5 peers after a couple of trials + assert_eq!(trial(test(0, &authorities)), 5); + + // on the second (and subsequent attempts) we should gossip to all + // authorities we're connected to. + assert_eq!(trial(test(1, &authorities)), 30); + assert_eq!(trial(test(2, &authorities)), 30); + + // we should only gossip to non-authorities after the third attempt + assert_eq!(trial(test(0, &full_nodes)), 0); + assert_eq!(trial(test(1, &full_nodes)), 0); + + // and only to `sqrt(non-authorities)` + assert_eq!(trial(test(2, &full_nodes)), 5); + + // only on the fourth attempt should we gossip to all non-authorities + assert_eq!(trial(test(3, &full_nodes)), 30); + } + + #[test] + fn only_restricts_gossip_to_authorities_after_a_minimum_threshold() { + let (val, _) = GossipValidator::::new( + config(), + voter_set_state(), + ); + + // the validator start at set id 0 + val.note_set(SetId(0), Vec::new(), |_, _| {}); + + let mut authorities = Vec::new(); + for _ in 0..5 { + let peer_id = PeerId::random(); + val.inner.write().peers.new_peer(peer_id.clone(), Roles::AUTHORITY); + authorities.push(peer_id); + } + + let mut message_allowed = val.message_allowed(); + + // since we're only connected to 5 authorities, we should never restrict + // sending of gossip messages, and instead just allow them to all + // non-authorities on the first attempt. 
+ for authority in &authorities { + assert!( + message_allowed( + authority, + MessageIntent::Broadcast { previous_attempts: 0 }, + &crate::communication::round_topic::(1, 0), + &[], + ) + ); + } + } } From e6242a43361e695a6342c73bcdb9c33f3c177bf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 8 Nov 2019 13:29:10 +0000 Subject: [PATCH 6/9] grandpa: add test for gossip filtering on local non-authority node --- .../src/communication/gossip.rs | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/core/finality-grandpa/src/communication/gossip.rs b/core/finality-grandpa/src/communication/gossip.rs index fc832a2c7c73c..0425d3104592f 100644 --- a/core/finality-grandpa/src/communication/gossip.rs +++ b/core/finality-grandpa/src/communication/gossip.rs @@ -2178,4 +2178,53 @@ mod tests { ); } } + + #[test] + fn non_authorities_never_gossip_messages_on_first_attempt() { + let mut config = config(); + config.is_authority = false; + + let (val, _) = GossipValidator::::new( + config, + voter_set_state(), + ); + + // the validator start at set id 0 + val.note_set(SetId(0), Vec::new(), |_, _| {}); + + let mut authorities = Vec::new(); + for _ in 0..100 { + let peer_id = PeerId::random(); + val.inner.write().peers.new_peer(peer_id.clone(), Roles::AUTHORITY); + authorities.push(peer_id); + } + + let mut message_allowed = val.message_allowed(); + + // since our node is not an authority we should **never** gossip any + // messages on the first attempt. 
+ for authority in &authorities { + assert!( + !message_allowed( + authority, + MessageIntent::Broadcast { previous_attempts: 0 }, + &crate::communication::round_topic::(1, 0), + &[], + ) + ); + } + + // on the third attempt we should allow messages to authorities + // (on the second attempt we would do `sqrt(authorities)`) + for authority in &authorities { + assert!( + message_allowed( + authority, + MessageIntent::Broadcast { previous_attempts: 2 }, + &crate::communication::round_topic::(1, 0), + &[], + ) + ); + } + } } From b7fd92ef923d70880bbd69dc4c766e52476b39c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 8 Nov 2019 13:29:57 +0000 Subject: [PATCH 7/9] grandpa: fix doc --- core/finality-grandpa/src/communication/gossip.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/finality-grandpa/src/communication/gossip.rs b/core/finality-grandpa/src/communication/gossip.rs index 0425d3104592f..d27b4ca38854d 100644 --- a/core/finality-grandpa/src/communication/gossip.rs +++ b/core/finality-grandpa/src/communication/gossip.rs @@ -991,7 +991,7 @@ impl Inner { } /// The initial logic for filtering messages follows the given state - /// transitions. 
+ /// transitions: /// /// - State 0: not allowed to anyone (only if our local node is not an authority) /// - State 1: allowed to random `sqrt(authorities)` From 839863e681f8d541b006e2b3ef93f75e097a5813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 8 Nov 2019 15:17:52 +0000 Subject: [PATCH 8/9] gossip, grandpa: tabify --- .../src/communication/gossip.rs | 438 +++++++++--------- core/network/src/protocol/consensus_gossip.rs | 4 +- 2 files changed, 221 insertions(+), 221 deletions(-) diff --git a/core/finality-grandpa/src/communication/gossip.rs b/core/finality-grandpa/src/communication/gossip.rs index d27b4ca38854d..1d9837f16aeee 100644 --- a/core/finality-grandpa/src/communication/gossip.rs +++ b/core/finality-grandpa/src/communication/gossip.rs @@ -485,13 +485,13 @@ impl Peers { self.inner.get(who) } - fn authorities(&self) -> usize { - self.inner.iter().filter(|(_, info)| info.roles.is_authority()).count() - } + fn authorities(&self) -> usize { + self.inner.iter().filter(|(_, info)| info.roles.is_authority()).count() + } - fn non_authorities(&self) -> usize { - self.inner.iter().filter(|(_, info)| !info.roles.is_authority()).count() - } + fn non_authorities(&self) -> usize { + self.inner.iter().filter(|(_, info)| !info.roles.is_authority()).count() + } } #[derive(Debug, PartialEq)] @@ -990,68 +990,68 @@ impl Inner { (true, report) } - /// The initial logic for filtering messages follows the given state - /// transitions: - /// - /// - State 0: not allowed to anyone (only if our local node is not an authority) - /// - State 1: allowed to random `sqrt(authorities)` - /// - State 2: allowed to all authorities - /// - State 3: allowed to random `sqrt(non-authorities)` - /// - State 4: allowed to all non-authorities - /// - /// Transitions will be triggered on repropagation attempts by the - /// underlying gossip layer, which should happen every 30 seconds. 
- fn message_allowed(&self, peer: &PeerInfo, mut previous_attempts: usize) -> bool { - const MIN_AUTHORITIES: usize = 5; - - if !self.config.is_authority && previous_attempts == 0 { - // non-authority nodes don't gossip any messages right away. we - // assume that authorities (and sentries) are strongly connected, so - // it should be unnecessary for non-authorities to gossip all - // messages right away. - return false; - } - - if !self.config.is_authority { - // since the node is not an authority we skipped the initial attempt - // to gossip the message, therefore we decrement `previous_attempts` - // so that the state machine below works the same way it does for - // authority nodes. - previous_attempts -= 1; - } - - if peer.roles.is_authority() { - let authorities = self.peers.authorities(); - - // the target node is an authority, on the first attempt we start by - // sending the message to only `sqrt(authorities)` (if we're - // connected to at least `MIN_AUTHORITIES`). - if previous_attempts == 0 && authorities > MIN_AUTHORITIES { - let authorities = authorities as f64; - let p = (authorities.sqrt()).max(MIN_AUTHORITIES as f64) / authorities; - rand::thread_rng().gen_bool(p) - } else { - // otherwise we already went through the step above, so - // we won't filter the message and send it to all - // authorities for whom it is polite to do so - true - } - } else { - // the node is not an authority so we apply stricter filters - if previous_attempts >= 3 { - // if we previously tried to send this message 3 (or more) - // times, then it is allowed to be sent to all peers. - true - } else if previous_attempts == 2 { - // otherwise we only send it to `sqrt(non-authorities)`. 
- let non_authorities = self.peers.non_authorities() as f64; - let p = non_authorities.sqrt() / non_authorities; - rand::thread_rng().gen_bool(p) - } else { - false - } - } - } + /// The initial logic for filtering messages follows the given state + /// transitions: + /// + /// - State 0: not allowed to anyone (only if our local node is not an authority) + /// - State 1: allowed to random `sqrt(authorities)` + /// - State 2: allowed to all authorities + /// - State 3: allowed to random `sqrt(non-authorities)` + /// - State 4: allowed to all non-authorities + /// + /// Transitions will be triggered on repropagation attempts by the + /// underlying gossip layer, which should happen every 30 seconds. + fn message_allowed(&self, peer: &PeerInfo, mut previous_attempts: usize) -> bool { + const MIN_AUTHORITIES: usize = 5; + + if !self.config.is_authority && previous_attempts == 0 { + // non-authority nodes don't gossip any messages right away. we + // assume that authorities (and sentries) are strongly connected, so + // it should be unnecessary for non-authorities to gossip all + // messages right away. + return false; + } + + if !self.config.is_authority { + // since the node is not an authority we skipped the initial attempt + // to gossip the message, therefore we decrement `previous_attempts` + // so that the state machine below works the same way it does for + // authority nodes. + previous_attempts -= 1; + } + + if peer.roles.is_authority() { + let authorities = self.peers.authorities(); + + // the target node is an authority, on the first attempt we start by + // sending the message to only `sqrt(authorities)` (if we're + // connected to at least `MIN_AUTHORITIES`). 
+ if previous_attempts == 0 && authorities > MIN_AUTHORITIES { + let authorities = authorities as f64; + let p = (authorities.sqrt()).max(MIN_AUTHORITIES as f64) / authorities; + rand::thread_rng().gen_bool(p) + } else { + // otherwise we already went through the step above, so + // we won't filter the message and send it to all + // authorities for whom it is polite to do so + true + } + } else { + // the node is not an authority so we apply stricter filters + if previous_attempts >= 3 { + // if we previously tried to send this message 3 (or more) + // times, then it is allowed to be sent to all peers. + true + } else if previous_attempts == 2 { + // otherwise we only send it to `sqrt(non-authorities)`. + let non_authorities = self.peers.non_authorities() as f64; + let p = non_authorities.sqrt() / non_authorities; + rand::thread_rng().gen_bool(p) + } else { + false + } + } + } } /// A validator for GRANDPA gossip messages. @@ -1255,12 +1255,12 @@ impl network_gossip::Validator for GossipValidator Some(x) => x, }; - if let MessageIntent::Broadcast { previous_attempts } = intent { - // early return if the message isn't allowed at this stage. - if !inner.message_allowed(peer, previous_attempts) { - return false; - } - } + if let MessageIntent::Broadcast { previous_attempts } = intent { + // early return if the message isn't allowed at this stage. + if !inner.message_allowed(peer, previous_attempts) { + return false; + } + } // if the topic is not something we're keeping at the moment, // do not send. @@ -1289,7 +1289,7 @@ impl network_gossip::Validator for GossipValidator // we only broadcast our best commit and only if it's // better than last received by peer. 
Some(full.message.target_number) == our_best_commit && - Some(full.message.target_number) > peer_best_commit + Some(full.message.target_number) > peer_best_commit } Ok(GossipMessage::Neighbor(_)) => false, Ok(GossipMessage::CatchUpRequest(_)) => false, @@ -1409,7 +1409,7 @@ mod tests { use crate::authorities::AuthoritySet; use crate::environment::VoterSetState; - let base = (H256::zero(), 0); + let base = (H256::zero(), 0); let voters = AuthoritySet::genesis(Vec::new()); let set_state = VoterSetState::live( 0, @@ -2070,161 +2070,161 @@ mod tests { } } - #[test] - fn progressively_gossips_to_more_peers() { + #[test] + fn progressively_gossips_to_more_peers() { let (val, _) = GossipValidator::::new( config(), voter_set_state(), ); - // the validator start at set id 0 - val.note_set(SetId(0), Vec::new(), |_, _| {}); - - // add 60 peers, 30 authorities and 30 full nodes - let mut authorities = Vec::new(); - authorities.resize_with(30, || PeerId::random()); - - let mut full_nodes = Vec::new(); - full_nodes.resize_with(30, || PeerId::random()); - - for i in 0..30 { - val.inner.write().peers.new_peer(authorities[i].clone(), Roles::AUTHORITY); - val.inner.write().peers.new_peer(full_nodes[i].clone(), Roles::FULL); - } - - let test = |previous_attempts, peers| { - let mut message_allowed = val.message_allowed(); - - move || { - let mut allowed = 0; - for peer in peers { - if message_allowed( - peer, - MessageIntent::Broadcast { previous_attempts }, - &crate::communication::round_topic::(1, 0), - &[], - ) { - allowed += 1; - } - } - allowed - } - }; - - fn trial usize>(mut test: F) -> usize { - let mut results = Vec::new(); - let n = 1000; - - for _ in 0..n { - results.push(test()); - } - - let n = results.len(); - let sum: usize = results.iter().sum(); - - sum / n - } - - // on the first attempt we will only gossip to `sqrt(authorities)`, - // which should average out to 5 peers after a couple of trials - assert_eq!(trial(test(0, &authorities)), 5); - - // on the second 
(and subsequent attempts) we should gossip to all - // authorities we're connected to. - assert_eq!(trial(test(1, &authorities)), 30); - assert_eq!(trial(test(2, &authorities)), 30); - - // we should only gossip to non-authorities after the third attempt - assert_eq!(trial(test(0, &full_nodes)), 0); - assert_eq!(trial(test(1, &full_nodes)), 0); - - // and only to `sqrt(non-authorities)` - assert_eq!(trial(test(2, &full_nodes)), 5); - - // only on the fourth attempt should we gossip to all non-authorities - assert_eq!(trial(test(3, &full_nodes)), 30); - } - - #[test] - fn only_restricts_gossip_to_authorities_after_a_minimum_threshold() { + // the validator start at set id 0 + val.note_set(SetId(0), Vec::new(), |_, _| {}); + + // add 60 peers, 30 authorities and 30 full nodes + let mut authorities = Vec::new(); + authorities.resize_with(30, || PeerId::random()); + + let mut full_nodes = Vec::new(); + full_nodes.resize_with(30, || PeerId::random()); + + for i in 0..30 { + val.inner.write().peers.new_peer(authorities[i].clone(), Roles::AUTHORITY); + val.inner.write().peers.new_peer(full_nodes[i].clone(), Roles::FULL); + } + + let test = |previous_attempts, peers| { + let mut message_allowed = val.message_allowed(); + + move || { + let mut allowed = 0; + for peer in peers { + if message_allowed( + peer, + MessageIntent::Broadcast { previous_attempts }, + &crate::communication::round_topic::(1, 0), + &[], + ) { + allowed += 1; + } + } + allowed + } + }; + + fn trial usize>(mut test: F) -> usize { + let mut results = Vec::new(); + let n = 1000; + + for _ in 0..n { + results.push(test()); + } + + let n = results.len(); + let sum: usize = results.iter().sum(); + + sum / n + } + + // on the first attempt we will only gossip to `sqrt(authorities)`, + // which should average out to 5 peers after a couple of trials + assert_eq!(trial(test(0, &authorities)), 5); + + // on the second (and subsequent attempts) we should gossip to all + // authorities we're connected to. 
+ assert_eq!(trial(test(1, &authorities)), 30); + assert_eq!(trial(test(2, &authorities)), 30); + + // we should only gossip to non-authorities after the third attempt + assert_eq!(trial(test(0, &full_nodes)), 0); + assert_eq!(trial(test(1, &full_nodes)), 0); + + // and only to `sqrt(non-authorities)` + assert_eq!(trial(test(2, &full_nodes)), 5); + + // only on the fourth attempt should we gossip to all non-authorities + assert_eq!(trial(test(3, &full_nodes)), 30); + } + + #[test] + fn only_restricts_gossip_to_authorities_after_a_minimum_threshold() { let (val, _) = GossipValidator::::new( config(), voter_set_state(), ); - // the validator start at set id 0 - val.note_set(SetId(0), Vec::new(), |_, _| {}); - - let mut authorities = Vec::new(); - for _ in 0..5 { - let peer_id = PeerId::random(); - val.inner.write().peers.new_peer(peer_id.clone(), Roles::AUTHORITY); - authorities.push(peer_id); - } - - let mut message_allowed = val.message_allowed(); - - // since we're only connected to 5 authorities, we should never restrict - // sending of gossip messages, and instead just allow them to all - // non-authorities on the first attempt. 
- for authority in &authorities { - assert!( - message_allowed( - authority, - MessageIntent::Broadcast { previous_attempts: 0 }, - &crate::communication::round_topic::(1, 0), - &[], - ) - ); - } - } - - #[test] - fn non_authorities_never_gossip_messages_on_first_attempt() { - let mut config = config(); - config.is_authority = false; - - let (val, _) = GossipValidator::::new( + // the validator start at set id 0 + val.note_set(SetId(0), Vec::new(), |_, _| {}); + + let mut authorities = Vec::new(); + for _ in 0..5 { + let peer_id = PeerId::random(); + val.inner.write().peers.new_peer(peer_id.clone(), Roles::AUTHORITY); + authorities.push(peer_id); + } + + let mut message_allowed = val.message_allowed(); + + // since we're only connected to 5 authorities, we should never restrict + // sending of gossip messages, and instead just allow them to all + // authorities on the first attempt. + for authority in &authorities { + assert!( + message_allowed( + authority, + MessageIntent::Broadcast { previous_attempts: 0 }, + &crate::communication::round_topic::(1, 0), + &[], + ) + ); + } + } + + #[test] + fn non_authorities_never_gossip_messages_on_first_attempt() { + let mut config = config(); + config.is_authority = false; + + let (val, _) = GossipValidator::::new( config, voter_set_state(), ); - // the validator start at set id 0 - val.note_set(SetId(0), Vec::new(), |_, _| {}); - - let mut authorities = Vec::new(); - for _ in 0..100 { - let peer_id = PeerId::random(); - val.inner.write().peers.new_peer(peer_id.clone(), Roles::AUTHORITY); - authorities.push(peer_id); - } - - let mut message_allowed = val.message_allowed(); - - // since our node is not an authority we should **never** gossip any - // messages on the first attempt.
- for authority in &authorities { - assert!( - !message_allowed( - authority, - MessageIntent::Broadcast { previous_attempts: 0 }, - &crate::communication::round_topic::(1, 0), - &[], - ) - ); - } - - // on the third attempt we should allow messages to authorities - // (on the second attempt we would do `sqrt(authorities)`) - for authority in &authorities { - assert!( - message_allowed( - authority, - MessageIntent::Broadcast { previous_attempts: 2 }, - &crate::communication::round_topic::(1, 0), - &[], - ) - ); - } - } + // the validator start at set id 0 + val.note_set(SetId(0), Vec::new(), |_, _| {}); + + let mut authorities = Vec::new(); + for _ in 0..100 { + let peer_id = PeerId::random(); + val.inner.write().peers.new_peer(peer_id.clone(), Roles::AUTHORITY); + authorities.push(peer_id); + } + + let mut message_allowed = val.message_allowed(); + + // since our node is not an authority we should **never** gossip any + // messages on the first attempt. + for authority in &authorities { + assert!( + !message_allowed( + authority, + MessageIntent::Broadcast { previous_attempts: 0 }, + &crate::communication::round_topic::(1, 0), + &[], + ) + ); + } + + // on the third attempt we should allow messages to authorities + // (on the second attempt we would do `sqrt(authorities)`) + for authority in &authorities { + assert!( + message_allowed( + authority, + MessageIntent::Broadcast { previous_attempts: 2 }, + &crate::communication::round_topic::(1, 0), + &[], + ) + ); + } + } } diff --git a/core/network/src/protocol/consensus_gossip.rs b/core/network/src/protocol/consensus_gossip.rs index 10dae4a18e13e..67e8364abbc2d 100644 --- a/core/network/src/protocol/consensus_gossip.rs +++ b/core/network/src/protocol/consensus_gossip.rs @@ -73,7 +73,7 @@ const UNREGISTERED_TOPIC_REPUTATION_CHANGE: i32 = -(1 << 10); struct PeerConsensus { known_messages: HashSet, - filtered_messages: HashMap, + filtered_messages: HashMap, roles: Roles, } @@ -336,7 +336,7 @@ impl ConsensusGossip { 
trace!(target:"gossip", "Registering {:?} {}", roles, who); self.peers.insert(who.clone(), PeerConsensus { known_messages: HashSet::new(), - filtered_messages: HashMap::new(), + filtered_messages: HashMap::new(), roles, }); for (engine_id, v) in self.validators.clone() { From b17cb0daac9a0d52a471c88fcc66407a7e4177c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 8 Nov 2019 17:12:42 +0000 Subject: [PATCH 9/9] grandpa: relax filtering logic for global messages --- .../src/communication/gossip.rs | 78 ++++++++++++++++--- 1 file changed, 69 insertions(+), 9 deletions(-) diff --git a/core/finality-grandpa/src/communication/gossip.rs b/core/finality-grandpa/src/communication/gossip.rs index 1d9837f16aeee..7758de6afa70d 100644 --- a/core/finality-grandpa/src/communication/gossip.rs +++ b/core/finality-grandpa/src/communication/gossip.rs @@ -990,7 +990,7 @@ impl Inner { (true, report) } - /// The initial logic for filtering messages follows the given state + /// The initial logic for filtering round messages follows the given state /// transitions: /// /// - State 0: not allowed to anyone (only if our local node is not an authority) @@ -1001,7 +1001,7 @@ impl Inner { /// /// Transitions will be triggered on repropagation attempts by the /// underlying gossip layer, which should happen every 30 seconds. - fn message_allowed(&self, peer: &PeerInfo, mut previous_attempts: usize) -> bool { + fn round_message_allowed(&self, peer: &PeerInfo, mut previous_attempts: usize) -> bool { const MIN_AUTHORITIES: usize = 5; if !self.config.is_authority && previous_attempts == 0 { @@ -1052,6 +1052,59 @@ impl Inner { } } } + + /// The initial logic for filtering global messages follows the given state + /// transitions: + /// + /// - State 0: send to `sqrt(authorities)` ++ `sqrt(non-authorities)`. 
+ /// - State 1: send to all authorities + /// - State 2: send to all non-authorities + /// + /// We are more lenient with global messages since there should be a lot + /// fewer global messages than round messages (just commits), and we want + /// these to propagate to non-authorities fast enough so that they can + /// observe finality. + /// + /// Transitions will be triggered on repropagation attempts by the + /// underlying gossip layer, which should happen every 30 seconds. + fn global_message_allowed(&self, peer: &PeerInfo, previous_attempts: usize) -> bool { + const MIN_PEERS: usize = 5; + + if peer.roles.is_authority() { + let authorities = self.peers.authorities(); + + // the target node is an authority, on the first attempt we start by + // sending the message to only `sqrt(authorities)` (if we're + // connected to more than `MIN_PEERS`). + if previous_attempts == 0 && authorities > MIN_PEERS { + let authorities = authorities as f64; + let p = (authorities.sqrt()).max(MIN_PEERS as f64) / authorities; + rand::thread_rng().gen_bool(p) + } else { + // otherwise we already went through the step above, so + // we won't filter the message and send it to all + // authorities for whom it is polite to do so + true + } + } else { + let non_authorities = self.peers.non_authorities(); + + // the target node is not an authority, on the first and second + // attempt we start by sending the message to only + // `sqrt(non_authorities)` (if we're connected to more than + // `MIN_PEERS`). + if previous_attempts <= 1 && non_authorities > MIN_PEERS { + let non_authorities = non_authorities as f64; + let p = (non_authorities.sqrt()).max(MIN_PEERS as f64) / non_authorities ; + rand::thread_rng().gen_bool(p) + } else { + // otherwise we already went through the step above, so + // we won't filter the message and send it to all + // non-authorities for whom it is polite to do so + true + } + } + } } /// A validator for GRANDPA gossip messages.
@@ -1255,13 +1308,6 @@ impl network_gossip::Validator for GossipValidator Some(x) => x, }; - if let MessageIntent::Broadcast { previous_attempts } = intent { - // early return if the message isn't allowed at this stage. - if !inner.message_allowed(peer, previous_attempts) { - return false; - } - } - // if the topic is not something we're keeping at the moment, // do not send. let (maybe_round, set_id) = match inner.live_topics.topic_info(&topic) { @@ -1269,6 +1315,20 @@ impl network_gossip::Validator for GossipValidator Some(x) => x, }; + if let MessageIntent::Broadcast { previous_attempts } = intent { + if maybe_round.is_some() { + if !inner.round_message_allowed(peer, previous_attempts) { + // early return if the vote message isn't allowed at this stage. + return false; + } + } else { + if !inner.global_message_allowed(peer, previous_attempts) { + // early return if the global message isn't allowed at this stage. + return false; + } + } + } + // if the topic is not something the peer accepts, discard. if let Some(round) = maybe_round { return peer.view.consider_vote(round, set_id) == Consider::Accept