Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5c2f3dc
Fix tracing tests (#8022)
bkchr Feb 2, 2021
094fee8
Fix tracing spans are not being forwarded to spawned task (#8009)
bkchr Feb 1, 2021
a443b5c
Proper test for telemetry and prefix span
cecton Feb 4, 2021
5448b94
WIP
cecton Feb 4, 2021
f52d1ea
Fix test (need to create & enter the span at the same time)
cecton Feb 4, 2021
09ef926
WIP
cecton Feb 4, 2021
6f42906
Remove telemtry_span from sc_service config
cecton Feb 4, 2021
1b8d945
CLEANUP
cecton Feb 4, 2021
c2e1e37
Merge commit 30ec0bedf7b902b10188e9da8650c688aad23e1f (no conflict)
cecton Feb 4, 2021
4f7f326
Merge commit 017a9a06b44c191d98ab76ccd4e021aea2d16e79 (conflicts)
cecton Feb 4, 2021
60c02ef
Update comment
cecton Feb 4, 2021
d1381de
Incorrect indent
cecton Feb 4, 2021
fd5cfbb
Merge commit 169b16f67509366e8a0ceaf8022ec791e4b2eea7 (no conflict)
cecton Feb 4, 2021
95e4742
More meaningful name
cecton Feb 4, 2021
3d7adba
Dedent
cecton Feb 4, 2021
34008ec
Naming XD
cecton Feb 4, 2021
c94bdc6
Attempt to make a more complete test
cecton Feb 5, 2021
4d023d5
Merge commit 6105169c51344d3c1344532e2b5831804a4c7abd (no conflict)
cecton Feb 10, 2021
17b2010
Merge commit a675f9a1551770d45bab35bc20cfe5a2663721a0 (conflicts)
cecton Feb 10, 2021
e7dae73
Merge commit 22441aa4bc40200cbd98503e6f44769dd39f7031 (no conflict)
cecton Feb 10, 2021
35fa4c3
lint
cecton Feb 10, 2021
e4544c3
Missing licenses
cecton Feb 11, 2021
4315639
Remove user data
cecton Feb 11, 2021
86870f0
CLEANUP
cecton Feb 11, 2021
b7538be
Apply suggestions from code review
cecton Feb 11, 2021
6d4b06b
CLEANUP
cecton Feb 11, 2021
5705955
Apply suggestion
cecton Feb 12, 2021
51738dd
Update bin/node/cli/tests/telemetry.rs
cecton Feb 12, 2021
d57c361
Wrapping lines
cecton Feb 12, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use sc_executor::NativeExecutor;
use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair};
use sc_finality_grandpa::SharedVoterState;
use sc_keystore::LocalKeystore;
use sc_telemetry::TelemetrySpan;

// Our native executor instance.
native_executor_instance!(
Expand Down Expand Up @@ -161,6 +162,9 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
})
};

let telemetry_span = TelemetrySpan::new();
let _telemetry_span_entered = telemetry_span.enter();

let (_rpc_handlers, telemetry_connection_notifier) = sc_service::spawn_tasks(
sc_service::SpawnTasksParams {
network: network.clone(),
Expand All @@ -175,6 +179,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
network_status_sinks,
system_rpc_tx,
config,
telemetry_span: Some(telemetry_span.clone()),
},
)?;

Expand Down Expand Up @@ -311,6 +316,9 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
);
}

let telemetry_span = TelemetrySpan::new();
let _telemetry_span_entered = telemetry_span.enter();

sc_service::spawn_tasks(sc_service::SpawnTasksParams {
remote_blockchain: Some(backend.remote_blockchain()),
transaction_pool,
Expand All @@ -324,6 +332,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
network,
network_status_sinks,
system_rpc_tx,
telemetry_span: Some(telemetry_span.clone()),
})?;

network_starter.start_network();
Expand Down
10 changes: 9 additions & 1 deletion bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use sp_runtime::traits::Block as BlockT;
use futures::prelude::*;
use sc_client_api::{ExecutorProvider, RemoteBackend};
use node_executor::Executor;
use sc_telemetry::TelemetryConnectionNotifier;
use sc_telemetry::{TelemetryConnectionNotifier, TelemetrySpan};

