diff --git a/Cargo.lock b/Cargo.lock index 2ef17b3f2517..d374fa071f6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4260,7 +4260,6 @@ dependencies = [ "adder", "derive_more 0.99.3", "halt", - "lazy_static", "log 0.4.8", "parity-scale-codec", "parking_lot 0.10.0", diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs index d53ac1b6b887..cefc8f126f03 100644 --- a/network/src/protocol/tests.rs +++ b/network/src/protocol/tests.rs @@ -299,6 +299,7 @@ fn consensus_instances_cleaned_up() { signing_context, AvailabilityStore::new_in_memory(service.clone()), None, + None, )); pool.spawner().spawn_local(worker_task).unwrap(); @@ -329,6 +330,7 @@ fn collation_is_received_with_dropped_router() { signing_context, AvailabilityStore::new_in_memory(service.clone()), None, + None, )); pool.spawner().spawn_local(worker_task).unwrap(); @@ -550,6 +552,7 @@ fn fetches_pov_block_from_gossip() { signing_context, AvailabilityStore::new_in_memory(service.clone()), None, + None, )); let spawner = pool.spawner(); diff --git a/parachain/Cargo.toml b/parachain/Cargo.toml index 9651c29039c9..e8d34d080b77 100644 --- a/parachain/Cargo.toml +++ b/parachain/Cargo.toml @@ -16,7 +16,6 @@ sp-wasm-interface = { git = "https://github.com/paritytech/substrate", branch = sp-externalities = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true } sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true } sp-io = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true } -lazy_static = { version = "1.4.0", optional = true } parking_lot = { version = "0.10.0", optional = true } log = { version = "0.4.8", optional = true } @@ -38,7 +37,6 @@ std = [ "sp-std/std", "shared_memory", "sp-core/std", - "lazy_static", "parking_lot", "log", "sp-runtime-interface/std", diff --git a/parachain/src/wasm_executor/mod.rs b/parachain/src/wasm_executor/mod.rs index e98f9177050e..9b5369b0d867 100644 --- a/parachain/src/wasm_executor/mod.rs +++ b/parachain/src/wasm_executor/mod.rs @@ -28,7 +28,7 @@ use sp_core::traits::CallInWasm; use sp_wasm_interface::HostFunctions as _; #[cfg(not(target_os = "unknown"))] -pub use validation_host::{run_worker, EXECUTION_TIMEOUT_SEC}; +pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC}; mod validation_host; @@ -48,16 +48,31 @@ impl ParachainExt { } } +/// A stub validation-pool defined when compiling for WASM. +#[cfg(target_os = "unknown")] +#[derive(Clone)] +pub struct ValidationPool { + _inner: (), // private field means not publicly-instantiable +} + +#[cfg(target_os = "unknown")] +impl ValidationPool { + /// Create a new `ValidationPool`. + pub fn new() -> Self { + ValidationPool { _inner: () } + } +} + /// WASM code execution mode. /// /// > Note: When compiling for WASM, the `Remote` variants are not available. -pub enum ExecutionMode { +pub enum ExecutionMode<'a> { /// Execute in-process. The execution can not be interrupted or aborted. Local, /// Remote execution in a spawned process. - Remote, + Remote(&'a ValidationPool), /// Remote execution in a spawned test runner. - RemoteTest, + RemoteTest(&'a ValidationPool), } /// Error type for the wasm executor @@ -115,27 +130,27 @@ pub fn validate_candidate( validation_code: &[u8], params: ValidationParams, ext: E, - options: ExecutionMode, + options: ExecutionMode<'_>, ) -> Result { match options { ExecutionMode::Local => { validate_candidate_internal(validation_code, ¶ms.encode(), ext) }, #[cfg(not(target_os = "unknown"))] - ExecutionMode::Remote => { - validation_host::validate_candidate(validation_code, params, ext, false) + ExecutionMode::Remote(pool) => { + pool.validate_candidate(validation_code, params, ext, false) }, #[cfg(not(target_os = "unknown"))] - ExecutionMode::RemoteTest => { - validation_host::validate_candidate(validation_code, params, ext, true) + ExecutionMode::RemoteTest(pool) => { + pool.validate_candidate(validation_code, params, ext, true) }, #[cfg(target_os = "unknown")] - ExecutionMode::Remote => + ExecutionMode::Remote(pool) => Err(Error::System(Box::::from( "Remote validator not available".to_string() ) as Box<_>)), #[cfg(target_os = "unknown")] - ExecutionMode::RemoteTest => + ExecutionMode::RemoteTest(pool) => Err(Error::System(Box::::from( "Remote validator not available".to_string() ) as Box<_>)), diff --git a/parachain/src/wasm_executor/validation_host.rs b/parachain/src/wasm_executor/validation_host.rs index 4f74b3b25f41..06829a6a0b08 100644 --- a/parachain/src/wasm_executor/validation_host.rs +++ b/parachain/src/wasm_executor/validation_host.rs @@ -33,8 +33,6 @@ const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"]; const WORKER_ARG: &'static str = "validation-worker"; const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; -const NUM_HOSTS: usize = 8; - /// Execution timeout in seconds; #[cfg(debug_assertions)] pub const EXECUTION_TIMEOUT_SEC: u64 = 30; @@ -69,8 +67,42 @@ enum Event { WorkerReady = 2, } -lazy_static::lazy_static! { - static ref HOSTS: [Mutex; NUM_HOSTS] = Default::default(); +/// A pool of hosts. +#[derive(Clone)] +pub struct ValidationPool { + hosts: Arc>>, +} + +const DEFAULT_NUM_HOSTS: usize = 8; + +impl ValidationPool { + /// Creates a validation pool with the default configuration. + pub fn new() -> ValidationPool { + ValidationPool { + hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()), + } + } + + /// Validate a candidate under the given validation code using the next + /// free validation host. + /// + /// This will fail if the validation code is not a proper parachain validation module. + pub fn validate_candidate( + &self, + validation_code: &[u8], + params: ValidationParams, + externalities: E, + test_mode: bool, + ) -> Result { + for host in self.hosts.iter() { + if let Some(mut host) = host.try_lock() { + return host.validate_candidate(validation_code, params, externalities, test_mode); + } + } + + // all workers are busy, just wait for the first one + self.hosts[0].lock().validate_candidate(validation_code, params, externalities, test_mode) + } } /// Validation worker process entry point. Runs a loop waiting for candidates to validate @@ -184,25 +216,6 @@ struct ValidationHost { id: u32, } -/// Validate a candidate under the given validation code. -/// -/// This will fail if the validation code is not a proper parachain validation module. -pub fn validate_candidate( - validation_code: &[u8], - params: ValidationParams, - externalities: E, - test_mode: bool, -) -> Result { - for host in HOSTS.iter() { - if let Some(mut host) = host.try_lock() { - return host.validate_candidate(validation_code, params, externalities, test_mode); - } - } - - // all workers are busy, just wait for the first one - HOSTS[0].lock().validate_candidate(validation_code, params, externalities, test_mode) -} - impl Drop for ValidationHost { fn drop(&mut self) { if let Some(ref mut worker) = &mut self.worker { diff --git a/parachain/tests/adder/mod.rs b/parachain/tests/adder/mod.rs index e4378d971403..7493a211cfd5 100644 --- a/parachain/tests/adder/mod.rs +++ b/parachain/tests/adder/mod.rs @@ -70,6 +70,8 @@ pub fn execute_good_on_parent() { add: 512, }; + let pool = parachain::wasm_executor::ValidationPool::new(); + let ret = parachain::wasm_executor::validate_candidate( TEST_CODE, ValidationParams { @@ -77,7 +79,7 @@ pub fn execute_good_on_parent() { block_data: block_data.encode(), }, DummyExt, - parachain::wasm_executor::ExecutionMode::RemoteTest, + parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), ).unwrap(); let new_head = HeadData::decode(&mut &ret.head_data[..]).unwrap(); @@ -92,6 +94,7 @@ fn execute_good_chain_on_parent() { let mut number = 0; let mut parent_hash = [0; 32]; let mut last_state = 0; + let pool = parachain::wasm_executor::ValidationPool::new(); for add in 0..10 { let parent_head = HeadData { @@ -112,7 +115,7 @@ fn execute_good_chain_on_parent() { block_data: block_data.encode(), }, DummyExt, - parachain::wasm_executor::ExecutionMode::RemoteTest, + parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), ).unwrap(); let new_head = HeadData::decode(&mut &ret.head_data[..]).unwrap(); @@ -129,6 +132,8 @@ fn execute_good_chain_on_parent() { #[test] fn execute_bad_on_parent() { + let pool = parachain::wasm_executor::ValidationPool::new(); + let parent_head = HeadData { number: 0, parent_hash: [0; 32], @@ -147,6 +152,6 @@ fn execute_bad_on_parent() { block_data: block_data.encode(), }, DummyExt, - parachain::wasm_executor::ExecutionMode::RemoteTest, + parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), ).unwrap_err(); } diff --git a/parachain/tests/wasm_executor/mod.rs b/parachain/tests/wasm_executor/mod.rs index 619f36580ea1..92e657a2ff8c 100644 --- a/parachain/tests/wasm_executor/mod.rs +++ b/parachain/tests/wasm_executor/mod.rs @@ -25,6 +25,8 @@ const INFINITE_LOOP_CODE: &[u8] = halt::WASM_BINARY; #[test] fn terminates_on_timeout() { + let pool = parachain::wasm_executor::ValidationPool::new(); + let result = parachain::wasm_executor::validate_candidate( INFINITE_LOOP_CODE, ValidationParams { @@ -32,7 +34,7 @@ fn terminates_on_timeout() { block_data: Vec::new(), }, DummyExt, - parachain::wasm_executor::ExecutionMode::RemoteTest, + parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), ); match result { Err(parachain::wasm_executor::Error::Timeout) => {}, @@ -45,7 +47,11 @@ fn terminates_on_timeout() { #[test] fn parallel_execution() { + let pool = parachain::wasm_executor::ValidationPool::new(); + let start = std::time::Instant::now(); + + let pool2 = pool.clone(); let thread = std::thread::spawn(move || parachain::wasm_executor::validate_candidate( INFINITE_LOOP_CODE, @@ -54,7 +60,7 @@ fn parallel_execution() { block_data: Vec::new(), }, DummyExt, - parachain::wasm_executor::ExecutionMode::RemoteTest, + parachain::wasm_executor::ExecutionMode::RemoteTest(&pool2), ).ok()); let _ = parachain::wasm_executor::validate_candidate( INFINITE_LOOP_CODE, @@ -63,7 +69,7 @@ fn parallel_execution() { block_data: Vec::new(), }, DummyExt, - parachain::wasm_executor::ExecutionMode::RemoteTest, + parachain::wasm_executor::ExecutionMode::RemoteTest(&pool), ); thread.join().unwrap(); // total time should be < 2 x EXECUTION_TIMEOUT_SEC diff --git a/validation/src/collation.rs b/validation/src/collation.rs index b7cdc644613d..a2e682a066d6 100644 --- a/validation/src/collation.rs +++ b/validation/src/collation.rs @@ -59,6 +59,7 @@ pub trait Collators: Clone { /// A future which resolves when a collation is available. pub async fn collation_fetch( + validation_pool: Option, parachain: ParaId, relay_parent: Hash, collators: C, @@ -76,6 +77,7 @@ pub async fn collation_fetch( let collation = collators.collate(parachain, relay_parent).await?; let Collation { info, pov } = collation; let res = crate::pipeline::full_output_validation_with_api( + validation_pool.as_ref(), &*client, &info, &pov, diff --git a/validation/src/pipeline.rs b/validation/src/pipeline.rs index fb14a42e8f4a..04c839a726e2 100644 --- a/validation/src/pipeline.rs +++ b/validation/src/pipeline.rs @@ -36,6 +36,8 @@ use sp_api::ProvideRuntimeApi; use parking_lot::Mutex; use crate::Error; +pub use parachain::wasm_executor::ValidationPool; + /// Does basic checks of a collation. Provide the encoded PoV-block. pub fn basic_checks( collation: &CollationInfo, @@ -227,6 +229,7 @@ impl<'a> ValidatedCandidate<'a> { /// Does full checks of a collation, with provided PoV-block and contextual data. pub fn validate<'a>( + validation_pool: Option<&'_ ValidationPool>, collation: &'a CollationInfo, pov_block: &'a PoVBlock, local_validation: &'a LocalValidationData, @@ -251,12 +254,16 @@ pub fn validate<'a>( per_byte: 0, }; + let execution_mode = validation_pool + .map(ExecutionMode::Remote) + .unwrap_or(ExecutionMode::Local); + let ext = Externalities::new(local_validation.balance, fee_schedule); match wasm_executor::validate_candidate( &validation_code, params, ext.clone(), - ExecutionMode::Remote, + execution_mode, ) { Ok(result) => { if result.head_data == collation.head_data.0 { @@ -306,6 +313,7 @@ where /// Does full-pipeline validation of a collation with provided contextual parameters. pub fn full_output_validation_with_api

( + validation_pool: Option<&ValidationPool>, api: &P, collation: &CollationInfo, pov_block: &PoVBlock, @@ -330,6 +338,7 @@ pub fn full_output_validation_with_api

( &encoded_pov, ) .and_then(|()| validate( + validation_pool, &collation, &pov_block, &local_validation, diff --git a/validation/src/shared_table/mod.rs b/validation/src/shared_table/mod.rs index 8b0b42b54fa4..fd6702a7e640 100644 --- a/validation/src/shared_table/mod.rs +++ b/validation/src/shared_table/mod.rs @@ -39,7 +39,7 @@ use self::includable::IncludabilitySender; use primitives::Pair; use sp_api::ProvideRuntimeApi; -use crate::pipeline::FullOutput; +use crate::pipeline::{FullOutput, ValidationPool}; use crate::Error; mod includable; @@ -132,6 +132,7 @@ struct SharedTableInner { trackers: Vec, availability_store: AvailabilityStore, validated: HashMap, + validation_pool: Option, } impl SharedTableInner { @@ -193,6 +194,7 @@ impl SharedTableInner { }; work.map(|work| ParachainWork { + validation_pool: self.validation_pool.clone(), availability_store: self.availability_store.clone(), relay_parent: context.signing_context.parent_hash.clone(), work, @@ -259,6 +261,7 @@ impl Validated { /// Future that performs parachain validation work. pub struct ParachainWork { + validation_pool: Option, work: Work, relay_parent: Hash, availability_store: AvailabilityStore, @@ -283,9 +286,11 @@ impl ParachainWork { let n_validators = self.n_validators; let expected_relay_parent = self.relay_parent; + let pool = self.validation_pool.clone(); let validate = move |pov_block: &PoVBlock, candidate: &AbridgedCandidateReceipt| { let collation_info = candidate.to_collation_info(); let full_output = crate::pipeline::full_output_validation_with_api( + pool.as_ref(), &*api, &collation_info, pov_block, @@ -416,6 +421,7 @@ impl SharedTable { signing_context: SigningContext, availability_store: AvailabilityStore, max_block_data_size: Option, + validation_pool: Option, ) -> Self { SharedTable { context: Arc::new(TableContext { groups, key, signing_context, validators: validators.clone(), }), @@ -425,6 +431,7 @@ impl SharedTable { validated: HashMap::new(), trackers: Vec::new(), availability_store, + validation_pool, })) } } @@ -685,6 +692,7 @@ mod tests { signing_context.clone(), AvailabilityStore::new_in_memory(DummyErasureNetworking), None, + None, ); let mut candidate = AbridgedCandidateReceipt::default(); @@ -741,6 +749,7 @@ mod tests { signing_context.clone(), AvailabilityStore::new_in_memory(DummyErasureNetworking), None, + None, ); let mut candidate = AbridgedCandidateReceipt::default(); @@ -798,6 +807,7 @@ mod tests { availability_store: store.clone(), max_block_data_size: None, n_validators, + validation_pool: None, }; for i in 0..n_validators { @@ -867,6 +877,7 @@ mod tests { availability_store: store.clone(), max_block_data_size: None, n_validators, + validation_pool: None, }; let validated = block_on(producer.prime_with(|_, _| Ok( @@ -921,6 +932,7 @@ mod tests { signing_context.clone(), AvailabilityStore::new_in_memory(DummyErasureNetworking), None, + None, ); let mut candidate = AbridgedCandidateReceipt::default(); @@ -988,6 +1000,7 @@ mod tests { signing_context.clone(), AvailabilityStore::new_in_memory(DummyErasureNetworking), None, + None, ); let mut candidate = AbridgedCandidateReceipt::default(); diff --git a/validation/src/validation_service/mod.rs b/validation/src/validation_service/mod.rs index 987f0e063e82..3c8d1857350e 100644 --- a/validation/src/validation_service/mod.rs +++ b/validation/src/validation_service/mod.rs @@ -47,6 +47,7 @@ use log::{warn, error, info, debug, trace}; use super::{Network, Collators, SharedTable, TableRouter}; use crate::Error; +use crate::pipeline::ValidationPool; /// A handle to spawn background tasks onto. pub type TaskExecutor = Arc; @@ -164,13 +165,15 @@ impl ServiceBuilder where NotifyImport(sc_client_api::BlockImportNotification), } + let validation_pool = Some(ValidationPool::new()); let mut parachain_validation = ParachainValidationInstances { client: self.client.clone(), network: self.network, spawner: self.spawner, availability_store: self.availability_store, live_instances: HashMap::new(), - collation_fetch: DefaultCollationFetch(self.collators), + validation_pool: validation_pool.clone(), + collation_fetch: DefaultCollationFetch(self.collators, validation_pool), }; let client = self.client; @@ -252,7 +255,7 @@ pub(crate) trait CollationFetch { } #[derive(Clone)] -struct DefaultCollationFetch(C); +struct DefaultCollationFetch(C, Option); impl CollationFetch for DefaultCollationFetch where C: Collators + Send + Sync + Unpin + 'static, @@ -272,10 +275,12 @@ impl CollationFetch for DefaultCollationFetch P::Api: ParachainHost, P: ProvideRuntimeApi + Send + Sync + 'static, { + let DefaultCollationFetch(collators, validation_pool) = self; crate::collation::collation_fetch( + validation_pool, parachain, relay_parent, - self.0, + collators, client, max_block_data_size, n_validators, @@ -307,6 +312,9 @@ pub(crate) struct ParachainValidationInstances { /// Live agreements. Maps relay chain parent hashes to attestation /// instances. live_instances: HashMap, + /// The underlying validation pool of processes to use. + /// Only `None` in tests. + validation_pool: Option, /// Used to fetch a collation. collation_fetch: CF, } @@ -406,6 +414,7 @@ impl ParachainValidationInstances where signing_context, self.availability_store.clone(), max_block_data_size, + self.validation_pool.clone(), )); let build_router = self.network.build_table_router( @@ -709,6 +718,7 @@ mod tests { spawner: executor.clone(), availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking), live_instances: HashMap::new(), + validation_pool: None, }; parachain_validation.get_or_instantiate(Default::default(), &keystore, None) @@ -747,6 +757,7 @@ mod tests { spawner: executor.clone(), availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking), live_instances: HashMap::new(), + validation_pool: None, }; parachain_validation.get_or_instantiate(Default::default(), &keystore, None)