diff --git a/cumulus/pallets/parachain-system/src/lib.rs b/cumulus/pallets/parachain-system/src/lib.rs index 12bda71dbdcca..e85b4dab61fce 100644 --- a/cumulus/pallets/parachain-system/src/lib.rs +++ b/cumulus/pallets/parachain-system/src/lib.rs @@ -109,6 +109,7 @@ pub use cumulus_pallet_parachain_system_proc_macro::register_validate_block; pub use relay_state_snapshot::{MessagingStateSnapshot, RelayChainStateProof}; pub use unincluded_segment::{Ancestor, UsedBandwidth}; +use crate::parachain_inherent::AbridgedInboundMessagesSizeInfo; pub use pallet::*; const LOG_TARGET: &str = "parachain-system"; @@ -1164,7 +1165,7 @@ impl Pallet { expected_dmq_mqc_head: relay_chain::Hash, downward_messages: AbridgedInboundDownwardMessages, ) -> Weight { - downward_messages.check_enough_messages_included("DMQ"); + downward_messages.check_enough_messages_included_basic("DMQ"); let mut dmq_head = >::get(); @@ -1210,6 +1211,25 @@ impl Pallet { weight_used } + fn get_ingress_channel_or_panic( + ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)], + sender: ParaId, + ) -> &cumulus_primitives_core::AbridgedHrmpChannel { + let maybe_channel_idx = ingress_channels + .binary_search_by_key(&sender, |&(channel_sender, _)| channel_sender) + .ok(); + let maybe_channel = maybe_channel_idx + .and_then(|channel_idx| ingress_channels.get(channel_idx)) + .map(|(_, channel)| channel); + maybe_channel.unwrap_or_else(|| { + panic!( + "One of the messages submitted by the collator was sent from a sender ({}) \ + that doesn't have a channel opened to this parachain", + >::into(sender) + ) + }) + } + fn check_hrmp_mcq_heads( ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)], mqc_heads: &mut BTreeMap, @@ -1243,17 +1263,8 @@ impl Pallet { } *maybe_prev_msg_metadata = Some(msg_metadata); - // Check that the message is sent from an existing channel. The channel exists - // if its MQC head is present in `vfp.hrmp_mqc_heads`. - let sender = msg_metadata.1; - let maybe_channel_idx = - ingress_channels.binary_search_by_key(&sender, |&(channel_sender, _)| channel_sender); - assert!( - maybe_channel_idx.is_ok(), - "One of the messages submitted by the collator was sent from a sender ({}) \ - that doesn't have a channel opened to this parachain", - >::into(sender) - ); + // Check that the message is sent from an existing channel. + Self::get_ingress_channel_or_panic(ingress_channels, msg_metadata.1); } /// Process all inbound horizontal messages relayed by the collator. @@ -1271,11 +1282,22 @@ impl Pallet { horizontal_messages: AbridgedInboundHrmpMessages, relay_parent_number: relay_chain::BlockNumber, ) -> Weight { - // First, check the HRMP advancement rule. - horizontal_messages.check_enough_messages_included("HRMP"); - - let (messages, hashed_messages) = horizontal_messages.messages(); let mut mqc_heads = >::get(); + let (messages, hashed_messages) = horizontal_messages.messages(); + + // First, check the HRMP advancement rule. + let maybe_first_hashed_msg_sender = hashed_messages.first().map(|(sender, _msg)| *sender); + if let Some(first_hashed_msg_sender) = maybe_first_hashed_msg_sender { + let channel = + Self::get_ingress_channel_or_panic(ingress_channels, first_hashed_msg_sender); + horizontal_messages.check_enough_messages_included_advanced( + "HRMP", + AbridgedInboundMessagesSizeInfo { + max_full_messages_size: Self::messages_collection_size_limit(), + first_hashed_msg_max_size: channel.max_message_size as usize, + }, + ); + } Self::prune_closed_mqc_heads(ingress_channels, &mut mqc_heads); diff --git a/cumulus/pallets/parachain-system/src/parachain_inherent.rs b/cumulus/pallets/parachain-system/src/parachain_inherent.rs index 3dd089a47fe5c..348f1e3a1ff5e 100644 --- a/cumulus/pallets/parachain-system/src/parachain_inherent.rs +++ b/cumulus/pallets/parachain-system/src/parachain_inherent.rs @@ -142,6 +142,14 @@ impl InboundMessagesCollection { } } +/// A struct containing some info about the expected size of the abridged inbound messages. +pub struct AbridgedInboundMessagesSizeInfo { + /// The max size of the full messages collection + pub max_full_messages_size: usize, + /// The max size of the first hashed message + pub first_hashed_msg_max_size: usize, +} + /// A compressed collection of inbound messages. /// /// The first messages in the collection (up to a limit) contain the full message data. @@ -161,29 +169,51 @@ impl AbridgedInboundMessagesCollection { (&self.full_messages, &self.hashed_messages) } - /// Check that the current collection contains as many full messages as possible. + /// Check that the current collection contains at least 1 full message if needed. + pub fn check_enough_messages_included_basic(&self, collection_name: &str) { + if self.hashed_messages.is_empty() { + return; + } + + // Here we just check that there is at least 1 full message. + assert!( + self.full_messages.len() >= 1, + "[{}] Advancement rule violation: full messages missing", + collection_name, + ); + } + + /// Check that the current collection contains as many full messages as possible, taking into + /// consideration the collection constraints. /// /// The `AbridgedInboundMessagesCollection` is provided to the runtime by a collator. /// A malicious collator can provide a collection that contains no full messages or fewer /// full messages than possible, leading to censorship. - pub fn check_enough_messages_included(&self, collection_name: &str) { - if self.hashed_messages.is_empty() { - return; + pub fn check_enough_messages_included_advanced( + &self, + collection_name: &str, + size_info: AbridgedInboundMessagesSizeInfo, + ) { + // We should check that the collection contains as many full messages as possible + // without exceeding the max expected size. + let AbridgedInboundMessagesSizeInfo { max_full_messages_size, first_hashed_msg_max_size } = + size_info; + + let mut full_messages_size = 0usize; + for msg in &self.full_messages { + full_messages_size = full_messages_size.saturating_add(msg.data().len()); } - // Ideally, we should check that the collection contains as many full messages as possible - // without exceeding the max expected size. The worst case scenario is that were the first - // message that had to be hashed is a max size message. So in this case, the min expected - // size would be `max_expected_size - max_msg_size`. However, there are multiple issues: - // 1. The max message size config can change while we still have to process messages with - // the old max message size. - // 2. We can't access the max downward message size from the parachain runtime. - // - // So the safest approach is to check that there is at least 1 full message. + // The worst case scenario is that were the first message that had to be hashed + // is a max size message. assert!( - self.full_messages.len() >= 1, - "[{}] Advancement rule violation: mandatory messages missing", + full_messages_size.saturating_add(first_hashed_msg_max_size) > max_full_messages_size, + "[{}] Advancement rule violation: full messages size smaller than expected. \ + full msgs size: {}, first hashed msg max size: {}, max full msgs size: {}", collection_name, + full_messages_size, + first_hashed_msg_max_size, + max_full_messages_size ); } } @@ -481,7 +511,7 @@ mod tests { } #[test] - fn check_enough_messages_included_works() { + fn check_enough_messages_included_basic_works() { let mut messages = AbridgedInboundHrmpMessages { full_messages: vec![( 1000.into(), @@ -493,13 +523,45 @@ mod tests { )], }; - messages.check_enough_messages_included("Test"); + messages.check_enough_messages_included_basic("Test"); messages.full_messages = vec![]; - let result = std::panic::catch_unwind(|| messages.check_enough_messages_included("Test")); + let result = + std::panic::catch_unwind(|| messages.check_enough_messages_included_basic("Test")); assert!(result.is_err()); messages.hashed_messages = vec![]; - messages.check_enough_messages_included("Test"); + messages.check_enough_messages_included_basic("Test"); + } + + #[test] + fn check_enough_messages_included_advanced_works() { + let mixed_messages = AbridgedInboundHrmpMessages { + full_messages: vec![( + 1000.into(), + InboundHrmpMessage { sent_at: 0, data: vec![1; 50] }, + )], + hashed_messages: vec![( + 2000.into(), + HashedMessage { sent_at: 1, msg_hash: Default::default() }, + )], + }; + let result = std::panic::catch_unwind(|| { + mixed_messages.check_enough_messages_included_advanced( + "Test", + AbridgedInboundMessagesSizeInfo { + max_full_messages_size: 100, + first_hashed_msg_max_size: 50, + }, + ) + }); + assert!(result.is_err()); + mixed_messages.check_enough_messages_included_advanced( + "Test", + AbridgedInboundMessagesSizeInfo { + max_full_messages_size: 100, + first_hashed_msg_max_size: 51, + }, + ); } } diff --git a/cumulus/pallets/parachain-system/src/tests.rs b/cumulus/pallets/parachain-system/src/tests.rs index 2a52c99f5eef4..7b82b6ed616a7 100755 --- a/cumulus/pallets/parachain-system/src/tests.rs +++ b/cumulus/pallets/parachain-system/src/tests.rs @@ -415,8 +415,9 @@ fn inherent_messages_are_compressed() { sproof.dmq_mqc_head = Some(dmp_mqc.head()); for (sender, msg) in &hrmp_msgs_clone { - let mqc_head = - sproof.upsert_inbound_channel(*sender).mqc_head.get_or_insert_default(); + let channel = sproof.upsert_inbound_channel(*sender); + channel.max_message_size = 100 * 1024; + let mqc_head = channel.mqc_head.get_or_insert_default(); let mut mqc = MessageQueueChain::new(*mqc_head); mqc.extend_hrmp(msg); *mqc_head = mqc.head(); diff --git a/prdoc/pr_9086.prdoc b/prdoc/pr_9086.prdoc new file mode 100644 index 0000000000000..b06f41efa56b0 --- /dev/null +++ b/prdoc/pr_9086.prdoc @@ -0,0 +1,8 @@ +title: Make HRMP advancement rule more restrictive +doc: +- audience: Runtime Dev + description: |- + This PR improves `check_enough_messages_included()` and makes the advancement rule more restrictive for HRMP. +crates: +- name: cumulus-pallet-parachain-system + bump: major