From 89e813ea281947cf5791d1d5743aec4dfdd31f29 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Thu, 2 Jul 2020 22:14:10 +0200 Subject: [PATCH 01/11] BabeWorker -> BabeSlotWorker --- client/consensus/babe/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index 961b0382c5858..a4f560319b405 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -387,7 +387,7 @@ pub fn start_babe(BabeParams { CAW: CanAuthorWith + Send, { let config = babe_link.config; - let worker = BabeWorker { + let worker = BabeSlotWorker { client: client.clone(), block_import: Arc::new(Mutex::new(block_import)), env, @@ -417,7 +417,7 @@ pub fn start_babe(BabeParams { )) } -struct BabeWorker { +struct BabeSlotWorker { client: Arc, block_import: Arc>, env: E, @@ -428,7 +428,7 @@ struct BabeWorker { config: Config, } -impl sc_consensus_slots::SimpleSlotWorker for BabeWorker where +impl sc_consensus_slots::SimpleSlotWorker for BabeSlotWorker where B: BlockT, C: ProvideRuntimeApi + ProvideCache + @@ -599,7 +599,7 @@ impl sc_consensus_slots::SimpleSlotWorker for BabeWork } } -impl SlotWorker for BabeWorker where +impl SlotWorker for BabeSlotWorker where B: BlockT, C: ProvideRuntimeApi + ProvideCache + From 3c398f59c22bec3fc0bef11e52464d344609e588 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Thu, 2 Jul 2020 22:14:21 +0200 Subject: [PATCH 02/11] SlotWorker::notify_slot: similar to claim_slot, but called no matter authoring --- client/consensus/slots/src/lib.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/client/consensus/slots/src/lib.rs b/client/consensus/slots/src/lib.rs index 7687d3114b31d..7d346ffe3954d 100644 --- a/client/consensus/slots/src/lib.rs +++ b/client/consensus/slots/src/lib.rs @@ -104,6 +104,15 @@ pub trait SimpleSlotWorker { epoch_data: &Self::EpochData, ) -> Option; + /// Notifies the given slot. Similar to `claim_slot`, but will be called no matter whether we + /// need to author blocks or not. + fn notify_slot( + &self, + _header: &B::Header, + _slot_number: u64, + _epoch_data: &Self::EpochData, + ) { } + /// Return the pre digest data to include in a block authored with the given claim. fn pre_digest_data( &self, @@ -191,6 +200,8 @@ pub trait SimpleSlotWorker { } }; + self.notify_slot(&chain_head, slot_number, &epoch_data); + let authorities_len = self.authorities_len(&epoch_data); if !self.force_authoring() && From b371bffb2f25043d1982d7c7328537d2e2856bb0 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Thu, 2 Jul 2020 22:23:53 +0200 Subject: [PATCH 03/11] Wrap the future with a new struct BabeWorker --- client/consensus/babe/src/lib.rs | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index a4f560319b405..e24c52040a4e7 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -370,7 +370,7 @@ pub fn start_babe(BabeParams { babe_link, can_author_with, }: BabeParams) -> Result< - impl futures::Future, + BabeWorker, sp_consensus::Error, > where B: BlockT, @@ -378,13 +378,13 @@ pub fn start_babe(BabeParams { + HeaderBackend + HeaderMetadata + Send + Sync + 'static, C::Api: BabeApi, SC: SelectChain + 'static, - E: Environment + Send + Sync, + E: Environment + Send + Sync + 'static, E::Proposer: Proposer>, I: BlockImport> + Send + Sync + 'static, Error: std::error::Error + Send + From + From + 'static, - SO: SyncOracle + Send + Sync + Clone, - CAW: CanAuthorWith + Send, + SO: SyncOracle + Send + Sync + Clone + 'static, + CAW: CanAuthorWith + Send + 'static, { let config = babe_link.config; let worker = BabeSlotWorker { @@ -406,7 +406,7 @@ pub fn start_babe(BabeParams { )?; info!(target: "babe", "👶 Starting BABE Authorship worker"); - Ok(sc_consensus_slots::start_slot_worker( + let inner = sc_consensus_slots::start_slot_worker( config.0, select_chain, worker, @@ -414,7 +414,24 @@ pub fn start_babe(BabeParams { inherent_data_providers, babe_link.time_source, can_author_with, - )) + ); + Ok(BabeWorker { inner: Box::pin(inner) }) +} + +/// Worker for Babe which implements `Future`. This must be polled. +pub struct BabeWorker { + inner: Pin>>, +} + +impl futures::Future for BabeWorker { + type Output = (); + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut futures::task::Context + ) -> futures::task::Poll { + self.inner.as_mut().poll(cx) + } } struct BabeSlotWorker { From cbee13ad766093f22187790c58a09a2fc4fb06ae Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Thu, 2 Jul 2020 22:30:55 +0200 Subject: [PATCH 04/11] Add type definition slot_notification_sinks --- Cargo.lock | 1 + client/consensus/babe/Cargo.toml | 1 + client/consensus/babe/src/lib.rs | 15 +++++++++++---- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ea9b890570c0..83393fa648f1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6231,6 +6231,7 @@ dependencies = [ "sp-keyring", "sp-runtime", "sp-timestamp", + "sp-utils", "sp-version", "substrate-prometheus-endpoint", "substrate-test-runtime-client", diff --git a/client/consensus/babe/Cargo.toml b/client/consensus/babe/Cargo.toml index 46c67e8917125..e879c44a0f95e 100644 --- a/client/consensus/babe/Cargo.toml +++ b/client/consensus/babe/Cargo.toml @@ -37,6 +37,7 @@ sp-consensus-vrf = { version = "0.8.0-rc4", path = "../../../primitives/consensu sc-consensus-uncles = { version = "0.8.0-rc4", path = "../uncles" } sc-consensus-slots = { version = "0.8.0-rc4", path = "../slots" } sp-runtime = { version = "2.0.0-rc4", path = "../../../primitives/runtime" } +sp-utils = { version = "2.0.0-rc4", path = "../../../primitives/utils" } fork-tree = { version = "2.0.0-rc4", path = "../../../utils/fork-tree" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc4"} futures = "0.3.4" diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index e24c52040a4e7..89fe14e24d03b 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -106,6 +106,7 @@ use sc_client_api::{ BlockchainEvents, ProvideUncles, }; use sp_block_builder::BlockBuilder as BlockBuilderApi; +use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; use futures::prelude::*; use log::{debug, info, log, trace, warn}; @@ -370,7 +371,7 @@ pub fn start_babe(BabeParams { babe_link, can_author_with, }: BabeParams) -> Result< - BabeWorker, + BabeWorker, sp_consensus::Error, > where B: BlockT, @@ -387,6 +388,8 @@ pub fn start_babe(BabeParams { CAW: CanAuthorWith + Send + 'static, { let config = babe_link.config; + let slot_notification_sinks = Arc::new(Mutex::new(Vec::new())); + let worker = BabeSlotWorker { client: client.clone(), block_import: Arc::new(Mutex::new(block_import)), @@ -415,15 +418,19 @@ pub fn start_babe(BabeParams { babe_link.time_source, can_author_with, ); - Ok(BabeWorker { inner: Box::pin(inner) }) + Ok(BabeWorker { + inner: Box::pin(inner), + slot_notification_sinks, + }) } /// Worker for Babe which implements `Future`. This must be polled. -pub struct BabeWorker { +pub struct BabeWorker { inner: Pin>>, + slot_notification_sinks: Arc, Epoch>)>>>>, } -impl futures::Future for BabeWorker { +impl futures::Future for BabeWorker { type Output = (); fn poll( From 8657c187bb6cc276349817bc9236844f5b086388 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Thu, 2 Jul 2020 22:34:05 +0200 Subject: [PATCH 05/11] Function slot_notification_streams for the receiver side --- client/consensus/babe/src/lib.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index 89fe14e24d03b..39a5083ff6d73 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -106,7 +106,7 @@ use sc_client_api::{ BlockchainEvents, ProvideUncles, }; use sp_block_builder::BlockBuilder as BlockBuilderApi; -use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; +use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; use futures::prelude::*; use log::{debug, info, log, trace, warn}; @@ -430,6 +430,18 @@ pub struct BabeWorker { slot_notification_sinks: Arc, Epoch>)>>>>, } +impl BabeWorker { + /// Return an event stream of notifications for when new slot happens, and the corresponding + /// epoch descriptor. + pub fn slot_notification_stream( + &self + ) -> TracingUnboundedReceiver<(u64, ViableEpochDescriptor, Epoch>)> { + let (sink, stream) = tracing_unbounded("mpsc_babe_slot_notifications"); + self.slot_notification_sinks.lock().push(sink); + stream + } +} + impl futures::Future for BabeWorker { type Output = (); From b3ca09b1b9d5936eb2e52b93f50fdb6886eaef80 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Thu, 2 Jul 2020 22:40:29 +0200 Subject: [PATCH 06/11] Get a handle of slot_notification_sinks in BabeSlotWorker --- client/consensus/babe/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index 39a5083ff6d73..934eaf8b5fedb 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -398,6 +398,7 @@ pub fn start_babe(BabeParams { force_authoring, keystore, epoch_changes: babe_link.epoch_changes.clone(), + slot_notification_sinks: slot_notification_sinks.clone(), config: config.clone(), }; @@ -461,6 +462,7 @@ struct BabeSlotWorker { force_authoring: bool, keystore: KeyStorePtr, epoch_changes: SharedEpochChanges, + slot_notification_sinks: Arc, Epoch>)>>>>, config: Config, } From 10b085cd23d0112961656ae74acd703579b4fc3d Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Thu, 2 Jul 2020 22:42:36 +0200 Subject: [PATCH 07/11] Implement notify_slot --- client/consensus/babe/src/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index 934eaf8b5fedb..5408b48fa98c1 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -540,6 +540,16 @@ impl sc_consensus_slots::SimpleSlotWorker for BabeSlot s } + fn notify_slot( + &self, + _parent_header: &B::Header, + slot_number: SlotNumber, + epoch_descriptor: &ViableEpochDescriptor, Epoch>, + ) { + self.slot_notification_sinks.lock() + .retain(|sink| sink.unbounded_send((slot_number, epoch_descriptor.clone())).is_ok()); + } + fn pre_digest_data( &self, _slot_number: u64, From e1680eefe8143d77651e9a4d5aec88a1bd3d1bc5 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Mon, 13 Jul 2020 06:33:43 +0200 Subject: [PATCH 08/11] Switch to use bounded mpsc --- Cargo.lock | 7 +++++++ client/consensus/babe/Cargo.toml | 1 + client/consensus/babe/src/lib.rs | 15 +++++++++------ 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e323df162e61..761ab9bdcd002 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5832,6 +5832,12 @@ dependencies = [ "syn 1.0.33", ] +[[package]] +name = "retain_mut" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e005d658ad26eacc2b6c506dfde519f4e277e328d0eb3379ca61647d70a8f531" + [[package]] name = "ring" version = "0.16.12" @@ -6270,6 +6276,7 @@ dependencies = [ "pdqselect", "rand 0.7.3", "rand_chacha 0.2.2", + "retain_mut", "sc-block-builder", "sc-client-api", "sc-consensus-epochs", diff --git a/client/consensus/babe/Cargo.toml b/client/consensus/babe/Cargo.toml index e879c44a0f95e..f62b5a9d95271 100644 --- a/client/consensus/babe/Cargo.toml +++ b/client/consensus/babe/Cargo.toml @@ -49,6 +49,7 @@ rand = "0.7.2" merlin = "2.0" pdqselect = "0.1.0" derive_more = "0.99.2" +retain_mut = "0.1.1" [dev-dependencies] sp-keyring = { version = "2.0.0-rc4", path = "../../../primitives/keyring" } diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index de32d0b86fcc2..fec77f146931b 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -106,7 +106,8 @@ use sc_client_api::{ BlockchainEvents, ProvideUncles, }; use sp_block_builder::BlockBuilder as BlockBuilderApi; -use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; +use futures::channel::mpsc::{channel, Sender, Receiver}; +use retain_mut::RetainMut; use futures::prelude::*; use log::{debug, info, log, trace, warn}; @@ -428,7 +429,7 @@ pub fn start_babe(BabeParams { /// Worker for Babe which implements `Future`. This must be polled. pub struct BabeWorker { inner: Pin>>, - slot_notification_sinks: Arc, Epoch>)>>>>, + slot_notification_sinks: Arc, Epoch>)>>>>, } impl BabeWorker { @@ -436,8 +437,10 @@ impl BabeWorker { /// epoch descriptor. pub fn slot_notification_stream( &self - ) -> TracingUnboundedReceiver<(u64, ViableEpochDescriptor, Epoch>)> { - let (sink, stream) = tracing_unbounded("mpsc_babe_slot_notifications"); + ) -> Receiver<(u64, ViableEpochDescriptor, Epoch>)> { + const CHANNEL_BUFFER_SIZE: usize = 1024; + + let (sink, stream) = channel(CHANNEL_BUFFER_SIZE); self.slot_notification_sinks.lock().push(sink); stream } @@ -462,7 +465,7 @@ struct BabeSlotWorker { force_authoring: bool, keystore: KeyStorePtr, epoch_changes: SharedEpochChanges, - slot_notification_sinks: Arc, Epoch>)>>>>, + slot_notification_sinks: Arc, Epoch>)>>>>, config: Config, } @@ -547,7 +550,7 @@ impl sc_consensus_slots::SimpleSlotWorker for BabeSlot epoch_descriptor: &ViableEpochDescriptor, Epoch>, ) { self.slot_notification_sinks.lock() - .retain(|sink| sink.unbounded_send((slot_number, epoch_descriptor.clone())).is_ok()); + .retain_mut(|sink| sink.try_send((slot_number, epoch_descriptor.clone())).is_ok()); } fn pre_digest_data( From f2f46005715fc1375796c8be9b93e9373114f4bc Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Mon, 13 Jul 2020 07:15:20 +0200 Subject: [PATCH 09/11] Do not drop the sink when channel is full Only skip sending the message and emit a warning, because it is recoverable. --- client/consensus/babe/src/lib.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index fec77f146931b..15dab2d373da1 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -550,7 +550,19 @@ impl sc_consensus_slots::SimpleSlotWorker for BabeSlot epoch_descriptor: &ViableEpochDescriptor, Epoch>, ) { self.slot_notification_sinks.lock() - .retain_mut(|sink| sink.try_send((slot_number, epoch_descriptor.clone())).is_ok()); + .retain_mut(|sink| { + match sink.try_send((slot_number, epoch_descriptor.clone())) { + Ok(()) => true, + Err(e) => { + if e.is_full() { + warn!(target: "babe", "Trying to notify a slot but the channel is full"); + true + } else { + false + } + }, + } + }); } fn pre_digest_data( From 6d5e9506b4c2bc09c0432523bd5543e5cc750341 Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Mon, 13 Jul 2020 07:35:14 +0200 Subject: [PATCH 10/11] Fix future type bounds --- client/consensus/babe/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index 15dab2d373da1..75f7d5efe095d 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -428,7 +428,7 @@ pub fn start_babe(BabeParams { /// Worker for Babe which implements `Future`. This must be polled. pub struct BabeWorker { - inner: Pin>>, + inner: Pin + Send + 'static>>, slot_notification_sinks: Arc, Epoch>)>>>>, } From b097258432094bf7e146b0aa17593f8b7bbc2b0b Mon Sep 17 00:00:00 2001 From: Wei Tang Date: Thu, 30 Jul 2020 16:07:24 +0200 Subject: [PATCH 11/11] Add must_use and sink type alias --- client/consensus/babe/src/lib.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index c7fb99929661c..951d1467b4983 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -427,6 +427,7 @@ pub fn start_babe(BabeParams { } /// Worker for Babe which implements `Future`. This must be polled. +#[must_use] pub struct BabeWorker { inner: Pin + Send + 'static>>, slot_notification_sinks: Arc, Epoch>)>>>>, @@ -457,6 +458,9 @@ impl futures::Future for BabeWorker { } } +/// Slot notification sinks. +type SlotNotificationSinks = Arc::Hash, NumberFor, Epoch>)>>>>; + struct BabeSlotWorker { client: Arc, block_import: Arc>, @@ -465,7 +469,7 @@ struct BabeSlotWorker { force_authoring: bool, keystore: KeyStorePtr, epoch_changes: SharedEpochChanges, - slot_notification_sinks: Arc, Epoch>)>>>>, + slot_notification_sinks: SlotNotificationSinks, config: Config, }