diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 257068cf144ac..757022655dd83 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -176,7 +176,7 @@ macro_rules! new_full { service.spawn_essential_task("babe-proposer", babe); let network = service.network(); - let dht_event_stream = network.event_stream().filter_map(|e| async move { match e { + let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move { match e { Event::Dht(e) => Some(e), _ => None, }}).boxed(); diff --git a/client/network-gossip/src/lib.rs b/client/network-gossip/src/lib.rs index 4e4d32366f29d..42aeca86cb275 100644 --- a/client/network-gossip/src/lib.rs +++ b/client/network-gossip/src/lib.rs @@ -99,7 +99,7 @@ pub trait Network { impl Network for Arc> { fn event_stream(&self) -> Pin + Send>> { - Box::pin(NetworkService::event_stream(self)) + Box::pin(NetworkService::event_stream(self, "network-gossip")) } fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) { diff --git a/client/network/src/service.rs b/client/network/src/service.rs index f7075cf16bfe7..b099a5dabfabc 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -575,9 +575,13 @@ impl NetworkService { /// If this method is called multiple times, the events are duplicated. /// /// The stream never ends (unless the `NetworkWorker` gets shut down). - pub fn event_stream(&self) -> impl Stream { + /// + /// The name passed is used to identify the channel in the Prometheus metrics. 
Note that the + /// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having + /// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory. + pub fn event_stream(&self, name: &'static str) -> impl Stream { // Note: when transitioning to stable futures, remove the `Error` entirely - let (tx, rx) = out_events::channel(); + let (tx, rx) = out_events::channel(name); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); rx } diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 10bb9b7e91f78..b279be3c22d3c 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -43,11 +43,13 @@ use std::{ }; /// Creates a new channel that can be associated to a [`OutChannels`]. -pub fn channel() -> (Sender, Receiver) { +/// +/// The name is used in Prometheus reports. +pub fn channel(name: &'static str) -> (Sender, Receiver) { let (tx, rx) = mpsc::unbounded(); let metrics = Arc::new(Mutex::new(None)); - let tx = Sender { inner: tx, metrics: metrics.clone() }; - let rx = Receiver { inner: rx, metrics }; + let tx = Sender { inner: tx, name, metrics: metrics.clone() }; + let rx = Receiver { inner: rx, name, metrics }; (tx, rx) } @@ -60,6 +62,7 @@ pub fn channel() -> (Sender, Receiver) { /// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**. pub struct Sender { inner: mpsc::UnboundedSender, + name: &'static str, /// Clone of [`Receiver::metrics`]. metrics: Arc>>>>, } @@ -82,6 +85,7 @@ impl Drop for Sender { /// Receiving side of a channel. pub struct Receiver { inner: mpsc::UnboundedReceiver, + name: &'static str, /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`] /// is assigned to an instance of [`OutChannels`]. 
metrics: Arc>>>>, @@ -94,7 +98,7 @@ impl Stream for Receiver { if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) { let metrics = self.metrics.lock().clone(); if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) { - metrics.event_out(&ev); + metrics.event_out(&ev, self.name); } else { log::warn!("Inconsistency in out_events: event happened before sender associated"); } @@ -161,7 +165,9 @@ impl OutChannels { }); if let Some(metrics) = &*self.metrics { - metrics.event_in(&event, self.event_streams.len() as u64); + for ev in &self.event_streams { + metrics.event_in(&event, 1, ev.name); + } } } } @@ -190,7 +196,7 @@ impl Metrics { "Number of broadcast network events that have been sent or received across all \ channels" ), - &["event_name", "action"] + &["event_name", "action", "name"] )?, registry)?, notifications_sizes: register(CounterVec::new( Opts::new( @@ -198,7 +204,7 @@ impl Metrics { "Size of notification events that have been sent or received across all \ channels" ), - &["protocol", "action"] + &["protocol", "action", "name"] )?, registry)?, num_channels: register(Gauge::new( "sub_libp2p_out_events_num_channels", @@ -207,60 +213,60 @@ impl Metrics { }) } - fn event_in(&self, event: &Event, num: u64) { + fn event_in(&self, event: &Event, num: u64, name: &str) { match event { Event::Dht(_) => { self.events_total - .with_label_values(&["dht", "sent"]) + .with_label_values(&["dht", "sent", name]) .inc_by(num); } Event::NotificationStreamOpened { engine_id, .. } => { self.events_total - .with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent"]) + .with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent", name]) .inc_by(num); }, Event::NotificationStreamClosed { engine_id, .. } => { self.events_total - .with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent"]) + .with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent", name]) .inc_by(num); }, Event::NotificationsReceived { messages, .. 
} => { for (engine_id, message) in messages { self.events_total - .with_label_values(&[&format!("notif-{:?}", engine_id), "sent"]) + .with_label_values(&[&format!("notif-{:?}", engine_id), "sent", name]) .inc_by(num); self.notifications_sizes - .with_label_values(&[&engine_id_to_string(engine_id), "sent"]) + .with_label_values(&[&engine_id_to_string(engine_id), "sent", name]) .inc_by(num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::max_value()))); } }, } } - fn event_out(&self, event: &Event) { + fn event_out(&self, event: &Event, name: &str) { match event { Event::Dht(_) => { self.events_total - .with_label_values(&["dht", "received"]) + .with_label_values(&["dht", "received", name]) .inc(); } Event::NotificationStreamOpened { engine_id, .. } => { self.events_total - .with_label_values(&[&format!("notif-open-{:?}", engine_id), "received"]) + .with_label_values(&[&format!("notif-open-{:?}", engine_id), "received", name]) .inc(); }, Event::NotificationStreamClosed { engine_id, .. } => { self.events_total - .with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received"]) + .with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received", name]) .inc(); }, Event::NotificationsReceived { messages, .. 
} => { for (engine_id, message) in messages { self.events_total - .with_label_values(&[&format!("notif-{:?}", engine_id), "received"]) + .with_label_values(&[&format!("notif-{:?}", engine_id), "received", name]) .inc(); self.notifications_sizes - .with_label_values(&[&engine_id_to_string(engine_id), "received"]) + .with_label_values(&[&engine_id_to_string(engine_id), "received", name]) .inc_by(u64::try_from(message.len()).unwrap_or(u64::max_value())); } }, diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index 0e097072e6c05..a60b32efb414e 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -106,7 +106,7 @@ fn build_test_full_node(config: config::NetworkConfiguration) .unwrap(); let service = worker.service().clone(); - let event_stream = service.event_stream(); + let event_stream = service.event_stream("test"); async_std::task::spawn(async move { futures::pin_mut!(worker);