diff --git a/Cargo.lock b/Cargo.lock index 809e50e80fe..7b3d00ea577 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2647,6 +2647,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -4252,7 +4253,9 @@ dependencies = [ "anyhow", "dirs 4.0.0", "flate2", + "futures-util", "graphql_client", + "hex", "log", "lzma-rs", "rand 0.8.5", @@ -4263,9 +4266,11 @@ dependencies = [ "tar", "tempdir", "thiserror", + "tokio", "toml", "url", "wapm-toml", + "webc", "whoami", ] diff --git a/lib/cli/src/commands/login.rs b/lib/cli/src/commands/login.rs index 8e70b40ebfa..539f132b147 100644 --- a/lib/cli/src/commands/login.rs +++ b/lib/cli/src/commands/login.rs @@ -1,4 +1,5 @@ use clap::Parser; +#[cfg(not(test))] use dialoguer::Input; /// Subcommand for listing packages diff --git a/lib/cli/src/commands/run.rs b/lib/cli/src/commands/run.rs index ab019ae6d28..52c597287ea 100644 --- a/lib/cli/src/commands/run.rs +++ b/lib/cli/src/commands/run.rs @@ -11,6 +11,7 @@ use std::collections::HashMap; use std::ops::Deref; use std::path::PathBuf; use std::str::FromStr; +use url::Url; use wasmer::FunctionEnv; use wasmer::*; #[cfg(feature = "cache")] @@ -827,6 +828,11 @@ pub(crate) fn try_run_package_or_file( ) -> Result<(), anyhow::Error> { let debug_msgs_allowed = isatty::stdout_isatty(); + if let Ok(url) = url::Url::parse(&format!("{}", r.path.display())) { + let result = try_run_url(&url, args, r, debug); + return result; + } + // Check "r.path" is a file or a package / command name if r.path.exists() { if r.path.is_dir() && r.path.join("wapm.toml").exists() { @@ -908,3 +914,32 @@ pub(crate) fn try_run_package_or_file( // else: local package not found - try to download and install package try_autoinstall_package(args, &sv, package_download_info, r.force_install) } + +fn try_run_url(url: &Url, _args: &[String], r: &Run, _debug: bool) -> Result<(), anyhow::Error> { + let checksum = wasmer_registry::get_remote_webc_checksum(url) + .map_err(|e| anyhow::anyhow!("error fetching {url}: {e}"))?; + + let packages = wasmer_registry::get_all_installed_webc_packages(); + + if !packages.iter().any(|p| p.checksum == checksum) { + let sp = start_spinner(format!("Installing {}", url)); + + let result = wasmer_registry::install_webc_package(url, &checksum); + + result.map_err(|e| anyhow::anyhow!("error fetching {url}: {e}"))?; + + if let Some(sp) = sp { + sp.close(); + } + } + + let webc_dir = wasmer_registry::get_webc_dir(); + + let webc_install_path = webc_dir + .context("Error installing package: no webc dir")? + .join(checksum); + + let mut r = r.clone(); + r.path = webc_install_path; + r.execute() +} diff --git a/lib/registry/Cargo.toml b/lib/registry/Cargo.toml index 5e11649a5ba..4fb921454e1 100644 --- a/lib/registry/Cargo.toml +++ b/lib/registry/Cargo.toml @@ -13,7 +13,8 @@ dirs = "4.0.0" graphql_client = "0.11.0" serde = { version = "1.0.145", features = ["derive"] } anyhow = "1.0.65" -reqwest = { version = "0.11.12", default-features = false, features = ["rustls-tls", "blocking", "multipart", "json"] } +reqwest = { version = "0.11.12", default-features = false, features = ["rustls-tls", "blocking", "multipart", "json", "stream"] } +futures-util = "0.3.25" whoami = "1.2.3" serde_json = "1.0.85" url = "2.3.1" @@ -24,5 +25,8 @@ tar = "0.4.38" flate2 = "1.0.24" semver = "1.0.14" lzma-rs = "0.2.0" +webc = { version ="3.0.1", features = ["mmap"] } +hex = "0.4.3" +tokio = "1.21.2" tempdir = "0.3.7" log = "0.4.17" diff --git a/lib/registry/src/graphql.rs b/lib/registry/src/graphql.rs index 730c7599a97..c473f22ddd8 100644 --- a/lib/registry/src/graphql.rs +++ b/lib/registry/src/graphql.rs @@ -9,7 +9,7 @@ use std::time::Duration; #[cfg(target_os = "wasi")] use {wasm_bus_reqwest::prelude::header::*, wasm_bus_reqwest::prelude::*}; -mod proxy { +pub(crate) mod proxy { //! Code for dealing with setting things up to proxy network requests use thiserror::Error; @@ -25,6 +25,36 @@ mod proxy { ConnectionError(String), } + pub fn maybe_set_up_proxy_blocking( + builder: reqwest::blocking::ClientBuilder, + ) -> anyhow::Result { + #[cfg(not(target_os = "wasi"))] + use anyhow::Context; + #[cfg(not(target_os = "wasi"))] + if let Some(proxy) = maybe_set_up_proxy_inner() + .map_err(|e| anyhow::anyhow!("{e}")) + .context("install_webc_package: failed to setup proxy for reqwest Client")? + { + return Ok(builder.proxy(proxy)); + } + Ok(builder) + } + + pub fn maybe_set_up_proxy( + builder: reqwest::ClientBuilder, + ) -> anyhow::Result { + #[cfg(not(target_os = "wasi"))] + use anyhow::Context; + #[cfg(not(target_os = "wasi"))] + if let Some(proxy) = maybe_set_up_proxy_inner() + .map_err(|e| anyhow::anyhow!("{e}")) + .context("install_webc_package: failed to setup proxy for reqwest Client")? + { + return Ok(builder.proxy(proxy)); + } + Ok(builder) + } + /// Tries to set up a proxy /// /// This function reads from wapm config's `proxy.url` first, then checks @@ -37,7 +67,7 @@ mod proxy { /// A return value of `Ok(None)` means that there was no attempt to set up a proxy, /// `Ok(Some(proxy))` means that the proxy was set up successfully, and `Err(e)` that /// there was a failure while attempting to set up the proxy. - pub fn maybe_set_up_proxy() -> anyhow::Result> { + fn maybe_set_up_proxy_inner() -> anyhow::Result> { use std::env; let proxy = if let Ok(proxy_url) = env::var("ALL_PROXY").or_else(|_| env::var("all_proxy")) { @@ -120,13 +150,7 @@ pub fn whoami_distro() -> String { fn setup_client() -> Result { let builder = Client::builder(); - - let builder = if let Some(proxy) = proxy::maybe_set_up_proxy()? { - builder.proxy(proxy) - } else { - builder - }; - + let builder = proxy::maybe_set_up_proxy_blocking(builder)?; builder.build().map_err(|e| e.into()) } diff --git a/lib/registry/src/lib.rs b/lib/registry/src/lib.rs index 1ecda1bc7ce..9af6a79d8be 100644 --- a/lib/registry/src/lib.rs +++ b/lib/registry/src/lib.rs @@ -10,13 +10,17 @@ use crate::config::Registries; use anyhow::Context; +use core::ops::Range; +use reqwest::header::{ACCEPT, RANGE}; use std::fmt; +use std::io::Write; use std::path::{Path, PathBuf}; use std::time::Duration; use std::{ collections::BTreeMap, fmt::{Display, Formatter}, }; +use url::Url; pub mod config; pub mod graphql; @@ -630,6 +634,14 @@ pub fn get_checkouts_dir(#[cfg(test)] test_name: &str) -> Option { Some(root_dir.join("checkouts")) } +pub fn get_webc_dir(#[cfg(test)] test_name: &str) -> Option { + #[cfg(test)] + let root_dir = get_wasmer_root_dir(test_name)?; + #[cfg(not(test))] + let root_dir = get_wasmer_root_dir()?; + Some(root_dir.join("webc")) +} + /// Returs the path to the directory where all packages on this computer are being stored pub fn get_global_install_dir( #[cfg(test)] test_name: &str, @@ -919,6 +931,231 @@ pub fn get_all_available_registries(#[cfg(test)] test_name: &str) -> Result Result<(), anyhow::Error> { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + { + #[cfg(test)] + { + install_webc_package_inner(test_name, url, checksum).await + } + #[cfg(not(test))] + { + install_webc_package_inner(url, checksum).await + } + } + }) +} + +async fn install_webc_package_inner( + #[cfg(test)] test_name: &str, + url: &Url, + checksum: &str, +) -> Result<(), anyhow::Error> { + use futures_util::StreamExt; + + #[cfg(test)] + let path = get_webc_dir(test_name).ok_or_else(|| anyhow::anyhow!("no webc dir"))?; + #[cfg(not(test))] + let path = get_webc_dir().ok_or_else(|| anyhow::anyhow!("no webc dir"))?; + + let _ = std::fs::create_dir_all(&path); + + let webc_path = path.join(checksum); + + let mut file = std::fs::File::create(&webc_path) + .map_err(|e| anyhow::anyhow!("{e}")) + .context(anyhow::anyhow!("{}", webc_path.display()))?; + + let client = { + let builder = reqwest::Client::builder(); + let builder = crate::graphql::proxy::maybe_set_up_proxy(builder)?; + builder + .build() + .map_err(|e| anyhow::anyhow!("{e}")) + .context("install_webc_package: failed to build reqwest Client")? + }; + + let res = client + .get(url.clone()) + .header(ACCEPT, "application/webc") + .send() + .await + .and_then(|response| response.error_for_status()) + .map_err(|e| anyhow::anyhow!("{e}")) + .context(anyhow::anyhow!("install_webc_package: failed to GET {url}"))?; + + let mut stream = res.bytes_stream(); + + while let Some(item) = stream.next().await { + let item = item + .map_err(|e| anyhow::anyhow!("{e}")) + .context(anyhow::anyhow!("install_webc_package: failed to GET {url}"))?; + file.write_all(&item) + .map_err(|e| anyhow::anyhow!("{e}")) + .context(anyhow::anyhow!( + "install_webc_package: failed to write chunk to {}", + webc_path.display() + ))?; + } + + Ok(()) +} + +/// Returns a list of all installed webc packages +#[cfg(test)] +pub fn get_all_installed_webc_packages(test_name: &str) -> Vec { + get_all_installed_webc_packages_inner(test_name) +} + +#[cfg(not(test))] +pub fn get_all_installed_webc_packages() -> Vec { + get_all_installed_webc_packages_inner("") +} + +fn get_all_installed_webc_packages_inner(_test_name: &str) -> Vec { + #[cfg(test)] + let dir = match get_webc_dir(_test_name) { + Some(s) => s, + None => return Vec::new(), + }; + + #[cfg(not(test))] + let dir = match get_webc_dir() { + Some(s) => s, + None => return Vec::new(), + }; + + let read_dir = match std::fs::read_dir(dir) { + Ok(s) => s, + Err(_) => return Vec::new(), + }; + + read_dir + .filter_map(|r| Some(r.ok()?.path())) + .filter_map(|path| { + webc::WebCMmap::parse( + path, + &webc::ParseOptions { + parse_atoms: false, + parse_volumes: false, + ..Default::default() + }, + ) + .ok() + }) + .filter_map(|webc| { + let checksum = webc.checksum.as_ref().map(|s| &s.data)?.to_vec(); + let hex_string = get_checksum_hash(&checksum); + Some(RemoteWebcInfo { + checksum: hex_string, + manifest: webc.manifest.clone(), + }) + }) + .collect() +} + +/// The checksum of the webc file has a bunch of zeros at the end +/// (it's currently encoded that way in the webc format). This function +/// strips the zeros because otherwise the filename would become too long. +/// +/// So: +/// +/// `3ea47cb0000000000000` -> `3ea47cb` +/// +pub fn get_checksum_hash(bytes: &[u8]) -> String { + let mut checksum = bytes.to_vec(); + while checksum.last().copied() == Some(0) { + checksum.pop(); + } + hex::encode(&checksum) +} + +/// Returns the checksum of the .webc file, so that we can check whether the +/// file is already installed before downloading it +pub fn get_remote_webc_checksum(url: &Url) -> Result { + let request_max_bytes = webc::WebC::get_signature_offset_start() + 4 + 1024 + 8 + 8; + let data = get_webc_bytes(url, Some(0..request_max_bytes)).context("get_webc_bytes failed")?; + let checksum = webc::WebC::get_checksum_bytes(&data) + .map_err(|e| anyhow::anyhow!("{e}")) + .context("get_checksum_bytes failed")? + .to_vec(); + Ok(get_checksum_hash(&checksum)) +} + +/// Before fetching the entire file from a remote URL, just fetch the manifest +/// so we can see if the package has already been installed +pub fn get_remote_webc_manifest(url: &Url) -> Result { + // Request up unti manifest size / manifest len + let request_max_bytes = webc::WebC::get_signature_offset_start() + 4 + 1024 + 8 + 8; + let data = get_webc_bytes(url, Some(0..request_max_bytes))?; + let checksum = webc::WebC::get_checksum_bytes(&data) + .map_err(|e| anyhow::anyhow!("{e}")) + .context("WebC::get_checksum_bytes failed")? + .to_vec(); + let hex_string = get_checksum_hash(&checksum); + + let (manifest_start, manifest_len) = webc::WebC::get_manifest_offset_size(&data) + .map_err(|e| anyhow::anyhow!("{e}")) + .context("WebC::get_manifest_offset_size failed")?; + let data_with_manifest = get_webc_bytes(url, Some(0..manifest_start + manifest_len))?; + let manifest = webc::WebC::get_manifest(&data_with_manifest) + .map_err(|e| anyhow::anyhow!("{e}")) + .context("WebC::get_manifest failed")?; + Ok(RemoteWebcInfo { + checksum: hex_string, + manifest, + }) +} + +fn setup_webc_client(url: &Url) -> Result { + let client = { + let builder = reqwest::blocking::Client::builder(); + let builder = crate::graphql::proxy::maybe_set_up_proxy_blocking(builder) + .context("setup_webc_client")?; + builder + .build() + .map_err(|e| anyhow::anyhow!("{e}")) + .context("setup_webc_client: builder.build() failed")? + }; + + Ok(client.get(url.clone()).header(ACCEPT, "application/webc")) +} + +fn get_webc_bytes(url: &Url, range: Option>) -> Result, anyhow::Error> { + // curl -r 0-500 -L https://wapm.dev/syrusakbary/python -H "Accept: application/webc" --output python.webc + + let mut res = setup_webc_client(url)?; + + if let Some(range) = range.as_ref() { + res = res.header(RANGE, format!("bytes={}-{}", range.start, range.end)); + } + + let res = res + .send() + .map_err(|e| anyhow::anyhow!("{e}")) + .context("send() failed")?; + let bytes = res + .bytes() + .map_err(|e| anyhow::anyhow!("{e}")) + .context("bytes() failed")?; + + Ok(bytes.to_vec()) +} + // TODO: this test is segfaulting only on linux-musl, no other OS // See https://github.com/wasmerio/wasmer/pull/3215 #[cfg(not(target_env = "musl"))] diff --git a/tests/integration/cli/tests/run.rs b/tests/integration/cli/tests/run.rs index c43fe52b396..e1a5afefb4f 100644 --- a/tests/integration/cli/tests/run.rs +++ b/tests/integration/cli/tests/run.rs @@ -292,6 +292,32 @@ fn test_wasmer_run_pirita_works() -> anyhow::Result<()> { Ok(()) } +#[cfg(feature = "webc_runner")] +#[test] +fn test_wasmer_run_pirita_url_works() -> anyhow::Result<()> { + let output = Command::new(get_wasmer_path()) + .arg("run") + .arg("https://wapm.dev/syrusakbary/python") + .arg("--") + .arg("-c") + .arg("print(\"hello\")") + .output()?; + + let stdout = std::str::from_utf8(&output.stdout) + .expect("stdout is not utf8! need to handle arbitrary bytes"); + + if stdout != "hello\n" { + bail!( + "1 running python.wasmer failed with: stdout: {}\n\nstderr: {}", + stdout, + std::str::from_utf8(&output.stderr) + .expect("stderr is not utf8! need to handle arbitrary bytes") + ); + } + + Ok(()) +} + #[test] fn test_wasmer_run_works_with_dir() -> anyhow::Result<()> { let temp_dir = tempfile::TempDir::new()?;