Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ macro_rules! new_full {
service.spawn_essential_task("babe-proposer", babe);

let network = service.network();
let dht_event_stream = network.event_stream().filter_map(|e| async move { match e {
let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move { match e {
Event::Dht(e) => Some(e),
_ => None,
}}).boxed();
Expand Down
2 changes: 1 addition & 1 deletion client/network-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub trait Network<B: BlockT> {

impl<B: BlockT, H: ExHashT> Network<B> for Arc<NetworkService<B, H>> {
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
Box::pin(NetworkService::event_stream(self))
Box::pin(NetworkService::event_stream(self, "network-gossip"))
}

fn report_peer(&self, peer_id: PeerId, reputation: ReputationChange) {
Expand Down
8 changes: 6 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,13 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// If this method is called multiple times, the events are duplicated.
///
/// The stream never ends (unless the `NetworkWorker` gets shut down).
pub fn event_stream(&self) -> impl Stream<Item = Event> {
///
/// The name passed is used to identify the channel in the Prometheus metrics. Note that the
/// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having
/// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory
pub fn event_stream(&self, name: &'static str) -> impl Stream<Item = Event> {
// Note: when transitioning to stable futures, remove the `Error` entirely
let (tx, rx) = out_events::channel();
let (tx, rx) = out_events::channel(name);
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
rx
}
Expand Down
44 changes: 25 additions & 19 deletions client/network/src/service/out_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ use std::{
};

/// Creates a new channel that can be associated to a [`OutChannels`].
pub fn channel() -> (Sender, Receiver) {
///
/// The name is used in Prometheus reports.
pub fn channel(name: &'static str) -> (Sender, Receiver) {
let (tx, rx) = mpsc::unbounded();
let metrics = Arc::new(Mutex::new(None));
let tx = Sender { inner: tx, metrics: metrics.clone() };
let rx = Receiver { inner: rx, metrics };
let tx = Sender { inner: tx, name, metrics: metrics.clone() };
let rx = Receiver { inner: rx, name, metrics };
(tx, rx)
}

Expand All @@ -60,6 +62,7 @@ pub fn channel() -> (Sender, Receiver) {
/// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**.
pub struct Sender {
inner: mpsc::UnboundedSender<Event>,
name: &'static str,
/// Clone of [`Receiver::metrics`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
}
Expand All @@ -82,6 +85,7 @@ impl Drop for Sender {
/// Receiving side of a channel.
pub struct Receiver {
inner: mpsc::UnboundedReceiver<Event>,
name: &'static str,
/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
/// is assigned to an instance of [`OutChannels`].
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
Expand All @@ -94,7 +98,7 @@ impl Stream for Receiver {
if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
let metrics = self.metrics.lock().clone();
if let Some(Some(metrics)) = metrics.as_ref().map(|m| &**m) {
metrics.event_out(&ev);
metrics.event_out(&ev, self.name);
} else {
log::warn!("Inconsistency in out_events: event happened before sender associated");
}
Expand Down Expand Up @@ -161,7 +165,9 @@ impl OutChannels {
});

if let Some(metrics) = &*self.metrics {
metrics.event_in(&event, self.event_streams.len() as u64);
for ev in &self.event_streams {
metrics.event_in(&event, 1, ev.name);
}
}
}
}
Expand Down Expand Up @@ -190,15 +196,15 @@ impl Metrics {
"Number of broadcast network events that have been sent or received across all \
channels"
),
&["event_name", "action"]
&["event_name", "action", "name"]
)?, registry)?,
notifications_sizes: register(CounterVec::new(
Opts::new(
"sub_libp2p_out_events_notifications_sizes",
"Size of notification events that have been sent or received across all \
channels"
),
&["protocol", "action"]
&["protocol", "action", "name"]
)?, registry)?,
num_channels: register(Gauge::new(
"sub_libp2p_out_events_num_channels",
Expand All @@ -207,60 +213,60 @@ impl Metrics {
})
}

fn event_in(&self, event: &Event, num: u64) {
fn event_in(&self, event: &Event, num: u64, name: &str) {
match event {
Event::Dht(_) => {
self.events_total
.with_label_values(&["dht", "sent"])
.with_label_values(&["dht", "sent", name])
.inc_by(num);
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent"])
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent", name])
.inc_by(num);
},
Event::NotificationStreamClosed { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent"])
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent", name])
.inc_by(num);
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
self.events_total
.with_label_values(&[&format!("notif-{:?}", engine_id), "sent"])
.with_label_values(&[&format!("notif-{:?}", engine_id), "sent", name])
.inc_by(num);
self.notifications_sizes
.with_label_values(&[&engine_id_to_string(engine_id), "sent"])
.with_label_values(&[&engine_id_to_string(engine_id), "sent", name])
.inc_by(num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::max_value())));
}
},
}
}

fn event_out(&self, event: &Event) {
fn event_out(&self, event: &Event, name: &str) {
match event {
Event::Dht(_) => {
self.events_total
.with_label_values(&["dht", "received"])
.with_label_values(&["dht", "received", name])
.inc();
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "received"])
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "received", name])
.inc();
},
Event::NotificationStreamClosed { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received"])
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received", name])
.inc();
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
self.events_total
.with_label_values(&[&format!("notif-{:?}", engine_id), "received"])
.with_label_values(&[&format!("notif-{:?}", engine_id), "received", name])
.inc();
self.notifications_sizes
.with_label_values(&[&engine_id_to_string(engine_id), "received"])
.with_label_values(&[&engine_id_to_string(engine_id), "received", name])
.inc_by(u64::try_from(message.len()).unwrap_or(u64::max_value()));
}
},
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ fn build_test_full_node(config: config::NetworkConfiguration)
.unwrap();

let service = worker.service().clone();
let event_stream = service.event_stream();
let event_stream = service.event_stream("test");

async_std::task::spawn(async move {
futures::pin_mut!(worker);
Expand Down