Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove futures_core::stream::Stream from aws_smithy_http::byte_stream::ByteStream #2983

Merged
merged 31 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f43afc0
Remove `futures_core::stream::Stream` from `aws-smithy-async`
ysaito1001 Sep 9, 2023
cbbaf77
Update codegen client to use `PaginationStream`
ysaito1001 Sep 9, 2023
8005c3d
Update canary to use `PaginationStream`
ysaito1001 Sep 9, 2023
75b96e3
Stop using `tokio_stream::StreamExt` in integration-tests
ysaito1001 Sep 9, 2023
2b1fcb1
Add comment explaining why the test uses `.enter`
ysaito1001 Sep 11, 2023
73966d4
Make `FnStream::poll_next` module private
ysaito1001 Sep 11, 2023
e25a916
Update CHANGELOG.next.toml
ysaito1001 Sep 11, 2023
cce169c
Remove impl `Stream` trait for `ByteStream`
ysaito1001 Sep 12, 2023
0362d9c
Make rendering stream payload serializer configurable
ysaito1001 Sep 12, 2023
0099bdb
Let client configure how to render stream payload serializer
ysaito1001 Sep 12, 2023
03e9278
Let server configure how to render stream payload serializer
ysaito1001 Sep 12, 2023
b04b3ca
Remove unused `tokio_stream::StreamExt` from `glacier_checksums`
ysaito1001 Sep 13, 2023
6575943
Update CHANGELOG.next.toml
ysaito1001 Sep 13, 2023
8147d81
Remove unnecessary `Unpin` bound
ysaito1001 Sep 19, 2023
43318e7
Add convenince method `try_collect`
ysaito1001 Sep 20, 2023
b6811bf
Merge branch 'main' into ysaito/remove-futures-stream-from-smithy-async
ysaito1001 Sep 20, 2023
3ad25c8
Merge branch 'ysaito/remove-futures-stream-from-smithy-async' into ys…
ysaito1001 Sep 20, 2023
e076f64
Wrap stream payload in a new-type at call sites of serializer
ysaito1001 Sep 21, 2023
5381cfe
Revert HttpBoundProtocolPayloadGenerator.kt since no change is needed
ysaito1001 Sep 21, 2023
1add37f
Move `fn_stream` to its own module
ysaito1001 Sep 27, 2023
b8f37ea
Provide docs for `PaginationStream` for end users
ysaito1001 Sep 27, 2023
57f2707
Merge branch 'main' into ysaito/remove-futures-stream-from-smithy-async
ysaito1001 Sep 27, 2023
7571c8c
Update CHANGELOG.next.toml
ysaito1001 Sep 27, 2023
e8b1f65
Merge branch 'ysaito/remove-futures-stream-from-smithy-async' into ys…
ysaito1001 Sep 27, 2023
a802799
Make `ByteStream::poll_next` to be `pub(crate)`
ysaito1001 Sep 28, 2023
5d07b16
Doc link `ByteStream::next` to `ByteStream::try_next`
ysaito1001 Sep 28, 2023
6fc9a96
Doc link `ByteStream::try_next` to `ByteStream::next`
ysaito1001 Sep 28, 2023
b01b1d4
Explain why `futures_stream_adapter` is `doc(hidden)`
ysaito1001 Sep 28, 2023
f4a60df
Merge branch 'main' into ysaito/remove-futures-stream-from-byte-stream
ysaito1001 Sep 28, 2023
5bcf5ca
Make `ByteStream::poll_next` `pub` again
ysaito1001 Sep 28, 2023
e942f24
Merge branch 'main' into ysaito/remove-futures-stream-from-byte-stream
ysaito1001 Sep 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,27 @@ message = "Source defaults from the default trait instead of implicitly based on
references = ["smithy-rs#2985"]
meta = { "breaking" = false, "tada" = false, "bug" = true, "target" = "client" }
author = "rcoh"

