diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml
index ae08f8ec03..39a9370417 100644
--- a/.github/workflows/merge.yml
+++ b/.github/workflows/merge.yml
@@ -101,6 +101,10 @@ jobs:
         run: cargo test --no-run --release
         timeout-minutes: 30
 
+      - name: Run network tests
+        timeout-minutes: 25
+        run: cargo test --release -p safenode -- network
+
       - name: Run protocol tests
         timeout-minutes: 25
         run: cargo test --release -p safenode -- protocol
diff --git a/safenode/src/client/api.rs b/safenode/src/client/api.rs
index 7200940c70..69a41952db 100644
--- a/safenode/src/client/api.rs
+++ b/safenode/src/client/api.rs
@@ -29,7 +29,7 @@ use crate::{
 
 use futures::future::select_all;
 use libp2p::PeerId;
-use std::{collections::HashSet, time::Duration};
+use std::time::Duration;
 use tokio::task::spawn;
 
 impl Client {
@@ -235,7 +235,7 @@ impl Client {
         info!("Sending {:?} to the closest peers.", request.dst());
         let closest_peers = self
             .network
-            .get_closest_peers(*request.dst().name())
+            .client_get_closest_peers(*request.dst().name())
             .await?;
         Ok(self
             .send_and_get_responses(closest_peers, &request, true)
@@ -248,7 +248,7 @@ impl Client {
     // If `get_all_responses` is false, we return the first successful response that we get.
     async fn send_and_get_responses(
         &self,
-        nodes: HashSet<PeerId>,
+        nodes: Vec<PeerId>,
         req: &Request,
         get_all_responses: bool,
     ) -> Vec<Result<Response>> {
diff --git a/safenode/src/network/mod.rs b/safenode/src/network/mod.rs
index e462f3c151..af728b48ac 100644
--- a/safenode/src/network/mod.rs
+++ b/safenode/src/network/mod.rs
@@ -157,7 +157,10 @@ impl SwarmDriver {
         };
 
         Ok((
-            Network { swarm_cmd_sender },
+            Network {
+                swarm_cmd_sender,
+                peer_id,
+            },
             network_event_receiver,
             swarm_driver,
         ))
@@ -234,6 +237,8 @@ fn restart_at_random(peer_id: &PeerId) {
 /// API to interact with the underlying Swarm
 pub struct Network {
     pub(super) swarm_cmd_sender: mpsc::Sender<SwarmCmd>,
+    #[allow(dead_code)]
+    pub(super) peer_id: PeerId,
 }
 
 impl Network {
@@ -257,8 +262,21 @@ impl Network {
         receiver.await?
     }
 
-    /// Find the closest peers to the given `XorName`
-    pub async fn get_closest_peers(&self, xor_name: XorName) -> Result<HashSet<PeerId>> {
+    /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
+    /// Excludes the client's `PeerId` while calculating the closest peers.
+    pub async fn client_get_closest_peers(&self, xor_name: XorName) -> Result<Vec<PeerId>> {
+        self.get_closest_peers(xor_name, true).await
+    }
+
+    /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
+    /// Includes our node's `PeerId` while calculating the closest peers.
+    pub async fn node_get_closest_peers(&self, xor_name: XorName) -> Result<Vec<PeerId>> {
+        self.get_closest_peers(xor_name, false).await
+    }
+
+    /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
+    /// If `client` is false, then `self` is included among the `closest_peers`.
+    async fn get_closest_peers(&self, xor_name: XorName, client: bool) -> Result<Vec<PeerId>> {
         let (sender, receiver) = oneshot::channel();
         self.send_swarm_cmd(SwarmCmd::GetClosestPeers { xor_name, sender })
             .await?;
@@ -266,15 +284,16 @@ impl Network {
 
         // Count self in if among the CLOSE_GROUP_SIZE closest and sort the result
        let mut closest_peers: Vec<_> = k_bucket_peers.into_iter().collect();
-        closest_peers.push(our_id);
+        if !client {
+            closest_peers.push(our_id);
+        }
         let target = KBucketKey::new(xor_name.0.to_vec());
         closest_peers.sort_by(|a, b| {
             let a = KBucketKey::new(a.to_bytes());
             let b = KBucketKey::new(b.to_bytes());
             target.distance(&a).cmp(&target.distance(&b))
         });
-        // TODO: shall the return type shall be `Vec` to retain the order?
-        let closest_peers: HashSet<PeerId> = closest_peers
+        let closest_peers: Vec<PeerId> = closest_peers
             .iter()
             .take(CLOSE_GROUP_SIZE)
             .cloned()
@@ -318,3 +337,112 @@ impl Network {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::SwarmDriver;
+    use crate::log::init_node_logging;
+    use eyre::{eyre, Result};
+    use libp2p::{
+        kad::{
+            kbucket::{Entry, InsertResult, KBucketsTable, NodeStatus},
+            KBucketKey,
+        },
+        PeerId,
+    };
+    use rand::thread_rng;
+    use std::{
+        collections::{BTreeMap, HashMap},
+        fmt,
+        time::Duration,
+    };
+    use xor_name::XorName;
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn closest() -> Result<()> {
+        let _ = init_node_logging(&None)?;
+        let mut networks_list = Vec::new();
+        let mut network_events_receivers = BTreeMap::new();
+        for _ in 1..25 {
+            let (net, event_rx, driver) = SwarmDriver::new()?;
+            let _handle = tokio::spawn(driver.run());
+
+            let _ = network_events_receivers.insert(net.peer_id, event_rx);
+            networks_list.push(net);
+        }
+
+        // Check the closest nodes to the following random_data
+        let mut rng = thread_rng();
+        let random_data = XorName::random(&mut rng);
+        let random_data_key = KBucketKey::from(random_data.0.to_vec());
+
+        tokio::time::sleep(Duration::from_secs(5)).await;
+        let our_net = networks_list
+            .get(0)
+            .ok_or_else(|| eyre!("networks_list should not be empty"))?;
+
+        // Get the expected list of closest peers by creating a `KBucketsTable` with all the peers
+        // inserted inside it.
+        // The `KBucketsTable::local_key` is considered to be random since the `local_key` will not
+        // be part of the `closest_peers`. Since our implementation of `get_closest_peers` can
+        // include `self`, we'd want to insert `our_net` into the table as well.
+        let mut table =
+            KBucketsTable::<_, ()>::new(KBucketKey::from(PeerId::random()), Duration::from_secs(5));
+        let mut key_to_peer_id = HashMap::new();
+        for net in networks_list.iter() {
+            let key = KBucketKey::from(net.peer_id);
+            let _ = key_to_peer_id.insert(key.clone(), net.peer_id);
+
+            if let Entry::Absent(e) = table.entry(&key) {
+                match e.insert((), NodeStatus::Connected) {
+                    InsertResult::Inserted => {}
+                    _ => continue,
+                }
+            } else {
+                return Err(eyre!("Table entry should be absent"));
+            }
+        }
+        let expected_from_table = table
+            .closest_keys(&random_data_key)
+            .map(|key| {
+                key_to_peer_id
+                    .get(&key)
+                    .cloned()
+                    .ok_or_else(|| eyre::eyre!("Key should be present"))
+            })
+            .take(8)
+            .collect::<Result<Vec<_>>>()?;
+        info!("Got Closest from table {:?}", expected_from_table.len());
+
+        // Ask `our_net` for its closest peers to the random data.
+        let closest = our_net.get_closest_peers(random_data, false).await?;
+
+        assert_lists(closest, expected_from_table);
+        Ok(())
+    }
+
+    /// Test utility
+    fn assert_lists<I, J, K>(a: I, b: J)
+    where
+        K: fmt::Debug + Eq,
+        I: IntoIterator<Item = K>,
+        J: IntoIterator<Item = K>,
+    {
+        let vec1: Vec<_> = a.into_iter().collect();
+        let mut vec2: Vec<_> = b.into_iter().collect();
+
+        assert_eq!(vec1.len(), vec2.len());
+
+        for item1 in &vec1 {
+            let idx2 = vec2
+                .iter()
+                .position(|item2| item1 == item2)
+                .expect("Item not found in second list");
+
+            let _ = vec2.swap_remove(idx2);
+        }
+
+        assert_eq!(vec2.len(), 0);
+    }
+}
diff --git a/safenode/src/node/api.rs b/safenode/src/node/api.rs
index 5c6b39c146..3764eab00e 100644
--- a/safenode/src/node/api.rs
+++ b/safenode/src/node/api.rs
@@ -25,10 +25,7 @@ use sn_dbc::{DbcTransaction, SignedSpend};
 
 use futures::future::select_all;
 use libp2p::{request_response::ResponseChannel, PeerId};
-use std::{
-    collections::{BTreeSet, HashSet},
-    time::Duration,
-};
+use std::{collections::BTreeSet, time::Duration};
 use tokio::task::spawn;
 
 impl Node {
@@ -327,9 +324,10 @@ impl Node {
 
     async fn send_to_closest(&self, request: &Request) -> Result<Vec<Result<Response>>> {
         info!("Sending {:?} to the closest peers.", request.dst());
+        // todo: if `self` is present among the closest peers, the request should be routed to self?
         let closest_peers = self
             .network
-            .get_closest_peers(*request.dst().name())
+            .node_get_closest_peers(*request.dst().name())
             .await?;
 
         Ok(self
@@ -343,7 +341,7 @@ impl Node {
     // If `get_all_responses` is false, we return the first successful response that we get
     async fn send_and_get_responses(
         &self,
-        peers: HashSet<PeerId>,
+        peers: Vec<PeerId>,
         req: &Request,
         get_all_responses: bool,
     ) -> Vec<Result<Response>> {
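
For context, a minimal usage sketch of the new closest-peers API, assuming a running local network and a `Network` handle obtained from `SwarmDriver::new()`; the variable names below are illustrative and not part of the patch:

    let (network, _event_rx, driver) = SwarmDriver::new()?;
    let _driver_handle = tokio::spawn(driver.run());

    let mut rng = rand::thread_rng();
    let target = xor_name::XorName::random(&mut rng);

    // A client excludes its own PeerId while calculating the close group...
    let for_client = network.client_get_closest_peers(target).await?;
    // ...while a node counts itself in, so it can tell whether it belongs to the group.
    let for_node = network.node_get_closest_peers(target).await?;

    // Both return at most CLOSE_GROUP_SIZE peers, sorted by XOR distance to `target`,
    // now as a Vec rather than the previous HashSet, so the ordering is retained.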