Skip to content
Merged
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# Version ???

- Support for multiple connections per peer and configurable connection limits.
See [PR #1440](https://github.com/libp2p/rust-libp2p/pull/1440),
[PR #1519](https://github.com/libp2p/rust-libp2p/pull/1519) and
[issue #912](https://github.com/libp2p/rust-libp2p/issues/912) for details.

# Version 0.16.2 (2020-02-28)

- Fixed yamux connections not properly closing and being stuck in the `CLOSE_WAIT` state.
Expand Down
5 changes: 4 additions & 1 deletion core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnecti

use crate::muxing::StreamMuxer;
use crate::{Multiaddr, PeerId};
use std::{fmt, pin::Pin, task::Context, task::Poll};
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use std::hash::Hash;
use substream::{Muxing, SubstreamEvent};

Expand Down Expand Up @@ -334,3 +334,6 @@ impl fmt::Display for ConnectionLimit {
write!(f, "{}/{}", self.current, self.limit)
}
}

/// A `ConnectionLimit` can represent an error if it has been exceeded.
impl Error for ConnectionLimit {}
8 changes: 4 additions & 4 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ where
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
if let Some(limit) = self.limits.max_pending_incoming {
if let Some(limit) = self.limits.max_incoming {
let current = self.iter_pending_incoming().count();
if current >= limit {
return Err(ConnectionLimit { limit, current })
Expand Down Expand Up @@ -834,8 +834,8 @@ where
/// The configurable limits of a connection [`Pool`].
#[derive(Debug, Clone, Default)]
pub struct PoolLimits {
pub max_pending_outgoing: Option<usize>,
pub max_pending_incoming: Option<usize>,
pub max_outgoing: Option<usize>,
pub max_incoming: Option<usize>,
pub max_established_per_peer: Option<usize>,
}

Expand All @@ -851,7 +851,7 @@ impl PoolLimits {
where
F: FnOnce() -> usize
{
Self::check(current, self.max_pending_outgoing)
Self::check(current, self.max_outgoing)
}

fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
Expand Down
93 changes: 46 additions & 47 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ where
/// [`Connection`](crate::connection::Connection) upon success and the
/// connection ID is returned.
pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
-> Result<ConnectionId, DialError<TTrans::Error>>
-> Result<ConnectionId, ConnectionLimit>
where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Error: Send + 'static,
Expand All @@ -232,10 +232,17 @@ where
TConnInfo: Send + 'static,
TPeerId: Send + 'static,
{
let future = self.transport().clone().dial(address.clone())?
.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
let info = OutgoingInfo { address, peer_id: None };
self.pool.add_outgoing(future, handler, info).map_err(DialError::MaxPending)
match self.transport().clone().dial(address.clone()) {
Ok(f) => {
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
self.pool.add_outgoing(f, handler, info)
}
Err(err) => {
let f = future::err(PendingConnectionError::Transport(err));
self.pool.add_outgoing(f, handler, info)
}
}
}

/// Returns information about the state of the `Network`.
Expand Down Expand Up @@ -275,6 +282,22 @@ where
self.pool.iter_connected()
}

/// Checks whether the network has an established connection to a peer.
pub fn is_connected(&self, peer: &TPeerId) -> bool {
self.pool.is_connected(peer)
}

/// Checks whether the network has an ongoing dialing attempt to a peer.
pub fn is_dialing(&self, peer: &TPeerId) -> bool {
self.dialing.contains_key(peer)
}

/// Checks whether the network has neither an ongoing dialing attempt,
/// nor an established connection to a peer.
pub fn is_disconnected(&self, peer: &TPeerId) -> bool {
!self.is_connected(peer) && !self.is_dialing(peer)
}

/// Returns a list of all the peers to whom a new outgoing connection
/// is currently being established.
pub fn dialing_peers(&self) -> impl Iterator<Item = &TPeerId> {
Expand All @@ -284,7 +307,7 @@ where
/// Gets the configured limit on pending incoming connections,
/// i.e. concurrent incoming connection attempts.
pub fn incoming_limit(&self) -> Option<usize> {
self.pool.limits().max_pending_incoming
self.pool.limits().max_incoming
}

/// The total number of established connections in the `Network`.
Expand Down Expand Up @@ -380,8 +403,9 @@ where
}
event
}
Poll::Ready(PoolEvent::ConnectionError { connected, error, num_established, .. }) => {
Poll::Ready(PoolEvent::ConnectionError { id, connected, error, num_established, .. }) => {
NetworkEvent::ConnectionError {
id,
connected,
error,
num_established,
Expand Down Expand Up @@ -557,43 +581,6 @@ pub struct NetworkInfo {
pub num_connections_established: usize,
}

/// The possible errors of [`Network::dial`].
#[derive(Debug)]
pub enum DialError<T> {
/// The configured limit of pending outgoing connections has been reached.
MaxPending(ConnectionLimit),
/// A transport error occurred when creating the connection.
Transport(TransportError<T>),
}

impl<T> fmt::Display for DialError<T>
where T: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DialError::MaxPending(limit) => write!(f, "Dial error (pending limit): {}", limit.current),
DialError::Transport(err) => write!(f, "Dial error (transport): {}", err),
}
}
}

impl<T> std::error::Error for DialError<T>
where T: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
DialError::MaxPending(_) => None,
DialError::Transport(e) => Some(e),
}
}
}

impl<T> From<TransportError<T>> for DialError<T> {
fn from(e: TransportError<T>) -> DialError<T> {
DialError::Transport(e)
}
}

/// The (optional) configuration for a [`Network`].
///
/// The default configuration specifies no dedicated task executor
Expand All @@ -610,17 +597,29 @@ impl NetworkConfig {
self
}

/// Shortcut for calling `executor` with an object that calls the given closure.
pub fn set_executor_fn(mut self, f: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + 'static) -> Self {
struct SpawnImpl<F>(F);
impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
(self.0)(f)
}
}
self.set_executor(Box::new(SpawnImpl(f)));
self
}

