diff --git a/client/network/src/protocol/generic_proto/handler.rs b/client/network/src/protocol/generic_proto/handler.rs index 6fdcef1d7a2a1..ad761f22d63bf 100644 --- a/client/network/src/protocol/generic_proto/handler.rs +++ b/client/network/src/protocol/generic_proto/handler.rs @@ -84,7 +84,10 @@ use futures::{ use log::error; use parking_lot::{Mutex, RwLock}; use smallvec::SmallVec; -use std::{borrow::Cow, collections::VecDeque, mem, pin::Pin, str, sync::Arc, task::{Context, Poll}, time::Duration}; +use std::{ + borrow::Cow, cmp, collections::VecDeque, mem, pin::Pin, str, sync::Arc, + task::{Context, Poll}, time::Duration +}; use wasm_timer::Instant; /// Number of pending notifications in asynchronous contexts. @@ -98,10 +101,9 @@ const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048; /// consider that we failed to open the substream. const OPEN_TIMEOUT: Duration = Duration::from_secs(10); -/// After successfully establishing a connection with the remote, we keep the connection open for -/// at least this amount of time in order to give the rest of the code the chance to notify us to -/// open substreams. -const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5); +/// Keep the connection open for at least this amount of time even if it is closed, in order to +/// give the rest of the code the chance to notify us to open substreams. +const KEEPALIVE_TOLERANCE: Duration = Duration::from_secs(5); /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -126,9 +128,6 @@ pub struct NotifsHandler { /// List of notification protocols, specified by the user at initialization. protocols: Vec, - /// When the connection with the remote has been successfully established. - when_connection_open: Instant, - /// Whether we are the connection dialer or listener. endpoint: ConnectedPoint, @@ -174,6 +173,9 @@ enum State { Closed { /// True if an outgoing substream is still in the process of being opened. pending_opening: bool, + + /// When the `Closed` state was entered. + closed_at: Instant, }, /// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been @@ -243,6 +245,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto { in_upgrade, handshake, state: State::Closed { + closed_at: Instant::now(), pending_opening: false, }, max_notification_size: max_size, @@ -250,7 +253,6 @@ impl IntoProtocolsHandler for NotifsHandlerProto { }).collect(), peer_id: peer_id.clone(), endpoint: connected_point.clone(), - when_connection_open: Instant::now(), legacy_protocol: self.legacy_protocol, legacy_substreams: SmallVec::new(), legacy_shutdown: SmallVec::new(), @@ -523,7 +525,7 @@ impl ProtocolsHandler for NotifsHandler { EitherOutput::First(((_remote_handshake, mut new_substream), protocol_index)) => { let mut protocol_info = &mut self.protocols[protocol_index]; match protocol_info.state { - State::Closed { pending_opening } => { + State::Closed { pending_opening, .. } => { self.events_queue.push_back(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::OpenDesiredByRemote { protocol_index, @@ -582,7 +584,7 @@ impl ProtocolsHandler for NotifsHandler { protocol_index: Self::OutboundOpenInfo ) { match self.protocols[protocol_index].state { - State::Closed { ref mut pending_opening } | + State::Closed { ref mut pending_opening, .. } | State::OpenDesiredByRemote { ref mut pending_opening, .. } => { debug_assert!(*pending_opening); *pending_opening = false; @@ -625,7 +627,7 @@ impl ProtocolsHandler for NotifsHandler { NotifsHandlerIn::Open { protocol_index } => { let protocol_info = &mut self.protocols[protocol_index]; match &mut protocol_info.state { - State::Closed { pending_opening } => { + State::Closed { pending_opening, .. } => { if !*pending_opening { let proto = NotificationsOut::new( protocol_info.name.clone(), @@ -692,11 +694,13 @@ impl ProtocolsHandler for NotifsHandler { State::Open { .. } => { self.protocols[protocol_index].state = State::Closed { pending_opening: false, + closed_at: Instant::now(), }; }, State::Opening { .. } => { self.protocols[protocol_index].state = State::Closed { pending_opening: true, + closed_at: Instant::now(), }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom( @@ -707,6 +711,7 @@ impl ProtocolsHandler for NotifsHandler { }, State::OpenDesiredByRemote { pending_opening, .. } => { self.protocols[protocol_index].state = State::Closed { + closed_at: Instant::now(), pending_opening, }; } @@ -728,7 +733,7 @@ impl ProtocolsHandler for NotifsHandler { _: ProtocolsHandlerUpgrErr ) { match self.protocols[num].state { - State::Closed { ref mut pending_opening } | + State::Closed { ref mut pending_opening, .. } | State::OpenDesiredByRemote { ref mut pending_opening, .. } => { debug_assert!(*pending_opening); *pending_opening = false; @@ -737,6 +742,7 @@ impl ProtocolsHandler for NotifsHandler { State::Opening { .. } => { self.protocols[num].state = State::Closed { pending_opening: false, + closed_at: Instant::now(), }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom( @@ -756,14 +762,28 @@ impl ProtocolsHandler for NotifsHandler { return KeepAlive::Yes; } - // `Yes` if any protocol has some activity. - if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) { - return KeepAlive::Yes; + let mut max_closed_at = None; + for state in self.protocols.iter().map(|p| &p.state) { + match state { + State::Closed { closed_at, .. } => { + max_closed_at = Some(max_closed_at.map(|v| cmp::max(*closed_at, v)).unwrap_or(*closed_at)); + } + // Always `Yes` if any protocol has some activity. + _ => return KeepAlive::Yes + } } - // A grace period of `INITIAL_KEEPALIVE_TIME` must be given to leave time for the remote - // to express desire to open substreams. - KeepAlive::Until(self.when_connection_open + INITIAL_KEEPALIVE_TIME) + // A grace period of `KEEPALIVE_TOLERANCE` is given after the last substream has closed + // in order to give the possibility for the remote to re-open a substream. + // Since all protocols start in the state `Closed { closed_at: Instant::now() }`, this + // also gives a grace period after the connection has opened. + if let Some(max_closed_at) = max_closed_at { + KeepAlive::Until(max_closed_at + KEEPALIVE_TOLERANCE) + } else { + // This branch should never be reached as there is always at least one protocol. + debug_assert!(false); + KeepAlive::No + } } fn poll( @@ -807,6 +827,7 @@ impl ProtocolsHandler for NotifsHandler { Poll::Ready(Err(_)) => { self.protocols[protocol_index].state = State::Closed { pending_opening: *pending_opening, + closed_at: Instant::now(), }; return Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::CloseDesired { protocol_index }