Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
30 changes: 17 additions & 13 deletions client/network/src/protocol/generic_proto/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;
/// consider that we failed to open the substream.
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);

/// After successfully establishing a connection with the remote, we keep the connection open for
/// at least this amount of time in order to give the rest of the code the chance to notify us to
/// open substreams.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
/// Keep the connection open for at least this amount of time even if it is closed, in order to
/// give the rest of the code the chance to notify us to open substreams.
const KEEPALIVE_TOLERANCE: Duration = Duration::from_secs(5);

/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
Expand Down Expand Up @@ -132,9 +131,6 @@ pub struct NotifsHandler {
/// send.
out_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>,

/// When the connection with the remote has been successfully established.
when_connection_open: Instant,

/// Whether we are the connection dialer or listener.
endpoint: ConnectedPoint,

Expand Down Expand Up @@ -167,6 +163,9 @@ enum State {
/// a boolean indicating whether an outgoing substream is still in the process of being
/// opened.
pending_opening: Vec<bool>,

/// When the `Closed` state was entered.
when_closed: Instant,
},

/// Handler is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been emitted.
Expand Down Expand Up @@ -271,8 +270,8 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
out_protocols: self.out_protocols,
peer_id: peer_id.clone(),
endpoint: connected_point.clone(),
when_connection_open: Instant::now(),
state: State::Closed {
when_closed: Instant::now(),
pending_opening: (0..num_out_proto).map(|_| false).collect(),
},
legacy_protocol: self.legacy_protocol,
Expand Down Expand Up @@ -552,7 +551,7 @@ impl ProtocolsHandler for NotifsHandler {
// Received notifications substream.
EitherOutput::First(((_remote_handshake, mut proto), num)) => {
match &mut self.state {
State::Closed { pending_opening } => {
State::Closed { pending_opening, .. } => {
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote
));
Expand Down Expand Up @@ -616,7 +615,7 @@ impl ProtocolsHandler for NotifsHandler {
num: Self::OutboundOpenInfo
) {
match &mut self.state {
State::Closed { pending_opening } |
State::Closed { pending_opening, .. } |
State::OpenDesiredByRemote { pending_opening, .. } => {
debug_assert!(pending_opening[num]);
pending_opening[num] = false;
Expand Down Expand Up @@ -678,7 +677,7 @@ impl ProtocolsHandler for NotifsHandler {
match &mut self.state {
State::Closed { .. } | State::OpenDesiredByRemote { .. } => {
let (pending_opening, mut in_substreams) = match &mut self.state {
State::Closed { pending_opening } => (pending_opening, None),
State::Closed { pending_opening, .. } => (pending_opening, None),
State::OpenDesiredByRemote { pending_opening, in_substreams } =>
(pending_opening, Some(mem::replace(in_substreams, Vec::new()))),
_ => unreachable!()
Expand Down Expand Up @@ -743,12 +742,14 @@ impl ProtocolsHandler for NotifsHandler {
State::Open { .. } => {
let pending_opening = self.out_protocols.iter().map(|_| false).collect();
self.state = State::Closed {
when_closed: Instant::now(),
pending_opening,
};
},
State::Opening { out_substreams, .. } => {
let pending_opening = out_substreams.iter().map(|s| s.is_none()).collect();
self.state = State::Closed {
when_closed: Instant::now(),
pending_opening,
};

Expand All @@ -758,6 +759,7 @@ impl ProtocolsHandler for NotifsHandler {
},
State::OpenDesiredByRemote { pending_opening, .. } => {
self.state = State::Closed {
when_closed: Instant::now(),
pending_opening: mem::replace(pending_opening, Vec::new()),
};
}
Expand All @@ -777,7 +779,7 @@ impl ProtocolsHandler for NotifsHandler {
_: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>
) {
match &mut self.state {
State::Closed { pending_opening } | State::OpenDesiredByRemote { pending_opening, .. } => {
State::Closed { pending_opening, .. } | State::OpenDesiredByRemote { pending_opening, .. } => {
debug_assert!(pending_opening[num]);
pending_opening[num] = false;
}
Expand Down Expand Up @@ -832,6 +834,7 @@ impl ProtocolsHandler for NotifsHandler {
// Open failure!
self.state = State::Closed {
pending_opening: (0..self.out_protocols.len()).map(|_| false).collect(),
when_closed: Instant::now(),
};

self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
Expand All @@ -851,7 +854,7 @@ impl ProtocolsHandler for NotifsHandler {
}

match self.state {
State::Closed { .. } => KeepAlive::Until(self.when_connection_open + INITIAL_KEEPALIVE_TIME),
State::Closed { when_closed, .. } => KeepAlive::Until(when_closed + KEEPALIVE_TOLERANCE),
State::OpenDesiredByRemote { .. } | State::Opening { .. } | State::Open { .. } =>
KeepAlive::Yes,
}
Expand Down Expand Up @@ -907,6 +910,7 @@ impl ProtocolsHandler for NotifsHandler {
if !in_substreams.iter().any(|s| s.is_some()) {
self.state = State::Closed {
pending_opening: mem::replace(pending_opening, Vec::new()),
when_closed: Instant::now(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CloseDesired
Expand Down