
Commit 3a94eb0

rt: batch pop from injection queue when idle (#5705)
In the multi-threaded scheduler, when there are no tasks on the local queue, a worker attempts to pull tasks from the injection queue. Previously, the worker would poll only one task from the injection queue and then continue trying to find work from other sources. This can result in the injection queue backing up when many tasks are scheduled from outside the runtime. This patch updates the worker to poll more than one task from the injection queue when it has no more local work. Note that we also don't want a single worker to poll **all** tasks on the injection queue, as that would result in work becoming unbalanced across workers.
1 parent 93bde08 commit 3a94eb0
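
As a rough illustration of the batching described above, here is a minimal, self-contained Rust sketch. The VecDeque-based queues, the u64 "tasks", and the pop_batch helper are stand-ins invented for this sketch, not the scheduler's actual types; the real logic lives in Core::next_task in the worker.rs diff below.

// A simplified model of the new idle-worker behaviour: drain a bounded batch
// from the shared injection queue, run the first task now, and queue the rest
// locally. Everything here is a stand-in invented for this sketch.
use std::collections::VecDeque;

fn pop_batch(
    inject: &mut VecDeque<u64>, // stand-in for the shared injection queue
    local: &mut VecDeque<u64>,  // stand-in for this worker's local run queue
    num_workers: usize,
    half_local_capacity: usize,
) -> Option<u64> {
    // Take roughly this worker's share of the injection queue, but never so
    // much that other workers are starved or the local queue would overflow.
    let n = usize::min(inject.len() / num_workers + 1, half_local_capacity);
    let n = usize::min(n, inject.len());

    let mut batch = inject.drain(..n);

    // Run the first task immediately; push the remainder onto the local queue.
    let first = batch.next();
    local.extend(batch);
    first
}

fn main() {
    let mut inject: VecDeque<u64> = (0..100).collect();
    let mut local = VecDeque::new();

    let first = pop_batch(&mut inject, &mut local, 4, 128);
    assert_eq!(first, Some(0));
    assert_eq!(local.len(), 25); // 100 / 4 + 1 = 26 popped; one is returned
    assert_eq!(inject.len(), 74);
}

The "+ 1" keeps a worker making progress even when the injection queue holds fewer tasks than there are workers, while the cap keeps any single worker from draining the whole queue.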

File tree

10 files changed, +381 -53 lines changed

.github/workflows/loom.yml

+14-8
@@ -24,13 +24,19 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        scope:
-          - --skip loom_pool
-          - loom_pool::group_a
-          - loom_pool::group_b
-          - loom_pool::group_c
-          - loom_pool::group_d
-          - time::driver
+        include:
+          - scope: --skip loom_pool
+            max_preemptions: 2
+          - scope: loom_pool::group_a
+            max_preemptions: 1
+          - scope: loom_pool::group_b
+            max_preemptions: 2
+          - scope: loom_pool::group_c
+            max_preemptions: 1
+          - scope: loom_pool::group_d
+            max_preemptions: 1
+          - scope: time::driver
+            max_preemptions: 2
     steps:
       - uses: actions/checkout@v3
       - name: Install Rust ${{ env.rust_stable }}
@@ -43,6 +49,6 @@ jobs:
       working-directory: tokio
       env:
         RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings
-        LOOM_MAX_PREEMPTIONS: 2
+        LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
         LOOM_MAX_BRANCHES: 10000
         SCOPE: ${{ matrix.scope }}

benches/rt_multi_threaded.rs

+59-4
@@ -10,9 +10,10 @@ use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering::Relaxed;
 use std::sync::{mpsc, Arc};
 
-fn spawn_many(b: &mut Bencher) {
-    const NUM_SPAWN: usize = 10_000;
+const NUM_WORKERS: usize = 4;
+const NUM_SPAWN: usize = 10_000;
 
+fn spawn_many_local(b: &mut Bencher) {
     let rt = rt();
 
     let (tx, rx) = mpsc::sync_channel(1000);
@@ -38,6 +39,52 @@ fn spawn_many(b: &mut Bencher) {
     });
 }
 
+fn spawn_many_remote_idle(b: &mut Bencher) {
+    let rt = rt();
+
+    let mut handles = Vec::with_capacity(NUM_SPAWN);
+
+    b.iter(|| {
+        for _ in 0..NUM_SPAWN {
+            handles.push(rt.spawn(async {}));
+        }
+
+        rt.block_on(async {
+            for handle in handles.drain(..) {
+                handle.await.unwrap();
+            }
+        });
+    });
+}
+
+fn spawn_many_remote_busy(b: &mut Bencher) {
+    let rt = rt();
+    let rt_handle = rt.handle();
+    let mut handles = Vec::with_capacity(NUM_SPAWN);
+
+    // Spawn some tasks to keep the runtimes busy
+    for _ in 0..(2 * NUM_WORKERS) {
+        rt.spawn(async {
+            loop {
+                tokio::task::yield_now().await;
+                std::thread::sleep(std::time::Duration::from_micros(10));
+            }
+        });
+    }
+
+    b.iter(|| {
+        for _ in 0..NUM_SPAWN {
+            handles.push(rt_handle.spawn(async {}));
+        }
+
+        rt.block_on(async {
+            for handle in handles.drain(..) {
+                handle.await.unwrap();
+            }
+        });
+    });
+}
+
 fn yield_many(b: &mut Bencher) {
     const NUM_YIELD: usize = 1_000;
     const TASKS: usize = 200;
@@ -140,12 +187,20 @@ fn chained_spawn(b: &mut Bencher) {
 
 fn rt() -> Runtime {
     runtime::Builder::new_multi_thread()
-        .worker_threads(4)
+        .worker_threads(NUM_WORKERS)
         .enable_all()
         .build()
        .unwrap()
 }
 
-benchmark_group!(scheduler, spawn_many, ping_pong, yield_many, chained_spawn,);
+benchmark_group!(
+    scheduler,
+    spawn_many_local,
+    spawn_many_remote_idle,
+    spawn_many_remote_busy,
+    ping_pong,
+    yield_many,
+    chained_spawn,
+);
 
 benchmark_main!(scheduler);
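
The two new spawn_many_remote_* benchmarks exercise exactly the path this commit changes: tasks spawned from outside the runtime go through the injection (global) queue rather than a worker's local queue. Below is a minimal example of such a "remote" spawn, assuming a tokio dependency with the rt-multi-thread feature enabled; it is only illustrative and not part of the benchmark file.

// A task submitted from a thread that is not a runtime worker goes through
// the injection (global) queue, which is what the spawn_many_remote_*
// benchmarks stress. Assumes tokio with the `rt-multi-thread` feature.
use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().unwrap();
    let handle = rt.handle().clone();

    // Spawn from a plain OS thread, i.e. from outside the runtime.
    let task = std::thread::spawn(move || handle.spawn(async { 1 + 1 }))
        .join()
        .unwrap();

    assert_eq!(rt.block_on(task).unwrap(), 2);
}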

tokio/src/runtime/scheduler/multi_thread/queue.rs

+77-2
@@ -110,6 +110,15 @@ impl<T> Local<T> {
         !self.inner.is_empty()
     }
 
+    /// How many tasks can be pushed into the queue
+    pub(crate) fn remaining_slots(&self) -> usize {
+        self.inner.remaining_slots()
+    }
+
+    pub(crate) fn max_capacity(&self) -> usize {
+        LOCAL_QUEUE_CAPACITY
+    }
+
     /// Returns false if there are any entries in the queue
     ///
     /// Separate to is_stealable so that refactors of is_stealable to "protect"
@@ -118,8 +127,62 @@
         !self.inner.is_empty()
     }
 
-    /// Pushes a task to the back of the local queue, skipping the LIFO slot.
-    pub(crate) fn push_back(
+    /// Pushes a batch of tasks to the back of the queue. All tasks must fit in
+    /// the local queue.
+    ///
+    /// # Panics
+    ///
+    /// The method panics if there is not enough capacity to fit in the queue.
+    pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
+        let len = tasks.len();
+        assert!(len <= LOCAL_QUEUE_CAPACITY);
+
+        if len == 0 {
+            // Nothing to do
+            return;
+        }
+
+        let head = self.inner.head.load(Acquire);
+        let (steal, _) = unpack(head);
+
+        // safety: this is the **only** thread that updates this cell.
+        let mut tail = unsafe { self.inner.tail.unsync_load() };
+
+        if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
+            // Yes, this if condition is structured a bit weird (the first
+            // block does nothing while the second panics). It is this way to
+            // match `push_back_or_overflow`.
+        } else {
+            panic!()
+        }
+
+        for task in tasks {
+            let idx = tail as usize & MASK;
+
+            self.inner.buffer[idx].with_mut(|ptr| {
+                // Write the task to the slot
+                //
+                // Safety: There is only one producer and the above `if`
+                // condition ensures we don't touch a cell if there is a
+                // value, thus no consumer.
+                unsafe {
+                    ptr::write((*ptr).as_mut_ptr(), task);
+                }
+            });
+
+            tail = tail.wrapping_add(1);
+        }
+
+        self.inner.tail.store(tail, Release);
+    }
+
+    /// Pushes a task to the back of the local queue. If there is not enough
+    /// capacity in the queue, this triggers the overflow operation.
+    ///
+    /// When the queue overflows, half of the current contents of the queue is
+    /// moved to the given Injection queue. This frees up capacity for more
+    /// tasks to be pushed into the local queue.
+    pub(crate) fn push_back_or_overflow(
         &mut self,
         mut task: task::Notified<T>,
         inject: &Inject<T>,
@@ -153,6 +216,11 @@ impl<T> Local<T> {
             }
         };
 
+        self.push_back_finish(task, tail);
+    }
+
+    // Second half of `push_back_or_overflow`
+    fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
         // Map the position to a slot index.
         let idx = tail as usize & MASK;
 
@@ -501,6 +569,13 @@ impl<T> Drop for Local<T> {
 }
 
 impl<T> Inner<T> {
+    fn remaining_slots(&self) -> usize {
+        let (steal, _) = unpack(self.head.load(Acquire));
+        let tail = self.tail.load(Acquire);
+
+        LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize)
+    }
+
     fn len(&self) -> UnsignedShort {
         let (_, head) = unpack(self.head.load(Acquire));
         let tail = self.tail.load(Acquire);
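
The capacity checks above (remaining_slots and the assertion in the new batch push_back) rely on head and tail being free-running counters: tail.wrapping_sub(steal) gives the number of occupied slots even after the counters wrap around. The standalone sketch below reproduces that arithmetic with u16 indices and an illustrative capacity; the types and constant are stand-ins, not the real UnsignedShort and LOCAL_QUEUE_CAPACITY.

// Illustration of the occupied/remaining-slot math used by `remaining_slots`
// and the batch `push_back` capacity check. All values here are invented.
fn main() {
    const CAPACITY: u16 = 256; // illustrative queue capacity

    // Free-running counters that are allowed to wrap around.
    let steal: u16 = u16::MAX - 1;         // oldest slot still visible to stealers
    let tail: u16 = steal.wrapping_add(4); // the producer has pushed 4 tasks

    // Occupied slots, correct even though `tail` has numerically wrapped to 2.
    let len = tail.wrapping_sub(steal);
    assert_eq!(len, 4);

    // What `remaining_slots` would report for this state.
    let remaining = CAPACITY - len;
    assert_eq!(remaining, 252);

    // A batch of `n` tasks fits iff the occupied count stays within capacity,
    // which is the shape of the assertion in the batch `push_back`.
    let n: u16 = 16;
    assert!(tail.wrapping_sub(steal) <= CAPACITY - n);
}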

tokio/src/runtime/scheduler/multi_thread/worker.rs

+39-5
@@ -509,8 +509,11 @@ impl Context {
             } else {
                 // Not enough budget left to run the LIFO task, push it to
                 // the back of the queue and return.
-                core.run_queue
-                    .push_back(task, self.worker.inject(), &mut core.metrics);
+                core.run_queue.push_back_or_overflow(
+                    task,
+                    self.worker.inject(),
+                    &mut core.metrics,
+                );
                 return Ok(core);
             }
         }
@@ -612,7 +615,38 @@
         if self.tick % worker.handle.shared.config.global_queue_interval == 0 {
             worker.inject().pop().or_else(|| self.next_local_task())
         } else {
-            self.next_local_task().or_else(|| worker.inject().pop())
+            let maybe_task = self.next_local_task();
+
+            if maybe_task.is_some() {
+                return maybe_task;
+            }
+
+            // Other threads can only **remove** tasks from the current worker's
+            // `run_queue`. So, we can be confident that by the time we call
+            // `run_queue.push_back` below, there will be *at least* `cap`
+            // available slots in the queue.
+            let cap = usize::min(
+                self.run_queue.remaining_slots(),
+                self.run_queue.max_capacity() / 2,
+            );
+
+            // The worker is currently idle; pull a batch of work from the
+            // injection queue. We don't want to pull *all* the work so other
+            // workers can also get some.
+            let n = usize::min(
+                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
+                cap,
+            );
+
+            let mut tasks = worker.inject().pop_n(n);
+
+            // Pop the first task to return immediately.
+            let ret = tasks.next();
+
+            // Push the rest of the tasks onto the run queue.
+            self.run_queue.push_back(tasks);
+
+            ret
         }
     }
 
@@ -808,7 +842,7 @@ impl Handle {
             // flexibility and the task may go to the front of the queue.
             let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
                 core.run_queue
-                    .push_back(task, &self.shared.inject, &mut core.metrics);
+                    .push_back_or_overflow(task, &self.shared.inject, &mut core.metrics);
                 true
            } else {
                 // Push to the LIFO slot
@@ -817,7 +851,7 @@
 
                 if let Some(prev) = prev {
                     core.run_queue
-                        .push_back(prev, &self.shared.inject, &mut core.metrics);
+                        .push_back_or_overflow(prev, &self.shared.inject, &mut core.metrics);
                 }
 
                 core.lifo_slot = Some(task);
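
To make the sizing in Core::next_task concrete, the snippet below plugs illustrative numbers into the same two min computations. Every value here is made up for the example; none of them are measurements or real constants.

// Illustrative inputs for the batch-size computation in `Core::next_task`.
fn main() {
    let remaining_slots = 200; // free slots in this worker's local queue
    let max_capacity = 256;    // illustrative local queue capacity
    let inject_len = 1_000;    // tasks currently in the injection queue
    let num_remotes = 4;       // number of workers

    // Never fill more than half the local queue, and never more than fits.
    let cap = usize::min(remaining_slots, max_capacity / 2);
    assert_eq!(cap, 128);

    // Take roughly an even share of the injection queue, plus one, capped so
    // other workers still find work.
    let n = usize::min(inject_len / num_remotes + 1, cap);
    assert_eq!(n, 128);

    // With a short injection queue the share term dominates instead.
    let n_small = usize::min(10 / num_remotes + 1, cap);
    assert_eq!(n_small, 3);
}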
