Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add recv_many for mpsc channels #6010

Merged
merged 37 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f13ea3e
Method recv_many returns a Vec<T> for mpsc channels
Jul 1, 2023
0133425
Update recv_many to populate a passed-in buffer.
Sep 9, 2023
7d0d8db
fix rustfmt issues
Sep 15, 2023
487f887
fix returning the result of a binding from a block
Sep 15, 2023
7086b34
fix rustfmt braces around continue
Sep 15, 2023
9a170b0
Update from bencher and merge benchmarks into sync_mpsc.rs
Sep 16, 2023
01b2270
fix Cargo.toml to reflect deletion of old benchmark
Sep 16, 2023
8c8b06f
Simplified recv_many implementation
Sep 25, 2023
dc81be3
Fix test send_recv_unbounded for arbitrarty BLOCK_CAP values
Sep 25, 2023
8f5b00c
Fix for rustfmt
Sep 25, 2023
3c1dc33
switch to break to exit loop
Sep 26, 2023
757bc33
resolve return value 0 for recv_many
Sep 27, 2023
0948857
Change to recv_many to no longer clear the output buffer
Sep 27, 2023
1dc2dbf
Filled buffer handling now fail-safe, not fail-fast
Sep 28, 2023
6bf430c
improved doctest example
Sep 30, 2023
cdc3665
fixed nested if; resolve documentation issues for recv_many
Sep 30, 2023
4c1321f
Update tokio/src/sync/mpsc/unbounded.rs
aschweig Oct 1, 2023
27ae226
Update tokio/src/sync/mpsc/unbounded.rs
aschweig Oct 1, 2023
9a1d9bc
Updated doctests to be similar; clarified documentation
Oct 1, 2023
7fca954
accomodate that with_capacity(n) gives capacity of at least n
Oct 1, 2023
fe44b32
Improved documentation for `recv_many` and added test
Oct 1, 2023
aac0eda
Update tokio/src/sync/mpsc/bounded.rs
aschweig Oct 1, 2023
088537c
Update tokio/src/sync/mpsc/bounded.rs
aschweig Oct 1, 2023
effb0e5
change recv_many doctests to use drop not scope
Oct 1, 2023
1fb08df
adopt improved wording on buffer capacity behavior
Oct 1, 2023
bd1ef3a
Only reserve if no unused capacity and there is 1+ value(s) to receive
Oct 1, 2023
d31fd17
Updated `recv_many` to accept a new argument `limit`
Oct 2, 2023
3cd2e46
rename to_go to remaining
Oct 2, 2023
326fa53
Change behavior of recv_many(buf, 0) to return 0 immediately.
Oct 8, 2023
e8950ce
Fix doc issues
Oct 8, 2023
ead4ca6
Update tokio/src/sync/mpsc/chan.rs
aschweig Oct 11, 2023
2fcb715
Update tokio/src/sync/mpsc/chan.rs
aschweig Oct 11, 2023
ba46b7c
check the coop budget in limit==0 case
Oct 11, 2023
b1d1cae
clean up some recv_many tests
Oct 11, 2023
f162ca7
Update tokio/src/sync/mpsc/bounded.rs
aschweig Oct 11, 2023
9f39dc7
Reflow doc comments; adjust comment in chan.rs
Oct 11, 2023
27e39d9
Merge branch 'master' into recv_many
Darksonn Oct 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,33 @@ fn contention_bounded(g: &mut BenchmarkGroup<WallTime>) {
});
}

fn contention_bounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

g.bench_function("bounded_recv_many", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);

for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).await.unwrap();
}
});
}

let mut buffer = Vec::<usize>::with_capacity(5_000);
let mut total = 0;
while total < 1_000 * 5 {
total += rx.recv_many(&mut buffer, 5_000).await;
}
})
})
});
}

fn contention_bounded_full(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

Expand All @@ -98,6 +125,33 @@ fn contention_bounded_full(g: &mut BenchmarkGroup<WallTime>) {
});
}

fn contention_bounded_full_recv_many(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

g.bench_function("bounded_full_recv_many", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(100);

for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).await.unwrap();
}
});
}

let mut buffer = Vec::<usize>::with_capacity(5_000);
let mut total = 0;
while total < 1_000 * 5 {
total += rx.recv_many(&mut buffer, 5_000).await;
}
})
})
});
}

fn contention_unbounded(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

Expand All @@ -123,6 +177,33 @@ fn contention_unbounded(g: &mut BenchmarkGroup<WallTime>) {
});
}

fn contention_unbounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

g.bench_function("unbounded_recv_many", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::unbounded_channel::<usize>();

for _ in 0..5 {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..1000 {
tx.send(i).unwrap();
}
});
}

let mut buffer = Vec::<usize>::with_capacity(5_000);
let mut total = 0;
while total < 1_000 * 5 {
total += rx.recv_many(&mut buffer, 5_000).await;
}
})
})
});
}

fn uncontented_bounded(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

Expand All @@ -143,6 +224,28 @@ fn uncontented_bounded(g: &mut BenchmarkGroup<WallTime>) {
});
}

fn uncontented_bounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

g.bench_function("bounded_recv_many", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);

for i in 0..5000 {
tx.send(i).await.unwrap();
}

