Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
182 changes: 106 additions & 76 deletions client/network/src/protocol/generic_proto/handler/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,17 @@ pub struct NotifsHandler {
/// Handlers for outbound substreams, and the initial handshake message we send.
out_handlers: Vec<(NotifsOutHandler, Arc<RwLock<Vec<u8>>>)>,

/// Whether we are the connection dialer or listener.
endpoint: ConnectedPoint,

/// Handler for backwards-compatibility.
legacy: LegacyProtoHandler,

/// In the situation where `legacy.is_open()` is true, but we haven't sent out any
/// [`NotifsHandlerOut::Open`] event yet, this contains the handshake received on the legacy
/// substream.
pending_legacy_handshake: Option<Vec<u8>>,

/// State of this handler.
enabled: EnabledState,

Expand All @@ -123,6 +131,9 @@ pub struct NotifsHandler {
/// We use two different channels in order to have two different channel sizes, but from the
/// receiving point of view, the two channels are the same.
/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
///
/// Contains `Some` if and only if it has been reported to the user that the substreams are
/// open.
notifications_sink_rx: Option<
stream::Select<
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
Expand Down Expand Up @@ -159,7 +170,9 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
.into_iter()
.map(|(proto, msg)| (proto.into_handler(remote_peer_id, connected_point), msg))
.collect(),
endpoint: connected_point.clone(),
legacy: self.legacy.into_handler(remote_peer_id, connected_point),
pending_legacy_handshake: None,
enabled: EnabledState::Initial,
pending_in: Vec::new(),
notifications_sink_rx: None,
Expand Down Expand Up @@ -617,87 +630,80 @@ impl ProtocolsHandler for NotifsHandler {
}
}

if let Poll::Ready(ev) = self.legacy.poll(cx) {
return match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(EitherUpgrade::B),
info: None,
}),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
endpoint,
received_handshake,
..
}) => {
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink {
inner: Arc::new(NotificationsSinkInner {
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(sync_tx),
// If `self.pending_legacy_handshake` is `Some`, we are in a state where the legacy
// substream is open but the user isn't aware yet of the substreams being open.
// When that is the case, neither the legacy substream nor the incoming notifications
// substreams should be polled, otherwise there is a risk of receiving messages from them.
if self.pending_legacy_handshake.is_none() {
while let Poll::Ready(ev) = self.legacy.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(EitherUpgrade::B),
info: None,
}),
};

debug_assert!(self.notifications_sink_rx.is_none());
self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse()));

Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Open { endpoint, received_handshake, notifications_sink }
))
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => {
// We consciously drop the receivers despite notifications being potentially
// still buffered up.
debug_assert!(self.notifications_sink_rx.is_some());
self.notifications_sink_rx = None;

Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Closed { endpoint, reason }
))
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) =>
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CustomMessage { message }
)),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::ProtocolError { is_severe, error }
)),
ProtocolsHandlerEvent::Close(err) =>
Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
received_handshake,
..
}) => {
self.pending_legacy_handshake = Some(received_handshake);
cx.waker().wake_by_ref();
return Poll::Pending;
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { reason, .. }) => {
// We consciously drop the receivers despite notifications being potentially
// still buffered up.
debug_assert!(self.notifications_sink_rx.is_some());
self.notifications_sink_rx = None;

return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Closed { endpoint: self.endpoint.clone(), reason }
))
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => {
debug_assert!(self.notifications_sink_rx.is_some());
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CustomMessage { message }
))
},
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) =>
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::ProtocolError { is_severe, error }
)),
ProtocolsHandlerEvent::Close(err) =>
return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))),
}
}
}

for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
while let Poll::Ready(ev) = handler.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
error!("Incoming substream handler tried to open a substream"),
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
match self.enabled {
EnabledState::Initial => self.pending_in.push(handler_num),
EnabledState::Enabled => {
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = handshake_message.read().clone();
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() {
while let Poll::Ready(ev) = handler.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } =>
error!("Incoming substream handler tried to open a substream"),
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
match self.enabled {
EnabledState::Initial => self.pending_in.push(handler_num),
EnabledState::Enabled => {
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = handshake_message.read().clone();
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
},
EnabledState::Disabled =>
handler.inject_event(NotifsInHandlerIn::Refuse),
},
EnabledState::Disabled =>
handler.inject_event(NotifsInHandlerIn::Refuse),
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
if self.notifications_sink_rx.is_some() {
let msg = NotifsHandlerOut::Notification {
message,
protocol_name: handler.protocol_name().to_owned().into(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
}
},
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {},
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => {
// Note that right now the legacy substream has precedence over
// everything. If it is not open, then we consider that nothing is open.
if self.legacy.is_open() {
let msg = NotifsHandlerOut::Notification {
message,
protocol_name: handler.protocol_name().to_owned().into(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
}
},
}
}
}
}
Expand Down Expand Up @@ -725,6 +731,30 @@ impl ProtocolsHandler for NotifsHandler {
}
}

