diff --git a/Cargo.lock b/Cargo.lock index a6dbdf857150f..b1573c81bc9ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1899,6 +1899,7 @@ dependencies = [ "substrate-primitives 0.1.0", "substrate-service 0.3.0", "substrate-service-test 0.3.0", + "substrate-telemetry 0.3.1", "substrate-transaction-pool 0.1.0", "tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -3644,7 +3645,7 @@ dependencies = [ "substrate-primitives 0.1.0", "substrate-service 0.3.0", "substrate-state-machine 0.1.0", - "substrate-telemetry 0.3.0", + "substrate-telemetry 0.3.1", "sysinfo 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3676,7 +3677,7 @@ dependencies = [ "substrate-keyring 0.1.0", "substrate-primitives 0.1.0", "substrate-state-machine 0.1.0", - "substrate-telemetry 0.3.0", + "substrate-telemetry 0.3.1", "substrate-test-client 0.1.0", "substrate-trie 0.4.0", ] @@ -3732,6 +3733,7 @@ dependencies = [ "substrate-network 0.1.0", "substrate-primitives 0.1.0", "substrate-service 0.3.0", + "substrate-telemetry 0.3.1", "substrate-test-client 0.1.0", "tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -3853,6 +3855,7 @@ dependencies = [ "substrate-network 0.1.0", "substrate-primitives 0.1.0", "substrate-service 0.3.0", + "substrate-telemetry 0.3.1", "substrate-test-client 0.1.0", "tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -4073,7 +4076,7 @@ dependencies = [ "substrate-network 0.1.0", "substrate-primitives 0.1.0", "substrate-rpc-servers 0.1.0", - "substrate-telemetry 0.3.0", + "substrate-telemetry 0.3.1", "substrate-test-client 0.1.0", "substrate-transaction-pool 0.1.0", "target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4129,11 +4132,14 @@ dependencies = [ [[package]] name = "substrate-telemetry" -version = "0.3.0" +version = "0.3.1" dependencies = [ "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "slog-async 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/cli/src/informant.rs b/core/cli/src/informant.rs index e6de9d5b634ca..3955152691285 100644 --- a/core/cli/src/informant.rs +++ b/core/cli/src/informant.rs @@ -25,7 +25,7 @@ use tokio::timer::Interval; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use network::{SyncState, SyncProvider}; use client::{backend::Backend, BlockchainEvents}; -use substrate_telemetry::telemetry; +use substrate_telemetry::*; use log::{debug, info, warn}; use runtime_primitives::generic::BlockId; @@ -86,6 +86,7 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe } else { (0.0, 0) }; telemetry!( + SUBSTRATE_INFO; "system.interval"; "status" => format!("{}{}", status, target), "peers" => num_peers, @@ -144,7 +145,7 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe let txpool = 
service.transaction_pool(); let display_txpool_import = txpool.import_notification_stream().for_each(move |_| { let status = txpool.status(); - telemetry!("txpool.import"; "ready" => status.ready, "future" => status.future); + telemetry!(SUBSTRATE_INFO; "txpool.import"; "ready" => status.ready, "future" => status.future); Ok(()) }); diff --git a/core/cli/src/lib.rs b/core/cli/src/lib.rs index 76fb7a31cdbbe..4fb0647cad303 100644 --- a/core/cli/src/lib.rs +++ b/core/cli/src/lib.rs @@ -59,6 +59,7 @@ use log::info; use lazy_static::lazy_static; use futures::Future; +use substrate_telemetry::TelemetryEndpoints; const MAX_NODE_NAME_LENGTH: usize = 32; @@ -401,9 +402,9 @@ where // Override telemetry if cli.no_telemetry { - config.telemetry_url = None; - } else if let Some(url) = cli.telemetry_url { - config.telemetry_url = Some(url); + config.telemetry_endpoints = None; + } else if !cli.telemetry_endpoints.is_empty() { + config.telemetry_endpoints = Some(TelemetryEndpoints::new(cli.telemetry_endpoints)); } Ok(config) diff --git a/core/cli/src/params.rs b/core/cli/src/params.rs index bd33c7f1f16b7..26a9a00ebf0a0 100644 --- a/core/cli/src/params.rs +++ b/core/cli/src/params.rs @@ -173,9 +173,11 @@ pub struct RunCmd { #[structopt(long = "no-telemetry")] pub no_telemetry: bool, - /// The URL of the telemetry server to connect to - #[structopt(long = "telemetry-url", value_name = "TELEMETRY_URL")] - pub telemetry_url: Option<String>, + /// The URL of the telemetry server to connect to. This flag can be passed multiple times + /// as a means to specify multiple telemetry endpoints. Verbosity levels range from 0-9, with + /// 0 denoting the least verbosity. If no verbosity level is specified, the default is 0. + #[structopt(long = "telemetry-url", value_name = "URL VERBOSITY", parse(try_from_str = "parse_telemetry_endpoints"))] + pub telemetry_endpoints: Vec<(String, u8)>, /// The means of execution used when calling into the runtime while syncing blocks. #[structopt( @@ -239,6 +241,21 @@ pub struct RunCmd { pub pool_config: TransactionPoolParams, } +/// Defaults to verbosity level 0 if none is provided.
+fn parse_telemetry_endpoints(s: &str) -> Result<(String, u8), Box> { + let pos = s.find(' '); + match pos { + None => { + Ok((s.to_owned(), 0)) + }, + Some(pos_) => { + let verbosity = s[pos_ + 1..].parse()?; + let url = s[..pos_].parse()?; + Ok((url, verbosity)) + } + } +} + impl_augment_clap!(RunCmd); impl_get_log_filter!(RunCmd); diff --git a/core/client/src/client.rs b/core/client/src/client.rs index 99f276ea2bbaf..bafc077833a61 100644 --- a/core/client/src/client.rs +++ b/core/client/src/client.rs @@ -61,7 +61,7 @@ use crate::in_mem; use crate::block_builder::{self, api::BlockBuilder as BlockBuilderAPI}; use crate::genesis; use consensus; -use substrate_telemetry::telemetry; +use substrate_telemetry::*; use log::{info, trace, warn}; use error_chain::bail; @@ -729,7 +729,7 @@ impl Client where fork_choice, ); - telemetry!("block.import"; + telemetry!(SUBSTRATE_INFO; "block.import"; "height" => height, "best" => ?hash, "origin" => ?origin @@ -859,7 +859,7 @@ impl Client where warn!(" Header {:?}", header); warn!(" Native result {:?}", native_result); warn!(" Wasm result {:?}", wasm_result); - telemetry!("block.execute.consensus_failure"; + telemetry!(SUBSTRATE_INFO; "block.execute.consensus_failure"; "hash" => ?hash, "origin" => ?origin, "header" => ?header diff --git a/core/consensus/aura/Cargo.toml b/core/consensus/aura/Cargo.toml index 1b7f44cd38adc..81588f66dccb6 100644 --- a/core/consensus/aura/Cargo.toml +++ b/core/consensus/aura/Cargo.toml @@ -18,6 +18,7 @@ aura_primitives = { package = "substrate-consensus-aura-primitives", path = "pri inherents = { package = "substrate-inherents", path = "../../inherents" } srml-consensus = { path = "../../../srml/consensus" } srml-aura = { path = "../../../srml/aura" } +substrate-telemetry = { path = "../../telemetry" } futures = "0.1.17" tokio = "0.1.7" parking_lot = "0.7.1" diff --git a/core/consensus/aura/src/lib.rs b/core/consensus/aura/src/lib.rs index e3a87e8ac988f..38fe854626399 100644 --- a/core/consensus/aura/src/lib.rs +++ b/core/consensus/aura/src/lib.rs @@ -52,6 +52,7 @@ use srml_aura::{ InherentType as AuraInherent, AuraInherentData, timestamp::{TimestampInherentData, InherentType as TimestampInherent, InherentError as TIError} }; +use substrate_telemetry::*; use aura_slots::{CheckedHeader, SlotWorker, SlotInfo, SlotCompatible}; @@ -265,12 +266,18 @@ impl SlotWorker for AuraWorker whe chain_head.hash(), e ); + telemetry!(CONSENSUS_WARN; "aura.unable_fetching_authorities"; + "slot" => ?chain_head.hash(), "err" => ?e + ); return Box::new(future::ok(())); } }; if self.sync_oracle.is_offline() && authorities.len() > 1 { - debug!(target: "aura", "Skipping proposal slot. Waiting for the netork."); + debug!(target: "aura", "Skipping proposal slot. Waiting for the network."); + telemetry!(CONSENSUS_DEBUG; "aura.skipping_proposal_slot"; + "authorities_len" => authorities.len() + ); return Box::new(future::ok(())); } @@ -282,12 +289,18 @@ impl SlotWorker for AuraWorker whe slot_num, timestamp ); + telemetry!(CONSENSUS_DEBUG; "aura.starting_authorship"; + "slot_num" => slot_num, "timestamp" => timestamp + ); // we are the slot author. make a block and sign it. 
let proposer = match env.init(&chain_head, &authorities) { Ok(p) => p, Err(e) => { warn!("Unable to author block in slot {:?}: {:?}", slot_num, e); + telemetry!(CONSENSUS_WARN; "aura.unable_authoring_block"; + "slot" => slot_num, "err" => ?e + ); return Box::new(future::ok(())) } }; @@ -315,6 +328,9 @@ impl SlotWorker for AuraWorker whe "Discarding proposal for slot {}; block production took too long", slot_num ); + telemetry!(CONSENSUS_INFO; "aura.discarding_proposal_took_too_long"; + "slot" => slot_num + ); return } @@ -348,10 +364,18 @@ impl SlotWorker for AuraWorker whe import_block.post_header().hash(), pre_hash ); + telemetry!(CONSENSUS_INFO; "aura.pre_sealed_block"; + "header_num" => ?header_num, + "hash_now" => ?import_block.post_header().hash(), + "hash_previously" => ?pre_hash + ); if let Err(e) = block_import.import_block(import_block, None) { warn!(target: "aura", "Error with block built on {:?}: {:?}", parent_hash, e); + telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on"; + "hash" => ?parent_hash, "err" => ?e + ); } }) .map_err(|e| consensus_common::ErrorKind::ClientImport(format!("{:?}", e)).into()) @@ -456,6 +480,9 @@ impl AuraVerifier "halting for block {} seconds in the future", diff ); + telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; + "diff" => ?diff + ); thread::sleep(Duration::from_secs(diff)); Ok(()) }, @@ -504,6 +531,7 @@ impl AuraVerifier "halting for block {} seconds in the future", diff ); + telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; "diff" => ?diff); thread::sleep(Duration::from_secs(diff)); Ok(()) }, @@ -589,6 +617,7 @@ impl Verifier for AuraVerifier where } trace!(target: "aura", "Checked {:?}; importing.", pre_header); + telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing"; "pre_header" => ?pre_header); extra_verification.into_future().wait()?; @@ -608,6 +637,9 @@ impl Verifier for AuraVerifier where } CheckedHeader::Deferred(a, b) => { debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); + telemetry!(CONSENSUS_DEBUG; "aura.header_too_far_in_future"; + "hash" => ?hash, "a" => ?a, "b" => ?b + ); Err(format!("Header {:?} rejected: too far in the future", hash)) } } diff --git a/core/finality-grandpa/Cargo.toml b/core/finality-grandpa/Cargo.toml index d94f950287ee5..3f02c33842357 100644 --- a/core/finality-grandpa/Cargo.toml +++ b/core/finality-grandpa/Cargo.toml @@ -16,6 +16,7 @@ parity-codec-derive = "3.0" runtime_primitives = { package = "sr-primitives", path = "../sr-primitives" } consensus_common = { package = "substrate-consensus-common", path = "../consensus/common" } substrate-primitives = { path = "../primitives" } +substrate-telemetry = { path = "../telemetry" } client = { package = "substrate-client", path = "../client" } network = { package = "substrate-network", path = "../network" } service = { package = "substrate-service", path = "../service", optional = true } diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index ad00980d3c4e4..c1bfeb0d08e90 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -69,6 +69,7 @@ use runtime_primitives::traits::{ use fg_primitives::GrandpaApi; use runtime_primitives::generic::BlockId; use substrate_primitives::{ed25519, H256, Ed25519AuthorityId, Blake2Hasher}; +use substrate_telemetry::*; use grandpa::Error as GrandpaError; use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet}; @@ -296,20 +297,32 @@ impl GossipValidator { let rounds = self.rounds.read(); if set_id < 
rounds.set_id { trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, rounds.set_id); + telemetry!(CONSENSUS_TRACE; "afg.expired_set_id"; + "set_id" => ?set_id, "ours" => ?rounds.set_id + ); return true; } else if set_id == rounds.set_id + 1 { // allow a few first rounds of future set. if round > MESSAGE_ROUND_TOLERANCE { trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, rounds.set_id); + telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set"; + "round" => ?round, "ours" => ?rounds.set_id + ); return true; } } else if set_id == rounds.set_id { if round < rounds.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) { trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, rounds.min_live_round, rounds.max_round); + telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob"; + "round" => ?round, "our_min_live_round" => ?rounds.min_live_round, "our_max_round" => ?rounds.max_round + ); return true; } } else { trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, rounds.set_id); + telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set"; + "set_id" => ?set_id, "ours" => ?rounds.set_id + ); return true; } false @@ -330,6 +343,7 @@ impl GossipValidator { full.set_id ) { debug!(target: "afg", "Bad message signature {}", full.message.id); + telemetry!(CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id); return network_gossip::ValidationResult::Invalid; } @@ -347,6 +361,11 @@ impl GossipValidator { if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() { debug!(target: "afg", "Malformed compact commit"); + telemetry!(CONSENSUS_DEBUG; "afg.malformed_compact_commit"; + "precommits_len" => ?full.message.precommits.len(), + "auth_data_len" => ?full.message.auth_data.len(), + "precommits_is_empty" => ?full.message.precommits.is_empty(), + ); return network_gossip::ValidationResult::Invalid; } @@ -360,6 +379,7 @@ impl GossipValidator { full.set_id, ) { debug!(target: "afg", "Bad commit message signature {}", id); + telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id); return network_gossip::ValidationResult::Invalid; } } @@ -376,6 +396,7 @@ impl network_gossip::Validator for GossipValidator self.validate_commit_message(message), None => { debug!(target: "afg", "Error decoding message"); + telemetry!(CONSENSUS_DEBUG; "afg.err_decoding_msg"; "" => ""); network_gossip::ValidationResult::Invalid } } @@ -493,6 +514,9 @@ impl,> Network ?block, "round" => ?round + ); self.service.announce_block(block) } } @@ -550,6 +574,9 @@ pub fn block_import, RA, PRA>( // are unsupported for following GRANDPA directly. 
let genesis_authorities = api.runtime_api() .grandpa_authorities(&BlockId::number(Zero::zero()))?; + telemetry!(CONSENSUS_DEBUG; "afg.loading_authorities"; + "authorities_len" => ?genesis_authorities.len() + ); let authority_set = SharedAuthoritySet::genesis(genesis_authorities); let encoded = authority_set.inner().read().encode(); @@ -703,6 +730,9 @@ pub fn run_grandpa, N, RA>( let voter_work = future::loop_fn(initial_state, move |params| { let (env, last_round_number, last_state, authority_set_change) = params; debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id); + telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter"; + "name" => ?config.name(), "set_id" => ?env.set_id + ); let chain_info = match client.info() { Ok(i) => i, @@ -790,7 +820,10 @@ pub fn run_grandpa, N, RA>( let voter_work = voter_work .join(broadcast_worker) .map(|((), ())| ()) - .map_err(|e| warn!("GRANDPA Voter failed: {:?}", e)); + .map_err(|e| { + warn!("GRANDPA Voter failed: {:?}", e); + telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e); + }); Ok(voter_work.select(on_exit).then(|_| Ok(()))) } diff --git a/core/service/src/chain_spec.rs b/core/service/src/chain_spec.rs index 5351bc96fdf22..8e231f40294f7 100644 --- a/core/service/src/chain_spec.rs +++ b/core/service/src/chain_spec.rs @@ -25,6 +25,7 @@ use runtime_primitives::{BuildStorage, StorageOverlay, ChildrenStorageOverlay}; use serde_json as json; use crate::components::RuntimeGenesis; use network::Multiaddr; +use tel::TelemetryEndpoints; enum GenesisSource { File(PathBuf), @@ -87,7 +88,7 @@ struct ChainSpecFile { pub name: String, pub id: String, pub boot_nodes: Vec, - pub telemetry_url: Option, + pub telemetry_endpoints: Option, pub protocol_id: Option, pub consensus_engine: Option, pub properties: Option, @@ -124,8 +125,8 @@ impl ChainSpec { &self.spec.id } - pub fn telemetry_url(&self) -> Option<&str> { - self.spec.telemetry_url.as_ref().map(String::as_str) + pub fn telemetry_endpoints(&self) -> &Option { + &self.spec.telemetry_endpoints } pub fn protocol_id(&self) -> Option<&str> { @@ -170,7 +171,7 @@ impl ChainSpec { id: &str, constructor: fn() -> G, boot_nodes: Vec, - telemetry_url: Option<&str>, + telemetry_endpoints: Option, protocol_id: Option<&str>, consensus_engine: Option<&str>, properties: Option, @@ -180,7 +181,7 @@ impl ChainSpec { name: name.to_owned(), id: id.to_owned(), boot_nodes: boot_nodes, - telemetry_url: telemetry_url.map(str::to_owned), + telemetry_endpoints, protocol_id: protocol_id.map(str::to_owned), consensus_engine: consensus_engine.map(str::to_owned), properties, diff --git a/core/service/src/config.rs b/core/service/src/config.rs index 84dfe599e2fcc..9883996eacf62 100644 --- a/core/service/src/config.rs +++ b/core/service/src/config.rs @@ -25,6 +25,7 @@ pub use network::config::{NetworkConfiguration, Roles}; use runtime_primitives::BuildStorage; use serde::{Serialize, de::DeserializeOwned}; use target_info::Target; +use tel::TelemetryEndpoints; /// Service configuration. #[derive(Clone)] @@ -64,7 +65,7 @@ pub struct Configuration { /// RPC over Websockets binding address. `None` if disabled. pub rpc_ws: Option, /// Telemetry service URL. `None` if disabled. 
- pub telemetry_url: Option, + pub telemetry_endpoints: Option, /// The default number of 64KB pages to allocate for Wasm execution pub default_heap_pages: Option, } @@ -90,11 +91,13 @@ impl Configuration Service { let version = config.full_version(); info!("Best block: #{}", best_header.number()); - telemetry!("node.start"; "height" => best_header.number().as_(), "best" => ?best_header.hash()); + telemetry!(SUBSTRATE_INFO; "node.start"; "height" => best_header.number().as_(), "best" => ?best_header.hash()); let network_protocol = ::build_network_protocol(&config)?; let transaction_pool = Arc::new( @@ -269,7 +269,7 @@ impl Service { )?; // Telemetry - let telemetry = config.telemetry_url.clone().map(|url| { + let telemetry = config.telemetry_endpoints.clone().map(|endpoints| { let is_authority = config.roles == Roles::AUTHORITY; let network_id = network.local_peer_id().to_base58(); let pubkey = format!("{}", public_key); @@ -278,9 +278,9 @@ impl Service { let version = version.clone(); let chain_name = config.chain_spec.name().to_owned(); Arc::new(tel::init_telemetry(tel::TelemetryConfig { - url: url, + endpoints, on_connect: Box::new(move || { - telemetry!("system.connected"; + telemetry!(SUBSTRATE_INFO; "system.connected"; "name" => name.clone(), "implementation" => impl_name.clone(), "version" => version.clone(), diff --git a/core/service/test/src/lib.rs b/core/service/test/src/lib.rs index 10cac1608ad28..795fa05bc733d 100644 --- a/core/service/test/src/lib.rs +++ b/core/service/test/src/lib.rs @@ -120,7 +120,7 @@ fn node_config ( execution_strategies: Default::default(), rpc_http: None, rpc_ws: None, - telemetry_url: None, + telemetry_endpoints: None, default_heap_pages: None, } } diff --git a/core/telemetry/Cargo.toml b/core/telemetry/Cargo.toml index 6567cdcf13ea5..0553c4f16b757 100644 --- a/core/telemetry/Cargo.toml +++ b/core/telemetry/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "substrate-telemetry" -version = "0.3.0" +version = "0.3.1" authors = ["Parity Technologies "] description = "Telemetry utils" edition = "2018" @@ -9,6 +9,9 @@ edition = "2018" parking_lot = "0.7.1" lazy_static = "1.0" log = "0.4" +rand = "0.6" +serde = "1.0.81" +serde_derive = "1.0" slog = "^2" slog-json = "^2" slog-async = "^2" diff --git a/core/telemetry/src/lib.rs b/core/telemetry/src/lib.rs index 9afa7b03ed3db..37bb8c1a8e7e3 100644 --- a/core/telemetry/src/lib.rs +++ b/core/telemetry/src/lib.rs @@ -26,15 +26,21 @@ use std::sync::Arc; use parking_lot::Mutex; use slog::{Drain, o}; use log::trace; +use rand::{thread_rng, Rng}; pub use slog_scope::with_logger; pub use slog; +use serde_derive::{Serialize, Deserialize}; +use slog::OwnedKVList; +use slog::Record; +use core::result; /// Configuration for telemetry. pub struct TelemetryConfig { - /// URL of the telemetry WebSocket server. - pub url: String, - /// What do do when we connect to the server. - pub on_connect: Box, + /// Collection of telemetry WebSocket servers with a corresponding verbosity level. + pub endpoints: TelemetryEndpoints, + /// What do do when we connect to the servers. + /// Note that this closure is executed each time we connect to a telemetry endpoint. + pub on_connect: Box, } /// Telemetry service guard. @@ -43,66 +49,165 @@ pub type Telemetry = slog_scope::GlobalLoggerGuard; /// Size of the channel for passing messages to telemetry thread. const CHANNEL_SIZE: usize = 262144; +/// Log levels. 
+pub const SUBSTRATE_DEBUG: &str = "9"; +pub const SUBSTRATE_INFO: &str = "0"; + +pub const CONSENSUS_TRACE: &str = "9"; +pub const CONSENSUS_DEBUG: &str = "5"; +pub const CONSENSUS_WARN: &str = "4"; +pub const CONSENSUS_INFO: &str = "3"; + +/// Multiply logging to all drains. This is similar to `slog::Duplicate`, which is +/// limited to two drains though and doesn't support dynamic nesting at runtime. +#[derive(Debug, Clone)] +pub struct Multiply (pub Vec>); + +impl Multiply { + pub fn new(v: Vec>) -> Self { + Multiply(v) + } +} + +impl Drain for Multiply { + type Ok = Vec; + type Err = Vec; + + fn log(&self, record: &Record, logger_values: &OwnedKVList) -> result::Result { + let mut oks = Vec::new(); + let mut errs = Vec::new(); + + self.0.iter().for_each(|l| { + let res: Result<::Ok, ::Err> = (*l).log(record, logger_values); + match res { + Ok(o) => oks.push(o), + Err(e) => errs.push(e), + } + }); + + if !errs.is_empty() { + result::Result::Err(errs) + } else { + result::Result::Ok(oks) + } + } +} + /// Initialise telemetry. pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard { - let writer = TelemetryWriter::new(); - let out_sync = writer.out.clone(); - let log = slog::Logger::root( - slog_async::Async::new( - slog_json::Json::default(writer).fuse() - ).chan_size(CHANNEL_SIZE) + let mut endpoint_drains: Vec>> = Vec::new(); + let mut out_syncs = Vec::new(); + + // Set up a filter/drain for each endpoint + config.endpoints.0.iter().for_each(|(url, verbosity)| { + let writer = TelemetryWriter::new(Arc::new(url.to_owned())); + let out_sync = writer.out.clone(); + out_syncs.push(out_sync); + + let until_verbosity = *verbosity; + let filter = slog::Filter( + slog_json::Json::default(writer).fuse(), + move |rec| { + let tag = rec.tag().parse::() + .expect("`telemetry!` macro requires tag."); + tag <= until_verbosity + }); + + let filter = Box::new(filter) as Box>; + endpoint_drains.push(filter); + }); + + // Set up logging to all endpoints + let drain = slog_async::Async::new(Multiply::new(endpoint_drains).fuse()); + let root = slog::Logger::root(drain.chan_size(CHANNEL_SIZE) .overflow_strategy(slog_async::OverflowStrategy::DropAndReport) .build().fuse(), o!() ); - let logger_guard = slog_scope::set_global_logger(log); - - thread::spawn(move || { - loop { - trace!(target: "telemetry", "Connecting to Telemetry... {:?}", config.url); - let _ = ws::connect(config.url.as_str(), |out| Connection::new(out, &*out_sync, &config)); - - thread::sleep(time::Duration::from_millis(5000)); - } + let logger_guard = slog_scope::set_global_logger(root); + + // Spawn a thread for each endpoint + let on_connect = Arc::new(config.on_connect); + config.endpoints.0.into_iter().for_each(|(url, verbosity)| { + let inner_verbosity = Arc::new(verbosity.to_owned()); + let inner_on_connect = Arc::clone(&on_connect); + + let out_sync = out_syncs.remove(0); + let out_sync = Arc::clone(&out_sync); + + thread::spawn(move || { + loop { + let on_connect = Arc::clone(&inner_on_connect); + let out_sync = Arc::clone(&out_sync); + let verbosity = Arc::clone(&inner_verbosity); + + trace!(target: "telemetry", + "Connecting to Telemetry at {} with verbosity {}", url, Arc::clone(&verbosity)); + + let _ = ws::connect(url.to_owned(), + |out| { + Connection::new(out, Arc::clone(&out_sync), Arc::clone(&on_connect), url.clone()) + }); + + // Sleep for a random time between 5-10 secs. If there are general connection + // issues not all threads should be synchronized in their re-connection time. 
+ let random_sleep = thread_rng().gen_range(0, 5); + thread::sleep(time::Duration::from_secs(5) + time::Duration::from_secs(random_sleep)); + } + }); }); return logger_guard; } -/// Exactly equivalent to `slog_scope::info`, provided as a convenience. +/// Translates to `slog_scope::info`, but contains an additional verbosity +/// parameter which the log record is tagged with. Additionally the verbosity +/// parameter is added to the record as a key-value pair. #[macro_export] macro_rules! telemetry { - ( $($t:tt)* ) => { $crate::with_logger(|l| $crate::slog::slog_info!(l, $($t)* )) } + ( $a:expr; $b:expr; $( $t:tt )* ) => { + $crate::with_logger(|l| { + $crate::slog::slog_info!(l, #$a, $b; "verbosity" => stringify!($a), $($t)* ) + }) + } } -struct Connection<'a> { +struct Connection { out: ws::Sender, - out_sync: &'a Mutex>, - config: &'a TelemetryConfig, + out_sync: Arc>>, + on_connect: Arc>, + url: String, } -impl<'a> Connection<'a> { - fn new(out: ws::Sender, out_sync: &'a Mutex>, config: &'a TelemetryConfig) -> Self { +impl Connection { + fn new( + out: ws::Sender, + out_sync: Arc>>, + on_connect: Arc>, + url: String + ) -> Self { Connection { out, out_sync, - config, + on_connect, + url, } } } -impl<'a> ws::Handler for Connection<'a> { +impl ws::Handler for Connection { fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { - trace!(target: "telemetry", "Connected!"); + trace!(target: "telemetry", "Connected to {}!", self.url); *self.out_sync.lock() = Some(self.out.clone()); - (self.config.on_connect)(); + (self.on_connect)(); Ok(()) } fn on_close(&mut self, code: ws::CloseCode, reason: &str) { *self.out_sync.lock() = None; - trace!(target: "telemetry", "Connection closing due to ({:?}) {}", code, reason); + trace!(target: "telemetry", "Connection to {} closing due to ({:?}) {}", + self.url, code, reason); } fn on_error(&mut self, _: ws::Error) { @@ -117,15 +222,17 @@ impl<'a> ws::Handler for Connection<'a> { struct TelemetryWriter { buffer: Vec, out: Arc>>, + url: Arc, } impl TelemetryWriter { - fn new() -> Self { + fn new(url: Arc) -> Self { let out = Arc::new(Mutex::new(None)); TelemetryWriter { buffer: Vec::new(), out, + url, } } } @@ -155,11 +262,11 @@ impl io::Write for TelemetryWriter { let error = if let Some(ref mut o) = *out { let r = o.send(s); - trace!(target: "telemetry", "Sent to telemetry: {} -> {:?}", s, r); + trace!(target: "telemetry", "Sent to telemetry {}: {} -> {:?}", self.url, s, r); r.is_err() } else { - trace!(target: "telemetry", "Telemetry socket closed, failed to send: {}", s); + trace!(target: "telemetry", "Telemetry socket closed to {}, failed to send: {}", self.url, s); false }; @@ -171,3 +278,12 @@ impl io::Write for TelemetryWriter { Ok(()) } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryEndpoints (Vec<(String, u8)>); + +impl TelemetryEndpoints { + pub fn new(endpoints: Vec<(String, u8)>) -> Self { + TelemetryEndpoints(endpoints) + } +} diff --git a/node/cli/Cargo.toml b/node/cli/Cargo.toml index 2c81f127d1c26..082d168a4e044 100644 --- a/node/cli/Cargo.toml +++ b/node/cli/Cargo.toml @@ -30,6 +30,7 @@ grandpa = { package = "substrate-finality-grandpa", path = "../../core/finality- sr-primitives = { path = "../../core/sr-primitives" } node-executor = { path = "../executor" } substrate-keystore = { path = "../../core/keystore" } +substrate-telemetry = { package = "substrate-telemetry", path = "../../core/telemetry" } [dev-dependencies] service-test = { package = "substrate-service-test", path = "../../core/service/test" } 
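With the per-endpoint filter installed in init_telemetry above, every telemetry! record carries the numeric tag of its level constant and is forwarded only to endpoints whose configured verbosity is at least that tag: SUBSTRATE_INFO ("0") reaches every endpoint, while CONSENSUS_TRACE ("9") only reaches endpoints configured with verbosity 9. A minimal call-site sketch, mirroring the txpool line from informant.rs above (report_txpool is a hypothetical helper; ready/future stand in for the transaction-pool status fields used there):

    // Sketch of a call site after this change: the level constant comes first,
    // then the message tag, then the key-value pairs.
    use substrate_telemetry::*;

    fn report_txpool(ready: usize, future: usize) {
        telemetry!(SUBSTRATE_INFO; "txpool.import"; "ready" => ready, "future" => future);
    }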
diff --git a/node/cli/res/dried-danta.json b/node/cli/res/dried-danta.json index 6f7167d06cccc..1830a6411a39f 100644 --- a/node/cli/res/dried-danta.json +++ b/node/cli/res/dried-danta.json @@ -11,7 +11,9 @@ "/ip4/104.211.48.247/tcp/30333/p2p/QmV2zjgFRfxbgYZQC9qFr4aHsQt7tDBJRAdgqqxqTq1Kta", "/ip4/40.114.120.164/tcp/30333/p2p/QmQbPCeurXuKhzCw6Ar6ovizNKATMTnkkqFJKgZzbF2MJs" ], - "telemetryUrl": "wss://telemetry.polkadot.io/submit/", + "telemetryEndpoints": [ + ["wss://telemetry.polkadot.io/submit/", 0] + ], "protocolId": null, "consensusEngine": null, "genesis": { diff --git a/node/cli/src/chain_spec.rs b/node/cli/src/chain_spec.rs index 5148c191238b4..47e98c1b7c4e2 100644 --- a/node/cli/src/chain_spec.rs +++ b/node/cli/src/chain_spec.rs @@ -26,6 +26,7 @@ use substrate_service; use hex_literal::{hex, hex_impl}; use substrate_keystore::pad_seed; +use substrate_telemetry::TelemetryEndpoints; const STAGING_TELEMETRY_URL: &str = "wss://telemetry.polkadot.io/submit/"; @@ -153,7 +154,7 @@ pub fn staging_testnet_config() -> ChainSpec { "staging_testnet", staging_testnet_config_genesis, boot_nodes, - Some(STAGING_TELEMETRY_URL.into()), + Some(TelemetryEndpoints::new(vec![(STAGING_TELEMETRY_URL.to_string(), 0)])), None, None, None,
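The endpoints themselves can come from two places. On the command line, --telemetry-url may now be given several times, each value being a URL optionally followed by a verbosity (for example --telemetry-url "wss://telemetry.polkadot.io/submit/ 0", plus a second, hypothetical local endpoint via --telemetry-url "ws://localhost:1024 9"); parse_telemetry_endpoints falls back to verbosity 0 when the number is omitted. In a chain spec the same information lives under telemetryEndpoints as [url, verbosity] pairs, as the dried-danta.json hunk shows, and is built in code with TelemetryEndpoints::new. A sketch of the latter, mirroring node/cli/src/chain_spec.rs above (staging_endpoints is a hypothetical helper, not part of the patch):

    use substrate_telemetry::TelemetryEndpoints;

    const STAGING_TELEMETRY_URL: &str = "wss://telemetry.polkadot.io/submit/";

    fn staging_endpoints() -> TelemetryEndpoints {
        // A single endpoint that receives only the least verbose (level 0) records.
        TelemetryEndpoints::new(vec![(STAGING_TELEMETRY_URL.to_string(), 0)])
    }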