Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove futures_core::stream::Stream from aws_smithy_http::byte_stream::ByteStream #2983

Merged
merged 31 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f43afc0
Remove `futures_core::stream::Stream` from `aws-smithy-async`
ysaito1001 Sep 9, 2023
cbbaf77
Update codegen client to use `PaginationStream`
ysaito1001 Sep 9, 2023
8005c3d
Update canary to use `PaginationStream`
ysaito1001 Sep 9, 2023
75b96e3
Stop using `tokio_stream::StreamExt` in integration-tests
ysaito1001 Sep 9, 2023
2b1fcb1
Add comment explaining why the test uses `.enter`
ysaito1001 Sep 11, 2023
73966d4
Make `FnStream::poll_next` module private
ysaito1001 Sep 11, 2023
e25a916
Update CHANGELOG.next.toml
ysaito1001 Sep 11, 2023
cce169c
Remove impl `Stream` trait for `ByteStream`
ysaito1001 Sep 12, 2023
0362d9c
Make rendering stream payload serializer configurable
ysaito1001 Sep 12, 2023
0099bdb
Let client configure how to render stream payload serializer
ysaito1001 Sep 12, 2023
03e9278
Let server configure how to render stream payload serializer
ysaito1001 Sep 12, 2023
b04b3ca
Remove unused `tokio_stream::StreamExt` from `glacier_checksums`
ysaito1001 Sep 13, 2023
6575943
Update CHANGELOG.next.toml
ysaito1001 Sep 13, 2023
8147d81
Remove unnecessary `Unpin` bound
ysaito1001 Sep 19, 2023
43318e7
Add convenince method `try_collect`
ysaito1001 Sep 20, 2023
b6811bf
Merge branch 'main' into ysaito/remove-futures-stream-from-smithy-async
ysaito1001 Sep 20, 2023
3ad25c8
Merge branch 'ysaito/remove-futures-stream-from-smithy-async' into ys…
ysaito1001 Sep 20, 2023
e076f64
Wrap stream payload in a new-type at call sites of serializer
ysaito1001 Sep 21, 2023
5381cfe
Revert HttpBoundProtocolPayloadGenerator.kt since no change is needed
ysaito1001 Sep 21, 2023
1add37f
Move `fn_stream` to its own module
ysaito1001 Sep 27, 2023
b8f37ea
Provide docs for `PaginationStream` for end users
ysaito1001 Sep 27, 2023
57f2707
Merge branch 'main' into ysaito/remove-futures-stream-from-smithy-async
ysaito1001 Sep 27, 2023
7571c8c
Update CHANGELOG.next.toml
ysaito1001 Sep 27, 2023
e8b1f65
Merge branch 'ysaito/remove-futures-stream-from-smithy-async' into ys…
ysaito1001 Sep 27, 2023
a802799
Make `ByteStream::poll_next` to be `pub(crate)`
ysaito1001 Sep 28, 2023
5d07b16
Doc link `ByteStream::next` to `ByteStream::try_next`
ysaito1001 Sep 28, 2023
6fc9a96
Doc link `ByteStream::try_next` to `ByteStream::next`
ysaito1001 Sep 28, 2023
b01b1d4
Explain why `futures_stream_adapter` is `doc(hidden)`
ysaito1001 Sep 28, 2023
f4a60df
Merge branch 'main' into ysaito/remove-futures-stream-from-byte-stream
ysaito1001 Sep 28, 2023
5bcf5ca
Make `ByteStream::poll_next` `pub` again
ysaito1001 Sep 28, 2023
e942f24
Merge branch 'main' into ysaito/remove-futures-stream-from-byte-stream
ysaito1001 Sep 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,27 @@ message = "Fix regression with redacting sensitive HTTP response bodies."
references = ["smithy-rs#2926", "smithy-rs#2972"]
meta = { "breaking" = false, "tada" = false, "bug" = true, "target" = "client" }
author = "ysaito1001"

