Skip to content

Commit

Permalink
Add the SinkExt::feed combinator (rust-lang#2155)
Browse files Browse the repository at this point in the history
Like send, except no flushing is done at the end.
This allows sequentially feeding the sink with items in async code
without intermittent flushing.

Reuse code of Feed internally in the Send future.
  • Loading branch information
mzabaluev authored Dec 8, 2020
1 parent 3c5f6ca commit 7397e91
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 16 deletions.
43 changes: 43 additions & 0 deletions futures-util/src/sink/feed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_sink::Sink;

/// Future for the [`feed`](super::SinkExt::feed) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Feed<'a, Si: ?Sized, Item> {
sink: &'a mut Si,
item: Option<Item>,
}

// Pinning is never projected to children
impl<Si: Unpin + ?Sized, Item> Unpin for Feed<'_, Si, Item> {}

impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Feed<'a, Si, Item> {
pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
Feed { sink, item: Some(item) }
}

pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> {
Pin::new(self.sink)
}

pub(super) fn is_item_pending(&self) -> bool {
self.item.is_some()
}
}

impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> {
type Output = Result<(), Si::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut sink = Pin::new(&mut this.sink);
ready!(sink.as_mut().poll_ready(cx))?;
let item = this.item.take().expect("polled Feed after completion");
sink.as_mut().start_send(item)?;
Poll::Ready(Ok(()))
}
}
20 changes: 18 additions & 2 deletions futures-util/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub use self::drain::{drain, Drain};
mod fanout;
pub use self::fanout::Fanout;

mod feed;
pub use self::feed::Feed;

mod flush;
pub use self::flush::Flush;

Expand Down Expand Up @@ -212,15 +215,28 @@ pub trait SinkExt<Item>: Sink<Item> {
/// into the sink, including flushing.
///
/// Note that, **because of the flushing requirement, it is usually better
/// to batch together items to send via `send_all`, rather than flushing
/// between each item.**
/// to batch together items to send via `feed` or `send_all`,
/// rather than flushing between each item.**
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
where
Self: Unpin,
{
Send::new(self, item)
}

/// A future that completes after the given item has been received
/// by the sink.
///
/// Unlike `send`, the returned future does not flush the sink.
/// It is the caller's responsibility to ensure all pending items
/// are processed, which can be done via `flush` or `close`.
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
where
Self: Unpin,
{
Feed::new(self, item)
}

/// A future that completes after the given stream has been fully processed
/// into the sink, including flushing.
///
Expand Down
21 changes: 8 additions & 13 deletions futures-util/src/sink/send.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Feed;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
Expand All @@ -8,16 +9,15 @@ use futures_sink::Sink;
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, Si: ?Sized, Item> {
sink: &'a mut Si,
item: Option<Item>,
feed: Feed<'a, Si, Item>,
}

// Pinning is never projected to children
impl<Si: Unpin + ?Sized, Item> Unpin for Send<'_, Si, Item> {}

impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Send<'a, Si, Item> {
pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
Self { sink, item: Some(item) }
Self { feed: Feed::new(sink, item) }
}
}

Expand All @@ -26,20 +26,15 @@ impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
if let Some(item) = this.item.take() {
let mut sink = Pin::new(&mut this.sink);
match sink.as_mut().poll_ready(cx)? {
Poll::Ready(()) => sink.as_mut().start_send(item)?,
Poll::Pending => {
this.item = Some(item);
return Poll::Pending;
}
}

if this.feed.is_item_pending() {
ready!(Pin::new(&mut this.feed).poll(cx))?;
debug_assert!(!this.feed.is_item_pending());
}

// we're done sending the item, but want to block on flushing the
// sink
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
ready!(this.feed.sink_pin_mut().poll_flush(cx))?;

Poll::Ready(Ok(()))
}
Expand Down
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ pub mod sink {
pub use futures_sink::Sink;

pub use futures_util::sink::{
drain, unfold, Close, Drain, Fanout, Flush, Send, SendAll, SinkErrInto, SinkExt,
drain, unfold, Close, Drain, Fanout, Feed, Flush, Send, SendAll, SinkErrInto, SinkExt,
SinkMapErr, Unfold, With, WithFlatMap,
};

Expand Down

0 comments on commit 7397e91

Please sign in to comment.