From 6eca6a87f29d283e06548fe215ed959ee685fdde Mon Sep 17 00:00:00 2001 From: Julian Antonielli Date: Thu, 8 Dec 2022 14:52:00 +0000 Subject: [PATCH 1/2] Don't require `thiserror` 1.0.36 (#2076) --- rust-runtime/aws-smithy-http-server/Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust-runtime/aws-smithy-http-server/Cargo.toml b/rust-runtime/aws-smithy-http-server/Cargo.toml index d3f7b47d97..4497eb8806 100644 --- a/rust-runtime/aws-smithy-http-server/Cargo.toml +++ b/rust-runtime/aws-smithy-http-server/Cargo.toml @@ -35,8 +35,7 @@ once_cell = "1.13" regex = "1.5.5" serde_urlencoded = "0.7" strum_macros = "0.24" -# TODO Investigate. -thiserror = "<=1.0.36" +thiserror = "1.0.0" tracing = "0.1.35" tokio = { version = "1.8.4", features = ["full"] } tower = { version = "0.4.11", features = ["util", "make"], default-features = false } From 3b8e0eebe4cfbea97c8b6b88c9f11ec94e57832c Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Thu, 8 Dec 2022 12:13:03 -0600 Subject: [PATCH 2/2] Add simple request concurrency tests to S3 integration test suite. (#2061) * add: multi-threaded concurrency test for S3 client add: single-threaded concurrency test for S3 client --- .../rustsdk/IntegrationTestDependencies.kt | 6 + aws/sdk/integration-tests/s3/Cargo.toml | 6 +- .../integration-tests/s3/tests/concurrency.rs | 262 ++++++++++++++++++ .../codegen/core/rustlang/CargoDependency.kt | 18 +- 4 files changed, 285 insertions(+), 7 deletions(-) create mode 100644 aws/sdk/integration-tests/s3/tests/concurrency.rs diff --git a/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/IntegrationTestDependencies.kt b/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/IntegrationTestDependencies.kt index 424fb2cef0..42a55481b1 100644 --- a/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/IntegrationTestDependencies.kt +++ b/aws/sdk-codegen/src/main/kotlin/software/amazon/smithy/rustsdk/IntegrationTestDependencies.kt @@ -13,14 +13,17 @@ import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Compani import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.AsyncStream import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.BytesUtils import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Criterion +import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.FastRand import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.FuturesCore import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.FuturesUtil +import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.HdrHistogram import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Hound import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.SerdeJson import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Smol import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TempFile import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Tokio import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Tracing +import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TracingAppender import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TracingSubscriber import software.amazon.smithy.rust.codegen.core.rustlang.DependencyScope import software.amazon.smithy.rust.codegen.core.rustlang.Writable @@ -117,7 +120,10 @@ class S3TestDependencies : LibRsCustomization() { writable { addDependency(AsyncStd) addDependency(BytesUtils) + addDependency(FastRand) + addDependency(HdrHistogram) addDependency(Smol) addDependency(TempFile) + addDependency(TracingAppender) } } diff --git a/aws/sdk/integration-tests/s3/Cargo.toml b/aws/sdk/integration-tests/s3/Cargo.toml index a7c53fc3b6..987ed9c6e7 100644 --- a/aws/sdk/integration-tests/s3/Cargo.toml +++ b/aws/sdk/integration-tests/s3/Cargo.toml @@ -21,6 +21,9 @@ aws-smithy-types = { path = "../../build/aws-sdk/sdk/aws-smithy-types" } aws-types = { path = "../../build/aws-sdk/sdk/aws-types" } bytes = "1" bytes-utils = "0.1.2" +fastrand = "1.8.0" +futures-util = "0.3.25" +hdrhistogram = "7.5.2" http = "0.2.3" http-body = "0.4.5" hyper = "0.14.12" @@ -29,4 +32,5 @@ smol = "1.2" tempfile = "3" tokio = { version = "1.8.4", features = ["macros", "test-util", "rt-multi-thread"] } tracing = "0.1.37" -tracing-subscriber = { version = "0.3.15", features = ["env-filter"] } +tracing-appender = "0.2.2" +tracing-subscriber = { version = "0.3.15", features = ["env-filter", "json"] } diff --git a/aws/sdk/integration-tests/s3/tests/concurrency.rs b/aws/sdk/integration-tests/s3/tests/concurrency.rs new file mode 100644 index 0000000000..05e0117878 --- /dev/null +++ b/aws/sdk/integration-tests/s3/tests/concurrency.rs @@ -0,0 +1,262 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use std::future::Future; +use std::iter::repeat_with; +use std::net::SocketAddr; +use std::sync::Arc; + +use aws_sdk_s3::Client; +use aws_smithy_http::endpoint::Endpoint; +use aws_smithy_types::timeout::TimeoutConfig; +use aws_types::credentials::SharedCredentialsProvider; +use aws_types::region::Region; +use aws_types::{Credentials, SdkConfig}; +use bytes::BytesMut; +use futures_util::future; +use hdrhistogram::sync::SyncHistogram; +use hdrhistogram::Histogram; +use tokio::sync::Semaphore; +use tokio::time::{Duration, Instant}; +use tracing::debug; + +const TASK_COUNT: usize = 10_000; +// Larger requests take longer to send, which means we'll consume more network resources per +// request, which means we can't support as many concurrent connections to S3. +const TASK_PAYLOAD_LENGTH: usize = 10_000; +// At 130 and above, this test will fail with a `ConnectorError` from `hyper`. I've seen: +// - ConnectorError { kind: Io, source: hyper::Error(Canceled, hyper::Error(Io, Os { code: 54, kind: ConnectionReset, message: "Connection reset by peer" })) } +// - ConnectorError { kind: Io, source: hyper::Error(BodyWrite, Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }) } +// These errors don't necessarily occur when actually running against S3 with concurrency levels +// above 129. You can test it for yourself by running the +// `test_concurrency_put_object_against_live` test that appears at the bottom of this file. +const CONCURRENCY_LIMIT: usize = 129; + +#[tokio::test(flavor = "multi_thread")] +async fn test_concurrency_on_multi_thread_against_dummy_server() { + let (server, server_addr) = start_agreeable_server().await; + let _ = tokio::spawn(server); + let sdk_config = SdkConfig::builder() + .credentials_provider(SharedCredentialsProvider::new(Credentials::new( + "ANOTREAL", + "notrealrnrELgWzOk3IfjzDKtFBhDby", + Some("notarealsessiontoken".to_string()), + None, + "test", + ))) + .region(Region::new("us-east-1")) + .endpoint_resolver( + Endpoint::immutable(format!("http://{server_addr}")).expect("valid endpoint"), + ) + .build(); + + test_concurrency(sdk_config).await; +} + +#[tokio::test(flavor = "current_thread")] +async fn test_concurrency_on_single_thread_against_dummy_server() { + let (server, server_addr) = start_agreeable_server().await; + let _ = tokio::spawn(server); + let sdk_config = SdkConfig::builder() + .credentials_provider(SharedCredentialsProvider::new(Credentials::new( + "ANOTREAL", + "notrealrnrELgWzOk3IfjzDKtFBhDby", + Some("notarealsessiontoken".to_string()), + None, + "test", + ))) + .region(Region::new("us-east-1")) + .endpoint_resolver( + Endpoint::immutable(format!("http://{server_addr}")).expect("valid endpoint"), + ) + .build(); + + test_concurrency(sdk_config).await; +} + +#[ignore = "this test runs against S3 and requires credentials"] +#[tokio::test(flavor = "multi_thread")] +async fn test_concurrency_on_multi_thread_against_s3() { + let sdk_config = aws_config::from_env() + .timeout_config( + TimeoutConfig::builder() + .connect_timeout(Duration::from_secs(30)) + .read_timeout(Duration::from_secs(30)) + .build(), + ) + .load() + .await; + + test_concurrency(sdk_config).await; +} + +#[derive(Clone, Copy)] +enum State { + Listening, + Speaking, +} + +// This server is agreeable because it always replies with `OK` +async fn start_agreeable_server() -> (impl Future, SocketAddr) { + use tokio::net::{TcpListener, TcpStream}; + use tokio::time::sleep; + + let listener = TcpListener::bind("0.0.0.0:0") + .await + .expect("socket is free"); + let bind_addr = listener.local_addr().unwrap(); + async fn handle_tcp_stream(tcp_stream: TcpStream) { + let mut buf = BytesMut::new(); + let mut state = State::Listening; + + let response: &[u8] = b"HTTP/1.1 200 OK\r\n\r\n"; + let mut bytes_left_to_write = response.len(); + + loop { + match state { + State::Listening => { + match tcp_stream.try_read_buf(&mut buf) { + Ok(_) => { + // Check for CRLF to see if we've received the entire HTTP request. + let s = String::from_utf8_lossy(&buf); + if let Some(content_length) = discern_content_length(&s) { + if let Some(body_length) = discern_body_length(&s) { + if body_length == content_length { + state = State::Speaking; + } + } + } + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // reading would block, sleeping for 1ms and then trying again + sleep(Duration::from_millis(1)).await; + } + Err(err) => { + panic!("{}", err) + } + } + } + State::Speaking => { + if tcp_stream.writable().await.is_ok() { + let bytes_written = tcp_stream.try_write(response).unwrap(); + bytes_left_to_write -= bytes_written; + if bytes_left_to_write == 0 { + break; + } + } + } + } + } + } + + let fut = async move { + loop { + let (tcp_stream, _addr) = listener + .accept() + .await + .expect("listener can accept new connections"); + handle_tcp_stream(tcp_stream).await; + } + }; + + (fut, bind_addr) +} + +fn discern_content_length(s: &str) -> Option { + // split on newlines + s.split("\r\n") + // throw out all lines that aren't the content-length header + .find(|s| s.contains("content-length: ")) + // attempt to parse the numeric part of the header as a usize + .and_then(|s| s.trim_start_matches("content-length: ").parse().ok()) +} + +fn discern_body_length(s: &str) -> Option { + // If the request doesn't have a double CRLF, then we haven't finished reading it yet + if !s.contains("\r\n\r\n") { + return None; + } + // starting from end, split on the double CRLF that separates the body from the header + s.rsplit("\r\n\r\n") + // get the body, which must be the first element (we don't send trailers with PutObject requests) + .next() + // get the length of the body, in bytes, being sure to trim off the final newline + .map(|s| s.trim_end().len()) +} + +async fn test_concurrency(sdk_config: SdkConfig) { + let client = Client::new(&sdk_config); + + let histogram = + Histogram::new_with_bounds(1, Duration::from_secs(60 * 60).as_nanos() as u64, 3) + .unwrap() + .into_sync(); + + debug!("creating futures"); + // This semaphore ensures we only run up to requests at once. + let semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT)); + let futures = (0..TASK_COUNT).map(|i| { + let client = client.clone(); + let key = format!("concurrency/test_object_{:05}", i); + let body: Vec<_> = repeat_with(fastrand::alphanumeric) + .take(TASK_PAYLOAD_LENGTH) + .map(|c| c as u8) + .collect(); + let fut = client + .put_object() + .bucket("your-test-bucket-here") + .key(key) + .body(body.into()) + .send(); + // make a clone of the semaphore and the recorder that can live in the future + let semaphore = semaphore.clone(); + let mut histogram_recorder = histogram.recorder(); + + // because we wait on a permit from the semaphore, only futures + // will be run at once. Otherwise, we'd quickly get rate-limited by S3. + async move { + let permit = semaphore + .acquire() + .await + .expect("we'll get one if we wait long enough"); + let start = Instant::now(); + let res = fut.await.expect("request should succeed"); + histogram_recorder.saturating_record(start.elapsed().as_nanos() as u64); + drop(permit); + res + } + }); + + debug!("joining futures"); + let res: Vec<_> = future::join_all(futures).await; + // Assert we ran all the tasks + assert_eq!(TASK_COUNT, res.len()); + + display_metrics( + "Request Latency", + histogram, + "s", + Duration::from_secs(1).as_nanos() as f64, + ); +} + +fn display_metrics(name: &str, mut h: SyncHistogram, unit: &str, scale: f64) { + // Refreshing is required or else we won't see any results at all + h.refresh(); + debug!("displaying {} results from {name} histogram", h.len()); + debug!( + "{name}\n\ + \tmean:\t{:.1}{unit},\n\ + \tp50:\t{:.1}{unit},\n\ + \tp90:\t{:.1}{unit},\n\ + \tp99:\t{:.1}{unit},\n\ + \tmax:\t{:.1}{unit}", + h.mean() / scale, + h.value_at_quantile(0.5) as f64 / scale, + h.value_at_quantile(0.9) as f64 / scale, + h.value_at_quantile(0.99) as f64 / scale, + h.max() as f64 / scale, + ); +} diff --git a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt index 60a43b8362..948304c569 100644 --- a/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt +++ b/codegen-core/src/main/kotlin/software/amazon/smithy/rust/codegen/core/rustlang/CargoDependency.kt @@ -191,7 +191,7 @@ data class CargoDependency( val Url: CargoDependency = CargoDependency("url", CratesIo("2.3.1")) val Bytes: CargoDependency = CargoDependency("bytes", CratesIo("1.0.0")) val BytesUtils: CargoDependency = CargoDependency("bytes-utils", CratesIo("0.1.0")) - val FastRand: CargoDependency = CargoDependency("fastrand", CratesIo("1.0.0")) + val FastRand: CargoDependency = CargoDependency("fastrand", CratesIo("1.8.0")) val Hex: CargoDependency = CargoDependency("hex", CratesIo("0.4.3")) val Http: CargoDependency = CargoDependency("http", CratesIo("0.2.0")) val HttpBody: CargoDependency = CargoDependency("http-body", CratesIo("0.4.4")) @@ -210,21 +210,27 @@ data class CargoDependency( val AsyncStd: CargoDependency = CargoDependency("async-std", CratesIo("1.12.0"), DependencyScope.Dev) val AsyncStream: CargoDependency = CargoDependency("async-stream", CratesIo("0.3.0"), DependencyScope.Dev) val Criterion: CargoDependency = CargoDependency("criterion", CratesIo("0.4.0"), DependencyScope.Dev) - val FuturesCore: CargoDependency = CargoDependency("futures-core", CratesIo("0.3.0"), DependencyScope.Dev) - val FuturesUtil: CargoDependency = CargoDependency("futures-util", CratesIo("0.3.0"), DependencyScope.Dev) + val FuturesCore: CargoDependency = CargoDependency("futures-core", CratesIo("0.3.25"), DependencyScope.Dev) + val FuturesUtil: CargoDependency = CargoDependency("futures-util", CratesIo("0.3.25"), DependencyScope.Dev) + val HdrHistogram: CargoDependency = CargoDependency("hdrhistogram", CratesIo("7.5.2"), DependencyScope.Dev) val Hound: CargoDependency = CargoDependency("hound", CratesIo("3.4.0"), DependencyScope.Dev) val PrettyAssertions: CargoDependency = - CargoDependency("pretty_assertions", CratesIo("1.0.0"), DependencyScope.Dev) + CargoDependency("pretty_assertions", CratesIo("1.3.0"), DependencyScope.Dev) val SerdeJson: CargoDependency = CargoDependency("serde_json", CratesIo("1.0.0"), DependencyScope.Dev) val Smol: CargoDependency = CargoDependency("smol", CratesIo("1.2.0"), DependencyScope.Dev) val TempFile: CargoDependency = CargoDependency("tempfile", CratesIo("3.2.0"), DependencyScope.Dev) val Tokio: CargoDependency = CargoDependency("tokio", CratesIo("1.8.4"), DependencyScope.Dev, features = setOf("macros", "test-util", "rt-multi-thread")) + val TracingAppender: CargoDependency = CargoDependency( + "tracing-appender", + CratesIo("0.2.2"), + DependencyScope.Dev, + ) val TracingSubscriber: CargoDependency = CargoDependency( "tracing-subscriber", - CratesIo("0.3.15"), + CratesIo("0.3.16"), DependencyScope.Dev, - features = setOf("env-filter"), + features = setOf("env-filter", "json"), ) fun smithyAsync(runtimeConfig: RuntimeConfig) = runtimeConfig.runtimeCrate("async")