From b5d36a6bc57c7e0965d0871b194f4e65c26fc15e Mon Sep 17 00:00:00 2001 From: Jan Bujak Date: Fri, 17 Jun 2022 10:42:06 +0000 Subject: [PATCH 1/3] Pump the gossip engine while waiting for the BEEFY runtime pallet This fixes a memory leak when the BEEFY gadget is turned on, but the runtime doesn't actually use BEEFY. --- client/beefy/src/worker.rs | 46 ++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index ae466a71abb57..c124324d7aed3 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -25,9 +25,8 @@ use std::{ }; use codec::{Codec, Decode, Encode}; -use futures::{future, FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt}; use log::{debug, error, info, log_enabled, trace, warn}; -use parking_lot::Mutex; use sc_client_api::{Backend, FinalityNotification, FinalityNotifications}; use sc_network_gossip::GossipEngine; @@ -80,7 +79,7 @@ pub(crate) struct BeefyWorker { runtime: Arc, key_store: BeefyKeystore, signed_commitment_sender: BeefySignedCommitmentSender, - gossip_engine: Arc>>, + gossip_engine: GossipEngine, gossip_validator: Arc>, /// Min delta in block numbers between two blocks, BEEFY should vote on min_block_delta: u32, @@ -143,7 +142,7 @@ where runtime, key_store, signed_commitment_sender, - gossip_engine: Arc::new(Mutex::new(gossip_engine)), + gossip_engine, gossip_validator, // always target at least one block better than current best beefy min_block_delta: min_block_delta.max(1), @@ -471,15 +470,21 @@ where true, ); - self.gossip_engine.lock().gossip_message(topic::(), encoded_message, false); + self.gossip_engine.gossip_message(topic::(), encoded_message, false); } /// Wait for BEEFY runtime pallet to be available. async fn wait_for_runtime_pallet(&mut self) { - self.client - .finality_notification_stream() - .take_while(|notif| { - let at = BlockId::hash(notif.header.hash()); + let gossip_engine = &mut self.gossip_engine; + let mut finality_stream = self.client.finality_notification_stream(); + loop { + futures::select! { + notif = finality_stream.next().fuse() => { + let notif = match notif { + Some(notif) => notif, + None => break + }; + let at = BlockId::hash(notif.header.hash()); if let Some(active) = self.runtime.runtime_api().validator_set(&at).ok().flatten() { if active.id() == GENESIS_AUTHORITY_SET_ID { // When starting from genesis, there is no session boundary digest. @@ -490,16 +495,19 @@ where // worker won't vote until it witnesses a session change. // Once we'll implement 'initial sync' (catch-up), the worker will be able to // start voting right away. - self.handle_finality_notification(notif); - future::ready(false) + self.handle_finality_notification(¬if); + break } else { trace!(target: "beefy", "🥩 Finality notification: {:?}", notif); debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available..."); - future::ready(true) } - }) - .for_each(|_| future::ready(())) - .await; + }, + _ = gossip_engine.fuse() => { + break + } + } + } + // get a new stream that provides _new_ notifications (from here on out) self.finality_notifications = self.client.finality_notification_stream(); } @@ -512,7 +520,7 @@ where info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number()); self.wait_for_runtime_pallet().await; - let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::()).filter_map( + let mut votes = Box::pin(self.gossip_engine.messages_for(topic::()).filter_map( |notification| async move { trace!(target: "beefy", "🥩 Got vote message: {:?}", notification); @@ -529,11 +537,11 @@ where futures_timer::Delay::new(Duration::from_secs(5)).await; } - let engine = self.gossip_engine.clone(); - let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx)); + let gossip_engine = &mut self.gossip_engine; + let finality_notifications = &mut self.finality_notifications; futures::select! { - notification = self.finality_notifications.next().fuse() => { + notification = finality_notifications.next().fuse() => { if let Some(notification) = notification { self.handle_finality_notification(¬ification); } else { From 76f5257ac6cdde416fb03250ac5fbe4b83ec9e2d Mon Sep 17 00:00:00 2001 From: Jan Bujak Date: Fri, 17 Jun 2022 12:03:58 +0000 Subject: [PATCH 2/3] Implement `FusedFuture` for `GossipEngine` --- client/network-gossip/src/bridge.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index 2e09e7cc614a4..2d086e89b4a10 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -53,6 +53,8 @@ pub struct GossipEngine { message_sinks: HashMap>>, /// Buffered messages (see [`ForwardingState`]). forwarding_state: ForwardingState, + + is_terminated: bool, } /// A gossip engine receives messages from the network via the `network_event_stream` and forwards @@ -94,6 +96,8 @@ impl GossipEngine { network_event_stream, message_sinks: HashMap::new(), forwarding_state: ForwardingState::Idle, + + is_terminated: false, } } @@ -214,7 +218,10 @@ impl Future for GossipEngine { Event::Dht(_) => {}, }, // The network event stream closed. Do the same for [`GossipValidator`]. - Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(None) => { + self.is_terminated = true; + return Poll::Ready(()) + }, Poll::Pending => break, } }, @@ -288,6 +295,12 @@ impl Future for GossipEngine { } } +impl futures::future::FusedFuture for GossipEngine { + fn is_terminated(&self) -> bool { + self.is_terminated + } +} + #[cfg(test)] mod tests { use super::*; From 1f5fc7c769568f5cec210976df2631a034d1b8e9 Mon Sep 17 00:00:00 2001 From: Jan Bujak Date: Fri, 17 Jun 2022 11:45:43 +0000 Subject: [PATCH 3/3] Fuse futures outside of loops --- client/beefy/src/worker.rs | 51 ++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index c124324d7aed3..735dea0170a62 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -25,10 +25,10 @@ use std::{ }; use codec::{Codec, Decode, Encode}; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use log::{debug, error, info, log_enabled, trace, warn}; -use sc_client_api::{Backend, FinalityNotification, FinalityNotifications}; +use sc_client_api::{Backend, FinalityNotification}; use sc_network_gossip::GossipEngine; use sp_api::{BlockId, ProvideRuntimeApi}; @@ -87,7 +87,6 @@ pub(crate) struct BeefyWorker { rounds: Option>, /// Buffer holding votes for blocks that the client hasn't seen finality for. pending_votes: BTreeMap, Vec, AuthorityId, Signature>>>, - finality_notifications: FinalityNotifications, /// Best block we received a GRANDPA notification for best_grandpa_block_header: ::Header, /// Best block a BEEFY voting round has been concluded for @@ -149,7 +148,6 @@ where metrics, rounds: None, pending_votes: BTreeMap::new(), - finality_notifications: client.finality_notification_stream(), best_grandpa_block_header: last_finalized_header, best_beefy_block: None, last_signed_id: 0, @@ -475,11 +473,11 @@ where /// Wait for BEEFY runtime pallet to be available. async fn wait_for_runtime_pallet(&mut self) { - let gossip_engine = &mut self.gossip_engine; - let mut finality_stream = self.client.finality_notification_stream(); + let mut gossip_engine = &mut self.gossip_engine; + let mut finality_stream = self.client.finality_notification_stream().fuse(); loop { futures::select! { - notif = finality_stream.next().fuse() => { + notif = finality_stream.next() => { let notif = match notif { Some(notif) => notif, None => break @@ -502,14 +500,11 @@ where debug!(target: "beefy", "🥩 Waiting for BEEFY pallet to become available..."); } }, - _ = gossip_engine.fuse() => { + _ = gossip_engine => { break } } } - - // get a new stream that provides _new_ notifications (from here on out) - self.finality_notifications = self.client.finality_notification_stream(); } /// Main loop for BEEFY worker. @@ -520,16 +515,20 @@ where info!(target: "beefy", "🥩 run BEEFY worker, best grandpa: #{:?}.", self.best_grandpa_block_header.number()); self.wait_for_runtime_pallet().await; - let mut votes = Box::pin(self.gossip_engine.messages_for(topic::()).filter_map( - |notification| async move { - trace!(target: "beefy", "🥩 Got vote message: {:?}", notification); - - VoteMessage::, AuthorityId, Signature>::decode( - &mut ¬ification.message[..], - ) - .ok() - }, - )); + let mut finality_notifications = self.client.finality_notification_stream().fuse(); + let mut votes = Box::pin( + self.gossip_engine + .messages_for(topic::()) + .filter_map(|notification| async move { + trace!(target: "beefy", "🥩 Got vote message: {:?}", notification); + + VoteMessage::, AuthorityId, Signature>::decode( + &mut ¬ification.message[..], + ) + .ok() + }) + .fuse(), + ); loop { while self.sync_oracle.is_major_syncing() { @@ -537,18 +536,16 @@ where futures_timer::Delay::new(Duration::from_secs(5)).await; } - let gossip_engine = &mut self.gossip_engine; - let finality_notifications = &mut self.finality_notifications; - + let mut gossip_engine = &mut self.gossip_engine; futures::select! { - notification = finality_notifications.next().fuse() => { + notification = finality_notifications.next() => { if let Some(notification) = notification { self.handle_finality_notification(¬ification); } else { return; } }, - vote = votes.next().fuse() => { + vote = votes.next() => { if let Some(vote) = vote { let block_num = vote.commitment.block_number; if block_num > *self.best_grandpa_block_header.number() { @@ -571,7 +568,7 @@ where return; } }, - _ = gossip_engine.fuse() => { + _ = gossip_engine => { error!(target: "beefy", "🥩 Gossip engine has terminated."); return; }