From 5597eedb8369267855a2846891a13316ead09529 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 5 Nov 2021 13:14:51 +0000 Subject: [PATCH 01/16] SpawnNamed: add new trait methods Signed-off-by: Andrei Sandu --- primitives/core/src/traits.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index 47639f9d87ba6..4f5adbdc3df81 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -190,7 +190,8 @@ sp_externalities::decl_extension! { pub struct RuntimeSpawnExt(Box); } -/// Something that can spawn tasks (blocking and non-blocking) with an assigned name. +/// Something that can spawn tasks (blocking and non-blocking) with an assigned name +/// and subsystem. #[dyn_clonable::clonable] pub trait SpawnNamed: Clone + Send + Sync { /// Spawn the given blocking future. @@ -201,6 +202,20 @@ pub trait SpawnNamed: Clone + Send + Sync { /// /// The given `name` is used to identify the future in tracing. fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); + /// Spawn the given blocking future. + /// + /// The given `subsystem` and `name` is used to identify the future in tracing. + fn spawn_blocking_with_subsystem(&self, name: &'static str, _subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + // Default impl doesn't trace subsystem. + self.spawn_blocking(name, future); + } + /// Spawn the given non-blocking future. + /// + /// The given `subsystem` and `name` is used to identify the future in tracing. + fn spawn_with_subsystem(&self, name: &'static str, _subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + // Default impl doesn't trace subsystem. + self.spawn(name, future); + } } impl SpawnNamed for Box { @@ -211,6 +226,12 @@ impl SpawnNamed for Box { fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { (**self).spawn(name, future) } + fn spawn_blocking_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + (**self).spawn_blocking_with_subsystem(name, subsystem, future) + } + fn spawn_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + (**self).spawn_with_subsystem(name, subsystem, future) + } } /// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name. From 51bb28c82b7ba2730e3bc0db5ec9e2f0654092f8 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 5 Nov 2021 13:15:32 +0000 Subject: [PATCH 02/16] Implement new methods Signed-off-by: Andrei Sandu --- client/service/src/task_manager/mod.rs | 64 ++++++++++++++++++-------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index c827aa71dac2c..fc4243ef3c28f 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -57,7 +57,7 @@ impl SpawnTaskHandle { /// In other words, it would be a bad idea for someone to do for example /// `spawn(format!("{:?}", some_public_key))`. pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { - self.spawn_inner(name, task, TaskType::Async) + self.spawn_inner(name, None, task, TaskType::Async) } /// Spawns the blocking task with the given name. See also `spawn`. @@ -66,38 +66,46 @@ impl SpawnTaskHandle { name: &'static str, task: impl Future + Send + 'static, ) { - self.spawn_inner(name, task, TaskType::Blocking) + self.spawn_inner(name, None, task, TaskType::Blocking) } /// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`. fn spawn_inner( &self, name: &'static str, + subsystem: Option<&'static str>, task: impl Future + Send + 'static, task_type: TaskType, ) { if self.task_notifier.is_closed() { debug!("Attempt to spawn a new task has been prevented: {}", name); - return + return; } let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); + // Provide a default subsystem name. + let subsystem_name = subsystem.unwrap_or("substrate-unspecified"); // Note that we increase the started counter here and not within the future. This way, // we could properly visualize on Prometheus situations where the spawning doesn't work. if let Some(metrics) = &self.metrics { - metrics.tasks_spawned.with_label_values(&[name]).inc(); + metrics.tasks_spawned.with_label_values(&[name, subsystem_name]).inc(); // We do a dummy increase in order for the task to show up in metrics. - metrics.tasks_ended.with_label_values(&[name, "finished"]).inc_by(0); + metrics + .tasks_ended + .with_label_values(&[name, "finished", subsystem_name]) + .inc_by(0); } let future = async move { if let Some(metrics) = metrics { // Add some wrappers around `task`. let task = { - let poll_duration = metrics.poll_duration.with_label_values(&[name]); - let poll_start = metrics.poll_start.with_label_values(&[name]); + let poll_duration = + metrics.poll_duration.with_label_values(&[name, subsystem_name]); + let poll_start = + metrics.poll_start.with_label_values(&[name, subsystem_name]); let inner = prometheus_future::with_poll_durations(poll_duration, poll_start, task); // The logic of `AssertUnwindSafe` here is ok considering that we throw @@ -108,16 +116,25 @@ impl SpawnTaskHandle { match select(on_exit, task).await { Either::Right((Err(payload), _)) => { - metrics.tasks_ended.with_label_values(&[name, "panic"]).inc(); + metrics + .tasks_ended + .with_label_values(&[name, "panic", subsystem_name]) + .inc(); panic::resume_unwind(payload) - }, + } Either::Right((Ok(()), _)) => { - metrics.tasks_ended.with_label_values(&[name, "finished"]).inc(); - }, + metrics + .tasks_ended + .with_label_values(&[name, "finished", subsystem_name]) + .inc(); + } Either::Left(((), _)) => { // The `on_exit` has triggered. - metrics.tasks_ended.with_label_values(&[name, "interrupted"]).inc(); - }, + metrics + .tasks_ended + .with_label_values(&[name, "interrupted", subsystem_name]) + .inc(); + } } } else { futures::pin_mut!(task); @@ -133,7 +150,7 @@ impl SpawnTaskHandle { self.tokio_handle.spawn_blocking(move || { handle.block_on(future); }) - }, + } }; let _ = self.task_notifier.unbounded_send(join_handle); @@ -148,6 +165,15 @@ impl sp_core::traits::SpawnNamed for SpawnTaskHandle { fn spawn(&self, name: &'static str, future: BoxFuture<'static, ()>) { self.spawn(name, future); } + + fn spawn_blocking_with_subsystem(&self, name: &'static str, subsystem: &'static str,future: BoxFuture<'static, ()>) { + self.spawn_inner(name, Some(subsystem), future, TaskType::Blocking) + } + + fn spawn_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: BoxFuture<'static, ()>) { + self.spawn_inner(name, Some(subsystem), future, TaskType::Async) + } + } /// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any @@ -199,7 +225,7 @@ impl SpawnEssentialTaskHandle { let _ = essential_failed.close_channel(); }); - let _ = self.inner.spawn_inner(name, essential_task, task_type); + let _ = self.inner.spawn_inner(name, None, essential_task, task_type); } } @@ -396,28 +422,28 @@ impl Metrics { buckets: exponential_buckets(0.001, 4.0, 9) .expect("function parameters are constant and always valid; qed"), }, - &["task_name"] + &["task_name", "subsystem"] )?, registry)?, poll_start: register(CounterVec::new( Opts::new( "tasks_polling_started_total", "Total number of times we started invoking Future::poll" ), - &["task_name"] + &["task_name", "subsystem"] )?, registry)?, tasks_spawned: register(CounterVec::new( Opts::new( "tasks_spawned_total", "Total number of tasks that have been spawned on the Service" ), - &["task_name"] + &["task_name", "subsystem"] )?, registry)?, tasks_ended: register(CounterVec::new( Opts::new( "tasks_ended_total", "Total number of tasks for which Future::poll has returned Ready(()) or panicked" ), - &["task_name", "reason"] + &["task_name", "reason", "subsystem"] )?, registry)?, }) } From 67926c8117c87325afb45819191dd5b56a1a51dd Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 5 Nov 2021 09:59:59 +0000 Subject: [PATCH 03/16] cargo fmt Signed-off-by: Andrei Sandu --- primitives/core/src/traits.rs | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index 4f5adbdc3df81..dd12d1e3d1071 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -190,7 +190,7 @@ sp_externalities::decl_extension! { pub struct RuntimeSpawnExt(Box); } -/// Something that can spawn tasks (blocking and non-blocking) with an assigned name +/// Something that can spawn tasks (blocking and non-blocking) with an assigned name /// and subsystem. #[dyn_clonable::clonable] pub trait SpawnNamed: Clone + Send + Sync { @@ -205,14 +205,24 @@ pub trait SpawnNamed: Clone + Send + Sync { /// Spawn the given blocking future. /// /// The given `subsystem` and `name` is used to identify the future in tracing. - fn spawn_blocking_with_subsystem(&self, name: &'static str, _subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_blocking_with_subsystem( + &self, + name: &'static str, + _subsystem: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { // Default impl doesn't trace subsystem. self.spawn_blocking(name, future); } /// Spawn the given non-blocking future. /// /// The given `subsystem` and `name` is used to identify the future in tracing. - fn spawn_with_subsystem(&self, name: &'static str, _subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_with_subsystem( + &self, + name: &'static str, + _subsystem: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { // Default impl doesn't trace subsystem. self.spawn(name, future); } @@ -226,10 +236,20 @@ impl SpawnNamed for Box { fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { (**self).spawn(name, future) } - fn spawn_blocking_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_blocking_with_subsystem( + &self, + name: &'static str, + subsystem: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { (**self).spawn_blocking_with_subsystem(name, subsystem, future) } - fn spawn_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_with_subsystem( + &self, + name: &'static str, + subsystem: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { (**self).spawn_with_subsystem(name, subsystem, future) } } From 47640394f95c1a6e5caabf6be953e38af8c288f7 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 5 Nov 2021 13:14:51 +0000 Subject: [PATCH 04/16] SpawnNamed: add new trait methods Signed-off-by: Andrei Sandu --- primitives/core/src/traits.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index 47639f9d87ba6..4f5adbdc3df81 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -190,7 +190,8 @@ sp_externalities::decl_extension! { pub struct RuntimeSpawnExt(Box); } -/// Something that can spawn tasks (blocking and non-blocking) with an assigned name. +/// Something that can spawn tasks (blocking and non-blocking) with an assigned name +/// and subsystem. #[dyn_clonable::clonable] pub trait SpawnNamed: Clone + Send + Sync { /// Spawn the given blocking future. @@ -201,6 +202,20 @@ pub trait SpawnNamed: Clone + Send + Sync { /// /// The given `name` is used to identify the future in tracing. fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); + /// Spawn the given blocking future. + /// + /// The given `subsystem` and `name` is used to identify the future in tracing. + fn spawn_blocking_with_subsystem(&self, name: &'static str, _subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + // Default impl doesn't trace subsystem. + self.spawn_blocking(name, future); + } + /// Spawn the given non-blocking future. + /// + /// The given `subsystem` and `name` is used to identify the future in tracing. + fn spawn_with_subsystem(&self, name: &'static str, _subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + // Default impl doesn't trace subsystem. + self.spawn(name, future); + } } impl SpawnNamed for Box { @@ -211,6 +226,12 @@ impl SpawnNamed for Box { fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { (**self).spawn(name, future) } + fn spawn_blocking_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + (**self).spawn_blocking_with_subsystem(name, subsystem, future) + } + fn spawn_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + (**self).spawn_with_subsystem(name, subsystem, future) + } } /// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name. From 3a5a3e49c6955e12de9df8652c6324d4822c707f Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 5 Nov 2021 13:15:32 +0000 Subject: [PATCH 05/16] Implement new methods Signed-off-by: Andrei Sandu --- client/service/src/task_manager/mod.rs | 64 ++++++++++++++++++-------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index c827aa71dac2c..fc4243ef3c28f 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -57,7 +57,7 @@ impl SpawnTaskHandle { /// In other words, it would be a bad idea for someone to do for example /// `spawn(format!("{:?}", some_public_key))`. pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { - self.spawn_inner(name, task, TaskType::Async) + self.spawn_inner(name, None, task, TaskType::Async) } /// Spawns the blocking task with the given name. See also `spawn`. @@ -66,38 +66,46 @@ impl SpawnTaskHandle { name: &'static str, task: impl Future + Send + 'static, ) { - self.spawn_inner(name, task, TaskType::Blocking) + self.spawn_inner(name, None, task, TaskType::Blocking) } /// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`. fn spawn_inner( &self, name: &'static str, + subsystem: Option<&'static str>, task: impl Future + Send + 'static, task_type: TaskType, ) { if self.task_notifier.is_closed() { debug!("Attempt to spawn a new task has been prevented: {}", name); - return + return; } let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); + // Provide a default subsystem name. + let subsystem_name = subsystem.unwrap_or("substrate-unspecified"); // Note that we increase the started counter here and not within the future. This way, // we could properly visualize on Prometheus situations where the spawning doesn't work. if let Some(metrics) = &self.metrics { - metrics.tasks_spawned.with_label_values(&[name]).inc(); + metrics.tasks_spawned.with_label_values(&[name, subsystem_name]).inc(); // We do a dummy increase in order for the task to show up in metrics. - metrics.tasks_ended.with_label_values(&[name, "finished"]).inc_by(0); + metrics + .tasks_ended + .with_label_values(&[name, "finished", subsystem_name]) + .inc_by(0); } let future = async move { if let Some(metrics) = metrics { // Add some wrappers around `task`. let task = { - let poll_duration = metrics.poll_duration.with_label_values(&[name]); - let poll_start = metrics.poll_start.with_label_values(&[name]); + let poll_duration = + metrics.poll_duration.with_label_values(&[name, subsystem_name]); + let poll_start = + metrics.poll_start.with_label_values(&[name, subsystem_name]); let inner = prometheus_future::with_poll_durations(poll_duration, poll_start, task); // The logic of `AssertUnwindSafe` here is ok considering that we throw @@ -108,16 +116,25 @@ impl SpawnTaskHandle { match select(on_exit, task).await { Either::Right((Err(payload), _)) => { - metrics.tasks_ended.with_label_values(&[name, "panic"]).inc(); + metrics + .tasks_ended + .with_label_values(&[name, "panic", subsystem_name]) + .inc(); panic::resume_unwind(payload) - }, + } Either::Right((Ok(()), _)) => { - metrics.tasks_ended.with_label_values(&[name, "finished"]).inc(); - }, + metrics + .tasks_ended + .with_label_values(&[name, "finished", subsystem_name]) + .inc(); + } Either::Left(((), _)) => { // The `on_exit` has triggered. - metrics.tasks_ended.with_label_values(&[name, "interrupted"]).inc(); - }, + metrics + .tasks_ended + .with_label_values(&[name, "interrupted", subsystem_name]) + .inc(); + } } } else { futures::pin_mut!(task); @@ -133,7 +150,7 @@ impl SpawnTaskHandle { self.tokio_handle.spawn_blocking(move || { handle.block_on(future); }) - }, + } }; let _ = self.task_notifier.unbounded_send(join_handle); @@ -148,6 +165,15 @@ impl sp_core::traits::SpawnNamed for SpawnTaskHandle { fn spawn(&self, name: &'static str, future: BoxFuture<'static, ()>) { self.spawn(name, future); } + + fn spawn_blocking_with_subsystem(&self, name: &'static str, subsystem: &'static str,future: BoxFuture<'static, ()>) { + self.spawn_inner(name, Some(subsystem), future, TaskType::Blocking) + } + + fn spawn_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: BoxFuture<'static, ()>) { + self.spawn_inner(name, Some(subsystem), future, TaskType::Async) + } + } /// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any @@ -199,7 +225,7 @@ impl SpawnEssentialTaskHandle { let _ = essential_failed.close_channel(); }); - let _ = self.inner.spawn_inner(name, essential_task, task_type); + let _ = self.inner.spawn_inner(name, None, essential_task, task_type); } } @@ -396,28 +422,28 @@ impl Metrics { buckets: exponential_buckets(0.001, 4.0, 9) .expect("function parameters are constant and always valid; qed"), }, - &["task_name"] + &["task_name", "subsystem"] )?, registry)?, poll_start: register(CounterVec::new( Opts::new( "tasks_polling_started_total", "Total number of times we started invoking Future::poll" ), - &["task_name"] + &["task_name", "subsystem"] )?, registry)?, tasks_spawned: register(CounterVec::new( Opts::new( "tasks_spawned_total", "Total number of tasks that have been spawned on the Service" ), - &["task_name"] + &["task_name", "subsystem"] )?, registry)?, tasks_ended: register(CounterVec::new( Opts::new( "tasks_ended_total", "Total number of tasks for which Future::poll has returned Ready(()) or panicked" ), - &["task_name", "reason"] + &["task_name", "reason", "subsystem"] )?, registry)?, }) } From fbe6cdabedd75dbd71a58204483ae0e357a84f9e Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 5 Nov 2021 09:59:59 +0000 Subject: [PATCH 06/16] cargo fmt Signed-off-by: Andrei Sandu --- primitives/core/src/traits.rs | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index 4f5adbdc3df81..dd12d1e3d1071 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -190,7 +190,7 @@ sp_externalities::decl_extension! { pub struct RuntimeSpawnExt(Box); } -/// Something that can spawn tasks (blocking and non-blocking) with an assigned name +/// Something that can spawn tasks (blocking and non-blocking) with an assigned name /// and subsystem. #[dyn_clonable::clonable] pub trait SpawnNamed: Clone + Send + Sync { @@ -205,14 +205,24 @@ pub trait SpawnNamed: Clone + Send + Sync { /// Spawn the given blocking future. /// /// The given `subsystem` and `name` is used to identify the future in tracing. - fn spawn_blocking_with_subsystem(&self, name: &'static str, _subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_blocking_with_subsystem( + &self, + name: &'static str, + _subsystem: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { // Default impl doesn't trace subsystem. self.spawn_blocking(name, future); } /// Spawn the given non-blocking future. /// /// The given `subsystem` and `name` is used to identify the future in tracing. - fn spawn_with_subsystem(&self, name: &'static str, _subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_with_subsystem( + &self, + name: &'static str, + _subsystem: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { // Default impl doesn't trace subsystem. self.spawn(name, future); } @@ -226,10 +236,20 @@ impl SpawnNamed for Box { fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { (**self).spawn(name, future) } - fn spawn_blocking_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_blocking_with_subsystem( + &self, + name: &'static str, + subsystem: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { (**self).spawn_blocking_with_subsystem(name, subsystem, future) } - fn spawn_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_with_subsystem( + &self, + name: &'static str, + subsystem: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { (**self).spawn_with_subsystem(name, subsystem, future) } } From 2237a95476b812149626239bf1d4a341d9486b81 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 8 Nov 2021 09:59:23 +0000 Subject: [PATCH 07/16] New approach - spaw() group param Signed-off-by: Andrei Sandu --- client/service/src/task_manager/mod.rs | 67 ++++++++++++-------------- 1 file changed, 30 insertions(+), 37 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index fc4243ef3c28f..2681142d760f3 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -56,24 +56,25 @@ impl SpawnTaskHandle { /// /// In other words, it would be a bad idea for someone to do for example /// `spawn(format!("{:?}", some_public_key))`. - pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { - self.spawn_inner(name, None, task, TaskType::Async) + pub fn spawn(&self, name: &'static str, group: &'static str, task: impl Future + Send + 'static) { + self.spawn_inner(name, group, task, TaskType::Async) } /// Spawns the blocking task with the given name. See also `spawn`. pub fn spawn_blocking( &self, name: &'static str, + group: &'static str, task: impl Future + Send + 'static, ) { - self.spawn_inner(name, None, task, TaskType::Blocking) + self.spawn_inner(name, group, task, TaskType::Blocking) } /// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`. fn spawn_inner( &self, name: &'static str, - subsystem: Option<&'static str>, + group: &'static str, task: impl Future + Send + 'static, task_type: TaskType, ) { @@ -84,17 +85,15 @@ impl SpawnTaskHandle { let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); - // Provide a default subsystem name. - let subsystem_name = subsystem.unwrap_or("substrate-unspecified"); // Note that we increase the started counter here and not within the future. This way, // we could properly visualize on Prometheus situations where the spawning doesn't work. if let Some(metrics) = &self.metrics { - metrics.tasks_spawned.with_label_values(&[name, subsystem_name]).inc(); + metrics.tasks_spawned.with_label_values(&[name, group]).inc(); // We do a dummy increase in order for the task to show up in metrics. metrics .tasks_ended - .with_label_values(&[name, "finished", subsystem_name]) + .with_label_values(&[name, "finished", group]) .inc_by(0); } @@ -103,9 +102,9 @@ impl SpawnTaskHandle { // Add some wrappers around `task`. let task = { let poll_duration = - metrics.poll_duration.with_label_values(&[name, subsystem_name]); + metrics.poll_duration.with_label_values(&[name, group]); let poll_start = - metrics.poll_start.with_label_values(&[name, subsystem_name]); + metrics.poll_start.with_label_values(&[name, group]); let inner = prometheus_future::with_poll_durations(poll_duration, poll_start, task); // The logic of `AssertUnwindSafe` here is ok considering that we throw @@ -118,21 +117,21 @@ impl SpawnTaskHandle { Either::Right((Err(payload), _)) => { metrics .tasks_ended - .with_label_values(&[name, "panic", subsystem_name]) + .with_label_values(&[name, "panic", group]) .inc(); panic::resume_unwind(payload) } Either::Right((Ok(()), _)) => { metrics .tasks_ended - .with_label_values(&[name, "finished", subsystem_name]) + .with_label_values(&[name, "finished", group]) .inc(); } Either::Left(((), _)) => { // The `on_exit` has triggered. metrics .tasks_ended - .with_label_values(&[name, "interrupted", subsystem_name]) + .with_label_values(&[name, "interrupted", group]) .inc(); } } @@ -158,20 +157,12 @@ impl SpawnTaskHandle { } impl sp_core::traits::SpawnNamed for SpawnTaskHandle { - fn spawn_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) { - self.spawn_blocking(name, future); + fn spawn_blocking(&self, name: &'static str, group: &'static str,future: BoxFuture<'static, ()>) { + self.spawn_inner(name, group, future, TaskType::Blocking) } - fn spawn(&self, name: &'static str, future: BoxFuture<'static, ()>) { - self.spawn(name, future); - } - - fn spawn_blocking_with_subsystem(&self, name: &'static str, subsystem: &'static str,future: BoxFuture<'static, ()>) { - self.spawn_inner(name, Some(subsystem), future, TaskType::Blocking) - } - - fn spawn_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: BoxFuture<'static, ()>) { - self.spawn_inner(name, Some(subsystem), future, TaskType::Async) + fn spawn(&self, name: &'static str, group: &'static str, future: BoxFuture<'static, ()>) { + self.spawn_inner(name, group, future, TaskType::Async) } } @@ -198,8 +189,8 @@ impl SpawnEssentialTaskHandle { /// Spawns the given task with the given name. /// /// See also [`SpawnTaskHandle::spawn`]. - pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { - self.spawn_inner(name, task, TaskType::Async) + pub fn spawn(&self, name: &'static str, group: &'static str, task: impl Future + Send + 'static) { + self.spawn_inner(name, group, task, TaskType::Async) } /// Spawns the blocking task with the given name. @@ -208,14 +199,16 @@ impl SpawnEssentialTaskHandle { pub fn spawn_blocking( &self, name: &'static str, + group: &'static str, task: impl Future + Send + 'static, ) { - self.spawn_inner(name, task, TaskType::Blocking) + self.spawn_inner(name, group, task, TaskType::Blocking) } fn spawn_inner( &self, name: &'static str, + group: &'static str, task: impl Future + Send + 'static, task_type: TaskType, ) { @@ -225,17 +218,17 @@ impl SpawnEssentialTaskHandle { let _ = essential_failed.close_channel(); }); - let _ = self.inner.spawn_inner(name, None, essential_task, task_type); + let _ = self.inner.spawn_inner(name, group, essential_task, task_type); } } impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle { - fn spawn_essential_blocking(&self, name: &'static str, future: BoxFuture<'static, ()>) { - self.spawn_blocking(name, future); + fn spawn_essential_blocking(&self, name: &'static str, group: &'static str, future: BoxFuture<'static, ()>) { + self.spawn_blocking(name, group, future); } - fn spawn_essential(&self, name: &'static str, future: BoxFuture<'static, ()>) { - self.spawn(name, future); + fn spawn_essential(&self, name: &'static str, group: &'static str, future: BoxFuture<'static, ()>) { + self.spawn(name, group, future); } } @@ -422,28 +415,28 @@ impl Metrics { buckets: exponential_buckets(0.001, 4.0, 9) .expect("function parameters are constant and always valid; qed"), }, - &["task_name", "subsystem"] + &["task_name", "task_group"] )?, registry)?, poll_start: register(CounterVec::new( Opts::new( "tasks_polling_started_total", "Total number of times we started invoking Future::poll" ), - &["task_name", "subsystem"] + &["task_name", "task_group"] )?, registry)?, tasks_spawned: register(CounterVec::new( Opts::new( "tasks_spawned_total", "Total number of tasks that have been spawned on the Service" ), - &["task_name", "subsystem"] + &["task_name", "task_group"] )?, registry)?, tasks_ended: register(CounterVec::new( Opts::new( "tasks_ended_total", "Total number of tasks for which Future::poll has returned Ready(()) or panicked" ), - &["task_name", "reason", "subsystem"] + &["task_name", "reason", "task_group"] )?, registry)?, }) } From b8220a4f7f16a631f9d483b03ef19752af3d5898 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 8 Nov 2021 10:00:34 +0000 Subject: [PATCH 08/16] Update traits: SpawnNamed and SpawnNamed Signed-off-by: Andrei Sandu --- primitives/core/src/traits.rs | 63 ++++++++++++----------------------- 1 file changed, 22 insertions(+), 41 deletions(-) diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index dd12d1e3d1071..9871990561cc7 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -191,66 +191,45 @@ sp_externalities::decl_extension! { } /// Something that can spawn tasks (blocking and non-blocking) with an assigned name -/// and subsystem. +/// and group. #[dyn_clonable::clonable] pub trait SpawnNamed: Clone + Send + Sync { /// Spawn the given blocking future. /// - /// The given `name` is used to identify the future in tracing. - fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); - /// Spawn the given non-blocking future. - /// - /// The given `name` is used to identify the future in tracing. - fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); - /// Spawn the given blocking future. - /// - /// The given `subsystem` and `name` is used to identify the future in tracing. - fn spawn_blocking_with_subsystem( + /// The given `group` and `name` is used to identify the future in tracing. + fn spawn_blocking( &self, name: &'static str, - _subsystem: &'static str, + group: &'static str, future: futures::future::BoxFuture<'static, ()>, - ) { - // Default impl doesn't trace subsystem. - self.spawn_blocking(name, future); - } + ); /// Spawn the given non-blocking future. /// - /// The given `subsystem` and `name` is used to identify the future in tracing. - fn spawn_with_subsystem( + /// The given `group` and `name` is used to identify the future in tracing. + fn spawn( &self, name: &'static str, - _subsystem: &'static str, + group: &'static str, future: futures::future::BoxFuture<'static, ()>, - ) { - // Default impl doesn't trace subsystem. - self.spawn(name, future); - } + ); } impl SpawnNamed for Box { - fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { - (**self).spawn_blocking(name, future) - } - - fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { - (**self).spawn(name, future) - } - fn spawn_blocking_with_subsystem( + fn spawn_blocking( &self, name: &'static str, - subsystem: &'static str, + group: &'static str, future: futures::future::BoxFuture<'static, ()>, ) { - (**self).spawn_blocking_with_subsystem(name, subsystem, future) + (**self).spawn_blocking(name, group, future) } - fn spawn_with_subsystem( + fn spawn( &self, name: &'static str, - subsystem: &'static str, + group: &'static str, future: futures::future::BoxFuture<'static, ()>, ) { - (**self).spawn_with_subsystem(name, subsystem, future) + (**self).spawn(name, group, future) } } @@ -258,31 +237,33 @@ impl SpawnNamed for Box { /// /// Essential tasks are special tasks that should take down the node when they end. #[dyn_clonable::clonable] -pub trait SpawnEssentialNamed: Clone + Send + Sync { +pub trait SpawnNamedSpawnNamedSpawnNamed: Clone + Send + Sync { /// Spawn the given blocking future. /// /// The given `name` is used to identify the future in tracing. fn spawn_essential_blocking( &self, name: &'static str, + group: &'static str, future: futures::future::BoxFuture<'static, ()>, ); /// Spawn the given non-blocking future. /// /// The given `name` is used to identify the future in tracing. - fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>); + fn spawn_essential(&self, name: &'static str, group: &'static str, future: futures::future::BoxFuture<'static, ()>); } impl SpawnEssentialNamed for Box { fn spawn_essential_blocking( &self, name: &'static str, + group: &'static str, future: futures::future::BoxFuture<'static, ()>, ) { - (**self).spawn_essential_blocking(name, future) + (**self).spawn_essential_blocking(name, group, future) } - fn spawn_essential(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) { - (**self).spawn_essential(name, future) + fn spawn_essential(&self, name: &'static str, group: &'static str, future: futures::future::BoxFuture<'static, ()>) { + (**self).spawn_essential(name, group, future) } } From 136b0bc7e9164c2f2e0c0c554bc5536a6ef091ca Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 8 Nov 2021 10:01:03 +0000 Subject: [PATCH 09/16] Update TaskManager tests Signed-off-by: Andrei Sandu --- client/service/src/task_manager/tests.rs | 52 ++++++++++++------------ 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 291d71ebaf03b..aa1c26f7a0feb 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -96,8 +96,8 @@ fn ensure_tasks_are_awaited_on_shutdown() { let task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -115,7 +115,7 @@ fn ensure_keep_alive_during_shutdown() { let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); task_manager.keep_alive(drop_tester.new_ref()); - spawn_handle.spawn("task1", run_background_task(())); + spawn_handle.spawn("task1", "", run_background_task(())); assert_eq!(drop_tester, 1); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -134,10 +134,12 @@ fn ensure_blocking_futures_are_awaited_on_shutdown() { let drop_tester = DropTester::new(); spawn_handle.spawn( "task1", + "", run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), ); spawn_handle.spawn( "task2", + "", run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), ); assert_eq!(drop_tester, 2); @@ -156,14 +158,14 @@ fn ensure_no_task_can_be_spawn_after_terminate() { let mut task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 2); task_manager.terminate(); - spawn_handle.spawn("task3", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task3", "", run_background_task(drop_tester.new_ref())); runtime.block_on(task_manager.clean_shutdown()); drop_tester.wait_on_drop(); } @@ -176,8 +178,8 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { let mut task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -197,13 +199,13 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() { let spawn_handle = task_manager.spawn_handle(); let spawn_essential_handle = task_manager.spawn_essential_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 2); - spawn_essential_handle.spawn("task3", async { panic!("task failed") }); + spawn_essential_handle.spawn("task3", "", async { panic!("task failed") }); runtime .block_on(task_manager.future()) .expect_err("future()'s Result must be Err"); @@ -226,10 +228,10 @@ fn ensure_children_tasks_ends_when_task_manager_terminated() { task_manager.add_child(child_2); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", "", run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", "", run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 4); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -255,15 +257,15 @@ fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() task_manager.add_child(child_2); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", "", run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", "", run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 4); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 4); - spawn_essential_handle_child_1.spawn("task5", async { panic!("task failed") }); + spawn_essential_handle_child_1.spawn("task5", "", async { panic!("task failed") }); runtime .block_on(task_manager.future()) .expect_err("future()'s Result must be Err"); @@ -286,15 +288,15 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { task_manager.add_child(child_2); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", "", run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", "", run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 4); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 4); - spawn_handle_child_1.spawn("task5", async { panic!("task failed") }); + spawn_handle_child_1.spawn("task5", "", async { panic!("task failed") }); runtime.block_on(async { let t1 = task_manager.future().fuse(); let t2 = tokio::time::sleep(Duration::from_secs(3)).fuse(); From 4d83a1a4a1660ceac62e14870a815d5683de011a Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 8 Nov 2021 10:02:03 +0000 Subject: [PATCH 10/16] Update test TaskExecutor Signed-off-by: Andrei Sandu --- primitives/core/src/testing.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/primitives/core/src/testing.rs b/primitives/core/src/testing.rs index a7fff0def83f2..a178b6261dcc3 100644 --- a/primitives/core/src/testing.rs +++ b/primitives/core/src/testing.rs @@ -152,10 +152,10 @@ impl Default for TaskExecutor { #[cfg(feature = "std")] impl crate::traits::SpawnNamed for TaskExecutor { - fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_blocking(&self, _: &'static str, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { self.0.spawn_ok(future); } - fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn(&self, _: &'static str, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { self.0.spawn_ok(future); } } @@ -165,11 +165,12 @@ impl crate::traits::SpawnEssentialNamed for TaskExecutor { fn spawn_essential_blocking( &self, _: &'static str, + _: &'static str, future: futures::future::BoxFuture<'static, ()>, ) { self.0.spawn_ok(future); } - fn spawn_essential(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_essential(&self, _: &'static str, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { self.0.spawn_ok(future); } } From b5a88bcf9827d76f313e895159a4e601045774d0 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 8 Nov 2021 10:05:22 +0000 Subject: [PATCH 11/16] Fix typo Signed-off-by: Andrei Sandu --- primitives/core/src/traits.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index 9871990561cc7..4642db6fc5b09 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -237,7 +237,7 @@ impl SpawnNamed for Box { /// /// Essential tasks are special tasks that should take down the node when they end. #[dyn_clonable::clonable] -pub trait SpawnNamedSpawnNamedSpawnNamed: Clone + Send + Sync { +pub trait SpawnEssentialNamed: Clone + Send + Sync { /// Spawn the given blocking future. /// /// The given `name` is used to identify the future in tracing. From f02b3afebf2220fca4610c25e1183d1474bb715b Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 8 Nov 2021 10:22:44 +0000 Subject: [PATCH 12/16] grunt work: fix spawn() calls Signed-off-by: Andrei Sandu --- bin/node-template/node/src/service.rs | 5 +++-- bin/node/cli/src/service.rs | 8 ++++---- bin/node/testing/src/bench.rs | 4 ++-- .../basic-authorship/src/basic_authorship.rs | 1 + .../common/src/import_queue/basic_queue.rs | 2 +- client/executor/src/native_executor.rs | 1 + client/offchain/src/lib.rs | 1 + client/rpc/src/lib.rs | 2 +- client/service/src/builder.rs | 20 ++++++++++++------- client/transaction-pool/src/api.rs | 1 + client/transaction-pool/src/lib.rs | 2 +- primitives/io/src/batch_verifier.rs | 2 ++ primitives/tasks/src/lib.rs | 1 + test-utils/test-runner/src/client.rs | 2 +- 14 files changed, 33 insertions(+), 19 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index d673a54a94882..92b1fa80f7b71 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -89,7 +89,7 @@ pub fn new_partial( let client = Arc::new(client); let telemetry = telemetry.map(|(worker, telemetry)| { - task_manager.spawn_handle().spawn("telemetry", worker.run()); + task_manager.spawn_handle().spawn("telemetry", "", worker.run()); telemetry }); @@ -289,7 +289,7 @@ pub fn new_full(mut config: Configuration) -> Result // the AURA authoring task is considered essential, i.e. if it // fails we take down the service with it. - task_manager.spawn_essential_handle().spawn_blocking("aura", aura); + task_manager.spawn_essential_handle().spawn_blocking("aura", "", aura); } // if the node isn't actively participating in consensus then it doesn't @@ -329,6 +329,7 @@ pub fn new_full(mut config: Configuration) -> Result // if it fails we take down the service with it. task_manager.spawn_essential_handle().spawn_blocking( "grandpa-voter", + "", sc_finality_grandpa::run_grandpa_voter(grandpa_config)?, ); } diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 2220614ebaf2a..07f02e8f9a2aa 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -170,7 +170,7 @@ pub fn new_partial( let client = Arc::new(client); let telemetry = telemetry.map(|(worker, telemetry)| { - task_manager.spawn_handle().spawn("telemetry", worker.run()); + task_manager.spawn_handle().spawn("telemetry", "", worker.run()); telemetry }); @@ -434,7 +434,7 @@ pub fn new_full_base( }; let babe = sc_consensus_babe::start_babe(babe_config)?; - task_manager.spawn_essential_handle().spawn_blocking("babe-proposer", babe); + task_manager.spawn_essential_handle().spawn_blocking("babe-proposer", "", babe); } // Spawn authority discovery module. @@ -463,7 +463,7 @@ pub fn new_full_base( task_manager .spawn_handle() - .spawn("authority-discovery-worker", authority_discovery_worker.run()); + .spawn("authority-discovery-worker", "", authority_discovery_worker.run()); } // if the node isn't actively participating in consensus then it doesn't @@ -503,7 +503,7 @@ pub fn new_full_base( // if it fails we take down the service with it. task_manager .spawn_essential_handle() - .spawn_blocking("grandpa-voter", grandpa::run_grandpa_voter(grandpa_config)?); + .spawn_blocking("grandpa-voter", "", grandpa::run_grandpa_voter(grandpa_config)?); } network_starter.start_network(); diff --git a/bin/node/testing/src/bench.rs b/bin/node/testing/src/bench.rs index cf0a463cc3e99..1cf1b7b1e17e0 100644 --- a/bin/node/testing/src/bench.rs +++ b/bin/node/testing/src/bench.rs @@ -243,11 +243,11 @@ impl TaskExecutor { } impl SpawnNamed for TaskExecutor { - fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn(&self, _: &'static str, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { self.pool.spawn_ok(future); } - fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_blocking(&self, _: &'static str, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { self.pool.spawn_ok(future); } } diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index 573601a9102c5..e80dff427e24c 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -270,6 +270,7 @@ where spawn_handle.spawn_blocking( "basic-authorship-proposer", + "", Box::pin(async move { // leave some time for evaluation and block finalization (33%) let deadline = (self.now)() + max_duration - max_duration / 3; diff --git a/client/consensus/common/src/import_queue/basic_queue.rs b/client/consensus/common/src/import_queue/basic_queue.rs index 9042c8798be4f..c382cce91c6f5 100644 --- a/client/consensus/common/src/import_queue/basic_queue.rs +++ b/client/consensus/common/src/import_queue/basic_queue.rs @@ -89,7 +89,7 @@ impl BasicQueue { metrics, ); - spawner.spawn_essential_blocking("basic-block-import-worker", future.boxed()); + spawner.spawn_essential_blocking("basic-block-import-worker", "", future.boxed()); Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData } } diff --git a/client/executor/src/native_executor.rs b/client/executor/src/native_executor.rs index d912fc0fd13c9..17eae29050570 100644 --- a/client/executor/src/native_executor.rs +++ b/client/executor/src/native_executor.rs @@ -399,6 +399,7 @@ impl RuntimeSpawn for RuntimeInstanceSpawn { let scheduler = self.scheduler.clone(); self.scheduler.spawn( "executor-extra-runtime-instance", + "", Box::pin(async move { let module = AssertUnwindSafe(module); diff --git a/client/offchain/src/lib.rs b/client/offchain/src/lib.rs index a77fd17a2c8b8..5c17067173706 100644 --- a/client/offchain/src/lib.rs +++ b/client/offchain/src/lib.rs @@ -226,6 +226,7 @@ pub async fn notification_future( if n.is_new_best { spawner.spawn( "offchain-on-block", + "", offchain .on_block_imported(&n.header, network_provider.clone(), is_validator) .boxed(), diff --git a/client/rpc/src/lib.rs b/client/rpc/src/lib.rs index 832585db4854c..a974fe12aea4b 100644 --- a/client/rpc/src/lib.rs +++ b/client/rpc/src/lib.rs @@ -54,7 +54,7 @@ impl SubscriptionTaskExecutor { impl Spawn for SubscriptionTaskExecutor { fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { - self.0.spawn("substrate-rpc-subscription", future.map(drop).boxed()); + self.0.spawn("substrate-rpc-subscription", "", future.map(drop).boxed()); Ok(()) } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index bcb05ce743701..591b28abb5d55 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -424,6 +424,7 @@ where if let Some(offchain) = offchain_workers.clone() { spawn_handle.spawn( "offchain-notifications", + "", sc_offchain::notification_future( config.role.is_authority(), client.clone(), @@ -505,11 +506,13 @@ where // Inform the tx pool about imported and finalized blocks. spawn_handle.spawn( "txpool-notifications", + "", sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()), ); spawn_handle.spawn( "on-transaction-imported", + "", transaction_notifications(transaction_pool.clone(), network.clone(), telemetry.clone()), ); @@ -520,6 +523,7 @@ where let metrics = MetricsService::with_prometheus(telemetry.clone(), ®istry, &config)?; spawn_handle.spawn( "prometheus-endpoint", + "", prometheus_endpoint::init_prometheus(port, registry).map(drop), ); @@ -531,6 +535,7 @@ where // Periodically updated metrics and telemetry updates. spawn_handle.spawn( "telemetry-periodic-send", + "", metrics_service.run(client.clone(), transaction_pool.clone(), network.clone()), ); @@ -567,6 +572,7 @@ where // Spawn informant task spawn_handle.spawn( "informant", + "", sc_informant::build( client.clone(), network.clone(), @@ -798,7 +804,7 @@ where config.network.default_peers_set.in_peers as usize + config.network.default_peers_set.out_peers as usize, ); - spawn_handle.spawn("block_request_handler", handler.run()); + spawn_handle.spawn("block_request_handler", "", handler.run()); protocol_config } }; @@ -815,7 +821,7 @@ where config.network.default_peers_set.in_peers as usize + config.network.default_peers_set.out_peers as usize, ); - spawn_handle.spawn("state_request_handler", handler.run()); + spawn_handle.spawn("state_request_handler", "", handler.run()); protocol_config } }; @@ -828,7 +834,7 @@ where // Allow both outgoing and incoming requests. let (handler, protocol_config) = WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone()); - spawn_handle.spawn("warp_sync_request_handler", handler.run()); + spawn_handle.spawn("warp_sync_request_handler", "", handler.run()); protocol_config }; (provider, protocol_config) @@ -842,7 +848,7 @@ where // Allow both outgoing and incoming requests. let (handler, protocol_config) = LightClientRequestHandler::new(&protocol_id, client.clone()); - spawn_handle.spawn("light_client_request_handler", handler.run()); + spawn_handle.spawn("light_client_request_handler", "", handler.run()); protocol_config } }; @@ -852,13 +858,13 @@ where executor: { let spawn_handle = Clone::clone(&spawn_handle); Some(Box::new(move |fut| { - spawn_handle.spawn("libp2p-node", fut); + spawn_handle.spawn("libp2p-node", "", fut); })) }, transactions_handler_executor: { let spawn_handle = Clone::clone(&spawn_handle); Box::new(move |fut| { - spawn_handle.spawn("network-transactions-handler", fut); + spawn_handle.spawn("network-transactions-handler", "", fut); }) }, network_config: config.network.clone(), @@ -920,7 +926,7 @@ where // issue, and ideally we would like to fix the network future to take as little time as // possible, but we also take the extra harm-prevention measure to execute the networking // future using `spawn_blocking`. - spawn_handle.spawn_blocking("network-worker", async move { + spawn_handle.spawn_blocking("network-worker", "", async move { if network_start_rx.await.is_err() { log::warn!( "The NetworkStart returned as part of `build_network` has been silently dropped" diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index a735c67d846ce..d1b85e2d6aa32 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -64,6 +64,7 @@ fn spawn_validation_pool_task( ) { spawner.spawn_essential_blocking( name, + "substrate_tx_pool", async move { loop { let task = receiver.lock().await.next().await; diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 8af73c3fe5b48..b65c0ec8c4ac6 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -217,7 +217,7 @@ where }; if let Some(background_task) = background_task { - spawner.spawn_essential("txpool-background", background_task); + spawner.spawn_essential("txpool-background", "substrate_tx_pool", background_task); } Self { diff --git a/primitives/io/src/batch_verifier.rs b/primitives/io/src/batch_verifier.rs index b6da1d85907bd..f66071d0dc11e 100644 --- a/primitives/io/src/batch_verifier.rs +++ b/primitives/io/src/batch_verifier.rs @@ -74,6 +74,7 @@ impl BatchVerifier { self.scheduler.spawn( name, + "", async move { if !f() { invalid_clone.store(true, AtomicOrdering::Relaxed); @@ -178,6 +179,7 @@ impl BatchVerifier { let (sender, receiver) = std::sync::mpsc::channel(); self.scheduler.spawn( "substrate_batch_verify_join", + "", async move { futures::future::join_all(pending).await; sender.send(()).expect( diff --git a/primitives/tasks/src/lib.rs b/primitives/tasks/src/lib.rs index e9c80ae5ff4c8..7fafb0022baf1 100644 --- a/primitives/tasks/src/lib.rs +++ b/primitives/tasks/src/lib.rs @@ -95,6 +95,7 @@ mod inner { let extra_scheduler = scheduler.clone(); scheduler.spawn( "parallel-runtime-spawn", + "substrate_runtime", Box::pin(async move { let result = match crate::new_async_externalities(extra_scheduler) { Ok(mut ext) => { diff --git a/test-utils/test-runner/src/client.rs b/test-utils/test-runner/src/client.rs index 58c4cf6503a93..58030990bcd62 100644 --- a/test-utils/test-runner/src/client.rs +++ b/test-utils/test-runner/src/client.rs @@ -235,7 +235,7 @@ where }); // spawn the authorship task as an essential task. - task_manager.spawn_essential_handle().spawn("manual-seal", authorship_future); + task_manager.spawn_essential_handle().spawn("manual-seal", "", authorship_future); network_starter.start_network(); let rpc_handler = rpc_handlers.io_handler(); From 355a2981d5b93c5bb1c90d4a1c494dbdeb974f82 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 8 Nov 2021 10:40:26 +0000 Subject: [PATCH 13/16] cargo fmt Signed-off-by: Andrei Sandu --- bin/node/cli/src/service.rs | 16 +++-- bin/node/testing/src/bench.rs | 14 ++++- client/consensus/babe/src/verification.rs | 4 +- client/service/src/task_manager/mod.rs | 74 +++++++++++++---------- primitives/core/src/testing.rs | 21 ++++++- primitives/core/src/traits.rs | 14 ++++- test-utils/test-runner/src/client.rs | 4 +- 7 files changed, 99 insertions(+), 48 deletions(-) diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 07f02e8f9a2aa..dbc471d19acf5 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -461,9 +461,11 @@ pub fn new_full_base( prometheus_registry.clone(), ); - task_manager - .spawn_handle() - .spawn("authority-discovery-worker", "", authority_discovery_worker.run()); + task_manager.spawn_handle().spawn( + "authority-discovery-worker", + "", + authority_discovery_worker.run(), + ); } // if the node isn't actively participating in consensus then it doesn't @@ -501,9 +503,11 @@ pub fn new_full_base( // the GRANDPA voter task is considered infallible, i.e. // if it fails we take down the service with it. - task_manager - .spawn_essential_handle() - .spawn_blocking("grandpa-voter", "", grandpa::run_grandpa_voter(grandpa_config)?); + task_manager.spawn_essential_handle().spawn_blocking( + "grandpa-voter", + "", + grandpa::run_grandpa_voter(grandpa_config)?, + ); } network_starter.start_network(); diff --git a/bin/node/testing/src/bench.rs b/bin/node/testing/src/bench.rs index 1cf1b7b1e17e0..131e85b60f9d8 100644 --- a/bin/node/testing/src/bench.rs +++ b/bin/node/testing/src/bench.rs @@ -243,11 +243,21 @@ impl TaskExecutor { } impl SpawnNamed for TaskExecutor { - fn spawn(&self, _: &'static str, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn( + &self, + _: &'static str, + _: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { self.pool.spawn_ok(future); } - fn spawn_blocking(&self, _: &'static str, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_blocking( + &self, + _: &'static str, + _: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { self.pool.spawn_ok(future); } } diff --git a/client/consensus/babe/src/verification.rs b/client/consensus/babe/src/verification.rs index af118312dd07c..1554fa6de31be 100644 --- a/client/consensus/babe/src/verification.rs +++ b/client/consensus/babe/src/verification.rs @@ -114,7 +114,7 @@ where ); check_secondary_plain_header::(pre_hash, secondary, sig, &epoch)?; - } + }, PreDigest::SecondaryVRF(secondary) if epoch.config.allowed_slots.is_secondary_vrf_slots_allowed() => { @@ -125,7 +125,7 @@ where ); check_secondary_vrf_header::(pre_hash, secondary, sig, &epoch)?; - } + }, _ => return Err(babe_err(Error::SecondarySlotAssignmentsDisabled)), } diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 2681142d760f3..ae16653964107 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -56,7 +56,12 @@ impl SpawnTaskHandle { /// /// In other words, it would be a bad idea for someone to do for example /// `spawn(format!("{:?}", some_public_key))`. - pub fn spawn(&self, name: &'static str, group: &'static str, task: impl Future + Send + 'static) { + pub fn spawn( + &self, + name: &'static str, + group: &'static str, + task: impl Future + Send + 'static, + ) { self.spawn_inner(name, group, task, TaskType::Async) } @@ -67,7 +72,7 @@ impl SpawnTaskHandle { group: &'static str, task: impl Future + Send + 'static, ) { - self.spawn_inner(name, group, task, TaskType::Blocking) + self.spawn_inner(name, group, task, TaskType::Blocking) } /// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`. @@ -80,7 +85,7 @@ impl SpawnTaskHandle { ) { if self.task_notifier.is_closed() { debug!("Attempt to spawn a new task has been prevented: {}", name); - return; + return } let on_exit = self.on_exit.clone(); @@ -91,20 +96,15 @@ impl SpawnTaskHandle { if let Some(metrics) = &self.metrics { metrics.tasks_spawned.with_label_values(&[name, group]).inc(); // We do a dummy increase in order for the task to show up in metrics. - metrics - .tasks_ended - .with_label_values(&[name, "finished", group]) - .inc_by(0); + metrics.tasks_ended.with_label_values(&[name, "finished", group]).inc_by(0); } let future = async move { if let Some(metrics) = metrics { // Add some wrappers around `task`. let task = { - let poll_duration = - metrics.poll_duration.with_label_values(&[name, group]); - let poll_start = - metrics.poll_start.with_label_values(&[name, group]); + let poll_duration = metrics.poll_duration.with_label_values(&[name, group]); + let poll_start = metrics.poll_start.with_label_values(&[name, group]); let inner = prometheus_future::with_poll_durations(poll_duration, poll_start, task); // The logic of `AssertUnwindSafe` here is ok considering that we throw @@ -115,25 +115,16 @@ impl SpawnTaskHandle { match select(on_exit, task).await { Either::Right((Err(payload), _)) => { - metrics - .tasks_ended - .with_label_values(&[name, "panic", group]) - .inc(); + metrics.tasks_ended.with_label_values(&[name, "panic", group]).inc(); panic::resume_unwind(payload) - } + }, Either::Right((Ok(()), _)) => { - metrics - .tasks_ended - .with_label_values(&[name, "finished", group]) - .inc(); - } + metrics.tasks_ended.with_label_values(&[name, "finished", group]).inc(); + }, Either::Left(((), _)) => { // The `on_exit` has triggered. - metrics - .tasks_ended - .with_label_values(&[name, "interrupted", group]) - .inc(); - } + metrics.tasks_ended.with_label_values(&[name, "interrupted", group]).inc(); + }, } } else { futures::pin_mut!(task); @@ -149,7 +140,7 @@ impl SpawnTaskHandle { self.tokio_handle.spawn_blocking(move || { handle.block_on(future); }) - } + }, }; let _ = self.task_notifier.unbounded_send(join_handle); @@ -157,14 +148,18 @@ impl SpawnTaskHandle { } impl sp_core::traits::SpawnNamed for SpawnTaskHandle { - fn spawn_blocking(&self, name: &'static str, group: &'static str,future: BoxFuture<'static, ()>) { + fn spawn_blocking( + &self, + name: &'static str, + group: &'static str, + future: BoxFuture<'static, ()>, + ) { self.spawn_inner(name, group, future, TaskType::Blocking) } fn spawn(&self, name: &'static str, group: &'static str, future: BoxFuture<'static, ()>) { self.spawn_inner(name, group, future, TaskType::Async) } - } /// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any @@ -189,7 +184,12 @@ impl SpawnEssentialTaskHandle { /// Spawns the given task with the given name. /// /// See also [`SpawnTaskHandle::spawn`]. - pub fn spawn(&self, name: &'static str, group: &'static str, task: impl Future + Send + 'static) { + pub fn spawn( + &self, + name: &'static str, + group: &'static str, + task: impl Future + Send + 'static, + ) { self.spawn_inner(name, group, task, TaskType::Async) } @@ -223,11 +223,21 @@ impl SpawnEssentialTaskHandle { } impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle { - fn spawn_essential_blocking(&self, name: &'static str, group: &'static str, future: BoxFuture<'static, ()>) { + fn spawn_essential_blocking( + &self, + name: &'static str, + group: &'static str, + future: BoxFuture<'static, ()>, + ) { self.spawn_blocking(name, group, future); } - fn spawn_essential(&self, name: &'static str, group: &'static str, future: BoxFuture<'static, ()>) { + fn spawn_essential( + &self, + name: &'static str, + group: &'static str, + future: BoxFuture<'static, ()>, + ) { self.spawn(name, group, future); } } diff --git a/primitives/core/src/testing.rs b/primitives/core/src/testing.rs index a178b6261dcc3..3152ebdf1241d 100644 --- a/primitives/core/src/testing.rs +++ b/primitives/core/src/testing.rs @@ -152,10 +152,20 @@ impl Default for TaskExecutor { #[cfg(feature = "std")] impl crate::traits::SpawnNamed for TaskExecutor { - fn spawn_blocking(&self, _: &'static str, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_blocking( + &self, + _: &'static str, + _: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { self.0.spawn_ok(future); } - fn spawn(&self, _: &'static str, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn( + &self, + _: &'static str, + _: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { self.0.spawn_ok(future); } } @@ -170,7 +180,12 @@ impl crate::traits::SpawnEssentialNamed for TaskExecutor { ) { self.0.spawn_ok(future); } - fn spawn_essential(&self, _: &'static str, _: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_essential( + &self, + _: &'static str, + _: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { self.0.spawn_ok(future); } } diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index 4642db6fc5b09..0832ebd56b6f5 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -250,7 +250,12 @@ pub trait SpawnEssentialNamed: Clone + Send + Sync { /// Spawn the given non-blocking future. /// /// The given `name` is used to identify the future in tracing. - fn spawn_essential(&self, name: &'static str, group: &'static str, future: futures::future::BoxFuture<'static, ()>); + fn spawn_essential( + &self, + name: &'static str, + group: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ); } impl SpawnEssentialNamed for Box { @@ -263,7 +268,12 @@ impl SpawnEssentialNamed for Box { (**self).spawn_essential_blocking(name, group, future) } - fn spawn_essential(&self, name: &'static str, group: &'static str, future: futures::future::BoxFuture<'static, ()>) { + fn spawn_essential( + &self, + name: &'static str, + group: &'static str, + future: futures::future::BoxFuture<'static, ()>, + ) { (**self).spawn_essential(name, group, future) } } diff --git a/test-utils/test-runner/src/client.rs b/test-utils/test-runner/src/client.rs index 58030990bcd62..7445c4a0d9221 100644 --- a/test-utils/test-runner/src/client.rs +++ b/test-utils/test-runner/src/client.rs @@ -235,7 +235,9 @@ where }); // spawn the authorship task as an essential task. - task_manager.spawn_essential_handle().spawn("manual-seal", "", authorship_future); + task_manager + .spawn_essential_handle() + .spawn("manual-seal", "", authorship_future); network_starter.start_network(); let rpc_handler = rpc_handlers.io_handler(); From 53d5c4be56e56dd4ad4b70e2398452032b03f378 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 8 Nov 2021 12:04:58 +0000 Subject: [PATCH 14/16] remove old code Signed-off-by: Andrei Sandu --- client/service/src/task_manager/mod.rs | 11 ----------- primitives/core/src/traits.rs | 16 ---------------- 2 files changed, 27 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index be4a0d3503d0f..e536a6aa948a8 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -90,8 +90,6 @@ impl SpawnTaskHandle { let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); - // Provide a default subsystem name. - let subsystem_name = subsystem.unwrap_or("substrate-unspecified"); // Note that we increase the started counter here and not within the future. This way, // we could properly visualize on Prometheus situations where the spawning doesn't work. @@ -162,15 +160,6 @@ impl sp_core::traits::SpawnNamed for SpawnTaskHandle { fn spawn(&self, name: &'static str, group: &'static str, future: BoxFuture<'static, ()>) { self.spawn_inner(name, group, future, TaskType::Async) } - - fn spawn_blocking_with_subsystem(&self, name: &'static str, subsystem: &'static str,future: BoxFuture<'static, ()>) { - self.spawn_inner(name, Some(subsystem), future, TaskType::Blocking) - } - - fn spawn_with_subsystem(&self, name: &'static str, subsystem: &'static str, future: BoxFuture<'static, ()>) { - self.spawn_inner(name, Some(subsystem), future, TaskType::Async) - } - } /// A wrapper over `SpawnTaskHandle` that will notify a receiver whenever any diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index f9a1f79276857..0832ebd56b6f5 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -231,22 +231,6 @@ impl SpawnNamed for Box { ) { (**self).spawn(name, group, future) } - fn spawn_blocking_with_subsystem( - &self, - name: &'static str, - subsystem: &'static str, - future: futures::future::BoxFuture<'static, ()>, - ) { - (**self).spawn_blocking_with_subsystem(name, subsystem, future) - } - fn spawn_with_subsystem( - &self, - name: &'static str, - subsystem: &'static str, - future: futures::future::BoxFuture<'static, ()>, - ) { - (**self).spawn_with_subsystem(name, subsystem, future) - } } /// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name. From aa260a67f3684a92c338983da740b18cb79c5359 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 8 Nov 2021 12:55:35 +0000 Subject: [PATCH 15/16] cargo fmt - the right version Signed-off-by: Andrei Sandu --- client/consensus/babe/src/verification.rs | 4 ++-- client/service/src/task_manager/mod.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/client/consensus/babe/src/verification.rs b/client/consensus/babe/src/verification.rs index 1554fa6de31be..af118312dd07c 100644 --- a/client/consensus/babe/src/verification.rs +++ b/client/consensus/babe/src/verification.rs @@ -114,7 +114,7 @@ where ); check_secondary_plain_header::(pre_hash, secondary, sig, &epoch)?; - }, + } PreDigest::SecondaryVRF(secondary) if epoch.config.allowed_slots.is_secondary_vrf_slots_allowed() => { @@ -125,7 +125,7 @@ where ); check_secondary_vrf_header::(pre_hash, secondary, sig, &epoch)?; - }, + } _ => return Err(babe_err(Error::SecondarySlotAssignmentsDisabled)), } diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index e536a6aa948a8..ae16653964107 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -85,7 +85,7 @@ impl SpawnTaskHandle { ) { if self.task_notifier.is_closed() { debug!("Attempt to spawn a new task has been prevented: {}", name); - return; + return } let on_exit = self.on_exit.clone(); @@ -117,7 +117,7 @@ impl SpawnTaskHandle { Either::Right((Err(payload), _)) => { metrics.tasks_ended.with_label_values(&[name, "panic", group]).inc(); panic::resume_unwind(payload) - } + }, Either::Right((Ok(()), _)) => { metrics.tasks_ended.with_label_values(&[name, "finished", group]).inc(); }, @@ -140,7 +140,7 @@ impl SpawnTaskHandle { self.tokio_handle.spawn_blocking(move || { handle.block_on(future); }) - } + }, }; let _ = self.task_notifier.unbounded_send(join_handle); From 62663c4e8aa12168c3a2a5ddf8e1ec328b40e2a3 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Wed, 10 Nov 2021 10:52:55 +0000 Subject: [PATCH 16/16] Implement review feedback - use Option group name in SpawnNamed methods - switch to kebab case - implement default group name - add group name to some tasks Signed-off-by: Andrei Sandu --- bin/node-template/node/src/service.rs | 8 +-- bin/node/cli/src/service.rs | 12 +++-- bin/node/testing/src/bench.rs | 4 +- .../basic-authorship/src/basic_authorship.rs | 2 +- .../common/src/import_queue/basic_queue.rs | 6 ++- client/executor/src/native_executor.rs | 2 +- client/offchain/src/lib.rs | 2 +- client/rpc/src/lib.rs | 3 +- client/service/src/builder.rs | 26 ++++----- client/service/src/lib.rs | 2 +- client/service/src/task_manager/mod.rs | 39 +++++++++----- client/service/src/task_manager/tests.rs | 54 +++++++++---------- client/transaction-pool/src/api.rs | 2 +- client/transaction-pool/src/lib.rs | 2 +- primitives/core/src/testing.rs | 12 ++--- primitives/core/src/traits.rs | 25 ++++----- primitives/io/src/batch_verifier.rs | 6 +-- primitives/tasks/src/lib.rs | 2 +- test-utils/test-runner/src/client.rs | 2 +- 19 files changed, 117 insertions(+), 94 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 92b1fa80f7b71..c71336c330882 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -89,7 +89,7 @@ pub fn new_partial( let client = Arc::new(client); let telemetry = telemetry.map(|(worker, telemetry)| { - task_manager.spawn_handle().spawn("telemetry", "", worker.run()); + task_manager.spawn_handle().spawn("telemetry", None, worker.run()); telemetry }); @@ -289,7 +289,9 @@ pub fn new_full(mut config: Configuration) -> Result // the AURA authoring task is considered essential, i.e. if it // fails we take down the service with it. - task_manager.spawn_essential_handle().spawn_blocking("aura", "", aura); + task_manager + .spawn_essential_handle() + .spawn_blocking("aura", Some("block-authoring"), aura); } // if the node isn't actively participating in consensus then it doesn't @@ -329,7 +331,7 @@ pub fn new_full(mut config: Configuration) -> Result // if it fails we take down the service with it. task_manager.spawn_essential_handle().spawn_blocking( "grandpa-voter", - "", + None, sc_finality_grandpa::run_grandpa_voter(grandpa_config)?, ); } diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index dbc471d19acf5..39307bafadd06 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -170,7 +170,7 @@ pub fn new_partial( let client = Arc::new(client); let telemetry = telemetry.map(|(worker, telemetry)| { - task_manager.spawn_handle().spawn("telemetry", "", worker.run()); + task_manager.spawn_handle().spawn("telemetry", None, worker.run()); telemetry }); @@ -434,7 +434,11 @@ pub fn new_full_base( }; let babe = sc_consensus_babe::start_babe(babe_config)?; - task_manager.spawn_essential_handle().spawn_blocking("babe-proposer", "", babe); + task_manager.spawn_essential_handle().spawn_blocking( + "babe-proposer", + Some("block-authoring"), + babe, + ); } // Spawn authority discovery module. @@ -463,7 +467,7 @@ pub fn new_full_base( task_manager.spawn_handle().spawn( "authority-discovery-worker", - "", + Some("networking"), authority_discovery_worker.run(), ); } @@ -505,7 +509,7 @@ pub fn new_full_base( // if it fails we take down the service with it. task_manager.spawn_essential_handle().spawn_blocking( "grandpa-voter", - "", + None, grandpa::run_grandpa_voter(grandpa_config)?, ); } diff --git a/bin/node/testing/src/bench.rs b/bin/node/testing/src/bench.rs index 131e85b60f9d8..5ee1ec998be4d 100644 --- a/bin/node/testing/src/bench.rs +++ b/bin/node/testing/src/bench.rs @@ -246,7 +246,7 @@ impl SpawnNamed for TaskExecutor { fn spawn( &self, _: &'static str, - _: &'static str, + _: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { self.pool.spawn_ok(future); @@ -255,7 +255,7 @@ impl SpawnNamed for TaskExecutor { fn spawn_blocking( &self, _: &'static str, - _: &'static str, + _: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { self.pool.spawn_ok(future); diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index e80dff427e24c..305c4d753c1ea 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -270,7 +270,7 @@ where spawn_handle.spawn_blocking( "basic-authorship-proposer", - "", + None, Box::pin(async move { // leave some time for evaluation and block finalization (33%) let deadline = (self.now)() + max_duration - max_duration / 3; diff --git a/client/consensus/common/src/import_queue/basic_queue.rs b/client/consensus/common/src/import_queue/basic_queue.rs index c382cce91c6f5..0461d7cf954cb 100644 --- a/client/consensus/common/src/import_queue/basic_queue.rs +++ b/client/consensus/common/src/import_queue/basic_queue.rs @@ -89,7 +89,11 @@ impl BasicQueue { metrics, ); - spawner.spawn_essential_blocking("basic-block-import-worker", "", future.boxed()); + spawner.spawn_essential_blocking( + "basic-block-import-worker", + Some("block-import"), + future.boxed(), + ); Self { justification_sender, block_import_sender, result_port, _phantom: PhantomData } } diff --git a/client/executor/src/native_executor.rs b/client/executor/src/native_executor.rs index 17eae29050570..62e76d559c0f2 100644 --- a/client/executor/src/native_executor.rs +++ b/client/executor/src/native_executor.rs @@ -399,7 +399,7 @@ impl RuntimeSpawn for RuntimeInstanceSpawn { let scheduler = self.scheduler.clone(); self.scheduler.spawn( "executor-extra-runtime-instance", - "", + None, Box::pin(async move { let module = AssertUnwindSafe(module); diff --git a/client/offchain/src/lib.rs b/client/offchain/src/lib.rs index 5c17067173706..2de24e10d927d 100644 --- a/client/offchain/src/lib.rs +++ b/client/offchain/src/lib.rs @@ -226,7 +226,7 @@ pub async fn notification_future( if n.is_new_best { spawner.spawn( "offchain-on-block", - "", + Some("offchain-worker"), offchain .on_block_imported(&n.header, network_provider.clone(), is_validator) .boxed(), diff --git a/client/rpc/src/lib.rs b/client/rpc/src/lib.rs index a974fe12aea4b..8f951632698fd 100644 --- a/client/rpc/src/lib.rs +++ b/client/rpc/src/lib.rs @@ -54,7 +54,8 @@ impl SubscriptionTaskExecutor { impl Spawn for SubscriptionTaskExecutor { fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { - self.0.spawn("substrate-rpc-subscription", "", future.map(drop).boxed()); + self.0 + .spawn("substrate-rpc-subscription", Some("rpc"), future.map(drop).boxed()); Ok(()) } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 591b28abb5d55..88ba6282b5f4e 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -424,7 +424,7 @@ where if let Some(offchain) = offchain_workers.clone() { spawn_handle.spawn( "offchain-notifications", - "", + Some("offchain-worker"), sc_offchain::notification_future( config.role.is_authority(), client.clone(), @@ -506,13 +506,13 @@ where // Inform the tx pool about imported and finalized blocks. spawn_handle.spawn( "txpool-notifications", - "", + Some("transaction-pool"), sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()), ); spawn_handle.spawn( "on-transaction-imported", - "", + Some("transaction-pool"), transaction_notifications(transaction_pool.clone(), network.clone(), telemetry.clone()), ); @@ -523,7 +523,7 @@ where let metrics = MetricsService::with_prometheus(telemetry.clone(), ®istry, &config)?; spawn_handle.spawn( "prometheus-endpoint", - "", + None, prometheus_endpoint::init_prometheus(port, registry).map(drop), ); @@ -535,7 +535,7 @@ where // Periodically updated metrics and telemetry updates. spawn_handle.spawn( "telemetry-periodic-send", - "", + None, metrics_service.run(client.clone(), transaction_pool.clone(), network.clone()), ); @@ -572,7 +572,7 @@ where // Spawn informant task spawn_handle.spawn( "informant", - "", + None, sc_informant::build( client.clone(), network.clone(), @@ -804,7 +804,7 @@ where config.network.default_peers_set.in_peers as usize + config.network.default_peers_set.out_peers as usize, ); - spawn_handle.spawn("block_request_handler", "", handler.run()); + spawn_handle.spawn("block-request-handler", Some("networking"), handler.run()); protocol_config } }; @@ -821,7 +821,7 @@ where config.network.default_peers_set.in_peers as usize + config.network.default_peers_set.out_peers as usize, ); - spawn_handle.spawn("state_request_handler", "", handler.run()); + spawn_handle.spawn("state-request-handler", Some("networking"), handler.run()); protocol_config } }; @@ -834,7 +834,7 @@ where // Allow both outgoing and incoming requests. let (handler, protocol_config) = WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone()); - spawn_handle.spawn("warp_sync_request_handler", "", handler.run()); + spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); protocol_config }; (provider, protocol_config) @@ -848,7 +848,7 @@ where // Allow both outgoing and incoming requests. let (handler, protocol_config) = LightClientRequestHandler::new(&protocol_id, client.clone()); - spawn_handle.spawn("light_client_request_handler", "", handler.run()); + spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run()); protocol_config } }; @@ -858,13 +858,13 @@ where executor: { let spawn_handle = Clone::clone(&spawn_handle); Some(Box::new(move |fut| { - spawn_handle.spawn("libp2p-node", "", fut); + spawn_handle.spawn("libp2p-node", Some("networking"), fut); })) }, transactions_handler_executor: { let spawn_handle = Clone::clone(&spawn_handle); Box::new(move |fut| { - spawn_handle.spawn("network-transactions-handler", "", fut); + spawn_handle.spawn("network-transactions-handler", Some("networking"), fut); }) }, network_config: config.network.clone(), @@ -926,7 +926,7 @@ where // issue, and ideally we would like to fix the network future to take as little time as // possible, but we also take the extra harm-prevention measure to execute the networking // future using `spawn_blocking`. - spawn_handle.spawn_blocking("network-worker", "", async move { + spawn_handle.spawn_blocking("network-worker", Some("networking"), async move { if network_start_rx.await.is_err() { log::warn!( "The NetworkStart returned as part of `build_network` has been silently dropped" diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index ce77be5a7c1d9..bd43d4c464ea0 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -75,7 +75,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions; pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool}; #[doc(hidden)] pub use std::{ops::Deref, result::Result, sync::Arc}; -pub use task_manager::{SpawnTaskHandle, TaskManager}; +pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME}; const DEFAULT_PROTOCOL_ID: &str = "sup"; diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index ae16653964107..64c00226073c7 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -38,6 +38,9 @@ mod prometheus_future; #[cfg(test)] mod tests; +/// Default task group name. +pub const DEFAULT_GROUP_NAME: &'static str = "default"; + /// An handle for spawning tasks in the service. #[derive(Clone)] pub struct SpawnTaskHandle { @@ -48,18 +51,19 @@ pub struct SpawnTaskHandle { } impl SpawnTaskHandle { - /// Spawns the given task with the given name. + /// Spawns the given task with the given name and an optional group name. + /// If group is not specified `DEFAULT_GROUP_NAME` will be used. /// - /// Note that the `name` is a `&'static str`. The reason for this choice is that statistics - /// about this task are getting reported to the Prometheus endpoint (if enabled), and that - /// therefore the set of possible task names must be bounded. + /// Note that the `name`/`group` is a `&'static str`. The reason for this choice is that + /// statistics about this task are getting reported to the Prometheus endpoint (if enabled), and + /// that therefore the set of possible task names must be bounded. /// /// In other words, it would be a bad idea for someone to do for example /// `spawn(format!("{:?}", some_public_key))`. pub fn spawn( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, task: impl Future + Send + 'static, ) { self.spawn_inner(name, group, task, TaskType::Async) @@ -69,7 +73,7 @@ impl SpawnTaskHandle { pub fn spawn_blocking( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, task: impl Future + Send + 'static, ) { self.spawn_inner(name, group, task, TaskType::Blocking) @@ -79,7 +83,7 @@ impl SpawnTaskHandle { fn spawn_inner( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, task: impl Future + Send + 'static, task_type: TaskType, ) { @@ -90,6 +94,8 @@ impl SpawnTaskHandle { let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); + // If no group is specified use default. + let group = group.unwrap_or(DEFAULT_GROUP_NAME); // Note that we increase the started counter here and not within the future. This way, // we could properly visualize on Prometheus situations where the spawning doesn't work. @@ -151,13 +157,18 @@ impl sp_core::traits::SpawnNamed for SpawnTaskHandle { fn spawn_blocking( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: BoxFuture<'static, ()>, ) { self.spawn_inner(name, group, future, TaskType::Blocking) } - fn spawn(&self, name: &'static str, group: &'static str, future: BoxFuture<'static, ()>) { + fn spawn( + &self, + name: &'static str, + group: Option<&'static str>, + future: BoxFuture<'static, ()>, + ) { self.spawn_inner(name, group, future, TaskType::Async) } } @@ -187,7 +198,7 @@ impl SpawnEssentialTaskHandle { pub fn spawn( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, task: impl Future + Send + 'static, ) { self.spawn_inner(name, group, task, TaskType::Async) @@ -199,7 +210,7 @@ impl SpawnEssentialTaskHandle { pub fn spawn_blocking( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, task: impl Future + Send + 'static, ) { self.spawn_inner(name, group, task, TaskType::Blocking) @@ -208,7 +219,7 @@ impl SpawnEssentialTaskHandle { fn spawn_inner( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, task: impl Future + Send + 'static, task_type: TaskType, ) { @@ -226,7 +237,7 @@ impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle { fn spawn_essential_blocking( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: BoxFuture<'static, ()>, ) { self.spawn_blocking(name, group, future); @@ -235,7 +246,7 @@ impl sp_core::traits::SpawnEssentialNamed for SpawnEssentialTaskHandle { fn spawn_essential( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: BoxFuture<'static, ()>, ) { self.spawn(name, group, future); diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index aa1c26f7a0feb..75092ff2ae62e 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -96,8 +96,8 @@ fn ensure_tasks_are_awaited_on_shutdown() { let task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -115,7 +115,7 @@ fn ensure_keep_alive_during_shutdown() { let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); task_manager.keep_alive(drop_tester.new_ref()); - spawn_handle.spawn("task1", "", run_background_task(())); + spawn_handle.spawn("task1", None, run_background_task(())); assert_eq!(drop_tester, 1); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -134,12 +134,12 @@ fn ensure_blocking_futures_are_awaited_on_shutdown() { let drop_tester = DropTester::new(); spawn_handle.spawn( "task1", - "", + None, run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), ); spawn_handle.spawn( "task2", - "", + None, run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), ); assert_eq!(drop_tester, 2); @@ -158,14 +158,14 @@ fn ensure_no_task_can_be_spawn_after_terminate() { let mut task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 2); task_manager.terminate(); - spawn_handle.spawn("task3", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task3", None, run_background_task(drop_tester.new_ref())); runtime.block_on(task_manager.clean_shutdown()); drop_tester.wait_on_drop(); } @@ -178,8 +178,8 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { let mut task_manager = new_task_manager(handle); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -199,13 +199,13 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_fails() { let spawn_handle = task_manager.spawn_handle(); let spawn_essential_handle = task_manager.spawn_essential_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 2); - spawn_essential_handle.spawn("task3", "", async { panic!("task failed") }); + spawn_essential_handle.spawn("task3", None, async { panic!("task failed") }); runtime .block_on(task_manager.future()) .expect_err("future()'s Result must be Err"); @@ -228,10 +228,10 @@ fn ensure_children_tasks_ends_when_task_manager_terminated() { task_manager.add_child(child_2); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", "", run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 4); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); @@ -257,15 +257,15 @@ fn ensure_task_manager_future_ends_with_error_when_childs_essential_task_fails() task_manager.add_child(child_2); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", "", run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 4); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 4); - spawn_essential_handle_child_1.spawn("task5", "", async { panic!("task failed") }); + spawn_essential_handle_child_1.spawn("task5", None, async { panic!("task failed") }); runtime .block_on(task_manager.future()) .expect_err("future()'s Result must be Err"); @@ -288,15 +288,15 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { task_manager.add_child(child_2); let spawn_handle = task_manager.spawn_handle(); let drop_tester = DropTester::new(); - spawn_handle.spawn("task1", "", run_background_task(drop_tester.new_ref())); - spawn_handle.spawn("task2", "", run_background_task(drop_tester.new_ref())); - spawn_handle_child_1.spawn("task3", "", run_background_task(drop_tester.new_ref())); - spawn_handle_child_2.spawn("task4", "", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task1", None, run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_1.spawn("task3", None, run_background_task(drop_tester.new_ref())); + spawn_handle_child_2.spawn("task4", None, run_background_task(drop_tester.new_ref())); assert_eq!(drop_tester, 4); // allow the tasks to even start runtime.block_on(async { tokio::time::sleep(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 4); - spawn_handle_child_1.spawn("task5", "", async { panic!("task failed") }); + spawn_handle_child_1.spawn("task5", None, async { panic!("task failed") }); runtime.block_on(async { let t1 = task_manager.future().fuse(); let t2 = tokio::time::sleep(Duration::from_secs(3)).fuse(); diff --git a/client/transaction-pool/src/api.rs b/client/transaction-pool/src/api.rs index d1b85e2d6aa32..8af0ea98f8100 100644 --- a/client/transaction-pool/src/api.rs +++ b/client/transaction-pool/src/api.rs @@ -64,7 +64,7 @@ fn spawn_validation_pool_task( ) { spawner.spawn_essential_blocking( name, - "substrate_tx_pool", + Some("transaction-pool"), async move { loop { let task = receiver.lock().await.next().await; diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index b65c0ec8c4ac6..3565cb52ad87b 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -217,7 +217,7 @@ where }; if let Some(background_task) = background_task { - spawner.spawn_essential("txpool-background", "substrate_tx_pool", background_task); + spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task); } Self { diff --git a/primitives/core/src/testing.rs b/primitives/core/src/testing.rs index 3152ebdf1241d..a40a37804c031 100644 --- a/primitives/core/src/testing.rs +++ b/primitives/core/src/testing.rs @@ -154,16 +154,16 @@ impl Default for TaskExecutor { impl crate::traits::SpawnNamed for TaskExecutor { fn spawn_blocking( &self, - _: &'static str, - _: &'static str, + _name: &'static str, + _group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { self.0.spawn_ok(future); } fn spawn( &self, - _: &'static str, - _: &'static str, + _name: &'static str, + _group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { self.0.spawn_ok(future); @@ -175,7 +175,7 @@ impl crate::traits::SpawnEssentialNamed for TaskExecutor { fn spawn_essential_blocking( &self, _: &'static str, - _: &'static str, + _: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { self.0.spawn_ok(future); @@ -183,7 +183,7 @@ impl crate::traits::SpawnEssentialNamed for TaskExecutor { fn spawn_essential( &self, _: &'static str, - _: &'static str, + _: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { self.0.spawn_ok(future); diff --git a/primitives/core/src/traits.rs b/primitives/core/src/traits.rs index 0832ebd56b6f5..e3d7d8e283e21 100644 --- a/primitives/core/src/traits.rs +++ b/primitives/core/src/traits.rs @@ -191,7 +191,7 @@ sp_externalities::decl_extension! { } /// Something that can spawn tasks (blocking and non-blocking) with an assigned name -/// and group. +/// and optional group. #[dyn_clonable::clonable] pub trait SpawnNamed: Clone + Send + Sync { /// Spawn the given blocking future. @@ -200,7 +200,7 @@ pub trait SpawnNamed: Clone + Send + Sync { fn spawn_blocking( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ); /// Spawn the given non-blocking future. @@ -209,7 +209,7 @@ pub trait SpawnNamed: Clone + Send + Sync { fn spawn( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ); } @@ -218,7 +218,7 @@ impl SpawnNamed for Box { fn spawn_blocking( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { (**self).spawn_blocking(name, group, future) @@ -226,34 +226,35 @@ impl SpawnNamed for Box { fn spawn( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { (**self).spawn(name, group, future) } } -/// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name. +/// Something that can spawn essential tasks (blocking and non-blocking) with an assigned name +/// and optional group. /// /// Essential tasks are special tasks that should take down the node when they end. #[dyn_clonable::clonable] pub trait SpawnEssentialNamed: Clone + Send + Sync { /// Spawn the given blocking future. /// - /// The given `name` is used to identify the future in tracing. + /// The given `group` and `name` is used to identify the future in tracing. fn spawn_essential_blocking( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ); /// Spawn the given non-blocking future. /// - /// The given `name` is used to identify the future in tracing. + /// The given `group` and `name` is used to identify the future in tracing. fn spawn_essential( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ); } @@ -262,7 +263,7 @@ impl SpawnEssentialNamed for Box { fn spawn_essential_blocking( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { (**self).spawn_essential_blocking(name, group, future) @@ -271,7 +272,7 @@ impl SpawnEssentialNamed for Box { fn spawn_essential( &self, name: &'static str, - group: &'static str, + group: Option<&'static str>, future: futures::future::BoxFuture<'static, ()>, ) { (**self).spawn_essential(name, group, future) diff --git a/primitives/io/src/batch_verifier.rs b/primitives/io/src/batch_verifier.rs index f66071d0dc11e..05c8a63694eb3 100644 --- a/primitives/io/src/batch_verifier.rs +++ b/primitives/io/src/batch_verifier.rs @@ -74,7 +74,7 @@ impl BatchVerifier { self.scheduler.spawn( name, - "", + None, async move { if !f() { invalid_clone.store(true, AtomicOrdering::Relaxed); @@ -178,8 +178,8 @@ impl BatchVerifier { if pending.len() > 0 { let (sender, receiver) = std::sync::mpsc::channel(); self.scheduler.spawn( - "substrate_batch_verify_join", - "", + "substrate-batch-verify-join", + None, async move { futures::future::join_all(pending).await; sender.send(()).expect( diff --git a/primitives/tasks/src/lib.rs b/primitives/tasks/src/lib.rs index 7fafb0022baf1..c874bb98e1ae6 100644 --- a/primitives/tasks/src/lib.rs +++ b/primitives/tasks/src/lib.rs @@ -95,7 +95,7 @@ mod inner { let extra_scheduler = scheduler.clone(); scheduler.spawn( "parallel-runtime-spawn", - "substrate_runtime", + Some("substrate-runtime"), Box::pin(async move { let result = match crate::new_async_externalities(extra_scheduler) { Ok(mut ext) => { diff --git a/test-utils/test-runner/src/client.rs b/test-utils/test-runner/src/client.rs index 7445c4a0d9221..27c04c40fe6fe 100644 --- a/test-utils/test-runner/src/client.rs +++ b/test-utils/test-runner/src/client.rs @@ -237,7 +237,7 @@ where // spawn the authorship task as an essential task. task_manager .spawn_essential_handle() - .spawn("manual-seal", "", authorship_future); + .spawn("manual-seal", None, authorship_future); network_starter.start_network(); let rpc_handler = rpc_handlers.io_handler();