Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
59 changes: 40 additions & 19 deletions client/network/src/protocol/generic_proto/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ use futures::{
use log::error;
use parking_lot::{Mutex, RwLock};
use smallvec::SmallVec;
use std::{borrow::Cow, collections::VecDeque, mem, pin::Pin, str, sync::Arc, task::{Context, Poll}, time::Duration};
use std::{
borrow::Cow, cmp, collections::VecDeque, mem, pin::Pin, str, sync::Arc,
task::{Context, Poll}, time::Duration
};
use wasm_timer::Instant;

/// Number of pending notifications in asynchronous contexts.
Expand All @@ -98,10 +101,9 @@ const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;
/// consider that we failed to open the substream.
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);

/// After successfully establishing a connection with the remote, we keep the connection open for
/// at least this amount of time in order to give the rest of the code the chance to notify us to
/// open substreams.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
/// Keep the connection open for at least this amount of time even if it is closed, in order to
/// give the rest of the code the chance to notify us to open substreams.
const KEEPALIVE_TOLERANCE: Duration = Duration::from_secs(5);

/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
Expand All @@ -126,9 +128,6 @@ pub struct NotifsHandler {
/// List of notification protocols, specified by the user at initialization.
protocols: Vec<Protocol>,

/// When the connection with the remote has been successfully established.
when_connection_open: Instant,

/// Whether we are the connection dialer or listener.
endpoint: ConnectedPoint,

Expand Down Expand Up @@ -174,6 +173,9 @@ enum State {
Closed {
/// True if an outgoing substream is still in the process of being opened.
pending_opening: bool,

/// When the `Closed` state was entered.
closed_at: Instant,
},

/// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been
Expand Down Expand Up @@ -243,14 +245,14 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
in_upgrade,
handshake,
state: State::Closed {
closed_at: Instant::now(),
pending_opening: false,
},
max_notification_size: max_size,
}
}).collect(),
peer_id: peer_id.clone(),
endpoint: connected_point.clone(),
when_connection_open: Instant::now(),
legacy_protocol: self.legacy_protocol,
legacy_substreams: SmallVec::new(),
legacy_shutdown: SmallVec::new(),
Expand Down Expand Up @@ -523,7 +525,7 @@ impl ProtocolsHandler for NotifsHandler {
EitherOutput::First(((_remote_handshake, mut new_substream), protocol_index)) => {
let mut protocol_info = &mut self.protocols[protocol_index];
match protocol_info.state {
State::Closed { pending_opening } => {
State::Closed { pending_opening, .. } => {
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index,
Expand Down Expand Up @@ -582,7 +584,7 @@ impl ProtocolsHandler for NotifsHandler {
protocol_index: Self::OutboundOpenInfo
) {
match self.protocols[protocol_index].state {
State::Closed { ref mut pending_opening } |
State::Closed { ref mut pending_opening, .. } |
State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
debug_assert!(*pending_opening);
*pending_opening = false;
Expand Down Expand Up @@ -625,7 +627,7 @@ impl ProtocolsHandler for NotifsHandler {
NotifsHandlerIn::Open { protocol_index } => {
let protocol_info = &mut self.protocols[protocol_index];
match &mut protocol_info.state {
State::Closed { pending_opening } => {
State::Closed { pending_opening, .. } => {
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
Expand Down Expand Up @@ -692,11 +694,13 @@ impl ProtocolsHandler for NotifsHandler {
State::Open { .. } => {
self.protocols[protocol_index].state = State::Closed {
pending_opening: false,
closed_at: Instant::now(),
};
},
State::Opening { .. } => {
self.protocols[protocol_index].state = State::Closed {
pending_opening: true,
closed_at: Instant::now(),
};

self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
Expand All @@ -707,6 +711,7 @@ impl ProtocolsHandler for NotifsHandler {
},
State::OpenDesiredByRemote { pending_opening, .. } => {
self.protocols[protocol_index].state = State::Closed {
closed_at: Instant::now(),
pending_opening,
};
}
Expand All @@ -728,7 +733,7 @@ impl ProtocolsHandler for NotifsHandler {
_: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>
) {
match self.protocols[num].state {
State::Closed { ref mut pending_opening } |
State::Closed { ref mut pending_opening, .. } |
State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
debug_assert!(*pending_opening);
*pending_opening = false;
Expand All @@ -737,6 +742,7 @@ impl ProtocolsHandler for NotifsHandler {
State::Opening { .. } => {
self.protocols[num].state = State::Closed {
pending_opening: false,
closed_at: Instant::now(),
};

self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
Expand All @@ -756,14 +762,28 @@ impl ProtocolsHandler for NotifsHandler {
return KeepAlive::Yes;
}

// `Yes` if any protocol has some activity.
if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
return KeepAlive::Yes;
let mut max_closed_at = None;
for state in self.protocols.iter().map(|p| &p.state) {
match state {
State::Closed { closed_at, .. } => {
max_closed_at = Some(max_closed_at.map(|v| cmp::max(*closed_at, v)).unwrap_or(*closed_at));
}
// Always `Yes` if any protocol has some activity.
_ => return KeepAlive::Yes
}
}

// A grace period of `INITIAL_KEEPALIVE_TIME` must be given to leave time for the remote
// to express desire to open substreams.
KeepAlive::Until(self.when_connection_open + INITIAL_KEEPALIVE_TIME)
// A grace period of `KEEPALIVE_TOLERANCE` is given after the last substream has closed
// in order to give the possibility for the remote to re-open a substream.
// Since all protocols start in the state `Closed { closed_at: Instant::now() }`, this
// also gives a grace period after the connection has opened.
if let Some(max_closed_at) = max_closed_at {
KeepAlive::Until(max_closed_at + KEEPALIVE_TOLERANCE)
} else {
// This branch should never be reached as there is always at least one protocol.
debug_assert!(false);
KeepAlive::No
}
}

fn poll(
Expand Down Expand Up @@ -807,6 +827,7 @@ impl ProtocolsHandler for NotifsHandler {
Poll::Ready(Err(_)) => {
self.protocols[protocol_index].state = State::Closed {
pending_opening: *pending_opening,
closed_at: Instant::now(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CloseDesired { protocol_index }
Expand Down