Add StandardRetryStrategy to orchestrator #2725
add configurable invocation ID generator to be used for tests
update invocation ID interceptor
fix tests broken by orchestrator "flow" refactor
fix tests broken by interceptor ID changes
fix request info interceptor to use accessors and correctly format the TTL
fix ordering of request info interceptor's header value parts
refactor and simplify retry classifiers as well as related codegen
give retry classifiers names to aid in debugging classification issues
remove unused request info test JSON files
add ability to add fake latency when using the orchestrator TestConnection
update ServiceRuntimePluginGenerator to add retry and timeout configs, sleep impl, time source, and retry classifiers, to the config bag.
update ServiceRuntimePluginGenerator so no panic occurs if an HTTP connector isn't provided in the config
InterceptorContext accessors no longer panic if data is missing and instead return options
fix incorrect orchestrator timeout handling for request attempts
e430f31 to e58ebaa
A new generated diff is ready to view.
A new doc preview is ready to view.
fix doc comment link
add `aws-smithy-async` as an approved type for `aws-smithy-runtime`
remove unused `fastrand` dep
allow the retry classifiers to handle attempt timeouts
Posting comments at the halfway point in reviewing the PR. Will continue reading it.
}

/// A "generator" that returns [`InvocationId`]s from a predefined list.
#[cfg(feature = "test-util")]

Nit: You can create a private `test_util` inline module that is behind the feature flag, throw all the imports into it, and then re-export `InvocationIdGeneratorForTests` publicly outside of it to make things nicer (only two `cfg` flags needed instead of four).
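
The layout suggested in this nit could look roughly like the sketch below. It assumes a Cargo feature named `test-util`; the simplified `InvocationId` type, the field layout, and the `next_id` method are illustrative stand-ins rather than the PR's actual code.

```rust
/// Simplified stand-in for the real `InvocationId` (assumption for this sketch).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvocationId(String);

impl InvocationId {
    pub fn new(id: impl Into<String>) -> Self {
        Self(id.into())
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}

// First `cfg`: everything test-only, including its imports, lives in one private module.
#[cfg(feature = "test-util")]
mod test_util {
    use super::InvocationId;
    use std::collections::VecDeque;
    use std::sync::Mutex;

    /// Returns IDs from a predefined list (hypothetical field layout).
    #[derive(Debug)]
    pub struct InvocationIdGeneratorForTests {
        ids: Mutex<VecDeque<InvocationId>>,
    }

    impl InvocationIdGeneratorForTests {
        pub fn new(ids: Vec<InvocationId>) -> Self {
            Self { ids: Mutex::new(ids.into()) }
        }

        /// Hands out the next predefined ID, or `None` once the list is exhausted.
        pub fn next_id(&self) -> Option<InvocationId> {
            self.ids.lock().unwrap().pop_front()
        }
    }
}

// Second `cfg`: re-export the helper so callers see it at the public path.
#[cfg(feature = "test-util")]
pub use test_util::InvocationIdGeneratorForTests;
```

With this shape, individual `use` lines and items no longer need their own `cfg` attribute, since the module gate covers all of them.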
pub trait InvocationIdGenerator: Debug + Send + Sync {
    /// Call this function to receive a new [`InvocationId`] or an error explaining why one couldn't
    /// be provided.
    fn generate(&self) -> Result<InvocationId, BoxError>;

Should we make this `Result<Option<InvocationId>, _>` so that tests can easily disable the invocation ID entirely? The alternative is to update all the old tests to have the invocation ID in the test connections (and tediously update the signatures on those too).
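
A minimal sketch of what that signature change might look like follows. The `NoInvocationId` and `FixedInvocationId` generators and the `header_value` helper are hypothetical names invented for illustration; only the trait name and the `BoxError` alias come from the diff above.

```rust
use std::error::Error;
use std::fmt::Debug;

type BoxError = Box<dyn Error + Send + Sync>;

/// Simplified stand-in for the real `InvocationId` (assumption).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InvocationId(String);

pub trait InvocationIdGenerator: Debug + Send + Sync {
    /// `Ok(None)` means "do not attach an invocation ID to this request".
    fn generate(&self) -> Result<Option<InvocationId>, BoxError>;
}

/// Hypothetical generator for tests that want the header left off entirely.
#[derive(Debug)]
pub struct NoInvocationId;

impl InvocationIdGenerator for NoInvocationId {
    fn generate(&self) -> Result<Option<InvocationId>, BoxError> {
        Ok(None)
    }
}

/// Hypothetical generator that always returns the same ID.
#[derive(Debug)]
pub struct FixedInvocationId(pub &'static str);

impl InvocationIdGenerator for FixedInvocationId {
    fn generate(&self) -> Result<Option<InvocationId>, BoxError> {
        Ok(Some(InvocationId(self.0.to_string())))
    }
}

/// Stand-in for the interceptor logic: the header value to set, if any.
pub fn header_value(
    generator: &dyn InvocationIdGenerator,
) -> Result<Option<String>, BoxError> {
    Ok(generator.generate()?.map(|id| id.0))
}
```

Old tests could then install the "no ID" generator instead of having every recorded request updated to include the header.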
let headers = context.request_mut().headers_mut();
let id = _cfg.get::<InvocationId>().unwrap_or(&self.id);
let gen = cfg
    .get::<Box<dyn InvocationIdGenerator>>()

Overriding this is the exception, so I think we should check for a generator in the bag, but otherwise default to the random UUID generator (and more generally avoid an allocation for this thing in the common case).
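
One way to read this suggestion is sketched below, under stated assumptions: `Bag` is a toy stand-in for the config bag, and `DefaultGenerator` stands in for the random UUID generator. The point is only that the default is a plain field on the interceptor, so the common path needs no boxed generator.

```rust
use std::fmt::Debug;

pub trait InvocationIdGenerator: Debug + Send + Sync {
    fn generate(&self) -> String;
}

/// Zero-sized default; stands in for the random UUID generator (assumption).
#[derive(Debug, Default)]
pub struct DefaultGenerator;

impl InvocationIdGenerator for DefaultGenerator {
    fn generate(&self) -> String {
        // A real implementation would produce a random UUID here.
        "default-id".to_string()
    }
}

/// Hypothetical override used by tests.
#[derive(Debug)]
pub struct FixedGenerator(pub &'static str);

impl InvocationIdGenerator for FixedGenerator {
    fn generate(&self) -> String {
        self.0.to_string()
    }
}

/// Toy stand-in for the config bag: it may or may not hold an override.
#[derive(Debug, Default)]
pub struct Bag {
    pub generator_override: Option<Box<dyn InvocationIdGenerator>>,
}

#[derive(Debug, Default)]
pub struct Interceptor {
    pub default: DefaultGenerator,
}

impl Interceptor {
    pub fn invocation_id(&self, cfg: &Bag) -> String {
        // Use the override if one was put in the bag; otherwise borrow the
        // built-in default, which requires no allocation.
        let generator: &dyn InvocationIdGenerator = match &cfg.generator_override {
            Some(custom) => &**custom,
            None => &self.default,
        };
        generator.generate()
    }
}
```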
"00000000-0000-4000-8000-000000000000", | ||
))
/// Create a new, random, invocation ID.
pub fn new() -> Self {

Consider renaming this to random()
// Check for a result
let output_or_error = match ctx.output_or_error() {
    Some(output) => output,
    None => return None,
};

// Check for an error
let error = match output_or_error {
    Ok(_) => return None,
    Err(err) => err,
};

// Check that the error is an operation error
let error = match error.as_operation_error() {
    Some(err) => err,
    None => return None,
};

// Downcast the error
let error = match error.downcast_ref::<E>() {
    Some(err) => err,
    None => {
        panic!("can't downcast {error:#?}, wtf is it?");
    }
};

This can be one line (although, don't take the suggestion directly as I don't think it will compile since I don't know the type as_operation_error is on):
let error = ctx.output_or_error()?.err().map(as_operation_error)?.expect("wrong error type");
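
As the reviewer notes, the one-liner is not expected to compile as written. The sketch below shows the same idea in a form that does compile against stand-in types: `Ctx`, `OrchestratorError`, and `ThrottlingError` are all invented here for illustration and are not the real smithy-rs types.

```rust
use std::any::Any;

/// Stand-in for the orchestrator error type (assumption).
pub enum OrchestratorError {
    Operation(Box<dyn Any + Send + Sync>),
    Other(String),
}

impl OrchestratorError {
    pub fn as_operation_error(&self) -> Option<&(dyn Any + Send + Sync + 'static)> {
        match self {
            Self::Operation(err) => Some(&**err),
            Self::Other(_) => None,
        }
    }
}

/// Stand-in for the interceptor context (assumption).
pub struct Ctx {
    pub output_or_error: Option<Result<String, OrchestratorError>>,
}

/// Example modeled error used to exercise the sketch.
#[derive(Debug, PartialEq)]
pub struct ThrottlingError(pub u16);

/// Each `?` replaces one of the `match ... None => return None` blocks.
pub fn modeled_error<E: 'static>(ctx: &Ctx) -> Option<&E> {
    let error = ctx
        .output_or_error
        .as_ref()? // no result yet
        .as_ref()
        .err()? // the request succeeded
        .as_operation_error()? // not an operation error
        .downcast_ref::<E>()
        .expect("wrong error type");
    Some(error)
}
```

The `expect` keeps the original behavior of panicking when the downcast fails; everything before it returns `None` early.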
None
impl fmt::Display for CodedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Coded Error")

Shouldn't it output the error code?
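
A small sketch of a `Display` impl that includes the code is shown here. It assumes `CodedError` carries a `code` field of type `&'static str`; the field name, constructor, and message format are guesses, since the struct definition is not part of the quoted diff.

```rust
use std::fmt;

/// Assumed shape of the test error; the real struct may differ.
#[derive(Debug)]
pub struct CodedError {
    code: &'static str,
}

impl CodedError {
    pub fn new(code: &'static str) -> Self {
        Self { code }
    }
}

impl fmt::Display for CodedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Include the code so failures are easier to tell apart in logs.
        write!(f, "Coded Error (code: {})", self.code)
    }
}

impl std::error::Error for CodedError {}
```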
@@ -11,7 +11,7 @@
"notarealsessiontoken"
],
"authorization": [
"AWS4-HMAC-SHA256 Credential=ANOTREAL/20210618/us-east-1/s3/aws4_request, SignedHeaders=amz-sdk-invocation-id;host;x-amz-content-sha256;x-amz-date;x-amz-security-token;x-amz-user-agent, Signature=e7eccf4e792113f5f17a50bfd8f1719479e89ba0b476894e6f3dba030dc87f82"
"AWS4-HMAC-SHA256 Credential=ANOTREAL/20210618/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-security-token;x-amz-user-agent, Signature=ae78f74d26b6b0c3a403d9e8cc7ec3829d6264a2b33db672bf2b151bbb901786"

Is this correct?
@@ -31,228 +128,267 @@ struct FixupPlugin {
// # Client waits 1 second between retry attempts.
#[tokio::test]
async fn three_retries_and_then_success() {

It should be possible to add this directly to the S3 integration tests now, although the S3 crate doesn't compile in orchestrator mode right now due to presigning still directly referencing middleware.
// # Fast network, latency + server time is less than one second.
// # No clock skew
// # Client waits 1 second between attempts.
// TODO(orchestratorCrossOperationState) We don't support cross-operation state yet, so we can't calculate the service time skew for operations being sent after the first one.

Suggest using this TODO tag so we can continue to code search for these.
// TODO(orchestratorCrossOperationState) We don't support cross-operation state yet, so we can't calculate the service time skew for operations being sent after the first one.
// TODO(enableNewSmithyRuntime) We don't support cross-operation state yet, so we can't calculate the service time skew for operations being sent after the first one.
@@ -121,16 +134,38 @@ class ServiceRuntimePluginGenerator(
#{SharedEndpointResolver}::from(self.handle.conf.endpoint_resolver()));
cfg.set_endpoint_resolver(endpoint_resolver);

${"" /* TODO(EndpointResolver): Create endpoint params builder from service config */}
cfg.put(#{Params}::builder());

I thought we already did this?
${"" /* TODO(EndpointResolver): Create endpoint params builder from service config */}
cfg.put(#{Params}::builder());

let retry_classifiers = #{RetryClassifiers}::new()

Should we use ConfigBag's list store for this, or do you think that will make getting the order correct harder?
##[allow(clippy::useless_conversion)]
Some(#{type_erase_result}(#{parse_streaming_response}(response)).into())

I think I fixed this one in another PR:
##[allow(clippy::useless_conversion)]
Some(#{type_erase_result}(#{parse_streaming_response}(response)).into())
Some(#{type_erase_result}(#{parse_streaming_response}(response)))
##[allow(clippy::useless_conversion)]
#{type_erase_result}(#{parse_error}(response.status().as_u16(), response.headers(), body)).into()

##[allow(clippy::useless_conversion)]
#{type_erase_result}(#{parse_error}(response.status().as_u16(), response.headers(), body)).into()
#{type_erase_result}(#{parse_error}(response.status().as_u16(), response.headers(), body))
}

#[derive(Debug, Default)]
pub struct RequestAttemptsInterceptor {}

This seems like something the orchestrator should do directly.
);
if output_or_error.is_ok() {
    tracing::trace!("request succeeded, no retry necessary");
    return Ok(ShouldAttempt::No);

@rcoh suggested in another PR that `ShouldAttempt::No` should have a reason, maybe a `&'static str`, for debugging. I think that's nicer than having a `tracing::trace!` at each site where we return `No`. Then the logging can be centralized (and it should probably be `debug!` instead of `trace!`).
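
A rough sketch of that idea follows. The `No(&'static str)` variant shape, the `should_attempt_retry` function, and the `describe` helper are hypothetical; `describe` returns the message as a `String` so the sketch stays free of external crates, whereas real code would presumably emit it with `tracing::debug!` at a single call site.

```rust
use std::time::Duration;

/// Assumed variant set; only the reason on `No` is the point of this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum ShouldAttempt {
    Yes,
    No(&'static str),
    YesAfterDelay(Duration),
}

/// Simplified retry decision that records why it declined to retry.
pub fn should_attempt_retry(succeeded: bool, attempts: u32, max_attempts: u32) -> ShouldAttempt {
    if succeeded {
        return ShouldAttempt::No("request succeeded, no retry necessary");
    }
    if attempts >= max_attempts {
        return ShouldAttempt::No("max attempts reached");
    }
    ShouldAttempt::Yes
}

/// Single place where the decision is reported instead of one log call per `return`.
pub fn describe(decision: &ShouldAttempt) -> String {
    match decision {
        ShouldAttempt::Yes => "retrying immediately".to_string(),
        ShouldAttempt::YesAfterDelay(delay) => format!("retrying after {delay:?}"),
        ShouldAttempt::No(reason) => format!("not retrying: {reason}"),
    }
}
```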
rust-runtime/aws-smithy-runtime/src/client/retries/strategy/standard.rs
let output_or_error = ctx.output_or_error().expect(
    "This must never be called without reaching the point where the result exists.",
);

Are there retry strategies that don't require the output_or_error? Should we do the same wrapping strategy that we do for interceptors to make certain things always available in the context?
let request_attempts: &RequestAttempts = cfg
    .get()
    .expect("at least one request attempt is made before any retry is attempted");
if request_attempts.attempts() >= self.max_attempts {

Is this correct? Is max attempts inclusive or exclusive?
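
To make the question concrete, the sketch below shows what a `>=` check implies if `max_attempts` is meant to count the initial attempt, which is an assumption here and not something the PR states. The function names are invented for illustration.

```rust
/// Returns true if another attempt may be made.
/// Equivalent to the negation of `attempts_made >= max_attempts`.
pub fn may_attempt_again(attempts_made: u32, max_attempts: u32) -> bool {
    attempts_made < max_attempts
}

/// Counts how many attempts a request that always fails would make.
/// The first attempt is always made before any retry decision.
pub fn total_attempts(max_attempts: u32) -> u32 {
    let mut attempts_made = 0;
    loop {
        attempts_made += 1; // the attempt itself
        if !may_attempt_again(attempts_made, max_attempts) {
            break;
        }
    }
    attempts_made
}
```

Under that reading, `max_attempts` is an inclusive cap on the total number of attempts: a value of 3 allows the initial attempt plus two retries.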
Thank you for the work. Looks awesome!
    timestamp: SystemTime,
}

impl RuntimePlugin for FixupPlugin {

Since `send_orchestrator_with_plugin` has been removed in `main`, we probably need to put an invocation id via `.interceptor` (at least for now).
// pub use adaptive::AdaptiveRetryStrategy;

Can be removed?
let request = ctx
    .request_mut()
    .expect("request is present before orchestrate_endpoint is called");
// .ok_or("No request was present in the InterceptorContext")?;

Can be removed?
@@ -190,16 +229,24 @@ impl Connection for TestConnection {
fn call(&self, request: HttpRequest) -> BoxFuture<HttpResponse> {
    // TODO(orchestrator) Validate request

    let res = if let Some((expected, resp)) = self.data.lock().unwrap().pop() {
    let simulated_latency;
    let res = if let Some(event) = self.data.lock().unwrap().pop() {

Maybe also initialize `simulated_latency` as a result of the `if` expression? e.g.
let (res, simulated_latency) = if let Some(event) = self.data.lock().unwrap().pop() {
    ...
    (Ok(event.res.map(SdkBody::from)), event_latency)
} else {
    (Err(connectionError::other(...).into()), Duration::from_secs(0))
}
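
A self-contained version of that pattern, using stand-in types, might look like the following. `ConnectionEvent`, its fields, and the `String` response and error types are simplifications invented for this sketch; the real `TestConnection` works with HTTP requests and responses.

```rust
use std::sync::Mutex;
use std::time::Duration;

/// Stand-in for a recorded connection event (assumption).
pub struct ConnectionEvent {
    pub res: String,
    pub latency: Duration,
}

pub struct TestConnection {
    data: Mutex<Vec<ConnectionEvent>>,
}

impl TestConnection {
    pub fn new(mut events: Vec<ConnectionEvent>) -> Self {
        // Reverse so that `pop()` yields events in the order they were given.
        events.reverse();
        Self { data: Mutex::new(events) }
    }

    pub fn call(&self) -> (Result<String, String>, Duration) {
        // Both values come out of the same `if let` expression, so neither
        // needs a separate uninitialized `let` declared beforehand.
        let (res, simulated_latency) = if let Some(event) = self.data.lock().unwrap().pop() {
            (Ok(event.res), event.latency)
        } else {
            (Err("no more test data".to_string()), Duration::from_secs(0))
        };
        (res, simulated_latency)
    }
}
```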
@@ -125,15 +127,21 @@ async fn try_op(ctx: &mut InterceptorContext, cfg: &mut ConfigBag, interceptors:
}

loop {
    if !ctx.rewind(cfg) {
        break;

Do we need to distinguish how we break out of the loop, i.e. due to a rewind failure vs. `ShouldAttempt::No`?
It's a good question. I don't think so, but I can see that it'd be helpful for debugging purposes.
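
If the distinction were wanted for debugging, one option is to have the loop yield its exit reason through `break` with a value. The sketch below is purely illustrative: `LoopExit` and `attempt_loop` are invented names, and the rewind and retry checks are reduced to a boolean and a counter.

```rust
/// Hypothetical record of why the attempt loop stopped.
#[derive(Debug, PartialEq, Eq)]
pub enum LoopExit {
    RewindFailed,
    NoMoreAttempts,
}

/// Simplified attempt loop: returns how many attempts were made and why it stopped.
pub fn attempt_loop(rewindable: bool, max_attempts: u32) -> (u32, LoopExit) {
    let mut attempts = 1; // the first attempt happens before the loop
    let exit = loop {
        if !rewindable {
            // Corresponds to `ctx.rewind(cfg)` returning false.
            break LoopExit::RewindFailed;
        }
        if attempts >= max_attempts {
            // Corresponds to the retry strategy answering `ShouldAttempt::No`.
            break LoopExit::NoMoreAttempts;
        }
        attempts += 1;
    };
    (attempts, exit)
}
```

The caller can then log the exit reason once, after the loop, without changing control flow.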
_ => None,
pub struct ModeledAsRetryableClassifier<E>
where
    E: StdError + ProvideErrorKind + Send + Sync + 'static,

I think we can remove a bound on `E` here (and the same goes for lines 25, 35, 72, 79, and 89).
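
The usual Rust pattern behind this comment is to leave the struct definition unbounded and put the bounds only on the impl that needs them. The sketch below assumes a simplified `ProvideErrorKind` trait with a single `retryable` method and an invented `classify` method and `ThrottlingError` type; none of these reflect the real smithy-rs signatures.

```rust
use std::error::Error as StdError;
use std::fmt;
use std::marker::PhantomData;

/// Simplified stand-in for the real trait (assumption).
pub trait ProvideErrorKind {
    fn retryable(&self) -> bool;
}

/// No bounds on the struct itself.
pub struct ModeledAsRetryableClassifier<E> {
    _inner: PhantomData<E>,
}

impl<E> ModeledAsRetryableClassifier<E> {
    pub fn new() -> Self {
        Self { _inner: PhantomData }
    }
}

/// Only the impl that actually inspects the error carries the bounds.
impl<E> ModeledAsRetryableClassifier<E>
where
    E: StdError + ProvideErrorKind + Send + Sync + 'static,
{
    pub fn classify(&self, error: &(dyn StdError + Send + Sync + 'static)) -> bool {
        error
            .downcast_ref::<E>()
            .map(|err| err.retryable())
            .unwrap_or(false)
    }
}

/// Example modeled error used to exercise the sketch.
#[derive(Debug)]
pub struct ThrottlingError;

impl fmt::Display for ThrottlingError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "throttled")
    }
}

impl StdError for ThrottlingError {}

impl ProvideErrorKind for ThrottlingError {
    fn retryable(&self) -> bool {
        true
    }
}
```

Keeping bounds off the type means code that merely stores or passes the classifier around does not have to repeat them.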
Co-authored-by: John DiSanti <[email protected]>
It's getting too hard for me to keep merging
## Motivation and Context

This change fixes many of the smaller issues I ran into during the implementation of standard retries for the orchestrator. Merges from `main` were getting difficult with my [other PR](#2725) so I'm breaking things up.

## Description

- when orchestrator attempt timeout occurs, error is now set in context
- update test connection to allow defining connection events with optional latency simulation
- update orchestrator attempt loop to track iteration count
- set request attempts from the attempt loop
- add comment explaining "rewind" step of making request attempts
- add `doesnt_matter` method to `TypeErasedBox`, useful when testing
- update tests to use the new `TypeErasedBox::doesnt_matter` method
- add more doc comments
- add `set_subsec_nanos` method to `DateTime`.
  - I added this to make it easier to string-format a datetime that didn't include the nanos.
- fix Invocation ID interceptor not inserting the expected header
- update input type for `OperationError::other` to be more user-friendly
- add `test-util` feature to `aws-smithy-runtime-api`
- add `test-util` feature to `aws-runtime`
- fix presigning inlineable to pull in tower dep during codegen

## Testing

tests have been updated where necessary

----

_By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._
Closing this PR in favor of a new one that I'll upload shortly.
Description
This change adds support for the standard retry strategy and fixes up some tests that I wrote for the "request info" PR but had left commented out. We still can't run one of them because it requires cross-operation state and I didn't want this PR to keep growing.
Testing
I updated some broken tests and wrote new ones as well.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.