Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a Handle::current() for getting access to TLS Runtime #2040

Merged
merged 10 commits into from
Jan 6, 2020
2 changes: 2 additions & 0 deletions tokio/src/runtime/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ cfg_not_blocking_impl! {
where
F: FnOnce() -> R,
{
let ctx = crate::runtime::context::ThreadContext::clone_current();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why this isn't necessary and wasn't here before...

Copy link
Member

@carllerche carllerche Jan 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, since BlockingPool::enter only sets the context for the blocking pool spawner, the no-op version doesn't need to do anything. This fn is only here to make feature flag code easier.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Would you like me to remove this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think removing it would be best.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

let _e = ctx.with_blocking_spawner(self.clone()).enter();
f()
}
}
Expand Down
49 changes: 11 additions & 38 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback};
use crate::task::{self, JoinHandle};

use std::cell::Cell;
use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
Expand Down Expand Up @@ -68,28 +67,21 @@ struct Shared {

type Task = task::Task<NoopSchedule>;

thread_local! {
/// Thread-local tracking the current executor
static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None)
}

const KEEP_ALIVE: Duration = Duration::from_secs(10);

/// Run the provided function on an executor dedicated to blocking operations.
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
BLOCKING.with(|cell| {
let schedule = match cell.get() {
Some(ptr) => unsafe { &*ptr },
None => panic!("not currently running on the Tokio runtime."),
};
use crate::runtime::context::ThreadContext;

let schedule =
ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime.");

let (task, handle) = task::joinable(BlockingTask::new(func));
schedule.schedule(task);
handle
})
let (task, handle) = task::joinable(BlockingTask::new(func));
schedule.schedule(task);
handle
}

// ===== impl BlockingPool =====
Expand Down Expand Up @@ -168,30 +160,10 @@ impl Spawner {
where
F: FnOnce() -> R,
{
// While scary, this is safe. The function takes a `&BlockingPool`,
// which guarantees that the reference lives for the duration of
// `with_pool`.
//
// Because we are always clearing the TLS value at the end of the
// function, we can cast the reference to 'static which thread-local
// cells require.
BLOCKING.with(|cell| {
let was = cell.replace(None);

// Ensure that the pool is removed from the thread-local context
// when leaving the scope. This handles cases that involve panicking.
struct Reset<'a>(&'a Cell<Option<*const Spawner>>, Option<*const Spawner>);

impl Drop for Reset<'_> {
fn drop(&mut self) {
self.0.set(self.1);
}
}
let ctx = crate::runtime::context::ThreadContext::clone_current();
let _e = ctx.with_blocking_spawner(self.clone()).enter();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused about why the Cell with a raw pointer was necessary here, when Spawner has an internal Arc.

So I simplified this to focus on just using that fact. I'm sure I'm missing something. The Drop for ThreadContext along with enter performs the same revert that was happening with the Reset before.


let _reset = Reset(cell, was);
cell.set(Some(self as *const Spawner));
f()
})
f()
}

