Skip to content

Commit

Permalink
Reuse code of Feed in Send
Browse files Browse the repository at this point in the history
  • Loading branch information
mzabaluev committed May 26, 2020
1 parent bed010b commit 48cb859
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 15 deletions.
4 changes: 4 additions & 0 deletions futures-util/src/sink/feed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Feed<'a, Si, Item> {
item: Some(item),
}
}

pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> {
Pin::new(self.sink)
}
}

impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> {
Expand Down
22 changes: 7 additions & 15 deletions futures-util/src/sink/send.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use super::Feed;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::future::{Future, FusedFuture};
use futures_core::task::{Context, Poll};
use futures_sink::Sink;

/// Future for the [`send`](super::SinkExt::send) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, Si: ?Sized, Item> {
sink: &'a mut Si,
item: Option<Item>,
feed: Feed<'a, Si, Item>,
}

// Pinning is never projected to children
Expand All @@ -17,8 +17,7 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Send<'_, Si, Item> {}
impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Send<'a, Si, Item> {
pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
Send {
sink,
item: Some(item),
feed: Feed::new(sink, item),
}
}
}
Expand All @@ -31,20 +30,13 @@ impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> {
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let this = &mut *self;
if let Some(item) = this.item.take() {
let mut sink = Pin::new(&mut this.sink);
match sink.as_mut().poll_ready(cx)? {
Poll::Ready(()) => sink.as_mut().start_send(item)?,
Poll::Pending => {
this.item = Some(item);
return Poll::Pending;
}
}
if !this.feed.is_terminated() {
ready!(Pin::new(&mut this.feed).poll(cx))?;
}

// we're done sending the item, but want to block on flushing the
// sink
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
ready!(this.feed.sink_pin_mut().poll_flush(cx))?;

Poll::Ready(Ok(()))
}
Expand Down

0 comments on commit 48cb859

Please sign in to comment.