Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 79 additions & 64 deletions client/network/src/protocol/notifications/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ enum State {
/// We use two different channels in order to have two different channel sizes, but from
/// the receiving point of view, the two channels are the same.
/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
notifications_sink_rx: stream::Select<
notifications_sink_rx: stream::Peekable<stream::Select<
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>
>,
>>,

/// Outbound substream that has been accepted by the remote.
///
Expand Down Expand Up @@ -552,7 +552,7 @@ impl ProtocolsHandler for NotifsHandler {
};

self.protocols[protocol_index].state = State::Open {
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()),
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
out_substream: Some(substream),
in_substream: in_substream.take(),
};
Expand Down Expand Up @@ -716,8 +716,80 @@ impl ProtocolsHandler for NotifsHandler {
return Poll::Ready(ev);
}

// For each open substream, try send messages from `notifications_sink_rx` to the
// substream.
for protocol_index in 0..self.protocols.len() {
if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
= &mut self.protocols[protocol_index].state
{
loop {
// Only proceed with `out_substream.poll_ready_unpin` if there is an element
// available in `notifications_sink_rx`. This avoids waking up the task when
// a substream is ready to send if there isn't actually something to send.
match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => {
return Poll::Ready(
ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)
);
},
Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
Poll::Ready(None) | Poll::Pending => break,
}

// Before we extract the element from `notifications_sink_rx`, check that the
// substream is ready to accept a message.
match out_substream.poll_ready_unpin(cx) {
Poll::Ready(_) => {},
Poll::Pending => break
}

// Now that the substream is ready for a message, grab what to send.
let message = match notifications_sink_rx.poll_next_unpin(cx) {
Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => message,
Poll::Ready(Some(NotificationsSinkMessage::ForceClose))
| Poll::Ready(None)
| Poll::Pending => {
// Should never be reached, as per `poll_peek` above.
debug_assert!(false);
break;
}
};

let _ = out_substream.start_send_unpin(message);
// Note that flushing is performed later down this function.
}
}
}

// Flush all outbound substreams.
// When `poll` returns `Poll::Ready`, the libp2p `Swarm` may decide to no longer call
// `poll` again before it is ready to accept more events.
// In order to make sure that substreams are flushed as soon as possible, the flush is
// performed before the code paths that can produce `Ready` (with some rare exceptions).
// Importantly, however, the flush is performed *after* notifications are queued with
// `Sink::start_send`.
for protocol_index in 0..self.protocols.len() {
match &mut self.protocols[protocol_index].state {
State::Open { out_substream: out_substream @ Some(_), .. } => {
match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
Poll::Pending | Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => {
*out_substream = None;
let event = NotifsHandlerOut::CloseDesired { protocol_index };
return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
}
};
}

State::Closed { .. } |
State::Opening { .. } |
State::Open { out_substream: None, .. } |
State::OpenDesiredByRemote { .. } => {}
}
}

// Poll inbound substreams.
for protocol_index in 0..self.protocols.len() {
// Poll inbound substreams.
// Inbound substreams being closed is always tolerated, except for the
// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
match &mut self.protocols[protocol_index].state {
Expand Down Expand Up @@ -763,68 +835,11 @@ impl ProtocolsHandler for NotifsHandler {
}
}
}

// Poll outbound substream.
match &mut self.protocols[protocol_index].state {
State::Open { out_substream: out_substream @ Some(_), .. } => {
match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
Poll::Pending | Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => {
*out_substream = None;
let event = NotifsHandlerOut::CloseDesired { protocol_index };
return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
}
};
}

State::Closed { .. } |
State::Opening { .. } |
State::Open { out_substream: None, .. } |
State::OpenDesiredByRemote { .. } => {}
}

if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
= &mut self.protocols[protocol_index].state
{
loop {
// Before we poll the notifications sink receiver, check that the substream
// is ready to accept a message.
match out_substream.poll_ready_unpin(cx) {
Poll::Ready(_) => {},
Poll::Pending => break
}

// Now that all substreams are ready for a message, grab what to send.
let message = match notifications_sink_rx.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) | Poll::Pending => break,
};

match message {
NotificationsSinkMessage::Notification { message } => {
let _ = out_substream.start_send_unpin(message);

// Calling `start_send_unpin` only queues the message. Actually
// emitting the message is done with `poll_flush`. In order to
// not introduce too much complexity, this flushing is done earlier
// in the body of this `poll()` method. As such, we schedule a task
// wake-up now in order to guarantee that `poll()` will be called
// again and the flush happening.
// At the time of the writing of this comment, a rewrite of this
// code is being planned. If you find this comment in the wild and
// the rewrite didn't happen, please consider a refactor.
cx.waker().wake_by_ref();
}
NotificationsSinkMessage::ForceClose => {
return Poll::Ready(
ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)
);
}
}
}
}
}

// This is the only place in this method that can return `Pending`.
// By putting it at the very bottom, we are guaranteed that everything has been properly
// polled.
Poll::Pending
}
}