Skip to content

Commit

Permalink
Reuse code of FeedAll in SendAll
Browse files Browse the repository at this point in the history
  • Loading branch information
mzabaluev committed May 26, 2020
1 parent 48cb859 commit 70b375a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 46 deletions.
4 changes: 4 additions & 0 deletions futures-util/src/sink/feed_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ where
}
}

pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> {
Pin::new(self.sink)
}

fn try_start_send(
&mut self,
cx: &mut Context<'_>,
Expand Down
59 changes: 13 additions & 46 deletions futures-util/src/sink/send_all.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::stream::{StreamExt, TryStreamExt, Fuse};
use super::FeedAll;
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
Expand All @@ -14,9 +14,8 @@ where
Si: ?Sized,
St: ?Sized + TryStream,
{
sink: &'a mut Si,
stream: Fuse<&'a mut St>,
buffered: Option<St::Ok>,
feed_all: FeedAll<'a, Si, St>,
is_flushing: bool,
}

impl<Si, St> fmt::Debug for SendAll<'_, Si, St>
Expand All @@ -27,9 +26,8 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendAll")
.field("sink", &self.sink)
.field("stream", &self.stream)
.field("buffered", &self.buffered)
.field("feed_all", &self.feed_all)
.field("is_flushing", &self.is_flushing)
.finish()
}
}
Expand All @@ -51,26 +49,8 @@ where
stream: &'a mut St,
) -> SendAll<'a, Si, St> {
SendAll {
sink,
stream: stream.fuse(),
buffered: None,
}
}

fn try_start_send(
&mut self,
cx: &mut Context<'_>,
item: St::Ok,
) -> Poll<Result<(), Si::Error>> {
debug_assert!(self.buffered.is_none());
match Pin::new(&mut self.sink).poll_ready(cx)? {
Poll::Ready(()) => {
Poll::Ready(Pin::new(&mut self.sink).start_send(item))
}
Poll::Pending => {
self.buffered = Some(item);
Poll::Pending
}
feed_all: FeedAll::new(sink, stream),
is_flushing: false,
}
}
}
Expand All @@ -87,26 +67,13 @@ where
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let this = &mut *self;
// If we've got an item buffered already, we need to write it to the
// sink before we can do anything else
if let Some(item) = this.buffered.take() {
ready!(this.try_start_send(cx, item))?
}

loop {
match this.stream.try_poll_next_unpin(cx)? {
Poll::Ready(Some(item)) => {
ready!(this.try_start_send(cx, item))?
}
Poll::Ready(None) => {
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
return Poll::Ready(Ok(()))
}
Poll::Pending => {
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
return Poll::Pending
}
}
if !this.is_flushing {
ready!(Pin::new(&mut this.feed_all).poll(cx))?;
this.is_flushing = true;
}

ready!(this.feed_all.sink_pin_mut().poll_flush(cx))?;
Poll::Ready(Ok(()))
}
}

0 comments on commit 70b375a

Please sign in to comment.