Skip to content

Commit

Permalink
Fix use after free of task in FuturesUnordered (caused by accidental …
Browse files Browse the repository at this point in the history
…reference count release when dropping a future panics and unwinds)
  • Loading branch information
Imberflur committed Oct 1, 2024
1 parent 99ebe58 commit 8b678f4
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 20 deletions.
68 changes: 48 additions & 20 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,6 @@ impl<Fut> FuturesUnordered<Fut> {
// `wake` from doing any work in the future
let prev = task.queued.swap(true, SeqCst);

// Drop the future, even if it hasn't finished yet. This is safe
// because we're dropping the future on the thread that owns
// `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
// such.
unsafe {
// Set to `None` rather than `take()`ing to prevent moving the
// future.
*task.future.get() = None;
}

// If the queued flag was previously set, then it means that this task
// is still in our internal ready to run queue. We then transfer
// ownership of our reference count to the ready to run queue, and it'll
Expand All @@ -276,8 +266,25 @@ impl<Fut> FuturesUnordered<Fut> {
// enqueue the task, so our task will never see the ready to run queue
// again. The task itself will be deallocated once all reference counts
// have been dropped elsewhere by the various wakers that contain it.
if prev {
mem::forget(task);
//
// Use ManuallyDrop to transfer the reference count ownership before
// dropping the future so unwinding won't release the reference count.
let md_slot;
let task = if prev {
md_slot = mem::ManuallyDrop::new(task);
&*md_slot
} else {
&task
};

// Drop the future, even if it hasn't finished yet. This is safe
// because we're dropping the future on the thread that owns
// `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
// such.
unsafe {
// Set to `None` rather than `take()`ing to prevent moving the
// future.
*task.future.get() = None;
}
}

Expand Down Expand Up @@ -566,15 +573,36 @@ impl<Fut> FuturesUnordered<Fut> {

impl<Fut> Drop for FuturesUnordered<Fut> {
fn drop(&mut self) {
// When a `FuturesUnordered` is dropped we want to drop all futures
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
while !self.head_all.get_mut().is_null() {
let head = *self.head_all.get_mut();
let task = unsafe { self.unlink(head) };
self.release_task(task);
// Before the strong reference to the queue is dropped we need all
// futures to be dropped. See note at the bottom of this method.
//
// This ensures we drop all futures even in the case of one drop
// panicking and unwinding.
//
// If a second future panics, that will trigger an abort from panicking
// while panicking.
struct DropGuard<'a, Fut>(&'a mut FuturesUnordered<Fut>);
impl<Fut> DropGuard<'_, Fut> {
fn drop_futures(&mut self) {
// When a `FuturesUnordered` is dropped we want to drop all futures
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
while !self.0.head_all.get_mut().is_null() {
let head = *self.0.head_all.get_mut();
let task = unsafe { self.0.unlink(head) };
self.0.release_task(task);
}
}
}
impl<Fut> Drop for DropGuard<'_, Fut> {
fn drop(&mut self) {
self.drop_futures();
}
}
let mut guard = DropGuard(self);
guard.drop_futures();
mem::forget(guard); // no need to check head_all again

// Note that at this point we could still have a bunch of tasks in the
// ready to run queue. None of those tasks, however, have futures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub(super) struct ReadyToRunQueue<Fut> {
/// An MPSC queue into which the tasks containing the futures are inserted
/// whenever the future inside is scheduled for polling.
impl<Fut> ReadyToRunQueue<Fut> {
// FIXME: this takes raw pointer without safety conditions.

/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
pub(super) fn enqueue(&self, task: *const Task<Fut>) {
unsafe {
Expand Down
23 changes: 23 additions & 0 deletions futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,26 @@ fn clear_in_loop() {
}
});
}

// https://github.com/rust-lang/futures-rs/issues/2863#issuecomment-2219441515
#[test]
#[should_panic]
fn panic_on_drop_fut() {
struct BadFuture;

impl Drop for BadFuture {
fn drop(&mut self) {
panic!()
}
}

impl Future for BadFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}

FuturesUnordered::default().push(BadFuture);
}

0 comments on commit 8b678f4

Please sign in to comment.