From e4e3a511a7d241806c288ee5d737717b8c339aa0 Mon Sep 17 00:00:00 2001 From: nichmor Date: Tue, 11 Feb 2025 18:34:32 +0200 Subject: [PATCH 01/23] feat: add archive cache --- crates/rattler_cache/Cargo.toml | 4 +- .../src/archive_cache/cache_key.rs | 64 +++ .../src/archive_cache/download.rs | 64 +++ crates/rattler_cache/src/archive_cache/mod.rs | 515 ++++++++++++++++++ crates/rattler_cache/src/lib.rs | 1 + .../tests/integration_test.rs | 6 +- crates/simple_spawn_blocking/src/lib.rs | 2 +- 7 files changed, 650 insertions(+), 6 deletions(-) create mode 100644 crates/rattler_cache/src/archive_cache/cache_key.rs create mode 100644 crates/rattler_cache/src/archive_cache/download.rs create mode 100644 crates/rattler_cache/src/archive_cache/mod.rs diff --git a/crates/rattler_cache/Cargo.toml b/crates/rattler_cache/Cargo.toml index 5352798682..1cf14197e7 100644 --- a/crates/rattler_cache/Cargo.toml +++ b/crates/rattler_cache/Cargo.toml @@ -23,7 +23,9 @@ rattler_digest = { version = "1.0.5", path = "../rattler_digest", default-featur rattler_networking = { version = "0.22.1", path = "../rattler_networking", default-features = false } rattler_package_streaming = { version = "0.22.25", path = "../rattler_package_streaming", default-features = false, features = ["reqwest"] } reqwest.workspace = true +tempfile.workspace = true tokio = { workspace = true, features = ["macros"] } +tokio-util = { workspace = true, features = ["io-util"] } tracing.workspace = true url.workspace = true thiserror.workspace = true @@ -39,7 +41,7 @@ axum.workspace = true bytes.workspace = true futures.workspace = true rstest.workspace = true -tempfile.workspace = true +# tempfile.workspace = true tokio-stream.workspace = true tower-http = { workspace = true, features = ["fs"] } tools = { path = "../tools" } diff --git a/crates/rattler_cache/src/archive_cache/cache_key.rs b/crates/rattler_cache/src/archive_cache/cache_key.rs new file mode 100644 index 0000000000..e953ef84e7 --- /dev/null +++ b/crates/rattler_cache/src/archive_cache/cache_key.rs @@ -0,0 +1,64 @@ +use rattler_conda_types::PackageRecord; +use rattler_digest::Sha256Hash; +use std::fmt::{Display, Formatter}; + +/// Provides a unique identifier for packages in the cache. +/// TODO: This could not be unique over multiple subdir. How to handle? +#[derive(Debug, Hash, Clone, Eq, PartialEq)] +pub struct CacheKey { + pub(crate) name: String, + pub(crate) version: String, + pub(crate) build_string: String, + pub(crate) sha256: Option, +} + +impl CacheKey { + /// Adds a sha256 hash of the archive. + pub fn with_sha256(mut self, sha256: Sha256Hash) -> Self { + self.sha256 = Some(sha256); + self + } + + /// Potentially adds a sha256 hash of the archive. + pub fn with_opt_sha256(mut self, sha256: Option) -> Self { + self.sha256 = sha256; + self + } +} + +impl CacheKey { + /// Return the sha256 hash of the package if it is known. + pub fn sha256(&self) -> Option { + self.sha256 + } + + pub fn sha256_str(&self) -> String { + self.sha256() + .map(|hash| format!("{hash:x}")) + .unwrap_or_default() + } +} + +impl From<&PackageRecord> for CacheKey { + fn from(record: &PackageRecord) -> Self { + Self { + name: record.name.as_normalized().to_string(), + version: record.version.to_string(), + build_string: record.build.clone(), + sha256: record.sha256, + } + } +} + +impl Display for CacheKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}-{}-{}-{}", + &self.name, + &self.version, + &self.build_string, + self.sha256_str() + ) + } +} diff --git a/crates/rattler_cache/src/archive_cache/download.rs b/crates/rattler_cache/src/archive_cache/download.rs new file mode 100644 index 0000000000..16cfb314fd --- /dev/null +++ b/crates/rattler_cache/src/archive_cache/download.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; + +use ::tokio::io::AsyncWriteExt; +use fs_err::tokio; +use futures::StreamExt; +use rattler_package_streaming::DownloadReporter; +use tempfile::NamedTempFile; +// use tokio_stream::StreamExt; +use url::Url; + +/// Download the contents of the archive from the specified remote location +/// and store it in a temporary file. +pub(crate) async fn download( + client: reqwest_middleware::ClientWithMiddleware, + url: Url, + reporter: Option>, +) -> Result { + let temp_file = NamedTempFile::new()?; + + // Send the request for the file + let response = client.get(url.clone()).send().await?.error_for_status()?; + + if let Some(reporter) = &reporter { + reporter.on_download_start(); + } + + let total_bytes = response.content_length(); + // Convert the named temp file into a tokio file + let mut file = tokio::File::from_std(fs_err::File::from_parts( + temp_file.as_file().try_clone().unwrap(), + temp_file.path(), + )); + + let mut stream = response.bytes_stream(); + + let mut bytes_received = 0; + while let Some(chunk_result) = stream.next().await { + let chunk = chunk_result?; + + if let Some(reporter) = &reporter { + bytes_received += chunk.len() as u64; + reporter.on_download_progress(bytes_received, total_bytes); + } + file.write_all(&chunk).await?; + } + + file.flush().await?; + + Ok(temp_file) +} + +/// An error that can occur when downloading an archive. +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum DownloadError { + #[error("an io error occurred: {0}")] + Io(#[from] std::io::Error), + + #[error(transparent)] + ReqwestMiddleware(#[from] ::reqwest_middleware::Error), + + #[error(transparent)] + Reqwest(#[from] ::reqwest::Error), +} diff --git a/crates/rattler_cache/src/archive_cache/mod.rs b/crates/rattler_cache/src/archive_cache/mod.rs new file mode 100644 index 0000000000..9294863f50 --- /dev/null +++ b/crates/rattler_cache/src/archive_cache/mod.rs @@ -0,0 +1,515 @@ +//! This module provides functionality to cache extracted Conda packages. See +//! [`PackageCache`]. + +use std::{ + fmt::Debug, + future::Future, + path::PathBuf, + sync::Arc, + time::{Duration, SystemTime}, +}; + +use dashmap::DashMap; +use download::DownloadError; +use fs_err::tokio as tokio_fs; +use parking_lot::Mutex; +use rattler_networking::retry_policies::{DoNotRetryPolicy, RetryDecision, RetryPolicy}; +use rattler_package_streaming::DownloadReporter; +use tempfile::NamedTempFile; +use tracing::instrument; +use url::Url; + +mod cache_key; +mod download; + +use cache_key::CacheKey; + +use crate::package_cache::{CacheReporter, PackageCacheError}; + +/// A [`ArchiveCache`] manages a cache of Conda packages on disk. +/// +/// The store does not provide an implementation to get the data into the store. +/// Instead, this is left up to the user when the package is requested. If the +/// package is found in the cache it is returned immediately. However, if the +/// cache is missing a user defined function is called to populate the cache. This +/// separates the corners between caching and fetching of the content. +#[derive(Clone)] +pub struct ArchiveCache { + inner: Arc, +} + +#[derive(Default)] +struct ArchiveCacheInner { + path: PathBuf, + packages: DashMap>>, +} + +/// A key that defines the actual location of the package in the cache. +#[derive(Debug, Hash, Clone, Eq, PartialEq)] +pub struct BucketKey { + name: String, + version: String, + build_string: String, + sha256_string: String, +} + +impl From for BucketKey { + fn from(key: CacheKey) -> Self { + Self { + name: key.name.clone(), + version: key.version.clone(), + build_string: key.build_string.clone(), + sha256_string: key.sha256_str(), + } + } +} + +impl ArchiveCache { + /// Constructs a new [`ArchiveCache`] located at the specified path. + pub fn new(path: impl Into) -> Self { + Self { + inner: Arc::new(ArchiveCacheInner { + path: path.into(), + packages: DashMap::default(), + }), + } + } + + /// Returns the directory that contains the specified package. + /// + /// If the package was previously successfully fetched and stored in the + /// cache the directory containing the data is returned immediately. If + /// the package was not previously fetch the filesystem is checked to + /// see if a directory with valid package content exists. Otherwise, the + /// user provided `fetch` function is called to populate the cache. + /// + /// If the package is already being fetched by another task/thread the + /// request is coalesced. No duplicate fetch is performed. + pub async fn get_or_fetch( + &self, + pkg: impl Into, + fetch: F, + ) -> Result + where + F: (Fn() -> Fut) + Send + 'static, + Fut: Future> + Send + 'static, + E: std::error::Error + Send + Sync + 'static, + { + let cache_key = pkg.into(); + let cache_path = self.inner.path.join(cache_key.to_string()); + let cache_entry = self + .inner + .packages + .entry(cache_key.clone().into()) + .or_default() + .clone(); + + // Acquire the entry. From this point on we can be sure that only one task is + // accessing the cache entry. + let _ = cache_entry.lock().await; + + // Check if the cache entry is already stored in the cache. + if cache_path.exists() { + return Ok(cache_path); + } + + // Otherwise, defer to populate method to fill our cache. + let temp_file = fetch() + .await + .map_err(|e| PackageCacheError::FetchError(Arc::new(e)))?; + + if let Some(parent_dir) = cache_path.parent() { + if !parent_dir.exists() { + tokio_fs::create_dir_all(parent_dir).await.unwrap(); + } + } + + temp_file.persist(&cache_path).unwrap(); + + Ok(cache_path) + } + + /// Returns the directory that contains the specified package. + /// + /// This is a convenience wrapper around `get_or_fetch` which fetches the + /// package from the given URL if the package could not be found in the + /// cache. + pub async fn get_or_fetch_from_url( + &self, + pkg: impl Into, + url: Url, + client: reqwest_middleware::ClientWithMiddleware, + reporter: Option>, + ) -> Result { + self.get_or_fetch_from_url_with_retry(pkg, url, client, DoNotRetryPolicy, reporter) + .await + } + + /// Returns the directory that contains the specified package. + /// + /// This is a convenience wrapper around `get_or_fetch` which fetches the + /// package from the given URL if the package could not be found in the + /// cache. + /// + /// This function assumes that the `client` is already configured with a + /// retry middleware that will retry any request that fails. This function + /// uses the passed in `retry_policy` if, after the request has been sent + /// and the response is successful, streaming of the package data fails + /// and the whole request must be retried. + #[instrument(skip_all, fields(url=%url))] + pub async fn get_or_fetch_from_url_with_retry( + &self, + pkg: impl Into, + url: Url, + client: reqwest_middleware::ClientWithMiddleware, + retry_policy: impl RetryPolicy + Send + 'static + Clone, + reporter: Option>, + ) -> Result { + let request_start = SystemTime::now(); + // Convert into cache key + let cache_key = pkg.into(); + let download_reporter = reporter.clone(); + // Get or fetch the package, using the specified fetch function + self.get_or_fetch(cache_key, move || { + let url = url.clone(); + let client = client.clone(); + let retry_policy = retry_policy.clone(); + let download_reporter = download_reporter.clone(); + async move { + let mut current_try = 0; + // Retry until the retry policy says to stop + loop { + current_try += 1; + tracing::debug!("downloading {}", &url); + // Extract the package + let result = crate::archive_cache::download::download( + client.clone(), + url.clone(), + // &temp_file, + download_reporter.clone().map(|reporter| Arc::new(PassthroughReporter { + reporter, + index: Mutex::new(None), + }) as Arc::), + ) + .await; + + // Extract any potential error + let err = match result { + Ok(result) => return Ok(result), + Err(err) => err, + }; + + // Only retry on io errors. We assume that the user has + // middleware installed that handles connection retries. + if !matches!(&err,DownloadError::Io(_)) { + return Err(err); + } + + // Determine whether to retry based on the retry policy + let execute_after = match retry_policy.should_retry(request_start, current_try) { + RetryDecision::Retry { execute_after } => execute_after, + RetryDecision::DoNotRetry => return Err(err), + }; + let duration = execute_after.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO); + + // Wait for a second to let the remote service restore itself. This increases the + // chance of success. + tracing::warn!( + "failed to download and extract {} {}. Retry #{}, Sleeping {:?} until the next attempt...", + &url, + // destination.display(), + err, + current_try, + duration + ); + tokio::time::sleep(duration).await; + } + } + }) + .await + } +} + +struct PassthroughReporter { + reporter: Arc, + index: Mutex>, +} + +impl DownloadReporter for PassthroughReporter { + fn on_download_start(&self) { + let index = self.reporter.on_download_start(); + assert!( + self.index.lock().replace(index).is_none(), + "on_download_start was called multiple times" + ); + } + + fn on_download_progress(&self, bytes_downloaded: u64, total_bytes: Option) { + let index = self.index.lock().expect("on_download_start was not called"); + self.reporter + .on_download_progress(index, bytes_downloaded, total_bytes); + } + + fn on_download_complete(&self) { + let index = self + .index + .lock() + .take() + .expect("on_download_start was not called"); + self.reporter.on_download_completed(index); + } +} + +#[cfg(test)] +mod test { + use std::{future::IntoFuture, net::SocketAddr, str::FromStr, sync::Arc}; + + use assert_matches::assert_matches; + use axum::{ + body::Body, + extract::State, + http::{Request, StatusCode}, + middleware, + middleware::Next, + response::{Redirect, Response}, + routing::get, + Router, + }; + + use rattler_conda_types::{PackageName, PackageRecord, Version}; + use rattler_digest::{parse_digest_from_hex, Sha256}; + use rattler_networking::retry_policies::{DoNotRetryPolicy, ExponentialBackoffBuilder}; + use reqwest::Client; + use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; + use reqwest_retry::RetryTransientMiddleware; + use tempfile::tempdir; + use tokio::sync::Mutex; + + use url::Url; + + use super::ArchiveCache; + + #[tokio::test] + pub async fn test_package_cache() { + let package_url = Url::parse("https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2").unwrap(); + + let cache_dir = tempdir().unwrap().into_path(); + + let cache = ArchiveCache::new(&cache_dir); + + let mut pkg_record = PackageRecord::new( + PackageName::from_str("ros-noetic-rosbridge-suite").unwrap(), + Version::from_str("0.11.14").unwrap(), + "py39h6fdeb60_14".to_string(), + ); + pkg_record.sha256 = Some( + parse_digest_from_hex::( + "4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8", + ) + .unwrap(), + ); + + // Get the package to the cache + let cache_path = cache + .get_or_fetch_from_url( + &pkg_record, + package_url.clone(), + ClientWithMiddleware::from(Client::new()), + None, + ) + .await + .unwrap(); + + assert!(cache_path.exists()); + } + + /// A helper middleware function that fails the first two requests. + async fn fail_the_first_two_requests( + State(count): State>>, + req: Request, + next: Next, + ) -> Result { + let count = { + let mut count = count.lock().await; + *count += 1; + *count + }; + + println!("Running middleware for request #{count} for {}", req.uri()); + if count <= 2 { + println!("Discarding request!"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + + // requires the http crate to get the header name + Ok(next.run(req).await) + } + + enum Middleware { + FailTheFirstTwoRequests, + } + + async fn redirect_to_prefix( + axum::extract::Path((channel, subdir, file)): axum::extract::Path<(String, String, String)>, + ) -> Redirect { + Redirect::permanent(&format!("https://prefix.dev/{channel}/{subdir}/{file}")) + } + + async fn test_flaky_package_cache( + archive_name: &str, + package_record: &PackageRecord, + middleware: Middleware, + ) { + // Construct a service that serves raw files from the test directory + // build our application with a route + let router = Router::new() + // `GET /` goes to `root` + .route("/{channel}/{subdir}/{file}", get(redirect_to_prefix)); + + // Construct a router that returns data from the static dir but fails the first + // try. + let request_count = Arc::new(Mutex::new(0)); + + let router = match middleware { + Middleware::FailTheFirstTwoRequests => router.layer(middleware::from_fn_with_state( + request_count.clone(), + fail_the_first_two_requests, + )), + }; + + // Construct the server that will listen on localhost but with a *random port*. + // The random port is very important because it enables creating + // multiple instances at the same time. We need this to be able to run + // tests in parallel. + let addr = SocketAddr::new([127, 0, 0, 1].into(), 0); + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let service = router.into_make_service(); + tokio::spawn(axum::serve(listener, service).into_future()); + + let packages_dir = tempdir().unwrap(); + let cache = ArchiveCache::new(packages_dir.path()); + + let server_url = Url::parse(&format!("http://localhost:{}", addr.port())).unwrap(); + + let client = ClientBuilder::new(Client::default()).build(); + + // Do the first request without + let result = cache + .get_or_fetch_from_url_with_retry( + package_record, + server_url.join(archive_name).unwrap(), + client.clone(), + DoNotRetryPolicy, + None, + ) + .await; + + // First request without retry policy should fail + assert_matches!(result, Err(_)); + { + let request_count_lock = request_count.lock().await; + assert_eq!(*request_count_lock, 1, "Expected there to be 1 request"); + } + + let retry_policy = ExponentialBackoffBuilder::default().build_with_max_retries(3); + let client = ClientBuilder::from_client(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build(); + + // The second one should fail after the 2nd try + let result = cache + .get_or_fetch_from_url_with_retry( + package_record, + server_url.join(archive_name).unwrap(), + client, + retry_policy, + None, + ) + .await; + + assert!(result.is_ok()); + { + let request_count_lock = request_count.lock().await; + assert_eq!(*request_count_lock, 3, "Expected there to be 3 requests"); + } + } + + #[tokio::test] + async fn test_flaky() { + let tar_bz2 = "conda-forge/win-64/conda-22.9.0-py310h5588dad_2.tar.bz2"; + let conda = "conda-forge/win-64/conda-22.11.1-py38haa244fe_1.conda"; + + let tar_record = PackageRecord::new( + PackageName::from_str("conda").unwrap(), + Version::from_str("22.9.0").unwrap(), + "py310h5588dad_2".to_string(), + ); + + let conda_record = PackageRecord::new( + PackageName::from_str("conda").unwrap(), + Version::from_str("22.11.1").unwrap(), + "py38haa244fe_1".to_string(), + ); + + test_flaky_package_cache(tar_bz2, &tar_record, Middleware::FailTheFirstTwoRequests).await; + test_flaky_package_cache(conda, &conda_record, Middleware::FailTheFirstTwoRequests).await; + } + + #[tokio::test] + // Test if packages with different sha's are replaced even though they share the + // same BucketKey. + pub async fn test_package_cache_key_with_sha() { + let package_url = Url::parse("https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2").unwrap(); + + let mut pkg_record = PackageRecord::new( + PackageName::from_str("ros-noetic-rosbridge-suite").unwrap(), + Version::from_str("0.11.14").unwrap(), + "py39h6fdeb60_14".to_string(), + ); + pkg_record.sha256 = Some( + parse_digest_from_hex::( + "4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8", + ) + .unwrap(), + ); + + // Create a temporary directory to store the packages + let packages_dir = tempdir().unwrap(); + let cache = ArchiveCache::new(packages_dir.path()); + + // Get the package to the cache + let first_cache_path = cache + .get_or_fetch_from_url( + &pkg_record, + package_url.clone(), + ClientWithMiddleware::from(Client::new()), + None, + ) + .await + .unwrap(); + + // Change the sha256 of the package + // And expect the package to be replaced + let new_sha = parse_digest_from_hex::( + "5dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc9", + ) + .unwrap(); + pkg_record.sha256 = Some(new_sha); + + // Get the package again + // and verify that the package was replaced + let second_package_cache = cache + .get_or_fetch_from_url( + &pkg_record, + package_url.clone(), + ClientWithMiddleware::from(Client::new()), + None, + ) + .await + .unwrap(); + + assert_ne!(first_cache_path, second_package_cache); + } +} diff --git a/crates/rattler_cache/src/lib.rs b/crates/rattler_cache/src/lib.rs index 753b867a10..1be3b7210e 100644 --- a/crates/rattler_cache/src/lib.rs +++ b/crates/rattler_cache/src/lib.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; +pub mod archive_cache; pub mod package_cache; pub mod validation; diff --git a/crates/rattler_networking/tests/integration_test.rs b/crates/rattler_networking/tests/integration_test.rs index f6305cbcf3..464439e2bb 100644 --- a/crates/rattler_networking/tests/integration_test.rs +++ b/crates/rattler_networking/tests/integration_test.rs @@ -119,8 +119,7 @@ async fn test_r2_download_repodata(r2_host: String, r2_credentials: Option<(Stri let body = result.text().await.unwrap(); assert!( body.contains("my-webserver-0.1.0-pyh4616a5c_0.conda"), - "body does not contain package: {}", - body + "body does not contain package: {body}" ); } @@ -159,7 +158,6 @@ async fn test_r2_download_repodata_aws_profile(aws_config: Option<(TempDir, std: let body = result.text().await.unwrap(); assert!( body.contains("my-webserver-0.1.0-pyh4616a5c_0.conda"), - "body does not contain package: {}", - body + "body does not contain package: {body}" ); } diff --git a/crates/simple_spawn_blocking/src/lib.rs b/crates/simple_spawn_blocking/src/lib.rs index d82f4737fe..753ed5c408 100644 --- a/crates/simple_spawn_blocking/src/lib.rs +++ b/crates/simple_spawn_blocking/src/lib.rs @@ -1,4 +1,4 @@ -//! A simpel crate that makes it more ergonomic to spawn blocking tasks and +//! A simple crate that makes it more ergonomic to spawn blocking tasks and //! await their completion. #[cfg(feature = "tokio")] From c00c711ae07e5ea7a3a4ff06ce6e847b046a29a1 Mon Sep 17 00:00:00 2001 From: nichmor Date: Tue, 11 Feb 2025 18:39:56 +0200 Subject: [PATCH 02/23] misc: change cargo.toml --- crates/rattler_cache/Cargo.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/rattler_cache/Cargo.toml b/crates/rattler_cache/Cargo.toml index 1cf14197e7..4a1012b344 100644 --- a/crates/rattler_cache/Cargo.toml +++ b/crates/rattler_cache/Cargo.toml @@ -25,7 +25,6 @@ rattler_package_streaming = { version = "0.22.25", path = "../rattler_package_st reqwest.workspace = true tempfile.workspace = true tokio = { workspace = true, features = ["macros"] } -tokio-util = { workspace = true, features = ["io-util"] } tracing.workspace = true url.workspace = true thiserror.workspace = true @@ -41,7 +40,6 @@ axum.workspace = true bytes.workspace = true futures.workspace = true rstest.workspace = true -# tempfile.workspace = true tokio-stream.workspace = true tower-http = { workspace = true, features = ["fs"] } tools = { path = "../tools" } From 8f667d495baa8fd3d7d8c8638749f064f052f6ef Mon Sep 17 00:00:00 2001 From: nichmor Date: Tue, 11 Feb 2025 18:44:16 +0200 Subject: [PATCH 03/23] misc: change to archivecacheerror --- crates/rattler_cache/src/archive_cache/mod.rs | 41 +++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/crates/rattler_cache/src/archive_cache/mod.rs b/crates/rattler_cache/src/archive_cache/mod.rs index 9294863f50..67c26cd6b4 100644 --- a/crates/rattler_cache/src/archive_cache/mod.rs +++ b/crates/rattler_cache/src/archive_cache/mod.rs @@ -15,7 +15,7 @@ use fs_err::tokio as tokio_fs; use parking_lot::Mutex; use rattler_networking::retry_policies::{DoNotRetryPolicy, RetryDecision, RetryPolicy}; use rattler_package_streaming::DownloadReporter; -use tempfile::NamedTempFile; +use tempfile::{NamedTempFile, PersistError}; use tracing::instrument; use url::Url; @@ -24,7 +24,7 @@ mod download; use cache_key::CacheKey; -use crate::package_cache::{CacheReporter, PackageCacheError}; +use crate::package_cache::CacheReporter; /// A [`ArchiveCache`] manages a cache of Conda packages on disk. /// @@ -89,7 +89,7 @@ impl ArchiveCache { &self, pkg: impl Into, fetch: F, - ) -> Result + ) -> Result where F: (Fn() -> Fut) + Send + 'static, Fut: Future> + Send + 'static, @@ -116,15 +116,15 @@ impl ArchiveCache { // Otherwise, defer to populate method to fill our cache. let temp_file = fetch() .await - .map_err(|e| PackageCacheError::FetchError(Arc::new(e)))?; + .map_err(|e| ArchiveCacheError::Fetch(Arc::new(e)))?; if let Some(parent_dir) = cache_path.parent() { if !parent_dir.exists() { - tokio_fs::create_dir_all(parent_dir).await.unwrap(); + tokio_fs::create_dir_all(parent_dir).await?; } } - temp_file.persist(&cache_path).unwrap(); + temp_file.persist(&cache_path)?; Ok(cache_path) } @@ -140,7 +140,7 @@ impl ArchiveCache { url: Url, client: reqwest_middleware::ClientWithMiddleware, reporter: Option>, - ) -> Result { + ) -> Result { self.get_or_fetch_from_url_with_retry(pkg, url, client, DoNotRetryPolicy, reporter) .await } @@ -164,7 +164,7 @@ impl ArchiveCache { client: reqwest_middleware::ClientWithMiddleware, retry_policy: impl RetryPolicy + Send + 'static + Clone, reporter: Option>, - ) -> Result { + ) -> Result { let request_start = SystemTime::now(); // Convert into cache key let cache_key = pkg.into(); @@ -230,6 +230,31 @@ impl ArchiveCache { } } +/// An error that might be returned from one of the caching function of the +/// [`PackageCache`]. +#[derive(Debug, thiserror::Error)] +pub enum ArchiveCacheError { + /// An error occurred while fetching the package. + #[error(transparent)] + Fetch(#[from] Arc), + + /// A locking error occurred + #[error("{0}")] + Lock(String, #[source] std::io::Error), + + /// An IO error occurred + #[error("{0}")] + Io(#[from] std::io::Error), + + /// An error occurred while persisting the temp file + #[error("{0}")] + Persist(#[from] PersistError), + + /// The operation was cancelled + #[error("operation was cancelled")] + Cancelled, +} + struct PassthroughReporter { reporter: Arc, index: Mutex>, From e28ca216450bcdc26d5dc9870e73f721cf97d1a6 Mon Sep 17 00:00:00 2001 From: nichmor Date: Tue, 11 Feb 2025 18:48:48 +0200 Subject: [PATCH 04/23] misc: remove ugly unwrap --- crates/rattler_cache/src/archive_cache/download.rs | 2 +- crates/rattler_repodata_gateway/src/fetch/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/rattler_cache/src/archive_cache/download.rs b/crates/rattler_cache/src/archive_cache/download.rs index 16cfb314fd..511dc126c9 100644 --- a/crates/rattler_cache/src/archive_cache/download.rs +++ b/crates/rattler_cache/src/archive_cache/download.rs @@ -27,7 +27,7 @@ pub(crate) async fn download( let total_bytes = response.content_length(); // Convert the named temp file into a tokio file let mut file = tokio::File::from_std(fs_err::File::from_parts( - temp_file.as_file().try_clone().unwrap(), + temp_file.as_file().try_clone()?, temp_file.path(), )); diff --git a/crates/rattler_repodata_gateway/src/fetch/mod.rs b/crates/rattler_repodata_gateway/src/fetch/mod.rs index 7e09585f94..5a3986f0ff 100644 --- a/crates/rattler_repodata_gateway/src/fetch/mod.rs +++ b/crates/rattler_repodata_gateway/src/fetch/mod.rs @@ -725,7 +725,7 @@ async fn stream_and_decode_to_file( // Clone the file handle and create a hashing writer so we can compute a hash // while the content is being written to disk. let file = tokio_fs::File::from_std(fs_err::File::from_parts( - temp_file.as_file().try_clone().unwrap(), + temp_file.as_file().try_clone()?, temp_file.path(), )); let mut hashing_file_writer = HashingWriter::<_, Blake2b256>::new(file); From 87add40306450632d7bf15c266f53346615369af Mon Sep 17 00:00:00 2001 From: nichmor Date: Tue, 11 Feb 2025 19:14:20 +0200 Subject: [PATCH 05/23] misc: throw up the error --- crates/rattler_repodata_gateway/src/fetch/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/rattler_repodata_gateway/src/fetch/mod.rs b/crates/rattler_repodata_gateway/src/fetch/mod.rs index 5a3986f0ff..77c6dc73cd 100644 --- a/crates/rattler_repodata_gateway/src/fetch/mod.rs +++ b/crates/rattler_repodata_gateway/src/fetch/mod.rs @@ -725,7 +725,10 @@ async fn stream_and_decode_to_file( // Clone the file handle and create a hashing writer so we can compute a hash // while the content is being written to disk. let file = tokio_fs::File::from_std(fs_err::File::from_parts( - temp_file.as_file().try_clone()?, + temp_file + .as_file() + .try_clone() + .map_err(|err| FetchRepoDataError::IoError(err))?, temp_file.path(), )); let mut hashing_file_writer = HashingWriter::<_, Blake2b256>::new(file); From 8bd810e609f134c9aa2d9ea8e21a34290fda6e0a Mon Sep 17 00:00:00 2001 From: nichmor Date: Wed, 12 Feb 2025 09:57:52 +0200 Subject: [PATCH 06/23] misc: use ArchiveCache in doc links --- crates/rattler_cache/src/archive_cache/mod.rs | 4 ++-- crates/rattler_repodata_gateway/src/fetch/mod.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/rattler_cache/src/archive_cache/mod.rs b/crates/rattler_cache/src/archive_cache/mod.rs index 67c26cd6b4..447aae246b 100644 --- a/crates/rattler_cache/src/archive_cache/mod.rs +++ b/crates/rattler_cache/src/archive_cache/mod.rs @@ -1,5 +1,5 @@ //! This module provides functionality to cache extracted Conda packages. See -//! [`PackageCache`]. +//! [`ArchiveCache`]. use std::{ fmt::Debug, @@ -231,7 +231,7 @@ impl ArchiveCache { } /// An error that might be returned from one of the caching function of the -/// [`PackageCache`]. +/// [`ArchiveCache`]. #[derive(Debug, thiserror::Error)] pub enum ArchiveCacheError { /// An error occurred while fetching the package. diff --git a/crates/rattler_repodata_gateway/src/fetch/mod.rs b/crates/rattler_repodata_gateway/src/fetch/mod.rs index 77c6dc73cd..43bda98fa5 100644 --- a/crates/rattler_repodata_gateway/src/fetch/mod.rs +++ b/crates/rattler_repodata_gateway/src/fetch/mod.rs @@ -728,7 +728,7 @@ async fn stream_and_decode_to_file( temp_file .as_file() .try_clone() - .map_err(|err| FetchRepoDataError::IoError(err))?, + .map_err(FetchRepoDataError::IoError)?, temp_file.path(), )); let mut hashing_file_writer = HashingWriter::<_, Blake2b256>::new(file); From 9f9c384f7d06014988c359aefec94cb3df1d66f3 Mon Sep 17 00:00:00 2001 From: nichmor Date: Wed, 12 Feb 2025 10:33:25 +0200 Subject: [PATCH 07/23] misc: re-export CacheKey --- crates/rattler_cache/src/archive_cache/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rattler_cache/src/archive_cache/mod.rs b/crates/rattler_cache/src/archive_cache/mod.rs index 447aae246b..a3ec5b692d 100644 --- a/crates/rattler_cache/src/archive_cache/mod.rs +++ b/crates/rattler_cache/src/archive_cache/mod.rs @@ -22,7 +22,7 @@ use url::Url; mod cache_key; mod download; -use cache_key::CacheKey; +pub use cache_key::CacheKey; use crate::package_cache::CacheReporter; From 2cfb9cb814076e511396aeab8823a8c436c87242 Mon Sep 17 00:00:00 2001 From: nichmor Date: Wed, 12 Feb 2025 11:08:16 +0200 Subject: [PATCH 08/23] misc: add public exposed constant --- crates/rattler_cache/src/consts.rs | 1 + crates/rattler_cache/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/rattler_cache/src/consts.rs b/crates/rattler_cache/src/consts.rs index d635d262e1..20f97918ff 100644 --- a/crates/rattler_cache/src/consts.rs +++ b/crates/rattler_cache/src/consts.rs @@ -1,4 +1,5 @@ /// The location in the main cache folder where the conda package cache is stored. pub const PACKAGE_CACHE_DIR: &str = "pkgs"; +pub const ARCHIVE_CACHE_DIR: &str = "archives"; /// The location in the main cache folder where the repodata cache is stored. pub const REPODATA_CACHE_DIR: &str = "repodata"; diff --git a/crates/rattler_cache/src/lib.rs b/crates/rattler_cache/src/lib.rs index 1be3b7210e..7ff686a0a8 100644 --- a/crates/rattler_cache/src/lib.rs +++ b/crates/rattler_cache/src/lib.rs @@ -6,7 +6,7 @@ pub mod package_cache; pub mod validation; mod consts; -pub use consts::{PACKAGE_CACHE_DIR, REPODATA_CACHE_DIR}; +pub use consts::{PACKAGE_CACHE_DIR, REPODATA_CACHE_DIR, ARCHIVE_CACHE_DIR}; /// Determines the default cache directory for rattler. /// It first checks the environment variable `RATTLER_CACHE_DIR`. From 411688252458e5f82b85489503cd4142eaca5679 Mon Sep 17 00:00:00 2001 From: nichmor Date: Wed, 12 Feb 2025 12:48:52 +0200 Subject: [PATCH 09/23] misc: change cachekey to take filename --- .../src/archive_cache/cache_key.rs | 28 ++++++++---- crates/rattler_cache/src/archive_cache/mod.rs | 45 +++++++++++++------ crates/rattler_cache/src/lib.rs | 2 +- 3 files changed, 53 insertions(+), 22 deletions(-) diff --git a/crates/rattler_cache/src/archive_cache/cache_key.rs b/crates/rattler_cache/src/archive_cache/cache_key.rs index e953ef84e7..dde9d2ea9c 100644 --- a/crates/rattler_cache/src/archive_cache/cache_key.rs +++ b/crates/rattler_cache/src/archive_cache/cache_key.rs @@ -1,4 +1,4 @@ -use rattler_conda_types::PackageRecord; +use rattler_conda_types::{package::ArchiveIdentifier, PackageRecord}; use rattler_digest::Sha256Hash; use std::fmt::{Display, Formatter}; @@ -10,6 +10,7 @@ pub struct CacheKey { pub(crate) version: String, pub(crate) build_string: String, pub(crate) sha256: Option, + pub(crate) extension: String, } impl CacheKey { @@ -32,33 +33,44 @@ impl CacheKey { self.sha256 } + /// Return the sha256 hash string of the package if it is known. pub fn sha256_str(&self) -> String { self.sha256() .map(|hash| format!("{hash:x}")) .unwrap_or_default() } -} -impl From<&PackageRecord> for CacheKey { - fn from(record: &PackageRecord) -> Self { - Self { + /// Try to create a new cache key from a package record and a filename. + pub fn new(record: &PackageRecord, filename: &str) -> Result { + let archive_identifier = ArchiveIdentifier::try_from_filename(filename) + .ok_or_else(|| CacheKeyError::InvalidArchiveIdentifier(filename.to_string()))?; + + Ok(Self { name: record.name.as_normalized().to_string(), version: record.version.to_string(), build_string: record.build.clone(), sha256: record.sha256, - } + extension: archive_identifier.archive_type.extension().to_string(), + }) } } +#[derive(Debug, thiserror::Error)] +pub enum CacheKeyError { + #[error("Could not identify the archive type from the name: {0}")] + InvalidArchiveIdentifier(String), +} + impl Display for CacheKey { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{}-{}-{}-{}", + "{}-{}-{}-{}{}", &self.name, &self.version, &self.build_string, - self.sha256_str() + self.sha256_str(), + self.extension ) } } diff --git a/crates/rattler_cache/src/archive_cache/mod.rs b/crates/rattler_cache/src/archive_cache/mod.rs index a3ec5b692d..6b7ba2ba0a 100644 --- a/crates/rattler_cache/src/archive_cache/mod.rs +++ b/crates/rattler_cache/src/archive_cache/mod.rs @@ -22,7 +22,7 @@ use url::Url; mod cache_key; mod download; -pub use cache_key::CacheKey; +pub use cache_key::{CacheKey, CacheKeyError}; use crate::package_cache::CacheReporter; @@ -87,7 +87,7 @@ impl ArchiveCache { /// request is coalesced. No duplicate fetch is performed. pub async fn get_or_fetch( &self, - pkg: impl Into, + cache_key: &CacheKey, fetch: F, ) -> Result where @@ -95,7 +95,6 @@ impl ArchiveCache { Fut: Future> + Send + 'static, E: std::error::Error + Send + Sync + 'static, { - let cache_key = pkg.into(); let cache_path = self.inner.path.join(cache_key.to_string()); let cache_entry = self .inner @@ -136,12 +135,12 @@ impl ArchiveCache { /// cache. pub async fn get_or_fetch_from_url( &self, - pkg: impl Into, + cache_key: &CacheKey, url: Url, client: reqwest_middleware::ClientWithMiddleware, reporter: Option>, ) -> Result { - self.get_or_fetch_from_url_with_retry(pkg, url, client, DoNotRetryPolicy, reporter) + self.get_or_fetch_from_url_with_retry(cache_key, url, client, DoNotRetryPolicy, reporter) .await } @@ -159,7 +158,7 @@ impl ArchiveCache { #[instrument(skip_all, fields(url=%url))] pub async fn get_or_fetch_from_url_with_retry( &self, - pkg: impl Into, + cache_key: &CacheKey, url: Url, client: reqwest_middleware::ClientWithMiddleware, retry_policy: impl RetryPolicy + Send + 'static + Clone, @@ -167,7 +166,6 @@ impl ArchiveCache { ) -> Result { let request_start = SystemTime::now(); // Convert into cache key - let cache_key = pkg.into(); let download_reporter = reporter.clone(); // Get or fetch the package, using the specified fetch function self.get_or_fetch(cache_key, move || { @@ -185,7 +183,6 @@ impl ArchiveCache { let result = crate::archive_cache::download::download( client.clone(), url.clone(), - // &temp_file, download_reporter.clone().map(|reporter| Arc::new(PassthroughReporter { reporter, index: Mutex::new(None), @@ -312,6 +309,8 @@ mod test { use url::Url; + use crate::archive_cache::CacheKey; + use super::ArchiveCache; #[tokio::test] @@ -334,10 +333,16 @@ mod test { .unwrap(), ); + let cache_key = CacheKey::new( + &pkg_record, + "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", + ) + .unwrap(); + // Get the package to the cache let cache_path = cache .get_or_fetch_from_url( - &pkg_record, + &cache_key, package_url.clone(), ClientWithMiddleware::from(Client::new()), None, @@ -420,10 +425,12 @@ mod test { let client = ClientBuilder::new(Client::default()).build(); + let cache_key = CacheKey::new(package_record, archive_name).unwrap(); + // Do the first request without let result = cache .get_or_fetch_from_url_with_retry( - package_record, + &cache_key, server_url.join(archive_name).unwrap(), client.clone(), DoNotRetryPolicy, @@ -446,7 +453,7 @@ mod test { // The second one should fail after the 2nd try let result = cache .get_or_fetch_from_url_with_retry( - package_record, + &cache_key, server_url.join(archive_name).unwrap(), client, retry_policy, @@ -504,10 +511,16 @@ mod test { let packages_dir = tempdir().unwrap(); let cache = ArchiveCache::new(packages_dir.path()); + let cache_key = CacheKey::new( + &pkg_record, + "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", + ) + .unwrap(); + // Get the package to the cache let first_cache_path = cache .get_or_fetch_from_url( - &pkg_record, + &cache_key, package_url.clone(), ClientWithMiddleware::from(Client::new()), None, @@ -523,11 +536,17 @@ mod test { .unwrap(); pkg_record.sha256 = Some(new_sha); + let cache_key = CacheKey::new( + &pkg_record, + "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", + ) + .unwrap(); + // Get the package again // and verify that the package was replaced let second_package_cache = cache .get_or_fetch_from_url( - &pkg_record, + &cache_key, package_url.clone(), ClientWithMiddleware::from(Client::new()), None, diff --git a/crates/rattler_cache/src/lib.rs b/crates/rattler_cache/src/lib.rs index 7ff686a0a8..083e048de4 100644 --- a/crates/rattler_cache/src/lib.rs +++ b/crates/rattler_cache/src/lib.rs @@ -6,7 +6,7 @@ pub mod package_cache; pub mod validation; mod consts; -pub use consts::{PACKAGE_CACHE_DIR, REPODATA_CACHE_DIR, ARCHIVE_CACHE_DIR}; +pub use consts::{ARCHIVE_CACHE_DIR, PACKAGE_CACHE_DIR, REPODATA_CACHE_DIR}; /// Determines the default cache directory for rattler. /// It first checks the environment variable `RATTLER_CACHE_DIR`. From 2db3c161a395d640d9ed7a03ad7916872d2ba631 Mon Sep 17 00:00:00 2001 From: nichmor Date: Wed, 12 Feb 2025 18:10:40 +0200 Subject: [PATCH 10/23] misc: refactor to cache run_exports.json --- crates/rattler_cache/Cargo.toml | 2 + .../src/archive_cache/download.rs | 3 +- crates/rattler_cache/src/archive_cache/mod.rs | 179 ++++++++++++++---- 3 files changed, 146 insertions(+), 38 deletions(-) diff --git a/crates/rattler_cache/Cargo.toml b/crates/rattler_cache/Cargo.toml index c14534509d..12df6b7aaa 100644 --- a/crates/rattler_cache/Cargo.toml +++ b/crates/rattler_cache/Cargo.toml @@ -33,6 +33,7 @@ digest.workspace = true fs4 = { workspace = true, features = ["fs-err3-tokio", "tokio"] } simple_spawn_blocking = { version = "1.0.0", path = "../simple_spawn_blocking", features = ["tokio"] } rayon = { workspace = true } +serde_json = { workspace = true } [dev-dependencies] assert_matches.workspace = true @@ -44,3 +45,4 @@ tokio-stream.workspace = true tower-http = { workspace = true, features = ["fs"] } tools = { path = "../tools" } reqwest-retry = { workspace = true } +insta = { workspace = true, features = ["yaml"] } diff --git a/crates/rattler_cache/src/archive_cache/download.rs b/crates/rattler_cache/src/archive_cache/download.rs index 511dc126c9..90d6893a9a 100644 --- a/crates/rattler_cache/src/archive_cache/download.rs +++ b/crates/rattler_cache/src/archive_cache/download.rs @@ -13,9 +13,10 @@ use url::Url; pub(crate) async fn download( client: reqwest_middleware::ClientWithMiddleware, url: Url, + suffix: &str, reporter: Option>, ) -> Result { - let temp_file = NamedTempFile::new()?; + let temp_file = NamedTempFile::with_suffix(suffix)?; // Send the request for the file let response = client.get(url.clone()).send().await?.error_for_status()?; diff --git a/crates/rattler_cache/src/archive_cache/mod.rs b/crates/rattler_cache/src/archive_cache/mod.rs index 6b7ba2ba0a..cf6a7f5d52 100644 --- a/crates/rattler_cache/src/archive_cache/mod.rs +++ b/crates/rattler_cache/src/archive_cache/mod.rs @@ -4,7 +4,7 @@ use std::{ fmt::Debug, future::Future, - path::PathBuf, + path::{Path, PathBuf}, sync::Arc, time::{Duration, SystemTime}, }; @@ -13,9 +13,10 @@ use dashmap::DashMap; use download::DownloadError; use fs_err::tokio as tokio_fs; use parking_lot::Mutex; +use rattler_conda_types::package::RunExportsJson; use rattler_networking::retry_policies::{DoNotRetryPolicy, RetryDecision, RetryPolicy}; -use rattler_package_streaming::DownloadReporter; -use tempfile::{NamedTempFile, PersistError}; +use rattler_package_streaming::{DownloadReporter, ExtractError}; +use tempfile::PersistError; use tracing::instrument; use url::Url; @@ -26,22 +27,45 @@ pub use cache_key::{CacheKey, CacheKeyError}; use crate::package_cache::CacheReporter; -/// A [`ArchiveCache`] manages a cache of Conda packages on disk. +/// A [`RunExportsCache`] manages a cache of `run_exports.json` /// /// The store does not provide an implementation to get the data into the store. -/// Instead, this is left up to the user when the package is requested. If the -/// package is found in the cache it is returned immediately. However, if the +/// Instead, this is left up to the user when the `run_exports.json` is requested. If the +/// `run_exports.json` is found in the cache it is returned immediately. However, if the /// cache is missing a user defined function is called to populate the cache. This /// separates the corners between caching and fetching of the content. #[derive(Clone)] -pub struct ArchiveCache { - inner: Arc, +pub struct RunExportsCache { + inner: Arc, +} + +#[derive(Clone, Debug)] +pub struct CacheEntry { + pub(crate) run_exports: Arc>, + pub(crate) path: PathBuf, +} + +impl CacheEntry { + pub(crate) fn new(run_exports: Option, path: PathBuf) -> Self { + Self { + run_exports: Arc::new(run_exports), + path, + } + } + + pub fn run_exports(&self) -> Arc> { + self.run_exports.clone() + } + + pub fn path(&self) -> &Path { + &self.path + } } #[derive(Default)] -struct ArchiveCacheInner { +struct RunExportsCacheInner { path: PathBuf, - packages: DashMap>>, + run_exports: DashMap>>>, } /// A key that defines the actual location of the package in the cache. @@ -64,13 +88,13 @@ impl From for BucketKey { } } -impl ArchiveCache { +impl RunExportsCache { /// Constructs a new [`ArchiveCache`] located at the specified path. pub fn new(path: impl Into) -> Self { Self { - inner: Arc::new(ArchiveCacheInner { + inner: Arc::new(RunExportsCacheInner { path: path.into(), - packages: DashMap::default(), + run_exports: DashMap::default(), }), } } @@ -89,33 +113,33 @@ impl ArchiveCache { &self, cache_key: &CacheKey, fetch: F, - ) -> Result + ) -> Result where F: (Fn() -> Fut) + Send + 'static, - Fut: Future> + Send + 'static, + Fut: Future, E>> + Send + 'static, E: std::error::Error + Send + Sync + 'static, { let cache_path = self.inner.path.join(cache_key.to_string()); let cache_entry = self .inner - .packages + .run_exports .entry(cache_key.clone().into()) .or_default() .clone(); // Acquire the entry. From this point on we can be sure that only one task is // accessing the cache entry. - let _ = cache_entry.lock().await; + let mut entry = cache_entry.lock().await; // Check if the cache entry is already stored in the cache. - if cache_path.exists() { - return Ok(cache_path); + if let Some(run_exports) = entry.as_ref() { + return Ok(run_exports.clone()); } // Otherwise, defer to populate method to fill our cache. - let temp_file = fetch() + let run_exports = fetch() .await - .map_err(|e| ArchiveCacheError::Fetch(Arc::new(e)))?; + .map_err(|e| RunExportsCacheError::Fetch(Arc::new(e)))?; if let Some(parent_dir) = cache_path.parent() { if !parent_dir.exists() { @@ -123,9 +147,12 @@ impl ArchiveCache { } } - temp_file.persist(&cache_path)?; + tokio_fs::write(&cache_path, serde_json::to_string(&run_exports)?).await?; + let cache_entry = CacheEntry::new(run_exports, cache_path); - Ok(cache_path) + entry.replace(cache_entry.clone()); + + Ok(cache_entry) } /// Returns the directory that contains the specified package. @@ -139,7 +166,7 @@ impl ArchiveCache { url: Url, client: reqwest_middleware::ClientWithMiddleware, reporter: Option>, - ) -> Result { + ) -> Result { self.get_or_fetch_from_url_with_retry(cache_key, url, client, DoNotRetryPolicy, reporter) .await } @@ -163,16 +190,31 @@ impl ArchiveCache { client: reqwest_middleware::ClientWithMiddleware, retry_policy: impl RetryPolicy + Send + 'static + Clone, reporter: Option>, - ) -> Result { + ) -> Result { let request_start = SystemTime::now(); // Convert into cache key let download_reporter = reporter.clone(); + + let extension = cache_key.extension.clone(); // Get or fetch the package, using the specified fetch function self.get_or_fetch(cache_key, move || { + + #[derive(Debug, thiserror::Error)] + enum FetchError{ + #[error(transparent)] + Download(#[from] DownloadError), + + #[error(transparent)] + Extract(#[from] ExtractError), + + } + let url = url.clone(); let client = client.clone(); let retry_policy = retry_policy.clone(); let download_reporter = download_reporter.clone(); + let extension = extension.clone(); + async move { let mut current_try = 0; // Retry until the retry policy says to stop @@ -183,6 +225,7 @@ impl ArchiveCache { let result = crate::archive_cache::download::download( client.clone(), url.clone(), + &extension, download_reporter.clone().map(|reporter| Arc::new(PassthroughReporter { reporter, index: Mutex::new(None), @@ -190,15 +233,33 @@ impl ArchiveCache { ) .await; + // Extract any potential error let err = match result { - Ok(result) => return Ok(result), - Err(err) => err, + Ok(result) => { + // now extract run_exports.json from the archive without unpacking + let file = + rattler_package_streaming::seek::read_package_file::(result); + + match file { + Ok(run_exports) => { + return Ok(Some(run_exports)); + }, + Err(err) => { + if matches!(err, ExtractError::MissingComponent) { + return Ok(None); + } + return Err(FetchError::Extract(err)); + + } + } + }, + Err(err) => FetchError::Download(err), }; // Only retry on io errors. We assume that the user has // middleware installed that handles connection retries. - if !matches!(&err,DownloadError::Io(_)) { + if !matches!(&err, FetchError::Download(_)) { return Err(err); } @@ -230,7 +291,7 @@ impl ArchiveCache { /// An error that might be returned from one of the caching function of the /// [`ArchiveCache`]. #[derive(Debug, thiserror::Error)] -pub enum ArchiveCacheError { +pub enum RunExportsCacheError { /// An error occurred while fetching the package. #[error(transparent)] Fetch(#[from] Arc), @@ -247,6 +308,14 @@ pub enum ArchiveCacheError { #[error("{0}")] Persist(#[from] PersistError), + /// An error occured when extracting `run_exports` from archive + #[error(transparent)] + Extract(#[from] ExtractError), + + /// An error occured when serializing `run_exports` + #[error(transparent)] + Serialize(#[from] serde_json::Error), + /// The operation was cancelled #[error("operation was cancelled")] Cancelled, @@ -311,15 +380,17 @@ mod test { use crate::archive_cache::CacheKey; - use super::ArchiveCache; + use super::RunExportsCache; #[tokio::test] - pub async fn test_package_cache() { + pub async fn test_run_exports_cache_when_empty() { + // This archive does not contain a run_exports.json + // so we expect the cache to return None let package_url = Url::parse("https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2").unwrap(); let cache_dir = tempdir().unwrap().into_path(); - let cache = ArchiveCache::new(&cache_dir); + let cache = RunExportsCache::new(&cache_dir); let mut pkg_record = PackageRecord::new( PackageName::from_str("ros-noetic-rosbridge-suite").unwrap(), @@ -340,7 +411,41 @@ mod test { .unwrap(); // Get the package to the cache - let cache_path = cache + let cached_run_exports = cache + .get_or_fetch_from_url( + &cache_key, + package_url.clone(), + ClientWithMiddleware::from(Client::new()), + None, + ) + .await + .unwrap(); + + assert!(cached_run_exports.run_exports.is_none()); + } + + #[tokio::test] + pub async fn test_run_exports_cache_when_present() { + // This archive contains a run_exports.json + // so we expect the cache to return it + let package_url = + Url::parse("https://repo.prefix.dev/conda-forge/linux-64/zlib-1.3.1-hb9d3cd8_2.conda") + .unwrap(); + + let cache_dir = tempdir().unwrap().into_path(); + + let cache = RunExportsCache::new(&cache_dir); + + let pkg_record = PackageRecord::new( + PackageName::from_str("zlib").unwrap(), + Version::from_str("1.3.1").unwrap(), + "hb9d3cd8_2".to_string(), + ); + + let cache_key = CacheKey::new(&pkg_record, "zlib-1.3.1-hb9d3cd8_2.conda").unwrap(); + + // Get the package to the cache + let cached_run_exports = cache .get_or_fetch_from_url( &cache_key, package_url.clone(), @@ -350,7 +455,7 @@ mod test { .await .unwrap(); - assert!(cache_path.exists()); + assert!(cached_run_exports.run_exports.is_some()); } /// A helper middleware function that fails the first two requests. @@ -419,7 +524,7 @@ mod test { tokio::spawn(axum::serve(listener, service).into_future()); let packages_dir = tempdir().unwrap(); - let cache = ArchiveCache::new(packages_dir.path()); + let cache = RunExportsCache::new(packages_dir.path()); let server_url = Url::parse(&format!("http://localhost:{}", addr.port())).unwrap(); @@ -509,7 +614,7 @@ mod test { // Create a temporary directory to store the packages let packages_dir = tempdir().unwrap(); - let cache = ArchiveCache::new(packages_dir.path()); + let cache = RunExportsCache::new(packages_dir.path()); let cache_key = CacheKey::new( &pkg_record, @@ -554,6 +659,6 @@ mod test { .await .unwrap(); - assert_ne!(first_cache_path, second_package_cache); + assert_ne!(first_cache_path.path(), second_package_cache.path()); } } From 69054a747a799ef990428861659083e6f6cf113a Mon Sep 17 00:00:00 2001 From: nichmor Date: Wed, 12 Feb 2025 18:11:29 +0200 Subject: [PATCH 11/23] misc: remove insta --- crates/rattler_cache/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/rattler_cache/Cargo.toml b/crates/rattler_cache/Cargo.toml index 12df6b7aaa..b725a36536 100644 --- a/crates/rattler_cache/Cargo.toml +++ b/crates/rattler_cache/Cargo.toml @@ -45,4 +45,3 @@ tokio-stream.workspace = true tower-http = { workspace = true, features = ["fs"] } tools = { path = "../tools" } reqwest-retry = { workspace = true } -insta = { workspace = true, features = ["yaml"] } From 15c5881bcb89cf4dd1631317250bbf9bd2088ca3 Mon Sep 17 00:00:00 2001 From: nichmor Date: Wed, 12 Feb 2025 18:12:57 +0200 Subject: [PATCH 12/23] misc: add some docstrings --- crates/rattler_cache/src/archive_cache/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/rattler_cache/src/archive_cache/mod.rs b/crates/rattler_cache/src/archive_cache/mod.rs index cf6a7f5d52..b8c3cfe991 100644 --- a/crates/rattler_cache/src/archive_cache/mod.rs +++ b/crates/rattler_cache/src/archive_cache/mod.rs @@ -39,13 +39,17 @@ pub struct RunExportsCache { inner: Arc, } +/// A cache entry that contains the path to the package and the `run_exports.json` #[derive(Clone, Debug)] pub struct CacheEntry { + /// The `run_exports.json` of the package. pub(crate) run_exports: Arc>, + /// The path to the file on disk. pub(crate) path: PathBuf, } impl CacheEntry { + /// Create a new cache entry. pub(crate) fn new(run_exports: Option, path: PathBuf) -> Self { Self { run_exports: Arc::new(run_exports), @@ -53,10 +57,12 @@ impl CacheEntry { } } + /// Returns the `run_exports.json` of the package. pub fn run_exports(&self) -> Arc> { self.run_exports.clone() } + /// Returns the path to the file on disk. pub fn path(&self) -> &Path { &self.path } From 9dd1cf04c8f18a91f32db285af207d3a9b2ac231 Mon Sep 17 00:00:00 2001 From: nichmor Date: Wed, 12 Feb 2025 18:19:15 +0200 Subject: [PATCH 13/23] misc: rename to run-exports cache --- .../src/archive_cache/cache_key.rs | 76 -- .../src/archive_cache/download.rs | 65 -- crates/rattler_cache/src/archive_cache/mod.rs | 670 ------------------ crates/rattler_cache/src/lib.rs | 2 +- 4 files changed, 1 insertion(+), 812 deletions(-) delete mode 100644 crates/rattler_cache/src/archive_cache/cache_key.rs delete mode 100644 crates/rattler_cache/src/archive_cache/download.rs delete mode 100644 crates/rattler_cache/src/archive_cache/mod.rs diff --git a/crates/rattler_cache/src/archive_cache/cache_key.rs b/crates/rattler_cache/src/archive_cache/cache_key.rs deleted file mode 100644 index dde9d2ea9c..0000000000 --- a/crates/rattler_cache/src/archive_cache/cache_key.rs +++ /dev/null @@ -1,76 +0,0 @@ -use rattler_conda_types::{package::ArchiveIdentifier, PackageRecord}; -use rattler_digest::Sha256Hash; -use std::fmt::{Display, Formatter}; - -/// Provides a unique identifier for packages in the cache. -/// TODO: This could not be unique over multiple subdir. How to handle? -#[derive(Debug, Hash, Clone, Eq, PartialEq)] -pub struct CacheKey { - pub(crate) name: String, - pub(crate) version: String, - pub(crate) build_string: String, - pub(crate) sha256: Option, - pub(crate) extension: String, -} - -impl CacheKey { - /// Adds a sha256 hash of the archive. - pub fn with_sha256(mut self, sha256: Sha256Hash) -> Self { - self.sha256 = Some(sha256); - self - } - - /// Potentially adds a sha256 hash of the archive. - pub fn with_opt_sha256(mut self, sha256: Option) -> Self { - self.sha256 = sha256; - self - } -} - -impl CacheKey { - /// Return the sha256 hash of the package if it is known. - pub fn sha256(&self) -> Option { - self.sha256 - } - - /// Return the sha256 hash string of the package if it is known. - pub fn sha256_str(&self) -> String { - self.sha256() - .map(|hash| format!("{hash:x}")) - .unwrap_or_default() - } - - /// Try to create a new cache key from a package record and a filename. - pub fn new(record: &PackageRecord, filename: &str) -> Result { - let archive_identifier = ArchiveIdentifier::try_from_filename(filename) - .ok_or_else(|| CacheKeyError::InvalidArchiveIdentifier(filename.to_string()))?; - - Ok(Self { - name: record.name.as_normalized().to_string(), - version: record.version.to_string(), - build_string: record.build.clone(), - sha256: record.sha256, - extension: archive_identifier.archive_type.extension().to_string(), - }) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum CacheKeyError { - #[error("Could not identify the archive type from the name: {0}")] - InvalidArchiveIdentifier(String), -} - -impl Display for CacheKey { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}-{}-{}-{}{}", - &self.name, - &self.version, - &self.build_string, - self.sha256_str(), - self.extension - ) - } -} diff --git a/crates/rattler_cache/src/archive_cache/download.rs b/crates/rattler_cache/src/archive_cache/download.rs deleted file mode 100644 index 90d6893a9a..0000000000 --- a/crates/rattler_cache/src/archive_cache/download.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::sync::Arc; - -use ::tokio::io::AsyncWriteExt; -use fs_err::tokio; -use futures::StreamExt; -use rattler_package_streaming::DownloadReporter; -use tempfile::NamedTempFile; -// use tokio_stream::StreamExt; -use url::Url; - -/// Download the contents of the archive from the specified remote location -/// and store it in a temporary file. -pub(crate) async fn download( - client: reqwest_middleware::ClientWithMiddleware, - url: Url, - suffix: &str, - reporter: Option>, -) -> Result { - let temp_file = NamedTempFile::with_suffix(suffix)?; - - // Send the request for the file - let response = client.get(url.clone()).send().await?.error_for_status()?; - - if let Some(reporter) = &reporter { - reporter.on_download_start(); - } - - let total_bytes = response.content_length(); - // Convert the named temp file into a tokio file - let mut file = tokio::File::from_std(fs_err::File::from_parts( - temp_file.as_file().try_clone()?, - temp_file.path(), - )); - - let mut stream = response.bytes_stream(); - - let mut bytes_received = 0; - while let Some(chunk_result) = stream.next().await { - let chunk = chunk_result?; - - if let Some(reporter) = &reporter { - bytes_received += chunk.len() as u64; - reporter.on_download_progress(bytes_received, total_bytes); - } - file.write_all(&chunk).await?; - } - - file.flush().await?; - - Ok(temp_file) -} - -/// An error that can occur when downloading an archive. -#[derive(thiserror::Error, Debug)] -#[allow(missing_docs)] -pub enum DownloadError { - #[error("an io error occurred: {0}")] - Io(#[from] std::io::Error), - - #[error(transparent)] - ReqwestMiddleware(#[from] ::reqwest_middleware::Error), - - #[error(transparent)] - Reqwest(#[from] ::reqwest::Error), -} diff --git a/crates/rattler_cache/src/archive_cache/mod.rs b/crates/rattler_cache/src/archive_cache/mod.rs deleted file mode 100644 index b8c3cfe991..0000000000 --- a/crates/rattler_cache/src/archive_cache/mod.rs +++ /dev/null @@ -1,670 +0,0 @@ -//! This module provides functionality to cache extracted Conda packages. See -//! [`ArchiveCache`]. - -use std::{ - fmt::Debug, - future::Future, - path::{Path, PathBuf}, - sync::Arc, - time::{Duration, SystemTime}, -}; - -use dashmap::DashMap; -use download::DownloadError; -use fs_err::tokio as tokio_fs; -use parking_lot::Mutex; -use rattler_conda_types::package::RunExportsJson; -use rattler_networking::retry_policies::{DoNotRetryPolicy, RetryDecision, RetryPolicy}; -use rattler_package_streaming::{DownloadReporter, ExtractError}; -use tempfile::PersistError; -use tracing::instrument; -use url::Url; - -mod cache_key; -mod download; - -pub use cache_key::{CacheKey, CacheKeyError}; - -use crate::package_cache::CacheReporter; - -/// A [`RunExportsCache`] manages a cache of `run_exports.json` -/// -/// The store does not provide an implementation to get the data into the store. -/// Instead, this is left up to the user when the `run_exports.json` is requested. If the -/// `run_exports.json` is found in the cache it is returned immediately. However, if the -/// cache is missing a user defined function is called to populate the cache. This -/// separates the corners between caching and fetching of the content. -#[derive(Clone)] -pub struct RunExportsCache { - inner: Arc, -} - -/// A cache entry that contains the path to the package and the `run_exports.json` -#[derive(Clone, Debug)] -pub struct CacheEntry { - /// The `run_exports.json` of the package. - pub(crate) run_exports: Arc>, - /// The path to the file on disk. - pub(crate) path: PathBuf, -} - -impl CacheEntry { - /// Create a new cache entry. - pub(crate) fn new(run_exports: Option, path: PathBuf) -> Self { - Self { - run_exports: Arc::new(run_exports), - path, - } - } - - /// Returns the `run_exports.json` of the package. - pub fn run_exports(&self) -> Arc> { - self.run_exports.clone() - } - - /// Returns the path to the file on disk. - pub fn path(&self) -> &Path { - &self.path - } -} - -#[derive(Default)] -struct RunExportsCacheInner { - path: PathBuf, - run_exports: DashMap>>>, -} - -/// A key that defines the actual location of the package in the cache. -#[derive(Debug, Hash, Clone, Eq, PartialEq)] -pub struct BucketKey { - name: String, - version: String, - build_string: String, - sha256_string: String, -} - -impl From for BucketKey { - fn from(key: CacheKey) -> Self { - Self { - name: key.name.clone(), - version: key.version.clone(), - build_string: key.build_string.clone(), - sha256_string: key.sha256_str(), - } - } -} - -impl RunExportsCache { - /// Constructs a new [`ArchiveCache`] located at the specified path. - pub fn new(path: impl Into) -> Self { - Self { - inner: Arc::new(RunExportsCacheInner { - path: path.into(), - run_exports: DashMap::default(), - }), - } - } - - /// Returns the directory that contains the specified package. - /// - /// If the package was previously successfully fetched and stored in the - /// cache the directory containing the data is returned immediately. If - /// the package was not previously fetch the filesystem is checked to - /// see if a directory with valid package content exists. Otherwise, the - /// user provided `fetch` function is called to populate the cache. - /// - /// If the package is already being fetched by another task/thread the - /// request is coalesced. No duplicate fetch is performed. - pub async fn get_or_fetch( - &self, - cache_key: &CacheKey, - fetch: F, - ) -> Result - where - F: (Fn() -> Fut) + Send + 'static, - Fut: Future, E>> + Send + 'static, - E: std::error::Error + Send + Sync + 'static, - { - let cache_path = self.inner.path.join(cache_key.to_string()); - let cache_entry = self - .inner - .run_exports - .entry(cache_key.clone().into()) - .or_default() - .clone(); - - // Acquire the entry. From this point on we can be sure that only one task is - // accessing the cache entry. - let mut entry = cache_entry.lock().await; - - // Check if the cache entry is already stored in the cache. - if let Some(run_exports) = entry.as_ref() { - return Ok(run_exports.clone()); - } - - // Otherwise, defer to populate method to fill our cache. - let run_exports = fetch() - .await - .map_err(|e| RunExportsCacheError::Fetch(Arc::new(e)))?; - - if let Some(parent_dir) = cache_path.parent() { - if !parent_dir.exists() { - tokio_fs::create_dir_all(parent_dir).await?; - } - } - - tokio_fs::write(&cache_path, serde_json::to_string(&run_exports)?).await?; - let cache_entry = CacheEntry::new(run_exports, cache_path); - - entry.replace(cache_entry.clone()); - - Ok(cache_entry) - } - - /// Returns the directory that contains the specified package. - /// - /// This is a convenience wrapper around `get_or_fetch` which fetches the - /// package from the given URL if the package could not be found in the - /// cache. - pub async fn get_or_fetch_from_url( - &self, - cache_key: &CacheKey, - url: Url, - client: reqwest_middleware::ClientWithMiddleware, - reporter: Option>, - ) -> Result { - self.get_or_fetch_from_url_with_retry(cache_key, url, client, DoNotRetryPolicy, reporter) - .await - } - - /// Returns the directory that contains the specified package. - /// - /// This is a convenience wrapper around `get_or_fetch` which fetches the - /// package from the given URL if the package could not be found in the - /// cache. - /// - /// This function assumes that the `client` is already configured with a - /// retry middleware that will retry any request that fails. This function - /// uses the passed in `retry_policy` if, after the request has been sent - /// and the response is successful, streaming of the package data fails - /// and the whole request must be retried. - #[instrument(skip_all, fields(url=%url))] - pub async fn get_or_fetch_from_url_with_retry( - &self, - cache_key: &CacheKey, - url: Url, - client: reqwest_middleware::ClientWithMiddleware, - retry_policy: impl RetryPolicy + Send + 'static + Clone, - reporter: Option>, - ) -> Result { - let request_start = SystemTime::now(); - // Convert into cache key - let download_reporter = reporter.clone(); - - let extension = cache_key.extension.clone(); - // Get or fetch the package, using the specified fetch function - self.get_or_fetch(cache_key, move || { - - #[derive(Debug, thiserror::Error)] - enum FetchError{ - #[error(transparent)] - Download(#[from] DownloadError), - - #[error(transparent)] - Extract(#[from] ExtractError), - - } - - let url = url.clone(); - let client = client.clone(); - let retry_policy = retry_policy.clone(); - let download_reporter = download_reporter.clone(); - let extension = extension.clone(); - - async move { - let mut current_try = 0; - // Retry until the retry policy says to stop - loop { - current_try += 1; - tracing::debug!("downloading {}", &url); - // Extract the package - let result = crate::archive_cache::download::download( - client.clone(), - url.clone(), - &extension, - download_reporter.clone().map(|reporter| Arc::new(PassthroughReporter { - reporter, - index: Mutex::new(None), - }) as Arc::), - ) - .await; - - - // Extract any potential error - let err = match result { - Ok(result) => { - // now extract run_exports.json from the archive without unpacking - let file = - rattler_package_streaming::seek::read_package_file::(result); - - match file { - Ok(run_exports) => { - return Ok(Some(run_exports)); - }, - Err(err) => { - if matches!(err, ExtractError::MissingComponent) { - return Ok(None); - } - return Err(FetchError::Extract(err)); - - } - } - }, - Err(err) => FetchError::Download(err), - }; - - // Only retry on io errors. We assume that the user has - // middleware installed that handles connection retries. - if !matches!(&err, FetchError::Download(_)) { - return Err(err); - } - - // Determine whether to retry based on the retry policy - let execute_after = match retry_policy.should_retry(request_start, current_try) { - RetryDecision::Retry { execute_after } => execute_after, - RetryDecision::DoNotRetry => return Err(err), - }; - let duration = execute_after.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO); - - // Wait for a second to let the remote service restore itself. This increases the - // chance of success. - tracing::warn!( - "failed to download and extract {} {}. Retry #{}, Sleeping {:?} until the next attempt...", - &url, - // destination.display(), - err, - current_try, - duration - ); - tokio::time::sleep(duration).await; - } - } - }) - .await - } -} - -/// An error that might be returned from one of the caching function of the -/// [`ArchiveCache`]. -#[derive(Debug, thiserror::Error)] -pub enum RunExportsCacheError { - /// An error occurred while fetching the package. - #[error(transparent)] - Fetch(#[from] Arc), - - /// A locking error occurred - #[error("{0}")] - Lock(String, #[source] std::io::Error), - - /// An IO error occurred - #[error("{0}")] - Io(#[from] std::io::Error), - - /// An error occurred while persisting the temp file - #[error("{0}")] - Persist(#[from] PersistError), - - /// An error occured when extracting `run_exports` from archive - #[error(transparent)] - Extract(#[from] ExtractError), - - /// An error occured when serializing `run_exports` - #[error(transparent)] - Serialize(#[from] serde_json::Error), - - /// The operation was cancelled - #[error("operation was cancelled")] - Cancelled, -} - -struct PassthroughReporter { - reporter: Arc, - index: Mutex>, -} - -impl DownloadReporter for PassthroughReporter { - fn on_download_start(&self) { - let index = self.reporter.on_download_start(); - assert!( - self.index.lock().replace(index).is_none(), - "on_download_start was called multiple times" - ); - } - - fn on_download_progress(&self, bytes_downloaded: u64, total_bytes: Option) { - let index = self.index.lock().expect("on_download_start was not called"); - self.reporter - .on_download_progress(index, bytes_downloaded, total_bytes); - } - - fn on_download_complete(&self) { - let index = self - .index - .lock() - .take() - .expect("on_download_start was not called"); - self.reporter.on_download_completed(index); - } -} - -#[cfg(test)] -mod test { - use std::{future::IntoFuture, net::SocketAddr, str::FromStr, sync::Arc}; - - use assert_matches::assert_matches; - use axum::{ - body::Body, - extract::State, - http::{Request, StatusCode}, - middleware, - middleware::Next, - response::{Redirect, Response}, - routing::get, - Router, - }; - - use rattler_conda_types::{PackageName, PackageRecord, Version}; - use rattler_digest::{parse_digest_from_hex, Sha256}; - use rattler_networking::retry_policies::{DoNotRetryPolicy, ExponentialBackoffBuilder}; - use reqwest::Client; - use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; - use reqwest_retry::RetryTransientMiddleware; - use tempfile::tempdir; - use tokio::sync::Mutex; - - use url::Url; - - use crate::archive_cache::CacheKey; - - use super::RunExportsCache; - - #[tokio::test] - pub async fn test_run_exports_cache_when_empty() { - // This archive does not contain a run_exports.json - // so we expect the cache to return None - let package_url = Url::parse("https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2").unwrap(); - - let cache_dir = tempdir().unwrap().into_path(); - - let cache = RunExportsCache::new(&cache_dir); - - let mut pkg_record = PackageRecord::new( - PackageName::from_str("ros-noetic-rosbridge-suite").unwrap(), - Version::from_str("0.11.14").unwrap(), - "py39h6fdeb60_14".to_string(), - ); - pkg_record.sha256 = Some( - parse_digest_from_hex::( - "4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8", - ) - .unwrap(), - ); - - let cache_key = CacheKey::new( - &pkg_record, - "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", - ) - .unwrap(); - - // Get the package to the cache - let cached_run_exports = cache - .get_or_fetch_from_url( - &cache_key, - package_url.clone(), - ClientWithMiddleware::from(Client::new()), - None, - ) - .await - .unwrap(); - - assert!(cached_run_exports.run_exports.is_none()); - } - - #[tokio::test] - pub async fn test_run_exports_cache_when_present() { - // This archive contains a run_exports.json - // so we expect the cache to return it - let package_url = - Url::parse("https://repo.prefix.dev/conda-forge/linux-64/zlib-1.3.1-hb9d3cd8_2.conda") - .unwrap(); - - let cache_dir = tempdir().unwrap().into_path(); - - let cache = RunExportsCache::new(&cache_dir); - - let pkg_record = PackageRecord::new( - PackageName::from_str("zlib").unwrap(), - Version::from_str("1.3.1").unwrap(), - "hb9d3cd8_2".to_string(), - ); - - let cache_key = CacheKey::new(&pkg_record, "zlib-1.3.1-hb9d3cd8_2.conda").unwrap(); - - // Get the package to the cache - let cached_run_exports = cache - .get_or_fetch_from_url( - &cache_key, - package_url.clone(), - ClientWithMiddleware::from(Client::new()), - None, - ) - .await - .unwrap(); - - assert!(cached_run_exports.run_exports.is_some()); - } - - /// A helper middleware function that fails the first two requests. - async fn fail_the_first_two_requests( - State(count): State>>, - req: Request, - next: Next, - ) -> Result { - let count = { - let mut count = count.lock().await; - *count += 1; - *count - }; - - println!("Running middleware for request #{count} for {}", req.uri()); - if count <= 2 { - println!("Discarding request!"); - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - - // requires the http crate to get the header name - Ok(next.run(req).await) - } - - enum Middleware { - FailTheFirstTwoRequests, - } - - async fn redirect_to_prefix( - axum::extract::Path((channel, subdir, file)): axum::extract::Path<(String, String, String)>, - ) -> Redirect { - Redirect::permanent(&format!("https://prefix.dev/{channel}/{subdir}/{file}")) - } - - async fn test_flaky_package_cache( - archive_name: &str, - package_record: &PackageRecord, - middleware: Middleware, - ) { - // Construct a service that serves raw files from the test directory - // build our application with a route - let router = Router::new() - // `GET /` goes to `root` - .route("/{channel}/{subdir}/{file}", get(redirect_to_prefix)); - - // Construct a router that returns data from the static dir but fails the first - // try. - let request_count = Arc::new(Mutex::new(0)); - - let router = match middleware { - Middleware::FailTheFirstTwoRequests => router.layer(middleware::from_fn_with_state( - request_count.clone(), - fail_the_first_two_requests, - )), - }; - - // Construct the server that will listen on localhost but with a *random port*. - // The random port is very important because it enables creating - // multiple instances at the same time. We need this to be able to run - // tests in parallel. - let addr = SocketAddr::new([127, 0, 0, 1].into(), 0); - let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); - let addr = listener.local_addr().unwrap(); - - let service = router.into_make_service(); - tokio::spawn(axum::serve(listener, service).into_future()); - - let packages_dir = tempdir().unwrap(); - let cache = RunExportsCache::new(packages_dir.path()); - - let server_url = Url::parse(&format!("http://localhost:{}", addr.port())).unwrap(); - - let client = ClientBuilder::new(Client::default()).build(); - - let cache_key = CacheKey::new(package_record, archive_name).unwrap(); - - // Do the first request without - let result = cache - .get_or_fetch_from_url_with_retry( - &cache_key, - server_url.join(archive_name).unwrap(), - client.clone(), - DoNotRetryPolicy, - None, - ) - .await; - - // First request without retry policy should fail - assert_matches!(result, Err(_)); - { - let request_count_lock = request_count.lock().await; - assert_eq!(*request_count_lock, 1, "Expected there to be 1 request"); - } - - let retry_policy = ExponentialBackoffBuilder::default().build_with_max_retries(3); - let client = ClientBuilder::from_client(client) - .with(RetryTransientMiddleware::new_with_policy(retry_policy)) - .build(); - - // The second one should fail after the 2nd try - let result = cache - .get_or_fetch_from_url_with_retry( - &cache_key, - server_url.join(archive_name).unwrap(), - client, - retry_policy, - None, - ) - .await; - - assert!(result.is_ok()); - { - let request_count_lock = request_count.lock().await; - assert_eq!(*request_count_lock, 3, "Expected there to be 3 requests"); - } - } - - #[tokio::test] - async fn test_flaky() { - let tar_bz2 = "conda-forge/win-64/conda-22.9.0-py310h5588dad_2.tar.bz2"; - let conda = "conda-forge/win-64/conda-22.11.1-py38haa244fe_1.conda"; - - let tar_record = PackageRecord::new( - PackageName::from_str("conda").unwrap(), - Version::from_str("22.9.0").unwrap(), - "py310h5588dad_2".to_string(), - ); - - let conda_record = PackageRecord::new( - PackageName::from_str("conda").unwrap(), - Version::from_str("22.11.1").unwrap(), - "py38haa244fe_1".to_string(), - ); - - test_flaky_package_cache(tar_bz2, &tar_record, Middleware::FailTheFirstTwoRequests).await; - test_flaky_package_cache(conda, &conda_record, Middleware::FailTheFirstTwoRequests).await; - } - - #[tokio::test] - // Test if packages with different sha's are replaced even though they share the - // same BucketKey. - pub async fn test_package_cache_key_with_sha() { - let package_url = Url::parse("https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2").unwrap(); - - let mut pkg_record = PackageRecord::new( - PackageName::from_str("ros-noetic-rosbridge-suite").unwrap(), - Version::from_str("0.11.14").unwrap(), - "py39h6fdeb60_14".to_string(), - ); - pkg_record.sha256 = Some( - parse_digest_from_hex::( - "4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8", - ) - .unwrap(), - ); - - // Create a temporary directory to store the packages - let packages_dir = tempdir().unwrap(); - let cache = RunExportsCache::new(packages_dir.path()); - - let cache_key = CacheKey::new( - &pkg_record, - "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", - ) - .unwrap(); - - // Get the package to the cache - let first_cache_path = cache - .get_or_fetch_from_url( - &cache_key, - package_url.clone(), - ClientWithMiddleware::from(Client::new()), - None, - ) - .await - .unwrap(); - - // Change the sha256 of the package - // And expect the package to be replaced - let new_sha = parse_digest_from_hex::( - "5dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc9", - ) - .unwrap(); - pkg_record.sha256 = Some(new_sha); - - let cache_key = CacheKey::new( - &pkg_record, - "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", - ) - .unwrap(); - - // Get the package again - // and verify that the package was replaced - let second_package_cache = cache - .get_or_fetch_from_url( - &cache_key, - package_url.clone(), - ClientWithMiddleware::from(Client::new()), - None, - ) - .await - .unwrap(); - - assert_ne!(first_cache_path.path(), second_package_cache.path()); - } -} diff --git a/crates/rattler_cache/src/lib.rs b/crates/rattler_cache/src/lib.rs index 083e048de4..ff501ab49b 100644 --- a/crates/rattler_cache/src/lib.rs +++ b/crates/rattler_cache/src/lib.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; -pub mod archive_cache; pub mod package_cache; +pub mod run_exports_cache; pub mod validation; From 7120f7efd59bba381293d2bdd4f9d79d66e0ef84 Mon Sep 17 00:00:00 2001 From: nichmor Date: Wed, 12 Feb 2025 18:19:31 +0200 Subject: [PATCH 14/23] misc: rename to run-exports cache --- .../src/run_exports_cache/cache_key.rs | 76 ++ .../src/run_exports_cache/download.rs | 65 ++ .../src/run_exports_cache/mod.rs | 670 ++++++++++++++++++ 3 files changed, 811 insertions(+) create mode 100644 crates/rattler_cache/src/run_exports_cache/cache_key.rs create mode 100644 crates/rattler_cache/src/run_exports_cache/download.rs create mode 100644 crates/rattler_cache/src/run_exports_cache/mod.rs diff --git a/crates/rattler_cache/src/run_exports_cache/cache_key.rs b/crates/rattler_cache/src/run_exports_cache/cache_key.rs new file mode 100644 index 0000000000..dde9d2ea9c --- /dev/null +++ b/crates/rattler_cache/src/run_exports_cache/cache_key.rs @@ -0,0 +1,76 @@ +use rattler_conda_types::{package::ArchiveIdentifier, PackageRecord}; +use rattler_digest::Sha256Hash; +use std::fmt::{Display, Formatter}; + +/// Provides a unique identifier for packages in the cache. +/// TODO: This could not be unique over multiple subdir. How to handle? +#[derive(Debug, Hash, Clone, Eq, PartialEq)] +pub struct CacheKey { + pub(crate) name: String, + pub(crate) version: String, + pub(crate) build_string: String, + pub(crate) sha256: Option, + pub(crate) extension: String, +} + +impl CacheKey { + /// Adds a sha256 hash of the archive. + pub fn with_sha256(mut self, sha256: Sha256Hash) -> Self { + self.sha256 = Some(sha256); + self + } + + /// Potentially adds a sha256 hash of the archive. + pub fn with_opt_sha256(mut self, sha256: Option) -> Self { + self.sha256 = sha256; + self + } +} + +impl CacheKey { + /// Return the sha256 hash of the package if it is known. + pub fn sha256(&self) -> Option { + self.sha256 + } + + /// Return the sha256 hash string of the package if it is known. + pub fn sha256_str(&self) -> String { + self.sha256() + .map(|hash| format!("{hash:x}")) + .unwrap_or_default() + } + + /// Try to create a new cache key from a package record and a filename. + pub fn new(record: &PackageRecord, filename: &str) -> Result { + let archive_identifier = ArchiveIdentifier::try_from_filename(filename) + .ok_or_else(|| CacheKeyError::InvalidArchiveIdentifier(filename.to_string()))?; + + Ok(Self { + name: record.name.as_normalized().to_string(), + version: record.version.to_string(), + build_string: record.build.clone(), + sha256: record.sha256, + extension: archive_identifier.archive_type.extension().to_string(), + }) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum CacheKeyError { + #[error("Could not identify the archive type from the name: {0}")] + InvalidArchiveIdentifier(String), +} + +impl Display for CacheKey { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}-{}-{}-{}{}", + &self.name, + &self.version, + &self.build_string, + self.sha256_str(), + self.extension + ) + } +} diff --git a/crates/rattler_cache/src/run_exports_cache/download.rs b/crates/rattler_cache/src/run_exports_cache/download.rs new file mode 100644 index 0000000000..90d6893a9a --- /dev/null +++ b/crates/rattler_cache/src/run_exports_cache/download.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +use ::tokio::io::AsyncWriteExt; +use fs_err::tokio; +use futures::StreamExt; +use rattler_package_streaming::DownloadReporter; +use tempfile::NamedTempFile; +// use tokio_stream::StreamExt; +use url::Url; + +/// Download the contents of the archive from the specified remote location +/// and store it in a temporary file. +pub(crate) async fn download( + client: reqwest_middleware::ClientWithMiddleware, + url: Url, + suffix: &str, + reporter: Option>, +) -> Result { + let temp_file = NamedTempFile::with_suffix(suffix)?; + + // Send the request for the file + let response = client.get(url.clone()).send().await?.error_for_status()?; + + if let Some(reporter) = &reporter { + reporter.on_download_start(); + } + + let total_bytes = response.content_length(); + // Convert the named temp file into a tokio file + let mut file = tokio::File::from_std(fs_err::File::from_parts( + temp_file.as_file().try_clone()?, + temp_file.path(), + )); + + let mut stream = response.bytes_stream(); + + let mut bytes_received = 0; + while let Some(chunk_result) = stream.next().await { + let chunk = chunk_result?; + + if let Some(reporter) = &reporter { + bytes_received += chunk.len() as u64; + reporter.on_download_progress(bytes_received, total_bytes); + } + file.write_all(&chunk).await?; + } + + file.flush().await?; + + Ok(temp_file) +} + +/// An error that can occur when downloading an archive. +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] +pub enum DownloadError { + #[error("an io error occurred: {0}")] + Io(#[from] std::io::Error), + + #[error(transparent)] + ReqwestMiddleware(#[from] ::reqwest_middleware::Error), + + #[error(transparent)] + Reqwest(#[from] ::reqwest::Error), +} diff --git a/crates/rattler_cache/src/run_exports_cache/mod.rs b/crates/rattler_cache/src/run_exports_cache/mod.rs new file mode 100644 index 0000000000..a92c1ea223 --- /dev/null +++ b/crates/rattler_cache/src/run_exports_cache/mod.rs @@ -0,0 +1,670 @@ +//! This module provides functionality to cache extracted Conda packages. See +//! [`RunExportsCache`]. + +use std::{ + fmt::Debug, + future::Future, + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, SystemTime}, +}; + +use dashmap::DashMap; +use download::DownloadError; +use fs_err::tokio as tokio_fs; +use parking_lot::Mutex; +use rattler_conda_types::package::RunExportsJson; +use rattler_networking::retry_policies::{DoNotRetryPolicy, RetryDecision, RetryPolicy}; +use rattler_package_streaming::{DownloadReporter, ExtractError}; +use tempfile::PersistError; +use tracing::instrument; +use url::Url; + +mod cache_key; +mod download; + +pub use cache_key::{CacheKey, CacheKeyError}; + +use crate::package_cache::CacheReporter; + +/// A [`RunExportsCache`] manages a cache of `run_exports.json` +/// +/// The store does not provide an implementation to get the data into the store. +/// Instead, this is left up to the user when the `run_exports.json` is requested. If the +/// `run_exports.json` is found in the cache it is returned immediately. However, if the +/// cache is missing a user defined function is called to populate the cache. This +/// separates the corners between caching and fetching of the content. +#[derive(Clone)] +pub struct RunExportsCache { + inner: Arc, +} + +/// A cache entry that contains the path to the package and the `run_exports.json` +#[derive(Clone, Debug)] +pub struct CacheEntry { + /// The `run_exports.json` of the package. + pub(crate) run_exports: Arc>, + /// The path to the file on disk. + pub(crate) path: PathBuf, +} + +impl CacheEntry { + /// Create a new cache entry. + pub(crate) fn new(run_exports: Option, path: PathBuf) -> Self { + Self { + run_exports: Arc::new(run_exports), + path, + } + } + + /// Returns the `run_exports.json` of the package. + pub fn run_exports(&self) -> Arc> { + self.run_exports.clone() + } + + /// Returns the path to the file on disk. + pub fn path(&self) -> &Path { + &self.path + } +} + +#[derive(Default)] +struct RunExportsCacheInner { + path: PathBuf, + run_exports: DashMap>>>, +} + +/// A key that defines the actual location of the package in the cache. +#[derive(Debug, Hash, Clone, Eq, PartialEq)] +pub struct BucketKey { + name: String, + version: String, + build_string: String, + sha256_string: String, +} + +impl From for BucketKey { + fn from(key: CacheKey) -> Self { + Self { + name: key.name.clone(), + version: key.version.clone(), + build_string: key.build_string.clone(), + sha256_string: key.sha256_str(), + } + } +} + +impl RunExportsCache { + /// Constructs a new [`RunExportsCache`] located at the specified path. + pub fn new(path: impl Into) -> Self { + Self { + inner: Arc::new(RunExportsCacheInner { + path: path.into(), + run_exports: DashMap::default(), + }), + } + } + + /// Returns the directory that contains the specified package. + /// + /// If the package was previously successfully fetched and stored in the + /// cache the directory containing the data is returned immediately. If + /// the package was not previously fetch the filesystem is checked to + /// see if a directory with valid package content exists. Otherwise, the + /// user provided `fetch` function is called to populate the cache. + /// + /// If the package is already being fetched by another task/thread the + /// request is coalesced. No duplicate fetch is performed. + pub async fn get_or_fetch( + &self, + cache_key: &CacheKey, + fetch: F, + ) -> Result + where + F: (Fn() -> Fut) + Send + 'static, + Fut: Future, E>> + Send + 'static, + E: std::error::Error + Send + Sync + 'static, + { + let cache_path = self.inner.path.join(cache_key.to_string()); + let cache_entry = self + .inner + .run_exports + .entry(cache_key.clone().into()) + .or_default() + .clone(); + + // Acquire the entry. From this point on we can be sure that only one task is + // accessing the cache entry. + let mut entry = cache_entry.lock().await; + + // Check if the cache entry is already stored in the cache. + if let Some(run_exports) = entry.as_ref() { + return Ok(run_exports.clone()); + } + + // Otherwise, defer to populate method to fill our cache. + let run_exports = fetch() + .await + .map_err(|e| RunExportsCacheError::Fetch(Arc::new(e)))?; + + if let Some(parent_dir) = cache_path.parent() { + if !parent_dir.exists() { + tokio_fs::create_dir_all(parent_dir).await?; + } + } + + tokio_fs::write(&cache_path, serde_json::to_string(&run_exports)?).await?; + let cache_entry = CacheEntry::new(run_exports, cache_path); + + entry.replace(cache_entry.clone()); + + Ok(cache_entry) + } + + /// Returns the directory that contains the specified package. + /// + /// This is a convenience wrapper around `get_or_fetch` which fetches the + /// package from the given URL if the package could not be found in the + /// cache. + pub async fn get_or_fetch_from_url( + &self, + cache_key: &CacheKey, + url: Url, + client: reqwest_middleware::ClientWithMiddleware, + reporter: Option>, + ) -> Result { + self.get_or_fetch_from_url_with_retry(cache_key, url, client, DoNotRetryPolicy, reporter) + .await + } + + /// Returns the directory that contains the specified package. + /// + /// This is a convenience wrapper around `get_or_fetch` which fetches the + /// package from the given URL if the package could not be found in the + /// cache. + /// + /// This function assumes that the `client` is already configured with a + /// retry middleware that will retry any request that fails. This function + /// uses the passed in `retry_policy` if, after the request has been sent + /// and the response is successful, streaming of the package data fails + /// and the whole request must be retried. + #[instrument(skip_all, fields(url=%url))] + pub async fn get_or_fetch_from_url_with_retry( + &self, + cache_key: &CacheKey, + url: Url, + client: reqwest_middleware::ClientWithMiddleware, + retry_policy: impl RetryPolicy + Send + 'static + Clone, + reporter: Option>, + ) -> Result { + let request_start = SystemTime::now(); + // Convert into cache key + let download_reporter = reporter.clone(); + + let extension = cache_key.extension.clone(); + // Get or fetch the package, using the specified fetch function + self.get_or_fetch(cache_key, move || { + + #[derive(Debug, thiserror::Error)] + enum FetchError{ + #[error(transparent)] + Download(#[from] DownloadError), + + #[error(transparent)] + Extract(#[from] ExtractError), + + } + + let url = url.clone(); + let client = client.clone(); + let retry_policy = retry_policy.clone(); + let download_reporter = download_reporter.clone(); + let extension = extension.clone(); + + async move { + let mut current_try = 0; + // Retry until the retry policy says to stop + loop { + current_try += 1; + tracing::debug!("downloading {}", &url); + // Extract the package + let result = crate::run_exports_cache::download::download( + client.clone(), + url.clone(), + &extension, + download_reporter.clone().map(|reporter| Arc::new(PassthroughReporter { + reporter, + index: Mutex::new(None), + }) as Arc::), + ) + .await; + + + // Extract any potential error + let err = match result { + Ok(result) => { + // now extract run_exports.json from the archive without unpacking + let file = + rattler_package_streaming::seek::read_package_file::(result); + + match file { + Ok(run_exports) => { + return Ok(Some(run_exports)); + }, + Err(err) => { + if matches!(err, ExtractError::MissingComponent) { + return Ok(None); + } + return Err(FetchError::Extract(err)); + + } + } + }, + Err(err) => FetchError::Download(err), + }; + + // Only retry on io errors. We assume that the user has + // middleware installed that handles connection retries. + if !matches!(&err, FetchError::Download(_)) { + return Err(err); + } + + // Determine whether to retry based on the retry policy + let execute_after = match retry_policy.should_retry(request_start, current_try) { + RetryDecision::Retry { execute_after } => execute_after, + RetryDecision::DoNotRetry => return Err(err), + }; + let duration = execute_after.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO); + + // Wait for a second to let the remote service restore itself. This increases the + // chance of success. + tracing::warn!( + "failed to download and extract {} {}. Retry #{}, Sleeping {:?} until the next attempt...", + &url, + // destination.display(), + err, + current_try, + duration + ); + tokio::time::sleep(duration).await; + } + } + }) + .await + } +} + +/// An error that might be returned from one of the caching function of the +/// [`RunExportsCache`]. +#[derive(Debug, thiserror::Error)] +pub enum RunExportsCacheError { + /// An error occurred while fetching the package. + #[error(transparent)] + Fetch(#[from] Arc), + + /// A locking error occurred + #[error("{0}")] + Lock(String, #[source] std::io::Error), + + /// An IO error occurred + #[error("{0}")] + Io(#[from] std::io::Error), + + /// An error occurred while persisting the temp file + #[error("{0}")] + Persist(#[from] PersistError), + + /// An error occured when extracting `run_exports` from archive + #[error(transparent)] + Extract(#[from] ExtractError), + + /// An error occured when serializing `run_exports` + #[error(transparent)] + Serialize(#[from] serde_json::Error), + + /// The operation was cancelled + #[error("operation was cancelled")] + Cancelled, +} + +struct PassthroughReporter { + reporter: Arc, + index: Mutex>, +} + +impl DownloadReporter for PassthroughReporter { + fn on_download_start(&self) { + let index = self.reporter.on_download_start(); + assert!( + self.index.lock().replace(index).is_none(), + "on_download_start was called multiple times" + ); + } + + fn on_download_progress(&self, bytes_downloaded: u64, total_bytes: Option) { + let index = self.index.lock().expect("on_download_start was not called"); + self.reporter + .on_download_progress(index, bytes_downloaded, total_bytes); + } + + fn on_download_complete(&self) { + let index = self + .index + .lock() + .take() + .expect("on_download_start was not called"); + self.reporter.on_download_completed(index); + } +} + +#[cfg(test)] +mod test { + use std::{future::IntoFuture, net::SocketAddr, str::FromStr, sync::Arc}; + + use assert_matches::assert_matches; + use axum::{ + body::Body, + extract::State, + http::{Request, StatusCode}, + middleware, + middleware::Next, + response::{Redirect, Response}, + routing::get, + Router, + }; + + use rattler_conda_types::{PackageName, PackageRecord, Version}; + use rattler_digest::{parse_digest_from_hex, Sha256}; + use rattler_networking::retry_policies::{DoNotRetryPolicy, ExponentialBackoffBuilder}; + use reqwest::Client; + use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; + use reqwest_retry::RetryTransientMiddleware; + use tempfile::tempdir; + use tokio::sync::Mutex; + + use url::Url; + + use crate::run_exports_cache::CacheKey; + + use super::RunExportsCache; + + #[tokio::test] + pub async fn test_run_exports_cache_when_empty() { + // This archive does not contain a run_exports.json + // so we expect the cache to return None + let package_url = Url::parse("https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2").unwrap(); + + let cache_dir = tempdir().unwrap().into_path(); + + let cache = RunExportsCache::new(&cache_dir); + + let mut pkg_record = PackageRecord::new( + PackageName::from_str("ros-noetic-rosbridge-suite").unwrap(), + Version::from_str("0.11.14").unwrap(), + "py39h6fdeb60_14".to_string(), + ); + pkg_record.sha256 = Some( + parse_digest_from_hex::( + "4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8", + ) + .unwrap(), + ); + + let cache_key = CacheKey::new( + &pkg_record, + "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", + ) + .unwrap(); + + // Get the package to the cache + let cached_run_exports = cache + .get_or_fetch_from_url( + &cache_key, + package_url.clone(), + ClientWithMiddleware::from(Client::new()), + None, + ) + .await + .unwrap(); + + assert!(cached_run_exports.run_exports.is_none()); + } + + #[tokio::test] + pub async fn test_run_exports_cache_when_present() { + // This archive contains a run_exports.json + // so we expect the cache to return it + let package_url = + Url::parse("https://repo.prefix.dev/conda-forge/linux-64/zlib-1.3.1-hb9d3cd8_2.conda") + .unwrap(); + + let cache_dir = tempdir().unwrap().into_path(); + + let cache = RunExportsCache::new(&cache_dir); + + let pkg_record = PackageRecord::new( + PackageName::from_str("zlib").unwrap(), + Version::from_str("1.3.1").unwrap(), + "hb9d3cd8_2".to_string(), + ); + + let cache_key = CacheKey::new(&pkg_record, "zlib-1.3.1-hb9d3cd8_2.conda").unwrap(); + + // Get the package to the cache + let cached_run_exports = cache + .get_or_fetch_from_url( + &cache_key, + package_url.clone(), + ClientWithMiddleware::from(Client::new()), + None, + ) + .await + .unwrap(); + + assert!(cached_run_exports.run_exports.is_some()); + } + + /// A helper middleware function that fails the first two requests. + async fn fail_the_first_two_requests( + State(count): State>>, + req: Request, + next: Next, + ) -> Result { + let count = { + let mut count = count.lock().await; + *count += 1; + *count + }; + + println!("Running middleware for request #{count} for {}", req.uri()); + if count <= 2 { + println!("Discarding request!"); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + + // requires the http crate to get the header name + Ok(next.run(req).await) + } + + enum Middleware { + FailTheFirstTwoRequests, + } + + async fn redirect_to_prefix( + axum::extract::Path((channel, subdir, file)): axum::extract::Path<(String, String, String)>, + ) -> Redirect { + Redirect::permanent(&format!("https://prefix.dev/{channel}/{subdir}/{file}")) + } + + async fn test_flaky_package_cache( + archive_name: &str, + package_record: &PackageRecord, + middleware: Middleware, + ) { + // Construct a service that serves raw files from the test directory + // build our application with a route + let router = Router::new() + // `GET /` goes to `root` + .route("/{channel}/{subdir}/{file}", get(redirect_to_prefix)); + + // Construct a router that returns data from the static dir but fails the first + // try. + let request_count = Arc::new(Mutex::new(0)); + + let router = match middleware { + Middleware::FailTheFirstTwoRequests => router.layer(middleware::from_fn_with_state( + request_count.clone(), + fail_the_first_two_requests, + )), + }; + + // Construct the server that will listen on localhost but with a *random port*. + // The random port is very important because it enables creating + // multiple instances at the same time. We need this to be able to run + // tests in parallel. + let addr = SocketAddr::new([127, 0, 0, 1].into(), 0); + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let service = router.into_make_service(); + tokio::spawn(axum::serve(listener, service).into_future()); + + let packages_dir = tempdir().unwrap(); + let cache = RunExportsCache::new(packages_dir.path()); + + let server_url = Url::parse(&format!("http://localhost:{}", addr.port())).unwrap(); + + let client = ClientBuilder::new(Client::default()).build(); + + let cache_key = CacheKey::new(package_record, archive_name).unwrap(); + + // Do the first request without + let result = cache + .get_or_fetch_from_url_with_retry( + &cache_key, + server_url.join(archive_name).unwrap(), + client.clone(), + DoNotRetryPolicy, + None, + ) + .await; + + // First request without retry policy should fail + assert_matches!(result, Err(_)); + { + let request_count_lock = request_count.lock().await; + assert_eq!(*request_count_lock, 1, "Expected there to be 1 request"); + } + + let retry_policy = ExponentialBackoffBuilder::default().build_with_max_retries(3); + let client = ClientBuilder::from_client(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build(); + + // The second one should fail after the 2nd try + let result = cache + .get_or_fetch_from_url_with_retry( + &cache_key, + server_url.join(archive_name).unwrap(), + client, + retry_policy, + None, + ) + .await; + + assert!(result.is_ok()); + { + let request_count_lock = request_count.lock().await; + assert_eq!(*request_count_lock, 3, "Expected there to be 3 requests"); + } + } + + #[tokio::test] + async fn test_flaky() { + let tar_bz2 = "conda-forge/win-64/conda-22.9.0-py310h5588dad_2.tar.bz2"; + let conda = "conda-forge/win-64/conda-22.11.1-py38haa244fe_1.conda"; + + let tar_record = PackageRecord::new( + PackageName::from_str("conda").unwrap(), + Version::from_str("22.9.0").unwrap(), + "py310h5588dad_2".to_string(), + ); + + let conda_record = PackageRecord::new( + PackageName::from_str("conda").unwrap(), + Version::from_str("22.11.1").unwrap(), + "py38haa244fe_1".to_string(), + ); + + test_flaky_package_cache(tar_bz2, &tar_record, Middleware::FailTheFirstTwoRequests).await; + test_flaky_package_cache(conda, &conda_record, Middleware::FailTheFirstTwoRequests).await; + } + + #[tokio::test] + // Test if packages with different sha's are replaced even though they share the + // same BucketKey. + pub async fn test_package_cache_key_with_sha() { + let package_url = Url::parse("https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2").unwrap(); + + let mut pkg_record = PackageRecord::new( + PackageName::from_str("ros-noetic-rosbridge-suite").unwrap(), + Version::from_str("0.11.14").unwrap(), + "py39h6fdeb60_14".to_string(), + ); + pkg_record.sha256 = Some( + parse_digest_from_hex::( + "4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8", + ) + .unwrap(), + ); + + // Create a temporary directory to store the packages + let packages_dir = tempdir().unwrap(); + let cache = RunExportsCache::new(packages_dir.path()); + + let cache_key = CacheKey::new( + &pkg_record, + "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", + ) + .unwrap(); + + // Get the package to the cache + let first_cache_path = cache + .get_or_fetch_from_url( + &cache_key, + package_url.clone(), + ClientWithMiddleware::from(Client::new()), + None, + ) + .await + .unwrap(); + + // Change the sha256 of the package + // And expect the package to be replaced + let new_sha = parse_digest_from_hex::( + "5dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc9", + ) + .unwrap(); + pkg_record.sha256 = Some(new_sha); + + let cache_key = CacheKey::new( + &pkg_record, + "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", + ) + .unwrap(); + + // Get the package again + // and verify that the package was replaced + let second_package_cache = cache + .get_or_fetch_from_url( + &cache_key, + package_url.clone(), + ClientWithMiddleware::from(Client::new()), + None, + ) + .await + .unwrap(); + + assert_ne!(first_cache_path.path(), second_package_cache.path()); + } +} From 3a76eee007140b5d6f576e0c3ae096a1b214d953 Mon Sep 17 00:00:00 2001 From: nichmor Date: Wed, 12 Feb 2025 19:17:59 +0200 Subject: [PATCH 15/23] misc: make writing file atomic --- .../src/run_exports_cache/download.rs | 4 ++- .../src/run_exports_cache/mod.rs | 21 ++++++++---- crates/rattler_package_streaming/src/seek.rs | 32 +++++++++++++++++++ 3 files changed, 49 insertions(+), 8 deletions(-) diff --git a/crates/rattler_cache/src/run_exports_cache/download.rs b/crates/rattler_cache/src/run_exports_cache/download.rs index 90d6893a9a..bd63d43320 100644 --- a/crates/rattler_cache/src/run_exports_cache/download.rs +++ b/crates/rattler_cache/src/run_exports_cache/download.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use ::tokio::io::AsyncWriteExt; +use ::tokio::io::{AsyncSeekExt, AsyncWriteExt}; use fs_err::tokio; use futures::StreamExt; use rattler_package_streaming::DownloadReporter; @@ -47,6 +47,8 @@ pub(crate) async fn download( file.flush().await?; + file.rewind().await?; + Ok(temp_file) } diff --git a/crates/rattler_cache/src/run_exports_cache/mod.rs b/crates/rattler_cache/src/run_exports_cache/mod.rs index a92c1ea223..e35286a35f 100644 --- a/crates/rattler_cache/src/run_exports_cache/mod.rs +++ b/crates/rattler_cache/src/run_exports_cache/mod.rs @@ -13,10 +13,10 @@ use dashmap::DashMap; use download::DownloadError; use fs_err::tokio as tokio_fs; use parking_lot::Mutex; -use rattler_conda_types::package::RunExportsJson; +use rattler_conda_types::package::{PackageFile, RunExportsJson}; use rattler_networking::retry_policies::{DoNotRetryPolicy, RetryDecision, RetryPolicy}; use rattler_package_streaming::{DownloadReporter, ExtractError}; -use tempfile::PersistError; +use tempfile::{NamedTempFile, PersistError}; use tracing::instrument; use url::Url; @@ -122,7 +122,7 @@ impl RunExportsCache { ) -> Result where F: (Fn() -> Fut) + Send + 'static, - Fut: Future, E>> + Send + 'static, + Fut: Future, E>> + Send + 'static, E: std::error::Error + Send + Sync + 'static, { let cache_path = self.inner.path.join(cache_key.to_string()); @@ -143,7 +143,7 @@ impl RunExportsCache { } // Otherwise, defer to populate method to fill our cache. - let run_exports = fetch() + let run_exports_file = fetch() .await .map_err(|e| RunExportsCacheError::Fetch(Arc::new(e)))?; @@ -153,7 +153,15 @@ impl RunExportsCache { } } - tokio_fs::write(&cache_path, serde_json::to_string(&run_exports)?).await?; + let run_exports = if let Some(file) = run_exports_file { + file.persist(&cache_path)?; + + let run_exports_str = tokio_fs::read_to_string(&cache_path).await?; + Some(RunExportsJson::from_str(&run_exports_str)?) + } else { + None + }; + let cache_entry = CacheEntry::new(run_exports, cache_path); entry.replace(cache_entry.clone()); @@ -239,13 +247,12 @@ impl RunExportsCache { ) .await; - // Extract any potential error let err = match result { Ok(result) => { // now extract run_exports.json from the archive without unpacking let file = - rattler_package_streaming::seek::read_package_file::(result); + rattler_package_streaming::seek::get_package_file::(result); match file { Ok(run_exports) => { diff --git a/crates/rattler_package_streaming/src/seek.rs b/crates/rattler_package_streaming/src/seek.rs index 7af6f984c8..2c596b1e47 100644 --- a/crates/rattler_package_streaming/src/seek.rs +++ b/crates/rattler_package_streaming/src/seek.rs @@ -6,11 +6,13 @@ use crate::ExtractError; use rattler_conda_types::package::ArchiveType; use rattler_conda_types::package::PackageFile; use std::fs::File; +use std::io::Write; use std::{ io::{Read, Seek, SeekFrom}, path::Path, }; use tar::Archive; +use tempfile::NamedTempFile; use zip::CompressionMethod; fn stream_conda_zip_entry<'a>( @@ -116,3 +118,33 @@ pub fn read_package_file(path: impl AsRef) -> Result( + named_temp_file: NamedTempFile, +) -> Result { + let mut tmp_file = NamedTempFile::new()?; + + match ArchiveType::try_from(named_temp_file.path()) + .ok_or(ExtractError::UnsupportedArchiveType)? + { + ArchiveType::TarBz2 => { + let mut archive = stream_tar_bz2(named_temp_file); + let buf = get_file_from_archive(&mut archive, P::package_path())?; + + let mut tmp_file = NamedTempFile::new()?; + + tmp_file.write_all(&buf)?; + } + ArchiveType::Conda => { + let mut info_archive = stream_conda_info(named_temp_file).unwrap(); + let buf = get_file_from_archive(&mut info_archive, P::package_path())?; + + tmp_file.write_all(&buf)?; + } + } + + tmp_file.flush()?; + + Ok(tmp_file) +} From fbc5708b5dc07183129d7f1a3ede78eea5af57e1 Mon Sep 17 00:00:00 2001 From: nichmor Date: Thu, 13 Feb 2025 10:53:44 +0200 Subject: [PATCH 16/23] misc: deduplicate some code --- .../src/run_exports_cache/mod.rs | 5 +- crates/rattler_package_streaming/Cargo.toml | 1 + crates/rattler_package_streaming/src/lib.rs | 7 ++ crates/rattler_package_streaming/src/seek.rs | 64 +++++++++---------- 4 files changed, 41 insertions(+), 36 deletions(-) diff --git a/crates/rattler_cache/src/run_exports_cache/mod.rs b/crates/rattler_cache/src/run_exports_cache/mod.rs index e35286a35f..d61ec1a652 100644 --- a/crates/rattler_cache/src/run_exports_cache/mod.rs +++ b/crates/rattler_cache/src/run_exports_cache/mod.rs @@ -251,8 +251,9 @@ impl RunExportsCache { let err = match result { Ok(result) => { // now extract run_exports.json from the archive without unpacking - let file = - rattler_package_streaming::seek::get_package_file::(result); + let file = simple_spawn_blocking::tokio::run_blocking_task(move || { + rattler_package_streaming::seek::get_package_file::(result) + }).await; match file { Ok(run_exports) => { diff --git a/crates/rattler_package_streaming/Cargo.toml b/crates/rattler_package_streaming/Cargo.toml index a02b15fef9..08b8c14e3c 100644 --- a/crates/rattler_package_streaming/Cargo.toml +++ b/crates/rattler_package_streaming/Cargo.toml @@ -22,6 +22,7 @@ rattler_networking = { path = "../rattler_networking", version = "0.22.3", defau rattler_redaction = { version = "0.1.6", path = "../rattler_redaction", features = ["reqwest", "reqwest-middleware"] } reqwest = { workspace = true, features = ["stream"], optional = true } reqwest-middleware = { workspace = true, optional = true } +simple_spawn_blocking = { version = "1.0.0", path = "../simple_spawn_blocking", features = ["tokio"] } serde_json = { workspace = true } tar = { workspace = true } tempfile = { workspace = true } diff --git a/crates/rattler_package_streaming/src/lib.rs b/crates/rattler_package_streaming/src/lib.rs index e64fafefdd..eee0c0ef7c 100644 --- a/crates/rattler_package_streaming/src/lib.rs +++ b/crates/rattler_package_streaming/src/lib.rs @@ -2,6 +2,7 @@ //! This crate provides the ability to extract a Conda package archive or specific parts of it. +use simple_spawn_blocking::Cancelled; use std::path::PathBuf; use zip::result::ZipError; @@ -62,6 +63,12 @@ impl From for ExtractError { } } +impl From for ExtractError { + fn from(_value: Cancelled) -> Self { + Self::Cancelled + } +} + #[cfg(feature = "reqwest")] impl From<::reqwest_middleware::Error> for ExtractError { fn from(err: ::reqwest_middleware::Error) -> Self { diff --git a/crates/rattler_package_streaming/src/seek.rs b/crates/rattler_package_streaming/src/seek.rs index 2c596b1e47..b6dd83d1b5 100644 --- a/crates/rattler_package_streaming/src/seek.rs +++ b/crates/rattler_package_streaming/src/seek.rs @@ -86,6 +86,25 @@ fn get_file_from_archive( Err(ExtractError::MissingComponent) } +/// Read a package file content from archive +fn read_package_file_content( + file: &File, + path: impl AsRef, +) -> Result, ExtractError> { + match ArchiveType::try_from(&path).ok_or(ExtractError::UnsupportedArchiveType)? { + ArchiveType::TarBz2 => { + let mut archive = stream_tar_bz2(file); + let buf = get_file_from_archive(&mut archive, P::package_path())?; + Ok(buf) + } + ArchiveType::Conda => { + let mut info_archive = stream_conda_info(file).unwrap(); + let buf = get_file_from_archive(&mut info_archive, P::package_path())?; + Ok(buf) + } + } +} + /// Read a package file from archive /// Note: If you want to extract multiple `info/*` files then this will be slightly /// slower than manually iterating over the archive entries with @@ -102,49 +121,26 @@ fn get_file_from_archive( pub fn read_package_file(path: impl AsRef) -> Result { // stream extract the file from a package let file = File::open(&path)?; + let content = read_package_file_content::

(&file, &path)?; - match ArchiveType::try_from(&path).ok_or(ExtractError::UnsupportedArchiveType)? { - ArchiveType::TarBz2 => { - let mut archive = stream_tar_bz2(file); - let buf = get_file_from_archive(&mut archive, P::package_path())?; - P::from_str(&String::from_utf8_lossy(&buf)) - .map_err(|e| ExtractError::ArchiveMemberParseError(P::package_path().to_owned(), e)) - } - ArchiveType::Conda => { - let mut info_archive = stream_conda_info(file).unwrap(); - let buf = get_file_from_archive(&mut info_archive, P::package_path())?; - P::from_str(&String::from_utf8_lossy(&buf)) - .map_err(|e| ExtractError::ArchiveMemberParseError(P::package_path().to_owned(), e)) - } - } + P::from_str(&String::from_utf8_lossy(&content)) + .map_err(|e| ExtractError::ArchiveMemberParseError(P::package_path().to_owned(), e)) } /// Get a [`PackageFile`] from temporary archive and return it as a temporary file. pub fn get_package_file( - named_temp_file: NamedTempFile, + archive_temp_file: NamedTempFile, ) -> Result { - let mut tmp_file = NamedTempFile::new()?; + let mut output_file = NamedTempFile::new()?; - match ArchiveType::try_from(named_temp_file.path()) - .ok_or(ExtractError::UnsupportedArchiveType)? - { - ArchiveType::TarBz2 => { - let mut archive = stream_tar_bz2(named_temp_file); - let buf = get_file_from_archive(&mut archive, P::package_path())?; + let content = + read_package_file_content::

