diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/tests.rs b/cumulus/client/consensus/aura/src/collators/slot_based/tests.rs index e0ba35e558afe..0fc6bc5876962 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/tests.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/tests.rs @@ -486,6 +486,14 @@ impl RelayChainInterface for TestRelayClient { unimplemented!("Not needed for test") } + async fn retrieve_subscribed_published_data( + &self, + _: ParaId, + _: RelayHash, + ) -> RelayChainResult, Vec)>>> { + unimplemented!("Not needed for test") + } + async fn persisted_validation_data( &self, _: RelayHash, diff --git a/cumulus/client/consensus/common/src/tests.rs b/cumulus/client/consensus/common/src/tests.rs index ff1c8ec56508b..c776c43c947e0 100644 --- a/cumulus/client/consensus/common/src/tests.rs +++ b/cumulus/client/consensus/common/src/tests.rs @@ -123,6 +123,14 @@ impl RelayChainInterface for Relaychain { unimplemented!("Not needed for test") } + async fn retrieve_subscribed_published_data( + &self, + _: ParaId, + _: PHash, + ) -> RelayChainResult, Vec)>>> { + unimplemented!("Not needed for test") + } + async fn persisted_validation_data( &self, hash: PHash, diff --git a/cumulus/client/network/src/tests.rs b/cumulus/client/network/src/tests.rs index cfbaba72588e2..c54ebdeddf6ef 100644 --- a/cumulus/client/network/src/tests.rs +++ b/cumulus/client/network/src/tests.rs @@ -140,6 +140,14 @@ impl RelayChainInterface for DummyRelayChainInterface { Ok(BTreeMap::new()) } + async fn retrieve_subscribed_published_data( + &self, + _: ParaId, + _: PHash, + ) -> RelayChainResult, Vec)>>> { + Ok(BTreeMap::new()) + } + async fn persisted_validation_data( &self, _: PHash, diff --git a/cumulus/client/parachain-inherent/src/lib.rs b/cumulus/client/parachain-inherent/src/lib.rs index 5e994cd472f70..8ad1988067050 100644 --- a/cumulus/client/parachain-inherent/src/lib.rs +++ b/cumulus/client/parachain-inherent/src/lib.rs @@ -136,6 +136,9 @@ async fn collect_relay_storage_proof( relevant_keys.push(relay_well_known_keys::NEXT_AUTHORITIES.to_vec()); } + // Include broadcaster published data roots + relevant_keys.push(relay_well_known_keys::BROADCASTER_PUBLISHED_DATA_ROOTS.to_vec()); + // Add additional relay state keys let unique_keys: Vec> = additional_relay_state_keys .into_iter() @@ -223,6 +226,19 @@ impl ParachainInherentDataProvider { }) .ok()?; + let published_data = relay_chain_interface + .retrieve_subscribed_published_data(para_id, relay_parent) + .await + .map_err(|e| { + tracing::error!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + error = ?e, + "An error occurred during requesting subscribed published data.", + ); + }) + .unwrap_or_default(); + Some(ParachainInherentData { downward_messages, horizontal_messages, @@ -230,6 +246,7 @@ impl ParachainInherentDataProvider { relay_chain_state, relay_parent_descendants, collator_peer_id, + published_data, }) } } diff --git a/cumulus/client/parachain-inherent/src/mock.rs b/cumulus/client/parachain-inherent/src/mock.rs index 5f4517f884245..75977a3a5920a 100644 --- a/cumulus/client/parachain-inherent/src/mock.rs +++ b/cumulus/client/parachain-inherent/src/mock.rs @@ -242,6 +242,7 @@ impl> InherentDataProvider relay_chain_state: proof, relay_parent_descendants: Default::default(), collator_peer_id: None, + published_data: Default::default(), }; parachain_inherent_data.provide_inherent_data(inherent_data).await diff --git a/cumulus/client/pov-recovery/src/tests.rs b/cumulus/client/pov-recovery/src/tests.rs index 574632c96454d..d0348a11c4d8f 100644 --- a/cumulus/client/pov-recovery/src/tests.rs +++ b/cumulus/client/pov-recovery/src/tests.rs @@ -364,6 +364,14 @@ impl RelayChainInterface for Relaychain { unimplemented!("Not needed for test") } + async fn retrieve_subscribed_published_data( + &self, + _: ParaId, + _: PHash, + ) -> RelayChainResult, Vec)>>> { + unimplemented!("Not needed for test") + } + async fn persisted_validation_data( &self, _: PHash, diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index b989f81efd5dc..8f129a078e76d 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -106,6 +106,14 @@ impl RelayChainInterface for RelayChainInProcessInterface { .inbound_hrmp_channels_contents(relay_parent, para_id)?) } + async fn retrieve_subscribed_published_data( + &self, + para_id: ParaId, + relay_parent: PHash, + ) -> RelayChainResult, Vec)>>> { + Ok(self.full_client.runtime_api().get_subscribed_data(relay_parent, para_id)?) + } + async fn header(&self, block_id: BlockId) -> RelayChainResult> { let hash = match block_id { BlockId::Hash(hash) => hash, diff --git a/cumulus/client/relay-chain-interface/src/lib.rs b/cumulus/client/relay-chain-interface/src/lib.rs index dd03738ed0029..5ae197d0bb43c 100644 --- a/cumulus/client/relay-chain-interface/src/lib.rs +++ b/cumulus/client/relay-chain-interface/src/lib.rs @@ -151,6 +151,14 @@ pub trait RelayChainInterface: Send + Sync { relay_parent: PHash, ) -> RelayChainResult>>; + /// Returns published data from all subscribed publishers for the parachain we are collating + /// for. + async fn retrieve_subscribed_published_data( + &self, + para_id: ParaId, + relay_parent: PHash, + ) -> RelayChainResult, Vec)>>>; + /// Yields the persisted validation data for the given `ParaId` along with an assumption that /// should be used if the para currently occupies a core. /// @@ -273,6 +281,14 @@ where (**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent).await } + async fn retrieve_subscribed_published_data( + &self, + para_id: ParaId, + relay_parent: PHash, + ) -> RelayChainResult, Vec)>>> { + (**self).retrieve_subscribed_published_data(para_id, relay_parent).await + } + async fn persisted_validation_data( &self, block_id: PHash, diff --git a/cumulus/client/relay-chain-rpc-interface/src/lib.rs b/cumulus/client/relay-chain-rpc-interface/src/lib.rs index 84d22676789cf..f9beac0b14148 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/lib.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/lib.rs @@ -82,6 +82,14 @@ impl RelayChainInterface for RelayChainRpcInterface { .await } + async fn retrieve_subscribed_published_data( + &self, + para_id: ParaId, + relay_parent: RelayHash, + ) -> RelayChainResult, Vec)>>> { + self.rpc_client.parachain_host_get_subscribed_data(para_id, relay_parent).await + } + async fn header(&self, block_id: BlockId) -> RelayChainResult> { let hash = match block_id { BlockId::Hash(hash) => hash, diff --git a/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs b/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs index 80858a665cfaf..31fad324b4a60 100644 --- a/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs +++ b/cumulus/client/relay-chain-rpc-interface/src/rpc_client.rs @@ -598,6 +598,20 @@ impl RelayChainRpcClient { .await } + /// Get published data from all subscribed publishers for a parachain. + pub async fn parachain_host_get_subscribed_data( + &self, + para_id: ParaId, + at: RelayHash, + ) -> Result, Vec)>>, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_get_subscribed_data", + at, + Some(para_id), + ) + .await + } + /// Get all the pending inbound messages in the downward message queue for a para. pub async fn parachain_host_dmq_contents( &self, diff --git a/cumulus/pallets/parachain-system/src/benchmarking.rs b/cumulus/pallets/parachain-system/src/benchmarking.rs index c3d59e82255a3..28528d537031e 100644 --- a/cumulus/pallets/parachain-system/src/benchmarking.rs +++ b/cumulus/pallets/parachain-system/src/benchmarking.rs @@ -54,6 +54,59 @@ mod benchmarks { assert_eq!(LastDmqMqcHead::::get().head(), head); } + /// Benchmark processing published data from the broadcaster pallet. + /// + /// - `p`: Number of publishers with changed data + /// - `k`: Number of key-value pairs per publisher + /// - `v`: Size of each value in bytes + #[benchmark] + fn process_published_data( + p: Linear<1, 100>, + k: Linear<1, 16>, + v: Linear<1, 1024>, + ) { + use alloc::collections::BTreeMap; + + // Populate storage with existing data to maximize clear_prefix cost + for i in 0..p { + let para_id = ParaId::from(1000 + i); + for j in 0..k { + PublishedData::::insert( + para_id, + vec![j as u8; 32], + vec![0u8; v as usize], + ); + } + } + + // Store initial roots + let initial_roots: BTreeMap> = (0..p) + .map(|i| (ParaId::from(1000 + i), vec![0xBB; 32])) + .collect(); + PreviousPublishedDataRoots::::put(initial_roots); + + // Prepare new data with changed roots + let mut published_data = BTreeMap::new(); + let mut current_roots = Vec::new(); + + for i in 0..p { + let para_id = ParaId::from(1000 + i); + let entries: Vec<(Vec, Vec)> = (0..k) + .map(|j| (vec![j as u8; 32], vec![1u8; v as usize])) + .collect(); + published_data.insert(para_id, entries); + current_roots.push((para_id, vec![0xAA; 32])); + } + + #[block] + { + Pallet::::process_published_data(&published_data, ¤t_roots); + } + + // Verify storage updated + assert_eq!(PreviousPublishedDataRoots::::get().len(), p as usize); + } + /// Re-implements an easy version of the `MessageQueueChain` for testing purposes. fn mqp_head(msgs: &Vec) -> RelayHash { let mut head = Default::default(); diff --git a/cumulus/pallets/parachain-system/src/lib.rs b/cumulus/pallets/parachain-system/src/lib.rs index 85d12ed473467..4035d3a626941 100644 --- a/cumulus/pallets/parachain-system/src/lib.rs +++ b/cumulus/pallets/parachain-system/src/lib.rs @@ -51,6 +51,7 @@ use frame_system::{ensure_none, ensure_root, pallet_prelude::HeaderFor}; use parachain_inherent::{ deconstruct_parachain_inherent_data, AbridgedInboundDownwardMessages, AbridgedInboundHrmpMessages, BasicParachainInherentData, InboundMessageId, InboundMessagesData, + InboundPublishedData, }; use polkadot_parachain_primitives::primitives::RelayChainBlockNumber; use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor}; @@ -575,6 +576,7 @@ pub mod pallet { origin: OriginFor, data: BasicParachainInherentData, inbound_messages_data: InboundMessagesData, + published_data: InboundPublishedData, ) -> DispatchResult { ensure_none(origin)?; assert!( @@ -701,6 +703,16 @@ pub mod pallet { >::put(relevant_messaging_state.clone()); >::put(host_config); + // Extract published data roots from relay chain state + let current_roots = relay_state_proof + .read_published_data_roots() + .ok() + .flatten() + .unwrap_or_default(); + + // Process published data from the broadcaster pallet + total_weight.saturating_accrue(Self::process_published_data(&published_data.data, ¤t_roots)); + ::on_validation_data(&vfp); if let Some(collator_peer_id) = collator_peer_id { @@ -855,6 +867,30 @@ pub mod pallet { #[pallet::storage] pub type RelayStateProof = StorageValue<_, sp_trie::StorageProof>; + /// Published data from subscribed parachains. + /// Maps (Publisher ParaId, Key) -> Value. + /// This is populated from the parachain inherent data. + #[pallet::storage] + pub type PublishedData = StorageDoubleMap< + _, + Blake2_128Concat, + ParaId, // Publisher + Blake2_128Concat, + Vec, // Key + Vec, // Value + OptionQuery, + >; + + /// Previous data roots of published data, used to detect changes. + /// Contains (ParaId, root_hash) pairs from the previous block for comparison. + /// Stored as BTreeMap for efficient lookups without conversion overhead. + #[pallet::storage] + pub type PreviousPublishedDataRoots = StorageValue< + _, + BTreeMap>, + ValueQuery, + >; + /// The snapshot of some state related to messaging relevant to the current parachain as per /// the relay parent. /// @@ -1126,7 +1162,7 @@ impl Pallet { /// This method doesn't check for mqc heads mismatch. If the MQC doesn't match after /// dropping messages, the runtime will panic when executing the inherent. fn do_create_inherent(data: ParachainInherentData) -> Call { - let (data, mut downward_messages, mut horizontal_messages) = + let (data, mut downward_messages, mut horizontal_messages, published_data) = deconstruct_parachain_inherent_data(data); let last_relay_block_number = LastRelayChainBlockNumber::::get(); @@ -1148,7 +1184,7 @@ impl Pallet { let inbound_messages_data = InboundMessagesData::new(downward_messages, horizontal_messages); - Call::set_validation_data { data, inbound_messages_data } + Call::set_validation_data { data, inbound_messages_data, published_data } } /// Enqueue all inbound downward messages relayed by the collator into the MQ pallet. @@ -1679,6 +1715,73 @@ impl Pallet { pub fn last_relay_block_number() -> RelayChainBlockNumber { LastRelayChainBlockNumber::::get() } + + /// Process published data from the broadcaster pallet and store it in parachain storage. + /// + /// Uses child trie roots to detect changes between blocks, only updating storage for + /// publishers whose data has changed. Clears data for publishers that have been removed. + fn process_published_data( + published_data: &BTreeMap, Vec)>>, + current_roots: &Vec<(ParaId, Vec)>, + ) -> Weight { + let previous_roots = >::get(); + + if current_roots.is_empty() && published_data.is_empty() && previous_roots.is_empty() { + return T::DbWeight::get().reads(1); + } + + // Calculate weight parameters for benchmarking + let mut p = 0u32; + let mut k = 0u32; + let mut v = 0u32; + + for entries in published_data.values() { + p += 1; + let entry_count = entries.len() as u32; + k = k.max(entry_count); + + for (_, value) in entries { + v = v.max(value.len() as u32); + } + } + + // Convert current roots to map for efficient lookups. + let current_roots_map: BTreeMap> = current_roots.iter() + .map(|(para_id, root)| (*para_id, root.clone())) + .collect(); + + // Update storage for publishers with changed roots. + for (publisher, data_entries) in published_data { + let should_update = match previous_roots.get(publisher) { + Some(prev_root) => match current_roots_map.get(publisher) { + Some(curr_root) if prev_root == curr_root => false, + _ => true, + }, + None => true, + }; + + if should_update { + let result = PublishedData::::clear_prefix(publisher, u32::MAX, None); + debug_assert!(result.maybe_cursor.is_none()); + + for (key, value) in data_entries { + PublishedData::::insert(publisher, key, value); + } + } + } + + // Clear storage for removed publishers. + for (para_id, _) in previous_roots.iter() { + if !current_roots_map.contains_key(para_id) { + let result = PublishedData::::clear_prefix(para_id, u32::MAX, None); + debug_assert!(result.maybe_cursor.is_none()); + } + } + + >::put(current_roots_map); + + T::WeightInfo::process_published_data(p, k, v) + } } impl UpwardMessageSender for Pallet { diff --git a/cumulus/pallets/parachain-system/src/mock.rs b/cumulus/pallets/parachain-system/src/mock.rs index d3c7cef52b637..5a272247b41ae 100644 --- a/cumulus/pallets/parachain-system/src/mock.rs +++ b/cumulus/pallets/parachain-system/src/mock.rs @@ -425,6 +425,7 @@ impl BlockTests { horizontal_messages: Default::default(), relay_parent_descendants: Default::default(), collator_peer_id: None, + published_data: Default::default(), }; if let Some(ref hook) = self.inherent_data_hook { hook(self, relay_parent_number, &mut system_inherent_data); diff --git a/cumulus/pallets/parachain-system/src/parachain_inherent.rs b/cumulus/pallets/parachain-system/src/parachain_inherent.rs index 04ec18004961d..8552747c79cef 100644 --- a/cumulus/pallets/parachain-system/src/parachain_inherent.rs +++ b/cumulus/pallets/parachain-system/src/parachain_inherent.rs @@ -328,6 +328,30 @@ pub struct BasicParachainInherentData { pub collator_peer_id: Option, } +/// Published data from the broadcaster pallet that is passed by the collator to the parachain +/// runtime as part of the inherent data. +#[derive( + codec::Encode, + codec::Decode, + codec::DecodeWithMemTracking, + sp_core::RuntimeDebug, + Clone, + PartialEq, + TypeInfo, +)] +pub struct InboundPublishedData { + /// Published data grouped by publisher ParaId. + /// Key: Publisher ParaId, Value: Vector of (key, value) pairs published by that parachain. + pub data: BTreeMap, Vec)>>, +} + +impl InboundPublishedData { + /// Creates a new instance of `InboundPublishedData` with the provided data. + pub fn new(data: BTreeMap, Vec)>>) -> Self { + Self { data } + } +} + /// The messages that are passed by the collator to the parachain runtime as part of the /// inherent data. #[derive( @@ -357,7 +381,7 @@ impl InboundMessagesData { /// Deconstructs a `ParachainInherentData` instance. pub fn deconstruct_parachain_inherent_data( data: ParachainInherentData, -) -> (BasicParachainInherentData, InboundDownwardMessages, InboundHrmpMessages) { +) -> (BasicParachainInherentData, InboundDownwardMessages, InboundHrmpMessages, InboundPublishedData) { ( BasicParachainInherentData { validation_data: data.validation_data, @@ -367,6 +391,7 @@ pub fn deconstruct_parachain_inherent_data( }, InboundDownwardMessages::new(data.downward_messages), InboundHrmpMessages::from_map(data.horizontal_messages), + InboundPublishedData::new(data.published_data), ) } diff --git a/cumulus/pallets/parachain-system/src/relay_state_snapshot.rs b/cumulus/pallets/parachain-system/src/relay_state_snapshot.rs index 7138d61edd277..7d7bd761d79f6 100644 --- a/cumulus/pallets/parachain-system/src/relay_state_snapshot.rs +++ b/cumulus/pallets/parachain-system/src/relay_state_snapshot.rs @@ -383,4 +383,18 @@ impl RelayChainStateProof { { read_optional_entry(&self.trie_backend, key).map_err(Error::ReadOptionalEntry) } + + /// Read the published data roots from the broadcaster pallet on the relay chain. + /// + /// Returns `Ok(Some(data))` if the data exists and can be decoded, + /// `Ok(None)` if the data doesn't exist (broadcaster pallet not present), + /// or `Err` if there was a proof/decode error. + pub fn read_published_data_roots(&self) -> Result)>>, Error> { + // Use the well-known key for BROADCASTER_PUBLISHED_DATA_ROOTS + read_optional_entry::)>, _>( + &self.trie_backend, + relay_chain::well_known_keys::BROADCASTER_PUBLISHED_DATA_ROOTS, + ) + .map_err(Error::ReadOptionalEntry) + } } diff --git a/cumulus/pallets/parachain-system/src/tests.rs b/cumulus/pallets/parachain-system/src/tests.rs index 4b69d674e9eca..2a3493cc550d9 100755 --- a/cumulus/pallets/parachain-system/src/tests.rs +++ b/cumulus/pallets/parachain-system/src/tests.rs @@ -60,6 +60,7 @@ fn test_inherent_compatibility() { horizontal_messages: Default::default(), relay_parent_descendants: Default::default(), collator_peer_id: None, + published_data: Default::default(), }, ) .expect("Put validation function params failed"); @@ -85,6 +86,7 @@ fn test_inherent_compatibility() { horizontal_messages: Default::default(), relay_parent_descendants: Default::default(), collator_peer_id: None, + published_data: Default::default(), }; let _ = futures::executor::block_on( data.provide_inherent_data(&mut valid_inherent_data_full_compatibility), @@ -1612,6 +1614,243 @@ fn deposits_relay_parent_storage_root() { ); } +#[test] +fn published_data_root_changes() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + sproof.published_data_roots = Some(vec![ + (ParaId::from(1000), vec![0xAA; 32]), + ]); + }, + 2 => { + // 1000 - root unchanged + sproof.published_data_roots = Some(vec![ + (ParaId::from(1000), vec![0xAA; 32]), + ]); + }, + 3 => { + // 1000 - root changed + sproof.published_data_roots = Some(vec![ + (ParaId::from(1000), vec![0xBB; 32]), + ]); + }, + _ => unreachable!(), + }) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.published_data.insert( + ParaId::from(1000), + vec![(b"key1".to_vec(), b"value1".to_vec())], + ); + }, + 2 => { + // same root, data ignored + data.published_data.insert( + ParaId::from(1000), + vec![(b"key1".to_vec(), b"ignored".to_vec())], + ); + }, + 3 => { + data.published_data.insert( + ParaId::from(1000), + vec![(b"key1".to_vec(), b"value2".to_vec())], + ); + }, + _ => unreachable!(), + }) + .add_with_post_test( + 1, + || {}, + || { + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key1".to_vec()), + Some(b"value1".to_vec()) + ); + }, + ) + .add_with_post_test( + 2, + || {}, + || { + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key1".to_vec()), + Some(b"value1".to_vec()) + ); + }, + ) + .add_with_post_test( + 3, + || {}, + || { + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key1".to_vec()), + Some(b"value2".to_vec()) + ); + let roots = PreviousPublishedDataRoots::::get(); + assert_eq!(roots.get(&ParaId::from(1000)), Some(&vec![0xBB; 32])); + }, + ); +} + +#[test] +fn published_data_removed_clears_storage() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| { + if relay_block_num == 1 { + sproof.published_data_roots = Some(vec![ + (ParaId::from(1000), vec![0xAA; 32]), + ]); + } else { + // 1000 - removed + sproof.published_data_roots = Some(vec![]); + } + }) + .with_inherent_data(|_, relay_block_num, data| { + if relay_block_num == 1 { + data.published_data.insert( + ParaId::from(1000), + vec![ + (b"key1".to_vec(), b"value1".to_vec()), + (b"key2".to_vec(), b"value2".to_vec()), + (b"key3".to_vec(), b"value3".to_vec()), + ], + ); + } + }) + .add_with_post_test( + 1, + || {}, + || { + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key1".to_vec()), + Some(b"value1".to_vec()) + ); + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key2".to_vec()), + Some(b"value2".to_vec()) + ); + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key3".to_vec()), + Some(b"value3".to_vec()) + ); + }, + ) + .add_with_post_test( + 2, + || {}, + || { + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key1".to_vec()), + None + ); + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key2".to_vec()), + None + ); + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key3".to_vec()), + None + ); + let roots = PreviousPublishedDataRoots::::get(); + assert!(!roots.contains_key(&ParaId::from(1000))); + }, + ); +} + +#[test] +fn published_data_multiple_publishers() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + sproof.published_data_roots = Some(vec![]); + }, + 2 => { + // 1000, 2000 - appear + sproof.published_data_roots = Some(vec![ + (ParaId::from(1000), vec![0xAA; 32]), + (ParaId::from(2000), vec![0xBB; 32]), + ]); + }, + 3 => { + // 1000 - removed, 2000 - remains + sproof.published_data_roots = Some(vec![ + (ParaId::from(2000), vec![0xBB; 32]), + ]); + }, + _ => unreachable!(), + }) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => {}, + 2 => { + data.published_data.insert( + ParaId::from(1000), + vec![(b"key1".to_vec(), b"value1".to_vec())], + ); + data.published_data.insert( + ParaId::from(2000), + vec![(b"key2".to_vec(), b"value2".to_vec())], + ); + }, + 3 => { + data.published_data.insert( + ParaId::from(2000), + vec![(b"key2".to_vec(), b"value2".to_vec())], + ); + }, + _ => unreachable!(), + }) + .add_with_post_test( + 1, + || {}, + || { + let roots = PreviousPublishedDataRoots::::get(); + assert!(roots.is_empty()); + }, + ) + .add_with_post_test( + 2, + || {}, + || { + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key1".to_vec()), + Some(b"value1".to_vec()) + ); + assert_eq!( + PublishedData::::get(ParaId::from(2000), b"key2".to_vec()), + Some(b"value2".to_vec()) + ); + + let roots = PreviousPublishedDataRoots::::get(); + assert_eq!(roots.len(), 2); + assert_eq!(roots.get(&ParaId::from(1000)), Some(&vec![0xAA; 32])); + assert_eq!(roots.get(&ParaId::from(2000)), Some(&vec![0xBB; 32])); + }, + ) + .add_with_post_test( + 3, + || {}, + || { + // 1000 - cleared + assert_eq!( + PublishedData::::get(ParaId::from(1000), b"key1".to_vec()), + None + ); + // 2000 - intact + assert_eq!( + PublishedData::::get(ParaId::from(2000), b"key2".to_vec()), + Some(b"value2".to_vec()) + ); + + let roots = PreviousPublishedDataRoots::::get(); + assert_eq!(roots.len(), 1); + assert!(!roots.contains_key(&ParaId::from(1000))); + assert_eq!(roots.get(&ParaId::from(2000)), Some(&vec![0xBB; 32])); + }, + ); +} + + #[test] fn ump_fee_factor_increases_and_decreases() { BlockTests::new() diff --git a/cumulus/pallets/parachain-system/src/weights.rs b/cumulus/pallets/parachain-system/src/weights.rs index ba7d8b1e87f6b..e28e8b8988923 100644 --- a/cumulus/pallets/parachain-system/src/weights.rs +++ b/cumulus/pallets/parachain-system/src/weights.rs @@ -55,6 +55,7 @@ use core::marker::PhantomData; /// Weight functions needed for cumulus_pallet_parachain_system. pub trait WeightInfo { fn enqueue_inbound_downward_messages(n: u32, ) -> Weight; + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight; } /// Weights for cumulus_pallet_parachain_system using the Substrate node and recommended hardware. @@ -84,6 +85,33 @@ impl WeightInfo for SubstrateWeight { .saturating_add(T::DbWeight::get().reads(4_u64)) .saturating_add(T::DbWeight::get().writes(4_u64)) } + + /// Storage: `ParachainSystem::PreviousPublishedDataRoots` (r:1 w:1) + /// Proof: `ParachainSystem::PreviousPublishedDataRoots` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) + /// Storage: `ParachainSystem::PublishedData` (r:1600 w:1600) + /// Proof: `ParachainSystem::PublishedData` (`max_values`: None, `max_size`: None, mode: `Measured`) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + fn process_published_data(p: u32, k: u32, _v: u32, ) -> Weight { + // Proof Size summary in bytes: + // Measured: `0 + k * (8700 ±0) + p * (1453 ±0)` + // Estimated: `42129 + k * (152196 ±69) + p * (24762 ±11)` + // Minimum execution time: 44_000_000 picoseconds. + Weight::from_parts(47_000_000, 42129) + // Standard Error: 358_553 + .saturating_add(Weight::from_parts(30_340_884, 0).saturating_mul(p.into())) + // Standard Error: 2_254_225 + .saturating_add(Weight::from_parts(160_483_325, 0).saturating_mul(k.into())) + .saturating_add(T::DbWeight::get().reads(17_u64)) + .saturating_add(T::DbWeight::get().reads((10_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().reads((59_u64).saturating_mul(k.into()))) + .saturating_add(T::DbWeight::get().writes(17_u64)) + .saturating_add(T::DbWeight::get().writes((10_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((59_u64).saturating_mul(k.into()))) + .saturating_add(Weight::from_parts(0, 152196).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(0, 24762).saturating_mul(p.into())) + } } // For backwards compatibility and tests @@ -112,4 +140,31 @@ impl WeightInfo for () { .saturating_add(RocksDbWeight::get().reads(4_u64)) .saturating_add(RocksDbWeight::get().writes(4_u64)) } + + /// Storage: `ParachainSystem::PreviousPublishedDataRoots` (r:1 w:1) + /// Proof: `ParachainSystem::PreviousPublishedDataRoots` (`max_values`: Some(1), `max_size`: None, mode: `Measured`) + /// Storage: `ParachainSystem::PublishedData` (r:1600 w:1600) + /// Proof: `ParachainSystem::PublishedData` (`max_values`: None, `max_size`: None, mode: `Measured`) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + fn process_published_data(p: u32, k: u32, _v: u32, ) -> Weight { + // Proof Size summary in bytes: + // Measured: `0 + k * (8700 ±0) + p * (1453 ±0)` + // Estimated: `42129 + k * (152196 ±69) + p * (24762 ±11)` + // Minimum execution time: 44_000_000 picoseconds. + Weight::from_parts(47_000_000, 42129) + // Standard Error: 358_553 + .saturating_add(Weight::from_parts(30_340_884, 0).saturating_mul(p.into())) + // Standard Error: 2_254_225 + .saturating_add(Weight::from_parts(160_483_325, 0).saturating_mul(k.into())) + .saturating_add(RocksDbWeight::get().reads(17_u64)) + .saturating_add(RocksDbWeight::get().reads((10_u64).saturating_mul(p.into()))) + .saturating_add(RocksDbWeight::get().reads((59_u64).saturating_mul(k.into()))) + .saturating_add(RocksDbWeight::get().writes(17_u64)) + .saturating_add(RocksDbWeight::get().writes((10_u64).saturating_mul(p.into()))) + .saturating_add(RocksDbWeight::get().writes((59_u64).saturating_mul(k.into()))) + .saturating_add(Weight::from_parts(0, 152196).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(0, 24762).saturating_mul(p.into())) + } } diff --git a/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/weights/cumulus_pallet_parachain_system.rs b/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/weights/cumulus_pallet_parachain_system.rs index 23dd800922aea..7016c2258de6f 100644 --- a/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/weights/cumulus_pallet_parachain_system.rs +++ b/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/weights/cumulus_pallet_parachain_system.rs @@ -74,4 +74,23 @@ impl cumulus_pallet_parachain_system::WeightInfo for We .saturating_add(T::DbWeight::get().writes(4)) .saturating_add(T::DbWeight::get().writes((1_u64).saturating_mul(n.into()))) } + + /// Storage: ParachainSystem PreviousPublishedDataRoots (r:1 w:1) + /// Proof Skipped: ParachainSystem PreviousPublishedDataRoots (max_values: Some(1), max_size: None, mode: Measured) + /// Storage: ParachainSystem PublishedData (r:0 w:1600) + /// Proof Skipped: ParachainSystem PublishedData (max_values: None, max_size: None, mode: Measured) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + // TODO: Placeholder weight. Needs to be benchmarked for this specific runtime. + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight { + Weight::from_parts(5_000_000, 0) + .saturating_add(Weight::from_parts(50_000_000, 0).saturating_mul(p.into())) + .saturating_add(Weight::from_parts(10_000_000, 0).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(5_000, 0).saturating_mul(v.into())) + .saturating_add(T::DbWeight::get().reads(1_u64)) + .saturating_add(T::DbWeight::get().writes(1_u64)) + .saturating_add(T::DbWeight::get().writes((2_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((k as u64).saturating_mul(p.into()))) + } } diff --git a/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/weights/xcm/mod.rs b/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/weights/xcm/mod.rs index 3dc3e82a62ff9..08ca0d1d8d681 100644 --- a/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/weights/xcm/mod.rs +++ b/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/weights/xcm/mod.rs @@ -24,7 +24,7 @@ use pallet_xcm_benchmarks_fungible::WeightInfo as XcmFungibleWeight; use pallet_xcm_benchmarks_generic::WeightInfo as XcmGeneric; use sp_runtime::BoundedVec; use xcm::{ - latest::{prelude::*, AssetTransferFilter}, + latest::{prelude::*, AssetTransferFilter, PublishData}, DoubleEncoded, }; @@ -271,4 +271,12 @@ impl XcmWeightInfo for AssetHubRococoXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + fn publish(_: &PublishData) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } + fn subscribe(_: &u32) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } } diff --git a/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/xcm_config.rs b/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/xcm_config.rs index 66ffddf5c8339..5fc891d3c14be 100644 --- a/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/assets/asset-hub-rococo/src/xcm_config.rs @@ -416,6 +416,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } /// Converts a local signed origin into an XCM location. Forms the basis for local origins diff --git a/cumulus/parachains/runtimes/assets/asset-hub-westend/src/weights/cumulus_pallet_parachain_system.rs b/cumulus/parachains/runtimes/assets/asset-hub-westend/src/weights/cumulus_pallet_parachain_system.rs index 28f8aca5f5e7e..9d1e942f7c810 100644 --- a/cumulus/parachains/runtimes/assets/asset-hub-westend/src/weights/cumulus_pallet_parachain_system.rs +++ b/cumulus/parachains/runtimes/assets/asset-hub-westend/src/weights/cumulus_pallet_parachain_system.rs @@ -74,4 +74,23 @@ impl cumulus_pallet_parachain_system::WeightInfo for We .saturating_add(T::DbWeight::get().writes(4)) .saturating_add(T::DbWeight::get().writes((1_u64).saturating_mul(n.into()))) } + + /// Storage: ParachainSystem PreviousPublishedDataRoots (r:1 w:1) + /// Proof Skipped: ParachainSystem PreviousPublishedDataRoots (max_values: Some(1), max_size: None, mode: Measured) + /// Storage: ParachainSystem PublishedData (r:0 w:1600) + /// Proof Skipped: ParachainSystem PublishedData (max_values: None, max_size: None, mode: Measured) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + // TODO: Placeholder weight. Needs to be benchmarked for this specific runtime. + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight { + Weight::from_parts(5_000_000, 0) + .saturating_add(Weight::from_parts(50_000_000, 0).saturating_mul(p.into())) + .saturating_add(Weight::from_parts(10_000_000, 0).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(5_000, 0).saturating_mul(v.into())) + .saturating_add(T::DbWeight::get().reads(1_u64)) + .saturating_add(T::DbWeight::get().writes(1_u64)) + .saturating_add(T::DbWeight::get().writes((2_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((k as u64).saturating_mul(p.into()))) + } } diff --git a/cumulus/parachains/runtimes/assets/asset-hub-westend/src/weights/xcm/mod.rs b/cumulus/parachains/runtimes/assets/asset-hub-westend/src/weights/xcm/mod.rs index 27532ac431e7a..87858cc676343 100644 --- a/cumulus/parachains/runtimes/assets/asset-hub-westend/src/weights/xcm/mod.rs +++ b/cumulus/parachains/runtimes/assets/asset-hub-westend/src/weights/xcm/mod.rs @@ -27,7 +27,7 @@ use pallet_xcm_benchmarks_fungible::WeightInfo as XcmFungibleWeight; use pallet_xcm_benchmarks_generic::WeightInfo as XcmGeneric; use sp_runtime::BoundedVec; use xcm::{ - latest::{prelude::*, AssetTransferFilter}, + latest::{prelude::*, AssetTransferFilter, PublishData}, DoubleEncoded, }; @@ -302,4 +302,12 @@ impl XcmWeightInfo for AssetHubWestendXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + fn publish(_: &PublishData) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } + fn subscribe(_: &u32) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } } diff --git a/cumulus/parachains/runtimes/assets/asset-hub-westend/src/xcm_config.rs b/cumulus/parachains/runtimes/assets/asset-hub-westend/src/xcm_config.rs index efeca0fede196..159f7517f60cd 100644 --- a/cumulus/parachains/runtimes/assets/asset-hub-westend/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/assets/asset-hub-westend/src/xcm_config.rs @@ -472,6 +472,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } parameter_types! { diff --git a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/weights/cumulus_pallet_parachain_system.rs b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/weights/cumulus_pallet_parachain_system.rs index 145a6e3e3cf1b..11d5a73440446 100644 --- a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/weights/cumulus_pallet_parachain_system.rs +++ b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/weights/cumulus_pallet_parachain_system.rs @@ -74,4 +74,23 @@ impl cumulus_pallet_parachain_system::WeightInfo for We .saturating_add(T::DbWeight::get().writes(4)) .saturating_add(T::DbWeight::get().writes((1_u64).saturating_mul(n.into()))) } + + /// Storage: ParachainSystem PreviousPublishedDataRoots (r:1 w:1) + /// Proof Skipped: ParachainSystem PreviousPublishedDataRoots (max_values: Some(1), max_size: None, mode: Measured) + /// Storage: ParachainSystem PublishedData (r:0 w:1600) + /// Proof Skipped: ParachainSystem PublishedData (max_values: None, max_size: None, mode: Measured) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + // TODO: Placeholder weight. Needs to be benchmarked for this specific runtime. + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight { + Weight::from_parts(5_000_000, 0) + .saturating_add(Weight::from_parts(50_000_000, 0).saturating_mul(p.into())) + .saturating_add(Weight::from_parts(10_000_000, 0).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(5_000, 0).saturating_mul(v.into())) + .saturating_add(T::DbWeight::get().reads(1_u64)) + .saturating_add(T::DbWeight::get().writes(1_u64)) + .saturating_add(T::DbWeight::get().writes((2_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((k as u64).saturating_mul(p.into()))) + } } diff --git a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/weights/xcm/mod.rs b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/weights/xcm/mod.rs index 21708ec743821..b3fd1ad0cc4a9 100644 --- a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/weights/xcm/mod.rs +++ b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/weights/xcm/mod.rs @@ -272,4 +272,12 @@ impl XcmWeightInfo for BridgeHubRococoXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + fn publish(_: &PublishData) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } + fn subscribe(_: &u32) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } } diff --git a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/xcm_config.rs b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/xcm_config.rs index 8a661ed53236e..f316e7437736c 100644 --- a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-rococo/src/xcm_config.rs @@ -240,6 +240,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } pub type PriceForParentDelivery = diff --git a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/weights/cumulus_pallet_parachain_system.rs b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/weights/cumulus_pallet_parachain_system.rs index e60c9cfde30e5..8b86d726e639f 100644 --- a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/weights/cumulus_pallet_parachain_system.rs +++ b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/weights/cumulus_pallet_parachain_system.rs @@ -74,4 +74,23 @@ impl cumulus_pallet_parachain_system::WeightInfo for We .saturating_add(T::DbWeight::get().writes(4)) .saturating_add(T::DbWeight::get().writes((1_u64).saturating_mul(n.into()))) } + + /// Storage: ParachainSystem PreviousPublishedDataRoots (r:1 w:1) + /// Proof Skipped: ParachainSystem PreviousPublishedDataRoots (max_values: Some(1), max_size: None, mode: Measured) + /// Storage: ParachainSystem PublishedData (r:0 w:1600) + /// Proof Skipped: ParachainSystem PublishedData (max_values: None, max_size: None, mode: Measured) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + // TODO: Placeholder weight. Needs to be benchmarked for this specific runtime. + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight { + Weight::from_parts(5_000_000, 0) + .saturating_add(Weight::from_parts(50_000_000, 0).saturating_mul(p.into())) + .saturating_add(Weight::from_parts(10_000_000, 0).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(5_000, 0).saturating_mul(v.into())) + .saturating_add(T::DbWeight::get().reads(1_u64)) + .saturating_add(T::DbWeight::get().writes(1_u64)) + .saturating_add(T::DbWeight::get().writes((2_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((k as u64).saturating_mul(p.into()))) + } } diff --git a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/weights/xcm/mod.rs b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/weights/xcm/mod.rs index 3706bfe22a3c8..1012feee9f4c5 100644 --- a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/weights/xcm/mod.rs +++ b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/weights/xcm/mod.rs @@ -272,4 +272,12 @@ impl XcmWeightInfo for BridgeHubWestendXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + fn publish(_: &PublishData) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } + fn subscribe(_: &u32) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } } diff --git a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/xcm_config.rs b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/xcm_config.rs index d1b1e78ef8343..0d08e25f911a4 100644 --- a/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/bridge-hubs/bridge-hub-westend/src/xcm_config.rs @@ -251,6 +251,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } pub type PriceForParentDelivery = diff --git a/cumulus/parachains/runtimes/collectives/collectives-westend/src/weights/cumulus_pallet_parachain_system.rs b/cumulus/parachains/runtimes/collectives/collectives-westend/src/weights/cumulus_pallet_parachain_system.rs index 9ebfbd2fbd0a3..9aeb5cc12b227 100644 --- a/cumulus/parachains/runtimes/collectives/collectives-westend/src/weights/cumulus_pallet_parachain_system.rs +++ b/cumulus/parachains/runtimes/collectives/collectives-westend/src/weights/cumulus_pallet_parachain_system.rs @@ -74,4 +74,23 @@ impl cumulus_pallet_parachain_system::WeightInfo for We .saturating_add(T::DbWeight::get().writes(4)) .saturating_add(T::DbWeight::get().writes((1_u64).saturating_mul(n.into()))) } + + /// Storage: ParachainSystem PreviousPublishedDataRoots (r:1 w:1) + /// Proof Skipped: ParachainSystem PreviousPublishedDataRoots (max_values: Some(1), max_size: None, mode: Measured) + /// Storage: ParachainSystem PublishedData (r:0 w:1600) + /// Proof Skipped: ParachainSystem PublishedData (max_values: None, max_size: None, mode: Measured) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + // TODO: Placeholder weight. Needs to be benchmarked for this specific runtime. + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight { + Weight::from_parts(5_000_000, 0) + .saturating_add(Weight::from_parts(50_000_000, 0).saturating_mul(p.into())) + .saturating_add(Weight::from_parts(10_000_000, 0).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(5_000, 0).saturating_mul(v.into())) + .saturating_add(T::DbWeight::get().reads(1_u64)) + .saturating_add(T::DbWeight::get().writes(1_u64)) + .saturating_add(T::DbWeight::get().writes((2_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((k as u64).saturating_mul(p.into()))) + } } diff --git a/cumulus/parachains/runtimes/collectives/collectives-westend/src/weights/xcm/mod.rs b/cumulus/parachains/runtimes/collectives/collectives-westend/src/weights/xcm/mod.rs index 7c44ce449383f..9c12de71ab0b2 100644 --- a/cumulus/parachains/runtimes/collectives/collectives-westend/src/weights/xcm/mod.rs +++ b/cumulus/parachains/runtimes/collectives/collectives-westend/src/weights/xcm/mod.rs @@ -270,4 +270,12 @@ impl XcmWeightInfo for CollectivesWestendXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + fn publish(_: &PublishData) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } + fn subscribe(_: &u32) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } } diff --git a/cumulus/parachains/runtimes/collectives/collectives-westend/src/xcm_config.rs b/cumulus/parachains/runtimes/collectives/collectives-westend/src/xcm_config.rs index b3a7f2bd9af05..91c67bc8e641c 100644 --- a/cumulus/parachains/runtimes/collectives/collectives-westend/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/collectives/collectives-westend/src/xcm_config.rs @@ -258,6 +258,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } /// Converts a local signed origin into an XCM location. Forms the basis for local origins diff --git a/cumulus/parachains/runtimes/coretime/coretime-rococo/src/weights/cumulus_pallet_parachain_system.rs b/cumulus/parachains/runtimes/coretime/coretime-rococo/src/weights/cumulus_pallet_parachain_system.rs index 73c4b2ba241d2..ea3e6abc4f2ab 100644 --- a/cumulus/parachains/runtimes/coretime/coretime-rococo/src/weights/cumulus_pallet_parachain_system.rs +++ b/cumulus/parachains/runtimes/coretime/coretime-rococo/src/weights/cumulus_pallet_parachain_system.rs @@ -74,4 +74,23 @@ impl cumulus_pallet_parachain_system::WeightInfo for We .saturating_add(T::DbWeight::get().writes(4)) .saturating_add(T::DbWeight::get().writes((1_u64).saturating_mul(n.into()))) } + + /// Storage: ParachainSystem PreviousPublishedDataRoots (r:1 w:1) + /// Proof Skipped: ParachainSystem PreviousPublishedDataRoots (max_values: Some(1), max_size: None, mode: Measured) + /// Storage: ParachainSystem PublishedData (r:0 w:1600) + /// Proof Skipped: ParachainSystem PublishedData (max_values: None, max_size: None, mode: Measured) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + // TODO: Placeholder weight. Needs to be benchmarked for this specific runtime. + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight { + Weight::from_parts(5_000_000, 0) + .saturating_add(Weight::from_parts(50_000_000, 0).saturating_mul(p.into())) + .saturating_add(Weight::from_parts(10_000_000, 0).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(5_000, 0).saturating_mul(v.into())) + .saturating_add(T::DbWeight::get().reads(1_u64)) + .saturating_add(T::DbWeight::get().writes(1_u64)) + .saturating_add(T::DbWeight::get().writes((2_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((k as u64).saturating_mul(p.into()))) + } } diff --git a/cumulus/parachains/runtimes/coretime/coretime-rococo/src/weights/xcm/mod.rs b/cumulus/parachains/runtimes/coretime/coretime-rococo/src/weights/xcm/mod.rs index ce2279e2ba8e8..b76880a1cc173 100644 --- a/cumulus/parachains/runtimes/coretime/coretime-rococo/src/weights/xcm/mod.rs +++ b/cumulus/parachains/runtimes/coretime/coretime-rococo/src/weights/xcm/mod.rs @@ -270,4 +270,12 @@ impl XcmWeightInfo for CoretimeRococoXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + fn publish(_: &PublishData) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } + fn subscribe(_: &u32) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } } diff --git a/cumulus/parachains/runtimes/coretime/coretime-rococo/src/xcm_config.rs b/cumulus/parachains/runtimes/coretime/coretime-rococo/src/xcm_config.rs index 8cf14d103f1c5..b1ec55402994e 100644 --- a/cumulus/parachains/runtimes/coretime/coretime-rococo/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/coretime/coretime-rococo/src/xcm_config.rs @@ -236,6 +236,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } /// Converts a local signed origin into an XCM location. Forms the basis for local origins diff --git a/cumulus/parachains/runtimes/coretime/coretime-westend/src/weights/cumulus_pallet_parachain_system.rs b/cumulus/parachains/runtimes/coretime/coretime-westend/src/weights/cumulus_pallet_parachain_system.rs index 8f5714bbe0cd7..2c34ccd856ca6 100644 --- a/cumulus/parachains/runtimes/coretime/coretime-westend/src/weights/cumulus_pallet_parachain_system.rs +++ b/cumulus/parachains/runtimes/coretime/coretime-westend/src/weights/cumulus_pallet_parachain_system.rs @@ -74,4 +74,23 @@ impl cumulus_pallet_parachain_system::WeightInfo for We .saturating_add(T::DbWeight::get().writes(4)) .saturating_add(T::DbWeight::get().writes((1_u64).saturating_mul(n.into()))) } + + /// Storage: ParachainSystem PreviousPublishedDataRoots (r:1 w:1) + /// Proof Skipped: ParachainSystem PreviousPublishedDataRoots (max_values: Some(1), max_size: None, mode: Measured) + /// Storage: ParachainSystem PublishedData (r:0 w:1600) + /// Proof Skipped: ParachainSystem PublishedData (max_values: None, max_size: None, mode: Measured) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + // TODO: Placeholder weight. Needs to be benchmarked for this specific runtime. + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight { + Weight::from_parts(5_000_000, 0) + .saturating_add(Weight::from_parts(50_000_000, 0).saturating_mul(p.into())) + .saturating_add(Weight::from_parts(10_000_000, 0).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(5_000, 0).saturating_mul(v.into())) + .saturating_add(T::DbWeight::get().reads(1_u64)) + .saturating_add(T::DbWeight::get().writes(1_u64)) + .saturating_add(T::DbWeight::get().writes((2_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((k as u64).saturating_mul(p.into()))) + } } diff --git a/cumulus/parachains/runtimes/coretime/coretime-westend/src/weights/xcm/mod.rs b/cumulus/parachains/runtimes/coretime/coretime-westend/src/weights/xcm/mod.rs index 75e0908cb395d..165955277e4c5 100644 --- a/cumulus/parachains/runtimes/coretime/coretime-westend/src/weights/xcm/mod.rs +++ b/cumulus/parachains/runtimes/coretime/coretime-westend/src/weights/xcm/mod.rs @@ -269,4 +269,12 @@ impl XcmWeightInfo for CoretimeWestendXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + fn publish(_: &PublishData) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } + fn subscribe(_: &u32) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } } diff --git a/cumulus/parachains/runtimes/coretime/coretime-westend/src/xcm_config.rs b/cumulus/parachains/runtimes/coretime/coretime-westend/src/xcm_config.rs index 391972f24572c..1fe19b7c953b4 100644 --- a/cumulus/parachains/runtimes/coretime/coretime-westend/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/coretime/coretime-westend/src/xcm_config.rs @@ -272,6 +272,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } /// Converts a local signed origin into an XCM location. Forms the basis for local origins diff --git a/cumulus/parachains/runtimes/glutton/glutton-westend/src/weights/cumulus_pallet_parachain_system.rs b/cumulus/parachains/runtimes/glutton/glutton-westend/src/weights/cumulus_pallet_parachain_system.rs index a753f6fc78f87..e38525e1c9c79 100644 --- a/cumulus/parachains/runtimes/glutton/glutton-westend/src/weights/cumulus_pallet_parachain_system.rs +++ b/cumulus/parachains/runtimes/glutton/glutton-westend/src/weights/cumulus_pallet_parachain_system.rs @@ -75,4 +75,23 @@ impl cumulus_pallet_parachain_system::WeightInfo for We .saturating_add(T::DbWeight::get().writes(4)) .saturating_add(T::DbWeight::get().writes((1_u64).saturating_mul(n.into()))) } + + /// Storage: ParachainSystem PreviousPublishedDataRoots (r:1 w:1) + /// Proof Skipped: ParachainSystem PreviousPublishedDataRoots (max_values: Some(1), max_size: None, mode: Measured) + /// Storage: ParachainSystem PublishedData (r:0 w:1600) + /// Proof Skipped: ParachainSystem PublishedData (max_values: None, max_size: None, mode: Measured) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + // TODO: Placeholder weight. Needs to be benchmarked for this specific runtime. + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight { + Weight::from_parts(5_000_000, 0) + .saturating_add(Weight::from_parts(50_000_000, 0).saturating_mul(p.into())) + .saturating_add(Weight::from_parts(10_000_000, 0).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(5_000, 0).saturating_mul(v.into())) + .saturating_add(T::DbWeight::get().reads(1_u64)) + .saturating_add(T::DbWeight::get().writes(1_u64)) + .saturating_add(T::DbWeight::get().writes((2_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((k as u64).saturating_mul(p.into()))) + } } diff --git a/cumulus/parachains/runtimes/glutton/glutton-westend/src/xcm_config.rs b/cumulus/parachains/runtimes/glutton/glutton-westend/src/xcm_config.rs index f32cb211444c2..53dc0c85c2422 100644 --- a/cumulus/parachains/runtimes/glutton/glutton-westend/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/glutton/glutton-westend/src/xcm_config.rs @@ -98,6 +98,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = (); + type BroadcastHandler = (); } impl cumulus_pallet_xcm::Config for Runtime { diff --git a/cumulus/parachains/runtimes/people/people-rococo/src/weights/cumulus_pallet_parachain_system.rs b/cumulus/parachains/runtimes/people/people-rococo/src/weights/cumulus_pallet_parachain_system.rs index 58aef8cd5ab87..5d0e510624d47 100644 --- a/cumulus/parachains/runtimes/people/people-rococo/src/weights/cumulus_pallet_parachain_system.rs +++ b/cumulus/parachains/runtimes/people/people-rococo/src/weights/cumulus_pallet_parachain_system.rs @@ -74,4 +74,23 @@ impl cumulus_pallet_parachain_system::WeightInfo for We .saturating_add(T::DbWeight::get().writes(4)) .saturating_add(T::DbWeight::get().writes((1_u64).saturating_mul(n.into()))) } + + /// Storage: ParachainSystem PreviousPublishedDataRoots (r:1 w:1) + /// Proof Skipped: ParachainSystem PreviousPublishedDataRoots (max_values: Some(1), max_size: None, mode: Measured) + /// Storage: ParachainSystem PublishedData (r:0 w:1600) + /// Proof Skipped: ParachainSystem PublishedData (max_values: None, max_size: None, mode: Measured) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + // TODO: Placeholder weight. Needs to be benchmarked for this specific runtime. + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight { + Weight::from_parts(5_000_000, 0) + .saturating_add(Weight::from_parts(50_000_000, 0).saturating_mul(p.into())) + .saturating_add(Weight::from_parts(10_000_000, 0).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(5_000, 0).saturating_mul(v.into())) + .saturating_add(T::DbWeight::get().reads(1_u64)) + .saturating_add(T::DbWeight::get().writes(1_u64)) + .saturating_add(T::DbWeight::get().writes((2_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((k as u64).saturating_mul(p.into()))) + } } diff --git a/cumulus/parachains/runtimes/people/people-rococo/src/weights/xcm/mod.rs b/cumulus/parachains/runtimes/people/people-rococo/src/weights/xcm/mod.rs index 41c773db7c766..149eb9582b565 100644 --- a/cumulus/parachains/runtimes/people/people-rococo/src/weights/xcm/mod.rs +++ b/cumulus/parachains/runtimes/people/people-rococo/src/weights/xcm/mod.rs @@ -269,4 +269,12 @@ impl XcmWeightInfo for PeopleRococoXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + fn publish(_: &PublishData) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } + fn subscribe(_: &u32) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } } diff --git a/cumulus/parachains/runtimes/people/people-rococo/src/xcm_config.rs b/cumulus/parachains/runtimes/people/people-rococo/src/xcm_config.rs index 8f2a89a268ee2..7885da99dc00c 100644 --- a/cumulus/parachains/runtimes/people/people-rococo/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/people/people-rococo/src/xcm_config.rs @@ -237,6 +237,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } /// Converts a local signed origin into an XCM location. Forms the basis for local origins diff --git a/cumulus/parachains/runtimes/people/people-westend/src/weights/cumulus_pallet_parachain_system.rs b/cumulus/parachains/runtimes/people/people-westend/src/weights/cumulus_pallet_parachain_system.rs index 05c07f998e8e2..4091c2270e1eb 100644 --- a/cumulus/parachains/runtimes/people/people-westend/src/weights/cumulus_pallet_parachain_system.rs +++ b/cumulus/parachains/runtimes/people/people-westend/src/weights/cumulus_pallet_parachain_system.rs @@ -74,4 +74,23 @@ impl cumulus_pallet_parachain_system::WeightInfo for We .saturating_add(T::DbWeight::get().writes(4)) .saturating_add(T::DbWeight::get().writes((1_u64).saturating_mul(n.into()))) } + + /// Storage: ParachainSystem PreviousPublishedDataRoots (r:1 w:1) + /// Proof Skipped: ParachainSystem PreviousPublishedDataRoots (max_values: Some(1), max_size: None, mode: Measured) + /// Storage: ParachainSystem PublishedData (r:0 w:1600) + /// Proof Skipped: ParachainSystem PublishedData (max_values: None, max_size: None, mode: Measured) + /// The range of component `p` is `[1, 100]`. + /// The range of component `k` is `[1, 16]`. + /// The range of component `v` is `[1, 1024]`. + // TODO: Placeholder weight. Needs to be benchmarked for this specific runtime. + fn process_published_data(p: u32, k: u32, v: u32, ) -> Weight { + Weight::from_parts(5_000_000, 0) + .saturating_add(Weight::from_parts(50_000_000, 0).saturating_mul(p.into())) + .saturating_add(Weight::from_parts(10_000_000, 0).saturating_mul(k.into())) + .saturating_add(Weight::from_parts(5_000, 0).saturating_mul(v.into())) + .saturating_add(T::DbWeight::get().reads(1_u64)) + .saturating_add(T::DbWeight::get().writes(1_u64)) + .saturating_add(T::DbWeight::get().writes((2_u64).saturating_mul(p.into()))) + .saturating_add(T::DbWeight::get().writes((k as u64).saturating_mul(p.into()))) + } } diff --git a/cumulus/parachains/runtimes/people/people-westend/src/weights/xcm/mod.rs b/cumulus/parachains/runtimes/people/people-westend/src/weights/xcm/mod.rs index a7d394b603b2f..d447596c998b8 100644 --- a/cumulus/parachains/runtimes/people/people-westend/src/weights/xcm/mod.rs +++ b/cumulus/parachains/runtimes/people/people-westend/src/weights/xcm/mod.rs @@ -268,4 +268,12 @@ impl XcmWeightInfo for PeopleWestendXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + fn publish(_: &PublishData) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } + fn subscribe(_: &u32) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } } diff --git a/cumulus/parachains/runtimes/people/people-westend/src/xcm_config.rs b/cumulus/parachains/runtimes/people/people-westend/src/xcm_config.rs index e5203f39c8814..c41a07142be97 100644 --- a/cumulus/parachains/runtimes/people/people-westend/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/people/people-westend/src/xcm_config.rs @@ -278,6 +278,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } /// Converts a local signed origin into an XCM location. Forms the basis for local origins diff --git a/cumulus/parachains/runtimes/testing/penpal/src/xcm_config.rs b/cumulus/parachains/runtimes/testing/penpal/src/xcm_config.rs index f8a9cdbdf56c8..e92e8c8b49166 100644 --- a/cumulus/parachains/runtimes/testing/penpal/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/testing/penpal/src/xcm_config.rs @@ -441,6 +441,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } /// Multiplier used for dedicated `TakeFirstAssetTrader` with `ForeignAssets` instance. diff --git a/cumulus/parachains/runtimes/testing/rococo-parachain/src/lib.rs b/cumulus/parachains/runtimes/testing/rococo-parachain/src/lib.rs index 12a322534da5a..2a3faf351032f 100644 --- a/cumulus/parachains/runtimes/testing/rococo-parachain/src/lib.rs +++ b/cumulus/parachains/runtimes/testing/rococo-parachain/src/lib.rs @@ -504,6 +504,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } /// Converts a local signed origin into an XCM location. Forms the basis for local origins diff --git a/cumulus/parachains/runtimes/testing/yet-another-parachain/src/xcm_config.rs b/cumulus/parachains/runtimes/testing/yet-another-parachain/src/xcm_config.rs index c1b83f5dbd74e..4d783f6fe6739 100644 --- a/cumulus/parachains/runtimes/testing/yet-another-parachain/src/xcm_config.rs +++ b/cumulus/parachains/runtimes/testing/yet-another-parachain/src/xcm_config.rs @@ -165,6 +165,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = PolkadotXcm; + type BroadcastHandler = (); } /// No local origins on this chain are allowed to dispatch XCM sends/executions. diff --git a/cumulus/primitives/parachain-inherent/src/lib.rs b/cumulus/primitives/parachain-inherent/src/lib.rs index 3eec67f1adf78..a4fa10e29b233 100644 --- a/cumulus/primitives/parachain-inherent/src/lib.rs +++ b/cumulus/primitives/parachain-inherent/src/lib.rs @@ -120,6 +120,9 @@ pub struct ParachainInherentData { /// Contains the collator peer ID, which is later sent by the parachain to the /// relay chain via a UMP signal to promote the reputation of the given peer ID. pub collator_peer_id: Option, + /// Published data from the broadcaster pallet on the relay chain. + /// Key: Publisher ParaId, Value: Vector of (key, value) pairs published by that parachain. + pub published_data: BTreeMap, Vec)>>, } // Upgrades the ParachainInherentData v0 to the newest format. @@ -132,6 +135,7 @@ impl Into for v0::ParachainInherentData { horizontal_messages: self.horizontal_messages, relay_parent_descendants: Vec::new(), collator_peer_id: None, + published_data: BTreeMap::new(), } } } diff --git a/cumulus/test/client/src/block_builder.rs b/cumulus/test/client/src/block_builder.rs index 6672a19e35257..045051fca514b 100644 --- a/cumulus/test/client/src/block_builder.rs +++ b/cumulus/test/client/src/block_builder.rs @@ -180,6 +180,7 @@ fn init_block_builder( horizontal_messages: Default::default(), relay_parent_descendants: Default::default(), collator_peer_id: None, + published_data: Default::default(), }, ) .expect("Put validation function params failed"); diff --git a/cumulus/test/relay-sproof-builder/src/lib.rs b/cumulus/test/relay-sproof-builder/src/lib.rs index 472ea9ec9e029..bb908a556af80 100644 --- a/cumulus/test/relay-sproof-builder/src/lib.rs +++ b/cumulus/test/relay-sproof-builder/src/lib.rs @@ -49,6 +49,7 @@ pub struct RelayStateSproofBuilder { pub randomness: relay_chain::Hash, pub additional_key_values: Vec<(Vec, Vec)>, pub included_para_head: Option, + pub published_data_roots: Option)>>, } impl Default for RelayStateSproofBuilder { @@ -81,6 +82,7 @@ impl Default for RelayStateSproofBuilder { randomness: relay_chain::Hash::default(), additional_key_values: vec![], included_para_head: None, + published_data_roots: None, } } } @@ -203,6 +205,13 @@ impl RelayStateSproofBuilder { ); insert(relay_chain::well_known_keys::CURRENT_SLOT.to_vec(), self.current_slot.encode()); + if let Some(published_data_roots) = self.published_data_roots { + insert( + relay_chain::well_known_keys::BROADCASTER_PUBLISHED_DATA_ROOTS.to_vec(), + published_data_roots.encode(), + ); + } + for (key, value) in self.additional_key_values { insert(key, value); } diff --git a/cumulus/test/service/src/bench_utils.rs b/cumulus/test/service/src/bench_utils.rs index d351e78a33904..6e49af1f33178 100644 --- a/cumulus/test/service/src/bench_utils.rs +++ b/cumulus/test/service/src/bench_utils.rs @@ -107,11 +107,15 @@ pub fn extrinsic_set_validation_data( horizontal_messages: Default::default(), }; + use cumulus_pallet_parachain_system::parachain_inherent::InboundPublishedData; + let published_data = InboundPublishedData::new(Default::default()); + cumulus_test_runtime::UncheckedExtrinsic::new_bare( cumulus_test_runtime::RuntimeCall::ParachainSystem( cumulus_pallet_parachain_system::Call::set_validation_data { data, inbound_messages_data, + published_data, }, ), ) diff --git a/cumulus/xcm/xcm-emulator/src/lib.rs b/cumulus/xcm/xcm-emulator/src/lib.rs index 51d7c00d9aa9d..a77c3d337e6f6 100644 --- a/cumulus/xcm/xcm-emulator/src/lib.rs +++ b/cumulus/xcm/xcm-emulator/src/lib.rs @@ -1241,6 +1241,7 @@ macro_rules! decl_test_networks { horizontal_messages: Default::default(), relay_parent_descendants: Default::default(), collator_peer_id: None, + published_data: Default::default(), } } } diff --git a/polkadot/primitives/src/runtime_api.rs b/polkadot/primitives/src/runtime_api.rs index 518a828e7e0e8..4493038a3e05e 100644 --- a/polkadot/primitives/src/runtime_api.rs +++ b/polkadot/primitives/src/runtime_api.rs @@ -321,5 +321,11 @@ sp_api::decl_runtime_apis! { /// Returns a list of validators that lost a past session dispute and need to be slashed. #[api_version(15)] fn unapplied_slashes_v2() -> Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>; + + /// Get published data from all parachains that the subscriber is subscribed to. + /// Returns a map of Publisher ParaId -> published data. + /// Only includes publishers that have actual data and are subscribed to. + #[api_version(16)] + fn get_subscribed_data(subscriber_para_id: ppp::Id) -> BTreeMap, Vec)>>; } } diff --git a/polkadot/primitives/src/v9/mod.rs b/polkadot/primitives/src/v9/mod.rs index 360da8ff9b956..b78d8a9781d3c 100644 --- a/polkadot/primitives/src/v9/mod.rs +++ b/polkadot/primitives/src/v9/mod.rs @@ -263,6 +263,13 @@ pub mod well_known_keys { pub const NEXT_AUTHORITIES: &[u8] = &hex!["1cb6f36e027abb2091cfb5110ab5087faacf00b9b41fda7a9268821c2a2b3e4c"]; + /// Published data roots from the broadcaster pallet. + /// + /// The storage entry should be accessed as `Vec<(ParaId, Vec)>` encoded value + /// where Vec is the child trie root hash for each publisher. + pub const BROADCASTER_PUBLISHED_DATA_ROOTS: &[u8] = + &hex!["6aca18c1f7576767ccb238db4ccaedf239166324ac7ea24c870f96ab961f9654"]; + /// Hash of the committed head data for a given registered para. /// /// The storage entry stores wrapped `HeadData(Vec)`. diff --git a/polkadot/runtime/parachains/src/broadcaster/mod.rs b/polkadot/runtime/parachains/src/broadcaster/mod.rs new file mode 100644 index 0000000000000..8e40aedd8ac63 --- /dev/null +++ b/polkadot/runtime/parachains/src/broadcaster/mod.rs @@ -0,0 +1,397 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! A pallet for managing parachain data publishing and subscription. +//! +//! This pallet provides a publish-subscribe mechanism for parachains to share data +//! efficiently through the relay chain storage using child tries per publisher. +//! +//! ## Storage Lifecycle +//! +//! Note: This pallet does not currently implement publisher removal or cleanup mechanisms. +//! Once a parachain publishes data, it remains in storage. Publishers can update their data +//! by publishing again, but there is no explicit removal path. + +use alloc::{collections::BTreeMap, vec::Vec}; +use frame_support::{ + pallet_prelude::*, + storage::child::ChildInfo, + traits::{defensive_prelude::*, Get, ConstU32}, +}; +use frame_system::pallet_prelude::BlockNumberFor; +use polkadot_primitives::Id as ParaId; + +pub use pallet::*; + +mod traits; +pub use traits::PublishSubscribe; + +#[cfg(test)] +mod tests; + +#[frame_support::pallet] +pub mod pallet { + use super::*; + + const STORAGE_VERSION: StorageVersion = StorageVersion::new(0); + + #[pallet::pallet] + #[pallet::storage_version(STORAGE_VERSION)] + pub struct Pallet(_); + + #[pallet::config] + pub trait Config: frame_system::Config { + /// Maximum number of items that can be published in one operation. + /// Must not exceed `xcm::v5::MaxPublishItems`. + #[pallet::constant] + type MaxPublishItems: Get; + + /// Maximum length of a key in bytes. + /// Must not exceed `xcm::v5::MaxPublishKeyLength`. + #[pallet::constant] + type MaxKeyLength: Get; + + /// Maximum length of a value in bytes. + /// Must not exceed `xcm::v5::MaxPublishValueLength`. + #[pallet::constant] + type MaxValueLength: Get; + + /// Maximum number of unique keys a publisher can have stored across all publishes. + #[pallet::constant] + type MaxStoredKeys: Get; + + /// Maximum number of publishers a subscriber can subscribe to. + #[pallet::constant] + type MaxSubscriptions: Get; + + /// Maximum number of publishers that can have published data. + #[pallet::constant] + type MaxPublishers: Get; + } + + #[pallet::event] + #[pallet::generate_deposit(pub(super) fn deposit_event)] + pub enum Event { + /// Data published by a parachain. + DataPublished { publisher: ParaId, items_count: u32 }, + /// Parachain subscribed to a publisher. + Subscribed { subscriber: ParaId, publisher: ParaId }, + /// Parachain unsubscribed from a publisher. + Unsubscribed { subscriber: ParaId, publisher: ParaId }, + } + + /// Tracks which parachains have published data. + /// + /// Maps parachain ID to a boolean indicating whether they have a child trie. + /// The actual child trie info is derived deterministically from the ParaId. + #[pallet::storage] + pub type PublisherExists = StorageMap< + _, + Twox64Concat, + ParaId, + bool, + ValueQuery, + >; + + /// Tracks all published keys per parachain. + #[pallet::storage] + pub type PublishedKeys = StorageMap< + _, + Twox64Concat, + ParaId, + BoundedBTreeSet, T::MaxStoredKeys>, + ValueQuery, + >; + + /// Tracks subscriptions: subscriber -> list of publishers. + /// + /// Maps subscriber ParaId to a bounded vector of publisher ParaIds. + /// Empty vec means no subscriptions. + #[pallet::storage] + pub type Subscriptions = StorageMap< + _, + Twox64Concat, + ParaId, // Subscriber + BoundedVec, // List of publishers + ValueQuery, + >; + + /// Aggregated child trie roots for all publishers. + /// + /// Contains (ParaId, child_trie_root) pairs for all parachains that have published data. + /// This is used in relay chain storage proofs to efficiently provide all publisher roots. + #[pallet::storage] + pub type PublishedDataRoots = StorageValue< + _, + BoundedVec<(ParaId, BoundedVec>), T::MaxPublishers>, + ValueQuery, + >; + + #[pallet::error] + pub enum Error { + /// Too many items in a single publish operation. + TooManyPublishItems, + /// Key length exceeds maximum allowed. + KeyTooLong, + /// Value length exceeds maximum allowed. + ValueTooLong, + /// Too many unique keys stored for this publisher. + TooManyStoredKeys, + /// Too many subscriptions for this subscriber. + TooManySubscriptions, + } + + #[pallet::hooks] + impl Hooks> for Pallet { + fn integrity_test() { + assert!( + T::MaxPublishItems::get() <= xcm::v5::MaxPublishItems::get(), + "Broadcaster MaxPublishItems exceeds XCM MaxPublishItems upper bound" + ); + assert!( + T::MaxKeyLength::get() <= xcm::v5::MaxPublishKeyLength::get(), + "Broadcaster MaxKeyLength exceeds XCM MaxPublishKeyLength upper bound" + ); + assert!( + T::MaxValueLength::get() <= xcm::v5::MaxPublishValueLength::get(), + "Broadcaster MaxValueLength exceeds XCM MaxPublishValueLength upper bound" + ); + assert_eq!( + &PublishedDataRoots::::hashed_key(), + polkadot_primitives::well_known_keys::BROADCASTER_PUBLISHED_DATA_ROOTS, + "`well_known_keys::BROADCASTER_PUBLISHED_DATA_ROOTS` doesn't match key of `PublishedDataRoots`! \ + Make sure that the name of the broadcaster pallet is `Broadcaster` in the runtime!", + ); + } + } + + impl Pallet { + /// Process a publish operation from a parachain. + /// + /// Stores the provided key-value pairs in the publisher's child trie. + pub fn handle_publish( + origin_para_id: ParaId, + data: Vec<(Vec, Vec)>, + ) -> DispatchResult { + let items_count = data.len() as u32; + + // Validate input limits first before making any changes + ensure!( + data.len() <= T::MaxPublishItems::get() as usize, + Error::::TooManyPublishItems + ); + + // Validate all keys and values before creating publisher entry + for (key, value) in &data { + ensure!( + key.len() <= T::MaxKeyLength::get() as usize, + Error::::KeyTooLong + ); + ensure!( + value.len() <= T::MaxValueLength::get() as usize, + Error::::ValueTooLong + ); + } + + // All validation passed, now get or create child trie info for this publisher + let child_info = Self::get_or_create_publisher_child_info(origin_para_id); + + // Get current published keys set for tracking + let mut published_keys = PublishedKeys::::get(origin_para_id); + + // Check if adding new keys would exceed MaxStoredKeys limit + // Count how many unique new keys we're adding + let mut new_keys_count = 0u32; + for (key, _) in &data { + if let Ok(bounded_key) = BoundedVec::try_from(key.clone()) { + if !published_keys.contains(&bounded_key) { + new_keys_count += 1; + } + } + } + + // Ensure we won't exceed the total stored keys limit + let current_keys_count = published_keys.len() as u32; + ensure!( + current_keys_count.saturating_add(new_keys_count) <= T::MaxStoredKeys::get(), + Error::::TooManyStoredKeys + ); + + // Store each key-value pair in the child trie and track the key + for (key, value) in data { + frame_support::storage::child::put(&child_info, &key, &value); + + // Track the key for enumeration (convert to BoundedVec) + if let Ok(bounded_key) = BoundedVec::try_from(key) { + // This should never fail now since we checked the limit above + published_keys.try_insert(bounded_key).defensive_ok(); + } + } + + // Update the published keys storage + PublishedKeys::::insert(origin_para_id, published_keys); + + // Calculate and update the child trie root for this publisher + let child_root = frame_support::storage::child::root(&child_info, + sp_runtime::StateVersion::V1); + + // Update the aggregated roots storage + let mut roots = PublishedDataRoots::::get(); + + // Convert child_root once + if let Ok(bounded_root) = BoundedVec::try_from(child_root) { + // Find and update existing entry or add new one + if let Some((_, root_hash)) = roots.iter_mut().find(|(para_id, _)| *para_id == origin_para_id) { + *root_hash = bounded_root; + } else { + // Not found, add new entry + roots.try_push((origin_para_id, bounded_root)).defensive_ok(); + } + } + + PublishedDataRoots::::put(roots); + + Self::deposit_event(Event::DataPublished { publisher: origin_para_id, items_count }); + + Ok(()) + } + + /// Toggle subscription approach. + /// Subscribe if not subscribed, unsubscribe if subscribed. + pub fn handle_subscribe_toggle( + subscriber: ParaId, + publisher: ParaId, + ) -> DispatchResult { + let mut subscriptions = Subscriptions::::get(subscriber); + + // Check if already subscribed + let event = if let Some(pos) = subscriptions.iter().position(|&p| p == publisher) { + // Already subscribed -> unsubscribe + subscriptions.swap_remove(pos); + Event::Unsubscribed { subscriber, publisher } + } else { + // Not subscribed -> subscribe + subscriptions.try_push(publisher).map_err(|_| Error::::TooManySubscriptions)?; + Event::Subscribed { subscriber, publisher } + }; + + Subscriptions::::insert(subscriber, subscriptions); + Self::deposit_event(event); + + Ok(()) + } + + /// Get or create child trie info for a publisher. + fn get_or_create_publisher_child_info(para_id: ParaId) -> ChildInfo { + if !PublisherExists::::contains_key(para_id) { + PublisherExists::::insert(para_id, true); + } + Self::derive_child_info(para_id) + } + + /// Derive a deterministic child trie identifier from parachain ID. + pub fn derive_child_info(para_id: ParaId) -> ChildInfo { + const PREFIX: &[u8] = b"pubsub"; + let encoded = para_id.encode(); + + let mut key = Vec::with_capacity(PREFIX.len() + encoded.len()); + key.extend_from_slice(PREFIX); + key.extend_from_slice(&encoded); + + ChildInfo::new_default(&key) + } + + /// Retrieve a value from a publisher's child trie. + /// + /// Returns None if the publisher doesn't exist or the key is not found. + pub fn get_published_value(para_id: ParaId, key: &[u8]) -> Option> { + PublisherExists::::get(para_id).then(|| { + let child_info = Self::derive_child_info(para_id); + frame_support::storage::child::get(&child_info, key) + })? + } + + /// Get all published data for a parachain. + pub fn get_all_published_data(para_id: ParaId) -> Vec<(Vec, Vec)> { + if !PublisherExists::::get(para_id) { + return Vec::new(); + } + + let child_info = Self::derive_child_info(para_id); + let published_keys = PublishedKeys::::get(para_id); + + published_keys + .into_iter() + .filter_map(|bounded_key| { + let key: Vec = bounded_key.into(); + frame_support::storage::child::get(&child_info, &key) + .map(|value| (key, value)) + }) + .collect() + } + + /// Get list of all parachains that have published data. + pub fn get_all_publishers() -> Vec { + PublisherExists::::iter_keys().collect() + } + + /// Get published data from all publishers. + /// Returns a map of Publisher ParaId -> published data. + /// Only includes publishers that have actual data. + pub fn get_all_published_data_map() -> BTreeMap, Vec)>> { + Self::get_all_publishers() + .into_iter() + .filter_map(|publisher| { + let data = Self::get_all_published_data(publisher); + (!data.is_empty()).then_some((publisher, data)) + }) + .collect() + } + + /// Get all subscriptions for a parachain. + pub fn get_subscriptions(subscriber: ParaId) -> Vec { + Subscriptions::::get(subscriber).into_inner() + } + + /// Check if a parachain is subscribed to a publisher. + pub fn is_subscribed(subscriber: ParaId, publisher: ParaId) -> bool { + Subscriptions::::get(subscriber).contains(&publisher) + } + + /// Get published data from all parachains that the subscriber is subscribed to. + /// Returns a map of Publisher ParaId -> published data. + /// Only includes publishers that have actual data and are subscribed to. + pub fn get_subscribed_data(subscriber_para_id: ParaId) -> BTreeMap, Vec)>> { + Subscriptions::::get(subscriber_para_id) + .into_iter() + .filter_map(|publisher| { + let data = Self::get_all_published_data(publisher); + (!data.is_empty()).then_some((publisher, data)) + }) + .collect() + } + } +} + +impl PublishSubscribe for Pallet { + fn publish_data(publisher: ParaId, data: Vec<(Vec, Vec)>) -> DispatchResult { + Self::handle_publish(publisher, data) + } + + fn toggle_subscription(subscriber: ParaId, publisher: ParaId) -> DispatchResult { + Self::handle_subscribe_toggle(subscriber, publisher) + } +} diff --git a/polkadot/runtime/parachains/src/broadcaster/tests.rs b/polkadot/runtime/parachains/src/broadcaster/tests.rs new file mode 100644 index 0000000000000..711f6ec1fc669 --- /dev/null +++ b/polkadot/runtime/parachains/src/broadcaster/tests.rs @@ -0,0 +1,352 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use super::*; +use crate::mock::{new_test_ext, Broadcaster, Test}; +use frame_support::{assert_err, assert_ok}; +use polkadot_primitives::Id as ParaId; + +#[test] +fn publish_store_retrieve_and_update_data() { + new_test_ext(Default::default()).execute_with(|| { + let para_id = ParaId::from(1000); + + // Publisher doesn't exist + assert!(!PublisherExists::::get(para_id)); + + // Publish initial data + let initial_data = + vec![(b"key1".to_vec(), b"value1".to_vec()), (b"key2".to_vec(), b"value2".to_vec())]; + Broadcaster::handle_publish(para_id, initial_data.clone()).unwrap(); + + // Verify publisher exists + assert!(PublisherExists::::get(para_id)); + + // Verify the actual stored data matches what was published + assert_eq!(Broadcaster::get_published_value(para_id, b"key1"), Some(b"value1".to_vec())); + assert_eq!(Broadcaster::get_published_value(para_id, b"key2"), Some(b"value2".to_vec())); + + // Non-existent key should return None + assert_eq!(Broadcaster::get_published_value(para_id, b"key3"), None); + + // Update existing key and add new key + let update_data = vec![ + (b"key1".to_vec(), b"updated_value1".to_vec()), + (b"key3".to_vec(), b"value3".to_vec()), + ]; + Broadcaster::handle_publish(para_id, update_data).unwrap(); + + // Verify the data was correctly updated + assert_eq!( + Broadcaster::get_published_value(para_id, b"key1"), + Some(b"updated_value1".to_vec()) + ); + assert_eq!( + Broadcaster::get_published_value(para_id, b"key2"), + Some(b"value2".to_vec()) // Should remain unchanged + ); + assert_eq!(Broadcaster::get_published_value(para_id, b"key3"), Some(b"value3".to_vec())); + }); +} + +#[test] +fn empty_publish_still_creates_publisher() { + new_test_ext(Default::default()).execute_with(|| { + let para_id = ParaId::from(1000); + + let _ = Broadcaster::handle_publish(para_id, vec![]); + + assert!(PublisherExists::::get(para_id)); + }); +} + +#[test] +fn handle_publish_respects_max_items_limit() { + new_test_ext(Default::default()).execute_with(|| { + let para_id = ParaId::from(1000); + + let mut data = Vec::new(); + for i in 0..17 { + data.push((format!("key{}", i).into_bytes(), b"value".to_vec())); + } + + let result = Broadcaster::handle_publish(para_id, data); + assert!(result.is_err()); + assert!(!PublisherExists::::get(para_id)); + }); +} + +#[test] +fn handle_publish_respects_key_length_limit() { + new_test_ext(Default::default()).execute_with(|| { + let para_id = ParaId::from(1000); + + let long_key = vec![b'a'; 257]; + let data = vec![(long_key, b"value".to_vec())]; + + let result = Broadcaster::handle_publish(para_id, data); + assert!(result.is_err()); + assert!(!PublisherExists::::get(para_id)); + }); +} + +#[test] +fn handle_publish_respects_value_length_limit() { + new_test_ext(Default::default()).execute_with(|| { + let para_id = ParaId::from(1000); + + let long_value = vec![b'v'; 1025]; + let data = vec![(b"key".to_vec(), long_value)]; + + let result = Broadcaster::handle_publish(para_id, data); + assert!(result.is_err()); + assert!(!PublisherExists::::get(para_id)); + }); +} + +#[test] +fn max_stored_keys_limit_enforced() { + new_test_ext(Default::default()).execute_with(|| { + let para_id = ParaId::from(1000); + + for batch in 0..7 { + let mut data = Vec::new(); + for i in 0..16 { + let key_num = batch * 16 + i; + if key_num < 100 { + data.push((format!("key{}", key_num).into_bytes(), b"value".to_vec())); + } + } + if !data.is_empty() { + assert_ok!(Broadcaster::handle_publish(para_id, data)); + } + } + + let published_keys = PublishedKeys::::get(para_id); + assert_eq!(published_keys.len(), 100); + + let result = + Broadcaster::handle_publish(para_id, vec![(b"new_key".to_vec(), b"value".to_vec())]); + assert_err!(result, Error::::TooManyStoredKeys); + + let result = Broadcaster::handle_publish( + para_id, + vec![(b"key0".to_vec(), b"updated_value".to_vec())], + ); + assert_ok!(result); + + assert_eq!( + Broadcaster::get_published_value(para_id, b"key0"), + Some(b"updated_value".to_vec()) + ); + }); +} + +#[test] +fn published_keys_storage_matches_child_trie() { + new_test_ext(Default::default()).execute_with(|| { + let para_id = ParaId::from(1000); + + // Publish multiple batches to ensure consistency maintained across updates + let data1 = vec![ + (b"key1".to_vec(), b"value1".to_vec()), + (b"key2".to_vec(), b"value2".to_vec()), + ]; + Broadcaster::handle_publish(para_id, data1).unwrap(); + + // Update some keys, add new ones + let data2 = vec![ + (b"key1".to_vec(), b"updated_value1".to_vec()), + (b"key3".to_vec(), b"value3".to_vec()), + ]; + Broadcaster::handle_publish(para_id, data2).unwrap(); + + let tracked_keys = PublishedKeys::::get(para_id); + let actual_data = Broadcaster::get_all_published_data(para_id); + + // Counts must match + assert_eq!(tracked_keys.len(), actual_data.len()); + + // Every tracked key must exist in child trie + for tracked_key in tracked_keys.iter() { + let key: Vec = tracked_key.clone().into(); + assert!(actual_data.iter().any(|(k, _)| k == &key)); + } + + // Every child trie key must be tracked + for (actual_key, _) in actual_data.iter() { + assert!(tracked_keys.iter().any(|tracked| { + let k: Vec = tracked.clone().into(); + &k == actual_key + })); + } + }); +} + +#[test] +fn multiple_publishers_in_same_block() { + new_test_ext(Default::default()).execute_with(|| { + let para1 = ParaId::from(1000); + let para2 = ParaId::from(2000); + let para3 = ParaId::from(3000); + + // Multiple parachains publish data in the same block + let data1 = vec![(b"key1".to_vec(), b"value1".to_vec())]; + let data2 = vec![(b"key2".to_vec(), b"value2".to_vec())]; + let data3 = vec![(b"key3".to_vec(), b"value3".to_vec())]; + + Broadcaster::handle_publish(para1, data1).unwrap(); + Broadcaster::handle_publish(para2, data2).unwrap(); + Broadcaster::handle_publish(para3, data3).unwrap(); + + // Verify all three publishers exist + assert!(PublisherExists::::get(para1)); + assert!(PublisherExists::::get(para2)); + assert!(PublisherExists::::get(para3)); + + // Verify each para's data is independently accessible + assert_eq!(Broadcaster::get_published_value(para1, b"key1"), Some(b"value1".to_vec())); + assert_eq!(Broadcaster::get_published_value(para2, b"key2"), Some(b"value2".to_vec())); + assert_eq!(Broadcaster::get_published_value(para3, b"key3"), Some(b"value3".to_vec())); + + // Verify no cross-contamination + assert_eq!(Broadcaster::get_published_value(para1, b"key2"), None); + assert_eq!(Broadcaster::get_published_value(para2, b"key3"), None); + assert_eq!(Broadcaster::get_published_value(para3, b"key1"), None); + }); +} + +#[test] +fn get_all_published_data_map_returns_all_publishers() { + new_test_ext(Default::default()).execute_with(|| { + let para1 = ParaId::from(1000); + let para2 = ParaId::from(2000); + + // Publish data from two parachains + Broadcaster::handle_publish(para1, vec![(b"key1".to_vec(), b"value1".to_vec())]).unwrap(); + Broadcaster::handle_publish(para2, vec![(b"key2".to_vec(), b"value2".to_vec())]).unwrap(); + + // Get all published data + let all_data = Broadcaster::get_all_published_data_map(); + + // Should include both publishers + assert_eq!(all_data.len(), 2); + assert!(all_data.contains_key(¶1)); + assert!(all_data.contains_key(¶2)); + + // Verify data content + assert_eq!(all_data.get(¶1).unwrap(), &vec![(b"key1".to_vec(), b"value1".to_vec())]); + assert_eq!(all_data.get(¶2).unwrap(), &vec![(b"key2".to_vec(), b"value2".to_vec())]); + }); +} + +#[test] +fn subscribe_to_non_publishing_para_returns_empty() { + new_test_ext(Default::default()).execute_with(|| { + let subscriber = ParaId::from(1000); + let non_publisher = ParaId::from(9999); + + // Subscribe to a para that has never published anything + assert_ok!(Broadcaster::handle_subscribe_toggle(subscriber, non_publisher)); + + // Verify subscription was created + assert!(Broadcaster::is_subscribed(subscriber, non_publisher)); + + // Get subscribed data - should return empty for non-publishing para + let subscribed_data = Broadcaster::get_subscribed_data(subscriber); + + // Non-publishing para should not appear in results (filtered out by is_empty check) + assert!(!subscribed_data.contains_key(&non_publisher)); + assert_eq!(subscribed_data.len(), 0); + }); +} + +#[test] +fn subscribe_toggle_works() { + new_test_ext(Default::default()).execute_with(|| { + let subscriber = ParaId::from(1000); + let publisher = ParaId::from(2000); + + // Initially not subscribed + assert!(!Broadcaster::is_subscribed(subscriber, publisher)); + assert_eq!(Broadcaster::get_subscriptions(subscriber), vec![]); + + // First toggle: subscribe + assert_ok!(Broadcaster::handle_subscribe_toggle(subscriber, publisher)); + assert!(Broadcaster::is_subscribed(subscriber, publisher)); + assert_eq!(Broadcaster::get_subscriptions(subscriber), vec![publisher]); + + // Second toggle: unsubscribe + assert_ok!(Broadcaster::handle_subscribe_toggle(subscriber, publisher)); + assert!(!Broadcaster::is_subscribed(subscriber, publisher)); + assert_eq!(Broadcaster::get_subscriptions(subscriber), vec![]); + }); +} + +#[test] +fn multiple_subscriptions_work() { + new_test_ext(Default::default()).execute_with(|| { + let subscriber = ParaId::from(1000); + let publisher1 = ParaId::from(2000); + let publisher2 = ParaId::from(3000); + let publisher3 = ParaId::from(4000); + + // Subscribe to multiple publishers + assert_ok!(Broadcaster::handle_subscribe_toggle(subscriber, publisher1)); + assert_ok!(Broadcaster::handle_subscribe_toggle(subscriber, publisher2)); + assert_ok!(Broadcaster::handle_subscribe_toggle(subscriber, publisher3)); + + let subscriptions = Broadcaster::get_subscriptions(subscriber); + assert_eq!(subscriptions.len(), 3); + assert!(subscriptions.contains(&publisher1)); + assert!(subscriptions.contains(&publisher2)); + assert!(subscriptions.contains(&publisher3)); + + // Unsubscribe from middle one + assert_ok!(Broadcaster::handle_subscribe_toggle(subscriber, publisher2)); + + let subscriptions = Broadcaster::get_subscriptions(subscriber); + assert_eq!(subscriptions.len(), 2); + assert!(subscriptions.contains(&publisher1)); + assert!(!subscriptions.contains(&publisher2)); + assert!(subscriptions.contains(&publisher3)); + }); +} + +#[test] +fn max_subscriptions_limit_enforced() { + new_test_ext(Default::default()).execute_with(|| { + let subscriber = ParaId::from(1000); + + // Subscribe up to MaxSubscriptions (10 in mock) + for i in 0..10 { + let publisher = ParaId::from(2000 + i); + assert_ok!(Broadcaster::handle_subscribe_toggle(subscriber, publisher)); + } + + // Try to add one more subscription - should fail + let publisher = ParaId::from(3000); + assert_err!( + Broadcaster::handle_subscribe_toggle(subscriber, publisher), + Error::::TooManySubscriptions + ); + + // But can still unsubscribe and resubscribe + let existing_publisher = ParaId::from(2000); + assert_ok!(Broadcaster::handle_subscribe_toggle(subscriber, existing_publisher)); // Unsubscribe + assert_ok!(Broadcaster::handle_subscribe_toggle(subscriber, publisher)); // Subscribe to new one + }); +} diff --git a/polkadot/runtime/parachains/src/broadcaster/traits.rs b/polkadot/runtime/parachains/src/broadcaster/traits.rs new file mode 100644 index 0000000000000..8df081d41ee6d --- /dev/null +++ b/polkadot/runtime/parachains/src/broadcaster/traits.rs @@ -0,0 +1,33 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Traits for publish/subscribe operations in the broadcaster pallet. + +use alloc::vec::Vec; +use polkadot_primitives::Id as ParaId; +use sp_runtime::DispatchResult; + +/// Trait for handling publish and subscribe operations for parachains. +/// +/// This trait provides the interface for parachains to publish key-value data +/// and manage subscriptions to other parachains' published data. +pub trait PublishSubscribe { + /// Publish key-value data for a specific parachain. + fn publish_data(publisher: ParaId, data: Vec<(Vec, Vec)>) -> DispatchResult; + + /// Toggle subscription to a publisher's data. + fn toggle_subscription(subscriber: ParaId, publisher: ParaId) -> DispatchResult; +} diff --git a/polkadot/runtime/parachains/src/lib.rs b/polkadot/runtime/parachains/src/lib.rs index 1cd534257d7f9..0c7be5b0f6834 100644 --- a/polkadot/runtime/parachains/src/lib.rs +++ b/polkadot/runtime/parachains/src/lib.rs @@ -24,6 +24,7 @@ #![cfg_attr(not(feature = "std"), no_std)] pub mod assigner_coretime; +pub mod broadcaster; pub mod configuration; pub mod coretime; pub mod disputes; diff --git a/polkadot/runtime/parachains/src/mock.rs b/polkadot/runtime/parachains/src/mock.rs index cba63ae7b1b04..d3a72cc499eb5 100644 --- a/polkadot/runtime/parachains/src/mock.rs +++ b/polkadot/runtime/parachains/src/mock.rs @@ -17,7 +17,7 @@ //! Mocks for all the traits. use crate::{ - assigner_coretime, configuration, coretime, disputes, dmp, hrmp, + assigner_coretime, broadcaster, configuration, coretime, disputes, dmp, hrmp, inclusion::{self, AggregateMessageOrigin, UmpQueueId}, initializer, on_demand, origin, paras, paras::ParaKind, @@ -74,6 +74,7 @@ frame_support::construct_runtime!( Paras: paras, Configuration: configuration, ParasShared: shared, + Broadcaster: broadcaster, ParaInclusion: inclusion, ParaInherent: paras_inherent, Scheduler: scheduler, @@ -252,6 +253,24 @@ impl crate::paras::Config for Test { type AuthorizeCurrentCodeOrigin = EnsureRoot; } +parameter_types! { + pub const MaxPublishItems: u32 = 16; + pub const MaxKeyLength: u32 = 256; + pub const MaxValueLength: u32 = 1024; + pub const MaxStoredKeys: u32 = 100; + pub const MaxSubscriptions: u32 = 10; + pub const MaxPublishers: u32 = 1000; +} + +impl crate::broadcaster::Config for Test { + type MaxPublishItems = MaxPublishItems; + type MaxKeyLength = MaxKeyLength; + type MaxValueLength = MaxValueLength; + type MaxStoredKeys = MaxStoredKeys; + type MaxSubscriptions = MaxSubscriptions; + type MaxPublishers = MaxPublishers; +} + impl crate::dmp::Config for Test {} parameter_types! { diff --git a/polkadot/runtime/rococo/src/weights/xcm/mod.rs b/polkadot/runtime/rococo/src/weights/xcm/mod.rs index 36d818a87445d..0df81d3a19578 100644 --- a/polkadot/runtime/rococo/src/weights/xcm/mod.rs +++ b/polkadot/runtime/rococo/src/weights/xcm/mod.rs @@ -305,6 +305,12 @@ impl XcmWeightInfo for RococoXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + fn publish(data: &PublishData) -> Weight { + XcmGeneric::::publish(data.len() as u32) + } + fn subscribe(_: &u32) -> Weight { + XcmGeneric::::subscribe() + } } #[test] diff --git a/polkadot/runtime/rococo/src/weights/xcm/pallet_xcm_benchmarks_generic.rs b/polkadot/runtime/rococo/src/weights/xcm/pallet_xcm_benchmarks_generic.rs index 4268ce5612f52..1c77997da27c0 100644 --- a/polkadot/runtime/rococo/src/weights/xcm/pallet_xcm_benchmarks_generic.rs +++ b/polkadot/runtime/rococo/src/weights/xcm/pallet_xcm_benchmarks_generic.rs @@ -349,4 +349,12 @@ impl WeightInfo { // Minimum execution time: 766_000 picoseconds. Weight::from_parts(807_000, 0) } + pub fn publish(_n: u32, ) -> Weight { + // Template weights, not benchmarked + Weight::from_parts(100_000_000, 0) + } + pub fn subscribe() -> Weight { + // Template weights, not benchmarked + Weight::from_parts(100_000_000, 0) + } } diff --git a/polkadot/runtime/rococo/src/xcm_config.rs b/polkadot/runtime/rococo/src/xcm_config.rs index 87fc99eb32ad7..623750b0e6f26 100644 --- a/polkadot/runtime/rococo/src/xcm_config.rs +++ b/polkadot/runtime/rococo/src/xcm_config.rs @@ -18,8 +18,8 @@ use super::{ parachains_origin, AccountId, AllPalletsWithSystem, Balances, Dmp, Fellows, ParaId, Runtime, - RuntimeCall, RuntimeEvent, RuntimeOrigin, TransactionByteFee, Treasurer, Treasury, WeightToFee, - XcmPallet, + RuntimeCall, RuntimeEvent, RuntimeOrigin, TransactionByteFee, Treasurer, Treasury, + WeightToFee, XcmPallet, }; use crate::governance::StakingAdmin; @@ -227,6 +227,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = XcmPallet; + type BroadcastHandler = (); } parameter_types! { diff --git a/polkadot/runtime/test-runtime/src/xcm_config.rs b/polkadot/runtime/test-runtime/src/xcm_config.rs index 8d7e351d0d5be..4b43918733c91 100644 --- a/polkadot/runtime/test-runtime/src/xcm_config.rs +++ b/polkadot/runtime/test-runtime/src/xcm_config.rs @@ -158,6 +158,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = (); + type BroadcastHandler = (); } impl pallet_xcm::Config for crate::Runtime { diff --git a/polkadot/runtime/westend/src/weights/xcm/mod.rs b/polkadot/runtime/westend/src/weights/xcm/mod.rs index ba4502e228420..07313c6d125ee 100644 --- a/polkadot/runtime/westend/src/weights/xcm/mod.rs +++ b/polkadot/runtime/westend/src/weights/xcm/mod.rs @@ -21,7 +21,7 @@ use crate::Runtime; use alloc::vec::Vec; use frame_support::weights::Weight; use xcm::{ - latest::{prelude::*, QueryResponseInfo}, + latest::{prelude::*, QueryResponseInfo, PublishData}, DoubleEncoded, }; @@ -307,6 +307,15 @@ impl XcmWeightInfo for WestendXcmWeight { fn execute_with_origin(_: &Option, _: &Xcm) -> Weight { XcmGeneric::::execute_with_origin() } + + fn publish(_: &PublishData) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } + fn subscribe(_: &u32) -> Weight { + // TODO: Benchmark + Weight::from_parts(10_000_000, 0) + } } #[test] diff --git a/polkadot/runtime/westend/src/xcm_config.rs b/polkadot/runtime/westend/src/xcm_config.rs index a758d030de7de..67e838c843cb0 100644 --- a/polkadot/runtime/westend/src/xcm_config.rs +++ b/polkadot/runtime/westend/src/xcm_config.rs @@ -236,6 +236,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = XcmPallet; + type BroadcastHandler = (); } parameter_types! { diff --git a/polkadot/xcm/pallet-xcm-benchmarks/src/fungible/mock.rs b/polkadot/xcm/pallet-xcm-benchmarks/src/fungible/mock.rs index 9e06550b6b724..dc4c14e2f6432 100644 --- a/polkadot/xcm/pallet-xcm-benchmarks/src/fungible/mock.rs +++ b/polkadot/xcm/pallet-xcm-benchmarks/src/fungible/mock.rs @@ -122,6 +122,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = (); + type BroadcastHandler = (); } impl crate::Config for Test { diff --git a/polkadot/xcm/pallet-xcm-benchmarks/src/generic/benchmarking.rs b/polkadot/xcm/pallet-xcm-benchmarks/src/generic/benchmarking.rs index aefbada7429dd..9141fca025011 100644 --- a/polkadot/xcm/pallet-xcm-benchmarks/src/generic/benchmarking.rs +++ b/polkadot/xcm/pallet-xcm-benchmarks/src/generic/benchmarking.rs @@ -961,6 +961,59 @@ mod benchmarks { Ok(()) } + #[benchmark] + fn publish(n: Linear<1, { MaxPublishItems::get() }>) -> Result<(), BenchmarkError> { + use xcm::latest::{MaxPublishKeyLength, MaxPublishValueLength}; + + // The `Publish` instruction weight scales with the number of items published. + // Each item is benchmarked at maximum key and value lengths to represent worst-case + // storage operations. The actual weight formula will be `base_weight + n * per_item_weight`. + let max_key_len = MaxPublishKeyLength::get() as usize; + let max_value_len = MaxPublishValueLength::get() as usize; + + // Create publish data: n items, each with maximum key and value length + let data_vec: Vec<_> = (0..n) + .map(|i| { + ( + BoundedVec::try_from(vec![i as u8; max_key_len]).unwrap(), + BoundedVec::try_from(vec![i as u8; max_value_len]).unwrap(), + ) + }) + .collect(); + + let data = BoundedVec::try_from(data_vec).unwrap(); + + let origin = T::publish_origin()?; + let mut executor = new_executor::(origin); + + let instruction = Instruction::Publish { data }; + let xcm = Xcm(vec![instruction]); + + #[block] + { + executor.bench_process(xcm)?; + } + + Ok(()) + } + + #[benchmark] + fn subscribe() -> Result<(), BenchmarkError> { + let origin = T::publish_origin()?; + let publisher = T::valid_publisher()?; + + let mut executor = new_executor::(origin); + let instruction = Instruction::Subscribe { publisher }; + let xcm = Xcm(vec![instruction]); + + #[block] + { + executor.bench_process(xcm)?; + } + + Ok(()) + } + impl_benchmark_test_suite!( Pallet, crate::generic::mock::new_test_ext(), diff --git a/polkadot/xcm/pallet-xcm-benchmarks/src/generic/mock.rs b/polkadot/xcm/pallet-xcm-benchmarks/src/generic/mock.rs index 6368ca0e9c3f5..df05f68bdbd0f 100644 --- a/polkadot/xcm/pallet-xcm-benchmarks/src/generic/mock.rs +++ b/polkadot/xcm/pallet-xcm-benchmarks/src/generic/mock.rs @@ -112,6 +112,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = (); + type BroadcastHandler = (); } parameter_types! { @@ -193,6 +194,14 @@ impl generic::Config for Test { let target: Location = AccountId32 { network: None, id: [0; 32] }.into(); Ok((origin, target)) } + + fn publish_origin() -> Result { + Ok(Parachain(1000).into()) + } + + fn valid_publisher() -> Result { + Ok(2000) + } } #[cfg(feature = "runtime-benchmarks")] diff --git a/polkadot/xcm/pallet-xcm-benchmarks/src/generic/mod.rs b/polkadot/xcm/pallet-xcm-benchmarks/src/generic/mod.rs index d7471b02368fa..20180e98388bb 100644 --- a/polkadot/xcm/pallet-xcm-benchmarks/src/generic/mod.rs +++ b/polkadot/xcm/pallet-xcm-benchmarks/src/generic/mod.rs @@ -108,6 +108,22 @@ pub mod pallet { crate_version: as frame_support::traits::PalletInfoAccess>::crate_version(), } } + + /// Return a valid origin for `Publish` and `Subscribe` benchmarks. + /// + /// Should return a parachain origin that is allowed by the BroadcastHandler filter. + /// If set to `Err`, benchmarks which rely on publish/subscribe will be skipped. + fn publish_origin() -> Result { + Err(BenchmarkError::Skip) + } + + /// Return a valid publisher ID for the `Subscribe` benchmark. + /// + /// This should be a parachain ID that subscribers can listen to. + /// If set to `Err`, the subscribe benchmark will be skipped. + fn valid_publisher() -> Result { + Err(BenchmarkError::Skip) + } } #[pallet::pallet] diff --git a/polkadot/xcm/pallet-xcm/src/errors.rs b/polkadot/xcm/pallet-xcm/src/errors.rs index 84e544c08748e..34b502d6985a1 100644 --- a/polkadot/xcm/pallet-xcm/src/errors.rs +++ b/polkadot/xcm/pallet-xcm/src/errors.rs @@ -136,6 +136,12 @@ pub enum ExecutionError { /// Too many assets matched the given asset filter. #[codec(index = 35)] TooManyAssets, + /// Publishing data failed. + #[codec(index = 36)] + PublishFailed, + /// Subscribing to a publisher failed. + #[codec(index = 37)] + SubscribeFailed, // Errors that happen prior to instructions being executed. These fall outside of the XCM // spec. /// XCM version not able to be handled. @@ -198,6 +204,8 @@ impl From for ExecutionError { XcmError::Unanchored => Self::Unanchored, XcmError::NotDepositable => Self::NotDepositable, XcmError::TooManyAssets => Self::TooManyAssets, + XcmError::PublishFailed => Self::PublishFailed, + XcmError::SubscribeFailed => Self::SubscribeFailed, XcmError::UnhandledXcmVersion => Self::UnhandledXcmVersion, XcmError::WeightLimitReached(_) => Self::WeightLimitReached, XcmError::Barrier => Self::Barrier, diff --git a/polkadot/xcm/src/v4/mod.rs b/polkadot/xcm/src/v4/mod.rs index 502200e849405..6c86dd1035903 100644 --- a/polkadot/xcm/src/v4/mod.rs +++ b/polkadot/xcm/src/v4/mod.rs @@ -1431,6 +1431,14 @@ impl TryFrom> for Instructi tracing::debug!(target: "xcm::versions::v5tov4", ?new_instruction, "not supported by v4"); return Err(()); }, + Publish { .. } => { + tracing::debug!(target: "xcm::versions::v5tov4", ?new_instruction, "not supported by v4"); + return Err(()); + }, + Subscribe { .. } => { + tracing::debug!(target: "xcm::versions::v5tov4", ?new_instruction, "not supported by v4"); + return Err(()); + }, }) } } diff --git a/polkadot/xcm/src/v5/mod.rs b/polkadot/xcm/src/v5/mod.rs index 0caf7d0c581fe..a830af04220f5 100644 --- a/polkadot/xcm/src/v5/mod.rs +++ b/polkadot/xcm/src/v5/mod.rs @@ -186,8 +186,9 @@ pub mod prelude { InstructionError, InstructionIndex, InteriorLocation, Junction::{self, *}, Junctions::{self, Here}, - Location, MaxAssetTransferFilters, MaybeErrorCode, + Location, MaxAssetTransferFilters, MaxPublishItems, MaybeErrorCode, NetworkId::{self, *}, + PublishData, OriginKind, Outcome, PalletInfo, Parent, ParentThen, PreparedMessage, QueryId, QueryResponseInfo, Reanchorable, Response, Result as XcmResult, SendError, SendResult, SendXcm, Weight, @@ -211,8 +212,16 @@ parameter_types! { pub MaxPalletNameLen: u32 = 48; pub MaxPalletsInfo: u32 = 64; pub MaxAssetTransferFilters: u32 = 6; + pub MaxPublishItems: u32 = 16; + pub MaxPublishKeyLength: u32 = 32; + pub MaxPublishValueLength: u32 = 1024; } +pub type PublishData = BoundedVec< + (BoundedVec, BoundedVec), + MaxPublishItems, +>; + #[derive( Clone, Eq, PartialEq, Encode, Decode, DecodeWithMemTracking, Debug, TypeInfo, MaxEncodedLen, )] @@ -1139,6 +1148,43 @@ pub enum Instruction { /// - `hints`: A bounded vector of `ExecutionHint`, specifying the different hints that will /// be activated. SetHints { hints: BoundedVec }, + + /// Publish data to the relay chain for other parachains to access. + /// + /// This instruction allows parachains to publish key-value data pairs to the relay chain + /// which are stored in child tries on the relay chain indexed by the publisher's ParaId. + /// + /// - `data`: The key-value pairs to be published, bounded by MaxPublishItems + /// + /// Safety: Origin must be a parachain (Sovereign Account). The relay chain will validate + /// the origin and store data in the appropriate child trie. + /// + /// Kind: *Command* + /// + /// Errors: + /// - NoPermission: If origin is not authorized by the configured filter + /// - BadOrigin: If origin is not a valid parachain + /// - PublishFailed: If the underlying handler fails (e.g., key/value too long, too many items) + Publish { data: PublishData }, + + /// Toggle subscription to a publisher parachain's data. + /// + /// This instruction allows parachains to subscribe/unsubscribe to data published by + /// other parachains through the Publish instruction. If already subscribed, this will + /// unsubscribe. If not subscribed, this will subscribe. + /// + /// - `publisher`: The ID of the publisher parachain to toggle subscription for + /// + /// Safety: Origin must be a parachain. The relay chain will validate the origin and + /// manage the subscription state. + /// + /// Kind: *Command* + /// + /// Errors: + /// - NoPermission: If origin is not authorized by the configured filter + /// - BadOrigin: If origin is not a valid parachain + /// - SubscribeFailed: If the underlying handler fails (e.g., too many subscriptions) + Subscribe { publisher: u32 }, } #[derive( @@ -1241,6 +1287,8 @@ impl Instruction { InitiateTransfer { destination, remote_fees, preserve_origin, assets, remote_xcm }, ExecuteWithOrigin { descendant_origin, xcm } => ExecuteWithOrigin { descendant_origin, xcm: xcm.into() }, + Publish { data } => Publish { data }, + Subscribe { publisher } => Subscribe { publisher }, } } } @@ -1316,6 +1364,8 @@ impl> GetWeight for Instruction { W::initiate_transfer(destination, remote_fees, preserve_origin, assets, remote_xcm), ExecuteWithOrigin { descendant_origin, xcm } => W::execute_with_origin(descendant_origin, xcm), + Publish { data } => W::publish(data), + Subscribe { publisher } => W::subscribe(publisher), } } } diff --git a/polkadot/xcm/src/v5/traits.rs b/polkadot/xcm/src/v5/traits.rs index ecbf46f84d31b..b03de36219732 100644 --- a/polkadot/xcm/src/v5/traits.rs +++ b/polkadot/xcm/src/v5/traits.rs @@ -154,6 +154,12 @@ pub enum Error { /// Too many assets matched the given asset filter. #[codec(index = 35)] TooManyAssets, + /// Publishing data failed. + #[codec(index = 36)] + PublishFailed, + /// Subscribing to a publisher failed. + #[codec(index = 37)] + SubscribeFailed, // Errors that happen prior to instructions being executed. These fall outside of the XCM // spec. diff --git a/polkadot/xcm/xcm-builder/Cargo.toml b/polkadot/xcm/xcm-builder/Cargo.toml index 32f8f595031ef..15a395d7170d4 100644 --- a/polkadot/xcm/xcm-builder/Cargo.toml +++ b/polkadot/xcm/xcm-builder/Cargo.toml @@ -31,6 +31,8 @@ xcm-executor = { workspace = true } # Polkadot dependencies polkadot-parachain-primitives = { workspace = true } +polkadot-primitives = { workspace = true } +polkadot-runtime-parachains = { workspace = true } [dev-dependencies] pallet-assets = { workspace = true, default-features = true } @@ -71,6 +73,8 @@ std = [ "pallet-asset-conversion/std", "pallet-transaction-payment/std", "polkadot-parachain-primitives/std", + "polkadot-primitives/std", + "polkadot-runtime-parachains/std", "primitive-types/std", "scale-info/std", "sp-arithmetic/std", diff --git a/polkadot/xcm/xcm-builder/src/asset_exchange/single_asset_adapter/mock.rs b/polkadot/xcm/xcm-builder/src/asset_exchange/single_asset_adapter/mock.rs index 30136b004a480..ed26b8ab67a38 100644 --- a/polkadot/xcm/xcm-builder/src/asset_exchange/single_asset_adapter/mock.rs +++ b/polkadot/xcm/xcm-builder/src/asset_exchange/single_asset_adapter/mock.rs @@ -251,6 +251,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = (); + type BroadcastHandler = (); } /// Simple converter from a [`Location`] with an [`AccountIndex64`] junction and no parent to a diff --git a/polkadot/xcm/xcm-builder/src/broadcast_adapter.rs b/polkadot/xcm/xcm-builder/src/broadcast_adapter.rs new file mode 100644 index 0000000000000..6c49c0730e260 --- /dev/null +++ b/polkadot/xcm/xcm-builder/src/broadcast_adapter.rs @@ -0,0 +1,254 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Adapters for broadcast/publish operations in XCM. + +use alloc::vec::Vec; +use core::marker::PhantomData; +use frame_support::traits::Contains; +use polkadot_primitives::Id as ParaId; +use polkadot_runtime_parachains::broadcaster::PublishSubscribe; +use xcm::latest::prelude::XcmError; +use xcm::latest::{Junction, Location, PublishData, Result as XcmResult}; +use xcm_executor::traits::BroadcastHandler; + +/// Configurable broadcast adapter that validates parachain origins. +pub struct ParachainBroadcastAdapter(PhantomData<(Filter, Handler)>); + +impl BroadcastHandler for ParachainBroadcastAdapter +where + Filter: Contains, + Handler: PublishSubscribe, +{ + fn handle_publish(origin: &Location, data: PublishData) -> XcmResult { + // Check if origin is authorized to publish + if !Filter::contains(origin) { + return Err(XcmError::NoPermission); + } + + // Extract parachain ID from authorized origin + let para_id = match origin.unpack() { + (0, [Junction::Parachain(id)]) => ParaId::from(*id), // Direct parachain + (1, [Junction::Parachain(id), ..]) => ParaId::from(*id), // Sibling parachain + _ => return Err(XcmError::BadOrigin), // Should be caught by filter + }; + + // Call the actual handler + let data_vec: Vec<(Vec, Vec)> = data + .into_inner() + .into_iter() + .map(|(k, v)| (k.into_inner(), v.into_inner())) + .collect(); + Handler::publish_data(para_id, data_vec).map_err(|_| XcmError::PublishFailed) + } + + fn handle_subscribe(origin: &Location, publisher: u32) -> XcmResult { + // Check if origin is authorized to subscribe + if !Filter::contains(origin) { + return Err(XcmError::NoPermission); + } + + // Extract subscriber parachain ID from authorized origin + let subscriber_id = match origin.unpack() { + (0, [Junction::Parachain(id)]) => ParaId::from(*id), // Direct parachain + (1, [Junction::Parachain(id), ..]) => ParaId::from(*id), // Sibling parachain + _ => return Err(XcmError::BadOrigin), // Should be caught by filter + }; + + let publisher_id = ParaId::from(publisher); + + // Call the handler for subscribe toggle + Handler::toggle_subscription(subscriber_id, publisher_id) + .map_err(|_| XcmError::SubscribeFailed) + } +} + +/// Allows only direct parachains (parents=0, interior=[Parachain(_)]). +pub struct OnlyParachains; +impl Contains for OnlyParachains { + fn contains(origin: &Location) -> bool { + matches!(origin.unpack(), (0, [Junction::Parachain(_)])) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use frame_support::parameter_types; + use polkadot_runtime_parachains::broadcaster::PublishSubscribe; + use sp_runtime::BoundedVec; + use xcm::latest::prelude::XcmError; + use xcm::latest::{ + Junction, Location, MaxPublishKeyLength, MaxPublishValueLength, PublishData, + }; + + // Mock handler that tracks calls + parameter_types! { + pub static PublishCalls: Vec<(ParaId, Vec<(Vec, Vec)>)> = vec![]; + pub static SubscribeCalls: Vec<(ParaId, ParaId)> = vec![]; + } + + // Helper to create test publish data + fn test_publish_data(items: Vec<(&[u8], &[u8])>) -> PublishData { + items + .into_iter() + .map(|(k, v)| { + ( + BoundedVec::::try_from(k.to_vec()).unwrap(), + BoundedVec::::try_from(v.to_vec()).unwrap(), + ) + }) + .collect::>() + .try_into() + .unwrap() + } + + struct MockPublishHandler; + impl PublishSubscribe for MockPublishHandler { + fn publish_data( + publisher: ParaId, + data: Vec<(Vec, Vec)>, + ) -> Result<(), sp_runtime::DispatchError> { + let mut calls = PublishCalls::get(); + calls.push((publisher, data)); + PublishCalls::set(calls); + Ok(()) + } + + fn toggle_subscription( + subscriber: ParaId, + publisher: ParaId, + ) -> Result<(), sp_runtime::DispatchError> { + let mut calls = SubscribeCalls::get(); + calls.push((subscriber, publisher)); + SubscribeCalls::set(calls); + Ok(()) + } + } + + #[test] + fn publish_from_direct_parachain_works() { + PublishCalls::set(vec![]); + let origin = Location::new(0, [Junction::Parachain(1000)]); + let data = test_publish_data(vec![(b"key1", b"value1")]); + + let result = ParachainBroadcastAdapter::::handle_publish( + &origin, + data.clone(), + ); + + assert!(result.is_ok()); + let calls = PublishCalls::get(); + assert_eq!(calls.len(), 1); + assert_eq!(calls[0].0, ParaId::from(1000)); + assert_eq!(calls[0].1, vec![(b"key1".to_vec(), b"value1".to_vec())]); + } + + #[test] + fn publish_from_sibling_parachain_fails() { + PublishCalls::set(vec![]); + let origin = Location::new( + 1, + [Junction::Parachain(2000), Junction::AccountId32 { network: None, id: [1; 32] }], + ); + let data = test_publish_data(vec![(b"key1", b"value1")]); + + let result = ParachainBroadcastAdapter::::handle_publish( + &origin, + data.clone(), + ); + + assert!(matches!(result, Err(XcmError::NoPermission))); + assert!(PublishCalls::get().is_empty()); + } + + #[test] + fn publish_from_non_parachain_fails() { + PublishCalls::set(vec![]); + let origin = Location::here(); + let data = test_publish_data(vec![(b"key1", b"value1")]); + + let result = + ParachainBroadcastAdapter::::handle_publish( + &origin, data, + ); + + assert!(matches!(result, Err(XcmError::NoPermission))); + assert!(PublishCalls::get().is_empty()); + } + + #[test] + fn only_parachains_filter_works() { + // Direct parachain allowed + assert!(OnlyParachains::contains(&Location::new(0, [Junction::Parachain(1000)]))); + + // Sibling parachain not allowed + assert!(!OnlyParachains::contains(&Location::new(1, [Junction::Parachain(1000)]))); + + // Root not allowed + assert!(!OnlyParachains::contains(&Location::here())); + } + + #[test] + fn subscribe_from_direct_parachain_works() { + SubscribeCalls::set(vec![]); + let origin = Location::new(0, [Junction::Parachain(1000)]); + let publisher = 2000; + + let result = + ParachainBroadcastAdapter::::handle_subscribe( + &origin, publisher, + ); + + assert!(result.is_ok()); + let calls = SubscribeCalls::get(); + assert_eq!(calls.len(), 1); + assert_eq!(calls[0], (ParaId::from(1000), ParaId::from(2000))); + } + + #[test] + fn subscribe_from_sibling_parachain_fails() { + SubscribeCalls::set(vec![]); + let origin = Location::new( + 1, + [Junction::Parachain(3000), Junction::AccountId32 { network: None, id: [1; 32] }], + ); + let publisher = 2000; + + let result = + ParachainBroadcastAdapter::::handle_subscribe( + &origin, publisher, + ); + + assert!(matches!(result, Err(XcmError::NoPermission))); + assert!(SubscribeCalls::get().is_empty()); + } + + #[test] + fn subscribe_from_non_parachain_fails() { + SubscribeCalls::set(vec![]); + let origin = Location::here(); + let publisher = 2000; + + let result = + ParachainBroadcastAdapter::::handle_subscribe( + &origin, publisher, + ); + + assert!(matches!(result, Err(XcmError::NoPermission))); + assert!(SubscribeCalls::get().is_empty()); + } +} diff --git a/polkadot/xcm/xcm-builder/src/lib.rs b/polkadot/xcm/xcm-builder/src/lib.rs index 83fb34bd6569f..d01628f4ea20d 100644 --- a/polkadot/xcm/xcm-builder/src/lib.rs +++ b/polkadot/xcm/xcm-builder/src/lib.rs @@ -49,6 +49,9 @@ pub use barriers::{ TakeWeightCredit, TrailingSetTopicAsId, WithComputedOrigin, }; +mod broadcast_adapter; +pub use broadcast_adapter::{OnlyParachains, ParachainBroadcastAdapter}; + mod controller; pub use controller::{ Controller, ExecuteController, ExecuteControllerWeightInfo, QueryController, diff --git a/polkadot/xcm/xcm-builder/src/test_utils.rs b/polkadot/xcm/xcm-builder/src/test_utils.rs index 90afb2c9a3d3e..ec5230fe8c791 100644 --- a/polkadot/xcm/xcm-builder/src/test_utils.rs +++ b/polkadot/xcm/xcm-builder/src/test_utils.rs @@ -16,13 +16,13 @@ // Shared test utilities and implementations for the XCM Builder. -use alloc::vec::Vec; +use alloc::{collections::BTreeMap, vec::Vec}; use frame_support::{ parameter_types, traits::{Contains, CrateVersion, PalletInfoData, PalletsInfoAccess}, }; pub use xcm::latest::{prelude::*, Weight}; -use xcm_executor::traits::{ClaimAssets, DropAssets, VersionChangeNotifier}; +use xcm_executor::traits::{BroadcastHandler, ClaimAssets, DropAssets, VersionChangeNotifier}; pub use xcm_executor::{ traits::{ AssetExchange, AssetLock, ConvertOrigin, Enact, LockError, OnResponse, TransactAsset, @@ -33,6 +33,10 @@ pub use xcm_executor::{ parameter_types! { pub static SubscriptionRequests: Vec<(Location, Option<(QueryId, Weight)>)> = vec![]; pub static MaxAssetsIntoHolding: u32 = 4; + // Maps ParaId => Vec<(key, value)> + pub static PublishedData: BTreeMap, Vec)>> = BTreeMap::new(); + // Maps subscriber ParaId => Vec + pub static BroadcastSubscriptions: BTreeMap> = BTreeMap::new(); } pub struct TestSubscriptionService; @@ -62,6 +66,54 @@ impl VersionChangeNotifier for TestSubscriptionService { } } +pub struct TestBroadcastHandler; + +impl BroadcastHandler for TestBroadcastHandler { + fn handle_publish(origin: &Location, data: PublishData) -> XcmResult { + // Extract para_id from origin + let para_id = match origin.unpack() { + (0, [Parachain(id)]) => *id, + (1, [Parachain(id), ..]) => *id, + _ => return Err(XcmError::BadOrigin), + }; + + let mut published = PublishedData::get(); + let data_vec: Vec<(Vec, Vec)> = data + .into_inner() + .into_iter() + .map(|(k, v)| (k.into_inner(), v.into_inner())) + .collect(); + + // Merge with existing data for this parachain + published.entry(para_id).or_insert_with(Vec::new).extend(data_vec); + PublishedData::set(published); + + Ok(()) + } + + fn handle_subscribe(origin: &Location, publisher: u32) -> XcmResult { + // Extract subscriber para_id from origin + let subscriber_id = match origin.unpack() { + (0, [Parachain(id)]) => *id, + (1, [Parachain(id), ..]) => *id, + _ => return Err(XcmError::BadOrigin), + }; + + let mut subscriptions = BroadcastSubscriptions::get(); + let subscriber_list = subscriptions.entry(subscriber_id).or_insert_with(Vec::new); + + // Toggle subscription + if let Some(pos) = subscriber_list.iter().position(|&p| p == publisher) { + subscriber_list.remove(pos); + } else { + subscriber_list.push(publisher); + } + + BroadcastSubscriptions::set(subscriptions); + Ok(()) + } +} + parameter_types! { pub static TrappedAssets: Vec<(Location, Assets)> = vec![]; } diff --git a/polkadot/xcm/xcm-builder/src/tests/mock.rs b/polkadot/xcm/xcm-builder/src/tests/mock.rs index b932aaee6fcf8..75758cca65690 100644 --- a/polkadot/xcm/xcm-builder/src/tests/mock.rs +++ b/polkadot/xcm/xcm-builder/src/tests/mock.rs @@ -771,6 +771,7 @@ impl Config for TestConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = (); + type BroadcastHandler = TestBroadcastHandler; } pub fn fungible_multi_asset(location: Location, amount: u128) -> Asset { diff --git a/polkadot/xcm/xcm-builder/src/tests/mod.rs b/polkadot/xcm/xcm-builder/src/tests/mod.rs index 379baaf5e3767..d7f727c830b3c 100644 --- a/polkadot/xcm/xcm-builder/src/tests/mod.rs +++ b/polkadot/xcm/xcm-builder/src/tests/mod.rs @@ -35,6 +35,7 @@ mod expecting; mod locking; mod origins; mod pay; +mod publish_subscribe; mod querying; mod routing; mod transacting; diff --git a/polkadot/xcm/xcm-builder/src/tests/pay/mock.rs b/polkadot/xcm/xcm-builder/src/tests/pay/mock.rs index d8f8e15f5eb05..8d760f56e1c67 100644 --- a/polkadot/xcm/xcm-builder/src/tests/pay/mock.rs +++ b/polkadot/xcm/xcm-builder/src/tests/pay/mock.rs @@ -250,6 +250,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = XcmPallet; + type BroadcastHandler = (); } parameter_types! { diff --git a/polkadot/xcm/xcm-builder/src/tests/publish_subscribe.rs b/polkadot/xcm/xcm-builder/src/tests/publish_subscribe.rs new file mode 100644 index 0000000000000..221f754892392 --- /dev/null +++ b/polkadot/xcm/xcm-builder/src/tests/publish_subscribe.rs @@ -0,0 +1,270 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Tests for Publish and Subscribe XCM instructions. + +use super::*; +use crate::test_utils::{BroadcastSubscriptions, PublishedData}; +use sp_runtime::BoundedVec; +use xcm::latest::{MaxPublishKeyLength, MaxPublishValueLength}; + +// Helper to create test publish data +fn test_publish_data(items: Vec<(&[u8], &[u8])>) -> PublishData { + items + .into_iter() + .map(|(k, v)| { + ( + BoundedVec::::try_from(k.to_vec()).unwrap(), + BoundedVec::::try_from(v.to_vec()).unwrap(), + ) + }) + .collect::>() + .try_into() + .unwrap() +} + +#[test] +fn publish_from_parachain_works() { + // Allow unpaid execution from Parachain(1000) + AllowUnpaidFrom::set(vec![Parachain(1000).into()]); + + let data = test_publish_data(vec![(b"key1", b"value1")]); + + let message = Xcm::(vec![Publish { data: data.clone() }]); + let mut hash = fake_message_hash(&message); + let weight_limit = Weight::from_parts(10, 10); + + let r = XcmExecutor::::prepare_and_execute( + Parachain(1000), + message, + &mut hash, + weight_limit, + Weight::zero(), + ); + + assert_eq!(r, Outcome::Complete { used: Weight::from_parts(10, 10) }); + + // Verify data was published + let published = PublishedData::get(); + assert_eq!(published.get(&1000).unwrap().len(), 1); + assert_eq!(published.get(&1000).unwrap()[0], (b"key1".to_vec(), b"value1".to_vec())); +} + +#[test] +fn publish_from_non_parachain_fails() { + // Allow unpaid execution from Parent to test that origin validation happens + AllowUnpaidFrom::set(vec![Parent.into()]); + + let data = test_publish_data(vec![(b"key1", b"value1")]); + + let message = Xcm::(vec![Publish { data }]); + let mut hash = fake_message_hash(&message); + let weight_limit = Weight::from_parts(10, 10); + + // Try from Parent (not a parachain) + let r = XcmExecutor::::prepare_and_execute( + Parent, + message, + &mut hash, + weight_limit, + Weight::zero(), + ); + + assert_eq!( + r, + Outcome::Incomplete { + used: Weight::from_parts(10, 10), + error: InstructionError { index: 0, error: XcmError::BadOrigin }, + } + ); +} + +#[test] +fn publish_without_origin_fails() { + // Allow unpaid execution from Parachain(1000) + AllowUnpaidFrom::set(vec![Parachain(1000).into()]); + + let data = test_publish_data(vec![(b"key1", b"value1")]); + + let message = Xcm::(vec![ClearOrigin, Publish { data }]); + let mut hash = fake_message_hash(&message); + let weight_limit = Weight::from_parts(20, 20); + + let r = XcmExecutor::::prepare_and_execute( + Parachain(1000), + message, + &mut hash, + weight_limit, + Weight::zero(), + ); + + assert_eq!( + r, + Outcome::Incomplete { + used: Weight::from_parts(20, 20), + error: InstructionError { index: 1, error: XcmError::BadOrigin }, + } + ); +} + +#[test] +fn publish_multiple_items_works() { + // Allow unpaid execution from Parachain(1000) + AllowUnpaidFrom::set(vec![Parachain(1000).into()]); + + let data = test_publish_data(vec![ + (b"key1", b"value1"), + (b"key2", b"value2"), + ]); + + let message = Xcm::(vec![Publish { data: data.clone() }]); + let mut hash = fake_message_hash(&message); + let weight_limit = Weight::from_parts(10, 10); + + let r = XcmExecutor::::prepare_and_execute( + Parachain(1000), + message, + &mut hash, + weight_limit, + Weight::zero(), + ); + + assert_eq!(r, Outcome::Complete { used: Weight::from_parts(10, 10) }); + + // Verify all data was published + let published = PublishedData::get(); + let para_data = published.get(&1000).unwrap(); + assert_eq!(para_data.len(), 2); + assert!(para_data.contains(&(b"key1".to_vec(), b"value1".to_vec()))); + assert!(para_data.contains(&(b"key2".to_vec(), b"value2".to_vec()))); +} + +#[test] +fn subscribe_from_parachain_works() { + // Allow unpaid execution from Parachain(1000) + AllowUnpaidFrom::set(vec![Parachain(1000).into()]); + + let message = Xcm::(vec![Subscribe { publisher: 2000 }]); + let mut hash = fake_message_hash(&message); + let weight_limit = Weight::from_parts(10, 10); + + let r = XcmExecutor::::prepare_and_execute( + Parachain(1000), + message, + &mut hash, + weight_limit, + Weight::zero(), + ); + + assert_eq!(r, Outcome::Complete { used: Weight::from_parts(10, 10) }); + + // Verify subscription was created + let subscriptions = BroadcastSubscriptions::get(); + assert_eq!(subscriptions.get(&1000).unwrap(), &vec![2000]); +} + +#[test] +fn subscribe_toggle_unsubscribes() { + // Allow unpaid execution from Parachain(1000) + AllowUnpaidFrom::set(vec![Parachain(1000).into()]); + + // First subscribe + let message1 = Xcm::(vec![Subscribe { publisher: 2000 }]); + let mut hash1 = fake_message_hash(&message1); + let weight_limit = Weight::from_parts(10, 10); + + let r = XcmExecutor::::prepare_and_execute( + Parachain(1000), + message1, + &mut hash1, + weight_limit, + Weight::zero(), + ); + assert_eq!(r, Outcome::Complete { used: Weight::from_parts(10, 10) }); + + // Verify subscribed + let subscriptions = BroadcastSubscriptions::get(); + assert_eq!(subscriptions.get(&1000).unwrap(), &vec![2000]); + + // Subscribe again to toggle (unsubscribe) + let message2 = Xcm::(vec![Subscribe { publisher: 2000 }]); + let mut hash2 = fake_message_hash(&message2); + + let r = XcmExecutor::::prepare_and_execute( + Parachain(1000), + message2, + &mut hash2, + weight_limit, + Weight::zero(), + ); + assert_eq!(r, Outcome::Complete { used: Weight::from_parts(10, 10) }); + + // Verify unsubscribed + let subscriptions = BroadcastSubscriptions::get(); + assert!(subscriptions.get(&1000).unwrap().is_empty()); +} + +#[test] +fn subscribe_from_non_parachain_fails() { + // Allow unpaid execution from Parent to test that origin validation happens + AllowUnpaidFrom::set(vec![Parent.into()]); + + let message = Xcm::(vec![Subscribe { publisher: 2000 }]); + let mut hash = fake_message_hash(&message); + let weight_limit = Weight::from_parts(10, 10); + + let r = XcmExecutor::::prepare_and_execute( + Parent, + message, + &mut hash, + weight_limit, + Weight::zero(), + ); + + assert_eq!( + r, + Outcome::Incomplete { + used: Weight::from_parts(10, 10), + error: InstructionError { index: 0, error: XcmError::BadOrigin }, + } + ); +} + +#[test] +fn subscribe_without_origin_fails() { + // Allow unpaid execution from Parachain(1000) + AllowUnpaidFrom::set(vec![Parachain(1000).into()]); + + let message = Xcm::(vec![ClearOrigin, Subscribe { publisher: 2000 }]); + let mut hash = fake_message_hash(&message); + let weight_limit = Weight::from_parts(20, 20); + + let r = XcmExecutor::::prepare_and_execute( + Parachain(1000), + message, + &mut hash, + weight_limit, + Weight::zero(), + ); + + assert_eq!( + r, + Outcome::Incomplete { + used: Weight::from_parts(20, 20), + error: InstructionError { index: 1, error: XcmError::BadOrigin }, + } + ); +} diff --git a/polkadot/xcm/xcm-builder/tests/mock/mod.rs b/polkadot/xcm/xcm-builder/tests/mock/mod.rs index 7a2eb8cc55adf..c3b76f74e3dfa 100644 --- a/polkadot/xcm/xcm-builder/tests/mock/mod.rs +++ b/polkadot/xcm/xcm-builder/tests/mock/mod.rs @@ -196,6 +196,7 @@ impl xcm_executor::Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = XcmPallet; + type BroadcastHandler = (); } /// Converts a local signed origin into an XCM location. Forms the basis for local origins diff --git a/polkadot/xcm/xcm-builder/tests/scenarios.rs b/polkadot/xcm/xcm-builder/tests/scenarios.rs index c772a49fc8226..65622eef71038 100644 --- a/polkadot/xcm/xcm-builder/tests/scenarios.rs +++ b/polkadot/xcm/xcm-builder/tests/scenarios.rs @@ -403,6 +403,7 @@ fn recursive_xcm_execution_fail() { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = XcmPallet; + type BroadcastHandler = (); } let para_acc: AccountId = ParaId::from(PARA_ID).into_account_truncating(); diff --git a/polkadot/xcm/xcm-executor/src/config.rs b/polkadot/xcm/xcm-executor/src/config.rs index 60a5ed63f32ee..59fcf1fe893e1 100644 --- a/polkadot/xcm/xcm-executor/src/config.rs +++ b/polkadot/xcm/xcm-executor/src/config.rs @@ -15,10 +15,10 @@ // along with Polkadot. If not, see . use crate::traits::{ - AssetExchange, AssetLock, CallDispatcher, ClaimAssets, ConvertOrigin, DropAssets, EventEmitter, - ExportXcm, FeeManager, HandleHrmpChannelAccepted, HandleHrmpChannelClosing, - HandleHrmpNewChannelOpenRequest, OnResponse, ProcessTransaction, RecordXcm, ShouldExecute, - TransactAsset, VersionChangeNotifier, WeightBounds, WeightTrader, + AssetExchange, AssetLock, BroadcastHandler, CallDispatcher, ClaimAssets, ConvertOrigin, + DropAssets, EventEmitter, ExportXcm, FeeManager, HandleHrmpChannelAccepted, + HandleHrmpChannelClosing, HandleHrmpNewChannelOpenRequest, OnResponse, ProcessTransaction, + RecordXcm, ShouldExecute, TransactAsset, VersionChangeNotifier, WeightBounds, WeightTrader, }; use frame_support::{ dispatch::{GetDispatchInfo, Parameter, PostDispatchInfo}, @@ -134,4 +134,6 @@ pub trait Config { type HrmpChannelClosingHandler: HandleHrmpChannelClosing; /// Allows recording the last executed XCM (used by dry-run runtime APIs). type XcmRecorder: RecordXcm; + /// Handler for publish/subscribe operations on the relay chain. + type BroadcastHandler: BroadcastHandler; } diff --git a/polkadot/xcm/xcm-executor/src/lib.rs b/polkadot/xcm/xcm-executor/src/lib.rs index 1c569225ce2b6..886e1f0f3c4a3 100644 --- a/polkadot/xcm/xcm-executor/src/lib.rs +++ b/polkadot/xcm/xcm-executor/src/lib.rs @@ -33,11 +33,11 @@ use xcm::latest::{prelude::*, AssetTransferFilter}; pub mod traits; use traits::{ - validate_export, AssetExchange, AssetLock, CallDispatcher, ClaimAssets, ConvertOrigin, - DropAssets, Enact, EventEmitter, ExportXcm, FeeManager, FeeReason, HandleHrmpChannelAccepted, - HandleHrmpChannelClosing, HandleHrmpNewChannelOpenRequest, OnResponse, ProcessTransaction, - Properties, ShouldExecute, TransactAsset, VersionChangeNotifier, WeightBounds, WeightTrader, - XcmAssetTransfers, + validate_export, AssetExchange, AssetLock, BroadcastHandler, CallDispatcher, ClaimAssets, + ConvertOrigin, DropAssets, Enact, EventEmitter, ExportXcm, FeeManager, FeeReason, + HandleHrmpChannelAccepted, HandleHrmpChannelClosing, HandleHrmpNewChannelOpenRequest, + OnResponse, ProcessTransaction, Properties, ShouldExecute, TransactAsset, + VersionChangeNotifier, WeightBounds, WeightTrader, XcmAssetTransfers, }; pub use traits::RecordXcm; @@ -1819,6 +1819,16 @@ impl XcmExecutor { Config::TransactionalProcessor::process(|| { Config::HrmpChannelClosingHandler::handle(initiator, sender, recipient) }), + Publish { data } => { + let origin = self.origin_ref().ok_or(XcmError::BadOrigin)?; + Config::BroadcastHandler::handle_publish(origin, data)?; + Ok(()) + }, + Subscribe { publisher } => { + let origin = self.origin_ref().ok_or(XcmError::BadOrigin)?; + Config::BroadcastHandler::handle_subscribe(origin, publisher)?; + Ok(()) + }, } } diff --git a/polkadot/xcm/xcm-executor/src/tests/mock.rs b/polkadot/xcm/xcm-executor/src/tests/mock.rs index 850629bef8c06..ef8789f8b1713 100644 --- a/polkadot/xcm/xcm-executor/src/tests/mock.rs +++ b/polkadot/xcm/xcm-executor/src/tests/mock.rs @@ -328,4 +328,5 @@ impl Config for XcmConfig { type HrmpChannelAcceptedHandler = (); type HrmpChannelClosingHandler = (); type XcmRecorder = (); + type BroadcastHandler = (); } diff --git a/polkadot/xcm/xcm-executor/src/traits/broadcast_handler.rs b/polkadot/xcm/xcm-executor/src/traits/broadcast_handler.rs new file mode 100644 index 0000000000000..0b489f5abecae --- /dev/null +++ b/polkadot/xcm/xcm-executor/src/traits/broadcast_handler.rs @@ -0,0 +1,43 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Traits for handling publish/subscribe operations in XCM. + +use xcm::latest::{Location, PublishData, Result as XcmResult}; + +/// Trait for handling publish/subscribe operations on the relay chain. +pub trait BroadcastHandler { + /// Handle publish operation from the given origin. + /// Should validate origin authorization and extract necessary data. + fn handle_publish(origin: &Location, data: PublishData) -> XcmResult; + + /// Handle subscribe/unsubscribe operation from the given origin. + /// Toggles subscription state for the given publisher. + fn handle_subscribe(origin: &Location, publisher: u32) -> XcmResult; +} + +/// Implementation of `BroadcastHandler` for the unit type `()`. +impl BroadcastHandler for () { + fn handle_publish(_origin: &Location, _data: PublishData) -> XcmResult { + // No-op implementation for unit type + Ok(()) + } + + fn handle_subscribe(_origin: &Location, _publisher: u32) -> XcmResult { + // No-op implementation for unit type + Ok(()) + } +} diff --git a/polkadot/xcm/xcm-executor/src/traits/mod.rs b/polkadot/xcm/xcm-executor/src/traits/mod.rs index 038de83e3fa37..5149eacea1421 100644 --- a/polkadot/xcm/xcm-executor/src/traits/mod.rs +++ b/polkadot/xcm/xcm-executor/src/traits/mod.rs @@ -50,6 +50,8 @@ mod hrmp; pub use hrmp::{ HandleHrmpChannelAccepted, HandleHrmpChannelClosing, HandleHrmpNewChannelOpenRequest, }; +mod broadcast_handler; +pub use broadcast_handler::BroadcastHandler; mod event_emitter; mod record_xcm; mod weight; @@ -62,9 +64,9 @@ pub use weight::{WeightBounds, WeightTrader}; pub mod prelude { pub use super::{ - export_xcm, validate_export, AssetExchange, AssetLock, ClaimAssets, ConvertOrigin, - DropAssets, Enact, Error, EventEmitter, ExportXcm, FeeManager, FeeReason, LockError, - MatchesFungible, MatchesFungibles, MatchesInstance, MatchesNonFungible, + export_xcm, validate_export, AssetExchange, AssetLock, BroadcastHandler, ClaimAssets, + ConvertOrigin, DropAssets, Enact, Error, EventEmitter, ExportXcm, FeeManager, FeeReason, + LockError, MatchesFungible, MatchesFungibles, MatchesInstance, MatchesNonFungible, MatchesNonFungibles, OnResponse, ProcessTransaction, ShouldExecute, TransactAsset, VersionChangeNotifier, WeightBounds, WeightTrader, WithOriginFilter, };