From 7397e91f4545b3a29a7fdcb521f18fc04657ff7d Mon Sep 17 00:00:00 2001 From: Mikhail Zabaluev Date: Wed, 9 Dec 2020 01:10:58 +0200 Subject: [PATCH] Add the SinkExt::feed combinator (#2155) Like send, except no flushing is done at the end. This allows sequentially feeding the sink with items in async code without intermittent flushing. Reuse code of Feed internally in the Send future. --- futures-util/src/sink/feed.rs | 43 +++++++++++++++++++++++++++++++++++ futures-util/src/sink/mod.rs | 20 ++++++++++++++-- futures-util/src/sink/send.rs | 21 +++++++---------- futures/src/lib.rs | 2 +- 4 files changed, 70 insertions(+), 16 deletions(-) create mode 100644 futures-util/src/sink/feed.rs diff --git a/futures-util/src/sink/feed.rs b/futures-util/src/sink/feed.rs new file mode 100644 index 0000000000..6701f7a1b4 --- /dev/null +++ b/futures-util/src/sink/feed.rs @@ -0,0 +1,43 @@ +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::ready; +use futures_core::task::{Context, Poll}; +use futures_sink::Sink; + +/// Future for the [`feed`](super::SinkExt::feed) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Feed<'a, Si: ?Sized, Item> { + sink: &'a mut Si, + item: Option, +} + +// Pinning is never projected to children +impl Unpin for Feed<'_, Si, Item> {} + +impl<'a, Si: Sink + Unpin + ?Sized, Item> Feed<'a, Si, Item> { + pub(super) fn new(sink: &'a mut Si, item: Item) -> Self { + Feed { sink, item: Some(item) } + } + + pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> { + Pin::new(self.sink) + } + + pub(super) fn is_item_pending(&self) -> bool { + self.item.is_some() + } +} + +impl + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> { + type Output = Result<(), Si::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + let mut sink = Pin::new(&mut this.sink); + ready!(sink.as_mut().poll_ready(cx))?; + let item = this.item.take().expect("polled Feed after completion"); + sink.as_mut().start_send(item)?; + Poll::Ready(Ok(())) + } +} diff --git a/futures-util/src/sink/mod.rs b/futures-util/src/sink/mod.rs index 7fe5b055ed..742d35da07 100644 --- a/futures-util/src/sink/mod.rs +++ b/futures-util/src/sink/mod.rs @@ -26,6 +26,9 @@ pub use self::drain::{drain, Drain}; mod fanout; pub use self::fanout::Fanout; +mod feed; +pub use self::feed::Feed; + mod flush; pub use self::flush::Flush; @@ -212,8 +215,8 @@ pub trait SinkExt: Sink { /// into the sink, including flushing. /// /// Note that, **because of the flushing requirement, it is usually better - /// to batch together items to send via `send_all`, rather than flushing - /// between each item.** + /// to batch together items to send via `feed` or `send_all`, + /// rather than flushing between each item.** fn send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin, @@ -221,6 +224,19 @@ pub trait SinkExt: Sink { Send::new(self, item) } + /// A future that completes after the given item has been received + /// by the sink. + /// + /// Unlike `send`, the returned future does not flush the sink. + /// It is the caller's responsibility to ensure all pending items + /// are processed, which can be done via `flush` or `close`. + fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> + where + Self: Unpin, + { + Feed::new(self, item) + } + /// A future that completes after the given stream has been fully processed /// into the sink, including flushing. /// diff --git a/futures-util/src/sink/send.rs b/futures-util/src/sink/send.rs index a655b67ecc..6d21f33fe4 100644 --- a/futures-util/src/sink/send.rs +++ b/futures-util/src/sink/send.rs @@ -1,3 +1,4 @@ +use super::Feed; use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; @@ -8,8 +9,7 @@ use futures_sink::Sink; #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Send<'a, Si: ?Sized, Item> { - sink: &'a mut Si, - item: Option, + feed: Feed<'a, Si, Item>, } // Pinning is never projected to children @@ -17,7 +17,7 @@ impl Unpin for Send<'_, Si, Item> {} impl<'a, Si: Sink + Unpin + ?Sized, Item> Send<'a, Si, Item> { pub(super) fn new(sink: &'a mut Si, item: Item) -> Self { - Self { sink, item: Some(item) } + Self { feed: Feed::new(sink, item) } } } @@ -26,20 +26,15 @@ impl + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; - if let Some(item) = this.item.take() { - let mut sink = Pin::new(&mut this.sink); - match sink.as_mut().poll_ready(cx)? { - Poll::Ready(()) => sink.as_mut().start_send(item)?, - Poll::Pending => { - this.item = Some(item); - return Poll::Pending; - } - } + + if this.feed.is_item_pending() { + ready!(Pin::new(&mut this.feed).poll(cx))?; + debug_assert!(!this.feed.is_item_pending()); } // we're done sending the item, but want to block on flushing the // sink - ready!(Pin::new(&mut this.sink).poll_flush(cx))?; + ready!(this.feed.sink_pin_mut().poll_flush(cx))?; Poll::Ready(Ok(())) } diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 953a4fc696..ba46c3b89f 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -380,7 +380,7 @@ pub mod sink { pub use futures_sink::Sink; pub use futures_util::sink::{ - drain, unfold, Close, Drain, Fanout, Flush, Send, SendAll, SinkErrInto, SinkExt, + drain, unfold, Close, Drain, Fanout, Feed, Flush, Send, SendAll, SinkErrInto, SinkExt, SinkMapErr, Unfold, With, WithFlatMap, };