From c86b760d80cf2ef6f5e240fe318cf79bb8174ef2 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 26 Mar 2019 18:45:53 +0100 Subject: [PATCH 01/21] Introduction of PeersetHandle --- core/peerset/src/lib.rs | 291 +++++++++++++++++++++++----------------- 1 file changed, 170 insertions(+), 121 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 74444a2c388a3..89c4ee4493327 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -17,19 +17,11 @@ //! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be //! connected to. -use std::collections::HashSet; +use std::collections::{HashSet, VecDeque}; use futures::{prelude::*, sync::mpsc}; use libp2p::PeerId; -use parking_lot::Mutex; -use std::sync::Arc; -/// Shared part of the peer set manager (PSM). Distributed around the code. -pub struct Peerset { - tx: mpsc::UnboundedSender, - inner: Mutex, -} - -struct Inner { +struct PeersetData { /// List of nodes that we know exist but we are not connected to. /// Elements in this list must never be in `out_slots` or `in_slots`. discovered: Vec, @@ -45,6 +37,82 @@ struct Inner { in_slots: Vec>, } +#[derive(Debug)] +enum Action { + AddReservedPeer(PeerId), + RemoveReservedPeer(PeerId), + SetReservedOnly(bool), + ReportPeer(PeerId, i32), + Incoming(PeerId, IncomingIndex), + Dropped(PeerId), + Discovered(PeerId), +} + +/// Shared handle to the peer set manager (PSM). Distributed around the code. +#[derive(Clone)] +pub struct PeersetHandle { + tx: mpsc::UnboundedSender, +} + +impl PeersetHandle { + /// Adds a new reserved peer. The peerset will make an effort to always remain connected to + /// this peer. + /// + /// Has no effect if the node was already a reserved peer. + /// + /// > **Note**: Keep in mind that the networking has to know an address for this node, + /// > otherwise it will not be able to connect to it. + pub fn add_reserved_peer(&self, peer_id: PeerId) { + let _ = self.tx.unbounded_send(Action::AddReservedPeer(peer_id)); + } + + /// Remove a previously-added reserved peer. + /// + /// Has no effect if the node was not a reserved peer. + pub fn remove_reserved_peer(&self, peer_id: PeerId) { + let _ = self.tx.unbounded_send(Action::RemoveReservedPeer(peer_id)); + } + + /// Sets whether or not the peerset only has connections . + pub fn set_reserved_only(&self, reserved: bool) { + let _ = self.tx.unbounded_send(Action::SetReservedOnly(reserved)); + } + + /// Reports an adjustement to the reputation of the given peer. + pub fn report_peer(&self, peer_id: PeerId, score_diff: i32) { + let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); + } + + /// Indicate that we received an incoming connection. Must be answered either with + /// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. + /// + /// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming + /// connection implicitely means `Accept`, but incoming connections aren't cancelled by + /// `dropped`. + /// + /// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the + /// peerset is already connected to, in which case it must not answer. + pub fn incoming(&self, peer_id: PeerId, index: IncomingIndex) { + let _ = self.tx.unbounded_send(Action::Incoming(peer_id, index)); + } + + /// Indicate that we dropped an active connection with a peer, or that we failed to connect. + /// + /// Must only be called after the PSM has either generated a `Connect` message with this + /// `PeerId`, or accepted an incoming connection with this `PeerId`. + pub fn dropped(&self, peer_id: PeerId) { + let _ = self.tx.unbounded_send(Action::Dropped(peer_id)); + } + + /// Adds a discovered peer id to the PSM. + /// + /// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility + /// > of the PSM to remove `PeerId`s that fail to dial too often. + pub fn discovered(&self, peer_id: PeerId) { + let _ = self.tx.unbounded_send(Action::Discovered(peer_id)); + } +} + /// Message that can be sent by the peer set manager (PSM). #[derive(Debug)] pub enum Message { @@ -101,17 +169,19 @@ pub struct PeersetConfig { /// /// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never /// errors. -pub struct PeersetMut { - parent: Arc, - rx: mpsc::UnboundedReceiver, +pub struct Peerset { + data: PeersetData, + tx: mpsc::UnboundedSender, + rx: mpsc::UnboundedReceiver, + message_queue: VecDeque, } impl Peerset { /// Builds a new peerset from the given configuration. - pub fn from_config(config: PeersetConfig) -> (Arc, PeersetMut) { + pub fn from_config(config: PeersetConfig) -> Peerset { let (tx, rx) = mpsc::unbounded(); - let mut inner = Inner { + let data = PeersetData { discovered: config.bootnodes.into_iter().collect(), reserved: Default::default(), reserved_only: config.reserved_only, @@ -119,80 +189,67 @@ impl Peerset { in_slots: (0 .. config.in_peers).map(|_| None).collect(), }; - alloc_slots(&mut inner, &tx); - - let peerset = Arc::new(Peerset { + let mut peerset = Peerset { + data, tx, - inner: Mutex::new(inner), - }); - - let rx = PeersetMut { - parent: peerset.clone(), rx, + message_queue: VecDeque::new(), }; + peerset.alloc_slots(); + + let handle = peerset.handle(); + for reserved in config.reserved_nodes { - peerset.add_reserved_peer(reserved); + handle.add_reserved_peer(reserved); } - (peerset, rx) + peerset } - /// Adds a new reserved peer. The peerset will make an effort to always remain connected to - /// this peer. - /// - /// Has no effect if the node was already a reserved peer. - /// - /// > **Note**: Keep in mind that the networking has to know an address for this node, - /// > otherwise it will not be able to connect to it. - pub fn add_reserved_peer(&self, peer_id: PeerId) { - let mut inner = self.inner.lock(); - if !inner.reserved.insert(peer_id.clone()) { + fn handle(&self) -> PeersetHandle { + PeersetHandle { + tx: self.tx.clone(), + } + } + + fn add_reserved_peer(&mut self, peer_id: PeerId) { + if !self.data.reserved.insert(peer_id.clone()) { // Immediately return if this peer was already in the list. return; } // Nothing more to do if we're already connected. - if inner.out_slots.iter().chain(inner.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) { + if self.data.out_slots.iter().chain(self.data.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) { return; } // Assign a slot for this reserved peer. - if let Some(pos) = inner.out_slots.iter().position(|s| s.as_ref().map(|n| !inner.reserved.contains(n)).unwrap_or(true)) { - let _ = self.tx.unbounded_send(Message::Connect(peer_id.clone())); - inner.out_slots[pos] = Some(peer_id); - + if let Some(pos) = self.data.out_slots.iter().position(|s| s.as_ref().map(|n| !self.data.reserved.contains(n)).unwrap_or(true)) { + self.message_queue.push_back(Message::Connect(peer_id.clone())); + self.data.out_slots[pos] = Some(peer_id); } else { // All slots are filled with reserved peers. - if inner.discovered.iter().all(|p| *p != peer_id) { - inner.discovered.push(peer_id); + if self.data.discovered.iter().all(|p| *p != peer_id) { + self.data.discovered.push(peer_id); } } } - /// Remove a previously-added reserved peer. - /// - /// Has no effect if the node was not a reserved peer. - pub fn remove_reserved_peer(&self, peer_id: &PeerId) { - let mut inner = self.inner.lock(); - inner.reserved.remove(peer_id); + fn remove_reserved_peer(&mut self, peer_id: &PeerId) { + self.data.reserved.remove(peer_id); } - /// Sets whether or not the peerset only has connections . - pub fn set_reserved_only(&self, reserved_only: bool) { - let mut inner = self.inner.lock(); - let inner = &mut *inner; // Fixes a borrowing issue. - inner.reserved_only = reserved_only; - + fn set_reserved_only(&mut self, reserved_only: bool) { // Disconnect non-reserved nodes. - if reserved_only { - for slot in inner.out_slots.iter_mut().chain(inner.in_slots.iter_mut()) { + if self.data.reserved_only { + for slot in self.data.out_slots.iter_mut().chain(self.data.in_slots.iter_mut()) { if let Some(peer) = slot.as_ref() { - if inner.reserved.contains(peer) { + if self.data.reserved.contains(peer) { continue; } - let _ = self.tx.unbounded_send(Message::Drop(peer.clone())); + self.message_queue.push_back(Message::Drop(peer.clone())); } *slot = None; @@ -200,74 +257,55 @@ impl Peerset { } } - /// Reports an adjustement to the reputation of the given peer. - pub fn report_peer(&self, _peer_id: &PeerId, _score_diff: i32) { + pub fn report_peer(&self, _peer_id: PeerId, _score_diff: i32) { + //unimplemented!(); // This is not implemented in this dummy implementation. } -} - -fn alloc_slots(inner: &mut Inner, tx: &mpsc::UnboundedSender) { - if inner.reserved_only { - return; - } - for slot in inner.out_slots.iter_mut() { - if slot.is_some() { - continue; + fn alloc_slots(&mut self) { + if self.data.reserved_only { + return; } - if !inner.discovered.is_empty() { - let elem = inner.discovered.remove(0); - *slot = Some(elem.clone()); - let _ = tx.unbounded_send(Message::Connect(elem)); + for slot in self.data.out_slots.iter_mut() { + if slot.is_some() { + continue; + } + + if !self.data.discovered.is_empty() { + let elem = self.data.discovered.remove(0); + *slot = Some(elem.clone()); + self.message_queue.push_back(Message::Connect(elem)); + } } } -} -impl PeersetMut { - /// Indicate that we received an incoming connection. Must be answered either with - /// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. - /// - /// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming - /// connection implicitely means `Accept`, but incoming connections aren't cancelled by - /// `dropped`. - /// - /// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the - /// peerset is already connected to, in which case it must not answer. - pub fn incoming(&self, peer_id: PeerId, index: IncomingIndex) { - let mut inner = self.parent.inner.lock(); - if inner.out_slots.iter().chain(inner.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) { + fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { + if self.data.out_slots.iter().chain(self.data.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) { return } - if let Some(pos) = inner.in_slots.iter().position(|s| s.is_none()) { - inner.in_slots[pos] = Some(peer_id); - let _ = self.parent.tx.unbounded_send(Message::Accept(index)); + if let Some(pos) = self.data.in_slots.iter().position(|s| s.is_none()) { + self.data.in_slots[pos] = Some(peer_id); + self.message_queue.push_back(Message::Accept(index)); } else { - if inner.discovered.iter().all(|p| *p != peer_id) { - inner.discovered.push(peer_id); + if self.data.discovered.iter().all(|p| *p != peer_id) { + self.data.discovered.push(peer_id); } - let _ = self.parent.tx.unbounded_send(Message::Reject(index)); + self.message_queue.push_back(Message::Reject(index)); } } - /// Indicate that we dropped an active connection with a peer, or that we failed to connect. - /// - /// Must only be called after the PSM has either generated a `Connect` message with this - /// `PeerId`, or accepted an incoming connection with this `PeerId`. - pub fn dropped(&self, peer_id: &PeerId) { - let mut inner = self.parent.inner.lock(); - let inner = &mut *inner; // Fixes a borrowing issue. - + fn dropped(&mut self, peer_id: PeerId) { // Automatically connect back if reserved. - if inner.reserved.contains(peer_id) { - let _ = self.parent.tx.unbounded_send(Message::Connect(peer_id.clone())); + if self.data.reserved.contains(&peer_id) { + self.message_queue.push_back(Message::Connect(peer_id.clone())); return } // Otherwise, free the slot. - for slot in inner.out_slots.iter_mut().chain(inner.in_slots.iter_mut()) { - if slot.as_ref() == Some(peer_id) { + for slot in self.data.out_slots.iter_mut().chain(self.data.in_slots.iter_mut()) { + if slot.as_ref() == Some(&peer_id) { *slot = None; break; } @@ -275,35 +313,46 @@ impl PeersetMut { // Note: in this dummy implementation we consider that peers never expire. As soon as we // are disconnected from a peer, we try again. - if inner.discovered.iter().all(|p| p != peer_id) { - inner.discovered.push(peer_id.clone()); + if self.data.discovered.iter().all(|p| p != &peer_id) { + self.data.discovered.push(peer_id.clone()); } - alloc_slots(inner, &self.parent.tx); + self.alloc_slots(); } - /// Adds a discovered peer id to the PSM. - /// - /// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility - /// > of the PSM to remove `PeerId`s that fail to dial too often. - pub fn discovered(&self, peer_id: PeerId) { - let mut inner = self.parent.inner.lock(); - - if inner.out_slots.iter().chain(inner.in_slots.iter()).any(|p| p.as_ref() == Some(&peer_id)) { + fn discovered(&mut self, peer_id: PeerId) { + if self.data.out_slots.iter().chain(self.data.in_slots.iter()).any(|p| p.as_ref() == Some(&peer_id)) { return; } - if inner.discovered.iter().all(|p| *p != peer_id) { - inner.discovered.push(peer_id); + if self.data.discovered.iter().all(|p| *p != peer_id) { + self.data.discovered.push(peer_id); } - alloc_slots(&mut inner, &self.parent.tx); + self.alloc_slots(); } } -impl Stream for PeersetMut { +impl Stream for Peerset { type Item = Message; type Error = (); fn poll(&mut self) -> Poll, Self::Error> { - self.rx.poll() + loop { + if let Some(message) = self.message_queue.pop_front() { + return Ok(Async::Ready(Some(message))); + } + match self.rx.poll()? { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(None) => return Ok(Async::Ready(None)), + Async::Ready(Some(action)) => match action { + Action::AddReservedPeer(peer_id) => self.add_reserved_peer(peer_id), + Action::RemoveReservedPeer(peer_id) => self.remove_reserved_peer(&peer_id), + Action::SetReservedOnly(reserved) => self.set_reserved_only(reserved), + Action::ReportPeer(peer_id, score_diff) => self.report_peer(peer_id, score_diff), + Action::Incoming(peer_id, index) => self.incoming(peer_id, index), + Action::Dropped(peer_id) => self.dropped(peer_id), + Action::Discovered(peer_id) => self.discovered(peer_id), + } + } + } } } From 97430fbfd9954d322684998a69aa7c7769b2d863 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 26 Mar 2019 21:52:55 +0100 Subject: [PATCH 02/21] integrate PeersetHandle with the rest of the codebase --- core/network-libp2p/src/behaviour.rs | 2 +- .../src/custom_proto/behaviour.rs | 16 +-- core/network-libp2p/src/service_task.rs | 9 +- core/peerset/src/lib.rs | 125 ++++++++++++++---- 4 files changed, 113 insertions(+), 39 deletions(-) diff --git a/core/network-libp2p/src/behaviour.rs b/core/network-libp2p/src/behaviour.rs index bc41a9cf5539c..6fbd058ff6ab8 100644 --- a/core/network-libp2p/src/behaviour.rs +++ b/core/network-libp2p/src/behaviour.rs @@ -54,7 +54,7 @@ impl Behaviour { local_public_key: PublicKey, protocol: RegisteredProtocol, known_addresses: Vec<(PeerId, Multiaddr)>, - peerset: substrate_peerset::PeersetMut, + peerset: substrate_peerset::Peerset, ) -> Self { let identify = { let proto_version = "/substrate/1.0".to_string(); diff --git a/core/network-libp2p/src/custom_proto/behaviour.rs b/core/network-libp2p/src/custom_proto/behaviour.rs index fa5b1a3b727cb..7d947f9bdc551 100644 --- a/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/core/network-libp2p/src/custom_proto/behaviour.rs @@ -31,7 +31,7 @@ pub struct CustomProto { protocol: RegisteredProtocol, /// Receiver for instructions about who to connect to or disconnect from. - peerset: substrate_peerset::PeersetMut, + peerset: substrate_peerset::Peerset, /// List of peers in our state. peers: FnvHashMap, @@ -175,7 +175,7 @@ impl CustomProto { /// Creates a `CustomProtos`. pub fn new( protocol: RegisteredProtocol, - peerset: substrate_peerset::PeersetMut, + peerset: substrate_peerset::Peerset, ) -> Self { CustomProto { protocol, @@ -213,7 +213,7 @@ impl CustomProto { // DisabledPendingEnable => Disabled. PeerState::DisabledPendingEnable { open, connected_point, timer } => { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id); + self.peerset.dropped(peer_id.clone()); let banned_until = Some(if let Some(ban) = ban { cmp::max(timer.deadline(), Instant::now() + ban) } else { @@ -225,7 +225,7 @@ impl CustomProto { // Enabled => Disabled. PeerState::Enabled { open, connected_point } => { debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id); + self.peerset.dropped(peer_id.clone()); debug!(target: "sub-libp2p", "Handler({:?}) <= Disable", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), @@ -478,7 +478,7 @@ impl CustomProto { debug!(target: "sub-libp2p", "PSM => Accept({:?}, {:?}): Obsolete incoming, sending back dropped", index, incoming.peer_id); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", incoming.peer_id); - self.peerset.dropped(&incoming.peer_id); + self.peerset.dropped(incoming.peer_id.clone()); return } @@ -656,7 +656,7 @@ where debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was disabled \ (through {:?}) but pending enable", peer_id, endpoint); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id); + self.peerset.dropped(peer_id.clone()); self.peers.insert(peer_id.clone(), PeerState::Banned { until: timer.deadline() }); if open { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); @@ -673,7 +673,7 @@ where debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}): Was enabled \ (through {:?})", peer_id, endpoint); debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id); + self.peerset.dropped(peer_id.clone()); if open { debug!(target: "sub-libp2p", "External API <= Closed({:?})", peer_id); @@ -724,7 +724,7 @@ where until: Instant::now() + Duration::from_secs(5) }; debug!(target: "sub-libp2p", "PSM <= Dropped({:?})", peer_id); - self.peerset.dropped(peer_id) + self.peerset.dropped(peer_id.clone()) }, // We can still get dial failures even if we are already connected to the node, diff --git a/core/network-libp2p/src/service_task.rs b/core/network-libp2p/src/service_task.rs index bd4ff45440999..b88fac2a2b5cf 100644 --- a/core/network-libp2p/src/service_task.rs +++ b/core/network-libp2p/src/service_task.rs @@ -38,7 +38,7 @@ use std::time::Duration; pub fn start_service( config: NetworkConfiguration, registered_custom: RegisteredProtocol, -) -> Result<(Service, Arc), IoError> +) -> Result<(Service, substrate_peerset::PeersetHandle), IoError> where TMessage: CustomMessage + Send + 'static { if let Some(ref path) = config.net_config_path { @@ -72,13 +72,14 @@ where TMessage: CustomMessage + Send + 'static { } // Build the peerset. - let (peerset, peerset_receiver) = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig { + let peerset = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig { in_peers: config.in_peers, out_peers: config.out_peers, bootnodes, reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny, reserved_nodes, }); + let peerset_handle = peerset.handle(); // Private and public keys configuration. let local_identity = config.node_key.clone().into_keypair()?; @@ -88,7 +89,7 @@ where TMessage: CustomMessage + Send + 'static { // Build the swarm. let (mut swarm, bandwidth) = { let user_agent = format!("{} ({})", config.client_version, config.node_name); - let behaviour = Behaviour::new(user_agent, local_public, registered_custom, known_addresses, peerset_receiver); + let behaviour = Behaviour::new(user_agent, local_public, registered_custom, known_addresses, peerset); let (transport, bandwidth) = transport::build_transport(local_identity); (Swarm::new(transport, behaviour, local_peer_id.clone()), bandwidth) }; @@ -116,7 +117,7 @@ where TMessage: CustomMessage + Send + 'static { injected_events: Vec::new(), }; - Ok((service, peerset)) + Ok((service, peerset_handle)) } /// Event produced by the service. diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 89c4ee4493327..2a6e62a942d64 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -18,9 +18,11 @@ //! connected to. use std::collections::{HashSet, VecDeque}; +use std::ops; use futures::{prelude::*, sync::mpsc}; use libp2p::PeerId; +#[derive(Debug)] struct PeersetData { /// List of nodes that we know exist but we are not connected to. /// Elements in this list must never be in `out_slots` or `in_slots`. @@ -49,7 +51,7 @@ enum Action { } /// Shared handle to the peer set manager (PSM). Distributed around the code. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct PeersetHandle { tx: mpsc::UnboundedSender, } @@ -114,7 +116,7 @@ impl PeersetHandle { } /// Message that can be sent by the peer set manager (PSM). -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum Message { /// Request to open a connection to the given peer. From the point of view of the PSM, we are /// immediately connected. @@ -169,20 +171,29 @@ pub struct PeersetConfig { /// /// Implements the `Stream` trait and can be polled for messages. The `Stream` never ends and never /// errors. +#[derive(Debug)] pub struct Peerset { data: PeersetData, - tx: mpsc::UnboundedSender, + handle: PeersetHandle, rx: mpsc::UnboundedReceiver, message_queue: VecDeque, } +impl ops::Deref for Peerset { + type Target = PeersetHandle; + + fn deref(&self) -> &Self::Target { + &self.handle + } +} + impl Peerset { /// Builds a new peerset from the given configuration. pub fn from_config(config: PeersetConfig) -> Peerset { let (tx, rx) = mpsc::unbounded(); let data = PeersetData { - discovered: config.bootnodes.into_iter().collect(), + discovered: config.bootnodes, reserved: Default::default(), reserved_only: config.reserved_only, out_slots: (0 .. config.out_peers).map(|_| None).collect(), @@ -191,29 +202,28 @@ impl Peerset { let mut peerset = Peerset { data, - tx, + handle: PeersetHandle { + tx, + }, rx, message_queue: VecDeque::new(), }; peerset.alloc_slots(); - let handle = peerset.handle(); - for reserved in config.reserved_nodes { - handle.add_reserved_peer(reserved); + peerset.add_reserved_peer(reserved); } peerset } - fn handle(&self) -> PeersetHandle { - PeersetHandle { - tx: self.tx.clone(), - } + /// Creates shared handle to the peer set manager (PSM). + pub fn handle(&self) -> PeersetHandle { + self.handle.clone() } - fn add_reserved_peer(&mut self, peer_id: PeerId) { + fn on_add_reserved_peer(&mut self, peer_id: PeerId) { if !self.data.reserved.insert(peer_id.clone()) { // Immediately return if this peer was already in the list. return; @@ -236,12 +246,13 @@ impl Peerset { } } - fn remove_reserved_peer(&mut self, peer_id: &PeerId) { + fn on_remove_reserved_peer(&mut self, peer_id: &PeerId) { self.data.reserved.remove(peer_id); } - fn set_reserved_only(&mut self, reserved_only: bool) { + fn on_set_reserved_only(&mut self, reserved_only: bool) { // Disconnect non-reserved nodes. + self.data.reserved_only = reserved_only; if self.data.reserved_only { for slot in self.data.out_slots.iter_mut().chain(self.data.in_slots.iter_mut()) { if let Some(peer) = slot.as_ref() { @@ -257,7 +268,7 @@ impl Peerset { } } - pub fn report_peer(&self, _peer_id: PeerId, _score_diff: i32) { + fn on_report_peer(&self, _peer_id: PeerId, _score_diff: i32) { //unimplemented!(); // This is not implemented in this dummy implementation. } @@ -280,7 +291,7 @@ impl Peerset { } } - fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { + fn on_incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { if self.data.out_slots.iter().chain(self.data.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) { return } @@ -296,7 +307,7 @@ impl Peerset { } } - fn dropped(&mut self, peer_id: PeerId) { + fn on_dropped(&mut self, peer_id: PeerId) { // Automatically connect back if reserved. if self.data.reserved.contains(&peer_id) { self.message_queue.push_back(Message::Connect(peer_id.clone())); @@ -319,7 +330,7 @@ impl Peerset { self.alloc_slots(); } - fn discovered(&mut self, peer_id: PeerId) { + fn on_discovered(&mut self, peer_id: PeerId) { if self.data.out_slots.iter().chain(self.data.in_slots.iter()).any(|p| p.as_ref() == Some(&peer_id)) { return; } @@ -344,15 +355,77 @@ impl Stream for Peerset { Async::NotReady => return Ok(Async::NotReady), Async::Ready(None) => return Ok(Async::Ready(None)), Async::Ready(Some(action)) => match action { - Action::AddReservedPeer(peer_id) => self.add_reserved_peer(peer_id), - Action::RemoveReservedPeer(peer_id) => self.remove_reserved_peer(&peer_id), - Action::SetReservedOnly(reserved) => self.set_reserved_only(reserved), - Action::ReportPeer(peer_id, score_diff) => self.report_peer(peer_id, score_diff), - Action::Incoming(peer_id, index) => self.incoming(peer_id, index), - Action::Dropped(peer_id) => self.dropped(peer_id), - Action::Discovered(peer_id) => self.discovered(peer_id), + Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id), + Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(&peer_id), + Action::SetReservedOnly(reserved) => self.on_set_reserved_only(reserved), + Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff), + Action::Incoming(peer_id, index) => self.on_incoming(peer_id, index), + Action::Dropped(peer_id) => self.on_dropped(peer_id), + Action::Discovered(peer_id) => self.on_discovered(peer_id), } } } } } + +#[cfg(test)] +mod tests { + use libp2p::PeerId; + use futures::prelude::*; + use super::{PeersetConfig, Peerset, Message}; + + #[test] + fn test_peerset_from_config_with_bootnodes() { + let bootnode = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 1, + bootnodes: vec![bootnode.clone()], + reserved_only: false, + reserved_nodes: Vec::new(), + }; + + let peerset = Peerset::from_config(config); + let (next, _peerset) = peerset.into_future() + .wait() + .expect("Ok((Some(Message::Connect), peerset))"); + + let message = next.expect("Some(Message::Connect)"); + assert_eq!(message, Message::Connect(bootnode)); + } + + #[test] + fn test_peerset_add_reserved_peer() { + //unimplemented!(); + } + + #[test] + fn test_peerset_remove_reserved_peer() { + //unimplemented!(); + } + + #[test] + fn test_peerset_set_reserved_only() { + //unimplemented!(); + } + + #[test] + fn test_peerset_report_peer() { + //unimplemented!(); + } + + #[test] + fn test_peerset_incoming() { + //unimplemented!(); + } + + #[test] + fn test_peerset_dropped() { + //unimplemented!(); + } + + #[test] + fn test_peerset_discovered() { + //unimplemented!(); + } +} From 939cc3029c0efecf4dd31b07643406739faacfed Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 26 Mar 2019 23:11:55 +0100 Subject: [PATCH 03/21] fix compilation errors --- core/network/src/service.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 643fd3bbe7db0..c732c669e4337 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -24,7 +24,7 @@ use parking_lot::{Mutex, RwLock}; use network_libp2p::{ProtocolId, NetworkConfiguration, Severity}; use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; use network_libp2p::{multiaddr, RegisteredProtocol, NetworkState}; -use peerset::Peerset; +use peerset::PeersetHandle; use consensus::import_queue::{ImportQueue, Link}; use crate::consensus_gossip::ConsensusGossip; use crate::message::{Message, ConsensusEngineId}; @@ -141,7 +141,7 @@ pub struct Service> { network: Arc>>>, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which /// nodes it should be connected to or not. - peerset: Arc, + peerset: PeersetHandle, /// Protocol sender protocol_sender: Sender>, /// Sender for messages to the background service task, and handle for the background thread. @@ -356,7 +356,7 @@ impl> ManageNetwork for Service } fn remove_reserved_peer(&self, peer: PeerId) { - self.peerset.remove_reserved_peer(&peer); + self.peerset.remove_reserved_peer(peer); } fn add_reserved_peer(&self, peer: String) -> Result<(), String> { @@ -470,7 +470,7 @@ fn start_thread( network_port: NetworkPort, config: NetworkConfiguration, registered: RegisteredProtocol>, -) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>>>, Arc), Error> { +) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc>>>, PeersetHandle), Error> { // Start the main service. let (service, peerset) = match start_service(config, registered) { Ok((service, peerset)) => (Arc::new(Mutex::new(service)), peerset), From 5a04f536e545a267b01a37956178481c19ebe361 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 28 Mar 2019 12:58:02 +0100 Subject: [PATCH 04/21] more tests for peerset, fixed overwriting bug in add_reserved_peer --- core/peerset/src/lib.rs | 103 +++++++++++++++++++++++++++++++++++----- 1 file changed, 92 insertions(+), 11 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index f31829d1404f3..3599ac67ba354 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -236,7 +236,9 @@ impl Peerset { } // Assign a slot for this reserved peer. - if let Some(pos) = self.data.out_slots.iter().position(|s| s.as_ref().map(|n| !self.data.reserved.contains(n)).unwrap_or(true)) { + // TODO: override slots that are occupied by not reserved peers + // send Message::Drop in those cases + if let Some(pos) = self.data.out_slots.iter().position(Option::is_none) { self.message_queue.push_back(Message::Connect(peer_id.clone())); self.data.out_slots[pos] = Some(peer_id); } else { @@ -311,7 +313,7 @@ impl Peerset { fn on_dropped(&mut self, peer_id: PeerId) { // Automatically connect back if reserved. if self.data.reserved.contains(&peer_id) { - self.message_queue.push_back(Message::Connect(peer_id.clone())); + self.message_queue.push_back(Message::Connect(peer_id)); return } @@ -326,7 +328,7 @@ impl Peerset { // Note: in this dummy implementation we consider that peers never expire. As soon as we // are disconnected from a peer, we try again. if self.data.discovered.iter().all(|p| p != &peer_id) { - self.data.discovered.push(peer_id.clone()); + self.data.discovered.push(peer_id); } self.alloc_slots(); } @@ -380,29 +382,80 @@ mod tests { use futures::prelude::*; use super::{PeersetConfig, Peerset, Message}; + fn next_message(peerset: Peerset) -> Result<(Message, Peerset), ()> { + let (next, peerset) = peerset.into_future() + .wait() + .map_err(|_| ())?; + let message = next.ok_or_else(|| ())?; + Ok((message, peerset)) + } + #[test] fn test_peerset_from_config_with_bootnodes() { let bootnode = PeerId::random(); + let bootnode2 = PeerId::random(); let config = PeersetConfig { in_peers: 0, - out_peers: 1, - bootnodes: vec![bootnode.clone()], + out_peers: 2, + bootnodes: vec![bootnode.clone(), bootnode2.clone()], reserved_only: false, reserved_nodes: Vec::new(), }; let peerset = Peerset::from_config(config); - let (next, _peerset) = peerset.into_future() - .wait() - .expect("Ok((Some(Message::Connect), peerset))"); - let message = next.expect("Some(Message::Connect)"); + let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode"); + assert_eq!(message, Message::Connect(bootnode)); + let (message, _peerset) = next_message(peerset).expect("Message::Connect to bootnode2"); + assert_eq!(message, Message::Connect(bootnode2)); + } + + #[test] + fn test_peerset_from_config_with_reserved_nodes() { + let bootnode = PeerId::random(); + let bootnode2 = PeerId::random(); + let reserved_peer = PeerId::random(); + let reserved_peer2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 3, + bootnodes: vec![bootnode.clone(), bootnode2.clone()], + reserved_only: false, + reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], + }; + + let peerset = Peerset::from_config(config); + + // TODO: decide whether the order is correct. Should we first connect to bootnodes or reserved nodes? + let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode"); assert_eq!(message, Message::Connect(bootnode)); + let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode2"); + assert_eq!(message, Message::Connect(bootnode2)); + let (message, _peerset) = next_message(peerset).expect("Message::Connect to reserved_peer"); + assert_eq!(message, Message::Connect(reserved_peer)); } #[test] fn test_peerset_add_reserved_peer() { - //unimplemented!(); + let bootnode = PeerId::random(); + let reserved_peer = PeerId::random(); + let reserved_peer2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 2, + bootnodes: vec![bootnode], + reserved_only: true, + reserved_nodes: Vec::new(), + }; + + let peerset = Peerset::from_config(config); + peerset.add_reserved_peer(reserved_peer.clone()); + peerset.add_reserved_peer(reserved_peer2.clone()); + + let (message, peerset) = next_message(peerset).expect("Message::Connect to reserved_peer"); + assert_eq!(message, Message::Connect(reserved_peer)); + let (message, _peerset) = next_message(peerset).expect("Message::Connect to reserved_peer2"); + assert_eq!(message, Message::Connect(reserved_peer2)); } #[test] @@ -412,7 +465,35 @@ mod tests { #[test] fn test_peerset_set_reserved_only() { - //unimplemented!(); + let bootnode = PeerId::random(); + let bootnode2 = PeerId::random(); + let reserved_peer = PeerId::random(); + let reserved_peer2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 4, + bootnodes: vec![bootnode.clone(), bootnode2.clone()], + reserved_only: false, + reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], + }; + + let peerset = Peerset::from_config(config); + peerset.set_reserved_only(true); + + // TODO: decide whether the order is correct. Should we first connect to bootnodes or reserved nodes? + let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode"); + assert_eq!(message, Message::Connect(bootnode.clone())); + let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode2"); + assert_eq!(message, Message::Connect(bootnode2.clone())); + let (message, peerset) = next_message(peerset).expect("Message::Connect to reserved_peer"); + assert_eq!(message, Message::Connect(reserved_peer)); + let (message, peerset) = next_message(peerset).expect("Message::Connect to reserved_peer2"); + assert_eq!(message, Message::Connect(reserved_peer2)); + + let (message, peerset) = next_message(peerset).expect("Message::Drop the bootnode"); + assert_eq!(message, Message::Drop(bootnode)); + let (message, _peerset) = next_message(peerset).expect("Message::Drop the bootnode2"); + assert_eq!(message, Message::Drop(bootnode2)); } #[test] From 141284e76bdd720a715619bfa5362ac67202a6bb Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 1 Apr 2019 17:15:13 +0200 Subject: [PATCH 05/21] Slots data structure and bugfixes for peerset --- Cargo.lock | 6 - core/peerset/Cargo.toml | 6 - core/peerset/src/lib.rs | 262 ++++++++++++++++++++++---------------- core/peerset/src/slots.rs | 117 +++++++++++++++++ 4 files changed, 272 insertions(+), 119 deletions(-) create mode 100644 core/peerset/src/slots.rs diff --git a/Cargo.lock b/Cargo.lock index 9666ce55eaad5..dfb3be58dd948 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4039,16 +4039,10 @@ dependencies = [ name = "substrate-peerset" version = "0.1.0" dependencies = [ - "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/peerset/Cargo.toml b/core/peerset/Cargo.toml index c2247cdd0b2b2..80ed9309a4653 100644 --- a/core/peerset/Cargo.toml +++ b/core/peerset/Cargo.toml @@ -8,13 +8,7 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] -fnv = "1.0" futures = "0.1" libp2p = { version = "0.6.0", default-features = false } log = "0.4" -parking_lot = "0.7" -rand = "0.6" -serde = "1.0.70" -serde_derive = "1.0.70" serde_json = "1.0.24" -tokio-io = "0.1" diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 3599ac67ba354..b47ed1d046987 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -17,27 +17,28 @@ //! Peer Set Manager (PSM). Contains the strategy for choosing which nodes the network should be //! connected to. -use std::collections::{HashSet, VecDeque}; +mod slots; + +use std::collections::{HashMap, VecDeque}; use std::ops; use futures::{prelude::*, sync::mpsc}; use libp2p::PeerId; +use slots::{SlotType, SlotError, Slots}; pub use serde_json::Value; #[derive(Debug)] struct PeersetData { /// List of nodes that we know exist but we are not connected to. /// Elements in this list must never be in `out_slots` or `in_slots`. - discovered: Vec, - /// List of reserved nodes. - reserved: HashSet, + discovered: VecDeque<(PeerId, SlotType)>, /// If true, we only accept reserved nodes. reserved_only: bool, - /// Node slots for outgoing connections. Each slot contains either `None` if the node is free, - /// or `Some` if it is assigned to a peer. - out_slots: Vec>, - /// Node slots for incoming connections. Each slot contains either `None` if the node is free, - /// or `Some` if it is assigned to a peer. - in_slots: Vec>, + /// Node slots for outgoing connections. + out_slots: Slots, + /// Node slots for incoming connections. + in_slots: Slots, + /// List of node scores. + scores: HashMap, } #[derive(Debug)] @@ -147,10 +148,10 @@ impl From for IncomingIndex { #[derive(Debug)] pub struct PeersetConfig { /// Maximum number of ingoing links to peers. - pub in_peers: u32, + pub in_peers: usize, /// Maximum number of outgoing links to peers. - pub out_peers: u32, + pub out_peers: usize, /// List of bootstrap nodes to initialize the peer with. /// @@ -194,11 +195,11 @@ impl Peerset { let (tx, rx) = mpsc::unbounded(); let data = PeersetData { - discovered: config.bootnodes, - reserved: Default::default(), + discovered: Default::default(), reserved_only: config.reserved_only, - out_slots: (0 .. config.out_peers).map(|_| None).collect(), - in_slots: (0 .. config.in_peers).map(|_| None).collect(), + out_slots: Slots::new(config.out_peers), + in_slots: Slots::new(config.in_peers), + scores: Default::default(), }; let mut peerset = Peerset { @@ -210,12 +211,15 @@ impl Peerset { message_queue: VecDeque::new(), }; - peerset.alloc_slots(); + for peer_id in config.reserved_nodes { + peerset.data.discovered.push_back((peer_id, SlotType::Reserved)); + } - for reserved in config.reserved_nodes { - peerset.add_reserved_peer(reserved); + for peer_id in config.bootnodes { + peerset.data.discovered.push_back((peer_id, SlotType::Common)); } + peerset.alloc_slots(); peerset } @@ -225,121 +229,137 @@ impl Peerset { } fn on_add_reserved_peer(&mut self, peer_id: PeerId) { - if !self.data.reserved.insert(peer_id.clone()) { - // Immediately return if this peer was already in the list. - return; - } - // Nothing more to do if we're already connected. - if self.data.out_slots.iter().chain(self.data.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) { + if self.data.in_slots.contains(&peer_id) { + self.data.in_slots.mark_reserved(&peer_id); return; } - // Assign a slot for this reserved peer. - // TODO: override slots that are occupied by not reserved peers - // send Message::Drop in those cases - if let Some(pos) = self.data.out_slots.iter().position(Option::is_none) { - self.message_queue.push_back(Message::Connect(peer_id.clone())); - self.data.out_slots[pos] = Some(peer_id); - } else { - // All slots are filled with reserved peers. - if self.data.discovered.iter().all(|p| *p != peer_id) { - self.data.discovered.push(peer_id); + match self.data.out_slots.add_peer(peer_id.clone(), SlotType::Reserved) { + Ok(_) => { + self.message_queue.push_back(Message::Connect(peer_id)); + }, + Err(SlotError::AlreadyConnected(_)) => { + return; + } + Err(SlotError::MaxConnections(_)) => { + if self.data.discovered.iter().all(|(p, _)| *p != peer_id) { + self.data.discovered.push_front((peer_id, SlotType::Reserved)); + } + } + Err(SlotError::DemandReroute { disconnect, ..}) => { + self.message_queue.push_back(Message::Drop(disconnect)); + if self.data.discovered.iter().all(|(p, _)| *p != peer_id) { + self.data.discovered.push_front((peer_id, SlotType::Reserved)); + } } } } fn on_remove_reserved_peer(&mut self, peer_id: &PeerId) { - self.data.reserved.remove(peer_id); + self.data.in_slots.mark_not_reserved(peer_id); + self.data.out_slots.mark_not_reserved(peer_id); + // TODO: should we disconnect from this peer? + // a) always? + // b) only if reserved_only is set } fn on_set_reserved_only(&mut self, reserved_only: bool) { // Disconnect non-reserved nodes. self.data.reserved_only = reserved_only; if self.data.reserved_only { - for slot in self.data.out_slots.iter_mut().chain(self.data.in_slots.iter_mut()) { - if let Some(peer) = slot.as_ref() { - if self.data.reserved.contains(peer) { - continue; - } - - self.message_queue.push_back(Message::Drop(peer.clone())); - } - - *slot = None; + for peer in self.data.in_slots.common_peers().chain(self.data.out_slots.common_peers()) { + // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method + self.message_queue.push_back(Message::Drop(peer.clone())) } } } - fn on_report_peer(&self, _peer_id: PeerId, _score_diff: i32) { - //unimplemented!(); - // This is not implemented in this dummy implementation. - } + fn on_report_peer(&mut self, peer_id: PeerId, score_diff: i32) { + let score = self.data.scores.entry(peer_id.clone()).or_default(); + *score = score.saturating_add(score_diff); - fn alloc_slots(&mut self) { - if self.data.reserved_only { - return; + if *score < 0 { + // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method + self.message_queue.push_back(Message::Drop(peer_id)); } + } - for slot in self.data.out_slots.iter_mut() { - if slot.is_some() { - continue; + fn alloc_slots(&mut self) { + while let Some((peer_id, slot_type)) = self.data.discovered.pop_front() { + // reserved peers are always at the beginning of discovered vec + // if we get a common peer, that means it's a goot time to stop + if self.data.reserved_only && slot_type == SlotType::Common { + self.data.discovered.push_front((peer_id, slot_type)); + break; } - - if !self.data.discovered.is_empty() { - let elem = self.data.discovered.remove(0); - *slot = Some(elem.clone()); - self.message_queue.push_back(Message::Connect(elem)); + match self.data.out_slots.add_peer(peer_id.clone(), slot_type) { + Ok(_) => { + self.message_queue.push_back(Message::Connect(peer_id)); + }, + Err(SlotError::AlreadyConnected(_)) => (), + Err(SlotError::MaxConnections(_)) => { + self.data.discovered.push_front((peer_id, slot_type)); + break; + }, + Err(SlotError::DemandReroute { disconnect, .. }) => { + self.message_queue.push_back(Message::Drop(disconnect)); + self.data.discovered.push_front((peer_id, slot_type)); + break; + } } } } fn on_incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { - if self.data.out_slots.iter().chain(self.data.in_slots.iter()).any(|s| s.as_ref() == Some(&peer_id)) { - return + // check if we are already connected to this peer + if self.data.out_slots.contains(&peer_id) { + self.message_queue.push_back(Message::Reject(index)); } - if let Some(pos) = self.data.in_slots.iter().position(|s| s.is_none()) { - self.data.in_slots[pos] = Some(peer_id); - self.message_queue.push_back(Message::Accept(index)); - } else { - if self.data.discovered.iter().all(|p| *p != peer_id) { - self.data.discovered.push(peer_id); + match self.data.in_slots.add_peer(peer_id, SlotType::Common) { + Ok(_) => { + self.message_queue.push_back(Message::Accept(index)); + }, + Err(SlotError::MaxConnections(peer_id)) => { + if self.data.discovered.iter().all(|(p, _)| *p != peer_id) { + self.data.discovered.push_back((peer_id, SlotType::Common)); + } + self.message_queue.push_back(Message::Reject(index)); + }, + _ => { + self.message_queue.push_back(Message::Reject(index)); } - self.message_queue.push_back(Message::Reject(index)); } } fn on_dropped(&mut self, peer_id: PeerId) { // Automatically connect back if reserved. - if self.data.reserved.contains(&peer_id) { + if self.data.in_slots.is_reserved(&peer_id) || self.data.out_slots.is_reserved(&peer_id) { self.message_queue.push_back(Message::Connect(peer_id)); - return + return; } // Otherwise, free the slot. - for slot in self.data.out_slots.iter_mut().chain(self.data.in_slots.iter_mut()) { - if slot.as_ref() == Some(&peer_id) { - *slot = None; - break; - } - } + self.data.in_slots.clear_slot(&peer_id); + self.data.out_slots.clear_slot(&peer_id); // Note: in this dummy implementation we consider that peers never expire. As soon as we // are disconnected from a peer, we try again. - if self.data.discovered.iter().all(|p| p != &peer_id) { - self.data.discovered.push(peer_id); + if self.data.discovered.iter().all(|(p, _)| p != &peer_id) { + self.data.discovered.push_back((peer_id, SlotType::Common)); } + self.alloc_slots(); } fn on_discovered(&mut self, peer_id: PeerId) { - if self.data.out_slots.iter().chain(self.data.in_slots.iter()).any(|p| p.as_ref() == Some(&peer_id)) { + if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { return; } - if self.data.discovered.iter().all(|p| *p != peer_id) { - self.data.discovered.push(peer_id); + if self.data.discovered.iter().all(|(p, _)| *p != peer_id) { + self.data.discovered.push_back((peer_id, SlotType::Common)); } self.alloc_slots(); } @@ -382,6 +402,15 @@ mod tests { use futures::prelude::*; use super::{PeersetConfig, Peerset, Message}; + fn assert_messages(mut peerset: Peerset, messages: Vec) { + for expected_message in messages { + let (message, p) = next_message(peerset).expect("expected message"); + assert_eq!(message, expected_message); + peerset = p; + } + assert!(peerset.message_queue.is_empty()) + } + fn next_message(peerset: Peerset) -> Result<(Message, Peerset), ()> { let (next, peerset) = peerset.into_future() .wait() @@ -404,10 +433,10 @@ mod tests { let peerset = Peerset::from_config(config); - let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode"); - assert_eq!(message, Message::Connect(bootnode)); - let (message, _peerset) = next_message(peerset).expect("Message::Connect to bootnode2"); - assert_eq!(message, Message::Connect(bootnode2)); + assert_messages(peerset, vec![ + Message::Connect(bootnode), + Message::Connect(bootnode2), + ]); } #[test] @@ -426,13 +455,11 @@ mod tests { let peerset = Peerset::from_config(config); - // TODO: decide whether the order is correct. Should we first connect to bootnodes or reserved nodes? - let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode"); - assert_eq!(message, Message::Connect(bootnode)); - let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode2"); - assert_eq!(message, Message::Connect(bootnode2)); - let (message, _peerset) = next_message(peerset).expect("Message::Connect to reserved_peer"); - assert_eq!(message, Message::Connect(reserved_peer)); + assert_messages(peerset, vec![ + Message::Connect(reserved_peer), + Message::Connect(reserved_peer2), + Message::Connect(bootnode) + ]); } #[test] @@ -452,10 +479,10 @@ mod tests { peerset.add_reserved_peer(reserved_peer.clone()); peerset.add_reserved_peer(reserved_peer2.clone()); - let (message, peerset) = next_message(peerset).expect("Message::Connect to reserved_peer"); - assert_eq!(message, Message::Connect(reserved_peer)); - let (message, _peerset) = next_message(peerset).expect("Message::Connect to reserved_peer2"); - assert_eq!(message, Message::Connect(reserved_peer2)); + assert_messages(peerset, vec![ + Message::Connect(reserved_peer), + Message::Connect(reserved_peer2) + ]); } #[test] @@ -480,20 +507,21 @@ mod tests { let peerset = Peerset::from_config(config); peerset.set_reserved_only(true); - // TODO: decide whether the order is correct. Should we first connect to bootnodes or reserved nodes? - let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode"); - assert_eq!(message, Message::Connect(bootnode.clone())); - let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode2"); - assert_eq!(message, Message::Connect(bootnode2.clone())); let (message, peerset) = next_message(peerset).expect("Message::Connect to reserved_peer"); assert_eq!(message, Message::Connect(reserved_peer)); let (message, peerset) = next_message(peerset).expect("Message::Connect to reserved_peer2"); assert_eq!(message, Message::Connect(reserved_peer2)); + let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode"); + assert_eq!(message, Message::Connect(bootnode.clone())); + let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode2"); + assert_eq!(message, Message::Connect(bootnode2.clone())); let (message, peerset) = next_message(peerset).expect("Message::Drop the bootnode"); - assert_eq!(message, Message::Drop(bootnode)); - let (message, _peerset) = next_message(peerset).expect("Message::Drop the bootnode2"); - assert_eq!(message, Message::Drop(bootnode2)); + let (message2, _peerset) = next_message(peerset).expect("Message::Drop the bootnode2"); + assert!( + (message == Message::Drop(bootnode.clone()) && message2 == Message::Drop(bootnode2.clone())) || + (message2 == Message::Drop(bootnode) && message == Message::Drop(bootnode2)) + ); } #[test] @@ -508,7 +536,27 @@ mod tests { #[test] fn test_peerset_dropped() { - //unimplemented!(); + let bootnode = PeerId::random(); + let bootnode2 = PeerId::random(); + let reserved_peer = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 2, + bootnodes: vec![bootnode.clone(), bootnode2.clone()], + reserved_only: false, + reserved_nodes: vec![reserved_peer.clone()], + }; + + let peerset = Peerset::from_config(config); + peerset.dropped(reserved_peer.clone()); + peerset.dropped(bootnode.clone()); + + assert_messages(peerset, vec![ + Message::Connect(reserved_peer.clone()), + Message::Connect(bootnode), + Message::Connect(reserved_peer), + Message::Connect(bootnode2), + ]); } #[test] diff --git a/core/peerset/src/slots.rs b/core/peerset/src/slots.rs new file mode 100644 index 0000000000000..15120385b7194 --- /dev/null +++ b/core/peerset/src/slots.rs @@ -0,0 +1,117 @@ +// Copyright 2018-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use std::collections::HashMap; +use libp2p::PeerId; + +/// Describes the nature of connection with a given peer. +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum SlotType { + /// Reserved peer is a peer we should always stay connected to. + Reserved, + /// Common peer is a type of peer that we stay connected to only if it's + /// useful for us. + Common, +} + +/// Descibes why the reason of not being able to add given peer. +pub enum SlotError { + /// Error returned when we are already connected to this peer. + AlreadyConnected(PeerId), + /// Error returned when max number of connections has been already established. + MaxConnections(PeerId), + /// Error returned when we should disconnect from a given common peer to make space + /// for a reserved peer. + DemandReroute { + /// Peer we should disconnect from. + disconnect: PeerId, + /// Peer we should connect to. + connect: PeerId, + } +} + +/// Contains all information about group of slots. +#[derive(Debug)] +pub struct Slots { + max_slots: usize, + slots: HashMap, +} + +impl Slots { + /// Creates a group of slots with a limited size. + pub fn new(max_slots: usize) -> Self { + Slots { + max_slots, + slots: HashMap::with_capacity(max_slots), + } + } + + /// Returns true if one of the slots contains given peer. + pub fn contains(&self, peer_id: &PeerId) -> bool { + self.slots.contains_key(peer_id) + } + + /// Returns Ok if we successfully connected to a given peer. + pub fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) -> Result<(), SlotError> { + if let Some(slot_type) = self.slots.get_mut(&peer_id) { + *slot_type = SlotType::Reserved; + return Err(SlotError::AlreadyConnected(peer_id)); + } + + if self.slots.len() == self.max_slots { + if let SlotType::Reserved = slot_type { + if let Some((to_disconnect, _)) = self.slots.iter().find(|(_, &slot_type)| slot_type == SlotType::Common) { + return Err(SlotError::DemandReroute { + disconnect: to_disconnect.clone(), + connect: peer_id, + }); + } + } + return Err(SlotError::MaxConnections(peer_id)); + } + + self.slots.insert(peer_id, slot_type); + Ok(()) + } + + pub fn common_peers(&self) -> impl Iterator { + self.slots.iter() + .filter(|&(_, slot_type)| *slot_type == SlotType::Common) + .map(|(peer_id, _)| peer_id) + } + + pub fn mark_reserved(&mut self, peer_id: &PeerId) { + if let Some(slot_type) = self.slots.get_mut(peer_id) { + *slot_type = SlotType::Reserved; + } + } + + pub fn mark_not_reserved(&mut self, peer_id: &PeerId) { + if let Some(slot_type) = self.slots.get_mut(peer_id) { + *slot_type = SlotType::Common; + } + } + + pub fn clear_slot(&mut self, peer_id: &PeerId) { + self.slots.remove(peer_id); + } + + pub fn is_reserved(&self, peer_id: &PeerId) -> bool { + self.slots.get(peer_id) + .map(|slot_type| *slot_type == SlotType::Reserved) + .unwrap_or_else(|| false) + } +} From e849837a0c025bd21e0645fd1a7d9fcdbe80152c Mon Sep 17 00:00:00 2001 From: debris Date: Mon, 1 Apr 2019 17:29:36 +0200 Subject: [PATCH 06/21] bend to pressure --- core/network-libp2p/src/service_task.rs | 4 +- core/peerset/src/lib.rs | 75 ++++++++++--------------- 2 files changed, 33 insertions(+), 46 deletions(-) diff --git a/core/network-libp2p/src/service_task.rs b/core/network-libp2p/src/service_task.rs index f6c71084104e1..cecba79bdfab4 100644 --- a/core/network-libp2p/src/service_task.rs +++ b/core/network-libp2p/src/service_task.rs @@ -73,8 +73,8 @@ where TMessage: CustomMessage + Send + 'static { // Build the peerset. let peerset = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig { - in_peers: config.in_peers, - out_peers: config.out_peers, + in_peers: config.in_peers as usize, + out_peers: config.out_peers as usize, bootnodes, reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny, reserved_nodes, diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index b47ed1d046987..314e59237ebf2 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -47,9 +47,6 @@ enum Action { RemoveReservedPeer(PeerId), SetReservedOnly(bool), ReportPeer(PeerId, i32), - Incoming(PeerId, IncomingIndex), - Dropped(PeerId), - Discovered(PeerId), } /// Shared handle to the peer set manager (PSM). Distributed around the code. @@ -86,35 +83,6 @@ impl PeersetHandle { pub fn report_peer(&self, peer_id: PeerId, score_diff: i32) { let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); } - - /// Indicate that we received an incoming connection. Must be answered either with - /// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. - /// - /// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming - /// connection implicitely means `Accept`, but incoming connections aren't cancelled by - /// `dropped`. - /// - /// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the - /// peerset is already connected to, in which case it must not answer. - pub fn incoming(&self, peer_id: PeerId, index: IncomingIndex) { - let _ = self.tx.unbounded_send(Action::Incoming(peer_id, index)); - } - - /// Indicate that we dropped an active connection with a peer, or that we failed to connect. - /// - /// Must only be called after the PSM has either generated a `Connect` message with this - /// `PeerId`, or accepted an incoming connection with this `PeerId`. - pub fn dropped(&self, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::Dropped(peer_id)); - } - - /// Adds a discovered peer id to the PSM. - /// - /// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility - /// > of the PSM to remove `PeerId`s that fail to dial too often. - pub fn discovered(&self, peer_id: PeerId) { - let _ = self.tx.unbounded_send(Action::Discovered(peer_id)); - } } /// Message that can be sent by the peer set manager (PSM). @@ -311,7 +279,16 @@ impl Peerset { } } - fn on_incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { + /// Indicate that we received an incoming connection. Must be answered either with + /// a corresponding `Accept` or `Reject`, except if we were already connected to this peer. + /// + /// Note that this mechanism is orthogonal to `Connect`/`Drop`. Accepting an incoming + /// connection implicitely means `Accept`, but incoming connections aren't cancelled by + /// `dropped`. + /// + /// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the + /// peerset is already connected to, in which case it must not answer. + pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { // check if we are already connected to this peer if self.data.out_slots.contains(&peer_id) { self.message_queue.push_back(Message::Reject(index)); @@ -333,7 +310,11 @@ impl Peerset { } } - fn on_dropped(&mut self, peer_id: PeerId) { + /// Indicate that we dropped an active connection with a peer, or that we failed to connect. + /// + /// Must only be called after the PSM has either generated a `Connect` message with this + /// `PeerId`, or accepted an incoming connection with this `PeerId`. + pub fn dropped(&mut self, peer_id: PeerId) { // Automatically connect back if reserved. if self.data.in_slots.is_reserved(&peer_id) || self.data.out_slots.is_reserved(&peer_id) { self.message_queue.push_back(Message::Connect(peer_id)); @@ -353,7 +334,11 @@ impl Peerset { self.alloc_slots(); } - fn on_discovered(&mut self, peer_id: PeerId) { + /// Adds a discovered peer id to the PSM. + /// + /// > **Note**: There is no equivalent "expired" message, meaning that it is the responsibility + /// > of the PSM to remove `PeerId`s that fail to dial too often. + pub fn discovered(&mut self, peer_id: PeerId) { if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { return; } @@ -387,9 +372,6 @@ impl Stream for Peerset { Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(&peer_id), Action::SetReservedOnly(reserved) => self.on_set_reserved_only(reserved), Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff), - Action::Incoming(peer_id, index) => self.on_incoming(peer_id, index), - Action::Dropped(peer_id) => self.on_dropped(peer_id), - Action::Discovered(peer_id) => self.on_discovered(peer_id), } } } @@ -402,13 +384,14 @@ mod tests { use futures::prelude::*; use super::{PeersetConfig, Peerset, Message}; - fn assert_messages(mut peerset: Peerset, messages: Vec) { + fn assert_messages(mut peerset: Peerset, messages: Vec) -> Peerset { for expected_message in messages { let (message, p) = next_message(peerset).expect("expected message"); assert_eq!(message, expected_message); peerset = p; } - assert!(peerset.message_queue.is_empty()) + assert!(peerset.message_queue.is_empty()); + peerset } fn next_message(peerset: Peerset) -> Result<(Message, Peerset), ()> { @@ -548,12 +531,16 @@ mod tests { }; let peerset = Peerset::from_config(config); - peerset.dropped(reserved_peer.clone()); - peerset.dropped(bootnode.clone()); - assert_messages(peerset, vec![ + let mut peerset = assert_messages(peerset, vec![ Message::Connect(reserved_peer.clone()), - Message::Connect(bootnode), + Message::Connect(bootnode.clone()), + ]); + + peerset.dropped(reserved_peer.clone()); + peerset.dropped(bootnode); + + let _peerset = assert_messages(peerset, vec![ Message::Connect(reserved_peer), Message::Connect(bootnode2), ]); From a74b69c1bf7be86012e03009ecc3dff6384c02ee Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 2 Apr 2019 11:06:27 +0200 Subject: [PATCH 07/21] updated lru-cache to 0.1.2 and updated linked-hash-map to 0.5.2 --- Cargo.lock | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3abcfb75873d8..81b4e512f1d1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1648,12 +1648,7 @@ dependencies = [ [[package]] name = "linked-hash-map" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" - -[[package]] -name = "linked-hash-map" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -1661,7 +1656,7 @@ name = "linked_hash_set" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1691,10 +1686,10 @@ dependencies = [ [[package]] name = "lru-cache" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3711,7 +3706,7 @@ dependencies = [ "kvdb-memorydb 0.1.0 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", "kvdb-rocksdb 0.1.4 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 1.0.0", @@ -3954,10 +3949,10 @@ dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "fork-tree 1.0.0", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", - "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "linked_hash_set 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5254,13 +5249,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum libp2p-yamux 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0dbb8d08cb536a964727e77b868a026c6d92993f08e387d49163565575a478d9" "checksum librocksdb-sys 5.14.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b9024327233e7fac7982440f73301c00046d438c5b1011e8f4e394226ce19007" "checksum libsecp256k1 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "688e8d65e495567c2c35ea0001b26b9debf0b4ea11f8cccc954233b75fc3428a" -"checksum linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7860ec297f7008ff7a1e3382d7f7e1dcd69efc94751a2284bafc3d013c2aa939" -"checksum linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70fb39025bc7cdd76305867c4eccf2f2dcf6e9a57f5b21a93e1c2d86cd03ec9e" +"checksum linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ae91b68aebc4ddb91978b11a1b02ddd8602a05ec19002801c5666000e05e0f83" "checksum linked_hash_set 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3c7c91c4c7bbeb4f2f7c4e5be11e6a05bd6830bc37249c47ce1ad86ad453ff9c" "checksum lock_api 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "62ebf1391f6acad60e5c8b43706dde4582df75c06698ab44511d15016bc2442c" "checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" "checksum log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c84ec4b527950aa83a329754b01dbe3f58361d1c5efacd1f6d68c494d08a17c6" -"checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21" +"checksum lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" "checksum make-cmd 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a8ca8afbe8af1785e09636acb5a41e08a765f5f0340568716c18a8700ba3c0d3" "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum memchr 2.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e1dd4eaac298c32ce07eb6ed9242eda7d82955b9170b7d6db59b2e02cc63fcb8" From 94dc0f74f3f3a17d508789d4387e7d7f0b8f0580 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 2 Apr 2019 12:07:54 +0200 Subject: [PATCH 08/21] peerset discovered list is now a LinkedHashMap --- Cargo.lock | 1 + core/peerset/Cargo.toml | 1 + core/peerset/src/lib.rs | 141 ++++++++++++++++++++++++++++++---------- 3 files changed, 107 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81b4e512f1d1a..cba5932ad40aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4036,6 +4036,7 @@ version = "1.0.0" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "libp2p 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/core/peerset/Cargo.toml b/core/peerset/Cargo.toml index ed8bee0d4c292..156897bf29c3a 100644 --- a/core/peerset/Cargo.toml +++ b/core/peerset/Cargo.toml @@ -10,5 +10,6 @@ edition = "2018" [dependencies] futures = "0.1" libp2p = { version = "0.6.0", default-features = false } +linked-hash-map = "0.5" log = "0.4" serde_json = "1.0.24" diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 314e59237ebf2..ef73fb7fcd20b 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -23,14 +23,52 @@ use std::collections::{HashMap, VecDeque}; use std::ops; use futures::{prelude::*, sync::mpsc}; use libp2p::PeerId; +use linked_hash_map::LinkedHashMap; use slots::{SlotType, SlotError, Slots}; pub use serde_json::Value; +/// FIFO-ordered list of nodes that we know exist, but we are not connected to. +#[derive(Debug, Default)] +struct Discovered { + /// Nodes we should connect to first. + reserved: LinkedHashMap, + /// All remaining nodes. + common: LinkedHashMap, +} + +impl Discovered { + /// Returns true if we already know given node. + fn contains(&self, peer_id: &PeerId) -> bool { + self.reserved.contains_key(peer_id) || self.common.contains_key(peer_id) + } + + /// Adds new peer of a given type. + fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) { + if !self.contains(&peer_id) { + match slot_type { + SlotType::Common => self.common.insert(peer_id, ()), + SlotType::Reserved => self.reserved.insert(peer_id, ()), + }; + } + } + + /// Pops the oldest peer from the list. + fn pop_peer(&mut self) -> Option<(PeerId, SlotType)> { + match self.reserved.pop_front() { + Some((peer_id, _)) => Some((peer_id, SlotType::Reserved)), + None => { + let (peer_id, _) = self.common.pop_front()?; + Some((peer_id, SlotType::Common)) + } + } + } +} + #[derive(Debug)] struct PeersetData { - /// List of nodes that we know exist but we are not connected to. + /// List of nodes that we know exist, but we are not connected to. /// Elements in this list must never be in `out_slots` or `in_slots`. - discovered: VecDeque<(PeerId, SlotType)>, + discovered: Discovered, /// If true, we only accept reserved nodes. reserved_only: bool, /// Node slots for outgoing connections. @@ -180,11 +218,11 @@ impl Peerset { }; for peer_id in config.reserved_nodes { - peerset.data.discovered.push_back((peer_id, SlotType::Reserved)); + peerset.data.discovered.add_peer(peer_id, SlotType::Reserved); } for peer_id in config.bootnodes { - peerset.data.discovered.push_back((peer_id, SlotType::Common)); + peerset.data.discovered.add_peer(peer_id, SlotType::Common); } peerset.alloc_slots(); @@ -211,15 +249,11 @@ impl Peerset { return; } Err(SlotError::MaxConnections(_)) => { - if self.data.discovered.iter().all(|(p, _)| *p != peer_id) { - self.data.discovered.push_front((peer_id, SlotType::Reserved)); - } + self.data.discovered.add_peer(peer_id, SlotType::Reserved); } Err(SlotError::DemandReroute { disconnect, ..}) => { self.message_queue.push_back(Message::Drop(disconnect)); - if self.data.discovered.iter().all(|(p, _)| *p != peer_id) { - self.data.discovered.push_front((peer_id, SlotType::Reserved)); - } + self.data.discovered.add_peer(peer_id, SlotType::Reserved); } } } @@ -249,16 +283,18 @@ impl Peerset { if *score < 0 { // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method - self.message_queue.push_back(Message::Drop(peer_id)); + if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { + self.message_queue.push_back(Message::Drop(peer_id)); + } } } fn alloc_slots(&mut self) { - while let Some((peer_id, slot_type)) = self.data.discovered.pop_front() { + while let Some((peer_id, slot_type)) = self.data.discovered.pop_peer() { // reserved peers are always at the beginning of discovered vec // if we get a common peer, that means it's a goot time to stop if self.data.reserved_only && slot_type == SlotType::Common { - self.data.discovered.push_front((peer_id, slot_type)); + self.data.discovered.add_peer(peer_id, slot_type); break; } match self.data.out_slots.add_peer(peer_id.clone(), slot_type) { @@ -267,12 +303,12 @@ impl Peerset { }, Err(SlotError::AlreadyConnected(_)) => (), Err(SlotError::MaxConnections(_)) => { - self.data.discovered.push_front((peer_id, slot_type)); + self.data.discovered.add_peer(peer_id, slot_type); break; }, Err(SlotError::DemandReroute { disconnect, .. }) => { self.message_queue.push_back(Message::Drop(disconnect)); - self.data.discovered.push_front((peer_id, slot_type)); + self.data.discovered.add_peer(peer_id, slot_type); break; } } @@ -290,6 +326,7 @@ impl Peerset { /// peerset is already connected to, in which case it must not answer. pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { // check if we are already connected to this peer + // TODO: should we take into account `reserved_only`? if self.data.out_slots.contains(&peer_id) { self.message_queue.push_back(Message::Reject(index)); } @@ -299,9 +336,7 @@ impl Peerset { self.message_queue.push_back(Message::Accept(index)); }, Err(SlotError::MaxConnections(peer_id)) => { - if self.data.discovered.iter().all(|(p, _)| *p != peer_id) { - self.data.discovered.push_back((peer_id, SlotType::Common)); - } + self.data.discovered.add_peer(peer_id, SlotType::Common); self.message_queue.push_back(Message::Reject(index)); }, _ => { @@ -327,10 +362,7 @@ impl Peerset { // Note: in this dummy implementation we consider that peers never expire. As soon as we // are disconnected from a peer, we try again. - if self.data.discovered.iter().all(|(p, _)| p != &peer_id) { - self.data.discovered.push_back((peer_id, SlotType::Common)); - } - + self.data.discovered.add_peer(peer_id, SlotType::Common); self.alloc_slots(); } @@ -343,9 +375,7 @@ impl Peerset { return; } - if self.data.discovered.iter().all(|(p, _)| *p != peer_id) { - self.data.discovered.push_back((peer_id, SlotType::Common)); - } + self.data.discovered.add_peer(peer_id, SlotType::Common); self.alloc_slots(); } @@ -382,7 +412,7 @@ impl Stream for Peerset { mod tests { use libp2p::PeerId; use futures::prelude::*; - use super::{PeersetConfig, Peerset, Message}; + use super::{PeersetConfig, Peerset, Message, IncomingIndex}; fn assert_messages(mut peerset: Peerset, messages: Vec) -> Peerset { for expected_message in messages { @@ -490,14 +520,12 @@ mod tests { let peerset = Peerset::from_config(config); peerset.set_reserved_only(true); - let (message, peerset) = next_message(peerset).expect("Message::Connect to reserved_peer"); - assert_eq!(message, Message::Connect(reserved_peer)); - let (message, peerset) = next_message(peerset).expect("Message::Connect to reserved_peer2"); - assert_eq!(message, Message::Connect(reserved_peer2)); - let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode"); - assert_eq!(message, Message::Connect(bootnode.clone())); - let (message, peerset) = next_message(peerset).expect("Message::Connect to bootnode2"); - assert_eq!(message, Message::Connect(bootnode2.clone())); + let peerset = assert_messages(peerset, vec![ + Message::Connect(reserved_peer), + Message::Connect(reserved_peer2), + Message::Connect(bootnode.clone()), + Message::Connect(bootnode2.clone()), + ]); let (message, peerset) = next_message(peerset).expect("Message::Drop the bootnode"); let (message2, _peerset) = next_message(peerset).expect("Message::Drop the bootnode2"); @@ -509,12 +537,53 @@ mod tests { #[test] fn test_peerset_report_peer() { - //unimplemented!(); + let bootnode = PeerId::random(); + let bootnode2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 1, + bootnodes: vec![bootnode.clone(), bootnode2.clone()], + reserved_only: false, + reserved_nodes: Vec::new(), + }; + + let peerset = Peerset::from_config(config); + peerset.report_peer(bootnode2, -1); + peerset.report_peer(bootnode.clone(), -1); + + assert_messages(peerset, vec![ + Message::Connect(bootnode.clone()), + Message::Drop(bootnode) + ]); } #[test] fn test_peerset_incoming() { - //unimplemented!(); + let bootnode = PeerId::random(); + let incoming = PeerId::random(); + let incoming2 = PeerId::random(); + let ii = IncomingIndex(1); + let ii2 = IncomingIndex(2); + let ii3 = IncomingIndex(3); + let config = PeersetConfig { + in_peers: 1, + out_peers: 1, + bootnodes: vec![bootnode.clone()], + reserved_only: false, + reserved_nodes: Vec::new(), + }; + + let mut peerset = Peerset::from_config(config); + peerset.incoming(incoming.clone(), ii); + peerset.incoming(incoming2.clone(), ii2); + peerset.incoming(incoming.clone(), ii3); + + assert_messages(peerset, vec![ + Message::Connect(bootnode.clone()), + Message::Accept(ii.clone()), + Message::Reject(ii2), + Message::Reject(ii3), + ]); } #[test] From 832233a9759744f089cb80e4dd716649c886b207 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 2 Apr 2019 12:15:09 +0200 Subject: [PATCH 09/21] fix review suggestions --- core/peerset/src/lib.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index ef73fb7fcd20b..a5ed55a07ddc0 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -21,7 +21,7 @@ mod slots; use std::collections::{HashMap, VecDeque}; use std::ops; -use futures::{prelude::*, sync::mpsc}; +use futures::{prelude::*, sync::mpsc, try_ready}; use libp2p::PeerId; use linked_hash_map::LinkedHashMap; use slots::{SlotType, SlotError, Slots}; @@ -117,7 +117,7 @@ impl PeersetHandle { let _ = self.tx.unbounded_send(Action::SetReservedOnly(reserved)); } - /// Reports an adjustement to the reputation of the given peer. + /// Reports an adjustment to the reputation of the given peer. pub fn report_peer(&self, peer_id: PeerId, score_diff: i32) { let _ = self.tx.unbounded_send(Action::ReportPeer(peer_id, score_diff)); } @@ -394,10 +394,9 @@ impl Stream for Peerset { if let Some(message) = self.message_queue.pop_front() { return Ok(Async::Ready(Some(message))); } - match self.rx.poll()? { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(None) => return Ok(Async::Ready(None)), - Async::Ready(Some(action)) => match action { + match try_ready!(self.rx.poll()) { + None => return Ok(Async::Ready(None)), + Some(action) => match action { Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id), Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(&peer_id), Action::SetReservedOnly(reserved) => self.on_set_reserved_only(reserved), From 48a7abf22d416e43c04f19129b9507761589709e Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 2 Apr 2019 14:22:06 +0200 Subject: [PATCH 10/21] split back Peerset and PeersetHandle --- core/network-libp2p/src/service_task.rs | 3 +- core/peerset/src/lib.rs | 50 +++++++++---------------- 2 files changed, 19 insertions(+), 34 deletions(-) diff --git a/core/network-libp2p/src/service_task.rs b/core/network-libp2p/src/service_task.rs index cecba79bdfab4..62e10b045b297 100644 --- a/core/network-libp2p/src/service_task.rs +++ b/core/network-libp2p/src/service_task.rs @@ -72,14 +72,13 @@ where TMessage: CustomMessage + Send + 'static { } // Build the peerset. - let peerset = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig { + let (peerset, peerset_handle) = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig { in_peers: config.in_peers as usize, out_peers: config.out_peers as usize, bootnodes, reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny, reserved_nodes, }); - let peerset_handle = peerset.handle(); // Private and public keys configuration. let local_identity = config.node_key.clone().into_keypair()?; diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index a5ed55a07ddc0..960a6dc3294ad 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -20,7 +20,6 @@ mod slots; use std::collections::{HashMap, VecDeque}; -use std::ops; use futures::{prelude::*, sync::mpsc, try_ready}; use libp2p::PeerId; use linked_hash_map::LinkedHashMap; @@ -182,22 +181,13 @@ pub struct PeersetConfig { #[derive(Debug)] pub struct Peerset { data: PeersetData, - handle: PeersetHandle, rx: mpsc::UnboundedReceiver, message_queue: VecDeque, } -impl ops::Deref for Peerset { - type Target = PeersetHandle; - - fn deref(&self) -> &Self::Target { - &self.handle - } -} - impl Peerset { /// Builds a new peerset from the given configuration. - pub fn from_config(config: PeersetConfig) -> Peerset { + pub fn from_config(config: PeersetConfig) -> (Peerset, PeersetHandle) { let (tx, rx) = mpsc::unbounded(); let data = PeersetData { @@ -208,11 +198,12 @@ impl Peerset { scores: Default::default(), }; + let handle = PeersetHandle { + tx, + }; + let mut peerset = Peerset { data, - handle: PeersetHandle { - tx, - }, rx, message_queue: VecDeque::new(), }; @@ -226,12 +217,7 @@ impl Peerset { } peerset.alloc_slots(); - peerset - } - - /// Creates shared handle to the peer set manager (PSM). - pub fn handle(&self) -> PeersetHandle { - self.handle.clone() + (peerset, handle) } fn on_add_reserved_peer(&mut self, peer_id: PeerId) { @@ -443,7 +429,7 @@ mod tests { reserved_nodes: Vec::new(), }; - let peerset = Peerset::from_config(config); + let (peerset, _handle) = Peerset::from_config(config); assert_messages(peerset, vec![ Message::Connect(bootnode), @@ -465,7 +451,7 @@ mod tests { reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], }; - let peerset = Peerset::from_config(config); + let (peerset, _handle) = Peerset::from_config(config); assert_messages(peerset, vec![ Message::Connect(reserved_peer), @@ -487,9 +473,9 @@ mod tests { reserved_nodes: Vec::new(), }; - let peerset = Peerset::from_config(config); - peerset.add_reserved_peer(reserved_peer.clone()); - peerset.add_reserved_peer(reserved_peer2.clone()); + let (peerset, handle) = Peerset::from_config(config); + handle.add_reserved_peer(reserved_peer.clone()); + handle.add_reserved_peer(reserved_peer2.clone()); assert_messages(peerset, vec![ Message::Connect(reserved_peer), @@ -516,8 +502,8 @@ mod tests { reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], }; - let peerset = Peerset::from_config(config); - peerset.set_reserved_only(true); + let (peerset, handle) = Peerset::from_config(config); + handle.set_reserved_only(true); let peerset = assert_messages(peerset, vec![ Message::Connect(reserved_peer), @@ -546,9 +532,9 @@ mod tests { reserved_nodes: Vec::new(), }; - let peerset = Peerset::from_config(config); - peerset.report_peer(bootnode2, -1); - peerset.report_peer(bootnode.clone(), -1); + let (peerset, handle) = Peerset::from_config(config); + handle.report_peer(bootnode2, -1); + handle.report_peer(bootnode.clone(), -1); assert_messages(peerset, vec![ Message::Connect(bootnode.clone()), @@ -572,7 +558,7 @@ mod tests { reserved_nodes: Vec::new(), }; - let mut peerset = Peerset::from_config(config); + let (mut peerset, _handle) = Peerset::from_config(config); peerset.incoming(incoming.clone(), ii); peerset.incoming(incoming2.clone(), ii2); peerset.incoming(incoming.clone(), ii3); @@ -598,7 +584,7 @@ mod tests { reserved_nodes: vec![reserved_peer.clone()], }; - let peerset = Peerset::from_config(config); + let (peerset, _handle) = Peerset::from_config(config); let mut peerset = assert_messages(peerset, vec![ Message::Connect(reserved_peer.clone()), From 700f73ba98aa8311e958b804c5fde6ad7bd29e0a Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 2 Apr 2019 14:29:24 +0200 Subject: [PATCH 11/21] test for Peerset::discovered --- core/peerset/src/lib.rs | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 960a6dc3294ad..9f4355b8c2662 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -547,11 +547,13 @@ mod tests { let bootnode = PeerId::random(); let incoming = PeerId::random(); let incoming2 = PeerId::random(); + let incoming3 = PeerId::random(); let ii = IncomingIndex(1); let ii2 = IncomingIndex(2); let ii3 = IncomingIndex(3); + let ii4 = IncomingIndex(3); let config = PeersetConfig { - in_peers: 1, + in_peers: 2, out_peers: 1, bootnodes: vec![bootnode.clone()], reserved_only: false, @@ -560,13 +562,15 @@ mod tests { let (mut peerset, _handle) = Peerset::from_config(config); peerset.incoming(incoming.clone(), ii); + peerset.incoming(incoming.clone(), ii4); peerset.incoming(incoming2.clone(), ii2); - peerset.incoming(incoming.clone(), ii3); + peerset.incoming(incoming3.clone(), ii3); assert_messages(peerset, vec![ Message::Connect(bootnode.clone()), - Message::Accept(ii.clone()), - Message::Reject(ii2), + Message::Accept(ii), + Message::Reject(ii4), + Message::Accept(ii2), Message::Reject(ii3), ]); } @@ -602,6 +606,25 @@ mod tests { #[test] fn test_peerset_discovered() { - //unimplemented!(); + let bootnode = PeerId::random(); + let discovered = PeerId::random(); + let discovered2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 2, + bootnodes: vec![bootnode.clone()], + reserved_only: false, + reserved_nodes: vec![], + }; + + let (mut peerset, _handle) = Peerset::from_config(config); + peerset.discovered(discovered.clone()); + peerset.discovered(discovered.clone()); + peerset.discovered(discovered2); + + assert_messages(peerset, vec![ + Message::Connect(bootnode), + Message::Connect(discovered), + ]); } } From 169e606363b8747c21c26e43a455e8730d611e05 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 2 Apr 2019 16:05:28 +0200 Subject: [PATCH 12/21] applied review suggestions --- core/network-libp2p/src/service_task.rs | 4 +- core/peerset/src/lib.rs | 79 +++++++++++++++++-------- core/peerset/src/slots.rs | 7 ++- 3 files changed, 61 insertions(+), 29 deletions(-) diff --git a/core/network-libp2p/src/service_task.rs b/core/network-libp2p/src/service_task.rs index 62e10b045b297..d9d65e6af86d5 100644 --- a/core/network-libp2p/src/service_task.rs +++ b/core/network-libp2p/src/service_task.rs @@ -73,8 +73,8 @@ where TMessage: CustomMessage + Send + 'static { // Build the peerset. let (peerset, peerset_handle) = substrate_peerset::Peerset::from_config(substrate_peerset::PeersetConfig { - in_peers: config.in_peers as usize, - out_peers: config.out_peers as usize, + in_peers: config.in_peers, + out_peers: config.out_peers, bootnodes, reserved_only: config.non_reserved_mode == NonReservedPeerMode::Deny, reserved_nodes, diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 9f4355b8c2662..55c758a19e9d1 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -52,13 +52,23 @@ impl Discovered { } /// Pops the oldest peer from the list. - fn pop_peer(&mut self) -> Option<(PeerId, SlotType)> { - match self.reserved.pop_front() { - Some((peer_id, _)) => Some((peer_id, SlotType::Reserved)), - None => { - let (peer_id, _) = self.common.pop_front()?; - Some((peer_id, SlotType::Common)) - } + fn pop_peer(&mut self, reserved_only: bool) -> Option<(PeerId, SlotType)> { + if let Some((peer_id, _)) = self.reserved.pop_front() { + return Some((peer_id, SlotType::Reserved)); + } + + if reserved_only { + return None; + } + + self.common.pop_front() + .map(|(peer_id, _)| (peer_id, SlotType::Common)) + } + + /// Marks the peer as not reserved. + fn mark_not_reserved(&mut self, peer_id: &PeerId) { + if let Some(_) = self.reserved.remove(peer_id) { + self.common.insert(peer_id.clone(), ()); } } } @@ -153,10 +163,10 @@ impl From for IncomingIndex { #[derive(Debug)] pub struct PeersetConfig { /// Maximum number of ingoing links to peers. - pub in_peers: usize, + pub in_peers: u32, /// Maximum number of outgoing links to peers. - pub out_peers: usize, + pub out_peers: u32, /// List of bootstrap nodes to initialize the peer with. /// @@ -244,12 +254,15 @@ impl Peerset { } } - fn on_remove_reserved_peer(&mut self, peer_id: &PeerId) { - self.data.in_slots.mark_not_reserved(peer_id); - self.data.out_slots.mark_not_reserved(peer_id); - // TODO: should we disconnect from this peer? - // a) always? - // b) only if reserved_only is set + fn on_remove_reserved_peer(&mut self, peer_id: PeerId) { + self.data.in_slots.mark_not_reserved(&peer_id); + self.data.out_slots.mark_not_reserved(&peer_id); + self.data.discovered.mark_not_reserved(&peer_id); + if self.data.reserved_only { + if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { + self.message_queue.push_back(Message::Drop(peer_id)) + } + } } fn on_set_reserved_only(&mut self, reserved_only: bool) { @@ -276,13 +289,7 @@ impl Peerset { } fn alloc_slots(&mut self) { - while let Some((peer_id, slot_type)) = self.data.discovered.pop_peer() { - // reserved peers are always at the beginning of discovered vec - // if we get a common peer, that means it's a goot time to stop - if self.data.reserved_only && slot_type == SlotType::Common { - self.data.discovered.add_peer(peer_id, slot_type); - break; - } + while let Some((peer_id, slot_type)) = self.data.discovered.pop_peer(self.data.reserved_only) { match self.data.out_slots.add_peer(peer_id.clone(), slot_type) { Ok(_) => { self.message_queue.push_back(Message::Connect(peer_id)); @@ -384,7 +391,7 @@ impl Stream for Peerset { None => return Ok(Async::Ready(None)), Some(action) => match action { Action::AddReservedPeer(peer_id) => self.on_add_reserved_peer(peer_id), - Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(&peer_id), + Action::RemoveReservedPeer(peer_id) => self.on_remove_reserved_peer(peer_id), Action::SetReservedOnly(reserved) => self.on_set_reserved_only(reserved), Action::ReportPeer(peer_id, score_diff) => self.on_report_peer(peer_id, score_diff), } @@ -485,7 +492,31 @@ mod tests { #[test] fn test_peerset_remove_reserved_peer() { - //unimplemented!(); + let reserved_peer = PeerId::random(); + let reserved_peer2 = PeerId::random(); + let config = PeersetConfig { + in_peers: 0, + out_peers: 2, + bootnodes: vec![], + reserved_only: false, + reserved_nodes: vec![reserved_peer.clone(), reserved_peer2.clone()], + }; + + let (peerset, handle) = Peerset::from_config(config); + handle.remove_reserved_peer(reserved_peer.clone()); + + let peerset = assert_messages(peerset, vec![ + Message::Connect(reserved_peer.clone()), + Message::Connect(reserved_peer2.clone()), + ]); + + handle.set_reserved_only(true); + handle.remove_reserved_peer(reserved_peer2.clone()); + + assert_messages(peerset, vec![ + Message::Drop(reserved_peer), + Message::Drop(reserved_peer2), + ]); } #[test] diff --git a/core/peerset/src/slots.rs b/core/peerset/src/slots.rs index 15120385b7194..076ebc8852111 100644 --- a/core/peerset/src/slots.rs +++ b/core/peerset/src/slots.rs @@ -52,7 +52,8 @@ pub struct Slots { impl Slots { /// Creates a group of slots with a limited size. - pub fn new(max_slots: usize) -> Self { + pub fn new(max_slots: u32) -> Self { + let max_slots = max_slots as usize; Slots { max_slots, slots: HashMap::with_capacity(max_slots), @@ -66,8 +67,8 @@ impl Slots { /// Returns Ok if we successfully connected to a given peer. pub fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) -> Result<(), SlotError> { - if let Some(slot_type) = self.slots.get_mut(&peer_id) { - *slot_type = SlotType::Reserved; + if let Some(st) = self.slots.get_mut(&peer_id) { + *st = slot_type; return Err(SlotError::AlreadyConnected(peer_id)); } From 913f2210b31bb813e0988bc72852a053f28eb285 Mon Sep 17 00:00:00 2001 From: debris Date: Tue, 2 Apr 2019 16:35:33 +0200 Subject: [PATCH 13/21] fixes to peerset::incoming --- core/peerset/src/lib.rs | 35 +++++++++++++++++++++++++++++++---- core/peerset/src/slots.rs | 4 +++- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 55c758a19e9d1..5ba7bc918593a 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -41,6 +41,11 @@ impl Discovered { self.reserved.contains_key(peer_id) || self.common.contains_key(peer_id) } + /// Returns true if given node is reserved. + fn is_reserved(&self, peer_id: &PeerId) -> bool { + self.reserved.contains_key(peer_id) + } + /// Adds new peer of a given type. fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) { if !self.contains(&peer_id) { @@ -65,12 +70,18 @@ impl Discovered { .map(|(peer_id, _)| (peer_id, SlotType::Common)) } - /// Marks the peer as not reserved. + /// Marks the node as not reserved. fn mark_not_reserved(&mut self, peer_id: &PeerId) { if let Some(_) = self.reserved.remove(peer_id) { self.common.insert(peer_id.clone(), ()); } } + + /// Removes the node from the list. + fn remove_peer(&mut self, peer_id: &PeerId) { + self.reserved.remove(peer_id); + self.common.remove(peer_id); + } } #[derive(Debug)] @@ -239,6 +250,7 @@ impl Peerset { match self.data.out_slots.add_peer(peer_id.clone(), SlotType::Reserved) { Ok(_) => { + self.data.discovered.remove_peer(&peer_id); self.message_queue.push_back(Message::Connect(peer_id)); }, Err(SlotError::AlreadyConnected(_)) => { @@ -318,18 +330,33 @@ impl Peerset { /// Because of concurrency issues, it is acceptable to call `incoming` with a `PeerId` the /// peerset is already connected to, in which case it must not answer. pub fn incoming(&mut self, peer_id: PeerId, index: IncomingIndex) { + // if `reserved_only` is set, but this peer is not a part of our discovered list, + // a) it is not reserved, so we reject the connection + // b) we are already connected to it, so we reject the connection + if self.data.reserved_only && !self.data.discovered.is_reserved(&peer_id) { + self.message_queue.push_back(Message::Reject(index)); + return; + } + // check if we are already connected to this peer - // TODO: should we take into account `reserved_only`? if self.data.out_slots.contains(&peer_id) { self.message_queue.push_back(Message::Reject(index)); + return; } - match self.data.in_slots.add_peer(peer_id, SlotType::Common) { + let slot_type = if self.data.reserved_only { + SlotType::Reserved + } else { + SlotType::Common + }; + + match self.data.in_slots.add_peer(peer_id.clone(), slot_type) { Ok(_) => { + self.data.discovered.remove_peer(&peer_id); self.message_queue.push_back(Message::Accept(index)); }, Err(SlotError::MaxConnections(peer_id)) => { - self.data.discovered.add_peer(peer_id, SlotType::Common); + self.data.discovered.add_peer(peer_id, slot_type); self.message_queue.push_back(Message::Reject(index)); }, _ => { diff --git a/core/peerset/src/slots.rs b/core/peerset/src/slots.rs index 076ebc8852111..7e2069e015974 100644 --- a/core/peerset/src/slots.rs +++ b/core/peerset/src/slots.rs @@ -68,7 +68,9 @@ impl Slots { /// Returns Ok if we successfully connected to a given peer. pub fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) -> Result<(), SlotError> { if let Some(st) = self.slots.get_mut(&peer_id) { - *st = slot_type; + if let SlotType::Reserved = slot_type { + *st = SlotType::Reserved; + } return Err(SlotError::AlreadyConnected(peer_id)); } From 6bc7e14f4be745a25c70c30deccc6c0c54aa7e18 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 4 Apr 2019 11:13:18 +0200 Subject: [PATCH 14/21] peerset disconnects are all instantaneous --- core/peerset/src/lib.rs | 77 +++++++++++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 5ba7bc918593a..a538a36178a65 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -248,20 +248,30 @@ impl Peerset { return; } - match self.data.out_slots.add_peer(peer_id.clone(), SlotType::Reserved) { - Ok(_) => { - self.data.discovered.remove_peer(&peer_id); - self.message_queue.push_back(Message::Connect(peer_id)); - }, - Err(SlotError::AlreadyConnected(_)) => { - return; - } - Err(SlotError::MaxConnections(_)) => { - self.data.discovered.add_peer(peer_id, SlotType::Reserved); - } - Err(SlotError::DemandReroute { disconnect, ..}) => { - self.message_queue.push_back(Message::Drop(disconnect)); - self.data.discovered.add_peer(peer_id, SlotType::Reserved); + loop { + match self.data.out_slots.add_peer(peer_id.clone(), SlotType::Reserved) { + Ok(_) => { + // reserved node may have been previously stored as normal node in discovered list + // let's remove it + self.data.discovered.remove_peer(&peer_id); + + // notify that connection has been made + self.message_queue.push_back(Message::Connect(peer_id)); + return; + }, + Err(SlotError::AlreadyConnected(_)) => { + return; + } + Err(SlotError::MaxConnections(_)) => { + self.data.discovered.add_peer(peer_id, SlotType::Reserved); + return; + } + Err(SlotError::DemandReroute { disconnect, ..}) => { + // disconnect not reserved node + self.data.out_slots.clear_slot(&disconnect); + self.message_queue.push_back(Message::Drop(disconnect)); + // on the next loop iteration we should connect to the peer + } } } } @@ -312,9 +322,13 @@ impl Peerset { break; }, Err(SlotError::DemandReroute { disconnect, .. }) => { + // disconnect not reserved node + self.data.out_slots.clear_slot(&disconnect); self.message_queue.push_back(Message::Drop(disconnect)); + + // add reserved node back to the discovered list, so it is + // processed again in the future self.data.discovered.add_peer(peer_id, slot_type); - break; } } } @@ -350,17 +364,28 @@ impl Peerset { SlotType::Common }; - match self.data.in_slots.add_peer(peer_id.clone(), slot_type) { - Ok(_) => { - self.data.discovered.remove_peer(&peer_id); - self.message_queue.push_back(Message::Accept(index)); - }, - Err(SlotError::MaxConnections(peer_id)) => { - self.data.discovered.add_peer(peer_id, slot_type); - self.message_queue.push_back(Message::Reject(index)); - }, - _ => { - self.message_queue.push_back(Message::Reject(index)); + loop { + match self.data.in_slots.add_peer(peer_id.clone(), slot_type) { + Ok(_) => { + self.data.discovered.remove_peer(&peer_id); + self.message_queue.push_back(Message::Accept(index)); + return; + }, + Err(SlotError::MaxConnections(peer_id)) => { + self.data.discovered.add_peer(peer_id, slot_type); + self.message_queue.push_back(Message::Reject(index)); + return; + }, + Err(SlotError::AlreadyConnected(_)) => { + self.message_queue.push_back(Message::Reject(index)); + return; + }, + Err(SlotError::DemandReroute { disconnect, .. }) => { + // disconnect not reserved node + self.data.in_slots.clear_slot(&disconnect); + self.message_queue.push_back(Message::Drop(disconnect)); + // on the next loop iteration we should accept the connection + }, } } } From a92825f9e81d2c2037ef2ae48ec312448707fc22 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 4 Apr 2019 11:36:13 +0200 Subject: [PATCH 15/21] instantaneous drop in peerset finished --- core/peerset/src/lib.rs | 9 +++++++-- core/peerset/src/slots.rs | 19 +++++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index a538a36178a65..41d34e7f223c8 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -282,6 +282,8 @@ impl Peerset { self.data.discovered.mark_not_reserved(&peer_id); if self.data.reserved_only { if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { + self.data.in_slots.clear_slot(&peer_id); + self.data.out_slots.clear_slot(&peer_id); self.message_queue.push_back(Message::Drop(peer_id)) } } @@ -291,9 +293,10 @@ impl Peerset { // Disconnect non-reserved nodes. self.data.reserved_only = reserved_only; if self.data.reserved_only { - for peer in self.data.in_slots.common_peers().chain(self.data.out_slots.common_peers()) { + for peer_id in self.data.in_slots.clear_common_slots().into_iter().chain(self.data.out_slots.clear_common_slots().into_iter()) { // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method - self.message_queue.push_back(Message::Drop(peer.clone())) + self.data.in_slots.clear_slot(&peer_id); + self.message_queue.push_back(Message::Drop(peer_id)); } } } @@ -305,6 +308,8 @@ impl Peerset { if *score < 0 { // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { + self.data.in_slots.clear_slot(&peer_id); + self.data.out_slots.clear_slot(&peer_id); self.message_queue.push_back(Message::Drop(peer_id)); } } diff --git a/core/peerset/src/slots.rs b/core/peerset/src/slots.rs index 7e2069e015974..49a60a191324d 100644 --- a/core/peerset/src/slots.rs +++ b/core/peerset/src/slots.rs @@ -15,6 +15,7 @@ // along with Substrate. If not, see . use std::collections::HashMap; +use std::mem; use libp2p::PeerId; /// Describes the nature of connection with a given peer. @@ -90,10 +91,20 @@ impl Slots { Ok(()) } - pub fn common_peers(&self) -> impl Iterator { - self.slots.iter() - .filter(|&(_, slot_type)| *slot_type == SlotType::Common) - .map(|(peer_id, _)| peer_id) + pub fn clear_common_slots(&mut self) -> Vec { + let slots = mem::replace(&mut self.slots, HashMap::with_capacity(self.max_slots)); + let mut common_peers = Vec::new(); + for (peer_id, slot_type) in slots { + match slot_type { + SlotType::Common => { + common_peers.push(peer_id); + }, + SlotType::Reserved => { + self.slots.insert(peer_id, slot_type); + }, + } + } + common_peers } pub fn mark_reserved(&mut self, peer_id: &PeerId) { From ce5b4b290ce27cf632cfa42e5d87a03771d4de77 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 4 Apr 2019 11:44:38 +0200 Subject: [PATCH 16/21] Peerset::set_reserved_only can also reconnect nodes --- core/peerset/src/lib.rs | 19 ++++++++++--------- core/peerset/src/slots.rs | 9 +++++---- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 41d34e7f223c8..c35cee9962cc5 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -296,8 +296,11 @@ impl Peerset { for peer_id in self.data.in_slots.clear_common_slots().into_iter().chain(self.data.out_slots.clear_common_slots().into_iter()) { // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method self.data.in_slots.clear_slot(&peer_id); - self.message_queue.push_back(Message::Drop(peer_id)); + self.message_queue.push_back(Message::Drop(peer_id.clone())); + self.data.discovered.add_peer(peer_id, SlotType::Common); } + } else { + self.alloc_slots(); } } @@ -592,20 +595,18 @@ mod tests { let (peerset, handle) = Peerset::from_config(config); handle.set_reserved_only(true); + handle.set_reserved_only(false); - let peerset = assert_messages(peerset, vec![ + assert_messages(peerset, vec![ Message::Connect(reserved_peer), Message::Connect(reserved_peer2), Message::Connect(bootnode.clone()), Message::Connect(bootnode2.clone()), + Message::Drop(bootnode.clone()), + Message::Drop(bootnode2.clone()), + Message::Connect(bootnode), + Message::Connect(bootnode2), ]); - - let (message, peerset) = next_message(peerset).expect("Message::Drop the bootnode"); - let (message2, _peerset) = next_message(peerset).expect("Message::Drop the bootnode2"); - assert!( - (message == Message::Drop(bootnode.clone()) && message2 == Message::Drop(bootnode2.clone())) || - (message2 == Message::Drop(bootnode) && message == Message::Drop(bootnode2)) - ); } #[test] diff --git a/core/peerset/src/slots.rs b/core/peerset/src/slots.rs index 49a60a191324d..cec421d6f8f3c 100644 --- a/core/peerset/src/slots.rs +++ b/core/peerset/src/slots.rs @@ -14,9 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use std::collections::HashMap; use std::mem; use libp2p::PeerId; +use linked_hash_map::LinkedHashMap; /// Describes the nature of connection with a given peer. #[derive(Debug, PartialEq, Clone, Copy)] @@ -48,7 +48,8 @@ pub enum SlotError { #[derive(Debug)] pub struct Slots { max_slots: usize, - slots: HashMap, + /// Nodes and their type. We use `LinkedHashMap` to make this data structure more predictable + slots: LinkedHashMap, } impl Slots { @@ -57,7 +58,7 @@ impl Slots { let max_slots = max_slots as usize; Slots { max_slots, - slots: HashMap::with_capacity(max_slots), + slots: LinkedHashMap::with_capacity(max_slots), } } @@ -92,7 +93,7 @@ impl Slots { } pub fn clear_common_slots(&mut self) -> Vec { - let slots = mem::replace(&mut self.slots, HashMap::with_capacity(self.max_slots)); + let slots = mem::replace(&mut self.slots, LinkedHashMap::with_capacity(self.max_slots)); let mut common_peers = Vec::new(); for (peer_id, slot_type) in slots { match slot_type { From a1fb98875628fdd514137076f5425298c7aa9c01 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 4 Apr 2019 15:45:45 +0200 Subject: [PATCH 17/21] Peerset scores cache uses lru-cache --- Cargo.lock | 1 + core/peerset/Cargo.toml | 1 + core/peerset/src/lib.rs | 21 ++++++++++++++++----- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d64cc72942a23..29886a032feab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4036,6 +4036,7 @@ dependencies = [ "libp2p 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "linked-hash-map 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.38 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/core/peerset/Cargo.toml b/core/peerset/Cargo.toml index 156897bf29c3a..1b505682d85cc 100644 --- a/core/peerset/Cargo.toml +++ b/core/peerset/Cargo.toml @@ -12,4 +12,5 @@ futures = "0.1" libp2p = { version = "0.6.0", default-features = false } linked-hash-map = "0.5" log = "0.4" +lru-cache = "0.1.2" serde_json = "1.0.24" diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index c35cee9962cc5..62a9533177030 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -23,9 +23,12 @@ use std::collections::{HashMap, VecDeque}; use futures::{prelude::*, sync::mpsc, try_ready}; use libp2p::PeerId; use linked_hash_map::LinkedHashMap; +use lru_cache::LruCache; use slots::{SlotType, SlotError, Slots}; pub use serde_json::Value; +const PEERSET_SCORES_CACHE_SIZE: usize = 1000; + /// FIFO-ordered list of nodes that we know exist, but we are not connected to. #[derive(Debug, Default)] struct Discovered { @@ -96,7 +99,7 @@ struct PeersetData { /// Node slots for incoming connections. in_slots: Slots, /// List of node scores. - scores: HashMap, + scores: LruCache, } #[derive(Debug)] @@ -216,7 +219,7 @@ impl Peerset { reserved_only: config.reserved_only, out_slots: Slots::new(config.out_peers), in_slots: Slots::new(config.in_peers), - scores: Default::default(), + scores: LruCache::new(PEERSET_SCORES_CACHE_SIZE), }; let handle = PeersetHandle { @@ -305,10 +308,18 @@ impl Peerset { } fn on_report_peer(&mut self, peer_id: PeerId, score_diff: i32) { - let score = self.data.scores.entry(peer_id.clone()).or_default(); - *score = score.saturating_add(score_diff); + let score = match self.data.scores.get_mut(&peer_id) { + Some(score) => { + *score = score.saturating_add(score_diff); + *score + }, + None => { + self.data.scores.insert(peer_id.clone(), score_diff); + score_diff + } + }; - if *score < 0 { + if score < 0 { // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { self.data.in_slots.clear_slot(&peer_id); From 8b202525a6dbf12d13f99ed3e8f5425b00f973e8 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 4 Apr 2019 15:46:49 +0200 Subject: [PATCH 18/21] remove redundant function call and comment from Peerset::on_set_reserved_only --- core/peerset/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 62a9533177030..581b9a99b4567 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -297,8 +297,6 @@ impl Peerset { self.data.reserved_only = reserved_only; if self.data.reserved_only { for peer_id in self.data.in_slots.clear_common_slots().into_iter().chain(self.data.out_slots.clear_common_slots().into_iter()) { - // peer will be removed from `in_slots` or `out_slots` in `on_dropped` method - self.data.in_slots.clear_slot(&peer_id); self.message_queue.push_back(Message::Drop(peer_id.clone())); self.data.discovered.add_peer(peer_id, SlotType::Common); } From 310cef2011eeffec3f4f25c65cf58a02dd706267 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 4 Apr 2019 16:23:04 +0200 Subject: [PATCH 19/21] add_peer returns SlotState enum --- core/peerset/src/lib.rs | 122 ++++++++++++++++++-------------------- core/peerset/src/slots.rs | 60 ++++++++++++------- 2 files changed, 97 insertions(+), 85 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index 581b9a99b4567..e14184e15f2bb 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -19,12 +19,12 @@ mod slots; -use std::collections::{HashMap, VecDeque}; +use std::collections::VecDeque; use futures::{prelude::*, sync::mpsc, try_ready}; use libp2p::PeerId; use linked_hash_map::LinkedHashMap; use lru_cache::LruCache; -use slots::{SlotType, SlotError, Slots}; +use slots::{SlotType, SlotState, Slots}; pub use serde_json::Value; const PEERSET_SCORES_CACHE_SIZE: usize = 1000; @@ -251,30 +251,30 @@ impl Peerset { return; } - loop { - match self.data.out_slots.add_peer(peer_id.clone(), SlotType::Reserved) { - Ok(_) => { - // reserved node may have been previously stored as normal node in discovered list - // let's remove it - self.data.discovered.remove_peer(&peer_id); + match self.data.out_slots.add_peer(peer_id, SlotType::Reserved) { + SlotState::Added(peer_id) => { + // reserved node may have been previously stored as normal node in discovered list + self.data.discovered.remove_peer(&peer_id); - // notify that connection has been made - self.message_queue.push_back(Message::Connect(peer_id)); - return; - }, - Err(SlotError::AlreadyConnected(_)) => { - return; - } - Err(SlotError::MaxConnections(_)) => { - self.data.discovered.add_peer(peer_id, SlotType::Reserved); - return; - } - Err(SlotError::DemandReroute { disconnect, ..}) => { - // disconnect not reserved node - self.data.out_slots.clear_slot(&disconnect); - self.message_queue.push_back(Message::Drop(disconnect)); - // on the next loop iteration we should connect to the peer - } + // notify that connection has been made + self.message_queue.push_back(Message::Connect(peer_id)); + return; + }, + SlotState::Swaped { removed, added } => { + // reserved node may have been previously stored as normal node in discovered list + self.data.discovered.remove_peer(&added); + // let's add the peer we disconnected from to the discovered list again + self.data.discovered.add_peer(removed.clone(), SlotType::Common); + // swap connections + self.message_queue.push_back(Message::Drop(removed)); + self.message_queue.push_back(Message::Connect(added)); + } + SlotState::AlreadyConnected(_) | SlotState::Upgraded(_) => { + return; + } + SlotState::MaxConnections(peer_id) => { + self.data.discovered.add_peer(peer_id, SlotType::Reserved); + return; } } } @@ -329,24 +329,21 @@ impl Peerset { fn alloc_slots(&mut self) { while let Some((peer_id, slot_type)) = self.data.discovered.pop_peer(self.data.reserved_only) { - match self.data.out_slots.add_peer(peer_id.clone(), slot_type) { - Ok(_) => { + match self.data.out_slots.add_peer(peer_id, slot_type) { + SlotState::Added(peer_id) => { self.message_queue.push_back(Message::Connect(peer_id)); }, - Err(SlotError::AlreadyConnected(_)) => (), - Err(SlotError::MaxConnections(_)) => { + SlotState::Swaped { removed, added } => { + self.message_queue.push_back(Message::Drop(removed)); + self.message_queue.push_back(Message::Connect(added)); + } + SlotState::Upgraded(_) | SlotState::AlreadyConnected(_) => { + // TODO: we should never reach this point + }, + SlotState::MaxConnections(peer_id) => { self.data.discovered.add_peer(peer_id, slot_type); break; }, - Err(SlotError::DemandReroute { disconnect, .. }) => { - // disconnect not reserved node - self.data.out_slots.clear_slot(&disconnect); - self.message_queue.push_back(Message::Drop(disconnect)); - - // add reserved node back to the discovered list, so it is - // processed again in the future - self.data.discovered.add_peer(peer_id, slot_type); - } } } } @@ -371,7 +368,7 @@ impl Peerset { // check if we are already connected to this peer if self.data.out_slots.contains(&peer_id) { - self.message_queue.push_back(Message::Reject(index)); + // we are already connected. in this case we do not answer return; } @@ -381,29 +378,29 @@ impl Peerset { SlotType::Common }; - loop { - match self.data.in_slots.add_peer(peer_id.clone(), slot_type) { - Ok(_) => { - self.data.discovered.remove_peer(&peer_id); - self.message_queue.push_back(Message::Accept(index)); - return; - }, - Err(SlotError::MaxConnections(peer_id)) => { - self.data.discovered.add_peer(peer_id, slot_type); - self.message_queue.push_back(Message::Reject(index)); - return; - }, - Err(SlotError::AlreadyConnected(_)) => { - self.message_queue.push_back(Message::Reject(index)); - return; - }, - Err(SlotError::DemandReroute { disconnect, .. }) => { - // disconnect not reserved node - self.data.in_slots.clear_slot(&disconnect); - self.message_queue.push_back(Message::Drop(disconnect)); - // on the next loop iteration we should accept the connection - }, - } + match self.data.in_slots.add_peer(peer_id, slot_type) { + SlotState::Added(peer_id) => { + // reserved node may have been previously stored as normal node in discovered list + self.data.discovered.remove_peer(&peer_id); + self.message_queue.push_back(Message::Accept(index)); + return; + }, + SlotState::Swaped { removed, added } => { + // reserved node may have been previously stored as normal node in discovered list + self.data.discovered.remove_peer(&added); + // swap connections. + self.message_queue.push_back(Message::Drop(removed)); + self.message_queue.push_back(Message::Accept(index)); + }, + SlotState::AlreadyConnected(_) | SlotState::Upgraded(_) => { + // we are already connected. in this case we do not answer + return; + }, + SlotState::MaxConnections(peer_id) => { + self.data.discovered.add_peer(peer_id, slot_type); + self.message_queue.push_back(Message::Reject(index)); + return; + }, } } @@ -667,7 +664,6 @@ mod tests { assert_messages(peerset, vec![ Message::Connect(bootnode.clone()), Message::Accept(ii), - Message::Reject(ii4), Message::Accept(ii2), Message::Reject(ii3), ]); diff --git a/core/peerset/src/slots.rs b/core/peerset/src/slots.rs index cec421d6f8f3c..548be2d08c6d6 100644 --- a/core/peerset/src/slots.rs +++ b/core/peerset/src/slots.rs @@ -28,20 +28,24 @@ pub enum SlotType { Common, } -/// Descibes why the reason of not being able to add given peer. -pub enum SlotError { +/// Descibes the result of `add_peer` action. +pub enum SlotState { + /// Returned when `add_peer` successfully adds a peer to the slot. + Added(PeerId), + /// Returned we already have a connection to a given peer, but it is upgraded from + /// `Common` to `Reserved`. + Upgraded(PeerId), + /// Returned when we should removed a common peer to make space for a reserved peer. + Swaped { + /// Peer we should disconnect from. + removed: PeerId, + /// Peer we should connect to. + added: PeerId, + }, /// Error returned when we are already connected to this peer. AlreadyConnected(PeerId), /// Error returned when max number of connections has been already established. MaxConnections(PeerId), - /// Error returned when we should disconnect from a given common peer to make space - /// for a reserved peer. - DemandReroute { - /// Peer we should disconnect from. - disconnect: PeerId, - /// Peer we should connect to. - connect: PeerId, - } } /// Contains all information about group of slots. @@ -67,29 +71,41 @@ impl Slots { self.slots.contains_key(peer_id) } - /// Returns Ok if we successfully connected to a given peer. - pub fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) -> Result<(), SlotError> { + /// Tries to find a slot for a given peer and returns `SlotState`. + pub fn add_peer(&mut self, peer_id: PeerId, slot_type: SlotType) -> SlotState { if let Some(st) = self.slots.get_mut(&peer_id) { - if let SlotType::Reserved = slot_type { + if *st == SlotType::Common && slot_type == SlotType::Reserved { *st = SlotType::Reserved; + return SlotState::Upgraded(peer_id); + } else { + return SlotState::AlreadyConnected(peer_id); } - return Err(SlotError::AlreadyConnected(peer_id)); } if self.slots.len() == self.max_slots { if let SlotType::Reserved = slot_type { - if let Some((to_disconnect, _)) = self.slots.iter().find(|(_, &slot_type)| slot_type == SlotType::Common) { - return Err(SlotError::DemandReroute { - disconnect: to_disconnect.clone(), - connect: peer_id, - }); + // if we are trying to insert a reserved peer, but we all of our slots are full, + // we need to remove one of the existing common connections + let to_remove = self.slots.iter() + .find(|(_, &slot_type)| slot_type == SlotType::Common) + .map(|(to_remove, _)| to_remove) + .cloned(); + + if let Some(to_remove) = to_remove { + self.slots.remove(&to_remove); + self.slots.insert(peer_id.clone(), slot_type); + + return SlotState::Swaped { + removed: to_remove, + added: peer_id, + }; } } - return Err(SlotError::MaxConnections(peer_id)); + return SlotState::MaxConnections(peer_id); } - self.slots.insert(peer_id, slot_type); - Ok(()) + self.slots.insert(peer_id.clone(), slot_type); + SlotState::Added(peer_id) } pub fn clear_common_slots(&mut self) -> Vec { From 3e2340dfe028df8b44294ac1d743ef4b511573a8 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 4 Apr 2019 16:33:50 +0200 Subject: [PATCH 20/21] apply review suggestions --- core/peerset/src/lib.rs | 18 ++++++++++++------ core/peerset/src/slots.rs | 4 ++-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index e14184e15f2bb..d2d3ea59cbf02 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -284,10 +284,13 @@ impl Peerset { self.data.out_slots.mark_not_reserved(&peer_id); self.data.discovered.mark_not_reserved(&peer_id); if self.data.reserved_only { - if self.data.in_slots.contains(&peer_id) || self.data.out_slots.contains(&peer_id) { - self.data.in_slots.clear_slot(&peer_id); - self.data.out_slots.clear_slot(&peer_id); - self.message_queue.push_back(Message::Drop(peer_id)) + if self.data.in_slots.clear_slot(&peer_id) || self.data.out_slots.clear_slot(&peer_id) { + // insert peer back into discovered list + self.data.discovered.add_peer(peer_id.clone(), SlotType::Common); + self.message_queue.push_back(Message::Drop(peer_id)); + // call alloc_slots again, cause we may have some reserved peers in discovered list + // waiting for the slot that was just cleared + self.alloc_slots(); } } } @@ -297,8 +300,9 @@ impl Peerset { self.data.reserved_only = reserved_only; if self.data.reserved_only { for peer_id in self.data.in_slots.clear_common_slots().into_iter().chain(self.data.out_slots.clear_common_slots().into_iter()) { - self.message_queue.push_back(Message::Drop(peer_id.clone())); - self.data.discovered.add_peer(peer_id, SlotType::Common); + // insert peer back into discovered list + self.data.discovered.add_peer(peer_id.clone(), SlotType::Common); + self.message_queue.push_back(Message::Drop(peer_id)); } } else { self.alloc_slots(); @@ -334,6 +338,8 @@ impl Peerset { self.message_queue.push_back(Message::Connect(peer_id)); }, SlotState::Swaped { removed, added } => { + // insert peer back into discovered list + self.data.discovered.add_peer(removed.clone(), SlotType::Common); self.message_queue.push_back(Message::Drop(removed)); self.message_queue.push_back(Message::Connect(added)); } diff --git a/core/peerset/src/slots.rs b/core/peerset/src/slots.rs index 548be2d08c6d6..568981d04ac19 100644 --- a/core/peerset/src/slots.rs +++ b/core/peerset/src/slots.rs @@ -136,8 +136,8 @@ impl Slots { } } - pub fn clear_slot(&mut self, peer_id: &PeerId) { - self.slots.remove(peer_id); + pub fn clear_slot(&mut self, peer_id: &PeerId) -> bool { + self.slots.remove(peer_id).is_some() } pub fn is_reserved(&self, peer_id: &PeerId) -> bool { From de2525bd1bbd3f03e656892146a91a6081c0c937 Mon Sep 17 00:00:00 2001 From: debris Date: Thu, 4 Apr 2019 16:36:39 +0200 Subject: [PATCH 21/21] is_reserved -> is_connected_and_reserved --- core/peerset/src/lib.rs | 2 +- core/peerset/src/slots.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/peerset/src/lib.rs b/core/peerset/src/lib.rs index d2d3ea59cbf02..e49a5b2baebd6 100644 --- a/core/peerset/src/lib.rs +++ b/core/peerset/src/lib.rs @@ -416,7 +416,7 @@ impl Peerset { /// `PeerId`, or accepted an incoming connection with this `PeerId`. pub fn dropped(&mut self, peer_id: PeerId) { // Automatically connect back if reserved. - if self.data.in_slots.is_reserved(&peer_id) || self.data.out_slots.is_reserved(&peer_id) { + if self.data.in_slots.is_connected_and_reserved(&peer_id) || self.data.out_slots.is_connected_and_reserved(&peer_id) { self.message_queue.push_back(Message::Connect(peer_id)); return; } diff --git a/core/peerset/src/slots.rs b/core/peerset/src/slots.rs index 568981d04ac19..7fa655d7ff608 100644 --- a/core/peerset/src/slots.rs +++ b/core/peerset/src/slots.rs @@ -140,7 +140,7 @@ impl Slots { self.slots.remove(peer_id).is_some() } - pub fn is_reserved(&self, peer_id: &PeerId) -> bool { + pub fn is_connected_and_reserved(&self, peer_id: &PeerId) -> bool { self.slots.get(peer_id) .map(|slot_type| *slot_type == SlotType::Reserved) .unwrap_or_else(|| false)