Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions prdoc/pr_8723.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: Scheduler `on_initialize` supports skipped blocks
doc:
- audience: Runtime Dev
description: |-
Scheduler correctly handles situations where `on_initialize` is invoked with block numbers that:
- increase but are not strictly consecutive (e.g., jump from 5 → 10), or
- are repeated (e.g., multiple blocks are built at the same Relay Chain parent block, all reporting the same `BlockNumberProvider` value).
This situation may occur when the `BlockNumberProvider` is not local - for example, on a parachain using the Relay Chain block number provider.
Implementation notes:
- The `IncompleteSince` value is always set to the next block `(now + 1)`.
- A scheduled task is considered permanently overweight only if it fails during the first agenda processing.

crates:
- name: pallet-scheduler
bump: patch

6 changes: 3 additions & 3 deletions substrate/frame/scheduler/src/benchmarking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ mod benchmarks {
) -> Result<(), BenchmarkError> {
let now = BLOCK_NUMBER.into();
fill_schedule::<T>(now, s)?;
let mut executed = 0;
assert_eq!(Agenda::<T>::get(now).len() as u32, s);

#[block]
{
Pallet::<T>::service_agenda(&mut WeightMeter::new(), &mut executed, now, now, 0);
Pallet::<T>::service_agenda(&mut WeightMeter::new(), true, now, now, 0);
}

assert_eq!(executed, 0);
assert_eq!(Agenda::<T>::get(now).len() as u32, s);

Ok(())
}
Expand Down
17 changes: 12 additions & 5 deletions substrate/frame/scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1224,30 +1224,37 @@ impl<T: Config> Pallet<T> {

let mut incomplete_since = now + One::one();
let mut when = IncompleteSince::<T>::take().unwrap_or(now);
let mut executed = 0;
let mut is_first = true; // first task from the first agenda.

let max_items = T::MaxScheduledPerBlock::get();
let mut count_down = max;
let service_agenda_base_weight = T::WeightInfo::service_agenda_base(max_items);
while count_down > 0 && when <= now && weight.can_consume(service_agenda_base_weight) {
if !Self::service_agenda(weight, &mut executed, now, when, u32::MAX) {
if !Self::service_agenda(weight, is_first, now, when, u32::MAX) {
incomplete_since = incomplete_since.min(when);
}
is_first = false;
Comment thread
gui1117 marked this conversation as resolved.
when.saturating_inc();
count_down.saturating_dec();
}
incomplete_since = incomplete_since.min(when);
if incomplete_since <= now {
Self::deposit_event(Event::AgendaIncomplete { when: incomplete_since });
IncompleteSince::<T>::put(incomplete_since);
} else {
// The next scheduler iteration should typically start from `now + 1` (`next_iter_now`).
// However, if the [`Config::BlockNumberProvider`] is not a local block number provider,
// then `next_iter_now` could be `now + n` where `n > 1`. In this case, we want to start
// from `now + 1` to ensure we don't miss any agendas.
IncompleteSince::<T>::put(now + One::one());
Copy link
Copy Markdown
Member

@ggwpez ggwpez Jun 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we basically "abuse" IncompleteSince to just be a cursor that always tracks the next number, yea should work.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also considered adding a new storage item like NextBlock, in this case (1) it becomes exactly what is IncompleteSince with this PR changes or (2) it always shows next block from last iteration and we need to manage both storage values, making code a bit more complex.

}
}

/// Returns `true` if the agenda was fully completed, `false` if it should be revisited at a
/// later block.
fn service_agenda(
weight: &mut WeightMeter,
executed: &mut u32,
mut is_first: bool,
now: BlockNumberFor<T>,
when: BlockNumberFor<T>,
max: u32,
Expand Down Expand Up @@ -1283,7 +1290,7 @@ impl<T: Config> Pallet<T> {
agenda[agenda_index as usize] = Some(task);
break
}
let result = Self::service_task(weight, now, when, agenda_index, *executed == 0, task);
let result = Self::service_task(weight, now, when, agenda_index, is_first, task);
agenda[agenda_index as usize] = match result {
Err((Unavailable, slot)) => {
dropped += 1;
Expand All @@ -1294,7 +1301,7 @@ impl<T: Config> Pallet<T> {
slot
},
Ok(()) => {
*executed += 1;
is_first = false;
None
},
};
Expand Down
132 changes: 125 additions & 7 deletions substrate/frame/scheduler/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,8 @@ fn on_initialize_weight_is_correct() {
));

// Will include the named periodic only
<Test as Config>::BlockNumberProvider::set_block_number(1);
let now = 1;
<Test as Config>::BlockNumberProvider::set_block_number(now);
assert_eq!(
Scheduler::on_initialize(42), // BN unused
TestWeightInfo::service_agendas_base() +
Expand All @@ -1645,11 +1646,12 @@ fn on_initialize_weight_is_correct() {
TestWeightInfo::execute_dispatch_unsigned() +
call_weight + Weight::from_parts(4, 0)
);
assert_eq!(IncompleteSince::<Test>::get(), None);
assert_eq!(IncompleteSince::<Test>::get(), Some(now + 1));
assert_eq!(logger::log(), vec![(root(), 2600u32)]);

// Will include anon and anon periodic
<Test as Config>::BlockNumberProvider::set_block_number(2);
let now = 2;
<Test as Config>::BlockNumberProvider::set_block_number(now);
assert_eq!(
Scheduler::on_initialize(123), // BN unused
TestWeightInfo::service_agendas_base() +
Expand All @@ -1661,11 +1663,12 @@ fn on_initialize_weight_is_correct() {
TestWeightInfo::execute_dispatch_unsigned() +
call_weight + Weight::from_parts(2, 0)
);
assert_eq!(IncompleteSince::<Test>::get(), None);
assert_eq!(IncompleteSince::<Test>::get(), Some(now + 1));
assert_eq!(logger::log(), vec![(root(), 2600u32), (root(), 69u32), (root(), 42u32)]);

// Will include named only
<Test as Config>::BlockNumberProvider::set_block_number(3);
let now = 3;
<Test as Config>::BlockNumberProvider::set_block_number(now);
assert_eq!(
Scheduler::on_initialize(555), // BN unused
TestWeightInfo::service_agendas_base() +
Expand All @@ -1674,19 +1677,21 @@ fn on_initialize_weight_is_correct() {
TestWeightInfo::execute_dispatch_unsigned() +
call_weight + Weight::from_parts(1, 0)
);
assert_eq!(IncompleteSince::<Test>::get(), None);
assert_eq!(IncompleteSince::<Test>::get(), Some(now + 1));
assert_eq!(
logger::log(),
vec![(root(), 2600u32), (root(), 69u32), (root(), 42u32), (root(), 3u32)]
);

// Will contain none
<Test as Config>::BlockNumberProvider::set_block_number(4);
let now = 4;
<Test as Config>::BlockNumberProvider::set_block_number(now);
let actual_weight = Scheduler::on_initialize(444); // BN unused
assert_eq!(
actual_weight,
TestWeightInfo::service_agendas_base() + TestWeightInfo::service_agenda_base(0)
);
assert_eq!(IncompleteSince::<Test>::get(), Some(now + 1));
});
}

Expand Down Expand Up @@ -3076,3 +3081,116 @@ fn postponed_task_is_still_available() {
assert!(Agenda::<Test>::get(4).is_empty());
});
}

