Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client/consensus/babe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sp-consensus-vrf = { version = "0.8.0-rc4", path = "../../../primitives/consensu
sc-consensus-uncles = { version = "0.8.0-rc4", path = "../uncles" }
sc-consensus-slots = { version = "0.8.0-rc4", path = "../slots" }
sp-runtime = { version = "2.0.0-rc4", path = "../../../primitives/runtime" }
sp-utils = { version = "2.0.0-rc4", path = "../../../primitives/utils" }
fork-tree = { version = "2.0.0-rc4", path = "../../../utils/fork-tree" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc4"}
futures = "0.3.4"
Expand All @@ -48,6 +49,7 @@ rand = "0.7.2"
merlin = "2.0"
pdqselect = "0.1.0"
derive_more = "0.99.2"
retain_mut = "0.1.1"

[dev-dependencies]
sp-keyring = { version = "2.0.0-rc4", path = "../../../primitives/keyring" }
Expand Down
71 changes: 61 additions & 10 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ use sc_client_api::{
BlockchainEvents, ProvideUncles,
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use futures::channel::mpsc::{channel, Sender, Receiver};
use retain_mut::RetainMut;

use futures::prelude::*;
use log::{debug, info, log, trace, warn};
Expand Down Expand Up @@ -370,31 +372,34 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
babe_link,
can_author_with,
}: BabeParams<B, C, E, I, SO, SC, CAW>) -> Result<
impl futures::Future<Output=()>,
BabeWorker<B>,
sp_consensus::Error,
> where
B: BlockT,
C: ProvideRuntimeApi<B> + ProvideCache<B> + ProvideUncles<B> + BlockchainEvents<B>
+ HeaderBackend<B> + HeaderMetadata<B, Error = ClientError> + Send + Sync + 'static,
C::Api: BabeApi<B>,
SC: SelectChain<B> + 'static,
E: Environment<B, Error = Error> + Send + Sync,
E: Environment<B, Error = Error> + Send + Sync + 'static,
E::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, B>> + Send
+ Sync + 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone,
CAW: CanAuthorWith<B> + Send,
SO: SyncOracle + Send + Sync + Clone + 'static,
CAW: CanAuthorWith<B> + Send + 'static,
{
let config = babe_link.config;
let worker = BabeWorker {
let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));

let worker = BabeSlotWorker {
client: client.clone(),
block_import: Arc::new(Mutex::new(block_import)),
env,
sync_oracle: sync_oracle.clone(),
force_authoring,
keystore,
epoch_changes: babe_link.epoch_changes.clone(),
slot_notification_sinks: slot_notification_sinks.clone(),
config: config.clone(),
};

Expand All @@ -406,29 +411,65 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
)?;

info!(target: "babe", "👶 Starting BABE Authorship worker");
Ok(sc_consensus_slots::start_slot_worker(
let inner = sc_consensus_slots::start_slot_worker(
config.0,
select_chain,
worker,
sync_oracle,
inherent_data_providers,
babe_link.time_source,
can_author_with,
))
);
Ok(BabeWorker {
inner: Box::pin(inner),
slot_notification_sinks,
})
}

/// Worker for Babe which implements `Future<Output=()>`. This must be polled.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add a #[must_use] here?

pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output=()>>>,
slot_notification_sinks: Arc<Mutex<Vec<Sender<(u64, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)>>>>,
}

impl<B: BlockT> BabeWorker<B> {
/// Return an event stream of notifications for when new slot happens, and the corresponding
/// epoch descriptor.
pub fn slot_notification_stream(
&self
) -> Receiver<(u64, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
const CHANNEL_BUFFER_SIZE: usize = 1024;

let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
self.slot_notification_sinks.lock().push(sink);
stream
}
}

struct BabeWorker<B: BlockT, C, E, I, SO> {
impl<B: BlockT> futures::Future for BabeWorker<B> {
type Output = ();

fn poll(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context
) -> futures::task::Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}

struct BabeSlotWorker<B: BlockT, C, E, I, SO> {
client: Arc<C>,
block_import: Arc<Mutex<I>>,
env: E,
sync_oracle: SO,
force_authoring: bool,
keystore: KeyStorePtr,
epoch_changes: SharedEpochChanges<B, Epoch>,
slot_notification_sinks: Arc<Mutex<Vec<Sender<(u64, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)>>>>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we introduce a type alias for this? type SlotNotifications<B> = Arc<Mutex<Vec<Sender<(u64, ViableEpochDescriptor<B, NumberFor<B>, Epoch>)>>>> perhaps?

config: Config,
}

impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWorker<B, C, E, I, SO> where
impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeSlotWorker<B, C, E, I, SO> where
B: BlockT,
C: ProvideRuntimeApi<B> +
ProvideCache<B> +
Expand Down Expand Up @@ -502,6 +543,16 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
s
}

fn notify_slot(
&self,
_parent_header: &B::Header,
slot_number: SlotNumber,
epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
) {
self.slot_notification_sinks.lock()
.retain_mut(|sink| sink.try_send((slot_number, epoch_descriptor.clone())).is_ok());

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs never explicitly mentions it, but from my readings of the source code (https://docs.rs/futures-channel/0.2.0/src/futures_channel/mpsc/mod.rs.html#416-429), try_send will never block and returns error if the buffer is full.

}

fn pre_digest_data(
&self,
_slot_number: u64,
Expand Down Expand Up @@ -599,7 +650,7 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
}
}

impl<B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<B, C, E, I, SO> where
impl<B, C, E, I, Error, SO> SlotWorker<B> for BabeSlotWorker<B, C, E, I, SO> where
B: BlockT,
C: ProvideRuntimeApi<B> +
ProvideCache<B> +
Expand Down
11 changes: 11 additions & 0 deletions client/consensus/slots/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ pub trait SimpleSlotWorker<B: BlockT> {
epoch_data: &Self::EpochData,
) -> Option<Self::Claim>;

/// Notifies the given slot. Similar to `claim_slot`, but will be called no matter whether we
/// need to author blocks or not.
fn notify_slot(
&self,
_header: &B::Header,
_slot_number: u64,
_epoch_data: &Self::EpochData,
) { }

/// Return the pre digest data to include in a block authored with the given claim.
fn pre_digest_data(
&self,
Expand Down Expand Up @@ -191,6 +200,8 @@ pub trait SimpleSlotWorker<B: BlockT> {
}
};

self.notify_slot(&chain_head, slot_number, &epoch_data);

let authorities_len = self.authorities_len(&epoch_data);

if !self.force_authoring() &&
Expand Down