let mut buffer = Vec::<usize>::with_capacity(5_000);
let mut total = 0;
while total < 1_000 * 5 {
total += rx.recv_many(&mut buffer, 5_000).await;
}
})
})
});
}

fn uncontented_unbounded(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

Expand All @@ -163,6 +266,28 @@ fn uncontented_unbounded(g: &mut BenchmarkGroup<WallTime>) {
});
}

fn uncontented_unbounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
let rt = rt();

g.bench_function("unbounded_recv_many", |b| {
b.iter(|| {
rt.block_on(async move {
let (tx, mut rx) = mpsc::unbounded_channel::<usize>();

for i in 0..5000 {
tx.send(i).unwrap();
}

let mut buffer = Vec::<usize>::with_capacity(5_000);
let mut total = 0;
while total < 1_000 * 5 {
total += rx.recv_many(&mut buffer, 5_000).await;
}
})
})
});
}

fn bench_create_medium(c: &mut Criterion) {
let mut group = c.benchmark_group("create_medium");
create_medium::<1>(&mut group);
Expand All @@ -181,15 +306,20 @@ fn bench_send(c: &mut Criterion) {
fn bench_contention(c: &mut Criterion) {
let mut group = c.benchmark_group("contention");
contention_bounded(&mut group);
contention_bounded_recv_many(&mut group);
contention_bounded_full(&mut group);
contention_bounded_full_recv_many(&mut group);
contention_unbounded(&mut group);
contention_unbounded_recv_many(&mut group);
group.finish();
}

fn bench_uncontented(c: &mut Criterion) {
let mut group = c.benchmark_group("uncontented");
uncontented_bounded(&mut group);
uncontented_bounded_recv_many(&mut group);
uncontented_unbounded(&mut group);
uncontented_unbounded_recv_many(&mut group);
group.finish();
}

Expand Down
82 changes: 79 additions & 3 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ pub struct Sender<T> {
/// async fn main() {
/// let (tx, _rx) = channel::<i32>(15);
/// let tx_weak = tx.downgrade();
///
///
/// // Upgrading will succeed because `tx` still exists.
/// assert!(tx_weak.upgrade().is_some());
///
///
/// // If we drop `tx`, then it will fail.
/// drop(tx);
/// assert!(tx_weak.clone().upgrade().is_none());
Expand Down Expand Up @@ -230,6 +230,82 @@ impl<T> Receiver<T> {
poll_fn(|cx| self.chan.recv(cx)).await
}

/// Receives the next values for this receiver and extends `buffer`.
///
/// This method extends `buffer` by no more than a fixed number of values
/// as specified by `limit`. If `limit` is zero, the function immediately
/// returns `0`. The return value is the number of values added to `buffer`.
///
/// For `limit > 0`, if there are no messages in the channel's queue, but
/// the channel has not yet been closed, this method will sleep until a
/// message is sent or the channel is closed. Note that if [`close`] is
/// called, but there are still outstanding [`Permits`] from before it was
/// closed, the channel is not considered closed by `recv_many` until the
/// permits are released.
///
/// For non-zero values of `limit`, this method will never return `0` unless
/// the channel has been closed and there are no remaining messages in the
/// channel's queue. This indicates that no further values can ever be
/// received from this `Receiver`. The channel is closed when all senders
/// have been dropped, or when [`close`] is called.
///
/// The capacity of `buffer` is increased as needed.
///
/// # Cancel safety
///
/// This method is cancel safe. If `recv_many` is used as the event in a
/// [`tokio::select!`](crate::select) statement and some other branch
/// completes first, it is guaranteed that no messages were received on this
/// channel.
///
/// [`close`]: Self::close
/// [`Permits`]: struct@crate::sync::mpsc::Permit
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let mut buffer: Vec<&str> = Vec::with_capacity(2);
/// let limit = 2;
/// let (tx, mut rx) = mpsc::channel(100);
/// let tx2 = tx.clone();
/// tx2.send("first").await.unwrap();
/// tx2.send("second").await.unwrap();
/// tx2.send("third").await.unwrap();
///
/// // Call `recv_many` to receive up to `limit` (2) values.
/// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
/// assert_eq!(vec!["first", "second"], buffer);
///
/// // If the buffer is full, the next call to `recv_many`
/// // reserves additional capacity.
/// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
///
/// tokio::spawn(async move {
/// tx.send("fourth").await.unwrap();
/// });
///
/// // 'tx' is dropped, but `recv_many`
/// // is guaranteed not to return 0 as the channel
/// // is not yet closed.
/// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
/// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
///
/// // Once the last sender is dropped, the channel is
/// // closed and `recv_many` returns 0, capacity unchanged.
/// drop(tx2);
/// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
/// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
/// }
/// ```
pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
use crate::future::poll_fn;
poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
}

/// Tries to receive the next value for this receiver.
///
/// This method returns the [`Empty`] error if the channel is currently
Expand Down Expand Up @@ -1072,7 +1148,7 @@ impl<T> Sender<T> {
/// #[tokio::main]
/// async fn main() {
/// let (tx, _rx) = mpsc::channel::<()>(5);
///
///
/// // both max capacity and capacity are the same at first
/// assert_eq!(tx.max_capacity(), 5);
/// assert_eq!(tx.capacity(), 5);
Expand Down
Loading