Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
## 0.50.0
- Prevent mesh exceeding mesh_n_high.
See [PR 6184](https://github.com/libp2p/rust-libp2p/pull/6184)

- Fix underflow when shuffling peers after prunning.
See [PR 6183](https://github.com/libp2p/rust-libp2p/pull/6183)

Expand Down
9 changes: 2 additions & 7 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,8 +1366,6 @@ where
tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
return;
};
// Needs to be here to comply with the borrow checker.
let is_outbound = connected_peer.outbound;

// For each topic, if a peer has grafted us, then we necessarily must be in their mesh
// and they must be subscribed to the topic. Ensure we have recorded the mapping.
Expand Down Expand Up @@ -1419,8 +1417,6 @@ where
peer_score.add_penalty(peer_id, 1);

// check the flood cutoff
// See: https://github.com/rust-lang/rust-clippy/issues/10061
#[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
let flood_cutoff = (backoff_time
+ self.config.graft_flood_threshold())
- self.config.prune_backoff();
Expand Down Expand Up @@ -1455,10 +1451,9 @@ where
}

// check mesh upper bound and only allow graft if the upper bound is not reached
// or if it is an outbound peer
let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash);

if peers.len() >= mesh_n_high && !is_outbound {
if peers.len() >= mesh_n_high {
to_prune_topics.insert(topic_hash.clone());
continue;
}
Expand Down Expand Up @@ -2208,7 +2203,7 @@ where
}

// too many peers - remove some
if peers.len() > mesh_n_high {
if peers.len() >= mesh_n_high {
tracing::debug!(
topic=%topic_hash,
"HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}",
Expand Down
178 changes: 80 additions & 98 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2340,59 +2340,23 @@ fn test_gossip_to_at_most_gossip_factor_peers() {
);
}

#[test]
fn test_accept_only_outbound_peer_grafts_when_mesh_full() {
let config: Config = Config::default();

// enough peers to fill the mesh
let (mut gs, peers, _, topics) = inject_nodes1()
.peer_no(config.mesh_n_high())
.topics(vec!["test".into()])
.to_subscribe(true)
.create_network();

// graft all the peers => this will fill the mesh
for peer in peers {
gs.handle_graft(&peer, topics.clone());
}

// assert current mesh size
assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high());

// create an outbound and an inbound peer
let (inbound, _in_queue) = add_peer(&mut gs, &topics, false, false);
let (outbound, _out_queue) = add_peer(&mut gs, &topics, true, false);

// send grafts
gs.handle_graft(&inbound, vec![topics[0].clone()]);
gs.handle_graft(&outbound, vec![topics[0].clone()]);

// assert mesh size
assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high() + 1);

// inbound is not in mesh
assert!(!gs.mesh[&topics[0]].contains(&inbound));

// outbound is in mesh
assert!(gs.mesh[&topics[0]].contains(&outbound));
}

#[test]
fn test_do_not_remove_too_many_outbound_peers() {
// use an extreme case to catch errors with high probability
let m = 50;
let n = 2 * m;
let mesh_n = 50;
let mesh_n_high = 2 * mesh_n;
let config = ConfigBuilder::default()
.mesh_n_high(n)
.mesh_n(n)
.mesh_n_low(n)
.mesh_outbound_min(m)
.mesh_n_high(mesh_n_high)
.mesh_n(mesh_n)
// Irrelevant for this test.
.mesh_n_low(mesh_n)
.mesh_outbound_min(mesh_n)
.build()
.unwrap();

// fill the mesh with inbound connections
let (mut gs, peers, _queues, topics) = inject_nodes1()
.peer_no(n)
.peer_no(mesh_n)
.topics(vec!["test".into()])
.to_subscribe(true)
.gs_config(config)
Expand All @@ -2405,60 +2369,26 @@ fn test_do_not_remove_too_many_outbound_peers() {

// create m outbound connections and graft (we will accept the graft)
let mut outbound = HashSet::new();
for _ in 0..m {
// Go from 50 (mesh_n) to 100 (mesh_n_high) to trigger prunning.
for _ in 0..mesh_n {
let (peer, _) = add_peer(&mut gs, &topics, true, false);
outbound.insert(peer);
gs.handle_graft(&peer, topics.clone());
}

// mesh is overly full
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), n + m);
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), mesh_n_high);

// run a heartbeat
gs.heartbeat();

// Peers should be removed to reach n
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), n);
// Peers should be removed to reach `mesh_n`
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), mesh_n);

// all outbound peers are still in the mesh
assert!(outbound.iter().all(|p| gs.mesh[&topics[0]].contains(p)));
}

#[test]
fn test_add_outbound_peers_if_min_is_not_satisfied() {
let config: Config = Config::default();

// Fill full mesh with inbound peers
let (mut gs, peers, _, topics) = inject_nodes1()
.peer_no(config.mesh_n_high())
.topics(vec!["test".into()])
.to_subscribe(true)
.create_network();

// graft all the peers
for peer in peers {
gs.handle_graft(&peer, topics.clone());
}

// create config.mesh_outbound_min() many outbound connections without grafting
let mut peers = vec![];
for _ in 0..config.mesh_outbound_min() {
peers.push(add_peer(&mut gs, &topics, true, false));
}

// Nothing changed in the mesh yet
assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high());

