From 92e084509201996c68ea514e8aae42d1387b6fab Mon Sep 17 00:00:00 2001 From: Imbris <2002109+Imberflur@users.noreply.github.com> Date: Thu, 3 Oct 2024 12:11:15 -0400 Subject: [PATCH] Fix use after free of task in FuturesUnordered when dropped future panics (#2886) --- .github/workflows/ci.yml | 10 +++- .../src/stream/futures_unordered/mod.rs | 51 +++++++++++++------ .../futures_unordered/ready_to_run_queue.rs | 2 + futures/tests/stream_futures_unordered.rs | 23 +++++++++ 4 files changed, 68 insertions(+), 18 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5a48f5340..aafcfee05 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -246,10 +246,16 @@ jobs: - uses: taiki-e/checkout-action@v1 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - - run: cargo miri test --workspace --all-features + - run: cargo miri test --workspace --all-features -- --skip panic_on_drop_fut env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout + # This test is expected to leak. + - run: cargo miri test --workspace --all-features --test stream_futures_unordered -- panic_on_drop_fut + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks + RUSTDOCFLAGS: ${{ env.RUSTDOCFLAGS }} -Z randomize-layout + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout san: name: cargo test -Z sanitizer=${{ matrix.sanitizer }} @@ -268,7 +274,7 @@ jobs: run: rustup toolchain install nightly --component rust-src && rustup default nightly # https://github.com/google/sanitizers/issues/1716 / https://github.com/actions/runner-images/issues/9491 - run: sudo sysctl vm.mmap_rnd_bits=28 - - run: cargo -Z build-std test --workspace --all-features --target x86_64-unknown-linux-gnu --lib --tests + - run: cargo -Z build-std test --workspace --all-features --target x86_64-unknown-linux-gnu --lib --tests -- --skip panic_on_drop_fut env: # TODO: Once `cfg(sanitize = "..")` is stable, replace # `cfg(futures_sanitizer)` with `cfg(sanitize = "..")` and remove diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 5d4b3f295..ca62b10b9 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -256,16 +256,6 @@ impl FuturesUnordered { // `wake` from doing any work in the future let prev = task.queued.swap(true, SeqCst); - // Drop the future, even if it hasn't finished yet. This is safe - // because we're dropping the future on the thread that owns - // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and - // such. - unsafe { - // Set to `None` rather than `take()`ing to prevent moving the - // future. - *task.future.get() = None; - } - // If the queued flag was previously set, then it means that this task // is still in our internal ready to run queue. We then transfer // ownership of our reference count to the ready to run queue, and it'll @@ -277,8 +267,25 @@ impl FuturesUnordered { // enqueue the task, so our task will never see the ready to run queue // again. The task itself will be deallocated once all reference counts // have been dropped elsewhere by the various wakers that contain it. - if prev { - mem::forget(task); + // + // Use ManuallyDrop to transfer the reference count ownership before + // dropping the future so unwinding won't release the reference count. + let md_slot; + let task = if prev { + md_slot = mem::ManuallyDrop::new(task); + &*md_slot + } else { + &task + }; + + // Drop the future, even if it hasn't finished yet. This is safe + // because we're dropping the future on the thread that owns + // `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and + // such. + unsafe { + // Set to `None` rather than `take()`ing to prevent moving the + // future. + *task.future.get() = None; } } @@ -567,15 +574,27 @@ impl FuturesUnordered { impl Drop for FuturesUnordered { fn drop(&mut self) { + // Before the strong reference to the queue is dropped we need all + // futures to be dropped. See note at the bottom of this method. + // + // If there is a panic before this completes, we leak the queue. + struct LeakQueueOnDrop<'a, Fut>(&'a mut FuturesUnordered); + impl Drop for LeakQueueOnDrop<'_, Fut> { + fn drop(&mut self) { + mem::forget(Arc::clone(&self.0.ready_to_run_queue)); + } + } + let guard = LeakQueueOnDrop(self); // When a `FuturesUnordered` is dropped we want to drop all futures // associated with it. At the same time though there may be tons of // wakers flying around which contain `Task` references // inside them. We'll let those naturally get deallocated. - while !self.head_all.get_mut().is_null() { - let head = *self.head_all.get_mut(); - let task = unsafe { self.unlink(head) }; - self.release_task(task); + while !guard.0.head_all.get_mut().is_null() { + let head = *guard.0.head_all.get_mut(); + let task = unsafe { guard.0.unlink(head) }; + guard.0.release_task(task); } + mem::forget(guard); // safe to release strong reference to queue // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 77abdf4ea..6ffaf554d 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -27,6 +27,8 @@ pub(super) struct ReadyToRunQueue { /// An MPSC queue into which the tasks containing the futures are inserted /// whenever the future inside is scheduled for polling. impl ReadyToRunQueue { + // FIXME: this takes raw pointer without safety conditions. + /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. pub(super) fn enqueue(&self, task: *const Task) { unsafe { diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 9243c437c..e25e755cd 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -406,3 +406,26 @@ fn clear_in_loop() { } }); } + +// https://github.com/rust-lang/futures-rs/issues/2863#issuecomment-2219441515 +#[test] +#[should_panic] +fn panic_on_drop_fut() { + struct BadFuture; + + impl Drop for BadFuture { + fn drop(&mut self) { + panic!() + } + } + + impl Future for BadFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + Poll::Pending + } + } + + FuturesUnordered::default().push(BadFuture); +}