diff --git a/beacon_node/lighthouse_network/src/gossipsub/behaviour/tests.rs b/beacon_node/lighthouse_network/src/gossipsub/behaviour/tests.rs index 4e02e4016fa..ee7c1a005d3 100644 --- a/beacon_node/lighthouse_network/src/gossipsub/behaviour/tests.rs +++ b/beacon_node/lighthouse_network/src/gossipsub/behaviour/tests.rs @@ -95,7 +95,7 @@ where // build and connect peer_no random peers let mut peers = vec![]; - let mut receiver_queues = HashMap::new(); + let mut receivers = HashMap::new(); let empty = vec![]; for i in 0..self.peer_no { @@ -110,10 +110,10 @@ where i < self.explicit, ); peers.push(peer); - receiver_queues.insert(peer, receiver); + receivers.insert(peer, receiver); } - (gs, peers, receiver_queues, topic_hashes) + (gs, peers, receivers, topic_hashes) } fn peer_no(mut self, peer_no: usize) -> Self { @@ -420,7 +420,7 @@ fn test_subscribe() { // - run JOIN(topic) let subscribe_topic = vec![String::from("test_subscribe")]; - let (gs, _, queues, topic_hashes) = inject_nodes1() + let (gs, _, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(subscribe_topic) .to_subscribe(true) @@ -432,11 +432,12 @@ fn test_subscribe() { ); // collect all the subscriptions - let subscriptions = queues + let subscriptions = receivers .into_values() .fold(0, |mut collected_subscriptions, c| { - while !c.priority.is_empty() { - if let Ok(RpcOut::Subscribe(_)) = c.priority.try_recv() { + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if let Ok(RpcOut::Subscribe(_)) = priority.try_recv() { collected_subscriptions += 1 } } @@ -447,8 +448,8 @@ fn test_subscribe() { assert_eq!(subscriptions, 20); } -#[test] /// Test unsubscribe. +#[test] fn test_unsubscribe() { // Unsubscribe should: // - Remove the mesh entry for topic @@ -462,7 +463,7 @@ fn test_unsubscribe() { .collect::>(); // subscribe to topic_strings - let (mut gs, _, queues, topic_hashes) = inject_nodes1() + let (mut gs, _, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(topic_strings) .to_subscribe(true) @@ -492,11 +493,12 @@ fn test_unsubscribe() { ); // collect all the subscriptions - let subscriptions = queues + let subscriptions = receivers .into_values() .fold(0, |mut collected_subscriptions, c| { - while !c.priority.is_empty() { - if let Ok(RpcOut::Subscribe(_)) = c.priority.try_recv() { + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if let Ok(RpcOut::Subscribe(_)) = priority.try_recv() { collected_subscriptions += 1 } } @@ -515,8 +517,8 @@ fn test_unsubscribe() { } } -#[test] /// Test JOIN(topic) functionality. +#[test] fn test_join() { // The Join function should: // - Remove peers from fanout[topic] @@ -540,7 +542,7 @@ fn test_join() { .create_network(); // Flush previous GRAFT messages. - flush_events(&mut gs, &receivers); + receivers = flush_events(&mut gs, receivers); // unsubscribe, then call join to invoke functionality assert!( @@ -564,17 +566,33 @@ fn test_join() { "Should have added 6 nodes to the mesh" ); - fn count_grafts(mut acc: usize, receiver: &RpcReceiver) -> usize { - while !receiver.priority.is_empty() || !receiver.non_priority.is_empty() { - if let Ok(RpcOut::Graft(_)) = receiver.priority.try_recv() { - acc += 1; + fn count_grafts( + receivers: HashMap, + ) -> (usize, HashMap) { + let mut new_receivers = HashMap::new(); + let mut acc = 0; + + for (peer_id, c) in receivers.into_iter() { + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if let Ok(RpcOut::Graft(_)) = priority.try_recv() { + acc += 1; + } } + new_receivers.insert( + peer_id, + RpcReceiver { + priority_len: c.priority_len, + priority: priority.peekable(), + non_priority: c.non_priority, + }, + ); } - acc + (acc, new_receivers) } // there should be mesh_n GRAFT messages. - let graft_messages = receivers.values().fold(0, count_grafts); + let (graft_messages, mut receivers) = count_grafts(receivers); assert_eq!( graft_messages, 6, @@ -645,12 +663,12 @@ fn test_join() { ); } - // there should now be 12 graft messages to be sent - let graft_messages = receivers.values().fold(graft_messages, count_grafts); + // there should now 6 graft messages to be sent + let (graft_messages, _) = count_grafts(receivers); assert_eq!( - graft_messages, 12, - "There should be 12 grafts messages sent to peers" + graft_messages, 6, + "There should be 6 grafts messages sent to peers" ); } @@ -668,7 +686,7 @@ fn test_publish_without_flood_publishing() { .unwrap(); let publish_topic = String::from("test_publish"); - let (mut gs, _, queues, topic_hashes) = inject_nodes1() + let (mut gs, _, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![publish_topic.clone()]) .to_subscribe(true) @@ -695,11 +713,12 @@ fn test_publish_without_flood_publishing() { gs.publish(Topic::new(publish_topic), publish_data).unwrap(); // Collect all publish messages - let publishes = queues + let publishes = receivers .into_values() .fold(vec![], |mut collected_publish, c| { - while !c.priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() { + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { collected_publish.push(message); } } @@ -747,7 +766,7 @@ fn test_fanout() { .unwrap(); let fanout_topic = String::from("test_fanout"); - let (mut gs, _, queues, topic_hashes) = inject_nodes1() + let (mut gs, _, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![fanout_topic.clone()]) .to_subscribe(true) @@ -779,11 +798,12 @@ fn test_fanout() { ); // Collect all publish messages - let publishes = queues + let publishes = receivers .into_values() .fold(vec![], |mut collected_publish, c| { - while !c.priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() { + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { collected_publish.push(message); } } @@ -815,10 +835,10 @@ fn test_fanout() { ); } -#[test] /// Test the gossipsub NetworkBehaviour peer connection logic. +#[test] fn test_inject_connected() { - let (gs, peers, queues, topic_hashes) = inject_nodes1() + let (gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -826,11 +846,12 @@ fn test_inject_connected() { // check that our subscriptions are sent to each of the peers // collect all the SendEvents - let subscriptions = queues.into_iter().fold( + let subscriptions = receivers.into_iter().fold( HashMap::>::new(), |mut collected_subscriptions, (peer, c)| { - while !c.priority.is_empty() { - if let Ok(RpcOut::Subscribe(topic)) = c.priority.try_recv() { + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if let Ok(RpcOut::Subscribe(topic)) = priority.try_recv() { let mut peer_subs = collected_subscriptions.remove(&peer).unwrap_or_default(); peer_subs.push(topic.into_string()); collected_subscriptions.insert(peer, peer_subs); @@ -860,8 +881,8 @@ fn test_inject_connected() { } } -#[test] /// Test subscription handling +#[test] fn test_handle_received_subscriptions() { // For every subscription: // SUBSCRIBE: - Add subscribed topic to peer_topics for peer. @@ -972,8 +993,8 @@ fn test_handle_received_subscriptions() { ); } -#[test] /// Test Gossipsub.get_random_peers() function +#[test] fn test_get_random_peers() { // generate a default Config let gs_config = ConfigBuilder::default() @@ -1031,7 +1052,7 @@ fn test_get_random_peers() { /// Tests that the correct message is sent when a peer asks for a message in our cache. #[test] fn test_handle_iwant_msg_cached() { - let (mut gs, peers, queues, _) = inject_nodes1() + let (mut gs, peers, receivers, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1059,11 +1080,12 @@ fn test_handle_iwant_msg_cached() { gs.handle_iwant(&peers[7], vec![msg_id.clone()]); // the messages we are sending - let sent_messages = queues + let sent_messages = receivers .into_values() .fold(vec![], |mut collected_messages, c| { - while !c.non_priority.is_empty() { - if let Ok(RpcOut::Forward { message, .. }) = c.non_priority.try_recv() { + let non_priority = c.non_priority.into_inner(); + while !non_priority.is_empty() { + if let Ok(RpcOut::Forward { message, .. }) = non_priority.try_recv() { collected_messages.push(message) } } @@ -1082,7 +1104,7 @@ fn test_handle_iwant_msg_cached() { /// Tests that messages are sent correctly depending on the shifting of the message cache. #[test] fn test_handle_iwant_msg_cached_shifted() { - let (mut gs, peers, queues, _) = inject_nodes1() + let (mut gs, peers, mut receivers, _) = inject_nodes1() .peer_no(20) .topics(Vec::new()) .to_subscribe(true) @@ -1115,21 +1137,29 @@ fn test_handle_iwant_msg_cached_shifted() { gs.handle_iwant(&peers[7], vec![msg_id.clone()]); // is the message is being sent? - let message_exists = queues.values().any(|c| { - let mut out = false; - while !c.non_priority.is_empty() { - if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward{message, timeout: _ }) if + let mut message_exists = false; + receivers = receivers.into_iter().map(|(peer_id, c)| { + let non_priority = c.non_priority.into_inner(); + while !non_priority.is_empty() { + if matches!(non_priority.try_recv(), Ok(RpcOut::Forward{message, timeout: _ }) if gs.config.message_id( &gs.data_transform .inbound_transform(message.clone()) .unwrap(), ) == msg_id) { - out = true; + message_exists = true; } } - out - }); + ( + peer_id, + RpcReceiver { + priority_len: c.priority_len, + priority: c.priority, + non_priority: non_priority.peekable(), + }, + ) + }).collect(); // default history_length is 5, expect no messages after shift > 5 if shift < 5 { assert!( @@ -1145,8 +1175,8 @@ fn test_handle_iwant_msg_cached_shifted() { } } +/// tests that an event is not created when a peers asks for a message not in our cache #[test] -// tests that an event is not created when a peers asks for a message not in our cache fn test_handle_iwant_msg_not_cached() { let (mut gs, peers, _, _) = inject_nodes1() .peer_no(20) @@ -1164,10 +1194,10 @@ fn test_handle_iwant_msg_not_cached() { ); } +/// tests that an event is created when a peer shares that it has a message we want #[test] -// tests that an event is created when a peer shares that it has a message we want fn test_handle_ihave_subscribed_and_msg_not_cached() { - let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, mut receivers, topic_hashes) = inject_nodes1() .peer_no(20) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1180,9 +1210,10 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { // check that we sent an IWANT request for `unknown id` let mut iwant_exists = false; - let receiver = receivers.get(&peers[7]).unwrap(); - while !receiver.non_priority.is_empty() { - if let Ok(RpcOut::IWant(IWant { message_ids })) = receiver.non_priority.try_recv() { + let receiver = receivers.remove(&peers[7]).unwrap(); + let non_priority = receiver.non_priority.into_inner(); + while !non_priority.is_empty() { + if let Ok(RpcOut::IWant(IWant { message_ids })) = non_priority.try_recv() { if message_ids .iter() .any(|m| *m == MessageId::new(b"unknown id")) @@ -1199,9 +1230,9 @@ fn test_handle_ihave_subscribed_and_msg_not_cached() { ); } +/// tests that an event is not created when a peer shares that it has a message that +/// we already have #[test] -// tests that an event is not created when a peer shares that it has a message that -// we already have fn test_handle_ihave_subscribed_and_msg_cached() { let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) @@ -1221,9 +1252,9 @@ fn test_handle_ihave_subscribed_and_msg_cached() { ) } +/// test that an event is not created when a peer shares that it has a message in +/// a topic that we are not subscribed to #[test] -// test that an event is not created when a peer shares that it has a message in -// a topic that we are not subscribed to fn test_handle_ihave_not_subscribed() { let (mut gs, peers, _, _) = inject_nodes1() .peer_no(20) @@ -1247,9 +1278,9 @@ fn test_handle_ihave_not_subscribed() { ) } +/// tests that a peer is added to our mesh when we are both subscribed +/// to the same topic #[test] -// tests that a peer is added to our mesh when we are both subscribed -// to the same topic fn test_handle_graft_is_subscribed() { let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) @@ -1265,9 +1296,9 @@ fn test_handle_graft_is_subscribed() { ); } +/// tests that a peer is not added to our mesh when they are subscribed to +/// a topic that we are not #[test] -// tests that a peer is not added to our mesh when they are subscribed to -// a topic that we are not fn test_handle_graft_is_not_subscribed() { let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) @@ -1286,8 +1317,8 @@ fn test_handle_graft_is_not_subscribed() { ); } +/// tests multiple topics in a single graft message #[test] -// tests multiple topics in a single graft message fn test_handle_graft_multiple_topics() { let topics: Vec = ["topic1", "topic2", "topic3", "topic4"] .iter() @@ -1321,8 +1352,8 @@ fn test_handle_graft_multiple_topics() { ); } +/// tests that a peer is removed from our mesh #[test] -// tests that a peer is removed from our mesh fn test_handle_prune_peer_in_mesh() { let (mut gs, peers, _, topic_hashes) = inject_nodes1() .peer_no(20) @@ -1352,43 +1383,65 @@ fn test_handle_prune_peer_in_mesh() { } fn count_control_msgs( - queues: &HashMap, + receivers: HashMap, mut filter: impl FnMut(&PeerId, &RpcOut) -> bool, -) -> usize { - queues - .iter() - .fold(0, |mut collected_messages, (peer_id, c)| { - while !c.priority.is_empty() || !c.non_priority.is_empty() { - if let Ok(rpc) = c.priority.try_recv() { - if filter(peer_id, &rpc) { - collected_messages += 1; - } +) -> (usize, HashMap) { + let mut new_receivers = HashMap::new(); + let mut collected_messages = 0; + for (peer_id, c) in receivers.into_iter() { + let priority = c.priority.into_inner(); + let non_priority = c.non_priority.into_inner(); + while !priority.is_empty() || !non_priority.is_empty() { + if let Ok(rpc) = priority.try_recv() { + if filter(&peer_id, &rpc) { + collected_messages += 1; } - if let Ok(rpc) = c.non_priority.try_recv() { - if filter(peer_id, &rpc) { - collected_messages += 1; - } + } + if let Ok(rpc) = non_priority.try_recv() { + if filter(&peer_id, &rpc) { + collected_messages += 1; } } - collected_messages - }) + } + new_receivers.insert( + peer_id, + RpcReceiver { + priority_len: c.priority_len, + priority: priority.peekable(), + non_priority: non_priority.peekable(), + }, + ); + } + (collected_messages, new_receivers) } fn flush_events( gs: &mut Behaviour, - receiver_queues: &HashMap, -) { + receivers: HashMap, +) -> HashMap { gs.events.clear(); - for c in receiver_queues.values() { - while !c.priority.is_empty() || !c.non_priority.is_empty() { - let _ = c.priority.try_recv(); - let _ = c.non_priority.try_recv(); + let mut new_receivers = HashMap::new(); + for (peer_id, c) in receivers.into_iter() { + let priority = c.priority.into_inner(); + let non_priority = c.non_priority.into_inner(); + while !priority.is_empty() || !non_priority.is_empty() { + let _ = priority.try_recv(); + let _ = non_priority.try_recv(); } + new_receivers.insert( + peer_id, + RpcReceiver { + priority_len: c.priority_len, + priority: priority.peekable(), + non_priority: non_priority.peekable(), + }, + ); } + new_receivers } +/// tests that a peer added as explicit peer gets connected to #[test] -// tests that a peer added as explicit peer gets connected to fn test_explicit_peer_gets_connected() { let (mut gs, _, _, _) = inject_nodes1() .peer_no(0) @@ -1423,7 +1476,7 @@ fn test_explicit_peer_reconnects() { .check_explicit_peers_ticks(2) .build() .unwrap(); - let (mut gs, others, queues, _) = inject_nodes1() + let (mut gs, others, receivers, _) = inject_nodes1() .peer_no(1) .topics(Vec::new()) .to_subscribe(true) @@ -1435,7 +1488,7 @@ fn test_explicit_peer_reconnects() { //add peer as explicit peer gs.add_explicit_peer(peer); - flush_events(&mut gs, &queues); + flush_events(&mut gs, receivers); //disconnect peer disconnect_peer(&mut gs, peer); @@ -1473,7 +1526,7 @@ fn test_explicit_peer_reconnects() { #[test] fn test_handle_graft_explicit_peer() { - let (mut gs, peers, queues, topic_hashes) = inject_nodes1() + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(1) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1490,21 +1543,24 @@ fn test_handle_graft_explicit_peer() { assert!(gs.mesh[&topic_hashes[1]].is_empty()); //check prunes - assert!( - count_control_msgs(&queues, |peer_id, m| peer_id == peer + let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + peer_id == peer && match m { - RpcOut::Prune(Prune { topic_hash, .. }) => - topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], + RpcOut::Prune(Prune { topic_hash, .. }) => { + topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1] + } _ => false, - }) - >= 2, + } + }); + assert!( + control_msgs >= 2, "Not enough prunes sent when grafting from explicit peer" ); } #[test] fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { - let (gs, peers, queues, topic_hashes) = inject_nodes1() + let (gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1")]) .to_subscribe(true) @@ -1519,25 +1575,27 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { ); //assert that graft gets created to non-explicit peer + let (control_msgs, receivers) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. }) + }); assert!( - count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, RpcOut::Graft { .. })) - >= 1, + control_msgs >= 1, "No graft message got created to non-explicit peer" ); //assert that no graft gets created to explicit peer + let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. }) + }); assert_eq!( - count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, RpcOut::Graft { .. })), - 0, + control_msgs, 0, "A graft message got created to an explicit peer" ); } #[test] fn do_not_graft_explicit_peer() { - let (mut gs, others, queues, topic_hashes) = inject_nodes1() + let (mut gs, others, receivers, topic_hashes) = inject_nodes1() .peer_no(1) .topics(vec![String::from("topic")]) .to_subscribe(true) @@ -1551,17 +1609,18 @@ fn do_not_graft_explicit_peer() { assert_eq!(gs.mesh[&topic_hashes[0]], BTreeSet::new()); //assert that no graft gets created to explicit peer + let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &others[0] && matches!(m, RpcOut::Graft { .. }) + }); assert_eq!( - count_control_msgs(&queues, |peer_id, m| peer_id == &others[0] - && matches!(m, RpcOut::Graft { .. })), - 0, + control_msgs, 0, "A graft message got created to an explicit peer" ); } #[test] fn do_forward_messages_to_explicit_peers() { - let (mut gs, peers, queues, topic_hashes) = inject_nodes1() + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1582,9 +1641,10 @@ fn do_forward_messages_to_explicit_peers() { }; gs.handle_received_message(message.clone(), &local_id); assert_eq!( - queues.into_iter().fold(0, |mut fwds, (peer_id, c)| { - while !c.non_priority.is_empty() { - if matches!(c.non_priority.try_recv(), Ok(RpcOut::Forward{message: m, timeout: _}) if peer_id == peers[0] && m.data == message.data) { + receivers.into_iter().fold(0, |mut fwds, (peer_id, c)| { + let non_priority = c.non_priority.into_inner(); + while !non_priority.is_empty() { + if matches!(non_priority.try_recv(), Ok(RpcOut::Forward{message: m, timeout: _}) if peer_id == peers[0] && m.data == message.data) { fwds +=1; } } @@ -1597,7 +1657,7 @@ fn do_forward_messages_to_explicit_peers() { #[test] fn explicit_peers_not_added_to_mesh_on_subscribe() { - let (mut gs, peers, queues, _) = inject_nodes1() + let (mut gs, peers, receivers, _) = inject_nodes1() .peer_no(2) .topics(Vec::new()) .to_subscribe(true) @@ -1625,25 +1685,27 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { assert_eq!(gs.mesh[&topic_hash], vec![peers[1]].into_iter().collect()); //assert that graft gets created to non-explicit peer + let (control_msgs, receivers) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. }) + }); assert!( - count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, RpcOut::Graft { .. })) - > 0, + control_msgs > 0, "No graft message got created to non-explicit peer" ); //assert that no graft gets created to explicit peer + let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. }) + }); assert_eq!( - count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, RpcOut::Graft { .. })), - 0, + control_msgs, 0, "A graft message got created to an explicit peer" ); } #[test] fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { - let (mut gs, peers, queues, _) = inject_nodes1() + let (mut gs, peers, receivers, _) = inject_nodes1() .peer_no(2) .topics(Vec::new()) .to_subscribe(true) @@ -1674,25 +1736,27 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { assert_eq!(gs.mesh[&topic_hash], vec![peers[1]].into_iter().collect()); //assert that graft gets created to non-explicit peer + let (control_msgs, receivers) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. }) + }); assert!( - count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] - && matches!(m, RpcOut::Graft { .. })) - >= 1, + control_msgs >= 1, "No graft message got created to non-explicit peer" ); //assert that no graft gets created to explicit peer + let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. }) + }); assert_eq!( - count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] - && matches!(m, RpcOut::Graft { .. })), - 0, + control_msgs, 0, "A graft message got created to an explicit peer" ); } #[test] fn no_gossip_gets_sent_to_explicit_peers() { - let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + let (mut gs, peers, mut receivers, topic_hashes) = inject_nodes1() .peer_no(2) .topics(vec![String::from("topic1"), String::from("topic2")]) .to_subscribe(true) @@ -1721,23 +1785,24 @@ fn no_gossip_gets_sent_to_explicit_peers() { } //assert that no gossip gets sent to explicit peer - let receiver = receivers.get(&peers[0]).unwrap(); + let receiver = receivers.remove(&peers[0]).unwrap(); let mut gossips = 0; - while !receiver.non_priority.is_empty() { - if let Ok(RpcOut::IHave(_)) = receiver.non_priority.try_recv() { + let non_priority = receiver.non_priority.into_inner(); + while !non_priority.is_empty() { + if let Ok(RpcOut::IHave(_)) = non_priority.try_recv() { gossips += 1; } } assert_eq!(gossips, 0, "Gossip got emitted to explicit peer"); } -// Tests the mesh maintenance addition +/// Tests the mesh maintenance addition #[test] fn test_mesh_addition() { let config: Config = Config::default(); // Adds mesh_low peers and PRUNE 2 giving us a deficit. - let (mut gs, peers, _queues, topics) = inject_nodes1() + let (mut gs, peers, _receivers, topics) = inject_nodes1() .peer_no(config.mesh_n() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1765,7 +1830,7 @@ fn test_mesh_addition() { assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), config.mesh_n()); } -// Tests the mesh maintenance subtraction +/// Tests the mesh maintenance subtraction #[test] fn test_mesh_subtraction() { let config = Config::default(); @@ -1853,7 +1918,7 @@ fn test_send_px_and_backoff_in_prune() { let config: Config = Config::default(); //build mesh with enough peers for px - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, receivers, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1869,24 +1934,25 @@ fn test_send_px_and_backoff_in_prune() { ); //check prune message - assert_eq!( - count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] + let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &peers[0] && match m { RpcOut::Prune(Prune { topic_hash, peers, backoff, - }) => + }) => { topic_hash == &topics[0] && peers.len() == config.prune_peers() && //all peers are different peers.iter().collect::>().len() == config.prune_peers() && - backoff.unwrap() == config.prune_backoff().as_secs(), + backoff.unwrap() == config.prune_backoff().as_secs() + } _ => false, - }), - 1 - ); + } + }); + assert_eq!(control_msgs, 1); } #[test] @@ -1894,7 +1960,7 @@ fn test_prune_backoffed_peer_on_graft() { let config: Config = Config::default(); //build mesh with enough peers for px - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, receivers, topics) = inject_nodes1() .peer_no(config.prune_peers() + 1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1911,28 +1977,29 @@ fn test_prune_backoffed_peer_on_graft() { ); //ignore all messages until now - flush_events(&mut gs, &queues); + let receivers = flush_events(&mut gs, receivers); //handle graft gs.handle_graft(&peers[0], vec![topics[0].clone()]); //check prune message - assert_eq!( - count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] + let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &peers[0] && match m { RpcOut::Prune(Prune { topic_hash, peers, backoff, - }) => + }) => { topic_hash == &topics[0] && //no px in this case peers.is_empty() && - backoff.unwrap() == config.prune_backoff().as_secs(), + backoff.unwrap() == config.prune_backoff().as_secs() + } _ => false, - }), - 1 - ); + } + }); + assert_eq!(control_msgs, 1); } #[test] @@ -1943,7 +2010,7 @@ fn test_do_not_graft_within_backoff_period() { .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, receivers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -1954,7 +2021,7 @@ fn test_do_not_graft_within_backoff_period() { gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), Some(1))]); //forget all events until now - flush_events(&mut gs, &queues); + let receivers = flush_events(&mut gs, receivers); //call heartbeat gs.heartbeat(); @@ -1967,9 +2034,10 @@ fn test_do_not_graft_within_backoff_period() { //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). + let (control_msgs, receivers) = + count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); assert_eq!( - count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), - 0, + control_msgs, 0, "Graft message created too early within backoff period" ); @@ -1978,8 +2046,9 @@ fn test_do_not_graft_within_backoff_period() { gs.heartbeat(); //check that graft got created + let (control_msgs, _) = count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); assert!( - count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, + control_msgs > 0, "No graft message was created after backoff period" ); } @@ -1994,7 +2063,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without .build() .unwrap(); //only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, receivers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2005,7 +2074,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), None)]); //forget all events until now - flush_events(&mut gs, &queues); + let receivers = flush_events(&mut gs, receivers); //call heartbeat gs.heartbeat(); @@ -2016,9 +2085,10 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). + let (control_msgs, receivers) = + count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); assert_eq!( - count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), - 0, + control_msgs, 0, "Graft message created too early within backoff period" ); @@ -2027,8 +2097,9 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without gs.heartbeat(); //check that graft got created + let (control_msgs, _) = count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); assert!( - count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, + control_msgs > 0, "No graft message was created after backoff period" ); } @@ -2047,7 +2118,7 @@ fn test_unsubscribe_backoff() { let topic = String::from("test"); // only one peer => mesh too small and will try to regraft as early as possible - let (mut gs, _, queues, topics) = inject_nodes1() + let (mut gs, _, receivers, topics) = inject_nodes1() .peer_no(1) .topics(vec![topic.clone()]) .to_subscribe(true) @@ -2056,19 +2127,19 @@ fn test_unsubscribe_backoff() { let _ = gs.unsubscribe(&Topic::new(topic)); + let (control_msgs, receivers) = count_control_msgs(receivers, |_, m| match m { + RpcOut::Prune(Prune { backoff, .. }) => backoff == &Some(1), + _ => false, + }); assert_eq!( - count_control_msgs(&queues, |_, m| match m { - RpcOut::Prune(Prune { backoff, .. }) => backoff == &Some(1), - _ => false, - }), - 1, + control_msgs, 1, "Peer should be pruned with `unsubscribe_backoff`." ); let _ = gs.subscribe(&Topic::new(topics[0].to_string())); // forget all events until now - flush_events(&mut gs, &queues); + let receivers = flush_events(&mut gs, receivers); // call heartbeat gs.heartbeat(); @@ -2081,9 +2152,10 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). + let (control_msgs, receivers) = + count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); assert_eq!( - count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), - 0, + control_msgs, 0, "Graft message created too early within backoff period" ); @@ -2092,8 +2164,9 @@ fn test_unsubscribe_backoff() { gs.heartbeat(); // check that graft got created + let (control_msgs, _) = count_control_msgs(receivers, |_, m| matches!(m, RpcOut::Graft { .. })); assert!( - count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, + control_msgs > 0, "No graft message was created after backoff period" ); } @@ -2104,7 +2177,7 @@ fn test_flood_publish() { let topic = "test"; // Adds more peers than mesh can hold to test flood publishing - let (mut gs, _, queues, _) = inject_nodes1() + let (mut gs, _, receivers, _) = inject_nodes1() .peer_no(config.mesh_n_high() + 10) .topics(vec![topic.into()]) .to_subscribe(true) @@ -2115,11 +2188,12 @@ fn test_flood_publish() { gs.publish(Topic::new(topic), publish_data).unwrap(); // Collect all publish messages - let publishes = queues + let publishes = receivers .into_values() .fold(vec![], |mut collected_publish, c| { - while !c.priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() { + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { collected_publish.push(message); } } @@ -2158,7 +2232,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //add more peers than in mesh to test gossipping //by default only mesh_n_low peers will get added to mesh - let (mut gs, _, queues, topic_hashes) = inject_nodes1() + let (mut gs, _, receivers, topic_hashes) = inject_nodes1() .peer_no(config.mesh_n_low() + config.gossip_lazy() + 1) .topics(vec!["topic".into()]) .to_subscribe(true) @@ -2185,16 +2259,14 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { let msg_id = gs.config.message_id(message); //check that exactly config.gossip_lazy() many gossip messages were sent. - assert_eq!( - count_control_msgs(&queues, |_, action| match action { - RpcOut::IHave(IHave { - topic_hash, - message_ids, - }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), - _ => false, - }), - config.gossip_lazy() - ); + let (control_msgs, _) = count_control_msgs(receivers, |_, action| match action { + RpcOut::IHave(IHave { + topic_hash, + message_ids, + }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), + _ => false, + }); + assert_eq!(control_msgs, config.gossip_lazy()); } #[test] @@ -2203,7 +2275,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { //add a lot of peers let m = config.mesh_n_low() + config.gossip_lazy() * (2.0 / config.gossip_factor()) as usize; - let (mut gs, _, queues, topic_hashes) = inject_nodes1() + let (mut gs, _, receivers, topic_hashes) = inject_nodes1() .peer_no(m) .topics(vec!["topic".into()]) .to_subscribe(true) @@ -2229,14 +2301,15 @@ fn test_gossip_to_at_most_gossip_factor_peers() { let msg_id = gs.config.message_id(message); //check that exactly config.gossip_lazy() many gossip messages were sent. + let (control_msgs, _) = count_control_msgs(receivers, |_, action| match action { + RpcOut::IHave(IHave { + topic_hash, + message_ids, + }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), + _ => false, + }); assert_eq!( - count_control_msgs(&queues, |_, action| match action { - RpcOut::IHave(IHave { - topic_hash, - message_ids, - }) => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), - _ => false, - }), + control_msgs, ((m - config.mesh_n_low()) as f64 * config.gossip_factor()) as usize ); } @@ -2365,7 +2438,7 @@ fn test_prune_negative_scored_peers() { let config = Config::default(); //build mesh with one peer - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, receivers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2388,22 +2461,23 @@ fn test_prune_negative_scored_peers() { assert!(gs.mesh[&topics[0]].is_empty()); //check prune message - assert_eq!( - count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] + let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &peers[0] && match m { RpcOut::Prune(Prune { topic_hash, peers, backoff, - }) => + }) => { topic_hash == &topics[0] && //no px in this case peers.is_empty() && - backoff.unwrap() == config.prune_backoff().as_secs(), + backoff.unwrap() == config.prune_backoff().as_secs() + } _ => false, - }), - 1 - ); + } + }); + assert_eq!(control_msgs, 1); } #[test] @@ -2496,7 +2570,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { .unwrap(); // Build mesh with three peer - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, receivers, topics) = inject_nodes1() .peer_no(3) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2522,29 +2596,26 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { ); // Check that px in prune message only contains third peer - assert_eq!( - count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] + let (control_msgs, _) = count_control_msgs(receivers, |peer_id, m| { + peer_id == &peers[1] && match m { RpcOut::Prune(Prune { topic_hash, peers: px, .. - }) => + }) => { topic_hash == &topics[0] && px.len() == 1 - && px[0].peer_id.as_ref().unwrap() == &peers[2], + && px[0].peer_id.as_ref().unwrap() == &peers[2] + } _ => false, - }), - 1 - ); + } + }); + assert_eq!(control_msgs, 1); } #[test] fn test_do_not_gossip_to_peers_below_gossip_threshold() { - // use tracing_subscriber::EnvFilter; - // let _ = tracing_subscriber::fmt() - // .with_env_filter(EnvFilter::from_default_env()) - // .try_init(); let config = Config::default(); let peer_score_params = PeerScoreParams::default(); let peer_score_thresholds = PeerScoreThresholds { @@ -2601,23 +2672,21 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { gs.emit_gossip(); // Check that exactly one gossip messages got sent and it got sent to p2 - assert_eq!( - count_control_msgs(&receivers, |peer, action| match action { - RpcOut::IHave(IHave { - topic_hash, - message_ids, - }) => { - if topic_hash == &topics[0] && message_ids.iter().any(|id| id == &msg_id) { - assert_eq!(peer, &p2); - true - } else { - false - } + let (control_msgs, _) = count_control_msgs(receivers, |peer, action| match action { + RpcOut::IHave(IHave { + topic_hash, + message_ids, + }) => { + if topic_hash == &topics[0] && message_ids.iter().any(|id| id == &msg_id) { + assert_eq!(peer, &p2); + true + } else { + false } - _ => false, - }), - 1 - ); + } + _ => false, + }); + assert_eq!(control_msgs, 1); } #[test] @@ -2630,7 +2699,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { }; // Build full mesh - let (mut gs, peers, mut queues, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2647,9 +2716,9 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { // Add two additional peers that will not be part of the mesh let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); - queues.insert(p1, receiver1); + receivers.insert(p1, receiver1); let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); - queues.insert(p2, receiver2); + receivers.insert(p2, receiver2); // Reduce score of p1 below peer_score_thresholds.gossip_threshold // note that penalties get squared so two penalties means a score of @@ -2680,16 +2749,18 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { gs.handle_iwant(&p2, vec![msg_id.clone()]); // the messages we are sending - let sent_messages = queues - .into_iter() - .fold(vec![], |mut collected_messages, (peer_id, c)| { - while !c.non_priority.is_empty() { - if let Ok(RpcOut::Forward { message, .. }) = c.non_priority.try_recv() { - collected_messages.push((peer_id, message)); + let sent_messages = + receivers + .into_iter() + .fold(vec![], |mut collected_messages, (peer_id, c)| { + let non_priority = c.non_priority.into_inner(); + while !non_priority.is_empty() { + if let Ok(RpcOut::Forward { message, .. }) = non_priority.try_recv() { + collected_messages.push((peer_id, message)); + } } - } - collected_messages - }); + collected_messages + }); //the message got sent to p2 assert!(sent_messages @@ -2718,7 +2789,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { ..PeerScoreThresholds::default() }; //build full mesh - let (mut gs, peers, mut queues, topics) = inject_nodes1() + let (mut gs, peers, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(true) @@ -2735,9 +2806,9 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { //add two additional peers that will not be part of the mesh let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); - queues.insert(p1, receiver1); + receivers.insert(p1, receiver1); let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); - queues.insert(p2, receiver2); + receivers.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.gossip_threshold //note that penalties get squared so two penalties means a score of @@ -2767,19 +2838,18 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { gs.handle_ihave(&p2, vec![(topics[0].clone(), vec![msg_id.clone()])]); // check that we sent exactly one IWANT request to p2 - assert_eq!( - count_control_msgs(&queues, |peer, c| match c { - RpcOut::IWant(IWant { message_ids }) => - if message_ids.iter().any(|m| m == &msg_id) { - assert_eq!(peer, &p2); - true - } else { - false - }, - _ => false, - }), - 1 - ); + let (control_msgs, _) = count_control_msgs(receivers, |peer, c| match c { + RpcOut::IWant(IWant { message_ids }) => { + if message_ids.iter().any(|m| m == &msg_id) { + assert_eq!(peer, &p2); + true + } else { + false + } + } + _ => false, + }); + assert_eq!(control_msgs, 1); } #[test] @@ -2796,7 +2866,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { }; //build mesh with no peers and no subscribed topics - let (mut gs, _, mut queues, _) = inject_nodes1() + let (mut gs, _, mut receivers, _) = inject_nodes1() .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) .create_network(); @@ -2807,9 +2877,9 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { //add two additional peers that will be added to the mesh let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); - queues.insert(p1, receiver1); + receivers.insert(p1, receiver1); let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); - queues.insert(p2, receiver2); + receivers.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.publish_threshold //note that penalties get squared so two penalties means a score of @@ -2827,11 +2897,12 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { gs.publish(topic, publish_data).unwrap(); // Collect all publish messages - let publishes = queues + let publishes = receivers .into_iter() .fold(vec![], |mut collected_publish, (peer_id, c)| { - while !c.priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() { + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { collected_publish.push((peer_id, message)); } } @@ -2853,7 +2924,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { ..PeerScoreThresholds::default() }; //build mesh with no peers - let (mut gs, _, mut queues, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .topics(vec!["test".into()]) .gs_config(config) .scoring(Some((peer_score_params, peer_score_thresholds))) @@ -2861,9 +2932,9 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { //add two additional peers that will be added to the mesh let (p1, receiver1) = add_peer(&mut gs, &topics, false, false); - queues.insert(p1, receiver1); + receivers.insert(p1, receiver1); let (p2, receiver2) = add_peer(&mut gs, &topics, false, false); - queues.insert(p2, receiver2); + receivers.insert(p2, receiver2); //reduce score of p1 below peer_score_thresholds.publish_threshold //note that penalties get squared so two penalties means a score of @@ -2881,11 +2952,12 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect all publish messages - let publishes = queues + let publishes = receivers .into_iter() .fold(vec![], |mut collected_publish, (peer_id, c)| { - while !c.priority.is_empty() { - if let Ok(RpcOut::Publish { message, .. }) = c.priority.try_recv() { + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if let Ok(RpcOut::Publish { message, .. }) = priority.try_recv() { collected_publish.push((peer_id, message)) } } @@ -4351,7 +4423,7 @@ fn test_opportunistic_grafting() { #[test] fn test_ignore_graft_from_unknown_topic() { //build gossipsub without subscribing to any topics - let (mut gs, peers, queues, _) = inject_nodes1() + let (mut gs, peers, receivers, _) = inject_nodes1() .peer_no(1) .topics(vec![]) .to_subscribe(false) @@ -4361,9 +4433,9 @@ fn test_ignore_graft_from_unknown_topic() { gs.handle_graft(&peers[0], vec![Topic::new("test").hash()]); //assert that no prune got created + let (control_msgs, _) = count_control_msgs(receivers, |_, a| matches!(a, RpcOut::Prune { .. })); assert_eq!( - count_control_msgs(&queues, |_, a| matches!(a, RpcOut::Prune { .. })), - 0, + control_msgs, 0, "we should not prune after graft in unknown topic" ); } @@ -4372,7 +4444,7 @@ fn test_ignore_graft_from_unknown_topic() { fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { let config = Config::default(); //build gossipsub with full mesh - let (mut gs, _, mut queues, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4380,7 +4452,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { //add another peer not in the mesh let (peer, receiver) = add_peer(&mut gs, &topics, false, false); - queues.insert(peer, receiver); + receivers.insert(peer, receiver); //receive a message let mut seq = 0; @@ -4394,7 +4466,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.handle_received_message(m1, &PeerId::random()); //clear events - flush_events(&mut gs, &queues); + let receivers = flush_events(&mut gs, receivers); //the first gossip_retransimission many iwants return the valid message, all others are // ignored. @@ -4403,9 +4475,10 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { } assert_eq!( - queues.into_values().fold(0, |mut fwds, c| { - while !c.non_priority.is_empty() { - if let Ok(RpcOut::Forward { .. }) = c.non_priority.try_recv() { + receivers.into_values().fold(0, |mut fwds, c| { + let non_priority = c.non_priority.into_inner(); + while !non_priority.is_empty() { + if let Ok(RpcOut::Forward { .. }) = non_priority.try_recv() { fwds += 1; } } @@ -4460,10 +4533,12 @@ fn test_ignore_too_many_ihaves() { .collect(); //we send iwant only for the first 10 messages + let (control_msgs, receivers) = count_control_msgs(receivers, |p, action| { + p == &peer + && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0])) + }); assert_eq!( - count_control_msgs(&receivers, |p, action| p == &peer - && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), - 10, + control_msgs, 10, "exactly the first ten ihaves should be processed and one iwant for each created" ); @@ -4484,12 +4559,11 @@ fn test_ignore_too_many_ihaves() { } //we sent iwant for all 10 messages - assert_eq!( - count_control_msgs(&receivers, |p, action| p == &peer - && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1)), - 10, - "all 20 should get sent" - ); + let (control_msgs, _) = count_control_msgs(receivers, |p, action| { + p == &peer + && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1) + }); + assert_eq!(control_msgs, 10, "all 20 should get sent"); } #[test] @@ -4500,7 +4574,7 @@ fn test_ignore_too_many_messages_in_ihave() { .build() .unwrap(); //build gossipsub with full mesh - let (mut gs, _, mut queues, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_high()) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4509,7 +4583,7 @@ fn test_ignore_too_many_messages_in_ihave() { //add another peer not in the mesh let (peer, receiver) = add_peer(&mut gs, &topics, false, false); - queues.insert(peer, receiver); + receivers.insert(peer, receiver); //peer has 20 messages let mut seq = 0; @@ -4534,18 +4608,18 @@ fn test_ignore_too_many_messages_in_ihave() { //we send iwant only for the first 10 messages let mut sum = 0; - assert_eq!( - count_control_msgs(&queues, |p, rpc| match rpc { - RpcOut::IWant(IWant { message_ids }) => { - p == &peer && { - assert!(first_twelve.is_superset(&message_ids.iter().collect())); - sum += message_ids.len(); - true - } + let (control_msgs, receivers) = count_control_msgs(receivers, |p, rpc| match rpc { + RpcOut::IWant(IWant { message_ids }) => { + p == &peer && { + assert!(first_twelve.is_superset(&message_ids.iter().collect())); + sum += message_ids.len(); + true } - _ => false, - }), - 2, + } + _ => false, + }); + assert_eq!( + control_msgs, 2, "the third ihave should get ignored and no iwant sent" ); @@ -4560,20 +4634,16 @@ fn test_ignore_too_many_messages_in_ihave() { //we sent 10 iwant messages ids via a IWANT rpc. let mut sum = 0; - assert_eq!( - count_control_msgs(&queues, |p, rpc| { - match rpc { - RpcOut::IWant(IWant { message_ids }) => { - p == &peer && { - sum += message_ids.len(); - true - } - } - _ => false, + let (control_msgs, _) = count_control_msgs(receivers, |p, rpc| match rpc { + RpcOut::IWant(IWant { message_ids }) => { + p == &peer && { + sum += message_ids.len(); + true } - }), - 1 - ); + } + _ => false, + }); + assert_eq!(control_msgs, 1); assert_eq!(sum, 10, "exactly 20 iwants should get sent"); } @@ -4619,22 +4689,22 @@ fn test_limit_number_of_message_ids_inside_ihave() { let mut ihaves1 = HashSet::new(); let mut ihaves2 = HashSet::new(); - assert_eq!( - count_control_msgs(&receivers, |p, action| match action { - RpcOut::IHave(IHave { message_ids, .. }) => { - if p == &p1 { - ihaves1 = message_ids.iter().cloned().collect(); - true - } else if p == &p2 { - ihaves2 = message_ids.iter().cloned().collect(); - true - } else { - false - } + let (control_msgs, _) = count_control_msgs(receivers, |p, action| match action { + RpcOut::IHave(IHave { message_ids, .. }) => { + if p == &p1 { + ihaves1 = message_ids.iter().cloned().collect(); + true + } else if p == &p2 { + ihaves2 = message_ids.iter().cloned().collect(); + true + } else { + false } - _ => false, - }), - 2, + } + _ => false, + }); + assert_eq!( + control_msgs, 2, "should have emitted one ihave to p1 and one to p2" ); @@ -4668,7 +4738,6 @@ fn test_iwant_penalties() { .with_env_filter(EnvFilter::from_default_env()) .try_init(); */ - let config = ConfigBuilder::default() .iwant_followup_time(Duration::from_secs(4)) .build() @@ -4791,7 +4860,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .flood_publish(false) .build() .unwrap(); - let (mut gs, _, mut queues, topics) = inject_nodes1() + let (mut gs, _, mut receivers, topics) = inject_nodes1() .peer_no(config.mesh_n_low() - 1) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4807,11 +4876,11 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { Multiaddr::empty(), Some(PeerKind::Floodsub), ); - queues.insert(p1, receiver1); + receivers.insert(p1, receiver1); let (p2, receiver2) = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); - queues.insert(p2, receiver2); + receivers.insert(p2, receiver2); //p1 and p2 are not in the mesh assert!(!gs.mesh[&topics[0]].contains(&p1) && !gs.mesh[&topics[0]].contains(&p2)); @@ -4821,12 +4890,13 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect publish messages to floodsub peers - let publishes = queues - .iter() + let publishes = receivers + .into_iter() .fold(0, |mut collected_publish, (peer_id, c)| { - while !c.priority.is_empty() { - if matches!(c.priority.try_recv(), - Ok(RpcOut::Publish{..}) if peer_id == &p1 || peer_id == &p2) + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if matches!(priority.try_recv(), + Ok(RpcOut::Publish{..}) if peer_id == p1 || peer_id == p2) { collected_publish += 1; } @@ -4846,7 +4916,7 @@ fn test_do_not_use_floodsub_in_fanout() { .flood_publish(false) .build() .unwrap(); - let (mut gs, _, mut queues, _) = inject_nodes1() + let (mut gs, _, mut receivers, _) = inject_nodes1() .peer_no(config.mesh_n_low() - 1) .topics(Vec::new()) .to_subscribe(false) @@ -4866,22 +4936,23 @@ fn test_do_not_use_floodsub_in_fanout() { Some(PeerKind::Floodsub), ); - queues.insert(p1, receiver1); + receivers.insert(p1, receiver1); let (p2, receiver2) = add_peer_with_addr_and_kind(&mut gs, &topics, false, false, Multiaddr::empty(), None); - queues.insert(p2, receiver2); + receivers.insert(p2, receiver2); //publish a message let publish_data = vec![0; 42]; gs.publish(Topic::new("test"), publish_data).unwrap(); // Collect publish messages to floodsub peers - let publishes = queues - .iter() + let publishes = receivers + .into_iter() .fold(0, |mut collected_publish, (peer_id, c)| { - while !c.priority.is_empty() { - if matches!(c.priority.try_recv(), - Ok(RpcOut::Publish{..}) if peer_id == &p1 || peer_id == &p2) + let priority = c.priority.into_inner(); + while !priority.is_empty() { + if matches!(priority.try_recv(), + Ok(RpcOut::Publish{..}) if peer_id == p1 || peer_id == p2) { collected_publish += 1; } @@ -4932,7 +5003,7 @@ fn test_dont_add_floodsub_peers_to_mesh_on_join() { #[test] fn test_dont_send_px_to_old_gossipsub_peers() { - let (mut gs, _, queues, topics) = inject_nodes1() + let (mut gs, _, receivers, topics) = inject_nodes1() .peer_no(0) .topics(vec!["test".into()]) .to_subscribe(false) @@ -4956,20 +5027,17 @@ fn test_dont_send_px_to_old_gossipsub_peers() { ); //check that prune does not contain px - assert_eq!( - count_control_msgs(&queues, |_, m| match m { - RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), - _ => false, - }), - 0, - "Should not send px to floodsub peers" - ); + let (control_msgs, _) = count_control_msgs(receivers, |_, m| match m { + RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), + _ => false, + }); + assert_eq!(control_msgs, 0, "Should not send px to floodsub peers"); } #[test] fn test_dont_send_floodsub_peers_in_px() { //build mesh with one peer - let (mut gs, peers, queues, topics) = inject_nodes1() + let (mut gs, peers, receivers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) .to_subscribe(true) @@ -4994,14 +5062,11 @@ fn test_dont_send_floodsub_peers_in_px() { ); //check that px in prune message is empty - assert_eq!( - count_control_msgs(&queues, |_, m| match m { - RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), - _ => false, - }), - 0, - "Should not include floodsub peers in px" - ); + let (control_msgs, _) = count_control_msgs(receivers, |_, m| match m { + RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), + _ => false, + }); + assert_eq!(control_msgs, 0, "Should not include floodsub peers in px"); } #[test] @@ -5088,7 +5153,7 @@ fn test_subscribe_and_graft_with_negative_score() { ))) .create_network(); - let (mut gs2, _, queues, _) = inject_nodes1().create_network(); + let (mut gs2, _, receivers, _) = inject_nodes1().create_network(); let connection_id = ConnectionId::new_unchecked(0); @@ -5105,37 +5170,41 @@ fn test_subscribe_and_graft_with_negative_score() { //subscribe to topic in gs2 gs2.subscribe(&topic).unwrap(); - let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, _gs2: &mut Behaviour<_, _>| { - //collect messages to p1 - let messages_to_p1 = - queues - .iter() - .filter_map(|(peer_id, c)| match c.non_priority.try_recv() { - Ok(rpc) if peer_id == &p1 => Some(rpc), - _ => None, - }); - - for message in messages_to_p1 { - gs1.on_connection_handler_event( - p2, - connection_id, - HandlerEvent::Message { - rpc: proto_to_message(&message.into_protobuf()), - invalid_messages: vec![], - }, - ); + let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, + p1: PeerId, + p2: PeerId, + connection_id: ConnectionId, + receivers: HashMap| + -> HashMap { + let new_receivers = HashMap::new(); + for (peer_id, receiver) in receivers.into_iter() { + let non_priority = receiver.non_priority.into_inner(); + match non_priority.try_recv() { + Ok(rpc) if peer_id == p1 => { + gs1.on_connection_handler_event( + p2, + connection_id, + HandlerEvent::Message { + rpc: proto_to_message(&rpc.into_protobuf()), + invalid_messages: vec![], + }, + ); + } + _ => {} + } } + new_receivers }; //forward the subscribe message - forward_messages_to_p1(&mut gs1, &mut gs2); + let receivers = forward_messages_to_p1(&mut gs1, p1, p2, connection_id, receivers); //heartbeats on both gs1.heartbeat(); gs2.heartbeat(); //forward messages again - forward_messages_to_p1(&mut gs1, &mut gs2); + forward_messages_to_p1(&mut gs1, p1, p2, connection_id, receivers); //nobody got penalized assert!(gs1.peer_score.as_ref().unwrap().0.score(&p2) >= original_score); diff --git a/beacon_node/lighthouse_network/src/gossipsub/handler.rs b/beacon_node/lighthouse_network/src/gossipsub/handler.rs index a8a980ae87e..298570955fc 100644 --- a/beacon_node/lighthouse_network/src/gossipsub/handler.rs +++ b/beacon_node/lighthouse_network/src/gossipsub/handler.rs @@ -176,12 +176,6 @@ impl Handler { } impl EnabledHandler { - #[cfg(test)] - /// For testing purposed obtain the RPCReceiver - pub fn receiver(&mut self) -> RpcReceiver { - self.send_queue.clone() - } - fn on_fully_negotiated_inbound( &mut self, (substream, peer_kind): (Framed, PeerKind), @@ -237,7 +231,7 @@ impl EnabledHandler { } // determine if we need to create the outbound stream - if !self.send_queue.is_empty() + if !self.send_queue.poll_is_empty(cx) && self.outbound_substream.is_none() && !self.outbound_substream_establishing { @@ -247,10 +241,6 @@ impl EnabledHandler { }); } - // We may need to inform the behviour if we have a dropped a message. This gets set if that - // is the case. - let mut dropped_message = None; - // process outbound stream loop { match std::mem::replace( @@ -271,10 +261,11 @@ impl EnabledHandler { } => { if Pin::new(timeout).poll(cx).is_ready() { // Inform the behaviour and end the poll. - dropped_message = Some(HandlerEvent::MessageDropped(message)); self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); - break; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::MessageDropped(message), + )); } } _ => {} // All other messages are not time-bound. @@ -348,13 +339,7 @@ impl EnabledHandler { } } - // If there was a timeout in sending a message, inform the behaviour before restarting the - // poll - if let Some(handler_event) = dropped_message { - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(handler_event)); - } - - // Handle inbound messages + // Handle inbound messages. loop { match std::mem::replace( &mut self.inbound_substream, @@ -419,6 +404,13 @@ impl EnabledHandler { } } + // Drop the next message in queue if it's stale. + if let Poll::Ready(Some(rpc)) = self.send_queue.poll_stale(cx) { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::MessageDropped(rpc), + )); + } + Poll::Pending } } diff --git a/beacon_node/lighthouse_network/src/gossipsub/types.rs b/beacon_node/lighthouse_network/src/gossipsub/types.rs index b7bcbf6b3ac..f77185c7c58 100644 --- a/beacon_node/lighthouse_network/src/gossipsub/types.rs +++ b/beacon_node/lighthouse_network/src/gossipsub/types.rs @@ -22,7 +22,8 @@ use crate::gossipsub::metrics::Metrics; use crate::gossipsub::TopicHash; use async_channel::{Receiver, Sender}; -use futures::Stream; +use futures::stream::Peekable; +use futures::{Future, Stream, StreamExt}; use futures_timer::Delay; use instant::Duration; use libp2p::identity::PeerId; @@ -33,7 +34,7 @@ use std::collections::BTreeSet; use std::fmt::Debug; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::task::Poll; +use std::task::{Context, Poll}; use std::{fmt, pin::Pin}; use crate::gossipsub::rpc_proto::proto; @@ -591,9 +592,10 @@ impl fmt::Display for PeerKind { pub(crate) struct RpcSender { cap: usize, len: Arc, - priority: Sender, - non_priority: Sender, - receiver: RpcReceiver, + pub(crate) priority_sender: Sender, + pub(crate) non_priority_sender: Sender, + priority_receiver: Receiver, + non_priority_receiver: Receiver, } impl RpcSender { @@ -602,29 +604,29 @@ impl RpcSender { let (priority_sender, priority_receiver) = async_channel::unbounded(); let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2); let len = Arc::new(AtomicUsize::new(0)); - let receiver = RpcReceiver { - priority_len: len.clone(), - priority: priority_receiver, - non_priority: non_priority_receiver, - }; RpcSender { cap: cap / 2, len, - priority: priority_sender, - non_priority: non_priority_sender, - receiver: receiver.clone(), + priority_sender, + non_priority_sender, + priority_receiver, + non_priority_receiver, } } /// Create a new Receiver to the sender. pub(crate) fn new_receiver(&self) -> RpcReceiver { - self.receiver.clone() + RpcReceiver { + priority_len: self.len.clone(), + priority: self.priority_receiver.clone().peekable(), + non_priority: self.non_priority_receiver.clone().peekable(), + } } /// Send a `RpcOut::Graft` message to the `RpcReceiver` /// this is high priority. pub(crate) fn graft(&mut self, graft: Graft) { - self.priority + self.priority_sender .try_send(RpcOut::Graft(graft)) .expect("Channel is unbounded and should always be open"); } @@ -632,7 +634,7 @@ impl RpcSender { /// Send a `RpcOut::Prune` message to the `RpcReceiver` /// this is high priority. pub(crate) fn prune(&mut self, prune: Prune) { - self.priority + self.priority_sender .try_send(RpcOut::Prune(prune)) .expect("Channel is unbounded and should always be open"); } @@ -641,7 +643,7 @@ impl RpcSender { /// this is low priority, if the queue is full an Err is returned. #[allow(clippy::result_large_err)] pub(crate) fn ihave(&mut self, ihave: IHave) -> Result<(), RpcOut> { - self.non_priority + self.non_priority_sender .try_send(RpcOut::IHave(ihave)) .map_err(|err| err.into_inner()) } @@ -650,7 +652,7 @@ impl RpcSender { /// this is low priority, if the queue is full an Err is returned. #[allow(clippy::result_large_err)] pub(crate) fn iwant(&mut self, iwant: IWant) -> Result<(), RpcOut> { - self.non_priority + self.non_priority_sender .try_send(RpcOut::IWant(iwant)) .map_err(|err| err.into_inner()) } @@ -658,7 +660,7 @@ impl RpcSender { /// Send a `RpcOut::Subscribe` message to the `RpcReceiver` /// this is high priority. pub(crate) fn subscribe(&mut self, topic: TopicHash) { - self.priority + self.priority_sender .try_send(RpcOut::Subscribe(topic)) .expect("Channel is unbounded and should always be open"); } @@ -666,7 +668,7 @@ impl RpcSender { /// Send a `RpcOut::Unsubscribe` message to the `RpcReceiver` /// this is high priority. pub(crate) fn unsubscribe(&mut self, topic: TopicHash) { - self.priority + self.priority_sender .try_send(RpcOut::Unsubscribe(topic)) .expect("Channel is unbounded and should always be open"); } @@ -682,7 +684,7 @@ impl RpcSender { if self.len.load(Ordering::Relaxed) >= self.cap { return Err(()); } - self.priority + self.priority_sender .try_send(RpcOut::Publish { message: message.clone(), timeout: Delay::new(timeout), @@ -705,7 +707,7 @@ impl RpcSender { timeout: Duration, metrics: Option<&mut Metrics>, ) -> Result<(), ()> { - self.non_priority + self.non_priority_sender .try_send(RpcOut::Forward { message: message.clone(), timeout: Delay::new(timeout), @@ -726,25 +728,73 @@ impl RpcSender { /// Returns the current size of the non-priority queue. pub(crate) fn non_priority_len(&self) -> usize { - self.non_priority.len() + self.non_priority_sender.len() } } /// `RpcOut` sender that is priority aware. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct RpcReceiver { /// The maximum length of the priority queue. - priority_len: Arc, + pub(crate) priority_len: Arc, /// The priority queue receiver. - pub(crate) priority: Receiver, + pub(crate) priority: Peekable>, /// The non priority queue receiver. - pub(crate) non_priority: Receiver, + pub(crate) non_priority: Peekable>, } impl RpcReceiver { - /// Check if both queues are empty. - pub(crate) fn is_empty(&self) -> bool { - self.priority.is_empty() && self.non_priority.is_empty() + // Peek the next message in the queues and return it if its timeout has elapsed. + // Returns `None` if there aren't any more messages on the stream or none is stale. + pub(crate) fn poll_stale(&mut self, cx: &mut Context<'_>) -> Poll> { + // Peek priority queue. + let priority = match Pin::new(&mut self.priority).poll_peek_mut(cx) { + Poll::Ready(Some(RpcOut::Publish { + message: _, + ref mut timeout, + })) => { + if Pin::new(timeout).poll(cx).is_ready() { + // Return the message. + let dropped = futures::ready!(self.priority.poll_next_unpin(cx)) + .expect("There should be a message"); + return Poll::Ready(Some(dropped)); + } + Poll::Ready(None) + } + poll => poll, + }; + + let non_priority = match Pin::new(&mut self.non_priority).poll_peek_mut(cx) { + Poll::Ready(Some(RpcOut::Forward { + message: _, + ref mut timeout, + })) => { + if Pin::new(timeout).poll(cx).is_ready() { + // Return the message. + let dropped = futures::ready!(self.non_priority.poll_next_unpin(cx)) + .expect("There should be a message"); + return Poll::Ready(Some(dropped)); + } + Poll::Ready(None) + } + poll => poll, + }; + + match (priority, non_priority) { + (Poll::Ready(None), Poll::Ready(None)) => Poll::Ready(None), + _ => Poll::Pending, + } + } + + /// Poll queues and return true if both are empty. + pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> bool { + matches!( + ( + Pin::new(&mut self.priority).poll_peek(cx), + Pin::new(&mut self.non_priority).poll_peek(cx), + ), + (Poll::Ready(None), Poll::Ready(None)) + ) } }