pub fn executor(&self) -> Option<&Box<dyn Executor + Send>> {
self.executor.as_ref()
}

pub fn set_pending_incoming_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_pending_incoming = Some(n);
pub fn set_incoming_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_incoming = Some(n);
self
}

pub fn set_pending_outgoing_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_pending_outgoing = Some(n);
pub fn set_outgoing_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_outgoing = Some(n);
self
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/network/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ where
///
/// The connection is closed as a result of the error.
ConnectionError {
/// The ID of the connection that encountered an error.
id: ConnectionId,
/// Information about the connection that encountered the error.
connected: Connected<TConnInfo>,
/// The error that occurred.
Expand Down
80 changes: 74 additions & 6 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,28 +174,66 @@ where
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + 'static,
{
/// Checks whether the peer is currently connected.
///
/// Returns `true` iff [`Peer::into_connected`] returns `Some`.
pub fn is_connected(&self) -> bool {
match self {
Peer::Connected(..) => true,
Peer::Dialing(peer) => peer.is_connected(),
Peer::Disconnected(..) => false,
Peer::Local => false
}
}

/// Checks whether the peer is currently being dialed.
///
/// Returns `true` iff [`Peer::into_dialing`] returns `Some`.
pub fn is_dialing(&self) -> bool {
match self {
Peer::Dialing(_) => true,
Peer::Connected(peer) => peer.is_dialing(),
Peer::Disconnected(..) => false,
Peer::Local => false
}
}

/// Checks whether the peer is currently disconnected.
///
/// Returns `true` iff [`Peer::into_disconnected`] returns `Some`.
pub fn is_disconnected(&self) -> bool {
match self {
Peer::Disconnected(..) => true,
_ => false
}
}

/// If we are connected, returns the `ConnectedPeer`.
/// Converts the peer into a `ConnectedPeer`, if there an established connection exists.
pub fn into_connected(self) -> Option<
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
match self {
Peer::Connected(peer) => Some(peer),
_ => None,
Peer::Dialing(peer) => peer.into_connected(),
Peer::Disconnected(..) => None,
Peer::Local => None,
}
}

/// If a connection is pending, returns the `DialingPeer`.
/// Converts the peer into a `DialingPeer`, if a dialing attempt exists.
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
match self {
Peer::Dialing(peer) => Some(peer),
_ => None,
Peer::Connected(peer) => peer.into_dialing(),
Peer::Disconnected(..) => None,
Peer::Local => None
}
}

/// If we are not connected, returns the `DisconnectedPeer`.
/// Converts the peer into a `DisconnectedPeer`, if neither an established connection
/// nor a dialing attempt exists.
pub fn into_disconnected(self) -> Option<
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
Expand Down Expand Up @@ -225,6 +263,10 @@ where
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
{
pub fn id(&self) -> &TPeerId {
&self.peer_id
}

/// Attempts to establish a new connection to this peer using the given addresses,
/// if there is currently no ongoing dialing attempt.
///
Expand Down Expand Up @@ -294,7 +336,7 @@ where
self.network.dialing.contains_key(&self.peer_id)
}

/// Turns this peer into a [`DialingPeer`], if there is an ongoing
/// Converts this peer into a [`DialingPeer`], if there is an ongoing
/// dialing attempt, `None` otherwise.
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
Expand Down Expand Up @@ -373,12 +415,34 @@ where
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
{
pub fn id(&self) -> &TPeerId {
&self.peer_id
}

/// Disconnects from this peer, closing all pending connections.
pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
self.network.disconnect(&self.peer_id);
DisconnectedPeer { network: self.network, peer_id: self.peer_id }
}

/// Checks whether there is an established connection to the peer.
///
/// Returns `true` iff [`DialingPeer::into_connected`] returns `Some`.
pub fn is_connected(&self) -> bool {
self.network.pool.is_connected(&self.peer_id)
}

/// Converts the peer into a `ConnectedPeer`, if an established connection exists.
pub fn into_connected(self)
-> Option<ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>>
{
if self.is_connected() {
Some(ConnectedPeer { peer_id: self.peer_id, network: self.network })
} else {
None
}
}

/// Obtains the connection that is currently being established.
pub fn connection<'b>(&'b mut self) -> DialingConnection<'b, TInEvent, TConnInfo, TPeerId> {
let attempt = match self.network.dialing.entry(self.peer_id.clone()) {
Expand Down Expand Up @@ -452,6 +516,10 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
pub fn id(&self) -> &TPeerId {
&self.peer_id
}

/// Attempts to connect to this peer using the given addresses.
pub fn connect<TIter>(self, first: Multiaddr, rest: TIter, handler: THandler)
-> Result<DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
Expand Down
Loading