From 15a451a038da05066c02c453f21e9ebb70f8747c Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Sat, 25 Apr 2020 10:53:00 +0200 Subject: [PATCH 1/6] Add spawner when importing queue --- service/src/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/service/src/lib.rs b/service/src/lib.rs index f27f19e1c721..a3d575c3b754 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -163,7 +163,7 @@ macro_rules! new_full_start { let pool = sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry); Ok(pool) })? - .with_import_queue(|config, client, mut select_chain, _| { + .with_import_queue(|config, client, mut select_chain, _, spawn_task_handle| { let select_chain = select_chain.take() .ok_or_else(|| service::Error::SelectChainRequired)?; @@ -189,6 +189,7 @@ macro_rules! new_full_start { client.clone(), )?; + let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); let import_queue = babe::import_queue( babe_link.clone(), block_import.clone(), @@ -196,6 +197,7 @@ macro_rules! new_full_start { None, client, inherent_data_providers.clone(), + spawner, )?; import_setup = Some((block_import, grandpa_link, babe_link)); @@ -726,7 +728,7 @@ where ); Ok(pool) })? - .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _| { + .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _, spawn_task_handle| { let fetch_checker = fetcher .map(|fetcher| fetcher.checker().clone()) .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; @@ -744,6 +746,7 @@ where client.clone(), )?; + let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); // FIXME: pruning task isn't started since light client doesn't do `AuthoritySetup`. let import_queue = babe::import_queue( babe_link, @@ -752,6 +755,7 @@ where Some(Box::new(finality_proof_import)), client, inherent_data_providers.clone(), + spawner, )?; Ok((import_queue, finality_proof_request_builder)) From 7e248b4ed3e23699e360968bd5786adaa0904146 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Mon, 27 Apr 2020 14:19:49 +0200 Subject: [PATCH 2/6] Add spawner to tests --- network/test/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/network/test/src/lib.rs b/network/test/src/lib.rs index 36bdda0ba4d6..a58e4e0c2397 100644 --- a/network/test/src/lib.rs +++ b/network/test/src/lib.rs @@ -561,11 +561,13 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); + let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), justification_import, finality_proof_import, + spawner, )); let listen_addr = build_multiaddr![Memory(rand::random::())]; From dc505f7bbace7e0b426864abf9f8273d1becd681 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Mon, 27 Apr 2020 14:19:49 +0200 Subject: [PATCH 3/6] Add spawner to tests --- network/test/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/network/test/src/lib.rs b/network/test/src/lib.rs index 36bdda0ba4d6..22f20e854819 100644 --- a/network/test/src/lib.rs +++ b/network/test/src/lib.rs @@ -561,11 +561,13 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); + let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), justification_import, finality_proof_import, + spawner, )); let listen_addr = build_multiaddr![Memory(rand::random::())]; @@ -636,11 +638,13 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); + let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), justification_import, finality_proof_import, + spawner, )); let listen_addr = build_multiaddr![Memory(rand::random::())]; From cfef11da658928ab74c9d1c1b14766308b0710ab Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Mon, 27 Apr 2020 17:50:22 +0200 Subject: [PATCH 4/6] Use threadpool for spawning import queue in tests --- network/test/src/lib.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/network/test/src/lib.rs b/network/test/src/lib.rs index 22f20e854819..82dd6532a6e3 100644 --- a/network/test/src/lib.rs +++ b/network/test/src/lib.rs @@ -561,7 +561,9 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); - let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let spawner = |future| threads_pool.spawn_ok(future); + let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), @@ -638,7 +640,9 @@ pub trait TestNetFactory: Sized { ); let verifier = VerifierAdapter::new(Arc::new(Mutex::new(Box::new(verifier) as Box<_>))); - let spawner = |future| spawn_task_handle.spawn_blocking("import-queue-worker", future); + let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let spawner = |future| threads_pool.spawn_ok(future); + let import_queue = Box::new(BasicQueue::new( verifier.clone(), Box::new(block_import.clone()), From 596e1e6a0ae072687347e09283c3bafdb6658a0c Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 29 Apr 2020 17:41:44 +0200 Subject: [PATCH 5/6] Add spawner closure to async_import_queue_drops test --- network/test/src/block_import.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/network/test/src/block_import.rs b/network/test/src/block_import.rs index 2b61898e41e2..eaab561dec87 100644 --- a/network/test/src/block_import.rs +++ b/network/test/src/block_import.rs @@ -91,7 +91,11 @@ fn async_import_queue_drops() { // Perform this test multiple times since it exhibits non-deterministic behavior. for _ in 0..100 { let verifier = PassThroughVerifier(true); - let queue = BasicQueue::new(verifier, Box::new(polkadot_test_runtime_client::new()), None, None); + + let threads_pool = futures::executor::ThreadPool::new().unwrap(); + let spawner = |future| threads_pool.spawn_ok(future); + + let queue = BasicQueue::new(verifier, Box::new(polkadot_test_runtime_client::new()), None, None, spawner); drop(queue); } } From ffea011ff6349e10708790556a67bd2bdf20a5c0 Mon Sep 17 00:00:00 2001 From: Scott Piriou Date: Wed, 29 Apr 2020 18:15:33 +0200 Subject: [PATCH 6/6] Update start_collator_is_send test --- collator/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collator/src/lib.rs b/collator/src/lib.rs index 300ebeb723f8..7e4d44c0d9f3 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -441,7 +441,7 @@ mod tests { fn check_send(_: T) {} let cli = Cli::from_iter(&["-dev"]); - let task_executor = Arc::new(|_| unimplemented!()); + let task_executor = Arc::new(|_, _| unimplemented!()); let config = cli.create_configuration(&cli.run.base, task_executor).unwrap(); check_send(start_collator(