From 5303d8c2099451ad05fe7a303d8b367a0a9be0c3 Mon Sep 17 00:00:00 2001
From: Marcin S
Date: Mon, 31 Jul 2023 15:35:42 +0200
Subject: [PATCH] 98.6% OF DEVELOPERS CANNOT REVIEW THIS PR! [read more...] (#7337)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [WIP] PVF: Split out worker binaries
* Address compilation problems and re-design a bit
* Reorganize once more, fix tests
* Reformat with new nightly to make `cargo fmt` test happy
* Address `clippy` warnings
* Add temporary trace to debug zombienet tests
* Fix zombienet node upgrade test
* Fix malus and its CI
* Fix building worker binaries with malus
* More fixes for malus
* Remove unneeded cli subcommands
* Support placing auxiliary binaries to `/usr/libexec`
* Fix spelling
* Spelling

Co-authored-by: Marcin S.

* Implement review comments (mostly nits)
* Fix worker node version flag
* Rework getting the worker paths
* Address a couple of review comments
* Minor restructuring
* Fix CI error
* Add tests for worker binaries detection
* Improve tests; try to fix CI
* Move workers module into separate file

* Try to fix failing test and workers not printing latest version

- Tests were not finding the worker binaries
- Workers were not being rebuilt when the version changed
- Made some errors easier to read

* Make a bunch of fixes
* Rebuild nodes on version change
* Fix more issues
* Fix tests

* Pass node version from node into dependencies to avoid recompiles

- [X] get version in CLI
- [X] pass it in to service
- [X] pass version along to PVF
- [X] remove rerun from service
- [X] add rerun to CLI
- [X] don't rerun pvf/workers (these should be built by nodes which have rerun enabled)

* Some more improvements for smoother tests

- [X] Fix tests
- [X] Make puppet workers pass None for version and remove rerun
- [X] Make test collators self-contained

* Add back rerun to PVF workers

* Move worker binaries into files in cli crate

As a final optimization I've separated out each worker binary from its own crate into the CLI crate. Before, the worker bin shared a crate with the worker lib, so when the binaries got recompiled so did the libs and everything transitively depending on the libs. This commit fixes this regression, which was causing recompiles after every commit.
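For illustration, a rough sketch of the resulting layout (in its final form, after the later move of the workers into the root `src/bin/` directory; see the root `Cargo.toml` changes in the diff below). Each worker is a thin, separate binary target, so rebuilding the binaries no longer invalidates the worker library crates:

```toml
# Root Cargo.toml (sketch): worker binaries live in small files under
# src/bin/, separate from the worker library crates they wrap.
[[bin]]
name = "polkadot-execute-worker"
path = "src/bin/execute-worker.rs"

[[bin]]
name = "polkadot-prepare-worker"
path = "src/bin/prepare-worker.rs"
```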
* Fix bug (was passing worker version for node version)

* Move workers out of cli into root src/bin/ dir

- [X] Pass in node version from top-level (polkadot)
- [X] Add build.rs with rerun-git-head to root dir

* Add some sanity checks for workers to dockerfiles

* Update malus

+ [X] Make it self-contained
+ [X] Undo multiple binary changes

* Try to fix clippy errors

* Address `cargo run` issue

- [X] Add default-run for polkadot
- [X] Add note about installation to error

* Update readme (installation instructions)

* Allow disabling external workers for local/testing setups

+ [X] cli flag to enable single-binary mode
+ [X] Add message to error

* Revert unnecessary Cargo.lock changes
* Remove unnecessary build scripts from collators
* Add back missing malus commands (should fix failing ZN job)
* Some minor fixes
* Update Cargo.lock
* Fix some build errors

* Undo self-contained binaries; cli flag to disable version check

+ [X] Remove --dont-run-external-workers
+ [X] Add --disable-worker-version-check
+ [X] Remove PVF subcommands
+ [X] Redo malus changes

* Try to fix failing job and add some docs for local tests

---------

Co-authored-by: Dmitry Sinyavin
Co-authored-by: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com>
Co-authored-by: parity-processbot <>
---
 Cargo.lock                                    | 59 +-
 Cargo.toml                                    | 18 +
 README.md                                     | 4 +-
 node/core/pvf/build.rs => build.rs            | 3 +
 cli/Cargo.toml                                | 4 -
 cli/build.rs                                  | 3 +
 cli/src/cli.rs                                | 23 +-
 cli/src/command.rs                            | 78 +--
 node/core/candidate-validation/Cargo.toml     | 1 +
 node/core/candidate-validation/src/lib.rs     | 41 +-
 node/core/pvf/common/Cargo.toml               | 3 -
 node/core/pvf/common/src/lib.rs               | 4 +
 node/core/pvf/common/src/worker/mod.rs        | 61 +-
 node/core/pvf/execute-worker/src/lib.rs       | 239 ++++----
 node/core/pvf/prepare-worker/src/lib.rs       | 328 +++++------
 node/core/pvf/src/execute/queue.rs            | 37 +-
 node/core/pvf/src/execute/worker_intf.rs      | 14 +-
 node/core/pvf/src/host.rs                     | 26 +-
 node/core/pvf/src/lib.rs                      | 2 +-
 node/core/pvf/src/prepare/pool.rs             | 20 +-
 node/core/pvf/src/prepare/worker_intf.rs      | 13 +-
 node/core/pvf/src/testing.rs                  | 38 +-
 node/core/pvf/src/worker_intf.rs              | 8 +-
 node/core/pvf/tests/it/main.rs                | 3 +-
 node/malus/Cargo.toml                         | 29 +-
 node/{core/pvf/common => malus}/build.rs      | 3 +
 node/malus/src/malus.rs                       | 37 --
 node/metrics/README.md                        | 9 +
 node/service/Cargo.toml                       | 4 +
 node/service/src/lib.rs                       | 145 +++--
 node/service/src/overseer.rs                  | 2 +-
 node/service/src/workers.rs                   | 520 ++++++++++++++++++
 node/test/service/README.md                   | 9 +
 node/test/service/src/lib.rs                  | 40 +-
 .../adder/collator/src/main.rs                | 28 +-
 .../undying/collator/src/main.rs              | 28 +-
 .../ci/dockerfiles/malus_injected.Dockerfile  | 4 +-
 .../polkadot_injected_debug.Dockerfile        | 4 +-
 .../polkadot_injected_release.Dockerfile      | 2 +
 scripts/ci/gitlab/pipeline/build.yml          | 6 +-
 scripts/ci/gitlab/pipeline/test.yml           | 3 +
 scripts/ci/gitlab/pipeline/zombienet.yml      | 2 +-
 src/bin/execute-worker.rs                     | 23 +
 src/bin/prepare-worker.rs                     | 23 +
 tests/workers.rs                              | 38 ++
 zombienet_tests/README.md                     | 2 +-
 .../misc/0002-download-polkadot-from-pr.sh    | 6 +-
 zombienet_tests/misc/0002-upgrade-node.zndsl  | 6 +-
 48 files changed, 1432 insertions(+), 571 deletions(-)
 rename node/core/pvf/build.rs => build.rs (80%)
 rename node/{core/pvf/common => malus}/build.rs (80%)
 create mode 100644 node/metrics/README.md
 create mode 100644 node/service/src/workers.rs
 create mode 100644 node/test/service/README.md
 create mode 100644 src/bin/execute-worker.rs
 create mode 100644 src/bin/prepare-worker.rs
 create mode 100644 tests/workers.rs

diff --git
a/Cargo.lock b/Cargo.lock index b421cfbfd0c6..1fc6175ad3a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1503,6 +1503,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown 0.12.3", + "lock_api", + "once_cell", + "parking_lot_core 0.9.6", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -3366,6 +3379,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "is_executable" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa9acdc6d67b75e626ad644734e8bc6df893d9cd2a834129065d3dd6158ea9c8" +dependencies = [ + "winapi", +] + [[package]] name = "itertools" version = "0.10.3" @@ -6414,8 +6436,11 @@ dependencies = [ "polkadot-cli", "polkadot-core-primitives", "polkadot-node-core-pvf", + "polkadot-node-core-pvf-common", + "polkadot-node-core-pvf-execute-worker", "polkadot-node-core-pvf-prepare-worker", "polkadot-overseer", + "substrate-build-script-utils", "substrate-rpc-client", "tempfile", "tikv-jemallocator", @@ -6541,8 +6566,6 @@ dependencies = [ "frame-benchmarking-cli", "futures", "log", - "polkadot-node-core-pvf-execute-worker", - "polkadot-node-core-pvf-prepare-worker", "polkadot-node-metrics", "polkadot-performance-test", "polkadot-service", @@ -6852,6 +6875,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", + "polkadot-overseer", "polkadot-parachain", "polkadot-primitives", "polkadot-primitives-test-helpers", @@ -7042,7 +7066,6 @@ dependencies = [ "sp-externalities", "sp-io", "sp-tracing", - "substrate-build-script-utils", "tempfile", "tokio", "tracing-gum", @@ -7663,6 +7686,7 @@ dependencies = [ "frame-system-rpc-runtime-api", "futures", "hex-literal 0.4.1", + "is_executable", "kusama-runtime", "kusama-runtime-constants", "kvdb", @@ -7697,6 +7721,7 @@ dependencies = [ "polkadot-node-core-dispute-coordinator", "polkadot-node-core-parachains-inherent", "polkadot-node-core-provisioner", + "polkadot-node-core-pvf", "polkadot-node-core-pvf-checker", "polkadot-node-core-runtime-api", "polkadot-node-network-protocol", @@ -7742,6 +7767,7 @@ dependencies = [ "sc-transaction-pool-api", "serde", "serde_json", + "serial_test", "sp-api", "sp-authority-discovery", "sp-block-builder", @@ -7858,6 +7884,7 @@ dependencies = [ "polkadot-node-core-backing", "polkadot-node-core-candidate-validation", "polkadot-node-core-dispute-coordinator", + "polkadot-node-core-pvf-common", "polkadot-node-core-pvf-execute-worker", "polkadot-node-core-pvf-prepare-worker", "polkadot-node-primitives", @@ -7869,6 +7896,7 @@ dependencies = [ "rand 0.8.5", "sp-core", "sp-keystore", + "substrate-build-script-utils", "tracing-gum", ] @@ -10388,6 +10416,31 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d" +dependencies = [ + "dashmap", + "futures", + "lazy_static", + "log", + "parking_lot 0.12.1", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.20", +] + 
[[package]] name = "sha-1" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index c23837b9c5ed..760c6ce39533 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,12 +2,21 @@ name = "polkadot" path = "src/main.rs" +[[bin]] +name = "polkadot-execute-worker" +path = "src/bin/execute-worker.rs" + +[[bin]] +name = "polkadot-prepare-worker" +path = "src/bin/prepare-worker.rs" + [package] name = "polkadot" description = "Implementation of a `https://polkadot.network` node in Rust based on the Substrate framework." license = "GPL-3.0-only" rust-version = "1.64.0" # workspace properties readme = "README.md" +default-run = "polkadot" authors.workspace = true edition.workspace = true version.workspace = true @@ -28,6 +37,10 @@ polkadot-node-core-pvf = { path = "node/core/pvf" } polkadot-node-core-pvf-prepare-worker = { path = "node/core/pvf/prepare-worker" } polkadot-overseer = { path = "node/overseer" } +# Needed for worker binaries. +polkadot-node-core-pvf-common = { path = "node/core/pvf/common" } +polkadot-node-core-pvf-execute-worker = { path = "node/core/pvf/execute-worker" } + [dev-dependencies] assert_cmd = "2.0.4" nix = { version = "0.26.1", features = ["signal"] } @@ -36,6 +49,9 @@ tokio = "1.24.2" substrate-rpc-client = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-core-primitives = { path = "core-primitives" } +[build-dependencies] +substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } + [workspace] members = [ "cli", @@ -226,6 +242,8 @@ license-file = ["LICENSE", "0"] maintainer-scripts = "scripts/packaging/deb-maintainer-scripts" assets = [ ["target/release/polkadot", "/usr/bin/", "755"], + ["target/release/polkadot-prepare-worker", "/usr/lib/polkadot/", "755"], + ["target/release/polkadot-execute-worker", "/usr/lib/polkadot/", "755"], ["scripts/packaging/polkadot.service", "/lib/systemd/system/", "644"] ] conf-files = [ diff --git a/README.md b/README.md index 1f64c941b5ad..f3d1f5e276cd 100644 --- a/README.md +++ b/README.md @@ -91,7 +91,9 @@ git checkout cargo build --release ``` -Note that compilation is a memory intensive process. We recommend having 4 GiB of physical RAM or swap available (keep in mind that if a build hits swap it tends to be very slow). +**Note:** compilation is a memory intensive process. We recommend having 4 GiB of physical RAM or swap available (keep in mind that if a build hits swap it tends to be very slow). + +**Note:** if you want to move the built `polkadot` binary somewhere (e.g. into $PATH) you will also need to move `polkadot-execute-worker` and `polkadot-prepare-worker`. You can let cargo do all this for you by running `cargo install --path .`. #### Build from Source with Docker diff --git a/node/core/pvf/build.rs b/build.rs similarity index 80% rename from node/core/pvf/build.rs rename to build.rs index 40e9f832586e..84fe22e23ed6 100644 --- a/node/core/pvf/build.rs +++ b/build.rs @@ -16,4 +16,7 @@ fn main() { substrate_build_script_utils::generate_cargo_keys(); + // For the node/worker version check, make sure we always rebuild the node and binary workers + // when the version changes. 
+ substrate_build_script_utils::rerun_if_git_head_changed(); } diff --git a/cli/Cargo.toml b/cli/Cargo.toml index e7aa562880cc..7b782644125a 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -22,8 +22,6 @@ pyro = { package = "pyroscope", version = "0.5.3", optional = true } pyroscope_pprofrs = { version = "0.2", optional = true } service = { package = "polkadot-service", path = "../node/service", default-features = false, optional = true } -polkadot-node-core-pvf-execute-worker = { path = "../node/core/pvf/execute-worker", optional = true } -polkadot-node-core-pvf-prepare-worker = { path = "../node/core/pvf/prepare-worker", optional = true } polkadot-performance-test = { path = "../node/test/performance-test", optional = true } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } @@ -53,8 +51,6 @@ cli = [ "sc-tracing", "frame-benchmarking-cli", "try-runtime-cli", - "polkadot-node-core-pvf-execute-worker", - "polkadot-node-core-pvf-prepare-worker", "service", ] runtime-benchmarks = [ diff --git a/cli/build.rs b/cli/build.rs index 018ea752a009..483cc04163fc 100644 --- a/cli/build.rs +++ b/cli/build.rs @@ -19,4 +19,7 @@ fn main() { println!("cargo:rustc-cfg=build_type=\"{}\"", profile); } substrate_build_script_utils::generate_cargo_keys(); + // For the node/worker version check, make sure we always rebuild the node when the version + // changes. + substrate_build_script_utils::rerun_if_git_head_changed(); } diff --git a/cli/src/cli.rs b/cli/src/cli.rs index 69c54b428a92..00ec54f8d969 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -17,6 +17,10 @@ //! Polkadot CLI library. use clap::Parser; +use std::path::PathBuf; + +/// The version of the node. The passed-in version of the workers should match this. +pub const NODE_VERSION: &'static str = env!("SUBSTRATE_CLI_IMPL_VERSION"); #[allow(missing_docs)] #[derive(Debug, Parser)] @@ -42,14 +46,6 @@ pub enum Subcommand { /// Revert the chain to a previous state. Revert(sc_cli::RevertCmd), - #[allow(missing_docs)] - #[command(name = "prepare-worker", hide = true)] - PvfPrepareWorker(ValidationWorkerCommand), - - #[allow(missing_docs)] - #[command(name = "execute-worker", hide = true)] - PvfExecuteWorker(ValidationWorkerCommand), - /// Sub-commands concerned with benchmarking. /// The pallet benchmarking moved to the `pallet` sub-command. #[command(subcommand)] @@ -148,6 +144,17 @@ pub struct RunCmd { /// **Dangerous!** Do not touch unless explicitly adviced to. #[arg(long)] pub overseer_channel_capacity_override: Option, + + /// Path to the directory where auxiliary worker binaries reside. If not specified, the main + /// binary's directory is searched first, then `/usr/lib/polkadot` is searched. TESTING ONLY: if + /// the path points to an executable rather then directory, that executable is used both as + /// preparation and execution worker. + #[arg(long, value_name = "PATH")] + pub workers_path: Option, + + /// TESTING ONLY: disable the version check between nodes and workers. + #[arg(long, hide = true)] + pub disable_worker_version_check: bool, } #[allow(missing_docs)] diff --git a/cli/src/command.rs b/cli/src/command.rs index 8697f999b545..ee71bb0840dc 100644 --- a/cli/src/command.rs +++ b/cli/src/command.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . 
-use crate::cli::{Cli, Subcommand}; +use crate::cli::{Cli, Subcommand, NODE_VERSION}; use frame_benchmarking_cli::{BenchmarkCmd, ExtrinsicFactory, SUBSTRATE_REFERENCE_HARDWARE}; use futures::future::TryFutureExt; use log::info; @@ -55,7 +55,7 @@ impl SubstrateCli for Cli { } fn impl_version() -> String { - env!("SUBSTRATE_CLI_IMPL_VERSION").into() + NODE_VERSION.into() } fn description() -> String { @@ -272,6 +272,9 @@ where None }; + let node_version = + if cli.run.disable_worker_version_check { None } else { Some(NODE_VERSION.to_string()) }; + runner.run_node_until_exit(move |config| async move { let hwbench = (!cli.run.no_hardware_benchmarks) .then_some(config.database.path().map(|database_path| { @@ -283,16 +286,23 @@ where let database_source = config.database.clone(); let task_manager = service::build_full( config, - service::IsCollator::No, - grandpa_pause, - enable_beefy, - jaeger_agent, - None, - false, - overseer_gen, - cli.run.overseer_channel_capacity_override, - maybe_malus_finality_delay, - hwbench, + service::NewFullParams { + is_collator: service::IsCollator::No, + grandpa_pause, + enable_beefy, + jaeger_agent, + telemetry_worker_handle: None, + node_version, + workers_path: cli.run.workers_path, + workers_names: None, + overseer_enable_anyways: false, + overseer_gen, + overseer_message_channel_capacity_override: cli + .run + .overseer_channel_capacity_override, + malus_finality_delay: maybe_malus_finality_delay, + hwbench, + }, ) .map(|full| full.task_manager)?; @@ -419,50 +429,6 @@ pub fn run() -> Result<()> { )) })?) }, - Some(Subcommand::PvfPrepareWorker(cmd)) => { - let mut builder = sc_cli::LoggerBuilder::new(""); - builder.with_colors(false); - let _ = builder.init(); - - #[cfg(target_os = "android")] - { - return Err(sc_cli::Error::Input( - "PVF preparation workers are not supported under this platform".into(), - ) - .into()) - } - - #[cfg(not(target_os = "android"))] - { - polkadot_node_core_pvf_prepare_worker::worker_entrypoint( - &cmd.socket_path, - Some(&cmd.node_impl_version), - ); - Ok(()) - } - }, - Some(Subcommand::PvfExecuteWorker(cmd)) => { - let mut builder = sc_cli::LoggerBuilder::new(""); - builder.with_colors(false); - let _ = builder.init(); - - #[cfg(target_os = "android")] - { - return Err(sc_cli::Error::Input( - "PVF execution workers are not supported under this platform".into(), - ) - .into()) - } - - #[cfg(not(target_os = "android"))] - { - polkadot_node_core_pvf_execute_worker::worker_entrypoint( - &cmd.socket_path, - Some(&cmd.node_impl_version), - ); - Ok(()) - } - }, Some(Subcommand::Benchmark(cmd)) => { let runner = cli.create_runner(cmd)?; let chain_spec = &runner.config().chain_spec; diff --git a/node/core/candidate-validation/Cargo.toml b/node/core/candidate-validation/Cargo.toml index 515aabbb3b41..c0fca9a49996 100644 --- a/node/core/candidate-validation/Cargo.toml +++ b/node/core/candidate-validation/Cargo.toml @@ -19,6 +19,7 @@ polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem = { path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-node-metrics = { path = "../../metrics" } +polkadot-overseer = { path = "../../overseer" } [target.'cfg(not(any(target_os = "android", target_os = "unknown")))'.dependencies] polkadot-node-core-pvf = { path = "../pvf" } diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index b3055afd5772..93a7e05c8724 100644 --- a/node/core/candidate-validation/src/lib.rs +++ 
b/node/core/candidate-validation/src/lib.rs @@ -93,9 +93,12 @@ const DEFAULT_APPROVAL_EXECUTION_TIMEOUT: Duration = Duration::from_secs(12); pub struct Config { /// The path where candidate validation can store compiled artifacts for PVFs. pub artifacts_cache_path: PathBuf, - /// The path to the executable which can be used for spawning PVF compilation & validation - /// workers. - pub program_path: PathBuf, + /// The version of the node. `None` can be passed to skip the version check (only for tests). + pub node_version: Option, + /// Path to the preparation worker binary + pub prep_worker_path: PathBuf, + /// Path to the execution worker binary + pub exec_worker_path: PathBuf, } /// The candidate validation subsystem. @@ -104,7 +107,7 @@ pub struct CandidateValidationSubsystem { pub metrics: Metrics, #[allow(missing_docs)] pub pvf_metrics: polkadot_node_core_pvf::Metrics, - config: Config, + config: Option, } impl CandidateValidationSubsystem { @@ -113,7 +116,7 @@ impl CandidateValidationSubsystem { /// /// Check out [`IsolationStrategy`] to get more details. pub fn with_config( - config: Config, + config: Option, metrics: Metrics, pvf_metrics: polkadot_node_core_pvf::Metrics, ) -> Self { @@ -124,16 +127,14 @@ impl CandidateValidationSubsystem { #[overseer::subsystem(CandidateValidation, error=SubsystemError, prefix=self::overseer)] impl CandidateValidationSubsystem { fn start(self, ctx: Context) -> SpawnedSubsystem { - let future = run( - ctx, - self.metrics, - self.pvf_metrics, - self.config.artifacts_cache_path, - self.config.program_path, - ) - .map_err(|e| SubsystemError::with_origin("candidate-validation", e)) - .boxed(); - SpawnedSubsystem { name: "candidate-validation-subsystem", future } + if let Some(config) = self.config { + let future = run(ctx, self.metrics, self.pvf_metrics, config) + .map_err(|e| SubsystemError::with_origin("candidate-validation", e)) + .boxed(); + SpawnedSubsystem { name: "candidate-validation-subsystem", future } + } else { + polkadot_overseer::DummySubsystem.start(ctx) + } } } @@ -142,11 +143,15 @@ async fn run( mut ctx: Context, metrics: Metrics, pvf_metrics: polkadot_node_core_pvf::Metrics, - cache_path: PathBuf, - program_path: PathBuf, + Config { artifacts_cache_path, node_version, prep_worker_path, exec_worker_path }: Config, ) -> SubsystemResult<()> { let (validation_host, task) = polkadot_node_core_pvf::start( - polkadot_node_core_pvf::Config::new(cache_path, program_path), + polkadot_node_core_pvf::Config::new( + artifacts_cache_path, + node_version, + prep_worker_path, + exec_worker_path, + ), pvf_metrics, ); ctx.spawn_blocking("pvf-validation-host", task.boxed())?; diff --git a/node/core/pvf/common/Cargo.toml b/node/core/pvf/common/Cargo.toml index 52a0fb37c569..be119297cbc3 100644 --- a/node/core/pvf/common/Cargo.toml +++ b/node/core/pvf/common/Cargo.toml @@ -31,6 +31,3 @@ landlock = "0.2.0" [dev-dependencies] assert_matches = "1.4.0" tempfile = "3.3.0" - -[build-dependencies] -substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/core/pvf/common/src/lib.rs b/node/core/pvf/common/src/lib.rs index 028fd9b17947..e5737a66aaec 100644 --- a/node/core/pvf/common/src/lib.rs +++ b/node/core/pvf/common/src/lib.rs @@ -25,6 +25,10 @@ pub mod worker; pub use cpu_time::ProcessTime; +// Used by `decl_worker_main!`. 
+#[doc(hidden)] +pub use sp_tracing; + const LOG_TARGET: &str = "parachain::pvf-common"; use std::mem; diff --git a/node/core/pvf/common/src/worker/mod.rs b/node/core/pvf/common/src/worker/mod.rs index 748994fa78a0..8dd99fc762d8 100644 --- a/node/core/pvf/common/src/worker/mod.rs +++ b/node/core/pvf/common/src/worker/mod.rs @@ -33,34 +33,54 @@ use tokio::{io, net::UnixStream, runtime::Runtime}; /// spawning the desired worker. #[macro_export] macro_rules! decl_worker_main { - ($expected_command:expr, $entrypoint:expr) => { + ($expected_command:expr, $entrypoint:expr, $worker_version:expr) => { + fn print_help(expected_command: &str) { + println!("{} {}", expected_command, $worker_version); + println!(); + println!("PVF worker that is called by polkadot."); + } + fn main() { - ::sp_tracing::try_init_simple(); + $crate::sp_tracing::try_init_simple(); let args = std::env::args().collect::>(); - if args.len() < 3 { - panic!("wrong number of arguments"); + if args.len() == 1 { + print_help($expected_command); + return } - let mut version = None; + match args[1].as_ref() { + "--help" | "-h" => { + print_help($expected_command); + return + }, + "--version" | "-v" => { + println!("{}", $worker_version); + return + }, + subcommand => { + // Must be passed for compatibility with the single-binary test workers. + if subcommand != $expected_command { + panic!( + "trying to run {} binary with the {} subcommand", + $expected_command, subcommand + ) + } + }, + } + + let mut node_version = None; let mut socket_path: &str = ""; - for i in 2..args.len() { + for i in (2..args.len()).step_by(2) { match args[i].as_ref() { "--socket-path" => socket_path = args[i + 1].as_str(), - "--node-version" => version = Some(args[i + 1].as_str()), - _ => (), + "--node-impl-version" => node_version = Some(args[i + 1].as_str()), + arg => panic!("Unexpected argument found: {}", arg), } } - let subcommand = &args[1]; - if subcommand != $expected_command { - panic!( - "trying to run {} binary with the {} subcommand", - $expected_command, subcommand - ) - } - $entrypoint(&socket_path, version); + $entrypoint(&socket_path, node_version, Some($worker_version)); } }; } @@ -75,10 +95,13 @@ pub fn bytes_to_path(bytes: &[u8]) -> Option { std::str::from_utf8(bytes).ok().map(PathBuf::from) } +// The worker version must be passed in so that we accurately get the version of the worker, and not +// the version that this crate was compiled with. pub fn worker_event_loop( debug_id: &'static str, socket_path: &str, node_version: Option<&str>, + worker_version: Option<&str>, mut event_loop: F, ) where F: FnMut(UnixStream) -> Fut, @@ -88,11 +111,13 @@ pub fn worker_event_loop( gum::debug!(target: LOG_TARGET, %worker_pid, "starting pvf worker ({})", debug_id); // Check for a mismatch between the node and worker versions. 
- if let Some(version) = node_version { - if version != env!("SUBSTRATE_CLI_IMPL_VERSION") { + if let (Some(node_version), Some(worker_version)) = (node_version, worker_version) { + if node_version != worker_version { gum::error!( target: LOG_TARGET, %worker_pid, + %node_version, + %worker_version, "Node and worker version mismatch, node needs restarting, forcing shutdown", ); kill_parent_node_in_emergency(); diff --git a/node/core/pvf/execute-worker/src/lib.rs b/node/core/pvf/execute-worker/src/lib.rs index b2714b60a6ee..d90cac2522fd 100644 --- a/node/core/pvf/execute-worker/src/lib.rs +++ b/node/core/pvf/execute-worker/src/lib.rs @@ -121,136 +121,153 @@ async fn send_response(stream: &mut UnixStream, response: Response) -> io::Resul /// `node_version`, if `Some`, is checked against the worker version. A mismatch results in /// immediate worker termination. `None` is used for tests and in other situations when version /// check is not necessary. -pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { - worker_event_loop("execute", socket_path, node_version, |mut stream| async move { - let worker_pid = std::process::id(); +pub fn worker_entrypoint( + socket_path: &str, + node_version: Option<&str>, + worker_version: Option<&str>, +) { + worker_event_loop( + "execute", + socket_path, + node_version, + worker_version, + |mut stream| async move { + let worker_pid = std::process::id(); - let handshake = recv_handshake(&mut stream).await?; - let executor = Executor::new(handshake.executor_params).map_err(|e| { - io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) - })?; + let handshake = recv_handshake(&mut stream).await?; + let executor = Executor::new(handshake.executor_params).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) + })?; - loop { - let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; - gum::debug!( - target: LOG_TARGET, - %worker_pid, - "worker: validating artifact {}", - artifact_path.display(), - ); + loop { + let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?; + gum::debug!( + target: LOG_TARGET, + %worker_pid, + "worker: validating artifact {}", + artifact_path.display(), + ); - // Get the artifact bytes. - // - // We do this outside the thread so that we can lock down filesystem access there. - let compiled_artifact_blob = match std::fs::read(artifact_path) { - Ok(bytes) => bytes, - Err(err) => { - let response = Response::InternalError( - InternalValidationError::CouldNotOpenFile(err.to_string()), - ); - send_response(&mut stream, response).await?; - continue - }, - }; + // Get the artifact bytes. + // + // We do this outside the thread so that we can lock down filesystem access there. + let compiled_artifact_blob = match std::fs::read(artifact_path) { + Ok(bytes) => bytes, + Err(err) => { + let response = Response::InternalError( + InternalValidationError::CouldNotOpenFile(err.to_string()), + ); + send_response(&mut stream, response).await?; + continue + }, + }; - // Conditional variable to notify us when a thread is done. - let condvar = thread::get_condvar(); + // Conditional variable to notify us when a thread is done. + let condvar = thread::get_condvar(); - let cpu_time_start = ProcessTime::now(); + let cpu_time_start = ProcessTime::now(); - // Spawn a new thread that runs the CPU time monitor. 
- let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); - let cpu_time_monitor_thread = thread::spawn_worker_thread( - "cpu time monitor thread", - move || { - cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx) - }, - Arc::clone(&condvar), - WaitOutcome::TimedOut, - )?; - let executor_2 = executor.clone(); - let execute_thread = thread::spawn_worker_thread_with_stack_size( - "execute thread", - move || { - // Try to enable landlock. - #[cfg(target_os = "linux")] + // Spawn a new thread that runs the CPU time monitor. + let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); + let cpu_time_monitor_thread = thread::spawn_worker_thread( + "cpu time monitor thread", + move || { + cpu_time_monitor_loop( + cpu_time_start, + execution_timeout, + cpu_time_monitor_rx, + ) + }, + Arc::clone(&condvar), + WaitOutcome::TimedOut, + )?; + let executor_2 = executor.clone(); + let execute_thread = thread::spawn_worker_thread_with_stack_size( + "execute thread", + move || { + // Try to enable landlock. + #[cfg(target_os = "linux")] let landlock_status = polkadot_node_core_pvf_common::worker::security::landlock::try_restrict_thread() .map(LandlockStatus::from_ruleset_status) .map_err(|e| e.to_string()); - #[cfg(not(target_os = "linux"))] - let landlock_status: Result = Ok(LandlockStatus::NotEnforced); - - ( - validate_using_artifact( - &compiled_artifact_blob, - ¶ms, - executor_2, - cpu_time_start, - ), - landlock_status, - ) - }, - Arc::clone(&condvar), - WaitOutcome::Finished, - EXECUTE_THREAD_STACK_SIZE, - )?; + #[cfg(not(target_os = "linux"))] + let landlock_status: Result = Ok(LandlockStatus::NotEnforced); - let outcome = thread::wait_for_threads(condvar); - - let response = match outcome { - WaitOutcome::Finished => { - let _ = cpu_time_monitor_tx.send(()); - let (result, landlock_status) = execute_thread.join().unwrap_or_else(|e| { ( - Response::Panic(stringify_panic_payload(e)), - Ok(LandlockStatus::Unavailable), + validate_using_artifact( + &compiled_artifact_blob, + ¶ms, + executor_2, + cpu_time_start, + ), + landlock_status, ) - }); + }, + Arc::clone(&condvar), + WaitOutcome::Finished, + EXECUTE_THREAD_STACK_SIZE, + )?; - // Log if landlock threw an error. - if let Err(err) = landlock_status { - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "error enabling landlock: {}", - err - ); - } + let outcome = thread::wait_for_threads(condvar); - result - }, - // If the CPU thread is not selected, we signal it to end, the join handle is - // dropped and the thread will finish in the background. - WaitOutcome::TimedOut => { - match cpu_time_monitor_thread.join() { - Ok(Some(cpu_time_elapsed)) => { - // Log if we exceed the timeout and the other thread hasn't finished. + let response = match outcome { + WaitOutcome::Finished => { + let _ = cpu_time_monitor_tx.send(()); + let (result, landlock_status) = execute_thread.join().unwrap_or_else(|e| { + ( + Response::Panic(stringify_panic_payload(e)), + Ok(LandlockStatus::Unavailable), + ) + }); + + // Log if landlock threw an error. 
+ if let Err(err) = landlock_status { gum::warn!( target: LOG_TARGET, %worker_pid, - "execute job took {}ms cpu time, exceeded execute timeout {}ms", - cpu_time_elapsed.as_millis(), - execution_timeout.as_millis(), + "error enabling landlock: {}", + err ); - Response::TimedOut - }, - Ok(None) => - Response::InternalError(InternalValidationError::CpuTimeMonitorThread( - "error communicating over finished channel".into(), - )), - Err(e) => - Response::InternalError(InternalValidationError::CpuTimeMonitorThread( - stringify_panic_payload(e), - )), - } - }, - WaitOutcome::Pending => - unreachable!("we run wait_while until the outcome is no longer pending; qed"), - }; + } + + result + }, + // If the CPU thread is not selected, we signal it to end, the join handle is + // dropped and the thread will finish in the background. + WaitOutcome::TimedOut => { + match cpu_time_monitor_thread.join() { + Ok(Some(cpu_time_elapsed)) => { + // Log if we exceed the timeout and the other thread hasn't finished. + gum::warn!( + target: LOG_TARGET, + %worker_pid, + "execute job took {}ms cpu time, exceeded execute timeout {}ms", + cpu_time_elapsed.as_millis(), + execution_timeout.as_millis(), + ); + Response::TimedOut + }, + Ok(None) => Response::InternalError( + InternalValidationError::CpuTimeMonitorThread( + "error communicating over finished channel".into(), + ), + ), + Err(e) => Response::InternalError( + InternalValidationError::CpuTimeMonitorThread( + stringify_panic_payload(e), + ), + ), + } + }, + WaitOutcome::Pending => unreachable!( + "we run wait_while until the outcome is no longer pending; qed" + ), + }; - send_response(&mut stream, response).await?; - } - }); + send_response(&mut stream, response).await?; + } + }, + ); } fn validate_using_artifact( diff --git a/node/core/pvf/prepare-worker/src/lib.rs b/node/core/pvf/prepare-worker/src/lib.rs index 6f3cb18b4280..228ad3d4668d 100644 --- a/node/core/pvf/prepare-worker/src/lib.rs +++ b/node/core/pvf/prepare-worker/src/lib.rs @@ -116,169 +116,189 @@ async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Re /// /// 7. Send the result of preparation back to the host. If any error occurred in the above steps, we /// send that in the `PrepareResult`. -pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) { - worker_event_loop("prepare", socket_path, node_version, |mut stream| async move { - let worker_pid = std::process::id(); - - loop { - let (pvf, temp_artifact_dest) = recv_request(&mut stream).await?; - gum::debug!( - target: LOG_TARGET, - %worker_pid, - "worker: preparing artifact", - ); - - let preparation_timeout = pvf.prep_timeout(); - let prepare_job_kind = pvf.prep_kind(); - let executor_params = (*pvf.executor_params()).clone(); - - // Conditional variable to notify us when a thread is done. - let condvar = thread::get_condvar(); - - // Run the memory tracker in a regular, non-worker thread. - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let condvar_memory = Arc::clone(&condvar); - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory)); - - let cpu_time_start = ProcessTime::now(); - - // Spawn a new thread that runs the CPU time monitor. 
- let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); - let cpu_time_monitor_thread = thread::spawn_worker_thread( - "cpu time monitor thread", - move || { - cpu_time_monitor_loop(cpu_time_start, preparation_timeout, cpu_time_monitor_rx) - }, - Arc::clone(&condvar), - WaitOutcome::TimedOut, - )?; - // Spawn another thread for preparation. - let prepare_thread = thread::spawn_worker_thread( - "prepare thread", - move || { - // Try to enable landlock. - #[cfg(target_os = "linux")] +pub fn worker_entrypoint( + socket_path: &str, + node_version: Option<&str>, + worker_version: Option<&str>, +) { + worker_event_loop( + "prepare", + socket_path, + node_version, + worker_version, + |mut stream| async move { + let worker_pid = std::process::id(); + + loop { + let (pvf, temp_artifact_dest) = recv_request(&mut stream).await?; + gum::debug!( + target: LOG_TARGET, + %worker_pid, + "worker: preparing artifact", + ); + + let preparation_timeout = pvf.prep_timeout(); + let prepare_job_kind = pvf.prep_kind(); + let executor_params = (*pvf.executor_params()).clone(); + + // Conditional variable to notify us when a thread is done. + let condvar = thread::get_condvar(); + + // Run the memory tracker in a regular, non-worker thread. + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + let condvar_memory = Arc::clone(&condvar); + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(condvar_memory)); + + let cpu_time_start = ProcessTime::now(); + + // Spawn a new thread that runs the CPU time monitor. + let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); + let cpu_time_monitor_thread = thread::spawn_worker_thread( + "cpu time monitor thread", + move || { + cpu_time_monitor_loop( + cpu_time_start, + preparation_timeout, + cpu_time_monitor_rx, + ) + }, + Arc::clone(&condvar), + WaitOutcome::TimedOut, + )?; + // Spawn another thread for preparation. + let prepare_thread = thread::spawn_worker_thread( + "prepare thread", + move || { + // Try to enable landlock. + #[cfg(target_os = "linux")] let landlock_status = polkadot_node_core_pvf_common::worker::security::landlock::try_restrict_thread() .map(LandlockStatus::from_ruleset_status) .map_err(|e| e.to_string()); - #[cfg(not(target_os = "linux"))] - let landlock_status: Result = Ok(LandlockStatus::NotEnforced); - - #[allow(unused_mut)] - let mut result = prepare_artifact(pvf, cpu_time_start); - - // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. - #[cfg(target_os = "linux")] - let mut result = result.map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread())); - - // If we are pre-checking, check for runtime construction errors. - // - // As pre-checking is more strict than just preparation in terms of memory and - // time, it is okay to do extra checks here. This takes negligible time anyway. 
- if let PrepareJobKind::Prechecking = prepare_job_kind { - result = result.and_then(|output| { - runtime_construction_check(output.0.as_ref(), executor_params)?; - Ok(output) - }); - } - - (result, landlock_status) - }, - Arc::clone(&condvar), - WaitOutcome::Finished, - )?; - - let outcome = thread::wait_for_threads(condvar); - - let result = match outcome { - WaitOutcome::Finished => { - let _ = cpu_time_monitor_tx.send(()); - - match prepare_thread.join().unwrap_or_else(|err| { - ( - Err(PrepareError::Panic(stringify_panic_payload(err))), - Ok(LandlockStatus::Unavailable), - ) - }) { - (Err(err), _) => { - // Serialized error will be written into the socket. - Err(err) - }, - (Ok(ok), landlock_status) => { - #[cfg(not(target_os = "linux"))] - let (artifact, cpu_time_elapsed) = ok; - #[cfg(target_os = "linux")] - let (artifact, cpu_time_elapsed, max_rss) = ok; - - // Stop the memory stats worker and get its observed memory stats. - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid).await; - let memory_stats = MemoryStats { - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - memory_tracker_stats, + #[cfg(not(target_os = "linux"))] + let landlock_status: Result = Ok(LandlockStatus::NotEnforced); + + #[allow(unused_mut)] + let mut result = prepare_artifact(pvf, cpu_time_start); + + // Get the `ru_maxrss` stat. If supported, call getrusage for the thread. + #[cfg(target_os = "linux")] + let mut result = result + .map(|(artifact, elapsed)| (artifact, elapsed, get_max_rss_thread())); + + // If we are pre-checking, check for runtime construction errors. + // + // As pre-checking is more strict than just preparation in terms of memory and + // time, it is okay to do extra checks here. This takes negligible time anyway. + if let PrepareJobKind::Prechecking = prepare_job_kind { + result = result.and_then(|output| { + runtime_construction_check(output.0.as_ref(), executor_params)?; + Ok(output) + }); + } + + (result, landlock_status) + }, + Arc::clone(&condvar), + WaitOutcome::Finished, + )?; + + let outcome = thread::wait_for_threads(condvar); + + let result = match outcome { + WaitOutcome::Finished => { + let _ = cpu_time_monitor_tx.send(()); + + match prepare_thread.join().unwrap_or_else(|err| { + ( + Err(PrepareError::Panic(stringify_panic_payload(err))), + Ok(LandlockStatus::Unavailable), + ) + }) { + (Err(err), _) => { + // Serialized error will be written into the socket. + Err(err) + }, + (Ok(ok), landlock_status) => { + #[cfg(not(target_os = "linux"))] + let (artifact, cpu_time_elapsed) = ok; #[cfg(target_os = "linux")] - max_rss: extract_max_rss_stat(max_rss, worker_pid), - }; + let (artifact, cpu_time_elapsed, max_rss) = ok; - // Log if landlock threw an error. - if let Err(err) = landlock_status { + // Stop the memory stats worker and get its observed memory stats. + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid) + .await; + let memory_stats = MemoryStats { + #[cfg(any( + target_os = "linux", + feature = "jemalloc-allocator" + ))] + memory_tracker_stats, + #[cfg(target_os = "linux")] + max_rss: extract_max_rss_stat(max_rss, worker_pid), + }; + + // Log if landlock threw an error. 
+ if let Err(err) = landlock_status { + gum::warn!( + target: LOG_TARGET, + %worker_pid, + "error enabling landlock: {}", + err + ); + } + + // Write the serialized artifact into a temp file. + // + // PVF host only keeps artifacts statuses in its memory, successfully + // compiled code gets stored on the disk (and consequently deserialized + // by execute-workers). The prepare worker is only required to send `Ok` + // to the pool to indicate the success. + + gum::debug!( + target: LOG_TARGET, + %worker_pid, + "worker: writing artifact to {}", + temp_artifact_dest.display(), + ); + tokio::fs::write(&temp_artifact_dest, &artifact).await?; + + Ok(PrepareStats { cpu_time_elapsed, memory_stats }) + }, + } + }, + // If the CPU thread is not selected, we signal it to end, the join handle is + // dropped and the thread will finish in the background. + WaitOutcome::TimedOut => { + match cpu_time_monitor_thread.join() { + Ok(Some(cpu_time_elapsed)) => { + // Log if we exceed the timeout and the other thread hasn't finished. gum::warn!( target: LOG_TARGET, %worker_pid, - "error enabling landlock: {}", - err + "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", + cpu_time_elapsed.as_millis(), + preparation_timeout.as_millis(), ); - } - - // Write the serialized artifact into a temp file. - // - // PVF host only keeps artifacts statuses in its memory, successfully - // compiled code gets stored on the disk (and consequently deserialized - // by execute-workers). The prepare worker is only required to send `Ok` - // to the pool to indicate the success. - - gum::debug!( - target: LOG_TARGET, - %worker_pid, - "worker: writing artifact to {}", - temp_artifact_dest.display(), - ); - tokio::fs::write(&temp_artifact_dest, &artifact).await?; - - Ok(PrepareStats { cpu_time_elapsed, memory_stats }) - }, - } - }, - // If the CPU thread is not selected, we signal it to end, the join handle is - // dropped and the thread will finish in the background. - WaitOutcome::TimedOut => { - match cpu_time_monitor_thread.join() { - Ok(Some(cpu_time_elapsed)) => { - // Log if we exceed the timeout and the other thread hasn't finished. - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", - cpu_time_elapsed.as_millis(), - preparation_timeout.as_millis(), - ); - Err(PrepareError::TimedOut) - }, - Ok(None) => Err(PrepareError::IoErr( - "error communicating over closed channel".into(), - )), - // Errors in this thread are independent of the PVF. - Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))), - } - }, - WaitOutcome::Pending => - unreachable!("we run wait_while until the outcome is no longer pending; qed"), - }; - - send_response(&mut stream, result).await?; - } - }); + Err(PrepareError::TimedOut) + }, + Ok(None) => Err(PrepareError::IoErr( + "error communicating over closed channel".into(), + )), + // Errors in this thread are independent of the PVF. + Err(err) => Err(PrepareError::IoErr(stringify_panic_payload(err))), + } + }, + WaitOutcome::Pending => unreachable!( + "we run wait_while until the outcome is no longer pending; qed" + ), + }; + + send_response(&mut stream, result).await?; + } + }, + ); } fn prepare_artifact( diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index 395697616b36..33a1c6f89709 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -137,8 +137,10 @@ struct Queue { /// The receiver that receives messages to the pool. 
to_queue_rx: mpsc::Receiver, + // Some variables related to the current session. program_path: PathBuf, spawn_timeout: Duration, + node_version: Option, /// The queue of jobs that are waiting for a worker to pick up. queue: VecDeque, @@ -152,12 +154,14 @@ impl Queue { program_path: PathBuf, worker_capacity: usize, spawn_timeout: Duration, + node_version: Option, to_queue_rx: mpsc::Receiver, ) -> Self { Self { metrics, program_path, spawn_timeout, + node_version, to_queue_rx, queue: VecDeque::new(), mux: Mux::new(), @@ -398,9 +402,15 @@ fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) { queue.metrics.execute_worker().on_begin_spawn(); gum::debug!(target: LOG_TARGET, "spawning an extra worker"); - queue - .mux - .push(spawn_worker_task(queue.program_path.clone(), job, queue.spawn_timeout).boxed()); + queue.mux.push( + spawn_worker_task( + queue.program_path.clone(), + job, + queue.spawn_timeout, + queue.node_version.clone(), + ) + .boxed(), + ); queue.workers.spawn_inflight += 1; } @@ -414,12 +424,18 @@ async fn spawn_worker_task( program_path: PathBuf, job: ExecuteJob, spawn_timeout: Duration, + node_version: Option, ) -> QueueEvent { use futures_timer::Delay; loop { - match super::worker_intf::spawn(&program_path, job.executor_params.clone(), spawn_timeout) - .await + match super::worker_intf::spawn( + &program_path, + job.executor_params.clone(), + spawn_timeout, + node_version.as_deref(), + ) + .await { Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job), Err(err) => { @@ -481,8 +497,17 @@ pub fn start( program_path: PathBuf, worker_capacity: usize, spawn_timeout: Duration, + node_version: Option, ) -> (mpsc::Sender, impl Future) { let (to_queue_tx, to_queue_rx) = mpsc::channel(20); - let run = Queue::new(metrics, program_path, worker_capacity, spawn_timeout, to_queue_rx).run(); + let run = Queue::new( + metrics, + program_path, + worker_capacity, + spawn_timeout, + node_version, + to_queue_rx, + ) + .run(); (to_queue_tx, run) } diff --git a/node/core/pvf/src/execute/worker_intf.rs b/node/core/pvf/src/execute/worker_intf.rs index 6e54e17e515a..9d8b61d10447 100644 --- a/node/core/pvf/src/execute/worker_intf.rs +++ b/node/core/pvf/src/execute/worker_intf.rs @@ -45,14 +45,14 @@ pub async fn spawn( program_path: &Path, executor_params: ExecutorParams, spawn_timeout: Duration, + node_version: Option<&str>, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { - let (mut idle_worker, worker_handle) = spawn_with_program_path( - "execute", - program_path, - &["execute-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")], - spawn_timeout, - ) - .await?; + let mut extra_args = vec!["execute-worker"]; + if let Some(node_version) = node_version { + extra_args.extend_from_slice(&["--node-impl-version", node_version]); + } + let (mut idle_worker, worker_handle) = + spawn_with_program_path("execute", program_path, &extra_args, spawn_timeout).await?; send_handshake(&mut idle_worker.stream, Handshake { executor_params }) .await .map_err(|error| { diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 3ca4ea43de1b..a5772e34e16e 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -52,6 +52,12 @@ pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_millis(200); /// The amount of times we will retry failed prepare jobs. 
pub const NUM_PREPARE_RETRIES: u32 = 5; +/// The name of binary spawned to prepare a PVF artifact +pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker"; + +/// The name of binary spawned to execute a PVF +pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker"; + /// An alias to not spell the type for the oneshot sender for the PVF execution result. pub(crate) type ResultSender = oneshot::Sender>; @@ -144,6 +150,8 @@ struct ExecutePvfInputs { pub struct Config { /// The root directory where the prepared artifacts can be stored. pub cache_path: PathBuf, + /// The version of the node. `None` can be passed to skip the version check (only for tests). + pub node_version: Option, /// The path to the program that can be used to spawn the prepare workers. pub prepare_worker_program_path: PathBuf, /// The time allotted for a prepare worker to spawn and report to the host. @@ -163,18 +171,20 @@ pub struct Config { impl Config { /// Create a new instance of the configuration. - pub fn new(cache_path: std::path::PathBuf, program_path: std::path::PathBuf) -> Self { - // Do not contaminate the other parts of the codebase with the types from `tokio`. - let cache_path = PathBuf::from(cache_path); - let program_path = PathBuf::from(program_path); - + pub fn new( + cache_path: PathBuf, + node_version: Option, + prepare_worker_program_path: PathBuf, + execute_worker_program_path: PathBuf, + ) -> Self { Self { cache_path, - prepare_worker_program_path: program_path.clone(), + node_version, + prepare_worker_program_path, prepare_worker_spawn_timeout: Duration::from_secs(3), prepare_workers_soft_max_num: 1, prepare_workers_hard_max_num: 1, - execute_worker_program_path: program_path, + execute_worker_program_path, execute_worker_spawn_timeout: Duration::from_secs(3), execute_workers_max_num: 2, } @@ -204,6 +214,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future (ValidationHost, impl Future, + to_pool: mpsc::Receiver, from_pool: mpsc::UnboundedSender, spawned: HopSlotMap, mux: Mux, + metrics: Metrics, } @@ -128,6 +131,7 @@ async fn run( program_path, cache_path, spawn_timeout, + node_version, to_pool, mut from_pool, mut spawned, @@ -155,6 +159,7 @@ async fn run( &program_path, &cache_path, spawn_timeout, + node_version.clone(), &mut spawned, &mut mux, to_pool, @@ -201,6 +206,7 @@ fn handle_to_pool( program_path: &Path, cache_path: &Path, spawn_timeout: Duration, + node_version: Option, spawned: &mut HopSlotMap, mux: &mut Mux, to_pool: ToPool, @@ -209,7 +215,9 @@ fn handle_to_pool( ToPool::Spawn => { gum::debug!(target: LOG_TARGET, "spawning a new prepare worker"); metrics.prepare_worker().on_begin_spawn(); - mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed()); + mux.push( + spawn_worker_task(program_path.to_owned(), spawn_timeout, node_version).boxed(), + ); }, ToPool::StartWork { worker, pvf, artifact_path } => { if let Some(data) = spawned.get_mut(worker) { @@ -248,11 +256,15 @@ fn handle_to_pool( } } -async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> PoolEvent { +async fn spawn_worker_task( + program_path: PathBuf, + spawn_timeout: Duration, + node_version: Option, +) -> PoolEvent { use futures_timer::Delay; loop { - match worker_intf::spawn(&program_path, spawn_timeout).await { + match worker_intf::spawn(&program_path, spawn_timeout, node_version.as_deref()).await { Ok((idle, handle)) => break PoolEvent::Spawn(idle, handle), Err(err) => { gum::warn!(target: LOG_TARGET, "failed to spawn a prepare worker: 
{:?}", err); @@ -419,6 +431,7 @@ pub fn start( program_path: PathBuf, cache_path: PathBuf, spawn_timeout: Duration, + node_version: Option, ) -> (mpsc::Sender, mpsc::UnboundedReceiver, impl Future) { let (to_pool_tx, to_pool_rx) = mpsc::channel(10); let (from_pool_tx, from_pool_rx) = mpsc::unbounded(); @@ -428,6 +441,7 @@ pub fn start( program_path, cache_path, spawn_timeout, + node_version, to_pool: to_pool_rx, from_pool: from_pool_tx, spawned: HopSlotMap::with_capacity_and_key(20), diff --git a/node/core/pvf/src/prepare/worker_intf.rs b/node/core/pvf/src/prepare/worker_intf.rs index 47522d3f0856..d0d9a026dda7 100644 --- a/node/core/pvf/src/prepare/worker_intf.rs +++ b/node/core/pvf/src/prepare/worker_intf.rs @@ -45,14 +45,13 @@ use tokio::{io, net::UnixStream}; pub async fn spawn( program_path: &Path, spawn_timeout: Duration, + node_version: Option<&str>, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { - spawn_with_program_path( - "prepare", - program_path, - &["prepare-worker", "--node-impl-version", env!("SUBSTRATE_CLI_IMPL_VERSION")], - spawn_timeout, - ) - .await + let mut extra_args = vec!["prepare-worker"]; + if let Some(node_version) = node_version { + extra_args.extend_from_slice(&["--node-impl-version", node_version]); + } + spawn_with_program_path("prepare", program_path, &extra_args, spawn_timeout).await } pub enum Outcome { diff --git a/node/core/pvf/src/testing.rs b/node/core/pvf/src/testing.rs index cc07d7aeef02..3cd1ce304ab8 100644 --- a/node/core/pvf/src/testing.rs +++ b/node/core/pvf/src/testing.rs @@ -58,37 +58,35 @@ macro_rules! decl_puppet_worker_main { $crate::sp_tracing::try_init_simple(); let args = std::env::args().collect::>(); - if args.len() < 3 { + if args.len() == 1 { panic!("wrong number of arguments"); } - let mut version = None; - let mut socket_path: &str = ""; - - for i in 2..args.len() { - match args[i].as_ref() { - "--socket-path" => socket_path = args[i + 1].as_str(), - "--node-version" => version = Some(args[i + 1].as_str()), - _ => (), - } - } - - let subcommand = &args[1]; - match subcommand.as_ref() { + let entrypoint = match args[1].as_ref() { "exit" => { std::process::exit(1); }, "sleep" => { std::thread::sleep(std::time::Duration::from_secs(5)); + return }, - "prepare-worker" => { - $crate::prepare_worker_entrypoint(&socket_path, version); - }, - "execute-worker" => { - $crate::execute_worker_entrypoint(&socket_path, version); - }, + "prepare-worker" => $crate::prepare_worker_entrypoint, + "execute-worker" => $crate::execute_worker_entrypoint, other => panic!("unknown subcommand: {}", other), + }; + + let mut node_version = None; + let mut socket_path: &str = ""; + + for i in (2..args.len()).step_by(2) { + match args[i].as_ref() { + "--socket-path" => socket_path = args[i + 1].as_str(), + "--node-impl-version" => node_version = Some(args[i + 1].as_str()), + arg => panic!("Unexpected argument found: {}", arg), + } } + + entrypoint(&socket_path, node_version, None); } }; } diff --git a/node/core/pvf/src/worker_intf.rs b/node/core/pvf/src/worker_intf.rs index 33144616601d..ef5733ec0e6d 100644 --- a/node/core/pvf/src/worker_intf.rs +++ b/node/core/pvf/src/worker_intf.rs @@ -43,12 +43,14 @@ pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4; pub async fn spawn_with_program_path( debug_id: &'static str, program_path: impl Into, - extra_args: &'static [&'static str], + extra_args: &[&str], spawn_timeout: Duration, ) -> Result<(IdleWorker, WorkerHandle), SpawnErr> { let program_path = program_path.into(); with_transient_socket_path(debug_id, 
|socket_path| { let socket_path = socket_path.to_owned(); + let extra_args: Vec = extra_args.iter().map(|arg| arg.to_string()).collect(); + async move { let listener = UnixListener::bind(&socket_path).map_err(|err| { gum::warn!( @@ -63,7 +65,7 @@ pub async fn spawn_with_program_path( })?; let handle = - WorkerHandle::spawn(&program_path, extra_args, socket_path).map_err(|err| { + WorkerHandle::spawn(&program_path, &extra_args, socket_path).map_err(|err| { gum::warn!( target: LOG_TARGET, %debug_id, @@ -214,7 +216,7 @@ pub struct WorkerHandle { impl WorkerHandle { fn spawn( program: impl AsRef, - extra_args: &[&str], + extra_args: &[String], socket_path: impl AsRef, ) -> io::Result { let mut child = process::Command::new(program.as_ref()) diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs index e2877346f09d..72c459c2f632 100644 --- a/node/core/pvf/tests/it/main.rs +++ b/node/core/pvf/tests/it/main.rs @@ -53,7 +53,8 @@ impl TestHost { { let cache_dir = tempfile::tempdir().unwrap(); let program_path = std::path::PathBuf::from(PUPPET_EXE); - let mut config = Config::new(cache_dir.path().to_owned(), program_path); + let mut config = + Config::new(cache_dir.path().to_owned(), None, program_path.clone(), program_path); f(&mut config); let (host, task) = start(config, Metrics::default()); let _ = tokio::task::spawn(task); diff --git a/node/malus/Cargo.toml b/node/malus/Cargo.toml index 8e23e623174f..7e0bf0d8dd08 100644 --- a/node/malus/Cargo.toml +++ b/node/malus/Cargo.toml @@ -12,6 +12,19 @@ publish = false name = "malus" path = "src/malus.rs" +# Use artifact dependencies once stable. +# See https://github.com/rust-lang/cargo/issues/9096. +[[bin]] +name = "polkadot-execute-worker" +path = "../../src/bin/execute-worker.rs" +# Prevent rustdoc error. Already documented from top-level Cargo.toml. +doc = false +[[bin]] +name = "polkadot-prepare-worker" +path = "../../src/bin/prepare-worker.rs" +# Prevent rustdoc error. Already documented from top-level Cargo.toml. +doc = false + [dependencies] polkadot-cli = { path = "../../cli", features = [ "malus", "rococo-native", "kusama-native", "westend-native", "polkadot-native" ] } polkadot-node-subsystem = { path = "../subsystem" } @@ -20,8 +33,6 @@ polkadot-node-subsystem-types = { path = "../subsystem-types" } polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator" } polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" } polkadot-node-core-backing = { path = "../core/backing" } -polkadot-node-core-pvf-execute-worker = { path = "../core/pvf/execute-worker" } -polkadot-node-core-pvf-prepare-worker = { path = "../core/pvf/prepare-worker" } polkadot-node-primitives = { path = "../primitives" } polkadot-primitives = { path = "../../primitives" } color-eyre = { version = "0.6.1", default-features = false } @@ -36,11 +47,19 @@ gum = { package = "tracing-gum", path = "../gum/" } erasure = { package = "polkadot-erasure-coding", path = "../../erasure-coding" } rand = "0.8.5" -[features] -default = [] -fast-runtime = ["polkadot-cli/fast-runtime"] +# Required for worker binaries to build. 
+polkadot-node-core-pvf-common = { path = "../core/pvf/common" } +polkadot-node-core-pvf-execute-worker = { path = "../core/pvf/execute-worker" } +polkadot-node-core-pvf-prepare-worker = { path = "../core/pvf/prepare-worker" } [dev-dependencies] polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } futures = { version = "0.3.21", features = ["thread-pool"] } + +[build-dependencies] +substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } + +[features] +default = [] +fast-runtime = ["polkadot-cli/fast-runtime"] diff --git a/node/core/pvf/common/build.rs b/node/malus/build.rs similarity index 80% rename from node/core/pvf/common/build.rs rename to node/malus/build.rs index 40e9f832586e..84fe22e23ed6 100644 --- a/node/core/pvf/common/build.rs +++ b/node/malus/build.rs @@ -16,4 +16,7 @@ fn main() { substrate_build_script_utils::generate_cargo_keys(); + // For the node/worker version check, make sure we always rebuild the node and binary workers + // when the version changes. + substrate_build_script_utils::rerun_if_git_head_changed(); } diff --git a/node/malus/src/malus.rs b/node/malus/src/malus.rs index d09f8be990a4..69dd7c869fc0 100644 --- a/node/malus/src/malus.rs +++ b/node/malus/src/malus.rs @@ -36,14 +36,6 @@ enum NemesisVariant { BackGarbageCandidate(BackGarbageCandidateOptions), /// Delayed disputing of ancestors that are perfectly fine. DisputeAncestor(DisputeAncestorOptions), - - #[allow(missing_docs)] - #[command(name = "prepare-worker", hide = true)] - PvfPrepareWorker(polkadot_cli::ValidationWorkerCommand), - - #[allow(missing_docs)] - #[command(name = "execute-worker", hide = true)] - PvfExecuteWorker(polkadot_cli::ValidationWorkerCommand), } #[derive(Debug, Parser)] @@ -88,35 +80,6 @@ impl MalusCli { finality_delay, )? }, - NemesisVariant::PvfPrepareWorker(cmd) => { - #[cfg(target_os = "android")] - { - return Err("PVF preparation workers are not supported under this platform") - .into() - } - - #[cfg(not(target_os = "android"))] - { - polkadot_node_core_pvf_prepare_worker::worker_entrypoint( - &cmd.socket_path, - None, - ); - } - }, - NemesisVariant::PvfExecuteWorker(cmd) => { - #[cfg(target_os = "android")] - { - return Err("PVF execution workers are not supported under this platform").into() - } - - #[cfg(not(target_os = "android"))] - { - polkadot_node_core_pvf_execute_worker::worker_entrypoint( - &cmd.socket_path, - None, - ); - } - }, } Ok(()) } diff --git a/node/metrics/README.md b/node/metrics/README.md new file mode 100644 index 000000000000..cc88884f2142 --- /dev/null +++ b/node/metrics/README.md @@ -0,0 +1,9 @@ +# polkadot-node-metrics + +## Testing + +Before running `cargo test` in this crate, make sure the worker binaries are built first. 
This can be done with: + +```sh +cargo build --bin polkadot-execute-worker --bin polkadot-prepare-worker +``` diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index 0de3d0e8df77..d02bee89ffa1 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -88,6 +88,7 @@ codec = { package = "parity-scale-codec", version = "3.6.1" } async-trait = "0.1.57" lru = "0.11" log = "0.4.17" +is_executable = "1.0.1" # Polkadot polkadot-core-primitives = { path = "../../core-primitives" } @@ -135,6 +136,7 @@ polkadot-node-core-chain-api = { path = "../core/chain-api", optional = true } polkadot-node-core-chain-selection = { path = "../core/chain-selection", optional = true } polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator", optional = true } polkadot-node-core-provisioner = { path = "../core/provisioner", optional = true } +polkadot-node-core-pvf = { path = "../core/pvf", optional = true } polkadot-node-core-pvf-checker = { path = "../core/pvf-checker", optional = true } polkadot-node-core-runtime-api = { path = "../core/runtime-api", optional = true } polkadot-statement-distribution = { path = "../network/statement-distribution", optional = true } @@ -144,6 +146,7 @@ polkadot-test-client = { path = "../test/client" } polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } env_logger = "0.9.0" assert_matches = "1.5.0" +serial_test = "2.0.0" tempfile = "3.2" [features] @@ -174,6 +177,7 @@ full-node = [ "polkadot-node-core-runtime-api", "polkadot-statement-distribution", "polkadot-approval-distribution", + "polkadot-node-core-pvf", "polkadot-node-core-pvf-checker", "kvdb-rocksdb", "parity-db", diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index d996fcfc6123..efdd01d27321 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -27,6 +27,8 @@ mod relay_chain_selection; #[cfg(feature = "full-node")] pub mod overseer; +#[cfg(feature = "full-node")] +pub mod workers; #[cfg(feature = "full-node")] pub use self::overseer::{OverseerGen, OverseerGenArgs, RealOverseerGen}; @@ -73,7 +75,7 @@ pub use { #[cfg(feature = "full-node")] use polkadot_node_subsystem::jaeger; -use std::{sync::Arc, time::Duration}; +use std::{path::PathBuf, sync::Arc, time::Duration}; use prometheus_endpoint::Registry; #[cfg(feature = "full-node")] @@ -235,6 +237,26 @@ pub enum Error { #[cfg(feature = "full-node")] #[error("Expected at least one of polkadot, kusama, westend or rococo runtime feature")] NoRuntime, + + #[cfg(feature = "full-node")] + #[error("Worker binaries not executable, prepare binary: {prep_worker_path:?}, execute binary: {exec_worker_path:?}")] + InvalidWorkerBinaries { prep_worker_path: PathBuf, exec_worker_path: PathBuf }, + + #[cfg(feature = "full-node")] + #[error("Worker binaries could not be found, make sure polkadot was built/installed correctly. Searched given workers path ({given_workers_path:?}), polkadot binary path ({current_exe_path:?}), and lib path (/usr/lib/polkadot), workers names: {workers_names:?}")] + MissingWorkerBinaries { + given_workers_path: Option, + current_exe_path: PathBuf, + workers_names: Option<(String, String)>, + }, + + #[cfg(feature = "full-node")] + #[error("Version of worker binary ({worker_version}) is different from node version ({node_version}), worker_path: {worker_path}. 
TESTING ONLY: this check can be disabled with --disable-worker-version-check")] + WorkerBinaryVersionMismatch { + worker_version: String, + node_version: String, + worker_path: PathBuf, + }, } /// Identifies the variant of the chain. @@ -603,6 +625,28 @@ where }) } +#[cfg(feature = "full-node")] +pub struct NewFullParams { + pub is_collator: IsCollator, + pub grandpa_pause: Option<(u32, u32)>, + pub enable_beefy: bool, + pub jaeger_agent: Option, + pub telemetry_worker_handle: Option, + /// The version of the node. TESTING ONLY: `None` can be passed to skip the node/worker version + /// check, both on startup and in the workers. + pub node_version: Option, + /// An optional path to a directory containing the workers. + pub workers_path: Option, + /// Optional custom names for the prepare and execute workers. + pub workers_names: Option<(String, String)>, + pub overseer_enable_anyways: bool, + pub overseer_gen: OverseerGenerator, + pub overseer_message_channel_capacity_override: Option, + #[allow(dead_code)] + pub malus_finality_delay: Option, + pub hwbench: Option, +} + #[cfg(feature = "full-node")] pub struct NewFull { pub task_manager: TaskManager, @@ -656,24 +700,30 @@ pub const AVAILABILITY_CONFIG: AvailabilityConfig = AvailabilityConfig { /// `overseer_enable_anyways` always enables the overseer, based on the provided `OverseerGenerator`, /// regardless of the role the node has. The relay chain selection (longest or disputes-aware) is /// still determined based on the role of the node. Likewise for authority discovery. +/// +/// `workers_path` is used to get the path to the directory where auxiliary worker binaries reside. +/// If not specified, the main binary's directory is searched first, then `/usr/lib/polkadot` is +/// searched. If the path points to an executable rather then directory, that executable is used +/// both as preparation and execution worker (supposed to be used for tests only). #[cfg(feature = "full-node")] -pub fn new_full( +pub fn new_full( mut config: Configuration, - is_collator: IsCollator, - grandpa_pause: Option<(u32, u32)>, - enable_beefy: bool, - jaeger_agent: Option, - telemetry_worker_handle: Option, - program_path: Option, - overseer_enable_anyways: bool, - overseer_gen: OverseerGenerator, - overseer_message_channel_capacity_override: Option, - _malus_finality_delay: Option, - hwbench: Option, -) -> Result -where - OverseerGenerator: OverseerGen, -{ + NewFullParams { + is_collator, + grandpa_pause, + enable_beefy, + jaeger_agent, + telemetry_worker_handle, + node_version, + workers_path, + workers_names, + overseer_enable_anyways, + overseer_gen, + overseer_message_channel_capacity_override, + malus_finality_delay: _malus_finality_delay, + hwbench, + }: NewFullParams, +) -> Result { use polkadot_node_network_protocol::request_response::IncomingRequest; use sc_network_common::sync::warp::WarpSyncParams; @@ -859,16 +909,24 @@ where slot_duration_millis: slot_duration.as_millis() as u64, }; - let candidate_validation_config = CandidateValidationConfig { - artifacts_cache_path: config - .database - .path() - .ok_or(Error::DatabasePathRequired)? 
- .join("pvf-artifacts"), - program_path: match program_path { - None => std::env::current_exe()?, - Some(p) => p, - }, + let candidate_validation_config = if is_collator.is_collator() { + None + } else { + let (prep_worker_path, exec_worker_path) = + workers::determine_workers_paths(workers_path, workers_names, node_version.clone())?; + log::info!("🚀 Using prepare-worker binary at: {:?}", prep_worker_path); + log::info!("🚀 Using execute-worker binary at: {:?}", exec_worker_path); + + Some(CandidateValidationConfig { + artifacts_cache_path: config + .database + .path() + .ok_or(Error::DatabasePathRequired)? + .join("pvf-artifacts"), + node_version, + prep_worker_path, + exec_worker_path, + }) }; let chain_selection_config = ChainSelectionConfig { @@ -1278,40 +1336,21 @@ pub fn new_chain_ops( /// regardless of the role the node has. The relay chain selection (longest or disputes-aware) is /// still determined based on the role of the node. Likewise for authority discovery. #[cfg(feature = "full-node")] -pub fn build_full( +pub fn build_full( config: Configuration, - is_collator: IsCollator, - grandpa_pause: Option<(u32, u32)>, - enable_beefy: bool, - jaeger_agent: Option, - telemetry_worker_handle: Option, - overseer_enable_anyways: bool, - overseer_gen: impl OverseerGen, - overseer_message_channel_override: Option, - malus_finality_delay: Option, - hwbench: Option, + mut params: NewFullParams, ) -> Result { let is_polkadot = config.chain_spec.is_polkadot(); - new_full( - config, - is_collator, - grandpa_pause, - enable_beefy, - jaeger_agent, - telemetry_worker_handle, - None, - overseer_enable_anyways, - overseer_gen, - overseer_message_channel_override.map(move |capacity| { + params.overseer_message_channel_capacity_override = + params.overseer_message_channel_capacity_override.map(move |capacity| { if is_polkadot { gum::warn!("Channel capacity should _never_ be tampered with on polkadot!"); } capacity - }), - malus_finality_delay, - hwbench, - ) + }); + + new_full(config, params) } /// Reverts the node state down to at most the last finalized block. diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index b1172cd9a549..29122ddca162 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -114,7 +114,7 @@ where /// Configuration for the availability store subsystem. pub availability_config: AvailabilityConfig, /// Configuration for the candidate validation subsystem. - pub candidate_validation_config: CandidateValidationConfig, + pub candidate_validation_config: Option, /// Configuration for the chain selection subsystem. pub chain_selection_config: ChainSelectionConfig, /// Configuration for the dispute coordinator subsystem. diff --git a/node/service/src/workers.rs b/node/service/src/workers.rs new file mode 100644 index 000000000000..5f7cc1c2ed49 --- /dev/null +++ b/node/service/src/workers.rs @@ -0,0 +1,520 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Utilities and tests for locating the PVF worker binaries. + +use super::Error; +use is_executable::IsExecutable; +use std::{path::PathBuf, process::Command}; + +#[cfg(test)] +use std::sync::{Mutex, OnceLock}; + +/// Override the workers polkadot binary directory path, used for testing. +#[cfg(test)] +fn workers_exe_path_override() -> &'static Mutex> { + static OVERRIDE: OnceLock>> = OnceLock::new(); + OVERRIDE.get_or_init(|| Mutex::new(None)) +} +/// Override the workers lib directory path, used for testing. +#[cfg(test)] +fn workers_lib_path_override() -> &'static Mutex> { + static OVERRIDE: OnceLock>> = OnceLock::new(); + OVERRIDE.get_or_init(|| Mutex::new(None)) +} + +/// Determines the final set of paths to use for the PVF workers. +/// +/// 1. Get the binaries from the workers path if it is passed in, or consider all possible +/// locations on the filesystem in order and get all sets of paths at which the binaries exist. +/// +/// 2. If no paths exist, error out. We can't proceed without workers. +/// +/// 3. Log a warning if more than one set of paths exists. Continue with the first set of paths. +/// +/// 4. Check if the returned paths are executable. If not it's evidence of a borked installation +/// so error out. +/// +/// 5. Do the version check, if mismatch error out. +/// +/// 6. At this point the final set of paths should be good to use. +pub fn determine_workers_paths( + given_workers_path: Option, + workers_names: Option<(String, String)>, + node_version: Option, +) -> Result<(PathBuf, PathBuf), Error> { + let mut workers_paths = list_workers_paths(given_workers_path.clone(), workers_names.clone())?; + if workers_paths.is_empty() { + let current_exe_path = get_exe_path()?; + return Err(Error::MissingWorkerBinaries { + given_workers_path, + current_exe_path, + workers_names, + }) + } else if workers_paths.len() > 1 { + log::warn!("multiple sets of worker binaries found ({:?})", workers_paths,); + } + + let (prep_worker_path, exec_worker_path) = workers_paths.swap_remove(0); + if !prep_worker_path.is_executable() || !exec_worker_path.is_executable() { + return Err(Error::InvalidWorkerBinaries { prep_worker_path, exec_worker_path }) + } + + // Do the version check. + if let Some(node_version) = node_version { + let worker_version = Command::new(&prep_worker_path).args(["--version"]).output()?.stdout; + let worker_version = std::str::from_utf8(&worker_version) + .expect("version is printed as a string; qed") + .trim() + .to_string(); + if worker_version != node_version { + return Err(Error::WorkerBinaryVersionMismatch { + worker_version, + node_version, + worker_path: prep_worker_path, + }) + } + let worker_version = Command::new(&exec_worker_path).args(["--version"]).output()?.stdout; + let worker_version = std::str::from_utf8(&worker_version) + .expect("version is printed as a string; qed") + .trim() + .to_string(); + if worker_version != node_version { + return Err(Error::WorkerBinaryVersionMismatch { + worker_version, + node_version, + worker_path: exec_worker_path, + }) + } + } else { + log::warn!("Skipping node/worker version checks. This could result in incorrect behavior in PVF workers."); + } + + Ok((prep_worker_path, exec_worker_path)) +} + +/// Get list of workers paths by considering the passed-in `given_workers_path` option, or possible +/// locations on the filesystem. See `new_full`. 
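Since `determine_workers_paths` is public, embedders other than `new_full` can call it directly. Purely as a hypothetical sketch (the service itself, as shown earlier in this patch, simply propagates the error with `?`), a caller could map the error variants above onto operator-facing hints roughly like this:

```rust
use polkadot_service::{workers, Error};
use std::path::PathBuf;

/// Illustrative only: resolve the worker binaries and log actionable hints on failure.
/// Assumes the `full-node` feature of `polkadot-service` is enabled.
fn resolve_workers(
    workers_path: Option<PathBuf>,
    node_version: Option<String>,
) -> Option<(PathBuf, PathBuf)> {
    match workers::determine_workers_paths(workers_path, None, node_version) {
        Ok(paths) => Some(paths),
        Err(Error::MissingWorkerBinaries { .. }) => {
            log::error!(
                "worker binaries not found; build them with `cargo build --bin \
                 polkadot-prepare-worker --bin polkadot-execute-worker` or install them \
                 next to the `polkadot` binary"
            );
            None
        },
        Err(Error::WorkerBinaryVersionMismatch { worker_version, node_version, worker_path }) => {
            log::error!(
                "worker at {:?} reports version {} but the node is {}; install matching binaries",
                worker_path,
                worker_version,
                node_version
            );
            None
        },
        Err(err) => {
            log::error!("failed to set up PVF workers: {}", err);
            None
        },
    }
}
```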
+fn list_workers_paths( + given_workers_path: Option, + workers_names: Option<(String, String)>, +) -> Result, Error> { + if let Some(path) = given_workers_path { + log::trace!("Using explicitly provided workers path {:?}", path); + + if path.is_executable() { + return Ok(vec![(path.clone(), path)]) + } + + let (prep_worker, exec_worker) = build_worker_paths(path, workers_names); + + // Check if both workers exist. Otherwise return an empty vector which results in an error. + return if prep_worker.exists() && exec_worker.exists() { + Ok(vec![(prep_worker, exec_worker)]) + } else { + Ok(vec![]) + } + } + + // Workers path not provided, check all possible valid locations. + + let mut workers_paths = vec![]; + + // Consider the polkadot binary directory. + { + let exe_path = get_exe_path()?; + + let (prep_worker, exec_worker) = + build_worker_paths(exe_path.clone(), workers_names.clone()); + + // Add to set if both workers exist. Warn on partial installs. + let (prep_worker_exists, exec_worker_exists) = (prep_worker.exists(), exec_worker.exists()); + if prep_worker_exists && exec_worker_exists { + log::trace!("Worker binaries found at current exe path: {:?}", exe_path); + workers_paths.push((prep_worker, exec_worker)); + } else if prep_worker_exists { + log::warn!("Worker binary found at {:?} but not {:?}", prep_worker, exec_worker); + } else if exec_worker_exists { + log::warn!("Worker binary found at {:?} but not {:?}", exec_worker, prep_worker); + } + } + + // Consider the /usr/lib/polkadot/ directory. + { + #[allow(unused_mut)] + let mut lib_path = PathBuf::from("/usr/lib/polkadot"); + #[cfg(test)] + if let Some(ref path_override) = *workers_lib_path_override().lock().unwrap() { + lib_path = path_override.clone(); + } + + let (prep_worker, exec_worker) = build_worker_paths(lib_path, workers_names); + + // Add to set if both workers exist. Warn on partial installs. + let (prep_worker_exists, exec_worker_exists) = (prep_worker.exists(), exec_worker.exists()); + if prep_worker_exists && exec_worker_exists { + log::trace!("Worker binaries found at /usr/lib/polkadot"); + workers_paths.push((prep_worker, exec_worker)); + } else if prep_worker_exists { + log::warn!("Worker binary found at {:?} but not {:?}", prep_worker, exec_worker); + } else if exec_worker_exists { + log::warn!("Worker binary found at {:?} but not {:?}", exec_worker, prep_worker); + } + } + + Ok(workers_paths) +} + +fn get_exe_path() -> Result { + let mut exe_path = std::env::current_exe()?; + let _ = exe_path.pop(); // executable file will always have a parent directory. + #[cfg(test)] + if let Some(ref path_override) = *workers_exe_path_override().lock().unwrap() { + exe_path = path_override.clone(); + } + Ok(exe_path) +} + +fn build_worker_paths( + worker_dir: PathBuf, + workers_names: Option<(String, String)>, +) -> (PathBuf, PathBuf) { + let (prep_worker_name, exec_worker_name) = workers_names.unwrap_or(( + polkadot_node_core_pvf::PREPARE_BINARY_NAME.to_string(), + polkadot_node_core_pvf::EXECUTE_BINARY_NAME.to_string(), + )); + + let mut prep_worker = worker_dir.clone(); + prep_worker.push(prep_worker_name); + let mut exec_worker = worker_dir; + exec_worker.push(exec_worker_name); + + (prep_worker, exec_worker) +} + +// Tests that set up a temporary directory tree according to what scenario we want to test and +// run worker detection. 
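The tests below exercise, among other things, the `--version` handshake performed in `determine_workers_paths` above. As a minimal sketch of that handshake factored into a stand-alone helper (hypothetical; the patch keeps the logic inline), it amounts to:

```rust
use std::{path::Path, process::Command};

/// Hypothetical helper mirroring the inline check in `determine_workers_paths`:
/// run `<worker> --version` and compare the trimmed output with the node's version.
fn check_worker_version(worker_path: &Path, node_version: &str) -> Result<(), String> {
    let output = Command::new(worker_path)
        .arg("--version")
        .output()
        .map_err(|err| format!("failed to run {}: {}", worker_path.display(), err))?;
    let worker_version = String::from_utf8_lossy(&output.stdout).trim().to_string();
    if worker_version == node_version {
        Ok(())
    } else {
        Err(format!(
            "version mismatch for {}: worker reports {:?}, node is {:?}",
            worker_path.display(),
            worker_version,
            node_version
        ))
    }
}
```

When `node_version` is `None`, the real code skips this check entirely and only logs a warning, which is the mode collators and most tests run in.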
+#[cfg(test)] +mod tests { + use super::*; + + use assert_matches::assert_matches; + use serial_test::serial; + use std::{env::temp_dir, fs, os::unix::fs::PermissionsExt, path::Path}; + + const NODE_VERSION: &'static str = "v0.1.2"; + + /// Write a dummy executable to the path which satisfies the version check. + fn write_worker_exe(path: impl AsRef) -> Result<(), Box> { + let program = get_program(NODE_VERSION); + fs::write(&path, program)?; + Ok(fs::set_permissions(&path, fs::Permissions::from_mode(0o744))?) + } + + fn write_worker_exe_invalid_version( + path: impl AsRef, + version: &str, + ) -> Result<(), Box> { + let program = get_program(version); + fs::write(&path, program)?; + Ok(fs::set_permissions(&path, fs::Permissions::from_mode(0o744))?) + } + + fn get_program(version: &str) -> String { + format!( + "#!/bin/bash + +if [[ $# -ne 1 ]] ; then + echo \"unexpected number of arguments: $#\" + exit 1 +fi + +if [[ \"$1\" != \"--version\" ]] ; then + echo \"unexpected argument: $1\" + exit 1 +fi + +echo {} +", + version + ) + } + + /// Sets up an empty temp dir structure where the workers can be put by tests. Uses the temp dir + /// to override the standard locations where the node searches for the workers. + fn with_temp_dir_structure( + f: impl FnOnce(PathBuf, PathBuf) -> Result<(), Box>, + ) -> Result<(), Box> { + // Set up /usr/lib/polkadot and /usr/bin, both empty. + + let tempdir = temp_dir(); + let lib_path = tempdir.join("usr/lib/polkadot"); + let _ = fs::remove_dir_all(&lib_path); + fs::create_dir_all(&lib_path)?; + *workers_lib_path_override().lock()? = Some(lib_path); + + let exe_path = tempdir.join("usr/bin"); + let _ = fs::remove_dir_all(&exe_path); + fs::create_dir_all(&exe_path)?; + *workers_exe_path_override().lock()? = Some(exe_path.clone()); + + // Set up custom path at /usr/local/bin. + let custom_path = tempdir.join("usr/local/bin"); + let _ = fs::remove_dir_all(&custom_path); + fs::create_dir_all(&custom_path)?; + + f(tempdir, exe_path) + } + + #[test] + #[serial] + fn test_given_worker_path() { + with_temp_dir_structure(|tempdir, exe_path| { + let given_workers_path = tempdir.join("usr/local/bin"); + + // Try with provided workers path that has missing binaries. + assert_matches!( + determine_workers_paths(Some(given_workers_path.clone()), None, Some(NODE_VERSION.into())), + Err(Error::MissingWorkerBinaries { given_workers_path: Some(p1), current_exe_path: p2, workers_names: None }) if p1 == given_workers_path && p2 == exe_path + ); + + // Try with provided workers path that has non-executable binaries. + let prepare_worker_path = given_workers_path.join("polkadot-prepare-worker"); + write_worker_exe(&prepare_worker_path)?; + fs::set_permissions(&prepare_worker_path, fs::Permissions::from_mode(0o644))?; + let execute_worker_path = given_workers_path.join("polkadot-execute-worker"); + write_worker_exe(&execute_worker_path)?; + fs::set_permissions(&execute_worker_path, fs::Permissions::from_mode(0o644))?; + assert_matches!( + determine_workers_paths(Some(given_workers_path.clone()), None, Some(NODE_VERSION.into())), + Err(Error::InvalidWorkerBinaries { prep_worker_path: p1, exec_worker_path: p2 }) if p1 == prepare_worker_path && p2 == execute_worker_path + ); + + // Try with valid workers directory path that has executable binaries. 
+ fs::set_permissions(&prepare_worker_path, fs::Permissions::from_mode(0o744))?; + fs::set_permissions(&execute_worker_path, fs::Permissions::from_mode(0o744))?; + assert_matches!( + determine_workers_paths(Some(given_workers_path), None, Some(NODE_VERSION.into())), + Ok((p1, p2)) if p1 == prepare_worker_path && p2 == execute_worker_path + ); + + // Try with valid provided workers path that is a binary file. + let given_workers_path = tempdir.join("usr/local/bin/puppet-worker"); + write_worker_exe(&given_workers_path)?; + assert_matches!( + determine_workers_paths(Some(given_workers_path.clone()), None, Some(NODE_VERSION.into())), + Ok((p1, p2)) if p1 == given_workers_path && p2 == given_workers_path + ); + + Ok(()) + }) + .unwrap(); + } + + #[test] + #[serial] + fn missing_workers_paths_throws_error() { + with_temp_dir_structure(|tempdir, exe_path| { + // Try with both binaries missing. + assert_matches!( + determine_workers_paths(None, None, Some(NODE_VERSION.into())), + Err(Error::MissingWorkerBinaries { given_workers_path: None, current_exe_path: p, workers_names: None }) if p == exe_path + ); + + // Try with only prep worker (at bin location). + let prepare_worker_path = tempdir.join("usr/bin/polkadot-prepare-worker"); + write_worker_exe(&prepare_worker_path)?; + assert_matches!( + determine_workers_paths(None, None, Some(NODE_VERSION.into())), + Err(Error::MissingWorkerBinaries { given_workers_path: None, current_exe_path: p, workers_names: None }) if p == exe_path + ); + + // Try with only exec worker (at bin location). + fs::remove_file(&prepare_worker_path)?; + let execute_worker_path = tempdir.join("usr/bin/polkadot-execute-worker"); + write_worker_exe(&execute_worker_path)?; + assert_matches!( + determine_workers_paths(None, None, Some(NODE_VERSION.into())), + Err(Error::MissingWorkerBinaries { given_workers_path: None, current_exe_path: p, workers_names: None }) if p == exe_path + ); + + // Try with only prep worker (at lib location). + fs::remove_file(&execute_worker_path)?; + let prepare_worker_path = tempdir.join("usr/lib/polkadot/polkadot-prepare-worker"); + write_worker_exe(&prepare_worker_path)?; + assert_matches!( + determine_workers_paths(None, None, Some(NODE_VERSION.into())), + Err(Error::MissingWorkerBinaries { given_workers_path: None, current_exe_path: p, workers_names: None }) if p == exe_path + ); + + // Try with only exec worker (at lib location). 
+ fs::remove_file(&prepare_worker_path)?; + let execute_worker_path = tempdir.join("usr/lib/polkadot/polkadot-execute-worker"); + write_worker_exe(execute_worker_path)?; + assert_matches!( + determine_workers_paths(None, None, Some(NODE_VERSION.into())), + Err(Error::MissingWorkerBinaries { given_workers_path: None, current_exe_path: p, workers_names: None }) if p == exe_path + ); + + Ok(()) + }) + .unwrap() + } + + #[test] + #[serial] + fn should_find_workers_at_all_locations() { + with_temp_dir_structure(|tempdir, _| { + let prepare_worker_bin_path = tempdir.join("usr/bin/polkadot-prepare-worker"); + write_worker_exe(&prepare_worker_bin_path)?; + + let execute_worker_bin_path = tempdir.join("usr/bin/polkadot-execute-worker"); + write_worker_exe(&execute_worker_bin_path)?; + + let prepare_worker_lib_path = tempdir.join("usr/lib/polkadot/polkadot-prepare-worker"); + write_worker_exe(&prepare_worker_lib_path)?; + + let execute_worker_lib_path = tempdir.join("usr/lib/polkadot/polkadot-execute-worker"); + write_worker_exe(&execute_worker_lib_path)?; + + assert_matches!( + list_workers_paths(None, None), + Ok(v) if v == vec![(prepare_worker_bin_path, execute_worker_bin_path), (prepare_worker_lib_path, execute_worker_lib_path)] + ); + + Ok(()) + }) + .unwrap(); + } + + #[test] + #[serial] + fn should_find_workers_with_custom_names_at_all_locations() { + with_temp_dir_structure(|tempdir, _| { + let (prep_worker_name, exec_worker_name) = ("test-prepare", "test-execute"); + + let prepare_worker_bin_path = tempdir.join("usr/bin").join(prep_worker_name); + write_worker_exe(&prepare_worker_bin_path)?; + + let execute_worker_bin_path = tempdir.join("usr/bin").join(exec_worker_name); + write_worker_exe(&execute_worker_bin_path)?; + + let prepare_worker_lib_path = tempdir.join("usr/lib/polkadot").join(prep_worker_name); + write_worker_exe(&prepare_worker_lib_path)?; + + let execute_worker_lib_path = tempdir.join("usr/lib/polkadot").join(exec_worker_name); + write_worker_exe(&execute_worker_lib_path)?; + + assert_matches!( + list_workers_paths(None, Some((prep_worker_name.into(), exec_worker_name.into()))), + Ok(v) if v == vec![(prepare_worker_bin_path, execute_worker_bin_path), (prepare_worker_lib_path, execute_worker_lib_path)] + ); + + Ok(()) + }) + .unwrap(); + } + + #[test] + #[serial] + fn workers_version_mismatch_throws_error() { + let bad_version = "v9.9.9.9"; + + with_temp_dir_structure(|tempdir, _| { + // Workers at bin location return bad version. + let prepare_worker_bin_path = tempdir.join("usr/bin/polkadot-prepare-worker"); + let execute_worker_bin_path = tempdir.join("usr/bin/polkadot-execute-worker"); + write_worker_exe_invalid_version(&prepare_worker_bin_path, bad_version)?; + write_worker_exe(&execute_worker_bin_path)?; + assert_matches!( + determine_workers_paths(None, None, Some(NODE_VERSION.into())), + Err(Error::WorkerBinaryVersionMismatch { worker_version: v1, node_version: v2, worker_path: p }) if v1 == bad_version && v2 == NODE_VERSION && p == prepare_worker_bin_path + ); + + // Workers at lib location return bad version. 
+ fs::remove_file(prepare_worker_bin_path)?; + fs::remove_file(execute_worker_bin_path)?; + let prepare_worker_lib_path = tempdir.join("usr/lib/polkadot/polkadot-prepare-worker"); + let execute_worker_lib_path = tempdir.join("usr/lib/polkadot/polkadot-execute-worker"); + write_worker_exe(&prepare_worker_lib_path)?; + write_worker_exe_invalid_version(&execute_worker_lib_path, bad_version)?; + assert_matches!( + determine_workers_paths(None, None, Some(NODE_VERSION.into())), + Err(Error::WorkerBinaryVersionMismatch { worker_version: v1, node_version: v2, worker_path: p }) if v1 == bad_version && v2 == NODE_VERSION && p == execute_worker_lib_path + ); + + // Workers at provided workers location return bad version. + let given_workers_path = tempdir.join("usr/local/bin"); + let prepare_worker_path = given_workers_path.join("polkadot-prepare-worker"); + let execute_worker_path = given_workers_path.join("polkadot-execute-worker"); + write_worker_exe_invalid_version(&prepare_worker_path, bad_version)?; + write_worker_exe_invalid_version(&execute_worker_path, bad_version)?; + assert_matches!( + determine_workers_paths(Some(given_workers_path), None, Some(NODE_VERSION.into())), + Err(Error::WorkerBinaryVersionMismatch { worker_version: v1, node_version: v2, worker_path: p }) if v1 == bad_version && v2 == NODE_VERSION && p == prepare_worker_path + ); + + // Given worker binary returns bad version. + let given_workers_path = tempdir.join("usr/local/bin/puppet-worker"); + write_worker_exe_invalid_version(&given_workers_path, bad_version)?; + assert_matches!( + determine_workers_paths(Some(given_workers_path.clone()), None, Some(NODE_VERSION.into())), + Err(Error::WorkerBinaryVersionMismatch { worker_version: v1, node_version: v2, worker_path: p }) if v1 == bad_version && v2 == NODE_VERSION && p == given_workers_path + ); + + Ok(()) + }) + .unwrap(); + } + + #[test] + #[serial] + fn should_find_valid_workers() { + // Test bin location. + with_temp_dir_structure(|tempdir, _| { + let prepare_worker_bin_path = tempdir.join("usr/bin/polkadot-prepare-worker"); + write_worker_exe(&prepare_worker_bin_path)?; + + let execute_worker_bin_path = tempdir.join("usr/bin/polkadot-execute-worker"); + write_worker_exe(&execute_worker_bin_path)?; + + assert_matches!( + determine_workers_paths(None, None, Some(NODE_VERSION.into())), + Ok((p1, p2)) if p1 == prepare_worker_bin_path && p2 == execute_worker_bin_path + ); + + Ok(()) + }) + .unwrap(); + + // Test lib location. + with_temp_dir_structure(|tempdir, _| { + let prepare_worker_lib_path = tempdir.join("usr/lib/polkadot/polkadot-prepare-worker"); + write_worker_exe(&prepare_worker_lib_path)?; + + let execute_worker_lib_path = tempdir.join("usr/lib/polkadot/polkadot-execute-worker"); + write_worker_exe(&execute_worker_lib_path)?; + + assert_matches!( + determine_workers_paths(None, None, Some(NODE_VERSION.into())), + Ok((p1, p2)) if p1 == prepare_worker_lib_path && p2 == execute_worker_lib_path + ); + + Ok(()) + }) + .unwrap(); + } +} diff --git a/node/test/service/README.md b/node/test/service/README.md new file mode 100644 index 000000000000..2fdee46a7f93 --- /dev/null +++ b/node/test/service/README.md @@ -0,0 +1,9 @@ +# polkadot-test-service + +## Testing + +Before running `cargo test` in this crate, make sure the worker binaries are built first. 
This can be done with: + +```sh +cargo build --bin polkadot-execute-worker --bin polkadot-prepare-worker +``` diff --git a/node/test/service/src/lib.rs b/node/test/service/src/lib.rs index 0cf52c0934d5..99ccacb78f7e 100644 --- a/node/test/service/src/lib.rs +++ b/node/test/service/src/lib.rs @@ -72,24 +72,40 @@ pub use polkadot_service::{FullBackend, GetLastTimestamp}; pub fn new_full( config: Configuration, is_collator: IsCollator, - worker_program_path: Option, + workers_path: Option, ) -> Result { + let workers_path = Some(workers_path.unwrap_or_else(get_relative_workers_path_for_test)); + polkadot_service::new_full( config, - is_collator, - None, - true, - None, - None, - worker_program_path, - false, - polkadot_service::RealOverseerGen, - None, - None, - None, + polkadot_service::NewFullParams { + is_collator, + grandpa_pause: None, + enable_beefy: true, + jaeger_agent: None, + telemetry_worker_handle: None, + node_version: None, + workers_path, + workers_names: None, + overseer_enable_anyways: false, + overseer_gen: polkadot_service::RealOverseerGen, + overseer_message_channel_capacity_override: None, + malus_finality_delay: None, + hwbench: None, + }, ) } +fn get_relative_workers_path_for_test() -> PathBuf { + // If no explicit worker path is passed in, we need to specify it ourselves as test binaries + // are in the "deps/" directory, one level below where the worker binaries are generated. + let mut exe_path = std::env::current_exe() + .expect("for test purposes it's reasonable to expect that this will not fail"); + let _ = exe_path.pop(); + let _ = exe_path.pop(); + exe_path +} + /// Returns a prometheus config usable for testing. pub fn test_prometheus_config(port: u16) -> PrometheusConfig { PrometheusConfig::new_with_default_registry( diff --git a/parachain/test-parachains/adder/collator/src/main.rs b/parachain/test-parachains/adder/collator/src/main.rs index 699cee202cb8..d4bfc50c8db7 100644 --- a/parachain/test-parachains/adder/collator/src/main.rs +++ b/parachain/test-parachains/adder/collator/src/main.rs @@ -58,16 +58,24 @@ fn main() -> Result<()> { let full_node = polkadot_service::build_full( config, - polkadot_service::IsCollator::Yes(collator.collator_key()), - None, - false, - None, - None, - false, - polkadot_service::RealOverseerGen, - None, - None, - None, + polkadot_service::NewFullParams { + is_collator: polkadot_service::IsCollator::Yes(collator.collator_key()), + grandpa_pause: None, + enable_beefy: false, + jaeger_agent: None, + telemetry_worker_handle: None, + + // Collators don't spawn PVF workers, so we can disable version checks. 
+ node_version: None, + workers_path: None, + workers_names: None, + + overseer_enable_anyways: false, + overseer_gen: polkadot_service::RealOverseerGen, + overseer_message_channel_capacity_override: None, + malus_finality_delay: None, + hwbench: None, + }, ) .map_err(|e| e.to_string())?; let mut overseer_handle = full_node diff --git a/parachain/test-parachains/undying/collator/src/main.rs b/parachain/test-parachains/undying/collator/src/main.rs index 189674b82a97..3b6b4259aaec 100644 --- a/parachain/test-parachains/undying/collator/src/main.rs +++ b/parachain/test-parachains/undying/collator/src/main.rs @@ -58,16 +58,24 @@ fn main() -> Result<()> { let full_node = polkadot_service::build_full( config, - polkadot_service::IsCollator::Yes(collator.collator_key()), - None, - false, - None, - None, - false, - polkadot_service::RealOverseerGen, - None, - None, - None, + polkadot_service::NewFullParams { + is_collator: polkadot_service::IsCollator::Yes(collator.collator_key()), + grandpa_pause: None, + enable_beefy: false, + jaeger_agent: None, + telemetry_worker_handle: None, + + // Collators don't spawn PVF workers, so we can disable version checks. + node_version: None, + workers_path: None, + workers_names: None, + + overseer_enable_anyways: false, + overseer_gen: polkadot_service::RealOverseerGen, + overseer_message_channel_capacity_override: None, + malus_finality_delay: None, + hwbench: None, + }, ) .map_err(|e| e.to_string())?; let mut overseer_handle = full_node diff --git a/scripts/ci/dockerfiles/malus_injected.Dockerfile b/scripts/ci/dockerfiles/malus_injected.Dockerfile index 3f7f1313b38f..fa429b5f142a 100644 --- a/scripts/ci/dockerfiles/malus_injected.Dockerfile +++ b/scripts/ci/dockerfiles/malus_injected.Dockerfile @@ -38,8 +38,8 @@ RUN apt-get update && \ --uid 10000 nonroot -# add adder-collator binary to docker image -COPY ./malus /usr/local/bin +# add malus binary to docker image +COPY ./malus ./polkadot-execute-worker ./polkadot-prepare-worker /usr/local/bin USER nonroot diff --git a/scripts/ci/dockerfiles/polkadot_injected_debug.Dockerfile b/scripts/ci/dockerfiles/polkadot_injected_debug.Dockerfile index 128b802b7adc..aebbbdcf1b7f 100644 --- a/scripts/ci/dockerfiles/polkadot_injected_debug.Dockerfile +++ b/scripts/ci/dockerfiles/polkadot_injected_debug.Dockerfile @@ -33,12 +33,14 @@ RUN apt-get update && \ ln -s /data /polkadot/.local/share/polkadot # add polkadot binary to docker image -COPY ./polkadot ./polkadot-*-worker /usr/local/bin +COPY ./polkadot ./polkadot-execute-worker ./polkadot-prepare-worker /usr/local/bin USER polkadot # check if executable works in this container RUN /usr/local/bin/polkadot --version +RUN /usr/local/bin/polkadot-execute-worker --version +RUN /usr/local/bin/polkadot-prepare-worker --version EXPOSE 30333 9933 9944 VOLUME ["/polkadot"] diff --git a/scripts/ci/dockerfiles/polkadot_injected_release.Dockerfile b/scripts/ci/dockerfiles/polkadot_injected_release.Dockerfile index ba0a79e78187..74b5c7f48f88 100644 --- a/scripts/ci/dockerfiles/polkadot_injected_release.Dockerfile +++ b/scripts/ci/dockerfiles/polkadot_injected_release.Dockerfile @@ -44,6 +44,8 @@ USER polkadot # check if executable works in this container RUN /usr/bin/polkadot --version +RUN /usr/bin/polkadot-execute-worker --version +RUN /usr/bin/polkadot-prepare-worker --version EXPOSE 30333 9933 9944 VOLUME ["/polkadot"] diff --git a/scripts/ci/gitlab/pipeline/build.yml b/scripts/ci/gitlab/pipeline/build.yml index d07037626d65..dafca393cd4f 100644 --- 
a/scripts/ci/gitlab/pipeline/build.yml +++ b/scripts/ci/gitlab/pipeline/build.yml @@ -26,8 +26,8 @@ build-linux-stable: - mkdir -p ./artifacts - VERSION="${CI_COMMIT_REF_NAME}" # will be tag or branch name - mv ./target/testnet/polkadot ./artifacts/. - - mv ./target/testnet/polkadot-prepare-worker ./artifacts/. 2>/dev/null || true - - mv ./target/testnet/polkadot-execute-worker ./artifacts/. 2>/dev/null || true + - mv ./target/testnet/polkadot-prepare-worker ./artifacts/. + - mv ./target/testnet/polkadot-execute-worker ./artifacts/. - pushd artifacts - sha256sum polkadot | tee polkadot.sha256 - shasum -c polkadot.sha256 @@ -83,6 +83,8 @@ build-malus: # pack artifacts - mkdir -p ./artifacts - mv ./target/testnet/malus ./artifacts/. + - mv ./target/testnet/polkadot-execute-worker ./artifacts/. + - mv ./target/testnet/polkadot-prepare-worker ./artifacts/. - echo -n "${CI_COMMIT_REF_NAME}" > ./artifacts/VERSION - echo -n "${CI_COMMIT_REF_NAME}-${CI_COMMIT_SHORT_SHA}" > ./artifacts/EXTRATAG - echo "polkadot-test-malus = $(cat ./artifacts/VERSION) (EXTRATAG = $(cat ./artifacts/EXTRATAG))" diff --git a/scripts/ci/gitlab/pipeline/test.yml b/scripts/ci/gitlab/pipeline/test.yml index 60886dc60cde..b45c4c1be890 100644 --- a/scripts/ci/gitlab/pipeline/test.yml +++ b/scripts/ci/gitlab/pipeline/test.yml @@ -89,6 +89,9 @@ test-node-metrics: # but still want to have debug assertions. RUSTFLAGS: "-Cdebug-assertions=y -Dwarnings" script: + # Build the required workers. + - cargo build --bin polkadot-execute-worker --bin polkadot-prepare-worker --profile testnet --verbose --locked + # Run tests. - time cargo test --profile testnet --verbose --locked --features=runtime-metrics -p polkadot-node-metrics test-deterministic-wasm: diff --git a/scripts/ci/gitlab/pipeline/zombienet.yml b/scripts/ci/gitlab/pipeline/zombienet.yml index 5f51b06e2e78..cc4a7eb2ccc1 100644 --- a/scripts/ci/gitlab/pipeline/zombienet.yml +++ b/scripts/ci/gitlab/pipeline/zombienet.yml @@ -237,7 +237,7 @@ zombienet-tests-misc-upgrade-node: - export ZOMBIENET_INTEGRATION_TEST_IMAGE="docker.io/parity/polkadot:latest" - export COL_IMAGE=${COLLATOR_IMAGE_NAME}:${COLLATOR_IMAGE_TAG} - BUILD_LINUX_JOB_ID="$(cat ./artifacts/BUILD_LINUX_JOB_ID)" - - export POLKADOT_PR_BIN_URL="https://gitlab.parity.io/parity/mirrors/polkadot/-/jobs/${BUILD_LINUX_JOB_ID}/artifacts/raw/artifacts/polkadot" + - export POLKADOT_PR_ARTIFACTS_URL="https://gitlab.parity.io/parity/mirrors/polkadot/-/jobs/${BUILD_LINUX_JOB_ID}/artifacts/raw/artifacts" script: - /home/nonroot/zombie-net/scripts/ci/run-test-env-manager.sh --github-remote-dir="${GH_DIR}" diff --git a/src/bin/execute-worker.rs b/src/bin/execute-worker.rs new file mode 100644 index 000000000000..72cab799d753 --- /dev/null +++ b/src/bin/execute-worker.rs @@ -0,0 +1,23 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Execute worker. 
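+//!
+//! The `main` for this binary is generated by `decl_worker_main!` (defined in
+//! `polkadot-node-core-pvf-common`). Among other things, the generated `main` answers
+//! `--version` with the version string passed in below (`SUBSTRATE_CLI_IMPL_VERSION`),
+//! which is what the node-side check in `node/service/src/workers.rs` compares against.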
+ +polkadot_node_core_pvf_common::decl_worker_main!( + "execute-worker", + polkadot_node_core_pvf_execute_worker::worker_entrypoint, + env!("SUBSTRATE_CLI_IMPL_VERSION") +); diff --git a/src/bin/prepare-worker.rs b/src/bin/prepare-worker.rs new file mode 100644 index 000000000000..695f66cc7b7d --- /dev/null +++ b/src/bin/prepare-worker.rs @@ -0,0 +1,23 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Prepare worker. + +polkadot_node_core_pvf_common::decl_worker_main!( + "prepare-worker", + polkadot_node_core_pvf_prepare_worker::worker_entrypoint, + env!("SUBSTRATE_CLI_IMPL_VERSION") +); diff --git a/tests/workers.rs b/tests/workers.rs new file mode 100644 index 000000000000..2872a1298dcd --- /dev/null +++ b/tests/workers.rs @@ -0,0 +1,38 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . 
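Neither `decl_worker_main!` nor the worker entrypoints are expanded anywhere in this patch. Purely as a rough, hypothetical sketch of the command-line surface such a generated `main` has to cover, pieced together from `spawn_with_program_path`, the `worker_intf` changes, and the puppet-worker parsing earlier in the diff (the real macro in `polkadot-node-core-pvf-common` may differ in detail):

```rust
// Conceptual sketch only -- NOT the actual expansion of `decl_worker_main!`.
fn main() {
    let args: Vec<String> = std::env::args().collect();

    // Probe used by the node-side version check in `determine_workers_paths`.
    if args.get(1).map(|arg| arg.as_str()) == Some("--version") {
        // The real binaries are handed `env!("SUBSTRATE_CLI_IMPL_VERSION")` for this.
        println!("{}", env!("CARGO_PKG_VERSION"));
        return
    }

    // When spawning a worker, the node passes the worker kind first (e.g. "prepare-worker"),
    // followed by flag/value pairs.
    let mut socket_path = None;
    let mut node_version = None;
    let mut i = if args.get(1).map_or(false, |arg| !arg.starts_with("--")) { 2 } else { 1 };
    while i + 1 < args.len() {
        match args[i].as_str() {
            "--socket-path" => socket_path = Some(args[i + 1].clone()),
            "--node-impl-version" => node_version = Some(args[i + 1].clone()),
            other => panic!("unexpected argument: {}", other),
        }
        i += 2;
    }

    let socket_path = socket_path.expect("--socket-path is passed by the node");
    // Here the generated `main` would hand off to the entrypoint named in the macro
    // invocation, passing the socket path and the received node version along.
    let _ = (socket_path, node_version);
}
```

The integration test that follows drives the real binaries through the `CARGO_BIN_EXE_*` environment variables, which Cargo sets for a package's `[[bin]]` targets when compiling its integration tests.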
+ +use polkadot_cli::NODE_VERSION; +use std::process::Command; + +const PREPARE_WORKER_EXE: &str = env!("CARGO_BIN_EXE_polkadot-prepare-worker"); +const EXECUTE_WORKER_EXE: &str = env!("CARGO_BIN_EXE_polkadot-execute-worker"); + +#[test] +fn worker_binaries_have_same_version_as_node() { + let prep_worker_version = + Command::new(&PREPARE_WORKER_EXE).args(["--version"]).output().unwrap().stdout; + let prep_worker_version = std::str::from_utf8(&prep_worker_version) + .expect("version is printed as a string; qed") + .trim(); + assert_eq!(prep_worker_version, NODE_VERSION); + + let exec_worker_version = + Command::new(&EXECUTE_WORKER_EXE).args(["--version"]).output().unwrap().stdout; + let exec_worker_version = std::str::from_utf8(&exec_worker_version) + .expect("version is printed as a string; qed") + .trim(); + assert_eq!(exec_worker_version, NODE_VERSION); +} diff --git a/zombienet_tests/README.md b/zombienet_tests/README.md index 5a4c97355f09..84334c3e1cfe 100644 --- a/zombienet_tests/README.md +++ b/zombienet_tests/README.md @@ -18,7 +18,7 @@ To run any test locally use the native provider (`zombienet test -p native ...`) * adder-collator -> polkadot/target/testnet/adder-collator * malus -> polkadot/target/testnet/malus -* polkadot -> polkadot/target/testnet/polkadot +* polkadot -> polkadot/target/testnet/polkadot, polkadot/target/testnet/polkadot-prepare-worker, polkadot/target/testnet/polkadot-execute-worker * polkadot-collator -> cumulus/target/release/polkadot-parachain * undying-collator -> polkadot/target/testnet/undying-collator diff --git a/zombienet_tests/misc/0002-download-polkadot-from-pr.sh b/zombienet_tests/misc/0002-download-polkadot-from-pr.sh index 7ff323d9c41f..0d4b28075795 100644 --- a/zombienet_tests/misc/0002-download-polkadot-from-pr.sh +++ b/zombienet_tests/misc/0002-download-polkadot-from-pr.sh @@ -12,6 +12,8 @@ export PATH=$CFG_DIR:$PATH cd $CFG_DIR # see 0002-upgrade-node.zndsl to view the args. -curl -L -O $1 -chmod +x $CFG_DIR/polkadot +curl -L -O $1/polkadot +curl -L -O $1/polkadot-prepare-worker +curl -L -O $1/polkadot-execute-worker +chmod +x $CFG_DIR/polkadot $CFG_DIR/polkadot-prepare-worker $CFG_DIR/polkadot-execute-worker echo $(polkadot --version) diff --git a/zombienet_tests/misc/0002-upgrade-node.zndsl b/zombienet_tests/misc/0002-upgrade-node.zndsl index 7b23a2604989..fdf16b7286c9 100644 --- a/zombienet_tests/misc/0002-upgrade-node.zndsl +++ b/zombienet_tests/misc/0002-upgrade-node.zndsl @@ -7,12 +7,12 @@ dave: parachain 2001 block height is at least 10 within 200 seconds # upgrade both nodes # For testing using native provider you should set this env var -# POLKADOT_PR_BIN_URL=https://gitlab.parity.io/parity/mirrors/polkadot/-/jobs/1842869/artifacts/raw/artifacts/polkadot +# POLKADOT_PR_ARTIFACTS_URL=https://gitlab.parity.io/parity/mirrors/polkadot/-/jobs/1842869/artifacts/raw/artifacts # with the version of polkadot you want to download. # avg 30s in our infra -alice: run ./0002-download-polkadot-from-pr.sh with "{{POLKADOT_PR_BIN_URL}}" within 40 seconds -bob: run ./0002-download-polkadot-from-pr.sh with "{{POLKADOT_PR_BIN_URL}}" within 40 seconds +alice: run ./0002-download-polkadot-from-pr.sh with "{{POLKADOT_PR_ARTIFACTS_URL}}" within 40 seconds +bob: run ./0002-download-polkadot-from-pr.sh with "{{POLKADOT_PR_ARTIFACTS_URL}}" within 40 seconds alice: restart after 5 seconds bob: restart after 5 seconds
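Finally, tying the service-side changes together: a hypothetical sketch (not part of this patch) of how a plain, non-collator full node might fill in the new `NewFullParams`, enforcing the worker version check and leaving worker discovery to the default search order (the node binary's directory, then `/usr/lib/polkadot`). The `node_version` plumbing shown is illustrative; in the real node the CLI passes its own impl version down.

```rust
use polkadot_service::{IsCollator, NewFullParams, RealOverseerGen};
use sc_service::Configuration;

/// Illustrative only: start a validator-capable full node with the new parameter struct.
fn start_full_node(config: Configuration, node_version: String) -> Result<(), String> {
    let _full_node = polkadot_service::build_full(
        config,
        NewFullParams {
            is_collator: IsCollator::No,
            grandpa_pause: None,
            enable_beefy: true,
            jaeger_agent: None,
            telemetry_worker_handle: None,
            // `Some` turns on the node/worker version check, both at start-up and in the
            // workers themselves.
            node_version: Some(node_version),
            // `None` means: look next to the `polkadot` binary, then in `/usr/lib/polkadot`.
            workers_path: None,
            workers_names: None,
            overseer_enable_anyways: false,
            overseer_gen: RealOverseerGen,
            overseer_message_channel_capacity_override: None,
            malus_finality_delay: None,
            hwbench: None,
        },
    )
    .map_err(|e| e.to_string())?;

    // The returned `NewFull` (task manager, client, overseer handle, ...) must be kept
    // alive for the node's lifetime; it is dropped here only to keep the sketch short.
    Ok(())
}
```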