Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,8 @@ async fn validate_candidate_exhaustive(
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
Err(ValidationError::PossiblyInvalid(err @ PossiblyInvalidError::CorruptedArtifact)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err.to_string()))),

Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
Expand Down Expand Up @@ -1148,7 +1150,7 @@ trait ValidationBackend {
let mut num_death_retries_left = 1;
let mut num_job_error_retries_left = 1;
let mut num_internal_retries_left = 1;
let mut num_runtime_construction_retries_left = 1;
let mut num_execution_error_retries_left = 1;
loop {
// Stop retrying if we exceeded the timeout.
if total_time_start.elapsed() + retry_delay > exec_timeout {
Expand All @@ -1168,9 +1170,10 @@ trait ValidationBackend {
break_if_no_retries_left!(num_internal_retries_left),

Err(ValidationError::PossiblyInvalid(
PossiblyInvalidError::RuntimeConstruction(_),
PossiblyInvalidError::RuntimeConstruction(_) |
PossiblyInvalidError::CorruptedArtifact,
Comment thread
eskimor marked this conversation as resolved.
)) => {
break_if_no_retries_left!(num_runtime_construction_retries_left);
break_if_no_retries_left!(num_execution_error_retries_left);
self.precheck_pvf(pvf.clone()).await?;
// In this case the error is deterministic
// And a retry forces the ValidationBackend
Expand Down
2 changes: 2 additions & 0 deletions polkadot/node/core/pvf/common/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub enum JobResponse {
InvalidCandidate(String),
/// PoV decompression failed
PoVDecompressionFailure,
/// The artifact is corrupted, re-prepare the artifact and try again.
CorruptedArtifact,
}

impl JobResponse {
Expand Down
10 changes: 10 additions & 0 deletions polkadot/node/core/pvf/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub use sp_tracing;
const LOG_TARGET: &str = "parachain::pvf-common";

use codec::{Decode, Encode};
use sp_core::H256;
use std::{
io::{self, Read, Write},
mem,
Expand Down Expand Up @@ -88,6 +89,15 @@ pub fn framed_recv_blocking(r: &mut (impl Read + Unpin)) -> io::Result<Vec<u8>>
Ok(buf)
}

/// Checksum of a compiled PVF artifact, used to detect on-disk corruption
/// before execution.
#[derive(Debug, Default, Clone, Copy, Encode, Decode, PartialEq, Eq)]
#[repr(transparent)]
pub struct ArtifactChecksum(H256);

/// Compute the checksum of the given artifact bytes.
///
/// Uses the non-cryptographic twox-256 hash: fast, and sufficient for
/// detecting accidental corruption (not adversarial tampering).
pub fn compute_checksum(data: &[u8]) -> ArtifactChecksum {
    let digest = sp_crypto_hashing::twox_256(data);
    ArtifactChecksum(H256::from_slice(&digest))
}

#[cfg(all(test, not(feature = "test-utils")))]
mod tests {
use super::*;
Expand Down
5 changes: 4 additions & 1 deletion polkadot/node/core/pvf/common/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::ArtifactChecksum;
use codec::{Decode, Encode};
use std::path::PathBuf;

/// Result from prepare worker if successful.
#[derive(Debug, Clone, Default, Encode, Decode)]
pub struct PrepareWorkerSuccess {
/// Checksum of the compiled PVF.
pub checksum: String,
pub checksum: ArtifactChecksum,
/// Stats of the current preparation run.
pub stats: PrepareStats,
}

/// Result of PVF preparation if successful.
#[derive(Debug, Clone, Default)]
pub struct PrepareSuccess {
/// Checksum of the compiled PVF.
pub checksum: ArtifactChecksum,
/// Canonical path to the compiled artifact.
pub path: PathBuf,
/// Size in bytes
Expand Down
49 changes: 38 additions & 11 deletions polkadot/node/core/pvf/execute-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use nix::{
unistd::{ForkResult, Pid},
};
use polkadot_node_core_pvf_common::{
compute_checksum,
error::InternalValidationError,
execute::{Handshake, JobError, JobResponse, JobResult, WorkerError, WorkerResponse},
executor_interface::params_to_wasmtime_semantics,
Expand All @@ -49,7 +50,7 @@ use polkadot_node_core_pvf_common::{
thread::{self, WaitOutcome},
PipeFd, WorkerInfo, WorkerKind,
},
worker_dir,
worker_dir, ArtifactChecksum,
};
use polkadot_node_primitives::{BlockData, PoV, POV_BOMB_LIMIT};
use polkadot_parachain_primitives::primitives::ValidationResult;
Expand Down Expand Up @@ -87,7 +88,9 @@ fn recv_execute_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
Ok(handshake)
}

fn recv_request(stream: &mut UnixStream) -> io::Result<(PersistedValidationData, PoV, Duration)> {
fn recv_request(
stream: &mut UnixStream,
) -> io::Result<(PersistedValidationData, PoV, Duration, ArtifactChecksum)> {
let pvd = framed_recv_blocking(stream)?;
let pvd = PersistedValidationData::decode(&mut &pvd[..]).map_err(|_| {
io::Error::new(
Expand All @@ -111,7 +114,17 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<(PersistedValidationData,
"execute pvf recv_request: failed to decode duration".to_string(),
)
})?;
Ok((pvd, pov, execution_timeout))

let artifact_checksum = framed_recv_blocking(stream)?;
let artifact_checksum =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm NOT encouraging to change this right away, but...

  1. Why do we want to encode a raw 32-byte sequence? Why not transfer it as a raw 32-byte sequence?
  2. If we ought to encode, why don't we encode the entire tuple and do one-by-one instead?

Maybe a good candidate for a refactoring issue? I bet single recv() and single decode() are somewhat more performant than one-by-ones.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do it in another pr

ArtifactChecksum::decode(&mut &artifact_checksum[..]).map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"execute pvf recv_request: failed to decode artifact checksum".to_string(),
)
})?;

Ok((pvd, pov, execution_timeout, artifact_checksum))
}

/// Sends an error to the host and returns the original error wrapped in `io::Error`.
Expand Down Expand Up @@ -166,14 +179,15 @@ pub fn worker_entrypoint(
let execute_thread_stack_size = max_stack_size(&executor_params);

loop {
let (pvd, pov, execution_timeout) = recv_request(&mut stream).map_err(|e| {
map_and_send_err!(
e,
InternalValidationError::HostCommunication,
&mut stream,
worker_info
)
})?;
let (pvd, pov, execution_timeout, artifact_checksum) = recv_request(&mut stream)
.map_err(|e| {
map_and_send_err!(
e,
InternalValidationError::HostCommunication,
&mut stream,
worker_info
)
})?;
gum::debug!(
target: LOG_TARGET,
?worker_info,
Expand All @@ -192,6 +206,19 @@ pub fn worker_entrypoint(
)
})?;

if artifact_checksum != compute_checksum(&compiled_artifact_blob) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much does this take for 10MiB, 100MiB ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blake3's throughput is ~3Gb/sec on what is close to our reference hw AFAIR

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the crate's benchmark data, 10 MiB with Blake3 takes 1-2 ms. Twox should be at least 3x faster.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so we are not worried about this eating up too much time.

send_result::<WorkerResponse, WorkerError>(
&mut stream,
Ok(WorkerResponse {
job_response: JobResponse::CorruptedArtifact,
duration: Duration::ZERO,
pov_size: 0,
}),
worker_info,
)?;
continue;
}

let (pipe_read_fd, pipe_write_fd) = pipe2_cloexec().map_err(|e| {
map_and_send_err!(
e,
Expand Down
1 change: 0 additions & 1 deletion polkadot/node/core/pvf/prepare-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ name = "prepare_rococo_runtime"
harness = false

[dependencies]
blake3 = { workspace = true }
cfg-if = { workspace = true }
gum = { workspace = true, default-features = true }
libc = { workspace = true }
Expand Down
18 changes: 7 additions & 11 deletions polkadot/node/core/pvf/prepare-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const LOG_TARGET: &str = "parachain::pvf-prepare-worker";
use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
use codec::{Decode, Encode};
use nix::{
errno::Errno,
sys::{
Expand All @@ -35,22 +36,17 @@ use nix::{
unistd::{ForkResult, Pid},
};
use polkadot_node_core_pvf_common::{
executor_interface::{prepare, prevalidate},
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks messy, but I just merged two imports of polkadot_node_core_pvf_common. In fact, only compute_checksum is newly imported.

worker::{pipe2_cloexec, PipeFd, WorkerInfo},
};

use codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
compute_checksum,
error::{PrepareError, PrepareWorkerResult},
executor_interface::create_runtime_from_artifact_bytes,
executor_interface::{create_runtime_from_artifact_bytes, prepare, prevalidate},
framed_recv_blocking, framed_send_blocking,
prepare::{MemoryStats, PrepareJobKind, PrepareStats, PrepareWorkerSuccess},
pvf::PvfPrepData,
worker::{
cpu_time_monitor_loop, get_total_cpu_usage, recv_child_response, run_worker, send_result,
stringify_errno, stringify_panic_payload,
cpu_time_monitor_loop, get_total_cpu_usage, pipe2_cloexec, recv_child_response, run_worker,
send_result, stringify_errno, stringify_panic_payload,
thread::{self, spawn_worker_thread, WaitOutcome},
WorkerKind,
PipeFd, WorkerInfo, WorkerKind,
},
worker_dir, ProcessTime,
};
Expand Down Expand Up @@ -718,7 +714,7 @@ fn handle_parent_process(
return Err(PrepareError::IoErr(err.to_string()))
};

let checksum = blake3::hash(&artifact.as_ref()).to_hex().to_string();
let checksum = compute_checksum(&artifact.as_ref());
Ok(PrepareWorkerSuccess {
checksum,
stats: PrepareStats {
Expand Down
33 changes: 29 additions & 4 deletions polkadot/node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

use crate::{host::PrecheckResultSender, worker_interface::WORKER_DIR_PREFIX};
use always_assert::always;
use polkadot_node_core_pvf_common::{error::PrepareError, pvf::PvfPrepData};
use polkadot_node_core_pvf_common::{error::PrepareError, pvf::PvfPrepData, ArtifactChecksum};
use polkadot_parachain_primitives::primitives::ValidationCodeHash;
use polkadot_primitives::ExecutorParamsPrepHash;
use std::{
Expand Down Expand Up @@ -120,11 +120,12 @@ impl ArtifactId {
pub struct ArtifactPathId {
pub(crate) id: ArtifactId,
pub(crate) path: PathBuf,
pub(crate) checksum: ArtifactChecksum,
}

impl ArtifactPathId {
pub(crate) fn new(artifact_id: ArtifactId, path: &Path) -> Self {
Self { id: artifact_id, path: path.to_owned() }
pub(crate) fn new(artifact_id: ArtifactId, path: &Path, checksum: ArtifactChecksum) -> Self {
Self { id: artifact_id, path: path.to_owned(), checksum }
}
}

Expand All @@ -135,6 +136,8 @@ pub enum ArtifactState {
/// That means that the artifact should be accessible through the path obtained by the artifact
/// id (unless, it was removed externally).
Prepared {
/// The checksum of the compiled artifact.
checksum: ArtifactChecksum,
/// The path of the compiled artifact.
path: PathBuf,
/// The time when the artifact was last needed.
Expand Down Expand Up @@ -212,6 +215,21 @@ impl Artifacts {
self.inner.keys().cloned().collect()
}

/// Replace every occurrence of `checksum` among prepared artifacts with
/// `new_checksum`.
///
/// Test-only helper: lets tests simulate a corrupted on-disk artifact by
/// desynchronizing the recorded checksum from the real artifact contents.
#[cfg(feature = "test-utils")]
pub fn replace_artifact_checksum(
    &mut self,
    checksum: ArtifactChecksum,
    new_checksum: ArtifactChecksum,
) {
    self.inner.values_mut().for_each(|state| match state {
        // Only prepared artifacts carry a checksum; swap matching ones.
        ArtifactState::Prepared { checksum: existing, .. } if *existing == checksum =>
            *existing = new_checksum,
        _ => {},
    });
}

/// Create an empty table and the cache directory on-disk if it doesn't exist.
pub async fn new(cache_path: &Path) -> Self {
// Make sure that the cache path directory and all its parents are created.
Expand Down Expand Up @@ -265,13 +283,14 @@ impl Artifacts {
&mut self,
artifact_id: ArtifactId,
path: PathBuf,
checksum: ArtifactChecksum,
last_time_needed: SystemTime,
size: u64,
) {
// See the precondition.
always!(self
.inner
.insert(artifact_id, ArtifactState::Prepared { path, last_time_needed, size })
.insert(artifact_id, ArtifactState::Prepared { path, checksum, last_time_needed, size })
.is_none());
}

Expand Down Expand Up @@ -376,18 +395,21 @@ mod tests {
artifacts.insert_prepared(
artifact_id1.clone(),
path1.clone(),
Default::default(),
mock_now - Duration::from_secs(5),
1024,
);
artifacts.insert_prepared(
artifact_id2.clone(),
path2.clone(),
Default::default(),
mock_now - Duration::from_secs(10),
1024,
);
artifacts.insert_prepared(
artifact_id3.clone(),
path3.clone(),
Default::default(),
mock_now - Duration::from_secs(15),
1024,
);
Expand Down Expand Up @@ -421,18 +443,21 @@ mod tests {
artifacts.insert_prepared(
artifact_id1.clone(),
path1.clone(),
Default::default(),
mock_now - Duration::from_secs(5),
1024,
);
artifacts.insert_prepared(
artifact_id2.clone(),
path2.clone(),
Default::default(),
mock_now - Duration::from_secs(10),
1024,
);
artifacts.insert_prepared(
artifact_id3.clone(),
path3.clone(),
Default::default(),
mock_now - Duration::from_secs(15),
1024,
);
Expand Down
3 changes: 3 additions & 0 deletions polkadot/node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ pub enum PossiblyInvalidError {
/// Possibly related to local issues or dirty node update. May be retried with re-preparation.
#[error("possibly invalid: runtime construction: {0}")]
RuntimeConstruction(String),
/// The artifact is corrupted, re-prepare the artifact and try again.
#[error("possibly invalid: artifact is corrupted")]
CorruptedArtifact,
}

impl From<PrepareError> for ValidationError {
Expand Down
Loading
Loading