Skip to content

Commit

Permalink
Add connection poisoning to aws-smithy-client (#2445)
Browse files Browse the repository at this point in the history
* Add Connection Poisoning to aws-smithy-client

* Fix doc links

* Remove required tokio dependency from aws-smithy-client

* Remove external type exposed

* Rename, re-add tokio dependency

* Change IP to 127.0.0.1 to attempt to fix windows

* Add dns::Name to external types

* Remove non_exhaustive not needed

* Add client target to changelog
  • Loading branch information
rcoh authored Mar 14, 2023
1 parent b2c5eaa commit 61934da
Show file tree
Hide file tree
Showing 24 changed files with 1,289 additions and 103 deletions.
25 changes: 25 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,28 @@ message = "The modules in generated client crates have been reorganized. See the
references = ["smithy-rs#2448"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "jdisanti"

[[aws-sdk-rust]]
message = """Reconnect on transient errors.
If a transient error (timeout, 500, 502, 503) is encountered, the connection will be evicted from the pool and will not
be reused. This is enabled by default for all AWS services. It can be disabled by calling `RetryConfig::with_reconnect_mode` with `ReconnectMode::ReuseAllConnections`.
Although there is no API breakage from this change, it alters the client behavior in a way that may cause breakage for customers.
"""
references = ["aws-sdk-rust#160", "smithy-rs#2445"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "rcoh"

[[smithy-rs]]
message = """Reconnect on transient errors.
Note: **this behavior is disabled by default for generic clients**. It can be enabled with
`aws_smithy_client::Builder::reconnect_on_transient_errors`.
If a transient error (timeout, 500, 502, 503) is encountered, the connection will be evicted from the pool and will not
be reused.
"""
references = ["aws-sdk-rust#160", "smithy-rs#2445"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" }
author = "rcoh"
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ aws-smithy-types = { path = "../../sdk/build/aws-sdk/sdk/aws-smithy-types" }
aws-types = { path = "../../sdk/build/aws-sdk/sdk/aws-types" }
hyper = { version = "0.14.12", default-features = false }
time = { version = "0.3.4", features = ["parsing"] }
tokio = { version = "1.8.4", features = ["sync"] }
tokio = { version = "1.13.1", features = ["sync"] }
tracing = { version = "0.1" }

# implementation detail of SSO credential caching
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ private class AwsFluentClientExtensions(types: Types) {
};
let mut builder = builder
.middleware(#{DynMiddleware}::new(#{Middleware}::new()))
.reconnect_mode(retry_config.reconnect_mode())
.retry_config(retry_config.into())
.operation_timeout_config(timeout_config.into());
builder.set_sleep_impl(sleep_impl);
Expand Down Expand Up @@ -257,6 +258,7 @@ private fun renderCustomizableOperationSendMethod(
"combined_generics_decl" to combinedGenerics.declaration(),
"handle_generics_bounds" to handleGenerics.bounds(),
"SdkSuccess" to RuntimeType.sdkSuccess(runtimeConfig),
"SdkError" to RuntimeType.sdkError(runtimeConfig),
"ClassifyRetry" to RuntimeType.classifyRetry(runtimeConfig),
"ParseHttpResponse" to RuntimeType.parseHttpResponse(runtimeConfig),
)
Expand All @@ -272,7 +274,7 @@ private fun renderCustomizableOperationSendMethod(
where
E: std::error::Error + Send + Sync + 'static,
O: #{ParseHttpResponse}<Output = Result<T, E>> + Send + Sync + Clone + 'static,
Retry: #{ClassifyRetry}<#{SdkSuccess}<T>, SdkError<E>> + Send + Sync + Clone,
Retry: #{ClassifyRetry}<#{SdkSuccess}<T>, #{SdkError}<E>> + Send + Sync + Clone,
{
self.handle.client.call(self.operation).await
}
Expand Down
99 changes: 99 additions & 0 deletions aws/sdk/integration-tests/s3/tests/reconnects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use aws_credential_types::provider::SharedCredentialsProvider;
use aws_credential_types::Credentials;
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_client::test_connection::wire_mock::{
check_matches, ReplayedEvent, WireLevelTestConnection,
};
use aws_smithy_client::{ev, match_events};
use aws_smithy_types::retry::{ReconnectMode, RetryConfig};
use aws_types::region::Region;
use aws_types::SdkConfig;
use std::sync::Arc;

/// Verifies that selecting `ReconnectMode::ReuseAllConnections` on the retry config disables
/// reconnects for the client: the mock is expected to record a single DNS lookup and a single
/// connect, followed by all three HTTP responses.
#[tokio::test]
async fn disable_reconnects() {
    // Two 503 responses followed by a successful response carrying the object body.
    let replayed_responses = vec![
        ReplayedEvent::status(503),
        ReplayedEvent::status(503),
        ReplayedEvent::with_body("here-is-your-object"),
    ];
    let mock = WireLevelTestConnection::spinup(replayed_responses).await;

    // Explicitly opt out of reconnecting on transient errors.
    let retry_config =
        RetryConfig::standard().with_reconnect_mode(ReconnectMode::ReuseAllConnections);
    let sdk_config = SdkConfig::builder()
        .region(Region::from_static("us-east-2"))
        .credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests()))
        .sleep_impl(Arc::new(TokioSleep::new()))
        .endpoint_url(mock.endpoint_url())
        .http_connector(mock.http_connector())
        .retry_config(retry_config)
        .build();
    let client = aws_sdk_s3::Client::new(&sdk_config);

    let request = client.get_object().bucket("bucket").key("key");
    let resp = request.send().await.expect("succeeds after retries");

    let body = resp.body.collect().await.unwrap().to_vec();
    assert_eq!(body, b"here-is-your-object");

    // Only one dns + connect pair: the same connection served every attempt.
    match_events!(
        ev!(dns),
        ev!(connect),
        ev!(http(503)),
        ev!(http(503)),
        ev!(http(200))
    )(&mock.events());
}

/// Verifies that with `RetryConfig::standard()` and no reconnect override, each 503 response is
/// followed by a fresh DNS lookup and connect before the next attempt.
#[tokio::test]
async fn reconnect_on_503() {
    // Two 503 responses followed by a successful response carrying the object body.
    let replayed_responses = vec![
        ReplayedEvent::status(503),
        ReplayedEvent::status(503),
        ReplayedEvent::with_body("here-is-your-object"),
    ];
    let mock = WireLevelTestConnection::spinup(replayed_responses).await;

    let retry_config = RetryConfig::standard();
    let sdk_config = SdkConfig::builder()
        .region(Region::from_static("us-east-2"))
        .credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests()))
        .sleep_impl(Arc::new(TokioSleep::new()))
        .endpoint_url(mock.endpoint_url())
        .http_connector(mock.http_connector())
        .retry_config(retry_config)
        .build();
    let client = aws_sdk_s3::Client::new(&sdk_config);

    let request = client.get_object().bucket("bucket").key("key");
    let resp = request.send().await.expect("succeeds after retries");

    let body = resp.body.collect().await.unwrap().to_vec();
    assert_eq!(body, b"here-is-your-object");

    // Every attempt gets its own dns + connect pair.
    match_events!(
        ev!(dns),
        ev!(connect),
        ev!(http(503)),
        ev!(dns),
        ev!(connect),
        ev!(http(503)),
        ev!(dns),
        ev!(connect),
        ev!(http(200))
    )(&mock.events());
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CustomizableOperationGenerator(
"Operation" to smithyHttp.resolve("operation::Operation"),
"Request" to smithyHttp.resolve("operation::Request"),
"Response" to smithyHttp.resolve("operation::Response"),
"ClassifyRetry" to smithyHttp.resolve("retry::ClassifyRetry"),
"ClassifyRetry" to RuntimeType.classifyRetry(runtimeConfig),
"RetryKind" to smithyTypes.resolve("retry::RetryKind"),
)
renderCustomizableOperationModule(this)
Expand Down Expand Up @@ -150,6 +150,9 @@ class CustomizableOperationGenerator(
"ParseHttpResponse" to smithyHttp.resolve("response::ParseHttpResponse"),
"NewRequestPolicy" to smithyClient.resolve("retry::NewRequestPolicy"),
"SmithyRetryPolicy" to smithyClient.resolve("bounds::SmithyRetryPolicy"),
"ClassifyRetry" to RuntimeType.classifyRetry(runtimeConfig),
"SdkSuccess" to RuntimeType.sdkSuccess(runtimeConfig),
"SdkError" to RuntimeType.sdkError(runtimeConfig),
)

writer.rustTemplate(
Expand All @@ -164,6 +167,7 @@ class CustomizableOperationGenerator(
E: std::error::Error + Send + Sync + 'static,
O: #{ParseHttpResponse}<Output = Result<T, E>> + Send + Sync + Clone + 'static,
Retry: Send + Sync + Clone,
Retry: #{ClassifyRetry}<#{SdkSuccess}<T>, #{SdkError}<E>> + Send + Sync + Clone,
<R as #{NewRequestPolicy}>::Policy: #{SmithyRetryPolicy}<O, T, E, Retry> + Clone,
{
self.handle.client.call(self.operation).await
Expand Down
1 change: 1 addition & 0 deletions rust-runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[workspace]


members = [
"inlineable",
"aws-smithy-async",
Expand Down
10 changes: 7 additions & 3 deletions rust-runtime/aws-smithy-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ repository = "https://github.com/awslabs/smithy-rs"

[features]
rt-tokio = ["aws-smithy-async/rt-tokio"]
test-util = ["aws-smithy-protocol-test", "serde/derive", "rustls"]
test-util = ["aws-smithy-protocol-test", "serde/derive", "rustls", "hyper/server", "hyper/h2", "tokio/full"]
native-tls = ["client-hyper", "hyper-tls", "rt-tokio"]
rustls = ["client-hyper", "hyper-rustls", "rt-tokio", "lazy_static"]
client-hyper = ["hyper"]
hyper-webpki-doctest-only = ["hyper-rustls/webpki-roots"]


[dependencies]
aws-smithy-async = { path = "../aws-smithy-async" }
aws-smithy-http = { path = "../aws-smithy-http" }
Expand All @@ -25,7 +26,7 @@ bytes = "1"
fastrand = "1.4.0"
http = "0.2.3"
http-body = "0.4.4"
hyper = { version = "0.14.12", features = ["client", "http2", "http1", "tcp"], optional = true }
hyper = { version = "0.14.25", features = ["client", "http2", "http1", "tcp"], optional = true }
# cargo does not support optional test dependencies, so to completely disable rustls when
# the native-tls feature is enabled, we need to add the webpki-roots feature here.
# https://github.com/rust-lang/cargo/issues/1596
Expand All @@ -34,7 +35,7 @@ hyper-tls = { version = "0.5.0", optional = true }
lazy_static = { version = "1", optional = true }
pin-project-lite = "0.2.7"
serde = { version = "1", features = ["derive"], optional = true }
tokio = { version = "1.8.4" }
tokio = { version = "1.13.1" }
tower = { version = "0.4.6", features = ["util", "retry"] }
tracing = "0.1"

Expand All @@ -44,6 +45,9 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.8.4", features = ["full", "test-util"] }
tower-test = "0.4.0"
tracing-subscriber = "0.3.16"
tracing-test = "0.2.4"


[package.metadata.docs.rs]
all-features = true
Expand Down
2 changes: 2 additions & 0 deletions rust-runtime/aws-smithy-client/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ allowed_external_types = [
"tokio::io::async_read::AsyncRead",
"tokio::io::async_write::AsyncWrite",


# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Once tooling permits it, only allow the following types in the `test-utils` feature
"bytes::bytes::Bytes",
"serde::ser::Serialize",
"serde::de::Deserialize",
"hyper::client::connect::dns::Name",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Decide if we want to continue exposing tower_layer
"tower_layer::Layer",
Expand Down
45 changes: 45 additions & 0 deletions rust-runtime/aws-smithy-client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{bounds, erase, retry, Client};
use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep};
use aws_smithy_http::body::SdkBody;
use aws_smithy_http::result::ConnectorError;
use aws_smithy_types::retry::ReconnectMode;
use aws_smithy_types::timeout::{OperationTimeoutConfig, TimeoutConfig};
use std::sync::Arc;

Expand Down Expand Up @@ -37,6 +38,12 @@ pub struct Builder<C = (), M = (), R = retry::Standard> {
retry_policy: MaybeRequiresSleep<R>,
operation_timeout_config: Option<OperationTimeoutConfig>,
sleep_impl: Option<Arc<dyn AsyncSleep>>,
reconnect_mode: Option<ReconnectMode>,
}

/// Returns the [`ReconnectMode`] the builder uses when no mode was chosen explicitly.
///
/// Transitional default: reconnecting on transient errors is disabled, so the returned mode is
/// `ReconnectMode::ReuseAllConnections`.
const fn default_reconnect_mode() -> ReconnectMode {
ReconnectMode::ReuseAllConnections
}

impl<C, M> Default for Builder<C, M>
Expand All @@ -55,6 +62,7 @@ where
),
operation_timeout_config: None,
sleep_impl: default_async_sleep(),
reconnect_mode: Some(default_reconnect_mode()),
}
}
}
Expand Down Expand Up @@ -173,6 +181,7 @@ impl<M, R> Builder<(), M, R> {
retry_policy: self.retry_policy,
operation_timeout_config: self.operation_timeout_config,
sleep_impl: self.sleep_impl,
reconnect_mode: self.reconnect_mode,
}
}

Expand Down Expand Up @@ -229,6 +238,7 @@ impl<C, R> Builder<C, (), R> {
operation_timeout_config: self.operation_timeout_config,
middleware,
sleep_impl: self.sleep_impl,
reconnect_mode: self.reconnect_mode,
}
}

Expand Down Expand Up @@ -280,6 +290,7 @@ impl<C, M> Builder<C, M, retry::Standard> {
operation_timeout_config: self.operation_timeout_config,
middleware: self.middleware,
sleep_impl: self.sleep_impl,
reconnect_mode: self.reconnect_mode,
}
}
}
Expand Down Expand Up @@ -347,6 +358,7 @@ impl<C, M, R> Builder<C, M, R> {
retry_policy: self.retry_policy,
operation_timeout_config: self.operation_timeout_config,
sleep_impl: self.sleep_impl,
reconnect_mode: self.reconnect_mode,
}
}

