diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs index 7e9a6121be33f..8dd695b7bd233 100644 --- a/cumulus/client/consensus/aura/src/collators/lookahead.rs +++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs @@ -62,6 +62,7 @@ use sp_core::crypto::Pair; use sp_inherents::CreateInherentDataProviders; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member}; +use sp_timestamp::Timestamp; use std::{path::PathBuf, sync::Arc, time::Duration}; /// Parameters for [`run`]. @@ -105,6 +106,50 @@ pub struct Params { pub max_pov_percentage: Option, } +/// Get the current parachain slot from a given block hash. +/// +/// Returns the parachain slot, relay chain slot, and timestamp. +fn get_parachain_slot( + para_client: &Client, + block_hash: Block::Hash, + relay_parent_header: &polkadot_primitives::Header, + relay_chain_slot_duration: Duration, +) -> Option<(Slot, Slot, Timestamp)> +where + Block: BlockT, + Client: ProvideRuntimeApi, + Client::Api: AuraApi, + P: Codec, +{ + let slot_duration = + match sc_consensus_aura::standalone::slot_duration_at(para_client, block_hash) { + Ok(sd) => sd, + Err(err) => { + tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration"); + return None + }, + }; + + tracing::debug!(target: crate::LOG_TARGET, ?slot_duration, ?block_hash, "Parachain slot duration acquired"); + + let (relay_slot, timestamp) = + consensus_common::relay_slot_and_timestamp(relay_parent_header, relay_chain_slot_duration)?; + + let slot_now = Slot::from_timestamp(timestamp, slot_duration); + + tracing::debug!( + target: crate::LOG_TARGET, + ?relay_slot, + para_slot = ?slot_now, + ?timestamp, + ?slot_duration, + ?relay_chain_slot_duration, + "Adjusted relay-chain slot to parachain slot" + ); + + Some((slot_now, relay_slot, timestamp)) +} + /// Run async-backing-friendly Aura. 
pub fn run( params: Params, @@ -223,12 +268,10 @@ where collator_util::Collator::::new(params) }; - let mut connection_helper: BackingGroupConnectionHelper = - BackingGroupConnectionHelper::new( - params.para_client.clone(), - params.keystore.clone(), - params.overseer_handle.clone(), - ); + let mut connection_helper = BackingGroupConnectionHelper::new( + params.keystore.clone(), + params.overseer_handle.clone(), + ); while let Some(relay_parent_header) = import_notifications.next().await { let relay_parent = relay_parent_header.hash(); @@ -280,42 +323,21 @@ where let para_client = &*params.para_client; let keystore = &params.keystore; let can_build_upon = |block_hash| { - let slot_duration = match sc_consensus_aura::standalone::slot_duration_at( - &*params.para_client, + let (slot_now, relay_slot, timestamp) = get_parachain_slot::<_, _, P::Public>( + para_client, block_hash, - ) { - Ok(sd) => sd, - Err(err) => { - tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration"); - return None - }, - }; - tracing::debug!(target: crate::LOG_TARGET, ?slot_duration, ?block_hash, "Parachain slot duration acquired"); - let (relay_slot, timestamp) = consensus_common::relay_slot_and_timestamp( &relay_parent_header, params.relay_chain_slot_duration, )?; - let slot_now = Slot::from_timestamp(timestamp, slot_duration); - tracing::debug!( - target: crate::LOG_TARGET, - ?relay_slot, - para_slot = ?slot_now, - ?timestamp, - ?slot_duration, - relay_chain_slot_duration = ?params.relay_chain_slot_duration, - "Adjusted relay-chain slot to parachain slot" - ); - Some(( + + Some(super::can_build_upon::<_, _, P>( slot_now, - super::can_build_upon::<_, _, P>( - slot_now, - relay_slot, - timestamp, - block_hash, - included_block.hash(), - para_client, - &keystore, - ), + relay_slot, + timestamp, + block_hash, + included_block.hash(), + para_client, + &keystore, )) }; @@ -330,15 +352,25 @@ where continue } + // Trigger pre-connect to backing groups if necessary.
+ if let (Some((slot_now, _relay_slot, _timestamp)), Ok(authorities)) = ( + get_parachain_slot::<_, _, P::Public>( + para_client, + parent_hash, + &relay_parent_header, + params.relay_chain_slot_duration, + ), + para_client.runtime_api().authorities(parent_hash), + ) { + connection_helper.update::

(slot_now, &authorities).await; + } + // This needs to change to support elastic scaling, but for continuously // scheduled chains this ensures that the backlog will grow steadily. for n_built in 0..2 { let slot_claim = match can_build_upon(parent_hash) { - Some((current_slot, fut)) => match fut.await { - None => { - connection_helper.update::(current_slot, parent_hash).await; - break - }, + Some(fut) => match fut.await { + None => break, Some(c) => c, }, None => break, diff --git a/cumulus/client/consensus/aura/src/collators/mod.rs b/cumulus/client/consensus/aura/src/collators/mod.rs index 1a945236b392c..d938dca69282f 100644 --- a/cumulus/client/consensus/aura/src/collators/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/mod.rs @@ -58,20 +58,15 @@ const PARENT_SEARCH_DEPTH: usize = 40; // Helper to pre-connect to the backing group we got assigned to and keep the connection // open until backing group changes or own slot ends. -struct BackingGroupConnectionHelper { - client: std::sync::Arc, +struct BackingGroupConnectionHelper { keystore: sp_keystore::KeystorePtr, overseer_handle: OverseerHandle, our_slot: Option, } -impl BackingGroupConnectionHelper { - pub fn new( - client: std::sync::Arc, - keystore: sp_keystore::KeystorePtr, - overseer_handle: OverseerHandle, - ) -> Self { - Self { client, keystore, overseer_handle, our_slot: None } +impl BackingGroupConnectionHelper { + pub fn new(keystore: sp_keystore::KeystorePtr, overseer_handle: OverseerHandle) -> Self { + Self { keystore, overseer_handle, our_slot: None } } async fn send_subsystem_message(&mut self, message: CollatorProtocolMessage) { @@ -79,12 +74,8 @@ impl BackingGroupConnectionHelper { } /// Update the current slot and initiate connections to backing groups if needed. - pub async fn update(&mut self, current_slot: Slot, best_block: Block::Hash) + pub async fn update

(&mut self, current_slot: Slot, authorities: &[P::Public]) where - Block: sp_runtime::traits::Block, - Client: - sc_client_api::HeaderBackend + Send + Sync + ProvideRuntimeApi + 'static, - Client::Api: AuraApi, P: sp_core::Pair + Send + Sync, P::Public: Codec, { @@ -94,21 +85,21 @@ impl BackingGroupConnectionHelper { return } - let Some(authorities) = self.client.runtime_api().authorities(best_block).ok() else { - return - }; - let next_slot = current_slot + 1; let next_slot_is_ours = - aura_internal::claim_slot::

(next_slot, &authorities, &self.keystore) + aura_internal::claim_slot::

(next_slot, authorities, &self.keystore) .await .is_some(); if next_slot_is_ours { - // Next slot is ours, send connect message. - tracing::debug!(target: crate::LOG_TARGET, "Our slot {} is next, connecting to backing groups", next_slot); - self.send_subsystem_message(CollatorProtocolMessage::ConnectToBackingGroups) - .await; + // Only send message if we were not connected. This avoids sending duplicate messages + // when running with a single collator. + if self.our_slot.is_none() { + // Next slot is ours, send connect message. + tracing::debug!(target: crate::LOG_TARGET, "Our slot {} is next, connecting to backing groups", next_slot); + self.send_subsystem_message(CollatorProtocolMessage::ConnectToBackingGroups) + .await; + } self.our_slot = Some(next_slot); } else if self.our_slot.take().is_some() { // Next slot is not ours, send disconnect only if we had a slot before. @@ -465,15 +456,18 @@ mod tests { #[tokio::test] async fn preconnect_when_next_slot_is_ours() { - let (client, keystore) = set_up_components(6); + let (client, keystore) = set_up_components(1); let genesis_hash = client.chain_info().genesis_hash; let (overseer_handle, messages_recorder) = create_overseer_handle(); - let mut helper = BackingGroupConnectionHelper::new(client, keystore, overseer_handle); + let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle); - // Update with slot 0, next slot (1) should be ours + // Fetch authorities for the update call + let authorities = client.runtime_api().authorities(genesis_hash).unwrap(); + + // Update with slot 5, next slot (6) should be ours helper - .update::(Slot::from(0), genesis_hash) + .update::(Slot::from(5), &authorities) .await; // Give time for message to be processed @@ -482,20 +476,23 @@ mod tests { let messages = messages_recorder.lock().unwrap(); assert_eq!(messages.len(), 1); assert!(matches!(messages[0], CollatorProtocolMessage::ConnectToBackingGroups)); - assert_eq!(helper.our_slot, Some(Slot::from(1))); + 
assert_eq!(helper.our_slot, Some(Slot::from(6))); } #[tokio::test] async fn preconnect_no_duplicate_connect_message() { - let (client, keystore) = set_up_components(6); + let (client, keystore) = set_up_components(1); let genesis_hash = client.chain_info().genesis_hash; let (overseer_handle, messages_recorder) = create_overseer_handle(); - let mut helper = BackingGroupConnectionHelper::new(client, keystore, overseer_handle); + let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle); - // Update with slot 0, next slot (1) is ours + // Fetch authorities for the update calls + let authorities = client.runtime_api().authorities(genesis_hash).unwrap(); + + // Update with slot 5, next slot (6) is ours helper - .update::(Slot::from(0), genesis_hash) + .update::(Slot::from(5), &authorities) .await; // Give time for message to be processed @@ -503,16 +500,16 @@ mod tests { assert_eq!(messages_recorder.lock().unwrap().len(), 1); messages_recorder.lock().unwrap().clear(); - // Update with slot 0 again - should not send another message + // Update with slot 5 again - should not send another message helper - .update::(Slot::from(0), genesis_hash) + .update::(Slot::from(5), &authorities) .await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; assert_eq!(messages_recorder.lock().unwrap().len(), 0); // Update with slot 1 (our slot) - should not send another message helper - .update::(Slot::from(1), genesis_hash) + .update::(Slot::from(6), &authorities) .await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; assert_eq!(messages_recorder.lock().unwrap().len(), 0); @@ -524,14 +521,17 @@ mod tests { let genesis_hash = client.chain_info().genesis_hash; let (overseer_handle, messages_recorder) = create_overseer_handle(); - let mut helper = BackingGroupConnectionHelper::new(client, keystore, overseer_handle); + let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle); + + // Fetch authorities for the update 
calls + let authorities = client.runtime_api().authorities(genesis_hash).unwrap(); // Slot 0 -> Alice, Slot 1 -> Bob, Slot 2 -> Charlie, Slot 3 -> Dave, Slot 4 -> Eve, // Slot 5 -> Ferdie, Slot 6 -> Alice // Update with slot 5, next slot (6) is ours -> should connect helper - .update::(Slot::from(5), genesis_hash) + .update::(Slot::from(5), &authorities) .await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; assert_eq!(helper.our_slot, Some(Slot::from(6))); @@ -539,7 +539,7 @@ mod tests { // Update with slot 8, next slot (9) is Charlie's -> should disconnect helper - .update::(Slot::from(8), genesis_hash) + .update::(Slot::from(8), &authorities) .await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; @@ -555,7 +555,7 @@ mod tests { // Update again with slot 8, next slot (9) is Charlie's -> should not send another // disconnect message helper - .update::(Slot::from(8), genesis_hash) + .update::(Slot::from(8), &authorities) .await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; @@ -570,7 +570,10 @@ mod tests { let genesis_hash = client.chain_info().genesis_hash; let (overseer_handle, messages_recorder) = create_overseer_handle(); - let mut helper = BackingGroupConnectionHelper::new(client, keystore, overseer_handle); + let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle); + + // Fetch authorities for the update call + let authorities = client.runtime_api().authorities(genesis_hash).unwrap(); // Slot 0 -> Alice, Slot 1 -> Bob, Slot 2 -> Charlie, Slot 3 -> Dave, Slot 4 -> Eve, // Slot 5 -> Ferdie @@ -578,7 +581,7 @@ mod tests { // Update with slot 1 (Bob's slot), next slot (2) is Charlie's // Since we never connected before (our_slot is None), we should not send disconnect helper - .update::(Slot::from(1), genesis_hash) + .update::(Slot::from(1), &authorities) .await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; @@ -593,14 +596,17 @@ mod tests { let genesis_hash = 
client.chain_info().genesis_hash; let (overseer_handle, messages_recorder) = create_overseer_handle(); - let mut helper = BackingGroupConnectionHelper::new(client, keystore, overseer_handle); + let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle); + + // Fetch authorities for the update calls + let authorities = client.runtime_api().authorities(genesis_hash).unwrap(); // Slot 0 -> Alice, Slot 1 -> Bob, Slot 2 -> Charlie, Slot 3 -> Dave, Slot 4 -> Eve, // Slot 5 -> Ferdie, Slot 6 -> Alice, Slot 7 -> Bob, ... // Cycle 1: Connect at slot 5, next slot (6) is ours helper - .update::(Slot::from(5), genesis_hash) + .update::(Slot::from(5), &authorities) .await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; { @@ -613,7 +619,7 @@ mod tests { // Cycle 1: Disconnect at slot 7, next slot (8) is Charlie's helper - .update::(Slot::from(7), genesis_hash) + .update::(Slot::from(7), &authorities) .await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; { @@ -626,10 +632,7 @@ mod tests { // Cycle 2: Connect again at slot 11, next slot (12) is ours helper - .update::( - Slot::from(11), - genesis_hash, - ) + .update::(Slot::from(11), &authorities) .await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; { @@ -641,20 +644,20 @@ mod tests { } #[tokio::test] - async fn preconnect_handles_runtime_api_error() { + async fn preconnect_handles_empty_authorities() { let keystore = Arc::new(sp_keystore::testing::MemoryKeystore::new()) as Arc<_>; - let client = Arc::new(TestClientBuilder::new().build()); let (overseer_handle, messages_recorder) = create_overseer_handle(); - let mut helper = BackingGroupConnectionHelper::new(client, keystore, overseer_handle); + let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle); - let invalid_hash = Hash::default(); + // Pass empty authorities list + let authorities = vec![]; helper - .update::(Slot::from(0), invalid_hash) + .update::(Slot::from(0), 
&authorities) .await; tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - // Should not send any message if runtime API fails + // Should not send any message if authorities list is empty assert_eq!(messages_recorder.lock().unwrap().len(), 0); } } diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs index affce0a29fcae..6e5fb5dd83c6d 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs @@ -184,16 +184,14 @@ where }; let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id); - - let mut maybe_connection_helper = relay_client - .overseer_handle() - .ok() - .map(|h| BackingGroupConnectionHelper::new(para_client.clone(), keystore.clone(), h.clone())) - .or_else(|| { - tracing::warn!(target: LOG_TARGET, - "Relay chain interface does not provide overseer handle. Backing group pre-connect is disabled."); - None - }); + let mut connection_helper = BackingGroupConnectionHelper::new( + keystore.clone(), + relay_client + .overseer_handle() + // Should never fail. If it fails, then providing collations to relay chain + // doesn't work either. So it is fine to panic here. + .expect("Relay chain interface must provide overseer handle."), + ); loop { // We wait here until the next slot arrives. @@ -310,6 +308,10 @@ where let included_header_hash = included_header.hash(); + if let Ok(authorities) = para_client.runtime_api().authorities(parent_hash) { + connection_helper.update::

(para_slot.slot, &authorities).await; + } + let slot_claim = match crate::collators::can_build_upon::<_, _, P>( para_slot.slot, relay_slot, @@ -334,9 +336,6 @@ where slot = ?para_slot.slot, "Not building block." ); - if let Some(ref mut connection_helper) = maybe_connection_helper { - connection_helper.update::(para_slot.slot, parent_hash).await; - } continue }, }; diff --git a/prdoc/pr_10305.prdoc b/prdoc/pr_10305.prdoc new file mode 100644 index 0000000000000..05a73937d871f --- /dev/null +++ b/prdoc/pr_10305.prdoc @@ -0,0 +1,10 @@ +title: 'Cumulus: fix pre-connect to backers for single collator parachains' +doc: +- audience: Node Dev + description: |- + When running a single collator (most commonly on testnets), the block builder task is always + able to claim a slot, so we're never triggering the pre-connect mechanism which happens for + slots owned by other authors. +crates: +- name: cumulus-client-consensus-aura + bump: patch