Skip to content

Commit

Permalink
Remove futures_core::stream::Stream from aws-smithy-async (#2978)
Browse files Browse the repository at this point in the history
## Motivation and Context
Removes `futures_core::stream::Stream` from the `aws-smithy-async`
crate.

## Description
This PR is part of our ongoing effort,
#2413. We remove the
`futures_core::stream::Stream` trait from the public API in the
`aws-smithy-async` crate. While doing so, we compensate
- `FnStream` by providing the explicit `.next` and `.collect` methods to
let the previously working code continue working.
- `TryFlatMap` by making it return a new-type wrapper `PaginationStream`
to hide `FnStream` from those who use paginators.

With this change, the latest canary no longer uses
`tokio_stream::StreamExt`, since the paginator does not work in terms of
the `Stream` trait. Furthermore, `aws-sdk-rust` has been more than [3
releases since
release-2023-01-26](https://github.com/awslabs/aws-sdk-rust/releases),
so the module `release-2023-01-26` has been removed from
`canary-lambda`.

## Testing
No new tests added, but made sure the existing tests keep working with
the change.

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS
SDK, generated SDK code, or SDK runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
  • Loading branch information
ysaito1001 authored Sep 27, 2023
1 parent 673e385 commit 75fc85f
Show file tree
Hide file tree
Showing 20 changed files with 605 additions and 390 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,15 @@ message = "Produce better docs when items are marked @required"
references = ["smithy-rs#2996"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" }
author = "rcoh"

[[aws-sdk-rust]]
message = "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect`, and `try_collect` methods are supported on `PaginationStream`. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner. Finally, `fn_stream` has been moved to be a child module of `pagination_stream`."
references = ["smithy-rs#2978"]
meta = { "breaking" = true, "tada" = false, "bug" = false }
author = "ysaito1001"

[[smithy-rs]]
message = "The `futures_core::stream::Stream` trait has been removed from public API. `FnStream` only supports `next`, `try_next`, `collect`, and `try_collect` methods. [`TryFlatMap::flat_map`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.TryFlatMap.html#method.flat_map) returns [`PaginationStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.PaginationStream.html), which should be preferred to `FnStream` at an interface level. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner. Finally, `fn_stream` has been moved to be a child module of `pagination_stream`."
references = ["smithy-rs#2978"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" }
author = "ysaito1001"
2 changes: 0 additions & 2 deletions aws/sdk/integration-tests/dynamodb/tests/paginators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
use std::collections::HashMap;
use std::iter::FromIterator;

use tokio_stream::StreamExt;

use aws_credential_types::Credentials;
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::{Client, Config};
Expand Down
2 changes: 0 additions & 2 deletions aws/sdk/integration-tests/ec2/tests/paginators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/

use tokio_stream::StreamExt;

use aws_sdk_ec2::{config::Credentials, config::Region, types::InstanceType, Client, Config};
use aws_smithy_client::http_connector::HttpConnector;
use aws_smithy_client::test_connection::TestConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class PaginatorGenerator private constructor(
"HttpResponse" to RuntimeType.smithyRuntimeApi(runtimeConfig).resolve("client::orchestrator::HttpResponse"),
"SdkError" to RuntimeType.sdkError(runtimeConfig),
"client" to RuntimeType.smithyClient(runtimeConfig),
"fn_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::fn_stream"),
"pagination_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::pagination_stream"),

// External Types
"Stream" to RuntimeType.TokioStream.resolve("Stream"),
Expand Down Expand Up @@ -141,13 +141,14 @@ class PaginatorGenerator private constructor(
/// Create the pagination stream
///
/// _Note:_ No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next)).
pub fn send(self) -> impl #{Stream}<Item = #{item_type}> + #{Unpin} {
/// _Note:_ No requests will be dispatched until the stream is used
/// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method).
pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> {
// Move individual fields out of self for the borrow checker
let builder = self.builder;
let handle = self.handle;
#{runtime_plugin_init}
#{fn_stream}::FnStream::new(move |tx| #{Box}::pin(async move {
#{pagination_stream}::PaginationStream::new(#{pagination_stream}::fn_stream::FnStream::new(move |tx| #{Box}::pin(async move {
// Build the input for the first time. If required fields are missing, this is where we'll produce an early error.
let mut input = match builder.build().map_err(#{SdkError}::construction_failure) {
#{Ok}(input) => input,
Expand Down Expand Up @@ -177,7 +178,7 @@ class PaginatorGenerator private constructor(
return
}
}
}))
})))
}
}
""",
Expand Down Expand Up @@ -257,11 +258,12 @@ class PaginatorGenerator private constructor(
impl ${paginatorName}Items {
/// Create the pagination stream
///
/// _Note: No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next))._
/// _Note_: No requests will be dispatched until the stream is used
/// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method).
///
/// To read the entirety of the paginator, use [`.collect::<Result<Vec<_>, _>()`](tokio_stream::StreamExt::collect).
pub fn send(self) -> impl #{Stream}<Item = #{item_type}> + #{Unpin} {
#{fn_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter())
/// To read the entirety of the paginator, use [`.collect::<Result<Vec<_>, _>()`](aws_smithy_async::future::pagination_stream::PaginationStream::collect).
pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> {
#{pagination_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ class FluentClientGenerator(
"""
/// Create a paginator for this request
///
/// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a `Stream`.
/// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a [`PaginationStream`](aws_smithy_async::future::pagination_stream::PaginationStream).
pub fn into_paginator(self) -> #{Paginator} {
#{Paginator}::new(self.handle, self.inner)
}
Expand Down
7 changes: 6 additions & 1 deletion rust-runtime/aws-smithy-async/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ test-util = []
[dependencies]
pin-project-lite = "0.2"
tokio = { version = "1.23.1", features = ["sync"] }
tokio-stream = { version = "0.1.5", default-features = false }
futures-util = { version = "0.3.16", default-features = false }

[dev-dependencies]
pin-utils = "0.1"
tokio = { version = "1.23.1", features = ["rt", "macros", "test-util"] }
tokio-test = "0.4.2"

# futures-util is used by `now_or_later`, for instance, but the tooling
# reports a false positive, saying it is unused.
[package.metadata.cargo-udeps.ignore]
normal = ["futures-util"]

[package.metadata.docs.rs]
all-features = true
targets = ["x86_64-unknown-linux-gnu"]
Expand Down
3 changes: 0 additions & 3 deletions rust-runtime/aws-smithy-async/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,4 @@ allowed_external_types = [
"aws_smithy_types::config_bag::storable::Storable",
"aws_smithy_types::config_bag::storable::StoreReplace",
"aws_smithy_types::config_bag::storable::Storer",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Switch to AsyncIterator once standardized
"futures_core::stream::Stream",
]
Loading

0 comments on commit 75fc85f

Please sign in to comment.