>
+ + ParachainBlockImportMarker
+ Send
+ Sync
+ 'static,
@@ -83,7 +85,7 @@ where
CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
{
sc_consensus_aura::import_queue::(sc_consensus_aura::ImportQueueParams {
- block_import: cumulus_client_consensus_common::ParachainBlockImport::new(block_import),
+ block_import,
justification_import: None,
client,
create_inherent_data_providers,
@@ -91,6 +93,7 @@ where
registry,
check_for_equivocation: sc_consensus_aura::CheckForEquivocation::No,
telemetry,
+ compatibility_mode: CompatibilityMode::None,
})
}
@@ -105,16 +108,17 @@ pub struct BuildVerifierParams {
}
/// Build the [`AuraVerifier`].
-pub fn build_verifier(
+pub fn build_verifier
(
BuildVerifierParams { client, create_inherent_data_providers, telemetry }: BuildVerifierParams<
C,
CIDP,
>,
-) -> AuraVerifier {
+) -> AuraVerifier {
sc_consensus_aura::build_verifier(sc_consensus_aura::BuildVerifierParams {
client,
create_inherent_data_providers,
telemetry,
check_for_equivocation: sc_consensus_aura::CheckForEquivocation::No,
+ compatibility_mode: CompatibilityMode::None,
})
}
diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs
index 1be562e37cb..965f8fe3baa 100644
--- a/client/consensus/aura/src/lib.rs
+++ b/client/consensus/aura/src/lib.rs
@@ -24,7 +24,7 @@
use codec::{Decode, Encode};
use cumulus_client_consensus_common::{
- ParachainBlockImport, ParachainCandidate, ParachainConsensus,
+ ParachainBlockImportMarker, ParachainCandidate, ParachainConsensus,
};
use cumulus_primitives_core::{relay_chain::v2::Hash as PHash, PersistedValidationData};
@@ -39,7 +39,7 @@ use sp_blockchain::HeaderBackend;
use sp_consensus::{EnableProofRecording, Environment, ProofRecording, Proposer, SyncOracle};
use sp_consensus_aura::{AuraApi, SlotDuration};
use sp_core::crypto::Pair;
-use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
+use sp_inherents::CreateInherentDataProviders;
use sp_keystore::SyncCryptoStorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, NumberFor};
use std::{convert::TryFrom, hash::Hash, marker::PhantomData, sync::Arc};
@@ -71,6 +71,22 @@ impl Clone for AuraConsensus {
}
}
+/// Parameters of [`AuraConsensus::build`].
+pub struct BuildAuraConsensusParams {
+ pub proposer_factory: PF,
+ pub create_inherent_data_providers: CIDP,
+ pub block_import: BI,
+ pub para_client: Arc,
+ pub backoff_authoring_blocks: Option,
+ pub sync_oracle: SO,
+ pub keystore: SyncCryptoStorePtr,
+ pub force_authoring: bool,
+ pub slot_duration: SlotDuration,
+ pub telemetry: Option,
+ pub block_proposal_slot_portion: SlotProportion,
+ pub max_block_proposal_slot_portion: Option,
+}
+
impl AuraConsensus
where
B: BlockT,
@@ -98,7 +114,11 @@ where
Client:
ProvideRuntimeApi + BlockOf + AuxStore + HeaderBackend + Send + Sync + 'static,
Client::Api: AuraApi,
- BI: BlockImport> + Send + Sync + 'static,
+ BI: BlockImport>
+ + ParachainBlockImportMarker
+ + Send
+ + Sync
+ + 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
BS: BackoffAuthoringBlocksStrategy> + Send + Sync + 'static,
PF: Environment + Send + Sync + 'static,
@@ -117,7 +137,7 @@ where
let worker = sc_consensus_aura::build_aura_worker:: (
BuildAuraWorkerParams {
client: para_client,
- block_import: ParachainBlockImport::new(block_import),
+ block_import,
justification_sync_link: (),
proposer_factory,
sync_oracle,
@@ -127,6 +147,7 @@ where
telemetry,
block_proposal_slot_portion,
max_block_proposal_slot_portion,
+ compatibility_mode: sc_consensus_aura::CompatibilityMode::None,
},
);
@@ -153,9 +174,8 @@ where
parent: B::Hash,
validation_data: &PersistedValidationData,
relay_parent: PHash,
- ) -> Option<(InherentData, CIDP::InherentDataProviders)> {
- let inherent_data_providers = self
- .create_inherent_data_providers
+ ) -> Option {
+ self.create_inherent_data_providers
.create_inherent_data_providers(parent, (relay_parent, validation_data.clone()))
.await
.map_err(|e| {
@@ -165,19 +185,7 @@ where
"Failed to create inherent data providers.",
)
})
- .ok()?;
-
- inherent_data_providers
- .create_inherent_data()
- .map_err(|e| {
- tracing::error!(
- target: LOG_TARGET,
- error = ?e,
- "Failed to create inherent data.",
- )
- })
.ok()
- .map(|d| (d, inherent_data_providers))
}
}
@@ -196,12 +204,12 @@ where
relay_parent: PHash,
validation_data: &PersistedValidationData,
) -> Option> {
- let (inherent_data, inherent_data_providers) =
+ let inherent_data_providers =
self.inherent_data(parent.hash(), validation_data, relay_parent).await?;
let info = SlotInfo::new(
inherent_data_providers.slot(),
- inherent_data,
+ Box::new(inherent_data_providers),
self.slot_duration.as_duration(),
parent.clone(),
// Set the block limit to 50% of the maximum PoV size.
@@ -216,19 +224,3 @@ where
Some(ParachainCandidate { block: res.block, proof: res.storage_proof })
}
}
-
-/// Parameters of [`AuraConsensus::build`].
-pub struct BuildAuraConsensusParams {
- pub proposer_factory: PF,
- pub create_inherent_data_providers: CIDP,
- pub block_import: BI,
- pub para_client: Arc,
- pub backoff_authoring_blocks: Option,
- pub sync_oracle: SO,
- pub keystore: SyncCryptoStorePtr,
- pub force_authoring: bool,
- pub slot_duration: SlotDuration,
- pub telemetry: Option,
- pub block_proposal_slot_portion: SlotProportion,
- pub max_block_proposal_slot_portion: Option,
-}
diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml
index dcf105630df..a17a6b1f18a 100644
--- a/client/consensus/common/Cargo.toml
+++ b/client/consensus/common/Cargo.toml
@@ -6,31 +6,34 @@ authors = ["Parity Technologies "]
edition = "2021"
[dependencies]
-async-trait = "0.1.57"
+async-trait = "0.1.59"
codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] }
-dyn-clone = "1.0.9"
-futures = "0.3.24"
+dyn-clone = "1.0.10"
+futures = "0.3.25"
+log = "0.4.17"
tracing = "0.1.37"
# Substrate
-sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-trie = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-trie = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
# Polkadot
-polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
+polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
# Cumulus
+cumulus-primitives-core = { path = "../../../primitives/core" }
cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
+cumulus-client-pov-recovery = { path = "../../pov-recovery" }
[dev-dependencies]
futures-timer = "3.0.2"
# Substrate
-sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
# Cumulus
cumulus-test-client = { path = "../../../test/client" }
diff --git a/client/consensus/common/src/level_monitor.rs b/client/consensus/common/src/level_monitor.rs
new file mode 100644
index 00000000000..294527f1f9f
--- /dev/null
+++ b/client/consensus/common/src/level_monitor.rs
@@ -0,0 +1,378 @@
+// Copyright 2022 Parity Technologies (UK) Ltd.
+// This file is part of Cumulus.
+
+// Cumulus is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Cumulus is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Cumulus. If not, see .
+
+use sc_client_api::{blockchain::Backend as _, Backend, HeaderBackend as _};
+use sp_blockchain::{HashAndNumber, TreeRoute};
+use sp_runtime::traits::{Block as BlockT, NumberFor, One, Saturating, UniqueSaturatedInto, Zero};
+use std::{
+ collections::{HashMap, HashSet},
+ sync::Arc,
+};
+
+/// Value good enough to be used with parachains using the current backend implementation
+/// that ships with Substrate. This value may change in the future.
+pub const MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT: usize = 32;
+
+// Counter threshold after which we are going to eventually cleanup our internal data.
+const CLEANUP_THRESHOLD: u32 = 32;
+
+/// Upper bound to the number of leaves allowed for each level of the blockchain.
+///
+/// If the limit is set and more leaves are detected on block import, then the older ones are
+/// dropped to make space for the fresh blocks.
+///
+/// In environments where blocks confirmations from the relay chain may be "slow", then
+/// setting an upper bound helps keeping the chain health by dropping old (presumably) stale
+/// leaves and prevents discarding new blocks because we've reached the backend max value.
+pub enum LevelLimit {
+ /// Limit set to [`MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT`].
+ Default,
+ /// No explicit limit, however a limit may be implicitly imposed by the backend implementation.
+ None,
+ /// Custom value.
+ Some(usize),
+}
+
+/// Support structure to constrain the number of leaves at each level.
+pub struct LevelMonitor {
+ // Max number of leaves for each level.
+ level_limit: usize,
+ // Monotonic counter used to keep track of block freshness.
+ pub(crate) import_counter: NumberFor,
+ // Map between blocks hashes and freshness.
+ pub(crate) freshness: HashMap>,
+ // Blockchain levels cache.
+ pub(crate) levels: HashMap, HashSet>,
+ // Lower level number stored by the levels map.
+ lowest_level: NumberFor,
+ // Backend reference to remove blocks on level saturation.
+ backend: Arc,
+}
+
+/// Contains information about the target scheduled for removal.
+struct TargetInfo {
+ /// Index of freshest leaf in the leaves array.
+ freshest_leaf_idx: usize,
+ /// Route from target to its freshest leaf.
+ freshest_route: TreeRoute,
+}
+
+impl LevelMonitor
+where
+ Block: BlockT,
+ BE: Backend,
+{
+ /// Instance a new monitor structure.
+ pub fn new(level_limit: usize, backend: Arc) -> Self {
+ let mut monitor = LevelMonitor {
+ level_limit,
+ import_counter: Zero::zero(),
+ freshness: HashMap::new(),
+ levels: HashMap::new(),
+ lowest_level: Zero::zero(),
+ backend,
+ };
+ monitor.restore();
+ monitor
+ }
+
+ /// Restore the structure using the backend.
+ ///
+ /// Blocks freshness values are inferred from the height and not from the effective import
+ /// moment. This is a not accurate but "good-enough" best effort solution.
+ ///
+ /// Level limits are not enforced during this phase.
+ fn restore(&mut self) {
+ let info = self.backend.blockchain().info();
+ log::debug!(
+ target: "parachain",
+ "Restoring chain level monitor from last finalized block: {} {}",
+ info.finalized_number, info.finalized_hash
+ );
+
+ self.lowest_level = info.finalized_number;
+ self.import_counter = info.finalized_number;
+ self.block_imported(info.finalized_number, info.finalized_hash);
+
+ let mut counter_max = info.finalized_number;
+
+ for leaf in self.backend.blockchain().leaves().unwrap_or_default() {
+ let route =
+ sp_blockchain::tree_route(self.backend.blockchain(), info.finalized_hash, leaf)
+ .expect("Route from finalized to leaf should be available; qed");
+ if !route.retracted().is_empty() {
+ continue
+ }
+ route.enacted().iter().for_each(|elem| {
+ if !self.freshness.contains_key(&elem.hash) {
+ // Use the block height value as the freshness.
+ self.import_counter = elem.number;
+ self.block_imported(elem.number, elem.hash);
+ }
+ });
+ counter_max = std::cmp::max(self.import_counter, counter_max);
+ }
+
+ log::debug!(target: "parachain", "Restored chain level monitor up to height {}", counter_max);
+
+ self.import_counter = counter_max;
+ }
+
+ /// Check and enforce the limit bound at the given height.
+ ///
+ /// In practice this will enforce the given height in having a number of blocks less than
+ /// the limit passed to the constructor.
+ ///
+ /// If the given level is found to have a number of blocks greater than or equal the limit
+ /// then the limit is enforced by chosing one (or more) blocks to remove.
+ ///
+ /// The removal strategy is driven by the block freshness.
+ ///
+ /// A block freshness is determined by the most recent leaf freshness descending from the block
+ /// itself. In other words its freshness is equal to its more "fresh" descendant.
+ ///
+ /// The least "fresh" blocks are eventually removed.
+ pub fn enforce_limit(&mut self, number: NumberFor) {
+ let level_len = self.levels.get(&number).map(|l| l.len()).unwrap_or_default();
+ if level_len < self.level_limit {
+ return
+ }
+
+ // Sort leaves by freshness only once (less fresh first) and keep track of
+ // leaves that were invalidated on removal.
+ let mut leaves = self.backend.blockchain().leaves().unwrap_or_default();
+ leaves.sort_unstable_by(|a, b| self.freshness.get(a).cmp(&self.freshness.get(b)));
+ let mut invalidated_leaves = HashSet::new();
+
+ // This may not be the most efficient way to remove **multiple** entries, but is the easy
+ // one :-). Should be considered that in "normal" conditions the number of blocks to remove
+ // is 0 or 1, it is not worth to complicate the code too much. One condition that may
+ // trigger multiple removals (2+) is if we restart the node using an existing db and a
+ // smaller limit wrt the one previously used.
+ let remove_count = level_len - self.level_limit + 1;
+
+ log::debug!(
+ target: "parachain",
+ "Detected leaves overflow at height {number}, removing {remove_count} obsolete blocks",
+ );
+
+ (0..remove_count).all(|_| {
+ self.find_target(number, &leaves, &invalidated_leaves).map_or(false, |target| {
+ self.remove_target(target, number, &leaves, &mut invalidated_leaves);
+ true
+ })
+ });
+ }
+
+ // Helper function to find the best candidate to be removed.
+ //
+ // Given a set of blocks with height equal to `number` (potential candidates)
+ // 1. For each candidate fetch all the leaves that are descending from it.
+ // 2. Set the candidate freshness equal to the fresher of its descending leaves.
+ // 3. The target is set as the candidate that is less fresh.
+ //
+ // Input `leaves` are assumed to be already ordered by "freshness" (less fresh first).
+ //
+ // Returns the index of the target fresher leaf within `leaves` and the route from target to
+ // such leaf.
+ fn find_target(
+ &self,
+ number: NumberFor,
+ leaves: &[Block::Hash],
+ invalidated_leaves: &HashSet,
+ ) -> Option> {
+ let mut target_info: Option> = None;
+ let blockchain = self.backend.blockchain();
+ let best_hash = blockchain.info().best_hash;
+
+ // Leaves that where already assigned to some node and thus can be skipped
+ // during the search.
+ let mut assigned_leaves = HashSet::new();
+
+ let level = self.levels.get(&number)?;
+
+ for blk_hash in level.iter().filter(|hash| **hash != best_hash) {
+ // Search for the fresher leaf information for this block
+ let candidate_info = leaves
+ .iter()
+ .enumerate()
+ .filter(|(leaf_idx, _)| {
+ !assigned_leaves.contains(leaf_idx) && !invalidated_leaves.contains(leaf_idx)
+ })
+ .rev()
+ .find_map(|(leaf_idx, leaf_hash)| {
+ if blk_hash == leaf_hash {
+ let entry = HashAndNumber { number, hash: *blk_hash };
+ TreeRoute::new(vec![entry], 0).ok().map(|freshest_route| TargetInfo {
+ freshest_leaf_idx: leaf_idx,
+ freshest_route,
+ })
+ } else {
+ match sp_blockchain::tree_route(blockchain, *blk_hash, *leaf_hash) {
+ Ok(route) if route.retracted().is_empty() => Some(TargetInfo {
+ freshest_leaf_idx: leaf_idx,
+ freshest_route: route,
+ }),
+ Err(err) => {
+ log::warn!(
+ target: "parachain",
+ "(Lookup) Unable getting route from {:?} to {:?}: {}",
+ blk_hash, leaf_hash, err,
+ );
+ None
+ },
+ _ => None,
+ }
+ }
+ });
+
+ let candidate_info = match candidate_info {
+ Some(candidate_info) => {
+ assigned_leaves.insert(candidate_info.freshest_leaf_idx);
+ candidate_info
+ },
+ None => {
+ // This should never happen
+ log::error!(
+ target: "parachain",
+ "Unable getting route to any leaf from {:?} (this is a bug)",
+ blk_hash,
+ );
+ continue
+ },
+ };
+
+ // Found fresher leaf for this candidate.
+ // This candidate is set as the new target if:
+ // 1. its fresher leaf is less fresh than the previous target fresher leaf AND
+ // 2. best block is not in its route
+
+ let is_less_fresh = || {
+ target_info
+ .as_ref()
+ .map(|ti| candidate_info.freshest_leaf_idx < ti.freshest_leaf_idx)
+ .unwrap_or(true)
+ };
+ let not_contains_best = || {
+ candidate_info
+ .freshest_route
+ .enacted()
+ .iter()
+ .all(|entry| entry.hash != best_hash)
+ };
+
+ if is_less_fresh() && not_contains_best() {
+ let early_stop = candidate_info.freshest_leaf_idx == 0;
+ target_info = Some(candidate_info);
+ if early_stop {
+ // We will never find a candidate with an worst freshest leaf than this.
+ break
+ }
+ }
+ }
+
+ target_info
+ }
+
+ // Remove the target block and all its descendants.
+ //
+ // Leaves should have already been ordered by "freshness" (less fresh first).
+ fn remove_target(
+ &mut self,
+ target: TargetInfo,
+ number: NumberFor,
+ leaves: &[Block::Hash],
+ invalidated_leaves: &mut HashSet,
+ ) {
+ let mut remove_leaf = |number, hash| {
+ log::debug!(target: "parachain", "Removing block (@{}) {:?}", number, hash);
+ if let Err(err) = self.backend.remove_leaf_block(hash) {
+ log::debug!(target: "parachain", "Remove not possible for {}: {}", hash, err);
+ return false
+ }
+ self.levels.get_mut(&number).map(|level| level.remove(&hash));
+ self.freshness.remove(&hash);
+ true
+ };
+
+ invalidated_leaves.insert(target.freshest_leaf_idx);
+
+ // Takes care of route removal. Starts from the leaf and stops as soon as an error is
+ // encountered. In this case an error is interpreted as the block being not a leaf
+ // and it will be removed while removing another route from the same block but to a
+ // different leaf.
+ let mut remove_route = |route: TreeRoute| {
+ route.enacted().iter().rev().all(|elem| remove_leaf(elem.number, elem.hash));
+ };
+
+ let target_hash = target.freshest_route.common_block().hash;
+ debug_assert_eq!(
+ target.freshest_route.common_block().number,
+ number,
+ "This is a bug in LevelMonitor::find_target() or the Backend is corrupted"
+ );
+
+ // Remove freshest (cached) route first.
+ remove_route(target.freshest_route);
+
+ // Don't bother trying with leaves we already found to not be our descendants.
+ let to_skip = leaves.len() - target.freshest_leaf_idx;
+ leaves.iter().enumerate().rev().skip(to_skip).for_each(|(leaf_idx, leaf_hash)| {
+ if invalidated_leaves.contains(&leaf_idx) {
+ return
+ }
+ match sp_blockchain::tree_route(self.backend.blockchain(), target_hash, *leaf_hash) {
+ Ok(route) if route.retracted().is_empty() => {
+ invalidated_leaves.insert(leaf_idx);
+ remove_route(route);
+ },
+ Err(err) => {
+ log::warn!(
+ target: "parachain",
+ "(Removal) unable getting route from {:?} to {:?}: {}",
+ target_hash, leaf_hash, err,
+ );
+ },
+ _ => (),
+ };
+ });
+
+ remove_leaf(number, target_hash);
+ }
+
+ /// Add a new imported block information to the monitor.
+ pub fn block_imported(&mut self, number: NumberFor, hash: Block::Hash) {
+ self.freshness.insert(hash, self.import_counter);
+ self.levels.entry(number).or_default().insert(hash);
+ self.import_counter += One::one();
+
+ // Do cleanup once in a while, we are allowed to have some obsolete information.
+ let finalized_num = self.backend.blockchain().info().finalized_number;
+ let delta: u32 = finalized_num.saturating_sub(self.lowest_level).unique_saturated_into();
+ if delta >= CLEANUP_THRESHOLD {
+ for i in 0..delta {
+ let number = self.lowest_level + i.unique_saturated_into();
+ self.levels.remove(&number).map(|level| {
+ level.iter().for_each(|hash| {
+ self.freshness.remove(hash);
+ })
+ });
+ }
+
+ self.lowest_level = finalized_num;
+ }
+ }
+}
diff --git a/client/consensus/common/src/lib.rs b/client/consensus/common/src/lib.rs
index 61098dfd434..39119f345c2 100644
--- a/client/consensus/common/src/lib.rs
+++ b/client/consensus/common/src/lib.rs
@@ -15,14 +15,23 @@
// along with Cumulus. If not, see .
use polkadot_primitives::v2::{Hash as PHash, PersistedValidationData};
-use sc_consensus::BlockImport;
-use sp_runtime::traits::Block as BlockT;
+use sc_client_api::Backend;
+use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult};
+use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
+
+use std::sync::Arc;
+
+mod level_monitor;
mod parachain_consensus;
#[cfg(test)]
mod tests;
+
pub use parachain_consensus::run_parachain_consensus;
+use level_monitor::LevelMonitor;
+pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT};
+
/// The result of [`ParachainConsensus::produce_candidate`].
pub struct ParachainCandidate {
/// The block that was built for this candidate.
@@ -74,41 +83,93 @@ impl ParachainConsensus for Box + Send +
/// This is used to set `block_import_params.fork_choice` to `false` as long as the block origin is
/// not `NetworkInitialSync`. The best block for parachains is determined by the relay chain. Meaning
/// we will update the best block, as it is included by the relay-chain.
-pub struct ParachainBlockImport(I);
+pub struct ParachainBlockImport {
+ inner: BI,
+ monitor: Option>>,
+}
-impl ParachainBlockImport {
+impl> ParachainBlockImport {
/// Create a new instance.
- pub fn new(inner: I) -> Self {
- Self(inner)
+ ///
+ /// The number of leaves per level limit is set to `LevelLimit::Default`.
+ pub fn new(inner: BI, backend: Arc) -> Self {
+ Self::new_with_limit(inner, backend, LevelLimit::Default)
+ }
+
+ /// Create a new instance with an explicit limit to the number of leaves per level.
+ ///
+ /// This function alone doesn't enforce the limit on levels for old imported blocks,
+ /// the limit is eventually enforced only when new blocks are imported.
+ pub fn new_with_limit(inner: BI, backend: Arc, level_leaves_max: LevelLimit) -> Self {
+ let level_limit = match level_leaves_max {
+ LevelLimit::None => None,
+ LevelLimit::Some(limit) => Some(limit),
+ LevelLimit::Default => Some(MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT),
+ };
+
+ let monitor =
+ level_limit.map(|level_limit| SharedData::new(LevelMonitor::new(level_limit, backend)));
+
+ Self { inner, monitor }
+ }
+}
+
+impl Clone for ParachainBlockImport {
+ fn clone(&self) -> Self {
+ ParachainBlockImport { inner: self.inner.clone(), monitor: self.monitor.clone() }
}
}
#[async_trait::async_trait]
-impl BlockImport for ParachainBlockImport
+impl BlockImport for ParachainBlockImport
where
Block: BlockT,
- I: BlockImport + Send,
+ BI: BlockImport + Send,
+ BE: Backend,
{
- type Error = I::Error;
- type Transaction = I::Transaction;
+ type Error = BI::Error;
+ type Transaction = BI::Transaction;
async fn check_block(
&mut self,
block: sc_consensus::BlockCheckParams,
) -> Result {
- self.0.check_block(block).await
+ self.inner.check_block(block).await
}
async fn import_block(
&mut self,
- mut block_import_params: sc_consensus::BlockImportParams,
+ mut params: sc_consensus::BlockImportParams,
cache: std::collections::HashMap>,
) -> Result {
+ // Blocks are stored within the backend by using POST hash.
+ let hash = params.post_hash();
+ let number = *params.header.number();
+
// Best block is determined by the relay chain, or if we are doing the initial sync
// we import all blocks as new best.
- block_import_params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom(
- block_import_params.origin == sp_consensus::BlockOrigin::NetworkInitialSync,
+ params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom(
+ params.origin == sp_consensus::BlockOrigin::NetworkInitialSync,
));
- self.0.import_block(block_import_params, cache).await
+
+ let maybe_lock = self.monitor.as_ref().map(|monitor_lock| {
+ let mut monitor = monitor_lock.shared_data_locked();
+ monitor.enforce_limit(number);
+ monitor.release_mutex()
+ });
+
+ let res = self.inner.import_block(params, cache).await?;
+
+ if let (Some(mut monitor_lock), ImportResult::Imported(_)) = (maybe_lock, &res) {
+ let mut monitor = monitor_lock.upgrade();
+ monitor.block_imported(number, hash);
+ }
+
+ Ok(res)
}
}
+
+/// Marker trait denoting a block import type that fits the parachain requirements.
+pub trait ParachainBlockImportMarker {}
+
+impl ParachainBlockImportMarker for ParachainBlockImport {}
diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs
index ccdfb17b671..ffbbab5a200 100644
--- a/client/consensus/common/src/parachain_consensus.rs
+++ b/client/consensus/common/src/parachain_consensus.rs
@@ -15,7 +15,6 @@
// along with Cumulus. If not, see .
use async_trait::async_trait;
-use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
@@ -27,15 +26,25 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
};
+use cumulus_client_pov_recovery::{RecoveryDelay, RecoveryKind, RecoveryRequest};
+use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
+
use polkadot_primitives::v2::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};
use codec::Decode;
-use futures::{select, FutureExt, Stream, StreamExt};
+use futures::{channel::mpsc::Sender, select, FutureExt, Stream, StreamExt};
-use std::{pin::Pin, sync::Arc};
+use std::{pin::Pin, sync::Arc, time::Duration};
const LOG_TARGET: &str = "cumulus-consensus";
+// Delay range to trigger explicit requests.
+// The chosen value doesn't have any special meaning, a random delay within the order of
+// seconds in practice should be a good enough to allow a quick recovery without DOSing
+// the relay chain.
+const RECOVERY_DELAY: RecoveryDelay =
+ RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };
+
/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
@@ -82,7 +91,7 @@ where
let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
- tracing::debug!(target: "cumulus-consensus", "Stopping following finalized head.");
+ tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
return
};
@@ -90,7 +99,7 @@ where
Ok(header) => header,
Err(err) => {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
@@ -102,15 +111,15 @@ where
// don't finalize the same block multiple times.
if parachain.usage_info().chain.finalized_hash != hash {
- if let Err(e) = parachain.finalize_block(BlockId::hash(hash), None, true) {
+ if let Err(e) = parachain.finalize_block(hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
@@ -136,6 +145,7 @@ pub async fn run_parachain_consensus (
parachain: Arc
,
relay_chain: R,
announce_block: Arc>) + Send + Sync>,
+ recovery_chan_tx: Option>>,
) where
Block: BlockT,
P: Finalizer
@@ -148,8 +158,13 @@ pub async fn run_parachain_consensus(
R: RelaychainClient,
B: Backend,
{
- let follow_new_best =
- follow_new_best(para_id, parachain.clone(), relay_chain.clone(), announce_block);
+ let follow_new_best = follow_new_best(
+ para_id,
+ parachain.clone(),
+ relay_chain.clone(),
+ announce_block,
+ recovery_chan_tx,
+ );
let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
select! {
_ = follow_new_best.fuse() => {},
@@ -163,6 +178,7 @@ async fn follow_new_best(
parachain: Arc
,
relay_chain: R,
announce_block: Arc>) + Send + Sync>,
+ recovery_chan_tx: Option>>,
) where
Block: BlockT,
P: Finalizer
@@ -197,10 +213,11 @@ async fn follow_new_best(
h,
&*parachain,
&mut unset_best_header,
+ recovery_chan_tx.clone(),
).await,
None => {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
"Stopping following new best.",
);
return
@@ -217,7 +234,7 @@ async fn follow_new_best
(
).await,
None => {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
"Stopping following imported blocks.",
);
return
@@ -276,7 +293,7 @@ async fn handle_new_block_imported(
import_block_as_new_best(unset_hash, unset_best_header, parachain).await;
},
state => tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
?unset_best_header,
?notification.header,
?state,
@@ -290,6 +307,7 @@ async fn handle_new_best_parachain_head(
head: Vec,
parachain: &P,
unset_best_header: &mut Option,
+ mut recovery_chan_tx: Option>>,
) where
Block: BlockT,
P: UsageProvider + Send + Sync + BlockBackend,
@@ -299,7 +317,7 @@ async fn handle_new_best_parachain_head(
Ok(header) => header,
Err(err) => {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
error = ?err,
"Could not decode Parachain header while following best heads.",
);
@@ -311,7 +329,7 @@ async fn handle_new_best_parachain_head(
if parachain.usage_info().chain.best_hash == hash {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
block_hash = ?hash,
"Skipping set new best block, because block is already the best.",
)
@@ -325,7 +343,7 @@ async fn handle_new_best_parachain_head(
},
Ok(BlockStatus::InChainPruned) => {
tracing::error!(
- target: "cumulus-collator",
+ target: LOG_TARGET,
block_hash = ?hash,
"Trying to set pruned block as new best!",
);
@@ -334,14 +352,30 @@ async fn handle_new_best_parachain_head(
*unset_best_header = Some(parachain_head);
tracing::debug!(
- target: "cumulus-collator",
+ target: LOG_TARGET,
block_hash = ?hash,
"Parachain block not yet imported, waiting for import to enact as best block.",
);
+
+ if let Some(ref mut recovery_chan_tx) = recovery_chan_tx {
+ // Best effort channel to actively encourage block recovery.
+ // An error here is not fatal; the relay chain continuously re-announces
+ // the best block, thus we will have other opportunities to retry.
+ let req =
+ RecoveryRequest { hash, delay: RECOVERY_DELAY, kind: RecoveryKind::Full };
+ if let Err(err) = recovery_chan_tx.try_send(req) {
+ tracing::warn!(
+ target: LOG_TARGET,
+ block_hash = ?hash,
+ error = ?err,
+ "Unable to notify block recovery subsystem"
+ )
+ }
+ }
},
Err(e) => {
tracing::error!(
- target: "cumulus-collator",
+ target: LOG_TARGET,
block_hash = ?hash,
error = ?e,
"Failed to get block status of block.",
@@ -361,7 +395,7 @@ where
let best_number = parachain.usage_info().chain.best_number;
if *header.number() < best_number {
tracing::debug!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
%best_number,
block_number = %header.number(),
"Skipping importing block as new best block, because there already exists a \
@@ -377,7 +411,7 @@ where
if let Err(err) = (&*parachain).import_block(block_import_params, Default::default()).await {
tracing::warn!(
- target: "cumulus-consensus",
+ target: LOG_TARGET,
block_hash = ?hash,
error = ?err,
"Failed to set new best block.",
diff --git a/client/consensus/common/src/tests.rs b/client/consensus/common/src/tests.rs
index 23729abebb4..92cecc37d29 100644
--- a/client/consensus/common/src/tests.rs
+++ b/client/consensus/common/src/tests.rs
@@ -18,6 +18,7 @@ use crate::*;
use async_trait::async_trait;
use codec::Encode;
+use cumulus_client_pov_recovery::RecoveryKind;
use cumulus_relay_chain_interface::RelayChainResult;
use cumulus_test_client::{
runtime::{Block, Header},
@@ -26,10 +27,10 @@ use cumulus_test_client::{
use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt};
use futures_timer::Delay;
use polkadot_primitives::v2::Id as ParaId;
-use sc_client_api::UsageProvider;
+use sc_client_api::{blockchain::Backend as _, Backend as _, UsageProvider};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sp_blockchain::Error as ClientError;
-use sp_consensus::BlockOrigin;
+use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::generic::BlockId;
use std::{
sync::{Arc, Mutex},
@@ -103,21 +104,82 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
}
}
-fn build_and_import_block(mut client: Arc, import_as_best: bool) -> Block {
- let builder = client.init_block_builder(None, Default::default());
+fn build_block(
+ builder: &B,
+ at: Option>,
+ timestamp: Option,
+) -> Block {
+ let builder = match at {
+ Some(at) => match timestamp {
+ Some(ts) =>
+ builder.init_block_builder_with_timestamp(&at, None, Default::default(), ts),
+ None => builder.init_block_builder_at(&at, None, Default::default()),
+ },
+ None => builder.init_block_builder(None, Default::default()),
+ };
+
+ let mut block = builder.build().unwrap().block;
- let block = builder.build().unwrap().block;
- let (header, body) = block.clone().deconstruct();
+ // Simulate some form of post activity (like a Seal or Other generic things).
+ // This is mostly used to excercise the `LevelMonitor` correct behavior.
+ // (in practice we want that header post-hash != pre-hash)
+ block.header.digest.push(sp_runtime::DigestItem::Other(vec![1, 2, 3]));
- let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, header);
+ block
+}
+
+async fn import_block>(
+ importer: &mut I,
+ block: Block,
+ origin: BlockOrigin,
+ import_as_best: bool,
+) {
+ let (mut header, body) = block.deconstruct();
+
+ let post_digest =
+ header.digest.pop().expect("post digested is present in manually crafted block");
+
+ let mut block_import_params = BlockImportParams::new(origin, header);
block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(import_as_best));
block_import_params.body = Some(body);
+ block_import_params.post_digests.push(post_digest);
- block_on(client.import_block(block_import_params, Default::default())).unwrap();
+ importer.import_block(block_import_params, Default::default()).await.unwrap();
+}
+fn import_block_sync>(
+ importer: &mut I,
+ block: Block,
+ origin: BlockOrigin,
+ import_as_best: bool,
+) {
+ block_on(import_block(importer, block, origin, import_as_best));
+}
+
+fn build_and_import_block_ext>(
+ builder: &B,
+ origin: BlockOrigin,
+ import_as_best: bool,
+ importer: &mut I,
+ at: Option>,
+ timestamp: Option,
+) -> Block {
+ let block = build_block(builder, at, timestamp);
+ import_block_sync(importer, block.clone(), origin, import_as_best);
block
}
+fn build_and_import_block(mut client: Arc, import_as_best: bool) -> Block {
+ build_and_import_block_ext(
+ &*client.clone(),
+ BlockOrigin::Own,
+ import_as_best,
+ &mut client,
+ None,
+ None,
+ )
+}
+
#[test]
fn follow_new_best_works() {
sp_tracing::try_init_simple();
@@ -129,7 +191,7 @@ fn follow_new_best_works() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
- run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+ run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
@@ -152,6 +214,68 @@ fn follow_new_best_works() {
});
}
+#[test]
+fn follow_new_best_with_dummy_recovery_works() {
+ sp_tracing::try_init_simple();
+
+ let client = Arc::new(TestClientBuilder::default().build());
+
+ let relay_chain = Relaychain::new();
+ let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
+
+ let (recovery_chan_tx, mut recovery_chan_rx) = futures::channel::mpsc::channel(3);
+
+ let consensus = run_parachain_consensus(
+ 100.into(),
+ client.clone(),
+ relay_chain,
+ Arc::new(|_, _| {}),
+ Some(recovery_chan_tx),
+ );
+
+ let block = build_block(&*client.clone(), None, None);
+ let block_clone = block.clone();
+ let client_clone = client.clone();
+
+ let work = async move {
+ new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
+ loop {
+ Delay::new(Duration::from_millis(100)).await;
+ match client.block_status(&BlockId::Hash(block.hash())).unwrap() {
+ BlockStatus::Unknown => {},
+ status => {
+ assert_eq!(block.hash(), client.usage_info().chain.best_hash);
+ assert_eq!(status, BlockStatus::InChainWithState);
+ break
+ },
+ }
+ }
+ };
+
+ let dummy_block_recovery = async move {
+ loop {
+ if let Some(req) = recovery_chan_rx.next().await {
+ assert_eq!(req.hash, block_clone.hash());
+ assert_eq!(req.kind, RecoveryKind::Full);
+ Delay::new(Duration::from_millis(500)).await;
+ import_block(&mut &*client_clone, block_clone.clone(), BlockOrigin::Own, true)
+ .await;
+ }
+ }
+ };
+
+ block_on(async move {
+ futures::pin_mut!(consensus);
+ futures::pin_mut!(work);
+
+ select! {
+ r = consensus.fuse() => panic!("Consensus should not end: {:?}", r),
+ _ = dummy_block_recovery.fuse() => {},
+ _ = work.fuse() => {},
+ }
+ });
+}
+
#[test]
fn follow_finalized_works() {
sp_tracing::try_init_simple();
@@ -163,7 +287,7 @@ fn follow_finalized_works() {
let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();
let consensus =
- run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+ run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
finalized_sender.unbounded_send(block.header().clone()).unwrap();
@@ -204,7 +328,7 @@ fn follow_finalized_does_not_stop_on_unknown_block() {
let finalized_sender = relay_chain.inner.lock().unwrap().finalized_heads_sender.clone();
let consensus =
- run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+ run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
for _ in 0..3usize {
@@ -254,7 +378,7 @@ fn follow_new_best_sets_best_after_it_is_imported() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
- run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+ run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let work = async move {
new_best_heads_sender.unbounded_send(block.header().clone()).unwrap();
@@ -331,7 +455,7 @@ fn do_not_set_best_block_to_older_block() {
let new_best_heads_sender = relay_chain.inner.lock().unwrap().new_best_heads_sender.clone();
let consensus =
- run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}));
+ run_parachain_consensus(100.into(), client.clone(), relay_chain, Arc::new(|_, _| {}), None);
let client2 = client.clone();
let work = async move {
@@ -355,3 +479,216 @@ fn do_not_set_best_block_to_older_block() {
// Build and import a new best block.
build_and_import_block(client2.clone(), true);
}
+
+#[test]
+fn prune_blocks_on_level_overflow() {
+ // Here we are using the timestamp value to generate blocks with different hashes.
+ const LEVEL_LIMIT: usize = 3;
+ const TIMESTAMP_MULTIPLIER: u64 = 60000;
+
+ let backend = Arc::new(Backend::new_test(1000, 3));
+ let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build());
+ let mut para_import = ParachainBlockImport::new_with_limit(
+ client.clone(),
+ backend.clone(),
+ LevelLimit::Some(LEVEL_LIMIT),
+ );
+
+ let block0 = build_and_import_block_ext(
+ &*client,
+ BlockOrigin::NetworkInitialSync,
+ true,
+ &mut para_import,
+ None,
+ None,
+ );
+ let id0 = BlockId::Hash(block0.header.hash());
+
+ let blocks1 = (0..LEVEL_LIMIT)
+ .into_iter()
+ .map(|i| {
+ build_and_import_block_ext(
+ &*client,
+ if i == 1 { BlockOrigin::NetworkInitialSync } else { BlockOrigin::Own },
+ i == 1,
+ &mut para_import,
+ Some(id0),
+ Some(i as u64 * TIMESTAMP_MULTIPLIER),
+ )
+ })
+ .collect::>();
+ let id10 = BlockId::Hash(blocks1[0].header.hash());
+
+ let blocks2 = (0..2)
+ .into_iter()
+ .map(|i| {
+ build_and_import_block_ext(
+ &*client,
+ BlockOrigin::Own,
+ false,
+ &mut para_import,
+ Some(id10),
+ Some(i as u64 * TIMESTAMP_MULTIPLIER),
+ )
+ })
+ .collect::>();
+
+ // Initial scenario (with B11 imported as best)
+ //
+ // B0 --+-- B10 --+-- B20
+ // +-- B11 +-- B21
+ // +-- B12
+
+ let leaves = backend.blockchain().leaves().unwrap();
+ let mut expected = vec![
+ blocks2[0].header.hash(),
+ blocks2[1].header.hash(),
+ blocks1[1].header.hash(),
+ blocks1[2].header.hash(),
+ ];
+ assert_eq!(leaves, expected);
+ let best = client.usage_info().chain.best_hash;
+ assert_eq!(best, blocks1[1].header.hash());
+
+ let block13 = build_and_import_block_ext(
+ &*client,
+ BlockOrigin::Own,
+ false,
+ &mut para_import,
+ Some(id0),
+ Some(LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER),
+ );
+
+ // Expected scenario
+ //
+ // B0 --+-- B10 --+-- B20
+ // +-- B11 +-- B21
+ // +--(B13) <-- B12 has been replaced
+
+ let leaves = backend.blockchain().leaves().unwrap();
+ expected[3] = block13.header.hash();
+ assert_eq!(leaves, expected);
+
+ let block14 = build_and_import_block_ext(
+ &*client,
+ BlockOrigin::Own,
+ false,
+ &mut para_import,
+ Some(id0),
+ Some(2 * LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER),
+ );
+
+ // Expected scenario
+ //
+ // B0 --+--(B14) <-- B10 has been replaced
+ // +-- B11
+ // +--(B13)
+
+ let leaves = backend.blockchain().leaves().unwrap();
+ expected.remove(0);
+ expected.remove(0);
+ expected.push(block14.header.hash());
+ assert_eq!(leaves, expected);
+}
+
+#[test]
+fn restore_limit_monitor() {
+ // Here we are using the timestamp value to generate blocks with different hashes.
+ const LEVEL_LIMIT: usize = 2;
+ const TIMESTAMP_MULTIPLIER: u64 = 60000;
+
+ let backend = Arc::new(Backend::new_test(1000, 3));
+ let client = Arc::new(TestClientBuilder::with_backend(backend.clone()).build());
+
+ // Start with a block import not enforcing any limit...
+ let mut para_import = ParachainBlockImport::new_with_limit(
+ client.clone(),
+ backend.clone(),
+ LevelLimit::Some(usize::MAX),
+ );
+
+ let block00 = build_and_import_block_ext(
+ &*client,
+ BlockOrigin::NetworkInitialSync,
+ true,
+ &mut para_import,
+ None,
+ None,
+ );
+ let id00 = BlockId::Hash(block00.header.hash());
+
+ let blocks1 = (0..LEVEL_LIMIT + 1)
+ .into_iter()
+ .map(|i| {
+ build_and_import_block_ext(
+ &*client,
+ if i == 1 { BlockOrigin::NetworkInitialSync } else { BlockOrigin::Own },
+ i == 1,
+ &mut para_import,
+ Some(id00),
+ Some(i as u64 * TIMESTAMP_MULTIPLIER),
+ )
+ })
+ .collect::>();
+ let id10 = BlockId::Hash(blocks1[0].header.hash());
+
+ let _ = (0..LEVEL_LIMIT)
+ .into_iter()
+ .map(|i| {
+ build_and_import_block_ext(
+ &*client,
+ BlockOrigin::Own,
+ false,
+ &mut para_import,
+ Some(id10),
+ Some(i as u64 * TIMESTAMP_MULTIPLIER),
+ )
+ })
+ .collect::>();
+
+ // Scenario before limit application (with B11 imported as best)
+ // Import order (freshess): B00, B10, B11, B12, B20, B21
+ //
+ // B00 --+-- B10 --+-- B20
+ // | +-- B21
+ // +-- B11
+ // |
+ // +-- B12
+
+ // Simulate a restart by forcing a new monitor structure instance
+
+ let mut para_import = ParachainBlockImport::new_with_limit(
+ client.clone(),
+ backend.clone(),
+ LevelLimit::Some(LEVEL_LIMIT),
+ );
+
+ let block13 = build_and_import_block_ext(
+ &*client,
+ BlockOrigin::Own,
+ false,
+ &mut para_import,
+ Some(id00),
+ Some(LEVEL_LIMIT as u64 * TIMESTAMP_MULTIPLIER),
+ );
+
+ // Expected scenario
+ //
+ // B0 --+-- B11
+ // +--(B13)
+
+ let leaves = backend.blockchain().leaves().unwrap();
+ let expected = vec![blocks1[1].header.hash(), block13.header.hash()];
+ assert_eq!(leaves, expected);
+
+ let monitor = para_import.monitor.unwrap();
+ let monitor = monitor.shared_data();
+ assert_eq!(monitor.import_counter, 5);
+ assert!(monitor.levels.iter().all(|(number, hashes)| {
+ hashes
+ .iter()
+ .filter(|hash| **hash != block13.header.hash())
+ .all(|hash| *number == *monitor.freshness.get(hash).unwrap())
+ }));
+ assert_eq!(*monitor.freshness.get(&block13.header.hash()).unwrap(), monitor.import_counter - 1);
+}
diff --git a/client/consensus/relay-chain/Cargo.toml b/client/consensus/relay-chain/Cargo.toml
index 0f5fdb96ac2..a54ac9b8d1f 100644
--- a/client/consensus/relay-chain/Cargo.toml
+++ b/client/consensus/relay-chain/Cargo.toml
@@ -6,21 +6,21 @@ authors = ["Parity Technologies "]
edition = "2021"
[dependencies]
-async-trait = "0.1.57"
-futures = "0.3.24"
+async-trait = "0.1.59"
+futures = "0.3.25"
parking_lot = "0.12.1"
tracing = "0.1.37"
# Substrate
-sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-block-builder = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-block-builder = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-inherents = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
# Cumulus
cumulus-client-consensus-common = { path = "../common" }
diff --git a/client/consensus/relay-chain/src/import_queue.rs b/client/consensus/relay-chain/src/import_queue.rs
index 3792a9c04bb..31004c0005e 100644
--- a/client/consensus/relay-chain/src/import_queue.rs
+++ b/client/consensus/relay-chain/src/import_queue.rs
@@ -16,6 +16,8 @@
use std::{marker::PhantomData, sync::Arc};
+use cumulus_client_consensus_common::ParachainBlockImportMarker;
+
use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImport, BlockImportParams,
@@ -63,8 +65,10 @@ where
.await
.map_err(|e| e.to_string())?;
- let inherent_data =
- inherent_data_providers.create_inherent_data().map_err(|e| format!("{:?}", e))?;
+ let inherent_data = inherent_data_providers
+ .create_inherent_data()
+ .await
+ .map_err(|e| format!("{:?}", e))?;
let block = Block::new(block_params.header.clone(), inner_body);
@@ -109,7 +113,11 @@ pub fn import_queue(
registry: Option<&substrate_prometheus_endpoint::Registry>,
) -> ClientResult>
where
- I: BlockImport + Send + Sync + 'static,
+ I: BlockImport
+ + ParachainBlockImportMarker
+ + Send
+ + Sync
+ + 'static,
I::Transaction: Send,
Client: ProvideRuntimeApi + Send + Sync + 'static,
>::Api: BlockBuilderApi,
@@ -117,11 +125,5 @@ where
{
let verifier = Verifier::new(client, create_inherent_data_providers);
- Ok(BasicQueue::new(
- verifier,
- Box::new(cumulus_client_consensus_common::ParachainBlockImport::new(block_import)),
- None,
- spawner,
- registry,
- ))
+ Ok(BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry))
}
diff --git a/client/consensus/relay-chain/src/lib.rs b/client/consensus/relay-chain/src/lib.rs
index 20d3b7cc757..4cd0ab24beb 100644
--- a/client/consensus/relay-chain/src/lib.rs
+++ b/client/consensus/relay-chain/src/lib.rs
@@ -34,11 +34,10 @@
//! 5. After the parachain candidate got backed and included, all collators start at 1.
use cumulus_client_consensus_common::{
- ParachainBlockImport, ParachainCandidate, ParachainConsensus,
+ ParachainBlockImportMarker, ParachainCandidate, ParachainConsensus,
};
use cumulus_primitives_core::{relay_chain::v2::Hash as PHash, ParaId, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;
-use parking_lot::Mutex;
use sc_consensus::{BlockImport, BlockImportParams};
use sp_consensus::{
@@ -46,6 +45,8 @@ use sp_consensus::{
};
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
+
+use parking_lot::Mutex;
use std::{marker::PhantomData, sync::Arc, time::Duration};
mod import_queue;
@@ -56,11 +57,11 @@ const LOG_TARGET: &str = "cumulus-consensus-relay-chain";
/// The implementation of the relay-chain provided consensus for parachains.
pub struct RelayChainConsensus {
para_id: ParaId,
- _phantom: PhantomData,
proposer_factory: Arc>,
create_inherent_data_providers: Arc,
- block_import: Arc>>,
+ block_import: Arc>,
relay_chain_interface: RCInterface,
+ _phantom: PhantomData,
}
impl Clone for RelayChainConsensus
@@ -70,11 +71,11 @@ where
fn clone(&self) -> Self {
Self {
para_id: self.para_id,
- _phantom: PhantomData,
proposer_factory: self.proposer_factory.clone(),
create_inherent_data_providers: self.create_inherent_data_providers.clone(),
block_import: self.block_import.clone(),
relay_chain_interface: self.relay_chain_interface.clone(),
+ _phantom: PhantomData,
}
}
}
@@ -82,6 +83,7 @@ where
impl RelayChainConsensus
where
B: BlockT,
+ BI: ParachainBlockImportMarker,
RCInterface: RelayChainInterface,
CIDP: CreateInherentDataProviders,
{
@@ -97,9 +99,7 @@ where
para_id,
proposer_factory: Arc::new(Mutex::new(proposer_factory)),
create_inherent_data_providers: Arc::new(create_inherent_data_providers),
- block_import: Arc::new(futures::lock::Mutex::new(ParachainBlockImport::new(
- block_import,
- ))),
+ block_import: Arc::new(futures::lock::Mutex::new(block_import)),
relay_chain_interface,
_phantom: PhantomData,
}
@@ -127,6 +127,7 @@ where
inherent_data_providers
.create_inherent_data()
+ .await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
@@ -144,7 +145,7 @@ impl ParachainConsensus
where
B: BlockT,
RCInterface: RelayChainInterface + Clone,
- BI: BlockImport + Send + Sync,
+ BI: BlockImport + ParachainBlockImportMarker + Send + Sync,
PF: Environment + Send + Sync,
PF::Proposer: Proposer<
B,
@@ -247,7 +248,7 @@ where
ProofRecording = EnableProofRecording,
Proof = ::Proof,
>,
- BI: BlockImport + Send + Sync + 'static,
+ BI: BlockImport + ParachainBlockImportMarker + Send + Sync + 'static,
CIDP: CreateInherentDataProviders + 'static,
RCInterface: RelayChainInterface + Clone + 'static,
{
diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml
index a37c8874f03..e98789d17a0 100644
--- a/client/network/Cargo.toml
+++ b/client/network/Cargo.toml
@@ -6,47 +6,47 @@ description = "Cumulus-specific networking protocol"
edition = "2021"
[dependencies]
-async-trait = "0.1.57"
+async-trait = "0.1.59"
codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] }
-futures = "0.3.24"
+futures = "0.3.25"
futures-timer = "3.0.2"
parking_lot = "0.12.1"
tracing = "0.1.37"
# Substrate
-sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
# Polkadot
-polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-parachain = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
+polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-parachain = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
# Cumulus
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
[dev-dependencies]
portpicker = "0.1.1"
-tokio = { version = "1.21.2", features = ["macros"] }
+tokio = { version = "1.23.0", features = ["macros"] }
url = "2.3.1"
# Substrate
-sc-cli = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sc-cli = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
# Polkadot
-polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
+polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
# Cumulus
cumulus-primitives-core = { path = "../../primitives/core" }
diff --git a/client/network/tests/sync_blocks_from_tip_without_connected_collator.rs b/client/network/tests/sync_blocks_from_tip_without_connected_collator.rs
deleted file mode 100644
index 6b4f17bc528..00000000000
--- a/client/network/tests/sync_blocks_from_tip_without_connected_collator.rs
+++ /dev/null
@@ -1,95 +0,0 @@
-// Copyright 2021 Parity Technologies (UK) Ltd.
-// This file is part of Substrate.
-
-// Substrate is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Substrate is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Substrate. If not, see .
-
-use cumulus_primitives_core::ParaId;
-use cumulus_test_service::{initial_head_data, run_relay_chain_validator_node, Keyring::*};
-use futures::join;
-
-#[substrate_test_utils::test(flavor = "multi_thread")]
-#[ignore]
-async fn sync_blocks_from_tip_without_being_connected_to_a_collator() {
- let mut builder = sc_cli::LoggerBuilder::new("");
- builder.with_colors(false);
- let _ = builder.init();
-
- let para_id = ParaId::from(100);
- let tokio_handle = tokio::runtime::Handle::current();
-
- let ws_port = portpicker::pick_unused_port().expect("No free ports");
- // start alice
- let alice = run_relay_chain_validator_node(
- tokio_handle.clone(),
- Alice,
- || {},
- Vec::new(),
- Some(ws_port),
- );
-
- // start bob
- let bob = run_relay_chain_validator_node(
- tokio_handle.clone(),
- Bob,
- || {},
- vec![alice.addr.clone()],
- None,
- );
-
- // register parachain
- alice
- .register_parachain(
- para_id,
- cumulus_test_service::runtime::WASM_BINARY
- .expect("You need to build the WASM binary to run this test!")
- .to_vec(),
- initial_head_data(para_id),
- )
- .await
- .unwrap();
-
- // run charlie as parachain collator
- let charlie =
- cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Charlie)
- .enable_collator()
- .connect_to_relay_chain_nodes(vec![&alice, &bob])
- .build()
- .await;
-
- // run dave as parachain full node
- let dave = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Dave)
- .connect_to_parachain_node(&charlie)
- .connect_to_relay_chain_nodes(vec![&alice, &bob])
- .build()
- .await;
-
- // run eve as parachain full node that is only connected to dave
- let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Eve)
- .connect_to_parachain_node(&dave)
- .exclusively_connect_to_registered_parachain_nodes()
- .connect_to_relay_chain_nodes(vec![&alice, &bob])
- .build()
- .await;
-
- // run eve as parachain full node that is only connected to dave
- let ferdie = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Ferdie)
- .connect_to_parachain_node(&dave)
- .exclusively_connect_to_registered_parachain_nodes()
- .connect_to_relay_chain_nodes(vec![&alice, &bob])
- .use_external_relay_chain_node_at_port(ws_port)
- .build()
- .await;
-
- join!(ferdie.wait_for_blocks(7), eve.wait_for_blocks(7));
-}
diff --git a/client/pov-recovery/Cargo.toml b/client/pov-recovery/Cargo.toml
index b61cecfe416..39863bdd695 100644
--- a/client/pov-recovery/Cargo.toml
+++ b/client/pov-recovery/Cargo.toml
@@ -7,36 +7,36 @@ edition = "2021"
[dependencies]
codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] }
-futures = "0.3.24"
+futures = "0.3.25"
futures-timer = "3.0.2"
rand = "0.8.5"
tracing = "0.1.37"
# Substrate
-sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
# Polkadot
-polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
+polkadot-node-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-node-subsystem = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
# Cumulus
cumulus-primitives-core = { path = "../../primitives/core" }
cumulus-relay-chain-interface = {path = "../relay-chain-interface"}
[dev-dependencies]
-tokio = { version = "1.21.2", features = ["macros"] }
+tokio = { version = "1.23.0", features = ["macros"] }
portpicker = "0.1.1"
# Cumulus
cumulus-test-service = { path = "../../test/service" }
# Substrate
-sc-cli = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sc-cli = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+substrate-test-utils = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
diff --git a/client/pov-recovery/src/active_candidate_recovery.rs b/client/pov-recovery/src/active_candidate_recovery.rs
index a269a26f821..caae3615a85 100644
--- a/client/pov-recovery/src/active_candidate_recovery.rs
+++ b/client/pov-recovery/src/active_candidate_recovery.rs
@@ -42,19 +42,19 @@ impl ActiveCandidateRecovery {
Self { recoveries: Default::default(), candidates: Default::default(), overseer_handle }
}
- /// Recover the given `pending_candidate`.
+ /// Recover the given `candidate`.
pub async fn recover_candidate(
&mut self,
block_hash: Block::Hash,
- pending_candidate: crate::PendingCandidate,
+ candidate: &crate::Candidate,
) {
let (tx, rx) = oneshot::channel();
self.overseer_handle
.send_msg(
AvailabilityRecoveryMessage::RecoverAvailableData(
- pending_candidate.receipt,
- pending_candidate.session_index,
+ candidate.receipt.clone(),
+ candidate.session_index,
None,
tx,
),
diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs
index 3327d4bb86b..90c0a853214 100644
--- a/client/pov-recovery/src/lib.rs
+++ b/client/pov-recovery/src/lib.rs
@@ -42,7 +42,7 @@
//! make sure that the blocks are imported in the correct order.
use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
-use sc_consensus::import_queue::{ImportQueue, IncomingBlock};
+use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::{
generic::BlockId,
@@ -59,7 +59,9 @@ use cumulus_primitives_core::ParachainBlockData;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use codec::Decode;
-use futures::{select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
+use futures::{
+ channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt,
+};
use futures_timer::Delay;
use rand::{thread_rng, Rng};
@@ -75,38 +77,52 @@ use active_candidate_recovery::ActiveCandidateRecovery;
const LOG_TARGET: &str = "cumulus-pov-recovery";
-/// Represents a pending candidate.
-struct PendingCandidate {
- receipt: CandidateReceipt,
- session_index: SessionIndex,
- block_number: NumberFor,
+/// Type of recovery to trigger.
+#[derive(Debug, PartialEq)]
+pub enum RecoveryKind {
+ /// Single block recovery.
+ Simple,
+ /// Full ancestry recovery.
+ Full,
}
-/// The delay between observing an unknown block and recovering this block.
+/// Structure used to trigger an explicit recovery request via `PoVRecovery`.
+pub struct RecoveryRequest {
+ /// Hash of the last block to recover.
+ pub hash: Block::Hash,
+ /// Recovery delay range. Randomizing the start of the recovery within this interval
+ /// can be used to prevent self-DOSing if the recovery request is part of a
+ /// distributed protocol and there is the possibility that multiple actors are
+ /// requiring to perform the recovery action at approximately the same time.
+ pub delay: RecoveryDelay,
+ /// Recovery type.
+ pub kind: RecoveryKind,
+}
+
+/// The delay between observing an unknown block and triggering the recovery of a block.
#[derive(Clone, Copy)]
-pub enum RecoveryDelay {
- /// Start recovering the block in maximum of the given delay.
- WithMax { max: Duration },
- /// Start recovering the block after at least `min` delay and in maximum `max` delay.
- WithMinAndMax { min: Duration, max: Duration },
+pub struct RecoveryDelay {
+ /// Start recovering after `min` delay.
+ pub min: Duration,
+ /// Start recovering before `max` delay.
+ pub max: Duration,
}
-impl RecoveryDelay {
- /// Return as [`Delay`].
- fn as_delay(self) -> Delay {
- match self {
- Self::WithMax { max } => Delay::new(max.mul_f64(thread_rng().gen())),
- Self::WithMinAndMax { min, max } =>
- Delay::new(min + max.saturating_sub(min).mul_f64(thread_rng().gen())),
- }
- }
+/// Represents an outstanding block candidate.
+struct Candidate {
+ receipt: CandidateReceipt,
+ session_index: SessionIndex,
+ block_number: NumberFor,
+ parent_hash: Block::Hash,
+ // Lazy recovery has been submitted.
+ waiting_recovery: bool,
}
/// Encapsulates the logic of the pov recovery.
-pub struct PoVRecovery {
+pub struct PoVRecovery {
/// All the pending candidates that we are waiting for to be imported or that need to be
/// recovered when `next_candidate_to_recover` tells us to do so.
- pending_candidates: HashMap>,
+ candidates: HashMap>,
/// A stream of futures that resolve to hashes of candidates that need to be recovered.
///
/// The candidates to the hashes are stored in `pending_candidates`. If a candidate is not
@@ -119,28 +135,30 @@ pub struct PoVRecovery {
waiting_for_parent: HashMap>,
recovery_delay: RecoveryDelay,
parachain_client: Arc,
- parachain_import_queue: IQ,
+ parachain_import_queue: Box>,
relay_chain_interface: RC,
para_id: ParaId,
+ /// Explicit block recovery requests channel.
+ recovery_chan_rx: Receiver>,
}
-impl PoVRecovery
+impl PoVRecovery
where
PC: BlockBackend + BlockchainEvents + UsageProvider,
RCInterface: RelayChainInterface + Clone,
- IQ: ImportQueue,
{
/// Create a new instance.
pub fn new(
overseer_handle: OverseerHandle,
recovery_delay: RecoveryDelay,
parachain_client: Arc,
- parachain_import_queue: IQ,
+ parachain_import_queue: Box>,
relay_chain_interface: RCInterface,
para_id: ParaId,
+ recovery_chan_rx: Receiver>,
) -> Self {
Self {
- pending_candidates: HashMap::new(),
+ candidates: HashMap::new(),
next_candidate_to_recover: Default::default(),
active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handle),
recovery_delay,
@@ -149,6 +167,7 @@ where
parachain_import_queue,
relay_chain_interface,
para_id,
+ recovery_chan_rx,
}
}
@@ -175,69 +194,54 @@ where
}
let hash = header.hash();
- match self.parachain_client.block_status(&BlockId::Hash(hash)) {
- Ok(BlockStatus::Unknown) => (),
- // Any other state means, we should ignore it.
- Ok(_) => return,
- Err(e) => {
- tracing::debug!(
- target: LOG_TARGET,
- error = ?e,
- block_hash = ?hash,
- "Failed to get block status",
- );
- return
- },
- }
- tracing::debug!(target: LOG_TARGET, ?hash, "Adding pending candidate");
- if self
- .pending_candidates
- .insert(
- hash,
- PendingCandidate {
- block_number: *header.number(),
- receipt: receipt.to_plain(),
- session_index,
- },
- )
- .is_some()
- {
+ if self.candidates.contains_key(&hash) {
return
}
- // Delay the recovery by some random time to not spam the relay chain.
- let delay = self.recovery_delay.as_delay();
- self.next_candidate_to_recover.push(
- async move {
- delay.await;
- hash
- }
- .boxed(),
+ tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
+ self.candidates.insert(
+ hash,
+ Candidate {
+ block_number: *header.number(),
+ receipt: receipt.to_plain(),
+ session_index,
+ parent_hash: *header.parent_hash(),
+ waiting_recovery: false,
+ },
);
+
+ // If required, triggers a lazy recovery request that will eventually be blocked
+ // if in the meantime the block is imported.
+ self.recover(RecoveryRequest {
+ hash,
+ delay: self.recovery_delay,
+ kind: RecoveryKind::Simple,
+ });
}
/// Handle an imported block.
- fn handle_block_imported(&mut self, hash: &Block::Hash) {
- self.pending_candidates.remove(hash);
+ fn handle_block_imported(&mut self, block_hash: &Block::Hash) {
+ self.candidates.get_mut(block_hash).map(|candidate| {
+ // Prevents triggering an already enqueued recovery request
+ candidate.waiting_recovery = false;
+ });
}
/// Handle a finalized block with the given `block_number`.
fn handle_block_finalized(&mut self, block_number: NumberFor) {
- self.pending_candidates.retain(|_, pc| pc.block_number > block_number);
+ self.candidates.retain(|_, pc| pc.block_number > block_number);
}
/// Recover the candidate for the given `block_hash`.
async fn recover_candidate(&mut self, block_hash: Block::Hash) {
- let pending_candidate = match self.pending_candidates.remove(&block_hash) {
- Some(pending_candidate) => pending_candidate,
- None => return,
- };
-
- tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
- self.active_candidate_recovery
- .recover_candidate(block_hash, pending_candidate)
- .await;
+ match self.candidates.get(&block_hash) {
+ Some(candidate) if candidate.waiting_recovery => {
+ tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
+ self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
+ },
+ _ => (),
+ }
}
/// Clear `waiting_for_parent` from the given `hash` and do this recursively for all child
@@ -349,7 +353,7 @@ where
async fn import_block(&mut self, block: Block) {
let mut blocks = VecDeque::new();
- tracing::debug!(target: LOG_TARGET, hash = ?block.hash(), "Importing block retrieved using pov_recovery");
+ tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), "Importing block retrieved using pov_recovery");
blocks.push_back(block);
let mut incoming_blocks = Vec::new();
@@ -380,6 +384,70 @@ where
.import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
}
+ /// Attempts an explicit recovery of one or more blocks.
+ pub fn recover(&mut self, req: RecoveryRequest) {
+ let RecoveryRequest { mut hash, delay, kind } = req;
+ let mut to_recover = Vec::new();
+
+ let do_recover = loop {
+ let candidate = match self.candidates.get_mut(&hash) {
+ Some(candidate) => candidate,
+ None => {
+ tracing::debug!(
+ target: LOG_TARGET,
+ block_hash = ?hash,
+ "Cound not recover. Block was never announced as candidate"
+ );
+ break false
+ },
+ };
+
+ match self.parachain_client.block_status(&BlockId::Hash(hash)) {
+ Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
+ candidate.waiting_recovery = true;
+ to_recover.push(hash);
+ },
+ Ok(_) => break true,
+ Err(e) => {
+ tracing::error!(
+ target: LOG_TARGET,
+ error = ?e,
+ block_hash = ?hash,
+ "Failed to get block status",
+ );
+ break false
+ },
+ }
+
+ if kind == RecoveryKind::Simple {
+ break true
+ }
+
+ hash = candidate.parent_hash;
+ };
+
+ if do_recover {
+ for hash in to_recover.into_iter().rev() {
+ let delay =
+ delay.min + delay.max.saturating_sub(delay.min).mul_f64(thread_rng().gen());
+ tracing::debug!(
+ target: LOG_TARGET,
+ block_hash = ?hash,
+ "Starting {:?} block recovery in {:?} sec",
+ kind,
+ delay.as_secs(),
+ );
+ self.next_candidate_to_recover.push(
+ async move {
+ Delay::new(delay).await;
+ hash
+ }
+ .boxed(),
+ );
+ }
+ }
+ }
+
/// Run the pov-recovery.
pub async fn run(mut self) {
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
@@ -401,10 +469,15 @@ where
if let Some((receipt, session_index)) = pending_candidate {
self.handle_pending_candidate(receipt, session_index);
} else {
- tracing::debug!(
- target: LOG_TARGET,
- "Pending candidates stream ended",
- );
+ tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
+ return;
+ }
+ },
+ recovery_req = self.recovery_chan_rx.next() => {
+ if let Some(req) = recovery_req {
+ self.recover(req);
+ } else {
+ tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
return;
}
},
@@ -412,10 +485,7 @@ where
if let Some(imported) = imported {
self.handle_block_imported(&imported.hash);
} else {
- tracing::debug!(
- target: LOG_TARGET,
- "Imported blocks stream ended",
- );
+ tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
return;
}
},
@@ -423,10 +493,7 @@ where
if let Some(finalized) = finalized {
self.handle_block_finalized(*finalized.header.number());
} else {
- tracing::debug!(
- target: LOG_TARGET,
- "Finalized blocks stream ended",
- );
+ tracing::debug!(target: LOG_TARGET, "Finalized blocks stream ended");
return;
}
},
diff --git a/client/pov-recovery/tests/pov_recovery.rs b/client/pov-recovery/tests/pov_recovery.rs
deleted file mode 100644
index dd8be634896..00000000000
--- a/client/pov-recovery/tests/pov_recovery.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-// Copyright 2021 Parity Technologies (UK) Ltd.
-// This file is part of Substrate.
-
-// Substrate is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Substrate is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Substrate. If not, see .
-
-use cumulus_primitives_core::ParaId;
-use cumulus_test_service::{initial_head_data, Keyring::*};
-use futures::join;
-use std::sync::Arc;
-
-/// Tests the PoV recovery.
-///
-/// If there is a block of the parachain included/backed by the relay chain that isn't circulated in
-/// the parachain network, we need to recover the PoV from the relay chain. Using this PoV we can
-/// recover the block, import it and share it with the other nodes of the parachain network.
-#[substrate_test_utils::test(flavor = "multi_thread")]
-#[ignore]
-async fn pov_recovery() {
- let mut builder = sc_cli::LoggerBuilder::new("");
- builder.with_colors(false);
- let _ = builder.init();
-
- let para_id = ParaId::from(100);
- let tokio_handle = tokio::runtime::Handle::current();
-
- // Start alice
- let ws_port = portpicker::pick_unused_port().expect("No free ports");
- let alice = cumulus_test_service::run_relay_chain_validator_node(
- tokio_handle.clone(),
- Alice,
- || {},
- Vec::new(),
- Some(ws_port),
- );
-
- // Start bob
- let bob = cumulus_test_service::run_relay_chain_validator_node(
- tokio_handle.clone(),
- Bob,
- || {},
- vec![alice.addr.clone()],
- None,
- );
-
- // Register parachain
- alice
- .register_parachain(
- para_id,
- cumulus_test_service::runtime::WASM_BINARY
- .expect("You need to build the WASM binary to run this test!")
- .to_vec(),
- initial_head_data(para_id),
- )
- .await
- .unwrap();
-
- // Run charlie as parachain collator
- let charlie =
- cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Charlie)
- .enable_collator()
- .connect_to_relay_chain_nodes(vec![&alice, &bob])
- .wrap_announce_block(|_| {
- // Never announce any block
- Arc::new(|_, _| {})
- })
- .build()
- .await;
-
- // Run dave as parachain collator and eve as parachain full node
- //
- // They will need to recover the pov blocks through availability recovery.
- let dave = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Dave)
- .enable_collator()
- .use_null_consensus()
- .connect_to_parachain_node(&charlie)
- .connect_to_relay_chain_nodes(vec![&alice, &bob])
- .wrap_announce_block(|_| {
- // Never announce any block
- Arc::new(|_, _| {})
- })
- .build()
- .await;
-
- let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Eve)
- .use_null_consensus()
- .connect_to_parachain_node(&charlie)
- .connect_to_relay_chain_nodes(vec![&alice, &bob])
- .wrap_announce_block(|_| {
- // Never announce any block
- Arc::new(|_, _| {})
- })
- .build()
- .await;
-
- // Run ferdie as parachain RPC collator and one as parachain RPC full node
- //
- // They will need to recover the pov blocks through availability recovery.
- let ferdie = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Ferdie)
- .use_null_consensus()
- .connect_to_parachain_node(&charlie)
- .connect_to_relay_chain_nodes(vec![&alice, &bob])
- .use_external_relay_chain_node_at_port(ws_port)
- .wrap_announce_block(|_| {
- // Never announce any block
- Arc::new(|_, _| {})
- })
- .build()
- .await;
-
- let one = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, One)
- .enable_collator()
- .use_null_consensus()
- .connect_to_parachain_node(&charlie)
- .connect_to_relay_chain_nodes(vec![&alice, &bob])
- .use_external_relay_chain_node_at_port(ws_port)
- .wrap_announce_block(|_| {
- // Never announce any block
- Arc::new(|_, _| {})
- })
- .build()
- .await;
-
- join!(
- dave.wait_for_blocks(7),
- eve.wait_for_blocks(7),
- ferdie.wait_for_blocks(7),
- one.wait_for_blocks(7)
- );
-}
diff --git a/client/relay-chain-inprocess-interface/Cargo.toml b/client/relay-chain-inprocess-interface/Cargo.toml
index a7def2bc322..e882e0df00d 100644
--- a/client/relay-chain-inprocess-interface/Cargo.toml
+++ b/client/relay-chain-inprocess-interface/Cargo.toml
@@ -5,26 +5,26 @@ version = "0.1.0"
edition = "2021"
[dependencies]
-async-trait = "0.1.57"
-futures = "0.3.24"
+async-trait = "0.1.59"
+futures = "0.3.25"
futures-timer = "3.0.2"
# Substrate
-sc-cli = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-sysinfo = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sc-cli = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-sysinfo = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
# Polkadot
-polkadot-cli = { git = "https://github.com/paritytech/polkadot", default-features = false, features = ["cli"] , branch = "release-v0.9.31" }
-polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
+polkadot-cli = { git = "https://github.com/paritytech/polkadot", default-features = false, features = ["cli"] , branch = "release-v0.9.36" }
+polkadot-client = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
# Cumulus
cumulus-primitives-core = { path = "../../primitives/core" }
@@ -33,11 +33,11 @@ cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
[dev-dependencies]
# Substrate
-sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
# Polkadot
-polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
+polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
metered = { package = "prioritized-metered-channel", version = "0.2.0" }
# Cumulus
diff --git a/client/relay-chain-inprocess-interface/src/lib.rs b/client/relay-chain-inprocess-interface/src/lib.rs
index 22e888757c7..4477763c622 100644
--- a/client/relay-chain-inprocess-interface/src/lib.rs
+++ b/client/relay-chain-inprocess-interface/src/lib.rs
@@ -180,7 +180,7 @@ where
relay_parent: PHash,
key: &[u8],
) -> RelayChainResult> {
- let state = self.backend.state_at(&relay_parent)?;
+ let state = self.backend.state_at(relay_parent)?;
state.storage(key).map_err(RelayChainError::GenericError)
}
@@ -189,7 +189,7 @@ where
relay_parent: PHash,
relevant_keys: &Vec>,
) -> RelayChainResult {
- let state_backend = self.backend.state_at(&relay_parent)?;
+ let state_backend = self.backend.state_at(relay_parent)?;
sp_state_machine::prove_read(state_backend, relevant_keys)
.map_err(RelayChainError::StateMachineError)
diff --git a/client/relay-chain-interface/Cargo.toml b/client/relay-chain-interface/Cargo.toml
index 860b2cd327b..fe17ac75a0d 100644
--- a/client/relay-chain-interface/Cargo.toml
+++ b/client/relay-chain-interface/Cargo.toml
@@ -5,18 +5,19 @@ version = "0.1.0"
edition = "2021"
[dependencies]
-polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
+polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
cumulus-primitives-core = { path = "../../primitives/core" }
-sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
-futures = "0.3.24"
-async-trait = "0.1.57"
-thiserror = "1.0.37"
-jsonrpsee-core = "0.15.1"
+tokio = { version = "1.21.2", features = ["sync"] }
+futures = "0.3.25"
+async-trait = "0.1.59"
+thiserror = "1.0.38"
+jsonrpsee-core = "0.16.2"
parity-scale-codec = "3.2.1"
diff --git a/client/relay-chain-interface/src/lib.rs b/client/relay-chain-interface/src/lib.rs
index 4505ac70973..56de750b687 100644
--- a/client/relay-chain-interface/src/lib.rs
+++ b/client/relay-chain-interface/src/lib.rs
@@ -51,11 +51,11 @@ pub enum RelayChainError {
BlockchainError(#[from] sp_blockchain::Error),
#[error("State machine error occured: {0}")]
StateMachineError(Box),
- #[error("Unable to call RPC method '{0}' due to error: {1}")]
- RpcCallError(String, JsonRpcError),
+ #[error("Unable to call RPC method '{0}'")]
+ RpcCallError(String),
#[error("RPC Error: '{0}'")]
JsonRpcError(#[from] JsonRpcError),
- #[error("Unable to reach RpcStreamWorker: {0}")]
+ #[error("Unable to communicate with RPC worker: {0}")]
WorkerCommunicationError(String),
#[error("Scale codec deserialization error: {0}")]
DeserializationError(CodecError),
diff --git a/client/relay-chain-minimal-node/Cargo.toml b/client/relay-chain-minimal-node/Cargo.toml
index dbaaec8233a..75e398b55a9 100644
--- a/client/relay-chain-minimal-node/Cargo.toml
+++ b/client/relay-chain-minimal-node/Cargo.toml
@@ -6,45 +6,41 @@ edition = "2021"
[dependencies]
# polkadot deps
-polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-core-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-node-subsystem-util = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-node-network-protocol = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-network-bridge = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-node-core-av-store = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
-polkadot-availability-distribution = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.31" }
+polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-core-primitives = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-node-subsystem-util = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-node-network-protocol = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
+polkadot-network-bridge = { git = "https://github.com/paritytech/polkadot", branch = "release-v0.9.36" }
# substrate deps
-sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-network-sync = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-network-common = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-network-light = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-service = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-transaction-pool-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
-sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.31" }
+sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-network-common = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-service = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-transaction-pool-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sc-telemetry = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
+sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.36" }
# cumulus deps
cumulus-relay-chain-interface = { path = "../relay-chain-interface" }
cumulus-relay-chain-rpc-interface = { path = "../relay-chain-rpc-interface" }
cumulus-primitives-core = { path = "../../primitives/core" }
-array-bytes = "4.1"
+array-bytes = "6.0"
lru = "0.8"
tracing = "0.1.37"
-async-trait = "0.1.52"
-futures = "0.3.24"
+async-trait = "0.1.59"
+futures = "0.3.25"
url = "2.2.2"
-tokio = { version = "1.21.2", features = ["macros"] }
+tokio = { version = "1.23.0", features = ["macros"] }
diff --git a/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs b/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs
index bf1a3c9a38c..1ed896533a3 100644
--- a/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs
+++ b/client/relay-chain-minimal-node/src/blockchain_rpc_client.rs
@@ -363,13 +363,13 @@ impl BlockChainRpcClient {
pub async fn import_notification_stream(
&self,
) -> RelayChainResult + Send>>> {
- Ok(self.rpc_client.get_imported_heads_stream().await?.boxed())
+ Ok(self.rpc_client.get_imported_heads_stream()?.boxed())
}
pub async fn finality_notification_stream(
&self,
) -> RelayChainResult + Send>>> {
- Ok(self.rpc_client.get_finalized_heads_stream().await?.boxed())
+ Ok(self.rpc_client.get_finalized_heads_stream()?.boxed())
}
}
diff --git a/client/relay-chain-minimal-node/src/collator_overseer.rs b/client/relay-chain-minimal-node/src/collator_overseer.rs
index 6efb1a9ce2e..7ed01193026 100644
--- a/client/relay-chain-minimal-node/src/collator_overseer.rs
+++ b/client/relay-chain-minimal-node/src/collator_overseer.rs
@@ -16,17 +16,10 @@
use cumulus_relay_chain_interface::RelayChainError;
use lru::LruCache;
-use polkadot_availability_distribution::{
- AvailabilityDistributionSubsystem, IncomingRequestReceivers,
-};
-use polkadot_node_core_av_store::Config;
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
request_response::{
- v1::{
- AvailableDataFetchingRequest, ChunkFetchingRequest, CollationFetchingRequest,
- PoVFetchingRequest,
- },
+ v1::{AvailableDataFetchingRequest, CollationFetchingRequest},
IncomingRequestReceiver, ReqProtocolNames,
},
};
@@ -38,14 +31,13 @@ use polkadot_overseer::{
use polkadot_primitives::v2::CollatorPair;
use polkadot_service::{
overseer::{
- AvailabilityRecoverySubsystem, AvailabilityStoreSubsystem, ChainApiSubsystem,
- CollationGenerationSubsystem, CollatorProtocolSubsystem, NetworkBridgeMetrics,
- NetworkBridgeRxSubsystem, NetworkBridgeTxSubsystem, ProtocolSide, RuntimeApiSubsystem,
+ AvailabilityRecoverySubsystem, CollationGenerationSubsystem, CollatorProtocolSubsystem,
+ NetworkBridgeMetrics, NetworkBridgeRxSubsystem, NetworkBridgeTxSubsystem, ProtocolSide,
+ RuntimeApiSubsystem,
},
Error, OverseerConnector,
};
use sc_authority_discovery::Service as AuthorityDiscoveryService;
-use sc_keystore::LocalKeystore;
use sc_network::NetworkStateInfo;
use std::sync::Arc;
@@ -66,13 +58,9 @@ pub(crate) struct CollatorOverseerGenArgs<'a> {
pub network_service: Arc>,
/// Underlying authority discovery service.
pub authority_discovery_service: AuthorityDiscoveryService,
- // Receiver for collation request protocol
+ /// Receiver for collation request protocol
pub collation_req_receiver: IncomingRequestReceiver,
- // Receiver for PoV request protocol
- pub pov_req_receiver: IncomingRequestReceiver,
- // Receiver for chunk request protocol
- pub chunk_req_receiver: IncomingRequestReceiver,
- // Receiver for availability request protocol
+ /// Receiver for availability request protocol
pub available_data_req_receiver: IncomingRequestReceiver,
/// Prometheus registry, commonly used for production systems, less so for test.
pub registry: Option<&'a Registry>,
@@ -84,10 +72,6 @@ pub(crate) struct CollatorOverseerGenArgs<'a> {
pub req_protocol_names: ReqProtocolNames,
/// Peerset protocols name mapping
pub peer_set_protocol_names: PeerSetProtocolNames,
- /// Config for the availability store
- pub availability_config: Config,
- /// The underlying key value store for the parachains.
- pub parachains_db: Arc,
}
fn build_overseer<'a>(
@@ -98,15 +82,11 @@ fn build_overseer<'a>(
authority_discovery_service,
collation_req_receiver,
available_data_req_receiver,
- availability_config,
registry,
spawner,
collator_pair,
req_protocol_names,
peer_set_protocol_names,
- parachains_db,
- pov_req_receiver,
- chunk_req_receiver,
}: CollatorOverseerGenArgs<'a>,
) -> Result<
(Overseer, Arc>, OverseerHandle),
@@ -114,30 +94,21 @@ fn build_overseer<'a>(
> {
let leaves = Vec::new();
let metrics = ::register(registry)?;
- let keystore = Arc::new(LocalKeystore::in_memory());
let spawner = SpawnGlue(spawner);
let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?;
let builder = Overseer::builder()
- .availability_distribution(AvailabilityDistributionSubsystem::new(
- keystore.clone(),
- IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver },
- Metrics::register(registry)?,
- ))
+ .availability_distribution(DummySubsystem)
.availability_recovery(AvailabilityRecoverySubsystem::with_chunks_only(
available_data_req_receiver,
Metrics::register(registry)?,
))
- .availability_store(AvailabilityStoreSubsystem::new(
- parachains_db.clone(),
- availability_config,
- Metrics::register(registry)?,
- ))
+ .availability_store(DummySubsystem)
.bitfield_distribution(DummySubsystem)
.bitfield_signing(DummySubsystem)
.candidate_backing(DummySubsystem)
.candidate_validation(DummySubsystem)
.pvf_checker(DummySubsystem)
- .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?))
+ .chain_api(DummySubsystem)
.collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?))
.collator_protocol({
let side = ProtocolSide::Collator(
@@ -252,8 +223,14 @@ async fn forward_collator_events(
f = finality.next() => {
match f {
Some(header) => {
- tracing::info!(target: "minimal-polkadot-node", "Received finalized block via RPC: #{} ({})", header.number, header.hash());
- let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
+ tracing::info!(
+ target: "minimal-polkadot-node",
+ "Received finalized block via RPC: #{} ({} -> {})",
+ header.number,
+ header.parent_hash,
+ header.hash()
+ );
+ let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
handle.block_finalized(block_info).await;
}
None => return Err(RelayChainError::GenericError("Relay chain finality stream ended.".to_string())),
@@ -262,8 +239,14 @@ async fn forward_collator_events(
i = imports.next() => {
match i {
Some(header) => {
- tracing::info!(target: "minimal-polkadot-node", "Received imported block via RPC: #{} ({})", header.number, header.hash());
- let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
+ tracing::info!(
+ target: "minimal-polkadot-node",
+ "Received imported block via RPC: #{} ({} -> {})",
+ header.number,
+ header.parent_hash,
+ header.hash()
+ );
+ let block_info = BlockInfo { hash: header.hash(), parent_hash: header.parent_hash, number: header.number };
handle.block_imported(block_info).await;
}
None => return Err(RelayChainError::GenericError("Relay chain import stream ended.".to_string())),
diff --git a/client/relay-chain-minimal-node/src/lib.rs b/client/relay-chain-minimal-node/src/lib.rs
index 60b8a809a9c..1c61a7998bb 100644
--- a/client/relay-chain-minimal-node/src/lib.rs
+++ b/client/relay-chain-minimal-node/src/lib.rs
@@ -21,8 +21,9 @@ use cumulus_relay_chain_rpc_interface::{RelayChainRpcInterface, Url};
use polkadot_network_bridge::{peer_sets_info, IsAuthority};
use polkadot_node_network_protocol::{
peer_set::PeerSetProtocolNames,
- request_response::{self, IncomingRequest, ReqProtocolNames},
+ request_response::{v1, IncomingRequest, IncomingRequestReceiver, Protocol, ReqProtocolNames},
};
+
use polkadot_node_subsystem_util::metrics::prometheus::Registry;
use polkadot_primitives::v2::CollatorPair;
@@ -31,7 +32,7 @@ use sc_network::{Event, NetworkService};
use sc_network_common::service::NetworkEventStream;
use std::sync::Arc;
-use polkadot_service::{open_database, Configuration, TaskManager};
+use polkadot_service::{Configuration, TaskManager};
use futures::StreamExt;
@@ -84,7 +85,7 @@ fn build_authority_discovery_service(
pub async fn build_minimal_relay_chain_node(
polkadot_config: Configuration,
task_manager: &mut TaskManager,
- relay_chain_url: Url,
+ relay_chain_url: Vec,
) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> {
let client = cumulus_relay_chain_rpc_interface::create_client_and_start_worker(
relay_chain_url,
@@ -152,8 +153,9 @@ async fn new_minimal_relay_chain(
.extend(peer_sets_info(is_authority, &peer_set_protocol_names));
let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
- let (collation_req_receiver, available_data_req_receiver, pov_req_receiver, chunk_req_receiver) =
+ let (collation_req_receiver, available_data_req_receiver) =
build_request_response_protocol_receivers(&request_protocol_names, &mut config);
+
let (network, network_starter) =
network::build_collator_network(network::BuildCollatorNetworkParams {
config: &config,
@@ -170,8 +172,6 @@ async fn new_minimal_relay_chain(
prometheus_registry.clone(),
);
- let parachains_db = open_database(&config.database)?;
-
let overseer_args = CollatorOverseerGenArgs {
runtime_client: relay_chain_rpc_client.clone(),
network_service: network.clone(),
@@ -183,10 +183,6 @@ async fn new_minimal_relay_chain(
collator_pair,
req_protocol_names: request_protocol_names,
peer_set_protocol_names,
- parachains_db,
- availability_config: polkadot_service::AVAILABILITY_CONFIG,
- pov_req_receiver,
- chunk_req_receiver,
};
let overseer_handle = collator_overseer::spawn_overseer(
@@ -204,10 +200,8 @@ fn build_request_response_protocol_receivers(
request_protocol_names: &ReqProtocolNames,
config: &mut Configuration,
) -> (
- request_response::IncomingRequestReceiver,
- request_response::IncomingRequestReceiver,
- request_response::IncomingRequestReceiver,
- request_response::IncomingRequestReceiver,
+ IncomingRequestReceiver,
+ IncomingRequestReceiver,
) {
let (collation_req_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
@@ -215,9 +209,7 @@ fn build_request_response_protocol_receivers(
let (available_data_req_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
config.network.request_response_protocols.push(cfg);
- let (pov_req_receiver, cfg) = IncomingRequest::get_config_receiver(request_protocol_names);
- config.network.request_response_protocols.push(cfg);
- let (chunk_req_receiver, cfg) = IncomingRequest::get_config_receiver(request_protocol_names);
+ let cfg = Protocol::ChunkFetchingV1.get_outbound_only_config(request_protocol_names);
config.network.request_response_protocols.push(cfg);
- (collation_req_receiver, available_data_req_receiver, pov_req_receiver, chunk_req_receiver)
+ (collation_req_receiver, available_data_req_receiver)
}
diff --git a/client/relay-chain-minimal-node/src/network.rs b/client/relay-chain-minimal-node/src/network.rs
index bd4b0889a89..9bd31ad7600 100644
--- a/client/relay-chain-minimal-node/src/network.rs
+++ b/client/relay-chain-minimal-node/src/network.rs
@@ -21,19 +21,19 @@ use polkadot_node_network_protocol::PeerId;
use sc_network::{NetworkService, SyncState};
use sc_client_api::HeaderBackend;
+use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link};
use sc_network_common::{
config::{
NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig,
},
protocol::role::Roles,
service::NetworkSyncForkRequest,
- sync::{message::BlockAnnouncesHandshake, Metrics, SyncStatus},
+ sync::{
+ message::{BlockAnnouncesHandshake, BlockRequest},
+ BadPeer, Metrics, OnBlockData, PollBlockAnnounceValidation, SyncStatus,
+ },
};
-use sc_network_light::light_client_requests;
-use sc_network_sync::{block_request_handler, state_request_handler};
use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle};
-use sp_consensus::BlockOrigin;
-use sp_runtime::Justifications;
use std::{iter, sync::Arc};
@@ -57,16 +57,6 @@ pub(crate) fn build_collator_network(
let BuildCollatorNetworkParams { config, client, spawn_handle, genesis_hash } = params;
let protocol_id = config.protocol_id();
-
- let block_request_protocol_config =
- block_request_handler::generate_protocol_config(&protocol_id, genesis_hash, None);
-
- let state_request_protocol_config =
- state_request_handler::generate_protocol_config(&protocol_id, genesis_hash, None);
-
- let light_client_request_protocol_config =
- light_client_requests::generate_protocol_config(&protocol_id, genesis_hash, None);
-
let chain_sync = DummyChainSync;
let block_announce_config = chain_sync.get_block_announce_proto_config::(
protocol_id.clone(),
@@ -81,23 +71,18 @@ pub(crate) fn build_collator_network(
role: config.role.clone(),
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
- Some(Box::new(move |fut| {
+ Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", Some("networking"), fut);
- }))
+ })
},
fork_id: None,
chain_sync: Box::new(chain_sync),
network_config: config.network.clone(),
chain: client.clone(),
- import_queue: Box::new(DummyImportQueue),
protocol_id,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_announce_config,
chain_sync_service: Box::new(DummyChainSyncService::(Default::default())),
- block_request_protocol_config,
- state_request_protocol_config,
- warp_sync_protocol_config: None,
- light_client_request_protocol_config,
request_response_protocol_configs: Vec::new(),
};
@@ -248,30 +233,6 @@ impl sc_network_common::sync::ChainSync for DummyChainSync {
) {
}
- fn justification_requests(
- &mut self,
- ) -> Box)> + '_>
- {
- Box::new(std::iter::empty())
- }
-
- fn block_requests(
- &mut self,
- ) -> Box)> + '_>
- {
- Box::new(std::iter::empty())
- }
-
- fn state_request(&mut self) -> Option<(PeerId, sc_network_common::sync::OpaqueStateRequest)> {
- None
- }
-
- fn warp_sync_request(
- &mut self,
- ) -> Option<(PeerId, sc_network_common::sync::warp::WarpProofRequest)> {
- None
- }
-
fn on_block_data(
&mut self,
_who: &PeerId,
@@ -281,22 +242,6 @@ impl sc_network_common::sync::ChainSync for DummyChainSync {
unimplemented!("Not supported on the RPC collator")
}
- fn on_state_data(
- &mut self,
- _who: &PeerId,
- _response: sc_network_common::sync::OpaqueStateResponse,
- ) -> Result, sc_network_common::sync::BadPeer> {
- unimplemented!("Not supported on the RPC collator")
- }
-
- fn on_warp_sync_data(
- &mut self,
- _who: &PeerId,
- _response: sc_network_common::sync::warp::EncodedProof,
- ) -> Result<(), sc_network_common::sync::BadPeer> {
- unimplemented!("Not supported on the RPC collator")
- }
-
fn on_block_justification(
&mut self,
_who: PeerId,
@@ -306,28 +251,6 @@ impl sc_network_common::sync::ChainSync for DummyChainSync {
unimplemented!("Not supported on the RPC collator")
}
- fn on_blocks_processed(
- &mut self,
- _imported: usize,
- _count: usize,
- _results: Vec<(
- Result<
- sc_consensus::BlockImportStatus>,
- sc_consensus::BlockImportError,
- >,
- ::Hash,
- )>,
- ) -> Box<
- dyn Iterator<
- Item = Result<
- (PeerId, sc_network_common::sync::message::BlockRequest),
- sc_network_common::sync::BadPeer,
- >,
- >,
- > {
- Box::new(std::iter::empty())
- }
-
fn on_justification_import(
&mut self,
_hash: ::Hash,
@@ -360,12 +283,7 @@ impl sc_network_common::sync::ChainSync for DummyChainSync {
std::task::Poll::Pending
}
- fn peer_disconnected(
- &mut self,
- _who: &PeerId,
- ) -> Option> {
- None
- }
+ fn peer_disconnected(&mut self, _who: &PeerId) {}
fn metrics(&self) -> sc_network_common::sync::Metrics {
Metrics {
@@ -380,27 +298,6 @@ impl sc_network_common::sync::ChainSync for DummyChainSync {
}
}
- fn create_opaque_block_request(
- &self,
- _request: &sc_network_common::sync::message::BlockRequest,
- ) -> sc_network_common::sync::OpaqueBlockRequest {
- unimplemented!("Not supported on the RPC collator")
- }
-
- fn encode_block_request(
- &self,
- _request: &sc_network_common::sync::OpaqueBlockRequest,
- ) -> Result, String> {
- unimplemented!("Not supported on the RPC collator")
- }
-
- fn decode_block_response(
- &self,
- _response: &[u8],
- ) -> Result {
- unimplemented!("Not supported on the RPC collator")
- }
-
fn block_response_into_blocks(
&self,
_request: &sc_network_common::sync::message::BlockRequest,
@@ -409,58 +306,53 @@ impl sc_network_common::sync::ChainSync for DummyChainSync {
unimplemented!("Not supported on the RPC collator")
}
- fn encode_state_request(
- &self,
- _request: &sc_network_common::sync::OpaqueStateRequest,
- ) -> Result, String> {
- unimplemented!("Not supported on the RPC collator")
+ fn poll(
+ &mut self,
+ _cx: &mut std::task::Context,
+ ) -> std::task::Poll> {
+ std::task::Poll::Pending
}
- fn decode_state_response(
- &self,
- _response: &[u8],
- ) -> Result {
+ fn send_block_request(&mut self, _who: PeerId, _request: BlockRequest) {
unimplemented!("Not supported on the RPC collator")
}
- fn poll(
- &mut self,
- _cx: &mut std::task::Context,
- ) -> std::task::Poll::Header>>
- {
- std::task::Poll::Pending
+ fn num_active_peers(&self) -> usize {
+ 0
}
+
+ fn process_block_response_data(&mut self, _blocks_to_import: Result, BadPeer>) {}
}
-struct DummyImportQueue;
+struct DummyChainSyncService(std::marker::PhantomData);
-impl sc_service::ImportQueue for DummyImportQueue {
- fn import_blocks(
- &mut self,
- _origin: BlockOrigin,
- _blocks: Vec>,
- ) {
- }
+impl NetworkSyncForkRequest> for DummyChainSyncService {
+ fn set_sync_fork_request(&self, _peers: Vec, _hash: B::Hash, _number: NumberFor) {}
+}
+
+impl