Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ use polkadot_primitives::v1::{
ValidationCode, PoV, CandidateDescriptor, ValidationData, PersistedValidationData,
TransientValidationData, OccupiedCoreAssumption, Hash,
};
use polkadot_parachain::wasm_executor::{self, ValidationPool, ExecutionMode, ValidationError,
InvalidCandidate as WasmInvalidCandidate};
use polkadot_parachain::wasm_executor::{
self, ValidationPool, ExecutionMode, ValidationError,
InvalidCandidate as WasmInvalidCandidate, ValidationExecutionMode,
};
use polkadot_parachain::primitives::{ValidationResult as WasmValidationResult, ValidationParams};

use parity_scale_codec::Encode;
Expand Down Expand Up @@ -128,7 +130,7 @@ async fn run(
)
-> SubsystemResult<()>
{
let pool = ValidationPool::new();
let pool = ValidationPool::new(ValidationExecutionMode::ExternalProcessSelfHost);

loop {
match ctx.recv().await? {
Expand Down
17 changes: 2 additions & 15 deletions parachain/src/wasm_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sp_externalities::Extensions;
use sp_wasm_interface::HostFunctions as _;

#[cfg(not(any(target_os = "android", target_os = "unknown")))]
pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC};
pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode};

mod validation_host;

Expand Down Expand Up @@ -66,8 +66,6 @@ pub enum ExecutionMode<'a> {
Local,
/// Remote execution in a spawned process.
Remote(&'a ValidationPool),
/// Remote execution in a spawned test runner.
RemoteTest(&'a ValidationPool),
}

#[derive(Debug, derive_more::Display, derive_more::From)]
Expand Down Expand Up @@ -143,11 +141,7 @@ pub fn validate_candidate(
},
#[cfg(not(any(target_os = "android", target_os = "unknown")))]
ExecutionMode::Remote(pool) => {
pool.validate_candidate(validation_code, params, false)
},
#[cfg(not(any(target_os = "android", target_os = "unknown")))]
ExecutionMode::RemoteTest(pool) => {
pool.validate_candidate(validation_code, params, true)
pool.validate_candidate(validation_code, params)
},
#[cfg(any(target_os = "android", target_os = "unknown"))]
ExecutionMode::Remote(_pool) =>
Expand All @@ -156,13 +150,6 @@ pub fn validate_candidate(
"Remote validator not available".to_string()
) as Box<_>
))),
#[cfg(any(target_os = "android", target_os = "unknown"))]
ExecutionMode::RemoteTest(_pool) =>
Err(ValidationError::Internal(InternalError::System(
Box::<dyn std::error::Error + Send + Sync>::from(
"Remote validator not available".to_string()
) as Box<_>
))),
}
}

Expand Down
79 changes: 60 additions & 19 deletions parachain/src/wasm_executor/validation_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#![cfg(not(any(target_os = "android", target_os = "unknown")))]

