Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions node/router/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl<N: Network> Router<N> {
// Finalize the connecting peer information.
self.connecting_peers.lock().insert(peer_ip, Some(Peer::new(peer_ip, peer_addr, &peer_request)));
// Adds a bidirectional map between the listener address and (ambiguous) peer address.
self.resolver.insert_peer(peer_ip, peer_addr);
self.resolver.write().insert_peer(peer_ip, peer_addr);

Ok((peer_ip, framed))
}
Expand Down Expand Up @@ -296,7 +296,7 @@ impl<N: Network> Router<N> {
// Finalize the connecting peer information.
self.connecting_peers.lock().insert(peer_ip, Some(Peer::new(peer_ip, peer_addr, &peer_request)));
// Adds a bidirectional map between the listener address and (ambiguous) peer address.
self.resolver.insert_peer(peer_ip, peer_addr);
self.resolver.write().insert_peer(peer_ip, peer_addr);

Ok((peer_ip, framed))
}
Expand Down
2 changes: 1 addition & 1 deletion node/router/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ mod peer;
pub use peer::*;

mod resolver;
pub use resolver::*;
pub(crate) use resolver::*;
43 changes: 15 additions & 28 deletions node/router/src/helpers/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,53 +13,40 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
use std::{collections::HashMap, net::SocketAddr};

#[derive(Debug)]
pub struct Resolver {
/// The `Resolver` provides the means to map the connected address (used in the lower-level
/// `tcp` module internals, and provided by the OS) to the listener address (used as the
/// unique peer identifier in the higher-level functions), and the other way around.
#[derive(Debug, Default)]
pub(crate) struct Resolver {
/// The map of the listener address to (ambiguous) peer address.
from_listener: RwLock<HashMap<SocketAddr, SocketAddr>>,
from_listener: HashMap<SocketAddr, SocketAddr>,
/// The map of the (ambiguous) peer address to listener address.
to_listener: RwLock<HashMap<SocketAddr, SocketAddr>>,
}

impl Default for Resolver {
/// Initializes a new instance of the resolver.
fn default() -> Self {
Self::new()
}
to_listener: HashMap<SocketAddr, SocketAddr>,
}

impl Resolver {
/// Initializes a new instance of the resolver.
pub fn new() -> Self {
Self { from_listener: Default::default(), to_listener: Default::default() }
}

/// Returns the listener address for the given (ambiguous) peer address, if it exists.
pub fn get_listener(&self, peer_addr: &SocketAddr) -> Option<SocketAddr> {
self.to_listener.read().get(peer_addr).copied()
self.to_listener.get(peer_addr).copied()
}

/// Returns the (ambiguous) peer address for the given listener address, if it exists.
pub fn get_ambiguous(&self, peer_ip: &SocketAddr) -> Option<SocketAddr> {
self.from_listener.read().get(peer_ip).copied()
self.from_listener.get(peer_ip).copied()
}

/// Inserts a bidirectional mapping of the listener address and the (ambiguous) peer address.
pub fn insert_peer(&self, listener_ip: SocketAddr, peer_addr: SocketAddr) {
self.from_listener.write().insert(listener_ip, peer_addr);
self.to_listener.write().insert(peer_addr, listener_ip);
pub fn insert_peer(&mut self, listener_ip: SocketAddr, peer_addr: SocketAddr) {
self.from_listener.insert(listener_ip, peer_addr);
self.to_listener.insert(peer_addr, listener_ip);
}

/// Removes the bidirectional mapping of the listener address and the (ambiguous) peer address.
pub fn remove_peer(&self, listener_ip: &SocketAddr) {
if let Some(peer_addr) = self.from_listener.write().remove(listener_ip) {
self.to_listener.write().remove(&peer_addr);
pub fn remove_peer(&mut self, listener_ip: &SocketAddr) {
if let Some(peer_addr) = self.from_listener.remove(listener_ip) {
self.to_listener.remove(&peer_addr);
}
}
}
45 changes: 27 additions & 18 deletions node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,18 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
}

/// Handles the inbound message from the peer.
async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<()> {
/// Handles the inbound message from the peer. The returned value indicates whether
/// the connection is still active, and errors causing a disconnect once they are
/// propagated to the caller.
async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<bool> {
// Retrieve the listener IP for the peer.
let peer_ip = match self.router().resolve_to_listener(&peer_addr) {
Some(peer_ip) => peer_ip,
None => bail!("Unable to resolve the (ambiguous) peer address '{peer_addr}'"),
None => {
// No longer connected to the peer.
trace!("Dropping a {} from {peer_addr} - no longer connected.", message.name());
return Ok(false);
}
};

// Drop the peer, if they have sent more than `MESSAGE_LIMIT` messages
Expand Down Expand Up @@ -115,7 +121,7 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {

let node = self.clone();
match spawn_blocking(move || node.block_request(peer_ip, message)).await? {
true => Ok(()),
true => Ok(true),
false => bail!("Peer '{peer_ip}' sent an invalid block request"),
}
}
Expand Down Expand Up @@ -146,7 +152,7 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
// Process the block response.
let node = self.clone();
match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? {
true => Ok(()),
true => Ok(true),
false => bail!("Peer '{peer_ip}' sent an invalid block response"),
}
}
Expand All @@ -155,10 +161,13 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
bail!("Peer '{peer_ip}' is not following the protocol")
}
Message::Disconnect(message) => {
bail!("{:?}", message.reason)
// The peer informs us that they had disconnected. Disconnect from them too.
debug!("Peer '{peer_ip}' decided to disconnect due to '{:?}'", message.reason);
self.router().disconnect(peer_ip);
Ok(false)
}
Message::PeerRequest(..) => match self.peer_request(peer_ip) {
true => Ok(()),
true => Ok(true),
false => bail!("Peer '{peer_ip}' sent an invalid peer request"),
},
Message::PeerResponse(message) => {
Expand All @@ -171,7 +180,7 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}

match self.peer_response(peer_ip, &message.peers) {
true => Ok(()),
true => Ok(true),
false => bail!("Peer '{peer_ip}' sent an invalid peer response"),
}
}
Expand Down Expand Up @@ -205,12 +214,12 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {

// Process the ping message.
match self.ping(peer_ip, message) {
true => Ok(()),
true => Ok(true),
false => bail!("Peer '{peer_ip}' sent an invalid ping"),
}
}
Message::Pong(message) => match self.pong(peer_ip, message) {
true => Ok(()),
true => Ok(true),
false => bail!("Peer '{peer_ip}' sent an invalid pong"),
},
Message::PuzzleRequest(..) => {
Expand All @@ -222,7 +231,7 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
// Process the puzzle request.
match self.puzzle_request(peer_ip) {
true => Ok(()),
true => Ok(true),
false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"),
}
}
Expand All @@ -241,23 +250,23 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
};
// Process the puzzle response.
match self.puzzle_response(peer_ip, message.epoch_hash, header) {
true => Ok(()),
true => Ok(true),
false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"),
}
}
Message::UnconfirmedSolution(message) => {
// Do not process unconfirmed solutions if the node is too far behind.
if !self.is_within_sync_leniency() {
trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id);
return Ok(());
return Ok(true);
}

