[WIP] Refactor gossiping system #4125
Conversation
Thanks for tackling this and the detailed pull request description! I have …

Given that this pull request is still marked as work-in-progress, can you …

The PR is mostly finished, except for fixing the tests, maybe adding documentation, and tackling the remaining TODOs.
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
pub fn write_notif(
write_notification please
},

/// Sends a notifications message.
SendNotif {
SendNotification
Please note that my confidence in these changes is not 100%. I strongly encourage that we deploy this to a test network first.
client/network/src/service.rs (Outdated)
Ok(Async::Ready(Some(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)))) =>
self.import_queue.import_finality_proof(origin, hash, nb, proof),
Ok(Async::Ready(Some(BehaviourOut::Event(ev)))) => {
self.events_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok());
Am I right with my assumption that for each gossip instance that we run we duplicate every event once? Can we be smarter? I would guess it is difficult to do this as a follow up without breaking anything, right?
Yeah I thought about that and it's amongst the things I wanted to fix before merging.
I guess we could put the message as a Bytes (which is kind of the equivalent of an Arc<Vec<u8>>).
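For reference, cloning a `Bytes` only bumps a reference count, so handing the same payload to every event-stream subscriber would not copy the data. A minimal sketch of the difference (using the `bytes` crate directly, not code from this PR):

```rust
use bytes::Bytes;

// Cloning a Vec<u8> copies the whole payload once per subscriber.
let as_vec: Vec<u8> = vec![0u8; 1024];
let _deep_copy = as_vec.clone();

// Cloning Bytes only bumps a reference count; all clones share one buffer.
let as_bytes = Bytes::from(as_vec);
let cheap_copy = as_bytes.clone(); // O(1)
assert_eq!(as_bytes, cheap_copy);
```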
//! Utility for gossip of network messages between nodes.
//! Handles chain-specific and standard BFT messages.
//!
//! Gossip messages are separated by two categories: "topics" and consensus engine ID.
Can you mention what a proto_name is as well? When I look at the registration logic within Grandpa, it seems to be synonymous with the consensus engine identifier?

let service = GossipEngine::new(service, &b"/sub/grandpa"[..], GRANDPA_ENGINE_ID, validator.clone());
async_std::task::spawn(async move {
let mut stream = Compat01As03::new(network.events_stream());
while let Some(Ok(event)) = stream.next().await {
In case sending fails within the network worker, we drop the sender, thus the channel will be closed. I think we should either log the send failure in the network worker, or log here that the channel is closed.
Network worker:
Ok(Async::Ready(Some(BehaviourOut::Event(ev)))) => {
self.events_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok());
return Ok(Async::Ready(Some(ev)));
},
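The suggested logging could look roughly like this (a sketch assuming the `log` crate, which the crate already uses; the log target string is a placeholder):

```rust
self.events_streams.retain(|sender| {
    let still_open = sender.unbounded_send(ev.clone()).is_ok();
    if !still_open {
        // The receiver was dropped; log instead of silently discarding the sender.
        log::debug!(target: "sub-libp2p", "Event stream receiver closed; dropping sender");
    }
    still_open
});
```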
To me that's a legitimate way to end the task.
If the NetworkWorker gets destroyed, then we want this background task to get closed as well.
Ah, and with the downgrade and upgrade of the Arc above we ensure that the other spawned task ends as well?
client/network/src/service.rs (Outdated)
/// Receiver for queries from the light client that must be processed.
light_client_rqs: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
/// Senders for events that happen on the network.
events_streams: Vec<mpsc::UnboundedSender<Event>>,
events_streams: Vec<mpsc::UnboundedSender<Event>>,
event_streams: Vec<mpsc::UnboundedSender<Event>>,
I would call this event_streams given that a single stream already implies the plural for event.
// TODO: self.announce_block(block, associated_data)
}

pub fn set_sync_fork_request(&self, peers: Vec<network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
While I know it is a convenient place to put this, this is not related to gossiping but rather a way to communicate with another component (sync). I am guessing that only the Grandpa module needs this function. Thus this could e.g. be a channel between the Grandpa and the Sync instance. What do you think?
There's actually already an abstraction for that, but it's used only in one module:
substrate/client/finality-grandpa/src/lib.rs
Line 271 in 2fa6a26
pub(crate) trait BlockSyncRequester<Block: BlockT> {
Right. This is used as a way to do dependency injection and was meant for the finality grandpa module only.
Are you suggesting to have NetworkService implement this trait and thus still pass a clone of NetworkService into grandpa?
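Something along these lines, presumably (a sketch only; the trait method mirrors the `set_sync_fork_request` signature quoted above, and the generic parameters of `NetworkService` are simplified):

```rust
// Sketch of the idea: NetworkService itself provides the BlockSyncRequester
// capability that grandpa already abstracts over, so grandpa keeps receiving
// a clone of NetworkService.
impl<B: BlockT> BlockSyncRequester<B> for NetworkService<B> {
    fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
        // Forward to the inherent method on NetworkService.
        NetworkService::set_sync_fork_request(self, peers, hash, number)
    }
}
```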
///
/// The stream never ends (unless the `NetworkWorker` gets shut down).
// Note: when transitioning to stable futures, remove the `Error` entirely
pub fn events_stream(&self) -> impl Stream<Item = Event, Error = ()> {
pub fn events_stream(&self) -> impl Stream<Item = Event, Error = ()> {
pub fn event_stream(&self) -> impl Stream<Item = Event, Error = ()> {
Ok(Async::Ready(Some(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)))) =>
self.import_queue.import_finality_proof(origin, hash, nb, proof),
Ok(Async::Ready(Some(BehaviourOut::Event(ev)))) => {
self.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok());
Follow up to #4125 (comment) (can't comment there anymore).
Either a Bytes or we make the protocol name available here so we can filter early on?
How does this handle backwards compatibility?

@arkpar I guess the goal is to be fully backwards compatible, even though I remember @tomaka mentioning somewhere else that there was something missing. Can you add more details here @tomaka? What else is missing? Would you like another round of review on this pull request or help with anything in particular?

There is kind of a fundamental issue: what if the remote opens a gossip substream before we do so? This can't happen with the code right now, but could theoretically happen and will happen in the future once we deprecate the legacy protocol. We would then either need to report to the API user that we received a message from a closed connection (not great), or report to the API user that we opened a connection, but that it is not available for syncing and light requests.

Because of the size of the changes, I'm really not confident in the robustness of this code. This should also make it way easier to review.
Closing as it was superseded by #4284.

Friendly ping @tomaka. (Not implying that you have to do the work, but trying to define our goals.)

Yes, we still have to do part 2, except that at this point I'd prefer to start from scratch.

Effort continued in #4909.
This PR is a work in progress. It isn't finished yet (there are panics and TODOs), but I'm far enough along, and I'm opening it in order to raise awareness around the changes and possibly start a discussion.
Refactors the gossiping system of Substrate.
These changes are (more or less) the outcome of the meeting we had on the second day of the retreat.
Changes to the API surface of the network
- Removed `NetworkService::with_gossip` and `NetworkService::gossip_consensus_message`. The `consensus_gossip` module has also disappeared.
- Added `NetworkService::events_stream`, `NetworkService::register_notif_protocol` and `NetworkService::write_notif`.
- The `Event` enum reports when we open a notifications substream, when we close a notifications substream, and when we receive a message on a notifications substream.

(the terminology "notification" used below can be seen as a synonym of "gossiping message", except that it is a more generic concept)
The way you use the new API is as follows: first, register a "notifications protocol" using `register_notif_protocol`. At the same time, call `events_stream` in order to obtain a `Stream<Item = Event>`. The events received on this stream inform you about everything that happens on the network, most notably which notifications channels we open/close and the notifications that we receive. Call `write_notif` in order to send a notification on an open channel. Just like it is the case right now, it is the network which decides who we open channels with.
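A rough sketch of what this flow could look like for a hypothetical protocol (the three method names come from this PR; the `Event` variant and field names, the engine ID, and the exact argument types are illustrative placeholders):

```rust
// Sketch only: variant/field names of `Event` are not the definitive API.
let engine_id = *b"demo";

// 1. Register the notifications protocol once, at startup.
network.register_notif_protocol(engine_id, &b"/sub/demo/1"[..]);

// 2. Obtain the events stream and react to what the network reports.
let handler = network.events_stream().for_each(move |event| {
    match event {
        // The network opened a notifications channel with `remote` for us.
        Event::NotifStreamOpened { remote, .. } =>
            network.write_notif(remote, engine_id, b"hello".to_vec()),
        // A notification arrived on an already-open channel.
        Event::NotifMessage { remote, message, .. } =>
            println!("got {} bytes from {:?}", message.len(), remote),
        _ => {}
    }
    Ok(())
});
// `handler` is a futures-0.1 future; spawn it on the node's executor.
```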
New crate: substrate-network-gossip
The new network API is way more low-level than previously. Most notably, there is no validation of messages and no method that does multicasting.
The reason for that is that these capabilities have been moved to a new crate called `substrate-network-gossip`. The `substrate-finality-grandpa` crate now depends on `substrate-network-gossip` rather than `substrate-network` alone.

The idea behind this design is that rather than having one pool of messages inside of the network crate that is used by all gossiping protocols combined, each protocol would have its own messages pool.
There are two ideas behind this design:
- … `substrate-network-gossip`.
- … the `substrate-network-gossip` crate.
- … the `substrate-network-gossip` crate.

Changes in substrate-finality-grandpa
The finality-grandpa crate has been adjusted for these changes:
- … the `Network` trait located in `finality-grandpa`. This was providing a networking abstraction. We instead require a different trait, located in `substrate-network-gossip`.
- … the `Network` trait has been removed entirely.
- … a `GossipEngine` (calling `GossipEngine::new`) rather than a `register_validator` method on the `Network` (see the sketch below).
- … a `GossipEngine` that is passed around everywhere rather than an `N` that implements `Network`. Since `GossipEngine` is only generic over the type of block, I removed a lot of template parameters.
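Concretely, wiring grandpa up now looks roughly like the call quoted earlier in this review (the surrounding `network` and `validator` values are assumed to be in scope):

```rust
// Based on the GossipEngine::new call quoted above in the review.
let gossip_engine = GossipEngine::new(
    network.clone(),          // the substrate-network-gossip network handle
    &b"/sub/grandpa"[..],     // protocol name
    GRANDPA_ENGINE_ID,        // consensus engine ID
    validator.clone(),        // gossip validator, registered at construction
);
```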
First of all, everything has been made backwards-compatible with the current network.
The protocols handler (i.e. the task dedicated to each connection) is aware of all the registered notifications protocols. In addition to the "legacy substream" (the one we use for everything at the moment), it tries to open a substream for each notification protocol. If the remote doesn't support the protocol, then it's not a big deal.
When we send a notifications message, the handler checks whether the corresponding protocol-specific substream is open. If so, it sends the message on it. If not, it encodes a `ConsensusMessage` and sends it on the single main substream.

When we receive a `ConsensusMessage` on the single main substream, we report it as an `Event` as if it came on the dedicated protocol-specific substream.

The protocol-specific substreams are unidirectional, which means that each side has to open its own. One substream is used for messages from A to B, and a different one is used for messages from B to A. The substream handshake is as follows: after the protocol has been negotiated using multistream-select, the receiver has to send back a single protocol-specific handshake message. This is an occasion for the receiver to gracefully refuse a substream if it wishes, by closing the substream before sending the handshake message. After B sends a handshake message, only A is allowed to send messages on this substream.
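To make the handshake concrete, here is a minimal model of the two roles over plain std::io streams (an illustration of the rule described above, not the actual libp2p upgrade code; the one-byte handshake framing is a placeholder):

```rust
use std::io::{self, Read, Write};

// Placeholder handshake payload; the real message is protocol-specific.
const HANDSHAKE: &[u8] = b"\x01";

/// Receiver side (B): accept the inbound substream by sending the handshake,
/// or refuse it gracefully by closing it without ever sending the handshake.
fn accept_inbound<S: Write>(substream: &mut S, accept: bool) -> io::Result<()> {
    if !accept {
        // Closing the substream before the handshake is the graceful refusal.
        return Err(io::Error::new(io::ErrorKind::ConnectionRefused, "substream refused"));
    }
    substream.write_all(HANDSHAKE)
}

/// Opener side (A): wait for B's handshake, after which only A may send.
fn send_after_handshake<S: Read + Write>(substream: &mut S, notification: &[u8]) -> io::Result<()> {
    let mut buf = [0u8; 1];
    substream.read_exact(&mut buf)?;  // B's handshake message
    substream.write_all(notification) // the substream is now A -> B only
}
```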