Skip to content

Commit

Permalink
feat(argus): Compare v2 and v3 + add webhook message
Browse files Browse the repository at this point in the history
  • Loading branch information
xdoardo committed May 22, 2024
1 parent dacf004 commit 5ab5880
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tests/wasmer-argus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ derive_more = "0.99.17"
webc.workspace = true
async-trait = "0.1.77"
wasmer-api = { path = "../../lib/backend-api" }
shared-buffer.workspace = true


[target.'cfg(not(target_arch = "riscv64"))'.dependencies]
Expand Down
4 changes: 4 additions & 0 deletions tests/wasmer-argus/src/argus/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,8 @@ pub struct ArgusConfig {
#[cfg(feature = "wasmer_lib")]
#[arg(long, conflicts_with = "cli_path")]
pub use_lib: bool,

/// The webhook to use when sending the test outcome.
#[arg(long)]
pub webhook_url: Option<String>,
}
86 changes: 73 additions & 13 deletions tests/wasmer-argus/src/argus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ mod tester;
use self::tester::{TestReport, Tester};
pub use config::*;
use indicatif::{MultiProgress, ProgressBar};
use std::{fs::OpenOptions, io::Write as _, path::Path, sync::Arc, time::Duration};
use reqwest::header::CONTENT_TYPE;
use std::{fs::OpenOptions, io::Write as _, ops::AddAssign, path::Path, sync::Arc, time::Duration};
use tokio::{
sync::{mpsc, Semaphore},
sync::{
mpsc::{self, UnboundedSender},
Mutex, Semaphore,
},
task::JoinSet,
};
use tracing::*;
Expand Down Expand Up @@ -38,18 +42,24 @@ impl Argus {

let m = MultiProgress::new();
let (s, mut r) = mpsc::unbounded_channel();
let (successes_sx, successes_rx) = mpsc::unbounded_channel();
let (failures_sx, failures_rx) = mpsc::unbounded_channel();

let mut pool = JoinSet::new();
let c = Arc::new(self.config.clone());

{
let this = self.clone();
let bar = m.add(ProgressBar::new(0));

pool.spawn(async move { this.fetch_packages(s, bar, c.clone()).await });
pool.spawn(async move {
this.fetch_packages(s, bar, c.clone(), successes_rx, failures_rx)
.await
});
}

let mut count = 0;
let successes = Arc::new(Mutex::new(0));
let failures = Arc::new(Mutex::new(0));

let c = Arc::new(self.config.clone());
let sem = Arc::new(Semaphore::new(self.config.jobs));
Expand All @@ -58,21 +68,54 @@ impl Argus {
let c = c.clone();
let bar = m.add(ProgressBar::new(0));
let permit = Arc::clone(&sem).acquire_owned().await;
let successes_sx = successes_sx.clone();
let failures_sx = failures_sx.clone();
let failures = failures.clone();
let successes = successes.clone();

pool.spawn(async move {
let _permit = permit;
Argus::test(count, c, &pkg, bar).await
let ret = match Argus::test(count, c, &pkg, bar, successes_sx, failures_sx).await {
Err(e) => {
failures.lock().await.add_assign(1);
Err(e)
}
Ok(true) => {
successes.lock().await.add_assign(1);
Ok(())
}
Ok(false) => {
failures.lock().await.add_assign(1);
Ok(())
}
};

ret
});

count += 1;
}

while let Some(t) = pool.join_next().await {
if let Err(e) = t {
error!("task failed: {e}")
error!("{:?}", e)
}
}

if let Some(webhook_url) = self.config.webhook_url {
let url = url::Url::parse(&webhook_url)?;
reqwest::Client::new()
.post(url)
.header(CONTENT_TYPE, "application/json")
.body(format!(
r#"{{"text":"Argus run report: {} tests succeeded, {} failed"}}"#,
successes.lock().await,
failures.lock().await
))
.send()
.await?;
}

info!("done!");
Ok(())
}
Expand All @@ -83,7 +126,9 @@ impl Argus {
config: Arc<ArgusConfig>,
package: &PackageVersionWithPackage,
p: ProgressBar,
) -> anyhow::Result<()> {
successes_sx: UnboundedSender<()>,
failures_sx: UnboundedSender<()>,
) -> anyhow::Result<bool> {
p.set_style(
indicatif::ProgressStyle::with_template(&format!(
"[{test_id}] {{spinner:.blue}} {{msg}}"
Expand All @@ -95,12 +140,21 @@ impl Argus {
p.enable_steady_tick(Duration::from_millis(100));

let package_name = Argus::get_package_id(package);
let webc_url: Url = match &package.distribution_v2.pirita_download_url {
let webc_v2_url: Url = match &package.distribution_v2.pirita_download_url {
Some(url) => url.parse().unwrap(),
None => {
info!("package {} has no download url, skipping", package_name);
p.finish_and_clear();
return Ok(true);
}
};

let webc_v3_url: Url = match &package.distribution_v3.pirita_download_url {
Some(url) => url.parse().unwrap(),
None => {
info!("package {} has no download url, skipping", package_name);
p.finish_and_clear();
return Ok(());
return Ok(true);
}
};

Expand Down Expand Up @@ -129,10 +183,10 @@ impl Argus {
};

if !runner.is_to_test().await {
return Ok(());
return Ok(true);
}

Argus::download_package(test_id, &path, &webc_url, &p).await?;
Argus::download_webcs(test_id, &path, &webc_v2_url, &webc_v3_url, &p).await?;

info!("package downloaded!");

Expand All @@ -150,14 +204,20 @@ impl Argus {
p.set_message("package downloaded");

let report = runner.run_test().await?;
info!("\n\n\n\ntest finished\n\n\n");

let outcome = report.outcome.is_ok();

if outcome {
successes_sx.send(())?;
} else {
failures_sx.send(())?;
};
Argus::write_report(&path, report).await?;

p.finish_with_message(format!("test for package {package_name} done!"));
p.finish_and_clear();

Ok(())
Ok(outcome)
}

/// Checks whether or not the package should be tested
Expand Down
44 changes: 35 additions & 9 deletions tests/wasmer-argus/src/argus/packages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use futures::StreamExt;
use indicatif::{ProgressBar, ProgressStyle};
use reqwest::{header, Client};
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::{fs::File, io::AsyncWriteExt, sync::mpsc::UnboundedSender};
use tokio::{
fs::File,
io::AsyncWriteExt,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use tracing::*;
use url::Url;
use wasmer_api::{
Expand All @@ -20,6 +24,8 @@ impl Argus {
s: UnboundedSender<PackageVersionWithPackage>,
p: ProgressBar,
config: Arc<ArgusConfig>,
successes_rx: UnboundedReceiver<()>,
failures_rx: UnboundedReceiver<()>,
) -> anyhow::Result<()> {
info!("starting to fetch packages..");
let vars = AllPackageVersionsVars {
Expand Down Expand Up @@ -49,7 +55,12 @@ impl Argus {
anyhow::bail!("failed to fetch packages: {e}")
}
};
p.set_message(format!("fetched {} packages", count));
p.set_message(format!(
"fetched {} packages [ok: {}, err: {}]",
count,
successes_rx.len(),
failures_rx.len()
));
count += pkgs.len();

for pkg in pkgs {
Expand All @@ -70,7 +81,19 @@ impl Argus {
}

#[tracing::instrument(skip(p))]
pub(crate) async fn download_package<'a>(
pub(crate) async fn download_webcs<'a>(
test_id: u64,
path: &'a PathBuf,
webc_v2_url: &'a Url,
webc_v3_url: &'a Url,
p: &'a ProgressBar,
) -> anyhow::Result<()> {
Argus::download_package(test_id, &path.join("package_v2.webc"), webc_v2_url, p).await?;
Argus::download_package(test_id, &path.join("package_v3.webc"), webc_v3_url, p).await?;
Ok(())
}

async fn download_package<'a>(
test_id: u64,
path: &'a PathBuf,
url: &'a Url,
Expand All @@ -80,9 +103,12 @@ impl Argus {
static APP_USER_AGENT: &str =
concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);

if !path.exists() {
tokio::fs::create_dir_all(path).await?;
} else if path.exists() && !path.is_dir() {
let mut dir_path = path.clone();
dir_path.pop();

if !dir_path.exists() {
tokio::fs::create_dir_all(dir_path).await?;
} else if dir_path.exists() && !dir_path.is_dir() {
anyhow::bail!("path {:?} exists, but it is not a directory!", path)
}

Expand Down Expand Up @@ -120,19 +146,19 @@ impl Argus {

p.set_message(format!("downloading from {url}"));

let mut outfile = match File::create(&path.join("package.webc")).await {
let mut outfile = match File::create(&path).await {
Ok(o) => o,
Err(e) => {
error!(
"[{test_id}] failed to create file at {:?}. Error: {e}",
path.join("package.webc")
path.display()
);

p.finish_and_clear();

anyhow::bail!(
"[{test_id}] failed to create file at {:?}. Error: {e}",
path.join("package.webc")
path.display()
);
}
};
Expand Down
57 changes: 41 additions & 16 deletions tests/wasmer-argus/src/argus/tester/cli_tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use std::{fs::File, io::BufReader, path::Path, process::Command, sync::Arc};
use tokio::time::{self, Instant};
use tracing::*;
use wasmer_api::types::PackageVersionWithPackage;
use webc::{
v1::{ParseOptions, WebCOwned},
v2::read::OwnedReader,
Container, Version,
};
use webc::{v2::read::OwnedReader, v3::read::OwnedReader as OwnedReaderV3, Container, Version};

use super::{TestReport, Tester};

Expand Down Expand Up @@ -158,27 +154,22 @@ impl<'a> Tester for CLIRunner<'a> {

info!("starting test using CLI at {cli_path}");
let dir_path = Argus::get_path(self.config.clone(), self.package).await;
let webc_path = dir_path.join("package.webc");
let webc_v2_path = dir_path.join("package_v2.webc");

self.p
.set_message(format!("unpacking webc at {:?}", webc_path));
.set_message(format!("unpacking webc at {:?}", webc_v2_path));

let bytes = std::fs::read(&webc_path)?;
let v2_bytes = std::fs::read(&webc_v2_path)?;

let webc = match webc::detect(bytes.as_slice()) {
Ok(Version::V1) => {
let options = ParseOptions::default();
let webc = WebCOwned::parse(bytes, &options)?;
Container::from(webc)
}
Ok(Version::V2) => Container::from(OwnedReader::parse(bytes)?),
let webc_v2 = match webc::detect(v2_bytes.as_slice()) {
Ok(Version::V2) => Container::from(OwnedReader::parse(v2_bytes)?),
Ok(other) => {
return self.err(version, start_time, format!("Unsupported version, {other}"))
}
Err(e) => return self.err(version, start_time, format!("An error occurred: {e}")),
};

for (i, atom) in webc.atoms().iter().enumerate() {
for (i, atom) in webc_v2.atoms().iter().enumerate() {
self.p.set_message(format!("testing atom #{i}"));
if let Err(e) = self
.test_atom(&cli_path, atom.1.as_slice(), &dir_path, i)
Expand All @@ -188,6 +179,40 @@ impl<'a> Tester for CLIRunner<'a> {
}
}

let webc_v3_path = dir_path.join("package_v3.webc");

self.p
.set_message(format!("unpacking webc at {:?}", webc_v3_path));

let v3_bytes = std::fs::read(&webc_v3_path)?;

let webc_v3 = match webc::detect(v3_bytes.as_slice()) {
Ok(Version::V3) => Container::from(OwnedReaderV3::parse(v3_bytes)?),
Ok(other) => {
return self.err(version, start_time, format!("Unsupported version, {other}"))
}
Err(e) => return self.err(version, start_time, format!("An error occurred: {e}")),
};

for (i, atom) in webc_v3.atoms().iter().enumerate() {
self.p.set_message(format!("testing atom #{i}"));
if let Err(e) = self
.test_atom(&cli_path, atom.1.as_slice(), &dir_path, i)
.await?
{
return self.err(version.clone(), start_time, e);
}
}

let v2_file = std::fs::File::open(&webc_v2_path)?;
let v3_file = std::fs::File::open(&webc_v3_path)?;
if let Err(e) = webc::migration::are_semantically_equivalent(
shared_buffer::OwnedBuffer::from_file(&v2_file)?,
shared_buffer::OwnedBuffer::from_file(&v3_file)?,
) {
return self.err(version.clone(), start_time, e.to_string());
}

self.ok(version, start_time)
}

Expand Down
Loading

0 comments on commit 5ab5880

Please sign in to comment.