Skip to content

Commit

Permalink
Merge pull request #4326 from wasmerio/retry-registry-queries
Browse files Browse the repository at this point in the history
Always re-execute a registry query when cache lookups fail
  • Loading branch information
Michael Bryan authored Nov 23, 2023
2 parents 5afcc59 + 837977c commit 5563c1c
Showing 1 changed file with 166 additions and 79 deletions.
245 changes: 166 additions & 79 deletions lib/wasix/src/runtime/resolver/wapm_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use anyhow::{Context, Error};
use http::{HeaderMap, Method};
use semver::Version;
use semver::{Version, VersionReq};
use url::Url;
use webc::metadata::Manifest;

Expand Down Expand Up @@ -60,39 +60,6 @@ impl WapmSource {
&self.registry_endpoint
}

async fn lookup_package(&self, package_name: &str) -> Result<WapmWebQuery, Error> {
if let Some(cache) = &self.cache {
match cache.lookup_cached_query(package_name) {
Ok(Some(cached)) => {
tracing::debug!("Cache hit!");
return Ok(cached);
}
Ok(None) => {}
Err(e) => {
tracing::warn!(
package_name,
error = &*e,
"An unexpected error occurred while checking the local query cache",
);
}
}
}

let response = self.query_graphql(package_name).await?;

if let Some(cache) = &self.cache {
if let Err(e) = cache.update(package_name, &response) {
tracing::warn!(
package_name,
error = &*e,
"An error occurred while caching the GraphQL response",
);
}
}

Ok(response)
}

