diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 940030a632b..b143b72c15c 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,4 +1,7 @@ ## 0.50.0 +- Prevent mesh exceeding mesh_n_high. + See [PR 6184](https://github.com/libp2p/rust-libp2p/pull/6184) + - Fix underflow when shuffling peers after prunning. See [PR 6183](https://github.com/libp2p/rust-libp2p/pull/6183) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 1b780ffba5e..3d516098a7c 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1366,8 +1366,6 @@ where tracing::error!(peer_id = %peer_id, "Peer non-existent when handling graft"); return; }; - // Needs to be here to comply with the borrow checker. - let is_outbound = connected_peer.outbound; // For each topic, if a peer has grafted us, then we necessarily must be in their mesh // and they must be subscribed to the topic. Ensure we have recorded the mapping. @@ -1419,8 +1417,6 @@ where peer_score.add_penalty(peer_id, 1); // check the flood cutoff - // See: https://github.com/rust-lang/rust-clippy/issues/10061 - #[allow(unknown_lints, clippy::unchecked_duration_subtraction)] let flood_cutoff = (backoff_time + self.config.graft_flood_threshold()) - self.config.prune_backoff(); @@ -1455,10 +1451,9 @@ where } // check mesh upper bound and only allow graft if the upper bound is not reached - // or if it is an outbound peer let mesh_n_high = self.config.mesh_n_high_for_topic(&topic_hash); - if peers.len() >= mesh_n_high && !is_outbound { + if peers.len() >= mesh_n_high { to_prune_topics.insert(topic_hash.clone()); continue; } @@ -2208,7 +2203,7 @@ where } // too many peers - remove some - if peers.len() > mesh_n_high { + if peers.len() >= mesh_n_high { tracing::debug!( topic=%topic_hash, "HEARTBEAT: Mesh high. Topic contains: {} will reduce to: {}", diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 7dd6c4f3689..debc05d9d2b 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -2340,59 +2340,23 @@ fn test_gossip_to_at_most_gossip_factor_peers() { ); } -#[test] -fn test_accept_only_outbound_peer_grafts_when_mesh_full() { - let config: Config = Config::default(); - - // enough peers to fill the mesh - let (mut gs, peers, _, topics) = inject_nodes1() - .peer_no(config.mesh_n_high()) - .topics(vec!["test".into()]) - .to_subscribe(true) - .create_network(); - - // graft all the peers => this will fill the mesh - for peer in peers { - gs.handle_graft(&peer, topics.clone()); - } - - // assert current mesh size - assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high()); - - // create an outbound and an inbound peer - let (inbound, _in_queue) = add_peer(&mut gs, &topics, false, false); - let (outbound, _out_queue) = add_peer(&mut gs, &topics, true, false); - - // send grafts - gs.handle_graft(&inbound, vec![topics[0].clone()]); - gs.handle_graft(&outbound, vec![topics[0].clone()]); - - // assert mesh size - assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high() + 1); - - // inbound is not in mesh - assert!(!gs.mesh[&topics[0]].contains(&inbound)); - - // outbound is in mesh - assert!(gs.mesh[&topics[0]].contains(&outbound)); -} - #[test] fn test_do_not_remove_too_many_outbound_peers() { // use an extreme case to catch errors with high probability - let m = 50; - let n = 2 * m; + let mesh_n = 50; + let mesh_n_high = 2 * mesh_n; let config = ConfigBuilder::default() - .mesh_n_high(n) - .mesh_n(n) - .mesh_n_low(n) - .mesh_outbound_min(m) + .mesh_n_high(mesh_n_high) + .mesh_n(mesh_n) + // Irrelevant for this test. + .mesh_n_low(mesh_n) + .mesh_outbound_min(mesh_n) .build() .unwrap(); // fill the mesh with inbound connections let (mut gs, peers, _queues, topics) = inject_nodes1() - .peer_no(n) + .peer_no(mesh_n) .topics(vec!["test".into()]) .to_subscribe(true) .gs_config(config) @@ -2405,60 +2369,26 @@ fn test_do_not_remove_too_many_outbound_peers() { // create m outbound connections and graft (we will accept the graft) let mut outbound = HashSet::new(); - for _ in 0..m { + // Go from 50 (mesh_n) to 100 (mesh_n_high) to trigger prunning. + for _ in 0..mesh_n { let (peer, _) = add_peer(&mut gs, &topics, true, false); outbound.insert(peer); gs.handle_graft(&peer, topics.clone()); } // mesh is overly full - assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), n + m); + assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), mesh_n_high); // run a heartbeat gs.heartbeat(); - // Peers should be removed to reach n - assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), n); + // Peers should be removed to reach `mesh_n` + assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), mesh_n); // all outbound peers are still in the mesh assert!(outbound.iter().all(|p| gs.mesh[&topics[0]].contains(p))); } -#[test] -fn test_add_outbound_peers_if_min_is_not_satisfied() { - let config: Config = Config::default(); - - // Fill full mesh with inbound peers - let (mut gs, peers, _, topics) = inject_nodes1() - .peer_no(config.mesh_n_high()) - .topics(vec!["test".into()]) - .to_subscribe(true) - .create_network(); - - // graft all the peers - for peer in peers { - gs.handle_graft(&peer, topics.clone()); - } - - // create config.mesh_outbound_min() many outbound connections without grafting - let mut peers = vec![]; - for _ in 0..config.mesh_outbound_min() { - peers.push(add_peer(&mut gs, &topics, true, false)); - } - - // Nothing changed in the mesh yet - assert_eq!(gs.mesh[&topics[0]].len(), config.mesh_n_high()); - - // run a heartbeat - gs.heartbeat(); - - // The outbound peers got additionally added - assert_eq!( - gs.mesh[&topics[0]].len(), - config.mesh_n_high() + config.mesh_outbound_min() - ); -} - #[test] fn test_prune_negative_scored_peers() { let config = Config::default(); @@ -3205,22 +3135,20 @@ fn test_keep_best_scoring_peers_on_oversubscription() { .build() .unwrap(); - // build mesh with more peers than mesh can hold - let n = config.mesh_n_high() + 1; + let mesh_n_high = config.mesh_n_high(); + let (mut gs, peers, _queues, topics) = inject_nodes1() - .peer_no(n) + .peer_no(mesh_n_high) .topics(vec!["test".into()]) .to_subscribe(true) .gs_config(config.clone()) .explicit(0) - .outbound(n) .scoring(Some(( PeerScoreParams::default(), PeerScoreThresholds::default(), ))) .create_network(); - // graft all, will be accepted since the are outbound for peer in &peers { gs.handle_graft(peer, topics.clone()); } @@ -3232,7 +3160,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() { gs.set_application_score(peer, index as f64); } - assert_eq!(gs.mesh[&topics[0]].len(), n); + assert_eq!(gs.mesh[&topics[0]].len(), mesh_n_high); // heartbeat to prune some peers gs.heartbeat(); @@ -3241,7 +3169,7 @@ fn test_keep_best_scoring_peers_on_oversubscription() { // mesh contains retain_scores best peers assert!(gs.mesh[&topics[0]].is_superset( - &peers[(n - config.retain_scores())..] + &peers[(mesh_n_high - config.retain_scores())..] .iter() .cloned() .collect() @@ -6118,10 +6046,13 @@ fn test_mesh_subtraction_with_topic_config() { let topic = String::from("topic1"); let topic_hash = TopicHash::from_raw(topic.clone()); + let mesh_n = 5; + let mesh_n_high = 7; + let topic_config = TopicMeshConfig { - mesh_n: 5, + mesh_n, + mesh_n_high, mesh_n_low: 3, - mesh_n_high: 7, mesh_outbound_min: 2, }; @@ -6130,15 +6061,12 @@ fn test_mesh_subtraction_with_topic_config() { .build() .unwrap(); - let peer_no = 12; - - // make all outbound connections so grafting to all will be allowed let (mut gs, peers, _, topics) = inject_nodes1() - .peer_no(peer_no) + .peer_no(mesh_n_high) .topics(vec![topic]) .to_subscribe(true) .gs_config(config.clone()) - .outbound(peer_no) + .outbound(mesh_n_high) .create_network(); // graft all peers @@ -6148,7 +6076,7 @@ fn test_mesh_subtraction_with_topic_config() { assert_eq!( gs.mesh.get(&topics[0]).unwrap().len(), - peer_no, + mesh_n_high, "Initially all peers should be in the mesh" ); @@ -6163,6 +6091,60 @@ fn test_mesh_subtraction_with_topic_config() { ); } +/// Tests that if a mesh reaches `mesh_n_high`, +/// but is only composed of outbound peers, it is not reduced to `mesh_n`. +#[test] +fn test_mesh_subtraction_with_topic_config_min_outbound() { + let topic = String::from("topic1"); + let topic_hash = TopicHash::from_raw(topic.clone()); + + let mesh_n = 5; + let mesh_n_high = 7; + + let topic_config = TopicMeshConfig { + mesh_n, + mesh_n_high, + mesh_n_low: 3, + mesh_outbound_min: 7, + }; + + let config = ConfigBuilder::default() + .set_topic_config(topic_hash.clone(), topic_config) + .build() + .unwrap(); + + let peer_no = 12; + + // make all outbound connections. + let (mut gs, peers, _, topics) = inject_nodes1() + .peer_no(peer_no) + .topics(vec![topic]) + .to_subscribe(true) + .gs_config(config.clone()) + .outbound(peer_no) + .create_network(); + + // graft all peers + for peer in peers { + gs.handle_graft(&peer, topics.clone()); + } + + assert_eq!( + gs.mesh.get(&topics[0]).unwrap().len(), + mesh_n_high, + "Initially mesh should be {mesh_n_high}" + ); + + // run a heartbeat + gs.heartbeat(); + + assert_eq!( + gs.mesh.get(&topics[0]).unwrap().len(), + mesh_n_high, + "After heartbeat, mesh should still be {mesh_n_high} as these are all outbound peers" + ); +} + /// Test behavior with multiple topics having different configs #[test] fn test_multiple_topics_with_different_configs() {