From 848d85777abc11d462db9e6dfe4b1b62c2da79a7 Mon Sep 17 00:00:00 2001 From: Serban Iorga Date: Sat, 30 Jul 2022 19:19:40 +0300 Subject: [PATCH 1/3] Parachains source cosmetic changes - Make `ParaHashAtSource` more generic - Modify `on_chain_parachain_header` to return `HeaderId` - Shortening variable names Signed-off-by: Serban Iorga --- primitives/runtime/src/lib.rs | 14 ++- .../src/finality/source.rs | 5 +- .../src/on_demand/parachains.rs | 13 +-- .../src/parachains/source.rs | 88 +++++++++---------- relays/parachains/src/parachains_loop.rs | 79 +++++------------ relays/utils/src/lib.rs | 25 ++++++ 6 files changed, 107 insertions(+), 117 deletions(-) diff --git a/primitives/runtime/src/lib.rs b/primitives/runtime/src/lib.rs index a51327e89b..2cac05a17f 100644 --- a/primitives/runtime/src/lib.rs +++ b/primitives/runtime/src/lib.rs @@ -27,7 +27,7 @@ use scale_info::TypeInfo; use sp_core::{hash::H256, storage::StorageKey}; use sp_io::hashing::blake2_256; use sp_runtime::traits::{BadOrigin, Header as HeaderT}; -use sp_std::{convert::TryFrom, fmt::Debug, vec, vec::Vec}; +use sp_std::{cmp::Ordering, convert::TryFrom, fmt::Debug, vec, vec::Vec}; pub use chain::{ AccountIdOf, AccountPublicOf, BalanceOf, BlockNumberOf, Chain, EncodedOrDecodedCall, HashOf, @@ -85,6 +85,18 @@ pub const ROOT_ACCOUNT_DERIVATION_PREFIX: &[u8] = b"pallet-bridge/account-deriva #[derive(RuntimeDebug, Default, Clone, Encode, Decode, Copy, Eq, Hash, PartialEq)] pub struct HeaderId(pub Number, pub Hash); +impl PartialOrd for HeaderId { + fn partial_cmp(&self, other: &Self) -> Option { + self.0.partial_cmp(&other.0) + } +} + +impl Ord for HeaderId { + fn cmp(&self, other: &Self) -> Ordering { + self.0.cmp(&other.0) + } +} + /// Generic header id provider. pub trait HeaderIdProvider { // Get the header id. diff --git a/relays/lib-substrate-relay/src/finality/source.rs b/relays/lib-substrate-relay/src/finality/source.rs index d0acb77176..c8360bbddb 100644 --- a/relays/lib-substrate-relay/src/finality/source.rs +++ b/relays/lib-substrate-relay/src/finality/source.rs @@ -27,7 +27,6 @@ use relay_substrate_client::{ BlockNumberOf, BlockWithJustification, Chain, Client, Error, HeaderOf, }; use relay_utils::relay_loop::Client as RelayClient; -use sp_runtime::traits::Header as HeaderT; use std::pin::Pin; /// Shared updatable reference to the maximal header number that we want to sync from the source. @@ -76,9 +75,7 @@ impl SubstrateFinalitySource

{ ) -> Result, Error> { // we **CAN** continue to relay finality proofs if source node is out of sync, because // target node may be missing proofs that are already available at the source - let finalized_header_hash = self.client.best_finalized_header_hash().await?; - let finalized_header = self.client.header_by_hash(finalized_header_hash).await?; - Ok(*finalized_header.number()) + self.client.best_finalized_header_number().await } } diff --git a/relays/lib-substrate-relay/src/on_demand/parachains.rs b/relays/lib-substrate-relay/src/on_demand/parachains.rs index 8300bf74cd..902f67f7e7 100644 --- a/relays/lib-substrate-relay/src/on_demand/parachains.rs +++ b/relays/lib-substrate-relay/src/on_demand/parachains.rs @@ -389,13 +389,9 @@ where source.client().best_finalized_header().await.map_err(map_source_err)?; let best_finalized_relay_block_id = best_finalized_relay_header.id(); let para_header_at_source = source - .on_chain_parachain_header( - best_finalized_relay_block_id, - P::SOURCE_PARACHAIN_PARA_ID.into(), - ) + .on_chain_para_head_id(best_finalized_relay_block_id, P::SOURCE_PARACHAIN_PARA_ID.into()) .await - .map_err(map_source_err)? - .map(|h| h.id()); + .map_err(map_source_err)?; let relay_header_at_source = best_finalized_relay_block_id.0; let relay_header_at_target = @@ -408,10 +404,9 @@ where .map_err(map_target_err)?; let para_header_at_relay_header_at_target = source - .on_chain_parachain_header(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into()) + .on_chain_para_head_id(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into()) .await - .map_err(map_source_err)? - .map(|h| h.id()); + .map_err(map_source_err)?; Ok(RelayData { required_para_header: required_header_number, diff --git a/relays/lib-substrate-relay/src/parachains/source.rs b/relays/lib-substrate-relay/src/parachains/source.rs index f763b744f1..40ce4c079e 100644 --- a/relays/lib-substrate-relay/src/parachains/source.rs +++ b/relays/lib-substrate-relay/src/parachains/source.rs @@ -22,16 +22,15 @@ use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use bp_parachains::parachain_head_storage_key_at_source; use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId}; +use bp_runtime::HeaderIdProvider; use codec::Decode; use parachains_relay::{ - parachains_loop::{ParaHashAtSource, SourceClient}, - parachains_loop_metrics::ParachainsLoopMetrics, + parachains_loop::SourceClient, parachains_loop_metrics::ParachainsLoopMetrics, }; use relay_substrate_client::{ Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain, }; -use relay_utils::relay_loop::Client as RelayClient; -use sp_runtime::traits::Header as HeaderT; +use relay_utils::{relay_loop::Client as RelayClient, NoopOption}; /// Shared updatable reference to the maximal parachain header id that we want to sync from the /// source. @@ -41,16 +40,16 @@ pub type RequiredHeaderIdRef = Arc>>>; #[derive(Clone)] pub struct ParachainsSource { client: Client, - maximal_header_id: Option>, + max_head_id: Option>, } impl ParachainsSource

{ /// Creates new parachains source client. pub fn new( client: Client, - maximal_header_id: Option>, + max_head_id: Option>, ) -> Self { - ParachainsSource { client, maximal_header_id } + ParachainsSource { client, max_head_id } } /// Returns reference to the underlying RPC client. @@ -58,12 +57,23 @@ impl ParachainsSource

{ &self.client } + /// Returns the `max_header_id` as an `NoopOption` + async fn max_head_id(&self) -> NoopOption> { + match self.max_head_id { + Some(ref max_head_id_guard) => match *max_head_id_guard.lock().await { + Some(max_head_id) => NoopOption::Some(max_head_id), + None => NoopOption::Noop, + }, + None => NoopOption::None, + } + } + /// Return decoded head of given parachain. - pub async fn on_chain_parachain_header( + pub async fn on_chain_para_head_id( &self, at_block: HeaderIdOf, para_id: ParaId, - ) -> Result>, SubstrateError> { + ) -> Result>, SubstrateError> { let storage_key = parachain_head_storage_key_at_source(P::SourceRelayChain::PARAS_PALLET_NAME, para_id); let para_head = self.client.raw_storage_value(storage_key, Some(at_block.1)).await?; @@ -72,8 +82,8 @@ impl ParachainsSource

{ Some(para_head) => para_head, None => return Ok(None), }; - - Ok(Some(Decode::decode(&mut ¶_head.0[..])?)) + let para_head: HeaderOf = Decode::decode(&mut ¶_head.0[..])?; + Ok(Some(para_head.id())) } } @@ -105,7 +115,7 @@ where at_block: HeaderIdOf, metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, - ) -> Result { + ) -> Result, Self::Error> { // we don't need to support many parachains now if para_id.0 != P::SOURCE_PARACHAIN_PARA_ID { return Err(SubstrateError::Custom(format!( @@ -115,44 +125,28 @@ where ))) } - let mut para_hash_at_source = ParaHashAtSource::None; - let mut para_header_number_at_source = None; - match self.on_chain_parachain_header(at_block, para_id).await? { - Some(parachain_header) => { - para_hash_at_source = ParaHashAtSource::Some(parachain_header.hash()); - para_header_number_at_source = Some(*parachain_header.number()); - // never return head that is larger than requested. This way we'll never sync - // headers past `maximal_header_id` - if let Some(ref maximal_header_id) = self.maximal_header_id { - let maximal_header_id = *maximal_header_id.lock().await; - match maximal_header_id { - Some(maximal_header_id) - if *parachain_header.number() > maximal_header_id.0 => - { - // we don't want this header yet => let's report previously requested - // header - para_hash_at_source = ParaHashAtSource::Some(maximal_header_id.1); - para_header_number_at_source = Some(maximal_header_id.0); - }, - Some(_) => (), - None => { - // on-demand relay has not yet asked us to sync anything let's do that - para_hash_at_source = ParaHashAtSource::Unavailable; - para_header_number_at_source = None; - }, - } - } - }, - None => {}, - }; + let mut para_head_id = NoopOption::None; + if let Some(on_chain_para_head_id) = self.on_chain_para_head_id(at_block, para_id).await? { + // Never return head that is larger than requested. This way we'll never sync + // headers past `max_header_id`. + para_head_id = match self.max_head_id().await { + NoopOption::Noop => NoopOption::Noop, + NoopOption::None => { + // `max_header_id` is not set. There is no limit. + NoopOption::Some(on_chain_para_head_id) + }, + NoopOption::Some(max_head_id) => { + // We report at most `max_header_id`. + NoopOption::Some(std::cmp::min(on_chain_para_head_id, max_head_id)) + }, + } + } - if let (Some(metrics), Some(para_header_number_at_source)) = - (metrics, para_header_number_at_source) - { - metrics.update_best_parachain_block_at_source(para_id, para_header_number_at_source); + if let (Some(metrics), NoopOption::Some(para_head_id)) = (metrics, para_head_id) { + metrics.update_best_parachain_block_at_source(para_id, para_head_id.0); } - Ok(para_hash_at_source) + Ok(para_head_id.map(|para_head_id| para_head_id.1)) } async fn prove_parachain_heads( diff --git a/relays/parachains/src/parachains_loop.rs b/relays/parachains/src/parachains_loop.rs index 3c7ac85913..81435821d5 100644 --- a/relays/parachains/src/parachains_loop.rs +++ b/relays/parachains/src/parachains_loop.rs @@ -24,7 +24,9 @@ use bp_polkadot_core::{ }; use futures::{future::FutureExt, select}; use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf}; -use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient}; +use relay_utils::{ + metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, NoopOption, +}; use std::{ collections::{BTreeMap, BTreeSet}, future::Future, @@ -52,33 +54,6 @@ pub enum ParachainSyncStrategy { All, } -/// Parachain head hash, available at the source (relay) chain. -#[derive(Clone, Copy, Debug)] -pub enum ParaHashAtSource { - /// There's no parachain head at the source chain. - /// - /// Normally it means that the parachain is not registered there. - None, - /// Parachain head with given hash is available at the source chain. - Some(ParaHash), - /// The source client refuses to report parachain head hash at this moment. - /// - /// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used. - /// This variant must be treated as "we don't want to update parachain head value at the - /// target chain at this moment". - Unavailable, -} - -impl ParaHashAtSource { - /// Return parachain head hash, if available. - pub fn hash(&self) -> Option<&ParaHash> { - match *self { - ParaHashAtSource::Some(ref para_hash) => Some(para_hash), - _ => None, - } - } -} - /// Source client used in parachain heads synchronization loop. #[async_trait] pub trait SourceClient: RelayClient { @@ -94,7 +69,7 @@ pub trait SourceClient: RelayClient { at_block: HeaderIdOf, metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, - ) -> Result; + ) -> Result, Self::Error>; /// Get parachain heads proof. /// @@ -342,7 +317,7 @@ where /// Given heads at source and target clients, returns set of heads that are out of sync. fn select_parachains_to_update( - heads_at_source: BTreeMap, + heads_at_source: BTreeMap>, heads_at_target: BTreeMap>, best_finalized_relay_block: HeaderIdOf, ) -> Vec @@ -368,12 +343,12 @@ where .zip(heads_at_target.into_iter()) .filter(|((para, head_at_source), (_, head_at_target))| { let needs_update = match (head_at_source, head_at_target) { - (ParaHashAtSource::Unavailable, _) => { + (NoopOption::Noop, _) => { // source client has politely asked us not to update current parachain head // at the target chain false }, - (ParaHashAtSource::Some(head_at_source), Some(head_at_target)) + (NoopOption::Some(head_at_source), Some(head_at_target)) if head_at_target.at_relay_block_number < best_finalized_relay_block.0 && head_at_target.head_hash != *head_at_source => { @@ -381,22 +356,22 @@ where // client true }, - (ParaHashAtSource::Some(_), Some(_)) => { + (NoopOption::Some(_), Some(_)) => { // this is normal case when relay has recently updated heads, when parachain is // not progressing, or when our source client is still syncing false }, - (ParaHashAtSource::Some(_), None) => { + (NoopOption::Some(_), None) => { // parachain is not yet known to the target client. This is true when parachain // or bridge has been just onboarded/started true }, - (ParaHashAtSource::None, Some(_)) => { + (NoopOption::None, Some(_)) => { // parachain/parathread has been offboarded removed from the system. It needs to // be propageted to the target client true }, - (ParaHashAtSource::None, None) => { + (NoopOption::None, None) => { // all's good - parachain is unknown to both clients false }, @@ -435,7 +410,7 @@ async fn read_heads_at_source( metrics: Option<&ParachainsLoopMetrics>, at_relay_block: &HeaderIdOf, parachains: &[ParaId], -) -> Result, FailedClient> { +) -> Result>, FailedClient> { let mut para_head_hashes = BTreeMap::new(); for para in parachains { let para_head = source_client.parachain_head(*at_relay_block, metrics, *para).await; @@ -612,7 +587,7 @@ mod tests { #[derive(Clone, Debug)] struct TestClientData { source_sync_status: Result, - source_heads: BTreeMap>, + source_heads: BTreeMap, TestError>>, source_proofs: BTreeMap, TestError>>, target_best_block: Result, TestError>, @@ -627,7 +602,7 @@ mod tests { pub fn minimal() -> Self { TestClientData { source_sync_status: Ok(true), - source_heads: vec![(PARA_ID, Ok(ParaHashAtSource::Some(PARA_0_HASH)))] + source_heads: vec![(PARA_ID, Ok(NoopOption::Some(PARA_0_HASH)))] .into_iter() .collect(), source_proofs: vec![(PARA_ID, Ok(PARA_0_HASH.encode()))].into_iter().collect(), @@ -676,10 +651,10 @@ mod tests { _at_block: HeaderIdOf, _metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, - ) -> Result { + ) -> Result, TestError> { match self.data.lock().await.source_heads.get(¶_id.0).cloned() { Some(result) => result, - None => Ok(ParaHashAtSource::None), + None => Ok(NoopOption::None), } } @@ -988,7 +963,7 @@ mod tests { fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), ParaHashAtSource::None)].into_iter().collect(), + vec![(ParaId(PARA_ID), NoopOption::None)].into_iter().collect(), vec![(ParaId(PARA_ID), None)].into_iter().collect(), HeaderId(10, Default::default()), ), @@ -1000,9 +975,7 @@ mod tests { fn parachain_is_not_updated_if_it_has_been_updated_at_better_relay_block() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))] - .into_iter() - .collect(), + vec![(ParaId(PARA_ID), NoopOption::Some(PARA_0_HASH))].into_iter().collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 20, head_hash: PARA_1_HASH }) @@ -1019,9 +992,7 @@ mod tests { fn parachain_is_not_updated_if_hash_is_the_same_at_next_relay_block() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))] - .into_iter() - .collect(), + vec![(ParaId(PARA_ID), NoopOption::Some(PARA_0_HASH))].into_iter().collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) @@ -1038,7 +1009,7 @@ mod tests { fn parachain_is_updated_after_offboarding() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), ParaHashAtSource::None)].into_iter().collect(), + vec![(ParaId(PARA_ID), NoopOption::None)].into_iter().collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { @@ -1058,9 +1029,7 @@ mod tests { fn parachain_is_updated_after_onboarding() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_0_HASH))] - .into_iter() - .collect(), + vec![(ParaId(PARA_ID), NoopOption::Some(PARA_0_HASH))].into_iter().collect(), vec![(ParaId(PARA_ID), None)].into_iter().collect(), HeaderId(10, Default::default()), ), @@ -1072,9 +1041,7 @@ mod tests { fn parachain_is_updated_if_newer_head_is_known() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), ParaHashAtSource::Some(PARA_1_HASH))] - .into_iter() - .collect(), + vec![(ParaId(PARA_ID), NoopOption::Some(PARA_1_HASH))].into_iter().collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) @@ -1091,7 +1058,7 @@ mod tests { fn parachain_is_not_updated_if_source_head_is_unavailable() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), ParaHashAtSource::Unavailable)].into_iter().collect(), + vec![(ParaId(PARA_ID), NoopOption::Noop)].into_iter().collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) diff --git a/relays/utils/src/lib.rs b/relays/utils/src/lib.rs index 603011819b..217210aa2c 100644 --- a/relays/utils/src/lib.rs +++ b/relays/utils/src/lib.rs @@ -271,3 +271,28 @@ where }, } } + +/// An extension over rust's `Option` that adds an extra possibility: `Noop`. +#[derive(Clone, Copy, Debug)] +pub enum NoopOption { + /// If any operation depends on this `NoopOption`, we shouldn't perform it. + Noop, + /// Same as `Option::None`. + None, + /// Same as `Option::Some`. + Some(T), +} + +impl NoopOption { + /// Transform contained value. + pub fn map(self, f: F) -> NoopOption + where + F: FnOnce(T) -> U, + { + match self { + NoopOption::Noop => NoopOption::Noop, + NoopOption::None => NoopOption::None, + NoopOption::Some(val) => NoopOption::Some(f(val)), + } + } +} From 757f7f25f887181464b0930b4764ffa292cacd13 Mon Sep 17 00:00:00 2001 From: Serban Iorga Date: Mon, 1 Aug 2022 21:17:57 +0300 Subject: [PATCH 2/3] Change ParachainsSource::max_head_id type Change ParachainsSource::max_head_id to Arc> Signed-off-by: Serban Iorga --- .../bin-substrate/src/cli/relay_parachains.rs | 12 ++++++++++-- .../src/on_demand/parachains.rs | 9 +++++---- .../src/parachains/source.rs | 19 ++++--------------- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/relays/bin-substrate/src/cli/relay_parachains.rs b/relays/bin-substrate/src/cli/relay_parachains.rs index 2fa6dcaf9e..6b65faebd4 100644 --- a/relays/bin-substrate/src/cli/relay_parachains.rs +++ b/relays/bin-substrate/src/cli/relay_parachains.rs @@ -18,10 +18,15 @@ use crate::chains::{ rialto_parachains_to_millau::RialtoParachainToMillauCliBridge, westend_parachains_to_millau::WestmintToMillauCliBridge, }; +use async_std::sync::Mutex; use async_trait::async_trait; use bp_polkadot_core::parachains::ParaId; use parachains_relay::parachains_loop::{ParachainSyncParams, SourceClient, TargetClient}; -use relay_utils::metrics::{GlobalMetrics, StandaloneMetric}; +use relay_utils::{ + metrics::{GlobalMetrics, StandaloneMetric}, + NoopOption, +}; +use std::sync::Arc; use structopt::StructOpt; use strum::{EnumString, EnumVariantNames, VariantNames}; use substrate_relay_helper::{ @@ -65,7 +70,10 @@ where { async fn relay_headers(data: RelayParachains) -> anyhow::Result<()> { let source_client = data.source.into_client::().await?; - let source_client = ParachainsSource::::new(source_client, None); + let source_client = ParachainsSource::::new( + source_client, + Arc::new(Mutex::new(NoopOption::None)), + ); let target_transaction_params = TransactionParams { signer: data.target_sign.to_keypair::()?, diff --git a/relays/lib-substrate-relay/src/on_demand/parachains.rs b/relays/lib-substrate-relay/src/on_demand/parachains.rs index 902f67f7e7..8744f7c887 100644 --- a/relays/lib-substrate-relay/src/on_demand/parachains.rs +++ b/relays/lib-substrate-relay/src/on_demand/parachains.rs @@ -42,7 +42,7 @@ use relay_substrate_client::{ TransactionSignScheme, }; use relay_utils::{ - metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId, + metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId, NoopOption, }; use std::fmt::Debug; @@ -143,7 +143,7 @@ async fn background_task( let mut relay_state = RelayState::Idle; let mut required_parachain_header_number = Zero::zero(); - let required_para_header_number_ref = Arc::new(Mutex::new(None)); + let required_para_header_number_ref = Arc::new(Mutex::new(NoopOption::Noop)); let mut restart_relay = true; let parachains_relay_task = futures::future::Fuse::terminated(); @@ -151,7 +151,7 @@ async fn background_task( let mut parachains_source = ParachainsSource::

::new( source_relay_client.clone(), - Some(required_para_header_number_ref.clone()), + required_para_header_number_ref.clone(), ); let mut parachains_target = ParachainsTarget::

::new(target_client.clone(), target_transaction_params.clone()); @@ -253,7 +253,8 @@ async fn background_task( .await; }, RelayState::RelayingParaHeader(required_para_header) => { - *required_para_header_number_ref.lock().await = Some(required_para_header); + *required_para_header_number_ref.lock().await = + NoopOption::Some(required_para_header); }, } diff --git a/relays/lib-substrate-relay/src/parachains/source.rs b/relays/lib-substrate-relay/src/parachains/source.rs index 40ce4c079e..fa3fef31bf 100644 --- a/relays/lib-substrate-relay/src/parachains/source.rs +++ b/relays/lib-substrate-relay/src/parachains/source.rs @@ -34,20 +34,20 @@ use relay_utils::{relay_loop::Client as RelayClient, NoopOption}; /// Shared updatable reference to the maximal parachain header id that we want to sync from the /// source. -pub type RequiredHeaderIdRef = Arc>>>; +pub type RequiredHeaderIdRef = Arc>>>; /// Substrate client as parachain heads source. #[derive(Clone)] pub struct ParachainsSource { client: Client, - max_head_id: Option>, + max_head_id: RequiredHeaderIdRef, } impl ParachainsSource

