Skip to content

Commit

Permalink
Fix use after free of task in FuturesUnordered when dropped future pa…
Browse files Browse the repository at this point in the history
…nics (#2886)
  • Loading branch information
Imberflur authored and taiki-e committed Oct 5, 2024
1 parent 33c46b3 commit f00e7af
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 18 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,16 @@ jobs:
- uses: taiki-e/checkout-action@v1
- name: Install Rust
run: rustup toolchain install nightly --component miri && rustup default nightly
- run: cargo miri test --workspace --all-features
- run: cargo miri test --workspace --all-features -- --skip panic_on_drop_fut
env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
# This test is expected to leak.
- run: cargo miri test --workspace --all-features --test stream_futures_unordered -- panic_on_drop_fut
env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks
RUSTDOCFLAGS: ${{ env.RUSTDOCFLAGS }} -Z randomize-layout
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout

san:
name: cargo test -Z sanitizer=${{ matrix.sanitizer }}
Expand All @@ -268,7 +274,7 @@ jobs:
run: rustup toolchain install nightly --component rust-src && rustup default nightly
# https://github.com/google/sanitizers/issues/1716 / https://github.com/actions/runner-images/issues/9491
- run: sudo sysctl vm.mmap_rnd_bits=28
- run: cargo -Z build-std test --workspace --all-features --target x86_64-unknown-linux-gnu --lib --tests
- run: cargo -Z build-std test --workspace --all-features --target x86_64-unknown-linux-gnu --lib --tests -- --skip panic_on_drop_fut
env:
# TODO: Once `cfg(sanitize = "..")` is stable, replace
# `cfg(futures_sanitizer)` with `cfg(sanitize = "..")` and remove
Expand Down
51 changes: 35 additions & 16 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,6 @@ impl<Fut> FuturesUnordered<Fut> {
// `wake` from doing any work in the future
let prev = task.queued.swap(true, SeqCst);

// Drop the future, even if it hasn't finished yet. This is safe
// because we're dropping the future on the thread that owns
// `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
// such.
unsafe {
// Set to `None` rather than `take()`ing to prevent moving the
// future.
*task.future.get() = None;
}

// If the queued flag was previously set, then it means that this task
// is still in our internal ready to run queue. We then transfer
// ownership of our reference count to the ready to run queue, and it'll
Expand All @@ -277,8 +267,25 @@ impl<Fut> FuturesUnordered<Fut> {
// enqueue the task, so our task will never see the ready to run queue
// again. The task itself will be deallocated once all reference counts
// have been dropped elsewhere by the various wakers that contain it.
if prev {
mem::forget(task);
//
// Use ManuallyDrop to transfer the reference count ownership before
// dropping the future so unwinding won't release the reference count.
let md_slot;
let task = if prev {
md_slot = mem::ManuallyDrop::new(task);
&*md_slot
} else {
&task
};

// Drop the future, even if it hasn't finished yet. This is safe
// because we're dropping the future on the thread that owns
// `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
// such.
unsafe {
// Set to `None` rather than `take()`ing to prevent moving the
// future.
*task.future.get() = None;
}
}

Expand Down Expand Up @@ -567,15 +574,27 @@ impl<Fut> FuturesUnordered<Fut> {

impl<Fut> Drop for FuturesUnordered<Fut> {
fn drop(&mut self) {
// Before the strong reference to the queue is dropped we need all
// futures to be dropped. See note at the bottom of this method.
//
// If there is a panic before this completes, we leak the queue.
struct LeakQueueOnDrop<'a, Fut>(&'a mut FuturesUnordered<Fut>);
impl<Fut> Drop for LeakQueueOnDrop<'_, Fut> {
fn drop(&mut self) {
mem::forget(Arc::clone(&self.0.ready_to_run_queue));
}
}
let guard = LeakQueueOnDrop(self);
// When a `FuturesUnordered` is dropped we want to drop all futures
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
while !self.head_all.get_mut().is_null() {
let head = *self.head_all.get_mut();
let task = unsafe { self.unlink(head) };
self.release_task(task);
while !guard.0.head_all.get_mut().is_null() {
let head = *guard.0.head_all.get_mut();
let task = unsafe { guard.0.unlink(head) };
guard.0.release_task(task);
}
mem::forget(guard); // safe to release strong reference to queue

// Note that at this point we could still have a bunch of tasks in the
// ready to run queue. None of those tasks, however, have futures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub(super) struct ReadyToRunQueue<Fut> {
/// An MPSC queue into which the tasks containing the futures are inserted
/// whenever the future inside is scheduled for polling.
impl<Fut> ReadyToRunQueue<Fut> {
// FIXME: this takes raw pointer without safety conditions.

/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
pub(super) fn enqueue(&self, task: *const Task<Fut>) {
unsafe {
Expand Down
23 changes: 23 additions & 0 deletions futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,26 @@ fn clear_in_loop() {
}
});
}

// https://github.com/rust-lang/futures-rs/issues/2863#issuecomment-2219441515
#[test]
#[should_panic]
fn panic_on_drop_fut() {
struct BadFuture;

impl Drop for BadFuture {
fn drop(&mut self) {
panic!()
}
}

impl Future for BadFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}

FuturesUnordered::default().push(BadFuture);
}

0 comments on commit f00e7af

Please sign in to comment.