Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,18 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Check semver
- name: Check `tokio` semver
uses: obi1kenobi/cargo-semver-checks-action@v2
with:
rust-toolchain: ${{ env.rust_stable }}
package: tokio
release-type: minor
- name: Check semver for rest of the workspace
if: ${{ !startsWith(github.event.pull_request.base.ref, 'tokio-1.') }}
uses: obi1kenobi/cargo-semver-checks-action@v2
with:
rust-toolchain: ${{ env.rust_stable }}
exclude: tokio
release-type: minor

cross-check:
Expand Down Expand Up @@ -718,7 +726,14 @@ jobs:
toolchain: ${{ env.rust_min }}
- uses: Swatinem/rust-cache@v2
- name: "check --workspace --all-features"
run: cargo check --workspace --all-features
run: |
if [[ "${{ github.event.pull_request.base.ref }}" =~ ^tokio-1\..* ]]; then
# Only check `tokio` crate as the PR is backporting to an earlier tokio release.
cargo check -p tokio --all-features
else
# Check all crates in the workspace
cargo check --workspace --all-features
fi
env:
RUSTFLAGS: "" # remove -Dwarnings

Expand Down Expand Up @@ -1014,7 +1029,7 @@ jobs:
targets: ${{ matrix.target }}

# Install dependencies
- name: Install cargo-hack, wasmtime, and cargo-wasi
- name: Install cargo-hack, wasmtime
uses: taiki-e/install-action@v2
with:
tool: cargo-hack,wasmtime
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.44.1", features = ["full"] }
tokio = { version = "1.44.2", features = ["full"] }
```
Then, on your main.rs:

Expand Down
42 changes: 42 additions & 0 deletions tokio/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# 1.44.2 (April 4, 2025)

This release fixes a soundness issue in the broadcast channel. The channel
accepts values that are `Send` but `!Sync`. Previously, the channel called
`clone()` on these values without synchronizing. This release fixes the channel
by synchronizing calls to `.clone()` (Thanks Austin Bonander for finding and
reporting the issue).

### Fixed

- sync: synchronize `clone()` call in broadcast channel ([#7232])

[#7232]: https://github.com/tokio-rs/tokio/pull/7232

# 1.44.1 (March 13th, 2025)

### Fixed
Expand Down Expand Up @@ -85,6 +99,20 @@ comment on [#7172].
[#7186]: https://github.com/tokio-rs/tokio/pull/7186
[#7192]: https://github.com/tokio-rs/tokio/pull/7192

# 1.43.1 (April 2nd, 2025)

This release fixes a soundness issue in the broadcast channel. The channel
accepts values that are `Send` but `!Sync`. Previously, the channel called
`clone()` on these values without synchronizing. This release fixes the channel
by synchronizing calls to `.clone()` (Thanks Austin Bonander for finding and
reporting the issue).

### Fixed

- sync: synchronize `clone()` call in broadcast channel ([#7232])

[#7232]: https://github.com/tokio-rs/tokio/pull/7232

# 1.43.0 (Jan 8th, 2025)

### Added
Expand Down Expand Up @@ -390,6 +418,20 @@ Yanked. Please use 1.39.1 instead.
[#6709]: https://github.com/tokio-rs/tokio/pull/6709
[#6710]: https://github.com/tokio-rs/tokio/pull/6710

# 1.38.2 (April 2nd, 2025)

This release fixes a soundness issue in the broadcast channel. The channel
accepts values that are `Send` but `!Sync`. Previously, the channel called
`clone()` on these values without synchronizing. This release fixes the channel
by synchronizing calls to `.clone()` (Thanks Austin Bonander for finding and
reporting the issue).

### Fixed

- sync: synchronize `clone()` call in broadcast channel ([#7232])

[#7232]: https://github.com/tokio-rs/tokio/pull/7232

# 1.38.1 (July 16th, 2024)

This release fixes the bug identified as ([#6682]), which caused timers not
Expand Down
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.x.y" git tag.
version = "1.44.1"
version = "1.44.2"
edition = "2021"
rust-version = "1.70"
authors = ["Tokio Contributors <team@tokio.rs>"]
Expand Down
2 changes: 1 addition & 1 deletion tokio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Make sure you activated the full features of the tokio crate on Cargo.toml:

```toml
[dependencies]
tokio = { version = "1.44.1", features = ["full"] }
tokio = { version = "1.44.2", features = ["full"] }
```
Then, on your main.rs:

Expand Down
20 changes: 0 additions & 20 deletions tokio/src/runtime/tests/queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::runtime::scheduler::multi_thread::{queue, Stats};
use crate::runtime::task::{self, Schedule, Task, TaskHarnessScheduleHooks};

use std::cell::RefCell;
use std::thread;
Expand Down Expand Up @@ -272,22 +271,3 @@ fn stress2() {
assert_eq!(num_pop, NUM_TASKS);
}
}

#[allow(dead_code)]
struct Runtime;

impl Schedule for Runtime {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
None
}

fn schedule(&self, _task: task::Notified<Self>) {
unreachable!();
}

fn hooks(&self) -> TaskHarnessScheduleHooks {
TaskHarnessScheduleHooks {
task_terminate_callback: None,
}
}
}
58 changes: 27 additions & 31 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
use crate::loom::sync::{Arc, Mutex, MutexGuard};
use crate::task::coop::cooperative;
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
use crate::util::WakeList;
Expand Down Expand Up @@ -340,7 +340,7 @@ use super::Notify;
/// Data shared between senders and receivers.
struct Shared<T> {
/// slots in the channel.
buffer: Box<[RwLock<Slot<T>>]>,
buffer: Box<[Mutex<Slot<T>>]>,

/// Mask a position -> index.
mask: usize,
Expand Down Expand Up @@ -390,7 +390,7 @@ struct Slot<T> {
///
/// The value is set by `send` when the write lock is held. When a reader
/// drops, `rem` is decremented. When it hits zero, the value is dropped.
val: UnsafeCell<Option<T>>,
val: Option<T>,
}

/// An entry in the wait queue.
Expand Down Expand Up @@ -428,7 +428,7 @@ generate_addr_of_methods! {
}

struct RecvGuard<'a, T> {
slot: RwLockReadGuard<'a, Slot<T>>,
slot: MutexGuard<'a, Slot<T>>,
}

/// Receive a value future.
Expand All @@ -437,11 +437,15 @@ struct Recv<'a, T> {
receiver: &'a mut Receiver<T>,

/// Entry in the waiter `LinkedList`.
waiter: UnsafeCell<Waiter>,
waiter: WaiterCell,
}

unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
// The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
// from `Recv`.
struct WaiterCell(UnsafeCell<Waiter>);

unsafe impl Send for WaiterCell {}
unsafe impl Sync for WaiterCell {}

/// Max number of receivers. Reserve space to lock.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
Expand Down Expand Up @@ -509,15 +513,6 @@ pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
(tx, rx)
}

unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

unsafe impl<T: Send> Send for WeakSender<T> {}
unsafe impl<T: Send> Sync for WeakSender<T> {}

unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}

impl<T> Sender<T> {
/// Creates the sending-half of the [`broadcast`] channel.
///
Expand Down Expand Up @@ -556,10 +551,10 @@ impl<T> Sender<T> {
let mut buffer = Vec::with_capacity(capacity);

for i in 0..capacity {
buffer.push(RwLock::new(Slot {
buffer.push(Mutex::new(Slot {
rem: AtomicUsize::new(0),
pos: (i as u64).wrapping_sub(capacity as u64),
val: UnsafeCell::new(None),
val: None,
}));
}

Expand Down Expand Up @@ -647,7 +642,7 @@ impl<T> Sender<T> {
tail.pos = tail.pos.wrapping_add(1);

// Get the slot
let mut slot = self.shared.buffer[idx].write();
let mut slot = self.shared.buffer[idx].lock();

// Track the position
slot.pos = pos;
Expand All @@ -656,7 +651,7 @@ impl<T> Sender<T> {
slot.rem.with_mut(|v| *v = rem);

// Write the value
slot.val = UnsafeCell::new(Some(value));
slot.val = Some(value);

// Release the slot lock before notifying the receivers.
drop(slot);
Expand Down Expand Up @@ -755,7 +750,7 @@ impl<T> Sender<T> {
while low < high {
let mid = low + (high - low) / 2;
let idx = base_idx.wrapping_add(mid) & self.shared.mask;
if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 {
if self.shared.buffer[idx].lock().rem.load(SeqCst) == 0 {
low = mid + 1;
} else {
high = mid;
Expand Down Expand Up @@ -797,7 +792,7 @@ impl<T> Sender<T> {
let tail = self.shared.tail.lock();

let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
self.shared.buffer[idx].read().rem.load(SeqCst) == 0
self.shared.buffer[idx].lock().rem.load(SeqCst) == 0
}

/// Returns the number of active receivers.
Expand Down Expand Up @@ -1230,7 +1225,7 @@ impl<T> Receiver<T> {
let idx = (self.next & self.shared.mask as u64) as usize;

// The slot holding the next value to read
let mut slot = self.shared.buffer[idx].read();
let mut slot = self.shared.buffer[idx].lock();

if slot.pos != self.next {
// Release the `slot` lock before attempting to acquire the `tail`
Expand All @@ -1247,7 +1242,7 @@ impl<T> Receiver<T> {
let mut tail = self.shared.tail.lock();

// Acquire slot lock again
slot = self.shared.buffer[idx].read();
slot = self.shared.buffer[idx].lock();

// Make sure the position did not change. This could happen in the
// unlikely event that the buffer is wrapped between dropping the
Expand Down Expand Up @@ -1581,12 +1576,12 @@ impl<'a, T> Recv<'a, T> {
fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
Recv {
receiver,
waiter: UnsafeCell::new(Waiter {
waiter: WaiterCell(UnsafeCell::new(Waiter {
queued: AtomicBool::new(false),
waker: None,
pointers: linked_list::Pointers::new(),
_p: PhantomPinned,
}),
})),
}
}

Expand All @@ -1598,7 +1593,7 @@ impl<'a, T> Recv<'a, T> {
is_unpin::<&mut Receiver<T>>();

let me = self.get_unchecked_mut();
(me.receiver, &me.waiter)
(me.receiver, &me.waiter.0)
}
}
}
Expand Down Expand Up @@ -1632,6 +1627,7 @@ impl<'a, T> Drop for Recv<'a, T> {
// `Shared::notify_rx` before we drop the object.
let queued = self
.waiter
.0
.with(|ptr| unsafe { (*ptr).queued.load(Acquire) });

// If the waiter is queued, we need to unlink it from the waiters list.
Expand All @@ -1646,6 +1642,7 @@ impl<'a, T> Drop for Recv<'a, T> {
// `Relaxed` order suffices because we hold the tail lock.
let queued = self
.waiter
.0
.with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });

if queued {
Expand All @@ -1654,7 +1651,7 @@ impl<'a, T> Drop for Recv<'a, T> {
// safety: tail lock is held and the wait node is verified to be in
// the list.
unsafe {
self.waiter.with_mut(|ptr| {
self.waiter.0.with_mut(|ptr| {
tail.waiters.remove((&mut *ptr).into());
});
}
Expand Down Expand Up @@ -1706,16 +1703,15 @@ impl<'a, T> RecvGuard<'a, T> {
where
T: Clone,
{
self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
self.slot.val.clone()
}
}

impl<'a, T> Drop for RecvGuard<'a, T> {
fn drop(&mut self) {
// Decrement the remaining counter
if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
// Safety: Last receiver, drop the value
self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
self.slot.val = None;
}
}
}
Expand Down
Loading