Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ macro_rules! new_full {
sentry_nodes,
service.keystore(),
dht_event_stream,
service.prometheus_registry(),
);

service.spawn_task("authority-discovery", authority_discovery);
Expand Down
1 change: 1 addition & 0 deletions client/authority-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ futures = "0.3.1"
futures-timer = "3.0.1"
libp2p = { version = "0.16.2", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-alpha.2" }
prost = "0.6.1"
rand = "0.7.2"
sc-client-api = { version = "2.0.0-alpha.2", path = "../api" }
Expand Down
2 changes: 2 additions & 0 deletions client/authority-discovery/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ pub enum Error {
EncodingDecodingScale(codec::Error),
/// Failed to parse a libp2p multi address.
ParsingMultiaddress(libp2p::core::multiaddr::Error),
/// Failed to register Prometheus metric.
Prometheus(prometheus_endpoint::PrometheusError),
}
126 changes: 113 additions & 13 deletions client/authority-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use codec::{Decode, Encode};
use error::{Error, Result};
use libp2p::Multiaddr;
use log::{debug, error, log_enabled, warn};
use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register};
use prost::Message;
use sc_client_api::blockchain::HeaderBackend;
use sc_network::{DhtEvent, ExHashT, NetworkStateInfo};
Expand Down Expand Up @@ -87,6 +88,53 @@ const LIBP2P_KADEMLIA_BOOTSTRAP_TIME: Duration = Duration::from_secs(30);
/// discovery module.
const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";

/// Prometheus metrics for an `AuthorityDiscovery`.
///
/// Each field is a handle to a metric created and registered by `Metrics::register`.
#[derive(Clone)]
pub(crate) struct Metrics {
/// Counter registered as `authority_discovery_times_published_total`; incremented at the
/// start of every `publish_ext_addresses` call, i.e. before the publish is known to succeed.
publish: Counter<U64>,
/// Gauge registered as `authority_discovery_amount_external_addresses_last_published`; set to
/// the number of addresses collected by the most recent `publish_ext_addresses` call.
amount_last_published: Gauge<U64>,
/// Counter registered as `authority_discovery_times_requested_total`; incremented at the
/// start of every `request_addresses_of_others` call.
request: Counter<U64>,
/// Counter family registered as `authority_discovery_dht_event_received`, partitioned by the
/// `name` label. The event handler uses the label values `value_found`, `value_not_found`,
/// `value_put` and `value_put_failed`.
dht_event_received: CounterVec<U64>,
}

impl Metrics {
	/// Creates all authority discovery metrics and registers each of them with `registry`.
	///
	/// # Errors
	///
	/// Returns this crate's `Error` as soon as constructing any metric, or registering it with
	/// `registry`, fails. Metrics registered before the failing one are not rolled back here.
	pub(crate) fn register(registry: &prometheus_endpoint::Registry) -> Result<Self> {
		Ok(Self {
			publish: register(
				Counter::new(
					"authority_discovery_times_published_total",
					"Number of times authority discovery has published external addresses."
				)?,
				registry,
			)?,
			amount_last_published: register(
				Gauge::new(
					"authority_discovery_amount_external_addresses_last_published",
					"Number of external addresses published when authority discovery last published addresses."
				)?,
				registry,
			)?,
			request: register(
				Counter::new(
					"authority_discovery_times_requested_total",
					"Number of times authority discovery has requested external addresses."
				)?,
				registry,
			)?,
			dht_event_received: register(
				CounterVec::new(
					Opts::new(
						"authority_discovery_dht_event_received",
						"Number of dht events received by authority discovery."
					),
					// Single label identifying the kind of Dht event being counted.
					&["name"],
				)?,
				registry,
			)?,
		})
	}
}

/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities.
pub struct AuthorityDiscovery<Client, Network, Block>
where
Expand Down Expand Up @@ -118,6 +166,8 @@ where

addr_cache: addr_cache::AddrCache<AuthorityId, Multiaddr>,

metrics: Option<Metrics>,

phantom: PhantomData<Block>,
}

Expand All @@ -140,6 +190,7 @@ where
sentry_nodes: Vec<String>,
key_store: BareCryptoStorePtr,
dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>,
prometheus_registry: Option<prometheus_endpoint::Registry>,
) -> Self {
// Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h.
// Given that a node could restart at any point in time, one can not depend on the
Expand Down Expand Up @@ -177,6 +228,19 @@ where

let addr_cache = AddrCache::new();

let metrics = match prometheus_registry {
Some(registry) => {
match Metrics::register(&registry) {
Ok(metrics) => Some(metrics),
Err(e) => {
error!(target: "sub-authority-discovery", "Failed to register metrics: {:?}", e);
None
},
}
},
None => None,
};

AuthorityDiscovery {
client,
network,
Expand All @@ -186,13 +250,18 @@ where
publish_interval,
query_interval,
addr_cache,
metrics,
phantom: PhantomData,
}
}

