Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add poll_recv_many for Receiver & UnboundedReceiver #6236

Merged
81 changes: 81 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,87 @@ impl<T> Receiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Polls to receive multiple messages on this channel, extending the provided buffer.
///
/// This method attempts to receive messages from the channel and store them in the provided buffer,
/// up to the specified `limit`. It behaves similarly to `poll_recv` but for multiple messages. If `limit`
/// is zero, the function returns immediately with `0`.
///
/// This method also shares similarities with `recv_many`, including its behavior in response to channel closure
/// and message availability. For `limit > 0`, if there are no messages in the channel's queue,
/// but the channel has not yet been closed, this method will sleep until a message is sent or
/// the channel is closed. The channel is closed when all senders have been dropped, or when `close` is called.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two paragraphs can probably just be deleted. As long as the list below is comprehensive, this isn't necessary, and it is obvious that it is similar to poll_recv and recv_many.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure & deleted

///
/// This method returns:
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens. In such cases, the `Waker` from the `Context` is scheduled
/// to be woken up when new messages are sent on the channel or when the channel is closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The part about the waker should be moved below this list, similar to poll_recv.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks & I've included your suggestions in my latest commit

/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
/// stored in `buffer`. This can be less than, or equal to, `limit`. If the channel is closed
/// and there are no more messages to receive, `count` will be the number of messages received
/// before the channel was closed, which could be zero.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would split this into two cases: Poll::Ready(count) for count > 0 and Poll::Ready(0). The length-zero case is special because it only happens when the channel is closed (or n == 0).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup that will be much clearer. I've split it into 2

///
/// Note that this method does not guarantee that the buffer will be filled up to `limit` on each call,
/// especially if fewer messages are available. Also, the actual number of messages received can be
/// zero if the channel is empty or closed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The English used here is rather complicated. Perhaps we could simplify it like this?

Suggested change
/// Note that this method does not guarantee that the buffer will be filled up to `limit` on each call,
/// especially if fewer messages are available. Also, the actual number of messages received can be
/// zero if the channel is empty or closed.
/// Note that this method does not guarantee that exactly `limit` messages
/// are received. Rather, if at least one message is available, it returns
/// as many messages as it can up to the given limit. This method returns
/// zero only if the channel is closed (or if `limit` is zero).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks & I've adopted your suggestion

///
/// # Examples
///
/// ```
/// use std::task::{Context, Poll};
/// use std::pin::Pin;
/// use tokio::sync::mpsc;
/// use futures::Future;
///
/// struct MyReceiverFuture<'a> {
/// receiver: mpsc::Receiver<i32>,
/// buffer: &'a mut Vec<i32>,
/// limit: usize,
/// }
///
/// impl<'a> Future for MyReceiverFuture<'a> {
/// type Output = usize; // Number of messages received
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
///
/// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
/// match receiver.poll_recv_many(cx, *buffer, *limit) {
/// Poll::Pending => Poll::Pending,
/// Poll::Ready(count) => Poll::Ready(count),
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::channel(32);
/// let mut buffer = Vec::new();
///
/// let my_receiver_future = MyReceiverFuture {
/// receiver: rx,
/// buffer: &mut buffer,
/// limit: 3,
/// };
///
/// for i in 0..10 {
/// tx.send(i).await.unwrap();
/// }
///
/// let count = my_receiver_future.await;
/// assert_eq!(count, 3);
/// assert_eq!(buffer, vec![0,1,2])
/// }
/// ```
pub fn poll_recv_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<T>,
limit: usize,
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}
}

impl<T> fmt::Debug for Receiver<T> {
Expand Down
81 changes: 81 additions & 0 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,87 @@ impl<T> UnboundedReceiver<T> {
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Polls to receive multiple messages on this channel, extending the provided buffer.
///
/// This method attempts to receive messages from the channel and store them in the provided buffer,
/// up to the specified `limit`. It behaves similarly to `poll_recv` but for multiple messages. If `limit`
/// is zero, the function returns immediately with `0`.
///
/// This method also shares similarities with `recv_many`, including its behavior in response to channel closure
/// and message availability. For `limit > 0`, if there are no messages in the channel's queue,
/// but the channel has not yet been closed, this method will sleep until a message is sent or
/// the channel is closed. The channel is closed when all senders have been dropped, or when `close` is called.
///
/// This method returns:
/// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
/// spurious failure happens. In such cases, the `Waker` from the `Context` is scheduled
/// to be woken up when new messages are sent on the channel or when the channel is closed.
/// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
/// stored in `buffer`. This can be less than, or equal to, `limit`. If the channel is closed
/// and there are no more messages to receive, `count` will be the number of messages received
/// before the channel was closed, which could be zero.
///
/// Note that this method does not guarantee that the buffer will be filled up to `limit` on each call,
/// especially if fewer messages are available. Also, the actual number of messages received can be
/// zero if the channel is empty or closed.
///
/// # Examples
///
/// ```
/// use std::task::{Context, Poll};
/// use std::pin::Pin;
/// use tokio::sync::mpsc;
/// use futures::Future;
///
/// struct MyReceiverFuture<'a> {
/// receiver: mpsc::UnboundedReceiver<i32>,
/// buffer: &'a mut Vec<i32>,
/// limit: usize,
/// }
///
/// impl<'a> Future for MyReceiverFuture<'a> {
/// type Output = usize; // Number of messages received
///
/// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
/// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
///
/// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
/// match receiver.poll_recv_many(cx, *buffer, *limit) {
/// Poll::Pending => Poll::Pending,
/// Poll::Ready(count) => Poll::Ready(count),
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::unbounded_channel::<i32>();
/// let mut buffer = Vec::new();
///
/// let my_receiver_future = MyReceiverFuture {
/// receiver: rx,
/// buffer: &mut buffer,
/// limit: 3,
/// };
///
/// for i in 0..10 {
/// tx.send(i).expect("Unable to send integer");
/// }
///
/// let count = my_receiver_future.await;
/// assert_eq!(count, 3);
/// assert_eq!(buffer, vec![0,1,2])
/// }
/// ```
pub fn poll_recv_many(
&mut self,
cx: &mut Context<'_>,
buffer: &mut Vec<T>,
limit: usize,
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}
}

impl<T> UnboundedSender<T> {
Expand Down
Loading