Skip to content

Commit 0193df3

Browse files
bluejekyllcarllerche
authored andcommitted
rt: add a Handle::current() (#2040)
Adds `Handle::current()` for accessing a handle to the runtime associated with the current thread. This handle can then be passed to other threads in order to spawn or perform other runtime related tasks.
1 parent 5930ace commit 0193df3

File tree

4 files changed

+92
-45
lines changed

4 files changed

+92
-45
lines changed

tokio/src/runtime/blocking/mod.rs

+6
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ cfg_not_blocking_impl! {
5656
self
5757
}
5858

59+
#[cfg(any(
60+
feature = "blocking",
61+
feature = "dns",
62+
feature = "fs",
63+
feature = "io-std",
64+
))]
5965
pub(crate) fn enter<F, R>(&self, f: F) -> R
6066
where
6167
F: FnOnce() -> R,

tokio/src/runtime/blocking/pool.rs

+11-38
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use crate::runtime::blocking::task::BlockingTask;
88
use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback};
99
use crate::task::{self, JoinHandle};
1010

11-
use std::cell::Cell;
1211
use std::collections::VecDeque;
1312
use std::fmt;
1413
use std::time::Duration;
@@ -68,28 +67,21 @@ struct Shared {
6867

6968
type Task = task::Task<NoopSchedule>;
7069

71-
thread_local! {
72-
/// Thread-local tracking the current executor
73-
static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None)
74-
}
75-
7670
const KEEP_ALIVE: Duration = Duration::from_secs(10);
7771

7872
/// Run the provided function on an executor dedicated to blocking operations.
7973
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
8074
where
8175
F: FnOnce() -> R + Send + 'static,
8276
{
83-
BLOCKING.with(|cell| {
84-
let schedule = match cell.get() {
85-
Some(ptr) => unsafe { &*ptr },
86-
None => panic!("not currently running on the Tokio runtime."),
87-
};
77+
use crate::runtime::context::ThreadContext;
78+
79+
let schedule =
80+
ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime.");
8881

89-
let (task, handle) = task::joinable(BlockingTask::new(func));
90-
schedule.schedule(task);
91-
handle
92-
})
82+
let (task, handle) = task::joinable(BlockingTask::new(func));
83+
schedule.schedule(task);
84+
handle
9385
}
9486

9587
// ===== impl BlockingPool =====
@@ -168,30 +160,10 @@ impl Spawner {
168160
where
169161
F: FnOnce() -> R,
170162
{
171-
// While scary, this is safe. The function takes a `&BlockingPool`,
172-
// which guarantees that the reference lives for the duration of
173-
// `with_pool`.
174-
//
175-
// Because we are always clearing the TLS value at the end of the
176-
// function, we can cast the reference to 'static which thread-local
177-
// cells require.
178-
BLOCKING.with(|cell| {
179-
let was = cell.replace(None);
180-
181-
// Ensure that the pool is removed from the thread-local context
182-
// when leaving the scope. This handles cases that involve panicking.
183-
struct Reset<'a>(&'a Cell<Option<*const Spawner>>, Option<*const Spawner>);
184-
185-
impl Drop for Reset<'_> {
186-
fn drop(&mut self) {
187-
self.0.set(self.1);
188-
}
189-
}
163+
let ctx = crate::runtime::context::ThreadContext::clone_current();
164+
let _e = ctx.with_blocking_spawner(self.clone()).enter();
190165

191-
let _reset = Reset(cell, was);
192-
cell.set(Some(self as *const Spawner));
193-
f()
194-
})
166+
f()
195167
}
196168

197169
fn schedule(&self, task: Task) {
@@ -248,6 +220,7 @@ impl Spawner {
248220
self.inner.io_handle.clone(),
249221
self.inner.time_handle.clone(),
250222
Some(self.inner.clock.clone()),
223+
Some(self.clone()),
251224
);
252225
let spawner = self.clone();
253226
builder

tokio/src/runtime/context.rs

+33-6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ pub(crate) struct ThreadContext {
2020

2121
/// Source of `Instant::now()`
2222
clock: Option<crate::runtime::time::Clock>,
23+
24+
/// Blocking pool spawner
25+
blocking_spawner: Option<crate::runtime::blocking::Spawner>,
2326
}
2427

2528
impl Default for ThreadContext {
@@ -35,6 +38,7 @@ impl Default for ThreadContext {
3538
#[cfg(any(not(feature = "time"), loom))]
3639
time_handle: (),
3740
clock: None,
41+
blocking_spawner: None,
3842
}
3943
}
4044
}
@@ -48,6 +52,7 @@ impl ThreadContext {
4852
io_handle: crate::runtime::io::Handle,
4953
time_handle: crate::runtime::time::Handle,
5054
clock: Option<crate::runtime::time::Clock>,
55+
blocking_spawner: Option<crate::runtime::blocking::Spawner>,
5156
) -> Self {
5257
ThreadContext {
5358
spawner,
@@ -60,6 +65,7 @@ impl ThreadContext {
6065
#[cfg(any(not(feature = "time"), loom))]
6166
time_handle,
6267
clock,
68+
blocking_spawner,
6369
}
6470
}
6571

@@ -81,31 +87,27 @@ impl ThreadContext {
8187
})
8288
}
8389

