Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 20 additions & 28 deletions client/peerset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const BANNED_THRESHOLD: i32 = 82 * (i32::min_value() / 100);
/// Reputation change for a node when we get disconnected from it.
const DISCONNECT_REPUTATION_CHANGE: i32 = -256;
/// Reserved peers group ID
const RESERVED_NODES: &'static str = "reserved";
const RESERVED_NODES: &str = "reserved";
/// Amount of time between the moment we disconnect from a node and the moment we remove it from
/// the list.
const FORGET_AFTER: Duration = Duration::from_secs(3600);
Expand Down Expand Up @@ -87,7 +87,7 @@ impl PeersetHandle {
/// Has no effect if the node was already a reserved peer.
///
/// > **Note**: Keep in mind that the networking has to know an address for this node,
/// > otherwise it will not be able to connect to it.
/// > otherwise it will not be able to connect to it.
pub fn add_reserved_peer(&self, peer_id: PeerId) {
let _ = self.tx.unbounded_send(Action::AddReservedPeer(peer_id));
}
Expand Down Expand Up @@ -169,7 +169,7 @@ pub struct PeersetConfig {
/// List of bootstrap nodes to initialize the peer with.
///
/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
/// > otherwise it will not be able to connect to them.
/// > otherwise it will not be able to connect to them.
pub bootnodes: Vec<PeerId>,

/// If true, we only accept nodes in [`PeersetConfig::priority_groups`].
Expand All @@ -178,7 +178,7 @@ pub struct PeersetConfig {
/// Lists of nodes we should always be connected to.
///
/// > **Note**: Keep in mind that the networking has to know an address for these nodes,
/// > otherwise it will not be able to connect to them.
/// > otherwise it will not be able to connect to them.
pub priority_groups: Vec<(String, HashSet<PeerId>)>,
}

Expand Down Expand Up @@ -430,10 +430,9 @@ impl Peerset {
.get(RESERVED_NODES)
.into_iter()
.flatten()
.filter(move |n| {
.find(move |n| {
data.peer(n).into_connected().is_none()
})
.next()
.cloned()
};

Expand Down Expand Up @@ -469,10 +468,9 @@ impl Peerset {
self.priority_groups
.values()
.flatten()
.filter(move |n| {
.find(move |n| {
data.peer(n).into_connected().is_none()
})
.next()
.cloned()
};

Expand All @@ -497,21 +495,17 @@ impl Peerset {
}

// Now, we try to connect to non-priority nodes.
loop {
// Try to grab the next node to attempt to connect to.
let next = match self.data.highest_not_connected_peer() {
Some(p) => p,
None => break, // No known node to add.
};

while let Some(next) = self.data.highest_not_connected_peer() {
// Don't connect to nodes with an abysmal reputation.
if next.reputation() < BANNED_THRESHOLD {
break;
}

match next.try_outgoing() {
Ok(conn) => self.message_queue.push_back(Message::Connect(conn.into_peer_id())),
Err(_) => break, // No more slots available.
Ok(conn) => self
.message_queue
.push_back(Message::Connect(conn.into_peer_id())),
Err(_) => break, // No more slots available.
}
}
}
Expand All @@ -530,11 +524,9 @@ impl Peerset {
trace!(target: "peerset", "Incoming {:?}", peer_id);
self.update_time();

if self.reserved_only {
if !self.priority_groups.get(RESERVED_NODES).map_or(false, |n| n.contains(&peer_id)) {
self.message_queue.push_back(Message::Reject(index));
return;
}
if self.reserved_only && !self.priority_groups.get(RESERVED_NODES).map_or(false, |n| n.contains(&peer_id)) {
self.message_queue.push_back(Message::Reject(index));
return;
}

let not_connected = match self.data.peer(&peer_id) {
Expand Down Expand Up @@ -584,7 +576,7 @@ impl Peerset {
/// Adds discovered peer ids to the PSM.
///
/// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility
/// > of the PSM to remove `PeerId`s that fail to dial too often.
/// > of the PSM to remove `PeerId`s that fail to dial too often.
pub fn discovered<I: IntoIterator<Item = PeerId>>(&mut self, peer_ids: I) {
let mut discovered_any = false;

Expand Down Expand Up @@ -747,12 +739,12 @@ mod tests {

let (mut peerset, _handle) = Peerset::from_config(config);
peerset.incoming(incoming.clone(), ii);
peerset.incoming(incoming.clone(), ii4);
peerset.incoming(incoming2.clone(), ii2);
peerset.incoming(incoming3.clone(), ii3);
peerset.incoming(incoming, ii4);
peerset.incoming(incoming2, ii2);
peerset.incoming(incoming3, ii3);

assert_messages(peerset, vec![
Message::Connect(bootnode.clone()),
Message::Connect(bootnode),
Message::Accept(ii),
Message::Accept(ii2),
Message::Reject(ii3),
Expand All @@ -772,7 +764,7 @@ mod tests {
};

let (mut peerset, _) = Peerset::from_config(config);
peerset.incoming(incoming.clone(), ii);
peerset.incoming(incoming, ii);

assert_messages(peerset, vec![
Message::Reject(ii),
Expand Down
8 changes: 4 additions & 4 deletions client/peerset/src/peersstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ pub struct PeersState {
/// List of nodes that we know about.
///
/// > **Note**: This list should really be ordered by decreasing reputation, so that we can
/// easily select the best node to connect to. As a first draft, however, we don't
/// sort, to make the logic easier.
/// easily select the best node to connect to. As a first draft, however, we don't
/// sort, to make the logic easier.
nodes: HashMap<PeerId, Node>,

/// Number of slot-occupying nodes for which the `ConnectionState` is `In`.
Expand Down Expand Up @@ -130,7 +130,7 @@ impl PeersState {
/// Returns an object that grants access to the state of a peer.
pub fn peer<'a>(&'a mut self, peer_id: &'a PeerId) -> Peer<'a> {
match self.nodes.get_mut(peer_id) {
None => return Peer::Unknown(UnknownPeer {
None => Peer::Unknown(UnknownPeer {
parent: self,
peer_id: Cow::Borrowed(peer_id),
}),
Expand Down Expand Up @@ -585,7 +585,7 @@ mod tests {
peers_state.peer(&id2).into_connected().unwrap().disconnect();
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id1.clone()));
peers_state.peer(&id1).into_not_connected().unwrap().set_reputation(-100);
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id2.clone()));
assert_eq!(peers_state.highest_not_connected_peer().map(|p| p.into_peer_id()), Some(id2));
}

#[test]
Expand Down
4 changes: 2 additions & 2 deletions client/peerset/tests/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ fn test_once() {
4 => if let Some(id) = known_nodes.iter()
.filter(|n| incoming_nodes.values().all(|m| m != *n) && !connected_nodes.contains(*n))
.choose(&mut rng) {
peerset.incoming(id.clone(), next_incoming_id.clone());
incoming_nodes.insert(next_incoming_id.clone(), id.clone());
peerset.incoming(id.clone(), next_incoming_id);
incoming_nodes.insert(next_incoming_id, id.clone());
next_incoming_id.0 += 1;
}

Expand Down
2 changes: 1 addition & 1 deletion primitives/utils/src/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ mod inner {
/// `UNBOUNDED_CHANNELS_COUNTER`
pub fn tracing_unbounded<T>(key: &'static str) ->(TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
let (s, r) = mpsc::unbounded();
(TracingUnboundedSender(key.clone(), s), TracingUnboundedReceiver(key,r))
(TracingUnboundedSender(key, s), TracingUnboundedReceiver(key,r))
}

impl<T> TracingUnboundedSender<T> {
Expand Down
6 changes: 6 additions & 0 deletions primitives/utils/src/status_sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ struct YieldAfter<T> {
sender: Option<TracingUnboundedSender<T>>,
}

impl <T> Default for StatusSinks<T> {
fn default() -> Self {
Self::new()
}
}

impl<T> StatusSinks<T> {
/// Builds a new empty collection.
pub fn new() -> StatusSinks<T> {
Expand Down