diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index bdcb542711..5d2fe9136b 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -211,7 +211,7 @@ impl Router { // Finalize the connecting peer information. self.connecting_peers.lock().insert(peer_ip, Some(Peer::new(peer_ip, peer_addr, &peer_request))); // Adds a bidirectional map between the listener address and (ambiguous) peer address. - self.resolver.insert_peer(peer_ip, peer_addr); + self.resolver.write().insert_peer(peer_ip, peer_addr); Ok((peer_ip, framed)) } @@ -296,7 +296,7 @@ impl Router { // Finalize the connecting peer information. self.connecting_peers.lock().insert(peer_ip, Some(Peer::new(peer_ip, peer_addr, &peer_request))); // Adds a bidirectional map between the listener address and (ambiguous) peer address. - self.resolver.insert_peer(peer_ip, peer_addr); + self.resolver.write().insert_peer(peer_ip, peer_addr); Ok((peer_ip, framed)) } diff --git a/node/router/src/helpers/mod.rs b/node/router/src/helpers/mod.rs index 607f9a0519..915d3212de 100644 --- a/node/router/src/helpers/mod.rs +++ b/node/router/src/helpers/mod.rs @@ -20,4 +20,4 @@ mod peer; pub use peer::*; mod resolver; -pub use resolver::*; +pub(crate) use resolver::*; diff --git a/node/router/src/helpers/resolver.rs b/node/router/src/helpers/resolver.rs index 8f9aad3f23..d6057a8f7f 100644 --- a/node/router/src/helpers/resolver.rs +++ b/node/router/src/helpers/resolver.rs @@ -13,53 +13,40 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[cfg(feature = "locktick")] -use locktick::parking_lot::RwLock; -#[cfg(not(feature = "locktick"))] -use parking_lot::RwLock; use std::{collections::HashMap, net::SocketAddr}; -#[derive(Debug)] -pub struct Resolver { +/// The `Resolver` provides the means to map the connected address (used in the lower-level +/// `tcp` module internals, and provided by the OS) to the listener address (used as the +/// unique peer identifier in the higher-level functions), and the other way around. +#[derive(Debug, Default)] +pub(crate) struct Resolver { /// The map of the listener address to (ambiguous) peer address. - from_listener: RwLock>, + from_listener: HashMap, /// The map of the (ambiguous) peer address to listener address. - to_listener: RwLock>, -} - -impl Default for Resolver { - /// Initializes a new instance of the resolver. - fn default() -> Self { - Self::new() - } + to_listener: HashMap, } impl Resolver { - /// Initializes a new instance of the resolver. - pub fn new() -> Self { - Self { from_listener: Default::default(), to_listener: Default::default() } - } - /// Returns the listener address for the given (ambiguous) peer address, if it exists. pub fn get_listener(&self, peer_addr: &SocketAddr) -> Option { - self.to_listener.read().get(peer_addr).copied() + self.to_listener.get(peer_addr).copied() } /// Returns the (ambiguous) peer address for the given listener address, if it exists. pub fn get_ambiguous(&self, peer_ip: &SocketAddr) -> Option { - self.from_listener.read().get(peer_ip).copied() + self.from_listener.get(peer_ip).copied() } /// Inserts a bidirectional mapping of the listener address and the (ambiguous) peer address. - pub fn insert_peer(&self, listener_ip: SocketAddr, peer_addr: SocketAddr) { - self.from_listener.write().insert(listener_ip, peer_addr); - self.to_listener.write().insert(peer_addr, listener_ip); + pub fn insert_peer(&mut self, listener_ip: SocketAddr, peer_addr: SocketAddr) { + self.from_listener.insert(listener_ip, peer_addr); + self.to_listener.insert(peer_addr, listener_ip); } /// Removes the bidirectional mapping of the listener address and the (ambiguous) peer address. - pub fn remove_peer(&self, listener_ip: &SocketAddr) { - if let Some(peer_addr) = self.from_listener.write().remove(listener_ip) { - self.to_listener.write().remove(&peer_addr); + pub fn remove_peer(&mut self, listener_ip: &SocketAddr) { + if let Some(peer_addr) = self.from_listener.remove(listener_ip) { + self.to_listener.remove(&peer_addr); } } } diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 1433928eb2..0f9fae8ea9 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -73,12 +73,18 @@ pub trait Inbound: Reading + Outbound { } } - /// Handles the inbound message from the peer. - async fn inbound(&self, peer_addr: SocketAddr, message: Message) -> Result<()> { + /// Handles the inbound message from the peer. The returned value indicates whether + /// the connection is still active, and errors causing a disconnect once they are + /// propagated to the caller. + async fn inbound(&self, peer_addr: SocketAddr, message: Message) -> Result { // Retrieve the listener IP for the peer. let peer_ip = match self.router().resolve_to_listener(&peer_addr) { Some(peer_ip) => peer_ip, - None => bail!("Unable to resolve the (ambiguous) peer address '{peer_addr}'"), + None => { + // No longer connected to the peer. + trace!("Dropping a {} from {peer_addr} - no longer connected.", message.name()); + return Ok(false); + } }; // Drop the peer, if they have sent more than `MESSAGE_LIMIT` messages @@ -115,7 +121,7 @@ pub trait Inbound: Reading + Outbound { let node = self.clone(); match spawn_blocking(move || node.block_request(peer_ip, message)).await? { - true => Ok(()), + true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid block request"), } } @@ -146,7 +152,7 @@ pub trait Inbound: Reading + Outbound { // Process the block response. let node = self.clone(); match spawn_blocking(move || node.block_response(peer_ip, blocks.0)).await? { - true => Ok(()), + true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid block response"), } } @@ -155,10 +161,13 @@ pub trait Inbound: Reading + Outbound { bail!("Peer '{peer_ip}' is not following the protocol") } Message::Disconnect(message) => { - bail!("{:?}", message.reason) + // The peer informs us that they had disconnected. Disconnect from them too. + debug!("Peer '{peer_ip}' decided to disconnect due to '{:?}'", message.reason); + self.router().disconnect(peer_ip); + Ok(false) } Message::PeerRequest(..) => match self.peer_request(peer_ip) { - true => Ok(()), + true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid peer request"), }, Message::PeerResponse(message) => { @@ -171,7 +180,7 @@ pub trait Inbound: Reading + Outbound { } match self.peer_response(peer_ip, &message.peers) { - true => Ok(()), + true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid peer response"), } } @@ -205,12 +214,12 @@ pub trait Inbound: Reading + Outbound { // Process the ping message. match self.ping(peer_ip, message) { - true => Ok(()), + true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid ping"), } } Message::Pong(message) => match self.pong(peer_ip, message) { - true => Ok(()), + true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid pong"), }, Message::PuzzleRequest(..) => { @@ -222,7 +231,7 @@ pub trait Inbound: Reading + Outbound { } // Process the puzzle request. match self.puzzle_request(peer_ip) { - true => Ok(()), + true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid puzzle request"), } } @@ -241,7 +250,7 @@ pub trait Inbound: Reading + Outbound { }; // Process the puzzle response. match self.puzzle_response(peer_ip, message.epoch_hash, header) { - true => Ok(()), + true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid puzzle response"), } } @@ -249,7 +258,7 @@ pub trait Inbound: Reading + Outbound { // Do not process unconfirmed solutions if the node is too far behind. if !self.is_within_sync_leniency() { trace!("Skipped processing unconfirmed solution '{}' (node is syncing)", message.solution_id); - return Ok(()); + return Ok(true); } // Update the timestamp for the unconfirmed solution. @@ -257,7 +266,7 @@ pub trait Inbound: Reading + Outbound { // Determine whether to propagate the solution. if seen_before { trace!("Skipping 'UnconfirmedSolution' from '{peer_ip}'"); - return Ok(()); + return Ok(true); } // Clone the serialized message. let serialized = message.clone(); @@ -272,7 +281,7 @@ pub trait Inbound: Reading + Outbound { } // Handle the unconfirmed solution. match self.unconfirmed_solution(peer_ip, serialized, solution).await { - true => Ok(()), + true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed solution"), } } @@ -280,7 +289,7 @@ pub trait Inbound: Reading + Outbound { // Do not process unconfirmed solutions if the node is too far behind. if !self.is_within_sync_leniency() { trace!("Skipped processing unconfirmed transaction '{}' (node is syncing)", message.transaction_id); - return Ok(()); + return Ok(true); } // Update the timestamp for the unconfirmed transaction. let seen_before = @@ -288,7 +297,7 @@ pub trait Inbound: Reading + Outbound { // Determine whether to propagate the transaction. if seen_before { trace!("Skipping 'UnconfirmedTransaction' from '{peer_ip}'"); - return Ok(()); + return Ok(true); } // Clone the serialized message. let serialized = message.clone(); @@ -303,7 +312,7 @@ pub trait Inbound: Reading + Outbound { } // Handle the unconfirmed transaction. match self.unconfirmed_transaction(peer_ip, serialized, transaction).await { - true => Ok(()), + true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid unconfirmed transaction"), } } diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 2dcd7ba6be..a715bf781a 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -96,7 +96,7 @@ pub struct InnerRouter { /// The cache. cache: Cache, /// The resolver. - resolver: Resolver, + resolver: RwLock, /// The set of trusted peers. trusted_peers: HashSet, /// The map of connected peer IPs to their peer handlers. @@ -261,6 +261,12 @@ impl Router { } disconnected } else { + // FIXME (ljedrz): this shouldn't be necessary; it's a double-check + // that the higher-level collection is consistent with the resolver. + if router.is_connected(&peer_ip) { + warn!("Fallback connection artifact cleanup (report this to @ljedrz)"); + router.remove_connected_peer(peer_ip); + } false } }) @@ -337,12 +343,12 @@ impl Router { /// Returns the listener IP address from the (ambiguous) peer address. pub fn resolve_to_listener(&self, peer_addr: &SocketAddr) -> Option { - self.resolver.get_listener(peer_addr) + self.resolver.read().get_listener(peer_addr) } /// Returns the (ambiguous) peer address from the listener IP address. pub fn resolve_to_ambiguous(&self, peer_ip: &SocketAddr) -> Option { - self.resolver.get_ambiguous(peer_ip) + self.resolver.read().get_ambiguous(peer_ip) } /// Returns `true` if the node is connected to the given peer IP. @@ -609,14 +615,14 @@ impl Router { /// Removes the connected peer and adds them to the candidate peers. pub fn remove_connected_peer(&self, peer_ip: SocketAddr) { - // Removes the bidirectional map between the listener address and (ambiguous) peer address. - self.resolver.remove_peer(&peer_ip); // Remove this peer from the connected peers, if it exists. self.connected_peers.write().remove(&peer_ip); - // Add the peer to the candidate peers. - self.candidate_peers.write().insert(peer_ip); + // Removes the bidirectional map between the listener address and (ambiguous) peer address. + self.resolver.write().remove_peer(&peer_ip); // Clear cached entries applicable to the peer. self.cache.clear_peer_entries(peer_ip); + // Add the peer to the candidate peers. + self.candidate_peers.write().insert(peer_ip); #[cfg(feature = "metrics")] self.update_metrics(); }