Skip to content

Commit 8e88893

Browse files
committed
fix: avoid memory leack when waker panic
1 parent 5564fe6 commit 8e88893

File tree

2 files changed

+83
-19
lines changed

2 files changed

+83
-19
lines changed

tokio/src/runtime/time/mod.rs

+8-15
Original file line numberDiff line numberDiff line change
@@ -321,26 +321,19 @@ impl Handle {
321321

322322
while let Some(expiration) = lock.poll(now) {
323323
lock.set_elapsed(expiration.deadline);
324-
// Note that we need to take _all_ of the entries off the list before
325-
// processing any of them. This is important because it's possible that
326-
// those entries might need to be reinserted into the same slot.
327-
//
328-
// This happens only on the highest level, when an entry is inserted
329-
// more than MAX_DURATION into the future. When this happens, we wrap
330-
// around, and process some entries a multiple of MAX_DURATION before
331-
// they actually need to be dropped down a level. We then reinsert them
332-
// back into the same position; we must make sure we don't then process
333-
// those entries again or we'll end up in an infinite loop.
334-
let unguarded_list = lock.take_entries(&expiration);
335324
// It is critical for `GuardedLinkedList` safety that the guard node is
336325
// pinned in memory and is not dropped until the guarded list is dropped.
337326
let guard = TimerShared::new(id);
338327
pin!(guard);
339-
let guard_ptr = NonNull::from(guard.as_ref().get_ref());
340-
// GuardedLinkedList ensures that the concurrent drop of Entry in this slot is safe.
341-
let mut guarded_list = unguarded_list.into_guarded(guard_ptr);
342328

343-
while let Some(entry) = guarded_list.pop_back() {
329+
// * This list will be still guarded by the lock of the Wheel with the specefied id.
330+
// `EntryWaitersList` wrapper makes sure we hold the lock to modify it.
331+
// * This wrapper will empty the list on drop. It is critical for safety
332+
// that we will not leave any list entry with a pointer to the local
333+
// guard node after this function returns / panics.
334+
let mut list = lock.get_waiters_list(&expiration, guard.as_ref(), id, self);
335+
336+
while let Some(entry) = list.pop_back_locked(&mut lock) {
344337
let entry = unsafe { entry.as_ref().handle() };
345338
let deadline = expiration.deadline;
346339
// Try to expire the entry; this is cheap (doesn't synchronize) if

tokio/src/runtime/time/wheel/mod.rs

+75-4
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,68 @@
11
use crate::runtime::time::{TimerHandle, TimerShared};
22
use crate::time::error::InsertError;
3+
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
34

45
mod level;
56
pub(crate) use self::level::Expiration;
67
use self::level::Level;
78

9+
use std::pin::Pin;
810
use std::{array, ptr::NonNull};
911

1012
use super::entry::MAX_SAFE_MILLIS_DURATION;
11-
use super::EntryList;
13+
use super::Handle;
14+
15+
/// List used in `Handle::process_at_sharded_time`. It wraps a guarded linked list
16+
/// and gates the access to it on the lock of the `Wheel` with the specified `wheel_id`.
17+
/// It also empties the list on drop.
18+
pub(super) struct EntryWaitersList<'a> {
19+
// GuardedLinkedList ensures that the concurrent drop of Entry in this slot is safe.
20+
list: GuardedLinkedList<TimerShared, <TimerShared as linked_list::Link>::Target>,
21+
is_empty: bool,
22+
wheel_id: u32,
23+
handle: &'a Handle,
24+
}
25+
26+
impl<'a> Drop for EntryWaitersList<'a> {
27+
fn drop(&mut self) {
28+
// If the list is not empty, we unlink all waiters from it.
29+
// We do not wake the waiters to avoid double panics.
30+
if !self.is_empty {
31+
let _lock = self.handle.inner.lock_sharded_wheel(self.wheel_id);
32+
while self.list.pop_back().is_some() {}
33+
}
34+
}
35+
}
36+
37+
impl<'a> EntryWaitersList<'a> {
38+
fn new(
39+
unguarded_list: LinkedList<TimerShared, <TimerShared as linked_list::Link>::Target>,
40+
guard: Pin<&'a TimerShared>,
41+
wheel_id: u32,
42+
handle: &'a Handle,
43+
) -> Self {
44+
let guard_ptr = NonNull::from(guard.get_ref());
45+
let list = unguarded_list.into_guarded(guard_ptr);
46+
Self {
47+
list,
48+
is_empty: false,
49+
wheel_id,
50+
handle,
51+
}
52+
}
53+
54+
/// Removes the last element from the guarded list. Modifying this list
55+
/// requires an exclusive access to the Wheel with the specified `wheel_id`.
56+
pub(super) fn pop_back_locked(&mut self, _wheel: &mut Wheel) -> Option<NonNull<TimerShared>> {
57+
let result = self.list.pop_back();
58+
if result.is_none() {
59+
// Save information about emptiness to avoid waiting for lock
60+
// in the destructor.
61+
self.is_empty = true;
62+
}
63+
result
64+
}
65+
}
1266

1367
/// Timing wheel implementation.
1468
///
@@ -208,9 +262,26 @@ impl Wheel {
208262
}
209263
}
210264

211-
/// Obtains the list of entries that need processing for the given expiration.
212-
pub(super) fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
213-
self.levels[expiration.level].take_slot(expiration.slot)
265+
/// Obtains the guarded list of entries that need processing for the given expiration.
266+
pub(super) fn get_waiters_list<'a>(
267+
&mut self,
268+
expiration: &Expiration,
269+
guard: Pin<&'a TimerShared>,
270+
wheel_id: u32,
271+
handle: &'a Handle,
272+
) -> EntryWaitersList<'a> {
273+
// Note that we need to take _all_ of the entries off the list before
274+
// processing any of them. This is important because it's possible that
275+
// those entries might need to be reinserted into the same slot.
276+
//
277+
// This happens only on the highest level, when an entry is inserted
278+
// more than MAX_DURATION into the future. When this happens, we wrap
279+
// around, and process some entries a multiple of MAX_DURATION before
280+
// they actually need to be dropped down a level. We then reinsert them
281+
// back into the same position; we must make sure we don't then process
282+
// those entries again or we'll end up in an infinite loop.
283+
let unguarded_list = self.levels[expiration.level].take_slot(expiration.slot);
284+
EntryWaitersList::new(unguarded_list, guard, wheel_id, handle)
214285
}
215286

216287
pub(super) fn occupied_bit_maintain(&mut self, expiration: &Expiration) {

0 commit comments

Comments
 (0)