Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor and harden check_core_index #6217

Merged
merged 16 commits into from
Nov 5, 2024
8 changes: 3 additions & 5 deletions cumulus/client/consensus/aura/src/collators/lookahead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,15 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterfa
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{
ClaimQueueOffset, CollectCollationInfo, PersistedValidationData, DEFAULT_CLAIM_QUEUE_OFFSET,
};
use cumulus_primitives_core::{ClaimQueueOffset, CollectCollationInfo, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;

use polkadot_node_primitives::{PoV, SubmitCollationParams};
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{
BlockNumber as RBlockNumber, CollatorPair, Hash as RHash, HeadData, Id as ParaId,
OccupiedCoreAssumption,
vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, BlockNumber as RBlockNumber, CollatorPair, Hash as RHash,
HeadData, Id as ParaId, OccupiedCoreAssumption,
};

use futures::prelude::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterfa
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{
GetCoreSelectorApi, PersistedValidationData, DEFAULT_CLAIM_QUEUE_OFFSET,
};
use cumulus_primitives_core::{GetCoreSelectorApi, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;

use polkadot_primitives::{
vstaging::{ClaimQueueOffset, CoreSelector},
vstaging::{ClaimQueueOffset, CoreSelector, DEFAULT_CLAIM_QUEUE_OFFSET},
BlockId, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId,
OccupiedCoreAssumption,
};
Expand Down
4 changes: 2 additions & 2 deletions cumulus/pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ use core::{cmp, marker::PhantomData};
use cumulus_primitives_core::{
relay_chain::{
self,
vstaging::{ClaimQueueOffset, CoreSelector},
vstaging::{ClaimQueueOffset, CoreSelector, DEFAULT_CLAIM_QUEUE_OFFSET},
},
AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo, GetChannelInfo,
InboundDownwardMessage, InboundHrmpMessage, ListChannelInfos, MessageSendError,
OutboundHrmpMessage, ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender,
XcmpMessageHandler, XcmpMessageSource, DEFAULT_CLAIM_QUEUE_OFFSET,
XcmpMessageHandler, XcmpMessageSource,
};
use cumulus_primitives_parachain_inherent::{MessageQueueChain, ParachainInherentData};
use frame_support::{
Expand Down
4 changes: 0 additions & 4 deletions cumulus/primitives/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,6 @@ pub mod rpsr_digest {
}
}

/// The default claim queue offset to be used if it's not configured/accessible in the parachain
/// runtime
pub const DEFAULT_CLAIM_QUEUE_OFFSET: u8 = 0;

/// Information about a collation.
///
/// This was used in version 1 of the [`CollectCollationInfo`] runtime api.
Expand Down
2 changes: 1 addition & 1 deletion polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ async fn validate_candidate_exhaustive(
};

if let Err(err) = committed_candidate_receipt
.check_core_index(&transpose_claim_queue(claim_queue.0))
.check_core_index(&transpose_claim_queue(claim_queue.0), None)
{
gum::warn!(
target: LOG_TARGET,
Expand Down
122 changes: 68 additions & 54 deletions polkadot/primitives/src/vstaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ use sp_staking::SessionIndex;
/// Async backing primitives
pub mod async_backing;

/// The default claim queue offset to be used if it's not configured/accessible in the parachain
/// runtime
pub const DEFAULT_CLAIM_QUEUE_OFFSET: u8 = 0;

/// A type representing the version of the candidate descriptor and internal version number.
#[derive(PartialEq, Eq, Encode, Decode, Clone, TypeInfo, RuntimeDebug, Copy)]
#[cfg_attr(feature = "std", derive(Hash))]
Expand Down Expand Up @@ -425,41 +429,36 @@ pub enum UMPSignal {
pub const UMP_SEPARATOR: Vec<u8> = vec![];

impl CandidateCommitments {
/// Returns the core selector and claim queue offset the candidate has committed to, if any.
pub fn selected_core(&self) -> Option<(CoreSelector, ClaimQueueOffset)> {
// We need at least 2 messages for the separator and core selector
if self.upward_messages.len() < 2 {
return None
}

let separator_pos =
self.upward_messages.iter().rposition(|message| message == &UMP_SEPARATOR)?;

// Use first commitment
let message = self.upward_messages.get(separator_pos + 1)?;

match UMPSignal::decode(&mut message.as_slice()).ok()? {
UMPSignal::SelectCore(core_selector, cq_offset) => Some((core_selector, cq_offset)),
}
}

/// Returns the core index determined by `UMPSignal::SelectCore` commitment
/// and `assigned_cores`.
///
/// Returns `None` if there is no `UMPSignal::SelectCore` commitment or
alindima marked this conversation as resolved.
Show resolved Hide resolved
/// assigned cores is empty.
///
/// `assigned_cores` must be a sorted vec of all core indices assigned to a parachain.
pub fn committed_core_index(&self, assigned_cores: &[&CoreIndex]) -> Option<CoreIndex> {
if assigned_cores.is_empty() {
return None
}
pub fn core_selector(
&self,
) -> Result<(Option<CoreSelector>, ClaimQueueOffset), CandidateReceiptError> {
let mut signals_iter =
self.upward_messages.iter().skip_while(|message| *message != &UMP_SEPARATOR);

if signals_iter.next().is_some() {
let core_selector_message =
signals_iter.next().ok_or(CandidateReceiptError::NoCoreSelected)?;
// We should have exactly one signal beyond the separator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated q: right now we have a strict check of at most one signal
IIUC this is being check on both node and runtime sides
if we are to add more signals in the future, how are we going to coordinate the upgrade? via node_features?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discussed this with @sandreim a while ago. Yes, we'll probably use a new node feature

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we'll need a feature anyway if we want to introduce a new signal.

if signals_iter.next().is_some() {
return Err(CandidateReceiptError::TooManyUMPSignals)
}

self.selected_core().and_then(|(core_selector, _cq_offset)| {
let core_index =
**assigned_cores.get(core_selector.0 as usize % assigned_cores.len())?;
Some(core_index)
})
match UMPSignal::decode(&mut core_selector_message.as_slice())
.map_err(|_| CandidateReceiptError::InvalidSelectedCore)?
alindima marked this conversation as resolved.
Show resolved Hide resolved
{
UMPSignal::SelectCore(core_selector, cq_offset) =>
Ok((Some(core_selector), cq_offset)),
}
} else {
Ok((None, ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET)))
}
}
}

Expand All @@ -479,6 +478,8 @@ pub enum CandidateReceiptError {
/// The core selector or claim queue offset is invalid.
#[cfg_attr(feature = "std", error("The core selector or claim queue offset is invalid"))]
InvalidSelectedCore,
/// Could not decode UMP signal.
UmpSignalDecode,
/// The parachain is not assigned to any core at specified claim queue offset.
#[cfg_attr(
feature = "std",
Expand All @@ -492,6 +493,9 @@ pub enum CandidateReceiptError {
/// Unknown version.
#[cfg_attr(feature = "std", error("Unknown internal version"))]
UnknownVersion(InternalVersion),
/// The allowed number of `UMPSignal` messages in the queue was exceeded.
/// Currently only one such message is allowed.
TooManyUMPSignals,
}

macro_rules! impl_getter {
Expand Down Expand Up @@ -589,6 +593,7 @@ impl<H: Copy> CommittedCandidateReceiptV2<H> {
pub fn check_core_index(
&self,
cores_per_para: &TransposedClaimQueue,
core_index_enabled: Option<bool>,
alindima marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<(), CandidateReceiptError> {
match self.descriptor.version() {
// Don't check v1 descriptors.
Expand All @@ -598,41 +603,50 @@ impl<H: Copy> CommittedCandidateReceiptV2<H> {
return Err(CandidateReceiptError::UnknownVersion(self.descriptor.version)),
}

if cores_per_para.is_empty() {
return Err(CandidateReceiptError::NoAssignment)
}

let (offset, core_selected) =
if let Some((_core_selector, cq_offset)) = self.commitments.selected_core() {
(cq_offset.0, true)
} else {
// If no core has been selected then we use offset 0 (top of claim queue)
(0, false)
};
let (maybe_core_index_selector, cq_offset) = self.commitments.core_selector()?;

// The cores assigned to the parachain at above computed offset.
let assigned_cores = cores_per_para
.get(&self.descriptor.para_id())
.ok_or(CandidateReceiptError::NoAssignment)?
.get(&offset)
.ok_or(CandidateReceiptError::NoAssignment)?
.into_iter()
.collect::<Vec<_>>();
.get(&cq_offset.0)
.ok_or(CandidateReceiptError::NoAssignment)?;

let core_index = if core_selected {
self.commitments
.committed_core_index(assigned_cores.as_slice())
.ok_or(CandidateReceiptError::NoAssignment)?
} else {
// `SelectCore` commitment is mandatory for elastic scaling parachains.
if assigned_cores.len() > 1 {
return Err(CandidateReceiptError::NoCoreSelected)
}
if assigned_cores.is_empty() {
return Err(CandidateReceiptError::NoAssignment)
}

let descriptor_core_index = CoreIndex(self.descriptor.core_index as u32);

**assigned_cores.get(0).ok_or(CandidateReceiptError::NoAssignment)?
let core_index_selector = if let Some(core_index_selector) = maybe_core_index_selector {
// We have a committed core selector, we can use it.
core_index_selector
} else if assigned_cores.len() > 1 {
// We got more than one assigned core and no core selector. Special care is needed.

alindima marked this conversation as resolved.
Show resolved Hide resolved
match core_index_enabled {
// Elastic scaling MVP feature is not supplied, nothing more to check.
alindima marked this conversation as resolved.
Show resolved Hide resolved
None => return Ok(()),
// Elastic scaling MVP feature is disabled. Error.
Some(false) => return Err(CandidateReceiptError::NoCoreSelected),
// Elastic scaling MVP feature is enabled but the core index in the descriptor is
// not assigned to the para. Error.
Some(true) if !assigned_cores.contains(&descriptor_core_index) =>
return Err(CandidateReceiptError::InvalidCoreIndex),
// Elastic scaling MVP feature is enabled and the descriptor core index is indeed
// assigned to the para. This is the most we can check for now.
Some(true) => return Ok(()),
}
} else {
// No core selector but there's only one assigned core, use it.
CoreSelector(0)
};

let descriptor_core_index = CoreIndex(self.descriptor.core_index as u32);
let core_index = assigned_cores
.iter()
.nth(core_index_selector.0 as usize % assigned_cores.len())
.ok_or(CandidateReceiptError::InvalidSelectedCore)
.copied()?;

if core_index != descriptor_core_index {
return Err(CandidateReceiptError::CoreIndexMismatch)
}
Expand Down
38 changes: 6 additions & 32 deletions polkadot/runtime/parachains/src/inclusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use polkadot_primitives::{
vstaging::{
BackedCandidate, CandidateDescriptorV2 as CandidateDescriptor,
CandidateReceiptV2 as CandidateReceipt,
CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
CommittedCandidateReceiptV2 as CommittedCandidateReceipt, UMP_SEPARATOR,
},
well_known_keys, CandidateCommitments, CandidateHash, CoreIndex, GroupIndex, HeadData,
Id as ParaId, SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId,
Expand Down Expand Up @@ -412,11 +412,6 @@ pub(crate) enum UmpAcceptanceCheckErr {
TotalSizeExceeded { total_size: u64, limit: u64 },
/// A para-chain cannot send UMP messages while it is offboarding.
IsOffboarding,
/// The allowed number of `UMPSignal` messages in the queue was exceeded.
/// Currenly only one such message is allowed.
TooManyUMPSignals { count: u32 },
/// The UMP queue contains an invalid `UMPSignal`
NoUmpSignal,
}

impl fmt::Debug for UmpAcceptanceCheckErr {
Expand Down Expand Up @@ -445,12 +440,6 @@ impl fmt::Debug for UmpAcceptanceCheckErr {
UmpAcceptanceCheckErr::IsOffboarding => {
write!(fmt, "upward message rejected because the para is off-boarding")
},
UmpAcceptanceCheckErr::TooManyUMPSignals { count } => {
write!(fmt, "the ump queue has too many `UMPSignal` messages ({} > 1 )", count)
},
UmpAcceptanceCheckErr::NoUmpSignal => {
write!(fmt, "Required UMP signal not found")
},
}
}
}
Expand Down Expand Up @@ -925,25 +914,10 @@ impl<T: Config> Pallet<T> {
upward_messages: &[UpwardMessage],
) -> Result<(), UmpAcceptanceCheckErr> {
// Filter any pending UMP signals and the separator.
let upward_messages = if let Some(separator_index) =
upward_messages.iter().position(|message| message.is_empty())
{
let (upward_messages, ump_signals) = upward_messages.split_at(separator_index);

if ump_signals.len() > 2 {
return Err(UmpAcceptanceCheckErr::TooManyUMPSignals {
count: ump_signals.len() as u32,
})
}

if ump_signals.len() == 1 {
return Err(UmpAcceptanceCheckErr::NoUmpSignal)
}

upward_messages
} else {
upward_messages
};
let upward_messages = upward_messages
.iter()
.take_while(|message| message != &&UMP_SEPARATOR)
.collect::<Vec<_>>();

// Cannot send UMP messages while off-boarding.
if paras::Pallet::<T>::is_offboarding(para) {
Expand Down Expand Up @@ -1000,7 +974,7 @@ impl<T: Config> Pallet<T> {
let bounded = upward_messages
.iter()
// Stop once we hit the `UMPSignal` separator.
.take_while(|message| !message.is_empty())
.take_while(|message| message != &&UMP_SEPARATOR)
.filter_map(|d| {
BoundedSlice::try_from(&d[..])
.inspect_err(|_| {
Expand Down
14 changes: 11 additions & 3 deletions polkadot/runtime/parachains/src/paras_inherent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,7 @@ fn sanitize_backed_candidate_v2<T: crate::inclusion::Config>(
candidate: &BackedCandidate<T::Hash>,
allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
allow_v2_receipts: bool,
core_index_enabled: bool,
) -> bool {
if candidate.descriptor().version() == CandidateDescriptorVersion::V1 {
return true
Expand Down Expand Up @@ -1040,7 +1041,10 @@ fn sanitize_backed_candidate_v2<T: crate::inclusion::Config>(
};

// Check validity of `core_index`.
if let Err(err) = candidate.candidate().check_core_index(&rp_info.claim_queue) {
if let Err(err) = candidate
.candidate()
.check_core_index(&rp_info.claim_queue, Some(core_index_enabled))
{
log::debug!(
target: LOG_TARGET,
"Dropping candidate {:?} for paraid {:?}, {:?}",
Expand Down Expand Up @@ -1085,8 +1089,12 @@ fn sanitize_backed_candidates<T: crate::inclusion::Config>(
let mut candidates_per_para: BTreeMap<ParaId, Vec<_>> = BTreeMap::new();

for candidate in backed_candidates {
if !sanitize_backed_candidate_v2::<T>(&candidate, allowed_relay_parents, allow_v2_receipts)
{
if !sanitize_backed_candidate_v2::<T>(
&candidate,
allowed_relay_parents,
allow_v2_receipts,
core_index_enabled,
) {
continue
}

Expand Down
Loading