Skip to content

Commit

Permalink
Merge branch 'master' into file-options
Browse files Browse the repository at this point in the history
  • Loading branch information
HyeonuPark authored Jul 16, 2023
2 parents 0435d38 + 6166e9b commit 2851e05
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 47 deletions.
12 changes: 12 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ macro_rules! cfg_windows {
}
}

/// Enables unstable Windows-specific code.
/// Use this macro instead of `cfg(windows)` to generate docs properly.
macro_rules! cfg_unstable_windows {
($($item:item)*) => {
$(
#[cfg(all(any(all(doc, docsrs), windows), tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(all(windows, tokio_unstable))))]
$item
)*
}
}

/// Enables enter::block_on.
macro_rules! cfg_block_on {
($($item:item)*) => {
Expand Down
29 changes: 15 additions & 14 deletions tokio/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,20 +400,21 @@ impl Command {
self
}

/// Append literal text to the command line without any quoting or escaping.
///
/// This is useful for passing arguments to `cmd.exe /c`, which doesn't follow
/// `CommandLineToArgvW` escaping rules.
///
/// **Note**: This is an [unstable API][unstable] but will be stabilised once
/// tokio's MSRV is sufficiently new. See [the documentation on
/// unstable features][unstable] for details about using unstable features.
#[cfg(windows)]
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(all(windows, tokio_unstable))))]
pub fn raw_arg<S: AsRef<OsStr>>(&mut self, text_to_append_as_is: S) -> &mut Command {
self.std.raw_arg(text_to_append_as_is);
self
cfg_unstable_windows! {
/// Append literal text to the command line without any quoting or escaping.
///
/// This is useful for passing arguments to `cmd.exe /c`, which doesn't follow
/// `CommandLineToArgvW` escaping rules.
///
/// **Note**: This is an [unstable API][unstable] but will be stabilised once
/// tokio's MSRV is sufficiently new. See [the documentation on
/// unstable features][unstable] for details about using unstable features.
///
/// [unstable]: crate#unstable-features
pub fn raw_arg<S: AsRef<OsStr>>(&mut self, text_to_append_as_is: S) -> &mut Command {
self.std.raw_arg(text_to_append_as_is);
self
}
}

/// Inserts or updates an environment variable mapping.
Expand Down
134 changes: 101 additions & 33 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,42 +444,13 @@ const MAX_RECEIVERS: usize = usize::MAX >> 2;
/// This will panic if `capacity` is equal to `0` or larger
/// than `usize::MAX / 2`.
#[track_caller]
pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
assert!(capacity > 0, "capacity is empty");
assert!(capacity <= usize::MAX >> 1, "requested capacity too large");

// Round to a power of two
capacity = capacity.next_power_of_two();

let mut buffer = Vec::with_capacity(capacity);

for i in 0..capacity {
buffer.push(RwLock::new(Slot {
rem: AtomicUsize::new(0),
pos: (i as u64).wrapping_sub(capacity as u64),
val: UnsafeCell::new(None),
}));
}

let shared = Arc::new(Shared {
buffer: buffer.into_boxed_slice(),
mask: capacity - 1,
tail: Mutex::new(Tail {
pos: 0,
rx_cnt: 1,
closed: false,
waiters: LinkedList::new(),
}),
num_tx: AtomicUsize::new(1),
});

pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
// SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
let rx = Receiver {
shared: shared.clone(),
shared: tx.shared.clone(),
next: 0,
};

let tx = Sender { shared };

(tx, rx)
}

Expand All @@ -490,6 +461,65 @@ unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

impl<T> Sender<T> {
/// Creates the sending-half of the [`broadcast`] channel.
///
/// See documentation of [`broadcast::channel`] for errors when calling this function.
///
/// [`broadcast`]: crate::sync::broadcast
/// [`broadcast::channel`]: crate::sync::broadcast
#[track_caller]
pub fn new(capacity: usize) -> Self {
// SAFETY: We don't create extra receivers, so there are 0.
unsafe { Self::new_with_receiver_count(0, capacity) }
}

/// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
/// count.
///
/// See the documentation of [`broadcast::channel`](self::channel) for more errors when
/// calling this function.
///
/// # Safety:
///
/// The caller must ensure that the amount of receivers for this Sender is correct before
/// the channel functionalities are used, the count is zero by default, as this function
/// does not create any receivers by itself.
#[track_caller]
unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
assert!(capacity > 0, "broadcast channel capacity cannot be zero");
assert!(
capacity <= usize::MAX >> 1,
"broadcast channel capacity exceeded `usize::MAX / 2`"
);

// Round to a power of two
capacity = capacity.next_power_of_two();

let mut buffer = Vec::with_capacity(capacity);

for i in 0..capacity {
buffer.push(RwLock::new(Slot {
rem: AtomicUsize::new(0),
pos: (i as u64).wrapping_sub(capacity as u64),
val: UnsafeCell::new(None),
}));
}

let shared = Arc::new(Shared {
buffer: buffer.into_boxed_slice(),
mask: capacity - 1,
tail: Mutex::new(Tail {
pos: 0,
rx_cnt: receiver_count,
closed: false,
waiters: LinkedList::new(),
}),
num_tx: AtomicUsize::new(1),
});

Sender { shared }
}

/// Attempts to send a value to all active [`Receiver`] handles, returning
/// it back if it could not be sent.
///
Expand Down Expand Up @@ -1370,3 +1400,41 @@ impl<'a, T> Drop for RecvGuard<'a, T> {
}

fn is_unpin<T: Unpin>() {}

#[cfg(not(loom))]
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn receiver_count_on_sender_constructor() {
let sender = Sender::<i32>::new(16);
assert_eq!(sender.receiver_count(), 0);

let rx_1 = sender.subscribe();
assert_eq!(sender.receiver_count(), 1);

let rx_2 = rx_1.resubscribe();
assert_eq!(sender.receiver_count(), 2);

let rx_3 = sender.subscribe();
assert_eq!(sender.receiver_count(), 3);

drop(rx_3);
drop(rx_1);
assert_eq!(sender.receiver_count(), 1);

drop(rx_2);
assert_eq!(sender.receiver_count(), 0);
}

#[cfg(not(loom))]
#[test]
fn receiver_count_on_channel_constructor() {
let (sender, rx) = channel::<i32>(16);
assert_eq!(sender.receiver_count(), 1);

let _rx_2 = rx.resubscribe();
assert_eq!(sender.receiver_count(), 2);
}
}

0 comments on commit 2851e05

Please sign in to comment.