Skip to content

Commit f80d191

Browse files
committed
task: Drop the join waker of a task eagerly when the JoinHandle gets
dropped or the task completes Currently, the waker registered with a JoinHandle is not dropped until the task allocation is dropped. This behaviour may cause the memory allocated by a task to not be freed when in the case of two tasks awaiting each others JoinHandle. This commit changes the behaviour by actively dropping the waker when the JoinHandle gets dropped (or the task completes in some cases). Closes #6505.
1 parent c07257f commit f80d191

File tree

4 files changed

+111
-17
lines changed

4 files changed

+111
-17
lines changed

tokio/src/runtime/task/harness.rs

+44-4
Original file line numberDiff line numberDiff line change
@@ -284,9 +284,11 @@ where
284284
}
285285

286286
pub(super) fn drop_join_handle_slow(self) {
287-
// Try to unset `JOIN_INTEREST`. This must be done as a first step in
287+
// Try to unset `JOIN_INTEREST` and `JOIN_WAKER`. This must be done as a first step in
288288
// case the task concurrently completed.
289-
if self.state().unset_join_interested().is_err() {
289+
let snapshot = self.state().transition_to_join_handle_dropped();
290+
291+
if snapshot.is_complete() {
290292
// It is our responsibility to drop the output. This is critical as
291293
// the task output may not be `Send` and as such must remain with
292294
// the scheduler or `JoinHandle`. i.e. if the output remains in the
@@ -301,6 +303,25 @@ where
301303
}));
302304
}
303305

306+
if !snapshot.is_join_waker_set() {
307+
// If the JOIN_WAKER flag is unset at this point, the task is either
308+
// already terminal or not complete so the `JoinHandle` is responsible
309+
// for dropping the waker.
310+
// Safety:
311+
// If the JOIN_WAKER bit is not set the join handle has exclusive
312+
// access to the waker as per rule 2 in task/mod.rs.
313+
// This can only be the case at this point in two scenarios:
314+
// 1. The task completed and the runtime unset `JOIN_WAKER` flag
315+
// after accessing the waker during task completion. So the
316+
// `JoinHandle` is the only one to access the join waker here.
317+
// 2. The task is not completed so the `JoinHandle` was able to unset
318+
// `JOIN_WAKER` bit itself to get mutable access to the waker.
319+
// The runtime will not access the waker when this flag is unset.
320+
unsafe {
321+
self.trailer().set_waker(None);
322+
}
323+
}
324+
304325
// Drop the `JoinHandle` reference, possibly deallocating the task
305326
self.drop_reference();
306327
}
@@ -311,7 +332,6 @@ where
311332
fn complete(self) {
312333
// The future has completed and its output has been written to the task
313334
// stage. We transition from running to complete.
314-
315335
let snapshot = self.state().transition_to_complete();
316336

317337
// We catch panics here in case dropping the future or waking the
@@ -320,13 +340,33 @@ where
320340
if !snapshot.is_join_interested() {
321341
// The `JoinHandle` is not interested in the output of
322342
// this task. It is our responsibility to drop the
323-
// output.
343+
// output. The join waker was already dropped by the
344+
// `JoinHandle` before.
324345
self.core().drop_future_or_output();
325346
} else if snapshot.is_join_waker_set() {
326347
// Notify the waker. Reading the waker field is safe per rule 4
327348
// in task/mod.rs, since the JOIN_WAKER bit is set and the call
328349
// to transition_to_complete() above set the COMPLETE bit.
329350
self.trailer().wake_join();
351+
352+
// If JOIN_INTEREST is still set at this point the `JoinHandle`
353+
// was not dropped since setting COMPLETE so we unset JOIN_WAKER
354+
// to give the responsibility of dropping the join waker back to
355+
// the `JoinHandle`. `JoinHandle` is able to drop the waker when
356+
// itself gets dropped.
357+
if self.state().unset_waker_if_join_interested().is_err() {
358+
// Unsetting JOIN_WAKER flag will fail if JOIN_INTERESTED is
359+
// not set to indicate that the runtime has the responsibility
360+
// to drop the join waker here as per rule 7 in task/mod.rs.
361+
// Safety:
362+
// If JOIN_INTEREST got unset since setting COMPLETE we are
363+
// the only ones to have access to the join waker and need
364+
// to drop it here because the `JoinHandle` of the task
365+
// already got dropped.
366+
unsafe {
367+
self.trailer().set_waker(None);
368+
}
369+
}
330370
}
331371
}));
332372

