diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index d8afe8341bb0..04517477a38c 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -312,7 +312,9 @@ impl PeerData { fingerprint: &(CompactStatement, ValidatorIndex), max_message_count: usize, ) -> Result { - self.view_knowledge.get_mut(relay_parent).ok_or(COST_UNEXPECTED_STATEMENT)? + self.view_knowledge + .get_mut(relay_parent) + .ok_or(COST_UNEXPECTED_STATEMENT)? .receive(fingerprint, max_message_count) } } @@ -492,13 +494,15 @@ fn check_statement_signature( .and_then(|v| statement.check_signature(&signing_context, v)) } +type StatementListeners = Vec>; + /// Informs all registered listeners about a newly received statement. /// /// Removes all closed listeners. #[tracing::instrument(level = "trace", skip(listeners), fields(subsystem = LOG_TARGET))] async fn inform_statement_listeners( statement: &SignedFullStatement, - listeners: &mut Vec>, + listeners: &mut StatementListeners, ) { // Ignore the errors since these will be removed later. stream::iter(listeners.iter_mut()).for_each_concurrent( @@ -524,36 +528,38 @@ async fn circulate_statement_and_dependents( statement: SignedFullStatement, metrics: &Metrics, ) { - if let Some(active_head)= active_heads.get_mut(&relay_parent) { - - // First circulate the statement directly to all peers needing it. - // The borrow of `active_head` needs to encompass only this (Rust) statement. - let outputs: Option<(CandidateHash, Vec)> = { - match active_head.note_statement(statement) { - NotedStatement::Fresh(stored) => Some(( - *stored.compact().candidate_hash(), - circulate_statement(peers, ctx, relay_parent, stored).await, - )), - _ => None, - } - }; + let active_head = match active_heads.get_mut(&relay_parent) { + Some(res) => res, + None => return, + }; - // Now send dependent statements to all peers needing them, if any. - if let Some((candidate_hash, peers_needing_dependents)) = outputs { - for peer in peers_needing_dependents { - if let Some(peer_data) = peers.get_mut(&peer) { - // defensive: the peer data should always be some because the iterator - // of peers is derived from the set of peers. - send_statements_about( - peer, - peer_data, - ctx, - relay_parent, - candidate_hash, - &*active_head, - metrics, - ).await; - } + // First circulate the statement directly to all peers needing it. + // The borrow of `active_head` needs to encompass only this (Rust) statement. + let outputs: Option<(CandidateHash, Vec)> = { + match active_head.note_statement(statement) { + NotedStatement::Fresh(stored) => Some(( + *stored.compact().candidate_hash(), + circulate_statement(peers, ctx, relay_parent, stored).await, + )), + _ => None, + } + }; + + // Now send dependent statements to all peers needing them, if any. + if let Some((candidate_hash, peers_needing_dependents)) = outputs { + for peer in peers_needing_dependents { + if let Some(peer_data) = peers.get_mut(&peer) { + // defensive: the peer data should always be some because the iterator + // of peers is derived from the set of peers. + send_statements_about( + peer, + peer_data, + ctx, + relay_parent, + candidate_hash, + &*active_head, + metrics, + ).await; } } } @@ -679,6 +685,7 @@ async fn handle_incoming_message<'a>( ctx: &mut impl SubsystemContext, message: protocol_v1::StatementDistributionMessage, metrics: &Metrics, + statement_listeners: &mut StatementListeners, ) -> Option<(Hash, &'a StoredStatement)> { let (relay_parent, statement) = match message { protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s), @@ -735,6 +742,8 @@ async fn handle_incoming_message<'a>( Ok(false) => {} } + inform_statement_listeners(&statement, statement_listeners).await; + // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation // or unpinned to a seconded candidate. So it is safe to place it into the storage. match active_head.note_statement(statement) { @@ -794,6 +803,7 @@ async fn handle_network_update( our_view: &mut View, update: NetworkBridgeEvent, metrics: &Metrics, + statement_listeners: &mut StatementListeners, ) { match update { NetworkBridgeEvent::PeerConnected(peer, _role) => { @@ -816,6 +826,7 @@ async fn handle_network_update( ctx, message, metrics, + statement_listeners, ).await; if let Some((relay_parent, new)) = new_stored { @@ -875,7 +886,7 @@ impl StatementDistribution { let mut peers: HashMap = HashMap::new(); let mut our_view = View::default(); let mut active_heads: HashMap = HashMap::new(); - let mut statement_listeners: Vec> = Vec::new(); + let mut statement_listeners = StatementListeners::new(); let metrics = self.metrics; loop { @@ -958,6 +969,7 @@ impl StatementDistribution { &mut our_view, event, &metrics, + &mut statement_listeners, ).await; } StatementDistributionMessage::RegisterStatementListener(tx) => {