Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Argus: Compare v2 and v3 + add webhook message #4736

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ wasmer-config = { path = "./lib/config" }
wasmer-wasix = { path = "./lib/wasix" }

# Wasmer-owned crates
webc = { version = "6.0.0-alpha8", default-features = false, features = ["package"] }
webc = { version = "6.0.0-alpha9", default-features = false, features = ["package"] }
edge-schema = { version = "=0.1.0" }
shared-buffer = "0.1.4"

Expand Down
1 change: 1 addition & 0 deletions tests/wasmer-argus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ derive_more = "0.99.17"
webc.workspace = true
async-trait = "0.1.77"
wasmer-api = { path = "../../lib/backend-api" }
shared-buffer.workspace = true


[target.'cfg(not(target_arch = "riscv64"))'.dependencies]
Expand Down
4 changes: 4 additions & 0 deletions tests/wasmer-argus/src/argus/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,8 @@ pub struct ArgusConfig {
#[cfg(feature = "wasmer_lib")]
#[arg(long, conflicts_with = "cli_path")]
pub use_lib: bool,

/// The webhook to use when sending the test outcome.
#[arg(long)]
pub webhook_url: Option<String>,
}
84 changes: 71 additions & 13 deletions tests/wasmer-argus/src/argus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ mod tester;
use self::tester::{TestReport, Tester};
pub use config::*;
use indicatif::{MultiProgress, ProgressBar};
use std::{fs::OpenOptions, io::Write as _, path::Path, sync::Arc, time::Duration};
use reqwest::header::CONTENT_TYPE;
use std::{fs::OpenOptions, io::Write as _, ops::AddAssign, path::Path, sync::Arc, time::Duration};
use tokio::{
sync::{mpsc, Semaphore},
sync::{
mpsc::{self, UnboundedSender},
Mutex, Semaphore,
},
task::JoinSet,
};
use tracing::*;
Expand Down Expand Up @@ -38,18 +42,24 @@ impl Argus {

let m = MultiProgress::new();
let (s, mut r) = mpsc::unbounded_channel();
let (successes_sx, successes_rx) = mpsc::unbounded_channel();
let (failures_sx, failures_rx) = mpsc::unbounded_channel();

let mut pool = JoinSet::new();
let c = Arc::new(self.config.clone());

{
let this = self.clone();
let bar = m.add(ProgressBar::new(0));

pool.spawn(async move { this.fetch_packages(s, bar, c.clone()).await });
pool.spawn(async move {
this.fetch_packages(s, bar, c.clone(), successes_rx, failures_rx)
.await
});
}

let mut count = 0;
let successes = Arc::new(Mutex::new(0));
let failures = Arc::new(Mutex::new(0));

let c = Arc::new(self.config.clone());
let sem = Arc::new(Semaphore::new(self.config.jobs));
Expand All @@ -58,21 +68,52 @@ impl Argus {
let c = c.clone();
let bar = m.add(ProgressBar::new(0));
let permit = Arc::clone(&sem).acquire_owned().await;
let successes_sx = successes_sx.clone();
let failures_sx = failures_sx.clone();
let failures = failures.clone();
let successes = successes.clone();

pool.spawn(async move {
let _permit = permit;
Argus::test(count, c, &pkg, bar).await
match Argus::test(count, c, &pkg, bar, successes_sx, failures_sx).await {
Err(e) => {
failures.lock().await.add_assign(1);
Err(e)
}
Ok(true) => {
successes.lock().await.add_assign(1);
Ok(())
}
Ok(false) => {
failures.lock().await.add_assign(1);
Ok(())
}
}
});

count += 1;
}

while let Some(t) = pool.join_next().await {
if let Err(e) = t {
error!("task failed: {e}")
error!("{:?}", e)
}
}

if let Some(webhook_url) = self.config.webhook_url {
let url = url::Url::parse(&webhook_url)?;
reqwest::Client::new()
.post(url)
.header(CONTENT_TYPE, "application/json")
.body(format!(
r#"{{"text":"Argus run report: {} tests succeeded, {} failed"}}"#,
successes.lock().await,
failures.lock().await
))
.send()
.await?;
}

info!("done!");
Ok(())
}
Expand All @@ -83,7 +124,9 @@ impl Argus {
config: Arc<ArgusConfig>,
package: &PackageVersionWithPackage,
p: ProgressBar,
) -> anyhow::Result<()> {
successes_sx: UnboundedSender<()>,
failures_sx: UnboundedSender<()>,
) -> anyhow::Result<bool> {
p.set_style(
indicatif::ProgressStyle::with_template(&format!(
"[{test_id}] {{spinner:.blue}} {{msg}}"
Expand All @@ -95,12 +138,21 @@ impl Argus {
p.enable_steady_tick(Duration::from_millis(100));

let package_name = Argus::get_package_id(package);
let webc_url: Url = match &package.distribution_v2.pirita_download_url {
let webc_v2_url: Url = match &package.distribution_v2.pirita_download_url {
Some(url) => url.parse().unwrap(),
None => {
info!("package {} has no download url, skipping", package_name);
p.finish_and_clear();
return Ok(true);
}
};

let webc_v3_url: Url = match &package.distribution_v3.pirita_download_url {
Some(url) => url.parse().unwrap(),
None => {
info!("package {} has no download url, skipping", package_name);
p.finish_and_clear();
return Ok(());
return Ok(true);
}
};

Expand Down Expand Up @@ -129,10 +181,10 @@ impl Argus {
};

if !runner.is_to_test().await {
return Ok(());
return Ok(true);
}

Argus::download_package(test_id, &path, &webc_url, &p).await?;
Argus::download_webcs(test_id, &path, &webc_v2_url, &webc_v3_url, &p).await?;

info!("package downloaded!");

Expand All @@ -150,14 +202,20 @@ impl Argus {
p.set_message("package downloaded");

let report = runner.run_test().await?;
info!("\n\n\n\ntest finished\n\n\n");

let outcome = report.outcome.is_ok();

if outcome {
successes_sx.send(())?;
} else {
failures_sx.send(())?;
};
Argus::write_report(&path, report).await?;

p.finish_with_message(format!("test for package {package_name} done!"));
p.finish_and_clear();

Ok(())
Ok(outcome)
}

/// Checks whether or not the package should be tested
Expand Down
44 changes: 35 additions & 9 deletions tests/wasmer-argus/src/argus/packages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use futures::StreamExt;
use indicatif::{ProgressBar, ProgressStyle};
use reqwest::{header, Client};
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::{fs::File, io::AsyncWriteExt, sync::mpsc::UnboundedSender};
use tokio::{
fs::File,
io::AsyncWriteExt,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use tracing::*;
use url::Url;
use wasmer_api::{
Expand All @@ -20,6 +24,8 @@ impl Argus {
s: UnboundedSender<PackageVersionWithPackage>,
p: ProgressBar,
config: Arc<ArgusConfig>,
successes_rx: UnboundedReceiver<()>,
failures_rx: UnboundedReceiver<()>,
) -> anyhow::Result<()> {
info!("starting to fetch packages..");
let vars = AllPackageVersionsVars {
Expand Down Expand Up @@ -49,7 +55,12 @@ impl Argus {
anyhow::bail!("failed to fetch packages: {e}")
}
};
p.set_message(format!("fetched {} packages", count));
p.set_message(format!(
"fetched {} packages [ok: {}, err: {}]",
count,
successes_rx.len(),
failures_rx.len()
));
count += pkgs.len();

for pkg in pkgs {
Expand All @@ -70,7 +81,19 @@ impl Argus {
}

#[tracing::instrument(skip(p))]
pub(crate) async fn download_package<'a>(
pub(crate) async fn download_webcs<'a>(
test_id: u64,
path: &'a PathBuf,
webc_v2_url: &'a Url,
webc_v3_url: &'a Url,
p: &'a ProgressBar,
) -> anyhow::Result<()> {
Argus::download_package(test_id, &path.join("package_v2.webc"), webc_v2_url, p).await?;
Argus::download_package(test_id, &path.join("package_v3.webc"), webc_v3_url, p).await?;
Ok(())
}

async fn download_package<'a>(
test_id: u64,
path: &'a PathBuf,
url: &'a Url,
Expand All @@ -80,9 +103,12 @@ impl Argus {
static APP_USER_AGENT: &str =
concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);

if !path.exists() {
tokio::fs::create_dir_all(path).await?;
} else if path.exists() && !path.is_dir() {
let mut dir_path = path.clone();
dir_path.pop();

if !dir_path.exists() {
tokio::fs::create_dir_all(dir_path).await?;
} else if dir_path.exists() && !dir_path.is_dir() {
anyhow::bail!("path {:?} exists, but it is not a directory!", path)
}

Expand Down Expand Up @@ -120,19 +146,19 @@ impl Argus {

p.set_message(format!("downloading from {url}"));

let mut outfile = match File::create(&path.join("package.webc")).await {
let mut outfile = match File::create(&path).await {
Ok(o) => o,
Err(e) => {
error!(
"[{test_id}] failed to create file at {:?}. Error: {e}",
path.join("package.webc")
path.display()
);

p.finish_and_clear();

anyhow::bail!(
"[{test_id}] failed to create file at {:?}. Error: {e}",
path.join("package.webc")
path.display()
);
}
};
Expand Down
57 changes: 41 additions & 16 deletions tests/wasmer-argus/src/argus/tester/cli_tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use std::{fs::File, io::BufReader, path::Path, process::Command, sync::Arc};
use tokio::time::{self, Instant};
use tracing::*;
use wasmer_api::types::PackageVersionWithPackage;
use webc::{
v1::{ParseOptions, WebCOwned},
v2::read::OwnedReader,
Container, Version,
};
use webc::{v2::read::OwnedReader, v3::read::OwnedReader as OwnedReaderV3, Container, Version};

use super::{TestReport, Tester};

Expand Down Expand Up @@ -158,27 +154,22 @@ impl<'a> Tester for CLIRunner<'a> {

info!("starting test using CLI at {cli_path}");
let dir_path = Argus::get_path(self.config.clone(), self.package).await;
let webc_path = dir_path.join("package.webc");
let webc_v2_path = dir_path.join("package_v2.webc");

self.p
.set_message(format!("unpacking webc at {:?}", webc_path));
.set_message(format!("unpacking webc at {:?}", webc_v2_path));

let bytes = std::fs::read(&webc_path)?;
let v2_bytes = std::fs::read(&webc_v2_path)?;

let webc = match webc::detect(bytes.as_slice()) {
Ok(Version::V1) => {
let options = ParseOptions::default();
let webc = WebCOwned::parse(bytes, &options)?;
Container::from(webc)
}
Ok(Version::V2) => Container::from(OwnedReader::parse(bytes)?),
let webc_v2 = match webc::detect(v2_bytes.as_slice()) {
Ok(Version::V2) => Container::from(OwnedReader::parse(v2_bytes)?),
Ok(other) => {
return self.err(version, start_time, format!("Unsupported version, {other}"))
}
Err(e) => return self.err(version, start_time, format!("An error occurred: {e}")),
};

for (i, atom) in webc.atoms().iter().enumerate() {
for (i, atom) in webc_v2.atoms().iter().enumerate() {
self.p.set_message(format!("testing atom #{i}"));
if let Err(e) = self
.test_atom(&cli_path, atom.1.as_slice(), &dir_path, i)
Expand All @@ -188,6 +179,40 @@ impl<'a> Tester for CLIRunner<'a> {
}
}

let webc_v3_path = dir_path.join("package_v3.webc");

self.p
.set_message(format!("unpacking webc at {:?}", webc_v3_path));

let v3_bytes = std::fs::read(&webc_v3_path)?;

let webc_v3 = match webc::detect(v3_bytes.as_slice()) {
Ok(Version::V3) => Container::from(OwnedReaderV3::parse(v3_bytes)?),
Ok(other) => {
return self.err(version, start_time, format!("Unsupported version, {other}"))
}
Err(e) => return self.err(version, start_time, format!("An error occurred: {e}")),
};

for (i, atom) in webc_v3.atoms().iter().enumerate() {
self.p.set_message(format!("testing atom #{i}"));
if let Err(e) = self
.test_atom(&cli_path, atom.1.as_slice(), &dir_path, i)
.await?
{
return self.err(version.clone(), start_time, e);
}
}

let v2_file = std::fs::File::open(&webc_v2_path)?;
let v3_file = std::fs::File::open(&webc_v3_path)?;
if let Err(e) = webc::migration::are_semantically_equivalent(
shared_buffer::OwnedBuffer::from_file(&v2_file)?,
shared_buffer::OwnedBuffer::from_file(&v3_file)?,
) {
return self.err(version.clone(), start_time, e.to_string());
}

self.ok(version, start_time)
}

Expand Down
Loading
Loading