Skip to content
26 changes: 9 additions & 17 deletions node/router/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl<N: Network> Router<N> {
self.update_metrics();
debug!("Completed the handshake with '{peer_addr}'");
} else if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
peer.downgrade_to_candidate(addr, false);
peer.downgrade_to_candidate(addr);
}
}

Expand All @@ -154,10 +154,10 @@ impl<N: Network> Router<N> {
) -> io::Result<ChallengeRequest<N>> {
match self.peer_pool.write().entry(peer_addr) {
Entry::Vacant(entry) => {
entry.insert(Peer::Connecting);
entry.insert(Peer::new_connecting(false));
}
Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => {
entry.insert(Peer::Connecting);
entry.insert(Peer::new_connecting(entry.get().is_trusted()));
}
Entry::Occupied(_) => {
return Err(error(format!("Duplicate connection attempt with '{peer_addr}'")));
Expand Down Expand Up @@ -309,32 +309,24 @@ impl<N: Network> Router<N> {

/// Ensure the peer is allowed to connect.
fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
// Ensure the peer IP is not this node.
// Ensure that it's not a self-connect attempt.
if self.is_local_ip(&listener_addr) {
bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)")
}
// Ensure either the peer is trusted or `allow_external_peers` is true.
// Unknown peers are untrusted, so check if `allow_external_peers` is true.
if !self.allow_external_peers() && !self.is_trusted(&listener_addr) {
bail!("Dropping connection request from '{listener_addr}' (untrusted)")
}
// Ensure the node is not already connecting to this peer.

match self.peer_pool.write().entry(listener_addr) {
Entry::Vacant(entry) => {
entry.insert(Peer::Connecting);
entry.insert(Peer::new_connecting(false));
}
Entry::Occupied(mut entry) => match entry.get_mut() {
Peer::Candidate(peer)
if peer
.restricted
.map(|ts| ts.elapsed().as_secs() < Self::RADIO_SILENCE_IN_SECS)
.unwrap_or(false) =>
{
bail!("Dropping connection request from '{listener_addr}' (restricted)");
}
peer @ Peer::Candidate(_) => {
let _ = mem::replace(peer, Peer::Connecting);
let _ = mem::replace(peer, Peer::new_connecting(peer.is_trusted()));
}
Peer::Connecting => {
Peer::Connecting(_) => {
bail!("Dropping connection request from '{listener_addr}' (already connecting)");
}
Peer::Connected(_) => {
Expand Down
83 changes: 33 additions & 50 deletions node/router/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// Check if any connected peer is stale.
for peer in self.router().get_connected_peers() {
// Disconnect if the peer has not communicated back within the predefined time.
let elapsed = peer.last_seen.elapsed().as_secs();
if elapsed > Router::<N>::RADIO_SILENCE_IN_SECS {
warn!("Peer {} has not communicated in {elapsed} seconds", peer.listener_addr);
let elapsed = peer.last_seen.elapsed();
if elapsed > Router::<N>::MAX_RADIO_SILENCE {
warn!("Peer {} has not communicated in {elapsed:?}", peer.listener_addr);
// Disconnect from this peer.
self.router().disconnect(peer.listener_addr);
}
Expand All @@ -117,8 +117,6 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
/// - Validators are considered higher priority than provers or clients.
/// - Connections that have not been seen in a while are considered lower priority.
fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
// The trusted peers (specified at runtime).
let trusted = self.router().trusted_peers();
// The hardcoded bootstrap nodes.
let bootstrap = self.router().bootstrap_peers();
// Are we synced already? (cache this here, so it does not need to be recomputed)
Expand All @@ -128,34 +126,28 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// of the vector.
// Note, that this gives equal priority to clients and provers, which
// we might want to change in the future.
let mut peers = self.router().get_connected_peers();
let mut peers = self.router().filter_connected_peers(|peer| {
!peer.trusted
&& !bootstrap.contains(&peer.listener_addr)
&& !self.router().cache.contains_inbound_block_request(&peer.listener_addr) // This peer is currently syncing from us.
&& (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.listener_addr) == 0) // We are currently syncing from this peer.
});
peers.sort_by_key(|peer| (peer.node_type.is_validator(), peer.last_seen));

// Determine which of the peers can be removed.
peers
.into_iter()
.filter(|peer| {
let addr = peer.listener_addr;

!trusted.contains(&addr) // Always keep trusted nodes.
&& !bootstrap.contains(&addr) // Always keep bootstrap nodes.
&& !self.router().cache.contains_inbound_block_request(&addr) // This peer is currently syncing from us.
&& (is_block_synced || self.router().cache.num_outbound_block_requests(&addr) == 0) // We are currently syncing from this peer.
})
.collect()
}

/// This function removes the peer that we have not heard from the longest,
/// to keep the connections fresh.
/// It only triggers if the router is above the minimum number of connected peers.
fn remove_oldest_connected_peer(&self) {
// Skip if the router is at or below the minimum number of connected peers.
if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
// Skip if the node is not requesting peers.
if !self.router().allow_external_peers() {
return;
}

// Skip if the node is not requesting peers.
if !self.router().allow_external_peers() {
// Skip if the router is at or below the minimum number of connected peers.
if self.router().number_of_connected_peers() <= Self::MINIMUM_NUMBER_OF_PEERS {
return;
}

Expand All @@ -177,7 +169,7 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// Obtain the number of connected peers.
let num_connected = self.router().number_of_connected_peers();
// Obtain the number of connected provers.
let num_connected_provers = self.router().number_of_connected_provers();
let num_connected_provers = self.router().filter_connected_peers(|peer| peer.node_type.is_prover()).len();

// Consider rotating more external peers every ~10 heartbeats.
let reduce_peers = self.router().rotate_external_peers() && rng.gen_range(0..10) == 0;
Expand All @@ -202,42 +194,37 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
"Exceeded maximum number of connected peers, disconnecting from ({num_surplus_provers} + {num_surplus_clients_validators}) peers"
);

// Retrieve the trusted peers.
let trusted = self.router().trusted_peers();
// Retrieve the bootstrap peers.
let bootstrap = self.router().bootstrap_peers();

// Determine the provers to disconnect from.
let provers_to_disconnect = self
.router()
.connected_provers()
.filter_connected_peers(|peer| {
peer.node_type.is_prover() && !peer.trusted && !bootstrap.contains(&peer.listener_addr)
})
.into_iter()
.filter(|peer_ip| !trusted.contains(peer_ip) && !bootstrap.contains(peer_ip))
.choose_multiple(rng, num_surplus_provers);

// Determine the clients and validators to disconnect from.
let peers_to_disconnect = self
.get_removable_peers()
.into_iter()
.filter(|peer| !peer.node_type.is_prover()) // remove provers as those are handled separately
.map(|peer| peer.listener_addr)
.take(num_surplus_clients_validators);

// Proceed to send disconnect requests to these peers.
for peer_ip in peers_to_disconnect.chain(provers_to_disconnect) {
for peer in peers_to_disconnect.chain(provers_to_disconnect) {
// TODO (howardwu): Remove this after specializing this function.
if self.router().node_type().is_prover() {
if let Some(peer) = self.router().get_connected_peer(&peer_ip) {
if peer.node_type.is_validator() {
continue;
}
}
if self.router().node_type().is_prover() && peer.node_type.is_validator() {
continue;
}

info!("Disconnecting from '{peer_ip}' (exceeded maximum connections)");
self.router().send(peer_ip, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
let peer_addr = peer.listener_addr;
info!("Disconnecting from '{peer_addr}' (exceeded maximum connections)");
self.router().send(peer_addr, Message::Disconnect(DisconnectReason::TooManyPeers.into()));
// Disconnect from this peer.
self.router().disconnect(peer_ip);
self.router().disconnect(peer_addr);
}
}

Expand Down Expand Up @@ -269,8 +256,9 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// Split the bootstrap peers into connected and candidate lists.
let mut connected_bootstrap = Vec::new();
let mut candidate_bootstrap = Vec::new();
let connected_peers = self.router().connected_peers();
for bootstrap_ip in self.router().bootstrap_peers() {
match self.router().is_connected(&bootstrap_ip) {
match connected_peers.contains(&bootstrap_ip) {
true => connected_bootstrap.push(bootstrap_ip),
false => candidate_bootstrap.push(bootstrap_ip),
}
Expand Down Expand Up @@ -312,20 +300,15 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// Ensure that the trusted nodes are connected.
let handles: Vec<_> = self
.router()
.trusted_peers()
.unconnected_trusted_peers()
.iter()
.filter_map(|peer_ip| {
// If the peer is not already connecting or connected, attempt to connect to it.
if self.router().is_connecting(peer_ip) || self.router().is_connected(peer_ip) {
None
} else {
debug!("Attempting to (re-)connect to trusted peer `{peer_ip}`");
let hdl = self.router().connect(*peer_ip);
if hdl.is_none() {
warn!("Could not initiate connection to trusted peer at `{peer_ip}`");
}
hdl
.filter_map(|listener_addr| {
debug!("Attempting to (re-)connect to trusted peer `{listener_addr}`");
let hdl = self.router().connect(*listener_addr);
if hdl.is_none() {
warn!("Could not initiate connection to trusted peer at `{listener_addr}`");
}
hdl
})
.collect();

Expand Down
57 changes: 32 additions & 25 deletions node/router/src/helpers/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum Peer<N: Network> {
/// A candidate peer that's currently not connected to.
Candidate(CandidatePeer),
/// A peer that's currently being connected to (the handshake is in progress).
Connecting,
Connecting(ConnectingPeer),
/// A fully connected (post-handshake) peer.
Connected(ConnectedPeer<N>),
}
Expand All @@ -33,17 +33,15 @@ pub enum Peer<N: Network> {
pub struct CandidatePeer {
/// The listening address of a candidate peer.
pub listener_addr: SocketAddr,
/// A restricted peer is one that has recently exceeded the `MAXIMUM_CONNECTION_FAILURES`,
/// and is ineligible for connection attempts for `RADIO_SILENCE_IN_SECS`.
// TODO: consider removing in favor of the ban feature for simplicity.
pub restricted: Option<Instant>,
/// Indicates whether the peer is considered trusted.
pub trusted: bool,
}

impl CandidatePeer {
/// Restrict a candidate peer with the current timestamp.
pub fn restrict(&mut self) {
self.restricted = Some(Instant::now());
}
/// A connecting peer.
#[derive(Clone)]
pub struct ConnectingPeer {
/// Indicates whether the peer is considered trusted.
pub trusted: bool,
}

/// A fully connected peer.
Expand All @@ -53,6 +51,8 @@ pub struct ConnectedPeer<N: Network> {
pub listener_addr: SocketAddr,
/// The connected address of the peer.
pub connected_addr: SocketAddr,
/// Indicates whether the peer is considered trusted.
pub trusted: bool,
/// The Aleo address of the peer.
pub aleo_addr: Address<N>,
/// The node type of the peer.
Expand All @@ -69,14 +69,19 @@ pub struct ConnectedPeer<N: Network> {

impl<N: Network> Peer<N> {
/// Create a candidate peer.
pub const fn new_candidate(listener_addr: SocketAddr) -> Self {
Self::Candidate(CandidatePeer { listener_addr, restricted: None })
pub const fn new_candidate(listener_addr: SocketAddr, trusted: bool) -> Self {
Self::Candidate(CandidatePeer { listener_addr, trusted })
}

/// Create a connecting peer.
pub const fn new_connecting(trusted: bool) -> Self {
Self::Connecting(ConnectingPeer { trusted })
}

/// Promote a connecting peer to a fully connected one.
pub fn upgrade_to_connected(&mut self, connected_addr: SocketAddr, cr: &ChallengeRequest<N>, router: Router<N>) {
// Logic check: this can only happen during the handshake.
assert!(matches!(self, Self::Connecting));
assert!(matches!(self, Self::Connecting(_)));

let timestamp = Instant::now();
let listener_addr = SocketAddr::from((connected_addr.ip(), cr.listener_port));
Expand All @@ -89,6 +94,7 @@ impl<N: Network> Peer<N> {
connected_addr,
aleo_addr: cr.address,
node_type: cr.node_type,
trusted: self.is_trusted(),
version: cr.version,
first_seen: timestamp,
last_seen: timestamp,
Expand All @@ -97,47 +103,48 @@ impl<N: Network> Peer<N> {
}

/// Demote a peer to candidate status, marking it as disconnected.
pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr, restrict: bool) {
pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr) {
// Connecting peers are not in the resolver.
if let Self::Connected(peer) = self {
// Remove the peer from the resolver.
peer.router.resolver.write().remove_peer(&peer.connected_addr);
};

let restricted = if restrict { Some(Instant::now()) } else { None };
*self = Self::Candidate(CandidatePeer { listener_addr, restricted });
*self = Self::Candidate(CandidatePeer { listener_addr, trusted: self.is_trusted() });
}

/// Returns the type of the node (only applicable to connected peers).
pub fn node_type(&self) -> Option<NodeType> {
match self {
Self::Candidate(_) => None,
Self::Connecting => None,
Self::Connecting(_) => None,
Self::Connected(peer) => Some(peer.node_type),
}
}

/// Returns `true` if the peer is currently undergoing the network handshake.
pub fn is_connecting(&self) -> bool {
matches!(self, Peer::Connecting)
matches!(self, Peer::Connecting(_))
}

/// Returns `true` if the peer has concluded the network handshake.
pub fn is_connected(&self) -> bool {
matches!(self, Peer::Connected(_))
}

/// Returns `true` if the peer is considered trusted.
pub fn is_trusted(&self) -> bool {
match self {
Self::Candidate(peer) => peer.trusted,
Self::Connecting(peer) => peer.trusted,
Self::Connected(peer) => peer.trusted,
}
}

/// Updates the peer's `last_seen` timestamp.
pub fn update_last_seen(&mut self) {
if let Self::Connected(ConnectedPeer { last_seen, .. }) = self {
*last_seen = Instant::now();
}
}

/// Updates the peer's version.
pub fn update_version(&mut self, new_version: u32) {
if let Self::Connected(ConnectedPeer { version, .. }) = self {
*version = new_version;
}
}
}
10 changes: 0 additions & 10 deletions node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

use crate::{
Outbound,
Peer,
messages::{
BlockRequest,
BlockResponse,
Expand Down Expand Up @@ -200,15 +199,6 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
bail!("Peer '{peer_ip}' is a prover or client, but block locators were provided");
}

// Update the connected peer.
if let Err(error) =
self.router().update_connected_peer(peer_ip, message.node_type, |conn: &mut Peer<N>| {
conn.update_version(message.version);
})
{
bail!("[Ping] {error}");
}

// Process the ping message.
match self.ping(peer_ip, message) {
true => Ok(true),
Expand Down
Loading