Skip to content

Commit

Permalink
Improve checking for p2p connection limits (#2985)
Browse files Browse the repository at this point in the history
* Add check for p2p connection limits

* Simplify undesirable connection shutdown

* Make inbound and outbound connections more explicit

* Cleanup inbound and outbound connections

* Cleanup an outbound peers check

* Rename healthy_peers_mix to enough_outbound_peers
  • Loading branch information
j01tz authored and antiochp committed Aug 21, 2019
1 parent ea02338 commit 24f0a52
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 58 deletions.
16 changes: 11 additions & 5 deletions config/src/comments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,18 @@ fn comments() -> HashMap<String, String> {
#how long a banned peer should stay banned
#ban_window = 10800
#maximum number of peers
#peer_max_count = 125
#maximum number of inbound peer connections
#peer_max_inbound_count = 128
#preferred minimum number of peers (we'll actively keep trying to add peers
#until we get to at least this number
#peer_min_preferred_count = 8
#maximum number of outbound peer connections
#peer_max_outbound_count = 8
#preferred minimum number of outbound peers (we'll actively keep trying to add peers
#until we get to at least this number)
#peer_min_preferred_outbound_count = 8
#amount of incoming connections temporarily allowed to exceed peer_max_inbound_count
#peer_listener_buffer_count = 8
# 15 = Bit flags for FULL_NODE
#This structure needs to be changed internally, to make it more configurable
Expand Down
62 changes: 42 additions & 20 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,22 @@ impl Peers {
res
}

/// Get vec of peers we currently have an outgoing connection with.
pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> {
self.connected_peers()
.into_iter()
.filter(|x| x.info.is_outbound())
.collect()
}

/// Get vec of peers we currently have an incoming connection with.
pub fn incoming_connected_peers(&self) -> Vec<Arc<Peer>> {
self.connected_peers()
.into_iter()
.filter(|x| x.info.is_inbound())
.collect()
}

/// Get a peer we're connected to by address.
pub fn get_connected_peer(&self, addr: PeerAddr) -> Option<Arc<Peer>> {
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
Expand All @@ -155,6 +164,11 @@ impl Peers {
self.outgoing_connected_peers().len() as u32
}

/// Number of inbound peers currently connected to.
pub fn peer_inbound_count(&self) -> u32 {
self.incoming_connected_peers().len() as u32
}

// Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do.
pub fn more_work_peers(&self) -> Result<Vec<Arc<Peer>>, chain::Error> {
Expand Down Expand Up @@ -324,7 +338,8 @@ impl Peers {
/// A peer implementation may drop the broadcast request
/// if it knows the remote peer already has the block.
pub fn broadcast_compact_block(&self, b: &core::CompactBlock) {
let num_peers = self.config.peer_max_count();
let num_peers =
self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count();
let count = self.broadcast("compact block", num_peers, |p| p.send_compact_block(b));
debug!(
"broadcast_compact_block: {}, {} at {}, to {} peers, done.",
Expand All @@ -341,7 +356,7 @@ impl Peers {
/// A peer implementation may drop the broadcast request
/// if it knows the remote peer already has the header.
pub fn broadcast_header(&self, bh: &core::BlockHeader) {
let num_peers = self.config.peer_min_preferred_count();
let num_peers = self.config.peer_min_preferred_outbound_count();
let count = self.broadcast("header", num_peers, |p| p.send_header(bh));
debug!(
"broadcast_header: {}, {} at {}, to {} peers, done.",
Expand All @@ -358,7 +373,8 @@ impl Peers {
/// A peer implementation may drop the broadcast request
/// if it knows the remote peer already has the transaction.
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
let num_peers = self.config.peer_max_count();
let num_peers =
self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count();
let count = self.broadcast("transaction", num_peers, |p| p.send_transaction(tx));
debug!(
"broadcast_transaction: {} to {} peers, done.",
Expand Down Expand Up @@ -433,7 +449,7 @@ impl Peers {
/// Iterate over the peer list and prune all peers we have
/// lost connection to or have been deemed problematic.
/// Also avoid connected peer count getting too high.
pub fn clean_peers(&self, max_count: usize) {
pub fn clean_peers(&self, max_inbound_count: usize, max_outbound_count: usize) {
let mut rm = vec![];

// build a list of peers to be cleaned up
Expand Down Expand Up @@ -477,16 +493,27 @@ impl Peers {
}
}

// ensure we do not still have too many connected peers
let excess_count = (self.peer_count() as usize)
.saturating_sub(rm.len())
.saturating_sub(max_count);
if excess_count > 0 {
// map peers to addrs in a block to bound how long we keep the read lock for
// check here to make sure we don't have too many outgoing connections
let excess_outgoing_count =
(self.peer_outbound_count() as usize).saturating_sub(max_outbound_count);
if excess_outgoing_count > 0 {
let mut addrs = self
.outgoing_connected_peers()
.iter()
.take(excess_outgoing_count)
.map(|x| x.info.addr.clone())
.collect::<Vec<_>>();
rm.append(&mut addrs);
}

// check here to make sure we don't have too many incoming connections
let excess_incoming_count =
(self.peer_inbound_count() as usize).saturating_sub(max_inbound_count);
if excess_incoming_count > 0 {
let mut addrs = self
.connected_peers()
.incoming_connected_peers()
.iter()
.take(excess_count)
.take(excess_incoming_count)
.map(|x| x.info.addr.clone())
.collect::<Vec<_>>();
rm.append(&mut addrs);
Expand Down Expand Up @@ -518,14 +545,9 @@ impl Peers {
}
}

pub fn enough_peers(&self) -> bool {
self.peer_count() >= self.config.peer_min_preferred_count()
}

/// We have enough peers, both total connected and outbound connected
pub fn healthy_peers_mix(&self) -> bool {
self.enough_peers()
&& self.peer_outbound_count() >= self.config.peer_min_preferred_count() / 2
/// We have enough outbound connected peers
pub fn enough_outbound_peers(&self) -> bool {
self.peer_outbound_count() >= self.config.peer_min_preferred_outbound_count()
}

/// Removes those peers that seem to have expired
Expand Down
28 changes: 18 additions & 10 deletions p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ impl Server {
let peer_addr = PeerAddr(peer_addr);

if self.check_undesirable(&stream) {
// Shutdown the incoming TCP connection if it is not desired
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
continue;
}
match self.handle_new_peer(stream) {
Expand Down Expand Up @@ -194,30 +198,34 @@ impl Server {
Ok(())
}

/// Checks whether there's any reason we don't want to accept a peer
/// connection. There can be a couple of them:
/// 1. The peer has been previously banned and the ban period hasn't
/// Checks whether there's any reason we don't want to accept an incoming peer
/// connection. There can be a few of them:
/// 1. Accepting the peer connection would exceed the configured maximum allowed
/// inbound peer count. Note that seed nodes may wish to increase the default
/// value for PEER_LISTENER_BUFFER_COUNT to help with network bootstrapping.
/// A default buffer of 8 peers is allowed to help with network growth.
/// 2. The peer has been previously banned and the ban period hasn't
/// expired yet.
/// 2. We're already connected to a peer at the same IP. While there are
/// 3. We're already connected to a peer at the same IP. While there are
/// many reasons multiple peers can legitimately share identical IP
/// addresses (NAT), network distribution is improved if they choose
/// different sets of peers themselves. In addition, it prevent potential
/// duplicate connections, malicious or not.
fn check_undesirable(&self, stream: &TcpStream) -> bool {
if self.peers.peer_inbound_count()
>= self.config.peer_max_inbound_count() + self.config.peer_listener_buffer_count()
{
debug!("Accepting new connection will exceed peer limit, refusing connection.");
return true;
}
if let Ok(peer_addr) = stream.peer_addr() {
let peer_addr = PeerAddr(peer_addr);
if self.peers.is_banned(peer_addr) {
debug!("Peer {} banned, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
return true;
}
if self.peers.is_known(peer_addr) {
debug!("Peer {} already known, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
return true;
}
}
Expand Down
65 changes: 49 additions & 16 deletions p2p/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,18 @@ pub const MAX_LOCATORS: u32 = 20;
/// How long a banned peer should be banned for
const BAN_WINDOW: i64 = 10800;

/// The max peer count
const PEER_MAX_COUNT: u32 = 125;
/// The max inbound peer count
const PEER_MAX_INBOUND_COUNT: u32 = 128;

/// min preferred peer count
const PEER_MIN_PREFERRED_COUNT: u32 = 8;
/// The max outbound peer count
const PEER_MAX_OUTBOUND_COUNT: u32 = 8;

/// The min preferred outbound peer count
const PEER_MIN_PREFERRED_OUTBOUND_COUNT: u32 = 8;

/// The peer listener buffer count. Allows temporarily accepting more connections
/// than allowed by PEER_MAX_INBOUND_COUNT to encourage network bootstrapping.
const PEER_LISTENER_BUFFER_COUNT: u32 = 8;

#[derive(Debug)]
pub enum Error {
Expand Down Expand Up @@ -229,9 +236,13 @@ pub struct P2PConfig {

pub ban_window: Option<i64>,

pub peer_max_count: Option<u32>,
pub peer_max_inbound_count: Option<u32>,

pub peer_max_outbound_count: Option<u32>,

pub peer_min_preferred_outbound_count: Option<u32>,

pub peer_min_preferred_count: Option<u32>,
pub peer_listener_buffer_count: Option<u32>,

pub dandelion_peer: Option<PeerAddr>,
}
Expand All @@ -250,8 +261,10 @@ impl Default for P2PConfig {
peers_deny: None,
peers_preferred: None,
ban_window: None,
peer_max_count: None,
peer_min_preferred_count: None,
peer_max_inbound_count: None,
peer_max_outbound_count: None,
peer_min_preferred_outbound_count: None,
peer_listener_buffer_count: None,
dandelion_peer: None,
}
}
Expand All @@ -268,19 +281,35 @@ impl P2PConfig {
}
}

/// return peer_max_count
pub fn peer_max_count(&self) -> u32 {
match self.peer_max_count {
/// return maximum inbound peer connections count
pub fn peer_max_inbound_count(&self) -> u32 {
match self.peer_max_inbound_count {
Some(n) => n,
None => PEER_MAX_INBOUND_COUNT,
}
}

/// return maximum outbound peer connections count
pub fn peer_max_outbound_count(&self) -> u32 {
match self.peer_max_outbound_count {
Some(n) => n,
None => PEER_MAX_COUNT,
None => PEER_MAX_OUTBOUND_COUNT,
}
}

/// return peer_preferred_count
pub fn peer_min_preferred_count(&self) -> u32 {
match self.peer_min_preferred_count {
/// return minimum preferred outbound peer count
pub fn peer_min_preferred_outbound_count(&self) -> u32 {
match self.peer_min_preferred_outbound_count {
Some(n) => n,
None => PEER_MIN_PREFERRED_COUNT,
None => PEER_MIN_PREFERRED_OUTBOUND_COUNT,
}
}

/// return peer buffer count for listener
pub fn peer_listener_buffer_count(&self) -> u32 {
match self.peer_listener_buffer_count {
Some(n) => n,
None => PEER_LISTENER_BUFFER_COUNT,
}
}
}
Expand Down Expand Up @@ -398,6 +427,10 @@ impl PeerInfo {
self.direction == Direction::Outbound
}

pub fn is_inbound(&self) -> bool {
self.direction == Direction::Inbound
}

/// The current height of the peer.
pub fn height(&self) -> u64 {
self.live_info.read().height
Expand Down
18 changes: 12 additions & 6 deletions servers/src/grin/seed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,12 @@ fn monitor_peers(
);

// maintenance step first, clean up p2p server peers
peers.clean_peers(config.peer_max_count() as usize);
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
);

if peers.healthy_peers_mix() {
if peers.enough_outbound_peers() {
return;
}

Expand Down Expand Up @@ -230,7 +233,7 @@ fn monitor_peers(
let new_peers = peers.find_peers(
p2p::State::Healthy,
p2p::Capabilities::UNKNOWN,
config.peer_max_count() as usize,
config.peer_max_outbound_count() as usize,
);

for p in new_peers.iter().filter(|p| !peers.is_known(p.addr)) {
Expand Down Expand Up @@ -296,15 +299,18 @@ fn listen_for_addrs(
let addrs: Vec<PeerAddr> = rx.try_iter().collect();

// If we have a healthy number of outbound peers then we are done here.
if peers.peer_count() > peers.peer_outbound_count() && peers.healthy_peers_mix() {
if peers.enough_outbound_peers() {
return;
}

// Try to connect to (up to max peers) peer addresses.
// Try to connect to (up to max outbound peers) peer addresses.
// Note: We drained the rx queue earlier to keep it under control.
// Even if there are many addresses to try we will only try a bounded number of them.
let connect_min_interval = 30;
for addr in addrs.into_iter().take(p2p.config.peer_max_count() as usize) {
for addr in addrs
.into_iter()
.take(p2p.config.peer_max_outbound_count() as usize)
{
// ignore the duplicate connecting to same peer within 30 seconds
let now = Utc::now();
if let Some(last_connect_time) = connecting_history.get(&addr) {
Expand Down
2 changes: 1 addition & 1 deletion servers/src/grin/sync/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl SyncRunner {
// * timeout
if wp > MIN_PEERS
|| (wp == 0
&& self.peers.enough_peers()
&& self.peers.enough_outbound_peers()
&& head.total_difficulty > Difficulty::zero())
|| n > wait_secs
{
Expand Down

0 comments on commit 24f0a52

Please sign in to comment.