Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/transport/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
types::ConnectionId,
};

use futures::Stream;
use futures::{future::BoxFuture, Stream};
use multiaddr::Multiaddr;

use std::{
Expand Down Expand Up @@ -71,8 +71,8 @@ impl Transport for DummyTransport {
Ok(())
}

fn accept(&mut self, _: ConnectionId) -> crate::Result<()> {
Ok(())
fn accept(&mut self, _: ConnectionId) -> crate::Result<BoxFuture<'static, crate::Result<()>>> {
Ok(Box::pin(async { Ok(()) }))
}

fn accept_pending(&mut self, _connection_id: ConnectionId) -> crate::Result<()> {
Expand Down
74 changes: 65 additions & 9 deletions src/transport/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
};

use address::{scores, AddressStore};
use futures::{Stream, StreamExt};
use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt};
use indexmap::IndexMap;
use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;
Expand Down Expand Up @@ -249,6 +249,9 @@ pub struct TransportManager {

/// Opening connections errors.
opening_errors: HashMap<ConnectionId, Vec<(Multiaddr, DialError)>>,

/// Pending accept futures with associated connection information.
pending_accept: FuturesUnordered<BoxFuture<'static, (PeerId, Endpoint, crate::Result<()>)>>,
}

/// Builder for [`crate::transport::manager::TransportManager`].
Expand Down Expand Up @@ -351,6 +354,7 @@ impl TransportManagerBuilder {
pending_connections: HashMap::new(),
connection_limits: limits::ConnectionLimits::new(self.connection_limits_config),
opening_errors: HashMap::new(),
pending_accept: FuturesUnordered::new(),
}
}
}
Expand Down Expand Up @@ -1076,6 +1080,35 @@ impl TransportManager {
pub async fn next(&mut self) -> Option<TransportEvent> {
loop {
tokio::select! {
(peer, endpoint, result) = self.pending_accept.select_next_some(), if !self.pending_accept.is_empty() => {
match result {
Ok(()) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
?endpoint,
"connection accepted and protocols notified",
);

return Some(TransportEvent::ConnectionEstablished { peer, endpoint });
}
Err(error) => {
// The pending accept future has failed to inform one of the
// installed protocols about the connection. This can happen when the
// node is shutting down or when the user has dropped the long running protocol.
// To err on the safe side, roll back the state modification done in `on_connection_established`.
self.on_connection_closed(peer, endpoint.connection_id());

tracing::error!(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the other comment: if we fail here, we need to clean up some data, like this:

.accept_established_connection(endpoint.connection_id(), endpoint.is_listener());

target: LOG_TARGET,
?peer,
?endpoint,
?error,
"failed to notify protocols about connection",
);
}
}
}
event = self.event_rx.recv() => {
let Some(event) = event else {
tracing::error!(
Expand Down Expand Up @@ -1255,16 +1288,36 @@ impl TransportManager {
"accept connection",
);

let _ = self
match self
.transports
.get_mut(&transport)
.expect("transport to exist")
.accept(endpoint.connection_id());
.accept(endpoint.connection_id())
{
Ok(future) => {
// A ConnectionEstablished is propagated to the user once
// all protocols have been notified.
self.pending_accept.push(Box::pin(async move {
let result = future.await;
(peer, endpoint, result)
}));
}
Err(error) => {
// Roll back the state modification done in `on_connection_established` by
// simulating a closed connection. The transport returns an error
// while accepting the connection, which can happen if the transport is
// already closed or the connection is dropped before the accept call.
self.on_connection_closed(peer, endpoint.connection_id());

return Some(TransportEvent::ConnectionEstablished {
peer,
endpoint,
});
tracing::debug!(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to clean up some data in this case? `accept` will not run at all when we arrive here, but we have already modified some state in `on_connection_established`.

target: LOG_TARGET,
?peer,
?endpoint,
?error,
"failed to accept connection",
);
}
}
}
Ok(ConnectionEstablishedResult::Reject) => {
tracing::trace!(
Expand Down Expand Up @@ -1459,8 +1512,11 @@ mod tests {
Ok(())
}

fn accept(&mut self, _connection_id: ConnectionId) -> crate::Result<()> {
Ok(())
fn accept(
&mut self,
_connection_id: ConnectionId,
) -> crate::Result<BoxFuture<'static, crate::Result<()>>> {
Ok(Box::pin(async { Ok(()) }))
}

fn accept_pending(&mut self, _connection_id: ConnectionId) -> crate::Result<()> {
Expand Down
12 changes: 10 additions & 2 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use crate::{error::DialError, transport::manager::TransportHandle, types::ConnectionId, PeerId};

use futures::Stream;
use futures::{future::BoxFuture, Stream};
use hickory_resolver::TokioResolver;
use multiaddr::Multiaddr;

Expand Down Expand Up @@ -204,7 +204,15 @@ pub(crate) trait Transport: Stream + Unpin + Send {
fn dial(&mut self, connection_id: ConnectionId, address: Multiaddr) -> crate::Result<()>;

/// Accept negotiated connection.
fn accept(&mut self, connection_id: ConnectionId) -> crate::Result<()>;
///
/// Returns a future that completes when the connection has been fully established
/// and all installed protocols have been notified via their event channels.
/// This ensures that by the time the caller receives a ConnectionEstablished event,
/// protocols are ready to handle substream operations.
fn accept(
&mut self,
connection_id: ConnectionId,
) -> crate::Result<BoxFuture<'static, crate::Result<()>>>;

/// Accept pending connection.
fn accept_pending(&mut self, connection_id: ConnectionId) -> crate::Result<()>;
Expand Down
9 changes: 3 additions & 6 deletions src/transport/quic/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,9 @@ impl QuicConnection {
})
}

/// Start event loop for [`QuicConnection`].
pub async fn start(mut self) -> crate::Result<()> {
self.protocol_set
.report_connection_established(self.peer, self.endpoint.clone())
.await?;

/// Start the connection event loop without notifying protocols.
/// This is used when protocols have already been notified during accept().
pub(crate) async fn start(mut self) -> crate::Result<()> {
loop {
tokio::select! {
event = self.connection.accept_bi() => match event {
Expand Down
43 changes: 28 additions & 15 deletions src/transport/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,35 +307,48 @@ impl Transport for QuicTransport {
Ok(())
}

fn accept(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
fn accept(
&mut self,
connection_id: ConnectionId,
) -> crate::Result<BoxFuture<'static, crate::Result<()>>> {
let (connection, endpoint) = self
.pending_open
.remove(&connection_id)
.ok_or(Error::ConnectionDoesntExist(connection_id))?;
let bandwidth_sink = self.context.bandwidth_sink.clone();
let protocol_set = self.context.protocol_set(connection_id);
let mut protocol_set = self.context.protocol_set(connection_id);
let substream_open_timeout = self.config.substream_open_timeout;
let executor = self.context.executor.clone();

tracing::trace!(
target: LOG_TARGET,
?connection_id,
"start connection",
);

self.context.executor.run(Box::pin(async move {
let _ = QuicConnection::new(
connection.peer,
endpoint,
connection.connection,
protocol_set,
bandwidth_sink,
substream_open_timeout,
)
.start()
.await;
}));
let peer = connection.peer;
let endpoint_clone = endpoint.clone();

Ok(Box::pin(async move {
// First, notify all protocols about the connection establishment
protocol_set.report_connection_established(peer, endpoint_clone).await?;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`report_connection_established` internally sends an event to the protocols. Could it be a problem if a protocol doesn't read the event fast enough?

Maybe we need to modify `InnerTransportEvent::ConnectionEstablished` to include a oneshot channel and collect ACKs from the protocols?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this could be a great follow-up. I'd like to keep it separate, as we'll need to revalidate whether this adds any connection delays, or deadlocks, if a single protocol is under pressure during high load. 🙏


// After protocols are notified, spawn the connection event loop
executor.run(Box::pin(async move {
let _ = QuicConnection::new(
peer,
endpoint,
connection.connection,
protocol_set,
bandwidth_sink,
substream_open_timeout,
)
.start()
.await;
}));

Ok(())
Ok(())
}))
}

fn reject(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
Expand Down
7 changes: 2 additions & 5 deletions src/transport/tcp/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,12 +730,9 @@ impl TcpConnection {
}
}

/// Start connection event loop.
/// Start the connection event loop without notifying protocols.
/// This is used when protocols have already been notified during accept().
pub(crate) async fn start(mut self) -> crate::Result<()> {
self.protocol_set
.report_connection_established(self.peer, self.endpoint.clone())
.await?;

loop {
tokio::select! {
substream = self.connection.next() => {
Expand Down
48 changes: 31 additions & 17 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,37 +389,51 @@ impl Transport for TcpTransport {
)
}

fn accept(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
fn accept(
&mut self,
connection_id: ConnectionId,
) -> crate::Result<BoxFuture<'static, crate::Result<()>>> {
let context = self
.pending_open
.remove(&connection_id)
.ok_or(Error::ConnectionDoesntExist(connection_id))?;
let protocol_set = self.context.protocol_set(connection_id);
let mut protocol_set = self.context.protocol_set(connection_id);
let bandwidth_sink = self.context.bandwidth_sink.clone();
let next_substream_id = self.context.next_substream_id.clone();
let executor = self.context.executor.clone();

tracing::trace!(
target: LOG_TARGET,
?connection_id,
"start connection",
);

self.context.executor.run(Box::pin(async move {
if let Err(error) =
TcpConnection::new(context, protocol_set, bandwidth_sink, next_substream_id)
.start()
.await
{
tracing::debug!(
target: LOG_TARGET,
?connection_id,
?error,
"connection exited with error",
);
}
}));
let peer = context.peer();
let endpoint = context.endpoint().clone();

Ok(Box::pin(async move {
// First, notify all protocols about the connection establishment
// This ensures that when the accept() future completes, protocols are ready
protocol_set.report_connection_established(peer, endpoint).await?;

// After protocols are notified, spawn the connection event loop
executor.run(Box::pin(async move {
if let Err(error) =
TcpConnection::new(context, protocol_set, bandwidth_sink, next_substream_id)
.start()
.await
{
tracing::debug!(
target: LOG_TARGET,
?connection_id,
?error,
"connection exited with error",
);
}
}));

Ok(())
Ok(())
}))
}

fn reject(&mut self, connection_id: ConnectionId) -> crate::Result<()> {
Expand Down
15 changes: 2 additions & 13 deletions src/transport/webrtc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,19 +677,8 @@ impl WebRtcConnection {
.await;
}

/// Start running event loop of [`WebRtcConnection`].
pub async fn run(mut self) {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
"start webrtc connection event loop",
);

let _ = self
.protocol_set
.report_connection_established(self.peer, self.endpoint.clone())
.await;

/// Start the connection event loop without notifying protocols.
pub async fn run_event_loop(mut self) {
loop {
// poll output until we get a timeout
let timeout = match self.rtc.poll_output().unwrap() {
Expand Down
Loading
Loading