diff --git a/futures-util/src/sink/feed.rs b/futures-util/src/sink/feed.rs index 3c1f7de2df..db660b89da 100644 --- a/futures-util/src/sink/feed.rs +++ b/futures-util/src/sink/feed.rs @@ -21,6 +21,10 @@ impl<'a, Si: Sink + Unpin + ?Sized, Item> Feed<'a, Si, Item> { item: Some(item), } } + + pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> { + Pin::new(self.sink) + } } impl + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> { diff --git a/futures-util/src/sink/send.rs b/futures-util/src/sink/send.rs index dc7f0be254..860f24d8f2 100644 --- a/futures-util/src/sink/send.rs +++ b/futures-util/src/sink/send.rs @@ -1,5 +1,6 @@ +use super::Feed; use core::pin::Pin; -use futures_core::future::Future; +use futures_core::future::{Future, FusedFuture}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; @@ -7,8 +8,7 @@ use futures_sink::Sink; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Send<'a, Si: ?Sized, Item> { - sink: &'a mut Si, - item: Option, + feed: Feed<'a, Si, Item>, } // Pinning is never projected to children @@ -17,8 +17,7 @@ impl Unpin for Send<'_, Si, Item> {} impl<'a, Si: Sink + Unpin + ?Sized, Item> Send<'a, Si, Item> { pub(super) fn new(sink: &'a mut Si, item: Item) -> Self { Send { - sink, - item: Some(item), + feed: Feed::new(sink, item), } } } @@ -31,20 +30,13 @@ impl + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> { cx: &mut Context<'_>, ) -> Poll { let this = &mut *self; - if let Some(item) = this.item.take() { - let mut sink = Pin::new(&mut this.sink); - match sink.as_mut().poll_ready(cx)? { - Poll::Ready(()) => sink.as_mut().start_send(item)?, - Poll::Pending => { - this.item = Some(item); - return Poll::Pending; - } - } + if !this.feed.is_terminated() { + ready!(Pin::new(&mut this.feed).poll(cx))?; } // we're done sending the item, but want to block on flushing the // sink - ready!(Pin::new(&mut this.sink).poll_flush(cx))?; + ready!(this.feed.sink_pin_mut().poll_flush(cx))?; Poll::Ready(Ok(())) }