/// Scheduler does not iterate over some blocks that have agendas (for example when block number
/// provider is not local), but the tasks from those skipped agendas still processed later.
#[test]
fn on_initialize_misses_blocks() {
new_test_ext().execute_with(|| {
let now = 1;
System::run_to_block::<AllPalletsWithSystem>(now);

let schedule_at = now + 1;
assert_ok!(Scheduler::schedule(
RuntimeOrigin::root(),
schedule_at,
None,
128,
Box::new(RuntimeCall::from(frame_system::Call::remark {
remark: vec![0u8; 3 * 1024 * 1024],
}))
));
assert_eq!(Agenda::<Test>::get(schedule_at).len(), 1);

// scheduler `on_initialize` was not triggered at blocks `[now, 5)`.
let next_scheduler_run_at = schedule_at + 5;
// it runs at `next_scheduler_run_at` but processes all skipped blocks.
System::set_block_number(next_scheduler_run_at - 1);
System::run_to_block::<AllPalletsWithSystem>(next_scheduler_run_at);

// task processed.
assert_eq!(Agenda::<Test>::get(schedule_at).len(), 0);
System::assert_last_event(
crate::Event::Dispatched { task: (schedule_at, 0), id: None, result: Ok(()) }.into(),
);
});
}

/// Scheduler runs `on_initialize` twice for the same block.
#[test]
fn on_initialize_runs_twice_for_the_same_block() {
new_test_ext().execute_with(|| {
let now = 1;
System::run_to_block::<AllPalletsWithSystem>(now);
assert_eq!(IncompleteSince::<Test>::get(), Some(now + 1));

let now = 2;

assert_ok!(Scheduler::schedule(
RuntimeOrigin::root(),
now,
None,
128,
Box::new(RuntimeCall::from(frame_system::Call::remark {
remark: vec![0u8; 3 * 1024 * 1024],
}))
));
assert_eq!(Agenda::<Test>::get(now).len(), 1);

System::run_to_block::<AllPalletsWithSystem>(now);
assert_eq!(Agenda::<Test>::get(now).len(), 0);
assert_eq!(IncompleteSince::<Test>::get(), Some(now + 1));

// Run `on_initialize` again at the same block.
System::run_to_block::<AllPalletsWithSystem>(now);
// IncompleteSince is not updated.
assert_eq!(IncompleteSince::<Test>::get(), Some(now + 1));
});
}