(archive_temp_file.as_file(), archive_temp_file.path())?; - let mut tmp_file = NamedTempFile::new()?; + output_file.write_all(&content)?; - tmp_file.write_all(&buf)?; - } - ArchiveType::Conda => { - let mut info_archive = stream_conda_info(named_temp_file).unwrap(); - let buf = get_file_from_archive(&mut info_archive, P::package_path())?; - - tmp_file.write_all(&buf)?; - } - } + output_file.flush()?; - tmp_file.flush()?; + output_file.rewind()?; - Ok(tmp_file) + Ok(output_file) } From 64b5fb75fa44f55dd0c0850de0372d8562d51d33 Mon Sep 17 00:00:00 2001 From: nichmor Date: Thu, 13 Feb 2025 10:55:07 +0200 Subject: [PATCH 17/23] misc: refactor to run_exports constant --- crates/rattler_cache/src/consts.rs | 2 +- crates/rattler_cache/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/rattler_cache/src/consts.rs b/crates/rattler_cache/src/consts.rs index 20f97918ff..a72f4b25ee 100644 --- a/crates/rattler_cache/src/consts.rs +++ b/crates/rattler_cache/src/consts.rs @@ -1,5 +1,5 @@ /// The location in the main cache folder where the conda package cache is stored. pub const PACKAGE_CACHE_DIR: &str = "pkgs"; -pub const ARCHIVE_CACHE_DIR: &str = "archives"; +pub const RUN_EXPORTS_CACHE_DIR: &str = "run_exports"; /// The location in the main cache folder where the repodata cache is stored. pub const REPODATA_CACHE_DIR: &str = "repodata"; diff --git a/crates/rattler_cache/src/lib.rs b/crates/rattler_cache/src/lib.rs index ff501ab49b..1f932e301b 100644 --- a/crates/rattler_cache/src/lib.rs +++ b/crates/rattler_cache/src/lib.rs @@ -6,7 +6,7 @@ pub mod run_exports_cache; pub mod validation; mod consts; -pub use consts::{ARCHIVE_CACHE_DIR, PACKAGE_CACHE_DIR, REPODATA_CACHE_DIR}; +pub use consts::{PACKAGE_CACHE_DIR, REPODATA_CACHE_DIR, RUN_EXPORTS_CACHE_DIR}; /// Determines the default cache directory for rattler. /// It first checks the environment variable `RATTLER_CACHE_DIR`. From 642742e563faaa62724b84196a87f22ebf50039a Mon Sep 17 00:00:00 2001 From: nichmor Date: Mon, 17 Feb 2025 15:26:54 +0200 Subject: [PATCH 18/23] misc: apply changes --- .../src/run_exports_cache/cache_key.rs | 37 ++++++++++++------- .../src/run_exports_cache/download.rs | 1 - .../src/run_exports_cache/mod.rs | 22 +++++++---- crates/rattler_package_streaming/src/seek.rs | 27 ++++++-------- 4 files changed, 50 insertions(+), 37 deletions(-) diff --git a/crates/rattler_cache/src/run_exports_cache/cache_key.rs b/crates/rattler_cache/src/run_exports_cache/cache_key.rs index dde9d2ea9c..8ad7347b54 100644 --- a/crates/rattler_cache/src/run_exports_cache/cache_key.rs +++ b/crates/rattler_cache/src/run_exports_cache/cache_key.rs @@ -1,5 +1,5 @@ use rattler_conda_types::{package::ArchiveIdentifier, PackageRecord}; -use rattler_digest::Sha256Hash; +use rattler_digest::{Md5Hash, Sha256Hash}; use std::fmt::{Display, Formatter}; /// Provides a unique identifier for packages in the cache. @@ -10,21 +10,22 @@ pub struct CacheKey { pub(crate) version: String, pub(crate) build_string: String, pub(crate) sha256: Option, + pub(crate) md5: Option, pub(crate) extension: String, } impl CacheKey { - /// Adds a sha256 hash of the archive. - pub fn with_sha256(mut self, sha256: Sha256Hash) -> Self { - self.sha256 = Some(sha256); - self - } - /// Potentially adds a sha256 hash of the archive. pub fn with_opt_sha256(mut self, sha256: Option) -> Self { self.sha256 = sha256; self } + + /// Potentially adds a md5 hash of the archive. + pub fn with_opt_md5(mut self, md5: Option) -> Self { + self.md5 = md5; + self + } } impl CacheKey { @@ -33,6 +34,11 @@ impl CacheKey { self.sha256 } + /// Return the md5 hash of the package if it is known. + pub fn md5(&self) -> Option { + self.md5 + } + /// Return the sha256 hash string of the package if it is known. pub fn sha256_str(&self) -> String { self.sha256() @@ -50,6 +56,7 @@ impl CacheKey { version: record.version.to_string(), build_string: record.build.clone(), sha256: record.sha256, + md5: record.md5, extension: archive_identifier.archive_type.extension().to_string(), }) } @@ -63,14 +70,18 @@ pub enum CacheKeyError { impl Display for CacheKey { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + // we need to use either sha256 or md5 hash to display the key + // if both are none, we ignore them + let display_key = match (self.sha256(), self.md5()) { + (Some(sha256), _) => format!("-{sha256:x}"), + (_, Some(md5)) => format!("-{md5:x}"), + _ => "".to_string(), + }; + write!( f, - "{}-{}-{}-{}{}", - &self.name, - &self.version, - &self.build_string, - self.sha256_str(), - self.extension + "{}-{}-{}{}{}", + &self.name, &self.version, &self.build_string, display_key, self.extension ) } } diff --git a/crates/rattler_cache/src/run_exports_cache/download.rs b/crates/rattler_cache/src/run_exports_cache/download.rs index bd63d43320..75f5858a1e 100644 --- a/crates/rattler_cache/src/run_exports_cache/download.rs +++ b/crates/rattler_cache/src/run_exports_cache/download.rs @@ -5,7 +5,6 @@ use fs_err::tokio; use futures::StreamExt; use rattler_package_streaming::DownloadReporter; use tempfile::NamedTempFile; -// use tokio_stream::StreamExt; use url::Url; /// Download the contents of the archive from the specified remote location diff --git a/crates/rattler_cache/src/run_exports_cache/mod.rs b/crates/rattler_cache/src/run_exports_cache/mod.rs index d61ec1a652..5ddbbdf2ad 100644 --- a/crates/rattler_cache/src/run_exports_cache/mod.rs +++ b/crates/rattler_cache/src/run_exports_cache/mod.rs @@ -43,7 +43,7 @@ pub struct RunExportsCache { #[derive(Clone, Debug)] pub struct CacheEntry { /// The `run_exports.json` of the package. - pub(crate) run_exports: Arc>, + pub(crate) run_exports: Option, /// The path to the file on disk. pub(crate) path: PathBuf, } @@ -52,13 +52,13 @@ impl CacheEntry { /// Create a new cache entry. pub(crate) fn new(run_exports: Option, path: PathBuf) -> Self { Self { - run_exports: Arc::new(run_exports), + run_exports: run_exports, path, } } /// Returns the `run_exports.json` of the package. - pub fn run_exports(&self) -> Arc> { + pub fn run_exports(&self) -> Option { self.run_exports.clone() } @@ -221,6 +221,9 @@ impl RunExportsCache { #[error(transparent)] Extract(#[from] ExtractError), + #[error(transparent)] + Io(#[from] std::io::Error), + } let url = url.clone(); @@ -250,14 +253,17 @@ impl RunExportsCache { // Extract any potential error let err = match result { Ok(result) => { + let temp_file = NamedTempFile::new()?; + // Clone the file handler to be able to pass it to the blocking task + let mut file_handler = temp_file.as_file().try_clone()?; // now extract run_exports.json from the archive without unpacking - let file = simple_spawn_blocking::tokio::run_blocking_task(move || { - rattler_package_streaming::seek::get_package_file::(result) + let result = simple_spawn_blocking::tokio::run_blocking_task(move || { + rattler_package_streaming::seek::extract_package_file::(result.as_file(), result.path(), &mut file_handler) }).await; - match file { - Ok(run_exports) => { - return Ok(Some(run_exports)); + match result { + Ok(()) => { + return Ok(Some(temp_file)); }, Err(err) => { if matches!(err, ExtractError::MissingComponent) { diff --git a/crates/rattler_package_streaming/src/seek.rs b/crates/rattler_package_streaming/src/seek.rs index b6dd83d1b5..0e8245a2e5 100644 --- a/crates/rattler_package_streaming/src/seek.rs +++ b/crates/rattler_package_streaming/src/seek.rs @@ -87,8 +87,8 @@ fn get_file_from_archive( } /// Read a package file content from archive -fn read_package_file_content( - file: &File, +fn read_package_file_content<'a, P: PackageFile>( + file: impl Read + Seek + 'a, path: impl AsRef, ) -> Result, ExtractError> { match ArchiveType::try_from(&path).ok_or(ExtractError::UnsupportedArchiveType)? { @@ -127,20 +127,17 @@ pub fn read_package_file(path: impl AsRef) -> Result( - archive_temp_file: NamedTempFile, -) -> Result { - let mut output_file = NamedTempFile::new()?; - - let content = - read_package_file_content::

