From 552fa7e5d1ca25517792a2535888de36fcb08433 Mon Sep 17 00:00:00 2001 From: Chris Brody Date: Sun, 19 Feb 2023 10:16:12 -0500 Subject: [PATCH] sync: add `WatchStream::from_changes` (#5432) --- tokio-stream/src/wrappers/watch.rs | 34 ++++++++++++++++++++++++++++-- tokio-stream/tests/watch.rs | 30 +++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index c682c9c271d..ec8ead06da0 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -10,8 +10,9 @@ use tokio::sync::watch::error::RecvError; /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`]. /// -/// This stream will always start by yielding the current value when the WatchStream is polled, -/// regardless of whether it was the initial value or sent afterwards. +/// This stream will start by yielding the current value when the WatchStream is polled, +/// regardless of whether it was the initial value or sent afterwards, +/// unless you use [`WatchStream::from_changes`]. /// /// # Examples /// @@ -40,6 +41,28 @@ use tokio::sync::watch::error::RecvError; /// let (tx, rx) = watch::channel("hello"); /// let mut rx = WatchStream::new(rx); /// +/// // existing rx output with "hello" is ignored here +/// +/// tx.send("goodbye").unwrap(); +/// assert_eq!(rx.next().await, Some("goodbye")); +/// # } +/// ``` +/// +/// Example with [`WatchStream::from_changes`]: +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use futures::future::FutureExt; +/// use tokio::sync::watch; +/// use tokio_stream::{StreamExt, wrappers::WatchStream}; +/// +/// let (tx, rx) = watch::channel("hello"); +/// let mut rx = WatchStream::from_changes(rx); +/// +/// // no output from rx is available at this point - let's check this: +/// assert!(rx.next().now_or_never().is_none()); +/// /// tx.send("goodbye").unwrap(); /// assert_eq!(rx.next().await, Some("goodbye")); /// # } @@ -66,6 +89,13 @@ impl WatchStream { inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } + + /// Create a new `WatchStream` that waits for the value to be changed. + pub fn from_changes(rx: Receiver) -> Self { + Self { + inner: ReusableBoxFuture::new(make_future(rx)), + } + } } impl Stream for WatchStream { diff --git a/tokio-stream/tests/watch.rs b/tokio-stream/tests/watch.rs index a56254edefd..3a39aaf3db7 100644 --- a/tokio-stream/tests/watch.rs +++ b/tokio-stream/tests/watch.rs @@ -3,9 +3,11 @@ use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tokio_stream::StreamExt; +use tokio_test::assert_pending; +use tokio_test::task::spawn; #[tokio::test] -async fn message_not_twice() { +async fn watch_stream_message_not_twice() { let (tx, rx) = watch::channel("hello"); let mut counter = 0; @@ -27,3 +29,29 @@ async fn message_not_twice() { drop(tx); task.await.unwrap(); } + +#[tokio::test] +async fn watch_stream_from_rx() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from(rx); + + assert_eq!(stream.next().await.unwrap(), "hello"); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +} + +#[tokio::test] +async fn watch_stream_from_changes() { + let (tx, rx) = watch::channel("hello"); + + let mut stream = WatchStream::from_changes(rx); + + assert_pending!(spawn(&mut stream).poll_next()); + + tx.send("bye").unwrap(); + + assert_eq!(stream.next().await.unwrap(), "bye"); +}