From 5ab58809050485b6c0bfbc8f5c412406d0d3fd10 Mon Sep 17 00:00:00 2001 From: Edoardo Marangoni Date: Wed, 22 May 2024 18:48:07 +0200 Subject: [PATCH 1/2] feat(argus): Compare v2 and v3 + add webhook message --- Cargo.lock | 1 + tests/wasmer-argus/Cargo.toml | 1 + tests/wasmer-argus/src/argus/config.rs | 4 + tests/wasmer-argus/src/argus/mod.rs | 86 ++++++++++++++++--- tests/wasmer-argus/src/argus/packages.rs | 44 ++++++++-- .../src/argus/tester/cli_tester.rs | 57 ++++++++---- .../src/argus/tester/lib_tester.rs | 60 ++++++++++--- 7 files changed, 202 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d8469af6d49..5be4f08b765 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6405,6 +6405,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "shared-buffer", "tokio 1.37.0", "tracing", "tracing-subscriber", diff --git a/tests/wasmer-argus/Cargo.toml b/tests/wasmer-argus/Cargo.toml index 624082886dc..831eb86162c 100644 --- a/tests/wasmer-argus/Cargo.toml +++ b/tests/wasmer-argus/Cargo.toml @@ -29,6 +29,7 @@ derive_more = "0.99.17" webc.workspace = true async-trait = "0.1.77" wasmer-api = { path = "../../lib/backend-api" } +shared-buffer.workspace = true [target.'cfg(not(target_arch = "riscv64"))'.dependencies] diff --git a/tests/wasmer-argus/src/argus/config.rs b/tests/wasmer-argus/src/argus/config.rs index 5a325f0e769..92c7bbdca4a 100644 --- a/tests/wasmer-argus/src/argus/config.rs +++ b/tests/wasmer-argus/src/argus/config.rs @@ -61,4 +61,8 @@ pub struct ArgusConfig { #[cfg(feature = "wasmer_lib")] #[arg(long, conflicts_with = "cli_path")] pub use_lib: bool, + + /// The webhook to use when sending the test outcome. + #[arg(long)] + pub webhook_url: Option, } diff --git a/tests/wasmer-argus/src/argus/mod.rs b/tests/wasmer-argus/src/argus/mod.rs index e205671b8b6..1a055cb8b7b 100644 --- a/tests/wasmer-argus/src/argus/mod.rs +++ b/tests/wasmer-argus/src/argus/mod.rs @@ -5,9 +5,13 @@ mod tester; use self::tester::{TestReport, Tester}; pub use config::*; use indicatif::{MultiProgress, ProgressBar}; -use std::{fs::OpenOptions, io::Write as _, path::Path, sync::Arc, time::Duration}; +use reqwest::header::CONTENT_TYPE; +use std::{fs::OpenOptions, io::Write as _, ops::AddAssign, path::Path, sync::Arc, time::Duration}; use tokio::{ - sync::{mpsc, Semaphore}, + sync::{ + mpsc::{self, UnboundedSender}, + Mutex, Semaphore, + }, task::JoinSet, }; use tracing::*; @@ -38,6 +42,8 @@ impl Argus { let m = MultiProgress::new(); let (s, mut r) = mpsc::unbounded_channel(); + let (successes_sx, successes_rx) = mpsc::unbounded_channel(); + let (failures_sx, failures_rx) = mpsc::unbounded_channel(); let mut pool = JoinSet::new(); let c = Arc::new(self.config.clone()); @@ -45,11 +51,15 @@ impl Argus { { let this = self.clone(); let bar = m.add(ProgressBar::new(0)); - - pool.spawn(async move { this.fetch_packages(s, bar, c.clone()).await }); + pool.spawn(async move { + this.fetch_packages(s, bar, c.clone(), successes_rx, failures_rx) + .await + }); } let mut count = 0; + let successes = Arc::new(Mutex::new(0)); + let failures = Arc::new(Mutex::new(0)); let c = Arc::new(self.config.clone()); let sem = Arc::new(Semaphore::new(self.config.jobs)); @@ -58,10 +68,29 @@ impl Argus { let c = c.clone(); let bar = m.add(ProgressBar::new(0)); let permit = Arc::clone(&sem).acquire_owned().await; + let successes_sx = successes_sx.clone(); + let failures_sx = failures_sx.clone(); + let failures = failures.clone(); + let successes = successes.clone(); pool.spawn(async move { let _permit = permit; - Argus::test(count, c, &pkg, bar).await + let ret = match Argus::test(count, c, &pkg, bar, successes_sx, failures_sx).await { + Err(e) => { + failures.lock().await.add_assign(1); + Err(e) + } + Ok(true) => { + successes.lock().await.add_assign(1); + Ok(()) + } + Ok(false) => { + failures.lock().await.add_assign(1); + Ok(()) + } + }; + + ret }); count += 1; @@ -69,10 +98,24 @@ impl Argus { while let Some(t) = pool.join_next().await { if let Err(e) = t { - error!("task failed: {e}") + error!("{:?}", e) } } + if let Some(webhook_url) = self.config.webhook_url { + let url = url::Url::parse(&webhook_url)?; + reqwest::Client::new() + .post(url) + .header(CONTENT_TYPE, "application/json") + .body(format!( + r#"{{"text":"Argus run report: {} tests succeeded, {} failed"}}"#, + successes.lock().await, + failures.lock().await + )) + .send() + .await?; + } + info!("done!"); Ok(()) } @@ -83,7 +126,9 @@ impl Argus { config: Arc, package: &PackageVersionWithPackage, p: ProgressBar, - ) -> anyhow::Result<()> { + successes_sx: UnboundedSender<()>, + failures_sx: UnboundedSender<()>, + ) -> anyhow::Result { p.set_style( indicatif::ProgressStyle::with_template(&format!( "[{test_id}] {{spinner:.blue}} {{msg}}" @@ -95,12 +140,21 @@ impl Argus { p.enable_steady_tick(Duration::from_millis(100)); let package_name = Argus::get_package_id(package); - let webc_url: Url = match &package.distribution_v2.pirita_download_url { + let webc_v2_url: Url = match &package.distribution_v2.pirita_download_url { + Some(url) => url.parse().unwrap(), + None => { + info!("package {} has no download url, skipping", package_name); + p.finish_and_clear(); + return Ok(true); + } + }; + + let webc_v3_url: Url = match &package.distribution_v3.pirita_download_url { Some(url) => url.parse().unwrap(), None => { info!("package {} has no download url, skipping", package_name); p.finish_and_clear(); - return Ok(()); + return Ok(true); } }; @@ -129,10 +183,10 @@ impl Argus { }; if !runner.is_to_test().await { - return Ok(()); + return Ok(true); } - Argus::download_package(test_id, &path, &webc_url, &p).await?; + Argus::download_webcs(test_id, &path, &webc_v2_url, &webc_v3_url, &p).await?; info!("package downloaded!"); @@ -150,14 +204,20 @@ impl Argus { p.set_message("package downloaded"); let report = runner.run_test().await?; - info!("\n\n\n\ntest finished\n\n\n"); + let outcome = report.outcome.is_ok(); + + if outcome { + successes_sx.send(())?; + } else { + failures_sx.send(())?; + }; Argus::write_report(&path, report).await?; p.finish_with_message(format!("test for package {package_name} done!")); p.finish_and_clear(); - Ok(()) + Ok(outcome) } /// Checks whether or not the package should be tested diff --git a/tests/wasmer-argus/src/argus/packages.rs b/tests/wasmer-argus/src/argus/packages.rs index d2eccccf7ea..d55dc03a0ff 100644 --- a/tests/wasmer-argus/src/argus/packages.rs +++ b/tests/wasmer-argus/src/argus/packages.rs @@ -4,7 +4,11 @@ use futures::StreamExt; use indicatif::{ProgressBar, ProgressStyle}; use reqwest::{header, Client}; use std::{path::PathBuf, sync::Arc, time::Duration}; -use tokio::{fs::File, io::AsyncWriteExt, sync::mpsc::UnboundedSender}; +use tokio::{ + fs::File, + io::AsyncWriteExt, + sync::mpsc::{UnboundedReceiver, UnboundedSender}, +}; use tracing::*; use url::Url; use wasmer_api::{ @@ -20,6 +24,8 @@ impl Argus { s: UnboundedSender, p: ProgressBar, config: Arc, + successes_rx: UnboundedReceiver<()>, + failures_rx: UnboundedReceiver<()>, ) -> anyhow::Result<()> { info!("starting to fetch packages.."); let vars = AllPackageVersionsVars { @@ -49,7 +55,12 @@ impl Argus { anyhow::bail!("failed to fetch packages: {e}") } }; - p.set_message(format!("fetched {} packages", count)); + p.set_message(format!( + "fetched {} packages [ok: {}, err: {}]", + count, + successes_rx.len(), + failures_rx.len() + )); count += pkgs.len(); for pkg in pkgs { @@ -70,7 +81,19 @@ impl Argus { } #[tracing::instrument(skip(p))] - pub(crate) async fn download_package<'a>( + pub(crate) async fn download_webcs<'a>( + test_id: u64, + path: &'a PathBuf, + webc_v2_url: &'a Url, + webc_v3_url: &'a Url, + p: &'a ProgressBar, + ) -> anyhow::Result<()> { + Argus::download_package(test_id, &path.join("package_v2.webc"), webc_v2_url, p).await?; + Argus::download_package(test_id, &path.join("package_v3.webc"), webc_v3_url, p).await?; + Ok(()) + } + + async fn download_package<'a>( test_id: u64, path: &'a PathBuf, url: &'a Url, @@ -80,9 +103,12 @@ impl Argus { static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); - if !path.exists() { - tokio::fs::create_dir_all(path).await?; - } else if path.exists() && !path.is_dir() { + let mut dir_path = path.clone(); + dir_path.pop(); + + if !dir_path.exists() { + tokio::fs::create_dir_all(dir_path).await?; + } else if dir_path.exists() && !dir_path.is_dir() { anyhow::bail!("path {:?} exists, but it is not a directory!", path) } @@ -120,19 +146,19 @@ impl Argus { p.set_message(format!("downloading from {url}")); - let mut outfile = match File::create(&path.join("package.webc")).await { + let mut outfile = match File::create(&path).await { Ok(o) => o, Err(e) => { error!( "[{test_id}] failed to create file at {:?}. Error: {e}", - path.join("package.webc") + path.display() ); p.finish_and_clear(); anyhow::bail!( "[{test_id}] failed to create file at {:?}. Error: {e}", - path.join("package.webc") + path.display() ); } }; diff --git a/tests/wasmer-argus/src/argus/tester/cli_tester.rs b/tests/wasmer-argus/src/argus/tester/cli_tester.rs index 8cbfa766ed2..548a8441604 100644 --- a/tests/wasmer-argus/src/argus/tester/cli_tester.rs +++ b/tests/wasmer-argus/src/argus/tester/cli_tester.rs @@ -4,11 +4,7 @@ use std::{fs::File, io::BufReader, path::Path, process::Command, sync::Arc}; use tokio::time::{self, Instant}; use tracing::*; use wasmer_api::types::PackageVersionWithPackage; -use webc::{ - v1::{ParseOptions, WebCOwned}, - v2::read::OwnedReader, - Container, Version, -}; +use webc::{v2::read::OwnedReader, v3::read::OwnedReader as OwnedReaderV3, Container, Version}; use super::{TestReport, Tester}; @@ -158,27 +154,22 @@ impl<'a> Tester for CLIRunner<'a> { info!("starting test using CLI at {cli_path}"); let dir_path = Argus::get_path(self.config.clone(), self.package).await; - let webc_path = dir_path.join("package.webc"); + let webc_v2_path = dir_path.join("package_v2.webc"); self.p - .set_message(format!("unpacking webc at {:?}", webc_path)); + .set_message(format!("unpacking webc at {:?}", webc_v2_path)); - let bytes = std::fs::read(&webc_path)?; + let v2_bytes = std::fs::read(&webc_v2_path)?; - let webc = match webc::detect(bytes.as_slice()) { - Ok(Version::V1) => { - let options = ParseOptions::default(); - let webc = WebCOwned::parse(bytes, &options)?; - Container::from(webc) - } - Ok(Version::V2) => Container::from(OwnedReader::parse(bytes)?), + let webc_v2 = match webc::detect(v2_bytes.as_slice()) { + Ok(Version::V2) => Container::from(OwnedReader::parse(v2_bytes)?), Ok(other) => { return self.err(version, start_time, format!("Unsupported version, {other}")) } Err(e) => return self.err(version, start_time, format!("An error occurred: {e}")), }; - for (i, atom) in webc.atoms().iter().enumerate() { + for (i, atom) in webc_v2.atoms().iter().enumerate() { self.p.set_message(format!("testing atom #{i}")); if let Err(e) = self .test_atom(&cli_path, atom.1.as_slice(), &dir_path, i) @@ -188,6 +179,40 @@ impl<'a> Tester for CLIRunner<'a> { } } + let webc_v3_path = dir_path.join("package_v3.webc"); + + self.p + .set_message(format!("unpacking webc at {:?}", webc_v3_path)); + + let v3_bytes = std::fs::read(&webc_v3_path)?; + + let webc_v3 = match webc::detect(v3_bytes.as_slice()) { + Ok(Version::V3) => Container::from(OwnedReaderV3::parse(v3_bytes)?), + Ok(other) => { + return self.err(version, start_time, format!("Unsupported version, {other}")) + } + Err(e) => return self.err(version, start_time, format!("An error occurred: {e}")), + }; + + for (i, atom) in webc_v3.atoms().iter().enumerate() { + self.p.set_message(format!("testing atom #{i}")); + if let Err(e) = self + .test_atom(&cli_path, atom.1.as_slice(), &dir_path, i) + .await? + { + return self.err(version.clone(), start_time, e); + } + } + + let v2_file = std::fs::File::open(&webc_v2_path)?; + let v3_file = std::fs::File::open(&webc_v3_path)?; + if let Err(e) = webc::migration::are_semantically_equivalent( + shared_buffer::OwnedBuffer::from_file(&v2_file)?, + shared_buffer::OwnedBuffer::from_file(&v3_file)?, + ) { + return self.err(version.clone(), start_time, e.to_string()); + } + self.ok(version, start_time) } diff --git a/tests/wasmer-argus/src/argus/tester/lib_tester.rs b/tests/wasmer-argus/src/argus/tester/lib_tester.rs index 6adb5a2d5a1..8d27bd290e9 100644 --- a/tests/wasmer-argus/src/argus/tester/lib_tester.rs +++ b/tests/wasmer-argus/src/argus/tester/lib_tester.rs @@ -6,11 +6,7 @@ use tokio::time; use tracing::*; use wasmer::{sys::Features, Engine, NativeEngineExt, Target}; use wasmer_api::types::PackageVersionWithPackage; -use webc::{ - v1::{ParseOptions, WebCOwned}, - v2::read::OwnedReader, - Container, Version, -}; +use webc::{v2::read::OwnedReader, v3::read::OwnedReader as OwnedReaderV3, Container, Version}; pub struct LibRunner<'a> { pub test_id: u64, @@ -70,19 +66,15 @@ impl<'a> Tester for LibRunner<'a> { let start = time::Instant::now(); let dir_path = Argus::get_path(self.config.clone(), self.package).await; - let webc_path = dir_path.join("package.webc"); + let webc_v2_path = dir_path.join("package_v2.webc"); + let webc_v3_path = dir_path.join("package_v3.webc"); let test_exec_result = std::panic::catch_unwind(|| { self.p.set_message("reading webc bytes from filesystem"); - let bytes = std::fs::read(&webc_path)?; + let bytes = std::fs::read(&webc_v2_path)?; let store = wasmer::Store::new(Self::backend_to_engine(&self.config.compiler_backend)); let webc = match webc::detect(bytes.as_slice()) { - Ok(Version::V1) => { - let options = ParseOptions::default(); - let webc = WebCOwned::parse(bytes, &options)?; - Container::from(webc) - } Ok(Version::V2) => Container::from(OwnedReader::parse(bytes)?), Ok(other) => anyhow::bail!("Unsupported version, {other}"), Err(e) => anyhow::bail!("An error occurred: {e}"), @@ -104,12 +96,54 @@ impl<'a> Tester for LibRunner<'a> { wasmer::Module::new(&store, atom.1.as_slice())?; } + self.p.set_message("reading webc bytes from filesystem"); + let bytes = std::fs::read(&webc_v3_path)?; + let store = wasmer::Store::new(Self::backend_to_engine(&self.config.compiler_backend)); + + let webc = match webc::detect(bytes.as_slice()) { + Ok(Version::V3) => { + let c = Container::from(OwnedReaderV3::parse(bytes)?); + println!("\n\n\n c: {:?} \n\n\n", c.version()); + c + } + Ok(other) => anyhow::bail!("Unsupported version, {other}"), + Err(e) => anyhow::bail!("An error occurred: {e}"), + }; + + self.p.set_message("created webc"); + + for atom in webc.atoms().iter() { + info!( + "creating module for atom {} with length {}", + atom.0, + atom.1.len() + ); + self.p.set_message(format!( + "[{package_id}] creating module for atom {} (has length {} bytes)", + atom.0, + atom.1.len() + )); + wasmer::Module::new(&store, atom.1.as_slice())?; + } + Ok(()) }); let outcome = match test_exec_result { Ok(r) => match r { - Ok(_) => Ok(String::from("test passed")), + Ok(_) => { + let v2_file = std::fs::File::open(&webc_v2_path)?; + let v3_file = std::fs::File::open(&webc_v3_path)?; + + if let Err(e) = webc::migration::are_semantically_equivalent( + shared_buffer::OwnedBuffer::from_file(&v2_file)?, + shared_buffer::OwnedBuffer::from_file(&v3_file)?, + ) { + Err(e.to_string()) + } else { + Ok(String::from("test passed")) + } + } Err(e) => Err(format!("{e}")), }, Err(e) => Err(format!("{:?}", e)), From ba8d3148a6e95ea0e9e305eb262e98e53d383e8b Mon Sep 17 00:00:00 2001 From: Edoardo Marangoni Date: Wed, 22 May 2024 18:58:41 +0200 Subject: [PATCH 2/2] fix(argus): bump webc version and make linter happy --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- tests/wasmer-argus/src/argus/mod.rs | 6 ++---- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5be4f08b765..96e3127331d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7275,9 +7275,9 @@ dependencies = [ [[package]] name = "webc" -version = "6.0.0-alpha8" +version = "6.0.0-alpha9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbf53893f8df356f1305446c1bc59c4082cb592f39ffcae0a2f10bd8ed100bb9" +checksum = "9e1b4e8dd987046eede4348d660404ff990412631b7d493f9e547adcf2862cd5" dependencies = [ "anyhow", "base64 0.21.7", diff --git a/Cargo.toml b/Cargo.toml index 654bacc6ee0..b59967b3eb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,7 +92,7 @@ wasmer-config = { path = "./lib/config" } wasmer-wasix = { path = "./lib/wasix" } # Wasmer-owned crates -webc = { version = "6.0.0-alpha8", default-features = false, features = ["package"] } +webc = { version = "6.0.0-alpha9", default-features = false, features = ["package"] } edge-schema = { version = "=0.1.0" } shared-buffer = "0.1.4" diff --git a/tests/wasmer-argus/src/argus/mod.rs b/tests/wasmer-argus/src/argus/mod.rs index 1a055cb8b7b..63260730592 100644 --- a/tests/wasmer-argus/src/argus/mod.rs +++ b/tests/wasmer-argus/src/argus/mod.rs @@ -75,7 +75,7 @@ impl Argus { pool.spawn(async move { let _permit = permit; - let ret = match Argus::test(count, c, &pkg, bar, successes_sx, failures_sx).await { + match Argus::test(count, c, &pkg, bar, successes_sx, failures_sx).await { Err(e) => { failures.lock().await.add_assign(1); Err(e) @@ -88,9 +88,7 @@ impl Argus { failures.lock().await.add_assign(1); Ok(()) } - }; - - ret + } }); count += 1;