[[aws-sdk-rust]]
message = "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect` methods are supported on `PaginationStream`. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner."
references = ["smithy-rs#2978"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "ysaito1001"

[[smithy-rs]]
message = "The `futures_core::stream::Stream` trait has been removed from public API. [`FnStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.FnStream.html) only supports `next`, `try_next`, and `collect` methods. [`TryFlatMap::flat_map`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.TryFlatMap.html#method.flat_map) returns [`PaginationStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.PaginationStream.html), which should be preferred to `FnStream` at an interface level. For stream operations previously made available through the trait or its extension traits, we should be able to add them as needed in a backward compatible manner."
references = ["smithy-rs#2978"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "ysaito1001"

[[aws-sdk-rust]]
message = "The `futures_core::stream::Stream` trait has been removed from [`ByteStream`](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html). The methods mentioned in the [doc](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html#getting-data-out-of-a-bytestream) will continue to be supported. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner."
references = ["smithy-rs#2983"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "ysaito1001"

[[smithy-rs]]
message = "The `futures_core::stream::Stream` trait has been removed from [`ByteStream`](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html). The methods mentioned in the [doc](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html#getting-data-out-of-a-bytestream) will continue to be supported. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner."
references = ["smithy-rs#2983"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "ysaito1001"
1 change: 0 additions & 1 deletion aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use bytes::Buf;
use bytes_utils::SegmentedBuf;
use http::header::HeaderName;
use ring::digest::{Context, Digest, SHA256};
use tokio_stream::StreamExt;

const TREE_HASH_HEADER: &str = "x-amz-sha256-tree-hash";
const X_AMZ_CONTENT_SHA256: &str = "x-amz-content-sha256";
Expand Down
2 changes: 0 additions & 2 deletions aws/sdk/integration-tests/dynamodb/tests/paginators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
use std::collections::HashMap;
use std::iter::FromIterator;

use tokio_stream::StreamExt;

use aws_credential_types::Credentials;
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::{Client, Config};
Expand Down
2 changes: 0 additions & 2 deletions aws/sdk/integration-tests/ec2/tests/paginators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

use tokio_stream::StreamExt;

use aws_sdk_ec2::{config::Credentials, config::Region, types::InstanceType, Client, Config};
use aws_smithy_client::http_connector::HttpConnector;
use aws_smithy_client::test_connection::TestConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class PaginatorGenerator private constructor(
"HttpResponse" to RuntimeType.smithyRuntimeApi(runtimeConfig).resolve("client::orchestrator::HttpResponse"),
"SdkError" to RuntimeType.sdkError(runtimeConfig),
"client" to RuntimeType.smithyClient(runtimeConfig),
"fn_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::fn_stream"),
"pagination_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::pagination_stream"),

// External Types
"Stream" to RuntimeType.TokioStream.resolve("Stream"),
Expand Down Expand Up @@ -141,13 +141,14 @@ class PaginatorGenerator private constructor(

/// Create the pagination stream
///
/// _Note:_ No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next)).
pub fn send(self) -> impl #{Stream}<Item = #{item_type}> + #{Unpin} {
/// _Note:_ No requests will be dispatched until the stream is used
/// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method).
pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> {
// Move individual fields out of self for the borrow checker
let builder = self.builder;
let handle = self.handle;
#{runtime_plugin_init}
#{fn_stream}::FnStream::new(move |tx| #{Box}::pin(async move {
#{pagination_stream}::PaginationStream::new(#{pagination_stream}::FnStream::new(move |tx| #{Box}::pin(async move {
// Build the input for the first time. If required fields are missing, this is where we'll produce an early error.
let mut input = match builder.build().map_err(#{SdkError}::construction_failure) {
#{Ok}(input) => input,
Expand Down Expand Up @@ -177,7 +178,7 @@ class PaginatorGenerator private constructor(
return
}
}
}))
})))
}
}
""",
Expand Down Expand Up @@ -257,11 +258,12 @@ class PaginatorGenerator private constructor(
impl ${paginatorName}Items {
/// Create the pagination stream
///
/// _Note: No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next))._
/// _Note_: No requests will be dispatched until the stream is used
/// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method).
///
/// To read the entirety of the paginator, use [`.collect::<Result<Vec<_>, _>()`](tokio_stream::StreamExt::collect).
pub fn send(self) -> impl #{Stream}<Item = #{item_type}> + #{Unpin} {
#{fn_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter())
/// To read the entirety of the paginator, use [`.collect::<Result<Vec<_>, _>()`](aws_smithy_async::future::pagination_stream::PaginationStream::collect).
pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> {
#{pagination_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ class FluentClientGenerator(
"""
/// Create a paginator for this request
///
/// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a `Stream`.
/// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a [`PaginationStream`](aws_smithy_async::future::pagination_stream::PaginationStream).
pub fn into_paginator(self) -> #{Paginator} {
#{Paginator}::new(self.handle, self.inner)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,32 @@ package software.amazon.smithy.rust.codegen.client.smithy.protocols

import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency
import software.amazon.smithy.rust.codegen.core.rustlang.RustWriter
import software.amazon.smithy.rust.codegen.core.rustlang.rust
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpMessageType
import software.amazon.smithy.rust.codegen.core.smithy.generators.protocol.ProtocolPayloadGenerator
import software.amazon.smithy.rust.codegen.core.smithy.protocols.HttpBoundProtocolPayloadGenerator
import software.amazon.smithy.rust.codegen.core.smithy.protocols.Protocol
import software.amazon.smithy.rust.codegen.core.smithy.protocols.StreamPayloadSerializerParams
import software.amazon.smithy.rust.codegen.core.smithy.protocols.StreamPayloadSerializerRenderer

