diff --git a/Cargo.lock b/Cargo.lock
index 6269e0336025e..33129d420fa04 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7336,7 +7336,6 @@ version = "0.8.0-dev"
 dependencies = [
  "derive_more",
  "futures 0.3.4",
- "futures-diagnose",
  "futures-timer 3.0.2",
  "libp2p",
  "log",
diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs
index 16e5271cce943..51ebdbe1c1fae 100644
--- a/bin/node-template/node/src/service.rs
+++ b/bin/node-template/node/src/service.rs
@@ -41,7 +41,7 @@ macro_rules! new_full_start {
 			let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
 			Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
 		})?
-		.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
+		.with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| {
 			let select_chain = select_chain.take()
 				.ok_or_else(|| sc_service::Error::SelectChainRequired)?;
@@ -52,13 +52,16 @@ macro_rules! new_full_start {
 				grandpa_block_import.clone(), client.clone(),
 			);
 
-			let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>(
+			let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);
+
+			let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
 				sc_consensus_aura::slot_duration(&*client)?,
 				aura_block_import,
 				Some(Box::new(grandpa_block_import.clone())),
 				None,
 				client,
 				inherent_data_providers.clone(),
+				spawner,
 			)?;
 
 			import_setup = Some((grandpa_block_import, grandpa_link));
@@ -191,7 +194,7 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceError>
-			let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>(
+			let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);
+
+			let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair, _>(
 				sc_consensus_aura::slot_duration(&*client)?,
 				grandpa_block_import,
 				None,
 				Some(Box::new(finality_proof_import)),
 				client,
 				inherent_data_providers.clone(),
+				spawner,
 			)?;
 
 			Ok((import_queue, finality_proof_request_builder))
diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs
index c8b0e50c4ff27..377a45498f001 100644
--- a/bin/node/cli/src/service.rs
+++ b/bin/node/cli/src/service.rs
@@ -52,7 +52,7 @@ macro_rules! new_full_start {
 			let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
 			Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
 		})?
-		.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
+		.with_import_queue(|_config, client, mut select_chain, _transaction_pool, spawn_task_handle| {
 			let select_chain = select_chain.take()
 				.ok_or_else(|| sc_service::Error::SelectChainRequired)?;
 			let (grandpa_block_import, grandpa_link) = grandpa::block_import(
@@ -68,6 +68,8 @@ macro_rules! new_full_start {
 				client.clone(),
 			)?;
 
+			let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);
+
 			let import_queue = sc_consensus_babe::import_queue(
 				babe_link.clone(),
 				block_import.clone(),
@@ -75,6 +77,7 @@ macro_rules! new_full_start {
 				None,
 				client,
 				inherent_data_providers.clone(),
+				spawner,
 			)?;
 
 			import_setup = Some((block_import, grandpa_link, babe_link));
@@ -284,7 +287,7 @@ pub fn new_light(config: Configuration)
 			);
 			Ok(pool)
 		})?
-		.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| {
+		.with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool, spawn_task_handle| {
 			let fetch_checker = fetcher
 				.map(|fetcher| fetcher.checker().clone())
 				.ok_or_else(|| "Trying to start light import queue without active fetch checker")?;
@@ -305,6 +308,8 @@ pub fn new_light(config: Configuration)
 				client.clone(),
 			)?;
 
+			let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future);
+
 			let import_queue = sc_consensus_babe::import_queue(
 				babe_link,
 				babe_block_import,
@@ -312,6 +317,7 @@ pub fn new_light(config: Configuration)
 				Some(Box::new(finality_proof_import)),
 				client.clone(),
 				inherent_data_providers.clone(),
+				spawner,
 			)?;
 
 			Ok((import_queue, finality_proof_request_builder))
diff --git a/client/cli/src/config.rs b/client/cli/src/config.rs
index c73992c59cbdf..84c2338dcb86a 100644
--- a/client/cli/src/config.rs
+++ b/client/cli/src/config.rs
@@ -27,7 +27,7 @@ use names::{Generator, Name};
 use sc_service::config::{
 	WasmExecutionMethod, Role, OffchainWorkerConfig, Configuration, DatabaseConfig, ExtTransport, KeystoreConfig, NetworkConfiguration,
-	NodeKeyConfig, PrometheusConfig, PruningMode, TelemetryEndpoints, TransactionPoolOptions,
+	NodeKeyConfig, PrometheusConfig, PruningMode, TelemetryEndpoints, TransactionPoolOptions, TaskType,
 };
 use sc_client_api::execution_extensions::ExecutionStrategies;
 use sc_service::{ChainSpec, TracingReceiver};
@@ -385,7 +385,7 @@ pub trait CliConfiguration: Sized {
 	fn create_configuration<C: SubstrateCli>(
 		&self,
 		cli: &C,
-		task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
+		task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
 	) -> Result<Configuration> {
 		let is_dev = self.is_dev()?;
 		let chain_id = self.chain_id(is_dev)?;
diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs
index 25b71059b17e6..e723573bc2bdd 100644
--- a/client/cli/src/lib.rs
+++ b/client/cli/src/lib.rs
@@ -35,7 +35,7 @@ use log::info;
 pub use params::*;
 use regex::Regex;
 pub use runner::*;
-use sc_service::{ChainSpec, Configuration};
+use sc_service::{ChainSpec, Configuration, TaskType};
 use std::future::Future;
 use std::io::Write;
 use std::pin::Pin;
@@ -177,7 +177,7 @@ pub trait SubstrateCli: Sized {
 	fn create_configuration<T: CliConfiguration>(
 		&self,
 		command: &T,
-		task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
+		task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
 	) -> error::Result<Configuration> {
 		command.create_configuration(self, task_executor)
 	}
diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs
index 6ebe84f9c55e8..ed0d3ef7d7a96 100644
--- a/client/cli/src/runner.rs
+++ b/client/cli/src/runner.rs
@@ -23,7 +23,7 @@ use futures::pin_mut;
 use futures::select;
 use futures::{future, future::FutureExt, Future};
 use log::info;
-use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand};
+use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand, TaskType};
 use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
 use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
 use std::fmt::Debug;
@@ -116,13 +116,22 @@ impl<C: SubstrateCli> Runner<C> {
 	/// Create a new runtime with the command provided in argument
 	pub fn new<T: CliConfiguration>(cli: &C, command: &T) -> Result<Runner<C>> {
 		let tokio_runtime = build_runtime()?;
-
-		let task_executor = {
-			let runtime_handle = tokio_runtime.handle().clone();
-			Arc::new(move |fut| {
-				runtime_handle.spawn(fut);
-			})
-		};
+		let runtime_handle = tokio_runtime.handle().clone();
+
+		let task_executor = Arc::new(
+			move |fut, task_type| {
+				match task_type {
+					TaskType::Async => { runtime_handle.spawn(fut); }
+					TaskType::Blocking => {
+						runtime_handle.spawn(async move {
+							// `spawn_blocking` is looking for the current runtime, and as such has to be called
+							// from within `spawn`.
+							tokio::task::spawn_blocking(move || futures::executor::block_on(fut))
+						});
+					}
+				}
+			}
+		);
 
 		Ok(Runner {
 			config: command.create_configuration(cli, task_executor)?,
diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs
index daa181abba5a5..bbbcf6874477e 100644
--- a/client/consensus/aura/src/lib.rs
+++ b/client/consensus/aura/src/lib.rs
@@ -33,7 +33,7 @@ use std::{
 	collections::HashMap
 };
 
-use futures::prelude::*;
+use futures::{prelude::*, future::BoxFuture};
 use parking_lot::Mutex;
 use log::{debug, info, trace};
@@ -788,13 +788,14 @@ impl BlockImport for AuraBlockImport
-pub fn import_queue<B, I, C, P>(
+pub fn import_queue<B, I, C, P, F>(
 	slot_duration: SlotDuration,
 	block_import: I,
 	justification_import: Option<BoxJustificationImport<B>>,
 	finality_proof_import: Option<BoxFinalityProofImport<B>>,
 	client: Arc<C>,
 	inherent_data_providers: InherentDataProviders,
+	spawner: F,
 ) -> Result<AuraImportQueue<B, sp_api::TransactionFor<C, B>>, sp_consensus::Error> where
 	B: BlockT,
 	C::Api: BlockBuilderApi<B> + AuraApi<B, AuthorityId<P>> + ApiExt<B, Error = sp_blockchain::Error>,
@@ -804,6 +805,7 @@ pub fn import_queue(
 	P: Pair + Send + Sync + 'static,
 	P::Public: Clone + Eq + Send + Sync + Hash + Debug + Encode + Decode,
 	P::Signature: Encode + Decode,
+	F: Fn(BoxFuture<'static, ()>) -> (),
 {
 	register_aura_inherent_data_provider(&inherent_data_providers, slot_duration.get())?;
 	initialize_authorities_cache(&*client)?;
@@ -818,6 +820,7 @@ pub fn import_queue(
 		Box::new(block_import),
 		justification_import,
 		finality_proof_import,
+		spawner,
 	))
 }
diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs
index 028fe8e5e346f..4f8975d101d81 100644
--- a/client/consensus/babe/src/lib.rs
+++ b/client/consensus/babe/src/lib.rs
@@ -107,7 +107,7 @@ use sc_client_api::{
 };
 use sp_block_builder::BlockBuilder as BlockBuilderApi;
 
-use futures::prelude::*;
+use futures::{prelude::*, future::BoxFuture};
 use log::{debug, info, log, trace, warn};
 use sc_consensus_slots::{
 	SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation,
@@ -1272,6 +1272,7 @@ pub fn import_queue(
 	finality_proof_import: Option<BoxFinalityProofImport<Block>>,
 	client: Arc<Client>,
 	inherent_data_providers: InherentDataProviders,
+	spawner: impl Fn(BoxFuture<'static, ()>) -> (),
 ) -> ClientResult<BabeImportQueue<Block, sp_api::TransactionFor<Client, Block>>> where
 	Inner: BlockImport<Block, Transaction = sp_api::TransactionFor<Client, Block>> + Send + Sync + 'static,
@@ -1294,6 +1295,7 @@ pub fn import_queue(
 		Box::new(block_import),
 		justification_import,
 		finality_proof_import,
+		spawner,
 	))
 }
diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs
index 687d072aaa058..caddb47d3c8bf 100644
--- a/client/consensus/manual-seal/src/lib.rs
+++ b/client/consensus/manual-seal/src/lib.rs
@@ -17,7 +17,7 @@
 //! A manual sealing engine: the engine listens for rpc calls to seal blocks and create forks.
 //! This is suitable for a testing environment.
 
-use futures::prelude::*;
+use futures::{prelude::*, future::BoxFuture};
 use sp_consensus::{
 	Environment, Proposer, ForkChoiceStrategy, BlockImportParams, BlockOrigin, SelectChain,
 	import_queue::{BasicQueue, CacheKeyId, Verifier, BoxBlockImport},
@@ -67,7 +67,8 @@ impl Verifier for ManualSealVerifier {
 /// Instantiate the import queue for the manual seal consensus engine.
 pub fn import_queue<Block, Transaction>(
-	block_import: BoxBlockImport<Block, Transaction>
+	block_import: BoxBlockImport<Block, Transaction>,
+	spawner: impl Fn(BoxFuture<'static, ()>) -> ()
 ) -> BasicQueue<Block, Transaction>
 	where
 		Block: BlockT,
@@ -78,6 +79,7 @@ pub fn import_queue(
 		Box::new(block_import),
 		None,
 		None,
+		spawner,
 	)
 }
diff --git a/client/consensus/pow/src/lib.rs b/client/consensus/pow/src/lib.rs
index e0149b475b713..5d504d8ffc01d 100644
--- a/client/consensus/pow/src/lib.rs
+++ b/client/consensus/pow/src/lib.rs
@@ -54,6 +54,7 @@ use codec::{Encode, Decode};
 use sc_client_api;
 use log::*;
 use sp_timestamp::{InherentError as TIError, TimestampInherentData};
+use futures::future::BoxFuture;
 
 #[derive(derive_more::Display, Debug)]
 pub enum Error<B: BlockT> {
@@ -461,6 +462,7 @@ pub fn import_queue(
 	finality_proof_import: Option<BoxFinalityProofImport<B>>,
 	algorithm: Algorithm,
 	inherent_data_providers: InherentDataProviders,
+	spawner: impl Fn(BoxFuture<'static, ()>) -> (),
 ) -> Result<
 	PowImportQueue<B, Transaction>,
 	sp_consensus::Error
@@ -477,7 +479,8 @@ pub fn import_queue(
 		verifier,
 		block_import,
 		justification_import,
-		finality_proof_import
+		finality_proof_import,
+		spawner,
 	))
 }
diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs
index 84c393fbd2f42..6c38d1d87469b 100644
--- a/client/network/src/service/tests.rs
+++ b/client/network/src/service/tests.rs
@@ -80,11 +80,15 @@ fn build_test_full_node(config: config::NetworkConfiguration)
 		}
 	}
 
+	let threads_pool = futures::executor::ThreadPool::new().unwrap();
+	let spawner = |future| threads_pool.spawn_ok(future);
+
 	let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new(
 		PassThroughVerifier(false),
 		Box::new(client.clone()),
 		None,
 		None,
+		spawner,
 	));
 
 	let worker = NetworkWorker::new(config::Params {
diff --git a/client/network/test/src/block_import.rs b/client/network/test/src/block_import.rs
index aa6d275141fc7..5f152fa1322ee 100644
--- a/client/network/test/src/block_import.rs
+++ b/client/network/test/src/block_import.rs
@@ -84,7 +84,10 @@ fn async_import_queue_drops() {
 	// Perform this test multiple times since it exhibits non-deterministic behavior.
 	for _ in 0..100 {
 		let verifier = PassThroughVerifier(true);
-		let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None);
+
+		let threads_pool = futures::executor::ThreadPool::new().unwrap();
+		let spawner = |future| threads_pool.spawn_ok(future);
+		let queue = BasicQueue::new(verifier, Box::new(substrate_test_runtime_client::new()), None, None, spawner);
 		drop(queue);
 	}
 }
diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs
index 5be5de9078ef1..9c2c4e3bfdf68 100644
--- a/client/network/test/src/lib.rs
+++ b/client/network/test/src/lib.rs
@@ -606,11 +606,15 @@ pub trait TestNetFactory: Sized {
 		);
 
 		let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));
 
+		let threads_pool = futures::executor::ThreadPool::new().unwrap();
+		let spawner = |future| threads_pool.spawn_ok(future);
+
 		let import_queue = Box::new(BasicQueue::new(
 			verifier.clone(),
 			Box::new(block_import.clone()),
 			justification_import,
 			finality_proof_import,
+			spawner,
 		));
 
 		let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
@@ -683,11 +687,15 @@ pub trait TestNetFactory: Sized {
 		);
 
 		let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>)));
 
+		let threads_pool = futures::executor::ThreadPool::new().unwrap();
+		let spawner = |future| threads_pool.spawn_ok(future);
+
 		let import_queue = Box::new(BasicQueue::new(
 			verifier.clone(),
 			Box::new(block_import.clone()),
 			justification_import,
 			finality_proof_import,
+			spawner,
 		));
 
 		let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs
index a108561134425..74055bf976f3a 100644
--- a/client/service/src/builder.rs
+++ b/client/service/src/builder.rs
@@ -15,7 +15,7 @@
 // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
 
 use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm};
-use crate::{start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager};
+use crate::{start_rpc_servers, build_network_future, TransactionPoolAdapter, TaskManager, SpawnTaskHandle};
 use crate::status_sinks;
 use crate::config::{Configuration, KeystoreConfig, PrometheusConfig, OffchainWorkerConfig};
 use crate::metrics::MetricsService;
@@ -97,7 +97,7 @@ pub struct ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
 	remote_backend: Option<Arc<dyn RemoteBlockchain<TBl>>>,
 	marker: PhantomData<(TBl, TRtApi)>,
 	background_tasks: Vec<(&'static str, BackgroundTask)>,
-	block_announce_validator_builder: Option<Box<dyn FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send>>>,
+	block_announce_validator_builder: Option<Box<dyn FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send> + Send>>,
 }
 
 /// Full client type.
@@ -487,7 +487,7 @@ impl
 	/// Defines which import queue to use.
 	pub fn with_import_queue<UImpQu>(
 		self,
-		builder: impl FnOnce(&Configuration, Arc<TCl>, Option<TSc>, Arc<TExPool>)
+		builder: impl FnOnce(&Configuration, Arc<TCl>, Option<TSc>, Arc<TExPool>, &SpawnTaskHandle)
 			-> Result<UImpQu, Error>
 	) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, UImpQu, TFprb, TFpp, TExPool, TRpc, Backend>, Error>
@@ -496,7 +496,8 @@ impl
 			&self.config,
 			self.client.clone(),
 			self.select_chain.clone(),
-			self.transaction_pool.clone()
+			self.transaction_pool.clone(),
+			&self.task_manager.spawn_handle(),
 		)?;
 
 		Ok(ServiceBuilder {
@@ -588,6 +589,7 @@ impl
 			Option<TFchr>,
 			Option<TSc>,
 			Arc<TExPool>,
+			&SpawnTaskHandle,
 		) -> Result<(UImpQu, Option<UFprb>), Error>
 	) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, UImpQu, TFprb, TFpp, TExPool, TRpc, Backend>, Error>
@@ -598,7 +600,8 @@ impl
 			self.backend.clone(),
 			self.fetcher.clone(),
 			self.select_chain.clone(),
-			self.transaction_pool.clone()
+			self.transaction_pool.clone(),
+			&self.task_manager.spawn_handle(),
 		)?;
 
 		Ok(ServiceBuilder {
@@ -631,12 +634,13 @@ impl
 			Option<TFchr>,
 			Option<TSc>,
 			Arc<TExPool>,
+			&SpawnTaskHandle,
 		) -> Result<(UImpQu, UFprb), Error>
 	) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, UImpQu, UFprb, TFpp, TExPool, TRpc, Backend>, Error>
 	where TSc: Clone, TFchr: Clone {
-		self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx|
-			builder(cfg, cl, b, f, sc, tx)
+		self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx, tb|
+			builder(cfg, cl, b, f, sc, tx, tb)
 				.map(|(q, f)| (q, Some(f)))
 		)
 	}
@@ -718,7 +722,7 @@ impl
 	pub fn with_block_announce_validator(
 		self,
 		block_announce_validator_builder:
-			impl FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send> + 'static,
+			impl FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send> + Send + 'static,
 	) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>, Error>
 	where TSc: Clone, TFchr: Clone {
diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs
index 59dbc8302c2e5..6303376dc1601 100644
--- a/client/service/src/chain_ops.rs
+++ b/client/service/src/chain_ops.rs
@@ -56,12 +56,13 @@ impl<
 	TExecDisp: 'static + NativeExecutionDispatch,
 	TImpQu: 'static + ImportQueue<TBl>,
 	TRtApi: 'static + Send + Sync,
+	Self: Send + 'static,
 {
 	type Block = TBl;
 	type NativeDispatch = TExecDisp;
 
 	fn import_blocks(
-		self,
+		mut self,
 		input: impl Read + Seek + Send + 'static,
 		force: bool,
 	) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
@@ -98,9 +99,6 @@ impl<
 			}
 		}
 
-		let client = self.client;
-		let mut queue = self.import_queue;
-
 		let mut io_reader_input = IoReader(input);
 		let mut count = None::<u64>;
 		let mut read_block_count = 0;
@@ -114,6 +112,9 @@ impl<
 		// This makes it possible either to interleave other operations in-between the block imports,
 		// or to stop the operation completely.
 		let import = future::poll_fn(move |cx| {
+			let client = &self.client;
+			let queue = &mut self.import_queue;
+
 			// Start by reading the number of blocks if not done so already.
 			let count = match count {
 				Some(c) => c,
@@ -203,13 +204,12 @@ impl<
 		to: Option<NumberFor<TBl>>,
 		binary: bool
 	) -> Pin<Box<dyn Future<Output = Result<(), Error>>>> {
-		let client = self.client;
 		let mut block = from;
 
 		let last = match to {
 			Some(v) if v.is_zero() => One::one(),
 			Some(v) => v,
-			None => client.chain_info().best_number,
+			None => self.client.chain_info().best_number,
 		};
 
 		let mut wrote_header = false;
@@ -222,6 +222,8 @@ impl<
 		// This makes it possible either to interleave other operations in-between the block exports,
 		// or to stop the operation completely.
 		let export = future::poll_fn(move |cx| {
+			let client = &self.client;
+
 			if last < block {
 				return std::task::Poll::Ready(Err("Invalid block range specified".into()));
 			}
diff --git a/client/service/src/config.rs b/client/service/src/config.rs
index 4654158cb3616..e1adc02e9a43e 100644
--- a/client/service/src/config.rs
+++ b/client/service/src/config.rs
@@ -38,7 +38,7 @@ pub struct Configuration {
 	/// Node role.
 	pub role: Role,
 	/// How to spawn background tasks. Mandatory, otherwise creating a `Service` will error.
-	pub task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
+	pub task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
 	/// Extrinsic pool configuration.
 	pub transaction_pool: TransactionPoolOptions,
 	/// Network configuration.
@@ -102,6 +102,15 @@ pub struct Configuration {
 	pub announce_block: bool,
 }
 
+/// Type for tasks spawned by the executor.
+#[derive(PartialEq)]
+pub enum TaskType {
+	/// Regular non-blocking futures. Polling the task is expected to be a lightweight operation.
+	Async,
+	/// The task might perform a lot of expensive CPU operations and/or call `thread::sleep`.
+	Blocking,
+}
+
 /// Configuration of the client keystore.
 #[derive(Clone)]
 pub enum KeystoreConfig {
diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs
index 8e8c9e1e37bb3..07899a6fd3203 100644
--- a/client/service/src/lib.rs
+++ b/client/service/src/lib.rs
@@ -64,7 +64,7 @@ pub use self::builder::{
 	ServiceBuilder, ServiceBuilderCommand, TFullClient, TLightClient, TFullBackend, TLightBackend,
 	TFullCallExecutor, TLightCallExecutor,
 };
-pub use config::{Configuration, Role, PruningMode, DatabaseConfig};
+pub use config::{Configuration, Role, PruningMode, DatabaseConfig, TaskType};
 pub use sc_chain_spec::{
 	ChainSpec, GenericChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension,
 	NoExtension, ChainType,
diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager.rs
index e6847d0881120..8d19a0e2795da 100644
--- a/client/service/src/task_manager.rs
+++ b/client/service/src/task_manager.rs
@@ -31,11 +31,12 @@ use prometheus_endpoint::{
 	CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64
 };
 use sc_client_api::CloneableSpawn;
+use crate::config::TaskType;
 
 mod prometheus_future;
 
 /// Type alias for service task executor (usually runtime).
-pub type ServiceTaskExecutor = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>;
+pub type ServiceTaskExecutor = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>;
 
 /// An handle for spawning tasks in the service.
 #[derive(Clone)]
@@ -55,6 +56,16 @@ impl SpawnTaskHandle {
 	/// In other words, it would be a bad idea for someone to do for example
 	/// `spawn(format!("{:?}", some_public_key))`.
 	pub fn spawn(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
+		self.spawn_inner(name, task, TaskType::Async)
+	}
+
+	/// Spawns the blocking task with the given name. See also `spawn`.
+	pub fn spawn_blocking(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static) {
+		self.spawn_inner(name, task, TaskType::Blocking)
+	}
+
+	/// Helper function that implements the spawning logic. See `spawn` and `spawn_blocking`.
+	fn spawn_inner(&self, name: &'static str, task: impl Future<Output = ()> + Send + 'static, task_type: TaskType) {
 		let on_exit = self.on_exit.clone();
 		let metrics = self.metrics.clone();
@@ -80,7 +91,7 @@ impl SpawnTaskHandle {
 			}
 		};
 
-		(self.executor)(Box::pin(future));
+		(self.executor)(Box::pin(future), task_type);
 	}
 }
diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs
index fd451ffc8d55f..11398f80032d5 100644
--- a/client/service/test/src/lib.rs
+++ b/client/service/test/src/lib.rs
@@ -36,6 +36,7 @@ use sc_service::{
 	RuntimeGenesis,
 	Role,
 	Error,
+	TaskType,
 };
 use sp_blockchain::HeaderBackend;
 use sc_network::{multiaddr, Multiaddr};
@@ -139,7 +140,7 @@ fn node_config
 	role: Role,
-	task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
+	task_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>,
 	key_seed: Option<String>,
 	base_port: u16,
 	root: &TempDir,
@@ -255,7 +256,7 @@ impl TestNet where
 		for (key, authority) in authorities {
 			let task_executor = {
 				let executor = executor.clone();
-				Arc::new(move |fut: Pin<Box<dyn Future<Output = ()> + Send>>| executor.spawn(fut.unit_error().compat()))
+				Arc::new(move |fut: Pin<Box<dyn Future<Output = ()> + Send>>, _| executor.spawn(fut.unit_error().compat()))
 			};
 			let node_config = node_config(
 				self.nodes,
@@ -279,7 +280,7 @@ impl TestNet where
 		for full in full {
 			let task_executor = {
 				let executor = executor.clone();
-				Arc::new(move |fut: Pin<Box<dyn Future<Output = ()> + Send>>| executor.spawn(fut.unit_error().compat()))
+				Arc::new(move |fut: Pin<Box<dyn Future<Output = ()> + Send>>, _| executor.spawn(fut.unit_error().compat()))
 			};
 			let node_config = node_config(self.nodes, &self.chain_spec, Role::Full, task_executor, None, self.base_port, &temp);
 			let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
@@ -295,7 +296,7 @@ impl TestNet where
 		for light in light {
 			let task_executor = {
 				let executor = executor.clone();
-				Arc::new(move |fut: Pin<Box<dyn Future<Output = ()> + Send>>| executor.spawn(fut.unit_error().compat()))
+				Arc::new(move |fut: Pin<Box<dyn Future<Output = ()> + Send>>, _| executor.spawn(fut.unit_error().compat()))
 			};
 			let node_config = node_config(self.nodes, &self.chain_spec, Role::Light, task_executor, None, self.base_port, &temp);
 			let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
diff --git a/primitives/consensus/common/Cargo.toml b/primitives/consensus/common/Cargo.toml
index d34333c883149..7101dde2fc3aa 100644
--- a/primitives/consensus/common/Cargo.toml
+++ b/primitives/consensus/common/Cargo.toml
@@ -22,7 +22,6 @@ sp-inherents = { version = "2.0.0-dev", path = "../../inherents" }
 sp-state-machine = { version = "0.8.0-dev", path = "../../../primitives/state-machine" }
 futures = { version = "0.3.1", features = ["thread-pool"] }
 futures-timer = "3.0.1"
-futures-diagnose = "1.0"
 sp-std = { version = "2.0.0-dev", path = "../../std" }
 sp-version = { version = "2.0.0-dev", path = "../../version" }
 sp-runtime = { version = "2.0.0-dev", path = "../../runtime" }
diff --git a/primitives/consensus/common/src/import_queue/basic_queue.rs b/primitives/consensus/common/src/import_queue/basic_queue.rs
index 024e473849349..f39097ef3b366 100644
--- a/primitives/consensus/common/src/import_queue/basic_queue.rs
+++ b/primitives/consensus/common/src/import_queue/basic_queue.rs
@@ -14,10 +14,9 @@
 // You should have received a copy of the GNU General Public License
 // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
-use std::{mem, pin::Pin, time::Duration, marker::PhantomData, sync::Arc};
-use futures::{prelude::*, task::Context, task::Poll};
+use std::{mem, pin::Pin, time::Duration, marker::PhantomData};
+use futures::{prelude::*, task::Context, task::Poll, future::BoxFuture};
 use futures_timer::Delay;
-use parking_lot::{Mutex, Condvar};
 use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};
 use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
@@ -36,31 +35,14 @@ pub struct BasicQueue {
 	sender: TracingUnboundedSender<ToWorkerMsg<B>>,
 	/// Results coming from the worker task.
 	result_port: BufferedLinkReceiver<B>,
-	/// If it isn't possible to spawn the future in `future_to_spawn` (which is notably the case in
-	/// "no std" environment), we instead put it in `manual_poll`. It is then polled manually from
-	/// `poll_actions`.
-	manual_poll: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
-	/// A thread pool where the background worker is being run.
-	pool: Option<futures::executor::ThreadPool>,
-	pool_guard: Arc<(Mutex<usize>, Condvar)>,
 	_phantom: PhantomData,
 }
 
 impl Drop for BasicQueue {
 	fn drop(&mut self) {
-		self.pool = None;
 		// Flush the queue and close the receiver to terminate the future.
 		self.sender.close_channel();
 		self.result_port.close();
-
-		// Make sure all pool threads terminate.
-		// https://github.com/rust-lang/futures-rs/issues/1470
-		// https://github.com/rust-lang/futures-rs/issues/1349
-		let (ref mutex, ref condvar) = *self.pool_guard;
-		let mut lock = mutex.lock();
-		while *lock != 0 {
-			condvar.wait(&mut lock);
-		}
 	}
 }
 
@@ -74,6 +56,7 @@ impl BasicQueue {
 		block_import: BoxBlockImport<B, Transaction>,
 		justification_import: Option<BoxJustificationImport<B>>,
 		finality_proof_import: Option<BoxFinalityProofImport<B>>,
+		spawner: impl Fn(BoxFuture<'static, ()>) -> (),
 	) -> Self {
 		let (result_sender, result_port) = buffered_link::buffered_link();
 		let (future, worker_sender) = BlockImportWorker::new(
@@ -84,39 +67,11 @@ impl BasicQueue {
 			finality_proof_import,
 		);
 
-		let guard = Arc::new((Mutex::new(0usize), Condvar::new()));
-		let guard_start = guard.clone();
-		let guard_end = guard.clone();
-
-		let mut pool = futures::executor::ThreadPool::builder()
-			.name_prefix("import-queue-worker-")
-			.pool_size(1)
-			.after_start(move |_| *guard_start.0.lock() += 1)
-			.before_stop(move |_| {
-				let (ref mutex, ref condvar) = *guard_end;
-				let mut lock = mutex.lock();
-				*lock -= 1;
-				if *lock == 0 {
-					condvar.notify_one();
-				}
-			})
-			.create()
-			.ok();
-
-		let manual_poll;
-		if let Some(pool) = &mut pool {
-			pool.spawn_ok(futures_diagnose::diagnose("import-queue", future));
-			manual_poll = None;
-		} else {
-			manual_poll = Some(Box::pin(future) as Pin<Box<dyn Future<Output = ()> + Send>>);
-		}
+		spawner(future.boxed());
 
 		Self {
 			sender: worker_sender,
 			result_port,
-			manual_poll,
-			pool,
-			pool_guard: guard,
 			_phantom: PhantomData,
 		}
 	}
@@ -160,15 +115,6 @@ impl ImportQueue for BasicQueue
 	}
 
 	fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
-		// As a backup mechanism, if we failed to spawn the `future_to_spawn`, we instead poll
-		// manually here.
-		if let Some(manual_poll) = self.manual_poll.as_mut() {
-			match Future::poll(Pin::new(manual_poll), cx) {
-				Poll::Pending => {}
-				_ => self.manual_poll = None,
-			}
-		}
-
 		self.result_port.poll_actions(cx, link);
 	}
 }
diff --git a/utils/browser/src/lib.rs b/utils/browser/src/lib.rs
index 9be248883cb91..63ef450c26740 100644
--- a/utils/browser/src/lib.rs
+++ b/utils/browser/src/lib.rs
@@ -63,7 +63,7 @@ where
 		network,
 		telemetry_endpoints: chain_spec.telemetry_endpoints().clone(),
 		chain_spec: Box::new(chain_spec),
-		task_executor: Arc::new(move |fut| wasm_bindgen_futures::spawn_local(fut)),
+		task_executor: Arc::new(move |fut, _| wasm_bindgen_futures::spawn_local(fut)),
 		telemetry_external_transport: Some(transport),
 		role: Role::Light,
 		database: {
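For node authors who build a `Configuration` by hand, the visible API change is the second `TaskType` argument on the `task_executor` closure. A minimal sketch of a tokio-backed executor that honours the hint, following the same pattern as the `client/cli/src/runner.rs` hunk above (the helper name `make_task_executor` is illustrative and not part of this diff):

```rust
use std::{future::Future, pin::Pin, sync::Arc};
use sc_service::TaskType;

/// Builds a closure matching the new
/// `Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>` field.
fn make_task_executor(
    handle: tokio::runtime::Handle,
) -> Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync> {
    Arc::new(move |fut: Pin<Box<dyn Future<Output = ()> + Send>>, task_type: TaskType| {
        match task_type {
            // Lightweight futures go straight onto the async runtime.
            TaskType::Async => { handle.spawn(fut); },
            // Heavy or sleeping futures are driven on tokio's blocking pool so they cannot
            // starve the reactor threads; `spawn_blocking` needs the runtime context,
            // hence the outer `spawn`.
            TaskType::Blocking => {
                handle.spawn(async move {
                    tokio::task::spawn_blocking(move || futures::executor::block_on(fut));
                });
            },
        }
    })
}
```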
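Inside the service, the same split surfaces as `SpawnTaskHandle::spawn` versus the new `spawn_blocking`. The `with_import_queue*` closures now receive a `&SpawnTaskHandle` and are expected to hand the consensus crates a spawner; a sketch of that glue, assuming the handle is reachable as `sc_service::SpawnTaskHandle` and with `import_queue_spawner` as an illustrative helper name:

```rust
use futures::future::BoxFuture;
use sc_service::SpawnTaskHandle;

/// Adapts a cloned `SpawnTaskHandle` into the `impl Fn(BoxFuture<'static, ()>) -> ()`
/// spawner that `sc_consensus_aura::import_queue`, `sc_consensus_babe::import_queue`
/// and `BasicQueue::new` now take.
fn import_queue_spawner(handle: SpawnTaskHandle) -> impl Fn(BoxFuture<'static, ()>) {
    move |future: BoxFuture<'static, ()>| {
        // Block verification can be CPU heavy, so the worker is spawned as a blocking
        // task rather than on the async executor.
        handle.spawn_blocking("import-queue-worker", future)
    }
}
```

The node templates above express the same thing inline as `|future| spawn_task_handle.spawn_blocking("import-queue-worker", future)`.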
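Callers that construct a `BasicQueue` outside the service builder (the network tests above, for instance) now have to supply their own spawner, since the queue no longer owns a thread pool. A self-contained sketch of the pattern those tests use, with `start_worker` standing in for the `spawner(future.boxed())` call inside `BasicQueue::new`:

```rust
use futures::{executor::ThreadPool, future::BoxFuture, FutureExt};

/// Stand-in for the part of `BasicQueue::new` that hands its background worker
/// future to the caller-provided spawner.
fn start_worker(
    worker: impl std::future::Future<Output = ()> + Send + 'static,
    spawner: impl Fn(BoxFuture<'static, ()>) -> (),
) {
    spawner(worker.boxed());
}

fn main() {
    // The tests simply spin up a one-off futures `ThreadPool`; any executor with a
    // `spawn_ok`-style entry point can back the spawner.
    let pool = ThreadPool::new().expect("failed to create thread pool");
    let spawner = |future: BoxFuture<'static, ()>| pool.spawn_ok(future);

    start_worker(async { /* import-queue worker body would run here */ }, spawner);
}
```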
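Finally, executors without a dedicated blocking pool can simply accept and ignore the hint, which is what the service tests and the browser light client above do. A sketch assuming a futures `ThreadPool` as the single backing executor (the function name is illustrative):

```rust
use std::{future::Future, pin::Pin, sync::Arc};
use futures::executor::ThreadPool;
use sc_service::TaskType;

fn single_pool_executor(
    pool: ThreadPool,
) -> Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync> {
    // One pool for everything: the `TaskType` is accepted and discarded, which keeps
    // old single-executor setups compiling against the new two-argument signature.
    Arc::new(move |fut: Pin<Box<dyn Future<Output = ()> + Send>>, _task_type: TaskType| {
        pool.spawn_ok(fut)
    })
}
```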