Skip to content

Commit

Permalink
Fix panic of some TryStreamExt combinators
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Oct 31, 2020
1 parent e363f18 commit 0fc3590
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 5 deletions.
3 changes: 2 additions & 1 deletion futures-util/src/stream/try_stream/try_filter_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ impl<St, Fut, F, T> Stream for TryFilterMap<St, Fut, F>
Poll::Ready(loop {
if let Some(p) = this.pending.as_mut().as_pin_mut() {
// We have an item in progress, poll that until it's done
let item = ready!(p.try_poll(cx)?);
let res = ready!(p.try_poll(cx));
this.pending.set(None);
let item = res?;
if item.is_some() {
break item.map(Ok);
}
Expand Down
5 changes: 3 additions & 2 deletions futures-util/src/stream/try_stream/try_skip_while.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ impl<St, Fut, F> Stream for TrySkipWhile<St, Fut, F>

Poll::Ready(loop {
if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
let skipped = ready!(fut.try_poll(cx)?);
let item = this.pending_item.take();
let res = ready!(fut.try_poll(cx));
this.pending_fut.set(None);
let skipped = res?;
let item = this.pending_item.take();
if !skipped {
*this.done_skipping = true;
break item.map(Ok);
Expand Down
5 changes: 3 additions & 2 deletions futures-util/src/stream/try_stream/try_take_while.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ where

Poll::Ready(loop {
if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
let take = ready!(fut.try_poll(cx)?);
let item = this.pending_item.take();
let res = ready!(fut.try_poll(cx));
this.pending_fut.set(None);
let take = res?;
let item = this.pending_item.take();
if take {
break item.map(Ok);
} else {
Expand Down
35 changes: 35 additions & 0 deletions futures/tests/try_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use futures::{stream::{self, StreamExt, TryStreamExt},task::Poll};
use futures_test::task::noop_context;

#[test]
fn try_filter_map_after_err() {
let cx = &mut noop_context();
let mut s = stream::iter(1..=3)
.map(Ok)
.try_filter_map(|v| async move { Err::<Option<()>, _>(v) })
.filter_map(|r| async move { r.ok() })
.boxed();
assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
}

#[test]
fn try_skip_while_after_err() {
let cx = &mut noop_context();
let mut s = stream::iter(1..=3)
.map(Ok)
.try_skip_while(|_| async move { Err::<_, ()>(()) })
.filter_map(|r| async move { r.ok() })
.boxed();
assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
}

#[test]
fn try_take_while_after_err() {
let cx = &mut noop_context();
let mut s = stream::iter(1..=3)
.map(Ok)
.try_take_while(|_| async move { Err::<_, ()>(()) })
.filter_map(|r| async move { r.ok() })
.boxed();
assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
}

0 comments on commit 0fc3590

Please sign in to comment.