/// Publish either our own or, if specified, the public addresses of our sentry nodes.
fn publish_ext_addresses(&mut self) -> Result<()> {
let addresses = match &self.sentry_nodes {
if let Some(metrics) = &self.metrics {
metrics.publish.inc()
}

let addresses: Vec<_> = match &self.sentry_nodes {
Some(addrs) => addrs.clone().into_iter()
.map(|a| a.to_vec())
.collect(),
Expand All @@ -205,6 +274,10 @@ where
.collect(),
};

if let Some(metrics) = &self.metrics {
metrics.amount_last_published.set(addresses.len() as u64);
}

let mut serialized_addresses = vec![];
schema::AuthorityAddresses { addresses }
.encode(&mut serialized_addresses)
Expand All @@ -231,6 +304,10 @@ where
}

fn request_addresses_of_others(&mut self) -> Result<()> {
if let Some(metrics) = &self.metrics {
metrics.request.inc();
}

let id = BlockId::hash(self.client.info().best_hash);

let authorities = self
Expand All @@ -251,6 +328,10 @@ where
while let Poll::Ready(Some(event)) = self.dht_event_rx.poll_next_unpin(cx) {
match event {
DhtEvent::ValueFound(v) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_found"]).inc();
}

if log_enabled!(log::Level::Debug) {
let hashes = v.iter().map(|(hash, _value)| hash.clone());
debug!(
Expand All @@ -261,17 +342,36 @@ where

self.handle_dht_value_found_event(v)?;
}
DhtEvent::ValueNotFound(hash) => debug!(
target: "sub-authority-discovery",
"Value for hash '{:?}' not found on Dht.", hash
),
DhtEvent::ValuePut(hash) => debug!(
target: "sub-authority-discovery",
"Successfully put hash '{:?}' on Dht.", hash),
DhtEvent::ValuePutFailed(hash) => warn!(
target: "sub-authority-discovery",
"Failed to put hash '{:?}' on Dht.", hash
),
DhtEvent::ValueNotFound(hash) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
}

debug!(
target: "sub-authority-discovery",
"Value for hash '{:?}' not found on Dht.", hash
)
},
DhtEvent::ValuePut(hash) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
}

debug!(
target: "sub-authority-discovery",
"Successfully put hash '{:?}' on Dht.", hash,
)
},
DhtEvent::ValuePutFailed(hash) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
}

warn!(
target: "sub-authority-discovery",
"Failed to put hash '{:?}' on Dht.", hash
)
},
}
}

Expand Down Expand Up @@ -394,7 +494,7 @@ where

/// Update the peer set 'authority' priority group.
//
fn update_peer_set_priority_group(&self) -> Result<()>{
fn update_peer_set_priority_group(&self) -> Result<()> {
let addresses = self.addr_cache.get_subset();

debug!(
Expand Down
26 changes: 26 additions & 0 deletions client/authority-discovery/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,29 @@ impl NetworkStateInfo for TestNetwork {
}
}

/// Constructing an `AuthorityDiscovery` with a Prometheus registry must leave that registry
/// with at least one gatherable metric.
#[test]
fn new_registers_metrics() {
	let prometheus_registry = prometheus_endpoint::Registry::new();

	// The sender half is bound to a named variable so it stays alive for the whole test.
	let (_event_sender, event_receiver) = channel(1000);
	let test_network: Arc<TestNetwork> = Arc::new(Default::default());
	let runtime_api = Arc::new(TestApi { authorities: vec![] });

	// Only the registration side effect of `new` is under test; the instance is discarded.
	// A clone of the registry is handed over so the test keeps a handle to inspect afterwards.
	AuthorityDiscovery::new(
		runtime_api,
		test_network,
		vec![],
		KeyStore::new(),
		event_receiver.boxed(),
		Some(prometheus_registry.clone()),
	);

	assert!(!prometheus_registry.gather().is_empty());
}

#[test]
fn publish_ext_addresses_puts_record_on_dht() {
let (_dht_event_tx, dht_event_rx) = channel(1000);
Expand All @@ -294,6 +317,7 @@ fn publish_ext_addresses_puts_record_on_dht() {
vec![],
key_store,
dht_event_rx.boxed(),
None,
);

authority_discovery.publish_ext_addresses().unwrap();
Expand Down Expand Up @@ -324,6 +348,7 @@ fn request_addresses_of_others_triggers_dht_get_query() {
vec![],
key_store,
dht_event_rx.boxed(),
None,
);

authority_discovery.request_addresses_of_others().unwrap();
Expand Down Expand Up @@ -351,6 +376,7 @@ fn handle_dht_events_with_value_found_should_call_set_priority_group() {
vec![],
key_store,
dht_event_rx.boxed(),
None,
);

// Create sample dht event.
Expand Down