Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 74 additions & 42 deletions cumulus/client/consensus/aura/src/collators/lookahead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
use sp_timestamp::Timestamp;
use std::{path::PathBuf, sync::Arc, time::Duration};

/// Parameters for [`run`].
Expand Down Expand Up @@ -105,6 +106,50 @@ pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
pub max_pov_percentage: Option<u32>,
}

/// Get the current parachain slot from a given block hash.
///
/// Returns the parachain slot, relay chain slot, and timestamp.
fn get_parachain_slot<Block, Client, P>(
para_client: &Client,
block_hash: Block::Hash,
relay_parent_header: &polkadot_primitives::Header,
relay_chain_slot_duration: Duration,
) -> Option<(Slot, Slot, Timestamp)>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>,
Client::Api: AuraApi<Block, P>,
P: Codec,
{
let slot_duration =
match sc_consensus_aura::standalone::slot_duration_at(para_client, block_hash) {
Ok(sd) => sd,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration");
return None
},
};

tracing::debug!(target: crate::LOG_TARGET, ?slot_duration, ?block_hash, "Parachain slot duration acquired");

let (relay_slot, timestamp) =
consensus_common::relay_slot_and_timestamp(relay_parent_header, relay_chain_slot_duration)?;

let slot_now = Slot::from_timestamp(timestamp, slot_duration);

tracing::debug!(
target: crate::LOG_TARGET,
?relay_slot,
para_slot = ?slot_now,
?timestamp,
?slot_duration,
?relay_chain_slot_duration,
"Adjusted relay-chain slot to parachain slot"
);

Some((slot_now, relay_slot, timestamp))
}

/// Run async-backing-friendly Aura.
pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
Expand Down Expand Up @@ -223,12 +268,10 @@ where
collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
};

let mut connection_helper: BackingGroupConnectionHelper<Client> =
BackingGroupConnectionHelper::new(
params.para_client.clone(),
params.keystore.clone(),
params.overseer_handle.clone(),
);
let mut connection_helper = BackingGroupConnectionHelper::new(
params.keystore.clone(),
params.overseer_handle.clone(),
);

while let Some(relay_parent_header) = import_notifications.next().await {
let relay_parent = relay_parent_header.hash();
Expand Down Expand Up @@ -280,42 +323,21 @@ where
let para_client = &*params.para_client;
let keystore = &params.keystore;
let can_build_upon = |block_hash| {
let slot_duration = match sc_consensus_aura::standalone::slot_duration_at(
&*params.para_client,
let (slot_now, relay_slot, timestamp) = get_parachain_slot::<_, _, P::Public>(
para_client,
block_hash,
) {
Ok(sd) => sd,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration");
return None
},
};
tracing::debug!(target: crate::LOG_TARGET, ?slot_duration, ?block_hash, "Parachain slot duration acquired");
let (relay_slot, timestamp) = consensus_common::relay_slot_and_timestamp(
&relay_parent_header,
params.relay_chain_slot_duration,
)?;
let slot_now = Slot::from_timestamp(timestamp, slot_duration);
tracing::debug!(
target: crate::LOG_TARGET,
?relay_slot,
para_slot = ?slot_now,
?timestamp,
?slot_duration,
relay_chain_slot_duration = ?params.relay_chain_slot_duration,
"Adjusted relay-chain slot to parachain slot"
);
Some((

Some(super::can_build_upon::<_, _, P>(
slot_now,
super::can_build_upon::<_, _, P>(
slot_now,
relay_slot,
timestamp,
block_hash,
included_block.hash(),
para_client,
&keystore,
),
relay_slot,
timestamp,
block_hash,
included_block.hash(),
para_client,
&keystore,
))
};

Expand All @@ -330,15 +352,25 @@ where
continue
}

// Trigger pre-conect to backing groups if necessary.
if let (Some((slot_now, _relay_slot, _timestamp)), Ok(authorities)) = (
get_parachain_slot::<_, _, P::Public>(
para_client,
parent_hash,
&relay_parent_header,
params.relay_chain_slot_duration,
),
para_client.runtime_api().authorities(parent_hash),
) {
connection_helper.update::<P>(slot_now, &authorities).await;
}

// This needs to change to support elastic scaling, but for continuously
// scheduled chains this ensures that the backlog will grow steadily.
for n_built in 0..2 {
let slot_claim = match can_build_upon(parent_hash) {
Some((current_slot, fut)) => match fut.await {
None => {
connection_helper.update::<Block, P>(current_slot, parent_hash).await;
break
},
Some(fut) => match fut.await {
None => break,
Some(c) => c,
},
None => break,
Expand Down
Loading
Loading