diff --git a/Cargo.lock b/Cargo.lock
index 8340d9b977dad..fb8ce7f1e1e4f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5606,7 +5606,6 @@ dependencies = [
  "thiserror 2.0.17",
  "tokio",
  "tokio-util",
- "tracing",
  "url",
  "uv-cache",
  "uv-client",
diff --git a/crates/uv-bin-install/Cargo.toml b/crates/uv-bin-install/Cargo.toml
index ed92c8c4e43fc..b9db98653647d 100644
--- a/crates/uv-bin-install/Cargo.toml
+++ b/crates/uv-bin-install/Cargo.toml
@@ -33,5 +33,4 @@ tempfile = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tokio-util = { workspace = true }
-tracing = { workspace = true }
 url = { workspace = true }
diff --git a/crates/uv-bin-install/src/lib.rs b/crates/uv-bin-install/src/lib.rs
index dd3107aa3fe3b..b73feb07947c7 100644
--- a/crates/uv-bin-install/src/lib.rs
+++ b/crates/uv-bin-install/src/lib.rs
@@ -6,21 +6,18 @@ use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::{Context, Poll};
-use std::time::{Duration, SystemTime};
 
 use futures::TryStreamExt;
-use reqwest_retry::RetryPolicy;
 use reqwest_retry::policies::ExponentialBackoff;
 use std::fmt;
 use thiserror::Error;
 use tokio::io::{AsyncRead, ReadBuf};
 use tokio_util::compat::FuturesAsyncReadCompatExt;
-use tracing::debug;
 use url::Url;
 use uv_distribution_filename::SourceDistExtension;
 
 use uv_cache::{Cache, CacheBucket, CacheEntry, Error as CacheError};
-use uv_client::{BaseClient, is_transient_network_error};
+use uv_client::{BaseClient, RetryState};
 use uv_extract::{Error as ExtractError, stream};
 use uv_pep440::Version;
 use uv_platform::Platform;
@@ -141,21 +138,23 @@ pub enum Error {
     #[error("Failed to detect platform")]
     Platform(#[from] uv_platform::Error),
 
-    #[error("Attempt failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
+    #[error("Request failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
     RetriedError {
         #[source]
         err: Box<Error>,
         retries: u32,
     },
 }
 
 impl Error {
-    /// Return the number of attempts that were made to complete this request before this error was
-    /// returned. Note that e.g. 3 retries equates to 4 attempts.
-    fn attempts(&self) -> u32 {
+    /// Return the number of retries that were made to complete this request before this error was
+    /// returned.
+    ///
+    /// Note that e.g. 3 retries equates to 4 attempts.
+    fn retries(&self) -> u32 {
         if let Self::RetriedError { retries, .. } = self {
-            return retries + 1;
+            return *retries;
         }
-        1
+        0
     }
 }
@@ -242,9 +241,7 @@ async fn download_and_unpack_with_retry(
     download_url: &Url,
     cache_entry: &CacheEntry,
 ) -> Result<PathBuf, Error> {
-    let mut total_attempts = 0;
-    let mut retried_here = false;
-    let start_time = SystemTime::now();
+    let mut retry_state = RetryState::start(*retry_policy, download_url.clone());
 
     loop {
         let result = download_and_unpack(
@@ -260,40 +257,23 @@ async fn download_and_unpack_with_retry(
         )
         .await;
 
-        let result = match result {
-            Ok(path) => Ok(path),
+        match result {
+            Ok(path) => return Ok(path),
             Err(err) => {
-                total_attempts += err.attempts();
-                let past_retries = total_attempts - 1;
-
-                if is_transient_network_error(&err) {
-                    let retry_decision = retry_policy.should_retry(start_time, past_retries);
-                    if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
-                        debug!(
-                            "Transient failure while installing {} {}; retrying...",
-                            binary.name(),
-                            version
-                        );
-                        let duration = execute_after
-                            .duration_since(SystemTime::now())
-                            .unwrap_or_else(|_| Duration::default());
-                        tokio::time::sleep(duration).await;
-                        retried_here = true;
-                        continue;
-                    }
+                if let Some(backoff) = retry_state.should_retry(&err, err.retries()) {
+                    retry_state.sleep_backoff(backoff).await;
+                    continue;
                 }
-
-                if retried_here {
+                return if retry_state.total_retries() > 0 {
                     Err(Error::RetriedError {
                         err: Box::new(err),
-                        retries: past_retries,
+                        retries: retry_state.total_retries(),
                     })
                 } else {
                     Err(err)
-                }
+                };
             }
-        };
-        return result;
+        }
     }
 }
diff --git a/crates/uv-client/src/base_client.rs b/crates/uv-client/src/base_client.rs
index dd60b65a940da..00ab7d30e250c 100644
--- a/crates/uv-client/src/base_client.rs
+++ b/crates/uv-client/src/base_client.rs
@@ -4,7 +4,7 @@ use std::fmt::Write;
 use std::num::ParseIntError;
 use std::path::Path;
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
 use std::{env, io, iter};
 
 use anyhow::anyhow;
@@ -20,7 +20,7 @@ use reqwest::{Client, ClientBuilder, IntoUrl, Proxy, Request, Response, multipart::Form};
 use reqwest_middleware::{ClientWithMiddleware, Middleware};
 use reqwest_retry::policies::ExponentialBackoff;
 use reqwest_retry::{
-    RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_error,
+    RetryPolicy, RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_error,
     default_on_request_success,
 };
 use thiserror::Error;
@@ -1143,8 +1143,77 @@ fn retryable_on_request_failure(err: &(dyn Error + 'static)) -> Option<Retryable> {
 }
 
-/// Whether the error is a transient network error.
-pub fn is_transient_network_error(err: &(dyn Error + 'static)) -> bool {
-    retryable_on_request_failure(err) == Some(Retryable::Transient)
+/// Per-request retry state and policy.
+pub struct RetryState {
+    retry_policy: ExponentialBackoff,
+    start_time: SystemTime,
+    total_retries: u32,
+    url: DisplaySafeUrl,
+}
+
+impl RetryState {
+    /// Initialize the [`RetryState`] and start the backoff timer.
+    pub fn start(retry_policy: ExponentialBackoff, url: impl Into<DisplaySafeUrl>) -> Self {
+        Self {
+            retry_policy,
+            start_time: SystemTime::now(),
+            total_retries: 0,
+            url: url.into(),
+        }
+    }
+
+    /// The number of retries across all requests.
+    ///
+    /// After a failed retryable request, this equals the maximum number of retries.
+    pub fn total_retries(&self) -> u32 {
+        self.total_retries
+    }
+
+    /// Determines whether the request should be retried.
+    ///
+    /// Takes the number of retries from nested layers associated with the specific `err` type as
+    /// `error_retries`.
+    ///
+    /// Returns the backoff duration if the request should be retried.
+    #[must_use]
+    pub fn should_retry(
+        &mut self,
+        err: &(dyn Error + 'static),
+        error_retries: u32,
+    ) -> Option<Duration> {
+        // If the middleware performed any retries, consider them in our budget.
+        self.total_retries += error_retries;
+        match retryable_on_request_failure(err) {
+            Some(Retryable::Transient) => {
+                let retry_decision = self
+                    .retry_policy
+                    .should_retry(self.start_time, self.total_retries);
+                if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
+                    let duration = execute_after
+                        .duration_since(SystemTime::now())
+                        .unwrap_or_else(|_| Duration::default());
+
+                    self.total_retries += 1;
+                    return Some(duration);
+                }
+
+                None
+            }
+            Some(Retryable::Fatal) | None => None,
+        }
+    }
+
+    /// Wait before retrying the request.
+    pub async fn sleep_backoff(&self, duration: Duration) {
+        debug!(
+            "Transient failure while handling response from {}; retrying after {:.1}s...",
+            self.url,
+            duration.as_secs_f32(),
+        );
+        // TODO(konsti): Should we show a spinner plus a message in the CLI while
+        // waiting?
+        tokio::time::sleep(duration).await;
+    }
 }
 
 /// Whether the error is a status code error that is retryable.
@@ -1414,7 +1483,7 @@ mod tests {
 
         let uv_retry = match response.error_for_status() {
             Ok(_) => false,
-            Err(err) => is_transient_network_error(&err),
+            Err(err) => retryable_on_request_failure(&err) == Some(Retryable::Transient),
         };
 
         // Ensure we're retrying the same status code as the reqwest_retry crate. We may choose
diff --git a/crates/uv-client/src/cached_client.rs b/crates/uv-client/src/cached_client.rs
index ff14437f7efec..640567dab10fb 100644
--- a/crates/uv-client/src/cached_client.rs
+++ b/crates/uv-client/src/cached_client.rs
@@ -1,9 +1,7 @@
-use std::time::{Duration, SystemTime};
 use std::{borrow::Cow, path::Path};
 
 use futures::FutureExt;
 use reqwest::{Request, Response};
-use reqwest_retry::RetryPolicy;
 use rkyv::util::AlignedVec;
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
@@ -14,7 +12,7 @@ use uv_fs::write_atomic;
 use uv_redacted::DisplaySafeUrl;
 
 use crate::BaseClient;
-use crate::base_client::is_transient_network_error;
+use crate::base_client::RetryState;
 use crate::error::ProblemDetails;
 use crate::{
     Error, ErrorKind,
@@ -692,46 +690,21 @@ impl CachedClient {
         cache_control: CacheControl<'_>,
         response_callback: Callback,
     ) -> Result<Payload, CachedClientError<CallBackError>> {
-        let mut total_retries = 0;
-        let start_time = SystemTime::now();
-        let retry_policy = self.uncached().retry_policy();
+        let mut retry_state = RetryState::start(self.uncached().retry_policy(), req.url().clone());
         loop {
             let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
             let result = self
                 .get_cacheable(fresh_req, cache_entry, cache_control, &response_callback)
                 .await;
 
             match result {
                 Ok(ok) => return Ok(ok),
                 Err(err) => {
-                    // Check if the middleware already performed retries
-                    total_retries += err.retries();
-                    if is_transient_network_error(err.error()) {
-                        // If middleware already retried, consider that in our retry budget
-                        let retry_decision = retry_policy.should_retry(start_time, total_retries);
-                        if let reqwest_retry::RetryDecision::Retry { execute_after } =
-                            retry_decision
-                        {
-                            let duration = execute_after
-                                .duration_since(SystemTime::now())
-                                .unwrap_or_else(|_| Duration::default());
-
-                            debug!(
-                                "Transient failure while handling response from {}; retrying after {:.1}s...",
-                                req.url(),
-                                duration.as_secs_f32(),
-                            );
-                            tokio::time::sleep(duration).await;
-                            total_retries += 1;
-                            continue;
-                        }
+                    if let Some(backoff) = retry_state.should_retry(err.error(), err.retries()) {
+                        retry_state.sleep_backoff(backoff).await;
+                        continue;
                     }
-
-                    return if total_retries > 0 {
-                        Err(err.with_retries(total_retries))
-                    } else {
-                        Err(err)
-                    };
+                    return Err(err.with_retries(retry_state.total_retries()));
                 }
             }
         }
@@ -751,47 +724,23 @@ impl CachedClient {
         cache_control: CacheControl<'_>,
         response_callback: Callback,
     ) -> Result<Payload, CachedClientError<CallBackError>> {
-        let mut past_retries = 0;
-        let start_time = SystemTime::now();
-        let retry_policy = self.uncached().retry_policy();
+        let mut retry_state = RetryState::start(self.uncached().retry_policy(), req.url().clone());
         loop {
             let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
             let result = self
                 .skip_cache(fresh_req, cache_entry, cache_control, &response_callback)
                 .await;
 
-            // Check if the middleware already performed retries
-            let middleware_retries = match &result {
-                Err(err) => err.retries(),
-                _ => 0,
-            };
-
-            if result
-                .as_ref()
-                .is_err_and(|err| is_transient_network_error(err.error()))
-            {
-                let total_retries = past_retries + middleware_retries;
-                let retry_decision = retry_policy.should_retry(start_time, total_retries);
-                if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
-                    let duration = execute_after
-                        .duration_since(SystemTime::now())
-                        .unwrap_or_else(|_| Duration::default());
-                    debug!(
-                        "Transient failure while handling response from {}; retrying after {}s...",
-                        req.url(),
-                        duration.as_secs(),
-                    );
-                    tokio::time::sleep(duration).await;
-                    past_retries += 1;
-                    continue;
+            match result {
+                Ok(ok) => return Ok(ok),
+                Err(err) => {
+                    if let Some(backoff) = retry_state.should_retry(err.error(), err.retries()) {
+                        retry_state.sleep_backoff(backoff).await;
+                        continue;
+                    }
+                    return Err(err.with_retries(retry_state.total_retries()));
                 }
             }
-
-            if past_retries > 0 {
-                return result.map_err(|err| err.with_retries(past_retries));
-            }
-
-            return result;
         }
     }
 }
diff --git a/crates/uv-client/src/lib.rs b/crates/uv-client/src/lib.rs
index d74700e04be82..220e3f35ab990 100644
--- a/crates/uv-client/src/lib.rs
+++ b/crates/uv-client/src/lib.rs
@@ -1,7 +1,7 @@
 pub use base_client::{
     AuthIntegration, BaseClient, BaseClientBuilder, DEFAULT_MAX_REDIRECTS, DEFAULT_RETRIES,
     ExtraMiddleware, RedirectClientWithMiddleware, RedirectPolicy, RequestBuilder,
-    RetryParsingError, UvRetryableStrategy, is_transient_network_error,
+    RetryParsingError, RetryState, UvRetryableStrategy,
 };
 pub use cached_client::{CacheControl, CachedClient, CachedClientError, DataWithCachePolicy};
 pub use error::{Error, ErrorKind, WrappedReqwestError};
diff --git a/crates/uv-publish/src/lib.rs b/crates/uv-publish/src/lib.rs
index b743033bcc672..78879195bc03c 100644
--- a/crates/uv-publish/src/lib.rs
+++ b/crates/uv-publish/src/lib.rs
@@ -3,7 +3,6 @@ mod trusted_publishing;
 use std::collections::BTreeSet;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use std::time::{Duration, SystemTime};
 use std::{fmt, io};
 
 use fs_err::tokio::File;
@@ -13,7 +12,7 @@ use itertools::Itertools;
 use reqwest::header::{AUTHORIZATION, LOCATION, ToStrError};
 use reqwest::multipart::Part;
 use reqwest::{Body, Response, StatusCode};
-use reqwest_retry::RetryPolicy;
+use reqwest_retry::RetryError;
 use reqwest_retry::policies::ExponentialBackoff;
 use rustc_hash::FxHashMap;
 use serde::Deserialize;
@@ -28,7 +27,7 @@ use uv_auth::{Credentials, PyxTokenStore, Realm};
 use uv_cache::{Cache, Refresh};
 use uv_client::{
     BaseClient, DEFAULT_MAX_REDIRECTS, MetadataFormat, OwnedArchive, RegistryClientBuilder,
-    RequestBuilder, RetryParsingError, is_transient_network_error,
+    RequestBuilder, RetryParsingError, RetryState,
 };
 use uv_configuration::{KeyringProviderType, TrustedPublishing};
 use uv_distribution_filename::{DistFilename, SourceDistExtension, SourceDistFilename};
@@ -474,11 +473,10 @@ pub async fn upload(
     download_concurrency: &Semaphore,
     reporter: Arc<impl Reporter>,
 ) -> Result<bool, PublishError> {
-    let mut n_past_retries = 0;
     let mut n_past_redirections = 0;
     let max_redirects = DEFAULT_MAX_REDIRECTS;
-    let start_time = SystemTime::now();
     let mut current_registry = registry.clone();
+    let mut retry_state = RetryState::start(retry_policy, registry.clone());
 
     loop {
         let (request, idx) = build_upload_request(
@@ -553,23 +551,17 @@ pub async fn upload(
                 response
             }
             Err(err) => {
-                if is_transient_network_error(&err) {
-                    let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
-                    if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
-                        let duration = execute_after
-                            .duration_since(SystemTime::now())
-                            .unwrap_or_else(|_| Duration::default());
-                        warn_user!(
-                            "Transient failure while handling response for {}; retrying after {}s...",
-                            current_registry,
-                            duration.as_secs()
-                        );
-                        tokio::time::sleep(duration).await;
-                        n_past_retries += 1;
-                        continue;
-                    }
+                let middleware_retries = if let Some(RetryError::WithRetries { retries, .. }) =
+                    (&err as &dyn std::error::Error).downcast_ref::<RetryError>()
+                {
+                    *retries
+                } else {
+                    0
+                };
+                if let Some(backoff) = retry_state.should_retry(&err, middleware_retries) {
+                    retry_state.sleep_backoff(backoff).await;
+                    continue;
                 }
-
                 return Err(PublishError::PublishSend(
                     group.file.clone(),
                     current_registry.clone().into(),
diff --git a/crates/uv-python/src/downloads.rs b/crates/uv-python/src/downloads.rs
index 9c687e6881ccd..95b1654a0ad00 100644
--- a/crates/uv-python/src/downloads.rs
+++ b/crates/uv-python/src/downloads.rs
@@ -5,14 +5,13 @@ use std::path::{Path, PathBuf};
 use std::pin::Pin;
 use std::str::FromStr;
 use std::task::{Context, Poll};
-use std::time::{Duration, SystemTime};
 use std::{env, io};
 
 use futures::TryStreamExt;
 use itertools::Itertools;
 use owo_colors::OwoColorize;
+use reqwest_retry::RetryError;
 use reqwest_retry::policies::ExponentialBackoff;
-use reqwest_retry::{RetryError, RetryPolicy};
 use serde::Deserialize;
 use thiserror::Error;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufWriter, ReadBuf};
@@ -21,7 +20,7 @@ use tokio_util::either::Either;
 use tracing::{debug, instrument};
 use url::Url;
 
-use uv_client::{BaseClient, WrappedReqwestError, is_transient_network_error};
+use uv_client::{BaseClient, RetryState, WrappedReqwestError};
 use uv_distribution_filename::{ExtensionError, SourceDistExtension};
 use uv_extract::hash::Hasher;
 use uv_fs::{Simplified, rename_with_retry};
@@ -118,26 +117,24 @@
 }
 
 impl Error {
-    // Return the number of attempts that were made to complete this request before this error was
-    // returned. Note that e.g. 3 retries equates to 4 attempts.
+    // Return the number of retries that were made to complete this request before this error was
+    // returned.
     //
-    // It's easier to do arithmetic with "attempts" instead of "retries", because if you have
-    // nested retry loops you can just add up all the attempts directly, while adding up the
-    // retries requires +1/-1 adjustments.
-    fn attempts(&self) -> u32 {
+    // Note that e.g. 3 retries equates to 4 attempts.
+    fn retries(&self) -> u32 {
         // Unfortunately different variants of `Error` track retry counts in different ways. We
         // could consider unifying the variants we handle here in `Error::from_reqwest_middleware`
         // instead, but both approaches will be fragile as new variants get added over time.
         if let Self::NetworkErrorWithRetries { retries, .. } = self {
-            return retries + 1;
+            return *retries;
         }
         if let Self::NetworkMiddlewareError(_, anyhow_error) = self
             && let Some(RetryError::WithRetries { retries, .. }) =
                 anyhow_error.downcast_ref::<RetryError>()
         {
-            return retries + 1;
+            return *retries;
         }
-        1
+        0
     }
 }
@@ -1108,9 +1105,11 @@ impl ManagedPythonDownload {
         pypy_install_mirror: Option<&str>,
         reporter: Option<&dyn Reporter>,
     ) -> Result<DownloadResult, Error> {
-        let mut total_attempts = 0;
-        let mut retried_here = false;
-        let start_time = SystemTime::now();
+        let mut retry_state = RetryState::start(
+            *retry_policy,
+            self.download_url(python_install_mirror, pypy_install_mirror)?,
+        );
+
         loop {
             let result = self
                 .fetch(
@@ -1123,43 +1122,23 @@ impl ManagedPythonDownload {
                 reporter,
             )
             .await;
-            let result = match result {
-                Ok(download_result) => Ok(download_result),
+            match result {
+                Ok(download_result) => return Ok(download_result),
                 Err(err) => {
-                    // Inner retry loops (e.g. `reqwest-retry` middleware) might make more than one
-                    // attempt per error we see here.
-                    total_attempts += err.attempts();
-                    // We currently interpret e.g. "3 retries" to mean we should make 4 attempts.
-                    let n_past_retries = total_attempts - 1;
-                    if is_transient_network_error(&err) {
-                        let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
-                        if let reqwest_retry::RetryDecision::Retry { execute_after } =
-                            retry_decision
-                        {
-                            let duration = execute_after
-                                .duration_since(SystemTime::now())
-                                .unwrap_or_else(|_| Duration::default());
-                            debug!(
-                                "Transient failure while handling response for {}; retrying after {}s...",
-                                self.key(),
-                                duration.as_secs()
-                            );
-                            tokio::time::sleep(duration).await;
-                            retried_here = true;
-                            continue; // Retry.
-                        }
+                    if let Some(backoff) = retry_state.should_retry(&err, err.retries()) {
+                        retry_state.sleep_backoff(backoff).await;
+                        continue;
                     }
-                    if retried_here {
+                    return if retry_state.total_retries() > 0 {
                         Err(Error::NetworkErrorWithRetries {
                             err: Box::new(err),
-                            retries: n_past_retries,
+                            retries: retry_state.total_retries(),
                         })
                     } else {
                         Err(err)
-                    }
+                    };
                 }
-            };
-            return result;
+            }
         }
     }
diff --git a/crates/uv-redacted/src/lib.rs b/crates/uv-redacted/src/lib.rs
index ddd1a5dfcf682..9a5adf2809f48 100644
--- a/crates/uv-redacted/src/lib.rs
+++ b/crates/uv-redacted/src/lib.rs
@@ -265,6 +265,12 @@ impl From<DisplaySafeUrl> for Url {
     }
 }
 
+impl From<Url> for DisplaySafeUrl {
+    fn from(url: Url) -> Self {
+        Self(url)
+    }
+}
+
 impl FromStr for DisplaySafeUrl {
     type Err = DisplaySafeUrlError;
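Taken together, the diff replaces five hand-rolled retry loops across four crates with one shared pattern: construct a `RetryState` from the client's `ExponentialBackoff` policy, let `should_retry` fold any middleware-performed retries into the shared budget and classify the error, sleep for the returned backoff, and report `total_retries()` when giving up. Below is a minimal caller-side sketch of that pattern. The `fetch` operation and `FetchError` type are hypothetical stand-ins, not part of this diff; in the real call sites the error wraps a reqwest/middleware error, which is what uv-client's transient classification inspects.

use reqwest_retry::policies::ExponentialBackoff;
use thiserror::Error;
use url::Url;
use uv_client::RetryState;

/// Hypothetical error type for this sketch. In uv, `retries()` is implemented
/// on each crate's error enum and reports retries already performed by nested
/// layers (e.g. the `reqwest-retry` middleware).
#[derive(Debug, Error)]
#[error("fetch failed")]
struct FetchError {
    retries: u32,
}

impl FetchError {
    fn retries(&self) -> u32 {
        self.retries
    }
}

/// Hypothetical fallible operation standing in for `download_and_unpack`,
/// `get_cacheable`, `skip_cache`, or `ManagedPythonDownload::fetch`.
async fn fetch(url: &Url) -> Result<Vec<u8>, FetchError> {
    let _ = url;
    Err(FetchError { retries: 0 })
}

async fn fetch_with_retry(
    retry_policy: ExponentialBackoff,
    url: Url,
) -> Result<Vec<u8>, FetchError> {
    // Start the backoff timer once, before the first attempt. The URL (any
    // `impl Into<DisplaySafeUrl>`, hence the new `From<Url>` impl) is only
    // used for the debug message printed while backing off.
    let mut retry_state = RetryState::start(retry_policy, url.clone());
    loop {
        match fetch(&url).await {
            Ok(bytes) => return Ok(bytes),
            Err(err) => {
                // Fold retries already performed by nested layers into the
                // shared budget; if the error is classified as transient and
                // budget remains, sleep for the returned backoff and retry.
                if let Some(backoff) = retry_state.should_retry(&err, err.retries()) {
                    retry_state.sleep_backoff(backoff).await;
                    continue;
                }
                // Fatal error or exhausted budget: callers that want to report
                // the count wrap `err` using `retry_state.total_retries()`.
                return Err(err);
            }
        }
    }
}

Note the counting convention the diff settles on: the `retries()` helpers now report retries rather than attempts (3 retries equates to 4 attempts), so counts from nested layers can be added directly into `RetryState`'s budget and the previous +1/-1 adjustments disappear.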