diff --git a/Cargo.lock b/Cargo.lock index 886255317316b..707b6a66a2b8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6564,7 +6564,6 @@ dependencies = [ "futures 0.3.5", "log", "parity-util-mem", - "parking_lot 0.10.2", "sc-client-api", "sc-network", "sp-blockchain", diff --git a/client/informant/Cargo.toml b/client/informant/Cargo.toml index d2df78537d8d2..98c72f5deb5e0 100644 --- a/client/informant/Cargo.toml +++ b/client/informant/Cargo.toml @@ -16,11 +16,10 @@ ansi_term = "0.12.1" futures = "0.3.4" log = "0.4.8" parity-util-mem = { version = "0.6.1", default-features = false, features = ["primitive-types"] } -wasm-timer = "0.2" sc-client-api = { version = "2.0.0-rc4", path = "../api" } sc-network = { version = "0.8.0-rc4", path = "../network" } sp-blockchain = { version = "2.0.0-rc4", path = "../../primitives/blockchain" } sp-runtime = { version = "2.0.0-rc4", path = "../../primitives/runtime" } sp-utils = { version = "2.0.0-rc2", path = "../../primitives/utils" } sp-transaction-pool = { version = "2.0.0-rc2", path = "../../primitives/transaction-pool" } -parking_lot = "0.10.2" +wasm-timer = "0.2" diff --git a/client/informant/src/lib.rs b/client/informant/src/lib.rs index d56afcf335917..3daf29a9f7837 100644 --- a/client/informant/src/lib.rs +++ b/client/informant/src/lib.rs @@ -29,7 +29,6 @@ use sp_runtime::traits::{Block as BlockT, Header}; use sp_transaction_pool::TransactionPool; use sp_utils::{status_sinks, mpsc::tracing_unbounded}; use std::{fmt::Display, sync::Arc, time::Duration, collections::VecDeque}; -use parking_lot::Mutex; mod display; @@ -82,7 +81,7 @@ impl TransactionPoolAndMaybeMallogSizeOf for /// Builds the informant and returns a `Future` that drives the informant. pub fn build( client: Arc, - network_status_sinks: Arc, NetworkState)>>>, + network_status_sinks: Arc, NetworkState)>>, pool: Arc, format: OutputFormat, ) -> impl futures::Future @@ -94,7 +93,7 @@ where let client_1 = client.clone(); let (network_status_sink, network_status_stream) = tracing_unbounded("mpsc_network_status"); - network_status_sinks.lock().push(Duration::from_millis(5000), network_status_sink); + network_status_sinks.push(Duration::from_millis(5000), network_status_sink); let display_notifications = network_status_stream .for_each(move |(net_status, _)| { diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 16b41e135a518..1585298d98bd6 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -1157,7 +1157,7 @@ async fn telemetry_periodic_send( client: Arc, transaction_pool: Arc, mut metrics_service: MetricsService, - network_status_sinks: Arc, NetworkState)>>> + network_status_sinks: Arc, NetworkState)>> ) where TBl: BlockT, @@ -1165,7 +1165,7 @@ async fn telemetry_periodic_send( TExPool: MaintainedTransactionPool::Hash>, { let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1"); - network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx); + network_status_sinks.push(std::time::Duration::from_millis(5000), state_tx); state_rx.for_each(move |(net_status, _)| { let info = client.usage_info(); metrics_service.tick( @@ -1178,11 +1178,11 @@ async fn telemetry_periodic_send( } async fn telemetry_periodic_network_state( - network_status_sinks: Arc, NetworkState)>>> + network_status_sinks: Arc, NetworkState)>> ) { // Periodically send the network state to the telemetry. let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2"); - network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx); + network_status_sinks.push(std::time::Duration::from_secs(30), netstat_tx); netstat_rx.for_each(move |(_, network_state)| { telemetry!( SUBSTRATE_INFO; @@ -1347,7 +1347,7 @@ fn build_network( ) -> Result< ( Arc::Hash>>, - Arc, NetworkState)>>>, + Arc, NetworkState)>>, Pin + Send>> ), Error @@ -1407,7 +1407,7 @@ fn build_network( let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); let network_mut = sc_network::NetworkWorker::new(network_params)?; let network = network_mut.service().clone(); - let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new())); + let network_status_sinks = Arc::new(status_sinks::StatusSinks::new()); let future = build_network_future( config.role.clone(), diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 978b77974fbb3..2c09591fc7d65 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -20,7 +20,7 @@ //! Manages communication between them. #![warn(missing_docs)] -#![recursion_limit="128"] +#![recursion_limit = "1024"] pub mod config; pub mod chain_ops; @@ -42,7 +42,7 @@ use wasm_timer::Instant; use std::task::Poll; use parking_lot::Mutex; -use futures::{Future, FutureExt, Stream, StreamExt, compat::*}; +use futures::{Future, FutureExt, Stream, StreamExt, stream, compat::*}; use sc_network::{NetworkStatus, network_state::NetworkState, PeerId}; use log::{log, warn, debug, error, Level}; use codec::{Encode, Decode}; @@ -118,12 +118,12 @@ impl RpcHandlers { /// Sinks to propagate network status updates. /// For each element, every time the `Interval` fires we push an element on the sender. pub struct NetworkStatusSinks( - Arc, NetworkState)>>>, + Arc, NetworkState)>>, ); impl NetworkStatusSinks { fn new( - sinks: Arc, NetworkState)>>> + sinks: Arc, NetworkState)>> ) -> Self { Self(sinks) } @@ -132,7 +132,7 @@ impl NetworkStatusSinks { pub fn network_status(&self, interval: Duration) -> TracingUnboundedReceiver<(NetworkStatus, NetworkState)> { let (sink, stream) = tracing_unbounded("mpsc_network_status"); - self.0.lock().push(interval, sink); + self.0.push(interval, sink); stream } } @@ -181,7 +181,7 @@ pub struct ServiceComponents, TSc, TExPool, /// Builds a never-ending future that continuously polls the network. /// /// The `status_sink` contain a list of senders to send a periodic network status to. -fn build_network_future< +async fn build_network_future< B: BlockT, C: BlockchainEvents, H: sc_network::ExHashT @@ -189,126 +189,150 @@ fn build_network_future< role: Role, mut network: sc_network::NetworkWorker, client: Arc, - status_sinks: Arc, NetworkState)>>>, + status_sinks: Arc, NetworkState)>>, mut rpc_rx: TracingUnboundedReceiver>, should_have_peers: bool, announce_imported_blocks: bool, -) -> impl Future { +) { let mut imported_blocks_stream = client.import_notification_stream().fuse(); - let mut finality_notification_stream = client.finality_notification_stream().fuse(); - futures::future::poll_fn(move |cx| { - let before_polling = Instant::now(); + // Stream of finalized blocks reported by the client. + let mut finality_notification_stream = { + let mut finality_notification_stream = client.finality_notification_stream().fuse(); - // We poll `imported_blocks_stream`. - while let Poll::Ready(Some(notification)) = Pin::new(&mut imported_blocks_stream).poll_next(cx) { - if announce_imported_blocks { - network.service().announce_block(notification.hash, Vec::new()); + // We tweak the `Stream` in order to merge together multiple items if they happen to be + // ready. This way, we only get the latest finalized block. + stream::poll_fn(move |cx| { + let mut last = None; + while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) { + last = Some(item); } - - if let sp_consensus::BlockOrigin::Own = notification.origin { - network.service().own_block_imported( - notification.hash, - notification.header.number().clone(), - ); + if let Some(last) = last { + Poll::Ready(Some(last)) + } else { + Poll::Pending } - } + }).fuse() + }; - // We poll `finality_notification_stream`, but we only take the last event. - let mut last = None; - while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) { - last = Some(item); - } - if let Some(notification) = last { - network.on_block_finalized(notification.hash, notification.header); - } + loop { + let before_polling = Instant::now(); - // Poll the RPC requests and answer them. - while let Poll::Ready(Some(request)) = Pin::new(&mut rpc_rx).poll_next(cx) { - match request { - sc_rpc::system::Request::Health(sender) => { - let _ = sender.send(sc_rpc::system::Health { - peers: network.peers_debug_info().len(), - is_syncing: network.service().is_major_syncing(), - should_have_peers, - }); - }, - sc_rpc::system::Request::LocalPeerId(sender) => { - let _ = sender.send(network.local_peer_id().to_base58()); - }, - sc_rpc::system::Request::LocalListenAddresses(sender) => { - let peer_id = network.local_peer_id().clone().into(); - let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id); - let addresses = network.listen_addresses() - .map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string()) - .collect(); - let _ = sender.send(addresses); - }, - sc_rpc::system::Request::Peers(sender) => { - let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| - sc_rpc::system::PeerInfo { - peer_id: peer_id.to_base58(), - roles: format!("{:?}", p.roles), - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - } - ).collect()); + futures::select!{ + // List of blocks that the client has imported. + notification = imported_blocks_stream.next() => { + let notification = match notification { + Some(n) => n, + // If this stream is shut down, that means the client has shut down, and the + // most appropriate thing to do for the network future is to shut down too. + None => return, + }; + + if announce_imported_blocks { + network.service().announce_block(notification.hash, Vec::new()); } - sc_rpc::system::Request::NetworkState(sender) => { - if let Some(network_state) = serde_json::to_value(&network.network_state()).ok() { - let _ = sender.send(network_state); - } - } - sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => { - let x = network.add_reserved_peer(peer_addr) - .map_err(sc_rpc::system::error::Error::MalformattedPeerArg); - let _ = sender.send(x); + + if let sp_consensus::BlockOrigin::Own = notification.origin { + network.service().own_block_imported( + notification.hash, + notification.header.number().clone(), + ); } - sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => { - let _ = match peer_id.parse::() { - Ok(peer_id) => { - network.remove_reserved_peer(peer_id); - sender.send(Ok(())) + } + + // List of blocks that the client has finalized. + notification = finality_notification_stream.select_next_some() => { + network.on_block_finalized(notification.hash, notification.header); + } + + // Answer incoming RPC requests. + request = rpc_rx.select_next_some() => { + match request { + sc_rpc::system::Request::Health(sender) => { + let _ = sender.send(sc_rpc::system::Health { + peers: network.peers_debug_info().len(), + is_syncing: network.service().is_major_syncing(), + should_have_peers, + }); + }, + sc_rpc::system::Request::LocalPeerId(sender) => { + let _ = sender.send(network.local_peer_id().to_base58()); + }, + sc_rpc::system::Request::LocalListenAddresses(sender) => { + let peer_id = network.local_peer_id().clone().into(); + let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id); + let addresses = network.listen_addresses() + .map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string()) + .collect(); + let _ = sender.send(addresses); + }, + sc_rpc::system::Request::Peers(sender) => { + let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| + sc_rpc::system::PeerInfo { + peer_id: peer_id.to_base58(), + roles: format!("{:?}", p.roles), + protocol_version: p.protocol_version, + best_hash: p.best_hash, + best_number: p.best_number, + } + ).collect()); + } + sc_rpc::system::Request::NetworkState(sender) => { + if let Some(network_state) = serde_json::to_value(&network.network_state()).ok() { + let _ = sender.send(network_state); } - Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg( - e.to_string(), - ))), - }; - } - sc_rpc::system::Request::NodeRoles(sender) => { - use sc_rpc::system::NodeRole; + } + sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => { + let x = network.add_reserved_peer(peer_addr) + .map_err(sc_rpc::system::error::Error::MalformattedPeerArg); + let _ = sender.send(x); + } + sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => { + let _ = match peer_id.parse::() { + Ok(peer_id) => { + network.remove_reserved_peer(peer_id); + sender.send(Ok(())) + } + Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg( + e.to_string(), + ))), + }; + } + sc_rpc::system::Request::NodeRoles(sender) => { + use sc_rpc::system::NodeRole; - let node_role = match role { - Role::Authority { .. } => NodeRole::Authority, - Role::Light => NodeRole::LightClient, - Role::Full => NodeRole::Full, - Role::Sentry { .. } => NodeRole::Sentry, - }; + let node_role = match role { + Role::Authority { .. } => NodeRole::Authority, + Role::Light => NodeRole::LightClient, + Role::Full => NodeRole::Full, + Role::Sentry { .. } => NodeRole::Sentry, + }; - let _ = sender.send(vec![node_role]); + let _ = sender.send(vec![node_role]); + } } - }; - } + } - // Interval report for the external API. - status_sinks.lock().poll(cx, || { - let status = NetworkStatus { - sync_state: network.sync_state(), - best_seen_block: network.best_seen_block(), - num_sync_peers: network.num_sync_peers(), - num_connected_peers: network.num_connected_peers(), - num_active_peers: network.num_active_peers(), - average_download_per_sec: network.average_download_per_sec(), - average_upload_per_sec: network.average_upload_per_sec(), - }; - let state = network.network_state(); - (status, state) - }); - - // Main network polling. - if let Poll::Ready(()) = network.poll_unpin(cx) { - return Poll::Ready(()); + // The network worker has done something. Nothing special to do, but could be + // used in the future to perform actions in response of things that happened on + // the network. + _ = (&mut network).fuse() => {} + + // At a regular interval, we send the state of the network on what is called + // the "status sinks". + ready_sink = status_sinks.next().fuse() => { + let status = NetworkStatus { + sync_state: network.sync_state(), + best_seen_block: network.best_seen_block(), + num_sync_peers: network.num_sync_peers(), + num_connected_peers: network.num_connected_peers(), + num_active_peers: network.num_active_peers(), + average_download_per_sec: network.average_download_per_sec(), + average_upload_per_sec: network.average_upload_per_sec(), + }; + let state = network.network_state(); + ready_sink.send((status, state)); + } } // Now some diagnostic for performances. @@ -319,9 +343,7 @@ fn build_network_future< "⚠️ Polling the network future took {:?}", polling_dur ); - - Poll::Pending - }) + } } #[cfg(not(target_os = "unknown"))] diff --git a/primitives/utils/src/status_sinks.rs b/primitives/utils/src/status_sinks.rs index 47bccebb960b4..65a560af4eaa5 100644 --- a/primitives/utils/src/status_sinks.rs +++ b/primitives/utils/src/status_sinks.rs @@ -14,19 +14,27 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use futures::{Stream, stream::futures_unordered::FuturesUnordered}; -use std::time::Duration; -use std::pin::Pin; -use std::task::{Poll, Context}; +use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use futures::{prelude::*, lock::Mutex}; use futures_timer::Delay; -use crate::mpsc::TracingUnboundedSender; +use std::{pin::Pin, task::{Poll, Context}, time::Duration}; /// Holds a list of `UnboundedSender`s, each associated with a certain time period. Every time the /// period elapses, we push an element on the sender. /// /// Senders are removed only when they are closed. pub struct StatusSinks { - entries: FuturesUnordered>, + /// Should only be locked by `next`. + inner: Mutex>, + /// Sending side of `Inner::entries_rx`. + entries_tx: TracingUnboundedSender>, +} + +struct Inner { + /// The actual entries of the list. + entries: stream::FuturesUnordered>, + /// Receives new entries and puts them in `entries`. + entries_rx: TracingUnboundedReceiver>, } struct YieldAfter { @@ -38,56 +46,114 @@ struct YieldAfter { impl StatusSinks { /// Builds a new empty collection. pub fn new() -> StatusSinks { + let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries"); + StatusSinks { - entries: FuturesUnordered::new(), + inner: Mutex::new(Inner { + entries: stream::FuturesUnordered::new(), + entries_rx, + }), + entries_tx, } } /// Adds a sender to the collection. /// /// The `interval` is the time period between two pushes on the sender. - pub fn push(&mut self, interval: Duration, sender: TracingUnboundedSender) { - self.entries.push(YieldAfter { + pub fn push(&self, interval: Duration, sender: TracingUnboundedSender) { + let _ = self.entries_tx.unbounded_send(YieldAfter { delay: Delay::new(interval), interval, sender: Some(sender), - }) + }); } - /// Processes all the senders. If any sender is ready, calls the `status_grab` function and - /// pushes what it returns to the sender. + /// Waits until one of the sinks is ready, then returns an object that can be used to send + /// an element on said sink. /// - /// This function doesn't return anything, but it should be treated as if it implicitly - /// returns `Poll::Pending`. In particular, it should be called again when the task - /// is waken up. - /// - /// # Panic - /// - /// Panics if not called within the context of a task. - pub fn poll(&mut self, cx: &mut Context, mut status_grab: impl FnMut() -> T) { + /// If the object isn't used to send an element, the slot is skipped. + pub async fn next(&self) -> ReadySinkEvent<'_, T> { + // This is only ever locked by `next`, which means that one `next` at a time can run. + let mut inner = self.inner.lock().await; + let inner = &mut *inner; + loop { - match Pin::new(&mut self.entries).poll_next(cx) { - Poll::Ready(Some((sender, interval))) => { - let status = status_grab(); - if sender.unbounded_send(status).is_ok() { - self.entries.push(YieldAfter { - // Note that since there's a small delay between the moment a task is - // waken up and the moment it is polled, the period is actually not - // `interval` but `interval + `. We ignore this problem in - // practice. - delay: Delay::new(interval), - interval, - sender: Some(sender), - }); + // Future that produces the next ready entry in `entries`, or doesn't produce anything if + // the list is empty. + let next_ready_entry = { + let entries = &mut inner.entries; + async move { + if let Some(v) = entries.next().await { + v + } else { + loop { + futures::pending!() + } + } + } + }; + + futures::select!{ + new_entry = inner.entries_rx.next() => { + if let Some(new_entry) = new_entry { + inner.entries.push(new_entry); + } + }, + (sender, interval) = next_ready_entry.fuse() => { + return ReadySinkEvent { + sinks: self, + sender: Some(sender), + interval, } } - Poll::Ready(None) | - Poll::Pending => break, } } } } +/// One of the sinks is ready. +#[must_use] +pub struct ReadySinkEvent<'a, T> { + sinks: &'a StatusSinks, + sender: Option>, + interval: Duration, +} + +impl<'a, T> ReadySinkEvent<'a, T> { + /// Sends an element on the sender. + pub fn send(mut self, element: T) { + if let Some(sender) = self.sender.take() { + if sender.unbounded_send(element).is_ok() { + let _ = self.sinks.entries_tx.unbounded_send(YieldAfter { + // Note that since there's a small delay between the moment a task is + // woken up and the moment it is polled, the period is actually not + // `interval` but `interval + `. We ignore this problem in + // practice. + delay: Delay::new(self.interval), + interval: self.interval, + sender: Some(sender), + }); + } + } + } +} + +impl<'a, T> Drop for ReadySinkEvent<'a, T> { + fn drop(&mut self) { + if let Some(sender) = self.sender.take() { + if sender.is_closed() { + return; + } + + let _ = self.sinks.entries_tx.unbounded_send(YieldAfter { + delay: Delay::new(self.interval), + interval: self.interval, + sender: Some(sender), + }); + } + } +} + impl futures::Future for YieldAfter { type Output = (TracingUnboundedSender, Duration); @@ -107,28 +173,30 @@ impl futures::Future for YieldAfter { #[cfg(test)] mod tests { + use crate::mpsc::tracing_unbounded; use super::StatusSinks; use futures::prelude::*; - use crate::mpsc::tracing_unbounded; use std::time::Duration; - use std::task::Poll; #[test] fn works() { // We're not testing that the `StatusSink` properly enforces an order in the intervals, as // this easily causes test failures on busy CPUs. - let mut status_sinks = StatusSinks::new(); + let status_sinks = StatusSinks::new(); - let (tx, rx) = tracing_unbounded("status_sink_test"); + let (tx, rx) = tracing_unbounded("test"); status_sinks.push(Duration::from_millis(100), tx); let mut val_order = 5; futures::executor::block_on(futures::future::select( - futures::future::poll_fn(move |cx| { - status_sinks.poll(cx, || { val_order += 1; val_order }); - Poll::<()>::Pending + Box::pin(async move { + loop { + let ev = status_sinks.next().await; + val_order += 1; + ev.send(val_order); + } }), Box::pin(async { let items: Vec = rx.take(3).collect().await;