-
-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add a Handle::current() for getting access to TLS Runtime #2040
Changes from 7 commits
628f5e1
501e6f1
80f1705
4c40fa0
1aae563
7711bf1
370844d
92d56fa
816fe2f
2d1053e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,6 @@ use crate::runtime::blocking::task::BlockingTask; | |
use crate::runtime::{self, context::ThreadContext, io, time, Builder, Callback}; | ||
use crate::task::{self, JoinHandle}; | ||
|
||
use std::cell::Cell; | ||
use std::collections::VecDeque; | ||
use std::fmt; | ||
use std::time::Duration; | ||
|
@@ -68,28 +67,21 @@ struct Shared { | |
|
||
type Task = task::Task<NoopSchedule>; | ||
|
||
thread_local! { | ||
/// Thread-local tracking the current executor | ||
static BLOCKING: Cell<Option<*const Spawner>> = Cell::new(None) | ||
} | ||
|
||
const KEEP_ALIVE: Duration = Duration::from_secs(10); | ||
|
||
/// Run the provided function on an executor dedicated to blocking operations. | ||
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> | ||
where | ||
F: FnOnce() -> R + Send + 'static, | ||
{ | ||
BLOCKING.with(|cell| { | ||
let schedule = match cell.get() { | ||
Some(ptr) => unsafe { &*ptr }, | ||
None => panic!("not currently running on the Tokio runtime."), | ||
}; | ||
use crate::runtime::context::ThreadContext; | ||
|
||
let schedule = | ||
ThreadContext::blocking_spawner().expect("not currently running on the Tokio runtime."); | ||
|
||
let (task, handle) = task::joinable(BlockingTask::new(func)); | ||
schedule.schedule(task); | ||
handle | ||
}) | ||
let (task, handle) = task::joinable(BlockingTask::new(func)); | ||
schedule.schedule(task); | ||
handle | ||
} | ||
|
||
// ===== impl BlockingPool ===== | ||
|
@@ -168,30 +160,10 @@ impl Spawner { | |
where | ||
F: FnOnce() -> R, | ||
{ | ||
// While scary, this is safe. The function takes a `&BlockingPool`, | ||
// which guarantees that the reference lives for the duration of | ||
// `with_pool`. | ||
// | ||
// Because we are always clearing the TLS value at the end of the | ||
// function, we can cast the reference to 'static which thread-local | ||
// cells require. | ||
BLOCKING.with(|cell| { | ||
let was = cell.replace(None); | ||
|
||
// Ensure that the pool is removed from the thread-local context | ||
// when leaving the scope. This handles cases that involve panicking. | ||
struct Reset<'a>(&'a Cell<Option<*const Spawner>>, Option<*const Spawner>); | ||
|
||
impl Drop for Reset<'_> { | ||
fn drop(&mut self) { | ||
self.0.set(self.1); | ||
} | ||
} | ||
let ctx = crate::runtime::context::ThreadContext::clone_current(); | ||
let _e = ctx.with_blocking_spawner(self.clone()).enter(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm confused about why the Cell with a raw pointer was necessary here, when Spawner has an internal Arc. So I simplified this to focus on just using that fact. I'm sure I'm missing something. The Drop for ThreadContext along with enter performs the same revert that was happening with the Reset before. |
||
|
||
let _reset = Reset(cell, was); | ||
cell.set(Some(self as *const Spawner)); | ||
f() | ||
}) | ||
f() | ||
} | ||
|
||
fn schedule(&self, task: Task) { | ||
|
@@ -248,6 +220,7 @@ impl Spawner { | |
self.inner.io_handle.clone(), | ||
self.inner.time_handle.clone(), | ||
Some(self.inner.clock.clone()), | ||
Some(self.clone()), | ||
); | ||
let spawner = self.clone(); | ||
builder | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,9 @@ pub(crate) struct ThreadContext { | |
|
||
/// Source of `Instant::now()` | ||
clock: Option<crate::runtime::time::Clock>, | ||
|
||
/// Blocking pool spawner | ||
blocking_spawner: Option<crate::runtime::blocking::Spawner>, | ||
} | ||
|
||
impl Default for ThreadContext { | ||
|
@@ -35,6 +38,7 @@ impl Default for ThreadContext { | |
#[cfg(any(not(feature = "time"), loom))] | ||
time_handle: (), | ||
clock: None, | ||
blocking_spawner: None, | ||
} | ||
} | ||
} | ||
|
@@ -48,6 +52,7 @@ impl ThreadContext { | |
io_handle: crate::runtime::io::Handle, | ||
time_handle: crate::runtime::time::Handle, | ||
clock: Option<crate::runtime::time::Clock>, | ||
blocking_spawner: Option<crate::runtime::blocking::Spawner>, | ||
) -> Self { | ||
ThreadContext { | ||
spawner, | ||
|
@@ -60,6 +65,7 @@ impl ThreadContext { | |
#[cfg(any(not(feature = "time"), loom))] | ||
time_handle, | ||
clock, | ||
blocking_spawner, | ||
} | ||
} | ||
|
||
|
@@ -93,31 +99,54 @@ impl ThreadContext { | |
self | ||
} | ||
|
||
#[cfg(all(feature = "io-driver", not(loom)))] | ||
#[cfg(any( | ||
feature = "blocking", | ||
feature = "dns", | ||
feature = "fs", | ||
feature = "io-std", | ||
feature = "rt-threaded", | ||
))] | ||
pub(crate) fn with_blocking_spawner( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this can be replaced w/ the cfg_blocking_impl! {
impl ThreadContext {
pub(crate) fn with_blocking_spawner(....) { ... }
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, done. |
||
mut self, | ||
blocking_spawner: crate::runtime::blocking::Spawner, | ||
) -> Self { | ||
self.blocking_spawner.replace(blocking_spawner); | ||
self | ||
} | ||
|
||
pub(crate) fn io_handle() -> crate::runtime::io::Handle { | ||
#[cfg(any(not(feature = "io-driver"), loom))] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you elaborate on why this change was made? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh. I think it was failing CI with a unused warning? Though I can try reverting it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the way that it's written right now, the field is not optional, it gets swapped out for error[E0308]: match arms have incompatible types
--> tokio/src/runtime/context.rs:105:21
|
103 | CONTEXT.with(|ctx| match *ctx.borrow() {
| ____________________________-
104 | | Some(ref ctx) => ctx.io_handle.clone(),
| | --------------------- this is found to be of type `()`
105 | | None => None,
| | ^^^^ expected (), found enum `std::option::Option`
106 | | })
| |_________- `match` arms have incompatible types
|
= note: expected type `()`
found type `std::option::Option<_>` There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah! What if you sub That said, it is curious that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This stems from the way the Handle::Current works, which asks for all the components of ThreadContext, where Handle has all of these, so cfging off the io_handle function will spider out for a lot of other changes. Maybe Default::default() will work... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, that works. |
||
{ | ||
return (); | ||
} | ||
|
||
#[cfg(all(feature = "io-driver", not(loom)))] | ||
CONTEXT.with(|ctx| match *ctx.borrow() { | ||
Some(ref ctx) => ctx.io_handle.clone(), | ||
None => None, | ||
}) | ||
} | ||
|
||
#[cfg(all(feature = "time", not(loom)))] | ||
pub(crate) fn time_handle() -> crate::runtime::time::Handle { | ||
#[cfg(any(not(feature = "time"), loom))] | ||
{ | ||
return (); | ||
} | ||
|
||
#[cfg(all(feature = "time", not(loom)))] | ||
CONTEXT.with(|ctx| match *ctx.borrow() { | ||
Some(ref ctx) => ctx.time_handle.clone(), | ||
None => None, | ||
}) | ||
} | ||
|
||
#[cfg(feature = "rt-core")] | ||
pub(crate) fn spawn_handle() -> Option<Spawner> { | ||
CONTEXT.with(|ctx| match *ctx.borrow() { | ||
Some(ref ctx) => Some(ctx.spawner.clone()), | ||
None => None, | ||
}) | ||
} | ||
|
||
#[cfg(all(feature = "test-util", feature = "time"))] | ||
pub(crate) fn clock() -> Option<crate::runtime::time::Clock> { | ||
CONTEXT.with( | ||
|ctx| match ctx.borrow().as_ref().map(|ctx| ctx.clock.clone()) { | ||
|
@@ -126,6 +155,19 @@ impl ThreadContext { | |
}, | ||
) | ||
} | ||
|
||
pub(crate) fn blocking_spawner() -> Option<crate::runtime::blocking::Spawner> { | ||
CONTEXT.with(|ctx| { | ||
match ctx | ||
.borrow() | ||
.as_ref() | ||
.map(|ctx| ctx.blocking_spawner.clone()) | ||
{ | ||
Some(Some(blocking_spawner)) => Some(blocking_spawner), | ||
_ => None, | ||
} | ||
}) | ||
} | ||
} | ||
|
||
/// [`ThreadContextDropGuard`] will replace the `previous` thread context on drop. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see why this isn't necessary and wasn't here before...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, since
BlockingPool::enter
only sets the context for the blocking pool spawner, the no-op version doesn't need to do anything. This fn is only here to make feature flag code easier.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. Would you like me to remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think removing it would be best.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.