Skip to content

Commit

Permalink
channel: Create list::Block directly on the heap
Browse files Browse the repository at this point in the history
The list channel's `Block::new` was causing a stack overflow because it
held 32 item slots, instantiated on the stack before moving to
`Box::new`. The 32x multiplier made modestly-large item sizes untenable.

That block is now initialized directly on the heap.

References from the `std` channel implementation:
* rust-lang/rust#102246
* rust-lang/rust#132738
  • Loading branch information
cuviper committed Nov 7, 2024
1 parent 10f0296 commit 9be9131
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
25 changes: 13 additions & 12 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Unbounded channel implemented as a linked list.
use std::alloc::{alloc_zeroed, Layout};
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
Expand Down Expand Up @@ -50,11 +51,6 @@ struct Slot<T> {
}

impl<T> Slot<T> {
const UNINIT: Self = Self {
msg: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
};

/// Waits until a message is written into the slot.
fn wait_write(&self) {
let backoff = Backoff::new();
Expand All @@ -77,11 +73,16 @@ struct Block<T> {

impl<T> Block<T> {
/// Creates an empty block.
fn new() -> Self {
Self {
next: AtomicPtr::new(ptr::null_mut()),
slots: [Slot::UNINIT; BLOCK_CAP],
}
fn new() -> Box<Self> {
// SAFETY: This is safe because:
// [1] `Block::next` (AtomicPtr) may be safely zero initialized.
// [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
// [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
// holds a MaybeUninit.
// [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
// TODO: unsafe { Box::new_zeroed().assume_init() }
let layout = Layout::new::<Self>();
unsafe { Box::from_raw(alloc_zeroed(layout).cast()) }
}

/// Waits until the next pointer is set.
Expand Down Expand Up @@ -223,13 +224,13 @@ impl<T> Channel<T> {
// If we're going to have to install the next block, allocate it in advance in order to
// make the wait for other threads as short as possible.
if offset + 1 == BLOCK_CAP && next_block.is_none() {
next_block = Some(Box::new(Block::<T>::new()));
next_block = Some(Block::<T>::new());
}

// If this is the first message to be sent into the channel, we need to allocate the
// first block and install it.
if block.is_null() {
let new = Box::into_raw(Box::new(Block::<T>::new()));
let new = Box::into_raw(Block::<T>::new());

if self
.tail
Expand Down
15 changes: 15 additions & 0 deletions crossbeam-channel/tests/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,3 +580,18 @@ fn channel_through_channel() {
})
.unwrap();
}

// If `Block` is created on the stack, the array of slots will multiply this `BigStruct` and
// probably overflow the thread stack. It's now directly created on the heap to avoid this.
#[test]
fn stack_overflow() {
const N: usize = 32_768;
struct BigStruct {
_data: [u8; N],
}

let (sender, receiver) = unbounded::<BigStruct>();
sender.send(BigStruct { _data: [0u8; N] }).unwrap();

for _data in receiver.try_iter() {}
}

0 comments on commit 9be9131

Please sign in to comment.