Skip to content

Commit ce1c74f

Browse files
authored
metrics: fix deadlock in injection_queue_depth_multi_thread test (#6916)
1 parent 28c9a14 commit ce1c74f

File tree

1 file changed

+41
-25
lines changed

1 file changed

+41
-25
lines changed

tokio/tests/rt_metrics.rs

+41-25
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
#![warn(rust_2018_idioms)]
33
#![cfg(all(feature = "full", not(target_os = "wasi"), target_has_atomic = "64"))]
44

5-
use std::sync::{Arc, Barrier};
5+
use std::sync::mpsc;
6+
use std::time::Duration;
67
use tokio::runtime::Runtime;
78

89
#[test]
@@ -68,36 +69,51 @@ fn global_queue_depth_multi_thread() {
6869
let rt = threaded();
6970
let metrics = rt.metrics();
7071

71-
let barrier1 = Arc::new(Barrier::new(3));
72-
let barrier2 = Arc::new(Barrier::new(3));
73-
74-
// Spawn a task per runtime worker to block it.
75-
for _ in 0..2 {
76-
let barrier1 = barrier1.clone();
77-
let barrier2 = barrier2.clone();
78-
rt.spawn(async move {
79-
barrier1.wait();
80-
barrier2.wait();
81-
});
82-
}
83-
84-
barrier1.wait();
72+
for _ in 0..10 {
73+
if let Ok(_blocking_tasks) = try_block_threaded(&rt) {
74+
for i in 0..10 {
75+
assert_eq!(i, metrics.global_queue_depth());
76+
rt.spawn(async {});
77+
}
8578

86-
let mut fail: Option<String> = None;
87-
for i in 0..10 {
88-
let depth = metrics.global_queue_depth();
89-
if i != depth {
90-
fail = Some(format!("{i} is not equal to {depth}"));
91-
break;
79+
return;
9280
}
93-
rt.spawn(async {});
9481
}
9582

96-
barrier2.wait();
83+
panic!("exhausted every try to block the runtime");
84+
}
9785

98-
if let Some(fail) = fail {
99-
panic!("{fail}");
86+
fn try_block_threaded(rt: &Runtime) -> Result<Vec<mpsc::Sender<()>>, mpsc::RecvTimeoutError> {
87+
let (tx, rx) = mpsc::channel();
88+
89+
let blocking_tasks = (0..rt.metrics().num_workers())
90+
.map(|_| {
91+
let tx = tx.clone();
92+
let (task, barrier) = mpsc::channel();
93+
94+
// Spawn a task per runtime worker to block it.
95+
rt.spawn(async move {
96+
tx.send(()).unwrap();
97+
barrier.recv().ok();
98+
});
99+
100+
task
101+
})
102+
.collect();
103+
104+
// Make sure the previously spawned tasks are blocking the runtime by
105+
// receiving a message from each blocking task.
106+
//
107+
// If this times out we were unsuccessful in blocking the runtime and hit
108+
// a deadlock instead (which might happen and is expected behaviour).
109+
for _ in 0..rt.metrics().num_workers() {
110+
rx.recv_timeout(Duration::from_secs(1))?;
100111
}
112+
113+
// Return senders of the mpsc channels used for blocking the runtime as a
114+
// surrogate handle for the tasks. Sending a message or dropping the senders
115+
// will unblock the runtime.
116+
Ok(blocking_tasks)
101117
}
102118

103119
fn current_thread() -> Runtime {

0 commit comments

Comments
 (0)