Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce LLVM code generation #5859

Merged
merged 17 commits into from
Jul 15, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ impl Handle {
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

if let Some(notified) = notified {
me.schedule_task(notified, false);
}
me.schedule_option_task_without_yield(notified);

handle
}
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,12 @@ impl Handle {
})
}

pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
if let Some(task) = task {
self.schedule_task(task, false);
}
}

fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
core.stats.inc_local_schedule_count();

Expand Down
71 changes: 50 additions & 21 deletions tokio/src/runtime/task/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,44 +211,66 @@ impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
// Separated into a non-generic function to reduce LLVM codegen
fn new_header(
state: State,
vtable: &'static Vtable,
#[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id: Option<tracing::Id>,
) -> Header {
Header {
state,
queue_next: UnsafeCell::new(None),
vtable,
owner_id: UnsafeCell::new(0),
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing_id,
}
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
let tracing_id = future.id();
let vtable = raw::vtable::<T, S>();
let result = Box::new(Cell {
header: Header {
header: new_header(
state,
queue_next: UnsafeCell::new(None),
vtable: raw::vtable::<T, S>(),
owner_id: UnsafeCell::new(0),
vtable,
#[cfg(all(tokio_unstable, feature = "tracing"))]
tracing_id,
},
),
core: Core {
scheduler,
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
task_id,
},
trailer: Trailer {
waker: UnsafeCell::new(None),
owned: linked_list::Pointers::new(),
},
trailer: Trailer::new(),
});

#[cfg(debug_assertions)]
{
let trailer_addr = (&result.trailer) as *const Trailer as usize;
let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(&result.header)) };
assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize);

let scheduler_addr = (&result.core.scheduler) as *const S as usize;
let scheduler_ptr =
unsafe { Header::get_scheduler::<S>(NonNull::from(&result.header)) };
assert_eq!(scheduler_addr, scheduler_ptr.as_ptr() as usize);

let id_addr = (&result.core.task_id) as *const Id as usize;
let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(&result.header)) };
assert_eq!(id_addr, id_ptr.as_ptr() as usize);
// Using a separate function for this code avoids instantiating it separately for every `T`.
unsafe fn check<S>(header: &Header, trailer: &Trailer, scheduler: &S, task_id: &Id) {
let trailer_addr = trailer as *const Trailer as usize;
let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(header)) };
assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize);

let scheduler_addr = scheduler as *const S as usize;
let scheduler_ptr = unsafe { Header::get_scheduler::<S>(NonNull::from(header)) };
assert_eq!(scheduler_addr, scheduler_ptr.as_ptr() as usize);

let id_addr = task_id as *const Id as usize;
let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(header)) };
assert_eq!(id_addr, id_ptr.as_ptr() as usize);
}
unsafe {
check(
&result.header,
&result.trailer,
&result.core.scheduler,
&result.core.task_id,
);
}
}

result
Expand Down Expand Up @@ -442,6 +464,13 @@ impl Header {
}