Expand All @@ -361,9 +373,41 @@ impl<C, M, R> Builder<C, M, R> {
retry_policy: self.retry_policy,
operation_timeout_config: self.operation_timeout_config,
sleep_impl: self.sleep_impl,
reconnect_mode: self.reconnect_mode,
}
}

/// Choose the [`ReconnectMode`] used by the retry strategy and hand the builder back.
///
/// No reconnection occurs by default.
///
/// When reconnection is enabled and a transient error is encountered, the connection in use
/// is poisoned, which prevents reusing a connection to a potentially bad host.
pub fn reconnect_mode(mut self, reconnect_mode: ReconnectMode) -> Self {
    // Store the explicit choice directly on the builder.
    self.reconnect_mode = Some(reconnect_mode);
    self
}

/// Set the [`ReconnectMode`] for the retry strategy
///
/// By default, no reconnection occurs.
///
/// When enabled and a transient error is encountered, the connection in use will be poisoned.
/// This prevents reusing a connection to a potentially bad host.
///
/// Passing `None` clears any explicit choice; `build` then falls back to the default mode.
/// Returns `&mut Self` so calls can be chained on a borrowed builder.
pub fn set_reconnect_mode(&mut self, reconnect_mode: Option<ReconnectMode>) -> &mut Self {
self.reconnect_mode = reconnect_mode;
self
}

