diff --git a/Cargo.lock b/Cargo.lock index 10caf9a3d45ff..4edf000310a85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1910,6 +1910,7 @@ dependencies = [ "substrate-primitives 0.1.0", "substrate-service 0.3.0", "substrate-service-test 0.3.0", + "substrate-telemetry 0.3.1", "substrate-transaction-pool 0.1.0", "tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -3637,7 +3638,7 @@ dependencies = [ "substrate-primitives 0.1.0", "substrate-service 0.3.0", "substrate-state-machine 0.1.0", - "substrate-telemetry 0.3.0", + "substrate-telemetry 0.3.1", "sysinfo 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3669,7 +3670,7 @@ dependencies = [ "substrate-keyring 0.1.0", "substrate-primitives 0.1.0", "substrate-state-machine 0.1.0", - "substrate-telemetry 0.3.0", + "substrate-telemetry 0.3.1", "substrate-test-client 0.1.0", "substrate-trie 0.4.0", ] @@ -3725,6 +3726,7 @@ dependencies = [ "substrate-network 0.1.0", "substrate-primitives 0.1.0", "substrate-service 0.3.0", + "substrate-telemetry 0.3.1", "substrate-test-client 0.1.0", "tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -3846,6 +3848,7 @@ dependencies = [ "substrate-network 0.1.0", "substrate-primitives 0.1.0", "substrate-service 0.3.0", + "substrate-telemetry 0.3.1", "substrate-test-client 0.1.0", "tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -4063,7 +4066,7 @@ dependencies = [ "substrate-network 0.1.0", "substrate-primitives 0.1.0", "substrate-rpc-servers 0.1.0", - "substrate-telemetry 0.3.0", + "substrate-telemetry 0.3.1", "substrate-test-client 0.1.0", "substrate-transaction-pool 0.1.0", "target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4119,11 +4122,14 @@ dependencies = [ [[package]] name = "substrate-telemetry" -version = "0.3.0" +version = 
"0.3.1" dependencies = [ "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", "slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "slog-async 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/cli/src/informant.rs b/core/cli/src/informant.rs index e6de9d5b634ca..3955152691285 100644 --- a/core/cli/src/informant.rs +++ b/core/cli/src/informant.rs @@ -25,7 +25,7 @@ use tokio::timer::Interval; use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use network::{SyncState, SyncProvider}; use client::{backend::Backend, BlockchainEvents}; -use substrate_telemetry::telemetry; +use substrate_telemetry::*; use log::{debug, info, warn}; use runtime_primitives::generic::BlockId; @@ -86,6 +86,7 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe } else { (0.0, 0) }; telemetry!( + SUBSTRATE_INFO; "system.interval"; "status" => format!("{}{}", status, target), "peers" => num_peers, @@ -144,7 +145,7 @@ pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExe let txpool = service.transaction_pool(); let display_txpool_import = txpool.import_notification_stream().for_each(move |_| { let status = txpool.status(); - telemetry!("txpool.import"; "ready" => status.ready, "future" => status.future); + telemetry!(SUBSTRATE_INFO; "txpool.import"; "ready" => status.ready, "future" => status.future); Ok(()) }); diff --git a/core/cli/src/lib.rs b/core/cli/src/lib.rs index c39a6d5ae6140..2276922afcecc 100644 --- 
a/core/cli/src/lib.rs +++ b/core/cli/src/lib.rs @@ -59,6 +59,7 @@ use log::info; use lazy_static::lazy_static; use futures::Future; +use substrate_telemetry::TelemetryEndpoints; const MAX_NODE_NAME_LENGTH: usize = 32; @@ -401,9 +402,9 @@ where // Override telemetry if cli.no_telemetry { - config.telemetry_url = None; - } else if let Some(url) = cli.telemetry_url { - config.telemetry_url = Some(url); + config.telemetry_endpoints = None; + } else if !cli.telemetry_endpoints.is_empty() { + config.telemetry_endpoints = Some(TelemetryEndpoints::new(cli.telemetry_endpoints)); } Ok(config) diff --git a/core/cli/src/params.rs b/core/cli/src/params.rs index bd33c7f1f16b7..26a9a00ebf0a0 100644 --- a/core/cli/src/params.rs +++ b/core/cli/src/params.rs @@ -173,9 +173,11 @@ pub struct RunCmd { #[structopt(long = "no-telemetry")] pub no_telemetry: bool, - /// The URL of the telemetry server to connect to - #[structopt(long = "telemetry-url", value_name = "TELEMETRY_URL")] - pub telemetry_url: Option, + /// The URL of the telemetry server to connect to. This flag can be passed multiple times + /// as a means to specify multiple telemetry endpoints. Verbosity levels range from 0-9, with + /// 0 denoting the least verbosity. If no verbosity level is specified the default is 0. + #[structopt(long = "telemetry-url", value_name = "URL VERBOSITY", parse(try_from_str = "parse_telemetry_endpoints"))] + pub telemetry_endpoints: Vec<(String, u8)>, /// The means of execution used when calling into the runtime while syncing blocks. #[structopt( @@ -239,6 +241,21 @@ pub struct RunCmd { pub pool_config: TransactionPoolParams, } +/// Default to verbosity level 0, if none is provided. 
+fn parse_telemetry_endpoints(s: &str) -> Result<(String, u8), Box> { + let pos = s.find(' '); + match pos { + None => { + Ok((s.to_owned(), 0)) + }, + Some(pos_) => { + let verbosity = s[pos_ + 1..].parse()?; + let url = s[..pos_].parse()?; + Ok((url, verbosity)) + } + } +} + impl_augment_clap!(RunCmd); impl_get_log_filter!(RunCmd); diff --git a/core/client/db/src/light.rs b/core/client/db/src/light.rs index f6fdb9a4b7e5f..ceb0ff7c7932b 100644 --- a/core/client/db/src/light.rs +++ b/core/client/db/src/light.rs @@ -403,12 +403,13 @@ impl LightBlockchainStorage for LightStorage ); transaction.put(columns::HEADER, &lookup_key, &header.encode()); - if number.is_zero() { - transaction.put(columns::META, meta_keys::FINALIZED_BLOCK, &lookup_key); + let is_genesis = number.is_zero(); + if is_genesis { transaction.put(columns::META, meta_keys::GENESIS_HASH, hash.as_ref()); } let finalized = match leaf_state { + _ if is_genesis => true, NewBlockState::Final => true, _ => false, }; @@ -972,7 +973,7 @@ pub(crate) mod tests { #[test] fn test_leaves_pruned_on_finality() { let db = LightStorage::::new_test(); - let block0 = insert_final_block(&db, None, || default_header(&Default::default(), 0)); + let block0 = insert_block(&db, None, || default_header(&Default::default(), 0)); let block1_a = insert_block(&db, None, || default_header(&block0, 1)); let block1_b = insert_block(&db, None, || header_with_extrinsics_root(&block0, 1, [1; 32].into())); diff --git a/core/client/src/client.rs b/core/client/src/client.rs index 4eef88fad05c6..bac76bcd330c1 100644 --- a/core/client/src/client.rs +++ b/core/client/src/client.rs @@ -61,7 +61,7 @@ use crate::in_mem; use crate::block_builder::{self, api::BlockBuilder as BlockBuilderAPI}; use crate::genesis; use consensus; -use substrate_telemetry::telemetry; +use substrate_telemetry::*; use log::{info, trace, warn}; use error_chain::bail; @@ -729,7 +729,7 @@ impl Client where fork_choice, ); - telemetry!("block.import"; + 
telemetry!(SUBSTRATE_INFO; "block.import"; "height" => height, "best" => ?hash, "origin" => ?origin @@ -859,7 +859,7 @@ impl Client where warn!(" Header {:?}", header); warn!(" Native result {:?}", native_result); warn!(" Wasm result {:?}", wasm_result); - telemetry!("block.execute.consensus_failure"; + telemetry!(SUBSTRATE_INFO; "block.execute.consensus_failure"; "hash" => ?hash, "origin" => ?origin, "header" => ?header diff --git a/core/consensus/aura/Cargo.toml b/core/consensus/aura/Cargo.toml index 1b7f44cd38adc..81588f66dccb6 100644 --- a/core/consensus/aura/Cargo.toml +++ b/core/consensus/aura/Cargo.toml @@ -18,6 +18,7 @@ aura_primitives = { package = "substrate-consensus-aura-primitives", path = "pri inherents = { package = "substrate-inherents", path = "../../inherents" } srml-consensus = { path = "../../../srml/consensus" } srml-aura = { path = "../../../srml/aura" } +substrate-telemetry = { path = "../../telemetry" } futures = "0.1.17" tokio = "0.1.7" parking_lot = "0.7.1" diff --git a/core/consensus/aura/src/lib.rs b/core/consensus/aura/src/lib.rs index 8cb63c0464ead..a54ab3ae3fe64 100644 --- a/core/consensus/aura/src/lib.rs +++ b/core/consensus/aura/src/lib.rs @@ -52,6 +52,7 @@ use srml_aura::{ InherentType as AuraInherent, AuraInherentData, timestamp::{TimestampInherentData, InherentType as TimestampInherent, InherentError as TIError} }; +use substrate_telemetry::*; use aura_slots::{CheckedHeader, SlotWorker, SlotInfo, SlotCompatible}; @@ -265,12 +266,18 @@ impl SlotWorker for AuraWorker whe chain_head.hash(), e ); + telemetry!(CONSENSUS_WARN; "aura.unable_fetching_authorities"; + "slot" => ?chain_head.hash(), "err" => ?e + ); return Box::new(future::ok(())); } }; if self.sync_oracle.is_offline() && authorities.len() > 1 { - debug!(target: "aura", "Skipping proposal slot. Waiting for the netork."); + debug!(target: "aura", "Skipping proposal slot. 
Waiting for the network."); + telemetry!(CONSENSUS_DEBUG; "aura.skipping_proposal_slot"; + "authorities_len" => authorities.len() + ); return Box::new(future::ok(())); } @@ -282,12 +289,18 @@ impl SlotWorker for AuraWorker whe slot_num, timestamp ); + telemetry!(CONSENSUS_DEBUG; "aura.starting_authorship"; + "slot_num" => slot_num, "timestamp" => timestamp + ); // we are the slot author. make a block and sign it. let proposer = match env.init(&chain_head, &authorities) { Ok(p) => p, Err(e) => { warn!("Unable to author block in slot {:?}: {:?}", slot_num, e); + telemetry!(CONSENSUS_WARN; "aura.unable_authoring_block"; + "slot" => slot_num, "err" => ?e + ); return Box::new(future::ok(())) } }; @@ -315,6 +328,9 @@ impl SlotWorker for AuraWorker whe "Discarding proposal for slot {}; block production took too long", slot_num ); + telemetry!(CONSENSUS_INFO; "aura.discarding_proposal_took_too_long"; + "slot" => slot_num + ); return } @@ -348,10 +364,18 @@ impl SlotWorker for AuraWorker whe import_block.post_header().hash(), pre_hash ); + telemetry!(CONSENSUS_INFO; "aura.pre_sealed_block"; + "header_num" => ?header_num, + "hash_now" => ?import_block.post_header().hash(), + "hash_previously" => ?pre_hash + ); if let Err(e) = block_import.import_block(import_block, None) { warn!(target: "aura", "Error with block built on {:?}: {:?}", parent_hash, e); + telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on"; + "hash" => ?parent_hash, "err" => ?e + ); } }) .map_err(|e| consensus_common::ErrorKind::ClientImport(format!("{:?}", e)).into()) @@ -456,6 +480,9 @@ impl AuraVerifier "halting for block {} seconds in the future", diff ); + telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; + "diff" => ?diff + ); thread::sleep(Duration::from_secs(diff)); Ok(()) }, @@ -504,6 +531,7 @@ impl AuraVerifier "halting for block {} seconds in the future", diff ); + telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; "diff" => ?diff); thread::sleep(Duration::from_secs(diff)); 
Ok(()) }, @@ -589,6 +617,7 @@ impl Verifier for AuraVerifier where } trace!(target: "aura", "Checked {:?}; importing.", pre_header); + telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing"; "pre_header" => ?pre_header); extra_verification.into_future().wait()?; @@ -608,6 +637,9 @@ impl Verifier for AuraVerifier where } CheckedHeader::Deferred(a, b) => { debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); + telemetry!(CONSENSUS_DEBUG; "aura.header_too_far_in_future"; + "hash" => ?hash, "a" => ?a, "b" => ?b + ); Err(format!("Header {:?} rejected: too far in the future", hash)) } } diff --git a/core/finality-grandpa/Cargo.toml b/core/finality-grandpa/Cargo.toml index 51e24eeba48d8..4f8dfb6d9031b 100644 --- a/core/finality-grandpa/Cargo.toml +++ b/core/finality-grandpa/Cargo.toml @@ -13,6 +13,7 @@ substrate-primitives = { path = "../primitives" } substrate-client = { path = "../client" } substrate-network = { path = "../network" } substrate-service = { path = "../service", optional = true } +substrate-telemetry = { path = "../telemetry" } log = "0.4" parking_lot = "0.7.1" tokio = "0.1.7" diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index 1394f210fa172..73458b299d7cc 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -59,6 +59,7 @@ extern crate sr_primitives as runtime_primitives; extern crate substrate_consensus_common as consensus_common; extern crate substrate_network as network; extern crate substrate_primitives; +extern crate substrate_telemetry; extern crate tokio; extern crate parking_lot; extern crate parity_codec as codec; @@ -100,6 +101,7 @@ use runtime_primitives::traits::{ use fg_primitives::GrandpaApi; use runtime_primitives::generic::BlockId; use substrate_primitives::{ed25519, H256, Ed25519AuthorityId, Blake2Hasher}; +use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_TRACE, CONSENSUS_WARN}; use grandpa::Error as GrandpaError; use grandpa::{voter, 
round::State as RoundState, BlockNumberOps, VoterSet}; @@ -281,6 +283,42 @@ struct TopicTracker { set_id: u64, } +impl TopicTracker { + fn is_expired(&self, round: u64, set_id: u64) -> bool { + if set_id < self.set_id { + trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, self.set_id); + telemetry!(CONSENSUS_TRACE; "afg.expired_set_id"; + "set_id" => ?set_id, "ours" => ?self.set_id + ); + return true; + } else if set_id == self.set_id + 1 { + // allow a few first rounds of future set. + if round > MESSAGE_ROUND_TOLERANCE { + trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, self.set_id); + telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set"; + "round" => ?round, "ours" => ?self.set_id + ); + return true; + } + } else if set_id == self.set_id { + if round < self.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) { + trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, self.min_live_round, self.max_round); + telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob"; + "round" => ?round, "our_min_live_round" => ?self.min_live_round, "our_max_round" => ?self.max_round + ); + return true; + } + } else { + trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, self.set_id); + telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set"; + "set_id" => ?set_id, "ours" => ?self.set_id + ); + return true; + } + false + } +} + struct GossipValidator { rounds: parking_lot::RwLock, _marker: ::std::marker::PhantomData, @@ -322,26 +360,7 @@ impl GossipValidator { } fn is_expired(&self, round: u64, set_id: u64) -> bool { - let rounds = self.rounds.read(); - if set_id < rounds.set_id { - trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, rounds.set_id); - return true; - } else if set_id == rounds.set_id + 1 { - // allow a few first rounds of future set. 
- if round > MESSAGE_ROUND_TOLERANCE { - trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, rounds.set_id); - return true; - } - } else if set_id == rounds.set_id { - if round < rounds.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) { - trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, rounds.min_live_round, rounds.max_round); - return true; - } - } else { - trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, rounds.set_id); - return true; - } - false + self.rounds.read().is_expired(round, set_id) } fn validate_round_message(&self, full: VoteOrPrecommitMessage) @@ -359,6 +378,7 @@ impl GossipValidator { full.set_id ) { debug!(target: "afg", "Bad message signature {}", full.message.id); + telemetry!(CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id); return network_gossip::ValidationResult::Invalid; } @@ -376,6 +396,11 @@ impl GossipValidator { if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() { debug!(target: "afg", "Malformed compact commit"); + telemetry!(CONSENSUS_DEBUG; "afg.malformed_compact_commit"; + "precommits_len" => ?full.message.precommits.len(), + "auth_data_len" => ?full.message.auth_data.len(), + "precommits_is_empty" => ?full.message.precommits.is_empty(), + ); return network_gossip::ValidationResult::Invalid; } @@ -389,6 +414,7 @@ impl GossipValidator { full.set_id, ) { debug!(target: "afg", "Bad commit message signature {}", id); + telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id); return network_gossip::ValidationResult::Invalid; } } @@ -405,10 +431,23 @@ impl network_gossip::Validator for GossipValidator self.validate_commit_message(message), None => { debug!(target: "afg", "Error decoding message"); + telemetry!(CONSENSUS_DEBUG; "afg.err_decoding_msg"; "" => ""); network_gossip::ValidationResult::Invalid } } } + + fn 
message_expired<'a>(&'a self) -> Box bool + 'a> { + let rounds = self.rounds.read(); + Box::new(move |_topic, mut data| { + match GossipMessage::::decode(&mut data) { + None => true, + Some(GossipMessage::Commit(full)) => rounds.is_expired(full.round, full.set_id), + Some(GossipMessage::VoteOrPrecommit(full)) => + rounds.is_expired(full.round, full.set_id), + } + }) + } } /// A handle to the network. This is generally implemented by providing some @@ -484,7 +523,7 @@ impl,> Network(round, set_id)); + let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, message_topic::(round, set_id)); let _ = tx.send(inner_rx); }); NetworkStream { outer: rx, inner: None } @@ -509,7 +548,7 @@ impl,> Network(set_id)); + let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, commit_topic::(set_id)); let _ = tx.send(inner_rx); }); NetworkStream { outer: rx, inner: None } @@ -522,6 +561,9 @@ impl,> Network ?block, "round" => ?round + ); self.service.announce_block(block) } } @@ -579,6 +621,9 @@ pub fn block_import, RA, PRA>( // are unsupported for following GRANDPA directly. 
let genesis_authorities = api.runtime_api() .grandpa_authorities(&BlockId::number(Zero::zero()))?; + telemetry!(CONSENSUS_DEBUG; "afg.loading_authorities"; + "authorities_len" => ?genesis_authorities.len() + ); let authority_set = SharedAuthoritySet::genesis(genesis_authorities); let encoded = authority_set.inner().read().encode(); @@ -732,6 +777,9 @@ pub fn run_grandpa, N, RA>( let voter_work = future::loop_fn(initial_state, move |params| { let (env, last_round_number, last_state, authority_set_change) = params; debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id); + telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter"; + "name" => ?config.name(), "set_id" => ?env.set_id + ); let chain_info = match client.info() { Ok(i) => i, @@ -819,7 +867,10 @@ pub fn run_grandpa, N, RA>( let voter_work = voter_work .join(broadcast_worker) .map(|((), ())| ()) - .map_err(|e| warn!("GRANDPA Voter failed: {:?}", e)); + .map_err(|e| { + warn!("GRANDPA Voter failed: {:?}", e); + telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e); + }); Ok(voter_work.select(on_exit).then(|_| Ok(()))) } diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 470640ae826fa..2cca70f812711 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -182,7 +182,10 @@ impl Network for MessageRouting { self.validator.note_round(round, set_id); let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); - let messages = peer.consensus_gossip_messages_for(make_topic(round, set_id)); + let messages = peer.consensus_gossip_messages_for( + GRANDPA_ENGINE_ID, + make_topic(round, set_id), + ); let messages = messages.map_err( move |_| panic!("Messages for round {} dropped too early", round) @@ -212,7 +215,10 @@ impl Network for MessageRouting { self.validator.note_set(set_id); let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); - let messages = 
peer.consensus_gossip_messages_for(make_commit_topic(set_id)); + let messages = peer.consensus_gossip_messages_for( + GRANDPA_ENGINE_ID, + make_commit_topic(set_id), + ); let messages = messages.map_err( move |_| panic!("Commit messages for set {} dropped too early", set_id) diff --git a/core/network/src/consensus_gossip.rs b/core/network/src/consensus_gossip.rs index 720cf55a67b75..8ed6f8ba58f75 100644 --- a/core/network/src/consensus_gossip.rs +++ b/core/network/src/consensus_gossip.rs @@ -40,17 +40,26 @@ struct PeerConsensus { is_authority: bool, } +#[derive(Clone, Copy)] +enum Status { + Live, + Future, +} + struct MessageEntry { message_hash: B::Hash, topic: B::Hash, message: ConsensusMessage, timestamp: Instant, + status: Status, } /// Message validation result. pub enum ValidationResult { /// Message is valid with this topic. Valid(H), + /// Message is future with this topic. + Future(H), /// Invalid message. Invalid, /// Obsolete message. @@ -61,12 +70,20 @@ pub enum ValidationResult { pub trait Validator { /// Validate consensus message. fn validate(&self, data: &[u8]) -> ValidationResult; + + /// Produce a closure for validating messages on a given topic. + fn message_expired<'a>(&'a self) -> Box bool + 'a> { + Box::new(move |_topic, data| match self.validate(data) { + ValidationResult::Valid(_) | ValidationResult::Future(_) => false, + ValidationResult::Invalid | ValidationResult::Expired => true, + }) + } } /// Consensus network protocol handler. Manages statements and candidate requests. pub struct ConsensusGossip { peers: HashMap>, - live_message_sinks: HashMap>>>, + live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec>>>, messages: Vec>, known_messages: LruCache, validators: HashMap>>, @@ -102,7 +119,9 @@ impl ConsensusGossip { // Send out all known messages to authorities. 
let mut known_messages = HashSet::new(); for entry in self.messages.iter() { - if entry.timestamp + MESSAGE_LIFETIME < now { continue }; + if entry.timestamp + MESSAGE_LIFETIME < now { continue } + if let Status::Future = entry.status { continue } + known_messages.insert(entry.message_hash); protocol.send_message(who, Message::Consensus(entry.message.clone())); } @@ -161,18 +180,23 @@ impl ConsensusGossip { } } - fn register_message(&mut self, message_hash: B::Hash, topic: B::Hash, get_message: F) + fn register_message( + &mut self, + message_hash: B::Hash, + topic: B::Hash, + status: Status, + get_message: F, + ) where F: Fn() -> ConsensusMessage { - if self.known_messages.insert(message_hash, ()).is_none() - { + if self.known_messages.insert(message_hash, ()).is_none() { self.messages.push(MessageEntry { topic, message_hash, message: get_message(), timestamp: Instant::now(), + status, }); - } } @@ -184,6 +208,8 @@ impl ConsensusGossip { /// Prune old or no longer relevant consensus messages. Provide a predicate /// for pruning, which returns `false` when the items with a given topic should be pruned. 
pub fn collect_garbage(&mut self) { + use std::collections::hash_map::Entry; + self.live_message_sinks.retain(|_, sinks| { sinks.retain(|sink| !sink.is_closed()); !sinks.is_empty() @@ -194,15 +220,23 @@ impl ConsensusGossip { let validators = &self.validators; let now = Instant::now(); - self.messages.retain(|entry| { - entry.timestamp + MESSAGE_LIFETIME >= now - && match validators.get(&entry.message.engine_id) - .map(|v| v.validate(&entry.message.data)) - { - Some(ValidationResult::Valid(_)) => true, - _ => false, - } - }); + let mut check_fns = HashMap::new(); + let mut message_expired = move |entry: &MessageEntry| { + let engine_id = entry.message.engine_id; + let check_fn = match check_fns.entry(engine_id) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(vacant) => match validators.get(&engine_id) { + None => return true, // treat all messages with no validator as expired + Some(validator) => vacant.insert(validator.message_expired()), + } + }; + + (check_fn)(entry.topic, &entry.message.data) + }; + + self.messages.retain(|entry| + entry.timestamp + MESSAGE_LIFETIME >= now && !message_expired(entry) + ); trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)", before - self.messages.len(), @@ -216,12 +250,46 @@ impl ConsensusGossip { } /// Get data of valid, incoming messages for a topic (but might have expired meanwhile) - pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver> { + pub fn messages_for(&mut self, engine_id: ConsensusEngineId, topic: B::Hash) + -> mpsc::UnboundedReceiver> + { let (tx, rx) = mpsc::unbounded(); - for entry in self.messages.iter().filter(|e| e.topic == topic) { - tx.unbounded_send(entry.message.data.clone()).expect("receiver known to be live; qed"); + + let validator = match self.validators.get(&engine_id) { + None => { + self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx); + return rx; + } + Some(v) => v, + }; + + for entry in self.messages.iter_mut() + 
.filter(|e| e.topic == topic && e.message.engine_id == engine_id) + { + let live = match entry.status { + Status::Live => true, + Status::Future => match validator.validate(&entry.message.data) { + ValidationResult::Valid(_) => { + entry.status = Status::Live; + true + } + _ => { + // don't send messages considered to be future still. + // if messages are considered expired they'll be cleaned up when we + // collect garbage. + false + } + } + }; + + if live { + entry.status = Status::Live; + tx.unbounded_send(entry.message.data.clone()) + .expect("receiver known to be live; qed"); + } } - self.live_message_sinks.entry(topic).or_default().push(tx); + + self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx); rx } @@ -247,11 +315,13 @@ impl ConsensusGossip { if let Some(ref mut peer) = self.peers.get_mut(&who) { use std::collections::hash_map::Entry; + let engine_id = message.engine_id; //validate the message - let topic = match self.validators.get(&message.engine_id) + let (topic, status) = match self.validators.get(&engine_id) .map(|v| v.validate(&message.data)) { - Some(ValidationResult::Valid(topic)) => topic, + Some(ValidationResult::Valid(topic)) => (topic, Status::Live), + Some(ValidationResult::Future(topic)) => (topic, Status::Future), Some(ValidationResult::Invalid) => { trace!(target:"gossip", "Invalid message from {}", who); protocol.report_peer( @@ -275,13 +345,14 @@ impl ConsensusGossip { who, Severity::Useless(format!("Sent unknown consensus engine id")), ); - trace!(target:"gossip", "Unknown message engine id {:?} from {}", message.engine_id, who); + trace!(target:"gossip", "Unknown message engine id {:?} from {}", + engine_id, who); return None; } }; peer.known_messages.insert(message_hash); - if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) { + if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) { debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); 
entry.get_mut().retain(|sink| { if let Err(e) = sink.unbounded_send(message.data.clone()) { @@ -293,7 +364,7 @@ impl ConsensusGossip { entry.remove_entry(); } } - self.multicast_inner(protocol, message_hash, topic, || message.clone()); + self.multicast_inner(protocol, message_hash, topic, status, || message.clone()); Some((topic, message)) } else { trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); @@ -309,7 +380,7 @@ impl ConsensusGossip { message: ConsensusMessage, ) { let message_hash = HashFor::::hash(&message.data); - self.multicast_inner(protocol, message_hash, topic, || message.clone()); + self.multicast_inner(protocol, message_hash, topic, Status::Live, || message.clone()); } fn multicast_inner( @@ -317,12 +388,15 @@ impl ConsensusGossip { protocol: &mut Context, message_hash: B::Hash, topic: B::Hash, + status: Status, get_message: F, ) where F: Fn() -> ConsensusMessage { - self.register_message(message_hash, topic, &get_message); - self.propagate(protocol, message_hash, get_message); + self.register_message(message_hash, topic, status, &get_message); + if let Status::Live = status { + self.propagate(protocol, message_hash, get_message); + } } /// Note new consensus session. 
@@ -335,6 +409,8 @@ impl ConsensusGossip { mod tests { use runtime_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; use std::time::Instant; + use futures::Stream; + use super::*; type Block = RawBlock>; @@ -347,21 +423,21 @@ mod tests { message_hash: $hash, message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]}, timestamp: $now, + status: Status::Live, }); } } } - #[test] - fn collects_garbage() { - - struct AllowAll; - impl Validator for AllowAll { - fn validate(&self, _data: &[u8]) -> ValidationResult { - ValidationResult::Valid(H256::default()) - } + struct AllowAll; + impl Validator for AllowAll { + fn validate(&self, _data: &[u8]) -> ValidationResult { + ValidationResult::Valid(H256::default()) } + } + #[test] + fn collects_garbage() { struct AllowOne; impl Validator for AllowOne { fn validate(&self, data: &[u8]) -> ValidationResult { @@ -417,14 +493,15 @@ mod tests { use futures::Stream; let mut consensus = ConsensusGossip::::new(); + consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let message_hash = HashFor::::hash(&message.data); let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, || message.clone()); - let stream = consensus.messages_for(topic); + consensus.register_message(message_hash, topic, Status::Live, || message.clone()); + let stream = consensus.messages_for([0, 0, 0, 0], topic); assert_eq!(stream.wait().next(), Some(Ok(message.data))); } @@ -437,29 +514,47 @@ mod tests { let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; - consensus.register_message(HashFor::::hash(&msg_a.data), topic, || msg_a.clone()); - consensus.register_message(HashFor::::hash(&msg_b.data), topic, || msg_b.clone()); + consensus.register_message(HashFor::::hash(&msg_a.data), topic, Status::Live, || msg_a.clone()); + 
consensus.register_message(HashFor::::hash(&msg_b.data), topic, Status::Live, || msg_b.clone()); assert_eq!(consensus.messages.len(), 2); } #[test] fn can_keep_multiple_subscribers_per_topic() { - use futures::Stream; - let mut consensus = ConsensusGossip::::new(); + consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let message_hash = HashFor::::hash(&message.data); let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, || message.clone()); + consensus.register_message(message_hash, topic, Status::Live, || message.clone()); - let stream1 = consensus.messages_for(topic); - let stream2 = consensus.messages_for(topic); + let stream1 = consensus.messages_for([0, 0, 0, 0], topic); + let stream2 = consensus.messages_for([0, 0, 0, 0], topic); assert_eq!(stream1.wait().next(), Some(Ok(message.data.clone()))); assert_eq!(stream2.wait().next(), Some(Ok(message.data))); } + + #[test] + fn topics_are_localized_to_engine_id() { + let mut consensus = ConsensusGossip::::new(); + consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); + + let topic = [1; 32].into(); + let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; + let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] }; + + consensus.register_message(HashFor::::hash(&msg_a.data), topic, Status::Live, || msg_a.clone()); + consensus.register_message(HashFor::::hash(&msg_b.data), topic, Status::Live, || msg_b.clone()); + + let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait(); + + assert_eq!(stream.next(), Some(Ok(vec![1, 2, 3]))); + let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic)); + assert_eq!(stream.next(), None); + } } diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index f34ceaa4b88e9..94cafc0f0555b 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ 
-379,11 +379,12 @@ impl, D> Peer { /// access the underlying consensus gossip handler pub fn consensus_gossip_messages_for( &self, + engine_id: ConsensusEngineId, topic: ::Hash, ) -> mpsc::UnboundedReceiver> { let (tx, rx) = oneshot::channel(); self.with_gossip(move |gossip, _| { - let inner_rx = gossip.messages_for(topic); + let inner_rx = gossip.messages_for(engine_id, topic); let _ = tx.send(inner_rx); }); rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully") diff --git a/core/service/src/chain_spec.rs b/core/service/src/chain_spec.rs index 5351bc96fdf22..8e231f40294f7 100644 --- a/core/service/src/chain_spec.rs +++ b/core/service/src/chain_spec.rs @@ -25,6 +25,7 @@ use runtime_primitives::{BuildStorage, StorageOverlay, ChildrenStorageOverlay}; use serde_json as json; use crate::components::RuntimeGenesis; use network::Multiaddr; +use tel::TelemetryEndpoints; enum GenesisSource { File(PathBuf), @@ -87,7 +88,7 @@ struct ChainSpecFile { pub name: String, pub id: String, pub boot_nodes: Vec, - pub telemetry_url: Option, + pub telemetry_endpoints: Option, pub protocol_id: Option, pub consensus_engine: Option, pub properties: Option, @@ -124,8 +125,8 @@ impl ChainSpec { &self.spec.id } - pub fn telemetry_url(&self) -> Option<&str> { - self.spec.telemetry_url.as_ref().map(String::as_str) + pub fn telemetry_endpoints(&self) -> &Option { + &self.spec.telemetry_endpoints } pub fn protocol_id(&self) -> Option<&str> { @@ -170,7 +171,7 @@ impl ChainSpec { id: &str, constructor: fn() -> G, boot_nodes: Vec, - telemetry_url: Option<&str>, + telemetry_endpoints: Option, protocol_id: Option<&str>, consensus_engine: Option<&str>, properties: Option, @@ -180,7 +181,7 @@ impl ChainSpec { name: name.to_owned(), id: id.to_owned(), boot_nodes: boot_nodes, - telemetry_url: telemetry_url.map(str::to_owned), + telemetry_endpoints, protocol_id: protocol_id.map(str::to_owned), consensus_engine: consensus_engine.map(str::to_owned), properties, 
diff --git a/core/service/src/config.rs b/core/service/src/config.rs index 84dfe599e2fcc..9883996eacf62 100644 --- a/core/service/src/config.rs +++ b/core/service/src/config.rs @@ -25,6 +25,7 @@ pub use network::config::{NetworkConfiguration, Roles}; use runtime_primitives::BuildStorage; use serde::{Serialize, de::DeserializeOwned}; use target_info::Target; +use tel::TelemetryEndpoints; /// Service configuration. #[derive(Clone)] @@ -64,7 +65,7 @@ pub struct Configuration { /// RPC over Websockets binding address. `None` if disabled. pub rpc_ws: Option, /// Telemetry service URL. `None` if disabled. - pub telemetry_url: Option, + pub telemetry_endpoints: Option, /// The default number of 64KB pages to allocate for Wasm execution pub default_heap_pages: Option, } @@ -90,11 +91,13 @@ impl Configuration Service { let version = config.full_version(); info!("Best block: #{}", best_header.number()); - telemetry!("node.start"; "height" => best_header.number().as_(), "best" => ?best_header.hash()); + telemetry!(SUBSTRATE_INFO; "node.start"; "height" => best_header.number().as_(), "best" => ?best_header.hash()); let network_protocol = ::build_network_protocol(&config)?; let transaction_pool = Arc::new( @@ -269,7 +269,7 @@ impl Service { )?; // Telemetry - let telemetry = config.telemetry_url.clone().map(|url| { + let telemetry = config.telemetry_endpoints.clone().map(|endpoints| { let is_authority = config.roles == Roles::AUTHORITY; let pubkey = format!("{}", public_key); let name = config.name.clone(); @@ -277,9 +277,9 @@ impl Service { let version = version.clone(); let chain_name = config.chain_spec.name().to_owned(); Arc::new(tel::init_telemetry(tel::TelemetryConfig { - url: url, + endpoints, on_connect: Box::new(move || { - telemetry!("system.connected"; + telemetry!(SUBSTRATE_INFO; "system.connected"; "name" => name.clone(), "implementation" => impl_name.clone(), "version" => version.clone(), diff --git a/core/service/test/src/lib.rs b/core/service/test/src/lib.rs 
index 10cac1608ad28..795fa05bc733d 100644 --- a/core/service/test/src/lib.rs +++ b/core/service/test/src/lib.rs @@ -120,7 +120,7 @@ fn node_config ( execution_strategies: Default::default(), rpc_http: None, rpc_ws: None, - telemetry_url: None, + telemetry_endpoints: None, default_heap_pages: None, } } diff --git a/core/telemetry/Cargo.toml b/core/telemetry/Cargo.toml index 6567cdcf13ea5..0553c4f16b757 100644 --- a/core/telemetry/Cargo.toml +++ b/core/telemetry/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "substrate-telemetry" -version = "0.3.0" +version = "0.3.1" authors = ["Parity Technologies "] description = "Telemetry utils" edition = "2018" @@ -9,6 +9,9 @@ edition = "2018" parking_lot = "0.7.1" lazy_static = "1.0" log = "0.4" +rand = "0.6" +serde = "1.0.81" +serde_derive = "1.0" slog = "^2" slog-json = "^2" slog-async = "^2" diff --git a/core/telemetry/src/lib.rs b/core/telemetry/src/lib.rs index 9afa7b03ed3db..37bb8c1a8e7e3 100644 --- a/core/telemetry/src/lib.rs +++ b/core/telemetry/src/lib.rs @@ -26,15 +26,21 @@ use std::sync::Arc; use parking_lot::Mutex; use slog::{Drain, o}; use log::trace; +use rand::{thread_rng, Rng}; pub use slog_scope::with_logger; pub use slog; +use serde_derive::{Serialize, Deserialize}; +use slog::OwnedKVList; +use slog::Record; +use core::result; /// Configuration for telemetry. pub struct TelemetryConfig { - /// URL of the telemetry WebSocket server. - pub url: String, - /// What do do when we connect to the server. - pub on_connect: Box, + /// Collection of telemetry WebSocket servers with a corresponding verbosity level. + pub endpoints: TelemetryEndpoints, + /// What do do when we connect to the servers. + /// Note that this closure is executed each time we connect to a telemetry endpoint. + pub on_connect: Box, } /// Telemetry service guard. @@ -43,66 +49,165 @@ pub type Telemetry = slog_scope::GlobalLoggerGuard; /// Size of the channel for passing messages to telemetry thread. 
const CHANNEL_SIZE: usize = 262144; +/// Log levels. +pub const SUBSTRATE_DEBUG: &str = "9"; +pub const SUBSTRATE_INFO: &str = "0"; + +pub const CONSENSUS_TRACE: &str = "9"; +pub const CONSENSUS_DEBUG: &str = "5"; +pub const CONSENSUS_WARN: &str = "4"; +pub const CONSENSUS_INFO: &str = "3"; + +/// Multiply logging to all drains. This is similar to `slog::Duplicate`, which is +/// limited to two drains though and doesn't support dynamic nesting at runtime. +#[derive(Debug, Clone)] +pub struct Multiply (pub Vec>); + +impl Multiply { + pub fn new(v: Vec>) -> Self { + Multiply(v) + } +} + +impl Drain for Multiply { + type Ok = Vec; + type Err = Vec; + + fn log(&self, record: &Record, logger_values: &OwnedKVList) -> result::Result { + let mut oks = Vec::new(); + let mut errs = Vec::new(); + + self.0.iter().for_each(|l| { + let res: Result<::Ok, ::Err> = (*l).log(record, logger_values); + match res { + Ok(o) => oks.push(o), + Err(e) => errs.push(e), + } + }); + + if !errs.is_empty() { + result::Result::Err(errs) + } else { + result::Result::Ok(oks) + } + } +} + /// Initialise telemetry. 
pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard { - let writer = TelemetryWriter::new(); - let out_sync = writer.out.clone(); - let log = slog::Logger::root( - slog_async::Async::new( - slog_json::Json::default(writer).fuse() - ).chan_size(CHANNEL_SIZE) + let mut endpoint_drains: Vec>> = Vec::new(); + let mut out_syncs = Vec::new(); + + // Set up a filter/drain for each endpoint + config.endpoints.0.iter().for_each(|(url, verbosity)| { + let writer = TelemetryWriter::new(Arc::new(url.to_owned())); + let out_sync = writer.out.clone(); + out_syncs.push(out_sync); + + let until_verbosity = *verbosity; + let filter = slog::Filter( + slog_json::Json::default(writer).fuse(), + move |rec| { + let tag = rec.tag().parse::() + .expect("`telemetry!` macro requires tag."); + tag <= until_verbosity + }); + + let filter = Box::new(filter) as Box>; + endpoint_drains.push(filter); + }); + + // Set up logging to all endpoints + let drain = slog_async::Async::new(Multiply::new(endpoint_drains).fuse()); + let root = slog::Logger::root(drain.chan_size(CHANNEL_SIZE) .overflow_strategy(slog_async::OverflowStrategy::DropAndReport) .build().fuse(), o!() ); - let logger_guard = slog_scope::set_global_logger(log); - - thread::spawn(move || { - loop { - trace!(target: "telemetry", "Connecting to Telemetry... 
{:?}", config.url); - let _ = ws::connect(config.url.as_str(), |out| Connection::new(out, &*out_sync, &config)); - - thread::sleep(time::Duration::from_millis(5000)); - } + let logger_guard = slog_scope::set_global_logger(root); + + // Spawn a thread for each endpoint + let on_connect = Arc::new(config.on_connect); + config.endpoints.0.into_iter().for_each(|(url, verbosity)| { + let inner_verbosity = Arc::new(verbosity.to_owned()); + let inner_on_connect = Arc::clone(&on_connect); + + let out_sync = out_syncs.remove(0); + let out_sync = Arc::clone(&out_sync); + + thread::spawn(move || { + loop { + let on_connect = Arc::clone(&inner_on_connect); + let out_sync = Arc::clone(&out_sync); + let verbosity = Arc::clone(&inner_verbosity); + + trace!(target: "telemetry", + "Connecting to Telemetry at {} with verbosity {}", url, Arc::clone(&verbosity)); + + let _ = ws::connect(url.to_owned(), + |out| { + Connection::new(out, Arc::clone(&out_sync), Arc::clone(&on_connect), url.clone()) + }); + + // Sleep for a random time between 5-10 secs. If there are general connection + // issues not all threads should be synchronized in their re-connection time. + let random_sleep = thread_rng().gen_range(0, 5); + thread::sleep(time::Duration::from_secs(5) + time::Duration::from_secs(random_sleep)); + } + }); }); return logger_guard; } -/// Exactly equivalent to `slog_scope::info`, provided as a convenience. +/// Translates to `slog_scope::info`, but contains an additional verbosity +/// parameter which the log record is tagged with. Additionally the verbosity +/// parameter is added to the record as a key-value pair. #[macro_export] macro_rules! 
telemetry { - ( $($t:tt)* ) => { $crate::with_logger(|l| $crate::slog::slog_info!(l, $($t)* )) } + ( $a:expr; $b:expr; $( $t:tt )* ) => { + $crate::with_logger(|l| { + $crate::slog::slog_info!(l, #$a, $b; "verbosity" => stringify!($a), $($t)* ) + }) + } } -struct Connection<'a> { +struct Connection { out: ws::Sender, - out_sync: &'a Mutex>, - config: &'a TelemetryConfig, + out_sync: Arc>>, + on_connect: Arc>, + url: String, } -impl<'a> Connection<'a> { - fn new(out: ws::Sender, out_sync: &'a Mutex>, config: &'a TelemetryConfig) -> Self { +impl Connection { + fn new( + out: ws::Sender, + out_sync: Arc>>, + on_connect: Arc>, + url: String + ) -> Self { Connection { out, out_sync, - config, + on_connect, + url, } } } -impl<'a> ws::Handler for Connection<'a> { +impl ws::Handler for Connection { fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { - trace!(target: "telemetry", "Connected!"); + trace!(target: "telemetry", "Connected to {}!", self.url); *self.out_sync.lock() = Some(self.out.clone()); - (self.config.on_connect)(); + (self.on_connect)(); Ok(()) } fn on_close(&mut self, code: ws::CloseCode, reason: &str) { *self.out_sync.lock() = None; - trace!(target: "telemetry", "Connection closing due to ({:?}) {}", code, reason); + trace!(target: "telemetry", "Connection to {} closing due to ({:?}) {}", + self.url, code, reason); } fn on_error(&mut self, _: ws::Error) { @@ -117,15 +222,17 @@ impl<'a> ws::Handler for Connection<'a> { struct TelemetryWriter { buffer: Vec, out: Arc>>, + url: Arc, } impl TelemetryWriter { - fn new() -> Self { + fn new(url: Arc) -> Self { let out = Arc::new(Mutex::new(None)); TelemetryWriter { buffer: Vec::new(), out, + url, } } } @@ -155,11 +262,11 @@ impl io::Write for TelemetryWriter { let error = if let Some(ref mut o) = *out { let r = o.send(s); - trace!(target: "telemetry", "Sent to telemetry: {} -> {:?}", s, r); + trace!(target: "telemetry", "Sent to telemetry {}: {} -> {:?}", self.url, s, r); r.is_err() } else { - 
trace!(target: "telemetry", "Telemetry socket closed, failed to send: {}", s); + trace!(target: "telemetry", "Telemetry socket closed to {}, failed to send: {}", self.url, s); false }; @@ -171,3 +278,12 @@ impl io::Write for TelemetryWriter { Ok(()) } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryEndpoints (Vec<(String, u8)>); + +impl TelemetryEndpoints { + pub fn new(endpoints: Vec<(String, u8)>) -> Self { + TelemetryEndpoints(endpoints) + } +} diff --git a/node/cli/Cargo.toml b/node/cli/Cargo.toml index 2c81f127d1c26..082d168a4e044 100644 --- a/node/cli/Cargo.toml +++ b/node/cli/Cargo.toml @@ -30,6 +30,7 @@ grandpa = { package = "substrate-finality-grandpa", path = "../../core/finality- sr-primitives = { path = "../../core/sr-primitives" } node-executor = { path = "../executor" } substrate-keystore = { path = "../../core/keystore" } +substrate-telemetry = { package = "substrate-telemetry", path = "../../core/telemetry" } [dev-dependencies] service-test = { package = "substrate-service-test", path = "../../core/service/test" } diff --git a/node/cli/src/chain_spec.rs b/node/cli/src/chain_spec.rs index 878ffb2b10b14..2fddc549ff7d0 100644 --- a/node/cli/src/chain_spec.rs +++ b/node/cli/src/chain_spec.rs @@ -26,6 +26,7 @@ use substrate_service; use hex_literal::{hex, hex_impl}; use substrate_keystore::pad_seed; +use substrate_telemetry::TelemetryEndpoints; const STAGING_TELEMETRY_URL: &str = "wss://telemetry.polkadot.io/submit/"; @@ -151,7 +152,7 @@ pub fn staging_testnet_config() -> ChainSpec { "staging_testnet", staging_testnet_config_genesis, boot_nodes, - Some(STAGING_TELEMETRY_URL.into()), + Some(TelemetryEndpoints::new(vec![(STAGING_TELEMETRY_URL.to_string(), 0)])), None, None, None,