Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion crates/uv-bin-install/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTimeError};

use futures::{StreamExt, TryStreamExt};
use reqwest_retry::policies::ExponentialBackoff;
Expand Down Expand Up @@ -248,11 +249,16 @@ pub enum Error {
#[error("Failed to detect platform")]
Platform(#[from] uv_platform::Error),

#[error("Request failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
#[error(
"Request failed after {retries} {subject} in {duration:.1}s",
subject = if *retries > 1 { "retries" } else { "retry" },
duration = duration.as_secs_f32()
)]
RetriedError {
#[source]
err: Box<Self>,
retries: u32,
duration: Duration,
},

#[error("Failed to fetch version manifest from: {url}")]
Expand Down Expand Up @@ -287,6 +293,9 @@ pub enum Error {

#[error("Unsupported archive format: {0}")]
UnsupportedArchiveFormat(String),

#[error(transparent)]
SystemTime(#[from] SystemTimeError),
}

impl Error {
Expand Down Expand Up @@ -352,6 +361,7 @@ pub async fn find_matching_version(
Err(Error::RetriedError {
err: Box::new(err),
retries: retry_state.total_retries(),
duration: retry_state.duration()?,
})
} else {
Err(err)
Expand Down Expand Up @@ -628,6 +638,7 @@ async fn download_and_unpack_with_retry(
Err(Error::RetriedError {
err: Box::new(err),
retries: retry_state.total_retries(),
duration: retry_state.duration()?,
})
} else {
Err(err)
Expand Down Expand Up @@ -678,6 +689,8 @@ async fn download_and_unpack(
return Err(Error::RetriedError {
err: Box::new(err),
retries,
// This value is overwritten in `download_and_unpack_with_retry`.
duration: Duration::default(),
});
}
return Err(err);
Expand Down
7 changes: 6 additions & 1 deletion crates/uv-client/src/base_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::fmt::Write;
use std::num::ParseIntError;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time::{Duration, SystemTime, SystemTimeError};
use std::{env, io, iter};

use anyhow::anyhow;
Expand Down Expand Up @@ -1304,6 +1304,11 @@ impl RetryState {
self.total_retries
}

/// The total duration from the first request to the (failure) of the last request.
pub fn duration(&self) -> Result<Duration, SystemTimeError> {
self.start_time.elapsed()
}

/// Determines whether request should be retried.
///
/// Takes the number of retries from nested layers associated with the specific `err` type as
Expand Down
55 changes: 44 additions & 11 deletions crates/uv-client/src/cached_client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::{Duration, Instant};
use std::{borrow::Cow, path::Path};

use futures::FutureExt;
Expand Down Expand Up @@ -105,15 +106,27 @@ pub enum CachedClientError<CallbackError: std::error::Error + 'static> {
Client(Error),
/// Track retries before a callback explicitly, as we can't attach them to the callback error
/// type.
Callback { retries: u32, err: CallbackError },
Callback {
retries: u32,
err: CallbackError,
duration: Duration,
},
}

impl<CallbackError: std::error::Error + 'static> CachedClientError<CallbackError> {
/// Attach the combined number of retries to the error context, discarding the previous count.
fn with_retries(self, retries: u32) -> Self {
match self {
Self::Client(err) => Self::Client(err.with_retries(retries)),
Self::Callback { retries: _, err } => Self::Callback { retries, err },
Self::Callback {
retries: _,
err,
duration,
} => Self::Callback {
retries,
err,
duration,
},
}
}

Expand Down Expand Up @@ -151,9 +164,11 @@ impl<E: Into<Self> + std::error::Error + 'static> From<CachedClientError<E>> for
fn from(error: CachedClientError<E>) -> Self {
match error {
CachedClientError::Client(error) => error,
CachedClientError::Callback { retries, err } => {
Self::new(err.into().into_kind(), retries)
}
CachedClientError::Callback {
retries,
err,
duration,
} => Self::new(err.into().into_kind(), retries, duration),
}
}
}
Expand Down Expand Up @@ -261,6 +276,7 @@ impl CachedClient {
response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>> {
let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
let start = Instant::now();
let cached_response = if let Some(cached) = Self::read_cache(cache_entry).await {
self.send_cached(req, cache_control, cached)
.boxed_local()
Expand Down Expand Up @@ -339,6 +355,7 @@ impl CachedClient {
self.run_response_callback(
cache_entry,
cache_policy,
start,
response,
response_callback,
)
Expand All @@ -360,10 +377,11 @@ impl CachedClient {
cache_control: CacheControl<'_>,
response_callback: Callback,
) -> Result<Payload, CachedClientError<CallBackError>> {
let start = Instant::now();
let (response, cache_policy) = self.fresh_request(req, cache_control).await?;

let payload = self
.run_response_callback(cache_entry, cache_policy, response, async |resp| {
.run_response_callback(cache_entry, cache_policy, start, response, async |resp| {
let payload = response_callback(resp).await?;
Ok(SerdeCacheable { inner: payload })
})
Expand All @@ -384,9 +402,16 @@ impl CachedClient {
response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>> {
let _ = fs_err::tokio::remove_file(&cache_entry.path()).await;
let start = Instant::now();
let (response, cache_policy) = self.fresh_request(req, cache_control).await?;
self.run_response_callback(cache_entry, cache_policy, response, response_callback)
.await
self.run_response_callback(
cache_entry,
cache_policy,
start,
response,
response_callback,
)
.await
}

async fn run_response_callback<
Expand All @@ -397,6 +422,7 @@ impl CachedClient {
&self,
cache_entry: &CacheEntry,
cache_policy: Option<Box<CachePolicy>>,
start: Instant,
response: Response,
response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>> {
Expand All @@ -410,7 +436,11 @@ impl CachedClient {
let data = response_callback(response)
.boxed_local()
.await
.map_err(|err| CachedClientError::Callback { retries, err })?;
.map_err(|err| CachedClientError::Callback {
retries,
err,
duration: start.elapsed(),
})?;
let Some(cache_policy) = cache_policy else {
return Ok(data.into_target());
};
Expand Down Expand Up @@ -516,12 +546,13 @@ impl CachedClient {
) -> Result<CachedResponse, Error> {
let url = DisplaySafeUrl::from_url(req.url().clone());
debug!("Sending revalidation request for: {url}");
let start = Instant::now();
let mut response = self
.0
.execute(req)
.instrument(info_span!("revalidation_request", url = url.as_str()))
.await
.map_err(|err| Error::from_reqwest_middleware(url.clone(), err))?;
.map_err(|err| Error::from_reqwest_middleware(url.clone(), err, start))?;
trace!(
"Received response for revalidation request with status {} for: {}",
response.status(),
Expand Down Expand Up @@ -581,11 +612,12 @@ impl CachedClient {
let url = DisplaySafeUrl::from_url(req.url().clone());
debug!("Sending fresh {} request for: {}", req.method(), url);
let cache_policy_builder = CachePolicyBuilder::new(&req);
let start = Instant::now();
let mut response = self
.0
.execute(req)
.await
.map_err(|err| Error::from_reqwest_middleware(url.clone(), err))?;
.map_err(|err| Error::from_reqwest_middleware(url.clone(), err, start))?;
trace!(
"Received response for fresh request with status {} for: {}",
response.status(),
Expand All @@ -611,6 +643,7 @@ impl CachedClient {
return Err(Error::new(
ErrorKind::from_reqwest_with_problem_details(url, status_error, problem_details),
retry_count.unwrap_or_default(),
start.elapsed(),
));
}

Expand Down
26 changes: 20 additions & 6 deletions crates/uv-client/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::path::PathBuf;
use std::time::{Duration, Instant};

use async_http_range_reader::AsyncHttpRangeReaderError;
use async_zip::error::ZipError;
use reqwest::Response;
use serde::Deserialize;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::path::PathBuf;
use tracing::warn;

use uv_cache::Error as CacheError;
Expand Down Expand Up @@ -114,16 +116,18 @@ impl ProblemDetails {
pub struct Error {
kind: Box<ErrorKind>,
retries: u32,
duration: Duration,
}

impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.retries > 0 {
write!(
f,
"Request failed after {retries} {subject}",
"Request failed after {retries} {subject} in {duration:.1}s",
retries = self.retries,
subject = if self.retries > 1 { "retries" } else { "retry" }
subject = if self.retries > 1 { "retries" } else { "retry" },
duration = self.duration.as_secs_f32(),
)
} else {
Display::fmt(&self.kind, f)
Expand All @@ -143,10 +147,11 @@ impl std::error::Error for Error {

impl Error {
/// Create a new [`Error`] with the given [`ErrorKind`] and number of retries.
pub fn new(kind: ErrorKind, retries: u32) -> Self {
pub fn new(kind: ErrorKind, retries: u32, duration: Duration) -> Self {
Self {
kind: Box::new(kind),
retries,
duration,
}
}

Expand All @@ -155,6 +160,12 @@ impl Error {
self.retries
}

/// Return the time taken for network requests, including retries, backoff and jitter,
/// before this error was returned.
pub fn duration(&self) -> Duration {
self.duration
}

/// Convert this error into an [`ErrorKind`].
pub fn into_kind(self) -> ErrorKind {
*self.kind
Expand Down Expand Up @@ -189,6 +200,7 @@ impl Error {
pub(crate) fn from_reqwest_middleware(
url: DisplaySafeUrl,
err: reqwest_middleware::Error,
start: Instant,
) -> Self {
if let reqwest_middleware::Error::Middleware(ref underlying) = err {
if let Some(offline_err) = underlying.downcast_ref::<OfflineError>() {
Expand All @@ -201,6 +213,7 @@ impl Error {
return Self::new(
ErrorKind::WrappedReqwestError(url, WrappedReqwestError::from(err)),
retries,
start.elapsed(),
);
}
}
Expand Down Expand Up @@ -316,6 +329,7 @@ impl From<ErrorKind> for Error {
Self {
kind: Box::new(kind),
retries: 0,
duration: Duration::default(),
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions crates/uv-python/src/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::time::{Duration, Instant, SystemTimeError};
use std::{env, io};

use futures::TryStreamExt;
Expand Down Expand Up @@ -54,11 +55,16 @@ pub enum Error {
TooManyParts(String),
#[error("Failed to download {0}")]
NetworkError(DisplaySafeUrl, #[source] WrappedReqwestError),
#[error("Request failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
#[error(
"Request failed after {retries} {subject} in {duration:.1}s",
subject = if *retries > 1 { "retries" } else { "retry" },
duration = duration.as_secs_f32()
)]
NetworkErrorWithRetries {
#[source]
err: Box<Self>,
retries: u32,
duration: Duration,
},
#[error("Failed to download {0}")]
NetworkMiddlewareError(DisplaySafeUrl, #[source] anyhow::Error),
Expand Down Expand Up @@ -116,6 +122,8 @@ pub enum Error {
BuildVersion(#[from] BuildVersionError),
#[error("No download URL found for Python")]
NoPythonDownloadUrlFound,
#[error(transparent)]
SystemTime(#[from] SystemTimeError),
}

impl Error {
Expand Down Expand Up @@ -1183,6 +1191,7 @@ impl ManagedPythonDownload {
Err(Error::NetworkErrorWithRetries {
err: Box::new(err),
retries: retry_state.total_retries(),
duration: retry_state.duration()?,
})
} else {
Err(err)
Expand Down Expand Up @@ -1698,12 +1707,14 @@ impl Error {
url: DisplaySafeUrl,
err: reqwest::Error,
retries: Option<u32>,
start: Instant,
) -> Self {
let err = Self::NetworkError(url, WrappedReqwestError::from(err));
if let Some(retries) = retries {
Self::NetworkErrorWithRetries {
err: Box::new(err),
retries,
duration: start.elapsed(),
}
} else {
err
Expand Down Expand Up @@ -1815,6 +1826,7 @@ async fn read_url(

Ok((Either::Left(reader), Some(size)))
} else {
let start = Instant::now();
let response = client
.for_host(url)
.get(Url::from(url.clone()))
Expand All @@ -1830,7 +1842,7 @@ async fn read_url(
// Check the status code.
let response = response
.error_for_status()
.map_err(|err| Error::from_reqwest(url.clone(), err, retry_count))?;
.map_err(|err| Error::from_reqwest(url.clone(), err, retry_count, start))?;

let size = response.content_length();
let stream = response
Expand Down
Loading
Loading