private class ClientStreamPayloadSerializerRenderer : StreamPayloadSerializerRenderer {
override fun renderOutputType(writer: RustWriter, params: StreamPayloadSerializerParams) {
writer.rust(
"#T",
RuntimeType.futuresStreamCompatByteStream(params.runtimeConfig).toSymbol(),
)
}

override fun renderPayload(writer: RustWriter, params: StreamPayloadSerializerParams) {
writer.rust(
"#T::new(${params.payloadName!!})",
RuntimeType.futuresStreamCompatByteStream(params.runtimeConfig),
)
}
}

class ClientHttpBoundProtocolPayloadGenerator(
codegenContext: ClientCodegenContext,
Expand Down Expand Up @@ -41,4 +61,5 @@ class ClientHttpBoundProtocolPayloadGenerator(
"errorMarshallerConstructorFn" to params.errorMarshallerConstructorFn,
)
},
ClientStreamPayloadSerializerRenderer(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,10 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null)
fun retryErrorKind(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("retry::ErrorKind")
fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::Receiver")

fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender")
fun futuresStreamCompatByteStream(runtimeConfig: RuntimeConfig): RuntimeType =
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
smithyHttp(runtimeConfig).resolve("futures_stream_adapter::FuturesStreamCompatByteStream")

fun errorMetadata(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("error::ErrorMetadata")
fun errorMetadataBuilder(runtimeConfig: RuntimeConfig) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package software.amazon.smithy.rust.codegen.core.smithy.protocols

import software.amazon.smithy.codegen.core.CodegenException
import software.amazon.smithy.codegen.core.SymbolProvider
import software.amazon.smithy.model.shapes.BlobShape
import software.amazon.smithy.model.shapes.DocumentShape
import software.amazon.smithy.model.shapes.MemberShape
Expand All @@ -17,12 +18,14 @@ import software.amazon.smithy.model.traits.EnumTrait
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency
import software.amazon.smithy.rust.codegen.core.rustlang.RustWriter
import software.amazon.smithy.rust.codegen.core.rustlang.rust
import software.amazon.smithy.rust.codegen.core.rustlang.rustBlockTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.withBlock
import software.amazon.smithy.rust.codegen.core.rustlang.withBlockTemplate
import software.amazon.smithy.rust.codegen.core.smithy.CodegenContext
import software.amazon.smithy.rust.codegen.core.smithy.CodegenTarget
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope
import software.amazon.smithy.rust.codegen.core.smithy.generators.http.HttpMessageType
import software.amazon.smithy.rust.codegen.core.smithy.generators.operationBuildError
import software.amazon.smithy.rust.codegen.core.smithy.generators.protocol.AdditionalPayloadContext
Expand Down Expand Up @@ -50,11 +53,38 @@ data class EventStreamBodyParams(
val additionalPayloadContext: AdditionalPayloadContext,
)

data class StreamPayloadSerializerParams(
val symbolProvider: SymbolProvider,
val runtimeConfig: RuntimeConfig,
val member: MemberShape,
val payloadName: String?,
)

/**
* An interface to help customize how to render a stream payload serializer.
*
* When the output of the serializer is passed to `hyper::body::Body::wrap_stream`,
* it requires what's passed to implement `futures_core::stream::Stream` trait.
* However, a certain type, such as `aws_smithy_http::byte_stream::ByteStream` does not
* implement the trait, so we need to wrap it with a new-type that does implement the trait.
*
* Each implementing type of the interface can choose whether the payload should be wrapped
* with such a new-type or should simply be used as-is.
*/
interface StreamPayloadSerializerRenderer {
/** Renders the return type of stream payload serializer **/
fun renderOutputType(writer: RustWriter, params: StreamPayloadSerializerParams)

/** Renders the stream payload **/
fun renderPayload(writer: RustWriter, params: StreamPayloadSerializerParams)
}

class HttpBoundProtocolPayloadGenerator(
codegenContext: CodegenContext,
private val protocol: Protocol,
private val httpMessageType: HttpMessageType = HttpMessageType.REQUEST,
private val renderEventStreamBody: (RustWriter, EventStreamBodyParams) -> Unit,
private val streamPayloadSerializerRenderer: StreamPayloadSerializerRenderer,
) : ProtocolPayloadGenerator {
private val symbolProvider = codegenContext.symbolProvider
private val model = codegenContext.model
Expand All @@ -63,6 +93,7 @@ class HttpBoundProtocolPayloadGenerator(
private val httpBindingResolver = protocol.httpBindingResolver
private val smithyEventStream = RuntimeType.smithyEventStream(runtimeConfig)
private val codegenScope = arrayOf(
*preludeScope,
"hyper" to CargoDependency.HyperWithStream.toType(),
"SdkBody" to RuntimeType.sdkBody(runtimeConfig),
"BuildError" to runtimeConfig.operationBuildError(),
Expand Down Expand Up @@ -238,17 +269,22 @@ class HttpBoundProtocolPayloadGenerator(
) {
val ref = if (payloadMetadata.takesOwnership) "" else "&"
val serializer = protocolFunctions.serializeFn(member, fnNameSuffix = "http_payload") { fnName ->
val outputT = if (member.isStreaming(model)) {
symbolProvider.toSymbol(member)
} else {
RuntimeType.ByteSlab.toSymbol()
}
rustBlockTemplate(
"pub fn $fnName(payload: $ref#{Member}) -> Result<#{outputT}, #{BuildError}>",
rustTemplate(
"pub(crate) fn $fnName(payload: $ref#{Member}) -> #{Result}<",
"Member" to symbolProvider.toSymbol(member),
"outputT" to outputT,
*codegenScope,
) {
)
if (member.isStreaming(model)) {
streamPayloadSerializerRenderer.renderOutputType(
this,
StreamPayloadSerializerParams(symbolProvider, runtimeConfig, member, null),
)
} else {
rust("#T", RuntimeType.ByteSlab.toSymbol())
}
rustTemplate(", #{BuildError}>", *codegenScope)

withBlockTemplate("{", "}", *codegenScope) {
val asRef = if (payloadMetadata.takesOwnership) "" else ".as_ref()"

if (symbolProvider.toSymbol(member).isOptional()) {
Expand Down Expand Up @@ -303,8 +339,10 @@ class HttpBoundProtocolPayloadGenerator(
is BlobShape -> {
// Write the raw blob to the payload.
if (member.isStreaming(model)) {
// Return the `ByteStream`.
rust(payloadName)
streamPayloadSerializerRenderer.renderPayload(
this,
StreamPayloadSerializerParams(symbolProvider, runtimeConfig, member, payloadName),
)
} else {
// Convert the `Blob` into a `Vec<u8>` and return it.
rust("$payloadName.into_inner()")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,29 @@ class PythonServerAfterDeserializedMemberServerHttpBoundCustomization() :
is ServerHttpBoundProtocolSection.AfterTimestampDeserializedMember -> writable {
rust(".into()")
}

else -> emptySection
}
}

/**
* Customization class used to determine the type of serialized stream payload and how it should be wrapped in a
* new-type wrapper to enable `futures_core::stream::Stream` trait.
*/
class PythonServerStreamPayloadSerializerCustomization() : ServerHttpBoundProtocolCustomization() {
override fun section(section: ServerHttpBoundProtocolSection): Writable = when (section) {
is ServerHttpBoundProtocolSection.TypeOfSerializedStreamPayload -> writable {
// `aws_smithy_http_server_python::types::ByteStream` already implements
// `futures::stream::Stream`, so no need to wrap it in a futures' stream-compatible
// wrapper.
rust("#T", section.params.symbolProvider.toSymbol(section.params.member))
}

is ServerHttpBoundProtocolSection.WrapStreamPayload -> writable {
// payloadName is always non-null within WrapStreamAfterPayloadGenerated
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
rust(section.params.payloadName!!)
}

else -> emptySection
}
}
Expand Down Expand Up @@ -91,6 +114,7 @@ class PythonServerProtocolLoader(
),
additionalServerHttpBoundProtocolCustomizations = listOf(
PythonServerAfterDeserializedMemberServerHttpBoundCustomization(),
PythonServerStreamPayloadSerializerCustomization(),
),
additionalHttpBindingCustomizations = listOf(
PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig),
Expand All @@ -103,6 +127,7 @@ class PythonServerProtocolLoader(
),
additionalServerHttpBoundProtocolCustomizations = listOf(
PythonServerAfterDeserializedMemberServerHttpBoundCustomization(),
PythonServerStreamPayloadSerializerCustomization(),
),
additionalHttpBindingCustomizations = listOf(
PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig),
Expand All @@ -115,6 +140,7 @@ class PythonServerProtocolLoader(
),
additionalServerHttpBoundProtocolCustomizations = listOf(
PythonServerAfterDeserializedMemberServerHttpBoundCustomization(),
PythonServerStreamPayloadSerializerCustomization(),
),
additionalHttpBindingCustomizations = listOf(
PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig),
Expand Down
Loading