diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 7d66e72675..c5c9f7adff 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -474,3 +474,15 @@ message = "Enable custom auth schemes to work by changing the code generated aut references = ["smithy-rs#3034", "smithy-rs#3087"] meta = { "breaking" = false, "tada" = false, "bug" = true, "target" = "client" } author = "rcoh" + +[[smithy-rs]] +message = "Publicly exposed types from `http-body` and `hyper` crates within `aws-smithy-types` are now feature-gated. See the [upgrade guidance](https://github.com/awslabs/smithy-rs/discussions/3089) for details." +references = ["smithy-rs#3033", "smithy-rs#3088"] +meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all" } +author = "ysaito1001" + +[[smithy-rs]] +message = "`ByteStream::poll_next` is now feature-gated. You can turn on a cargo feature `byte-stream-poll-next` in `aws-smithy-types` to use it." +references = ["smithy-rs#3033", "smithy-rs#3088"] +meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all" } +author = "ysaito1001" diff --git a/aws/rust-runtime/aws-inlineable/Cargo.toml b/aws/rust-runtime/aws-inlineable/Cargo.toml index 2e902d2776..0657b00e5d 100644 --- a/aws/rust-runtime/aws-inlineable/Cargo.toml +++ b/aws/rust-runtime/aws-inlineable/Cargo.toml @@ -21,7 +21,7 @@ aws-smithy-checksums = { path = "../../../rust-runtime/aws-smithy-checksums" } aws-smithy-http = { path = "../../../rust-runtime/aws-smithy-http" } aws-smithy-runtime = { path = "../../../rust-runtime/aws-smithy-runtime", features = ["client"] } aws-smithy-runtime-api = { path = "../../../rust-runtime/aws-smithy-runtime-api", features = ["client"] } -aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types" } +aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types", features = ["http-body-0-4-x"] } bytes = "1" hex = "0.4.3" http = "0.2.9" diff --git a/aws/rust-runtime/aws-inlineable/src/http_request_checksum.rs b/aws/rust-runtime/aws-inlineable/src/http_request_checksum.rs index 846fbb5274..239133414e 100644 --- a/aws/rust-runtime/aws-inlineable/src/http_request_checksum.rs +++ b/aws/rust-runtime/aws-inlineable/src/http_request_checksum.rs @@ -19,7 +19,7 @@ use aws_smithy_runtime_api::client::interceptors::context::{ use aws_smithy_runtime_api::client::interceptors::Intercept; use aws_smithy_runtime_api::client::orchestrator::HttpRequest; use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents; -use aws_smithy_types::body::{BoxBody, SdkBody}; +use aws_smithy_types::body::SdkBody; use aws_smithy_types::config_bag::{ConfigBag, Layer, Storable, StoreReplace}; use aws_smithy_types::error::operation::BuildError; use http::HeaderValue; @@ -173,7 +173,7 @@ fn wrap_streaming_request_body_in_checksum_calculating_body( let body = AwsChunkedBody::new(body, aws_chunked_body_options); - SdkBody::from_dyn(BoxBody::new(body)) + SdkBody::from_body_0_4(body) }) }; @@ -269,7 +269,7 @@ mod tests { let crc32c_checksum = crc32c_checksum.finalize(); let mut request = HttpRequest::new( - ByteStream::read_from() + ByteStream::read_with_body_0_4_from() .path(&file) .buffer_size(1024) .build() diff --git a/aws/rust-runtime/aws-inlineable/src/http_response_checksum.rs b/aws/rust-runtime/aws-inlineable/src/http_response_checksum.rs index d770b44bf3..687790cfff 100644 --- a/aws/rust-runtime/aws-inlineable/src/http_response_checksum.rs +++ b/aws/rust-runtime/aws-inlineable/src/http_response_checksum.rs @@ -14,7 +14,7 @@ use aws_smithy_runtime_api::client::interceptors::context::{ }; use aws_smithy_runtime_api::client::interceptors::Intercept; use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents; -use aws_smithy_types::body::{BoxBody, SdkBody}; +use aws_smithy_types::body::SdkBody; use aws_smithy_types::config_bag::{ConfigBag, Layer, Storable, StoreReplace}; use http::HeaderValue; use std::{fmt, mem}; @@ -119,11 +119,11 @@ pub(crate) fn wrap_body_with_checksum_validator( use aws_smithy_checksums::body::validate; body.map(move |body| { - SdkBody::from_dyn(BoxBody::new(validate::ChecksumBody::new( + SdkBody::from_body_0_4(validate::ChecksumBody::new( body, checksum_algorithm.into_impl(), precalculated_checksum.clone(), - ))) + )) }) } diff --git a/aws/sdk/integration-tests/glacier/tests/custom-headers.rs b/aws/sdk/integration-tests/glacier/tests/custom-headers.rs index 52194567cc..46536dd2cd 100644 --- a/aws/sdk/integration-tests/glacier/tests/custom-headers.rs +++ b/aws/sdk/integration-tests/glacier/tests/custom-headers.rs @@ -21,7 +21,11 @@ async fn set_correct_headers() { let _resp = client .upload_archive() .vault_name("vault") - .body(ByteStream::from_path("tests/test-file.txt").await.unwrap()) + .body( + ByteStream::from_path_body_0_4("tests/test-file.txt") + .await + .unwrap(), + ) .send() .await; let req = handler.expect_request(); diff --git a/aws/sdk/integration-tests/s3/tests/checksums.rs b/aws/sdk/integration-tests/s3/tests/checksums.rs index 37782182e0..97155bfc09 100644 --- a/aws/sdk/integration-tests/s3/tests/checksums.rs +++ b/aws/sdk/integration-tests/s3/tests/checksums.rs @@ -177,7 +177,7 @@ async fn test_checksum_on_streaming_request<'a>( use std::io::Write; file.write_all(body).unwrap(); - let body = aws_sdk_s3::primitives::ByteStream::read_from() + let body = aws_sdk_s3::primitives::ByteStream::read_with_body_0_4_from() .path(file.path()) .buffer_size(1024) .build() diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customize/RequiredCustomizations.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customize/RequiredCustomizations.kt index a1956f442e..22ec93e842 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customize/RequiredCustomizations.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customize/RequiredCustomizations.kt @@ -71,12 +71,12 @@ class RequiredCustomizations : ClientCodegenDecorator { override fun extras(codegenContext: ClientCodegenContext, rustCrate: RustCrate) { val rc = codegenContext.runtimeConfig - // Add rt-tokio feature for `ByteStream::from_path` + // Add rt-tokio and http-body-0-4-x features for `ByteStream::from_path_0_4` rustCrate.mergeFeature( Feature( "rt-tokio", true, - listOf("aws-smithy-async/rt-tokio", "aws-smithy-http/rt-tokio"), + listOf("aws-smithy-async/rt-tokio", "aws-smithy-types/rt-tokio", "aws-smithy-types/http-body-0-4-x"), ), ) diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt index 1f60c251ac..f3cc0abad0 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/protocols/HttpBoundProtocolGenerator.kt @@ -29,12 +29,12 @@ class ClientHttpBoundProtocolPayloadGenerator( _cfg.interceptor_state().store_put(signer_sender); let adapter: #{aws_smithy_http}::event_stream::MessageStreamAdapter<_, _> = ${params.outerName}.${params.memberName}.into_body_stream(marshaller, error_marshaller, signer); - let body: #{SdkBody} = #{hyper}::Body::wrap_stream(adapter).into(); - body + #{SdkBody}::from_body_0_4(#{hyper}::Body::wrap_stream(adapter)) } """, "hyper" to CargoDependency.HyperWithStream.toType(), - "SdkBody" to RuntimeType.sdkBody(codegenContext.runtimeConfig), + "SdkBody" to CargoDependency.smithyTypes(codegenContext.runtimeConfig).withFeature("http-body-0-4-x") + .toType().resolve("body::SdkBody"), "aws_smithy_http" to RuntimeType.smithyHttp(codegenContext.runtimeConfig), "DeferredSigner" to RuntimeType.smithyEventStream(codegenContext.runtimeConfig).resolve("frame::DeferredSigner"), "marshallerConstructorFn" to params.marshallerConstructorFn, diff --git a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/ServerRequiredCustomizations.kt b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/ServerRequiredCustomizations.kt index a353eb636b..dde2e91bae 100644 --- a/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/ServerRequiredCustomizations.kt +++ b/codegen-server/src/main/kotlin/software/amazon/smithy/rust/codegen/server/smithy/customizations/ServerRequiredCustomizations.kt @@ -37,8 +37,14 @@ class ServerRequiredCustomizations : ServerCodegenDecorator { override fun extras(codegenContext: ServerCodegenContext, rustCrate: RustCrate) { val rc = codegenContext.runtimeConfig - // Add rt-tokio feature for `ByteStream::from_path` - rustCrate.mergeFeature(Feature("rt-tokio", true, listOf("aws-smithy-http/rt-tokio"))) + // Add rt-tokio and http-body-0-4-x features for `ByteStream::from_path_body_0_4` + rustCrate.mergeFeature( + Feature( + "rt-tokio", + true, + listOf("aws-smithy-types/rt-tokio", "aws-smithy-types/http-body-0-4-x"), + ), + ) rustCrate.withModule(ServerRustModule.Types) { pubUseSmithyPrimitives(codegenContext, codegenContext.model)(this) diff --git a/rust-runtime/aws-smithy-http-server-python/Cargo.toml b/rust-runtime/aws-smithy-http-server-python/Cargo.toml index 61ce57d7a7..ebf17bd065 100644 --- a/rust-runtime/aws-smithy-http-server-python/Cargo.toml +++ b/rust-runtime/aws-smithy-http-server-python/Cargo.toml @@ -16,7 +16,7 @@ publish = true aws-smithy-http = { path = "../aws-smithy-http" } aws-smithy-http-server = { path = "../aws-smithy-http-server", features = ["aws-lambda"] } aws-smithy-json = { path = "../aws-smithy-json" } -aws-smithy-types = { path = "../aws-smithy-types" } +aws-smithy-types = { path = "../aws-smithy-types", features = ["byte-stream-poll-next", "http-body-0-4-x"] } aws-smithy-xml = { path = "../aws-smithy-xml" } bytes = "1.2" futures = "0.3" diff --git a/rust-runtime/aws-smithy-http-server-python/src/pytests/bytestream.rs b/rust-runtime/aws-smithy-http-server-python/src/pytests/bytestream.rs index c82ffb233b..0f15f5177e 100644 --- a/rust-runtime/aws-smithy-http-server-python/src/pytests/bytestream.rs +++ b/rust-runtime/aws-smithy-http-server-python/src/pytests/bytestream.rs @@ -147,5 +147,5 @@ async def handler(bytestream): fn streaming_bytestream_from_vec(chunks: Vec<&'static str>) -> ByteStream { let stream = stream::iter(chunks.into_iter().map(Ok::<_, io::Error>)); let body = Body::wrap_stream(stream); - ByteStream::new(SdkBody::from(body)) + ByteStream::new(SdkBody::from_body_0_4(body)) } diff --git a/rust-runtime/aws-smithy-http-server-python/src/types.rs b/rust-runtime/aws-smithy-http-server-python/src/types.rs index 1af75dbd27..a475f62351 100644 --- a/rust-runtime/aws-smithy-http-server-python/src/types.rs +++ b/rust-runtime/aws-smithy-http-server-python/src/types.rs @@ -407,7 +407,7 @@ impl ByteStream { #[staticmethod] pub fn from_path_blocking(py: Python, path: String) -> PyResult> { let byte_stream = Handle::current().block_on(async { - aws_smithy_types::byte_stream::ByteStream::from_path(path) + aws_smithy_types::byte_stream::ByteStream::from_path_body_0_4(path) .await .map_err(|e| PyRuntimeError::new_err(e.to_string())) })?; @@ -423,7 +423,7 @@ impl ByteStream { #[staticmethod] pub fn from_path(py: Python, path: String) -> PyResult<&PyAny> { pyo3_asyncio::tokio::future_into_py(py, async move { - let byte_stream = aws_smithy_types::byte_stream::ByteStream::from_path(path) + let byte_stream = aws_smithy_types::byte_stream::ByteStream::from_path_body_0_4(path) .await .map_err(|e| PyRuntimeError::new_err(e.to_string()))?; Ok(Self(Arc::new(Mutex::new(byte_stream)))) diff --git a/rust-runtime/aws-smithy-http-server/Cargo.toml b/rust-runtime/aws-smithy-http-server/Cargo.toml index f1cd62c2d4..3193b69e75 100644 --- a/rust-runtime/aws-smithy-http-server/Cargo.toml +++ b/rust-runtime/aws-smithy-http-server/Cargo.toml @@ -21,7 +21,7 @@ request-id = ["dep:uuid"] async-trait = "0.1" aws-smithy-http = { path = "../aws-smithy-http", features = ["rt-tokio"] } aws-smithy-json = { path = "../aws-smithy-json" } -aws-smithy-types = { path = "../aws-smithy-types" } +aws-smithy-types = { path = "../aws-smithy-types", features = ["http-body-0-4-x", "hyper-0-14-x"] } aws-smithy-xml = { path = "../aws-smithy-xml" } bytes = "1.1" futures-util = { version = "0.3.16", default-features = false } diff --git a/rust-runtime/aws-smithy-http/Cargo.toml b/rust-runtime/aws-smithy-http/Cargo.toml index 4f9641013e..e8bbb1e61f 100644 --- a/rust-runtime/aws-smithy-http/Cargo.toml +++ b/rust-runtime/aws-smithy-http/Cargo.toml @@ -16,7 +16,7 @@ rt-tokio = ["aws-smithy-types/rt-tokio"] [dependencies] aws-smithy-eventstream = { path = "../aws-smithy-eventstream", optional = true } -aws-smithy-types = { path = "../aws-smithy-types" } +aws-smithy-types = { path = "../aws-smithy-types", features = ["byte-stream-poll-next", "http-body-0-4-x"] } bytes = "1" bytes-utils = "0.1" http = "0.2.3" diff --git a/rust-runtime/aws-smithy-http/src/body.rs b/rust-runtime/aws-smithy-http/src/body.rs index bd961ce383..05e9776ccc 100644 --- a/rust-runtime/aws-smithy-http/src/body.rs +++ b/rust-runtime/aws-smithy-http/src/body.rs @@ -8,10 +8,6 @@ //! Types for representing the body of an HTTP request or response -/// A boxed generic HTTP body that, when consumed, will result in [`Bytes`](bytes::Bytes) or an [`Error`](aws_smithy_types::body::Error). -#[deprecated(note = "Moved to `aws_smithy_types::body::BoxBody`.")] -pub type BoxBody = aws_smithy_types::body::BoxBody; - /// A generic, boxed error that's `Send` and `Sync` #[deprecated(note = "`Moved to `aws_smithy_types::body::Error`.")] pub type Error = aws_smithy_types::body::Error; diff --git a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs index eb6e754431..dd9b9cd021 100644 --- a/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs +++ b/rust-runtime/aws-smithy-http/src/event_stream/receiver.rs @@ -342,7 +342,7 @@ mod tests { let chunks: Vec> = vec![Ok(encode_message("one")), Ok(encode_message("two"))]; let chunk_stream = futures_util::stream::iter(chunks); - let body = SdkBody::from(Body::wrap_stream(chunk_stream)); + let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( TestMessage("one".into()), @@ -363,7 +363,7 @@ mod tests { Ok(Bytes::from_static(&[])), ]; let chunk_stream = futures_util::stream::iter(chunks); - let body = SdkBody::from(Body::wrap_stream(chunk_stream)); + let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( TestMessage("one".into()), @@ -384,7 +384,7 @@ mod tests { Ok(encode_message("three").split_to(10)), ]; let chunk_stream = futures_util::stream::iter(chunks); - let body = SdkBody::from(Body::wrap_stream(chunk_stream)); + let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( TestMessage("one".into()), @@ -410,7 +410,7 @@ mod tests { )), ]; let chunk_stream = futures_util::stream::iter(chunks); - let body = SdkBody::from(Body::wrap_stream(chunk_stream)); + let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( TestMessage("one".into()), @@ -463,7 +463,7 @@ mod tests { ]; let chunk_stream = futures_util::stream::iter(chunks); - let body = SdkBody::from(Body::wrap_stream(chunk_stream)); + let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); for payload in &["one", "two", "three", "four", "five", "six", "seven", "eight"] { assert_eq!( @@ -483,7 +483,7 @@ mod tests { Err(IOError::new(ErrorKind::ConnectionReset, FakeError)), ]; let chunk_stream = futures_util::stream::iter(chunks); - let body = SdkBody::from(Body::wrap_stream(chunk_stream)); + let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( TestMessage("one".into()), @@ -504,7 +504,7 @@ mod tests { Ok(Bytes::from_static(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ]; let chunk_stream = futures_util::stream::iter(chunks); - let body = SdkBody::from(Body::wrap_stream(chunk_stream)); + let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert_eq!( TestMessage("one".into()), @@ -521,7 +521,7 @@ mod tests { let chunks: Vec> = vec![Ok(encode_initial_response()), Ok(encode_message("one"))]; let chunk_stream = futures_util::stream::iter(chunks); - let body = SdkBody::from(Body::wrap_stream(chunk_stream)); + let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert!(receiver.try_recv_initial().await.unwrap().is_some()); assert_eq!( @@ -535,7 +535,7 @@ mod tests { let chunks: Vec> = vec![Ok(encode_message("one")), Ok(encode_message("two"))]; let chunk_stream = futures_util::stream::iter(chunks); - let body = SdkBody::from(Body::wrap_stream(chunk_stream)); + let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream)); let mut receiver = Receiver::::new(Unmarshaller, body); assert!(receiver.try_recv_initial().await.unwrap().is_none()); assert_eq!( diff --git a/rust-runtime/aws-smithy-runtime/Cargo.toml b/rust-runtime/aws-smithy-runtime/Cargo.toml index 751758be8e..f0af47c00b 100644 --- a/rust-runtime/aws-smithy-runtime/Cargo.toml +++ b/rust-runtime/aws-smithy-runtime/Cargo.toml @@ -25,7 +25,7 @@ aws-smithy-async = { path = "../aws-smithy-async" } aws-smithy-http = { path = "../aws-smithy-http" } aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true } aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api" } -aws-smithy-types = { path = "../aws-smithy-types" } +aws-smithy-types = { path = "../aws-smithy-types", features = ["http-body-0-4-x"] } bytes = "1" fastrand = "2.0.0" http = "0.2.8" diff --git a/rust-runtime/aws-smithy-runtime/src/client/http/hyper_014.rs b/rust-runtime/aws-smithy-runtime/src/client/http/hyper_014.rs index 7bcdc13082..abb643ab89 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/http/hyper_014.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/http/hyper_014.rs @@ -358,7 +358,10 @@ where let mut client = self.client.clone(); let fut = client.call(request); HttpConnectorFuture::new(async move { - Ok(fut.await.map_err(downcast_error)?.map(SdkBody::from)) + Ok(fut + .await + .map_err(downcast_error)? + .map(SdkBody::from_body_0_4)) }) } } diff --git a/rust-runtime/aws-smithy-runtime/src/client/http/test_util/dvr/record.rs b/rust-runtime/aws-smithy-runtime/src/client/http/test_util/dvr/record.rs index 912a409b26..3403ffcc0d 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/http/test_util/dvr/record.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/http/test_util/dvr/record.rs @@ -88,7 +88,7 @@ fn record_body( event_bus: Arc>>, ) -> JoinHandle<()> { let (sender, output_body) = hyper::Body::channel(); - let real_body = std::mem::replace(body, SdkBody::from(output_body)); + let real_body = std::mem::replace(body, SdkBody::from_body_0_4(output_body)); tokio::spawn(async move { let mut real_body = real_body; let mut sender = sender; diff --git a/rust-runtime/aws-smithy-runtime/src/client/http/test_util/dvr/replay.rs b/rust-runtime/aws-smithy-runtime/src/client/http/test_util/dvr/replay.rs index d0f868a19c..f24cb36fb2 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/http/test_util/dvr/replay.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/http/test_util/dvr/replay.rs @@ -294,7 +294,7 @@ impl HttpConnector for ReplayingClient { let _initial_request = events.pop_front().unwrap(); let (sender, response_body) = hyper::Body::channel(); - let body = SdkBody::from(response_body); + let body = SdkBody::from_body_0_4(response_body); let recording = self.recorded_requests.clone(); let recorded_request = tokio::spawn(async move { let mut data_read = vec![]; diff --git a/rust-runtime/aws-smithy-runtime/tests/reconnect_on_transient_error.rs b/rust-runtime/aws-smithy-runtime/tests/reconnect_on_transient_error.rs index 9b4f4c2666..c07ccab405 100644 --- a/rust-runtime/aws-smithy-runtime/tests/reconnect_on_transient_error.rs +++ b/rust-runtime/aws-smithy-runtime/tests/reconnect_on_transient_error.rs @@ -23,7 +23,7 @@ use aws_smithy_runtime::{ev, match_events}; use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext; use aws_smithy_runtime_api::client::orchestrator::OrchestratorError; use aws_smithy_runtime_api::client::retries::classifiers::{ClassifyRetry, RetryAction}; -use aws_smithy_types::body::{BoxBody, SdkBody}; +use aws_smithy_types::body::SdkBody; use aws_smithy_types::retry::{ErrorKind, ProvideErrorKind, ReconnectMode, RetryConfig}; use aws_smithy_types::timeout::TimeoutConfig; use hyper::client::Builder as HyperBuilder; @@ -150,7 +150,7 @@ async fn wire_level_test( let request = http::Request::builder() .uri(endpoint_url.clone()) // Make the body non-replayable since we don't actually want to retry - .body(SdkBody::from_dyn(BoxBody::new(SdkBody::from("body")))) + .body(SdkBody::from_body_0_4(SdkBody::from("body"))) .unwrap() .try_into() .unwrap(); diff --git a/rust-runtime/aws-smithy-types/Cargo.toml b/rust-runtime/aws-smithy-types/Cargo.toml index 0b3796d9bd..42f50191a9 100644 --- a/rust-runtime/aws-smithy-types/Cargo.toml +++ b/rust-runtime/aws-smithy-types/Cargo.toml @@ -11,6 +11,9 @@ license = "Apache-2.0" repository = "https://github.com/awslabs/smithy-rs" [features] +byte-stream-poll-next = [] +http-body-0-4-x = ["dep:http-body-0-4"] +hyper-0-14-x = ["dep:hyper-0-14"] rt-tokio = ["dep:tokio-util", "dep:tokio", "tokio?/rt", "tokio?/fs", "tokio?/io-util", "tokio-util?/io"] test-util = [] serde-serialize = [] @@ -21,8 +24,8 @@ base64-simd = "0.8" bytes = "1" bytes-utils = "0.1" http = "0.2.3" -http-body = "0.4.4" -hyper = "0.14.26" +http-body-0-4 = { package = "http-body", version = "0.4.4", optional = true } +hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true } itoa = "1.0.0" num-integer = "0.1.44" pin-project-lite = "0.2.9" diff --git a/rust-runtime/aws-smithy-types/external-types.toml b/rust-runtime/aws-smithy-types/external-types.toml index ad90d2a11b..4609bce1d1 100644 --- a/rust-runtime/aws-smithy-types/external-types.toml +++ b/rust-runtime/aws-smithy-types/external-types.toml @@ -2,14 +2,13 @@ allowed_external_types = [ "bytes::bytes::Bytes", "bytes::buf::buf_impl::Buf", - # TODO(https://github.com/awslabs/smithy-rs/issues/3033): Feature gate based on unstable versions + # TODO(https://github.com/awslabs/smithy-rs/issues/2412): Support cargo-features for cargo-check-external-types "http_body::Body", - "http_body::combinators::box_body::BoxBody", "hyper::body::body::Body", - # TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate Tokio `AsyncRead` + # TODO(https://github.com/awslabs/smithy-rs/issues/2412): Support cargo-features for cargo-check-external-types "tokio::io::async_read::AsyncRead", - # TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate references to Tokio `File` + # TODO(https://github.com/awslabs/smithy-rs/issues/2412): Support cargo-features for cargo-check-external-types "tokio::fs::file::File", ] diff --git a/rust-runtime/aws-smithy-types/src/body.rs b/rust-runtime/aws-smithy-types/src/body.rs index ae21207bfa..043aa47df3 100644 --- a/rust-runtime/aws-smithy-types/src/body.rs +++ b/rust-runtime/aws-smithy-types/src/body.rs @@ -6,15 +6,20 @@ //! Types for representing the body of an HTTP request or response use bytes::Bytes; -use http::{HeaderMap, HeaderValue}; -use http_body::{Body, SizeHint}; use pin_project_lite::pin_project; use std::error::Error as StdError; use std::fmt::{self, Debug, Formatter}; +use std::future::poll_fn; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +/// This module is named after the `http-body` version number since we anticipate +/// needing to provide equivalent functionality for 1.x of that crate in the future. +/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`. +#[cfg(feature = "http-body-0-4-x")] +pub mod http_body_0_4_x; + /// A generic, boxed error that's `Send` and `Sync` pub type Error = Box; @@ -49,7 +54,10 @@ impl Debug for SdkBody { } /// A boxed generic HTTP body that, when consumed, will result in [`Bytes`] or an [`Error`]. -pub type BoxBody = http_body::combinators::BoxBody; +enum BoxBody { + #[cfg(feature = "http-body-0-4-x")] + HttpBody04(http_body_0_4::combinators::BoxBody), +} pin_project! { #[project = InnerProj] @@ -59,14 +67,9 @@ pin_project! { inner: Option }, // A streaming body - Streaming { - #[pin] - inner: hyper::Body - }, - // Also a streaming body Dyn { #[pin] - inner: BoxBody + inner: BoxBody, }, /// When a streaming body is transferred out to a stream parser, the body is replaced with @@ -80,33 +83,22 @@ impl Debug for Inner { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match &self { Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(), - Inner::Streaming { inner: streaming } => { - f.debug_tuple("Streaming").field(streaming).finish() - } - Inner::Taken => f.debug_tuple("Taken").finish(), Inner::Dyn { .. } => write!(f, "BoxBody"), + Inner::Taken => f.debug_tuple("Taken").finish(), } } } impl SdkBody { - /// Construct an SdkBody from a Boxed implementation of http::Body - pub fn from_dyn(body: BoxBody) -> Self { - Self { - inner: Inner::Dyn { inner: body }, - rebuild: None, - bytes_contents: None, - } - } - /// Construct an explicitly retryable SDK body /// /// _Note: This is probably not what you want_ /// /// All bodies constructed from in-memory data (`String`, `Vec`, `Bytes`, etc.) will be - /// retryable out of the box. If you want to read data from a file, you should use - /// [`ByteStream::from_path`](crate::byte_stream::ByteStream::from_path). This function - /// is only necessary when you need to enable retries for your own streaming container. + /// retryable out of the box. If you want to read data from a file, you should turn on a feature + /// `http-body-0-4-x` and use `ByteStream::from_path_body_0_4`. + /// + /// This function is only necessary when you need to enable retries for your own streaming container. pub fn retryable(f: impl Fn() -> SdkBody + Send + Sync + 'static) -> Self { let initial = f(); SdkBody { @@ -135,9 +127,14 @@ impl SdkBody { } } - fn poll_inner( + pub(crate) async fn next(&mut self) -> Option> { + let mut me = Pin::new(self); + poll_fn(|cx| me.as_mut().poll_next(cx)).await + } + + pub(crate) fn poll_next( self: Pin<&mut Self>, - cx: &mut Context<'_>, + #[allow(unused)] cx: &mut Context<'_>, ) -> Poll>> { let this = self.project(); match this.inner.project() { @@ -149,8 +146,17 @@ impl SdkBody { None => Poll::Ready(None), } } - InnerProj::Streaming { inner: body } => body.poll_data(cx).map_err(|e| e.into()), - InnerProj::Dyn { inner: box_body } => box_body.poll_data(cx), + InnerProj::Dyn { inner: body } => match body.get_mut() { + #[cfg(feature = "http-body-0-4-x")] + BoxBody::HttpBody04(box_body) => { + use http_body_0_4::Body; + Pin::new(box_body).poll_data(cx) + } + #[allow(unreachable_patterns)] + _ => unreachable!( + "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant" + ), + }, InnerProj::Taken => { Poll::Ready(Some(Err("A `Taken` body should never be polled".into()))) } @@ -184,7 +190,53 @@ impl SdkBody { /// Return the length, in bytes, of this SdkBody. If this returns `None`, then the body does not /// have a known length. pub fn content_length(&self) -> Option { - http_body::Body::size_hint(self).exact() + match self.bounds_on_remaining_length() { + (lo, Some(hi)) if lo == hi => Some(lo), + _ => None, + } + } + + #[allow(dead_code)] // used by a feature-gated `http-body`'s trait method + pub(crate) fn is_end_stream(&self) -> bool { + match &self.inner { + Inner::Once { inner: None } => true, + Inner::Once { inner: Some(bytes) } => bytes.is_empty(), + Inner::Dyn { inner: box_body } => match box_body { + #[cfg(feature = "http-body-0-4-x")] + BoxBody::HttpBody04(box_body) => { + use http_body_0_4::Body; + box_body.is_end_stream() + } + #[allow(unreachable_patterns)] + _ => unreachable!( + "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant" + ), + }, + Inner::Taken => true, + } + } + + pub(crate) fn bounds_on_remaining_length(&self) -> (u64, Option) { + match &self.inner { + Inner::Once { inner: None } => (0, Some(0)), + Inner::Once { inner: Some(bytes) } => { + let len = bytes.len() as u64; + (len, Some(len)) + } + Inner::Dyn { inner: box_body } => match box_body { + #[cfg(feature = "http-body-0-4-x")] + BoxBody::HttpBody04(box_body) => { + use http_body_0_4::Body; + let hint = box_body.size_hint(); + (hint.lower(), hint.upper()) + } + #[allow(unreachable_patterns)] + _ => unreachable!( + "enabling `http-body-0-4-x` is the only way to create the `Dyn` variant" + ), + }, + Inner::Taken => (0, Some(0)), + } } /// Given a function to modify an `SdkBody`, run that function against this `SdkBody` before @@ -238,16 +290,6 @@ impl From for SdkBody { } } -impl From for SdkBody { - fn from(body: hyper::Body) -> Self { - SdkBody { - inner: Inner::Streaming { inner: body }, - rebuild: None, - bytes_contents: None, - } - } -} - impl From> for SdkBody { fn from(data: Vec) -> Self { Self::from(Bytes::from(data)) @@ -266,64 +308,15 @@ impl From<&[u8]> for SdkBody { } } -impl http_body::Body for SdkBody { - type Data = Bytes; - type Error = Error; - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - self.poll_inner(cx) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll>, Self::Error>> { - Poll::Ready(Ok(None)) - } - - fn is_end_stream(&self) -> bool { - match &self.inner { - Inner::Once { inner: None } => true, - Inner::Once { inner: Some(bytes) } => bytes.is_empty(), - Inner::Streaming { inner: hyper_body } => hyper_body.is_end_stream(), - Inner::Dyn { inner: box_body } => box_body.is_end_stream(), - Inner::Taken => true, - } - } - - fn size_hint(&self) -> SizeHint { - match &self.inner { - Inner::Once { inner: None } => SizeHint::with_exact(0), - Inner::Once { inner: Some(bytes) } => SizeHint::with_exact(bytes.len() as u64), - Inner::Streaming { inner: hyper_body } => hyper_body.size_hint(), - Inner::Dyn { inner: box_body } => box_body.size_hint(), - Inner::Taken => SizeHint::new(), - } - } -} - #[cfg(test)] mod test { - use crate::body::{BoxBody, SdkBody}; - use http_body::Body; + use crate::body::SdkBody; use std::pin::Pin; #[test] fn valid_size_hint() { - assert_eq!(SdkBody::from("hello").size_hint().exact(), Some(5)); - assert_eq!(SdkBody::from("").size_hint().exact(), Some(0)); - } - - #[test] - fn map_preserve_preserves_bytes_hint() { - let initial = SdkBody::from("hello!"); - assert_eq!(initial.bytes(), Some(b"hello!".as_slice())); - - let new_body = initial.map_preserve_contents(|body| SdkBody::from_dyn(BoxBody::new(body))); - assert_eq!(new_body.bytes(), Some(b"hello!".as_slice())); + assert_eq!(SdkBody::from("hello").content_length(), Some(5)); + assert_eq!(SdkBody::from("").content_length(), Some(0)); } #[allow(clippy::bool_assert_comparison)] @@ -337,9 +330,9 @@ mod test { async fn http_body_consumes_data() { let mut body = SdkBody::from("hello!"); let mut body = Pin::new(&mut body); - let data = body.data().await; + let data = body.next().await; assert!(data.is_some()); - let data = body.data().await; + let data = body.next().await; assert!(data.is_none()); } @@ -348,31 +341,14 @@ mod test { // Its important to avoid sending empty chunks of data to avoid H2 data frame problems let mut body = SdkBody::from(""); let mut body = Pin::new(&mut body); - let data = body.data().await; + let data = body.next().await; assert!(data.is_none()); } #[test] fn sdkbody_debug_once() { let body = SdkBody::from("123"); - // actually don't really care what the debug impl is, just that it doesn't crash - let _ = format!("{:?}", body); - } - - #[test] - fn sdkbody_debug_dyn() { - let hyper_body = hyper::Body::channel().1; - let body = SdkBody::from_dyn(BoxBody::new(hyper_body.map_err(|e| e.into()))); - // actually don't really care what the debug impl is, just that it doesn't crash - let _ = format!("{:?}", body); - } - - #[test] - fn sdkbody_debug_hyper() { - let hyper_body = hyper::Body::channel().1; - let body = SdkBody::from(hyper_body); - // actually don't really care what the debug impl is, just that it doesn't crash - let _ = format!("{:?}", body); + assert!(format!("{:?}", body).contains("Once")); } #[test] diff --git a/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs b/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs new file mode 100644 index 0000000000..2c6a597c41 --- /dev/null +++ b/rust-runtime/aws-smithy-types/src/body/http_body_0_4_x.rs @@ -0,0 +1,91 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use crate::body::{BoxBody, Error, Inner, SdkBody}; +use bytes::Bytes; +use std::pin::Pin; +use std::task::{Context, Poll}; + +impl SdkBody { + /// Construct an `SdkBody` from a type that implements [`http_body_0_4::Body`](http_body_0_4::Body). + /// + /// _Note: This is only available with `http-body-0-4-x` enabled._ + pub fn from_body_0_4(body: T) -> Self + where + T: http_body_0_4::Body + Send + Sync + 'static, + E: Into + 'static, + { + Self { + inner: Inner::Dyn { + inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new( + body.map_err(Into::into), + )), + }, + rebuild: None, + bytes_contents: None, + } + } +} + +#[cfg(feature = "hyper-0-14-x")] +impl From for SdkBody { + fn from(body: hyper_0_14::Body) -> Self { + SdkBody::from_body_0_4(body) + } +} + +impl http_body_0_4::Body for SdkBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.poll_next(cx) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll>, Self::Error>> { + Poll::Ready(Ok(None)) + } + + fn is_end_stream(&self) -> bool { + self.is_end_stream() + } + + fn size_hint(&self) -> http_body_0_4::SizeHint { + let mut result = http_body_0_4::SizeHint::default(); + let (lower, upper) = self.bounds_on_remaining_length(); + result.set_lower(lower); + if let Some(u) = upper { + result.set_upper(u) + } + result + } +} + +#[cfg(test)] +mod tests { + use crate::body::SdkBody; + + #[test] + fn map_preserve_preserves_bytes_hint() { + let initial = SdkBody::from("hello!"); + assert_eq!(initial.bytes(), Some(b"hello!".as_slice())); + + let new_body = initial.map_preserve_contents(|body| SdkBody::from_body_0_4(body)); + assert_eq!(new_body.bytes(), Some(b"hello!".as_slice())); + } + + #[test] + fn sdkbody_debug_dyn() { + let hyper_body = hyper_0_14::Body::channel().1; + let body = SdkBody::from_body_0_4(hyper_body); + assert!(format!("{:?}", body).contains("BoxBody")); + } +} diff --git a/rust-runtime/aws-smithy-types/src/byte_stream.rs b/rust-runtime/aws-smithy-types/src/byte_stream.rs index 909bea34c0..0db39f4bd1 100644 --- a/rust-runtime/aws-smithy-types/src/byte_stream.rs +++ b/rust-runtime/aws-smithy-types/src/byte_stream.rs @@ -78,10 +78,10 @@ //! //! ### Create a ByteStream from a file //! -//! _Note: This is only available with `rt-tokio` enabled._ +//! _Note: This is only available with `rt-tokio` and `http-body-0-4-x` enabled._ //! //! ```no_run -//! # #[cfg(feature = "rt-tokio")] +//! # #[cfg(all(feature = "rt-tokio", feature = "http-body-0-4-x"))] //! # { //! use aws_smithy_types::byte_stream::ByteStream; //! use std::path::Path; @@ -90,7 +90,7 @@ //! } //! //! async fn bytestream_from_file() -> GetObjectInput { -//! let bytestream = ByteStream::from_path("docs/some-large-file.csv") +//! let bytestream = ByteStream::from_path_body_0_4("docs/some-large-file.csv") //! .await //! .expect("valid path"); //! GetObjectInput { body: bytestream } @@ -99,10 +99,10 @@ //! ``` //! //! If you want more control over how the file is read, such as specifying the size of the buffer used to read the file -//! or the length of the file, use an [`FsBuilder`](crate::byte_stream::FsBuilder). +//! or the length of the file, use an `FsBuilder`. //! //! ```no_run -//! # #[cfg(feature = "rt-tokio")] +//! # #[cfg(all(feature = "rt-tokio", feature = "http-body-0-4-x"))] //! # { //! use aws_smithy_types::byte_stream::{ByteStream, Length}; //! use std::path::Path; @@ -111,7 +111,7 @@ //! } //! //! async fn bytestream_from_file() -> GetObjectInput { -//! let bytestream = ByteStream::read_from().path("docs/some-large-file.csv") +//! let bytestream = ByteStream::read_with_body_0_4_from().path("docs/some-large-file.csv") //! .buffer_size(32_784) //! .length(Length::Exact(123_456)) //! .build() @@ -127,7 +127,6 @@ use crate::byte_stream::error::Error; use bytes::Buf; use bytes::Bytes; use bytes_utils::SegmentedBuf; -use http_body::Body; use pin_project_lite::pin_project; use std::future::poll_fn; use std::io::IoSlice; @@ -144,6 +143,12 @@ pub mod error; #[cfg(feature = "rt-tokio")] pub use self::bytestream_util::FsBuilder; +/// This module is named after the `http-body` version number since we anticipate +/// needing to provide equivalent functionality for 1.x of that crate in the future. +/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`. +#[cfg(feature = "http-body-0-4-x")] +pub mod http_body_0_4_x; + pin_project! { /// Stream of binary data /// @@ -230,33 +235,43 @@ pin_project! { /// ``` /// /// 2. **From a file**: ByteStreams created from a path can be retried. A new file descriptor will be opened if a retry occurs. + /// + /// _Note: The `http-body-0-4-x` feature must be active to call `ByteStream::from_body_0_4`._ + /// /// ```no_run - /// #[cfg(feature = "tokio-rt")] + /// #[cfg(all(feature = "tokio-rt", feature = "http-body-0-4-x"))] /// # { /// use aws_smithy_types::byte_stream::ByteStream; - /// let stream = ByteStream::from_path("big_file.csv"); + /// let stream = ByteStream::from_path_body_0_4("big_file.csv"); /// # } /// ``` /// /// 3. **From an `SdkBody` directly**: For more advanced / custom use cases, a ByteStream can be created directly /// from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable /// when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable). + /// + /// _Note: The `http-body-0-4-x` feature must be active to construct an `SdkBody` with `from_body_0_4`._ + /// /// ```no_run + /// # #[cfg(feature = "http-body-0-4-x")] + /// # { + /// # use hyper_0_14 as hyper; /// use aws_smithy_types::byte_stream::ByteStream; /// use aws_smithy_types::body::SdkBody; /// use bytes::Bytes; /// let (mut tx, channel_body) = hyper::Body::channel(); /// // this will not be retryable because the SDK has no way to replay this stream - /// let stream = ByteStream::new(SdkBody::from(channel_body)); + /// let stream = ByteStream::new(SdkBody::from_body_0_4(channel_body)); /// tx.send_data(Bytes::from_static(b"hello world!")); /// tx.send_data(Bytes::from_static(b"hello again!")); /// // NOTE! You must ensure that `tx` is dropped to ensure that EOF is sent + /// # } /// ``` /// #[derive(Debug)] pub struct ByteStream { #[pin] - inner: Inner + inner: Inner, } } @@ -291,8 +306,12 @@ impl ByteStream { Some(self.inner.next().await?.map_err(Error::streaming)) } + #[cfg(feature = "byte-stream-poll-next")] /// Attempt to pull out the next value of this stream, returning `None` if the stream is /// exhausted. + // This should only be used when one needs to implement a trait method like + // `futures_core::stream::Stream::poll_next` on a new-type wrapping a `ByteStream`. + // In general, use the `next` method instead. pub fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -334,77 +353,6 @@ impl ByteStream { self.inner.collect().await.map_err(Error::streaming) } - /// Returns a [`FsBuilder`](crate::byte_stream::FsBuilder), allowing you to build a `ByteStream` with - /// full control over how the file is read (eg. specifying the length of the file or the size of the buffer used to read the file). - /// ```no_run - /// # #[cfg(feature = "rt-tokio")] - /// # { - /// use aws_smithy_types::byte_stream::{ByteStream, Length}; - /// - /// async fn bytestream_from_file() -> ByteStream { - /// let bytestream = ByteStream::read_from() - /// .path("docs/some-large-file.csv") - /// // Specify the size of the buffer used to read the file (in bytes, default is 4096) - /// .buffer_size(32_784) - /// // Specify the length of the file used (skips an additional call to retrieve the size) - /// .length(Length::Exact(123_456)) - /// .build() - /// .await - /// .expect("valid path"); - /// bytestream - /// } - /// # } - /// ``` - #[cfg(feature = "rt-tokio")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] - pub fn read_from() -> FsBuilder { - FsBuilder::new() - } - - /// Create a ByteStream that streams data from the filesystem - /// - /// This function creates a retryable ByteStream for a given `path`. The returned ByteStream - /// will provide a size hint when used as an HTTP body. If the request fails, the read will - /// begin again by reloading the file handle. - /// - /// ## Warning - /// The contents of the file MUST not change during retries. The length & checksum of the file - /// will be cached. If the contents of the file change, the operation will almost certainly fail. - /// - /// Furthermore, a partial write MAY seek in the file and resume from the previous location. - /// - /// Note: If you want more control, such as specifying the size of the buffer used to read the file - /// or the length of the file, use a [`FsBuilder`](crate::byte_stream::FsBuilder) as returned - /// from `ByteStream::read_from` - /// - /// # Examples - /// ```no_run - /// use aws_smithy_types::byte_stream::ByteStream; - /// use std::path::Path; - /// async fn make_bytestream() -> ByteStream { - /// ByteStream::from_path("docs/rows.csv").await.expect("file should be readable") - /// } - /// ``` - #[cfg(feature = "rt-tokio")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] - pub async fn from_path(path: impl AsRef) -> Result { - FsBuilder::new().path(path).build().await - } - - /// Create a ByteStream from a file - /// - /// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried in the case of - /// upstream failures, use [`ByteStream::from_path`](ByteStream::from_path) - #[deprecated( - since = "0.40.0", - note = "Prefer the more extensible ByteStream::read_from() API" - )] - #[cfg(feature = "rt-tokio")] - #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] - pub async fn from_file(file: tokio::fs::File) -> Result { - FsBuilder::new().file(file).build().await - } - #[cfg(feature = "rt-tokio")] /// Convert this `ByteStream` into a struct that implements [`AsyncRead`](tokio::io::AsyncRead). /// @@ -433,7 +381,9 @@ impl ByteStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - Pin::new(&mut self.0).poll_next(cx) + Pin::new(&mut self.0.inner) + .poll_next(cx) + .map_err(Error::streaming) } } tokio_util::io::StreamReader::new(FuturesStreamCompatByteStream(self)) @@ -479,12 +429,6 @@ impl From> for ByteStream { } } -impl From for ByteStream { - fn from(input: hyper::Body) -> Self { - ByteStream::new(SdkBody::from(input)) - } -} - /// Non-contiguous Binary Data Storage /// /// When data is read from the network, it is read in a sequence of chunks that are not in @@ -543,23 +487,19 @@ impl Buf for AggregatedBytes { } pin_project! { - #[derive(Debug, Clone, PartialEq, Eq)] - struct Inner { + #[derive(Debug)] + struct Inner { #[pin] - body: B, + body: SdkBody, } } -impl Inner { - fn new(body: B) -> Self { +impl Inner { + fn new(body: SdkBody) -> Self { Self { body } } - async fn next(&mut self) -> Option> - where - Self: Unpin, - B: http_body::Body, - { + async fn next(&mut self) -> Option> { let mut me = Pin::new(self); poll_fn(|cx| me.as_mut().poll_next(cx)).await } @@ -567,43 +507,34 @@ impl Inner { fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> - where - B: http_body::Body, - { - self.project().body.poll_data(cx) + ) -> Poll>> { + self.project().body.poll_next(cx) } - async fn collect(self) -> Result - where - B: http_body::Body, - { + async fn collect(self) -> Result { let mut output = SegmentedBuf::new(); let body = self.body; pin_utils::pin_mut!(body); - while let Some(buf) = body.data().await { + while let Some(buf) = body.next().await { output.push(buf?); } Ok(AggregatedBytes(output)) } - fn size_hint(&self) -> (u64, Option) - where - B: http_body::Body, - { - let size_hint = http_body::Body::size_hint(&self.body); - (size_hint.lower(), size_hint.upper()) + fn size_hint(&self) -> (u64, Option) { + self.body.bounds_on_remaining_length() } } #[cfg(test)] mod tests { + use crate::body::SdkBody; use crate::byte_stream::Inner; use bytes::Bytes; #[tokio::test] async fn read_from_string_body() { - let body = hyper::Body::from("a simple body"); + let body = SdkBody::from("a simple body"); assert_eq!( Inner::new(body) .collect() @@ -614,63 +545,6 @@ mod tests { ); } - #[tokio::test] - async fn read_from_channel_body() { - let (mut sender, body) = hyper::Body::channel(); - let byte_stream = Inner::new(body); - tokio::spawn(async move { - sender.send_data(Bytes::from("data 1")).await.unwrap(); - sender.send_data(Bytes::from("data 2")).await.unwrap(); - sender.send_data(Bytes::from("data 3")).await.unwrap(); - }); - assert_eq!( - byte_stream.collect().await.expect("no errors").into_bytes(), - Bytes::from("data 1data 2data 3") - ); - } - - #[cfg(feature = "rt-tokio")] - #[tokio::test] - async fn path_based_bytestreams() -> Result<(), Box> { - use super::ByteStream; - use bytes::Buf; - use http_body::Body; - use std::io::Write; - use tempfile::NamedTempFile; - let mut file = NamedTempFile::new()?; - - for i in 0..10000 { - writeln!(file, "Brian was here. Briefly. {}", i)?; - } - let body = ByteStream::from_path(&file).await?.into_inner(); - // assert that a valid size hint is immediately ready - assert_eq!(body.size_hint().exact(), Some(298890)); - let mut body1 = body.try_clone().expect("retryable bodies are cloneable"); - // read a little bit from one of the clones - let some_data = body1 - .data() - .await - .expect("should have some data") - .expect("read should not fail"); - assert!(!some_data.is_empty()); - // make some more clones - let body2 = body.try_clone().expect("retryable bodies are cloneable"); - let body3 = body.try_clone().expect("retryable bodies are cloneable"); - let body2 = ByteStream::new(body2).collect().await?.into_bytes(); - let body3 = ByteStream::new(body3).collect().await?.into_bytes(); - assert_eq!(body2, body3); - assert!(body2.starts_with(b"Brian was here.")); - assert!(body2.ends_with(b"9999\n")); - assert_eq!(body2.len(), 298890); - - assert_eq!( - ByteStream::new(body1).collect().await?.remaining(), - 298890 - some_data.len() - ); - - Ok(()) - } - #[cfg(feature = "rt-tokio")] #[tokio::test] async fn bytestream_into_async_read() { diff --git a/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs b/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs index 91ad71e917..467733b002 100644 --- a/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs +++ b/rust-runtime/aws-smithy-types/src/byte_stream/bytestream_util.rs @@ -5,14 +5,10 @@ use crate::body::SdkBody; use crate::byte_stream::{error::Error, error::ErrorKind, ByteStream}; -use bytes::Bytes; -use futures_core::ready; -use http::HeaderMap; -use http_body::{Body, SizeHint}; use std::future::Future; use std::path::PathBuf; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::sync::Arc; use tokio::fs::File; use tokio::io::{self, AsyncReadExt, AsyncSeekExt}; use tokio_util::io::ReaderStream; @@ -60,9 +56,11 @@ impl PathBody { /// Builder for creating [`ByteStreams`](ByteStream) from a file/path, with full control over advanced options. /// +/// _Note: A cargo feature `http-body-0-4-x` should be active to call `ByteStream::read_with_body_0_4_from` in the following example._ +/// /// Example usage: /// ```no_run -/// # #[cfg(feature = "rt-tokio")] +/// # #[cfg(all(feature = "rt-tokio", feature = "http-body-0-4-x"))] /// # { /// use aws_smithy_types::byte_stream::{ByteStream, Length}; /// use std::path::Path; @@ -71,7 +69,7 @@ impl PathBody { /// } /// /// async fn bytestream_from_file() -> GetObjectInput { -/// let bytestream = ByteStream::read_from() +/// let bytestream = ByteStream::read_with_body_0_4_from() /// .path("docs/some-large-file.csv") /// // Specify the size of the buffer used to read the file (in bytes, default is 4096) /// .buffer_size(32_784) @@ -91,12 +89,12 @@ pub struct FsBuilder { length: Option, buffer_size: usize, offset: Option, + sdk_body_creator: SdkBodyCreator, } -impl Default for FsBuilder { - fn default() -> Self { - Self::new() - } +enum SdkBodyCreator { + #[cfg(feature = "http-body-0-4-x")] + HttpBody04(Arc SdkBody + Send + Sync + 'static>), } /// The length (in bytes) to read. Determines whether or not a short read counts as an error. @@ -110,16 +108,22 @@ pub enum Length { } impl FsBuilder { + #[cfg(feature = "http-body-0-4-x")] /// Create a new [`FsBuilder`] (using a default read buffer of 4096 bytes). /// /// You must then call either [`file`](FsBuilder::file) or [`path`](FsBuilder::path) to specify what to read from. - pub fn new() -> Self { - FsBuilder { + /// + /// _Note: This is only available with `http-body-0-4-x` enabled._ + pub fn new_with_body_0_4() -> Self { + Self { buffer_size: DEFAULT_BUFFER_SIZE, file: None, length: None, offset: None, path: None, + sdk_body_creator: SdkBodyCreator::HttpBody04(Arc::new(|body: PathBody| { + SdkBody::from_body_0_4(body) + })), } } @@ -199,12 +203,19 @@ impl FsBuilder { let body_loader = move || { // If an offset was provided, seeking will be handled in `PathBody::poll_data` each // time the file is loaded. - SdkBody::from_dyn(http_body::combinators::BoxBody::new(PathBody::from_path( - path.clone(), - length, - buffer_size, - self.offset, - ))) + match &self.sdk_body_creator { + #[cfg(feature = "http-body-0-4-x")] + SdkBodyCreator::HttpBody04(f) => f(PathBody::from_path( + path.clone(), + length, + buffer_size, + self.offset, + )), + #[allow(unreachable_patterns)] + _ => unreachable!( + "`http-body-0-4-x` should've been enabled to create an `FsBuilder`" + ), + } }; Ok(ByteStream::new(SdkBody::retryable(body_loader))) @@ -214,9 +225,14 @@ impl FsBuilder { let _s = file.seek(io::SeekFrom::Start(offset)).await?; } - let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new( - PathBody::from_file(file, length, buffer_size), - )); + let body = match self.sdk_body_creator { + #[cfg(feature = "http-body-0-4-x")] + SdkBodyCreator::HttpBody04(f) => f(PathBody::from_file(file, length, buffer_size)), + #[allow(unreachable_patterns)] + _ => unreachable!( + "`http-body-0-4-x` should've been enabled to create an `FsBuilder`" + ), + }; Ok(ByteStream::new(body)) } else { @@ -240,14 +256,16 @@ enum State { Loaded(ReaderStream>), } -impl Body for PathBody { - type Data = Bytes; +#[cfg(feature = "http-body-0-4-x")] +impl http_body_0_4::Body for PathBody { + type Data = bytes::Bytes; type Error = Box; fn poll_data( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>> { + use std::task::Poll; let offset = self.offset.unwrap_or(DEFAULT_OFFSET); loop { match self.state { @@ -264,7 +282,7 @@ impl Body for PathBody { })); } State::Loading(ref mut future) => { - match ready!(Pin::new(future).poll(cx)) { + match futures_core::ready!(Pin::new(future).poll(cx)) { Ok(file) => { self.state = State::Loaded(ReaderStream::with_capacity( file.take(self.length), @@ -276,7 +294,7 @@ impl Body for PathBody { } State::Loaded(ref mut stream) => { use futures_core::Stream; - return match ready!(Pin::new(stream).poll_next(cx)) { + return match futures_core::ready!(std::pin::Pin::new(stream).poll_next(cx)) { Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), None => Poll::Ready(None), Some(Err(e)) => Poll::Ready(Some(Err(e.into()))), @@ -287,10 +305,10 @@ impl Body for PathBody { } fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>> { + std::task::Poll::Ready(Ok(None)) } fn is_end_stream(&self) -> bool { @@ -298,17 +316,17 @@ impl Body for PathBody { self.length == 0 } - fn size_hint(&self) -> SizeHint { - SizeHint::with_exact(self.length) + fn size_hint(&self) -> http_body_0_4::SizeHint { + http_body_0_4::SizeHint::with_exact(self.length) } } +#[cfg(feature = "http-body-0-4-x")] #[cfg(test)] mod test { use super::FsBuilder; use crate::byte_stream::{ByteStream, Length}; use bytes::Buf; - use http_body::Body; use std::io::Write; use tempfile::NamedTempFile; @@ -325,7 +343,7 @@ mod test { .expect("file metadata is accessible") .len(); - let body = FsBuilder::new() + let body = FsBuilder::new_with_body_0_4() .path(&file) .buffer_size(16384) .length(Length::Exact(file_length)) @@ -335,12 +353,12 @@ mod test { .into_inner(); // assert that the specified length is used as size hint - assert_eq!(body.size_hint().exact(), Some(file_length)); + assert_eq!(body.content_length(), Some(file_length)); let mut body1 = body.try_clone().expect("retryable bodies are cloneable"); // read a little bit from one of the clones let some_data = body1 - .data() + .next() .await .expect("should have some data") .expect("read should not fail"); @@ -364,7 +382,7 @@ mod test { // Ensure that the file was written to file.flush().expect("flushing is OK"); - let body = FsBuilder::new() + let body = FsBuilder::new_with_body_0_4() .path(&file) // The file is longer than 1 byte, let's see if this is used to generate the size hint .length(Length::Exact(1)) @@ -373,7 +391,7 @@ mod test { .unwrap() .into_inner(); - assert_eq!(body.size_hint().exact(), Some(1)); + assert_eq!(body.content_length(), Some(1)); } #[tokio::test] @@ -388,7 +406,7 @@ mod test { // Ensure that the file was written to file.flush().expect("flushing is OK"); - let body = FsBuilder::new() + let body = FsBuilder::new_with_body_0_4() .path(&file) // We're going to read line 0 only .length(Length::Exact(line_0.len() as u64)) @@ -412,7 +430,7 @@ mod test { // Ensure that the file was written to file.flush().expect("flushing is OK"); - assert!(FsBuilder::new() + assert!(FsBuilder::new_with_body_0_4() .path(&file) // The file is 30 bytes so this is fine .length(Length::Exact(29)) @@ -420,7 +438,7 @@ mod test { .await .is_ok()); - assert!(FsBuilder::new() + assert!(FsBuilder::new_with_body_0_4() .path(&file) // The file is 30 bytes so this is fine .length(Length::Exact(30)) @@ -428,7 +446,7 @@ mod test { .await .is_ok()); - assert!(FsBuilder::new() + assert!(FsBuilder::new_with_body_0_4() .path(&file) // Larger than 30 bytes, this will cause an error .length(Length::Exact(31)) @@ -449,7 +467,7 @@ mod test { // Ensure that the file was written to file.flush().expect("flushing is OK"); - let body = FsBuilder::new() + let body = FsBuilder::new_with_body_0_4() .path(&file) // We're going to skip the first line by using offset .offset(line_0.len() as u64) @@ -477,7 +495,7 @@ mod test { // Ensure that the file was written to file.flush().expect("flushing is OK"); - let body = FsBuilder::new() + let body = FsBuilder::new_with_body_0_4() .path(&file) // We're going to skip line 0 by using offset .offset(line_0.len() as u64) @@ -506,7 +524,7 @@ mod test { file.flush().expect("flushing is OK"); assert_eq!( - FsBuilder::new() + FsBuilder::new_with_body_0_4() .path(&file) // We're going to skip all file contents by setting an offset // much larger than the file size @@ -531,7 +549,7 @@ mod test { // Ensure that the file was written to file.flush().expect("flushing is OK"); - let body = FsBuilder::new() + let body = FsBuilder::new_with_body_0_4() .path(&file) .length(Length::UpTo(9000)) .build() @@ -583,7 +601,7 @@ mod test { chunk_size }; - let byte_stream = FsBuilder::new() + let byte_stream = FsBuilder::new_with_body_0_4() .path(&file_path) .offset(i * chunk_size) .length(Length::Exact(length)) diff --git a/rust-runtime/aws-smithy-types/src/byte_stream/http_body_0_4_x.rs b/rust-runtime/aws-smithy-types/src/byte_stream/http_body_0_4_x.rs new file mode 100644 index 0000000000..5edbaaba87 --- /dev/null +++ b/rust-runtime/aws-smithy-types/src/byte_stream/http_body_0_4_x.rs @@ -0,0 +1,172 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use crate::body::SdkBody; +use crate::byte_stream::ByteStream; +use bytes::Bytes; + +impl ByteStream { + /// Construct a `ByteStream` from a type that implements [`http_body_0_4::Body`](http_body_0_4::Body). + /// + /// _Note: This is only available with `http-body-0-4-x` enabled._ + pub fn from_body_0_4(body: T) -> Self + where + T: http_body_0_4::Body + Send + Sync + 'static, + E: Into + 'static, + { + ByteStream::new(SdkBody::from_body_0_4(body)) + } + + /// Returns a [`FsBuilder`](crate::byte_stream::FsBuilder), allowing you to build a `ByteStream` with + /// full control over how the file is read (eg. specifying the length of the file or the size of the buffer used to read the file). + /// ```no_run + /// # #[cfg(all(feature = "rt-tokio", feature = "http-body-0-4-x"))] + /// # { + /// use aws_smithy_types::byte_stream::{ByteStream, Length}; + /// + /// async fn bytestream_from_file() -> ByteStream { + /// let bytestream = ByteStream::read_with_body_0_4_from() + /// .path("docs/some-large-file.csv") + /// // Specify the size of the buffer used to read the file (in bytes, default is 4096) + /// .buffer_size(32_784) + /// // Specify the length of the file used (skips an additional call to retrieve the size) + /// .length(Length::Exact(123_456)) + /// .build() + /// .await + /// .expect("valid path"); + /// bytestream + /// } + /// # } + /// ``` + #[cfg(feature = "rt-tokio")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] + pub fn read_with_body_0_4_from() -> crate::byte_stream::FsBuilder { + crate::byte_stream::FsBuilder::new_with_body_0_4() + } + + /// Create a ByteStream that streams data from the filesystem + /// + /// This function creates a retryable ByteStream for a given `path`. The returned ByteStream + /// will provide a size hint when used as an HTTP body. If the request fails, the read will + /// begin again by reloading the file handle. + /// + /// ## Warning + /// The contents of the file MUST not change during retries. The length & checksum of the file + /// will be cached. If the contents of the file change, the operation will almost certainly fail. + /// + /// Furthermore, a partial write MAY seek in the file and resume from the previous location. + /// + /// Note: If you want more control, such as specifying the size of the buffer used to read the file + /// or the length of the file, use a [`FsBuilder`](crate::byte_stream::FsBuilder) as returned + /// from `ByteStream::read_with_body_0_4_from` + /// + /// # Examples + /// ```no_run + /// use aws_smithy_types::byte_stream::ByteStream; + /// use std::path::Path; + /// async fn make_bytestream() -> ByteStream { + /// ByteStream::from_path_body_0_4("docs/rows.csv").await.expect("file should be readable") + /// } + /// ``` + #[cfg(feature = "rt-tokio")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] + pub async fn from_path_body_0_4( + path: impl AsRef, + ) -> Result { + crate::byte_stream::FsBuilder::new_with_body_0_4() + .path(path) + .build() + .await + } + + /// Create a ByteStream from a file + /// + /// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried in the case of + /// upstream failures, use [`ByteStream::from_path_body_0_4`](ByteStream::from_path_body_0_4) + #[deprecated( + since = "0.40.0", + note = "Prefer the more extensible ByteStream::read_from() API" + )] + #[cfg(feature = "rt-tokio")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] + pub async fn from_file_body_0_4( + file: tokio::fs::File, + ) -> Result { + crate::byte_stream::FsBuilder::new_with_body_0_4() + .file(file) + .build() + .await + } +} + +#[cfg(feature = "hyper-0-14-x")] +impl From for ByteStream { + fn from(input: hyper_0_14::Body) -> Self { + ByteStream::new(SdkBody::from_body_0_4(input)) + } +} + +#[cfg(test)] +mod tests { + use crate::body::SdkBody; + use crate::byte_stream::Inner; + use bytes::Bytes; + + #[tokio::test] + async fn read_from_channel_body() { + let (mut sender, body) = hyper_0_14::Body::channel(); + let byte_stream = Inner::new(SdkBody::from_body_0_4(body)); + tokio::spawn(async move { + sender.send_data(Bytes::from("data 1")).await.unwrap(); + sender.send_data(Bytes::from("data 2")).await.unwrap(); + sender.send_data(Bytes::from("data 3")).await.unwrap(); + }); + assert_eq!( + byte_stream.collect().await.expect("no errors").into_bytes(), + Bytes::from("data 1data 2data 3") + ); + } + + #[cfg(feature = "rt-tokio")] + #[tokio::test] + async fn path_based_bytestreams() -> Result<(), Box> { + use super::ByteStream; + use bytes::Buf; + use std::io::Write; + use tempfile::NamedTempFile; + let mut file = NamedTempFile::new()?; + + for i in 0..10000 { + writeln!(file, "Brian was here. Briefly. {}", i)?; + } + let body = ByteStream::from_path_body_0_4(&file).await?.into_inner(); + // assert that a valid size hint is immediately ready + assert_eq!(body.content_length(), Some(298890)); + let mut body1 = body.try_clone().expect("retryable bodies are cloneable"); + // read a little bit from one of the clones + let some_data = body1 + .next() + .await + .expect("should have some data") + .expect("read should not fail"); + assert!(!some_data.is_empty()); + // make some more clones + let body2 = body.try_clone().expect("retryable bodies are cloneable"); + let body3 = body.try_clone().expect("retryable bodies are cloneable"); + let body2 = ByteStream::new(body2).collect().await?.into_bytes(); + let body3 = ByteStream::new(body3).collect().await?.into_bytes(); + assert_eq!(body2, body3); + assert!(body2.starts_with(b"Brian was here.")); + assert!(body2.ends_with(b"9999\n")); + assert_eq!(body2.len(), 298890); + + assert_eq!( + ByteStream::new(body1).collect().await?.remaining(), + 298890 - some_data.len() + ); + + Ok(()) + } +}