diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index d673a54a94882..c71336c330882 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -89,7 +89,7 @@ pub fn new_partial( let client = Arc::new(client); let telemetry = telemetry.map(|(worker, telemetry)| { - task_manager.spawn_handle().spawn("telemetry", worker.run()); + task_manager.spawn_handle().spawn("telemetry", None, worker.run()); telemetry }); @@ -289,7 +289,9 @@ pub fn new_full(mut config: Configuration) -> Result // the AURA authoring task is considered essential, i.e. if it // fails we take down the service with it. - task_manager.spawn_essential_handle().spawn_blocking("aura", aura); + task_manager + .spawn_essential_handle() + .spawn_blocking("aura", Some("block-authoring"), aura); } // if the node isn't actively participating in consensus then it doesn't @@ -329,6 +331,7 @@ pub fn new_full(mut config: Configuration) -> Result // if it fails we take down the service with it. task_manager.spawn_essential_handle().spawn_blocking( "grandpa-voter", + None, sc_finality_grandpa::run_grandpa_voter(grandpa_config)?, ); } diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 2220614ebaf2a..39307bafadd06 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -170,7 +170,7 @@ pub fn new_partial( let client = Arc::new(client); let telemetry = telemetry.map(|(worker, telemetry)| { - task_manager.spawn_handle().spawn("telemetry", worker.run()); + task_manager.spawn_handle().spawn("telemetry", None, worker.run()); telemetry }); @@ -434,7 +434,11 @@ pub fn new_full_base( }; let babe = sc_consensus_babe::start_babe(babe_config)?; - task_manager.spawn_essential_handle().spawn_blocking("babe-proposer", babe); + task_manager.spawn_essential_handle().spawn_blocking( + "babe-proposer", + Some("block-authoring"), + babe, + ); } // Spawn authority discovery module. @@ -461,9 +465,11 @@ pub fn new_full_base( prometheus_registry.clone(), ); - task_manager - .spawn_handle() - .spawn("authority-discovery-worker", authority_discovery_worker.run()); + task_manager.spawn_handle().spawn( + "authority-discovery-worker", + Some("networking"), + authority_discovery_worker.run(), + ); } // if the node isn't actively participating in consensus then it doesn't @@ -501,9 +507,11 @@ pub fn new_full_base( // the GRANDPA voter task is considered infallible, i.e. // if it fails we take down the service with it. - task_manager - .spawn_essential_handle() - .spawn_blocking("grandpa-voter", grandpa::run_grandpa_voter(grandpa_config)?); + task_manager.spawn_essential_handle().spawn_blocking( + "grandpa-voter", + None, + grandpa::run_grandpa_voter(grandpa_config)?, + ); } network_starter.start_network(); diff --git a/bin/node/testing/src/bench.rs b/bin/node/testing/src/bench.rs index cf0a463cc3e99..5ee1ec998be4d 100644 --- a/bin/node/testing/src/bench.rs +++ b/bin/node/testing/src/bench.rs @@ -243,11 +243,21 @@ impl TaskExecutor { } impl SpawnNamed for TaskExecutor { - fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn( + &self, + _: &'static str, + _: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { self.pool.spawn_ok(future); } - fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_blocking( + &self, + _: &'static str, + _: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { self.pool.spawn_ok(future); } } diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index 573601a9102c5..305c4d753c1ea 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -270,6 +270,7 @@ where spawn_handle.spawn_blocking( "basic-authorship-proposer", + None, Box::pin(async move { // leave some time for evaluation and block finalization (33%) let deadline = (self.now)() + max_duration - max_duration / 3; diff --git a/client/consensus/common/src/import_queue/basic_queue.rs b/client/consensus/common/src/import_queue/basic_queue.rs index 9042c8798be4f..0461d7cf954cb 100644 --- a/client/consensus/common/src/import_queue/basic_queue.rs +++ b/client/consensus/common/src/import_queue/basic_queue.rs @@ -89,7 +89,11 @@ impl BasicQueue { metrics, ); - spawner.spawn_essential_blocking("basic-block-import-worker", future.boxed()); + spawner.spawn_essential_blocking( + "basic-block-import-worker", + Some("block-import"), + future.boxed(), + ); Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData } } diff --git a/client/executor/src/native_executor.rs b/client/executor/src/native_executor.rs index d912fc0fd13c9..62e76d559c0f2 100644 --- a/client/executor/src/native_executor.rs +++ b/client/executor/src/native_executor.rs @@ -399,6 +399,7 @@ impl RuntimeSpawn for RuntimeInstanceSpawn { let scheduler = self.scheduler.clone(); self.scheduler.spawn( "executor-extra-runtime-instance", + None, Box::pin(async move { let module = AssertUnwindSafe(module); diff --git a/client/offchain/src/lib.rs b/client/offchain/src/lib.rs index a77fd17a2c8b8..2de24e10d927d 100644 --- a/client/offchain/src/lib.rs +++ b/client/offchain/src/lib.rs @@ -226,6 +226,7 @@ pub async fn notification_future( if n.is_new_best { spawner.spawn( "offchain-on-block", + Some("offchain-worker"), offchain .on_block_imported(&n.header, network_provider.clone(), is_validator) .boxed(), diff --git a/client/rpc/src/lib.rs b/client/rpc/src/lib.rs index 832585db4854c..8f951632698fd 100644 --- a/client/rpc/src/lib.rs +++ b/client/rpc/src/lib.rs @@ -54,7 +54,8 @@ impl SubscriptionTaskExecutor { impl Spawn for SubscriptionTaskExecutor { fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { - self.0.spawn("substrate-rpc-subscription", future.map(drop).boxed()); + self.0 + .spawn("substrate-rpc-subscription", Some("rpc"), future.map(drop).boxed()); Ok(()) } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index bcb05ce743701..88ba6282b5f4e 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -424,6 +424,7 @@ where if let Some(offchain) = offchain_workers.clone() { spawn_handle.spawn( "offchain-notifications", + Some("offchain-worker"), sc_offchain::notification_future( config.role.is_authority(), client.clone(), @@ -505,11 +506,13 @@ where // Inform the tx pool about imported and finalized blocks. spawn_handle.spawn( "txpool-notifications", + Some("transaction-pool"), sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()), ); spawn_handle.spawn( "on-transaction-imported", + Some("transaction-pool"), transaction_notifications(transaction_pool.clone(), network.clone(), telemetry.clone()), ); @@ -520,6 +523,7 @@ where let metrics = MetricsService::with_prometheus(telemetry.clone(), ®istry, &config)?; spawn_handle.spawn( "prometheus-endpoint", + None, prometheus_endpoint::init_prometheus(port, registry).map(drop), ); @@ -531,6 +535,7 @@ where // Periodically updated metrics and telemetry updates. spawn_handle.spawn( "telemetry-periodic-send", + None, metrics_service.run(client.clone(), transaction_pool.clone(), network.clone()), ); @@ -567,6 +572,7 @@ where // Spawn informant task spawn_handle.spawn( "informant", + None, sc_informant::build( client.clone(), network.clone(), @@ -798,7 +804,7 @@ where config.network.default_peers_set.in_peers as usize + config.network.default_peers_set.out_peers as usize, ); - spawn_handle.spawn("block_request_handler", handler.run()); + spawn_handle.spawn("block-request-handler", Some("networking"), handler.run()); protocol_config } }; @@ -815,7 +821,7 @@ where config.network.default_peers_set.in_peers as usize + config.network.default_peers_set.out_peers as usize, ); - spawn_handle.spawn("state_request_handler", handler.run()); + spawn_handle.spawn("state-request-handler", Some("networking"), handler.run()); protocol_config } }; @@ -828,7 +834,7 @@ where // Allow both outgoing and incoming requests. let (handler, protocol_config) = WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone()); - spawn_handle.spawn("warp_sync_request_handler", handler.run()); + spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); protocol_config }; (provider, protocol_config) @@ -842,7 +848,7 @@ where // Allow both outgoing and incoming requests. let (handler, protocol_config) = LightClientRequestHandler::new(&protocol_id, client.clone()); - spawn_handle.spawn("light_client_request_handler", handler.run()); + spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run()); protocol_config } }; @@ -852,13 +858,13 @@ where executor: { let spawn_handle = Clone::clone(&spawn_handle); Some(Box::new(move |fut| { - spawn_handle.spawn("libp2p-node", fut); + spawn_handle.spawn("libp2p-node", Some("networking"), fut); })) }, transactions_handler_executor: { let spawn_handle = Clone::clone(&spawn_handle); Box::new(move |fut| { - spawn_handle.spawn("network-transactions-handler", fut); + spawn_handle.spawn("network-transactions-handler", Some("networking"), fut); }) }, network_config: config.network.clone(), @@ -920,7 +926,7 @@ where // issue, and ideally we would like to fix the network future to take as little time as // possible, but we also take the extra harm-prevention measure to execute the networking // future using `spawn_blocking`. - spawn_handle.spawn_blocking("network-worker", async move { + spawn_handle.spawn_blocking("network-worker", Some("networking"), async move { if network_start_rx.await.is_err() { log::warn!( "The NetworkStart returned as part of `build_network` has been silently dropped" diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index ce77be5a7c1d9..bd43d4c464ea0 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -75,7 +75,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions; pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool}; #[doc(hidden)] pub use std::{ops::Deref, result::Result, sync::Arc}; -pub use task_manager::{SpawnTaskHandle, TaskManager}; +pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME}; const DEFAULT_PROTOCOL_ID: &str = "sup"; diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index c827aa71dac2c..64c00226073c7 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -38,6 +38,9 @@ mod prometheus_future; #[cfg(test)] mod tests; +/// Default task group name. +pub const DEFAULT_GROUP_NAME: &'static str = "default"; + /// An handle for spawning tasks in the service. #[derive(Clone)] pub struct SpawnTaskHandle { @@ -48,31 +51,39 @@ pub struct SpawnTaskHandle { } impl SpawnTaskHandle { - /// Spawns the given task with the given name. + /// Spawns the given task with the given name and an optional group name. + /// If group is not specified `DEFAULT_GROUP_NAME` will be used. /// - /// Note that the `name` is a `&'static str`. The reason for this choice is that statistics - /// about this task are getting reported to the Prometheus endpoint (if enabled), and that - /// therefore the set of possible task names must be bounded. + /// Note that the `name`/`group` is a `&'static str`. The reason for this choice is that + /// statistics about this task are getting reported to the Prometheus endpoint (if enabled), and + /// that therefore the set of possible task names must be bounded. /// /// In other words, it would be a bad idea for someone to do for example /// `spawn(format!("{:?}", some_public_key))`. - pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { - self.spawn_inner(name, task, TaskType::Async) + pub fn spawn( + &self, + name: &'static str, + group: Option<&'static str>, + task: impl Future + Send + 'static, + ) { + self.spawn_inner(name, group, task, TaskType::Async) } /// Spawns the blocking task with the given name. See also `spawn`. pub fn spawn_blocking( &self, name: &'static str, + group: Option<&'static str>, task: impl Future + Send + 'static, ) { - self.spawn_inner(name, task, TaskType::Blocking) + self.spawn_inner(name, group, task, TaskType::Blocking) } /// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`. fn spawn_inner( &self, name: &'static str, + group: Option<&'static str>, task: impl Future + Send + 'static, task_type: TaskType, ) { @@ -83,21 +94,23 @@ impl SpawnTaskHandle { let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); + // If no group is specified use default. + let group = group.unwrap_or(DEFAULT_GROUP_NAME); // Note that we increase the started counter here and not within the future. This way, // we could properly visualize on Prometheus situations where the spawning doesn't work. if let Some(metrics) = &self.metrics { - metrics.tasks_spawned.with_label_values(&[name]).inc(); + metrics.tasks_spawned.with_label_values(&[name, group]).inc(); // We do a dummy increase in order for the task to show up in metrics. - metrics.tasks_ended.with_label_values(&[name, "finished"]).inc_by(0); + metrics.tasks_ended.with_label_values(&[name, "finished", group]).inc_by(0); } let future = async move { if let Some(metrics) = metrics { // Add some wrappers around `task`. let task = { - let poll_duration = metrics.poll_duration.with_label_values(&[name]); - let poll_start = metrics.poll_start.with_label_values(&[name]); + let poll_duration = metrics.poll_duration.with_label_values(&[name, group]); + let poll_start = metrics.poll_start.with_label_values(&[name, group]); let inner = prometheus_future::with_poll_durations(poll_duration, poll_start, task); // The logic of `AssertUnwindSafe` here is ok considering that we throw @@ -108,15 +121,15 @@ impl SpawnTaskHandle { match select(on_exit, task).await { Either::Right((Err(payload), _)) => { - metrics.tasks_ended.with_label_values(&[name, "panic"]).inc(); + metrics.tasks_ended.with_label_values(&[name, "panic", group]).inc(); panic::resume_unwind(payload) }, Either::Right((Ok(()), _)) => { - metrics.tasks_ended.with_label_values(&[name, "finished"]).inc(); + metrics.tasks_ended.with_label_values(&[name, "finished", group]).inc(); }, Either::Left(((), _)) => { // The `on_exit` has triggered. - metrics.tasks_ended.with_label_values(&[name, "interrupted"]).inc(); + metrics.tasks_ended.with_label_values(&[name, "interrupted", group]).inc(); }, } } else { @@ -141,12 +154,22 @@ impl SpawnTaskHandle { } impl sp_core::traits::SpawnNamed for SpawnTaskHandle { - fn spawn_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) { - self.spawn_blocking(name, future); + fn spawn_blocking( + &self, + name: &'static str, + group: Option<&'static str>, + future: BoxFuture<'static, ()>, + ) { + self.spawn_inner(name, group, future, TaskType::Blocking) } - fn spawn(&self, name: &'static str, future: BoxFuture<'static, ()>) { - self.spawn(name, future); + fn spawn( + &self, + name: &'static str, + group: Option<&'static str>, + future: BoxFuture<'static, ()>, + ) { + self.spawn_inner(name, group, future, TaskType::Async) } } @@ -172,8 +195,13 @@ impl SpawnEssentialTaskHandle { /// Spawns the given task with the given name. /// /// See also [`SpawnTaskHandle::spawn`]. - pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { - self.spawn_inner(name, task, TaskType::Async) + pub fn spawn( + &self, + name: &'static str, + group: Option<&'static str>, + task: impl Future + Send + 'static, + ) { + self.spawn_inner(name, group, task, TaskType::Async) } /// Spawns the blocking task with the given name. @@ -182,14 +210,16 @@ impl SpawnEssentialTaskHandle { pub fn spawn_blocking( &self, name: &'static str, + group: Option<&'static str>, task: impl Future + Send + 'static, ) { - self.spawn_inner(name, task, TaskType::Blocking) + self.spawn_inner(name, group, task, TaskType::Blocking) } fn spawn_inner( &self, name: &'static str, + group: Option<&'static str>, task: impl Future + Send + 'static, task_type: TaskType, ) { @@ -199,17 +229,27 @@ impl SpawnEssentialTaskHandle { let _ = essential_failed.close_channel(); }); - let _ = self.inner.spawn_inner(name, essential_task, task_type); + let _ = self.inner.spawn_inner(name, group, essential_task, task_type); } } impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle { - fn spawn_essential_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) { - self.spawn_blocking(name, future); + fn spawn_essential_blocking( + &self, + name: &'static str, + group: Option<&'static str>, + future: BoxFuture<'static, ()>, + ) { + self.spawn_blocking(name, group, future); } - fn spawn_essential(&self, name: &'static str, future: BoxFuture<'static, ()>) { - self.spawn(name, future); + fn spawn_essential( + &self, + name: &'static str, + group: Option<&'static str>, + future: BoxFuture<'static, ()>, + ) { + self.spawn(name, group, future); } } @@ -396,28 +436,28 @@ impl Metrics { buckets: exponential_buckets(0.001, 4.0, 9) .expect("function parameters are constant and always valid; qed"), }, - &["task_name"] + &["task_name", "task_group"] )?, registry)?, poll_start: register(CounterVec::new( Opts::new( "tasks_polling_started_total", "Total number of times we started invoking Future::poll" ), - &["task_name"] + &["task_name", "task_group"] )?, registry)?, tasks_spawned: register(CounterVec::new( Opts::new( "tasks_spawned_total", "Total number of tasks that have been spawned on the Service" ), - &["task_name"] + &["task_name", "task_group"] )?, registry)?, tasks_ended: register(CounterVec::new( Opts::new( "tasks_ended_total", "Total number of tasks for which Future::poll has returned Ready(()) or panicked" ), - &["task_name", "reason"] + &["task_name", "reason", "task_group"] )?, registry)?, }) } diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 291d71ebaf03b..75092ff2ae62e 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -96,8 +96,8 @@ fn ensure_tasks_are_awaited_on_shutdown() { let task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -115,7 +115,7 @@ fn ensure_keep_alive_during_shutdown() { let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); task_manager.keep_alive(drop_tester.new_ref()); - spawn_handle.spawn("task1", run_background_task(())); + spawn_handle.spawn("task1", None, run_background_task(())); assert_eq!(drop_tester, 1); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -134,10 +134,12 @@ fn ensure_blocking_futures_are_awaited_on_shutdown() { let drop_tester = DropTester::new(); spawn_handle.spawn( "task1", + None, run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), ); spawn_handle.spawn( "task2", + None, run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), ); assert_eq!(drop_tester, 2); @@ -156,14 +158,14 @@ fn ensure_no_task_can_be_spawn_after_terminate() { let mut task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 2); task_manager.terminate(); - spawn_handle.spawn("task3", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task3", None, run_background_task(drop_tester.new_ref())); runtime.block_on(task_manager.clean_shutdown()); drop_tester.wait_on_drop(); } @@ -176,8 +178,8 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { let mut task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -197,13 +199,13 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() { let spawn_handle = task_manager.spawn_handle(); let spawn_essential_handle = task_manager.spawn_essential_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 2); - spawn_essential_handle.spawn("task3", async { panic!("task failed") }); + spawn_essential_handle.spawn("task3", None, async { panic!("task failed") }); runtime .block_on(task_manager.future()) .expect_err("future()'s Result must be Err"); @@ -226,10 +228,10 @@ fn ensure_children_tasks_ends_when_task_manager_terminated() { task_manager.add_child(child_2); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 4); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -255,15 +257,15 @@ fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() task_manager.add_child(child_2); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 4); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 4); - spawn_essential_handle_child_1.spawn("task5", async { panic!("task failed") }); + spawn_essential_handle_child_1.spawn("task5", None, async { panic!("task failed") }); runtime .block_on(task_manager.future()) .expect_err("future()'s Result must be Err"); @@ -286,15 +288,15 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { task_manager.add_child(child_2); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 4); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 4); - spawn_handle_child_1.spawn("task5", async { panic!("task failed") }); + spawn_handle_child_1.spawn("task5", None, async { panic!("task failed") }); runtime.block_on(async { let t1 = task_manager.future().fuse(); let t2 = tokio::time::sleep(Duration::from_secs(3)).fuse(); diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index a735c67d846ce..8af0ea98f8100 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -64,6 +64,7 @@ fn spawn_validation_pool_task( ) { spawner.spawn_essential_blocking( name, + Some("transaction-pool"), async move { loop { let task = receiver.lock().await.next().await; diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 8af73c3fe5b48..3565cb52ad87b 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -217,7 +217,7 @@ where }; if let Some(background_task) = background_task { - spawner.spawn_essential("txpool-background", background_task); + spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task); } Self { diff --git a/primitives/core/src/testing.rs b/primitives/core/src/testing.rs index a7fff0def83f2..a40a37804c031 100644 --- a/primitives/core/src/testing.rs +++ b/primitives/core/src/testing.rs @@ -152,10 +152,20 @@ impl Default for TaskExecutor { #[cfg(feature = "std")] impl crate::traits::SpawnNamed for TaskExecutor { - fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_blocking( + &self, + _name: &'static str, + _group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { self.0.spawn_ok(future); } - fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn( + &self, + _name: &'static str, + _group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { self.0.spawn_ok(future); } } @@ -165,11 +175,17 @@ impl crate::traits::SpawnEssentialNamed for TaskExecutor { fn spawn_essential_blocking( &self, _: &'static str, + _: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { self.0.spawn_ok(future); } - fn spawn_essential(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_essential( + &self, + _: &'static str, + _: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { self.0.spawn_ok(future); } } diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index 47639f9d87ba6..e3d7d8e283e21 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -190,58 +190,91 @@ sp_externalities::decl_extension! { pub struct RuntimeSpawnExt(Box); } -/// Something that can spawn tasks (blocking and non-blocking) with an assigned name. +/// Something that can spawn tasks (blocking and non-blocking) with an assigned name +/// and optional group. #[dyn_clonable::clonable] pub trait SpawnNamed: Clone + Send + Sync { /// Spawn the given blocking future. /// - /// The given `name` is used to identify the future in tracing. - fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); + /// The given `group` and `name` is used to identify the future in tracing. + fn spawn_blocking( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ); /// Spawn the given non-blocking future. /// - /// The given `name` is used to identify the future in tracing. - fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); + /// The given `group` and `name` is used to identify the future in tracing. + fn spawn( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ); } impl SpawnNamed for Box { - fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { - (**self).spawn_blocking(name, future) + fn spawn_blocking( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + (**self).spawn_blocking(name, group, future) } - - fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { - (**self).spawn(name, future) + fn spawn( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + (**self).spawn(name, group, future) } } -/// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name. +/// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name +/// and optional group. /// /// Essential tasks are special tasks that should take down the node when they end. #[dyn_clonable::clonable] pub trait SpawnEssentialNamed: Clone + Send + Sync { /// Spawn the given blocking future. /// - /// The given `name` is used to identify the future in tracing. + /// The given `group` and `name` is used to identify the future in tracing. fn spawn_essential_blocking( &self, name: &'static str, + group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ); /// Spawn the given non-blocking future. /// - /// The given `name` is used to identify the future in tracing. - fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); + /// The given `group` and `name` is used to identify the future in tracing. + fn spawn_essential( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ); } impl SpawnEssentialNamed for Box { fn spawn_essential_blocking( &self, name: &'static str, + group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { - (**self).spawn_essential_blocking(name, future) + (**self).spawn_essential_blocking(name, group, future) } - fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { - (**self).spawn_essential(name, future) + fn spawn_essential( + &self, + name: &'static str, + group: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + (**self).spawn_essential(name, group, future) } } diff --git a/primitives/io/src/batch_verifier.rs b/primitives/io/src/batch_verifier.rs index b6da1d85907bd..05c8a63694eb3 100644 --- a/primitives/io/src/batch_verifier.rs +++ b/primitives/io/src/batch_verifier.rs @@ -74,6 +74,7 @@ impl BatchVerifier { self.scheduler.spawn( name, + None, async move { if !f() { invalid_clone.store(true, AtomicOrdering::Relaxed); @@ -177,7 +178,8 @@ impl BatchVerifier { if pending.len() > 0 { let (sender, receiver) = std::sync::mpsc::channel(); self.scheduler.spawn( - "substrate_batch_verify_join", + "substrate-batch-verify-join", + None, async move { futures::future::join_all(pending).await; sender.send(()).expect( diff --git a/primitives/tasks/src/lib.rs b/primitives/tasks/src/lib.rs index e9c80ae5ff4c8..c874bb98e1ae6 100644 --- a/primitives/tasks/src/lib.rs +++ b/primitives/tasks/src/lib.rs @@ -95,6 +95,7 @@ mod inner { let extra_scheduler = scheduler.clone(); scheduler.spawn( "parallel-runtime-spawn", + Some("substrate-runtime"), Box::pin(async move { let result = match crate::new_async_externalities(extra_scheduler) { Ok(mut ext) => { diff --git a/test-utils/test-runner/src/client.rs b/test-utils/test-runner/src/client.rs index 58c4cf6503a93..27c04c40fe6fe 100644 --- a/test-utils/test-runner/src/client.rs +++ b/test-utils/test-runner/src/client.rs @@ -235,7 +235,9 @@ where }); // spawn the authorship task as an essential task. - task_manager.spawn_essential_handle().spawn("manual-seal", authorship_future); + task_manager + .spawn_essential_handle() + .spawn("manual-seal", None, authorship_future); network_starter.start_network(); let rpc_handler = rpc_handlers.io_handler();