-
Notifications
You must be signed in to change notification settings - Fork 990
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve checking for p2p connection limits #2985
Changes from 4 commits
47c90b0
a0145a5
1e3ed81
7bfb54e
fb9b06e
d06a8e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -274,12 +274,18 @@ fn comments() -> HashMap<String, String> { | |
#how long a banned peer should stay banned | ||
#ban_window = 10800 | ||
|
||
#maximum number of peers | ||
#peer_max_count = 125 | ||
#maximum number of inbound peer connections | ||
#peer_max_inbound_count = 128 | ||
|
||
#preferred minimum number of peers (we'll actively keep trying to add peers | ||
#until we get to at least this number | ||
#peer_min_preferred_count = 8 | ||
#maximum number of outbound peer connections | ||
#peer_max_outbound_count = 8 | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What's the point of this? If this is greater than peer_min_preferred_outbound_count, do we continue to connect to more outbound peers? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. But let's say you have […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We may not after future refactoring to better clean up distinguishing incoming and outgoing connections. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 😕 You're saying we open outbound connections when broadcasting compact blocks? Where in the code does that take place, and what for? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. After taking a closer look, new outbound connections are not opened when broadcasting — my mistake. During testing there were cases where logic allowed opening new connections beyond the outgoing limit, which got closed shortly after during […] |
||
|
||
#preferred minimum number of outbound peers (we'll actively keep trying to add peers | ||
#until we get to at least this number) | ||
#peer_min_preferred_outbound_count = 8 | ||
|
||
#amount of incoming connections temporarily allowed to exceed peer_max_inbound_count | ||
#peer_listener_buffer_count = 8 | ||
|
||
# 15 = Bit flags for FULL_NODE | ||
#This structure needs to be changed internally, to make it more configurable | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -126,13 +126,22 @@ impl Peers { | |
res | ||
} | ||
|
||
/// Get vec of peers we currently have an outgoing connection with. | ||
pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> { | ||
self.connected_peers() | ||
.into_iter() | ||
.filter(|x| x.info.is_outbound()) | ||
.collect() | ||
} | ||
|
||
/// Get vec of peers we currently have an incoming connection with. | ||
pub fn incoming_connected_peers(&self) -> Vec<Arc<Peer>> { | ||
self.connected_peers() | ||
.into_iter() | ||
.filter(|x| x.info.is_inbound()) | ||
.collect() | ||
} | ||
|
||
/// Get a peer we're connected to by address. | ||
pub fn get_connected_peer(&self, addr: PeerAddr) -> Option<Arc<Peer>> { | ||
let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { | ||
|
@@ -155,6 +164,11 @@ impl Peers { | |
self.outgoing_connected_peers().len() as u32 | ||
} | ||
|
||
/// Number of inbound peers currently connected to. | ||
pub fn peer_inbound_count(&self) -> u32 { | ||
self.incoming_connected_peers().len() as u32 | ||
} | ||
|
||
// Return vec of connected peers that currently advertise more work | ||
// (total_difficulty) than we do. | ||
pub fn more_work_peers(&self) -> Result<Vec<Arc<Peer>>, chain::Error> { | ||
|
@@ -324,7 +338,8 @@ impl Peers { | |
/// A peer implementation may drop the broadcast request | ||
/// if it knows the remote peer already has the block. | ||
pub fn broadcast_compact_block(&self, b: &core::CompactBlock) { | ||
let num_peers = self.config.peer_max_count(); | ||
let num_peers = | ||
self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count(); | ||
let count = self.broadcast("compact block", num_peers, |p| p.send_compact_block(b)); | ||
debug!( | ||
"broadcast_compact_block: {}, {} at {}, to {} peers, done.", | ||
|
@@ -341,7 +356,7 @@ impl Peers { | |
/// A peer implementation may drop the broadcast request | ||
/// if it knows the remote peer already has the header. | ||
pub fn broadcast_header(&self, bh: &core::BlockHeader) { | ||
let num_peers = self.config.peer_min_preferred_count(); | ||
let num_peers = self.config.peer_min_preferred_outbound_count(); | ||
let count = self.broadcast("header", num_peers, |p| p.send_header(bh)); | ||
debug!( | ||
"broadcast_header: {}, {} at {}, to {} peers, done.", | ||
|
@@ -358,7 +373,8 @@ impl Peers { | |
/// A peer implementation may drop the broadcast request | ||
/// if it knows the remote peer already has the transaction. | ||
pub fn broadcast_transaction(&self, tx: &core::Transaction) { | ||
let num_peers = self.config.peer_max_count(); | ||
let num_peers = | ||
self.config.peer_max_inbound_count() + self.config.peer_max_outbound_count(); | ||
let count = self.broadcast("transaction", num_peers, |p| p.send_transaction(tx)); | ||
debug!( | ||
"broadcast_transaction: {} to {} peers, done.", | ||
|
@@ -433,7 +449,7 @@ impl Peers { | |
/// Iterate over the peer list and prune all peers we have | ||
/// lost connection to or have been deemed problematic. | ||
/// Also avoid connected peer count getting too high. | ||
pub fn clean_peers(&self, max_count: usize) { | ||
pub fn clean_peers(&self, max_inbound_count: usize, max_outbound_count: usize) { | ||
let mut rm = vec![]; | ||
|
||
// build a list of peers to be cleaned up | ||
|
@@ -477,16 +493,27 @@ impl Peers { | |
} | ||
} | ||
|
||
// ensure we do not still have too many connected peers | ||
let excess_count = (self.peer_count() as usize) | ||
.saturating_sub(rm.len()) | ||
.saturating_sub(max_count); | ||
if excess_count > 0 { | ||
// map peers to addrs in a block to bound how long we keep the read lock for | ||
// check here to make sure we don't have too many outgoing connections | ||
let excess_outgoing_count = | ||
(self.peer_outbound_count() as usize).saturating_sub(max_outbound_count); | ||
if excess_outgoing_count > 0 { | ||
let mut addrs = self | ||
.outgoing_connected_peers() | ||
.iter() | ||
.take(excess_outgoing_count) | ||
.map(|x| x.info.addr.clone()) | ||
.collect::<Vec<_>>(); | ||
rm.append(&mut addrs); | ||
} | ||
|
||
// check here to make sure we don't have too many incoming connections | ||
let excess_incoming_count = | ||
(self.peer_inbound_count() as usize).saturating_sub(max_inbound_count); | ||
if excess_incoming_count > 0 { | ||
let mut addrs = self | ||
.connected_peers() | ||
.incoming_connected_peers() | ||
.iter() | ||
.take(excess_count) | ||
.take(excess_incoming_count) | ||
.map(|x| x.info.addr.clone()) | ||
.collect::<Vec<_>>(); | ||
rm.append(&mut addrs); | ||
|
@@ -518,14 +545,9 @@ impl Peers { | |
} | ||
} | ||
|
||
pub fn enough_peers(&self) -> bool { | ||
self.peer_count() >= self.config.peer_min_preferred_count() | ||
} | ||
|
||
/// We have enough peers, both total connected and outbound connected | ||
pub fn healthy_peers_mix(&self) -> bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'd advise changing the name of this now, since "healthy_peers_mix" doesn't really represent what it's checking. Possible options are something like […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The naming here wasn't immediately intuitive to me either. My understanding is that it is a result of needing to maintain a healthy mix of outbound connections for dandelion to allow each peer to select a random outbound relay to send txs along the stem. Without ensuring a "healthy peers mix" apparently some nodes were not having enough slots for outbound connections. With that context it makes a little more sense to me — a healthy peers mix is one […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yea, I agree that's where the name came from, but you've now just changed the functionality, so my claim is the name no longer makes sense. Before, it used to compare inbound and outbound count so it made sense, but it's no longer about having a healthy mix, and is now just about making sure we have a minimum number of outbound. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Agreed, it could make sense to use a more descriptive name about functionality rather than a prescriptive name about health. Updated in d06a8e1 |
||
self.enough_peers() | ||
&& self.peer_outbound_count() >= self.config.peer_min_preferred_count() / 2 | ||
self.peer_outbound_count() >= self.config.peer_min_preferred_outbound_count() | ||
} | ||
|
||
/// Removes those peers that seem to have expired | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -185,7 +185,10 @@ fn monitor_peers( | |
); | ||
|
||
// maintenance step first, clean up p2p server peers | ||
peers.clean_peers(config.peer_max_count() as usize); | ||
peers.clean_peers( | ||
config.peer_max_inbound_count() as usize, | ||
config.peer_max_outbound_count() as usize, | ||
); | ||
|
||
if peers.healthy_peers_mix() { | ||
return; | ||
|
@@ -230,7 +233,7 @@ fn monitor_peers( | |
let new_peers = peers.find_peers( | ||
p2p::State::Healthy, | ||
p2p::Capabilities::UNKNOWN, | ||
config.peer_max_count() as usize, | ||
config.peer_max_outbound_count() as usize, | ||
); | ||
|
||
for p in new_peers.iter().filter(|p| !peers.is_known(p.addr)) { | ||
|
@@ -300,11 +303,14 @@ fn listen_for_addrs( | |
return; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we can remove the […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That makes sense. They don't quite check the same thing, but I think we only want the […]. Removed in fb9b06e unless there is a good reason to keep this as is.
||
} | ||
|
||
// Try to connect to (up to max peers) peer addresses. | ||
// Try to connect to (up to max outbound peers) peer addresses. | ||
// Note: We drained the rx queue earlier to keep it under control. | ||
// Even if there are many addresses to try we will only try a bounded number of them. | ||
let connect_min_interval = 30; | ||
for addr in addrs.into_iter().take(p2p.config.peer_max_count() as usize) { | ||
for addr in addrs | ||
.into_iter() | ||
.take(p2p.config.peer_max_outbound_count() as usize) | ||
{ | ||
// ignore the duplicate connecting to same peer within 30 seconds | ||
let now = Utc::now(); | ||
if let Some(last_connect_time) = connecting_history.get(&addr) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