impl Trailer {
fn new() -> Self {
Trailer {
waker: UnsafeCell::new(None),
owned: linked_list::Pointers::new(),
}
}

pub(super) unsafe fn set_waker(&self, waker: Option<Waker>) {
self.waker.with_mut(|ptr| {
*ptr = waker;
Expand Down
58 changes: 37 additions & 21 deletions tokio/src/runtime/task/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, Header, Trailer};
use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{JoinError, Notified, RawTask, Schedule, Task};
use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};

use std::any::Any;
use std::mem;
use std::mem::ManuallyDrop;
use std::panic;
Expand Down Expand Up @@ -192,6 +193,15 @@ where

match self.state().transition_to_running() {
TransitionToRunning::Success => {
// Separated to reduce LLVM codegen
fn transition_result_to_poll_future(result: TransitionToIdle) -> PollFuture {
match result {
TransitionToIdle::Ok => PollFuture::Done,
TransitionToIdle::OkNotified => PollFuture::Notified,
TransitionToIdle::OkDealloc => PollFuture::Dealloc,
TransitionToIdle::Cancelled => PollFuture::Complete,
}
}
let header_ptr = self.header_ptr();
let waker_ref = waker_ref::<T, S>(&header_ptr);
let cx = Context::from_waker(&waker_ref);
Expand All @@ -202,17 +212,13 @@ where
return PollFuture::Complete;
}

match self.state().transition_to_idle() {
TransitionToIdle::Ok => PollFuture::Done,
TransitionToIdle::OkNotified => PollFuture::Notified,
TransitionToIdle::OkDealloc => PollFuture::Dealloc,
TransitionToIdle::Cancelled => {
// The transition to idle failed because the task was
// cancelled during the poll.
cancel_task(self.core());
PollFuture::Complete
}
let transition_res = self.state().transition_to_idle();
if let &TransitionToIdle::Cancelled = &transition_res {
dullbananas marked this conversation as resolved.
Show resolved Hide resolved
// The transition to idle failed because the task was
// cancelled during the poll.
cancel_task(self.core());
}
transition_result_to_poll_future(transition_res)
}
TransitionToRunning::Cancelled => {
cancel_task(self.core());
Expand Down Expand Up @@ -447,13 +453,16 @@ fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
core.drop_future_or_output();
}));

core.store_output(Err(panic_result_to_join_error(core.task_id, res)));
}

fn panic_result_to_join_error(
task_id: Id,
res: Result<(), Box<dyn Any + Send + 'static>>,
) -> JoinError {
match res {
Ok(()) => {
core.store_output(Err(JoinError::cancelled(core.task_id)));
}
Err(panic) => {
core.store_output(Err(JoinError::panic(core.task_id, panic)));
}
Ok(()) => JoinError::cancelled(task_id),
Err(panic) => JoinError::panic(task_id, panic),
}
}

Expand Down Expand Up @@ -482,10 +491,7 @@ fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Po
let output = match output {
Ok(Poll::Pending) => return Poll::Pending,
Ok(Poll::Ready(output)) => Ok(output),
Err(panic) => {
core.scheduler.unhandled_panic();
Err(JoinError::panic(core.task_id, panic))
}
Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)),
};

// Catch and ignore panics if the future panics on drop.
Expand All @@ -499,3 +505,13 @@ fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Po

Poll::Ready(())
}

#[cold]
fn panic_to_error<S: Schedule>(
scheduler: &S,
task_id: Id,
panic: Box<dyn Any + Send + 'static>,
) -> JoinError {
scheduler.unhandled_panic();
JoinError::panic(task_id, panic)
}
12 changes: 10 additions & 2 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ impl<S: 'static> OwnedTasks<S> {
T::Output: Send + 'static,
{
let (task, notified, join) = super::new_task(task, scheduler, id);
let notified = unsafe { self.bind_inner(task, notified) };
(join, notified)
}

/// The part of `bind` that's the same for every type of future.
unsafe fn bind_inner(&self, task: Task<S>, notified: Notified<S>) -> Option<Notified<S>>
where
S: Schedule,
{
unsafe {
// safety: We just created the task, so we have exclusive access
// to the field.
Expand All @@ -108,10 +116,10 @@ impl<S: 'static> OwnedTasks<S> {
drop(lock);
drop(notified);
task.shutdown();
(join, None)
None
} else {
lock.list.push_front(task);
(join, Some(notified))
Some(notified)
}
}

Expand Down
26 changes: 15 additions & 11 deletions tokio/src/util/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@ cfg_trace! {
#[inline]
#[track_caller]
pub(crate) fn task<F>(task: F, kind: &'static str, name: Option<&str>, id: u64) -> Instrumented<F> {
#[track_caller]
fn get_span(kind: &'static str, name: Option<&str>, id: u64) -> tracing::Span {
let location = std::panic::Location::caller();
tracing::trace_span!(
target: "tokio::task",
"runtime.spawn",
%kind,
task.name = %name.unwrap_or_default(),
task.id = id,
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
)
}
use tracing::instrument::Instrument;
let location = std::panic::Location::caller();
let span = tracing::trace_span!(
target: "tokio::task",
"runtime.spawn",
%kind,
task.name = %name.unwrap_or_default(),
task.id = id,
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);
let span = get_span(kind, name, id);
task.instrument(span)
}

Expand Down