From 331102bd542eedfba4fe4d609b4a5cb767a7ffea Mon Sep 17 00:00:00 2001 From: driftluo Date: Wed, 26 Jun 2024 11:06:35 +0800 Subject: [PATCH] fix: port fix discard all message on receiver droped --- crossbeam-channel/src/channel.rs | 4 +- crossbeam-channel/src/flavors/array.rs | 123 ++++++++++++++++--------- crossbeam-channel/tests/array.rs | 12 +++ 3 files changed, 95 insertions(+), 44 deletions(-) diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index 5447e3303..265cbf0d2 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -662,7 +662,7 @@ impl Drop for Sender { fn drop(&mut self) { unsafe { match &self.flavor { - SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), + SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), } @@ -1159,7 +1159,7 @@ impl Drop for Receiver { fn drop(&mut self) { unsafe { match &self.flavor { - ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), ReceiverFlavor::At(_) => {} diff --git a/crossbeam-channel/src/flavors/array.rs b/crossbeam-channel/src/flavors/array.rs index 206a05a86..c116c9284 100644 --- a/crossbeam-channel/src/flavors/array.rs +++ b/crossbeam-channel/src/flavors/array.rs @@ -476,14 +476,13 @@ impl Channel { Some(self.cap()) } - /// Disconnects the channel and wakes up all blocked senders and receivers. + /// Disconnects senders and wakes up all blocked senders and receivers. /// /// Returns `true` if this call disconnected the channel. - pub(crate) fn disconnect(&self) -> bool { + pub(crate) fn disconnect_senders(&self) -> bool { let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); if tail & self.mark_bit == 0 { - self.senders.disconnect(); self.receivers.disconnect(); true } else { @@ -491,6 +490,85 @@ impl Channel { } } + /// Disconnects receivers and wakes up all blocked senders. + /// + /// Returns `true` if this call disconnected the channel. + /// + /// # Safety + /// May only be called once upon dropping the last receiver. The + /// destruction of all other receivers must have been observed with acquire + /// ordering or stronger. + pub(crate) unsafe fn disconnect_receivers(&self) -> bool { + let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); + let disconnected = if tail & self.mark_bit == 0 { + self.senders.disconnect(); + true + } else { + false + }; + + self.discard_all_messages(tail); + disconnected + } + + /// Discards all messages. + /// + /// `tail` should be the current (and therefore last) value of `tail`. + /// + /// # Panicking + /// If a destructor panics, the remaining messages are leaked, matching the + /// behaviour of the unbounded channel. + /// + /// # Safety + /// This method must only be called when dropping the last receiver. The + /// destruction of all other receivers must have been observed with acquire + /// ordering or stronger. + unsafe fn discard_all_messages(&self, tail: usize) { + debug_assert!(self.is_disconnected()); + + // Only receivers modify `head`, so since we are the last one, + // this value will not change and will not be observed (since + // no new messages can be sent after disconnection). + let mut head = self.head.load(Ordering::Relaxed); + let tail = tail & !self.mark_bit; + + let backoff = Backoff::new(); + loop { + // Deconstruct the head. + let index = head & (self.mark_bit - 1); + let lap = head & !(self.one_lap - 1); + + // Inspect the corresponding slot. + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the stamp is ahead of the head by 1, we may drop the message. + if head + 1 == stamp { + head = if index + 1 < self.cap() { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + unsafe { + (*slot.msg.get()).assume_init_drop(); + } + // If the tail equals the head, that means the channel is empty. + } else if tail == head { + return; + // Otherwise, a sender is about to write into the slot, so we need + // to wait for it to update the stamp. + } else { + backoff.spin(); + } + } + } + /// Returns `true` if the channel is disconnected. pub(crate) fn is_disconnected(&self) -> bool { self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 @@ -521,45 +599,6 @@ impl Channel { } } -impl Drop for Channel { - fn drop(&mut self) { - if mem::needs_drop::() { - // Get the index of the head. - let head = *self.head.get_mut(); - let tail = *self.tail.get_mut(); - - let hix = head & (self.mark_bit - 1); - let tix = tail & (self.mark_bit - 1); - - let len = if hix < tix { - tix - hix - } else if hix > tix { - self.cap() - hix + tix - } else if (tail & !self.mark_bit) == head { - 0 - } else { - self.cap() - }; - - // Loop over all slots that hold a message and drop them. - for i in 0..len { - // Compute the index of the next slot holding a message. - let index = if hix + i < self.cap() { - hix + i - } else { - hix + i - self.cap() - }; - - unsafe { - debug_assert!(index < self.buffer.len()); - let slot = self.buffer.get_unchecked_mut(index); - (*slot.msg.get()).assume_init_drop(); - } - } - } - } -} - /// Receiver handle to a channel. pub(crate) struct Receiver<'a, T>(&'a Channel); diff --git a/crossbeam-channel/tests/array.rs b/crossbeam-channel/tests/array.rs index 486f56a78..cc349a608 100644 --- a/crossbeam-channel/tests/array.rs +++ b/crossbeam-channel/tests/array.rs @@ -742,3 +742,15 @@ fn panic_on_drop() { // Elements after the panicked element will leak. assert!(!b); } + +#[test] +fn drop_unreceived() { + let (tx, rx) = bounded::>(1); + let msg = std::rc::Rc::new(()); + let weak = std::rc::Rc::downgrade(&msg); + assert!(tx.send(msg).is_ok()); + drop(rx); + // Messages should be dropped immediately when the last receiver is destroyed. + assert!(weak.upgrade().is_none()); + drop(tx); +}