diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index 4378baacae..bd20bc6c33 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -28,7 +28,7 @@ use snarkvm::{ use anyhow::{Result, bail}; use futures::SinkExt; use rand::{Rng, rngs::OsRng}; -use std::{collections::hash_map::Entry, io, net::SocketAddr}; +use std::{collections::hash_map::Entry, io, mem, net::SocketAddr}; use tokio::net::TcpStream; use tokio_stream::StreamExt; use tokio_util::codec::Framed; @@ -90,14 +90,14 @@ impl Router { peer_side: ConnectionSide, genesis_header: Header, restrictions_id: Field, - ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, MessageCodec>)> { + ) -> io::Result> { // If this is an inbound connection, we log it, but don't know the listening address yet. // Otherwise, we can immediately register the listening address. - let mut peer_ip = if peer_side == ConnectionSide::Initiator { + let mut listener_addr = if peer_side == ConnectionSide::Initiator { debug!("Received a connection request from '{peer_addr}'"); None } else { - debug!("Connecting to {peer_addr}..."); + debug!("Connecting to '{peer_addr}'..."); Some(peer_addr) }; @@ -121,17 +121,23 @@ impl Router { } } - // Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time. + // Perform the handshake; we pass on a mutable reference to listener_addr in case the process is broken at any point in time. 
let handshake_result = if peer_side == ConnectionSide::Responder { - self.handshake_inner_initiator(peer_addr, &mut peer_ip, stream, genesis_header, restrictions_id).await + self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await } else { - self.handshake_inner_responder(peer_addr, &mut peer_ip, stream, genesis_header, restrictions_id).await + self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id).await }; - if let Some(ip) = peer_ip { - if handshake_result.is_err() { - // Remove the address from the collection of connecting peers (if the handshake got to the point where it's known). - self.connecting_peers.lock().remove(&ip); + if let Some(addr) = listener_addr { + if let Ok(ref challenge_request) = handshake_result { + if let Some(peer) = self.peer_pool.write().get_mut(&addr) { + peer.upgrade_to_connected(peer_addr, challenge_request, self.clone()); + } + #[cfg(feature = "metrics")] + self.update_metrics(); + debug!("Completed the handshake with '{peer_addr}'"); + } else if let Some(peer) = self.peer_pool.write().get_mut(&addr) { + peer.downgrade_to_candidate(addr, false); } } @@ -142,13 +148,21 @@ impl Router { async fn handshake_inner_initiator<'a>( &'a self, peer_addr: SocketAddr, - peer_ip: &mut Option, stream: &'a mut TcpStream, genesis_header: Header, restrictions_id: Field, - ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, MessageCodec>)> { - // This value is immediately guaranteed to be present, so it can be unwrapped. - let peer_ip = peer_ip.unwrap(); + ) -> io::Result> { + match self.peer_pool.write().entry(peer_addr) { + Entry::Vacant(entry) => { + entry.insert(Peer::Connecting); + } + Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => { + entry.insert(Peer::Connecting); + } + Entry::Occupied(_) => { + return Err(error(format!("Duplicate connection attempt with '{peer_addr}'"))); + } + } // Construct the stream. 
let mut framed = Framed::new(stream, MessageCodec::::handshake()); @@ -191,6 +205,7 @@ impl Router { send(&mut framed, peer_addr, reason.into()).await?; return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}"))); } + /* Step 3: Send the challenge response. */ let response_nonce: u64 = rng.r#gen(); @@ -208,23 +223,18 @@ impl Router { }; send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?; - // Finalize the connecting peer information. - self.connecting_peers.lock().insert(peer_ip, Some(Peer::new(peer_ip, peer_addr, &peer_request))); - // Adds a bidirectional map between the listener address and (ambiguous) peer address. - self.resolver.write().insert_peer(peer_ip, peer_addr); - - Ok((peer_ip, framed)) + Ok(peer_request) } /// The connection responder side of the handshake. async fn handshake_inner_responder<'a>( &'a self, peer_addr: SocketAddr, - peer_ip: &mut Option, + listener_addr: &mut Option, stream: &'a mut TcpStream, genesis_header: Header, restrictions_id: Field, - ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, MessageCodec>)> { + ) -> io::Result> { // Construct the stream. let mut framed = Framed::new(stream, MessageCodec::::handshake()); @@ -234,11 +244,11 @@ impl Router { let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr); // Obtain the peer's listening address. - *peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port)); - let peer_ip = peer_ip.unwrap(); + *listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port)); + let listener_addr = listener_addr.unwrap(); // Knowing the peer's listening address, ensure it is allowed to connect. - if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) { + if let Err(forbidden_message) = self.ensure_peer_is_allowed(listener_addr) { return Err(error(format!("{forbidden_message}"))); } // Verify the challenge request. 
If a disconnect reason was returned, send the disconnect message and abort. @@ -246,6 +256,7 @@ impl Router { send(&mut framed, peer_addr, reason.into()).await?; return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}"))); } + /* Step 2: Send the challenge response followed by own challenge request. */ // Initialize an RNG. @@ -293,50 +304,44 @@ impl Router { return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}"))); } - // Finalize the connecting peer information. - self.connecting_peers.lock().insert(peer_ip, Some(Peer::new(peer_ip, peer_addr, &peer_request))); - // Adds a bidirectional map between the listener address and (ambiguous) peer address. - self.resolver.write().insert_peer(peer_ip, peer_addr); - - Ok((peer_ip, framed)) + Ok(peer_request) } /// Ensure the peer is allowed to connect. - fn ensure_peer_is_allowed(&self, peer_ip: SocketAddr) -> Result<()> { + fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> { // Ensure the peer IP is not this node. - if self.is_local_ip(&peer_ip) { - bail!("Dropping connection request from '{peer_ip}' (attempted to self-connect)") - } - // Ensure the node is not already connecting to this peer. - match self.connecting_peers.lock().entry(peer_ip) { - Entry::Vacant(entry) => entry.insert(None), - Entry::Occupied(_) => { - bail!("Dropping connection request from '{peer_ip}' (already shaking hands as the initiator)") - } - }; - // Ensure the node is not already connected to this peer. - if self.is_connected(&peer_ip) { - bail!("Dropping connection request from '{peer_ip}' (already connected)") + if self.is_local_ip(&listener_addr) { + bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)") } // Ensure either the peer is trusted or `allow_external_peers` is true. 
- if !self.allow_external_peers() && !self.is_trusted(&peer_ip) { - bail!("Dropping connection request from '{peer_ip}' (untrusted)") + if !self.allow_external_peers() && !self.is_trusted(&listener_addr) { + bail!("Dropping connection request from '{listener_addr}' (untrusted)") } - // Ensure the peer is not restricted. - if self.is_restricted(&peer_ip) { - bail!("Dropping connection request from '{peer_ip}' (restricted)") - } - // Ensure the peer is not spamming connection attempts. - if !peer_ip.ip().is_loopback() { - // Add this connection attempt and retrieve the number of attempts. - let num_attempts = self.cache.insert_inbound_connection(peer_ip.ip(), Self::RADIO_SILENCE_IN_SECS as i64); - // Ensure the connecting peer has not surpassed the connection attempt limit. - if num_attempts > Self::MAXIMUM_CONNECTION_FAILURES { - // Restrict the peer. - self.insert_restricted_peer(peer_ip); - bail!("Dropping connection request from '{peer_ip}' (tried {num_attempts} times)") + // Ensure the node is not already connecting to this peer. 
+ match self.peer_pool.write().entry(listener_addr) { + Entry::Vacant(entry) => { + entry.insert(Peer::Connecting); } - } + Entry::Occupied(mut entry) => match entry.get_mut() { + Peer::Candidate(peer) + if peer + .restricted + .map(|ts| ts.elapsed().as_secs() < Self::RADIO_SILENCE_IN_SECS) + .unwrap_or(false) => + { + bail!("Dropping connection request from '{listener_addr}' (restricted)"); + } + peer @ Peer::Candidate(_) => { + let _ = mem::replace(peer, Peer::Connecting); + } + Peer::Connecting => { + bail!("Dropping connection request from '{listener_addr}' (already connecting)"); + } + Peer::Connected(_) => { + bail!("Dropping connection request from '{listener_addr}' (already connected)"); + } + }, + }; Ok(()) } diff --git a/node/router/src/heartbeat.rs b/node/router/src/heartbeat.rs index 326b5e0bc0..3cb5030d72 100644 --- a/node/router/src/heartbeat.rs +++ b/node/router/src/heartbeat.rs @@ -14,8 +14,8 @@ // limitations under the License. use crate::{ + ConnectedPeer, Outbound, - Peer, Router, messages::{DisconnectReason, Message, PeerRequest}, }; @@ -99,11 +99,11 @@ pub trait Heartbeat: Outbound { // Check if any connected peer is stale. for peer in self.router().get_connected_peers() { // Disconnect if the peer has not communicated back within the predefined time. - let elapsed = peer.last_seen().elapsed().as_secs(); + let elapsed = peer.last_seen.elapsed().as_secs(); if elapsed > Router::::RADIO_SILENCE_IN_SECS { - warn!("Peer {} has not communicated in {elapsed} seconds", peer.ip()); + warn!("Peer {} has not communicated in {elapsed} seconds", peer.listener_addr); // Disconnect from this peer. - self.router().disconnect(peer.ip()); + self.router().disconnect(peer.listener_addr); } } } @@ -116,7 +116,7 @@ pub trait Heartbeat: Outbound { /// - Peers that we are currently syncing with are not removable. /// - Validators are considered higher priority than provers or clients. /// - Connections that have not been seen in a while are considered lower priority. 
- fn get_removable_peers(&self) -> Vec> { + fn get_removable_peers(&self) -> Vec> { // The trusted peers (specified at runtime). let trusted = self.router().trusted_peers(); // The hardcoded bootstrap nodes. @@ -129,16 +129,18 @@ pub trait Heartbeat: Outbound { // Note, that this gives equal priority to clients and provers, which // we might want to change in the future. let mut peers = self.router().get_connected_peers(); - peers.sort_by_key(|peer| (peer.is_validator(), peer.last_seen())); + peers.sort_by_key(|peer| (peer.node_type.is_validator(), peer.last_seen)); // Determine which of the peers can be removed. peers .into_iter() .filter(|peer| { - !trusted.contains(&peer.ip()) // Always keep trusted nodes. - && !bootstrap.contains(&peer.ip()) // Always keep bootstrap nodes. - && !self.router().cache.contains_inbound_block_request(&peer.ip()) // This peer is currently syncing from us. - && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.ip()) == 0) // We are currently syncing from this peer. + let addr = peer.listener_addr; + + !trusted.contains(&addr) // Always keep trusted nodes. + && !bootstrap.contains(&addr) // Always keep bootstrap nodes. + && !self.router().cache.contains_inbound_block_request(&addr) // This peer is currently syncing from us. + && (is_block_synced || self.router().cache.num_outbound_block_requests(&addr) == 0) // We are currently syncing from this peer. }) .collect() } @@ -160,7 +162,7 @@ pub trait Heartbeat: Outbound { // Disconnect from the oldest connected peer, which is the first entry in the list // of removable peers. // Do nothing, if the list is empty. 
- if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.ip()) { + if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) { info!("Disconnecting from '{oldest}' (periodic refresh of peers)"); let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into())); self.router().disconnect(oldest); @@ -217,8 +219,8 @@ pub trait Heartbeat: Outbound { let peers_to_disconnect = self .get_removable_peers() .into_iter() - .filter(|peer| !peer.is_prover()) // remove provers as those are handled separately - .map(|p| p.ip()) + .filter(|peer| !peer.node_type.is_prover()) // remove provers as those are handled separately + .map(|peer| peer.listener_addr) .take(num_surplus_clients_validators); // Proceed to send disconnect requests to these peers. @@ -226,7 +228,7 @@ pub trait Heartbeat: Outbound { // TODO (howardwu): Remove this after specializing this function. if self.router().node_type().is_prover() { if let Some(peer) = self.router().get_connected_peer(&peer_ip) { - if peer.node_type().is_validator() { + if peer.node_type.is_validator() { continue; } } @@ -313,8 +315,8 @@ pub trait Heartbeat: Outbound { .trusted_peers() .iter() .filter_map(|peer_ip| { - // If the peer is not connected, attempt to connect to it. - if self.router().is_connected(peer_ip) { + // If the peer is not already connecting or connected, attempt to connect to it. + if self.router().is_connecting(peer_ip) || self.router().is_connected(peer_ip) { None } else { debug!("Attempting to (re-)connect to trusted peer `{peer_ip}`"); diff --git a/node/router/src/helpers/peer.rs b/node/router/src/helpers/peer.rs index 96f4281d6a..952ae85e8d 100644 --- a/node/router/src/helpers/peer.rs +++ b/node/router/src/helpers/peer.rs @@ -13,108 +13,131 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::messages::{ChallengeRequest, NodeType}; +use crate::{NodeType, Router, messages::ChallengeRequest}; use snarkvm::prelude::{Address, Network}; use std::{net::SocketAddr, time::Instant}; -/// The state for each connected peer. -#[derive(Clone, Debug)] -pub struct Peer { - /// The IP address of the peer, with the port set to the listener port. - peer_ip: SocketAddr, +/// A peer of any connection status. +pub enum Peer { + /// A candidate peer that's currently not connected to. + Candidate(CandidatePeer), + /// A peer that's currently being connected to (the handshake is in progress). + Connecting, + /// A fully connected (post-handshake) peer. + Connected(ConnectedPeer), +} + +/// A candidate peer. +#[derive(Clone)] +pub struct CandidatePeer { + /// The listening address of a candidate peer. + pub listener_addr: SocketAddr, + /// A restricted peer is one that has recently exceeded the `MAXIMUM_CONNECTION_FAILURES`, + /// and is ineligible for connection attempts for `RADIO_SILENCE_IN_SECS`. + // TODO: consider removing in favor of the ban feature for simplicity. + pub restricted: Option, +} + +impl CandidatePeer { + /// Restrict a candidate peer with the current timestamp. + pub fn restrict(&mut self) { + self.restricted = Some(Instant::now()); + } +} + +/// A fully connected peer. +#[derive(Clone)] +pub struct ConnectedPeer { + /// The listener address of the peer. + pub listener_addr: SocketAddr, /// The connected address of the peer. - peer_addr: SocketAddr, + pub connected_addr: SocketAddr, /// The Aleo address of the peer. - address: Address, + pub aleo_addr: Address, /// The node type of the peer. - node_type: NodeType, + pub node_type: NodeType, /// The message version of the peer. - version: u32, + pub version: u32, /// The timestamp of the first message received from the peer. - first_seen: Instant, + pub first_seen: Instant, /// The timestamp of the last message received from this peer. 
- last_seen: Instant, + pub last_seen: Instant, + /// A reference to the associated `Router` object. + pub router: Router, } impl Peer { - /// Initializes a new instance of `Peer`. - pub fn new(listening_ip: SocketAddr, connected_ip: SocketAddr, challenge_request: &ChallengeRequest) -> Self { - Self { - peer_ip: listening_ip, - peer_addr: connected_ip, - address: challenge_request.address, - node_type: challenge_request.node_type, - version: challenge_request.version, - first_seen: Instant::now(), - last_seen: Instant::now(), - } - } - - /// Returns the IP address of the peer, with the port set to the listener port. - pub const fn ip(&self) -> SocketAddr { - self.peer_ip - } - - /// Returns the connected address of the peer. - pub const fn addr(&self) -> SocketAddr { - self.peer_addr + /// Create a candidate peer. + pub const fn new_candidate(listener_addr: SocketAddr) -> Self { + Self::Candidate(CandidatePeer { listener_addr, restricted: None }) } - /// Returns the Aleo address of the peer. - pub const fn address(&self) -> Address { - self.address + /// Promote a connecting peer to a fully connected one. + pub fn upgrade_to_connected(&mut self, connected_addr: SocketAddr, cr: &ChallengeRequest, router: Router) { + // Logic check: this can only happen during the handshake. + assert!(matches!(self, Self::Connecting)); + + let timestamp = Instant::now(); + let listener_addr = SocketAddr::from((connected_addr.ip(), cr.listener_port)); + + // Introduce the peer in the resolver. + router.resolver.write().insert_peer(listener_addr, connected_addr); + + *self = Self::Connected(ConnectedPeer { + listener_addr, + connected_addr, + aleo_addr: cr.address, + node_type: cr.node_type, + version: cr.version, + first_seen: timestamp, + last_seen: timestamp, + router, + }); } - /// Returns the node type. - pub const fn node_type(&self) -> NodeType { - self.node_type - } - - /// Returns `true` if the peer is a validator. 
- pub const fn is_validator(&self) -> bool { - self.node_type.is_validator() - } - - /// Returns `true` if the peer is a prover. - pub const fn is_prover(&self) -> bool { - self.node_type.is_prover() - } - - /// Returns `true` if the peer is a client. - pub const fn is_client(&self) -> bool { - self.node_type.is_client() - } + /// Demote a peer to candidate status, marking it as disconnected. + pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr, restrict: bool) { + // Connecting peers are not in the resolver. + if let Self::Connected(peer) = self { + // Remove the peer from the resolver. + peer.router.resolver.write().remove_peer(&peer.connected_addr); + }; - /// Returns the message version of the peer. - pub const fn version(&self) -> u32 { - self.version + let restricted = if restrict { Some(Instant::now()) } else { None }; + *self = Self::Candidate(CandidatePeer { listener_addr, restricted }); } - /// Returns the first seen timestamp of the peer. - pub fn first_seen(&self) -> Instant { - self.first_seen + /// Returns the type of the node (only applicable to connected peers). + pub fn node_type(&self) -> Option { + match self { + Self::Candidate(_) => None, + Self::Connecting => None, + Self::Connected(peer) => Some(peer.node_type), + } } - /// Returns the last seen timestamp of the peer. - pub fn last_seen(&self) -> Instant { - self.last_seen + /// Returns `true` if the peer is currently undergoing the network handshake. + pub fn is_connecting(&self) -> bool { + matches!(self, Peer::Connecting) } -} -impl Peer { - /// Updates the node type. - pub fn set_node_type(&mut self, node_type: NodeType) { - self.node_type = node_type; + /// Returns `true` if the peer has concluded the network handshake. + pub fn is_connected(&self) -> bool { + matches!(self, Peer::Connected(_)) } - /// Updates the version. - pub fn set_version(&mut self, version: u32) { - self.version = version; + /// Updates the peer's `last_seen` timestamp. 
+ pub fn update_last_seen(&mut self) { + if let Self::Connected(ConnectedPeer { last_seen, .. }) = self { + *last_seen = Instant::now(); + } } - /// Updates the last seen timestamp of the peer. - pub fn set_last_seen(&mut self, last_seen: Instant) { - self.last_seen = last_seen; + /// Updates the peer's version. + pub fn update_version(&mut self, new_version: u32) { + if let Self::Connected(ConnectedPeer { version, .. }) = self { + *version = new_version; + } } } diff --git a/node/router/src/helpers/resolver.rs b/node/router/src/helpers/resolver.rs index d6057a8f7f..5eed2a0242 100644 --- a/node/router/src/helpers/resolver.rs +++ b/node/router/src/helpers/resolver.rs @@ -17,36 +17,26 @@ use std::{collections::HashMap, net::SocketAddr}; /// The `Resolver` provides the means to map the connected address (used in the lower-level /// `tcp` module internals, and provided by the OS) to the listener address (used as the -/// unique peer identifier in the higher-level functions), and the other way around. +/// unique peer identifier in the higher-level functions). #[derive(Debug, Default)] pub(crate) struct Resolver { - /// The map of the listener address to (ambiguous) peer address. - from_listener: HashMap, - /// The map of the (ambiguous) peer address to listener address. + /// The map of the connected peer address to the corresponding listener address. to_listener: HashMap, } impl Resolver { - /// Returns the listener address for the given (ambiguous) peer address, if it exists. - pub fn get_listener(&self, peer_addr: &SocketAddr) -> Option { - self.to_listener.get(peer_addr).copied() + /// Returns the listener address for the given connected address, if it exists. + pub fn get_listener(&self, connected_addr: &SocketAddr) -> Option { + self.to_listener.get(connected_addr).copied() } - /// Returns the (ambiguous) peer address for the given listener address, if it exists. 
- pub fn get_ambiguous(&self, peer_ip: &SocketAddr) -> Option { - self.from_listener.get(peer_ip).copied() + /// Inserts a new mapping of a connected address to the corresponding listener address. + pub fn insert_peer(&mut self, listener_addr: SocketAddr, connected_addr: SocketAddr) { + self.to_listener.insert(connected_addr, listener_addr); } - /// Inserts a bidirectional mapping of the listener address and the (ambiguous) peer address. - pub fn insert_peer(&mut self, listener_ip: SocketAddr, peer_addr: SocketAddr) { - self.from_listener.insert(listener_ip, peer_addr); - self.to_listener.insert(peer_addr, listener_ip); - } - - /// Removes the bidirectional mapping of the listener address and the (ambiguous) peer address. - pub fn remove_peer(&mut self, listener_ip: &SocketAddr) { - if let Some(peer_addr) = self.from_listener.remove(listener_ip) { - self.to_listener.remove(&peer_addr); - } + /// Removes the given mapping. + pub fn remove_peer(&mut self, connected_addr: &SocketAddr) { + self.to_listener.remove(connected_addr); } } diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 0f9fae8ea9..7ee2c79b07 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -202,11 +202,8 @@ pub trait Inbound: Reading + Outbound { // Update the connected peer. if let Err(error) = - self.router().update_connected_peer(peer_ip, message.node_type, |peer: &mut Peer| { - // Update the version of the peer. - peer.set_version(message.version); - // Update the node type of the peer. 
- peer.set_node_type(message.node_type); + self.router().update_connected_peer(peer_ip, message.node_type, |conn: &mut Peer| { + conn.update_version(message.version); }) { bail!("[Ping] {error}"); diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 37b5118e67..462d82024e 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -57,13 +57,12 @@ use parking_lot::{Mutex, RwLock}; #[cfg(not(any(test)))] use std::net::IpAddr; use std::{ - collections::{HashMap, HashSet, hash_map::Entry}, + collections::{HashMap, HashSet}, future::Future, net::SocketAddr, ops::Deref, str::FromStr, sync::Arc, - time::Instant, }; use tokio::task::JoinHandle; @@ -99,17 +98,8 @@ pub struct InnerRouter { resolver: RwLock, /// The set of trusted peers. trusted_peers: HashSet, - /// The map of connected peer IPs to their peer handlers. - connected_peers: RwLock>>, - /// The set of handshaking peers. While `Tcp` already recognizes the connecting IP addresses - /// and prevents duplicate outbound connection attempts to the same IP address, it is unable to - /// prevent simultaneous "two-way" connections between two peers (i.e. both nodes simultaneously - /// attempt to connect to each other). This set is used to prevent this from happening. - connecting_peers: Mutex>>>, - /// The set of candidate peer IPs. - candidate_peers: RwLock>, - /// The set of restricted peer IPs. - restricted_peers: RwLock>, + /// The collection of both candidate and connected peers. + peer_pool: RwLock>>, /// The spawned handles. handles: Mutex>>, /// If the flag is set, the node will periodically evict more external peers. @@ -126,8 +116,6 @@ impl Router { const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10; /// The maximum number of candidate peers permitted to be stored in the node. const MAXIMUM_CANDIDATE_PEERS: usize = 10_000; - /// The maximum number of connection failures permitted by an inbound connecting peer. 
- const MAXIMUM_CONNECTION_FAILURES: usize = 5; /// The maximum amount of connection attempts withing a 10 second threshold #[cfg(not(test))] const MAX_CONNECTION_ATTEMPTS: usize = 10; @@ -162,10 +150,7 @@ impl Router { cache: Default::default(), resolver: Default::default(), trusted_peers: trusted_peers.iter().copied().collect(), - connected_peers: Default::default(), - connecting_peers: Default::default(), - candidate_peers: Default::default(), - restricted_peers: Default::default(), + peer_pool: Default::default(), handles: Default::default(), rotate_external_peers, allow_external_peers, @@ -195,13 +180,9 @@ impl Router { // Attempt to connect to the candidate peer. match router.tcp.connect(peer_ip).await { // Remove the peer from the candidate peers. - Ok(()) => { - router.remove_candidate_peer(peer_ip); - true - } + Ok(()) => true, // If the connection was not allowed, log the error. Err(error) => { - router.connecting_peers.lock().remove(&peer_ip); warn!("Unable to connect to '{peer_ip}' - {error}"); false } @@ -224,6 +205,11 @@ impl Router { if self.number_of_connected_peers() >= self.max_connected_peers() { bail!("Dropping connection attempt to '{peer_ip}' (maximum peers reached)") } + // Ensure the node is not already connecting to this peer. + if self.is_connecting(&peer_ip) { + debug!("Dropping connection attempt to '{peer_ip}' (already connecting)"); + return Ok(true); + } // Ensure the node is not already connected to this peer. if self.is_connected(&peer_ip) { debug!("Dropping connection attempt to '{peer_ip}' (already connected)"); @@ -231,42 +217,20 @@ impl Router { } // Ensure the peer is not restricted. if self.is_restricted(&peer_ip) { - bail!("Dropping connection attempt to '{peer_ip}' (restricted)") - } - // Ensure the node is not already connecting to this peer. 
- match self.connecting_peers.lock().entry(peer_ip) { - Entry::Vacant(entry) => { - entry.insert(None); - Ok(false) - } - Entry::Occupied(_) => { - debug!("Dropping connection attempt to '{peer_ip}' (already shaking hands as the initiator)"); - Ok(true) - } + bail!("Dropping connection attempt to '{peer_ip}' (restricted)"); } + + Ok(false) } /// Disconnects from the given peer IP, if the peer is connected. pub fn disconnect(&self, peer_ip: SocketAddr) -> JoinHandle { let router = self.clone(); tokio::spawn(async move { - if let Some(peer_addr) = router.resolve_to_ambiguous(&peer_ip) { - // Disconnect from this peer. - let disconnected = router.tcp.disconnect(peer_addr).await; - // FIXME (ljedrz): this shouldn't be necessary; it's a double-check - // that the higher-level collection is cleaned up after the lower-level disconnect. - if router.is_connected(&peer_ip) && !router.tcp.is_connected(peer_addr) { - warn!("Disconnecting with fallback safety (report this to @ljedrz)"); - router.remove_connected_peer(peer_ip); - } - disconnected + if let Some(peer) = router.get_connected_peer(&peer_ip) { + let connected_addr = peer.connected_addr; + router.tcp.disconnect(connected_addr).await } else { - // FIXME (ljedrz): this shouldn't be necessary; it's a double-check - // that the higher-level collection is consistent with the resolver. - if router.is_connected(&peer_ip) { - warn!("Fallback connection artifact cleanup (report this to @ljedrz)"); - router.remove_connected_peer(peer_ip); - } false } }) @@ -342,47 +306,51 @@ impl Router { } /// Returns the listener IP address from the (ambiguous) peer address. - pub fn resolve_to_listener(&self, peer_addr: &SocketAddr) -> Option { - self.resolver.read().get_listener(peer_addr) + pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option { + self.resolver.read().get_listener(connected_addr) } /// Returns the (ambiguous) peer address from the listener IP address. 
- pub fn resolve_to_ambiguous(&self, peer_ip: &SocketAddr) -> Option { - self.resolver.read().get_ambiguous(peer_ip) + pub fn resolve_to_ambiguous(&self, listener_addr: &SocketAddr) -> Option { + if let Some(Peer::Connected(peer)) = self.peer_pool.read().get(listener_addr) { + Some(peer.connected_addr) + } else { + None + } + } + + /// Returns `true` if the node is connecting to the given peer IP. + pub fn is_connecting(&self, ip: &SocketAddr) -> bool { + self.peer_pool.read().get(ip).is_some_and(|peer| peer.is_connecting()) } /// Returns `true` if the node is connected to the given peer IP. pub fn is_connected(&self, ip: &SocketAddr) -> bool { - self.connected_peers.read().contains_key(ip) + self.peer_pool.read().get(ip).is_some_and(|peer| peer.is_connected()) } /// Returns `true` if the given peer IP is a connected validator. pub fn is_connected_validator(&self, peer_ip: &SocketAddr) -> bool { - self.connected_peers.read().get(peer_ip).is_some_and(|peer| peer.is_validator()) + self.peer_pool.read().get(peer_ip).is_some_and(|peer| peer.node_type() == Some(NodeType::Validator)) } /// Returns `true` if the given peer IP is a connected prover. pub fn is_connected_prover(&self, peer_ip: &SocketAddr) -> bool { - self.connected_peers.read().get(peer_ip).is_some_and(|peer| peer.is_prover()) + self.peer_pool.read().get(peer_ip).is_some_and(|peer| peer.node_type() == Some(NodeType::Prover)) } /// Returns `true` if the given peer IP is a connected client. pub fn is_connected_client(&self, peer_ip: &SocketAddr) -> bool { - self.connected_peers.read().get(peer_ip).is_some_and(|peer| peer.is_client()) - } - - /// Returns `true` if the node is currently connecting to the given peer IP. - pub fn is_connecting(&self, ip: &SocketAddr) -> bool { - self.connecting_peers.lock().contains_key(ip) + self.peer_pool.read().get(peer_ip).is_some_and(|peer| peer.node_type() == Some(NodeType::Client)) } /// Returns `true` if the given IP is restricted. 
pub fn is_restricted(&self, ip: &SocketAddr) -> bool { - self.restricted_peers - .read() - .get(ip) - .map(|time| time.elapsed().as_secs() < Self::RADIO_SILENCE_IN_SECS) - .unwrap_or(false) + if let Some(Peer::Candidate(peer)) = self.peer_pool.read().get(ip) { + peer.restricted.is_some_and(|ts| ts.elapsed().as_secs() < Self::RADIO_SILENCE_IN_SECS) + } else { + false + } } /// Returns `true` if the given IP is trusted. @@ -397,73 +365,105 @@ impl Router { /// Returns the number of connected peers. pub fn number_of_connected_peers(&self) -> usize { - self.connected_peers.read().len() + self.peer_pool.read().iter().filter(|(_, peer)| peer.is_connected()).count() } /// Returns the number of connected validators. pub fn number_of_connected_validators(&self) -> usize { - self.connected_peers.read().values().filter(|peer| peer.is_validator()).count() + self.peer_pool.read().values().filter(|peer| peer.node_type() == Some(NodeType::Validator)).count() } /// Returns the number of connected provers. pub fn number_of_connected_provers(&self) -> usize { - self.connected_peers.read().values().filter(|peer| peer.is_prover()).count() + self.peer_pool.read().values().filter(|peer| peer.node_type() == Some(NodeType::Prover)).count() } /// Returns the number of connected clients. pub fn number_of_connected_clients(&self) -> usize { - self.connected_peers.read().values().filter(|peer| peer.is_client()).count() + self.peer_pool.read().values().filter(|peer| peer.node_type() == Some(NodeType::Client)).count() } /// Returns the number of candidate peers. pub fn number_of_candidate_peers(&self) -> usize { - self.candidate_peers.read().len() + self.peer_pool.read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count() } /// Returns the number of restricted peers. 
pub fn number_of_restricted_peers(&self) -> usize { - self.restricted_peers.read().len() + self.peer_pool + .read() + .values() + .filter(|peer| matches!(peer, Peer::Candidate(peer) if peer.restricted.is_some())) + .count() } /// Returns the connected peer given the peer IP, if it exists. - pub fn get_connected_peer(&self, ip: &SocketAddr) -> Option> { - self.connected_peers.read().get(ip).cloned() + pub fn get_connected_peer(&self, ip: &SocketAddr) -> Option> { + if let Some(Peer::Connected(peer)) = self.peer_pool.read().get(ip) { Some(peer.clone()) } else { None } } /// Returns the connected peers. - pub fn get_connected_peers(&self) -> Vec> { - self.connected_peers.read().values().cloned().collect() + pub fn get_connected_peers(&self) -> Vec> { + self.peer_pool + .read() + .values() + .filter_map(|peer| if let Peer::Connected(p) = peer { Some(p.clone()) } else { None }) + .collect() } /// Returns the list of connected peers. pub fn connected_peers(&self) -> Vec { - self.connected_peers.read().keys().copied().collect() + self.peer_pool.read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect() } /// Returns the list of connected validators. pub fn connected_validators(&self) -> Vec { - self.connected_peers.read().iter().filter(|(_, peer)| peer.is_validator()).map(|(ip, _)| *ip).collect() + self.peer_pool + .read() + .iter() + .filter_map(|(addr, peer)| (peer.node_type() == Some(NodeType::Validator)).then_some(*addr)) + .collect() } - /// Returns the list of connected provers. + /// Returns the list of the listening addresses of connected provers. pub fn connected_provers(&self) -> Vec { - self.connected_peers.read().iter().filter(|(_, peer)| peer.is_prover()).map(|(ip, _)| *ip).collect() + self.peer_pool + .read() + .iter() + .filter_map(|(addr, peer)| (peer.node_type() == Some(NodeType::Prover)).then_some(*addr)) + .collect() } /// Returns the list of connected clients. 
pub fn connected_clients(&self) -> Vec { - self.connected_peers.read().iter().filter(|(_, peer)| peer.is_client()).map(|(ip, _)| *ip).collect() + self.peer_pool + .read() + .iter() + .filter_map(|(addr, peer)| (peer.node_type() == Some(NodeType::Client)).then_some(*addr)) + .collect() } /// Returns the list of candidate peers. pub fn candidate_peers(&self) -> HashSet { let banned_ips = self.tcp().banned_peers().get_banned_ips(); - self.candidate_peers.read().iter().filter(|peer| !banned_ips.contains(&peer.ip())).copied().collect() + self.peer_pool + .read() + .iter() + .filter_map(|(addr, peer)| { + (matches!(peer, Peer::Candidate(_)) && !banned_ips.contains(&addr.ip())).then_some(*addr) + }) + .collect() } /// Returns the list of restricted peers. pub fn restricted_peers(&self) -> Vec { - self.restricted_peers.read().keys().copied().collect() + self.peer_pool + .read() + .iter() + .filter_map(|(addr, peer)| { + matches!(peer, Peer::Candidate(conn) if conn.restricted.is_some()).then_some(*addr) + }) + .collect() } /// Returns the list of trusted peers. @@ -521,39 +521,14 @@ impl Router { /// Returns the list of metrics for the connected peers. pub fn connected_metrics(&self) -> Vec<(SocketAddr, NodeType)> { - self.connected_peers.read().iter().map(|(ip, peer)| (*ip, peer.node_type())).collect() + self.get_connected_peers().iter().map(|peer| (peer.listener_addr, peer.node_type)).collect() } #[cfg(feature = "metrics")] fn update_metrics(&self) { - metrics::gauge(metrics::router::CONNECTED, self.connected_peers.read().len() as f64); - metrics::gauge(metrics::router::CANDIDATE, self.candidate_peers.read().len() as f64); - metrics::gauge(metrics::router::RESTRICTED, self.restricted_peers.read().len() as f64); - } - - /// Inserts the given peer into the connected peers. - pub fn insert_connected_peer(&self, peer_ip: SocketAddr) { - // Move the peer from "connecting" to "connected". 
- let peer = match self.connecting_peers.lock().remove(&peer_ip) { - Some(Some(peer)) => peer, - Some(None) => { - warn!("Couldn't promote {peer_ip} from \"connecting\" to \"connected\": Handshake not completed"); - return; - } - None => { - warn!("Couldn't promote {peer_ip} from \"connecting\" to \"connected\": Public/listen address unknown"); - return; - } - }; - // Add an entry for this `Peer` in the connected peers. - self.connected_peers.write().insert(peer_ip, peer); - // Remove this peer from the candidate peers, if it exists. - self.candidate_peers.write().remove(&peer_ip); - // Remove this peer from the restricted peers, if it exists. - self.restricted_peers.write().remove(&peer_ip); - #[cfg(feature = "metrics")] - self.update_metrics(); - info!("Connected to '{peer_ip}'"); + metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64); + metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64); + metrics::gauge(metrics::router::RESTRICTED, self.number_of_restricted_peers() as f64); } /// Inserts the given peer IPs to the set of candidate peers. @@ -563,27 +538,22 @@ impl Router { pub fn insert_candidate_peers(&self, peers: &[SocketAddr]) { // Compute the maximum number of candidate peers. let max_candidate_peers = Self::MAXIMUM_CANDIDATE_PEERS.saturating_sub(self.number_of_candidate_peers()); - // Ensure the combined number of peers does not surpass the threshold. - let eligible_peers = peers - .iter() - .filter(|peer_ip| { - // Ensure the peer is not itself, is not already connected, and is not restricted. - !self.is_local_ip(peer_ip) && !self.is_connected(peer_ip) && !self.is_restricted(peer_ip) - }) - .take(max_candidate_peers); - - // Proceed to insert the eligible candidate peer IPs. - self.candidate_peers.write().extend(eligible_peers); - #[cfg(feature = "metrics")] - self.update_metrics(); - } - - /// Inserts the given peer into the restricted peers. 
- pub fn insert_restricted_peer(&self, peer_ip: SocketAddr) { - // Remove this peer from the candidate peers, if it exists. - self.candidate_peers.write().remove(&peer_ip); - // Add the peer to the restricted peers. - self.restricted_peers.write().insert(peer_ip, Instant::now()); + { + let mut peer_pool = self.peer_pool.write(); + // Ensure the combined number of peers does not surpass the threshold. + let eligible_peers = peers + .iter() + .filter(|peer_ip| { + // Ensure the peer is not itself, is not already connected, and is not restricted. + !self.is_local_ip(peer_ip) && !peer_pool.contains_key(peer_ip) + }) + .take(max_candidate_peers) + .map(|addr| (*addr, Peer::new_candidate(*addr))) + .collect::>(); + + // Proceed to insert the eligible candidate peer IPs. + peer_pool.extend(eligible_peers); + } #[cfg(feature = "metrics")] self.update_metrics(); } @@ -596,10 +566,10 @@ impl Router { mut write_fn: Fn, ) -> Result<()> { // Retrieve the peer. - if let Some(peer) = self.connected_peers.write().get_mut(&peer_ip) { + if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { // Ensure the node type has not changed. - if peer.node_type() != node_type { - bail!("Peer '{peer_ip}' has changed node types from {} to {node_type}", peer.node_type()) + if peer.node_type() != Some(node_type) { + bail!("Peer '{peer_ip}' has changed node types"); } // Lastly, update the peer with the given function. write_fn(peer); @@ -608,35 +578,18 @@ impl Router { } pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) { - if let Some(peer) = self.connected_peers.write().get_mut(&peer_ip) { - peer.set_last_seen(Instant::now()); + if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { + peer.update_last_seen(); } } /// Removes the connected peer and adds them to the candidate peers. pub fn remove_connected_peer(&self, peer_ip: SocketAddr) { - // Remove this peer from the connected peers, if it exists. 
- self.connected_peers.write().remove(&peer_ip); - // Removes the bidirectional map between the listener address and (ambiguous) peer address. - self.resolver.write().remove_peer(&peer_ip); + if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { + peer.downgrade_to_candidate(peer_ip, false); + } // Clear cached entries applicable to the peer. self.cache.clear_peer_entries(peer_ip); - // Add the peer to the candidate peers. - self.candidate_peers.write().insert(peer_ip); - #[cfg(feature = "metrics")] - self.update_metrics(); - } - - #[cfg(feature = "test")] - pub fn clear_candidate_peers(&self) { - self.candidate_peers.write().clear(); - #[cfg(feature = "metrics")] - self.update_metrics(); - } - - /// Removes the given address from the candidate peers, if it exists. - pub fn remove_candidate_peer(&self, peer_ip: SocketAddr) { - self.candidate_peers.write().remove(&peer_ip); #[cfg(feature = "metrics")] self.update_metrics(); } diff --git a/node/router/tests/common/router.rs b/node/router/tests/common/router.rs index cc0f017b98..5164236885 100644 --- a/node/router/tests/common/router.rs +++ b/node/router/tests/common/router.rs @@ -93,16 +93,7 @@ impl Handshake for TestRouter { #[async_trait] impl OnConnect for TestRouter { - async fn on_connect(&self, peer_addr: SocketAddr) { - let peer_ip = if let Some(ip) = self.router().resolve_to_listener(&peer_addr) { - ip - } else { - panic!("The peer IP should be known by the time OnConnect is triggered!"); - }; - - // Promote the peer's status from "connecting" to "connected". 
- self.router().insert_connected_peer(peer_ip); - } + async fn on_connect(&self, _peer_addr: SocketAddr) {} } #[async_trait] diff --git a/node/router/tests/connect.rs b/node/router/tests/connect.rs index 31705fde75..639e4ef1f5 100644 --- a/node/router/tests/connect.rs +++ b/node/router/tests/connect.rs @@ -18,7 +18,7 @@ use common::*; use snarkos_node_tcp::{ P2P, - protocols::{Handshake, OnConnect}, + protocols::{Disconnect, Handshake, OnConnect}, }; use core::time::Duration; @@ -172,6 +172,7 @@ async fn test_validator_connection() { assert_eq!(node0.number_of_connected_peers(), 0); node0.enable_handshake().await; node0.enable_on_connect().await; + node0.enable_disconnect().await; node0.tcp().enable_listener().await.unwrap(); // Get the local IP address from the first router. @@ -182,6 +183,7 @@ async fn test_validator_connection() { assert_eq!(node1.number_of_connected_peers(), 0); node1.enable_handshake().await; node1.enable_on_connect().await; + node1.enable_disconnect().await; node1.tcp().enable_listener().await.unwrap(); { diff --git a/node/router/tests/heartbeat.rs b/node/router/tests/heartbeat.rs index 1ad7300e71..471b2950b7 100644 --- a/node/router/tests/heartbeat.rs +++ b/node/router/tests/heartbeat.rs @@ -127,22 +127,22 @@ async fn peer_priority_ordering() { let mut removable_peers = removable_peers.into_iter(); // Client should be lowest priority. - assert_eq!(removable_peers.next().unwrap().ip(), client_peer.local_ip()); + assert_eq!(removable_peers.next().unwrap().listener_addr, client_peer.local_ip()); // Validator 1 has lower priority now because it was seen last. - assert_eq!(removable_peers.next().unwrap().ip(), validator_peer1.local_ip()); + assert_eq!(removable_peers.next().unwrap().listener_addr, validator_peer1.local_ip()); // Validator 2 has highest priority. 
- assert_eq!(removable_peers.next().unwrap().ip(), validator_peer2.local_ip()); + assert_eq!(removable_peers.next().unwrap().listener_addr, validator_peer2.local_ip()); // Update last seen again to check that priorities get updated correctly. heartbeat.router.update_last_seen_for_connected_peer(validator_peer1.local_ip()); // Validator 1 must have higher priority now. let mut removable_peers = heartbeat.get_removable_peers().into_iter(); - assert_eq!(removable_peers.next().unwrap().ip(), client_peer.local_ip()); - assert_eq!(removable_peers.next().unwrap().ip(), validator_peer2.local_ip()); - assert_eq!(removable_peers.next().unwrap().ip(), validator_peer1.local_ip()); + assert_eq!(removable_peers.next().unwrap().listener_addr, client_peer.local_ip()); + assert_eq!(removable_peers.next().unwrap().listener_addr, validator_peer2.local_ip()); + assert_eq!(removable_peers.next().unwrap().listener_addr, validator_peer1.local_ip()); } /// Checks that trusted peers are never marked as removable. diff --git a/node/src/client/router.rs b/node/src/client/router.rs index 75c9b80124..6b92a35008 100644 --- a/node/src/client/router.rs +++ b/node/src/client/router.rs @@ -66,8 +66,6 @@ impl> OnConnect for Client { async fn on_connect(&self, peer_addr: SocketAddr) { // Resolve the peer address to the listener address. let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return }; - // Promote the peer's status from "connecting" to "connected". - self.router().insert_connected_peer(peer_ip); // If it's a bootstrap peer, first request its peers. if self.router.bootstrap_peers().contains(&peer_ip) { self.router().send(peer_ip, Message::PeerRequest(PeerRequest)); diff --git a/node/src/prover/router.rs b/node/src/prover/router.rs index 3d172f2c3f..052b0fde44 100644 --- a/node/src/prover/router.rs +++ b/node/src/prover/router.rs @@ -61,8 +61,6 @@ where async fn on_connect(&self, peer_addr: SocketAddr) { // Resolve the peer address to the listener address. 
let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return }; - // Promote the peer's status from "connecting" to "connected". - self.router().insert_connected_peer(peer_ip); // Send the first `Ping` message to the peer. self.ping.on_peer_connected(peer_ip); } diff --git a/node/src/validator/router.rs b/node/src/validator/router.rs index c5d1d5f86c..218caad8be 100644 --- a/node/src/validator/router.rs +++ b/node/src/validator/router.rs @@ -64,8 +64,6 @@ where async fn on_connect(&self, peer_addr: SocketAddr) { // Resolve the peer address to the listener address. let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return }; - // Promote the peer's status from "connecting" to "connected". - self.router().insert_connected_peer(peer_ip); // Send the first `Ping` message to the peer. self.ping.on_peer_connected(peer_ip); }