Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/beefy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" }
sp-core = { version = "4.1.0-dev", path = "../../primitives/core" }
sp-keystore = { version = "0.10.0", path = "../../primitives/keystore" }
sp-runtime = { version = "4.1.0-dev", path = "../../primitives/runtime" }
sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" }

sc-chain-spec = { version = "4.0.0-dev", path = "../../client/chain-spec" }
sc-utils = { version = "4.0.0-dev", path = "../utils" }
Expand Down
9 changes: 6 additions & 3 deletions client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use sc_network_gossip::{GossipEngine, Network as GossipNetwork};

use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
pub use sp_consensus::SyncOracle;
use sp_keystore::SyncCryptoStorePtr;
use sp_runtime::traits::Block;

Expand Down Expand Up @@ -112,7 +113,7 @@ where
BE: Backend<B>,
C: Client<B, BE>,
C::Api: BeefyApi<B>,
N: GossipNetwork<B> + Clone + Send + 'static,
N: GossipNetwork<B> + Clone + SyncOracle + Send + Sync + 'static,
{
/// BEEFY client
pub client: Arc<C>,
Expand Down Expand Up @@ -143,7 +144,7 @@ where
BE: Backend<B>,
C: Client<B, BE>,
C::Api: BeefyApi<B>,
N: GossipNetwork<B> + Clone + Send + 'static,
N: GossipNetwork<B> + Clone + SyncOracle + Send + Sync + 'static,
{
let BeefyParams {
client,
Expand All @@ -157,6 +158,7 @@ where
protocol_name,
} = beefy_params;

let sync_oracle = network.clone();
let gossip_validator = Arc::new(gossip::GossipValidator::new());
let gossip_engine = GossipEngine::new(network, protocol_name, gossip_validator.clone(), None);

Expand Down Expand Up @@ -184,9 +186,10 @@ where
gossip_validator,
min_block_delta,
metrics,
sync_oracle,
};

let worker = worker::BeefyWorker::<_, _, _>::new(worker_params);
let worker = worker::BeefyWorker::<_, _, _, _>::new(worker_params);

worker.run().await
}
39 changes: 33 additions & 6 deletions client/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use codec::{Codec, Decode, Encode};
use futures::{future, FutureExt, StreamExt};
use log::{debug, error, info, log_enabled, trace, warn};
use parking_lot::Mutex;
use std::task::Poll;

use sc_client_api::{Backend, FinalityNotification, FinalityNotifications};
use sc_network_gossip::GossipEngine;
Expand All @@ -47,10 +48,10 @@ use crate::{
metric_inc, metric_set,
metrics::Metrics,
notification::{BeefyBestBlockSender, BeefySignedCommitmentSender},
round, Client,
round, Client, SyncOracle,
};

pub(crate) struct WorkerParams<B, BE, C>
pub(crate) struct WorkerParams<B, BE, C, SO>
where
B: Block,
{
Expand All @@ -63,14 +64,16 @@ where
pub gossip_validator: Arc<GossipValidator<B>>,
pub min_block_delta: u32,
pub metrics: Option<Metrics>,
pub sync_oracle: SO,
}

/// A BEEFY worker plays the BEEFY protocol
pub(crate) struct BeefyWorker<B, C, BE>
pub(crate) struct BeefyWorker<B, C, BE, SO>
where
B: Block,
BE: Backend<B>,
C: Client<B, BE>,
SO: SyncOracle + Send + Sync + Clone + 'static,
{
client: Arc<C>,
backend: Arc<BE>,
Expand All @@ -91,24 +94,27 @@ where
beefy_best_block_sender: BeefyBestBlockSender<B>,
/// Validator set id for the last signed commitment
last_signed_id: u64,
/// Handle to the sync oracle
sync_oracle: SO,
// keep rustc happy
_backend: PhantomData<BE>,
}

impl<B, C, BE> BeefyWorker<B, C, BE>
impl<B, C, BE, SO> BeefyWorker<B, C, BE, SO>
where
B: Block + Codec,
BE: Backend<B>,
C: Client<B, BE>,
C::Api: BeefyApi<B>,
SO: SyncOracle + Send + Sync + Clone + 'static,
{
/// Return a new BEEFY worker instance.
///
/// Note that a BEEFY worker is only fully functional if a corresponding
/// BEEFY pallet has been deployed on-chain.
///
/// The BEEFY pallet is needed in order to keep track of the BEEFY authority set.
pub(crate) fn new(worker_params: WorkerParams<B, BE, C>) -> Self {
pub(crate) fn new(worker_params: WorkerParams<B, BE, C, SO>) -> Self {
let WorkerParams {
client,
backend,
Expand All @@ -119,6 +125,7 @@ where
gossip_validator,
min_block_delta,
metrics,
sync_oracle,
} = worker_params;

BeefyWorker {
Expand All @@ -136,17 +143,19 @@ where
best_beefy_block: None,
last_signed_id: 0,
beefy_best_block_sender,
sync_oracle,
_backend: PhantomData,
}
}
}

impl<B, C, BE> BeefyWorker<B, C, BE>
impl<B, C, BE, SO> BeefyWorker<B, C, BE, SO>
where
B: Block,
BE: Backend<B>,
C: Client<B, BE>,
C::Api: BeefyApi<B>,
SO: SyncOracle + Send + Sync + Clone + 'static,
{
/// Return `true`, if we should vote on block `number`
fn should_vote_on(&self, number: NumberFor<B>) -> bool {
Expand Down Expand Up @@ -400,6 +409,11 @@ where
));

loop {
if self.sync_oracle.is_major_syncing() {
debug!(target: "beefy", "Waiting for major sync to complete.");
wait_for_major_syncing(self.sync_oracle.clone()).await;
}

let engine = self.gossip_engine.clone();
let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx));

Expand Down Expand Up @@ -430,6 +444,19 @@ where
}
}

/// Returns a future that waits until major sync is done
/// before completing
fn wait_for_major_syncing<SO: SyncOracle + Send + Sync + Clone + 'static>(
mut sync_oracle: SO,
) -> impl future::Future<Output = ()> {
return future::poll_fn(move |_cx| {
if sync_oracle.is_major_syncing() {
Poll::Pending
} else {
Poll::Ready(())
}
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function has no way of knowing when the sync is over, i suggest adding it to the sync_oracle itself

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if it's rescheduled for polling with cx.waker().wake_by_ref() before returning pending, Would that be fine?

}
/// Extract the MMR root hash from a digest in the given header, if it exists.
fn find_mmr_root_digest<B, Id>(header: &B::Header) -> Option<MmrRootHash>
where
Expand Down