Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 44 additions & 32 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ impl PeerData {
fingerprint: &(CompactStatement, ValidatorIndex),
max_message_count: usize,
) -> Result<bool, Rep> {
self.view_knowledge.get_mut(relay_parent).ok_or(COST_UNEXPECTED_STATEMENT)?
self.view_knowledge
.get_mut(relay_parent)
.ok_or(COST_UNEXPECTED_STATEMENT)?
.receive(fingerprint, max_message_count)
}
}
Expand Down Expand Up @@ -492,13 +494,15 @@ fn check_statement_signature(
.and_then(|v| statement.check_signature(&signing_context, v))
}

type StatementListeners = Vec<mpsc::Sender<SignedFullStatement>>;

/// Informs all registered listeners about a newly received statement.
///
/// Removes all closed listeners.
#[tracing::instrument(level = "trace", skip(listeners), fields(subsystem = LOG_TARGET))]
async fn inform_statement_listeners(
statement: &SignedFullStatement,
listeners: &mut Vec<mpsc::Sender<SignedFullStatement>>,
listeners: &mut StatementListeners,
) {
// Ignore the errors since these will be removed later.
stream::iter(listeners.iter_mut()).for_each_concurrent(
Expand All @@ -524,36 +528,38 @@ async fn circulate_statement_and_dependents(
statement: SignedFullStatement,
metrics: &Metrics,
) {
if let Some(active_head)= active_heads.get_mut(&relay_parent) {

// First circulate the statement directly to all peers needing it.
// The borrow of `active_head` needs to encompass only this (Rust) statement.
let outputs: Option<(CandidateHash, Vec<PeerId>)> = {
match active_head.note_statement(statement) {
NotedStatement::Fresh(stored) => Some((
*stored.compact().candidate_hash(),
circulate_statement(peers, ctx, relay_parent, stored).await,
)),
_ => None,
}
};
let active_head = match active_heads.get_mut(&relay_parent) {
Some(res) => res,
None => return,
};

// Now send dependent statements to all peers needing them, if any.
if let Some((candidate_hash, peers_needing_dependents)) = outputs {
for peer in peers_needing_dependents {
if let Some(peer_data) = peers.get_mut(&peer) {
// defensive: the peer data should always be some because the iterator
// of peers is derived from the set of peers.
send_statements_about(
peer,
peer_data,
ctx,
relay_parent,
candidate_hash,
&*active_head,
metrics,
).await;
}
// First circulate the statement directly to all peers needing it.
// The borrow of `active_head` needs to encompass only this (Rust) statement.
let outputs: Option<(CandidateHash, Vec<PeerId>)> = {
match active_head.note_statement(statement) {
NotedStatement::Fresh(stored) => Some((
*stored.compact().candidate_hash(),
circulate_statement(peers, ctx, relay_parent, stored).await,
)),
_ => None,
}
};

// Now send dependent statements to all peers needing them, if any.
if let Some((candidate_hash, peers_needing_dependents)) = outputs {
for peer in peers_needing_dependents {
if let Some(peer_data) = peers.get_mut(&peer) {
// defensive: the peer data should always be some because the iterator
// of peers is derived from the set of peers.
send_statements_about(
peer,
peer_data,
ctx,
relay_parent,
candidate_hash,
&*active_head,
metrics,
).await;
}
}
}
Expand Down Expand Up @@ -679,6 +685,7 @@ async fn handle_incoming_message<'a>(
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
message: protocol_v1::StatementDistributionMessage,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
) -> Option<(Hash, &'a StoredStatement)> {
let (relay_parent, statement) = match message {
protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s),
Expand Down Expand Up @@ -735,6 +742,8 @@ async fn handle_incoming_message<'a>(
Ok(false) => {}
}

inform_statement_listeners(&statement, statement_listeners).await;

// Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation
// or unpinned to a seconded candidate. So it is safe to place it into the storage.
match active_head.note_statement(statement) {
Expand Down Expand Up @@ -794,6 +803,7 @@ async fn handle_network_update(
our_view: &mut View,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, _role) => {
Expand All @@ -816,6 +826,7 @@ async fn handle_network_update(
ctx,
message,
metrics,
statement_listeners,
).await;

if let Some((relay_parent, new)) = new_stored {
Expand Down Expand Up @@ -875,7 +886,7 @@ impl StatementDistribution {
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut our_view = View::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut statement_listeners: Vec<mpsc::Sender<SignedFullStatement>> = Vec::new();
let mut statement_listeners = StatementListeners::new();
let metrics = self.metrics;

loop {
Expand Down Expand Up @@ -958,6 +969,7 @@ impl StatementDistribution {
&mut our_view,
event,
&metrics,
&mut statement_listeners,
).await;
}
StatementDistributionMessage::RegisterStatementListener(tx) => {
Expand Down