From cc1ab38c57628d46de60d00c97ca0585766f6a4f Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 11 Feb 2020 16:01:21 +0100 Subject: [PATCH 1/2] client/network-gossip: Merge GossipEngine and GossipEngineInner Given that GossipEngine and GossipEngineInner are not shared between threads anyone (public interface + background tasks), neither depends on being Send or Sync. Thus one can merge the two as done in this patch. One only needs to wrap an `Arc>` around the whole structure when the owner (e.g. finality-grandpa) needs to share the gossip engine between threads. --- Cargo.lock | 1 - client/network-gossip/Cargo.toml | 1 - client/network-gossip/src/bridge.rs | 73 +++++++---------------------- 3 files changed, 17 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 48b14599b1ca9..af754ac1dccef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6275,7 +6275,6 @@ dependencies = [ "libp2p", "log 0.4.8", "lru 0.1.17", - "parking_lot 0.10.0", "sc-network", "sp-runtime", "wasm-timer", diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index 25b0548c840c7..64a27d496d363 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -14,7 +14,6 @@ futures-timer = "3.0.1" libp2p = { version = "0.16.1", default-features = false, features = ["libp2p-websocket"] } log = "0.4.8" lru = "0.1.2" -parking_lot = "0.10.0" sc-network = { version = "0.8.0-dev", path = "../network" } sp-runtime = { version = "2.0.0-dev", path = "../../primitives/runtime" } wasm-timer = "0.2" diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index c911766aba40a..c06cb6268cc80 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -22,18 +22,12 @@ use sc_network::{Event, ReputationChange}; use futures::{prelude::*, channel::mpsc}; use libp2p::PeerId; -use parking_lot::Mutex; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::{borrow::Cow, pin::Pin, sync::Arc, task::{Context, Poll}}; /// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on /// top of it. pub struct GossipEngine { - inner: Arc>>, - engine_id: ConsensusEngineId, -} - -struct GossipEngineInner { state_machine: ConsensusGossip, network: Box + Send>, periodic_maintenance_interval: futures_timer::Delay, @@ -41,7 +35,7 @@ struct GossipEngineInner { engine_id: ConsensusEngineId, } -impl Unpin for GossipEngineInner {} +impl Unpin for GossipEngine {} impl GossipEngine { /// Create a new instance. @@ -60,24 +54,17 @@ impl GossipEngine { network.register_notifications_protocol(engine_id, protocol_name.into()); state_machine.register_validator(&mut network, engine_id, validator); - let inner = Arc::new(Mutex::new(GossipEngineInner { + GossipEngine { state_machine, network: Box::new(network), periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL), network_event_stream, engine_id, - })); - - let gossip_engine = GossipEngine { - inner: inner.clone(), - engine_id, - }; - - gossip_engine + } } pub fn report(&self, who: PeerId, reputation: ReputationChange) { - self.inner.lock().network.report_peer(who, reputation); + self.network.report_peer(who, reputation); } /// Registers a message without propagating it to any peers. The message @@ -86,7 +73,7 @@ impl GossipEngine { /// message is already expired it should be dropped on the next garbage /// collection. pub fn register_gossip_message( - &self, + &mut self, topic: B::Hash, message: Vec, ) { @@ -95,38 +82,34 @@ impl GossipEngine { data: message, }; - self.inner.lock().state_machine.register_message(topic, message); + self.state_machine.register_message(topic, message); } /// Broadcast all messages with given topic. - pub fn broadcast_topic(&self, topic: B::Hash, force: bool) { - let mut inner = self.inner.lock(); - let inner = &mut *inner; - inner.state_machine.broadcast_topic(&mut *inner.network, topic, force); + pub fn broadcast_topic(&mut self, topic: B::Hash, force: bool) { + self.state_machine.broadcast_topic(&mut *self.network, topic, force); } /// Get data of valid, incoming messages for a topic (but might have expired meanwhile). - pub fn messages_for(&self, topic: B::Hash) + pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver { - self.inner.lock().state_machine.messages_for(self.engine_id, topic) + self.state_machine.messages_for(self.engine_id, topic) } /// Send all messages with given topic to a peer. pub fn send_topic( - &self, + &mut self, who: &PeerId, topic: B::Hash, force: bool ) { - let mut inner = self.inner.lock(); - let inner = &mut *inner; - inner.state_machine.send_topic(&mut *inner.network, who, topic, self.engine_id, force) + self.state_machine.send_topic(&mut *self.network, who, topic, self.engine_id, force) } /// Multicast a message to all peers. pub fn gossip_message( - &self, + &mut self, topic: B::Hash, message: Vec, force: bool, @@ -136,19 +119,14 @@ impl GossipEngine { data: message, }; - let mut inner = self.inner.lock(); - let inner = &mut *inner; - inner.state_machine.multicast(&mut *inner.network, topic, message, force) + self.state_machine.multicast(&mut *self.network, topic, message, force) } /// Send addressed message to the given peers. The message is not kept or multicast /// later on. - pub fn send_message(&self, who: Vec, data: Vec) { - let mut inner = self.inner.lock(); - let inner = &mut *inner; - + pub fn send_message(&mut self, who: Vec, data: Vec) { for who in &who { - inner.state_machine.send_message(&mut *inner.network, who, ConsensusMessage { + self.state_machine.send_message(&mut *self.network, who, ConsensusMessage { engine_id: self.engine_id, data: data.clone(), }); @@ -160,21 +138,13 @@ impl GossipEngine { /// Note: this method isn't strictly related to gossiping and should eventually be moved /// somewhere else. pub fn announce(&self, block: B::Hash, associated_data: Vec) { - self.inner.lock().network.announce(block, associated_data); + self.network.announce(block, associated_data); } } impl Future for GossipEngine { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - self.inner.lock().poll_unpin(cx) - } -} - -impl Future for GossipEngineInner { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = &mut *self; @@ -216,12 +186,3 @@ impl Future for GossipEngineInner { Poll::Pending } } - -impl Clone for GossipEngine { - fn clone(&self) -> Self { - GossipEngine { - inner: self.inner.clone(), - engine_id: self.engine_id.clone(), - } - } -} From 9ac9117a91fc9f103eca2495a02c874c19ed1e6f Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 11 Feb 2020 16:02:05 +0100 Subject: [PATCH 2/2] client/finality-grandpa: Wrap GossipEngine in Arc Mutex & lock it on use GossipEngine in itself has no need to be Send and Sync, given that it does not rely on separately spawned background tasks anymore. Given that finality-grandpa shares the `NetworkBridge` potentially between threads its components need to be clonable, thus this patch wraps `GossipEngine` in an `Arc>`. --- .../finality-grandpa/src/communication/mod.rs | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index 1f4e22359712f..7525c44f15e0f 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -142,7 +142,7 @@ pub(crate) fn global_topic(set_id: SetIdNumber) -> B::Hash { /// Bridge between the underlying network service, gossiping consensus messages and Grandpa pub(crate) struct NetworkBridge> { service: N, - gossip_engine: GossipEngine, + gossip_engine: Arc>>, validator: Arc>, /// Sender side of the neighbor packet channel. @@ -185,12 +185,12 @@ impl> NetworkBridge { ); let validator = Arc::new(validator); - let gossip_engine = GossipEngine::new( + let gossip_engine = Arc::new(Mutex::new(GossipEngine::new( service.clone(), GRANDPA_ENGINE_ID, GRANDPA_PROTOCOL_NAME, validator.clone() - ); + ))); { // register all previous votes with the gossip service so that they're @@ -214,7 +214,7 @@ impl> NetworkBridge { } ); - gossip_engine.register_gossip_message( + gossip_engine.lock().register_gossip_message( topic, message.encode(), ); @@ -293,7 +293,7 @@ impl> NetworkBridge { }); let topic = round_topic::(round.0, set_id.0); - let incoming = self.gossip_engine.messages_for(topic) + let incoming = self.gossip_engine.lock().messages_for(topic) .filter_map(move |notification| { let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); @@ -422,11 +422,11 @@ impl> NetworkBridge { impl> Future for NetworkBridge { type Output = Result<(), Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { loop { match self.neighbor_packet_worker.lock().poll_next_unpin(cx) { Poll::Ready(Some((to, packet))) => { - self.gossip_engine.send_message(to, packet.encode()); + self.gossip_engine.lock().send_message(to, packet.encode()); }, Poll::Ready(None) => return Poll::Ready( Err(Error::Network("Neighbor packet worker stream closed.".into())) @@ -438,7 +438,7 @@ impl> Future for NetworkBridge { loop { match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) { Poll::Ready(Some(PeerReport { who, cost_benefit })) => { - self.gossip_engine.report(who, cost_benefit); + self.gossip_engine.lock().report(who, cost_benefit); }, Poll::Ready(None) => return Poll::Ready( Err(Error::Network("Gossip validator report stream closed.".into())) @@ -447,7 +447,7 @@ impl> Future for NetworkBridge { } } - match self.gossip_engine.poll_unpin(cx) { + match self.gossip_engine.lock().poll_unpin(cx) { // The gossip engine future finished. We should do the same. Poll::Ready(()) => return Poll::Ready(Ok(())), Poll::Pending => {}, @@ -458,7 +458,7 @@ impl> Future for NetworkBridge { } fn incoming_global( - mut gossip_engine: GossipEngine, + gossip_engine: Arc>>, topic: B::Hash, voters: Arc>, gossip_validator: Arc>, @@ -467,7 +467,7 @@ fn incoming_global( let process_commit = move | msg: FullCommitMessage, mut notification: sc_network_gossip::TopicNotification, - gossip_engine: &mut GossipEngine, + gossip_engine: &Arc>>, gossip_validator: &Arc>, voters: &VoterSet, | { @@ -491,7 +491,7 @@ fn incoming_global( msg.set_id, ) { if let Some(who) = notification.sender { - gossip_engine.report(who, cost); + gossip_engine.lock().report(who, cost); } return None; @@ -513,12 +513,12 @@ fn incoming_global( |to, neighbor| neighbor_sender.send(to, neighbor), ); - gossip_engine.gossip_message(topic, notification.message.clone(), false); + gossip_engine.lock().gossip_message(topic, notification.message.clone(), false); } voter::CommitProcessingOutcome::Bad(_) => { // report peer and do not gossip. if let Some(who) = notification.sender.take() { - gossip_engine.report(who, cost::INVALID_COMMIT); + gossip_engine.lock().report(who, cost::INVALID_COMMIT); } } }; @@ -531,7 +531,7 @@ fn incoming_global( let process_catch_up = move | msg: FullCatchUpMessage, mut notification: sc_network_gossip::TopicNotification, - gossip_engine: &mut GossipEngine, + gossip_engine: &Arc>>, gossip_validator: &Arc>, voters: &VoterSet, | { @@ -544,7 +544,7 @@ fn incoming_global( msg.set_id, ) { if let Some(who) = notification.sender { - gossip_engine.report(who, cost); + gossip_engine.lock().report(who, cost); } return None; @@ -554,7 +554,7 @@ fn incoming_global( if let voter::CatchUpProcessingOutcome::Bad(_) = outcome { // report peer if let Some(who) = notification.sender.take() { - gossip_engine.report(who, cost::INVALID_CATCH_UP); + gossip_engine.lock().report(who, cost::INVALID_CATCH_UP); } } @@ -566,7 +566,7 @@ fn incoming_global( Some(voter::CommunicationIn::CatchUp(msg.message, cb)) }; - gossip_engine.messages_for(topic) + gossip_engine.clone().lock().messages_for(topic) .filter_map(|notification| { // this could be optimized by decoding piecewise. let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); @@ -578,9 +578,9 @@ fn incoming_global( .filter_map(move |(notification, msg)| { future::ready(match msg { GossipMessage::Commit(msg) => - process_commit(msg, notification, &mut gossip_engine, &gossip_validator, &*voters), + process_commit(msg, notification, &gossip_engine, &gossip_validator, &*voters), GossipMessage::CatchUp(msg) => - process_catch_up(msg, notification, &mut gossip_engine, &gossip_validator, &*voters), + process_catch_up(msg, notification, &gossip_engine, &gossip_validator, &*voters), _ => { debug!(target: "afg", "Skipping unknown message type"); None @@ -688,7 +688,7 @@ pub(crate) struct OutgoingMessages { set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, sender: mpsc::Sender>, - network: GossipEngine, + network: Arc>>, has_voted: HasVoted, } @@ -754,11 +754,11 @@ impl Sink> for OutgoingMessages ); // announce the block we voted on to our peers. - self.network.announce(target_hash, Vec::new()); + self.network.lock().announce(target_hash, Vec::new()); // propagate the message to peers let topic = round_topic::(self.round, self.set_id); - self.network.gossip_message(topic, message.encode(), false); + self.network.lock().gossip_message(topic, message.encode(), false); // forward the message to the inner sender. return self.sender.start_send(signed).map_err(|e| { @@ -959,7 +959,7 @@ fn check_catch_up( /// An output sink for commit messages. struct CommitsOut { - network: GossipEngine, + network: Arc>>, set_id: SetId, is_voter: bool, gossip_validator: Arc>, @@ -969,7 +969,7 @@ struct CommitsOut { impl CommitsOut { /// Create a new commit output stream. pub(crate) fn new( - network: GossipEngine, + network: Arc>>, set_id: SetIdNumber, is_voter: bool, gossip_validator: Arc>, @@ -1028,7 +1028,7 @@ impl Sink<(RoundNumber, Commit)> for CommitsOut { commit.target_number, |to, neighbor| self.neighbor_sender.send(to, neighbor), ); - self.network.gossip_message(topic, message.encode(), false); + self.network.lock().gossip_message(topic, message.encode(), false); Ok(()) }