From 80e7fddaa99229c03f215d033e9bfa742da3fdbe Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Sat, 19 Jul 2025 16:05:19 +0100 Subject: [PATCH 1/5] Repeat ping request periodically --- src/protocol/libp2p/ping/config.rs | 14 ++++++++++++++ src/protocol/libp2p/ping/mod.rs | 20 ++++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/protocol/libp2p/ping/config.rs b/src/protocol/libp2p/ping/config.rs index 085f25425..a78f3681c 100644 --- a/src/protocol/libp2p/ping/config.rs +++ b/src/protocol/libp2p/ping/config.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::time::Duration; use crate::{ codec::ProtocolCodec, protocol::libp2p::ping::PingEvent, types::protocol::ProtocolName, DEFAULT_CHANNEL_SIZE, @@ -36,6 +37,8 @@ const PING_PAYLOAD_SIZE: usize = 32; /// Maximum PING failures. const MAX_FAILURES: usize = 3; +pub const PING_INTERVAL: Duration = Duration::from_secs(15); + /// Ping configuration. pub struct Config { /// Protocol name. @@ -49,6 +52,8 @@ pub struct Config { /// TX channel for sending events to the user protocol. pub(crate) tx_event: Sender, + + pub(crate) ping_interval: Duration, } impl Config { @@ -61,6 +66,7 @@ impl Config { ( Self { tx_event, + ping_interval: PING_INTERVAL, max_failures: MAX_FAILURES, protocol: ProtocolName::from(PROTOCOL_NAME), codec: ProtocolCodec::Identity(PING_PAYLOAD_SIZE), @@ -80,6 +86,7 @@ pub struct ConfigBuilder { /// Maximum failures before the peer is considered unreachable. max_failures: usize, + ping_interval: Duration, } impl Default for ConfigBuilder { @@ -92,6 +99,7 @@ impl ConfigBuilder { /// Create new default [`Config`] which can be modified by the user. pub fn new() -> Self { Self { + ping_interval: PING_INTERVAL, max_failures: MAX_FAILURES, protocol: ProtocolName::from(PROTOCOL_NAME), codec: ProtocolCodec::Identity(PING_PAYLOAD_SIZE), @@ -104,6 +112,11 @@ impl ConfigBuilder { self } + pub fn with_ping_interval(mut self, ping_interval: Duration) -> Self { + self.ping_interval = ping_interval; + self + } + /// Build [`Config`]. pub fn build(self) -> (Config, Box + Send + Unpin>) { let (tx_event, rx_event) = channel(DEFAULT_CHANNEL_SIZE); @@ -111,6 +124,7 @@ impl ConfigBuilder { ( Config { tx_event, + ping_interval: self.ping_interval, max_failures: self.max_failures, protocol: self.protocol, codec: self.codec, diff --git a/src/protocol/libp2p/ping/mod.rs b/src/protocol/libp2p/ping/mod.rs index fa16069fd..3484ec2d4 100644 --- a/src/protocol/libp2p/ping/mod.rs +++ b/src/protocol/libp2p/ping/mod.rs @@ -77,6 +77,8 @@ pub(crate) struct Ping { /// Pending inbound substreams. pending_inbound: FuturesUnordered>>, + + ping_interval: Duration, } impl Ping { @@ -84,6 +86,7 @@ impl Ping { pub fn new(service: TransportService, config: Config) -> Self { Self { service, + ping_interval: config.ping_interval, tx: config.tx_event, peers: HashSet::new(), pending_outbound: FuturesUnordered::new(), @@ -96,7 +99,6 @@ impl Ping { fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> { tracing::trace!(target: LOG_TARGET, ?peer, "connection established"); - self.service.open_substream(peer)?; self.peers.insert(peer); Ok(()) @@ -166,12 +168,13 @@ impl Ping { /// Start [`Ping`] event loop. pub async fn run(mut self) { tracing::debug!(target: LOG_TARGET, "starting ping event loop"); + let mut interval = tokio::time::interval(self.ping_interval); loop { tokio::select! { event = self.service.next() => match event { Some(TransportEvent::ConnectionEstablished { peer, .. }) => { - let _ = self.on_connection_established(peer); + self.on_connection_established(peer); } Some(TransportEvent::ConnectionClosed { peer }) => { self.on_connection_closed(peer); @@ -192,6 +195,19 @@ impl Ping { Some(_) => {} None => return, }, + _ = interval.tick() => { + for peer in &self.peers { + tracing::trace!(target: LOG_TARGET, ?peer, "sending ping"); + if let Err(error) = self.service.open_substream(*peer) { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?error, + "failed to open substream for ping" + ); + } + } + } _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {} event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => { match event { From d6b39092df59d371d36283d82cb48f3454829317 Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Mon, 6 Oct 2025 18:47:33 +0100 Subject: [PATCH 2/5] keep substream open by introducing stateful peer --- src/protocol/libp2p/ping/mod.rs | 276 ++++++++++++++++++------------ src/protocol/transport_service.rs | 13 +- src/transport/manager/mod.rs | 1 + 3 files changed, 178 insertions(+), 112 deletions(-) diff --git a/src/protocol/libp2p/ping/mod.rs b/src/protocol/libp2p/ping/mod.rs index 3484ec2d4..a0b5e8ba8 100644 --- a/src/protocol/libp2p/ping/mod.rs +++ b/src/protocol/libp2p/ping/mod.rs @@ -24,26 +24,24 @@ use crate::{ error::{Error, SubstreamError}, protocol::{Direction, TransportEvent, TransportService}, substream::Substream, - types::SubstreamId, PeerId, }; -use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; -use tokio::sync::mpsc::Sender; - +use futures::StreamExt; use std::{ - collections::HashSet, + collections::HashMap, time::{Duration, Instant}, }; +use tokio::sync::mpsc; pub use config::{Config, ConfigBuilder}; - mod config; // TODO: https://github.com/paritytech/litep2p/issues/132 let the user handle max failures /// Log target for the file. const LOG_TARGET: &str = "litep2p::ipfs::ping"; +const PING_TIMEOUT: Duration = Duration::from_secs(10); /// Events emitted by the ping protocol. #[derive(Debug)] @@ -56,113 +54,145 @@ pub enum PingEvent { /// Measured ping time with the peer. ping: Duration, }, + Failure { + peer: PeerId, + }, +} + +enum PingCommand { + SendPing, +} + +enum PingResult { + Success(Duration), + Failure, +} + +enum PeerState { + Pending, + Active { + command_tx: mpsc::Sender, + failures: usize, + }, } /// Ping protocol. pub(crate) struct Ping { /// Maximum failures before the peer is considered unreachable. - _max_failures: usize, + max_failures: usize, // Connection service. service: TransportService, /// TX channel for sending events to the user protocol. - tx: Sender, + tx: mpsc::Sender, /// Connected peers. - peers: HashSet, - - /// Pending outbound substreams. - pending_outbound: FuturesUnordered>>, - - /// Pending inbound substreams. - pending_inbound: FuturesUnordered>>, + peers: HashMap, ping_interval: Duration, + + result_rx: mpsc::Receiver<(PeerId, PingResult)>, + result_tx: mpsc::Sender<(PeerId, PingResult)>, } impl Ping { /// Create new [`Ping`] protocol. pub fn new(service: TransportService, config: Config) -> Self { + let (result_tx, result_rx) = mpsc::channel(256); Self { service, - ping_interval: config.ping_interval, tx: config.tx_event, - peers: HashSet::new(), - pending_outbound: FuturesUnordered::new(), - pending_inbound: FuturesUnordered::new(), - _max_failures: config.max_failures, + peers: HashMap::new(), + ping_interval: config.ping_interval, + max_failures: config.max_failures, + result_rx, + result_tx, } } /// Connection established to remote peer. - fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> { - tracing::trace!(target: LOG_TARGET, ?peer, "connection established"); - - self.peers.insert(peer); + fn on_connection_established(&mut self, peer: PeerId) { + tracing::debug!(target: LOG_TARGET, ?peer, "connection established, opening ping substream"); - Ok(()) + match self.service.open_substream(peer) { + Ok(_) => { + self.peers.insert(peer, PeerState::Pending); + } + Err(error) => { + tracing::warn!(target: LOG_TARGET, ?peer, ?error, "failed to open ping substream"); + } + } } /// Connection closed to remote peer. fn on_connection_closed(&mut self, peer: PeerId) { - tracing::trace!(target: LOG_TARGET, ?peer, "connection closed"); - + tracing::debug!(target: LOG_TARGET, ?peer, "connection closed"); self.peers.remove(&peer); } /// Handle outbound substream. - fn on_outbound_substream( - &mut self, - peer: PeerId, - substream_id: SubstreamId, - mut substream: Substream, - ) { - tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream"); - - self.pending_outbound.push(Box::pin(async move { - let future = async move { - // TODO: https://github.com/paritytech/litep2p/issues/134 generate random payload and verify it - substream.send_framed(vec![0u8; 32].into()).await?; - let now = Instant::now(); - let _ = substream.next().await.ok_or(Error::SubstreamError( - SubstreamError::ReadFailure(Some(substream_id)), - ))?; - let _ = substream.close().await; - - Ok(now.elapsed()) - }; - - match tokio::time::timeout(Duration::from_secs(10), future).await { - Err(_) => Err(Error::Timeout), - Ok(Err(error)) => Err(error), - Ok(Ok(elapsed)) => Ok((peer, elapsed)), - } - })); + fn on_outbound_substream(&mut self, peer: PeerId, substream: Substream) { + tracing::trace!(target: LOG_TARGET, ?peer, "outbound ping substream opened"); + + if let Some(PeerState::Pending) = self.peers.get(&peer) { + let (command_tx, command_rx) = mpsc::channel(1); + let result_tx = self.result_tx.clone(); + + tokio::spawn(handle_ping_substream( + peer, + substream, + command_rx, + result_tx, + )); + + self.peers.insert( + peer, + PeerState::Active { + command_tx, + failures: 0, + }, + ); + } else { + tracing::warn!(target: LOG_TARGET, ?peer, "ping substream opened for non-pending peer"); + } } /// Substream opened to remote peer. - fn on_inbound_substream(&mut self, peer: PeerId, mut substream: Substream) { - tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound substream"); - - self.pending_inbound.push(Box::pin(async move { - let future = async move { - let payload = substream - .next() - .await - .ok_or(Error::SubstreamError(SubstreamError::ReadFailure(None)))??; - substream.send_framed(payload.freeze()).await?; - let _ = substream.next().await.map(|_| ()); - - Ok(()) - }; - - match tokio::time::timeout(Duration::from_secs(10), future).await { - Err(_) => Err(Error::Timeout), - Ok(Err(error)) => Err(error), - Ok(Ok(())) => Ok(()), + fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) { + tracing::trace!(target: LOG_TARGET, ?peer, "handling inbound ping substream"); + tokio::spawn(handle_inbound_ping(substream)); + } + + async fn on_ping_result(&mut self, peer: PeerId, result: PingResult) { + match self.peers.get_mut(&peer) { + Some(PeerState::Active { failures, .. }) => match result { + PingResult::Success(duration) => { + *failures = 0; + let _ = self.tx.send(PingEvent::Ping { peer, ping: duration }).await; + } + PingResult::Failure => { + *failures += 1; + tracing::debug!(target: LOG_TARGET, ?peer, failures, "ping failure"); + + if *failures >= self.max_failures { + tracing::warn!( + target: LOG_TARGET, + ?peer, + "maximum ping failures reached, closing connection" + ); + let _ = self.tx.send(PingEvent::Failure { peer }).await; + if let Err(e) = self.service.force_close(peer) { + tracing::error!(target: LOG_TARGET, ?peer, ?e, "failed to force close connection"); + } + self.peers.remove(&peer); + } + } + }, + _ => { + tracing::trace!(target: LOG_TARGET, ?peer, "ping result for inactive peer"); } - })); + } } /// Start [`Ping`] event loop. @@ -179,51 +209,79 @@ impl Ping { Some(TransportEvent::ConnectionClosed { peer }) => { self.on_connection_closed(peer); } - Some(TransportEvent::SubstreamOpened { - peer, - substream, - direction, - .. - }) => match direction { - Direction::Inbound => { - self.on_inbound_substream(peer, substream); - } - Direction::Outbound(substream_id) => { - self.on_outbound_substream(peer, substream_id, substream); - } - }, + Some(TransportEvent::SubstreamOpened { peer, substream, direction, .. }) => match direction { + Direction::Outbound(_) => self.on_outbound_substream(peer, substream), + Direction::Inbound => self.on_inbound_substream(peer, substream), + } Some(_) => {} - None => return, + None => { + tracing::debug!(target: LOG_TARGET, "transport service shut down"); + return; + } }, _ = interval.tick() => { - for peer in &self.peers { - tracing::trace!(target: LOG_TARGET, ?peer, "sending ping"); - if let Err(error) = self.service.open_substream(*peer) { - tracing::debug!( - target: LOG_TARGET, - ?peer, - ?error, - "failed to open substream for ping" - ); + for (peer, state) in self.peers.iter() { + if let PeerState::Active { command_tx, .. } = state { + if let Err(e) = command_tx.try_send(PingCommand::SendPing) { + tracing::trace!(target: LOG_TARGET, ?peer, ?e, "failed to send ping command"); + } } } + }, + Some((peer, result)) = self.result_rx.recv() => { + self.on_ping_result(peer, result).await; } - _event = self.pending_inbound.next(), if !self.pending_inbound.is_empty() => {} - event = self.pending_outbound.next(), if !self.pending_outbound.is_empty() => { - match event { - Some(Ok((peer, elapsed))) => { - let _ = self - .tx - .send(PingEvent::Ping { - peer, - ping: elapsed, - }) - .await; + } + } + } +} + +async fn handle_ping_substream( + peer: PeerId, + mut substream: Substream, + mut command_rx: mpsc::Receiver, + result_tx: mpsc::Sender<(PeerId, PingResult)>, +) { + loop { + match command_rx.recv().await { + Some(PingCommand::SendPing) => { + // TODO: https://github.com/paritytech/litep2p/issues/134 generate random payload and verify it + let payload = vec![0u8; 32]; + let future = async { + substream.send_framed(payload.into()).await?; + let now = Instant::now(); + let _ = substream + .next() + .await + .ok_or(Error::SubstreamError(SubstreamError::ConnectionClosed))??; + Ok::<_, Error>(now.elapsed()) + }; + + match tokio::time::timeout(PING_TIMEOUT, future).await { + Ok(Ok(duration)) => { + if result_tx.send((peer, PingResult::Success(duration))).await.is_err() { + break; + } + } + _ => { + if result_tx.send((peer, PingResult::Failure)).await.is_err() { + break; } - event => tracing::debug!(target: LOG_TARGET, "failed to handle ping for an outbound peer: {event:?}"), } } } + None => { + tracing::trace!(target: LOG_TARGET, ?peer, "ping command channel closed, shutting down task"); + break; + } } } } + +async fn handle_inbound_ping(mut substream: Substream) { + while let Some(Ok(payload)) = substream.next().await { + if substream.send_framed(payload.freeze()).await.is_err() { + break; + } + } +} \ No newline at end of file diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index b729e9312..e2827182d 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -287,6 +287,8 @@ pub struct TransportService { /// Close the connection if no substreams are open within this time frame. keep_alive_tracker: KeepAliveTracker, + + counts_towards_keep_alive: bool, } impl TransportService { @@ -298,6 +300,7 @@ impl TransportService { next_substream_id: Arc, transport_handle: TransportManagerHandle, keep_alive_timeout: Duration, + counts_towards_keep_alive: bool, ) -> (Self, Sender) { let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE); @@ -313,6 +316,7 @@ impl TransportService { next_substream_id, connections: HashMap::new(), keep_alive_tracker, + counts_towards_keep_alive }, tx, ) @@ -507,8 +511,11 @@ impl TransportService { "open substream", ); - self.keep_alive_tracker.substream_activity(peer, connection_id); - connection.try_upgrade(); + if self.counts_towards_keep_alive { + self.keep_alive_tracker.substream_activity(peer, connection_id); + connection.try_upgrade(); + } + connection .open_substream( @@ -592,7 +599,7 @@ impl Stream for TransportService { substream, connection_id, }) => { - if protocol == self.protocol { + if protocol == self.protocol && self.counts_towards_keep_alive { self.keep_alive_tracker.substream_activity(peer, connection_id); if let Some(context) = self.connections.get_mut(&peer) { context.try_upgrade(&connection_id); diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index f44a07a6b..ad2c17533 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -348,6 +348,7 @@ impl TransportManager { self.next_substream_id.clone(), self.transport_manager_handle.clone(), keep_alive_timeout, + true ); self.protocols.insert( From 9f8fe8c58494c2754924abc12d29222165212f80 Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Sat, 29 Nov 2025 12:18:47 +0100 Subject: [PATCH 3/5] use stream map for efficiency --- src/protocol/libp2p/ping/mod.rs | 225 +++++++++----------------------- 1 file changed, 62 insertions(+), 163 deletions(-) diff --git a/src/protocol/libp2p/ping/mod.rs b/src/protocol/libp2p/ping/mod.rs index a0b5e8ba8..48783bd12 100644 --- a/src/protocol/libp2p/ping/mod.rs +++ b/src/protocol/libp2p/ping/mod.rs @@ -27,12 +27,15 @@ use crate::{ PeerId, }; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use std::{ collections::HashMap, time::{Duration, Instant}, }; +use bytes::Bytes; +use futures::stream::SplitSink; use tokio::sync::mpsc; +use tokio_stream::StreamMap; pub use config::{Config, ConfigBuilder}; mod config; @@ -54,60 +57,40 @@ pub enum PingEvent { /// Measured ping time with the peer. ping: Duration, }, - Failure { - peer: PeerId, - }, -} - -enum PingCommand { - SendPing, -} - -enum PingResult { - Success(Duration), - Failure, -} - -enum PeerState { - Pending, - Active { - command_tx: mpsc::Sender, - failures: usize, - }, } /// Ping protocol. pub(crate) struct Ping { - /// Maximum failures before the peer is considered unreachable. - max_failures: usize, - // Connection service. service: TransportService, /// TX channel for sending events to the user protocol. tx: mpsc::Sender, - /// Connected peers. - peers: HashMap, + /// Inbound: The "Listening" half of the substreams. + /// StreamMap handles polling all of them efficiently. + read_streams: StreamMap>, - ping_interval: Duration, + /// Outbound: The "Writing" half of the substreams. + /// We look these up when the timer ticks to send a Ping. + write_sinks: HashMap>, + + /// We need to track when we sent the ping to calculate the duration. + ping_times: HashMap, - result_rx: mpsc::Receiver<(PeerId, PingResult)>, - result_tx: mpsc::Sender<(PeerId, PingResult)>, + ping_interval: Duration, } impl Ping { /// Create new [`Ping`] protocol. pub fn new(service: TransportService, config: Config) -> Self { - let (result_tx, result_rx) = mpsc::channel(256); Self { service, tx: config.tx_event, - peers: HashMap::new(), ping_interval: config.ping_interval, - max_failures: config.max_failures, - result_rx, - result_tx, + read_streams: StreamMap::new(), + write_sinks: HashMap::new(), + ping_times: HashMap::new(), } } @@ -115,84 +98,25 @@ impl Ping { fn on_connection_established(&mut self, peer: PeerId) { tracing::debug!(target: LOG_TARGET, ?peer, "connection established, opening ping substream"); - match self.service.open_substream(peer) { - Ok(_) => { - self.peers.insert(peer, PeerState::Pending); - } - Err(error) => { - tracing::warn!(target: LOG_TARGET, ?peer, ?error, "failed to open ping substream"); - } + if let Err(error) = self.service.open_substream(peer) { + tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to open substream"); } } /// Connection closed to remote peer. fn on_connection_closed(&mut self, peer: PeerId) { tracing::debug!(target: LOG_TARGET, ?peer, "connection closed"); - self.peers.remove(&peer); + self.read_streams.remove(&peer); + self.write_sinks.remove(&peer); + self.ping_times.remove(&peer); } - /// Handle outbound substream. - fn on_outbound_substream(&mut self, peer: PeerId, substream: Substream) { - tracing::trace!(target: LOG_TARGET, ?peer, "outbound ping substream opened"); - - if let Some(PeerState::Pending) = self.peers.get(&peer) { - let (command_tx, command_rx) = mpsc::channel(1); - let result_tx = self.result_tx.clone(); - - tokio::spawn(handle_ping_substream( - peer, - substream, - command_rx, - result_tx, - )); - - self.peers.insert( - peer, - PeerState::Active { - command_tx, - failures: 0, - }, - ); - } else { - tracing::warn!(target: LOG_TARGET, ?peer, "ping substream opened for non-pending peer"); - } - } + /// Helper to register a substream (used for both inbound and outbound). + fn register_substream(&mut self, peer: PeerId, substream: Substream) { + let (sink, stream) = substream.split(); - /// Substream opened to remote peer. - fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) { - tracing::trace!(target: LOG_TARGET, ?peer, "handling inbound ping substream"); - tokio::spawn(handle_inbound_ping(substream)); - } - - async fn on_ping_result(&mut self, peer: PeerId, result: PingResult) { - match self.peers.get_mut(&peer) { - Some(PeerState::Active { failures, .. }) => match result { - PingResult::Success(duration) => { - *failures = 0; - let _ = self.tx.send(PingEvent::Ping { peer, ping: duration }).await; - } - PingResult::Failure => { - *failures += 1; - tracing::debug!(target: LOG_TARGET, ?peer, failures, "ping failure"); - - if *failures >= self.max_failures { - tracing::warn!( - target: LOG_TARGET, - ?peer, - "maximum ping failures reached, closing connection" - ); - let _ = self.tx.send(PingEvent::Failure { peer }).await; - if let Err(e) = self.service.force_close(peer) { - tracing::error!(target: LOG_TARGET, ?peer, ?e, "failed to force close connection"); - } - self.peers.remove(&peer); - } - } - }, - _ => { - tracing::trace!(target: LOG_TARGET, ?peer, "ping result for inactive peer"); - } - } + self.read_streams.insert(peer, stream); + self.write_sinks.insert(peer, sink); } /// Start [`Ping`] event loop. @@ -209,79 +133,54 @@ impl Ping { Some(TransportEvent::ConnectionClosed { peer }) => { self.on_connection_closed(peer); } - Some(TransportEvent::SubstreamOpened { peer, substream, direction, .. }) => match direction { - Direction::Outbound(_) => self.on_outbound_substream(peer, substream), - Direction::Inbound => self.on_inbound_substream(peer, substream), + Some(TransportEvent::SubstreamOpened { peer, substream, .. }) => { + tracing::trace!(target: LOG_TARGET, ?peer, "registering ping substream"); + self.register_substream(peer, substream); } Some(_) => {} - None => { - tracing::debug!(target: LOG_TARGET, "transport service shut down"); - return; - } + None => return, }, + _ = interval.tick() => { - for (peer, state) in self.peers.iter() { - if let PeerState::Active { command_tx, .. } = state { - if let Err(e) = command_tx.try_send(PingCommand::SendPing) { - tracing::trace!(target: LOG_TARGET, ?peer, ?e, "failed to send ping command"); - } + for (peer, sink) in self.write_sinks.iter_mut() { + let payload = vec![0u8; 32]; + + self.ping_times.insert(*peer, Instant::now()); + tracing::trace!(target: LOG_TARGET, ?peer, "sending ping"); + + if let Err(error) = sink.send(Bytes::from(payload)).await { + tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to send ping"); + } } - }, - Some((peer, result)) = self.result_rx.recv() => { - self.on_ping_result(peer, result).await; } - } - } - } -} - -async fn handle_ping_substream( - peer: PeerId, - mut substream: Substream, - mut command_rx: mpsc::Receiver, - result_tx: mpsc::Sender<(PeerId, PingResult)>, -) { - loop { - match command_rx.recv().await { - Some(PingCommand::SendPing) => { - // TODO: https://github.com/paritytech/litep2p/issues/134 generate random payload and verify it - let payload = vec![0u8; 32]; - let future = async { - substream.send_framed(payload.into()).await?; - let now = Instant::now(); - let _ = substream - .next() - .await - .ok_or(Error::SubstreamError(SubstreamError::ConnectionClosed))??; - Ok::<_, Error>(now.elapsed()) - }; - match tokio::time::timeout(PING_TIMEOUT, future).await { - Ok(Ok(duration)) => { - if result_tx.send((peer, PingResult::Success(duration))).await.is_err() { - break; + Some((peer, event)) = self.read_streams.next() => { + match event { + Ok(payload) => { + if let Some(started) = self.ping_times.remove(&peer) { + + let elapsed = started.elapsed(); + tracing::trace!(target: LOG_TARGET, ?peer, ?elapsed, "pong received"); + let _ = self.tx.send(PingEvent::Ping { peer, ping: elapsed }).await; + } else { + if let Some(sink) = self.write_sinks.get_mut(&peer) { + tracing::trace!(target: LOG_TARGET, ?peer, "sending pong"); + if let Err(error) = sink.send(payload.freeze()).await { + tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to send pong"); + } + } + } } - } - _ => { - if result_tx.send((peer, PingResult::Failure)).await.is_err() { - break; + Err(error) => { + tracing::debug!(target: LOG_TARGET, ?peer, ?error, "ping substream closed/error"); + self.read_streams.remove(&peer); + self.write_sinks.remove(&peer); + self.ping_times.remove(&peer); } } } } - None => { - tracing::trace!(target: LOG_TARGET, ?peer, "ping command channel closed, shutting down task"); - break; - } - } - } -} - -async fn handle_inbound_ping(mut substream: Substream) { - while let Some(Ok(payload)) = substream.next().await { - if substream.send_framed(payload.freeze()).await.is_err() { - break; } } } \ No newline at end of file From f26e25c9e927dfb0ec3740803408f8dcf32e44d2 Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Sun, 14 Dec 2025 09:40:22 +0100 Subject: [PATCH 4/5] separate inbound and outbound streams --- src/protocol/libp2p/ping/mod.rs | 97 +++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 28 deletions(-) diff --git a/src/protocol/libp2p/ping/mod.rs b/src/protocol/libp2p/ping/mod.rs index 48783bd12..74438d7e1 100644 --- a/src/protocol/libp2p/ping/mod.rs +++ b/src/protocol/libp2p/ping/mod.rs @@ -67,13 +67,21 @@ pub(crate) struct Ping { /// TX channel for sending events to the user protocol. tx: mpsc::Sender, - /// Inbound: The "Listening" half of the substreams. - /// StreamMap handles polling all of them efficiently. - read_streams: StreamMap>, + /// Streams we read Pongs from. + outbound_streams: StreamMap>, - /// Outbound: The "Writing" half of the substreams. - /// We look these up when the timer ticks to send a Ping. - write_sinks: HashMap>, + /// Sinks we write Pings to. + outbound_sinks: HashMap>, + + /// Streams we read Pings from. + /// Keyed by a local counter to handle multiple streams per peer if necessary. + inbound_streams: StreamMap>, + + /// Sinks we write Pongs to. + inbound_sinks: HashMap>, + + /// Counter for generating unique keys for inbound streams. + inbound_id_counter: usize, /// We need to track when we sent the ping to calculate the duration. ping_times: HashMap, @@ -88,9 +96,12 @@ impl Ping { service, tx: config.tx_event, ping_interval: config.ping_interval, - read_streams: StreamMap::new(), - write_sinks: HashMap::new(), + outbound_streams: StreamMap::new(), + outbound_sinks: HashMap::new(), ping_times: HashMap::new(), + inbound_streams: StreamMap::new(), + inbound_sinks: HashMap::new(), + inbound_id_counter: 0, } } @@ -106,17 +117,31 @@ impl Ping { /// Connection closed to remote peer. fn on_connection_closed(&mut self, peer: PeerId) { tracing::debug!(target: LOG_TARGET, ?peer, "connection closed"); - self.read_streams.remove(&peer); - self.write_sinks.remove(&peer); + self.outbound_streams.remove(&peer); + self.outbound_sinks.remove(&peer); self.ping_times.remove(&peer); } - /// Helper to register a substream (used for both inbound and outbound). - fn register_substream(&mut self, peer: PeerId, substream: Substream) { + /// Handle outbound substream (We initiated) + /// Registers it into the Outbound pipeline. + fn on_outbound_substream(&mut self, peer: PeerId, substream: Substream) { + tracing::trace!(target: LOG_TARGET, ?peer, "outbound ping substream registered"); + let (sink, stream) = substream.split(); + self.outbound_streams.insert(peer, stream); + self.outbound_sinks.insert(peer, sink); + } + + /// Handle inbound substream (They initiated). + /// Registers it into the Inbound pipeline. + fn on_inbound_substream(&mut self, peer: PeerId, substream: Substream) { + tracing::trace!(target: LOG_TARGET, ?peer, "inbound ping substream registered"); let (sink, stream) = substream.split(); - self.read_streams.insert(peer, stream); - self.write_sinks.insert(peer, sink); + let id = self.inbound_id_counter; + self.inbound_id_counter += 1; + + self.inbound_streams.insert(id, stream); + self.inbound_sinks.insert(id, sink); } /// Start [`Ping`] event loop. @@ -133,16 +158,20 @@ impl Ping { Some(TransportEvent::ConnectionClosed { peer }) => { self.on_connection_closed(peer); } - Some(TransportEvent::SubstreamOpened { peer, substream, .. }) => { - tracing::trace!(target: LOG_TARGET, ?peer, "registering ping substream"); - self.register_substream(peer, substream); + Some(TransportEvent::SubstreamOpened { peer, substream, direction,.. }) => match direction { + Direction::Inbound => { + self.on_inbound_substream(peer, substream); + } + Direction::Outbound(_) => { + self.on_outbound_substream(peer, substream); + } } Some(_) => {} None => return, }, _ = interval.tick() => { - for (peer, sink) in self.write_sinks.iter_mut() { + for (peer, sink) in self.outbound_sinks.iter_mut() { let payload = vec![0u8; 32]; self.ping_times.insert(*peer, Instant::now()); @@ -155,7 +184,8 @@ impl Ping { } } - Some((peer, event)) = self.read_streams.next() => { + // Handle Outbound Responses (Pong is expected here) + Some((peer, event)) = self.outbound_streams.next() => { match event { Ok(payload) => { if let Some(started) = self.ping_times.remove(&peer) { @@ -163,23 +193,34 @@ impl Ping { let elapsed = started.elapsed(); tracing::trace!(target: LOG_TARGET, ?peer, ?elapsed, "pong received"); let _ = self.tx.send(PingEvent::Ping { peer, ping: elapsed }).await; - } else { - if let Some(sink) = self.write_sinks.get_mut(&peer) { - tracing::trace!(target: LOG_TARGET, ?peer, "sending pong"); - if let Err(error) = sink.send(payload.freeze()).await { - tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to send pong"); - } - } } } Err(error) => { tracing::debug!(target: LOG_TARGET, ?peer, ?error, "ping substream closed/error"); - self.read_streams.remove(&peer); - self.write_sinks.remove(&peer); + self.outbound_streams.remove(&peer); + self.outbound_sinks.remove(&peer); self.ping_times.remove(&peer); } } } + + // Handle Outbound Responses (Ping is expected here) + Some((id, event)) = self.inbound_streams.next() => { + match event { + Ok(payload) => { + if let Some(sink) = self.inbound_sinks.get_mut(&id) { + tracing::trace!(target: LOG_TARGET, ?id, "sending pong"); + if let Err(error) = sink.send(payload.freeze()).await { + tracing::debug!(target: LOG_TARGET, ?id, ?error, "failed to send pong"); + } + } + } + Err(_) => { + self.inbound_streams.remove(&id); + self.inbound_sinks.remove(&id); + } + } + } } } } From 9c535ed1631e67b6703d8e59f5ca0adba155e32c Mon Sep 17 00:00:00 2001 From: dharjeezy Date: Sun, 14 Dec 2025 09:41:12 +0100 Subject: [PATCH 5/5] fmt --- src/protocol/libp2p/ping/config.rs | 2 +- src/protocol/libp2p/ping/mod.rs | 7 +++---- src/protocol/transport_service.rs | 3 +-- src/transport/manager/mod.rs | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/protocol/libp2p/ping/config.rs b/src/protocol/libp2p/ping/config.rs index a78f3681c..8b44b14ad 100644 --- a/src/protocol/libp2p/ping/config.rs +++ b/src/protocol/libp2p/ping/config.rs @@ -18,11 +18,11 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::time::Duration; use crate::{ codec::ProtocolCodec, protocol::libp2p::ping::PingEvent, types::protocol::ProtocolName, DEFAULT_CHANNEL_SIZE, }; +use std::time::Duration; use futures::Stream; use tokio::sync::mpsc::{channel, Sender}; diff --git a/src/protocol/libp2p/ping/mod.rs b/src/protocol/libp2p/ping/mod.rs index 74438d7e1..7e895490b 100644 --- a/src/protocol/libp2p/ping/mod.rs +++ b/src/protocol/libp2p/ping/mod.rs @@ -27,13 +27,12 @@ use crate::{ PeerId, }; -use futures::{SinkExt, StreamExt}; +use bytes::Bytes; +use futures::{stream::SplitSink, SinkExt, StreamExt}; use std::{ collections::HashMap, time::{Duration, Instant}, }; -use bytes::Bytes; -use futures::stream::SplitSink; use tokio::sync::mpsc; use tokio_stream::StreamMap; @@ -224,4 +223,4 @@ impl Ping { } } } -} \ No newline at end of file +} diff --git a/src/protocol/transport_service.rs b/src/protocol/transport_service.rs index e2827182d..16fa2e0fe 100644 --- a/src/protocol/transport_service.rs +++ b/src/protocol/transport_service.rs @@ -316,7 +316,7 @@ impl TransportService { next_substream_id, connections: HashMap::new(), keep_alive_tracker, - counts_towards_keep_alive + counts_towards_keep_alive, }, tx, ) @@ -516,7 +516,6 @@ impl TransportService { connection.try_upgrade(); } - connection .open_substream( self.protocol.clone(), diff --git a/src/transport/manager/mod.rs b/src/transport/manager/mod.rs index ad2c17533..a8237005c 100644 --- a/src/transport/manager/mod.rs +++ b/src/transport/manager/mod.rs @@ -348,7 +348,7 @@ impl TransportManager { self.next_substream_id.clone(), self.transport_manager_handle.clone(), keep_alive_timeout, - true + true, ); self.protocols.insert(