Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 38 additions & 16 deletions cumulus/pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub use cumulus_pallet_parachain_system_proc_macro::register_validate_block;
pub use relay_state_snapshot::{MessagingStateSnapshot, RelayChainStateProof};
pub use unincluded_segment::{Ancestor, UsedBandwidth};

use crate::parachain_inherent::AbridgedInboundMessagesSizeInfo;
pub use pallet::*;

const LOG_TARGET: &str = "parachain-system";
Expand Down Expand Up @@ -1164,7 +1165,7 @@ impl<T: Config> Pallet<T> {
expected_dmq_mqc_head: relay_chain::Hash,
downward_messages: AbridgedInboundDownwardMessages,
) -> Weight {
downward_messages.check_enough_messages_included("DMQ");
downward_messages.check_enough_messages_included_basic("DMQ");

let mut dmq_head = <LastDmqMqcHead<T>>::get();

Expand Down Expand Up @@ -1210,6 +1211,25 @@ impl<T: Config> Pallet<T> {
weight_used
}

/// Look up the ingress channel opened by `sender`, panicking if none exists.
///
/// `ingress_channels` is expected to be sorted by sender `ParaId`, which is what
/// makes the binary search valid.
fn get_ingress_channel_or_panic(
    ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
    sender: ParaId,
) -> &cumulus_primitives_core::AbridgedHrmpChannel {
    // A `binary_search_by_key` hit yields a valid index, so direct indexing is safe here.
    match ingress_channels.binary_search_by_key(&sender, |&(channel_sender, _)| channel_sender) {
        Ok(channel_idx) => &ingress_channels[channel_idx].1,
        Err(_) => panic!(
            "One of the messages submitted by the collator was sent from a sender ({}) \
            that doesn't have a channel opened to this parachain",
            <ParaId as Into<u32>>::into(sender)
        ),
    }
}

fn check_hrmp_mcq_heads(
ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
mqc_heads: &mut BTreeMap<ParaId, MessageQueueChain>,
Expand Down Expand Up @@ -1243,17 +1263,8 @@ impl<T: Config> Pallet<T> {
}
*maybe_prev_msg_metadata = Some(msg_metadata);

// Check that the message is sent from an existing channel. The channel exists
// if its MQC head is present in `vfp.hrmp_mqc_heads`.
let sender = msg_metadata.1;
let maybe_channel_idx =
ingress_channels.binary_search_by_key(&sender, |&(channel_sender, _)| channel_sender);
assert!(
maybe_channel_idx.is_ok(),
"One of the messages submitted by the collator was sent from a sender ({}) \
that doesn't have a channel opened to this parachain",
<ParaId as Into<u32>>::into(sender)
);
// Check that the message is sent from an existing channel.
Self::get_ingress_channel_or_panic(ingress_channels, msg_metadata.1);
}

/// Process all inbound horizontal messages relayed by the collator.
Expand All @@ -1271,11 +1282,22 @@ impl<T: Config> Pallet<T> {
horizontal_messages: AbridgedInboundHrmpMessages,
relay_parent_number: relay_chain::BlockNumber,
) -> Weight {
// First, check the HRMP advancement rule.
horizontal_messages.check_enough_messages_included("HRMP");

let (messages, hashed_messages) = horizontal_messages.messages();
let mut mqc_heads = <LastHrmpMqcHeads<T>>::get();
let (messages, hashed_messages) = horizontal_messages.messages();

// First, check the HRMP advancement rule.
let maybe_first_hashed_msg_sender = hashed_messages.first().map(|(sender, _msg)| *sender);
if let Some(first_hashed_msg_sender) = maybe_first_hashed_msg_sender {
let channel =
Self::get_ingress_channel_or_panic(ingress_channels, first_hashed_msg_sender);
horizontal_messages.check_enough_messages_included_advanced(
"HRMP",
AbridgedInboundMessagesSizeInfo {
max_full_messages_size: Self::messages_collection_size_limit(),
first_hashed_msg_max_size: channel.max_message_size as usize,
},
);
}

Self::prune_closed_mqc_heads(ingress_channels, &mut mqc_heads);

Expand Down
100 changes: 81 additions & 19 deletions cumulus/pallets/parachain-system/src/parachain_inherent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ impl<Message: InboundMessage> InboundMessagesCollection<Message> {
}
}

/// A struct containing some info about the expected size of the abridged inbound messages.
pub struct AbridgedInboundMessagesSizeInfo {
    /// The max size (in bytes) of the full messages collection.
    pub max_full_messages_size: usize,
    /// The max size (in bytes) of the first hashed message.
    pub first_hashed_msg_max_size: usize,
}

/// A compressed collection of inbound messages.
///
/// The first messages in the collection (up to a limit) contain the full message data.
Expand All @@ -161,29 +169,51 @@ impl<Message: InboundMessage> AbridgedInboundMessagesCollection<Message> {
(&self.full_messages, &self.hashed_messages)
}

/// Check that the current collection contains as many full messages as possible.
/// Check that the current collection contains at least 1 full message if needed.
pub fn check_enough_messages_included_basic(&self, collection_name: &str) {
if self.hashed_messages.is_empty() {
return;
}

// Here we just check that there is at least 1 full message.
assert!(
self.full_messages.len() >= 1,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the case that already the first message is too big can never happen, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point !

With the current Polkadot and Kusama configs it won't happen. But with a misconfiguration it probably could. Anyway this check is only used for DMP and the problem there is that we don't know the max message size in order to do a proper check. This will be fixed when we migrate DMP to check_enough_messages_included_advanced as well. But in order to do this we need to also forward the max dmp message size to the parachain in the metadata. It would be great if we could do that in a separate PR and merge this one as it is. WDYT ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea we can do it separate

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the case that already the first message is too big can never happen, right?

If this would happen, we could never move forward. I see this not as an issue. Yes, could be a misconfiguration, but then you can not receive messages anymore.

"[{}] Advancement rule violation: full messages missing",
collection_name,
);
}

/// Check that the current collection contains as many full messages as possible, taking into
/// consideration the collection constraints.
///
/// The `AbridgedInboundMessagesCollection` is provided to the runtime by a collator.
/// A malicious collator can provide a collection that contains no full messages or fewer
/// full messages than possible, leading to censorship.
pub fn check_enough_messages_included(&self, collection_name: &str) {
if self.hashed_messages.is_empty() {
return;
pub fn check_enough_messages_included_advanced(
&self,
collection_name: &str,
size_info: AbridgedInboundMessagesSizeInfo,
) {
// We should check that the collection contains as many full messages as possible
// without exceeding the max expected size.
let AbridgedInboundMessagesSizeInfo { max_full_messages_size, first_hashed_msg_max_size } =
size_info;

let mut full_messages_size = 0usize;
for msg in &self.full_messages {
full_messages_size = full_messages_size.saturating_add(msg.data().len());
}

// Ideally, we should check that the collection contains as many full messages as possible
// without exceeding the max expected size. The worst case scenario is that were the first
// message that had to be hashed is a max size message. So in this case, the min expected
// size would be `max_expected_size - max_msg_size`. However, there are multiple issues:
// 1. The max message size config can change while we still have to process messages with
// the old max message size.
// 2. We can't access the max downward message size from the parachain runtime.
//
// So the safest approach is to check that there is at least 1 full message.
// The worst case scenario is that were the first message that had to be hashed
// is a max size message.
assert!(
self.full_messages.len() >= 1,
"[{}] Advancement rule violation: mandatory messages missing",
full_messages_size.saturating_add(first_hashed_msg_max_size) > max_full_messages_size,
"[{}] Advancement rule violation: full messages size smaller than expected. \
full msgs size: {}, first hashed msg max size: {}, max full msgs size: {}",
collection_name,
full_messages_size,
first_hashed_msg_max_size,
max_full_messages_size
);
}
}
Expand Down Expand Up @@ -481,7 +511,7 @@ mod tests {
}

#[test]
fn check_enough_messages_included_works() {
fn check_enough_messages_included_basic_works() {
let mut messages = AbridgedInboundHrmpMessages {
full_messages: vec![(
1000.into(),
Expand All @@ -493,13 +523,45 @@ mod tests {
)],
};

messages.check_enough_messages_included("Test");
messages.check_enough_messages_included_basic("Test");

messages.full_messages = vec![];
let result = std::panic::catch_unwind(|| messages.check_enough_messages_included("Test"));
let result =
std::panic::catch_unwind(|| messages.check_enough_messages_included_basic("Test"));
assert!(result.is_err());

messages.hashed_messages = vec![];
messages.check_enough_messages_included("Test");
messages.check_enough_messages_included_basic("Test");
}

#[test]
fn check_enough_messages_included_advanced_works() {
    // One 50-byte full message plus one hashed message from another para.
    let mixed_messages = AbridgedInboundHrmpMessages {
        full_messages: vec![(
            1000.into(),
            InboundHrmpMessage { sent_at: 0, data: vec![1; 50] },
        )],
        hashed_messages: vec![(
            2000.into(),
            HashedMessage { sent_at: 1, msg_hash: Default::default() },
        )],
    };

    // 50 bytes already included + a 50-byte worst-case hashed message fit exactly within
    // the 100-byte budget, so hashing that message was avoidable -> the check must panic.
    let should_panic = std::panic::catch_unwind(|| {
        mixed_messages.check_enough_messages_included_advanced(
            "Test",
            AbridgedInboundMessagesSizeInfo {
                max_full_messages_size: 100,
                first_hashed_msg_max_size: 50,
            },
        )
    });
    assert!(should_panic.is_err());

    // With a 51-byte worst case the budget would be exceeded (50 + 51 > 100), so hashing
    // the message was legitimate and the check passes.
    mixed_messages.check_enough_messages_included_advanced(
        "Test",
        AbridgedInboundMessagesSizeInfo {
            max_full_messages_size: 100,
            first_hashed_msg_max_size: 51,
        },
    );
}
}
5 changes: 3 additions & 2 deletions cumulus/pallets/parachain-system/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,9 @@ fn inherent_messages_are_compressed() {
sproof.dmq_mqc_head = Some(dmp_mqc.head());

for (sender, msg) in &hrmp_msgs_clone {
let mqc_head =
sproof.upsert_inbound_channel(*sender).mqc_head.get_or_insert_default();
let channel = sproof.upsert_inbound_channel(*sender);
channel.max_message_size = 100 * 1024;
let mqc_head = channel.mqc_head.get_or_insert_default();
let mut mqc = MessageQueueChain::new(*mqc_head);
mqc.extend_hrmp(msg);
*mqc_head = mqc.head();
Expand Down
8 changes: 8 additions & 0 deletions prdoc/pr_9086.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
title: Make HRMP advancement rule more restrictive
doc:
- audience: Runtime Dev
description: |-
This PR improves `check_enough_messages_included()` and makes the advancement rule more restrictive for HRMP.
crates:
- name: cumulus-pallet-parachain-system
bump: major
Loading