diff --git a/crates/install-wheel-rs/src/metadata.rs b/crates/install-wheel-rs/src/metadata.rs index da3622b35a55..5129ad097a54 100644 --- a/crates/install-wheel-rs/src/metadata.rs +++ b/crates/install-wheel-rs/src/metadata.rs @@ -72,6 +72,43 @@ pub fn find_archive_dist_info<'a, T: Copy>( Ok((payload, dist_info_prefix)) } +/// Returns `true` if the file is a `METADATA` file in a `.dist-info` directory that matches the +/// wheel filename. +pub fn is_metadata_entry(path: &str, filename: &WheelFilename) -> Result { + let Some((dist_info_dir, file)) = path.split_once('/') else { + return Ok(false); + }; + if file != "METADATA" { + return Ok(false); + } + let Some(dist_info_prefix) = dist_info_dir.strip_suffix(".dist-info") else { + return Ok(false); + }; + + // Like `pip`, validate that the `.dist-info` directory is prefixed with the canonical + // package name, but only warn if the version is not the normalized version. + let Some((name, version)) = dist_info_prefix.rsplit_once('-') else { + return Err(Error::MissingDistInfoSegments(dist_info_prefix.to_string())); + }; + if PackageName::from_str(name)? != filename.name { + return Err(Error::MissingDistInfoPackageName( + dist_info_prefix.to_string(), + filename.name.to_string(), + )); + } + if !Version::from_str(version).is_ok_and(|version| version == filename.version) { + warn!( + "{}", + Error::MissingDistInfoVersion( + dist_info_prefix.to_string(), + filename.version.to_string(), + ) + ); + } + + Ok(true) +} + /// Given an archive, read the `METADATA` from the `.dist-info` directory. pub fn read_archive_metadata( filename: &WheelFilename, diff --git a/crates/uv-client/src/error.rs b/crates/uv-client/src/error.rs index 64258a588dc9..afa86b54478f 100644 --- a/crates/uv-client/src/error.rs +++ b/crates/uv-client/src/error.rs @@ -173,6 +173,10 @@ pub enum ErrorKind { #[source] Box, ), + /// The metadata file was not found in the wheel. + #[error("Metadata file `{0}` was not found in {1}")] + MetadataNotFound(WheelFilename, String), + /// An error that happened while making a request or in a reqwest middleware. #[error(transparent)] WrappedReqwestError(#[from] WrappedReqwestError), diff --git a/crates/uv-client/src/registry_client.rs b/crates/uv-client/src/registry_client.rs index ba5882a8ed4a..3670ff78b58e 100644 --- a/crates/uv-client/src/registry_client.rs +++ b/crates/uv-client/src/registry_client.rs @@ -4,17 +4,18 @@ use std::path::PathBuf; use std::str::FromStr; use async_http_range_reader::AsyncHttpRangeReader; -use futures::FutureExt; +use futures::{FutureExt, TryStreamExt}; use http::HeaderMap; use reqwest::{Client, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use tokio_util::compat::TokioAsyncReadCompatExt; +use tokio::io::AsyncReadExt; +use tokio_util::compat::{FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt}; use tracing::{info_span, instrument, trace, warn, Instrument}; use url::Url; use distribution_filename::{DistFilename, SourceDistFilename, WheelFilename}; use distribution_types::{BuiltDist, File, FileLocation, IndexUrl, IndexUrls, Name}; -use install_wheel_rs::metadata::find_archive_dist_info; +use install_wheel_rs::metadata::{find_archive_dist_info, is_metadata_entry}; use pep440_rs::Version; use pep508_rs::MarkerEnvironment; use platform_tags::Platform; @@ -593,7 +594,8 @@ impl RegistryClient { .instrument(info_span!("read_metadata_range_request", wheel = %filename)) }; - self.cached_client() + let result = self + .cached_client() .get_serde( req, &cache_entry, @@ -601,8 +603,67 @@ impl RegistryClient { read_metadata_range_request, ) .await + .map_err(crate::Error::from); + + match result { + Ok(metadata) => return Ok(metadata), + Err(err) => { + if err.is_http_range_requests_unsupported() { + // The range request version failed. Fall back to streaming the file to search + // for the METADATA file. + warn!("Range requests not supported for {filename}; streaming wheel"); + } else { + return Err(err); + } + } + }; + + // Create a request to stream the file. + let req = self + .uncached_client() + .get(url.clone()) + .header( + // `reqwest` defaults to accepting compressed responses. + // Specify identity encoding to get consistent .whl downloading + // behavior from servers. ref: https://github.com/pypa/pip/pull/1688 + "accept-encoding", + reqwest::header::HeaderValue::from_static("identity"), + ) + .build() + .map_err(ErrorKind::from)?; + + // Stream the file, searching for the METADATA. + let read_metadata_stream = |response: Response| { + async { + let reader = response + .bytes_stream() + .map_err(|err| self.handle_response_errors(err)) + .into_async_read(); + + read_metadata_async_stream(filename, url.to_string(), reader).await + } + .instrument(info_span!("read_metadata_stream", wheel = %filename)) + }; + + self.cached_client() + .get_serde(req, &cache_entry, cache_control, read_metadata_stream) + .await .map_err(crate::Error::from) } + + /// Handle a specific `reqwest` error, and convert it to [`io::Error`]. + fn handle_response_errors(&self, err: reqwest::Error) -> std::io::Error { + if err.is_timeout() { + std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!( + "Failed to download distribution due to network timeout. Try increasing UV_HTTP_TIMEOUT (current value: {}s).", self.timeout() + ), + ) + } else { + std::io::Error::new(std::io::ErrorKind::Other, err) + } + } } /// Read a wheel's `METADATA` file from a zip file. @@ -643,6 +704,50 @@ async fn read_metadata_async_seek( Ok(metadata) } +/// Like [`read_metadata_async_seek`], but doesn't use seek. +async fn read_metadata_async_stream( + filename: &WheelFilename, + debug_source: String, + reader: R, +) -> Result { + let reader = futures::io::BufReader::with_capacity(128 * 1024, reader); + let mut zip = async_zip::base::read::stream::ZipFileReader::new(reader); + + while let Some(mut entry) = zip + .next_with_entry() + .await + .map_err(|err| ErrorKind::Zip(filename.clone(), err))? + { + // Find the `METADATA` entry. + let path = entry + .reader() + .entry() + .filename() + .as_str() + .map_err(|err| ErrorKind::Zip(filename.clone(), err))?; + + if is_metadata_entry(path, filename).map_err(ErrorKind::DistInfo)? { + let mut reader = entry.reader_mut().compat(); + let mut contents = Vec::new(); + reader.read_to_end(&mut contents).await.unwrap(); + + let metadata = Metadata23::parse_metadata(&contents).map_err(|err| { + ErrorKind::MetadataParseError(filename.clone(), debug_source, Box::new(err)) + })?; + return Ok(metadata); + } + + // Close current file to get access to the next one. See docs: + // https://docs.rs/async_zip/0.0.16/async_zip/base/read/stream/ + zip = entry + .skip() + .await + .map_err(|err| ErrorKind::Zip(filename.clone(), err))?; + } + + Err(ErrorKind::MetadataNotFound(filename.clone(), debug_source).into()) +} + #[derive( Default, Debug, Serialize, Deserialize, rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, )] diff --git a/crates/uv-distribution/src/distribution_database.rs b/crates/uv-distribution/src/distribution_database.rs index f8e1e848a82e..f8ddcd296b8a 100644 --- a/crates/uv-distribution/src/distribution_database.rs +++ b/crates/uv-distribution/src/distribution_database.rs @@ -198,31 +198,6 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { hashes: archive.hashes, filename: wheel.filename.clone(), }), - Err(Error::Client(err)) if err.is_http_streaming_unsupported() => { - warn!( - "Streaming unsupported for {dist}; downloading wheel to disk ({err})" - ); - - // If the request failed because streaming is unsupported, download the - // wheel directly. - let archive = self - .download_wheel( - url, - &wheel.filename, - wheel.file.size, - &wheel_entry, - dist, - hashes, - ) - .await?; - - Ok(LocalWheel { - dist: Dist::Built(dist.clone()), - archive: self.build_context.cache().archive(&archive.id), - hashes: archive.hashes, - filename: wheel.filename.clone(), - }) - } Err(Error::Extract(err)) => { if err.is_http_streaming_unsupported() { warn!( @@ -308,36 +283,6 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { filename: wheel.filename.clone(), }) } - Err(Error::Extract(err)) => { - if err.is_http_streaming_unsupported() { - warn!( - "Streaming unsupported for {dist}; downloading wheel to disk ({err})" - ); - } else if err.is_http_streaming_failed() { - warn!("Streaming failed for {dist}; downloading wheel to disk ({err})"); - } else { - return Err(Error::Extract(err)); - } - - // If the request failed because streaming is unsupported, download the - // wheel directly. - let archive = self - .download_wheel( - wheel.url.raw().clone(), - &wheel.filename, - None, - &wheel_entry, - dist, - hashes, - ) - .await?; - Ok(LocalWheel { - dist: Dist::Built(dist.clone()), - archive: self.build_context.cache().archive(&archive.id), - hashes: archive.hashes, - filename: wheel.filename.clone(), - }) - } Err(err) => Err(err), } } @@ -443,11 +388,11 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { match result { Ok(metadata) => Ok(ArchiveMetadata::from_metadata23(metadata)), - Err(err) if err.is_http_range_requests_unsupported() => { - warn!("Range requests unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"); + Err(err) if err.is_http_streaming_unsupported() => { + warn!("Streaming unsupported when fetching metadata for {dist}; downloading wheel directly ({err})"); - // If the request failed due to an error that could be resolved by downloading the - // wheel directly, try that. + // If the request failed due to an error that could be resolved by + // downloading the wheel directly, try that. let wheel = self.get_wheel(dist, hashes).await?; let metadata = wheel.metadata()?; let hashes = wheel.hashes; diff --git a/crates/uv/tests/pip_sync.rs b/crates/uv/tests/pip_sync.rs index eb640a74cdac..f39e251f1092 100644 --- a/crates/uv/tests/pip_sync.rs +++ b/crates/uv/tests/pip_sync.rs @@ -3340,6 +3340,7 @@ fn no_stream() -> Result<()> { ----- stderr ----- Resolved 1 package in [TIME] + Prepared 1 package in [TIME] Installed 1 package in [TIME] + hashb-foxglove-protocolbuffers-python==25.3.0.1.20240226043130+465630478360 "###