From 8b678f40fedd9e4c6c925a6d4a7ebce8adb3c4a1 Mon Sep 17 00:00:00 2001 From: Imbris Date: Tue, 1 Oct 2024 01:57:43 -0400 Subject: [PATCH] Fix use after free of task in FuturesUnordered (caused by accidental reference count release when dropping a future panics and unwinds) --- .../src/stream/futures_unordered/mod.rs | 68 +++++++++++++------ .../futures_unordered/ready_to_run_queue.rs | 2 + futures/tests/stream_futures_unordered.rs | 23 +++++++ 3 files changed, 73 insertions(+), 20 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 2d4f15158..d4ee16937 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -255,16 +255,6 @@ impl FuturesUnordered { // `wake` from doing any work in the future let prev = task.queued.swap(true, SeqCst); - // Drop the future, even if it hasn't finished yet. This is safe - // because we're dropping the future on the thread that owns - // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and - // such. - unsafe { - // Set to `None` rather than `take()`ing to prevent moving the - // future. - *task.future.get() = None; - } - // If the queued flag was previously set, then it means that this task // is still in our internal ready to run queue. We then transfer // ownership of our reference count to the ready to run queue, and it'll @@ -276,8 +266,25 @@ impl FuturesUnordered { // enqueue the task, so our task will never see the ready to run queue // again. The task itself will be deallocated once all reference counts // have been dropped elsewhere by the various wakers that contain it. - if prev { - mem::forget(task); + // + // Use ManuallyDrop to transfer the reference count ownership before + // dropping the future so unwinding won't release the reference count. + let md_slot; + let task = if prev { + md_slot = mem::ManuallyDrop::new(task); + &*md_slot + } else { + &task + }; + + // Drop the future, even if it hasn't finished yet. This is safe + // because we're dropping the future on the thread that owns + // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and + // such. + unsafe { + // Set to `None` rather than `take()`ing to prevent moving the + // future. + *task.future.get() = None; } } @@ -566,15 +573,36 @@ impl FuturesUnordered { impl Drop for FuturesUnordered { fn drop(&mut self) { - // When a `FuturesUnordered` is dropped we want to drop all futures - // associated with it. At the same time though there may be tons of - // wakers flying around which contain `Task` references - // inside them. We'll let those naturally get deallocated. - while !self.head_all.get_mut().is_null() { - let head = *self.head_all.get_mut(); - let task = unsafe { self.unlink(head) }; - self.release_task(task); + // Before the strong reference to the queue is dropped we need all + // futures to be dropped. See note at the bottom of this method. + // + // This ensures we drop all futures even in the case of one drop + // panicking and unwinding. + // + // If a second future panics, that will trigger an abort from panicking + // while panicking. + struct DropGuard<'a, Fut>(&'a mut FuturesUnordered); + impl DropGuard<'_, Fut> { + fn drop_futures(&mut self) { + // When a `FuturesUnordered` is dropped we want to drop all futures + // associated with it. At the same time though there may be tons of + // wakers flying around which contain `Task` references + // inside them. We'll let those naturally get deallocated. + while !self.0.head_all.get_mut().is_null() { + let head = *self.0.head_all.get_mut(); + let task = unsafe { self.0.unlink(head) }; + self.0.release_task(task); + } + } + } + impl Drop for DropGuard<'_, Fut> { + fn drop(&mut self) { + self.drop_futures(); + } } + let mut guard = DropGuard(self); + guard.drop_futures(); + mem::forget(guard); // no need to check head_all again // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 77abdf4ea..6ffaf554d 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -27,6 +27,8 @@ pub(super) struct ReadyToRunQueue { /// An MPSC queue into which the tasks containing the futures are inserted /// whenever the future inside is scheduled for polling. impl ReadyToRunQueue { + // FIXME: this takes raw pointer without safety conditions. + /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. pub(super) fn enqueue(&self, task: *const Task) { unsafe { diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 727feb062..6be7f05be 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -406,3 +406,26 @@ fn clear_in_loop() { } }); } + +// https://github.com/rust-lang/futures-rs/issues/2863#issuecomment-2219441515 +#[test] +#[should_panic] +fn panic_on_drop_fut() { + struct BadFuture; + + impl Drop for BadFuture { + fn drop(&mut self) { + panic!() + } + } + + impl Future for BadFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + Poll::Pending + } + } + + FuturesUnordered::default().push(BadFuture); +}