Skip to content

Commit

Permalink
Add a Stream::try_stream() method (#70)
Browse files Browse the repository at this point in the history
* Add a Stream::try_stream() method
* Mention non-fuse status in docs.
* Add accessor methods for inner stream.

Signed-off-by: John Nunley <[email protected]>
Signed-off-by: Alain Zscheile <[email protected]>
  • Loading branch information
notgull authored Feb 18, 2024
1 parent 2e55568 commit d4230bd
Showing 1 changed file with 135 additions and 0 deletions.
135 changes: 135 additions & 0 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1763,6 +1763,57 @@ pub trait StreamExt: Stream {
}
}

/// Yields all immediately available values from a stream.
///
/// This is intended to be used as a way of polling a stream without waiting, similar to the
/// [`try_iter`] function on [`std::sync::mpsc::Receiver`]. For instance, running this stream
/// on an [`async_channel::Receiver`] will return all messages that are currently in the
/// channel, but will not wait for new messages.
///
/// This returns a [`Stream`] instead of an [`Iterator`] because it still needs access to the
/// polling context in order to poll the underlying stream. Since this stream will never return
/// `Poll::Pending`, wrapping it in [`block_on`] will allow it to be effectively used as an
/// [`Iterator`].
///
/// This stream is not necessarily fused. After it returns `None`, it can return `Some(x)` in
/// the future if it is polled again.
///
/// [`try_iter`]: std::sync::mpsc::Receiver::try_iter
/// [`async_channel::Receiver`]: https://docs.rs/async-channel/latest/async_channel/struct.Receiver.html
/// [`Stream`]: crate::stream::Stream
/// [`Iterator`]: std::iter::Iterator
///
/// # Examples
///
/// ```
/// use futures_lite::{future, pin};
/// use futures_lite::stream::{self, StreamExt};
///
/// # #[cfg(feature = "std")] {
/// // A stream that yields two values, returns `Pending`, and then yields one more value.
/// let pend_once = stream::once_future(async {
/// future::yield_now().await;
/// 3
/// });
/// let s = stream::iter(vec![1, 2]).chain(pend_once);
/// pin!(s);
///
/// // This will return the first two values, and then `None` because the stream returns
/// // `Pending` after that.
/// let mut iter = stream::block_on(s.try_stream());
/// assert_eq!(iter.next(), Some(1));
/// assert_eq!(iter.next(), Some(2));
/// assert_eq!(iter.next(), None);
///
/// // This will return the last value, because the stream returns `Ready` when polled.
/// assert_eq!(iter.next(), Some(3));
/// assert_eq!(iter.next(), None);
/// # }
/// ```
fn try_stream(&mut self) -> TryStream<'_, Self> {
TryStream { stream: self }
}

/// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
///
/// # Examples
Expand Down Expand Up @@ -3175,3 +3226,87 @@ where
}
}
}

/// Stream for the [`StreamExt::try_stream()`] method.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryStream<'a, S: ?Sized> {
stream: &'a mut S,
}

impl<'a, S: Unpin + ?Sized> Unpin for TryStream<'a, S> {}

impl<'a, S: Unpin + ?Sized> TryStream<'a, S> {
/// Get a reference to the underlying stream.
///
/// ## Examples
///
/// ```
/// use futures_lite::{prelude::*, stream};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 2, 3]);
/// let s2 = s.try_stream();
///
/// let inner = s2.get_ref();
/// // s and inner are the same.
/// # });
/// ```
pub fn get_ref(&self) -> &S {
&self.stream
}

/// Get a mutable reference to the underlying stream.
///
/// ## Examples
///
/// ```
/// use futures_lite::{prelude::*, stream};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 2, 3]);
/// let mut s2 = s.try_stream();
///
/// let inner = s2.get_mut();
/// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
/// # });
/// ```
pub fn get_mut(&mut self) -> &mut S {
&mut self.stream
}

/// Consume this stream and get the underlying stream.
///
/// ## Examples
///
/// ```
/// use futures_lite::{prelude::*, stream};
///
/// # spin_on::spin_on(async {
/// let mut s = stream::iter(vec![1, 2, 3]);
/// let mut s2 = s.try_stream();
///
/// let inner = s2.into_inner();
/// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
/// # });
/// ```
pub fn into_inner(self) -> &'a mut S {
self.stream
}
}

impl<'a, S: Stream + Unpin + ?Sized> Stream for TryStream<'a, S> {
type Item = S::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.stream.poll_next(cx) {
Poll::Ready(x) => Poll::Ready(x),
Poll::Pending => Poll::Ready(None),
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let (_, hi) = self.stream.size_hint();
(0, hi)
}
}

0 comments on commit d4230bd

Please sign in to comment.