Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion core/peerset/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ libp2p = { version = "0.6.0", default-features = false }
linked-hash-map = "0.5"
log = "0.4"
lru-cache = "0.1.2"
serde_json = "1.0.24"
serde_json = "1.0.24"

[dev-dependencies]
rand = "0.6.5"
27 changes: 15 additions & 12 deletions core/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use lru_cache::LruCache;
use slots::{SlotType, SlotState, Slots};
use serde_json::json;

#[cfg(test)]
mod test;

const PEERSET_SCORES_CACHE_SIZE: usize = 1000;
const DISCOVERED_NODES_LIMIT: u32 = 1000;

Expand Down Expand Up @@ -107,7 +110,7 @@ pub enum Message {
}

/// Opaque identifier for an incoming connection. Allocated by the network.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct IncomingIndex(pub u64);

impl From<u64> for IncomingIndex {
Expand Down Expand Up @@ -336,7 +339,7 @@ impl Peerset {
SlotType::Common
};

match self.data.in_slots.add_peer(peer_id, slot_type) {
match self.data.in_slots.add_peer(peer_id.clone(), slot_type) {
SlotState::Added(peer_id) => {
// reserved node may have been previously stored as normal node in discovered list
self.data.discovered.remove_peer(&peer_id);
Expand Down Expand Up @@ -442,11 +445,19 @@ impl Stream for Peerset {
}
}

#[cfg(test)]
pub fn next_message(peerset: Peerset) -> Result<(Message, Peerset), ()> {
let (next, peerset) = peerset.into_future()
.wait()
.map_err(|_| ())?;
let message = next.ok_or_else(|| ())?;
Ok((message, peerset))
}

#[cfg(test)]
mod tests {
use libp2p::PeerId;
use futures::prelude::*;
use super::{PeersetConfig, Peerset, Message, IncomingIndex};
use super::{PeersetConfig, Peerset, Message, IncomingIndex, next_message};

fn assert_messages(mut peerset: Peerset, messages: Vec<Message>) -> Peerset {
for expected_message in messages {
Expand All @@ -458,14 +469,6 @@ mod tests {
peerset
}

fn next_message(peerset: Peerset) -> Result<(Message, Peerset), ()> {
let (next, peerset) = peerset.into_future()
.wait()
.map_err(|_| ())?;
let message = next.ok_or_else(|| ())?;
Ok((message, peerset))
}

#[test]
fn test_peerset_from_config_with_bootnodes() {
let bootnode = PeerId::random();
Expand Down
1 change: 1 addition & 0 deletions core/peerset/src/slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub enum SlotType {
Common,
}

#[derive(Debug)]
/// Descibes the result of `add_peer` action.
pub enum SlotState {
/// Returned when `add_peer` successfully adds a peer to the slot.
Expand Down
262 changes: 262 additions & 0 deletions core/peerset/src/test/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
use libp2p::PeerId;
use rand::thread_rng;
use rand::seq::SliceRandom;
use super::{PeersetConfig, Peerset, Message, IncomingIndex, next_message};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::{self, Debug};

#[test]
fn test_random_api_use() {

#[derive(Eq, PartialEq)]
enum TestAction {
AddReservedPeer,
RemoveReservedPeer,
SetReservedOnly(bool),
ReportPeer(i32),
DropPeer,
Incoming(IncomingIndex),
Discovered(Vec<PeerId>),
}

impl Debug for TestAction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
TestAction::AddReservedPeer => {
write!(f, "AddReservedPeer")
},
TestAction::RemoveReservedPeer => {
write!(f, "RemoveReservedPeer")
},
TestAction::SetReservedOnly(reserved) => {
write!(f, "SetReservedOnly {:?}", reserved)
},
TestAction::ReportPeer(diff_score) => {
write!(f, "ReportPeer {:?}", diff_score)
},
TestAction::DropPeer => {
write!(f, "DropPeer")
},
TestAction::Incoming(index) => {
write!(f, "Incoming {:?}", index)
},
TestAction::Discovered(nodes) => {
write!(f, "Discovered {:?}", nodes.len())
},
}
}
}

let bootnode = PeerId::random();
let config = PeersetConfig {
in_peers: 100,
out_peers: 100,
bootnodes: vec![bootnode.clone()],
reserved_only: false,
reserved_nodes: Vec::new(),
};

let (mut peerset, handle) = Peerset::from_config(config);

let mut actions = vec![];
let mut discovered = vec![];
let mut index_to_peer = HashMap::new();

let bootnode_index = IncomingIndex(0);
actions.push((bootnode.clone(), TestAction::AddReservedPeer));
actions.push((bootnode.clone(), TestAction::SetReservedOnly(true)));
actions.push((bootnode.clone(), TestAction::ReportPeer(-1)));
actions.push((bootnode.clone(), TestAction::SetReservedOnly(false)));
actions.push((bootnode.clone(), TestAction::RemoveReservedPeer));
actions.push((bootnode.clone(), TestAction::DropPeer));
actions.push((bootnode.clone(), TestAction::Incoming(bootnode_index)));
actions.push((bootnode.clone(), TestAction::Discovered(discovered.clone())));

index_to_peer.insert(bootnode_index, bootnode.clone());
discovered.push(bootnode.clone());

for i in 1..150 {
let peer_id = PeerId::random();
let index = IncomingIndex(i);
index_to_peer.insert(index, peer_id.clone());
discovered.push(peer_id.clone());
if i > 75 {
// Includes AddReservedPeer.
actions.push((peer_id.clone(), TestAction::AddReservedPeer));
actions.push((peer_id.clone(), TestAction::SetReservedOnly(true)));
actions.push((peer_id.clone(), TestAction::ReportPeer(-1)));
actions.push((peer_id.clone(), TestAction::SetReservedOnly(false)));
actions.push((peer_id.clone(), TestAction::RemoveReservedPeer));
actions.push((peer_id.clone(), TestAction::DropPeer));
actions.push((peer_id.clone(), TestAction::Incoming(index)));
actions.push((peer_id.clone(), TestAction::Discovered(discovered.clone())));
} else {
actions.push((peer_id.clone(), TestAction::SetReservedOnly(true)));
actions.push((peer_id.clone(), TestAction::ReportPeer(-1)));
actions.push((peer_id.clone(), TestAction::SetReservedOnly(false)));
actions.push((peer_id.clone(), TestAction::RemoveReservedPeer));
actions.push((peer_id.clone(), TestAction::DropPeer));
actions.push((peer_id.clone(), TestAction::Incoming(index)));
actions.push((peer_id.clone(), TestAction::Discovered(discovered.clone())));
}
}

let mut dropped_called = HashSet::new();
let mut discovered_called = HashSet::new();
let mut performed_actions = HashMap::new();

let mut rng = thread_rng();
for (peer_id, action) in actions.choose_multiple(&mut rng, actions.len()) {
match action {
TestAction::AddReservedPeer => {
handle.add_reserved_peer(peer_id.clone());
},
TestAction::RemoveReservedPeer => {
handle.remove_reserved_peer(peer_id.clone());
},
TestAction::SetReservedOnly(reserved) => {
handle.set_reserved_only(reserved.clone());
},
TestAction::ReportPeer(diff_score) => {
handle.report_peer(peer_id.clone(), diff_score.clone());
},
TestAction::DropPeer => {
peerset.dropped(peer_id.clone());
dropped_called.insert(peer_id.clone());
},
TestAction::Incoming(index) => {
peerset.incoming(peer_id.clone(), index.clone());
},
TestAction::Discovered(nodes) => {
peerset.discovered(nodes.clone());
for node in nodes {
discovered_called.insert(node.clone());
}
},
}
let performed = performed_actions
.entry(peer_id.clone())
.or_insert(VecDeque::new());
performed.push_back(action)
}

drop(handle);

let mut last_received_messages: HashMap<PeerId, VecDeque<Message>> = HashMap::new();
loop {
let message = match next_message(peerset) {
Ok((message, p)) => {
peerset = p;
message
},
_ => break,
};
match message {
Message::Connect(peer_id) => {
let last_message = {
if let Some(messages) = last_received_messages.get_mut(&peer_id) {
messages.pop_front()
} else {
None
}
};
let action_sequence = {
if let Some(actions) = performed_actions.get_mut(&peer_id) {
Some(actions.clone())
} else {
None
}
};
match last_message {
Some(Message::Drop(_)) | Some(Message::Reject(_)) => {},
_ => {
let relevant_api_called = dropped_called.remove(&peer_id) || discovered_called.remove(&peer_id);
if !relevant_api_called && !(peer_id == bootnode) {
panic!("Unexpected Connect message after a {:?} message, sequence of actions: {:?}", last_message, action_sequence);
}
},
}
let received = last_received_messages.entry(peer_id.clone()).or_insert(VecDeque::new());
received.push_back(Message::Connect(peer_id));
},
Message::Drop(peer_id) => {
let action_sequence = {
if let Some(actions) = performed_actions.get_mut(&peer_id) {
Some(actions.clone())
} else {
None
}
};
let last_message = {
if let Some(messages) = last_received_messages.get_mut(&peer_id) {
messages.pop_front()
} else {
None
}
};
match last_message {
Some(Message::Connect(_)) | Some(Message::Accept(_)) | None => {},
_ => panic!("Unexpected Drop message, after a {:?} message, sequence of actions: {:?}", last_message, action_sequence),
}
let received = last_received_messages.entry(peer_id.clone()).or_insert(VecDeque::new());
received.push_back(Message::Drop(peer_id));
},
Message::Accept(index) => {
let peer_id = index_to_peer.get(&index).expect("Unknown index");
let action_sequence = {
if let Some(actions) = performed_actions.get_mut(&peer_id) {
Some(actions.clone())
} else {
None
}
};
let last_messages = {
if let Some(messages) = last_received_messages.get_mut(&peer_id) {
messages
} else {
continue;
}
};
if let Some(Message::Connect(_)) = last_messages.pop_front() {
if let Some(action_sequence) = action_sequence.clone() {
let mut actions = action_sequence.into_iter();
let drop_position = actions.clone().rposition(|x| x == &TestAction::DropPeer);
let incoming_position = actions.rposition(|x| x == &TestAction::Incoming(index));
match (drop_position, incoming_position) {
(Some(drop), Some(incoming)) => {
assert!(drop < incoming);
continue
},
_ => {}
}
}
panic!("Unexpected Accept message, after a Connect message, sequence of actions: {:?}", action_sequence);
}
let received = last_received_messages.entry(peer_id.clone()).or_insert(VecDeque::new());
received.push_back(Message::Accept(index));
},
Message::Reject(index) => {
let peer_id = index_to_peer.get(&index).expect("Unknown index");
let action_sequence = {
if let Some(actions) = performed_actions.get_mut(&peer_id) {
Some(actions.clone())
} else {
None
}
};
let last_messages = {
if let Some(messages) = last_received_messages.get_mut(&peer_id) {
messages
} else {
continue;
}
};
if let Some(Message::Connect(_)) = last_messages.pop_front() {
panic!("Unexpected Reject message, after a Connect message, sequence of actions: {:?}", action_sequence);
}
let received = last_received_messages.entry(peer_id.clone()).or_insert(VecDeque::new());
received.push_back(Message::Reject(index));
},
}
}
}