Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 162 additions & 16 deletions substrate/client/network/statement/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use codec::{Compact, Decode, Encode, MaxEncodedLen};
use futures::future::pending;
use futures::{channel::oneshot, future::FusedFuture, prelude::*, stream::FuturesUnordered};
use prometheus_endpoint::{
prometheus, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError, Registry, U64,
exponential_buckets, register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError,
Registry, U64,
};
use sc_network::{
config::{NonReservedPeerMode, SetConfig},
Expand All @@ -59,6 +60,7 @@ use std::{
num::NonZeroUsize,
pin::Pin,
sync::Arc,
time::Instant,
};
use tokio::time::timeout;
pub mod config;
Expand Down Expand Up @@ -98,6 +100,15 @@ struct Metrics {
propagated_statements_chunks: Histogram,
pending_statements: Gauge<U64>,
ignored_statements: Counter<U64>,
peers_connected: Gauge<U64>,
statements_received: Counter<U64>,
bytes_sent_total: Counter<U64>,
bytes_received_total: Counter<U64>,
sent_latency_seconds: Histogram,
initial_sync_statements_sent: Counter<U64>,
initial_sync_bursts_total: Counter<U64>,
initial_sync_peers_active: Gauge<U64>,
initial_sync_duration_seconds: Histogram,
}

impl Metrics {
Expand Down Expand Up @@ -129,7 +140,8 @@ impl Metrics {
HistogramOpts::new(
"substrate_sync_propagated_statements_chunks",
"Distribution of chunk sizes when propagating statements",
).buckets(prometheus::exponential_buckets(1.0, 2.0, 14)?),
)
.buckets(exponential_buckets(1.0, 2.0, 14)?),
)?,
r,
)?,
Expand All @@ -147,6 +159,76 @@ impl Metrics {
)?,
r,
)?,
peers_connected: register(
Gauge::new(
"substrate_sync_statement_peers_connected",
"Number of peers connected using the statement protocol",
)?,
r,
)?,
statements_received: register(
Counter::new(
"substrate_sync_statements_received",
"Total number of statements received from peers",
)?,
r,
)?,
bytes_sent_total: register(
Counter::new(
"substrate_sync_statement_bytes_sent_total",
"Total bytes sent for statement protocol messages",
)?,
r,
)?,
bytes_received_total: register(
Counter::new(
"substrate_sync_statement_bytes_received_total",
"Total bytes received for statement protocol messages",
)?,
r,
)?,
sent_latency_seconds: register(
Histogram::with_opts(
HistogramOpts::new(
"substrate_sync_statement_sent_latency_seconds",
"Time to send statement messages to peers",
)
// Buckets from 1μs to ~1s covering microsecond to millisecond range.
.buckets(vec![0.000_001, 0.000_01, 0.000_1, 0.001, 0.01, 0.1, 1.0]),
)?,
r,
)?,
initial_sync_statements_sent: register(
Counter::new(
"substrate_sync_initial_sync_statements_sent",
"Total statements sent during initial sync bursts to newly connected peers",
)?,
r,
)?,
initial_sync_bursts_total: register(
Counter::new(
"substrate_sync_initial_sync_bursts_total",
"Total number of initial sync burst rounds processed",
)?,
r,
)?,
initial_sync_peers_active: register(
Gauge::new(
"substrate_sync_initial_sync_peers_active",
"Number of peers currently being synced via initial sync",
)?,
r,
)?,
initial_sync_duration_seconds: register(
Histogram::with_opts(
HistogramOpts::new(
"substrate_sync_initial_sync_duration_seconds",
"Per-peer total duration of initial sync from start to completion",
)
.buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0]),
)?,
r,
)?,
})
}
}
Expand Down Expand Up @@ -220,6 +302,9 @@ impl StatementHandlerPrototype {
num_submission_workers = 1;
}

let metrics =
if let Some(r) = metrics_registry { Some(Metrics::register(r)?) } else { None };