tokio/src/runtime/task/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,15 @@
9494
//! `JoinHandle` needs to (i) successfully set `JOIN_WAKER` to zero if it is
9595
//! not already zero to gain exclusive access to the waker field per rule
9696
//! 2, (ii) write a waker, and (iii) successfully set `JOIN_WAKER` to one.
97+
//! If the `JoinHandle` unsets `JOIN_WAKER` in the process of being dropped
98+
//! to clear the waker field, only steps (i) and (ii) are relevant.
9799
//!
98100
//! 6. The `JoinHandle` can change `JOIN_WAKER` only if COMPLETE is zero (i.e.
99101
//! the task hasn't yet completed).
100102
//!
103+
//! 7. If JOIN_INTEREST is zero and COMPLETE is one, then the runtime has
104+
//! exclusive (mutable) access to the waker field.
105+
//!
101106
//! Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a
102107
//! race. If step (i) fails, then the attempt to write a waker is aborted. If
103108
//! step (iii) fails because COMPLETE is set to one by another thread after

tokio/src/runtime/task/state.rs

+32-13
Original file line numberDiff line numberDiff line change
@@ -371,22 +371,21 @@ impl State {
371371
.map_err(|_| ())
372372
}
373373

374-
/// Tries to unset the `JOIN_INTEREST` flag.
375-
///
376-
/// Returns `Ok` if the operation happens before the task transitions to a
377-
/// completed state, `Err` otherwise.
378-
pub(super) fn unset_join_interested(&self) -> UpdateResult {
379-
self.fetch_update(|curr| {
380-
assert!(curr.is_join_interested());
374+
/// Unsets the `JOIN_INTEREST` flag. If `COMPLETE` is not set, the `JOIN_WAKER`
375+
/// flag is also unset.
376+
pub(super) fn transition_to_join_handle_dropped(&self) -> Snapshot {
377+
self.fetch_update_action(|mut snapshot| {
378+
assert!(snapshot.is_join_interested());
381379

382-
if curr.is_complete() {
383-
return None;
384-
}
380+
snapshot.unset_join_interested();
385381

386-
let mut next = curr;
387-
next.unset_join_interested();
382+
if !snapshot.is_complete() {
383+
// If `COMPLETE` is unset we also unset `JOIN_WAKER` to give the
384+
// `JoinHandle` exclusive access to the waker to drop it.
385+
snapshot.unset_join_waker();
386+
}
388387

389-
Some(next)
388+
(snapshot, Some(snapshot))
390389
})
391390
}
392391

@@ -430,6 +429,26 @@ impl State {
430429
})
431430
}
432431

432+
/// Unsets the `JOIN_WAKER` bit only if the `JOIN_INTEREST` is still set.
433+
///
434+
/// Returns `Ok` has been unset, `Err` otherwise. This operation requires
435+
/// the task to be completed.
436+
pub(super) fn unset_waker_if_join_interested(&self) -> UpdateResult {
437+
self.fetch_update(|curr| {
438+
assert!(curr.is_complete());
439+
assert!(curr.is_join_waker_set());
440+
441+
if !curr.is_join_interested() {
442+
return None;
443+
}
444+
445+
let mut next = curr;
446+
next.unset_join_waker();
447+
448+
Some(next)
449+
})
450+
}
451+
433452
pub(super) fn ref_inc(&self) {
434453
use std::process;
435454
use std::sync::atomic::Ordering::Relaxed;

tokio/src/runtime/tests/loom_multi_thread.rs

+30
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod yield_now;
1010
/// In order to speed up the C
1111
use crate::runtime::tests::loom_oneshot as oneshot;
1212
use crate::runtime::{self, Runtime};
13+
use crate::sync::mpsc::channel;
1314
use crate::{spawn, task};
1415
use tokio_test::assert_ok;
1516

@@ -459,3 +460,32 @@ impl<T: Future> Future for Track<T> {
459460
})
460461
}
461462
}
463+
464+
#[test]
465+
fn drop_tasks_with_reference_cycle() {
466+
loom::model(|| {
467+
let pool = mk_pool(2);
468+
469+
pool.block_on(async move {
470+
let (tx, mut rx) = channel(1);
471+
472+
let (a_closer, mut wait_for_close_a) = channel::<()>(1);
473+
let (b_closer, mut wait_for_close_b) = channel::<()>(1);
474+
475+
let a = spawn(async move {
476+
let b = rx.recv().await.unwrap();
477+
478+
futures::future::select(std::pin::pin!(b), std::pin::pin!(a_closer.send(()))).await;
479+
});
480+
481+
let b = spawn(async move {
482+
let _ = a.await;
483+
let _ = b_closer.send(()).await;
484+
});
485+
486+
tx.send(b).await.unwrap();
487+
488+
futures::future::join(wait_for_close_a.recv(), wait_for_close_b.recv()).await;
489+
});
490+
});
491+
}

0 commit comments

Comments
 (0)