Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always re-execute a registry query when cache lookups fail #4326

Merged
merged 2 commits into from
Nov 23, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 166 additions & 79 deletions lib/wasix/src/runtime/resolver/wapm_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use anyhow::{Context, Error};
use http::{HeaderMap, Method};
use semver::Version;
use semver::{Version, VersionReq};
use url::Url;
use webc::metadata::Manifest;

Expand Down Expand Up @@ -60,39 +60,6 @@ impl WapmSource {
&self.registry_endpoint
}

async fn lookup_package(&self, package_name: &str) -> Result<WapmWebQuery, Error> {
if let Some(cache) = &self.cache {
match cache.lookup_cached_query(package_name) {
Ok(Some(cached)) => {
tracing::debug!("Cache hit!");
return Ok(cached);
}
Ok(None) => {}
Err(e) => {
tracing::warn!(
package_name,
error = &*e,
"An unexpected error occurred while checking the local query cache",
);
}
}
}

let response = self.query_graphql(package_name).await?;

if let Some(cache) = &self.cache {
if let Err(e) = cache.update(package_name, &response) {
tracing::warn!(
package_name,
error = &*e,
"An error occurred while caching the GraphQL response",
);
}
}

Ok(response)
}

#[tracing::instrument(level = "debug", skip_all)]
async fn query_graphql(&self, package_name: &str) -> Result<WapmWebQuery, Error> {
#[derive(serde::Serialize)]
Expand Down Expand Up @@ -166,68 +133,104 @@ impl WapmSource {
impl Source for WapmSource {
#[tracing::instrument(level = "debug", skip_all, fields(%package))]
async fn query(&self, package: &PackageSpecifier) -> Result<Vec<PackageSummary>, QueryError> {
let (full_name, version_constraint) = match package {
let (package_name, version_constraint) = match package {
PackageSpecifier::Registry { full_name, version } => (full_name, version),
_ => return Err(QueryError::Unsupported),
};

let response: WapmWebQuery = self.lookup_package(full_name).await?;

let mut summaries = Vec::new();

let WapmWebQueryGetPackage {
package_name,
namespace,
versions,
} = response.data.get_package.ok_or(QueryError::NotFound)?;
let mut archived_versions = Vec::new();

for pkg_version in versions {
let version = match Version::parse(&pkg_version.version) {
Ok(v) => v,
if let Some(cache) = &self.cache {
match cache.lookup_cached_query(package_name) {
Ok(Some(cached)) => {
if let Ok(cached) = matching_package_summaries(cached, version_constraint) {
tracing::debug!("Cache hit!");
return Ok(cached);
}
}
Ok(None) => {}
Err(e) => {
tracing::debug!(
pkg.version = pkg_version.version.as_str(),
error = &e as &dyn std::error::Error,
"Skipping a version because it doesn't have a valid version numer",
tracing::warn!(
package_name,
error = &*e,
"An unexpected error occurred while checking the local query cache",
);
continue;
}
};
}
}

if pkg_version.is_archived {
let response = self.query_graphql(package_name).await?;

if let Some(cache) = &self.cache {
if let Err(e) = cache.update(package_name, &response) {
tracing::warn!(
package_name,
error = &*e,
"An error occurred while caching the GraphQL response",
);
}
}

matching_package_summaries(response, version_constraint)
}
}

fn matching_package_summaries(
response: WapmWebQuery,
version_constraint: &VersionReq,
) -> Result<Vec<PackageSummary>, QueryError> {
let mut summaries = Vec::new();

let WapmWebQueryGetPackage {
package_name,
namespace,
versions,
} = response.data.get_package.ok_or(QueryError::NotFound)?;
let mut archived_versions = Vec::new();

for pkg_version in versions {
let version = match Version::parse(&pkg_version.version) {
Ok(v) => v,
Err(e) => {
tracing::debug!(
pkg.version=%version,
"Skipping an archived version",
pkg.version = pkg_version.version.as_str(),
error = &e as &dyn std::error::Error,
"Skipping a version because it doesn't have a valid version numer",
);
archived_versions.push(version);
continue;
}
};

if version_constraint.matches(&version) {
match decode_summary(
pkg_version,
&response.data.info.default_frontend,
&namespace,
&package_name,
) {
Ok(summary) => summaries.push(summary),
Err(e) => {
tracing::debug!(
version=%version,
error=&*e,
"Skipping version because its metadata couldn't be parsed"
);
}
if pkg_version.is_archived {
tracing::debug!(
pkg.version=%version,
"Skipping an archived version",
);
archived_versions.push(version);
continue;
}

if version_constraint.matches(&version) {
match decode_summary(
pkg_version,
&response.data.info.default_frontend,
&namespace,
&package_name,
) {
Ok(summary) => summaries.push(summary),
Err(e) => {
tracing::debug!(
version=%version,
error=&*e,
"Skipping version because its metadata couldn't be parsed"
);
}
}
}
}

if summaries.is_empty() {
Err(QueryError::NoMatches { archived_versions })
} else {
Ok(summaries)
}
if summaries.is_empty() {
Err(QueryError::NoMatches { archived_versions })
} else {
Ok(summaries)
}
}

Expand Down Expand Up @@ -719,4 +722,88 @@ mod tests {
assert_eq!(summaries.len(), 1);
assert_eq!(summaries[0].pkg.version.to_string(), "3.12.1");
}

#[tokio::test]
async fn query_the_backend_again_if_cached_queries_dont_match() {
let cached_value = serde_json::from_value(serde_json::json! {
{
"data": {
"getPackage": {
"packageName": "python",
"namespace": "wasmer",
"versions": [
{
"version": "3.12.0",
"piritaManifest": "{\"package\": {\"wapm\": {\"name\": \"wasmer/python\", \"version\": \"3.12.0\", \"description\": \"Python\"}}}",
"distribution": {
"piritaDownloadUrl": "https://wasmer.io/wasmer/[email protected]",
"piritaSha256Hash": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
}
},
]
},
"info": {
"defaultFrontend": "https://wasmer.io/",
},
}
}
}).unwrap();
let body = serde_json::json! {
{
"data": {
"getPackage": {
"packageName": "python",
"namespace": "wasmer",
"versions": [
{
"version": "4.0.0",
"piritaManifest": "{\"package\": {\"wapm\": {\"name\": \"wasmer/python\", \"version\": \"4.0.0\", \"description\": \"Python\"}}}",
"distribution": {
"piritaDownloadUrl": "https://wasmer.io/wasmer/[email protected]",
"piritaSha256Hash": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
}
},
{
"version": "3.12.0",
"piritaManifest": "{\"package\": {\"wapm\": {\"name\": \"wasmer/python\", \"version\": \"3.12.0\", \"description\": \"Python\"}}}",
"distribution": {
"piritaDownloadUrl": "https://wasmer.io/wasmer/[email protected]",
"piritaSha256Hash": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
}
},
]
},
"info": {
"defaultFrontend": "https://wasmer.io/",
},
}
}
};
let response = HttpResponse {
body: Some(serde_json::to_vec(&body).unwrap()),
redirected: false,
status: StatusCode::OK,
headers: HeaderMap::new(),
};
let client = Arc::new(DummyClient::new(vec![response]));
let registry_endpoint = WapmSource::WASMER_PROD_ENDPOINT.parse().unwrap();
let request = PackageSpecifier::Registry {
full_name: "wasmer/python".to_string(),
version: "4.0.0".parse().unwrap(),
};
let temp = tempfile::tempdir().unwrap();
let source = WapmSource::new(registry_endpoint, client.clone())
.with_local_cache(temp.path(), Duration::from_secs(0));
source
.cache
.as_ref()
.unwrap()
.update("wasmer/python", &cached_value)
.unwrap();

let summaries = source.query(&request).await.unwrap();

assert_eq!(summaries.len(), 1);
assert_eq!(summaries[0].pkg.version.to_string(), "4.0.0");
}
}
Loading