Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into jdisanti-rfc-client-c…
Browse files Browse the repository at this point in the history
…rate-org
  • Loading branch information
jdisanti committed Dec 8, 2022
2 parents 9924210 + 3b8e0ee commit df19d7a
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Compani
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.AsyncStream
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.BytesUtils
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Criterion
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.FastRand
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.FuturesCore
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.FuturesUtil
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.HdrHistogram
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Hound
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.SerdeJson
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Smol
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TempFile
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Tokio
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.Tracing
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TracingAppender
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency.Companion.TracingSubscriber
import software.amazon.smithy.rust.codegen.core.rustlang.DependencyScope
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
Expand Down Expand Up @@ -117,7 +120,10 @@ class S3TestDependencies : LibRsCustomization() {
writable {
addDependency(AsyncStd)
addDependency(BytesUtils)
addDependency(FastRand)
addDependency(HdrHistogram)
addDependency(Smol)
addDependency(TempFile)
addDependency(TracingAppender)
}
}
6 changes: 5 additions & 1 deletion aws/sdk/integration-tests/s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ aws-smithy-types = { path = "../../build/aws-sdk/sdk/aws-smithy-types" }
aws-types = { path = "../../build/aws-sdk/sdk/aws-types" }
bytes = "1"
bytes-utils = "0.1.2"
fastrand = "1.8.0"
futures-util = "0.3.25"
hdrhistogram = "7.5.2"
http = "0.2.3"
http-body = "0.4.5"
hyper = "0.14.12"
Expand All @@ -29,4 +32,5 @@ smol = "1.2"
tempfile = "3"
tokio = { version = "1.8.4", features = ["macros", "test-util", "rt-multi-thread"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15", features = ["env-filter", "json"] }
262 changes: 262 additions & 0 deletions aws/sdk/integration-tests/s3/tests/concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use std::future::Future;
use std::iter::repeat_with;
use std::net::SocketAddr;
use std::sync::Arc;

use aws_sdk_s3::Client;
use aws_smithy_http::endpoint::Endpoint;
use aws_smithy_types::timeout::TimeoutConfig;
use aws_types::credentials::SharedCredentialsProvider;
use aws_types::region::Region;
use aws_types::{Credentials, SdkConfig};
use bytes::BytesMut;
use futures_util::future;
use hdrhistogram::sync::SyncHistogram;
use hdrhistogram::Histogram;
use tokio::sync::Semaphore;
use tokio::time::{Duration, Instant};
use tracing::debug;

// Total number of PutObject requests issued over the course of one test run.
const TASK_COUNT: usize = 10_000;
// Larger requests take longer to send, which means we'll consume more network resources per
// request, which means we can't support as many concurrent connections to S3.
const TASK_PAYLOAD_LENGTH: usize = 10_000;
// At 130 and above, this test will fail with a `ConnectorError` from `hyper`. I've seen:
// - ConnectorError { kind: Io, source: hyper::Error(Canceled, hyper::Error(Io, Os { code: 54, kind: ConnectionReset, message: "Connection reset by peer" })) }
// - ConnectorError { kind: Io, source: hyper::Error(BodyWrite, Os { code: 32, kind: BrokenPipe, message: "Broken pipe" }) }
// These errors don't necessarily occur when actually running against S3 with concurrency levels
// above 129. You can test it for yourself by running the
// `test_concurrency_put_object_against_live` test that appears at the bottom of this file.
const CONCURRENCY_LIMIT: usize = 129;

/// Exercises `test_concurrency` on the multi-threaded Tokio runtime, pointed at a
/// local stub server that unconditionally answers `200 OK` (no real AWS traffic).
#[tokio::test(flavor = "multi_thread")]
async fn test_concurrency_on_multi_thread_against_dummy_server() {
    let (server_fut, server_addr) = start_agreeable_server().await;
    // Detach the server task; it runs for the duration of the test.
    drop(tokio::spawn(server_fut));

    // Static fake credentials — the stub server never validates a signature.
    let fake_creds = Credentials::new(
        "ANOTREAL",
        "notrealrnrELgWzOk3IfjzDKtFBhDby",
        Some("notarealsessiontoken".to_string()),
        None,
        "test",
    );
    let sdk_config = SdkConfig::builder()
        .credentials_provider(SharedCredentialsProvider::new(fake_creds))
        .region(Region::new("us-east-1"))
        .endpoint_resolver(
            Endpoint::immutable(format!("http://{server_addr}")).expect("valid endpoint"),
        )
        .build();

    test_concurrency(sdk_config).await;
}

/// Same scenario as the multi-thread variant, but on the single-threaded
/// (`current_thread`) Tokio runtime, against the local stub server.
#[tokio::test(flavor = "current_thread")]
async fn test_concurrency_on_single_thread_against_dummy_server() {
    let (server_fut, server_addr) = start_agreeable_server().await;
    // Fire-and-forget: the stub server loops until the test process exits.
    drop(tokio::spawn(server_fut));

    let endpoint =
        Endpoint::immutable(format!("http://{server_addr}")).expect("valid endpoint");
    let sdk_config = SdkConfig::builder()
        .credentials_provider(SharedCredentialsProvider::new(Credentials::new(
            "ANOTREAL",
            "notrealrnrELgWzOk3IfjzDKtFBhDby",
            Some("notarealsessiontoken".to_string()),
            None,
            "test",
        )))
        .region(Region::new("us-east-1"))
        .endpoint_resolver(endpoint)
        .build();

    test_concurrency(sdk_config).await;
}

/// Runs the concurrency test against real S3. Ignored by default because it
/// needs live AWS credentials (and a writable test bucket) in the environment.
#[ignore = "this test runs against S3 and requires credentials"]
#[tokio::test(flavor = "multi_thread")]
async fn test_concurrency_on_multi_thread_against_s3() {
    // Generous 30s timeouts so slow networks don't fail the run spuriously.
    let timeouts = TimeoutConfig::builder()
        .connect_timeout(Duration::from_secs(30))
        .read_timeout(Duration::from_secs(30))
        .build();
    let sdk_config = aws_config::from_env().timeout_config(timeouts).load().await;

    test_concurrency(sdk_config).await;
}

/// The two phases of the stub server's per-connection state machine.
#[derive(Clone, Copy)]
enum State {
    // Reading the incoming HTTP request until the full body has arrived.
    Listening,
    // Writing the canned `200 OK` response back to the client.
    Speaking,
}

// This server is agreeable because it always replies with `OK`
// This server is agreeable because it always replies with `OK`.
//
// Binds a TCP listener on an ephemeral port and returns the accept-loop future
// (the caller is expected to spawn it) together with the bound local address.
async fn start_agreeable_server() -> (impl Future<Output = ()>, SocketAddr) {
    use tokio::net::{TcpListener, TcpStream};
    use tokio::time::sleep;

    let listener = TcpListener::bind("0.0.0.0:0")
        .await
        .expect("socket is free");
    let bind_addr = listener.local_addr().unwrap();

    // Reads one HTTP request off the stream, then replies with a canned `200 OK`.
    async fn handle_tcp_stream(tcp_stream: TcpStream) {
        let mut buf = BytesMut::new();
        let mut state = State::Listening;

        let response: &[u8] = b"HTTP/1.1 200 OK\r\n\r\n";
        let mut bytes_left_to_write = response.len();

        loop {
            match state {
                State::Listening => {
                    match tcp_stream.try_read_buf(&mut buf) {
                        // EOF: the peer closed the connection before sending a
                        // complete request — stop instead of spinning forever.
                        Ok(0) => return,
                        Ok(_) => {
                            // We've received the entire HTTP request once the body's
                            // length matches the `content-length` header.
                            let s = String::from_utf8_lossy(&buf);
                            if let Some(content_length) = discern_content_length(&s) {
                                if let Some(body_length) = discern_body_length(&s) {
                                    if body_length == content_length {
                                        state = State::Speaking;
                                    }
                                }
                            }
                        }
                        Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                            // Reading would block; sleep for 1ms and then try again.
                            sleep(Duration::from_millis(1)).await;
                        }
                        Err(err) => {
                            panic!("{}", err)
                        }
                    }
                }
                State::Speaking => {
                    if tcp_stream.writable().await.is_ok() {
                        // Write only the unsent tail of the response. Restarting
                        // from the beginning after a partial write would resend
                        // bytes the peer already received and corrupt the response.
                        let unsent = &response[response.len() - bytes_left_to_write..];
                        match tcp_stream.try_write(unsent) {
                            Ok(bytes_written) => {
                                bytes_left_to_write -= bytes_written;
                                if bytes_left_to_write == 0 {
                                    break;
                                }
                            }
                            // Readiness events can be spurious; just retry.
                            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
                            Err(err) => {
                                panic!("{}", err)
                            }
                        }
                    }
                }
            }
        }
    }

    let fut = async move {
        loop {
            let (tcp_stream, _addr) = listener
                .accept()
                .await
                .expect("listener can accept new connections");
            // NOTE(review): connections are handled serially (awaited inline), so
            // this relies on the client reusing pooled connections — confirm that
            // assumption holds if the client's connector configuration changes.
            handle_tcp_stream(tcp_stream).await;
        }
    };

    (fut, bind_addr)
}

/// Parses the value of the `content-length` header out of a raw HTTP/1.1
/// request, returning `None` when the header is absent or its value is not a
/// valid `usize`.
fn discern_content_length(s: &str) -> Option<usize> {
    // Header lines are separated by CRLF. Match the header name at the *start*
    // of a line: the previous `contains` + `trim_start_matches` approach could
    // false-positive on any line that merely mentioned "content-length: "
    // mid-line (and `trim_start_matches` strips the prefix repeatedly).
    s.split("\r\n")
        // keep only the line that is actually the content-length header
        .find_map(|line| line.strip_prefix("content-length: "))
        // attempt to parse the numeric value as a usize
        .and_then(|value| value.parse().ok())
}

/// Returns the byte length of the body portion of a raw HTTP request, or
/// `None` when the header/body separator hasn't been received yet.
fn discern_body_length(s: &str) -> Option<usize> {
    // A request with no double CRLF hasn't finished arriving: `rsplit_once`
    // yields `None` in that case, which `?` propagates. Otherwise the body is
    // everything after the *last* double CRLF (we don't send trailers with
    // PutObject requests).
    let (_headers, body) = s.rsplit_once("\r\n\r\n")?;
    // Measure the body in bytes, trimming off any final newline.
    Some(body.trim_end().len())
}

async fn test_concurrency(sdk_config: SdkConfig) {
let client = Client::new(&sdk_config);

let histogram =
Histogram::new_with_bounds(1, Duration::from_secs(60 * 60).as_nanos() as u64, 3)
.unwrap()
.into_sync();

debug!("creating futures");
// This semaphore ensures we only run up to <CONCURRENCY_LIMIT> requests at once.
let semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT));
let futures = (0..TASK_COUNT).map(|i| {
let client = client.clone();
let key = format!("concurrency/test_object_{:05}", i);
let body: Vec<_> = repeat_with(fastrand::alphanumeric)
.take(TASK_PAYLOAD_LENGTH)
.map(|c| c as u8)
.collect();
let fut = client
.put_object()
.bucket("your-test-bucket-here")
.key(key)
.body(body.into())
.send();
// make a clone of the semaphore and the recorder that can live in the future
let semaphore = semaphore.clone();
let mut histogram_recorder = histogram.recorder();

// because we wait on a permit from the semaphore, only <CONCURRENCY_LIMIT> futures
// will be run at once. Otherwise, we'd quickly get rate-limited by S3.
async move {
let permit = semaphore
.acquire()
.await
.expect("we'll get one if we wait long enough");
let start = Instant::now();
let res = fut.await.expect("request should succeed");
histogram_recorder.saturating_record(start.elapsed().as_nanos() as u64);
drop(permit);
res
}
});

