From 6b528c34971b48eb6901c17d8cd1fc5ece3cb7d8 Mon Sep 17 00:00:00 2001 From: driftluo Date: Wed, 26 Jun 2024 11:06:35 +0800 Subject: [PATCH 1/3] fix: port fix discard all message on receiver droped --- crossbeam-channel/src/channel.rs | 4 +- crossbeam-channel/src/flavors/array.rs | 128 ++++++++++++++++--------- crossbeam-channel/tests/array.rs | 12 +++ 3 files changed, 99 insertions(+), 45 deletions(-) diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index 5447e3303..265cbf0d2 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -662,7 +662,7 @@ impl Drop for Sender { fn drop(&mut self) { unsafe { match &self.flavor { - SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), + SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } @@ -1159,7 +1159,7 @@ impl Drop for Receiver { fn drop(&mut self) { unsafe { match &self.flavor { - ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), ReceiverFlavor::At(_) => {} diff --git a/crossbeam-channel/src/flavors/array.rs b/crossbeam-channel/src/flavors/array.rs index 206a05a86..872d3f8f1 100644 --- a/crossbeam-channel/src/flavors/array.rs +++ b/crossbeam-channel/src/flavors/array.rs @@ -10,7 +10,7 @@ use std::boxed::Box; use std::cell::UnsafeCell; -use std::mem::{self, MaybeUninit}; +use std::mem::MaybeUninit; use std::ptr; use std::sync::atomic::{self, AtomicUsize, Ordering}; use std::time::Instant; @@ -476,14 +476,13 @@ impl Channel { Some(self.cap()) } - /// Disconnects the channel and wakes up all blocked senders and receivers. + /// Disconnects senders and wakes up all blocked senders and receivers. /// /// Returns `true` if this call disconnected the channel. - pub(crate) fn disconnect(&self) -> bool { + pub(crate) fn disconnect_senders(&self) -> bool { let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); if tail & self.mark_bit == 0 { - self.senders.disconnect(); self.receivers.disconnect(); true } else { @@ -491,6 +490,88 @@ impl Channel { } } + /// Disconnects receivers and wakes up all blocked senders. + /// + /// Returns `true` if this call disconnected the channel. + /// + /// # Safety + /// May only be called once upon dropping the last receiver. The + /// destruction of all other receivers must have been observed with acquire + /// ordering or stronger. + pub(crate) unsafe fn disconnect_receivers(&self) -> bool { + let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); + let disconnected = if tail & self.mark_bit == 0 { + self.senders.disconnect(); + true + } else { + false + }; + + unsafe { + self.discard_all_messages(tail); + } + + disconnected + } + + /// Discards all messages. + /// + /// `tail` should be the current (and therefore last) value of `tail`. + /// + /// # Panicking + /// If a destructor panics, the remaining messages are leaked, matching the + /// behaviour of the unbounded channel. + /// + /// # Safety + /// This method must only be called when dropping the last receiver. The + /// destruction of all other receivers must have been observed with acquire + /// ordering or stronger. + unsafe fn discard_all_messages(&self, tail: usize) { + debug_assert!(self.is_disconnected()); + + // Only receivers modify `head`, so since we are the last one, + // this value will not change and will not be observed (since + // no new messages can be sent after disconnection). + let mut head = self.head.load(Ordering::Relaxed); + let tail = tail & !self.mark_bit; + + let backoff = Backoff::new(); + loop { + // Deconstruct the head. + let index = head & (self.mark_bit - 1); + let lap = head & !(self.one_lap - 1); + + // Inspect the corresponding slot. + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the stamp is ahead of the head by 1, we may drop the message. + if head + 1 == stamp { + head = if index + 1 < self.cap() { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + unsafe { + (*slot.msg.get()).assume_init_drop(); + } + // If the tail equals the head, that means the channel is empty. + } else if tail == head { + return; + // Otherwise, a sender is about to write into the slot, so we need + // to wait for it to update the stamp. + } else { + backoff.spin(); + } + } + } + /// Returns `true` if the channel is disconnected. pub(crate) fn is_disconnected(&self) -> bool { self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 @@ -521,45 +602,6 @@ impl Channel { } } -impl Drop for Channel { - fn drop(&mut self) { - if mem::needs_drop::() { - // Get the index of the head. - let head = *self.head.get_mut(); - let tail = *self.tail.get_mut(); - - let hix = head & (self.mark_bit - 1); - let tix = tail & (self.mark_bit - 1); - - let len = if hix < tix { - tix - hix - } else if hix > tix { - self.cap() - hix + tix - } else if (tail & !self.mark_bit) == head { - 0 - } else { - self.cap() - }; - - // Loop over all slots that hold a message and drop them. - for i in 0..len { - // Compute the index of the next slot holding a message. - let index = if hix + i < self.cap() { - hix + i - } else { - hix + i - self.cap() - }; - - unsafe { - debug_assert!(index < self.buffer.len()); - let slot = self.buffer.get_unchecked_mut(index); - (*slot.msg.get()).assume_init_drop(); - } - } - } - } -} - /// Receiver handle to a channel. pub(crate) struct Receiver<'a, T>(&'a Channel); diff --git a/crossbeam-channel/tests/array.rs b/crossbeam-channel/tests/array.rs index 486f56a78..cc349a608 100644 --- a/crossbeam-channel/tests/array.rs +++ b/crossbeam-channel/tests/array.rs @@ -742,3 +742,15 @@ fn panic_on_drop() { // Elements after the panicked element will leak. assert!(!b); } + +#[test] +fn drop_unreceived() { + let (tx, rx) = bounded::>(1); + let msg = std::rc::Rc::new(()); + let weak = std::rc::Rc::downgrade(&msg); + assert!(tx.send(msg).is_ok()); + drop(rx); + // Messages should be dropped immediately when the last receiver is destroyed. + assert!(weak.upgrade().is_none()); + drop(tx); +} From 77f38c2349370011dfc82a46169dd435dcba3d50 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 25 Jan 2025 15:10:39 +0900 Subject: [PATCH 2/3] ci: Use ubuntu-22.04-arm instead of ubuntu-24.04-arm https://github.com/rust-lang/rust/issues/135867 --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3b193cd3a..f44d0193c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,26 +43,26 @@ jobs: - rust: '1.63' os: ubuntu-latest - rust: '1.63' - os: ubuntu-24.04-arm + os: ubuntu-22.04-arm # TODO: update to 24.04 when https://github.com/rust-lang/rust/issues/135867 solved - rust: '1.63' os: windows-latest - rust: stable os: ubuntu-latest - rust: stable - os: ubuntu-24.04-arm + os: ubuntu-22.04-arm # TODO: update to 24.04 when https://github.com/rust-lang/rust/issues/135867 solved - rust: stable os: windows-latest - rust: nightly os: ubuntu-latest - rust: nightly - os: ubuntu-24.04-arm + os: ubuntu-22.04-arm # TODO: update to 24.04 when https://github.com/rust-lang/rust/issues/135867 solved - rust: nightly os: windows-latest - rust: nightly os: ubuntu-latest target: i686-unknown-linux-gnu - rust: nightly - os: ubuntu-24.04-arm + os: ubuntu-22.04-arm # TODO: update to 24.04 when https://github.com/rust-lang/rust/issues/135867 solved target: armv7-unknown-linux-gnueabihf # Test 32-bit target that does not have AtomicU64/AtomicI64. - rust: nightly From 2de82b853eae6560c18a90d142fac9b3825a8d69 Mon Sep 17 00:00:00 2001 From: driftluo Date: Fri, 24 Jan 2025 14:12:36 +0800 Subject: [PATCH 3/3] ci: add MIRI_LEAK_CHECK=1 on ci --- ci/miri.sh | 3 +++ crossbeam-channel/tests/array.rs | 2 ++ 2 files changed, 5 insertions(+) diff --git a/ci/miri.sh b/ci/miri.sh index fa6054821..7358c9053 100755 --- a/ci/miri.sh +++ b/ci/miri.sh @@ -15,10 +15,13 @@ export MIRIFLAGS="${MIRIFLAGS:-} -Zmiri-strict-provenance -Zmiri-symbolic-alignm case "${group}" in channel) + # -Zmiri-ignore-leaks is needed because we use detached threads in tests in tests/array.rs: panic_on_drop MIRI_LEAK_CHECK='1' \ + MIRIFLAGS="${MIRIFLAGS} -Zmiri-ignore-leaks" \ cargo miri test --all-features \ -p crossbeam-channel 2>&1 | ts -i '%.s ' # -Zmiri-ignore-leaks is needed because we use detached threads in tests in tests/golang.rs: https://github.com/rust-lang/miri/issues/1371 + MIRI_LEAK_CHECK='1' \ MIRIFLAGS="${MIRIFLAGS} -Zmiri-ignore-leaks" \ cargo miri test --all-features \ -p crossbeam-channel --test golang 2>&1 | ts -i '%.s ' diff --git a/crossbeam-channel/tests/array.rs b/crossbeam-channel/tests/array.rs index cc349a608..3ec50a8f6 100644 --- a/crossbeam-channel/tests/array.rs +++ b/crossbeam-channel/tests/array.rs @@ -690,6 +690,8 @@ fn channel_through_channel() { .unwrap(); } +// On the current implementation, panic on drop will cause memory leaks +#[cfg_attr(crossbeam_sanitize, ignore)] #[test] fn panic_on_drop() { struct Msg1<'a>(&'a mut bool);