for _ in 0..num_submission_workers {
let store = statement_store.clone();
let mut queue_receiver = queue_receiver.clone();
Expand Down Expand Up @@ -260,11 +345,7 @@ impl StatementHandlerPrototype {
peers: HashMap::new(),
statement_store,
queue_sender,
metrics: if let Some(r) = metrics_registry {
Some(Metrics::register(r)?)
} else {
None
},
metrics,
initial_sync_timeout: Box::pin(tokio::time::sleep(INITIAL_SYNC_BURST_INTERVAL).fuse()),
pending_initial_syncs: HashMap::new(),
initial_sync_peer_queue: VecDeque::new(),
Expand Down Expand Up @@ -324,6 +405,7 @@ pub struct Peer {
/// Tracks pending initial sync state for a peer (hashes only, statements fetched on-demand).
struct PendingInitialSync {
hashes: Vec<Hash>,
started_at: Instant,
}

/// Result of finding a sendable chunk of statements.
Expand Down Expand Up @@ -503,18 +585,27 @@ where
ChunkResult::Send(0) => SendChunkResult::Empty,
ChunkResult::Send(chunk_end) => {
let chunk = &statements[..chunk_end];
if let Err(e) = timeout(
let encoded = chunk.encode();
let bytes_to_send = encoded.len() as u64;

let sent_latency_timer =
self.metrics.as_ref().map(|m| m.sent_latency_seconds.start_timer());
let send_result = timeout(
SEND_TIMEOUT,
self.notification_service.send_async_notification(peer, chunk.encode()),
self.notification_service.send_async_notification(peer, encoded),
)
.await
{
.await;
drop(sent_latency_timer);

if let Err(e) = send_result {
log::debug!(target: LOG_TARGET, "Failed to send notification to {peer}: {e:?}");
return SendChunkResult::Failed;
}

log::trace!(target: LOG_TARGET, "Sent {} statements to {}", chunk.len(), peer);
self.metrics.as_ref().map(|metrics| {
metrics.propagated_statements.inc_by(chunk.len() as u64);
metrics.bytes_sent_total.inc_by(bytes_to_send);
metrics.propagated_statements_chunks.observe(chunk.len() as f64);
});
SendChunkResult::Sent(chunk_end)
Expand Down Expand Up @@ -581,21 +672,46 @@ where
);
debug_assert!(_was_in.is_none());

if !self.sync.is_major_syncing() && !role.is_light() {
self.metrics.as_ref().map(|metrics| {
metrics.peers_connected.set(self.peers.len() as u64);
});

if !self.sync.is_major_syncing() {
let hashes = self.statement_store.statement_hashes();
if !hashes.is_empty() {
self.pending_initial_syncs.insert(peer, PendingInitialSync { hashes });
self.pending_initial_syncs.insert(
peer,
PendingInitialSync { hashes, started_at: Instant::now() },
);
self.initial_sync_peer_queue.push_back(peer);
self.metrics.as_ref().map(|metrics| {
metrics.initial_sync_peers_active.inc();
});
}
}
},
NotificationEvent::NotificationStreamClosed { peer } => {
let _peer = self.peers.remove(&peer);
debug_assert!(_peer.is_some());
self.pending_initial_syncs.remove(&peer);
if let Some(pending) = self.pending_initial_syncs.remove(&peer) {
self.metrics.as_ref().map(|metrics| {
metrics.initial_sync_peers_active.dec();
metrics
.initial_sync_duration_seconds
.observe(pending.started_at.elapsed().as_secs_f64());
});
}
self.initial_sync_peer_queue.retain(|p| *p != peer);
self.metrics.as_ref().map(|metrics| {
metrics.peers_connected.set(self.peers.len() as u64);
});
},
NotificationEvent::NotificationReceived { peer, notification } => {
let bytes_received = notification.len() as u64;
self.metrics.as_ref().map(|metrics| {
metrics.bytes_received_total.inc_by(bytes_received);
});

// Accept statements only when node is not major syncing
if self.sync.is_major_syncing() {
log::trace!(
Expand All @@ -618,6 +734,11 @@ where
#[cfg_attr(not(any(test, feature = "test-helpers")), doc(hidden))]
pub fn on_statements(&mut self, who: PeerId, statements: Statements) {
log::trace!(target: LOG_TARGET, "Received {} statements from {}", statements.len(), who);

self.metrics.as_ref().map(|metrics| {
metrics.statements_received.inc_by(statements.len() as u64);
});

if let Some(ref mut peer) = self.peers.get_mut(&who) {
let mut statements_left = statements.len() as u64;
for s in statements {
Expand Down Expand Up @@ -788,6 +909,16 @@ where
}
}

/// Record initial sync completion metrics for a peer being removed.
fn record_initial_sync_completion(&self, started_at: Instant) {
self.metrics.as_ref().map(|metrics| {
metrics.initial_sync_peers_active.dec();
metrics
.initial_sync_duration_seconds
.observe(started_at.elapsed().as_secs_f64());
});
}

/// Process one batch of initial sync for the next peer in the queue (round-robin).
async fn process_initial_sync_burst(&mut self) {
if self.sync.is_major_syncing() {
Expand All @@ -802,8 +933,14 @@ where
return;
};

self.metrics.as_ref().map(|metrics| {
metrics.initial_sync_bursts_total.inc();
});

if entry.get().hashes.is_empty() {
let started_at = entry.get().started_at;
entry.remove();
self.record_initial_sync_completion(started_at);
return;
}

Expand All @@ -823,7 +960,9 @@ where
Ok(r) => r,
Err(e) => {
log::debug!(target: LOG_TARGET, "Failed to fetch statements for initial sync: {e:?}");
let started_at = entry.get().started_at;
entry.remove();
self.record_initial_sync_completion(started_at);
return;
},
};
Expand All @@ -837,11 +976,16 @@ where
let to_send: Vec<_> = statements.iter().map(|(_, stmt)| stmt).collect();
match self.send_statement_chunk(&peer_id, &to_send).await {
SendChunkResult::Failed => {
self.pending_initial_syncs.remove(&peer_id);
if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
self.record_initial_sync_completion(pending.started_at);
}
return;
},
SendChunkResult::Sent(sent) => {
debug_assert_eq!(to_send.len(), sent);
self.metrics.as_ref().map(|metrics| {
metrics.initial_sync_statements_sent.inc_by(sent as u64);
});
// Mark statements as known
if let Some(peer) = self.peers.get_mut(&peer_id) {
for (hash, _) in &statements {
Expand All @@ -856,7 +1000,9 @@ where
if has_more {
self.initial_sync_peer_queue.push_back(peer_id);
} else {
self.pending_initial_syncs.remove(&peer_id);
if let Some(pending) = self.pending_initial_syncs.remove(&peer_id) {
self.record_initial_sync_completion(pending.started_at);
}
}
}
}
Expand Down
Loading
Loading