diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index afb23e8029..bffedc818e 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -183,3 +183,15 @@ message = "Produce better docs when items are marked @required" references = ["smithy-rs#2996"] meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" } author = "rcoh" + +[[aws-sdk-rust]] +message = "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect`, and `try_collect` methods are supported on `PaginationStream`. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner. Finally, `fn_stream` has been moved to be a child module of `pagination_stream`." +references = ["smithy-rs#2978"] +meta = { "breaking" = true, "tada" = false, "bug" = false } +author = "ysaito1001" + +[[smithy-rs]] +message = "The `futures_core::stream::Stream` trait has been removed from public API. `FnStream` only supports `next`, `try_next`, `collect`, and `try_collect` methods. [`TryFlatMap::flat_map`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.TryFlatMap.html#method.flat_map) returns [`PaginationStream`](https://docs.rs/aws-smithy-async/latest/aws_smithy_async/future/pagination_stream/struct.PaginationStream.html), which should be preferred to `FnStream` at an interface level. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner. Finally, `fn_stream` has been moved to be a child module of `pagination_stream`." +references = ["smithy-rs#2978"] +meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "client" } +author = "ysaito1001" diff --git a/aws/sdk/integration-tests/dynamodb/tests/paginators.rs b/aws/sdk/integration-tests/dynamodb/tests/paginators.rs index 807a11890d..a3d0c62473 100644 --- a/aws/sdk/integration-tests/dynamodb/tests/paginators.rs +++ b/aws/sdk/integration-tests/dynamodb/tests/paginators.rs @@ -6,8 +6,6 @@ use std::collections::HashMap; use std::iter::FromIterator; -use tokio_stream::StreamExt; - use aws_credential_types::Credentials; use aws_sdk_dynamodb::types::AttributeValue; use aws_sdk_dynamodb::{Client, Config}; diff --git a/aws/sdk/integration-tests/ec2/tests/paginators.rs b/aws/sdk/integration-tests/ec2/tests/paginators.rs index 83528f2075..d070971a4f 100644 --- a/aws/sdk/integration-tests/ec2/tests/paginators.rs +++ b/aws/sdk/integration-tests/ec2/tests/paginators.rs @@ -3,8 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -use tokio_stream::StreamExt; - use aws_sdk_ec2::{config::Credentials, config::Region, types::InstanceType, Client, Config}; use aws_smithy_client::http_connector::HttpConnector; use aws_smithy_client::test_connection::TestConnection; diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt index dedc50f31f..6f9892b2cc 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/PaginatorGenerator.kt @@ -90,7 +90,7 @@ class PaginatorGenerator private constructor( "HttpResponse" to RuntimeType.smithyRuntimeApi(runtimeConfig).resolve("client::orchestrator::HttpResponse"), "SdkError" to RuntimeType.sdkError(runtimeConfig), "client" to RuntimeType.smithyClient(runtimeConfig), - "fn_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::fn_stream"), + "pagination_stream" to RuntimeType.smithyAsync(runtimeConfig).resolve("future::pagination_stream"), // External Types "Stream" to RuntimeType.TokioStream.resolve("Stream"), @@ -141,13 +141,14 @@ class PaginatorGenerator private constructor( /// Create the pagination stream /// - /// _Note:_ No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next)). - pub fn send(self) -> impl #{Stream} + #{Unpin} { + /// _Note:_ No requests will be dispatched until the stream is used + /// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method). + pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> { // Move individual fields out of self for the borrow checker let builder = self.builder; let handle = self.handle; #{runtime_plugin_init} - #{fn_stream}::FnStream::new(move |tx| #{Box}::pin(async move { + #{pagination_stream}::PaginationStream::new(#{pagination_stream}::fn_stream::FnStream::new(move |tx| #{Box}::pin(async move { // Build the input for the first time. If required fields are missing, this is where we'll produce an early error. let mut input = match builder.build().map_err(#{SdkError}::construction_failure) { #{Ok}(input) => input, @@ -177,7 +178,7 @@ class PaginatorGenerator private constructor( return } } - })) + }))) } } """, @@ -257,11 +258,12 @@ class PaginatorGenerator private constructor( impl ${paginatorName}Items { /// Create the pagination stream /// - /// _Note: No requests will be dispatched until the stream is used (eg. with [`.next().await`](tokio_stream::StreamExt::next))._ + /// _Note_: No requests will be dispatched until the stream is used + /// (e.g. with the [`.next().await`](aws_smithy_async::future::pagination_stream::PaginationStream::next) method). /// - /// To read the entirety of the paginator, use [`.collect::, _>()`](tokio_stream::StreamExt::collect). - pub fn send(self) -> impl #{Stream} + #{Unpin} { - #{fn_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter()) + /// To read the entirety of the paginator, use [`.collect::, _>()`](aws_smithy_async::future::pagination_stream::PaginationStream::collect). + pub fn send(self) -> #{pagination_stream}::PaginationStream<#{item_type}> { + #{pagination_stream}::TryFlatMap::new(self.0.send()).flat_map(|page| #{extract_items}(page).unwrap_or_default().into_iter()) } } diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt index cd50393a92..614d89bd48 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt @@ -433,7 +433,7 @@ class FluentClientGenerator( """ /// Create a paginator for this request /// - /// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a `Stream`. + /// Paginators are used by calling [`send().await`](#{Paginator}::send) which returns a [`PaginationStream`](aws_smithy_async::future::pagination_stream::PaginationStream). pub fn into_paginator(self) -> #{Paginator} { #{Paginator}::new(self.handle, self.inner) } diff --git a/rust-runtime/aws-smithy-async/Cargo.toml b/rust-runtime/aws-smithy-async/Cargo.toml index c95862d9ff..fd51b4fb1e 100644 --- a/rust-runtime/aws-smithy-async/Cargo.toml +++ b/rust-runtime/aws-smithy-async/Cargo.toml @@ -14,13 +14,18 @@ test-util = [] [dependencies] pin-project-lite = "0.2" tokio = { version = "1.23.1", features = ["sync"] } -tokio-stream = { version = "0.1.5", default-features = false } futures-util = { version = "0.3.16", default-features = false } [dev-dependencies] +pin-utils = "0.1" tokio = { version = "1.23.1", features = ["rt", "macros", "test-util"] } tokio-test = "0.4.2" +# futures-util is used by `now_or_later`, for instance, but the tooling +# reports a false positive, saying it is unused. +[package.metadata.cargo-udeps.ignore] +normal = ["futures-util"] + [package.metadata.docs.rs] all-features = true targets = ["x86_64-unknown-linux-gnu"] diff --git a/rust-runtime/aws-smithy-async/external-types.toml b/rust-runtime/aws-smithy-async/external-types.toml index 424f7dc1db..464456a2dc 100644 --- a/rust-runtime/aws-smithy-async/external-types.toml +++ b/rust-runtime/aws-smithy-async/external-types.toml @@ -2,7 +2,4 @@ allowed_external_types = [ "aws_smithy_types::config_bag::storable::Storable", "aws_smithy_types::config_bag::storable::StoreReplace", "aws_smithy_types::config_bag::storable::Storer", - - # TODO(https://github.com/awslabs/smithy-rs/issues/1193): Switch to AsyncIterator once standardized - "futures_core::stream::Stream", ] diff --git a/rust-runtime/aws-smithy-async/src/future/fn_stream.rs b/rust-runtime/aws-smithy-async/src/future/fn_stream.rs deleted file mode 100644 index 804b08f6bb..0000000000 --- a/rust-runtime/aws-smithy-async/src/future/fn_stream.rs +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -//! Utility to drive a stream with an async function and a channel. - -use crate::future::rendezvous; -use futures_util::StreamExt; -use pin_project_lite::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio_stream::{Iter, Once, Stream}; - -pin_project! { - /// Utility to drive a stream with an async function and a channel. - /// - /// The closure is passed a reference to a `Sender` which acts as a rendezvous channel. Messages - /// sent to the sender will be emitted to the stream. Because the stream is 1-bounded, the function - /// will not proceed until the stream is read. - /// - /// This utility is used by generated paginators to generate a stream of paginated results. - /// - /// If `tx.send` returns an error, the function MUST return immediately. - /// - /// # Examples - /// ```no_run - /// use tokio_stream::StreamExt; - /// # async fn docs() { - /// use aws_smithy_async::future::fn_stream::FnStream; - /// let stream = FnStream::new(|tx| Box::pin(async move { - /// if let Err(_) = tx.send("Hello!").await { - /// return; - /// } - /// if let Err(_) = tx.send("Goodbye!").await { - /// return; - /// } - /// })); - /// assert_eq!(stream.collect::>().await, vec!["Hello!", "Goodbye!"]); - /// # } - pub struct FnStream { - #[pin] - rx: rendezvous::Receiver, - #[pin] - generator: Option, - } -} - -impl FnStream { - /// Creates a new function based stream driven by `generator`. - /// - /// For examples, see the documentation for [`FnStream`] - pub fn new(generator: T) -> Self - where - T: FnOnce(rendezvous::Sender) -> F, - { - let (tx, rx) = rendezvous::channel::(); - Self { - rx, - generator: Some(generator(tx)), - } - } -} - -impl Stream for FnStream -where - F: Future, -{ - type Item = Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut me = self.project(); - match me.rx.poll_recv(cx) { - Poll::Ready(item) => Poll::Ready(item), - Poll::Pending => { - if let Some(generator) = me.generator.as_mut().as_pin_mut() { - if generator.poll(cx).is_ready() { - // if the generator returned ready we MUST NOT poll it again—doing so - // will cause a panic. - me.generator.set(None); - } - } - Poll::Pending - } - } - } -} - -/// Utility wrapper to flatten paginated results -/// -/// When flattening paginated results, it's most convenient to produce an iterator where the `Result` -/// is present in each item. This provides `items()` which can wrap an stream of `Result` -/// and produce a stream of `Result`. -#[derive(Debug)] -pub struct TryFlatMap(I); - -impl TryFlatMap { - /// Create a `TryFlatMap` that wraps the input - pub fn new(i: I) -> Self { - Self(i) - } - - /// Produce a new [`Stream`] by mapping this stream with `map` then flattening the result - pub fn flat_map(self, map: M) -> impl Stream> - where - I: Stream>, - M: Fn(Page) -> Iter, - Iter: IntoIterator, - { - self.0.flat_map(move |page| match page { - Ok(page) => OnceOrMany::Many { - many: tokio_stream::iter(map(page).into_iter().map(Ok)), - }, - Err(e) => OnceOrMany::Once { - once: tokio_stream::once(Err(e)), - }, - }) - } -} - -pin_project! { - /// Helper enum to to support returning `Once` and `Iter` from `Items::items` - #[project = OnceOrManyProj] - enum OnceOrMany { - Many { #[pin] many: Iter }, - Once { #[pin] once: Once }, - } -} - -impl Stream for OnceOrMany -where - Iter: Iterator, -{ - type Item = Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = self.project(); - match me { - OnceOrManyProj::Many { many } => many.poll_next(cx), - OnceOrManyProj::Once { once } => once.poll_next(cx), - } - } -} - -#[cfg(test)] -mod test { - use crate::future::fn_stream::{FnStream, TryFlatMap}; - use std::sync::{Arc, Mutex}; - use std::time::Duration; - use tokio_stream::StreamExt; - - /// basic test of FnStream functionality - #[tokio::test] - async fn fn_stream_returns_results() { - tokio::time::pause(); - let mut stream = FnStream::new(|tx| { - Box::pin(async move { - tx.send("1").await.expect("failed to send"); - tokio::time::sleep(Duration::from_secs(1)).await; - tokio::time::sleep(Duration::from_secs(1)).await; - tx.send("2").await.expect("failed to send"); - tokio::time::sleep(Duration::from_secs(1)).await; - tx.send("3").await.expect("failed to send"); - }) - }); - let mut out = vec![]; - while let Some(value) = stream.next().await { - out.push(value); - } - assert_eq!(out, vec!["1", "2", "3"]); - } - - // smithy-rs#1902: there was a bug where we could continue to poll the generator after it - // had returned Poll::Ready. This test case leaks the tx half so that the channel stays open - // but the send side generator completes. By calling `poll` multiple times on the resulting future, - // we can trigger the bug and validate the fix. - #[tokio::test] - async fn fn_stream_doesnt_poll_after_done() { - let mut stream = FnStream::new(|tx| { - Box::pin(async move { - assert!(tx.send("blah").await.is_ok()); - Box::leak(Box::new(tx)); - }) - }); - assert_eq!(stream.next().await, Some("blah")); - let mut test_stream = tokio_test::task::spawn(stream); - assert!(test_stream.poll_next().is_pending()); - assert!(test_stream.poll_next().is_pending()); - } - - /// Tests that the generator will not advance until demand exists - #[tokio::test] - async fn waits_for_reader() { - let progress = Arc::new(Mutex::new(0)); - let mut stream = FnStream::new(|tx| { - let progress = progress.clone(); - Box::pin(async move { - *progress.lock().unwrap() = 1; - tx.send("1").await.expect("failed to send"); - *progress.lock().unwrap() = 2; - tx.send("2").await.expect("failed to send"); - *progress.lock().unwrap() = 3; - tx.send("3").await.expect("failed to send"); - *progress.lock().unwrap() = 4; - }) - }); - assert_eq!(*progress.lock().unwrap(), 0); - stream.next().await.expect("ready"); - assert_eq!(*progress.lock().unwrap(), 1); - - assert_eq!(stream.next().await.expect("ready"), "2"); - assert_eq!(*progress.lock().unwrap(), 2); - - let _ = stream.next().await.expect("ready"); - assert_eq!(*progress.lock().unwrap(), 3); - assert_eq!(stream.next().await, None); - assert_eq!(*progress.lock().unwrap(), 4); - } - - #[tokio::test] - async fn generator_with_errors() { - let mut stream = FnStream::new(|tx| { - Box::pin(async move { - for i in 0..5 { - if i != 2 { - if tx.send(Ok(i)).await.is_err() { - return; - } - } else { - tx.send(Err(i)).await.unwrap(); - return; - } - } - }) - }); - let mut out = vec![]; - while let Some(Ok(value)) = stream.next().await { - out.push(value); - } - assert_eq!(out, vec![0, 1]); - } - - #[tokio::test] - async fn flatten_items_ok() { - #[derive(Debug)] - struct Output { - items: Vec, - } - let stream = FnStream::new(|tx| { - Box::pin(async move { - tx.send(Ok(Output { - items: vec![1, 2, 3], - })) - .await - .unwrap(); - tx.send(Ok(Output { - items: vec![4, 5, 6], - })) - .await - .unwrap(); - }) - }); - assert_eq!( - TryFlatMap(stream) - .flat_map(|output| output.items.into_iter()) - .collect::, &str>>() - .await, - Ok(vec![1, 2, 3, 4, 5, 6]) - ) - } - - #[tokio::test] - async fn flatten_items_error() { - #[derive(Debug)] - struct Output { - items: Vec, - } - let stream = FnStream::new(|tx| { - Box::pin(async move { - tx.send(Ok(Output { - items: vec![1, 2, 3], - })) - .await - .unwrap(); - tx.send(Err("bummer")).await.unwrap(); - }) - }); - assert_eq!( - TryFlatMap(stream) - .flat_map(|output| output.items.into_iter()) - .collect::, &str>>() - .await, - Err("bummer") - ) - } -} diff --git a/rust-runtime/aws-smithy-async/src/future/mod.rs b/rust-runtime/aws-smithy-async/src/future/mod.rs index 1e99bdc304..44894e0733 100644 --- a/rust-runtime/aws-smithy-async/src/future/mod.rs +++ b/rust-runtime/aws-smithy-async/src/future/mod.rs @@ -5,8 +5,8 @@ //! Useful runtime-agnostic future implementations. -pub mod fn_stream; pub mod never; pub mod now_or_later; +pub mod pagination_stream; pub mod rendezvous; pub mod timeout; diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs new file mode 100644 index 0000000000..f263685b49 --- /dev/null +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream.rs @@ -0,0 +1,302 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Provides types to support stream-like operations for paginators. + +use crate::future::pagination_stream::collect::sealed::Collectable; +use std::future::Future; +use std::pin::Pin; +pub mod collect; +pub mod fn_stream; +use fn_stream::FnStream; + +/// Stream specifically made to support paginators. +/// +/// `PaginationStream` provides two primary mechanisms for accessing stream of data. +/// 1. With [`.next()`](PaginationStream::next) (or [`try_next()`](PaginationStream::try_next)): +/// +/// ```no_run +/// # async fn docs() { +/// # use aws_smithy_async::future::pagination_stream::PaginationStream; +/// # fn operation_to_yield_paginator() -> PaginationStream { +/// # todo!() +/// # } +/// # struct Page; +/// let mut stream: PaginationStream = operation_to_yield_paginator(); +/// while let Some(page) = stream.next().await { +/// // process `page` +/// } +/// # } +/// ``` +/// 2. With [`.collect()`](PaginationStream::collect) (or [`try_collect()`](PaginationStream::try_collect)): +/// +/// ```no_run +/// # async fn docs() { +/// # use aws_smithy_async::future::pagination_stream::PaginationStream; +/// # fn operation_to_yield_paginator() -> PaginationStream { +/// # todo!() +/// # } +/// # struct Page; +/// let mut stream: PaginationStream = operation_to_yield_paginator(); +/// let result = stream.collect::>().await; +/// # } +/// ``` +/// +/// [`PaginationStream`] is implemented in terms of [`FnStream`], but the latter is meant to be +/// used internally and not by external users. +#[derive(Debug)] +pub struct PaginationStream(FnStream); + +impl PaginationStream { + /// Creates a `PaginationStream` from the given [`FnStream`]. + pub fn new(stream: FnStream) -> Self { + Self(stream) + } + + /// Consumes and returns the next `Item` from this stream. + pub async fn next(&mut self) -> Option { + self.0.next().await + } + + /// Consumes this stream and gathers elements into a collection. + pub async fn collect>(self) -> T { + self.0.collect().await + } +} + +impl PaginationStream> { + /// Yields the next item in the stream or returns an error if an error is encountered. + pub async fn try_next(&mut self) -> Result, E> { + self.next().await.transpose() + } + + /// Convenience method for `.collect::, _>()`. + pub async fn try_collect(self) -> Result, E> { + self.collect::, E>>().await + } +} + +/// Utility wrapper to flatten paginated results +/// +/// When flattening paginated results, it's most convenient to produce an iterator where the `Result` +/// is present in each item. This provides `items()` which can wrap an stream of `Result` +/// and produce a stream of `Result`. +#[derive(Debug)] +pub struct TryFlatMap(PaginationStream>); + +impl TryFlatMap { + /// Creates a `TryFlatMap` that wraps the input. + pub fn new(stream: PaginationStream>) -> Self { + Self(stream) + } + + /// Produces a new [`PaginationStream`] by mapping this stream with `map` then flattening the result. + pub fn flat_map(mut self, map: M) -> PaginationStream> + where + Page: Send + 'static, + Err: Send + 'static, + M: Fn(Page) -> Iter + Send + 'static, + Item: Send + 'static, + Iter: IntoIterator + Send, + ::IntoIter: Send, + { + PaginationStream::new(FnStream::new(|tx| { + Box::pin(async move { + while let Some(page) = self.0.next().await { + match page { + Ok(page) => { + let mapped = map(page); + for item in mapped.into_iter() { + let _ = tx.send(Ok(item)).await; + } + } + Err(e) => { + let _ = tx.send(Err(e)).await; + break; + } + } + } + }) as Pin + Send>> + })) + } +} + +#[cfg(test)] +mod test { + use crate::future::pagination_stream::{FnStream, PaginationStream, TryFlatMap}; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + + /// basic test of FnStream functionality + #[tokio::test] + async fn fn_stream_returns_results() { + tokio::time::pause(); + let mut stream = FnStream::new(|tx| { + Box::pin(async move { + tx.send("1").await.expect("failed to send"); + tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; + tx.send("2").await.expect("failed to send"); + tokio::time::sleep(Duration::from_secs(1)).await; + tx.send("3").await.expect("failed to send"); + }) + }); + let mut out = vec![]; + while let Some(value) = stream.next().await { + out.push(value); + } + assert_eq!(vec!["1", "2", "3"], out); + } + + #[tokio::test] + async fn fn_stream_try_next() { + tokio::time::pause(); + let mut stream = FnStream::new(|tx| { + Box::pin(async move { + tx.send(Ok(1)).await.unwrap(); + tx.send(Ok(2)).await.unwrap(); + tx.send(Err("err")).await.unwrap(); + }) + }); + let mut out = vec![]; + while let Ok(value) = stream.try_next().await { + out.push(value); + } + assert_eq!(vec![Some(1), Some(2)], out); + } + + // smithy-rs#1902: there was a bug where we could continue to poll the generator after it + // had returned Poll::Ready. This test case leaks the tx half so that the channel stays open + // but the send side generator completes. By calling `poll` multiple times on the resulting future, + // we can trigger the bug and validate the fix. + #[tokio::test] + async fn fn_stream_doesnt_poll_after_done() { + let mut stream = FnStream::new(|tx| { + Box::pin(async move { + assert!(tx.send("blah").await.is_ok()); + Box::leak(Box::new(tx)); + }) + }); + assert_eq!(Some("blah"), stream.next().await); + let mut test_stream = tokio_test::task::spawn(stream); + // `tokio_test::task::Spawn::poll_next` can only be invoked when the wrapped + // type implements the `Stream` trait. Here, `FnStream` does not implement it, + // so we work around it by using the `enter` method. + let _ = test_stream.enter(|ctx, pin| { + let polled = pin.poll_next(ctx); + assert!(polled.is_pending()); + }); + let _ = test_stream.enter(|ctx, pin| { + let polled = pin.poll_next(ctx); + assert!(polled.is_pending()); + }); + } + + /// Tests that the generator will not advance until demand exists + #[tokio::test] + async fn waits_for_reader() { + let progress = Arc::new(Mutex::new(0)); + let mut stream = FnStream::new(|tx| { + let progress = progress.clone(); + Box::pin(async move { + *progress.lock().unwrap() = 1; + tx.send("1").await.expect("failed to send"); + *progress.lock().unwrap() = 2; + tx.send("2").await.expect("failed to send"); + *progress.lock().unwrap() = 3; + tx.send("3").await.expect("failed to send"); + *progress.lock().unwrap() = 4; + }) + }); + assert_eq!(*progress.lock().unwrap(), 0); + stream.next().await.expect("ready"); + assert_eq!(*progress.lock().unwrap(), 1); + + assert_eq!("2", stream.next().await.expect("ready")); + assert_eq!(2, *progress.lock().unwrap()); + + let _ = stream.next().await.expect("ready"); + assert_eq!(3, *progress.lock().unwrap()); + assert_eq!(None, stream.next().await); + assert_eq!(4, *progress.lock().unwrap()); + } + + #[tokio::test] + async fn generator_with_errors() { + let mut stream = FnStream::new(|tx| { + Box::pin(async move { + for i in 0..5 { + if i != 2 { + if tx.send(Ok(i)).await.is_err() { + return; + } + } else { + tx.send(Err(i)).await.unwrap(); + return; + } + } + }) + }); + let mut out = vec![]; + while let Some(Ok(value)) = stream.next().await { + out.push(value); + } + assert_eq!(vec![0, 1], out); + } + + #[tokio::test] + async fn flatten_items_ok() { + #[derive(Debug)] + struct Output { + items: Vec, + } + let stream: FnStream> = FnStream::new(|tx| { + Box::pin(async move { + tx.send(Ok(Output { + items: vec![1, 2, 3], + })) + .await + .unwrap(); + tx.send(Ok(Output { + items: vec![4, 5, 6], + })) + .await + .unwrap(); + }) + }); + assert_eq!( + Ok(vec![1, 2, 3, 4, 5, 6]), + TryFlatMap::new(PaginationStream::new(stream)) + .flat_map(|output| output.items.into_iter()) + .try_collect() + .await, + ); + } + + #[tokio::test] + async fn flatten_items_error() { + #[derive(Debug)] + struct Output { + items: Vec, + } + let stream = FnStream::new(|tx| { + Box::pin(async move { + tx.send(Ok(Output { + items: vec![1, 2, 3], + })) + .await + .unwrap(); + tx.send(Err("bummer")).await.unwrap(); + }) + }); + assert_eq!( + Err("bummer"), + TryFlatMap::new(PaginationStream::new(stream)) + .flat_map(|output| output.items.into_iter()) + .try_collect() + .await + ) + } +} diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs new file mode 100644 index 0000000000..1a6fcfbf9a --- /dev/null +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream/collect.rs @@ -0,0 +1,75 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Module to extend the functionality of types in `patination_stream` module to allow for +//! collecting elements of the stream into collection. +//! +//! Majority of the code is borrowed from +//! + +pub(crate) mod sealed { + /// A trait that signifies that elements can be collected into `T`. + /// + /// Currently the trait may not be implemented by clients so we can make changes in the future + /// without breaking code depending on it. + #[doc(hidden)] + pub trait Collectable { + type Collection; + + fn initialize() -> Self::Collection; + + fn extend(collection: &mut Self::Collection, item: T) -> bool; + + fn finalize(collection: Self::Collection) -> Self; + } +} + +impl sealed::Collectable for Vec { + type Collection = Self; + + fn initialize() -> Self::Collection { + Vec::default() + } + + fn extend(collection: &mut Self::Collection, item: T) -> bool { + collection.push(item); + true + } + + fn finalize(collection: Self::Collection) -> Self { + collection + } +} + +impl sealed::Collectable> for Result +where + U: sealed::Collectable, +{ + type Collection = Result; + + fn initialize() -> Self::Collection { + Ok(U::initialize()) + } + + fn extend(collection: &mut Self::Collection, item: Result) -> bool { + match item { + Ok(item) => { + let collection = collection.as_mut().ok().expect("invalid state"); + U::extend(collection, item) + } + Err(e) => { + *collection = Err(e); + false + } + } + } + + fn finalize(collection: Self::Collection) -> Self { + match collection { + Ok(collection) => Ok(U::finalize(collection)), + err @ Err(_) => Err(err.map(drop).unwrap_err()), + } + } +} diff --git a/rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs b/rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs new file mode 100644 index 0000000000..260bfb9e87 --- /dev/null +++ b/rust-runtime/aws-smithy-async/src/future/pagination_stream/fn_stream.rs @@ -0,0 +1,124 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Module to define utility to drive a stream with an async function and a channel. + +use crate::future::pagination_stream::collect::sealed::Collectable; +use crate::future::rendezvous; +use pin_project_lite::pin_project; +use std::fmt; +use std::future::poll_fn; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// The closure is passed a reference to a `Sender` which acts as a rendezvous channel. Messages + /// sent to the sender will be emitted to the stream. Because the stream is 1-bounded, the function + /// will not proceed until the stream is read. + /// + /// This utility is used by generated paginators to generate a stream of paginated results. + /// + /// If `tx.send` returns an error, the function MUST return immediately. + /// + /// Note `FnStream` is only `Send` but not `Sync` because `generator` is a boxed future that + /// is `Send` and returns `()` as output when it is done. + /// + /// # Examples + /// ```no_run + /// # async fn docs() { + /// use aws_smithy_async::future::pagination_stream::fn_stream::FnStream; + /// let mut stream = FnStream::new(|tx| Box::pin(async move { + /// if let Err(_) = tx.send("Hello!").await { + /// return; + /// } + /// if let Err(_) = tx.send("Goodbye!").await { + /// return; + /// } + /// })); + /// assert_eq!(stream.collect::>().await, vec!["Hello!", "Goodbye!"]); + /// # } + pub struct FnStream { + #[pin] + rx: rendezvous::Receiver, + generator: Option + Send + 'static>>>, + } +} + +impl fmt::Debug for FnStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let item_typename = std::any::type_name::(); + write!(f, "FnStream<{item_typename}>") + } +} + +impl FnStream { + /// Creates a new function based stream driven by `generator`. + /// + /// For examples, see the documentation for [`FnStream`] + pub fn new(generator: T) -> Self + where + T: FnOnce(rendezvous::Sender) -> Pin + Send + 'static>>, + { + let (tx, rx) = rendezvous::channel::(); + Self { + rx, + generator: Some(Box::pin(generator(tx))), + } + } + + /// Consumes and returns the next `Item` from this stream. + pub async fn next(&mut self) -> Option + where + Self: Unpin, + { + let mut me = Pin::new(self); + poll_fn(|cx| me.as_mut().poll_next(cx)).await + } + + /// Attempts to pull out the next value of this stream, returning `None` if the stream is + /// exhausted. + pub(crate) fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.project(); + match me.rx.poll_recv(cx) { + Poll::Ready(item) => Poll::Ready(item), + Poll::Pending => { + if let Some(generator) = me.generator { + if generator.as_mut().poll(cx).is_ready() { + // `generator` keeps writing items to `tx` and will not be `Poll::Ready` + // until it is done writing to `tx`. Once it is done, it returns `()` + // as output and is `Poll::Ready`, at which point we MUST NOT poll it again + // since doing so will cause a panic. + *me.generator = None; + } + } + Poll::Pending + } + } + } + + /// Consumes this stream and gathers elements into a collection. + pub async fn collect>(mut self) -> T { + let mut collection = T::initialize(); + while let Some(item) = self.next().await { + if !T::extend(&mut collection, item) { + break; + } + } + T::finalize(collection) + } +} + +impl FnStream> { + /// Yields the next item in the stream or returns an error if an error is encountered. + pub async fn try_next(&mut self) -> Result, E> { + self.next().await.transpose() + } + + /// Convenience method for `.collect::, _>()`. + pub async fn try_collect(self) -> Result, E> { + self.collect::, E>>().await + } +} diff --git a/rust-runtime/aws-smithy-async/src/future/rendezvous.rs b/rust-runtime/aws-smithy-async/src/future/rendezvous.rs index 16456f123e..78762866e2 100644 --- a/rust-runtime/aws-smithy-async/src/future/rendezvous.rs +++ b/rust-runtime/aws-smithy-async/src/future/rendezvous.rs @@ -10,8 +10,9 @@ //! and coordinate with the receiver. //! //! Rendezvous channels should be used with care—it's inherently easy to deadlock unless they're being -//! used from separate tasks or an a coroutine setup (e.g. [`crate::future::fn_stream::FnStream`]) +//! used from separate tasks or an a coroutine setup (e.g. [`crate::future::pagination_stream::fn_stream::FnStream`]) +use std::future::poll_fn; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::sync::Semaphore; @@ -104,7 +105,11 @@ pub struct Receiver { impl Receiver { /// Polls to receive an item from the channel - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + pub async fn recv(&mut self) -> Option { + poll_fn(|cx| self.poll_recv(cx)).await + } + + pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { // This uses `needs_permit` to track whether this is the first poll since we last returned an item. // If it is, we will grant a permit to the semaphore. Otherwise, we'll just forward the response through. let resp = self.chan.poll_recv(cx); @@ -124,13 +129,8 @@ impl Receiver { #[cfg(test)] mod test { - use crate::future::rendezvous::{channel, Receiver}; + use crate::future::rendezvous::channel; use std::sync::{Arc, Mutex}; - use tokio::macros::support::poll_fn; - - async fn recv(rx: &mut Receiver) -> Option { - poll_fn(|cx| rx.poll_recv(cx)).await - } #[tokio::test] async fn send_blocks_caller() { @@ -145,11 +145,11 @@ mod test { *idone.lock().unwrap() = 3; }); assert_eq!(*done.lock().unwrap(), 0); - assert_eq!(recv(&mut rx).await, Some(0)); + assert_eq!(rx.recv().await, Some(0)); assert_eq!(*done.lock().unwrap(), 1); - assert_eq!(recv(&mut rx).await, Some(1)); + assert_eq!(rx.recv().await, Some(1)); assert_eq!(*done.lock().unwrap(), 2); - assert_eq!(recv(&mut rx).await, None); + assert_eq!(rx.recv().await, None); assert_eq!(*done.lock().unwrap(), 3); let _ = send.await; } diff --git a/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs b/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs index d50c4f2be8..11914660ad 100644 --- a/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/latest/paginator_canary.rs @@ -10,7 +10,6 @@ use aws_sdk_ec2 as ec2; use aws_sdk_ec2::types::InstanceType; use crate::CanaryEnv; -use tokio_stream::StreamExt; mk_canary!( "ec2_paginator", diff --git a/tools/ci-cdk/canary-lambda/src/main.rs b/tools/ci-cdk/canary-lambda/src/main.rs index 688462031d..8fc6f4b2e7 100644 --- a/tools/ci-cdk/canary-lambda/src/main.rs +++ b/tools/ci-cdk/canary-lambda/src/main.rs @@ -26,11 +26,11 @@ mod latest; #[cfg(feature = "latest")] pub(crate) use latest as current_canary; -// NOTE: This module can be deleted 3 releases after release-2023-01-26 -#[cfg(feature = "release-2023-01-26")] -mod release_2023_01_26; -#[cfg(feature = "release-2023-01-26")] -pub(crate) use release_2023_01_26 as current_canary; +// NOTE: This module can be deleted 3 releases after release-2023-08-23 +#[cfg(feature = "release-2023-08-23")] +mod release_2023_08_23; +#[cfg(feature = "release-2023-08-23")] +pub(crate) use release_2023_08_23 as current_canary; #[tokio::main] async fn main() -> Result<(), Error> { diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23.rs similarity index 100% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23.rs diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/paginator_canary.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/paginator_canary.rs similarity index 92% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26/paginator_canary.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23/paginator_canary.rs index 72c9b40ed0..66df5a03e4 100644 --- a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/paginator_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/paginator_canary.rs @@ -7,7 +7,7 @@ use crate::mk_canary; use anyhow::bail; use aws_sdk_ec2 as ec2; -use aws_sdk_ec2::model::InstanceType; +use aws_sdk_ec2::types::InstanceType; use crate::CanaryEnv; use tokio_stream::StreamExt; @@ -30,7 +30,7 @@ pub async fn paginator_canary(client: ec2::Client, page_size: usize) -> anyhow:: let mut num_pages = 0; while let Some(page) = history.try_next().await? { let items_in_page = page.spot_price_history.unwrap_or_default().len(); - if items_in_page > page_size as usize { + if items_in_page > page_size { bail!( "failed to retrieve results of correct page size (expected {}, got {})", page_size, @@ -60,7 +60,7 @@ pub async fn paginator_canary(client: ec2::Client, page_size: usize) -> anyhow:: #[cfg(test)] mod test { - use crate::paginator_canary::paginator_canary; + use crate::current_canary::paginator_canary::paginator_canary; #[tokio::test] async fn test_paginator() { diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/s3_canary.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/s3_canary.rs similarity index 98% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26/s3_canary.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23/s3_canary.rs index 70e3d18c55..fbcba976d8 100644 --- a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/s3_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/s3_canary.rs @@ -8,8 +8,8 @@ use crate::{mk_canary, CanaryEnv}; use anyhow::Context; use aws_config::SdkConfig; use aws_sdk_s3 as s3; -use aws_sdk_s3::presigning::config::PresigningConfig; -use s3::types::ByteStream; +use s3::presigning::PresigningConfig; +use s3::primitives::ByteStream; use std::time::Duration; use uuid::Uuid; diff --git a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/transcribe_canary.rs b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/transcribe_canary.rs similarity index 97% rename from tools/ci-cdk/canary-lambda/src/release_2023_01_26/transcribe_canary.rs rename to tools/ci-cdk/canary-lambda/src/release_2023_08_23/transcribe_canary.rs index 554f4c3ddf..8f6420fc1b 100644 --- a/tools/ci-cdk/canary-lambda/src/release_2023_01_26/transcribe_canary.rs +++ b/tools/ci-cdk/canary-lambda/src/release_2023_08_23/transcribe_canary.rs @@ -9,10 +9,10 @@ use async_stream::stream; use aws_config::SdkConfig; use aws_sdk_transcribestreaming as transcribe; use bytes::BufMut; -use transcribe::model::{ +use transcribe::primitives::Blob; +use transcribe::types::{ AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream, }; -use transcribe::types::Blob; const CHUNK_SIZE: usize = 8192; use crate::canary::CanaryEnv; diff --git a/tools/ci-cdk/canary-runner/src/build_bundle.rs b/tools/ci-cdk/canary-runner/src/build_bundle.rs index 464ee2e4ad..4ec7861460 100644 --- a/tools/ci-cdk/canary-runner/src/build_bundle.rs +++ b/tools/ci-cdk/canary-runner/src/build_bundle.rs @@ -63,9 +63,10 @@ const REQUIRED_SDK_CRATES: &[&str] = &[ "aws-sdk-transcribestreaming", ]; +// The elements in this `Vec` should be sorted in an ascending order by the release date. lazy_static! { static ref NOTABLE_SDK_RELEASE_TAGS: Vec = vec![ - ReleaseTag::from_str("release-2023-01-26").unwrap(), // last version before the crate reorg + ReleaseTag::from_str("release-2023-08-23").unwrap(), // last version before `Stream` trait removal ]; } @@ -112,38 +113,58 @@ enum CrateSource { }, } -fn enabled_features(crate_source: &CrateSource) -> Vec { - let mut enabled = Vec::new(); +fn enabled_feature(crate_source: &CrateSource) -> String { if let CrateSource::VersionsManifest { release_tag, .. } = crate_source { - // we want to select the newest module specified after this release + // we want to select the oldest module specified after this release for notable in NOTABLE_SDK_RELEASE_TAGS.iter() { tracing::debug!(release_tag = ?release_tag, notable = ?notable, "considering if release tag came before notable release"); if release_tag <= notable { tracing::debug!("selecting {} as chosen release", notable); - enabled.push(notable.as_str().into()); - break; + return notable.as_str().into(); } } } - if enabled.is_empty() { - enabled.push("latest".into()); - } - enabled + "latest".into() } fn generate_crate_manifest(crate_source: CrateSource) -> Result { let mut output = BASE_MANIFEST.to_string(); - for &sdk_crate in REQUIRED_SDK_CRATES { + write_dependencies(REQUIRED_SDK_CRATES, &mut output, &crate_source)?; + write!(output, "\n[features]\n").unwrap(); + writeln!(output, "latest = []").unwrap(); + for release_tag in NOTABLE_SDK_RELEASE_TAGS.iter() { + writeln!( + output, + "\"{release_tag}\" = []", + release_tag = release_tag.as_str() + ) + .unwrap(); + } + writeln!( + output, + "default = [\"{enabled}\"]", + enabled = enabled_feature(&crate_source) + ) + .unwrap(); + Ok(output) +} + +fn write_dependencies( + required_crates: &[&str], + output: &mut String, + crate_source: &CrateSource, +) -> Result<()> { + for &required_crate in required_crates { match &crate_source { CrateSource::Path(path) => { - let path_name = match sdk_crate.strip_prefix("aws-sdk-") { + let path_name = match required_crate.strip_prefix("aws-sdk-") { Some(path) => path, - None => sdk_crate, + None => required_crate, }; let crate_path = path.join(path_name); writeln!( output, - r#"{sdk_crate} = {{ path = "{path}" }}"#, + r#"{required_crate} = {{ path = "{path}" }}"#, path = crate_path.to_string_lossy() ) .unwrap() @@ -151,40 +172,20 @@ fn generate_crate_manifest(crate_source: CrateSource) -> Result { CrateSource::VersionsManifest { versions, release_tag, - } => match versions.crates.get(sdk_crate) { + } => match versions.crates.get(required_crate) { Some(version) => writeln!( output, - r#"{sdk_crate} = "{version}""#, + r#"{required_crate} = "{version}""#, version = version.version ) .unwrap(), None => { - bail!("Couldn't find `{sdk_crate}` in versions.toml for `{release_tag}`") + bail!("Couldn't find `{required_crate}` in versions.toml for `{release_tag}`") } }, } } - write!(output, "\n[features]\n").unwrap(); - writeln!(output, "latest = []").unwrap(); - for release_tag in NOTABLE_SDK_RELEASE_TAGS.iter() { - writeln!( - output, - "\"{release_tag}\" = []", - release_tag = release_tag.as_str() - ) - .unwrap(); - } - writeln!( - output, - "default = [{enabled}]", - enabled = enabled_features(&crate_source) - .into_iter() - .map(|f| format!("\"{f}\"")) - .collect::>() - .join(", ") - ) - .unwrap(); - Ok(output) + Ok(()) } fn sha1_file(path: &Path) -> Result { @@ -441,7 +442,7 @@ aws-sdk-transcribestreaming = { path = "some/sdk/path/transcribestreaming" } [features] latest = [] -"release-2023-01-26" = [] +"release-2023-08-23" = [] default = ["latest"] "#, generate_crate_manifest(CrateSource::Path("some/sdk/path".into())).expect("success") @@ -505,7 +506,7 @@ aws-sdk-transcribestreaming = "0.16.0" [features] latest = [] -"release-2023-01-26" = [] +"release-2023-08-23" = [] default = ["latest"] "#, generate_crate_manifest(CrateSource::VersionsManifest { @@ -523,7 +524,7 @@ default = ["latest"] .collect(), release: None, }, - release_tag: ReleaseTag::from_str("release-2023-05-26").unwrap(), + release_tag: ReleaseTag::from_str("release-2023-08-26").unwrap(), }) .expect("success") ); @@ -577,26 +578,25 @@ default = ["latest"] release: None, }; assert_eq!( - enabled_features(&CrateSource::VersionsManifest { + "latest".to_string(), + enabled_feature(&CrateSource::VersionsManifest { versions: versions.clone(), - release_tag: "release-2023-02-23".parse().unwrap(), + release_tag: "release-9999-12-31".parse().unwrap(), }), - vec!["latest".to_string()] ); - assert_eq!( - enabled_features(&CrateSource::VersionsManifest { + "release-2023-08-23".to_string(), + enabled_feature(&CrateSource::VersionsManifest { versions: versions.clone(), - release_tag: "release-2023-01-26".parse().unwrap(), + release_tag: "release-2023-08-23".parse().unwrap(), }), - vec!["release-2023-01-26".to_string()] ); assert_eq!( - enabled_features(&CrateSource::VersionsManifest { + "release-2023-08-23".to_string(), + enabled_feature(&CrateSource::VersionsManifest { versions, release_tag: "release-2023-01-13".parse().unwrap(), }), - vec!["release-2023-01-26".to_string()] ); } }