Skip to content
Merged
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ impl Litep2p {
}

// enable tcp transport if the config exists
if let Some(config) = litep2p_config.tcp.take() {
if let Some(mut config) = litep2p_config.tcp.take() {
config.max_parallel_dials = litep2p_config.max_parallel_dials;
let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
let (transport, transport_listen_addresses) =
<TcpTransport as TransportBuilder>::new(handle, config, resolver.clone())?;
Expand Down Expand Up @@ -369,7 +370,8 @@ impl Litep2p {

// enable websocket transport if the config exists
#[cfg(feature = "websocket")]
if let Some(config) = litep2p_config.websocket.take() {
if let Some(mut config) = litep2p_config.websocket.take() {
config.max_parallel_dials = litep2p_config.max_parallel_dials;
let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor));
let (transport, transport_listen_addresses) =
<WebSocketTransport as TransportBuilder>::new(handle, config, resolver)?;
Expand Down
8 changes: 7 additions & 1 deletion src/transport/tcp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use crate::{
crypto::noise::{MAX_READ_AHEAD_FACTOR, MAX_WRITE_BUFFER_SIZE},
transport::{CONNECTION_OPEN_TIMEOUT, SUBSTREAM_OPEN_TIMEOUT},
transport::{CONNECTION_OPEN_TIMEOUT, MAX_PARALLEL_DIALS, SUBSTREAM_OPEN_TIMEOUT},
};

/// TCP transport configuration.
Expand Down Expand Up @@ -80,6 +80,11 @@ pub struct Config {
/// How long should litep2p wait for a substream to be opened before considering
/// the substream rejected.
pub substream_open_timeout: std::time::Duration,

/// Maximum number of parallel dial attempts for a single peer.
///
/// Defaults to `8`.
pub max_parallel_dials: usize,
}

impl Default for Config {
Expand All @@ -96,6 +101,7 @@ impl Default for Config {
noise_write_buffer_size: MAX_WRITE_BUFFER_SIZE,
connection_open_timeout: CONNECTION_OPEN_TIMEOUT,
substream_open_timeout: SUBSTREAM_OPEN_TIMEOUT,
max_parallel_dials: MAX_PARALLEL_DIALS,
}
}
}
146 changes: 90 additions & 56 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{

use futures::{
future::BoxFuture,
stream::{AbortHandle, FuturesUnordered, Stream, StreamExt},
stream::{AbortHandle, Stream, StreamExt},
TryFutureExt,
};
use hickory_resolver::TokioResolver;
Expand Down Expand Up @@ -434,74 +434,108 @@ impl Transport for TcpTransport {
addresses: Vec<Multiaddr>,
) -> crate::Result<()> {
let num_addresses = addresses.len();
let mut futures: FuturesUnordered<_> = addresses
.into_iter()
.map(|address| {
let yamux_config = self.config.yamux_config.clone();
let max_read_ahead_factor = self.config.noise_read_ahead_frame_count;
let max_write_buffer_size = self.config.noise_write_buffer_size;
let connection_open_timeout = self.config.connection_open_timeout;
let substream_open_timeout = self.config.substream_open_timeout;
let dial_addresses = self.dial_addresses.clone();
let keypair = self.context.keypair.clone();
let nodelay = self.config.nodelay;
let resolver = self.resolver.clone();

async move {
let (address, stream) = TcpTransport::dial_peer(
address.clone(),
dial_addresses,
connection_open_timeout,
nodelay,
resolver,
)
.await
.map_err(|error| (address, error))?;

let open_address = address.clone();
let (socket_address, peer) = TcpAddress::multiaddr_to_socket_address(&address)
.map_err(|error| (address, error.into()))?;
let yamux_config = self.config.yamux_config.clone();
let max_read_ahead_factor = self.config.noise_read_ahead_frame_count;
let max_write_buffer_size = self.config.noise_write_buffer_size;
let connection_open_timeout = self.config.connection_open_timeout;
let substream_open_timeout = self.config.substream_open_timeout;
let max_parallel_dials = self.config.max_parallel_dials;
let dial_addresses = self.dial_addresses.clone();
let keypair = self.context.keypair.clone();
let nodelay = self.config.nodelay;
let resolver = self.resolver.clone();

TcpConnection::open_connection(
connection_id,
keypair,
stream,
socket_address,
peer,
yamux_config,
max_read_ahead_factor,
max_write_buffer_size,
connection_open_timeout,
substream_open_timeout,
)
.await
.map_err(|error| (open_address, error.into()))
}
})
.collect();
let futures = futures::stream::iter(addresses.into_iter().map(move |address| {
let yamux_config = yamux_config.clone();
let dial_addresses = dial_addresses.clone();
let keypair = keypair.clone();
let resolver = resolver.clone();

async move {
let (address, stream) = TcpTransport::dial_peer(
address.clone(),
dial_addresses,
connection_open_timeout,
nodelay,
resolver,
)
.await
.map_err(|error| (address, error))?;

let open_address = address.clone();
let (socket_address, peer) = TcpAddress::multiaddr_to_socket_address(&address)
.map_err(|error| (address, error.into()))?;

TcpConnection::open_connection(
connection_id,
keypair,
stream,
socket_address,
peer,
yamux_config,
max_read_ahead_factor,
max_write_buffer_size,
connection_open_timeout,
substream_open_timeout,
)
.await
.map_err(|error| (open_address, error.into()))
}
}))
.buffer_unordered(max_parallel_dials);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm do we not limit the max adresses already here?


// Future that will resolve to the first successful connection.
let future = async move {
let mut errors = Vec::with_capacity(num_addresses);
while let Some(result) = futures.next().await {
match result {
Ok(negotiated) => return RawConnectionResult::Connected { negotiated, errors },
Err(error) => {
// Deadline for the overall dial attempt, including all retries. This is to prevent
// retry attempts from indefinitely delaying the dial result.
let deadline = tokio::time::sleep(2 * connection_open_timeout);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this a constant?


tokio::pin!(deadline);
tokio::pin!(futures);

loop {
tokio::select! {
result = futures.next() => {
match result {
Some(Ok(negotiated)) => {
return RawConnectionResult::Connected {
negotiated,
errors,
};
}
Some(Err(error)) => {
tracing::debug!(
target: LOG_TARGET,
?connection_id,
?error,
"failed to open connection",
);
errors.push(error);
}
None => {
return RawConnectionResult::Failed {
connection_id,
errors,
};
}
}
}
_ = &mut deadline => {
tracing::debug!(
target: LOG_TARGET,
?connection_id,
?error,
"failed to open connection",
?connection_open_timeout,
"overall dial timeout exceeded",
);
errors.push(error)
return RawConnectionResult::Failed {
connection_id,
errors,
};
}
}
}

RawConnectionResult::Failed {
connection_id,
errors,
}
};

let (fut, handle) = futures::future::abortable(future);
Expand Down
8 changes: 7 additions & 1 deletion src/transport/websocket/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use crate::{
crypto::noise::{MAX_READ_AHEAD_FACTOR, MAX_WRITE_BUFFER_SIZE},
transport::{CONNECTION_OPEN_TIMEOUT, SUBSTREAM_OPEN_TIMEOUT},
transport::{CONNECTION_OPEN_TIMEOUT, MAX_PARALLEL_DIALS, SUBSTREAM_OPEN_TIMEOUT},
};

/// WebSocket transport configuration.
Expand Down Expand Up @@ -80,6 +80,11 @@ pub struct Config {
/// How long should litep2p wait for a substream to be opened before considering
/// the substream rejected.
pub substream_open_timeout: std::time::Duration,

/// Maximum number of parallel dial attempts for a single peer.
///
/// Defaults to `8`.
pub max_parallel_dials: usize,
}

impl Default for Config {
Expand All @@ -96,6 +101,7 @@ impl Default for Config {
noise_write_buffer_size: MAX_WRITE_BUFFER_SIZE,
connection_open_timeout: CONNECTION_OPEN_TIMEOUT,
substream_open_timeout: SUBSTREAM_OPEN_TIMEOUT,
max_parallel_dials: MAX_PARALLEL_DIALS,
}
}
}
Loading
Loading