(archive_temp_file.as_file(), archive_temp_file.path())?; - - output_file.write_all(&content)?; +/// Get a [`PackageFile`] from temporary archive and extract it to a writer +pub fn extract_package_file<'a, P: PackageFile>( + reader: impl Read + Seek + 'a, + location: &Path, + writer: &mut impl Write, +) -> Result<(), ExtractError> { + let content = read_package_file_content::

(reader, location)?; - output_file.flush()?; + writer.write_all(&content)?; - output_file.rewind()?; + writer.flush()?; - Ok(output_file) + Ok(()) } From e61fcbfd3713187850bcd2a5f569c467886022dd Mon Sep 17 00:00:00 2001 From: nichmor Date: Mon, 17 Feb 2025 15:32:56 +0200 Subject: [PATCH 19/23] misc: rewind after writing --- crates/rattler_cache/src/run_exports_cache/mod.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/crates/rattler_cache/src/run_exports_cache/mod.rs b/crates/rattler_cache/src/run_exports_cache/mod.rs index 5ddbbdf2ad..4b08756ef6 100644 --- a/crates/rattler_cache/src/run_exports_cache/mod.rs +++ b/crates/rattler_cache/src/run_exports_cache/mod.rs @@ -2,11 +2,7 @@ //! [`RunExportsCache`]. use std::{ - fmt::Debug, - future::Future, - path::{Path, PathBuf}, - sync::Arc, - time::{Duration, SystemTime}, + fmt::Debug, future::Future, io::Seek, path::{Path, PathBuf}, sync::Arc, time::{Duration, SystemTime} }; use dashmap::DashMap; @@ -258,7 +254,9 @@ impl RunExportsCache { let mut file_handler = temp_file.as_file().try_clone()?; // now extract run_exports.json from the archive without unpacking let result = simple_spawn_blocking::tokio::run_blocking_task(move || { - rattler_package_streaming::seek::extract_package_file::(result.as_file(), result.path(), &mut file_handler) + let op_result = rattler_package_streaming::seek::extract_package_file::(result.as_file(), result.path(), &mut file_handler)?; + file_handler.rewind()?; + Ok(op_result) }).await; match result { From f7e2c56242d17bfc0b6f69ad17b01177490e715b Mon Sep 17 00:00:00 2001 From: nichmor Date: Mon, 17 Feb 2025 15:36:10 +0200 Subject: [PATCH 20/23] misc: remove unit return --- crates/rattler_cache/src/run_exports_cache/mod.rs | 6 +++--- crates/rattler_package_streaming/src/seek.rs | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/rattler_cache/src/run_exports_cache/mod.rs b/crates/rattler_cache/src/run_exports_cache/mod.rs index 4b08756ef6..c398958114 100644 --- a/crates/rattler_cache/src/run_exports_cache/mod.rs +++ b/crates/rattler_cache/src/run_exports_cache/mod.rs @@ -48,7 +48,7 @@ impl CacheEntry { /// Create a new cache entry. pub(crate) fn new(run_exports: Option, path: PathBuf) -> Self { Self { - run_exports: run_exports, + run_exports, path, } } @@ -254,9 +254,9 @@ impl RunExportsCache { let mut file_handler = temp_file.as_file().try_clone()?; // now extract run_exports.json from the archive without unpacking let result = simple_spawn_blocking::tokio::run_blocking_task(move || { - let op_result = rattler_package_streaming::seek::extract_package_file::(result.as_file(), result.path(), &mut file_handler)?; + rattler_package_streaming::seek::extract_package_file::(result.as_file(), result.path(), &mut file_handler)?; file_handler.rewind()?; - Ok(op_result) + Ok(()) }).await; match result { diff --git a/crates/rattler_package_streaming/src/seek.rs b/crates/rattler_package_streaming/src/seek.rs index 0e8245a2e5..a4cdd4f89b 100644 --- a/crates/rattler_package_streaming/src/seek.rs +++ b/crates/rattler_package_streaming/src/seek.rs @@ -12,7 +12,6 @@ use std::{ path::Path, }; use tar::Archive; -use tempfile::NamedTempFile; use zip::CompressionMethod; fn stream_conda_zip_entry<'a>( From 45002a1773be364397ff7a1f9fc786b28285f1f3 Mon Sep 17 00:00:00 2001 From: nichmor Date: Mon, 17 Feb 2025 18:23:55 +0200 Subject: [PATCH 21/23] Update crates/rattler_cache/src/run_exports_cache/cache_key.rs Co-authored-by: Tim de Jager --- crates/rattler_cache/src/run_exports_cache/cache_key.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rattler_cache/src/run_exports_cache/cache_key.rs b/crates/rattler_cache/src/run_exports_cache/cache_key.rs index 8ad7347b54..5d1bbdfbfb 100644 --- a/crates/rattler_cache/src/run_exports_cache/cache_key.rs +++ b/crates/rattler_cache/src/run_exports_cache/cache_key.rs @@ -64,7 +64,7 @@ impl CacheKey { #[derive(Debug, thiserror::Error)] pub enum CacheKeyError { - #[error("Could not identify the archive type from the name: {0}")] + #[error("could not identify the archive type from the name: {0}")] InvalidArchiveIdentifier(String), } From 35f44a3496d9f8c025a5ad6b2f4e1a7be477ca78 Mon Sep 17 00:00:00 2001 From: nichmor Date: Mon, 17 Feb 2025 18:27:45 +0200 Subject: [PATCH 22/23] misc: remove some outdated comments --- .../src/run_exports_cache/cache_key.rs | 3 +-- .../src/run_exports_cache/mod.rs | 22 ++++++++++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/crates/rattler_cache/src/run_exports_cache/cache_key.rs b/crates/rattler_cache/src/run_exports_cache/cache_key.rs index 5d1bbdfbfb..a5383160ae 100644 --- a/crates/rattler_cache/src/run_exports_cache/cache_key.rs +++ b/crates/rattler_cache/src/run_exports_cache/cache_key.rs @@ -3,7 +3,6 @@ use rattler_digest::{Md5Hash, Sha256Hash}; use std::fmt::{Display, Formatter}; /// Provides a unique identifier for packages in the cache. -/// TODO: This could not be unique over multiple subdir. How to handle? #[derive(Debug, Hash, Clone, Eq, PartialEq)] pub struct CacheKey { pub(crate) name: String, @@ -47,7 +46,7 @@ impl CacheKey { } /// Try to create a new cache key from a package record and a filename. - pub fn new(record: &PackageRecord, filename: &str) -> Result { + pub fn create(record: &PackageRecord, filename: &str) -> Result { let archive_identifier = ArchiveIdentifier::try_from_filename(filename) .ok_or_else(|| CacheKeyError::InvalidArchiveIdentifier(filename.to_string()))?; diff --git a/crates/rattler_cache/src/run_exports_cache/mod.rs b/crates/rattler_cache/src/run_exports_cache/mod.rs index c398958114..627b0a7975 100644 --- a/crates/rattler_cache/src/run_exports_cache/mod.rs +++ b/crates/rattler_cache/src/run_exports_cache/mod.rs @@ -2,7 +2,12 @@ //! [`RunExportsCache`]. use std::{ - fmt::Debug, future::Future, io::Seek, path::{Path, PathBuf}, sync::Arc, time::{Duration, SystemTime} + fmt::Debug, + future::Future, + io::Seek, + path::{Path, PathBuf}, + sync::Arc, + time::{Duration, SystemTime}, }; use dashmap::DashMap; @@ -47,10 +52,7 @@ pub struct CacheEntry { impl CacheEntry { /// Create a new cache entry. pub(crate) fn new(run_exports: Option, path: PathBuf) -> Self { - Self { - run_exports, - path, - } + Self { run_exports, path } } /// Returns the `run_exports.json` of the package. @@ -422,7 +424,7 @@ mod test { .unwrap(), ); - let cache_key = CacheKey::new( + let cache_key = CacheKey::create( &pkg_record, "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", ) @@ -460,7 +462,7 @@ mod test { "hb9d3cd8_2".to_string(), ); - let cache_key = CacheKey::new(&pkg_record, "zlib-1.3.1-hb9d3cd8_2.conda").unwrap(); + let cache_key = CacheKey::create(&pkg_record, "zlib-1.3.1-hb9d3cd8_2.conda").unwrap(); // Get the package to the cache let cached_run_exports = cache @@ -548,7 +550,7 @@ mod test { let client = ClientBuilder::new(Client::default()).build(); - let cache_key = CacheKey::new(package_record, archive_name).unwrap(); + let cache_key = CacheKey::create(package_record, archive_name).unwrap(); // Do the first request without let result = cache @@ -634,7 +636,7 @@ mod test { let packages_dir = tempdir().unwrap(); let cache = RunExportsCache::new(packages_dir.path()); - let cache_key = CacheKey::new( + let cache_key = CacheKey::create( &pkg_record, "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", ) @@ -659,7 +661,7 @@ mod test { .unwrap(); pkg_record.sha256 = Some(new_sha); - let cache_key = CacheKey::new( + let cache_key = CacheKey::create( &pkg_record, "ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2", ) From bb02a0c4736b9e3a47c3549b372af040e7cba1c3 Mon Sep 17 00:00:00 2001 From: nichmor Date: Tue, 18 Feb 2025 16:06:47 +0200 Subject: [PATCH 23/23] misc: apply changes --- .../rattler_cache/src/run_exports_cache/download.rs | 10 +++++----- crates/rattler_package_streaming/src/seek.rs | 13 +++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/crates/rattler_cache/src/run_exports_cache/download.rs b/crates/rattler_cache/src/run_exports_cache/download.rs index 75f5858a1e..e9f8d4a619 100644 --- a/crates/rattler_cache/src/run_exports_cache/download.rs +++ b/crates/rattler_cache/src/run_exports_cache/download.rs @@ -25,11 +25,9 @@ pub(crate) async fn download( } let total_bytes = response.content_length(); + let (tmp_file_handle, tmp_path) = temp_file.into_parts(); // Convert the named temp file into a tokio file - let mut file = tokio::File::from_std(fs_err::File::from_parts( - temp_file.as_file().try_clone()?, - temp_file.path(), - )); + let mut file = tokio::File::from_std(fs_err::File::from_parts(tmp_file_handle, &tmp_path)); let mut stream = response.bytes_stream(); @@ -48,7 +46,9 @@ pub(crate) async fn download( file.rewind().await?; - Ok(temp_file) + let file_handle = file.into_parts().0.into_std().await; + + Ok(NamedTempFile::from_parts(file_handle, tmp_path)) } /// An error that can occur when downloading an archive. diff --git a/crates/rattler_package_streaming/src/seek.rs b/crates/rattler_package_streaming/src/seek.rs index a4cdd4f89b..6a1bd3d46a 100644 --- a/crates/rattler_package_streaming/src/seek.rs +++ b/crates/rattler_package_streaming/src/seek.rs @@ -85,20 +85,21 @@ fn get_file_from_archive( Err(ExtractError::MissingComponent) } -/// Read a package file content from archive -fn read_package_file_content<'a, P: PackageFile>( +/// Read a package file content from archive based on the path +fn read_package_file_content<'a>( file: impl Read + Seek + 'a, path: impl AsRef, + package_path: impl AsRef, ) -> Result, ExtractError> { match ArchiveType::try_from(&path).ok_or(ExtractError::UnsupportedArchiveType)? { ArchiveType::TarBz2 => { let mut archive = stream_tar_bz2(file); - let buf = get_file_from_archive(&mut archive, P::package_path())?; + let buf = get_file_from_archive(&mut archive, package_path.as_ref())?; Ok(buf) } ArchiveType::Conda => { let mut info_archive = stream_conda_info(file).unwrap(); - let buf = get_file_from_archive(&mut info_archive, P::package_path())?; + let buf = get_file_from_archive(&mut info_archive, package_path.as_ref())?; Ok(buf) } } @@ -120,7 +121,7 @@ fn read_package_file_content<'a, P: PackageFile>( pub fn read_package_file(path: impl AsRef) -> Result { // stream extract the file from a package let file = File::open(&path)?; - let content = read_package_file_content::

(&file, &path)?; + let content = read_package_file_content(&file, &path, P::package_path())?; P::from_str(&String::from_utf8_lossy(&content)) .map_err(|e| ExtractError::ArchiveMemberParseError(P::package_path().to_owned(), e)) @@ -132,7 +133,7 @@ pub fn extract_package_file<'a, P: PackageFile>( location: &Path, writer: &mut impl Write, ) -> Result<(), ExtractError> { - let content = read_package_file_content::

(reader, location)?; + let content = read_package_file_content(reader, location, P::package_path())?; writer.write_all(&content)?;