Skip to content

Commit

Permalink
FuturesUnordered: Fix clear implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Dec 24, 2023
1 parent 04d01a0 commit e6735f3
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 40 deletions.
21 changes: 6 additions & 15 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,20 +558,7 @@ impl<Fut> Debug for FuturesUnordered<Fut> {
impl<Fut> FuturesUnordered<Fut> {
/// Clears the set, removing all futures.
pub fn clear(&mut self) {
self.clear_head_all();

// we just cleared all the tasks, and we have &mut self, so this is safe.
unsafe { self.ready_to_run_queue.clear() };

self.is_terminated.store(false, Relaxed);
}

fn clear_head_all(&mut self) {
while !self.head_all.get_mut().is_null() {
let head = *self.head_all.get_mut();
let task = unsafe { self.unlink(head) };
self.release_task(task);
}
*self = Self::new();
}
}

Expand All @@ -581,7 +568,11 @@ impl<Fut> Drop for FuturesUnordered<Fut> {
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
self.clear_head_all();
while !self.head_all.get_mut().is_null() {
let head = *self.head_all.get_mut();
let task = unsafe { self.unlink(head) };
self.release_task(task);
}

// Note that at this point we could still have a bunch of tasks in the
// ready to run queue. None of those tasks, however, have futures
Expand Down
37 changes: 12 additions & 25 deletions futures-util/src/stream/futures_unordered/ready_to_run_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,38 +85,25 @@ impl<Fut> ReadyToRunQueue<Fut> {
pub(super) fn stub(&self) -> *const Task<Fut> {
Arc::as_ptr(&self.stub)
}

// Clear the queue of tasks.
//
// Note that each task has a strong reference count associated with it
// which is owned by the ready to run queue. This method just pulls out
// tasks and drops their refcounts.
//
// # Safety
//
// - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear)
// - The caller **must** guarantee unique access to `self`
pub(crate) unsafe fn clear(&self) {
loop {
// SAFETY: We have the guarantee of mutual exclusion required by `dequeue`.
match self.dequeue() {
Dequeue::Empty => break,
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
}
}
}
}

impl<Fut> Drop for ReadyToRunQueue<Fut> {
fn drop(&mut self) {
// Once we're in the destructor for `Inner<Fut>` we need to clear out
// the ready to run queue of tasks if there's anything left in there.

// All tasks have had their futures dropped already by the `FuturesUnordered`
// destructor above, and we have &mut self, so this is safe.
//
// Note that each task has a strong reference count associated with it
// which is owned by the ready to run queue. All tasks should have had
// their futures dropped already by the `FuturesUnordered` destructor
// above, so we're just pulling out tasks and dropping their refcounts.
unsafe {
self.clear();
loop {
match self.dequeue() {
Dequeue::Empty => break,
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
}
}
}
}
}
25 changes: 25 additions & 0 deletions futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,28 @@ fn clear() {
tasks.clear();
assert!(!tasks.is_terminated());
}

// https://github.com/rust-lang/futures-rs/issues/2529#issuecomment-997290279
#[test]
fn clear_in_loop() {
const N: usize =
if cfg!(miri) || option_env!("QEMU_LD_PREFIX").is_some() { 100 } else { 10_000 };
futures::executor::block_on(async {
async fn task() {
let (s, r) = oneshot::channel();
std::thread::spawn(|| {
std::thread::sleep(std::time::Duration::from_micros(100));
let _ = s.send(());
});
r.await.unwrap()
}
let mut futures = FuturesUnordered::new();
for _ in 0..N {
for _ in 0..24 {
futures.push(task());
}
let _ = futures.next().await;
futures.clear();
}
});
}

0 comments on commit e6735f3

Please sign in to comment.