// run a heartbeat
gs.heartbeat();

// The outbound peers got additionally added
assert_eq!(
gs.mesh[&topics[0]].len(),
config.mesh_n_high() + config.mesh_outbound_min()
);
}

#[test]
fn test_prune_negative_scored_peers() {
let config = Config::default();
Expand Down Expand Up @@ -3205,22 +3135,20 @@ fn test_keep_best_scoring_peers_on_oversubscription() {
.build()
.unwrap();

// build mesh with more peers than mesh can hold
let n = config.mesh_n_high() + 1;
let mesh_n_high = config.mesh_n_high();

let (mut gs, peers, _queues, topics) = inject_nodes1()
.peer_no(n)
.peer_no(mesh_n_high)
.topics(vec!["test".into()])
.to_subscribe(true)
.gs_config(config.clone())
.explicit(0)
.outbound(n)
.scoring(Some((
PeerScoreParams::default(),
PeerScoreThresholds::default(),
)))
.create_network();

// graft all, will be accepted since the are outbound
for peer in &peers {
gs.handle_graft(peer, topics.clone());
}
Expand All @@ -3232,7 +3160,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() {
gs.set_application_score(peer, index as f64);
}

assert_eq!(gs.mesh[&topics[0]].len(), n);
assert_eq!(gs.mesh[&topics[0]].len(), mesh_n_high);

// heartbeat to prune some peers
gs.heartbeat();
Expand All @@ -3241,7 +3169,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() {

// mesh contains retain_scores best peers
assert!(gs.mesh[&topics[0]].is_superset(
&peers[(n - config.retain_scores())..]
&peers[(mesh_n_high - config.retain_scores())..]
.iter()
.cloned()
.collect()
Expand Down Expand Up @@ -6118,10 +6046,13 @@ fn test_mesh_subtraction_with_topic_config() {
let topic = String::from("topic1");
let topic_hash = TopicHash::from_raw(topic.clone());

let mesh_n = 5;
let mesh_n_high = 7;

let topic_config = TopicMeshConfig {
mesh_n: 5,
mesh_n,
mesh_n_high,
mesh_n_low: 3,
mesh_n_high: 7,
mesh_outbound_min: 2,
};

Expand All @@ -6130,15 +6061,12 @@ fn test_mesh_subtraction_with_topic_config() {
.build()
.unwrap();

let peer_no = 12;

// make all outbound connections so grafting to all will be allowed
let (mut gs, peers, _, topics) = inject_nodes1()
.peer_no(peer_no)
.peer_no(mesh_n_high)
.topics(vec![topic])
.to_subscribe(true)
.gs_config(config.clone())
.outbound(peer_no)
.outbound(mesh_n_high)
.create_network();

// graft all peers
Expand All @@ -6148,7 +6076,7 @@ fn test_mesh_subtraction_with_topic_config() {

assert_eq!(
gs.mesh.get(&topics[0]).unwrap().len(),
peer_no,
mesh_n_high,
"Initially all peers should be in the mesh"
);

Expand All @@ -6163,6 +6091,60 @@ fn test_mesh_subtraction_with_topic_config() {
);
}

/// Tests that if a mesh reaches `mesh_n_high`,
/// but is only composed of outbound peers, it is not reduced to `mesh_n`.
#[test]
fn test_mesh_subtraction_with_topic_config_min_outbound() {
let topic = String::from("topic1");
let topic_hash = TopicHash::from_raw(topic.clone());

let mesh_n = 5;
let mesh_n_high = 7;

let topic_config = TopicMeshConfig {
mesh_n,
mesh_n_high,
mesh_n_low: 3,
mesh_outbound_min: 7,
};

let config = ConfigBuilder::default()
.set_topic_config(topic_hash.clone(), topic_config)
.build()
.unwrap();

let peer_no = 12;

// make all outbound connections.
let (mut gs, peers, _, topics) = inject_nodes1()
.peer_no(peer_no)
.topics(vec![topic])
.to_subscribe(true)
.gs_config(config.clone())
.outbound(peer_no)
.create_network();

// graft all peers
for peer in peers {
gs.handle_graft(&peer, topics.clone());
}

assert_eq!(
gs.mesh.get(&topics[0]).unwrap().len(),
mesh_n_high,
"Initially mesh should be {mesh_n_high}"
);

// run a heartbeat
gs.heartbeat();

assert_eq!(
gs.mesh.get(&topics[0]).unwrap().len(),
mesh_n_high,
"After heartbeat, mesh should still be {mesh_n_high} as these are all outbound peers"
);
}

/// Test behavior with multiple topics having different configs
#[test]
fn test_multiple_topics_with_different_configs() {
Expand Down