84-
#[cfg(all(feature = "io-driver", not(loom)))]
8590
pub(crate) fn io_handle() -> crate::runtime::io::Handle {
8691
CONTEXT.with(|ctx| match *ctx.borrow() {
8792
Some(ref ctx) => ctx.io_handle.clone(),
88-
None => None,
93+
None => Default::default(),
8994
})
9095
}
9196

92-
#[cfg(all(feature = "time", not(loom)))]
9397
pub(crate) fn time_handle() -> crate::runtime::time::Handle {
9498
CONTEXT.with(|ctx| match *ctx.borrow() {
9599
Some(ref ctx) => ctx.time_handle.clone(),
96-
None => None,
100+
None => Default::default(),
97101
})
98102
}
99103

100-
#[cfg(feature = "rt-core")]
101104
pub(crate) fn spawn_handle() -> Option<Spawner> {
102105
CONTEXT.with(|ctx| match *ctx.borrow() {
103106
Some(ref ctx) => Some(ctx.spawner.clone()),
104107
None => None,
105108
})
106109
}
107110

108-
#[cfg(all(feature = "test-util", feature = "time"))]
109111
pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
110112
CONTEXT.with(
111113
|ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) {
@@ -114,6 +116,31 @@ impl ThreadContext {
114116
},
115117
)
116118
}
119+
120+
pub(crate) fn blocking_spawner() -> Option<crate::runtime::blocking::Spawner> {
121+
CONTEXT.with(|ctx| {
122+
match ctx
123+
.borrow()
124+
.as_ref()
125+
.map(|ctx| ctx.blocking_spawner.clone())
126+
{
127+
Some(Some(blocking_spawner)) => Some(blocking_spawner),
128+
_ => None,
129+
}
130+
})
131+
}
132+
}
133+
134+
cfg_blocking_impl! {
135+
impl ThreadContext {
136+
pub(crate) fn with_blocking_spawner(
137+
mut self,
138+
blocking_spawner: crate::runtime::blocking::Spawner,
139+
) -> Self {
140+
self.blocking_spawner.replace(blocking_spawner);
141+
self
142+
}
143+
}
117144
}
118145

119146
/// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop.

tokio/src/runtime/handle.rs

+42-1
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,50 @@ impl Handle {
3535
self.io_handle.clone(),
3636
self.time_handle.clone(),
3737
Some(self.clock.clone()),
38+
Some(self.blocking_spawner.clone()),
3839
)
3940
.enter();
40-
self.blocking_spawner.enter(|| f())
41+
42+
f()
43+
}
44+
45+
/// Returns a Handle view over the currently running Runtime
46+
///
47+
/// # Panic
48+
///
49+
/// A Runtime must have been started or this will panic
50+
///
51+
/// # Examples
52+
///
53+
/// This allows for the current handle to be gotten when running in a `#`
54+
///
55+
/// ```
56+
/// # use tokio::runtime::Runtime;
57+
///
58+
/// # fn dox() {
59+
/// # let rt = Runtime::new().unwrap();
60+
/// # rt.spawn(async {
61+
/// use tokio::runtime::Handle;
62+
///
63+
/// let handle = Handle::current();
64+
/// handle.spawn(async {
65+
/// println!("now running in the existing Runtime");
66+
/// })
67+
/// # });
68+
/// # }
69+
/// ```
70+
pub fn current() -> Self {
71+
use crate::runtime::context::ThreadContext;
72+
73+
Handle {
74+
spawner: ThreadContext::spawn_handle()
75+
.expect("not currently running on the Tokio runtime."),
76+
io_handle: ThreadContext::io_handle(),
77+
time_handle: ThreadContext::time_handle(),
78+
clock: ThreadContext::clock().expect("not currently running on the Tokio runtime."),
79+
blocking_spawner: ThreadContext::blocking_spawner()
80+
.expect("not currently running on the Tokio runtime."),
81+
}
4182
}
4283
}
4384

0 commit comments

Comments
 (0)