if self.out_handlers.iter().all(|(h, _)| h.is_open() || h.is_refused()) {
if let Some(handshake) = self.pending_legacy_handshake.take() {
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink {
inner: Arc::new(NotificationsSinkInner {
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(sync_tx),
}),
};

debug_assert!(self.notifications_sink_rx.is_none());
self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse()));

return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Open {
endpoint: self.endpoint.clone(),
received_handshake: handshake,
notifications_sink
}
))
}
}

Poll::Pending
}
}
21 changes: 0 additions & 21 deletions client/network/src/protocol/generic_proto/handler/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,16 +222,12 @@ pub enum LegacyProtoHandlerOut {
/// Handshake message that has been sent to us.
/// This is normally a "Status" message, but this out of the concern of this code.
received_handshake: Vec<u8>,
/// The connected endpoint.
endpoint: ConnectedPoint,
},

/// Closed a custom protocol with the remote.
CustomProtocolClosed {
/// Reason why the substream closed, for diagnostic purposes.
reason: Cow<'static, str>,
/// The connected endpoint.
endpoint: ConnectedPoint,
},

/// Receives a message on a custom protocol substream.
Expand All @@ -250,18 +246,6 @@ pub enum LegacyProtoHandlerOut {
}

impl LegacyProtoHandler {
/// Returns true if the legacy substream is currently open.
pub fn is_open(&self) -> bool {
match &self.state {
ProtocolState::Init { substreams, .. } => !substreams.is_empty(),
ProtocolState::Opening { .. } => false,
ProtocolState::Normal { substreams, .. } => !substreams.is_empty(),
ProtocolState::Disabled { .. } => false,
ProtocolState::KillAsap => false,
ProtocolState::Poisoned => false,
}
}

/// Enables the handler.
fn enable(&mut self) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
Expand All @@ -285,7 +269,6 @@ impl LegacyProtoHandler {
} else {
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
version: incoming[0].0.protocol_version(),
endpoint: self.endpoint.clone(),
received_handshake: mem::replace(&mut incoming[0].1, Vec::new()),
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
Expand Down Expand Up @@ -399,7 +382,6 @@ impl LegacyProtoHandler {
if substreams.is_empty() {
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
reason: "Legacy substream clogged".into(),
endpoint: self.endpoint.clone()
};
self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
Expand All @@ -413,7 +395,6 @@ impl LegacyProtoHandler {
if substreams.is_empty() {
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
reason: "All substreams have been closed by the remote".into(),
endpoint: self.endpoint.clone()
};
self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
Expand All @@ -426,7 +407,6 @@ impl LegacyProtoHandler {
if substreams.is_empty() {
let event = LegacyProtoHandlerOut::CustomProtocolClosed {
reason: format!("Error on the last substream: {:?}", err).into(),
endpoint: self.endpoint.clone()
};
self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
Expand Down Expand Up @@ -492,7 +472,6 @@ impl LegacyProtoHandler {
ProtocolState::Opening { .. } => {
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
version: substream.protocol_version(),
endpoint: self.endpoint.clone(),
received_handshake,
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
Expand Down
16 changes: 16 additions & 0 deletions client/network/src/protocol/generic_proto/handler/notif_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,22 @@ impl NotifsOutHandler {
}
}

/// Returns `true` if there has been an attempt to open the substream, but the remote refused
/// the substream.
///
/// Always returns `false` if the handler is in a disabled state.
pub fn is_refused(&self) -> bool {
match &self.state {
State::Disabled => false,
State::DisabledOpening => false,
State::DisabledOpen(_) => false,
State::Opening { .. } => false,
State::Refused => true,
State::Open { .. } => false,
State::Poisoned => false,
}
}

/// Returns the name of the protocol that we negotiate.
pub fn protocol_name(&self) -> &[u8] {
&self.protocol_name
Expand Down