Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
## 0.50.0
- Prevent mesh exceeding mesh_n_high when handling GRAFT messages.
See [PR 6184](https://github.com/libp2p/rust-libp2p/pull/6184)

- Fix underflow when shuffling peers after prunning.
See [PR 6183](https://github.com/libp2p/rust-libp2p/pull/6183)

Expand Down
9 changes: 2 additions & 7 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1366,8 +1366,6 @@ where
tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft");
return;
};
// Needs to be here to comply with the borrow checker.
let is_outbound = connected_peer.outbound;

// For each topic, if a peer has grafted us, then we necessarily must be in their mesh
// and they must be subscribed to the topic. Ensure we have recorded the mapping.
Expand Down Expand Up @@ -1419,8 +1417,6 @@ where
peer_score.add_penalty(peer_id, 1);

// check the flood cutoff
// See: https://github.com/rust-lang/rust-clippy/issues/10061
#[allow(unknown_lints, clippy::unchecked_duration_subtraction)]
let flood_cutoff = (backoff_time
+ self.config.graft_flood_threshold())
- self.config.prune_backoff();
Expand Down Expand Up @@ -1455,10 +1451,9 @@ where
}

// check mesh upper bound and only allow graft if the upper bound is not reached
// or if it is an outbound peer
let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash);

if peers.len() >= mesh_n_high && !is_outbound {
if peers.len() >= mesh_n_high {
to_prune_topics.insert(topic_hash.clone());
continue;
}
Expand Down Expand Up @@ -2208,7 +2203,7 @@ where
}

// too many peers - remove some
if peers.len() > mesh_n_high {
if peers.len() >= mesh_n_high {
tracing::debug!(
topic=%topic_hash,
"HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}",
Expand Down
176 changes: 78 additions & 98 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2340,59 +2340,23 @@ fn test_gossip_to_at_most_gossip_factor_peers() {
);
}

#[test]
fn test_accept_only_outbound_peer_grafts_when_mesh_full() {
let config: Config = Config::default();

// enough peers to fill the mesh
let (mut gs, peers, _, topics) = inject_nodes1()
.peer_no(config.mesh_n_high())
.topics(vec!["test".into()])
.to_subscribe(true)
.create_network();

// graft all the peers => this will fill the mesh
for peer in peers {
gs.handle_graft(&peer, topics.clone());
}

// assert current mesh size
assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high());

// create an outbound and an inbound peer
let (inbound, _in_queue) = add_peer(&mut gs, &topics, false, false);
let (outbound, _out_queue) = add_peer(&mut gs, &topics, true, false);

// send grafts
gs.handle_graft(&inbound, vec![topics[0].clone()]);
gs.handle_graft(&outbound, vec![topics[0].clone()]);

// assert mesh size
assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high() + 1);

// inbound is not in mesh
assert!(!gs.mesh[&topics[0]].contains(&inbound));

// outbound is in mesh
assert!(gs.mesh[&topics[0]].contains(&outbound));
}

#[test]
fn test_do_not_remove_too_many_outbound_peers() {
// use an extreme case to catch errors with high probability
let m = 50;
let n = 2 * m;
let mesh_n = 50;
let mesh_n_high = 2 * mesh_n;
let config = ConfigBuilder::default()
.mesh_n_high(n)
.mesh_n(n)
.mesh_n_low(n)
.mesh_outbound_min(m)
.mesh_n_high(mesh_n_high)
.mesh_n(mesh_n)
// Irrelevant for this test.
.mesh_n_low(mesh_n)
.mesh_outbound_min(mesh_n)
.build()
.unwrap();

// fill the mesh with inbound connections
let (mut gs, peers, _queues, topics) = inject_nodes1()
.peer_no(n)
.peer_no(mesh_n)
.topics(vec!["test".into()])
.to_subscribe(true)
.gs_config(config)
Expand All @@ -2405,60 +2369,26 @@ fn test_do_not_remove_too_many_outbound_peers() {

// create m outbound connections and graft (we will accept the graft)
let mut outbound = HashSet::new();
for _ in 0..m {
// Go from 50 (mesh_n) to 100 (mesh_n_high) to trigger prunning.
for _ in 0..mesh_n {
let (peer, _) = add_peer(&mut gs, &topics, true, false);
outbound.insert(peer);
gs.handle_graft(&peer, topics.clone());
}

// mesh is overly full
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), n + m);
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), mesh_n_high);

// run a heartbeat
gs.heartbeat();

// Peers should be removed to reach n
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), n);
// Peers should be removed to reach `mesh_n`
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), mesh_n);

// all outbound peers are still in the mesh
assert!(outbound.iter().all(|p| gs.mesh[&topics[0]].contains(p)));
}

