diff --git a/Cargo.lock b/Cargo.lock index 588012408d76f..56a294a5eae36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15483,7 +15483,6 @@ dependencies = [ name = "polkadot-node-core-pvf-prepare-worker" version = "7.0.0" dependencies = [ - "blake3", "cfg-if", "criterion", "libc", diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 0c2bcc281daf7..c4913b6b31b63 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -965,6 +965,8 @@ async fn validate_candidate_exhaustive( Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))), Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(err))) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))), + Err(ValidationError::PossiblyInvalid(err @ PossiblyInvalidError::CorruptedArtifact)) => + Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err.to_string()))), Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))) => Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!( @@ -1148,7 +1150,7 @@ trait ValidationBackend { let mut num_death_retries_left = 1; let mut num_job_error_retries_left = 1; let mut num_internal_retries_left = 1; - let mut num_runtime_construction_retries_left = 1; + let mut num_execution_error_retries_left = 1; loop { // Stop retrying if we exceeded the timeout. if total_time_start.elapsed() + retry_delay > exec_timeout { @@ -1168,9 +1170,10 @@ trait ValidationBackend { break_if_no_retries_left!(num_internal_retries_left), Err(ValidationError::PossiblyInvalid( - PossiblyInvalidError::RuntimeConstruction(_), + PossiblyInvalidError::RuntimeConstruction(_) | + PossiblyInvalidError::CorruptedArtifact, )) => { - break_if_no_retries_left!(num_runtime_construction_retries_left); + break_if_no_retries_left!(num_execution_error_retries_left); self.precheck_pvf(pvf.clone()).await?; // In this case the error is deterministic // And a retry forces the ValidationBackend diff --git a/polkadot/node/core/pvf/common/src/execute.rs b/polkadot/node/core/pvf/common/src/execute.rs index cff3f3b86e952..cf5ae19897a17 100644 --- a/polkadot/node/core/pvf/common/src/execute.rs +++ b/polkadot/node/core/pvf/common/src/execute.rs @@ -81,6 +81,8 @@ pub enum JobResponse { InvalidCandidate(String), /// PoV decompression failed PoVDecompressionFailure, + /// The artifact is corrupted, re-prepare the artifact and try again. + CorruptedArtifact, } impl JobResponse { diff --git a/polkadot/node/core/pvf/common/src/lib.rs b/polkadot/node/core/pvf/common/src/lib.rs index 30d0aa4452811..49544e5d42a53 100644 --- a/polkadot/node/core/pvf/common/src/lib.rs +++ b/polkadot/node/core/pvf/common/src/lib.rs @@ -33,6 +33,7 @@ pub use sp_tracing; const LOG_TARGET: &str = "parachain::pvf-common"; use codec::{Decode, Encode}; +use sp_core::H256; use std::{ io::{self, Read, Write}, mem, @@ -88,6 +89,15 @@ pub fn framed_recv_blocking(r: &mut (impl Read + Unpin)) -> io::Result> Ok(buf) } +#[derive(Debug, Default, Clone, Copy, Encode, Decode, PartialEq, Eq)] +#[repr(transparent)] +pub struct ArtifactChecksum(H256); + +/// Compute the checksum of the given artifact. +pub fn compute_checksum(data: &[u8]) -> ArtifactChecksum { + ArtifactChecksum(H256::from_slice(&sp_crypto_hashing::twox_256(data))) +} + #[cfg(all(test, not(feature = "test-utils")))] mod tests { use super::*; diff --git a/polkadot/node/core/pvf/common/src/prepare.rs b/polkadot/node/core/pvf/common/src/prepare.rs index 4cd1beb309918..c56daf81fcb5f 100644 --- a/polkadot/node/core/pvf/common/src/prepare.rs +++ b/polkadot/node/core/pvf/common/src/prepare.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use crate::ArtifactChecksum; use codec::{Decode, Encode}; use std::path::PathBuf; @@ -21,7 +22,7 @@ use std::path::PathBuf; #[derive(Debug, Clone, Default, Encode, Decode)] pub struct PrepareWorkerSuccess { /// Checksum of the compiled PVF. - pub checksum: String, + pub checksum: ArtifactChecksum, /// Stats of the current preparation run. pub stats: PrepareStats, } @@ -29,6 +30,8 @@ pub struct PrepareWorkerSuccess { /// Result of PVF preparation if successful. #[derive(Debug, Clone, Default)] pub struct PrepareSuccess { + /// Checksum of the compiled PVF. + pub checksum: ArtifactChecksum, /// Canonical path to the compiled artifact. pub path: PathBuf, /// Size in bytes diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs index 4b7c167cc9ec3..88a3af985bb5d 100644 --- a/polkadot/node/core/pvf/execute-worker/src/lib.rs +++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs @@ -39,6 +39,7 @@ use nix::{ unistd::{ForkResult, Pid}, }; use polkadot_node_core_pvf_common::{ + compute_checksum, error::InternalValidationError, execute::{Handshake, JobError, JobResponse, JobResult, WorkerError, WorkerResponse}, executor_interface::params_to_wasmtime_semantics, @@ -49,7 +50,7 @@ use polkadot_node_core_pvf_common::{ thread::{self, WaitOutcome}, PipeFd, WorkerInfo, WorkerKind, }, - worker_dir, + worker_dir, ArtifactChecksum, }; use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT}; use polkadot_parachain_primitives::primitives::ValidationResult; @@ -87,7 +88,9 @@ fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result { Ok(handshake) } -fn recv_request(stream: &mut UnixStream) -> io::Result<(PersistedValidationData, PoV, Duration)> { +fn recv_request( + stream: &mut UnixStream, +) -> io::Result<(PersistedValidationData, PoV, Duration, ArtifactChecksum)> { let pvd = framed_recv_blocking(stream)?; let pvd = PersistedValidationData::decode(&mut &pvd[..]).map_err(|_| { io::Error::new( @@ -111,7 +114,17 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(PersistedValidationData, "execute pvf recv_request: failed to decode duration".to_string(), ) })?; - Ok((pvd, pov, execution_timeout)) + + let artifact_checksum = framed_recv_blocking(stream)?; + let artifact_checksum = + ArtifactChecksum::decode(&mut &artifact_checksum[..]).map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "execute pvf recv_request: failed to decode artifact checksum".to_string(), + ) + })?; + + Ok((pvd, pov, execution_timeout, artifact_checksum)) } /// Sends an error to the host and returns the original error wrapped in `io::Error`. @@ -166,14 +179,15 @@ pub fn worker_entrypoint( let execute_thread_stack_size = max_stack_size(&executor_params); loop { - let (pvd, pov, execution_timeout) = recv_request(&mut stream).map_err(|e| { - map_and_send_err!( - e, - InternalValidationError::HostCommunication, - &mut stream, - worker_info - ) - })?; + let (pvd, pov, execution_timeout, artifact_checksum) = recv_request(&mut stream) + .map_err(|e| { + map_and_send_err!( + e, + InternalValidationError::HostCommunication, + &mut stream, + worker_info + ) + })?; gum::debug!( target: LOG_TARGET, ?worker_info, @@ -192,6 +206,19 @@ pub fn worker_entrypoint( ) })?; + if artifact_checksum != compute_checksum(&compiled_artifact_blob) { + send_result::( + &mut stream, + Ok(WorkerResponse { + job_response: JobResponse::CorruptedArtifact, + duration: Duration::ZERO, + pov_size: 0, + }), + worker_info, + )?; + continue; + } + let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec().map_err(|e| { map_and_send_err!( e, diff --git a/polkadot/node/core/pvf/prepare-worker/Cargo.toml b/polkadot/node/core/pvf/prepare-worker/Cargo.toml index c54d48540388c..9921cf73aea05 100644 --- a/polkadot/node/core/pvf/prepare-worker/Cargo.toml +++ b/polkadot/node/core/pvf/prepare-worker/Cargo.toml @@ -16,7 +16,6 @@ name = "prepare_rococo_runtime" harness = false [dependencies] -blake3 = { workspace = true } cfg-if = { workspace = true } gum = { workspace = true, default-features = true } libc = { workspace = true } diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index 533abe414a0a9..cfe86200624a8 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -26,6 +26,7 @@ const LOG_TARGET: &str = "parachain::pvf-prepare-worker"; use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread}; #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop}; +use codec::{Decode, Encode}; use nix::{ errno::Errno, sys::{ @@ -35,22 +36,17 @@ use nix::{ unistd::{ForkResult, Pid}, }; use polkadot_node_core_pvf_common::{ - executor_interface::{prepare, prevalidate}, - worker::{pipe2_cloexec, PipeFd, WorkerInfo}, -}; - -use codec::{Decode, Encode}; -use polkadot_node_core_pvf_common::{ + compute_checksum, error::{PrepareError, PrepareWorkerResult}, - executor_interface::create_runtime_from_artifact_bytes, + executor_interface::{create_runtime_from_artifact_bytes, prepare, prevalidate}, framed_recv_blocking, framed_send_blocking, prepare::{MemoryStats, PrepareJobKind, PrepareStats, PrepareWorkerSuccess}, pvf::PvfPrepData, worker::{ - cpu_time_monitor_loop, get_total_cpu_usage, recv_child_response, run_worker, send_result, - stringify_errno, stringify_panic_payload, + cpu_time_monitor_loop, get_total_cpu_usage, pipe2_cloexec, recv_child_response, run_worker, + send_result, stringify_errno, stringify_panic_payload, thread::{self, spawn_worker_thread, WaitOutcome}, - WorkerKind, + PipeFd, WorkerInfo, WorkerKind, }, worker_dir, ProcessTime, }; @@ -718,7 +714,7 @@ fn handle_parent_process( return Err(PrepareError::IoErr(err.to_string())) }; - let checksum = blake3::hash(&artifact.as_ref()).to_hex().to_string(); + let checksum = compute_checksum(&artifact.as_ref()); Ok(PrepareWorkerSuccess { checksum, stats: PrepareStats { diff --git a/polkadot/node/core/pvf/src/artifacts.rs b/polkadot/node/core/pvf/src/artifacts.rs index 1126a0c90c8ce..3530de0993978 100644 --- a/polkadot/node/core/pvf/src/artifacts.rs +++ b/polkadot/node/core/pvf/src/artifacts.rs @@ -56,7 +56,7 @@ use crate::{host::PrecheckResultSender, worker_interface::WORKER_DIR_PREFIX}; use always_assert::always; -use polkadot_node_core_pvf_common::{error::PrepareError, pvf::PvfPrepData}; +use polkadot_node_core_pvf_common::{error::PrepareError, pvf::PvfPrepData, ArtifactChecksum}; use polkadot_parachain_primitives::primitives::ValidationCodeHash; use polkadot_primitives::ExecutorParamsPrepHash; use std::{ @@ -120,11 +120,12 @@ impl ArtifactId { pub struct ArtifactPathId { pub(crate) id: ArtifactId, pub(crate) path: PathBuf, + pub(crate) checksum: ArtifactChecksum, } impl ArtifactPathId { - pub(crate) fn new(artifact_id: ArtifactId, path: &Path) -> Self { - Self { id: artifact_id, path: path.to_owned() } + pub(crate) fn new(artifact_id: ArtifactId, path: &Path, checksum: ArtifactChecksum) -> Self { + Self { id: artifact_id, path: path.to_owned(), checksum } } } @@ -135,6 +136,8 @@ pub enum ArtifactState { /// That means that the artifact should be accessible through the path obtained by the artifact /// id (unless, it was removed externally). Prepared { + /// The checksum of the compiled artifact. + checksum: ArtifactChecksum, /// The path of the compiled artifact. path: PathBuf, /// The time when the artifact was last needed. @@ -212,6 +215,21 @@ impl Artifacts { self.inner.keys().cloned().collect() } + #[cfg(feature = "test-utils")] + pub fn replace_artifact_checksum( + &mut self, + checksum: ArtifactChecksum, + new_checksum: ArtifactChecksum, + ) { + for artifact in self.inner.values_mut() { + if let ArtifactState::Prepared { checksum: c, .. } = artifact { + if *c == checksum { + *c = new_checksum; + } + } + } + } + /// Create an empty table and the cache directory on-disk if it doesn't exist. pub async fn new(cache_path: &Path) -> Self { // Make sure that the cache path directory and all its parents are created. @@ -265,13 +283,14 @@ impl Artifacts { &mut self, artifact_id: ArtifactId, path: PathBuf, + checksum: ArtifactChecksum, last_time_needed: SystemTime, size: u64, ) { // See the precondition. always!(self .inner - .insert(artifact_id, ArtifactState::Prepared { path, last_time_needed, size }) + .insert(artifact_id, ArtifactState::Prepared { path, checksum, last_time_needed, size }) .is_none()); } @@ -376,18 +395,21 @@ mod tests { artifacts.insert_prepared( artifact_id1.clone(), path1.clone(), + Default::default(), mock_now - Duration::from_secs(5), 1024, ); artifacts.insert_prepared( artifact_id2.clone(), path2.clone(), + Default::default(), mock_now - Duration::from_secs(10), 1024, ); artifacts.insert_prepared( artifact_id3.clone(), path3.clone(), + Default::default(), mock_now - Duration::from_secs(15), 1024, ); @@ -421,18 +443,21 @@ mod tests { artifacts.insert_prepared( artifact_id1.clone(), path1.clone(), + Default::default(), mock_now - Duration::from_secs(5), 1024, ); artifacts.insert_prepared( artifact_id2.clone(), path2.clone(), + Default::default(), mock_now - Duration::from_secs(10), 1024, ); artifacts.insert_prepared( artifact_id3.clone(), path3.clone(), + Default::default(), mock_now - Duration::from_secs(15), 1024, ); diff --git a/polkadot/node/core/pvf/src/error.rs b/polkadot/node/core/pvf/src/error.rs index e68ba595ef5a1..f00956ab336be 100644 --- a/polkadot/node/core/pvf/src/error.rs +++ b/polkadot/node/core/pvf/src/error.rs @@ -98,6 +98,9 @@ pub enum PossiblyInvalidError { /// Possibly related to local issues or dirty node update. May be retried with re-preparation. #[error("possibly invalid: runtime construction: {0}")] RuntimeConstruction(String), + /// The artifact is corrupted, re-prepare the artifact and try again. + #[error("possibly invalid: artifact is corrupted")] + CorruptedArtifact, } impl From for ValidationError { diff --git a/polkadot/node/core/pvf/src/execute/queue.rs b/polkadot/node/core/pvf/src/execute/queue.rs index 69355b8fd55df..3464e0d990859 100644 --- a/polkadot/node/core/pvf/src/execute/queue.rs +++ b/polkadot/node/core/pvf/src/execute/queue.rs @@ -503,6 +503,26 @@ async fn handle_job_finish( None, ) }, + Ok(WorkerInterfaceResponse { + worker_response: WorkerResponse { job_response: JobResponse::CorruptedArtifact, .. }, + idle_worker, + }) => { + let (tx, rx) = oneshot::channel(); + queue + .from_queue_tx + .unbounded_send(FromQueue::RemoveArtifact { + artifact: artifact_id.clone(), + reply_to: tx, + }) + .expect("from execute queue receiver is listened by the host; qed"); + ( + Some(idle_worker), + Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::CorruptedArtifact)), + None, + Some(rx), + None, + ) + }, Err(WorkerInterfaceError::InternalError(err)) | Err(WorkerInterfaceError::WorkerError(WorkerError::InternalError(err))) => @@ -906,7 +926,11 @@ mod tests { }); let pov = Arc::new(PoV { block_data: BlockData(b"pov".to_vec()) }); ExecuteJob { - artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() }, + artifact: ArtifactPathId { + id: artifact_id(0), + path: PathBuf::new(), + checksum: Default::default(), + }, exec_timeout: Duration::from_secs(10), exec_kind: PvfExecKind::Approval, pvd, @@ -1070,7 +1094,11 @@ mod tests { let mut result_rxs = vec![]; let (result_tx, _result_rx) = oneshot::channel(); let relevant_job = ExecuteJob { - artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() }, + artifact: ArtifactPathId { + id: artifact_id(0), + path: PathBuf::new(), + checksum: Default::default(), + }, exec_timeout: Duration::from_secs(1), exec_kind: PvfExecKind::Backing(relevant_relay_parent), pvd: Arc::new(PersistedValidationData::default()), @@ -1083,7 +1111,11 @@ mod tests { for _ in 0..10 { let (result_tx, result_rx) = oneshot::channel(); let expired_job = ExecuteJob { - artifact: ArtifactPathId { id: artifact_id(0), path: PathBuf::new() }, + artifact: ArtifactPathId { + id: artifact_id(0), + path: PathBuf::new(), + checksum: Default::default(), + }, exec_timeout: Duration::from_secs(1), exec_kind: PvfExecKind::Backing(old_relay_parent), pvd: Arc::new(PersistedValidationData::default()), diff --git a/polkadot/node/core/pvf/src/execute/worker_interface.rs b/polkadot/node/core/pvf/src/execute/worker_interface.rs index 77bd6bedd75c7..545d7c6f568bb 100644 --- a/polkadot/node/core/pvf/src/execute/worker_interface.rs +++ b/polkadot/node/core/pvf/src/execute/worker_interface.rs @@ -30,7 +30,7 @@ use futures_timer::Delay; use polkadot_node_core_pvf_common::{ error::InternalValidationError, execute::{Handshake, WorkerError, WorkerResponse}, - worker_dir, SecurityStatus, + worker_dir, ArtifactChecksum, SecurityStatus, }; use polkadot_node_primitives::PoV; use polkadot_primitives::{ExecutorParams, PersistedValidationData}; @@ -139,16 +139,18 @@ pub async fn start_work( ); with_worker_dir_setup(worker_dir, pid, &artifact.path, |worker_dir| async move { - send_request(&mut stream, pvd, pov, execution_timeout).await.map_err(|error| { - gum::warn!( - target: LOG_TARGET, - worker_pid = %pid, - validation_code_hash = ?artifact.id.code_hash, - "failed to send an execute request: {}", - error, - ); - Error::InternalError(InternalValidationError::HostCommunication(error.to_string())) - })?; + send_request(&mut stream, pvd, pov, execution_timeout, artifact.checksum) + .await + .map_err(|error| { + gum::warn!( + target: LOG_TARGET, + worker_pid = %pid, + validation_code_hash = ?artifact.id.code_hash, + "failed to send an execute request: {}", + error, + ); + Error::InternalError(InternalValidationError::HostCommunication(error.to_string())) + })?; // We use a generous timeout here. This is in addition to the one in the child process, in // case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout @@ -291,10 +293,12 @@ async fn send_request( pvd: Arc, pov: Arc, execution_timeout: Duration, + artifact_checksum: ArtifactChecksum, ) -> io::Result<()> { framed_send(stream, &pvd.encode()).await?; framed_send(stream, &pov.encode()).await?; - framed_send(stream, &execution_timeout.encode()).await + framed_send(stream, &execution_timeout.encode()).await?; + framed_send(stream, &artifact_checksum.encode()).await } async fn recv_result(stream: &mut UnixStream) -> io::Result> { diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index 8252904095b3f..1ab060c69e43b 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -31,6 +31,8 @@ use futures::{ channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt, }; +#[cfg(feature = "test-utils")] +use polkadot_node_core_pvf_common::ArtifactChecksum; use polkadot_node_core_pvf_common::{ error::{PrecheckResult, PrepareError}, prepare::PrepareSuccess, @@ -159,13 +161,41 @@ impl ValidationHost { .await .map_err(|_| "the inner loop hung up".to_string()) } + + /// Replace the artifact checksum with a new one. + /// + /// Only for test purposes to imitate a corruption of the artifact on disk. + #[cfg(feature = "test-utils")] + pub async fn replace_artifact_checksum( + &mut self, + checksum: ArtifactChecksum, + new_checksum: ArtifactChecksum, + ) -> Result<(), String> { + self.to_host_tx + .send(ToHost::ReplaceArtifactChecksum { checksum, new_checksum }) + .await + .map_err(|_| "the inner loop hung up".to_string()) + } } enum ToHost { - PrecheckPvf { pvf: PvfPrepData, result_tx: PrecheckResultSender }, + PrecheckPvf { + pvf: PvfPrepData, + result_tx: PrecheckResultSender, + }, ExecutePvf(ExecutePvfInputs), - HeadsUp { active_pvfs: Vec }, - UpdateActiveLeaves { update: ActiveLeavesUpdate, ancestors: Vec }, + HeadsUp { + active_pvfs: Vec, + }, + UpdateActiveLeaves { + update: ActiveLeavesUpdate, + ancestors: Vec, + }, + #[cfg(feature = "test-utils")] + ReplaceArtifactChecksum { + checksum: ArtifactChecksum, + new_checksum: ArtifactChecksum, + }, } struct ExecutePvfInputs { @@ -507,6 +537,10 @@ async fn handle_to_host( handle_heads_up(artifacts, prepare_queue, active_pvfs).await?, ToHost::UpdateActiveLeaves { update, ancestors } => handle_update_active_leaves(execute_queue, update, ancestors).await?, + #[cfg(feature = "test-utils")] + ToHost::ReplaceArtifactChecksum { checksum, new_checksum } => { + artifacts.replace_artifact_checksum(checksum, new_checksum); + }, } Ok(()) @@ -573,7 +607,7 @@ async fn handle_execute_pvf( if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { - ArtifactState::Prepared { ref path, last_time_needed, .. } => { + ArtifactState::Prepared { ref path, checksum, last_time_needed, .. } => { let file_metadata = std::fs::metadata(path); if file_metadata.is_ok() { @@ -583,7 +617,7 @@ async fn handle_execute_pvf( send_execute( execute_queue, execute::ToQueue::Enqueue { - artifact: ArtifactPathId::new(artifact_id, path), + artifact: ArtifactPathId::new(artifact_id, path, *checksum), pending_execution_request: PendingExecutionRequest { exec_timeout, pvd, @@ -827,8 +861,8 @@ async fn handle_prepare_done( continue } - let path = match &result { - Ok(success) => success.path.clone(), + let (path, checksum) = match &result { + Ok(success) => (success.path.clone(), success.checksum), Err(error) => { let _ = result_tx.send(Err(ValidationError::from(error.clone()))); continue @@ -838,7 +872,7 @@ async fn handle_prepare_done( send_execute( execute_queue, execute::ToQueue::Enqueue { - artifact: ArtifactPathId::new(artifact_id.clone(), &path), + artifact: ArtifactPathId::new(artifact_id.clone(), &path, checksum), pending_execution_request: PendingExecutionRequest { exec_timeout, pvd, @@ -853,8 +887,8 @@ async fn handle_prepare_done( } *state = match result { - Ok(PrepareSuccess { path, size, .. }) => - ArtifactState::Prepared { path, last_time_needed: SystemTime::now(), size }, + Ok(PrepareSuccess { checksum, path, size, .. }) => + ArtifactState::Prepared { checksum, path, last_time_needed: SystemTime::now(), size }, Err(error) => { let last_time_failed = SystemTime::now(); let num_failures = *num_failures + 1; @@ -1239,8 +1273,20 @@ pub(crate) mod tests { builder.cleanup_config = ArtifactsCleanupConfig::new(1024, Duration::from_secs(0)); let path1 = generate_artifact_path(cache_path); let path2 = generate_artifact_path(cache_path); - builder.artifacts.insert_prepared(artifact_id(1), path1.clone(), mock_now, 1024); - builder.artifacts.insert_prepared(artifact_id(2), path2.clone(), mock_now, 1024); + builder.artifacts.insert_prepared( + artifact_id(1), + path1.clone(), + Default::default(), + mock_now, + 1024, + ); + builder.artifacts.insert_prepared( + artifact_id(2), + path2.clone(), + Default::default(), + mock_now, + 1024, + ); let mut test = builder.build(); let mut host = test.host_handle(); diff --git a/polkadot/node/core/pvf/src/prepare/worker_interface.rs b/polkadot/node/core/pvf/src/prepare/worker_interface.rs index 718416e8be76a..04044ab677c3e 100644 --- a/polkadot/node/core/pvf/src/prepare/worker_interface.rs +++ b/polkadot/node/core/pvf/src/prepare/worker_interface.rs @@ -209,7 +209,7 @@ async fn handle_response( // TODO: Add `checksum` to `ArtifactPathId`. See: // https://github.com/paritytech/polkadot-sdk/issues/2399 let PrepareWorkerSuccess { - checksum: _, + checksum, stats: PrepareStats { cpu_time_elapsed, memory_stats, observed_wasm_code_len }, } = match result.clone() { Ok(result) => result, @@ -266,6 +266,7 @@ async fn handle_response( Ok(()) => Outcome::Concluded { worker, result: Ok(PrepareSuccess { + checksum, path: artifact_path, size, stats: PrepareStats { diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index 9b24e7b64c89c..46dd98d92bc7d 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -24,6 +24,7 @@ use polkadot_node_core_pvf::{ PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, }; +use polkadot_node_core_pvf_common::{compute_checksum, ArtifactChecksum}; use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT}; use polkadot_node_subsystem::messages::PvfExecKind; use polkadot_parachain_primitives::primitives::{BlockData, ValidationResult}; @@ -138,6 +139,19 @@ impl TestHost { result_rx.await.unwrap() } + async fn replace_artifact_checksum( + &self, + checksum: ArtifactChecksum, + new_checksum: ArtifactChecksum, + ) { + self.host + .lock() + .await + .replace_artifact_checksum(checksum, new_checksum) + .await + .unwrap(); + } + #[cfg(all(feature = "ci-only-tests", target_os = "linux"))] async fn security_status(&self) -> SecurityStatus { self.host.lock().await.security_status.clone() @@ -386,9 +400,10 @@ async fn deleting_prepared_artifact_does_not_dispute() { assert_matches!(result, Err(ValidationError::Invalid(InvalidCandidate::HardTimeout))); } -// Test that corruption of a prepared artifact does not lead to a dispute when we try to execute it. +// Test that corruption of a prepared artifact due to disk issues does not lead to a dispute when we +// try to execute it. #[tokio::test] -async fn corrupted_prepared_artifact_does_not_dispute() { +async fn corrupted_on_disk_prepared_artifact_does_not_dispute() { let host = TestHost::new().await; let cache_dir = host.cache_dir.path(); let pvd = PersistedValidationData { @@ -429,6 +444,95 @@ async fn corrupted_prepared_artifact_does_not_dispute() { assert!(artifact_path.path().exists()); + // Try to validate, artifact should get removed because of the corruption. + let result = host + .validate_candidate( + test_parachain_halt::wasm_binary_unwrap(), + pvd, + pov, + Default::default(), + H256::default(), + ) + .await; + + assert_matches!( + result, + Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::CorruptedArtifact)) + ); + + // because of CorruptedArtifact we may retry + host.precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) + .await + .unwrap(); + + // The actual artifact removal is done concurrently + // with sending of the result of the execution + // it is not a problem for further re-preparation as + // artifact filenames are random + for _ in 1..5 { + if !artifact_path.path().exists() { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + + assert!( + !artifact_path.path().exists(), + "the corrupted artifact ({}) should be deleted by the host", + artifact_path.path().display() + ); +} + +// Test that corruption of a prepared artifact does not lead to a dispute when we try to execute it. +#[tokio::test] +async fn corrupted_prepared_artifact_does_not_dispute() { + let host = TestHost::new().await; + let cache_dir = host.cache_dir.path(); + let pvd = PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 1u32, + relay_parent_storage_root: H256::default(), + max_pov_size: 4096 * 1024, + }; + let pov = PoV { block_data: BlockData(Vec::new()) }; + + let _stats = host + .precheck_pvf(test_parachain_halt::wasm_binary_unwrap(), Default::default()) + .await + .unwrap(); + + // Manually corrupting the prepared artifact from disk. The in-memory artifacts table won't + // change. + let (artifact_path, checksum, new_checksum) = { + // Get the artifact path (asserting it exists). + let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect(); + // Should contain the artifact and the worker dir. + assert_eq!(cache_dir.len(), 2); + let mut artifact_path = cache_dir.pop().unwrap().unwrap(); + if artifact_path.path().is_dir() { + artifact_path = cache_dir.pop().unwrap().unwrap(); + } + + let checksum = + compute_checksum(&std::fs::read(artifact_path.path()).expect("artifact exists")); + let new_artifact = b"corrupted wasm"; + let new_checksum = compute_checksum(new_artifact); + + // Corrupt the artifact. + let mut f = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .open(artifact_path.path()) + .unwrap(); + f.write_all(new_artifact).unwrap(); + f.flush().unwrap(); + (artifact_path, checksum, new_checksum) + }; + + assert!(artifact_path.path().exists()); + + host.replace_artifact_checksum(checksum, new_checksum).await; + // Try to validate, artifact should get removed because of the corruption. let result = host .validate_candidate( diff --git a/prdoc/pr_8833.prdoc b/prdoc/pr_8833.prdoc new file mode 100644 index 0000000000000..e3d82d324bc58 --- /dev/null +++ b/prdoc/pr_8833.prdoc @@ -0,0 +1,15 @@ +title: 'Check artifact integrity before execution' +doc: +- audience: Node Dev + description: In case of a corrupted artifact, we can find it out before execution and re-prepare the artifact. +crates: +- name: polkadot-node-core-pvf-common + bump: major +- name: polkadot-node-core-pvf + bump: major +- name: polkadot-node-core-pvf-prepare-worker + bump: minor +- name: polkadot-node-core-pvf-execute-worker + bump: minor +- name: polkadot-node-core-candidate-validation + bump: minor