// Update the timestamp for the unconfirmed solution.
let seen_before = self.router().cache.insert_inbound_solution(peer_ip, message.solution_id).is_some();
// Determine whether to propagate the solution.
if seen_before {
trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'");
return Ok(());
return Ok(true);
}
// Clone the serialized message.
let serialized = message.clone();
Expand All @@ -272,23 +281,23 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
// Handle the unconfirmed solution.
match self.unconfirmed_solution(peer_ip, serialized, solution).await {
true => Ok(()),
true => Ok(true),
false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"),
}
}
Message::UnconfirmedTransaction(message) => {
// Do not process unconfirmed solutions if the node is too far behind.
if !self.is_within_sync_leniency() {
trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id);
return Ok(());
return Ok(true);
}
// Update the timestamp for the unconfirmed transaction.
let seen_before =
self.router().cache.insert_inbound_transaction(peer_ip, message.transaction_id).is_some();
// Determine whether to propagate the transaction.
if seen_before {
trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'");
return Ok(());
return Ok(true);
}
// Clone the serialized message.
let serialized = message.clone();
Expand All @@ -303,7 +312,7 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
// Handle the unconfirmed transaction.
match self.unconfirmed_transaction(peer_ip, serialized, transaction).await {
true => Ok(()),
true => Ok(true),
false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"),
}
}
Expand Down
20 changes: 13 additions & 7 deletions node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub struct InnerRouter<N: Network> {
/// The cache.
cache: Cache<N>,
/// The resolver.
resolver: Resolver,
resolver: RwLock<Resolver>,
/// The set of trusted peers.
trusted_peers: HashSet<SocketAddr>,
/// The map of connected peer IPs to their peer handlers.
Expand Down Expand Up @@ -261,6 +261,12 @@ impl<N: Network> Router<N> {
}
disconnected
} else {
// FIXME (ljedrz): this shouldn't be necessary; it's a double-check
// that the higher-level collection is consistent with the resolver.
if router.is_connected(&peer_ip) {
warn!("Fallback connection artifact cleanup (report this to @ljedrz)");
router.remove_connected_peer(peer_ip);
}
false
}
})
Expand Down Expand Up @@ -337,12 +343,12 @@ impl<N: Network> Router<N> {

/// Returns the listener IP address from the (ambiguous) peer address.
pub fn resolve_to_listener(&self, peer_addr: &SocketAddr) -> Option<SocketAddr> {
self.resolver.get_listener(peer_addr)
self.resolver.read().get_listener(peer_addr)
}

/// Returns the (ambiguous) peer address from the listener IP address.
pub fn resolve_to_ambiguous(&self, peer_ip: &SocketAddr) -> Option<SocketAddr> {
self.resolver.get_ambiguous(peer_ip)
self.resolver.read().get_ambiguous(peer_ip)
}

/// Returns `true` if the node is connected to the given peer IP.
Expand Down Expand Up @@ -609,14 +615,14 @@ impl<N: Network> Router<N> {

/// Removes the connected peer and adds them to the candidate peers.
pub fn remove_connected_peer(&self, peer_ip: SocketAddr) {
// Removes the bidirectional map between the listener address and (ambiguous) peer address.
self.resolver.remove_peer(&peer_ip);
// Remove this peer from the connected peers, if it exists.
self.connected_peers.write().remove(&peer_ip);
// Add the peer to the candidate peers.
self.candidate_peers.write().insert(peer_ip);
// Removes the bidirectional map between the listener address and (ambiguous) peer address.
self.resolver.write().remove_peer(&peer_ip);
// Clear cached entries applicable to the peer.
self.cache.clear_peer_entries(peer_ip);
// Add the peer to the candidate peers.
self.candidate_peers.write().insert(peer_ip);
#[cfg(feature = "metrics")]
self.update_metrics();
}
Expand Down