diff --git a/Cargo.lock b/Cargo.lock index 2bb7c933b6d0..87d5fc3b2505 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3841,6 +3841,12 @@ dependencies = [ "parking_lot 0.11.1", ] +[[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + [[package]] name = "opaque-debug" version = "0.2.3" @@ -5321,6 +5327,7 @@ dependencies = [ "futures 0.3.8", "futures-timer 3.0.2", "kv-log-macro", + "oorandom", "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index dd9ca81d3f88..41c114f15d4f 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -5,17 +5,18 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] +async-trait = "0.1.42" +client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } futures = "0.3.8" -tracing = "0.1.22" -tracing-futures = "0.2.4" futures-timer = "3.0.2" -streamunordered = "0.5.1" +oorandom = "11.1.3" +polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } +polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-primitives = { path = "../../primitives" } -client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } -polkadot-node-subsystem-util = { path = "../subsystem-util" } -polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } -async-trait = "0.1.42" +streamunordered = "0.5.1" +tracing = "0.1.22" +tracing-futures = "0.2.4" [dev-dependencies] sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 9e5ff2a7548d..cc5e9401cc11 
100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -59,7 +59,7 @@ // yielding false positives #![warn(missing_docs)] -use std::fmt::Debug; +use std::fmt::{self, Debug}; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; @@ -74,6 +74,7 @@ use futures::{ Future, FutureExt, SinkExt, StreamExt, }; use futures_timer::Delay; +use oorandom::Rand32; use streamunordered::{StreamYield, StreamUnordered}; use polkadot_primitives::v1::{Block, BlockNumber, Hash}; @@ -100,6 +101,8 @@ const CHANNEL_CAPACITY: usize = 1024; const STOP_DELAY: u64 = 1; // Target for logs. const LOG_TARGET: &'static str = "overseer"; +// Rate at which messages are timed. +const MESSAGE_TIMER_METRIC_CAPTURE_RATE: f64 = 0.005; /// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. /// @@ -292,6 +295,26 @@ struct SubsystemInstance { name: &'static str, } +type MaybeTimer = Option; + +#[derive(Debug)] +struct MaybeTimed { + timer: MaybeTimer, + t: T, +} + +impl MaybeTimed { + fn into_inner(self) -> T { + self.t + } +} + +impl From for MaybeTimed { + fn from(t: T) -> Self { + Self { timer: None, t } + } +} + /// A context type that is given to the [`Subsystem`] upon spawning. /// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s /// or to spawn it's [`SubsystemJob`]s. @@ -302,7 +325,82 @@ struct SubsystemInstance { #[derive(Debug)] pub struct OverseerSubsystemContext{ rx: mpsc::Receiver>, - tx: mpsc::Sender, + tx: mpsc::Sender>, + metrics: Metrics, + rng: Rand32, + threshold: u32, +} + +impl OverseerSubsystemContext { + /// Create a new `OverseerSubsystemContext`. + /// + /// `increment` determines the initial increment of the internal RNG. + /// The internal RNG is used to determine which messages are timed. + /// + /// `capture_rate` determines what fraction of messages are timed. Its value is clamped + /// to the range `0.0..=1.0`. 
+ fn new( + rx: mpsc::Receiver>, + tx: mpsc::Sender>, + metrics: Metrics, + increment: u64, + mut capture_rate: f64, + ) -> Self { + let rng = Rand32::new_inc(0, increment); + + if capture_rate < 0.0 { + capture_rate = 0.0; + } else if capture_rate > 1.0 { + capture_rate = 1.0; + } + let threshold = (capture_rate * u32::MAX as f64) as u32; + + OverseerSubsystemContext { rx, tx, metrics, rng, threshold } + } + + /// Create a new `OverseserSubsystemContext` with no metering. + /// + /// Intended for tests. + #[allow(unused)] + fn new_unmetered(rx: mpsc::Receiver>, tx: mpsc::Sender>) -> Self { + let metrics = Metrics::default(); + OverseerSubsystemContext::new(rx, tx, metrics, 0, 0.0) + } + + fn maybe_timed(&mut self, t: T) -> MaybeTimed { + let timer = if self.rng.rand_u32() <= self.threshold { + self.metrics.time_message_hold() + } else { + None + }; + + MaybeTimed { timer, t } + } + + /// Make a standalone function which can construct a `MaybeTimed` wrapper around some `T` + /// without borrowing `self`. + /// + /// This is somewhat more expensive than `self.maybe_timed` because it must clone some stuff. + fn make_maybe_timed(&mut self) -> impl FnMut(T) -> MaybeTimed { + // We don't want to simply clone this RNG because we don't want to duplicate its state. + // It's not ever going to be used for cryptographic purposes, but it's still better to + // keep good habits. 
+ let (seed, increment) = self.rng.state(); + let mut rng = Rand32::new_inc(seed, increment + 1); + + let metrics = self.metrics.clone(); + let threshold = self.threshold; + + move |t| { + let timer = if rng.rand_u32() <= threshold { + metrics.time_message_hold() + } else { + None + }; + + MaybeTimed { timer, t } + } + } } #[async_trait::async_trait] @@ -328,7 +426,7 @@ impl SubsystemContext for OverseerSubsystemContext { async fn spawn(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()> { - self.tx.send(ToOverseer::SpawnJob { + self.send_timed(ToOverseer::SpawnJob { name, s, }).await.map_err(Into::into) @@ -337,7 +435,7 @@ impl SubsystemContext for OverseerSubsystemContext { async fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()> { - self.tx.send(ToOverseer::SpawnBlockingJob { + self.send_timed(ToOverseer::SpawnBlockingJob { name, s, }).await.map_err(Into::into) @@ -350,25 +448,46 @@ impl SubsystemContext for OverseerSubsystemContext { async fn send_messages(&mut self, msgs: T) where T: IntoIterator + Send, T::IntoIter: Send { - let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok)); - if self.tx.send_all(&mut msgs).await.is_err() { + self.send_all_timed_or_log(msgs).await + } +} + +impl OverseerSubsystemContext { + async fn send_and_log_error(&mut self, msg: ToOverseer) { + if self.send_timed(msg).await.is_err() { tracing::debug!( target: LOG_TARGET, msg_type = std::any::type_name::(), - "Failed to send messages to Overseer", + "Failed to send a message to Overseer", ); - } } -} -impl OverseerSubsystemContext { - async fn send_and_log_error(&mut self, msg: ToOverseer) { - if self.tx.send(msg).await.is_err() { + async fn send_timed(&mut self, msg: ToOverseer) -> Result< + (), + > as futures::Sink>>::Error + > + { + let msg = self.maybe_timed(msg); + self.tx.send(msg).await + } + + async fn send_all_timed_or_log(&mut self, msgs: Msgs) + where + Msgs: IntoIterator + Send, + 
Msgs::IntoIter: Send, + Msg: Into + Send, + { + let mut maybe_timed = self.make_maybe_timed(); + let mut msgs = stream::iter( + msgs.into_iter() + .map(move |msg| Ok(maybe_timed(ToOverseer::SubsystemMessage(msg.into())))) + ); + if self.tx.send_all(&mut msgs).await.is_err() { tracing::debug!( target: LOG_TARGET, msg_type = std::any::type_name::(), - "Failed to send a message to Overseer", + "Failed to send messages to Overseer", ); } } @@ -481,8 +600,8 @@ pub struct Overseer { /// Here we keep handles to spawned subsystems to be notified when they terminate. running_subsystems: FuturesUnordered>>, - /// Gather running subsystms' outbound streams into one. - running_subsystems_rx: StreamUnordered>, + /// Gather running subsystems' outbound streams into one. + running_subsystems_rx: StreamUnordered>>, /// Events that are sent to the overseer from the outside world events_rx: mpsc::Receiver, @@ -964,6 +1083,7 @@ struct MetricsInner { activated_heads_total: prometheus::Counter, deactivated_heads_total: prometheus::Counter, messages_relayed_total: prometheus::Counter, + message_relay_timing: prometheus::Histogram, } #[derive(Default, Clone)] @@ -987,6 +1107,11 @@ impl Metrics { metrics.messages_relayed_total.inc(); } } + + /// Provide a timer for the duration between receiving a message and passing it to `route_message` + fn time_message_hold(&self) -> MaybeTimer { + self.0.as_ref().map(|metrics| metrics.message_relay_timing.start_timer()) + } } impl metrics::Metrics for Metrics { @@ -1013,11 +1138,39 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + message_relay_timing: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts { + common_opts: prometheus::Opts::new( + "overseer_messages_relay_timing", + "Time spent holding a message in the overseer before passing it to `route_message`", + ), + // guessing at the desired resolution, but we know that messages will time + // out after 0.5 seconds, so the bucket set below seems 
plausible: + // `0.0001 * (1.6 ^ 18) ~= 0.472`. Prometheus auto-generates a final bucket + // for all values between the final value and `+Inf`, so this should work. + // + // The documented legal ranges for the inputs are: + // + // - start: `> 0.0` + // - factor: `> 1.0` + // - count: `!= 0` + buckets: prometheus::exponential_buckets(0.0001, 1.6, 18).expect("inputs are within documented range; qed"), + } + )?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } } +impl fmt::Debug for Metrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Metrics {{...}}") + } +} + impl Overseer where S: SpawnNamed, @@ -1134,14 +1287,20 @@ where events_tx: events_tx.clone(), }; + let metrics = ::register(prometheus_registry)?; + let mut running_subsystems_rx = StreamUnordered::new(); + let mut running_subsystems = FuturesUnordered::new(); + let mut seed = 0x533d; // arbitrary + let candidate_validation_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.candidate_validation, + &metrics, + &mut seed, )?; let candidate_backing_subsystem = spawn( @@ -1149,6 +1308,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.candidate_backing, + &metrics, + &mut seed, )?; let candidate_selection_subsystem = spawn( @@ -1156,6 +1317,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.candidate_selection, + &metrics, + &mut seed, )?; let statement_distribution_subsystem = spawn( @@ -1163,6 +1326,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.statement_distribution, + &metrics, + &mut seed, )?; let availability_distribution_subsystem = spawn( @@ -1170,6 +1335,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.availability_distribution, + &metrics, + &mut seed, )?; let bitfield_signing_subsystem = spawn( @@ -1177,6 +1344,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.bitfield_signing, + &metrics, + &mut seed,
)?; let bitfield_distribution_subsystem = spawn( @@ -1184,6 +1353,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.bitfield_distribution, + &metrics, + &mut seed, )?; let provisioner_subsystem = spawn( @@ -1191,6 +1362,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.provisioner, + &metrics, + &mut seed, )?; let pov_distribution_subsystem = spawn( @@ -1198,6 +1371,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.pov_distribution, + &metrics, + &mut seed, )?; let runtime_api_subsystem = spawn( @@ -1205,6 +1380,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.runtime_api, + &metrics, + &mut seed, )?; let availability_store_subsystem = spawn( @@ -1212,6 +1389,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.availability_store, + &metrics, + &mut seed, )?; let network_bridge_subsystem = spawn( @@ -1219,6 +1398,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.network_bridge, + &metrics, + &mut seed, )?; let chain_api_subsystem = spawn( @@ -1226,6 +1407,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.chain_api, + &metrics, + &mut seed, )?; let collation_generation_subsystem = spawn( @@ -1233,6 +1416,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.collation_generation, + &metrics, + &mut seed, )?; @@ -1241,6 +1426,8 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.collator_protocol, + &metrics, + &mut seed, )?; let leaves = leaves @@ -1249,8 +1436,6 @@ where .collect(); let active_leaves = HashMap::new(); - - let metrics = ::register(prometheus_registry)?; let activation_external_listeners = HashMap::new(); let this = Self { @@ -1339,7 +1524,7 @@ where match msg { Event::MsgToSubsystem(msg) => { - self.route_message(msg).await?; + self.route_message(msg.into()).await?; } Event::Stop => { self.stop().await; 
@@ -1357,14 +1542,17 @@ where } }, msg = self.running_subsystems_rx.next().fuse() => { - let msg = if let Some((StreamYield::Item(msg), _)) = msg { + let MaybeTimed { timer, t: msg } = if let Some((StreamYield::Item(msg), _)) = msg { msg } else { continue }; match msg { - ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await?, + ToOverseer::SubsystemMessage(msg) => { + let msg = MaybeTimed { timer, t: msg }; + self.route_message(msg).await? + }, ToOverseer::SpawnJob { name, s } => { self.spawn_job(name, s); } @@ -1465,7 +1653,8 @@ where } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { + async fn route_message(&mut self, msg: MaybeTimed) -> SubsystemResult<()> { + let msg = msg.into_inner(); self.metrics.on_message_relayed(); match msg { AllMessages::CandidateValidation(msg) => { @@ -1573,14 +1762,19 @@ where fn spawn( spawner: &mut S, futures: &mut FuturesUnordered>>, - streams: &mut StreamUnordered>, + streams: &mut StreamUnordered>>, s: impl Subsystem>, + metrics: &Metrics, + seed: &mut u64, ) -> SubsystemResult> { let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); - let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx }; + let ctx = OverseerSubsystemContext::new(to_rx, from_tx, metrics.clone(), *seed, MESSAGE_TIMER_METRIC_CAPTURE_RATE); let SpawnedSubsystem { future, name } = s.start(ctx); + // increment the seed now that it's been used, so the next context will have its own distinct RNG + *seed += 1; + let (tx, rx) = oneshot::channel(); let fut = Box::pin(async move { @@ -1828,12 +2022,13 @@ mod tests { fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> { let gather = registry.gather(); - assert_eq!(gather[0].get_name(), "parachain_activated_heads_total"); - assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total"); - 
assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total"); - let activated = gather[0].get_metric()[0].get_counter().get_value() as u64; - let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64; - let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64; + assert_eq!(gather[0].get_name(), "overseer_messages_relay_timing"); + assert_eq!(gather[1].get_name(), "parachain_activated_heads_total"); + assert_eq!(gather[2].get_name(), "parachain_deactivated_heads_total"); + assert_eq!(gather[3].get_name(), "parachain_messages_relayed_total"); + let activated = gather[1].get_metric()[0].get_counter().get_value() as u64; + let deactivated = gather[2].get_metric()[0].get_counter().get_value() as u64; + let relayed = gather[3].get_metric()[0].get_counter().get_value() as u64; let mut result = HashMap::new(); result.insert("activated", activated); result.insert("deactivated", deactivated);