type FullClient = sc_service::TFullClient<Block, RuntimeApi, Executor>;
type FullBackend = sc_service::TFullBackend<Block>;
Expand Down Expand Up @@ -225,6 +225,9 @@ pub fn new_full_base(
let enable_grandpa = !config.disable_grandpa;
let prometheus_registry = config.prometheus_registry().cloned();

let telemetry_span = TelemetrySpan::new();
let _telemetry_span_entered = telemetry_span.enter();

let (_rpc_handlers, telemetry_connection_notifier) = sc_service::spawn_tasks(
sc_service::SpawnTasksParams {
config,
Expand All @@ -239,6 +242,7 @@ pub fn new_full_base(
remote_blockchain: None,
network_status_sinks: network_status_sinks.clone(),
system_rpc_tx,
telemetry_span: Some(telemetry_span.clone()),
},
)?;

Expand Down Expand Up @@ -432,6 +436,9 @@ pub fn new_light_base(mut config: Configuration) -> Result<(

let rpc_extensions = node_rpc::create_light(light_deps);

let telemetry_span = TelemetrySpan::new();
let _telemetry_span_entered = telemetry_span.enter();

let (rpc_handlers, telemetry_connection_notifier) =
sc_service::spawn_tasks(sc_service::SpawnTasksParams {
on_demand: Some(on_demand),
Expand All @@ -443,6 +450,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<(
config, backend, network_status_sinks, system_rpc_tx,
network: network.clone(),
task_manager: &mut task_manager,
telemetry_span: Some(telemetry_span.clone()),
})?;

Ok((
Expand Down
4 changes: 1 addition & 3 deletions client/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use sc_service::config::{
TaskExecutor, TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod,
};
use sc_service::{ChainSpec, TracingReceiver, KeepBlocks, TransactionStorageMode};
use sc_telemetry::{TelemetryHandle, TelemetrySpan};
use sc_telemetry::TelemetryHandle;
use sc_tracing::logging::LoggerBuilder;
use std::net::SocketAddr;
use std::path::PathBuf;
Expand Down Expand Up @@ -494,7 +494,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
.transpose()?
// Don't initialise telemetry if `telemetry_endpoints` == Some([])
.filter(|x| !x.is_empty());
let telemetry_span = telemetry_endpoints.as_ref().map(|_| TelemetrySpan::new());

let unsafe_pruning = self
.import_params()
Expand Down Expand Up @@ -534,7 +533,6 @@ pub trait CliConfiguration<DCV: DefaultConfigurationValues = ()>: Sized {
rpc_cors: self.rpc_cors(is_dev)?,
prometheus_config: self.prometheus_config(DCV::prometheus_listen_port())?,
telemetry_endpoints,
telemetry_span,
telemetry_external_transport: self.telemetry_external_transport()?,
default_heap_pages: self.default_heap_pages()?,
offchain_worker: self.offchain_worker(&role)?,
Expand Down
1 change: 1 addition & 0 deletions client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,4 @@ grandpa-primitives = { version = "2.0.0", package = "sp-finality-grandpa", path
tokio = { version = "0.2.25", default-features = false }
async-std = { version = "1.6.5", default-features = false }
tracing-subscriber = "0.2.15"
tracing-log = "0.1.1"
14 changes: 11 additions & 3 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use sc_telemetry::{
telemetry,
ConnectionMessage,
TelemetryConnectionNotifier,
TelemetrySpan,
SUBSTRATE_INFO,
};
use sp_transaction_pool::MaintainedTransactionPool;
Expand Down Expand Up @@ -307,7 +308,7 @@ pub fn new_full_parts<TBl, TRtApi, TExecDisp>(

let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.task_executor.clone(), registry, config.telemetry_span.clone())?
TaskManager::new(config.task_executor.clone(), registry)?
};

let executor = NativeExecutor::<TExecDisp>::new(
Expand Down Expand Up @@ -376,7 +377,7 @@ pub fn new_light_parts<TBl, TRtApi, TExecDisp>(
let keystore_container = KeystoreContainer::new(&config.keystore)?;
let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.task_executor.clone(), registry, config.telemetry_span.clone())?
TaskManager::new(config.task_executor.clone(), registry)?
};

let executor = NativeExecutor::<TExecDisp>::new(
Expand Down Expand Up @@ -490,6 +491,10 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
pub network_status_sinks: NetworkStatusSinks<TBl>,
/// A Sender for RPC requests.
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
/// Telemetry span.
///
/// This span needs to be entered **before** calling [`spawn_tasks()`].

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

pub telemetry_span: Option<TelemetrySpan>,
}

/// Build a shared offchain workers instance.
Expand Down Expand Up @@ -569,6 +574,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
network,
network_status_sinks,
system_rpc_tx,
telemetry_span,
} = params;

let chain_info = client.usage_info().chain;
Expand All @@ -581,6 +587,7 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(

let telemetry_connection_notifier = init_telemetry(
&mut config,
telemetry_span,
network.clone(),
client.clone(),
);
Expand Down Expand Up @@ -681,10 +688,11 @@ async fn transaction_notifications<TBl, TExPool>(

fn init_telemetry<TBl: BlockT, TCl: BlockBackend<TBl>>(
config: &mut Configuration,
telemetry_span: Option<TelemetrySpan>,
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
client: Arc<TCl>,
) -> Option<TelemetryConnectionNotifier> {
let telemetry_span = config.telemetry_span.clone()?;
let telemetry_span = telemetry_span?;
let endpoints = config.telemetry_endpoints.clone()?;
let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
let connection_message = ConnectionMessage {
Expand Down
4 changes: 0 additions & 4 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,6 @@ pub struct Configuration {
/// This is a handle to a `TelemetryWorker` instance. It is used to initialize the telemetry for
/// a substrate node.
pub telemetry_handle: Option<sc_telemetry::TelemetryHandle>,
/// Telemetry span.
///
/// This span is entered for every background task spawned using the TaskManager.
pub telemetry_span: Option<sc_telemetry::TelemetrySpan>,
/// The default number of 64KB pages to allocate for Wasm execution
pub default_heap_pages: Option<u64>,
/// Should offchain workers be executed.
Expand Down
46 changes: 2 additions & 44 deletions client/service/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use log::{debug, error};
use futures::{
Future, FutureExt, StreamExt,
future::{select, Either, BoxFuture, join_all, try_join_all, pending},
sink::SinkExt, task::{Context, Poll},
sink::SinkExt,
};
use prometheus_endpoint::{
exponential_buckets, register,
Expand All @@ -34,51 +34,18 @@ use prometheus_endpoint::{
use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded};
use tracing_futures::Instrument;
use crate::{config::{TaskExecutor, TaskType, JoinFuture}, Error};
use sc_telemetry::TelemetrySpan;

mod prometheus_future;
#[cfg(test)]
mod tests;

/// A wrapper around a `[Option<TelemetrySpan>]` and a [`Future`].
///
/// The telemetry in Substrate uses a span to identify the telemetry context. The span "infrastructure"
/// is provided by the tracing-crate. Now it is possible to have your own spans as well. To support
/// this with the [`TaskManager`] we have this wrapper. This wrapper enters the telemetry span every
/// time the future is polled and polls the inner future. So, the inner future can still have its
/// own span attached and we get our telemetry span ;)
struct WithTelemetrySpan<T> {
span: Option<TelemetrySpan>,
inner: T,
}

impl<T> WithTelemetrySpan<T> {
fn new(span: Option<TelemetrySpan>, inner: T) -> Self {
Self {
span,
inner,
}
}
}

impl<T: Future<Output = ()> + Unpin> Future for WithTelemetrySpan<T> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let span = self.span.clone();
let _enter = span.as_ref().map(|s| s.enter());
Pin::new(&mut self.inner).poll(ctx)
}
}

/// An handle for spawning tasks in the service.
#[derive(Clone)]
pub struct SpawnTaskHandle {
on_exit: exit_future::Exit,
executor: TaskExecutor,
metrics: Option<Metrics>,
task_notifier: TracingUnboundedSender<JoinFuture>,
telemetry_span: Option<TelemetrySpan>,
}

impl SpawnTaskHandle {
Expand Down Expand Up @@ -155,11 +122,7 @@ impl SpawnTaskHandle {
}
};

let future = future.in_current_span().boxed();
let join_handle = self.executor.spawn(
WithTelemetrySpan::new(self.telemetry_span.clone(), future).boxed(),
task_type,
);
let join_handle = self.executor.spawn(future.in_current_span().boxed(), task_type);

let mut task_notifier = self.task_notifier.clone();
self.executor.spawn(
Expand Down Expand Up @@ -266,8 +229,6 @@ pub struct TaskManager {
/// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential
/// task fails.
children: Vec<TaskManager>,
/// A `TelemetrySpan` used to enter the telemetry span when a task is spawned.
telemetry_span: Option<TelemetrySpan>,
}

impl TaskManager {
Expand All @@ -276,7 +237,6 @@ impl TaskManager {
pub(super) fn new(
executor: TaskExecutor,
prometheus_registry: Option<&Registry>,
telemetry_span: Option<TelemetrySpan>,
) -> Result<Self, PrometheusError> {
let (signal, on_exit) = exit_future::signal();

Expand Down Expand Up @@ -305,7 +265,6 @@ impl TaskManager {
task_notifier,
completion_future,
children: Vec::new(),
telemetry_span,
})
}

Expand All @@ -316,7 +275,6 @@ impl TaskManager {
executor: self.executor.clone(),
metrics: self.metrics.clone(),
task_notifier: self.task_notifier.clone(),
telemetry_span: self.telemetry_span.clone(),
}
}

Expand Down
Loading