Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 67 additions & 62 deletions node/router/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use snarkvm::{
use anyhow::{Result, bail};
use futures::SinkExt;
use rand::{Rng, rngs::OsRng};
use std::{collections::hash_map::Entry, io, net::SocketAddr};
use std::{collections::hash_map::Entry, io, mem, net::SocketAddr};
use tokio::net::TcpStream;
use tokio_stream::StreamExt;
use tokio_util::codec::Framed;
Expand Down Expand Up @@ -90,14 +90,14 @@ impl<N: Network> Router<N> {
peer_side: ConnectionSide,
genesis_header: Header<N>,
restrictions_id: Field<N>,
) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, MessageCodec<N>>)> {
) -> io::Result<ChallengeRequest<N>> {
// If this is an inbound connection, we log it, but don't know the listening address yet.
// Otherwise, we can immediately register the listening address.
let mut peer_ip = if peer_side == ConnectionSide::Initiator {
let mut listener_addr = if peer_side == ConnectionSide::Initiator {
debug!("Received a connection request from '{peer_addr}'");
None
} else {
debug!("Connecting to {peer_addr}...");
debug!("Connecting to '{peer_addr}'...");
Some(peer_addr)
};

Expand All @@ -121,17 +121,23 @@ impl<N: Network> Router<N> {
}
}

// Perform the handshake; we pass on a mutable reference to peer_ip in case the process is broken at any point in time.
// Perform the handshake; we pass on a mutable reference to listener_addr in case the process is broken at any point in time.
let handshake_result = if peer_side == ConnectionSide::Responder {
self.handshake_inner_initiator(peer_addr, &mut peer_ip, stream, genesis_header, restrictions_id).await
self.handshake_inner_initiator(peer_addr, stream, genesis_header, restrictions_id).await
} else {
self.handshake_inner_responder(peer_addr, &mut peer_ip, stream, genesis_header, restrictions_id).await
self.handshake_inner_responder(peer_addr, &mut listener_addr, stream, genesis_header, restrictions_id).await
};

if let Some(ip) = peer_ip {
if handshake_result.is_err() {
// Remove the address from the collection of connecting peers (if the handshake got to the point where it's known).
self.connecting_peers.lock().remove(&ip);
if let Some(addr) = listener_addr {
if let Ok(ref challenge_request) = handshake_result {
if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
peer.upgrade_to_connected(peer_addr, challenge_request, self.clone());
}
#[cfg(feature = "metrics")]
self.update_metrics();
debug!("Completed the handshake with '{peer_addr}'");
} else if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
peer.downgrade_to_candidate(addr, false);
}
}

Expand All @@ -142,13 +148,21 @@ impl<N: Network> Router<N> {
async fn handshake_inner_initiator<'a>(
&'a self,
peer_addr: SocketAddr,
peer_ip: &mut Option<SocketAddr>,
stream: &'a mut TcpStream,
genesis_header: Header<N>,
restrictions_id: Field<N>,
) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, MessageCodec<N>>)> {
// This value is immediately guaranteed to be present, so it can be unwrapped.
let peer_ip = peer_ip.unwrap();
) -> io::Result<ChallengeRequest<N>> {
match self.peer_pool.write().entry(peer_addr) {
Entry::Vacant(entry) => {
entry.insert(Peer::Connecting);
}
Entry::Occupied(mut entry) if matches!(entry.get(), Peer::Candidate(_)) => {
entry.insert(Peer::Connecting);
}
Entry::Occupied(_) => {
return Err(error(format!("Duplicate connection attempt with '{peer_addr}'")));
}
}
// Construct the stream.
let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());

Expand Down Expand Up @@ -191,6 +205,7 @@ impl<N: Network> Router<N> {
send(&mut framed, peer_addr, reason.into()).await?;
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
}

/* Step 3: Send the challenge response. */

let response_nonce: u64 = rng.r#gen();
Expand All @@ -208,23 +223,18 @@ impl<N: Network> Router<N> {
};
send(&mut framed, peer_addr, Message::ChallengeResponse(our_response)).await?;

// Finalize the connecting peer information.
self.connecting_peers.lock().insert(peer_ip, Some(Peer::new(peer_ip, peer_addr, &peer_request)));
// Adds a bidirectional map between the listener address and (ambiguous) peer address.
self.resolver.write().insert_peer(peer_ip, peer_addr);

Ok((peer_ip, framed))
Ok(peer_request)
}