/// The task is not considered overweight if the scheduler processes not the first agenda within one
/// `on_initialize` even if no more tasks were processed since processing empty agenda has a base
/// weight.
#[test]
fn not_permanently_overweight_when_task_from_not_first_agenda() {
new_test_ext().execute_with(|| {
let now = 1;
System::run_to_block::<AllPalletsWithSystem>(now);

let schedule_at = now + 5;
let max_weight: Weight = <Test as Config>::MaximumWeight::get();
let call = RuntimeCall::Logger(LoggerCall::log { i: 42, weight: max_weight });
assert_ok!(Scheduler::do_schedule(
DispatchTime::At(schedule_at),
None,
127,
root(),
Preimage::bound(call).unwrap(),
));

// scheduler `on_initialize` was not triggered at blocks `[now, schedule_at + 5)`.
let next_scheduler_run_at = schedule_at + 5;
// it runs at `next_scheduler_run_at - 1` starting from `now + 1` and tries to process
// the task after processing agendas `[now + 1, schedule_at)`.
System::set_block_number(next_scheduler_run_at - 1);
System::run_to_block::<AllPalletsWithSystem>(next_scheduler_run_at);

// The task remains in the agenda because it was overweight when processed at `schedule_at`,
// causing the agenda to be marked as incomplete. This is not considered permanently
// overweight yet.
assert_eq!(Agenda::<Test>::get(schedule_at).len(), 1);
System::assert_last_event(crate::Event::AgendaIncomplete { when: schedule_at }.into());

// Run to the next block and start from `schedule_at`.
System::run_to_block::<AllPalletsWithSystem>(next_scheduler_run_at + 1);

// Now its permanently overweight.
assert_eq!(
System::events().last().unwrap().event,
crate::Event::PermanentlyOverweight { task: (schedule_at, 0), id: None }.into(),
);
// permanently overweight tasks are not removed from the agenda.
assert_eq!(Agenda::<Test>::get(schedule_at).len(), 1);
assert_eq!(IncompleteSince::<Test>::get(), Some(System::block_number() + 1));
});
}
Loading