diff --git a/core/cli/src/informant.rs b/core/cli/src/informant.rs index 3eb5a0d992249..f89f8b23f5a1e 100644 --- a/core/cli/src/informant.rs +++ b/core/cli/src/informant.rs @@ -44,7 +44,7 @@ where C: Components { let mut last_number = None; let mut last_update = time::Instant::now(); - let display_notifications = service.network_status().for_each(move |net_status| { + let display_notifications = service.network_status().for_each(move |(net_status, _)| { let info = client.info(); let best_number = info.chain.best_number.saturated_into::(); diff --git a/core/cli/src/lib.rs b/core/cli/src/lib.rs index 874e08c7bbded..39b98c092b748 100644 --- a/core/cli/src/lib.rs +++ b/core/cli/src/lib.rs @@ -32,8 +32,7 @@ use service::{ }; use network::{ self, multiaddr::Protocol, - config::{NetworkConfiguration, TransportConfig, NonReservedPeerMode, NodeKeyConfig}, - build_multiaddr, + config::{NetworkConfiguration, TransportConfig, NonReservedPeerMode, NodeKeyConfig, build_multiaddr}, }; use primitives::H256; @@ -250,16 +249,16 @@ where params.node_key.as_ref().map(parse_secp256k1_secret).unwrap_or_else(|| Ok(params.node_key_file .or_else(|| net_config_file(net_config_dir, NODE_KEY_SECP256K1_FILE)) - .map(network::Secret::File) - .unwrap_or(network::Secret::New))) + .map(network::config::Secret::File) + .unwrap_or(network::config::Secret::New))) .map(NodeKeyConfig::Secp256k1), NodeKeyType::Ed25519 => params.node_key.as_ref().map(parse_ed25519_secret).unwrap_or_else(|| Ok(params.node_key_file .or_else(|| net_config_file(net_config_dir, NODE_KEY_ED25519_FILE)) - .map(network::Secret::File) - .unwrap_or(network::Secret::New))) + .map(network::config::Secret::File) + .unwrap_or(network::config::Secret::New))) .map(NodeKeyConfig::Ed25519) } } @@ -277,18 +276,18 @@ fn invalid_node_key(e: impl std::fmt::Display) -> error::Error { } /// Parse a Secp256k1 secret key from a hex string into a `network::Secret`. -fn parse_secp256k1_secret(hex: &String) -> error::Result { +fn parse_secp256k1_secret(hex: &String) -> error::Result { H256::from_str(hex).map_err(invalid_node_key).and_then(|bytes| - network::identity::secp256k1::SecretKey::from_bytes(bytes) - .map(network::Secret::Input) + network::config::identity::secp256k1::SecretKey::from_bytes(bytes) + .map(network::config::Secret::Input) .map_err(invalid_node_key)) } /// Parse a Ed25519 secret key from a hex string into a `network::Secret`. -fn parse_ed25519_secret(hex: &String) -> error::Result { +fn parse_ed25519_secret(hex: &String) -> error::Result { H256::from_str(&hex).map_err(invalid_node_key).and_then(|bytes| - network::identity::ed25519::SecretKey::from_bytes(bytes) - .map(network::Secret::Input) + network::config::identity::ed25519::SecretKey::from_bytes(bytes) + .map(network::config::Secret::Input) .map_err(invalid_node_key)) } @@ -810,7 +809,7 @@ fn kill_color(s: &str) -> String { mod tests { use super::*; use tempdir::TempDir; - use network::identity::{secp256k1, ed25519}; + use network::config::identity::{secp256k1, ed25519}; #[test] fn tests_node_name_good() { @@ -842,10 +841,10 @@ mod tests { node_key_file: None }; node_key_config(params, &net_config_dir).and_then(|c| match c { - NodeKeyConfig::Secp256k1(network::Secret::Input(ref ski)) + NodeKeyConfig::Secp256k1(network::config::Secret::Input(ref ski)) if node_key_type == NodeKeyType::Secp256k1 && &sk[..] == ski.to_bytes() => Ok(()), - NodeKeyConfig::Ed25519(network::Secret::Input(ref ski)) + NodeKeyConfig::Ed25519(network::config::Secret::Input(ref ski)) if node_key_type == NodeKeyType::Ed25519 && &sk[..] == ski.as_ref() => Ok(()), _ => Err(error::Error::Input("Unexpected node key config".into())) @@ -870,9 +869,9 @@ mod tests { node_key_file: Some(file.clone()) }; node_key_config(params, &net_config_dir).and_then(|c| match c { - NodeKeyConfig::Secp256k1(network::Secret::File(ref f)) + NodeKeyConfig::Secp256k1(network::config::Secret::File(ref f)) if node_key_type == NodeKeyType::Secp256k1 && f == &file => Ok(()), - NodeKeyConfig::Ed25519(network::Secret::File(ref f)) + NodeKeyConfig::Ed25519(network::config::Secret::File(ref f)) if node_key_type == NodeKeyType::Ed25519 && f == &file => Ok(()), _ => Err(error::Error::Input("Unexpected node key config".into())) }) @@ -904,9 +903,9 @@ mod tests { let typ = params.node_key_type; node_key_config::(params, &None) .and_then(|c| match c { - NodeKeyConfig::Secp256k1(network::Secret::New) + NodeKeyConfig::Secp256k1(network::config::Secret::New) if typ == NodeKeyType::Secp256k1 => Ok(()), - NodeKeyConfig::Ed25519(network::Secret::New) + NodeKeyConfig::Ed25519(network::config::Secret::New) if typ == NodeKeyType::Ed25519 => Ok(()), _ => Err(error::Error::Input("Unexpected node key config".into())) }) @@ -919,10 +918,10 @@ mod tests { let typ = params.node_key_type; node_key_config(params, &Some(net_config_dir.clone())) .and_then(move |c| match c { - NodeKeyConfig::Secp256k1(network::Secret::File(ref f)) + NodeKeyConfig::Secp256k1(network::config::Secret::File(ref f)) if typ == NodeKeyType::Secp256k1 && f == &dir.join(NODE_KEY_SECP256K1_FILE) => Ok(()), - NodeKeyConfig::Ed25519(network::Secret::File(ref f)) + NodeKeyConfig::Ed25519(network::config::Secret::File(ref f)) if typ == NodeKeyType::Ed25519 && f == &dir.join(NODE_KEY_ED25519_FILE) => Ok(()), _ => Err(error::Error::Input("Unexpected node key config".into())) diff --git a/core/consensus/aura/src/lib.rs b/core/consensus/aura/src/lib.rs index 6b5ac2e6b60f4..49b505a7f5611 100644 --- a/core/consensus/aura/src/lib.rs +++ b/core/consensus/aura/src/lib.rs @@ -797,8 +797,8 @@ mod tests { } } - fn peer(&self, i: usize) -> &Peer { - &self.peers[i] + fn peer(&mut self, i: usize) -> &mut Peer { + &mut self.peers[i] } fn peers(&self) -> &Vec> { diff --git a/core/consensus/babe/src/lib.rs b/core/consensus/babe/src/lib.rs index cdaeae6adb5c8..853b1c91bf1b6 100644 --- a/core/consensus/babe/src/lib.rs +++ b/core/consensus/babe/src/lib.rs @@ -960,9 +960,9 @@ mod tests { }) } - fn peer(&self, i: usize) -> &Peer { + fn peer(&mut self, i: usize) -> &mut Peer { trace!(target: "babe", "Retreiving a peer"); - &self.peers[i] + &mut self.peers[i] } fn peers(&self) -> &Vec> { diff --git a/core/consensus/common/Cargo.toml b/core/consensus/common/Cargo.toml index 52da224871e72..c6da0c682e705 100644 --- a/core/consensus/common/Cargo.toml +++ b/core/consensus/common/Cargo.toml @@ -25,4 +25,3 @@ test-client = { package = "substrate-test-runtime-client", path = "../../test-ru [features] default = [] -test-helpers = [] diff --git a/core/consensus/common/src/import_queue.rs b/core/consensus/common/src/import_queue.rs index cc090a9b91b6d..3987844a1e040 100644 --- a/core/consensus/common/src/import_queue.rs +++ b/core/consensus/common/src/import_queue.rs @@ -145,9 +145,6 @@ pub trait Link: Send { fn report_peer(&mut self, _who: Origin, _reputation_change: i32) {} /// Restart sync. fn restart(&mut self) {} - /// Synchronization request has been processed. - #[cfg(any(test, feature = "test-helpers"))] - fn synchronized(&mut self) {} } /// Block import successful result. diff --git a/core/consensus/common/src/import_queue/basic_queue.rs b/core/consensus/common/src/import_queue/basic_queue.rs index 74353dbbecb64..b06334270c783 100644 --- a/core/consensus/common/src/import_queue/basic_queue.rs +++ b/core/consensus/common/src/import_queue/basic_queue.rs @@ -85,15 +85,6 @@ impl BasicQueue { manual_poll: None, } } - - /// Send synchronization request to the block import channel. - /// - /// The caller should wait for Link::synchronized() call to ensure that it - /// has synchronized with ImportQueue. - #[cfg(any(test, feature = "test-helpers"))] - pub fn synchronize(&self) { - let _ = self.sender.unbounded_send(ToWorkerMsg::Synchronize); - } } impl ImportQueue for BasicQueue { @@ -153,8 +144,6 @@ enum ToWorkerMsg { ImportBlocks(BlockOrigin, Vec>), ImportJustification(Origin, B::Hash, NumberFor, Justification), ImportFinalityProof(Origin, B::Hash, NumberFor, Vec), - #[cfg(any(test, feature = "test-helpers"))] - Synchronize, } struct BlockImportWorker> { @@ -213,11 +202,6 @@ impl> BlockImportWorker { ToWorkerMsg::ImportJustification(who, hash, number, justification) => { worker.import_justification(who, hash, number, justification); } - #[cfg(any(test, feature = "test-helpers"))] - ToWorkerMsg::Synchronize => { - trace!(target: "sync", "Sending sync message"); - worker.result_sender.synchronized(); - }, } } }); diff --git a/core/consensus/common/src/import_queue/buffered_link.rs b/core/consensus/common/src/import_queue/buffered_link.rs index f73b439a66cfa..6a42ddb01afab 100644 --- a/core/consensus/common/src/import_queue/buffered_link.rs +++ b/core/consensus/common/src/import_queue/buffered_link.rs @@ -63,8 +63,6 @@ enum BlockImportWorkerMsg { SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder), ReportPeer(Origin, i32), Restart, - #[cfg(any(test, feature = "test-helpers"))] - Synchronized, } impl Link for BufferedLinkSender { @@ -120,11 +118,6 @@ impl Link for BufferedLinkSender { fn restart(&mut self) { let _ = self.tx.unbounded_send(BlockImportWorkerMsg::Restart); } - - #[cfg(any(test, feature = "test-helpers"))] - fn synchronized(&mut self) { - let _ = self.tx.unbounded_send(BlockImportWorkerMsg::Synchronized); - } } /// See [`buffered_link`]. @@ -168,9 +161,6 @@ impl BufferedLinkReceiver { link.report_peer(who, reput), BlockImportWorkerMsg::Restart => link.restart(), - #[cfg(any(test, feature = "test-helpers"))] - BlockImportWorkerMsg::Synchronized => - link.synchronized(), } } } diff --git a/core/finality-grandpa/Cargo.toml b/core/finality-grandpa/Cargo.toml index 41e4668b51496..7aeefa53cb5ac 100644 --- a/core/finality-grandpa/Cargo.toml +++ b/core/finality-grandpa/Cargo.toml @@ -27,7 +27,6 @@ fg_primitives = { package = "substrate-finality-grandpa-primitives", path = "pri grandpa = { package = "finality-grandpa", version = "0.8.1", features = ["derive-codec"] } [dev-dependencies] -consensus_common = { package = "substrate-consensus-common", path = "../consensus/common", features = ["test-helpers"] } grandpa = { package = "finality-grandpa", version = "0.8.1", features = ["derive-codec", "test-helpers"] } network = { package = "substrate-network", path = "../network", features = ["test-helpers"] } keyring = { package = "substrate-keyring", path = "../keyring" } diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 67cf518d52067..9babc74a3ea6e 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -158,8 +158,8 @@ impl TestNetFactory for GrandpaTestNet { } } - fn peer(&self, i: usize) -> &GrandpaPeer { - &self.peers[i] + fn peer(&mut self, i: usize) -> &mut GrandpaPeer { + &mut self.peers[i] } fn peers(&self) -> &Vec { @@ -949,7 +949,7 @@ fn allows_reimporting_change_blocks() { let peers_b = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob]; let voters = make_ids(peers_a); let api = TestApi::new(voters); - let net = GrandpaTestNet::new(api.clone(), 3); + let mut net = GrandpaTestNet::new(api.clone(), 3); let client = net.peer(0).client().clone(); let (block_import, ..) = net.make_block_import(client.clone()); @@ -998,7 +998,7 @@ fn test_bad_justification() { let peers_b = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob]; let voters = make_ids(peers_a); let api = TestApi::new(voters); - let net = GrandpaTestNet::new(api.clone(), 3); + let mut net = GrandpaTestNet::new(api.clone(), 3); let client = net.peer(0).client().clone(); let (block_import, ..) = net.make_block_import(client.clone()); diff --git a/core/network/Cargo.toml b/core/network/Cargo.toml index c18c56f270ff5..afddb00dea1a9 100644 --- a/core/network/Cargo.toml +++ b/core/network/Cargo.toml @@ -50,10 +50,9 @@ quickcheck = "0.8.5" rand = "0.6.5" test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } test_runtime = { package = "substrate-test-runtime", path = "../../core/test-runtime" } -consensus = { package = "substrate-consensus-common", path = "../../core/consensus/common", features = ["test-helpers"] } tempdir = "0.3" tokio = "0.1.11" [features] default = [] -test-helpers = ["keyring", "test-client", "consensus/test-helpers", "tokio"] +test-helpers = ["keyring", "test-client", "tokio"] diff --git a/core/network/src/behaviour.rs b/core/network/src/behaviour.rs index 8d5c3fe9cce15..4390c663972a5 100644 --- a/core/network/src/behaviour.rs +++ b/core/network/src/behaviour.rs @@ -15,7 +15,8 @@ // along with Substrate. If not, see . use crate::{ - debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour, event::DhtEvent + debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour, + protocol::event::DhtEvent }; use crate::{ExHashT, specialization::NetworkSpecialization}; use crate::protocol::{CustomMessageOutcome, Protocol}; diff --git a/core/network/src/config.rs b/core/network/src/config.rs index be7f32d995721..7b6c0eff47d6c 100644 --- a/core/network/src/config.rs +++ b/core/network/src/config.rs @@ -14,11 +14,14 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -//! Configuration for the networking layer of Substrate. +//! Configuration of the networking layer. +//! +//! The [`Params`] struct is the struct that must be passed in order to initialize the networking. +//! See the documentation of [`Params`]. pub use crate::protocol::ProtocolConfig; +pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr}; -use crate::ProtocolId; use crate::chain::{Client, FinalityProofProvider}; use crate::on_demand_layer::OnDemand; use crate::service::{ExHashT, TransactionPool}; @@ -29,30 +32,49 @@ use runtime_primitives::traits::{Block as BlockT}; use std::sync::Arc; use libp2p::identity::{Keypair, secp256k1, ed25519}; use libp2p::wasm_ext; -use libp2p::{Multiaddr, multiaddr::Protocol}; +use libp2p::{PeerId, Multiaddr, multiaddr}; use std::error::Error; -use std::{io::{self, Write}, iter, fs, net::Ipv4Addr, path::{Path, PathBuf}}; +use std::{io::{self, Write}, iter, fmt, fs, net::Ipv4Addr, path::{Path, PathBuf}}; use zeroize::Zeroize; -/// Service initialization parameters. +/// Network initialization parameters. pub struct Params { - /// Assigned roles for our node. + /// Assigned roles for our node (full, light, ...). pub roles: Roles, + /// Network layer configuration. pub network_config: NetworkConfiguration, - /// Substrate relay chain access point. + + /// Client that contains the blockchain. pub chain: Arc>, + /// Finality proof provider. + /// + /// This object, if `Some`, is used when a node on the network requests a proof of finality + /// from us. pub finality_proof_provider: Option>>, - /// On-demand service reference. + + /// The `OnDemand` object acts as a "receiver" for block data requests from the client. + /// If `Some`, the network worker will process these requests and answer them. + /// Normally used only for light clients. pub on_demand: Option>>, - /// Transaction pool. + + /// Pool of transactions. + /// + /// The network worker will fetch transactions from this object in order to propagate them on + /// the network. pub transaction_pool: Arc>, + /// Name of the protocol to use on the wire. Should be different for each chain. pub protocol_id: ProtocolId, + /// Import queue to use. + /// + /// The import queue is the component that verifies that blocks received from other nodes are + /// valid. pub import_queue: Box>, - /// Protocol specialization. + + /// Customization of the network. Use this to plug additional networking capabilities. pub specialization: S, } @@ -94,6 +116,85 @@ impl parity_codec::Decode for Roles { } } +/// Name of a protocol, transmitted on the wire. Should be unique for each chain. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>); + +impl<'a> From<&'a [u8]> for ProtocolId { + fn from(bytes: &'a [u8]) -> ProtocolId { + ProtocolId(bytes.into()) + } +} + +impl ProtocolId { + /// Exposes the `ProtocolId` as bytes. + pub fn as_bytes(&self) -> &[u8] { + self.0.as_ref() + } +} + +/// Parses a string address and returns the component, if valid. +/// +/// # Example +/// +/// ``` +/// # use substrate_network::{Multiaddr, PeerId, config::parse_str_addr}; +/// let (peer_id, addr) = parse_str_addr( +/// "/ip4/198.51.100.19/tcp/30333/p2p/QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV" +/// ).unwrap(); +/// assert_eq!(peer_id, "QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV".parse::().unwrap()); +/// assert_eq!(addr, "/ip4/198.51.100.19/tcp/30333".parse::().unwrap()); +/// ``` +/// +pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> { + let mut addr: Multiaddr = addr_str.parse()?; + + let who = match addr.pop() { + Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key) + .map_err(|_| ParseErr::InvalidPeerId)?, + _ => return Err(ParseErr::PeerIdMissing), + }; + + Ok((who, addr)) +} + +/// Error that can be generated by `parse_str_addr`. +#[derive(Debug)] +pub enum ParseErr { + /// Error while parsing the multiaddress. + MultiaddrParse(multiaddr::Error), + /// Multihash of the peer ID is invalid. + InvalidPeerId, + /// The peer ID is missing from the address. + PeerIdMissing, +} + +impl fmt::Display for ParseErr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ParseErr::MultiaddrParse(err) => write!(f, "{}", err), + ParseErr::InvalidPeerId => write!(f, "Peer id at the end of the address is invalid"), + ParseErr::PeerIdMissing => write!(f, "Peer id is missing from the address"), + } + } +} + +impl std::error::Error for ParseErr { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + ParseErr::MultiaddrParse(err) => Some(err), + ParseErr::InvalidPeerId => None, + ParseErr::PeerIdMissing => None, + } + } +} + +impl From for ParseErr { + fn from(err: multiaddr::Error) -> ParseErr { + ParseErr::MultiaddrParse(err) + } +} + /// Network service configuration. #[derive(Clone)] pub struct NetworkConfiguration { @@ -158,8 +259,8 @@ impl NetworkConfiguration { pub fn new_local() -> NetworkConfiguration { let mut config = NetworkConfiguration::new(); config.listen_addresses = vec![ - iter::once(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) - .chain(iter::once(Protocol::Tcp(0))) + iter::once(multiaddr::Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .chain(iter::once(multiaddr::Protocol::Tcp(0))) .collect() ]; config @@ -169,8 +270,8 @@ impl NetworkConfiguration { pub fn new_memory() -> NetworkConfiguration { let mut config = NetworkConfiguration::new(); config.listen_addresses = vec![ - iter::once(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) - .chain(iter::once(Protocol::Tcp(0))) + iter::once(multiaddr::Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1))) + .chain(iter::once(multiaddr::Protocol::Tcp(0))) .collect() ]; config diff --git a/core/network/src/custom_proto/behaviour.rs b/core/network/src/custom_proto/behaviour.rs index 1c787e2e10bcd..727977326f799 100644 --- a/core/network/src/custom_proto/behaviour.rs +++ b/core/network/src/custom_proto/behaviour.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::{DiscoveryNetBehaviour, ProtocolId}; +use crate::{DiscoveryNetBehaviour, config::ProtocolId}; use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn}; use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; use fnv::FnvHashMap; diff --git a/core/network/src/custom_proto/tests.rs b/core/network/src/custom_proto/tests.rs index 0a85b66181f97..33ff81be4752e 100644 --- a/core/network/src/custom_proto/tests.rs +++ b/core/network/src/custom_proto/tests.rs @@ -26,7 +26,7 @@ use libp2p::{PeerId, Multiaddr, Transport}; use rand::seq::SliceRandom; use std::{io, time::Duration, time::Instant}; use test_client::runtime::Block; -use crate::protocol::message::{Message as MessageAlias, generic::Message}; +use crate::message::{Message as MessageAlias, generic::Message}; use crate::custom_proto::{CustomProto, CustomProtoOut, CustomMessage}; /// Builds two nodes that have each other as bootstrap nodes. diff --git a/core/network/src/custom_proto/upgrade.rs b/core/network/src/custom_proto/upgrade.rs index 9ede4753494c6..4cb6cb5dd9042 100644 --- a/core/network/src/custom_proto/upgrade.rs +++ b/core/network/src/custom_proto/upgrade.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::ProtocolId; +use crate::config::ProtocolId; use bytes::Bytes; use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName}; use libp2p::tokio_codec::Framed; diff --git a/core/network/src/lib.rs b/core/network/src/lib.rs index dc2b7a8af12af..98cbf75c63fcc 100644 --- a/core/network/src/lib.rs +++ b/core/network/src/lib.rs @@ -158,6 +158,15 @@ //! `Arc` from the `NetworkWorker`, which can be shared amongst multiple places //! in order to give orders to the networking. //! +//! See the [`config`] module for more information about how to configure the networking. +//! +//! After the `NetworkWorker` has been created, the important things to do are: +//! +//! - Calling `NetworkWorker::poll` in order to advance the network. +//! - Calling `on_block_import` whenever a block is added to the client. +//! - Calling `on_block_finalized` whenever a block is finalized. +//! - Calling `trigger_repropagate` when a transaction is added to the pool. +//! //! More precise usage details are still being worked on and will likely change in the future. //! @@ -167,7 +176,6 @@ mod custom_proto; mod debug_info; mod discovery; mod on_demand_layer; -#[macro_use] mod protocol; mod service; mod transport; @@ -180,27 +188,25 @@ pub mod test; pub use chain::{Client as ClientHandle, FinalityProofProvider}; pub use service::{ - NetworkService, NetworkWorker, FetchFuture, TransactionPool, ManageNetwork, - NetworkMsg, ExHashT, ReportHandle, + NetworkService, NetworkWorker, TransactionPool, ExHashT, ReportHandle, }; -pub use config::{NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret}; -pub use protocol::{PeerInfo, Context, consensus_gossip, event, message, specialization}; +pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization}; pub use protocol::sync::SyncState; -pub use libp2p::{Multiaddr, multiaddr, build_multiaddr}; -pub use libp2p::{identity, PeerId, core::PublicKey, wasm_ext::ExtTransport}; +pub use libp2p::{Multiaddr, PeerId}; +#[doc(inline)] +pub use libp2p::multiaddr; pub use message::{generic as generic_message, RequestId, Status as StatusMessage}; -pub use event::Event; -pub use error::Error; -pub use protocol::on_demand::AlwaysBadChecker; pub use on_demand_layer::{OnDemand, RemoteResponse}; + +// Used by the `construct_simple_protocol!` macro. #[doc(hidden)] pub use runtime_primitives::traits::Block as BlockT; use libp2p::core::nodes::ConnectedPoint; use serde::{Deserialize, Serialize}; use slog_derive::SerdeValue; -use std::{collections::{HashMap, HashSet}, fmt, time::Duration}; +use std::{collections::{HashMap, HashSet}, time::Duration}; /// Extension trait for `NetworkBehaviour` that also accepts discovering nodes. pub trait DiscoveryNetBehaviour { @@ -213,73 +219,6 @@ pub trait DiscoveryNetBehaviour { fn add_discovered_nodes(&mut self, nodes: impl Iterator); } -/// Name of a protocol, transmitted on the wire. Should be unique for each chain. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>); - -impl<'a> From<&'a [u8]> for ProtocolId { - fn from(bytes: &'a [u8]) -> ProtocolId { - ProtocolId(bytes.into()) - } -} - -impl ProtocolId { - /// Exposes the `ProtocolId` as bytes. - pub fn as_bytes(&self) -> &[u8] { - self.0.as_ref() - } -} - -/// Parses a string address and returns the component, if valid. -pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> { - let mut addr: Multiaddr = addr_str.parse()?; - - let who = match addr.pop() { - Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key) - .map_err(|_| ParseErr::InvalidPeerId)?, - _ => return Err(ParseErr::PeerIdMissing), - }; - - Ok((who, addr)) -} - -/// Error that can be generated by `parse_str_addr`. -#[derive(Debug)] -pub enum ParseErr { - /// Error while parsing the multiaddress. - MultiaddrParse(multiaddr::Error), - /// Multihash of the peer ID is invalid. - InvalidPeerId, - /// The peer ID is missing from the address. - PeerIdMissing, -} - -impl fmt::Display for ParseErr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ParseErr::MultiaddrParse(err) => write!(f, "{}", err), - ParseErr::InvalidPeerId => write!(f, "Peer id at the end of the address is invalid"), - ParseErr::PeerIdMissing => write!(f, "Peer id is missing from the address"), - } - } -} - -impl std::error::Error for ParseErr { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - ParseErr::MultiaddrParse(err) => Some(err), - ParseErr::InvalidPeerId => None, - ParseErr::PeerIdMissing => None, - } - } -} - -impl From for ParseErr { - fn from(err: multiaddr::Error) -> ParseErr { - ParseErr::MultiaddrParse(err) - } -} - /// Returns general information about the networking. /// /// Meant for general diagnostic purposes. diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index 83f59e2208bbf..9521627536480 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::{DiscoveryNetBehaviour, ProtocolId}; +use crate::{DiscoveryNetBehaviour, config::ProtocolId}; use crate::custom_proto::{CustomProto, CustomProtoOut}; use futures::prelude::*; use libp2p::{Multiaddr, PeerId}; @@ -117,12 +117,6 @@ pub struct Protocol, H: ExHashT> { behaviour: CustomProto, Substream>, } -/// A peer from whom we have received a Status message. -#[derive(Clone)] -pub struct ConnectedPeer { - pub peer_info: PeerInfo -} - /// A peer that we are connected to /// and from whom we have not yet received a Status message. struct HandshakingPeer { @@ -448,16 +442,6 @@ impl, H: ExHashT> Protocol { self.behaviour.is_enabled(peer_id) } - /// Sends a message to a peer. - /// - /// Has no effect if the custom protocol is not open with the given peer. - /// - /// Also note that even we have a valid open substream, it may in fact be already closed - /// without us knowing, in which case the packet will not be received. - pub fn send_packet(&mut self, target: &PeerId, message: Message) { - self.behaviour.send_packet(target, message) - } - /// Returns the state of the peerset manager, for debugging purposes. pub fn peerset_debug_info(&mut self) -> serde_json::Value { self.behaviour.peerset_debug_info() diff --git a/core/network/src/protocol/specialization.rs b/core/network/src/protocol/specialization.rs index 0078e66522969..085f201a455a1 100644 --- a/core/network/src/protocol/specialization.rs +++ b/core/network/src/protocol/specialization.rs @@ -16,6 +16,8 @@ //! Specializations of the substrate network protocol to allow more complex forms of communication. +pub use crate::protocol::event::{DhtEvent, Event}; + use crate::protocol::Context; use libp2p::PeerId; use runtime_primitives::traits::Block as BlockT; @@ -40,10 +42,7 @@ pub trait NetworkSpecialization: Send + Sync + 'static { ); /// Called when a network-specific event arrives. - fn on_event( - &mut self, - event: crate::protocol::event::Event - ); + fn on_event(&mut self, event: Event); /// Called on abort. #[deprecated(note = "This method is never called; aborting corresponds to dropping the object")] @@ -138,7 +137,7 @@ macro_rules! construct_simple_protocol { fn on_event( &mut self, - _event: $crate::event::Event + _event: $crate::specialization::Event ) { $( self.$sub_protocol_name.on_event(_event); )* } diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 5aece788d1610..cd47b0056a929 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -14,43 +14,38 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use std::collections::HashMap; -use std::{fs, io, path::Path}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; +//! Main entry point of the substrate-network crate. +//! +//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`]. +//! The [`NetworkWorker`] *is* the network and implements the `Future` trait. It must be polled in +//! order fo the network to advance. +//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an +//! `Arc` by calling [`NetworkWorker::service`]. +//! +//! The methods of the [`NetworkService`] are implemented by sending a message over a channel, +//! which is then processed by [`NetworkWorker::poll`]. + +use std::{collections::HashMap, fs, marker::PhantomData, io, path::Path}; +use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; +use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder}; +use futures::{prelude::*, sync::mpsc}; use log::{warn, error, info}; -use libp2p::core::swarm::NetworkBehaviour; -use libp2p::core::{transport::boxed::Boxed, muxing::StreamMuxerBox}; -use libp2p::{Multiaddr, multihash::Multihash}; -use futures::{prelude::*, sync::oneshot, sync::mpsc}; -use parking_lot::{Mutex, RwLock}; -use crate::protocol::Protocol; -use crate::{behaviour::{Behaviour, BehaviourOut}, parse_str_addr}; -use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer}; -use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode}; +use libp2p::core::{swarm::NetworkBehaviour, transport::boxed::Boxed, muxing::StreamMuxerBox}; +use libp2p::{PeerId, Multiaddr, multihash::Multihash}; use peerset::PeersetHandle; -use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder}; use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; -use crate::AlwaysBadChecker; -use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; -use crate::protocol::{event::Event, message::Message}; -use crate::protocol::on_demand::RequestData; -use crate::protocol::{self, Context, CustomMessageOutcome, ConnectedPeer, PeerInfo}; -use crate::protocol::sync::SyncState; +use crate::{behaviour::{Behaviour, BehaviourOut}, config::parse_str_addr}; +use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer}; +use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode}; use crate::config::{Params, TransportConfig}; use crate::error::Error; +use crate::protocol::{self, Protocol, Context, CustomMessageOutcome, PeerInfo}; +use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; +use crate::protocol::{event::Event, on_demand::{AlwaysBadChecker, RequestData}}; use crate::protocol::specialization::NetworkSpecialization; - -/// Interval at which we update the `peers` field on the main thread. -const CONNECTED_PEERS_INTERVAL: Duration = Duration::from_millis(500); - -pub use libp2p::PeerId; - -/// Type that represents fetch completion future. -pub type FetchFuture = oneshot::Receiver>; +use crate::protocol::sync::SyncState; /// Minimum Requirements for a Hash within Networking pub trait ExHashT: @@ -88,23 +83,22 @@ impl ReportHandle { /// Substrate network service. Handles network IO and manages connectivity. pub struct NetworkService, H: ExHashT> { - /// Are we connected to any peer? - is_offline: Arc, + /// Number of peers we're connected to. + num_connected: Arc, /// Are we actively catching up with the chain? is_major_syncing: Arc, - /// Peers whom we are connected with. - peers: Arc>>>, - /// Channel for networking messages processed by the background thread. - network_chan: mpsc::UnboundedSender>, - /// Network service - network: Arc>>, + /// Local copy of the `PeerId` of the local node. + local_peer_id: PeerId, /// Bandwidth logging system. Can be queried to know the average bandwidth consumed. bandwidth: Arc, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which /// nodes it should be connected to or not. peerset: PeersetHandle, - /// Protocol sender - protocol_sender: mpsc::UnboundedSender>, + /// Channel that sends messages to the actual worker. + to_worker: mpsc::UnboundedSender>, + /// Marker to pin the `H` generic. Serves no purpose except to not break backwards + /// compatibility. + _marker: PhantomData, } impl, H: ExHashT> NetworkWorker { @@ -116,8 +110,7 @@ impl, H: ExHashT> NetworkWorker pub fn new( params: Params, ) -> Result, Error> { - let (network_chan, network_port) = mpsc::unbounded(); - let (protocol_sender, protocol_rx) = mpsc::unbounded(); + let (to_worker, from_worker) = mpsc::unbounded(); if let Some(ref path) = params.network_config.net_config_path { fs::create_dir_all(Path::new(path))?; @@ -166,10 +159,8 @@ impl, H: ExHashT> NetworkWorker let local_peer_id = local_public.clone().into_peer_id(); info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58()); - // Start in off-line mode, since we're not connected to any nodes yet. - let is_offline = Arc::new(AtomicBool::new(true)); + let num_connected = Arc::new(AtomicUsize::new(0)); let is_major_syncing = Arc::new(AtomicBool::new(false)); - let peers: Arc>>> = Arc::new(Default::default()); let (protocol, peerset_handle) = Protocol::new( protocol::ProtocolConfig { roles: params.roles }, params.chain, @@ -222,31 +213,24 @@ impl, H: ExHashT> NetworkWorker Swarm::::add_external_address(&mut swarm, addr.clone()); } - let network = Arc::new(Mutex::new(swarm)); - let service = Arc::new(NetworkService { bandwidth, - is_offline: is_offline.clone(), + num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), - network_chan, - peers: peers.clone(), - peerset: peerset_handle.clone(), - network: network.clone(), - protocol_sender: protocol_sender.clone(), + peerset: peerset_handle, + local_peer_id, + to_worker: to_worker.clone(), + _marker: PhantomData, }); Ok(NetworkWorker { - is_offline, + num_connected, is_major_syncing, - network_service: network, - peerset: peerset_handle, + network_service: swarm, service, - peers, import_queue: params.import_queue, - network_port, - protocol_rx, + from_worker, on_demand_in: params.on_demand.and_then(|od| od.extract_receiver()), - connected_peers_interval: tokio_timer::Interval::new_interval(CONNECTED_PEERS_INTERVAL), }) } @@ -262,32 +246,32 @@ impl, H: ExHashT> NetworkWorker /// Returns the number of peers we're connected to. pub fn num_connected_peers(&self) -> usize { - self.network_service.lock().user_protocol_mut().num_connected_peers() + self.network_service.user_protocol().num_connected_peers() } /// Returns the number of peers we're connected to and that are being queried. pub fn num_active_peers(&self) -> usize { - self.network_service.lock().user_protocol_mut().num_active_peers() + self.network_service.user_protocol().num_active_peers() } /// Current global sync state. pub fn sync_state(&self) -> SyncState { - self.network_service.lock().user_protocol_mut().sync_state() + self.network_service.user_protocol().sync_state() } /// Target sync block number. pub fn best_seen_block(&self) -> Option> { - self.network_service.lock().user_protocol_mut().best_seen_block() + self.network_service.user_protocol().best_seen_block() } /// Number of peers participating in syncing. pub fn num_sync_peers(&self) -> u32 { - self.network_service.lock().user_protocol_mut().num_sync_peers() + self.network_service.user_protocol().num_sync_peers() } /// Adds an address for a node. pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { - self.network_service.lock().add_known_address(peer_id, addr); + self.network_service.add_known_address(peer_id, addr); } /// Return a `NetworkService` that can be shared through the code base and can be used to @@ -295,114 +279,23 @@ impl, H: ExHashT> NetworkWorker pub fn service(&self) -> &Arc> { &self.service } -} - -impl, H: ExHashT> NetworkService { - /// Returns the network identity of the node. - pub fn local_peer_id(&self) -> PeerId { - Swarm::::local_peer_id(&*self.network.lock()).clone() - } - - /// Called when a new block is imported by the client. - pub fn on_block_imported(&self, hash: B::Hash, header: B::Header) { - let _ = self - .protocol_sender - .unbounded_send(ProtocolMsg::BlockImported(hash, header)); - } - - /// Called when a new block is finalized by the client. - pub fn on_block_finalized(&self, hash: B::Hash, header: B::Header) { - let _ = self - .protocol_sender - .unbounded_send(ProtocolMsg::BlockFinalized(hash, header)); - } - - /// Called when new transactons are imported by the client. - pub fn trigger_repropagate(&self) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::PropagateExtrinsics); - } - /// Make sure an important block is propagated to peers. - /// - /// In chain-based consensus, we often need to make sure non-best forks are - /// at least temporarily synced. - pub fn announce_block(&self, hash: B::Hash) { - let _ = self.protocol_sender.unbounded_send(ProtocolMsg::AnnounceBlock(hash)); + /// You must call this when a new block is imported by the client. + pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header) { + self.network_service.user_protocol_mut().on_block_imported(hash, &header); } - /// Send a consensus message through the gossip - pub fn gossip_consensus_message( - &self, - topic: B::Hash, - engine_id: ConsensusEngineId, - message: Vec, - recipient: GossipMessageRecipient, - ) { - let _ = self - .protocol_sender - .unbounded_send(ProtocolMsg::GossipConsensusMessage( - topic, engine_id, message, recipient, - )); - } - - /// Report a given peer as either beneficial (+) or costly (-) according to the - /// given scalar. - pub fn report_peer(&self, who: PeerId, cost_benefit: i32) { - self.peerset.report_peer(who, cost_benefit); - } - - /// Send a message to the given peer. Has no effect if we're not connected to this peer. - /// - /// This method is extremely poor in terms of API and should be eventually removed. - pub fn disconnect_peer(&self, who: PeerId) { - let _ = self.network_chan.unbounded_send(NetworkMsg::DisconnectPeer(who)); + /// You must call this when a new block is finalized by the client. + pub fn on_block_finalized(&mut self, hash: B::Hash, header: B::Header) { + self.network_service.user_protocol_mut().on_block_finalized(hash, &header); } - /// Request a justification for the given block. - pub fn request_justification(&self, hash: &B::Hash, number: NumberFor) { - let _ = self - .protocol_sender - .unbounded_send(ProtocolMsg::RequestJustification(hash.clone(), number)); - } - - /// Execute a closure with the chain-specific network specialization. - pub fn with_spec(&self, f: F) - where F: FnOnce(&mut S, &mut dyn Context) + Send + 'static - { - let _ = self - .protocol_sender - .unbounded_send(ProtocolMsg::ExecuteWithSpec(Box::new(f))); - } - - /// Execute a closure with the consensus gossip. - pub fn with_gossip(&self, f: F) - where F: FnOnce(&mut ConsensusGossip, &mut dyn Context) + Send + 'static - { - let _ = self - .protocol_sender - .unbounded_send(ProtocolMsg::ExecuteWithGossip(Box::new(f))); - } - - /// Are we in the process of downloading the chain? - pub fn is_major_syncing(&self) -> bool { - self.is_major_syncing.load(Ordering::Relaxed) - } - - /// Get a value. - pub fn get_value(&mut self, key: &Multihash) { - self.network.lock().get_value(key); - } - - /// Put a value. - pub fn put_value(&mut self, key: Multihash, value: Vec) { - self.network.lock().put_value(key, value); - } -} - -impl, H: ExHashT> NetworkService { /// Get network state. - pub fn network_state(&self) -> NetworkState { - let mut swarm = self.network.lock(); + /// + /// **Note**: Use this only for debugging. This API is unstable. There are warnings literaly + /// everywhere about this. Please don't use this function to retreive actual information. + pub fn network_state(&mut self) -> NetworkState { + let swarm = &mut self.network_service; let open = swarm.user_protocol().open_peers().cloned().collect::>(); let connected_peers = { @@ -450,8 +343,8 @@ impl, H: ExHashT> NetworkServic peer_id: Swarm::::local_peer_id(&swarm).to_base58(), listened_addresses: Swarm::::listeners(&swarm).cloned().collect(), external_addresses: Swarm::::external_addresses(&swarm).cloned().collect(), - average_download_per_sec: self.bandwidth.average_download_per_sec(), - average_upload_per_sec: self.bandwidth.average_upload_per_sec(), + average_download_per_sec: self.service.bandwidth.average_download_per_sec(), + average_upload_per_sec: self.service.bandwidth.average_upload_per_sec(), connected_peers, not_connected_peers, peerset: swarm.user_protocol_mut().peerset_debug_info(), @@ -459,155 +352,186 @@ impl, H: ExHashT> NetworkServic } /// Get currently connected peers. - /// - /// > **Warning**: This method can return outdated information and should only ever be used - /// > when obtaining outdated information is acceptable. - pub fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo)> { - let peers = (*self.peers.read()).clone(); - peers.into_iter().map(|(idx, connected)| (idx, connected.peer_info)).collect() + pub fn peers_debug_info(&mut self) -> Vec<(PeerId, PeerInfo)> { + self.network_service.user_protocol_mut() + .peers_info() + .map(|(id, info)| (id.clone(), info.clone())) + .collect() } } -impl, H: ExHashT> - ::consensus::SyncOracle for NetworkService { - fn is_major_syncing(&self) -> bool { - self.is_major_syncing() +impl, H: ExHashT> NetworkService { + /// Returns the network identity of the node. + pub fn local_peer_id(&self) -> PeerId { + self.local_peer_id.clone() } - fn is_offline(&self) -> bool { - self.is_offline.load(Ordering::Relaxed) + /// You must call this when new transactons are imported by the transaction pool. + /// + /// The latest transactions will be fetched from the `TransactionPool` that was passed at + /// initialization as part of the configuration. + pub fn trigger_repropagate(&self) { + let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::PropagateExtrinsics); } -} -/// Trait for managing network -pub trait ManageNetwork { - /// Set to allow unreserved peers to connect - fn accept_unreserved_peers(&self); - /// Set to deny unreserved peers to connect - fn deny_unreserved_peers(&self); - /// Remove reservation for the peer - fn remove_reserved_peer(&self, peer: PeerId); - /// Add reserved peer - fn add_reserved_peer(&self, peer: String) -> Result<(), String>; -} + /// Make sure an important block is propagated to peers. + /// + /// In chain-based consensus, we often need to make sure non-best forks are + /// at least temporarily synced. This function forces such an announcement. + pub fn announce_block(&self, hash: B::Hash) { + let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::AnnounceBlock(hash)); + } + + /// Send a consensus message through the gossip + pub fn gossip_consensus_message( + &self, + topic: B::Hash, + engine_id: ConsensusEngineId, + message: Vec, + recipient: GossipMessageRecipient, + ) { + let _ = self + .to_worker + .unbounded_send(ServerToWorkerMsg::GossipConsensusMessage( + topic, engine_id, message, recipient, + )); + } + + /// Report a given peer as either beneficial (+) or costly (-) according to the + /// given scalar. + pub fn report_peer(&self, who: PeerId, cost_benefit: i32) { + self.peerset.report_peer(who, cost_benefit); + } + + /// Request a justification for the given block from the network. + /// + /// On success, the justification will be passed to the import queue that was part at + /// initialization as part of the configuration. + pub fn request_justification(&self, hash: &B::Hash, number: NumberFor) { + let _ = self + .to_worker + .unbounded_send(ServerToWorkerMsg::RequestJustification(hash.clone(), number)); + } + + /// Execute a closure with the chain-specific network specialization. + pub fn with_spec(&self, f: F) + where F: FnOnce(&mut S, &mut dyn Context) + Send + 'static + { + let _ = self + .to_worker + .unbounded_send(ServerToWorkerMsg::ExecuteWithSpec(Box::new(f))); + } + + /// Execute a closure with the consensus gossip. + pub fn with_gossip(&self, f: F) + where F: FnOnce(&mut ConsensusGossip, &mut dyn Context) + Send + 'static + { + let _ = self + .to_worker + .unbounded_send(ServerToWorkerMsg::ExecuteWithGossip(Box::new(f))); + } + + /// Are we in the process of downloading the chain? + pub fn is_major_syncing(&self) -> bool { + self.is_major_syncing.load(Ordering::Relaxed) + } -impl, H: ExHashT> ManageNetwork for NetworkService { - fn accept_unreserved_peers(&self) { + /// Start getting a value from the DHT. + /// + /// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it to + /// `on_event` on the network specialization. + pub fn get_value(&mut self, key: &Multihash) { + let _ = self + .to_worker + .unbounded_send(ServerToWorkerMsg::GetValue(key.clone())); + } + + /// Start putting a value in the DHT. + /// + /// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it to + /// `on_event` on the network specialization. + pub fn put_value(&mut self, key: Multihash, value: Vec) { + let _ = self + .to_worker + .unbounded_send(ServerToWorkerMsg::PutValue(key, value)); + } + + /// Connect to unreserved peers and allow unreserved peers to connect. + pub fn accept_unreserved_peers(&self) { self.peerset.set_reserved_only(false); } - fn deny_unreserved_peers(&self) { + /// Disconnect from unreserved peers and deny new unreserved peers to connect. + pub fn deny_unreserved_peers(&self) { self.peerset.set_reserved_only(true); } - fn remove_reserved_peer(&self, peer: PeerId) { + /// Removes a `PeerId` from the list of reserved peers. + pub fn remove_reserved_peer(&self, peer: PeerId) { self.peerset.remove_reserved_peer(peer); } - fn add_reserved_peer(&self, peer: String) -> Result<(), String> { + /// Adds a `PeerId` and its address as reserved. + pub fn add_reserved_peer(&self, peer: String) -> Result<(), String> { let (peer_id, addr) = parse_str_addr(&peer).map_err(|e| format!("{:?}", e))?; self.peerset.add_reserved_peer(peer_id.clone()); - self.network.lock().add_known_address(peer_id, addr); + let _ = self + .to_worker + .unbounded_send(ServerToWorkerMsg::AddKnownAddress(peer_id, addr)); Ok(()) } + + /// Returns the number of peers we're connected to. + pub fn num_connected(&self) -> usize { + self.num_connected.load(Ordering::Relaxed) + } } -/// Messages to be handled by Libp2pNetService. -#[derive(Debug)] -pub enum NetworkMsg { - /// Send an outgoing custom message. - Outgoing(PeerId, Message), - /// Disconnect a peer we're connected to, or do nothing if we're not connected. - DisconnectPeer(PeerId), - /// Performs a reputation adjustement on a peer. - ReportPeer(PeerId, i32), - /// Synchronization response. - #[cfg(any(test, feature = "test-helpers"))] - Synchronized, +impl, H: ExHashT> + ::consensus::SyncOracle for NetworkService { + fn is_major_syncing(&self) -> bool { + self.is_major_syncing() + } + + fn is_offline(&self) -> bool { + self.num_connected.load(Ordering::Relaxed) == 0 + } } -/// Messages sent to Protocol from elsewhere inside the system. -pub enum ProtocolMsg> { - /// A batch of blocks has been processed, with or without errors. - BlocksProcessed(Vec, bool), - /// Tell protocol to restart sync. - RestartSync, - /// Tell protocol to propagate extrinsics. +/// Messages sent from the `NetworkService` to the `NetworkWorker`. +/// +/// Each entry corresponds to a method of `NetworkService`. +enum ServerToWorkerMsg> { PropagateExtrinsics, - /// Tell protocol that a block was imported (sent by the import-queue). - BlockImportedSync(B::Hash, NumberFor), - /// Tell protocol to clear all pending justification requests. - ClearJustificationRequests, - /// Tell protocol to request justification for a block. RequestJustification(B::Hash, NumberFor), - /// Inform protocol whether a justification was successfully imported. - JustificationImportResult(B::Hash, NumberFor, bool), - /// Set finality proof request builder. - SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder), - /// Tell protocol to request finality proof for a block. - RequestFinalityProof(B::Hash, NumberFor), - /// Inform protocol whether a finality proof was successfully imported. - FinalityProofImportResult((B::Hash, NumberFor), Result<(B::Hash, NumberFor), ()>), - /// Propagate a block to peers. AnnounceBlock(B::Hash), - /// A block has been imported (sent by the client). - BlockImported(B::Hash, B::Header), - /// A block has been finalized (sent by the client). - BlockFinalized(B::Hash, B::Header), - /// Execute a closure with the chain-specific network specialization. - ExecuteWithSpec(Box + Send + 'static>), - /// Execute a closure with the consensus gossip. - ExecuteWithGossip(Box + Send + 'static>), - /// Incoming gossip consensus message. + ExecuteWithSpec(Box) + Send>), + ExecuteWithGossip(Box, &mut dyn Context) + Send>), GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec, GossipMessageRecipient), - /// Tell protocol to perform regular maintenance. - #[cfg(any(test, feature = "test-helpers"))] - Tick, - /// Synchronization request. - #[cfg(any(test, feature = "test-helpers"))] - Synchronize, + GetValue(Multihash), + PutValue(Multihash, Vec), + AddKnownAddress(PeerId, Multiaddr), } -/// A task, consisting of a user-provided closure, to be executed on the Protocol thread. -pub trait SpecTask> { - fn call_box(self: Box, spec: &mut S, context: &mut dyn Context); -} - -impl, F: FnOnce(&mut S, &mut dyn Context)> SpecTask for F { - fn call_box(self: Box, spec: &mut S, context: &mut dyn Context) { - (*self)(spec, context) - } -} - -/// A task, consisting of a user-provided closure, to be executed on the Protocol thread. -pub trait GossipTask { - fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut dyn Context); -} - -impl, &mut dyn Context)> GossipTask for F { - fn call_box(self: Box, gossip: &mut ConsensusGossip, context: &mut dyn Context) { - (*self)(gossip, context) - } -} - -/// Future tied to the `Network` service and that must be polled in order for the network to -/// advance. +/// Main network worker. Must be polled in order for the network to advance. +/// +/// You are encouraged to poll this in a separate background thread or task. #[must_use = "The NetworkWorker must be polled in order for the network to work"] pub struct NetworkWorker, H: ExHashT> { - is_offline: Arc, + /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. + num_connected: Arc, + /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. is_major_syncing: Arc, /// The network service that can be extracted and shared through the codebase. service: Arc>, - network_service: Arc>>, - peers: Arc>>>, + /// The *actual* network. + network_service: Swarm, + /// The import queue that was passed as initialization. import_queue: Box>, - network_port: mpsc::UnboundedReceiver>, - protocol_rx: mpsc::UnboundedReceiver>, - peerset: PeersetHandle, + /// Messages from the `NetworkService` and that must be processed. + from_worker: mpsc::UnboundedReceiver>, + /// Receiver for queries from the on-demand that must be processed. on_demand_in: Option>>, - - /// Interval at which we update the `connected_peers` Arc. - connected_peers_interval: tokio_timer::Interval, } impl, H: ExHashT> Future for NetworkWorker { @@ -615,165 +539,63 @@ impl, H: ExHashT> Future for Ne type Error = io::Error; fn poll(&mut self) -> Poll { - // Implementation of `import_queue::Link` trait using the available local variables. - struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> { - protocol: &'a mut Swarm, - } - impl<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> Link for NetworkLink<'a, B, S, H> { - fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.user_protocol_mut().block_imported(&hash, number) - } - fn blocks_processed(&mut self, hashes: Vec, has_error: bool) { - self.protocol.user_protocol_mut().blocks_processed(hashes, has_error) - } - fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { - self.protocol.user_protocol_mut().justification_import_result(hash.clone(), number, success); - if !success { - info!("Invalid justification provided by {} for #{}", who, hash); - self.protocol.user_protocol_mut().disconnect_peer(&who); - self.protocol.user_protocol_mut().report_peer(who, i32::min_value()); - } - } - fn clear_justification_requests(&mut self) { - self.protocol.user_protocol_mut().clear_justification_requests() - } - fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.user_protocol_mut().request_justification(hash, number) - } - fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.user_protocol_mut().request_finality_proof(hash, number) - } - fn finality_proof_imported( - &mut self, - who: PeerId, - request_block: (B::Hash, NumberFor), - finalization_result: Result<(B::Hash, NumberFor), ()>, - ) { - let success = finalization_result.is_ok(); - self.protocol.user_protocol_mut().finality_proof_import_result(request_block, finalization_result); - if !success { - info!("Invalid finality proof provided by {} for #{}", who, request_block.0); - self.protocol.user_protocol_mut().disconnect_peer(&who); - self.protocol.user_protocol_mut().report_peer(who, i32::min_value()); - } - } - fn report_peer(&mut self, who: PeerId, reputation_change: i32) { - self.protocol.user_protocol_mut().report_peer(who, reputation_change) - } - fn restart(&mut self) { - self.protocol.user_protocol_mut().restart() - } - fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder) { - self.protocol.user_protocol_mut().set_finality_proof_request_builder(builder) - } - } - - { - let mut network_service = self.network_service.lock(); - let mut link = NetworkLink { - protocol: &mut network_service, - }; - self.import_queue.poll_actions(&mut link); - } - - while let Ok(Async::Ready(_)) = self.connected_peers_interval.poll() { - let mut network_service = self.network_service.lock(); - let infos = network_service.user_protocol_mut().peers_info().map(|(id, info)| { - (id.clone(), ConnectedPeer { peer_info: info.clone() }) - }).collect(); - *self.peers.write() = infos; - } + // Poll the import queue for actions to perform. + self.import_queue.poll_actions(&mut NetworkLink { + protocol: &mut self.network_service, + }); // Check for new incoming on-demand requests. if let Some(on_demand_in) = self.on_demand_in.as_mut() { while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() { - let mut network_service = self.network_service.lock(); - network_service.user_protocol_mut().add_on_demand_request(rq); - } - } - - loop { - match self.network_port.poll() { - Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(NetworkMsg::Outgoing(who, outgoing_message)))) => - self.network_service.lock().user_protocol_mut().send_packet(&who, outgoing_message), - Ok(Async::Ready(Some(NetworkMsg::ReportPeer(who, reputation)))) => - self.peerset.report_peer(who, reputation), - Ok(Async::Ready(Some(NetworkMsg::DisconnectPeer(who)))) => - self.network_service.lock().user_protocol_mut().disconnect_peer(&who), - - #[cfg(any(test, feature = "test-helpers"))] - Ok(Async::Ready(Some(NetworkMsg::Synchronized))) => {} - - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + self.network_service.user_protocol_mut().add_on_demand_request(rq); } } loop { - let msg = match self.protocol_rx.poll() { + // Process the next message coming from the `NetworkService`. + let msg = match self.from_worker.poll() { Ok(Async::Ready(Some(msg))) => msg, Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), Ok(Async::NotReady) => break, }; - let mut network_service = self.network_service.lock(); - match msg { - ProtocolMsg::BlockImported(hash, header) => - network_service.user_protocol_mut().on_block_imported(hash, &header), - ProtocolMsg::BlockFinalized(hash, header) => - network_service.user_protocol_mut().on_block_finalized(hash, &header), - ProtocolMsg::ExecuteWithSpec(task) => { - let protocol = network_service.user_protocol_mut(); + ServerToWorkerMsg::ExecuteWithSpec(task) => { + let protocol = self.network_service.user_protocol_mut(); let (mut context, spec) = protocol.specialization_lock(); - task.call_box(spec, &mut context); + task(spec, &mut context); }, - ProtocolMsg::ExecuteWithGossip(task) => { - let protocol = network_service.user_protocol_mut(); + ServerToWorkerMsg::ExecuteWithGossip(task) => { + let protocol = self.network_service.user_protocol_mut(); let (mut context, gossip) = protocol.consensus_gossip_lock(); - task.call_box(gossip, &mut context); + task(gossip, &mut context); } - ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => - network_service.user_protocol_mut().gossip_consensus_message(topic, engine_id, message, recipient), - ProtocolMsg::BlocksProcessed(hashes, has_error) => - network_service.user_protocol_mut().blocks_processed(hashes, has_error), - ProtocolMsg::RestartSync => - network_service.user_protocol_mut().restart(), - ProtocolMsg::AnnounceBlock(hash) => - network_service.user_protocol_mut().announce_block(hash), - ProtocolMsg::BlockImportedSync(hash, number) => - network_service.user_protocol_mut().block_imported(&hash, number), - ProtocolMsg::ClearJustificationRequests => - network_service.user_protocol_mut().clear_justification_requests(), - ProtocolMsg::RequestJustification(hash, number) => - network_service.user_protocol_mut().request_justification(&hash, number), - ProtocolMsg::JustificationImportResult(hash, number, success) => - network_service.user_protocol_mut().justification_import_result(hash, number, success), - ProtocolMsg::SetFinalityProofRequestBuilder(builder) => - network_service.user_protocol_mut().set_finality_proof_request_builder(builder), - ProtocolMsg::RequestFinalityProof(hash, number) => - network_service.user_protocol_mut().request_finality_proof(&hash, number), - ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => - network_service.user_protocol_mut() - .finality_proof_import_result(requested_block, finalziation_result), - ProtocolMsg::PropagateExtrinsics => - network_service.user_protocol_mut().propagate_extrinsics(), - #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Tick => network_service.user_protocol_mut().tick(), - #[cfg(any(test, feature = "test-helpers"))] - ProtocolMsg::Synchronize => {}, + ServerToWorkerMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => + self.network_service.user_protocol_mut().gossip_consensus_message(topic, engine_id, message, recipient), + ServerToWorkerMsg::AnnounceBlock(hash) => + self.network_service.user_protocol_mut().announce_block(hash), + ServerToWorkerMsg::RequestJustification(hash, number) => + self.network_service.user_protocol_mut().request_justification(&hash, number), + ServerToWorkerMsg::PropagateExtrinsics => + self.network_service.user_protocol_mut().propagate_extrinsics(), + ServerToWorkerMsg::GetValue(key) => + self.network_service.get_value(&key), + ServerToWorkerMsg::PutValue(key, value) => + self.network_service.put_value(key, value), + ServerToWorkerMsg::AddKnownAddress(peer_id, addr) => + self.network_service.add_known_address(peer_id, addr), } } loop { - let mut network_service = self.network_service.lock(); - let poll_value = network_service.poll(); + // Process the next action coming from the network. + let poll_value = self.network_service.poll(); let outcome = match poll_value { Ok(Async::NotReady) => break, Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome, Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => { - network_service.user_protocol_mut() + self.network_service.user_protocol_mut() .on_event(Event::Dht(ev)); CustomMessageOutcome::None }, @@ -795,9 +617,9 @@ impl, H: ExHashT> Future for Ne } } - let mut network_service = self.network_service.lock(); - self.is_offline.store(network_service.user_protocol_mut().num_connected_peers() == 0, Ordering::Relaxed); - self.is_major_syncing.store(match network_service.user_protocol_mut().sync_state() { + // Update the variables shared with the `NetworkService`. + self.num_connected.store(self.network_service.user_protocol_mut().num_connected_peers(), Ordering::Relaxed); + self.is_major_syncing.store(match self.network_service.user_protocol_mut().sync_state() { SyncState::Idle => false, SyncState::Downloading => true, }, Ordering::Relaxed); @@ -811,3 +633,57 @@ type Swarm = libp2p::core::Swarm< Boxed<(PeerId, StreamMuxerBox), io::Error>, Behaviour >; + +// Implementation of `import_queue::Link` trait using the available local variables. +struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> { + protocol: &'a mut Swarm, +} + +impl<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> Link for NetworkLink<'a, B, S, H> { + fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { + self.protocol.user_protocol_mut().block_imported(&hash, number) + } + fn blocks_processed(&mut self, hashes: Vec, has_error: bool) { + self.protocol.user_protocol_mut().blocks_processed(hashes, has_error) + } + fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor, success: bool) { + self.protocol.user_protocol_mut().justification_import_result(hash.clone(), number, success); + if !success { + info!("Invalid justification provided by {} for #{}", who, hash); + self.protocol.user_protocol_mut().disconnect_peer(&who); + self.protocol.user_protocol_mut().report_peer(who, i32::min_value()); + } + } + fn clear_justification_requests(&mut self) { + self.protocol.user_protocol_mut().clear_justification_requests() + } + fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + self.protocol.user_protocol_mut().request_justification(hash, number) + } + fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { + self.protocol.user_protocol_mut().request_finality_proof(hash, number) + } + fn finality_proof_imported( + &mut self, + who: PeerId, + request_block: (B::Hash, NumberFor), + finalization_result: Result<(B::Hash, NumberFor), ()>, + ) { + let success = finalization_result.is_ok(); + self.protocol.user_protocol_mut().finality_proof_import_result(request_block, finalization_result); + if !success { + info!("Invalid finality proof provided by {} for #{}", who, request_block.0); + self.protocol.user_protocol_mut().disconnect_peer(&who); + self.protocol.user_protocol_mut().report_peer(who, i32::min_value()); + } + } + fn report_peer(&mut self, who: PeerId, reputation_change: i32) { + self.protocol.user_protocol_mut().report_peer(who, reputation_change) + } + fn restart(&mut self) { + self.protocol.user_protocol_mut().restart() + } + fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder) { + self.protocol.user_protocol_mut().set_finality_proof_request_builder(builder) + } +} diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index 289f23bc22b43..7fc626f593b53 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -24,10 +24,10 @@ mod sync; use std::collections::HashMap; use std::sync::Arc; -use crate::build_multiaddr; +use crate::config::build_multiaddr; use log::trace; use crate::chain::FinalityProofProvider; -use client::{self, ClientInfo, BlockchainEvents, FinalityNotifications}; +use client::{self, ClientInfo, BlockchainEvents, ImportNotifications, FinalityNotifications}; use client::{in_mem::Backend as InMemoryBackend, error::Result as ClientResult}; use client::block_builder::BlockBuilder; use client::backend::AuxStore; @@ -41,7 +41,7 @@ use consensus::block_import::BlockImport; use consensus::{Error as ConsensusError, well_known_cache_keys::{self, Id as CacheKeyId}}; use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; use futures::prelude::*; -use crate::{NetworkWorker, NetworkService, ProtocolId}; +use crate::{NetworkWorker, NetworkService, config::ProtocolId}; use crate::config::{NetworkConfiguration, TransportConfig}; use libp2p::PeerId; use primitives::{H256, Blake2Hasher}; @@ -119,7 +119,7 @@ impl NetworkSpecialization for DummySpecialization { fn on_event( &mut self, - _event: crate::event::Event + _event: crate::specialization::Event ) {} } @@ -216,7 +216,8 @@ pub struct Peer> { /// instead of going through the import queue. block_import: Arc>, network: NetworkWorker::Hash>, - to_poll: smallvec::SmallVec<[Box + Send>; 2]>, + imported_blocks_stream: futures::stream::Fuse>, + finality_notification_stream: futures::stream::Fuse>, } impl> Peer { @@ -246,7 +247,7 @@ impl> Peer { } /// Add blocks to the peer -- edit the block before adding - pub fn generate_blocks(&self, count: usize, origin: BlockOrigin, edit_block: F) -> H256 + pub fn generate_blocks(&mut self, count: usize, origin: BlockOrigin, edit_block: F) -> H256 where F: FnMut(BlockBuilder) -> Block { let best_hash = self.client.info().chain.best_hash; @@ -256,7 +257,7 @@ impl> Peer { /// Add blocks to the peer -- edit the block before adding. The chain will /// start at the given block iD. fn generate_blocks_at( - &self, + &mut self, at: BlockId, count: usize, origin: BlockOrigin, @@ -289,7 +290,7 @@ impl> Peer { Default::default() }; self.block_import.import_block(import_block, cache).expect("block_import failed"); - self.network.service().on_block_imported(hash, header); + self.network.on_block_imported(hash, header); at = hash; } @@ -298,14 +299,14 @@ impl> Peer { } /// Push blocks to the peer (simplified: with or without a TX) - pub fn push_blocks(&self, count: usize, with_tx: bool) -> H256 { + pub fn push_blocks(&mut self, count: usize, with_tx: bool) -> H256 { let best_hash = self.client.info().chain.best_hash; self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx) } /// Push blocks to the peer (simplified: with or without a TX) starting from /// given hash. - pub fn push_blocks_at(&self, at: BlockId, count: usize, with_tx: bool) -> H256 { + pub fn push_blocks_at(&mut self, at: BlockId, count: usize, with_tx: bool) -> H256 { let mut nonce = 0; if with_tx { self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| { @@ -324,7 +325,7 @@ impl> Peer { } } - pub fn push_authorities_change_block(&self, new_authorities: Vec) -> H256 { + pub fn push_authorities_change_block(&mut self, new_authorities: Vec) -> H256 { self.generate_blocks(1, BlockOrigin::File, |mut builder| { builder.push(Extrinsic::AuthoritiesChange(new_authorities.clone())).unwrap(); builder.bake().unwrap() @@ -376,7 +377,7 @@ pub trait TestNetFactory: Sized { fn make_verifier(&self, client: PeersClient, config: &ProtocolConfig) -> Arc; /// Get reference to peer. - fn peer(&self, i: usize) -> &Peer; + fn peer(&mut self, i: usize) -> &mut Peer; fn peers(&self) -> &Vec>; fn mut_peers>)>(&mut self, closure: F); @@ -448,76 +449,21 @@ pub trait TestNetFactory: Sized { specialization: self::SpecializationFactory::create(), }).unwrap(); - let blocks_notif_future = { - let network = Arc::downgrade(&network.service().clone()); - client.import_notification_stream() - .for_each(move |notification| { - if let Some(network) = network.upgrade() { - network.on_block_imported(notification.hash, notification.header); - } - Ok(()) - }) - .then(|_| Ok(())) - }; - - let finality_notif_future = { - let network = Arc::downgrade(&network.service().clone()); - - // A utility stream that drops all ready items and only returns the last one. - // This is used to only keep the last finality notification and avoid - // overloading the sync module with notifications. - struct MostRecentNotification(futures::stream::Fuse>); - - impl Stream for MostRecentNotification { - type Item = as Stream>::Item; - type Error = as Stream>::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - let mut last = None; - let last = loop { - match self.0.poll()? { - Async::Ready(Some(item)) => { last = Some(item) } - Async::Ready(None) => match last { - None => return Ok(Async::Ready(None)), - Some(last) => break last, - }, - Async::NotReady => match last { - None => return Ok(Async::NotReady), - Some(last) => break last, - }, - } - }; - - Ok(Async::Ready(Some(last))) - } - } - - MostRecentNotification(client.finality_notification_stream().fuse()) - .for_each(move |notification| { - if let Some(network) = network.upgrade() { - network.on_block_finalized(notification.hash, notification.header); - } - Ok(()) - }) - .then(|_| Ok(())) - }; - self.mut_peers(|peers| { for peer in peers.iter_mut() { peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone()); } + let imported_blocks_stream = client.import_notification_stream().fuse(); + let finality_notification_stream = client.finality_notification_stream().fuse(); + peers.push(Peer { data, client: PeersClient::Full(client), + imported_blocks_stream, + finality_notification_stream, block_import, verifier, - to_poll: { - let mut sv = smallvec::SmallVec::new(); - sv.push(Box::new(blocks_notif_future) as Box<_>); - sv.push(Box::new(finality_notif_future) as Box<_>); - sv - }, network, }); }); @@ -559,33 +505,21 @@ pub trait TestNetFactory: Sized { specialization: self::SpecializationFactory::create(), }).unwrap(); - let blocks_notif_future = { - let network = Arc::downgrade(&network.service().clone()); - client.import_notification_stream() - .for_each(move |notification| { - if let Some(network) = network.upgrade() { - network.on_block_imported(notification.hash, notification.header); - } - Ok(()) - }) - .then(|_| Ok(())) - }; - self.mut_peers(|peers| { for peer in peers.iter_mut() { peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone()); } + let imported_blocks_stream = client.import_notification_stream().fuse(); + let finality_notification_stream = client.finality_notification_stream().fuse(); + peers.push(Peer { data, verifier, block_import, client: PeersClient::Light(client), - to_poll: { - let mut sv = smallvec::SmallVec::new(); - sv.push(Box::new(blocks_notif_future) as Box<_>); - sv - }, + imported_blocks_stream, + finality_notification_stream, network, }); }); @@ -621,7 +555,20 @@ pub trait TestNetFactory: Sized { self.mut_peers(|peers| { for peer in peers { peer.network.poll().unwrap(); - peer.to_poll.retain(|f| f.poll() == Ok(Async::NotReady)); + + // We poll `imported_blocks_stream`. + while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() { + peer.network.on_block_imported(notification.hash, notification.header); + } + + // We poll `finality_notification_stream`, but we only take the last event. + let mut last = None; + while let Ok(Async::Ready(Some(item))) = peer.finality_notification_stream.poll() { + last = Some(item); + } + if let Some(notification) = last { + peer.network.on_block_finalized(notification.hash, notification.header); + } } }); } @@ -649,8 +596,8 @@ impl TestNetFactory for TestNet { Arc::new(PassThroughVerifier(false)) } - fn peer(&self, i: usize) -> &Peer<(), Self::Specialization> { - &self.peers[i] + fn peer(&mut self, i: usize) -> &mut Peer<(), Self::Specialization> { + &mut self.peers[i] } fn peers(&self) -> &Vec> { @@ -695,7 +642,7 @@ impl TestNetFactory for JustificationTestNet { self.0.make_verifier(client, config) } - fn peer(&self, i: usize) -> &Peer { + fn peer(&mut self, i: usize) -> &mut Peer { self.0.peer(i) } diff --git a/core/service/src/config.rs b/core/service/src/config.rs index ae4d0982d23b2..8769f1dac9564 100644 --- a/core/service/src/config.rs +++ b/core/service/src/config.rs @@ -21,8 +21,7 @@ use transaction_pool; use crate::chain_spec::ChainSpec; pub use client::ExecutionStrategies; pub use client_db::PruningMode; -pub use network::ExtTransport; -pub use network::config::{NetworkConfiguration, Roles}; +pub use network::config::{ExtTransport, NetworkConfiguration, Roles}; use runtime_primitives::BuildStorage; use serde::{Serialize, de::DeserializeOwned}; use target_info::Target; @@ -51,8 +50,8 @@ pub struct Configuration { pub database_cache_size: Option, /// Size of internal state cache in Bytes pub state_cache_size: usize, - /// Size in percent of cache size dedicated to child tries - pub state_cache_child_ratio: Option, + /// Size in percent of cache size dedicated to child tries + pub state_cache_child_ratio: Option, /// Pruning settings. pub pruning: PruningMode, /// Additional key seeds. diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 933aafd706ca0..83f0488a7e6cb 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -36,6 +36,7 @@ use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; use exit_future::Signal; use futures::prelude::*; use keystore::Store as Keystore; +use network::NetworkState; use log::{info, warn, debug, error}; use parity_codec::{Encode, Decode}; use primitives::{Pair, Public, crypto::TypedKey, ed25519}; @@ -76,7 +77,9 @@ pub struct Service { select_chain: Option<::SelectChain>, network: Arc>, /// Sinks to propagate network status updates. - network_status_sinks: Arc>>>>>, + network_status_sinks: Arc>, NetworkState + )>>>>, transaction_pool: Arc>, keystore: Option, exit: ::exit_future::Exit, @@ -236,7 +239,7 @@ impl Service { DEFAULT_PROTOCOL_ID } }.as_bytes(); - network::ProtocolId::from(protocol_id_full) + network::config::ProtocolId::from(protocol_id_full) }; let network_params = network::config::Params { @@ -256,11 +259,6 @@ impl Service { let network = network_mut.service().clone(); let network_status_sinks = Arc::new(Mutex::new(Vec::new())); - let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(network_mut, network_status_sinks.clone()) - .map_err(|_| ()) - .select(exit.clone()) - .then(|_| Ok(())))); - #[allow(deprecated)] let offchain_storage = client.backend().offchain_storage(); let offchain_workers = match (config.offchain_worker, offchain_storage) { @@ -279,7 +277,6 @@ impl Service { { // block notifications - let network = Arc::downgrade(&network); let txpool = Arc::downgrade(&transaction_pool); let wclient = Arc::downgrade(&client); let offchain = offchain_workers.as_ref().map(Arc::downgrade); @@ -289,10 +286,6 @@ impl Service { .for_each(move |notification| { let number = *notification.header.number(); - if let Some(network) = network.upgrade() { - network.on_block_imported(notification.hash, notification.header); - } - if let (Some(txpool), Some(client)) = (txpool.upgrade(), wclient.upgrade()) { Components::RuntimeServices::maintain_transaction_pool( &BlockId::hash(notification.hash), @@ -317,52 +310,6 @@ impl Service { let _ = to_spawn_tx.unbounded_send(Box::new(events)); } - { - // finality notifications - let network = Arc::downgrade(&network); - - // A utility stream that drops all ready items and only returns the last one. - // This is used to only keep the last finality notification and avoid - // overloading the sync module with notifications. - struct MostRecentNotification(futures::stream::Fuse>); - - impl Stream for MostRecentNotification { - type Item = as Stream>::Item; - type Error = as Stream>::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - let mut last = None; - let last = loop { - match self.0.poll()? { - Async::Ready(Some(item)) => { last = Some(item) } - Async::Ready(None) => match last { - None => return Ok(Async::Ready(None)), - Some(last) => break last, - }, - Async::NotReady => match last { - None => return Ok(Async::NotReady), - Some(last) => break last, - }, - } - }; - - Ok(Async::Ready(Some(last))) - } - } - - let events = MostRecentNotification(client.finality_notification_stream().fuse()) - .for_each(move |notification| { - if let Some(network) = network.upgrade() { - network.on_block_finalized(notification.hash, notification.header); - } - Ok(()) - }) - .select(exit.clone()) - .then(|_| Ok(())); - - let _ = to_spawn_tx.unbounded_send(Box::new(events)); - } - { // extrinsic notifications let network = Arc::downgrade(&network); @@ -388,12 +335,11 @@ impl Service { // Periodically notify the telemetry. let transaction_pool_ = transaction_pool.clone(); let client_ = client.clone(); - let network_ = network.clone(); let mut sys = System::new(); let self_pid = get_current_pid().ok(); - let (netstat_tx, netstat_rx) = mpsc::unbounded(); + let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus>, NetworkState)>(); network_status_sinks.lock().push(netstat_tx); - let tel_task = netstat_rx.for_each(move |net_status| { + let tel_task = netstat_rx.for_each(move |(net_status, network_state)| { let info = client_.info(); let best_number = info.chain.best_number.saturated_into::(); let best_hash = info.chain.best_hash; @@ -419,8 +365,6 @@ impl Service { } else { (0.0, 0) } } else { (0.0, 0) }; - let network_state = network_.network_state(); - telemetry!( SUBSTRATE_INFO; "system.interval"; @@ -461,11 +405,17 @@ impl Service { }; let rpc_handlers = gen_handler(); let rpc = start_rpc_servers::(&config, gen_handler)?; - let _ = to_spawn_tx.unbounded_send(Box::new(build_system_rpc_handler::( - network.clone(), + + let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future::( + network_mut, + client.clone(), + network_status_sinks.clone(), system_rpc_rx, has_bootnodes - ))); + ) + .map_err(|_| ()) + .select(exit.clone()) + .then(|_| Ok(())))); let telemetry_connection_sinks: Arc>>> = Default::default(); @@ -597,7 +547,7 @@ impl Service { } /// Returns a receiver that periodically receives a status of the network. - pub fn network_status(&self) -> mpsc::UnboundedReceiver>> { + pub fn network_status(&self) -> mpsc::UnboundedReceiver<(NetworkStatus>, NetworkState)> { let (sink, stream) = mpsc::unbounded(); self.network_status_sinks.lock().push(sink); stream @@ -660,15 +610,67 @@ impl Executor + Send>> /// Builds a never-ending future that continuously polls the network. /// /// The `status_sink` contain a list of senders to send a periodic network status to. -fn build_network_future, H: network::ExHashT>( - mut network: network::NetworkWorker, - status_sinks: Arc>>>>, +fn build_network_future< + Components: components::Components, + S: network::specialization::NetworkSpecialization>, + H: network::ExHashT +> ( + mut network: network::NetworkWorker, S, H>, + client: Arc>, + status_sinks: Arc>, NetworkState)>>>>, + mut rpc_rx: mpsc::UnboundedReceiver>>, + should_have_peers: bool, ) -> impl Future { // Interval at which we send status updates on the status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); let mut status_interval = tokio_timer::Interval::new_interval(STATUS_INTERVAL); + let mut imported_blocks_stream = client.import_notification_stream().fuse(); + let mut finality_notification_stream = client.finality_notification_stream().fuse(); + futures::future::poll_fn(move || { + // We poll `imported_blocks_stream`. + while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { + network.on_block_imported(notification.hash, notification.header); + } + + // We poll `finality_notification_stream`, but we only take the last event. + let mut last = None; + while let Ok(Async::Ready(Some(item))) = finality_notification_stream.poll() { + last = Some(item); + } + if let Some(notification) = last { + network.on_block_finalized(notification.hash, notification.header); + } + + // Poll the RPC requests and answer them. + while let Ok(Async::Ready(Some(request))) = rpc_rx.poll() { + match request { + rpc::apis::system::Request::Health(sender) => { + let _ = sender.send(rpc::apis::system::Health { + peers: network.peers_debug_info().len(), + is_syncing: network.service().is_major_syncing(), + should_have_peers, + }); + }, + rpc::apis::system::Request::Peers(sender) => { + let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| + rpc::apis::system::PeerInfo { + peer_id: peer_id.to_base58(), + roles: format!("{:?}", p.roles), + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, + } + ).collect()); + } + rpc::apis::system::Request::NetworkState(sender) => { + let _ = sender.send(network.network_state()); + } + }; + } + + // Interval report for the external API. while let Ok(Async::Ready(_)) = status_interval.poll() { let status = NetworkStatus { sync_state: network.sync_state(), @@ -679,10 +681,12 @@ fn build_network_future network::TransactionPool, ComponentBlock< } } -/// Builds a never-ending `Future` that answers the RPC requests coming on the receiver. -fn build_system_rpc_handler( - network: Arc>, - rx: mpsc::UnboundedReceiver>>, - should_have_peers: bool, -) -> impl Future { - rx.for_each(move |request| { - match request { - rpc::apis::system::Request::Health(sender) => { - let _ = sender.send(rpc::apis::system::Health { - peers: network.peers_debug_info().len(), - is_syncing: network.is_major_syncing(), - should_have_peers, - }); - }, - rpc::apis::system::Request::Peers(sender) => { - let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| rpc::apis::system::PeerInfo { - peer_id: peer_id.to_base58(), - roles: format!("{:?}", p.roles), - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - }).collect()); - } - rpc::apis::system::Request::NetworkState(sender) => { - let _ = sender.send(network.network_state()); - } - }; - - Ok(()) - }) -} - /// Constructs a service factory with the given name that implements the `ServiceFactory` trait. /// The required parameters are required to be given in the exact order. Some parameters are followed /// by `{}` blocks. These blocks are required and used to initialize the given parameter. diff --git a/core/service/test/src/lib.rs b/core/service/test/src/lib.rs index 134ffe507290a..48b310db5fc03 100644 --- a/core/service/test/src/lib.rs +++ b/core/service/test/src/lib.rs @@ -34,7 +34,7 @@ use service::{ Roles, FactoryExtrinsic, }; -use network::{multiaddr, Multiaddr, ManageNetwork}; +use network::{multiaddr, Multiaddr}; use network::config::{NetworkConfiguration, TransportConfig, NodeKeyConfig, Secret, NonReservedPeerMode}; use sr_primitives::generic::BlockId; use consensus::{ImportBlock, BlockImport}; @@ -301,9 +301,9 @@ pub fn connectivity(spec: FactoryChainSpec) where service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } network.run_until_all_full( - |_index, service| service.get().network().peers_debug_info().len() == NUM_FULL_NODES - 1 + |_index, service| service.get().network().num_connected() == NUM_FULL_NODES - 1 + NUM_LIGHT_NODES, - |_index, service| service.get().network().peers_debug_info().len() == NUM_FULL_NODES, + |_index, service| service.get().network().num_connected() == NUM_FULL_NODES, ); network.runtime }; @@ -340,9 +340,9 @@ pub fn connectivity(spec: FactoryChainSpec) where } } network.run_until_all_full( - |_index, service| service.get().network().peers_debug_info().len() == NUM_FULL_NODES - 1 + |_index, service| service.get().network().num_connected() == NUM_FULL_NODES - 1 + NUM_LIGHT_NODES, - |_index, service| service.get().network().peers_debug_info().len() == NUM_FULL_NODES, + |_index, service| service.get().network().num_connected() == NUM_FULL_NODES, ); } temp.close().expect("Error removing temp dir");