Skip to content

sync: oneshot::Receiver::is_empty() #7153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,91 @@ impl<T> Receiver<T> {
self.inner.is_none()
}

/// Checks if a channel is empty.
///
/// This method returns `true` if the channel has no messages.
///
/// It is not necessarily safe to poll an empty receiver, which may have
/// already yielded a value. Use [`is_terminated()`][Self::is_terminated]
/// to check whether or not a receiver can be safely polled, instead.
///
/// # Examples
///
/// Sending a value.
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel();
/// assert!(rx.is_empty());
///
/// tx.send(0).unwrap();
/// assert!(!rx.is_empty());
///
/// let _ = (&mut rx).await;
/// assert!(rx.is_empty());
/// }
/// ```
///
/// Dropping the sender.
///
/// ```
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel::<()>();
///
/// // A channel is empty if the sender is dropped.
/// drop(tx);
/// assert!(rx.is_empty());
///
/// // A closed channel still yields an error, however.
/// (&mut rx).await.expect_err("should yield an error");
/// assert!(rx.is_empty());
/// }
/// ```
///
/// Terminated channels are empty.
///
/// ```should_panic
/// use tokio::sync::oneshot;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = oneshot::channel();
/// tx.send(0).unwrap();
/// let _ = (&mut rx).await;
///
/// // NB: an empty channel is not necessarily safe to poll!
/// assert!(rx.is_empty());
/// let _ = (&mut rx).await;
/// }
/// ```
pub fn is_empty(&self) -> bool {
let Some(inner) = self.inner.as_ref() else {
// The channel has already terminated.
return true;
};

let state = State::load(&inner.state, Acquire);
if state.is_complete() {
// SAFETY: If `state.is_complete()` returns true, then the
// `VALUE_SENT` bit has been set and the sender side of the
// channel will no longer attempt to access the inner
// `UnsafeCell`. Therefore, it is now safe for us to access the
// cell.
//
// The channel is empty if it does not have a value.
unsafe { !inner.has_value() }
} else {
// The receiver closed the channel or no value has been sent yet.
true
}
}

/// Attempts to receive a value.
///
/// If a pending value exists in the channel, it is returned. If no value
Expand Down Expand Up @@ -1291,6 +1376,19 @@ impl<T> Inner<T> {
unsafe fn consume_value(&self) -> Option<T> {
self.value.with_mut(|ptr| (*ptr).take())
}

/// Returns true if there is a value. This function does not check `state`.
///
/// # Safety
///
/// Calling this method concurrently on multiple threads will result in a
/// data race. The `VALUE_SENT` state bit is used to ensure that only the
/// sender *or* the receiver will call this method at a given point in time.
/// If `VALUE_SENT` is not set, then only the sender may call this method;
/// if it is set, then only the receiver may call this method.
unsafe fn has_value(&self) -> bool {
self.value.with(|ptr| (*ptr).is_some())
}
}

unsafe impl<T: Send> Send for Inner<T> {}
Expand Down
52 changes: 52 additions & 0 deletions tokio/tests/sync_oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,55 @@ fn receiver_is_terminated_rx_close() {
"channel IS terminated after value is read"
);
}

#[test]
fn receiver_is_empty_send() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(rx.is_empty(), "channel IS empty before value is sent");
tx.send(17).unwrap();
assert!(!rx.is_empty(), "channel is NOT empty after value is sent");

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_eq!(poll, Ok(17));

assert!(rx.is_empty(), "channel IS empty after value is read");
}

#[test]
fn receiver_is_empty_try_recv() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(rx.is_empty(), "channel IS empty before value is sent");
tx.send(17).unwrap();
assert!(!rx.is_empty(), "channel is NOT empty after value is sent");

let value = rx.try_recv().expect("value is waiting");
assert_eq!(value, 17);

assert!(rx.is_empty(), "channel IS empty after value is read");
}

#[test]
fn receiver_is_empty_drop() {
let (tx, mut rx) = oneshot::channel::<i32>();

assert!(rx.is_empty(), "channel IS empty before sender is dropped");
drop(tx);
assert!(rx.is_empty(), "channel IS empty after sender is dropped");

let mut task = task::spawn(());
let poll = task.enter(|cx, _| Pin::new(&mut rx).poll(cx));
assert_ready_err!(poll);

assert!(rx.is_empty(), "channel IS empty after value is read");
}

#[test]
fn receiver_is_empty_rx_close() {
let (_tx, mut rx) = oneshot::channel::<i32>();
assert!(rx.is_empty());
rx.close();
assert!(rx.is_empty());
}