[[aws-sdk-rust]]
message = "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect` methods are supported on `PaginationStream`. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner."
references = ["smithy-rs#2978"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "ysaito1001"

[[smithy-rs]]
message = "The `futures_core::stream::Stream` trait has been removed from public API. [`FnStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.FnStream.html) only supports `next`, `try_next`, and `collect` methods. [`TryFlatMap::flat_map`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.TryFlatMap.html#method.flat_map) returns [`PaginationStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.PaginationStream.html), which should be preferred to `FnStream` at an interface level. For stream operations previously made available through the trait or its extension traits, we should be able to add them as needed in a backward compatible manner."
references = ["smithy-rs#2978"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "ysaito1001"

[[aws-sdk-rust]]
message = "The `futures_core::stream::Stream` trait has been removed from [`ByteStream`](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html). The methods mentioned in the [doc](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html#getting-data-out-of-a-bytestream) will continue to be supported. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner."
references = ["smithy-rs#2983"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "ysaito1001"

[[smithy-rs]]
message = "The `futures_core::stream::Stream` trait has been removed from [`ByteStream`](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html). The methods mentioned in the [doc](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html#getting-data-out-of-a-bytestream) will continue to be supported. Other stream operations made available through the trait or its extension traits are not supported, but we should be able to add them as needed in a backward compatible manner."
references = ["smithy-rs#2983"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "ysaito1001"
1 change: 0 additions & 1 deletion aws/rust-runtime/aws-inlineable/src/glacier_checksums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use bytes::Buf;
use bytes_utils::SegmentedBuf;
use http::header::HeaderName;
use ring::digest::{Context, Digest, SHA256};
use tokio_stream::StreamExt;

const TREE_HASH_HEADER: &str = "x-amz-sha256-tree-hash";
const X_AMZ_CONTENT_SHA256: &str = "x-amz-content-sha256";
Expand Down
2 changes: 0 additions & 2 deletions aws/sdk/integration-tests/dynamodb/tests/paginators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
use std::collections::HashMap;
use std::iter::FromIterator;

use tokio_stream::StreamExt;

use aws_credential_types::Credentials;
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::{Client, Config};
Expand Down
2 changes: 0 additions & 2 deletions aws/sdk/integration-tests/ec2/tests/paginators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

use tokio_stream::StreamExt;

use aws_sdk_ec2::{config::Credentials, config::Region, types::InstanceType, Client, Config};
use aws_smithy_client::http_connector::HttpConnector;
use aws_smithy_client::test_connection::TestConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class PaginatorGenerator private constructor(
"HttpResponse" to RuntimeType.smithyRuntimeApi(runtimeConfig).resolve("client::orchestrator::HttpResponse"),
"SdkError" to RuntimeType.sdkError(runtimeConfig),
"client" to RuntimeType.smithyClient(runtimeConfig),
"fn_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::fn_stream"),
"pagination_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::pagination_stream"),

// External Types
"Stream" to RuntimeType.TokioStream.resolve("Stream"),
Expand Down Expand Up @@ -141,13 +141,14 @@ class PaginatorGenerator private constructor(

/// Create the pagination stream
///
/// _Note:_ No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next)).
pub fn send(self) -> impl #{Stream}<Item = #{item_type}> + #{Unpin} {
/// _Note:_ No requests will be dispatched until the stream is used
/// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method).
pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> {
// Move individual fields out of self for the borrow checker
let builder = self.builder;
let handle = self.handle;
#{runtime_plugin_init}
#{fn_stream}::FnStream::new(move |tx| #{Box}::pin(async move {
#{pagination_stream}::PaginationStream::new(#{pagination_stream}::FnStream::new(move |tx| #{Box}::pin(async move {
// Build the input for the first time. If required fields are missing, this is where we'll produce an early error.
let mut input = match builder.build().map_err(#{SdkError}::construction_failure) {
#{Ok}(input) => input,
Expand Down Expand Up @@ -177,7 +178,7 @@ class PaginatorGenerator private constructor(
return
}
}
}))
})))
}
}
""",
Expand Down Expand Up @@ -257,11 +258,12 @@ class PaginatorGenerator private constructor(
impl ${paginatorName}Items {
/// Create the pagination stream
///
/// _Note: No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next))._
/// _Note_: No requests will be dispatched until the stream is used
/// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method).
///
/// To read the entirety of the paginator, use [`.collect::<Result<Vec<_>, _>()`](tokio_stream::StreamExt::collect).
pub fn send(self) -> impl #{Stream}<Item = #{item_type}> + #{Unpin} {
#{fn_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter())
/// To read the entirety of the paginator, use [`.collect::<Result<Vec<_>, _>()`](aws_smithy_async::future::pagination_stream::PaginationStream::collect).
pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> {
#{pagination_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ class FluentClientGenerator(
"""
/// Create a paginator for this request
///
/// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a `Stream`.
/// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a [`PaginationStream`](aws_smithy_async::future::pagination_stream::PaginationStream).
pub fn into_paginator(self) -> #{Paginator} {
#{Paginator}::new(self.handle, self.inner)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,10 @@ data class RuntimeType(val path: String, val dependency: RustDependency? = null)
fun retryErrorKind(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("retry::ErrorKind")
fun eventStreamReceiver(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::Receiver")

fun eventStreamSender(runtimeConfig: RuntimeConfig): RuntimeType =
smithyHttp(runtimeConfig).resolve("event_stream::EventStreamSender")
fun futuresStreamCompatByteStream(runtimeConfig: RuntimeConfig): RuntimeType =
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
smithyHttp(runtimeConfig).resolve("futures_stream_adapter::FuturesStreamCompatByteStream")

fun errorMetadata(runtimeConfig: RuntimeConfig) = smithyTypes(runtimeConfig).resolve("error::ErrorMetadata")
fun errorMetadataBuilder(runtimeConfig: RuntimeConfig) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class PythonServerAfterDeserializedMemberServerHttpBoundCustomization() :
is ServerHttpBoundProtocolSection.AfterTimestampDeserializedMember -> writable {
rust(".into()")
}

else -> emptySection
}
}
Expand All @@ -78,6 +79,23 @@ class PythonServerAfterDeserializedMemberHttpBindingCustomization(private val ru
}
}

