refactor and harden check_core_index #6217

Merged: 16 commits, Nov 5, 2024
8 changes: 3 additions & 5 deletions cumulus/client/consensus/aura/src/collators/lookahead.rs
@@ -36,17 +36,15 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
-use cumulus_primitives_core::{
-    ClaimQueueOffset, CollectCollationInfo, PersistedValidationData, DEFAULT_CLAIM_QUEUE_OFFSET,
-};
+use cumulus_primitives_core::{ClaimQueueOffset, CollectCollationInfo, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;

use polkadot_node_primitives::{PoV, SubmitCollationParams};
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle as OverseerHandle;
 use polkadot_primitives::{
-    BlockNumber as RBlockNumber, CollatorPair, Hash as RHash, HeadData, Id as ParaId,
-    OccupiedCoreAssumption,
+    vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, BlockNumber as RBlockNumber, CollatorPair, Hash as RHash,
+    HeadData, Id as ParaId, OccupiedCoreAssumption,
 };

use futures::prelude::*;
@@ -20,13 +20,11 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
-use cumulus_primitives_core::{
-    GetCoreSelectorApi, PersistedValidationData, DEFAULT_CLAIM_QUEUE_OFFSET,
-};
+use cumulus_primitives_core::{GetCoreSelectorApi, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;

 use polkadot_primitives::{
-    vstaging::{ClaimQueueOffset, CoreSelector},
+    vstaging::{ClaimQueueOffset, CoreSelector, DEFAULT_CLAIM_QUEUE_OFFSET},
     BlockId, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId,
     OccupiedCoreAssumption,
 };
4 changes: 2 additions & 2 deletions cumulus/pallets/parachain-system/src/lib.rs
@@ -35,12 +35,12 @@ use core::{cmp, marker::PhantomData};
 use cumulus_primitives_core::{
     relay_chain::{
         self,
-        vstaging::{ClaimQueueOffset, CoreSelector},
+        vstaging::{ClaimQueueOffset, CoreSelector, DEFAULT_CLAIM_QUEUE_OFFSET},
     },
     AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo, GetChannelInfo,
     InboundDownwardMessage, InboundHrmpMessage, ListChannelInfos, MessageSendError,
     OutboundHrmpMessage, ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender,
-    XcmpMessageHandler, XcmpMessageSource, DEFAULT_CLAIM_QUEUE_OFFSET,
+    XcmpMessageHandler, XcmpMessageSource,
 };
use cumulus_primitives_parachain_inherent::{MessageQueueChain, ParachainInherentData};
use frame_support::{
4 changes: 0 additions & 4 deletions cumulus/primitives/core/src/lib.rs
@@ -333,10 +333,6 @@ pub mod rpsr_digest {
     }
 }
 
-/// The default claim queue offset to be used if it's not configured/accessible in the parachain
-/// runtime
-pub const DEFAULT_CLAIM_QUEUE_OFFSET: u8 = 0;

/// Information about a collation.
///
/// This was used in version 1 of the [`CollectCollationInfo`] runtime api.
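With the constant deleted from cumulus-primitives-core, its single definition now lives in `polkadot_primitives::vstaging` (added by the hunk in polkadot/primitives/src/vstaging/mod.rs below). A tiny sketch of the new import path, assuming a `polkadot-primitives` dependency; the value itself is unchanged:

```rust
// Assumes a polkadot-primitives dependency; the path is taken from the hunks above.
use polkadot_primitives::vstaging::DEFAULT_CLAIM_QUEUE_OFFSET;

fn main() {
    // Offset 0 targets the core assignment at the very top of the claim queue.
    assert_eq!(DEFAULT_CLAIM_QUEUE_OFFSET, 0u8);
}
```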
4 changes: 2 additions & 2 deletions polkadot/node/collation-generation/src/error.rs
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

-use polkadot_primitives::vstaging::CandidateReceiptError;
+use polkadot_primitives::vstaging::CommittedCandidateReceiptError;
use thiserror::Error;

#[derive(Debug, Error)]
@@ -34,7 +34,7 @@ pub enum Error {
#[error("Collation submitted before initialization")]
SubmittedBeforeInit,
#[error("V2 core index check failed: {0}")]
-    CandidateReceiptCheck(CandidateReceiptError),
+    CandidateReceiptCheck(CommittedCandidateReceiptError),
#[error("PoV size {0} exceeded maximum size of {1}")]
POVSizeExceeded(usize, usize),
}
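For context, the `CandidateReceiptCheck` variant above simply wraps the receipt-check error and forwards its message. A minimal self-contained illustration of that thiserror pattern (hypothetical enum names, not the real polkadot types):

```rust
use thiserror::Error;

// Stand-in for the renamed receipt-check error in polkadot-primitives.
#[derive(Debug, Error, PartialEq)]
enum ReceiptError {
    #[error("The core selector or claim queue offset is invalid")]
    InvalidSelectedCore,
}

// Stand-in for the node-side error enum in this file: the inner error is the
// payload and its Display output is spliced into the outer message via `{0}`.
#[derive(Debug, Error, PartialEq)]
enum NodeError {
    #[error("V2 core index check failed: {0}")]
    CandidateReceiptCheck(ReceiptError),
}

fn main() {
    let err = NodeError::CandidateReceiptCheck(ReceiptError::InvalidSelectedCore);
    assert_eq!(
        err.to_string(),
        "V2 core index check failed: The core selector or claim queue offset is invalid"
    );
}
```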
2 changes: 1 addition & 1 deletion polkadot/node/core/candidate-validation/src/lib.rs
@@ -860,7 +860,7 @@ async fn validate_candidate_exhaustive(
};

     if let Err(err) = committed_candidate_receipt
-        .check_core_index(&transpose_claim_queue(claim_queue.0))
+        .check_core_index(&transpose_claim_queue(claim_queue.0), None)
     {
gum::warn!(
target: LOG_TARGET,
15 changes: 5 additions & 10 deletions polkadot/node/subsystem-util/src/inclusion_emulator/mod.rs
@@ -82,9 +82,9 @@
/// in practice at most once every few weeks.
use polkadot_node_subsystem::messages::HypotheticalCandidate;
 use polkadot_primitives::{
-    async_backing::Constraints as PrimitiveConstraints, BlockNumber, CandidateCommitments,
-    CandidateHash, Hash, HeadData, Id as ParaId, PersistedValidationData, UpgradeRestriction,
-    ValidationCodeHash,
+    async_backing::Constraints as PrimitiveConstraints, vstaging::skip_ump_signals, BlockNumber,
+    CandidateCommitments, CandidateHash, Hash, HeadData, Id as ParaId, PersistedValidationData,
+    UpgradeRestriction, ValidationCodeHash,
 };
use std::{collections::HashMap, sync::Arc};

@@ -601,13 +601,8 @@ impl Fragment {
persisted_validation_data: &PersistedValidationData,
) -> Result<ConstraintModifications, FragmentValidityError> {
         // Filter UMP signals and the separator.
-        let upward_messages = if let Some(separator_index) =
-            commitments.upward_messages.iter().position(|message| message.is_empty())
-        {
-            commitments.upward_messages.split_at(separator_index).0
-        } else {
-            &commitments.upward_messages
-        };
+        let upward_messages =
+            skip_ump_signals(commitments.upward_messages.iter()).collect::<Vec<_>>();

let ump_messages_sent = upward_messages.len();
let ump_bytes_sent = upward_messages.iter().map(|msg| msg.len()).sum();
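The manual separator search above is replaced by the shared helper. A minimal self-contained sketch of what `skip_ump_signals` does, with local copies of the helper and separator as they are defined in the vstaging hunk further down:

```rust
// Local copies for the sketch; in the tree these live in
// polkadot/primitives/src/vstaging/mod.rs.
const UMP_SEPARATOR: Vec<u8> = Vec::new();

fn skip_ump_signals<'a>(
    upward_messages: impl Iterator<Item = &'a Vec<u8>>,
) -> impl Iterator<Item = &'a Vec<u8>> {
    upward_messages.take_while(|message| *message != &UMP_SEPARATOR)
}

fn main() {
    let xcm = vec![1u8, 2, 3];
    let signal = vec![9u8]; // stand-in for an encoded `UMPSignal::SelectCore`
    let upward_messages = vec![xcm.clone(), UMP_SEPARATOR, signal];

    // Everything before the separator is kept; the separator and any
    // signals after it are dropped.
    let xcm_only: Vec<_> = skip_ump_signals(upward_messages.iter()).collect();
    assert_eq!(xcm_only, vec![&xcm]);
}
```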
163 changes: 93 additions & 70 deletions polkadot/primitives/src/vstaging/mod.rs
@@ -39,6 +39,10 @@ use sp_staking::SessionIndex;
/// Async backing primitives
pub mod async_backing;

+/// The default claim queue offset to be used if it's not configured/accessible in the parachain
+/// runtime
+pub const DEFAULT_CLAIM_QUEUE_OFFSET: u8 = 0;

/// A type representing the version of the candidate descriptor and internal version number.
#[derive(PartialEq, Eq, Encode, Decode, Clone, TypeInfo, RuntimeDebug, Copy)]
#[cfg_attr(feature = "std", derive(Hash))]
@@ -424,49 +428,46 @@ pub enum UMPSignal {
/// Separator between `XCM` and `UMPSignal`.
pub const UMP_SEPARATOR: Vec<u8> = vec![];

-impl CandidateCommitments {
-    /// Returns the core selector and claim queue offset the candidate has committed to, if any.
-    pub fn selected_core(&self) -> Option<(CoreSelector, ClaimQueueOffset)> {
-        // We need at least 2 messages for the separator and core selector
-        if self.upward_messages.len() < 2 {
-            return None
-        }
-
-        let separator_pos =
-            self.upward_messages.iter().rposition(|message| message == &UMP_SEPARATOR)?;
-
-        // Use first commitment
-        let message = self.upward_messages.get(separator_pos + 1)?;
-
-        match UMPSignal::decode(&mut message.as_slice()).ok()? {
-            UMPSignal::SelectCore(core_selector, cq_offset) => Some((core_selector, cq_offset)),
-        }
-    }
-
-    /// Returns the core index determined by `UMPSignal::SelectCore` commitment
-    /// and `assigned_cores`.
-    ///
-    /// Returns `None` if there is no `UMPSignal::SelectCore` commitment or
-    /// assigned cores is empty.
-    ///
-    /// `assigned_cores` must be a sorted vec of all core indices assigned to a parachain.
-    pub fn committed_core_index(&self, assigned_cores: &[&CoreIndex]) -> Option<CoreIndex> {
-        if assigned_cores.is_empty() {
-            return None
-        }
-
-        self.selected_core().and_then(|(core_selector, _cq_offset)| {
-            let core_index =
-                **assigned_cores.get(core_selector.0 as usize % assigned_cores.len())?;
-            Some(core_index)
-        })
-    }
-}
+/// Utility function for skipping the ump signals.
+pub fn skip_ump_signals<'a>(
+    upward_messages: impl Iterator<Item = &'a Vec<u8>>,
+) -> impl Iterator<Item = &'a Vec<u8>> {
+    upward_messages.take_while(|message| *message != &UMP_SEPARATOR)
+}
+
+impl CandidateCommitments {
+    /// Returns the core selector and claim queue offset determined by `UMPSignal::SelectCore`
+    /// commitment, if present.
+    pub fn core_selector(
+        &self,
+    ) -> Result<Option<(CoreSelector, ClaimQueueOffset)>, CommittedCandidateReceiptError> {
+        let mut signals_iter =
+            self.upward_messages.iter().skip_while(|message| *message != &UMP_SEPARATOR);
+
+        if signals_iter.next().is_some() {
+            let core_selector_message =
+                signals_iter.next().ok_or(CommittedCandidateReceiptError::NoCoreSelected)?;
+            // We should have exactly one signal beyond the separator
+            if signals_iter.next().is_some() {
+                return Err(CommittedCandidateReceiptError::TooManyUMPSignals)
+            }
+
+            match UMPSignal::decode(&mut core_selector_message.as_slice())
+                .map_err(|_| CommittedCandidateReceiptError::InvalidSelectedCore)?
+            {
+                UMPSignal::SelectCore(core_index_selector, cq_offset) =>
+                    Ok(Some((core_index_selector, cq_offset))),
+            }
+        } else {
+            Ok(None)
+        }
+    }
+}

[Review thread on `skip_ump_signals`]
Contributor: Any good reason not to put it inside `CandidateCommitments`?
Contributor Author: Yes, in the runtime we use this function after taking the upward messages from the commitments struct.

[Review thread on the "exactly one signal" check]
Member: Unrelated question: right now we have a strict check of at most one signal. IIUC this is checked on both the node and runtime sides. If we add more signals in the future, how are we going to coordinate the upgrade? Via node_features?
Contributor Author: I discussed this with @sandreim a while ago. Yes, we'll probably use a new node feature.
Contributor: Yes, we'll need a feature anyway if we want to introduce a new signal.
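Following up on the arity question in the thread above, a self-contained sketch (stand-in types, no SCALE decoding) of the rule `core_selector()` enforces past the separator:

```rust
#[derive(Debug, PartialEq)]
enum SignalCheck {
    NoSeparator,        // no UMP signals committed at all
    NoCoreSelected,     // separator present but no signal after it
    TooMany,            // more than one signal after the separator
    Selected(Vec<u8>),  // exactly one signal: accepted
}

// Mirrors the iterator logic in `core_selector()`: skip to the separator
// (the first empty message), then expect exactly one message after it.
fn check_signals(upward_messages: &[Vec<u8>]) -> SignalCheck {
    let mut signals = upward_messages.iter().skip_while(|m| !m.is_empty());
    match signals.next() {
        None => SignalCheck::NoSeparator,
        Some(_separator) => {
            let Some(signal) = signals.next() else {
                return SignalCheck::NoCoreSelected;
            };
            if signals.next().is_some() {
                return SignalCheck::TooMany;
            }
            SignalCheck::Selected(signal.clone())
        },
    }
}

fn main() {
    let sep: Vec<u8> = vec![];
    let sig = vec![0u8, 1, 0]; // stand-in for an encoded `UMPSignal::SelectCore`

    assert_eq!(
        check_signals(&[vec![1u8], sep.clone(), sig.clone()]),
        SignalCheck::Selected(sig.clone())
    );
    assert_eq!(check_signals(&[vec![1u8]]), SignalCheck::NoSeparator);
    assert_eq!(check_signals(&[sep.clone()]), SignalCheck::NoCoreSelected);
    assert_eq!(check_signals(&[sep, sig.clone(), sig]), SignalCheck::TooMany);
}
```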

-/// CandidateReceipt construction errors.
+/// CommittedCandidateReceiptError construction errors.
 #[derive(PartialEq, Eq, Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
 #[cfg_attr(feature = "std", derive(thiserror::Error))]
-pub enum CandidateReceiptError {
+pub enum CommittedCandidateReceiptError {
     /// The specified core index is invalid.
     #[cfg_attr(feature = "std", error("The specified core index is invalid"))]
     InvalidCoreIndex,
@@ -479,6 +480,9 @@ pub enum CandidateReceiptError {
     /// The core selector or claim queue offset is invalid.
     #[cfg_attr(feature = "std", error("The core selector or claim queue offset is invalid"))]
     InvalidSelectedCore,
+    /// Could not decode UMP signal.
+    #[cfg_attr(feature = "std", error("Could not decode UMP signal"))]
+    UmpSignalDecode,
     /// The parachain is not assigned to any core at specified claim queue offset.
     #[cfg_attr(
         feature = "std",
@@ -492,6 +496,10 @@ pub enum CandidateReceiptError {
     /// Unknown version.
     #[cfg_attr(feature = "std", error("Unknown internal version"))]
     UnknownVersion(InternalVersion),
+    /// The allowed number of `UMPSignal` messages in the queue was exceeded.
+    /// Currently only one such message is allowed.
+    #[cfg_attr(feature = "std", error("Too many UMP signals"))]
+    TooManyUMPSignals,
 }
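`check_core_index` below consumes a `TransposedClaimQueue`. As a rough, self-contained miniature of the shape this code assumes (stand-in integer types; the real aliases live in polkadot-primitives):

```rust
use std::collections::{BTreeMap, BTreeSet};

// Stand-in types; the real `ParaId` and `CoreIndex` live in polkadot-primitives.
type ParaId = u32;
type CoreIndex = u32;

// Shape inferred from how `check_core_index` uses it below:
// ParaId -> claim queue offset (depth) -> cores assigned to the para.
type TransposedClaimQueue = BTreeMap<ParaId, BTreeMap<u8, BTreeSet<CoreIndex>>>;

fn main() {
    let mut cq: TransposedClaimQueue = BTreeMap::new();
    // Para 2000 is assigned cores 0 and 1 at offset 0 (top of the claim queue).
    cq.entry(2000).or_default().entry(0).or_default().extend([0u32, 1]);

    let assigned = cq.get(&2000).and_then(|per_depth| per_depth.get(&0));
    assert_eq!(assigned.map(|cores| cores.len()), Some(2));
}
```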

macro_rules! impl_getter {
@@ -586,55 +594,70 @@ impl<H: Copy> CommittedCandidateReceiptV2<H> {
     /// Checks if descriptor core index is equal to the committed core index.
     /// Input `cores_per_para` is a claim queue snapshot stored as a mapping
     /// between `ParaId` and the cores assigned per depth.
+    /// `core_index_enabled` optionally describes the status of the elastic scaling MVP node
+    /// feature.
     pub fn check_core_index(
         &self,
         cores_per_para: &TransposedClaimQueue,
-    ) -> Result<(), CandidateReceiptError> {
+        core_index_enabled: Option<bool>,
+    ) -> Result<(), CommittedCandidateReceiptError> {
         match self.descriptor.version() {
             // Don't check v1 descriptors.
             CandidateDescriptorVersion::V1 => return Ok(()),
             CandidateDescriptorVersion::V2 => {},
             CandidateDescriptorVersion::Unknown =>
-                return Err(CandidateReceiptError::UnknownVersion(self.descriptor.version)),
+                return Err(CommittedCandidateReceiptError::UnknownVersion(self.descriptor.version)),
         }
 
-        if cores_per_para.is_empty() {
-            return Err(CandidateReceiptError::NoAssignment)
-        }
-
-        let (offset, core_selected) =
-            if let Some((_core_selector, cq_offset)) = self.commitments.selected_core() {
-                (cq_offset.0, true)
-            } else {
-                // If no core has been selected then we use offset 0 (top of claim queue)
-                (0, false)
-            };
+        let (maybe_core_index_selector, cq_offset) = self.commitments.core_selector()?.map_or_else(
+            || (None, ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET)),
+            |(sel, off)| (Some(sel), off),
+        );
 
         // The cores assigned to the parachain at above computed offset.
         let assigned_cores = cores_per_para
             .get(&self.descriptor.para_id())
-            .ok_or(CandidateReceiptError::NoAssignment)?
-            .get(&offset)
-            .ok_or(CandidateReceiptError::NoAssignment)?
-            .into_iter()
-            .collect::<Vec<_>>();
-
-        let core_index = if core_selected {
-            self.commitments
-                .committed_core_index(assigned_cores.as_slice())
-                .ok_or(CandidateReceiptError::NoAssignment)?
-        } else {
-            // `SelectCore` commitment is mandatory for elastic scaling parachains.
-            if assigned_cores.len() > 1 {
-                return Err(CandidateReceiptError::NoCoreSelected)
-            }
-
-            **assigned_cores.get(0).ok_or(CandidateReceiptError::NoAssignment)?
-        };
+            .ok_or(CommittedCandidateReceiptError::NoAssignment)?
+            .get(&cq_offset.0)
+            .ok_or(CommittedCandidateReceiptError::NoAssignment)?;
+
+        if assigned_cores.is_empty() {
+            return Err(CommittedCandidateReceiptError::NoAssignment)
+        }
 
         let descriptor_core_index = CoreIndex(self.descriptor.core_index as u32);
 
+        let core_index_selector = if let Some(core_index_selector) = maybe_core_index_selector {
+            // We have a committed core selector, we can use it.
+            core_index_selector
+        } else if assigned_cores.len() > 1 {
+            // We got more than one assigned core and no core selector. Special care is needed.
+            match core_index_enabled {
+                // Elastic scaling MVP feature is not supplied, nothing more to check.
+                None => return Ok(()),
+                // Elastic scaling MVP feature is disabled. Error.
+                Some(false) => return Err(CommittedCandidateReceiptError::NoCoreSelected),
+                // Elastic scaling MVP feature is enabled but the core index in the descriptor is
+                // not assigned to the para. Error.
+                Some(true) if !assigned_cores.contains(&descriptor_core_index) =>
+                    return Err(CommittedCandidateReceiptError::InvalidCoreIndex),
+                // Elastic scaling MVP feature is enabled and the descriptor core index is indeed
+                // assigned to the para. This is the most we can check for now.
+                Some(true) => return Ok(()),
+            }
+        } else {
+            // No core selector but there's only one assigned core, use it.
+            CoreSelector(0)
+        };
+
+        let core_index = assigned_cores
+            .iter()
+            .nth(core_index_selector.0 as usize % assigned_cores.len())
+            .ok_or(CommittedCandidateReceiptError::InvalidSelectedCore)
+            .copied()?;
 
         if core_index != descriptor_core_index {
-            return Err(CandidateReceiptError::CoreIndexMismatch)
+            return Err(CommittedCandidateReceiptError::CoreIndexMismatch)
         }
 
         Ok(())
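In short, the committed selector indexes modulo the cores assigned at the committed offset, and `core_index_enabled` is a tri-state: `None` means the elastic scaling MVP feature state is unknown, so the strict check is skipped; `Some(false)` demands a core selector whenever more than one core is assigned; `Some(true)` accepts any descriptor core index actually assigned to the para. A minimal self-contained sketch of the indexing rule (plain integers instead of `CoreIndex`/`CoreSelector`):

```rust
// Self-contained sketch (plain types, not the real API) of the selection rule
// in `check_core_index`: the committed selector indexes modulo the number of
// cores assigned at the committed claim queue offset.
fn select_core(assigned_cores: &[u32], selector: u8) -> Option<u32> {
    if assigned_cores.is_empty() {
        return None; // mirrors CommittedCandidateReceiptError::NoAssignment
    }
    assigned_cores.get(selector as usize % assigned_cores.len()).copied()
}

fn main() {
    // A para assigned cores 1 and 3 at the committed offset:
    assert_eq!(select_core(&[1, 3], 0), Some(1));
    // The selector wraps modulo the number of assignments:
    assert_eq!(select_core(&[1, 3], 5), Some(3));
    assert_eq!(select_core(&[], 0), None);
}
```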
@@ -1031,7 +1054,7 @@ mod tests {
         assert_eq!(new_ccr.descriptor.version(), CandidateDescriptorVersion::Unknown);
         assert_eq!(
             new_ccr.check_core_index(&BTreeMap::new()),
-            Err(CandidateReceiptError::UnknownVersion(InternalVersion(100)))
+            Err(CommittedCandidateReceiptError::UnknownVersion(InternalVersion(100)))
         )
     }
@@ -1097,7 +1120,7 @@ mod tests {
         assert_eq!(
             new_ccr.check_core_index(&transpose_claim_queue(cq.clone())),
-            Err(CandidateReceiptError::NoCoreSelected)
+            Err(CommittedCandidateReceiptError::NoCoreSelected)
         );
 
         new_ccr.commitments.upward_messages.clear();
@@ -1225,7 +1248,7 @@ mod tests {
         // Should fail because 2 cores are assigned,
         assert_eq!(
             new_ccr.check_core_index(&transpose_claim_queue(cq)),
-            Err(CandidateReceiptError::NoCoreSelected)
+            Err(CommittedCandidateReceiptError::NoCoreSelected)
         );
 
         // Adding collator signature should make it decode as v1.