refactor and harden check_core_index #6217

Merged: 16 commits, Nov 5, 2024
8 changes: 3 additions & 5 deletions cumulus/client/consensus/aura/src/collators/lookahead.rs
@@ -36,17 +36,15 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
 use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
 use cumulus_client_consensus_proposer::ProposerInterface;
 use cumulus_primitives_aura::AuraUnincludedSegmentApi;
-use cumulus_primitives_core::{
-	ClaimQueueOffset, CollectCollationInfo, PersistedValidationData, DEFAULT_CLAIM_QUEUE_OFFSET,
-};
+use cumulus_primitives_core::{ClaimQueueOffset, CollectCollationInfo, PersistedValidationData};
 use cumulus_relay_chain_interface::RelayChainInterface;
 
 use polkadot_node_primitives::{PoV, SubmitCollationParams};
 use polkadot_node_subsystem::messages::CollationGenerationMessage;
 use polkadot_overseer::Handle as OverseerHandle;
 use polkadot_primitives::{
-	BlockNumber as RBlockNumber, CollatorPair, Hash as RHash, HeadData, Id as ParaId,
-	OccupiedCoreAssumption,
+	vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, BlockNumber as RBlockNumber, CollatorPair, Hash as RHash,
+	HeadData, Id as ParaId, OccupiedCoreAssumption,
 };
 
 use futures::prelude::*;
@@ -20,13 +20,11 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
 use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
 use cumulus_client_consensus_proposer::ProposerInterface;
 use cumulus_primitives_aura::AuraUnincludedSegmentApi;
-use cumulus_primitives_core::{
-	GetCoreSelectorApi, PersistedValidationData, DEFAULT_CLAIM_QUEUE_OFFSET,
-};
+use cumulus_primitives_core::{GetCoreSelectorApi, PersistedValidationData};
 use cumulus_relay_chain_interface::RelayChainInterface;
 
 use polkadot_primitives::{
-	vstaging::{ClaimQueueOffset, CoreSelector},
+	vstaging::{ClaimQueueOffset, CoreSelector, DEFAULT_CLAIM_QUEUE_OFFSET},
 	BlockId, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId,
 	OccupiedCoreAssumption,
 };
4 changes: 2 additions & 2 deletions cumulus/pallets/parachain-system/src/lib.rs
@@ -35,12 +35,12 @@ use core::{cmp, marker::PhantomData};
 use cumulus_primitives_core::{
 	relay_chain::{
 		self,
-		vstaging::{ClaimQueueOffset, CoreSelector},
+		vstaging::{ClaimQueueOffset, CoreSelector, DEFAULT_CLAIM_QUEUE_OFFSET},
 	},
 	AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo, GetChannelInfo,
 	InboundDownwardMessage, InboundHrmpMessage, ListChannelInfos, MessageSendError,
 	OutboundHrmpMessage, ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender,
-	XcmpMessageHandler, XcmpMessageSource, DEFAULT_CLAIM_QUEUE_OFFSET,
+	XcmpMessageHandler, XcmpMessageSource,
 };
 use cumulus_primitives_parachain_inherent::{MessageQueueChain, ParachainInherentData};
 use frame_support::{
4 changes: 0 additions & 4 deletions cumulus/primitives/core/src/lib.rs
@@ -333,10 +333,6 @@ pub mod rpsr_digest {
 	}
 }
 
-/// The default claim queue offset to be used if it's not configured/accessible in the parachain
-/// runtime
-pub const DEFAULT_CLAIM_QUEUE_OFFSET: u8 = 0;
-
 /// Information about a collation.
 ///
 /// This was used in version 1 of the [`CollectCollationInfo`] runtime api.
4 changes: 2 additions & 2 deletions polkadot/node/collation-generation/src/error.rs
@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
-use polkadot_primitives::vstaging::CandidateReceiptError;
+use polkadot_primitives::vstaging::CommittedCandidateReceiptError;
 use thiserror::Error;
 
 #[derive(Debug, Error)]
@@ -34,7 +34,7 @@ pub enum Error {
#[error("Collation submitted before initialization")]
SubmittedBeforeInit,
#[error("V2 core index check failed: {0}")]
CandidateReceiptCheck(CandidateReceiptError),
CandidateReceiptCheck(CommittedCandidateReceiptError),
#[error("PoV size {0} exceeded maximum size of {1}")]
POVSizeExceeded(usize, usize),
}
2 changes: 1 addition & 1 deletion polkadot/node/core/candidate-validation/src/lib.rs
@@ -860,7 +860,7 @@ async fn validate_candidate_exhaustive(
 	};
 
 	if let Err(err) = committed_candidate_receipt
-		.check_core_index(&transpose_claim_queue(claim_queue.0))
+		.check_core_index(&transpose_claim_queue(claim_queue.0), None)
 	{
 		gum::warn!(
 			target: LOG_TARGET,
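For orientation, this is roughly the shape of the node-side call above. A minimal sketch, assuming the `polkadot-primitives` vstaging API as it appears in this diff; the function name, para id, and core index are made up for illustration:

```rust
use std::collections::{BTreeMap, VecDeque};

use polkadot_primitives::{
	vstaging::{transpose_claim_queue, CommittedCandidateReceiptV2},
	CoreIndex, Hash, Id as ParaId,
};

// Build a one-entry claim queue (core 0 -> para 2000) and run the check.
fn validate_core_index(receipt: &CommittedCandidateReceiptV2<Hash>) -> bool {
	let mut claim_queue = BTreeMap::new();
	claim_queue.insert(CoreIndex(0), VecDeque::from([ParaId::from(2000u32)]));

	// Passing `None` means "elastic scaling MVP node feature status unknown"
	// on this path, so a multi-core para without a core selector is not
	// rejected here; candidate validation leaves that call to the runtime.
	receipt
		.check_core_index(&transpose_claim_queue(claim_queue), None)
		.is_ok()
}
```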
157 changes: 87 additions & 70 deletions polkadot/primitives/src/vstaging/mod.rs
@@ -39,6 +39,10 @@ use sp_staking::SessionIndex;
 /// Async backing primitives
 pub mod async_backing;
 
+/// The default claim queue offset to be used if it's not configured/accessible in the parachain
+/// runtime
+pub const DEFAULT_CLAIM_QUEUE_OFFSET: u8 = 0;
+
 /// A type representing the version of the candidate descriptor and internal version number.
 #[derive(PartialEq, Eq, Encode, Decode, Clone, TypeInfo, RuntimeDebug, Copy)]
 #[cfg_attr(feature = "std", derive(Hash))]
@@ -425,48 +429,42 @@ pub enum UMPSignal
 pub const UMP_SEPARATOR: Vec<u8> = vec![];
 
 impl CandidateCommitments {
-	/// Returns the core selector and claim queue offset the candidate has committed to, if any.
-	pub fn selected_core(&self) -> Option<(CoreSelector, ClaimQueueOffset)> {
-		// We need at least 2 messages for the separator and core selector
-		if self.upward_messages.len() < 2 {
-			return None
-		}
-
-		let separator_pos =
-			self.upward_messages.iter().rposition(|message| message == &UMP_SEPARATOR)?;
-
-		// Use first commitment
-		let message = self.upward_messages.get(separator_pos + 1)?;
-
-		match UMPSignal::decode(&mut message.as_slice()).ok()? {
-			UMPSignal::SelectCore(core_selector, cq_offset) => Some((core_selector, cq_offset)),
-		}
-	}
-
-	/// Returns the core index determined by `UMPSignal::SelectCore` commitment
-	/// and `assigned_cores`.
-	///
-	/// Returns `None` if there is no `UMPSignal::SelectCore` commitment or
-	/// assigned cores is empty.
+	/// Returns the core selector and claim queue offset determined by `UMPSignal::SelectCore`
+	/// commitment
 	///
-	/// `assigned_cores` must be a sorted vec of all core indices assigned to a parachain.
-	pub fn committed_core_index(&self, assigned_cores: &[&CoreIndex]) -> Option<CoreIndex> {
-		if assigned_cores.is_empty() {
-			return None
-		}
-
-		self.selected_core().and_then(|(core_selector, _cq_offset)| {
-			let core_index =
-				**assigned_cores.get(core_selector.0 as usize % assigned_cores.len())?;
-			Some(core_index)
-		})
-	}
+	/// Returns a tuple where the first item is the optional core index selector and the second item
+	/// is the claim queue offset, initialized with a default value if not present in the
+	/// commitments.
+	pub fn core_selector(
+		&self,
+	) -> Result<(Option<CoreSelector>, ClaimQueueOffset), CommittedCandidateReceiptError> {
+		let mut signals_iter =
+			self.upward_messages.iter().skip_while(|message| *message != &UMP_SEPARATOR);
+
+		if signals_iter.next().is_some() {
+			let core_selector_message =
+				signals_iter.next().ok_or(CommittedCandidateReceiptError::NoCoreSelected)?;
+			// We should have exactly one signal beyond the separator
+			if signals_iter.next().is_some() {
+				return Err(CommittedCandidateReceiptError::TooManyUMPSignals)
+			}
+
+			match UMPSignal::decode(&mut core_selector_message.as_slice())
+				.map_err(|_| CommittedCandidateReceiptError::InvalidSelectedCore)?
+			{
+				UMPSignal::SelectCore(core_selector, cq_offset) =>
+					Ok((Some(core_selector), cq_offset)),
+			}
+		} else {
+			Ok((None, ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET)))
+		}
+	}
 }

Review thread (resolved), anchored on the "exactly one signal" check:

  Reviewer (Member): Unrelated question: right now we have a strict check of at
  most one signal, and IIUC this is checked on both the node and runtime sides.
  If we are to add more signals in the future, how are we going to coordinate
  the upgrade? Via node_features?

  Author (Contributor): I discussed this with @sandreim a while ago. Yes, we'll
  probably use a new node feature.

  Reviewer (Contributor): Yes, we'll need a feature anyway if we want to
  introduce a new signal.
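To make the commitment format concrete: the signal region of `upward_messages` starts at the empty `UMP_SEPARATOR` message and must now contain exactly one SCALE-encoded `UMPSignal`. A hedged sketch of how such a commitment could be assembled (types as in this diff; the selector and offset values are illustrative):

```rust
use codec::Encode;
use polkadot_primitives::vstaging::{ClaimQueueOffset, CoreSelector, UMPSignal, UMP_SEPARATOR};

// Regular upward messages, then the separator, then exactly one signal.
// `core_selector()` on commitments carrying these messages should yield
// Ok((Some(CoreSelector(1)), ClaimQueueOffset(0))); a second signal after
// the separator would be rejected with `TooManyUMPSignals`.
fn messages_with_core_selection(mut regular: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
	regular.push(UMP_SEPARATOR);
	regular.push(UMPSignal::SelectCore(CoreSelector(1), ClaimQueueOffset(0)).encode());
	regular
}
```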

-/// CandidateReceipt construction errors.
+/// CommittedCandidateReceiptError construction errors.
 #[derive(PartialEq, Eq, Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
 #[cfg_attr(feature = "std", derive(thiserror::Error))]
-pub enum CandidateReceiptError {
+pub enum CommittedCandidateReceiptError {
 	/// The specified core index is invalid.
 	#[cfg_attr(feature = "std", error("The specified core index is invalid"))]
 	InvalidCoreIndex,
@@ -479,6 +477,9 @@ pub enum CandidateReceiptError
 	/// The core selector or claim queue offset is invalid.
 	#[cfg_attr(feature = "std", error("The core selector or claim queue offset is invalid"))]
 	InvalidSelectedCore,
+	#[cfg_attr(feature = "std", error("Could not decode UMP signal"))]
+	/// Could not decode UMP signal.
+	UmpSignalDecode,
 	/// The parachain is not assigned to any core at specified claim queue offset.
 	#[cfg_attr(
 		feature = "std",
@@ -492,6 +493,10 @@
 	/// Unknown version.
 	#[cfg_attr(feature = "std", error("Unknown internal version"))]
 	UnknownVersion(InternalVersion),
+	/// The allowed number of `UMPSignal` messages in the queue was exceeded.
+	/// Currently only one such message is allowed.
+	#[cfg_attr(feature = "std", error("Too many UMP signals"))]
+	TooManyUMPSignals,
 }
 
 macro_rules! impl_getter {
@@ -586,55 +591,67 @@ impl<H: Copy> CommittedCandidateReceiptV2<H> {
 	/// Checks if descriptor core index is equal to the committed core index.
 	/// Input `cores_per_para` is a claim queue snapshot stored as a mapping
 	/// between `ParaId` and the cores assigned per depth.
+	/// `core_index_enabled` optionally describes the status of the elastic scaling MVP node
+	/// feature.
 	pub fn check_core_index(
 		&self,
 		cores_per_para: &TransposedClaimQueue,
-	) -> Result<(), CandidateReceiptError> {
+		core_index_enabled: Option<bool>,
+	) -> Result<(), CommittedCandidateReceiptError> {
 		match self.descriptor.version() {
 			// Don't check v1 descriptors.
 			CandidateDescriptorVersion::V1 => return Ok(()),
 			CandidateDescriptorVersion::V2 => {},
 			CandidateDescriptorVersion::Unknown =>
-				return Err(CandidateReceiptError::UnknownVersion(self.descriptor.version)),
-		}
-
-		if cores_per_para.is_empty() {
-			return Err(CandidateReceiptError::NoAssignment)
+				return Err(CommittedCandidateReceiptError::UnknownVersion(self.descriptor.version)),
 		}
 
-		let (offset, core_selected) =
-			if let Some((_core_selector, cq_offset)) = self.commitments.selected_core() {
-				(cq_offset.0, true)
-			} else {
-				// If no core has been selected then we use offset 0 (top of claim queue)
-				(0, false)
-			};
+		let (maybe_core_index_selector, cq_offset) = self.commitments.core_selector()?;
 
 		// The cores assigned to the parachain at above computed offset.
 		let assigned_cores = cores_per_para
 			.get(&self.descriptor.para_id())
-			.ok_or(CandidateReceiptError::NoAssignment)?
-			.get(&offset)
-			.ok_or(CandidateReceiptError::NoAssignment)?
-			.into_iter()
-			.collect::<Vec<_>>();
-
-		let core_index = if core_selected {
-			self.commitments
-				.committed_core_index(assigned_cores.as_slice())
-				.ok_or(CandidateReceiptError::NoAssignment)?
-		} else {
-			// `SelectCore` commitment is mandatory for elastic scaling parachains.
-			if assigned_cores.len() > 1 {
-				return Err(CandidateReceiptError::NoCoreSelected)
-			}
-
-			**assigned_cores.get(0).ok_or(CandidateReceiptError::NoAssignment)?
-		};
-
-		let descriptor_core_index = CoreIndex(self.descriptor.core_index as u32);
+			.ok_or(CommittedCandidateReceiptError::NoAssignment)?
+			.get(&cq_offset.0)
+			.ok_or(CommittedCandidateReceiptError::NoAssignment)?;
+
+		if assigned_cores.is_empty() {
+			return Err(CommittedCandidateReceiptError::NoAssignment)
+		}
+
+		let descriptor_core_index = CoreIndex(self.descriptor.core_index as u32);
+
+		let core_index_selector = if let Some(core_index_selector) = maybe_core_index_selector {
+			// We have a committed core selector, we can use it.
+			core_index_selector
+		} else if assigned_cores.len() > 1 {
+			// We got more than one assigned core and no core selector. Special care is needed.
+			match core_index_enabled {
+				// Elastic scaling MVP feature is not supplied, nothing more to check.
+				None => return Ok(()),
+				// Elastic scaling MVP feature is disabled. Error.
+				Some(false) => return Err(CommittedCandidateReceiptError::NoCoreSelected),
+				// Elastic scaling MVP feature is enabled but the core index in the descriptor is
+				// not assigned to the para. Error.
+				Some(true) if !assigned_cores.contains(&descriptor_core_index) =>
+					return Err(CommittedCandidateReceiptError::InvalidCoreIndex),
+				// Elastic scaling MVP feature is enabled and the descriptor core index is indeed
+				// assigned to the para. This is the most we can check for now.
+				Some(true) => return Ok(()),
+			}
+		} else {
+			// No core selector but there's only one assigned core, use it.
+			CoreSelector(0)
+		};
+
+		let core_index = assigned_cores
+			.iter()
+			.nth(core_index_selector.0 as usize % assigned_cores.len())
+			.ok_or(CommittedCandidateReceiptError::InvalidSelectedCore)
+			.copied()?;
 
 		if core_index != descriptor_core_index {
-			return Err(CandidateReceiptError::CoreIndexMismatch)
+			return Err(CommittedCandidateReceiptError::CoreIndexMismatch)
 		}
 
 		Ok(())
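The selection rule above is a plain wrap-around lookup: the committed selector indexes into the sorted set of assigned cores modulo its length. A self-contained illustration, with plain `u32`s standing in for `CoreIndex`:

```rust
// Mirrors `assigned_cores.iter().nth(selector as usize % assigned_cores.len())`.
fn pick_core(assigned: &[u32], selector: u8) -> Option<u32> {
	if assigned.is_empty() {
		return None; // the caller maps this case to `NoAssignment`
	}
	assigned.get(selector as usize % assigned.len()).copied()
}

fn main() {
	// With cores [0, 3] and CoreSelector(5): 5 % 2 == 1, so core 3 must be the
	// one named in the descriptor; anything else is a `CoreIndexMismatch`.
	assert_eq!(pick_core(&[0, 3], 5), Some(3));
}
```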
@@ -1031,7 +1048,7 @@ mod tests {
 		assert_eq!(new_ccr.descriptor.version(), CandidateDescriptorVersion::Unknown);
 		assert_eq!(
-			new_ccr.check_core_index(&BTreeMap::new()),
-			Err(CandidateReceiptError::UnknownVersion(InternalVersion(100)))
+			new_ccr.check_core_index(&BTreeMap::new(), None),
+			Err(CommittedCandidateReceiptError::UnknownVersion(InternalVersion(100)))
 		)
 	}

@@ -1097,7 +1114,7 @@

 		assert_eq!(
-			new_ccr.check_core_index(&transpose_claim_queue(cq.clone())),
-			Err(CandidateReceiptError::NoCoreSelected)
+			new_ccr.check_core_index(&transpose_claim_queue(cq.clone()), None),
+			Err(CommittedCandidateReceiptError::NoCoreSelected)
 		);
 
 		new_ccr.commitments.upward_messages.clear();
new_ccr.commitments.upward_messages.clear();
Expand Down Expand Up @@ -1225,7 +1242,7 @@ mod tests {
 		// Should fail because 2 cores are assigned,
 		assert_eq!(
-			new_ccr.check_core_index(&transpose_claim_queue(cq)),
-			Err(CandidateReceiptError::NoCoreSelected)
+			new_ccr.check_core_index(&transpose_claim_queue(cq), Some(false)),
+			Err(CommittedCandidateReceiptError::NoCoreSelected)
 		);
 
 		// Adding collator signature should make it decode as v1.
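A test-style sketch of the hardened paths above. This assumes `CandidateCommitments` derives `Default` and that `upward_messages` is a `BoundedVec` exposing `try_push`, as elsewhere in the codebase; the selector and offset values are illustrative:

```rust
use codec::Encode;
use polkadot_primitives::{
	vstaging::{
		ClaimQueueOffset, CommittedCandidateReceiptError, CoreSelector, UMPSignal, UMP_SEPARATOR,
	},
	CandidateCommitments,
};

fn core_selector_error_paths() {
	let mut commitments = CandidateCommitments::default();

	// No separator at all: no selector, default claim queue offset.
	assert!(matches!(commitments.core_selector(), Ok((None, _))));

	// Separator followed by two signals: rejected by the hardened check.
	let signal = UMPSignal::SelectCore(CoreSelector(0), ClaimQueueOffset(0)).encode();
	commitments.upward_messages.try_push(UMP_SEPARATOR).unwrap();
	commitments.upward_messages.try_push(signal.clone()).unwrap();
	commitments.upward_messages.try_push(signal).unwrap();
	assert_eq!(
		commitments.core_selector().unwrap_err(),
		CommittedCandidateReceiptError::TooManyUMPSignals
	);
}
```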
38 changes: 6 additions & 32 deletions polkadot/runtime/parachains/src/inclusion/mod.rs
@@ -48,7 +48,7 @@ use polkadot_primitives::{
 	vstaging::{
 		BackedCandidate, CandidateDescriptorV2 as CandidateDescriptor,
 		CandidateReceiptV2 as CandidateReceipt,
-		CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
+		CommittedCandidateReceiptV2 as CommittedCandidateReceipt, UMP_SEPARATOR,
 	},
 	well_known_keys, CandidateCommitments, CandidateHash, CoreIndex, GroupIndex, HeadData,
 	Id as ParaId, SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId,
@@ -412,11 +412,6 @@ pub(crate) enum UmpAcceptanceCheckErr {
 	TotalSizeExceeded { total_size: u64, limit: u64 },
 	/// A para-chain cannot send UMP messages while it is offboarding.
 	IsOffboarding,
-	/// The allowed number of `UMPSignal` messages in the queue was exceeded.
-	/// Currenly only one such message is allowed.
-	TooManyUMPSignals { count: u32 },
-	/// The UMP queue contains an invalid `UMPSignal`
-	NoUmpSignal,
 }
 
 impl fmt::Debug for UmpAcceptanceCheckErr {
@@ -445,12 +440,6 @@ impl fmt::Debug for UmpAcceptanceCheckErr {
 			UmpAcceptanceCheckErr::IsOffboarding => {
 				write!(fmt, "upward message rejected because the para is off-boarding")
 			},
-			UmpAcceptanceCheckErr::TooManyUMPSignals { count } => {
-				write!(fmt, "the ump queue has too many `UMPSignal` messages ({} > 1 )", count)
-			},
-			UmpAcceptanceCheckErr::NoUmpSignal => {
-				write!(fmt, "Required UMP signal not found")
-			},
 		}
 	}
 }
@@ -925,25 +914,10 @@ impl<T: Config> Pallet<T> {
 		upward_messages: &[UpwardMessage],
 	) -> Result<(), UmpAcceptanceCheckErr> {
 		// Filter any pending UMP signals and the separator.
-		let upward_messages = if let Some(separator_index) =
-			upward_messages.iter().position(|message| message.is_empty())
-		{
-			let (upward_messages, ump_signals) = upward_messages.split_at(separator_index);
-
-			if ump_signals.len() > 2 {
-				return Err(UmpAcceptanceCheckErr::TooManyUMPSignals {
-					count: ump_signals.len() as u32,
-				})
-			}
-
-			if ump_signals.len() == 1 {
-				return Err(UmpAcceptanceCheckErr::NoUmpSignal)
-			}
-
-			upward_messages
-		} else {
-			upward_messages
-		};
+		let upward_messages = upward_messages
+			.iter()
+			.take_while(|message| message != &&UMP_SEPARATOR)
+			.collect::<Vec<_>>();
 
 		// Cannot send UMP messages while off-boarding.
 		if paras::Pallet::<T>::is_offboarding(para) {
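The new filter is deliberately simpler than the old split-and-count logic: the acceptance checks only look at messages strictly before the separator, and well-formedness of the signal region is left to `check_core_index`. A pure-std illustration of the same `take_while` pattern (`UMP_SEPARATOR` is an empty message):

```rust
// Everything before the first empty message is a regular upward message;
// everything from the separator onwards is UMP signal data and is skipped.
fn regular_messages(msgs: &[Vec<u8>]) -> Vec<&Vec<u8>> {
	msgs.iter().take_while(|m| !m.is_empty()).collect()
}

fn main() {
	let msgs = vec![b"hello".to_vec(), Vec::new(), vec![1, 2, 3]];
	// Only "hello" is subject to the UMP acceptance checks.
	assert_eq!(regular_messages(&msgs).len(), 1);
}
```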
@@ -1000,7 +974,7 @@ impl<T: Config> Pallet<T> {
 		let bounded = upward_messages
 			.iter()
 			// Stop once we hit the `UMPSignal` separator.
-			.take_while(|message| !message.is_empty())
+			.take_while(|message| message != &&UMP_SEPARATOR)
 			.filter_map(|d| {
 				BoundedSlice::try_from(&d[..])
 					.inspect_err(|_| {