diff --git a/Cargo.lock b/Cargo.lock index a968929292..121400b12d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3896,6 +3896,7 @@ dependencies = [ "snarkos-node-bft-ledger-service", "snarkos-node-bft-storage-service", "snarkos-node-metrics", + "snarkos-node-router", "snarkos-node-sync", "snarkos-node-tcp", "snarkvm", diff --git a/display/src/pages/overview.rs b/display/src/pages/overview.rs index 3915647c18..bcb4fb3ab1 100644 --- a/display/src/pages/overview.rs +++ b/display/src/pages/overview.rs @@ -15,7 +15,10 @@ use crate::{content_style, header_style}; -use snarkos_node::{Node, router::Peer}; +use snarkos_node::{ + Node, + router::{Peer, PeerPoolHandling}, +}; use snarkvm::prelude::Network; use ratatui::{ @@ -79,7 +82,7 @@ impl Overview { Peer::Connected(_) => "connected", }.to_string(); - let node_type = if let Some(node_type ) = peer.node_type() { + let node_type = if let Some(node_type) = peer.node_type() { node_type.to_string() } else { "unknown".to_string() diff --git a/node/bft/Cargo.toml b/node/bft/Cargo.toml index 61ab97f8b5..490eb5f8de 100644 --- a/node/bft/Cargo.toml +++ b/node/bft/Cargo.toml @@ -111,6 +111,9 @@ features = [ "ledger", "prover" ] workspace = true features = [ "memory" ] +[dependencies.snarkos-node-router] +workspace = true + [dependencies.snarkos-node-sync] workspace = true diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index ec2144ea03..07bb71c084 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -42,6 +42,7 @@ use snarkos_node_bft_events::{ ValidatorsResponse, }; use snarkos_node_bft_ledger_service::LedgerService; +use snarkos_node_router::{NodeType, Peer, PeerPoolHandling}; use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService}; use snarkos_node_tcp::{ Config, @@ -73,7 +74,7 @@ use rand::seq::{IteratorRandom, SliceRandom}; #[cfg(not(any(test)))] use std::net::IpAddr; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, future::Future, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, @@ -132,17 +133,9 @@ pub struct Gateway { /// The cache. cache: Arc>, /// The resolver. - resolver: Arc>, - /// The set of trusted validators. - trusted_validators: IndexSet, - /// The map of connected peer IPs to their peer handlers. - connected_peers: Arc>>, - /// The set of handshaking peers. While `Tcp` already recognizes the connecting IP addresses - /// and prevents duplicate outbound connection attempts to the same IP address, it is unable to - /// prevent simultaneous "two-way" connections between two peers (i.e. both nodes simultaneously - /// attempt to connect to each other). This set is used to prevent this from happening. - connecting_peers: Arc>>, - /// The validator telemetry. + resolver: Arc>>, + /// The collection of both candidate and connected peers. + peer_pool: Arc>>>, #[cfg(feature = "telemetry")] validator_telemetry: Telemetry, /// The primary sender. @@ -157,6 +150,12 @@ pub struct Gateway { dev: Option, } +impl PeerPoolHandling for Gateway { + fn peer_pool(&self) -> &RwLock>> { + &self.peer_pool + } +} + impl Gateway { /// Initializes a new gateway. pub fn new( @@ -176,6 +175,13 @@ impl Gateway { // Initialize the TCP stack. let tcp = Tcp::new(Config::new(ip, Committee::::max_committee_size()?)); + // Add the trusted validators to the peer pool. + let initial_peers = trusted_validators + .iter() + .copied() + .map(|addr| (addr, Peer::new_candidate(addr, true))) + .collect::>(); + // Return the gateway. Ok(Self { account, @@ -184,9 +190,7 @@ impl Gateway { tcp, cache: Default::default(), resolver: Default::default(), - trusted_validators: trusted_validators.iter().copied().collect(), - connected_peers: Default::default(), - connecting_peers: Default::default(), + peer_pool: Arc::new(RwLock::new(initial_peers)), #[cfg(feature = "telemetry")] validator_telemetry: Default::default(), primary_sender: Default::default(), @@ -326,10 +330,15 @@ impl Gateway { } /// Returns the resolver. - pub fn resolver(&self) -> &Resolver { + pub fn resolver(&self) -> &RwLock> { &self.resolver } + /// Returns the listener IP address from the (ambiguous) peer address. + pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option { + self.resolver.read().get_listener(*connected_addr) + } + /// Returns the validator telemetry. #[cfg(feature = "telemetry")] pub fn validator_telemetry(&self) -> &Telemetry { @@ -355,31 +364,21 @@ impl Gateway { /// Returns `true` if the node is connected to the given Aleo address. pub fn is_connected_address(&self, address: Address) -> bool { // Retrieve the peer IP of the given address. - match self.resolver.get_peer_ip_for_address(address) { + match self.resolver.read().get_peer_ip_for_address(address) { // Determine if the peer IP is connected. - Some(peer_ip) => self.is_connected_ip(peer_ip), + Some(peer_ip) => self.is_connected(peer_ip), None => false, } } - /// Returns `true` if the node is connected to the given peer IP. - pub fn is_connected_ip(&self, ip: SocketAddr) -> bool { - self.connected_peers.read().contains(&ip) - } - - /// Returns `true` if the node is connecting to the given peer IP. - pub fn is_connecting_ip(&self, ip: SocketAddr) -> bool { - self.connecting_peers.lock().contains(&ip) - } - /// Returns `true` if the given peer IP is an authorized validator. pub fn is_authorized_validator_ip(&self, ip: SocketAddr) -> bool { // If the peer IP is in the trusted validators, return early. - if self.trusted_validators.contains(&ip) { + if self.trusted_peers().contains(&ip) { return true; } // Retrieve the Aleo address of the peer IP. - match self.resolver.get_address(ip) { + match self.resolver.read().get_address(ip) { // Determine if the peer IP is an authorized validator. Some(address) => self.is_authorized_validator_address(address), None => false, @@ -427,23 +426,13 @@ impl Gateway { self.tcp.config().max_connections as usize } - /// Returns the number of connected peers. - pub fn number_of_connected_peers(&self) -> usize { - self.connected_peers.read().len() - } - /// Returns the list of connected addresses. pub fn connected_addresses(&self) -> HashSet> { - self.connected_peers.read().iter().filter_map(|peer_ip| self.resolver.get_address(*peer_ip)).collect() - } - - /// Returns the list of connected peers. - pub fn connected_peers(&self) -> &RwLock> { - &self.connected_peers + self.get_connected_peers().into_iter().map(|peer| peer.aleo_addr).collect() } /// Attempts to connect to the given peer IP. - pub fn connect(&self, peer_ip: SocketAddr) -> Option> { + pub fn connect(&self, peer_ip: SocketAddr) -> Option> { // Return early if the attempt is against the protocol rules. if let Err(forbidden_error) = self.check_connection_attempt(peer_ip) { warn!("{forbidden_error}"); @@ -454,9 +443,12 @@ impl Gateway { Some(tokio::spawn(async move { debug!("Connecting to validator {peer_ip}..."); // Attempt to connect to the peer. - if let Err(error) = self_.tcp.connect(peer_ip).await { - self_.connecting_peers.lock().shift_remove(&peer_ip); - warn!("Unable to connect to '{peer_ip}' - {error}"); + match self_.tcp.connect(peer_ip).await { + Ok(_) => true, + Err(error) => { + warn!("Unable to connect to '{peer_ip}' - {error}"); + false + } } })) } @@ -472,40 +464,33 @@ impl Gateway { bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (maximum peers reached)") } // Ensure the node is not already connected to this peer. - if self.is_connected_ip(peer_ip) { + if self.is_connected(peer_ip) { bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (already connected)") } // Ensure the node is not already connecting to this peer. - if self.is_connecting_ip(peer_ip) { + if self.is_connecting(peer_ip) { bail!("{CONTEXT} Dropping connection attempt to '{peer_ip}' (already connecting)") } Ok(()) } /// Ensure the peer is allowed to connect. - fn ensure_peer_is_allowed(&self, peer_ip: SocketAddr) -> Result<()> { + fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> { // Ensure the peer IP is not this node. - if self.is_local_ip(peer_ip) { - bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (attempted to self-connect)") - } - // Ensure the node is not already connecting to this peer. - if !self.connecting_peers.lock().insert(peer_ip) { - bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (already shaking hands as the initiator)") - } - // Ensure the node is not already connected to this peer. - if self.is_connected_ip(peer_ip) { - bail!("{CONTEXT} Dropping connection request from '{peer_ip}' (already connected)") + if self.is_local_ip(listener_addr) { + bail!("{CONTEXT} Dropping connection request from '{listener_addr}' (attempted to self-connect)") } // Ensure the peer is not spamming connection attempts. - if !peer_ip.ip().is_loopback() { + if !listener_addr.ip().is_loopback() { // Add this connection attempt and retrieve the number of attempts. - let num_attempts = self.cache.insert_inbound_connection(peer_ip.ip(), RESTRICTED_INTERVAL); + let num_attempts = self.cache.insert_inbound_connection(listener_addr.ip(), RESTRICTED_INTERVAL); // Ensure the connecting peer has not surpassed the connection attempt limit. if num_attempts > MAX_CONNECTION_ATTEMPTS { - bail!("Dropping connection request from '{peer_ip}' (tried {num_attempts} times)") + bail!("Dropping connection request from '{listener_addr}' (tried {num_attempts} times)") } } - Ok(()) + + self.add_peer_on_handshake_resp(listener_addr) } /// Check whether the given IP address is currently banned. @@ -522,28 +507,20 @@ impl Gateway { #[cfg(feature = "metrics")] fn update_metrics(&self) { - metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64); - metrics::gauge(metrics::bft::CONNECTING, self.connecting_peers.lock().len() as f64); - } - - /// Inserts the given peer into the connected peers. - #[cfg(not(test))] - fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address) { - // Adds a bidirectional map between the listener address and (ambiguous) peer address. - self.resolver.insert_peer(peer_ip, peer_addr, address); - // Add a transmission for this peer in the connected peers. - self.connected_peers.write().insert(peer_ip); - #[cfg(feature = "metrics")] - self.update_metrics(); + metrics::gauge(metrics::bft::CONNECTED, self.number_of_connected_peers() as f64); + metrics::gauge(metrics::bft::CONNECTING, self.number_of_connecting_peers() as f64); } /// Inserts the given peer into the connected peers. This is only used in testing. #[cfg(test)] pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address) { // Adds a bidirectional map between the listener address and (ambiguous) peer address. - self.resolver.insert_peer(peer_ip, peer_addr, address); + self.resolver.write().insert_peer(peer_ip, peer_addr, address); // Add a transmission for this peer in the connected peers. - self.connected_peers.write().insert(peer_ip); + self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false)); + if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { + peer.upgrade_to_connected(peer_addr, peer_ip.port(), address, NodeType::Validator, 0); + } } /// Removes the connected peer and adds them to the candidate peers. @@ -557,10 +534,12 @@ impl Gateway { } }); } - // Removes the bidirectional map between the listener address and (ambiguous) peer address. - self.resolver.remove_peer(peer_ip); - // Remove this peer from the connected peers, if it exists. - self.connected_peers.write().shift_remove(&peer_ip); + if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { + if matches!(peer, Peer::Connected(_)) { + self.resolver.write().remove_peer(peer_ip); + } + peer.downgrade_to_candidate(peer_ip); + } #[cfg(feature = "metrics")] self.update_metrics(); } @@ -572,7 +551,7 @@ impl Gateway { /// which can be used to determine when and whether the event has been delivered. fn send_inner(&self, peer_ip: SocketAddr, event: Event) -> Option>> { // Resolve the listener IP to the (ambiguous) peer address. - let Some(peer_addr) = self.resolver.get_ambiguous(peer_ip) else { + let Some(peer_addr) = self.resolver.read().get_ambiguous(peer_ip) else { warn!("Unable to resolve the listener IP address '{peer_ip}'"); return None; }; @@ -595,7 +574,7 @@ impl Gateway { /// propagated to the caller. async fn inbound(&self, peer_addr: SocketAddr, event: Event) -> Result { // Retrieve the listener IP for the peer. - let Some(peer_ip) = self.resolver.get_listener(peer_addr) else { + let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) else { // No longer connected to the peer. trace!("Dropping a {} from {peer_addr} - no longer connected.", event.name()); return Ok(false); @@ -808,11 +787,9 @@ impl Gateway { // Retrieve the connected peers. let mut connected_peers: Vec<_> = match self.dev.is_some() { // In development mode, relax the validity requirements to make operating devnets more flexible. - true => self.connected_peers.read().iter().copied().collect(), + true => self.connected_peers(), // In production mode, ensure the peer IPs are valid. - false => { - self.connected_peers.read().iter().copied().filter(|ip| self.is_valid_peer_ip(*ip)).collect() - } + false => self.connected_peers().into_iter().filter(|ip| self.is_valid_peer_ip(*ip)).collect(), }; // Shuffle the connected peers. connected_peers.shuffle(&mut rand::thread_rng()); @@ -824,7 +801,7 @@ impl Gateway { // Iterate over the validators. for validator_ip in connected_peers.into_iter().take(MAX_VALIDATORS_TO_SEND) { // Retrieve the validator address. - if let Some(validator_address) = self_.resolver.get_address(validator_ip) { + if let Some(validator_address) = self_.resolver.read().get_address(validator_ip) { // Add the validator to the list of validators. validators.insert(validator_ip, validator_address); } @@ -869,7 +846,7 @@ impl Gateway { continue; } // Ensure the validator IP is not already connected or connecting. - if self_.is_connected_ip(validator_ip) || self_.is_connecting_ip(validator_ip) { + if self_.is_connected(validator_ip) || self_.is_connecting(validator_ip) { continue; } // Ensure the validator address is not already connected. @@ -913,14 +890,16 @@ impl Gateway { } } - /// Disconnects from the given peer IP, if the peer is connected. - pub fn disconnect(&self, peer_ip: SocketAddr) -> JoinHandle<()> { + /// Disconnects from the given peer IP, if the peer is connected. The returned boolean + /// indicates whether the peer was actually disconnected from, or if this was a noop. + pub fn disconnect(&self, peer_ip: SocketAddr) -> JoinHandle { let gateway = self.clone(); tokio::spawn(async move { - if let Some(peer_addr) = gateway.resolver.get_ambiguous(peer_ip) { - // Disconnect from this peer. - let _disconnected = gateway.tcp.disconnect(peer_addr).await; - debug_assert!(_disconnected); + if let Some(peer) = gateway.get_connected_peer(peer_ip) { + let connected_addr = peer.connected_addr; + gateway.tcp.disconnect(connected_addr).await + } else { + false } }) } @@ -978,7 +957,7 @@ impl Gateway { /// Logs the connected validators. fn log_connected_validators(&self) { // Log the connected validators. - let connected_validators = self.connected_peers().read().clone(); + let connected_validators = self.connected_peers(); // Resolve the total number of connectable validators. let validators_total = self.ledger.current_committee().map_or(0, |c| c.num_members().saturating_sub(1)); // Format the total validators message. @@ -994,7 +973,7 @@ impl Gateway { // Log the connected validators. info!("{connections_msg}"); for peer_ip in &connected_validators { - let address = self.resolver.get_address(*peer_ip).map_or("Unknown".to_string(), |a| { + let address = self.resolver.read().get_address(*peer_ip).map_or("Unknown".to_string(), |a| { connected_validator_addresses.insert(a); a.to_string() }); @@ -1033,11 +1012,11 @@ impl Gateway { /// This function attempts to connect to any disconnected trusted validators. fn handle_trusted_validators(&self) { // Ensure that the trusted nodes are connected. - for validator_ip in &self.trusted_validators { + for validator_ip in &self.trusted_peers() { // If the trusted_validator is not connected, attempt to connect to it. if !self.is_local_ip(*validator_ip) - && !self.is_connecting_ip(*validator_ip) - && !self.is_connected_ip(*validator_ip) + && !self.is_connecting(*validator_ip) + && !self.is_connected(*validator_ip) { // Attempt to connect to the trusted validator. self.connect(*validator_ip); @@ -1050,7 +1029,7 @@ impl Gateway { let self_ = self.clone(); tokio::spawn(async move { // Retrieve the connected validators. - let validators = self_.connected_peers().read().clone(); + let validators = self_.connected_peers(); // Iterate over the validator IPs. for peer_ip in validators { // Disconnect any validator that is not in the current committee. @@ -1070,7 +1049,7 @@ impl Gateway { // If the number of connected validators is less than the minimum, send a `ValidatorsRequest`. if self.number_of_connected_peers() < MIN_CONNECTED_VALIDATORS { // Retrieve the connected validators. - let validators = self.connected_peers().read().clone(); + let validators = self.connected_peers(); // If there are no validator IPs to connect to, return early. if validators.is_empty() { return; @@ -1092,7 +1071,7 @@ impl Gateway { async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event) { // Process the message. Disconnect if the peer violated the protocol. if let Err(error) = self.inbound(peer_addr, message).await { - if let Some(peer_ip) = self.resolver.get_listener(peer_addr) { + if let Some(peer_ip) = self.resolver.read().get_listener(peer_addr) { warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}"); let self_ = self.clone(); tokio::spawn(async move { @@ -1166,7 +1145,7 @@ impl Transport for Gateway { // Ensure there are connected peers. if self.number_of_connected_peers() > 0 { let self_ = self.clone(); - let connected_peers = self.connected_peers.read().clone(); + let connected_peers = self.connected_peers(); tokio::spawn(async move { // Iterate through all connected peers. for peer_ip in connected_peers { @@ -1245,7 +1224,7 @@ impl Writing for Gateway { impl Disconnect for Gateway { /// Any extra operations to be performed during a disconnect. async fn handle_disconnect(&self, peer_addr: SocketAddr) { - if let Some(peer_ip) = self.resolver.get_listener(peer_addr) { + if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) { self.remove_connected_peer(peer_ip); // We don't clear this map based on time but only on peer disconnect. @@ -1295,7 +1274,7 @@ impl Handshake for Gateway { // If this is an inbound connection, we log it, but don't know the listening address yet. // Otherwise, we can immediately register the listening address. - let mut peer_ip = if peer_side == ConnectionSide::Initiator { + let mut listener_addr = if peer_side == ConnectionSide::Initiator { debug!("{CONTEXT} Gateway received a connection request from '{peer_addr}'"); None } else { @@ -1308,17 +1287,37 @@ impl Handshake for Gateway { // Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time. let handshake_result = if peer_side == ConnectionSide::Responder { - self.handshake_inner_initiator(peer_addr, peer_ip, restrictions_id, stream).await + self.handshake_inner_initiator(peer_addr, restrictions_id, stream).await } else { - self.handshake_inner_responder(peer_addr, &mut peer_ip, restrictions_id, stream).await + self.handshake_inner_responder(peer_addr, &mut listener_addr, restrictions_id, stream).await }; - // Remove the address from the collection of connecting peers (if the handshake got to the point where it's known). - if let Some(ip) = peer_ip { - self.connecting_peers.lock().shift_remove(&ip); + if let Some(addr) = listener_addr { + match handshake_result { + Ok(ref cr) => { + if let Some(peer) = self.peer_pool.write().get_mut(&addr) { + self.resolver.write().insert_peer(addr, peer_addr, cr.address); + peer.upgrade_to_connected( + peer_addr, + cr.listener_port, + cr.address, + NodeType::Validator, + cr.version, + ); + } + #[cfg(feature = "metrics")] + self.update_metrics(); + info!("{CONTEXT} Gateway is connected to '{addr}'"); + } + Err(error) => { + if let Some(peer) = self.peer_pool.write().get_mut(&addr) { + peer.downgrade_to_candidate(addr); + } + // This error needs to be "repackaged" in order to conform to the return type. + return Err(error); + } + } } - let (ref peer_ip, _) = handshake_result?; - info!("{CONTEXT} Gateway is connected to '{peer_ip}'"); Ok(connection) } @@ -1372,12 +1371,11 @@ impl Gateway { async fn handshake_inner_initiator<'a>( &'a self, peer_addr: SocketAddr, - peer_ip: Option, restrictions_id: Field, stream: &'a mut TcpStream, - ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, EventCodec>)> { - // This value is immediately guaranteed to be present, so it can be unwrapped. - let peer_ip = peer_ip.unwrap(); + ) -> io::Result> { + // Introduce the peer into the peer pool. + self.add_peer_on_handshake_init(peer_addr)?; // Construct the stream. let mut framed = Framed::new(stream, EventCodec::::handshake()); @@ -1427,10 +1425,7 @@ impl Gateway { ChallengeResponse { restrictions_id, signature: Data::Object(our_signature), nonce: response_nonce }; send_event(&mut framed, peer_addr, Event::ChallengeResponse(our_response)).await?; - // Add the peer to the gateway. - self.insert_connected_peer(peer_ip, peer_addr, peer_request.address); - - Ok((peer_ip, framed)) + Ok(peer_request) } /// The connection responder side of the handshake. @@ -1440,7 +1435,7 @@ impl Gateway { peer_ip: &mut Option, restrictions_id: Field, stream: &'a mut TcpStream, - ) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, EventCodec>)> { + ) -> io::Result> { // Construct the stream. let mut framed = Framed::new(stream, EventCodec::::handshake()); @@ -1502,10 +1497,8 @@ impl Gateway { send_event(&mut framed, peer_addr, reason.into()).await?; return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}"))); } - // Add the peer to the gateway. - self.insert_connected_peer(peer_ip, peer_addr, peer_request.address); - Ok((peer_ip, framed)) + Ok(peer_request) } /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid. diff --git a/node/bft/src/helpers/resolver.rs b/node/bft/src/helpers/resolver.rs index adcc7bb2da..7d1723fd61 100644 --- a/node/bft/src/helpers/resolver.rs +++ b/node/bft/src/helpers/resolver.rs @@ -15,22 +15,18 @@ use snarkvm::prelude::{Address, Network}; -#[cfg(feature = "locktick")] -use locktick::parking_lot::RwLock; -#[cfg(not(feature = "locktick"))] -use parking_lot::RwLock; use std::{collections::HashMap, net::SocketAddr}; #[derive(Debug)] pub struct Resolver { /// The map of the listener address to (ambiguous) peer address. - from_listener: RwLock>, + from_listener: HashMap, /// The map of the (ambiguous) peer address to listener address. - to_listener: RwLock>, + to_listener: HashMap, /// A map of `peer IP` to `address`. - peer_addresses: RwLock>>, + peer_addresses: HashMap>, /// A map of `address` to `peer IP`. - address_peers: RwLock, SocketAddr>>, + address_peers: HashMap, SocketAddr>, } impl Default for Resolver { @@ -55,41 +51,41 @@ impl Resolver { impl Resolver { /// Returns the listener address for the given (ambiguous) peer address, if it exists. pub fn get_listener(&self, peer_addr: SocketAddr) -> Option { - self.to_listener.read().get(&peer_addr).copied() + self.to_listener.get(&peer_addr).copied() } /// Returns the (ambiguous) peer address for the given listener address, if it exists. pub fn get_ambiguous(&self, peer_ip: SocketAddr) -> Option { - self.from_listener.read().get(&peer_ip).copied() + self.from_listener.get(&peer_ip).copied() } /// Returns the address for the given peer IP. pub fn get_address(&self, peer_ip: SocketAddr) -> Option> { - self.peer_addresses.read().get(&peer_ip).copied() + self.peer_addresses.get(&peer_ip).copied() } /// Returns the peer IP for the given address. pub fn get_peer_ip_for_address(&self, address: Address) -> Option { - self.address_peers.read().get(&address).copied() + self.address_peers.get(&address).copied() } /// Inserts a bidirectional mapping of the listener address and the (ambiguous) peer address, /// alongside a bidirectional mapping of the listener address and the Aleo address. - pub fn insert_peer(&self, listener_ip: SocketAddr, peer_addr: SocketAddr, address: Address) { - self.from_listener.write().insert(listener_ip, peer_addr); - self.to_listener.write().insert(peer_addr, listener_ip); - self.peer_addresses.write().insert(listener_ip, address); - self.address_peers.write().insert(address, listener_ip); + pub fn insert_peer(&mut self, listener_ip: SocketAddr, peer_addr: SocketAddr, address: Address) { + self.from_listener.insert(listener_ip, peer_addr); + self.to_listener.insert(peer_addr, listener_ip); + self.peer_addresses.insert(listener_ip, address); + self.address_peers.insert(address, listener_ip); } /// Removes the bidirectional mapping of the listener address and the (ambiguous) peer address, /// alongside the bidirectional mapping of the listener address and the Aleo address. - pub fn remove_peer(&self, listener_ip: SocketAddr) { - if let Some(peer_addr) = self.from_listener.write().remove(&listener_ip) { - self.to_listener.write().remove(&peer_addr); + pub fn remove_peer(&mut self, listener_ip: SocketAddr) { + if let Some(peer_addr) = self.from_listener.remove(&listener_ip) { + self.to_listener.remove(&peer_addr); } - if let Some(address) = self.peer_addresses.write().remove(&listener_ip) { - self.address_peers.write().remove(&address); + if let Some(address) = self.peer_addresses.remove(&listener_ip) { + self.address_peers.remove(&address); } } } @@ -103,7 +99,7 @@ mod tests { #[test] fn test_resolver() { - let resolver = Resolver::::new(); + let mut resolver = Resolver::::new(); let listener_ip = SocketAddr::from(([127, 0, 0, 1], 1234)); let peer_addr = SocketAddr::from(([127, 0, 0, 1], 4321)); let mut rng = TestRng::default(); diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 41e0b50083..b5c688b810 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -401,7 +401,7 @@ impl Primary { // Iterate through the non-signers. for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) { // Resolve the address to the peer IP. - match self.gateway.resolver().get_peer_ip_for_address(address) { + match self.gateway.resolver().read().get_peer_ip_for_address(address) { // Resend the batch proposal to the validator for signing. Some(peer_ip) => { let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round()); @@ -730,7 +730,7 @@ impl Primary { let batch_author = batch_header.author(); // Ensure the batch proposal is from the validator. - match self.gateway.resolver().get_address(peer_ip) { + match self.gateway.resolver().read().get_address(peer_ip) { // If the peer is a validator, then ensure the batch proposal is from the validator. Some(address) => { if address != batch_author { @@ -1004,7 +1004,7 @@ impl Primary { let signer = signature.to_address(); // Ensure the batch signature is signed by the validator. - if self.gateway.resolver().get_address(peer_ip) != Some(signer) { + if self.gateway.resolver().read().get_address(peer_ip) != Some(signer) { // Proceed to disconnect the validator. self.gateway.disconnect(peer_ip); bail!("Malicious peer - batch signature is from a different validator ({signer})"); @@ -1043,7 +1043,7 @@ impl Primary { // Retrieve the committee lookback for the round. let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?; // Retrieve the address of the validator. - let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else { + let Some(signer) = self_.gateway.resolver().read().get_address(peer_ip) else { bail!("Signature is from a disconnected validator"); }; // Add the signature to the batch. @@ -2229,7 +2229,7 @@ mod tests { fn map_account_addresses(primary: &Primary, accounts: &[(SocketAddr, Account)]) { // First account is primary, which doesn't need to resolve. for (addr, acct) in accounts.iter().skip(1) { - primary.gateway.resolver().insert_peer(*addr, *addr, acct.address()); + primary.gateway.resolver().write().insert_peer(*addr, *addr, acct.address()); } } @@ -2426,7 +2426,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary will only consider itself synced if we received // block locators from a peer. @@ -2465,7 +2465,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // Add a high block locator to indicate we are not synced. primary.sync.test_update_peer_locators(peer_ip, sample_block_locators(20)).unwrap(); @@ -2505,7 +2505,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary will only consider itself synced if we received // block locators from a peer. @@ -2542,7 +2542,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. primary.sync.try_block_sync().await; @@ -2587,7 +2587,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. primary.sync.try_block_sync().await; @@ -2643,7 +2643,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // The primary must be considered synced. primary.sync.try_block_sync().await; @@ -2690,8 +2690,8 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary_v4.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); - primary_v5.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); // primary v4 must be considered synced. primary_v4.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap(); diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 85ad6c0a7e..ae492468de 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -24,6 +24,7 @@ use crate::{ }; use snarkos_node_bft_events::{CertificateRequest, CertificateResponse, Event}; use snarkos_node_bft_ledger_service::LedgerService; +use snarkos_node_router::PeerPoolHandling; use snarkos_node_sync::{BLOCK_REQUEST_BATCH_DELAY, BlockSync, Ping, PrepareSyncRequest, locators::BlockLocators}; use snarkvm::{ console::{network::Network, types::Field}, diff --git a/node/bft/tests/common/primary.rs b/node/bft/tests/common/primary.rs index 3af72f1da7..1e29f26d99 100644 --- a/node/bft/tests/common/primary.rs +++ b/node/bft/tests/common/primary.rs @@ -27,6 +27,7 @@ use snarkos_node_bft::{ helpers::{PrimarySender, Storage, init_primary_channels}, }; use snarkos_node_bft_storage_service::BFTMemoryService; +use snarkos_node_router::PeerPoolHandling; use snarkos_node_sync::BlockSync; use snarkvm::{ console::{ @@ -120,7 +121,7 @@ impl TestValidator { let self_clone = self.clone(); self.handles.lock().push(tokio::task::spawn(async move { loop { - let connections = self_clone.primary.gateway().connected_peers().read().clone(); + let connections = self_clone.primary.gateway().connected_peers(); info!("{} connections", connections.len()); for connection in connections { debug!(" {}", connection); @@ -285,7 +286,7 @@ impl TestNetwork { // Disconnects N nodes from all other nodes. pub async fn disconnect(&self, num_nodes: u16) { for validator in self.validators.values().take(num_nodes as usize) { - for peer_ip in validator.primary.gateway().connected_peers().read().iter() { + for peer_ip in validator.primary.gateway().connected_peers().iter() { validator.primary.gateway().disconnect(*peer_ip); } } @@ -297,7 +298,7 @@ impl TestNetwork { // Disconnects a specific node from all other nodes. pub async fn disconnect_one(&self, id: u16) { let target_validator = self.validators.get(&id).unwrap(); - for peer_ip in target_validator.primary.gateway().connected_peers().read().iter() { + for peer_ip in target_validator.primary.gateway().connected_peers().iter() { target_validator.primary.gateway().disconnect(*peer_ip); } diff --git a/node/bft/tests/gateway_e2e.rs b/node/bft/tests/gateway_e2e.rs index 2ac0b37509..d616b70378 100644 --- a/node/bft/tests/gateway_e2e.rs +++ b/node/bft/tests/gateway_e2e.rs @@ -24,7 +24,8 @@ use crate::common::{ }; use snarkos_account::Account; use snarkos_node_bft::{Gateway, helpers::init_primary_channels}; -use snarkos_node_bft_events::{ChallengeRequest, ChallengeResponse, Disconnect, DisconnectReason, Event, WorkerPing}; +use snarkos_node_bft_events::{ChallengeRequest, ChallengeResponse, Event}; +use snarkos_node_router::PeerPoolHandling; use snarkos_node_tcp::P2P; use snarkvm::{ledger::narwhal::Data, prelude::TestRng}; @@ -78,79 +79,10 @@ async fn handshake_responder_side_timeout() { deadline!(Duration::from_secs(5), move || gateway_clone.tcp().num_connecting() == 0); // Check the test peer hasn't been added to the gateway's connected peers. - assert!(gateway.connected_peers().read().is_empty()); + assert!(gateway.connected_peers().is_empty()); assert_eq!(gateway.tcp().num_connected(), 0); } -// The test peer connects to the gateway and sends an unexpected event. -// The gateway's handshake should be interrupted and the peer should be -// disconnected. -macro_rules! handshake_responder_side_unexpected_event { - ($test_name:ident, $payload:expr) => { - paste::paste! { - #[tokio::test(flavor = "multi_thread")] - async fn []() { - const NUM_NODES: u16 = 4; - - let mut rng = TestRng::default(); - let (_accounts, gateway) = new_test_gateway(NUM_NODES, &mut rng).await; - let test_peer = TestPeer::new().await; - - // Initiate a connection with the gateway, this will only return once the handshake protocol has - // completed on the test peer's side, which is a no-op. - assert!(test_peer.connect(gateway.local_ip()).await.is_ok()); - - // Check the connection has been registered. - let gateway_clone = gateway.clone(); - deadline!(Duration::from_secs(1), move || gateway_clone.tcp().num_connecting() == 1); - - // Send an unexpected event. - let _ = test_peer.unicast( - gateway.local_ip(), - $payload - ); - - // Check the tcp stack's connection counts, make sure the disconnect interrupted handshaking, - // wait a short time to ensure the gateway has time to process the disconnect (note: this is - // shorter than the gateway's timeout, so we can ensure that's not the reason for the - // disconnect). - let gateway_clone = gateway.clone(); - deadline!(Duration::from_secs(1), move || gateway_clone.tcp().num_connecting() == 0); - - // Check the test peer hasn't been added to the gateway's connected peers. - assert!(gateway.connected_peers().read().is_empty()); - assert_eq!(gateway.tcp().num_connected(), 0); - } - } - }; -} - -/* Unexpected disconnects. */ - -macro_rules! handshake_responder_side_unexpected_disconnect { - ($($reason:ident),*) => { - $( - paste::paste! { - handshake_responder_side_unexpected_event!( - [], - Event::Disconnect(Disconnect::from(DisconnectReason::$reason)) - ); - } - )* - } - } - -handshake_responder_side_unexpected_disconnect!( - ProtocolViolation, - NoReasonGiven, - InvalidChallengeResponse, - OutdatedClientVersion -); - -/* Other unexpected event types */ - -handshake_responder_side_unexpected_event!(worker_ping, Event::WorkerPing(WorkerPing::new([].into()))); - // TODO(nkls): other event types, can be done as a follow up. /* Invalid challenge request */ @@ -188,7 +120,7 @@ async fn handshake_responder_side_invalid_challenge_request() { let gateway_clone = gateway.clone(); deadline!(Duration::from_secs(1), move || gateway_clone.tcp().num_connecting() == 0); // Check the test peer hasn't been added to the gateway's connected peers. - assert!(gateway.connected_peers().read().is_empty()); + assert!(gateway.connected_peers().is_empty()); assert_eq!(gateway.tcp().num_connected(), 0); } @@ -268,6 +200,6 @@ async fn handshake_responder_side_invalid_challenge_response() { let gateway_clone = gateway.clone(); deadline!(Duration::from_secs(1), move || gateway_clone.tcp().num_connecting() == 0); // Check the test peer hasn't been added to the gateway's connected peers. - assert!(gateway.connected_peers().read().is_empty()); + assert!(gateway.connected_peers().is_empty()); assert_eq!(gateway.tcp().num_connected(), 0); } diff --git a/node/rest/src/routes.rs b/node/rest/src/routes.rs index e10606367a..4d85e15129 100644 --- a/node/rest/src/routes.rs +++ b/node/rest/src/routes.rs @@ -14,7 +14,7 @@ // limitations under the License. use super::*; -use snarkos_node_router::messages::UnconfirmedSolution; +use snarkos_node_router::{PeerPoolHandling, messages::UnconfirmedSolution}; use snarkvm::{ ledger::puzzle::Solution, prelude::{Address, Identifier, LimitedWriter, Plaintext, Program, ToBytes, VM, block::Transaction}, diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index 4c36ad0947..9acbda1ffc 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -15,7 +15,7 @@ use crate::{ NodeType, - Peer, + PeerPoolHandling, Router, messages::{ChallengeRequest, ChallengeResponse, DisconnectReason, Message, MessageCodec, MessageTrait}, }; @@ -28,7 +28,7 @@ use snarkvm::{ use anyhow::{Result, bail}; use futures::SinkExt; use rand::{Rng, rngs::OsRng}; -use std::{collections::hash_map::Entry, io, net::SocketAddr}; +use std::{io, net::SocketAddr}; use tokio::net::TcpStream; use tokio_stream::StreamExt; use tokio_util::codec::Framed; @@ -132,9 +132,10 @@ impl Router { }; if let Some(addr) = listener_addr { - if let Ok(ref challenge_request) = handshake_result { + if let Ok(ref cr) = handshake_result { if let Some(peer) = self.peer_pool.write().get_mut(&addr) { - peer.upgrade_to_connected(peer_addr, challenge_request, self.clone()); + self.resolver.write().insert_peer(peer.listener_addr(), peer_addr); + peer.upgrade_to_connected(peer_addr, cr.listener_port, cr.address, cr.node_type, cr.version); } #[cfg(feature = "metrics")] self.update_metrics(); @@ -155,17 +156,9 @@ impl Router { genesis_header: Header, restrictions_id: Field, ) -> io::Result> { - match self.peer_pool.write().entry(peer_addr) { - Entry::Vacant(entry) => { - entry.insert(Peer::new_connecting(false, peer_addr)); - } - Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => { - entry.insert(Peer::new_connecting(entry.get().is_trusted(), peer_addr)); - } - Entry::Occupied(_) => { - return Err(error(format!("Duplicate connection attempt with '{peer_addr}'"))); - } - } + // Introduce the peer into the peer pool. + self.add_peer_on_handshake_init(peer_addr)?; + // Construct the stream. let mut framed = Framed::new(stream, MessageCodec::::handshake()); @@ -317,27 +310,11 @@ impl Router { bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)") } // Unknown peers are untrusted, so check if `allow_external_peers` is true. - if !self.allow_external_peers() && !self.is_trusted(&listener_addr) { + if !self.allow_external_peers() && !self.is_trusted(listener_addr) { bail!("Dropping connection request from '{listener_addr}' (untrusted)") } - match self.peer_pool.write().entry(listener_addr) { - Entry::Vacant(entry) => { - entry.insert(Peer::new_connecting(false, listener_addr)); - } - Entry::Occupied(mut entry) => match entry.get_mut() { - peer @ Peer::Candidate(_) => { - *peer = Peer::new_connecting(peer.is_trusted(), listener_addr); - } - Peer::Connecting(_) => { - bail!("Dropping connection request from '{listener_addr}' (already connecting)"); - } - Peer::Connected(_) => { - bail!("Dropping connection request from '{listener_addr}' (already connected)"); - } - }, - }; - Ok(()) + self.add_peer_on_handshake_resp(listener_addr) } /// Verifies the given challenge request. Returns a disconnect reason if the request is invalid. diff --git a/node/router/src/heartbeat.rs b/node/router/src/heartbeat.rs index 5967ea6550..81ae779c54 100644 --- a/node/router/src/heartbeat.rs +++ b/node/router/src/heartbeat.rs @@ -16,6 +16,7 @@ use crate::{ ConnectedPeer, Outbound, + PeerPoolHandling, Router, bootstrap_peers, messages::{DisconnectReason, Message, PeerRequest}, diff --git a/node/router/src/helpers/peer.rs b/node/router/src/helpers/peer.rs index a05c1d3173..c454b524e5 100644 --- a/node/router/src/helpers/peer.rs +++ b/node/router/src/helpers/peer.rs @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{NodeType, Router, messages::ChallengeRequest}; +use crate::NodeType; use snarkvm::prelude::{Address, Network}; use std::{net::SocketAddr, time::Instant}; @@ -68,8 +68,6 @@ pub struct ConnectedPeer { pub first_seen: Instant, /// The timestamp of the last message received from this peer. pub last_seen: Instant, - /// A reference to the associated `Router` object. - pub router: Router, } impl Peer { @@ -79,44 +77,41 @@ impl Peer { } /// Create a connecting peer. - pub const fn new_connecting(trusted: bool, listener_addr: SocketAddr) -> Self { - Self::Connecting(ConnectingPeer { trusted, listener_addr }) + pub const fn new_connecting(listener_addr: SocketAddr, trusted: bool) -> Self { + Self::Connecting(ConnectingPeer { listener_addr, trusted }) } /// Promote a connecting peer to a fully connected one. - pub fn upgrade_to_connected(&mut self, connected_addr: SocketAddr, cr: &ChallengeRequest, router: Router) { + pub fn upgrade_to_connected( + &mut self, + connected_addr: SocketAddr, + listener_port: u16, + aleo_address: Address, + node_type: NodeType, + node_version: u32, + ) { // Logic check: this can only happen during the handshake. assert!(matches!(self, Self::Connecting(_))); let timestamp = Instant::now(); - let listener_addr = SocketAddr::from((connected_addr.ip(), cr.listener_port)); - - // Introduce the peer in the resolver. - router.resolver.write().insert_peer(listener_addr, connected_addr); + let listener_addr = SocketAddr::from((connected_addr.ip(), listener_port)); *self = Self::Connected(ConnectedPeer { listener_addr, connected_addr, - aleo_addr: cr.address, - node_type: cr.node_type, + aleo_addr: aleo_address, + node_type, trusted: self.is_trusted(), - version: cr.version, + version: node_version, last_height_seen: None, first_seen: timestamp, last_seen: timestamp, - router, }); } /// Demote a peer to candidate status, marking it as disconnected. pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr) { - // Connecting peers are not in the resolver. - if let Self::Connected(peer) = self { - // Remove the peer from the resolver. - peer.router.resolver.write().remove_peer(&peer.connected_addr); - }; - - *self = Self::Candidate(CandidatePeer { listener_addr, trusted: self.is_trusted() }); + *self = Self::new_candidate(listener_addr, self.is_trusted()); } /// Returns the type of the node (only applicable to connected peers). @@ -129,11 +124,11 @@ impl Peer { } /// The listener (public) address of this peer. - pub fn listener_addr(&self) -> &SocketAddr { + pub fn listener_addr(&self) -> SocketAddr { match self { - Self::Candidate(p) => &p.listener_addr, - Self::Connecting(p) => &p.listener_addr, - Self::Connected(p) => &p.listener_addr, + Self::Candidate(p) => p.listener_addr, + Self::Connecting(p) => p.listener_addr, + Self::Connected(p) => p.listener_addr, } } diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 0679c02dd0..bd6d92812c 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -15,6 +15,7 @@ use crate::{ Outbound, + PeerPoolHandling, messages::{ BlockRequest, BlockResponse, diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index b64c78474d..79d52e899c 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -44,13 +44,14 @@ pub use routing::*; mod writing; -use crate::messages::{Message, MessageCodec, NodeType}; +pub use crate::messages::NodeType; +use crate::messages::{Message, MessageCodec}; use snarkos_account::Account; use snarkos_node_bft_ledger_service::LedgerService; use snarkos_node_tcp::{Config, ConnectionSide, P2P, Tcp, is_bogon_ip, is_unspecified_or_broadcast_ip}; -use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey}; +use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey, error}; use aleo_std::{StorageMode, aleo_ledger_dir}; use anyhow::{Result, bail}; @@ -62,10 +63,10 @@ use parking_lot::{Mutex, RwLock}; use std::net::IpAddr; use std::{ cmp, - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet, hash_map::Entry}, fs, future::Future, - io::Write, + io::{self, Write}, net::SocketAddr, ops::Deref, str::FromStr, @@ -80,6 +81,207 @@ pub const DEFAULT_NODE_PORT: u16 = 4130; /// The name of the file containing cached peers. const PEER_CACHE_FILENAME: &str = "cached_router_peers"; +pub trait PeerPoolHandling: P2P { + fn peer_pool(&self) -> &RwLock>>; + + /// Returns the connected peer address from the listener IP address. + fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option { + if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) { + Some(peer.connected_addr) + } else { + None + } + } + + /// Returns `true` if the node is connecting to the given peer's listener address. + fn is_connecting(&self, listener_addr: SocketAddr) -> bool { + self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connecting()) + } + + /// Returns `true` if the node is connected to the given peer listener address. + fn is_connected(&self, listener_addr: SocketAddr) -> bool { + self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_connected()) + } + + /// Returns `true` if the given listener address is trusted. + fn is_trusted(&self, listener_addr: SocketAddr) -> bool { + self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted()) + } + + /// Returns the number of connected peers. + fn number_of_connected_peers(&self) -> usize { + self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count() + } + + /// Returns the number of connecting peers. + fn number_of_connecting_peers(&self) -> usize { + self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connecting()).count() + } + + /// Returns the number of candidate peers. + fn number_of_candidate_peers(&self) -> usize { + self.peer_pool().read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count() + } + + /// Returns the connected peer given the peer IP, if it exists. + fn get_connected_peer(&self, listener_addr: SocketAddr) -> Option> { + if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) { + Some(peer.clone()) + } else { + None + } + } + + /// Updates the connected peer - if it exists - given the peer IP and a closure. + /// The returned status indicates whether the update was successful, i.e. the peer had existed. + fn update_connected_peer)>( + &self, + listener_addr: &SocketAddr, + mut update_fn: F, + ) -> bool { + if let Some(Peer::Connected(peer)) = self.peer_pool().write().get_mut(listener_addr) { + update_fn(peer); + true + } else { + false + } + } + + /// Returns the list of all peers (connected, connecting, and candidate). + fn get_peers(&self) -> Vec> { + self.peer_pool().read().values().cloned().collect() + } + + /// Returns all connected peers. + fn get_connected_peers(&self) -> Vec> { + self.filter_connected_peers(|_| true) + } + + /// Returns all connected peers that satisify the given predicate. + fn filter_connected_peers) -> bool>(&self, mut predicate: P) -> Vec> { + self.peer_pool() + .read() + .values() + .filter_map(|p| { + if let Peer::Connected(peer) = p + && predicate(peer) + { + Some(peer) + } else { + None + } + }) + .cloned() + .collect() + } + + /// Returns the list of connected peers. + fn connected_peers(&self) -> Vec { + self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect() + } + + /// Returns the list of trusted peers. + fn trusted_peers(&self) -> Vec { + self.peer_pool().read().iter().filter_map(|(addr, peer)| peer.is_trusted().then_some(*addr)).collect() + } + + /// Returns the list of candidate peers. + fn candidate_peers(&self) -> HashSet { + let banned_ips = self.tcp().banned_peers().get_banned_ips(); + self.peer_pool() + .read() + .iter() + .filter_map(|(addr, peer)| { + (matches!(peer, Peer::Candidate(_)) && !banned_ips.contains(&addr.ip())).then_some(*addr) + }) + .collect() + } + + /// Returns the list of unconnected trusted peers. + fn unconnected_trusted_peers(&self) -> HashSet { + self.peer_pool() + .read() + .iter() + .filter_map( + |(addr, peer)| if let Peer::Candidate(peer) = peer { peer.trusted.then_some(*addr) } else { None }, + ) + .collect() + } + + /// Preserve the peers who have the greatest known block heights, and the lowest + /// number of registered network failures. + fn save_best_peers(&self, storage_mode: &StorageMode) -> Result<()> { + // Collect all prospect peers. + let mut peers = self.get_peers(); + + // Get the low-level peer stats. + let known_peers = self.tcp().known_peers().snapshot(); + + // Sort the list of peers. + peers.sort_unstable_by_key(|peer| { + if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) { + // Prioritize greatest height, then lowest failure count. + (cmp::Reverse(peer.last_height_seen()), peer_stats.failures()) + } else { + // Unreachable; use an else-compatible dummy. + (cmp::Reverse(peer.last_height_seen()), 0) + } + }); + peers.truncate(MAX_PEERS_TO_SEND); + + // Dump the connected peers to a file. + let mut path = aleo_ledger_dir(N::ID, storage_mode); + path.push(PEER_CACHE_FILENAME); + let mut file = fs::File::create(path)?; + for peer in peers { + writeln!(file, "{}", peer.listener_addr())?; + } + + Ok(()) + } + + // Introduces a new connecting peer into the peer pool if unknown, or promotes + // a known candidate peer to a connecting one, at the beginning of handshake + // when initiating it. + fn add_peer_on_handshake_init(&self, listener_addr: SocketAddr) -> io::Result<()> { + match self.peer_pool().write().entry(listener_addr) { + Entry::Vacant(entry) => { + entry.insert(Peer::new_connecting(listener_addr, false)); + } + Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => { + entry.insert(Peer::new_connecting(listener_addr, entry.get().is_trusted())); + } + Entry::Occupied(_) => { + return Err(error(format!("Duplicate connection attempt with '{listener_addr}'"))); + } + } + Ok(()) + } + + // Introduces a new connecting peer into the peer pool if unknown, or promotes + // a known candidate peer to a connecting one, during the handshake when responding + // to it, once the peer's listener address is known. + fn add_peer_on_handshake_resp(&self, listener_addr: SocketAddr) -> anyhow::Result<()> { + match self.peer_pool().write().entry(listener_addr) { + Entry::Vacant(entry) => { + entry.insert(Peer::new_connecting(listener_addr, false)); + } + Entry::Occupied(mut entry) => match entry.get_mut() { + peer @ Peer::Candidate(_) => { + *peer = Peer::new_connecting(listener_addr, peer.is_trusted()); + } + Peer::Connecting(_) => { + bail!("Dropping connection request from '{listener_addr}' (already connecting)"); + } + Peer::Connected(_) => { + bail!("Dropping connection request from '{listener_addr}' (already connected)"); + } + }, + } + Ok(()) + } +} + /// The router keeps track of connected and connecting peers. /// The actual network communication happens in Inbound/Outbound, /// which is implemented by Validator, Prover, and Client. @@ -94,6 +296,12 @@ impl Deref for Router { } } +impl PeerPoolHandling for Router { + fn peer_pool(&self) -> &RwLock>> { + &self.peer_pool + } +} + pub struct InnerRouter { /// The TCP stack. tcp: Tcp, @@ -236,12 +444,12 @@ impl Router { bail!("Dropping connection attempt to '{peer_ip}' (maximum peers reached)") } // Ensure the node is not already connecting to this peer. - if self.is_connecting(&peer_ip) { + if self.is_connecting(peer_ip) { debug!("Dropping connection attempt to '{peer_ip}' (already connecting)"); return Ok(true); } // Ensure the node is not already connected to this peer. - if self.is_connected(&peer_ip) { + if self.is_connected(peer_ip) { debug!("Dropping connection attempt to '{peer_ip}' (already connected)"); return Ok(true); } @@ -249,11 +457,12 @@ impl Router { Ok(false) } - /// Disconnects from the given peer IP, if the peer is connected. + /// Disconnects from the given peer IP, if the peer is connected. The returned boolean + /// indicates whether the peer was actually disconnected from, or if this was a noop. pub fn disconnect(&self, peer_ip: SocketAddr) -> JoinHandle { let router = self.clone(); tokio::spawn(async move { - if let Some(peer) = router.get_connected_peer(&peer_ip) { + if let Some(peer) = router.get_connected_peer(peer_ip) { let connected_addr = peer.connected_addr; router.tcp.disconnect(connected_addr).await } else { @@ -336,128 +545,11 @@ impl Router { self.resolver.read().get_listener(connected_addr) } - /// Returns the (ambiguous) peer address from the listener IP address. - pub fn resolve_to_ambiguous(&self, listener_addr: &SocketAddr) -> Option { - if let Some(Peer::Connected(peer)) = self.peer_pool.read().get(listener_addr) { - Some(peer.connected_addr) - } else { - None - } - } - - /// Returns `true` if the node is connecting to the given peer's listener address. - pub fn is_connecting(&self, listener_addr: &SocketAddr) -> bool { - self.peer_pool.read().get(listener_addr).is_some_and(|peer| peer.is_connecting()) - } - - /// Returns `true` if the node is connected to the given peer listener address. - pub fn is_connected(&self, listener_addr: &SocketAddr) -> bool { - self.peer_pool.read().get(listener_addr).is_some_and(|peer| peer.is_connected()) - } - - /// Returns `true` if the given listener address is trusted. - pub fn is_trusted(&self, listener_addr: &SocketAddr) -> bool { - self.peer_pool.read().get(listener_addr).is_some_and(|peer| peer.is_trusted()) - } - /// Returns the maximum number of connected peers. pub fn max_connected_peers(&self) -> usize { self.tcp.config().max_connections as usize } - /// Returns the number of connected peers. - pub fn number_of_connected_peers(&self) -> usize { - self.peer_pool.read().iter().filter(|(_, peer)| peer.is_connected()).count() - } - - /// Returns the number of candidate peers. - pub fn number_of_candidate_peers(&self) -> usize { - self.peer_pool.read().values().filter(|peer| matches!(peer, Peer::Candidate(_))).count() - } - - /// Returns the connected peer given the peer IP, if it exists. - pub fn get_connected_peer(&self, listener_addr: &SocketAddr) -> Option> { - if let Some(Peer::Connected(peer)) = self.peer_pool.read().get(listener_addr) { - Some(peer.clone()) - } else { - None - } - } - - /// Updates the connected peer - if it exists - given the peer IP and a closure. - /// The returned status indicates whether the update was successful, i.e. the peer had existed. - pub fn update_connected_peer)>( - &self, - listener_addr: &SocketAddr, - mut update_fn: F, - ) -> bool { - if let Some(Peer::Connected(peer)) = self.peer_pool.write().get_mut(listener_addr) { - update_fn(peer); - true - } else { - false - } - } - - /// Returns the list of all peers (connected, connecting, and candidate). - pub fn get_peers(&self) -> Vec> { - self.peer_pool.read().values().cloned().collect() - } - - /// Returns all connected peers. - pub fn get_connected_peers(&self) -> Vec> { - self.filter_connected_peers(|_| true) - } - - /// Returns all connected peers that satisify the given predicate. - pub fn filter_connected_peers) -> bool>( - &self, - mut predicate: P, - ) -> Vec> { - self.peer_pool - .read() - .values() - .filter_map(|p| { - if let Peer::Connected(peer) = p - && predicate(peer) - { - Some(peer) - } else { - None - } - }) - .cloned() - .collect() - } - - /// Returns the list of connected peers. - pub fn connected_peers(&self) -> Vec { - self.peer_pool.read().iter().filter_map(|(addr, peer)| peer.is_connected().then_some(*addr)).collect() - } - - /// Returns the list of candidate peers. - pub fn candidate_peers(&self) -> HashSet { - let banned_ips = self.tcp().banned_peers().get_banned_ips(); - self.peer_pool - .read() - .iter() - .filter_map(|(addr, peer)| { - (matches!(peer, Peer::Candidate(_)) && !banned_ips.contains(&addr.ip())).then_some(*addr) - }) - .collect() - } - - /// Returns the list of unconnected trusted peers. - pub fn unconnected_trusted_peers(&self) -> HashSet { - self.peer_pool - .read() - .iter() - .filter_map( - |(addr, peer)| if let Peer::Candidate(peer) = peer { peer.trusted.then_some(*addr) } else { None }, - ) - .collect() - } - /// Check whether the given IP address is currently banned. #[cfg(not(any(test)))] fn is_ip_banned(&self, ip: IpAddr) -> bool { @@ -517,6 +609,9 @@ impl Router { /// Removes the connected peer and adds them to the candidate peers. pub fn remove_connected_peer(&self, peer_ip: SocketAddr) { if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { + if let Peer::Connected(peer) = peer { + self.resolver.write().remove_peer(&peer.connected_addr); + } peer.downgrade_to_candidate(peer_ip); } // Clear cached entries applicable to the peer. @@ -530,42 +625,11 @@ impl Router { self.handles.lock().push(tokio::spawn(future)); } - // Save the best peers to disk. - fn save_best_peers(&self) -> Result<()> { - // Collect all prospect peers. - let mut peers = self.get_peers(); - - // Get the low-level peer stats. - let known_peers = self.tcp().known_peers().snapshot(); - - // Sort the list of peers. - peers.sort_unstable_by_key(|peer| { - if let Some(peer_stats) = known_peers.get(&peer.listener_addr().ip()) { - // Prioritize greatest height, then lowest failure count. - (cmp::Reverse(peer.last_height_seen()), peer_stats.failures()) - } else { - // Unreachable; use an else-compatible dummy. - (cmp::Reverse(peer.last_height_seen()), 0) - } - }); - peers.truncate(MAX_PEERS_TO_SEND); - - // Dump the connected peers to a file. - let mut path = aleo_ledger_dir(N::ID, &self.storage_mode); - path.push(PEER_CACHE_FILENAME); - let mut file = fs::File::create(path)?; - for peer in peers { - writeln!(file, "{}", peer.listener_addr())?; - } - - Ok(()) - } - /// Shuts down the router. pub async fn shut_down(&self) { info!("Shutting down the router..."); // Save the best peers for future use. - if let Err(e) = self.save_best_peers() { + if let Err(e) = self.save_best_peers(&self.storage_mode) { warn!("Failed to persist best peers to disk: {e}"); } // Abort the tasks. diff --git a/node/router/src/outbound.rs b/node/router/src/outbound.rs index 8c717c082e..a5ff2f2766 100644 --- a/node/router/src/outbound.rs +++ b/node/router/src/outbound.rs @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{Router, messages::Message}; +use crate::{PeerPoolHandling, Router, messages::Message}; use snarkvm::prelude::Network; use std::net::SocketAddr; diff --git a/node/router/src/writing.rs b/node/router/src/writing.rs index 7323cd3887..95ac21c746 100644 --- a/node/router/src/writing.rs +++ b/node/router/src/writing.rs @@ -44,7 +44,7 @@ impl Router { return None; } // Resolve the listener IP to the (ambiguous) peer address. - let peer_addr = match self.resolve_to_ambiguous(&peer_ip) { + let peer_addr = match self.resolve_to_ambiguous(peer_ip) { Some(peer_addr) => peer_addr, None => { warn!("Unable to resolve the listener IP address '{peer_ip}'"); @@ -80,7 +80,7 @@ impl Router { /// Returns `true` if the message can be sent. fn can_send(&self, peer_ip: SocketAddr, message: &Message) -> bool { // Ensure the peer is connected before sending. - if !self.is_connected(&peer_ip) { + if !self.is_connected(peer_ip) { warn!("Attempted to send to a non-connected peer {peer_ip}"); return false; } diff --git a/node/router/tests/cleanups.rs b/node/router/tests/cleanups.rs index 03829c6c68..a16151678c 100644 --- a/node/router/tests/cleanups.rs +++ b/node/router/tests/cleanups.rs @@ -18,7 +18,7 @@ use common::*; use deadline::deadline; use peak_alloc::PeakAlloc; -use snarkos_node_router::{Outbound, Routing}; +use snarkos_node_router::{Outbound, PeerPoolHandling, Routing}; use snarkos_node_tcp::protocols::{Disconnect, Handshake, OnConnect}; use snarkvm::{prelude::Rng, utilities::TestRng}; diff --git a/node/router/tests/connect.rs b/node/router/tests/connect.rs index 639e4ef1f5..19600b40e4 100644 --- a/node/router/tests/connect.rs +++ b/node/router/tests/connect.rs @@ -16,6 +16,7 @@ mod common; use common::*; +use snarkos_node_router::PeerPoolHandling; use snarkos_node_tcp::{ P2P, protocols::{Disconnect, Handshake, OnConnect}, @@ -106,7 +107,7 @@ async fn test_connect_with_handshake() { // Await for node1 to be connected. let node0_ip = node0.local_ip(); let node1_ = node1.clone(); - deadline!(Duration::from_secs(5), move || { node1_.is_connected(&node0_ip) }); + deadline!(Duration::from_secs(5), move || { node1_.is_connected(node0_ip) }); print_tcp!(node0); print_tcp!(node1); @@ -127,7 +128,7 @@ async fn test_connect_with_handshake() { // Await for node1 to be connected. let node0_ip = node0.local_ip(); let node1_ = node1.clone(); - deadline!(Duration::from_secs(5), move || { node1_.is_connected(&node0_ip) }); + deadline!(Duration::from_secs(5), move || { node1_.is_connected(node0_ip) }); print_tcp!(node0); print_tcp!(node1); @@ -148,7 +149,7 @@ async fn test_connect_with_handshake() { // Await for node0 to be connected. let node1_ip = node1.local_ip(); let node0_ = node0.clone(); - deadline!(Duration::from_secs(5), move || { node0_.is_connected(&node1_ip) }); + deadline!(Duration::from_secs(5), move || { node0_.is_connected(node1_ip) }); print_tcp!(node0); print_tcp!(node1); @@ -192,7 +193,7 @@ async fn test_validator_connection() { // Await for node1 to be connected. let node0_ip = node0.local_ip(); let node1_ = node1.clone(); - deadline!(Duration::from_secs(5), move || { node1_.is_connected(&node0_ip) }); + deadline!(Duration::from_secs(5), move || { node1_.is_connected(node0_ip) }); print_tcp!(node0); print_tcp!(node1); @@ -213,7 +214,7 @@ async fn test_validator_connection() { let node1_ = node1.clone(); let node0_ = node0.clone(); deadline!(Duration::from_secs(5), move || { - !node1_.is_connected(&node0_.local_ip()) && !node0_.is_connected(&node1_.local_ip()) + !node1_.is_connected(node0_.local_ip()) && !node0_.is_connected(node1_.local_ip()) }); // Connect node1 to node0. diff --git a/node/router/tests/disconnect.rs b/node/router/tests/disconnect.rs index 856e3c98fa..16fd80bf47 100644 --- a/node/router/tests/disconnect.rs +++ b/node/router/tests/disconnect.rs @@ -16,6 +16,7 @@ mod common; use common::*; +use snarkos_node_router::PeerPoolHandling; use snarkos_node_tcp::{ P2P, protocols::{Handshake, OnConnect}, diff --git a/node/router/tests/heartbeat.rs b/node/router/tests/heartbeat.rs index c181c7c1c2..241c592449 100644 --- a/node/router/tests/heartbeat.rs +++ b/node/router/tests/heartbeat.rs @@ -19,6 +19,7 @@ use common::*; use snarkos_node_router::{ Heartbeat, Outbound, + PeerPoolHandling, Router, Routing, messages::{Message, MessageCodec}, @@ -84,7 +85,7 @@ async fn connect_to(router: &TestRouter, other: &TestRouter) { let success = router.connect(other.local_ip()).unwrap().await.unwrap(); assert!(success, "Connection failed"); - while !router.is_connected(&other.local_ip()) { + while !router.is_connected(other.local_ip()) { sleep(Duration::from_millis(10)).await; } } diff --git a/node/src/client/router.rs b/node/src/client/router.rs index 7a14c0dfcd..5ca2ae91ed 100644 --- a/node/src/client/router.rs +++ b/node/src/client/router.rs @@ -15,6 +15,7 @@ use super::*; use snarkos_node_router::{ + PeerPoolHandling, Routing, bootstrap_peers, messages::{ diff --git a/node/src/prover/mod.rs b/node/src/prover/mod.rs index 44fe46e827..70e279e4ba 100644 --- a/node/src/prover/mod.rs +++ b/node/src/prover/mod.rs @@ -22,6 +22,7 @@ use snarkos_node_router::{ Heartbeat, Inbound, Outbound, + PeerPoolHandling, Router, Routing, messages::{Message, NodeType, UnconfirmedSolution}, diff --git a/node/tests/disconnect.rs b/node/tests/disconnect.rs index 97469e8bce..322d1755b1 100644 --- a/node/tests/disconnect.rs +++ b/node/tests/disconnect.rs @@ -19,6 +19,7 @@ mod common; use common::{node::*, test_peer::TestPeer}; +use snarkos_node_router::PeerPoolHandling; use snarkos_node_tcp::P2P; use deadline::deadline; @@ -37,6 +38,7 @@ macro_rules! test_disconnect { async fn $peer_type() { use deadline::deadline; use pea2pea::Pea2Pea; + use snarkos_node_router::PeerPoolHandling; use snarkos_node_tcp::P2P; use std::time::Duration; diff --git a/node/tests/handshake.rs b/node/tests/handshake.rs index b3b8455f7e..d52930a1f7 100644 --- a/node/tests/handshake.rs +++ b/node/tests/handshake.rs @@ -20,6 +20,7 @@ mod common; use common::{node::*, test_peer::TestPeer}; use snarkos_node::{Client, Prover, Validator}; +use snarkos_node_router::PeerPoolHandling; use snarkos_node_tcp::{ConnectError, P2P}; use snarkvm::prelude::{MainnetV0 as CurrentNetwork, store::helpers::memory::ConsensusMemory}; diff --git a/node/tests/peering.rs b/node/tests/peering.rs index e2ffd20132..3048540a77 100644 --- a/node/tests/peering.rs +++ b/node/tests/peering.rs @@ -17,7 +17,10 @@ mod common; use common::test_peer::TestPeer; -use snarkos_node_router::messages::{Message, PeerResponse}; +use snarkos_node_router::{ + PeerPoolHandling, + messages::{Message, PeerResponse}, +}; use snarkos_node_tcp::P2P; use deadline::deadline;