From 799206af1759b8b2a64da63a9c4de859b72a94b1 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 6 Jan 2020 14:36:44 +0100 Subject: [PATCH 1/4] Remove usage of sc_network::Context trait --- client/network-gossip/src/bridge.rs | 72 ++++---------------- client/network-gossip/src/state_machine.rs | 77 ++++++++++------------ 2 files changed, 47 insertions(+), 102 deletions(-) diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index d327f8209236c..1ed843d9960d5 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -17,7 +17,6 @@ use crate::Network; use crate::state_machine::{ConsensusGossip, Validator, TopicNotification}; -use sc_network::Context; use sc_network::message::generic::ConsensusMessage; use sc_network::{Event, ReputationChange}; @@ -36,8 +35,7 @@ pub struct GossipEngine { struct GossipEngineInner { state_machine: ConsensusGossip, - context: Box + Send>, - context_ext: Box + Send>, + network: Box + Send>, } impl GossipEngine { @@ -49,24 +47,17 @@ impl GossipEngine { validator: Arc>, ) -> Self where B: 'static { let mut state_machine = ConsensusGossip::new(); - let mut context = Box::new(ContextOverService { - network: network.clone(), - }); - let context_ext = Box::new(ContextOverService { - network: network.clone(), - }); // We grab the event stream before registering the notifications protocol, otherwise we // might miss events. let event_stream = network.event_stream(); network.register_notifications_protocol(engine_id); - state_machine.register_validator(&mut *context, engine_id, validator); + state_machine.register_validator(&network, engine_id, validator); let inner = Arc::new(Mutex::new(GossipEngineInner { state_machine, - context, - context_ext, + network: Box::new(network), })); let gossip_engine = GossipEngine { @@ -82,7 +73,7 @@ impl GossipEngine { if let Some(inner) = inner.upgrade() { let mut inner = inner.lock(); let inner = &mut *inner; - inner.state_machine.tick(&mut *inner.context); + inner.state_machine.tick(&*inner.network); } else { // We reach this branch if the `Arc` has no reference // left. We can now let the task end. @@ -107,7 +98,7 @@ impl GossipEngine { } let mut inner = inner.lock(); let inner = &mut *inner; - inner.state_machine.new_peer(&mut *inner.context, remote, roles); + inner.state_machine.new_peer(&*inner.network, remote, roles); } Event::NotificationsStreamClosed { remote, engine_id: msg_engine_id } => { if msg_engine_id != engine_id { @@ -115,13 +106,13 @@ impl GossipEngine { } let mut inner = inner.lock(); let inner = &mut *inner; - inner.state_machine.peer_disconnected(&mut *inner.context, remote); + inner.state_machine.peer_disconnected(&*inner.network, remote); }, Event::NotificationsReceived { remote, messages } => { let mut inner = inner.lock(); let inner = &mut *inner; inner.state_machine.on_incoming( - &mut *inner.context, + &*inner.network, remote, messages.into_iter() .filter_map(|(engine, data)| if engine == engine_id { @@ -149,7 +140,7 @@ impl GossipEngine { } pub fn report(&self, who: PeerId, reputation: ReputationChange) { - self.inner.lock().context.report_peer(who, reputation); + self.inner.lock().network.report_peer(who, reputation); } /// Registers a message without propagating it to any peers. The message @@ -174,7 +165,7 @@ impl GossipEngine { pub fn broadcast_topic(&self, topic: B::Hash, force: bool) { let mut inner = self.inner.lock(); let inner = &mut *inner; - inner.state_machine.broadcast_topic(&mut *inner.context, topic, force); + inner.state_machine.broadcast_topic(&*inner.network, topic, force); } /// Get data of valid, incoming messages for a topic (but might have expired meanwhile). @@ -193,7 +184,7 @@ impl GossipEngine { ) { let mut inner = self.inner.lock(); let inner = &mut *inner; - inner.state_machine.send_topic(&mut *inner.context, who, topic, self.engine_id, force) + inner.state_machine.send_topic(&*inner.network, who, topic, self.engine_id, force) } /// Multicast a message to all peers. @@ -210,7 +201,7 @@ impl GossipEngine { let mut inner = self.inner.lock(); let inner = &mut *inner; - inner.state_machine.multicast(&mut *inner.context, topic, message, force) + inner.state_machine.multicast(&*inner.network, topic, message, force) } /// Send addressed message to the given peers. The message is not kept or multicast @@ -220,7 +211,7 @@ impl GossipEngine { let inner = &mut *inner; for who in &who { - inner.state_machine.send_message(&mut *inner.context, who, ConsensusMessage { + inner.state_machine.send_message(&*inner.network, who, ConsensusMessage { engine_id: self.engine_id, data: data.clone(), }); @@ -232,7 +223,7 @@ impl GossipEngine { /// Note: this method isn't strictly related to gossiping and should eventually be moved /// somewhere else. pub fn announce(&self, block: B::Hash, associated_data: Vec) { - self.inner.lock().context_ext.announce(block, associated_data); + self.inner.lock().network.announce(block, associated_data); } } @@ -244,40 +235,3 @@ impl Clone for GossipEngine { } } } - -struct ContextOverService { - network: N, -} - -impl> Context for ContextOverService { - fn report_peer(&mut self, who: PeerId, reputation: ReputationChange) { - self.network.report_peer(who, reputation); - } - - fn disconnect_peer(&mut self, who: PeerId) { - self.network.disconnect_peer(who) - } - - fn send_consensus(&mut self, who: PeerId, messages: Vec) { - for message in messages { - self.network.write_notification(who.clone(), message.engine_id, message.data); - } - } - - fn send_chain_specific(&mut self, _: PeerId, _: Vec) { - log::error!( - target: "sub-libp2p", - "send_chain_specific has been called in a context where it shouldn't" - ); - } -} - -trait ContextExt { - fn announce(&self, block: B::Hash, associated_data: Vec); -} - -impl> ContextExt for ContextOverService { - fn announce(&self, block: B::Hash, associated_data: Vec) { - Network::announce(&self.network, block, associated_data) - } -} diff --git a/client/network-gossip/src/state_machine.rs b/client/network-gossip/src/state_machine.rs index ad7ce068333f1..9cef8eb670f7a 100644 --- a/client/network-gossip/src/state_machine.rs +++ b/client/network-gossip/src/state_machine.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +use crate::Network; + use std::collections::{HashMap, HashSet, hash_map::Entry}; use std::sync::Arc; use std::iter; @@ -25,7 +27,6 @@ use libp2p::PeerId; use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; use sp_runtime::ConsensusEngineId; pub use sc_network::message::generic::{Message, ConsensusMessage}; -use sc_network::Context; use sc_network::config::Roles; // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115 @@ -109,20 +110,20 @@ pub trait ValidatorContext { struct NetworkContext<'g, 'p, B: BlockT> { gossip: &'g mut ConsensusGossip, - protocol: &'p mut dyn Context, + network: &'p dyn Network, engine_id: ConsensusEngineId, } impl<'g, 'p, B: BlockT> ValidatorContext for NetworkContext<'g, 'p, B> { /// Broadcast all messages with given topic to peers that do not have it yet. fn broadcast_topic(&mut self, topic: B::Hash, force: bool) { - self.gossip.broadcast_topic(self.protocol, topic, force); + self.gossip.broadcast_topic(self.network, topic, force); } /// Broadcast a message to all peers that have not received it previously. fn broadcast_message(&mut self, topic: B::Hash, message: Vec, force: bool) { self.gossip.multicast( - self.protocol, + self.network, topic, ConsensusMessage{ data: message, engine_id: self.engine_id.clone() }, force, @@ -131,20 +132,17 @@ impl<'g, 'p, B: BlockT> ValidatorContext for NetworkContext<'g, 'p, B> { /// Send addressed message to a peer. fn send_message(&mut self, who: &PeerId, message: Vec) { - self.protocol.send_consensus(who.clone(), vec![ConsensusMessage { - engine_id: self.engine_id, - data: message, - }]); + self.network.write_notification(who.clone(), self.engine_id, message); } /// Send all messages with given topic to a peer. fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) { - self.gossip.send_topic(self.protocol, who, topic, self.engine_id, force); + self.gossip.send_topic(self.network, who, topic, self.engine_id, force); } } fn propagate<'a, B: BlockT, I>( - protocol: &mut dyn Context, + network: &dyn Network, messages: I, intent: MessageIntent, peers: &mut HashMap>, @@ -168,7 +166,6 @@ fn propagate<'a, B: BlockT, I>( }; for (id, ref mut peer) in peers.iter_mut() { - let mut batch = Vec::new(); for (message_hash, topic, message) in messages.clone() { let intent = match intent { MessageIntent::Broadcast { .. } => @@ -195,9 +192,8 @@ fn propagate<'a, B: BlockT, I>( peer.known_messages.insert(message_hash.clone()); trace!(target: "gossip", "Propagating to {}: {:?}", id, message); - batch.push(message.clone()) + network.write_notification(id.clone(), message.engine_id, message.data.clone()); } - protocol.send_consensus(id.clone(), batch); } } @@ -261,14 +257,14 @@ impl ConsensusGossip { /// Register message validator for a message type. pub fn register_validator( &mut self, - protocol: &mut dyn Context, + network: &dyn Network, engine_id: ConsensusEngineId, validator: Arc> ) { self.register_validator_internal(engine_id, validator.clone()); let peers: Vec<_> = self.peers.iter().map(|(id, peer)| (id.clone(), peer.roles)).collect(); for (id, roles) in peers { - let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() }; + let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() }; validator.new_peer(&mut context, &id, roles); } } @@ -278,7 +274,7 @@ impl ConsensusGossip { } /// Handle new connected peer. - pub fn new_peer(&mut self, protocol: &mut dyn Context, who: PeerId, roles: Roles) { + pub fn new_peer(&mut self, network: &dyn Network, who: PeerId, roles: Roles) { // light nodes are not valid targets for consensus gossip messages if !roles.is_full() { return; @@ -290,7 +286,7 @@ impl ConsensusGossip { roles, }); for (engine_id, v) in self.validators.clone() { - let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() }; + let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() }; v.new_peer(&mut context, &who, roles); } } @@ -327,37 +323,37 @@ impl ConsensusGossip { } /// Call when a peer has been disconnected to stop tracking gossip status. - pub fn peer_disconnected(&mut self, protocol: &mut dyn Context, who: PeerId) { + pub fn peer_disconnected(&mut self, network: &dyn Network, who: PeerId) { for (engine_id, v) in self.validators.clone() { - let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() }; + let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() }; v.peer_disconnected(&mut context, &who); } } /// Perform periodic maintenance - pub fn tick(&mut self, protocol: &mut dyn Context) { + pub fn tick(&mut self, network: &dyn Network) { self.collect_garbage(); if time::Instant::now() >= self.next_broadcast { - self.rebroadcast(protocol); + self.rebroadcast(network); self.next_broadcast = time::Instant::now() + REBROADCAST_INTERVAL; } } /// Rebroadcast all messages to all peers. - fn rebroadcast(&mut self, protocol: &mut dyn Context) { + fn rebroadcast(&mut self, network: &dyn Network) { let messages = self.messages.iter() .map(|entry| (&entry.message_hash, &entry.topic, &entry.message)); - propagate(protocol, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators); + propagate(network, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators); } /// Broadcast all messages with given topic. - pub fn broadcast_topic(&mut self, protocol: &mut dyn Context, topic: B::Hash, force: bool) { + pub fn broadcast_topic(&mut self, network: &dyn Network, topic: B::Hash, force: bool) { let messages = self.messages.iter() .filter_map(|entry| if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None } ); let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() }; - propagate(protocol, messages, intent, &mut self.peers, &self.validators); + propagate(network, messages, intent, &mut self.peers, &self.validators); } /// Prune old or no longer relevant consensus messages. Provide a predicate @@ -425,7 +421,7 @@ impl ConsensusGossip { /// in all other cases. pub fn on_incoming( &mut self, - protocol: &mut dyn Context, + network: &dyn Network, who: PeerId, messages: Vec, ) { @@ -435,7 +431,7 @@ impl ConsensusGossip { if self.known_messages.contains(&message_hash) { trace!(target:"gossip", "Ignored already known message from {}", who); - protocol.report_peer(who.clone(), rep::DUPLICATE_GOSSIP); + network.report_peer(who.clone(), rep::DUPLICATE_GOSSIP); continue; } @@ -444,7 +440,7 @@ impl ConsensusGossip { let validation = self.validators.get(&engine_id) .cloned() .map(|v| { - let mut context = NetworkContext { gossip: self, protocol, engine_id }; + let mut context = NetworkContext { gossip: self, network, engine_id }; v.validate(&mut context, &who, &message.data) }); @@ -454,14 +450,14 @@ impl ConsensusGossip { Some(ValidationResult::Discard) => None, None => { trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who); - protocol.report_peer(who.clone(), rep::UNKNOWN_GOSSIP); - protocol.disconnect_peer(who.clone()); + network.report_peer(who.clone(), rep::UNKNOWN_GOSSIP); + network.disconnect_peer(who.clone()); continue; } }; if let Some((topic, keep)) = validation_result { - protocol.report_peer(who.clone(), rep::GOSSIP_SUCCESS); + network.report_peer(who.clone(), rep::GOSSIP_SUCCESS); if let Some(ref mut peer) = self.peers.get_mut(&who) { peer.known_messages.insert(message_hash); if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) { @@ -484,7 +480,7 @@ impl ConsensusGossip { } } else { trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); - protocol.report_peer(who.clone(), rep::UNREGISTERED_TOPIC); + network.report_peer(who.clone(), rep::UNREGISTERED_TOPIC); } } else { trace!(target:"gossip", "Handled valid one hop message from peer {}", who); @@ -495,7 +491,7 @@ impl ConsensusGossip { /// Send all messages with given topic to a peer. pub fn send_topic( &mut self, - protocol: &mut dyn Context, + network: &dyn Network, who: &PeerId, topic: B::Hash, engine_id: ConsensusEngineId, @@ -508,7 +504,6 @@ impl ConsensusGossip { }; if let Some(ref mut peer) = self.peers.get_mut(who) { - let mut batch = Vec::new(); for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) { let intent = if force { MessageIntent::ForcedBroadcast @@ -527,19 +522,15 @@ impl ConsensusGossip { peer.known_messages.insert(entry.message_hash.clone()); trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message); - batch.push(ConsensusMessage { - engine_id: engine_id.clone(), - data: entry.message.data.clone(), - }); + network.write_notification(who.clone(), engine_id, entry.message.data.clone()); } - protocol.send_consensus(who.clone(), batch); } } /// Multicast a message to all peers. pub fn multicast( &mut self, - protocol: &mut dyn Context, + network: &dyn Network, topic: B::Hash, message: ConsensusMessage, force: bool, @@ -547,14 +538,14 @@ impl ConsensusGossip { let message_hash = HashFor::::hash(&message.data); self.register_message_hashed(message_hash, topic, message.clone(), None); let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() }; - propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators); + propagate(network, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators); } /// Send addressed message to a peer. The message is not kept or multicast /// later on. pub fn send_message( &mut self, - protocol: &mut dyn Context, + network: &dyn Network, who: &PeerId, message: ConsensusMessage, ) { @@ -568,7 +559,7 @@ impl ConsensusGossip { trace!(target: "gossip", "Sending direct to {}: {:?}", who, message); peer.known_messages.insert(message_hash); - protocol.send_consensus(who.clone(), vec![message.clone()]); + network.write_notification(who.clone(), message.engine_id, message.data); } } From 86c634f0b651e0078f88b0eb9686c8a817c318af Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 6 Jan 2020 14:39:54 +0100 Subject: [PATCH 2/4] Remove Context::send_consensus --- client/network/src/protocol.rs | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 1b30da59dec5a..a427535886cf2 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -81,8 +81,6 @@ pub(crate) const MIN_VERSION: u32 = 3; // Maximum allowed entries in `BlockResponse` const MAX_BLOCK_DATA_RESPONSE: u32 = 128; -// Maximum allowed entries in `ConsensusBatch` -const MAX_CONSENSUS_MESSAGES: usize = 256; /// When light node connects to the full node and the full node is behind light node /// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it unuseful /// and disconnect to free connection slot. @@ -328,9 +326,6 @@ pub trait Context { /// Force disconnecting from a peer. Use this when a peer misbehaved. fn disconnect_peer(&mut self, who: PeerId); - /// Send a consensus message to a peer. - fn send_consensus(&mut self, who: PeerId, messages: Vec); - /// Send a chain-specific message to a peer. fn send_chain_specific(&mut self, who: PeerId, message: Vec); } @@ -361,35 +356,6 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, self.behaviour.disconnect_peer(&who) } - fn send_consensus(&mut self, who: PeerId, messages: Vec) { - if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 4) { - let mut batch = Vec::new(); - let len = messages.len(); - for (index, message) in messages.into_iter().enumerate() { - batch.reserve(MAX_CONSENSUS_MESSAGES); - batch.push(message); - if batch.len() == MAX_CONSENSUS_MESSAGES || index == len - 1 { - send_message:: ( - self.behaviour, - &mut self.context_data.stats, - &who, - GenericMessage::ConsensusBatch(std::mem::replace(&mut batch, Vec::new())), - ) - } - } - } else { - // Backwards compatibility - for message in messages { - send_message:: ( - self.behaviour, - &mut self.context_data.stats, - &who, - GenericMessage::Consensus(message) - ) - } - } - } - fn send_chain_specific(&mut self, who: PeerId, message: Vec) { send_message:: ( self.behaviour, From 34b106932472cd858b62d3d28478797d26292fdc Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 6 Jan 2020 14:42:47 +0100 Subject: [PATCH 3/4] Pass &mut dyn Network instead of &dyn Network --- client/network-gossip/src/bridge.rs | 20 +++++++++--------- client/network-gossip/src/state_machine.rs | 24 +++++++++++----------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index 1ed843d9960d5..5e13141f8f508 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -41,7 +41,7 @@ struct GossipEngineInner { impl GossipEngine { /// Create a new instance. pub fn new + Send + Clone + 'static>( - network: N, + mut network: N, executor: &impl futures::task::Spawn, engine_id: ConsensusEngineId, validator: Arc>, @@ -53,7 +53,7 @@ impl GossipEngine { let event_stream = network.event_stream(); network.register_notifications_protocol(engine_id); - state_machine.register_validator(&network, engine_id, validator); + state_machine.register_validator(&mut network, engine_id, validator); let inner = Arc::new(Mutex::new(GossipEngineInner { state_machine, @@ -73,7 +73,7 @@ impl GossipEngine { if let Some(inner) = inner.upgrade() { let mut inner = inner.lock(); let inner = &mut *inner; - inner.state_machine.tick(&*inner.network); + inner.state_machine.tick(&mut *inner.network); } else { // We reach this branch if the `Arc` has no reference // left. We can now let the task end. @@ -98,7 +98,7 @@ impl GossipEngine { } let mut inner = inner.lock(); let inner = &mut *inner; - inner.state_machine.new_peer(&*inner.network, remote, roles); + inner.state_machine.new_peer(&mut *inner.network, remote, roles); } Event::NotificationsStreamClosed { remote, engine_id: msg_engine_id } => { if msg_engine_id != engine_id { @@ -106,13 +106,13 @@ impl GossipEngine { } let mut inner = inner.lock(); let inner = &mut *inner; - inner.state_machine.peer_disconnected(&*inner.network, remote); + inner.state_machine.peer_disconnected(&mut *inner.network, remote); }, Event::NotificationsReceived { remote, messages } => { let mut inner = inner.lock(); let inner = &mut *inner; inner.state_machine.on_incoming( - &*inner.network, + &mut *inner.network, remote, messages.into_iter() .filter_map(|(engine, data)| if engine == engine_id { @@ -165,7 +165,7 @@ impl GossipEngine { pub fn broadcast_topic(&self, topic: B::Hash, force: bool) { let mut inner = self.inner.lock(); let inner = &mut *inner; - inner.state_machine.broadcast_topic(&*inner.network, topic, force); + inner.state_machine.broadcast_topic(&mut *inner.network, topic, force); } /// Get data of valid, incoming messages for a topic (but might have expired meanwhile). @@ -184,7 +184,7 @@ impl GossipEngine { ) { let mut inner = self.inner.lock(); let inner = &mut *inner; - inner.state_machine.send_topic(&*inner.network, who, topic, self.engine_id, force) + inner.state_machine.send_topic(&mut *inner.network, who, topic, self.engine_id, force) } /// Multicast a message to all peers. @@ -201,7 +201,7 @@ impl GossipEngine { let mut inner = self.inner.lock(); let inner = &mut *inner; - inner.state_machine.multicast(&*inner.network, topic, message, force) + inner.state_machine.multicast(&mut *inner.network, topic, message, force) } /// Send addressed message to the given peers. The message is not kept or multicast @@ -211,7 +211,7 @@ impl GossipEngine { let inner = &mut *inner; for who in &who { - inner.state_machine.send_message(&*inner.network, who, ConsensusMessage { + inner.state_machine.send_message(&mut *inner.network, who, ConsensusMessage { engine_id: self.engine_id, data: data.clone(), }); diff --git a/client/network-gossip/src/state_machine.rs b/client/network-gossip/src/state_machine.rs index 9cef8eb670f7a..64f8b7b4d495d 100644 --- a/client/network-gossip/src/state_machine.rs +++ b/client/network-gossip/src/state_machine.rs @@ -110,7 +110,7 @@ pub trait ValidatorContext { struct NetworkContext<'g, 'p, B: BlockT> { gossip: &'g mut ConsensusGossip, - network: &'p dyn Network, + network: &'p mut dyn Network, engine_id: ConsensusEngineId, } @@ -142,7 +142,7 @@ impl<'g, 'p, B: BlockT> ValidatorContext for NetworkContext<'g, 'p, B> { } fn propagate<'a, B: BlockT, I>( - network: &dyn Network, + network: &mut dyn Network, messages: I, intent: MessageIntent, peers: &mut HashMap>, @@ -257,7 +257,7 @@ impl ConsensusGossip { /// Register message validator for a message type. pub fn register_validator( &mut self, - network: &dyn Network, + network: &mut dyn Network, engine_id: ConsensusEngineId, validator: Arc> ) { @@ -274,7 +274,7 @@ impl ConsensusGossip { } /// Handle new connected peer. - pub fn new_peer(&mut self, network: &dyn Network, who: PeerId, roles: Roles) { + pub fn new_peer(&mut self, network: &mut dyn Network, who: PeerId, roles: Roles) { // light nodes are not valid targets for consensus gossip messages if !roles.is_full() { return; @@ -323,7 +323,7 @@ impl ConsensusGossip { } /// Call when a peer has been disconnected to stop tracking gossip status. - pub fn peer_disconnected(&mut self, network: &dyn Network, who: PeerId) { + pub fn peer_disconnected(&mut self, network: &mut dyn Network, who: PeerId) { for (engine_id, v) in self.validators.clone() { let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() }; v.peer_disconnected(&mut context, &who); @@ -331,7 +331,7 @@ impl ConsensusGossip { } /// Perform periodic maintenance - pub fn tick(&mut self, network: &dyn Network) { + pub fn tick(&mut self, network: &mut dyn Network) { self.collect_garbage(); if time::Instant::now() >= self.next_broadcast { self.rebroadcast(network); @@ -340,14 +340,14 @@ impl ConsensusGossip { } /// Rebroadcast all messages to all peers. - fn rebroadcast(&mut self, network: &dyn Network) { + fn rebroadcast(&mut self, network: &mut dyn Network) { let messages = self.messages.iter() .map(|entry| (&entry.message_hash, &entry.topic, &entry.message)); propagate(network, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators); } /// Broadcast all messages with given topic. - pub fn broadcast_topic(&mut self, network: &dyn Network, topic: B::Hash, force: bool) { + pub fn broadcast_topic(&mut self, network: &mut dyn Network, topic: B::Hash, force: bool) { let messages = self.messages.iter() .filter_map(|entry| if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None } @@ -421,7 +421,7 @@ impl ConsensusGossip { /// in all other cases. pub fn on_incoming( &mut self, - network: &dyn Network, + network: &mut dyn Network, who: PeerId, messages: Vec, ) { @@ -491,7 +491,7 @@ impl ConsensusGossip { /// Send all messages with given topic to a peer. pub fn send_topic( &mut self, - network: &dyn Network, + network: &mut dyn Network, who: &PeerId, topic: B::Hash, engine_id: ConsensusEngineId, @@ -530,7 +530,7 @@ impl ConsensusGossip { /// Multicast a message to all peers. pub fn multicast( &mut self, - network: &dyn Network, + network: &mut dyn Network, topic: B::Hash, message: ConsensusMessage, force: bool, @@ -545,7 +545,7 @@ impl ConsensusGossip { /// later on. pub fn send_message( &mut self, - network: &dyn Network, + network: &mut dyn Network, who: &PeerId, message: ConsensusMessage, ) { From b752a57a53222f5df562230e1253edbd02d49b78 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 6 Jan 2020 14:56:06 +0100 Subject: [PATCH 4/4] Move Validator traits and related to separate module --- client/network-gossip/src/bridge.rs | 4 +- client/network-gossip/src/lib.rs | 6 +- client/network-gossip/src/state_machine.rs | 98 +------------------- client/network-gossip/src/validator.rs | 103 +++++++++++++++++++++ 4 files changed, 112 insertions(+), 99 deletions(-) create mode 100644 client/network-gossip/src/validator.rs diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index 5e13141f8f508..5a745961626fe 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::Network; -use crate::state_machine::{ConsensusGossip, Validator, TopicNotification}; +use crate::{Network, Validator}; +use crate::state_machine::{ConsensusGossip, TopicNotification}; use sc_network::message::generic::ConsensusMessage; use sc_network::{Event, ReputationChange}; diff --git a/client/network-gossip/src/lib.rs b/client/network-gossip/src/lib.rs index bc28bbb8090bf..364f0df3135ed 100644 --- a/client/network-gossip/src/lib.rs +++ b/client/network-gossip/src/lib.rs @@ -55,9 +55,8 @@ //! used to inform peers of a current view of protocol state. pub use self::bridge::GossipEngine; -pub use self::state_machine::{TopicNotification, MessageIntent}; -pub use self::state_machine::{Validator, ValidatorContext, ValidationResult}; -pub use self::state_machine::DiscardAll; +pub use self::state_machine::TopicNotification; +pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext, ValidationResult}; use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange}; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; @@ -65,6 +64,7 @@ use std::sync::Arc; mod bridge; mod state_machine; +mod validator; /// Abstraction over a network. pub trait Network { diff --git a/client/network-gossip/src/state_machine.rs b/client/network-gossip/src/state_machine.rs index 64f8b7b4d495d..42925b77428ea 100644 --- a/client/network-gossip/src/state_machine.rs +++ b/client/network-gossip/src/state_machine.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::Network; +use crate::{Network, MessageIntent, Validator, ValidatorContext, ValidationResult}; use std::collections::{HashMap, HashSet, hash_map::Entry}; use std::sync::Arc; @@ -68,46 +68,7 @@ struct MessageEntry { sender: Option, } -/// The reason for sending out the message. -#[derive(Eq, PartialEq, Copy, Clone)] -#[cfg_attr(test, derive(Debug))] -pub enum MessageIntent { - /// Requested broadcast. - Broadcast, - /// Requested broadcast to all peers. - ForcedBroadcast, - /// Periodic rebroadcast of all messages to all peers. - PeriodicRebroadcast, -} - -/// Message validation result. -pub enum ValidationResult { - /// Message should be stored and propagated under given topic. - ProcessAndKeep(H), - /// Message should be processed, but not propagated. - ProcessAndDiscard(H), - /// Message should be ignored. - Discard, -} - -impl MessageIntent { - fn broadcast() -> MessageIntent { - MessageIntent::Broadcast - } -} - -/// Validation context. Allows reacting to incoming messages by sending out further messages. -pub trait ValidatorContext { - /// Broadcast all messages with given topic to peers that do not have it yet. - fn broadcast_topic(&mut self, topic: B::Hash, force: bool); - /// Broadcast a message to all peers that have not received it previously. - fn broadcast_message(&mut self, topic: B::Hash, message: Vec, force: bool); - /// Send addressed message to a peer. - fn send_message(&mut self, who: &PeerId, message: Vec); - /// Send all messages with given topic to a peer. - fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool); -} - +/// Local implementation of `ValidatorContext`. struct NetworkContext<'g, 'p, B: BlockT> { gossip: &'g mut ConsensusGossip, network: &'p mut dyn Network, @@ -197,35 +158,6 @@ fn propagate<'a, B: BlockT, I>( } } -/// Validates consensus messages. -pub trait Validator: Send + Sync { - /// New peer is connected. - fn new_peer(&self, _context: &mut dyn ValidatorContext, _who: &PeerId, _roles: Roles) { - } - - /// New connection is dropped. - fn peer_disconnected(&self, _context: &mut dyn ValidatorContext, _who: &PeerId) { - } - - /// Validate consensus message. - fn validate( - &self, - context: &mut dyn ValidatorContext, - sender: &PeerId, - data: &[u8] - ) -> ValidationResult; - - /// Produce a closure for validating messages on a given topic. - fn message_expired<'a>(&'a self) -> Box bool + 'a> { - Box::new(move |_topic, _data| false) - } - - /// Produce a closure for filtering egress messages. - fn message_allowed<'a>(&'a self) -> Box bool + 'a> { - Box::new(move |_who, _intent, _topic, _data| true) - } -} - /// Consensus network protocol handler. Manages statements and candidate requests. pub struct ConsensusGossip { peers: HashMap>, @@ -352,7 +284,7 @@ impl ConsensusGossip { .filter_map(|entry| if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None } ); - let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() }; + let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; propagate(network, messages, intent, &mut self.peers, &self.validators); } @@ -537,7 +469,7 @@ impl ConsensusGossip { ) { let message_hash = HashFor::::hash(&message.data); self.register_message_hashed(message_hash, topic, message.clone(), None); - let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() }; + let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; propagate(network, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators); } @@ -563,28 +495,6 @@ impl ConsensusGossip { } } -/// A gossip message validator that discards all messages. -pub struct DiscardAll; - -impl Validator for DiscardAll { - fn validate( - &self, - _context: &mut dyn ValidatorContext, - _sender: &PeerId, - _data: &[u8], - ) -> ValidationResult { - ValidationResult::Discard - } - - fn message_expired<'a>(&'a self) -> Box bool + 'a> { - Box::new(move |_topic, _data| true) - } - - fn message_allowed<'a>(&'a self) -> Box bool + 'a> { - Box::new(move |_who, _intent, _topic, _data| false) - } -} - #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/client/network-gossip/src/validator.rs b/client/network-gossip/src/validator.rs new file mode 100644 index 0000000000000..74b5307ee9cdc --- /dev/null +++ b/client/network-gossip/src/validator.rs @@ -0,0 +1,103 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use sc_network::{config::Roles, PeerId}; +use sp_runtime::traits::Block as BlockT; + +/// Validates consensus messages. +pub trait Validator: Send + Sync { + /// New peer is connected. + fn new_peer(&self, _context: &mut dyn ValidatorContext, _who: &PeerId, _roles: Roles) { + } + + /// New connection is dropped. + fn peer_disconnected(&self, _context: &mut dyn ValidatorContext, _who: &PeerId) { + } + + /// Validate consensus message. + fn validate( + &self, + context: &mut dyn ValidatorContext, + sender: &PeerId, + data: &[u8] + ) -> ValidationResult; + + /// Produce a closure for validating messages on a given topic. + fn message_expired<'a>(&'a self) -> Box bool + 'a> { + Box::new(move |_topic, _data| false) + } + + /// Produce a closure for filtering egress messages. + fn message_allowed<'a>(&'a self) -> Box bool + 'a> { + Box::new(move |_who, _intent, _topic, _data| true) + } +} + +/// Validation context. Allows reacting to incoming messages by sending out further messages. +pub trait ValidatorContext { + /// Broadcast all messages with given topic to peers that do not have it yet. + fn broadcast_topic(&mut self, topic: B::Hash, force: bool); + /// Broadcast a message to all peers that have not received it previously. + fn broadcast_message(&mut self, topic: B::Hash, message: Vec, force: bool); + /// Send addressed message to a peer. + fn send_message(&mut self, who: &PeerId, message: Vec); + /// Send all messages with given topic to a peer. + fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool); +} + +/// The reason for sending out the message. +#[derive(Eq, PartialEq, Copy, Clone)] +#[cfg_attr(test, derive(Debug))] +pub enum MessageIntent { + /// Requested broadcast. + Broadcast, + /// Requested broadcast to all peers. + ForcedBroadcast, + /// Periodic rebroadcast of all messages to all peers. + PeriodicRebroadcast, +} + +/// Message validation result. +pub enum ValidationResult { + /// Message should be stored and propagated under given topic. + ProcessAndKeep(H), + /// Message should be processed, but not propagated. + ProcessAndDiscard(H), + /// Message should be ignored. + Discard, +} + +/// A gossip message validator that discards all messages. +pub struct DiscardAll; + +impl Validator for DiscardAll { + fn validate( + &self, + _context: &mut dyn ValidatorContext, + _sender: &PeerId, + _data: &[u8], + ) -> ValidationResult { + ValidationResult::Discard + } + + fn message_expired<'a>(&'a self) -> Box bool + 'a> { + Box::new(move |_topic, _data| true) + } + + fn message_allowed<'a>(&'a self) -> Box bool + 'a> { + Box::new(move |_who, _intent, _topic, _data| false) + } +}