Skip to content

Commit eed672b

Browse files
committed
sync::broadcast: create Sender::new
1 parent 910a1e2 commit eed672b

File tree

1 file changed

+42
-31
lines changed

1 file changed

+42
-31
lines changed

tokio/src/sync/broadcast.rs

+42-31
Original file line numberDiff line numberDiff line change
@@ -444,42 +444,14 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2;
444444
/// This will panic if `capacity` is equal to `0` or larger
445445
/// than `usize::MAX / 2`.
446446
#[track_caller]
447-
pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
448-
assert!(capacity > 0, "capacity is empty");
449-
assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
450-
451-
// Round to a power of two
452-
capacity = capacity.next_power_of_two();
453-
454-
let mut buffer = Vec::with_capacity(capacity);
455-
456-
for i in 0..capacity {
457-
buffer.push(RwLock::new(Slot {
458-
rem: AtomicUsize::new(0),
459-
pos: (i as u64).wrapping_sub(capacity as u64),
460-
val: UnsafeCell::new(None),
461-
}));
462-
}
463-
464-
let shared = Arc::new(Shared {
465-
buffer: buffer.into_boxed_slice(),
466-
mask: capacity - 1,
467-
tail: Mutex::new(Tail {
468-
pos: 0,
469-
rx_cnt: 1,
470-
closed: false,
471-
waiters: LinkedList::new(),
472-
}),
473-
num_tx: AtomicUsize::new(1),
474-
});
447+
pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
448+
let tx = Sender::new(capacity);
475449

476450
let rx = Receiver {
477-
shared: shared.clone(),
451+
shared: tx.shared.clone(),
478452
next: 0,
479453
};
480454

481-
let tx = Sender { shared };
482-
483455
(tx, rx)
484456
}
485457

@@ -490,6 +462,45 @@ unsafe impl<T: Send> Send for Receiver<T> {}
490462
unsafe impl<T: Send> Sync for Receiver<T> {}
491463

492464
impl<T> Sender<T> {
465+
/// Creates the sending-half of the [`broadcast`] channel.
466+
///
467+
/// See documentation of [`broadcast::channel`] for errors when calling this function.
468+
///
469+
/// [`broadcast`]: crate::sync::broadcast
470+
/// [`broadcast::channel`]: crate::sync::broadcast
471+
#[track_caller]
472+
pub fn new(mut capacity: usize) -> Self {
473+
assert!(capacity > 0, "capacity is empty");
474+
assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
475+
476+
// Round to a power of two
477+
capacity = capacity.next_power_of_two();
478+
479+
let mut buffer = Vec::with_capacity(capacity);
480+
481+
for i in 0..capacity {
482+
buffer.push(RwLock::new(Slot {
483+
rem: AtomicUsize::new(0),
484+
pos: (i as u64).wrapping_sub(capacity as u64),
485+
val: UnsafeCell::new(None),
486+
}));
487+
}
488+
489+
let shared = Arc::new(Shared {
490+
buffer: buffer.into_boxed_slice(),
491+
mask: capacity - 1,
492+
tail: Mutex::new(Tail {
493+
pos: 0,
494+
rx_cnt: 1,
495+
closed: false,
496+
waiters: LinkedList::new(),
497+
}),
498+
num_tx: AtomicUsize::new(1),
499+
});
500+
501+
Sender { shared }
502+
}
503+
493504
/// Attempts to send a value to all active [`Receiver`] handles, returning
494505
/// it back if it could not be sent.
495506
///

0 commit comments

Comments
 (0)