debug!("joining futures");
let res: Vec<_> = future::join_all(futures).await;
// Assert we ran all the tasks
assert_eq!(TASK_COUNT, res.len());

display_metrics(
"Request Latency",
histogram,
"s",
Duration::from_secs(1).as_nanos() as f64,
);
}

/// Logs summary statistics (mean, p50/p90/p99, max) for `h`, scaling raw
/// recorded values into `unit` by dividing them by `scale`.
fn display_metrics(name: &str, mut h: SyncHistogram<u64>, unit: &str, scale: f64) {
    // Refreshing is required or else we won't see any results at all.
    h.refresh();
    debug!("displaying {} results from {name} histogram", h.len());
    // Compute all the scaled statistics up front for readability.
    let mean = h.mean() / scale;
    let p50 = h.value_at_quantile(0.5) as f64 / scale;
    let p90 = h.value_at_quantile(0.9) as f64 / scale;
    let p99 = h.value_at_quantile(0.99) as f64 / scale;
    let max = h.max() as f64 / scale;
    debug!(
        "{name}\n\
        \tmean:\t{:.1}{unit},\n\
        \tp50:\t{:.1}{unit},\n\
        \tp90:\t{:.1}{unit},\n\
        \tp99:\t{:.1}{unit},\n\
        \tmax:\t{:.1}{unit}",
        mean, p50, p90, p99, max,
    );
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ data class CargoDependency(
val Url: CargoDependency = CargoDependency("url", CratesIo("2.3.1"))
val Bytes: CargoDependency = CargoDependency("bytes", CratesIo("1.0.0"))
val BytesUtils: CargoDependency = CargoDependency("bytes-utils", CratesIo("0.1.0"))
val FastRand: CargoDependency = CargoDependency("fastrand", CratesIo("1.0.0"))
val FastRand: CargoDependency = CargoDependency("fastrand", CratesIo("1.8.0"))
val Hex: CargoDependency = CargoDependency("hex", CratesIo("0.4.3"))
val Http: CargoDependency = CargoDependency("http", CratesIo("0.2.0"))
val HttpBody: CargoDependency = CargoDependency("http-body", CratesIo("0.4.4"))
Expand All @@ -210,21 +210,27 @@ data class CargoDependency(
val AsyncStd: CargoDependency = CargoDependency("async-std", CratesIo("1.12.0"), DependencyScope.Dev)
val AsyncStream: CargoDependency = CargoDependency("async-stream", CratesIo("0.3.0"), DependencyScope.Dev)
val Criterion: CargoDependency = CargoDependency("criterion", CratesIo("0.4.0"), DependencyScope.Dev)
val FuturesCore: CargoDependency = CargoDependency("futures-core", CratesIo("0.3.0"), DependencyScope.Dev)
val FuturesUtil: CargoDependency = CargoDependency("futures-util", CratesIo("0.3.0"), DependencyScope.Dev)
val FuturesCore: CargoDependency = CargoDependency("futures-core", CratesIo("0.3.25"), DependencyScope.Dev)
val FuturesUtil: CargoDependency = CargoDependency("futures-util", CratesIo("0.3.25"), DependencyScope.Dev)
val HdrHistogram: CargoDependency = CargoDependency("hdrhistogram", CratesIo("7.5.2"), DependencyScope.Dev)
val Hound: CargoDependency = CargoDependency("hound", CratesIo("3.4.0"), DependencyScope.Dev)
val PrettyAssertions: CargoDependency =
CargoDependency("pretty_assertions", CratesIo("1.0.0"), DependencyScope.Dev)
CargoDependency("pretty_assertions", CratesIo("1.3.0"), DependencyScope.Dev)
val SerdeJson: CargoDependency = CargoDependency("serde_json", CratesIo("1.0.0"), DependencyScope.Dev)
val Smol: CargoDependency = CargoDependency("smol", CratesIo("1.2.0"), DependencyScope.Dev)
val TempFile: CargoDependency = CargoDependency("tempfile", CratesIo("3.2.0"), DependencyScope.Dev)
val Tokio: CargoDependency =
CargoDependency("tokio", CratesIo("1.8.4"), DependencyScope.Dev, features = setOf("macros", "test-util", "rt-multi-thread"))
val TracingAppender: CargoDependency = CargoDependency(
"tracing-appender",
CratesIo("0.2.2"),
DependencyScope.Dev,
)
val TracingSubscriber: CargoDependency = CargoDependency(
"tracing-subscriber",
CratesIo("0.3.15"),
CratesIo("0.3.16"),
DependencyScope.Dev,
features = setOf("env-filter"),
features = setOf("env-filter", "json"),
)

fun smithyAsync(runtimeConfig: RuntimeConfig) = runtimeConfig.runtimeCrate("async")
Expand Down
3 changes: 1 addition & 2 deletions rust-runtime/aws-smithy-http-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ once_cell = "1.13"
regex = "1.5.5"
serde_urlencoded = "0.7"
strum_macros = "0.24"
# TODO Investigate.
thiserror = "<=1.0.36"
thiserror = "1.0.0"
tracing = "0.1.35"
tokio = { version = "1.8.4", features = ["full"] }
tower = { version = "0.4.11", features = ["util", "make"], default-features = false }
Expand Down

0 comments on commit df19d7a

Please sign in to comment.