/// Enable reconnection on transient errors
///
/// This behavior is disabled by default. Once enabled, when a transient error is encountered,
/// the connection in use will be poisoned.
/// This prevents reusing a connection to a potentially bad host but may increase the load on
/// the server.
///
/// Equivalent to calling `reconnect_mode` with [`ReconnectMode::ReconnectOnTransientError`].
pub fn reconnect_on_transient_errors(self) -> Self {
self.reconnect_mode(ReconnectMode::ReconnectOnTransientError)
}

/// Build a Smithy service [`Client`].
pub fn build(self) -> Client<C, M, R> {
let operation_timeout_config = self
Expand Down Expand Up @@ -392,6 +436,7 @@ impl<C, M, R> Builder<C, M, R> {
middleware: self.middleware,
operation_timeout_config,
sleep_impl: self.sleep_impl,
reconnect_mode: self.reconnect_mode.unwrap_or(default_reconnect_mode()),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions rust-runtime/aws-smithy-client/src/erase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ where
retry_policy: self.retry_policy,
operation_timeout_config: self.operation_timeout_config,
sleep_impl: self.sleep_impl,
reconnect_mode: self.reconnect_mode,
}
}
}
Expand Down Expand Up @@ -101,6 +102,7 @@ where
retry_policy: self.retry_policy,
operation_timeout_config: self.operation_timeout_config,
sleep_impl: self.sleep_impl,
reconnect_mode: self.reconnect_mode,
}
}

Expand Down
Loading

0 comments on commit 61934da

Please sign in to comment.