From e57111594f831a8a8bf83e77727c08c5d1784ce1 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Thu, 25 Jul 2024 10:59:37 +0200 Subject: [PATCH 01/12] Move decompression to workers --- Cargo.lock | 3 + .../node/core/candidate-validation/Cargo.toml | 2 +- .../node/core/candidate-validation/src/lib.rs | 142 +++++------- .../core/candidate-validation/src/tests.rs | 205 ++---------------- .../benches/host_prepare_rococo_runtime.rs | 2 +- polkadot/node/core/pvf/common/src/error.rs | 10 +- polkadot/node/core/pvf/common/src/execute.rs | 4 + polkadot/node/core/pvf/common/src/pvf.rs | 20 +- .../node/core/pvf/execute-worker/Cargo.toml | 3 + .../node/core/pvf/execute-worker/src/lib.rs | 65 +++++- .../node/core/pvf/prepare-worker/Cargo.toml | 2 + .../benches/prepare_rococo_runtime.rs | 6 +- .../node/core/pvf/prepare-worker/src/lib.rs | 8 +- polkadot/node/core/pvf/src/error.rs | 3 + polkadot/node/core/pvf/src/execute/queue.rs | 36 ++- .../core/pvf/src/execute/worker_interface.rs | 36 +-- polkadot/node/core/pvf/src/host.rs | 109 ++++++++-- polkadot/node/core/pvf/tests/it/adder.rs | 92 ++++---- polkadot/node/core/pvf/tests/it/main.rs | 194 +++++++++++------ polkadot/node/core/pvf/tests/it/process.rs | 80 ++++--- 20 files changed, 539 insertions(+), 483 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 99dfbafafe28..682a9ecc70ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13562,8 +13562,10 @@ dependencies = [ "nix 0.28.0", "parity-scale-codec", "polkadot-node-core-pvf-common", + "polkadot-node-primitives", "polkadot-parachain-primitives", "polkadot-primitives", + "sp-maybe-compressed-blob", "tracing-gum", ] @@ -13578,6 +13580,7 @@ dependencies = [ "nix 0.28.0", "parity-scale-codec", "polkadot-node-core-pvf-common", + "polkadot-node-primitives", "polkadot-primitives", "rayon", "rococo-runtime", diff --git a/polkadot/node/core/candidate-validation/Cargo.toml b/polkadot/node/core/candidate-validation/Cargo.toml index 13ab3e3fba50..fcacc38cae65 100644 --- 
a/polkadot/node/core/candidate-validation/Cargo.toml +++ b/polkadot/node/core/candidate-validation/Cargo.toml @@ -17,7 +17,6 @@ gum = { workspace = true, default-features = true } sp-keystore = { workspace = true } sp-application-crypto = { workspace = true } -sp-maybe-compressed-blob = { workspace = true, default-features = true } codec = { features = ["bit-vec", "derive"], workspace = true } polkadot-primitives = { workspace = true, default-features = true } @@ -36,5 +35,6 @@ sp-keyring = { workspace = true, default-features = true } futures = { features = ["thread-pool"], workspace = true } assert_matches = { workspace = true } polkadot-node-subsystem-test-helpers = { workspace = true } +sp-maybe-compressed-blob = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } polkadot-primitives-test-helpers = { workspace = true } diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 1985964ebc51..2b54e77274b6 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -27,9 +27,7 @@ use polkadot_node_core_pvf::{ InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, }; -use polkadot_node_primitives::{ - BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT, -}; +use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult}; use polkadot_node_subsystem::{ errors::RuntimeApiError, messages::{ @@ -41,9 +39,7 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_util as util; use polkadot_overseer::ActiveLeavesUpdate; -use polkadot_parachain_primitives::primitives::{ - ValidationParams, ValidationResult as WasmValidationResult, -}; +use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult; use 
polkadot_primitives::{ executor_params::{ DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT, @@ -504,21 +500,12 @@ where continue; }; - let pvf = match sp_maybe_compressed_blob::decompress( - &validation_code.0, - VALIDATION_CODE_BOMB_LIMIT, - ) { - Ok(code) => PvfPrepData::from_code( - code.into_owned(), - executor_params.clone(), - timeout, - PrepareJobKind::Prechecking, - ), - Err(e) => { - gum::debug!(target: LOG_TARGET, err=?e, "cannot decompress validation code"); - continue - }, - }; + let pvf = PvfPrepData::from_code( + validation_code.0, + executor_params.clone(), + timeout, + PrepareJobKind::Prechecking, + ); active_pvfs.push(pvf); processed_code_hashes.push(code_hash); @@ -651,21 +638,12 @@ where let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck); - let pvf = match sp_maybe_compressed_blob::decompress( - &validation_code.0, - VALIDATION_CODE_BOMB_LIMIT, - ) { - Ok(code) => PvfPrepData::from_code( - code.into_owned(), - executor_params, - timeout, - PrepareJobKind::Prechecking, - ), - Err(e) => { - gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code"); - return PreCheckOutcome::Invalid - }, - }; + let pvf = PvfPrepData::from_code( + validation_code.0, + executor_params, + timeout, + PrepareJobKind::Prechecking, + ); match validation_backend.precheck_pvf(pvf).await { Ok(_) => PreCheckOutcome::Valid, @@ -873,41 +851,11 @@ async fn validate_candidate_exhaustive( return Ok(ValidationResult::Invalid(e)) } - let raw_validation_code = match sp_maybe_compressed_blob::decompress( - &validation_code.0, - VALIDATION_CODE_BOMB_LIMIT, - ) { - Ok(code) => code, - Err(e) => { - gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (validation code)"); - - // Code already passed pre-checking, if decompression fails now this most likely means - // some local corruption happened. 
- return Err(ValidationFailed("Code decompression failed".to_string())) - }, - }; - metrics.observe_code_size(raw_validation_code.len()); + // metrics.observe_code_size(raw_validation_code.len()); // TODO!!! metrics.observe_pov_size(pov.block_data.0.len(), true); - let raw_block_data = - match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) { - Ok(block_data) => BlockData(block_data.to_vec()), - Err(e) => { - gum::info!(target: LOG_TARGET, ?para_id, err=?e, "Invalid candidate (PoV code)"); - - // If the PoV is invalid, the candidate certainly is. - return Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)) - }, - }; - metrics.observe_pov_size(raw_block_data.0.len(), false); - - let params = ValidationParams { - parent_head: persisted_validation_data.parent_head.clone(), - block_data: raw_block_data, - relay_parent_number: persisted_validation_data.relay_parent_number, - relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root, - }; + let persisted_validation_data = Arc::new(persisted_validation_data); let result = match exec_kind { // Retry is disabled to reduce the chance of nondeterministic blocks getting backed and // honest backers getting slashed. 
@@ -915,7 +863,7 @@ async fn validate_candidate_exhaustive( let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare); let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind); let pvf = PvfPrepData::from_code( - raw_validation_code.to_vec(), + validation_code.0, executor_params, prep_timeout, PrepareJobKind::Compilation, @@ -925,7 +873,8 @@ async fn validate_candidate_exhaustive( .validate_candidate( pvf, exec_timeout, - params.encode(), + persisted_validation_data.clone(), + pov, polkadot_node_core_pvf::Priority::Normal, ) .await @@ -933,9 +882,10 @@ async fn validate_candidate_exhaustive( PvfExecKind::Approval => validation_backend .validate_candidate_with_retry( - raw_validation_code.to_vec(), + validation_code.0, pvf_exec_timeout(&executor_params, exec_kind), - params, + persisted_validation_data.clone(), + pov, executor_params, PVF_APPROVAL_EXECUTION_RETRY_DELAY, polkadot_node_core_pvf::Priority::Critical, @@ -961,6 +911,8 @@ async fn validate_candidate_exhaustive( Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)), Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))), + Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) => + Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)), Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError( "ambiguous worker death".to_string(), @@ -983,11 +935,13 @@ async fn validate_candidate_exhaustive( ); Err(ValidationFailed(e.to_string())) }, - Ok(res) => + Ok((res, pov_size)) => if res.head_data.hash() != candidate_receipt.descriptor.para_head { gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)"); Ok(ValidationResult::Invalid(InvalidCandidate::ParaHeadHashMismatch)) } else { + metrics.observe_pov_size(pov_size as usize, false); + let outputs = 
CandidateCommitments { head_data: res.head_data, upward_messages: res.upward_messages, @@ -1007,7 +961,7 @@ async fn validate_candidate_exhaustive( // invalid. Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch)) } else { - Ok(ValidationResult::Valid(outputs, persisted_validation_data)) + Ok(ValidationResult::Valid(outputs, (*persisted_validation_data).clone())) } }, } @@ -1020,10 +974,11 @@ trait ValidationBackend { &mut self, pvf: PvfPrepData, exec_timeout: Duration, - encoded_params: Vec, + pvd: Arc, + pov: Arc, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result; + ) -> Result<(WasmValidationResult, u32), ValidationError>; /// Tries executing a PVF. Will retry once if an error is encountered that may have /// been transient. @@ -1035,18 +990,19 @@ trait ValidationBackend { /// preparation. async fn validate_candidate_with_retry( &mut self, - raw_validation_code: Vec, + code: Vec, exec_timeout: Duration, - params: ValidationParams, + pvd: Arc, + pov: Arc, executor_params: ExecutorParams, retry_delay: Duration, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result { + ) -> Result<(WasmValidationResult, u32), ValidationError> { let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare); // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. let pvf = PvfPrepData::from_code( - raw_validation_code, + code, executor_params, prep_timeout, PrepareJobKind::Compilation, @@ -1057,7 +1013,13 @@ trait ValidationBackend { // Use `Priority::Critical` as finality trumps parachain liveliness. 
let mut validation_result = self - .validate_candidate(pvf.clone(), exec_timeout, params.encode(), prepare_priority) + .validate_candidate( + pvf.clone(), + exec_timeout, + pvd.clone(), + pov.clone(), + prepare_priority, + ) .await; if validation_result.is_ok() { return validation_result @@ -1130,10 +1092,14 @@ trait ValidationBackend { validation_result ); - // Encode the params again when re-trying. We expect the retry case to be relatively - // rare, and we want to avoid unconditionally cloning data. validation_result = self - .validate_candidate(pvf.clone(), new_timeout, params.encode(), prepare_priority) + .validate_candidate( + pvf.clone(), + new_timeout, + pvd.clone(), + pov.clone(), + prepare_priority, + ) .await; } } @@ -1153,13 +1119,13 @@ impl ValidationBackend for ValidationHost { &mut self, pvf: PvfPrepData, exec_timeout: Duration, - encoded_params: Vec, + pvd: Arc, + pov: Arc, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result { + ) -> Result<(WasmValidationResult, u32), ValidationError> { let (tx, rx) = oneshot::channel(); - if let Err(err) = - self.execute_pvf(pvf, exec_timeout, encoded_params, prepare_priority, tx).await + if let Err(err) = self.execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, tx).await { return Err(InternalValidationError::HostCommunication(format!( "cannot send pvf to the validation host, it might have shut down: {:?}", diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index 86d855f78b45..1bd2a39fd47f 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -20,6 +20,7 @@ use super::*; use assert_matches::assert_matches; use futures::executor; use polkadot_node_core_pvf::PrepareError; +use polkadot_node_primitives::{BlockData, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT}; use polkadot_node_subsystem::messages::AllMessages; use 
polkadot_node_subsystem_util::reexports::SubsystemContext; use polkadot_overseer::ActivatedLeaf; @@ -363,17 +364,17 @@ fn check_does_not_match() { } struct MockValidateCandidateBackend { - result_list: Vec>, + result_list: Vec>, num_times_called: usize, } impl MockValidateCandidateBackend { - fn with_hardcoded_result(result: Result) -> Self { + fn with_hardcoded_result(result: Result<(WasmValidationResult, u32), ValidationError>) -> Self { Self { result_list: vec![result], num_times_called: 0 } } fn with_hardcoded_result_list( - result_list: Vec>, + result_list: Vec>, ) -> Self { Self { result_list, num_times_called: 0 } } @@ -385,9 +386,10 @@ impl ValidationBackend for MockValidateCandidateBackend { &mut self, _pvf: PvfPrepData, _timeout: Duration, - _encoded_params: Vec, + _pvd: Arc, + _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result { + ) -> Result<(WasmValidationResult, u32), ValidationError> { // This is expected to panic if called more times than expected, indicating an error in the // test. 
let result = self.result_list[self.num_times_called].clone(); @@ -453,7 +455,7 @@ fn candidate_validation_ok_is_ok() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + MockValidateCandidateBackend::with_hardcoded_result(Ok((validation_result, 1))), validation_data.clone(), validation_code, candidate_receipt, @@ -585,7 +587,7 @@ fn candidate_validation_one_ambiguous_error_is_valid() { let v = executor::block_on(validate_candidate_exhaustive( MockValidateCandidateBackend::with_hardcoded_result_list(vec![ Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)), - Ok(validation_result), + Ok((validation_result, 1)), ]), validation_data.clone(), validation_code, @@ -712,7 +714,7 @@ fn candidate_validation_dont_retry_panic_errors() { fn candidate_validation_retry_on_error_helper( exec_kind: PvfExecKind, - mock_errors: Vec>, + mock_errors: Vec>, ) -> Result { let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; @@ -828,7 +830,7 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() { }; let result = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + MockValidateCandidateBackend::with_hardcoded_result(Ok((validation_result, 1))), validation_data, validation_code, candidate_receipt, @@ -937,7 +939,7 @@ fn compressed_code_works() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), + MockValidateCandidateBackend::with_hardcoded_result(Ok((validation_result, 1))), validation_data, validation_code, candidate_receipt, @@ -950,115 +952,6 @@ fn compressed_code_works() { 
assert_matches!(v, Ok(ValidationResult::Valid(_, _))); } -#[test] -fn code_decompression_failure_is_error() { - let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; - let pov = PoV { block_data: BlockData(vec![1; 32]) }; - let head_data = HeadData(vec![1, 1, 1]); - - let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; - let validation_code = - sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1) - .map(ValidationCode) - .unwrap(); - - let descriptor = make_valid_candidate_descriptor( - ParaId::from(1_u32), - dummy_hash(), - validation_data.hash(), - pov.hash(), - validation_code.hash(), - head_data.hash(), - dummy_hash(), - Sr25519Keyring::Alice, - ); - - let validation_result = WasmValidationResult { - head_data, - new_validation_code: None, - upward_messages: Default::default(), - horizontal_messages: Default::default(), - processed_downward_messages: 0, - hrmp_watermark: 0, - }; - - let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - - let pool = TaskExecutor::new(); - let (_ctx, _ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::< - AllMessages, - _, - >(pool.clone()); - - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - ExecutorParams::default(), - PvfExecKind::Backing, - &Default::default(), - )); - - assert_matches!(v, Err(_)); -} - -#[test] -fn pov_decompression_failure_is_invalid() { - let validation_data = - PersistedValidationData { max_pov_size: POV_BOMB_LIMIT as u32, ..Default::default() }; - let head_data = HeadData(vec![1, 1, 1]); - - let raw_block_data = vec![2u8; POV_BOMB_LIMIT + 1]; - let pov = sp_maybe_compressed_blob::compress(&raw_block_data, POV_BOMB_LIMIT + 1) - .map(|raw| PoV { block_data: BlockData(raw) }) - .unwrap(); - - let validation_code = 
ValidationCode(vec![2; 16]); - - let descriptor = make_valid_candidate_descriptor( - ParaId::from(1_u32), - dummy_hash(), - validation_data.hash(), - pov.hash(), - validation_code.hash(), - head_data.hash(), - dummy_hash(), - Sr25519Keyring::Alice, - ); - - let validation_result = WasmValidationResult { - head_data, - new_validation_code: None, - upward_messages: Default::default(), - horizontal_messages: Default::default(), - processed_downward_messages: 0, - hrmp_watermark: 0, - }; - - let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; - - let pool = TaskExecutor::new(); - let (_ctx, _ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::< - AllMessages, - _, - >(pool.clone()); - - let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), - validation_data, - validation_code, - candidate_receipt, - Arc::new(pov), - ExecutorParams::default(), - PvfExecKind::Backing, - &Default::default(), - )); - - assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure))); -} - struct MockPreCheckBackend { result: Result<(), PrepareError>, } @@ -1075,9 +968,10 @@ impl ValidationBackend for MockPreCheckBackend { &mut self, _pvf: PvfPrepData, _timeout: Duration, - _encoded_params: Vec, + _pvd: Arc, + _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result { + ) -> Result<(WasmValidationResult, u32), ValidationError> { unreachable!() } @@ -1149,70 +1043,6 @@ fn precheck_works() { executor::block_on(test_fut); } -#[test] -fn precheck_invalid_pvf_blob_compression() { - let relay_parent = [3; 32].into(); - - let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; - let validation_code = - sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1) - .map(ValidationCode) - .unwrap(); - let validation_code_hash = validation_code.hash(); - - let pool = TaskExecutor::new(); - let 
(mut ctx, mut ctx_handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context::< - AllMessages, - _, - >(pool.clone()); - - let (check_fut, check_result) = precheck_pvf( - ctx.sender(), - MockPreCheckBackend::with_hardcoded_result(Ok(())), - relay_parent, - validation_code_hash, - ) - .remote_handle(); - - let test_fut = async move { - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - rp, - RuntimeApiRequest::ValidationCodeByHash( - vch, - tx - ), - )) => { - assert_eq!(vch, validation_code_hash); - assert_eq!(rp, relay_parent); - - let _ = tx.send(Ok(Some(validation_code.clone()))); - } - ); - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx)) - ) => { - tx.send(Ok(1u32.into())).unwrap(); - } - ); - assert_matches!( - ctx_handle.recv().await, - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx)) - ) => { - tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); - } - ); - assert_matches!(check_result.await, PreCheckOutcome::Invalid); - }; - - let test_fut = future::join(test_fut, check_fut); - executor::block_on(test_fut); -} - #[test] fn precheck_properly_classifies_outcomes() { let inner = |prepare_result, precheck_outcome| { @@ -1292,9 +1122,10 @@ impl ValidationBackend for MockHeadsUp { &mut self, _pvf: PvfPrepData, _timeout: Duration, - _encoded_params: Vec, + _pvd: Arc, + _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result { + ) -> Result<(WasmValidationResult, u32), ValidationError> { unreachable!() } diff --git a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs index 97a03e6596d1..342128b7cca2 100644 --- a/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs +++ b/polkadot/node/core/pvf/benches/host_prepare_rococo_runtime.rs @@ -116,7 
+116,7 @@ fn host_prepare_rococo_runtime(c: &mut Criterion) { cfg.prepare_workers_hard_max_num = 1; }) .await, - pvf.clone().code(), + pvf.clone().maybe_compressed_code(), ) }, |result| async move { diff --git a/polkadot/node/core/pvf/common/src/error.rs b/polkadot/node/core/pvf/common/src/error.rs index 7ee05448d3c5..b0cdba9501db 100644 --- a/polkadot/node/core/pvf/common/src/error.rs +++ b/polkadot/node/core/pvf/common/src/error.rs @@ -94,6 +94,10 @@ pub enum PrepareError { #[codec(index = 11)] #[error("prepare: error interfacing with the kernel: {0}")] Kernel(String), + /// Code blob failed to decompress + #[codec(index = 12)] + #[error("prepare: could not decompress code blob: {0}")] + CouldNotDecompressCodeBlob(String), } impl PrepareError { @@ -106,7 +110,11 @@ impl PrepareError { pub fn is_deterministic(&self) -> bool { use PrepareError::*; match self { - Prevalidation(_) | Preparation(_) | JobError(_) | OutOfMemory => true, + Prevalidation(_) | + Preparation(_) | + JobError(_) | + OutOfMemory | + CouldNotDecompressCodeBlob(_) => true, IoErr(_) | JobDied { .. } | CreateTmpFile(_) | diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs index 46862f9f80b6..cff3f3b86e95 100644 --- a/polkadot/node/core/pvf/common/src/execute.rs +++ b/polkadot/node/core/pvf/common/src/execute.rs @@ -35,6 +35,8 @@ pub struct WorkerResponse { pub job_response: JobResponse, /// The amount of CPU time taken by the job. pub duration: Duration, + /// The uncompressed PoV size. + pub pov_size: u32, } /// An error occurred in the worker process. @@ -77,6 +79,8 @@ pub enum JobResponse { RuntimeConstruction(String), /// The candidate is invalid. 
InvalidCandidate(String), + /// PoV decompression failed + PoVDecompressionFailure, } impl JobResponse { diff --git a/polkadot/node/core/pvf/common/src/pvf.rs b/polkadot/node/core/pvf/common/src/pvf.rs index e2ac36a2406a..4019a8d8b0d0 100644 --- a/polkadot/node/core/pvf/common/src/pvf.rs +++ b/polkadot/node/core/pvf/common/src/pvf.rs @@ -26,9 +26,9 @@ use std::{fmt, sync::Arc, time::Duration}; /// Should be cheap to clone. #[derive(Clone, Encode, Decode)] pub struct PvfPrepData { - /// Wasm code (uncompressed) - code: Arc>, - /// Wasm code hash + /// Wasm code (maybe compressed) + maybe_compressed_code: Arc>, + /// Wasm code hash. code_hash: ValidationCodeHash, /// Executor environment parameters for the session for which artifact is prepared executor_params: Arc, @@ -46,20 +46,20 @@ impl PvfPrepData { prep_timeout: Duration, prep_kind: PrepareJobKind, ) -> Self { - let code = Arc::new(code); - let code_hash = sp_crypto_hashing::blake2_256(&code).into(); + let maybe_compressed_code = Arc::new(code); + let code_hash = sp_crypto_hashing::blake2_256(&maybe_compressed_code).into(); let executor_params = Arc::new(executor_params); - Self { code, code_hash, executor_params, prep_timeout, prep_kind } + Self { maybe_compressed_code, code_hash, executor_params, prep_timeout, prep_kind } } - /// Returns validation code hash for the PVF + /// Returns validation code hash pub fn code_hash(&self) -> ValidationCodeHash { self.code_hash } - /// Returns PVF code - pub fn code(&self) -> Arc> { - self.code.clone() + /// Returns PVF code blob + pub fn maybe_compressed_code(&self) -> Arc> { + self.maybe_compressed_code.clone() } /// Returns executor params diff --git a/polkadot/node/core/pvf/execute-worker/Cargo.toml b/polkadot/node/core/pvf/execute-worker/Cargo.toml index f24b66dc4a0e..6ad340d25612 100644 --- a/polkadot/node/core/pvf/execute-worker/Cargo.toml +++ b/polkadot/node/core/pvf/execute-worker/Cargo.toml @@ -19,8 +19,11 @@ libc = { workspace = true } codec = { features = 
["derive"], workspace = true } polkadot-node-core-pvf-common = { workspace = true, default-features = true } +polkadot-node-primitives = { workspace = true, default-features = true } polkadot-parachain-primitives = { workspace = true, default-features = true } polkadot-primitives = { workspace = true, default-features = true } +sp-maybe-compressed-blob = { workspace = true, default-features = true } + [features] builder = [] diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs index 35858ab36cec..4b7c167cc9ec 100644 --- a/polkadot/node/core/pvf/execute-worker/src/lib.rs +++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs @@ -22,6 +22,7 @@ pub use polkadot_node_core_pvf_common::{ error::ExecuteError, executor_interface::execute_artifact, }; +use polkadot_parachain_primitives::primitives::ValidationParams; // NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are // separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`. 
@@ -50,8 +51,9 @@ use polkadot_node_core_pvf_common::{ }, worker_dir, }; +use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT}; use polkadot_parachain_primitives::primitives::ValidationResult; -use polkadot_primitives::ExecutorParams; +use polkadot_primitives::{ExecutorParams, PersistedValidationData}; use std::{ io::{self, Read}, os::{ @@ -85,8 +87,23 @@ fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result { Ok(handshake) } -fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, Duration)> { - let params = framed_recv_blocking(stream)?; +fn recv_request(stream: &mut UnixStream) -> io::Result<(PersistedValidationData, PoV, Duration)> { + let pvd = framed_recv_blocking(stream)?; + let pvd = PersistedValidationData::decode(&mut &pvd[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_request: failed to decode persisted validation data".to_string(), + ) + })?; + + let pov = framed_recv_blocking(stream)?; + let pov = PoV::decode(&mut &pov[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_request: failed to decode PoV".to_string(), + ) + })?; + let execution_timeout = framed_recv_blocking(stream)?; let execution_timeout = Duration::decode(&mut &execution_timeout[..]).map_err(|_| { io::Error::new( @@ -94,7 +111,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, Duration)> { "execute pvf recv_request: failed to decode duration".to_string(), ) })?; - Ok((params, execution_timeout)) + Ok((pvd, pov, execution_timeout)) } /// Sends an error to the host and returns the original error wrapped in `io::Error`. 
@@ -149,7 +166,7 @@ pub fn worker_entrypoint( let execute_thread_stack_size = max_stack_size(&executor_params); loop { - let (params, execution_timeout) = recv_request(&mut stream).map_err(|e| { + let (pvd, pov, execution_timeout) = recv_request(&mut stream).map_err(|e| { map_and_send_err!( e, InternalValidationError::HostCommunication, @@ -197,7 +214,33 @@ pub fn worker_entrypoint( let stream_fd = stream.as_raw_fd(); let compiled_artifact_blob = Arc::new(compiled_artifact_blob); - let params = Arc::new(params); + + let raw_block_data = + match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) { + Ok(data) => data, + Err(_) => { + send_result::( + &mut stream, + Ok(WorkerResponse { + job_response: JobResponse::PoVDecompressionFailure, + duration: Duration::ZERO, + pov_size: 0, + }), + worker_info, + )?; + continue; + }, + }; + + let pov_size = raw_block_data.len() as u32; + + let params = ValidationParams { + parent_head: pvd.parent_head.clone(), + block_data: BlockData(raw_block_data.to_vec()), + relay_parent_number: pvd.relay_parent_number, + relay_parent_storage_root: pvd.relay_parent_storage_root, + }; + let params = Arc::new(params.encode()); cfg_if::cfg_if! { if #[cfg(target_os = "linux")] { @@ -214,6 +257,7 @@ pub fn worker_entrypoint( worker_info, security_status.can_unshare_user_namespace_and_change_root, usage_before, + pov_size, )? } else { // Fall back to using fork. @@ -228,6 +272,7 @@ pub fn worker_entrypoint( execute_thread_stack_size, worker_info, usage_before, + pov_size, )? 
}; } else { @@ -242,6 +287,7 @@ pub fn worker_entrypoint( execute_thread_stack_size, worker_info, usage_before, + pov_size, )?; } } @@ -300,6 +346,7 @@ fn handle_clone( worker_info: &WorkerInfo, have_unshare_newuser: bool, usage_before: Usage, + pov_size: u32, ) -> io::Result> { use polkadot_node_core_pvf_common::worker::security; @@ -329,6 +376,7 @@ fn handle_clone( worker_info, child, usage_before, + pov_size, execution_timeout, ), Err(security::clone::Error::Clone(errno)) => @@ -347,6 +395,7 @@ fn handle_fork( execute_worker_stack_size: usize, worker_info: &WorkerInfo, usage_before: Usage, + pov_size: u32, ) -> io::Result> { // SAFETY: new process is spawned within a single threaded process. This invariant // is enforced by tests. @@ -367,6 +416,7 @@ fn handle_fork( worker_info, child, usage_before, + pov_size, execution_timeout, ), Err(errno) => Ok(Err(internal_error_from_errno("fork", errno))), @@ -513,6 +563,7 @@ fn handle_parent_process( worker_info: &WorkerInfo, job_pid: Pid, usage_before: Usage, + pov_size: u32, timeout: Duration, ) -> io::Result> { // the read end will wait until all write ends have been closed, @@ -578,7 +629,7 @@ fn handle_parent_process( )))); } - Ok(Ok(WorkerResponse { job_response, duration: cpu_tv })) + Ok(Ok(WorkerResponse { job_response, pov_size, duration: cpu_tv })) }, Err(job_error) => { gum::warn!( diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml index 9e0d01fc438b..56235bd82192 100644 --- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml +++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml @@ -23,10 +23,12 @@ nix = { features = ["process", "resource", "sched"], workspace = true } codec = { features = ["derive"], workspace = true } polkadot-node-core-pvf-common = { workspace = true, default-features = true } +polkadot-node-primitives = { workspace = true, default-features = true } polkadot-primitives = { workspace = true, default-features = true } 
sc-executor-common = { workspace = true, default-features = true } sc-executor-wasmtime = { workspace = true, default-features = true } +sp-maybe-compressed-blob = { workspace = true, default-features = true } [target.'cfg(target_os = "linux")'.dependencies] tikv-jemallocator = "0.5.0" diff --git a/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs b/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs index d531c90b64b5..49b30dc33ceb 100644 --- a/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs +++ b/polkadot/node/core/pvf/prepare-worker/benches/prepare_rococo_runtime.rs @@ -24,7 +24,11 @@ use polkadot_primitives::ExecutorParams; use std::time::Duration; fn do_prepare_runtime(pvf: PvfPrepData) { - let blob = match prevalidate(&pvf.code()) { + let maybe_compressed_code = pvf.maybe_compressed_code(); + let raw_validation_code = + sp_maybe_compressed_blob::decompress(&maybe_compressed_code, usize::MAX).unwrap(); + + let blob = match prevalidate(&raw_validation_code) { Err(err) => panic!("{:?}", err), Ok(b) => b, }; diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index ef33d11720eb..82e54b1c9222 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -38,6 +38,7 @@ use polkadot_node_core_pvf_common::{ executor_interface::{prepare, prevalidate}, worker::{pipe2_cloexec, PipeFd, WorkerInfo}, }; +use polkadot_node_primitives::VALIDATION_CODE_BOMB_LIMIT; use codec::{Decode, Encode}; use polkadot_node_core_pvf_common::{ @@ -295,7 +296,12 @@ pub fn worker_entrypoint( } fn prepare_artifact(pvf: PvfPrepData) -> Result { - let blob = match prevalidate(&pvf.code()) { + let maybe_compressed_code = pvf.maybe_compressed_code(); + let raw_validation_code = + sp_maybe_compressed_blob::decompress(&maybe_compressed_code, VALIDATION_CODE_BOMB_LIMIT) + .map_err(|e| 
PrepareError::CouldNotDecompressCodeBlob(e.to_string()))?; + + let blob = match prevalidate(&raw_validation_code) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), Ok(b) => b, }; diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs index 8dc96305eadb..a0634106052d 100644 --- a/polkadot/node/core/pvf/src/error.rs +++ b/polkadot/node/core/pvf/src/error.rs @@ -52,6 +52,9 @@ pub enum InvalidCandidate { /// PVF execution (compilation is not included) took more time than was allotted. #[error("invalid: hard timeout")] HardTimeout, + /// Proof-of-validity failed to decompress correctly + #[error("invalid: PoV failed to decompress")] + PoVDecompressionFailure, } /// Possibly transient issue that may resolve after retries. diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index bb00a5a652d6..173138c12b8f 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -34,12 +34,14 @@ use polkadot_node_core_pvf_common::{ execute::{JobResponse, WorkerError, WorkerResponse}, SecurityStatus, }; -use polkadot_primitives::{ExecutorParams, ExecutorParamsHash}; +use polkadot_node_primitives::PoV; +use polkadot_primitives::{ExecutorParams, ExecutorParamsHash, PersistedValidationData}; use slotmap::HopSlotMap; use std::{ collections::VecDeque, fmt, path::PathBuf, + sync::Arc, time::{Duration, Instant}, }; @@ -68,7 +70,8 @@ pub enum FromQueue { #[derive(Debug)] pub struct PendingExecutionRequest { pub exec_timeout: Duration, - pub params: Vec, + pub pvd: Arc, + pub pov: Arc, pub executor_params: ExecutorParams, pub result_tx: ResultSender, } @@ -76,7 +79,8 @@ pub struct PendingExecutionRequest { struct ExecuteJob { artifact: ArtifactPathId, exec_timeout: Duration, - params: Vec, + pvd: Arc, + pov: Arc, executor_params: ExecutorParams, result_tx: ResultSender, waiting_since: Instant, @@ -293,7 +297,7 @@ async fn 
purge_dead(metrics: &Metrics, workers: &mut Workers) { fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { let ToQueue::Enqueue { artifact, pending_execution_request } = to_queue; - let PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } = + let PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx } = pending_execution_request; gum::debug!( target: LOG_TARGET, @@ -304,7 +308,8 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { let job = ExecuteJob { artifact, exec_timeout, - params, + pvd, + pov, executor_params, result_tx, waiting_since: Instant::now(), @@ -355,12 +360,16 @@ async fn handle_job_finish( let (idle_worker, result, duration, sync_channel) = match worker_result { Ok(WorkerInterfaceResponse { worker_response: - WorkerResponse { job_response: JobResponse::Ok { result_descriptor }, duration }, + WorkerResponse { + job_response: JobResponse::Ok { result_descriptor }, + duration, + pov_size, + }, idle_worker, }) => { // TODO: propagate the soft timeout - (Some(idle_worker), Ok(result_descriptor), Some(duration), None) + (Some(idle_worker), Ok((result_descriptor, pov_size)), Some(duration), None) }, Ok(WorkerInterfaceResponse { worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. }, @@ -371,6 +380,16 @@ async fn handle_job_finish( None, None, ), + Ok(WorkerInterfaceResponse { + worker_response: + WorkerResponse { job_response: JobResponse::PoVDecompressionFailure, .. }, + idle_worker, + }) => ( + Some(idle_worker), + Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)), + None, + None, + ), Ok(WorkerInterfaceResponse { worker_response: WorkerResponse { job_response: JobResponse::RuntimeConstruction(err), .. 
}, @@ -573,7 +592,8 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { idle, job.artifact.clone(), job.exec_timeout, - job.params, + job.pvd, + job.pov, ) .await; QueueEvent::StartWork(worker, result, job.artifact.id, job.result_tx) diff --git a/polkadot/node/core/pvf/src/execute/worker_interface.rs b/polkadot/node/core/pvf/src/execute/worker_interface.rs index d15d7c15426e..77bd6bedd75c 100644 --- a/polkadot/node/core/pvf/src/execute/worker_interface.rs +++ b/polkadot/node/core/pvf/src/execute/worker_interface.rs @@ -32,8 +32,9 @@ use polkadot_node_core_pvf_common::{ execute::{Handshake, WorkerError, WorkerResponse}, worker_dir, SecurityStatus, }; -use polkadot_primitives::ExecutorParams; -use std::{path::Path, time::Duration}; +use polkadot_node_primitives::PoV; +use polkadot_primitives::{ExecutorParams, PersistedValidationData}; +use std::{path::Path, sync::Arc, time::Duration}; use tokio::{io, net::UnixStream}; /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. 
@@ -123,7 +124,8 @@ pub async fn start_work( worker: IdleWorker, artifact: ArtifactPathId, execution_timeout: Duration, - validation_params: Vec, + pvd: Arc, + pov: Arc, ) -> Result { let IdleWorker { mut stream, pid, worker_dir } = worker; @@ -137,18 +139,16 @@ pub async fn start_work( ); with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move { - send_request(&mut stream, &validation_params, execution_timeout).await.map_err( - |error| { - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - validation_code_hash = ?artifact.id.code_hash, - "failed to send an execute request: {}", - error, - ); - Error::InternalError(InternalValidationError::HostCommunication(error.to_string())) - }, - )?; + send_request(&mut stream, pvd, pov, execution_timeout).await.map_err(|error| { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + validation_code_hash = ?artifact.id.code_hash, + "failed to send an execute request: {}", + error, + ); + Error::InternalError(InternalValidationError::HostCommunication(error.to_string())) + })?; // We use a generous timeout here. This is in addition to the one in the child process, in // case the child stalls. 
We have a wall clock timeout here in the host, but a CPU timeout @@ -288,10 +288,12 @@ async fn send_execute_handshake(stream: &mut UnixStream, handshake: Handshake) - async fn send_request( stream: &mut UnixStream, - validation_params: &[u8], + pvd: Arc, + pov: Arc, execution_timeout: Duration, ) -> io::Result<()> { - framed_send(stream, validation_params).await?; + framed_send(stream, &pvd.encode()).await?; + framed_send(stream, &pov.encode()).await?; framed_send(stream, &execution_timeout.encode()).await } diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 462631d33b52..961d0f87c171 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -36,11 +36,14 @@ use polkadot_node_core_pvf_common::{ prepare::PrepareSuccess, pvf::PvfPrepData, }; +use polkadot_node_primitives::PoV; use polkadot_node_subsystem::{SubsystemError, SubsystemResult}; use polkadot_parachain_primitives::primitives::ValidationResult; +use polkadot_primitives::PersistedValidationData; use std::{ collections::HashMap, path::PathBuf, + sync::Arc, time::{Duration, SystemTime}, }; @@ -64,7 +67,7 @@ pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker"; pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10; /// An alias to not spell the type for the oneshot sender for the PVF execution result. -pub(crate) type ResultSender = oneshot::Sender>; +pub(crate) type ResultSender = oneshot::Sender>; /// Transmission end used for sending the PVF preparation result. 
pub(crate) type PrecheckResultSender = oneshot::Sender; @@ -108,7 +111,8 @@ impl ValidationHost { &mut self, pvf: PvfPrepData, exec_timeout: Duration, - params: Vec, + pvd: Arc, + pov: Arc, priority: Priority, result_tx: ResultSender, ) -> Result<(), String> { @@ -116,7 +120,8 @@ impl ValidationHost { .send(ToHost::ExecutePvf(ExecutePvfInputs { pvf, exec_timeout, - params, + pvd, + pov, priority, result_tx, })) @@ -147,7 +152,9 @@ enum ToHost { struct ExecutePvfInputs { pvf: PvfPrepData, exec_timeout: Duration, - params: Vec, + pvd: Arc, + pov: Arc, + // params: Vec, priority: Priority, result_tx: ResultSender, } @@ -539,7 +546,7 @@ async fn handle_execute_pvf( awaiting_prepare: &mut AwaitingPrepare, inputs: ExecutePvfInputs, ) -> Result<(), Fatal> { - let ExecutePvfInputs { pvf, exec_timeout, params, priority, result_tx } = inputs; + let ExecutePvfInputs { pvf, exec_timeout, pvd, pov, priority, result_tx } = inputs; let artifact_id = ArtifactId::from_pvf_prep_data(&pvf); let executor_params = (*pvf.executor_params()).clone(); @@ -558,7 +565,8 @@ async fn handle_execute_pvf( artifact: ArtifactPathId::new(artifact_id, path), pending_execution_request: PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -587,7 +595,8 @@ async fn handle_execute_pvf( artifact_id, PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -598,7 +607,7 @@ async fn handle_execute_pvf( ArtifactState::Preparing { .. 
} => { awaiting_prepare.add( artifact_id, - PendingExecutionRequest { exec_timeout, params, executor_params, result_tx }, + PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx }, ); }, ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { @@ -627,7 +636,8 @@ async fn handle_execute_pvf( artifact_id, PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -648,7 +658,7 @@ async fn handle_execute_pvf( pvf, priority, artifact_id, - PendingExecutionRequest { exec_timeout, params, executor_params, result_tx }, + PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx }, ) .await?; } @@ -770,7 +780,7 @@ async fn handle_prepare_done( // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. let pending_requests = awaiting_prepare.take(&artifact_id); - for PendingExecutionRequest { exec_timeout, params, executor_params, result_tx } in + for PendingExecutionRequest { exec_timeout, pvd, pov, executor_params, result_tx } in pending_requests { if result_tx.is_canceled() { @@ -793,7 +803,8 @@ async fn handle_prepare_done( artifact: ArtifactPathId::new(artifact_id.clone(), &path), pending_execution_request: PendingExecutionRequest { exec_timeout, - params, + pvd, + pov, executor_params, result_tx, }, @@ -967,6 +978,8 @@ pub(crate) mod tests { use assert_matches::assert_matches; use futures::future::BoxFuture; use polkadot_node_core_pvf_common::prepare::PrepareStats; + use polkadot_node_primitives::BlockData; + use sp_core::H256; const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30); @@ -1223,12 +1236,21 @@ pub(crate) mod tests { async fn execute_pvf_requests() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + 
relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov1 = Arc::new(PoV { block_data: BlockData(b"pov1".to_vec()) }); + let pov2 = Arc::new(PoV { block_data: BlockData(b"pov2".to_vec()) }); let (result_tx, result_rx_pvf_1_1) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf1".to_vec(), + pvd.clone(), + pov1.clone(), Priority::Normal, result_tx, ) @@ -1239,7 +1261,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf1".to_vec(), + pvd.clone(), + pov1, Priority::Critical, result_tx, ) @@ -1250,7 +1273,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(2), TEST_EXECUTION_TIMEOUT, - b"pvf2".to_vec(), + pvd, + pov2, Priority::Normal, result_tx, ) @@ -1382,6 +1406,13 @@ pub(crate) mod tests { async fn test_prepare_done() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); // Test mixed cases of receiving execute and precheck requests // for the same PVF. 
@@ -1391,7 +1422,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf2".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx, ) @@ -1438,7 +1470,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(2), TEST_EXECUTION_TIMEOUT, - b"pvf2".to_vec(), + pvd, + pov, Priority::Critical, result_tx, ) @@ -1534,13 +1567,21 @@ pub(crate) mod tests { async fn test_execute_prepare_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); // Submit a execute request that fails. let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx, ) @@ -1570,7 +1611,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_2, ) @@ -1592,7 +1634,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_3, ) @@ -1636,13 +1679,21 @@ pub(crate) mod tests { async fn test_execute_prepare_no_retry() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); // Submit an execute request that fails. 
let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx, ) @@ -1672,7 +1723,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_2, ) @@ -1694,7 +1746,8 @@ pub(crate) mod tests { host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf".to_vec(), + pvd.clone(), + pov.clone(), Priority::Critical, result_tx_3, ) @@ -1755,12 +1808,20 @@ pub(crate) mod tests { async fn cancellation() { let mut test = Builder::default().build(); let mut host = test.host_handle(); + let pvd = Arc::new(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }); + let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); let (result_tx, result_rx) = oneshot::channel(); host.execute_pvf( PvfPrepData::from_discriminator(1), TEST_EXECUTION_TIMEOUT, - b"pvf1".to_vec(), + pvd, + pov, Priority::Normal, result_tx, ) diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs index 455e8c36c88d..c04ada3d4cd0 100644 --- a/polkadot/node/core/pvf/tests/it/adder.rs +++ b/polkadot/node/core/pvf/tests/it/adder.rs @@ -18,35 +18,39 @@ use super::TestHost; use codec::{Decode, Encode}; +use polkadot_node_primitives::PoV; use polkadot_parachain_primitives::primitives::{ - BlockData as GenericBlockData, HeadData as GenericHeadData, RelayChainBlockNumber, - ValidationParams, + BlockData as GenericBlockData, HeadData as GenericHeadData, }; +use polkadot_primitives::PersistedValidationData; +use sp_core::H256; use test_parachain_adder::{hash_state, BlockData, HeadData}; #[tokio::test] async fn execute_good_block_on_parent() { let parent_head = HeadData 
{ number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; - let block_data = BlockData { state: 0, add: 512 }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let host = TestHost::new().await; let ret = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await .unwrap(); - let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap(); + let new_head = HeadData::decode(&mut &ret.0.head_data.0[..]).unwrap(); assert_eq!(new_head.number, 1); assert_eq!(new_head.parent_hash, parent_head.hash()); @@ -63,24 +67,26 @@ async fn execute_good_chain_on_parent() { for (number, add) in (0..10).enumerate() { let parent_head = HeadData { number: number as u64, parent_hash, post_state: hash_state(last_state) }; - let block_data = BlockData { state: last_state, add }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let ret = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: number as RelayChainBlockNumber + 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await .unwrap(); - let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap(); + let new_head = 
HeadData::decode(&mut &ret.0.head_data.0[..]).unwrap(); assert_eq!(new_head.number, number as u64 + 1); assert_eq!(new_head.parent_hash, parent_head.hash()); @@ -94,23 +100,25 @@ async fn execute_good_chain_on_parent() { #[tokio::test] async fn execute_bad_block_on_parent() { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; - let block_data = BlockData { state: 256, // start state is wrong. add: 256, }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let host = TestHost::new().await; let _err = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await @@ -124,21 +132,24 @@ async fn stress_spawn() { async fn execute(host: std::sync::Arc) { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let block_data = BlockData { state: 0, add: 512 }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let ret = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await .unwrap(); - let new_head = HeadData::decode(&mut 
&ret.head_data.0[..]).unwrap(); + let new_head = HeadData::decode(&mut &ret.0.head_data.0[..]).unwrap(); assert_eq!(new_head.number, 1); assert_eq!(new_head.parent_hash, parent_head.hash()); @@ -161,21 +172,24 @@ async fn execute_can_run_serially() { async fn execute(host: std::sync::Arc) { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let block_data = BlockData { state: 0, add: 512 }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; let ret = host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await .unwrap(); - let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap(); + let new_head = HeadData::decode(&mut &ret.0.head_data.0[..]).unwrap(); assert_eq!(new_head.number, 1); assert_eq!(new_head.parent_hash, parent_head.hash()); diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index 9ad486657512..be7197dcc29e 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -17,7 +17,6 @@ //! General PVF host integration tests checking the functionality of the PVF host itself. 
use assert_matches::assert_matches; -use codec::Encode as _; #[cfg(all(feature = "ci-only-tests", target_os = "linux"))] use polkadot_node_core_pvf::SecurityStatus; use polkadot_node_core_pvf::{ @@ -25,10 +24,14 @@ use polkadot_node_core_pvf::{ PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; -use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult}; -use polkadot_primitives::{ExecutorParam, ExecutorParams, PvfExecKind, PvfPrepKind}; +use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT}; +use polkadot_parachain_primitives::primitives::{BlockData, ValidationResult}; +use polkadot_primitives::{ + ExecutorParam, ExecutorParams, PersistedValidationData, PvfExecKind, PvfPrepKind, +}; +use sp_core::H256; -use std::{io::Write, time::Duration}; +use std::{io::Write, sync::Arc, time::Duration}; use tokio::sync::Mutex; mod adder; @@ -80,9 +83,6 @@ impl TestHost { ) -> Result<(), PrepareError> { let (result_tx, result_rx) = futures::channel::oneshot::channel(); - let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024) - .expect("Compression works"); - self.host .lock() .await @@ -103,14 +103,12 @@ impl TestHost { async fn validate_candidate( &self, code: &[u8], - params: ValidationParams, + pvd: PersistedValidationData, + pov: PoV, executor_params: ExecutorParams, - ) -> Result { + ) -> Result<(ValidationResult, u32), ValidationError> { let (result_tx, result_rx) = futures::channel::oneshot::channel(); - let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024) - .expect("Compression works"); - self.host .lock() .await @@ -122,7 +120,8 @@ impl TestHost { PrepareJobKind::Compilation, ), TEST_EXECUTION_TIMEOUT, - params.encode(), + Arc::new(pvd), + Arc::new(pov), polkadot_node_core_pvf::Priority::Normal, result_tx, ) @@ -159,19 +158,17 @@ async fn prepare_job_terminates_on_timeout() { #[tokio::test] 
async fn execute_job_terminates_on_timeout() { let host = TestHost::new().await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; let start = std::time::Instant::now(); let result = host - .validate_candidate( - test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, - Default::default(), - ) + .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) .await; match result { @@ -189,24 +186,23 @@ async fn execute_job_terminates_on_timeout() { async fn ensure_parallel_execution() { // Run some jobs that do not complete, thus timing out. let host = TestHost::new().await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; let execute_pvf_future_1 = host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd.clone(), + pov.clone(), Default::default(), ); let execute_pvf_future_2 = host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ); @@ -237,6 +233,13 @@ async fn execute_queue_doesnt_stall_if_workers_died() { cfg.execute_workers_max_num = 5; }) .await; + let pvd = PersistedValidationData { + parent_head: 
Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; // Here we spawn 8 validation jobs for the `halt` PVF and share those between 5 workers. The // first five jobs should timeout and the workers killed. For the next 3 jobs a new batch of @@ -245,12 +248,8 @@ async fn execute_queue_doesnt_stall_if_workers_died() { futures::future::join_all((0u8..=8).map(|_| { host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd.clone(), + pov.clone(), Default::default(), ) })) @@ -275,6 +274,13 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() { cfg.execute_workers_max_num = 2; }) .await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; let executor_params_1 = ExecutorParams::default(); let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]); @@ -288,12 +294,8 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() { futures::future::join_all((0u8..6).map(|i| { host.validate_candidate( test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd.clone(), + pov.clone(), match i % 3 { 0 => executor_params_1.clone(), _ => executor_params_2.clone(), @@ -324,6 +326,13 @@ async fn execute_queue_doesnt_stall_with_varying_executor_params() { async fn deleting_prepared_artifact_does_not_dispute() { let host = TestHost::new().await; let cache_dir = 
host.cache_dir.path(); + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) @@ -347,16 +356,7 @@ async fn deleting_prepared_artifact_does_not_dispute() { // Try to validate, artifact should get recreated. let result = host - .validate_candidate( - test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, - Default::default(), - ) + .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) .await; assert_matches!(result, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout))); @@ -367,6 +367,13 @@ async fn deleting_prepared_artifact_does_not_dispute() { async fn corrupted_prepared_artifact_does_not_dispute() { let host = TestHost::new().await; let cache_dir = host.cache_dir.path(); + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; let _stats = host .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) @@ -400,16 +407,7 @@ async fn corrupted_prepared_artifact_does_not_dispute() { // Try to validate, artifact should get removed because of the corruption. 
let result = host - .validate_candidate( - test_parachain_halt::wasm_binary_unwrap(), - ValidationParams { - block_data: BlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, - Default::default(), - ) + .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) .await; assert_matches!( @@ -652,3 +650,65 @@ async fn artifact_does_reprepare_on_meaningful_exec_parameter_change() { assert_eq!(cache_dir_contents.len(), 3); // new artifact has been added } + +// Checks that we cannot prepare oversized compressed code +#[tokio::test] +async fn invalid_compressed_code_fails_prechecking() { + let host = TestHost::new().await; + let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; + let validation_code = + sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap(); + + let res = host.precheck_pvf(&validation_code, Default::default()).await; + + assert_matches!(res, Err(PrepareError::CouldNotDecompressCodeBlob(_))); +} + +// Checks that we cannot validate with oversized compressed code +#[tokio::test] +async fn invalid_compressed_code_fails_validation() { + let host = TestHost::new().await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; + + let raw_code = vec![2u8; VALIDATION_CODE_BOMB_LIMIT + 1]; + let validation_code = + sp_maybe_compressed_blob::compress(&raw_code, VALIDATION_CODE_BOMB_LIMIT + 1).unwrap(); + + let result = host.validate_candidate(&validation_code, pvd, pov, Default::default()).await; + + assert_matches!( + result, + Err(ValidationError::Preparation(PrepareError::CouldNotDecompressCodeBlob(_))) + ); +} + +// Checks that we cannot validate with an oversized PoV +#[tokio::test] +async fn 
invalid_compressed_pov_fails_validation() { + let host = TestHost::new().await; + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let raw_block_data = vec![1u8; POV_BOMB_LIMIT + 1]; + let block_data = + sp_maybe_compressed_blob::compress(&raw_block_data, POV_BOMB_LIMIT + 1).unwrap(); + let pov = PoV { block_data: BlockData(block_data) }; + + let result = host + .validate_candidate(test_parachain_halt::wasm_binary_unwrap(), pvd, pov, Default::default()) + .await; + + assert_matches!( + result, + Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)) + ); +} diff --git a/polkadot/node/core/pvf/tests/it/process.rs b/polkadot/node/core/pvf/tests/it/process.rs index b8fd2cdce0ce..b3023c8a45c3 100644 --- a/polkadot/node/core/pvf/tests/it/process.rs +++ b/polkadot/node/core/pvf/tests/it/process.rs @@ -23,11 +23,14 @@ use codec::Encode; use polkadot_node_core_pvf::{ InvalidCandidate, PossiblyInvalidError, PrepareError, ValidationError, }; +use polkadot_node_primitives::PoV; use polkadot_parachain_primitives::primitives::{ - BlockData as GenericBlockData, HeadData as GenericHeadData, ValidationParams, + BlockData as GenericBlockData, HeadData as GenericHeadData, }; +use polkadot_primitives::PersistedValidationData; use procfs::process; use rusty_fork::rusty_fork_test; +use sp_core::H256; use std::{future::Future, sync::Arc, time::Duration}; use test_parachain_adder::{hash_state, BlockData, HeadData}; @@ -125,15 +128,18 @@ rusty_fork_test! 
{ test_wrapper(|host, _sid| async move { let parent_head = HeadData { number: 0, parent_hash: [0; 32], post_state: hash_state(0) }; let block_data = BlockData { state: 0, add: 512 }; + let pvd = PersistedValidationData { + parent_head: GenericHeadData(parent_head.encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(block_data.encode()) }; host .validate_candidate( test_parachain_adder::wasm_binary_unwrap(), - ValidationParams { - parent_head: GenericHeadData(parent_head.encode()), - block_data: GenericBlockData(block_data.encode()), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ) .await @@ -166,17 +172,20 @@ rusty_fork_test! { // Prepare the artifact ahead of time. let binary = test_parachain_halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); + let pvd = PersistedValidationData { + parent_head: GenericHeadData(HeadData::default().encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(Vec::new()) }; let (result, _) = futures::join!( // Choose an job that would normally take the entire timeout. host.validate_candidate( binary, - ValidationParams { - block_data: GenericBlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ), // Send a stop signal to pause the worker. @@ -218,17 +227,20 @@ rusty_fork_test! { // Prepare the artifact ahead of time. 
let binary = test_parachain_halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); + let pvd = PersistedValidationData { + parent_head: GenericHeadData(HeadData::default().encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(Vec::new()) }; let (result, _) = futures::join!( // Choose an job that would normally take the entire timeout. host.validate_candidate( binary, - ValidationParams { - block_data: GenericBlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ), // Run a future that kills the job while it's running. @@ -274,17 +286,20 @@ rusty_fork_test! { // Prepare the artifact ahead of time. let binary = test_parachain_halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); + let pvd = PersistedValidationData { + parent_head: GenericHeadData(HeadData::default().encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(Vec::new()) }; let (result, _) = futures::join!( // Choose a job that would normally take the entire timeout. host.validate_candidate( binary, - ValidationParams { - block_data: GenericBlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ), // Run a future that kills the job while it's running. @@ -342,17 +357,20 @@ rusty_fork_test! { // Prepare the artifact ahead of time. 
let binary = test_parachain_halt::wasm_binary_unwrap(); host.precheck_pvf(binary, Default::default()).await.unwrap(); + let pvd = PersistedValidationData { + parent_head: GenericHeadData(HeadData::default().encode()), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: GenericBlockData(Vec::new()) }; let _ = futures::join!( // Choose a job that would normally take the entire timeout. host.validate_candidate( binary, - ValidationParams { - block_data: GenericBlockData(Vec::new()), - parent_head: Default::default(), - relay_parent_number: 1, - relay_parent_storage_root: Default::default(), - }, + pvd, + pov, Default::default(), ), // Run a future that tests the thread count while the worker is running. From 477c9aea43acb53b58f18fda6177c0babc97fa9e Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Thu, 1 Aug 2024 18:00:05 +0200 Subject: [PATCH 02/12] Move metrics to PVF host --- .../node/core/candidate-validation/src/lib.rs | 12 ++--- .../core/candidate-validation/src/metrics.rs | 44 ------------------- .../core/candidate-validation/src/tests.rs | 22 +++++----- polkadot/node/core/pvf/src/execute/queue.rs | 18 ++++++-- polkadot/node/core/pvf/src/host.rs | 2 +- polkadot/node/core/pvf/src/metrics.rs | 44 +++++++++++++++++++ polkadot/node/core/pvf/tests/it/adder.rs | 8 ++-- polkadot/node/core/pvf/tests/it/main.rs | 2 +- 8 files changed, 79 insertions(+), 73 deletions(-) diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 2b54e77274b6..3d4cdb73d69f 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -853,8 +853,6 @@ async fn validate_candidate_exhaustive( // metrics.observe_code_size(raw_validation_code.len()); // TODO!!! 
- metrics.observe_pov_size(pov.block_data.0.len(), true); - let persisted_validation_data = Arc::new(persisted_validation_data); let result = match exec_kind { // Retry is disabled to reduce the chance of nondeterministic blocks getting backed and @@ -935,13 +933,11 @@ async fn validate_candidate_exhaustive( ); Err(ValidationFailed(e.to_string())) }, - Ok((res, pov_size)) => + Ok(res) => if res.head_data.hash() != candidate_receipt.descriptor.para_head { gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)"); Ok(ValidationResult::Invalid(InvalidCandidate::ParaHeadHashMismatch)) } else { - metrics.observe_pov_size(pov_size as usize, false); - let outputs = CandidateCommitments { head_data: res.head_data, upward_messages: res.upward_messages, @@ -978,7 +974,7 @@ trait ValidationBackend { pov: Arc, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result<(WasmValidationResult, u32), ValidationError>; + ) -> Result; /// Tries executing a PVF. Will retry once if an error is encountered that may have /// been transient. @@ -998,7 +994,7 @@ trait ValidationBackend { retry_delay: Duration, // The priority for the preparation job. prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result<(WasmValidationResult, u32), ValidationError> { + ) -> Result { let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare); // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. let pvf = PvfPrepData::from_code( @@ -1123,7 +1119,7 @@ impl ValidationBackend for ValidationHost { pov: Arc, // The priority for the preparation job. 
prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result<(WasmValidationResult, u32), ValidationError> { + ) -> Result { let (tx, rx) = oneshot::channel(); if let Err(err) = self.execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, tx).await { diff --git a/polkadot/node/core/candidate-validation/src/metrics.rs b/polkadot/node/core/candidate-validation/src/metrics.rs index 28fc957ddb1a..1459907aa599 100644 --- a/polkadot/node/core/candidate-validation/src/metrics.rs +++ b/polkadot/node/core/candidate-validation/src/metrics.rs @@ -23,8 +23,6 @@ pub(crate) struct MetricsInner { pub(crate) validate_from_chain_state: prometheus::Histogram, pub(crate) validate_from_exhaustive: prometheus::Histogram, pub(crate) validate_candidate_exhaustive: prometheus::Histogram, - pub(crate) pov_size: prometheus::HistogramVec, - pub(crate) code_size: prometheus::Histogram, } /// Candidate validation metrics. @@ -70,21 +68,6 @@ impl Metrics { .as_ref() .map(|metrics| metrics.validate_candidate_exhaustive.start_timer()) } - - pub fn observe_code_size(&self, code_size: usize) { - if let Some(metrics) = &self.0 { - metrics.code_size.observe(code_size as f64); - } - } - - pub fn observe_pov_size(&self, pov_size: usize, compressed: bool) { - if let Some(metrics) = &self.0 { - metrics - .pov_size - .with_label_values(&[if compressed { "true" } else { "false" }]) - .observe(pov_size as f64); - } - } } impl metrics::Metrics for Metrics { @@ -121,33 +104,6 @@ impl metrics::Metrics for Metrics { ))?, registry, )?, - pov_size: prometheus::register( - prometheus::HistogramVec::new( - prometheus::HistogramOpts::new( - "polkadot_parachain_candidate_validation_pov_size", - "The compressed and decompressed size of the proof of validity of a candidate", - ) - .buckets( - prometheus::exponential_buckets(16384.0, 2.0, 10) - .expect("arguments are always valid; qed"), - ), - &["compressed"], - )?, - registry, - )?, - code_size: prometheus::register( - prometheus::Histogram::with_opts( - 
prometheus::HistogramOpts::new( - "polkadot_parachain_candidate_validation_code_size", - "The size of the decompressed WASM validation blob used for checking a candidate", - ) - .buckets( - prometheus::exponential_buckets(16384.0, 2.0, 10) - .expect("arguments are always valid; qed"), - ), - )?, - registry, - )?, }; Ok(Metrics(Some(metrics))) } diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index 1bd2a39fd47f..d62f3ae0c76c 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -364,17 +364,17 @@ fn check_does_not_match() { } struct MockValidateCandidateBackend { - result_list: Vec>, + result_list: Vec>, num_times_called: usize, } impl MockValidateCandidateBackend { - fn with_hardcoded_result(result: Result<(WasmValidationResult, u32), ValidationError>) -> Self { + fn with_hardcoded_result(result: Result) -> Self { Self { result_list: vec![result], num_times_called: 0 } } fn with_hardcoded_result_list( - result_list: Vec>, + result_list: Vec>, ) -> Self { Self { result_list, num_times_called: 0 } } @@ -389,7 +389,7 @@ impl ValidationBackend for MockValidateCandidateBackend { _pvd: Arc, _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result<(WasmValidationResult, u32), ValidationError> { + ) -> Result { // This is expected to panic if called more times than expected, indicating an error in the // test. 
let result = self.result_list[self.num_times_called].clone(); @@ -455,7 +455,7 @@ fn candidate_validation_ok_is_ok() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok((validation_result, 1))), + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data.clone(), validation_code, candidate_receipt, @@ -587,7 +587,7 @@ fn candidate_validation_one_ambiguous_error_is_valid() { let v = executor::block_on(validate_candidate_exhaustive( MockValidateCandidateBackend::with_hardcoded_result_list(vec![ Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)), - Ok((validation_result, 1)), + Ok(validation_result), ]), validation_data.clone(), validation_code, @@ -714,7 +714,7 @@ fn candidate_validation_dont_retry_panic_errors() { fn candidate_validation_retry_on_error_helper( exec_kind: PvfExecKind, - mock_errors: Vec>, + mock_errors: Vec>, ) -> Result { let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; @@ -830,7 +830,7 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() { }; let result = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok((validation_result, 1))), + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data, validation_code, candidate_receipt, @@ -939,7 +939,7 @@ fn compressed_code_works() { let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; let v = executor::block_on(validate_candidate_exhaustive( - MockValidateCandidateBackend::with_hardcoded_result(Ok((validation_result, 1))), + MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)), validation_data, validation_code, candidate_receipt, @@ -971,7 +971,7 @@ impl ValidationBackend for 
MockPreCheckBackend { _pvd: Arc, _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result<(WasmValidationResult, u32), ValidationError> { + ) -> Result { unreachable!() } @@ -1125,7 +1125,7 @@ impl ValidationBackend for MockHeadsUp { _pvd: Arc, _pov: Arc, _prepare_priority: polkadot_node_core_pvf::Priority, - ) -> Result<(WasmValidationResult, u32), ValidationError> { + ) -> Result { unreachable!() } diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index 173138c12b8f..11031bf1074a 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -304,6 +304,7 @@ fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) { validation_code_hash = ?artifact.id.code_hash, "enqueueing an artifact for execution", ); + queue.metrics.observe_pov_size(pov.block_data.0.len(), true); queue.metrics.execute_enqueued(); let job = ExecuteJob { artifact, @@ -357,7 +358,7 @@ async fn handle_job_finish( artifact_id: ArtifactId, result_tx: ResultSender, ) { - let (idle_worker, result, duration, sync_channel) = match worker_result { + let (idle_worker, result, duration, sync_channel, pov_size) = match worker_result { Ok(WorkerInterfaceResponse { worker_response: WorkerResponse { @@ -369,7 +370,7 @@ async fn handle_job_finish( }) => { // TODO: propagate the soft timeout - (Some(idle_worker), Ok((result_descriptor, pov_size)), Some(duration), None) + (Some(idle_worker), Ok(result_descriptor), Some(duration), None, Some(pov_size)) }, Ok(WorkerInterfaceResponse { worker_response: WorkerResponse { job_response: JobResponse::InvalidCandidate(err), .. 
}, @@ -379,6 +380,7 @@ async fn handle_job_finish( Err(ValidationError::Invalid(InvalidCandidate::WorkerReportedInvalid(err))), None, None, + None, ), Ok(WorkerInterfaceResponse { worker_response: @@ -389,6 +391,7 @@ async fn handle_job_finish( Err(ValidationError::Invalid(InvalidCandidate::PoVDecompressionFailure)), None, None, + None, ), Ok(WorkerInterfaceResponse { worker_response: @@ -412,39 +415,46 @@ async fn handle_job_finish( ))), None, Some(result_rx), + None, ) }, Err(WorkerInterfaceError::InternalError(err)) | Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) => - (None, Err(ValidationError::Internal(err)), None, None), + (None, Err(ValidationError::Internal(err)), None, None, None), // Either the worker or the job timed out. Kill the worker in either case. Treated as // definitely-invalid, because if we timed out, there's no time left for a retry. Err(WorkerInterfaceError::HardTimeout) | Err(WorkerInterfaceError::WorkerError(WorkerError::JobTimedOut)) => - (None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None), + (None, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout)), None, None, None), // "Maybe invalid" errors (will retry). Err(WorkerInterfaceError::CommunicationErr(_err)) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)), None, None, + None, ), Err(WorkerInterfaceError::WorkerError(WorkerError::JobDied { err, .. 
})) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))), None, None, + None, ), Err(WorkerInterfaceError::WorkerError(WorkerError::JobError(err))) => ( None, Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err.to_string()))), None, None, + None, ), }; queue.metrics.execute_finished(); + if let Some(pov_size) = pov_size { + queue.metrics.observe_pov_size(pov_size as usize, false) + } if let Err(ref err) = result { gum::warn!( target: LOG_TARGET, diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 961d0f87c171..96c686a2b145 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -67,7 +67,7 @@ pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker"; pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10; /// An alias to not spell the type for the oneshot sender for the PVF execution result. -pub(crate) type ResultSender = oneshot::Sender>; +pub(crate) type ResultSender = oneshot::Sender>; /// Transmission end used for sending the PVF preparation result. 
pub(crate) type PrecheckResultSender = oneshot::Sender; diff --git a/polkadot/node/core/pvf/src/metrics.rs b/polkadot/node/core/pvf/src/metrics.rs index bc8d300037fe..9404ba543eaa 100644 --- a/polkadot/node/core/pvf/src/metrics.rs +++ b/polkadot/node/core/pvf/src/metrics.rs @@ -105,6 +105,21 @@ impl Metrics { .observe((memory_stats.peak_tracked_alloc / 1024) as f64); } } + + pub(crate) fn observe_code_size(&self, code_size: usize) { + if let Some(metrics) = &self.0 { + metrics.code_size.observe(code_size as f64); + } + } + + pub(crate) fn observe_pov_size(&self, pov_size: usize, compressed: bool) { + if let Some(metrics) = &self.0 { + metrics + .pov_size + .with_label_values(&[if compressed { "true" } else { "false" }]) + .observe(pov_size as f64); + } + } } #[derive(Clone)] @@ -129,6 +144,8 @@ struct MetricsInner { preparation_max_resident: prometheus::Histogram, // Peak allocation value, tracked by tracking-allocator preparation_peak_tracked_allocation: prometheus::Histogram, + pov_size: prometheus::HistogramVec, + code_size: prometheus::Histogram, } impl metrics::Metrics for Metrics { @@ -323,6 +340,33 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + pov_size: prometheus::register( + prometheus::HistogramVec::new( + prometheus::HistogramOpts::new( + "polkadot_parachain_candidate_validation_pov_size", + "The compressed and decompressed size of the proof of validity of a candidate", + ) + .buckets( + prometheus::exponential_buckets(16384.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), + &["compressed"], + )?, + registry, + )?, + code_size: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_parachain_candidate_validation_code_size", + "The size of the decompressed WASM validation blob used for checking a candidate", + ) + .buckets( + prometheus::exponential_buckets(16384.0, 2.0, 10) + .expect("arguments are always valid; qed"), + ), + )?, + registry, + )?, }; Ok(Metrics(Some(inner))) } 
diff --git a/polkadot/node/core/pvf/tests/it/adder.rs b/polkadot/node/core/pvf/tests/it/adder.rs index c04ada3d4cd0..1a95a28fe077 100644 --- a/polkadot/node/core/pvf/tests/it/adder.rs +++ b/polkadot/node/core/pvf/tests/it/adder.rs @@ -50,7 +50,7 @@ async fn execute_good_block_on_parent() { .await .unwrap(); - let new_head = HeadData::decode(&mut &ret.0.head_data.0[..]).unwrap(); + let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap(); assert_eq!(new_head.number, 1); assert_eq!(new_head.parent_hash, parent_head.hash()); @@ -86,7 +86,7 @@ async fn execute_good_chain_on_parent() { .await .unwrap(); - let new_head = HeadData::decode(&mut &ret.0.head_data.0[..]).unwrap(); + let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap(); assert_eq!(new_head.number, number as u64 + 1); assert_eq!(new_head.parent_hash, parent_head.hash()); @@ -149,7 +149,7 @@ async fn stress_spawn() { .await .unwrap(); - let new_head = HeadData::decode(&mut &ret.0.head_data.0[..]).unwrap(); + let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap(); assert_eq!(new_head.number, 1); assert_eq!(new_head.parent_hash, parent_head.hash()); @@ -189,7 +189,7 @@ async fn execute_can_run_serially() { .await .unwrap(); - let new_head = HeadData::decode(&mut &ret.0.head_data.0[..]).unwrap(); + let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap(); assert_eq!(new_head.number, 1); assert_eq!(new_head.parent_hash, parent_head.hash()); diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index be7197dcc29e..a4a085318957 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -106,7 +106,7 @@ impl TestHost { pvd: PersistedValidationData, pov: PoV, executor_params: ExecutorParams, - ) -> Result<(ValidationResult, u32), ValidationError> { + ) -> Result { let (result_tx, result_rx) = futures::channel::oneshot::channel(); self.host From 05d80913061ff410b298a9423fe6fbe0ec544ccf Mon 
Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Thu, 1 Aug 2024 18:32:14 +0200 Subject: [PATCH 03/12] Observe Wasm code size --- .../node/core/candidate-validation/src/lib.rs | 2 - polkadot/node/core/pvf/common/src/prepare.rs | 2 + .../node/core/pvf/prepare-worker/src/lib.rs | 38 +++++++++++++++---- .../core/pvf/src/prepare/worker_interface.rs | 10 ++++- 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 3d4cdb73d69f..103d29e8d269 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -851,8 +851,6 @@ async fn validate_candidate_exhaustive( return Ok(ValidationResult::Invalid(e)) } - // metrics.observe_code_size(raw_validation_code.len()); // TODO!!! - let persisted_validation_data = Arc::new(persisted_validation_data); let result = match exec_kind { // Retry is disabled to reduce the chance of nondeterministic blocks getting backed and diff --git a/polkadot/node/core/pvf/common/src/prepare.rs b/polkadot/node/core/pvf/common/src/prepare.rs index 81e165a7b8a4..4cd1beb30991 100644 --- a/polkadot/node/core/pvf/common/src/prepare.rs +++ b/polkadot/node/core/pvf/common/src/prepare.rs @@ -44,6 +44,8 @@ pub struct PrepareStats { pub cpu_time_elapsed: std::time::Duration, /// The observed memory statistics for the preparation job. pub memory_stats: MemoryStats, + /// The decompressed Wasm code length observed during the preparation. 
+ pub observed_wasm_code_len: u32, } /// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index 82e54b1c9222..09b8640da4c5 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -106,6 +106,12 @@ impl AsRef<[u8]> for CompiledArtifact { } } +#[derive(Encode, Decode)] +pub struct PrepareOutcome { + pub compiled_artifact: CompiledArtifact, + pub observed_wasm_code_len: u32, +} + /// Get a worker request. fn recv_request(stream: &mut UnixStream) -> io::Result { let pvf = framed_recv_blocking(stream)?; @@ -295,11 +301,12 @@ pub fn worker_entrypoint( ); } -fn prepare_artifact(pvf: PvfPrepData) -> Result { +fn prepare_artifact(pvf: PvfPrepData) -> Result { let maybe_compressed_code = pvf.maybe_compressed_code(); let raw_validation_code = sp_maybe_compressed_blob::decompress(&maybe_compressed_code, VALIDATION_CODE_BOMB_LIMIT) .map_err(|e| PrepareError::CouldNotDecompressCodeBlob(e.to_string()))?; + let observed_wasm_code_len = raw_validation_code.len() as u32; let blob = match prevalidate(&raw_validation_code) { Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), @@ -307,7 +314,10 @@ fn prepare_artifact(pvf: PvfPrepData) -> Result }; match prepare(blob, &pvf.executor_params()) { - Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), + Ok(compiled_artifact) => Ok(PrepareOutcome { + compiled_artifact: CompiledArtifact::new(compiled_artifact), + observed_wasm_code_len, + }), Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), } } @@ -328,6 +338,7 @@ fn runtime_construction_check( struct JobResponse { artifact: CompiledArtifact, memory_stats: MemoryStats, + observed_wasm_code_len: u32, } #[cfg(target_os = "linux")] @@ -519,7 +530,10 @@ fn handle_child_process( // anyway. 
if let PrepareJobKind::Prechecking = prepare_job_kind { result = result.and_then(|output| { - runtime_construction_check(output.0.as_ref(), &executor_params)?; + runtime_construction_check( + output.0.compiled_artifact.as_ref(), + &executor_params, + )?; Ok(output) }); } @@ -559,9 +573,9 @@ fn handle_child_process( Ok(ok) => { cfg_if::cfg_if! { if #[cfg(target_os = "linux")] { - let (artifact, max_rss) = ok; + let (PrepareOutcome { compiled_artifact, observed_wasm_code_len }, max_rss) = ok; } else { - let artifact = ok; + let PrepareOutcome { compiled_artifact, observed_wasm_code_len } = ok; } } @@ -580,7 +594,11 @@ fn handle_child_process( peak_tracked_alloc: if peak_alloc > 0 { peak_alloc as u64 } else { 0u64 }, }; - Ok(JobResponse { artifact, memory_stats }) + Ok(JobResponse { + artifact: compiled_artifact, + observed_wasm_code_len, + memory_stats, + }) }, } }, @@ -671,7 +689,7 @@ fn handle_parent_process( match result { Err(err) => Err(err), - Ok(JobResponse { artifact, memory_stats }) => { + Ok(JobResponse { artifact, memory_stats, observed_wasm_code_len }) => { // The exit status should have been zero if no error occurred. 
if exit_status != 0 { return Err(PrepareError::JobError(format!( @@ -702,7 +720,11 @@ fn handle_parent_process( let checksum = blake3::hash(&artifact.as_ref()).to_hex().to_string(); Ok(PrepareWorkerSuccess { checksum, - stats: PrepareStats { memory_stats, cpu_time_elapsed: cpu_tv }, + stats: PrepareStats { + memory_stats, + cpu_time_elapsed: cpu_tv, + observed_wasm_code_len, + }, }) }, } diff --git a/polkadot/node/core/pvf/src/prepare/worker_interface.rs b/polkadot/node/core/pvf/src/prepare/worker_interface.rs index 22ee93319d84..d29d2717c4b6 100644 --- a/polkadot/node/core/pvf/src/prepare/worker_interface.rs +++ b/polkadot/node/core/pvf/src/prepare/worker_interface.rs @@ -211,7 +211,7 @@ async fn handle_response( // https://github.com/paritytech/polkadot-sdk/issues/2399 let PrepareWorkerSuccess { checksum: _, - stats: PrepareStats { cpu_time_elapsed, memory_stats }, + stats: PrepareStats { cpu_time_elapsed, memory_stats, observed_wasm_code_len }, } = match result.clone() { Ok(result) => result, // Timed out on the child. This should already be logged by the child. @@ -221,6 +221,8 @@ async fn handle_response( Err(err) => return Outcome::Concluded { worker, result: Err(err) }, }; + metrics.observe_code_size(observed_wasm_code_len as usize); + if cpu_time_elapsed > preparation_timeout { // The job didn't complete within the timeout. 
gum::warn!( @@ -267,7 +269,11 @@ async fn handle_response( result: Ok(PrepareSuccess { path: artifact_path, size, - stats: PrepareStats { cpu_time_elapsed, memory_stats: memory_stats.clone() }, + stats: PrepareStats { + cpu_time_elapsed, + memory_stats: memory_stats.clone(), + observed_wasm_code_len, + }, }), }, Err(err) => { From 105af417491a375975e03951b69b3a7ce817bcb5 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Thu, 1 Aug 2024 18:54:56 +0200 Subject: [PATCH 04/12] clippy --- polkadot/node/core/candidate-validation/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/core/candidate-validation/src/tests.rs b/polkadot/node/core/candidate-validation/src/tests.rs index d62f3ae0c76c..55282fdf4ee1 100644 --- a/polkadot/node/core/candidate-validation/src/tests.rs +++ b/polkadot/node/core/candidate-validation/src/tests.rs @@ -20,7 +20,7 @@ use super::*; use assert_matches::assert_matches; use futures::executor; use polkadot_node_core_pvf::PrepareError; -use polkadot_node_primitives::{BlockData, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT}; +use polkadot_node_primitives::{BlockData, VALIDATION_CODE_BOMB_LIMIT}; use polkadot_node_subsystem::messages::AllMessages; use polkadot_node_subsystem_util::reexports::SubsystemContext; use polkadot_overseer::ActivatedLeaf; From c29f58b3ebe3e05177d4ab29daa49736ca144587 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Thu, 1 Aug 2024 19:54:23 +0200 Subject: [PATCH 05/12] Fix non-Linux build --- polkadot/node/core/pvf/prepare-worker/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index 09b8640da4c5..795404174c74 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -517,11 +517,11 @@ fn handle_child_process( "prepare worker", move || { #[allow(unused_mut)] - let mut result = 
prepare_artifact(pvf); + let mut result = prepare_artifact(pvf).map(|o| (o,)); // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. #[cfg(target_os = "linux")] - let mut result = result.map(|artifact| (artifact, get_max_rss_thread())); + let mut result = result.map(|outcome| (outcome.0, get_max_rss_thread())); // If we are pre-checking, check for runtime construction errors. // From 91f3e7a0e67e88ca183a059341419840954f1db3 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Thu, 1 Aug 2024 20:03:03 +0200 Subject: [PATCH 06/12] More non-Linux fixes --- polkadot/node/core/pvf/prepare-worker/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index 795404174c74..f8ebb6effcec 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -575,7 +575,7 @@ fn handle_child_process( if #[cfg(target_os = "linux")] { let (PrepareOutcome { compiled_artifact, observed_wasm_code_len }, max_rss) = ok; } else { - let PrepareOutcome { compiled_artifact, observed_wasm_code_len } = ok; + let (PrepareOutcome { compiled_artifact, observed_wasm_code_len },) = ok; } } From 7eb22b1e587991fe09ab0bd29154dfbf0c100608 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Thu, 1 Aug 2024 20:03:26 +0200 Subject: [PATCH 07/12] Move candidate validation back to non-blocking pool --- polkadot/node/overseer/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 4e13d5eda76f..baaff9c7c9f6 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -466,7 +466,7 @@ pub async fn forward_events>(client: Arc

, mut hand message_capacity=2048, )] pub struct Overseer { - #[subsystem(blocking, CandidateValidationMessage, sends: [ + #[subsystem(CandidateValidationMessage, sends: [ RuntimeApiMessage, ])] candidate_validation: CandidateValidation, From d65af1e31ee3db3c2547571eebafe50590e5ae46 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Fri, 2 Aug 2024 10:49:19 +0200 Subject: [PATCH 08/12] Bootstrap prdoc --- prdoc/pr_5142.prdoc | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 prdoc/pr_5142.prdoc diff --git a/prdoc/pr_5142.prdoc b/prdoc/pr_5142.prdoc new file mode 100644 index 000000000000..da1e58e4cb77 --- /dev/null +++ b/prdoc/pr_5142.prdoc @@ -0,0 +1,16 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: "Move decompression to worker processes" + +doc: + - audience: Node Dev + description: | + Candidate validation subsystem performed the PVF code decompression as well as the PoV + decompression itself which might affect the subsystem main loop performance and required + it to run on the blocking threadpool. This change moves the decompression to PVF host + workers running synchronously in separate processed. 
+ +crates: + - name: polkadot-node-core-candidate-validation + bump: patch From 1f630e1589e6b7394853aeaa17f007471ef05d7e Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Fri, 2 Aug 2024 11:23:37 +0200 Subject: [PATCH 09/12] Update prdoc --- prdoc/pr_5142.prdoc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/prdoc/pr_5142.prdoc b/prdoc/pr_5142.prdoc index da1e58e4cb77..4ea51fb03956 100644 --- a/prdoc/pr_5142.prdoc +++ b/prdoc/pr_5142.prdoc @@ -14,3 +14,13 @@ doc: crates: - name: polkadot-node-core-candidate-validation bump: patch + - name: polkadot-overseer + bump: patch + - name: polkadot-node-core-pvf + bump: patch + - name: polkadot-node-core-pvf-common + bump: patch + - name: polkadot-node-core-pvf-execute-worker + bump: patch + - name: polkadot-node-core-pvf-prepare-worker + bump: patch From c45a43be1eab3ae71c647d33c4c30516871631df Mon Sep 17 00:00:00 2001 From: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com> Date: Fri, 2 Aug 2024 11:37:00 +0200 Subject: [PATCH 10/12] Update prdoc --- prdoc/pr_5142.prdoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/prdoc/pr_5142.prdoc b/prdoc/pr_5142.prdoc index 4ea51fb03956..c4478941c0b7 100644 --- a/prdoc/pr_5142.prdoc +++ b/prdoc/pr_5142.prdoc @@ -17,9 +17,9 @@ crates: - name: polkadot-overseer bump: patch - name: polkadot-node-core-pvf - bump: patch + bump: major - name: polkadot-node-core-pvf-common - bump: patch + bump: major - name: polkadot-node-core-pvf-execute-worker bump: patch - name: polkadot-node-core-pvf-prepare-worker From 9452c07604bc4d54e7d7e827639dbc51cdf29ef5 Mon Sep 17 00:00:00 2001 From: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com> Date: Thu, 8 Aug 2024 13:24:08 +0200 Subject: [PATCH 11/12] Remove commented out code --- polkadot/node/core/pvf/src/host.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 96c686a2b145..44a4cba2fbf8 100644 --- 
a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -154,7 +154,6 @@ struct ExecutePvfInputs { exec_timeout: Duration, pvd: Arc, pov: Arc, - // params: Vec, priority: Priority, result_tx: ResultSender, } From 1877bba1e97a00f269c837bdbaa1e5de16043718 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Fri, 9 Aug 2024 17:29:02 +0200 Subject: [PATCH 12/12] Address discussions --- polkadot/node/core/pvf/src/metrics.rs | 2 ++ prdoc/pr_5142.prdoc | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/polkadot/node/core/pvf/src/metrics.rs b/polkadot/node/core/pvf/src/metrics.rs index 9404ba543eaa..c59cab464180 100644 --- a/polkadot/node/core/pvf/src/metrics.rs +++ b/polkadot/node/core/pvf/src/metrics.rs @@ -340,6 +340,8 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + // The following metrics were moved here from the candidate validation subsystem. + // Names are kept to avoid breaking dashboards and stuff. pov_size: prometheus::register( prometheus::HistogramVec::new( prometheus::HistogramOpts::new( diff --git a/prdoc/pr_5142.prdoc b/prdoc/pr_5142.prdoc index c4478941c0b7..4083e5bf53cd 100644 --- a/prdoc/pr_5142.prdoc +++ b/prdoc/pr_5142.prdoc @@ -9,7 +9,7 @@ doc: Candidate validation subsystem performed the PVF code decompression as well as the PoV decompression itself which might affect the subsystem main loop performance and required it to run on the blocking threadpool. This change moves the decompression to PVF host - workers running synchronously in separate processed. + workers running synchronously in separate processes. crates: - name: polkadot-node-core-candidate-validation