#[test]
fn test_add_outbound_peers_if_min_is_not_satisfied() {
let config: Config = Config::default();

// Fill full mesh with inbound peers
let (mut gs, peers, _, topics) = inject_nodes1()
.peer_no(config.mesh_n_high())
.topics(vec!["test".into()])
.to_subscribe(true)
.create_network();

// graft all the peers
for peer in peers {
gs.handle_graft(&peer, topics.clone());
}

// create config.mesh_outbound_min() many outbound connections without grafting
let mut peers = vec![];
for _ in 0..config.mesh_outbound_min() {
peers.push(add_peer(&mut gs, &topics, true, false));
}

// Nothing changed in the mesh yet
assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high());

// run a heartbeat
gs.heartbeat();

// The outbound peers got additionally added
assert_eq!(
gs.mesh[&topics[0]].len(),
config.mesh_n_high() + config.mesh_outbound_min()
);
}

#[test]
fn test_prune_negative_scored_peers() {
let config = Config::default();
Expand Down Expand Up @@ -3205,22 +3135,20 @@ fn test_keep_best_scoring_peers_on_oversubscription() {
.build()
.unwrap();

// build mesh with more peers than mesh can hold
let n = config.mesh_n_high() + 1;
let mesh_n_high = config.mesh_n_high();

let (mut gs, peers, _queues, topics) = inject_nodes1()
.peer_no(n)
.peer_no(mesh_n_high)
.topics(vec!["test".into()])
.to_subscribe(true)
.gs_config(config.clone())
.explicit(0)
.outbound(n)
.scoring(Some((
PeerScoreParams::default(),
PeerScoreThresholds::default(),
)))
.create_network();

// graft all, will be accepted since the are outbound
for peer in &peers {
gs.handle_graft(peer, topics.clone());
}
Expand All @@ -3232,7 +3160,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() {
gs.set_application_score(peer, index as f64);
}

assert_eq!(gs.mesh[&topics[0]].len(), n);
assert_eq!(gs.mesh[&topics[0]].len(), mesh_n_high);

// heartbeat to prune some peers
gs.heartbeat();
Expand All @@ -3241,7 +3169,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() {

// mesh contains retain_scores best peers
assert!(gs.mesh[&topics[0]].is_superset(
&peers[(n - config.retain_scores())..]
&peers[(mesh_n_high - config.retain_scores())..]
.iter()
.cloned()
.collect()
Expand Down Expand Up @@ -6118,10 +6046,13 @@ fn test_mesh_subtraction_with_topic_config() {
let topic = String::from("topic1");
let topic_hash = TopicHash::from_raw(topic.clone());

let mesh_n = 5;
let mesh_n_high = 7;

let topic_config = TopicMeshConfig {
mesh_n: 5,
mesh_n,
mesh_n_high,
mesh_n_low: 3,
mesh_n_high: 7,
mesh_outbound_min: 2,
};

Expand All @@ -6130,15 +6061,12 @@ fn test_mesh_subtraction_with_topic_config() {
.build()
.unwrap();

let peer_no = 12;

// make all outbound connections so grafting to all will be allowed
let (mut gs, peers, _, topics) = inject_nodes1()
.peer_no(peer_no)
.peer_no(mesh_n_high)
.topics(vec![topic])
.to_subscribe(true)
.gs_config(config.clone())
.outbound(peer_no)
.outbound(mesh_n_high)
.create_network();

// graft all peers
Expand All @@ -6148,7 +6076,7 @@ fn test_mesh_subtraction_with_topic_config() {

assert_eq!(
gs.mesh.get(&topics[0]).unwrap().len(),
peer_no,
mesh_n_high,
"Initially all peers should be in the mesh"
);

Expand All @@ -6163,6 +6091,58 @@ fn test_mesh_subtraction_with_topic_config() {
);
}

#[test]
fn test_mesh_subtraction_with_topic_config_min_outbound() {
let topic = String::from("topic1");
let topic_hash = TopicHash::from_raw(topic.clone());

let mesh_n = 5;
let mesh_n_high = 7;

let topic_config = TopicMeshConfig {
mesh_n,
mesh_n_high,
mesh_n_low: 3,
mesh_outbound_min: 7,
};

let config = ConfigBuilder::default()
.set_topic_config(topic_hash.clone(), topic_config)
.build()
.unwrap();

let peer_no = 12;

// make all outbound connections.
let (mut gs, peers, _, topics) = inject_nodes1()
.peer_no(peer_no)
.topics(vec![topic])
.to_subscribe(true)
.gs_config(config.clone())
.outbound(peer_no)
.create_network();

// graft all peers
for peer in peers {
gs.handle_graft(&peer, topics.clone());
}

assert_eq!(
gs.mesh.get(&topics[0]).unwrap().len(),
mesh_n_high,
"Initially mesh should be {mesh_n_high}"
);

// run a heartbeat
gs.heartbeat();

assert_eq!(
gs.mesh.get(&topics[0]).unwrap().len(),
mesh_n_high,
"After heartbeat, mesh should still be {mesh_n_high} as these are all outbound peers"
);
}

/// Test behavior with multiple topics having different configs
#[test]
fn test_multiple_topics_with_different_configs() {
Expand Down
Loading