Skip to content

Commit fc8f1a4

Browse files
authored
Attempt to publish to at least mesh_n peers (#5357)
* Code improvements * Fix gossipsub tests * Merge latest unstable * Differentiate errors and better scoring * Attempt to publish to mesh_n peers
1 parent b961457 commit fc8f1a4

File tree

3 files changed

+37
-7
lines changed

3 files changed

+37
-7
lines changed

beacon_node/lighthouse_network/src/gossipsub/behaviour.rs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -635,9 +635,33 @@ where
635635
|| !self.score_below_threshold(p, |ts| ts.publish_threshold).0
636636
}));
637637
} else {
638-
match self.mesh.get(&raw_message.topic) {
638+
match self.mesh.get(&topic_hash) {
639639
// Mesh peers
640640
Some(mesh_peers) => {
641+
// We have a mesh set. We want to make sure to publish to at least `mesh_n`
642+
// peers (if possible).
643+
let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len());
644+
645+
if needed_extra_peers > 0 {
646+
// We don't have `mesh_n` peers in our mesh, we will randomly select extras
647+
// and publish to them.
648+
649+
// Get a random set of peers that are appropriate to send messages too.
650+
let peer_list = get_random_peers(
651+
&self.connected_peers,
652+
&topic_hash,
653+
needed_extra_peers,
654+
|peer| {
655+
!mesh_peers.contains(peer)
656+
&& !self.explicit_peers.contains(peer)
657+
&& !self
658+
.score_below_threshold(peer, |pst| pst.publish_threshold)
659+
.0
660+
},
661+
);
662+
recipient_peers.extend(peer_list);
663+
}
664+
641665
recipient_peers.extend(mesh_peers);
642666
}
643667
// Gossipsub peers
@@ -729,10 +753,14 @@ where
729753
}
730754
}
731755

732-
if publish_failed {
756+
if recipient_peers.is_empty() {
733757
return Err(PublishError::InsufficientPeers);
734758
}
735759

760+
if publish_failed {
761+
return Err(PublishError::AllQueuesFull(recipient_peers.len()));
762+
}
763+
736764
tracing::debug!(message=%msg_id, "Published message");
737765

738766
if let Some(metrics) = self.metrics.as_mut() {
@@ -2203,10 +2231,9 @@ where
22032231
if outbound <= self.config.mesh_outbound_min() {
22042232
// do not remove anymore outbound peers
22052233
continue;
2206-
} else {
2207-
// an outbound peer gets removed
2208-
outbound -= 1;
22092234
}
2235+
// an outbound peer gets removed
2236+
outbound -= 1;
22102237
}
22112238

22122239
// remove the peer

beacon_node/lighthouse_network/src/gossipsub/behaviour/tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -741,8 +741,8 @@ fn test_publish_without_flood_publishing() {
741741
let config: Config = Config::default();
742742
assert_eq!(
743743
publishes.len(),
744-
config.mesh_n_low(),
745-
"Should send a publish message to all known peers"
744+
config.mesh_n(),
745+
"Should send a publish message to at least mesh_n peers"
746746
);
747747

748748
assert!(

beacon_node/lighthouse_network/src/gossipsub/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ pub enum PublishError {
3636
MessageTooLarge,
3737
/// The compression algorithm failed.
3838
TransformFailed(std::io::Error),
39+
/// Messages could not be sent because all queues for peers were full. The usize represents the
40+
/// number of peers that have full queues.
41+
AllQueuesFull(usize),
3942
}
4043

4144
impl std::fmt::Display for PublishError {

0 commit comments

Comments
 (0)