From 24f0a5243792cd0e92d7a270acd4784027bbb9ed Mon Sep 17 00:00:00 2001 From: j01tz <47043188+j01tz@users.noreply.github.com> Date: Wed, 21 Aug 2019 04:43:09 -0700 Subject: [PATCH] Improve checking for p2p connection limits (#2985) * Add check for p2p connection limits * Simplify undesirable connection shutdown * Make inbound and outbound connections more explicit * Cleanup inbound and outbound connections * Cleanup an outbound peers check * Rename healthy_peers_mix to enough_outbound_peers --- config/src/comments.rs | 16 +++++--- p2p/src/peers.rs | 62 +++++++++++++++++++++---------- p2p/src/serv.rs | 28 +++++++++----- p2p/src/types.rs | 65 +++++++++++++++++++++++++-------- servers/src/grin/seed.rs | 18 ++++++--- servers/src/grin/sync/syncer.rs | 2 +- 6 files changed, 133 insertions(+), 58 deletions(-) diff --git a/config/src/comments.rs b/config/src/comments.rs index 1262b12e93..b0e9f7f37c 100644 --- a/config/src/comments.rs +++ b/config/src/comments.rs @@ -274,12 +274,18 @@ fn comments() -> HashMap { #how long a banned peer should stay banned #ban_window = 10800 -#maximum number of peers -#peer_max_count = 125 +#maximum number of inbound peer connections +#peer_max_inbound_count = 128 -#preferred minimum number of peers (we'll actively keep trying to add peers -#until we get to at least this number -#peer_min_preferred_count = 8 +#maximum number of outbound peer connections +#peer_max_outbound_count = 8 + +#preferred minimum number of outbound peers (we'll actively keep trying to add peers +#until we get to at least this number) +#peer_min_preferred_outbound_count = 8 + +#amount of incoming connections temporarily allowed to exceed peer_max_inbound_count +#peer_listener_buffer_count = 8 # 15 = Bit flags for FULL_NODE #This structure needs to be changed internally, to make it more configurable diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 5c1ec247ac..4b5f16d5a2 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -126,6 +126,7 @@ impl Peers { res } + /// Get vec of peers we currently have an outgoing connection with. pub fn outgoing_connected_peers(&self) -> Vec> { self.connected_peers() .into_iter() @@ -133,6 +134,14 @@ impl Peers { .collect() } + /// Get vec of peers we currently have an incoming connection with. + pub fn incoming_connected_peers(&self) -> Vec> { + self.connected_peers() + .into_iter() + .filter(|x| x.info.is_inbound()) + .collect() + } + /// Get a peer we're connected to by address. pub fn get_connected_peer(&self, addr: PeerAddr) -> Option> { let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { @@ -155,6 +164,11 @@ impl Peers { self.outgoing_connected_peers().len() as u32 } + /// Number of inbound peers currently connected to. + pub fn peer_inbound_count(&self) -> u32 { + self.incoming_connected_peers().len() as u32 + } + // Return vec of connected peers that currently advertise more work // (total_difficulty) than we do. pub fn more_work_peers(&self) -> Result>, chain::Error> { @@ -324,7 +338,8 @@ impl Peers { /// A peer implementation may drop the broadcast request /// if it knows the remote peer already has the block. pub fn broadcast_compact_block(&self, b: &core::CompactBlock) { - let num_peers = self.config.peer_max_count(); + let num_peers = + self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count(); let count = self.broadcast("compact block", num_peers, |p| p.send_compact_block(b)); debug!( "broadcast_compact_block: {}, {} at {}, to {} peers, done.", @@ -341,7 +356,7 @@ impl Peers { /// A peer implementation may drop the broadcast request /// if it knows the remote peer already has the header. pub fn broadcast_header(&self, bh: &core::BlockHeader) { - let num_peers = self.config.peer_min_preferred_count(); + let num_peers = self.config.peer_min_preferred_outbound_count(); let count = self.broadcast("header", num_peers, |p| p.send_header(bh)); debug!( "broadcast_header: {}, {} at {}, to {} peers, done.", @@ -358,7 +373,8 @@ impl Peers { /// A peer implementation may drop the broadcast request /// if it knows the remote peer already has the transaction. pub fn broadcast_transaction(&self, tx: &core::Transaction) { - let num_peers = self.config.peer_max_count(); + let num_peers = + self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count(); let count = self.broadcast("transaction", num_peers, |p| p.send_transaction(tx)); debug!( "broadcast_transaction: {} to {} peers, done.", @@ -433,7 +449,7 @@ impl Peers { /// Iterate over the peer list and prune all peers we have /// lost connection to or have been deemed problematic. /// Also avoid connected peer count getting too high. - pub fn clean_peers(&self, max_count: usize) { + pub fn clean_peers(&self, max_inbound_count: usize, max_outbound_count: usize) { let mut rm = vec![]; // build a list of peers to be cleaned up @@ -477,16 +493,27 @@ impl Peers { } } - // ensure we do not still have too many connected peers - let excess_count = (self.peer_count() as usize) - .saturating_sub(rm.len()) - .saturating_sub(max_count); - if excess_count > 0 { - // map peers to addrs in a block to bound how long we keep the read lock for + // check here to make sure we don't have too many outgoing connections + let excess_outgoing_count = + (self.peer_outbound_count() as usize).saturating_sub(max_outbound_count); + if excess_outgoing_count > 0 { + let mut addrs = self + .outgoing_connected_peers() + .iter() + .take(excess_outgoing_count) + .map(|x| x.info.addr.clone()) + .collect::>(); + rm.append(&mut addrs); + } + + // check here to make sure we don't have too many incoming connections + let excess_incoming_count = + (self.peer_inbound_count() as usize).saturating_sub(max_inbound_count); + if excess_incoming_count > 0 { let mut addrs = self - .connected_peers() + .incoming_connected_peers() .iter() - .take(excess_count) + .take(excess_incoming_count) .map(|x| x.info.addr.clone()) .collect::>(); rm.append(&mut addrs); @@ -518,14 +545,9 @@ impl Peers { } } - pub fn enough_peers(&self) -> bool { - self.peer_count() >= self.config.peer_min_preferred_count() - } - - /// We have enough peers, both total connected and outbound connected - pub fn healthy_peers_mix(&self) -> bool { - self.enough_peers() - && self.peer_outbound_count() >= self.config.peer_min_preferred_count() / 2 + /// We have enough outbound connected peers + pub fn enough_outbound_peers(&self) -> bool { + self.peer_outbound_count() >= self.config.peer_min_preferred_outbound_count() } /// Removes those peers that seem to have expired diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 7da4eebbaf..a8a48ec07b 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -87,6 +87,10 @@ impl Server { let peer_addr = PeerAddr(peer_addr); if self.check_undesirable(&stream) { + // Shutdown the incoming TCP connection if it is not desired + if let Err(e) = stream.shutdown(Shutdown::Both) { + debug!("Error shutting down conn: {:?}", e); + } continue; } match self.handle_new_peer(stream) { @@ -194,30 +198,34 @@ impl Server { Ok(()) } - /// Checks whether there's any reason we don't want to accept a peer - /// connection. There can be a couple of them: - /// 1. The peer has been previously banned and the ban period hasn't + /// Checks whether there's any reason we don't want to accept an incoming peer + /// connection. There can be a few of them: + /// 1. Accepting the peer connection would exceed the configured maximum allowed + /// inbound peer count. Note that seed nodes may wish to increase the default + /// value for PEER_LISTENER_BUFFER_COUNT to help with network bootstrapping. + /// A default buffer of 8 peers is allowed to help with network growth. + /// 2. The peer has been previously banned and the ban period hasn't /// expired yet. - /// 2. We're already connected to a peer at the same IP. While there are + /// 3. We're already connected to a peer at the same IP. While there are /// many reasons multiple peers can legitimately share identical IP /// addresses (NAT), network distribution is improved if they choose /// different sets of peers themselves. In addition, it prevent potential /// duplicate connections, malicious or not. fn check_undesirable(&self, stream: &TcpStream) -> bool { + if self.peers.peer_inbound_count() + >= self.config.peer_max_inbound_count() + self.config.peer_listener_buffer_count() + { + debug!("Accepting new connection will exceed peer limit, refusing connection."); + return true; + } if let Ok(peer_addr) = stream.peer_addr() { let peer_addr = PeerAddr(peer_addr); if self.peers.is_banned(peer_addr) { debug!("Peer {} banned, refusing connection.", peer_addr); - if let Err(e) = stream.shutdown(Shutdown::Both) { - debug!("Error shutting down conn: {:?}", e); - } return true; } if self.peers.is_known(peer_addr) { debug!("Peer {} already known, refusing connection.", peer_addr); - if let Err(e) = stream.shutdown(Shutdown::Both) { - debug!("Error shutting down conn: {:?}", e); - } return true; } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 735d90c0fe..bcd92a4d77 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -48,11 +48,18 @@ pub const MAX_LOCATORS: u32 = 20; /// How long a banned peer should be banned for const BAN_WINDOW: i64 = 10800; -/// The max peer count -const PEER_MAX_COUNT: u32 = 125; +/// The max inbound peer count +const PEER_MAX_INBOUND_COUNT: u32 = 128; -/// min preferred peer count -const PEER_MIN_PREFERRED_COUNT: u32 = 8; +/// The max outbound peer count +const PEER_MAX_OUTBOUND_COUNT: u32 = 8; + +/// The min preferred outbound peer count +const PEER_MIN_PREFERRED_OUTBOUND_COUNT: u32 = 8; + +/// The peer listener buffer count. Allows temporarily accepting more connections +/// than allowed by PEER_MAX_INBOUND_COUNT to encourage network bootstrapping. +const PEER_LISTENER_BUFFER_COUNT: u32 = 8; #[derive(Debug)] pub enum Error { @@ -229,9 +236,13 @@ pub struct P2PConfig { pub ban_window: Option, - pub peer_max_count: Option, + pub peer_max_inbound_count: Option, + + pub peer_max_outbound_count: Option, + + pub peer_min_preferred_outbound_count: Option, - pub peer_min_preferred_count: Option, + pub peer_listener_buffer_count: Option, pub dandelion_peer: Option, } @@ -250,8 +261,10 @@ impl Default for P2PConfig { peers_deny: None, peers_preferred: None, ban_window: None, - peer_max_count: None, - peer_min_preferred_count: None, + peer_max_inbound_count: None, + peer_max_outbound_count: None, + peer_min_preferred_outbound_count: None, + peer_listener_buffer_count: None, dandelion_peer: None, } } @@ -268,19 +281,35 @@ impl P2PConfig { } } - /// return peer_max_count - pub fn peer_max_count(&self) -> u32 { - match self.peer_max_count { + /// return maximum inbound peer connections count + pub fn peer_max_inbound_count(&self) -> u32 { + match self.peer_max_inbound_count { + Some(n) => n, + None => PEER_MAX_INBOUND_COUNT, + } + } + + /// return maximum outbound peer connections count + pub fn peer_max_outbound_count(&self) -> u32 { + match self.peer_max_outbound_count { Some(n) => n, - None => PEER_MAX_COUNT, + None => PEER_MAX_OUTBOUND_COUNT, } } - /// return peer_preferred_count - pub fn peer_min_preferred_count(&self) -> u32 { - match self.peer_min_preferred_count { + /// return minimum preferred outbound peer count + pub fn peer_min_preferred_outbound_count(&self) -> u32 { + match self.peer_min_preferred_outbound_count { Some(n) => n, - None => PEER_MIN_PREFERRED_COUNT, + None => PEER_MIN_PREFERRED_OUTBOUND_COUNT, + } + } + + /// return peer buffer count for listener + pub fn peer_listener_buffer_count(&self) -> u32 { + match self.peer_listener_buffer_count { + Some(n) => n, + None => PEER_LISTENER_BUFFER_COUNT, } } } @@ -398,6 +427,10 @@ impl PeerInfo { self.direction == Direction::Outbound } + pub fn is_inbound(&self) -> bool { + self.direction == Direction::Inbound + } + /// The current height of the peer. pub fn height(&self) -> u64 { self.live_info.read().height diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 5ccd9af99e..3ad3fa28ac 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -185,9 +185,12 @@ fn monitor_peers( ); // maintenance step first, clean up p2p server peers - peers.clean_peers(config.peer_max_count() as usize); + peers.clean_peers( + config.peer_max_inbound_count() as usize, + config.peer_max_outbound_count() as usize, + ); - if peers.healthy_peers_mix() { + if peers.enough_outbound_peers() { return; } @@ -230,7 +233,7 @@ fn monitor_peers( let new_peers = peers.find_peers( p2p::State::Healthy, p2p::Capabilities::UNKNOWN, - config.peer_max_count() as usize, + config.peer_max_outbound_count() as usize, ); for p in new_peers.iter().filter(|p| !peers.is_known(p.addr)) { @@ -296,15 +299,18 @@ fn listen_for_addrs( let addrs: Vec = rx.try_iter().collect(); // If we have a healthy number of outbound peers then we are done here. - if peers.peer_count() > peers.peer_outbound_count() && peers.healthy_peers_mix() { + if peers.enough_outbound_peers() { return; } - // Try to connect to (up to max peers) peer addresses. + // Try to connect to (up to max outbound peers) peer addresses. // Note: We drained the rx queue earlier to keep it under control. // Even if there are many addresses to try we will only try a bounded number of them. let connect_min_interval = 30; - for addr in addrs.into_iter().take(p2p.config.peer_max_count() as usize) { + for addr in addrs + .into_iter() + .take(p2p.config.peer_max_outbound_count() as usize) + { // ignore the duplicate connecting to same peer within 30 seconds let now = Utc::now(); if let Some(last_connect_time) = connecting_history.get(&addr) { diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 1564b4e8a1..81552b78a7 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -86,7 +86,7 @@ impl SyncRunner { // * timeout if wp > MIN_PEERS || (wp == 0 - && self.peers.enough_peers() + && self.peers.enough_outbound_peers() && head.total_difficulty > Difficulty::zero()) || n > wait_secs {