Skip to content

Commit

Permalink
Revert FusedFuture implementations for Feed and FeedAll
Browse files Browse the repository at this point in the history
The provided implementations did not account for the underlying
stream or sink returning an Err. Accounting for the error cases
requires extra logic or flags, which may not be worth it.
  • Loading branch information
mzabaluev committed May 26, 2020
1 parent 70b375a commit d82d0e8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 21 deletions.
12 changes: 5 additions & 7 deletions futures-util/src/sink/feed.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;

Expand All @@ -25,6 +25,10 @@ impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Feed<'a, Si, Item> {
pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> {
Pin::new(self.sink)
}

pub(super) fn is_item_pending(&self) -> bool {
self.item.is_some()
}
}

impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> {
Expand All @@ -42,9 +46,3 @@ impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> {
Poll::Ready(Ok(()))
}
}

impl<Si: Sink<Item> + Unpin + ?Sized, Item> FusedFuture for Feed<'_, Si, Item> {
fn is_terminated(&self) -> bool {
self.item.is_none()
}
}
14 changes: 2 additions & 12 deletions futures-util/src/sink/feed_all.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::stream::TryStreamExt;
use core::fmt;
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::stream::{TryStream, Stream, FusedStream};
use futures_core::future::Future;
use futures_core::stream::{TryStream, Stream};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;

Expand Down Expand Up @@ -113,13 +113,3 @@ where
}
}
}

impl<Si, St, Ok, Error> FusedFuture for FeedAll<'_, Si, St>
where
Si: Sink<Ok, Error = Error> + Unpin + ?Sized,
St: Stream<Item = Result<Ok, Error>> + FusedStream + Unpin + ?Sized,
{
fn is_terminated(&self) -> bool {
self.buffered.is_none() && self.stream.is_terminated()
}
}
6 changes: 4 additions & 2 deletions futures-util/src/sink/send.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Feed;
use core::pin::Pin;
use futures_core::future::{Future, FusedFuture};
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;

Expand Down Expand Up @@ -30,8 +30,10 @@ impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> {
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let this = &mut *self;
if !this.feed.is_terminated() {

if this.feed.is_item_pending() {
ready!(Pin::new(&mut this.feed).poll(cx))?;
debug_assert!(!this.feed.is_item_pending());
}

// we're done sending the item, but want to block on flushing the
Expand Down

0 comments on commit d82d0e8

Please sign in to comment.