Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: closest_peers + small fix #78

Merged
merged 2 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ jobs:
run: cargo test --no-run --release
timeout-minutes: 30

- name: Run network tests
timeout-minutes: 25
run: cargo test --release -p safenode -- network

- name: Run protocol tests
timeout-minutes: 25
run: cargo test --release -p safenode -- protocol
Expand Down
6 changes: 3 additions & 3 deletions safenode/src/client/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{

use futures::future::select_all;
use libp2p::PeerId;
use std::{collections::HashSet, time::Duration};
use std::time::Duration;
use tokio::task::spawn;

impl Client {
Expand Down Expand Up @@ -235,7 +235,7 @@ impl Client {
info!("Sending {:?} to the closest peers.", request.dst());
let closest_peers = self
.network
.get_closest_peers(*request.dst().name())
.client_get_closest_peers(*request.dst().name())
.await?;
Ok(self
.send_and_get_responses(closest_peers, &request, true)
Expand All @@ -248,7 +248,7 @@ impl Client {
// If `get_all_responses` is false, we return the first successful response that we get.
async fn send_and_get_responses(
&self,
nodes: HashSet<PeerId>,
nodes: Vec<PeerId>,
RolandSherwin marked this conversation as resolved.
Show resolved Hide resolved
req: &Request,
get_all_responses: bool,
) -> Vec<Result<Response>> {
Expand Down
140 changes: 134 additions & 6 deletions safenode/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ impl SwarmDriver {
};

Ok((
Network { swarm_cmd_sender },
Network {
swarm_cmd_sender,
peer_id,
},
network_event_receiver,
swarm_driver,
))
Expand Down Expand Up @@ -234,6 +237,8 @@ fn restart_at_random(peer_id: &PeerId) {
/// API to interact with the underlying Swarm
pub struct Network {
pub(super) swarm_cmd_sender: mpsc::Sender<SwarmCmd>,
#[allow(dead_code)]
RolandSherwin marked this conversation as resolved.
Show resolved Hide resolved
pub(super) peer_id: PeerId,
}

impl Network {
Expand All @@ -257,24 +262,38 @@ impl Network {
receiver.await?
}

/// Find the closest peers to the given `XorName`
pub async fn get_closest_peers(&self, xor_name: XorName) -> Result<HashSet<PeerId>> {
/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// Excludes the client's `PeerId` while calculating the closest peers.
pub async fn client_get_closest_peers(&self, xor_name: XorName) -> Result<Vec<PeerId>> {
self.get_closest_peers(xor_name, true).await
}

/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// Includes our node's `PeerId` while calculating the closest peers.
pub async fn node_get_closest_peers(&self, xor_name: XorName) -> Result<Vec<PeerId>> {
self.get_closest_peers(xor_name, false).await
}

/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// If `client` is false, then include `self` among the `closest_peers`
async fn get_closest_peers(&self, xor_name: XorName, client: bool) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetClosestPeers { xor_name, sender })
.await?;
let (our_id, k_bucket_peers) = receiver.await?;

// Count self in if among the CLOSE_GROUP_SIZE closest and sort the result
let mut closest_peers: Vec<_> = k_bucket_peers.into_iter().collect();
closest_peers.push(our_id);
if !client {
closest_peers.push(our_id);
}
let target = KBucketKey::new(xor_name.0.to_vec());
closest_peers.sort_by(|a, b| {
let a = KBucketKey::new(a.to_bytes());
let b = KBucketKey::new(b.to_bytes());
target.distance(&a).cmp(&target.distance(&b))
});
// TODO: shall the return type shall be `Vec<PeerId>` to retain the order?
let closest_peers: HashSet<PeerId> = closest_peers
let closest_peers: Vec<PeerId> = closest_peers
.iter()
.take(CLOSE_GROUP_SIZE)
.cloned()
Expand Down Expand Up @@ -318,3 +337,112 @@ impl Network {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::SwarmDriver;
use crate::log::init_node_logging;
use eyre::{eyre, Result};
use libp2p::{
kad::{
kbucket::{Entry, InsertResult, KBucketsTable, NodeStatus},
KBucketKey,
},
PeerId,
};
use rand::thread_rng;
use std::{
collections::{BTreeMap, HashMap},
fmt,
time::Duration,
};
use xor_name::XorName;

#[tokio::test(flavor = "multi_thread")]
async fn closest() -> Result<()> {
RolandSherwin marked this conversation as resolved.
Show resolved Hide resolved
let _ = init_node_logging(&None)?;
let mut networks_list = Vec::new();
let mut network_events_recievers = BTreeMap::new();
for _ in 1..25 {
RolandSherwin marked this conversation as resolved.
Show resolved Hide resolved
let (net, event_rx, driver) = SwarmDriver::new()?;
let _handle = tokio::spawn(driver.run());

let _ = network_events_recievers.insert(net.peer_id, event_rx);
networks_list.push(net);
}

// Check the closest nodes to the following random_data
let mut rng = thread_rng();
let random_data = XorName::random(&mut rng);
let random_data_key = KBucketKey::from(random_data.0.to_vec());

tokio::time::sleep(Duration::from_secs(5)).await;
let our_net = networks_list
.get(0)
.ok_or_else(|| eyre!("networks_list is not empty"))?;

// Get the expected list of closest peers by creating a `KBucketsTable` with all the peers
// inserted inside it.
// The `KBucketsTable::local_key` is considered to be random since the `local_key` will not
// be part of the `closest_peers`. Since our implementation of `get_closest_peers` returns
// `self`, we'd want to insert `our_net` into the table as well.
let mut table =
KBucketsTable::<_, ()>::new(KBucketKey::from(PeerId::random()), Duration::from_secs(5));
let mut key_to_peer_id = HashMap::new();
for net in networks_list.iter() {
let key = KBucketKey::from(net.peer_id);
let _ = key_to_peer_id.insert(key.clone(), net.peer_id);

if let Entry::Absent(e) = table.entry(&key) {
match e.insert((), NodeStatus::Connected) {
InsertResult::Inserted => {}
_ => continue,
}
} else {
return Err(eyre!("Table entry should be absent"));
}
}
let expected_from_table = table
.closest_keys(&random_data_key)
.map(|key| {
key_to_peer_id
.get(&key)
.cloned()
.ok_or_else(|| eyre::eyre!("Key should be present"))
})
.take(8)
.collect::<Result<Vec<_>>>()?;
info!("Got Closest from table {:?}", expected_from_table.len());

// Ask the other nodes for the closest_peers.
let closest = our_net.get_closest_peers(random_data, false).await?;

assert_lists(closest, expected_from_table);
Ok(())
}

/// Test utility

fn assert_lists<I, J, K>(a: I, b: J)
where
K: fmt::Debug + Eq,
I: IntoIterator<Item = K>,
J: IntoIterator<Item = K>,
{
let vec1: Vec<_> = a.into_iter().collect();
let mut vec2: Vec<_> = b.into_iter().collect();

assert_eq!(vec1.len(), vec2.len());

for item1 in &vec1 {
let idx2 = vec2
.iter()
.position(|item2| item1 == item2)
.expect("Item not found in second list");

let _ = vec2.swap_remove(idx2);
}

assert_eq!(vec2.len(), 0);
}
}
10 changes: 4 additions & 6 deletions safenode/src/node/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ use sn_dbc::{DbcTransaction, SignedSpend};

use futures::future::select_all;
use libp2p::{request_response::ResponseChannel, PeerId};
use std::{
collections::{BTreeSet, HashSet},
time::Duration,
};
use std::{collections::BTreeSet, time::Duration};
use tokio::task::spawn;

impl Node {
Expand Down Expand Up @@ -327,9 +324,10 @@ impl Node {

async fn send_to_closest(&self, request: &Request) -> Result<Vec<Result<Response>>> {
info!("Sending {:?} to the closest peers.", request.dst());
// todo: if `self` is present among the closest peers, the request should be routed to self?
RolandSherwin marked this conversation as resolved.
Show resolved Hide resolved
let closest_peers = self
.network
.get_closest_peers(*request.dst().name())
.node_get_closest_peers(*request.dst().name())
.await?;

Ok(self
Expand All @@ -343,7 +341,7 @@ impl Node {
// If `get_all_responses` is false, we return the first successful response that we get
async fn send_and_get_responses(
&self,
peers: HashSet<PeerId>,
peers: Vec<PeerId>,
req: &Request,
get_all_responses: bool,
) -> Vec<Result<Response>> {
Expand Down