Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FnStream generator polling to prevent poll-after-ready #1903

Merged
merged 4 commits into from
Oct 26, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions rust-runtime/aws-smithy-async/src/future/fn_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pin_project! {
#[pin]
rx: rendezvous::Receiver<Item>,
#[pin]
generator: F,
generator: Option<F>,
}
}

Expand All @@ -58,7 +58,7 @@ impl<Item, F> FnStream<Item, F> {
let (tx, rx) = rendezvous::channel::<Item>();
Self {
rx,
generator: generator(tx),
generator: Some(generator(tx)),
}
}
}
Expand All @@ -74,7 +74,11 @@ where
match me.rx.poll_recv(cx) {
Poll::Ready(item) => Poll::Ready(item),
Poll::Pending => {
let _ = me.generator.poll(cx);
if let Some(generator) = me.generator.as_mut().as_pin_mut() {
if generator.poll(cx).is_ready() {
me.generator.set(None);
}
}
Poll::Pending
}
}
Expand Down Expand Up @@ -140,7 +144,10 @@ where
#[cfg(test)]
mod test {
use crate::future::fn_stream::{FnStream, TryFlatMap};
use futures_util::task::noop_waker_ref;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::Context;
use std::time::Duration;
use tokio_stream::StreamExt;

Expand All @@ -165,6 +172,30 @@ mod test {
assert_eq!(out, vec!["1", "2", "3"]);
}

// smithy-rs#1902: there was a bug where we could continue to poll the generator after it
// had returned Poll::Ready. This test case leaks the tx half so that the channel stays open
// but the send side generator completes. By calling `poll` multiple times on the resulting future,
// we can trigger the bug and validate the fix.
Comment on lines +175 to +178
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the explanatory comment!

#[tokio::test]
async fn fn_stream_doesnt_poll_after_done() {
let mut stream = FnStream::new(|tx| {
Box::pin(async move {
assert!(tx.send("blah").await.is_ok());
Box::leak(Box::new(tx));
})
});
assert_eq!(stream.next().await, Some("blah"));
let mut fut = Box::pin(stream.next());
assert!(fut
.as_mut()
.poll(&mut Context::from_waker(noop_waker_ref()))
.is_pending());
assert!(fut
.as_mut()
.poll(&mut Context::from_waker(noop_waker_ref()))
.is_pending());
}

/// Tests that the generator will not advance until demand exists
#[tokio::test]
async fn waits_for_reader() {
Expand Down