Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 63 additions & 93 deletions crates/uv-client/src/cached_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,90 +119,62 @@ where
}
}

/// Dispatch type: Either a cached client error or a (user specified) error from the callback
/// Dispatch type: Either a cached client error or a (user specified) error from the callback.
#[derive(Debug)]
pub enum CachedClientError<CallbackError: std::error::Error + 'static> {
Client {
retries: Option<u32>,
err: Error,
},
Callback {
retries: Option<u32>,
err: CallbackError,
},
/// The client tracks retries internally.
Client(Error),
/// Track retries before a callback explicitly, as we can't attach them to the callback error
/// type.
Callback { retries: u32, err: CallbackError },
Copy link
Copy Markdown
Member Author

@konstin konstin Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the optional as zero retries is indistinguishable from None.

}

impl<CallbackError: std::error::Error + 'static> CachedClientError<CallbackError> {
/// Attach the number of retries to the error context.
///
/// Adds to existing errors if any, in case different layers retried.
/// Attach the combined number of retries to the error context, discarding the previous count.
fn with_retries(self, retries: u32) -> Self {
match self {
Self::Client {
retries: existing_retries,
err,
} => Self::Client {
retries: Some(existing_retries.unwrap_or_default() + retries),
err,
},
Self::Callback {
retries: existing_retries,
err,
} => Self::Callback {
retries: Some(existing_retries.unwrap_or_default() + retries),
err,
},
Self::Client(err) => Self::Client(err.with_retries(retries)),
Self::Callback { retries: _, err } => Self::Callback { retries, err },
}
}

fn retries(&self) -> Option<u32> {
fn retries(&self) -> u32 {
match self {
Self::Client { retries, .. } => *retries,
Self::Client(err) => err.retries(),
Self::Callback { retries, .. } => *retries,
}
}

fn error(&self) -> &(dyn std::error::Error + 'static) {
match self {
Self::Client { err, .. } => err,
Self::Client(err) => err,
Self::Callback { err, .. } => err,
}
}
}

impl<CallbackError: std::error::Error + 'static> From<Error> for CachedClientError<CallbackError> {
fn from(error: Error) -> Self {
Self::Client {
retries: None,
err: error,
}
Self::Client(error)
}
}

impl<CallbackError: std::error::Error + 'static> From<ErrorKind>
for CachedClientError<CallbackError>
{
fn from(error: ErrorKind) -> Self {
Self::Client {
retries: None,
err: error.into(),
}
Self::Client(error.into())
}
}

