Skip to content

Commit

Permalink
Add filesystem caching for WAPM queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-F-Bryan committed Jun 15, 2023
1 parent 4765c6b commit 817705d
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 13 deletions.
5 changes: 4 additions & 1 deletion lib/cli/src/commands/run/wasi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,10 @@ impl Wasi {
source.add_source(preloaded);

let graphql_endpoint = self.graphql_endpoint(wasmer_dir)?;
source.add_source(WapmSource::new(graphql_endpoint, Arc::clone(&client)));
let cache_dir = WapmSource::default_cache_dir(wasmer_dir);
let wapm_source =
WapmSource::new(graphql_endpoint, Arc::clone(&client)).with_local_cache(cache_dir);
source.add_source(wapm_source);

let cache_dir = WebSource::default_cache_dir(wasmer_dir);
source.add_source(WebSource::new(cache_dir, client));
Expand Down
226 changes: 214 additions & 12 deletions lib/wasix/src/runtime/resolver/wapm_source.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::sync::Arc;
use std::{
path::{Path, PathBuf},
sync::Arc,
time::{Duration, SystemTime},
};

use anyhow::{Context, Error};
use http::{HeaderMap, Method};
Expand All @@ -19,6 +23,7 @@ use crate::{
pub struct WapmSource {
registry_endpoint: Url,
client: Arc<dyn HttpClient + Send + Sync>,
cache: Option<FileSystemCache>,
}

impl WapmSource {
Expand All @@ -29,26 +34,65 @@ impl WapmSource {
WapmSource {
registry_endpoint,
client,
cache: None,
}
}
}

#[async_trait::async_trait]
impl Source for WapmSource {
#[tracing::instrument(level = "debug", skip_all, fields(%package))]
async fn query(&self, package: &PackageSpecifier) -> Result<Vec<PackageSummary>, Error> {
let (full_name, version_constraint) = match package {
PackageSpecifier::Registry { full_name, version } => (full_name, version),
_ => return Ok(Vec::new()),
};
/// Get the directory that is typically used when caching downloaded
/// packages inside `$WASMER_DIR`.
pub fn default_cache_dir(wasmer_dir: impl AsRef<Path>) -> PathBuf {
wasmer_dir.as_ref().join("queries")
}

/// Cache query results locally.
pub fn with_local_cache(self, cache_dir: impl Into<PathBuf>) -> Self {
WapmSource {
cache: Some(FileSystemCache::new(cache_dir)),
..self
}
}

async fn lookup_package(&self, package_name: &str) -> Result<WapmWebQuery, Error> {
if let Some(cache) = &self.cache {
match cache.lookup_cached_query(package_name) {
Ok(Some(cached)) => {
return Ok(cached);
}
Ok(None) => {}
Err(e) => {
tracing::warn!(
package_name,
error = &*e,
"An unexpected error occurred while checking the local query cache",
);
}
}
}

let response = self.query_graphql(package_name).await?;

if let Some(cache) = &self.cache {
if let Err(e) = cache.update(package_name, &response) {
tracing::warn!(
package_name,
error = &*e,
"An error occurred while caching the GraphQL response",
);
}
}

Ok(response)
}

#[tracing::instrument(level = "debug", skip_all)]
async fn query_graphql(&self, package_name: &str) -> Result<WapmWebQuery, Error> {
#[derive(serde::Serialize)]
struct Body {
query: String,
}

let body = Body {
query: WASMER_WEBC_QUERY_ALL.replace("$NAME", full_name),
query: WASMER_WEBC_QUERY_ALL.replace("$NAME", package_name),
};

let request = HttpRequest {
Expand All @@ -75,13 +119,28 @@ impl Source for WapmSource {
%response.status,
%response.redirected,
?response.headers,
response.body=?String::from_utf8_lossy(&body),
response.body=String::from_utf8_lossy(&body).as_ref(),
"Received a response from GraphQL",
);

let response: WapmWebQuery =
serde_json::from_slice(&body).context("Unable to deserialize the response")?;

Ok(response)
}
}

#[async_trait::async_trait]
impl Source for WapmSource {
#[tracing::instrument(level = "debug", skip_all, fields(%package))]
async fn query(&self, package: &PackageSpecifier) -> Result<Vec<PackageSummary>, Error> {
let (full_name, version_constraint) = match package {
PackageSpecifier::Registry { full_name, version } => (full_name, version),
_ => return Ok(Vec::new()),
};

let response: WapmWebQuery = self.lookup_package(full_name).await?;

let mut summaries = Vec::new();

let versions = match response.data.get_package {
Expand Down Expand Up @@ -149,6 +208,149 @@ fn decode_summary(pkg_version: WapmWebQueryGetPackageVersion) -> Result<PackageS
})
}

/// A local cache for package queries.
#[derive(Debug, Clone)]
struct FileSystemCache {
cache_dir: PathBuf,
timeout: Duration,
}

impl FileSystemCache {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10 * 60);

fn new(cache_dir: impl Into<PathBuf>) -> Self {
FileSystemCache {
cache_dir: cache_dir.into(),
timeout: FileSystemCache::DEFAULT_TIMEOUT,
}
}

fn path(&self, package_name: &str) -> PathBuf {
self.cache_dir.join(package_name)
}

fn lookup_cached_query(&self, package_name: &str) -> Result<Option<WapmWebQuery>, Error> {
let filename = self.path(package_name);

let _span =
tracing::debug_span!("lookup_cached_query", filename=%filename.display()).entered();

tracing::trace!("Reading cached entry from disk");
let json = match std::fs::read(&filename) {
Ok(json) => json,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
tracing::debug!("Cache miss");
return Ok(None);
}
Err(e) => {
return Err(
Error::new(e).context(format!("Unable to read \"{}\"", filename.display()))
);
}
};

let entry: CacheEntry = match serde_json::from_slice(&json) {
Ok(entry) => entry,
Err(e) => {
// If the entry is invalid, we should delete it to avoid work
// in the future
let _ = std::fs::remove_file(&filename);

return Err(Error::new(e).context("Unable to parse the cached query"));
}
};

if !entry.is_still_valid(self.timeout) {
tracing::debug!(timestamp = entry.unix_timestamp, "Cached entry is stale");
let _ = std::fs::remove_file(&filename);
return Ok(None);
}

if entry.package_name != package_name {
let _ = std::fs::remove_file(&filename);
anyhow::bail!(
"The cached response at \"{}\" corresponds to the \"{}\" package, but expected \"{}\"",
filename.display(),
entry.package_name,
package_name,
);
}

Ok(Some(entry.response))
}

fn update(&self, package_name: &str, response: &WapmWebQuery) -> Result<(), Error> {
let entry = CacheEntry {
unix_timestamp: SystemTime::UNIX_EPOCH
.elapsed()
.unwrap_or_default()
.as_secs(),
package_name: package_name.to_string(),
response: response.clone(),
};

let _ = std::fs::create_dir_all(&self.cache_dir);

// First, save our cache entry to disk
let mut temp = tempfile::NamedTempFile::new_in(&self.cache_dir)
.context("Unable to create a temporary file")?;
serde_json::to_writer_pretty(&mut temp, &entry)
.context("Unable to serialize the cache entry")?;
temp.as_file()
.sync_all()
.context("Flushing the temp file failed")?;

// Now we've saved our cache entry we need to move it to the right
// location. We do this in two steps so concurrent queries don't see
// the cache entry until it has been completely written.
let filename = self.path(package_name);
tracing::debug!(
filename=%filename.display(),
package_name,
"Saving the query to disk",
);

if let Some(parent) = filename.parent() {
let _ = std::fs::create_dir_all(parent);
}
temp.persist(&filename).with_context(|| {
format!(
"Unable to persist the temp file to \"{}\"",
filename.display()
)
})?;

Ok(())
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct CacheEntry {
unix_timestamp: u64,
package_name: String,
response: WapmWebQuery,
}

impl CacheEntry {
fn is_still_valid(&self, timeout: Duration) -> bool {
let timestamp = SystemTime::UNIX_EPOCH + Duration::from_secs(self.unix_timestamp);

match timestamp.elapsed() {
Ok(duration) if duration <= timeout => true,
Ok(_) => {
// The cached response is too old
false
}
Err(_) => {
// It looks like the current time is **after** the time this
// entry was recorded. That probably indicates a clock issue
// so we should mark the cached value as invalid.
false
}
}
}
}

#[allow(dead_code)]
pub const WASMER_WEBC_QUERY_ALL: &str = r#"{
getPackage(name: "$NAME") {
Expand Down

0 comments on commit 817705d

Please sign in to comment.