Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,14 @@
//! handshake message can be of length 0, in which case the sender has to send a single `0`.
//! - The receiver then either immediately closes the substream, or answers with its own
//! LEB128-prefixed protocol-specific handshake response. The message can be of length 0, in which
//! case a single `0` has to be sent back. The receiver is then encouraged to close its sending
//! side.
//! case a single `0` has to be sent back.
//! - Once the handshake has completed, the notifications protocol is unidirectional. Only the
//! node which initiated the substream can push notifications. If the remote wants to send
//! notifications as well, it has to open its own undirectional substream.
//! - Each notification must be prefixed with an LEB128-encoded length. The encoding of the
//! messages is specific to each protocol.
//! - Either party can signal that it doesn't want a notifications substream anymore by closing
//! its writing side. The other party should respond by closing its own writing side soon after.
//!
//! The API of `sc-network` allows one to register user-defined notification protocols.
//! `sc-network` automatically tries to open a substream towards each node for which the legacy
Expand Down
6 changes: 2 additions & 4 deletions client/network/src/protocol/generic_proto/handler/notif_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,9 @@ impl ProtocolsHandler for NotifsInHandler {
}

// Note that we drop the existing substream, which will send an equivalent to a TCP "RST"
// to the remote and force-close the substream. It might seem like an unclean way to get
// to the remote and force-close the substream. It might seem like an unclean way to get
// rid of a substream. However, keep in mind that it is invalid for the remote to open
// multiple such substreams, and therefore sending a "RST" is the correct thing to do.
// Also note that we have already closed our writing side during the initial handshake,
// and we can't close "more" than that anyway.
// multiple such substreams, and therefore sending a "RST" is not an incorrect thing to do.
self.substream = Some(proto);

self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg)));
Expand Down
51 changes: 41 additions & 10 deletions client/network/src/protocol/generic_proto/upgrade/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
/// higher-level logic. This message is prefixed with a variable-length integer message length.
/// This message can be empty, in which case `0` is sent.
/// - If node B accepts the substream, it sends back a message with the same properties.
/// Afterwards, the sending side of B is closed.
/// - If instead B refuses the connection (which typically happens because no empty slot is
/// available), then it immediately closes the substream without sending back anything.
/// - Node A can then send notifications to B, prefixed with a variable-length integer indicating
/// the length of the message.
/// - Node A closes its writing side if it doesn't want the notifications substream anymore.
/// - Either node A or node B can signal that it doesn't want this notifications substream anymore
/// by closing its writing side. The other party should respond by also closing their own
/// writing side soon after.
///
/// Notification substreams are unidirectional. If A opens a substream with B, then B is
/// encouraged but not required to open a substream to A as well.
Expand Down Expand Up @@ -80,9 +81,13 @@ enum NotificationsInSubstreamHandshake {
/// User gave us the handshake message. Trying to push it in the socket.
PendingSend(Vec<u8>),
/// Handshake message was pushed in the socket. Still need to flush.
Close,
/// Handshake message successfully sent.
Flush,
/// Handshake message successfully sent and flushed.
Sent,
/// Remote has closed their writing side. We close our own writing side in return.
ClosingInResponseToRemote,
/// Both our side and the remote have closed their writing side.
BothSidesClosed,
}

/// A substream for outgoing notification messages.
Expand Down Expand Up @@ -177,16 +182,14 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
// This `Stream` implementation first tries to send back the handshake if necessary.
loop {
match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) {
NotificationsInSubstreamHandshake::Sent =>
return Stream::poll_next(this.socket.as_mut(), cx),
NotificationsInSubstreamHandshake::NotSent => {
*this.handshake = NotificationsInSubstreamHandshake::NotSent;
return Poll::Pending
},
NotificationsInSubstreamHandshake::PendingSend(msg) =>
match Sink::poll_ready(this.socket.as_mut(), cx) {
Poll::Ready(_) => {
*this.handshake = NotificationsInSubstreamHandshake::Close;
*this.handshake = NotificationsInSubstreamHandshake::Flush;
match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) {
Ok(()) => {},
Err(err) => return Poll::Ready(Some(Err(err))),
Expand All @@ -197,15 +200,43 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
return Poll::Pending
}
},
NotificationsInSubstreamHandshake::Close =>
match Sink::poll_close(this.socket.as_mut(), cx)? {
NotificationsInSubstreamHandshake::Flush =>
match Sink::poll_flush(this.socket.as_mut(), cx)? {
Poll::Ready(()) =>
*this.handshake = NotificationsInSubstreamHandshake::Sent,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Close;
*this.handshake = NotificationsInSubstreamHandshake::Flush;
return Poll::Pending
}
},

NotificationsInSubstreamHandshake::Sent => {
match Stream::poll_next(this.socket.as_mut(), cx) {
Poll::Ready(None) => *this.handshake =
NotificationsInSubstreamHandshake::ClosingInResponseToRemote,
Poll::Ready(Some(msg)) => {
*this.handshake = NotificationsInSubstreamHandshake::Sent;
return Poll::Ready(Some(msg))
},
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Sent;
return Poll::Pending
},
}
},

NotificationsInSubstreamHandshake::ClosingInResponseToRemote =>
match Sink::poll_close(this.socket.as_mut(), cx)? {
Poll::Ready(()) =>
*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::ClosingInResponseToRemote;
return Poll::Pending
}
},

NotificationsInSubstreamHandshake::BothSidesClosed =>
return Poll::Ready(None),
}
}
}
Expand Down