fn schedule(&self, task: Task) {
Expand Down Expand Up @@ -248,6 +220,7 @@ impl Spawner {
self.inner.io_handle.clone(),
self.inner.time_handle.clone(),
Some(self.inner.clock.clone()),
Some(self.clone()),
);
let spawner = self.clone();
builder
Expand Down
43 changes: 39 additions & 4 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ pub(crate) struct ThreadContext {

/// Source of `Instant::now()`
clock: Option<crate::runtime::time::Clock>,

/// Blocking pool spawner
blocking_spawner: Option<crate::runtime::blocking::Spawner>,
}

impl Default for ThreadContext {
Expand All @@ -35,6 +38,7 @@ impl Default for ThreadContext {
#[cfg(any(not(feature = "time"), loom))]
time_handle: (),
clock: None,
blocking_spawner: None,
}
}
}
Expand All @@ -48,6 +52,7 @@ impl ThreadContext {
io_handle: crate::runtime::io::Handle,
time_handle: crate::runtime::time::Handle,
clock: Option<crate::runtime::time::Clock>,
blocking_spawner: Option<crate::runtime::blocking::Spawner>,
) -> Self {
ThreadContext {
spawner,
Expand All @@ -60,6 +65,7 @@ impl ThreadContext {
#[cfg(any(not(feature = "time"), loom))]
time_handle,
clock,
blocking_spawner,
}
}

Expand Down Expand Up @@ -93,31 +99,47 @@ impl ThreadContext {
self
}

#[cfg(all(feature = "io-driver", not(loom)))]
pub(crate) fn with_blocking_spawner(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be replaced w/ the cfg_blocking_impl macro (here). It cannot go around fn defs, so you would split it out to another impl block:

cfg_blocking_impl! {
    impl ThreadContext {
        pub(crate) fn with_blocking_spawner(....) { ... }
    }
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, done.

mut self,
blocking_spawner: crate::runtime::blocking::Spawner,
) -> Self {
self.blocking_spawner.replace(blocking_spawner);
self
}

pub(crate) fn io_handle() -> crate::runtime::io::Handle {
#[cfg(any(not(feature = "io-driver"), loom))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate on why this change was made?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. I think it was failing CI with a unused warning? Though I can try reverting it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the fn io_handle be scoped w/ a cfg then? The fns below have had their cfg bits removed. I'd be interested in seeing the error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the way that it's written right now, the field is not optional, it gets swapped out for () when these features aren't enabled (apparently). Here's the error with a default cargo check with those changed reverted::

error[E0308]: match arms have incompatible types
   --> tokio/src/runtime/context.rs:105:21
    |
103 |           CONTEXT.with(|ctx| match *ctx.borrow() {
    |  ____________________________-
104 | |             Some(ref ctx) => ctx.io_handle.clone(),
    | |                              --------------------- this is found to be of type `()`
105 | |             None => None,
    | |                     ^^^^ expected (), found enum `std::option::Option`
106 | |         })
    | |_________- `match` arms have incompatible types
    |
    = note: expected type `()`
               found type `std::option::Option<_>`

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! What if you sub None for Default::default()?

That said, it is curious that io_handle() is required even when the io-driver feature isn't enabled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This stems from the way the Handle::Current works, which asks for all the components of ThreadContext, where Handle has all of these, so cfging off the io_handle function will spider out for a lot of other changes. Maybe Default::default() will work...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that works.

{
return ();
}

#[cfg(all(feature = "io-driver", not(loom)))]
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.io_handle.clone(),
None => None,
})
}

#[cfg(all(feature = "time", not(loom)))]
pub(crate) fn time_handle() -> crate::runtime::time::Handle {
#[cfg(any(not(feature = "time"), loom))]
{
return ();
}

#[cfg(all(feature = "time", not(loom)))]
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.time_handle.clone(),
None => None,
})
}

#[cfg(feature = "rt-core")]
pub(crate) fn spawn_handle() -> Option<Spawner> {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.spawner.clone()),
None => None,
})
}

#[cfg(all(feature = "test-util", feature = "time"))]
pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
CONTEXT.with(
|ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) {
Expand All @@ -126,6 +148,19 @@ impl ThreadContext {
},
)
}

pub(crate) fn blocking_spawner() -> Option<crate::runtime::blocking::Spawner> {
CONTEXT.with(|ctx| {
match ctx
.borrow()
.as_ref()
.map(|ctx| ctx.blocking_spawner.clone())
{
Some(Some(blocking_spawner)) => Some(blocking_spawner),
_ => None,
}
})
}
}

/// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop.
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,29 @@ impl Handle {
self.io_handle.clone(),
self.time_handle.clone(),
Some(self.clock.clone()),
Some(self.blocking_spawner.clone()),
)
.enter();
self.blocking_spawner.enter(|| f())
}

/// Returns a Handle view over the currently running Runtime
///
/// # Panic
///
/// A Runtime must have been started or this will panic
pub fn current() -> Self {
use crate::runtime::context::ThreadContext;

Handle {
spawner: ThreadContext::spawn_handle().expect("Spawner not registered"),
io_handle: ThreadContext::io_handle(),
time_handle: ThreadContext::time_handle(),
clock: ThreadContext::clock().expect("Clock not registered"),
blocking_spawner: ThreadContext::blocking_spawner()
.expect("BlockingSpawner not registered"),
}
}
}

cfg_rt_core! {
Expand Down