diff --git a/Cargo.lock b/Cargo.lock index cc63cf57f03e5..c9212445877ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1780,6 +1780,11 @@ dependencies = [ "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hashbrown" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "hashbrown" version = "0.6.3" @@ -2775,6 +2780,14 @@ dependencies = [ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "lru" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "hashbrown 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "lru" version = "0.4.3" @@ -5302,6 +5315,7 @@ dependencies = [ "fork-tree 2.0.0", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5310,6 +5324,7 @@ dependencies = [ "sc-client-api 2.0.0", "sc-keystore 2.0.0", "sc-network 2.0.0", + "sc-network-gossip 2.0.0", "sc-network-test 2.0.0", "sc-telemetry 2.0.0", "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5327,8 +5342,6 @@ dependencies = [ "substrate-test-runtime-client 2.0.0", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -5398,6 +5411,21 @@ dependencies = [ "zeroize 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "sc-network-gossip" +version = "2.0.0" +dependencies = [ + "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libp2p 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "lru 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "sc-network 2.0.0", + "sp-runtime 2.0.0", +] + [[package]] name = "sc-network-test" version = "2.0.0" @@ -8233,6 +8261,7 @@ dependencies = [ "checksum hash-db 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d23bd4e7b5eda0d0f3a307e8b381fdc8ba9000f26fbe912250c0a4cc3956364a" "checksum hash256-std-hasher 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)" = "92c171d55b98633f4ed3860808f004099b36c1cc29c42cfc53aa8591b21efcf2" "checksum hashbrown 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3bae29b6653b3412c2e71e9d486db9f9df5d701941d86683005efb9f2d28e3da" +"checksum hashbrown 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e1de41fb8dba9714efd92241565cdff73f78508c95697dd56787d3cba27e2353" "checksum hashbrown 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead" "checksum heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1679e6ea370dee694f91f1dc469bf94cf8f52051d147aec3e1f9497c6fc22461" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" @@ -8317,6 +8346,7 @@ dependencies = [ "checksum lock_api 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e57b3997725d2b60dbec1297f6c2e2957cc383db1cebd6be812163f969c7d586" "checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" "checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +"checksum lru 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "5d8f669d42c72d18514dfca8115689c5f6370a17d980cb5bd777a67f404594c8" "checksum lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0609345ddee5badacf857d4f547e0e5a2e987db77085c24cd887f73573a04237" "checksum mach 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "86dd2487cdfea56def77b88438a2c915fb45113c5319bfe7e14306ca4cd0b0e1" "checksum malloc_size_of_derive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e37c5d4cd9473c5f4c9c111f033f15d4df9bd378fdf615944e360a4f55a05f0b" diff --git a/Cargo.toml b/Cargo.toml index 84f412851556d..4aabe5916f411 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ members = [ "client/keystore", "client/network", "client/network/test", + "client/network-gossip", "client/offchain", "client/peerset", "client/rpc-servers", diff --git a/bin/node-template/src/service.rs b/bin/node-template/src/service.rs index 600ae2c5b2db1..9161e26732888 100644 --- a/bin/node-template/src/service.rs +++ b/bin/node-template/src/service.rs @@ -158,6 +158,7 @@ pub fn new_full(config: Configuration { @@ -170,6 +171,7 @@ pub fn new_full(config: Configuration { @@ -243,6 +244,7 @@ macro_rules! new_full { on_exit: service.on_exit(), telemetry_on_connect: Some(service.telemetry_on_connect_stream()), voting_rule: grandpa::VotingRulesBuilder::default().build(), + executor: service.spawn_task_handle(), }; // the GRANDPA voter task is considered infallible, i.e. // if it fails we take down the service with it. diff --git a/client/finality-grandpa/Cargo.toml b/client/finality-grandpa/Cargo.toml index fb6cb0827a820..3b1d57d2133f6 100644 --- a/client/finality-grandpa/Cargo.toml +++ b/client/finality-grandpa/Cargo.toml @@ -8,10 +8,9 @@ edition = "2018" fork-tree = { path = "../../utils/fork-tree" } futures = "0.1.29" futures03 = { package = "futures", version = "0.3.1", features = ["compat"] } +futures-timer = "2.0.2" log = "0.4.8" parking_lot = "0.9.0" -tokio-executor = "0.1.8" -tokio-timer = "0.2.11" rand = "0.7.2" codec = { package = "parity-scale-codec", version = "1.0.0", features = ["derive"] } sp-runtime = { path = "../../primitives/runtime" } @@ -25,6 +24,7 @@ client = { package = "sc-client", path = "../" } inherents = { package = "sp-inherents", path = "../../primitives/inherents" } sp-blockchain = { path = "../../primitives/blockchain" } network = { package = "sc-network", path = "../network" } +network-gossip = { package = "sc-network-gossip", path = "../network-gossip" } sp-finality-tracker = { path = "../../primitives/finality-tracker" } fg_primitives = { package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" } grandpa = { package = "finality-grandpa", version = "0.10.1", features = ["derive-codec"] } diff --git a/client/finality-grandpa/src/communication/gossip.rs b/client/finality-grandpa/src/communication/gossip.rs index 298d68d58f050..6ed70b66ae6c2 100644 --- a/client/finality-grandpa/src/communication/gossip.rs +++ b/client/finality-grandpa/src/communication/gossip.rs @@ -83,7 +83,7 @@ //! We only send polite messages to peers, use sp_runtime::traits::{NumberFor, Block as BlockT, Zero}; -use network::consensus_gossip::{self as network_gossip, MessageIntent, ValidatorContext}; +use network_gossip::{GossipEngine, MessageIntent, ValidatorContext}; use network::{config::Roles, PeerId, ReputationChange}; use codec::{Encode, Decode}; use fg_primitives::AuthorityId; @@ -1459,29 +1459,26 @@ pub(super) struct ReportStream { impl ReportStream { /// Consume the report stream, converting it into a future that /// handles all reports. - pub(super) fn consume(self, net: N) + pub(super) fn consume(self, net: GossipEngine) -> impl Future + Send + 'static where B: BlockT, - N: super::Network + Send + 'static, { ReportingTask { reports: self.reports, net, - _marker: Default::default(), } } } /// A future for reporting peers. #[must_use = "Futures do nothing unless polled"] -struct ReportingTask { +struct ReportingTask { reports: mpsc::UnboundedReceiver, - net: N, - _marker: std::marker::PhantomData, + net: GossipEngine, } -impl> Future for ReportingTask { +impl Future for ReportingTask { type Item = (); type Error = (); diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index d4a6363d1bd4f..e535f8577641b 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -29,19 +29,17 @@ use std::sync::Arc; -use futures::prelude::*; -use futures::sync::{oneshot, mpsc}; -use futures03::stream::{StreamExt, TryStreamExt}; +use futures::{prelude::*, future::Executor as _, sync::mpsc}; +use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, future::TryFutureExt as _}; use grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use grandpa::{voter, voter_set::VoterSet}; use log::{debug, trace}; -use network::{consensus_gossip as network_gossip, NetworkService, ReputationChange}; -use network_gossip::ConsensusMessage; +use network::ReputationChange; +use network_gossip::{GossipEngine, Network}; use codec::{Encode, Decode}; use primitives::Pair; use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor}; use sc_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO}; -use tokio_executor::Executor; use crate::{ CatchUp, Commit, CommunicationIn, CommunicationOut, CompactCommit, Error, @@ -97,50 +95,6 @@ mod benefit { pub(super) const PER_EQUIVOCATION: i32 = 10; } -/// A handle to the network. This is generally implemented by providing some -/// handle to a gossip service or similar. -/// -/// Intended to be a lightweight handle such as an `Arc`. -pub trait Network: Clone + Send + 'static { - /// A stream of input messages for a topic. - type In: Stream; - - /// Get a stream of messages for a specific gossip topic. - fn messages_for(&self, topic: Block::Hash) -> Self::In; - - /// Register a gossip validator. - fn register_validator(&self, validator: Arc>); - - /// Gossip a message out to all connected peers. - /// - /// Force causes it to be sent to all peers, even if they've seen it already. - /// Only should be used in case of consensus stall. - fn gossip_message(&self, topic: Block::Hash, data: Vec, force: bool); - - /// Register a message with the gossip service, it isn't broadcast right - /// away to any peers, but may be sent to new peers joining or when asked to - /// broadcast the topic. Useful to register previous messages on node - /// startup. - fn register_gossip_message(&self, topic: Block::Hash, data: Vec); - - /// Send a message to a bunch of specific peers, even if they've seen it already. - fn send_message(&self, who: Vec, data: Vec); - - /// Report a peer's cost or benefit after some action. - fn report(&self, who: network::PeerId, cost_benefit: ReputationChange); - - /// Inform peers that a block with given hash should be downloaded. - fn announce(&self, block: Block::Hash, associated_data: Vec); - - /// Notifies the sync service to try and sync the given block from the given - /// peers. - /// - /// If the given vector of peers is empty then the underlying implementation - /// should make a best effort to fetch the block from any peers it is - /// connected to (NOTE: this assumption will change in the future #3629). - fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); -} - /// Create a unique topic for a round and set-id combo. pub(crate) fn round_topic(round: RoundNumber, set_id: SetIdNumber) -> B::Hash { <::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes()) @@ -151,157 +105,32 @@ pub(crate) fn global_topic(set_id: SetIdNumber) -> B::Hash { <::Hashing as HashT>::hash(format!("{}-GLOBAL", set_id).as_bytes()) } -impl Network for Arc> where - B: BlockT, - S: network::specialization::NetworkSpecialization, - H: network::ExHashT, -{ - type In = NetworkStream< - Box + Send + 'static>, - >; - - fn messages_for(&self, topic: B::Hash) -> Self::In { - // Given that one can only communicate with the Substrate network via the `NetworkService` via message-passing, - // and given that methods on the network consensus gossip are not exposed but only reachable by passing a - // closure into `with_gossip` on the `NetworkService` this function needs to make use of the `NetworkStream` - // construction. - // - // We create a oneshot channel and pass the sender within a closure to the network. At some point in the future - // the network passes the message channel back through the oneshot channel. But the consumer of this function - // expects a stream, not a stream within a oneshot. This complexity is abstracted within `NetworkStream`, - // waiting for the oneshot to resolve and from there on acting like a normal message channel. - let (tx, rx) = oneshot::channel(); - self.with_gossip(move |gossip, _| { - let inner_rx: Box + Send> = Box::new(gossip - .messages_for(GRANDPA_ENGINE_ID, topic) - .map(|x| Ok(x)) - .compat() - ); - let _ = tx.send(inner_rx); - }); - NetworkStream::PollingOneshot(rx) - } - - fn register_validator(&self, validator: Arc>) { - self.with_gossip( - move |gossip, context| gossip.register_validator(context, GRANDPA_ENGINE_ID, validator) - ) - } - - fn gossip_message(&self, topic: B::Hash, data: Vec, force: bool) { - let msg = ConsensusMessage { - engine_id: GRANDPA_ENGINE_ID, - data, - }; - - self.with_gossip( - move |gossip, ctx| gossip.multicast(ctx, topic, msg, force) - ) - } - - fn register_gossip_message(&self, topic: B::Hash, data: Vec) { - let msg = ConsensusMessage { - engine_id: GRANDPA_ENGINE_ID, - data, - }; - - self.with_gossip(move |gossip, _| gossip.register_message(topic, msg)) - } - - fn send_message(&self, who: Vec, data: Vec) { - let msg = ConsensusMessage { - engine_id: GRANDPA_ENGINE_ID, - data, - }; - - self.with_gossip(move |gossip, ctx| for who in &who { - gossip.send_message(ctx, who, msg.clone()) - }) - } - - fn report(&self, who: network::PeerId, cost_benefit: ReputationChange) { - self.report_peer(who, cost_benefit) - } - - fn announce(&self, block: B::Hash, associated_data: Vec) { - self.announce_block(block, associated_data) - } - - fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - NetworkService::set_sync_fork_request(self, peers, hash, number) - } -} - -/// A stream used by NetworkBridge in its implementation of Network. Given a oneshot that eventually returns a channel -/// which eventually returns messages, instead of: -/// -/// 1. polling the oneshot until it returns a message channel -/// -/// 2. polling the message channel for messages -/// -/// `NetworkStream` combines the two steps into one, requiring a consumer to only poll `NetworkStream` to retrieve -/// messages directly. -pub enum NetworkStream { - PollingOneshot(oneshot::Receiver), - PollingTopicNotifications(R), -} - -impl Stream for NetworkStream -where - R: Stream, -{ - type Item = R::Item; - type Error = (); - - fn poll(&mut self) -> Poll, Self::Error> { - match self { - NetworkStream::PollingOneshot(oneshot) => { - match oneshot.poll() { - Ok(futures::Async::Ready(mut stream)) => { - let poll_result = stream.poll(); - *self = NetworkStream::PollingTopicNotifications(stream); - poll_result - }, - Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady), - Err(_) => Err(()) - } - }, - NetworkStream::PollingTopicNotifications(stream) => { - stream.poll() - }, - } - } -} - /// Bridge between the underlying network service, gossiping consensus messages and Grandpa -pub(crate) struct NetworkBridge> { - service: N, +pub(crate) struct NetworkBridge { + gossip_engine: GossipEngine, validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, } -impl> NetworkBridge { +impl NetworkBridge { /// Create a new NetworkBridge to the given NetworkService. Returns the service - /// handle and a future that must be polled to completion to finish startup. + /// handle. /// On creation it will register previous rounds' votes with the gossip /// service taken from the VoterSetState. - pub(crate) fn new( + pub(crate) fn new + Clone + Send + 'static>( service: N, config: crate::Config, set_state: crate::environment::SharedVoterSetState, + executor: &impl futures03::task::Spawn, on_exit: impl futures03::Future + Clone + Send + Unpin + 'static, - ) -> ( - Self, - impl Future + Send + 'static, - ) { - + ) -> Self { let (validator, report_stream) = GossipValidator::new( config, set_state.clone(), ); let validator = Arc::new(validator); - service.register_validator(validator.clone()); + let gossip_engine = GossipEngine::new(service, executor, GRANDPA_ENGINE_ID, validator.clone()); { // register all previous votes with the gossip service so that they're @@ -325,7 +154,7 @@ impl> NetworkBridge { } ); - service.register_gossip_message( + gossip_engine.register_gossip_message( topic, message.encode(), ); @@ -341,34 +170,18 @@ impl> NetworkBridge { } } - let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone()); - let reporting_job = report_stream.consume(service.clone()); - - let bridge = NetworkBridge { service, validator, neighbor_sender }; - - let startup_work = futures::future::lazy(move || { - // lazily spawn these jobs onto their own tasks. the lazy future has access - // to tokio globals, which aren't available outside. - let mut executor = tokio_executor::DefaultExecutor::current(); + let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone()); + let reporting_job = report_stream.consume(gossip_engine.clone()); - use futures03::{FutureExt, TryFutureExt}; + let bridge = NetworkBridge { gossip_engine, validator, neighbor_sender }; - let rebroadcast_job = rebroadcast_job - .select(on_exit.clone().map(Ok).compat()) - .then(|_| Ok(())); - - let reporting_job = reporting_job - .select(on_exit.clone().map(Ok).compat()) - .then(|_| Ok(())); - - executor.spawn(Box::new(rebroadcast_job)) - .expect("failed to spawn grandpa rebroadcast job task"); - executor.spawn(Box::new(reporting_job)) - .expect("failed to spawn grandpa reporting job task"); - Ok(()) - }); + let executor = Compat::new(executor); + executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(())))) + .expect("failed to spawn grandpa rebroadcast job task"); + executor.execute(Box::new(reporting_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(())))) + .expect("failed to spawn grandpa reporting job task"); - (bridge, startup_work) + bridge } /// Note the beginning of a new round to the `GossipValidator`. @@ -420,7 +233,8 @@ impl> NetworkBridge { }); let topic = round_topic::(round.0, set_id.0); - let incoming = self.service.messages_for(topic) + let incoming = Compat::new(self.gossip_engine.messages_for(topic) + .map(|item| Ok::<_, ()>(item))) .filter_map(|notification| { let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); if let Err(ref e) = decoded { @@ -473,10 +287,10 @@ impl> NetworkBridge { .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))); let (tx, out_rx) = mpsc::unbounded(); - let outgoing = OutgoingMessages:: { + let outgoing = OutgoingMessages:: { round: round.0, set_id: set_id.0, - network: self.service.clone(), + network: self.gossip_engine.clone(), locals, sender: tx, has_voted, @@ -510,7 +324,7 @@ impl> NetworkBridge { |to, neighbor| self.neighbor_sender.send(to, neighbor), ); - let service = self.service.clone(); + let service = self.gossip_engine.clone(); let topic = global_topic::(set_id.0); let incoming = incoming_global( service, @@ -520,8 +334,8 @@ impl> NetworkBridge { self.neighbor_sender.clone(), ); - let outgoing = CommitsOut::::new( - self.service.clone(), + let outgoing = CommitsOut::::new( + self.gossip_engine.clone(), set_id.0, is_voter, self.validator.clone(), @@ -543,12 +357,12 @@ impl> NetworkBridge { /// should make a best effort to fetch the block from any peers it is /// connected to (NOTE: this assumption will change in the future #3629). pub(crate) fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - self.service.set_sync_fork_request(peers, hash, number) + self.gossip_engine.set_sync_fork_request(peers, hash, number) } } -fn incoming_global>( - mut service: N, +fn incoming_global( + mut gossip_engine: GossipEngine, topic: B::Hash, voters: Arc>, gossip_validator: Arc>, @@ -557,7 +371,7 @@ fn incoming_global>( let process_commit = move | msg: FullCommitMessage, mut notification: network_gossip::TopicNotification, - service: &mut N, + gossip_engine: &mut GossipEngine, gossip_validator: &Arc>, voters: &VoterSet, | { @@ -579,7 +393,7 @@ fn incoming_global>( msg.set_id, ) { if let Some(who) = notification.sender { - service.report(who, cost); + gossip_engine.report(who, cost); } return None; @@ -589,7 +403,7 @@ fn incoming_global>( let commit = msg.message; let finalized_number = commit.target_number; let gossip_validator = gossip_validator.clone(); - let service = service.clone(); + let gossip_engine = gossip_engine.clone(); let neighbor_sender = neighbor_sender.clone(); let cb = move |outcome| match outcome { voter::CommitProcessingOutcome::Good(_) => { @@ -601,12 +415,12 @@ fn incoming_global>( |to, neighbor| neighbor_sender.send(to, neighbor), ); - service.gossip_message(topic, notification.message.clone(), false); + gossip_engine.gossip_message(topic, notification.message.clone(), false); } voter::CommitProcessingOutcome::Bad(_) => { // report peer and do not gossip. if let Some(who) = notification.sender.take() { - service.report(who, cost::INVALID_COMMIT); + gossip_engine.report(who, cost::INVALID_COMMIT); } } }; @@ -619,12 +433,12 @@ fn incoming_global>( let process_catch_up = move | msg: FullCatchUpMessage, mut notification: network_gossip::TopicNotification, - service: &mut N, + gossip_engine: &mut GossipEngine, gossip_validator: &Arc>, voters: &VoterSet, | { let gossip_validator = gossip_validator.clone(); - let service = service.clone(); + let gossip_engine = gossip_engine.clone(); if let Err(cost) = check_catch_up::( &msg.message, @@ -632,7 +446,7 @@ fn incoming_global>( msg.set_id, ) { if let Some(who) = notification.sender { - service.report(who, cost); + gossip_engine.report(who, cost); } return None; @@ -642,7 +456,7 @@ fn incoming_global>( if let voter::CatchUpProcessingOutcome::Bad(_) = outcome { // report peer if let Some(who) = notification.sender.take() { - service.report(who, cost::INVALID_CATCH_UP); + gossip_engine.report(who, cost::INVALID_CATCH_UP); } } @@ -654,7 +468,8 @@ fn incoming_global>( Some(voter::CommunicationIn::CatchUp(msg.message, cb)) }; - service.messages_for(topic) + Compat::new(gossip_engine.messages_for(topic) + .map(|m| Ok::<_, ()>(m))) .filter_map(|notification| { // this could be optimized by decoding piecewise. let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); @@ -666,9 +481,9 @@ fn incoming_global>( .filter_map(move |(notification, msg)| { match msg { GossipMessage::Commit(msg) => - process_commit(msg, notification, &mut service, &gossip_validator, &*voters), + process_commit(msg, notification, &mut gossip_engine, &gossip_validator, &*voters), GossipMessage::CatchUp(msg) => - process_catch_up(msg, notification, &mut service, &gossip_validator, &*voters), + process_catch_up(msg, notification, &mut gossip_engine, &gossip_validator, &*voters), _ => { debug!(target: "afg", "Skipping unknown message type"); return None; @@ -678,10 +493,10 @@ fn incoming_global>( .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) } -impl> Clone for NetworkBridge { +impl Clone for NetworkBridge { fn clone(&self) -> Self { NetworkBridge { - service: self.service.clone(), + gossip_engine: self.gossip_engine.clone(), validator: Arc::clone(&self.validator), neighbor_sender: self.neighbor_sender.clone(), } @@ -725,16 +540,16 @@ pub(crate) fn check_message_sig( /// use the same raw message and key to sign. This is currently true for /// `ed25519` and `BLS` signatures (which we might use in the future), care must /// be taken when switching to different key types. -struct OutgoingMessages> { +struct OutgoingMessages { round: RoundNumber, set_id: SetIdNumber, locals: Option<(AuthorityPair, AuthorityId)>, sender: mpsc::UnboundedSender>, - network: N, + network: GossipEngine, has_voted: HasVoted, } -impl> Sink for OutgoingMessages +impl Sink for OutgoingMessages { type SinkItem = Message; type SinkError = Error; @@ -978,18 +793,18 @@ fn check_catch_up( } /// An output sink for commit messages. -struct CommitsOut> { - network: N, +struct CommitsOut { + network: GossipEngine, set_id: SetId, is_voter: bool, gossip_validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, } -impl> CommitsOut { +impl CommitsOut { /// Create a new commit output stream. pub(crate) fn new( - network: N, + network: GossipEngine, set_id: SetIdNumber, is_voter: bool, gossip_validator: Arc>, @@ -1005,7 +820,7 @@ impl> CommitsOut { } } -impl> Sink for CommitsOut { +impl Sink for CommitsOut { type SinkItem = (RoundNumber, Commit); type SinkError = Error; diff --git a/client/finality-grandpa/src/communication/periodic.rs b/client/finality-grandpa/src/communication/periodic.rs index 7db5fb692e191..3f9cc0dd8eb8c 100644 --- a/client/finality-grandpa/src/communication/periodic.rs +++ b/client/finality-grandpa/src/communication/periodic.rs @@ -21,12 +21,14 @@ use std::time::{Instant, Duration}; use codec::Encode; use futures::prelude::*; use futures::sync::mpsc; +use futures_timer::Delay; +use futures03::future::{FutureExt as _, TryFutureExt as _}; use log::{debug, warn}; -use tokio_timer::Delay; use network::PeerId; +use network_gossip::GossipEngine; use sp_runtime::traits::{NumberFor, Block as BlockT}; -use super::{gossip::{NeighborPacket, GossipMessage}, Network}; +use super::gossip::{NeighborPacket, GossipMessage}; // how often to rebroadcast, if no other const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60); @@ -58,16 +60,15 @@ impl NeighborPacketSender { /// /// It may rebroadcast the last neighbor packet periodically when no /// progress is made. -pub(super) fn neighbor_packet_worker(net: N) -> ( +pub(super) fn neighbor_packet_worker(net: GossipEngine) -> ( impl Future + Send + 'static, NeighborPacketSender, ) where B: BlockT, - N: Network, { let mut last = None; let (tx, mut rx) = mpsc::unbounded::<(Vec, NeighborPacket>)>(); - let mut delay = Delay::new(rebroadcast_instant()); + let mut delay = Delay::new(REBROADCAST_AFTER); let work = futures::future::poll_fn(move || { loop { @@ -88,7 +89,7 @@ pub(super) fn neighbor_packet_worker(net: N) -> ( // has to be done in a loop because it needs to be polled after // re-scheduling. loop { - match delay.poll() { + match (&mut delay).unit_error().compat().poll() { Err(e) => { warn!(target: "afg", "Could not rebroadcast neighbor packets: {:?}", e); delay.reset(rebroadcast_instant()); diff --git a/client/finality-grandpa/src/communication/tests.rs b/client/finality-grandpa/src/communication/tests.rs index 86bbdb8f64f40..6e80291c40d29 100644 --- a/client/finality-grandpa/src/communication/tests.rs +++ b/client/finality-grandpa/src/communication/tests.rs @@ -18,25 +18,23 @@ use futures::sync::mpsc; use futures::prelude::*; -use network::consensus_gossip as network_gossip; +use network::{Event as NetworkEvent, PeerId, config::Roles}; use sc_network_test::{Block, Hash}; use network_gossip::Validator; use tokio::runtime::current_thread; use std::sync::Arc; use keyring::Ed25519Keyring; use codec::Encode; -use sp_runtime::traits::NumberFor; +use sp_runtime::{ConsensusEngineId, traits::NumberFor}; use std::{pin::Pin, task::{Context, Poll}}; use crate::environment::SharedVoterSetState; -use fg_primitives::AuthorityList; +use fg_primitives::{AuthorityList, GRANDPA_ENGINE_ID}; use super::gossip::{self, GossipValidator}; use super::{AuthorityId, VoterSet, Round, SetId}; enum Event { - MessagesFor(Hash, mpsc::UnboundedSender), - RegisterValidator(Arc>), - GossipMessage(Hash, Vec, bool), - SendMessage(Vec, Vec), + EventStream(mpsc::UnboundedSender), + WriteNotification(network::PeerId, Vec), Report(network::PeerId, network::ReputationChange), Announce(Hash), } @@ -46,56 +44,36 @@ struct TestNetwork { sender: mpsc::UnboundedSender, } -impl super::Network for TestNetwork { - type In = mpsc::UnboundedReceiver; - - /// Get a stream of messages for a specific gossip topic. - fn messages_for(&self, topic: Hash) -> Self::In { +impl network_gossip::Network for TestNetwork { + fn event_stream(&self) + -> Box + Send> { let (tx, rx) = mpsc::unbounded(); - let _ = self.sender.unbounded_send(Event::MessagesFor(topic, tx)); - - rx + let _ = self.sender.unbounded_send(Event::EventStream(tx)); + Box::new(rx) } - /// Register a gossip validator. - fn register_validator(&self, validator: Arc>) { - let _ = self.sender.unbounded_send(Event::RegisterValidator(validator)); + fn report_peer(&self, who: network::PeerId, cost_benefit: network::ReputationChange) { + let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit)); } - /// Gossip a message out to all connected peers. - /// - /// Force causes it to be sent to all peers, even if they've seen it already. - /// Only should be used in case of consensus stall. - fn gossip_message(&self, topic: Hash, data: Vec, force: bool) { - let _ = self.sender.unbounded_send(Event::GossipMessage(topic, data, force)); - } + fn disconnect_peer(&self, _: PeerId) {} - /// Send a message to a bunch of specific peers, even if they've seen it already. - fn send_message(&self, who: Vec, data: Vec) { - let _ = self.sender.unbounded_send(Event::SendMessage(who, data)); + fn write_notification(&self, who: PeerId, _: ConsensusEngineId, message: Vec) { + let _ = self.sender.unbounded_send(Event::WriteNotification(who, message)); } - /// Register a message with the gossip service, it isn't broadcast right - /// away to any peers, but may be sent to new peers joining or when asked to - /// broadcast the topic. Useful to register previous messages on node - /// startup. - fn register_gossip_message(&self, _topic: Hash, _data: Vec) { - // NOTE: only required to restore previous state on startup - // not required for tests currently - } + fn register_notifications_protocol(&self, _: ConsensusEngineId) {} - /// Report a peer's cost or benefit after some action. - fn report(&self, who: network::PeerId, cost_benefit: network::ReputationChange) { - let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit)); - } - - /// Inform peers that a block with given hash should be downloaded. fn announce(&self, block: Hash, _associated_data: Vec) { let _ = self.sender.unbounded_send(Event::Announce(block)); } - /// Notify the sync service to try syncing the given chain. - fn set_sync_fork_request(&self, _peers: Vec, _hash: Hash, _number: NumberFor) {} + fn set_sync_fork_request( + &self, + _peers: Vec, + _hash: Hash, + _number: NumberFor, + ) {} } impl network_gossip::ValidatorContext for TestNetwork { @@ -104,14 +82,19 @@ impl network_gossip::ValidatorContext for TestNetwork { fn broadcast_message(&mut self, _: Hash, _: Vec, _: bool) { } fn send_message(&mut self, who: &network::PeerId, data: Vec) { - >::send_message(self, vec![who.clone()], data); + >::write_notification( + self, + who.clone(), + GRANDPA_ENGINE_ID, + data, + ); } fn send_topic(&mut self, _: &network::PeerId, _: Hash, _: bool) { } } struct Tester { - net_handle: super::NetworkBridge, + net_handle: super::NetworkBridge, gossip_validator: Arc>, events: mpsc::UnboundedReceiver, } @@ -165,7 +148,7 @@ fn voter_set_state() -> SharedVoterSetState { } // needs to run in a tokio runtime. -fn make_test_network() -> ( +fn make_test_network(executor: &impl futures03::task::Spawn) -> ( impl Future, TestNetwork, ) { @@ -183,15 +166,16 @@ fn make_test_network() -> ( } } - let (bridge, startup_work) = super::NetworkBridge::new( + let bridge = super::NetworkBridge::new( net.clone(), config(), voter_set_state(), + executor, Exit, ); ( - startup_work.map(move |()| Tester { + futures::future::ok(Tester { gossip_validator: bridge.validator.clone(), net_handle: bridge, events: rx, @@ -261,7 +245,8 @@ fn good_commit_leads_to_relay() { let id = network::PeerId::random(); let global_topic = super::global_topic::(set_id); - let test = make_test_network().0 + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let test = make_test_network(&threads_pool).0 .and_then(move |tester| { // register a peer. tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL); @@ -286,11 +271,15 @@ fn good_commit_leads_to_relay() { // send a message. let sender_id = id.clone(); let send_message = tester.filter_network_events(move |event| match event { - Event::MessagesFor(topic, sender) => { - if topic != global_topic { return false } - let _ = sender.unbounded_send(network_gossip::TopicNotification { - message: commit_to_send.clone(), - sender: Some(sender_id.clone()), + Event::EventStream(sender) => { + let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { + remote: sender_id.clone(), + engine_id: GRANDPA_ENGINE_ID, + roles: Roles::FULL, + }); + let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived { + remote: sender_id.clone(), + messages: vec![(GRANDPA_ENGINE_ID, commit_to_send.clone().into())], }); true @@ -314,12 +303,8 @@ fn good_commit_leads_to_relay() { // a repropagation event coming from the network. send_message.join(handle_commit).and_then(move |(tester, ())| { tester.filter_network_events(move |event| match event { - Event::GossipMessage(topic, data, false) => { - if topic == global_topic && data == encoded_commit { - true - } else { - panic!("Trying to gossip something strange") - } + Event::WriteNotification(_, data) => { + data == encoded_commit } _ => false, }) @@ -328,11 +313,12 @@ fn good_commit_leads_to_relay() { .map(|_| ()) }); - current_thread::block_on_all(test).unwrap(); + current_thread::Runtime::new().unwrap().block_on(test).unwrap(); } #[test] fn bad_commit_leads_to_report() { + env_logger::init(); let private = [Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let public = make_ids(&private[..]); let voter_set = Arc::new(public.iter().cloned().collect::>()); @@ -376,7 +362,8 @@ fn bad_commit_leads_to_report() { let id = network::PeerId::random(); let global_topic = super::global_topic::(set_id); - let test = make_test_network().0 + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let test = make_test_network(&threads_pool).0 .and_then(move |tester| { // register a peer. tester.gossip_validator.new_peer(&mut NoopContext, &id, network::config::Roles::FULL); @@ -401,11 +388,15 @@ fn bad_commit_leads_to_report() { // send a message. let sender_id = id.clone(); let send_message = tester.filter_network_events(move |event| match event { - Event::MessagesFor(topic, sender) => { - if topic != global_topic { return false } - let _ = sender.unbounded_send(network_gossip::TopicNotification { - message: commit_to_send.clone(), - sender: Some(sender_id.clone()), + Event::EventStream(sender) => { + let _ = sender.unbounded_send(NetworkEvent::NotificationStreamOpened { + remote: sender_id.clone(), + engine_id: GRANDPA_ENGINE_ID, + roles: Roles::FULL, + }); + let _ = sender.unbounded_send(NetworkEvent::NotificationsReceived { + remote: sender_id.clone(), + messages: vec![(GRANDPA_ENGINE_ID, commit_to_send.clone().into())], }); true @@ -430,11 +421,7 @@ fn bad_commit_leads_to_report() { send_message.join(handle_commit).and_then(move |(tester, ())| { tester.filter_network_events(move |event| match event { Event::Report(who, cost_benefit) => { - if who == id && cost_benefit == super::cost::INVALID_COMMIT { - true - } else { - panic!("reported unknown peer or unexpected cost"); - } + who == id && cost_benefit == super::cost::INVALID_COMMIT } _ => false, }) @@ -443,14 +430,15 @@ fn bad_commit_leads_to_report() { .map(|_| ()) }); - current_thread::block_on_all(test).unwrap(); + current_thread::Runtime::new().unwrap().block_on(test).unwrap(); } #[test] fn peer_with_higher_view_leads_to_catch_up_request() { let id = network::PeerId::random(); - let (tester, mut net) = make_test_network(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let (tester, mut net) = make_test_network(&threads_pool); let test = tester .and_then(move |tester| { // register a peer with authority role. @@ -477,10 +465,10 @@ fn peer_with_higher_view_leads_to_catch_up_request() { // a catch up request should be sent to the peer for round - 1 tester.filter_network_events(move |event| match event { - Event::SendMessage(peers, message) => { + Event::WriteNotification(peer, message) => { assert_eq!( - peers, - vec![id.clone()], + peer, + id, ); assert_eq!( @@ -501,5 +489,5 @@ fn peer_with_higher_view_leads_to_catch_up_request() { .map(|_| ()) }); - current_thread::block_on_all(test).unwrap(); + current_thread::Runtime::new().unwrap().block_on(test).unwrap(); } diff --git a/client/finality-grandpa/src/environment.rs b/client/finality-grandpa/src/environment.rs index 52e1bea3e72ff..a25266848137d 100644 --- a/client/finality-grandpa/src/environment.rs +++ b/client/finality-grandpa/src/environment.rs @@ -17,12 +17,13 @@ use std::collections::BTreeMap; use std::iter::FromIterator; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use log::{debug, warn, info}; use codec::{Decode, Encode}; use futures::prelude::*; -use tokio_timer::Delay; +use futures03::future::{FutureExt as _, TryFutureExt as _}; +use futures_timer::Delay; use parking_lot::RwLock; use sp_blockchain::{HeaderBackend, Error as ClientError}; @@ -48,7 +49,7 @@ use sp_runtime::traits::{ use sc_telemetry::{telemetry, CONSENSUS_INFO}; use crate::{ - CommandOrError, Commit, Config, Error, Network, Precommit, Prevote, + CommandOrError, Commit, Config, Error, Precommit, Prevote, PrimaryPropose, SignedMessage, NewAuthoritySet, VoterCommand, }; @@ -375,20 +376,20 @@ impl SharedVoterSetState { } /// The environment we run GRANDPA in. -pub(crate) struct Environment, RA, SC, VR> { +pub(crate) struct Environment { pub(crate) client: Arc>, pub(crate) select_chain: SC, pub(crate) voters: Arc>, pub(crate) config: Config, pub(crate) authority_set: SharedAuthoritySet>, pub(crate) consensus_changes: SharedConsensusChanges>, - pub(crate) network: crate::communication::NetworkBridge, + pub(crate) network: crate::communication::NetworkBridge, pub(crate) set_id: SetId, pub(crate) voter_set_state: SharedVoterSetState, pub(crate) voting_rule: VR, } -impl, RA, SC, VR> Environment { +impl Environment { /// Updates the voter set state using the given closure. The write lock is /// held during evaluation of the closure and the environment's voter set /// state is set to its result if successful. @@ -404,15 +405,13 @@ impl, RA, SC, VR> Environment, B, E, N, RA, SC, VR> +impl, B, E, RA, SC, VR> grandpa::Chain> -for Environment +for Environment where Block: 'static, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + 'static, - N::In: 'static, SC: SelectChain + 'static, VR: VotingRule>, RA: Send + Sync, @@ -555,15 +554,13 @@ pub(crate) fn ancestry, E, RA>( Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect()) } -impl, N, RA, SC, VR> +impl, RA, SC, VR> voter::Environment> -for Environment +for Environment where Block: 'static, B: Backend + 'static, E: CallExecutor + 'static + Send + Sync, - N: Network + 'static + Send, - N::In: 'static + Send, RA: 'static + Send + Sync, SC: SelectChain + 'static, VR: VotingRule>, @@ -589,9 +586,8 @@ where &self, round: RoundNumber, ) -> voter::RoundData { - let now = Instant::now(); - let prevote_timer = Delay::new(now + self.config.gossip_duration * 2); - let precommit_timer = Delay::new(now + self.config.gossip_duration * 4); + let prevote_timer = Delay::new(self.config.gossip_duration * 2); + let precommit_timer = Delay::new(self.config.gossip_duration * 4); let local_key = crate::is_voter(&self.voters, &self.config.keystore); @@ -629,8 +625,8 @@ where voter::RoundData { voter_id: local_key.map(|pair| pair.public()), - prevote_timer: Box::new(prevote_timer.map_err(|e| Error::Timer(e).into())), - precommit_timer: Box::new(precommit_timer.map_err(|e| Error::Timer(e).into())), + prevote_timer: Box::new(prevote_timer.map(Ok).compat()), + precommit_timer: Box::new(precommit_timer.map(Ok).compat()), incoming, outgoing, } @@ -904,9 +900,7 @@ where //random between 0-1 seconds. let delay: u64 = thread_rng().gen_range(0, 1000); - Box::new(Delay::new( - Instant::now() + Duration::from_millis(delay) - ).map_err(|e| Error::Timer(e).into())) + Box::new(Delay::new(Duration::from_millis(delay)).map(Ok).compat()) } fn prevote_equivocation( diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index 0890b6db0f193..82c04006127f3 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -73,7 +73,7 @@ use sp_finality_tracker; use grandpa::Error as GrandpaError; use grandpa::{voter, BlockNumberOps, voter_set::VoterSet}; -use std::fmt; +use std::{fmt, io}; use std::sync::Arc; use std::time::Duration; @@ -90,7 +90,7 @@ mod observer; mod until_imported; mod voting_rule; -pub use communication::Network; +pub use network_gossip::Network; pub use finality_proof::FinalityProofProvider; pub use justification::GrandpaJustification; pub use light_import::light_block_import; @@ -230,7 +230,7 @@ pub enum Error { /// An invariant has been violated (e.g. not finalizing pending change blocks in-order) Safety(String), /// A timer failed to fire. - Timer(tokio_timer::Error), + Timer(io::Error), } impl From for Error { @@ -276,9 +276,8 @@ pub(crate) trait BlockSyncRequester { fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); } -impl BlockSyncRequester for NetworkBridge where +impl BlockSyncRequester for NetworkBridge where Block: BlockT, - N: communication::Network, { fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor) { NetworkBridge::set_sync_fork_request(self, peers, hash, number) @@ -447,11 +446,11 @@ where )) } -fn global_communication, B, E, N, RA>( +fn global_communication, B, E, RA>( set_id: SetId, voters: &Arc>, client: &Arc>, - network: &NetworkBridge, + network: &NetworkBridge, keystore: &Option, ) -> ( impl Stream< @@ -465,7 +464,6 @@ fn global_communication, B, E, N, RA>( ) where B: Backend, E: CallExecutor + Send + Sync, - N: Network, RA: Send + Sync, NumberFor: BlockNumberOps, { @@ -523,7 +521,7 @@ fn register_finality_tracker_inherent_data_provider, N, RA, SC, VR, X> { +pub struct GrandpaParams, N, RA, SC, VR, X, Sp> { /// Configuration for the GRANDPA service. pub config: Config, /// A link to the block import worker. @@ -538,24 +536,26 @@ pub struct GrandpaParams, N, RA, SC, VR, X> { pub telemetry_on_connect: Option>, /// A voting rule used to potentially restrict target votes. pub voting_rule: VR, + /// How to spawn background tasks. + pub executor: Sp, } /// Run a GRANDPA voter as a task. Provide configuration and a link to a /// block import worker that has already been instantiated with `block_import`. -pub fn run_grandpa_voter, N, RA, SC, VR, X>( - grandpa_params: GrandpaParams, +pub fn run_grandpa_voter, N, RA, SC, VR, X, Sp>( + grandpa_params: GrandpaParams, ) -> sp_blockchain::Result + Send + 'static> where Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + Send + Sync + 'static, - N::In: Send + 'static, + N: Network + Send + Clone + 'static, SC: SelectChain + 'static, VR: VotingRule> + Clone + 'static, NumberFor: BlockNumberOps, DigestFor: Encode, RA: Send + Sync + 'static, X: futures03::Future + Clone + Send + Unpin + 'static, + Sp: futures03::task::Spawn + 'static, { let GrandpaParams { config, @@ -565,6 +565,7 @@ pub fn run_grandpa_voter, N, RA, SC, VR, X>( on_exit, telemetry_on_connect, voting_rule, + executor, } = grandpa_params; let LinkHalf { @@ -574,10 +575,11 @@ pub fn run_grandpa_voter, N, RA, SC, VR, X>( voter_commands_rx, } = link; - let (network, network_startup) = NetworkBridge::new( + let network = NetworkBridge::new( network, config.clone(), persistent_data.set_state.clone(), + &executor, on_exit.clone(), ); @@ -628,8 +630,6 @@ pub fn run_grandpa_voter, N, RA, SC, VR, X>( telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e); }); - let voter_work = network_startup.and_then(move |()| voter_work); - // Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa. let telemetry_task = telemetry_task .then(|_| futures::future::empty::<(), ()>()); @@ -641,17 +641,15 @@ pub fn run_grandpa_voter, N, RA, SC, VR, X>( /// Future that powers the voter. #[must_use] -struct VoterWork, RA, SC, VR> { +struct VoterWork { voter: Box>> + Send>, - env: Arc>, + env: Arc>, voter_commands_rx: mpsc::UnboundedReceiver>>, } -impl VoterWork +impl VoterWork where Block: BlockT, - N: Network + Sync, - N::In: Send + 'static, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -662,7 +660,7 @@ where fn new( client: Arc>, config: Config, - network: NetworkBridge, + network: NetworkBridge, select_chain: SC, voting_rule: VR, persistent_data: PersistentData, @@ -823,11 +821,9 @@ where } } -impl Future for VoterWork +impl Future for VoterWork where Block: BlockT, - N: Network + Sync, - N::In: Send + 'static, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -878,20 +874,20 @@ where } #[deprecated(since = "1.1.0", note = "Please switch to run_grandpa_voter.")] -pub fn run_grandpa, N, RA, SC, VR, X>( - grandpa_params: GrandpaParams, +pub fn run_grandpa, N, RA, SC, VR, X, Sp>( + grandpa_params: GrandpaParams, ) -> ::sp_blockchain::Result + Send + 'static> where Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + Send + Sync + 'static, - N::In: Send + 'static, + N: Network + Send + Clone + 'static, SC: SelectChain + 'static, NumberFor: BlockNumberOps, DigestFor: Encode, RA: Send + Sync + 'static, VR: VotingRule> + Clone + 'static, X: futures03::Future + Clone + Send + Unpin + 'static, + Sp: futures03::task::Spawn + 'static, { run_grandpa_voter(grandpa_params) } @@ -910,15 +906,17 @@ pub fn setup_disabled_grandpa, RA, N>( B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, RA: Send + Sync + 'static, - N: Network + Send + Sync + 'static, - N::In: Send + 'static, + N: Network + Send + Clone + 'static, { register_finality_tracker_inherent_data_provider( client, inherent_data_providers, )?; - network.register_validator(Arc::new(network::consensus_gossip::DiscardAll)); + // We register the GRANDPA protocol so that we don't consider it an anomaly + // to receive GRANDPA messages on the network. We don't process the + // messages. + network.register_notifications_protocol(communication::GRANDPA_ENGINE_ID); Ok(()) } diff --git a/client/finality-grandpa/src/observer.rs b/client/finality-grandpa/src/observer.rs index cf09ae3b21a34..4681c12753178 100644 --- a/client/finality-grandpa/src/observer.rs +++ b/client/finality-grandpa/src/observer.rs @@ -151,19 +151,20 @@ fn grandpa_observer, RA, S, F>( /// listening for and validating GRANDPA commits instead of following the full /// protocol. Provide configuration and a link to a block import worker that has /// already been instantiated with `block_import`. -pub fn run_grandpa_observer, N, RA, SC>( +pub fn run_grandpa_observer, N, RA, SC, Sp>( config: Config, link: LinkHalf, network: N, on_exit: impl futures03::Future + Clone + Send + Unpin + 'static, + executor: Sp, ) -> ::sp_blockchain::Result + Send + 'static> where B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + Send + Sync + 'static, - N::In: Send + 'static, + N: Network + Send + Clone + 'static, SC: SelectChain + 'static, NumberFor: BlockNumberOps, RA: Send + Sync + 'static, + Sp: futures03::task::Spawn + 'static, { let LinkHalf { client, @@ -172,10 +173,11 @@ pub fn run_grandpa_observer, N, RA, SC>( voter_commands_rx, } = link; - let (network, network_startup) = NetworkBridge::new( + let network = NetworkBridge::new( network, config.clone(), persistent_data.set_state.clone(), + &executor, on_exit.clone(), ); @@ -193,8 +195,6 @@ pub fn run_grandpa_observer, N, RA, SC>( warn!("GRANDPA Observer failed: {:?}", e); }); - let observer_work = network_startup.and_then(move |()| observer_work); - use futures03::{FutureExt, TryFutureExt}; Ok(observer_work.select(on_exit.map(Ok).compat()).map(|_| ()).map_err(|_| ())) @@ -202,20 +202,18 @@ pub fn run_grandpa_observer, N, RA, SC>( /// Future that powers the observer. #[must_use] -struct ObserverWork, N: Network, E, Backend, RA> { +struct ObserverWork, E, Backend, RA> { observer: Box>> + Send>, client: Arc>, - network: NetworkBridge, + network: NetworkBridge, persistent_data: PersistentData, keystore: Option, voter_commands_rx: mpsc::UnboundedReceiver>>, } -impl ObserverWork +impl ObserverWork where B: BlockT, - N: Network, - N::In: Send + 'static, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -223,7 +221,7 @@ where { fn new( client: Arc>, - network: NetworkBridge, + network: NetworkBridge, persistent_data: PersistentData, keystore: Option, voter_commands_rx: mpsc::UnboundedReceiver>>, @@ -327,11 +325,9 @@ where } } -impl Future for ObserverWork +impl Future for ObserverWork where B: BlockT, - N: Network, - N::In: Send + 'static, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index ece9bf167b8d1..a63ce2a7f6a0a 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -22,6 +22,7 @@ use sc_network_test::{Block, DummySpecialization, Hash, TestNetFactory, Peer, Pe use sc_network_test::{PassThroughVerifier}; use network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder}; use parking_lot::Mutex; +use futures_timer::Delay; use futures03::{StreamExt as _, TryStreamExt as _}; use tokio::runtime::current_thread; use keyring::Ed25519Keyring; @@ -338,6 +339,7 @@ fn create_keystore(authority: Ed25519Keyring) -> (KeyStorePtr, tempfile::TempDir // the voters are spawned but before blocking on them. fn run_to_completion_with( runtime: &mut current_thread::Runtime, + threads_pool: &futures03::executor::ThreadPool, blocks: u64, net: Arc>, peers: &[Ed25519Keyring], @@ -405,6 +407,7 @@ fn run_to_completion_with( on_exit: Exit, telemetry_on_connect: None, voting_rule: (), + executor: threads_pool.clone(), }; let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); @@ -427,11 +430,12 @@ fn run_to_completion_with( fn run_to_completion( runtime: &mut current_thread::Runtime, + threads_pool: &futures03::executor::ThreadPool, blocks: u64, net: Arc>, peers: &[Ed25519Keyring] ) -> u64 { - run_to_completion_with(runtime, blocks, net, peers, |_| None) + run_to_completion_with(runtime, threads_pool, blocks, net, peers, |_| None) } fn add_scheduled_change(block: &mut Block, change: ScheduledChange) { @@ -456,6 +460,7 @@ fn add_forced_change( fn finalize_3_voters_no_observers() { let _ = env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let voters = make_ids(peers); @@ -469,7 +474,7 @@ fn finalize_3_voters_no_observers() { } let net = Arc::new(Mutex::new(net)); - run_to_completion(&mut runtime, 20, net.clone(), peers); + run_to_completion(&mut runtime, &threads_pool, 20, net.clone(), peers); // normally there's no justification for finalized blocks assert!( @@ -481,6 +486,7 @@ fn finalize_3_voters_no_observers() { #[test] fn finalize_3_voters_1_full_observer() { let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let voters = make_ids(peers); @@ -499,6 +505,8 @@ fn finalize_3_voters_1_full_observer() { let mut keystore_paths = Vec::new(); + let mut voters = Vec::new(); + for (peer_id, local_key) in all_peers.enumerate() { let (client, net_service, link) = { let net = net.lock(); @@ -539,9 +547,13 @@ fn finalize_3_voters_1_full_observer() { on_exit: Exit, telemetry_on_connect: None, voting_rule: (), + executor: threads_pool.clone(), }; - let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); + voters.push(run_grandpa_voter(grandpa_params).expect("all in order with client and network")); + } + + for voter in voters { runtime.spawn(voter); } @@ -583,6 +595,7 @@ fn transition_3_voters_twice_1_full_observer() { let net = Arc::new(Mutex::new(GrandpaTestNet::new(api, 8))); let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); net.lock().peer(0).push_blocks(1, false); net.lock().block_until_sync(&mut runtime); @@ -687,6 +700,7 @@ fn transition_3_voters_twice_1_full_observer() { assert_eq!(set.pending_changes().count(), 0); }) ); + let grandpa_params = GrandpaParams { config: Config { gossip_duration: TEST_GOSSIP_DURATION, @@ -702,6 +716,7 @@ fn transition_3_voters_twice_1_full_observer() { on_exit: Exit, telemetry_on_connect: None, voting_rule: (), + executor: threads_pool.clone(), }; let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); @@ -720,6 +735,7 @@ fn transition_3_voters_twice_1_full_observer() { #[test] fn justification_is_emitted_when_consensus_data_changes() { let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 3); @@ -728,7 +744,7 @@ fn justification_is_emitted_when_consensus_data_changes() { net.peer(0).push_authorities_change_block(new_authorities); net.block_until_sync(&mut runtime); let net = Arc::new(Mutex::new(net)); - run_to_completion(&mut runtime, 1, net.clone(), peers); + run_to_completion(&mut runtime, &threads_pool, 1, net.clone(), peers); // ... and check that there's justification for block#1 assert!(net.lock().peer(0).client().justification(&BlockId::Number(1)).unwrap().is_some(), @@ -738,6 +754,7 @@ fn justification_is_emitted_when_consensus_data_changes() { #[test] fn justification_is_generated_periodically() { let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let voters = make_ids(peers); @@ -746,7 +763,7 @@ fn justification_is_generated_periodically() { net.block_until_sync(&mut runtime); let net = Arc::new(Mutex::new(net)); - run_to_completion(&mut runtime, 32, net.clone(), peers); + run_to_completion(&mut runtime, &threads_pool, 32, net.clone(), peers); // when block#32 (justification_period) is finalized, justification // is required => generated @@ -777,6 +794,7 @@ fn consensus_changes_works() { #[test] fn sync_justifications_on_change_blocks() { let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); let peers_a = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let peers_b = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob]; let voters = make_ids(peers_b); @@ -808,7 +826,7 @@ fn sync_justifications_on_change_blocks() { } let net = Arc::new(Mutex::new(net)); - run_to_completion(&mut runtime, 25, net.clone(), peers_a); + run_to_completion(&mut runtime, &threads_pool, 25, net.clone(), peers_a); // the first 3 peers are grandpa voters and therefore have already finalized // block 21 and stored a justification @@ -831,6 +849,7 @@ fn sync_justifications_on_change_blocks() { fn finalizes_multiple_pending_changes_in_order() { let _ = env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); let peers_a = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let peers_b = &[Ed25519Keyring::Dave, Ed25519Keyring::Eve, Ed25519Keyring::Ferdie]; @@ -884,13 +903,14 @@ fn finalizes_multiple_pending_changes_in_order() { } let net = Arc::new(Mutex::new(net)); - run_to_completion(&mut runtime, 30, net.clone(), all_peers); + run_to_completion(&mut runtime, &threads_pool, 30, net.clone(), all_peers); } #[test] fn force_change_to_new_set() { let _ = env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); // two of these guys are offline. let genesis_authorities = &[ Ed25519Keyring::Alice, @@ -941,7 +961,7 @@ fn force_change_to_new_set() { // it will only finalize if the forced transition happens. // we add_blocks after the voters are spawned because otherwise // the link-halfs have the wrong AuthoritySet - run_to_completion(&mut runtime, 25, net, peers_a); + run_to_completion(&mut runtime, &threads_pool, 25, net, peers_a); } #[test] @@ -1059,6 +1079,7 @@ fn voter_persists_its_votes() { let _ = env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); // we have two authorities but we'll only be running the voter for alice // we are going to be listening for the prevotes it casts @@ -1097,6 +1118,7 @@ fn voter_persists_its_votes() { net: Arc>, client: PeersClient, keystore: KeyStorePtr, + threads_pool: futures03::executor::ThreadPool, } impl Future for ResettableVoter { @@ -1132,6 +1154,7 @@ fn voter_persists_its_votes() { on_exit: Exit, telemetry_on_connect: None, voting_rule: VotingRulesBuilder::default().build(), + executor: self.threads_pool.clone(), }; let voter = run_grandpa_voter(grandpa_params) @@ -1163,6 +1186,7 @@ fn voter_persists_its_votes() { net: net.clone(), client: client.clone(), keystore, + threads_pool: threads_pool.clone(), }); } @@ -1191,13 +1215,13 @@ fn voter_persists_its_votes() { set_state }; - let (network, routing_work) = communication::NetworkBridge::new( + let network = communication::NetworkBridge::new( net.lock().peers[1].network_service().clone(), config.clone(), set_state, + &threads_pool, Exit, ); - runtime.block_on(routing_work).unwrap(); let (round_rx, round_tx) = network.round_communication( communication::Round(1), @@ -1232,7 +1256,14 @@ fn voter_persists_its_votes() { let net = net.clone(); let voter_tx = voter_tx.clone(); let round_tx = round_tx.clone(); - future::Either::A(tokio_timer::Interval::new_interval(Duration::from_millis(200)) + + let interval = futures03::stream::unfold(Delay::new(Duration::from_millis(200)), |delay| + Box::pin(async move { + delay.await; + Some(((), Delay::new(Duration::from_millis(200)))) + })).map(Ok::<_, ()>).compat(); + + future::Either::A(interval .take_while(move |_| { Ok(net2.lock().peer(1).client().info().chain.best_number != 40) }) @@ -1302,6 +1333,7 @@ fn voter_persists_its_votes() { fn finalize_3_voters_1_light_observer() { let _ = env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); let authorities = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie]; let voters = make_ids(authorities); @@ -1322,7 +1354,7 @@ fn finalize_3_voters_1_light_observer() { .take_while(|n| Ok(n.header.number() < &20)) .collect(); - run_to_completion_with(&mut runtime, 20, net.clone(), authorities, |executor| { + run_to_completion_with(&mut runtime, &threads_pool, 20, net.clone(), authorities, |executor| { executor.spawn( run_grandpa_observer( Config { @@ -1336,6 +1368,7 @@ fn finalize_3_voters_1_light_observer() { link, net.lock().peers[3].network_service().clone(), Exit, + threads_pool.clone(), ).unwrap() ).unwrap(); @@ -1347,6 +1380,7 @@ fn finalize_3_voters_1_light_observer() { fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() { let _ = ::env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); let peers = &[Ed25519Keyring::Alice]; let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 1); @@ -1356,7 +1390,7 @@ fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() { // && instead fetches finality proof for block #1 net.peer(0).push_authorities_change_block(vec![babe_primitives::AuthorityId::from_slice(&[42; 32])]); let net = Arc::new(Mutex::new(net)); - run_to_completion(&mut runtime, 1, net.clone(), peers); + run_to_completion(&mut runtime, &threads_pool, 1, net.clone(), peers); net.lock().block_until_sync(&mut runtime); // check that the block#1 is finalized on light client @@ -1377,6 +1411,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ let _ = ::env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); // two of these guys are offline. let genesis_authorities = if FORCE_CHANGE { @@ -1424,7 +1459,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ net.lock().block_until_sync(&mut runtime); // finalize block #11 on full clients - run_to_completion(&mut runtime, 11, net.clone(), peers_a); + run_to_completion(&mut runtime, &threads_pool, 11, net.clone(), peers_a); // request finalization by light client net.lock().add_light_peer(&GrandpaTestNet::default_config()); @@ -1441,6 +1476,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ fn voter_catches_up_to_latest_round_when_behind() { let _ = env_logger::try_init(); let mut runtime = current_thread::Runtime::new().unwrap(); + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob]; let voters = make_ids(peers); @@ -1468,6 +1504,7 @@ fn voter_catches_up_to_latest_round_when_behind() { on_exit: Exit, telemetry_on_connect: None, voting_rule: (), + executor: threads_pool.clone(), }; Box::new(run_grandpa_voter(grandpa_params).expect("all in order with client and network")) @@ -1555,6 +1592,8 @@ fn grandpa_environment_respects_voting_rules() { use grandpa::Chain; use sc_network_test::TestClient; + let threads_pool = futures03::executor::ThreadPool::new().unwrap(); + let peers = &[Ed25519Keyring::Alice]; let voters = make_ids(peers); @@ -1581,10 +1620,11 @@ fn grandpa_environment_respects_voting_rules() { observer_enabled: true, }; - let (network, _) = NetworkBridge::new( + let network = NetworkBridge::new( network_service.clone(), config.clone(), set_state.clone(), + &threads_pool, Exit, ); diff --git a/client/finality-grandpa/src/until_imported.rs b/client/finality-grandpa/src/until_imported.rs index 7e209e13b8ea2..c843547a7bbbe 100644 --- a/client/finality-grandpa/src/until_imported.rs +++ b/client/finality-grandpa/src/until_imported.rs @@ -32,11 +32,11 @@ use log::{debug, warn}; use client_api::{BlockImportNotification, ImportNotifications}; use futures::prelude::*; use futures::stream::Fuse; +use futures_timer::Delay; use futures03::{StreamExt as _, TryStreamExt as _}; use grandpa::voter; use parking_lot::Mutex; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use tokio_timer::Interval; use std::collections::{HashMap, VecDeque}; use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; @@ -76,7 +76,7 @@ pub(crate) struct UntilImported, ready: VecDeque, - check_pending: Interval, + check_pending: Box + Send>, /// Mapping block hashes to their block number, the point in time it was /// first encountered (Instant) and a list of GRANDPA messages referencing /// the block hash. @@ -104,9 +104,13 @@ impl UntilImported _>(|v| Ok::<_, ()>(v)).compat(); @@ -116,7 +120,7 @@ impl UntilImported panic!("neither should have had error"), Ok(Either::A(_)) => panic!("timeout should have fired first"), @@ -929,7 +933,7 @@ mod tests { // the `until_imported` stream doesn't request the blocks immediately, // but it should request them after a small timeout - let timeout = Delay::new(Instant::now() + Duration::from_secs(60)); + let timeout = Delay::new(Duration::from_secs(60)).unit_error().compat(); let test = assert.select2(timeout).map(|res| match res { Either::A(_) => {}, Either::B(_) => panic!("timed out waiting for block sync request"), diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml new file mode 100644 index 0000000000000..b0fcd1fe72a01 --- /dev/null +++ b/client/network-gossip/Cargo.toml @@ -0,0 +1,18 @@ +[package] +description = "Gossiping for the Substrate network protocol" +name = "sc-network-gossip" +version = "2.0.0" +license = "GPL-3.0" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +log = "0.4.8" +futures01 = { package = "futures", version = "0.1.29" } +futures = { version = "0.3.1", features = ["compat"] } +futures-timer = "0.4.0" +lru = "0.1.2" +libp2p = { version = "0.13.0", default-features = false, features = ["libp2p-websocket"] } +network = { package = "sc-network", path = "../network" } +parking_lot = "0.9.0" +sp-runtime = { path = "../../primitives/runtime" } diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs new file mode 100644 index 0000000000000..28f0e3f9b446c --- /dev/null +++ b/client/network-gossip/src/bridge.rs @@ -0,0 +1,301 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use crate::Network; +use crate::state_machine::{ConsensusGossip, Validator, TopicNotification}; + +use network::Context; +use network::message::generic::ConsensusMessage; +use network::{Event, ReputationChange}; + +use futures::{prelude::*, channel::mpsc, compat::Compat01As03, task::SpawnExt as _}; +use libp2p::PeerId; +use parking_lot::Mutex; +use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; +use std::{sync::Arc, time::Duration}; + +/// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on +/// top of it. +pub struct GossipEngine { + inner: Arc>>, + engine_id: ConsensusEngineId, +} + +struct GossipEngineInner { + state_machine: ConsensusGossip, + context: Box + Send>, + context_ext: Box + Send>, +} + +impl GossipEngine { + /// Create a new instance. + pub fn new + Send + Clone + 'static>( + network: N, + executor: &impl futures::task::Spawn, + engine_id: ConsensusEngineId, + validator: Arc>, + ) -> Self where B: 'static { + let mut state_machine = ConsensusGossip::new(); + let mut context = Box::new(ContextOverService { + network: network.clone(), + }); + let context_ext = Box::new(ContextOverService { + network: network.clone(), + }); + + // We grab the event stream before registering the notifications protocol, otherwise we + // might miss events. + let event_stream = network.event_stream(); + + network.register_notifications_protocol(engine_id); + state_machine.register_validator(&mut *context, engine_id, validator); + + let inner = Arc::new(Mutex::new(GossipEngineInner { + state_machine, + context, + context_ext, + })); + + let gossip_engine = GossipEngine { + inner: inner.clone(), + engine_id, + }; + + let res = executor.spawn({ + let inner = Arc::downgrade(&inner); + async move { + loop { + let _ = futures_timer::Delay::new(Duration::from_millis(1100)).await; + if let Some(inner) = inner.upgrade() { + let mut inner = inner.lock(); + let inner = &mut *inner; + inner.state_machine.tick(&mut *inner.context); + } else { + // We reach this branch if the `Arc` has no reference + // left. We can now let the task end. + break; + } + } + } + }); + + // Note: we consider the chances of an error to spawn a background task almost null. + if res.is_err() { + log::error!(target: "gossip", "Failed to spawn background task"); + } + + let res = executor.spawn(async move { + let mut stream = Compat01As03::new(event_stream); + while let Some(Ok(event)) = stream.next().await { + match event { + Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => { + if msg_engine_id != engine_id { + continue; + } + let mut inner = inner.lock(); + let inner = &mut *inner; + inner.state_machine.new_peer(&mut *inner.context, remote, roles); + } + Event::NotificationsStreamClosed { remote, engine_id: msg_engine_id } => { + if msg_engine_id != engine_id { + continue; + } + let mut inner = inner.lock(); + let inner = &mut *inner; + inner.state_machine.peer_disconnected(&mut *inner.context, remote); + }, + Event::NotificationsReceived { remote, messages } => { + let mut inner = inner.lock(); + let inner = &mut *inner; + inner.state_machine.on_incoming( + &mut *inner.context, + remote, + messages.into_iter() + .filter_map(|(engine, data)| if engine == engine_id { + Some(ConsensusMessage { engine_id: engine, data: data.to_vec() }) + } else { None }) + .collect() + ); + }, + Event::Dht(_) => {} + } + } + }); + + // Note: we consider the chances of an error to spawn a background task almost null. + if res.is_err() { + log::error!(target: "gossip", "Failed to spawn background task"); + } + + gossip_engine + } + + /// Closes all notification streams. + pub fn abort(&self) { + self.inner.lock().state_machine.abort(); + } + + pub fn report(&self, who: PeerId, reputation: ReputationChange) { + self.inner.lock().context.report_peer(who, reputation); + } + + /// Registers a message without propagating it to any peers. The message + /// becomes available to new peers or when the service is asked to gossip + /// the message's topic. No validation is performed on the message, if the + /// message is already expired it should be dropped on the next garbage + /// collection. + pub fn register_gossip_message( + &self, + topic: B::Hash, + message: Vec, + ) { + let message = ConsensusMessage { + engine_id: self.engine_id, + data: message, + }; + + self.inner.lock().state_machine.register_message(topic, message); + } + + /// Broadcast all messages with given topic. + pub fn broadcast_topic(&self, topic: B::Hash, force: bool) { + let mut inner = self.inner.lock(); + let inner = &mut *inner; + inner.state_machine.broadcast_topic(&mut *inner.context, topic, force); + } + + /// Get data of valid, incoming messages for a topic (but might have expired meanwhile). + pub fn messages_for(&self, topic: B::Hash) + -> mpsc::UnboundedReceiver + { + self.inner.lock().state_machine.messages_for(self.engine_id, topic) + } + + /// Send all messages with given topic to a peer. + pub fn send_topic( + &self, + who: &PeerId, + topic: B::Hash, + force: bool + ) { + let mut inner = self.inner.lock(); + let inner = &mut *inner; + inner.state_machine.send_topic(&mut *inner.context, who, topic, self.engine_id, force) + } + + /// Multicast a message to all peers. + pub fn gossip_message( + &self, + topic: B::Hash, + message: Vec, + force: bool, + ) { + let message = ConsensusMessage { + engine_id: self.engine_id, + data: message, + }; + + let mut inner = self.inner.lock(); + let inner = &mut *inner; + inner.state_machine.multicast(&mut *inner.context, topic, message, force) + } + + /// Send addressed message to the given peers. The message is not kept or multicast + /// later on. + pub fn send_message(&self, who: Vec, data: Vec) { + let mut inner = self.inner.lock(); + let inner = &mut *inner; + + for who in &who { + inner.state_machine.send_message(&mut *inner.context, who, ConsensusMessage { + engine_id: self.engine_id, + data: data.clone(), + }); + } + } + + /// Notify everyone we're connected to that we have the given block. + /// + /// Note: this method isn't strictly related to gossiping and should eventually be moved + /// somewhere else. + pub fn announce(&self, block: B::Hash, associated_data: Vec) { + self.inner.lock().context_ext.announce(block, associated_data); + } + + /// Notifies the sync service to try and sync the given block from the given + /// peers. + /// + /// If the given vector of peers is empty then the underlying implementation + /// should make a best effort to fetch the block from any peers it is + /// connected to (NOTE: this assumption will change in the future #3629). + /// + /// Note: this method isn't strictly related to gossiping and should eventually be moved + /// somewhere else. + pub fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { + self.inner.lock().context_ext.set_sync_fork_request(peers, hash, number); + } +} + +impl Clone for GossipEngine { + fn clone(&self) -> Self { + GossipEngine { + inner: self.inner.clone(), + engine_id: self.engine_id.clone(), + } + } +} + +struct ContextOverService { + network: N, +} + +impl> Context for ContextOverService { + fn report_peer(&mut self, who: PeerId, reputation: ReputationChange) { + self.network.report_peer(who, reputation); + } + + fn disconnect_peer(&mut self, who: PeerId) { + self.network.disconnect_peer(who) + } + + fn send_consensus(&mut self, who: PeerId, messages: Vec) { + for message in messages { + self.network.write_notification(who.clone(), message.engine_id, message.data); + } + } + + fn send_chain_specific(&mut self, _: PeerId, _: Vec) { + log::error!( + target: "sub-libp2p", + "send_chain_specific has been called in a context where it shouldn't" + ); + } +} + +trait ContextExt { + fn announce(&self, block: B::Hash, associated_data: Vec); + fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor); +} + +impl> ContextExt for ContextOverService { + fn announce(&self, block: B::Hash, associated_data: Vec) { + Network::announce(&self.network, block, associated_data) + } + + fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { + Network::set_sync_fork_request(&self.network, peers, hash, number) + } +} diff --git a/client/network-gossip/src/lib.rs b/client/network-gossip/src/lib.rs new file mode 100644 index 0000000000000..6decda05c5142 --- /dev/null +++ b/client/network-gossip/src/lib.rs @@ -0,0 +1,140 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Polite gossiping. +//! +//! This crate provides gossiping capabilities on top of a network. +//! +//! Gossip messages are separated by two categories: "topics" and consensus engine ID. +//! The consensus engine ID is sent over the wire with the message, while the topic is not, +//! with the expectation that the topic can be derived implicitly from the content of the +//! message, assuming it is valid. +//! +//! Topics are a single 32-byte tag associated with a message, used to group those messages +//! in an opaque way. Consensus code can invoke `broadcast_topic` to attempt to send all messages +//! under a single topic to all peers who don't have them yet, and `send_topic` to +//! send all messages under a single topic to a specific peer. +//! +//! # Usage +//! +//! - Implement the `Network` trait, representing the low-level networking primitives. It is +//! already implemented on `sc_network::NetworkService`. +//! - Implement the `Validator` trait. See the section below. +//! - Decide on a `ConsensusEngineId`. Each gossiping protocol should have a different one. +//! - Build a `GossipEngine` using these three elements. +//! - Use the methods of the `GossipEngine` in order to send out messages and receive incoming +//! messages. +//! +//! # What is a validator? +//! +//! The primary role of a `Validator` is to process incoming messages from peers, and decide +//! whether to discard them or process them. It also decides whether to re-broadcast the message. +//! +//! The secondary role of the `Validator` is to check if a message is allowed to be sent to a given +//! peer. All messages, before being sent, will be checked against this filter. +//! This enables the validator to use information it's aware of about connected peers to decide +//! whether to send messages to them at any given moment in time - In particular, to wait until +//! peers can accept and process the message before sending it. +//! +//! Lastly, the fact that gossip validators can decide not to rebroadcast messages +//! opens the door for neighbor status packets to be baked into the gossip protocol. +//! These status packets will typically contain light pieces of information +//! used to inform peers of a current view of protocol state. + +pub use self::bridge::GossipEngine; +pub use self::state_machine::{TopicNotification, MessageIntent}; +pub use self::state_machine::{Validator, ValidatorContext, ValidationResult}; +pub use self::state_machine::DiscardAll; + +use network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange}; +use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; +use std::sync::Arc; + +mod bridge; +mod state_machine; + +/// Abstraction over a network. +pub trait Network { + /// Returns a stream of events representing what happens on the network. + fn event_stream(&self) -> Box + Send>; + + /// Adjust the reputation of a node. + fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange); + + /// Force-disconnect a peer. + fn disconnect_peer(&self, who: PeerId); + + /// Send a notification to a peer. + fn write_notification(&self, who: PeerId, engine_id: ConsensusEngineId, message: Vec); + + /// Registers a notifications protocol. + /// + /// See the documentation of [`NetworkService:register_notifications_protocol`] for more information. + fn register_notifications_protocol( + &self, + engine_id: ConsensusEngineId + ); + + /// Notify everyone we're connected to that we have the given block. + /// + /// Note: this method isn't strictly related to gossiping and should eventually be moved + /// somewhere else. + fn announce(&self, block: B::Hash, associated_data: Vec); + + /// Notifies the sync service to try and sync the given block from the given + /// peers. + /// + /// If the given vector of peers is empty then the underlying implementation + /// should make a best effort to fetch the block from any peers it is + /// connected to (NOTE: this assumption will change in the future #3629). + /// + /// Note: this method isn't strictly related to gossiping and should eventually be moved + /// somewhere else. + fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor); +} + +impl, H: ExHashT> Network for Arc> { + fn event_stream(&self) -> Box + Send> { + Box::new(NetworkService::event_stream(self)) + } + + fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) { + NetworkService::report_peer(self, peer_id, reputation); + } + + fn disconnect_peer(&self, who: PeerId) { + NetworkService::disconnect_peer(self, who) + } + + fn write_notification(&self, who: PeerId, engine_id: ConsensusEngineId, message: Vec) { + NetworkService::write_notification(self, who, engine_id, message) + } + + fn register_notifications_protocol( + &self, + engine_id: ConsensusEngineId, + ) { + NetworkService::register_notifications_protocol(self, engine_id) + } + + fn announce(&self, block: B::Hash, associated_data: Vec) { + NetworkService::announce_block(self, block, associated_data) + } + + fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { + NetworkService::set_sync_fork_request(self, peers, hash, number) + } +} diff --git a/client/network/src/protocol/consensus_gossip.rs b/client/network-gossip/src/state_machine.rs similarity index 91% rename from client/network/src/protocol/consensus_gossip.rs rename to client/network-gossip/src/state_machine.rs index 24561debefb54..48854fc2a8b82 100644 --- a/client/network/src/protocol/consensus_gossip.rs +++ b/client/network-gossip/src/state_machine.rs @@ -14,48 +14,19 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -//! Utility for gossip of network messages between nodes. -//! Handles chain-specific and standard BFT messages. -//! -//! Gossip messages are separated by two categories: "topics" and consensus engine ID. -//! The consensus engine ID is sent over the wire with the message, while the topic is not, -//! with the expectation that the topic can be derived implicitly from the content of the -//! message, assuming it is valid. -//! -//! Topics are a single 32-byte tag associated with a message, used to group those messages -//! in an opaque way. Consensus code can invoke `broadcast_topic` to attempt to send all messages -//! under a single topic to all peers who don't have them yet, and `send_topic` to -//! send all messages under a single topic to a specific peer. -//! -//! Each consensus engine ID must have an associated, -//! registered `Validator` for all gossip messages. The primary role of this `Validator` is -//! to process incoming messages from peers, and decide whether to discard them or process -//! them. It also decides whether to re-broadcast the message. -//! -//! The secondary role of the `Validator` is to check if a message is allowed to be sent to a given -//! peer. All messages, before being sent, will be checked against this filter. -//! This enables the validator to use information it's aware of about connected peers to decide -//! whether to send messages to them at any given moment in time - In particular, to wait until -//! peers can accept and process the message before sending it. -//! -//! Lastly, the fact that gossip validators can decide not to rebroadcast messages -//! opens the door for neighbor status packets to be baked into the gossip protocol. -//! These status packets will typically contain light pieces of information -//! used to inform peers of a current view of protocol state. - use std::collections::{HashMap, HashSet, hash_map::Entry}; use std::sync::Arc; use std::iter; use std::time; use log::{trace, debug}; -use futures03::channel::mpsc; +use futures::channel::mpsc; use lru::LruCache; use libp2p::PeerId; use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; use sp_runtime::ConsensusEngineId; -pub use crate::message::generic::{Message, ConsensusMessage}; -use crate::protocol::Context; -use crate::config::Roles; +pub use network::message::generic::{Message, ConsensusMessage}; +use network::Context; +use network::config::Roles; // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115 const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096; @@ -63,7 +34,7 @@ const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096; const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30); mod rep { - use peerset::ReputationChange as Rep; + use network::ReputationChange as Rep; /// Reputation change when a peer sends us a gossip message that we didn't know about. pub const GOSSIP_SUCCESS: Rep = Rep::new(1 << 4, "Successfull gossip"); /// Reputation change when a peer sends us a gossip message that we already knew about. @@ -96,16 +67,6 @@ struct MessageEntry { sender: Option, } -/// Consensus message destination. -pub enum MessageRecipient { - /// Send to all peers. - BroadcastToAll, - /// Send to peers that don't have that message already. - BroadcastNew, - /// Send to specific peer. - Peer(PeerId), -} - /// The reason for sending out the message. #[derive(Eq, PartialEq, Copy, Clone)] #[cfg_attr(test, derive(Debug))] @@ -190,7 +151,7 @@ fn propagate<'a, B: BlockT, I>( validators: &HashMap>>, ) // (msg_hash, topic, message) - where I: Clone + IntoIterator, + where I: Clone + IntoIterator, { let mut check_fns = HashMap::new(); let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| { @@ -637,7 +598,7 @@ impl Validator for DiscardAll { mod tests { use std::sync::Arc; use sp_runtime::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; - use futures03::executor::block_on_stream; + use futures::executor::block_on_stream; use super::*; diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index fb1f39726a3f2..ae00c717570ee 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -16,10 +16,11 @@ use crate::{ debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour, - protocol::event::DhtEvent + Event, protocol::event::DhtEvent }; use crate::{ExHashT, specialization::NetworkSpecialization}; use crate::protocol::{CustomMessageOutcome, Protocol}; +use consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}}; use futures::prelude::*; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; @@ -27,7 +28,7 @@ use libp2p::kad::record; use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}; use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; use log::{debug, warn}; -use sp_runtime::traits::Block as BlockT; +use sp_runtime::{traits::{Block as BlockT, NumberFor}, Justification}; use std::iter; use void; @@ -50,8 +51,10 @@ pub struct Behaviour, H: ExHashT> { /// Event generated by `Behaviour`. pub enum BehaviourOut { - SubstrateAction(CustomMessageOutcome), - Dht(DhtEvent), + BlockImport(BlockOrigin, Vec>), + JustificationImport(Origin, B::Hash, NumberFor, Justification), + FinalityProofImport(Origin, B::Hash, NumberFor, Vec), + Event(Event), } impl, H: ExHashT> Behaviour { @@ -127,7 +130,34 @@ Behaviour { impl, H: ExHashT> NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: CustomMessageOutcome) { - self.events.push(BehaviourOut::SubstrateAction(event)); + match event { + CustomMessageOutcome::BlockImport(origin, blocks) => + self.events.push(BehaviourOut::BlockImport(origin, blocks)), + CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => + self.events.push(BehaviourOut::JustificationImport(origin, hash, nb, justification)), + CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) => + self.events.push(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)), + CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } => + for engine_id in protocols { + self.events.push(BehaviourOut::Event(Event::NotificationStreamOpened { + remote: remote.clone(), + engine_id, + roles, + })); + }, + CustomMessageOutcome::NotificationsStreamClosed { remote, protocols } => + for engine_id in protocols { + self.events.push(BehaviourOut::Event(Event::NotificationsStreamClosed { + remote: remote.clone(), + engine_id, + })); + }, + CustomMessageOutcome::NotificationsReceived { remote, messages } => { + let ev = Event::NotificationsReceived { remote, messages }; + self.events.push(BehaviourOut::Event(ev)); + }, + CustomMessageOutcome::None => {} + } } } @@ -166,16 +196,16 @@ impl, H: ExHashT> NetworkBehaviourEventPr self.substrate.add_discovered_nodes(iter::once(peer_id)); } DiscoveryOut::ValueFound(results) => { - self.events.push(BehaviourOut::Dht(DhtEvent::ValueFound(results))); + self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValueFound(results)))); } DiscoveryOut::ValueNotFound(key) => { - self.events.push(BehaviourOut::Dht(DhtEvent::ValueNotFound(key))); + self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValueNotFound(key)))); } DiscoveryOut::ValuePut(key) => { - self.events.push(BehaviourOut::Dht(DhtEvent::ValuePut(key))); + self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePut(key)))); } DiscoveryOut::ValuePutFailed(key) => { - self.events.push(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key))); + self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key)))); } } } diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 755ae803fe66e..ad98986276c9e 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -186,7 +186,7 @@ pub use service::{ NetworkService, NetworkWorker, TransactionPool, ExHashT, ReportHandle, NetworkStateInfo, }; -pub use protocol::{PeerInfo, Context, ProtocolConfig, consensus_gossip, message, specialization}; +pub use protocol::{PeerInfo, Context, ProtocolConfig, message, specialization}; pub use protocol::event::{Event, DhtEvent}; pub use protocol::sync::SyncState; pub use libp2p::{Multiaddr, PeerId}; diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 88bc1ede52275..87dd1be4ec3ef 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -17,7 +17,7 @@ use crate::{DiscoveryNetBehaviour, config::ProtocolId}; use legacy_proto::{LegacyProto, LegacyProtoOut}; use crate::utils::interval; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use futures::prelude::*; use futures03::{StreamExt as _, TryStreamExt as _}; use libp2p::{Multiaddr, PeerId}; @@ -38,7 +38,6 @@ use sp_runtime::traits::{ use sp_arithmetic::traits::SaturatedConversion; use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId}; use message::generic::{Message as GenericMessage, ConsensusMessage}; -use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData}; use specialization::NetworkSpecialization; use sync::{ChainSync, SyncState}; @@ -58,7 +57,6 @@ use util::LruHashSet; mod legacy_proto; mod util; -pub mod consensus_gossip; pub mod message; pub mod event; pub mod light_dispatch; @@ -135,7 +133,6 @@ pub struct Protocol, H: ExHashT> { genesis_hash: B::Hash, sync: ChainSync, specialization: S, - consensus_gossip: ConsensusGossip, context_data: ContextData, /// List of nodes for which we perform additional logging because they are important for the /// user. @@ -149,6 +146,8 @@ pub struct Protocol, H: ExHashT> { finality_proof_provider: Option>>, /// Handles opening the unique substream and sending and receiving raw messages. behaviour: LegacyProto>, + /// List of notification protocols that have been registered. + registered_notif_protocols: HashSet, } #[derive(Default)] @@ -473,13 +472,13 @@ impl, H: ExHashT> Protocol { genesis_hash: info.chain.genesis_hash, sync, specialization, - consensus_gossip: ConsensusGossip::new(), handshaking_peers: HashMap::new(), important_peers, transaction_pool, finality_proof_provider, peerset_handle: peerset_handle.clone(), behaviour, + registered_notif_protocols: HashSet::new(), }; Ok((protocol, peerset_handle)) @@ -614,7 +613,7 @@ impl, H: ExHashT> Protocol { stats.count_in += 1; match message { - GenericMessage::Status(s) => self.on_status_message(who, s), + GenericMessage::Status(s) => return self.on_status_message(who, s), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { // Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter. @@ -656,20 +655,38 @@ impl, H: ExHashT> Protocol { return self.on_finality_proof_response(who, response), GenericMessage::RemoteReadChildRequest(request) => self.on_remote_read_child_request(who, request), - GenericMessage::Consensus(msg) => { - self.consensus_gossip.on_incoming( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - who, - vec![msg], - ); - } + GenericMessage::Consensus(msg) => + return if self.registered_notif_protocols.contains(&msg.engine_id) { + CustomMessageOutcome::NotificationsReceived { + remote: who.clone(), + messages: vec![(msg.engine_id, From::from(msg.data))], + } + } else { + warn!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id); + CustomMessageOutcome::None + }, GenericMessage::ConsensusBatch(messages) => { - self.consensus_gossip.on_incoming( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - who, - messages, - ); - } + let messages = messages + .into_iter() + .filter_map(|msg| { + if self.registered_notif_protocols.contains(&msg.engine_id) { + Some((msg.engine_id, From::from(msg.data))) + } else { + warn!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id); + None + } + }) + .collect::>(); + + return if !messages.is_empty() { + CustomMessageOutcome::NotificationsReceived { + remote: who.clone(), + messages, + } + } else { + CustomMessageOutcome::None + }; + }, GenericMessage::ChainSpecific(msg) => self.specialization.on_message( &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who, @@ -699,14 +716,6 @@ impl, H: ExHashT> Protocol { ); } - /// Locks `self` and returns a context plus the `ConsensusGossip` struct. - pub fn consensus_gossip_lock<'a>( - &'a mut self, - ) -> (impl Context + 'a, &'a mut ConsensusGossip) { - let context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - (context, &mut self.consensus_gossip) - } - /// Locks `self` and returns a context plus the network specialization. pub fn specialization_lock<'a>( &'a mut self, @@ -715,26 +724,6 @@ impl, H: ExHashT> Protocol { (context, &mut self.specialization) } - /// Gossip a consensus message to the network. - pub fn gossip_consensus_message( - &mut self, - topic: B::Hash, - engine_id: ConsensusEngineId, - message: Vec, - recipient: GossipMessageRecipient, - ) { - let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - let message = ConsensusMessage { data: message, engine_id }; - match recipient { - GossipMessageRecipient::BroadcastToAll => - self.consensus_gossip.multicast(&mut context, topic, message, true), - GossipMessageRecipient::BroadcastNew => - self.consensus_gossip.multicast(&mut context, topic, message, false), - GossipMessageRecipient::Peer(who) => - self.send_message(&who, GenericMessage::Consensus(message)), - } - } - /// Called when a new peer is connected pub fn on_peer_connected(&mut self, who: PeerId) { trace!(target: "sync", "Connecting {}", who); @@ -755,11 +744,8 @@ impl, H: ExHashT> Protocol { self.handshaking_peers.remove(&peer); self.context_data.peers.remove(&peer) }; - if let Some(peer_data) = removed { + if let Some(_peer_data) = removed { let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - if peer_data.info.protocol_version > 2 { - self.consensus_gossip.peer_disconnected(&mut context, peer.clone()); - } self.sync.peer_disconnected(peer.clone()); self.specialization.on_disconnect(&mut context, peer.clone()); self.light_dispatch.on_disconnect(LightDispatchIn { @@ -922,9 +908,6 @@ impl, H: ExHashT> Protocol { /// /// > **Note**: This method normally doesn't have to be called except for testing purposes. pub fn tick(&mut self) { - self.consensus_gossip.tick( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle) - ); self.maintain_peers(); self.light_dispatch.maintain_peers(LightDispatchIn { behaviour: &mut self.behaviour, @@ -975,9 +958,9 @@ impl, H: ExHashT> Protocol { } /// Called by peer to report status - fn on_status_message(&mut self, who: PeerId, status: message::Status) { + fn on_status_message(&mut self, who: PeerId, status: message::Status) -> CustomMessageOutcome { trace!(target: "sync", "New peer {} {:?}", who, status); - let protocol_version = { + let _protocol_version = { if self.context_data.peers.contains_key(&who) { log!( target: "sync", @@ -985,7 +968,7 @@ impl, H: ExHashT> Protocol { "Unexpected status packet from {}", who ); self.peerset_handle.report_peer(who, rep::UNEXPECTED_STATUS); - return; + return CustomMessageOutcome::None; } if status.genesis_hash != self.genesis_hash { log!( @@ -996,7 +979,7 @@ impl, H: ExHashT> Protocol { ); self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH); self.behaviour.disconnect_peer(&who); - return; + return CustomMessageOutcome::None; } if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version { log!( @@ -1006,7 +989,7 @@ impl, H: ExHashT> Protocol { ); self.peerset_handle.report_peer(who.clone(), rep::BAD_PROTOCOL); self.behaviour.disconnect_peer(&who); - return; + return CustomMessageOutcome::None; } if self.config.roles.is_light() { @@ -1015,7 +998,7 @@ impl, H: ExHashT> Protocol { debug!(target: "sync", "Peer {} is unable to serve light requests", who); self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE); self.behaviour.disconnect_peer(&who); - return; + return CustomMessageOutcome::None; } // we don't interested in peers that are far behind us @@ -1032,7 +1015,7 @@ impl, H: ExHashT> Protocol { debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT); self.behaviour.disconnect_peer(&who); - return; + return CustomMessageOutcome::None; } } @@ -1047,7 +1030,7 @@ impl, H: ExHashT> Protocol { }, None => { error!(target: "sync", "Received status from previously unconnected node {}", who); - return; + return CustomMessageOutcome::None; }, }; @@ -1082,11 +1065,64 @@ impl, H: ExHashT> Protocol { } } } + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - if protocol_version > 2 { - self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles); + self.specialization.on_connect(&mut context, who.clone(), status); + + // Notify all the notification protocols as open. + CustomMessageOutcome::NotificationStreamOpened { + remote: who, + protocols: self.registered_notif_protocols.iter().cloned().collect(), + roles: info.roles, + } + } + + /// Send a notification to the given peer we're connected to. + /// + /// Doesn't do anything if we don't have a notifications substream for that protocol with that + /// peer. + pub fn write_notification( + &mut self, + target: PeerId, + engine_id: ConsensusEngineId, + message: impl Into> + ) { + if !self.registered_notif_protocols.contains(&engine_id) { + error!( + target: "sub-libp2p", + "Sending a notification with a protocol that wasn't registered: {:?}", + engine_id + ); + } + + self.send_message(&target, GenericMessage::Consensus(ConsensusMessage { + engine_id, + data: message.into(), + })); + } + + /// Registers a new notifications protocol. + /// + /// You are very strongly encouraged to call this method very early on. Any connection open + /// will retain the protocols that were registered then, and not any new one. + pub fn register_notifications_protocol( + &mut self, + engine_id: ConsensusEngineId, + ) -> Vec { + if !self.registered_notif_protocols.insert(engine_id) { + error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", engine_id); } - self.specialization.on_connect(&mut context, who, status); + + // Registering a protocol while we already have open connections isn't great, but for now + // we handle it by notifying that we opened channels with everyone. + self.context_data.peers.iter() + .map(|(peer_id, peer)| + event::Event::NotificationStreamOpened { + remote: peer_id.clone(), + engine_id, + roles: peer.info.roles, + }) + .collect() } /// Called when peer sends us new extrinsics @@ -1758,6 +1794,12 @@ pub enum CustomMessageOutcome { BlockImport(BlockOrigin, Vec>), JustificationImport(Origin, B::Hash, NumberFor, Justification), FinalityProofImport(Origin, B::Hash, NumberFor, Vec), + /// Notification protocols have been opened with a remote. + NotificationStreamOpened { remote: PeerId, protocols: Vec, roles: Roles }, + /// Notification protocols have been closed with a remote. + NotificationsStreamClosed { remote: PeerId, protocols: Vec }, + /// Messages have been received on one or more notifications protocols. + NotificationsReceived { remote: PeerId, messages: Vec<(ConsensusEngineId, Bytes)> }, None, } @@ -1887,12 +1929,16 @@ Protocol { version <= CURRENT_VERSION as u8 && version >= MIN_VERSION as u8 ); - self.on_peer_connected(peer_id); + self.on_peer_connected(peer_id.clone()); CustomMessageOutcome::None } LegacyProtoOut::CustomProtocolClosed { peer_id, .. } => { - self.on_peer_disconnected(peer_id); - CustomMessageOutcome::None + self.on_peer_disconnected(peer_id.clone()); + // Notify all the notification protocols as closed. + CustomMessageOutcome::NotificationsStreamClosed { + remote: peer_id, + protocols: self.registered_notif_protocols.iter().cloned().collect(), + } }, LegacyProtoOut::CustomMessage { peer_id, message } => self.on_custom_message(peer_id, message), diff --git a/client/network/src/protocol/event.rs b/client/network/src/protocol/event.rs index c8bee5588c704..98aad8c76c804 100644 --- a/client/network/src/protocol/event.rs +++ b/client/network/src/protocol/event.rs @@ -17,10 +17,15 @@ //! Network event types. These are are not the part of the protocol, but rather //! events that happen on the network like DHT get/put results received. +use crate::config::Roles; +use bytes::Bytes; +use libp2p::core::PeerId; use libp2p::kad::record::Key; +use sp_runtime::ConsensusEngineId; /// Events generated by DHT as a response to get_value and put_value requests. #[derive(Debug, Clone)] +#[must_use] pub enum DhtEvent { /// The value was found. ValueFound(Vec<(Key, Vec)>), @@ -37,7 +42,37 @@ pub enum DhtEvent { /// Type for events generated by networking layer. #[derive(Debug, Clone)] +#[must_use] pub enum Event { /// Event generated by a DHT. Dht(DhtEvent), + + /// Opened a substream with the given node with the given notifications protocol. + /// + /// The protocol is always one of the notification protocols that have been registered. + NotificationStreamOpened { + /// Node we opened the substream with. + remote: PeerId, + /// The concerned protocol. Each protocol uses a different substream. + engine_id: ConsensusEngineId, + /// Roles that the remote . + roles: Roles, + }, + + /// Closed a substream with the given node. Always matches a corresponding previous + /// `NotificationStreamOpened` message. + NotificationsStreamClosed { + /// Node we closed the substream with. + remote: PeerId, + /// The concerned protocol. Each protocol uses a different substream. + engine_id: ConsensusEngineId, + }, + + /// Received one or more messages from the given node using the given protocol. + NotificationsReceived { + /// Node we received the message from. + remote: PeerId, + /// Concerned protocol and associated message. + messages: Vec<(ConsensusEngineId, Bytes)>, + }, } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 85fd1c3ff5581..c137932090878 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -45,8 +45,7 @@ use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer}; use crate::{transport, config::NonReservedPeerMode, ReputationChange}; use crate::config::{Params, TransportConfig}; use crate::error::Error; -use crate::protocol::{self, Protocol, Context, CustomMessageOutcome, PeerInfo}; -use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; +use crate::protocol::{self, Protocol, Context, PeerInfo}; use crate::protocol::{event::Event, light_dispatch::{AlwaysBadChecker, RequestData}}; use crate::protocol::specialization::NetworkSpecialization; use crate::protocol::sync::SyncState; @@ -276,6 +275,7 @@ impl, H: ExHashT> NetworkWorker import_queue: params.import_queue, from_worker, light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()), + event_streams: Vec::new(), }) } @@ -416,6 +416,55 @@ impl, H: ExHashT> NetworkServic self.local_peer_id.clone() } + /// Writes a message on an open notifications channel. Has no effect if the notifications + /// channel with this protocol name is closed. + /// + /// > **Note**: The reason why this is a no-op in the situation where we have no channel is + /// > that we don't guarantee message delivery anyway. Networking issues can cause + /// > connections to drop at any time, and higher-level logic shouldn't differentiate + /// > between the remote voluntarily closing a substream or a network error + /// > preventing the message from being delivered. + /// + /// The protocol must have been registered with `register_notifications_protocol`. + /// + pub fn write_notification(&self, target: PeerId, engine_id: ConsensusEngineId, message: Vec) { + let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::WriteNotification { + target, + engine_id, + message, + }); + } + + /// Returns a stream containing the events that happen on the network. + /// + /// If this method is called multiple times, the events are duplicated. + /// + /// The stream never ends (unless the `NetworkWorker` gets shut down). + pub fn event_stream(&self) -> impl Stream { + // Note: when transitioning to stable futures, remove the `Error` entirely + let (tx, rx) = mpsc::unbounded(); + let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::EventStream(tx)); + rx + } + + /// Registers a new notifications protocol. + /// + /// After that, you can call `write_notifications`. + /// + /// Please call `event_stream` before registering a protocol, otherwise you may miss events + /// about the protocol that you have registered. + /// + /// You are very strongly encouraged to call this method very early on. Any connection open + /// will retain the protocols that were registered then, and not any new one. + pub fn register_notifications_protocol( + &self, + engine_id: ConsensusEngineId, + ) { + let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::RegisterNotifProtocol { + engine_id, + }); + } + /// You must call this when new transactons are imported by the transaction pool. /// /// The latest transactions will be fetched from the `TransactionPool` that was passed at @@ -432,27 +481,19 @@ impl, H: ExHashT> NetworkServic let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::AnnounceBlock(hash, data)); } - /// Send a consensus message through the gossip - pub fn gossip_consensus_message( - &self, - topic: B::Hash, - engine_id: ConsensusEngineId, - message: Vec, - recipient: GossipMessageRecipient, - ) { - let _ = self - .to_worker - .unbounded_send(ServerToWorkerMsg::GossipConsensusMessage( - topic, engine_id, message, recipient, - )); - } - /// Report a given peer as either beneficial (+) or costly (-) according to the /// given scalar. pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { self.peerset.report_peer(who, cost_benefit); } + /// Disconnect from a node as soon as possible. + /// + /// This triggers the same effects as if the connection had closed itself spontaneously. + pub fn disconnect_peer(&self, who: PeerId) { + let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::DisconnectPeer(who)); + } + /// Request a justification for the given block from the network. /// /// On success, the justification will be passed to the import queue that was part at @@ -472,15 +513,6 @@ impl, H: ExHashT> NetworkServic .unbounded_send(ServerToWorkerMsg::ExecuteWithSpec(Box::new(f))); } - /// Execute a closure with the consensus gossip. - pub fn with_gossip(&self, f: F) - where F: FnOnce(&mut ConsensusGossip, &mut dyn Context) + Send + 'static - { - let _ = self - .to_worker - .unbounded_send(ServerToWorkerMsg::ExecuteWithGossip(Box::new(f))); - } - /// Are we in the process of downloading the chain? pub fn is_major_syncing(&self) -> bool { self.is_major_syncing.load(Ordering::Relaxed) @@ -630,12 +662,20 @@ enum ServerToWorkerMsg> { RequestJustification(B::Hash, NumberFor), AnnounceBlock(B::Hash, Vec), ExecuteWithSpec(Box) + Send>), - ExecuteWithGossip(Box, &mut dyn Context) + Send>), - GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec, GossipMessageRecipient), GetValue(record::Key), PutValue(record::Key, Vec), AddKnownAddress(PeerId, Multiaddr), SyncFork(Vec, B::Hash, NumberFor), + EventStream(mpsc::UnboundedSender), + WriteNotification { + message: Vec, + engine_id: ConsensusEngineId, + target: PeerId, + }, + RegisterNotifProtocol { + engine_id: ConsensusEngineId, + }, + DisconnectPeer(PeerId), } /// Main network worker. Must be polled in order for the network to advance. @@ -659,13 +699,15 @@ pub struct NetworkWorker, H: Ex from_worker: mpsc::UnboundedReceiver>, /// Receiver for queries from the light client that must be processed. light_client_rqs: Option>>, + /// Senders for events that happen on the network. + event_streams: Vec>, } -impl, H: ExHashT> Stream for NetworkWorker { - type Item = Event; +impl, H: ExHashT> Future for NetworkWorker { + type Item = (); type Error = io::Error; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll(&mut self) -> Poll { // Poll the import queue for actions to perform. let _ = futures03::future::poll_fn(|cx| { self.import_queue.poll_actions(cx, &mut NetworkLink { @@ -685,7 +727,7 @@ impl, H: ExHashT> Stream for Ne // Process the next message coming from the `NetworkService`. let msg = match self.from_worker.poll() { Ok(Async::Ready(Some(msg))) => msg, - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(None)), + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), Ok(Async::NotReady) => break, }; @@ -695,13 +737,6 @@ impl, H: ExHashT> Stream for Ne let (mut context, spec) = protocol.specialization_lock(); task(spec, &mut context); }, - ServerToWorkerMsg::ExecuteWithGossip(task) => { - let protocol = self.network_service.user_protocol_mut(); - let (mut context, gossip) = protocol.consensus_gossip_lock(); - task(gossip, &mut context); - } - ServerToWorkerMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => - self.network_service.user_protocol_mut().gossip_consensus_message(topic, engine_id, message, recipient), ServerToWorkerMsg::AnnounceBlock(hash, data) => self.network_service.user_protocol_mut().announce_block(hash, data), ServerToWorkerMsg::RequestJustification(hash, number) => @@ -716,6 +751,18 @@ impl, H: ExHashT> Stream for Ne self.network_service.add_known_address(peer_id, addr), ServerToWorkerMsg::SyncFork(peer_ids, hash, number) => self.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number), + ServerToWorkerMsg::EventStream(sender) => + self.event_streams.push(sender), + ServerToWorkerMsg::WriteNotification { message, engine_id, target } => + self.network_service.user_protocol_mut().write_notification(target, engine_id, message), + ServerToWorkerMsg::RegisterNotifProtocol { engine_id } => { + let events = self.network_service.user_protocol_mut().register_notifications_protocol(engine_id); + for event in events { + self.event_streams.retain(|sender| sender.unbounded_send(event.clone()).is_ok()); + } + }, + ServerToWorkerMsg::DisconnectPeer(who) => + self.network_service.user_protocol_mut().disconnect_peer(&who), } } @@ -723,27 +770,23 @@ impl, H: ExHashT> Stream for Ne // Process the next action coming from the network. let poll_value = self.network_service.poll(); - let outcome = match poll_value { + match poll_value { Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome, - Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => - return Ok(Async::Ready(Some(Event::Dht(ev)))), - Ok(Async::Ready(None)) => CustomMessageOutcome::None, + Ok(Async::Ready(Some(BehaviourOut::BlockImport(origin, blocks)))) => + self.import_queue.import_blocks(origin, blocks), + Ok(Async::Ready(Some(BehaviourOut::JustificationImport(origin, hash, nb, justification)))) => + self.import_queue.import_justification(origin, hash, nb, justification), + Ok(Async::Ready(Some(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)))) => + self.import_queue.import_finality_proof(origin, hash, nb, proof), + Ok(Async::Ready(Some(BehaviourOut::Event(ev)))) => { + self.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok()); + }, + Ok(Async::Ready(None)) => {}, Err(err) => { error!(target: "sync", "Error in the network: {:?}", err); return Err(err) } }; - - match outcome { - CustomMessageOutcome::BlockImport(origin, blocks) => - self.import_queue.import_blocks(origin, blocks), - CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => - self.import_queue.import_justification(origin, hash, nb, justification), - CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) => - self.import_queue.import_finality_proof(origin, hash, nb, proof), - CustomMessageOutcome::None => {} - } } // Update the variables shared with the `NetworkService`. diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 3ec785ff4ac13..de6077e12f592 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -131,6 +131,14 @@ impl Executor + Send>> for SpawnTaskHandle } } +impl futures03::task::Spawn for SpawnTaskHandle { + fn spawn_obj(&self, future: futures03::task::FutureObj<'static, ()>) + -> Result<(), futures03::task::SpawnError> { + self.execute(Box::new(futures03::compat::Compat::new(future.unit_error()))) + .map_err(|_| futures03::task::SpawnError::shutdown()) + } +} + /// Abstraction over a Substrate service. pub trait AbstractService: 'static + Future + Executor + Send>> + Send { @@ -375,6 +383,9 @@ fn build_network_future< let mut finality_notification_stream = client.finality_notification_stream().fuse() .map(|v| Ok::<_, ()>(v)).compat(); + // Initializing a stream in order to obtain DHT events from the network. + let mut event_stream = network.service().event_stream(); + futures::future::poll_fn(move || { let before_polling = Instant::now(); @@ -451,22 +462,32 @@ fn build_network_future< (status, state) }); + // Processing DHT events. + while let Ok(Async::Ready(Some(event))) = event_stream.poll() { + match event { + Event::Dht(event) => { + // Given that client/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht + // events are being passed on to the authority-discovery module. In the future there might be multiple + // consumers of these events. In that case this would need to be refactored to properly dispatch the events, + // e.g. via a subscriber model. + if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) { + if e.is_full() { + warn!(target: "service", "Dht event channel to authority discovery is full, dropping event."); + } else if e.is_disconnected() { + warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event."); + } + } + } + _ => {} + } + } + // Main network polling. - while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| { + if let Ok(Async::Ready(())) = network.poll().map_err(|err| { warn!(target: "service", "Error in network: {:?}", err); }) { - // Given that client/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht - // events are being passed on to the authority-discovery module. In the future there might be multiple - // consumers of these events. In that case this would need to be refactored to properly dispatch the events, - // e.g. via a subscriber model. - if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) { - if e.is_full() { - warn!(target: "service", "Dht event channel to authority discovery is full, dropping event."); - } else if e.is_disconnected() { - warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event."); - } - } - }; + return Ok(Async::Ready(())); + } // Now some diagnostic for performances. let polling_dur = before_polling.elapsed();