Skip to content

Commit cd2aa1b

Browse files
authored
Buffer pre-session tasks in BP unified scheduler (anza-xyz#4949)
* Buffer pre-session tasks in BP unified scheduler
* Clean up comments and tests
* Add doc comments to SchedulingStateMachine
* Make SchedulingContext::bank() return Option<_>
* kick ci
* kick ci
1 parent 64a8871 commit cd2aa1b

File tree

10 files changed

+653
-63
lines changed

10 files changed

+653
-63
lines changed

Cargo.lock

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,7 @@ tower = "0.5.2"
594594
trait-set = "0.3.0"
595595
trees = "0.4.2"
596596
tungstenite = "0.20.1"
597+
unwrap_none = "0.1.2"
597598
uriparse = "0.6.4"
598599
url = "2.5.4"
599600
vec_extract_if_polyfill = "0.1.0"

ledger/src/blockstore_processor.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5032,7 +5032,7 @@ pub mod tests {
50325032
..
50335033
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
50345034
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
5035-
let context = SchedulingContext::new(bank.clone());
5035+
let context = SchedulingContext::for_verification(bank.clone());
50365036

50375037
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
50385038

programs/sbf/Cargo.lock

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

runtime/src/bank_forks.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,14 @@ impl BankForks {
246246
let context = SchedulingContext::new_with_mode(mode, bank.clone());
247247
let scheduler = scheduler_pool.take_scheduler(context);
248248
let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler));
249-
scheduler_pool.register_timeout_listener(bank_with_scheduler.create_timeout_listener());
249+
// Skip registering for block production. Neither the tvu main loop in the replay stage
250+
// nor PohRecorder supports _concurrent block production_ at all. It's strongly
251+
// assumed that a block is produced in a singleton way and it's actually desired, while
252+
// ignoring the opportunity cost of (hopefully rare!) fork switching...
253+
if matches!(mode, SchedulingMode::BlockVerification) {
254+
scheduler_pool
255+
.register_timeout_listener(bank_with_scheduler.create_timeout_listener());
256+
}
250257
bank_with_scheduler
251258
} else {
252259
BankWithScheduler::new_without_scheduler(bank)

runtime/src/installed_scheduler_pool.rs

+38-17
Original file line numberDiff line numberDiff line change
@@ -235,43 +235,54 @@ pub type SchedulerId = u64;
235235
/// expected to be used by a particular scheduler only for that duration of the time and to be
236236
/// disposed by the scheduler. Then, the scheduler may work on different banks with new
237237
/// `SchedulingContext`s.
238+
///
239+
/// There's a special construction used only for scheduler preallocation, which has no bank.
240+
/// Panics will be triggered if such a context is used normally across the code-base.
238241
#[derive(Clone, Debug)]
239242
pub struct SchedulingContext {
240243
mode: SchedulingMode,
241-
bank: Arc<Bank>,
244+
bank: Option<Arc<Bank>>,
242245
}
243246

244247
impl SchedulingContext {
245-
pub fn new(bank: Arc<Bank>) -> Self {
246-
// mode will be configurable later
248+
pub fn for_preallocation() -> Self {
247249
Self {
248-
mode: SchedulingMode::BlockVerification,
249-
bank,
250+
mode: SchedulingMode::BlockProduction,
251+
bank: None,
250252
}
251253
}
252254

253255
pub fn new_with_mode(mode: SchedulingMode, bank: Arc<Bank>) -> Self {
254-
Self { mode, bank }
256+
Self {
257+
mode,
258+
bank: Some(bank),
259+
}
260+
}
261+
262+
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
263+
fn for_verification(bank: Arc<Bank>) -> Self {
264+
Self::new_with_mode(SchedulingMode::BlockVerification, bank)
255265
}
256266

257267
#[cfg(feature = "dev-context-only-utils")]
258268
pub fn for_production(bank: Arc<Bank>) -> Self {
259-
Self {
260-
mode: SchedulingMode::BlockProduction,
261-
bank,
262-
}
269+
Self::new_with_mode(SchedulingMode::BlockProduction, bank)
270+
}
271+
272+
pub fn is_preallocated(&self) -> bool {
273+
self.bank.is_none()
263274
}
264275

265276
pub fn mode(&self) -> SchedulingMode {
266277
self.mode
267278
}
268279

269-
pub fn bank(&self) -> &Arc<Bank> {
270-
&self.bank
280+
pub fn bank(&self) -> Option<&Arc<Bank>> {
281+
self.bank.as_ref()
271282
}
272283

273-
pub fn slot(&self) -> Slot {
274-
self.bank().slot()
284+
pub fn slot(&self) -> Option<Slot> {
285+
self.bank.as_ref().map(|bank| bank.slot())
275286
}
276287
}
277288

@@ -424,11 +435,19 @@ pub struct BankWithSchedulerInner {
424435
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;
425436

426437
impl BankWithScheduler {
438+
/// Creates a new `BankWithScheduler` from bank and its associated scheduler.
439+
///
440+
/// # Panics
441+
///
442+
/// Panics if `scheduler`'s scheduling context doesn't match the given bank or if it is one
443+
/// created for scheduler preallocation (i.e. it has no bank).
427444
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
428445
pub(crate) fn new(bank: Arc<Bank>, scheduler: Option<InstalledSchedulerBox>) -> Self {
446+
// Avoid the fatal situation in which bank is being associated with a scheduler associated
447+
// to a different bank!
429448
if let Some(bank_in_context) = scheduler
430449
.as_ref()
431-
.map(|scheduler| scheduler.context().bank())
450+
.map(|scheduler| scheduler.context().bank().unwrap())
432451
{
433452
assert!(Arc::ptr_eq(&bank, bank_in_context));
434453
}
@@ -570,7 +589,9 @@ impl BankWithSchedulerInner {
570589
let pool = pool.clone();
571590
drop(scheduler);
572591

573-
let context = SchedulingContext::new(self.bank.clone());
592+
// A scheduler can be stale only if its mode is block-verification. So,
593+
// unconditional context construction for verification is okay here.
594+
let context = SchedulingContext::for_verification(self.bank.clone());
574595
let mut scheduler = self.scheduler.write().unwrap();
575596
trace!("with_active_scheduler: {:?}", scheduler);
576597
scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
@@ -773,7 +794,7 @@ mod tests {
773794
mock.expect_context()
774795
.times(1)
775796
.in_sequence(&mut seq.lock().unwrap())
776-
.return_const(SchedulingContext::new(bank));
797+
.return_const(SchedulingContext::for_verification(bank));
777798

778799
for wait_reason in is_dropped_flags {
779800
let seq_cloned = seq.clone();

svm/examples/Cargo.lock

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

unified-scheduler-logic/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ solana-pubkey = { workspace = true }
1515
solana-runtime-transaction = { workspace = true }
1616
solana-transaction = { workspace = true }
1717
static_assertions = { workspace = true }
18+
unwrap_none = { workspace = true }
1819

1920
[dev-dependencies]
2021
solana-instruction = { workspace = true }

unified-scheduler-logic/src/lib.rs

+41-2
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,10 @@ use {
103103
solana_transaction::sanitized::SanitizedTransaction,
104104
static_assertions::const_assert_eq,
105105
std::{collections::VecDeque, mem, sync::Arc},
106+
unwrap_none::UnwrapNone,
106107
};
107108

108-
#[derive(Clone, Copy, Debug)]
109+
#[derive(Clone, Copy, Debug, PartialEq)]
109110
pub enum SchedulingMode {
110111
BlockVerification,
111112
BlockProduction,
@@ -668,11 +669,49 @@ impl SchedulingStateMachine {
668669
/// indicating the scheduled task is blocked currently.
669670
///
670671
/// Note that this function takes ownership of the task to allow for future optimizations.
672+
#[cfg(any(test, doc))]
671673
#[must_use]
672674
pub fn schedule_task(&mut self, task: Task) -> Option<Task> {
675+
self.schedule_or_buffer_task(task, false)
676+
}
677+
678+
/// Adds given `task` to internal buffer, even if it's immediately schedulable otherwise.
679+
///
680+
/// Put differently, buffering means to force the task to be blocked unconditionally after
681+
/// normal scheduling processing.
682+
///
683+
/// Thus, the task is internally retained inside this [`SchedulingStateMachine`], whether the
684+
/// task is blocked or not. Eventually, the buffered task will be returned by one of the later
685+
/// invocations of [`schedule_next_unblocked_task()`](Self::schedule_next_unblocked_task).
686+
///
687+
/// Note that this function takes ownership of the task to allow for future optimizations.
688+
pub fn buffer_task(&mut self, task: Task) {
689+
self.schedule_or_buffer_task(task, true).unwrap_none();
690+
}
691+
692+
/// Schedules or buffers given `task`, returning it if scheduled, unless buffering is forced.
693+
///
694+
/// Refer to [`schedule_task()`](Self::schedule_task) and
695+
/// [`buffer_task()`](Self::buffer_task) for the difference between _scheduling_ and
696+
/// _buffering_ respectively.
697+
///
698+
/// Note that this function takes ownership of the task to allow for future optimizations.
699+
#[must_use]
700+
pub fn schedule_or_buffer_task(&mut self, task: Task, force_buffering: bool) -> Option<Task> {
673701
self.total_task_count.increment_self();
674702
self.active_task_count.increment_self();
675-
self.try_lock_usage_queues(task)
703+
self.try_lock_usage_queues(task).and_then(|task| {
704+
// locking succeeded, and then ...
705+
if force_buffering {
706+
// ... push to unblocked_task_queue, if buffering is forced.
707+
self.unblocked_task_count.increment_self();
708+
self.unblocked_task_queue.push_back(task);
709+
None
710+
} else {
711+
// ... return the task back as schedulable to the caller as-is otherwise.
712+
Some(task)
713+
}
714+
})
676715
}
677716

678717
#[must_use]

0 commit comments

Comments
 (0)