{ /// Creates new parachains source client. pub fn new( client: Client, - max_head_id: Option>, + max_head_id: RequiredHeaderIdRef, ) -> Self { ParachainsSource { client, max_head_id } } @@ -57,17 +57,6 @@ impl ParachainsSource

{ &self.client } - /// Returns the `max_header_id` as an `NoopOption` - async fn max_head_id(&self) -> NoopOption> { - match self.max_head_id { - Some(ref max_head_id_guard) => match *max_head_id_guard.lock().await { - Some(max_head_id) => NoopOption::Some(max_head_id), - None => NoopOption::Noop, - }, - None => NoopOption::None, - } - } - /// Return decoded head of given parachain. pub async fn on_chain_para_head_id( &self, @@ -129,7 +118,7 @@ where if let Some(on_chain_para_head_id) = self.on_chain_para_head_id(at_block, para_id).await? { // Never return head that is larger than requested. This way we'll never sync // headers past `max_header_id`. - para_head_id = match self.max_head_id().await { + para_head_id = match *self.max_head_id.lock().await { NoopOption::Noop => NoopOption::Noop, NoopOption::None => { // `max_header_id` is not set. There is no limit. From 878d35c0ffe863579e29fe100dcfe4c4199c61e5 Mon Sep 17 00:00:00 2001 From: Serban Iorga Date: Tue, 2 Aug 2022 12:33:32 +0300 Subject: [PATCH 3/3] code review changes --- primitives/runtime/src/lib.rs | 18 +--- .../bin-substrate/src/cli/relay_parachains.rs | 9 +- .../src/on_demand/parachains.rs | 8 +- .../src/parachains/source.rs | 23 ++--- relays/parachains/src/parachains_loop.rs | 83 ++++++++++++++----- relays/utils/src/lib.rs | 25 ------ 6 files changed, 84 insertions(+), 82 deletions(-) diff --git a/primitives/runtime/src/lib.rs b/primitives/runtime/src/lib.rs index 2cac05a17f..a7e3e44626 100644 --- a/primitives/runtime/src/lib.rs +++ b/primitives/runtime/src/lib.rs @@ -27,7 +27,7 @@ use scale_info::TypeInfo; use sp_core::{hash::H256, storage::StorageKey}; use sp_io::hashing::blake2_256; use sp_runtime::traits::{BadOrigin, Header as HeaderT}; -use sp_std::{cmp::Ordering, convert::TryFrom, fmt::Debug, vec, vec::Vec}; +use sp_std::{convert::TryFrom, fmt::Debug, vec, vec::Vec}; pub use chain::{ AccountIdOf, AccountPublicOf, BalanceOf, BlockNumberOf, Chain, EncodedOrDecodedCall, HashOf, @@ -82,21 +82,11 @@ pub const ACCOUNT_DERIVATION_PREFIX: &[u8] = b"pallet-bridge/account-derivation/ pub const ROOT_ACCOUNT_DERIVATION_PREFIX: &[u8] = b"pallet-bridge/account-derivation/root"; /// Generic header Id. -#[derive(RuntimeDebug, Default, Clone, Encode, Decode, Copy, Eq, Hash, PartialEq)] +#[derive( + RuntimeDebug, Default, Clone, Encode, Decode, Copy, Eq, Hash, PartialEq, PartialOrd, Ord, +)] pub struct HeaderId(pub Number, pub Hash); -impl PartialOrd for HeaderId { - fn partial_cmp(&self, other: &Self) -> Option { - self.0.partial_cmp(&other.0) - } -} - -impl Ord for HeaderId { - fn cmp(&self, other: &Self) -> Ordering { - self.0.cmp(&other.0) - } -} - /// Generic header id provider. pub trait HeaderIdProvider { // Get the header id. diff --git a/relays/bin-substrate/src/cli/relay_parachains.rs b/relays/bin-substrate/src/cli/relay_parachains.rs index 6b65faebd4..0925197258 100644 --- a/relays/bin-substrate/src/cli/relay_parachains.rs +++ b/relays/bin-substrate/src/cli/relay_parachains.rs @@ -21,11 +21,10 @@ use crate::chains::{ use async_std::sync::Mutex; use async_trait::async_trait; use bp_polkadot_core::parachains::ParaId; -use parachains_relay::parachains_loop::{ParachainSyncParams, SourceClient, TargetClient}; -use relay_utils::{ - metrics::{GlobalMetrics, StandaloneMetric}, - NoopOption, +use parachains_relay::parachains_loop::{ + AvailableHeader, ParachainSyncParams, SourceClient, TargetClient, }; +use relay_utils::metrics::{GlobalMetrics, StandaloneMetric}; use std::sync::Arc; use structopt::StructOpt; use strum::{EnumString, EnumVariantNames, VariantNames}; @@ -72,7 +71,7 @@ where let source_client = data.source.into_client::().await?; let source_client = ParachainsSource::::new( source_client, - Arc::new(Mutex::new(NoopOption::None)), + Arc::new(Mutex::new(AvailableHeader::Missing)), ); let target_transaction_params = TransactionParams { diff --git a/relays/lib-substrate-relay/src/on_demand/parachains.rs b/relays/lib-substrate-relay/src/on_demand/parachains.rs index 8744f7c887..dc40011550 100644 --- a/relays/lib-substrate-relay/src/on_demand/parachains.rs +++ b/relays/lib-substrate-relay/src/on_demand/parachains.rs @@ -36,13 +36,13 @@ use bp_runtime::HeaderIdProvider; use futures::{select, FutureExt}; use num_traits::Zero; use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; -use parachains_relay::parachains_loop::{ParachainSyncParams, TargetClient}; +use parachains_relay::parachains_loop::{AvailableHeader, ParachainSyncParams, TargetClient}; use relay_substrate_client::{ AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, TransactionSignScheme, }; use relay_utils::{ - metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId, NoopOption, + metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, HeaderId, }; use std::fmt::Debug; @@ -143,7 +143,7 @@ async fn background_task( let mut relay_state = RelayState::Idle; let mut required_parachain_header_number = Zero::zero(); - let required_para_header_number_ref = Arc::new(Mutex::new(NoopOption::Noop)); + let required_para_header_number_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable)); let mut restart_relay = true; let parachains_relay_task = futures::future::Fuse::terminated(); @@ -254,7 +254,7 @@ async fn background_task( }, RelayState::RelayingParaHeader(required_para_header) => { *required_para_header_number_ref.lock().await = - NoopOption::Some(required_para_header); + AvailableHeader::Available(required_para_header); }, } diff --git a/relays/lib-substrate-relay/src/parachains/source.rs b/relays/lib-substrate-relay/src/parachains/source.rs index fa3fef31bf..c23ac87947 100644 --- a/relays/lib-substrate-relay/src/parachains/source.rs +++ b/relays/lib-substrate-relay/src/parachains/source.rs @@ -25,16 +25,17 @@ use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId}; use bp_runtime::HeaderIdProvider; use codec::Decode; use parachains_relay::{ - parachains_loop::SourceClient, parachains_loop_metrics::ParachainsLoopMetrics, + parachains_loop::{AvailableHeader, SourceClient}, + parachains_loop_metrics::ParachainsLoopMetrics, }; use relay_substrate_client::{ Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain, }; -use relay_utils::{relay_loop::Client as RelayClient, NoopOption}; +use relay_utils::relay_loop::Client as RelayClient; /// Shared updatable reference to the maximal parachain header id that we want to sync from the /// source. -pub type RequiredHeaderIdRef = Arc>>>; +pub type RequiredHeaderIdRef = Arc>>>; /// Substrate client as parachain heads source. #[derive(Clone)] @@ -104,7 +105,7 @@ where at_block: HeaderIdOf, metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, - ) -> Result, Self::Error> { + ) -> Result, Self::Error> { // we don't need to support many parachains now if para_id.0 != P::SOURCE_PARACHAIN_PARA_ID { return Err(SubstrateError::Custom(format!( @@ -114,24 +115,24 @@ where ))) } - let mut para_head_id = NoopOption::None; + let mut para_head_id = AvailableHeader::Missing; if let Some(on_chain_para_head_id) = self.on_chain_para_head_id(at_block, para_id).await? { // Never return head that is larger than requested. This way we'll never sync // headers past `max_header_id`. para_head_id = match *self.max_head_id.lock().await { - NoopOption::Noop => NoopOption::Noop, - NoopOption::None => { + AvailableHeader::Unavailable => AvailableHeader::Unavailable, + AvailableHeader::Missing => { // `max_header_id` is not set. There is no limit. - NoopOption::Some(on_chain_para_head_id) + AvailableHeader::Available(on_chain_para_head_id) }, - NoopOption::Some(max_head_id) => { + AvailableHeader::Available(max_head_id) => { // We report at most `max_header_id`. - NoopOption::Some(std::cmp::min(on_chain_para_head_id, max_head_id)) + AvailableHeader::Available(std::cmp::min(on_chain_para_head_id, max_head_id)) }, } } - if let (Some(metrics), NoopOption::Some(para_head_id)) = (metrics, para_head_id) { + if let (Some(metrics), AvailableHeader::Available(para_head_id)) = (metrics, para_head_id) { metrics.update_best_parachain_block_at_source(para_id, para_head_id.0); } diff --git a/relays/parachains/src/parachains_loop.rs b/relays/parachains/src/parachains_loop.rs index 81435821d5..f44f986d63 100644 --- a/relays/parachains/src/parachains_loop.rs +++ b/relays/parachains/src/parachains_loop.rs @@ -24,9 +24,7 @@ use bp_polkadot_core::{ }; use futures::{future::FutureExt, select}; use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf}; -use relay_utils::{ - metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, NoopOption, -}; +use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient}; use std::{ collections::{BTreeMap, BTreeSet}, future::Future, @@ -54,6 +52,37 @@ pub enum ParachainSyncStrategy { All, } +/// Parachain header availability at a certain chain. +#[derive(Clone, Copy, Debug)] +pub enum AvailableHeader { + /// The client refuses to report parachain head at this moment. + /// + /// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used. + /// This variant must be treated as "we don't want to update parachain head value at the + /// target chain at this moment". + Unavailable, + /// There's no parachain header at the relay chain. + /// + /// Normally it means that the parachain is not registered there. + Missing, + /// Parachain head with given hash is available at the source chain. + Available(T), +} + +impl AvailableHeader { + /// Transform contained value. + pub fn map(self, f: F) -> AvailableHeader + where + F: FnOnce(T) -> U, + { + match self { + AvailableHeader::Unavailable => AvailableHeader::Unavailable, + AvailableHeader::Missing => AvailableHeader::Missing, + AvailableHeader::Available(val) => AvailableHeader::Available(f(val)), + } + } +} + /// Source client used in parachain heads synchronization loop. #[async_trait] pub trait SourceClient: RelayClient { @@ -69,7 +98,7 @@ pub trait SourceClient: RelayClient { at_block: HeaderIdOf, metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; /// Get parachain heads proof. /// @@ -317,7 +346,7 @@ where /// Given heads at source and target clients, returns set of heads that are out of sync. fn select_parachains_to_update( - heads_at_source: BTreeMap>, + heads_at_source: BTreeMap>, heads_at_target: BTreeMap>, best_finalized_relay_block: HeaderIdOf, ) -> Vec @@ -343,12 +372,12 @@ where .zip(heads_at_target.into_iter()) .filter(|((para, head_at_source), (_, head_at_target))| { let needs_update = match (head_at_source, head_at_target) { - (NoopOption::Noop, _) => { + (AvailableHeader::Unavailable, _) => { // source client has politely asked us not to update current parachain head // at the target chain false }, - (NoopOption::Some(head_at_source), Some(head_at_target)) + (AvailableHeader::Available(head_at_source), Some(head_at_target)) if head_at_target.at_relay_block_number < best_finalized_relay_block.0 && head_at_target.head_hash != *head_at_source => { @@ -356,22 +385,22 @@ where // client true }, - (NoopOption::Some(_), Some(_)) => { + (AvailableHeader::Available(_), Some(_)) => { // this is normal case when relay has recently updated heads, when parachain is // not progressing, or when our source client is still syncing false }, - (NoopOption::Some(_), None) => { + (AvailableHeader::Available(_), None) => { // parachain is not yet known to the target client. This is true when parachain // or bridge has been just onboarded/started true }, - (NoopOption::None, Some(_)) => { + (AvailableHeader::Missing, Some(_)) => { // parachain/parathread has been offboarded removed from the system. It needs to // be propageted to the target client true }, - (NoopOption::None, None) => { + (AvailableHeader::Missing, None) => { // all's good - parachain is unknown to both clients false }, @@ -410,7 +439,7 @@ async fn read_heads_at_source( metrics: Option<&ParachainsLoopMetrics>, at_relay_block: &HeaderIdOf, parachains: &[ParaId], -) -> Result>, FailedClient> { +) -> Result>, FailedClient> { let mut para_head_hashes = BTreeMap::new(); for para in parachains { let para_head = source_client.parachain_head(*at_relay_block, metrics, *para).await; @@ -587,7 +616,7 @@ mod tests { #[derive(Clone, Debug)] struct TestClientData { source_sync_status: Result, - source_heads: BTreeMap, TestError>>, + source_heads: BTreeMap, TestError>>, source_proofs: BTreeMap, TestError>>, target_best_block: Result, TestError>, @@ -602,7 +631,7 @@ mod tests { pub fn minimal() -> Self { TestClientData { source_sync_status: Ok(true), - source_heads: vec![(PARA_ID, Ok(NoopOption::Some(PARA_0_HASH)))] + source_heads: vec![(PARA_ID, Ok(AvailableHeader::Available(PARA_0_HASH)))] .into_iter() .collect(), source_proofs: vec![(PARA_ID, Ok(PARA_0_HASH.encode()))].into_iter().collect(), @@ -651,10 +680,10 @@ mod tests { _at_block: HeaderIdOf, _metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, - ) -> Result, TestError> { + ) -> Result, TestError> { match self.data.lock().await.source_heads.get(¶_id.0).cloned() { Some(result) => result, - None => Ok(NoopOption::None), + None => Ok(AvailableHeader::Missing), } } @@ -963,7 +992,7 @@ mod tests { fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), NoopOption::None)].into_iter().collect(), + vec![(ParaId(PARA_ID), AvailableHeader::Missing)].into_iter().collect(), vec![(ParaId(PARA_ID), None)].into_iter().collect(), HeaderId(10, Default::default()), ), @@ -975,7 +1004,9 @@ mod tests { fn parachain_is_not_updated_if_it_has_been_updated_at_better_relay_block() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), NoopOption::Some(PARA_0_HASH))].into_iter().collect(), + vec![(ParaId(PARA_ID), AvailableHeader::Available(PARA_0_HASH))] + .into_iter() + .collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 20, head_hash: PARA_1_HASH }) @@ -992,7 +1023,9 @@ mod tests { fn parachain_is_not_updated_if_hash_is_the_same_at_next_relay_block() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), NoopOption::Some(PARA_0_HASH))].into_iter().collect(), + vec![(ParaId(PARA_ID), AvailableHeader::Available(PARA_0_HASH))] + .into_iter() + .collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) @@ -1009,7 +1042,7 @@ mod tests { fn parachain_is_updated_after_offboarding() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), NoopOption::None)].into_iter().collect(), + vec![(ParaId(PARA_ID), AvailableHeader::Missing)].into_iter().collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { @@ -1029,7 +1062,9 @@ mod tests { fn parachain_is_updated_after_onboarding() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), NoopOption::Some(PARA_0_HASH))].into_iter().collect(), + vec![(ParaId(PARA_ID), AvailableHeader::Available(PARA_0_HASH))] + .into_iter() + .collect(), vec![(ParaId(PARA_ID), None)].into_iter().collect(), HeaderId(10, Default::default()), ), @@ -1041,7 +1076,9 @@ mod tests { fn parachain_is_updated_if_newer_head_is_known() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), NoopOption::Some(PARA_1_HASH))].into_iter().collect(), + vec![(ParaId(PARA_ID), AvailableHeader::Available(PARA_1_HASH))] + .into_iter() + .collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) @@ -1058,7 +1095,7 @@ mod tests { fn parachain_is_not_updated_if_source_head_is_unavailable() { assert_eq!( select_parachains_to_update::( - vec![(ParaId(PARA_ID), NoopOption::Noop)].into_iter().collect(), + vec![(ParaId(PARA_ID), AvailableHeader::Unavailable)].into_iter().collect(), vec![( ParaId(PARA_ID), Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) diff --git a/relays/utils/src/lib.rs b/relays/utils/src/lib.rs index 217210aa2c..603011819b 100644 --- a/relays/utils/src/lib.rs +++ b/relays/utils/src/lib.rs @@ -271,28 +271,3 @@ where }, } } - -/// An extension over rust's `Option` that adds an extra possibility: `Noop`. -#[derive(Clone, Copy, Debug)] -pub enum NoopOption { - /// If any operation depends on this `NoopOption`, we shouldn't perform it. - Noop, - /// Same as `Option::None`. - None, - /// Same as `Option::Some`. - Some(T), -} - -impl NoopOption { - /// Transform contained value. - pub fn map(self, f: F) -> NoopOption - where - F: FnOnce(T) -> U, - { - match self { - NoopOption::Noop => NoopOption::Noop, - NoopOption::None => NoopOption::None, - NoopOption::Some(val) => NoopOption::Some(f(val)), - } - } -}