From 12f000b32ced189ca0e0bbb2e4e8247955a035e2 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 19 Feb 2026 16:03:44 +0000 Subject: [PATCH 01/11] tcp: Support address dialing with a concurrency cap of 8 Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 95 +++++++++++++++++++++------------------- 1 file changed, 50 insertions(+), 45 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 242c39ab6..1f4f9e0f8 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -30,7 +30,7 @@ use crate::{ config::Config, connection::{NegotiatedConnection, TcpConnection}, }, - Transport, TransportBuilder, TransportEvent, + Transport, TransportBuilder, TransportEvent, MAX_PARALLEL_DIALS, }, types::ConnectionId, utils::futures_stream::FuturesStream, @@ -38,7 +38,7 @@ use crate::{ use futures::{ future::BoxFuture, - stream::{AbortHandle, FuturesUnordered, Stream, StreamExt}, + stream::{AbortHandle, Stream, StreamExt}, TryFutureExt, }; use hickory_resolver::TokioResolver; @@ -434,54 +434,59 @@ impl Transport for TcpTransport { addresses: Vec, ) -> crate::Result<()> { let num_addresses = addresses.len(); - let mut futures: FuturesUnordered<_> = addresses - .into_iter() - .map(|address| { - let yamux_config = self.config.yamux_config.clone(); - let max_read_ahead_factor = self.config.noise_read_ahead_frame_count; - let max_write_buffer_size = self.config.noise_write_buffer_size; - let connection_open_timeout = self.config.connection_open_timeout; - let substream_open_timeout = self.config.substream_open_timeout; - let dial_addresses = self.dial_addresses.clone(); - let keypair = self.context.keypair.clone(); - let nodelay = self.config.nodelay; - let resolver = self.resolver.clone(); - - async move { - let (address, stream) = TcpTransport::dial_peer( - address.clone(), - dial_addresses, - connection_open_timeout, - nodelay, - resolver, - ) - .await - .map_err(|error| (address, error))?; - let open_address = address.clone(); - let (socket_address, peer) = TcpAddress::multiaddr_to_socket_address(&address) - .map_err(|error| (address, error.into()))?; + let yamux_config = self.config.yamux_config.clone(); + let max_read_ahead_factor = self.config.noise_read_ahead_frame_count; + let max_write_buffer_size = self.config.noise_write_buffer_size; + let connection_open_timeout = self.config.connection_open_timeout; + let substream_open_timeout = self.config.substream_open_timeout; + let dial_addresses = self.dial_addresses.clone(); + let keypair = self.context.keypair.clone(); + let nodelay = self.config.nodelay; + let resolver = self.resolver.clone(); + + let futures = futures::stream::iter(addresses.into_iter().map(move |address| { + let yamux_config = yamux_config.clone(); + let dial_addresses = dial_addresses.clone(); + let keypair = keypair.clone(); + let resolver = resolver.clone(); + + async move { + let (address, stream) = TcpTransport::dial_peer( + address.clone(), + dial_addresses, + connection_open_timeout, + nodelay, + resolver, + ) + .await + .map_err(|error| (address, error))?; - TcpConnection::open_connection( - connection_id, - keypair, - stream, - socket_address, - peer, - yamux_config, - max_read_ahead_factor, - max_write_buffer_size, - connection_open_timeout, - substream_open_timeout, - ) - .await - .map_err(|error| (open_address, error.into())) - } - }) - .collect(); + let open_address = address.clone(); + let (socket_address, peer) = TcpAddress::multiaddr_to_socket_address(&address) + .map_err(|error| (address, error.into()))?; + + TcpConnection::open_connection( + connection_id, + keypair, + stream, + socket_address, + peer, + yamux_config, + max_read_ahead_factor, + max_write_buffer_size, + connection_open_timeout, + substream_open_timeout, + ) + .await + .map_err(|error| (open_address, error.into())) + } + })) + .buffer_unordered(MAX_PARALLEL_DIALS); // Future that will resolve to the first successful connection. let future = async move { + futures::pin_mut!(futures); let mut errors = Vec::with_capacity(num_addresses); while let Some(result) = futures.next().await { match result { From b73a86638694ba0bc53595ade839f4adfe7008da Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 19 Feb 2026 16:11:54 +0000 Subject: [PATCH 02/11] tcp: Bound total dialing time Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 58 +++++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 13 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 1f4f9e0f8..0ef0d8e72 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -486,27 +486,59 @@ impl Transport for TcpTransport { // Future that will resolve to the first successful connection. let future = async move { - futures::pin_mut!(futures); let mut errors = Vec::with_capacity(num_addresses); - while let Some(result) = futures.next().await { - match result { - Ok(negotiated) => return RawConnectionResult::Connected { negotiated, errors }, - Err(error) => { + // Deadline for the overall dial attempt, including all retries. This is to prevent + // retry attempts from indefinitely delaying the dial result. + // + // The 2x timeout is needed by TcpTransport::dial_peer and + // TcpConnection::open_connection, which both have their own timeouts and run + // sequentially. + let deadline = tokio::time::sleep(2 * connection_open_timeout); + + tokio::pin!(deadline); + tokio::pin!(futures); + + loop { + tokio::select! { + result = futures.next() => { + match result { + Some(Ok(negotiated)) => { + return RawConnectionResult::Connected { + negotiated, + errors, + }; + } + Some(Err(error)) => { + tracing::debug!( + target: LOG_TARGET, + ?connection_id, + ?error, + "failed to open connection", + ); + errors.push(error); + } + None => { + return RawConnectionResult::Failed { + connection_id, + errors, + }; + } + } + } + _ = &mut deadline => { tracing::debug!( target: LOG_TARGET, ?connection_id, - ?error, - "failed to open connection", + ?connection_open_timeout, + "overall dial timeout exceeded", ); - errors.push(error) + return RawConnectionResult::Failed { + connection_id, + errors, + }; } } } - - RawConnectionResult::Failed { - connection_id, - errors, - } }; let (fut, handle) = futures::future::abortable(future); From 1fd872c12c4ac05ece9c499e77a9313c81c0813e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 19 Feb 2026 16:15:14 +0000 Subject: [PATCH 03/11] websocket: Apply changes to the websocket transport Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 4 - src/transport/websocket/mod.rs | 153 ++++++++++++++++++++------------- 2 files changed, 92 insertions(+), 65 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 0ef0d8e72..cb0c07237 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -489,10 +489,6 @@ impl Transport for TcpTransport { let mut errors = Vec::with_capacity(num_addresses); // Deadline for the overall dial attempt, including all retries. This is to prevent // retry attempts from indefinitely delaying the dial result. - // - // The 2x timeout is needed by TcpTransport::dial_peer and - // TcpConnection::open_connection, which both have their own timeouts and run - // sequentially. let deadline = tokio::time::sleep(2 * connection_open_timeout); tokio::pin!(deadline); diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 80c636882..e89e10512 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -29,18 +29,14 @@ use crate::{ config::Config, connection::{NegotiatedConnection, WebSocketConnection}, }, - Transport, TransportBuilder, TransportEvent, + Transport, TransportBuilder, TransportEvent, MAX_PARALLEL_DIALS, }, types::ConnectionId, utils::futures_stream::FuturesStream, DialError, PeerId, }; -use futures::{ - future::BoxFuture, - stream::{AbortHandle, FuturesUnordered}, - Stream, StreamExt, TryFutureExt, -}; +use futures::{future::BoxFuture, stream::AbortHandle, Stream, StreamExt, TryFutureExt}; use hickory_resolver::TokioResolver; use multiaddr::{Multiaddr, Protocol}; use socket2::{Domain, Socket, Type}; @@ -479,74 +475,109 @@ impl Transport for WebSocketTransport { ) -> crate::Result<()> { let num_addresses = addresses.len(); - let mut futures: FuturesUnordered<_> = addresses - .into_iter() - .map(|address| { - let yamux_config = self.config.yamux_config.clone(); - let keypair = self.context.keypair.clone(); - let connection_open_timeout = self.config.connection_open_timeout; - let max_read_ahead_factor = self.config.noise_read_ahead_frame_count; - let max_write_buffer_size = self.config.noise_write_buffer_size; - let substream_open_timeout = self.config.substream_open_timeout; - let dial_addresses = self.dial_addresses.clone(); - let nodelay = self.config.nodelay; - let resolver = self.resolver.clone(); - - async move { - let (address, stream) = WebSocketTransport::dial_peer( - address.clone(), - dial_addresses, - connection_open_timeout, - nodelay, - resolver, - ) - .await - .map_err(|error| (address, error))?; + let yamux_config = self.config.yamux_config.clone(); + let keypair = self.context.keypair.clone(); + let connection_open_timeout = self.config.connection_open_timeout; + let max_read_ahead_factor = self.config.noise_read_ahead_frame_count; + let max_write_buffer_size = self.config.noise_write_buffer_size; + let substream_open_timeout = self.config.substream_open_timeout; + let dial_addresses = self.dial_addresses.clone(); + let nodelay = self.config.nodelay; + let resolver = self.resolver.clone(); - let open_address = address.clone(); - let (ws_address, peer) = Self::multiaddr_into_url(address.clone()) - .map_err(|error| (address.clone(), error.into()))?; + let futures = futures::stream::iter(addresses.into_iter().map(move |address| { + let yamux_config = yamux_config.clone(); + let keypair = keypair.clone(); + let dial_addresses = dial_addresses.clone(); + let resolver = resolver.clone(); + + async move { + let (address, stream) = WebSocketTransport::dial_peer( + address.clone(), + dial_addresses, + connection_open_timeout, + nodelay, + resolver, + ) + .await + .map_err(|error| (address, error))?; - WebSocketConnection::open_connection( - connection_id, - keypair, - stream, - address, - peer, - ws_address, - yamux_config, - max_read_ahead_factor, - max_write_buffer_size, - substream_open_timeout, - ) - .await - .map_err(|error| (open_address, error.into())) - } - }) - .collect(); + let open_address = address.clone(); + let (ws_address, peer) = Self::multiaddr_into_url(address.clone()) + .map_err(|error| (address.clone(), error.into()))?; + + WebSocketConnection::open_connection( + connection_id, + keypair, + stream, + address, + peer, + ws_address, + yamux_config, + max_read_ahead_factor, + max_write_buffer_size, + substream_open_timeout, + ) + .await + .map_err(|error| (open_address, error.into())) + } + })) + .buffer_unordered(MAX_PARALLEL_DIALS); // Future that will resolve to the first successful connection. + // + // The overall deadline caps the total time spent dialing across all addresses, + // preventing unbounded dialing when many addresses are provided. let future = async move { let mut errors = Vec::with_capacity(num_addresses); - while let Some(result) = futures.next().await { - match result { - Ok(negotiated) => return RawConnectionResult::Connected { negotiated, errors }, - Err(error) => { + // Deadline for the overall dial attempt, including all retries. This is to prevent + // retry attempts from indefinitely delaying the dial result. + let deadline = tokio::time::sleep(2 * connection_open_timeout); + + tokio::pin!(deadline); + tokio::pin!(futures); + + loop { + tokio::select! { + result = futures.next() => { + match result { + Some(Ok(negotiated)) => { + return RawConnectionResult::Connected { + negotiated, + errors, + }; + } + Some(Err(error)) => { + tracing::debug!( + target: LOG_TARGET, + ?connection_id, + ?error, + "failed to open connection", + ); + errors.push(error); + } + None => { + return RawConnectionResult::Failed { + connection_id, + errors, + }; + } + } + } + _ = &mut deadline => { tracing::debug!( target: LOG_TARGET, ?connection_id, - ?error, - "failed to open connection", + ?connection_open_timeout, + "overall dial timeout exceeded", ); - errors.push(error) + return RawConnectionResult::Failed { + connection_id, + errors, + }; } } } - - RawConnectionResult::Failed { - connection_id, - errors, - } }; let (fut, handle) = futures::future::abortable(future); From 7e9261467d29b47b32eceacc47c415a03857507a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 19 Feb 2026 16:29:00 +0000 Subject: [PATCH 04/11] config: Add max parallel dials to config and default to litep2p config Signed-off-by: Alexandru Vasile --- src/lib.rs | 6 ++++-- src/transport/tcp/config.rs | 8 +++++++- src/transport/tcp/mod.rs | 5 +++-- src/transport/websocket/config.rs | 8 +++++++- src/transport/websocket/mod.rs | 5 +++-- 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8d784dac5..3a6401165 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -324,7 +324,8 @@ impl Litep2p { } // enable tcp transport if the config exists - if let Some(config) = litep2p_config.tcp.take() { + if let Some(mut config) = litep2p_config.tcp.take() { + config.max_parallel_dials = litep2p_config.max_parallel_dials; let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor)); let (transport, transport_listen_addresses) = ::new(handle, config, resolver.clone())?; @@ -369,7 +370,8 @@ impl Litep2p { // enable websocket transport if the config exists #[cfg(feature = "websocket")] - if let Some(config) = litep2p_config.websocket.take() { + if let Some(mut config) = litep2p_config.websocket.take() { + config.max_parallel_dials = litep2p_config.max_parallel_dials; let handle = transport_manager.transport_handle(Arc::clone(&litep2p_config.executor)); let (transport, transport_listen_addresses) = ::new(handle, config, resolver)?; diff --git a/src/transport/tcp/config.rs b/src/transport/tcp/config.rs index cc7e7f52b..119a375cb 100644 --- a/src/transport/tcp/config.rs +++ b/src/transport/tcp/config.rs @@ -22,7 +22,7 @@ use crate::{ crypto::noise::{MAX_READ_AHEAD_FACTOR, MAX_WRITE_BUFFER_SIZE}, - transport::{CONNECTION_OPEN_TIMEOUT, SUBSTREAM_OPEN_TIMEOUT}, + transport::{CONNECTION_OPEN_TIMEOUT, MAX_PARALLEL_DIALS, SUBSTREAM_OPEN_TIMEOUT}, }; /// TCP transport configuration. @@ -80,6 +80,11 @@ pub struct Config { /// How long should litep2p wait for a substream to be opened before considering /// the substream rejected. pub substream_open_timeout: std::time::Duration, + + /// Maximum number of parallel dial attempts for a single peer. + /// + /// Defaults to `8`. + pub max_parallel_dials: usize, } impl Default for Config { @@ -96,6 +101,7 @@ impl Default for Config { noise_write_buffer_size: MAX_WRITE_BUFFER_SIZE, connection_open_timeout: CONNECTION_OPEN_TIMEOUT, substream_open_timeout: SUBSTREAM_OPEN_TIMEOUT, + max_parallel_dials: MAX_PARALLEL_DIALS, } } } diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index cb0c07237..57bee6217 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -30,7 +30,7 @@ use crate::{ config::Config, connection::{NegotiatedConnection, TcpConnection}, }, - Transport, TransportBuilder, TransportEvent, MAX_PARALLEL_DIALS, + Transport, TransportBuilder, TransportEvent, }, types::ConnectionId, utils::futures_stream::FuturesStream, @@ -440,6 +440,7 @@ impl Transport for TcpTransport { let max_write_buffer_size = self.config.noise_write_buffer_size; let connection_open_timeout = self.config.connection_open_timeout; let substream_open_timeout = self.config.substream_open_timeout; + let max_parallel_dials = self.config.max_parallel_dials; let dial_addresses = self.dial_addresses.clone(); let keypair = self.context.keypair.clone(); let nodelay = self.config.nodelay; @@ -482,7 +483,7 @@ impl Transport for TcpTransport { .map_err(|error| (open_address, error.into())) } })) - .buffer_unordered(MAX_PARALLEL_DIALS); + .buffer_unordered(max_parallel_dials); // Future that will resolve to the first successful connection. let future = async move { diff --git a/src/transport/websocket/config.rs b/src/transport/websocket/config.rs index 6efe225d6..b530ce7bf 100644 --- a/src/transport/websocket/config.rs +++ b/src/transport/websocket/config.rs @@ -22,7 +22,7 @@ use crate::{ crypto::noise::{MAX_READ_AHEAD_FACTOR, MAX_WRITE_BUFFER_SIZE}, - transport::{CONNECTION_OPEN_TIMEOUT, SUBSTREAM_OPEN_TIMEOUT}, + transport::{CONNECTION_OPEN_TIMEOUT, MAX_PARALLEL_DIALS, SUBSTREAM_OPEN_TIMEOUT}, }; /// WebSocket transport configuration. @@ -80,6 +80,11 @@ pub struct Config { /// How long should litep2p wait for a substream to be opened before considering /// the substream rejected. pub substream_open_timeout: std::time::Duration, + + /// Maximum number of parallel dial attempts for a single peer. + /// + /// Defaults to `8`. + pub max_parallel_dials: usize, } impl Default for Config { @@ -96,6 +101,7 @@ impl Default for Config { noise_write_buffer_size: MAX_WRITE_BUFFER_SIZE, connection_open_timeout: CONNECTION_OPEN_TIMEOUT, substream_open_timeout: SUBSTREAM_OPEN_TIMEOUT, + max_parallel_dials: MAX_PARALLEL_DIALS, } } } diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index e89e10512..b5e8ca5ab 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -29,7 +29,7 @@ use crate::{ config::Config, connection::{NegotiatedConnection, WebSocketConnection}, }, - Transport, TransportBuilder, TransportEvent, MAX_PARALLEL_DIALS, + Transport, TransportBuilder, TransportEvent, }, types::ConnectionId, utils::futures_stream::FuturesStream, @@ -481,6 +481,7 @@ impl Transport for WebSocketTransport { let max_read_ahead_factor = self.config.noise_read_ahead_frame_count; let max_write_buffer_size = self.config.noise_write_buffer_size; let substream_open_timeout = self.config.substream_open_timeout; + let max_parallel_dials = self.config.max_parallel_dials; let dial_addresses = self.dial_addresses.clone(); let nodelay = self.config.nodelay; let resolver = self.resolver.clone(); @@ -522,7 +523,7 @@ impl Transport for WebSocketTransport { .map_err(|error| (open_address, error.into())) } })) - .buffer_unordered(MAX_PARALLEL_DIALS); + .buffer_unordered(max_parallel_dials); // Future that will resolve to the first successful connection. // From 4c5e63c1562ca7e16b2d19e3a226dab957c98df3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Feb 2026 12:33:04 +0000 Subject: [PATCH 05/11] transport: Lift the limit from transport builder config Signed-off-by: Alexandru Vasile --- src/lib.rs | 1 - src/transport/manager/mod.rs | 23 ++++------------------- src/transport/mod.rs | 7 +++++++ src/transport/tcp/mod.rs | 5 +++-- src/transport/websocket/mod.rs | 5 +++-- 5 files changed, 17 insertions(+), 24 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3a6401165..23e9db500 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -174,7 +174,6 @@ impl Litep2p { .with_keypair(litep2p_config.keypair.clone()) .with_supported_transports(supported_transports) .with_bandwidth_sink(bandwidth_sink.clone()) - .with_max_parallel_dials(litep2p_config.max_parallel_dials) .with_connection_limits_config(litep2p_config.connection_limits) .build(); diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index 9ed5ed16d..5fdfa7f37 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -32,7 +32,7 @@ use crate::{ peer_state::{ConnectionRecord, PeerState, StateDialResult}, types::PeerContext, }, - Endpoint, Transport, TransportEvent, MAX_PARALLEL_DIALS, + Endpoint, Transport, TransportEvent, }, types::{protocol::ProtocolName, ConnectionId}, BandwidthSink, PeerId, @@ -205,9 +205,6 @@ pub struct TransportManager { /// Bandwidth sink. bandwidth_sink: BandwidthSink, - /// Maximum parallel dial attempts per peer. - max_parallel_dials: usize, - /// Installed protocols. protocols: HashMap, @@ -265,9 +262,6 @@ pub struct TransportManagerBuilder { /// Bandwidth sink. bandwidth_sink: Option, - /// Maximum parallel dial attempts per peer. - max_parallel_dials: usize, - /// Connection limits config. connection_limits_config: limits::ConnectionLimitsConfig, } @@ -285,7 +279,6 @@ impl TransportManagerBuilder { keypair: None, supported_transports: HashSet::new(), bandwidth_sink: None, - max_parallel_dials: MAX_PARALLEL_DIALS, connection_limits_config: limits::ConnectionLimitsConfig::default(), } } @@ -311,12 +304,6 @@ impl TransportManagerBuilder { self } - /// Set the maximum parallel dials per peer - pub fn with_max_parallel_dials(mut self, max_parrallel_dials: usize) -> Self { - self.max_parallel_dials = max_parrallel_dials; - self - } - /// Set connection limits configuration. pub fn with_connection_limits_config( mut self, @@ -349,7 +336,6 @@ impl TransportManagerBuilder { local_peer_id, keypair, bandwidth_sink: self.bandwidth_sink.unwrap_or_else(BandwidthSink::new), - max_parallel_dials: self.max_parallel_dials, protocols: HashMap::new(), protocol_names: HashSet::new(), listen_addresses, @@ -532,9 +518,6 @@ impl TransportManager { pub async fn dial(&mut self, peer: PeerId) -> crate::Result<()> { // Don't alter the peer state if there's no capacity to dial. let available_capacity = self.connection_limits.on_dial_address()?; - // The available capacity is the maximum number of connections that can be established, - // so we limit the number of parallel dials to the minimum of these values. - let limit = available_capacity.min(self.max_parallel_dials); if peer == self.local_peer_id { return Err(Error::TriedToDialSelf); @@ -552,7 +535,9 @@ impl TransportManager { // The addresses are sorted by score and contain the remote peer ID. // We double checked above that the remote peer is not the local peer. - let dial_addresses = context.addresses.addresses(limit); + // Limit addresses by the available connection capacity. The transport layer + // handles dial concurrency via `max_parallel_dials`. + let dial_addresses = context.addresses.addresses(available_capacity); if dial_addresses.is_empty() { return Err(Error::NoAddressAvailable(peer)); } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index c2cf2250f..03c523765 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -56,6 +56,13 @@ pub(crate) const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(5); /// Maximum number of parallel dial attempts. pub(crate) const MAX_PARALLEL_DIALS: usize = 8; +/// Multiplier applied to `connection_open_timeout` to derive the overall dial deadline. +/// +/// When dialing multiple addresses concurrently, the total time allowed is +/// `DIAL_DEADLINE_MULTIPLIER * connection_open_timeout`. This gives enough time +/// to cycle through addresses without stalling indefinitely. +pub(crate) const DIAL_DEADLINE_MULTIPLIER: u32 = 2; + /// Connection endpoint. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Endpoint { diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index 57bee6217..a492c0d3d 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -30,7 +30,7 @@ use crate::{ config::Config, connection::{NegotiatedConnection, TcpConnection}, }, - Transport, TransportBuilder, TransportEvent, + Transport, TransportBuilder, TransportEvent, DIAL_DEADLINE_MULTIPLIER, }, types::ConnectionId, utils::futures_stream::FuturesStream, @@ -490,7 +490,8 @@ impl Transport for TcpTransport { let mut errors = Vec::with_capacity(num_addresses); // Deadline for the overall dial attempt, including all retries. This is to prevent // retry attempts from indefinitely delaying the dial result. - let deadline = tokio::time::sleep(2 * connection_open_timeout); + let deadline = + tokio::time::sleep(DIAL_DEADLINE_MULTIPLIER * connection_open_timeout); tokio::pin!(deadline); tokio::pin!(futures); diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index b5e8ca5ab..46c1d5be5 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -29,7 +29,7 @@ use crate::{ config::Config, connection::{NegotiatedConnection, WebSocketConnection}, }, - Transport, TransportBuilder, TransportEvent, + Transport, TransportBuilder, TransportEvent, DIAL_DEADLINE_MULTIPLIER, }, types::ConnectionId, utils::futures_stream::FuturesStream, @@ -533,7 +533,8 @@ impl Transport for WebSocketTransport { let mut errors = Vec::with_capacity(num_addresses); // Deadline for the overall dial attempt, including all retries. This is to prevent // retry attempts from indefinitely delaying the dial result. - let deadline = tokio::time::sleep(2 * connection_open_timeout); + let deadline = + tokio::time::sleep(DIAL_DEADLINE_MULTIPLIER * connection_open_timeout); tokio::pin!(deadline); tokio::pin!(futures); From 9789aad3fa9e26047b3e66041ed522605a55bc6d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Feb 2026 12:55:02 +0000 Subject: [PATCH 06/11] Apply clippy Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index a492c0d3d..b9137963e 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -490,8 +490,7 @@ impl Transport for TcpTransport { let mut errors = Vec::with_capacity(num_addresses); // Deadline for the overall dial attempt, including all retries. This is to prevent // retry attempts from indefinitely delaying the dial result. - let deadline = - tokio::time::sleep(DIAL_DEADLINE_MULTIPLIER * connection_open_timeout); + let deadline = tokio::time::sleep(DIAL_DEADLINE_MULTIPLIER * connection_open_timeout); tokio::pin!(deadline); tokio::pin!(futures); From fa51c6e73659ecbb1214f344043a7b23c42abd61 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Feb 2026 12:59:09 +0000 Subject: [PATCH 07/11] Apply fmt Signed-off-by: Alexandru Vasile --- src/transport/websocket/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 46c1d5be5..2db6178e5 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -533,8 +533,7 @@ impl Transport for WebSocketTransport { let mut errors = Vec::with_capacity(num_addresses); // Deadline for the overall dial attempt, including all retries. This is to prevent // retry attempts from indefinitely delaying the dial result. - let deadline = - tokio::time::sleep(DIAL_DEADLINE_MULTIPLIER * connection_open_timeout); + let deadline = tokio::time::sleep(DIAL_DEADLINE_MULTIPLIER * connection_open_timeout); tokio::pin!(deadline); tokio::pin!(futures); From a1da5f00f4bb9ac5099b26feaf1f72c6611f62cc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Feb 2026 13:33:35 +0000 Subject: [PATCH 08/11] config: Dont use 0 parallel addresses Signed-off-by: Alexandru Vasile --- src/config.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 10cf237b5..e00bd4b28 100644 --- a/src/config.rs +++ b/src/config.rs @@ -265,8 +265,10 @@ impl ConfigBuilder { } /// How many addresses should litep2p attempt to dial in parallel. + /// + /// The provided number is clamped to a minimum of 1. pub fn with_max_parallel_dials(mut self, max_parallel_dials: usize) -> Self { - self.max_parallel_dials = max_parallel_dials; + self.max_parallel_dials = max_parallel_dials.max(1); self } From 0f823ca1a3d7116d84e9de606cf3563409a77ddc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Feb 2026 13:35:25 +0000 Subject: [PATCH 09/11] transport: Hide the max parallel from configs Signed-off-by: Alexandru Vasile --- src/transport/tcp/config.rs | 5 +++-- src/transport/websocket/config.rs | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/transport/tcp/config.rs b/src/transport/tcp/config.rs index 119a375cb..30af7f2e7 100644 --- a/src/transport/tcp/config.rs +++ b/src/transport/tcp/config.rs @@ -83,8 +83,9 @@ pub struct Config { /// Maximum number of parallel dial attempts for a single peer. /// - /// Defaults to `8`. - pub max_parallel_dials: usize, + /// Controlled via + /// [`ConfigBuilder::with_max_parallel_dials`](crate::config::ConfigBuilder::with_max_parallel_dials). + pub(crate) max_parallel_dials: usize, } impl Default for Config { diff --git a/src/transport/websocket/config.rs b/src/transport/websocket/config.rs index b530ce7bf..ea1fb0b89 100644 --- a/src/transport/websocket/config.rs +++ b/src/transport/websocket/config.rs @@ -83,8 +83,9 @@ pub struct Config { /// Maximum number of parallel dial attempts for a single peer. /// - /// Defaults to `8`. - pub max_parallel_dials: usize, + /// Controlled via + /// [`ConfigBuilder::with_max_parallel_dials`](crate::config::ConfigBuilder::with_max_parallel_dials). + pub(crate) max_parallel_dials: usize, } impl Default for Config { From 735998b2d3828157ca85cc5bbc67edeb5f4592ec Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Feb 2026 13:38:41 +0000 Subject: [PATCH 10/11] transport: Log properly the deadline Signed-off-by: Alexandru Vasile --- src/transport/tcp/mod.rs | 5 +++-- src/transport/websocket/mod.rs | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/transport/tcp/mod.rs b/src/transport/tcp/mod.rs index b9137963e..4b063135a 100644 --- a/src/transport/tcp/mod.rs +++ b/src/transport/tcp/mod.rs @@ -490,7 +490,8 @@ impl Transport for TcpTransport { let mut errors = Vec::with_capacity(num_addresses); // Deadline for the overall dial attempt, including all retries. This is to prevent // retry attempts from indefinitely delaying the dial result. - let deadline = tokio::time::sleep(DIAL_DEADLINE_MULTIPLIER * connection_open_timeout); + let dial_deadline = DIAL_DEADLINE_MULTIPLIER * connection_open_timeout; + let deadline = tokio::time::sleep(dial_deadline); tokio::pin!(deadline); tokio::pin!(futures); @@ -526,7 +527,7 @@ impl Transport for TcpTransport { tracing::debug!( target: LOG_TARGET, ?connection_id, - ?connection_open_timeout, + ?dial_deadline, "overall dial timeout exceeded", ); return RawConnectionResult::Failed { diff --git a/src/transport/websocket/mod.rs b/src/transport/websocket/mod.rs index 2db6178e5..a1c33648a 100644 --- a/src/transport/websocket/mod.rs +++ b/src/transport/websocket/mod.rs @@ -533,7 +533,8 @@ impl Transport for WebSocketTransport { let mut errors = Vec::with_capacity(num_addresses); // Deadline for the overall dial attempt, including all retries. This is to prevent // retry attempts from indefinitely delaying the dial result. - let deadline = tokio::time::sleep(DIAL_DEADLINE_MULTIPLIER * connection_open_timeout); + let dial_deadline = DIAL_DEADLINE_MULTIPLIER * connection_open_timeout; + let deadline = tokio::time::sleep(dial_deadline); tokio::pin!(deadline); tokio::pin!(futures); @@ -569,7 +570,7 @@ impl Transport for WebSocketTransport { tracing::debug!( target: LOG_TARGET, ?connection_id, - ?connection_open_timeout, + ?dial_deadline, "overall dial timeout exceeded", ); return RawConnectionResult::Failed { From f7989615a345f348705f851d80098e2578e5bf52 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 24 Feb 2026 13:45:39 +0000 Subject: [PATCH 11/11] transport: Document the new fields better Signed-off-by: Alexandru Vasile --- src/transport/tcp/config.rs | 7 ++++--- src/transport/websocket/config.rs | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/transport/tcp/config.rs b/src/transport/tcp/config.rs index 30af7f2e7..3fe11409b 100644 --- a/src/transport/tcp/config.rs +++ b/src/transport/tcp/config.rs @@ -83,9 +83,10 @@ pub struct Config { /// Maximum number of parallel dial attempts for a single peer. /// - /// Controlled via - /// [`ConfigBuilder::with_max_parallel_dials`](crate::config::ConfigBuilder::with_max_parallel_dials). - pub(crate) max_parallel_dials: usize, + /// **Note:** This value is overridden by the top-level + /// [`ConfigBuilder::with_max_parallel_dials`](crate::config::ConfigBuilder::with_max_parallel_dials) + /// when building `Litep2p`. + pub max_parallel_dials: usize, } impl Default for Config { diff --git a/src/transport/websocket/config.rs b/src/transport/websocket/config.rs index ea1fb0b89..0d5aee29c 100644 --- a/src/transport/websocket/config.rs +++ b/src/transport/websocket/config.rs @@ -83,9 +83,10 @@ pub struct Config { /// Maximum number of parallel dial attempts for a single peer. /// - /// Controlled via - /// [`ConfigBuilder::with_max_parallel_dials`](crate::config::ConfigBuilder::with_max_parallel_dials). - pub(crate) max_parallel_dials: usize, + /// **Note:** This value is overridden by the top-level + /// [`ConfigBuilder::with_max_parallel_dials`](crate::config::ConfigBuilder::with_max_parallel_dials) + /// when building `Litep2p`. + pub max_parallel_dials: usize, } impl Default for Config {