From b262f179cfb6364f7cd2967855731d287b3b099b Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Thu, 28 Feb 2019 19:26:52 +0100 Subject: [PATCH] Revert "Various gossip improvements (#1894)" This reverts commit 214056cf19938f67005940e078b486f284e13f95. --- core/finality-grandpa/src/lib.rs | 61 ++++----- core/finality-grandpa/src/tests.rs | 10 +- core/network/src/consensus_gossip.rs | 183 +++++++-------------------- core/network/src/test/mod.rs | 3 +- 4 files changed, 69 insertions(+), 188 deletions(-) diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index 0dd4662e11eed..1394f210fa172 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -281,30 +281,6 @@ struct TopicTracker { set_id: u64, } -impl TopicTracker { - fn is_expired(&self, round: u64, set_id: u64) -> bool { - if set_id < self.set_id { - trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, self.set_id); - return true; - } else if set_id == self.set_id + 1 { - // allow a few first rounds of future set. - if round > MESSAGE_ROUND_TOLERANCE { - trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, self.set_id); - return true; - } - } else if set_id == self.set_id { - if round < self.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) { - trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, self.min_live_round, self.max_round); - return true; - } - } else { - trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, self.set_id); - return true; - } - false - } -} - struct GossipValidator { rounds: parking_lot::RwLock, _marker: ::std::marker::PhantomData, @@ -346,7 +322,26 @@ impl GossipValidator { } fn is_expired(&self, round: u64, set_id: u64) -> bool { - self.rounds.read().is_expired(round, set_id) + let rounds = self.rounds.read(); + if set_id < rounds.set_id { + trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, rounds.set_id); + return true; + } else if set_id == rounds.set_id + 1 { + // allow a few first rounds of future set. + if round > MESSAGE_ROUND_TOLERANCE { + trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, rounds.set_id); + return true; + } + } else if set_id == rounds.set_id { + if round < rounds.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) { + trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, rounds.min_live_round, rounds.max_round); + return true; + } + } else { + trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, rounds.set_id); + return true; + } + false } fn validate_round_message(&self, full: VoteOrPrecommitMessage) @@ -414,18 +409,6 @@ impl network_gossip::Validator for GossipValidator(&'a self) -> Box bool + 'a> { - let rounds = self.rounds.read(); - Box::new(move |_topic, mut data| { - match GossipMessage::::decode(&mut data) { - None => true, - Some(GossipMessage::Commit(full)) => rounds.is_expired(full.round, full.set_id), - Some(GossipMessage::VoteOrPrecommit(full)) => - rounds.is_expired(full.round, full.set_id), - } - }) - } } /// A handle to the network. This is generally implemented by providing some @@ -501,7 +484,7 @@ impl,> Network(round, set_id)); + let inner_rx = gossip.messages_for(message_topic::(round, set_id)); let _ = tx.send(inner_rx); }); NetworkStream { outer: rx, inner: None } @@ -526,7 +509,7 @@ impl,> Network(set_id)); + let inner_rx = gossip.messages_for(commit_topic::(set_id)); let _ = tx.send(inner_rx); }); NetworkStream { outer: rx, inner: None } diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 2cca70f812711..470640ae826fa 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -182,10 +182,7 @@ impl Network for MessageRouting { self.validator.note_round(round, set_id); let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); - let messages = peer.consensus_gossip_messages_for( - GRANDPA_ENGINE_ID, - make_topic(round, set_id), - ); + let messages = peer.consensus_gossip_messages_for(make_topic(round, set_id)); let messages = messages.map_err( move |_| panic!("Messages for round {} dropped too early", round) @@ -215,10 +212,7 @@ impl Network for MessageRouting { self.validator.note_set(set_id); let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); - let messages = peer.consensus_gossip_messages_for( - GRANDPA_ENGINE_ID, - make_commit_topic(set_id), - ); + let messages = peer.consensus_gossip_messages_for(make_commit_topic(set_id)); let messages = messages.map_err( move |_| panic!("Commit messages for set {} dropped too early", set_id) diff --git a/core/network/src/consensus_gossip.rs b/core/network/src/consensus_gossip.rs index 8ed6f8ba58f75..720cf55a67b75 100644 --- a/core/network/src/consensus_gossip.rs +++ b/core/network/src/consensus_gossip.rs @@ -40,26 +40,17 @@ struct PeerConsensus { is_authority: bool, } -#[derive(Clone, Copy)] -enum Status { - Live, - Future, -} - struct MessageEntry { message_hash: B::Hash, topic: B::Hash, message: ConsensusMessage, timestamp: Instant, - status: Status, } /// Message validation result. pub enum ValidationResult { /// Message is valid with this topic. Valid(H), - /// Message is future with this topic. - Future(H), /// Invalid message. Invalid, /// Obsolete message. @@ -70,20 +61,12 @@ pub enum ValidationResult { pub trait Validator { /// Validate consensus message. fn validate(&self, data: &[u8]) -> ValidationResult; - - /// Produce a closure for validating messages on a given topic. - fn message_expired<'a>(&'a self) -> Box bool + 'a> { - Box::new(move |_topic, data| match self.validate(data) { - ValidationResult::Valid(_) | ValidationResult::Future(_) => false, - ValidationResult::Invalid | ValidationResult::Expired => true, - }) - } } /// Consensus network protocol handler. Manages statements and candidate requests. pub struct ConsensusGossip { peers: HashMap>, - live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec>>>, + live_message_sinks: HashMap>>>, messages: Vec>, known_messages: LruCache, validators: HashMap>>, @@ -119,9 +102,7 @@ impl ConsensusGossip { // Send out all known messages to authorities. let mut known_messages = HashSet::new(); for entry in self.messages.iter() { - if entry.timestamp + MESSAGE_LIFETIME < now { continue } - if let Status::Future = entry.status { continue } - + if entry.timestamp + MESSAGE_LIFETIME < now { continue }; known_messages.insert(entry.message_hash); protocol.send_message(who, Message::Consensus(entry.message.clone())); } @@ -180,23 +161,18 @@ impl ConsensusGossip { } } - fn register_message( - &mut self, - message_hash: B::Hash, - topic: B::Hash, - status: Status, - get_message: F, - ) + fn register_message(&mut self, message_hash: B::Hash, topic: B::Hash, get_message: F) where F: Fn() -> ConsensusMessage { - if self.known_messages.insert(message_hash, ()).is_none() { + if self.known_messages.insert(message_hash, ()).is_none() + { self.messages.push(MessageEntry { topic, message_hash, message: get_message(), timestamp: Instant::now(), - status, }); + } } @@ -208,8 +184,6 @@ impl ConsensusGossip { /// Prune old or no longer relevant consensus messages. Provide a predicate /// for pruning, which returns `false` when the items with a given topic should be pruned. pub fn collect_garbage(&mut self) { - use std::collections::hash_map::Entry; - self.live_message_sinks.retain(|_, sinks| { sinks.retain(|sink| !sink.is_closed()); !sinks.is_empty() @@ -220,23 +194,15 @@ impl ConsensusGossip { let validators = &self.validators; let now = Instant::now(); - let mut check_fns = HashMap::new(); - let mut message_expired = move |entry: &MessageEntry| { - let engine_id = entry.message.engine_id; - let check_fn = match check_fns.entry(engine_id) { - Entry::Occupied(entry) => entry.into_mut(), - Entry::Vacant(vacant) => match validators.get(&engine_id) { - None => return true, // treat all messages with no validator as expired - Some(validator) => vacant.insert(validator.message_expired()), - } - }; - - (check_fn)(entry.topic, &entry.message.data) - }; - - self.messages.retain(|entry| - entry.timestamp + MESSAGE_LIFETIME >= now && !message_expired(entry) - ); + self.messages.retain(|entry| { + entry.timestamp + MESSAGE_LIFETIME >= now + && match validators.get(&entry.message.engine_id) + .map(|v| v.validate(&entry.message.data)) + { + Some(ValidationResult::Valid(_)) => true, + _ => false, + } + }); trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)", before - self.messages.len(), @@ -250,46 +216,12 @@ impl ConsensusGossip { } /// Get data of valid, incoming messages for a topic (but might have expired meanwhile) - pub fn messages_for(&mut self, engine_id: ConsensusEngineId, topic: B::Hash) - -> mpsc::UnboundedReceiver> - { + pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver> { let (tx, rx) = mpsc::unbounded(); - - let validator = match self.validators.get(&engine_id) { - None => { - self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx); - return rx; - } - Some(v) => v, - }; - - for entry in self.messages.iter_mut() - .filter(|e| e.topic == topic && e.message.engine_id == engine_id) - { - let live = match entry.status { - Status::Live => true, - Status::Future => match validator.validate(&entry.message.data) { - ValidationResult::Valid(_) => { - entry.status = Status::Live; - true - } - _ => { - // don't send messages considered to be future still. - // if messages are considered expired they'll be cleaned up when we - // collect garbage. - false - } - } - }; - - if live { - entry.status = Status::Live; - tx.unbounded_send(entry.message.data.clone()) - .expect("receiver known to be live; qed"); - } + for entry in self.messages.iter().filter(|e| e.topic == topic) { + tx.unbounded_send(entry.message.data.clone()).expect("receiver known to be live; qed"); } - - self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx); + self.live_message_sinks.entry(topic).or_default().push(tx); rx } @@ -315,13 +247,11 @@ impl ConsensusGossip { if let Some(ref mut peer) = self.peers.get_mut(&who) { use std::collections::hash_map::Entry; - let engine_id = message.engine_id; //validate the message - let (topic, status) = match self.validators.get(&engine_id) + let topic = match self.validators.get(&message.engine_id) .map(|v| v.validate(&message.data)) { - Some(ValidationResult::Valid(topic)) => (topic, Status::Live), - Some(ValidationResult::Future(topic)) => (topic, Status::Future), + Some(ValidationResult::Valid(topic)) => topic, Some(ValidationResult::Invalid) => { trace!(target:"gossip", "Invalid message from {}", who); protocol.report_peer( @@ -345,14 +275,13 @@ impl ConsensusGossip { who, Severity::Useless(format!("Sent unknown consensus engine id")), ); - trace!(target:"gossip", "Unknown message engine id {:?} from {}", - engine_id, who); + trace!(target:"gossip", "Unknown message engine id {:?} from {}", message.engine_id, who); return None; } }; peer.known_messages.insert(message_hash); - if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) { + if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) { debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); entry.get_mut().retain(|sink| { if let Err(e) = sink.unbounded_send(message.data.clone()) { @@ -364,7 +293,7 @@ impl ConsensusGossip { entry.remove_entry(); } } - self.multicast_inner(protocol, message_hash, topic, status, || message.clone()); + self.multicast_inner(protocol, message_hash, topic, || message.clone()); Some((topic, message)) } else { trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); @@ -380,7 +309,7 @@ impl ConsensusGossip { message: ConsensusMessage, ) { let message_hash = HashFor::::hash(&message.data); - self.multicast_inner(protocol, message_hash, topic, Status::Live, || message.clone()); + self.multicast_inner(protocol, message_hash, topic, || message.clone()); } fn multicast_inner( @@ -388,15 +317,12 @@ impl ConsensusGossip { protocol: &mut Context, message_hash: B::Hash, topic: B::Hash, - status: Status, get_message: F, ) where F: Fn() -> ConsensusMessage { - self.register_message(message_hash, topic, status, &get_message); - if let Status::Live = status { - self.propagate(protocol, message_hash, get_message); - } + self.register_message(message_hash, topic, &get_message); + self.propagate(protocol, message_hash, get_message); } /// Note new consensus session. @@ -409,8 +335,6 @@ impl ConsensusGossip { mod tests { use runtime_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; use std::time::Instant; - use futures::Stream; - use super::*; type Block = RawBlock>; @@ -423,21 +347,21 @@ mod tests { message_hash: $hash, message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]}, timestamp: $now, - status: Status::Live, }); } } } - struct AllowAll; - impl Validator for AllowAll { - fn validate(&self, _data: &[u8]) -> ValidationResult { - ValidationResult::Valid(H256::default()) - } - } - #[test] fn collects_garbage() { + + struct AllowAll; + impl Validator for AllowAll { + fn validate(&self, _data: &[u8]) -> ValidationResult { + ValidationResult::Valid(H256::default()) + } + } + struct AllowOne; impl Validator for AllowOne { fn validate(&self, data: &[u8]) -> ValidationResult { @@ -493,15 +417,14 @@ mod tests { use futures::Stream; let mut consensus = ConsensusGossip::::new(); - consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let message_hash = HashFor::::hash(&message.data); let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, Status::Live, || message.clone()); - let stream = consensus.messages_for([0, 0, 0, 0], topic); + consensus.register_message(message_hash, topic, || message.clone()); + let stream = consensus.messages_for(topic); assert_eq!(stream.wait().next(), Some(Ok(message.data))); } @@ -514,47 +437,29 @@ mod tests { let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; - consensus.register_message(HashFor::::hash(&msg_a.data), topic, Status::Live, || msg_a.clone()); - consensus.register_message(HashFor::::hash(&msg_b.data), topic, Status::Live, || msg_b.clone()); + consensus.register_message(HashFor::::hash(&msg_a.data), topic, || msg_a.clone()); + consensus.register_message(HashFor::::hash(&msg_b.data), topic, || msg_b.clone()); assert_eq!(consensus.messages.len(), 2); } #[test] fn can_keep_multiple_subscribers_per_topic() { + use futures::Stream; + let mut consensus = ConsensusGossip::::new(); - consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let message_hash = HashFor::::hash(&message.data); let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, Status::Live, || message.clone()); + consensus.register_message(message_hash, topic, || message.clone()); - let stream1 = consensus.messages_for([0, 0, 0, 0], topic); - let stream2 = consensus.messages_for([0, 0, 0, 0], topic); + let stream1 = consensus.messages_for(topic); + let stream2 = consensus.messages_for(topic); assert_eq!(stream1.wait().next(), Some(Ok(message.data.clone()))); assert_eq!(stream2.wait().next(), Some(Ok(message.data))); } - - #[test] - fn topics_are_localized_to_engine_id() { - let mut consensus = ConsensusGossip::::new(); - consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); - - let topic = [1; 32].into(); - let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; - let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] }; - - consensus.register_message(HashFor::::hash(&msg_a.data), topic, Status::Live, || msg_a.clone()); - consensus.register_message(HashFor::::hash(&msg_b.data), topic, Status::Live, || msg_b.clone()); - - let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait(); - - assert_eq!(stream.next(), Some(Ok(vec![1, 2, 3]))); - let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic)); - assert_eq!(stream.next(), None); - } } diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index 94cafc0f0555b..f34ceaa4b88e9 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -379,12 +379,11 @@ impl, D> Peer { /// access the underlying consensus gossip handler pub fn consensus_gossip_messages_for( &self, - engine_id: ConsensusEngineId, topic: ::Hash, ) -> mpsc::UnboundedReceiver> { let (tx, rx) = oneshot::channel(); self.with_gossip(move |gossip, _| { - let inner_rx = gossip.messages_for(engine_id, topic); + let inner_rx = gossip.messages_for(topic); let _ = tx.send(inner_rx); }); rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully")