Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Per subsystem CPU usage tracking (#4239)
Browse files Browse the repository at this point in the history
* SubsystemContext: add subsystem name str

Signed-off-by: Andrei Sandu <[email protected]>

* Overseer builder proc macro changes

* initilize SubsystemContext name field.
* Add subsystem name in TaskKind::launch_task()

Signed-off-by: Andrei Sandu <[email protected]>

* Update ToOverseer enum

Signed-off-by: Andrei Sandu <[email protected]>

* Assign subsystem names to orphan tasks

Signed-off-by: Andrei Sandu <[email protected]>

* cargo fmt

Signed-off-by: Andrei Sandu <[email protected]>

* SubsystemContext: add subsystem name str

Signed-off-by: Andrei Sandu <[email protected]>

* Overseer builder proc macro changes

* initilize SubsystemContext name field.
* Add subsystem name in TaskKind::launch_task()

Signed-off-by: Andrei Sandu <[email protected]>

* Update ToOverseer enum

Signed-off-by: Andrei Sandu <[email protected]>

* Assign subsystem names to orphan tasks

Signed-off-by: Andrei Sandu <[email protected]>

* cargo fmt

Signed-off-by: Andrei Sandu <[email protected]>

* Rebase changes for new spawn() group param

Signed-off-by: Andrei Sandu <[email protected]>

* Add subsystem constat in JobTrait

Signed-off-by: Andrei Sandu <[email protected]>

* Add subsystem string

Signed-off-by: Andrei Sandu <[email protected]>

* Fix tests

Signed-off-by: Andrei Sandu <[email protected]>

* Fix spawn() calls

Signed-off-by: Andrei Sandu <[email protected]>

* cargo fmt

Signed-off-by: Andrei Sandu <[email protected]>

* Fix

Signed-off-by: Andrei Sandu <[email protected]>

* Fix tests

Signed-off-by: Andrei Sandu <[email protected]>

* fix

Signed-off-by: Andrei Sandu <[email protected]>

* Fix more tests

Signed-off-by: Andrei Sandu <[email protected]>

* Address PR review feedback #1

Signed-off-by: Andrei Sandu <[email protected]>

* Address PR review round 2

Signed-off-by: Andrei Sandu <[email protected]>

* Fixes
- remove JobTrait::Subsystem
- fix tests

Signed-off-by: Andrei Sandu <[email protected]>

* update Cargo.lock

Co-authored-by: Andronik Ordian <[email protected]>
  • Loading branch information
2 people authored and drahnr committed Nov 15, 2021
1 parent b72a589 commit ab981ac
Show file tree
Hide file tree
Showing 22 changed files with 340 additions and 260 deletions.
401 changes: 197 additions & 204 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion node/collation-generation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
let mut task_sender = sender.clone();
let metrics = metrics.clone();
ctx.spawn(
"collation generation collation builder",
"collation-builder",
Box::pin(async move {
let persisted_validation_data_hash = validation_data.hash();

Expand Down
5 changes: 3 additions & 2 deletions node/core/av-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,9 @@ impl Error {
fn trace(&self) {
match self {
// don't spam the log with spurious errors
Self::RuntimeApi(_) | Self::Oneshot(_) =>
tracing::debug!(target: LOG_TARGET, err = ?self),
Self::RuntimeApi(_) | Self::Oneshot(_) => {
tracing::debug!(target: LOG_TARGET, err = ?self)
},
// it's worth reporting otherwise
_ => tracing::warn!(target: LOG_TARGET, err = ?self),
}
Expand Down
14 changes: 8 additions & 6 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ impl CandidateBackingJob {
}
};
sender
.send_command(FromJobCommand::Spawn("Backing Validation", bg.boxed()))
.send_command(FromJobCommand::Spawn("backing-validation", bg.boxed()))
.await?;
}

Expand Down Expand Up @@ -900,11 +900,13 @@ impl CandidateBackingJob {
.await;

match confirmation_rx.await {
Err(oneshot::Canceled) =>
tracing::debug!(target: LOG_TARGET, "Dispute coordinator confirmation lost",),
Err(oneshot::Canceled) => {
tracing::debug!(target: LOG_TARGET, "Dispute coordinator confirmation lost",)
},
Ok(ImportStatementsResult::ValidImport) => {},
Ok(ImportStatementsResult::InvalidImport) =>
tracing::warn!(target: LOG_TARGET, "Failed to import statements of validity",),
Ok(ImportStatementsResult::InvalidImport) => {
tracing::warn!(target: LOG_TARGET, "Failed to import statements of validity",)
},
}
}

Expand Down Expand Up @@ -1168,7 +1170,7 @@ impl util::JobTrait for CandidateBackingJob {
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;

const NAME: &'static str = "CandidateBackingJob";
const NAME: &'static str = "candidate-backing-job";

fn run<S: SubsystemSender>(
parent: Hash,
Expand Down
2 changes: 1 addition & 1 deletion node/core/bitfield-signing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl JobTrait for BitfieldSigningJob {
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;

const NAME: &'static str = "BitfieldSigningJob";
const NAME: &'static str = "bitfield-signing-job";

/// Run a job for the parent block indicated
fn run<S: SubsystemSender>(
Expand Down
2 changes: 1 addition & 1 deletion node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl JobTrait for ProvisioningJob {
type RunArgs = ();
type Metrics = Metrics;

const NAME: &'static str = "ProvisioningJob";
const NAME: &'static str = "provisioner-job";

/// Run a job for the parent block indicated
//
Expand Down
14 changes: 12 additions & 2 deletions node/core/pvf/src/executor_intf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,21 @@ impl TaskExecutor {
}

impl sp_core::traits::SpawnNamed for TaskExecutor {
fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn_blocking(
&self,
_task_name: &'static str,
_subsystem_name: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}

fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
fn spawn(
&self,
_task_name: &'static str,
_subsystem_name: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}
}
Expand Down
6 changes: 4 additions & 2 deletions node/core/runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ where
)
}
} else {
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, request);
self.spawn_handle
.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), request);
self.active_requests.push(receiver);
}
}
Expand All @@ -288,7 +289,8 @@ where
}

if let Some((req, recv)) = self.waiting_requests.pop_front() {
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req);
self.spawn_handle
.spawn_blocking(API_REQUEST_TASK_NAME, Some("runtime-api"), req);
self.active_requests.push(recv);
}
}
Expand Down
1 change: 1 addition & 0 deletions node/jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl Jaeger {
// Spawn a background task that pulls span information and sends them on the network.
spawner.spawn(
"jaeger-collector",
Some("jaeger"),
Box::pin(async move {
match async_std::net::UdpSocket::bind("0.0.0.0:0").await {
Ok(udp_socket) => loop {
Expand Down
6 changes: 4 additions & 2 deletions node/network/availability-distribution/src/tests/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ impl TestState {
// lock ;-)
let update_tx = tx.clone();
harness.pool.spawn(
"Sending active leaves updates",
"sending-active-leaves-updates",
None,
async move {
for update in updates {
overseer_signal(update_tx.clone(), OverseerSignal::ActiveLeaves(update)).await;
Expand Down Expand Up @@ -308,7 +309,8 @@ fn to_incoming_req(
let (tx, rx): (oneshot::Sender<netconfig::OutgoingResponse>, oneshot::Receiver<_>) =
oneshot::channel();
executor.spawn(
"Message forwarding",
"message-forwarding",
None,
async {
let response = rx.await;
let payload = response.expect("Unexpected canceled request").result;
Expand Down
2 changes: 1 addition & 1 deletion node/network/availability-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ where
awaiting: vec![response_sender],
});

if let Err(e) = ctx.spawn("recovery task", Box::pin(remote)) {
if let Err(e) = ctx.spawn("recovery-task", Box::pin(remote)) {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
Expand Down
18 changes: 14 additions & 4 deletions node/overseer/overseer-gen/examples/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,22 @@ struct Xxx {
struct DummySpawner;

impl SpawnNamed for DummySpawner {
fn spawn_blocking(&self, name: &'static str, _future: futures::future::BoxFuture<'static, ()>) {
unimplemented!("spawn blocking {}", name)
fn spawn_blocking(
&self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
_future: futures::future::BoxFuture<'static, ()>,
) {
unimplemented!("spawn blocking {} {}", task_name, subsystem_name.unwrap_or("default"))
}

fn spawn(&self, name: &'static str, _future: futures::future::BoxFuture<'static, ()>) {
unimplemented!("spawn {}", name)
fn spawn(
&self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
_future: futures::future::BoxFuture<'static, ()>,
) {
unimplemented!("spawn {} {}", task_name, subsystem_name.unwrap_or("default"))
}
}

Expand Down
23 changes: 16 additions & 7 deletions node/overseer/overseer-gen/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
// TODO generate a builder pattern that ensures this
// TODO https://github.com/paritytech/polkadot/issues/3427
let #subsystem_name = match self. #subsystem_name {
FieldInitMethod::Fn(func) => func(handle.clone())?,
FieldInitMethod::Fn(func) => func(handle.clone())?,
FieldInitMethod::Value(val) => val,
FieldInitMethod::Uninitialized =>
panic!("All subsystems must exist with the builder pattern."),
Expand All @@ -349,11 +349,18 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
#channel_name_rx, #channel_name_unbounded_rx
);
let (signal_tx, signal_rx) = #support_crate ::metered::channel(SIGNAL_CHANNEL_CAPACITY);

// Generate subsystem name based on overseer field name.
let mut subsystem_string = String::from(stringify!(#subsystem_name));
// Convert owned `snake case` string to a `kebab case` static str.
let subsystem_static_str = Box::leak(subsystem_string.replace("_", "-").into_boxed_str());

let ctx = #subsyste_ctx_name::< #consumes >::new(
signal_rx,
message_rx,
channels_out.clone(),
to_overseer_tx.clone(),
subsystem_static_str
);

let #subsystem_name: OverseenSubsystem< #consumes > =
Expand All @@ -364,6 +371,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
unbounded_meter,
ctx,
#subsystem_name,
subsystem_static_str,
&mut running_subsystems,
)?;
)*
Expand Down Expand Up @@ -489,22 +497,22 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
/// Task kind to launch.
pub trait TaskKind {
/// Spawn a task, it depends on the implementer if this is blocking or not.
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>);
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>);
}

#[allow(missing_docs)]
struct Regular;
impl TaskKind for Regular {
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn(name, future)
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn(task_name, Some(subsystem_name), future)
}
}

#[allow(missing_docs)]
struct Blocking;
impl TaskKind for Blocking {
fn launch_task<S: SpawnNamed>(spawner: &mut S, name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn_blocking(name, future)
fn launch_task<S: SpawnNamed>(spawner: &mut S, task_name: &'static str, subsystem_name: &'static str, future: BoxFuture<'static, ()>) {
spawner.spawn(task_name, Some(subsystem_name), future)
}
}

Expand All @@ -517,6 +525,7 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
unbounded_meter: #support_crate ::metered::Meter,
ctx: Ctx,
s: SubSys,
subsystem_name: &'static str,
futures: &mut #support_crate ::FuturesUnordered<BoxFuture<'static, ::std::result::Result<(), #error_ty> >>,
) -> ::std::result::Result<OverseenSubsystem<M>, #error_ty >
where
Expand All @@ -540,7 +549,7 @@ pub(crate) fn impl_task_kind(info: &OverseerInfo) -> proc_macro2::TokenStream {
let _ = tx.send(());
});

<TK as TaskKind>::launch_task(spawner, name, fut);
<TK as TaskKind>::launch_task(spawner, name, subsystem_name, fut);

futures.push(Box::pin(
rx.map(|e| {
Expand Down
9 changes: 9 additions & 0 deletions node/overseer/overseer-gen/proc-macro/src/impl_misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
>,
signals_received: SignalsReceived,
pending_incoming: Option<(usize, M)>,
name: &'static str
}

impl<M> #subsystem_ctx_name<M> {
Expand All @@ -121,6 +122,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
messages: SubsystemIncomingMessages<M>,
to_subsystems: ChannelsOut,
to_overseer: #support_crate ::metered::UnboundedMeteredSender<#support_crate:: ToOverseer>,
name: &'static str
) -> Self {
let signals_received = SignalsReceived::default();
#subsystem_ctx_name {
Expand All @@ -133,8 +135,13 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
to_overseer,
signals_received,
pending_incoming: None,
name
}
}

fn name(&self) -> &'static str {
self.name
}
}

#[#support_crate ::async_trait]
Expand Down Expand Up @@ -229,6 +236,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
{
self.to_overseer.unbounded_send(#support_crate ::ToOverseer::SpawnJob {
name,
subsystem: Some(self.name()),
s,
}).map_err(|_| #support_crate ::OverseerError::TaskSpawn(name))?;
Ok(())
Expand All @@ -239,6 +247,7 @@ pub(crate) fn impl_misc(info: &OverseerInfo) -> proc_macro2::TokenStream {
{
self.to_overseer.unbounded_send(#support_crate ::ToOverseer::SpawnBlockingJob {
name,
subsystem: Some(self.name()),
s,
}).map_err(|_| #support_crate ::OverseerError::TaskSpawn(name))?;
Ok(())
Expand Down
12 changes: 10 additions & 2 deletions node/overseer/overseer-gen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ pub enum ToOverseer {
SpawnJob {
/// Name of the task to spawn which be shown in jaeger and tracing logs.
name: &'static str,
/// Subsystem of the task to spawn which be shown in jaeger and tracing logs.
subsystem: Option<&'static str>,
/// The future to execute.
s: BoxFuture<'static, ()>,
},
Expand All @@ -120,6 +122,8 @@ pub enum ToOverseer {
SpawnBlockingJob {
/// Name of the task to spawn which be shown in jaeger and tracing logs.
name: &'static str,
/// Subsystem of the task to spawn which be shown in jaeger and tracing logs.
subsystem: Option<&'static str>,
/// The future to execute.
s: BoxFuture<'static, ()>,
},
Expand All @@ -128,8 +132,12 @@ pub enum ToOverseer {
impl fmt::Debug for ToOverseer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::SpawnJob { name, .. } => writeln!(f, "SpawnJob{{ {}, ..}}", name),
Self::SpawnBlockingJob { name, .. } => writeln!(f, "SpawnBlockingJob{{ {}, ..}}", name),
Self::SpawnJob { name, subsystem, .. } => {
writeln!(f, "SpawnJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
},
Self::SpawnBlockingJob { name, subsystem, .. } => {
writeln!(f, "SpawnBlockingJob{{ {}, {} ..}}", name, subsystem.unwrap_or("default"))
},
}
}
}
Expand Down
31 changes: 22 additions & 9 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,10 @@ where

futures::future::ready(())
});
overseer.spawner().spawn("metrics_metronome", Box::pin(metronome));
overseer
.spawner()
.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));

Ok(())
}

Expand Down Expand Up @@ -616,11 +619,11 @@ where
},
msg = self.to_overseer_rx.select_next_some() => {
match msg {
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
ToOverseer::SpawnJob { name, subsystem, s } => {
self.spawn_job(name, subsystem, s);
}
ToOverseer::SpawnBlockingJob { name, s } => {
self.spawn_blocking_job(name, s);
ToOverseer::SpawnBlockingJob { name, subsystem, s } => {
self.spawn_blocking_job(name, subsystem, s);
}
}
},
Expand Down Expand Up @@ -772,11 +775,21 @@ where
}
}

fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.spawner.spawn(name, j);
fn spawn_job(
&mut self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
j: BoxFuture<'static, ()>,
) {
self.spawner.spawn(task_name, subsystem_name, j);
}

fn spawn_blocking_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.spawner.spawn_blocking(name, j);
fn spawn_blocking_job(
&mut self,
task_name: &'static str,
subsystem_name: Option<&'static str>,
j: BoxFuture<'static, ()>,
) {
self.spawner.spawn_blocking(task_name, subsystem_name, j);
}
}
Loading

0 comments on commit ab981ac

Please sign in to comment.