Skip to content

Commit

Permalink
Stream implementation (wrapper) for PaginationStream (#3299)
Browse files Browse the repository at this point in the history
## Motivation and Context
awslabs/aws-sdk-rust#995

## Description
I tried to implement futures::Stream for a wrapper struct around
`PaginationStream`. I am unsure if I did it in the best way. After
fighting with the borrow checker for a while I decided to try
`Arc<Mutex<_>>` - is this the way to go or does there exist a better
way?
Even then, does the code look correct? I used it in my project and my
integration tests do pass but I am not 100% that these tests will catch
any error in paginated ListObjectsV2.

I would appreciate any feedback so far.

## Testing
In progress while waiting for feedback on code

## Checklist
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates


_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: Russell Cohen <[email protected]>
Co-authored-by: Russell Cohen <[email protected]>
  • Loading branch information
3 people authored Jan 2, 2024
1 parent 3ea5992 commit 7541fe7
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 0 deletions.
28 changes: 28 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,34 @@ references = ["smithy-rs#3300", "aws-sdk-rust#977"]
meta = { "breaking" = false, "tada" = true, "bug" = false }
author = "rcoh"

[[smithy-rs]]
message = """ Add `PaginationStreamExt` extension trait to `aws-smithy-types-convert` behind the `convert-streams` feature. This makes it possible to treat a paginator as a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html), allowing customers to use stream combinators like [`map`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.map) and [`filter`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.filter).
Example:
```rust
use aws_smithy_types_convert::stream::PaginationStreamExt
let stream = s3_client.list_objects_v2().bucket("...").into_paginator().send().into_stream_03x();
```
"""
references = ["smithy-rs#3299"]
meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client"}
author = "Ploppz"

[[aws-sdk-rust]]
message = """ Add `PaginationStreamExt` extension trait to `aws-smithy-types-convert` behind the `convert-streams` feature. This makes it possible to treat a paginator as a [`futures_core::Stream`](https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html), allowing customers to use stream combinators like [`map`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.map) and [`filter`](https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.filter).
Example:
```rust
use aws_smithy_types_convert::stream::PaginationStreamExt
let stream = s3_client.list_objects_v2().bucket("...").into_paginator().send().into_stream_03x();
```
"""
references = ["smithy-rs#3299"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
author = "Ploppz"

[[smithy-rs]]
message = "Serialize 0/false in query parameters, and ignore actual default value during serialization instead of just 0/false. See [changelog discussion](https://github.com/smithy-lang/smithy-rs/discussions/3312) for details."
references = ["smithy-rs#3252", "smithy-rs#3312"]
Expand Down
7 changes: 7 additions & 0 deletions rust-runtime/aws-smithy-async/src/future/pagination_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use crate::future::pagination_stream::collect::sealed::Collectable;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub mod collect;
pub mod fn_stream;
use fn_stream::FnStream;
Expand Down Expand Up @@ -60,6 +62,11 @@ impl<Item> PaginationStream<Item> {
self.0.next().await
}

/// Poll an item from the stream
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Item>> {
Pin::new(&mut self.0).poll_next(cx)
}

/// Consumes this stream and gathers elements into a collection.
pub async fn collect<T: Collectable<Item>>(self) -> T {
self.0.collect().await
Expand Down
3 changes: 3 additions & 0 deletions rust-runtime/aws-smithy-types-convert/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ repository = "https://github.com/smithy-lang/smithy-rs"
[features]
convert-chrono = ["aws-smithy-types", "chrono"]
convert-time = ["aws-smithy-types", "time"]
convert-streams = ["aws-smithy-async", "futures-core"]

[dependencies]
aws-smithy-types = { path = "../aws-smithy-types", optional = true }
aws-smithy-async = {path = "../aws-smithy-async", optional = true}
chrono = { version = "0.4.26", optional = true, default-features = false, features = ["std"] }
time = { version = "0.3.4", optional = true }
futures-core = { version = "0.3.0", optional = true }

[package.metadata.docs.rs]
all-features = true
Expand Down
2 changes: 2 additions & 0 deletions rust-runtime/aws-smithy-types-convert/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ allowed_external_types = [
"chrono::offset::fixed::FixedOffset",
"chrono::offset::utc::Utc",
"time::offset_date_time::OffsetDateTime",
"aws_smithy_async::future::pagination_stream::PaginationStream",
"futures_core::stream::Stream",
]
3 changes: 3 additions & 0 deletions rust-runtime/aws-smithy-types-convert/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@

#[cfg(any(feature = "convert-time", feature = "convert-chrono"))]
pub mod date_time;

#[cfg(feature = "convert-streams")]
pub mod stream;
62 changes: 62 additions & 0 deletions rust-runtime/aws-smithy-types-convert/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Conversions from Stream-like structs to implementors of `futures::Stream`

use futures_core::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

use aws_smithy_async::future::pagination_stream::PaginationStream;

/// Stream implementor wrapping `PaginationStream`
pub struct PaginationStreamImplStream<Item> {
pagination_stream: PaginationStream<Item>,
}

impl<Item> PaginationStreamImplStream<Item> {
/// Create a new Stream object wrapping a `PaginationStream`
pub fn new(pagination_stream: PaginationStream<Item>) -> Self {
PaginationStreamImplStream { pagination_stream }
}
}

impl<Item> Stream for PaginationStreamImplStream<Item> {
type Item = Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.pagination_stream.poll_next(cx)
}
}

/// Trait to convert PaginationStream into implementor of `Stream`
pub trait PaginationStreamExt<Item> {
/// Convert PaginationStream into implementor of `Stream`
///
/// # Example
/// ```no_run
/// # use aws_smithy_async::future::pagination_stream::PaginationStream;
/// use aws_smithy_types_convert::stream::PaginationStreamExt;
/// // Assuming you have obtained a pagination stream, by something like:
/// // ```
/// // let pagination_stream = s3_client
/// // .list_objects_v2()
/// // .bucket(bucket)
/// // .into_paginator()
/// // .send();
/// // ```
/// # let pagination_stream: PaginationStream<i32> = unimplemented!();
/// let futures_stream = pagination_stream.into_stream_03x();
/// ```
fn into_stream_03x(self) -> PaginationStreamImplStream<Item>;
}

impl<Item> PaginationStreamExt<Item> for PaginationStream<Item> {
fn into_stream_03x(self) -> PaginationStreamImplStream<Item> {
PaginationStreamImplStream {
pagination_stream: self,
}
}
}

0 comments on commit 7541fe7

Please sign in to comment.