diff --git a/Cargo.lock b/Cargo.lock index ed90ccbf2ffa9..7b8bf23c0a45a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5605,6 +5605,7 @@ dependencies = [ "sp-blockchain", "sp-core", "sp-runtime", + "substrate-prometheus-endpoint", "substrate-test-runtime-client", ] diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 332c47ea132ea..5b0fdbbdbdf7b 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -193,6 +193,7 @@ macro_rules! new_full { sentry_nodes, service.keystore(), dht_event_stream, + service.prometheus_registry(), ); service.spawn_task("authority-discovery", authority_discovery); diff --git a/client/authority-discovery/Cargo.toml b/client/authority-discovery/Cargo.toml index 0c3b739bb9deb..a7ea523552d8f 100644 --- a/client/authority-discovery/Cargo.toml +++ b/client/authority-discovery/Cargo.toml @@ -20,6 +20,7 @@ futures = "0.3.1" futures-timer = "3.0.1" libp2p = { version = "0.16.2", default-features = false, features = ["secp256k1", "libp2p-websocket"] } log = "0.4.8" +prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-alpha.2" } prost = "0.6.1" rand = "0.7.2" sc-client-api = { version = "2.0.0-alpha.2", path = "../api" } diff --git a/client/authority-discovery/src/error.rs b/client/authority-discovery/src/error.rs index d62281c0c2829..74b7043c29ba0 100644 --- a/client/authority-discovery/src/error.rs +++ b/client/authority-discovery/src/error.rs @@ -46,4 +46,6 @@ pub enum Error { EncodingDecodingScale(codec::Error), /// Failed to parse a libp2p multi address. ParsingMultiaddress(libp2p::core::multiaddr::Error), + /// Failed to register Prometheus metric. 
+ Prometheus(prometheus_endpoint::PrometheusError), } diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs index 92dcc264502f0..171a401bbbf8b 100644 --- a/client/authority-discovery/src/lib.rs +++ b/client/authority-discovery/src/lib.rs @@ -58,6 +58,7 @@ use codec::{Decode, Encode}; use error::{Error, Result}; use libp2p::Multiaddr; use log::{debug, error, log_enabled, warn}; +use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register}; use prost::Message; use sc_client_api::blockchain::HeaderBackend; use sc_network::{DhtEvent, ExHashT, NetworkStateInfo}; @@ -87,6 +88,53 @@ const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30); /// discovery module. const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities"; +/// Prometheus metrics for an `AuthorityDiscovery`. +#[derive(Clone)] +pub(crate) struct Metrics { + publish: Counter<U64>, + amount_last_published: Gauge<U64>, + request: Counter<U64>, + dht_event_received: CounterVec<U64>, +} + +impl Metrics { + pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> { + Ok(Self { + publish: register( + Counter::new( + "authority_discovery_times_published_total", + "Number of times authority discovery has published external addresses." + )?, + registry, + )?, + amount_last_published: register( + Gauge::new( + "authority_discovery_amount_external_addresses_last_published", + "Number of external addresses published when authority discovery last published addresses ." + )?, + registry, + )?, + request: register( + Counter::new( + "authority_discovery_times_requested_total", + "Number of times authority discovery has requested external addresses." + )?, + registry, + )?, + dht_event_received: register( + CounterVec::new( + Opts::new( + "authority_discovery_dht_event_received", + "Number of dht events received by authority discovery."
+ ), + &["name"], + )?, + registry, + )?, + }) + } +} + /// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities. pub struct AuthorityDiscovery where @@ -118,6 +166,8 @@ where addr_cache: addr_cache::AddrCache, + metrics: Option<Metrics>, + phantom: PhantomData, } @@ -140,6 +190,7 @@ where sentry_nodes: Vec, key_store: BareCryptoStorePtr, dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>, + prometheus_registry: Option<prometheus_endpoint::Registry>, ) -> Self { // Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. // Given that a node could restart at any point in time, one can not depend on the @@ -177,6 +228,19 @@ where let addr_cache = AddrCache::new(); + let metrics = match prometheus_registry { + Some(registry) => { + match Metrics::register(&registry) { + Ok(metrics) => Some(metrics), + Err(e) => { + error!(target: "sub-authority-discovery", "Failed to register metrics: {:?}", e); + None + }, + } + }, + None => None, + }; + AuthorityDiscovery { client, network, @@ -186,13 +250,18 @@ where publish_interval, query_interval, addr_cache, + metrics, phantom: PhantomData, } } /// Publish either our own or if specified the public addresses of our sentry nodes.
fn publish_ext_addresses(&mut self) -> Result<()> { - let addresses = match &self.sentry_nodes { + if let Some(metrics) = &self.metrics { + metrics.publish.inc() + } + + let addresses: Vec<_> = match &self.sentry_nodes { Some(addrs) => addrs.clone().into_iter() .map(|a| a.to_vec()) .collect(), @@ -205,6 +274,10 @@ where .collect(), }; + if let Some(metrics) = &self.metrics { + metrics.amount_last_published.set(addresses.len() as u64); + } + let mut serialized_addresses = vec![]; schema::AuthorityAddresses { addresses } .encode(&mut serialized_addresses) @@ -231,6 +304,10 @@ where } fn request_addresses_of_others(&mut self) -> Result<()> { + if let Some(metrics) = &self.metrics { + metrics.request.inc(); + } + let id = BlockId::hash(self.client.info().best_hash); let authorities = self @@ -251,6 +328,10 @@ where while let Poll::Ready(Some(event)) = self.dht_event_rx.poll_next_unpin(cx) { match event { DhtEvent::ValueFound(v) => { + if let Some(metrics) = &self.metrics { + metrics.dht_event_received.with_label_values(&["value_found"]).inc(); + } + if log_enabled!(log::Level::Debug) { let hashes = v.iter().map(|(hash, _value)| hash.clone()); debug!( @@ -261,17 +342,36 @@ where self.handle_dht_value_found_event(v)?; } - DhtEvent::ValueNotFound(hash) => debug!( - target: "sub-authority-discovery", - "Value for hash '{:?}' not found on Dht.", hash - ), - DhtEvent::ValuePut(hash) => debug!( - target: "sub-authority-discovery", - "Successfully put hash '{:?}' on Dht.", hash), - DhtEvent::ValuePutFailed(hash) => warn!( - target: "sub-authority-discovery", - "Failed to put hash '{:?}' on Dht.", hash - ), + DhtEvent::ValueNotFound(hash) => { + if let Some(metrics) = &self.metrics { + metrics.dht_event_received.with_label_values(&["value_not_found"]).inc(); + } + + debug!( + target: "sub-authority-discovery", + "Value for hash '{:?}' not found on Dht.", hash + ) + }, + DhtEvent::ValuePut(hash) => { + if let Some(metrics) = &self.metrics { + 
+ metrics.dht_event_received.with_label_values(&["value_put"]).inc(); + } + + debug!( + target: "sub-authority-discovery", + "Successfully put hash '{:?}' on Dht.", hash, + ) + }, + DhtEvent::ValuePutFailed(hash) => { + if let Some(metrics) = &self.metrics { + metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc(); + } + + warn!( + target: "sub-authority-discovery", + "Failed to put hash '{:?}' on Dht.", hash + ) + }, } } @@ -394,7 +494,7 @@ where /// Update the peer set 'authority' priority group. // - fn update_peer_set_priority_group(&self) -> Result<()>{ + fn update_peer_set_priority_group(&self) -> Result<()> { let addresses = self.addr_cache.get_subset(); debug!( diff --git a/client/authority-discovery/src/tests.rs b/client/authority-discovery/src/tests.rs index 55ee6c7aac13a..1296c2b627a1e 100644 --- a/client/authority-discovery/src/tests.rs +++ b/client/authority-discovery/src/tests.rs @@ -275,6 +275,29 @@ impl NetworkStateInfo for TestNetwork { } } +#[test] +fn new_registers_metrics() { + let (_dht_event_tx, dht_event_rx) = channel(1000); + let network: Arc<TestNetwork> = Arc::new(Default::default()); + let key_store = KeyStore::new(); + let test_api = Arc::new(TestApi { + authorities: vec![], + }); + + let registry = prometheus_endpoint::Registry::new(); + + AuthorityDiscovery::new( + test_api, + network.clone(), + vec![], + key_store, + dht_event_rx.boxed(), + Some(registry.clone()), + ); + + assert!(registry.gather().len() > 0); +} + #[test] fn publish_ext_addresses_puts_record_on_dht() { let (_dht_event_tx, dht_event_rx) = channel(1000); @@ -294,6 +317,7 @@ fn publish_ext_addresses_puts_record_on_dht() { vec![], key_store, dht_event_rx.boxed(), + None, ); authority_discovery.publish_ext_addresses().unwrap(); @@ -324,6 +348,7 @@ fn request_addresses_of_others_triggers_dht_get_query() { vec![], key_store, dht_event_rx.boxed(), + None, ); authority_discovery.request_addresses_of_others().unwrap(); @@ -351,6 +376,7 @@ fn
handle_dht_events_with_value_found_should_call_set_priority_group() { vec![], key_store, dht_event_rx.boxed(), + None, ); // Create sample dht event.