Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion primitives/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ pub const ACCOUNT_DERIVATION_PREFIX: &[u8] = b"pallet-bridge/account-derivation/
pub const ROOT_ACCOUNT_DERIVATION_PREFIX: &[u8] = b"pallet-bridge/account-derivation/root";

/// Generic header Id.
#[derive(RuntimeDebug, Default, Clone, Encode, Decode, Copy, Eq, Hash, PartialEq)]
#[derive(
RuntimeDebug, Default, Clone, Encode, Decode, Copy, Eq, Hash, PartialEq, PartialOrd, Ord,
)]
pub struct HeaderId<Hash, Number>(pub Number, pub Hash);

/// Generic header id provider.
Expand Down
11 changes: 9 additions & 2 deletions relays/bin-substrate/src/cli/relay_parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ use crate::chains::{
rialto_parachains_to_millau::RialtoParachainToMillauCliBridge,
westend_parachains_to_millau::WestmintToMillauCliBridge,
};
use async_std::sync::Mutex;
use async_trait::async_trait;
use bp_polkadot_core::parachains::ParaId;
use parachains_relay::parachains_loop::{ParachainSyncParams, SourceClient, TargetClient};
use parachains_relay::parachains_loop::{
AvailableHeader, ParachainSyncParams, SourceClient, TargetClient,
};
use relay_utils::metrics::{GlobalMetrics, StandaloneMetric};
use std::sync::Arc;
use structopt::StructOpt;
use strum::{EnumString, EnumVariantNames, VariantNames};
use substrate_relay_helper::{
Expand Down Expand Up @@ -65,7 +69,10 @@ where
{
async fn relay_headers(data: RelayParachains) -> anyhow::Result<()> {
let source_client = data.source.into_client::<Self::SourceRelay>().await?;
let source_client = ParachainsSource::<Self::ParachainFinality>::new(source_client, None);
let source_client = ParachainsSource::<Self::ParachainFinality>::new(
source_client,
Arc::new(Mutex::new(AvailableHeader::Missing)),
);

let target_transaction_params = TransactionParams {
signer: data.target_sign.to_keypair::<Self::Target>()?,
Expand Down
5 changes: 1 addition & 4 deletions relays/lib-substrate-relay/src/finality/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use relay_substrate_client::{
BlockNumberOf, BlockWithJustification, Chain, Client, Error, HeaderOf,
};
use relay_utils::relay_loop::Client as RelayClient;
use sp_runtime::traits::Header as HeaderT;
use std::pin::Pin;

/// Shared updatable reference to the maximal header number that we want to sync from the source.
Expand Down Expand Up @@ -76,9 +75,7 @@ impl<P: SubstrateFinalitySyncPipeline> SubstrateFinalitySource<P> {
) -> Result<BlockNumberOf<P::SourceChain>, Error> {
// we **CAN** continue to relay finality proofs if source node is out of sync, because
// target node may be missing proofs that are already available at the source
let finalized_header_hash = self.client.best_finalized_header_hash().await?;
let finalized_header = self.client.header_by_hash(finalized_header_hash).await?;
Ok(*finalized_header.number())
self.client.best_finalized_header_number().await
}
}

Expand Down
22 changes: 9 additions & 13 deletions relays/lib-substrate-relay/src/on_demand/parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use bp_runtime::HeaderIdProvider;
use futures::{select, FutureExt};
use num_traits::Zero;
use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber};
use parachains_relay::parachains_loop::{ParachainSyncParams, TargetClient};
use parachains_relay::parachains_loop::{AvailableHeader, ParachainSyncParams, TargetClient};
use relay_substrate_client::{
AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf,
TransactionSignScheme,
Expand Down Expand Up @@ -143,15 +143,15 @@ async fn background_task<P: SubstrateParachainsPipeline>(

let mut relay_state = RelayState::Idle;
let mut required_parachain_header_number = Zero::zero();
let required_para_header_number_ref = Arc::new(Mutex::new(None));
let required_para_header_number_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable));

let mut restart_relay = true;
let parachains_relay_task = futures::future::Fuse::terminated();
futures::pin_mut!(parachains_relay_task);

let mut parachains_source = ParachainsSource::<P>::new(
source_relay_client.clone(),
Some(required_para_header_number_ref.clone()),
required_para_header_number_ref.clone(),
);
let mut parachains_target =
ParachainsTarget::<P>::new(target_client.clone(), target_transaction_params.clone());
Expand Down Expand Up @@ -253,7 +253,8 @@ async fn background_task<P: SubstrateParachainsPipeline>(
.await;
},
RelayState::RelayingParaHeader(required_para_header) => {
*required_para_header_number_ref.lock().await = Some(required_para_header);
*required_para_header_number_ref.lock().await =
AvailableHeader::Available(required_para_header);
},
}

Expand Down Expand Up @@ -389,13 +390,9 @@ where
source.client().best_finalized_header().await.map_err(map_source_err)?;
let best_finalized_relay_block_id = best_finalized_relay_header.id();
let para_header_at_source = source
.on_chain_parachain_header(
best_finalized_relay_block_id,
P::SOURCE_PARACHAIN_PARA_ID.into(),
)
.on_chain_para_head_id(best_finalized_relay_block_id, P::SOURCE_PARACHAIN_PARA_ID.into())
.await
.map_err(map_source_err)?
.map(|h| h.id());
.map_err(map_source_err)?;

let relay_header_at_source = best_finalized_relay_block_id.0;
let relay_header_at_target =
Expand All @@ -408,10 +405,9 @@ where
.map_err(map_target_err)?;

let para_header_at_relay_header_at_target = source
.on_chain_parachain_header(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into())
.on_chain_para_head_id(relay_header_at_target, P::SOURCE_PARACHAIN_PARA_ID.into())
.await
.map_err(map_source_err)?
.map(|h| h.id());
.map_err(map_source_err)?;

Ok(RelayData {
required_para_header: required_header_number,
Expand Down
76 changes: 30 additions & 46 deletions relays/lib-substrate-relay/src/parachains/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,35 @@ use async_std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bp_parachains::parachain_head_storage_key_at_source;
use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId};
use bp_runtime::HeaderIdProvider;
use codec::Decode;
use parachains_relay::{
parachains_loop::{ParaHashAtSource, SourceClient},
parachains_loop::{AvailableHeader, SourceClient},
parachains_loop_metrics::ParachainsLoopMetrics,
};
use relay_substrate_client::{
Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain,
};
use relay_utils::relay_loop::Client as RelayClient;
use sp_runtime::traits::Header as HeaderT;

/// Shared updatable reference to the maximal parachain header id that we want to sync from the
/// source.
pub type RequiredHeaderIdRef<C> = Arc<Mutex<Option<HeaderIdOf<C>>>>;
pub type RequiredHeaderIdRef<C> = Arc<Mutex<AvailableHeader<HeaderIdOf<C>>>>;

/// Substrate client as parachain heads source.
#[derive(Clone)]
pub struct ParachainsSource<P: SubstrateParachainsPipeline> {
client: Client<P::SourceRelayChain>,
maximal_header_id: Option<RequiredHeaderIdRef<P::SourceParachain>>,
max_head_id: RequiredHeaderIdRef<P::SourceParachain>,
}

impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
/// Creates new parachains source client.
pub fn new(
client: Client<P::SourceRelayChain>,
maximal_header_id: Option<RequiredHeaderIdRef<P::SourceParachain>>,
max_head_id: RequiredHeaderIdRef<P::SourceParachain>,
) -> Self {
ParachainsSource { client, maximal_header_id }
ParachainsSource { client, max_head_id }
}

/// Returns reference to the underlying RPC client.
Expand All @@ -59,11 +59,11 @@ impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
}

/// Return decoded head of given parachain.
pub async fn on_chain_parachain_header(
pub async fn on_chain_para_head_id(
&self,
at_block: HeaderIdOf<P::SourceRelayChain>,
para_id: ParaId,
) -> Result<Option<HeaderOf<P::SourceParachain>>, SubstrateError> {
) -> Result<Option<HeaderIdOf<P::SourceParachain>>, SubstrateError> {
let storage_key =
parachain_head_storage_key_at_source(P::SourceRelayChain::PARAS_PALLET_NAME, para_id);
let para_head = self.client.raw_storage_value(storage_key, Some(at_block.1)).await?;
Expand All @@ -72,8 +72,8 @@ impl<P: SubstrateParachainsPipeline> ParachainsSource<P> {
Some(para_head) => para_head,
None => return Ok(None),
};

Ok(Some(Decode::decode(&mut &para_head.0[..])?))
let para_head: HeaderOf<P::SourceParachain> = Decode::decode(&mut &para_head.0[..])?;
Ok(Some(para_head.id()))
}
}

Expand Down Expand Up @@ -105,7 +105,7 @@ where
at_block: HeaderIdOf<P::SourceRelayChain>,
metrics: Option<&ParachainsLoopMetrics>,
para_id: ParaId,
) -> Result<ParaHashAtSource, Self::Error> {
) -> Result<AvailableHeader<ParaHash>, Self::Error> {
// we don't need to support many parachains now
if para_id.0 != P::SOURCE_PARACHAIN_PARA_ID {
return Err(SubstrateError::Custom(format!(
Expand All @@ -115,44 +115,28 @@ where
)))
}

let mut para_hash_at_source = ParaHashAtSource::None;
let mut para_header_number_at_source = None;
match self.on_chain_parachain_header(at_block, para_id).await? {
Some(parachain_header) => {
para_hash_at_source = ParaHashAtSource::Some(parachain_header.hash());
para_header_number_at_source = Some(*parachain_header.number());
// never return head that is larger than requested. This way we'll never sync
// headers past `maximal_header_id`
if let Some(ref maximal_header_id) = self.maximal_header_id {
let maximal_header_id = *maximal_header_id.lock().await;
match maximal_header_id {
Some(maximal_header_id)
if *parachain_header.number() > maximal_header_id.0 =>
{
// we don't want this header yet => let's report previously requested
// header
para_hash_at_source = ParaHashAtSource::Some(maximal_header_id.1);
para_header_number_at_source = Some(maximal_header_id.0);
},
Some(_) => (),
None => {
// on-demand relay has not yet asked us to sync anything let's do that
para_hash_at_source = ParaHashAtSource::Unavailable;
para_header_number_at_source = None;
},
}
}
},
None => {},
};
let mut para_head_id = AvailableHeader::Missing;
if let Some(on_chain_para_head_id) = self.on_chain_para_head_id(at_block, para_id).await? {
// Never return head that is larger than requested. This way we'll never sync
// headers past `max_header_id`.
para_head_id = match *self.max_head_id.lock().await {
AvailableHeader::Unavailable => AvailableHeader::Unavailable,
AvailableHeader::Missing => {
// `max_header_id` is not set. There is no limit.
AvailableHeader::Available(on_chain_para_head_id)
},
AvailableHeader::Available(max_head_id) => {
// We report at most `max_header_id`.
AvailableHeader::Available(std::cmp::min(on_chain_para_head_id, max_head_id))
},
}
}

if let (Some(metrics), Some(para_header_number_at_source)) =
(metrics, para_header_number_at_source)
{
metrics.update_best_parachain_block_at_source(para_id, para_header_number_at_source);
if let (Some(metrics), AvailableHeader::Available(para_head_id)) = (metrics, para_head_id) {
metrics.update_best_parachain_block_at_source(para_id, para_head_id.0);
}

Ok(para_hash_at_source)
Ok(para_head_id.map(|para_head_id| para_head_id.1))
}

async fn prove_parachain_heads(
Expand Down
Loading