diff --git a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
index 7f2b5d4b0fa..09bff1437ee 100644
--- a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
+++ b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs
@@ -4,8 +4,7 @@ use {
     super::{
         scheduler::{PreLockFilterAction, Scheduler, SchedulingSummary},
         scheduler_common::{
-            select_thread, Batches, SchedulingCommon, TransactionSchedulingError,
-            TransactionSchedulingInfo,
+            select_thread, SchedulingCommon, TransactionSchedulingError, TransactionSchedulingInfo,
         },
         scheduler_error::SchedulerError,
         thread_aware_account_locks::{ThreadAwareAccountLocks, ThreadId, ThreadSet, TryLockError},
@@ -59,9 +58,13 @@ impl<Tx: TransactionWithMeta> GreedyScheduler<Tx> {
         config: GreedySchedulerConfig,
     ) -> Self {
         Self {
-            common: SchedulingCommon::new(consume_work_senders, finished_consume_work_receiver),
             working_account_set: ReadWriteAccountSet::default(),
             unschedulables: Vec::with_capacity(config.max_scanned_transactions_per_scheduling_pass),
+            common: SchedulingCommon::new(
+                consume_work_senders,
+                finished_consume_work_receiver,
+                config.target_transactions_per_batch,
+            ),
             config,
         }
     }
@@ -96,6 +99,11 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
             });
         }
 
+        debug_assert!(
+            self.common.batches.is_empty(),
+            "batches must start empty for scheduling"
+        );
+
         // Track metrics on filter.
         let mut num_scanned: usize = 0;
         let mut num_scheduled = Saturating::<usize>(0);
@@ -103,7 +111,6 @@
         let mut num_unschedulable_conflicts: usize = 0;
         let mut num_unschedulable_threads: usize = 0;
 
-        let mut batches = Batches::new(num_threads, self.config.target_transactions_per_batch);
         while num_scanned < self.config.max_scanned_transactions_per_scheduling_pass
             && !schedulable_threads.is_empty()
             && !container.is_empty()
@@ -127,9 +134,7 @@ impl<Tx: TransactionWithMeta> Scheduler<Tx> for GreedyScheduler<Tx> {
                 .check_locks(transaction_state.transaction())
             {
                 self.working_account_set.clear();
-                num_sent += self
-                    .common
-                    .send_batches(&mut batches, self.config.target_transactions_per_batch)?;
+                num_sent += self.common.send_batches()?;
             }
 
             // Now check if the transaction can actually be scheduled.
@@ -141,9 +146,9 @@
                 |thread_set| {
                     select_thread(
                         thread_set,
-                        batches.total_cus(),
+                        self.common.batches.total_cus(),
                         self.common.in_flight_tracker.cus_in_flight_per_thread(),
-                        batches.transactions(),
+                        self.common.batches.transactions(),
                         self.common.in_flight_tracker.num_in_flight_per_thread(),
                     )
                 },
@@ -167,23 +172,26 @@
                        "locks must be available"
                    );
                    num_scheduled += 1;
-                    batches.add_transaction_to_batch(thread_id, id.id, transaction, max_age, cost);
+                    self.common.batches.add_transaction_to_batch(
+                        thread_id,
+                        id.id,
+                        transaction,
+                        max_age,
+                        cost,
+                    );
 
                    // If target batch size is reached, send all the batches
-                    if batches.transactions()[thread_id].len()
+                    if self.common.batches.transactions()[thread_id].len()
                        >= self.config.target_transactions_per_batch
                    {
                        self.working_account_set.clear();
-                        num_sent += self.common.send_batches(
-                            &mut batches,
-                            self.config.target_transactions_per_batch,
-                        )?;
+                        num_sent += self.common.send_batches()?;
                    }
 
                    // if the thread is at target_cu_per_thread, remove it from the schedulable threads
                    // if there are no more schedulable threads, stop scheduling.
                    if self.common.in_flight_tracker.cus_in_flight_per_thread()[thread_id]
-                        + batches.total_cus()[thread_id]
+                        + self.common.batches.total_cus()[thread_id]
                        >= target_cu_per_thread
                    {
                        schedulable_threads.remove(thread_id);
@@ -196,8 +204,7 @@
         }
 
         self.working_account_set.clear();
-        // Use zero here to avoid allocating since we are done with `Batches`.
-        num_sent += self.common.send_batches(&mut batches, 0)?;
+        num_sent += self.common.send_batches()?;
 
         let Saturating(num_scheduled) = num_scheduled;
         assert_eq!(
             num_scheduled, num_sent,
diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
index 35a2bcfe837..6e48ebd9e84 100644
--- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
+++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs
@@ -14,10 +14,8 @@ use {
         read_write_account_set::ReadWriteAccountSet,
         scheduler_messages::{ConsumeWork, FinishedConsumeWork},
         transaction_scheduler::{
-            scheduler_common::{select_thread, Batches},
-            transaction_priority_id::TransactionPriorityId,
-            transaction_state::TransactionState,
-            transaction_state_container::StateContainer,
+            scheduler_common::select_thread, transaction_priority_id::TransactionPriorityId,
+            transaction_state::TransactionState, transaction_state_container::StateContainer,
         },
     },
     crossbeam_channel::{Receiver, Sender},
@@ -79,7 +77,11 @@ impl<Tx: TransactionWithMeta> PrioGraphScheduler<Tx> {
         config: PrioGraphSchedulerConfig,
     ) -> Self {
         Self {
-            common: SchedulingCommon::new(consume_work_senders, finished_consume_work_receiver),
+            common: SchedulingCommon::new(
+                consume_work_senders,
+                finished_consume_work_receiver,
+                config.target_transactions_per_batch,
+            ),
             prio_graph: PrioGraph::new(passthrough_priority),
             config,
         }
     }
@@ -131,7 +133,6 @@
             });
         }
 
-        let mut batches = Batches::new(num_threads, self.config.target_transactions_per_batch);
         // Some transactions may be unschedulable due to multi-thread conflicts.
         // These transactions cannot be scheduled until some conflicting work is completed.
         // However, the scheduler should not allow other transactions that conflict with
@@ -195,6 +196,10 @@
         // Check transactions against filter, remove from container if it fails.
         chunked_pops(container, &mut self.prio_graph, &mut window_budget);
 
+        debug_assert!(
+            self.common.batches.is_empty(),
+            "batches must start empty for scheduling"
+        );
         let mut unblock_this_batch = Vec::with_capacity(
             self.common.consume_work_senders.len() * self.config.target_transactions_per_batch,
         );
@@ -228,9 +233,9 @@
                |thread_set| {
                    select_thread(
                        thread_set,
-                        batches.total_cus(),
+                        self.common.batches.total_cus(),
                        self.common.in_flight_tracker.cus_in_flight_per_thread(),
-                        batches.transactions(),
+                        self.common.batches.transactions(),
                        self.common.in_flight_tracker.num_in_flight_per_thread(),
                    )
                },
@@ -252,7 +257,7 @@
                    cost,
                }) => {
                    num_scheduled += 1;
-                    batches.add_transaction_to_batch(
+                    self.common.batches.add_transaction_to_batch(
                        thread_id,
                        id.id,
                        transaction,
                        max_age,
                        cost,
                    );
 
                    // If target batch size is reached, send only this batch.
-                    if batches.transactions()[thread_id].len()
+                    if self.common.batches.transactions()[thread_id].len()
                        >= self.config.target_transactions_per_batch
                    {
-                        num_sent += self.common.send_batch(
-                            &mut batches,
-                            thread_id,
-                            self.config.target_transactions_per_batch,
-                        )?;
+                        num_sent += self.common.send_batch(thread_id)?;
                    }
 
                    // if the thread is at max_cu_per_thread, remove it from the schedulable threads
                    // if there are no more schedulable threads, stop scheduling.
                    if self.common.in_flight_tracker.cus_in_flight_per_thread()[thread_id]
-                        + batches.total_cus()[thread_id]
+                        + self.common.batches.total_cus()[thread_id]
                        >= max_cu_per_thread
                    {
                        schedulable_threads.remove(thread_id);
@@ -291,9 +292,7 @@
            }
 
            // Send all non-empty batches
-            num_sent += self
-                .common
-                .send_batches(&mut batches, self.config.target_transactions_per_batch)?;
+            num_sent += self.common.send_batches()?;
 
            // Refresh window budget and do chunked pops
            window_budget += unblock_this_batch.len();
@@ -306,9 +305,7 @@
        }
 
        // Send batches for any remaining transactions
-        num_sent += self
-            .common
-            .send_batches(&mut batches, self.config.target_transactions_per_batch)?;
+        num_sent += self.common.send_batches()?;
 
        // Push unschedulable ids back into the container
        container.push_ids_into_queue(unschedulable_ids.into_iter());
diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_common.rs b/core/src/banking_stage/transaction_scheduler/scheduler_common.rs
index c03792915ee..730264ffcea 100644
--- a/core/src/banking_stage/transaction_scheduler/scheduler_common.rs
+++ b/core/src/banking_stage/transaction_scheduler/scheduler_common.rs
@@ -23,21 +23,37 @@ pub struct Batches<Tx> {
     transactions: Vec<Vec<Tx>>,
     max_ages: Vec<Vec<MaxAge>>,
     total_cus: Vec<u64>,
+    target_num_transactions_per_batch: usize,
 }
 
 impl<Tx> Batches<Tx> {
     pub fn new(num_threads: usize, target_num_transactions_per_batch: usize) -> Self {
-        Self {
-            ids: vec![Vec::with_capacity(target_num_transactions_per_batch); num_threads],
-
-            transactions: (0..num_threads)
+        fn make_vecs<T>(
+            num_threads: usize,
+            target_num_transactions_per_batch: usize,
+        ) -> Vec<Vec<T>> {
+            (0..num_threads)
                 .map(|_| Vec::with_capacity(target_num_transactions_per_batch))
-                .collect(),
-            max_ages: vec![Vec::with_capacity(target_num_transactions_per_batch); num_threads],
+                .collect()
+        }
+
+        Self {
+            ids: make_vecs(num_threads, target_num_transactions_per_batch),
+            transactions: make_vecs(num_threads, target_num_transactions_per_batch),
+            max_ages: make_vecs(num_threads, target_num_transactions_per_batch),
             total_cus: vec![0; num_threads],
+            target_num_transactions_per_batch,
         }
     }
 
+    #[cfg(debug_assertions)]
+    pub fn is_empty(&self) -> bool {
+        self.ids.iter().all(|ids| ids.is_empty())
+            && self.transactions.iter().all(|txs| txs.is_empty())
+            && self.max_ages.iter().all(|max_ages| max_ages.is_empty())
+            && self.total_cus.iter().all(|&cus| cus == 0)
+    }
+
     pub fn total_cus(&self) -> &[u64] {
         &self.total_cus
     }
@@ -63,20 +79,19 @@
     pub fn take_batch(
         &mut self,
         thread_id: ThreadId,
-        target_num_transactions_per_batch: usize,
     ) -> (Vec<TransactionId>, Vec<Tx>, Vec<MaxAge>, u64) {
         (
             core::mem::replace(
                 &mut self.ids[thread_id],
-                Vec::with_capacity(target_num_transactions_per_batch),
+                Vec::with_capacity(self.target_num_transactions_per_batch),
             ),
             core::mem::replace(
                 &mut self.transactions[thread_id],
-                Vec::with_capacity(target_num_transactions_per_batch),
+                Vec::with_capacity(self.target_num_transactions_per_batch),
             ),
             core::mem::replace(
                 &mut self.max_ages[thread_id],
-                Vec::with_capacity(target_num_transactions_per_batch),
+                Vec::with_capacity(self.target_num_transactions_per_batch),
             ),
             core::mem::replace(&mut self.total_cus[thread_id], 0),
         )
@@ -137,12 +152,14 @@ pub(crate) struct SchedulingCommon<Tx> {
     pub(crate) finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
     pub(crate) in_flight_tracker: InFlightTracker,
     pub(crate) account_locks: ThreadAwareAccountLocks,
+    pub(crate) batches: Batches<Tx>,
 }
 
 impl<Tx: TransactionWithMeta> SchedulingCommon<Tx> {
     pub fn new(
         consume_work_senders: Vec<Sender<ConsumeWork<Tx>>>,
         finished_consume_work_receiver: Receiver<FinishedConsumeWork<Tx>>,
+        target_num_transactions_per_batch: usize,
     ) -> Self {
         let num_threads = consume_work_senders.len();
         assert!(num_threads > 0, "must have at least one worker");
         assert!(
             num_threads <= MAX_THREADS,
             "cannot have more than {MAX_THREADS} workers"
         );
         Self {
+            batches: Batches::new(
+                consume_work_senders.len(),
+                target_num_transactions_per_batch,
+            ),
             consume_work_senders,
             finished_consume_work_receiver,
             in_flight_tracker: InFlightTracker::new(num_threads),
@@ -160,18 +181,12 @@
     /// Send a batch of transactions to the given thread's `ConsumeWork` channel.
     /// Returns the number of transactions sent.
-    pub fn send_batch(
-        &mut self,
-        batches: &mut Batches<Tx>,
-        thread_index: usize,
-        target_transactions_per_batch: usize,
-    ) -> Result<usize, SchedulerError> {
-        if batches.ids[thread_index].is_empty() {
+    pub fn send_batch(&mut self, thread_index: usize) -> Result<usize, SchedulerError> {
+        if self.batches.ids[thread_index].is_empty() {
             return Ok(0);
         }
 
-        let (ids, transactions, max_ages, total_cus) =
-            batches.take_batch(thread_index, target_transactions_per_batch);
+        let (ids, transactions, max_ages, total_cus) = self.batches.take_batch(thread_index);
 
         let batch_id = self
             .in_flight_tracker
@@ -193,15 +208,9 @@
     /// Send all batches of transactions to the worker threads.
     /// Returns the number of transactions sent.
-    pub fn send_batches(
-        &mut self,
-        batches: &mut Batches<Tx>,
-        target_transactions_per_batch: usize,
-    ) -> Result<usize, SchedulerError> {
+    pub fn send_batches(&mut self) -> Result<usize, SchedulerError> {
         (0..self.consume_work_senders.len())
-            .map(|thread_index| {
-                self.send_batch(batches, thread_index, target_transactions_per_batch)
-            })
+            .map(|thread_index| self.send_batch(thread_index))
             .sum()
     }
 }
@@ -317,7 +326,6 @@ mod tests {
     fn pop_and_add_transaction<Tx: TransactionWithMeta>(
         container: &mut TransactionStateContainer<Tx>,
         common: &mut SchedulingCommon<Tx>,
-        batches: &mut Batches<Tx>,
         thread_id: ThreadId,
     ) {
         let tx_id = container.pop().unwrap();
@@ -345,7 +353,13 @@
             |_thread_set| thread_id,
         )
         .unwrap();
-        batches.add_transaction_to_batch(thread_id, tx_id.id, transaction, max_age, DUMMY_COST);
+        common.batches.add_transaction_to_batch(
+            thread_id,
+            tx_id.id,
+            transaction,
+            max_age,
+            DUMMY_COST,
+        );
     }
 
     #[test_case(
@@ -430,11 +444,10 @@
         let (work_senders, work_receivers): (Vec<Sender<_>>, Vec<Receiver<_>>) =
             (0..NUM_WORKERS).map(|_| unbounded()).unzip();
         let (_finished_work_sender, finished_work_receiver) = unbounded();
-        let mut common = SchedulingCommon::new(work_senders, finished_work_receiver);
-        let mut batches = Batches::new(NUM_WORKERS, 10);
+        let mut common = SchedulingCommon::new(work_senders, finished_work_receiver, 10);
 
-        pop_and_add_transaction(&mut container, &mut common, &mut batches, 0);
-        let num_scheduled = common.send_batch(&mut batches, 0, 10).unwrap();
+        pop_and_add_transaction(&mut container, &mut common, 0);
+        let num_scheduled = common.send_batch(0).unwrap();
         assert_eq!(num_scheduled, 1);
         assert_eq!(work_receivers[0].len(), 1);
         assert_eq!(
             common.in_flight_tracker.cus_in_flight_per_thread(),
             &[DUMMY_COST, 0, 0, 0]
         );
 
-        let num_scheduled = common.send_batch(&mut batches, 1, 10).unwrap();
+        let num_scheduled = common.send_batch(1).unwrap();
         assert_eq!(num_scheduled, 0);
         assert_eq!(work_receivers[1].len(), 0); // not actually sent since no transactions.
 
         work_receivers[0].recv().unwrap();
 
         // Multiple batches.
-        pop_and_add_transaction(&mut container, &mut common, &mut batches, 0);
-        pop_and_add_transaction(&mut container, &mut common, &mut batches, 2);
+        pop_and_add_transaction(&mut container, &mut common, 0);
+        pop_and_add_transaction(&mut container, &mut common, 2);
 
-        common.send_batches(&mut batches, 10).unwrap();
+        common.send_batches().unwrap();
         assert_eq!(work_receivers[0].len(), 1);
         assert_eq!(work_receivers[1].len(), 0);
         assert_eq!(work_receivers[2].len(), 1);
@@ -479,12 +492,11 @@
         let (work_senders, work_receivers): (Vec<Sender<_>>, Vec<Receiver<_>>) =
             (0..NUM_WORKERS).map(|_| unbounded()).unzip();
         let (finished_work_sender, finished_work_receiver) = unbounded();
-        let mut common = SchedulingCommon::new(work_senders, finished_work_receiver);
-        let mut batches = Batches::new(NUM_WORKERS, 10);
+        let mut common = SchedulingCommon::new(work_senders, finished_work_receiver, 10);
 
         // Send a batch. Return completed work.
-        pop_and_add_transaction(&mut container, &mut common, &mut batches, 0);
-        let num_scheduled = common.send_batch(&mut batches, 0, 10).unwrap();
+        pop_and_add_transaction(&mut container, &mut common, 0);
+        let num_scheduled = common.send_batch(0).unwrap();
         let work = work_receivers[0].try_recv().unwrap();
         assert_eq!(work.ids.len(), num_scheduled);
 
         // Retryable indexes.
         add_transactions_to_container(&mut container, 3);
-        pop_and_add_transaction(&mut container, &mut common, &mut batches, 0);
-        pop_and_add_transaction(&mut container, &mut common, &mut batches, 0);
-        pop_and_add_transaction(&mut container, &mut common, &mut batches, 0);
-        let num_scheduled = common.send_batch(&mut batches, 0, 10).unwrap();
+        pop_and_add_transaction(&mut container, &mut common, 0);
+        pop_and_add_transaction(&mut container, &mut common, 0);
+        pop_and_add_transaction(&mut container, &mut common, 0);
+        let num_scheduled = common.send_batch(0).unwrap();
         let work = work_receivers[0].try_recv().unwrap();
         assert_eq!(work.ids.len(), num_scheduled);
         let retryable_indexes = vec![0, 1];
@@ -530,13 +542,12 @@
         let (work_senders, work_receivers): (Vec<Sender<_>>, Vec<Receiver<_>>) =
             (0..NUM_WORKERS).map(|_| unbounded()).unzip();
         let (finished_work_sender, finished_work_receiver) = unbounded();
-        let mut common = SchedulingCommon::new(work_senders, finished_work_receiver);
-        let mut batches = Batches::new(NUM_WORKERS, 10);
+        let mut common = SchedulingCommon::new(work_senders, finished_work_receiver, 10);
 
         // Retryable indexes out-of-order.
         add_transactions_to_container(&mut container, 2);
-        pop_and_add_transaction(&mut container, &mut common, &mut batches, 0);
-        pop_and_add_transaction(&mut container, &mut common, &mut batches, 0);
-        let num_scheduled = common.send_batch(&mut batches, 0, 10).unwrap();
+        pop_and_add_transaction(&mut container, &mut common, 0);
+        pop_and_add_transaction(&mut container, &mut common, 0);
+        let num_scheduled = common.send_batch(0).unwrap();
         let work = work_receivers[0].try_recv().unwrap();
         assert_eq!(work.ids.len(), num_scheduled);
         let retryable_indexes = vec![1, 0];
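For orientation, here is a minimal, self-contained sketch of the ownership change this diff makes. It uses simplified stand-in types rather than the actual agave definitions (no generics, channels, CU tracking, or error handling): `Batches` moves into the long-lived shared scheduler state and remembers its own target batch size, so `send_batch`/`send_batches` stop threading a `&mut Batches` plus a size argument through every call site, and each scheduling pass reuses the same `Batches` instead of allocating a fresh one.

```rust
// Sketch only: stand-ins for the agave types, not the real definitions.
struct Batches {
    ids: Vec<Vec<usize>>, // per-thread pending transaction ids
    target_num_transactions_per_batch: usize,
}

impl Batches {
    fn new(num_threads: usize, target: usize) -> Self {
        Self {
            ids: (0..num_threads).map(|_| Vec::with_capacity(target)).collect(),
            target_num_transactions_per_batch: target,
        }
    }

    // Swap out a thread's batch, leaving a fresh vec sized to the stored
    // target; callers no longer pass the capacity in.
    fn take_batch(&mut self, thread_id: usize) -> Vec<usize> {
        core::mem::replace(
            &mut self.ids[thread_id],
            Vec::with_capacity(self.target_num_transactions_per_batch),
        )
    }
}

struct SchedulingCommon {
    batches: Batches, // owned by the shared scheduler state after this diff
}

impl SchedulingCommon {
    fn send_batch(&mut self, thread_id: usize) -> usize {
        if self.batches.ids[thread_id].is_empty() {
            return 0; // nothing queued for this thread
        }
        self.batches.take_batch(thread_id).len()
    }

    fn send_batches(&mut self) -> usize {
        (0..self.batches.ids.len()).map(|t| self.send_batch(t)).sum()
    }
}

fn main() {
    let mut common = SchedulingCommon { batches: Batches::new(4, 10) };
    common.batches.ids[0].push(7);
    common.batches.ids[2].push(8);
    assert_eq!(common.send_batches(), 2); // threads 0 and 2 each flush one id
    assert!(common.batches.ids.iter().all(Vec::is_empty));
}
```

One visible consequence of storing the target inside `Batches`: the old final flush in greedy_scheduler.rs passed a capacity of zero to avoid allocating once scheduling was done, while the new code always refills with vectors sized to the stored target, trading that micro-optimization for the simpler `send_batches()` interface.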