diff --git a/Cargo.lock b/Cargo.lock index 6720e9c658cb3..d4879c42c30ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1513,6 +1513,21 @@ dependencies = [ "num_cpus 1.11.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "futures-diagnose" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "futures-executor" version = "0.3.1" @@ -5834,6 +5849,7 @@ dependencies = [ "exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-diagnose 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "grafana-data-source 0.8.0", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5968,6 +5984,7 @@ version = "2.0.0" dependencies = [ "derive_more 0.99.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-diagnose 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6414,6 +6431,7 @@ version = "0.8.0" dependencies = [ "derive_more 0.99.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-diagnose 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -7699,7 +7717,7 @@ name = "twox-hash" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -8527,6 +8545,7 @@ dependencies = [ "checksum futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "79564c427afefab1dfb3298535b21eda083ef7935b4f0ecbfcb121f0aec10866" "checksum futures-core-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)" = "b35b6263fb1ef523c3056565fa67b1d16f0a8604ff12b11b08c25f28a734c60a" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" +"checksum futures-diagnose 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ebbb8371dd6ee87aa2aeaa8458a372fd82fe216032387b766255754c92dd7271" "checksum futures-executor 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e274736563f686a837a0568b478bdabfeaec2dca794b5649b04e2fe1627c231" "checksum futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e676577d229e70952ab25f3945795ba5b16d63ca794ca9d2c860e5595d20b5ff" "checksum futures-macro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "52e7c56c15537adb4f76d0b7a76ad131cb4d2f4f32d3b0bcabcbe1c7c5e87764" diff --git a/bin/node-template/src/service.rs b/bin/node-template/src/service.rs index 8a5c660b23c7a..a396a5301c5c1 100644 --- a/bin/node-template/src/service.rs +++ b/bin/node-template/src/service.rs @@ -131,7 +131,7 @@ pub fn new_full(config: Configuration(config: Configuration { // start the lightweight GRANDPA observer - service.spawn_task(grandpa::run_grandpa_observer( + service.spawn_task("grandpa-observer", grandpa::run_grandpa_observer( grandpa_config, grandpa_link, service.network(), @@ -178,7 +178,7 @@ pub fn new_full(config: Configuration { grandpa::setup_disabled_grandpa( diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 2aaca315b9058..32f2d0001c7c2 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -172,7 +172,7 @@ macro_rules! new_full { }; let babe = sc_consensus_babe::start_babe(babe_config)?; - service.spawn_essential_task(babe); + service.spawn_essential_task("babe-proposer", babe); let network = service.network(); let dht_event_stream = network.event_stream().filter_map(|e| async move { match e { @@ -187,7 +187,7 @@ macro_rules! new_full { dht_event_stream, ); - service.spawn_task(authority_discovery); + service.spawn_task("authority-discovery", authority_discovery); } // if the node isn't actively participating in consensus then it doesn't @@ -211,7 +211,7 @@ macro_rules! new_full { match (is_authority, disable_grandpa) { (false, false) => { // start the lightweight GRANDPA observer - service.spawn_task(grandpa::run_grandpa_observer( + service.spawn_task("grandpa-observer", grandpa::run_grandpa_observer( config, grandpa_link, service.network(), @@ -234,6 +234,7 @@ macro_rules! new_full { // the GRANDPA voter task is considered infallible, i.e. // if it fails we take down the service with it. service.spawn_essential_task( + "grandpa-voter", grandpa::run_grandpa_voter(grandpa_config)? ); }, diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index 095136a11c060..f5b3b71914d8d 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -17,6 +17,7 @@ wasmtime = [ derive_more = "0.99.2" futures01 = { package = "futures", version = "0.1.29" } futures = "0.3.1" +futures-diagnose = "1.0" parking_lot = "0.9.0" lazy_static = "1.4.0" log = "0.4.8" diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 9b27a5b2701b0..1ea0df3ec723d 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -44,6 +44,7 @@ use sp_runtime::traits::{ use sp_api::ProvideRuntimeApi; use sc_executor::{NativeExecutor, NativeExecutionDispatch}; use std::{ + borrow::Cow, io::{Read, Write, Seek}, marker::PhantomData, sync::Arc, time::SystemTime, pin::Pin }; @@ -791,7 +792,7 @@ ServiceBuilder< // List of asynchronous tasks to spawn. We collect them, then spawn them all at once. let (to_spawn_tx, to_spawn_rx) = - mpsc::unbounded:: + Send>>>(); + mpsc::unbounded::<(Pin + Send>>, Cow<'static, str>)>(); // A side-channel for essential tasks to communicate shutdown. let (essential_failed_tx, essential_failed_rx) = mpsc::unbounded(); @@ -816,7 +817,7 @@ ServiceBuilder< imports_external_transactions: !config.roles.is_light(), pool: transaction_pool.clone(), client: client.clone(), - executor: Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }), + executor: SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }, }); let protocol_id = { @@ -840,7 +841,7 @@ ServiceBuilder< executor: { let to_spawn_tx = to_spawn_tx.clone(); Some(Box::new(move |fut| { - if let Err(e) = to_spawn_tx.unbounded_send(fut) { + if let Err(e) = to_spawn_tx.unbounded_send((fut, From::from("libp2p-node"))) { error!("Failed to spawn libp2p background task: {:?}", e); } })) @@ -891,7 +892,10 @@ ServiceBuilder< &BlockId::hash(notification.hash), ¬ification.retracted, ); - let _ = to_spawn_tx_.unbounded_send(Box::pin(future)); + let _ = to_spawn_tx_.unbounded_send(( + Box::pin(future), + From::from("txpool-maintain") + )); } let offchain = offchain.as_ref().and_then(|o| o.upgrade()); @@ -901,12 +905,18 @@ ServiceBuilder< network_state_info.clone(), is_validator ); - let _ = to_spawn_tx_.unbounded_send(Box::pin(future)); + let _ = to_spawn_tx_.unbounded_send(( + Box::pin(future), + From::from("offchain-on-block") + )); } ready(()) }); - let _ = to_spawn_tx.unbounded_send(Box::pin(select(events, exit.clone()).map(drop))); + let _ = to_spawn_tx.unbounded_send(( + Box::pin(select(events, exit.clone()).map(drop)), + From::from("txpool-and-offchain-notif") + )); } { @@ -926,7 +936,10 @@ ServiceBuilder< ready(()) }); - let _ = to_spawn_tx.unbounded_send(Box::pin(select(events, exit.clone()).map(drop))); + let _ = to_spawn_tx.unbounded_send(( + Box::pin(select(events, exit.clone()).map(drop)), + From::from("telemetry-on-block") + )); } // Periodically notify the telemetry. @@ -990,7 +1003,10 @@ ServiceBuilder< ready(()) }); - let _ = to_spawn_tx.unbounded_send(Box::pin(select(tel_task, exit.clone()).map(drop))); + let _ = to_spawn_tx.unbounded_send(( + Box::pin(select(tel_task, exit.clone()).map(drop)), + From::from("telemetry-periodic-send") + )); // Periodically send the network state to the telemetry. let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>(); @@ -1003,7 +1019,10 @@ ServiceBuilder< ); ready(()) }); - let _ = to_spawn_tx.unbounded_send(Box::pin(select(tel_task_2, exit.clone()).map(drop))); + let _ = to_spawn_tx.unbounded_send(( + Box::pin(select(tel_task_2, exit.clone()).map(drop)), + From::from("telemetry-periodic-network-state") + )); // RPC let (system_rpc_tx, system_rpc_rx) = mpsc::unbounded(); @@ -1066,14 +1085,17 @@ ServiceBuilder< let rpc = start_rpc_servers(&config, gen_handler)?; - let _ = to_spawn_tx.unbounded_send(Box::pin(select(build_network_future( - config.roles, - network_mut, - client.clone(), - network_status_sinks.clone(), - system_rpc_rx, - has_bootnodes, - ), exit.clone()).map(drop))); + let _ = to_spawn_tx.unbounded_send(( + Box::pin(select(build_network_future( + config.roles, + network_mut, + client.clone(), + network_status_sinks.clone(), + system_rpc_rx, + has_bootnodes, + ), exit.clone()).map(drop)), + From::from("network-worker") + )); let telemetry_connection_sinks: Arc>>> = Default::default(); @@ -1114,9 +1136,9 @@ ServiceBuilder< }); ready(()) }); - let _ = to_spawn_tx.unbounded_send(Box::pin(select( + let _ = to_spawn_tx.unbounded_send((Box::pin(select( future, exit.clone() - ).map(drop))); + ).map(drop)), From::from("telemetry-worker"))); telemetry }); @@ -1127,7 +1149,7 @@ ServiceBuilder< exit.clone() ).map(drop); - let _ = to_spawn_tx.unbounded_send(Box::pin(future)); + let _ = to_spawn_tx.unbounded_send((Box::pin(future), From::from("grafana-server"))); } // Instrumentation diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index c7069ac6e5818..3bd8f6d290416 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -27,7 +27,7 @@ pub mod error; mod builder; mod status_sinks; -use std::{io, pin::Pin}; +use std::{borrow::Cow, io, pin::Pin}; use std::marker::PhantomData; use std::net::SocketAddr; use std::collections::HashMap; @@ -42,7 +42,7 @@ use futures::{ future::select, channel::mpsc, compat::*, sink::SinkExt, - task::{Spawn, SpawnExt, FutureObj, SpawnError}, + task::{Spawn, FutureObj, SpawnError}, }; use sc_network::{ NetworkService, NetworkState, specialization::NetworkSpecialization, @@ -92,9 +92,9 @@ pub struct Service { /// A receiver for spawned essential-tasks concluding. essential_failed_rx: mpsc::UnboundedReceiver<()>, /// Sender for futures that must be spawned as background tasks. - to_spawn_tx: mpsc::UnboundedSender + Send>>>, + to_spawn_tx: mpsc::UnboundedSender<(Pin + Send>>, Cow<'static, str>)>, /// Receiver for futures that must be spawned as background tasks. - to_spawn_rx: mpsc::UnboundedReceiver + Send>>>, + to_spawn_rx: mpsc::UnboundedReceiver<(Pin + Send>>, Cow<'static, str>)>, /// How to spawn background tasks. tasks_executor: Box + Send>>) + Send>, rpc_handlers: sc_rpc_server::RpcHandler, @@ -112,15 +112,29 @@ pub type TaskExecutor = Arc; /// An handle for spawning tasks in the service. #[derive(Clone)] pub struct SpawnTaskHandle { - sender: mpsc::UnboundedSender + Send>>>, + sender: mpsc::UnboundedSender<(Pin + Send>>, Cow<'static, str>)>, on_exit: exit_future::Exit, } +impl SpawnTaskHandle { + /// Spawns the given task with the given name. + pub fn spawn(&self, name: impl Into>, task: impl Future + Send + 'static) { + let on_exit = self.on_exit.clone(); + let future = async move { + futures::pin_mut!(task); + let _ = select(on_exit, task).await; + }; + if self.sender.unbounded_send((Box::pin(future), name.into())).is_err() { + error!("Failed to send task to spawn over channel"); + } + } +} + impl Spawn for SpawnTaskHandle { fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { let future = select(self.on_exit.clone(), future).map(drop); - self.sender.unbounded_send(Box::pin(future)) + self.sender.unbounded_send((Box::pin(future), From::from("unnamed"))) .map_err(|_| SpawnError::shutdown()) } } @@ -129,7 +143,7 @@ type Boxed01Future01 = Box + Send + impl futures01::future::Executor for SpawnTaskHandle { fn execute(&self, future: Boxed01Future01) -> Result<(), futures01::future::ExecuteError>{ - self.spawn(future.compat().map(drop)); + self.spawn("unnamed", future.compat().map(drop)); Ok(()) } } @@ -159,12 +173,12 @@ pub trait AbstractService: 'static + Future> + fn telemetry(&self) -> Option; /// Spawns a task in the background that runs the future passed as parameter. - fn spawn_task(&self, task: impl Future + Send + Unpin + 'static); + fn spawn_task(&self, name: impl Into>, task: impl Future + Send + 'static); /// Spawns a task in the background that runs the future passed as /// parameter. The given task is considered essential, i.e. if it errors we /// trigger a service exit. - fn spawn_essential_task(&self, task: impl Future + Send + Unpin + 'static); + fn spawn_essential_task(&self, name: impl Into>, task: impl Future + Send + 'static); /// Returns a handle for spawning tasks. fn spawn_task_handle(&self) -> SpawnTaskHandle; @@ -238,12 +252,16 @@ where self.keystore.clone() } - fn spawn_task(&self, task: impl Future + Send + Unpin + 'static) { - let task = select(self.on_exit(), task).map(drop); - let _ = self.to_spawn_tx.unbounded_send(Box::pin(task)); + fn spawn_task(&self, name: impl Into>, task: impl Future + Send + 'static) { + let on_exit = self.on_exit(); + let task = async move { + futures::pin_mut!(task); + let _ = select(on_exit, task).await; + }; + let _ = self.to_spawn_tx.unbounded_send((Box::pin(task), name.into())); } - fn spawn_essential_task(&self, task: impl Future + Send + Unpin + 'static) { + fn spawn_essential_task(&self, name: impl Into>, task: impl Future + Send + 'static) { let mut essential_failed = self.essential_failed_tx.clone(); let essential_task = std::panic::AssertUnwindSafe(task) .catch_unwind() @@ -251,9 +269,13 @@ where error!("Essential task failed. Shutting down service."); let _ = essential_failed.send(()); }); - let task = select(self.on_exit(), essential_task).map(drop); + let on_exit = self.on_exit(); + let task = async move { + futures::pin_mut!(essential_task); + let _ = select(on_exit, essential_task).await; + }; - let _ = self.to_spawn_tx.unbounded_send(Box::pin(task)); + let _ = self.to_spawn_tx.unbounded_send((Box::pin(task), name.into())); } fn spawn_task_handle(&self) -> SpawnTaskHandle { @@ -317,8 +339,8 @@ impl Future for } } - while let Poll::Ready(Some(task_to_spawn)) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) { - (this.tasks_executor)(task_to_spawn); + while let Poll::Ready(Some((task_to_spawn, name))) = Pin::new(&mut this.to_spawn_rx).poll_next(cx) { + (this.tasks_executor)(Box::pin(futures_diagnose::diagnose(name, task_to_spawn))); } // The service future never ends. @@ -333,7 +355,7 @@ impl Spawn for &self, future: FutureObj<'static, ()> ) -> Result<(), SpawnError> { - self.to_spawn_tx.unbounded_send(Box::pin(future)) + self.to_spawn_tx.unbounded_send((Box::pin(future), From::from("unnamed"))) .map_err(|_| SpawnError::shutdown()) } } @@ -575,7 +597,7 @@ pub struct TransactionPoolAdapter { imports_external_transactions: bool, pool: Arc

, client: Arc, - executor: TaskExecutor, + executor: SpawnTaskHandle, } /// Get transactions for propagation. @@ -649,9 +671,7 @@ where } }); - if let Err(e) = self.executor.spawn(Box::new(import_future)) { - warn!("Error scheduling extrinsic import: {:?}", e); - } + self.executor.spawn("extrinsic-import", import_future); } Err(e) => debug!("Error decoding transaction {}", e), } diff --git a/client/transaction-pool/Cargo.toml b/client/transaction-pool/Cargo.toml index c3cd8c9838d71..855c7bd40d69e 100644 --- a/client/transaction-pool/Cargo.toml +++ b/client/transaction-pool/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" codec = { package = "parity-scale-codec", version = "1.0.0" } derive_more = "0.99.2" futures = { version = "0.3.1", features = ["compat"] } +futures-diagnose = "1.0" log = "0.4.8" parking_lot = "0.9.0" sp-core = { path = "../../primitives/core" } diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index 1bf6348214842..bfc13c01fdf53 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -87,13 +87,13 @@ impl sc_transaction_graph::ChainApi for FullChainApi BasicQueue { let manual_poll; if let Some(pool) = &mut pool { - pool.spawn_ok(future); + pool.spawn_ok(futures_diagnose::diagnose("import-queue", future)); manual_poll = None; } else { manual_poll = Some(Box::pin(future) as Pin>);