Skip to content

Commit f5d2b5a

Browse files
committed
sync::broadcast: create Sender::new
1 parent 910a1e2 commit f5d2b5a

File tree

1 file changed

+84
-36
lines changed

1 file changed

+84
-36
lines changed

tokio/src/sync/broadcast.rs

+84-36
Original file line numberDiff line numberDiff line change
@@ -444,42 +444,9 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2;
444444
/// This will panic if `capacity` is equal to `0` or larger
445445
/// than `usize::MAX / 2`.
446446
#[track_caller]
447-
pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
448-
assert!(capacity > 0, "capacity is empty");
449-
assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
450-
451-
// Round to a power of two
452-
capacity = capacity.next_power_of_two();
453-
454-
let mut buffer = Vec::with_capacity(capacity);
455-
456-
for i in 0..capacity {
457-
buffer.push(RwLock::new(Slot {
458-
rem: AtomicUsize::new(0),
459-
pos: (i as u64).wrapping_sub(capacity as u64),
460-
val: UnsafeCell::new(None),
461-
}));
462-
}
463-
464-
let shared = Arc::new(Shared {
465-
buffer: buffer.into_boxed_slice(),
466-
mask: capacity - 1,
467-
tail: Mutex::new(Tail {
468-
pos: 0,
469-
rx_cnt: 1,
470-
closed: false,
471-
waiters: LinkedList::new(),
472-
}),
473-
num_tx: AtomicUsize::new(1),
474-
});
475-
476-
let rx = Receiver {
477-
shared: shared.clone(),
478-
next: 0,
479-
};
480-
481-
let tx = Sender { shared };
482-
447+
pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
448+
let tx = Sender::new(capacity);
449+
let rx = tx.subscribe();
483450
(tx, rx)
484451
}
485452

@@ -490,6 +457,45 @@ unsafe impl<T: Send> Send for Receiver<T> {}
490457
unsafe impl<T: Send> Sync for Receiver<T> {}
491458

492459
impl<T> Sender<T> {
460+
/// Creates the sending-half of the [`broadcast`] channel.
461+
///
462+
/// See documentation of [`broadcast::channel`] for errors when calling this function.
463+
///
464+
/// [`broadcast`]: crate::sync::broadcast
465+
/// [`broadcast::channel`]: crate::sync::broadcast
466+
#[track_caller]
467+
pub fn new(mut capacity: usize) -> Self {
468+
assert!(capacity > 0, "capacity is empty");
469+
assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
470+
471+
// Round to a power of two
472+
capacity = capacity.next_power_of_two();
473+
474+
let mut buffer = Vec::with_capacity(capacity);
475+
476+
for i in 0..capacity {
477+
buffer.push(RwLock::new(Slot {
478+
rem: AtomicUsize::new(0),
479+
pos: (i as u64).wrapping_sub(capacity as u64),
480+
val: UnsafeCell::new(None),
481+
}));
482+
}
483+
484+
let shared = Arc::new(Shared {
485+
buffer: buffer.into_boxed_slice(),
486+
mask: capacity - 1,
487+
tail: Mutex::new(Tail {
488+
pos: 0,
489+
rx_cnt: 0,
490+
closed: false,
491+
waiters: LinkedList::new(),
492+
}),
493+
num_tx: AtomicUsize::new(1),
494+
});
495+
496+
Sender { shared }
497+
}
498+
493499
/// Attempts to send a value to all active [`Receiver`] handles, returning
494500
/// it back if it could not be sent.
495501
///
@@ -1369,3 +1375,45 @@ impl<'a, T> Drop for RecvGuard<'a, T> {
13691375
}
13701376

13711377
fn is_unpin<T: Unpin>() {}
1378+
1379+
#[cfg(not(loom))]
1380+
#[cfg(test)]
1381+
mod tests {
1382+
use super::*;
1383+
1384+
#[test]
1385+
fn receiver_count_on_sender_constructor() {
1386+
let count_of = |sender: &Sender<i32>| sender.shared.tail.lock().rx_cnt;
1387+
1388+
let sender = Sender::<i32>::new(16);
1389+
assert_eq!(count_of(&sender), 0);
1390+
1391+
let rx_1 = sender.subscribe();
1392+
assert_eq!(count_of(&sender), 1);
1393+
1394+
let rx_2 = rx_1.resubscribe();
1395+
assert_eq!(count_of(&sender), 2);
1396+
1397+
let rx_3 = sender.subscribe();
1398+
assert_eq!(count_of(&sender), 3);
1399+
1400+
drop(rx_3);
1401+
drop(rx_1);
1402+
assert_eq!(count_of(&sender), 1);
1403+
1404+
drop(rx_2);
1405+
assert_eq!(count_of(&sender), 0);
1406+
}
1407+
1408+
#[cfg(not(loom))]
1409+
#[test]
1410+
fn receiver_count_on_channel_constructor() {
1411+
let count_of = |sender: &Sender<i32>| sender.shared.tail.lock().rx_cnt;
1412+
1413+
let (sender, rx) = channel::<i32>(16);
1414+
assert_eq!(count_of(&sender), 1);
1415+
1416+
let _rx_2 = rx.resubscribe();
1417+
assert_eq!(count_of(&sender), 2);
1418+
}
1419+
}

0 commit comments

Comments
 (0)