impl<E: Into<Self> + std::error::Error + 'static> From<CachedClientError<E>> for Error {
/// Attach retry error context, if there were retries.
fn from(error: CachedClientError<E>) -> Self {
match error {
CachedClientError::Client {
retries: Some(retries),
err,
} => Self::new(err.into_kind(), retries),
CachedClientError::Client { retries: None, err } => err,
CachedClientError::Callback {
retries: Some(retries),
err,
} => Self::new(err.into().into_kind(), retries),
CachedClientError::Callback { retries: None, err } => err.into(),
CachedClientError::Client(error) => error,
CachedClientError::Callback { retries, err } => {
Self::new(err.into().into_kind(), retries)
}
}
}
}
Expand Down Expand Up @@ -450,10 +422,16 @@ impl CachedClient {
response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>> {
let new_cache = info_span!("new_cache", file = %cache_entry.path().display());
// Capture retries from the retry middleware
let retries = response
.extensions()
.get::<reqwest_retry::RetryCount>()
.map(|retries| retries.value())
.unwrap_or_default();
let data = response_callback(response)
.boxed_local()
.await
.map_err(|err| CachedClientError::Callback { retries: None, err })?;
.map_err(|err| CachedClientError::Callback { retries, err })?;
let Some(cache_policy) = cache_policy else {
return Ok(data.into_target());
};
Expand Down Expand Up @@ -662,16 +640,10 @@ impl CachedClient {
} else {
None
};
return Err(CachedClientError::<Error>::Client {
retries: retry_count,
err: ErrorKind::from_reqwest_with_problem_details(
url,
status_error,
problem_details,
)
.into(),
}
.into());
return Err(Error::new(
ErrorKind::from_reqwest_with_problem_details(url, status_error, problem_details),
retry_count.unwrap_or_default(),
));
}

let cache_policy = cache_policy_builder.build(&response);
Expand Down Expand Up @@ -720,7 +692,7 @@ impl CachedClient {
cache_control: CacheControl<'_>,
response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>> {
let mut past_retries = 0;
let mut total_retries = 0;
let start_time = SystemTime::now();
let retry_policy = self.uncached().retry_policy();
loop {
Expand All @@ -729,40 +701,39 @@ impl CachedClient {
.get_cacheable(fresh_req, cache_entry, cache_control, &response_callback)
.await;

// Check if the middleware already performed retries
let middleware_retries = match &result {
Err(err) => err.retries().unwrap_or_default(),
Ok(_) => 0,
};

if result
.as_ref()
.is_err_and(|err| is_transient_network_error(err.error()))
{
// If middleware already retried, consider that in our retry budget
let total_retries = past_retries + middleware_retries;
let retry_decision = retry_policy.should_retry(start_time, total_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());
match result {
Ok(ok) => return Ok(ok),
Err(err) => {
// Check if the middleware already performed retries
total_retries += err.retries();
if is_transient_network_error(err.error()) {
// If middleware already retried, consider that in our retry budget
let retry_decision = retry_policy.should_retry(start_time, total_retries);
Comment on lines +706 to +711
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I think the second comment doesn't make much sense now the addition (consideration) has moved.

Suggested change
Err(err) => {
// Check if the middleware already performed retries
total_retries += err.retries().unwrap_or_default();
if is_transient_network_error(err.error()) {
// If middleware already retried, consider that in our retry budget
let retry_decision = retry_policy.should_retry(start_time, total_retries);
Err(err) => {
// Check if the middleware already performed retries and consider it in our retry budget
total_retries += err.retries().unwrap_or_default();
if is_transient_network_error(err.error()) {
let retry_decision = retry_policy.should_retry(start_time, total_retries);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will look at it tomorrow.

if let reqwest_retry::RetryDecision::Retry { execute_after } =
retry_decision
{
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());

debug!(
"Transient failure while handling response from {}; retrying after {:.1}s...",
req.url(),
duration.as_secs_f32(),
);
tokio::time::sleep(duration).await;
total_retries += 1;
continue;
}
}

debug!(
"Transient failure while handling response from {}; retrying after {:.1}s...",
req.url(),
duration.as_secs_f32(),
);
tokio::time::sleep(duration).await;
past_retries += 1;
continue;
return if total_retries > 0 {
Err(err.with_retries(total_retries))
} else {
Err(err)
};
Comment on lines +730 to +734
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return if total_retries > 0 {
Err(err.with_retries(total_retries))
} else {
Err(err)
};
return Err(err.with_retries(total_retries));

Since the retry count is non-optional, conditionally doing with_retries here is now somewhat meaningless.

}
}

if past_retries > 0 {
return result.map_err(|err| err.with_retries(past_retries));
}

return result;
}
}

Expand Down Expand Up @@ -791,14 +762,13 @@ impl CachedClient {

// Check if the middleware already performed retries
let middleware_retries = match &result {
Err(err) => err.retries().unwrap_or_default(),
Err(err) => err.retries(),
_ => 0,
};

if result
.as_ref()
.err()
.is_some_and(|err| is_transient_network_error(err.error()))
.is_err_and(|err| is_transient_network_error(err.error()))
{
let total_retries = past_retries + middleware_retries;
let retry_decision = retry_policy.should_retry(start_time, total_retries);
Expand Down
5 changes: 5 additions & 0 deletions crates/uv-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ impl Error {
&self.kind
}

pub(crate) fn with_retries(mut self, retries: u32) -> Self {
self.retries = retries;
self
}

/// Create a new error from a JSON parsing error.
pub(crate) fn from_json_err(err: serde_json::Error, url: DisplaySafeUrl) -> Self {
ErrorKind::BadJson { source: err, url }.into()
Expand Down
2 changes: 1 addition & 1 deletion crates/uv-client/src/flat_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl<'a> FlatIndexClient<'a> {
.collect();
Ok(FlatIndexEntries::from_entries(files))
}
Err(CachedClientError::Client { err, .. }) if err.is_offline() => {
Err(CachedClientError::Client(err)) if err.is_offline() => {
Ok(FlatIndexEntries::offline())
}
Err(err) => Err(err.into()),
Expand Down
8 changes: 4 additions & 4 deletions crates/uv-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
.await
.map_err(|err| match err {
CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err),
CachedClientError::Client(err) => Error::Client(err),
})?;

// If the archive is missing the required hashes, or has since been removed, force a refresh.
Expand All @@ -745,7 +745,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
.await
.map_err(|err| match err {
CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err),
CachedClientError::Client(err) => Error::Client(err),
})
})
.await?
Expand Down Expand Up @@ -926,7 +926,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
.await
.map_err(|err| match err {
CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err),
CachedClientError::Client(err) => Error::Client(err),
})?;

// If the archive is missing the required hashes, or has since been removed, force a refresh.
Expand All @@ -950,7 +950,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
.await
.map_err(|err| match err {
CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err),
CachedClientError::Client(err) => Error::Client(err),
})
})
.await?
Expand Down
6 changes: 3 additions & 3 deletions crates/uv-distribution/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await
.map_err(|err| match err {
CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err),
CachedClientError::Client(err) => Error::Client(err),
})?;

// If the archive is missing the required hashes, force a refresh.
Expand All @@ -843,7 +843,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await
.map_err(|err| match err {
CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err),
CachedClientError::Client(err) => Error::Client(err),
})
})
.await
Expand Down Expand Up @@ -2280,7 +2280,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await
.map_err(|err| match err {
CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err),
CachedClientError::Client(err) => Error::Client(err),
})
})
.await
Expand Down
Loading
Loading