From 077a8c1f1181ef6caf22d616a9a576fdc26e2c40 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Tue, 5 Jan 2021 14:35:34 +0100 Subject: [PATCH 1/9] add timing setup to OverseerSubsystemContext --- Cargo.lock | 69 +++++++++++++++++++++++-- node/overseer/Cargo.toml | 16 +++--- node/overseer/src/lib.rs | 105 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 174 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2bb7c933b6d0..71f640aaa5a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1936,10 +1936,21 @@ checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" dependencies = [ "cfg-if 0.1.10", "libc", - "wasi", + "wasi 0.9.0+wasi-snapshot-preview1", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4060f4657be78b8e766215b02b18a2e862d83745545de804638e2b545e81aee6" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "wasi 0.10.0+wasi-snapshot-preview1", +] + [[package]] name = "ghash" version = "0.3.0" @@ -5326,6 +5337,8 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-util", "polkadot-primitives", + "rand 0.8.1", + "rand_chacha 0.3.0", "sc-client-api", "sp-core", "streamunordered", @@ -6202,7 +6215,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" dependencies = [ - "getrandom", + "getrandom 0.1.14", "libc", "rand_chacha 0.2.2", "rand_core 0.5.1", @@ -6210,6 +6223,18 @@ dependencies = [ "rand_pcg 0.2.1", ] +[[package]] +name = "rand" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c24fcd450d3fa2b592732565aa4f17a27a61c65ece4726353e000939b0edee34" +dependencies = [ + "libc", + "rand_chacha 0.3.0", + "rand_core 0.6.1", + "rand_hc 0.3.0", +] + [[package]] name = "rand_chacha" version = "0.1.1" @@ -6230,6 +6255,16 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_chacha" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.1", +] + [[package]] name = "rand_core" version = "0.3.1" @@ -6251,7 +6286,16 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" dependencies = [ - "getrandom", + "getrandom 0.1.14", +] + +[[package]] +name = "rand_core" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5" +dependencies = [ + "getrandom 0.2.1", ] [[package]] @@ -6281,6 +6325,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_hc" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" +dependencies = [ + "rand_core 0.6.1", +] + [[package]] name = "rand_isaac" version = "0.1.1" @@ -6407,7 +6460,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09b23093265f8d200fa7b4c2c76297f47e681c655f6f1285a8780d6a022f7431" dependencies = [ - "getrandom", + "getrandom 0.1.14", "redox_syscall", "rust-argon2", ] @@ -7672,7 +7725,7 @@ dependencies = [ "arrayref", "arrayvec 0.5.2", "curve25519-dalek 2.1.0", - "getrandom", + "getrandom 0.1.14", "merlin", "rand 0.7.3", "rand_core 0.5.1", @@ -9875,6 +9928,12 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "wasm-bindgen" version = "0.2.69" diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index dd9ca81d3f88..e420e785e257 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -5,17 +5,19 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] +async-trait = "0.1.42" +client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } futures = "0.3.8" -tracing = "0.1.22" -tracing-futures = "0.2.4" futures-timer = "3.0.2" -streamunordered = "0.5.1" +polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } +polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-primitives = { path = "../../primitives" } -client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } -polkadot-node-subsystem-util = { path = "../subsystem-util" } -polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } -async-trait = "0.1.42" +rand = "0.8.1" +rand_chacha = "0.3.0" +streamunordered = "0.5.1" +tracing = "0.1.22" +tracing-futures = "0.2.4" [dev-dependencies] sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 9e5ff2a7548d..daa278d6fbda 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -59,7 +59,7 @@ // yielding false positives #![warn(missing_docs)] -use std::fmt::Debug; +use std::fmt::{self, Debug}; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; @@ -74,6 +74,7 @@ use futures::{ Future, FutureExt, SinkExt, StreamExt, }; use futures_timer::Delay; +use rand_chacha::ChaCha8Rng as Rng; use streamunordered::{StreamYield, StreamUnordered}; use polkadot_primitives::v1::{Block, BlockNumber, Hash}; @@ -100,6 +101,8 @@ const CHANNEL_CAPACITY: usize = 1024; const STOP_DELAY: u64 = 1; // Target for logs. const LOG_TARGET: &'static str = "overseer"; +// Rate at which messages are timed. +const MESSAGE_TIMER_METRIC_CAPTURE_RATE: f64 = 0.005; /// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. /// @@ -303,6 +306,50 @@ struct SubsystemInstance { pub struct OverseerSubsystemContext{ rx: mpsc::Receiver>, tx: mpsc::Sender, + metrics: Metrics, + rng: Rng, + distribution: rand::distributions::Bernoulli, +} + +impl OverseerSubsystemContext { + /// Create a new `OverseerSubsystemContext` with randomized initial RNG state. + /// + /// The internal RNG is used to determine which messages are timed. + /// + /// `capture_rate` determines what fraction of messages are timed. Its useful values are clamped + /// to the range `0.0..=1.0`. + fn new(rx: mpsc::Receiver>, tx: mpsc::Sender, metrics: Metrics, mut capture_rate: f64) -> Self { + let rng = unimplemented!(); + + if capture_rate < 0.0 { + capture_rate = 0.0; + } else if capture_rate > 1.0 { + capture_rate = 1.0; + } + let distribution = rand::distributions::Bernoulli::new(capture_rate).expect("input is clamped to valid range; qed"); + + OverseerSubsystemContext { rx, tx, metrics, rng, distribution } + } + + /// Create a new `OverseserSubsystemContext` with no metering. + /// + /// Intended for tests. + #[allow(unused)] + fn new_unmetered(rx: mpsc::Receiver>, tx: mpsc::Sender) -> Self { + let metrics = Metrics::default(); + Self::new(rx, tx, metrics, 0.0) + } + + /// Create a timer for a held message, with probability `self.distribution`. + fn time_message_hold(&mut self) -> Option { + use rand::distributions::Distribution; + + if self.distribution.sample(&mut self.rng) { + self.metrics.time_message_hold() + } else { + None + } + } } #[async_trait::async_trait] @@ -964,6 +1011,7 @@ struct MetricsInner { activated_heads_total: prometheus::Counter, deactivated_heads_total: prometheus::Counter, messages_relayed_total: prometheus::Counter, + message_relay_timing: prometheus::Histogram, } #[derive(Default, Clone)] @@ -987,6 +1035,11 @@ impl Metrics { metrics.messages_relayed_total.inc(); } } + + /// Provide a timer for the duration between receiving a message and passing it to `route_message` + fn time_message_hold(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.message_relay_timing.start_timer()) + } } impl metrics::Metrics for Metrics { @@ -1013,11 +1066,39 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + message_relay_timing: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts { + common_opts: prometheus::Opts::new( + "overseer_messages_relay_timing", + "Time spent holding a message in the overseer before passing it to `route_message`", + ), + // guessing at the desired resolution, but we know that messages will time + // out after 0.5 seconds, so the bucket set below seems plausible: + // `0.0001 * (1.6 ^ 18) ~= 0.472`. Prometheus auto-generates a final bucket + // for all values between the final value and `+Inf`, so this should work. + // + // The documented legal range for the inputs are: + // + // - `> 0.0` + // - `> 1.0` + // - `! 0` + buckets: prometheus::exponential_buckets(0.0001, 1.6, 18).expect("inputs are within documented range; qed"), + } + )?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } } +impl fmt::Debug for Metrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Metrics {{...}}") + } +} + impl Overseer where S: SpawnNamed, @@ -1134,6 +1215,8 @@ where events_tx: events_tx.clone(), }; + let metrics = ::register(prometheus_registry)?; + let mut running_subsystems_rx = StreamUnordered::new(); let mut running_subsystems = FuturesUnordered::new(); @@ -1142,6 +1225,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.candidate_validation, + &metrics, )?; let candidate_backing_subsystem = spawn( @@ -1149,6 +1233,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.candidate_backing, + &metrics, )?; let candidate_selection_subsystem = spawn( @@ -1156,6 +1241,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.candidate_selection, + &metrics, )?; let statement_distribution_subsystem = spawn( @@ -1163,6 +1249,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.statement_distribution, + &metrics, )?; let availability_distribution_subsystem = spawn( @@ -1170,6 +1257,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.availability_distribution, + &metrics, )?; let bitfield_signing_subsystem = spawn( @@ -1177,6 +1265,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.bitfield_signing, + &metrics, )?; let bitfield_distribution_subsystem = spawn( @@ -1184,6 +1273,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.bitfield_distribution, + &metrics, )?; let provisioner_subsystem = spawn( @@ -1191,6 +1281,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.provisioner, + &metrics, )?; let pov_distribution_subsystem = spawn( @@ -1198,6 +1289,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.pov_distribution, + &metrics, )?; let runtime_api_subsystem = spawn( @@ -1205,6 +1297,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.runtime_api, + &metrics, )?; let availability_store_subsystem = spawn( @@ -1212,6 +1305,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.availability_store, + &metrics, )?; let network_bridge_subsystem = spawn( @@ -1219,6 +1313,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.network_bridge, + &metrics, )?; let chain_api_subsystem = spawn( @@ -1226,6 +1321,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.chain_api, + &metrics, )?; let collation_generation_subsystem = spawn( @@ -1233,6 +1329,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.collation_generation, + &metrics, )?; @@ -1241,6 +1338,7 @@ where &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.collator_protocol, + &metrics, )?; let leaves = leaves @@ -1249,8 +1347,6 @@ where .collect(); let active_leaves = HashMap::new(); - - let metrics = ::register(prometheus_registry)?; let activation_external_listeners = HashMap::new(); let this = Self { @@ -1575,10 +1671,11 @@ fn spawn( futures: &mut FuturesUnordered>>, streams: &mut StreamUnordered>, s: impl Subsystem>, + metrics: &Metrics, ) -> SubsystemResult> { let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); - let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx }; + let ctx = OverseerSubsystemContext::new(to_rx, from_tx, metrics.clone(), MESSAGE_TIMER_METRIC_CAPTURE_RATE); let SpawnedSubsystem { future, name } = s.start(ctx); let (tx, rx) = oneshot::channel(); From c92bd37e5e6d196570188efa0d80a6cbab46af87 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Tue, 5 Jan 2021 14:46:37 +0100 Subject: [PATCH 2/9] figure out how to initialize the rng --- node/overseer/Cargo.toml | 2 +- node/overseer/src/lib.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index e420e785e257..fa1210554c47 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -14,7 +14,7 @@ polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-primitives = { path = "../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } rand = "0.8.1" -rand_chacha = "0.3.0" +rand_chacha = { version = "0.3.0", feaures = [ "getrandom" ] } streamunordered = "0.5.1" tracing = "0.1.22" tracing-futures = "0.2.4" diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index daa278d6fbda..abb6656feed5 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -319,7 +319,8 @@ impl OverseerSubsystemContext { /// `capture_rate` determines what fraction of messages are timed. Its useful values are clamped /// to the range `0.0..=1.0`. fn new(rx: mpsc::Receiver>, tx: mpsc::Sender, metrics: Metrics, mut capture_rate: f64) -> Self { - let rng = unimplemented!(); + use rand_chacha::rand_core::SeedableRng; + let rng = Rng::from_entropy(); if capture_rate < 0.0 { capture_rate = 0.0; From 45aa7a95be5f170ed305cfbd03e9b541a7691935 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Tue, 5 Jan 2021 16:44:48 +0100 Subject: [PATCH 3/9] attach a timer to a portion of the messages traveling to the Overseer This timer only exists / logs a fraction of the time (configurable by `MESSAGE_TIMER_METRIC_CAPTURE_RATE`). When it exists, it tracks the span between the `OverSubsystemContext` receiving the message and its receipt in `Overseer::run`. --- node/overseer/src/lib.rs | 112 +++++++++++++++++++++++++++++++-------- 1 file changed, 89 insertions(+), 23 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index abb6656feed5..8ffb611ee093 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -295,6 +295,20 @@ struct SubsystemInstance { name: &'static str, } +type MaybeTimer = Option; + +#[derive(Debug)] +struct MaybeTimed { + timer: MaybeTimer, + t: T, +} + +impl MaybeTimed { + fn into_inner(self) -> T { + self.t + } +} + /// A context type that is given to the [`Subsystem`] upon spawning. /// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s /// or to spawn it's [`SubsystemJob`]s. @@ -305,7 +319,7 @@ struct SubsystemInstance { #[derive(Debug)] pub struct OverseerSubsystemContext{ rx: mpsc::Receiver>, - tx: mpsc::Sender, + tx: mpsc::Sender>, metrics: Metrics, rng: Rng, distribution: rand::distributions::Bernoulli, @@ -316,9 +330,9 @@ impl OverseerSubsystemContext { /// /// The internal RNG is used to determine which messages are timed. /// - /// `capture_rate` determines what fraction of messages are timed. Its useful values are clamped + /// `capture_rate` determines what fraction of messages are timed. Its value is clamped /// to the range `0.0..=1.0`. - fn new(rx: mpsc::Receiver>, tx: mpsc::Sender, metrics: Metrics, mut capture_rate: f64) -> Self { + fn new(rx: mpsc::Receiver>, tx: mpsc::Sender>, metrics: Metrics, mut capture_rate: f64) -> Self { use rand_chacha::rand_core::SeedableRng; let rng = Rng::from_entropy(); @@ -336,19 +350,50 @@ impl OverseerSubsystemContext { /// /// Intended for tests. #[allow(unused)] - fn new_unmetered(rx: mpsc::Receiver>, tx: mpsc::Sender) -> Self { + fn new_unmetered(rx: mpsc::Receiver>, tx: mpsc::Sender>) -> Self { let metrics = Metrics::default(); Self::new(rx, tx, metrics, 0.0) } - /// Create a timer for a held message, with probability `self.distribution`. - fn time_message_hold(&mut self) -> Option { + fn maybe_timed(&mut self, t: T) -> MaybeTimed { use rand::distributions::Distribution; - if self.distribution.sample(&mut self.rng) { + let timer = if self.distribution.sample(&mut self.rng) { self.metrics.time_message_hold() } else { None + }; + + MaybeTimed { timer, t } + } + + /// Make a standalone function which can construct a `MaybeTimed` wrapper around some `T` + /// without borrowing `self`. + /// + /// This is somewhat more expensive than `self.maybe_timed` because it must clone some stuff. + fn make_maybe_timed(&mut self) -> impl FnMut(T) -> MaybeTimed { + use rand::{RngCore, SeedableRng}; + + // We don't want to simply clone this RNG because we don't want to duplicate its state. + // It's not ever going to be used for cryptographic purposes, but it's still better to + // keep good habits. + let mut child_seed = [0_u8; 32]; + self.rng.fill_bytes(&mut child_seed); + let mut rng = Rng::from_seed(child_seed); + + let metrics = self.metrics.clone(); + let distribution = self.distribution.clone(); + + move |t| { + use rand::distributions::Distribution; + + let timer = if distribution.sample(&mut rng) { + metrics.time_message_hold() + } else { + None + }; + + MaybeTimed { timer, t } } } } @@ -376,7 +421,7 @@ impl SubsystemContext for OverseerSubsystemContext { async fn spawn(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()> { - self.tx.send(ToOverseer::SpawnJob { + self.send_timed(ToOverseer::SpawnJob { name, s, }).await.map_err(Into::into) @@ -385,7 +430,7 @@ impl SubsystemContext for OverseerSubsystemContext { async fn spawn_blocking(&mut self, name: &'static str, s: Pin + Send>>) -> SubsystemResult<()> { - self.tx.send(ToOverseer::SpawnBlockingJob { + self.send_timed(ToOverseer::SpawnBlockingJob { name, s, }).await.map_err(Into::into) @@ -398,25 +443,46 @@ impl SubsystemContext for OverseerSubsystemContext { async fn send_messages(&mut self, msgs: T) where T: IntoIterator + Send, T::IntoIter: Send { - let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok)); - if self.tx.send_all(&mut msgs).await.is_err() { + self.send_all_timed_or_log(msgs).await + } +} + +impl OverseerSubsystemContext { + async fn send_and_log_error(&mut self, msg: ToOverseer) { + if self.send_timed(msg).await.is_err() { tracing::debug!( target: LOG_TARGET, msg_type = std::any::type_name::(), - "Failed to send messages to Overseer", + "Failed to send a message to Overseer", ); - } } -} -impl OverseerSubsystemContext { - async fn send_and_log_error(&mut self, msg: ToOverseer) { - if self.tx.send(msg).await.is_err() { + async fn send_timed(&mut self, msg: ToOverseer) -> Result< + (), + > as futures::Sink>>::Error + > + { + let msg = self.maybe_timed(msg); + self.tx.send(msg).await + } + + async fn send_all_timed_or_log(&mut self, msgs: Msgs) + where + Msgs: IntoIterator + Send, + Msgs::IntoIter: Send, + Msg: Into + Send, + { + let mut maybe_timed = self.make_maybe_timed(); + let mut msgs = stream::iter( + msgs.into_iter() + .map(move |msg| Ok(maybe_timed(ToOverseer::SubsystemMessage(msg.into())))) + ); + if self.tx.send_all(&mut msgs).await.is_err() { tracing::debug!( target: LOG_TARGET, msg_type = std::any::type_name::(), - "Failed to send a message to Overseer", + "Failed to send messages to Overseer", ); } } @@ -529,8 +595,8 @@ pub struct Overseer { /// Here we keep handles to spawned subsystems to be notified when they terminate. running_subsystems: FuturesUnordered>>, - /// Gather running subsystms' outbound streams into one. - running_subsystems_rx: StreamUnordered>, + /// Gather running subsystems' outbound streams into one. + running_subsystems_rx: StreamUnordered>>, /// Events that are sent to the overseer from the outside world events_rx: mpsc::Receiver, @@ -1038,7 +1104,7 @@ impl Metrics { } /// Provide a timer for the duration between receiving a message and passing it to `route_message` - fn time_message_hold(&self) -> Option { + fn time_message_hold(&self) -> MaybeTimer { self.0.as_ref().map(|metrics| metrics.message_relay_timing.start_timer()) } } @@ -1455,7 +1521,7 @@ where }, msg = self.running_subsystems_rx.next().fuse() => { let msg = if let Some((StreamYield::Item(msg), _)) = msg { - msg + msg.into_inner() } else { continue }; @@ -1670,7 +1736,7 @@ where fn spawn( spawner: &mut S, futures: &mut FuturesUnordered>>, - streams: &mut StreamUnordered>, + streams: &mut StreamUnordered>>, s: impl Subsystem>, metrics: &Metrics, ) -> SubsystemResult> { From a0a15e57fa7fddb45fccbb9b00f905f904f326ec Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Tue, 5 Jan 2021 17:06:34 +0100 Subject: [PATCH 4/9] propagate message timing to the start of route_message This should be more accurate; it ensures that the timer runs at least as long as that function. As `route_message` is async, it may not actually run for some time after it is called (or ever). --- node/overseer/src/lib.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 8ffb611ee093..63b968eefdad 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -309,6 +309,12 @@ impl MaybeTimed { } } +impl From for MaybeTimed { + fn from(t: T) -> Self { + Self { timer: None, t } + } +} + /// A context type that is given to the [`Subsystem`] upon spawning. /// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s /// or to spawn it's [`SubsystemJob`]s. @@ -1502,7 +1508,7 @@ where match msg { Event::MsgToSubsystem(msg) => { - self.route_message(msg).await?; + self.route_message(msg.into()).await?; } Event::Stop => { self.stop().await; @@ -1520,14 +1526,17 @@ where } }, msg = self.running_subsystems_rx.next().fuse() => { - let msg = if let Some((StreamYield::Item(msg), _)) = msg { - msg.into_inner() + let MaybeTimed { timer, t: msg } = if let Some((StreamYield::Item(msg), _)) = msg { + msg } else { continue }; match msg { - ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await?, + ToOverseer::SubsystemMessage(msg) => { + let msg = MaybeTimed { timer, t: msg }; + self.route_message(msg).await? + }, ToOverseer::SpawnJob { name, s } => { self.spawn_job(name, s); } @@ -1628,7 +1637,8 @@ where } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { + async fn route_message(&mut self, msg: MaybeTimed) -> SubsystemResult<()> { + let msg = msg.into_inner(); self.metrics.on_message_relayed(); match msg { AllMessages::CandidateValidation(msg) => { From 9b7c2641459108306aed024448f1823b96aa0a57 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Wed, 6 Jan 2021 09:32:02 +0100 Subject: [PATCH 5/9] fix failing test --- node/overseer/src/lib.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 63b968eefdad..1ddeb3156165 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -2002,12 +2002,13 @@ mod tests { fn extract_metrics(registry: &prometheus::Registry) -> HashMap<&'static str, u64> { let gather = registry.gather(); - assert_eq!(gather[0].get_name(), "parachain_activated_heads_total"); - assert_eq!(gather[1].get_name(), "parachain_deactivated_heads_total"); - assert_eq!(gather[2].get_name(), "parachain_messages_relayed_total"); - let activated = gather[0].get_metric()[0].get_counter().get_value() as u64; - let deactivated = gather[1].get_metric()[0].get_counter().get_value() as u64; - let relayed = gather[2].get_metric()[0].get_counter().get_value() as u64; + assert_eq!(gather[0].get_name(), "overseer_messages_relay_timing"); + assert_eq!(gather[1].get_name(), "parachain_activated_heads_total"); + assert_eq!(gather[2].get_name(), "parachain_deactivated_heads_total"); + assert_eq!(gather[3].get_name(), "parachain_messages_relayed_total"); + let activated = gather[1].get_metric()[0].get_counter().get_value() as u64; + let deactivated = gather[2].get_metric()[0].get_counter().get_value() as u64; + let relayed = gather[3].get_metric()[0].get_counter().get_value() as u64; let mut result = HashMap::new(); result.insert("activated", activated); result.insert("deactivated", deactivated); From 88bbc7fa09b6e96fab22b4c3657b8e79f385bcd9 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Wed, 6 Jan 2021 09:33:13 +0100 Subject: [PATCH 6/9] rand_chacha apparently implicitly has getrandom feature --- node/overseer/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index fa1210554c47..e420e785e257 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -14,7 +14,7 @@ polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-primitives = { path = "../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } rand = "0.8.1" -rand_chacha = { version = "0.3.0", feaures = [ "getrandom" ] } +rand_chacha = "0.3.0" streamunordered = "0.5.1" tracing = "0.1.22" tracing-futures = "0.2.4" From f43d96460c6384cec4c1a0e030cf072354b255b4 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Wed, 6 Jan 2021 10:25:29 +0100 Subject: [PATCH 7/9] change rng initialization The previous impl using `from_entropy` depends on the `getrandom` crate, which uses the system entropy source, and which does not work on `wasm32-unknown-unknown` because it wants to fall back to a JS implementation which we can't assume exists. This impl depends only on `rand::thread_rng`, which has no documentation stating that it's similarly limited. --- node/overseer/src/lib.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 1ddeb3156165..edc2873933e2 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -340,7 +340,7 @@ impl OverseerSubsystemContext { /// to the range `0.0..=1.0`. fn new(rx: mpsc::Receiver>, tx: mpsc::Sender>, metrics: Metrics, mut capture_rate: f64) -> Self { use rand_chacha::rand_core::SeedableRng; - let rng = Rng::from_entropy(); + let rng = Rng::from_rng(rand::thread_rng()).expect("can fail only if the inner RNG is fallible; thread_rng is infallible; qed"); if capture_rate < 0.0 { capture_rate = 0.0; @@ -357,8 +357,13 @@ impl OverseerSubsystemContext { /// Intended for tests. #[allow(unused)] fn new_unmetered(rx: mpsc::Receiver>, tx: mpsc::Sender>) -> Self { + use rand_chacha::rand_core::SeedableRng; + let rng = Rng::seed_from_u64(0); + let metrics = Metrics::default(); - Self::new(rx, tx, metrics, 0.0) + let distribution = rand::distributions::Bernoulli::new(0.0).expect("input is within valid range; qed"); + + OverseerSubsystemContext { rx, tx, metrics, rng, distribution } } fn maybe_timed(&mut self, t: T) -> MaybeTimed { From 0ab8594c328b3f9ce1f696fe405556d4000630e9 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Wed, 6 Jan 2021 10:59:03 +0100 Subject: [PATCH 8/9] remove randomness in favor of a simpler 1 of N procedure This deserves a bit of explanation, as the motivating issue explicitly requested randomness. In short, it's hard to get randomness to compile for `wasm32-unknown-unknown` because that is explicitly intended to be as deterministic as practical. Additionally, even though it would never be used for consensus purposes, it still felt offputting to intentionally introduce randomness into a node's operations. Except, it wasn't really random, either: it was a deterministic PRNG varying only in its state, and getting the state to work right for that target would have required initializing from a constant. Given that it was a deterministic sequence anyway, it seemed much simpler and more explicit to simply select one of each N messages instead of attempting any kind of realistic randomness. --- Cargo.lock | 69 +++-------------------------------- node/overseer/Cargo.toml | 2 -- node/overseer/src/lib.rs | 78 +++++++++++++++++++--------------------- 3 files changed, 41 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 71f640aaa5a6..2bb7c933b6d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1936,21 +1936,10 @@ checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" dependencies = [ "cfg-if 0.1.10", "libc", - "wasi 0.9.0+wasi-snapshot-preview1", + "wasi", "wasm-bindgen", ] -[[package]] -name = "getrandom" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4060f4657be78b8e766215b02b18a2e862d83745545de804638e2b545e81aee6" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", -] - [[package]] name = "ghash" version = "0.3.0" @@ -5337,8 +5326,6 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-util", "polkadot-primitives", - "rand 0.8.1", - "rand_chacha 0.3.0", "sc-client-api", "sp-core", "streamunordered", @@ -6215,7 +6202,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" dependencies = [ - "getrandom 0.1.14", + "getrandom", "libc", "rand_chacha 0.2.2", "rand_core 0.5.1", @@ -6223,18 +6210,6 @@ dependencies = [ "rand_pcg 0.2.1", ] -[[package]] -name = "rand" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c24fcd450d3fa2b592732565aa4f17a27a61c65ece4726353e000939b0edee34" -dependencies = [ - "libc", - "rand_chacha 0.3.0", - "rand_core 0.6.1", - "rand_hc 0.3.0", -] - [[package]] name = "rand_chacha" version = "0.1.1" @@ -6255,16 +6230,6 @@ dependencies = [ "rand_core 0.5.1", ] -[[package]] -name = "rand_chacha" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" -dependencies = [ - "ppv-lite86", - "rand_core 0.6.1", -] - [[package]] name = "rand_core" version = "0.3.1" @@ -6286,16 +6251,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" dependencies = [ - "getrandom 0.1.14", -] - -[[package]] -name = "rand_core" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5" -dependencies = [ - "getrandom 0.2.1", + "getrandom", ] [[package]] @@ -6325,15 +6281,6 @@ dependencies = [ "rand_core 0.5.1", ] -[[package]] -name = "rand_hc" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" -dependencies = [ - "rand_core 0.6.1", -] - [[package]] name = "rand_isaac" version = "0.1.1" @@ -6460,7 +6407,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09b23093265f8d200fa7b4c2c76297f47e681c655f6f1285a8780d6a022f7431" dependencies = [ - "getrandom 0.1.14", + "getrandom", "redox_syscall", "rust-argon2", ] @@ -7725,7 +7672,7 @@ dependencies = [ "arrayref", "arrayvec 0.5.2", "curve25519-dalek 2.1.0", - "getrandom 0.1.14", + "getrandom", "merlin", "rand 0.7.3", "rand_core 0.5.1", @@ -9928,12 +9875,6 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasm-bindgen" version = "0.2.69" diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index e420e785e257..ec5edfb0c7d5 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -13,8 +13,6 @@ polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../pr polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-primitives = { path = "../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" } -rand = "0.8.1" -rand_chacha = "0.3.0" streamunordered = "0.5.1" tracing = "0.1.22" tracing-futures = "0.2.4" diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index edc2873933e2..0864b7811527 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -74,7 +74,6 @@ use futures::{ Future, FutureExt, SinkExt, StreamExt, }; use futures_timer::Delay; -use rand_chacha::ChaCha8Rng as Rng; use streamunordered::{StreamYield, StreamUnordered}; use polkadot_primitives::v1::{Block, BlockNumber, Hash}; @@ -101,8 +100,8 @@ const CHANNEL_CAPACITY: usize = 1024; const STOP_DELAY: u64 = 1; // Target for logs. const LOG_TARGET: &'static str = "overseer"; -// Rate at which messages are timed. -const MESSAGE_TIMER_METRIC_CAPTURE_RATE: f64 = 0.005; +// 1 message out of each of this many is timed. +const MESSAGE_TIMER_METRIC_CAPTURE_RATE: u32 = 256; /// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. /// @@ -327,29 +326,17 @@ pub struct OverseerSubsystemContext{ rx: mpsc::Receiver>, tx: mpsc::Sender>, metrics: Metrics, - rng: Rng, - distribution: rand::distributions::Bernoulli, + capture_each: u32, + capture_state: u32, } impl OverseerSubsystemContext { - /// Create a new `OverseerSubsystemContext` with randomized initial RNG state. + /// Create a new `OverseerSubsystemContext`. /// - /// The internal RNG is used to determine which messages are timed. - /// - /// `capture_rate` determines what fraction of messages are timed. Its value is clamped - /// to the range `0.0..=1.0`. - fn new(rx: mpsc::Receiver>, tx: mpsc::Sender>, metrics: Metrics, mut capture_rate: f64) -> Self { - use rand_chacha::rand_core::SeedableRng; - let rng = Rng::from_rng(rand::thread_rng()).expect("can fail only if the inner RNG is fallible; thread_rng is infallible; qed"); - - if capture_rate < 0.0 { - capture_rate = 0.0; - } else if capture_rate > 1.0 { - capture_rate = 1.0; - } - let distribution = rand::distributions::Bernoulli::new(capture_rate).expect("input is clamped to valid range; qed"); - - OverseerSubsystemContext { rx, tx, metrics, rng, distribution } + /// `capture_each` determines which messages are timed, i.e. a value of 256 means that every 256th + /// message is timed. A value of `0` has a special meaning: no messages are timed. + fn new(rx: mpsc::Receiver>, tx: mpsc::Sender>, metrics: Metrics, capture_each: u32) -> Self { + OverseerSubsystemContext { rx, tx, metrics, capture_each, capture_state: 0 } } /// Create a new `OverseserSubsystemContext` with no metering. @@ -357,19 +344,24 @@ impl OverseerSubsystemContext { /// Intended for tests. #[allow(unused)] fn new_unmetered(rx: mpsc::Receiver>, tx: mpsc::Sender>) -> Self { - use rand_chacha::rand_core::SeedableRng; - let rng = Rng::seed_from_u64(0); - let metrics = Metrics::default(); - let distribution = rand::distributions::Bernoulli::new(0.0).expect("input is within valid range; qed"); + OverseerSubsystemContext::new(rx, tx, metrics, 0) + } - OverseerSubsystemContext { rx, tx, metrics, rng, distribution } + fn should_capture(&mut self) -> bool { + if self.capture_each == 0 { + false + } else { + self.capture_state += 1; + if self.capture_state >= self.capture_each { + self.capture_state = 0; + } + self.capture_state == 0 + } } fn maybe_timed(&mut self, t: T) -> MaybeTimed { - use rand::distributions::Distribution; - - let timer = if self.distribution.sample(&mut self.rng) { + let timer = if self.should_capture() { self.metrics.time_message_hold() } else { None @@ -383,22 +375,24 @@ impl OverseerSubsystemContext { /// /// This is somewhat more expensive than `self.maybe_timed` because it must clone some stuff. fn make_maybe_timed(&mut self) -> impl FnMut(T) -> MaybeTimed { - use rand::{RngCore, SeedableRng}; - - // We don't want to simply clone this RNG because we don't want to duplicate its state. - // It's not ever going to be used for cryptographic purposes, but it's still better to - // keep good habits. - let mut child_seed = [0_u8; 32]; - self.rng.fill_bytes(&mut child_seed); - let mut rng = Rng::from_seed(child_seed); - let metrics = self.metrics.clone(); - let distribution = self.distribution.clone(); + let capture_each = self.capture_each; + let mut capture_state = self.capture_state; move |t| { - use rand::distributions::Distribution; + let mut should_capture = move || { + if capture_each == 0 { + false + } else { + capture_state += 1; + if capture_state >= capture_each { + capture_state = 0; + } + capture_state == 0 + } + }; - let timer = if distribution.sample(&mut rng) { + let timer = if should_capture() { metrics.time_message_hold() } else { None From ea593098feb8a1de368861be00aab837752551f4 Mon Sep 17 00:00:00 2001 From: Peter Goodspeed-Niklaus Date: Wed, 6 Jan 2021 12:40:08 +0100 Subject: [PATCH 9/9] reinstate randomness for better statistical properties This partially reverts commit 0ab8594c328b3f9ce1f696fe405556d4000630e9. `oorandom` is much lighter than the previous `rand`-based implementation, which makes this easier to work with. This implementation gives each subsystem and each child RNG a distinct increment, which should ensure they produce distinct streams of values. --- Cargo.lock | 7 +++ node/overseer/Cargo.toml | 1 + node/overseer/src/lib.rs | 97 ++++++++++++++++++++++++---------------- 3 files changed, 67 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2bb7c933b6d0..87d5fc3b2505 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3841,6 +3841,12 @@ dependencies = [ "parking_lot 0.11.1", ] +[[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + [[package]] name = "opaque-debug" version = "0.2.3" @@ -5321,6 +5327,7 @@ dependencies = [ "futures 0.3.8", "futures-timer 3.0.2", "kv-log-macro", + "oorandom", "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index ec5edfb0c7d5..41c114f15d4f 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -9,6 +9,7 @@ async-trait = "0.1.42" client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } futures = "0.3.8" futures-timer = "3.0.2" +oorandom = "11.1.3" polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } polkadot-node-subsystem-util = { path = "../subsystem-util" } polkadot-primitives = { path = "../../primitives" } diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 0864b7811527..cc5e9401cc11 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -74,6 +74,7 @@ use futures::{ Future, FutureExt, SinkExt, StreamExt, }; use futures_timer::Delay; +use oorandom::Rand32; use streamunordered::{StreamYield, StreamUnordered}; use polkadot_primitives::v1::{Block, BlockNumber, Hash}; @@ -100,8 +101,8 @@ const CHANNEL_CAPACITY: usize = 1024; const STOP_DELAY: u64 = 1; // Target for logs. const LOG_TARGET: &'static str = "overseer"; -// 1 message out of each of this many is timed. -const MESSAGE_TIMER_METRIC_CAPTURE_RATE: u32 = 256; +// Rate at which messages are timed. +const MESSAGE_TIMER_METRIC_CAPTURE_RATE: f64 = 0.005; /// A type of messages that are sent from [`Subsystem`] to [`Overseer`]. /// @@ -326,17 +327,35 @@ pub struct OverseerSubsystemContext{ rx: mpsc::Receiver>, tx: mpsc::Sender>, metrics: Metrics, - capture_each: u32, - capture_state: u32, + rng: Rand32, + threshold: u32, } impl OverseerSubsystemContext { /// Create a new `OverseerSubsystemContext`. /// - /// `capture_each` determines which messages are timed, i.e. a value of 256 means that every 256th - /// message is timed. A value of `0` has a special meaning: no messages are timed. - fn new(rx: mpsc::Receiver>, tx: mpsc::Sender>, metrics: Metrics, capture_each: u32) -> Self { - OverseerSubsystemContext { rx, tx, metrics, capture_each, capture_state: 0 } + /// `increment` determines the initial increment of the internal RNG. + /// The internal RNG is used to determine which messages are timed. + /// + /// `capture_rate` determines what fraction of messages are timed. Its value is clamped + /// to the range `0.0..=1.0`. + fn new( + rx: mpsc::Receiver>, + tx: mpsc::Sender>, + metrics: Metrics, + increment: u64, + mut capture_rate: f64, + ) -> Self { + let rng = Rand32::new_inc(0, increment); + + if capture_rate < 0.0 { + capture_rate = 0.0; + } else if capture_rate > 1.0 { + capture_rate = 1.0; + } + let threshold = (capture_rate * u32::MAX as f64) as u32; + + OverseerSubsystemContext { rx, tx, metrics, rng, threshold } } /// Create a new `OverseserSubsystemContext` with no metering. @@ -345,23 +364,11 @@ impl OverseerSubsystemContext { #[allow(unused)] fn new_unmetered(rx: mpsc::Receiver>, tx: mpsc::Sender>) -> Self { let metrics = Metrics::default(); - OverseerSubsystemContext::new(rx, tx, metrics, 0) - } - - fn should_capture(&mut self) -> bool { - if self.capture_each == 0 { - false - } else { - self.capture_state += 1; - if self.capture_state >= self.capture_each { - self.capture_state = 0; - } - self.capture_state == 0 - } + OverseerSubsystemContext::new(rx, tx, metrics, 0, 0.0) } fn maybe_timed(&mut self, t: T) -> MaybeTimed { - let timer = if self.should_capture() { + let timer = if self.rng.rand_u32() <= self.threshold { self.metrics.time_message_hold() } else { None @@ -375,24 +382,17 @@ impl OverseerSubsystemContext { /// /// This is somewhat more expensive than `self.maybe_timed` because it must clone some stuff. fn make_maybe_timed(&mut self) -> impl FnMut(T) -> MaybeTimed { + // We don't want to simply clone this RNG because we don't want to duplicate its state. + // It's not ever going to be used for cryptographic purposes, but it's still better to + // keep good habits. + let (seed, increment) = self.rng.state(); + let mut rng = Rand32::new_inc(seed, increment + 1); + let metrics = self.metrics.clone(); - let capture_each = self.capture_each; - let mut capture_state = self.capture_state; + let threshold = self.threshold; move |t| { - let mut should_capture = move || { - if capture_each == 0 { - false - } else { - capture_state += 1; - if capture_state >= capture_each { - capture_state = 0; - } - capture_state == 0 - } - }; - - let timer = if should_capture() { + let timer = if rng.rand_u32() <= threshold { metrics.time_message_hold() } else { None @@ -1292,12 +1292,15 @@ where let mut running_subsystems_rx = StreamUnordered::new(); let mut running_subsystems = FuturesUnordered::new(); + let mut seed = 0x533d; // arbitrary + let candidate_validation_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, all_subsystems.candidate_validation, &metrics, + &mut seed, )?; let candidate_backing_subsystem = spawn( @@ -1306,6 +1309,7 @@ where &mut running_subsystems_rx, all_subsystems.candidate_backing, &metrics, + &mut seed, )?; let candidate_selection_subsystem = spawn( @@ -1314,6 +1318,7 @@ where &mut running_subsystems_rx, all_subsystems.candidate_selection, &metrics, + &mut seed, )?; let statement_distribution_subsystem = spawn( @@ -1322,6 +1327,7 @@ where &mut running_subsystems_rx, all_subsystems.statement_distribution, &metrics, + &mut seed, )?; let availability_distribution_subsystem = spawn( @@ -1330,6 +1336,7 @@ where &mut running_subsystems_rx, all_subsystems.availability_distribution, &metrics, + &mut seed, )?; let bitfield_signing_subsystem = spawn( @@ -1338,6 +1345,7 @@ where &mut running_subsystems_rx, all_subsystems.bitfield_signing, &metrics, + &mut seed, )?; let bitfield_distribution_subsystem = spawn( @@ -1346,6 +1354,7 @@ where &mut running_subsystems_rx, all_subsystems.bitfield_distribution, &metrics, + &mut seed, )?; let provisioner_subsystem = spawn( @@ -1354,6 +1363,7 @@ where &mut running_subsystems_rx, all_subsystems.provisioner, &metrics, + &mut seed, )?; let pov_distribution_subsystem = spawn( @@ -1362,6 +1372,7 @@ where &mut running_subsystems_rx, all_subsystems.pov_distribution, &metrics, + &mut seed, )?; let runtime_api_subsystem = spawn( @@ -1370,6 +1381,7 @@ where &mut running_subsystems_rx, all_subsystems.runtime_api, &metrics, + &mut seed, )?; let availability_store_subsystem = spawn( @@ -1378,6 +1390,7 @@ where &mut running_subsystems_rx, all_subsystems.availability_store, &metrics, + &mut seed, )?; let network_bridge_subsystem = spawn( @@ -1386,6 +1399,7 @@ where &mut running_subsystems_rx, all_subsystems.network_bridge, &metrics, + &mut seed, )?; let chain_api_subsystem = spawn( @@ -1394,6 +1408,7 @@ where &mut running_subsystems_rx, all_subsystems.chain_api, &metrics, + &mut seed, )?; let collation_generation_subsystem = spawn( @@ -1402,6 +1417,7 @@ where &mut running_subsystems_rx, all_subsystems.collation_generation, &metrics, + &mut seed, )?; @@ -1411,6 +1427,7 @@ where &mut running_subsystems_rx, all_subsystems.collator_protocol, &metrics, + &mut seed, )?; let leaves = leaves @@ -1748,12 +1765,16 @@ fn spawn( streams: &mut StreamUnordered>>, s: impl Subsystem>, metrics: &Metrics, + seed: &mut u64, ) -> SubsystemResult> { let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); - let ctx = OverseerSubsystemContext::new(to_rx, from_tx, metrics.clone(), MESSAGE_TIMER_METRIC_CAPTURE_RATE); + let ctx = OverseerSubsystemContext::new(to_rx, from_tx, metrics.clone(), *seed, MESSAGE_TIMER_METRIC_CAPTURE_RATE); let SpawnedSubsystem { future, name } = s.start(ctx); + // increment the seed now that it's been used, so the next context will have its own distinct RNG + *seed += 1; + let (tx, rx) = oneshot::channel(); let fut = Box::pin(async move {