Skip to content

Commit

Permalink
sync::mpsc: synchronize receiver disconnect with initialization
Browse files Browse the repository at this point in the history
Receiver disconnection relies on the incorrect assumption that
`head.index != tail.index` implies that the channel is initialized (i.e
`head.block` and `tail.block` point to allocated blocks). However, it
can happen that `head.index != tail.index` and `head.block == null` at
the same time which leads to a segfault when a channel is dropped in
that state.

This can happen because initialization is performed in two steps. First,
the tail block is allocated and the `tail.block` is set. If that is
successful `head.block` is set to the same pointer. Importantly,
initialization is skipped if `tail.block` is not null.

Therefore we can have the following situation:

1. Thread A starts to send the first value of the channel, observes that
   `tail.block` is null and begins initialization. It sets `tail.block`
   to point to a newly allocated block and then gets preempted.
   `head.block` is still null at this point.
2. Thread B starts to send the second value of the channel, observes
   that `tail.block` *is not* null and proceeds with writing its value
   in the allocated tail block and sets `tail.index` to 1.
3. Thread B drops the receiver of the channel which observes that
   `head.index != tail.index` (0 and 1 respectively), therefore there
   must be messages to drop. It starts traversing the linked list from
   `head.block` which is still a null pointer, leading to a segfault.

This PR fixes this problem by waiting for initialization to complete
when `head.index != tail.index` and the `head.block` is still null. A
similar check exists in `start_recv` for similar reasons.

Fixes #110001

Signed-off-by: Petros Angelatos <[email protected]>
  • Loading branch information
petrosagg committed Apr 8, 2023
1 parent 4f87a63 commit f0d487d
Showing 1 changed file with 12 additions and 0 deletions.
12 changes: 12 additions & 0 deletions library/std/src/sync/mpmc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,18 @@ impl<T> Channel<T> {
let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.load(Ordering::Acquire);

// If we're going to be dropping messages we need to synchronize with initialization
if head >> SHIFT != tail >> SHIFT {
// The block can be null here only if a sender is in the process of initializing the
// channel while another sender managed to send a message by inserting it into the
// semi-initialized channel and advanced the tail.
// In that case, just wait until it gets initialized.
while block.is_null() {
backoff.spin_heavy();
block = self.head.block.load(Ordering::Acquire);
}
}

unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head >> SHIFT != tail >> SHIFT {
Expand Down

0 comments on commit f0d487d

Please sign in to comment.