diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index bd20bc6c33..8fbc3f8b2b 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -137,7 +137,7 @@ impl Router { self.update_metrics(); debug!("Completed the handshake with '{peer_addr}'"); } else if let Some(peer) = self.peer_pool.write().get_mut(&addr) { - peer.downgrade_to_candidate(addr, false); + peer.downgrade_to_candidate(addr); } } @@ -154,10 +154,10 @@ impl Router { ) -> io::Result> { match self.peer_pool.write().entry(peer_addr) { Entry::Vacant(entry) => { - entry.insert(Peer::Connecting); + entry.insert(Peer::new_connecting(false)); } Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => { - entry.insert(Peer::Connecting); + entry.insert(Peer::new_connecting(entry.get().is_trusted())); } Entry::Occupied(_) => { return Err(error(format!("Duplicate connection attempt with '{peer_addr}'"))); @@ -309,32 +309,24 @@ impl Router { /// Ensure the peer is allowed to connect. fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> { - // Ensure the peer IP is not this node. + // Ensure that it's not a self-connect attempt. if self.is_local_ip(&listener_addr) { bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)") } - // Ensure either the peer is trusted or `allow_external_peers` is true. + // Unknown peers are untrusted, so check if `allow_external_peers` is true. if !self.allow_external_peers() && !self.is_trusted(&listener_addr) { bail!("Dropping connection request from '{listener_addr}' (untrusted)") } - // Ensure the node is not already connecting to this peer. + match self.peer_pool.write().entry(listener_addr) { Entry::Vacant(entry) => { - entry.insert(Peer::Connecting); + entry.insert(Peer::new_connecting(false)); } Entry::Occupied(mut entry) => match entry.get_mut() { - Peer::Candidate(peer) - if peer - .restricted - .map(|ts| ts.elapsed().as_secs() < Self::RADIO_SILENCE_IN_SECS) - .unwrap_or(false) => - { - bail!("Dropping connection request from '{listener_addr}' (restricted)"); - } peer @ Peer::Candidate(_) => { - let _ = mem::replace(peer, Peer::Connecting); + let _ = mem::replace(peer, Peer::new_connecting(peer.is_trusted())); } - Peer::Connecting => { + Peer::Connecting(_) => { bail!("Dropping connection request from '{listener_addr}' (already connecting)"); } Peer::Connected(_) => { diff --git a/node/router/src/heartbeat.rs b/node/router/src/heartbeat.rs index 3cb5030d72..a5e68a9749 100644 --- a/node/router/src/heartbeat.rs +++ b/node/router/src/heartbeat.rs @@ -99,9 +99,9 @@ pub trait Heartbeat: Outbound { // Check if any connected peer is stale. for peer in self.router().get_connected_peers() { // Disconnect if the peer has not communicated back within the predefined time. - let elapsed = peer.last_seen.elapsed().as_secs(); - if elapsed > Router::::RADIO_SILENCE_IN_SECS { - warn!("Peer {} has not communicated in {elapsed} seconds", peer.listener_addr); + let elapsed = peer.last_seen.elapsed(); + if elapsed > Router::::MAX_RADIO_SILENCE { + warn!("Peer {} has not communicated in {elapsed:?}", peer.listener_addr); // Disconnect from this peer. self.router().disconnect(peer.listener_addr); } @@ -117,8 +117,6 @@ pub trait Heartbeat: Outbound { /// - Validators are considered higher priority than provers or clients. /// - Connections that have not been seen in a while are considered lower priority. fn get_removable_peers(&self) -> Vec> { - // The trusted peers (specified at runtime). - let trusted = self.router().trusted_peers(); // The hardcoded bootstrap nodes. let bootstrap = self.router().bootstrap_peers(); // Are we synced already? (cache this here, so it does not need to be recomputed) @@ -128,34 +126,28 @@ pub trait Heartbeat: Outbound { // of the vector. // Note, that this gives equal priority to clients and provers, which // we might want to change in the future. - let mut peers = self.router().get_connected_peers(); + let mut peers = self.router().filter_connected_peers(|peer| { + !peer.trusted + && !bootstrap.contains(&peer.listener_addr) + && !self.router().cache.contains_inbound_block_request(&peer.listener_addr) // This peer is currently syncing from us. + && (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) // We are currently syncing from this peer. + }); peers.sort_by_key(|peer| (peer.node_type.is_validator(), peer.last_seen)); - // Determine which of the peers can be removed. peers - .into_iter() - .filter(|peer| { - let addr = peer.listener_addr; - - !trusted.contains(&addr) // Always keep trusted nodes. - && !bootstrap.contains(&addr) // Always keep bootstrap nodes. - && !self.router().cache.contains_inbound_block_request(&addr) // This peer is currently syncing from us. - && (is_block_synced || self.router().cache.num_outbound_block_requests(&addr) == 0) // We are currently syncing from this peer. - }) - .collect() } /// This function removes the peer that we have not heard from the longest, /// to keep the connections fresh. /// It only triggers if the router is above the minimum number of connected peers. fn remove_oldest_connected_peer(&self) { - // Skip if the router is at or below the minimum number of connected peers. - if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS { + // Skip if the node is not requesting peers. + if !self.router().allow_external_peers() { return; } - // Skip if the node is not requesting peers. - if !self.router().allow_external_peers() { + // Skip if the router is at or below the minimum number of connected peers. + if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS { return; } @@ -177,7 +169,7 @@ pub trait Heartbeat: Outbound { // Obtain the number of connected peers. let num_connected = self.router().number_of_connected_peers(); // Obtain the number of connected provers. - let num_connected_provers = self.router().number_of_connected_provers(); + let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len(); // Consider rotating more external peers every ~10 heartbeats. let reduce_peers = self.router().rotate_external_peers() && rng.gen_range(0..10) == 0; @@ -202,17 +194,16 @@ pub trait Heartbeat: Outbound { "Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers" ); - // Retrieve the trusted peers. - let trusted = self.router().trusted_peers(); // Retrieve the bootstrap peers. let bootstrap = self.router().bootstrap_peers(); // Determine the provers to disconnect from. let provers_to_disconnect = self .router() - .connected_provers() + .filter_connected_peers(|peer| { + peer.node_type.is_prover() && !peer.trusted && !bootstrap.contains(&peer.listener_addr) + }) .into_iter() - .filter(|peer_ip| !trusted.contains(peer_ip) && !bootstrap.contains(peer_ip)) .choose_multiple(rng, num_surplus_provers); // Determine the clients and validators to disconnect from. @@ -220,24 +211,20 @@ pub trait Heartbeat: Outbound { .get_removable_peers() .into_iter() .filter(|peer| !peer.node_type.is_prover()) // remove provers as those are handled separately - .map(|peer| peer.listener_addr) .take(num_surplus_clients_validators); // Proceed to send disconnect requests to these peers. - for peer_ip in peers_to_disconnect.chain(provers_to_disconnect) { + for peer in peers_to_disconnect.chain(provers_to_disconnect) { // TODO (howardwu): Remove this after specializing this function. - if self.router().node_type().is_prover() { - if let Some(peer) = self.router().get_connected_peer(&peer_ip) { - if peer.node_type.is_validator() { - continue; - } - } + if self.router().node_type().is_prover() && peer.node_type.is_validator() { + continue; } - info!("Disconnecting from '{peer_ip}' (exceeded maximum connections)"); - self.router().send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into())); + let peer_addr = peer.listener_addr; + info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)"); + self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into())); // Disconnect from this peer. - self.router().disconnect(peer_ip); + self.router().disconnect(peer_addr); } } @@ -269,8 +256,9 @@ pub trait Heartbeat: Outbound { // Split the bootstrap peers into connected and candidate lists. let mut connected_bootstrap = Vec::new(); let mut candidate_bootstrap = Vec::new(); + let connected_peers = self.router().connected_peers(); for bootstrap_ip in self.router().bootstrap_peers() { - match self.router().is_connected(&bootstrap_ip) { + match connected_peers.contains(&bootstrap_ip) { true => connected_bootstrap.push(bootstrap_ip), false => candidate_bootstrap.push(bootstrap_ip), } @@ -312,20 +300,15 @@ pub trait Heartbeat: Outbound { // Ensure that the trusted nodes are connected. let handles: Vec<_> = self .router() - .trusted_peers() + .unconnected_trusted_peers() .iter() - .filter_map(|peer_ip| { - // If the peer is not already connecting or connected, attempt to connect to it. - if self.router().is_connecting(peer_ip) || self.router().is_connected(peer_ip) { - None - } else { - debug!("Attempting to (re-)connect to trusted peer `{peer_ip}`"); - let hdl = self.router().connect(*peer_ip); - if hdl.is_none() { - warn!("Could not initiate connection to trusted peer at `{peer_ip}`"); - } - hdl + .filter_map(|listener_addr| { + debug!("Attempting to (re-)connect to trusted peer `{listener_addr}`"); + let hdl = self.router().connect(*listener_addr); + if hdl.is_none() { + warn!("Could not initiate connection to trusted peer at `{listener_addr}`"); } + hdl }) .collect(); diff --git a/node/router/src/helpers/peer.rs b/node/router/src/helpers/peer.rs index 952ae85e8d..cfeae7fd8a 100644 --- a/node/router/src/helpers/peer.rs +++ b/node/router/src/helpers/peer.rs @@ -23,7 +23,7 @@ pub enum Peer { /// A candidate peer that's currently not connected to. Candidate(CandidatePeer), /// A peer that's currently being connected to (the handshake is in progress). - Connecting, + Connecting(ConnectingPeer), /// A fully connected (post-handshake) peer. Connected(ConnectedPeer), } @@ -33,17 +33,15 @@ pub enum Peer { pub struct CandidatePeer { /// The listening address of a candidate peer. pub listener_addr: SocketAddr, - /// A restricted peer is one that has recently exceeded the `MAXIMUM_CONNECTION_FAILURES`, - /// and is ineligible for connection attempts for `RADIO_SILENCE_IN_SECS`. - // TODO: consider removing in favor of the ban feature for simplicity. - pub restricted: Option, + /// Indicates whether the peer is considered trusted. + pub trusted: bool, } -impl CandidatePeer { - /// Restrict a candidate peer with the current timestamp. - pub fn restrict(&mut self) { - self.restricted = Some(Instant::now()); - } +/// A connecting peer. +#[derive(Clone)] +pub struct ConnectingPeer { + /// Indicates whether the peer is considered trusted. + pub trusted: bool, } /// A fully connected peer. @@ -53,6 +51,8 @@ pub struct ConnectedPeer { pub listener_addr: SocketAddr, /// The connected address of the peer. pub connected_addr: SocketAddr, + /// Indicates whether the peer is considered trusted. + pub trusted: bool, /// The Aleo address of the peer. pub aleo_addr: Address, /// The node type of the peer. @@ -69,14 +69,19 @@ pub struct ConnectedPeer { impl Peer { /// Create a candidate peer. - pub const fn new_candidate(listener_addr: SocketAddr) -> Self { - Self::Candidate(CandidatePeer { listener_addr, restricted: None }) + pub const fn new_candidate(listener_addr: SocketAddr, trusted: bool) -> Self { + Self::Candidate(CandidatePeer { listener_addr, trusted }) + } + + /// Create a connecting peer. + pub const fn new_connecting(trusted: bool) -> Self { + Self::Connecting(ConnectingPeer { trusted }) } /// Promote a connecting peer to a fully connected one. pub fn upgrade_to_connected(&mut self, connected_addr: SocketAddr, cr: &ChallengeRequest, router: Router) { // Logic check: this can only happen during the handshake. - assert!(matches!(self, Self::Connecting)); + assert!(matches!(self, Self::Connecting(_))); let timestamp = Instant::now(); let listener_addr = SocketAddr::from((connected_addr.ip(), cr.listener_port)); @@ -89,6 +94,7 @@ impl Peer { connected_addr, aleo_addr: cr.address, node_type: cr.node_type, + trusted: self.is_trusted(), version: cr.version, first_seen: timestamp, last_seen: timestamp, @@ -97,29 +103,28 @@ impl Peer { } /// Demote a peer to candidate status, marking it as disconnected. - pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr, restrict: bool) { + pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr) { // Connecting peers are not in the resolver. if let Self::Connected(peer) = self { // Remove the peer from the resolver. peer.router.resolver.write().remove_peer(&peer.connected_addr); }; - let restricted = if restrict { Some(Instant::now()) } else { None }; - *self = Self::Candidate(CandidatePeer { listener_addr, restricted }); + *self = Self::Candidate(CandidatePeer { listener_addr, trusted: self.is_trusted() }); } /// Returns the type of the node (only applicable to connected peers). pub fn node_type(&self) -> Option { match self { Self::Candidate(_) => None, - Self::Connecting => None, + Self::Connecting(_) => None, Self::Connected(peer) => Some(peer.node_type), } } /// Returns `true` if the peer is currently undergoing the network handshake. pub fn is_connecting(&self) -> bool { - matches!(self, Peer::Connecting) + matches!(self, Peer::Connecting(_)) } /// Returns `true` if the peer has concluded the network handshake. @@ -127,17 +132,19 @@ impl Peer { matches!(self, Peer::Connected(_)) } + /// Returns `true` if the peer is considered trusted. + pub fn is_trusted(&self) -> bool { + match self { + Self::Candidate(peer) => peer.trusted, + Self::Connecting(peer) => peer.trusted, + Self::Connected(peer) => peer.trusted, + } + } + /// Updates the peer's `last_seen` timestamp. pub fn update_last_seen(&mut self) { if let Self::Connected(ConnectedPeer { last_seen, .. }) = self { *last_seen = Instant::now(); } } - - /// Updates the peer's version. - pub fn update_version(&mut self, new_version: u32) { - if let Self::Connected(ConnectedPeer { version, .. }) = self { - *version = new_version; - } - } } diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 7ee2c79b07..ddbb70a54d 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -15,7 +15,6 @@ use crate::{ Outbound, - Peer, messages::{ BlockRequest, BlockResponse, @@ -200,15 +199,6 @@ pub trait Inbound: Reading + Outbound { bail!("Peer '{peer_ip}' is a prover or client, but block locators were provided"); } - // Update the connected peer. - if let Err(error) = - self.router().update_connected_peer(peer_ip, message.node_type, |conn: &mut Peer| { - conn.update_version(message.version); - }) - { - bail!("[Ping] {error}"); - } - // Process the ping message. match self.ping(peer_ip, message) { true => Ok(true), diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index d4e6e1c429..6fdd75f95c 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -66,6 +66,7 @@ use std::{ ops::Deref, str::FromStr, sync::Arc, + time::Duration, }; use tokio::task::JoinHandle; @@ -99,8 +100,6 @@ pub struct InnerRouter { cache: Cache, /// The resolver. resolver: RwLock, - /// The set of trusted peers. - trusted_peers: HashSet, /// The collection of both candidate and connected peers. peer_pool: RwLock>>, /// The spawned handles. @@ -122,9 +121,9 @@ impl Router { /// The maximum amount of connection attempts withing a 10 second threshold #[cfg(not(test))] const MAX_CONNECTION_ATTEMPTS: usize = 10; - /// The duration in seconds after which a connected peer is considered inactive or + /// The duration after which a connected peer is considered inactive or /// disconnected if no message has been received in the meantime. - const RADIO_SILENCE_IN_SECS: u64 = 150; // 2.5 minutes + const MAX_RADIO_SILENCE: Duration = Duration::from_secs(150); // 2.5 minutes } impl Router { @@ -144,6 +143,12 @@ impl Router { // Initialize the TCP stack. let tcp = Tcp::new(Config::new(node_ip, max_peers)); + let trusted_peers = trusted_peers + .iter() + .copied() + .map(|addr| (addr, Peer::new_candidate(addr, true))) + .collect::>(); + // Initialize the router. Ok(Self(Arc::new(InnerRouter { tcp, @@ -152,8 +157,7 @@ impl Router { ledger, cache: Default::default(), resolver: Default::default(), - trusted_peers: trusted_peers.iter().copied().collect(), - peer_pool: Default::default(), + peer_pool: RwLock::new(trusted_peers), handles: Default::default(), rotate_external_peers, allow_external_peers, @@ -218,10 +222,6 @@ impl Router { debug!("Dropping connection attempt to '{peer_ip}' (already connected)"); return Ok(true); } - // Ensure the peer is not restricted. - if self.is_restricted(&peer_ip) { - bail!("Dropping connection attempt to '{peer_ip}' (restricted)"); - } Ok(false) } @@ -322,43 +322,19 @@ impl Router { } } - /// Returns `true` if the node is connecting to the given peer IP. - pub fn is_connecting(&self, ip: &SocketAddr) -> bool { - self.peer_pool.read().get(ip).is_some_and(|peer| peer.is_connecting()) - } - - /// Returns `true` if the node is connected to the given peer IP. - pub fn is_connected(&self, ip: &SocketAddr) -> bool { - self.peer_pool.read().get(ip).is_some_and(|peer| peer.is_connected()) - } - - /// Returns `true` if the given peer IP is a connected validator. - pub fn is_connected_validator(&self, peer_ip: &SocketAddr) -> bool { - self.peer_pool.read().get(peer_ip).is_some_and(|peer| peer.node_type() == Some(NodeType::Validator)) - } - - /// Returns `true` if the given peer IP is a connected prover. - pub fn is_connected_prover(&self, peer_ip: &SocketAddr) -> bool { - self.peer_pool.read().get(peer_ip).is_some_and(|peer| peer.node_type() == Some(NodeType::Prover)) - } - - /// Returns `true` if the given peer IP is a connected client. - pub fn is_connected_client(&self, peer_ip: &SocketAddr) -> bool { - self.peer_pool.read().get(peer_ip).is_some_and(|peer| peer.node_type() == Some(NodeType::Client)) + /// Returns `true` if the node is connecting to the given peer's listener address. + pub fn is_connecting(&self, listener_addr: &SocketAddr) -> bool { + self.peer_pool.read().get(listener_addr).is_some_and(|peer| peer.is_connecting()) } - /// Returns `true` if the given IP is restricted. - pub fn is_restricted(&self, ip: &SocketAddr) -> bool { - if let Some(Peer::Candidate(peer)) = self.peer_pool.read().get(ip) { - peer.restricted.is_some_and(|ts| ts.elapsed().as_secs() < Self::RADIO_SILENCE_IN_SECS) - } else { - false - } + /// Returns `true` if the node is connected to the given peer listener address. + pub fn is_connected(&self, listener_addr: &SocketAddr) -> bool { + self.peer_pool.read().get(listener_addr).is_some_and(|peer| peer.is_connected()) } - /// Returns `true` if the given IP is trusted. - pub fn is_trusted(&self, ip: &SocketAddr) -> bool { - self.trusted_peers.contains(ip) + /// Returns `true` if the given listener address is trusted. + pub fn is_trusted(&self, listener_addr: &SocketAddr) -> bool { + self.peer_pool.read().get(listener_addr).is_some_and(|peer| peer.is_trusted()) } /// Returns the maximum number of connected peers. @@ -371,46 +347,43 @@ impl Router { self.peer_pool.read().iter().filter(|(_, peer)| peer.is_connected()).count() } - /// Returns the number of connected validators. - pub fn number_of_connected_validators(&self) -> usize { - self.peer_pool.read().values().filter(|peer| peer.node_type() == Some(NodeType::Validator)).count() - } - - /// Returns the number of connected provers. - pub fn number_of_connected_provers(&self) -> usize { - self.peer_pool.read().values().filter(|peer| peer.node_type() == Some(NodeType::Prover)).count() - } - - /// Returns the number of connected clients. - pub fn number_of_connected_clients(&self) -> usize { - self.peer_pool.read().values().filter(|peer| peer.node_type() == Some(NodeType::Client)).count() - } - /// Returns the number of candidate peers. pub fn number_of_candidate_peers(&self) -> usize { self.peer_pool.read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count() } - /// Returns the number of restricted peers. - pub fn number_of_restricted_peers(&self) -> usize { - self.peer_pool - .read() - .values() - .filter(|peer| matches!(peer, Peer::Candidate(peer) if peer.restricted.is_some())) - .count() - } - /// Returns the connected peer given the peer IP, if it exists. - pub fn get_connected_peer(&self, ip: &SocketAddr) -> Option> { - if let Some(Peer::Connected(peer)) = self.peer_pool.read().get(ip) { Some(peer.clone()) } else { None } + pub fn get_connected_peer(&self, listener_addr: &SocketAddr) -> Option> { + if let Some(Peer::Connected(peer)) = self.peer_pool.read().get(listener_addr) { + Some(peer.clone()) + } else { + None + } } - /// Returns the connected peers. + /// Returns all connected peers. pub fn get_connected_peers(&self) -> Vec> { + self.filter_connected_peers(|_| true) + } + + /// Returns all connected peers that satisify the given predicate. + pub fn filter_connected_peers) -> bool>( + &self, + mut predicate: P, + ) -> Vec> { self.peer_pool .read() .values() - .filter_map(|peer| if let Peer::Connected(p) = peer { Some(p.clone()) } else { None }) + .filter_map(|p| { + if let Peer::Connected(peer) = p + && predicate(peer) + { + Some(peer) + } else { + None + } + }) + .cloned() .collect() } @@ -419,33 +392,6 @@ impl Router { self.peer_pool.read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect() } - /// Returns the list of connected validators. - pub fn connected_validators(&self) -> Vec { - self.peer_pool - .read() - .iter() - .filter_map(|(addr, peer)| (peer.node_type() == Some(NodeType::Validator)).then_some(*addr)) - .collect() - } - - /// Returns the list of the listening addresses of connected provers. - pub fn connected_provers(&self) -> Vec { - self.peer_pool - .read() - .iter() - .filter_map(|(addr, peer)| (peer.node_type() == Some(NodeType::Prover)).then_some(*addr)) - .collect() - } - - /// Returns the list of connected clients. - pub fn connected_clients(&self) -> Vec { - self.peer_pool - .read() - .iter() - .filter_map(|(addr, peer)| (peer.node_type() == Some(NodeType::Client)).then_some(*addr)) - .collect() - } - /// Returns the list of candidate peers. pub fn candidate_peers(&self) -> HashSet { let banned_ips = self.tcp().banned_peers().get_banned_ips(); @@ -458,22 +404,17 @@ impl Router { .collect() } - /// Returns the list of restricted peers. - pub fn restricted_peers(&self) -> Vec { + /// Returns the list of unconnected trusted peers. + pub fn unconnected_trusted_peers(&self) -> HashSet { self.peer_pool .read() .iter() - .filter_map(|(addr, peer)| { - matches!(peer, Peer::Candidate(conn) if conn.restricted.is_some()).then_some(*addr) - }) + .filter_map( + |(addr, peer)| if let Peer::Candidate(peer) = peer { peer.trusted.then_some(*addr) } else { None }, + ) .collect() } - /// Returns the list of trusted peers. - pub fn trusted_peers(&self) -> &HashSet { - &self.trusted_peers - } - /// Returns the list of bootstrap peers. #[allow(clippy::if_same_then_else)] pub fn bootstrap_peers(&self) -> Vec { @@ -531,7 +472,6 @@ impl Router { fn update_metrics(&self) { metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64); metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64); - metrics::gauge(metrics::router::RESTRICTED, self.number_of_restricted_peers() as f64); } /// Inserts the given peer IPs to the set of candidate peers. @@ -547,11 +487,11 @@ impl Router { let eligible_peers = peers .iter() .filter(|peer_ip| { - // Ensure the peer is not itself, is not already connected, and is not restricted. + // Ensure the peer is not itself, and is not already known. !self.is_local_ip(peer_ip) && !peer_pool.contains_key(peer_ip) }) .take(max_candidate_peers) - .map(|addr| (*addr, Peer::new_candidate(*addr))) + .map(|addr| (*addr, Peer::new_candidate(*addr, false))) .collect::>(); // Proceed to insert the eligible candidate peer IPs. @@ -561,25 +501,6 @@ impl Router { self.update_metrics(); } - /// Updates the connected peer with the given function. - pub fn update_connected_peer)>( - &self, - peer_ip: SocketAddr, - node_type: NodeType, - mut write_fn: Fn, - ) -> Result<()> { - // Retrieve the peer. - if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { - // Ensure the node type has not changed. - if peer.node_type() != Some(node_type) { - bail!("Peer '{peer_ip}' has changed node types"); - } - // Lastly, update the peer with the given function. - write_fn(peer); - } - Ok(()) - } - pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) { if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { peer.update_last_seen(); @@ -589,7 +510,7 @@ impl Router { /// Removes the connected peer and adds them to the candidate peers. pub fn remove_connected_peer(&self, peer_ip: SocketAddr) { if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { - peer.downgrade_to_candidate(peer_ip, false); + peer.downgrade_to_candidate(peer_ip); } // Clear cached entries applicable to the peer. self.cache.clear_peer_entries(peer_ip); diff --git a/node/router/src/outbound.rs b/node/router/src/outbound.rs index 148cea56a6..3d50a0f2f6 100644 --- a/node/router/src/outbound.rs +++ b/node/router/src/outbound.rs @@ -48,12 +48,12 @@ pub trait Outbound { // } // Prepare the peers to send to. - let connected_peers = self.router().connected_peers(); - let peers = connected_peers.iter().filter(|peer_ip| !excluded_peers.contains(peer_ip)); + let connected_peers = + self.router().filter_connected_peers(|peer| !excluded_peers.contains(&peer.listener_addr)); // Iterate through all peers that are not the sender and excluded peers. - for peer_ip in peers { - self.router().send(*peer_ip, message.clone()); + for addr in connected_peers.iter().map(|peer| peer.listener_addr) { + self.router().send(addr, message.clone()); } } @@ -76,12 +76,13 @@ pub trait Outbound { // } // Prepare the peers to send to. - let connected_validators = self.router().connected_validators(); - let peers = connected_validators.iter().filter(|peer_ip| !excluded_peers.contains(peer_ip)); + let connected_validators = self.router().filter_connected_peers(|peer| { + peer.node_type.is_validator() && !excluded_peers.contains(&peer.listener_addr) + }); // Iterate through all validators that are not the sender and excluded validators. - for peer_ip in peers { - self.router().send(*peer_ip, message.clone()); + for listener_addr in connected_validators.iter().map(|peer| peer.listener_addr) { + self.router().send(listener_addr, message.clone()); } } }