43 changes: 25 additions & 18 deletions core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
@@ -4,8 +4,7 @@ use {
     super::{
         scheduler::{PreLockFilterAction, Scheduler, SchedulingSummary},
         scheduler_common::{
-            select_thread, Batches, SchedulingCommon, TransactionSchedulingError,
-            TransactionSchedulingInfo,
+            select_thread, SchedulingCommon, TransactionSchedulingError, TransactionSchedulingInfo,
         },
         scheduler_error::SchedulerError,
         thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet, TryLockError},
@@ -59,9 +58,13 @@ impl<Tx: TransactionWithMeta> GreedyScheduler<Tx> {
         config: GreedySchedulerConfig,
     ) -> Self {
         Self {
-            common: SchedulingCommon::new(consume_work_senders, finished_consume_work_receiver),
             working_account_set: ReadWriteAccountSet::default(),
             unschedulables: Vec::with_capacity(config.max_scanned_transactions_per_scheduling_pass),
+            common: SchedulingCommon::new(
+                consume_work_senders,
+                finished_consume_work_receiver,
+                config.target_transactions_per_batch,
+            ),
             config,
         }
     }
@@ -96,14 +99,18 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
             });
         }
 
+        debug_assert!(
+            self.common.batches.is_empty(),
+            "batches must start empty for scheduling"
+        );
+
         // Track metrics on filter.
         let mut num_scanned: usize = 0;
         let mut num_scheduled = Saturating::<usize>(0);
         let mut num_sent: usize = 0;
        let mut num_unschedulable_conflicts: usize = 0;
        let mut num_unschedulable_threads: usize = 0;
 
-        let mut batches = Batches::new(num_threads, self.config.target_transactions_per_batch);
         while num_scanned < self.config.max_scanned_transactions_per_scheduling_pass
             && !schedulable_threads.is_empty()
             && !container.is_empty()
@@ -127,9 +134,7 @@
                 .check_locks(transaction_state.transaction())
             {
                 self.working_account_set.clear();
-                num_sent += self
-                    .common
-                    .send_batches(&mut batches, self.config.target_transactions_per_batch)?;
+                num_sent += self.common.send_batches()?;
             }
 
             // Now check if the transaction can actually be scheduled.
@@ -141,9 +146,9 @@
                 |thread_set| {
                     select_thread(
                         thread_set,
-                        batches.total_cus(),
+                        self.common.batches.total_cus(),
                         self.common.in_flight_tracker.cus_in_flight_per_thread(),
-                        batches.transactions(),
+                        self.common.batches.transactions(),
                         self.common.in_flight_tracker.num_in_flight_per_thread(),
                     )
                 },
@@ -167,23 +172,26 @@
                         "locks must be available"
                     );
                     num_scheduled += 1;
-                    batches.add_transaction_to_batch(thread_id, id.id, transaction, max_age, cost);
+                    self.common.batches.add_transaction_to_batch(
+                        thread_id,
+                        id.id,
+                        transaction,
+                        max_age,
+                        cost,
+                    );
 
                     // If target batch size is reached, send all the batches
-                    if batches.transactions()[thread_id].len()
+                    if self.common.batches.transactions()[thread_id].len()
                         >= self.config.target_transactions_per_batch
                     {
                         self.working_account_set.clear();
-                        num_sent += self.common.send_batches(
-                            &mut batches,
-                            self.config.target_transactions_per_batch,
-                        )?;
+                        num_sent += self.common.send_batches()?;
                     }
 
                     // if the thread is at target_cu_per_thread, remove it from the schedulable threads
                     // if there are no more schedulable threads, stop scheduling.
                     if self.common.in_flight_tracker.cus_in_flight_per_thread()[thread_id]
-                        + batches.total_cus()[thread_id]
+                        + self.common.batches.total_cus()[thread_id]
                         >= target_cu_per_thread
                     {
                         schedulable_threads.remove(thread_id);
@@ -196,8 +204,7 @@
         }
 
         self.working_account_set.clear();
-        // Use zero here to avoid allocating since we are done with `Batches`.
-        num_sent += self.common.send_batches(&mut batches, 0)?;
+        num_sent += self.common.send_batches()?;
         let Saturating(num_scheduled) = num_scheduled;
         assert_eq!(
             num_scheduled, num_sent,
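
Both schedulers lean on the same scheduler_common.rs change, which this diff view does not include: `Batches` moves out of each scheduling pass and into `SchedulingCommon`, which now also stores the batch-size target from its constructor. Below is a hedged, self-contained sketch of that implied shape, inferred only from the call sites in the two diffs. Every type is a simplified stand-in, not the real agave definition: u64 ids instead of transactions, a thread count instead of the consume_work_senders/finished_consume_work_receiver channels, and plain usize instead of Result<usize, SchedulerError>.

pub struct Batches {
    transactions: Vec<Vec<u64>>, // per-thread queued transaction ids (stand-in)
    total_cus: Vec<u64>,         // per-thread queued compute units
    target_transactions_per_batch: usize,
}

impl Batches {
    fn new(num_threads: usize, target_transactions_per_batch: usize) -> Self {
        Self {
            transactions: (0..num_threads)
                .map(|_| Vec::with_capacity(target_transactions_per_batch))
                .collect(),
            total_cus: vec![0; num_threads],
            target_transactions_per_batch,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.transactions.iter().all(Vec::is_empty)
    }

    pub fn transactions(&self) -> &[Vec<u64>] {
        &self.transactions
    }

    pub fn total_cus(&self) -> &[u64] {
        &self.total_cus
    }

    // Simplified: the real method also takes the transaction and its max_age.
    pub fn add_transaction_to_batch(&mut self, thread_id: usize, id: u64, cost: u64) {
        self.transactions[thread_id].push(id);
        self.total_cus[thread_id] += cost;
    }
}

pub struct SchedulingCommon {
    pub batches: Batches,
}

impl SchedulingCommon {
    // The batch-size target is handed over once at construction; that is why
    // both schedulers now pass config.target_transactions_per_batch here
    // instead of threading it through every send_batches call.
    pub fn new(num_threads: usize, target_transactions_per_batch: usize) -> Self {
        Self {
            batches: Batches::new(num_threads, target_transactions_per_batch),
        }
    }

    // Drain one thread's batch, re-reserving capacity from the stored target.
    // The real method builds a ConsumeWork and sends it over a channel.
    pub fn send_batch(&mut self, thread_id: usize) -> usize {
        let target = self.batches.target_transactions_per_batch;
        let sent = std::mem::replace(
            &mut self.batches.transactions[thread_id],
            Vec::with_capacity(target),
        );
        self.batches.total_cus[thread_id] = 0;
        sent.len()
    }

    // Drain every thread's batch; replaces the old
    // send_batches(&mut batches, target_transactions_per_batch) signature.
    pub fn send_batches(&mut self) -> usize {
        (0..self.batches.transactions.len())
            .map(|thread_id| self.send_batch(thread_id))
            .sum()
    }
}
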
core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
@@ -14,10 +14,8 @@ use {
         read_write_account_set::ReadWriteAccountSet,
         scheduler_messages::{ConsumeWork, FinishedConsumeWork},
         transaction_scheduler::{
-            scheduler_common::{select_thread, Batches},
-            transaction_priority_id::TransactionPriorityId,
-            transaction_state::TransactionState,
-            transaction_state_container::StateContainer,
+            scheduler_common::select_thread, transaction_priority_id::TransactionPriorityId,
+            transaction_state::TransactionState, transaction_state_container::StateContainer,
         },
     },
     crossbeam_channel::{Receiver, Sender},
@@ -79,7 +77,11 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
         config: PrioGraphSchedulerConfig,
     ) -> Self {
         Self {
-            common: SchedulingCommon::new(consume_work_senders, finished_consume_work_receiver),
+            common: SchedulingCommon::new(
+                consume_work_senders,
+                finished_consume_work_receiver,
+                config.target_transactions_per_batch,
+            ),
             prio_graph: PrioGraph::new(passthrough_priority),
             config,
         }
@@ -131,7 +133,6 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for PrioGraphScheduler<Tx> {
             });
         }
 
-        let mut batches = Batches::new(num_threads, self.config.target_transactions_per_batch);
         // Some transactions may be unschedulable due to multi-thread conflicts.
         // These transactions cannot be scheduled until some conflicting work is completed.
         // However, the scheduler should not allow other transactions that conflict with
@@ -195,6 +196,10 @@
         // Check transactions against filter, remove from container if it fails.
         chunked_pops(container, &mut self.prio_graph, &mut window_budget);
 
+        debug_assert!(
+            self.common.batches.is_empty(),
+            "batches must start empty for scheduling"
+        );
         let mut unblock_this_batch = Vec::with_capacity(
             self.common.consume_work_senders.len() * self.config.target_transactions_per_batch,
         );
@@ -228,9 +233,9 @@
                 |thread_set| {
                     select_thread(
                         thread_set,
-                        batches.total_cus(),
+                        self.common.batches.total_cus(),
                         self.common.in_flight_tracker.cus_in_flight_per_thread(),
-                        batches.transactions(),
+                        self.common.batches.transactions(),
                         self.common.in_flight_tracker.num_in_flight_per_thread(),
                     )
                 },
@@ -252,7 +257,7 @@
                         cost,
                     }) => {
                         num_scheduled += 1;
-                        batches.add_transaction_to_batch(
+                        self.common.batches.add_transaction_to_batch(
                             thread_id,
                             id.id,
                             transaction,
@@ -261,20 +266,16 @@
                         );
 
                         // If target batch size is reached, send only this batch.
-                        if batches.transactions()[thread_id].len()
+                        if self.common.batches.transactions()[thread_id].len()
                             >= self.config.target_transactions_per_batch
                         {
-                            num_sent += self.common.send_batch(
-                                &mut batches,
-                                thread_id,
-                                self.config.target_transactions_per_batch,
-                            )?;
+                            num_sent += self.common.send_batch(thread_id)?;
                         }
 
                         // if the thread is at max_cu_per_thread, remove it from the schedulable threads
                         // if there are no more schedulable threads, stop scheduling.
                         if self.common.in_flight_tracker.cus_in_flight_per_thread()[thread_id]
-                            + batches.total_cus()[thread_id]
+                            + self.common.batches.total_cus()[thread_id]
                             >= max_cu_per_thread
                         {
                             schedulable_threads.remove(thread_id);
@@ -291,9 +292,7 @@
             }
 
             // Send all non-empty batches
-            num_sent += self
-                .common
-                .send_batches(&mut batches, self.config.target_transactions_per_batch)?;
+            num_sent += self.common.send_batches()?;
 
             // Refresh window budget and do chunked pops
             window_budget += unblock_this_batch.len();
@@ -306,9 +305,7 @@
         }
 
         // Send batches for any remaining transactions
-        num_sent += self
-            .common
-            .send_batches(&mut batches, self.config.target_transactions_per_batch)?;
+        num_sent += self.common.send_batches()?;
 
         // Push unschedulable ids back into the container
         container.push_ids_into_queue(unschedulable_ids.into_iter());
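
A usage sketch against the stand-in types above, mirroring the loop structure both schedulers now share: assert the shared batches start empty, flush a thread's batch when it hits the target size, and flush everything at the end of the pass. It assumes the sketch's stand-in types are in scope; the ids, costs, and counts are arbitrary.

fn main() {
    let num_threads = 2;
    let target = 3; // stand-in for config.target_transactions_per_batch
    let mut common = SchedulingCommon::new(num_threads, target);

    // Mirrors the debug_assert! now at the top of each scheduling pass.
    debug_assert!(common.batches.is_empty(), "batches must start empty for scheduling");

    let mut num_sent = 0;
    for id in 0..7u64 {
        let thread_id = (id % num_threads as u64) as usize;
        common.batches.add_transaction_to_batch(thread_id, id, 100);
        // If target batch size is reached, send only this thread's batch.
        if common.batches.transactions()[thread_id].len() >= target {
            num_sent += common.send_batch(thread_id);
        }
    }

    // Send batches for any remaining transactions.
    num_sent += common.send_batches();
    assert_eq!(num_sent, 7); // everything queued was eventually sent
}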