diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index 2ad31c74cdc..3c11e1a21b2 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -122,6 +122,7 @@ impl ConsumeWorker { self.metrics.has_data.store(true, Ordering::Relaxed); self.consumed_sender.send(FinishedConsumeWork { + slot: Some(bank.slot()), work, retryable_indexes: output .execute_and_commit_transactions_output @@ -164,6 +165,7 @@ impl ConsumeWorker { .fetch_add(num_retryable, Ordering::Relaxed); self.metrics.has_data.store(true, Ordering::Relaxed); self.consumed_sender.send(FinishedConsumeWork { + slot: None, work, retryable_indexes, })?; diff --git a/core/src/banking_stage/scheduler_messages.rs b/core/src/banking_stage/scheduler_messages.rs index 1c7cf31592b..71d801cb9c4 100644 --- a/core/src/banking_stage/scheduler_messages.rs +++ b/core/src/banking_stage/scheduler_messages.rs @@ -46,6 +46,8 @@ pub struct ConsumeWork { /// Message: [Worker -> Scheduler] /// Processed transactions. pub struct FinishedConsumeWork { + /// Slot that work was attempted on. `None` if sent back without attempt. + pub slot: Option, pub work: ConsumeWork, pub retryable_indexes: Vec, } diff --git a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs index b7de3e35778..3f057b362b3 100644 --- a/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/greedy_scheduler.rs @@ -98,6 +98,7 @@ impl Scheduler for GreedyScheduler { let mut num_sent: usize = 0; let mut num_unschedulable_conflicts: usize = 0; let mut num_unschedulable_threads: usize = 0; + let mut num_skipped_retry: usize = 0; let mut batches = Batches::new(num_threads, self.config.target_transactions_per_batch); while num_scanned < self.config.max_scanned_transactions_per_scheduling_pass @@ -152,6 +153,10 @@ impl Scheduler for GreedyScheduler { num_unschedulable_threads += 1; self.unschedulables.push(id); } + Err(TransactionSchedulingError::Skipped) => { + num_skipped_retry += 1; + self.unschedulables.push(id); + } Ok(TransactionSchedulingInfo { thread_id, transaction, @@ -209,6 +214,7 @@ impl Scheduler for GreedyScheduler { num_scheduled, num_unschedulable_conflicts, num_unschedulable_threads, + num_skipped_retry, num_filtered_out: 0, filter_time_us: 0, }) @@ -228,6 +234,7 @@ fn try_schedule_transaction( ) -> Result, TransactionSchedulingError> { match pre_lock_filter(transaction_state) { PreLockFilterAction::AttemptToSchedule => {} + PreLockFilterAction::SkipAndRetain => return Err(TransactionSchedulingError::Skipped), } // Schedule the transaction if it can be. diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index b74b94a642d..debbb93a380 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -197,6 +197,7 @@ impl Scheduler for PrioGraphScheduler { let mut num_sent: usize = 0; let mut num_unschedulable_conflicts: usize = 0; let mut num_unschedulable_threads: usize = 0; + let mut num_skipped_retry: usize = 0; while num_scanned < self.config.max_scanned_transactions_per_scheduling_pass { // If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule. if self.prio_graph.is_empty() { @@ -239,6 +240,10 @@ impl Scheduler for PrioGraphScheduler { num_unschedulable_threads += 1; unschedulable_ids.push(id); } + Err(TransactionSchedulingError::Skipped) => { + num_skipped_retry += 1; + unschedulable_ids.push(id); + } Ok(TransactionSchedulingInfo { thread_id, transaction, @@ -331,6 +336,7 @@ impl Scheduler for PrioGraphScheduler { num_scheduled, num_unschedulable_conflicts, num_unschedulable_threads, + num_skipped_retry, num_filtered_out, filter_time_us: total_filter_time_us, }) @@ -370,6 +376,7 @@ fn try_schedule_transaction( ) -> Result, TransactionSchedulingError> { match pre_lock_filter(transaction_state) { PreLockFilterAction::AttemptToSchedule => {} + PreLockFilterAction::SkipAndRetain => return Err(TransactionSchedulingError::Skipped), } // Check if this transaction conflicts with any blocked transactions @@ -694,6 +701,7 @@ mod tests { // Complete batch on thread 0. Remaining txs can be scheduled onto thread 1 finished_work_sender .send(FinishedConsumeWork { + slot: None, work: thread_0_work.into_iter().next().unwrap(), retryable_indexes: vec![], }) diff --git a/core/src/banking_stage/transaction_scheduler/scheduler.rs b/core/src/banking_stage/transaction_scheduler/scheduler.rs index 23dc4a733c0..bc32314cfcb 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler.rs @@ -48,6 +48,8 @@ pub(crate) trait Scheduler { pub(crate) enum PreLockFilterAction { /// Attempt to schedule the transaction. AttemptToSchedule, + /// Skip the transaction but do not drop it. + SkipAndRetain, } /// Metrics from scheduling transactions. @@ -64,6 +66,8 @@ pub(crate) struct SchedulingSummary { pub num_unschedulable_conflicts: usize, /// Number of transactions that were skipped due to thread capacity. pub num_unschedulable_threads: usize, + /// Number of transactions that were skipped due too recently attempted. + pub num_skipped_retry: usize, /// Number of transactions that were dropped due to filter. pub num_filtered_out: usize, /// Time spent filtering transactions diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_common.rs b/core/src/banking_stage/transaction_scheduler/scheduler_common.rs index df6acc23362..50fcaabef46 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_common.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_common.rs @@ -74,6 +74,8 @@ pub enum TransactionSchedulingError { UnschedulableConflicts, /// Thread is not allowed to be scheduled on at this time. UnschedulableThread, + /// Transaction was skipped by pre-filter logic. + Skipped, } /// Given the schedulable `thread_set`, select the thread with the least amount @@ -198,6 +200,7 @@ impl SchedulingCommon { max_ages: _, }, retryable_indexes, + slot, }) => { let num_transactions = ids.len(); let num_retryable = retryable_indexes.len(); @@ -210,7 +213,7 @@ impl SchedulingCommon { for (index, (id, transaction)) in izip!(ids, transactions).enumerate() { if let Some(retryable_index) = retryable_iter.peek() { if *retryable_index == index { - container.retry_transaction(id, transaction); + container.retry_transaction(slot, id, transaction); retryable_iter.next(); continue; } diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 072db2bd1c7..b94d1ab78a7 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -147,7 +147,13 @@ where MAX_PROCESSING_AGE, ) }, - |_| PreLockFilterAction::AttemptToSchedule // no pre-lock filter for now + |state| { + if state.last_tried_slot() == Some(bank_start.working_bank.slot()) { + PreLockFilterAction::SkipAndRetain + } else { + PreLockFilterAction::AttemptToSchedule + } + } )?); self.count_metrics.update(|count_metrics| { @@ -543,6 +549,7 @@ mod tests { finished_consume_work_sender .send(FinishedConsumeWork { + slot: None, work: ConsumeWork { batch_id: TransactionBatchId::new(0), ids: vec![], @@ -881,6 +888,7 @@ mod tests { // Complete the batch - marking the second transaction as retryable finished_consume_work_sender .send(FinishedConsumeWork { + slot: None, work: consume_work, retryable_indexes: vec![1], }) @@ -898,4 +906,78 @@ mod tests { .collect_vec(); assert_eq!(message_hashes, vec![&tx1_hash]); } + + #[test_case(test_create_sanitized_transaction_receive_and_buffer; "Sdk")] + #[test_case(test_create_transaction_view_receive_and_buffer; "View")] + fn test_schedule_consume_slot_gated_retry( + create_receive_and_buffer: impl FnOnce(BankingPacketReceiver, Arc>) -> R, + ) { + let (test_frame, mut scheduler_controller) = + create_test_frame(1, create_receive_and_buffer); + let TestFrame { + bank, + mint_keypair, + poh_recorder, + banking_packet_sender, + consume_work_receivers, + finished_consume_work_sender, + .. + } = &test_frame; + + poh_recorder + .write() + .unwrap() + .set_bank_for_test(bank.clone()); + + // Send packet batch to the scheduler - should do nothing until we become the leader. + let tx1 = create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &Pubkey::new_unique(), + 1, + 1000, + bank.last_blockhash(), + ); + let tx2 = create_and_fund_prioritized_transfer( + bank, + mint_keypair, + &Keypair::new(), + &Pubkey::new_unique(), + 1, + 2000, + bank.last_blockhash(), + ); + let tx1_hash = tx1.message().hash(); + let tx2_hash = tx2.message().hash(); + + let txs = vec![tx1, tx2]; + banking_packet_sender + .send(to_banking_packet_batch(&txs)) + .unwrap(); + + test_receive_then_schedule(&mut scheduler_controller); + let consume_work = consume_work_receivers[0].try_recv().unwrap(); + assert_eq!(consume_work.ids.len(), 2); + assert_eq!(consume_work.transactions.len(), 2); + let message_hashes = consume_work + .transactions + .iter() + .map(|tx| tx.message_hash()) + .collect_vec(); + assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]); + + // Complete the batch - marking the second transaction as retryable + finished_consume_work_sender + .send(FinishedConsumeWork { + slot: Some(bank.slot()), + work: consume_work, + retryable_indexes: vec![1], + }) + .unwrap(); + + // Transaction should NOT be rescheduled + test_receive_then_schedule(&mut scheduler_controller); + assert!(consume_work_receivers[0].is_empty()); + } } diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs b/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs index b09ef262a00..00adbd83822 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs @@ -419,6 +419,7 @@ pub struct SchedulingDetails { pub sum_num_scheduled: usize, pub sum_unschedulable_conflicts: usize, pub sum_unschedulable_threads: usize, + pub sum_skipped_retry: usize, } impl Default for SchedulingDetails { @@ -435,6 +436,7 @@ impl Default for SchedulingDetails { sum_num_scheduled: 0, sum_unschedulable_conflicts: 0, sum_unschedulable_threads: 0, + sum_skipped_retry: 0, } } } @@ -458,6 +460,7 @@ impl SchedulingDetails { .max_starting_buffer_size .max(scheduling_summary.starting_buffer_size); self.sum_starting_buffer_size += scheduling_summary.starting_buffer_size; + self.sum_skipped_retry += scheduling_summary.num_skipped_retry; self.sum_num_scheduled += scheduling_summary.num_scheduled; self.sum_unschedulable_conflicts += scheduling_summary.num_unschedulable_conflicts; @@ -502,6 +505,7 @@ impl SchedulingDetails { self.sum_unschedulable_threads, i64 ), + ("num_skipped_retry", self.sum_skipped_retry, i64), ); *self = Self { last_report: now, diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state.rs b/core/src/banking_stage/transaction_scheduler/transaction_state.rs index e52d09ed798..7443b718b1e 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state.rs @@ -1,4 +1,4 @@ -use crate::banking_stage::scheduler_messages::MaxAge; +use {crate::banking_stage::scheduler_messages::MaxAge, solana_sdk::clock::Slot}; /// TransactionState is used to track the state of a transaction in the transaction scheduler /// and banking stage as a whole. @@ -20,6 +20,8 @@ pub(crate) struct TransactionState { priority: u64, /// Estimated cost of the transaction. cost: u64, + /// Last slot transaction attempted execution on. + last_tried_slot: Option, } impl TransactionState { @@ -30,6 +32,7 @@ impl TransactionState { max_age, priority, cost, + last_tried_slot: None, } } @@ -58,6 +61,16 @@ impl TransactionState { (tx, self.max_age) } + /// Return the last tried slot. + pub(crate) fn last_tried_slot(&self) -> Option { + self.last_tried_slot + } + + /// Set a new last tried slot. + pub(crate) fn set_last_tried_slot(&mut self, slot: Option) { + self.last_tried_slot = slot; + } + /// Intended to be called when a transaction is retried. This method will /// return ownership of the transaction to the state. /// diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index db096fcaa62..90298fd4591 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -8,7 +8,7 @@ use { solana_runtime_transaction::{ runtime_transaction::RuntimeTransaction, transaction_with_meta::TransactionWithMeta, }, - solana_sdk::packet::PACKET_DATA_SIZE, + solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE}, std::sync::Arc, }; @@ -67,10 +67,16 @@ pub(crate) trait StateContainer { /// Retries a transaction - inserts transaction back into map. /// This transitions the transaction to `Unprocessed` state. - fn retry_transaction(&mut self, transaction_id: TransactionId, transaction: Tx) { + fn retry_transaction( + &mut self, + tried_slot: Option, + transaction_id: TransactionId, + transaction: Tx, + ) { let transaction_state = self .get_mut_transaction_state(transaction_id) .expect("transaction must exist"); + transaction_state.set_last_tried_slot(tried_slot); let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id); transaction_state.retry_transaction(transaction); self.push_ids_into_queue(std::iter::once(priority_id));