From 0fc359041bca31a457b030f974d1eb0469f120a4 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 1 Nov 2020 04:20:07 +0900 Subject: [PATCH] Fix panic of some TryStreamExt combinators --- .../src/stream/try_stream/try_filter_map.rs | 3 +- .../src/stream/try_stream/try_skip_while.rs | 5 +-- .../src/stream/try_stream/try_take_while.rs | 5 +-- futures/tests/try_stream.rs | 35 +++++++++++++++++++ 4 files changed, 43 insertions(+), 5 deletions(-) create mode 100644 futures/tests/try_stream.rs diff --git a/futures-util/src/stream/try_stream/try_filter_map.rs b/futures-util/src/stream/try_stream/try_filter_map.rs index 6c1e48fdd4..b02e9ee290 100644 --- a/futures-util/src/stream/try_stream/try_filter_map.rs +++ b/futures-util/src/stream/try_stream/try_filter_map.rs @@ -66,8 +66,9 @@ impl Stream for TryFilterMap Poll::Ready(loop { if let Some(p) = this.pending.as_mut().as_pin_mut() { // We have an item in progress, poll that until it's done - let item = ready!(p.try_poll(cx)?); + let res = ready!(p.try_poll(cx)); this.pending.set(None); + let item = res?; if item.is_some() { break item.map(Ok); } diff --git a/futures-util/src/stream/try_stream/try_skip_while.rs b/futures-util/src/stream/try_stream/try_skip_while.rs index 35759d0477..b359ae7948 100644 --- a/futures-util/src/stream/try_stream/try_skip_while.rs +++ b/futures-util/src/stream/try_stream/try_skip_while.rs @@ -74,9 +74,10 @@ impl Stream for TrySkipWhile Poll::Ready(loop { if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { - let skipped = ready!(fut.try_poll(cx)?); - let item = this.pending_item.take(); + let res = ready!(fut.try_poll(cx)); this.pending_fut.set(None); + let skipped = res?; + let item = this.pending_item.take(); if !skipped { *this.done_skipping = true; break item.map(Ok); diff --git a/futures-util/src/stream/try_stream/try_take_while.rs b/futures-util/src/stream/try_stream/try_take_while.rs index 16bfb2047e..e1ffffd48d 100644 --- a/futures-util/src/stream/try_stream/try_take_while.rs +++ b/futures-util/src/stream/try_stream/try_take_while.rs @@ -76,9 +76,10 @@ where Poll::Ready(loop { if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() { - let take = ready!(fut.try_poll(cx)?); - let item = this.pending_item.take(); + let res = ready!(fut.try_poll(cx)); this.pending_fut.set(None); + let take = res?; + let item = this.pending_item.take(); if take { break item.map(Ok); } else { diff --git a/futures/tests/try_stream.rs b/futures/tests/try_stream.rs new file mode 100644 index 0000000000..080eb3ccc3 --- /dev/null +++ b/futures/tests/try_stream.rs @@ -0,0 +1,35 @@ +use futures::{stream::{self, StreamExt, TryStreamExt},task::Poll}; +use futures_test::task::noop_context; + +#[test] +fn try_filter_map_after_err() { + let cx = &mut noop_context(); + let mut s = stream::iter(1..=3) + .map(Ok) + .try_filter_map(|v| async move { Err::, _>(v) }) + .filter_map(|r| async move { r.ok() }) + .boxed(); + assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); +} + +#[test] +fn try_skip_while_after_err() { + let cx = &mut noop_context(); + let mut s = stream::iter(1..=3) + .map(Ok) + .try_skip_while(|_| async move { Err::<_, ()>(()) }) + .filter_map(|r| async move { r.ok() }) + .boxed(); + assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); +} + +#[test] +fn try_take_while_after_err() { + let cx = &mut noop_context(); + let mut s = stream::iter(1..=3) + .map(Ok) + .try_take_while(|_| async move { Err::<_, ()>(()) }) + .filter_map(|r| async move { r.ok() }) + .boxed(); + assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); +}