use std::{process, env, sync::Arc, sync::atomic};
use std::{process, env, sync::Arc, sync::atomic, path::PathBuf};
use codec::{Decode, Encode};
use crate::primitives::{ValidationParams, ValidationResult};
use super::{
Expand All @@ -29,7 +29,6 @@ use log::{debug, trace};
use futures::executor::ThreadPool;
use sp_core::traits::SpawnNamed;

const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
/// CLI Argument to start in validation worker mode.
const WORKER_ARG: &'static str = "validation-worker";
const WORKER_ARGS: &[&'static str] = &[WORKER_ARG];
Expand Down Expand Up @@ -66,19 +65,40 @@ impl SpawnNamed for TaskExecutor {
}
}

/// The execution mode for the `ValidationPool`.
#[derive(Debug, Clone)]
pub enum ValidationExecutionMode {
/// The validation worker is ran in a thread inside the same process.
InProcess,
/// The validation worker is ran using the process' executable and the subcommand `validation-worker` is passed
/// following by the address of the shared memory.
ExternalProcessSelfHost,
/// The validation worker is ran using the command provided and the argument provided. The address of the shared
/// memory is added at the end of the arguments.
ExternalProcessCustomHost {
/// Path to the validation worker. The file must exists and be executable.
binary: PathBuf,
/// List of arguments passed to the validation worker. The address of the shared memory will be automatically
/// added after the arguments.
args: Vec<String>,
},
}

/// A pool of hosts.
#[derive(Clone)]
pub struct ValidationPool {
hosts: Arc<Vec<Mutex<ValidationHost>>>,
execution_mode: ValidationExecutionMode,
}

const DEFAULT_NUM_HOSTS: usize = 8;

impl ValidationPool {
/// Creates a validation pool with the default configuration.
pub fn new() -> ValidationPool {
pub fn new(execution_mode: ValidationExecutionMode) -> ValidationPool {
ValidationPool {
hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()),
execution_mode,
}
}

Expand All @@ -90,16 +110,15 @@ impl ValidationPool {
&self,
validation_code: &[u8],
params: ValidationParams,
test_mode: bool,
) -> Result<ValidationResult, ValidationError> {
for host in self.hosts.iter() {
if let Some(mut host) = host.try_lock() {
return host.validate_candidate(validation_code, params, test_mode);
return host.validate_candidate(validation_code, params, self.execution_mode.clone());
}
}

// all workers are busy, just wait for the first one
self.hosts[0].lock().validate_candidate(validation_code, params, test_mode)
self.hosts[0].lock().validate_candidate(validation_code, params, self.execution_mode.clone())
}
}

Expand Down Expand Up @@ -208,6 +227,7 @@ unsafe impl Send for ValidationHost {}
#[derive(Default)]
struct ValidationHost {
worker: Option<process::Child>,
worker_thread: Option<std::thread::JoinHandle<Result<(), String>>>,
memory: Option<SharedMem>,
id: u32,
}
Expand All @@ -233,25 +253,46 @@ impl ValidationHost {
Ok(mem_config.create()?)
}

fn start_worker(&mut self, test_mode: bool) -> Result<(), InternalError> {
fn start_worker(&mut self, execution_mode: ValidationExecutionMode) -> Result<(), InternalError> {
if let Some(ref mut worker) = self.worker {
// Check if still alive
if let Ok(None) = worker.try_wait() {
// Still running
return Ok(());
}
}
if self.worker_thread.is_some() {
return Ok(());
}

let memory = Self::create_memory()?;
let self_path = env::current_exe()?;
debug!("Starting worker at {:?}", self_path);
let mut args = if test_mode { WORKER_ARGS_TEST.to_vec() } else { WORKER_ARGS.to_vec() };
args.push(memory.get_os_path());
let worker = process::Command::new(self_path)
.args(args)
.stdin(process::Stdio::piped())
.spawn()?;
self.id = worker.id();
self.worker = Some(worker);

let mut run_worker_process = |cmd: PathBuf, args: Vec<String>| -> Result<(), std::io::Error> {
debug!("Starting worker at {:?} with arguments: {:?} and {:?}", cmd, args, memory.get_os_path());
let worker = process::Command::new(cmd)
.args(args)
.arg(memory.get_os_path())
.stdin(process::Stdio::piped())
.spawn()?;
self.id = worker.id();
self.worker = Some(worker);
Ok(())
};

match execution_mode {
ValidationExecutionMode::InProcess => {
let mem_id = memory.get_os_path().to_string();
self.worker_thread = Some(std::thread::spawn(move || run_worker(mem_id.as_str())));
},
ValidationExecutionMode::ExternalProcessSelfHost => run_worker_process(
env::current_exe()?,
WORKER_ARGS.iter().map(|x| x.to_string()).collect(),
)?,
ValidationExecutionMode::ExternalProcessCustomHost { binary, args } => run_worker_process(
binary,
args,
)?,
};

memory.wait(
Event::WorkerReady as usize,
Expand All @@ -268,13 +309,13 @@ impl ValidationHost {
&mut self,
validation_code: &[u8],
params: ValidationParams,
test_mode: bool,
execution_mode: ValidationExecutionMode,
) -> Result<ValidationResult, ValidationError> {
if validation_code.len() > MAX_CODE_MEM {
return Err(ValidationError::InvalidCandidate(InvalidCandidate::CodeTooLarge(validation_code.len())));
}
// First, check if need to spawn the child process
self.start_worker(test_mode)?;
self.start_worker(execution_mode)?;
let memory = self.memory.as_mut()
.expect("memory is always `Some` after `start_worker` completes successfully");
{
Expand Down
48 changes: 36 additions & 12 deletions parachain/test-parachains/tests/adder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

//! Basic parachain that adds a number as part of its state.

use parachain::primitives::{
RelayChainBlockNumber,
BlockData as GenericBlockData,
HeadData as GenericHeadData,
ValidationParams,
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];

use parachain::{
primitives::{
RelayChainBlockNumber,
BlockData as GenericBlockData,
HeadData as GenericHeadData,
ValidationParams,
},
wasm_executor::{ValidationPool, ValidationExecutionMode}
};
use codec::{Decode, Encode};

Expand Down Expand Up @@ -52,8 +57,28 @@ fn hash_head(head: &HeadData) -> [u8; 32] {
tiny_keccak::keccak256(head.encode().as_slice())
}

fn validation_pool() -> ValidationPool {
let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost {
binary: std::env::current_exe().unwrap(),
args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(),
};

ValidationPool::new(execution_mode)
}

#[test]
pub fn execute_good_on_parent() {
fn execute_good_on_parent_with_inprocess_validation() {
let pool = ValidationPool::new(ValidationExecutionMode::InProcess);
execute_good_on_parent(pool);
}

#[test]
pub fn execute_good_on_parent_with_external_process_validation() {
let pool = validation_pool();
execute_good_on_parent(pool);
}

fn execute_good_on_parent(pool: ValidationPool) {
let parent_head = HeadData {
number: 0,
parent_hash: [0; 32],
Expand All @@ -65,7 +90,6 @@ pub fn execute_good_on_parent() {
add: 512,
};

let pool = parachain::wasm_executor::ValidationPool::new();

let ret = parachain::wasm_executor::validate_candidate(
adder::wasm_binary_unwrap(),
Expand All @@ -75,7 +99,7 @@ pub fn execute_good_on_parent() {
relay_chain_height: 1,
hrmp_mqc_heads: Vec::new(),
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
parachain::wasm_executor::ExecutionMode::Remote(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap();

Expand All @@ -91,7 +115,7 @@ fn execute_good_chain_on_parent() {
let mut number = 0;
let mut parent_hash = [0; 32];
let mut last_state = 0;
let pool = parachain::wasm_executor::ValidationPool::new();
let pool = validation_pool();

for add in 0..10 {
let parent_head = HeadData {
Expand All @@ -113,7 +137,7 @@ fn execute_good_chain_on_parent() {
relay_chain_height: number as RelayChainBlockNumber + 1,
hrmp_mqc_heads: Vec::new(),
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
parachain::wasm_executor::ExecutionMode::Remote(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap();

Expand All @@ -131,7 +155,7 @@ fn execute_good_chain_on_parent() {

#[test]
fn execute_bad_on_parent() {
let pool = parachain::wasm_executor::ValidationPool::new();
let pool = validation_pool();

let parent_head = HeadData {
number: 0,
Expand All @@ -152,7 +176,7 @@ fn execute_bad_on_parent() {
relay_chain_height: 1,
hrmp_mqc_heads: Vec::new(),
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
parachain::wasm_executor::ExecutionMode::Remote(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap_err();
}
Loading