Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replenish cross-request retry allowance on successful response #1197

Merged
merged 7 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,21 @@ message = "Fixed example showing how to use hardcoded credentials in `aws-types`
references = ["smithy-rs#1180"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
author = "jdisanti"

[[aws-sdk-rust]]
message = "Fixed a bug that caused clients to eventually stop retrying. The cross-request retry allowance wasn't being reimbursed upon receiving a successful response, so once this allowance reached zero, no further retries would ever be attempted."
references = ["smithy-rs#1197"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
author = "jdisanti"

[[smithy-rs]]
message = "Fixed a bug that caused clients to eventually stop retrying. The cross-request retry allowance wasn't being reimbursed upon receiving a successful response, so once this allowance reached zero, no further retries would ever be attempted."
references = ["smithy-rs#1197"]
meta = { "breaking" = false, "tada" = false, "bug" = true }
author = "jdisanti"

[[smithy-rs]]
message = "`aws_smithy_types::retry::RetryKind` had its `NotRetryable` variant split into `NotRetryable` and `Unnecessary`. If you implement the `ClassifyResponse`, then successful responses need to return `Unnecessary` rather than `NotRetryable`."
references = ["smithy-rs#1197"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "jdisanti"
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-http/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ where
{
fn classify(&self, err: Result<&T, &SdkError<E>>) -> RetryKind {
let (err, response) = match err {
Ok(_) => return RetryKind::NotRetryable,
Ok(_) => return RetryKind::Unnecessary,
Err(SdkError::ServiceError { err, raw }) => (err, raw),
Err(SdkError::DispatchFailure(err)) => {
return if err.is_timeout() || err.is_io() {
Expand Down
5 changes: 3 additions & 2 deletions aws/sdk/integration-tests/dynamodb/tests/movies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ where
&self,
response: Result<&SdkSuccess<DescribeTableOutput>, &SdkError<DescribeTableError>>,
) -> RetryKind {
match self.inner.classify(response.clone()) {
match self.inner.classify(response) {
jdisanti marked this conversation as resolved.
Show resolved Hide resolved
RetryKind::NotRetryable => (),
RetryKind::Unnecessary => (),
other => return other,
};
match response {
Expand All @@ -142,7 +143,7 @@ where
{
RetryKind::Explicit(Duration::from_secs(1))
} else {
RetryKind::NotRetryable
RetryKind::Unnecessary
}
}
_ => RetryKind::NotRetryable,
Expand Down
149 changes: 94 additions & 55 deletions rust-runtime/aws-smithy-client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ impl CrossRequestRetryState {
}
}

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

/// RetryHandler
///
/// Implement retries for an individual request.
Expand All @@ -253,19 +255,12 @@ impl RetryHandler {
///
/// If a retry is specified, this function returns `(next, backoff_duration)`
/// If no retry is specified, this function returns None
fn attempt_retry(&self, retry_kind: Result<(), ErrorKind>) -> Option<(Self, Duration)> {
let quota_used = match retry_kind {
Ok(_) => {
self.shared
.quota_release(self.local.last_quota_usage, &self.config);
fn should_retry_error(&self, error_kind: &ErrorKind) -> Option<(Self, Duration)> {
let quota_used = {
if self.local.attempts == self.config.max_attempts {
return None;
}
Err(e) => {
if self.local.attempts == self.config.max_attempts {
return None;
}
self.shared.quota_acquire(&e, &self.config)?
}
self.shared.quota_acquire(error_kind, &self.config)?
};
/*
From the retry spec:
Expand All @@ -291,48 +286,61 @@ impl RetryHandler {

Some((next, backoff))
}
}

impl<Handler, R, T, E>
tower::retry::Policy<operation::Operation<Handler, R>, SdkSuccess<T>, SdkError<E>>
for RetryHandler
where
Handler: Clone,
R: ClassifyResponse<SdkSuccess<T>, SdkError<E>>,
{
type Future = Pin<Box<dyn Future<Output = Self> + Send>>;
fn should_retry(&self, retry_kind: &RetryKind) -> Option<(Self, Duration)> {
match retry_kind {
RetryKind::Explicit(dur) => Some((self.clone(), *dur)),
RetryKind::NotRetryable => None,
RetryKind::Unnecessary => {
self.shared
.quota_release(self.local.last_quota_usage, &self.config);
None
}
RetryKind::Error(err) => self.should_retry_error(err),
_ => None,
}
}

fn retry_for(&self, retry_kind: RetryKind) -> Option<BoxFuture<Self>> {
let (next, dur) = self.should_retry(&retry_kind)?;

fn retry(
&self,
req: &Operation<Handler, R>,
result: Result<&SdkSuccess<T>, &SdkError<E>>,
) -> Option<Self::Future> {
let policy = req.retry_policy();
let retry = policy.classify(result);
let sleep = match &self.sleep_impl {
Some(sleep) => sleep,
None => {
if retry != RetryKind::NotRetryable {
if retry_kind != RetryKind::NotRetryable {
tracing::debug!("cannot retry because no sleep implementation exists");
}
return None;
}
};
let (next, dur) = match retry {
RetryKind::Explicit(dur) => (self.clone(), dur),
RetryKind::NotRetryable => return None,
RetryKind::Error(err) => self.attempt_retry(Err(err))?,
_ => return None,
};

let sleep_future = sleep.sleep(dur);
let fut = async move {
sleep_future.await;
next
}
.instrument(tracing::info_span!("retry", kind = &debug(retry)));
.instrument(tracing::info_span!("retry", kind = &debug(retry_kind)));
Some(check_send(Box::pin(fut)))
}
}

impl<Handler, R, T, E>
tower::retry::Policy<operation::Operation<Handler, R>, SdkSuccess<T>, SdkError<E>>
for RetryHandler
where
Handler: Clone,
R: ClassifyResponse<SdkSuccess<T>, SdkError<E>>,
{
type Future = BoxFuture<Self>;

fn retry(
&self,
req: &Operation<Handler, R>,
result: Result<&SdkSuccess<T>, &SdkError<E>>,
) -> Option<Self::Future> {
let policy = req.retry_policy();
let retry_kind = policy.classify(result);
self.retry_for(retry_kind)
}

fn clone_request(&self, req: &Operation<Handler, R>) -> Option<Operation<Handler, R>> {
req.try_clone()
Expand All @@ -348,7 +356,7 @@ mod test {

use crate::retry::{Config, NewRequestPolicy, RetryHandler, Standard};

use aws_smithy_types::retry::ErrorKind;
use aws_smithy_types::retry::{ErrorKind, RetryKind};

use std::time::Duration;

Expand All @@ -367,18 +375,18 @@ mod test {
fn eventual_success() {
let policy = Standard::new(test_config()).new_request_policy(None);
let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(1));
assert_eq!(policy.retry_quota(), 495);

let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(2));
assert_eq!(policy.retry_quota(), 490);

let no_retry = policy.attempt_retry(Ok(()));
let no_retry = policy.should_retry(&RetryKind::Unnecessary);
assert!(no_retry.is_none());
assert_eq!(policy.retry_quota(), 495);
}
Expand All @@ -387,18 +395,18 @@ mod test {
fn no_more_attempts() {
let policy = Standard::new(test_config()).new_request_policy(None);
let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(1));
assert_eq!(policy.retry_quota(), 495);

let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(2));
assert_eq!(policy.retry_quota(), 490);

let no_retry = policy.attempt_retry(Err(ErrorKind::ServerError));
let no_retry = policy.should_retry(&RetryKind::Error(ErrorKind::ServerError));
assert!(no_retry.is_none());
assert_eq!(policy.retry_quota(), 490);
}
Expand All @@ -408,46 +416,77 @@ mod test {
let mut conf = test_config();
conf.initial_retry_tokens = 5;
let policy = Standard::new(conf).new_request_policy(None);

let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(1));
assert_eq!(policy.retry_quota(), 0);
let no_retry = policy.attempt_retry(Err(ErrorKind::ServerError));

let no_retry = policy.should_retry(&RetryKind::Error(ErrorKind::ServerError));
assert!(no_retry.is_none());
assert_eq!(policy.retry_quota(), 0);
}

#[test]
fn quota_replenishes_on_success() {
let mut conf = test_config();
conf.initial_retry_tokens = 100;
let policy = Standard::new(conf).new_request_policy(None);
let (policy, dur) = policy
.should_retry(&RetryKind::Error(ErrorKind::TransientError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(1));
assert_eq!(policy.retry_quota(), 90);

let (policy, dur) = policy
.should_retry(&RetryKind::Explicit(Duration::from_secs(1)))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(1));
assert_eq!(
policy.retry_quota(),
90,
"explicit retry should not subtract from quota"
);

assert!(
policy.should_retry(&RetryKind::Unnecessary).is_none(),
"it should not retry success"
);
let available = policy.shared.quota_available.lock().unwrap();
assert_eq!(100, *available, "successful request should replenish quota");
}

#[test]
fn backoff_timing() {
let mut conf = test_config();
conf.max_attempts = 5;
let policy = Standard::new(conf).new_request_policy(None);
let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(1));
assert_eq!(policy.retry_quota(), 495);

let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(2));
assert_eq!(policy.retry_quota(), 490);

let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(4));
assert_eq!(policy.retry_quota(), 485);

let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(8));
assert_eq!(policy.retry_quota(), 480);

let no_retry = policy.attempt_retry(Err(ErrorKind::ServerError));
let no_retry = policy.should_retry(&RetryKind::Error(ErrorKind::ServerError));
assert!(no_retry.is_none());
assert_eq!(policy.retry_quota(), 480);
}
Expand All @@ -459,30 +498,30 @@ mod test {
conf.max_backoff = Duration::from_secs(3);
let policy = Standard::new(conf).new_request_policy(None);
let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(1));
assert_eq!(policy.retry_quota(), 495);

let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(2));
assert_eq!(policy.retry_quota(), 490);

let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(3));
assert_eq!(policy.retry_quota(), 485);

let (policy, dur) = policy
.attempt_retry(Err(ErrorKind::ServerError))
.should_retry(&RetryKind::Error(ErrorKind::ServerError))
.expect("should retry");
assert_eq!(dur, Duration::from_secs(3));
assert_eq!(policy.retry_quota(), 480);

let no_retry = policy.attempt_retry(Err(ErrorKind::ServerError));
let no_retry = policy.should_retry(&RetryKind::Error(ErrorKind::ServerError));
assert!(no_retry.is_none());
assert_eq!(policy.retry_quota(), 480);
}
Expand Down
5 changes: 4 additions & 1 deletion rust-runtime/aws-smithy-types/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ pub enum RetryKind {
/// Note: The specified `Duration` is considered a suggestion and may be replaced or ignored.
Explicit(Duration),

/// The response associated with this variant should not be retried.
/// The response was a failure, but should not be retried.
NotRetryable,

/// The response was successful, so no retry is necessary.
Unnecessary,
}

/// Specifies how failed requests should be retried.
Expand Down