diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 76d967dc9ccd..661ef79b0636 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -68,7 +68,7 @@ use std::collections::{hash_map, HashMap}; use futures::channel::{mpsc, oneshot}; use futures::{ - pending, poll, select, + poll, select, future::BoxFuture, stream::{self, FuturesUnordered}, Future, FutureExt, SinkExt, StreamExt, @@ -384,6 +384,30 @@ struct OverseenSubsystem { instance: Option>, } +impl OverseenSubsystem { + /// Send a message to the wrapped subsystem. + /// + /// If the inner `instance` is `None`, nothing is happening. + async fn send_message(&mut self, msg: M) -> SubsystemResult<()> { + if let Some(ref mut instance) = self.instance { + instance.tx.send(FromOverseer::Communication { msg }).await?; + } + + Ok(()) + } + + /// Send a signal to the wrapped subsystem. + /// + /// If the inner `instance` is `None`, nothing is happening. + async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { + if let Some(ref mut instance) = self.instance { + instance.tx.send(FromOverseer::Signal(signal)).await?; + } + + Ok(()) + } +} + /// The `Overseer` itself. pub struct Overseer { /// A candidate validation subsystem. @@ -1240,65 +1264,21 @@ where // Stop the overseer. async fn stop(mut self) { - if let Some(ref mut s) = self.candidate_validation_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.candidate_backing_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.candidate_selection_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.statement_distribution_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.availability_distribution_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.bitfield_signing_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.provisioner_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.pov_distribution_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.runtime_api_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.availability_store_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.network_bridge_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.chain_api_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.collator_protocol_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } - - if let Some(ref mut s) = self.collation_generation_subsystem.instance { - let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; - } + let _ = self.candidate_validation_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.candidate_backing_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.candidate_selection_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.statement_distribution_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.availability_distribution_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.bitfield_signing_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.bitfield_distribution_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.provisioner_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.pov_distribution_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.runtime_api_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.availability_store_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.network_bridge_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.chain_api_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.collator_protocol_subsystem.send_signal(OverseerSignal::Conclude).await; + let _ = self.collation_generation_subsystem.send_signal(OverseerSignal::Conclude).await; let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse(); @@ -1318,10 +1298,9 @@ where /// Run the `Overseer`. #[tracing::instrument(skip(self), fields(subsystem = LOG_TARGET))] pub async fn run(mut self) -> SubsystemResult<()> { - let leaves = std::mem::take(&mut self.leaves); let mut update = ActiveLeavesUpdate::default(); - for (hash, number) in leaves.into_iter() { + for (hash, number) in std::mem::take(&mut self.leaves) { update.activated.push(hash); let _ = self.active_leaves.insert(hash, number); self.on_head_activated(&hash); @@ -1330,50 +1309,62 @@ where self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; loop { - while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) { - match msg { - Event::MsgToSubsystem(msg) => { - self.route_message(msg).await; - } - Event::Stop => { - self.stop().await; - return Ok(()); - } - Event::BlockImported(block) => { - self.block_imported(block).await?; - } - Event::BlockFinalized(block) => { - self.block_finalized(block).await?; - } - Event::ExternalRequest(request) => { - self.handle_external_request(request); - } - } - } - - while let Poll::Ready(Some((StreamYield::Item(msg), _))) = poll!( - &mut self.running_subsystems_rx.next() - ) { - match msg { - ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await, - ToOverseer::SpawnJob { name, s } => { - self.spawn_job(name, s); + select! { + msg = self.events_rx.next().fuse() => { + let msg = if let Some(msg) = msg { + msg + } else { + continue + }; + + match msg { + Event::MsgToSubsystem(msg) => { + self.route_message(msg).await; + } + Event::Stop => { + self.stop().await; + return Ok(()); + } + Event::BlockImported(block) => { + self.block_imported(block).await?; + } + Event::BlockFinalized(block) => { + self.block_finalized(block).await?; + } + Event::ExternalRequest(request) => { + self.handle_external_request(request); + } } - ToOverseer::SpawnBlockingJob { name, s } => { - self.spawn_blocking_job(name, s); + }, + msg = self.running_subsystems_rx.next().fuse() => { + let msg = if let Some((StreamYield::Item(msg), _)) = msg { + msg + } else { + continue + }; + + match msg { + ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await, + ToOverseer::SpawnJob { name, s } => { + self.spawn_job(name, s); + } + ToOverseer::SpawnBlockingJob { name, s } => { + self.spawn_blocking_job(name, s); + } } - } - } - - // Some subsystem exited? It's time to panic. - if let Poll::Ready(Some(finished)) = poll!(self.running_subsystems.next()) { - tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly"); - self.stop().await; - return finished; + }, + res = self.running_subsystems.next().fuse() => { + let finished = if let Some(finished) = res { + finished + } else { + continue + }; + + tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly"); + self.stop().await; + return finished; + }, } - - // Looks like nothing is left to be polled, let's take a break. - pending!(); } } @@ -1424,7 +1415,11 @@ where self.on_head_deactivated(deactivated) } - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + // Most of the time we have a leave already closed when it is finalized, so we check here if there are actually + // any updates before sending it to the subsystems. + if !update.is_empty() { + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; + } self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash)).await?; @@ -1433,65 +1428,21 @@ where #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { - if let Some(ref mut s) = self.candidate_validation_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.candidate_backing_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.candidate_selection_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.statement_distribution_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.availability_distribution_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.bitfield_signing_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.provisioner_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.pov_distribution_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.runtime_api_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.availability_store_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.network_bridge_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.chain_api_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.collator_protocol_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } - - if let Some(ref mut s) = self.collation_generation_subsystem.instance { - s.tx.send(FromOverseer::Signal(signal.clone())).await?; - } + self.candidate_validation_subsystem.send_signal(signal.clone()).await?; + self.candidate_backing_subsystem.send_signal(signal.clone()).await?; + self.candidate_selection_subsystem.send_signal(signal.clone()).await?; + self.statement_distribution_subsystem.send_signal(signal.clone()).await?; + self.availability_distribution_subsystem.send_signal(signal.clone()).await?; + self.bitfield_signing_subsystem.send_signal(signal.clone()).await?; + self.bitfield_distribution_subsystem.send_signal(signal.clone()).await?; + self.provisioner_subsystem.send_signal(signal.clone()).await?; + self.pov_distribution_subsystem.send_signal(signal.clone()).await?; + self.runtime_api_subsystem.send_signal(signal.clone()).await?; + self.availability_store_subsystem.send_signal(signal.clone()).await?; + self.network_bridge_subsystem.send_signal(signal.clone()).await?; + self.chain_api_subsystem.send_signal(signal.clone()).await?; + self.collator_protocol_subsystem.send_signal(signal.clone()).await?; + self.collation_generation_subsystem.send_signal(signal).await?; Ok(()) } @@ -1501,80 +1452,50 @@ where self.metrics.on_message_relayed(); match msg { AllMessages::CandidateValidation(msg) => { - if let Some(ref mut s) = self.candidate_validation_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.candidate_validation_subsystem.send_message(msg).await; + }, AllMessages::CandidateBacking(msg) => { - if let Some(ref mut s) = self.candidate_backing_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.candidate_backing_subsystem.send_message(msg).await; + }, AllMessages::CandidateSelection(msg) => { - if let Some(ref mut s) = self.candidate_selection_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.candidate_selection_subsystem.send_message(msg).await; + }, AllMessages::StatementDistribution(msg) => { - if let Some(ref mut s) = self.statement_distribution_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.statement_distribution_subsystem.send_message(msg).await; + }, AllMessages::AvailabilityDistribution(msg) => { - if let Some(ref mut s) = self.availability_distribution_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.availability_distribution_subsystem.send_message(msg).await; + }, AllMessages::BitfieldDistribution(msg) => { - if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.bitfield_distribution_subsystem.send_message(msg).await; + }, AllMessages::BitfieldSigning(msg) => { - if let Some(ref mut s) = self.bitfield_signing_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication{ msg }).await; - } - } + let _ = self.bitfield_signing_subsystem.send_message(msg).await; + }, AllMessages::Provisioner(msg) => { - if let Some(ref mut s) = self.provisioner_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.provisioner_subsystem.send_message(msg).await; + }, AllMessages::PoVDistribution(msg) => { - if let Some(ref mut s) = self.pov_distribution_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.pov_distribution_subsystem.send_message(msg).await; + }, AllMessages::RuntimeApi(msg) => { - if let Some(ref mut s) = self.runtime_api_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.runtime_api_subsystem.send_message(msg).await; + }, AllMessages::AvailabilityStore(msg) => { - if let Some(ref mut s) = self.availability_store_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.availability_store_subsystem.send_message(msg).await; + }, AllMessages::NetworkBridge(msg) => { - if let Some(ref mut s) = self.network_bridge_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.network_bridge_subsystem.send_message(msg).await; + }, AllMessages::ChainApi(msg) => { - if let Some(ref mut s) = self.chain_api_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.chain_api_subsystem.send_message(msg).await; + }, AllMessages::CollationGeneration(msg) => { - if let Some(ref mut s) = self.collation_generation_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.collation_generation_subsystem.send_message(msg).await; + }, AllMessages::CollatorProtocol(msg) => { - if let Some(ref mut s) = self.collator_protocol_subsystem.instance { - let _ = s.tx.send(FromOverseer::Communication { msg }).await; - } - } + let _ = self.collator_protocol_subsystem.send_message(msg).await; + }, } } @@ -1671,7 +1592,7 @@ fn spawn( mod tests { use std::sync::atomic; use std::collections::HashMap; - use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; + use futures::{executor, pin_mut, select, channel::mpsc, FutureExt, pending}; use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash}; use polkadot_subsystem::messages::RuntimeApiRequest; diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index 57f62649f14c..143c35d2c07c 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -58,12 +58,17 @@ pub struct ActiveLeavesUpdate { impl ActiveLeavesUpdate { /// Create a ActiveLeavesUpdate with a single activated hash pub fn start_work(hash: Hash) -> Self { - Self { activated: [hash].as_ref().into(), ..Default::default() } + Self { activated: [hash][..].into(), ..Default::default() } } /// Create a ActiveLeavesUpdate with a single deactivated hash pub fn stop_work(hash: Hash) -> Self { - Self { deactivated: [hash].as_ref().into(), ..Default::default() } + Self { deactivated: [hash][..].into(), ..Default::default() } + } + + /// Is this update empty and doesn't contain any information? + pub fn is_empty(&self) -> bool { + self.activated.is_empty() && self.deactivated.is_empty() } } @@ -72,9 +77,9 @@ impl PartialEq for ActiveLeavesUpdate { /// /// Instead, it means equality when `activated` and `deactivated` are considered as sets. fn eq(&self, other: &Self) -> bool { - use std::collections::HashSet; - self.activated.iter().collect::>() == other.activated.iter().collect::>() && - self.deactivated.iter().collect::>() == other.deactivated.iter().collect::>() + self.activated.len() == other.activated.len() && self.deactivated.len() == other.deactivated.len() + && self.activated.iter().all(|a| other.activated.contains(a)) + && self.deactivated.iter().all(|a| other.deactivated.contains(a)) } }