Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: initial implementation of new threaded runtime #5823

Merged
merged 33 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2f4fc13
rt: initial implementation of new threaded runtime
carllerche Jun 26, 2023
cf1bac3
loom PR was merged
carllerche Jun 26, 2023
f6a5668
fmt
carllerche Jun 26, 2023
911a29b
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jun 27, 2023
3dc1886
fixes for CI
carllerche Jun 27, 2023
8baf117
update loom
carllerche Jun 27, 2023
74fc03d
fix test
carllerche Jun 27, 2023
91b7e80
restructure loom tests
carllerche Jun 27, 2023
b1a3055
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jun 27, 2023
cbe55c7
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jun 29, 2023
7524f02
fix leak
carllerche Jun 30, 2023
90910c0
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jun 30, 2023
0abf746
try to make clippy happy
carllerche Jun 30, 2023
3c7973e
see if this passes
carllerche Jul 2, 2023
3e3aa98
try with more stack
carllerche Jul 2, 2023
15dac69
remove release
carllerche Jul 2, 2023
55e7da4
actually remove release
carllerche Jul 2, 2023
5bf1f0d
try no capture
carllerche Jul 2, 2023
6de9bf6
release
carllerche Jul 3, 2023
a00d890
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jul 17, 2023
9f93079
try to make CI happy... again
carllerche Jul 18, 2023
1b4fb39
try again
carllerche Jul 18, 2023
9b62ba3
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jul 18, 2023
f37e054
fix build
carllerche Jul 18, 2023
7dff6be
increase global queue interval for loom tests
carllerche Jul 20, 2023
4888908
Merge remote-tracking branch 'origin/master' into rt-multi-thread-alt
carllerche Jul 20, 2023
80f037d
fix build
carllerche Jul 20, 2023
43bc3ae
try removing idle map
carllerche Jul 20, 2023
2542796
try fixing the build
carllerche Jul 20, 2023
51b342c
again
carllerche Jul 20, 2023
43331f1
fmt
carllerche Jul 20, 2023
fad2ce2
try again
carllerche Jul 21, 2023
ca7d282
use released loom
carllerche Jul 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ members = [
"tests-build",
"tests-integration",
]

[patch.crates-io]
loom = { git = "https://github.com/tokio-rs/loom" }
carllerche marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions tokio/src/loom/std/unsafe_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ impl<T> UnsafeCell<T> {
UnsafeCell(std::cell::UnsafeCell::new(data))
}

#[inline(always)]
pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
f(self.0.get())
}

#[inline(always)]
pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
f(self.0.get())
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/blocking/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ impl BlockingSchedule {
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => {}
}
}
BlockingSchedule {
Expand All @@ -45,6 +47,8 @@ impl task::Schedule for BlockingSchedule {
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => {}
}
}
None
Expand Down
68 changes: 68 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ pub(crate) enum Kind {
CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThreadAlt,
}

impl Builder {
Expand Down Expand Up @@ -230,6 +232,26 @@ impl Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61)
}

cfg_unstable! {
/// Returns a new builder with the alternate multi thread scheduler
/// selected.
///
/// The alternate multi threaded scheduler is an in-progress
/// candidate to replace the existing multi threaded scheduler. It
/// currently does not scale as well to 16+ processors.
///
/// This runtime flavor is currently **not considered production
/// ready**.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread_alt() -> Builder {
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThreadAlt, 61)
}
}
}

/// Returns a new runtime builder initialized with default configuration
Expand Down Expand Up @@ -656,6 +678,8 @@ impl Builder {
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => self.build_threaded_runtime(),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThreadAlt => self.build_alt_threaded_runtime(),
}
}

Expand All @@ -665,6 +689,8 @@ impl Builder {
Kind::CurrentThread => true,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => false,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThreadAlt => false,
},
enable_io: self.enable_io,
enable_time: self.enable_time,
Expand Down Expand Up @@ -1214,6 +1240,48 @@ cfg_rt_multi_thread! {

Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool))
}

cfg_unstable! {
fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, runtime::Scheduler};
use crate::runtime::scheduler::MultiThreadAlt;

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?;

// Create the blocking pool
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();

let (scheduler, handle) = MultiThreadAlt::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
);

Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool))
}
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ impl Handle {
scheduler::Handle::CurrentThread(_) => RuntimeFlavor::CurrentThread,
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => RuntimeFlavor::MultiThread,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThreadAlt(_) => RuntimeFlavor::MultiThreadAlt,
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use std::time::Duration;
cfg_rt_multi_thread! {
use crate::runtime::Builder;
use crate::runtime::scheduler::MultiThread;

cfg_unstable! {
use crate::runtime::scheduler::MultiThreadAlt;
}
}

/// The Tokio runtime.
Expand Down Expand Up @@ -84,6 +88,9 @@ pub enum RuntimeFlavor {
CurrentThread,
/// The flavor that executes tasks across multiple threads.
MultiThread,
/// The flavor that executes tasks across multiple threads.
#[cfg(tokio_unstable)]
MultiThreadAlt,
}

/// The runtime scheduler is either a multi-thread or a current-thread executor.
Expand All @@ -95,6 +102,10 @@ pub(super) enum Scheduler {
/// Execute tasks across multiple threads.
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThread(MultiThread),

/// Execute tasks across multiple threads.
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
MultiThreadAlt(MultiThreadAlt),
}

impl Runtime {
Expand Down Expand Up @@ -311,6 +322,8 @@ impl Runtime {
Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future),
}
}

Expand Down Expand Up @@ -431,6 +444,12 @@ impl Drop for Runtime {
// already in the runtime's context.
multi_thread.shutdown(&self.handle.inner);
}
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(tokio_wasi)))]
Scheduler::MultiThreadAlt(multi_thread) => {
// The threaded scheduler drops its tasks on its worker threads, which is
// already in the runtime's context.
multi_thread.shutdown(&self.handle.inner);
}
}
}
}
Expand Down
21 changes: 21 additions & 0 deletions tokio/src/runtime/scheduler/block_in_place.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::runtime::scheduler;

#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
#[cfg(tokio_unstable)]
{
use crate::runtime::{Handle, RuntimeFlavor::MultiThreadAlt};

match Handle::try_current().map(|h| h.runtime_flavor()) {
Ok(MultiThreadAlt) => {
return scheduler::multi_thread_alt::block_in_place(f);
}
_ => {}
}
}

scheduler::multi_thread::block_in_place(f)
}
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ cfg_metrics! {
&self.shared.worker_metrics
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.worker_metrics(worker).queue_depth()
}

pub(crate) fn num_blocking_threads(&self) -> usize {
self.blocking_spawner.num_threads()
}
Expand Down
Loading