diff --git a/Cargo.lock b/Cargo.lock index a6a61c7..4d2324b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6782,11 +6782,17 @@ dependencies = [ "parking_lot 0.12.3", "polkadot-node-primitives", "polkadot-node-subsystem", + "polkadot-node-subsystem-util", + "polkadot-overseer", "polkadot-primitives", + "rstest", "sc-client-api", "sc-consensus", + "sc-consensus-babe", "sc-consensus-manual-seal", "sc-network-types", + "sc-utils", + "schnellru", "sp-api", "sp-application-crypto", "sp-block-builder", @@ -6797,9 +6803,13 @@ dependencies = [ "sp-inherents", "sp-keystore", "sp-runtime", + "sp-state-machine", "sp-timestamp", + "sp-tracing", + "sp-trie", "sp-version", "substrate-prometheus-endpoint", + "tokio", "tracing", ] @@ -11076,6 +11086,12 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "relative-path" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" + [[package]] name = "resolv-conf" version = "0.7.4" @@ -11293,6 +11309,35 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "rstest" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97eeab2f3c0a199bc4be135c36c924b6590b88c377d416494288c14f2db30199" +dependencies = [ + "futures 0.3.31", + "futures-timer", + "rstest_macros", + "rustc_version 0.4.1", +] + +[[package]] +name = "rstest_macros" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d428f8247852f894ee1be110b375111b586d4fa431f6c46e64ba5a0dcccbe605" +dependencies = [ + "cfg-if", + "glob", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version 0.4.1", + "syn 2.0.101", + "unicode-ident", +] + [[package]] name = "rtnetlink" version = "0.13.1" diff --git a/Cargo.toml b/Cargo.toml index 
9f4a7f6..4a99272 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,9 @@ scale-info = { version = "2.11.2", default-features = false, features = [ schnorrkel = { version = "0.11.4", default-features = false } serde = { version = "1.0.195", default-features = false } serde_json = { version = "1.0.111", default-features = false } +schnellru = { version = "0.2.3" } +tokio = { version = "1.45.0", default-features = false } +rstest = { version = "0.18.2" } smallvec = "1.6.1" tracing = "0.1.22" @@ -117,6 +120,8 @@ sp-consensus = { git = "https://github.com/paritytech/polkadot-sdk", branch = "s sp-storage = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512", default-features = false } sp-timestamp = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512", default-features = false } sp-trie = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512", default-features = false } +sp-state-machine = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512", default-features = false } +sp-tracing = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512", default-features = false } # Substrate (client) frame-benchmarking-cli = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } @@ -130,6 +135,7 @@ sc-client-db = { git = "https://github.com/paritytech/polkadot-sdk", branch = "s sc-consensus = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } sc-consensus-grandpa = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } sc-consensus-manual-seal = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } +sc-consensus-babe = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } sc-executor = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } sc-informant = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } 
sc-network = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } @@ -202,8 +208,10 @@ xcm-executor = { package = "staging-xcm-executor", git = "https://github.com/par kusama-runtime = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } polkadot-cli = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } +polkadot-node-subsystem-util = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } polkadot-primitives = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } polkadot-service = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } +polkadot-overseer = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } rococo-runtime = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } westend-runtime = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } xcm-simulator = { git = "https://github.com/paritytech/polkadot-sdk", branch = "stable2512" } diff --git a/client/consensus/nimbus-consensus/Cargo.toml b/client/consensus/nimbus-consensus/Cargo.toml index 7e01575..b63f709 100644 --- a/client/consensus/nimbus-consensus/Cargo.toml +++ b/client/consensus/nimbus-consensus/Cargo.toml @@ -3,12 +3,15 @@ name = "nimbus-consensus" description = "Client-side worker for the Nimbus family of consensus algorithms" edition = "2021" version = "0.9.0" + [dependencies] # Substrate deps sc-client-api = { workspace = true } sc-consensus = { workspace = true } sc-consensus-manual-seal = { workspace = true } sc-network-types = { workspace = true } +sc-consensus-babe = { workspace = true } +sc-utils = { workspace = true } sp-consensus-slots = { workspace = true } sp-api = { workspace = true } sp-application-crypto = { workspace = true } @@ -21,6 +24,8 @@ sp-timestamp = { 
workspace = true } sp-keystore = { workspace = true } sp-runtime = { workspace = true } sp-version = { workspace = true } +sp-trie = { workspace = true } +sp-state-machine = { workspace = true } substrate-prometheus-endpoint = { workspace = true } # Cumulus dependencies @@ -36,6 +41,8 @@ cumulus-relay-chain-interface = { workspace = true } polkadot-node-primitives = { workspace = true } polkadot-node-subsystem = { workspace = true } polkadot-primitives = { workspace = true } +polkadot-overseer = { workspace = true } +polkadot-node-subsystem-util = { workspace = true } # Nimbus Dependencies async-backing-primitives = { workspace = true, features = ["std"] } @@ -48,3 +55,9 @@ futures = { workspace = true } log = { workspace = true } parking_lot = { workspace = true } tracing = { workspace = true } +schnellru = { workspace = true } +tokio = { workspace = true, features = ["macros"] } + +[dev-dependencies] +rstest = { workspace = true } +sp-tracing = { workspace = true } \ No newline at end of file diff --git a/client/consensus/nimbus-consensus/src/collator.rs b/client/consensus/nimbus-consensus/src/collator.rs new file mode 100644 index 0000000..2b2e04a --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collator.rs @@ -0,0 +1,392 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . 
+ +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::{ParachainBlockImportMarker, ParachainCandidate}; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_client_parachain_inherent::{ParachainInherentData, ParachainInherentDataProvider}; +use cumulus_primitives_core::{ + relay_chain::Hash as PHash, DigestItem, ParachainBlockData, PersistedValidationData, +}; +use cumulus_relay_chain_interface::RelayChainInterface; +use parity_scale_codec::Codec; + +use polkadot_node_primitives::{Collation, MaybeCompressedPoV}; +use polkadot_primitives::Id as ParaId; + +use crate::collators::RelayParentData; +use futures::prelude::*; +use nimbus_primitives::{CompatibleDigestItem, NimbusId}; +use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction}; +use sc_network_types::PeerId; +use sp_application_crypto::AppPublic; +use sp_consensus::BlockOrigin; +use sp_core::{crypto::Pair, ByteArray}; +use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider}; +use sp_keystore::KeystorePtr; +use sp_runtime::{ + generic::Digest, + traits::{Block as BlockT, HashingFor, Header as HeaderT, Member}, +}; +use sp_state_machine::StorageChanges; +use sp_timestamp::Timestamp; +use std::{error::Error, time::Duration}; + +/// Parameters for instantiating a [`Collator`]. +pub struct Params { + /// A builder for inherent data builders. + pub create_inherent_data_providers: CIDP, + /// The block import handle. + pub block_import: BI, + /// An interface to the relay-chain client. + pub relay_client: RClient, + /// The keystore handle used for accessing parachain key material. + pub keystore: KeystorePtr, + /// The identifier of the parachain within the relay-chain. + pub para_id: ParaId, + /// The block proposer used for building blocks. + pub proposer: Proposer, + /// The collator service used for bundling proposals into collations and announcing + /// to the network. 
+ pub collator_service: CS, +} + +/// A utility struct for writing collation logic that makes use of Nimbus entirely +/// or in part. See module docs for more details. +/// +/// The `CIDPContext` type parameter specifies the context type passed to +/// `CreateInherentDataProviders`. Common choices are `()` or +/// `(PHash, PersistedValidationData, NimbusId)`. +pub struct Collator { + create_inherent_data_providers: CIDP, + block_import: BI, + relay_client: RClient, + keystore: KeystorePtr, + para_id: ParaId, + proposer: Proposer, + collator_service: CS, + _marker: std::marker::PhantomData<(Block, Box, CIDPContext)>, +} + +impl + Collator +where + Block: BlockT, + RClient: RelayChainInterface, + CIDP: CreateInherentDataProviders + 'static, + CIDPContext: Clone + Send + 'static, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + Proposer: ProposerInterface, + CS: CollatorServiceInterface, + P: Pair, + P::Public: AppPublic + Member, + P::Signature: TryFrom> + Member + Codec, +{ + /// Instantiate a new instance of the `Collator`. + pub fn new(params: Params) -> Self { + Collator { + create_inherent_data_providers: params.create_inherent_data_providers, + block_import: params.block_import, + relay_client: params.relay_client, + keystore: params.keystore, + para_id: params.para_id, + proposer: params.proposer, + collator_service: params.collator_service, + _marker: std::marker::PhantomData, + } + } + + /// Explicitly creates the inherent data for parachain block authoring and overrides + /// the timestamp inherent data with the one provided, if any. Additionally allows to specify + /// relay parent descendants that can be used to prevent authoring at the tip of the relay + /// chain. 
+ pub async fn create_inherent_data_with_rp_offset( + &self, + relay_parent: PHash, + validation_data: &PersistedValidationData, + parent_hash: Block::Hash, + timestamp: impl Into>, + relay_parent_descendants: Option, + additional_relay_state_keys: Vec>, + collator_peer_id: PeerId, + cidp_context: CIDPContext, + ) -> Result<(ParachainInherentData, InherentData), Box> { + let paras_inherent_data = ParachainInherentDataProvider::create_at( + relay_parent, + &self.relay_client, + validation_data, + self.para_id, + relay_parent_descendants + .map(RelayParentData::into_inherent_descendant_list) + .unwrap_or_default(), + additional_relay_state_keys, + collator_peer_id, + ) + .await; + + let paras_inherent_data = match paras_inherent_data { + Some(p) => p, + None => { + return Err( + format!("Could not create paras inherent data at {:?}", relay_parent).into(), + ) + } + }; + + let mut other_inherent_data = self + .create_inherent_data_providers + .create_inherent_data_providers(parent_hash, cidp_context) + .map_err(|e| e as Box) + .await? + .create_inherent_data() + .await + .map_err(Box::new)?; + + if let Some(timestamp) = timestamp.into() { + other_inherent_data.replace_data(sp_timestamp::INHERENT_IDENTIFIER, ×tamp); + } + + Ok((paras_inherent_data, other_inherent_data)) + } + + /// Explicitly creates the inherent data for parachain block authoring and overrides + /// the timestamp inherent data with the one provided, if any. 
+ pub async fn create_inherent_data( + &self, + relay_parent: PHash, + validation_data: &PersistedValidationData, + parent_hash: Block::Hash, + timestamp: impl Into>, + additional_relay_state_keys: Vec>, + collator_peer_id: PeerId, + cidp_context: CIDPContext, + ) -> Result<(ParachainInherentData, InherentData), Box> { + self.create_inherent_data_with_rp_offset( + relay_parent, + validation_data, + parent_hash, + timestamp, + None, + additional_relay_state_keys, + collator_peer_id, + cidp_context, + ) + .await + } + + /// Build and import a parachain block on the given parent header, using the given slot claim. + pub async fn build_block_and_import( + &mut self, + parent_header: &Block::Header, + slot_claim: &SlotClaim, + additional_pre_digest: impl Into>>, + inherent_data: (ParachainInherentData, InherentData), + proposal_duration: Duration, + max_pov_size: usize, + ) -> Result>, Box> { + let mut logs = vec![CompatibleDigestItem::nimbus_pre_digest( + slot_claim.author_id.clone(), + )]; + logs.extend(additional_pre_digest.into().unwrap_or_default()); + + let maybe_proposal = self + .proposer + .propose( + &parent_header, + &inherent_data.0, + inherent_data.1, + Digest { logs }, + proposal_duration, + Some(max_pov_size), + ) + .await + .map_err(|e| Box::new(e) as Box)?; + + let proposal = match maybe_proposal { + None => return Ok(None), + Some(p) => p, + }; + + let sealed_importable = seal::<_, P>( + proposal.block, + proposal.storage_changes, + &slot_claim.author_id, + &self.keystore, + ) + .map_err(|e| e as Box)?; + + let block = Block::new( + sealed_importable.post_header(), + sealed_importable + .body + .as_ref() + .expect("body always created with this `propose` fn; qed") + .clone(), + ); + + self.block_import + .import_block(sealed_importable) + .map_err(|e| Box::new(e) as Box) + .await?; + + Ok(Some(ParachainCandidate { + block, + proof: proposal.proof, + })) + } + + /// Propose, seal, import a block and packaging it into a collation. 
+ /// + /// Provide the slot to build at as well as any other necessary pre-digest logs, + /// the inherent data, and the proposal duration and PoV size limits. + /// + /// The Nimbus pre-digest should not be explicitly provided and is set internally. + /// + /// This does not announce the collation to the parachain network or the relay chain. + pub async fn collate( + &mut self, + parent_header: &Block::Header, + slot_claim: &SlotClaim, + additional_pre_digest: impl Into>>, + inherent_data: (ParachainInherentData, InherentData), + proposal_duration: Duration, + max_pov_size: usize, + ) -> Result)>, Box> { + let maybe_candidate = self + .build_block_and_import( + parent_header, + slot_claim, + additional_pre_digest, + inherent_data, + proposal_duration, + max_pov_size, + ) + .await?; + + let Some(candidate) = maybe_candidate else { + return Ok(None); + }; + + let hash = candidate.block.header().hash(); + if let Some((collation, block_data)) = + self.collator_service + .build_collation(parent_header, hash, candidate) + { + block_data.log_size_info(); + + if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { + tracing::info!( + target: crate::LOG_TARGET, + "Compressed PoV size: {}kb", + pov.block_data.0.len() as f64 / 1024f64, + ); + } + + Ok(Some((collation, block_data))) + } else { + Err(Box::::from( + "Unable to produce collation", + )) + } + } + + /// Get the underlying collator service. + pub fn collator_service(&self) -> &CS { + &self.collator_service + } +} + +/// A claim on a Nimbus slot. +pub struct SlotClaim { + author_id: NimbusId, + timestamp: Timestamp, +} + +impl SlotClaim { + /// Create a slot-claim from the given author public key, slot, and timestamp. + /// + /// This does not check whether the author actually owns the slot or the timestamp + /// falls within the slot. + pub fn unchecked
<P>
(author_id: NimbusId, timestamp: Timestamp) -> Self + where + P: Pair, + P::Public: Codec, + P::Signature: Codec, + { + SlotClaim { + author_id, + timestamp, + } + } + + /// Get the timestamp corresponding to the relay-chain slot this claim was + /// generated against. + pub fn timestamp(&self) -> Timestamp { + self.timestamp + } + + /// Get the author id of the authority this claim was + /// generated for. + pub fn author_id(&self) -> NimbusId { + self.author_id.clone() + } +} + +/// Seal a block with a signature in the header. +pub fn seal( + pre_sealed: B, + storage_changes: StorageChanges>, + author_pub: &P::Public, + keystore: &KeystorePtr, +) -> Result, Box> +where + P: Pair, + P::Signature: Codec + TryFrom>, + P::Public: AppPublic, +{ + let (pre_header, body) = pre_sealed.deconstruct(); + let pre_hash = pre_header.hash(); + let block_number = *pre_header.number(); + + // seal the block. + let block_import_params = { + //let seal_digest = aura_internal::seal::<_, P>(&pre_hash, &author_pub, keystore).map_err(Box::new)?; + let seal_digest = crate::collators::seal_header::( + &pre_header, + keystore, + &author_pub.to_raw_vec(), + &sp_core::sr25519::CRYPTO_ID, + ); + let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, pre_header); + block_import_params.post_digests.push(seal_digest); + block_import_params.body = Some(body); + block_import_params.state_action = + StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes)); + block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain); + block_import_params + }; + let post_hash = block_import_params.post_hash(); + + tracing::info!( + target: crate::LOG_TARGET, + "🔖 Pre-sealed block for proposal at {}. 
Hash now {:?}, previously {:?}.", + block_number, + post_hash, + pre_hash, + ); + + Ok(block_import_params) +} diff --git a/client/consensus/nimbus-consensus/src/collators.rs b/client/consensus/nimbus-consensus/src/collators.rs index 5d3c252..c336693 100644 --- a/client/consensus/nimbus-consensus/src/collators.rs +++ b/client/consensus/nimbus-consensus/src/collators.rs @@ -17,157 +17,48 @@ //! Stock, pure Nimbus collators. //! //! This includes the [`basic`] collator, which only builds on top of the most recently -//! included parachain block, as well as the [`lookahead`] collator, which prospectively -//! builds on parachain blocks which have not yet been included in the relay chain. +//! included parachain block, the [`lookahead`] collator, which prospectively builds on +//! parachain blocks which have not yet been included in the relay chain, and the +//! [`slot_based`] collator, which uses a slot-based block production approach with a +//! separate block builder task that is timed by the parachain's slot duration. 
pub mod basic; pub mod lookahead; +pub mod slot_based; -use crate::*; -use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; -use cumulus_client_consensus_common::{ParachainBlockImportMarker, ParachainCandidate}; -use cumulus_client_consensus_proposer::ProposerInterface; +use crate::{collator::SlotClaim, *}; +use async_backing_primitives::Slot; +use async_backing_primitives::UnincludedSegmentApi; +use cumulus_client_consensus_common::{ParentSearchParams, PotentialParent}; use cumulus_primitives_core::{ - relay_chain::{OccupiedCoreAssumption, ValidationCodeHash}, - ParachainBlockData, + relay_chain::{Header as RelayHeader, OccupiedCoreAssumption, ValidationCodeHash}, + ParaId, }; -use cumulus_primitives_parachain_inherent::ParachainInherentData; -use futures::prelude::*; -use log::{debug, info}; -use nimbus_primitives::{CompatibleDigestItem, DigestsProvider, NimbusId, NIMBUS_KEY_ID}; -use polkadot_node_primitives::{Collation, MaybeCompressedPoV}; +use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface}; +use log::debug; +use nimbus_primitives::{CompatibleDigestItem, NimbusId, NIMBUS_KEY_ID}; +use parity_scale_codec::Codec; +use polkadot_node_subsystem::messages::{CollatorProtocolMessage, RuntimeApiRequest}; +use polkadot_node_subsystem_util::runtime::ClaimQueueSnapshot; use polkadot_primitives::Hash as RelayHash; -use sc_consensus::{BlockImport, BlockImportParams}; -use sp_application_crypto::ByteArray; -use sp_consensus::{BlockOrigin, Proposal}; -use sp_core::{crypto::CryptoTypeId, sr25519}; -use sp_inherents::InherentData; -use sp_keystore::Keystore; +use sp_api::{ApiExt, RuntimeApiInfo}; +use sp_core::crypto::CryptoTypeId; +use sp_keystore::{Keystore, KeystorePtr}; use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT}, DigestItem, }; +use sp_timestamp::Timestamp; use std::convert::TryInto; -use std::error::Error; -use std::time::Duration; -/// Propose, seal, and import a block, packaging it into a collation. 
-/// -/// Provide the slot to build at as well as any other necessary pre-digest logs, -/// the inherent data, and the proposal duration and PoV size limits. -/// -/// The Aura pre-digest should not be explicitly provided and is set internally. -/// -/// This does not announce the collation to the parachain network or the relay chain. -pub(crate) async fn collate( - additional_digests_provider: &ADP, - author_id: NimbusId, - block_import: &mut BI, - collator_service: &CS, - keystore: &dyn Keystore, - parent_header: &Block::Header, - proposer: &mut Proposer, - inherent_data: (ParachainInherentData, InherentData), - proposal_duration: Duration, - max_pov_size: usize, -) -> Result)>, Box> -where - ADP: DigestsProvider::Hash> + 'static, - Block: BlockT, - BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, - CS: CollatorServiceInterface, - Proposer: ProposerInterface + Send + Sync + 'static, -{ - let mut logs = vec![CompatibleDigestItem::nimbus_pre_digest(author_id.clone())]; - logs.extend( - additional_digests_provider.provide_digests(author_id.clone(), parent_header.hash()), - ); - - let maybe_proposal = proposer - .propose( - parent_header, - &inherent_data.0, - inherent_data.1, - sp_runtime::generic::Digest { logs }, - proposal_duration, - Some(max_pov_size), - ) - .await - .map_err(|e| Box::new(e) as Box)?; - - let Proposal { - block, - storage_changes, - proof, - } = match maybe_proposal { - None => return Ok(None), - Some(p) => p, - }; - - let (header, extrinsics) = block.clone().deconstruct(); - - let sig_digest = seal_header::( - &header, - keystore, - &author_id.to_raw_vec(), - &sr25519::CRYPTO_ID, - ); - - let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header.clone()); - block_import_params.post_digests.push(sig_digest.clone()); - block_import_params.body = Some(extrinsics.clone()); - block_import_params.state_action = sc_consensus::StateAction::ApplyChanges( - sc_consensus::StorageChanges::Changes(storage_changes), 
- ); - // The collator should follow the longest chain - block_import_params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain); - - let post_hash = block_import_params.post_hash(); - - // Print the same log line as slots (aura and babe) - info!( - "🔖 Sealed block for proposal at {}. Hash now {:?}, previously {:?}.", - *header.number(), - &post_hash, - header.hash(), - ); - - block_import - .import_block(block_import_params) - .map_err(|e| Box::new(e) as Box) - .await?; - - // Compute info about the block after the digest is added - let mut post_header = header.clone(); - post_header.digest_mut().logs.push(sig_digest.clone()); - let post_block = Block::new(post_header, extrinsics); - - if let Some((collation, block_data)) = collator_service.build_collation( - parent_header, - post_hash, - ParachainCandidate { - block: post_block, - proof, - }, - ) { - block_data.log_size_info(); - - if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { - tracing::info!( - target: crate::LOG_TARGET, - "Compressed PoV size: {}kb", - pov.block_data.0.len() as f64 / 1024f64, - ); - } - - Ok(Some((collation, block_data))) - } else { - Err( - Box::::from("Unable to produce collation") - as Box, - ) - } -} +// This is an arbitrary value which is likely guaranteed to exceed any reasonable +// limit, as it would correspond to 30 non-included blocks. +// +// Since we only search for parent blocks which have already been imported, +// we can guarantee that all imported blocks respect the unincluded segment +// rules specified by the parachain's runtime and thus will never be too deep. This is just an extra +// sanity check. +const PARENT_SEARCH_DEPTH: usize = 30; pub(crate) fn seal_header( header: &Block::Header, @@ -250,3 +141,311 @@ async fn check_validation_code_or_log( } } } + +/// Holds a relay parent and its descendants. 
+pub struct RelayParentData { + /// The relay parent block header + relay_parent: RelayHeader, + /// Ordered collection of descendant block headers, from oldest to newest + descendants: Vec, +} + +impl RelayParentData { + /// Creates a new instance with the given relay parent and no descendants. + pub fn new(relay_parent: RelayHeader) -> Self { + Self { + relay_parent, + descendants: Default::default(), + } + } + + /// Creates a new instance with the given relay parent and descendants. + pub fn new_with_descendants(relay_parent: RelayHeader, descendants: Vec) -> Self { + Self { + relay_parent, + descendants, + } + } + + /// Returns a reference to the relay parent header. + pub fn relay_parent(&self) -> &RelayHeader { + &self.relay_parent + } + + /// Returns the number of descendants. + #[cfg(test)] + pub fn descendants_len(&self) -> usize { + self.descendants.len() + } + + /// Consumes the structure and returns a vector containing the relay parent followed by its + /// descendants in chronological order. The resulting list should be provided to the parachain + /// inherent data. + pub fn into_inherent_descendant_list(self) -> Vec { + let Self { + relay_parent, + mut descendants, + } = self; + + if descendants.is_empty() { + return Default::default(); + } + + let mut result = vec![relay_parent]; + result.append(&mut descendants); + result + } +} + +/// Fetch scheduling lookahead at given relay parent. 
+async fn scheduling_lookahead( + relay_parent: RelayHash, + relay_client: &impl RelayChainInterface, +) -> Option { + let runtime_api_version = relay_client + .version(relay_parent) + .await + .map_err(|e| { + tracing::error!( + target: super::LOG_TARGET, + error = ?e, + "Failed to fetch relay chain runtime version.", + ) + }) + .ok()?; + + let parachain_host_runtime_api_version = runtime_api_version + .api_version( + &>::ID, + ) + .unwrap_or_default(); + + if parachain_host_runtime_api_version + < RuntimeApiRequest::SCHEDULING_LOOKAHEAD_RUNTIME_REQUIREMENT + { + return None; + } + + match relay_client.scheduling_lookahead(relay_parent).await { + Ok(scheduling_lookahead) => Some(scheduling_lookahead), + Err(err) => { + tracing::error!( + target: crate::LOG_TARGET, + ?err, + ?relay_parent, + "Failed to fetch scheduling lookahead from relay chain", + ); + None + } + } +} + +/// Use [`cumulus_client_consensus_common::find_potential_parents`] to find parachain blocks that +/// we can build on. Once a list of potential parents is retrieved, return the last one of the +/// longest chain. 
+async fn find_parent( + relay_parent: RelayHash, + para_id: ParaId, + para_backend: &impl sc_client_api::Backend, + relay_client: &impl RelayChainInterface, +) -> Option<(::Header, PotentialParent)> +where + Block: BlockT, +{ + let parent_search_params = ParentSearchParams { + relay_parent, + para_id, + ancestry_lookback: scheduling_lookahead(relay_parent, relay_client) + .await + .unwrap_or(polkadot_primitives::DEFAULT_SCHEDULING_LOOKAHEAD) + .saturating_sub(1) as usize, + max_depth: PARENT_SEARCH_DEPTH, + ignore_alternative_branches: true, + }; + + let potential_parents = cumulus_client_consensus_common::find_potential_parents::( + parent_search_params, + para_backend, + relay_client, + ) + .await; + + let potential_parents = match potential_parents { + Err(e) => { + tracing::error!( + target: crate::LOG_TARGET, + ?relay_parent, + err = ?e, + "Could not fetch potential parents to build upon" + ); + + return None; + } + Ok(x) => x, + }; + + let included_block = potential_parents + .iter() + .find(|x| x.depth == 0)? + .header + .clone(); + potential_parents + .into_iter() + .max_by_key(|a| a.depth) + .map(|parent| (included_block, parent)) +} + +/// Helper for managing pre-connections to backing groups. +/// +/// This ensures collators establish connections to backing groups proactively, +/// even in single-collator scenarios where the collator always owns the slot. +/// +/// For Nimbus consensus, since there's no round-robin authority rotation like Aura, +/// we simply check if we have any Nimbus keys and pre-connect if so. +pub(crate) struct BackingGroupConnectionHelper { + keystore: KeystorePtr, + overseer_handle: OverseerHandle, + /// Track the last slot we connected for to avoid sending duplicate messages. + last_slot: Option, + /// Whether we are currently connected. + connected: bool, +} + +impl BackingGroupConnectionHelper { + /// Create a new [`BackingGroupConnectionHelper`]. 
+ pub fn new(keystore: KeystorePtr, overseer_handle: OverseerHandle) -> Self { + Self { + keystore, + overseer_handle, + last_slot: None, + connected: false, + } + } + + /// Update the connection helper with the current slot. + /// + /// For Nimbus, we check if we have any Nimbus keys and pre-connect if so. + /// This is simpler than Aura since Nimbus doesn't have round-robin slot assignment. + pub async fn update(&mut self, current_slot: Slot) { + // If we already processed this slot, skip + if self.last_slot == Some(current_slot) { + return; + } + self.last_slot = Some(current_slot); + + // Check if we have any Nimbus keys + let has_keys = self + .keystore + .keys(NIMBUS_KEY_ID) + .map(|keys| !keys.is_empty()) + .unwrap_or(false); + + if has_keys && !self.connected { + tracing::debug!( + target: crate::LOG_TARGET, + ?current_slot, + "Pre-connecting to backing groups", + ); + self.overseer_handle + .send_msg( + CollatorProtocolMessage::ConnectToBackingGroups, + "BackingGroupConnectionHelper", + ) + .await; + self.connected = true; + } else if !has_keys && self.connected { + tracing::debug!( + target: crate::LOG_TARGET, + ?current_slot, + "Disconnecting from backing groups - no keys", + ); + self.overseer_handle + .send_msg( + CollatorProtocolMessage::DisconnectFromBackingGroups, + "BackingGroupConnectionHelper", + ) + .await; + self.connected = false; + } + } +} + +// Returns the claim queue at the given relay parent. +async fn claim_queue_at( + relay_parent: RelayHash, + relay_client: &impl RelayChainInterface, +) -> ClaimQueueSnapshot { + // Get `ClaimQueue` from runtime + match relay_client.claim_queue(relay_parent).await { + Ok(claim_queue) => claim_queue.into(), + Err(error) => { + tracing::error!( + target: crate::LOG_TARGET, + ?error, + ?relay_parent, + "Failed to query claim queue runtime API", + ); + Default::default() + } + } +} + +// Checks if we own the slot at the given block and whether there +// is space in the unincluded segment. 
+async fn can_build_upon( + para_slot: Slot, + relay_slot: Slot, + timestamp: Timestamp, + relay_parent: PHeader, + parent_header: Block::Header, + included_block: Block::Hash, + client: &Client, + keystore: &KeystorePtr, + force_authoring: bool, +) -> Option +where + Client: ProvideRuntimeApi, + Client::Api: NimbusApi + UnincludedSegmentApi + ApiExt, + P: sp_core::Pair, + P::Public: Codec, + P::Signature: Codec, +{ + let runtime_api = client.runtime_api(); + let author_pub = crate::claim_slot::( + keystore, + client, + &parent_header, + &relay_parent, + force_authoring, + ) + .await + .ok() + .flatten()?; + + let parent_hash = parent_header.hash(); + + // This function is typically called when we want to build block N. At that point, the + // unincluded segment in the runtime is unaware of the hash of block N-1. If the unincluded + // segment in the runtime is full, but block N-1 is the included block, the unincluded segment + // should have length 0 and we can build. Since the hash is not available to the runtime + // however, we need this extra check here. + if parent_hash == included_block { + return Some(SlotClaim::unchecked::

(author_pub, timestamp)); + } + + let api_version = runtime_api + .api_version::>(parent_hash) + .ok() + .flatten()?; + + let slot = if api_version > 1 { + relay_slot + } else { + para_slot + }; + + runtime_api + .can_build_upon(parent_hash, included_block, slot) + .ok()? + .then(|| SlotClaim::unchecked::

(author_pub, timestamp)) +} diff --git a/client/consensus/nimbus-consensus/src/collators/basic.rs b/client/consensus/nimbus-consensus/src/collators/basic.rs index 92c5904..5d624c8 100644 --- a/client/consensus/nimbus-consensus/src/collators/basic.rs +++ b/client/consensus/nimbus-consensus/src/collators/basic.rs @@ -69,8 +69,9 @@ pub struct Params { pub additional_relay_state_keys: Vec>, /// Force production of the block even if the collator is not eligible pub force_authoring: bool, - /// Maximum percentage of POV size to use (0-85) - pub max_pov_percentage: u8, + /// The maximum percentage of the maximum PoV size that the collator can use. + /// It will be removed once https://github.com/paritytech/polkadot-sdk/issues/6020 is fixed. + pub max_pov_percentage: Option, /// A builder for inherent data builders. pub create_inherent_data_providers: CIDP, /// The collator service used for bundling proposals into collations and announcing @@ -112,13 +113,13 @@ pub async fn run( let Params { additional_digests_provider, - mut block_import, + block_import, collator_service, create_inherent_data_providers, keystore, collator_peer_id, para_id, - mut proposer, + proposer, para_client, relay_client, force_authoring, @@ -126,6 +127,28 @@ pub async fn run( .. 
} = params; + // Create a Collator instance for inherent data creation and block building + type CIDPContext = (PHash, PersistedValidationData, NimbusId); + let collator_params = crate::collator::Params { + create_inherent_data_providers, + block_import, + relay_client: relay_client.clone(), + keystore: keystore.clone(), + para_id, + proposer, + collator_service, + }; + let mut collator = crate::collator::Collator::< + Block, + nimbus_primitives::NimbusPair, + _, + _, + _, + _, + _, + CIDPContext, + >::new(collator_params); + let mut last_processed_slot = 0; let mut last_relay_chain_block = Default::default(); @@ -155,7 +178,10 @@ pub async fn run( let parent_hash = parent_header.hash(); - if !collator_service.check_block_status(parent_hash, &parent_header) { + if !collator + .collator_service() + .check_block_status(parent_hash, &parent_header) + { continue; } @@ -249,52 +275,67 @@ pub async fn run( continue; } + let cidp_context = ( + *request.relay_parent(), + validation_data.clone(), + nimbus_id.clone(), + ); let inherent_data = try_request!( - create_inherent_data( - &create_inherent_data_providers, - para_id, - parent_header.hash(), - validation_data, - &relay_client, - *request.relay_parent(), - nimbus_id.clone(), - Some(timestamp), - collator_peer_id, - params.additional_relay_state_keys.clone(), - ) - .await + collator + .create_inherent_data( + *request.relay_parent(), + validation_data, + parent_header.hash(), + Some(timestamp), + params.additional_relay_state_keys.clone(), + collator_peer_id, + cidp_context, + ) + .await ); - let allowed_pov_size = { - // Cap the percentage at 85% (see https://github.com/paritytech/polkadot-sdk/issues/6020) - let capped_percentage = max_pov_percentage.min(85); - // Calculate the allowed POV size based on the percentage - (validation_data.max_pov_size as u128) - .saturating_mul(capped_percentage as u128) - .saturating_div(100) as usize - }; + let allowed_pov_size = if let Some(max_pov_percentage) = max_pov_percentage { + 
validation_data.max_pov_size * max_pov_percentage / 100 + } else { + // Set the block limit to 85% of the maximum PoV size. + // + // Once https://github.com/paritytech/polkadot-sdk/issues/6020 issue is + // fixed, this should be removed. + validation_data.max_pov_size * 85 / 100 + } as usize; + + // Create slot claim and get additional digests + let slot_claim = crate::collator::SlotClaim::unchecked::( + nimbus_id.clone(), + timestamp, + ); + let additional_digests: Vec<_> = additional_digests_provider + .provide_digests(nimbus_id, parent_hash) + .into_iter() + .collect(); let maybe_collation = try_request!( - super::collate::( - &additional_digests_provider, - nimbus_id, - &mut block_import, - &collator_service, - &*keystore, - &parent_header, - &mut proposer, - inherent_data, - params.authoring_duration, - allowed_pov_size, - ) - .await + collator + .collate( + &parent_header, + &slot_claim, + additional_digests, + inherent_data, + params.authoring_duration, + allowed_pov_size, + ) + .await ); if let Some((collation, block_data)) = maybe_collation { let Some(block_hash) = block_data.blocks().first().map(|b| b.hash()) else { continue; }; - let result_sender = Some(collator_service.announce_with_barrier(block_hash)); + let result_sender = Some( + collator + .collator_service() + .announce_with_barrier(block_hash), + ); request.complete(Some(CollationResult { collation, result_sender, diff --git a/client/consensus/nimbus-consensus/src/collators/lookahead.rs b/client/consensus/nimbus-consensus/src/collators/lookahead.rs index 31869c8..3e8a012 100644 --- a/client/consensus/nimbus-consensus/src/collators/lookahead.rs +++ b/client/consensus/nimbus-consensus/src/collators/lookahead.rs @@ -69,8 +69,9 @@ pub struct Params, /// The underlying keystore, which should contain Aura consensus keys. pub keystore: KeystorePtr, /// A handle to the relay-chain client's "Overseer" or task orchestrator. 
@@ -152,6 +153,34 @@ where } }; + // Helper for pre-connecting to backing groups (fix for single collator scenarios) + let mut connection_helper = super::BackingGroupConnectionHelper::new( + params.keystore.clone(), + params.overseer_handle.clone(), + ); + + // Create a Collator instance for inherent data creation and block building + type CIDPContext = (PHash, PersistedValidationData, NimbusId); + let collator_params = crate::collator::Params { + create_inherent_data_providers: params.create_inherent_data_providers, + block_import: params.block_import, + relay_client: params.relay_client.clone(), + keystore: params.keystore.clone(), + para_id: params.para_id, + proposer: params.proposer, + collator_service: params.collator_service, + }; + let mut collator = crate::collator::Collator::< + Block, + nimbus_primitives::NimbusPair, + _, + _, + _, + _, + _, + CIDPContext, + >::new(collator_params); + // React to each new relay block while let Some(relay_parent_header) = import_notifications.next().await { let relay_parent = relay_parent_header.hash(); @@ -299,6 +328,12 @@ where let mut parent_hash = initial_parent.hash; let mut parent_header = initial_parent.header; let overseer_handle = &mut params.overseer_handle; + + // Proactively connect to backing groups. This is especially important for single + // collator setups where we always own the slot and thus would never trigger + // pre-connection through the normal code path. + connection_helper.update(slot_now).await; + for n_built in 0..2 { // Ask to the runtime if we are authorized to create a new parablock on top of this parent. 
// (This will claim the slot internally) @@ -333,32 +368,31 @@ where // // Create inherents data for the next parablock - let (parachain_inherent_data, other_inherent_data) = - match crate::create_inherent_data( - ¶ms.create_inherent_data_providers, - params.para_id, - parent_hash, - &PersistedValidationData { - parent_head: parent_header.encode().into(), - relay_parent_number: *relay_parent_header.number(), - relay_parent_storage_root: *relay_parent_header.state_root(), - max_pov_size, - }, - ¶ms.relay_client, + let validation_data = PersistedValidationData { + parent_head: parent_header.encode().into(), + relay_parent_number: *relay_parent_header.number(), + relay_parent_storage_root: *relay_parent_header.state_root(), + max_pov_size, + }; + let cidp_context = (relay_parent, validation_data.clone(), author_id.clone()); + let (parachain_inherent_data, other_inherent_data) = match collator + .create_inherent_data( relay_parent, - author_id.clone(), + &validation_data, + parent_hash, Some(timestamp), - params.collator_peer_id, params.additional_relay_state_keys.clone(), + params.collator_peer_id, + cidp_context, ) .await - { - Err(err) => { - tracing::error!(target: crate::LOG_TARGET, ?err); - break; - } - Ok(x) => x, - }; + { + Err(err) => { + tracing::error!(target: crate::LOG_TARGET, ?err); + break; + } + Ok(x) => x, + }; // Compute the hash of the parachain runtime bytecode that we using to build the block. // The hash will be send to relay validators alongside the candidate. 
@@ -377,28 +411,36 @@ where ) .await; - let allowed_pov_size = { - // Cap the percentage at 85% (see https://github.com/paritytech/polkadot-sdk/issues/6020) - let capped_percentage = params.max_pov_percentage.min(85); - // Calculate the allowed POV size based on the percentage - (max_pov_size as u128) - .saturating_mul(capped_percentage as u128) - .saturating_div(100) as usize - }; - - match super::collate( - ¶ms.additional_digests_provider, - author_id, - &mut params.block_import, - ¶ms.collator_service, - keystore, - &parent_header, - &mut params.proposer, - (parachain_inherent_data, other_inherent_data), - params.authoring_duration, - allowed_pov_size, - ) - .await + let allowed_pov_size = if let Some(max_pov_percentage) = params.max_pov_percentage { + max_pov_size * max_pov_percentage / 100 + } else { + // Set the block limit to 85% of the maximum PoV size. + // + // Once https://github.com/paritytech/polkadot-sdk/issues/6020 issue is + // fixed, this should be removed. + max_pov_size * 85 / 100 + } as usize; + + // Create slot claim and get additional digests + let slot_claim = crate::collator::SlotClaim::unchecked::< + nimbus_primitives::NimbusPair, + >(author_id.clone(), timestamp); + let additional_digests: Vec<_> = params + .additional_digests_provider + .provide_digests(author_id, parent_hash) + .into_iter() + .collect(); + + match collator + .collate( + &parent_header, + &slot_claim, + additional_digests, + (parachain_inherent_data, other_inherent_data), + params.authoring_duration, + allowed_pov_size, + ) + .await { Ok(Some((collation, block_data))) => { let Some(new_block_header) = @@ -412,7 +454,9 @@ where // Here we are assuming that the import logic protects against equivocations // and provides sybil-resistance, as it should. 
- params.collator_service.announce_block(new_block_hash, None); + collator + .collator_service() + .announce_block(new_block_hash, None); // TODO: Add PoV export functionality diff --git a/client/consensus/nimbus-consensus/src/collators/slot_based/block_builder_task.rs b/client/consensus/nimbus-consensus/src/collators/slot_based/block_builder_task.rs new file mode 100644 index 0000000..b972fce --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collators/slot_based/block_builder_task.rs @@ -0,0 +1,686 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . 
+ +use nimbus_primitives::NimbusId; +use parity_scale_codec::{Codec, Encode}; + +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_relay_chain_interface::RelayChainInterface; +use sp_consensus_slots::SlotDuration; +use sp_runtime::SaturatedConversion; + +use super::CollatorMessage; +use crate::{ + collator as collator_util, + collators::{ + check_validation_code_or_log, + slot_based::{ + relay_chain_data_cache::{RelayChainData, RelayChainDataCache}, + slot_timer::{SlotInfo, SlotTimer}, + }, + BackingGroupConnectionHelper, RelayParentData, + }, + NimbusApi, LOG_TARGET, +}; +use async_backing_primitives::{Slot, UnincludedSegmentApi}; +use cumulus_primitives_core::{ + extract_relay_parent, rpsr_digest, ClaimQueueOffset, CoreSelector, CumulusDigestItem, + PersistedValidationData, RelayParentOffsetApi, +}; +use futures::prelude::*; +use polkadot_primitives::{ + Block as RelayBlock, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId, +}; +use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider}; +use sc_consensus::BlockImport; +use sc_network_types::PeerId; +use sp_api::ProvideRuntimeApi; +use sp_application_crypto::AppPublic; +use sp_blockchain::HeaderBackend; +use sp_core::crypto::Pair; +use sp_inherents::CreateInherentDataProviders; +use sp_keystore::KeystorePtr; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, Zero}; +use std::{collections::VecDeque, sync::Arc, time::Duration}; + +/// Parameters for [`run_block_builder`]. +pub struct BuilderTaskParams< + Block: BlockT, + BI, + CIDP, + Client, + Backend, + RelayClient, + CHP, + Proposer, + CS, + DP, +> { + /// Additional digest provider + pub additional_digests_provider: DP, + /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. 
+ /// the timestamp, slot, and paras inherents should be omitted, as they are set by this + /// collator. + pub create_inherent_data_providers: CIDP, + /// Used to actually import blocks. + pub block_import: BI, + /// The underlying para client. + pub para_client: Arc, + /// The para client's backend, used to access the database. + pub para_backend: Arc, + /// A handle to the relay-chain client. + pub relay_client: RelayClient, + /// A validation code hash provider, used to get the current validation code hash. + pub code_hash_provider: CHP, + /// The underlying keystore, which should contain Aura consensus keys. + pub keystore: KeystorePtr, + /// The collator network peer id. + pub collator_peer_id: PeerId, + /// The para's ID. + pub para_id: ParaId, + /// The underlying block proposer this should call into. + pub proposer: Proposer, + /// The generic collator service used to plug into this consensus engine. + pub collator_service: CS, + /// The amount of time to spend authoring each block. + pub authoring_duration: Duration, + /// Channel to send built blocks to the collation task. + pub collator_sender: sc_utils::mpsc::TracingUnboundedSender>, + /// Slot duration of the relay chain. + pub relay_chain_slot_duration: Duration, + /// The length of slots in this parachain. + /// If the parachain doesn't have slot and rely only on relay slots, set it to None. + pub para_slot_duration: Option, + /// Offset all time operations by this duration. + /// + /// This is a time quantity that is subtracted from the actual timestamp when computing + /// the time left to enter a new slot. In practice, this *left-shifts* the clock time with the + /// intent to keep our "clock" slightly behind the relay chain one and thus reducing the + /// likelihood of encountering unfavorable notification arrival timings (i.e. we don't want to + /// wait for relay chain notifications because we woke up too early). 
+ pub slot_offset: Duration, + /// The maximum percentage of the maximum PoV size that the collator can use. + /// It will be removed once https://github.com/paritytech/polkadot-sdk/issues/6020 is fixed. + pub max_pov_percentage: Option, + /// Force production of the block even if the collator is not eligible + pub force_authoring: bool, + pub additional_relay_state_keys: Vec>, +} + +/// Run block-builder. +pub fn run_block_builder( + params: BuilderTaskParams, +) -> impl Future + Send + 'static +where + Block: BlockT, + Client: ProvideRuntimeApi + + UsageProvider + + BlockOf + + AuxStore + + HeaderBackend + + BlockBackend + + Send + + Sync + + 'static, + Client::Api: NimbusApi + RelayParentOffsetApi + UnincludedSegmentApi, + Backend: sc_client_api::Backend + 'static, + RelayClient: RelayChainInterface + Clone + 'static, + CIDP: CreateInherentDataProviders + 'static, + CIDP::InherentDataProviders: Send, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + Proposer: ProposerInterface + Send + Sync + 'static, + CS: CollatorServiceInterface + Send + Sync + 'static, + CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, + P: Pair, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, + DP: nimbus_primitives::DigestsProvider::Hash> + + Send + + Sync + + 'static, +{ + async move { + tracing::info!(target: LOG_TARGET, "Starting slot-based block-builder task."); + let BuilderTaskParams { + additional_digests_provider, + relay_client, + create_inherent_data_providers, + para_client, + keystore, + block_import, + collator_peer_id, + para_id, + proposer, + collator_service, + collator_sender, + code_hash_provider, + authoring_duration, + relay_chain_slot_duration, + para_slot_duration, + para_backend, + slot_offset, + max_pov_percentage, + force_authoring, + additional_relay_state_keys, + } = params; + + let mut slot_timer = SlotTimer::::new_with_offset( + slot_offset, + relay_chain_slot_duration, + 
para_slot_duration, + ); + + let mut collator = { + let params = collator_util::Params { + create_inherent_data_providers, + block_import, + relay_client: relay_client.clone(), + keystore: keystore.clone(), + para_id, + proposer, + collator_service, + }; + + collator_util::Collator::::new(params) + }; + + let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id); + let mut connection_helper = BackingGroupConnectionHelper::new( + keystore.clone(), + relay_client + .overseer_handle() + // Should never fail. If it fails, then providing collations to relay chain + // doesn't work either. So it is fine to panic here. + .expect("Relay chain interface must provide overseer handle."), + ); + + loop { + // We wait here until the next slot arrives. + if slot_timer.wait_until_next_slot().await.is_err() { + tracing::error!(target: LOG_TARGET, "Unable to wait for next slot."); + return; + }; + + let Ok(relay_best_hash) = relay_client.best_block_hash().await else { + tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash."); + continue; + }; + + let best_hash = para_client.info().best_hash; + let relay_parent_offset = para_client + .runtime_api() + .relay_parent_offset(best_hash) + .unwrap_or_default(); + + let para_slot_duration = slot_timer.slot_duration(); + + let Ok(Some(rp_data)) = offset_relay_parent_find_descendants( + &mut relay_chain_data_cache, + relay_best_hash, + relay_parent_offset, + ) + .await + else { + continue; + }; + + let Some(para_slot) = adjust_para_to_relay_parent_slot( + rp_data.relay_parent(), + relay_chain_slot_duration, + SlotDuration::from_millis(para_slot_duration.as_millis().saturated_into::()), + ) else { + continue; + }; + + let relay_parent = rp_data.relay_parent().hash(); + let relay_parent_header = rp_data.relay_parent().clone(); + + let Some((included_header, parent)) = + crate::collators::find_parent(relay_parent, para_id, &*para_backend, &relay_client) + .await + else { + 
continue; + }; + + let parent_hash = parent.hash; + let parent_header = &parent.header; + + // Retrieve the core. + let core = match determine_core( + &mut relay_chain_data_cache, + &relay_parent_header, + para_id, + parent_header, + relay_parent_offset, + ) + .await + { + Err(()) => { + tracing::debug!( + target: LOG_TARGET, + ?relay_parent, + "Failed to determine core" + ); + + continue; + } + Ok(Some(cores)) => { + tracing::debug!( + target: LOG_TARGET, + ?relay_parent, + core_selector = ?cores.selector, + claim_queue_offset = ?cores.claim_queue_offset, + "Going to claim core", + ); + + cores + } + Ok(None) => { + tracing::debug!( + target: LOG_TARGET, + ?relay_parent, + "No core scheduled" + ); + + continue; + } + }; + + let Ok(RelayChainData { + max_pov_size, + last_claimed_core_selector, + .. + }) = relay_chain_data_cache + .get_mut_relay_chain_data(relay_parent) + .await + else { + continue; + }; + + slot_timer.update_scheduling(core.total_cores().into()); + + // We mainly call this to inform users at genesis if there is a mismatch with the + // on-chain data. + collator + .collator_service() + .check_block_status(parent_hash, parent_header); + + let Ok(relay_slot) = + sc_consensus_babe::find_pre_digest::(&relay_parent_header) + .map(|babe_pre_digest| babe_pre_digest.slot()) + else { + tracing::error!(target: crate::LOG_TARGET, "Relay chain does not contain babe slot. This should never happen."); + continue; + }; + + let included_header_hash = included_header.hash(); + + // Proactively connect to backing groups. This is especially important for single + // collator setups where we always own the slot and thus would never trigger + // pre-connection through the normal code path. 
+ connection_helper.update(para_slot.slot).await; + + let slot_claim = match crate::collators::can_build_upon::<_, _, P>( + para_slot.slot, + relay_slot, + para_slot.timestamp, + relay_parent_header.clone(), + parent_header.clone(), + included_header_hash, + &*para_client, + &keystore, + force_authoring, + ) + .await + { + Some(slot) => slot, + None => { + tracing::debug!( + target: crate::LOG_TARGET, + unincluded_segment_len = parent.depth, + relay_parent = ?relay_parent, + relay_parent_num = %relay_parent_header.number(), + included_hash = ?included_header_hash, + included_num = %included_header.number(), + parent = ?parent_hash, + slot = ?para_slot.slot, + "Not building block." + ); + continue; + } + }; + + tracing::debug!( + target: crate::LOG_TARGET, + unincluded_segment_len = parent.depth, + relay_parent = %relay_parent, + relay_parent_num = %relay_parent_header.number(), + relay_parent_offset, + included_hash = %included_header_hash, + included_num = %included_header.number(), + parent = %parent_hash, + slot = ?para_slot.slot, + "Building block." 
+ ); + + let validation_data = PersistedValidationData { + parent_head: parent_header.encode().into(), + relay_parent_number: *relay_parent_header.number(), + relay_parent_storage_root: *relay_parent_header.state_root(), + max_pov_size: *max_pov_size, + }; + + let (parachain_inherent_data, other_inherent_data) = match collator + .create_inherent_data_with_rp_offset( + relay_parent, + &validation_data, + parent_hash, + slot_claim.timestamp(), + Some(rp_data), + additional_relay_state_keys.clone(), + collator_peer_id, + (), // CIDP context (unit type for slot-based collator) + ) + .await + { + Err(err) => { + tracing::error!(target: crate::LOG_TARGET, ?err); + break; + } + Ok(x) => x, + }; + + let validation_code_hash = match code_hash_provider.code_hash_at(parent_hash) { + None => { + tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash"); + break; + } + Some(v) => v, + }; + + check_validation_code_or_log( + &validation_code_hash, + para_id, + &relay_client, + relay_parent, + ) + .await; + + let allowed_pov_size = if let Some(max_pov_percentage) = max_pov_percentage { + validation_data.max_pov_size * max_pov_percentage / 100 + } else { + // Set the block limit to 85% of the maximum PoV size. + // + // Once https://github.com/paritytech/polkadot-sdk/issues/6020 issue is + // fixed, this should be removed. 
+ validation_data.max_pov_size * 85 / 100 + } as usize; + + let adjusted_authoring_duration = + slot_timer.adjust_authoring_duration(authoring_duration); + tracing::debug!(target: crate::LOG_TARGET, duration = ?adjusted_authoring_duration, "Adjusted proposal duration."); + + let Some(adjusted_authoring_duration) = adjusted_authoring_duration else { + tracing::debug!( + target: crate::LOG_TARGET, + unincluded_segment_len = parent.depth, + relay_parent = ?relay_parent, + relay_parent_num = %relay_parent_header.number(), + included_hash = ?included_header_hash, + included_num = %included_header.number(), + parent = ?parent_hash, + slot = ?para_slot.slot, + "Not building block due to insufficient authoring duration." + ); + + continue; + }; + + let additional_pre_digests: Vec = additional_digests_provider + .provide_digests(slot_claim.author_id(), parent_header.hash()) + .into_iter() + .collect(); + + let Ok(Some(candidate)) = collator + .build_block_and_import( + &parent_header, + &slot_claim, + Some(additional_pre_digests), + (parachain_inherent_data, other_inherent_data), + adjusted_authoring_duration, + allowed_pov_size, + ) + .await + else { + tracing::error!(target: crate::LOG_TARGET, "Unable to build block at slot."); + continue; + }; + + let new_block_hash = candidate.block.header().hash(); + + // Announce the newly built block to our peers. + collator + .collator_service() + .announce_block(new_block_hash, None); + + *last_claimed_core_selector = Some(core.core_selector()); + + if let Err(err) = collator_sender.unbounded_send(CollatorMessage { + relay_parent, + parent_header: parent_header.clone(), + parachain_candidate: candidate, + validation_code_hash, + core_index: core.core_index(), + max_pov_size: validation_data.max_pov_size, + }) { + tracing::error!(target: crate::LOG_TARGET, ?err, "Unable to send block to collation task."); + return; + } + } + } +} + +/// Translate the slot of the relay parent to the slot of the parachain. 
+fn adjust_para_to_relay_parent_slot( + relay_header: &RelayHeader, + relay_chain_slot_duration: Duration, + para_slot_duration: SlotDuration, +) -> Option { + let relay_slot = sc_consensus_babe::find_pre_digest::(&relay_header) + .map(|babe_pre_digest| babe_pre_digest.slot()) + .ok()?; + let new_slot = Slot::from_timestamp( + relay_slot.timestamp(SlotDuration::from_millis( + relay_chain_slot_duration.as_millis() as u64, + ))?, + para_slot_duration, + ); + let para_slot = SlotInfo { + slot: new_slot, + timestamp: new_slot.timestamp(para_slot_duration)?, + }; + tracing::debug!( + target: LOG_TARGET, + timestamp = ?para_slot.timestamp, + slot = ?para_slot.slot, + "Parachain slot adjusted to relay chain.", + ); + Some(para_slot) +} + +/// Finds a relay chain parent block at a specified offset from the best block, collecting its +/// descendants. +/// +/// # Returns +/// * `Ok(RelayParentData)` - Contains the target relay parent and its ordered list of descendants +/// * `Err(())` - If any relay chain block header cannot be retrieved +/// +/// The function traverses backwards from the best block until it finds the block at the specified +/// offset, collecting all blocks in between to maintain the chain of ancestry. 
+pub(crate) async fn offset_relay_parent_find_descendants( + relay_chain_data_cache: &mut RelayChainDataCache, + relay_best_block: RelayHash, + relay_parent_offset: u32, +) -> Result, ()> +where + RelayClient: RelayChainInterface + Clone + 'static, +{ + let Ok(mut relay_header) = relay_chain_data_cache + .get_mut_relay_chain_data(relay_best_block) + .await + .map(|d| d.relay_parent_header.clone()) + else { + tracing::error!(target: LOG_TARGET, ?relay_best_block, "Unable to fetch best relay chain block header."); + return Err(()); + }; + + if relay_parent_offset == 0 { + return Ok(Some(RelayParentData::new(relay_header))); + } + + if sc_consensus_babe::contains_epoch_change::(&relay_header) { + tracing::debug!(target: LOG_TARGET, ?relay_best_block, relay_best_block_number = relay_header.number(), "Relay parent is in previous session."); + return Ok(None); + } + + let mut required_ancestors: VecDeque = Default::default(); + required_ancestors.push_front(relay_header.clone()); + while required_ancestors.len() < relay_parent_offset as usize { + let next_header = relay_chain_data_cache + .get_mut_relay_chain_data(*relay_header.parent_hash()) + .await? + .relay_parent_header + .clone(); + if sc_consensus_babe::contains_epoch_change::(&next_header) { + tracing::debug!(target: LOG_TARGET, ?relay_best_block, ancestor = %next_header.hash(), ancestor_block_number = next_header.number(), "Ancestor of best block is in previous session."); + return Ok(None); + } + required_ancestors.push_front(next_header.clone()); + relay_header = next_header; + } + + let relay_parent = relay_chain_data_cache + .get_mut_relay_chain_data(*relay_header.parent_hash()) + .await? + .relay_parent_header + .clone(); + + tracing::debug!( + target: LOG_TARGET, + relay_parent_hash = %relay_parent.hash(), + relay_parent_num = relay_parent.number(), + num_descendants = required_ancestors.len(), + "Relay parent descendants." 
+ ); + + Ok(Some(RelayParentData::new_with_descendants( + relay_parent, + required_ancestors.into(), + ))) +} + +/// Return value of [`determine_core`]. +pub(crate) struct Core { + selector: CoreSelector, + claim_queue_offset: ClaimQueueOffset, + core_index: CoreIndex, + number_of_cores: u16, +} + +impl Core { + /// Returns the current [`CoreSelector`]. + pub(crate) fn core_selector(&self) -> CoreSelector { + self.selector + } + + /// Returns the current [`CoreIndex`]. + pub(crate) fn core_index(&self) -> CoreIndex { + self.core_index + } + + /// Returns the total number of cores. + pub(crate) fn total_cores(&self) -> u16 { + self.number_of_cores + } +} + +/// Determine the core for the given `para_id`. +pub(crate) async fn determine_core( + relay_chain_data_cache: &mut RelayChainDataCache, + relay_parent: &RelayHeader, + para_id: ParaId, + para_parent: &H, + relay_parent_offset: u32, +) -> Result, ()> { + let cores_at_offset = &relay_chain_data_cache + .get_mut_relay_chain_data(relay_parent.hash()) + .await? + .claim_queue + .iter_claims_at_depth_for_para(relay_parent_offset as usize, para_id) + .collect::>(); + + let is_new_relay_parent = if para_parent.number().is_zero() { + true + } else { + match extract_relay_parent(para_parent.digest()) { + Some(last_relay_parent) => last_relay_parent != relay_parent.hash(), + None => { + rpsr_digest::extract_relay_parent_storage_root(para_parent.digest()) + .ok_or(())? + .0 != *relay_parent.state_root() + } + } + }; + + let core_info = CumulusDigestItem::find_core_info(para_parent.digest()); + + // If we are using a new relay parent, we can start over from the start. 
+ let (selector, core_index) = if is_new_relay_parent { + let Some(core_index) = cores_at_offset.get(0) else { + return Ok(None); + }; + + (0, *core_index) + } else if let Some(core_info) = core_info { + let selector = core_info.selector.0 as usize + 1; + let Some(core_index) = cores_at_offset.get(selector) else { + return Ok(None); + }; + + (selector, *core_index) + } else { + let last_claimed_core_selector = relay_chain_data_cache + .get_mut_relay_chain_data(relay_parent.hash()) + .await? + .last_claimed_core_selector; + + let selector = last_claimed_core_selector.map_or(0, |cs| cs.0 as usize) + 1; + let Some(core_index) = cores_at_offset.get(selector) else { + return Ok(None); + }; + + (selector, *core_index) + }; + + Ok(Some(Core { + selector: CoreSelector(selector as u8), + core_index, + claim_queue_offset: ClaimQueueOffset(relay_parent_offset as u8), + number_of_cores: cores_at_offset.len() as u16, + })) +} diff --git a/client/consensus/nimbus-consensus/src/collators/slot_based/block_import.rs b/client/consensus/nimbus-consensus/src/collators/slot_based/block_import.rs new file mode 100644 index 0000000..abcd35d --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collators/slot_based/block_import.rs @@ -0,0 +1,162 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . 
+ +use futures::{stream::FusedStream, StreamExt}; +use sc_consensus::{BlockImport, StateAction}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; +use sp_api::{ApiExt, CallApiAt, CallContext, Core, ProvideRuntimeApi, StorageProof}; +use sp_runtime::traits::{Block as BlockT, Header as _}; +use sp_trie::proof_size_extension::ProofSizeExt; +use std::sync::Arc; + +/// Handle for receiving the block and the storage proof from the [`SlotBasedBlockImport`]. +/// +/// This handle should be passed to [`Params`](super::Params) or can also be dropped if the node is +/// not running as collator. +pub struct SlotBasedBlockImportHandle { + receiver: TracingUnboundedReceiver<(Block, StorageProof)>, +} + +impl SlotBasedBlockImportHandle { + /// Returns the next item. + /// + /// The future will never return when the internal channel is closed. + pub async fn next(&mut self) -> (Block, StorageProof) { + loop { + if self.receiver.is_terminated() { + futures::pending!() + } else if let Some(res) = self.receiver.next().await { + return res; + } + } + } +} + +/// Special block import for the slot based collator. +pub struct SlotBasedBlockImport { + inner: BI, + client: Arc, + sender: TracingUnboundedSender<(Block, StorageProof)>, +} + +impl SlotBasedBlockImport { + /// Create a new instance. + /// + /// The returned [`SlotBasedBlockImportHandle`] needs to be passed to the + /// [`Params`](super::Params), so that this block import instance can communicate with the + /// collation task. If the node is not running as a collator, just dropping the handle is fine. 
+ pub fn new(inner: BI, client: Arc) -> (Self, SlotBasedBlockImportHandle) { + let (sender, receiver) = tracing_unbounded("SlotBasedBlockImportChannel", 1000); + + ( + Self { + sender, + client, + inner, + }, + SlotBasedBlockImportHandle { receiver }, + ) + } +} + +impl Clone for SlotBasedBlockImport { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + client: self.client.clone(), + sender: self.sender.clone(), + } + } +} + +#[async_trait::async_trait] +impl BlockImport for SlotBasedBlockImport +where + Block: BlockT, + BI: BlockImport + Send + Sync, + BI::Error: Into, + Client: ProvideRuntimeApi + CallApiAt + Send + Sync, + Client::StateBackend: Send, + Client::Api: Core, +{ + type Error = sp_consensus::Error; + + async fn check_block( + &self, + block: sc_consensus::BlockCheckParams, + ) -> Result { + self.inner.check_block(block).await.map_err(Into::into) + } + + async fn import_block( + &self, + mut params: sc_consensus::BlockImportParams, + ) -> Result { + // If the channel exists and it is required to execute the block, we will execute the block + // here. This is done to collect the storage proof and to prevent re-execution, we push + // downwards the state changes. `StateAction::ApplyChanges` is ignored, because it either + // means that the node produced the block itself or the block was imported via state sync. 
+ if !self.sender.is_closed() && !matches!(params.state_action, StateAction::ApplyChanges(_)) + { + let mut runtime_api = self.client.runtime_api(); + + runtime_api.set_call_context(CallContext::Onchain); + + runtime_api.record_proof(); + let recorder = runtime_api + .proof_recorder() + .expect("Proof recording is enabled in the line above; qed."); + runtime_api.register_extension(ProofSizeExt::new(recorder)); + + let parent_hash = *params.header.parent_hash(); + + let block = Block::new( + params.header.clone(), + params.body.clone().unwrap_or_default(), + ); + + runtime_api + .execute_block(parent_hash, block.clone().into()) + .map_err(|e| Box::new(e) as Box<_>)?; + + let storage_proof = runtime_api + .extract_proof() + .expect("Proof recording was enabled above; qed"); + + let state = self + .client + .state_at(parent_hash) + .map_err(|e| Box::new(e) as Box<_>)?; + let gen_storage_changes = runtime_api + .into_storage_changes(&state, parent_hash) + .map_err(sp_consensus::Error::ChainLookup)?; + + if params.header.state_root() != &gen_storage_changes.transaction_storage_root { + return Err(sp_consensus::Error::Other(Box::new( + sp_blockchain::Error::InvalidStateRoot, + ))); + } + + params.state_action = StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes( + gen_storage_changes, + )); + + let _ = self.sender.unbounded_send((block, storage_proof)); + } + + self.inner.import_block(params).await.map_err(Into::into) + } +} diff --git a/client/consensus/nimbus-consensus/src/collators/slot_based/collation_task.rs b/client/consensus/nimbus-consensus/src/collators/slot_based/collation_task.rs new file mode 100644 index 0000000..a5251a2 --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collators/slot_based/collation_task.rs @@ -0,0 +1,187 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. 
+ +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . + +use parity_scale_codec::Encode; +use std::path::PathBuf; + +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_relay_chain_interface::RelayChainInterface; + +use polkadot_node_primitives::{MaybeCompressedPoV, SubmitCollationParams}; +use polkadot_node_subsystem::messages::CollationGenerationMessage; +use polkadot_overseer::Handle as OverseerHandle; +use polkadot_primitives::{CollatorPair, Id as ParaId}; + +use cumulus_primitives_core::relay_chain::BlockId; +use futures::prelude::*; + +use crate::export_pov_to_path; +use sc_utils::mpsc::TracingUnboundedReceiver; +use sp_runtime::traits::{Block as BlockT, Header}; + +use super::CollatorMessage; + +const LOG_TARGET: &str = "nimbus::cumulus::collation_task"; + +/// Parameters for the collation task. +pub struct Params { + /// A handle to the relay-chain client. + pub relay_client: RClient, + /// The collator key used to sign collations before submitting to validators. + pub collator_key: CollatorPair, + /// The para's ID. + pub para_id: ParaId, + /// Whether we should reinitialize the collator config (i.e. we are transitioning to aura). + pub reinitialize: bool, + /// Collator service interface + pub collator_service: CS, + /// Receiver channel for communication with the block builder task. 
+ pub collator_receiver: TracingUnboundedReceiver>, + /// The handle from the special slot based block import. + pub block_import_handle: super::SlotBasedBlockImportHandle, + /// When set, the collator will export every produced `POV` to this folder. + pub export_pov: Option, +} + +/// Asynchronously executes the collation task for a parachain. +/// +/// This function initializes the collator subsystems necessary for producing and submitting +/// collations to the relay chain. It listens for new best relay chain block notifications and +/// handles collator messages. If our parachain is scheduled on a core and we have a candidate, +/// the task will build a collation and send it to the relay chain. +pub async fn run_collation_task( + Params { + relay_client, + collator_key, + para_id, + reinitialize, + collator_service, + mut collator_receiver, + mut block_import_handle, + export_pov, + }: Params, +) where + Block: BlockT, + CS: CollatorServiceInterface + Send + Sync + 'static, + RClient: RelayChainInterface + Clone + 'static, +{ + let Ok(mut overseer_handle) = relay_client.overseer_handle() else { + tracing::error!(target: LOG_TARGET, "Failed to get overseer handle."); + return; + }; + + cumulus_client_collator::initialize_collator_subsystems( + &mut overseer_handle, + collator_key, + para_id, + reinitialize, + ) + .await; + + loop { + futures::select! { + collator_message = collator_receiver.next() => { + let Some(message) = collator_message else { + return; + }; + + handle_collation_message(message, &collator_service, &mut overseer_handle,relay_client.clone(),export_pov.clone()).await; + }, + block_import_msg = block_import_handle.next().fuse() => { + // TODO: Implement me. + // Issue: https://github.com/paritytech/polkadot-sdk/issues/6495 + let _ = block_import_msg; + } + } + } +} + +/// Handle an incoming collation message from the block builder task. 
+/// This builds the collation from the [`CollatorMessage`] and submits it to +/// the collation-generation subsystem of the relay chain. +async fn handle_collation_message( + message: CollatorMessage, + collator_service: &impl CollatorServiceInterface, + overseer_handle: &mut OverseerHandle, + relay_client: RClient, + export_pov: Option, +) { + let CollatorMessage { + parent_header, + parachain_candidate, + validation_code_hash, + relay_parent, + core_index, + max_pov_size, + } = message; + + let hash = parachain_candidate.block.header().hash(); + let number = *parachain_candidate.block.header().number(); + let (collation, block_data) = + match collator_service.build_collation(&parent_header, hash, parachain_candidate) { + Some(collation) => collation, + None => { + tracing::warn!(target: LOG_TARGET, %hash, ?number, ?core_index, "Unable to build collation."); + return; + } + }; + + block_data.log_size_info(); + + if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity { + if let Some(pov_path) = export_pov { + if let Ok(Some(relay_parent_header)) = + relay_client.header(BlockId::Hash(relay_parent)).await + { + if let Some(header) = block_data.blocks().first().map(|b| b.header()) { + export_pov_to_path::( + pov_path.clone(), + pov.clone(), + header.hash(), + *header.number(), + parent_header.clone(), + relay_parent_header.state_root, + relay_parent_header.number, + max_pov_size, + ); + } + } else { + tracing::error!(target: LOG_TARGET, "Failed to get relay parent header from hash: {relay_parent:?}"); + } + } + + tracing::info!( + target: LOG_TARGET, + "Compressed PoV size: {}kb", + pov.block_data.0.len() as f64 / 1024f64, + ); + } + + tracing::debug!(target: LOG_TARGET, ?core_index, %hash, %number, "Submitting collation for core."); + overseer_handle + .send_msg( + CollationGenerationMessage::SubmitCollation(SubmitCollationParams { + relay_parent, + collation, + parent_head: parent_header.encode().into(), + validation_code_hash, + core_index, + 
result_sender: None, + }), + "SubmitCollation", + ) + .await; +} diff --git a/client/consensus/nimbus-consensus/src/collators/slot_based/mod.rs b/client/consensus/nimbus-consensus/src/collators/slot_based/mod.rs new file mode 100644 index 0000000..ea5ef2c --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collators/slot_based/mod.rs @@ -0,0 +1,286 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . + +//! # Architecture Overview +//! +//! The block building mechanism operates through two coordinated tasks: +//! +//! 1. **Block Builder Task**: Orchestrates the timing and execution of parachain block production +//! 2. **Collator Task**: Processes built blocks into collations for relay chain submission +//! +//! # Block Builder Task Details +//! +//! The block builder task manages block production timing and execution through an iterative +//! process: +//! +//! 1. Awaits the next production signal from the internal timer +//! 2. Retrieves the current best relay chain block and identifies a valid parent block (see +//! [find_potential_parents][cumulus_client_consensus_common::find_potential_parents] for parent +//! selection criteria) +//! 3. Validates that: +//! - The parachain has an assigned core on the relay chain +//! - No block has been previously built on the target core +//! 4. Executes block building and import operations +//! 5. 
Transmits the completed block to the collator task +//! +//! # Block Production Timing +//! +//! When a block is produced is determined by the following parameters: +//! +//! - Parachain slot duration +//! - Number of assigned parachain cores +//! - Parachain runtime configuration +//! +//! ## Timing Examples +//! +//! The following table demonstrates various timing configurations and their effects. The "AURA +//! Slot" column shows which author is responsible for the block. +//! +//! | Slot Duration (ms) | Cores | Production Attempts (ms) | AURA Slot | +//! |-------------------|--------|-------------------------|------------| +//! | 2000 | 3 | 0, 2000, 4000, 6000 | 0, 1, 2, 3 | +//! | 6000 | 1 | 0, 6000, 12000, 18000 | 0, 1, 2, 3 | +//! | 6000 | 3 | 0, 2000, 4000, 6000 | 0, 0, 0, 1 | +//! | 12000 | 1 | 0, 6000, 12000, 18000 | 0, 0, 1, 1 | +//! | 12000 | 3 | 0, 2000, 4000, 6000 | 0, 0, 0, 0 | +//! +//! # Collator Task Details +//! +//! The collator task receives built blocks from the block builder task and performs two primary +//! functions: +//! +//! 1. Block compression +//! 2. 
Submission to the collation-generation subsystem + +use self::{block_builder_task::run_block_builder, collation_task::run_collation_task}; +use crate::NimbusApi; +use async_backing_primitives::UnincludedSegmentApi; +pub use block_import::{SlotBasedBlockImport, SlotBasedBlockImportHandle}; +use consensus_common::ParachainCandidate; +use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface; +use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; +use cumulus_client_consensus_proposer::ProposerInterface; +use cumulus_primitives_core::RelayParentOffsetApi; +use cumulus_relay_chain_interface::RelayChainInterface; +use futures::FutureExt; +use parity_scale_codec::Codec; +use polkadot_primitives::{ + CollatorPair, CoreIndex, Hash as RelayHash, Id as ParaId, ValidationCodeHash, +}; +use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider}; +use sc_consensus::BlockImport; +use sc_network_types::PeerId; +use sc_utils::mpsc::tracing_unbounded; +use sp_api::ProvideRuntimeApi; +use sp_application_crypto::AppPublic; +use sp_blockchain::HeaderBackend; +use sp_core::{crypto::Pair, traits::SpawnEssentialNamed}; +use sp_inherents::CreateInherentDataProviders; +use sp_keystore::KeystorePtr; +use sp_runtime::traits::{Block as BlockT, Member}; +use std::{path::PathBuf, sync::Arc, time::Duration}; + +mod block_builder_task; +mod block_import; +mod collation_task; +mod relay_chain_data_cache; + +mod slot_timer; + +/// Parameters for [`run`]. +pub struct Params { + /// Additional digest provider + pub additional_digests_provider: DP, + /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. + /// the timestamp, slot, and paras inherents should be omitted, as they are set by this + /// collator. + pub create_inherent_data_providers: CIDP, + /// Used to actually import blocks. + pub block_import: BI, + /// The underlying para client. 
+ pub para_client: Arc, + /// The para client's backend, used to access the database. + pub para_backend: Arc, + /// A handle to the relay-chain client. + pub relay_client: RClient, + /// A validation code hash provider, used to get the current validation code hash. + pub code_hash_provider: CHP, + /// The underlying keystore, which should contain Aura consensus keys. + pub keystore: KeystorePtr, + /// The collator key used to sign collations before submitting to validators. + pub collator_key: CollatorPair, + /// The collator network peer id. + pub collator_peer_id: PeerId, + /// The para's ID. + pub para_id: ParaId, + /// The underlying block proposer this should call into. + pub proposer: Proposer, + /// The generic collator service used to plug into this consensus engine. + pub collator_service: CS, + /// The amount of time to spend authoring each block. + pub authoring_duration: Duration, + /// Whether we should reinitialize the collator config (i.e. we are transitioning to aura). + pub reinitialize: bool, + /// Offset slots by a fixed duration. This can be used to create more preferrable authoring + /// timings. + pub slot_offset: Duration, + /// The handle returned by [`SlotBasedBlockImport`]. + pub block_import_handle: SlotBasedBlockImportHandle, + /// Spawner for spawning futures. + pub spawner: Spawner, + /// Slot duration of the relay chain + pub relay_chain_slot_duration: Duration, + /// The length of slots in this parachain. + /// If the parachain doesn't have slot and rely only on relay slots, set it to None. + pub para_slot_duration: Option, + /// When set, the collator will export every produced `POV` to this folder. + pub export_pov: Option, + /// The maximum percentage of the maximum PoV size that the collator can use. + /// It will be removed once is fixed. 
+ pub max_pov_percentage: Option, + /// Force production of the block even if the collator is not eligible + pub force_authoring: bool, + pub additional_relay_state_keys: Vec>, +} + +/// Run aura-based block building and collation task. +pub fn run( + params: Params, +) where + Block: BlockT, + Client: ProvideRuntimeApi + + BlockOf + + AuxStore + + HeaderBackend + + BlockBackend + + UsageProvider + + Send + + Sync + + 'static, + Client::Api: NimbusApi + UnincludedSegmentApi + RelayParentOffsetApi, + Backend: sc_client_api::Backend + 'static, + RClient: RelayChainInterface + Clone + 'static, + CIDP: CreateInherentDataProviders + 'static, + CIDP::InherentDataProviders: Send, + BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static, + Proposer: ProposerInterface + Send + Sync + 'static, + CS: CollatorServiceInterface + Send + Sync + Clone + 'static, + CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, + P: Pair + 'static, + P::Public: AppPublic + Member + Codec, + P::Signature: TryFrom> + Member + Codec, + Spawner: SpawnEssentialNamed, + DP: nimbus_primitives::DigestsProvider::Hash> + + Send + + Sync + + 'static, +{ + let Params { + additional_digests_provider, + create_inherent_data_providers, + block_import, + para_client, + para_backend, + relay_client, + code_hash_provider, + keystore, + collator_key, + collator_peer_id, + para_id, + proposer, + collator_service, + authoring_duration, + reinitialize, + slot_offset, + block_import_handle, + spawner, + export_pov, + relay_chain_slot_duration, + para_slot_duration, + max_pov_percentage, + force_authoring, + additional_relay_state_keys, + } = params; + + let (tx, rx) = tracing_unbounded("mpsc_builder_to_collator", 100); + let collator_task_params = collation_task::Params { + relay_client: relay_client.clone(), + collator_key, + para_id, + reinitialize, + collator_service: collator_service.clone(), + collator_receiver: rx, + block_import_handle, + export_pov, + }; + + let 
collation_task_fut = run_collation_task::(collator_task_params); + + let block_builder_params = block_builder_task::BuilderTaskParams { + additional_digests_provider, + create_inherent_data_providers, + block_import, + para_client, + para_backend, + relay_client, + code_hash_provider, + keystore, + para_id, + proposer, + collator_service, + authoring_duration, + collator_sender: tx, + relay_chain_slot_duration, + para_slot_duration, + slot_offset, + max_pov_percentage, + force_authoring, + additional_relay_state_keys, + collator_peer_id, + }; + + let block_builder_fut = + run_block_builder::(block_builder_params); + + spawner.spawn_essential_blocking( + "slot-based-block-builder", + Some("slot-based-collator"), + block_builder_fut.boxed(), + ); + spawner.spawn_essential_blocking( + "slot-based-collation", + Some("slot-based-collator"), + collation_task_fut.boxed(), + ); +} + +/// Message to be sent from the block builder to the collation task. +/// +/// Contains all data necessary to submit a collation to the relay chain. +struct CollatorMessage { + /// The hash of the relay chain block that provides the context for the parachain block. + pub relay_parent: RelayHash, + /// The header of the parent block. + pub parent_header: Block::Header, + /// The parachain block candidate. + pub parachain_candidate: ParachainCandidate, + /// The validation code hash at the parent block. + pub validation_code_hash: ValidationCodeHash, + /// Core index that this block should be submitted on + pub core_index: CoreIndex, + /// Maximum pov size. Currently needed only for exporting PoV. 
+ pub max_pov_size: u32, +} diff --git a/client/consensus/nimbus-consensus/src/collators/slot_based/relay_chain_data_cache.rs b/client/consensus/nimbus-consensus/src/collators/slot_based/relay_chain_data_cache.rs new file mode 100644 index 0000000..5310881 --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collators/slot_based/relay_chain_data_cache.rs @@ -0,0 +1,114 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . + +use crate::collators::claim_queue_at; +use cumulus_primitives_core::CoreSelector; +use cumulus_relay_chain_interface::RelayChainInterface; +use polkadot_node_subsystem_util::runtime::ClaimQueueSnapshot; +use polkadot_primitives::{ + Hash as RelayHash, Header as RelayHeader, Id as ParaId, OccupiedCoreAssumption, +}; +use sp_runtime::generic::BlockId; + +/// Contains relay chain data necessary for parachain block building. +#[derive(Clone, Debug)] +pub struct RelayChainData { + /// Current relay chain parent header. + pub relay_parent_header: RelayHeader, + /// The claim queue at the relay parent. + pub claim_queue: ClaimQueueSnapshot, + /// Maximum configured PoV size on the relay chain. + pub max_pov_size: u32, + /// The last [`CoreSelector`] we used. + pub last_claimed_core_selector: Option, +} + +/// Simple helper to fetch relay chain data and cache it based on the current relay chain best block +/// hash. 
+pub struct RelayChainDataCache { + relay_client: RI, + para_id: ParaId, + cached_data: schnellru::LruMap, +} + +impl RelayChainDataCache +where + RI: RelayChainInterface + 'static, +{ + pub fn new(relay_client: RI, para_id: ParaId) -> Self { + Self { + relay_client, + para_id, + // 50 cached relay chain blocks should be more than enough. + cached_data: schnellru::LruMap::new(schnellru::ByLength::new(50)), + } + } + + /// Fetch required [`RelayChainData`] from the relay chain. + /// If this data has been fetched in the past for the incoming hash, it will reuse + /// cached data. + pub async fn get_mut_relay_chain_data( + &mut self, + relay_parent: RelayHash, + ) -> Result<&mut RelayChainData, ()> { + let insert_data = if self.cached_data.peek(&relay_parent).is_some() { + tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Using cached data for relay parent."); + None + } else { + tracing::trace!(target: crate::LOG_TARGET, %relay_parent, "Relay chain best block changed, fetching new data from relay chain."); + Some(self.update_for_relay_parent(relay_parent).await?) + }; + + Ok(self + .cached_data + .get_or_insert(relay_parent, || { + insert_data.expect("`insert_data` exists if not cached yet; qed") + }) + .expect("There is space for at least one element; qed")) + } + + /// Fetch fresh data from the relay chain for the given relay parent hash. 
+ async fn update_for_relay_parent(&self, relay_parent: RelayHash) -> Result { + let claim_queue = claim_queue_at(relay_parent, &self.relay_client).await; + + let Ok(Some(relay_parent_header)) = + self.relay_client.header(BlockId::Hash(relay_parent)).await + else { + tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block header."); + return Err(()); + }; + + let max_pov_size = match self + .relay_client + .persisted_validation_data(relay_parent, self.para_id, OccupiedCoreAssumption::Included) + .await + { + Ok(None) => return Err(()), + Ok(Some(pvd)) => pvd.max_pov_size, + Err(err) => { + tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client"); + return Err(()); + } + }; + + Ok(RelayChainData { + relay_parent_header, + claim_queue, + max_pov_size, + last_claimed_core_selector: None, + }) + } +} diff --git a/client/consensus/nimbus-consensus/src/collators/slot_based/slot_timer.rs b/client/consensus/nimbus-consensus/src/collators/slot_based/slot_timer.rs new file mode 100644 index 0000000..530e388 --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collators/slot_based/slot_timer.rs @@ -0,0 +1,625 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Moonkit is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Moonkit. If not, see . 
+ +use crate::LOG_TARGET; +use async_backing_primitives::Slot; +use parity_scale_codec::Codec; +use sp_application_crypto::AppPublic; +use sp_consensus_slots::SlotDuration; +use sp_core::Pair; +use sp_runtime::{traits::Member, SaturatedConversion}; +use sp_timestamp::Timestamp; +use std::{ + cmp::{max, min}, + time::Duration, +}; + +/// Lower limits of allowed block production interval. +/// Defensive mechanism, corresponds to 12 cores at 6 second block time. +const BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS: Duration = Duration::from_millis(500); + +/// Theoretically, the block production is capped at `BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS`. +/// In practice, there might be slight deviations due to timing inaccuracies and delays. +/// +/// This constant is taken into account while adjusting the authoring duration to fit into the slot. +/// Therefore, it will only reduce the authoring duration if we are within the +/// `BLOCK_PRODUCTION_ADJUSTMENT_MS` threshold of the next slot. +/// +/// ### 12 cores 500ms blocks +/// +/// For example, for 12 cores 500ms blocks: the next slot is scheduled in 490ms due to delays. +/// In that case, we still want to attempt producing the block, as missing the slot would be worse +/// than producing slightly too fast. +const BLOCK_PRODUCTION_THRESHOLD_MS: Duration = Duration::from_millis(100); + +/// The amount of time the authoring duration of the last block production attempt +/// should be reduced by to fit into the slot timing. +const BLOCK_PRODUCTION_ADJUSTMENT_MS: Duration = Duration::from_millis(1000); + +#[derive(Debug)] +pub(crate) struct SlotInfo { + pub timestamp: Timestamp, + pub slot: Slot, +} + +/// Manages block-production timings based on chain parameters and assigned cores. +#[derive(Debug)] +pub(crate) struct SlotTimer { + /// Offset the current time by this duration. + time_offset: Duration, + /// Last reported core count. + last_reported_core_num: Option, + /// Slot duration of the relay chain. 
This is used to compute how man block-production + /// attempts we should trigger per relay chain block. + relay_slot_duration: Duration, + /// The length of slots in this parachain. + /// If the parachain doesn't have slot and rely only on relay slots, set it to None. + para_slot_duration: Option, + /// Stores the latest slot that was reported by [`Self::wait_until_next_slot`]. + last_reported_slot: Option, + _marker: std::marker::PhantomData<(Block, Box)>, +} + +/// Compute when to try block-authoring next. +/// The exact time point is determined by the slot duration of relay- and parachain as +/// well as the last observed core count. If more cores are available, we attempt to author blocks +/// for them. +/// +/// Returns a tuple with: +/// - `Duration`: How long to wait until the next slot. +/// - `Slot`: The Nimbus slot used for authoring +fn compute_next_wake_up_time( + para_slot_duration: SlotDuration, + relay_slot_duration: Duration, + core_count: Option, + time_now: Duration, + time_offset: Duration, +) -> (Duration, Slot) { + let para_slots_per_relay_block = + (relay_slot_duration.as_millis() / para_slot_duration.as_millis() as u128) as u32; + let assigned_core_num = core_count.unwrap_or(1); + + // Trigger at least once per relay block, if we have for example 12 second slot duration, + // we should still produce two blocks if we are scheduled on every relay block. + let mut block_production_interval = min(para_slot_duration.as_duration(), relay_slot_duration); + + if assigned_core_num > para_slots_per_relay_block + && para_slot_duration.as_duration() >= relay_slot_duration + { + block_production_interval = max( + relay_slot_duration / assigned_core_num, + BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS, + ); + tracing::debug!( + target: LOG_TARGET, + ?block_production_interval, + "Expected to produce for {assigned_core_num} cores but only have {para_slots_per_relay_block} slots. Attempting to produce multiple blocks per slot." 
+ ); + } + + let (duration, timestamp) = + time_until_next_attempt(time_now, block_production_interval, time_offset); + let para_slot = Slot::from_timestamp( + timestamp, + SlotDuration::from_millis(para_slot_duration.as_millis().saturated_into()), + ); + (duration, para_slot) +} + +/// Compute the time until the next slot changes. +/// +/// Returns None if the next slot cannot be computed. +fn compute_time_until_next_slot_change( + para_slot_duration: SlotDuration, + time_now: Duration, + time_offset: Duration, + last_reported_slot: Slot, +) -> Option<(Duration, Slot)> { + let now = time_now.saturating_sub(time_offset); + let next_slot = last_reported_slot + Slot::from(1); + + let Some(next_slot_timestamp) = next_slot.timestamp(para_slot_duration) else { + return None; + }; + let remaining_time = next_slot_timestamp.as_duration().saturating_sub(now); + + Some((remaining_time, next_slot)) +} + +/// Returns current duration since Unix epoch. +fn duration_now() -> Duration { + use std::time::SystemTime; + let now = SystemTime::now(); + now.duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_else(|e| { + panic!( + "Current time {:?} is before Unix epoch. Something is wrong: {:?}", + now, e + ) + }) +} + +/// Adjust the authoring duration. +fn adjust_authoring_duration( + mut authoring_duration: Duration, + next_block: (Duration, Slot), + next_slot_change: (Duration, Slot), + different_authors: bool, +) -> Option { + let (duration, next_block_slot) = next_block; + let (duration_until_next_slot, next_slot) = next_slot_change; + + // The authoring of blocks must stop 1 second before the slot ends. 
+ let duration_until_deadline = + duration_until_next_slot.saturating_sub(BLOCK_PRODUCTION_ADJUSTMENT_MS); + tracing::debug!( + target: LOG_TARGET, + ?authoring_duration, + ?duration, + ?next_block_slot, + ?duration_until_next_slot, + ?next_slot, + ?duration_until_deadline, + ?different_authors, + "Adjusting authoring duration for slot.", + ); + + // Ensure no blocks are produced in the last second of the slot, + // regardless of authoring duration. + if duration_until_deadline == Duration::ZERO { + if different_authors { + tracing::warn!( + target: LOG_TARGET, + ?duration_until_next_slot, + ?next_slot, + "Not enough time left in the slot to adjust authoring duration. Skipping block production for the slot." + ); + + return None; + } + + // If authors are the same, we can still attempt producing the block + // considering the next block duration. + return Some(authoring_duration.min(duration)); + } + + // Clamp the authoring duration to fit into the slot deadline only if authors are different. + // For most cases, the deadline is farther in the future than the authoring duration. + if different_authors && authoring_duration >= duration_until_deadline { + authoring_duration = duration_until_deadline; + + // Ensure we are not going below the minimum interval within a reasonable threshold. + // For 12 cores, we might have a scenario where the last 3 blocks are skipped: + // - Block 10: next slot change in 1.493s: + // - After adjusting the deadline: 1.493s - 1s = 0.493s the block could be produced + // without issues. + // - Block 11: next slot change in 0.993s - skipped by the deadline + // - Block 12: next slot change in 0.493s - skipped by the deadline + if authoring_duration + < BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS.saturating_sub(BLOCK_PRODUCTION_THRESHOLD_MS) + { + tracing::debug!( + target: LOG_TARGET, + ?authoring_duration, + ?next_slot, + "Authoring duration is below minimum. Skipping block production for the slot." 
+			);
+			return None;
+		}
+	}
+
+	// The `duration` intends to slightly adjust when the block production
+	// attempt happens. This goes slightly below the `BLOCK_PRODUCTION_MINIMUM_INTERVAL_MS`
+	// threshold.
+	Some(authoring_duration.min(duration))
+}
+
+/// Returns the duration until the next block production should be attempted.
+/// Returns:
+/// - Duration: The duration until the next attempt.
+/// - Timestamp: The timestamp of the next attempt.
+fn time_until_next_attempt(
+	now: Duration,
+	block_production_interval: Duration,
+	offset: Duration,
+) -> (Duration, Timestamp) {
+	let now = now.as_millis().saturating_sub(offset.as_millis());
+
+	let next_slot_time = ((now + block_production_interval.as_millis())
+		/ block_production_interval.as_millis())
+		* block_production_interval.as_millis();
+	let remaining_millis = next_slot_time - now;
+	(
+		Duration::from_millis(remaining_millis as u64),
+		Timestamp::from(next_slot_time as u64),
+	)
+}
+
+impl<P> SlotTimer<P>
+where
+	P: Pair,
+	P::Public: AppPublic + Member + Codec,
+	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
+{
+	/// Create a new slot timer.
+	pub fn new_with_offset(
+		time_offset: Duration,
+		relay_slot_duration: Duration,
+		para_slot_duration: Option<Duration>,
+	) -> Self {
+		Self {
+			time_offset,
+			last_reported_core_num: None,
+			relay_slot_duration,
+			para_slot_duration,
+			last_reported_slot: Default::default(),
+			_marker: Default::default(),
+		}
+	}
+
+	pub fn slot_duration(&self) -> Duration {
+		self.para_slot_duration
+			.unwrap_or(self.relay_slot_duration.into())
+	}
+
+	/// Inform the slot timer about the last seen number of cores.
+	pub fn update_scheduling(&mut self, num_cores_next_block: u32) {
+		self.last_reported_core_num = Some(num_cores_next_block);
+	}
+
+	/// Returns the slot and how much time left until the next block production attempt.
+	pub fn time_until_next_block(&mut self, slot_duration: SlotDuration) -> (Duration, Slot) {
+		compute_next_wake_up_time(
+			slot_duration,
+			self.relay_slot_duration,
+			self.last_reported_core_num,
+			duration_now(),
+			self.time_offset,
+		)
+	}
+
+	/// Compute the time until the next slot changes.
+	fn time_until_next_slot_change(
+		&mut self,
+		slot_duration: SlotDuration,
+	) -> Option<(Duration, Slot)> {
+		compute_time_until_next_slot_change(
+			slot_duration,
+			duration_now(),
+			self.time_offset,
+			self.last_reported_slot.unwrap_or_default(),
+		)
+	}
+
+	/// Adjust the authoring duration to fit into the slot timing.
+	///
+	/// Returns the adjusted authoring duration, or `None` if block production
+	/// should be skipped for the slot.
+	pub fn adjust_authoring_duration(&mut self, authoring_duration: Duration) -> Option<Duration> {
+		let slot_duration = SlotDuration::from_millis(self.slot_duration().as_millis() as u64);
+
+		let next_block = self.time_until_next_block(slot_duration);
+		let Some(next_slot_change) = self.time_until_next_slot_change(slot_duration) else {
+			tracing::error!(
+				target: LOG_TARGET,
+				"Failed to compute time until next slot change. Using unadjusted authoring duration."
+			);
+			return Some(authoring_duration);
+		};
+
+		// Check if authors at current and next slots are different
+		let different_authors = true;
+
+		adjust_authoring_duration(
+			authoring_duration,
+			next_block,
+			next_slot_change,
+			different_authors,
+		)
+	}
+
+	/// Returns a future that resolves when the next block production should be attempted.
+	pub async fn wait_until_next_slot(&mut self) -> Result<(), ()> {
+		let slot_duration = self.slot_duration();
+
+		let (time_until_next_attempt, mut next_para_slot) =
+			self.time_until_next_block(SlotDuration::from_millis(slot_duration.as_millis() as u64));
+
+		tracing::trace!(
+			target: LOG_TARGET,
+			?time_until_next_attempt,
+			para_slot = ?next_para_slot,
+			last_reported = ?self.last_reported_slot,
+			"Determined next block production opportunity."
+ ); + + match self.last_reported_slot { + // If we already reported a slot, we don't want to skip a slot. But we also don't want + // to go through all the slots if a node was halted for some reason. + Some(ls) if ls + 1 < next_para_slot && next_para_slot <= ls + 3 => { + next_para_slot = ls + 1u64; + } + None | Some(_) => { + tracing::trace!(target: LOG_TARGET, ?time_until_next_attempt, "Sleeping until the next slot."); + tokio::time::sleep(time_until_next_attempt).await; + } + } + + tracing::debug!( + target: LOG_TARGET, + ?slot_duration, + para_slot = ?next_para_slot, + "New block production opportunity." + ); + + self.last_reported_slot = Some(next_para_slot); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rstest::rstest; + use sp_consensus_slots::SlotDuration; + const RELAY_CHAIN_SLOT_DURATION: u64 = 6000; + + #[rstest] + // Test that different now timestamps have correct impact + // |||| + #[case(6000, Some(1), 1000, 0, 5000)] + #[case(6000, Some(1), 0, 0, 6000)] + #[case(6000, Some(1), 6000, 0, 6000)] + #[case(6000, Some(0), 6000, 0, 6000)] + // Test that `None` core defaults to 1 + // |||| + #[case(6000, None, 1000, 0, 5000)] + #[case(6000, None, 0, 0, 6000)] + #[case(6000, None, 6000, 0, 6000)] + // Test that offset affects the current time correctly + // |||| + #[case(6000, Some(1), 1000, 1000, 6000)] + #[case(6000, Some(1), 12000, 2000, 2000)] + #[case(6000, Some(1), 12000, 6000, 6000)] + #[case(6000, Some(1), 12000, 7000, 1000)] + // Test that number of cores affects the block production interval + // ||||||| + #[case(6000, Some(3), 12000, 0, 2000)] + #[case(6000, Some(2), 12000, 0, 3000)] + #[case(6000, Some(3), 11999, 0, 1)] + // High core count + // |||||||| + #[case(6000, Some(12), 0, 0, 500)] + /// Test that the minimum block interval is respected + /// at high core counts. 
+	/// |||||||||
+	#[case(6000, Some(100), 0, 0, 500)]
+	// Test that slot_duration works correctly
+	// ||||
+	#[case(2000, Some(1), 1000, 0, 1000)]
+	#[case(2000, Some(1), 3000, 0, 1000)]
+	#[case(2000, Some(1), 10000, 0, 2000)]
+	#[case(2000, Some(2), 1000, 0, 1000)]
+	// Cores are ignored if relay_slot_duration != para_slot_duration
+	// |||||||
+	#[case(2000, Some(3), 3000, 0, 1000)]
+	// For long slot durations, we should still check
+	// every relay chain block for the slot.
+	// |||||
+	#[case(12000, None, 0, 0, 6000)]
+	#[case(12000, None, 6100, 0, 5900)]
+	#[case(12000, None, 6000, 2000, 2000)]
+	#[case(12000, Some(2), 6000, 0, 3000)]
+	#[case(12000, Some(3), 6000, 0, 2000)]
+	#[case(12000, Some(3), 8100, 0, 1900)]
+	fn test_get_next_slot(
+		#[case] para_slot_millis: u64,
+		#[case] core_count: Option<u32>,
+		#[case] time_now: u64,
+		#[case] offset_millis: u64,
+		#[case] expected_wait_duration: u128,
+	) {
+		let para_slot_duration = SlotDuration::from_millis(para_slot_millis); // 6 second slots
+		let relay_slot_duration = Duration::from_millis(RELAY_CHAIN_SLOT_DURATION);
+		let time_now = Duration::from_millis(time_now); // 1 second passed
+		let offset = Duration::from_millis(offset_millis);
+
+		let (wait_duration, _) = compute_next_wake_up_time(
+			para_slot_duration,
+			relay_slot_duration,
+			core_count,
+			time_now,
+			offset,
+		);
+
+		assert_eq!(
+			wait_duration.as_millis(),
+			expected_wait_duration,
+			"Wait time mismatch."
+ ); // Should wait 5 seconds + } + + #[rstest] + // Basic slot change scenarios + #[case(6000, 0, 0, Slot::from(0), 6000, Slot::from(1))] + #[case(6000, 1000, 0, Slot::from(0), 5000, Slot::from(1))] + #[case(6000, 6000, 0, Slot::from(1), 6000, Slot::from(2))] + #[case(6000, 12000, 0, Slot::from(2), 6000, Slot::from(3))] + // Test with offset + #[case(6000, 1000, 1000, Slot::from(0), 6000, Slot::from(1))] + #[case(6000, 2000, 1000, Slot::from(0), 5000, Slot::from(1))] + #[case(6000, 6000, 3000, Slot::from(0), 3000, Slot::from(1))] + // Different slot durations + #[case(3000, 1000, 0, Slot::from(0), 2000, Slot::from(1))] + #[case(3000, 3000, 0, Slot::from(1), 3000, Slot::from(2))] + #[case(12000, 6000, 0, Slot::from(0), 6000, Slot::from(1))] + #[case(12000, 12000, 0, Slot::from(1), 12000, Slot::from(2))] + // Edge cases - at slot boundary + #[case(6000, 5999, 0, Slot::from(0), 1, Slot::from(1))] + #[case(6000, 11999, 0, Slot::from(1), 1, Slot::from(2))] + fn test_compute_time_until_next_slot_change( + #[case] para_slot_millis: u64, + #[case] time_now: u64, + #[case] offset_millis: u64, + #[case] last_reported_slot: Slot, + #[case] expected_duration: u128, + #[case] expected_next_slot: Slot, + ) { + let para_slot_duration = SlotDuration::from_millis(para_slot_millis); + let time_now = Duration::from_millis(time_now); + let offset = Duration::from_millis(offset_millis); + + let result = compute_time_until_next_slot_change( + para_slot_duration, + time_now, + offset, + last_reported_slot, + ); + + assert!(result.is_some(), "Expected result to be Some"); + let (duration, next_slot) = result.unwrap(); + assert_eq!(duration.as_millis(), expected_duration, "Duration mismatch"); + assert_eq!(next_slot, expected_next_slot, "Next slot mismatch"); + } + + #[rstest] + // Various scenarios for 2s block production adjustment. 
+ #[case::blocks_2s_fits_next_block( + Duration::from_millis(2000), // Authoring duration + (Duration::from_millis(2000), Slot::from(1)), // Next block + (Duration::from_millis(4000), Slot::from(2)), // Next slot change + true, // Different authors + Some(Duration::from_millis(2000)), // Expected + )] + #[case::blocks_2s_closer_next_slot( + Duration::from_millis(2000), // Authoring duration + (Duration::from_millis(1950), Slot::from(1)), // Next block + (Duration::from_millis(4000), Slot::from(2)), // Next slot change + true, // Different authors + Some(Duration::from_millis(1950)), // Expected + )] + #[case::blocks_2s_closer_next_slot_bigger( + Duration::from_millis(2000), // Authoring duration + (Duration::from_millis(1500), Slot::from(1)), // Next block + (Duration::from_millis(4000), Slot::from(2)), // Next slot change + true, // Different authors + Some(Duration::from_millis(1500)), // Expected + )] + #[case::blocks_2s_reduce_by_1s( + Duration::from_millis(2000), // Authoring duration + (Duration::from_millis(2000), Slot::from(1)), // Next block + (Duration::from_millis(2000), Slot::from(2)), // Next slot change + true, // Different authors + Some(Duration::from_millis(1000)), // Expected + )] + #[case::blocks_2s_reduce_by_1s_plus_offset( + Duration::from_millis(2000), // Authoring duration + (Duration::from_millis(1950), Slot::from(1)), // Next block + (Duration::from_millis(1950), Slot::from(2)), // Next slot change + true, // Different authors + Some(Duration::from_millis(950)), // Expected + )] + #[case::blocks_2s_reduce_to_minimum( + Duration::from_millis(2000), // Authoring duration + (Duration::from_millis(1400), Slot::from(1)), // Next block + (Duration::from_millis(1400), Slot::from(2)), // Next slot change + true, // Different authors + Some(Duration::from_millis(400)), // Expected + )] + #[case::blocks_2s_reduce_below_minimum( + Duration::from_millis(2000), // Authoring duration + (Duration::from_millis(1300), Slot::from(1)), // Next block + 
(Duration::from_millis(1300), Slot::from(2)), // Next slot change + true, // Different authors + None, // Expected to reduce below minimum + )] + #[case::blocks_2s_same_author( + Duration::from_millis(2000), // Authoring duration + (Duration::from_millis(1400), Slot::from(1)), // Next block + (Duration::from_millis(1400), Slot::from(2)), // Next slot change + false, // Different authors + Some(Duration::from_millis(1400)), // Expected no adjustment for last second. + )] + // Various scenarios for 500ms block production adjustment. + #[case::blocks_500ms_fits_next_block( + Duration::from_millis(500), // Authoring duration + (Duration::from_millis(500), Slot::from(1)), // Next block + (Duration::from_millis(2000), Slot::from(2)), // Next slot change + true, // Different authors + Some(Duration::from_millis(500)), // Expected + )] + #[case::blocks_500ms_closer_next_slot( + Duration::from_millis(500), // Authoring duration + (Duration::from_millis(450), Slot::from(1)), // Next block + (Duration::from_millis(2000), Slot::from(2)), // Next slot change + true, // Different authors + Some(Duration::from_millis(450)), // Expected + )] + #[case::blocks_500ms_closer_next_slot_bigger( + Duration::from_millis(500), // Authoring duration + (Duration::from_millis(400), Slot::from(1)), // Next block + (Duration::from_millis(1500), Slot::from(2)), // Next slot change + true, // Different authors + Some(Duration::from_millis(400)), // Expected + )] + #[case::blocks_500ms_reduce_by_1s( + Duration::from_millis(500), // Authoring duration + (Duration::from_millis(500), Slot::from(1)), // Next block + (Duration::from_millis(1000), Slot::from(2)), // Next slot change + true, // Different authors + None, // Expected + )] + #[case::blocks_500ms_reduce_by_1s_closer( + Duration::from_millis(500), // Authoring duration + (Duration::from_millis(500), Slot::from(1)), // Next block + (Duration::from_millis(500), Slot::from(2)), // Next slot change + true, // Different authors + None, // Expected 
+ )] + // If we are producing with 1 collator for 500ms authoring duration, + // we must produce the last two slots and ignore the 1s adjustment. + #[case::blocks_500ms_same_author( + Duration::from_millis(500), // Authoring duration + (Duration::from_millis(410), Slot::from(1)), // Next block + (Duration::from_millis(1000), Slot::from(2)), // Next slot change + false, // Different authors + Some(Duration::from_millis(410)), // Expected no adjustment for last second. + )] + #[case::blocks_500ms_same_author_closer( + Duration::from_millis(500), // Authoring duration + (Duration::from_millis(400), Slot::from(1)), // Next block + (Duration::from_millis(400), Slot::from(2)), // Next slot change + false, // Different authors + Some(Duration::from_millis(400)), // Expected no adjustment for last second. + )] + fn test_adjust_authoring_duration( + #[case] authoring_duration: Duration, + #[case] next_block: (Duration, Slot), + #[case] next_slot_change: (Duration, Slot), + #[case] different_authors: bool, + #[case] expected: Option, + ) { + sp_tracing::init_for_tests(); + + let result = adjust_authoring_duration( + authoring_duration, + next_block, + next_slot_change, + different_authors, + ); + tracing::debug!("Adjusted authoring duration: {:?}", result); + assert_eq!(result, expected); + } +} diff --git a/client/consensus/nimbus-consensus/src/collators/slot_based/tests.rs b/client/consensus/nimbus-consensus/src/collators/slot_based/tests.rs new file mode 100644 index 0000000..21afd96 --- /dev/null +++ b/client/consensus/nimbus-consensus/src/collators/slot_based/tests.rs @@ -0,0 +1,652 @@ +// Copyright Moonsong Labs +// This file is part of Moonkit. + +// Moonkit is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+
+// Moonkit is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Moonkit. If not, see <http://www.gnu.org/licenses/>.
+
+use super::{
+	block_builder_task::{determine_core, offset_relay_parent_find_descendants},
+	relay_chain_data_cache::{RelayChainData, RelayChainDataCache},
+};
+use async_trait::async_trait;
+use cumulus_primitives_core::{ClaimQueueOffset, CoreInfo, CoreSelector, CumulusDigestItem};
+use cumulus_relay_chain_interface::*;
+use futures::Stream;
+use polkadot_node_subsystem_util::runtime::ClaimQueueSnapshot;
+use polkadot_primitives::{
+	CandidateEvent, CommittedCandidateReceiptV2, CoreIndex, Hash as RelayHash,
+	Header as RelayHeader, Id as ParaId,
+};
+use sp_runtime::{generic::BlockId, testing::Header as TestHeader, traits::Header};
+use sp_version::RuntimeVersion;
+use std::{
+	collections::{BTreeMap, HashMap, VecDeque},
+	pin::Pin,
+};
+
+#[tokio::test]
+async fn offset_test_zero_offset() {
+	let (headers, best_hash) = create_header_chain();
+
+	let client = TestRelayClient::new(headers);
+
+	let mut cache = RelayChainDataCache::new(client, 1.into());
+
+	let result = offset_relay_parent_find_descendants(&mut cache, best_hash, 0).await;
+	assert!(result.is_ok());
+	let data = result.unwrap();
+	assert_eq!(data.descendants_len(), 0);
+	assert_eq!(data.relay_parent().hash(), best_hash);
+	assert!(data.into_inherent_descendant_list().is_empty());
+}
+
+#[tokio::test]
+async fn offset_test_two_offset() {
+	let (headers, best_hash) = create_header_chain();
+
+	let client = TestRelayClient::new(headers);
+
+	let mut cache = RelayChainDataCache::new(client, 1.into());
+
+	let result = offset_relay_parent_find_descendants(&mut cache, best_hash, 2).await;
+	assert!(result.is_ok());
+	let data = result.unwrap();
+
assert_eq!(data.descendants_len(), 2); + assert_eq!(*data.relay_parent().number(), 98); + let descendant_list = data.into_inherent_descendant_list(); + assert_eq!(descendant_list.len(), 3); + assert_eq!(*descendant_list.first().unwrap().number(), 98); + assert_eq!(*descendant_list.last().unwrap().number(), 100); +} + +#[tokio::test] +async fn offset_test_five_offset() { + let (headers, best_hash) = create_header_chain(); + + let client = TestRelayClient::new(headers); + + let mut cache = RelayChainDataCache::new(client, 1.into()); + + let result = offset_relay_parent_find_descendants(&mut cache, best_hash, 5).await; + assert!(result.is_ok()); + let data = result.unwrap(); + assert_eq!(data.descendants_len(), 5); + assert_eq!(*data.relay_parent().number(), 95); + let descendant_list = data.into_inherent_descendant_list(); + assert_eq!(descendant_list.len(), 6); + assert_eq!(*descendant_list.first().unwrap().number(), 95); + assert_eq!(*descendant_list.last().unwrap().number(), 100); +} + +#[tokio::test] +async fn offset_test_too_long() { + let (headers, _best_hash) = create_header_chain(); + + let client = TestRelayClient::new(headers); + + let mut cache = RelayChainDataCache::new(client, 1.into()); + + let result = offset_relay_parent_find_descendants(&mut cache, _best_hash, 200).await; + assert!(result.is_err()); + + let result = offset_relay_parent_find_descendants(&mut cache, _best_hash, 101).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn determine_core_new_relay_parent() { + let (headers, _best_hash) = create_header_chain(); + let client = TestRelayClient::new(headers); + let mut cache = RelayChainDataCache::new(client, 1.into()); + + // Create a test relay parent header + let relay_parent = RelayHeader { + parent_hash: Default::default(), + number: 100, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + + // Create a test para parent header at block 0 (genesis) + let para_parent = 
TestHeader::new_from_number(0); + + // Setup claim queue data for the cache + cache.set_test_data(relay_parent.clone(), vec![CoreIndex(0), CoreIndex(1)]); + + let result = determine_core(&mut cache, &relay_parent, 1.into(), ¶_parent, 0).await; + + let core = result.unwrap(); + let core = core.unwrap(); + assert_eq!(core.core_selector(), CoreSelector(0)); + assert_eq!(core.core_index(), CoreIndex(0)); + assert_eq!(core.total_cores(), 2); +} + +#[tokio::test] +async fn determine_core_with_core_info() { + let (headers, best_hash) = create_header_chain(); + let client = TestRelayClient::new(headers); + let mut cache = RelayChainDataCache::new(client, 1.into()); + + // Create a test relay parent header + let relay_parent = RelayHeader { + parent_hash: best_hash, + number: 101, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + + // Create a para parent header with core info in digest + let core_info = CoreInfo { + selector: CoreSelector(0), + claim_queue_offset: ClaimQueueOffset(0), + number_of_cores: 3.into(), + }; + let mut digest = sp_runtime::generic::Digest::default(); + digest.push(CumulusDigestItem::CoreInfo(core_info).to_digest_item()); + // Add relay parent storage root to make it a non-new relay parent + digest.push(cumulus_primitives_core::rpsr_digest::relay_parent_storage_root_item( + *relay_parent.state_root(), + *relay_parent.number(), + )); + + let para_parent = TestHeader { + parent_hash: best_hash.into(), + number: 1, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest, + }; + + // Setup claim queue data for the cache + cache.set_test_data(relay_parent.clone(), vec![CoreIndex(0), CoreIndex(1), CoreIndex(2)]); + + let result = determine_core(&mut cache, &relay_parent, 1.into(), ¶_parent, 0).await; + + match result { + Ok(Some(core)) => { + assert_eq!(core.core_selector(), CoreSelector(1)); // Should be next selector (0 + 1) + assert_eq!(core.core_index(), 
CoreIndex(1)); + assert_eq!(core.total_cores(), 3); + }, + Ok(None) => panic!("Expected Some core, got None"), + Err(()) => panic!("determine_core returned error"), + } +} + +#[tokio::test] +async fn determine_core_no_cores_available() { + let (headers, _best_hash) = create_header_chain(); + let client = TestRelayClient::new(headers); + let mut cache = RelayChainDataCache::new(client, 1.into()); + + // Create a test relay parent header + let relay_parent = RelayHeader { + parent_hash: Default::default(), + number: 100, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + + // Create a test para parent header at block 0 (genesis) + let para_parent = TestHeader::new_from_number(0); + + // Setup empty claim queue + cache.set_test_data(relay_parent.clone(), vec![]); + + let result = determine_core(&mut cache, &relay_parent, 1.into(), ¶_parent, 0).await; + + let core = result.unwrap(); + assert!(core.is_none()); +} + +#[tokio::test] +async fn determine_core_selector_overflow() { + let (headers, best_hash) = create_header_chain(); + let client = TestRelayClient::new(headers); + let mut cache = RelayChainDataCache::new(client, 1.into()); + + // Create a test relay parent header + let relay_parent = RelayHeader { + parent_hash: best_hash, + number: 101, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + + let core_info = CoreInfo { + selector: CoreSelector(1), + claim_queue_offset: ClaimQueueOffset(0), + number_of_cores: 2.into(), + }; + let mut digest = sp_runtime::generic::Digest::default(); + digest.push(CumulusDigestItem::CoreInfo(core_info).to_digest_item()); + // Add relay parent storage root to make it a non-new relay parent + digest.push(cumulus_primitives_core::rpsr_digest::relay_parent_storage_root_item( + *relay_parent.state_root(), + *relay_parent.number(), + )); + + let para_parent = TestHeader { + parent_hash: best_hash.into(), + number: 1, + 
state_root: Default::default(), + extrinsics_root: Default::default(), + digest, + }; + + // Setup claim queue with only 2 cores + cache.set_test_data(relay_parent.clone(), vec![CoreIndex(0), CoreIndex(1)]); + + let result = determine_core(&mut cache, &relay_parent, 1.into(), ¶_parent, 0).await; + + let core = result.unwrap(); + assert!(core.is_none()); // Should return None when selector overflows +} + +#[tokio::test] +async fn determine_core_uses_last_claimed_core_selector() { + let (headers, best_hash) = create_header_chain(); + let client = TestRelayClient::new(headers); + let mut cache = RelayChainDataCache::new(client, 1.into()); + + // Create a test relay parent header + let relay_parent = RelayHeader { + parent_hash: best_hash, + number: 101, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + + // Create a para parent header without core info in digest (non-genesis) + // Need to add relay parent storage root to digest to make it a non-new relay parent + let mut digest = sp_runtime::generic::Digest::default(); + digest.push(cumulus_primitives_core::rpsr_digest::relay_parent_storage_root_item( + *relay_parent.state_root(), + *relay_parent.number(), + )); + + let para_parent = TestHeader { + parent_hash: best_hash.into(), + number: 1, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest, + }; + + // Setup claim queue data with last_claimed_core_selector set to 1 + cache.set_test_data_with_last_selector( + relay_parent.clone(), + vec![CoreIndex(0), CoreIndex(1), CoreIndex(2)], + Some(CoreSelector(1)), + ); + + let result = determine_core(&mut cache, &relay_parent, 1.into(), ¶_parent, 0).await; + + match result { + Ok(Some(core)) => { + // Should use last_claimed_core_selector (1) + 1 = 2 + assert_eq!(core.core_selector(), CoreSelector(2)); + assert_eq!(core.core_index(), CoreIndex(2)); + assert_eq!(core.total_cores(), 3); + }, + Ok(None) => panic!("Expected Some core, got 
None"), + Err(()) => panic!("determine_core returned error"), + } +} + +#[tokio::test] +async fn determine_core_uses_last_claimed_core_selector_wraps_around() { + let (headers, best_hash) = create_header_chain(); + let client = TestRelayClient::new(headers); + let mut cache = RelayChainDataCache::new(client, 1.into()); + + // Create a test relay parent header + let relay_parent = RelayHeader { + parent_hash: best_hash, + number: 101, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + + // Create a para parent header without core info in digest (non-genesis) + // Need to add relay parent storage root to digest to make it a non-new relay parent + let mut digest = sp_runtime::generic::Digest::default(); + digest.push(cumulus_primitives_core::rpsr_digest::relay_parent_storage_root_item( + *relay_parent.state_root(), + *relay_parent.number(), + )); + + let para_parent = TestHeader { + parent_hash: best_hash.into(), + number: 1, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest, + }; + + // Setup claim queue data with last_claimed_core_selector set to 2 (last index) + // Next selector should wrap around to out of bounds and return None + cache.set_test_data_with_last_selector( + relay_parent.clone(), + vec![CoreIndex(0), CoreIndex(1), CoreIndex(2)], + Some(CoreSelector(2)), + ); + + let result = determine_core(&mut cache, &relay_parent, 1.into(), ¶_parent, 0).await; + + match result { + Ok(Some(_)) => panic!("Expected None due to selector overflow"), + Ok(None) => { + // This is expected - selector 2 + 1 = 3, but only cores 0,1,2 available + }, + Err(()) => panic!("determine_core returned error"), + } +} + +#[tokio::test] +async fn determine_core_no_last_claimed_core_selector() { + let (headers, best_hash) = create_header_chain(); + let client = TestRelayClient::new(headers); + let mut cache = RelayChainDataCache::new(client, 1.into()); + + // Create a test relay parent header + 
let relay_parent = RelayHeader { + parent_hash: best_hash, + number: 101, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + + // Create a para parent header without core info in digest (non-genesis) + // Need to add relay parent storage root to digest to make it a non-new relay parent + let mut digest = sp_runtime::generic::Digest::default(); + digest.push(cumulus_primitives_core::rpsr_digest::relay_parent_storage_root_item( + *relay_parent.state_root(), + *relay_parent.number(), + )); + + let para_parent = TestHeader { + parent_hash: best_hash.into(), + number: 1, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest, + }; + + // Setup claim queue data with no last_claimed_core_selector (None) + cache.set_test_data_with_last_selector( + relay_parent.clone(), + vec![CoreIndex(0), CoreIndex(1), CoreIndex(2)], + None, + ); + + let result = determine_core(&mut cache, &relay_parent, 1.into(), ¶_parent, 0).await; + + match result { + Ok(Some(core)) => { + // Should start from selector 0 + 1 = 1 when no last selector + assert_eq!(core.core_selector(), CoreSelector(1)); + assert_eq!(core.core_index(), CoreIndex(1)); + assert_eq!(core.total_cores(), 3); + }, + Ok(None) => panic!("Expected Some core, got None"), + Err(()) => panic!("determine_core returned error"), + } +} + +#[derive(Clone)] +struct TestRelayClient { + headers: HashMap, +} + +impl TestRelayClient { + fn new(headers: HashMap) -> Self { + Self { headers } + } +} + +#[async_trait] +impl RelayChainInterface for TestRelayClient { + async fn validators(&self, _: RelayHash) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn best_block_hash(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + async fn finalized_block_hash(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn retrieve_dmq_contents( + &self, + _: ParaId, + _: RelayHash, + ) -> 
RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn retrieve_all_inbound_hrmp_channel_contents( + &self, + _: ParaId, + _: RelayHash, + ) -> RelayChainResult>> { + unimplemented!("Not needed for test") + } + + async fn persisted_validation_data( + &self, + _: RelayHash, + _: ParaId, + _: OccupiedCoreAssumption, + ) -> RelayChainResult> { + use cumulus_primitives_core::PersistedValidationData; + Ok(Some(PersistedValidationData { + parent_head: Default::default(), + relay_parent_number: 100, + relay_parent_storage_root: Default::default(), + max_pov_size: 1024 * 1024, + })) + } + + async fn validation_code_hash( + &self, + _: RelayHash, + _: ParaId, + _: OccupiedCoreAssumption, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn candidate_pending_availability( + &self, + _: RelayHash, + _: ParaId, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn candidates_pending_availability( + &self, + _: RelayHash, + _: ParaId, + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn session_index_for_child(&self, _: RelayHash) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn import_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + unimplemented!("Not needed for test") + } + + async fn finality_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + unimplemented!("Not needed for test") + } + + async fn is_major_syncing(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + fn overseer_handle(&self) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn get_storage_by_key( + &self, + _: RelayHash, + _: &[u8], + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn prove_read( + &self, + _: RelayHash, + _: &Vec>, + ) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn wait_for_block(&self, _: RelayHash) -> 
RelayChainResult<()> { + unimplemented!("Not needed for test") + } + + async fn new_best_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + unimplemented!("Not needed for test") + } + + async fn header( + &self, + block_id: BlockId, + ) -> RelayChainResult> { + let hash = match block_id { + BlockId::Hash(hash) => hash, + BlockId::Number(_) => unimplemented!("Not needed for test"), + }; + let header = self.headers.get(&hash); + + Ok(header.cloned()) + } + + async fn availability_cores( + &self, + _relay_parent: RelayHash, + ) -> RelayChainResult>> { + unimplemented!("Not needed for test"); + } + + async fn version(&self, _: RelayHash) -> RelayChainResult { + unimplemented!("Not needed for test"); + } + + async fn claim_queue( + &self, + _: RelayHash, + ) -> RelayChainResult>> { + // Return empty claim queue for offset tests + Ok(BTreeMap::new()) + } + + async fn call_runtime_api( + &self, + _method_name: &'static str, + _hash: RelayHash, + _payload: &[u8], + ) -> RelayChainResult> { + unimplemented!("Not needed for test") + } + + async fn scheduling_lookahead(&self, _: RelayHash) -> RelayChainResult { + unimplemented!("Not needed for test") + } + + async fn candidate_events(&self, _: RelayHash) -> RelayChainResult> { + unimplemented!("Not needed for test") + } +} + +fn create_header_chain() -> (HashMap, RelayHash) { + let mut headers = HashMap::new(); + let mut current_parent = None; + let mut header_hash = RelayHash::repeat_byte(0x1); + + // Create chain from highest to lowest number + for number in 1..=100 { + let mut header = RelayHeader { + parent_hash: Default::default(), + number, + state_root: Default::default(), + extrinsics_root: Default::default(), + digest: Default::default(), + }; + if let Some(hash) = current_parent { + header.parent_hash = hash; + } + + header_hash = header.hash(); + // Store header and update parent for next iteration + headers.insert(header_hash, header.clone()); + current_parent = Some(header_hash); + } + + 
(headers, header_hash) +} + +// Test extension for RelayChainDataCache +impl RelayChainDataCache { + fn set_test_data(&mut self, relay_parent_header: RelayHeader, cores: Vec) { + self.set_test_data_with_last_selector(relay_parent_header, cores, None); + } + + fn set_test_data_with_last_selector( + &mut self, + relay_parent_header: RelayHeader, + cores: Vec, + last_claimed_core_selector: Option, + ) { + let relay_parent_hash = relay_parent_header.hash(); + + let mut claim_queue = BTreeMap::new(); + for core_index in cores { + claim_queue.insert(core_index, [ParaId::from(1)].into()); + } + + let claim_queue_snapshot = ClaimQueueSnapshot::from(claim_queue); + + let data = RelayChainData { + relay_parent_header, + claim_queue: claim_queue_snapshot, + max_pov_size: 1024 * 1024, + last_claimed_core_selector, + }; + + self.insert_test_data(relay_parent_hash, data); + } +} diff --git a/client/consensus/nimbus-consensus/src/lib.rs b/client/consensus/nimbus-consensus/src/lib.rs index ecd569b..039d884 100644 --- a/client/consensus/nimbus-consensus/src/lib.rs +++ b/client/consensus/nimbus-consensus/src/lib.rs @@ -20,6 +20,7 @@ //! stored in its keystore are eligible to author at this slot. If it has an eligible //! key it authors. 
+mod collator; pub mod collators; mod import_queue; @@ -28,24 +29,20 @@ mod manual_seal; pub use import_queue::import_queue; pub use manual_seal::NimbusManualSealConsensusDataProvider; -use cumulus_client_parachain_inherent::ParachainInherentDataProvider; -use cumulus_primitives_core::{ - relay_chain::{Hash as PHash, Header as PHeader}, - ParaId, PersistedValidationData, -}; -use cumulus_primitives_parachain_inherent::ParachainInherentData; -use cumulus_relay_chain_interface::RelayChainInterface; -use futures::prelude::*; +use cumulus_primitives_core::{relay_chain::Header as PHeader, PersistedValidationData}; use log::{info, warn}; use nimbus_primitives::{NimbusApi, NimbusId, NIMBUS_KEY_ID}; +use polkadot_node_primitives::PoV; +use polkadot_primitives::HeadData; +use polkadot_primitives::{BlockNumber as RBlockNumber, Hash as RHash}; use sc_consensus::BlockImport; use sc_network_types::PeerId; use sp_api::ProvideRuntimeApi; use sp_application_crypto::ByteArray; -use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider}; +use sp_core::Encode; use sp_keystore::{Keystore, KeystorePtr}; -use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; -use std::error::Error; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; +use std::{error::Error, fs, path::PathBuf}; const LOG_TARGET: &str = "filtering-consensus"; @@ -99,60 +96,6 @@ where } } -/// Explicitly creates the inherent data for parachain block authoring and overrides -/// the timestamp inherent data with the one provided, if any. 
-pub(crate) async fn create_inherent_data( - create_inherent_data_providers: &CIDP, - para_id: ParaId, - parent: Block::Hash, - validation_data: &PersistedValidationData, - relay_client: &RClient, - relay_parent: PHash, - author_id: NimbusId, - timestamp: impl Into>, - collator_peer_id: PeerId, - additional_relay_state_keys: Vec>, -) -> Result<(ParachainInherentData, InherentData), Box> -where - Block: BlockT, - CIDP: CreateInherentDataProviders + 'static, - RClient: RelayChainInterface + Send + Clone + 'static, -{ - let paras_inherent_data = ParachainInherentDataProvider::create_at( - relay_parent, - relay_client, - validation_data, - para_id, - vec![], - additional_relay_state_keys, - collator_peer_id, - ) - .await; - - let paras_inherent_data = match paras_inherent_data { - Some(p) => p, - None => { - return Err( - format!("Could not create paras inherent data at {:?}", relay_parent).into(), - ) - } - }; - - let mut other_inherent_data = create_inherent_data_providers - .create_inherent_data_providers(parent, (relay_parent, validation_data.clone(), author_id)) - .map_err(|e| e as Box) - .await? - .create_inherent_data() - .await - .map_err(Box::new)?; - - if let Some(timestamp) = timestamp.into() { - other_inherent_data.replace_data(sp_timestamp::INHERENT_IDENTIFIER, ×tamp); - } - - Ok((paras_inherent_data, other_inherent_data)) -} - /// Grabs any available nimbus key from the keystore. /// This may be useful in situations where you expect exactly one key /// and intend to perform an operation with it regardless of whether it is @@ -233,3 +176,42 @@ where maybe_key } + +/// Export the given `pov` to the file system at `path`. +/// +/// The file will be named `block_hash_block_number.pov`. +/// +/// The `parent_header`, `relay_parent_storage_root` and `relay_parent_number` will also be +/// stored in the file alongside the `pov`. This enables stateless validation of the `pov`. 
+pub(crate) fn export_pov_to_path( + path: PathBuf, + pov: PoV, + block_hash: Block::Hash, + block_number: NumberFor, + parent_header: Block::Header, + relay_parent_storage_root: RHash, + relay_parent_number: RBlockNumber, + max_pov_size: u32, +) { + if let Err(error) = fs::create_dir_all(&path) { + tracing::error!(target: LOG_TARGET, %error, path = %path.display(), "Failed to create PoV export directory"); + return; + } + + let mut file = match fs::File::create(path.join(format!("{block_hash:?}_{block_number}.pov"))) { + Ok(f) => f, + Err(error) => { + tracing::error!(target: LOG_TARGET, %error, "Failed to export PoV."); + return; + } + }; + + pov.encode_to(&mut file); + PersistedValidationData { + parent_head: HeadData(parent_header.encode()), + relay_parent_number, + relay_parent_storage_root, + max_pov_size, + } + .encode_to(&mut file); +} diff --git a/pallets/randomness/src/vrf.rs b/pallets/randomness/src/vrf.rs index 5734cc4..611bf60 100644 --- a/pallets/randomness/src/vrf.rs +++ b/pallets/randomness/src/vrf.rs @@ -53,6 +53,7 @@ pub(crate) fn verify_and_set_output() { fn get_and_verify_randomness() -> T::Hash { let mut block_author_vrf_id: Option = None; + // Get VrfOutput and VrfProof from system digests // Expect client to insert VrfOutput, VrfProof into digests by setting // `BuildNimbusConsensusParams.additional_digests_provider` to `moonbeam_vrf::vrf_pre_digest` diff --git a/template/node/src/service.rs b/template/node/src/service.rs index d37b4ea..9312e98 100644 --- a/template/node/src/service.rs +++ b/template/node/src/service.rs @@ -9,6 +9,7 @@ use moonkit_template_runtime::{ RuntimeApi, }; +use nimbus_consensus::collators::slot_based::SlotBasedBlockImport; use nimbus_consensus::NimbusManualSealConsensusDataProvider; // Cumulus Imports @@ -25,10 +26,10 @@ use cumulus_client_service::{ use cumulus_primitives_core::CollectCollationInfo; use cumulus_primitives_core::ParaId; use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; -use 
cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface, RelayChainResult}; +use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult}; use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc; -use polkadot_primitives::UpgradeGoAhead; +use polkadot_primitives::{UpgradeGoAhead, ValidationCode}; use polkadot_service::CollatorPair; // Substrate Imports @@ -47,7 +48,6 @@ use sc_network::{ use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager}; use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle}; use sp_api::ProvideRuntimeApi; -use sp_consensus_slots::SlotDuration; use sp_keystore::KeystorePtr; use sp_runtime::traits::Block as BlockT; use substrate_prometheus_endpoint::Registry; @@ -348,6 +348,7 @@ where if validator { start_consensus( client.clone(), + backend.clone(), block_import, prometheus_registry.as_ref(), telemetry.as_ref().map(|t| t.handle()), @@ -358,7 +359,6 @@ where para_id, collator_key.expect("Command line arguments do not allow this. 
qed"), peer_id, - overseer_handle, announce_block, force_authoring, max_pov_percentage, @@ -370,6 +370,7 @@ where fn start_consensus( client: Arc, + backend: Arc, block_import: ParachainBlockImport, prometheus_registry: Option<&Registry>, telemetry: Option, @@ -380,7 +381,6 @@ fn start_consensus( para_id: ParaId, collator_key: CollatorPair, collator_peer_id: PeerId, - overseer_handle: OverseerHandle, announce_block: Arc>) + Send + Sync>, force_authoring: bool, max_pov_percentage: u8, @@ -400,33 +400,54 @@ fn start_consensus( client.clone(), ); - let params = nimbus_consensus::collators::basic::Params { + let client_for_nimbus = client.clone(); + + let params = nimbus_consensus::collators::slot_based::Params { + additional_digests_provider: (), para_id, - overseer_handle, proposer, - create_inherent_data_providers: move |_, _| async move { Ok(()) }, - block_import, + create_inherent_data_providers: move |_, ()| async move { Ok(()) }, + block_import: block_import.clone(), relay_client: relay_chain_interface, - para_client: client, + para_client: client.clone(), keystore, collator_service, force_authoring, - max_pov_percentage, - additional_digests_provider: (), + max_pov_percentage: Some(max_pov_percentage as u32), collator_key, collator_peer_id, additional_relay_state_keys: vec![], authoring_duration: Duration::from_millis(500), relay_chain_slot_duration: Duration::from_millis(6_000), - slot_duration: Some(SlotDuration::from_millis(6_000)), + para_slot_duration: Some(Duration::from_millis(6_000)), + slot_offset: Duration::from_millis(6_000), + reinitialize: false, + para_backend: backend, + code_hash_provider: move |block_hash| { + client_for_nimbus + .code_at(block_hash) + .ok() + .map(|c| ValidationCode::from(c).hash()) + }, + block_import_handle: SlotBasedBlockImport::new(block_import, client).1, + spawner: task_manager.spawn_essential_handle(), + export_pov: None, }; - let fut = nimbus_consensus::collators::basic::run::( - params, - ); - task_manager - 
.spawn_essential_handle() - .spawn("nimbus", None, fut); + nimbus_consensus::collators::slot_based::run::< + Block, + nimbus_primitives::NimbusPair, + _, + _, + _, + _, + _, + _, + _, + _, + _, + _, + >(params); Ok(()) } diff --git a/template/runtime/src/lib.rs b/template/runtime/src/lib.rs index a3a2e9e..54544e7 100644 --- a/template/runtime/src/lib.rs +++ b/template/runtime/src/lib.rs @@ -859,6 +859,12 @@ impl_runtime_apis! { } } + impl cumulus_primitives_core::RelayParentOffsetApi for Runtime { + fn relay_parent_offset() -> u32 { + 2 + } + } + #[cfg(feature = "runtime-benchmarks")] impl frame_benchmarking::Benchmark for Runtime { fn benchmark_metadata(extra: bool) -> (