diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs
index 8c37b6be53654..d3b5ad0814830 100644
--- a/polkadot/node/network/collator-protocol/src/lib.rs
+++ b/polkadot/node/network/collator-protocol/src/lib.rs
@@ -22,25 +22,32 @@
 #![recursion_limit = "256"]
 
 use std::{
-	collections::HashSet,
+	collections::{HashMap, HashSet},
 	sync::Arc,
 	time::{Duration, Instant},
 };
 
 use futures::{
+	channel::oneshot,
 	stream::{FusedStream, StreamExt},
 	FutureExt, TryFutureExt,
 };
+use polkadot_node_subsystem::CollatorProtocolSenderTrait;
 use polkadot_node_subsystem_util::{database::Database, reputation::ReputationAggregator};
+use sp_consensus_babe::digests::CompatibleDigestItem;
+use sp_core::H256;
 use sp_keystore::KeystorePtr;
 
 use polkadot_node_network_protocol::{
 	request_response::{v2 as protocol_v2, IncomingRequestReceiver},
 	PeerId, UnifiedReputationChange as Rep,
 };
-use polkadot_node_subsystem::{errors::SubsystemError, overseer, DummySubsystem, SpawnedSubsystem};
-use polkadot_primitives::CollatorPair;
+use polkadot_node_subsystem::{
+	errors::SubsystemError, messages::ChainApiMessage, overseer, DummySubsystem, SpawnedSubsystem,
+};
+use polkadot_primitives::{CollatorPair, Hash, RELAY_CHAIN_SLOT_DURATION_MILLIS};
+use sp_consensus_slots::SlotDuration;
 
 pub use validator_side_experimental::ReputationConfig;
 
 mod collator_side;
@@ -206,3 +213,48 @@ fn tick_stream(period: Duration) -> impl FusedStream<Item = ()> {
 		})
 		.fuse()
 }
+
+/// Scheduling info tracked per active leaf, used for V3 scheduling parent validation.
+/// Stores the leaf's BABE slot and parent hash so the validator can determine whether
+/// the scheduling parent corresponds to the last finished relay chain slot.
+struct LeafSchedulingInfo {
+	/// The parent hash of the leaf block.
+	parent_hash: Hash,
+	/// The BABE slot of the leaf block.
+	slot: sp_consensus_slots::Slot,
+}
+
+pub(crate) async fn extract_leaf_scheduling_info<Sender: CollatorProtocolSenderTrait>(
+	sender: &mut Sender,
+	leaf: H256,
+) -> Option<LeafSchedulingInfo> {
+	// Fetch leaf header to extract BABE slot for V3 scheduling parent validation.
+	// Without this info, V3 advertisements referencing this leaf will be rejected.
+	let (tx, rx) = oneshot::channel();
+	sender.send_message(ChainApiMessage::BlockHeader(leaf, tx)).await;
+	let header = rx.await.ok().and_then(|r| r.ok().flatten());
+	header.and_then(|header| {
+		let slot = header.digest.logs().iter().find_map(|log| log.as_babe_pre_digest())?.slot();
+		Some(LeafSchedulingInfo { parent_hash: header.parent_hash, slot })
+	})
+}
+
+pub(crate) fn is_scheduling_parent_valid(
+	scheduling_parent: &Hash,
+	leaf_scheduling_info: &HashMap<Hash, LeafSchedulingInfo>,
+) -> bool {
+	let slot_duration = SlotDuration::from_millis(RELAY_CHAIN_SLOT_DURATION_MILLIS);
+	let current_slot =
+		sp_consensus_slots::Slot::from_timestamp(sp_timestamp::Timestamp::current(), slot_duration);
+	if let Some(info) = leaf_scheduling_info.get(scheduling_parent) {
+		// scheduling_parent is a leaf. This is allowed only when the leaf's slot is
+		// the previous slot.
+		*current_slot == *info.slot + 1
+	} else {
+		// scheduling_parent is not a leaf. This is allowed only if the scheduling parent is
+		// the parent of any leaf whose slot is still in progress.
+ leaf_scheduling_info + .iter() + .any(|(_, info)| *current_slot == *info.slot && *scheduling_parent == info.parent_hash) + } +} diff --git a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs index 375d7440c55d1..e320b6a50aba2 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs @@ -143,6 +143,7 @@ use tokio_util::sync::CancellationToken; use sp_keystore::KeystorePtr; +use crate::{extract_leaf_scheduling_info, is_scheduling_parent_valid, LeafSchedulingInfo}; use polkadot_node_network_protocol::{ self as net_protocol, peer_set::{CollationVersion, PeerSet, MAX_AUTHORITY_INCOMING_STREAMS}, @@ -156,9 +157,9 @@ use polkadot_node_network_protocol::{ use polkadot_node_primitives::{SignedFullStatement, Statement}; use polkadot_node_subsystem::{ messages::{ - CanSecondRequest, CandidateBackingMessage, ChainApiMessage, CollatorProtocolMessage, - IfDisconnected, NetworkBridgeEvent, NetworkBridgeTxMessage, ParentHeadData, - ProspectiveParachainsMessage, ProspectiveValidationDataRequest, + CanSecondRequest, CandidateBackingMessage, CollatorProtocolMessage, IfDisconnected, + NetworkBridgeEvent, NetworkBridgeTxMessage, ParentHeadData, ProspectiveParachainsMessage, + ProspectiveValidationDataRequest, }, overseer, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal, SubsystemError, }; @@ -170,10 +171,7 @@ use polkadot_node_subsystem_util::{ use polkadot_primitives::{ CandidateDescriptorV2, CandidateDescriptorVersion, CandidateHash, CollatorId, CoreIndex, Hash, HeadData, Id as ParaId, OccupiedCoreAssumption, PersistedValidationData, SessionIndex, - RELAY_CHAIN_SLOT_DURATION_MILLIS, }; -use sp_consensus_babe::digests::CompatibleDigestItem; -use sp_consensus_slots::SlotDuration; use super::{modify_reputation, tick_stream, LOG_TARGET}; @@ -580,16 +578,6 @@ struct HeldOffAdvertisement { advertised_descriptor_version: Option, } -/// Scheduling info tracked per active leaf, used for V3 scheduling parent validation. -/// Stores the leaf's BABE slot and parent hash so the validator can determine whether -/// the scheduling parent corresponds to the last finished relay chain slot. -struct LeafSchedulingInfo { - /// The parent hash of the leaf block. - parent_hash: Hash, - /// The BABE slot of the leaf block. - slot: sp_consensus_slots::Slot, -} - /// All state relevant for the validator side of the protocol lives here. #[derive(Default)] struct State { @@ -1752,26 +1740,7 @@ where // finished relay chain slot. We compare slot numbers rather than timestamps to keep // the logic simple and aligned with how BABE/Aura reason about slots. if candidate_descriptor_version == CandidateDescriptorVersion::V3 { - let slot_duration = SlotDuration::from_millis(RELAY_CHAIN_SLOT_DURATION_MILLIS); - let current_slot = sp_consensus_slots::Slot::from_timestamp( - sp_timestamp::Timestamp::current(), - slot_duration, - ); - - let scheduling_parent_valid = - if let Some(info) = state.leaf_scheduling_info.get(&scheduling_parent) { - // scheduling_parent is a leaf — valid only if the leaf's slot is exactly - // one behind the current slot (i.e., it just finished). - *current_slot == *info.slot + 1 - } else { - // scheduling_parent is not a leaf — valid if it's the parent of any leaf - // whose slot is the current slot (still in progress). 
- state.leaf_scheduling_info.iter().any(|(_leaf_hash, info)| { - *current_slot == *info.slot && scheduling_parent == info.parent_hash - }) - }; - - if !scheduling_parent_valid { + if !is_scheduling_parent_valid(&scheduling_parent, &state.leaf_scheduling_info) { return Err(AdvertisementError::SchedulingParentNotValid); } } @@ -2001,15 +1970,7 @@ where state.per_scheduling_parent.insert(*leaf, per_scheduling_parent); state.leaf_claim_queues.insert(*leaf, leaf_claim_queue); - // Fetch leaf header to extract BABE slot for V3 scheduling parent validation. - // Without this info, V3 advertisements referencing this leaf will be rejected. - let (tx, rx) = oneshot::channel(); - sender.send_message(ChainApiMessage::BlockHeader(*leaf, tx)).await; - let header = rx.await.ok().and_then(|r| r.ok()).flatten(); - match header.and_then(|h| { - let slot = h.digest.logs().iter().find_map(|log| log.as_babe_pre_digest())?.slot(); - Some(LeafSchedulingInfo { parent_hash: h.parent_hash, slot }) - }) { + match extract_leaf_scheduling_info(sender, *leaf).await { Some(info) => { state.leaf_scheduling_info.insert(*leaf, info); }, diff --git a/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs index 384cacf575e52..cffbc0daa8cf2 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/tests/prospective_parachains.rs @@ -22,7 +22,7 @@ use polkadot_node_subsystem::messages::ChainApiMessage; use polkadot_primitives::{ BlockNumber, CandidateCommitments, CandidateDescriptorVersion, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, Header, MutateDescriptorV2, - SigningContext, ValidatorId, + SigningContext, ValidatorId, RELAY_CHAIN_SLOT_DURATION_MILLIS, }; use polkadot_primitives_test_helpers::{ dummy_committed_candidate_receipt_v2, dummy_committed_candidate_receipt_v3, diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/collation_manager/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/collation_manager/mod.rs index 6ff383b94fe51..cb0bd085ab6ec 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/collation_manager/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/collation_manager/mod.rs @@ -15,6 +15,7 @@ // along with Polkadot. If not, see . 
 use crate::{
+	extract_leaf_scheduling_info, is_scheduling_parent_valid,
 	validator_side::{
 		descriptor_version_sanity_check_with_params, error::SecondingError,
 		request_persisted_validation_data, request_prospective_validation_data, BlockedCollationId,
@@ -28,7 +29,7 @@ use crate::{
 		},
 		error::{Error, FatalResult, Result},
 	},
-	LOG_TARGET,
+	LeafSchedulingInfo, LOG_TARGET,
 };
 use fatality::Split;
 use futures::{channel::oneshot, stream::FusedStream};
@@ -48,8 +49,9 @@ use polkadot_node_subsystem_util::{
 	request_validators, runtime::recv_runtime,
 };
 use polkadot_primitives::{
-	CandidateHash, CandidateReceiptV2 as CandidateReceipt, CoreIndex, GroupIndex,
-	GroupRotationInfo, Hash, HeadData, Id as ParaId, PersistedValidationData, SessionIndex,
+	CandidateDescriptorVersion, CandidateHash, CandidateReceiptV2 as CandidateReceipt, CoreIndex,
+	GroupIndex, GroupRotationInfo, Hash, HeadData, Id as ParaId, PersistedValidationData,
+	SessionIndex,
 };
 use requests::PendingRequests;
 use schnellru::{ByLength, LruMap};
@@ -69,7 +71,7 @@ pub enum AdvertisementError {
 	InvalidAssignment,
 	#[error("Duplicate advertisement")]
 	Duplicate,
-	#[error("Advertised relay parent is out of our view")]
+	#[error("Advertised scheduling parent is out of our view")]
 	OutOfOurView,
 	#[error("Peer reached the candidate limit")]
 	PeerLimitReached,
@@ -77,6 +79,8 @@
 	BlockedByBacking,
 	#[error("V1 advertisements are only allowed on active leaves")]
 	V1AdvertisementForImplicitParent,
+	#[error("For V3 candidate descriptors, scheduling_parent does not match any expected scheduling parent.")]
+	SchedulingParentNotValid,
 }
 
 pub struct CollationManager {
@@ -94,8 +98,8 @@
 	// must contain the full parent head data.
 	blocked_from_seconding: HashMap<BlockedCollationId, Vec<FetchedCollation>>,
 
-	// Information kept per relay parent.
-	per_relay_parent: HashMap<Hash, PerRelayParent>,
+	// Information kept per scheduling parent.
+	per_scheduling_parent: HashMap<Hash, PerSchedulingParent>,
 
 	// Session info cache.
 	per_session: LruMap,
@@ -105,6 +109,7 @@
 
 	// Key store.
 	keystore: KeystorePtr,
+	leaf_scheduling_info: HashMap<Hash, LeafSchedulingInfo>,
 }
 
 impl CollationManager {
@@ -116,11 +121,12 @@
 		let mut instance = Self {
 			implicit_view: ImplicitView::new(),
 			claim_queue_state: PerLeafClaimQueueState::new(),
-			per_relay_parent: HashMap::new(),
+			per_scheduling_parent: HashMap::new(),
 			blocked_from_seconding: HashMap::new(),
 			per_session: LruMap::new(ByLength::new(2)),
 			fetching: PendingRequests::default(),
 			keystore,
+			leaf_scheduling_info: HashMap::default(),
 		};
 
 		instance.update_view(sender, OurView::new([active_leaf.hash], 0)).await?;
@@ -153,6 +159,21 @@
 		);
 
 		for leaf in added.iter() {
+			match extract_leaf_scheduling_info(sender, *leaf).await {
+				Some(info) => {
+					self.leaf_scheduling_info.insert(*leaf, info);
+				},
+				None => {
+					gum::warn!(
+						target: LOG_TARGET,
+						?leaf,
+						"Could not extract BABE slot from leaf header; \
+						V3 scheduling parent validation will reject advertisements \
+						referencing this leaf",
+					);
+				},
+			}
+
 			if let Err(err) = self
 				.implicit_view
 				.activate_leaf(sender, *leaf)
@@ -166,21 +187,22 @@
 		for leaf in removed {
 			let deactivated_ancestry = self.implicit_view.deactivate_leaf(leaf);
+			self.leaf_scheduling_info.remove(&leaf);
 			gum::trace!(
 				target: LOG_TARGET,
 				?deactivated_ancestry,
-				"CollationManager: Removing relay parents from implicit view"
+				"CollationManager: Removing scheduling parents from implicit view"
 			);
 
 			for deactivated in deactivated_ancestry.iter() {
 				// Remove the fetching collations and advertisements for the deactivated RPs.
-				if let Some(deactivated_rp) = self.per_relay_parent.remove(deactivated) {
-					for advertisement in deactivated_rp.all_advertisements() {
+				if let Some(deactivated_sp) = self.per_scheduling_parent.remove(deactivated) {
+					for advertisement in deactivated_sp.all_advertisements() {
 						gum::trace!(
 							target: LOG_TARGET,
 							?advertisement,
-							"Cancelling advertisement because relay parent got out of view"
+							"Cancelling advertisement because scheduling parent got out of view"
 						);
 						self.fetching.cancel(&advertisement);
 					}
@@ -195,9 +217,8 @@
 		let mut removed_blocked = vec![];
 		self.blocked_from_seconding.retain(|_, collations| {
 			collations.retain(|collation| {
-				let remove = !self
-					.per_relay_parent
-					.contains_key(&collation.candidate_receipt.descriptor.relay_parent());
+				let remove =
+					!self.per_scheduling_parent.contains_key(&collation.scheduling_parent());
 
 				if remove {
 					removed_blocked.push(collation.candidate_receipt.hash());
@@ -228,7 +249,7 @@
 		// Includes the leaf
 		for (idx, ancestor) in allowed_ancestry.iter().enumerate() {
-			if self.per_relay_parent.contains_key(&ancestor) {
+			if self.per_scheduling_parent.contains_key(&ancestor) {
 				continue;
 			}
@@ -251,9 +272,9 @@
 						Default::default()
 					},
 				};
-
-			self.per_relay_parent
-				.insert(*ancestor, PerRelayParent::new(session_index, core));
+			// If session info is not available, default to assuming v2 candidate descriptors.
+ self.per_scheduling_parent + .insert(*ancestor, PerSchedulingParent::new(session_index, core)); if idx == 0 && ancestor == leaf { let mut claim_queues = @@ -286,7 +307,8 @@ impl CollationManager { sender: &mut Sender, advertisement: Advertisement, ) -> std::result::Result<(), AdvertisementError> { - let Some(per_rp) = self.per_relay_parent.get_mut(&advertisement.scheduling_parent) else { + let Some(per_sp) = self.per_scheduling_parent.get_mut(&advertisement.scheduling_parent) + else { return Err(AdvertisementError::OutOfOurView); }; @@ -297,6 +319,17 @@ impl CollationManager { return Err(AdvertisementError::V1AdvertisementForImplicitParent); } + // V3 candidate descriptors require scheduling_parent to be the block from the last + // finished relay chain slot. + if advertisement.advertised_descriptor_version == Some(CandidateDescriptorVersion::V3) { + if !is_scheduling_parent_valid( + &advertisement.scheduling_parent, + &self.leaf_scheduling_info, + ) { + return Err(AdvertisementError::SchedulingParentNotValid); + } + } + let now = Instant::now(); let max_assignments = self @@ -310,7 +343,7 @@ impl CollationManager { if let Some(ProspectiveCandidate { candidate_hash, .. }) = advertisement.prospective_candidate { - if per_rp.fetched_collations.contains_key(&candidate_hash) { + if per_sp.fetched_collations.contains_key(&candidate_hash) { return Err(AdvertisementError::Duplicate); } } @@ -319,14 +352,14 @@ impl CollationManager { return Err(AdvertisementError::Duplicate); } - per_rp.can_keep_advertisement(advertisement, max_assignments)?; + per_sp.can_keep_advertisement(advertisement, max_assignments)?; let can_second = backing_allows_seconding(sender, &advertisement).await; if !can_second { return Err(AdvertisementError::BlockedByBacking); } - per_rp.add_advertisement(advertisement, now); + per_sp.add_advertisement(advertisement, now); Ok(()) } @@ -395,7 +428,7 @@ impl CollationManager { target: LOG_TARGET, peer_id = ?advertisement.peer_id, ?para_id, - relay_parent = ?advertisement.scheduling_parent, + scheduling_parent = ?advertisement.scheduling_parent, maybe_candidate_hash = ?advertisement.candidate_hash(), "Requesting collation", ); @@ -418,11 +451,11 @@ impl CollationManager { } pub fn remove_peer(&mut self, peer: &PeerId) { - for per_rp in self.per_relay_parent.values_mut() { + for per_sp in self.per_scheduling_parent.values_mut() { // No need to reset now the statuses of claims that were pending fetch for these // candidates, or even cancel the futures as the requests will soon conclude with a // network error. 
- per_rp.remove_peer_advertisements(peer); + per_sp.remove_peer_advertisements(peer); } } @@ -437,30 +470,41 @@ impl CollationManager { &mut self, sender: &mut Sender, res: CollationFetchResponse, + maybe_collation_version: Option, ) -> CanSecond { let advertisement = res.0; let mut reject_info = SecondingRejectionInfo::from(&advertisement); self.fetching.note_completed(&advertisement); - let Some(per_rp) = self.per_relay_parent.get_mut(&advertisement.scheduling_parent) else { + let Some(per_sp) = self.per_scheduling_parent.get_mut(&advertisement.scheduling_parent) + else { gum::debug!( target: LOG_TARGET, hash = ?advertisement.scheduling_parent, para_id = ?advertisement.para_id, peer_id = ?advertisement.peer_id, - "Collation fetch concluded for relay parent out of view" + "Collation fetch concluded for scheduling parent out of view" ); return CanSecond::No(None, reject_info); }; - per_rp.remove_advertisement(&advertisement); + per_sp.remove_advertisement(&advertisement); + + let Some(collation_version) = maybe_collation_version else { + gum::debug!( + target: LOG_TARGET, + ?advertisement, + "Peer may not be connected." + ); + return CanSecond::No(None, reject_info); + }; match process_collation_fetch_result(res) { Ok(fetched_collation) => { // It can't be a duplicate, because we check before initiating fetch. For the old - // protocol version, we anyway only fetch one per relay parent. - per_rp + // protocol version, we anyway only fetch one per scheduling parent. + per_sp .fetched_collations .insert(fetched_collation.candidate_receipt.hash(), advertisement.peer_id); @@ -481,9 +525,9 @@ impl CollationManager { // Sanity check of the candidate receipt version. if let Err(err) = descriptor_version_sanity_check_with_params( fetched_collation.candidate_receipt.descriptor(), - per_rp.core_index, - per_rp.session_index, - CollationVersion::V2, // experimental module doesn't support V3 yet + per_sp.core_index, + per_sp.session_index, + collation_version, ) { gum::warn!( target: LOG_TARGET, @@ -502,7 +546,7 @@ impl CollationManager { pub fn release_slot( &mut self, - relay_parent: &Hash, + scheduling_parent: &Hash, para_id: ParaId, maybe_candidate_hash: Option<&CandidateHash>, maybe_output_head_hash: Option, @@ -511,17 +555,17 @@ impl CollationManager { if !self.claim_queue_state.release_claims_for_candidate(candidate_hash) { gum::debug!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?candidate_hash, ?para_id, "Could not release slot for candidate, it wasn't claimed", ); } } else { - if !self.claim_queue_state.release_claims_for_relay_parent(relay_parent) { + if !self.claim_queue_state.release_claims_for_relay_parent(scheduling_parent) { gum::debug!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?para_id, "Could not release slot for candidate, it wasn't claimed", ); @@ -539,26 +583,27 @@ impl CollationManager { pub fn get_fetched_collation_peer_id( &self, - relay_parent: &Hash, + scheduling_parent: &Hash, candidate_hash: &CandidateHash, ) -> Option<&PeerId> { - self.per_relay_parent - .get(relay_parent) - .and_then(|per_rp| per_rp.fetched_collations.get(candidate_hash)) + self.per_scheduling_parent + .get(scheduling_parent) + .and_then(|per_sp| per_sp.fetched_collations.get(candidate_hash)) } pub async fn note_seconded( &mut self, sender: &mut Sender, - relay_parent: &Hash, + scheduling_parent: &Hash, para_id: &ParaId, candidate_hash: &CandidateHash, output_head_hash: Hash, ) -> (Option, Vec) { - let peer_id = self.get_fetched_collation_peer_id(relay_parent, 
candidate_hash).copied(); + let peer_id = + self.get_fetched_collation_peer_id(scheduling_parent, candidate_hash).copied(); self.claim_queue_state - .claim_seconded_slot(relay_parent, para_id, candidate_hash); + .claim_seconded_slot(scheduling_parent, para_id, candidate_hash); // See if we've unblocked other collations here too. let maybe_unblocked = self.blocked_from_seconding.remove(&BlockedCollationId { @@ -570,7 +615,7 @@ impl CollationManager { let mut unblocked_can_second = Vec::with_capacity(unblocked.len()); for fetched_collation in unblocked { let reject_info = SecondingRejectionInfo { - relay_parent: fetched_collation.candidate_receipt.descriptor.relay_parent(), + scheduling_parent: fetched_collation.scheduling_parent(), peer_id: fetched_collation.peer_id, para_id: fetched_collation.candidate_receipt.descriptor.para_id(), maybe_output_head_hash: Some( @@ -606,19 +651,19 @@ impl CollationManager { &self, now: Instant, leaf: Hash, - allowed_rps: &[Hash], + allowed_sps: &[Hash], para_id: ParaId, highest_rep_of_para: Score, connected_rep_query_fn: &RepQueryFn, ) -> Either, Duration> { let advertisements = self - .per_relay_parent + .per_scheduling_parent .iter() - // Only check advertisements for relay parents within the view of this leaf. - .filter_map(|(rp, per_rp)| allowed_rps.contains(rp).then_some(per_rp)) - .flat_map(|per_rp| { - let activated_at = per_rp.activated_at; - per_rp + // Only check advertisements for scheduling parents within the view of this leaf. + .filter_map(|(sp, per_sp)| allowed_sps.contains(sp).then_some(per_sp)) + .flat_map(|per_sp| { + let activated_at = per_sp.activated_at; + per_sp .eligible_advertisements(para_id, leaf) .map(move |(adv, timestamp)| (adv, timestamp, activated_at)) }) @@ -644,8 +689,8 @@ impl CollationManager { let delay = Self::calculate_delay(best_advertisement.score, highest_rep_of_para); - // Calculate the remaining delay relative to the relay parent's activation time, - // not the advertisement's arrival time. This ensures that if a relay parent has been + // Calculate the remaining delay relative to the scheduling parent's activation time, + // not the advertisement's arrival time. This ensures that if a scheduling parent has been // active long enough, advertisements are fetched immediately regardless of when they // arrived. let elapsed_since_activation = now.duration_since(best_advertisement.activated_at); @@ -655,7 +700,7 @@ impl CollationManager { gum::debug!( target: LOG_TARGET, peer_id = ?best_advertisement.adv.peer_id, - relay_parent = ?best_advertisement.adv.scheduling_parent, + scheduling_parent = ?best_advertisement.adv.scheduling_parent, para_id = ?best_advertisement.adv.para_id, ?elapsed_since_activation, ?delay, @@ -727,7 +772,7 @@ impl CollationManager { queue_blocked_collations: bool, reject_info: SecondingRejectionInfo, ) -> CanSecond { - let relay_parent = fetched_collation.candidate_receipt.descriptor.relay_parent(); + let scheduling_parent = fetched_collation.scheduling_parent(); let candidate_hash = fetched_collation.candidate_receipt.hash(); let para_id = fetched_collation.candidate_receipt.descriptor.para_id(); @@ -741,10 +786,10 @@ impl CollationManager { let can_second = match fetch_pvd_res { Ok(pvd) => { // Mark this claim with the right candidate hash. This is a no-op if for - // protocol v2 but in case of v1, the claim was made on the relay parent but + // protocol v2 but in case of v1, the claim was made on the scheduling parent but // without a candidate hash. 
self.claim_queue_state.mark_pending_slot_with_candidate( - &relay_parent, + &scheduling_parent, ¶_id, &candidate_hash, ); @@ -755,7 +800,7 @@ impl CollationManager { gum::debug!( target: LOG_TARGET, ?candidate_hash, - ?relay_parent, + ?scheduling_parent, ?para_id, "Collation having parent head data hash {} is blocked from seconding. Waiting on its parent to be validated.", parent @@ -769,10 +814,10 @@ impl CollationManager { } // Mark this claim with the right candidate hash. This is a no-op if for - // protocol v2 but in case of v1, the claim was made on the relay parent but - // without a candidate hash. + // protocol v2 but in case of v1, the claim was made on the scheduling parent + // but without a candidate hash. self.claim_queue_state.mark_pending_slot_with_candidate( - &relay_parent, + &scheduling_parent, ¶_id, &candidate_hash, ); @@ -783,7 +828,7 @@ impl CollationManager { gum::warn!( target: LOG_TARGET, ?candidate_hash, - ?relay_parent, + ?scheduling_parent, ?para_id, "Failed persisted validation data checks: {}", err @@ -806,10 +851,10 @@ impl CollationManager { for collation in blocked { let candidate_hash = collation.candidate_receipt.hash(); - let relay_parent = collation.candidate_receipt.descriptor.relay_parent(); + let scheduling_parent = collation.scheduling_parent(); gum::debug!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?candidate_hash, para_id = ?id.para_id, parent_head_hash = ?id.parent_head_data_hash, @@ -819,7 +864,7 @@ impl CollationManager { if !self.claim_queue_state.release_claims_for_candidate(&candidate_hash) { gum::debug!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?candidate_hash, para_id = ?id.para_id, "Could not release slot for candidate, it wasn't claimed", @@ -830,10 +875,10 @@ impl CollationManager { #[cfg(test)] pub fn advertisements(&self) -> BTreeSet { - self.per_relay_parent + self.per_scheduling_parent .values() - .flat_map(|per_rp| { - per_rp + .flat_map(|per_sp| { + per_sp .peer_advertisements .values() .flat_map(|peer_adv| peer_adv.advertisements.keys().cloned()) @@ -859,6 +904,26 @@ struct FetchedCollation { } impl FetchedCollation { + pub fn new( + candidate_receipt: CandidateReceipt, + pov: PoV, + maybe_parent_head_data: Option, + maybe_parent_head_data_hash: Option, + peer_id: PeerId, + ) -> Self { + Self { + candidate_receipt, + pov, + maybe_parent_head_data, + maybe_parent_head_data_hash, + peer_id, + } + } + + pub fn scheduling_parent(&self) -> Hash { + self.candidate_receipt.descriptor().scheduling_parent() + } + /// Performs a sanity check between advertised and fetched collations. 
fn ensure_matches_advertisement( &self, @@ -881,10 +946,18 @@ impl FetchedCollation { }, } - // TODO: Should be scheduling_parent - if advertised.scheduling_parent != candidate_receipt.descriptor.relay_parent() { + if advertised.scheduling_parent != candidate_receipt.descriptor.scheduling_parent() { return Err(SecondingError::SchedulingParentMismatch); } + if let Some(advertised_version) = &advertised.advertised_descriptor_version { + let fetched_version = candidate_receipt.descriptor().version(); + if advertised_version != &fetched_version { + return Err(SecondingError::DescriptorVersionMismatch( + *advertised_version, + fetched_version, + )); + } + } Ok(()) } @@ -899,7 +972,7 @@ struct AcceptedAdvertisement<'a> { adv: &'a Advertisement, score: Score, timestamp: &'a Instant, - /// The time at which the relay parent was activated + /// The time at which the scheduling parent was activated activated_at: Instant, } @@ -919,19 +992,19 @@ impl<'a> PartialOrd for AcceptedAdvertisement<'a> { } } -struct PerRelayParent { +struct PerSchedulingParent { peer_advertisements: HashMap, // Only kept to make sure that we don't re-request the same collations and so that we know who // to punish for supplying an invalid collation. fetched_collations: HashMap, session_index: SessionIndex, core_index: CoreIndex, - // The time at which this relay parent was activated. Used to calculate fetch + // The time at which this scheduling parent was activated. Used to calculate fetch // delays relative to leaf activation. activated_at: Instant, } -impl PerRelayParent { +impl PerSchedulingParent { fn new(session_index: SessionIndex, core_index: CoreIndex) -> Self { Self { session_index, @@ -1044,8 +1117,6 @@ where return true; }; - // TODO: use the actual scheduling_parent once V3 is supported in the - // experimental module (relay_parent == scheduling_parent before V3). 
let request = CanSecondRequest { candidate_para_id: advertisement.para_id, candidate_scheduling_parent: advertisement.scheduling_parent, @@ -1059,7 +1130,7 @@ where gum::warn!( target: LOG_TARGET, ?err, - relay_parent = ?advertisement.scheduling_parent, + scheduling_parent = ?advertisement.scheduling_parent, para_id = ?advertisement.para_id, candidate_hash = ?prospective_candidate.candidate_hash, "CanSecond-request responder was dropped", @@ -1167,15 +1238,13 @@ fn process_collation_fetch_result( "Received collation", ); - Ok(FetchedCollation { + Ok(FetchedCollation::new( candidate_receipt, pov, - peer_id: advertisement.peer_id, - maybe_parent_head_data: None, - maybe_parent_head_data_hash: advertisement - .prospective_candidate - .map(|p| p.parent_head_data_hash), - }) + None, + advertisement.prospective_candidate.map(|p| p.parent_head_data_hash), + advertisement.peer_id, + )) }, Ok(request_v2::CollationFetchingResponse::CollationWithParentHeadData { receipt, @@ -1188,15 +1257,13 @@ fn process_collation_fetch_result( "Received collation with parent head data", ); - Ok(FetchedCollation { - candidate_receipt: receipt, + Ok(FetchedCollation::new( + receipt, pov, - peer_id: advertisement.peer_id, - maybe_parent_head_data: Some(parent_head_data), - maybe_parent_head_data_hash: advertisement - .prospective_candidate - .map(|p| p.parent_head_data_hash), - }) + Some(parent_head_data), + advertisement.prospective_candidate.map(|p| p.parent_head_data_hash), + advertisement.peer_id, + )) }, } } @@ -1253,6 +1320,7 @@ mod tests { para_id, peer_id, prospective_candidate: None, + advertised_descriptor_version: None, }; let peer_1 = PeerId::random(); @@ -1430,19 +1498,21 @@ mod tests { para_id, peer_id: peer, prospective_candidate: None, + advertised_descriptor_version: None, }; let new_collation_manager_instance = || CollationManager { implicit_view: ImplicitView::new(), claim_queue_state: PerLeafClaimQueueState::new(), - per_relay_parent: HashMap::from([( + per_scheduling_parent: HashMap::from([( scheduling_parent, - PerRelayParent::new(0, CoreIndex(0)), + PerSchedulingParent::new(0, CoreIndex(0)), )]), blocked_from_seconding: HashMap::new(), per_session: LruMap::new(ByLength::new(2)), fetching: PendingRequests::default(), keystore: Arc::new(sc_keystore::LocalKeystore::in_memory()), + leaf_scheduling_info: HashMap::new(), }; // No advertisements - returns Left(None). @@ -1469,7 +1539,7 @@ mod tests { let get_rep = |_: &PeerId, _: &ParaId| Some(score(100)); collation_manager - .per_relay_parent + .per_scheduling_parent .get_mut(&scheduling_parent) .unwrap() .add_advertisement(make_adv(peer_a), old_timestamp); @@ -1493,7 +1563,7 @@ mod tests { let get_rep = |_: &PeerId, _: &ParaId| Some(score(0)); collation_manager - .per_relay_parent + .per_scheduling_parent .get_mut(&scheduling_parent) .unwrap() .add_advertisement(make_adv(peer_a), recent_timestamp); @@ -1530,10 +1600,11 @@ mod tests { } }; - let per_rp = collation_manager.per_relay_parent.get_mut(&scheduling_parent).unwrap(); - per_rp.add_advertisement(make_adv(peer_a), old_timestamp); - per_rp.add_advertisement(make_adv(peer_b), old_timestamp); - per_rp.add_advertisement(make_adv(peer_c), old_timestamp); + let per_sp = + collation_manager.per_scheduling_parent.get_mut(&scheduling_parent).unwrap(); + per_sp.add_advertisement(make_adv(peer_a), old_timestamp); + per_sp.add_advertisement(make_adv(peer_b), old_timestamp); + per_sp.add_advertisement(make_adv(peer_c), old_timestamp); // All have old timestamps, so delay has passed. 
Should pick peer_b (highest score). assert_eq!( @@ -1557,9 +1628,10 @@ mod tests { let earlier = old_timestamp; let later = old_timestamp + Duration::from_secs(1); - let per_rp = collation_manager.per_relay_parent.get_mut(&scheduling_parent).unwrap(); - per_rp.add_advertisement(make_adv(peer_a), later); - per_rp.add_advertisement(make_adv(peer_b), earlier); + let per_sp = + collation_manager.per_scheduling_parent.get_mut(&scheduling_parent).unwrap(); + per_sp.add_advertisement(make_adv(peer_a), later); + per_sp.add_advertisement(make_adv(peer_b), earlier); // Same score, peer_b has earlier timestamp. assert_eq!( @@ -1581,7 +1653,7 @@ mod tests { let get_rep = |_: &PeerId, _: &ParaId| -> Option { None }; collation_manager - .per_relay_parent + .per_scheduling_parent .get_mut(&scheduling_parent) .unwrap() .add_advertisement(make_adv(peer_a), old_timestamp); @@ -1599,24 +1671,24 @@ mod tests { ); } - // Relay parent not in allowed_rps - no advertisements found. + // Scheduling parent not in allowed_sps - no advertisements found. { let mut collation_manager = new_collation_manager_instance(); let get_rep = |_: &PeerId, _: &ParaId| Some(score(100)); - let other_relay_parent = Hash::random(); + let other_scheduling_parent = Hash::random(); collation_manager - .per_relay_parent + .per_scheduling_parent .get_mut(&scheduling_parent) .unwrap() .add_advertisement(make_adv(peer_a), old_timestamp); - // Pass different relay parent in allowed_rps. + // Pass different scheduling parent in allowed_sps. assert_eq!( collation_manager.pick_best_advertisement( now, scheduling_parent, - &[other_relay_parent], // relay_parent not included + &[other_scheduling_parent], // scheduling_parent not included para_id, score(100), &get_rep, @@ -1627,18 +1699,20 @@ mod tests { // Delay passed because leaf has been active long enough, even though advertisement arrived // recently. Tests that the delay is relative to activation time, not advertisement - // arrival time. When the relay parent (leaf) has been active longer than the full delay, - // the remaining delay should be zero and the advertisement should be fetched immediately. + // arrival time. When the scheduling parent (leaf) has been active longer than the full + // delay, the remaining delay should be zero and the advertisement should be fetched + // immediately. { let mut collation_manager = new_collation_manager_instance(); let get_rep = |_: &PeerId, _: &ParaId| Some(score(0)); // Set activated_at far enough in the past that any delay has elapsed. - let per_rp = collation_manager.per_relay_parent.get_mut(&scheduling_parent).unwrap(); - per_rp.activated_at = now.checked_sub(MAX_FETCH_DELAY * 2).unwrap(); + let per_sp = + collation_manager.per_scheduling_parent.get_mut(&scheduling_parent).unwrap(); + per_sp.activated_at = now.checked_sub(MAX_FETCH_DELAY * 2).unwrap(); // Advertisement arrives now (recent), but the leaf has been active long enough. - per_rp.add_advertisement(make_adv(peer_a), recent_timestamp); + per_sp.add_advertisement(make_adv(peer_a), recent_timestamp); // highest_rep = 100, peer's score = 0 (< INSTANT_FETCH_REP_THRESHOLD), so delay = // MAX_FETCH_DELAY. But activated_at is 2*MAX_FETCH_DELAY ago, so remaining_delay = 0. @@ -1663,10 +1737,11 @@ mod tests { // Set activated_at so that only part of the delay has elapsed. 
// score(0) < INSTANT_FETCH_REP_THRESHOLD and < highest_rep => delay = MAX_FETCH_DELAY // activated_at = MAX_FETCH_DELAY / 4 ago => remaining = MAX_FETCH_DELAY * 3/4 - let per_rp = collation_manager.per_relay_parent.get_mut(&scheduling_parent).unwrap(); - per_rp.activated_at = now.checked_sub(MAX_FETCH_DELAY / 4).unwrap(); + let per_sp = + collation_manager.per_scheduling_parent.get_mut(&scheduling_parent).unwrap(); + per_sp.activated_at = now.checked_sub(MAX_FETCH_DELAY / 4).unwrap(); - per_rp.add_advertisement(make_adv(peer_a), recent_timestamp); + per_sp.add_advertisement(make_adv(peer_a), recent_timestamp); let result = collation_manager.pick_best_advertisement( now, diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs index 53d56c06596f3..0f3adf2bcb71d 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs @@ -22,8 +22,8 @@ use polkadot_node_network_protocol::{ }; use polkadot_node_primitives::PoV; use polkadot_primitives::{ - CandidateHash, CandidateReceiptV2 as CandidateReceipt, Hash, Id as ParaId, - PersistedValidationData, + CandidateDescriptorVersion, CandidateHash, CandidateReceiptV2 as CandidateReceipt, Hash, + Id as ParaId, PersistedValidationData, }; use std::{collections::HashSet, num::NonZeroU16, time::Duration}; @@ -209,7 +209,7 @@ pub struct ProspectiveCandidate { } /// Identifier of a collation being requested. -#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, PartialOrd, Ord)] +#[derive(Debug, Copy, Clone, PartialOrd, Ord, Eq, Hash, PartialEq)] pub struct Advertisement { /// Candidate's scheduling parent. pub scheduling_parent: Hash, @@ -220,6 +220,9 @@ pub struct Advertisement { /// Optional candidate hash and parent head-data hash if were /// supplied in advertisement. pub prospective_candidate: Option, + /// Advertised candidate descriptor version (for V3 protocol). + /// None for V1/V2 protocols. + pub advertised_descriptor_version: Option, } impl Advertisement { @@ -258,7 +261,7 @@ pub enum CanSecond { /// Information that identifies a collation that was rejected from seconding. 
#[derive(Debug)] pub struct SecondingRejectionInfo { - pub relay_parent: Hash, + pub scheduling_parent: Hash, pub peer_id: PeerId, pub para_id: ParaId, pub maybe_output_head_hash: Option, @@ -268,7 +271,7 @@ pub struct SecondingRejectionInfo { impl From<&Advertisement> for SecondingRejectionInfo { fn from(advertisement: &Advertisement) -> Self { SecondingRejectionInfo { - relay_parent: advertisement.scheduling_parent, + scheduling_parent: advertisement.scheduling_parent, peer_id: advertisement.peer_id, para_id: advertisement.para_id, maybe_output_head_hash: None, diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs index 5a6d158571a9b..e74486c7b7cdc 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs @@ -325,11 +325,11 @@ async fn process_msg( ); } }, - Seconded(_parent, stmt) => { - state.handle_seconded_collation(sender, stmt).await; + Seconded(parent, stmt) => { + state.handle_seconded_collation(sender, stmt, parent).await; }, - Invalid(_parent, candidate_receipt) => { - state.handle_invalid_collation(candidate_receipt).await; + Invalid(parent, candidate_receipt) => { + state.handle_invalid_collation(candidate_receipt, parent).await; }, ConnectToBackingGroups => { gum::warn!( @@ -425,18 +425,29 @@ async fn process_incoming_peer_message( ); }, CollationProtocols::V1(V1::AdvertiseCollation(relay_parent)) => { - state.handle_advertisement(sender, origin, relay_parent, None).await; + state.handle_advertisement(sender, origin, relay_parent, None, None).await; }, CollationProtocols::V2(V2::AdvertiseCollation { scheduling_parent, candidate_hash, parent_head_data_hash, .. - }) | + }) => { + state + .handle_advertisement( + sender, + origin, + scheduling_parent, + Some(ProspectiveCandidate { candidate_hash, parent_head_data_hash }), + None, + ) + .await; + }, CollationProtocols::V3(V3::AdvertiseCollation { scheduling_parent, candidate_hash, parent_head_data_hash, + candidate_descriptor_version, .. }) => { state @@ -445,6 +456,7 @@ async fn process_incoming_peer_message( origin, scheduling_parent, Some(ProspectiveCandidate { candidate_hash, parent_head_data_hash }), + Some(candidate_descriptor_version), ) .await; }, diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/connected.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/connected.rs index 64262d39d78bb..9039dd73775b7 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/connected.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/connected.rs @@ -18,7 +18,7 @@ use crate::validator_side_experimental::{ common::{PeerInfo, PeerState, Score}, peer_manager::{DeclarationOutcome, ReputationUpdate, ReputationUpdateKind, TryAcceptOutcome}, }; -use polkadot_node_network_protocol::PeerId; +use polkadot_node_network_protocol::{peer_set::CollationVersion, PeerId}; use polkadot_primitives::Id as ParaId; use std::{ cmp::Ordering, @@ -207,6 +207,12 @@ impl ConnectedPeers { fn contains(&self, peer_id: &PeerId) -> bool { self.peer_info.contains_key(peer_id) } + + /// Get the negotiated collation version. + /// Returns None if the peer is not connected. 
+	pub fn get_version(&self, peer_id: &PeerId) -> Option<CollationVersion> {
+		self.peer_info.get(peer_id).map(|peer_info| peer_info.version)
+	}
 }
 
 /// Per-para connected peers store. Acts as a handy in-memory cache of connected peer scores for a
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs
index cd4e9af8d1589..cf661e0b06982 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs
@@ -38,7 +38,10 @@ use connected::ConnectedPeers;
 pub use db::Db;
 pub use persistence::PersistenceError;
 pub use persistent_db::PersistentDb;
-use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId};
+use polkadot_node_network_protocol::{
+	peer_set::{CollationVersion, PeerSet},
+	PeerId,
+};
 use polkadot_node_subsystem::{
 	messages::{ChainApiMessage, NetworkBridgeTxMessage},
 	CollatorProtocolSenderTrait, RuntimeApiError,
@@ -413,6 +416,10 @@ impl PeerManager {
 			.send_message(NetworkBridgeTxMessage::DisconnectPeers(peers, PeerSet::Collation))
 			.await;
 	}
+
+	pub fn get_peer_protocol_version(&self, peer_id: &PeerId) -> Option<CollationVersion> {
+		self.connected.get_version(peer_id)
+	}
 }
 
 impl PeerManager {
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs
index 8b8717225510e..d6909289fd09b 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs
@@ -38,7 +38,8 @@ use polkadot_node_subsystem::{
 };
 use polkadot_node_subsystem_util::{request_session_index_for_child, runtime::recv_runtime};
 use polkadot_primitives::{
-	BlockNumber, CandidateReceiptV2 as CandidateReceipt, Hash, Id as ParaId,
+	BlockNumber, CandidateDescriptorVersion, CandidateReceiptV2 as CandidateReceipt, Hash,
+	Id as ParaId,
 };
 use std::time::Duration;
 
@@ -222,12 +223,13 @@ impl State {
 		&mut self,
 		sender: &mut Sender,
 		peer_id: PeerId,
-		relay_parent: Hash,
+		scheduling_parent: Hash,
 		maybe_prospective_candidate: Option<ProspectiveCandidate>,
+		advertised_descriptor_version: Option<CandidateDescriptorVersion>,
 	) {
 		gum::debug!(
 			target: LOG_TARGET,
-			?relay_parent,
+			?scheduling_parent,
 			?maybe_prospective_candidate,
 			?peer_id,
 			"Received advertisement",
 		);
 
 		let Some(PeerInfo { state, ..
}) = self.peer_manager.peer_info(&peer_id) else { gum::warn!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?peer_id, ?maybe_prospective_candidate, "Received an advertisement from an unconnected peer" @@ -248,7 +250,7 @@ impl State { let PeerState::Collating(para_id) = state else { gum::debug!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?maybe_prospective_candidate, ?peer_id, "Received advertisement for undeclared peer", @@ -267,8 +269,9 @@ impl State { Advertisement { peer_id, para_id: *para_id, - scheduling_parent: relay_parent, + scheduling_parent, prospective_candidate: maybe_prospective_candidate, + advertised_descriptor_version, }, ) .await @@ -276,7 +279,7 @@ impl State { Err(err) => { gum::debug!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?maybe_prospective_candidate, ?peer_id, ?para_id, @@ -287,7 +290,7 @@ impl State { Ok(()) => { gum::debug!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?maybe_prospective_candidate, ?peer_id, ?para_id, @@ -327,18 +330,17 @@ impl State { ); } - let can_second = self.collation_manager.note_fetched(sender, res).await; + let collation_version = self.peer_manager.get_peer_protocol_version(&advertisement.peer_id); + let can_second = self.collation_manager.note_fetched(sender, res, collation_version).await; // To be consistent with the old implementation, if the fetch is successful we count the // request as successful, despite we might not be able to second it. let collation_request_metrics_result = if fetch_result { Ok(()) } else { Err(()) }; match can_second { CanSecond::Yes(candidate_receipt, pov, pvd) => { - // TODO: use the actual scheduling_parent once V3 is supported in the - // experimental module (relay_parent == scheduling_parent before V3). sender .send_message(CandidateBackingMessage::Second { - scheduling_parent: candidate_receipt.descriptor.relay_parent(), + scheduling_parent: candidate_receipt.descriptor().scheduling_parent(), candidate: candidate_receipt, pvd, pov, @@ -366,7 +368,7 @@ impl State { } self.collation_manager.release_slot( - &reject_info.relay_parent, + &reject_info.scheduling_parent, reject_info.para_id, reject_info.maybe_candidate_hash.as_ref(), reject_info.maybe_output_head_hash, @@ -385,20 +387,23 @@ impl State { self.metrics.on_request(collation_request_metrics_result); } - pub async fn handle_invalid_collation(&mut self, receipt: CandidateReceipt) { - let relay_parent = receipt.descriptor.relay_parent(); + pub async fn handle_invalid_collation( + &mut self, + receipt: CandidateReceipt, + scheduling_parent: Hash, + ) { let candidate_hash = receipt.hash(); gum::debug!( target: LOG_TARGET, para_id = ?receipt.descriptor.para_id(), - ?relay_parent, + ?scheduling_parent, ?candidate_hash, "Invalid collation", ); self.collation_manager.release_slot( - &relay_parent, + &scheduling_parent, receipt.descriptor.para_id(), Some(&candidate_hash), Some(receipt.descriptor.para_head()), @@ -406,12 +411,12 @@ impl State { let Some(peer_id) = self .collation_manager - .get_fetched_collation_peer_id(&relay_parent, &candidate_hash) + .get_fetched_collation_peer_id(&scheduling_parent, &candidate_hash) else { gum::warn!( target: LOG_TARGET, para_id = ?receipt.descriptor.para_id(), - ?relay_parent, + ?scheduling_parent, ?candidate_hash, "Could not find the peer id of the invalid collation", ); @@ -420,7 +425,7 @@ impl State { gum::debug!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?candidate_hash, ?peer_id, "Invalid collation reported, slashing peer reputation", @@ 
-435,6 +440,7 @@ impl State { &mut self, sender: &mut Sender, statement: SignedFullStatement, + scheduling_parent: Hash, ) { let receipt = match statement.payload() { Statement::Seconded(receipt) => receipt, @@ -449,13 +455,12 @@ impl State { }; let candidate_hash = receipt.hash(); - let relay_parent = receipt.descriptor.relay_parent(); let para_id = receipt.descriptor.para_id(); gum::debug!( target: LOG_TARGET, ?para_id, - ?relay_parent, + ?scheduling_parent, ?candidate_hash, "Collation seconded", ); @@ -464,7 +469,7 @@ impl State { .collation_manager .note_seconded( sender, - &relay_parent, + &scheduling_parent, ¶_id, &candidate_hash, receipt.descriptor.para_head(), @@ -477,18 +482,19 @@ impl State { gum::debug!( target: LOG_TARGET, ?para_id, - ?relay_parent, + ?scheduling_parent, ?candidate_hash, ?peer_id, "Notifying collator about seconded collation", ); - notify_collation_seconded(sender, peer_id, *version, relay_parent, statement).await; + notify_collation_seconded(sender, peer_id, *version, scheduling_parent, statement) + .await; } if !unblocked_collations.is_empty() { gum::debug!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?candidate_hash, ?para_id, "Seconded candidate unblocked {} collations", @@ -546,15 +552,13 @@ impl State { for can_second_unblocked in unblocked_collations { match can_second_unblocked { CanSecond::Yes(candidate_receipt, pov, pvd) => { - let relay_parent = candidate_receipt.descriptor.relay_parent(); let candidate_hash = candidate_receipt.hash(); let para_id = candidate_receipt.descriptor.para_id(); + let scheduling_parent = candidate_receipt.descriptor().scheduling_parent(); - // TODO: use the actual scheduling_parent once V3 is supported in the - // experimental module (relay_parent == scheduling_parent before V3). 
sender .send_message(CandidateBackingMessage::Second { - scheduling_parent: relay_parent, + scheduling_parent, candidate: candidate_receipt, pvd, pov, @@ -563,7 +567,7 @@ impl State { gum::debug!( target: LOG_TARGET, - ?relay_parent, + ?scheduling_parent, ?candidate_hash, ?para_id, "Started seconding unblocked collation" @@ -572,7 +576,7 @@ impl State { CanSecond::No(maybe_slash, reject_info) => { gum::debug!( target: LOG_TARGET, - relay_parent = ?reject_info.relay_parent, + scheduling_parent = ?reject_info.scheduling_parent, maybe_candidate_hash = ?reject_info.maybe_candidate_hash, para_id = ?reject_info.para_id, "Cannot second unblocked collation" @@ -585,7 +589,7 @@ impl State { } self.collation_manager.release_slot( - &reject_info.relay_parent, + &reject_info.scheduling_parent, reject_info.para_id, reject_info.maybe_candidate_hash.as_ref(), reject_info.maybe_output_head_hash, @@ -594,7 +598,7 @@ impl State { CanSecond::BlockedOnParent(parent, reject_info) => { gum::warn!( target: LOG_TARGET, - relay_parent = ?reject_info.relay_parent, + scheduling_parent = ?reject_info.scheduling_parent, maybe_candidate_hash = ?reject_info.maybe_candidate_hash, ?parent, para_id = ?reject_info.para_id, @@ -602,7 +606,7 @@ impl State { ); self.collation_manager.release_slot( - &reject_info.relay_parent, + &reject_info.scheduling_parent, reject_info.para_id, reject_info.maybe_candidate_hash.as_ref(), reject_info.maybe_output_head_hash, diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/tests.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/tests.rs index 8ec7897dbeb14..f0db3ef3b50c7 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/tests.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/tests.rs @@ -46,9 +46,11 @@ use polkadot_node_subsystem::messages::{ use polkadot_node_subsystem_test_helpers::{mock::new_leaf, sender_receiver, TestSubsystemSender}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::{ - ApprovedPeerId, BlockNumber, CandidateReceiptV2 as CandidateReceipt, + node_features::{self, FeatureIndex}, + ApprovedPeerId, BlockNumber, CandidateDescriptorVersion, + CandidateReceiptV2 as CandidateReceipt, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, GroupRotationInfo, Hash, - HeadData, Header, Id as ParaId, MutateDescriptorV2, OccupiedCoreAssumption, + HeadData, Header, Id as ParaId, MutateDescriptorV2, NodeFeatures, OccupiedCoreAssumption, PersistedValidationData, SessionIndex, SigningContext, UMPSignal, ValidatorId, ValidatorIndex, UMP_SEPARATOR, }; @@ -58,6 +60,7 @@ use polkadot_primitives_test_helpers::{ }; use sc_network::{OutboundFailure, RequestFailure}; use sc_network_types::multihash::Multihash; +use sp_consensus_babe::digests::{PreDigest, SecondaryPlainPreDigest}; use sp_keyring::Sr25519Keyring; use sp_keystore::Keystore; use std::{ @@ -110,7 +113,47 @@ fn dummy_candidate( ( ccr, - Advertisement { peer_id, para_id, scheduling_parent: relay_parent, prospective_candidate }, + Advertisement { + peer_id, + para_id, + scheduling_parent: relay_parent, + prospective_candidate, + advertised_descriptor_version: None, + }, + ) +} + +fn dummy_candidate_v3( + relay_parent: Hash, + scheduling_parent: Hash, + para_id: ParaId, + peer_id: PeerId, + core: CoreIndex, + session: SessionIndex, + pvd_hash: Hash, +) -> (CommittedCandidateReceipt, Advertisement) { + let mut ccr = dummy_committed_candidate_receipt_v2(relay_parent); + 
ccr.descriptor.set_para_id(para_id); + ccr.descriptor.set_persisted_validation_data_hash(pvd_hash); + ccr.descriptor.set_core_index(core); + ccr.descriptor.set_session_index(session); + ccr.descriptor.set_version(1); + ccr.descriptor.set_scheduling_parent(scheduling_parent); + + let receipt = ccr.to_plain(); + let prospective_candidate = Some(ProspectiveCandidate { + candidate_hash: receipt.hash(), + parent_head_data_hash: dummy_pvd().parent_head.hash(), + }); + ( + ccr, + Advertisement { + peer_id, + para_id, + scheduling_parent, + prospective_candidate, + advertised_descriptor_version: Some(CandidateDescriptorVersion::V3), + }, ) } @@ -143,6 +186,8 @@ struct TestState { candidates_pending_availability: HashMap>, candidate_nonce: u64, keystore: KeystorePtr, + node_features: NodeFeatures, + slot_overrides: HashMap, } impl Default for TestState { @@ -238,6 +283,8 @@ impl Default for TestState { let (sender, recv) = sender_receiver(); + let mut node_features = NodeFeatures::EMPTY; + node_features.resize(FeatureIndex::FirstUnassigned as usize, false); Self { session_info, rp_info, @@ -248,6 +295,8 @@ impl Default for TestState { candidates_pending_availability: HashMap::new(), candidate_nonce: 0, keystore, + node_features, + slot_overrides: HashMap::default(), } } } @@ -340,6 +389,21 @@ impl TestState { match msg { AllMessages::ChainApi(ChainApiMessage::BlockHeader(rp, tx)) => { + let slot = self.slot_overrides.get(&rp).copied().unwrap_or_else(|| { + sp_consensus_slots::Slot::from_timestamp( + sp_timestamp::Timestamp::current(), + sp_consensus_slots::SlotDuration::from_millis( + polkadot_primitives::RELAY_CHAIN_SLOT_DURATION_MILLIS, + ), + ) + }); + let pre_digest = + sp_consensus_babe::digests::CompatibleDigestItem::babe_pre_digest( + PreDigest::SecondaryPlain(SecondaryPlainPreDigest { + authority_index: 0, + slot, + }), + ); tx.send(Ok(Some( self.rp_info .get(&rp) @@ -348,7 +412,7 @@ impl TestState { number: info.number, state_root: Hash::zero(), extrinsics_root: Hash::zero(), - digest: Default::default(), + digest: sp_runtime::Digest { logs: vec![pre_digest] }, }) .unwrap(), ))) @@ -576,7 +640,8 @@ impl TestState { &mut sender, adv.peer_id, adv.scheduling_parent, - adv.prospective_candidate + adv.prospective_candidate, + adv.advertised_descriptor_version ), async move { if adv.prospective_candidate.is_some() { @@ -689,6 +754,7 @@ impl TestState { &mut self, adv: Advertisement, pvd: Option, + expected_relay_parent: Hash, ) { let msg = match self.buffered_msg.take() { Some(msg) => msg, @@ -707,7 +773,7 @@ impl TestState { }, tx) ) => { assert_eq!(para_id, adv.para_id); - assert_eq!(candidate_relay_parent, adv.scheduling_parent); + assert_eq!(candidate_relay_parent, expected_relay_parent); assert!( matches!( @@ -747,6 +813,7 @@ impl TestState { expected_receipt: CandidateReceipt, expected_pvd: PersistedValidationData, expected_pov: PoV, + expected_scheduling_parent: Hash, ) { let msg = match self.buffered_msg.take() { Some(msg) => msg, @@ -758,8 +825,7 @@ impl TestState { AllMessages::CandidateBacking( CandidateBackingMessage::Second { scheduling_parent, candidate, pvd, pov } ) => { - // TODO: This should use scheduling_parent(): https://github.com/paritytech/polkadot-sdk/issues/11084 - assert_eq!(scheduling_parent, candidate.descriptor.relay_parent()); + assert_eq!(scheduling_parent, expected_scheduling_parent); assert_eq!(candidate, expected_receipt); assert_eq!(pvd, expected_pvd); assert_eq!(pov, expected_pov); @@ -773,6 +839,7 @@ impl TestState { adv: Advertisement, receipt: 
CandidateReceipt, maybe_pvd: Option, + expected_relay_parent: Hash, ) { let mut sender = self.sender.clone(); let pvd = maybe_pvd.unwrap_or_else(|| dummy_pvd()); @@ -782,10 +849,11 @@ impl TestState { &mut sender, (adv, Ok(CollationFetchingResponse::Collation(receipt.clone(), dummy_pov()))) ), - self.assert_pvd_request(adv, Some(pvd.clone())) + self.assert_pvd_request(adv, Some(pvd.clone()), expected_relay_parent) ); - self.assert_seconding_kickoff(receipt, pvd, dummy_pov()).await; + self.assert_seconding_kickoff(receipt, pvd, dummy_pov(), adv.scheduling_parent) + .await; } async fn assert_collation_seconded_notification( @@ -834,8 +902,14 @@ impl TestState { ); }, CollationVersion::V3 => { - // TODO: https://github.com/paritytech/polkadot-sdk/issues/11084 - panic!("CollationVersion::V3 is not yet supported in tests"); + assert_matches!( + msg, + CollationProtocols::V3(protocol_v3::CollationProtocol::CollatorProtocol( + protocol_v3::CollatorProtocolMessage::CollationSeconded(_rp, stmt) + )) => { + assert_eq!(statement, stmt); + } + ) } }; } @@ -848,9 +922,12 @@ impl TestState { peer_id: PeerId, version: CollationVersion, ccr: CommittedCandidateReceipt, + scheduling_parent: Hash, ) { let statement = make_seconded_statement(&self.keystore, ccr); - state.handle_seconded_collation(&mut self.sender, statement.clone()).await; + state + .handle_seconded_collation(&mut self.sender, statement.clone(), scheduling_parent) + .await; self.assert_collation_seconded_notification(peer_id, version, statement.into()) .await; } @@ -1362,7 +1439,13 @@ async fn test_peer_disconnects_after_fetch() { // Send a successful response to the third advertisement and start seconding it. test_state - .handle_fetched_collation(&mut state, third_adv, third_ccr.to_plain(), None) + .handle_fetched_collation( + &mut state, + third_adv, + third_ccr.to_plain(), + None, + third_adv.scheduling_parent, + ) .await; // Second peer disconnects, which will not free up the claim queue slot since the collation was @@ -1377,9 +1460,10 @@ async fn test_peer_disconnects_after_fetch() { // The collation was seconded, the claim will still not be freed but we won't be able to send // back a notification to the collator. + let parent = third_ccr.descriptor.scheduling_parent(); let statement = make_seconded_statement(&test_state.keystore, third_ccr); - state.handle_seconded_collation(&mut sender, statement).await; + state.handle_seconded_collation(&mut sender, statement, parent).await; test_state.assert_no_messages().await; assert_eq!(state.advertisements(), [first_adv, second_adv].into()); @@ -1803,7 +1887,7 @@ async fn test_advertisement_rejections() { let prospective_candidate = adv.prospective_candidate; // Send advertisement from a peer that is not connected. Will be dropped. 
- state.handle_advertisement(&mut sender, peer_id, active_leaf, None).await; + state.handle_advertisement(&mut sender, peer_id, active_leaf, None, None).await; assert!(state.advertisements().is_empty()); state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_no_messages().await; @@ -1812,7 +1896,7 @@ async fn test_advertisement_rejections() { state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; state - .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate) + .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate, None) .await; assert!(state.advertisements().is_empty()); state.try_launch_new_fetch_requests(&mut sender).await; @@ -1826,7 +1910,7 @@ async fn test_advertisement_rejections() { // Relay parent outside view. state - .handle_advertisement(&mut sender, peer_id, get_hash(11), prospective_candidate) + .handle_advertisement(&mut sender, peer_id, get_hash(11), prospective_candidate, None) .await; state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_no_messages().await; @@ -1836,7 +1920,13 @@ async fn test_advertisement_rejections() { // of this peer (which is 2). for _ in 0..2 { futures::join!( - state.handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate), + state.handle_advertisement( + &mut sender, + peer_id, + active_leaf, + prospective_candidate, + None + ), test_state.assert_can_second_request(adv, false) ); state.try_launch_new_fetch_requests(&mut sender).await; @@ -1846,7 +1936,7 @@ async fn test_advertisement_rejections() { // Here comes a valid advertisement, will be rejected because we reached the limit. state - .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate) + .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate, None) .await; // Let's add a new peerid then. @@ -1860,13 +1950,14 @@ async fn test_advertisement_rejections() { para_id: 100.into(), scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }; test_state.handle_advertisement(&mut state, adv).await; assert_eq!(state.advertisements(), [adv].into()); // Duplicate advertisement. Only one fetch request will be launched. state - .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate) + .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate, None) .await; assert_eq!(state.advertisements(), [adv].into()); state.try_launch_new_fetch_requests(&mut sender).await; @@ -1876,30 +1967,33 @@ async fn test_advertisement_rejections() { // We still detect the duplicate advertisement with a fetching collation. state - .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate) + .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate, None) .await; assert_eq!(state.advertisements(), [adv].into()); state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_no_messages().await; // We still detect the duplicate advertisement with the fetched collation. 
- test_state.handle_fetched_collation(&mut state, adv, ccr.to_plain(), None).await; + test_state + .handle_fetched_collation(&mut state, adv, ccr.to_plain(), None, adv.scheduling_parent) + .await; test_state.assert_no_messages().await; assert!(state.advertisements().is_empty()); state - .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate) + .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate, None) .await; state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_no_messages().await; // We still detect the duplicate advertisement with a seconded collation. + let parent = ccr.descriptor.relay_parent(); test_state - .second_collation(&mut state, peer_id, CollationVersion::V2, ccr) + .second_collation(&mut state, peer_id, CollationVersion::V2, ccr, parent) .await; test_state.assert_no_messages().await; assert!(state.advertisements().is_empty()); state - .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate) + .handle_advertisement(&mut sender, peer_id, active_leaf, prospective_candidate, None) .await; state.try_launch_new_fetch_requests(&mut sender).await; assert!(state.advertisements().is_empty()); @@ -1909,7 +2003,7 @@ async fn test_advertisement_rejections() { let peer_id = PeerId::random(); state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; state.handle_declare(&mut sender, peer_id, 100.into()).await; - state.handle_advertisement(&mut sender, peer_id, get_hash(9), None).await; + state.handle_advertisement(&mut sender, peer_id, get_hash(9), None, None).await; assert!(state.advertisements().is_empty()); state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_no_messages().await; @@ -1976,6 +2070,7 @@ async fn test_collation_fetch_failure() { para_id: 100.into(), scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }; state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; @@ -2006,6 +2101,7 @@ async fn test_collation_fetch_failure() { } else { None }, + advertised_descriptor_version: None, }; state.handle_peer_connected(&mut sender, peer_id, version).await; @@ -2035,6 +2131,7 @@ async fn test_collation_fetch_failure() { para_id: 100.into(), scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }; state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; @@ -2071,6 +2168,7 @@ async fn test_collation_fetch_failure() { para_id: 100.into(), scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }; state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; @@ -2105,6 +2203,7 @@ async fn test_collation_fetch_failure() { para_id: 100.into(), scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }; state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; @@ -2133,6 +2232,7 @@ async fn test_collation_fetch_failure() { para_id: 100.into(), scheduling_parent: active_leaf, prospective_candidate: None, + advertised_descriptor_version: None, }; state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V1).await; @@ -2146,7 +2246,7 @@ async fn test_collation_fetch_failure() { let res = Ok(CollationFetchingResponse::Collation(receipt.clone(), dummy_pov())); futures::join!( state.handle_fetched_collation(&mut sender, (adv, res)), - test_state.assert_pvd_request(adv, None) + test_state.assert_pvd_request(adv, None, 
adv.scheduling_parent) ); state.try_launch_new_fetch_requests(&mut sender).await; // No slash, as it's not the collator's fault. @@ -2172,6 +2272,7 @@ async fn test_collation_fetch_failure() { para_id: 100.into(), scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }; state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; @@ -2189,7 +2290,7 @@ async fn test_collation_fetch_failure() { let res = Ok(CollationFetchingResponse::Collation(receipt, dummy_pov())); futures::join!( state.handle_fetched_collation(&mut sender, (adv, res)), - test_state.assert_pvd_request(adv, Some(pvd)) + test_state.assert_pvd_request(adv, Some(pvd), adv.scheduling_parent) ); state.try_launch_new_fetch_requests(&mut sender).await; assert_eq!(db.witnessed_slash(), Some((peer_id, adv.para_id, FAILED_FETCH_SLASH))); @@ -2215,6 +2316,7 @@ async fn test_collation_fetch_failure() { para_id: 100.into(), scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }; state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; @@ -2228,7 +2330,7 @@ async fn test_collation_fetch_failure() { let res = Ok(CollationFetchingResponse::Collation(receipt, dummy_pov())); futures::join!( state.handle_fetched_collation(&mut sender, (adv, res)), - test_state.assert_pvd_request(adv, Some(dummy_pvd())) + test_state.assert_pvd_request(adv, Some(dummy_pvd()), adv.scheduling_parent) ); state.try_launch_new_fetch_requests(&mut sender).await; assert_eq!(db.witnessed_slash(), Some((peer_id, adv.para_id, FAILED_FETCH_SLASH))); @@ -2253,6 +2355,7 @@ async fn test_collation_fetch_failure() { para_id: 100.into(), scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }; state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; @@ -2274,7 +2377,7 @@ async fn test_collation_fetch_failure() { futures::join!( state.handle_fetched_collation(&mut sender, (adv, res)), - test_state.assert_pvd_request(adv, Some(pvd)) + test_state.assert_pvd_request(adv, Some(pvd), adv.scheduling_parent) ); state.try_launch_new_fetch_requests(&mut sender).await; assert_eq!(db.witnessed_slash(), Some((peer_id, adv.para_id, FAILED_FETCH_SLASH))); @@ -2394,6 +2497,7 @@ async fn v1_descriptor_compatibility() { para_id: 100.into(), scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }; state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; @@ -2404,11 +2508,14 @@ async fn v1_descriptor_compatibility() { state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_collation_request(adv).await; - test_state.handle_fetched_collation(&mut state, adv, receipt.into(), None).await; + test_state + .handle_fetched_collation(&mut state, adv, receipt.into(), None, adv.scheduling_parent) + .await; state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_no_messages().await; + let parent = ccr.descriptor.relay_parent; test_state - .second_collation(&mut state, peer_id, CollationVersion::V2, ccr.into()) + .second_collation(&mut state, peer_id, CollationVersion::V2, ccr.into(), parent) .await; test_state.assert_no_messages().await; } @@ -2476,12 +2583,19 @@ async fn test_invalid_collation() { // The bad collation was fetched and it's invalid. 
test_state - .handle_fetched_collation(&mut state, bad_adv, bad_receipt.clone(), None) + .handle_fetched_collation( + &mut state, + bad_adv, + bad_receipt.clone(), + None, + bad_adv.scheduling_parent, + ) .await; state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_no_messages().await; - state.handle_invalid_collation(bad_receipt).await; + let parent = bad_receipt.descriptor.scheduling_parent(); + state.handle_invalid_collation(bad_receipt, parent).await; // Bad peer was slashed. assert_eq!(db.witnessed_slash().unwrap(), (bad_peer, 100.into(), INVALID_COLLATION_SLASH)); @@ -2546,6 +2660,7 @@ async fn test_blocked_from_seconding_by_parent(#[case] valid_parent: bool) { para_id, scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }, ) }; @@ -2585,6 +2700,7 @@ async fn test_blocked_from_seconding_by_parent(#[case] valid_parent: bool) { para_id, scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }, ) }; @@ -2606,7 +2722,13 @@ async fn test_blocked_from_seconding_by_parent(#[case] valid_parent: bool) { // First collation is fetched and seconding kicks off. test_state - .handle_fetched_collation(&mut state, first_adv, first_ccr.to_plain(), Some(first_pvd)) + .handle_fetched_collation( + &mut state, + first_adv, + first_ccr.to_plain(), + Some(first_pvd), + first_adv.scheduling_parent, + ) .await; // Then, second collation is fetched and seconding kicks off (but the parent header is unknown @@ -2615,7 +2737,7 @@ async fn test_blocked_from_seconding_by_parent(#[case] valid_parent: bool) { futures::join!( state.handle_fetched_collation(&mut sender, (second_adv, res)), // We don't know it's pvd so it gets blocked from seconding - test_state.assert_pvd_request(second_adv, None) + test_state.assert_pvd_request(second_adv, None, second_adv.scheduling_parent) ); test_state.assert_no_messages().await; @@ -2649,13 +2771,18 @@ async fn test_blocked_from_seconding_by_parent(#[case] valid_parent: bool) { test_state.assert_no_messages().await; if valid_parent { + let parent = first_ccr.descriptor.relay_parent(); let statement = make_seconded_statement(&test_state.keystore, first_ccr); futures::join!( async { - state.handle_seconded_collation(&mut sender, statement.clone()).await; + state.handle_seconded_collation(&mut sender, statement.clone(), parent).await; }, - test_state.assert_pvd_request(second_adv, Some(second_pvd.clone())) + test_state.assert_pvd_request( + second_adv, + Some(second_pvd.clone()), + second_adv.scheduling_parent + ) ); test_state @@ -2667,13 +2794,15 @@ async fn test_blocked_from_seconding_by_parent(#[case] valid_parent: bool) { .await; // Second collation was unblocked and it began being seconded. 
+ let parent = second_ccr.descriptor.relay_parent(); test_state - .assert_seconding_kickoff(second_ccr.to_plain(), second_pvd, dummy_pov()) + .assert_seconding_kickoff(second_ccr.to_plain(), second_pvd, dummy_pov(), parent) .await; test_state.assert_no_messages().await; + let parent = second_ccr.descriptor.relay_parent(); test_state - .second_collation(&mut state, second_peer, CollationVersion::V2, second_ccr) + .second_collation(&mut state, second_peer, CollationVersion::V2, second_ccr, parent) .await; test_state.assert_no_messages().await; @@ -2682,7 +2811,8 @@ async fn test_blocked_from_seconding_by_parent(#[case] valid_parent: bool) { state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_no_messages().await; } else { - state.handle_invalid_collation(first_ccr.to_plain()).await; + let parent = first_ccr.descriptor.relay_parent(); + state.handle_invalid_collation(first_ccr.to_plain(), parent).await; assert_eq!( db.witnessed_slash().unwrap(), (first_peer, 100.into(), INVALID_COLLATION_SLASH) @@ -2784,6 +2914,7 @@ async fn test_outdated_blocked_collations_are_pruned() { para_id, scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }, ) }; @@ -2823,6 +2954,7 @@ async fn test_outdated_blocked_collations_are_pruned() { para_id, scheduling_parent: active_leaf, prospective_candidate, + advertised_descriptor_version: None, }, ) }; @@ -2847,7 +2979,13 @@ async fn test_outdated_blocked_collations_are_pruned() { // First collation is fetched and seconding kicks off. test_state - .handle_fetched_collation(&mut state, first_adv, first_ccr.to_plain(), Some(first_pvd)) + .handle_fetched_collation( + &mut state, + first_adv, + first_ccr.to_plain(), + Some(first_pvd), + first_adv.scheduling_parent, + ) .await; // Then, second collation is fetched and seconding kicks off (but the parent header is unknown @@ -2856,7 +2994,7 @@ async fn test_outdated_blocked_collations_are_pruned() { futures::join!( state.handle_fetched_collation(&mut sender, (second_adv, res)), // We don't know it's pvd so it gets blocked from seconding - test_state.assert_pvd_request(second_adv, None) + test_state.assert_pvd_request(second_adv, None, second_adv.scheduling_parent) ); test_state.assert_no_messages().await; @@ -2901,9 +3039,11 @@ async fn test_outdated_blocked_collations_are_pruned() { // Even if we do end up getting a valid statement for the collation that would unblock the // second collation, it's already been dropped. 
+ + let parent = first_ccr.descriptor.relay_parent(); let statement = make_seconded_statement(&test_state.keystore, first_ccr); - state.handle_seconded_collation(&mut sender, statement).await; + state.handle_seconded_collation(&mut sender, statement, parent).await; test_state.assert_no_messages().await; } @@ -3123,12 +3263,19 @@ async fn test_single_collation_per_rp_for_v1_advertisement() { test_state.assert_no_messages().await; test_state - .handle_fetched_collation(&mut state, first_adv, first_ccr.to_plain(), None) + .handle_fetched_collation( + &mut state, + first_adv, + first_ccr.to_plain(), + None, + first_adv.scheduling_parent, + ) .await; state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_no_messages().await; + let parent = first_ccr.descriptor.relay_parent(); test_state - .second_collation(&mut state, first_peer, CollationVersion::V1, first_ccr) + .second_collation(&mut state, first_peer, CollationVersion::V1, first_ccr, parent) .await; state.try_launch_new_fetch_requests(&mut sender).await; test_state.assert_no_messages().await; @@ -3195,6 +3342,396 @@ async fn test_view_update_preserves_relay_parent_state() { assert_eq!(state.advertisements(), [adv_a].into()); } +#[tokio::test] +// Test that a V3 candidate descriptor is correctly accepted and +// seconded when the CandidateReceiptV3 node feature is enabled. +async fn v3_descriptor_accepted_when_v3_enabled() { + let mut test_state = TestState::default(); + test_state + .node_features + .resize(node_features::FeatureIndex::CandidateReceiptV3 as usize + 1, false); + test_state + .node_features + .set(node_features::FeatureIndex::CandidateReceiptV3 as u8 as usize, true); + + let active_leaf = get_hash(9); + let leaf_info = test_state.rp_info.get(&active_leaf).unwrap().clone(); + let mut state = make_state(MockDb::default(), &mut test_state, active_leaf).await; + let mut sender = test_state.sender.clone(); + let peer_id = PeerId::random(); + + test_state.rp_info.insert( + get_hash(10), + RelayParentInfo { + number: 10, + parent: get_parent_hash(10), + session_index: leaf_info.session_index, + claim_queue: leaf_info.claim_queue.clone(), + assigned_core: leaf_info.assigned_core, + }, + ); + + test_state.activate_leaf(&mut state, 10).await; + + state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V3).await; + state.handle_declare(&mut sender, peer_id, 100.into()).await; + + // V3: relay_parent (execution context) is older than scheduling_parent. + let relay_parent = get_hash(8); + let scheduling_parent = get_hash(9); + let (ccr, adv) = dummy_candidate_v3( + relay_parent, + scheduling_parent, + 100.into(), + peer_id, + leaf_info.assigned_core, + leaf_info.session_index, + dummy_pvd().hash(), + ); + + // Advertise the v3 candidate + test_state.handle_advertisement(&mut state, adv).await; + state.try_launch_new_fetch_requests(&mut sender).await; + test_state.assert_collation_request(adv).await; + test_state + .handle_fetched_collation(&mut state, adv, ccr.to_plain(), None, relay_parent) + .await; + test_state + .second_collation(&mut state, peer_id, CollationVersion::V3, ccr, active_leaf) + .await; +} + +#[tokio::test] +// V3 advertisement is accepted when the scheduling parent is a leaf +// whose slot has already finished (current_slot == leaf_slot + 1). 
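+// The leaf's BABE slot is pinned to `current_slot - 1` via `slot_overrides`, which the
+// mocked `ChainApiMessage::BlockHeader` reply returns as a BABE pre-digest.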
+async fn v3_advertisement_accepted_when_sp_is_finished_slot_leaf() { + let mut test_state = TestState::default(); + test_state + .node_features + .resize(node_features::FeatureIndex::CandidateReceiptV3 as usize + 1, false); + test_state + .node_features + .set(node_features::FeatureIndex::CandidateReceiptV3 as u8 as usize, true); + + let slot_duration = sp_consensus_slots::SlotDuration::from_millis( + polkadot_primitives::RELAY_CHAIN_SLOT_DURATION_MILLIS, + ); + let current_slot = + sp_consensus_slots::Slot::from_timestamp(sp_timestamp::Timestamp::current(), slot_duration); + test_state + .slot_overrides + .insert(get_hash(10), sp_consensus_slots::Slot::from(*current_slot - 1)); + + let active_leaf = get_hash(10); + let leaf_info = test_state.rp_info.get(&active_leaf).unwrap().clone(); + let mut state = make_state(MockDb::default(), &mut test_state, active_leaf).await; + let mut sender = test_state.sender.clone(); + let peer_id = PeerId::random(); + + state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V3).await; + state.handle_declare(&mut sender, peer_id, 100.into()).await; + + let relay_parent = get_hash(9); + let scheduling_parent = get_hash(10); + let (ccr, adv) = dummy_candidate_v3( + relay_parent, + scheduling_parent, + 100.into(), + peer_id, + leaf_info.assigned_core, + leaf_info.session_index, + dummy_pvd().hash(), + ); + + // Advertise the v3 candidate + test_state.handle_advertisement(&mut state, adv).await; + state.try_launch_new_fetch_requests(&mut sender).await; + test_state.assert_collation_request(adv).await; + test_state + .handle_fetched_collation(&mut state, adv, ccr.to_plain(), None, relay_parent) + .await; + test_state + .second_collation(&mut state, peer_id, CollationVersion::V3, ccr, active_leaf) + .await; +} + +#[tokio::test] +// V3 advertisements require the scheduling parent to be a RC block from +// the last finished slot. 
Check that the leaf of an active slot and its +// grandparent are rejected. +async fn v3_advertisement_rejected_when_sp_not_last_finished_slot() { + let mut test_state = TestState::default(); + test_state + .node_features + .resize(node_features::FeatureIndex::CandidateReceiptV3 as usize + 1, false); + test_state + .node_features + .set(node_features::FeatureIndex::CandidateReceiptV3 as u8 as usize, true); + + let active_leaf = get_hash(10); + let leaf_info = test_state.rp_info.get(&active_leaf).unwrap().clone(); + let mut state = make_state(MockDb::default(), &mut test_state, active_leaf).await; + let mut sender = test_state.sender.clone(); + let peer_id = PeerId::random(); + + test_state.rp_info.insert( + get_hash(11), + RelayParentInfo { + number: 11, + parent: get_parent_hash(11), + session_index: leaf_info.session_index, + claim_queue: leaf_info.claim_queue.clone(), + assigned_core: leaf_info.assigned_core, + }, + ); + + test_state.activate_leaf(&mut state, 11).await; + + state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V3).await; + state.handle_declare(&mut sender, peer_id, 100.into()).await; + + // Test that the grandparent of a block from the current, unfinished slot is rejected. + let (_ccr, adv) = dummy_candidate_v3( + get_hash(9), + get_hash(9), + 100.into(), + peer_id, + leaf_info.assigned_core, + leaf_info.session_index, + dummy_pvd().hash(), + ); + + state + .handle_advertisement( + &mut sender, + peer_id, + adv.scheduling_parent, + adv.prospective_candidate, + Some(CandidateDescriptorVersion::V3), + ) + .await; + assert!(state.advertisements().is_empty()); + state.try_launch_new_fetch_requests(&mut sender).await; + test_state.assert_no_messages().await; + + // Test that the current leaf is rejected as the slot is not yet finished. + let (_ccr, adv) = dummy_candidate_v3( + get_hash(8), + get_hash(11), + 100.into(), + peer_id, + leaf_info.assigned_core, + leaf_info.session_index, + dummy_pvd().hash(), + ); + state + .handle_advertisement( + &mut sender, + peer_id, + adv.scheduling_parent, + adv.prospective_candidate, + Some(CandidateDescriptorVersion::V3), + ) + .await; + assert!(state.advertisements().is_empty()); + state.try_launch_new_fetch_requests(&mut sender).await; + test_state.assert_no_messages().await; + + // Test that the parent of the current leaf (last finished slot) is accepted. + let relay_parent = get_hash(9); + let scheduling_parent = get_hash(10); + let (ccr, adv) = dummy_candidate_v3( + relay_parent, + scheduling_parent, + 100.into(), + peer_id, + leaf_info.assigned_core, + leaf_info.session_index, + dummy_pvd().hash(), + ); + + test_state.handle_advertisement(&mut state, adv).await; + state.try_launch_new_fetch_requests(&mut sender).await; + test_state.assert_collation_request(adv).await; + test_state + .handle_fetched_collation(&mut state, adv, ccr.to_plain(), None, relay_parent) + .await; + test_state + .second_collation(&mut state, peer_id, CollationVersion::V3, ccr, active_leaf) + .await; +} + +#[tokio::test] +// A V3 descriptor received via the V2 protocol is rejected (wrong protocol).
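+// The fetched collation is treated as a failed fetch: the peer is slashed with
+// FAILED_FETCH_SLASH and no further messages are produced.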
+async fn v3_descriptor_rejected_via_v2_protocol() { + let mut test_state = TestState::default(); + + let active_leaf = get_hash(10); + let leaf_info = test_state.rp_info.get(&active_leaf).unwrap().clone(); + let db = MockDb::default(); + let mut state = make_state(db.clone(), &mut test_state, active_leaf).await; + let mut sender = test_state.sender.clone(); + let peer_id = PeerId::random(); + + state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; + state.handle_declare(&mut sender, peer_id, 100.into()).await; + + let (ccr, _) = dummy_candidate_v3( + active_leaf, + active_leaf, + 100.into(), + peer_id, + leaf_info.assigned_core, + leaf_info.session_index, + dummy_pvd().hash(), + ); + + let receipt = ccr.to_plain(); + + let adv = Advertisement { + peer_id, + para_id: 100.into(), + scheduling_parent: active_leaf, + prospective_candidate: Some(ProspectiveCandidate { + candidate_hash: receipt.hash(), + parent_head_data_hash: dummy_pvd().parent_head.hash(), + }), + advertised_descriptor_version: None, + }; + + test_state.handle_advertisement(&mut state, adv).await; + state.try_launch_new_fetch_requests(&mut sender).await; + test_state.assert_collation_request(adv).await; + + let res = Ok(CollationFetchingResponse::Collation(receipt, dummy_pov())); + state.handle_fetched_collation(&mut sender, (adv, res)).await; + assert_eq!(db.witnessed_slash(), Some((peer_id, adv.para_id, FAILED_FETCH_SLASH))); + test_state.assert_no_messages().await; +} + +#[tokio::test] +// Test that a collator advertising a V3 descriptor but fetching a V2 candidate +// is rejected due to descriptor version mismatch and the peer is slashed. +async fn v3_advertised_but_v2_fetched_descriptor_version_mismatch() { + let mut test_state = TestState::default(); + test_state + .node_features + .resize(node_features::FeatureIndex::CandidateReceiptV3 as usize + 1, false); + test_state + .node_features + .set(node_features::FeatureIndex::CandidateReceiptV3 as u8 as usize, true); + + let active_leaf = get_hash(9); + let leaf_info = test_state.rp_info.get(&active_leaf).unwrap().clone(); + let db = MockDb::default(); + let mut state = make_state(db.clone(), &mut test_state, active_leaf).await; + let mut sender = test_state.sender.clone(); + let peer_id = PeerId::random(); + + test_state.rp_info.insert( + get_hash(10), + RelayParentInfo { + number: 10, + parent: get_parent_hash(10), + session_index: leaf_info.session_index, + claim_queue: leaf_info.claim_queue.clone(), + assigned_core: leaf_info.assigned_core, + }, + ); + + test_state.activate_leaf(&mut state, 10).await; + + state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V3).await; + state.handle_declare(&mut sender, peer_id, 100.into()).await; + + // Create a V2-style candidate (version=0, no scheduling_parent set). + let (ccr, _) = dummy_candidate( + active_leaf, + 100.into(), + peer_id, + leaf_info.assigned_core, + leaf_info.session_index, + dummy_pvd().hash(), + ); + + let receipt = ccr.to_plain(); + + // Advertise as V3 but the actual fetched candidate is V2. 
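+ // The advertisement claims a V3 descriptor, so the V2 receipt fetched below is
+ // rejected as a descriptor version mismatch and the peer receives FAILED_FETCH_SLASH.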
+ let adv = Advertisement { + peer_id, + para_id: 100.into(), + scheduling_parent: get_hash(9), + prospective_candidate: Some(ProspectiveCandidate { + candidate_hash: receipt.hash(), + parent_head_data_hash: dummy_pvd().parent_head.hash(), + }), + advertised_descriptor_version: Some(CandidateDescriptorVersion::V3), + }; + + test_state.handle_advertisement(&mut state, adv).await; + state.try_launch_new_fetch_requests(&mut sender).await; + test_state.assert_collation_request(adv).await; + let res = Ok(CollationFetchingResponse::Collation(receipt, dummy_pov())); + state.handle_fetched_collation(&mut sender, (adv, res)).await; + assert_eq!(db.witnessed_slash(), Some((peer_id, adv.para_id, FAILED_FETCH_SLASH))); + test_state.assert_no_messages().await; +} + +#[tokio::test] +// Test that a crafted descriptor is rejected as Unknown version when +// CandidateReceiptV3 feature is disabled. +async fn v3_descriptor_unknown_rejected_when_v3_disabled() { + let mut test_state = TestState::default(); + + let active_leaf = get_hash(9); + let leaf_info = test_state.rp_info.get(&active_leaf).unwrap().clone(); + let db = MockDb::default(); + let mut state = make_state(db.clone(), &mut test_state, active_leaf).await; + let mut sender = test_state.sender.clone(); + let peer_id = PeerId::random(); + test_state.rp_info.insert( + get_hash(10), + RelayParentInfo { + number: 10, + parent: get_parent_hash(9), + session_index: leaf_info.session_index, + claim_queue: leaf_info.claim_queue.clone(), + assigned_core: leaf_info.assigned_core, + }, + ); + + test_state.activate_leaf(&mut state, 10).await; + + state.handle_peer_connected(&mut sender, peer_id, CollationVersion::V2).await; + state.handle_declare(&mut sender, peer_id, 100.into()).await; + + let mut ccr = dummy_committed_candidate_receipt_v2(active_leaf); + ccr.descriptor.set_para_id(100.into()); + ccr.descriptor.set_persisted_validation_data_hash(dummy_pvd().hash()); + ccr.descriptor.set_core_index(leaf_info.assigned_core); + ccr.descriptor.set_session_index(leaf_info.session_index); + ccr.descriptor.set_version(1); + + let receipt = ccr.to_plain(); + let adv = Advertisement { + peer_id, + para_id: 100.into(), + scheduling_parent: active_leaf, + prospective_candidate: Some(ProspectiveCandidate { + candidate_hash: receipt.hash(), + parent_head_data_hash: dummy_pvd().parent_head.hash(), + }), + advertised_descriptor_version: None, + }; + test_state.handle_advertisement(&mut state, adv).await; + state.try_launch_new_fetch_requests(&mut sender).await; + test_state.assert_collation_request(adv).await; + let res = Ok(CollationFetchingResponse::Collation(receipt, dummy_pov())); + state.handle_fetched_collation(&mut sender, (adv, res)).await; + assert_eq!(db.witnessed_slash(), Some((peer_id, adv.para_id, FAILED_FETCH_SLASH))); + test_state.assert_no_messages().await; +} + // Launching new collations: // - multiple candidates per relay parent (including from implicit view and which occupy future // claims, including which will make claims across different leaves) diff --git a/polkadot/primitives/src/v9/mod.rs b/polkadot/primitives/src/v9/mod.rs index 0d177214ed04d..c53f878204963 100644 --- a/polkadot/primitives/src/v9/mod.rs +++ b/polkadot/primitives/src/v9/mod.rs @@ -1836,7 +1836,7 @@ impl> Default for SchedulerParams } /// A type representing the version of the candidate descriptor. 
-#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo, Debug)]
+#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo, Debug, PartialOrd, Ord, Hash)]
 pub enum CandidateDescriptorVersion {
 	/// with deprecated collator id and collator signature.
 	V1,
diff --git a/prdoc/pr_11306.prdoc b/prdoc/pr_11306.prdoc
new file mode 100644
index 0000000000000..8e6dd9b9728cd
--- /dev/null
+++ b/prdoc/pr_11306.prdoc
@@ -0,0 +1,14 @@
+title: Add CandidateDescriptorV3 support to experimental validator
+doc:
+- audience: Node Dev
+  description: |-
+    Adds CandidateDescriptorV3 support to the experimental validator-side collator protocol.
+
+    Fixes: #11084
+crates:
+- name: polkadot-node-core-backing
+  bump: minor
+- name: polkadot-collator-protocol
+  bump: minor
+- name: polkadot-primitives
+  bump: minor
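For context on the primitives change above: deriving `PartialOrd`, `Ord` and `Hash` for `CandidateDescriptorVersion` lets types that embed it derive `Hash` and `Ord` themselves and be stored in hashed or ordered collections (the test `Advertisement` struct, for instance, now carries an `advertised_descriptor_version` field and is compared as a collection in the tests). Below is a minimal, illustrative sketch of what the new derives permit; it is not code from this PR, and the variable names are made up:

use std::collections::HashSet;
use polkadot_primitives::CandidateDescriptorVersion;

fn main() {
	// `Hash` (together with the already-derived `Eq`) lets the version key hashed collections.
	let mut seen: HashSet<CandidateDescriptorVersion> = HashSet::new();
	seen.insert(CandidateDescriptorVersion::V3);
	seen.insert(CandidateDescriptorVersion::V1);
	assert!(seen.contains(&CandidateDescriptorVersion::V3));

	// `Ord`/`PartialOrd` allow versions to be compared and sorted directly.
	let mut versions: Vec<_> = seen.into_iter().collect();
	versions.sort();
}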