Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve checking for p2p connection limits #2985

Merged
merged 6 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions config/src/comments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,18 @@ fn comments() -> HashMap<String, String> {
#how long a banned peer should stay banned
#ban_window = 10800

#maximum number of peers
#peer_max_count = 125
#maximum number of inbound peer connections
#peer_max_inbound_count = 128

#preferred minimum number of peers (we'll actively keep trying to add peers
#until we get to at least this number
#peer_min_preferred_count = 8
#maximum number of outbound peer connections
#peer_max_outbound_count = 8
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the point of this? If this is greater than peer_min_preferred_outbound_count, do we continue to connect to more outbound peers?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

peer_max_outbound_count lets us put a configurable limit on how many outgoing connections with peers we can have and by extension all connections. We continue to try to connect to more outbound peers until peer_min_preferred_outbound_count is reached (a "healthy peers mix"). By default these values are the same (we try to maintain connections on all available outgoing slots)

Copy link
Contributor

@DavidBurkett DavidBurkett Aug 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But let's say you have peer_min_preferred_outbound_count of 8, and peer_max_outbound_count of 16, how would you ever end up with more than 8 outbound connections?

Copy link
Member Author

@j01tz j01tz Aug 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may not after future refactoring to better clean up distinguishing incoming and outgoing connections. For now it happens when we broadcast compact blocks for example. There are probably still cases where outgoing connections are opened for one reason or another, exceeding the peer_max_outbound_count at least until clean_peers() is run to clean up any potential excess connections. In this case if peer_max_outbound_count was set to 16, I think we would hang on to those connections instead of dropping them when cleaning up peers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😕 You're saying we open outbound connections when broadcasting compact blocks? Where in the code does that take place, and what for?

Copy link
Member Author

@j01tz j01tz Aug 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After taking a closer look new outbound connections are not open when broadcasting, my mistake. During testing there were cases where logic allowed opening new connections beyond the outgoing limit which got closed shortly after during clean_peers(). You can set some test limits and watch logs to see the outgoing connections be temporarily exceeded. Hopefully as we better define incoming and outgoing connections in future PRs these cases can all be thoughtfully handled (as in the broadcast scenarios)


#preferred minimum number of outbound peers (we'll actively keep trying to add peers
#until we get to at least this number)
#peer_min_preferred_outbound_count = 8

#amount of incoming connections temporarily allowed to exceed peer_max_inbound_count
#peer_listener_buffer_count = 8

# 15 = Bit flags for FULL_NODE
#This structure needs to be changed internally, to make it more configurable
Expand Down
62 changes: 42 additions & 20 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,22 @@ impl Peers {
res
}

/// Get vec of peers we currently have an outgoing connection with.
pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> {
self.connected_peers()
.into_iter()
.filter(|x| x.info.is_outbound())
.collect()
}

/// Get vec of peers we currently have an incoming connection with.
pub fn incoming_connected_peers(&self) -> Vec<Arc<Peer>> {
self.connected_peers()
.into_iter()
.filter(|x| x.info.is_inbound())
.collect()
}

/// Get a peer we're connected to by address.
pub fn get_connected_peer(&self, addr: PeerAddr) -> Option<Arc<Peer>> {
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) {
Expand All @@ -155,6 +164,11 @@ impl Peers {
self.outgoing_connected_peers().len() as u32
}

/// Number of inbound peers currently connected to.
pub fn peer_inbound_count(&self) -> u32 {
self.incoming_connected_peers().len() as u32
}

// Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do.
pub fn more_work_peers(&self) -> Result<Vec<Arc<Peer>>, chain::Error> {
Expand Down Expand Up @@ -324,7 +338,8 @@ impl Peers {
/// A peer implementation may drop the broadcast request
/// if it knows the remote peer already has the block.
pub fn broadcast_compact_block(&self, b: &core::CompactBlock) {
let num_peers = self.config.peer_max_count();
let num_peers =
self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count();
let count = self.broadcast("compact block", num_peers, |p| p.send_compact_block(b));
debug!(
"broadcast_compact_block: {}, {} at {}, to {} peers, done.",
Expand All @@ -341,7 +356,7 @@ impl Peers {
/// A peer implementation may drop the broadcast request
/// if it knows the remote peer already has the header.
pub fn broadcast_header(&self, bh: &core::BlockHeader) {
let num_peers = self.config.peer_min_preferred_count();
let num_peers = self.config.peer_min_preferred_outbound_count();
let count = self.broadcast("header", num_peers, |p| p.send_header(bh));
debug!(
"broadcast_header: {}, {} at {}, to {} peers, done.",
Expand All @@ -358,7 +373,8 @@ impl Peers {
/// A peer implementation may drop the broadcast request
/// if it knows the remote peer already has the transaction.
pub fn broadcast_transaction(&self, tx: &core::Transaction) {
let num_peers = self.config.peer_max_count();
let num_peers =
self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count();
let count = self.broadcast("transaction", num_peers, |p| p.send_transaction(tx));
debug!(
"broadcast_transaction: {} to {} peers, done.",
Expand Down Expand Up @@ -433,7 +449,7 @@ impl Peers {
/// Iterate over the peer list and prune all peers we have
/// lost connection to or have been deemed problematic.
/// Also avoid connected peer count getting too high.
pub fn clean_peers(&self, max_count: usize) {
pub fn clean_peers(&self, max_inbound_count: usize, max_outbound_count: usize) {
let mut rm = vec![];

// build a list of peers to be cleaned up
Expand Down Expand Up @@ -477,16 +493,27 @@ impl Peers {
}
}

// ensure we do not still have too many connected peers
let excess_count = (self.peer_count() as usize)
.saturating_sub(rm.len())
.saturating_sub(max_count);
if excess_count > 0 {
// map peers to addrs in a block to bound how long we keep the read lock for
// check here to make sure we don't have too many outgoing connections
let excess_outgoing_count =
(self.peer_outbound_count() as usize).saturating_sub(max_outbound_count);
if excess_outgoing_count > 0 {
let mut addrs = self
.outgoing_connected_peers()
.iter()
.take(excess_outgoing_count)
.map(|x| x.info.addr.clone())
.collect::<Vec<_>>();
rm.append(&mut addrs);
}

// check here to make sure we don't have too many incoming connections
let excess_incoming_count =
(self.peer_inbound_count() as usize).saturating_sub(max_inbound_count);
if excess_incoming_count > 0 {
let mut addrs = self
.connected_peers()
.incoming_connected_peers()
.iter()
.take(excess_count)
.take(excess_incoming_count)
.map(|x| x.info.addr.clone())
.collect::<Vec<_>>();
rm.append(&mut addrs);
Expand Down Expand Up @@ -518,14 +545,9 @@ impl Peers {
}
}

pub fn enough_peers(&self) -> bool {
self.peer_count() >= self.config.peer_min_preferred_count()
}

/// We have enough peers, both total connected and outbound connected
pub fn healthy_peers_mix(&self) -> bool {
self.enough_peers()
&& self.peer_outbound_count() >= self.config.peer_min_preferred_count() / 2
/// We have enough outbound connected peers
pub fn enough_outbound_peers(&self) -> bool {
self.peer_outbound_count() >= self.config.peer_min_preferred_outbound_count()
}

/// Removes those peers that seem to have expired
Expand Down
28 changes: 18 additions & 10 deletions p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ impl Server {
let peer_addr = PeerAddr(peer_addr);

if self.check_undesirable(&stream) {
// Shutdown the incoming TCP connection if it is not desired
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
continue;
}
match self.handle_new_peer(stream) {
Expand Down Expand Up @@ -194,30 +198,34 @@ impl Server {
Ok(())
}

/// Checks whether there's any reason we don't want to accept a peer
/// connection. There can be a couple of them:
/// 1. The peer has been previously banned and the ban period hasn't
/// Checks whether there's any reason we don't want to accept an incoming peer
/// connection. There can be a few of them:
/// 1. Accepting the peer connection would exceed the configured maximum allowed
/// inbound peer count. Note that seed nodes may wish to increase the default
/// value for PEER_LISTENER_BUFFER_COUNT to help with network bootstrapping.
/// A default buffer of 8 peers is allowed to help with network growth.
/// 2. The peer has been previously banned and the ban period hasn't
/// expired yet.
/// 2. We're already connected to a peer at the same IP. While there are
/// 3. We're already connected to a peer at the same IP. While there are
/// many reasons multiple peers can legitimately share identical IP
/// addresses (NAT), network distribution is improved if they choose
/// different sets of peers themselves. In addition, it prevent potential
/// duplicate connections, malicious or not.
fn check_undesirable(&self, stream: &TcpStream) -> bool {
if self.peers.peer_inbound_count()
>= self.config.peer_max_inbound_count() + self.config.peer_listener_buffer_count()
{
debug!("Accepting new connection will exceed peer limit, refusing connection.");
return true;
}
if let Ok(peer_addr) = stream.peer_addr() {
let peer_addr = PeerAddr(peer_addr);
if self.peers.is_banned(peer_addr) {
debug!("Peer {} banned, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
return true;
}
if self.peers.is_known(peer_addr) {
debug!("Peer {} already known, refusing connection.", peer_addr);
if let Err(e) = stream.shutdown(Shutdown::Both) {
debug!("Error shutting down conn: {:?}", e);
}
return true;
}
}
Expand Down
65 changes: 49 additions & 16 deletions p2p/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,18 @@ pub const MAX_LOCATORS: u32 = 20;
/// How long a banned peer should be banned for
const BAN_WINDOW: i64 = 10800;

/// The max peer count
const PEER_MAX_COUNT: u32 = 125;
/// The max inbound peer count
const PEER_MAX_INBOUND_COUNT: u32 = 128;

/// min preferred peer count
const PEER_MIN_PREFERRED_COUNT: u32 = 8;
/// The max outbound peer count
const PEER_MAX_OUTBOUND_COUNT: u32 = 8;

/// The min preferred outbound peer count
const PEER_MIN_PREFERRED_OUTBOUND_COUNT: u32 = 8;

/// The peer listener buffer count. Allows temporarily accepting more connections
/// than allowed by PEER_MAX_INBOUND_COUNT to encourage network bootstrapping.
const PEER_LISTENER_BUFFER_COUNT: u32 = 8;

#[derive(Debug)]
pub enum Error {
Expand Down Expand Up @@ -229,9 +236,13 @@ pub struct P2PConfig {

pub ban_window: Option<i64>,

pub peer_max_count: Option<u32>,
pub peer_max_inbound_count: Option<u32>,

pub peer_max_outbound_count: Option<u32>,

pub peer_min_preferred_outbound_count: Option<u32>,

pub peer_min_preferred_count: Option<u32>,
pub peer_listener_buffer_count: Option<u32>,

pub dandelion_peer: Option<PeerAddr>,
}
Expand All @@ -250,8 +261,10 @@ impl Default for P2PConfig {
peers_deny: None,
peers_preferred: None,
ban_window: None,
peer_max_count: None,
peer_min_preferred_count: None,
peer_max_inbound_count: None,
peer_max_outbound_count: None,
peer_min_preferred_outbound_count: None,
peer_listener_buffer_count: None,
dandelion_peer: None,
}
}
Expand All @@ -268,19 +281,35 @@ impl P2PConfig {
}
}

/// return peer_max_count
pub fn peer_max_count(&self) -> u32 {
match self.peer_max_count {
/// return maximum inbound peer connections count
pub fn peer_max_inbound_count(&self) -> u32 {
match self.peer_max_inbound_count {
Some(n) => n,
None => PEER_MAX_INBOUND_COUNT,
}
}

/// return maximum outbound peer connections count
pub fn peer_max_outbound_count(&self) -> u32 {
match self.peer_max_outbound_count {
Some(n) => n,
None => PEER_MAX_COUNT,
None => PEER_MAX_OUTBOUND_COUNT,
}
}

/// return peer_preferred_count
pub fn peer_min_preferred_count(&self) -> u32 {
match self.peer_min_preferred_count {
/// return minimum preferred outbound peer count
pub fn peer_min_preferred_outbound_count(&self) -> u32 {
match self.peer_min_preferred_outbound_count {
Some(n) => n,
None => PEER_MIN_PREFERRED_COUNT,
None => PEER_MIN_PREFERRED_OUTBOUND_COUNT,
}
}

/// return peer buffer count for listener
pub fn peer_listener_buffer_count(&self) -> u32 {
match self.peer_listener_buffer_count {
Some(n) => n,
None => PEER_LISTENER_BUFFER_COUNT,
}
}
}
Expand Down Expand Up @@ -398,6 +427,10 @@ impl PeerInfo {
self.direction == Direction::Outbound
}

pub fn is_inbound(&self) -> bool {
self.direction == Direction::Inbound
}

/// The current height of the peer.
pub fn height(&self) -> u64 {
self.live_info.read().height
Expand Down
18 changes: 12 additions & 6 deletions servers/src/grin/seed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,12 @@ fn monitor_peers(
);

// maintenance step first, clean up p2p server peers
peers.clean_peers(config.peer_max_count() as usize);
peers.clean_peers(
config.peer_max_inbound_count() as usize,
config.peer_max_outbound_count() as usize,
);

if peers.healthy_peers_mix() {
if peers.enough_outbound_peers() {
return;
}

Expand Down Expand Up @@ -230,7 +233,7 @@ fn monitor_peers(
let new_peers = peers.find_peers(
p2p::State::Healthy,
p2p::Capabilities::UNKNOWN,
config.peer_max_count() as usize,
config.peer_max_outbound_count() as usize,
);

for p in new_peers.iter().filter(|p| !peers.is_known(p.addr)) {
Expand Down Expand Up @@ -296,15 +299,18 @@ fn listen_for_addrs(
let addrs: Vec<PeerAddr> = rx.try_iter().collect();

// If we have a healthy number of outbound peers then we are done here.
if peers.peer_count() > peers.peer_outbound_count() && peers.healthy_peers_mix() {
if peers.enough_outbound_peers() {
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove the peers.peer_count() > peers.peer_outbound_count() check on line 302, since healthy_peers_mix() covers it.

Copy link
Member Author

@j01tz j01tz Aug 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. They don't quite check the same thing but I think we only want the healthy_peers_mix() check here as you said. I'm pretty sure we don't care to check here that the total connected peers exceeds the outbound connected peers. This would require (I think) that we have inbound connections, and I'm pretty sure we want to return here any time our outbound connections >= our configured preferred minimum outbound connections ("a healthy number of outbound peers").

Removed in fb9b06e unless there is a good reason to keep this as is.

}

// Try to connect to (up to max peers) peer addresses.
// Try to connect to (up to max outbound peers) peer addresses.
// Note: We drained the rx queue earlier to keep it under control.
// Even if there are many addresses to try we will only try a bounded number of them.
let connect_min_interval = 30;
for addr in addrs.into_iter().take(p2p.config.peer_max_count() as usize) {
for addr in addrs
.into_iter()
.take(p2p.config.peer_max_outbound_count() as usize)
{
// ignore the duplicate connecting to same peer within 30 seconds
let now = Utc::now();
if let Some(last_connect_time) = connecting_history.get(&addr) {
Expand Down
2 changes: 1 addition & 1 deletion servers/src/grin/sync/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl SyncRunner {
// * timeout
if wp > MIN_PEERS
|| (wp == 0
&& self.peers.enough_peers()
&& self.peers.enough_outbound_peers()
&& head.total_difficulty > Difficulty::zero())
|| n > wait_secs
{
Expand Down