From 05ddda648cb599b066b950da81353c94b3f74666 Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Fri, 9 Jun 2023 20:32:08 +0800 Subject: [PATCH 1/6] Instrument more functions with tracing --- .../src/runtime/package_loader/builtin_loader.rs | 13 +++++++++++++ lib/wasix/src/runtime/resolver/wapm_source.rs | 13 +++++++++---- lib/wasix/src/runtime/resolver/web_source.rs | 2 +- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/lib/wasix/src/runtime/package_loader/builtin_loader.rs b/lib/wasix/src/runtime/package_loader/builtin_loader.rs index 83a6fe0f67f..633a2dfda46 100644 --- a/lib/wasix/src/runtime/package_loader/builtin_loader.rs +++ b/lib/wasix/src/runtime/package_loader/builtin_loader.rs @@ -87,6 +87,7 @@ impl BuiltinPackageLoader { Ok(None) } + #[tracing::instrument(level = "debug", skip_all, fields(%dist.webc, %dist.webc_sha256))] async fn download(&self, dist: &DistributionInfo) -> Result { if dist.webc.scheme() == "file" { match crate::runtime::resolver::utils::file_path_from_url(&dist.webc) { @@ -114,8 +115,19 @@ impl BuiltinPackageLoader { options: Default::default(), }; + tracing::debug!(%request.url, %request.method, "Downloading a webc file"); + tracing::trace!(?request.headers); + let response = self.client.request(request).await?; + tracing::trace!( + %response.status, + %response.redirected, + ?response.headers, + response.len=response.body.as_ref().map(|body| body.len()), + "Received a response", + ); + if !response.is_ok() { let url = &dist.webc; return Err(crate::runtime::resolver::utils::http_error(&response) @@ -129,6 +141,7 @@ impl BuiltinPackageLoader { Ok(body.into()) } + #[tracing::instrument(level = "debug", skip_all)] async fn save_and_load_as_mmapped( &self, webc: &[u8], diff --git a/lib/wasix/src/runtime/resolver/wapm_source.rs b/lib/wasix/src/runtime/resolver/wapm_source.rs index b0595495186..6dc2695434d 100644 --- a/lib/wasix/src/runtime/resolver/wapm_source.rs +++ b/lib/wasix/src/runtime/resolver/wapm_source.rs @@ -41,6 
+41,7 @@ impl Source for WapmSource { PackageSpecifier::Registry { full_name, version } => (full_name, version), _ => return Ok(Vec::new()), }; + #[derive(serde::Serialize)] struct Body { query: String, @@ -49,17 +50,18 @@ impl Source for WapmSource { let body = Body { query: WASMER_WEBC_QUERY_ALL.replace("$NAME", full_name), }; - let body = serde_json::to_string(&body)?; - tracing::trace!(%body, "Sending GraphQL query"); let request = HttpRequest { url: self.registry_endpoint.clone(), method: Method::POST, - body: Some(body.into_bytes()), + body: Some(serde_json::to_string(&body)?.into_bytes()), headers: headers(), options: Default::default(), }; + tracing::debug!(%request.url, %request.method, "Querying the GraphQL API"); + tracing::trace!(?request.headers, request.body=body.query.as_str()); + let response = self.client.request(request).await?; if !response.is_ok() { @@ -70,7 +72,10 @@ impl Source for WapmSource { let body = response.body.unwrap_or_default(); tracing::trace!( - body=?String::from_utf8_lossy(&body), + %response.status, + %response.redirected, + ?response.headers, + response.body=?String::from_utf8_lossy(&body), "Received a response from GraphQL", ); diff --git a/lib/wasix/src/runtime/resolver/web_source.rs b/lib/wasix/src/runtime/resolver/web_source.rs index 8ab89fa2ead..787ca19853d 100644 --- a/lib/wasix/src/runtime/resolver/web_source.rs +++ b/lib/wasix/src/runtime/resolver/web_source.rs @@ -68,7 +68,7 @@ impl WebSource { } /// Download a package and cache it locally. - #[tracing::instrument(skip_all, fields(%url))] + #[tracing::instrument(level = "debug", skip_all, fields(%url))] async fn get_locally_cached_file(&self, url: &Url) -> Result { // This function is a bit tricky because we go to great lengths to avoid // unnecessary downloads. 
From 5df78e2c996b8005d5a20c6a81a247d8abb8f6c6 Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Fri, 9 Jun 2023 20:32:43 +0800 Subject: [PATCH 2/6] Rename "multi_source_registry.rs" --- lib/wasix/src/runtime/resolver/mod.rs | 4 ++-- .../resolver/{multi_source_registry.rs => multi_source.rs} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename lib/wasix/src/runtime/resolver/{multi_source_registry.rs => multi_source.rs} (100%) diff --git a/lib/wasix/src/runtime/resolver/mod.rs b/lib/wasix/src/runtime/resolver/mod.rs index 6f5a16738e1..d5497711eba 100644 --- a/lib/wasix/src/runtime/resolver/mod.rs +++ b/lib/wasix/src/runtime/resolver/mod.rs @@ -1,7 +1,7 @@ mod filesystem_source; mod in_memory_source; mod inputs; -mod multi_source_registry; +mod multi_source; mod outputs; mod resolve; mod source; @@ -16,7 +16,7 @@ pub use self::{ Command, Dependency, DistributionInfo, PackageInfo, PackageSpecifier, PackageSummary, WebcHash, }, - multi_source_registry::MultiSource, + multi_source::MultiSource, outputs::{ DependencyGraph, Edge, ItemLocation, Node, PackageId, Resolution, ResolvedFileSystemMapping, ResolvedPackage, diff --git a/lib/wasix/src/runtime/resolver/multi_source_registry.rs b/lib/wasix/src/runtime/resolver/multi_source.rs similarity index 100% rename from lib/wasix/src/runtime/resolver/multi_source_registry.rs rename to lib/wasix/src/runtime/resolver/multi_source.rs From fe4978f458e1e802b18cf485086fd192fafa9e46 Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Thu, 15 Jun 2023 21:38:29 +0800 Subject: [PATCH 3/6] Add filesystem caching for WAPM queries --- lib/cli/src/commands/run/wasi.rs | 5 +- lib/wasix/src/runtime/resolver/wapm_source.rs | 226 +++++++++++++++++- 2 files changed, 218 insertions(+), 13 deletions(-) diff --git a/lib/cli/src/commands/run/wasi.rs b/lib/cli/src/commands/run/wasi.rs index b945351c1f4..8c48901640f 100644 --- a/lib/cli/src/commands/run/wasi.rs +++ b/lib/cli/src/commands/run/wasi.rs @@ -341,7 +341,10 @@ impl Wasi { 
source.add_source(preloaded); let graphql_endpoint = self.graphql_endpoint(wasmer_dir)?; - source.add_source(WapmSource::new(graphql_endpoint, Arc::clone(&client))); + let cache_dir = WapmSource::default_cache_dir(wasmer_dir); + let wapm_source = + WapmSource::new(graphql_endpoint, Arc::clone(&client)).with_local_cache(cache_dir); + source.add_source(wapm_source); let cache_dir = WebSource::default_cache_dir(wasmer_dir); source.add_source(WebSource::new(cache_dir, client)); diff --git a/lib/wasix/src/runtime/resolver/wapm_source.rs b/lib/wasix/src/runtime/resolver/wapm_source.rs index 6dc2695434d..1265405057e 100644 --- a/lib/wasix/src/runtime/resolver/wapm_source.rs +++ b/lib/wasix/src/runtime/resolver/wapm_source.rs @@ -1,4 +1,8 @@ -use std::sync::Arc; +use std::{ + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, SystemTime}, +}; use anyhow::{Context, Error}; use http::{HeaderMap, Method}; @@ -19,6 +23,7 @@ use crate::{ pub struct WapmSource { registry_endpoint: Url, client: Arc, + cache: Option, } impl WapmSource { @@ -29,26 +34,65 @@ impl WapmSource { WapmSource { registry_endpoint, client, + cache: None, } } -} -#[async_trait::async_trait] -impl Source for WapmSource { - #[tracing::instrument(level = "debug", skip_all, fields(%package))] - async fn query(&self, package: &PackageSpecifier) -> Result, Error> { - let (full_name, version_constraint) = match package { - PackageSpecifier::Registry { full_name, version } => (full_name, version), - _ => return Ok(Vec::new()), - }; + /// Get the directory that is typically used when caching query + /// results inside `$WASMER_DIR`. + pub fn default_cache_dir(wasmer_dir: impl AsRef) -> PathBuf { + wasmer_dir.as_ref().join("queries") + } + + /// Cache query results locally. 
+ pub fn with_local_cache(self, cache_dir: impl Into) -> Self { + WapmSource { + cache: Some(FileSystemCache::new(cache_dir)), + ..self + } + } + + async fn lookup_package(&self, package_name: &str) -> Result { + if let Some(cache) = &self.cache { + match cache.lookup_cached_query(package_name) { + Ok(Some(cached)) => { + return Ok(cached); + } + Ok(None) => {} + Err(e) => { + tracing::warn!( + package_name, + error = &*e, + "An unexpected error occurred while checking the local query cache", + ); + } + } + } + + let response = self.query_graphql(package_name).await?; + if let Some(cache) = &self.cache { + if let Err(e) = cache.update(package_name, &response) { + tracing::warn!( + package_name, + error = &*e, + "An error occurred while caching the GraphQL response", + ); + } + } + + Ok(response) + } + + #[tracing::instrument(level = "debug", skip_all)] + async fn query_graphql(&self, package_name: &str) -> Result { #[derive(serde::Serialize)] struct Body { query: String, } let body = Body { - query: WASMER_WEBC_QUERY_ALL.replace("$NAME", full_name), + query: WASMER_WEBC_QUERY_ALL.replace("$NAME", package_name), }; let request = HttpRequest { @@ -75,13 +119,28 @@ impl Source for WapmSource { %response.status, %response.redirected, ?response.headers, - response.body=?String::from_utf8_lossy(&body), + response.body=String::from_utf8_lossy(&body).as_ref(), "Received a response from GraphQL", ); let response: WapmWebQuery = serde_json::from_slice(&body).context("Unable to deserialize the response")?; + Ok(response) + } +} + +#[async_trait::async_trait] +impl Source for WapmSource { + #[tracing::instrument(level = "debug", skip_all, fields(%package))] + async fn query(&self, package: &PackageSpecifier) -> Result, Error> { + let (full_name, version_constraint) = match package { + PackageSpecifier::Registry { full_name, version } => (full_name, version), + _ => return Ok(Vec::new()), + }; + + let response: WapmWebQuery = self.lookup_package(full_name).await?; + let mut 
summaries = Vec::new(); let versions = match response.data.get_package { @@ -149,6 +208,149 @@ fn decode_summary(pkg_version: WapmWebQueryGetPackageVersion) -> Result) -> Self { + FileSystemCache { + cache_dir: cache_dir.into(), + timeout: FileSystemCache::DEFAULT_TIMEOUT, + } + } + + fn path(&self, package_name: &str) -> PathBuf { + self.cache_dir.join(package_name) + } + + fn lookup_cached_query(&self, package_name: &str) -> Result, Error> { + let filename = self.path(package_name); + + let _span = + tracing::debug_span!("lookup_cached_query", filename=%filename.display()).entered(); + + tracing::trace!("Reading cached entry from disk"); + let json = match std::fs::read(&filename) { + Ok(json) => json, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + tracing::debug!("Cache miss"); + return Ok(None); + } + Err(e) => { + return Err( + Error::new(e).context(format!("Unable to read \"{}\"", filename.display())) + ); + } + }; + + let entry: CacheEntry = match serde_json::from_slice(&json) { + Ok(entry) => entry, + Err(e) => { + // If the entry is invalid, we should delete it to avoid work + // in the future + let _ = std::fs::remove_file(&filename); + + return Err(Error::new(e).context("Unable to parse the cached query")); + } + }; + + if !entry.is_still_valid(self.timeout) { + tracing::debug!(timestamp = entry.unix_timestamp, "Cached entry is stale"); + let _ = std::fs::remove_file(&filename); + return Ok(None); + } + + if entry.package_name != package_name { + let _ = std::fs::remove_file(&filename); + anyhow::bail!( + "The cached response at \"{}\" corresponds to the \"{}\" package, but expected \"{}\"", + filename.display(), + entry.package_name, + package_name, + ); + } + + Ok(Some(entry.response)) + } + + fn update(&self, package_name: &str, response: &WapmWebQuery) -> Result<(), Error> { + let entry = CacheEntry { + unix_timestamp: SystemTime::UNIX_EPOCH + .elapsed() + .unwrap_or_default() + .as_secs(), + package_name: package_name.to_string(), + 
response: response.clone(), + }; + + let _ = std::fs::create_dir_all(&self.cache_dir); + + // First, save our cache entry to disk + let mut temp = tempfile::NamedTempFile::new_in(&self.cache_dir) + .context("Unable to create a temporary file")?; + serde_json::to_writer_pretty(&mut temp, &entry) + .context("Unable to serialize the cache entry")?; + temp.as_file() + .sync_all() + .context("Flushing the temp file failed")?; + + // Now we've saved our cache entry we need to move it to the right + // location. We do this in two steps so concurrent queries don't see + // the cache entry until it has been completely written. + let filename = self.path(package_name); + tracing::debug!( + filename=%filename.display(), + package_name, + "Saving the query to disk", + ); + + if let Some(parent) = filename.parent() { + let _ = std::fs::create_dir_all(parent); + } + temp.persist(&filename).with_context(|| { + format!( + "Unable to persist the temp file to \"{}\"", + filename.display() + ) + })?; + + Ok(()) + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +struct CacheEntry { + unix_timestamp: u64, + package_name: String, + response: WapmWebQuery, +} + +impl CacheEntry { + fn is_still_valid(&self, timeout: Duration) -> bool { + let timestamp = SystemTime::UNIX_EPOCH + Duration::from_secs(self.unix_timestamp); + + match timestamp.elapsed() { + Ok(duration) if duration <= timeout => true, + Ok(_) => { + // The cached response is too old + false + } + Err(_) => { + // It looks like the current time is **before** the time this + // entry was recorded. That probably indicates a clock issue + // so we should mark the cached value as invalid. 
+ false + } + } + } +} + #[allow(dead_code)] pub const WASMER_WEBC_QUERY_ALL: &str = r#"{ getPackage(name: "$NAME") { From 0c8220ce22d1eb5dd89f691a63c00595ba59aea2 Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Fri, 16 Jun 2023 16:48:35 +0800 Subject: [PATCH 4/6] Make the WapmSource local cache timeout configurable --- lib/cli/src/commands/run/wasi.rs | 7 +++++-- lib/wasix/src/runtime/resolver/wapm_source.rs | 10 ++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/lib/cli/src/commands/run/wasi.rs b/lib/cli/src/commands/run/wasi.rs index 8c48901640f..273329926b1 100644 --- a/lib/cli/src/commands/run/wasi.rs +++ b/lib/cli/src/commands/run/wasi.rs @@ -2,6 +2,7 @@ use std::{ collections::{BTreeSet, HashMap}, path::{Path, PathBuf}, sync::{mpsc::Sender, Arc}, + time::Duration, }; use anyhow::{Context, Result}; @@ -36,6 +37,8 @@ use wasmer_wasix::{ use crate::utils::{parse_envvar, parse_mapdir}; +const WAPM_SOURCE_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60); + #[derive(Debug, Parser, Clone, Default)] /// WASI Options pub struct Wasi { @@ -342,8 +345,8 @@ impl Wasi { let graphql_endpoint = self.graphql_endpoint(wasmer_dir)?; let cache_dir = WapmSource::default_cache_dir(wasmer_dir); - let wapm_source = - WapmSource::new(graphql_endpoint, Arc::clone(&client)).with_local_cache(cache_dir); + let wapm_source = WapmSource::new(graphql_endpoint, Arc::clone(&client)) + .with_local_cache(cache_dir, WAPM_SOURCE_CACHE_TIMEOUT); source.add_source(wapm_source); let cache_dir = WebSource::default_cache_dir(wasmer_dir); diff --git a/lib/wasix/src/runtime/resolver/wapm_source.rs b/lib/wasix/src/runtime/resolver/wapm_source.rs index 1265405057e..1ef69c148bf 100644 --- a/lib/wasix/src/runtime/resolver/wapm_source.rs +++ b/lib/wasix/src/runtime/resolver/wapm_source.rs @@ -45,9 +45,9 @@ impl WapmSource { } /// Cache query results locally. 
- pub fn with_local_cache(self, cache_dir: impl Into) -> Self { + pub fn with_local_cache(self, cache_dir: impl Into, timeout: Duration) -> Self { WapmSource { - cache: Some(FileSystemCache::new(cache_dir)), + cache: Some(FileSystemCache::new(cache_dir, timeout)), ..self } } @@ -216,12 +216,10 @@ struct FileSystemCache { } impl FileSystemCache { - const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10 * 60); - - fn new(cache_dir: impl Into) -> Self { + fn new(cache_dir: impl Into, timeout: Duration) -> Self { FileSystemCache { cache_dir: cache_dir.into(), - timeout: FileSystemCache::DEFAULT_TIMEOUT, + timeout, } } From aecb4282a655eadd3067c3621ae6864a69212bae Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Fri, 16 Jun 2023 17:17:24 +0800 Subject: [PATCH 5/6] Added some quick'n'dirty cache invalidation to "wasmer publish" --- lib/cli/src/commands/publish.rs | 29 ++++++++++++++++++- lib/wasix/src/runtime/resolver/wapm_source.rs | 14 +++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/lib/cli/src/commands/publish.rs b/lib/cli/src/commands/publish.rs index 307e686c83d..14aa1ef15a4 100644 --- a/lib/cli/src/commands/publish.rs +++ b/lib/cli/src/commands/publish.rs @@ -1,4 +1,7 @@ +use anyhow::Context; use clap::Parser; +use wasmer_registry::WasmerConfig; +use wasmer_wasix::runtime::resolver::WapmSource; /// Publish a package to the package registry. #[derive(Debug, Parser)] @@ -44,7 +47,16 @@ impl Publish { no_validate: self.no_validate, package_path: self.package_path.clone(), }; - publish.execute().map_err(on_error) + publish.execute().map_err(on_error)?; + + if let Err(e) = invalidate_graphql_query_cache() { + tracing::warn!( + error = &*e, + "Unable to invalidate the cache used for package version queries", + ); + } + + Ok(()) } } @@ -54,3 +66,18 @@ fn on_error(e: anyhow::Error) -> anyhow::Error { e } + +// HACK: We want to invalidate the cache used for GraphQL queries so +// the current user sees the results of publishing immediately. 
There +// are cleaner ways to achieve this, but for now we're just going to +// clear out the whole GraphQL query cache. +// See https://github.com/wasmerio/wasmer/pull/3983 for more +fn invalidate_graphql_query_cache() -> Result<(), anyhow::Error> { + let wasmer_dir = WasmerConfig::get_wasmer_dir() + .map_err(anyhow::Error::msg) + .context("Unable to determine the wasmer dir")?; + + let cache_dir = WapmSource::invalidate_local_cache(wasmer_dir)?; + + Ok(()) +} diff --git a/lib/wasix/src/runtime/resolver/wapm_source.rs b/lib/wasix/src/runtime/resolver/wapm_source.rs index 1ef69c148bf..e162bdd2c85 100644 --- a/lib/wasix/src/runtime/resolver/wapm_source.rs +++ b/lib/wasix/src/runtime/resolver/wapm_source.rs @@ -52,6 +52,20 @@ impl WapmSource { } } + /// Clean the local cache. + /// + /// This is a workaround used primarily when publishing and will probably + /// be removed in the future. + pub fn invalidate_local_cache(wasmer_dir: impl AsRef) -> Result<(), Error> { + let cache_dir = WapmSource::default_cache_dir(wasmer_dir); + + std::fs::remove_dir_all(&cache_dir).with_context(|| { + format!("Unable to delete the \"{}\" directory", cache_dir.display()) + })?; + + Ok(()) + } + async fn lookup_package(&self, package_name: &str) -> Result { if let Some(cache) = &self.cache { match cache.lookup_cached_query(package_name) { From bba3f9d116508be92a4ada7a44bbaa27616f7950 Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Fri, 16 Jun 2023 17:36:44 +0800 Subject: [PATCH 6/6] Remove an unused variable --- lib/cli/src/commands/publish.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/cli/src/commands/publish.rs b/lib/cli/src/commands/publish.rs index 14aa1ef15a4..bd332d6276c 100644 --- a/lib/cli/src/commands/publish.rs +++ b/lib/cli/src/commands/publish.rs @@ -77,7 +77,7 @@ fn invalidate_graphql_query_cache() -> Result<(), anyhow::Error> { .map_err(anyhow::Error::msg) .context("Unable to determine the wasmer dir")?; - let cache_dir = 
WapmSource::invalidate_local_cache(wasmer_dir)?; + WapmSource::invalidate_local_cache(wasmer_dir)?; Ok(()) }