Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client/consensus/babe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sp-consensus-vrf = { version = "0.8.0-rc5", path = "../../../primitives/consensu
sc-consensus-uncles = { version = "0.8.0-rc5", path = "../uncles" }
sc-consensus-slots = { version = "0.8.0-rc5", path = "../slots" }
sp-runtime = { version = "2.0.0-rc5", path = "../../../primitives/runtime" }
sp-utils = { version = "2.0.0-rc5", path = "../../../primitives/utils" }
fork-tree = { version = "2.0.0-rc5", path = "../../../utils/fork-tree" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc5"}
futures = "0.3.4"
Expand All @@ -48,6 +49,7 @@ rand = "0.7.2"
merlin = "2.0"
pdqselect = "0.1.0"
derive_more = "0.99.2"
retain_mut = "0.1.1"

[dev-dependencies]
sp-keyring = { version = "2.0.0-rc5", path = "../../../primitives/keyring" }
Expand Down
87 changes: 77 additions & 10 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ use sc_client_api::{
BlockchainEvents, ProvideUncles,
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use futures::channel::mpsc::{channel, Sender, Receiver};
use retain_mut::RetainMut;

use futures::prelude::*;
use log::{debug, info, log, trace, warn};
Expand Down Expand Up @@ -370,31 +372,34 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
babe_link,
can_author_with,
}: BabeParams<B, C, E, I, SO, SC, CAW>) -> Result<
impl futures::Future<Output=()>,
BabeWorker<B>,
sp_consensus::Error,
> where
B: BlockT,
C: ProvideRuntimeApi<B> + ProvideCache<B> + ProvideUncles<B> + BlockchainEvents<B>
+ HeaderBackend<B> + HeaderMetadata<B, Error = ClientError> + Send + Sync + 'static,
C::Api: BabeApi<B>,
SC: SelectChain<B> + 'static,
E: Environment<B, Error = Error> + Send + Sync,
E: Environment<B, Error = Error> + Send + Sync + 'static,
E::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, B>> + Send
+ Sync + 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone,
CAW: CanAuthorWith<B> + Send,
SO: SyncOracle + Send + Sync + Clone + 'static,
CAW: CanAuthorWith<B> + Send + 'static,
{
let config = babe_link.config;
let worker = BabeWorker {
let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));

let worker = BabeSlotWorker {
client: client.clone(),
block_import: Arc::new(Mutex::new(block_import)),
env,
sync_oracle: sync_oracle.clone(),
force_authoring,
keystore,
epoch_changes: babe_link.epoch_changes.clone(),
slot_notification_sinks: slot_notification_sinks.clone(),
config: config.clone(),
};

Expand All @@ -406,29 +411,69 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
)?;

info!(target: "babe", "👶 Starting BABE Authorship worker");
Ok(sc_consensus_slots::start_slot_worker(
let inner = sc_consensus_slots::start_slot_worker(
config.0,
select_chain,
worker,
sync_oracle,
inherent_data_providers,
babe_link.time_source,
can_author_with,
))
);
Ok(BabeWorker {
inner: Box::pin(inner),
slot_notification_sinks,
})
}

/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add a #[must_use] here?

#[must_use]
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output=()> + Send + 'static>>,
slot_notification_sinks: Arc<Mutex<Vec<Sender<(u64, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)>>>>,
}

impl<B: BlockT> BabeWorker<B> {
/// Return an event stream of notifications for when new slot happens, and the corresponding
/// epoch descriptor.
pub fn slot_notification_stream(
&self
) -> Receiver<(u64, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
const CHANNEL_BUFFER_SIZE: usize = 1024;

let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
self.slot_notification_sinks.lock().push(sink);
stream
}
}

impl<B: BlockT> futures::Future for BabeWorker<B> {
type Output = ();

fn poll(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context
) -> futures::task::Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}

struct BabeWorker<B: BlockT, C, E, I, SO> {
/// Slot notification sinks.
type SlotNotificationSinks<B> = Arc<Mutex<Vec<Sender<(u64, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>>;

struct BabeSlotWorker<B: BlockT, C, E, I, SO> {
client: Arc<C>,
block_import: Arc<Mutex<I>>,
env: E,
sync_oracle: SO,
force_authoring: bool,
keystore: KeyStorePtr,
epoch_changes: SharedEpochChanges<B, Epoch>,
slot_notification_sinks: SlotNotificationSinks<B>,
config: Config,
}

impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWorker<B, C, E, I, SO> where
impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeSlotWorker<B, C, E, I, SO> where
B: BlockT,
C: ProvideRuntimeApi<B> +
ProvideCache<B> +
Expand Down Expand Up @@ -502,6 +547,28 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
s
}

fn notify_slot(
&self,
_parent_header: &B::Header,
slot_number: SlotNumber,
epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
) {
self.slot_notification_sinks.lock()
.retain_mut(|sink| {
match sink.try_send((slot_number, epoch_descriptor.clone())) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
warn!(target: "babe", "Trying to notify a slot but the channel is full");
true
} else {
false
}
},
}
});
}

fn pre_digest_data(
&self,
_slot_number: u64,
Expand Down Expand Up @@ -599,7 +666,7 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
}
}

impl<B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<B, C, E, I, SO> where
impl<B, C, E, I, Error, SO> SlotWorker<B> for BabeSlotWorker<B, C, E, I, SO> where
B: BlockT,
C: ProvideRuntimeApi<B> +
ProvideCache<B> +
Expand Down
11 changes: 11 additions & 0 deletions client/consensus/slots/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ pub trait SimpleSlotWorker<B: BlockT> {
epoch_data: &Self::EpochData,
) -> Option<Self::Claim>;

/// Notifies the given slot. Similar to `claim_slot`, but will be called no matter whether we
/// need to author blocks or not.
fn notify_slot(
&self,
_header: &B::Header,
_slot_number: u64,
_epoch_data: &Self::EpochData,
) { }

/// Return the pre digest data to include in a block authored with the given claim.
fn pre_digest_data(
&self,
Expand Down Expand Up @@ -191,6 +200,8 @@ pub trait SimpleSlotWorker<B: BlockT> {
}
};

self.notify_slot(&chain_head, slot_number, &epoch_data);

let authorities_len = self.authorities_len(&epoch_data);

if !self.force_authoring() &&
Expand Down