PoC #1
Signed-off-by: Andrei Sandu <[email protected]>
sandreim committed May 24, 2022
1 parent 851fc93 commit 0031a53
Showing 3 changed files with 157 additions and 33 deletions.
178 changes: 148 additions & 30 deletions runtime/parachains/src/dmp.rs
@@ -21,25 +21,33 @@ use crate::{
use frame_support::pallet_prelude::*;
use primitives::v2::{DownwardMessage, Hash, Id as ParaId, InboundDownwardMessage};
use sp_runtime::traits::{BlakeTwo256, Hash as HashT, SaturatedConversion};
use sp_std::{fmt, prelude::*};
use sp_std::{fmt, num::Wrapping, prelude::*};
use xcm::latest::SendError;

pub use pallet::*;

#[cfg(test)]
mod tests;

/// The storage key identifying one fragment of a para's downward message queue.
#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug, TypeInfo)]
pub struct QueueFragmentId(ParaId, u8);

/// An error sending a downward message.
#[cfg_attr(test, derive(Debug))]
pub enum QueueDownwardMessageError {
/// The message being sent exceeds the configured max message size.
ExceedsMaxMessageSize,
/// The message cannot be sent because the destination parachain message queue is full.
ExceedsMaxPendingMessageCount,
}

impl From<QueueDownwardMessageError> for SendError {
fn from(err: QueueDownwardMessageError) -> Self {
match err {
QueueDownwardMessageError::ExceedsMaxMessageSize => SendError::ExceedsMaxMessageSize,
QueueDownwardMessageError::ExceedsMaxPendingMessageCount =>
SendError::ExceedsMaxPendingMessageCount,
}
}
}
@@ -68,6 +76,22 @@ impl fmt::Debug for ProcessedDownwardMessagesAcceptanceErr {
}
}

/// A fragment of a para's downward message queue (DMQ).
pub type QueueFragment<T: Config> = Vec<InboundDownwardMessage<T::BlockNumber>>;

/// To reduce the runtime memory footprint when sending or receiving messages, we split
/// each queue into fragments of `QUEUE_FRAGMENT_SIZE` capacity. Tuning this constant trades
/// off the overhead per stored message against the memory footprint of individual
/// message reads/writes from/to storage. The fragments form a per-para circular buffer,
/// and we keep track of its head and tail fragments.
///
/// TODO(maybe) - make these configuration parameters?
///
/// Defines the queue fragment capacity.
pub const QUEUE_FRAGMENT_SIZE: u32 = 32;
/// Defines the maximum number of fragments to process when calling `dmq_contents`.
pub const MAX_FRAGMENTS_PER_QUERY: u32 = 8;

#[frame_support::pallet]
pub mod pallet {
use super::*;
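
The head/tail bookkeeping above implies a hard cap on pending messages. Below is a minimal standalone sketch (not part of this diff; names are illustrative) of the index arithmetic, using `Wrapping<u8>` the way the pallet does:

use std::num::Wrapping; // the runtime itself uses sp_std::num::Wrapping

const QUEUE_FRAGMENT_SIZE: u32 = 32;

// `head` is the oldest live fragment; `tail` points one past the newest,
// so the live fragments are exactly the half-open range [head, tail).
fn fragments_in_use(head: Wrapping<u8>, tail: Wrapping<u8>) -> u32 {
    (tail - head).0 as u32
}

fn main() {
    // Indices wrap modulo 256, so a tail that wrapped past 255 still works:
    let (head, tail) = (Wrapping(250u8), Wrapping(2u8));
    assert_eq!(fragments_in_use(head, tail), 8);
    // `head == tail` means empty; `tail + Wrapping(1) == head` means full,
    // leaving at most 255 live fragments, i.e. a cap of
    // 255 * QUEUE_FRAGMENT_SIZE = 8160 pending messages per para.
    assert_eq!(255 * QUEUE_FRAGMENT_SIZE, 8160);
}
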
@@ -80,15 +104,20 @@ pub mod pallet {
#[pallet::config]
pub trait Config: frame_system::Config + configuration::Config {}

#[pallet::storage]
#[pallet::getter(fn dmp_queue_head)]
pub(super) type DownwardMessageQueueHead<T: Config> =
StorageMap<_, Twox64Concat, ParaId, u8, ValueQuery>;

#[pallet::storage]
#[pallet::getter(fn dmp_queue_tail)]
pub(super) type DownwardMessageQueueTail<T: Config> =
StorageMap<_, Twox64Concat, ParaId, u8, ValueQuery>;

/// The downward messages addressed for a certain para, stored in queue fragments.
#[pallet::storage]
pub(crate) type DownwardMessageQueues<T: Config> = StorageMap<
_,
Twox64Concat,
ParaId,
Vec<InboundDownwardMessage<T::BlockNumber>>,
ValueQuery,
>;
pub(crate) type DownwardMessageQueues<T: Config> =
StorageMap<_, Twox64Concat, QueueFragmentId, QueueFragment<T>, ValueQuery>;

/// A mapping that stores the downward message queue MQC head for each para.
///
@@ -131,9 +160,27 @@ impl<T: Config> Pallet<T> {
}
}

fn update_head(para: &ParaId, new_head: Wrapping<u8>) -> Weight {
<Self as Store>::DownwardMessageQueueHead::mutate(para, |head_pointer| {
*head_pointer = new_head.0;
});

T::DbWeight::get().reads_writes(1, 1)
}

fn update_tail(para: &ParaId, new_tail: Wrapping<u8>) -> Weight {
<Self as Store>::DownwardMessageQueueTail::mutate(para, |tail_pointer| {
*tail_pointer = new_tail.0;
});

T::DbWeight::get().reads_writes(1, 1)
}

/// Remove all relevant storage items for an outgoing parachain.
fn clean_dmp_after_outgoing(outgoing_para: &ParaId) {
<Self as Store>::DownwardMessageQueues::remove(outgoing_para);
// Fragment indices are `u8`, so cover the full `0..=u8::MAX` range.
for index in 0..=u8::MAX {
<Self as Store>::DownwardMessageQueues::remove(QueueFragmentId(*outgoing_para, index));
}
<Self as Store>::DownwardMessageQueueHeads::remove(outgoing_para);
}

@@ -150,11 +197,40 @@ impl<T: Config> Pallet<T> {
para: ParaId,
msg: DownwardMessage,
) -> Result<(), QueueDownwardMessageError> {
// Check if message is oversized.
let serialized_len = msg.len() as u32;
if serialized_len > config.max_downward_message_size {
return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
}

let head = Wrapping(Self::dmp_queue_head(para));
let mut tail = Wrapping(Self::dmp_queue_tail(para));

// Ring buffer is empty.
if head == tail {
// Tail always points to the next unused buffer fragment.
tail += 1;
Self::update_tail(&para, tail);
}

let tail_fragment_len = <Self as Store>::DownwardMessageQueues::decode_len(
QueueFragmentId(para, (tail - Wrapping(1)).0),
)
.unwrap_or(0)
.saturated_into::<u32>();

// Check if we need a new fragment.
if tail_fragment_len >= QUEUE_FRAGMENT_SIZE {
// Check if ring buffer is full.
if tail + Wrapping(1) == head {
return Err(QueueDownwardMessageError::ExceedsMaxPendingMessageCount)
}

// Advance tail.
tail += 1;
Self::update_tail(&para, tail);
}

let inbound =
InboundDownwardMessage { msg, sent_at: <frame_system::Pallet<T>>::block_number() };

@@ -165,9 +241,13 @@ impl<T: Config> Pallet<T> {
*head = new_head;
});

<Self as Store>::DownwardMessageQueues::mutate(para, |v| {
v.push(inbound);
});
// Insert message in the tail queue fragment.
<Self as Store>::DownwardMessageQueues::mutate(
QueueFragmentId(para, (tail - Wrapping(1)).0),
|v| {
v.push(inbound);
},
);

Ok(())
}
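
Condensed, the enqueue path above does four things: reject oversized messages, open the first fragment on an empty ring, start a new fragment once the current one reaches `QUEUE_FRAGMENT_SIZE`, and refuse when advancing the tail would collide with the head. A standalone model of that decision flow (a sketch under the same constants, not part of this diff):

use std::num::Wrapping;

const QUEUE_FRAGMENT_SIZE: usize = 32;

struct Ring {
    head: Wrapping<u8>,
    tail: Wrapping<u8>,
    tail_len: usize, // length of the fragment currently being filled (`tail - 1`)
}

fn enqueue(ring: &mut Ring) -> Result<(), &'static str> {
    if ring.head == ring.tail {
        // Empty ring: open the first fragment; `tail` points one past it.
        ring.tail += Wrapping(1);
        ring.tail_len = 0;
    }
    if ring.tail_len >= QUEUE_FRAGMENT_SIZE {
        // Current fragment full: advance the tail, unless the ring is full.
        if ring.tail + Wrapping(1) == ring.head {
            return Err("ExceedsMaxPendingMessageCount");
        }
        ring.tail += Wrapping(1);
        ring.tail_len = 0;
    }
    ring.tail_len += 1; // the message lands in fragment `tail - 1`
    Ok(())
}

fn main() {
    let mut ring = Ring { head: Wrapping(0), tail: Wrapping(0), tail_len: 0 };
    for _ in 0..40 {
        enqueue(&mut ring).unwrap();
    }
    // 40 messages span two fragments: 32 in the first, 8 in the second.
    assert_eq!((ring.tail - ring.head).0, 2);
    assert_eq!(ring.tail_len, 8);
}
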
@@ -194,17 +274,29 @@ impl<T: Config> Pallet<T> {

/// Prunes the specified number of messages from the downward message queue of the given para.
pub(crate) fn prune_dmq(para: ParaId, processed_downward_messages: u32) -> Weight {
<Self as Store>::DownwardMessageQueues::mutate(para, |q| {
let processed_downward_messages = processed_downward_messages as usize;
if processed_downward_messages > q.len() {
// reaching this branch is unexpected due to the constraint established by
// `check_processed_downward_messages`. But better be safe than sorry.
q.clear();
} else {
*q = q.split_off(processed_downward_messages);
}
});
T::DbWeight::get().reads_writes(1, 1)
let mut head = Wrapping(Self::dmp_queue_head(para));
let tail = Wrapping(Self::dmp_queue_tail(para));
let mut messages_to_prune = processed_downward_messages as usize;
let mut total_weight = T::DbWeight::get().reads_writes(2, 0);

// Prune all processed messages; they may span multiple fragments of the ring buffer.
while messages_to_prune > 0 && head != tail {
<Self as Store>::DownwardMessageQueues::mutate(QueueFragmentId(para, head.0), |q| {
if messages_to_prune > q.len() {
messages_to_prune = messages_to_prune.saturating_sub(q.len());
q.clear();
// Advance head.
head += 1;
} else {
*q = q.split_off(messages_to_prune);
messages_to_prune = 0;
}
});
total_weight += T::DbWeight::get().reads_writes(1, 1);
}

// Update head.
Self::update_head(&para, head) + total_weight
}
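
Pruning walks the ring from the head: fragments consumed in full are cleared and the head advances past them, while the first partially consumed fragment keeps only its unprocessed tail. The same walk over plain vectors (a standalone sketch, not part of this diff):

// `fragments` stands in for the per-fragment storage values, oldest first.
fn prune(fragments: &mut Vec<Vec<u32>>, mut to_prune: usize) {
    while to_prune > 0 && !fragments.is_empty() {
        if to_prune > fragments[0].len() {
            // Fully consumed fragment: drop it and advance the head.
            to_prune -= fragments[0].len();
            fragments.remove(0);
        } else {
            // Partially consumed: keep only the unprocessed suffix.
            let rest = fragments[0].split_off(to_prune);
            fragments[0] = rest;
            to_prune = 0;
        }
    }
}

fn main() {
    let mut frags = vec![vec![1, 2, 3], vec![4, 5], vec![6]];
    prune(&mut frags, 4);
    assert_eq!(frags, vec![vec![5], vec![6]]);
}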

/// Returns the Head of Message Queue Chain for the given para or `None` if there is none
@@ -218,15 +310,41 @@ impl<T: Config> Pallet<T> {
///
/// Returns 0 if the para doesn't have an associated downward message queue.
pub(crate) fn dmq_length(para: ParaId) -> u32 {
<Self as Store>::DownwardMessageQueues::decode_len(&para)
.unwrap_or(0)
.saturated_into::<u32>()
let mut head = Wrapping(Self::dmp_queue_head(para));
let tail = Wrapping(Self::dmp_queue_tail(para));
let mut length = 0;

while head != tail {
length +=
<Self as Store>::DownwardMessageQueues::decode_len(QueueFragmentId(para, head.0))
.unwrap_or(0)
.saturated_into::<u32>();
head += 1;
}

length
}

/// Returns the downward message queue contents for the given para.
///
/// The most recent messages are the latest in the vector.
/// Returns up to `MAX_FRAGMENTS_PER_QUERY * QUEUE_FRAGMENT_SIZE` messages from the queue contents for the given para.
/// The result is an empty vector if the para doesn't exist or its queue is empty.
/// The result preserves the original message ordering: the first element of the vector is the oldest message, the last the most recent.
/// This is to be used in conjunction with `prune_dmq` to achieve pagination of arbitrarily large queues.
pub(crate) fn dmq_contents(recipient: ParaId) -> Vec<InboundDownwardMessage<T::BlockNumber>> {
<Self as Store>::DownwardMessageQueues::get(&recipient)
let mut head = Wrapping(Self::dmp_queue_head(recipient));
let tail = Wrapping(Self::dmp_queue_tail(recipient));
let mut result = Vec::new();

while head != tail &&
result.len() < (MAX_FRAGMENTS_PER_QUERY * QUEUE_FRAGMENT_SIZE) as usize
{
result.extend(<Self as Store>::DownwardMessageQueues::get(QueueFragmentId(
recipient, head.0,
)));
// Advance to next fragment.
head += 1;
}

result
}
}
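
Taken together, `dmq_contents` and `prune_dmq` let a caller page through an arbitrarily large queue: read roughly `MAX_FRAGMENTS_PER_QUERY * QUEUE_FRAGMENT_SIZE` (8 * 32 = 256) of the oldest messages, process them, prune exactly that many, and repeat. A hypothetical paging loop, with plain-vector stand-ins for the storage calls (a sketch, not part of this diff):

const PAGE_LIMIT: usize = 8 * 32; // MAX_FRAGMENTS_PER_QUERY * QUEUE_FRAGMENT_SIZE

// Stand-in for `dmq_contents`: return up to one page, oldest first.
fn dmq_contents(queue: &[u32]) -> Vec<u32> {
    queue.iter().take(PAGE_LIMIT).copied().collect()
}

// Stand-in for `prune_dmq`: drop the processed prefix.
fn prune_dmq(queue: &mut Vec<u32>, processed: usize) {
    *queue = queue.split_off(processed.min(queue.len()));
}

fn main() {
    let mut queue: Vec<u32> = (0..1000).collect(); // the para's pending messages
    loop {
        let page = dmq_contents(&queue);
        if page.is_empty() {
            break; // queue drained
        }
        // ... process `page` here, oldest message first ...
        prune_dmq(&mut queue, page.len());
    }
    assert!(queue.is_empty());
}
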
6 changes: 3 additions & 3 deletions runtime/parachains/src/dmp/tests.rs
@@ -73,9 +73,9 @@ fn clean_dmp_works() {
let outgoing_paras = vec![a, b];
Dmp::initializer_on_new_session(&notification, &outgoing_paras);

assert!(<Dmp as Store>::DownwardMessageQueues::get(&a).is_empty());
assert!(<Dmp as Store>::DownwardMessageQueues::get(&b).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(&c).is_empty());
assert!(<Dmp as Store>::DownwardMessageQueues::get(QueueFragmentId(a, 0)).is_empty());
assert!(<Dmp as Store>::DownwardMessageQueues::get(QueueFragmentId(b, 0)).is_empty());
assert!(!<Dmp as Store>::DownwardMessageQueues::get(QueueFragmentId(c, 0)).is_empty());
});
}

6 changes: 6 additions & 0 deletions xcm/src/v2/traits.rs
@@ -92,6 +92,9 @@ pub enum Error {
/// Used by the `Trap` instruction to force an error intentionally. Its code is included.
#[codec(index = 21)]
Trap(u64),
/// The attempt to send a message failed because the transport protocol queue is full.
#[codec(index = 22)]
ExceedsMaxPendingMessageCount,

// Errors that happen prior to instructions being executed. These fall outside of the XCM spec.
/// XCM version not able to be handled.
@@ -117,6 +120,7 @@ impl From<SendError> for Error {
SendError::Transport(s) => Error::Transport(s),
SendError::DestinationUnsupported => Error::DestinationUnsupported,
SendError::ExceedsMaxMessageSize => Error::ExceedsMaxMessageSize,
SendError::ExceedsMaxPendingMessageCount => Error::ExceedsMaxPendingMessageCount,
}
}
}
@@ -223,6 +227,8 @@ pub enum SendError {
/// Message could not be sent due to its size exceeding the maximum allowed by the transport
/// layer.
ExceedsMaxMessageSize,
/// Message could not be sent due to the transport layer queue being full.
ExceedsMaxPendingMessageCount,
}

/// Result value when attempting to send an XCM message.
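
The new `SendError::ExceedsMaxPendingMessageCount` differs in kind from `ExceedsMaxMessageSize`: a full destination queue is transient backpressure, while an oversized message can never succeed. One hypothetical way a sender might tell the two apart (abridged stand-in enum; the real `SendError` has more variants):

// Abridged stand-in for xcm::v2::SendError, for illustration only.
enum SendError {
    ExceedsMaxMessageSize,
    ExceedsMaxPendingMessageCount,
}

// Whether a failed send could succeed if retried after the queue drains.
fn is_retryable(err: &SendError) -> bool {
    match err {
        SendError::ExceedsMaxPendingMessageCount => true, // transient: queue full
        SendError::ExceedsMaxMessageSize => false,        // permanent: message too big
    }
}

fn main() {
    assert!(is_retryable(&SendError::ExceedsMaxPendingMessageCount));
}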