#[tracing::instrument(level = "debug", skip_all)]
async fn query_graphql(&self, package_name: &str) -> Result<WapmWebQuery, Error> {
#[derive(serde::Serialize)]
Expand Down Expand Up @@ -166,68 +133,104 @@ impl WapmSource {
impl Source for WapmSource {
#[tracing::instrument(level = "debug", skip_all, fields(%package))]
async fn query(&self, package: &PackageSpecifier) -> Result<Vec<PackageSummary>, QueryError> {
let (full_name, version_constraint) = match package {
let (package_name, version_constraint) = match package {
PackageSpecifier::Registry { full_name, version } => (full_name, version),
_ => return Err(QueryError::Unsupported),
};

let response: WapmWebQuery = self.lookup_package(full_name).await?;

let mut summaries = Vec::new();

let WapmWebQueryGetPackage {
package_name,
namespace,
versions,
} = response.data.get_package.ok_or(QueryError::NotFound)?;
let mut archived_versions = Vec::new();

for pkg_version in versions {
let version = match Version::parse(&pkg_version.version) {
Ok(v) => v,
if let Some(cache) = &self.cache {
match cache.lookup_cached_query(package_name) {
Ok(Some(cached)) => {
if let Ok(cached) = matching_package_summaries(cached, version_constraint) {
tracing::debug!("Cache hit!");
return Ok(cached);
}
}
Ok(None) => {}
Err(e) => {
tracing::debug!(
pkg.version = pkg_version.version.as_str(),
error = &e as &dyn std::error::Error,
"Skipping a version because it doesn't have a valid version numer",
tracing::warn!(
package_name,
error = &*e,
"An unexpected error occurred while checking the local query cache",
);
continue;
}
};
}
}

if pkg_version.is_archived {
let response = self.query_graphql(package_name).await?;

if let Some(cache) = &self.cache {
if let Err(e) = cache.update(package_name, &response) {
tracing::warn!(
package_name,
error = &*e,
"An error occurred while caching the GraphQL response",
);
}
}

matching_package_summaries(response, version_constraint)
}
}

fn matching_package_summaries(
response: WapmWebQuery,
version_constraint: &VersionReq,
) -> Result<Vec<PackageSummary>, QueryError> {
let mut summaries = Vec::new();

let WapmWebQueryGetPackage {
package_name,
namespace,
versions,
} = response.data.get_package.ok_or(QueryError::NotFound)?;
let mut archived_versions = Vec::new();

for pkg_version in versions {
let version = match Version::parse(&pkg_version.version) {
Ok(v) => v,
Err(e) => {
tracing::debug!(
pkg.version=%version,
"Skipping an archived version",
pkg.version = pkg_version.version.as_str(),
error = &e as &dyn std::error::Error,
"Skipping a version because it doesn't have a valid version numer",
);
archived_versions.push(version);
continue;
}
};

if version_constraint.matches(&version) {
match decode_summary(
pkg_version,
&response.data.info.default_frontend,
&namespace,
&package_name,
) {
Ok(summary) => summaries.push(summary),
Err(e) => {
tracing::debug!(
version=%version,
error=&*e,
"Skipping version because its metadata couldn't be parsed"
);
}
if pkg_version.is_archived {
tracing::debug!(
pkg.version=%version,
"Skipping an archived version",
);
archived_versions.push(version);
continue;
}

if version_constraint.matches(&version) {
match decode_summary(
pkg_version,
&response.data.info.default_frontend,
&namespace,
&package_name,
) {
Ok(summary) => summaries.push(summary),
Err(e) => {
tracing::debug!(
version=%version,
error=&*e,
"Skipping version because its metadata couldn't be parsed"
);
}
}
}
}

if summaries.is_empty() {
Err(QueryError::NoMatches { archived_versions })
} else {
Ok(summaries)
}
if summaries.is_empty() {
Err(QueryError::NoMatches { archived_versions })
} else {
Ok(summaries)
}
}

Expand Down Expand Up @@ -719,4 +722,88 @@ mod tests {
assert_eq!(summaries.len(), 1);
assert_eq!(summaries[0].pkg.version.to_string(), "3.12.1");
}

#[tokio::test]
async fn query_the_backend_again_if_cached_queries_dont_match() {
let cached_value = serde_json::from_value(serde_json::json! {
{
"data": {
"getPackage": {
"packageName": "python",
"namespace": "wasmer",
"versions": [
{
"version": "3.12.0",
"piritaManifest": "{\"package\": {\"wapm\": {\"name\": \"wasmer/python\", \"version\": \"3.12.0\", \"description\": \"Python\"}}}",
"distribution": {
"piritaDownloadUrl": "https://wasmer.io/wasmer/[email protected]",
"piritaSha256Hash": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
}
},
]
},
"info": {
"defaultFrontend": "https://wasmer.io/",
},
}
}
}).unwrap();
let body = serde_json::json! {
{
"data": {
"getPackage": {
"packageName": "python",
"namespace": "wasmer",
"versions": [
{
"version": "4.0.0",
"piritaManifest": "{\"package\": {\"wapm\": {\"name\": \"wasmer/python\", \"version\": \"4.0.0\", \"description\": \"Python\"}}}",
"distribution": {
"piritaDownloadUrl": "https://wasmer.io/wasmer/[email protected]",
"piritaSha256Hash": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
}
},
{
"version": "3.12.0",
"piritaManifest": "{\"package\": {\"wapm\": {\"name\": \"wasmer/python\", \"version\": \"3.12.0\", \"description\": \"Python\"}}}",
"distribution": {
"piritaDownloadUrl": "https://wasmer.io/wasmer/[email protected]",
"piritaSha256Hash": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
}
},
]
},
"info": {
"defaultFrontend": "https://wasmer.io/",
},
}
}
};
let response = HttpResponse {
body: Some(serde_json::to_vec(&body).unwrap()),
redirected: false,
status: StatusCode::OK,
headers: HeaderMap::new(),
};
let client = Arc::new(DummyClient::new(vec![response]));
let registry_endpoint = WapmSource::WASMER_PROD_ENDPOINT.parse().unwrap();
let request = PackageSpecifier::Registry {
full_name: "wasmer/python".to_string(),
version: "4.0.0".parse().unwrap(),
};
let temp = tempfile::tempdir().unwrap();
let source = WapmSource::new(registry_endpoint, client.clone())
.with_local_cache(temp.path(), Duration::from_secs(0));
source
.cache
.as_ref()
.unwrap()
.update("wasmer/python", &cached_value)
.unwrap();

let summaries = source.query(&request).await.unwrap();

assert_eq!(summaries.len(), 1);
assert_eq!(summaries[0].pkg.version.to_string(), "4.0.0");
}
}

0 comments on commit 5563c1c

Please sign in to comment.