Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/uv-bin-install/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,4 @@ tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
58 changes: 19 additions & 39 deletions crates/uv-bin-install/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,18 @@
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};

use futures::TryStreamExt;
use reqwest_retry::RetryPolicy;
use reqwest_retry::policies::ExponentialBackoff;
use std::fmt;
use thiserror::Error;
use tokio::io::{AsyncRead, ReadBuf};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;
use url::Url;
use uv_distribution_filename::SourceDistExtension;

use uv_cache::{Cache, CacheBucket, CacheEntry, Error as CacheError};
use uv_client::{BaseClient, is_transient_network_error};
use uv_client::{BaseClient, RetryState};
use uv_extract::{Error as ExtractError, stream};
use uv_pep440::Version;
use uv_platform::Platform;
Expand Down Expand Up @@ -141,7 +138,7 @@ pub enum Error {
#[error("Failed to detect platform")]
Platform(#[from] uv_platform::Error),

#[error("Attempt failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
#[error("Request failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
RetriedError {
#[source]
err: Box<Error>,
Expand All @@ -150,13 +147,15 @@ pub enum Error {
}

impl Error {
/// Return the number of attempts that were made to complete this request before this error was
/// returned. Note that e.g. 3 retries equates to 4 attempts.
fn attempts(&self) -> u32 {
/// Return the number of retries that were made to complete this request before this error was
/// returned.
///
/// Note that e.g. 3 retries equates to 4 attempts.
fn retries(&self) -> u32 {
if let Self::RetriedError { retries, .. } = self {
return retries + 1;
return *retries;
}
1
0
}
}

Expand Down Expand Up @@ -242,9 +241,7 @@ async fn download_and_unpack_with_retry(
download_url: &Url,
cache_entry: &CacheEntry,
) -> Result<PathBuf, Error> {
let mut total_attempts = 0;
let mut retried_here = false;
let start_time = SystemTime::now();
let mut retry_state = RetryState::start(*retry_policy, download_url.clone());

loop {
let result = download_and_unpack(
Expand All @@ -260,40 +257,23 @@ async fn download_and_unpack_with_retry(
)
.await;

let result = match result {
Ok(path) => Ok(path),
match result {
Ok(path) => return Ok(path),
Err(err) => {
total_attempts += err.attempts();
let past_retries = total_attempts - 1;

if is_transient_network_error(&err) {
let retry_decision = retry_policy.should_retry(start_time, past_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
debug!(
"Transient failure while installing {} {}; retrying...",
binary.name(),
version
);
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());
tokio::time::sleep(duration).await;
retried_here = true;
continue;
}
if let Some(backoff) = retry_state.should_retry(&err, err.retries()) {
retry_state.sleep_backoff(backoff).await;
continue;
}

if retried_here {
return if retry_state.total_retries() > 0 {
Err(Error::RetriedError {
err: Box::new(err),
retries: past_retries,
retries: retry_state.total_retries(),
})
} else {
Err(err)
}
};
}
};
return result;
}
}
}

Expand Down
79 changes: 74 additions & 5 deletions crates/uv-client/src/base_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::fmt::Write;
use std::num::ParseIntError;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use std::{env, io, iter};

use anyhow::anyhow;
Expand All @@ -20,7 +20,7 @@ use reqwest::{Client, ClientBuilder, IntoUrl, Proxy, Request, Response, multipar
use reqwest_middleware::{ClientWithMiddleware, Middleware};
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::{
RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_error,
RetryPolicy, RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_error,
default_on_request_success,
};
use thiserror::Error;
Expand Down Expand Up @@ -1143,8 +1143,77 @@ fn retryable_on_request_failure(err: &(dyn Error + 'static)) -> Option<Retryable
None
}

pub fn is_transient_network_error(err: &(dyn Error + 'static)) -> bool {
retryable_on_request_failure(err) == Some(Retryable::Transient)
/// Per-request retry state and policy.
///
/// Tracks the retry budget for one logical request across repeated attempts,
/// so that callers can consult [`RetryState::should_retry`] after each failure
/// and [`RetryState::sleep_backoff`] before the next attempt.
pub struct RetryState {
    /// The backoff policy consulted on each transient failure.
    retry_policy: ExponentialBackoff,
    /// When the first attempt started; the policy's retry decision is
    /// evaluated relative to this instant.
    start_time: SystemTime,
    /// Retries counted so far, including retries already performed by nested
    /// middleware layers (see `should_retry`'s `error_retries` argument).
    total_retries: u32,
    /// The request URL, used for the retry debug message in `sleep_backoff`.
    url: DisplaySafeUrl,
}

impl RetryState {
    /// Initialize the [`RetryState`] and start the backoff timer.
    ///
    /// The timer starts now: the retry policy's decisions are made relative to
    /// the moment this constructor is called.
    pub fn start(retry_policy: ExponentialBackoff, url: impl Into<DisplaySafeUrl>) -> Self {
        Self {
            retry_policy,
            start_time: SystemTime::now(),
            total_retries: 0,
            url: url.into(),
        }
    }

    /// The total number of retries counted so far, including retries reported
    /// by nested middleware layers via [`RetryState::should_retry`].
    pub fn total_retries(&self) -> u32 {
        self.total_retries
    }

    /// Determines whether the request should be retried.
    ///
    /// Takes the number of retries from nested layers associated with the specific `err` type as
    /// `error_retries`; these are added to the overall retry budget before consulting the policy.
    ///
    /// Returns the backoff duration if the request should be retried. A `Some` return also
    /// increments the retry count, so each call reflects one additional attempt.
    #[must_use]
    pub fn should_retry(
        &mut self,
        err: &(dyn Error + 'static),
        error_retries: u32,
    ) -> Option<Duration> {
        // If the middleware performed any retries, consider them in our budget.
        self.total_retries += error_retries;
        match retryable_on_request_failure(err) {
            Some(Retryable::Transient) => {
                let retry_decision = self
                    .retry_policy
                    .should_retry(self.start_time, self.total_retries);
                if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
                    // `execute_after` is an absolute deadline; convert it to a relative
                    // sleep duration, clamping to zero if the deadline already passed.
                    let duration = execute_after
                        .duration_since(SystemTime::now())
                        .unwrap_or_else(|_| Duration::default());

                    self.total_retries += 1;
                    return Some(duration);
                }

                // Transient error, but the policy's retry budget is exhausted.
                None
            }
            // Fatal errors (or errors the strategy does not classify) are never retried.
            Some(Retryable::Fatal) | None => None,
        }
    }

    /// Wait before retrying the request, logging the URL and the backoff delay.
    pub async fn sleep_backoff(&self, duration: Duration) {
        debug!(
            "Transient failure while handling response from {}; retrying after {:.1}s...",
            self.url,
            duration.as_secs_f32(),
        );
        // TODO(konsti): Should we show a spinner plus a message in the CLI while
        // waiting?
        tokio::time::sleep(duration).await;
    }
}

/// Whether the error is a status code error that is retryable.
Expand Down Expand Up @@ -1414,7 +1483,7 @@ mod tests {

let uv_retry = match response.error_for_status() {
Ok(_) => false,
Err(err) => is_transient_network_error(&err),
Err(err) => retryable_on_request_failure(&err) == Some(Retryable::Transient),
};

// Ensure we're retrying the same status code as the reqwest_retry crate. We may choose
Expand Down
81 changes: 15 additions & 66 deletions crates/uv-client/src/cached_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::time::{Duration, SystemTime};
use std::{borrow::Cow, path::Path};

use futures::FutureExt;
use reqwest::{Request, Response};
use reqwest_retry::RetryPolicy;
use rkyv::util::AlignedVec;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
Expand All @@ -14,7 +12,7 @@ use uv_fs::write_atomic;
use uv_redacted::DisplaySafeUrl;

use crate::BaseClient;
use crate::base_client::is_transient_network_error;
use crate::base_client::RetryState;
use crate::error::ProblemDetails;
use crate::{
Error, ErrorKind,
Expand Down Expand Up @@ -692,9 +690,7 @@ impl CachedClient {
cache_control: CacheControl<'_>,
response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>> {
let mut total_retries = 0;
let start_time = SystemTime::now();
let retry_policy = self.uncached().retry_policy();
let mut retry_state = RetryState::start(self.uncached().retry_policy(), req.url().clone());
loop {
let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
let result = self
Expand All @@ -704,34 +700,11 @@ impl CachedClient {
match result {
Ok(ok) => return Ok(ok),
Err(err) => {
// Check if the middleware already performed retries
total_retries += err.retries();
if is_transient_network_error(err.error()) {
// If middleware already retried, consider that in our retry budget
let retry_decision = retry_policy.should_retry(start_time, total_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } =
retry_decision
{
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());

debug!(
"Transient failure while handling response from {}; retrying after {:.1}s...",
req.url(),
duration.as_secs_f32(),
);
tokio::time::sleep(duration).await;
total_retries += 1;
continue;
}
if let Some(backoff) = retry_state.should_retry(err.error(), err.retries()) {
retry_state.sleep_backoff(backoff).await;
continue;
}

return if total_retries > 0 {
Err(err.with_retries(total_retries))
} else {
Err(err)
};
return Err(err.with_retries(retry_state.total_retries()));
}
}
}
Expand All @@ -751,47 +724,23 @@ impl CachedClient {
cache_control: CacheControl<'_>,
response_callback: Callback,
) -> Result<Payload, CachedClientError<CallBackError>> {
let mut past_retries = 0;
let start_time = SystemTime::now();
let retry_policy = self.uncached().retry_policy();
let mut retry_state = RetryState::start(self.uncached().retry_policy(), req.url().clone());
loop {
let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
let result = self
.skip_cache(fresh_req, cache_entry, cache_control, &response_callback)
.await;

// Check if the middleware already performed retries
let middleware_retries = match &result {
Err(err) => err.retries(),
_ => 0,
};

if result
.as_ref()
.is_err_and(|err| is_transient_network_error(err.error()))
{
let total_retries = past_retries + middleware_retries;
let retry_decision = retry_policy.should_retry(start_time, total_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());
debug!(
"Transient failure while handling response from {}; retrying after {}s...",
req.url(),
duration.as_secs(),
);
tokio::time::sleep(duration).await;
past_retries += 1;
continue;
match result {
Ok(ok) => return Ok(ok),
Err(err) => {
if let Some(backoff) = retry_state.should_retry(err.error(), err.retries()) {
retry_state.sleep_backoff(backoff).await;
continue;
}
return Err(err.with_retries(retry_state.total_retries()));
}
}

if past_retries > 0 {
return result.map_err(|err| err.with_retries(past_retries));
}

return result;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/uv-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub use base_client::{
AuthIntegration, BaseClient, BaseClientBuilder, DEFAULT_MAX_REDIRECTS, DEFAULT_RETRIES,
ExtraMiddleware, RedirectClientWithMiddleware, RedirectPolicy, RequestBuilder,
RetryParsingError, UvRetryableStrategy, is_transient_network_error,
RetryParsingError, RetryState, UvRetryableStrategy,
};
pub use cached_client::{CacheControl, CachedClient, CachedClientError, DataWithCachePolicy};
pub use error::{Error, ErrorKind, WrappedReqwestError};
Expand Down
Loading
Loading