Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a Handle::current() for getting access to TLS Runtime #2040

Merged
merged 10 commits into from
Jan 6, 2020
6 changes: 6 additions & 0 deletions tokio/src/runtime/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ cfg_not_blocking_impl! {
self
}

#[cfg(any(
feature = "blocking",
feature = "dns",
feature = "fs",
feature = "io-std",
))]
pub(crate) fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
Expand Down
49 changes: 11 additions & 38 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback};
use crate::task::{self, JoinHandle};

use std::cell::Cell;
use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
Expand Down Expand Up @@ -68,28 +67,21 @@ struct Shared {

type Task = task::Task<NoopSchedule>;

thread_local! {
/// Thread-local tracking the current executor
static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None)
}

const KEEP_ALIVE: Duration = Duration::from_secs(10);

/// Run the provided function on an executor dedicated to blocking operations.
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
BLOCKING.with(|cell| {
let schedule = match cell.get() {
Some(ptr) => unsafe { &*ptr },
None => panic!("not currently running on the Tokio runtime."),
};
use crate::runtime::context::ThreadContext;

let schedule =
ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime.");

let (task, handle) = task::joinable(BlockingTask::new(func));
schedule.schedule(task);
handle
})
let (task, handle) = task::joinable(BlockingTask::new(func));
schedule.schedule(task);
handle
}

// ===== impl BlockingPool =====
Expand Down Expand Up @@ -168,30 +160,10 @@ impl Spawner {
where
F: FnOnce() -> R,
{
// While scary, this is safe. The function takes a `&BlockingPool`,
// which guarantees that the reference lives for the duration of
// `with_pool`.
//
// Because we are always clearing the TLS value at the end of the
// function, we can cast the reference to 'static which thread-local
// cells require.
BLOCKING.with(|cell| {
let was = cell.replace(None);

// Ensure that the pool is removed from the thread-local context
// when leaving the scope. This handles cases that involve panicking.
struct Reset<'a>(&'a Cell<Option<*const Spawner>>, Option<*const Spawner>);

impl Drop for Reset<'_> {
fn drop(&mut self) {
self.0.set(self.1);
}
}
let ctx = crate::runtime::context::ThreadContext::clone_current();
let _e = ctx.with_blocking_spawner(self.clone()).enter();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused about why the Cell with a raw pointer was necessary here, when Spawner has an internal Arc.

So I simplified this to focus on just using that fact. I'm sure I'm missing something. The Drop for ThreadContext along with enter performs the same revert that was happening with the Reset before.


let _reset = Reset(cell, was);
cell.set(Some(self as *const Spawner));
f()
})
f()
}

fn schedule(&self, task: Task) {
Expand Down Expand Up @@ -248,6 +220,7 @@ impl Spawner {
self.inner.io_handle.clone(),
self.inner.time_handle.clone(),
Some(self.inner.clock.clone()),
Some(self.clone()),
);
let spawner = self.clone();
builder
Expand Down
39 changes: 33 additions & 6 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub(crate) struct ThreadContext {

/// Source of `Instant::now()`
clock: Option<crate::runtime::time::Clock>,

/// Blocking pool spawner
blocking_spawner: Option<crate::runtime::blocking::Spawner>,
}

impl Default for ThreadContext {
Expand All @@ -35,6 +38,7 @@ impl Default for ThreadContext {
#[cfg(any(not(feature = "time"), loom))]
time_handle: (),
clock: None,
blocking_spawner: None,
}
}
}
Expand All @@ -48,6 +52,7 @@ impl ThreadContext {
io_handle: crate::runtime::io::Handle,
time_handle: crate::runtime::time::Handle,
clock: Option<crate::runtime::time::Clock>,
blocking_spawner: Option<crate::runtime::blocking::Spawner>,
) -> Self {
ThreadContext {
spawner,
Expand All @@ -60,6 +65,7 @@ impl ThreadContext {
#[cfg(any(not(feature = "time"), loom))]
time_handle,
clock,
blocking_spawner,
}
}

Expand All @@ -81,31 +87,27 @@ impl ThreadContext {
})
}

#[cfg(all(feature = "io-driver", not(loom)))]
pub(crate) fn io_handle() -> crate::runtime::io::Handle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.io_handle.clone(),
None => None,
None => Default::default(),
})
}

#[cfg(all(feature = "time", not(loom)))]
pub(crate) fn time_handle() -> crate::runtime::time::Handle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.time_handle.clone(),
None => None,
None => Default::default(),
})
}

#[cfg(feature = "rt-core")]
pub(crate) fn spawn_handle() -> Option<Spawner> {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.spawner.clone()),
None => None,
})
}

#[cfg(all(feature = "test-util", feature = "time"))]
pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
CONTEXT.with(
|ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) {
Expand All @@ -114,6 +116,31 @@ impl ThreadContext {
},
)
}

pub(crate) fn blocking_spawner() -> Option<crate::runtime::blocking::Spawner> {
CONTEXT.with(|ctx| {
match ctx
.borrow()
.as_ref()
.map(|ctx| ctx.blocking_spawner.clone())
{
Some(Some(blocking_spawner)) => Some(blocking_spawner),
_ => None,
}
})
}
}

cfg_blocking_impl! {
impl ThreadContext {
pub(crate) fn with_blocking_spawner(
mut self,
blocking_spawner: crate::runtime::blocking::Spawner,
) -> Self {
self.blocking_spawner.replace(blocking_spawner);
self
}
}
}

/// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop.
Expand Down
43 changes: 42 additions & 1 deletion tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,50 @@ impl Handle {
self.io_handle.clone(),
self.time_handle.clone(),
Some(self.clock.clone()),
Some(self.blocking_spawner.clone()),
)
.enter();
self.blocking_spawner.enter(|| f())

f()
}

/// Returns a Handle view over the currently running Runtime
///
/// # Panic
///
/// A Runtime must have been started or this will panic
///
/// # Examples
///
/// This allows for the current handle to be gotten when running in a `#`
///
/// ```
/// # use tokio::runtime::Runtime;
///
/// # fn dox() {
/// # let rt = Runtime::new().unwrap();
/// # rt.spawn(async {
/// use tokio::runtime::Handle;
///
/// let handle = Handle::current();
/// handle.spawn(async {
/// println!("now running in the existing Runtime");
/// })
/// # });
/// # }
/// ```
pub fn current() -> Self {
use crate::runtime::context::ThreadContext;

Handle {
spawner: ThreadContext::spawn_handle()
.expect("not currently running on the Tokio runtime."),
io_handle: ThreadContext::io_handle(),
time_handle: ThreadContext::time_handle(),
clock: ThreadContext::clock().expect("not currently running on the Tokio runtime."),
blocking_spawner: ThreadContext::blocking_spawner()
.expect("not currently running on the Tokio runtime."),
}
}
}

Expand Down