From 037989c7804e019d6ecbafe490d0b327d3d6ac3a Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 15 Apr 2020 16:40:30 +0200 Subject: [PATCH 01/15] Move tasks from tx pool to Service for basic_queue --- Cargo.lock | 1 - bin/node-template/node/src/service.rs | 16 +++-- bin/node/cli/src/service.rs | 12 +++- client/consensus/aura/src/lib.rs | 7 ++- client/consensus/babe/src/lib.rs | 4 +- client/consensus/manual-seal/src/lib.rs | 6 +- client/consensus/pow/src/lib.rs | 5 +- client/network/test/src/lib.rs | 4 ++ client/service/src/builder.rs | 14 +++-- primitives/consensus/common/Cargo.toml | 1 - .../common/src/import_queue/basic_queue.rs | 62 ++----------------- 11 files changed, 55 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37aeca5892292..18915b122cb3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7281,7 +7281,6 @@ version = "0.8.0-dev" dependencies = [ "derive_more", "futures 0.3.4", - "futures-diagnose", "futures-timer 3.0.2", "libp2p", "log", diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 7c4a574f6be04..7dbb09c6a5aee 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -39,7 +39,7 @@ macro_rules! new_full_start { let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api))) })? - .with_import_queue(|_config, client, mut select_chain, _transaction_pool| { + .with_import_queue(|_config, client, mut select_chain, _transaction_pool, tasks_builder| { let select_chain = select_chain.take() .ok_or_else(|| sc_service::Error::SelectChainRequired)?; @@ -50,13 +50,17 @@ macro_rules! new_full_start { grandpa_block_import.clone(), client.clone(), ); - let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>( + let spawn_handle = tasks_builder.spawn_handle(); + let spawner = |future| spawn_handle.spawn("import-queue-worker", future); + + let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( sc_consensus_aura::slot_duration(&*client)?, aura_block_import, Some(Box::new(grandpa_block_import.clone())), None, client, inherent_data_providers.clone(), + spawner, )?; import_setup = Some((grandpa_block_import, grandpa_link)); @@ -193,7 +197,7 @@ pub fn new_light(config: Configuration) ); Ok(pool) })? - .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| { + .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, tasks_builder| { let fetch_checker = fetcher .map(|fetcher| fetcher.checker().clone()) .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; @@ -207,13 +211,17 @@ pub fn new_light(config: Configuration) let finality_proof_request_builder = finality_proof_import.create_finality_proof_request_builder(); - let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>( + let spawn_handle = tasks_builder.spawn_handle(); + let spawner = |future| spawn_handle.spawn("import-queue-worker", future); + + let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( sc_consensus_aura::slot_duration(&*client)?, grandpa_block_import, None, Some(Box::new(finality_proof_import)), client, inherent_data_providers.clone(), + spawner, )?; Ok((import_queue, finality_proof_request_builder)) diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 757022655dd83..0558ab049d714 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -60,7 +60,7 @@ macro_rules! new_full_start { let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api))) })? - .with_import_queue(|_config, client, mut select_chain, _transaction_pool| { + .with_import_queue(|_config, client, mut select_chain, _transaction_pool, tasks_builder| { let select_chain = select_chain.take() .ok_or_else(|| sc_service::Error::SelectChainRequired)?; let (grandpa_block_import, grandpa_link) = grandpa::block_import( @@ -76,6 +76,9 @@ macro_rules! new_full_start { client.clone(), )?; + let spawn_handle = tasks_builder.spawn_handle(); + let spawner = |future| spawn_handle.spawn("import-queue-worker", future); + let import_queue = sc_consensus_babe::import_queue( babe_link.clone(), block_import.clone(), @@ -83,6 +86,7 @@ macro_rules! new_full_start { None, client, inherent_data_providers.clone(), + spawner, )?; import_setup = Some((block_import, grandpa_link, babe_link)); @@ -304,7 +308,7 @@ pub fn new_light(config: Configuration) ); Ok(pool) })? - .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| { + .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, tasks_builder| { let fetch_checker = fetcher .map(|fetcher| fetcher.checker().clone()) .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; @@ -325,6 +329,9 @@ pub fn new_light(config: Configuration) client.clone(), )?; + let spawn_handle = tasks_builder.spawn_handle(); + let spawner = |future| spawn_handle.spawn("import-queue-worker", future); + let import_queue = sc_consensus_babe::import_queue( babe_link, babe_block_import, @@ -332,6 +339,7 @@ pub fn new_light(config: Configuration) Some(Box::new(finality_proof_import)), client.clone(), inherent_data_providers.clone(), + spawner, )?; Ok((import_queue, finality_proof_request_builder)) diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs index 56674546d372b..047a0b35dfad4 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -33,7 +33,7 @@ use std::{ collections::HashMap }; -use futures::prelude::*; +use futures::{prelude::*, future::BoxFuture}; use parking_lot::Mutex; use log::{debug, info, trace}; @@ -788,13 +788,14 @@ impl BlockImport for AuraBlockImport( +pub fn import_queue( slot_duration: SlotDuration, block_import: I, justification_import: Option>, finality_proof_import: Option>, client: Arc, inherent_data_providers: InherentDataProviders, + spawner: F, ) -> Result>, sp_consensus::Error> where B: BlockT, C::Api: BlockBuilderApi + AuraApi> + ApiExt, @@ -804,6 +805,7 @@ pub fn import_queue( P: Pair + Send + Sync + 'static, P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode, P::Signature: Encode + Decode, + F: Fn(BoxFuture<'static, ()>) -> (), { register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?; initialize_authorities_cache(&*client)?; @@ -818,6 +820,7 @@ pub fn import_queue( Box::new(block_import), justification_import, finality_proof_import, + spawner, )) } diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index 092bf8153b9bf..4e8ce8f360b2d 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -100,7 +100,7 @@ use sc_client_api::{ }; use sp_block_builder::BlockBuilder as BlockBuilderApi; -use futures::prelude::*; +use futures::{prelude::*, future::BoxFuture}; use log::{debug, info, log, trace, warn}; use sc_consensus_slots::{ SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation, @@ -1203,6 +1203,7 @@ pub fn import_queue( finality_proof_import: Option>, client: Arc, inherent_data_providers: InherentDataProviders, + spawner: impl Fn(BoxFuture<'static, ()>) -> (), ) -> ClientResult>> where Inner: BlockImport> + Send + Sync + 'static, @@ -1225,6 +1226,7 @@ pub fn import_queue( Box::new(block_import), justification_import, finality_proof_import, + spawner, )) } diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs index 8294ae049f658..253521bbb4564 100644 --- a/client/consensus/manual-seal/src/lib.rs +++ b/client/consensus/manual-seal/src/lib.rs @@ -17,7 +17,7 @@ //! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks. //! This is suitable for a testing environment. -use futures::prelude::*; +use futures::{prelude::*, future::BoxFuture}; use sp_consensus::{ Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain, import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport}, @@ -67,7 +67,8 @@ impl Verifier for ManualSealVerifier { /// Instantiate the import queue for the manual seal consensus engine. pub fn import_queue( - block_import: BoxBlockImport> + block_import: BoxBlockImport>, + spawner: impl Fn(BoxFuture<'static, ()>) -> () ) -> BasicQueue> where Block: BlockT, @@ -78,6 +79,7 @@ pub fn import_queue( Box::new(block_import), None, None, + spawner, ) } diff --git a/client/consensus/pow/src/lib.rs b/client/consensus/pow/src/lib.rs index de41ea7bd2356..8c2a17eccf08f 100644 --- a/client/consensus/pow/src/lib.rs +++ b/client/consensus/pow/src/lib.rs @@ -54,6 +54,7 @@ use codec::{Encode, Decode}; use sc_client_api; use log::*; use sp_timestamp::{InherentError as TIError, TimestampInherentData}; +use futures::future::BoxFuture; #[derive(derive_more::Display, Debug)] pub enum Error { @@ -459,6 +460,7 @@ pub fn import_queue( block_import: BoxBlockImport, algorithm: Algorithm, inherent_data_providers: InherentDataProviders, + spawner: impl Fn(BoxFuture<'static, ()>) -> (), ) -> Result< PowImportQueue, sp_consensus::Error @@ -475,7 +477,8 @@ pub fn import_queue( verifier, block_import, None, - None + None, + spawner )) } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index ae129871db95e..dba77bed91d8b 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -594,11 +594,13 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); + let spawner = |_fut| (); let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), justification_import, finality_proof_import, + spawner, )); let listen_addr = build_multiaddr![Memory(rand::random::())]; @@ -670,11 +672,13 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); + let spawner = |_fut| (); let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), justification_import, finality_proof_import, + spawner, )); let listen_addr = build_multiaddr![Memory(rand::random::())]; diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 4f370c1118fb2..60e7895b43a5d 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -427,7 +427,7 @@ impl /// Defines which import queue to use. pub fn with_import_queue( self, - builder: impl FnOnce(&Configuration, Arc, Option, Arc) + builder: impl FnOnce(&Configuration, Arc, Option, Arc, &TaskManagerBuilder) -> Result ) -> Result, Error> @@ -436,7 +436,8 @@ impl &self.config, self.client.clone(), self.select_chain.clone(), - self.transaction_pool.clone() + self.transaction_pool.clone(), + &self.tasks_builder, )?; Ok(ServiceBuilder { @@ -526,6 +527,7 @@ impl Option, Option, Arc, + &TaskManagerBuilder, ) -> Result<(UImpQu, Option), Error> ) -> Result, Error> @@ -536,7 +538,8 @@ impl self.backend.clone(), self.fetcher.clone(), self.select_chain.clone(), - self.transaction_pool.clone() + self.transaction_pool.clone(), + &self.tasks_builder, )?; Ok(ServiceBuilder { @@ -568,12 +571,13 @@ impl Option, Option, Arc, + &TaskManagerBuilder, ) -> Result<(UImpQu, UFprb), Error> ) -> Result, Error> where TSc: Clone, TFchr: Clone { - self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx| - builder(cfg, cl, b, f, sc, tx) + self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx, tb| + builder(cfg, cl, b, f, sc, tx, tb) .map(|(q, f)| (q, Some(f))) ) } diff --git a/primitives/consensus/common/Cargo.toml b/primitives/consensus/common/Cargo.toml index afb2b8dfb5e9b..c61f0dd31fac0 100644 --- a/primitives/consensus/common/Cargo.toml +++ b/primitives/consensus/common/Cargo.toml @@ -22,7 +22,6 @@ sp-inherents = { version = "2.0.0-dev", path = "../../inherents" } sp-state-machine = { version = "0.8.0-dev", path = "../../../primitives/state-machine" } futures = { version = "0.3.1", features = ["thread-pool"] } futures-timer = "3.0.1" -futures-diagnose = "1.0" sp-std = { version = "2.0.0-dev", path = "../../std" } sp-version = { version = "2.0.0-dev", path = "../../version" } sp-runtime = { version = "2.0.0-dev", path = "../../runtime" } diff --git a/primitives/consensus/common/src/import_queue/basic_queue.rs b/primitives/consensus/common/src/import_queue/basic_queue.rs index 024e473849349..f39097ef3b366 100644 --- a/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -14,10 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use std::{mem, pin::Pin, time::Duration, marker::PhantomData, sync::Arc}; -use futures::{prelude::*, task::Context, task::Poll}; +use std::{mem, pin::Pin, time::Duration, marker::PhantomData}; +use futures::{prelude::*, task::Context, task::Poll, future::BoxFuture}; use futures_timer::Delay; -use parking_lot::{Mutex, Condvar}; use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}}; use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded}; @@ -36,31 +35,14 @@ pub struct BasicQueue { sender: TracingUnboundedSender>, /// Results coming from the worker task. result_port: BufferedLinkReceiver, - /// If it isn't possible to spawn the future in `future_to_spawn` (which is notably the case in - /// "no std" environment), we instead put it in `manual_poll`. It is then polled manually from - /// `poll_actions`. - manual_poll: Option + Send>>>, - /// A thread pool where the background worker is being run. - pool: Option, - pool_guard: Arc<(Mutex, Condvar)>, _phantom: PhantomData, } impl Drop for BasicQueue { fn drop(&mut self) { - self.pool = None; // Flush the queue and close the receiver to terminate the future. self.sender.close_channel(); self.result_port.close(); - - // Make sure all pool threads terminate. - // https://github.com/rust-lang/futures-rs/issues/1470 - // https://github.com/rust-lang/futures-rs/issues/1349 - let (ref mutex, ref condvar) = *self.pool_guard; - let mut lock = mutex.lock(); - while *lock != 0 { - condvar.wait(&mut lock); - } } } @@ -74,6 +56,7 @@ impl BasicQueue { block_import: BoxBlockImport, justification_import: Option>, finality_proof_import: Option>, + spawner: impl Fn(BoxFuture<'static, ()>) -> (), ) -> Self { let (result_sender, result_port) = buffered_link::buffered_link(); let (future, worker_sender) = BlockImportWorker::new( @@ -84,39 +67,11 @@ impl BasicQueue { finality_proof_import, ); - let guard = Arc::new((Mutex::new(0usize), Condvar::new())); - let guard_start = guard.clone(); - let guard_end = guard.clone(); - - let mut pool = futures::executor::ThreadPool::builder() - .name_prefix("import-queue-worker-") - .pool_size(1) - .after_start(move |_| *guard_start.0.lock() += 1) - .before_stop(move |_| { - let (ref mutex, ref condvar) = *guard_end; - let mut lock = mutex.lock(); - *lock -= 1; - if *lock == 0 { - condvar.notify_one(); - } - }) - .create() - .ok(); - - let manual_poll; - if let Some(pool) = &mut pool { - pool.spawn_ok(futures_diagnose::diagnose("import-queue", future)); - manual_poll = None; - } else { - manual_poll = Some(Box::pin(future) as Pin>); - } + spawner(future.boxed()); Self { sender: worker_sender, result_port, - manual_poll, - pool, - pool_guard: guard, _phantom: PhantomData, } } @@ -160,15 +115,6 @@ impl ImportQueue for BasicQueue } fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link) { - // As a backup mechanism, if we failed to spawn the `future_to_spawn`, we instead poll - // manually here. - if let Some(manual_poll) = self.manual_poll.as_mut() { - match Future::poll(Pin::new(manual_poll), cx) { - Poll::Pending => {} - _ => self.manual_poll = None, - } - } - self.result_port.poll_actions(cx, link); } } From 894c74fe962a8011e0af71a34fc23f9ebf311a38 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 15 Apr 2020 17:15:38 +0200 Subject: [PATCH 02/15] Add spawner to tests --- client/network/src/service/tests.rs | 3 +++ client/network/test/src/block_import.rs | 1 + client/network/test/src/lib.rs | 1 + 3 files changed, 5 insertions(+) diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index a60b32efb414e..7d0b052947c95 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -80,11 +80,14 @@ fn build_test_full_node(config: config::NetworkConfiguration) } } + let spawner = |_fut| (); + let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new( PassThroughVerifier(false), Box::new(client.clone()), None, None, + spawner, )); let worker = NetworkWorker::new(config::Params { diff --git a/client/network/test/src/block_import.rs b/client/network/test/src/block_import.rs index aa6d275141fc7..9206d2c2be3ec 100644 --- a/client/network/test/src/block_import.rs +++ b/client/network/test/src/block_import.rs @@ -84,6 +84,7 @@ fn async_import_queue_drops() { // Perform this test multiple times since it exhibits non-deterministic behavior. for _ in 0..100 { let verifier = PassThroughVerifier(true); + let spawner = |_fut| (); // SCOTT let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None); drop(queue); } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index dba77bed91d8b..afd74ce7312cb 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -595,6 +595,7 @@ pub trait TestNetFactory: Sized { let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); let spawner = |_fut| (); + let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), From b8fae5db50c20447846bd8e64218c9797fca61d8 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 15 Apr 2020 19:24:18 +0200 Subject: [PATCH 03/15] Update tests --- client/network/src/service/tests.rs | 3 ++- client/network/test/src/block_import.rs | 6 ++++-- client/network/test/src/lib.rs | 7 +++++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index 7d0b052947c95..238cc228b9fe6 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -80,7 +80,8 @@ fn build_test_full_node(config: config::NetworkConfiguration) } } - let spawner = |_fut| (); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let spawner = |future| threads_pool.spawn_ok(future); let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new( PassThroughVerifier(false), diff --git a/client/network/test/src/block_import.rs b/client/network/test/src/block_import.rs index 9206d2c2be3ec..5f152fa1322ee 100644 --- a/client/network/test/src/block_import.rs +++ b/client/network/test/src/block_import.rs @@ -84,8 +84,10 @@ fn async_import_queue_drops() { // Perform this test multiple times since it exhibits non-deterministic behavior. for _ in 0..100 { let verifier = PassThroughVerifier(true); - let spawner = |_fut| (); // SCOTT - let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None); + + let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let spawner = |future| threads_pool.spawn_ok(future); + let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None, spawner); drop(queue); } } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index afd74ce7312cb..f2ceea8b39776 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -594,7 +594,8 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); - let spawner = |_fut| (); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let spawner = |future| threads_pool.spawn_ok(future); let import_queue = Box::new(BasicQueue::new( verifier.clone(), @@ -673,7 +674,9 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); - let spawner = |_fut| (); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let spawner = |future| threads_pool.spawn_ok(future); + let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), From 5ae26529d6bdbe4bd87cb313b2a91169c0bb28ec Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 22 Apr 2020 15:40:36 +0200 Subject: [PATCH 04/15] Change usage of TaskManagerBuilder to TaskManager --- bin/node-template/node/src/service.rs | 10 ++++------ bin/node/cli/src/service.rs | 10 ++++------ client/service/src/builder.rs | 10 +++++----- client/service/src/chain_ops.rs | 2 ++ client/service/src/task_manager.rs | 8 ++++---- 5 files changed, 19 insertions(+), 21 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 27af1e5daea88..49fc3293eee02 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -41,7 +41,7 @@ macro_rules! new_full_start { let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry)) })? - .with_import_queue(|_config, client, mut select_chain, _transaction_pool, tasks_builder| { + .with_import_queue(|_config, client, mut select_chain, _transaction_pool, task_manager| { let select_chain = select_chain.take() .ok_or_else(|| sc_service::Error::SelectChainRequired)?; @@ -52,8 +52,7 @@ macro_rules! new_full_start { grandpa_block_import.clone(), client.clone(), ); - let spawn_handle = tasks_builder.spawn_handle(); - let spawner = |future| spawn_handle.spawn("import-queue-worker", future); + let spawner = |future| task_manager.spawn("import-queue-worker", future); let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( sc_consensus_aura::slot_duration(&*client)?, @@ -199,7 +198,7 @@ pub fn new_light(config: Configuration) ); Ok(pool) })? - .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, tasks_builder| { + .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, task_manager| { let fetch_checker = fetcher .map(|fetcher| fetcher.checker().clone()) .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; @@ -213,8 +212,7 @@ pub fn new_light(config: Configuration) let finality_proof_request_builder = finality_proof_import.create_finality_proof_request_builder(); - let spawn_handle = tasks_builder.spawn_handle(); - let spawner = |future| spawn_handle.spawn("import-queue-worker", future); + let spawner = |future| task_manager.spawn("import-queue-worker", future); let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( sc_consensus_aura::slot_duration(&*client)?, diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 294331caf998d..9429ef9d95696 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -60,7 +60,7 @@ macro_rules! new_full_start { let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry)) })? - .with_import_queue(|_config, client, mut select_chain, _transaction_pool, tasks_builder| { + .with_import_queue(|_config, client, mut select_chain, _transaction_pool, task_manager| { let select_chain = select_chain.take() .ok_or_else(|| sc_service::Error::SelectChainRequired)?; let (grandpa_block_import, grandpa_link) = grandpa::block_import( @@ -76,8 +76,7 @@ macro_rules! new_full_start { client.clone(), )?; - let spawn_handle = tasks_builder.spawn_handle(); - let spawner = |future| spawn_handle.spawn("import-queue-worker", future); + let spawner = |future| task_manager.spawn("import-queue-worker", future); let import_queue = sc_consensus_babe::import_queue( babe_link.clone(), @@ -325,7 +324,7 @@ pub fn new_light(config: Configuration) ); Ok(pool) })? - .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, tasks_builder| { + .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, task_manager| { let fetch_checker = fetcher .map(|fetcher| fetcher.checker().clone()) .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; @@ -346,8 +345,7 @@ pub fn new_light(config: Configuration) client.clone(), )?; - let spawn_handle = tasks_builder.spawn_handle(); - let spawner = |future| spawn_handle.spawn("import-queue-worker", future); + let spawner = |future| task_manager.spawn("import-queue-worker", future); let import_queue = sc_consensus_babe::import_queue( babe_link, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 82b77064802b3..07f1d23c9ae91 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -429,7 +429,7 @@ impl /// Defines which import queue to use. pub fn with_import_queue( self, - builder: impl FnOnce(&Configuration, Arc, Option, Arc, &TaskManagerBuilder) + builder: impl FnOnce(&Configuration, Arc, Option, Arc, &TaskManager) -> Result ) -> Result, Error> @@ -439,7 +439,7 @@ impl self.client.clone(), self.select_chain.clone(), self.transaction_pool.clone(), - &self.tasks_builder, + &self.task_manager, )?; Ok(ServiceBuilder { @@ -529,7 +529,7 @@ impl Option, Option, Arc, - &TaskManagerBuilder, + &TaskManager, ) -> Result<(UImpQu, Option), Error> ) -> Result, Error> @@ -541,7 +541,7 @@ impl self.fetcher.clone(), self.select_chain.clone(), self.transaction_pool.clone(), - &self.tasks_builder, + &self.task_manager, )?; Ok(ServiceBuilder { @@ -573,7 +573,7 @@ impl Option, Option, Arc, - &TaskManagerBuilder, + &TaskManager, ) -> Result<(UImpQu, UFprb), Error> ) -> Result, Error> diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs index 12fae3224108a..eaeccdbe84d2b 100644 --- a/client/service/src/chain_ops.rs +++ b/client/service/src/chain_ops.rs @@ -114,6 +114,7 @@ impl< // This makes it possible either to interleave other operations in-between the block imports, // or to stop the operation completely. let import = future::poll_fn(move |cx| { + dbg!("import"); // Start by reading the number of blocks if not done so already. let count = match count { Some(c) => c, @@ -190,6 +191,7 @@ impl< } else { // Polling the import queue will re-schedule the task when ready. + dbg!("PENDING"); return std::task::Poll::Pending; } }); diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index e6847d0881120..a1a00590cd8da 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -123,7 +123,7 @@ pub struct TaskManager { impl TaskManager { /// If a Prometheus registry is passed, it will be used to report statistics about the /// service tasks. - pub(super) fn new( + pub fn new( executor: ServiceTaskExecutor, prometheus_registry: Option<&Registry> ) -> Result { @@ -142,11 +142,11 @@ impl TaskManager { /// Spawn background/async task, which will be aware on exit signal. /// /// See also the documentation of [`SpawnTaskHandler::spawn`]. - pub(super) fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { + pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { self.spawn_handle().spawn(name, task) } - pub(super) fn spawn_handle(&self) -> SpawnTaskHandle { + pub fn spawn_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { on_exit: self.on_exit.clone(), executor: self.executor.clone(), @@ -155,7 +155,7 @@ impl TaskManager { } /// Clone on exit signal. - pub(super) fn on_exit(&self) -> exit_future::Exit { + pub fn on_exit(&self) -> exit_future::Exit { self.on_exit.clone() } } From f51dc2bb9c47231778c43bcd31eba7c5feaa1589 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 22 Apr 2020 15:47:02 +0200 Subject: [PATCH 05/15] Renove dbg! --- client/service/src/chain_ops.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs index eaeccdbe84d2b..12fae3224108a 100644 --- a/client/service/src/chain_ops.rs +++ b/client/service/src/chain_ops.rs @@ -114,7 +114,6 @@ impl< // This makes it possible either to interleave other operations in-between the block imports, // or to stop the operation completely. let import = future::poll_fn(move |cx| { - dbg!("import"); // Start by reading the number of blocks if not done so already. let count = match count { Some(c) => c, @@ -191,7 +190,6 @@ impl< } else { // Polling the import queue will re-schedule the task when ready. - dbg!("PENDING"); return std::task::Poll::Pending; } }); From 225e6c17a2b92725cf69c2731841383cfc64631c Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 22 Apr 2020 17:34:30 +0200 Subject: [PATCH 06/15] Revert exposing TaskManager in public API --- bin/node-template/node/src/service.rs | 8 ++++---- bin/node/cli/src/service.rs | 8 ++++---- client/service/src/builder.rs | 12 ++++++------ client/service/src/task_manager.rs | 8 ++++---- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 49fc3293eee02..4539783c5e843 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -41,7 +41,7 @@ macro_rules! new_full_start { let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry)) })? - .with_import_queue(|_config, client, mut select_chain, _transaction_pool, task_manager| { + .with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| { let select_chain = select_chain.take() .ok_or_else(|| sc_service::Error::SelectChainRequired)?; @@ -52,7 +52,7 @@ macro_rules! new_full_start { grandpa_block_import.clone(), client.clone(), ); - let spawner = |future| task_manager.spawn("import-queue-worker", future); + let spawner = |future| spawn_task_handle.spawn("import-queue-worker", future); let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( sc_consensus_aura::slot_duration(&*client)?, @@ -198,7 +198,7 @@ pub fn new_light(config: Configuration) ); Ok(pool) })? - .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, task_manager| { + .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, spawn_task_handle| { let fetch_checker = fetcher .map(|fetcher| fetcher.checker().clone()) .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; @@ -212,7 +212,7 @@ pub fn new_light(config: Configuration) let finality_proof_request_builder = finality_proof_import.create_finality_proof_request_builder(); - let spawner = |future| task_manager.spawn("import-queue-worker", future); + let spawner = |future| spawn_task_handle.spawn("import-queue-worker", future); let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( sc_consensus_aura::slot_duration(&*client)?, diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 9429ef9d95696..5058601628b32 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -60,7 +60,7 @@ macro_rules! new_full_start { let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry)) })? - .with_import_queue(|_config, client, mut select_chain, _transaction_pool, task_manager| { + .with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| { let select_chain = select_chain.take() .ok_or_else(|| sc_service::Error::SelectChainRequired)?; let (grandpa_block_import, grandpa_link) = grandpa::block_import( @@ -76,7 +76,7 @@ macro_rules! new_full_start { client.clone(), )?; - let spawner = |future| task_manager.spawn("import-queue-worker", future); + let spawner = |future| spawn_task_handle.spawn("import-queue-worker", future); let import_queue = sc_consensus_babe::import_queue( babe_link.clone(), @@ -324,7 +324,7 @@ pub fn new_light(config: Configuration) ); Ok(pool) })? - .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, task_manager| { + .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, spawn_task_handle| { let fetch_checker = fetcher .map(|fetcher| fetcher.checker().clone()) .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; @@ -345,7 +345,7 @@ pub fn new_light(config: Configuration) client.clone(), )?; - let spawner = |future| task_manager.spawn("import-queue-worker", future); + let spawner = |future| spawn_task_handle.spawn("import-queue-worker", future); let import_queue = sc_consensus_babe::import_queue( babe_link, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 07f1d23c9ae91..468f870d55a81 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm}; -use crate::{start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager}; +use crate::{start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle}; use crate::status_sinks; use crate::config::{Configuration, KeystoreConfig, PrometheusConfig}; use crate::metrics::MetricsService; @@ -429,7 +429,7 @@ impl /// Defines which import queue to use. pub fn with_import_queue( self, - builder: impl FnOnce(&Configuration, Arc, Option, Arc, &TaskManager) + builder: impl FnOnce(&Configuration, Arc, Option, Arc, &SpawnTaskHandle) -> Result ) -> Result, Error> @@ -439,7 +439,7 @@ impl self.client.clone(), self.select_chain.clone(), self.transaction_pool.clone(), - &self.task_manager, + &self.task_manager.spawn_handle(), )?; Ok(ServiceBuilder { @@ -529,7 +529,7 @@ impl Option, Option, Arc, - &TaskManager, + &SpawnTaskHandle, ) -> Result<(UImpQu, Option), Error> ) -> Result, Error> @@ -541,7 +541,7 @@ impl self.fetcher.clone(), self.select_chain.clone(), self.transaction_pool.clone(), - &self.task_manager, + &self.task_manager.spawn_handle(), )?; Ok(ServiceBuilder { @@ -573,7 +573,7 @@ impl Option, Option, Arc, - &TaskManager, + &SpawnTaskHandle, ) -> Result<(UImpQu, UFprb), Error> ) -> Result, Error> diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index a1a00590cd8da..e6847d0881120 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -123,7 +123,7 @@ pub struct TaskManager { impl TaskManager { /// If a Prometheus registry is passed, it will be used to report statistics about the /// service tasks. - pub fn new( + pub(super) fn new( executor: ServiceTaskExecutor, prometheus_registry: Option<&Registry> ) -> Result { @@ -142,11 +142,11 @@ impl TaskManager { /// Spawn background/async task, which will be aware on exit signal. /// /// See also the documentation of [`SpawnTaskHandler::spawn`]. - pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { + pub(super) fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { self.spawn_handle().spawn(name, task) } - pub fn spawn_handle(&self) -> SpawnTaskHandle { + pub(super) fn spawn_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { on_exit: self.on_exit.clone(), executor: self.executor.clone(), @@ -155,7 +155,7 @@ impl TaskManager { } /// Clone on exit signal. - pub fn on_exit(&self) -> exit_future::Exit { + pub(super) fn on_exit(&self) -> exit_future::Exit { self.on_exit.clone() } } From 5b9975e550b1ba1f876ef84e8b606f97734921ba Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 22 Apr 2020 18:42:14 +0200 Subject: [PATCH 07/15] Move client and import queue inside the import closure --- client/service/src/chain_ops.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs index 12fae3224108a..845ca54840cb9 100644 --- a/client/service/src/chain_ops.rs +++ b/client/service/src/chain_ops.rs @@ -56,12 +56,13 @@ impl< TExecDisp: 'static + NativeExecutionDispatch, TImpQu: 'static + ImportQueue, TRtApi: 'static + Send + Sync, + Self: Send + 'static, { type Block = TBl; type NativeDispatch = TExecDisp; fn import_blocks( - self, + mut self, input: impl Read + Seek + Send + 'static, force: bool, ) -> Pin> + Send>> { @@ -98,22 +99,23 @@ impl< } } - let client = self.client; - let mut queue = self.import_queue; - let mut io_reader_input = IoReader(input); let mut count = None::; let mut read_block_count = 0; let mut link = WaitLink::new(); - // Importing blocks is implemented as a future, because we want the operation to be - // interruptible. - // - // Every time we read a block from the input or import a bunch of blocks from the import - // queue, the `Future` re-schedules itself and returns `Poll::Pending`. - // This makes it possible either to interleave other operations in-between the block imports, - // or to stop the operation completely. let import = future::poll_fn(move |cx| { + // Importing blocks is implemented as a future, because we want the operation to be + // interruptible. + // + // Every time we read a block from the input or import a bunch of blocks from the import + // queue, the `Future` re-schedules itself and returns `Poll::Pending`. + // This makes it possible either to interleave other operations in-between the block imports, + // or to stop the operation completely. + + let client = &self.client; + let queue = &mut self.import_queue; + // Start by reading the number of blocks if not done so already. let count = match count { Some(c) => c, From 58351b6a4262aa93db93bd91799c39d03c753557 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 22 Apr 2020 19:02:00 +0200 Subject: [PATCH 08/15] Move client inside the export closure --- client/service/src/chain_ops.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs index 845ca54840cb9..e97e16dc4948a 100644 --- a/client/service/src/chain_ops.rs +++ b/client/service/src/chain_ops.rs @@ -205,13 +205,12 @@ impl< to: Option>, binary: bool ) -> Pin>>> { - let client = self.client; let mut block = from; let last = match to { Some(v) if v.is_zero() => One::one(), Some(v) => v, - None => client.chain_info().best_number, + None => self.client.chain_info().best_number, }; let mut wrote_header = false; @@ -224,6 +223,8 @@ impl< // This makes it possible either to interleave other operations in-between the block exports, // or to stop the operation completely. let export = future::poll_fn(move |cx| { + let client = &self.client; + if last < block { return std::task::Poll::Ready(Err("Invalid block range specified".into())); } From 0edfdce4c8b759ebd9865a4943e73594a154594c Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 22 Apr 2020 19:38:25 +0200 Subject: [PATCH 09/15] Move comments outside of closure --- client/service/src/chain_ops.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs index e97e16dc4948a..aac6a977bdd76 100644 --- a/client/service/src/chain_ops.rs +++ b/client/service/src/chain_ops.rs @@ -104,15 +104,14 @@ impl< let mut read_block_count = 0; let mut link = WaitLink::new(); + // Importing blocks is implemented as a future, because we want the operation to be + // interruptible. + // + // Every time we read a block from the input or import a bunch of blocks from the import + // queue, the `Future` re-schedules itself and returns `Poll::Pending`. + // This makes it possible either to interleave other operations in-between the block imports, + // or to stop the operation completely. let import = future::poll_fn(move |cx| { - // Importing blocks is implemented as a future, because we want the operation to be - // interruptible. - // - // Every time we read a block from the input or import a bunch of blocks from the import - // queue, the `Future` re-schedules itself and returns `Poll::Pending`. - // This makes it possible either to interleave other operations in-between the block imports, - // or to stop the operation completely. - let client = &self.client; let queue = &mut self.import_queue; From 13e0f23a385cb8f2c0c37c11416a0a6634c610bb Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Thu, 23 Apr 2020 17:15:55 +0200 Subject: [PATCH 10/15] Add spawn_blocking method to SpawnTaskHandle; WIP --- bin/node-template/node/src/service.rs | 4 ++-- bin/node/cli/src/service.rs | 4 ++-- client/cli/src/config.rs | 4 ++-- client/cli/src/lib.rs | 4 ++-- client/cli/src/runner.rs | 13 ++++++++--- client/service/src/config.rs | 11 ++++++++- client/service/src/lib.rs | 2 +- client/service/src/task_manager.rs | 32 +++++++++++++++++++++++++-- client/service/test/src/lib.rs | 9 ++++---- utils/browser/src/lib.rs | 2 +- 10 files changed, 65 insertions(+), 20 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 4539783c5e843..d92d5b2ab1e2a 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -52,7 +52,7 @@ macro_rules! new_full_start { grandpa_block_import.clone(), client.clone(), ); - let spawner = |future| spawn_task_handle.spawn("import-queue-worker", future); + let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( sc_consensus_aura::slot_duration(&*client)?, @@ -212,7 +212,7 @@ pub fn new_light(config: Configuration) let finality_proof_request_builder = finality_proof_import.create_finality_proof_request_builder(); - let spawner = |future| spawn_task_handle.spawn("import-queue-worker", future); + let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>( sc_consensus_aura::slot_duration(&*client)?, diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 5058601628b32..1811e1b80939d 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -76,7 +76,7 @@ macro_rules! new_full_start { client.clone(), )?; - let spawner = |future| spawn_task_handle.spawn("import-queue-worker", future); + let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); let import_queue = sc_consensus_babe::import_queue( babe_link.clone(), @@ -345,7 +345,7 @@ pub fn new_light(config: Configuration) client.clone(), )?; - let spawner = |future| spawn_task_handle.spawn("import-queue-worker", future); + let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); let import_queue = sc_consensus_babe::import_queue( babe_link, diff --git a/client/cli/src/config.rs b/client/cli/src/config.rs index 9de2022a592f4..9639b37ccdf5b 100644 --- a/client/cli/src/config.rs +++ b/client/cli/src/config.rs @@ -27,7 +27,7 @@ use names::{Generator, Name}; use sc_service::config::{ Configuration, DatabaseConfig, ExecutionStrategies, ExtTransport, KeystoreConfig, NetworkConfiguration, NodeKeyConfig, PrometheusConfig, PruningMode, Role, TelemetryEndpoints, - TransactionPoolOptions, WasmExecutionMethod, + TransactionPoolOptions, WasmExecutionMethod, TaskType, }; use sc_service::{ChainSpec, TracingReceiver}; use std::future::Future; @@ -377,7 +377,7 @@ pub trait CliConfiguration: Sized { fn create_configuration( &self, cli: &C, - task_executor: Arc + Send>>) + Send + Sync>, + task_executor: Arc + Send>>, TaskType) + Send + Sync>, ) -> Result { let is_dev = self.is_dev()?; let chain_id = self.chain_id(is_dev)?; diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index 25b71059b17e6..e723573bc2bdd 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -35,7 +35,7 @@ use log::info; pub use params::*; use regex::Regex; pub use runner::*; -use sc_service::{ChainSpec, Configuration}; +use sc_service::{ChainSpec, Configuration, TaskType}; use std::future::Future; use std::io::Write; use std::pin::Pin; @@ -177,7 +177,7 @@ pub trait SubstrateCli: Sized { fn create_configuration( &self, command: &T, - task_executor: Arc + Send>>) + Send + Sync>, + task_executor: Arc + Send>>, TaskType) + Send + Sync>, ) -> error::Result { command.create_configuration(self, task_executor) } diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 6ebe84f9c55e8..47b8f395263b3 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -23,7 +23,7 @@ use futures::pin_mut; use futures::select; use futures::{future, future::FutureExt, Future}; use log::info; -use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand}; +use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand, TaskType}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL}; use std::fmt::Debug; @@ -119,8 +119,15 @@ impl Runner { let task_executor = { let runtime_handle = tokio_runtime.handle().clone(); - Arc::new(move |fut| { - runtime_handle.spawn(fut); + Arc::new(move |fut, task_type| { + match task_type { + TaskType::Async => { runtime_handle.spawn(fut); } + TaskType::Blocking => { + runtime_handle.spawn( async move { + tokio::task::spawn_blocking(|| fut) + }); + } + } }) }; diff --git a/client/service/src/config.rs b/client/service/src/config.rs index affb4aabfc632..33bbe3dc085a4 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -38,7 +38,7 @@ pub struct Configuration { /// Node role. pub role: Role, /// How to spawn background tasks. Mandatory, otherwise creating a `Service` will error. - pub task_executor: Arc + Send>>) + Send + Sync>, + pub task_executor: Arc + Send>>, TaskType) + Send + Sync>, /// Extrinsic pool configuration. pub transaction_pool: TransactionPoolOptions, /// Network configuration. @@ -102,6 +102,15 @@ pub struct Configuration { pub announce_block: bool, } +#[derive(PartialEq)] +/// Type for tasks spawned by the executor. +pub enum TaskType { + /// Async type. + Async, + /// Blocking type. + Blocking, +} + /// Configuration of the client keystore. #[derive(Clone)] pub enum KeystoreConfig { diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 56fee6b6d7423..d1d5d70dab5dc 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -59,7 +59,7 @@ pub use self::builder::{ ServiceBuilder, ServiceBuilderCommand, TFullClient, TLightClient, TFullBackend, TLightBackend, TFullCallExecutor, TLightCallExecutor, }; -pub use config::{Configuration, Role, PruningMode, DatabaseConfig}; +pub use config::{Configuration, Role, PruningMode, DatabaseConfig, TaskType}; pub use sc_chain_spec::{ ChainSpec, GenericChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension, NoExtension, ChainType, diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index e6847d0881120..c01334512ff6e 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -31,11 +31,12 @@ use prometheus_endpoint::{ CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64 }; use sc_client_api::CloneableSpawn; +use crate::config::TaskType; mod prometheus_future; /// Type alias for service task executor (usually runtime). -pub type ServiceTaskExecutor = Arc + Send>>) + Send + Sync>; +pub type ServiceTaskExecutor = Arc + Send>>, TaskType) + Send + Sync>; /// An handle for spawning tasks in the service. #[derive(Clone)] @@ -80,7 +81,34 @@ impl SpawnTaskHandle { } }; - (self.executor)(Box::pin(future)); + (self.executor)(Box::pin(future), TaskType::Async); + } + + /// Spawns the blocking task with the given name. See also `spawn`. + pub fn spawn_blocking(&self, name: &'static str, task: impl Future + Send + 'static) { + let on_exit = self.on_exit.clone(); + let metrics = self.metrics.clone(); + + if let Some(metrics) = &self.metrics { + metrics.tasks_spawned.with_label_values(&[name]).inc(); + metrics.tasks_ended.with_label_values(&[name]).inc_by(0); + } + + let future = async move { + if let Some(metrics) = metrics { + let poll_duration = metrics.poll_duration.with_label_values(&[name]); + let poll_start = metrics.poll_start.with_label_values(&[name]); + let task = prometheus_future::with_poll_durations(poll_duration, poll_start, task); + futures::pin_mut!(task); + let _ = select(on_exit, task).await; + metrics.tasks_ended.with_label_values(&[name]).inc(); + } else { + futures::pin_mut!(task); + let _ = select(on_exit, task).await; + } + }; + + (self.executor)(Box::pin(future), TaskType::Blocking); } } diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 9b9140dd8d3ff..e9eec333f6af2 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -36,6 +36,7 @@ use sc_service::{ RuntimeGenesis, Role, Error, + TaskType, }; use sc_network::{multiaddr, Multiaddr}; use sc_network::config::{NetworkConfiguration, TransportConfig}; @@ -135,7 +136,7 @@ fn node_config, role: Role, - task_executor: Arc + Send>>) + Send + Sync>, + task_executor: Arc + Send>>, TaskType) + Send + Sync>, key_seed: Option, base_port: u16, root: &TempDir, @@ -249,7 +250,7 @@ impl TestNet where for (key, authority) in authorities { let task_executor = { let executor = executor.clone(); - Arc::new(move |fut: Pin + Send>>| executor.spawn(fut.unit_error().compat())) + Arc::new(move |fut: Pin + Send>>, _| executor.spawn(fut.unit_error().compat())) }; let node_config = node_config( self.nodes, @@ -273,7 +274,7 @@ impl TestNet where for full in full { let task_executor = { let executor = executor.clone(); - Arc::new(move |fut: Pin + Send>>| executor.spawn(fut.unit_error().compat())) + Arc::new(move |fut: Pin + Send>>, _| executor.spawn(fut.unit_error().compat())) }; let node_config = node_config(self.nodes, &self.chain_spec, Role::Full, task_executor, None, self.base_port, &temp); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); @@ -289,7 +290,7 @@ impl TestNet where for light in light { let task_executor = { let executor = executor.clone(); - Arc::new(move |fut: Pin + Send>>| executor.spawn(fut.unit_error().compat())) + Arc::new(move |fut: Pin + Send>>, _| executor.spawn(fut.unit_error().compat())) }; let node_config = node_config(self.nodes, &self.chain_spec, Role::Light, task_executor, None, self.base_port, &temp); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); diff --git a/utils/browser/src/lib.rs b/utils/browser/src/lib.rs index 9be248883cb91..63ef450c26740 100644 --- a/utils/browser/src/lib.rs +++ b/utils/browser/src/lib.rs @@ -63,7 +63,7 @@ where network, telemetry_endpoints: chain_spec.telemetry_endpoints().clone(), chain_spec: Box::new(chain_spec), - task_executor: Arc::new(move |fut| wasm_bindgen_futures::spawn_local(fut)), + task_executor: Arc::new(move |fut, _| wasm_bindgen_futures::spawn_local(fut)), telemetry_external_transport: Some(transport), role: Role::Light, database: { From 7dc8aa96389869d4388349f16e0797763487c448 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Thu, 23 Apr 2020 17:56:41 +0200 Subject: [PATCH 11/15] Use futures::executor::block_on instead of || wrapping the future --- client/cli/src/runner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 47b8f395263b3..6ef1d04810d9f 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -116,15 +116,15 @@ impl Runner { /// Create a new runtime with the command provided in argument pub fn new(cli: &C, command: &T) -> Result> { let tokio_runtime = build_runtime()?; + let runtime_handle = tokio_runtime.handle().clone(); let task_executor = { - let runtime_handle = tokio_runtime.handle().clone(); Arc::new(move |fut, task_type| { match task_type { TaskType::Async => { runtime_handle.spawn(fut); } TaskType::Blocking => { runtime_handle.spawn( async move { - tokio::task::spawn_blocking(|| fut) + tokio::task::spawn_blocking(move || futures::executor::block_on(fut)) }); } } From 786811c094b97094c885228601e13580603e4230 Mon Sep 17 00:00:00 2001 From: pscott <30843220+pscott@users.noreply.github.com> Date: Fri, 24 Apr 2020 19:15:43 +0200 Subject: [PATCH 12/15] Add comments to code Co-Authored-By: Pierre Krieger --- client/cli/src/runner.rs | 2 ++ client/service/src/config.rs | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 6ef1d04810d9f..9f2d7d5d33e85 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -124,6 +124,8 @@ impl Runner { TaskType::Async => { runtime_handle.spawn(fut); } TaskType::Blocking => { runtime_handle.spawn( async move { + // `spawn_blocking` is looking for the current runtime, and as such has to be called + // from within `spawn`. tokio::task::spawn_blocking(move || futures::executor::block_on(fut)) }); } diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 33bbe3dc085a4..fc0eabc7eeb6a 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -102,12 +102,12 @@ pub struct Configuration { pub announce_block: bool, } -#[derive(PartialEq)] /// Type for tasks spawned by the executor. +#[derive(PartialEq)] pub enum TaskType { - /// Async type. + /// Regular non-blocking futures. Polling the task is expected to be a lightweight operation. Async, - /// Blocking type. + /// The task might perform a lot of expensive CPU operations and/or call `thread::sleep`. Blocking, } From b891cbea917c214621e5c796a81f95e826177fc5 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Fri, 24 Apr 2020 19:17:36 +0200 Subject: [PATCH 13/15] Remove useless block on task_executor declaration --- client/cli/src/runner.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 9f2d7d5d33e85..ed0d3ef7d7a96 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -118,8 +118,8 @@ impl Runner { let tokio_runtime = build_runtime()?; let runtime_handle = tokio_runtime.handle().clone(); - let task_executor = { - Arc::new(move |fut, task_type| { + let task_executor = Arc::new( + move |fut, task_type| { match task_type { TaskType::Async => { runtime_handle.spawn(fut); } TaskType::Blocking => { @@ -130,8 +130,8 @@ impl Runner { }); } } - }) - }; + } + ); Ok(Runner { config: command.create_configuration(cli, task_executor)?, From ddd28a15a2c04384f099b7f80aa92e528c20cc87 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Fri, 24 Apr 2020 19:24:32 +0200 Subject: [PATCH 14/15] Add spawn_inner private function --- client/service/src/task_manager.rs | 37 ++++++++---------------------- 1 file changed, 10 insertions(+), 27 deletions(-) diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs index c01334512ff6e..8d19a0e2795da 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager.rs @@ -56,41 +56,24 @@ impl SpawnTaskHandle { /// In other words, it would be a bad idea for someone to do for example /// `spawn(format!("{:?}", some_public_key))`. pub fn spawn(&self, name: &'static str, task: impl Future + Send + 'static) { - let on_exit = self.on_exit.clone(); - let metrics = self.metrics.clone(); - - // Note that we increase the started counter here and not within the future. This way, - // we could properly visualize on Prometheus situations where the spawning doesn't work. - if let Some(metrics) = &self.metrics { - metrics.tasks_spawned.with_label_values(&[name]).inc(); - // We do a dummy increase in order for the task to show up in metrics. - metrics.tasks_ended.with_label_values(&[name]).inc_by(0); - } - - let future = async move { - if let Some(metrics) = metrics { - let poll_duration = metrics.poll_duration.with_label_values(&[name]); - let poll_start = metrics.poll_start.with_label_values(&[name]); - let task = prometheus_future::with_poll_durations(poll_duration, poll_start, task); - futures::pin_mut!(task); - let _ = select(on_exit, task).await; - metrics.tasks_ended.with_label_values(&[name]).inc(); - } else { - futures::pin_mut!(task); - let _ = select(on_exit, task).await; - } - }; - - (self.executor)(Box::pin(future), TaskType::Async); + self.spawn_inner(name, task, TaskType::Async) } /// Spawns the blocking task with the given name. See also `spawn`. pub fn spawn_blocking(&self, name: &'static str, task: impl Future + Send + 'static) { + self.spawn_inner(name, task, TaskType::Blocking) + } + + /// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`. + fn spawn_inner(&self, name: &'static str, task: impl Future + Send + 'static, task_type: TaskType) { let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); + // Note that we increase the started counter here and not within the future. This way, + // we could properly visualize on Prometheus situations where the spawning doesn't work. if let Some(metrics) = &self.metrics { metrics.tasks_spawned.with_label_values(&[name]).inc(); + // We do a dummy increase in order for the task to show up in metrics. metrics.tasks_ended.with_label_values(&[name]).inc_by(0); } @@ -108,7 +91,7 @@ impl SpawnTaskHandle { } }; - (self.executor)(Box::pin(future), TaskType::Blocking); + (self.executor)(Box::pin(future), task_type); } } From 7b9f700af39c3b0427d9dd8f4f505523078e5591 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Tue, 28 Apr 2020 13:16:17 +0200 Subject: [PATCH 15/15] Add Send to block_announce_validator_builder --- client/service/src/builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 64834a0980393..205d8439a719b 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -96,7 +96,7 @@ pub struct ServiceBuilder>>, marker: PhantomData<(TBl, TRtApi)>, background_tasks: Vec<(&'static str, BackgroundTask)>, - block_announce_validator_builder: Option) -> Box + Send>>>, + block_announce_validator_builder: Option) -> Box + Send> + Send>>, } /// Full client type. @@ -675,7 +675,7 @@ impl pub fn with_block_announce_validator( self, block_announce_validator_builder: - impl FnOnce(Arc) -> Box + Send> + 'static, + impl FnOnce(Arc) -> Box + Send> + Send + 'static, ) -> Result, Error> where TSc: Clone, TFchr: Clone {