/**
* Customization class used to determine how serialized stream payload should be rendered for the Python server.
*
* In this customization, we do not need to wrap the payload in a new-type wrapper to enable the
* `futures_core::stream::Stream` trait since the payload in question has a type
* `aws_smithy_http_server_python::types::ByteStream` which already implements the `Stream` trait.
*/
class PythonServerStreamPayloadSerializerCustomization() : ServerHttpBoundProtocolCustomization() {
override fun section(section: ServerHttpBoundProtocolSection): Writable = when (section) {
is ServerHttpBoundProtocolSection.WrapStreamPayload -> writable {
section.params.payloadGenerator.generatePayload(this, section.params.shapeName, section.params.shape)
}

else -> emptySection
}
}

class PythonServerProtocolLoader(
private val supportedProtocols: ProtocolMap<ServerProtocolGenerator, ServerCodegenContext>,
) : ProtocolLoader<ServerProtocolGenerator, ServerCodegenContext>(supportedProtocols) {
Expand All @@ -91,6 +109,7 @@ class PythonServerProtocolLoader(
),
additionalServerHttpBoundProtocolCustomizations = listOf(
PythonServerAfterDeserializedMemberServerHttpBoundCustomization(),
PythonServerStreamPayloadSerializerCustomization(),
),
additionalHttpBindingCustomizations = listOf(
PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig),
Expand All @@ -103,6 +122,7 @@ class PythonServerProtocolLoader(
),
additionalServerHttpBoundProtocolCustomizations = listOf(
PythonServerAfterDeserializedMemberServerHttpBoundCustomization(),
PythonServerStreamPayloadSerializerCustomization(),
),
additionalHttpBindingCustomizations = listOf(
PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig),
Expand All @@ -115,6 +135,7 @@ class PythonServerProtocolLoader(
),
additionalServerHttpBoundProtocolCustomizations = listOf(
PythonServerAfterDeserializedMemberServerHttpBoundCustomization(),
PythonServerStreamPayloadSerializerCustomization(),
),
additionalHttpBindingCustomizations = listOf(
PythonServerAfterDeserializedMemberHttpBindingCustomization(runtimeConfig),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,28 @@ import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.Ser
import software.amazon.smithy.rust.codegen.server.smithy.generators.serverBuilderSymbol
import java.util.logging.Logger

data class StreamPayloadSerializerParams(
val codegenContext: ServerCodegenContext,
val payloadGenerator: ServerHttpBoundProtocolPayloadGenerator,
val shapeName: String,
val shape: OperationShape,
)

/**
* Class describing a ServerHttpBoundProtocol section that can be used in a customization.
*/
sealed class ServerHttpBoundProtocolSection(name: String) : Section(name) {
data class AfterTimestampDeserializedMember(val shape: MemberShape) : ServerHttpBoundProtocolSection("AfterTimestampDeserializedMember")

/**
* Represent a section for rendering the serialized stream payload.
*
* If the payload does not implement the `futures_core::stream::Stream`, which is the case for
* `aws_smithy_http::byte_stream::ByteStream`, the section needs to be overridden and renders a new-type wrapper
* around the payload to enable the `Stream` trait.
*/
data class WrapStreamPayload(val params: StreamPayloadSerializerParams) :
ServerHttpBoundProtocolSection("WrapStreamPayload")
}

/**
Expand Down Expand Up @@ -540,7 +557,18 @@ class ServerHttpBoundProtocolTraitImplGenerator(
operationShape.outputShape(model).findStreamingMember(model)?.let {
val payloadGenerator = ServerHttpBoundProtocolPayloadGenerator(codegenContext, protocol)
withBlockTemplate("let body = #{SmithyHttpServer}::body::boxed(#{SmithyHttpServer}::body::Body::wrap_stream(", "));", *codegenScope) {
payloadGenerator.generatePayload(this, "output", operationShape)
for (customization in customizations) {
customization.section(
ServerHttpBoundProtocolSection.WrapStreamPayload(
StreamPayloadSerializerParams(
codegenContext,
payloadGenerator,
"output",
operationShape,
),
),
)(this)
}
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
}
} ?: run {
val payloadGenerator = ServerHttpBoundProtocolPayloadGenerator(codegenContext, protocol)
Expand Down Expand Up @@ -707,7 +735,7 @@ class ServerHttpBoundProtocolTraitImplGenerator(
rustTemplate(
"""
#{SmithyHttpServer}::protocol::content_type_header_classifier(
&parts.headers,
&parts.headers,
Some("$expectedRequestContentType"),
)?;
input = #{parser}(bytes.as_ref(), input)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,68 @@ import software.amazon.smithy.aws.traits.protocols.AwsJson1_0Trait
import software.amazon.smithy.aws.traits.protocols.AwsJson1_1Trait
import software.amazon.smithy.aws.traits.protocols.RestJson1Trait
import software.amazon.smithy.aws.traits.protocols.RestXmlTrait
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
import software.amazon.smithy.rust.codegen.core.rustlang.withBlockTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.smithy.protocols.AwsJsonVersion
import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolLoader
import software.amazon.smithy.rust.codegen.core.smithy.protocols.ProtocolMap
import software.amazon.smithy.rust.codegen.core.util.isOutputEventStream
import software.amazon.smithy.rust.codegen.server.smithy.ServerCodegenContext
import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.ServerProtocolGenerator

class StreamPayloadSerializerCustomization() : ServerHttpBoundProtocolCustomization() {
override fun section(section: ServerHttpBoundProtocolSection): Writable = when (section) {
is ServerHttpBoundProtocolSection.WrapStreamPayload -> writable {
if (section.params.shape.isOutputEventStream(section.params.codegenContext.model)) {
// Event stream payload, of type `aws_smithy_http::event_stream::MessageStreamAdapter`, already
// implements the `Stream` trait, so no need to wrap it in the new-type.
section.params.payloadGenerator.generatePayload(this, section.params.shapeName, section.params.shape)
} else {
// Otherwise, the stream payload is `aws_smithy_http::byte_stream::ByteStream`. We wrap it in the
// new-type to enable the `Stream` trait.
withBlockTemplate(
"#{FuturesStreamCompatByteStream}::new(",
")",
"FuturesStreamCompatByteStream" to RuntimeType.futuresStreamCompatByteStream(section.params.codegenContext.runtimeConfig),
) {
section.params.payloadGenerator.generatePayload(
this,
section.params.shapeName,
section.params.shape,
)
}
}
}

else -> emptySection
}
}

class ServerProtocolLoader(supportedProtocols: ProtocolMap<ServerProtocolGenerator, ServerCodegenContext>) :
ProtocolLoader<ServerProtocolGenerator, ServerCodegenContext>(supportedProtocols) {

companion object {
val DefaultProtocols = mapOf(
RestJson1Trait.ID to ServerRestJsonFactory(),
RestXmlTrait.ID to ServerRestXmlFactory(),
AwsJson1_0Trait.ID to ServerAwsJsonFactory(AwsJsonVersion.Json10),
AwsJson1_1Trait.ID to ServerAwsJsonFactory(AwsJsonVersion.Json11),
RestJson1Trait.ID to ServerRestJsonFactory(
additionalServerHttpBoundProtocolCustomizations = listOf(
StreamPayloadSerializerCustomization(),
),
),
RestXmlTrait.ID to ServerRestXmlFactory(
additionalServerHttpBoundProtocolCustomizations = listOf(
StreamPayloadSerializerCustomization(),
),
),
AwsJson1_0Trait.ID to ServerAwsJsonFactory(
AwsJsonVersion.Json10,
additionalServerHttpBoundProtocolCustomizations = listOf(StreamPayloadSerializerCustomization()),
),
AwsJson1_1Trait.ID to ServerAwsJsonFactory(
AwsJsonVersion.Json11,
additionalServerHttpBoundProtocolCustomizations = listOf(StreamPayloadSerializerCustomization()),
),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ import software.amazon.smithy.rust.codegen.server.smithy.generators.protocol.Ser
* RestXml server-side protocol factory. This factory creates the [ServerHttpProtocolGenerator]
* with RestXml specific configurations.
*/
class ServerRestXmlFactory : ProtocolGeneratorFactory<ServerHttpBoundProtocolGenerator, ServerCodegenContext> {
class ServerRestXmlFactory(
private val additionalServerHttpBoundProtocolCustomizations: List<ServerHttpBoundProtocolCustomization> = listOf(),
) : ProtocolGeneratorFactory<ServerHttpBoundProtocolGenerator, ServerCodegenContext> {
override fun protocol(codegenContext: ServerCodegenContext): Protocol = ServerRestXmlProtocol(codegenContext)

override fun buildProtocolGenerator(codegenContext: ServerCodegenContext): ServerHttpBoundProtocolGenerator =
ServerHttpBoundProtocolGenerator(codegenContext, ServerRestXmlProtocol(codegenContext))
ServerHttpBoundProtocolGenerator(
codegenContext,
ServerRestXmlProtocol(codegenContext),
additionalServerHttpBoundProtocolCustomizations,
)

override fun support(): ProtocolSupport {
return ProtocolSupport(
Expand Down
Loading