From 817705dfc1bf14d2be73f8d65b251649e45908d1 Mon Sep 17 00:00:00 2001 From: Michael-F-Bryan Date: Thu, 15 Jun 2023 21:38:29 +0800 Subject: [PATCH] Add filesystem caching for WAPM queries --- lib/cli/src/commands/run/wasi.rs | 5 +- lib/wasix/src/runtime/resolver/wapm_source.rs | 226 +++++++++++++++++- 2 files changed, 218 insertions(+), 13 deletions(-) diff --git a/lib/cli/src/commands/run/wasi.rs b/lib/cli/src/commands/run/wasi.rs index b945351c1f4..8c48901640f 100644 --- a/lib/cli/src/commands/run/wasi.rs +++ b/lib/cli/src/commands/run/wasi.rs @@ -341,7 +341,10 @@ impl Wasi { source.add_source(preloaded); let graphql_endpoint = self.graphql_endpoint(wasmer_dir)?; - source.add_source(WapmSource::new(graphql_endpoint, Arc::clone(&client))); + let cache_dir = WapmSource::default_cache_dir(wasmer_dir); + let wapm_source = + WapmSource::new(graphql_endpoint, Arc::clone(&client)).with_local_cache(cache_dir); + source.add_source(wapm_source); let cache_dir = WebSource::default_cache_dir(wasmer_dir); source.add_source(WebSource::new(cache_dir, client)); diff --git a/lib/wasix/src/runtime/resolver/wapm_source.rs b/lib/wasix/src/runtime/resolver/wapm_source.rs index 6dc2695434d..1265405057e 100644 --- a/lib/wasix/src/runtime/resolver/wapm_source.rs +++ b/lib/wasix/src/runtime/resolver/wapm_source.rs @@ -1,4 +1,8 @@ -use std::sync::Arc; +use std::{ + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, SystemTime}, +}; use anyhow::{Context, Error}; use http::{HeaderMap, Method}; @@ -19,6 +23,7 @@ use crate::{ pub struct WapmSource { registry_endpoint: Url, client: Arc, + cache: Option, } impl WapmSource { @@ -29,26 +34,65 @@ impl WapmSource { WapmSource { registry_endpoint, client, + cache: None, } } -} -#[async_trait::async_trait] -impl Source for WapmSource { - #[tracing::instrument(level = "debug", skip_all, fields(%package))] - async fn query(&self, package: &PackageSpecifier) -> Result, Error> { - let (full_name, version_constraint) = match package { - PackageSpecifier::Registry { full_name, version } => (full_name, version), - _ => return Ok(Vec::new()), - }; + /// Get the directory that is typically used when caching downloaded + /// packages inside `$WASMER_DIR`. + pub fn default_cache_dir(wasmer_dir: impl AsRef) -> PathBuf { + wasmer_dir.as_ref().join("queries") + } + + /// Cache query results locally. + pub fn with_local_cache(self, cache_dir: impl Into) -> Self { + WapmSource { + cache: Some(FileSystemCache::new(cache_dir)), + ..self + } + } + + async fn lookup_package(&self, package_name: &str) -> Result { + if let Some(cache) = &self.cache { + match cache.lookup_cached_query(package_name) { + Ok(Some(cached)) => { + return Ok(cached); + } + Ok(None) => {} + Err(e) => { + tracing::warn!( + package_name, + error = &*e, + "An unexpected error occurred while checking the local query cache", + ); + } + } + } + + let response = self.query_graphql(package_name).await?; + if let Some(cache) = &self.cache { + if let Err(e) = cache.update(package_name, &response) { + tracing::warn!( + package_name, + error = &*e, + "An error occurred while caching the GraphQL response", + ); + } + } + + Ok(response) + } + + #[tracing::instrument(level = "debug", skip_all)] + async fn query_graphql(&self, package_name: &str) -> Result { #[derive(serde::Serialize)] struct Body { query: String, } let body = Body { - query: WASMER_WEBC_QUERY_ALL.replace("$NAME", full_name), + query: WASMER_WEBC_QUERY_ALL.replace("$NAME", package_name), }; let request = HttpRequest { @@ -75,13 +119,28 @@ impl Source for WapmSource { %response.status, %response.redirected, ?response.headers, - response.body=?String::from_utf8_lossy(&body), + response.body=String::from_utf8_lossy(&body).as_ref(), "Received a response from GraphQL", ); let response: WapmWebQuery = serde_json::from_slice(&body).context("Unable to deserialize the response")?; + Ok(response) + } +} + +#[async_trait::async_trait] +impl Source for WapmSource { + #[tracing::instrument(level = "debug", skip_all, fields(%package))] + async fn query(&self, package: &PackageSpecifier) -> Result, Error> { + let (full_name, version_constraint) = match package { + PackageSpecifier::Registry { full_name, version } => (full_name, version), + _ => return Ok(Vec::new()), + }; + + let response: WapmWebQuery = self.lookup_package(full_name).await?; + let mut summaries = Vec::new(); let versions = match response.data.get_package { @@ -149,6 +208,149 @@ fn decode_summary(pkg_version: WapmWebQueryGetPackageVersion) -> Result) -> Self { + FileSystemCache { + cache_dir: cache_dir.into(), + timeout: FileSystemCache::DEFAULT_TIMEOUT, + } + } + + fn path(&self, package_name: &str) -> PathBuf { + self.cache_dir.join(package_name) + } + + fn lookup_cached_query(&self, package_name: &str) -> Result, Error> { + let filename = self.path(package_name); + + let _span = + tracing::debug_span!("lookup_cached_query", filename=%filename.display()).entered(); + + tracing::trace!("Reading cached entry from disk"); + let json = match std::fs::read(&filename) { + Ok(json) => json, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + tracing::debug!("Cache miss"); + return Ok(None); + } + Err(e) => { + return Err( + Error::new(e).context(format!("Unable to read \"{}\"", filename.display())) + ); + } + }; + + let entry: CacheEntry = match serde_json::from_slice(&json) { + Ok(entry) => entry, + Err(e) => { + // If the entry is invalid, we should delete it to avoid work + // in the future + let _ = std::fs::remove_file(&filename); + + return Err(Error::new(e).context("Unable to parse the cached query")); + } + }; + + if !entry.is_still_valid(self.timeout) { + tracing::debug!(timestamp = entry.unix_timestamp, "Cached entry is stale"); + let _ = std::fs::remove_file(&filename); + return Ok(None); + } + + if entry.package_name != package_name { + let _ = std::fs::remove_file(&filename); + anyhow::bail!( + "The cached response at \"{}\" corresponds to the \"{}\" package, but expected \"{}\"", + filename.display(), + entry.package_name, + package_name, + ); + } + + Ok(Some(entry.response)) + } + + fn update(&self, package_name: &str, response: &WapmWebQuery) -> Result<(), Error> { + let entry = CacheEntry { + unix_timestamp: SystemTime::UNIX_EPOCH + .elapsed() + .unwrap_or_default() + .as_secs(), + package_name: package_name.to_string(), + response: response.clone(), + }; + + let _ = std::fs::create_dir_all(&self.cache_dir); + + // First, save our cache entry to disk + let mut temp = tempfile::NamedTempFile::new_in(&self.cache_dir) + .context("Unable to create a temporary file")?; + serde_json::to_writer_pretty(&mut temp, &entry) + .context("Unable to serialize the cache entry")?; + temp.as_file() + .sync_all() + .context("Flushing the temp file failed")?; + + // Now we've saved our cache entry we need to move it to the right + // location. We do this in two steps so concurrent queries don't see + // the cache entry until it has been completely written. + let filename = self.path(package_name); + tracing::debug!( + filename=%filename.display(), + package_name, + "Saving the query to disk", + ); + + if let Some(parent) = filename.parent() { + let _ = std::fs::create_dir_all(parent); + } + temp.persist(&filename).with_context(|| { + format!( + "Unable to persist the temp file to \"{}\"", + filename.display() + ) + })?; + + Ok(()) + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +struct CacheEntry { + unix_timestamp: u64, + package_name: String, + response: WapmWebQuery, +} + +impl CacheEntry { + fn is_still_valid(&self, timeout: Duration) -> bool { + let timestamp = SystemTime::UNIX_EPOCH + Duration::from_secs(self.unix_timestamp); + + match timestamp.elapsed() { + Ok(duration) if duration <= timeout => true, + Ok(_) => { + // The cached response is too old + false + } + Err(_) => { + // It looks like the current time is **after** the time this + // entry was recorded. That probably indicates a clock issue + // so we should mark the cached value as invalid. + false + } + } + } +} + #[allow(dead_code)] pub const WASMER_WEBC_QUERY_ALL: &str = r#"{ getPackage(name: "$NAME") {