Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for Issue 4762 #4803

Merged
merged 12 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions prdoc/pr_4803.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Fix for issue #4762

doc:
- audience: Runtime Dev
description: |
When the status of the queue is on_initialize, throw a defensive message and return weight of 0,
however when status is on_idle, do not throw a defensive message, only return weight of 0

crates:
- name: MQ pallet
bkchr marked this conversation as resolved.
Show resolved Hide resolved
109 changes: 65 additions & 44 deletions substrate/frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ pub mod pallet {
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
if let Some(weight_limit) = T::ServiceWeight::get() {
Self::service_queues(weight_limit)
Self::service_queues_impl(weight_limit, ServiceQueuesContext::OnInitialize)
} else {
Weight::zero()
}
Expand All @@ -658,7 +658,10 @@ pub mod pallet {
fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
// Make use of the remaining weight to process enqueued messages.
Self::service_queues(weight_limit.min(remaining_weight))
Self::service_queues_impl(
weight_limit.min(remaining_weight),
ServiceQueuesContext::OnIdle,
)
} else {
Weight::zero()
}
Expand Down Expand Up @@ -777,6 +780,18 @@ enum MessageExecutionStatus {
StackLimitReached,
}

/// The context to pass to [`Pallet::service_queues_impl`] through on_idle and on_initialize hooks
/// We don't want to throw the defensive message if called from on_idle hook
#[derive(PartialEq)]
enum ServiceQueuesContext {
/// Context of on_idle hook.
OnIdle,
/// Context of on_initialize hook.
OnInitialize,
/// Context `service_queues` trait function.
ServiceQueues,
}

impl<T: Config> Pallet<T> {
/// Knit `origin` into the ready ring right at the end.
///
Expand Down Expand Up @@ -1511,6 +1526,53 @@ impl<T: Config> Pallet<T> {
},
}
}

fn service_queues_impl(weight_limit: Weight, context: ServiceQueuesContext) -> Weight {
let mut weight = WeightMeter::with_limit(weight_limit);

// Get the maximum weight that processing a single message may take:
let max_weight = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
if matches!(context, ServiceQueuesContext::OnInitialize) {
defensive!("Not enough weight to service a single message.");
}
Weight::zero()
});

match with_service_mutex(|| {
let mut next = match Self::bump_service_head(&mut weight) {
Some(h) => h,
None => return weight.consumed(),
};
// The last queue that did not make any progress.
// The loop aborts as soon as it arrives at this queue again without making any progress
// on other queues in between.
let mut last_no_progress = None;

loop {
let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight);
next = match n {
Some(n) =>
if !progressed {
if last_no_progress == Some(n.clone()) {
break
}
if last_no_progress.is_none() {
last_no_progress = Some(next.clone())
}
n
} else {
last_no_progress = None;
n
},
None => break,
}
}
weight.consumed()
}) {
Err(()) => weight.consumed(),
Ok(w) => w,
}
}
}

/// Run a closure that errors on re-entrance. Meant to be used by anything that services queues.
Expand Down Expand Up @@ -1580,48 +1642,7 @@ impl<T: Config> ServiceQueues for Pallet<T> {
type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);

fn service_queues(weight_limit: Weight) -> Weight {
let mut weight = WeightMeter::with_limit(weight_limit);

// Get the maximum weight that processing a single message may take:
let max_weight = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
defensive!("Not enough weight to service a single message.");
Weight::zero()
});

match with_service_mutex(|| {
let mut next = match Self::bump_service_head(&mut weight) {
Some(h) => h,
None => return weight.consumed(),
};
// The last queue that did not make any progress.
// The loop aborts as soon as it arrives at this queue again without making any progress
// on other queues in between.
let mut last_no_progress = None;

loop {
let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight);
next = match n {
Some(n) =>
if !progressed {
if last_no_progress == Some(n.clone()) {
break
}
if last_no_progress.is_none() {
last_no_progress = Some(next.clone())
}
n
} else {
last_no_progress = None;
n
},
None => break,
}
}
weight.consumed()
}) {
Err(()) => weight.consumed(),
Ok(w) => w,
}
Self::service_queues_impl(weight_limit, ServiceQueuesContext::ServiceQueues)
}

/// Execute a single overweight message.
Expand Down
2 changes: 1 addition & 1 deletion substrate/frame/message-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ fn service_queues_low_weight_defensive() {
assert!(MessageQueue::do_integrity_test().is_err());

MessageQueue::enqueue_message(msg("weight=0"), Here);
MessageQueue::service_queues(104.into_weight());
MessageQueue::service_queues_impl(104.into_weight(), ServiceQueuesContext::OnInitialize);
});
}

Expand Down
Loading