/// The connection responder side of the handshake.
async fn handshake_inner_responder<'a>(
&'a self,
peer_addr: SocketAddr,
peer_ip: &mut Option<SocketAddr>,
listener_addr: &mut Option<SocketAddr>,
stream: &'a mut TcpStream,
genesis_header: Header<N>,
restrictions_id: Field<N>,
) -> io::Result<(SocketAddr, Framed<&'a mut TcpStream, MessageCodec<N>>)> {
) -> io::Result<ChallengeRequest<N>> {
// Construct the stream.
let mut framed = Framed::new(stream, MessageCodec::<N>::handshake());

Expand All @@ -234,18 +244,19 @@ impl<N: Network> Router<N> {
let peer_request = expect_message!(Message::ChallengeRequest, framed, peer_addr);

// Obtain the peer's listening address.
*peer_ip = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
let peer_ip = peer_ip.unwrap();
*listener_addr = Some(SocketAddr::new(peer_addr.ip(), peer_request.listener_port));
let listener_addr = listener_addr.unwrap();

// Knowing the peer's listening address, ensure it is allowed to connect.
if let Err(forbidden_message) = self.ensure_peer_is_allowed(peer_ip) {
if let Err(forbidden_message) = self.ensure_peer_is_allowed(listener_addr) {
return Err(error(format!("{forbidden_message}")));
}
// Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
send(&mut framed, peer_addr, reason.into()).await?;
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
}

/* Step 2: Send the challenge response followed by own challenge request. */

// Initialize an RNG.
Expand Down Expand Up @@ -293,50 +304,44 @@ impl<N: Network> Router<N> {
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
}

// Finalize the connecting peer information.
self.connecting_peers.lock().insert(peer_ip, Some(Peer::new(peer_ip, peer_addr, &peer_request)));
// Adds a bidirectional map between the listener address and (ambiguous) peer address.
self.resolver.write().insert_peer(peer_ip, peer_addr);

Ok((peer_ip, framed))
Ok(peer_request)
}

/// Ensure the peer is allowed to connect.
fn ensure_peer_is_allowed(&self, peer_ip: SocketAddr) -> Result<()> {
fn ensure_peer_is_allowed(&self, listener_addr: SocketAddr) -> Result<()> {
// Ensure the peer IP is not this node.
if self.is_local_ip(&peer_ip) {
bail!("Dropping connection request from '{peer_ip}' (attempted to self-connect)")
}
// Ensure the node is not already connecting to this peer.
match self.connecting_peers.lock().entry(peer_ip) {
Entry::Vacant(entry) => entry.insert(None),
Entry::Occupied(_) => {
bail!("Dropping connection request from '{peer_ip}' (already shaking hands as the initiator)")
}
};
// Ensure the node is not already connected to this peer.
if self.is_connected(&peer_ip) {
bail!("Dropping connection request from '{peer_ip}' (already connected)")
if self.is_local_ip(&listener_addr) {
bail!("Dropping connection request from '{listener_addr}' (attempted to self-connect)")
}
// Ensure either the peer is trusted or `allow_external_peers` is true.
if !self.allow_external_peers() && !self.is_trusted(&peer_ip) {
bail!("Dropping connection request from '{peer_ip}' (untrusted)")
if !self.allow_external_peers() && !self.is_trusted(&listener_addr) {
bail!("Dropping connection request from '{listener_addr}' (untrusted)")
}
// Ensure the peer is not restricted.
if self.is_restricted(&peer_ip) {
bail!("Dropping connection request from '{peer_ip}' (restricted)")
}
// Ensure the peer is not spamming connection attempts.
if !peer_ip.ip().is_loopback() {
// Add this connection attempt and retrieve the number of attempts.
let num_attempts = self.cache.insert_inbound_connection(peer_ip.ip(), Self::RADIO_SILENCE_IN_SECS as i64);
// Ensure the connecting peer has not surpassed the connection attempt limit.
if num_attempts > Self::MAXIMUM_CONNECTION_FAILURES {
// Restrict the peer.
self.insert_restricted_peer(peer_ip);
bail!("Dropping connection request from '{peer_ip}' (tried {num_attempts} times)")
// Ensure the node is not already connecting to this peer.
match self.peer_pool.write().entry(listener_addr) {
Entry::Vacant(entry) => {
entry.insert(Peer::Connecting);
}
}
Entry::Occupied(mut entry) => match entry.get_mut() {
Peer::Candidate(peer)
if peer
.restricted
.map(|ts| ts.elapsed().as_secs() < Self::RADIO_SILENCE_IN_SECS)
.unwrap_or(false) =>
{
bail!("Dropping connection request from '{listener_addr}' (restricted)");
}
peer @ Peer::Candidate(_) => {
let _ = mem::replace(peer, Peer::Connecting);
}
Peer::Connecting => {
bail!("Dropping connection request from '{listener_addr}' (already connecting)");
}
Peer::Connected(_) => {
bail!("Dropping connection request from '{listener_addr}' (already connected)");
}
},
};
Ok(())
}

Expand Down
34 changes: 18 additions & 16 deletions node/router/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
// limitations under the License.

use crate::{
ConnectedPeer,
Outbound,
Peer,
Router,
messages::{DisconnectReason, Message, PeerRequest},
};
Expand Down Expand Up @@ -99,11 +99,11 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// Check if any connected peer is stale.
for peer in self.router().get_connected_peers() {
// Disconnect if the peer has not communicated back within the predefined time.
let elapsed = peer.last_seen().elapsed().as_secs();
let elapsed = peer.last_seen.elapsed().as_secs();
if elapsed > Router::<N>::RADIO_SILENCE_IN_SECS {
warn!("Peer {} has not communicated in {elapsed} seconds", peer.ip());
warn!("Peer {} has not communicated in {elapsed} seconds", peer.listener_addr);
// Disconnect from this peer.
self.router().disconnect(peer.ip());
self.router().disconnect(peer.listener_addr);
}
}
}
Expand All @@ -116,7 +116,7 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
/// - Peers that we are currently syncing with are not removable.
/// - Validators are considered higher priority than provers or clients.
/// - Connections that have not been seen in a while are considered lower priority.
fn get_removable_peers(&self) -> Vec<Peer<N>> {
fn get_removable_peers(&self) -> Vec<ConnectedPeer<N>> {
// The trusted peers (specified at runtime).
let trusted = self.router().trusted_peers();
// The hardcoded bootstrap nodes.
Expand All @@ -129,16 +129,18 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// Note, that this gives equal priority to clients and provers, which
// we might want to change in the future.
let mut peers = self.router().get_connected_peers();
peers.sort_by_key(|peer| (peer.is_validator(), peer.last_seen()));
peers.sort_by_key(|peer| (peer.node_type.is_validator(), peer.last_seen));

// Determine which of the peers can be removed.
peers
.into_iter()
.filter(|peer| {
!trusted.contains(&peer.ip()) // Always keep trusted nodes.
&& !bootstrap.contains(&peer.ip()) // Always keep bootstrap nodes.
&& !self.router().cache.contains_inbound_block_request(&peer.ip()) // This peer is currently syncing from us.
&& (is_block_synced || self.router().cache.num_outbound_block_requests(&peer.ip()) == 0) // We are currently syncing from this peer.
let addr = peer.listener_addr;

!trusted.contains(&addr) // Always keep trusted nodes.
&& !bootstrap.contains(&addr) // Always keep bootstrap nodes.
&& !self.router().cache.contains_inbound_block_request(&addr) // This peer is currently syncing from us.
&& (is_block_synced || self.router().cache.num_outbound_block_requests(&addr) == 0) // We are currently syncing from this peer.
})
.collect()
}
Expand All @@ -160,7 +162,7 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// Disconnect from the oldest connected peer, which is the first entry in the list
// of removable peers.
// Do nothing, if the list is empty.
if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.ip()) {
if let Some(oldest) = self.get_removable_peers().first().map(|peer| peer.listener_addr) {
info!("Disconnecting from '{oldest}' (periodic refresh of peers)");
let _ = self.router().send(oldest, Message::Disconnect(DisconnectReason::PeerRefresh.into()));
self.router().disconnect(oldest);
Expand Down Expand Up @@ -217,16 +219,16 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
let peers_to_disconnect = self
.get_removable_peers()
.into_iter()
.filter(|peer| !peer.is_prover()) // remove provers as those are handled separately
.map(|p| p.ip())
.filter(|peer| !peer.node_type.is_prover()) // remove provers as those are handled separately
.map(|peer| peer.listener_addr)
.take(num_surplus_clients_validators);

// Proceed to send disconnect requests to these peers.
for peer_ip in peers_to_disconnect.chain(provers_to_disconnect) {
// TODO (howardwu): Remove this after specializing this function.
if self.router().node_type().is_prover() {
if let Some(peer) = self.router().get_connected_peer(&peer_ip) {
if peer.node_type().is_validator() {
if peer.node_type.is_validator() {
continue;
}
}
Expand Down Expand Up @@ -313,8 +315,8 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
.trusted_peers()
.iter()
.filter_map(|peer_ip| {
// If the peer is not connected, attempt to connect to it.
if self.router().is_connected(peer_ip) {
// If the peer is not already connecting or connected, attempt to connect to it.
if self.router().is_connecting(peer_ip) || self.router().is_connected(peer_ip) {
None
} else {
debug!("Attempting to (re-)connect to trusted peer `{peer_ip}`");
Expand Down
Loading