Skip to content

Commit 95fdb29

Browse files
committed
runtime: skip notified tasks during taskdumps
Fixes #6051.
1 parent a0a58d7 commit 95fdb29

File tree

4 files changed

+101
-19
lines changed

4 files changed

+101
-19
lines changed

tokio/src/runtime/task/mod.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -363,10 +363,14 @@ impl<S: 'static> Task<S> {
363363
}
364364

365365
cfg_taskdump! {
366-
pub(super) fn notify_for_tracing(&self) -> Notified<S> {
367-
self.as_raw().state().transition_to_notified_for_tracing();
368-
// SAFETY: `transition_to_notified_for_tracing` increments the refcount.
369-
unsafe { Notified(Task::new(self.raw)) }
366+
pub(super) fn notify_for_tracing(&self) -> Option<Notified<S>> {
367+
if self.as_raw().state().transition_to_notified_for_tracing() {
368+
// SAFETY: `transition_to_notified_for_tracing` increments the
369+
// refcount.
370+
Some(unsafe { Notified(Task::new(self.raw)) })
371+
} else {
372+
None
373+
}
370374
}
371375
}
372376
}

tokio/src/runtime/task/state.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,16 @@ impl State {
278278
target_os = "linux",
279279
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
280280
))]
281-
pub(super) fn transition_to_notified_for_tracing(&self) {
281+
pub(super) fn transition_to_notified_for_tracing(&self) -> bool {
282282
self.fetch_update_action(|mut snapshot| {
283-
snapshot.set_notified();
284-
snapshot.ref_inc();
285-
((), Some(snapshot))
286-
});
283+
if snapshot.is_notified() {
284+
(false, None)
285+
} else {
286+
snapshot.set_notified();
287+
snapshot.ref_inc();
288+
(true, Some(snapshot))
289+
}
290+
})
287291
}
288292

289293
/// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.

tokio/src/runtime/task/trace/mod.rs

+21-10
Original file line numberDiff line numberDiff line change
@@ -272,14 +272,19 @@ pub(in crate::runtime) fn trace_current_thread(
272272
injection: &Inject<Arc<current_thread::Handle>>,
273273
) -> Vec<Trace> {
274274
// clear the local and injection queues
275-
local.clear();
275+
276+
let mut dequeued = Vec::new();
277+
278+
while let Some(task) = local.pop_back() {
279+
dequeued.push(task);
280+
}
276281

277282
while let Some(task) = injection.pop() {
278-
drop(task);
283+
dequeued.push(task);
279284
}
280285

281286
// precondition: We have drained the tasks from the injection queue.
282-
trace_owned(owned)
287+
trace_owned(owned, dequeued)
283288
}
284289

285290
cfg_rt_multi_thread! {
@@ -299,22 +304,24 @@ cfg_rt_multi_thread! {
299304
synced: &Mutex<Synced>,
300305
injection: &Shared<Arc<multi_thread::Handle>>,
301306
) -> Vec<Trace> {
307+
let mut dequeued = Vec::new();
308+
302309
// clear the local queue
303310
while let Some(notified) = local.pop() {
304-
drop(notified);
311+
dequeued.push(notified);
305312
}
306313

307314
// clear the injection queue
308315
let mut synced = synced.lock();
309316
while let Some(notified) = injection.pop(&mut synced.inject) {
310-
drop(notified);
317+
dequeued.push(notified);
311318
}
312319

313320
drop(synced);
314321

315322
// precondition: we have drained the tasks from the local and injection
316323
// queues.
317-
trace_owned(owned)
324+
trace_owned(owned, dequeued)
318325
}
319326
}
320327

@@ -324,12 +331,16 @@ cfg_rt_multi_thread! {
324331
///
325332
/// This helper presumes exclusive access to each task. The tasks must not exist
326333
/// in any other queue.
327-
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>) -> Vec<Trace> {
334+
fn trace_owned<S: Schedule>(owned: &OwnedTasks<S>, dequeued: Vec<Notified<S>>) -> Vec<Trace> {
328335
// notify each task
329-
let mut tasks = vec![];
336+
let mut tasks = dequeued;
330337
owned.for_each(|task| {
331-
// notify the task (and thus make it poll-able) and stash it
332-
tasks.push(task.notify_for_tracing());
338+
// Notify the task (and thus make it poll-able) and stash it. This fails
339+
// if the task is already notified. In these cases, we skip tracing the
340+
// task.
341+
if let Some(notified) = task.notify_for_tracing() {
342+
tasks.push(notified);
343+
}
333344
// we do not poll it here since we hold a lock on `owned` and the task
334345
// may complete and need to remove itself from `owned`.
335346
});

tokio/tests/dump.rs

+63
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,73 @@ mod future_completes_during_trace {
147147
async fn dump() {
148148
let handle = Handle::current();
149149
let _dump = handle.dump().await;
150+
tokio::task::yield_now().await;
150151
}
151152

152153
rt.block_on(async {
153154
let _ = tokio::join!(tokio::spawn(complete_during_trace()), dump());
154155
});
155156
}
156157
}
158+
159+
/// Regression tests for #6051.
160+
///
161+
/// These tests ensure that tasks notified outside of a worker will not be
162+
/// traced, since doing so will un-set their notified bit prior to them being
163+
/// run and panic.
164+
mod notified_during_tracing {
165+
use super::*;
166+
167+
fn test(rt: tokio::runtime::Runtime) {
168+
async fn dump() {
169+
loop {
170+
let handle = Handle::current();
171+
let _dump = handle.dump().await;
172+
// TODO: This tests hangs with this commented out. Why?
173+
// tokio::task::yield_now().await;
174+
}
175+
}
176+
177+
rt.block_on(async {
178+
let timer = tokio::spawn(async {
179+
loop {
180+
tokio::time::sleep(tokio::time::Duration::from_nanos(1)).await;
181+
}
182+
});
183+
184+
let timeout = async {
185+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
186+
};
187+
188+
tokio::select!(
189+
biased;
190+
_ = timeout => {},
191+
_ = timer => {},
192+
_ = dump() => {},
193+
);
194+
});
195+
}
196+
197+
#[test]
198+
// TODO: This currently hangs, with or without the regression fix. Adding a
199+
// `yield_now` to the end of `fn dump()` above fixes the issue, but why?
200+
fn current_thread() {
201+
let rt = runtime::Builder::new_current_thread()
202+
.enable_all()
203+
.build()
204+
.unwrap();
205+
206+
test(rt)
207+
}
208+
209+
#[test]
210+
fn multi_thread() {
211+
let rt = runtime::Builder::new_multi_thread()
212+
.enable_all()
213+
.worker_threads(3)
214+
.build()
215+
.unwrap();
216+
217+
test(rt)
218+
}
219+
}

0 commit comments

Comments
 (0)