Skip to content

Commit

Permalink
[FRAME] Use 'ready' pages in XCMP suspend logic (#2393)
Browse files Browse the repository at this point in the history
Changes:
- `QueueFootprint` gets a new field; `ready_pages` that contains the
non-overweight and not yet processed pages.
- `XCMP` queue pallet is change to use the `ready_pages` instead of
`pages` to calculate the channel suspension thresholds.

This should give the XCMP queue pallet a more correct view of when to
suspend channels.

---------

Signed-off-by: Oliver Tale-Yazdi <[email protected]>
  • Loading branch information
ggwpez authored Mar 5, 2024
1 parent c367ac2 commit 329c077
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 11 deletions.
6 changes: 3 additions & 3 deletions cumulus/pallets/xcmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ impl<T: Config> Pallet<T> {
let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
let fp = T::XcmpQueue::footprint(sender);
// Assume that it will not fit into the current page:
let new_pages = fp.pages.saturating_add(1);
let new_pages = fp.ready_pages.saturating_add(1);
if new_pages > drop_threshold {
// This should not happen since the channel should have been suspended in
// [`on_queue_changed`].
Expand Down Expand Up @@ -663,12 +663,12 @@ impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
let suspended = suspended_channels.contains(&para);

if suspended && fp.pages <= resume_threshold {
if suspended && fp.ready_pages <= resume_threshold {
Self::send_signal(para, ChannelSignal::Resume);

suspended_channels.remove(&para);
<InboundXcmpSuspended<T>>::put(suspended_channels);
} else if !suspended && fp.pages >= suspend_threshold {
} else if !suspended && fp.ready_pages >= suspend_threshold {
log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
Self::send_signal(para, ChannelSignal::Suspend);

Expand Down
1 change: 1 addition & 0 deletions cumulus/pallets/xcmp-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ impl<T: OnQueueChanged<ParaId>> EnqueueMessage<ParaId> for EnqueueToLocalStorage
}
}
footprint.pages = footprint.storage.size as u32 / 16; // Number does not matter
footprint.ready_pages = footprint.pages;
footprint
}
}
Expand Down
16 changes: 16 additions & 0 deletions prdoc/pr_2393.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: "[XCMP] Use the number of 'ready' pages in XCMP suspend logic"

doc:
- audience: Runtime Dev
description: |
Semantics of the suspension logic in the XCMP queue pallet change from using the number of
total pages to the number of 'ready' pages. The number of ready pages is now also exposed by
the `MessageQueue` pallet to downstream via the queue `footprint`.

crates:
- name: cumulus-pallet-xcmp-queue
bump: patch
- name: pallet-message-queue
bump: patch
- name: frame-support
bump: major
5 changes: 5 additions & 0 deletions substrate/frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ fn process_some_messages(num_msgs: u32) {
ServiceWeight::set(Some(weight));
let consumed = next_block();

for origin in BookStateFor::<Test>::iter_keys() {
let fp = MessageQueue::footprint(origin);
assert_eq!(fp.pages, fp.ready_pages);
}

assert_eq!(consumed, weight, "\n{}", MessageQueue::debug_info());
assert_eq!(NumMessagesProcessed::take(), num_msgs as usize);
}
Expand Down
9 changes: 7 additions & 2 deletions substrate/frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,9 @@ use frame_support::{
defensive,
pallet_prelude::*,
traits::{
Defensive, DefensiveTruncateFrom, EnqueueMessage, ExecuteOverweightError, Footprint,
ProcessMessage, ProcessMessageError, QueueFootprint, QueuePausedQuery, ServiceQueues,
Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
QueuePausedQuery, ServiceQueues,
},
BoundedSlice, CloneNoBound, DefaultNoBound,
};
Expand Down Expand Up @@ -442,6 +443,7 @@ impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
fn from(book: BookState<MessageOrigin>) -> Self {
QueueFootprint {
pages: book.count,
ready_pages: book.end.defensive_saturating_sub(book.begin),
storage: Footprint { count: book.message_count, size: book.size },
}
}
Expand Down Expand Up @@ -1281,6 +1283,9 @@ impl<T: Config> Pallet<T> {
ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
ensure!(book.size < 1 << 30, "Likely overflow or corruption");
ensure!(book.count < 1 << 30, "Likely overflow or corruption");

let fp: QueueFootprint = book.into();
ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
}

//loop around this origin
Expand Down
4 changes: 2 additions & 2 deletions substrate/frame/message-queue/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ pub fn num_overweight_enqueued_events() -> u32 {
.count() as u32
}

pub fn fp(pages: u32, count: u64, size: u64) -> QueueFootprint {
QueueFootprint { storage: Footprint { count, size }, pages }
pub fn fp(pages: u32, ready_pages: u32, count: u64, size: u64) -> QueueFootprint {
QueueFootprint { storage: Footprint { count, size }, pages, ready_pages }
}

/// A random seed that can be overwritten with `MQ_SEED`.
Expand Down
13 changes: 9 additions & 4 deletions substrate/frame/message-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,28 +1064,33 @@ fn footprint_num_pages_works() {
MessageQueue::enqueue_message(msg("weight=2"), Here);
MessageQueue::enqueue_message(msg("weight=3"), Here);

assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 16));
assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 2, 16));

// Mark the messages as overweight.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 0.into_weight());
assert_eq!(System::events().len(), 2);
// Overweight does not change the footprint.
assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 16));
// `ready_pages` decreases but `page` count does not.
assert_eq!(MessageQueue::footprint(Here), fp(2, 0, 2, 16));

// Now execute the second message.
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(3.into_weight(), (Here, 1, 0))
.unwrap(),
3.into_weight()
);
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 8));
assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 1, 8));
// And the first one:
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(2.into_weight(), (Here, 0, 0))
.unwrap(),
2.into_weight()
);
assert_eq!(MessageQueue::footprint(Here), Default::default());
assert_eq!(MessageQueue::footprint(Here), fp(0, 0, 0, 0));

// `ready_pages` and normal `pages` increases again:
MessageQueue::enqueue_message(msg("weight=3"), Here);
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 1, 8));
})
}

Expand Down
2 changes: 2 additions & 0 deletions substrate/frame/support/src/traits/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ impl<OverweightAddr> ServiceQueues for NoopServiceQueues<OverweightAddr> {
pub struct QueueFootprint {
/// The number of pages in the queue (including overweight pages).
pub pages: u32,
/// The number of pages that are ready (not yet processed and also not overweight).
pub ready_pages: u32,
/// The storage footprint of the queue (including overweight messages).
pub storage: Footprint,
}
Expand Down

0 comments on commit 329c077

Please sign in to comment.