diff --git a/Cargo.lock b/Cargo.lock index 926d6d4cdf19a..34bf34c62c6fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -440,6 +440,7 @@ dependencies = [ name = "beefy-gadget" version = "4.0.0-dev" dependencies = [ + "async-trait", "beefy-primitives", "fnv", "futures", @@ -448,6 +449,7 @@ dependencies = [ "log", "parity-scale-codec", "parking_lot 0.12.0", + "sc-block-builder", "sc-chain-spec", "sc-client-api", "sc-consensus", diff --git a/client/beefy/Cargo.toml b/client/beefy/Cargo.toml index b910f7ce307a5..e219420959c9f 100644 --- a/client/beefy/Cargo.toml +++ b/client/beefy/Cargo.toml @@ -9,6 +9,7 @@ description = "BEEFY Client gadget for substrate" homepage = "https://substrate.io" [dependencies] +async-trait = "0.1.50" codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] } fnv = "1.0.6" futures = "0.3" @@ -22,6 +23,7 @@ beefy-primitives = { version = "4.0.0-dev", path = "../../primitives/beefy" } prometheus = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" } sc-chain-spec = { version = "4.0.0-dev", path = "../../client/chain-spec" } sc-client-api = { version = "4.0.0-dev", path = "../api" } +sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } sc-finality-grandpa = { version = "0.10.0-dev", path = "../../client/finality-grandpa" } sc-keystore = { version = "4.0.0-dev", path = "../keystore" } sc-network = { version = "0.10.0-dev", path = "../network" } @@ -42,7 +44,7 @@ serde = "1.0.136" strum = { version = "0.24.1", features = ["derive"] } tempfile = "3.1.0" tokio = "1.17.0" -sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } +sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" } sc-network-test = { version = "0.8.0", path = "../network/test" } sp-finality-grandpa = { version = "4.0.0-dev", path = "../../primitives/finality-grandpa" } sp-keyring = { version = "6.0.0", path = "../../primitives/keyring" } diff --git a/client/beefy/rpc/src/lib.rs b/client/beefy/rpc/src/lib.rs index 2c3ffda056c26..13bca08d37429 100644 --- a/client/beefy/rpc/src/lib.rs +++ b/client/beefy/rpc/src/lib.rs @@ -164,8 +164,9 @@ where mod tests { use super::*; - use beefy_gadget::notification::{ - BeefyBestBlockStream, BeefySignedCommitment, BeefySignedCommitmentSender, + use beefy_gadget::{ + justification::BeefySignedCommitment, + notification::{BeefyBestBlockStream, BeefySignedCommitmentSender}, }; use beefy_primitives::{known_payload_ids, Payload}; use codec::{Decode, Encode}; diff --git a/client/beefy/rpc/src/notification.rs b/client/beefy/rpc/src/notification.rs index 2f58c7c6bb5dc..cdda667782dd5 100644 --- a/client/beefy/rpc/src/notification.rs +++ b/client/beefy/rpc/src/notification.rs @@ -29,7 +29,7 @@ pub struct EncodedSignedCommitment(sp_core::Bytes); impl EncodedSignedCommitment { pub fn new( - signed_commitment: beefy_gadget::notification::BeefySignedCommitment, + signed_commitment: beefy_gadget::justification::BeefySignedCommitment, ) -> Self where Block: BlockT, diff --git a/client/beefy/src/error.rs b/client/beefy/src/error.rs index eacadeb7613a5..dd5fd649d52ce 100644 --- a/client/beefy/src/error.rs +++ b/client/beefy/src/error.rs @@ -24,8 +24,12 @@ use std::fmt::Debug; #[derive(Debug, thiserror::Error, PartialEq)] pub enum Error { + #[error("Backend: {0}")] + Backend(String), #[error("Keystore error: {0}")] Keystore(String), #[error("Signature error: {0}")] Signature(String), + #[error("Session uninitialized")] + UninitSession, } diff --git a/client/beefy/src/import.rs b/client/beefy/src/import.rs new file mode 100644 index 0000000000000..7caeb49db5e50 --- /dev/null +++ b/client/beefy/src/import.rs @@ -0,0 +1,205 @@ +// This file is part of Substrate. + +// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use beefy_primitives::{crypto::Signature, BeefyApi, VersionedFinalityProof, BEEFY_ENGINE_ID}; +use codec::Encode; +use log::error; +use std::{collections::HashMap, sync::Arc}; + +use sp_api::{ProvideRuntimeApi, TransactionFor}; +use sp_blockchain::{well_known_cache_keys, HeaderBackend}; +use sp_consensus::Error as ConsensusError; +use sp_runtime::{ + generic::BlockId, + traits::{Block as BlockT, Header as HeaderT, NumberFor}, + EncodedJustification, +}; + +use sc_client_api::backend::Backend; +use sc_consensus::{BlockCheckParams, BlockImport, BlockImportParams, ImportResult}; + +use crate::{ + justification::decode_and_verify_commitment, notification::BeefySignedCommitmentSender, +}; + +/// A block-import handler for BEEFY. +/// +/// This scans each imported block for BEEFY justifications and verifies them. +/// Wraps a `inner: BlockImport` and ultimately defers to it. +/// +/// When using BEEFY, the block import worker should be using this block import object. +pub struct BeefyBlockImport { + backend: Arc, + runtime: Arc, + inner: I, + justification_sender: BeefySignedCommitmentSender, +} + +impl Clone for BeefyBlockImport { + fn clone(&self) -> Self { + BeefyBlockImport { + backend: self.backend.clone(), + runtime: self.runtime.clone(), + inner: self.inner.clone(), + justification_sender: self.justification_sender.clone(), + } + } +} + +impl BeefyBlockImport { + /// Create a new BeefyBlockImport. + pub fn new( + backend: Arc, + runtime: Arc, + inner: I, + justification_sender: BeefySignedCommitmentSender, + ) -> BeefyBlockImport { + BeefyBlockImport { backend, runtime, inner, justification_sender } + } +} + +impl BeefyBlockImport +where + Block: BlockT, + BE: Backend, + Runtime: ProvideRuntimeApi, + Runtime::Api: BeefyApi + Send + Sync, +{ + fn decode_and_verify( + &self, + encoded: &EncodedJustification, + number: NumberFor, + hash: ::Hash, + ) -> Result, Signature>, ConsensusError> { + let block_id = BlockId::hash(hash); + let validator_set = self + .runtime + .runtime_api() + .validator_set(&block_id) + .map_err(|e| ConsensusError::ClientImport(e.to_string()))? + .ok_or_else(|| ConsensusError::ClientImport("Unknown validator set".to_string()))?; + + decode_and_verify_commitment::(&encoded[..], number, &validator_set) + } + + /// Import BEEFY justification: Send it to worker for processing and also append it to backend. + /// + /// This function assumes: + /// - `justification` is verified and valid, + /// - the block referred by `justification` has been imported _and_ finalized. + fn import_beefy_justification_unchecked( + &self, + number: NumberFor, + justification: VersionedFinalityProof, Signature>, + ) { + // Append the justification to the block in the backend. + if let Err(e) = self.backend.append_justification( + BlockId::Number(number), + (BEEFY_ENGINE_ID, justification.encode()), + ) { + error!(target: "beefy", "🥩 Error {:?} on appending justification: {:?}", e, justification); + } + // Send the justification to the BEEFY voter for processing. + match justification { + // TODO #11838: Should not unpack, these channels should also use + // `VersionedFinalityProof`. + VersionedFinalityProof::V1(signed_commitment) => self + .justification_sender + .notify(|| Ok::<_, ()>(signed_commitment)) + .expect("forwards closure result; the closure always returns Ok; qed."), + }; + } +} + +#[async_trait::async_trait] +impl BlockImport for BeefyBlockImport +where + Block: BlockT, + BE: Backend, + I: BlockImport< + Block, + Error = ConsensusError, + Transaction = sp_api::TransactionFor, + > + Send + + Sync, + Runtime: ProvideRuntimeApi + Send + Sync, + Runtime::Api: BeefyApi, +{ + type Error = ConsensusError; + type Transaction = TransactionFor; + + async fn import_block( + &mut self, + mut block: BlockImportParams, + new_cache: HashMap>, + ) -> Result { + let hash = block.post_hash(); + let number = *block.header.number(); + + let beefy_proof = block + .justifications + .as_mut() + .and_then(|just| { + let decoded = just + .get(BEEFY_ENGINE_ID) + .map(|encoded| self.decode_and_verify(encoded, number, hash)); + // Remove BEEFY justification from the list before giving to `inner`; + // we will append it to backend ourselves at the end if all goes well. + just.remove(BEEFY_ENGINE_ID); + decoded + }) + .transpose() + .unwrap_or(None); + + // Run inner block import. + let inner_import_result = self.inner.import_block(block, new_cache).await?; + + match (beefy_proof, &inner_import_result) { + (Some(proof), ImportResult::Imported(_)) => { + let status = self.backend.blockchain().info(); + if number <= status.finalized_number && + Some(hash) == + self.backend + .blockchain() + .hash(number) + .map_err(|e| ConsensusError::ClientImport(e.to_string()))? + { + // The proof is valid and the block is imported and final, we can import. + self.import_beefy_justification_unchecked(number, proof); + } else { + error!( + target: "beefy", + "🥩 Cannot import justification: {:?} for, not yet final, block number {:?}", + proof, + number, + ); + } + }, + _ => (), + } + + Ok(inner_import_result) + } + + async fn check_block( + &mut self, + block: BlockCheckParams, + ) -> Result { + self.inner.check_block(block).await + } +} diff --git a/client/beefy/src/justification.rs b/client/beefy/src/justification.rs new file mode 100644 index 0000000000000..2a5191daec4b5 --- /dev/null +++ b/client/beefy/src/justification.rs @@ -0,0 +1,177 @@ +// This file is part of Substrate. + +// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::keystore::BeefyKeystore; +use beefy_primitives::{ + crypto::{AuthorityId, Signature}, + ValidatorSet, VersionedFinalityProof, +}; +use codec::{Decode, Encode}; +use sp_consensus::Error as ConsensusError; +use sp_runtime::traits::{Block as BlockT, NumberFor}; + +/// A commitment with matching BEEFY authorities' signatures. +pub type BeefySignedCommitment = + beefy_primitives::SignedCommitment, beefy_primitives::crypto::Signature>; + +/// Decode and verify a Beefy SignedCommitment. +pub(crate) fn decode_and_verify_commitment( + encoded: &[u8], + target_number: NumberFor, + validator_set: &ValidatorSet, +) -> Result, Signature>, ConsensusError> { + let proof = , Signature>>::decode(&mut &*encoded) + .map_err(|_| ConsensusError::InvalidJustification)?; + verify_with_validator_set::(target_number, validator_set, &proof).map(|_| proof) +} + +/// Verify the Beefy finality proof against the validator set at the block it was generated. +fn verify_with_validator_set( + target_number: NumberFor, + validator_set: &ValidatorSet, + proof: &VersionedFinalityProof, Signature>, +) -> Result<(), ConsensusError> { + match proof { + VersionedFinalityProof::V1(signed_commitment) => { + if signed_commitment.signatures.len() != validator_set.len() || + signed_commitment.commitment.validator_set_id != validator_set.id() || + signed_commitment.commitment.block_number != target_number + { + return Err(ConsensusError::InvalidJustification) + } + + // Arrangement of signatures in the commitment should be in the same order + // as validators for that set. + let message = signed_commitment.commitment.encode(); + let valid_signatures = validator_set + .validators() + .into_iter() + .zip(signed_commitment.signatures.iter()) + .filter(|(id, signature)| { + signature + .as_ref() + .map(|sig| BeefyKeystore::verify(id, sig, &message[..])) + .unwrap_or(false) + }) + .count(); + if valid_signatures >= crate::round::threshold(validator_set.len()) { + Ok(()) + } else { + Err(ConsensusError::InvalidJustification) + } + }, + } +} + +#[cfg(test)] +pub(crate) mod tests { + use beefy_primitives::{known_payload_ids, Commitment, Payload, SignedCommitment}; + use substrate_test_runtime_client::runtime::Block; + + use super::*; + use crate::{keystore::tests::Keyring, tests::make_beefy_ids}; + + pub(crate) fn new_signed_commitment( + block_num: NumberFor, + validator_set: &ValidatorSet, + keys: &[Keyring], + ) -> BeefySignedCommitment { + let commitment = Commitment { + payload: Payload::new(known_payload_ids::MMR_ROOT_ID, vec![]), + block_number: block_num, + validator_set_id: validator_set.id(), + }; + let message = commitment.encode(); + let signatures = keys.iter().map(|key| Some(key.sign(&message))).collect(); + SignedCommitment { commitment, signatures } + } + + #[test] + fn should_verify_with_validator_set() { + let keys = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + + // build valid justification + let block_num = 42; + let proof = new_signed_commitment(block_num, &validator_set, keys); + + let good_proof = proof.clone().into(); + // should verify successfully + verify_with_validator_set::(block_num, &validator_set, &good_proof).unwrap(); + + // wrong block number -> should fail verification + let good_proof = proof.clone().into(); + match verify_with_validator_set::(block_num + 1, &validator_set, &good_proof) { + Err(ConsensusError::InvalidJustification) => (), + _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + }; + + // wrong validator set id -> should fail verification + let good_proof = proof.clone().into(); + let other = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap(); + match verify_with_validator_set::(block_num, &other, &good_proof) { + Err(ConsensusError::InvalidJustification) => (), + _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + }; + + // wrong signatures length -> should fail verification + let mut bad_proof = proof.clone(); + // change length of signatures + bad_proof.signatures.pop().flatten().unwrap(); + match verify_with_validator_set::(block_num + 1, &validator_set, &bad_proof.into()) { + Err(ConsensusError::InvalidJustification) => (), + _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + }; + + // not enough signatures -> should fail verification + let mut bad_proof = proof.clone(); + // remove a signature (but same length) + *bad_proof.signatures.first_mut().unwrap() = None; + match verify_with_validator_set::(block_num + 1, &validator_set, &bad_proof.into()) { + Err(ConsensusError::InvalidJustification) => (), + _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + }; + + // not enough _correct_ signatures -> should fail verification + let mut bad_proof = proof.clone(); + // change a signature to a different key + *bad_proof.signatures.first_mut().unwrap() = + Some(Keyring::Dave.sign(&proof.commitment.encode())); + match verify_with_validator_set::(block_num + 1, &validator_set, &bad_proof.into()) { + Err(ConsensusError::InvalidJustification) => (), + _ => assert!(false, "Expected Err(ConsensusError::InvalidJustification)"), + }; + } + + #[test] + fn should_decode_and_verify_commitment() { + let keys = &[Keyring::Alice, Keyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let block_num = 1; + + // build valid justification + let proof = new_signed_commitment(block_num, &validator_set, keys); + let versioned_proof: VersionedFinalityProof, Signature> = proof.into(); + let encoded = versioned_proof.encode(); + + // should successfully decode and verify + let verified = + decode_and_verify_commitment::(&encoded, block_num, &validator_set).unwrap(); + assert_eq!(verified, versioned_proof); + } +} diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs index c025ec5686ad2..81c72dec8cd08 100644 --- a/client/beefy/src/lib.rs +++ b/client/beefy/src/lib.rs @@ -16,23 +16,18 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use std::sync::Arc; - +use beefy_primitives::{BeefyApi, MmrRootHash}; use prometheus::Registry; - use sc_client_api::{Backend, BlockchainEvents, Finalizer}; +use sc_consensus::BlockImport; use sc_network_gossip::Network as GossipNetwork; - use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; -use sp_consensus::SyncOracle; +use sp_consensus::{Error as ConsensusError, SyncOracle}; use sp_keystore::SyncCryptoStorePtr; use sp_mmr_primitives::MmrApi; use sp_runtime::traits::Block; - -use beefy_primitives::{BeefyApi, MmrRootHash}; - -use crate::notification::{BeefyBestBlockSender, BeefySignedCommitmentSender}; +use std::sync::Arc; mod error; mod gossip; @@ -41,11 +36,21 @@ mod metrics; mod round; mod worker; +pub mod import; +pub mod justification; pub mod notification; #[cfg(test)] mod tests; +use crate::{ + import::BeefyBlockImport, + notification::{ + BeefyBestBlockSender, BeefyBestBlockStream, BeefySignedCommitmentSender, + BeefySignedCommitmentStream, + }, +}; + pub use beefy_protocol_name::standard_name as protocol_standard_name; pub(crate) mod beefy_protocol_name { @@ -110,6 +115,68 @@ where // empty } +/// Links between the block importer, the background voter and the RPC layer, +/// to be used by the voter. +#[derive(Clone)] +pub struct BeefyVoterLinks { + // BlockImport -> Voter links + /// Stream of BEEFY signed commitments from block import to voter. + pub from_block_import_justif_stream: BeefySignedCommitmentStream, + + // Voter -> RPC links + /// Sends BEEFY signed commitments from voter to RPC. + pub to_rpc_justif_sender: BeefySignedCommitmentSender, + /// Sends BEEFY best block hashes from voter to RPC. + pub to_rpc_best_block_sender: BeefyBestBlockSender, +} + +/// Links used by the BEEFY RPC layer, from the BEEFY background voter. +#[derive(Clone)] +pub struct BeefyRPCLinks { + /// Stream of signed commitments coming from the voter. + pub from_voter_justif_stream: BeefySignedCommitmentStream, + /// Stream of BEEFY best block hashes coming from the voter. + pub from_voter_best_beefy_stream: BeefyBestBlockStream, +} + +/// Make block importer and link half necessary to tie the background voter to it. +pub fn beefy_block_import_and_links( + wrapped_block_import: I, + backend: Arc, + runtime: Arc, +) -> (BeefyBlockImport, BeefyVoterLinks, BeefyRPCLinks) +where + B: Block, + BE: Backend, + I: BlockImport> + + Send + + Sync, + RuntimeApi: ProvideRuntimeApi + Send + Sync, + RuntimeApi::Api: BeefyApi, +{ + // Voter -> RPC links + let (to_rpc_justif_sender, from_voter_justif_stream) = + notification::BeefySignedCommitmentStream::::channel(); + let (to_rpc_best_block_sender, from_voter_best_beefy_stream) = + notification::BeefyBestBlockStream::::channel(); + + // BlockImport -> Voter links + let (to_voter_justif_sender, from_block_import_justif_stream) = + notification::BeefySignedCommitmentStream::::channel(); + + // BlockImport + let import = + BeefyBlockImport::new(backend, runtime, wrapped_block_import, to_voter_justif_sender); + let voter_links = BeefyVoterLinks { + from_block_import_justif_stream, + to_rpc_justif_sender, + to_rpc_best_block_sender, + }; + let rpc_links = BeefyRPCLinks { from_voter_best_beefy_stream, from_voter_justif_stream }; + + (import, voter_links, rpc_links) +} + /// BEEFY gadget initialization parameters. pub struct BeefyParams where @@ -130,16 +197,14 @@ where pub key_store: Option, /// Gossip network pub network: N, - /// BEEFY signed commitment sender - pub signed_commitment_sender: BeefySignedCommitmentSender, - /// BEEFY best block sender - pub beefy_best_block_sender: BeefyBestBlockSender, /// Minimal delta between blocks, BEEFY should vote for pub min_block_delta: u32, /// Prometheus metric registry pub prometheus_registry: Option, /// Chain specific GRANDPA protocol name. See [`beefy_protocol_name::standard_name`]. pub protocol_name: std::borrow::Cow<'static, str>, + /// Links between the block importer, the background voter and the RPC layer. + pub links: BeefyVoterLinks, } /// Start the BEEFY gadget. @@ -160,11 +225,10 @@ where runtime, key_store, network, - signed_commitment_sender, - beefy_best_block_sender, min_block_delta, prometheus_registry, protocol_name, + links, } = beefy_params; let sync_oracle = network.clone(); @@ -194,14 +258,13 @@ where client, backend, runtime, + sync_oracle, key_store: key_store.into(), - signed_commitment_sender, - beefy_best_block_sender, gossip_engine, gossip_validator, - min_block_delta, + links, metrics, - sync_oracle, + min_block_delta, }; let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params); diff --git a/client/beefy/src/metrics.rs b/client/beefy/src/metrics.rs index a6d29dbf88abb..71e34e24c4fa0 100644 --- a/client/beefy/src/metrics.rs +++ b/client/beefy/src/metrics.rs @@ -32,8 +32,8 @@ pub(crate) struct Metrics { pub beefy_best_block: Gauge, /// Next block BEEFY should vote on pub beefy_should_vote_on: Gauge, - /// Number of sessions without a signed commitment - pub beefy_skipped_sessions: Counter, + /// Number of sessions with lagging signed commitment on mandatory block + pub beefy_lagging_sessions: Counter, } impl Metrics { @@ -65,10 +65,10 @@ impl Metrics { Gauge::new("substrate_beefy_should_vote_on", "Next block, BEEFY should vote on")?, registry, )?, - beefy_skipped_sessions: register( + beefy_lagging_sessions: register( Counter::new( - "substrate_beefy_skipped_sessions", - "Number of sessions without a signed commitment", + "substrate_beefy_lagging_sessions", + "Number of sessions with lagging signed commitment on mandatory block", )?, registry, )?, diff --git a/client/beefy/src/notification.rs b/client/beefy/src/notification.rs index 7c18d809f6efb..9479891714234 100644 --- a/client/beefy/src/notification.rs +++ b/client/beefy/src/notification.rs @@ -17,11 +17,9 @@ // along with this program. If not, see . use sc_utils::notification::{NotificationSender, NotificationStream, TracingKeyStr}; -use sp_runtime::traits::{Block as BlockT, NumberFor}; +use sp_runtime::traits::Block as BlockT; -/// A commitment with matching BEEFY authorities' signatures. -pub type BeefySignedCommitment = - beefy_primitives::SignedCommitment, beefy_primitives::crypto::Signature>; +use crate::justification::BeefySignedCommitment; /// The sending half of the notifications channel(s) used to send /// notifications about best BEEFY block from the gadget side. diff --git a/client/beefy/src/round.rs b/client/beefy/src/round.rs index fecb9557df6ea..ebd85c8dea05d 100644 --- a/client/beefy/src/round.rs +++ b/client/beefy/src/round.rs @@ -59,7 +59,8 @@ impl RoundTracker { } } -fn threshold(authorities: usize) -> usize { +/// Minimum size of `authorities` subset that produced valid signatures for a block to finalize. +pub fn threshold(authorities: usize) -> usize { let faulty = authorities.saturating_sub(1) / 3; authorities - faulty } @@ -70,9 +71,10 @@ fn threshold(authorities: usize) -> usize { /// Does not do any validation on votes or signatures, layers above need to handle that (gossip). pub(crate) struct Rounds { rounds: BTreeMap<(Payload, NumberFor), RoundTracker>, - best_done: Option>, session_start: NumberFor, validator_set: ValidatorSet, + mandatory_done: bool, + best_done: Option>, } impl Rounds @@ -81,15 +83,15 @@ where B: Block, { pub(crate) fn new(session_start: NumberFor, validator_set: ValidatorSet) -> Self { - Rounds { rounds: BTreeMap::new(), best_done: None, session_start, validator_set } + Rounds { + rounds: BTreeMap::new(), + session_start, + validator_set, + mandatory_done: false, + best_done: None, + } } -} -impl Rounds -where - P: Ord + Hash + Clone, - B: Block, -{ pub(crate) fn validator_set_id(&self) -> ValidatorSetId { self.validator_set.id() } @@ -98,8 +100,12 @@ where self.validator_set.validators() } - pub(crate) fn session_start(&self) -> &NumberFor { - &self.session_start + pub(crate) fn session_start(&self) -> NumberFor { + self.session_start + } + + pub(crate) fn mandatory_done(&self) -> bool { + self.mandatory_done } pub(crate) fn should_self_vote(&self, round: &(P, NumberFor)) -> bool { @@ -113,12 +119,9 @@ where vote: (Public, Signature), self_vote: bool, ) -> bool { - if Some(round.1.clone()) <= self.best_done { - debug!( - target: "beefy", - "🥩 received vote for old stale round {:?}, ignoring", - round.1 - ); + let num = round.1; + if num < self.session_start || Some(num) <= self.best_done { + debug!(target: "beefy", "🥩 received vote for old stale round {:?}, ignoring", num); false } else if !self.validators().iter().any(|id| vote.0 == *id) { debug!( @@ -147,6 +150,7 @@ where // remove this and older (now stale) rounds let signatures = self.rounds.remove(round)?.votes; self.rounds.retain(|&(_, number), _| number > round.1); + self.mandatory_done = self.mandatory_done || round.1 == self.session_start; self.best_done = self.best_done.max(Some(round.1)); debug!(target: "beefy", "🥩 Concluded round #{}", round.1); @@ -160,6 +164,11 @@ where None } } + + #[cfg(test)] + pub(crate) fn test_set_mandatory_done(&mut self, done: bool) { + self.mandatory_done = done; + } } #[cfg(test)] @@ -226,7 +235,7 @@ mod tests { let rounds = Rounds::::new(session_start, validators); assert_eq!(42, rounds.validator_set_id()); - assert_eq!(1, *rounds.session_start()); + assert_eq!(1, rounds.session_start()); assert_eq!( &vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], rounds.validators() @@ -307,6 +316,43 @@ mod tests { )); } + #[test] + fn old_rounds_not_accepted() { + sp_tracing::try_init_simple(); + + let validators = ValidatorSet::::new( + vec![Keyring::Alice.public(), Keyring::Bob.public(), Keyring::Charlie.public()], + 42, + ) + .unwrap(); + let alice = (Keyring::Alice.public(), Keyring::Alice.sign(b"I am committed")); + + let session_start = 10u64.into(); + let mut rounds = Rounds::::new(session_start, validators); + + let mut vote = (H256::from_low_u64_le(1), 9); + // add vote for previous session, should fail + assert!(!rounds.add_vote(&vote, alice.clone(), true)); + // no votes present + assert!(rounds.rounds.is_empty()); + + // simulate 11 was concluded + rounds.best_done = Some(11); + // add votes for current session, but already concluded rounds, should fail + vote.1 = 10; + assert!(!rounds.add_vote(&vote, alice.clone(), true)); + vote.1 = 11; + assert!(!rounds.add_vote(&vote, alice.clone(), true)); + // no votes present + assert!(rounds.rounds.is_empty()); + + // add good vote + vote.1 = 12; + assert!(rounds.add_vote(&vote, alice, true)); + // good vote present + assert_eq!(rounds.rounds.len(), 1); + } + #[test] fn multiple_rounds() { sp_tracing::try_init_simple(); diff --git a/client/beefy/src/tests.rs b/client/beefy/src/tests.rs index 78e697a6ada81..9c8f443dd1f7e 100644 --- a/client/beefy/src/tests.rs +++ b/client/beefy/src/tests.rs @@ -21,12 +21,15 @@ use futures::{future, stream::FuturesUnordered, Future, StreamExt}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; -use std::{sync::Arc, task::Poll}; +use std::{collections::HashMap, sync::Arc, task::Poll}; use tokio::{runtime::Runtime, time::Duration}; use sc_chain_spec::{ChainSpec, GenericChainSpec}; use sc_client_api::HeaderBackend; -use sc_consensus::BoxJustificationImport; +use sc_consensus::{ + BlockImport, BlockImportParams, BoxJustificationImport, ForkChoiceStrategy, ImportResult, + ImportedAux, +}; use sc_keystore::LocalKeystore; use sc_network_test::{ Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, Peer, PeersClient, @@ -35,7 +38,8 @@ use sc_network_test::{ use sc_utils::notification::NotificationReceiver; use beefy_primitives::{ - crypto::AuthorityId, BeefyApi, ConsensusLog, MmrRootHash, ValidatorSet, BEEFY_ENGINE_ID, + crypto::{AuthorityId, Signature}, + BeefyApi, ConsensusLog, MmrRootHash, ValidatorSet, VersionedFinalityProof, BEEFY_ENGINE_ID, KEY_TYPE as BeefyKeyType, }; use sp_mmr_primitives::{ @@ -47,19 +51,32 @@ use sp_consensus::BlockOrigin; use sp_core::H256; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; use sp_runtime::{ - codec::Encode, generic::BlockId, traits::Header as HeaderT, BuildStorage, DigestItem, Storage, + codec::Encode, + generic::BlockId, + traits::{Header as HeaderT, NumberFor}, + BuildStorage, DigestItem, Justifications, Storage, }; use substrate_test_runtime_client::{runtime::Header, ClientExt}; -use crate::{beefy_protocol_name, keystore::tests::Keyring as BeefyKeyring, notification::*}; +use crate::{ + beefy_block_import_and_links, beefy_protocol_name, justification::*, + keystore::tests::Keyring as BeefyKeyring, BeefyRPCLinks, BeefyVoterLinks, +}; pub(crate) const BEEFY_PROTOCOL_NAME: &'static str = "/beefy/1"; const GOOD_MMR_ROOT: MmrRootHash = MmrRootHash::repeat_byte(0xbf); const BAD_MMR_ROOT: MmrRootHash = MmrRootHash::repeat_byte(0x42); +type BeefyBlockImport = crate::BeefyBlockImport< + Block, + substrate_test_runtime_client::Backend, + two_validators::TestApi, + BlockImportAdapter>, +>; + pub(crate) type BeefyValidatorSet = ValidatorSet; -pub(crate) type BeefyPeer = Peer; +pub(crate) type BeefyPeer = Peer; #[derive(Debug, Serialize, Deserialize)] struct Genesis(std::collections::BTreeMap); @@ -97,17 +114,10 @@ fn beefy_protocol_name() { assert_eq!(proto_name.to_string(), expected); } -// TODO: compiler warns us about unused `signed_commitment_stream`, will use in later tests -#[allow(dead_code)] -#[derive(Clone)] -pub(crate) struct BeefyLinkHalf { - pub signed_commitment_stream: BeefySignedCommitmentStream, - pub beefy_best_block_stream: BeefyBestBlockStream, -} - #[derive(Default)] pub(crate) struct PeerData { - pub(crate) beefy_link_half: Mutex>, + pub(crate) beefy_rpc_links: Mutex>>, + pub(crate) beefy_voter_links: Mutex>>, } #[derive(Default)] @@ -163,7 +173,7 @@ impl BeefyTestNet { impl TestNetFactory for BeefyTestNet { type Verifier = PassThroughVerifier; - type BlockImport = PeersClient; + type BlockImport = BeefyBlockImport; type PeerData = PeerData; fn make_verifier(&self, _client: PeersClient, _: &PeerData) -> Self::Verifier { @@ -178,7 +188,17 @@ impl TestNetFactory for BeefyTestNet { Option>, Self::PeerData, ) { - (client.as_block_import(), None, PeerData::default()) + let inner = BlockImportAdapter::new(client.clone()); + let (block_import, voter_links, rpc_links) = beefy_block_import_and_links( + inner, + client.as_backend(), + Arc::new(two_validators::TestApi {}), + ); + let peer_data = PeerData { + beefy_rpc_links: Mutex::new(Some(rpc_links)), + beefy_voter_links: Mutex::new(Some(voter_links)), + }; + (BlockImportAdapter::new(block_import), None, peer_data) } fn peer(&mut self, i: usize) -> &mut BeefyPeer { @@ -333,12 +353,12 @@ where let keystore = create_beefy_keystore(*key); - let (signed_commitment_sender, signed_commitment_stream) = - BeefySignedCommitmentStream::::channel(); - let (beefy_best_block_sender, beefy_best_block_stream) = - BeefyBestBlockStream::::channel(); - let beefy_link_half = BeefyLinkHalf { signed_commitment_stream, beefy_best_block_stream }; - *peer.data.beefy_link_half.lock() = Some(beefy_link_half); + let (_, _, peer_data) = net.make_block_import(peer.client().clone()); + let PeerData { beefy_rpc_links, beefy_voter_links } = peer_data; + + let beefy_voter_links = beefy_voter_links.lock().take(); + *peer.data.beefy_rpc_links.lock() = beefy_rpc_links.lock().take(); + *peer.data.beefy_voter_links.lock() = beefy_voter_links.clone(); let beefy_params = crate::BeefyParams { client: peer.client().as_client(), @@ -346,8 +366,7 @@ where runtime: api.clone(), key_store: Some(keystore), network: peer.network_service().clone(), - signed_commitment_sender, - beefy_best_block_sender, + links: beefy_voter_links.unwrap(), min_block_delta, prometheus_registry: None, protocol_name: BEEFY_PROTOCOL_NAME.into(), @@ -382,11 +401,11 @@ pub(crate) fn get_beefy_streams( let mut best_block_streams = Vec::new(); let mut signed_commitment_streams = Vec::new(); for peer_id in 0..peers.len() { - let beefy_link_half = - net.peer(peer_id).data.beefy_link_half.lock().as_ref().unwrap().clone(); - let BeefyLinkHalf { signed_commitment_stream, beefy_best_block_stream } = beefy_link_half; - best_block_streams.push(beefy_best_block_stream.subscribe()); - signed_commitment_streams.push(signed_commitment_stream.subscribe()); + let beefy_rpc_links = net.peer(peer_id).data.beefy_rpc_links.lock().clone().unwrap(); + let BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream } = + beefy_rpc_links; + best_block_streams.push(from_voter_best_beefy_stream.subscribe()); + signed_commitment_streams.push(from_voter_justif_stream.subscribe()); } (best_block_streams, signed_commitment_streams) } @@ -670,3 +689,123 @@ fn correct_beefy_payload() { wait_for_best_beefy_blocks(best_blocks, &net, &mut runtime, &[11]); wait_for_beefy_signed_commitments(signed_commitments, &net, &mut runtime, &[11]); } + +#[test] +fn beefy_importing_blocks() { + use futures::{executor::block_on, future::poll_fn, task::Poll}; + use sc_block_builder::BlockBuilderProvider; + use sc_client_api::BlockBackend; + + sp_tracing::try_init_simple(); + + let mut net = BeefyTestNet::new(2, 0); + + let client = net.peer(0).client().clone(); + let (mut block_import, _, peer_data) = net.make_block_import(client.clone()); + let PeerData { beefy_rpc_links: _, beefy_voter_links } = peer_data; + let justif_stream = beefy_voter_links.lock().take().unwrap().from_block_import_justif_stream; + + let params = |block: Block, justifications: Option| { + let mut import = BlockImportParams::new(BlockOrigin::File, block.header); + import.justifications = justifications; + import.body = Some(block.extrinsics); + import.finalized = true; + import.fork_choice = Some(ForkChoiceStrategy::LongestChain); + import + }; + + let full_client = client.as_client(); + let parent_id = BlockId::Number(0); + let block_id = BlockId::Number(1); + let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap(); + let block = builder.build().unwrap().block; + + // Import without justifications. + let mut justif_recv = justif_stream.subscribe(); + assert_eq!( + block_on(block_import.import_block(params(block.clone(), None), HashMap::new())).unwrap(), + ImportResult::Imported(ImportedAux { is_new_best: true, ..Default::default() }), + ); + assert_eq!( + block_on(block_import.import_block(params(block, None), HashMap::new())).unwrap(), + ImportResult::AlreadyInChain + ); + // Verify no justifications present: + { + // none in backend, + assert!(full_client.justifications(&block_id).unwrap().is_none()); + // and none sent to BEEFY worker. + block_on(poll_fn(move |cx| { + assert_eq!(justif_recv.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + })); + } + + // Import with valid justification. + let parent_id = BlockId::Number(1); + let block_num = 2; + let keys = &[BeefyKeyring::Alice, BeefyKeyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let proof = crate::justification::tests::new_signed_commitment(block_num, &validator_set, keys); + let versioned_proof: VersionedFinalityProof, Signature> = proof.into(); + let encoded = versioned_proof.encode(); + let justif = Some(Justifications::from((BEEFY_ENGINE_ID, encoded))); + + let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap(); + let block = builder.build().unwrap().block; + let mut justif_recv = justif_stream.subscribe(); + assert_eq!( + block_on(block_import.import_block(params(block, justif), HashMap::new())).unwrap(), + ImportResult::Imported(ImportedAux { + bad_justification: false, + is_new_best: true, + ..Default::default() + }), + ); + // Verify justification successfully imported: + { + // available in backend, + assert!(full_client.justifications(&BlockId::Number(block_num)).unwrap().is_some()); + // and also sent to BEEFY worker. + block_on(poll_fn(move |cx| { + match justif_recv.poll_next_unpin(cx) { + Poll::Ready(Some(_justification)) => (), + v => panic!("unexpected value: {:?}", v), + } + Poll::Ready(()) + })); + } + + // Import with invalid justification (incorrect validator set). + let parent_id = BlockId::Number(2); + let block_num = 3; + let keys = &[BeefyKeyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap(); + let proof = crate::justification::tests::new_signed_commitment(block_num, &validator_set, keys); + let versioned_proof: VersionedFinalityProof, Signature> = proof.into(); + let encoded = versioned_proof.encode(); + let justif = Some(Justifications::from((BEEFY_ENGINE_ID, encoded))); + + let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap(); + let block = builder.build().unwrap().block; + let mut justif_recv = justif_stream.subscribe(); + assert_eq!( + block_on(block_import.import_block(params(block, justif), HashMap::new())).unwrap(), + ImportResult::Imported(ImportedAux { + // Still `false` because we don't want to fail import on bad BEEFY justifications. + bad_justification: false, + is_new_best: true, + ..Default::default() + }), + ); + // Verify bad justifications was not imported: + { + // none in backend, + assert!(full_client.justifications(&block_id).unwrap().is_none()); + // and none sent to BEEFY worker. + block_on(poll_fn(move |cx| { + assert_eq!(justif_recv.poll_next_unpin(cx), Poll::Pending); + Poll::Ready(()) + })); + } +} diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index dccf7ba7504dd..3bff0822ebdb4 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -17,11 +17,10 @@ // along with this program. If not, see . use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, VecDeque}, fmt::Debug, marker::PhantomData, sync::Arc, - time::Duration, }; use codec::{Codec, Decode, Encode}; @@ -48,57 +47,174 @@ use beefy_primitives::{ }; use crate::{ - error, + error::Error, gossip::{topic, GossipValidator}, + justification::BeefySignedCommitment, keystore::BeefyKeystore, metric_inc, metric_set, metrics::Metrics, - notification::{BeefyBestBlockSender, BeefySignedCommitmentSender}, round::Rounds, - Client, + BeefyVoterLinks, Client, }; +enum RoundAction { + Drop, + Process, + Enqueue, +} + +/// Responsible for the voting strategy. +/// It chooses which incoming votes to accept and which votes to generate. +struct VoterOracle { + /// Queue of known sessions. Keeps track of voting rounds (block numbers) within each session. + /// + /// There are three voter states coresponding to three queue states: + /// 1. voter uninitialized: queue empty, + /// 2. up-to-date - all mandatory blocks leading up to current GRANDPA finalized: + /// queue has ONE element, the 'current session' where `mandatory_done == true`, + /// 3. lagging behind GRANDPA: queue has [1, N] elements, where all `mandatory_done == false`. + /// In this state, everytime a session gets its mandatory block BEEFY finalized, it's + /// popped off the queue, eventually getting to state `2. up-to-date`. + sessions: VecDeque>, + /// Min delta in block numbers between two blocks, BEEFY should vote on. + min_block_delta: u32, +} + +impl VoterOracle { + pub fn new(min_block_delta: u32) -> Self { + Self { + sessions: VecDeque::new(), + // Always target at least one block better than current best beefy. + min_block_delta: min_block_delta.max(1), + } + } + + /// Return mutable reference to rounds pertaining to first session in the queue. + /// Voting will always happen at the head of the queue. + pub fn rounds_mut(&mut self) -> Option<&mut Rounds> { + self.sessions.front_mut() + } + + /// Add new observed session to the Oracle. + pub fn add_session(&mut self, rounds: Rounds) { + self.sessions.push_back(rounds); + self.try_prune(); + } + + /// Prune the queue to keep the Oracle in one of the expected three states. + /// + /// Call this function on each BEEFY finality, + /// or at the very least on each BEEFY mandatory block finality. + pub fn try_prune(&mut self) { + if self.sessions.len() > 1 { + // when there's multiple sessions, only keep the `!mandatory_done()` ones. + self.sessions.retain(|s| !s.mandatory_done()) + } + } + + /// Return `(A, B)` tuple representing inclusive [A, B] interval of votes to accept. + pub fn accepted_interval( + &self, + best_grandpa: NumberFor, + ) -> Result<(NumberFor, NumberFor), Error> { + let rounds = self.sessions.front().ok_or(Error::UninitSession)?; + + if rounds.mandatory_done() { + // There's only one session active and its mandatory is done. + // Accept any GRANDPA finalized vote. + Ok((rounds.session_start(), best_grandpa.into())) + } else { + // There's at least one session with mandatory not done. + // Only accept votes for the mandatory block in the front of queue. + Ok((rounds.session_start(), rounds.session_start())) + } + } + + /// Utility function to quickly decide what to do for each round. + pub fn triage_round( + &self, + round: NumberFor, + best_grandpa: NumberFor, + ) -> Result { + let (start, end) = self.accepted_interval(best_grandpa)?; + if start <= round && round <= end { + Ok(RoundAction::Process) + } else if round > end { + Ok(RoundAction::Enqueue) + } else { + Ok(RoundAction::Drop) + } + } + + /// Return `Some(number)` if we should be voting on block `number`, + /// return `None` if there is no block we should vote on. + pub fn voting_target( + &self, + best_beefy: Option>, + best_grandpa: NumberFor, + ) -> Option> { + let rounds = if let Some(r) = self.sessions.front() { + r + } else { + debug!(target: "beefy", "🥩 No voting round started"); + return None + }; + + // `target` is guaranteed > `best_beefy` since `min_block_delta` is at least `1`. + let target = + vote_target(best_grandpa, best_beefy, rounds.session_start(), self.min_block_delta); + trace!( + target: "beefy", + "🥩 best beefy: #{:?}, best finalized: #{:?}, current_vote_target: {:?}", + best_beefy, + best_grandpa, + target + ); + target + } +} + pub(crate) struct WorkerParams { pub client: Arc, pub backend: Arc, pub runtime: Arc, + pub sync_oracle: SO, pub key_store: BeefyKeystore, - pub signed_commitment_sender: BeefySignedCommitmentSender, - pub beefy_best_block_sender: BeefyBestBlockSender, pub gossip_engine: GossipEngine, pub gossip_validator: Arc>, - pub min_block_delta: u32, + pub links: BeefyVoterLinks, pub metrics: Option, - pub sync_oracle: SO, + pub min_block_delta: u32, } /// A BEEFY worker plays the BEEFY protocol pub(crate) struct BeefyWorker { + // utilities client: Arc, backend: Arc, runtime: Arc, + sync_oracle: SO, key_store: BeefyKeystore, - signed_commitment_sender: BeefySignedCommitmentSender, gossip_engine: GossipEngine, gossip_validator: Arc>, - /// Min delta in block numbers between two blocks, BEEFY should vote on - min_block_delta: u32, + + // channels + /// Links between the block importer, the background voter and the RPC layer. + links: BeefyVoterLinks, + + // voter state + /// BEEFY client metrics. metrics: Option, - rounds: Option>, - /// Buffer holding votes for blocks that the client hasn't seen finality for. - pending_votes: BTreeMap, Vec, AuthorityId, Signature>>>, - /// Best block we received a GRANDPA notification for + /// Best block we received a GRANDPA finality for. best_grandpa_block_header: ::Header, - /// Best block a BEEFY voting round has been concluded for + /// Best block a BEEFY voting round has been concluded for. best_beefy_block: Option>, - /// Used to keep RPC worker up to date on latest/best beefy - beefy_best_block_sender: BeefyBestBlockSender, - /// Validator set id for the last signed commitment - last_signed_id: u64, - /// Handle to the sync oracle - sync_oracle: SO, - // keep rustc happy - _backend: PhantomData, + /// Buffer holding votes for future processing. + pending_votes: BTreeMap, Vec, AuthorityId, Signature>>>, + /// Buffer holding justifications for future processing. + pending_justifications: BTreeMap, Vec>>, + /// Chooses which incoming votes to accept and which votes to generate. + voting_oracle: VoterOracle, } impl BeefyWorker @@ -122,13 +238,12 @@ where backend, runtime, key_store, - signed_commitment_sender, - beefy_best_block_sender, + sync_oracle, gossip_engine, gossip_validator, - min_block_delta, + links, metrics, - sync_oracle, + min_block_delta, } = worker_params; let last_finalized_header = client @@ -139,53 +254,29 @@ where client: client.clone(), backend, runtime, + sync_oracle, key_store, - signed_commitment_sender, gossip_engine, gossip_validator, - // always target at least one block better than current best beefy - min_block_delta: min_block_delta.max(1), + links, metrics, - rounds: None, - pending_votes: BTreeMap::new(), best_grandpa_block_header: last_finalized_header, best_beefy_block: None, - last_signed_id: 0, - beefy_best_block_sender, - sync_oracle, - _backend: PhantomData, + pending_votes: BTreeMap::new(), + pending_justifications: BTreeMap::new(), + voting_oracle: VoterOracle::new(min_block_delta), } } - /// Return `Some(number)` if we should be voting on block `number` now, - /// return `None` if there is no block we should vote on now. - fn current_vote_target(&self) -> Option> { - let rounds = if let Some(r) = &self.rounds { - r - } else { - debug!(target: "beefy", "🥩 No voting round started"); - return None - }; - - let best_finalized = *self.best_grandpa_block_header.number(); - // `target` is guaranteed > `best_beefy` since `min_block_delta` is at least `1`. - let target = vote_target( - best_finalized, - self.best_beefy_block, - *rounds.session_start(), - self.min_block_delta, - ); - trace!( - target: "beefy", - "🥩 best beefy: #{:?}, best finalized: #{:?}, current_vote_target: {:?}", - self.best_beefy_block, - best_finalized, - target - ); - if let Some(target) = &target { - metric_set!(self, beefy_should_vote_on, target); - } - target + /// Simple wrapper that gets MMR root from header digests or from client state. + fn get_mmr_root_digest(&self, header: &B::Header) -> Option { + find_mmr_root_digest::(header).or_else(|| { + self.runtime + .runtime_api() + .mmr_root(&BlockId::hash(header.hash())) + .ok() + .and_then(|r| r.ok()) + }) } /// Verify `active` validator set for `block` against the key store @@ -200,7 +291,7 @@ where &self, block: &NumberFor, active: &ValidatorSet, - ) -> Result<(), error::Error> { + ) -> Result<(), Error> { let active: BTreeSet<&AuthorityId> = active.validators().iter().collect(); let public_keys = self.key_store.public_keys()?; @@ -209,121 +300,100 @@ where if store.intersection(&active).count() == 0 { let msg = "no authority public key found in store".to_string(); debug!(target: "beefy", "🥩 for block {:?} {}", block, msg); - Err(error::Error::Keystore(msg)) + Err(Error::Keystore(msg)) } else { Ok(()) } } - /// Set best BEEFY block to `block_num`. - /// - /// Also sends/updates the best BEEFY block hash to the RPC worker. - fn set_best_beefy_block(&mut self, block_num: NumberFor) { - if Some(block_num) > self.best_beefy_block { - // Try to get block hash ourselves. - let block_hash = match self.client.hash(block_num) { - Ok(h) => h, - Err(e) => { - error!(target: "beefy", "🥩 Failed to get hash for block number {}: {}", - block_num, e); - None - }, - }; - // Update RPC worker with new best BEEFY block hash. - block_hash.map(|hash| { - self.beefy_best_block_sender - .notify(|| Ok::<_, ()>(hash)) - .expect("forwards closure result; the closure always returns Ok; qed.") - }); - // Set new best BEEFY block number. - self.best_beefy_block = Some(block_num); - metric_set!(self, beefy_best_block, block_num); - } else { - debug!(target: "beefy", "🥩 Can't set best beefy to older: {}", block_num); - } - } - /// Handle session changes by starting new voting round for mandatory blocks. fn init_session_at( &mut self, - active: ValidatorSet, + validator_set: ValidatorSet, new_session_start: NumberFor, ) { - debug!(target: "beefy", "🥩 New active validator set: {:?}", active); - metric_set!(self, beefy_validator_set_id, active.id()); - // BEEFY should produce a signed commitment for each session - if active.id() != self.last_signed_id + 1 && - active.id() != GENESIS_AUTHORITY_SET_ID && - self.last_signed_id != 0 - { - debug!( - target: "beefy", "🥩 Detected skipped session: active-id {:?}, last-signed-id {:?}", - active.id(), - self.last_signed_id, - ); - metric_inc!(self, beefy_skipped_sessions); + debug!(target: "beefy", "🥩 New active validator set: {:?}", validator_set); + metric_set!(self, beefy_validator_set_id, validator_set.id()); + + // BEEFY should produce the mandatory block of each session. + if let Some(active_session) = self.voting_oracle.rounds_mut() { + if !active_session.mandatory_done() { + debug!( + target: "beefy", "🥩 New session {} while active session {} is still lagging.", + validator_set.id(), + active_session.validator_set_id(), + ); + metric_inc!(self, beefy_lagging_sessions); + } } if log_enabled!(target: "beefy", log::Level::Debug) { // verify the new validator set - only do it if we're also logging the warning - let _ = self.verify_validator_set(&new_session_start, &active); + let _ = self.verify_validator_set(&new_session_start, &validator_set); } - let id = active.id(); - self.rounds = Some(Rounds::new(new_session_start, active)); + let id = validator_set.id(); + self.voting_oracle.add_session(Rounds::new(new_session_start, validator_set)); info!(target: "beefy", "🥩 New Rounds for validator set id: {:?} with session_start {:?}", id, new_session_start); } fn handle_finality_notification(&mut self, notification: &FinalityNotification) { debug!(target: "beefy", "🥩 Finality notification: {:?}", notification); - let number = *notification.header.number(); - - // On start-up ignore old finality notifications that we're not interested in. - if number <= *self.best_grandpa_block_header.number() { - debug!(target: "beefy", "🥩 Got unexpected finality for old block #{:?}", number); - return - } + let header = ¬ification.header; - // update best GRANDPA finalized block we have seen - self.best_grandpa_block_header = notification.header.clone(); + if *header.number() > *self.best_grandpa_block_header.number() { + // update best GRANDPA finalized block we have seen + self.best_grandpa_block_header = header.clone(); - self.handle_finality(¬ification.header); - } - - fn handle_finality(&mut self, header: &B::Header) { - // Check for and handle potential new session. - if let Some(new_validator_set) = find_authorities_change::(header) { - self.init_session_at(new_validator_set, *header.number()); + // Check for and enqueue potential new session. + if let Some(new_validator_set) = find_authorities_change::(header) { + self.init_session_at(new_validator_set, *header.number()); + // TODO: when adding SYNC protocol, fire up a request for justification for this + // mandatory block here. + } } + } - // Handle any pending votes for now finalized blocks. - self.check_pending_votes(); - - // Vote if there's now a new vote target. - if let Some(target_number) = self.current_vote_target() { - self.do_vote(target_number); - } + /// Based on [VoterOracle] this vote is either processed here or enqueued for later. + fn triage_incoming_vote( + &mut self, + vote: VoteMessage, AuthorityId, Signature>, + ) -> Result<(), Error> { + let block_num = vote.commitment.block_number; + let best_grandpa = *self.best_grandpa_block_header.number(); + match self.voting_oracle.triage_round(block_num, best_grandpa)? { + RoundAction::Process => self.handle_vote( + (vote.commitment.payload, vote.commitment.block_number), + (vote.id, vote.signature), + false, + )?, + RoundAction::Enqueue => { + debug!(target: "beefy", "🥩 Buffer vote for round: {:?}.", block_num); + self.pending_votes.entry(block_num).or_default().push(vote) + }, + RoundAction::Drop => (), + }; + Ok(()) } - // Handles all buffered votes for now finalized blocks. - fn check_pending_votes(&mut self) { - let not_finalized = self.best_grandpa_block_header.number().saturating_add(1u32.into()); - let still_pending = self.pending_votes.split_off(¬_finalized); - let votes_to_handle = std::mem::replace(&mut self.pending_votes, still_pending); - for (num, votes) in votes_to_handle.into_iter() { - if Some(num) > self.best_beefy_block { - debug!(target: "beefy", "🥩 Handling buffered votes for now GRANDPA finalized block: {:?}.", num); - for v in votes.into_iter() { - self.handle_vote( - (v.commitment.payload, v.commitment.block_number), - (v.id, v.signature), - false, - ); - } - } else { - debug!(target: "beefy", "🥩 Dropping outdated buffered votes for now BEEFY finalized block: {:?}.", num); - } - } + /// Based on [VoterOracle] this justification is either processed here or enqueued for later. + /// + /// Expects `justification` to be valid. + fn triage_incoming_justif( + &mut self, + justification: BeefySignedCommitment, + ) -> Result<(), Error> { + let block_num = justification.commitment.block_number; + let best_grandpa = *self.best_grandpa_block_header.number(); + match self.voting_oracle.triage_round(block_num, best_grandpa)? { + RoundAction::Process => self.finalize(justification), + RoundAction::Enqueue => { + debug!(target: "beefy", "🥩 Buffer justification for round: {:?}.", block_num); + self.pending_justifications.entry(block_num).or_default().push(justification) + }, + RoundAction::Drop => (), + }; + Ok(()) } fn handle_vote( @@ -331,28 +401,20 @@ where round: (Payload, NumberFor), vote: (AuthorityId, Signature), self_vote: bool, - ) { + ) -> Result<(), Error> { self.gossip_validator.note_round(round.1); - let rounds = if let Some(rounds) = self.rounds.as_mut() { - rounds - } else { - debug!(target: "beefy", "🥩 Missing validator set - can't handle vote {:?}", vote); - return - }; + let rounds = self.voting_oracle.rounds_mut().ok_or(Error::UninitSession)?; if rounds.add_vote(&round, vote, self_vote) { if let Some(signatures) = rounds.try_conclude(&round) { self.gossip_validator.conclude_round(round.1); - // id is stored for skipped session metric calculation - self.last_signed_id = rounds.validator_set_id(); - let block_num = round.1; let commitment = Commitment { payload: round.0, block_number: block_num, - validator_set_id: self.last_signed_id, + validator_set_id: rounds.validator_set_id(), }; let signed_commitment = SignedCommitment { commitment, signatures }; @@ -370,24 +432,115 @@ where ) { debug!(target: "beefy", "🥩 Error {:?} on appending justification: {:?}", e, signed_commitment); } - self.signed_commitment_sender - .notify(|| Ok::<_, ()>(signed_commitment)) - .expect("forwards closure result; the closure always returns Ok; qed."); - self.set_best_beefy_block(block_num); + // We created the `signed_commitment` and know to be valid. + self.finalize(signed_commitment); + } + } + Ok(()) + } + + /// Provide BEEFY finality for block based on `signed_commitment`: + /// 1. Prune irrelevant past sessions from the oracle, + /// 2. Set BEEFY best block, + /// 3. Send best block hash and `signed_commitment` to RPC worker. + /// + /// Expects `signed commitment` to be valid. + fn finalize(&mut self, signed_commitment: BeefySignedCommitment) { + // Prune any now "finalized" sessions from queue. + self.voting_oracle.try_prune(); - // Vote if there's now a new vote target. - if let Some(target_number) = self.current_vote_target() { - self.do_vote(target_number); + let block_num = signed_commitment.commitment.block_number; + if Some(block_num) > self.best_beefy_block { + // Set new best BEEFY block number. + self.best_beefy_block = Some(block_num); + metric_set!(self, beefy_best_block, block_num); + + self.client.hash(block_num).ok().flatten().map(|hash| { + self.links + .to_rpc_best_block_sender + .notify(|| Ok::<_, ()>(hash)) + .expect("forwards closure result; the closure always returns Ok; qed.") + }); + + self.links + .to_rpc_justif_sender + .notify(|| Ok::<_, ()>(signed_commitment)) + .expect("forwards closure result; the closure always returns Ok; qed."); + } else { + debug!(target: "beefy", "🥩 Can't set best beefy to older: {}", block_num); + } + } + + /// Handle previously buffered justifications and votes that now land in the voting interval. + fn try_pending_justif_and_votes(&mut self) -> Result<(), Error> { + let best_grandpa = *self.best_grandpa_block_header.number(); + let _ph = PhantomData::::default(); + + fn to_process_for( + pending: &mut BTreeMap, Vec>, + (start, end): (NumberFor, NumberFor), + _: PhantomData, + ) -> BTreeMap, Vec> { + // These are still pending. + let still_pending = pending.split_off(&end.saturating_add(1u32.into())); + // These can be processed. + let to_handle = pending.split_off(&start); + // The rest can be dropped. + *pending = still_pending; + // Return ones to process. + to_handle + } + + // Process pending justifications. + let interval = self.voting_oracle.accepted_interval(best_grandpa)?; + if !self.pending_justifications.is_empty() { + let justifs_to_handle = to_process_for(&mut self.pending_justifications, interval, _ph); + for (num, justifications) in justifs_to_handle.into_iter() { + debug!(target: "beefy", "🥩 Handle buffered justifications for: {:?}.", num); + for justif in justifications.into_iter() { + self.finalize(justif); } } } + + // Process pending votes. + let interval = self.voting_oracle.accepted_interval(best_grandpa)?; + if !self.pending_votes.is_empty() { + let votes_to_handle = to_process_for(&mut self.pending_votes, interval, _ph); + for (num, votes) in votes_to_handle.into_iter() { + debug!(target: "beefy", "🥩 Handle buffered votes for: {:?}.", num); + for v in votes.into_iter() { + if let Err(err) = self.handle_vote( + (v.commitment.payload, v.commitment.block_number), + (v.id, v.signature), + false, + ) { + error!(target: "beefy", "🥩 Error handling buffered vote: {}", err); + }; + } + } + } + Ok(()) + } + + /// Decide if should vote, then vote.. or don't.. + fn try_to_vote(&mut self) -> Result<(), Error> { + // Vote if there's now a new vote target. + if let Some(target) = self + .voting_oracle + .voting_target(self.best_beefy_block, *self.best_grandpa_block_header.number()) + { + metric_set!(self, beefy_should_vote_on, target); + self.do_vote(target)?; + } + Ok(()) } /// Create and gossip Signed Commitment for block number `target_number`. /// /// Also handle this self vote by calling `self.handle_vote()` for it. - fn do_vote(&mut self, target_number: NumberFor) { + fn do_vote(&mut self, target_number: NumberFor) -> Result<(), Error> { debug!(target: "beefy", "🥩 Try voting on {}", target_number); // Most of the time we get here, `target` is actually `best_grandpa`, @@ -395,18 +548,13 @@ where let target_header = if target_number == *self.best_grandpa_block_header.number() { self.best_grandpa_block_header.clone() } else { - match self.client.expect_header(BlockId::Number(target_number)) { - Ok(h) => h, - Err(err) => { - debug!( - target: "beefy", - "🥩 Could not get header for block #{:?} (error: {:?}), skipping vote..", - target_number, - err - ); - return - }, - } + self.client.expect_header(BlockId::Number(target_number)).map_err(|err| { + let err_msg = format!( + "Couldn't get header for block #{:?} (error: {:?}), skipping vote..", + target_number, err + ); + Error::Backend(err_msg) + })? }; let target_hash = target_header.hash(); @@ -414,26 +562,23 @@ where hash } else { warn!(target: "beefy", "🥩 No MMR root digest found for: {:?}", target_hash); - return + return Ok(()) }; let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, mmr_root.encode()); - let (validators, validator_set_id) = if let Some(rounds) = &self.rounds { - if !rounds.should_self_vote(&(payload.clone(), target_number)) { - debug!(target: "beefy", "🥩 Don't double vote for block number: {:?}", target_number); - return - } - (rounds.validators(), rounds.validator_set_id()) - } else { - debug!(target: "beefy", "🥩 Missing validator set - can't vote for: {:?}", target_hash); - return - }; + let rounds = self.voting_oracle.rounds_mut().ok_or(Error::UninitSession)?; + if !rounds.should_self_vote(&(payload.clone(), target_number)) { + debug!(target: "beefy", "🥩 Don't double vote for block number: {:?}", target_number); + return Ok(()) + } + let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id()); + let authority_id = if let Some(id) = self.key_store.authority_id(validators) { debug!(target: "beefy", "🥩 Local authority id: {:?}", id); id } else { debug!(target: "beefy", "🥩 Missing validator id - can't vote for: {:?}", target_hash); - return + return Ok(()) }; let commitment = Commitment { payload, block_number: target_number, validator_set_id }; @@ -443,7 +588,7 @@ where Ok(sig) => sig, Err(err) => { warn!(target: "beefy", "🥩 Error signing commitment: {:?}", err); - return + return Ok(()) }, }; @@ -462,13 +607,17 @@ where debug!(target: "beefy", "🥩 Sent vote message: {:?}", message); - self.handle_vote( + if let Err(err) = self.handle_vote( (message.commitment.payload, message.commitment.block_number), (message.id, message.signature), true, - ); + ) { + error!(target: "beefy", "🥩 Error handling self vote: {}", err); + } self.gossip_engine.gossip_message(topic::(), encoded_message, false); + + Ok(()) } /// Wait for BEEFY runtime pallet to be available. @@ -494,6 +643,9 @@ where // Once we'll implement 'initial sync' (catch-up), the worker will be able to // start voting right away. self.handle_finality_notification(¬if); + if let Err(err) = self.try_to_vote() { + debug!(target: "beefy", "🥩 {}", err); + } break } else { trace!(target: "beefy", "🥩 Finality notification: {:?}", notif); @@ -529,15 +681,14 @@ where }) .fuse(), ); + let mut block_import_justif = self.links.from_block_import_justif_stream.subscribe().fuse(); loop { - while self.sync_oracle.is_major_syncing() { - debug!(target: "beefy", "Waiting for major sync to complete..."); - futures_timer::Delay::new(Duration::from_secs(5)).await; - } - let mut gossip_engine = &mut self.gossip_engine; - futures::select! { + // Wait for, and handle external events. + // The branches below only change 'state', actual voting happen afterwards, + // based on the new resulting 'state'. + futures::select_biased! { notification = finality_notifications.next() => { if let Some(notification) = notification { self.handle_finality_notification(¬ification); @@ -545,24 +696,24 @@ where return; } }, + // TODO: when adding SYNC protocol, join the on-demand justifications stream to + // this one, and handle them both here. + justif = block_import_justif.next() => { + if let Some(justif) = justif { + // Block import justifications have already been verified to be valid + // by `BeefyBlockImport`. + if let Err(err) = self.triage_incoming_justif(justif) { + debug!(target: "beefy", "🥩 {}", err); + } + } else { + return; + } + }, vote = votes.next() => { if let Some(vote) = vote { - let block_num = vote.commitment.block_number; - if block_num > *self.best_grandpa_block_header.number() { - // Only handle votes for blocks we _know_ have been finalized. - // Buffer vote to be handled later. - debug!( - target: "beefy", - "🥩 Buffering vote for not (yet) finalized block: {:?}.", - block_num - ); - self.pending_votes.entry(block_num).or_default().push(vote); - } else { - self.handle_vote( - (vote.commitment.payload, vote.commitment.block_number), - (vote.id, vote.signature), - false - ); + // Votes have already been verified to be valid by the gossip validator. + if let Err(err) = self.triage_incoming_vote(vote) { + debug!(target: "beefy", "🥩 {}", err); } } else { return; @@ -573,18 +724,20 @@ where return; } } - } - } - /// Simple wrapper that gets MMR root from header digests or from client state. - fn get_mmr_root_digest(&self, header: &B::Header) -> Option { - find_mmr_root_digest::(header).or_else(|| { - self.runtime - .runtime_api() - .mmr_root(&BlockId::hash(header.hash())) - .ok() - .and_then(|r| r.ok()) - }) + // Don't bother acting on 'state' changes during major sync. + if !self.sync_oracle.is_major_syncing() { + // Handle pending justifications and/or votes for now GRANDPA finalized blocks. + if let Err(err) = self.try_pending_justif_and_votes() { + debug!(target: "beefy", "🥩 {}", err); + } + + // There were external events, 'state' is changed, author a vote if needed/possible. + if let Err(err) = self.try_to_vote() { + debug!(target: "beefy", "🥩 {}", err); + } + } + } } } @@ -684,11 +837,11 @@ pub(crate) mod tests { create_beefy_keystore, get_beefy_streams, make_beefy_ids, two_validators::TestApi, BeefyPeer, BeefyTestNet, BEEFY_PROTOCOL_NAME, }, + BeefyRPCLinks, }; use futures::{executor::block_on, future::poll_fn, task::Poll}; - use crate::tests::BeefyLinkHalf; use sc_client_api::HeaderBackend; use sc_network::NetworkService; use sc_network_test::{PeersFullClient, TestNetFactory}; @@ -705,12 +858,21 @@ pub(crate) mod tests { ) -> BeefyWorker>> { let keystore = create_beefy_keystore(*key); - let (signed_commitment_sender, signed_commitment_stream) = + let (to_rpc_justif_sender, from_voter_justif_stream) = BeefySignedCommitmentStream::::channel(); - let (beefy_best_block_sender, beefy_best_block_stream) = + let (to_rpc_best_block_sender, from_voter_best_beefy_stream) = BeefyBestBlockStream::::channel(); - let beefy_link_half = BeefyLinkHalf { signed_commitment_stream, beefy_best_block_stream }; - *peer.data.beefy_link_half.lock() = Some(beefy_link_half); + let (_, from_block_import_justif_stream) = BeefySignedCommitmentStream::::channel(); + + let beefy_rpc_links = + BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream }; + *peer.data.beefy_rpc_links.lock() = Some(beefy_rpc_links); + + let links = BeefyVoterLinks { + from_block_import_justif_stream, + to_rpc_justif_sender, + to_rpc_best_block_sender, + }; let api = Arc::new(TestApi {}); let network = peer.network_service().clone(); @@ -723,8 +885,7 @@ pub(crate) mod tests { backend: peer.client().as_backend(), runtime: api, key_store: Some(keystore).into(), - signed_commitment_sender, - beefy_best_block_sender, + links, gossip_engine, gossip_validator, min_block_delta, @@ -826,6 +987,107 @@ pub(crate) mod tests { assert_eq!(Some(1072), t); } + #[test] + fn should_vote_target() { + let mut oracle = VoterOracle::::new(1); + + // rounds not initialized -> should vote: `None` + assert_eq!(oracle.voting_target(None, 1), None); + + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + + oracle.add_session(Rounds::new(1, validator_set.clone())); + + // under min delta + oracle.min_block_delta = 4; + assert_eq!(oracle.voting_target(Some(1), 1), None); + assert_eq!(oracle.voting_target(Some(2), 5), None); + + // vote on min delta + assert_eq!(oracle.voting_target(Some(4), 9), Some(8)); + oracle.min_block_delta = 8; + assert_eq!(oracle.voting_target(Some(10), 18), Some(18)); + + // vote on power of two + oracle.min_block_delta = 1; + assert_eq!(oracle.voting_target(Some(1000), 1008), Some(1004)); + assert_eq!(oracle.voting_target(Some(1000), 1016), Some(1008)); + + // nothing new to vote on + assert_eq!(oracle.voting_target(Some(1000), 1000), None); + + // vote on mandatory + oracle.sessions.clear(); + oracle.add_session(Rounds::new(1000, validator_set.clone())); + assert_eq!(oracle.voting_target(None, 1008), Some(1000)); + oracle.sessions.clear(); + oracle.add_session(Rounds::new(1001, validator_set.clone())); + assert_eq!(oracle.voting_target(Some(1000), 1008), Some(1001)); + } + + #[test] + fn test_oracle_accepted_interval() { + let keys = &[Keyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + + let mut oracle = VoterOracle::::new(1); + + // rounds not initialized -> should accept votes: `None` + assert!(oracle.accepted_interval(1).is_err()); + + let session_one = 1; + oracle.add_session(Rounds::new(session_one, validator_set.clone())); + // mandatory not done, only accept mandatory + for i in 0..15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_one, session_one))); + } + + // add more sessions, nothing changes + let session_two = 11; + let session_three = 21; + oracle.add_session(Rounds::new(session_two, validator_set.clone())); + oracle.add_session(Rounds::new(session_three, validator_set.clone())); + // mandatory not done, should accept mandatory for session_one + for i in session_three..session_three + 15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_one, session_one))); + } + + // simulate finish mandatory for session one, prune oracle + oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true); + oracle.try_prune(); + // session_one pruned, should accept mandatory for session_two + for i in session_three..session_three + 15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_two, session_two))); + } + + // simulate finish mandatory for session two, prune oracle + oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true); + oracle.try_prune(); + // session_two pruned, should accept mandatory for session_three + for i in session_three..session_three + 15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_three, session_three))); + } + + // simulate finish mandatory for session three + oracle.sessions.front_mut().unwrap().test_set_mandatory_done(true); + // verify all other blocks in this session are now open to voting + for i in session_three..session_three + 15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_three, i))); + } + // pruning does nothing in this case + oracle.try_prune(); + for i in session_three..session_three + 15 { + assert_eq!(oracle.accepted_interval(i), Ok((session_three, i))); + } + + // adding new session automatically prunes "finalized" previous session + let session_four = 31; + oracle.add_session(Rounds::new(session_four, validator_set.clone())); + assert_eq!(oracle.sessions.front().unwrap().session_start(), session_four); + assert_eq!(oracle.accepted_interval(session_four + 10), Ok((session_four, session_four))); + } + #[test] fn extract_authorities_change_digest() { let mut header = Header::new( @@ -876,69 +1138,6 @@ pub(crate) mod tests { assert_eq!(extracted, Some(mmr_root_hash)); } - #[test] - fn should_vote_target() { - let keys = &[Keyring::Alice]; - let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); - let mut net = BeefyTestNet::new(1, 0); - let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); - - // rounds not initialized -> should vote: `None` - assert_eq!(worker.current_vote_target(), None); - - let set_up = |worker: &mut BeefyWorker< - Block, - Backend, - PeersFullClient, - TestApi, - Arc>, - >, - best_grandpa: u64, - best_beefy: Option, - session_start: u64, - min_delta: u32| { - let grandpa_header = Header::new( - best_grandpa, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - ); - worker.best_grandpa_block_header = grandpa_header; - worker.best_beefy_block = best_beefy; - worker.min_block_delta = min_delta; - worker.rounds = Some(Rounds::new(session_start, validator_set.clone())); - }; - - // under min delta - set_up(&mut worker, 1, Some(1), 1, 4); - assert_eq!(worker.current_vote_target(), None); - set_up(&mut worker, 5, Some(2), 1, 4); - assert_eq!(worker.current_vote_target(), None); - - // vote on min delta - set_up(&mut worker, 9, Some(4), 1, 4); - assert_eq!(worker.current_vote_target(), Some(8)); - set_up(&mut worker, 18, Some(10), 1, 8); - assert_eq!(worker.current_vote_target(), Some(18)); - - // vote on power of two - set_up(&mut worker, 1008, Some(1000), 1, 1); - assert_eq!(worker.current_vote_target(), Some(1004)); - set_up(&mut worker, 1016, Some(1000), 1, 2); - assert_eq!(worker.current_vote_target(), Some(1008)); - - // nothing new to vote on - set_up(&mut worker, 1000, Some(1000), 1, 1); - assert_eq!(worker.current_vote_target(), None); - - // vote on mandatory - set_up(&mut worker, 1008, None, 1000, 8); - assert_eq!(worker.current_vote_target(), Some(1000)); - set_up(&mut worker, 1008, Some(1000), 1001, 8); - assert_eq!(worker.current_vote_target(), Some(1001)); - } - #[test] fn keystore_vs_validator_set() { let keys = &[Keyring::Alice]; @@ -953,39 +1152,57 @@ pub(crate) mod tests { let keys = &[Keyring::Bob]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let err_msg = "no authority public key found in store".to_string(); - let expected = Err(error::Error::Keystore(err_msg)); + let expected = Err(Error::Keystore(err_msg)); assert_eq!(worker.verify_validator_set(&1, &validator_set), expected); // worker has no keystore worker.key_store = None.into(); - let expected_err = Err(error::Error::Keystore("no Keystore".into())); + let expected_err = Err(Error::Keystore("no Keystore".into())); assert_eq!(worker.verify_validator_set(&1, &validator_set), expected_err); } #[test] - fn setting_best_beefy_block() { + fn test_finalize() { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let mut net = BeefyTestNet::new(1, 0); let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); - let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); + let (mut best_block_streams, mut signed_commitments) = get_beefy_streams(&mut net, keys); let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); + let mut signed_commitments = signed_commitments.drain(..).next().unwrap(); - // no 'best beefy block' + let create_signed_commitment = |block_num: NumberFor| { + let commitment = Commitment { + payload: Payload::new(known_payload_ids::MMR_ROOT_ID, vec![]), + block_number: block_num, + validator_set_id: validator_set.id(), + }; + SignedCommitment { commitment, signatures: vec![None] } + }; + + // no 'best beefy block' or signed commitments assert_eq!(worker.best_beefy_block, None); block_on(poll_fn(move |cx| { assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); + assert_eq!(signed_commitments.poll_next_unpin(cx), Poll::Pending); Poll::Ready(()) })); // unknown hash for block #1 - let (mut best_block_streams, _) = get_beefy_streams(&mut net, keys); + let (mut best_block_streams, mut signed_commitments) = get_beefy_streams(&mut net, keys); let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); - worker.set_best_beefy_block(1); + let mut signed_commitments = signed_commitments.drain(..).next().unwrap(); + let justif = create_signed_commitment(1); + worker.finalize(justif.clone()); assert_eq!(worker.best_beefy_block, Some(1)); block_on(poll_fn(move |cx| { assert_eq!(best_block_stream.poll_next_unpin(cx), Poll::Pending); + match signed_commitments.poll_next_unpin(cx) { + // expect justification + Poll::Ready(Some(received)) => assert_eq!(received, justif), + v => panic!("unexpected value: {:?}", v), + } Poll::Ready(()) })); @@ -994,7 +1211,8 @@ pub(crate) mod tests { let mut best_block_stream = best_block_streams.drain(..).next().unwrap(); net.generate_blocks(2, 10, &validator_set, false); - worker.set_best_beefy_block(2); + let justif = create_signed_commitment(2); + worker.finalize(justif); assert_eq!(worker.best_beefy_block, Some(2)); block_on(poll_fn(move |cx| { match best_block_stream.poll_next_unpin(cx) { @@ -1010,20 +1228,17 @@ pub(crate) mod tests { } #[test] - fn setting_initial_session() { + fn should_init_session() { let keys = &[Keyring::Alice]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let mut net = BeefyTestNet::new(1, 0); let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); - assert!(worker.rounds.is_none()); + assert!(worker.voting_oracle.sessions.is_empty()); - // verify setting the correct validator sets and boundary for genesis session worker.init_session_at(validator_set.clone(), 1); - - let worker_rounds = worker.rounds.as_ref().unwrap(); - assert_eq!(worker_rounds.session_start(), &1); - // in genesis case both current and prev validator sets are the same + let worker_rounds = worker.voting_oracle.rounds_mut().unwrap(); + assert_eq!(worker_rounds.session_start(), 1); assert_eq!(worker_rounds.validators(), validator_set.validators()); assert_eq!(worker_rounds.validator_set_id(), validator_set.id()); @@ -1031,12 +1246,79 @@ pub(crate) mod tests { let keys = &[Keyring::Bob]; let new_validator_set = ValidatorSet::new(make_beefy_ids(keys), 1).unwrap(); - // verify setting the correct validator sets and boundary for non-genesis session worker.init_session_at(new_validator_set.clone(), 11); + // Since mandatory is not done for old rounds, we still get those. + let rounds = worker.voting_oracle.rounds_mut().unwrap(); + assert_eq!(rounds.validator_set_id(), validator_set.id()); + // Let's finalize mandatory. + rounds.test_set_mandatory_done(true); + worker.voting_oracle.try_prune(); + // Now we should get the next round. + let rounds = worker.voting_oracle.rounds_mut().unwrap(); + // Expect new values. + assert_eq!(rounds.session_start(), 11); + assert_eq!(rounds.validators(), new_validator_set.validators()); + assert_eq!(rounds.validator_set_id(), new_validator_set.id()); + } + + #[test] + fn should_triage_votes_and_process_later() { + let keys = &[Keyring::Alice, Keyring::Bob]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1, 0); + let mut worker = create_beefy_worker(&net.peer(0), &keys[0], 1); + + fn new_vote( + block_number: NumberFor, + ) -> VoteMessage, AuthorityId, Signature> { + let commitment = Commitment { + payload: Payload::new(*b"BF", vec![]), + block_number, + validator_set_id: 0, + }; + VoteMessage { + commitment, + id: Keyring::Alice.public(), + signature: Keyring::Alice.sign(b"I am committed"), + } + } + + // best grandpa is 20 + let best_grandpa_header = Header::new( + 20u32.into(), + Default::default(), + Default::default(), + Default::default(), + Digest::default(), + ); - let worker_rounds = worker.rounds.as_ref().unwrap(); - assert_eq!(worker_rounds.session_start(), &11); - assert_eq!(worker_rounds.validators(), new_validator_set.validators()); - assert_eq!(worker_rounds.validator_set_id(), new_validator_set.id()); + worker.voting_oracle.add_session(Rounds::new(10, validator_set.clone())); + worker.best_grandpa_block_header = best_grandpa_header; + + // triage votes for blocks 10..13 + worker.triage_incoming_vote(new_vote(10)).unwrap(); + worker.triage_incoming_vote(new_vote(11)).unwrap(); + worker.triage_incoming_vote(new_vote(12)).unwrap(); + // triage votes for blocks 20..23 + worker.triage_incoming_vote(new_vote(20)).unwrap(); + worker.triage_incoming_vote(new_vote(21)).unwrap(); + worker.triage_incoming_vote(new_vote(22)).unwrap(); + + // vote for 10 should have been handled, while the rest buffered for later processing + let mut votes = worker.pending_votes.values(); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 11); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 12); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 20); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 21); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 22); + assert!(votes.next().is_none()); + + // simulate mandatory done, and retry buffered votes + worker.voting_oracle.rounds_mut().unwrap().test_set_mandatory_done(true); + worker.try_pending_justif_and_votes().unwrap(); + // all blocks <= grandpa finalized should have been handled, rest still buffered + let mut votes = worker.pending_votes.values(); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 21); + assert_eq!(votes.next().unwrap().first().unwrap().commitment.block_number, 22); } } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index d7c83810d521d..c0e1e9f0e944f 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -25,6 +25,7 @@ mod sync; use std::{ borrow::Cow, collections::HashMap, + marker::PhantomData, pin::Pin, sync::Arc, task::{Context as FutureContext, Poll}, @@ -567,25 +568,27 @@ impl BlockImportAdapterFull for T where /// This is required as the `TestNetFactory` trait does not distinguish between /// full and light nodes. #[derive(Clone)] -pub struct BlockImportAdapter { +pub struct BlockImportAdapter { inner: I, + _phantom: PhantomData, } -impl BlockImportAdapter { +impl BlockImportAdapter { /// Create a new instance of `Self::Full`. pub fn new(inner: I) -> Self { - Self { inner } + Self { inner, _phantom: PhantomData } } } #[async_trait::async_trait] -impl BlockImport for BlockImportAdapter +impl BlockImport for BlockImportAdapter where I: BlockImport + Send + Sync, I::Transaction: Send, + Transaction: Send + 'static, { type Error = ConsensusError; - type Transaction = (); + type Transaction = Transaction; async fn check_block( &mut self, @@ -596,7 +599,7 @@ where async fn import_block( &mut self, - block: BlockImportParams, + block: BlockImportParams, cache: HashMap>, ) -> Result { self.inner.import_block(block.clear_storage_changes_and_mutate(), cache).await diff --git a/primitives/beefy/src/commitment.rs b/primitives/beefy/src/commitment.rs index ed392139de13f..ddf58474e77a0 100644 --- a/primitives/beefy/src/commitment.rs +++ b/primitives/beefy/src/commitment.rs @@ -293,6 +293,12 @@ pub enum VersionedFinalityProof { V1(SignedCommitment), } +impl From> for VersionedFinalityProof { + fn from(commitment: SignedCommitment) -> Self { + VersionedFinalityProof::V1(commitment) + } +} + #[cfg(test)] mod tests { diff --git a/primitives/runtime/src/lib.rs b/primitives/runtime/src/lib.rs index b41c605d8a3be..bf77c08b76906 100644 --- a/primitives/runtime/src/lib.rs +++ b/primitives/runtime/src/lib.rs @@ -151,6 +151,11 @@ impl Justifications { self.iter().find(|j| j.0 == engine_id).map(|j| &j.1) } + /// Remove the encoded justification for the given consensus engine, if it exists. + pub fn remove(&mut self, engine_id: ConsensusEngineId) { + self.0.retain(|j| j.0 != engine_id) + } + /// Return a copy of the encoded justification for the given consensus /// engine, if it exists. pub fn into_justification(self, engine_id: ConsensusEngineId) -> Option {