diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index cbdc8e5845a0..d997f5a9005a 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -36,7 +36,7 @@ use polkadot_node_network_protocol::{ }; use futures::prelude::*; -use futures::channel::oneshot; +use futures::channel::{mpsc, oneshot}; use indexmap::IndexSet; use std::collections::{HashMap, HashSet}; @@ -476,6 +476,24 @@ fn check_statement_signature( .and_then(|v| statement.check_signature(&signing_context, v)) } +/// Informs all registered listeners about a newly received statement. +/// +/// Removes all closed listeners. +async fn inform_statement_listeners( + statement: &SignedFullStatement, + listeners: &mut Vec>, +) { + // Ignore the errors since these will be removed later. + stream::iter(listeners.iter_mut()).for_each_concurrent( + None, + |listener| async move { + let _ = listener.send(statement.clone()).await; + } + ).await; + // Remove any closed listeners. + listeners.retain(|tx| !tx.is_closed()); +} + /// Places the statement in storage if it is new, and then /// circulates the statement to all peers who have not seen it yet, and /// sends all statements dependent on that statement to peers who could previously not receive @@ -821,6 +839,7 @@ async fn run( let mut peers: HashMap = HashMap::new(); let mut our_view = View::default(); let mut active_heads: HashMap = HashMap::new(); + let mut statement_listeners: Vec> = Vec::new(); loop { let message = ctx.recv().await?; @@ -874,14 +893,19 @@ async fn run( } FromOverseer::Signal(OverseerSignal::Conclude) => break, FromOverseer::Communication { msg } => match msg { - StatementDistributionMessage::Share(relay_parent, statement) => + StatementDistributionMessage::Share(relay_parent, statement) => { + inform_statement_listeners( + &statement, + &mut statement_listeners, + ).await; circulate_statement_and_dependents( &mut peers, &mut active_heads, &mut ctx, relay_parent, statement, - ).await?, + ).await?; + } StatementDistributionMessage::NetworkBridgeUpdateV1(event) => handle_network_update( &mut peers, @@ -890,6 +914,9 @@ async fn run( &mut our_view, event, ).await?, + StatementDistributionMessage::RegisterStatementListener(tx) => { + statement_listeners.push(tx); + } } } } diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index ad8b5a3bb11d..9db4e7e33c91 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -415,6 +415,8 @@ pub enum StatementDistributionMessage { Share(Hash, SignedFullStatement), /// Event from the network bridge. NetworkBridgeUpdateV1(NetworkBridgeEvent), + /// Register a listener for shared statements. + RegisterStatementListener(mpsc::Sender), } impl StatementDistributionMessage { @@ -423,6 +425,7 @@ impl StatementDistributionMessage { match self { Self::Share(hash, _) => Some(*hash), Self::NetworkBridgeUpdateV1(_) => None, + Self::RegisterStatementListener(_) => None, } } } diff --git a/roadmap/implementers-guide/src/types/overseer-protocol.md b/roadmap/implementers-guide/src/types/overseer-protocol.md index 3af9d0cac4fd..9d92e326a254 100644 --- a/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -362,6 +362,8 @@ enum StatementDistributionMessage { /// The statement distribution subsystem assumes that the statement should be correctly /// signed. Share(Hash, SignedFullStatement), + /// Register a listener to be notified on any new statements. + RegisterStatementListener(ResponseChannel), } ```