Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ impl Handle {
}

#[track_caller]
pub(crate) fn spawn_named<F>(&self, future: F, _meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
pub(crate) fn spawn_named<F>(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand All @@ -344,16 +344,16 @@ impl Handle {
))]
let future = super::task::trace::Trace::root(future);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _meta, id.as_u64());
self.inner.spawn(future, id)
let future = crate::util::trace::task(future, "task", meta, id.as_u64());
self.inner.spawn(future, id, meta.spawned_at)
}

#[track_caller]
#[allow(dead_code)]
pub(crate) unsafe fn spawn_local_named<F>(
&self,
future: F,
_meta: SpawnMeta<'_>,
meta: SpawnMeta<'_>,
) -> JoinHandle<F::Output>
where
F: Future + 'static,
Expand All @@ -369,8 +369,8 @@ impl Handle {
))]
let future = super::task::trace::Trace::root(future);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _meta, id.as_u64());
self.inner.spawn_local(future, id)
let future = crate::util::trace::task(future, "task", meta, id.as_u64());
self.inner.spawn_local(future, id, meta.spawned_at)
}

/// Returns the flavor of the current `Runtime`.
Expand Down
20 changes: 8 additions & 12 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::loom::sync::Arc;
use crate::runtime::driver::{self, Driver};
use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{
self, JoinHandle, OwnedTasks, Schedule, Task, TaskHarnessScheduleHooks,
self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks,
};
use crate::runtime::{
blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
Expand All @@ -15,7 +15,6 @@ use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::panic::Location;
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
Expand Down Expand Up @@ -451,16 +450,13 @@ impl Handle {
me: &Arc<Self>,
future: F,
id: crate::runtime::task::Id,
spawned_at: SpawnLocation,
) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let spawned_at = Location::caller();
let (handle, notified) = me
.shared
.owned
.bind(future, me.clone(), id, spawned_at.into());
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

me.task_hooks.spawn(&TaskMeta {
id,
Expand All @@ -486,16 +482,16 @@ impl Handle {
me: &Arc<Self>,
future: F,
id: crate::runtime::task::Id,
spawned_at: SpawnLocation,
) -> JoinHandle<F::Output>
where
F: crate::future::Future + 'static,
F::Output: 'static,
{
let spawned_at = Location::caller();
let (handle, notified) =
me.shared
.owned
.bind_local(future, me.clone(), id, spawned_at.into());
let (handle, notified) = me
.shared
.owned
.bind_local(future, me.clone(), id, spawned_at);

me.task_hooks.spawn(&TaskMeta {
id,
Expand Down
13 changes: 6 additions & 7 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Handle {
cfg_rt! {
use crate::future::Future;
use crate::loom::sync::Arc;
use crate::runtime::{blocking, task::Id};
use crate::runtime::{blocking, task::{Id, SpawnLocation}};
use crate::runtime::context;
use crate::task::JoinHandle;
use crate::util::RngSeedGenerator;
Expand Down Expand Up @@ -117,17 +117,16 @@ cfg_rt! {
}
}

#[track_caller]
pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(&self, future: F, id: Id, spawned_at: SpawnLocation) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),
Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id, spawned_at),

#[cfg(feature = "rt-multi-thread")]
Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id, spawned_at),
}
}

Expand All @@ -138,13 +137,13 @@ cfg_rt! {
/// by the current thread.
#[allow(irrefutable_let_patterns)]
#[track_caller]
pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id, spawned_at: SpawnLocation) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
if let Handle::CurrentThread(h) = self {
current_thread::Handle::spawn_local(h, future, id)
current_thread::Handle::spawn_local(h, future, id, spawned_at)
} else {
panic!("Only current_thread and LocalSet have spawn_local internals implemented")
}
Expand Down
26 changes: 15 additions & 11 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use crate::runtime::scheduler::multi_thread::worker;
use crate::runtime::task::{Notified, Task, TaskHarnessScheduleHooks};
use crate::runtime::{
blocking, driver,
task::{self, JoinHandle},
task::{self, JoinHandle, SpawnLocation},
TaskHooks, TaskMeta,
};
use crate::util::RngSeedGenerator;

use std::fmt;
use std::panic::Location;

mod metrics;

Expand Down Expand Up @@ -38,30 +37,35 @@ pub(crate) struct Handle {

impl Handle {
/// Spawns a future onto the thread pool
#[track_caller]
pub(crate) fn spawn<F>(me: &Arc<Self>, future: F, id: task::Id) -> JoinHandle<F::Output>
pub(crate) fn spawn<F>(
me: &Arc<Self>,
future: F,
id: task::Id,
spawned_at: SpawnLocation,
) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
Self::bind_new_task(me, future, id)
Self::bind_new_task(me, future, id, spawned_at)
}

pub(crate) fn shutdown(&self) {
self.close();
}

#[track_caller]
pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
pub(super) fn bind_new_task<T>(
me: &Arc<Self>,
future: T,
id: task::Id,
spawned_at: SpawnLocation,
) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let spawned_at = Location::caller();
let (handle, notified) = me
.shared
.owned
.bind(future, me.clone(), id, spawned_at.into());
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

me.task_hooks.spawn(&TaskMeta {
id,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ where
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
f(&TaskMeta {
id: self.core().task_id,
spawned_at: self.core().spawned_at,
spawned_at: self.core().spawned_at.into(),
_phantom: Default::default(),
})
}));
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ impl<S: 'static> Task<S> {
pub(crate) fn task_meta<'task, 'meta>(&'task self) -> crate::runtime::TaskMeta<'meta> {
crate::runtime::TaskMeta {
id: self.id(),
spawned_at: self.spawned_at(),
spawned_at: self.spawned_at().into(),
_phantom: PhantomData,
}
}
Expand Down
13 changes: 6 additions & 7 deletions tokio/src/runtime/task_hooks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::marker::PhantomData;
use std::panic::Location;

use super::Config;
use std::marker::PhantomData;

impl TaskHooks {
pub(crate) fn spawn(&self, meta: &TaskMeta<'_>) {
Expand Down Expand Up @@ -62,7 +60,8 @@ pub struct TaskMeta<'a> {
/// The opaque ID of the task.
pub(crate) id: super::task::Id,
/// The location where the task was spawned.
pub(crate) spawned_at: &'static Location<'static>,
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub, dead_code))]
pub(crate) spawned_at: crate::runtime::task::SpawnLocation,
pub(crate) _phantom: PhantomData<&'a ()>,
}

Expand All @@ -74,9 +73,9 @@ impl<'a> TaskMeta<'a> {
}

/// Return the source code location where the task was spawned.
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub, dead_code))]
pub fn spawned_at(&self) -> &'static Location<'static> {
self.spawned_at
#[cfg(tokio_unstable)]
pub fn spawned_at(&self) -> &'static std::panic::Location<'static> {
self.spawned_at.0
}
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ cfg_rt! {
let task = crate::util::trace::task(future, "task", meta, id.as_u64());

// safety: we have verified that this is a `LocalRuntime` owned by the current thread
unsafe { handle.spawn_local(task, id) }
unsafe { handle.spawn_local(task, id, meta.spawned_at) }
} else {
match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) {
None => panic!("`spawn_local` called from outside of a `task::LocalSet` or LocalRuntime"),
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/task/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ cfg_rt! {
let id = task::Id::next();
let task = crate::util::trace::task(future, "task", meta, id.as_u64());

match context::with_current(|handle| handle.spawn(task, id)) {
match context::with_current(|handle| handle.spawn(task, id, meta.spawned_at)) {
Ok(join_handle) => join_handle,
Err(e) => panic!("{}", e),
}
Expand Down
27 changes: 15 additions & 12 deletions tokio/src/util/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,29 @@ cfg_rt! {
/// The original size of the future or function being spawned
#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) original_size: usize,
/// The source code location where the task was spawned.
///
/// This is wrapped in a type that may be empty when `tokio_unstable` is
/// not enabled.
pub(crate) spawned_at: crate::runtime::task::SpawnLocation,
_pd: PhantomData<&'a ()>,
}

impl<'a> SpawnMeta<'a> {
/// Create new spawn meta with a name and original size (before possible auto-boxing)
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[track_caller]
pub(crate) fn new(name: Option<&'a str>, original_size: usize) -> Self {
Self {
name,
original_size,
spawned_at: crate::runtime::task::SpawnLocation::capture(),
_pd: PhantomData,
}
}

/// Create a new unnamed spawn meta with the original size (before possible auto-boxing)
#[track_caller]
pub(crate) fn new_unnamed(original_size: usize) -> Self {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _original_size = original_size;
Expand All @@ -33,6 +41,7 @@ cfg_rt! {
name: None,
#[cfg(all(tokio_unstable, feature = "tracing"))]
original_size,
spawned_at: crate::runtime::task::SpawnLocation::capture(),
_pd: PhantomData,
}
}
Expand All @@ -50,11 +59,8 @@ cfg_rt! {
pub(crate) use tracing::instrument::Instrumented;

#[inline]
#[track_caller]
pub(crate) fn task<F>(task: F, kind: &'static str, meta: SpawnMeta<'_>, id: u64) -> Instrumented<F> {
#[track_caller]
fn get_span(kind: &'static str, spawn_meta: SpawnMeta<'_>, id: u64, task_size: usize) -> tracing::Span {
let location = std::panic::Location::caller();
let original_size = if spawn_meta.original_size != task_size {
Some(spawn_meta.original_size)
} else {
Expand All @@ -69,9 +75,9 @@ cfg_rt! {
task.id = id,
original_size.bytes = original_size,
size.bytes = task_size,
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
loc.file = spawn_meta.spawned_at.0.file(),
loc.line = spawn_meta.spawned_at.0.line(),
loc.col = spawn_meta.spawned_at.0.column(),
)
}
use tracing::instrument::Instrument;
Expand All @@ -80,10 +86,7 @@ cfg_rt! {
}

#[inline]
#[track_caller]
pub(crate) fn blocking_task<Fn, Fut>(task: Fut, spawn_meta: SpawnMeta<'_>, id: u64) -> Instrumented<Fut> {
let location = std::panic::Location::caller();

let fn_size = mem::size_of::<Fn>();
let original_size = if spawn_meta.original_size != fn_size {
Some(spawn_meta.original_size)
Expand All @@ -100,9 +103,9 @@ cfg_rt! {
"fn" = %std::any::type_name::<Fn>(),
original_size.bytes = original_size,
size.bytes = fn_size,
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
loc.file = spawn_meta.spawned_at.0.file(),
loc.line = spawn_meta.spawned_at.0.line(),
loc.col = spawn_meta.spawned_at.0.column(),
);
task.instrument(span)

Expand Down
Loading