diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 724c55a0cda..071218bc571 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,5 +1,14 @@ # 0.25.0 [unreleased] +- The `NetworkConfig` API is now a builder that moves `self`. + [PR 1848](https://github.com/libp2p/rust-libp2p/pull/1848/). + +- New configurable connection limits for established connections and + dedicated connection counters. Removed the connection limit dedicated + to outgoing pending connection _per peer_. Connection limits are now + represented by `u32` intead of `usize` types. + [PR 1848](https://github.com/libp2p/rust-libp2p/pull/1848/). + - Update `multihash`. - Update `multistream-select`. diff --git a/core/src/connection.rs b/core/src/connection.rs index 9908c64589d..f64801b944c 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -32,6 +32,7 @@ pub use listeners::{ListenerId, ListenersStream, ListenersEvent}; pub use manager::ConnectionId; pub use substream::{Substream, SubstreamEndpoint, Close}; pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection}; +pub use pool::{ConnectionLimits, ConnectionCounters}; use crate::muxing::StreamMuxer; use crate::{Multiaddr, PeerId}; @@ -326,9 +327,9 @@ impl<'a> OutgoingInfo<'a> { #[derive(Debug, Clone)] pub struct ConnectionLimit { /// The maximum number of connections. - pub limit: usize, + pub limit: u32, /// The current number of connections. - pub current: usize, + pub current: u32, } impl fmt::Display for ConnectionLimit { diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 6c47704b28e..e40c937cbf6 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -48,8 +48,8 @@ use std::{convert::TryFrom as _, error, fmt, num::NonZeroU32, task::Context, tas pub struct Pool { local_id: PeerId, - /// The configuration of the pool. - limits: PoolLimits, + /// The connection counter(s). + counters: ConnectionCounters, /// The connection manager that handles the connection I/O for both /// established and pending connections. @@ -75,9 +75,8 @@ impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - // TODO: More useful debug impl? f.debug_struct("Pool") - .field("limits", &self.limits) + .field("counters", &self.counters) .finish() } } @@ -183,13 +182,13 @@ where }, PoolEvent::ConnectionEvent { ref connection, ref event } => { f.debug_struct("PoolEvent::ConnectionEvent") - .field("conn_info", connection.info()) + .field("peer", connection.peer_id()) .field("event", event) .finish() }, PoolEvent::AddressChange { ref connection, ref new_endpoint, ref old_endpoint } => { f.debug_struct("PoolEvent::AddressChange") - .field("conn_info", connection.info()) + .field("peer", connection.peer_id()) .field("new_endpoint", new_endpoint) .field("old_endpoint", old_endpoint) .finish() @@ -205,11 +204,11 @@ impl pub fn new( local_id: PeerId, manager_config: ManagerConfig, - limits: PoolLimits + limits: ConnectionLimits ) -> Self { Pool { local_id, - limits, + counters: ConnectionCounters::new(limits), manager: Manager::new(manager_config), established: Default::default(), pending: Default::default(), @@ -217,9 +216,9 @@ impl } } - /// Gets the configured connection limits of the pool. - pub fn limits(&self) -> &PoolLimits { - &self.limits + /// Gets the dedicated connection counters. + pub fn counters(&self) -> &ConnectionCounters { + &self.counters } /// Adds a pending incoming connection to the pool in the form of a @@ -252,8 +251,8 @@ impl TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, { + self.counters.check_max_pending_incoming()?; let endpoint = info.to_connected_point(); - self.limits.check_incoming(|| self.iter_pending_incoming().count())?; Ok(self.add_pending(future, handler, endpoint, None)) } @@ -287,12 +286,7 @@ impl TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, { - self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?; - - if let Some(peer) = &info.peer_id { - self.limits.check_outgoing_per_peer(|| self.num_peer_outgoing(peer))?; - } - + self.counters.check_max_pending_outgoing()?; let endpoint = info.to_connected_point(); Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned())) } @@ -350,6 +344,7 @@ impl }); let id = self.manager.add_pending(future, handler); + self.counters.inc_pending(&endpoint); self.pending.insert(id, (endpoint, peer)); id } @@ -377,13 +372,10 @@ impl TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, { - if let Some(limit) = self.limits.max_established_per_peer { - let current = self.num_peer_established(&i.peer_id); - if limit >= current { - return Err(ConnectionLimit { limit, current }) - } - } + self.counters.check_max_established(&i.endpoint)?; + self.counters.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?; let id = self.manager.add(c, i.clone()); + self.counters.inc_established(&i.endpoint); self.established.entry(i.peer_id.clone()).or_default().insert(id, i.endpoint); Ok(id) } @@ -403,6 +395,7 @@ impl Some(PoolConnection::Pending(PendingConnection { entry, pending: &mut self.pending, + counters: &mut self.counters, })), None => None } @@ -429,6 +422,7 @@ impl Some(PendingConnection { entry, pending: &mut self.pending, + counters: &mut self.counters, }), _ => unreachable!("by consistency of `self.pending` with `self.manager`") } @@ -445,7 +439,7 @@ impl /// Returns the number of connected peers, i.e. those with at least one /// established connection in the pool. - pub fn num_connected(&self) -> usize { + pub fn num_peers(&self) -> usize { self.established.len() } @@ -462,7 +456,7 @@ impl if let Some(conns) = self.established.get(peer) { // Count upwards because we push to / pop from the end. See also `Pool::poll`. let mut num_established = 0; - for &id in conns.keys() { + for (&id, endpoint) in conns.iter() { match self.manager.entry(id) { Some(manager::Entry::Established(e)) => { let connected = e.remove(); @@ -473,6 +467,7 @@ impl }, _ => {} } + self.counters.dec_established(endpoint); } } self.established.remove(peer); @@ -490,30 +485,15 @@ impl } } for id in aborted { - self.pending.remove(&id); + if let Some((endpoint, _)) = self.pending.remove(&id) { + self.counters.dec_pending(&endpoint); + } } } - /// Counts the number of established connections in the pool. - pub fn num_established(&self) -> usize { - self.established.iter().fold(0, |n, (_, conns)| n + conns.len()) - } - - /// Counts the number of pending connections in the pool. - pub fn num_pending(&self) -> usize { - self.iter_pending_info().count() - } - /// Counts the number of established connections to the given peer. - pub fn num_peer_established(&self, peer: &PeerId) -> usize { - self.established.get(peer).map_or(0, |conns| conns.len()) - } - - /// Counts the number of pending outgoing connections to the given peer. - pub fn num_peer_outgoing(&self, peer: &PeerId) -> usize { - self.iter_pending_outgoing() - .filter(|info| info.peer_id == Some(peer)) - .count() + pub fn num_peer_established(&self, peer: &PeerId) -> u32 { + num_peer_established(&self.established, peer) } /// Returns an iterator over all established connections of `peer`. @@ -620,6 +600,7 @@ impl match item { manager::Event::PendingConnectionError { id, error, handler } => { if let Some((endpoint, peer)) = self.pending.remove(&id) { + self.counters.dec_pending(&endpoint); return Poll::Ready(PoolEvent::PendingConnectionError { id, endpoint, @@ -633,7 +614,9 @@ impl manager::Event::ConnectionClosed { id, connected, error } => { let num_established = if let Some(conns) = self.established.get_mut(&connected.peer_id) { - conns.remove(&id); + if let Some(endpoint) = conns.remove(&id) { + self.counters.dec_established(&endpoint); + } u32::try_from(conns.len()).unwrap() } else { 0 @@ -648,11 +631,24 @@ impl manager::Event::ConnectionEstablished { entry } => { let id = entry.id(); if let Some((endpoint, peer)) = self.pending.remove(&id) { - // Check connection limit. - let established = &self.established; - let current = || established.get(&entry.connected().peer_id) - .map_or(0, |conns| conns.len()); - if let Err(e) = self.limits.check_established(current) { + self.counters.dec_pending(&endpoint); + + // Check general established connection limit. + if let Err(e) = self.counters.check_max_established(&endpoint) { + let connected = entry.remove(); + return Poll::Ready(PoolEvent::PendingConnectionError { + id, + endpoint: connected.endpoint, + error: PendingConnectionError::ConnectionLimit(e), + handler: None, + peer, + pool: self + }) + } + + // Check per-peer established connection limit. + let current = num_peer_established(&self.established, &entry.connected().peer_id); + if let Err(e) = self.counters.check_max_established_per_peer(current) { let connected = entry.remove(); return Poll::Ready(PoolEvent::PendingConnectionError { id, @@ -663,6 +659,7 @@ impl pool: self }) } + // Peer ID checks must already have happened. See `add_pending`. if cfg!(debug_assertions) { if self.local_id == entry.connected().peer_id { @@ -674,11 +671,13 @@ impl } } } + // Add the connection to the pool. let peer = entry.connected().peer_id.clone(); let conns = self.established.entry(peer).or_default(); let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap()) .expect("n + 1 is always non-zero; qed"); + self.counters.inc_established(&endpoint); conns.insert(id, endpoint); match self.get(id) { Some(PoolConnection::Established(connection)) => @@ -736,6 +735,7 @@ pub enum PoolConnection<'a, TInEvent> { pub struct PendingConnection<'a, TInEvent> { entry: manager::PendingEntry<'a, TInEvent>, pending: &'a mut FnvHashMap)>, + counters: &'a mut ConnectionCounters, } impl @@ -758,7 +758,8 @@ impl /// Aborts the connection attempt, closing the connection. pub fn abort(self) { - self.pending.remove(&self.entry.id()); + let endpoint = self.pending.remove(&self.entry.id()).expect("`entry` is a pending entry").0; + self.counters.dec_pending(&endpoint); self.entry.abort(); } } @@ -790,24 +791,16 @@ impl EstablishedConnection<'_, TInEvent> { &self.entry.connected().endpoint } - /// Returns connection information obtained from the transport. - pub fn info(&self) -> &PeerId { + /// Returns the identity of the connected peer. + pub fn peer_id(&self) -> &PeerId { &self.entry.connected().peer_id } -} -impl<'a, TInEvent> EstablishedConnection<'a, TInEvent> -{ /// Returns the local connection ID. pub fn id(&self) -> ConnectionId { self.entry.id() } - /// Returns the identity of the connected peer. - pub fn peer_id(&self) -> &PeerId { - self.info() - } - /// (Asynchronously) sends an event to the connection handler. /// /// If the handler is not ready to receive the event, either because @@ -894,62 +887,196 @@ where } } -/// The configurable limits of a connection [`Pool`]. -#[derive(Debug, Clone, Default)] -pub struct PoolLimits { - pub max_outgoing: Option, - pub max_incoming: Option, - pub max_established_per_peer: Option, - pub max_outgoing_per_peer: Option, +/// Network connection information. +#[derive(Debug, Clone)] +pub struct ConnectionCounters { + /// The effective connection limits. + limits: ConnectionLimits, + /// The current number of incoming connections. + pending_incoming: u32, + /// The current number of outgoing connections. + pending_outgoing: u32, + /// The current number of established inbound connections. + established_incoming: u32, + /// The current number of established outbound connections. + established_outgoing: u32, } -impl PoolLimits { - fn check_established(&self, current: F) -> Result<(), ConnectionLimit> - where - F: FnOnce() -> usize - { - Self::check(current, self.max_established_per_peer) +impl ConnectionCounters { + fn new(limits: ConnectionLimits) -> Self { + Self { + limits, + pending_incoming: 0, + pending_outgoing: 0, + established_incoming: 0, + established_outgoing: 0, + } } - fn check_outgoing(&self, current: F) -> Result<(), ConnectionLimit> - where - F: FnOnce() -> usize - { - Self::check(current, self.max_outgoing) + /// The effective connection limits. + pub fn limits(&self) -> &ConnectionLimits { + &self.limits } - fn check_incoming(&self, current: F) -> Result<(), ConnectionLimit> - where - F: FnOnce() -> usize - { - Self::check(current, self.max_incoming) + /// The total number of connections, both pending and established. + pub fn num_connections(&self) -> u32 { + self.num_pending() + self.num_established() } - fn check_outgoing_per_peer(&self, current: F) -> Result<(), ConnectionLimit> - where - F: FnOnce() -> usize - { - Self::check(current, self.max_outgoing_per_peer) + /// The total number of pending connections, both incoming and outgoing. + pub fn num_pending(&self) -> u32 { + self.pending_incoming + self.pending_outgoing } - fn check(current: F, limit: Option) -> Result<(), ConnectionLimit> - where - F: FnOnce() -> usize + /// The number of incoming connections being established. + pub fn num_pending_incoming(&self) -> u32 { + self.pending_incoming + } + + /// The number of outgoing connections being established. + pub fn num_pending_outgoing(&self) -> u32 { + self.pending_outgoing + } + + /// The number of established incoming connections. + pub fn num_established_incoming(&self) -> u32 { + self.established_incoming + } + + /// The number of established outgoing connections. + pub fn num_established_outgoing(&self) -> u32 { + self.established_outgoing + } + + /// The total number of established connections. + pub fn num_established(&self) -> u32 { + self.established_outgoing + self.established_incoming + } + + fn inc_pending(&mut self, endpoint: &ConnectedPoint) { + match endpoint { + ConnectedPoint::Dialer { .. } => { self.pending_outgoing += 1; } + ConnectedPoint::Listener { .. } => { self.pending_incoming += 1; } + } + } + + fn dec_pending(&mut self, endpoint: &ConnectedPoint) { + match endpoint { + ConnectedPoint::Dialer { .. } => { self.pending_outgoing -= 1; } + ConnectedPoint::Listener { .. } => { self.pending_incoming -= 1; } + } + } + + fn inc_established(&mut self, endpoint: &ConnectedPoint) { + match endpoint { + ConnectedPoint::Dialer { .. } => { self.established_outgoing += 1; } + ConnectedPoint::Listener { .. } => { self.established_incoming += 1; } + } + } + + fn dec_established(&mut self, endpoint: &ConnectedPoint) { + match endpoint { + ConnectedPoint::Dialer { .. } => { self.established_outgoing -= 1; } + ConnectedPoint::Listener { .. } => { self.established_incoming -= 1; } + } + } + + fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> { + Self::check(self.pending_outgoing, self.limits.max_pending_outgoing) + } + + fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> { + Self::check(self.pending_incoming, self.limits.max_pending_incoming) + } + + fn check_max_established(&self, endpoint: &ConnectedPoint) + -> Result<(), ConnectionLimit> { + match endpoint { + ConnectedPoint::Dialer { .. } => + Self::check(self.established_outgoing, self.limits.max_established_outgoing), + ConnectedPoint::Listener { .. } => { + Self::check(self.established_incoming, self.limits.max_established_incoming) + } + } + } + + fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> { + Self::check(current, self.limits.max_established_per_peer) + } + + fn check(current: u32, limit: Option) -> Result<(), ConnectionLimit> { if let Some(limit) = limit { - let current = current(); if current >= limit { return Err(ConnectionLimit { limit, current }) } } Ok(()) } + +} + +/// Counts the number of established connections to the given peer. +fn num_peer_established( + established: &FnvHashMap>, + peer: &PeerId +) -> u32 { + established.get(peer).map_or(0, |conns| + u32::try_from(conns.len()) + .expect("Unexpectedly large number of connections for a peer.")) +} + +/// The configurable connection limits. +/// +/// By default no connection limits apply. +#[derive(Debug, Clone, Default)] +pub struct ConnectionLimits { + max_pending_incoming: Option, + max_pending_outgoing: Option, + max_established_incoming: Option, + max_established_outgoing: Option, + max_established_per_peer: Option, +} + +impl ConnectionLimits { + /// Configures the maximum number of concurrently incoming connections being established. + pub fn with_max_pending_incoming(mut self, limit: Option) -> Self { + self.max_pending_incoming = limit; + self + } + + /// Configures the maximum number of concurrently outgoing connections being established. + pub fn with_max_pending_outgoing(mut self, limit: Option) -> Self { + self.max_pending_outgoing = limit; + self + } + + /// Configures the maximum number of concurrent established inbound connections. + pub fn with_max_established_incoming(mut self, limit: Option) -> Self { + self.max_established_incoming = limit; + self + } + + /// Configures the maximum number of concurrent established outbound connections. + pub fn with_max_established_outgoing(mut self, limit: Option) -> Self { + self.max_established_outgoing = limit; + self + } + + /// Configures the maximum number of concurrent established connections per peer, + /// regardless of direction (incoming or outgoing). + pub fn with_max_established_per_peer(mut self, limit: Option) -> Self { + self.max_established_per_peer = limit; + self + } } /// Information about a former established connection to a peer /// that was dropped via [`Pool::disconnect`]. struct Disconnected { + /// The unique identifier of the dropped connection. id: ConnectionId, + /// Information about the dropped connection. connected: Connected, /// The remaining number of established connections /// to the same peer. diff --git a/core/src/network.rs b/core/src/network.rs index b06f61654e9..59444662cf7 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -21,6 +21,7 @@ mod event; pub mod peer; +pub use crate::connection::{ConnectionLimits, ConnectionCounters}; pub use event::{NetworkEvent, IncomingConnection}; pub use peer::Peer; @@ -43,7 +44,7 @@ use crate::{ PendingConnectionError, Substream, manager::ManagerConfig, - pool::{Pool, PoolEvent, PoolLimits}, + pool::{Pool, PoolEvent}, }, muxing::StreamMuxer, transport::{Transport, TransportError}, @@ -134,9 +135,9 @@ where TTrans: Transport + Clone, TMuxer: StreamMuxer, THandler: IntoConnectionHandler + Send + 'static, - THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static, - ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary - ::Error: error::Error + Send + 'static, + THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send, + ::OutboundOpenInfo: Send, + ::Error: error::Error + Send, { /// Creates a new node events stream. pub fn new( @@ -148,7 +149,7 @@ where Network { local_peer_id, listeners: ListenersStream::new(transport), - pool: Pool::new(pool_local_id, config.manager_config, config.pool_limits), + pool: Pool::new(pool_local_id, config.manager_config, config.limits), dialing: Default::default(), } } @@ -244,15 +245,11 @@ where /// Returns information about the state of the `Network`. pub fn info(&self) -> NetworkInfo { - let num_connections_established = self.pool.num_established(); - let num_connections_pending = self.pool.num_pending(); - let num_connections = num_connections_established + num_connections_pending; - let num_peers = self.pool.num_connected(); + let num_peers = self.pool.num_peers(); + let connection_counters = self.pool.counters().clone(); NetworkInfo { num_peers, - num_connections, - num_connections_established, - num_connections_pending, + connection_counters, } } @@ -301,22 +298,6 @@ where self.dialing.keys() } - /// Gets the configured limit on pending incoming connections, - /// i.e. concurrent incoming connection attempts. - pub fn incoming_limit(&self) -> Option { - self.pool.limits().max_incoming - } - - /// The total number of established connections in the `Network`. - pub fn num_connections_established(&self) -> usize { - self.pool.num_established() - } - - /// The total number of pending connections in the `Network`. - pub fn num_connections_pending(&self) -> usize { - self.pool.num_pending() - } - /// Obtains a view of a [`Peer`] with the given ID in the network. pub fn peer(&mut self, peer_id: PeerId) -> Peer<'_, TTrans, TInEvent, TOutEvent, THandler> @@ -615,13 +596,22 @@ where #[derive(Clone, Debug)] pub struct NetworkInfo { /// The total number of connected peers. - pub num_peers: usize, - /// The total number of connections, both established and pending. - pub num_connections: usize, - /// The total number of pending connections, both incoming and outgoing. - pub num_connections_pending: usize, - /// The total number of established connections. - pub num_connections_established: usize, + num_peers: usize, + /// Counters of ongoing network connections. + connection_counters: ConnectionCounters, +} + +impl NetworkInfo { + /// The number of connected peers, i.e. peers with whom at least + /// one established connection exists. + pub fn num_peers(&self) -> usize { + self.num_peers + } + + /// Gets counters for ongoing network connections. + pub fn connection_counters(&self) -> &ConnectionCounters { + &self.connection_counters + } } /// The (optional) configuration for a [`Network`]. @@ -635,17 +625,25 @@ pub struct NetworkConfig { /// one "free" slot per task. Thus the given total `notify_handler_buffer_size` /// exposed for configuration on the `Network` is reduced by one. manager_config: ManagerConfig, - pool_limits: PoolLimits, + /// The effective connection limits. + limits: ConnectionLimits, } impl NetworkConfig { - pub fn set_executor(&mut self, e: Box) -> &mut Self { + /// Configures the executor to use for spawning connection background tasks. + pub fn with_executor(mut self, e: Box) -> Self { self.manager_config.executor = Some(e); self } - pub fn executor(&self) -> Option<&Box> { - self.manager_config.executor.as_ref() + /// Configures the executor to use for spawning connection background tasks, + /// only if no executor has already been configured. + pub fn or_else_with_executor(mut self, f: F) -> Self + where + F: FnOnce() -> Option> + { + self.manager_config.executor = self.manager_config.executor.or_else(f); + self } /// Sets the maximum number of events sent to a connection's background task @@ -655,7 +653,7 @@ impl NetworkConfig { /// When the buffer for a particular connection is full, `notify_handler` will no /// longer be able to deliver events to the associated `ConnectionHandler`, /// thus exerting back-pressure on the connection and peer API. - pub fn set_notify_handler_buffer_size(&mut self, n: NonZeroUsize) -> &mut Self { + pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self { self.manager_config.task_command_buffer_size = n.get() - 1; self } @@ -666,28 +664,14 @@ impl NetworkConfig { /// When the buffer is full, the background tasks of all connections will stall. /// In this way, the consumers of network events exert back-pressure on /// the network connection I/O. - pub fn set_connection_event_buffer_size(&mut self, n: usize) -> &mut Self { + pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self { self.manager_config.task_event_buffer_size = n; self } - pub fn set_incoming_limit(&mut self, n: usize) -> &mut Self { - self.pool_limits.max_incoming = Some(n); - self - } - - pub fn set_outgoing_limit(&mut self, n: usize) -> &mut Self { - self.pool_limits.max_outgoing = Some(n); - self - } - - pub fn set_established_per_peer_limit(&mut self, n: usize) -> &mut Self { - self.pool_limits.max_established_per_peer = Some(n); - self - } - - pub fn set_outgoing_per_peer_limit(&mut self, n: usize) -> &mut Self { - self.pool_limits.max_outgoing_per_peer = Some(n); + /// Sets the connection limits to enforce. + pub fn with_connection_limits(mut self, limits: ConnectionLimits) -> Self { + self.limits = limits; self } } @@ -705,10 +689,9 @@ mod tests { #[test] fn set_executor() { NetworkConfig::default() - .set_executor(Box::new(Dummy)) - .set_executor(Box::new(|f| { + .with_executor(Box::new(Dummy)) + .with_executor(Box::new(|f| { async_std::task::spawn(f); })); } - } diff --git a/core/src/network/peer.rs b/core/src/network/peer.rs index 43aa55b7183..1f0d58ad4f0 100644 --- a/core/src/network/peer.rs +++ b/core/src/network/peer.rs @@ -310,7 +310,7 @@ where } /// The number of established connections to the peer. - pub fn num_connections(&self) -> usize { + pub fn num_connections(&self) -> u32 { self.network.pool.num_peer_established(&self.peer_id) } @@ -448,12 +448,6 @@ where None } - /// The number of ongoing dialing attempts, i.e. pending outgoing connections - /// to this peer. - pub fn num_attempts(&self) -> usize { - self.network.pool.num_peer_outgoing(&self.peer_id) - } - /// Gets an iterator over all dialing (i.e. pending outgoing) connections to the peer. pub fn attempts<'b>(&'b mut self) -> DialingAttemptIter<'b, @@ -672,6 +666,15 @@ impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> /// Obtains the next dialing connection, if any. pub fn next<'b>(&'b mut self) -> Option> { + // If the number of elements reduced, the current `DialingAttempt` has been + // aborted and iteration needs to continue from the previous position to + // account for the removed element. + let end = self.dialing.get(self.peer_id).map_or(0, |conns| conns.len()); + if self.end > end { + self.end = end; + self.pos -= 1; + } + if self.pos == self.end { return None } diff --git a/core/tests/connection_limits.rs b/core/tests/connection_limits.rs new file mode 100644 index 00000000000..4c9c3193066 --- /dev/null +++ b/core/tests/connection_limits.rs @@ -0,0 +1,162 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +mod util; + +use futures::{ready, future::poll_fn}; +use libp2p_core::multiaddr::{multiaddr, Multiaddr}; +use libp2p_core::{ + PeerId, + connection::PendingConnectionError, + network::{NetworkEvent, NetworkConfig, ConnectionLimits}, +}; +use rand::Rng; +use std::task::Poll; +use util::{TestHandler, test_network}; + +#[test] +fn max_outgoing() { + let outgoing_limit = rand::thread_rng().gen_range(1, 10); + + let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit)); + let cfg = NetworkConfig::default().with_connection_limits(limits); + let mut network = test_network(cfg); + + let target = PeerId::random(); + for _ in 0 .. outgoing_limit { + network.peer(target.clone()) + .dial(Multiaddr::empty(), Vec::new(), TestHandler()) + .ok() + .expect("Unexpected connection limit."); + } + + let err = network.peer(target.clone()) + .dial(Multiaddr::empty(), Vec::new(), TestHandler()) + .expect_err("Unexpected dialing success."); + + assert_eq!(err.current, outgoing_limit); + assert_eq!(err.limit, outgoing_limit); + + let info = network.info(); + assert_eq!(info.num_peers(), 0); + assert_eq!(info.connection_counters().num_pending_outgoing(), outgoing_limit); + + // Abort all dialing attempts. + let mut peer = network.peer(target.clone()) + .into_dialing() + .expect("Unexpected peer state"); + + let mut attempts = peer.attempts(); + while let Some(attempt) = attempts.next() { + attempt.abort(); + } + + assert_eq!(network.info().connection_counters().num_pending_outgoing(), 0); +} + +#[test] +fn max_established_incoming() { + let limit = rand::thread_rng().gen_range(1, 10); + + fn config(limit: u32) -> NetworkConfig { + let limits = ConnectionLimits::default().with_max_established_incoming(Some(limit)); + NetworkConfig::default().with_connection_limits(limits) + } + + let mut network1 = test_network(config(limit)); + let mut network2 = test_network(config(limit)); + + let listen_addr = multiaddr![Ip4(std::net::Ipv4Addr::new(127,0,0,1)), Tcp(0u16)]; + let _ = network1.listen_on(listen_addr.clone()).unwrap(); + let (addr_sender, addr_receiver) = futures::channel::oneshot::channel(); + let mut addr_sender = Some(addr_sender); + + // Spawn the listener. + let listener = async_std::task::spawn(poll_fn(move |cx| { + loop { + match ready!(network1.poll(cx)) { + NetworkEvent::NewListenerAddress { listen_addr, .. } => { + addr_sender.take().unwrap().send(listen_addr).unwrap(); + } + NetworkEvent::IncomingConnection { connection, .. } => { + network1.accept(connection, TestHandler()).unwrap(); + } + NetworkEvent::ConnectionEstablished { .. } => {} + NetworkEvent::IncomingConnectionError { + error: PendingConnectionError::ConnectionLimit(err), .. + } => { + assert_eq!(err.limit, limit); + assert_eq!(err.limit, err.current); + let info = network1.info(); + let counters = info.connection_counters(); + assert_eq!(counters.num_established_incoming(), limit); + assert_eq!(counters.num_established(), limit); + return Poll::Ready(()) + } + e => panic!("Unexpected network event: {:?}", e) + } + } + })); + + // Spawn and block on the dialer. + async_std::task::block_on(async move { + let addr = addr_receiver.await.unwrap(); + let mut n = 0; + let _ = network2.dial(&addr, TestHandler()).unwrap(); + let mut expected_closed = None; + poll_fn(|cx| { + loop { + match ready!(network2.poll(cx)) { + NetworkEvent::ConnectionEstablished { connection, .. } => { + n += 1; + if n <= limit { + // Dial again until the limit is exceeded. + let id = network2.dial(&addr, TestHandler()).unwrap(); + if n == limit { + // The the next dialing attempt exceeds the limit, this + // is the connection we expected to get closed. + expected_closed = Some(id); + } + } else { + // This connection exceeds the limit for the listener and + // is expected to close shortly. For the dialer, these connections + // will first appear established before the listener closes them as + // a result of the limit violation. + assert_eq!(Some(connection.id()), expected_closed); + } + } + NetworkEvent::ConnectionClosed { id, .. } => { + assert_eq!(Some(id), expected_closed); + let info = network2.info(); + let counters = info.connection_counters(); + assert_eq!(counters.num_established_outgoing(), limit); + assert_eq!(counters.num_established(), limit); + return Poll::Ready(()) + } + e => panic!("Unexpected network event: {:?}", e) + } + } + }).await + }); + + // Wait for the listener to complete. + async_std::task::block_on(listener); +} + diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index d679775e058..13e532d7016 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -21,51 +21,22 @@ mod util; use futures::prelude::*; -use libp2p_core::identity; -use libp2p_core::multiaddr::{multiaddr, Multiaddr}; +use libp2p_core::multiaddr::multiaddr; use libp2p_core::{ - Network, PeerId, - Transport, connection::PendingConnectionError, - muxing::StreamMuxerBox, network::{NetworkEvent, NetworkConfig}, - transport, - upgrade, }; -use libp2p_noise as noise; -use rand::Rng; use rand::seq::SliceRandom; use std::{io, task::Poll}; -use util::TestHandler; - -type TestNetwork = Network; -type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox)>; - -fn new_network(cfg: NetworkConfig) -> TestNetwork { - let local_key = identity::Keypair::generate_ed25519(); - let local_public_key = local_key.public(); - let noise_keys = noise::Keypair::::new().into_authentic(&local_key).unwrap(); - let transport: TestTransport = libp2p_tcp::TcpConfig::new() - .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(libp2p_mplex::MplexConfig::new()) - .boxed() - .and_then(|(peer, mplex), _| { - // Gracefully close the connection to allow protocol - // negotiation to complete. - util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) - }) - .boxed(); - TestNetwork::new(transport, local_public_key.into(), cfg) -} +use util::{TestHandler, test_network}; #[test] fn deny_incoming_connec() { // Checks whether refusing an incoming connection on a swarm triggers the correct events. - let mut swarm1 = new_network(NetworkConfig::default()); - let mut swarm2 = new_network(NetworkConfig::default()); + let mut swarm1 = test_network(NetworkConfig::default()); + let mut swarm2 = test_network(NetworkConfig::default()); swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); @@ -121,7 +92,7 @@ fn dial_self() { // // The last two can happen in any order. - let mut swarm = new_network(NetworkConfig::default()); + let mut swarm = test_network(NetworkConfig::default()); swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); let (local_address, mut swarm) = async_std::task::block_on( @@ -180,7 +151,7 @@ fn dial_self() { fn dial_self_by_id() { // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first // place. - let mut swarm = new_network(NetworkConfig::default()); + let mut swarm = test_network(NetworkConfig::default()); let peer_id = swarm.local_peer_id().clone(); assert!(swarm.peer(peer_id).into_disconnected().is_none()); } @@ -189,7 +160,7 @@ fn dial_self_by_id() { fn multiple_addresses_err() { // Tries dialing multiple addresses, and makes sure there's one dialing error per address. - let mut swarm = new_network(NetworkConfig::default()); + let mut swarm = test_network(NetworkConfig::default()); let mut addresses = Vec::new(); for _ in 0 .. 3 { @@ -233,44 +204,3 @@ fn multiple_addresses_err() { } })).unwrap(); } - -#[test] -fn connection_limit() { - let outgoing_per_peer_limit = rand::thread_rng().gen_range(1, 10); - let outgoing_limit = 2 * outgoing_per_peer_limit; - - let mut cfg = NetworkConfig::default(); - cfg.set_outgoing_per_peer_limit(outgoing_per_peer_limit); - cfg.set_outgoing_limit(outgoing_limit); - let mut network = new_network(cfg); - - let target = PeerId::random(); - for _ in 0 .. outgoing_per_peer_limit { - network.peer(target.clone()) - .dial(Multiaddr::empty(), Vec::new(), TestHandler()) - .ok() - .expect("Unexpected connection limit."); - } - - let err = network.peer(target) - .dial(Multiaddr::empty(), Vec::new(), TestHandler()) - .expect_err("Unexpected dialing success."); - - assert_eq!(err.current, outgoing_per_peer_limit); - assert_eq!(err.limit, outgoing_per_peer_limit); - - let target2 = PeerId::random(); - for _ in outgoing_per_peer_limit .. outgoing_limit { - network.peer(target2.clone()) - .dial(Multiaddr::empty(), Vec::new(), TestHandler()) - .ok() - .expect("Unexpected connection limit."); - } - - let err = network.peer(target2) - .dial(Multiaddr::empty(), Vec::new(), TestHandler()) - .expect_err("Unexpected dialing success."); - - assert_eq!(err.current, outgoing_limit); - assert_eq!(err.limit, outgoing_limit); -} diff --git a/core/tests/util.rs b/core/tests/util.rs index 306728d618f..c20a2c59305 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -4,16 +4,42 @@ use futures::prelude::*; use libp2p_core::{ Multiaddr, + PeerId, + Transport, connection::{ ConnectionHandler, ConnectionHandlerEvent, Substream, SubstreamEndpoint, }, + identity, muxing::{StreamMuxer, StreamMuxerBox}, + network::{Network, NetworkConfig}, + transport, + upgrade, }; +use libp2p_mplex as mplex; +use libp2p_noise as noise; +use libp2p_tcp as tcp; use std::{io, pin::Pin, task::Context, task::Poll}; +type TestNetwork = Network; +type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox)>; + +/// Creates a new `TestNetwork` with a TCP transport. +pub fn test_network(cfg: NetworkConfig) -> TestNetwork { + let local_key = identity::Keypair::generate_ed25519(); + let local_public_key = local_key.public(); + let noise_keys = noise::Keypair::::new().into_authentic(&local_key).unwrap(); + let transport: TestTransport = tcp::TcpConfig::new() + .upgrade(upgrade::Version::V1) + .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) + .multiplex(mplex::MplexConfig::new()) + .boxed(); + + TestNetwork::new(transport, local_public_key.into(), cfg) +} + pub struct TestHandler(); impl ConnectionHandler for TestHandler { @@ -35,7 +61,7 @@ impl ConnectionHandler for TestHandler { fn poll(&mut self, _: &mut Context<'_>) -> Poll, Self::Error>> { - Poll::Ready(Ok(ConnectionHandlerEvent::Custom(()))) + Poll::Pending } } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 734b3914481..639641e75cf 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,5 +1,8 @@ # 0.25.0 [unreleased] +- Changed parameters for connection limits from `usize` to `u32`. + Connection limits are now configured via `SwarmBuilder::connection_limits()`. + - Update `libp2p-core`. - Expose configurable scores for external addresses, as well as diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index f9009675416..b3e0d4deeaf 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -113,6 +113,7 @@ use libp2p_core::{ transport::{self, TransportError}, muxing::StreamMuxerBox, network::{ + ConnectionLimits, Network, NetworkInfo, NetworkEvent, @@ -987,7 +988,7 @@ where TBehaviour: NetworkBehaviour, /// By default, unless another executor has been configured, /// [`SwarmBuilder::build`] will try to set up a `ThreadPool`. pub fn executor(mut self, e: Box) -> Self { - self.network_config.set_executor(e); + self.network_config = self.network_config.with_executor(e); self } @@ -1001,7 +1002,7 @@ where TBehaviour: NetworkBehaviour, /// be sleeping more often than necessary. Increasing this value increases /// the overall memory usage. pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self { - self.network_config.set_notify_handler_buffer_size(n); + self.network_config = self.network_config.with_notify_handler_buffer_size(n); self } @@ -1029,28 +1030,13 @@ where TBehaviour: NetworkBehaviour, /// event is emitted and the moment when it is received by the /// [`NetworkBehaviour`]. pub fn connection_event_buffer_size(mut self, n: usize) -> Self { - self.network_config.set_connection_event_buffer_size(n); + self.network_config = self.network_config.with_connection_event_buffer_size(n); self } - /// Configures a limit for the number of simultaneous incoming - /// connection attempts. - pub fn incoming_connection_limit(mut self, n: usize) -> Self { - self.network_config.set_incoming_limit(n); - self - } - - /// Configures a limit for the number of simultaneous outgoing - /// connection attempts. - pub fn outgoing_connection_limit(mut self, n: usize) -> Self { - self.network_config.set_outgoing_limit(n); - self - } - - /// Configures a limit for the number of simultaneous - /// established connections per peer. - pub fn peer_connection_limit(mut self, n: usize) -> Self { - self.network_config.set_established_per_peer_limit(n); + /// Configures the connection limits. + pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self { + self.network_config = self.network_config.with_connection_limits(limits); self } @@ -1064,20 +1050,21 @@ where TBehaviour: NetworkBehaviour, .map(|info| info.protocol_name().to_vec()) .collect(); - let mut network_cfg = self.network_config; - // If no executor has been explicitly configured, try to set up a thread pool. - if network_cfg.executor().is_none() { + let network_cfg = self.network_config.or_else_with_executor(|| { match ThreadPoolBuilder::new() .name_prefix("libp2p-swarm-task-") .create() { Ok(tp) => { - network_cfg.set_executor(Box::new(move |f| tp.spawn_ok(f))); + Some(Box::new(move |f| tp.spawn_ok(f))) }, - Err(err) => log::warn!("Failed to create executor thread pool: {:?}", err) + Err(err) => { + log::warn!("Failed to create executor thread pool: {:?}", err); + None + } } - } + }); let network = Network::new(self.transport, self.local_peer_id, network_cfg);