diff --git a/core/src/banking_stage/consume_worker.rs b/core/src/banking_stage/consume_worker.rs index a8602397fbae42..3d616e30f68731 100644 --- a/core/src/banking_stage/consume_worker.rs +++ b/core/src/banking_stage/consume_worker.rs @@ -4,6 +4,7 @@ use { leader_slot_timing_metrics::LeaderExecuteAndCommitTimings, scheduler_messages::{ConsumeWork, FinishedConsumeWork}, }, + crate::banking_stage::consumer::RetryableIndex, crossbeam_channel::{Receiver, RecvError, SendError, Sender}, solana_measure::measure_us, solana_poh::poh_recorder::SharedWorkingBank, @@ -179,7 +180,12 @@ impl ConsumeWorker { /// Send transactions back to scheduler as retryable. fn retry(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> { - let retryable_indexes: Vec<_> = (0..work.transactions.len()).collect(); + let retryable_indexes: Vec<_> = (0..work.transactions.len()) + .map(|index| RetryableIndex { + index, + immediately_retryable: true, + }) + .collect(); let num_retryable = retryable_indexes.len(); self.metrics .count_metrics @@ -942,7 +948,10 @@ mod tests { assert_eq!(consumed.work.batch_id, bid); assert_eq!(consumed.work.ids, vec![id]); assert_eq!(consumed.work.max_ages, vec![max_age]); - assert_eq!(consumed.retryable_indexes, vec![0]); + assert_eq!( + consumed.retryable_indexes, + vec![RetryableIndex::new(0, true)] + ); drop(test_frame); let _ = worker_thread.join().unwrap(); @@ -991,7 +1000,7 @@ mod tests { assert_eq!(consumed.work.batch_id, bid); assert_eq!(consumed.work.ids, vec![id]); assert_eq!(consumed.work.max_ages, vec![max_age]); - assert_eq!(consumed.retryable_indexes, Vec::::new()); + assert_eq!(consumed.retryable_indexes, Vec::new()); drop(test_frame); let _ = worker_thread.join().unwrap(); @@ -1051,7 +1060,7 @@ mod tests { if relax_intrabatch_account_locks { vec![] } else { - vec![1] + vec![RetryableIndex::new(1, true)] } ); @@ -1122,13 +1131,13 @@ mod tests { assert_eq!(consumed.work.batch_id, bid1); assert_eq!(consumed.work.ids, vec![id1]); 
assert_eq!(consumed.work.max_ages, vec![max_age]); - assert_eq!(consumed.retryable_indexes, Vec::::new()); + assert_eq!(consumed.retryable_indexes, Vec::new()); let consumed = consumed_receiver.recv().unwrap(); assert_eq!(consumed.work.batch_id, bid2); assert_eq!(consumed.work.ids, vec![id2]); assert_eq!(consumed.work.max_ages, vec![max_age]); - assert_eq!(consumed.retryable_indexes, Vec::::new()); + assert_eq!(consumed.retryable_indexes, Vec::new()); drop(test_frame); let _ = worker_thread.join().unwrap(); @@ -1258,7 +1267,7 @@ mod tests { .unwrap(); let consumed = consumed_receiver.recv().unwrap(); - assert_eq!(consumed.retryable_indexes, Vec::::new()); + assert_eq!(consumed.retryable_indexes, Vec::new()); // all but one succeed. 6 for initial funding assert_eq!(bank.transaction_count(), 6 + 5); diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index f74fba741fc67e..060cb3394c13fc 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -34,6 +34,21 @@ use { /// Consumer will create chunks of transactions from buffer with up to this size. pub const TARGET_NUM_TRANSACTIONS_PER_BATCH: usize = 64; +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct RetryableIndex { + pub index: usize, + pub immediately_retryable: bool, +} + +impl RetryableIndex { + pub fn new(index: usize, immediately_retryable: bool) -> Self { + Self { + index, + immediately_retryable, + } + } +} + pub struct ProcessTransactionBatchOutput { // The number of transactions filtered out by the cost model pub(crate) cost_model_throttled_transactions_count: u64, @@ -48,7 +63,7 @@ pub struct ExecuteAndCommitTransactionsOutput { pub(crate) transaction_counts: LeaderProcessedTransactionCounts, // Transactions that either were not executed, or were executed and failed to be committed due // to the block ending. 
- pub(crate) retryable_transaction_indexes: Vec, + pub(crate) retryable_transaction_indexes: Vec, // A result that indicates whether transactions were successfully // committed into the Poh stream. pub commit_transactions_result: Result, PohRecorderError>, @@ -256,23 +271,39 @@ impl Consumer { // following are retryable errors Err(TransactionError::AccountInUse) => { error_counters.account_in_use += 1; - Some(index) + // locking failure due to vote conflict or jito - immediately retry. + Some(RetryableIndex { + index, + immediately_retryable: true, + }) } Err(TransactionError::WouldExceedMaxBlockCostLimit) => { error_counters.would_exceed_max_block_cost_limit += 1; - Some(index) + Some(RetryableIndex { + index, + immediately_retryable: false, + }) } Err(TransactionError::WouldExceedMaxVoteCostLimit) => { error_counters.would_exceed_max_vote_cost_limit += 1; - Some(index) + Some(RetryableIndex { + index, + immediately_retryable: false, + }) } Err(TransactionError::WouldExceedMaxAccountCostLimit) => { error_counters.would_exceed_max_account_cost_limit += 1; - Some(index) + Some(RetryableIndex { + index, + immediately_retryable: false, + }) } Err(TransactionError::WouldExceedAccountDataBlockLimit) => { error_counters.would_exceed_account_data_block_limit += 1; - Some(index) + Some(RetryableIndex { + index, + immediately_retryable: false, + }) } // following are non-retryable errors Err(TransactionError::TooManyAccountLocks) => { @@ -369,7 +400,12 @@ impl Consumer { if let Err(recorder_err) = record_transactions_result { retryable_transaction_indexes.extend(processing_results.iter().enumerate().filter_map( - |(index, processing_result)| processing_result.was_processed().then_some(index), + |(index, processing_result)| { + processing_result.was_processed().then_some(RetryableIndex { + index, + immediately_retryable: true, // recording errors are always immediately retryable + }) + }, )); // retryable indexes are expected to be sorted - in this case the @@ -754,7 +790,13 
@@ mod tests { processed_with_successful_result_count: 1, } ); - assert_eq!(retryable_transaction_indexes, vec![0]); + assert_eq!( + retryable_transaction_indexes, + vec![RetryableIndex { + index: 0, + immediately_retryable: true + }] + ); assert_matches!( commit_transactions_result, Err(PohRecorderError::MaxHeightReached) @@ -1152,7 +1194,10 @@ mod tests { commit_transactions_result.get(1), Some(CommitTransactionDetails::NotCommitted(_)) ); - assert_eq!(retryable_transaction_indexes, vec![1]); + assert_eq!( + retryable_transaction_indexes, + vec![RetryableIndex::new(1, true)] + ); let expected_block_cost = { let (actual_programs_execution_cost, actual_loaded_accounts_data_size_cost) = @@ -1311,9 +1356,12 @@ mod tests { // with simd3, duplicate transactions are not retryable if relax_intrabatch_account_locks && use_duplicate_transaction { - assert_eq!(retryable_transaction_indexes, Vec::::new()); + assert_eq!(retryable_transaction_indexes, Vec::<_>::new()); } else { - assert_eq!(retryable_transaction_indexes, vec![1]); + assert_eq!( + retryable_transaction_indexes, + vec![RetryableIndex::new(1, true)] + ); } } @@ -1369,7 +1417,9 @@ mod tests { assert_eq!( execute_and_commit_transactions_output.retryable_transaction_indexes, - (1..transactions_len - 1).collect::>() + (1..transactions_len - 1) + .map(|index| RetryableIndex::new(index, true)) + .collect::>() ); } @@ -1455,12 +1505,14 @@ mod tests { if relax_intrabatch_account_locks { assert_eq!( execute_and_commit_transactions_output.retryable_transaction_indexes, - Vec::::new() + Vec::<_>::new() ); } else { assert_eq!( execute_and_commit_transactions_output.retryable_transaction_indexes, - (1..transactions_len).collect::>() + (1..transactions_len) + .map(|index| RetryableIndex::new(index, true)) + .collect::>() ); } } @@ -1548,7 +1600,9 @@ mod tests { execute_and_commit_transactions_output .retryable_transaction_indexes .sort_unstable(); - let expected: Vec = (0..transactions.len()).collect(); + let expected: Vec<_> 
= (0..transactions.len()) + .map(|index| RetryableIndex::new(index, true)) + .collect(); assert_eq!( execute_and_commit_transactions_output.retryable_transaction_indexes, expected diff --git a/core/src/banking_stage/scheduler_messages.rs b/core/src/banking_stage/scheduler_messages.rs index 7905278323152e..37fdf92d2b3efa 100644 --- a/core/src/banking_stage/scheduler_messages.rs +++ b/core/src/banking_stage/scheduler_messages.rs @@ -1,4 +1,5 @@ use { + crate::banking_stage::consumer::RetryableIndex, solana_clock::{Epoch, Slot}, std::fmt::Display, }; @@ -47,5 +48,5 @@ pub struct ConsumeWork { /// Processed transactions. pub struct FinishedConsumeWork { pub work: ConsumeWork, - pub retryable_indexes: Vec, + pub retryable_indexes: Vec, } diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_common.rs b/core/src/banking_stage/transaction_scheduler/scheduler_common.rs index 730264ffcea41d..6f2f5fd8c88451 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_common.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_common.rs @@ -242,9 +242,13 @@ impl SchedulingCommon { // Assumption - retryable indexes are in order (sorted by workers). 
let mut retryable_iter = retryable_indexes.iter().peekable(); for (index, (id, transaction)) in izip!(ids, transactions).enumerate() { - if let Some(&&retryable_index) = retryable_iter.peek() { - if retryable_index == index { - container.retry_transaction(id, transaction); + if let Some(&retryable_index) = retryable_iter.peek() { + if retryable_index.index == index { + container.retry_transaction( + id, + transaction, + retryable_index.immediately_retryable, + ); retryable_iter.next(); continue; } @@ -254,7 +258,11 @@ impl SchedulingCommon { debug_assert!( retryable_iter.peek().is_none(), - "retryable indexes were not in order: {retryable_indexes:?}" + "retryable indexes were not in order: {:?}", + retryable_indexes + .iter() + .map(|index| index.index) + .collect::>(), ); Ok((num_transactions, num_retryable)) @@ -290,11 +298,18 @@ impl SchedulingCommon { mod tests { use { super::*, - crate::banking_stage::transaction_scheduler::transaction_state_container::TransactionStateContainer, - crossbeam_channel::unbounded, solana_hash::Hash, solana_keypair::Keypair, - solana_pubkey::Pubkey, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, + crate::banking_stage::{ + consumer::RetryableIndex, + transaction_scheduler::transaction_state_container::TransactionStateContainer, + }, + crossbeam_channel::unbounded, + solana_hash::Hash, + solana_keypair::Keypair, + solana_pubkey::Pubkey, + solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_system_transaction as system_transaction, - solana_transaction::sanitized::SanitizedTransaction, test_case::test_case, + solana_transaction::sanitized::SanitizedTransaction, + test_case::test_case, }; const NUM_WORKERS: usize = 4; @@ -521,17 +536,22 @@ mod tests { let num_scheduled = common.send_batch(0).unwrap(); let work = work_receivers[0].try_recv().unwrap(); assert_eq!(work.ids.len(), num_scheduled); - let retryable_indexes = vec![0, 1]; + let retryable_indexes = vec![ + RetryableIndex::new(0, 
true), + RetryableIndex::new(1, false), // should be held by container. + ]; + let expected_num_retryable = retryable_indexes.len(); let finished_work = FinishedConsumeWork { work, - retryable_indexes: retryable_indexes.clone(), + retryable_indexes, }; finished_work_sender.send(finished_work).unwrap(); let (num_transactions, num_retryable) = common.try_receive_completed(&mut container).unwrap(); assert_eq!(num_transactions, num_scheduled); - assert_eq!(num_retryable, retryable_indexes.len()); - assert_eq!(container.buffer_size(), retryable_indexes.len()); + assert_eq!(num_retryable, expected_num_retryable); + assert_eq!(container.buffer_size(), expected_num_retryable); + assert_eq!(container.queue_size(), expected_num_retryable - 1); // held transaction not in queue. } #[test] @@ -550,10 +570,10 @@ mod tests { let num_scheduled = common.send_batch(0).unwrap(); let work = work_receivers[0].try_recv().unwrap(); assert_eq!(work.ids.len(), num_scheduled); - let retryable_indexes = vec![1, 0]; + let retryable_indexes = vec![RetryableIndex::new(1, true), RetryableIndex::new(0, true)]; let finished_work = FinishedConsumeWork { work, - retryable_indexes: retryable_indexes.clone(), + retryable_indexes, }; finished_work_sender.send(finished_work).unwrap(); diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index a9bfa3f719bc95..7088e2ab550ced 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -87,6 +87,7 @@ where } pub fn run(mut self) -> Result<(), SchedulerError> { + let mut last_slot = None; while !self.exit.load(Ordering::Relaxed) { // BufferedPacketsDecision is shared with legacy BankingStage, which will forward // packets. 
Initially, not renaming these decision variants but the actions taken @@ -110,6 +111,10 @@ where .maybe_report_and_reset_slot(new_leader_slot); self.receive_completed()?; + if last_slot != new_leader_slot { + self.container.flush_held_transactions(); + last_slot = new_leader_slot; + } self.process_transactions(&decision)?; if self.receive_and_buffer_packets(&decision).is_err() { break; @@ -353,7 +358,7 @@ mod tests { use { super::*, crate::banking_stage::{ - consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH, + consumer::{RetryableIndex, TARGET_NUM_TRANSACTIONS_PER_BATCH}, packet_deserializer::PacketDeserializer, scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId}, tests::create_slow_genesis_config, @@ -874,7 +879,7 @@ mod tests { finished_consume_work_sender .send(FinishedConsumeWork { work: consume_work, - retryable_indexes: vec![1], + retryable_indexes: vec![RetryableIndex::new(1, true)], }) .unwrap(); diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index bfd1419dcf1420..dad2bfd2506dc6 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -44,6 +44,7 @@ pub(crate) struct TransactionStateContainer { capacity: usize, priority_queue: MinMaxHeap, id_to_transaction_state: Slab>, + held_transactions: Vec, } #[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))] @@ -71,13 +72,23 @@ pub(crate) trait StateContainer { /// Retries a transaction - inserts transaction back into map. /// This transitions the transaction to `Unprocessed` state. 
- fn retry_transaction(&mut self, transaction_id: TransactionId, transaction: Tx) { + fn retry_transaction( + &mut self, + transaction_id: TransactionId, + transaction: Tx, + immediately_retryable: bool, + ) { let transaction_state = self .get_mut_transaction_state(transaction_id) .expect("transaction must exist"); let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id); transaction_state.retry_transaction(transaction); - self.push_ids_into_queue(std::iter::once(priority_id)); + + if immediately_retryable { + self.push_ids_into_queue(std::iter::once(priority_id)); + } else { + self.hold_transaction(priority_id); + } } /// Pushes transaction ids into the priority queue. If the queue if full, @@ -91,9 +102,14 @@ pub(crate) trait StateContainer { priority_ids: impl Iterator, ) -> usize; + /// Hold the transaction until the next flush (next slot). + fn hold_transaction(&mut self, priority_id: TransactionPriorityId); + /// Remove transaction by id. fn remove_by_id(&mut self, id: TransactionId); + fn flush_held_transactions(&mut self); + fn get_min_max_priority(&self) -> MinMaxResult; #[cfg(feature = "dev-context-only-utils")] @@ -110,6 +126,7 @@ impl StateContainer for TransactionStateContainer StateContainer for TransactionStateContainer MinMaxResult { match self.priority_queue.peek_min() { Some(min) => match self.priority_queue.peek_max() { @@ -323,11 +350,21 @@ impl StateContainer for TransactionViewStateContainer { self.inner.push_ids_into_queue(priority_ids) } + #[inline] + fn hold_transaction(&mut self, priority_id: TransactionPriorityId) { + self.inner.hold_transaction(priority_id); + } + #[inline] fn remove_by_id(&mut self, id: TransactionId) { self.inner.remove_by_id(id); } + #[inline] + fn flush_held_transactions(&mut self) { + self.inner.flush_held_transactions(); + } + #[inline] fn get_min_max_priority(&self) -> MinMaxResult { self.inner.get_min_max_priority() diff --git a/core/src/banking_stage/vote_worker.rs 
b/core/src/banking_stage/vote_worker.rs index 151d92d092ade8..6a1002e672b25d 100644 --- a/core/src/banking_stage/vote_worker.rs +++ b/core/src/banking_stage/vote_worker.rs @@ -429,7 +429,10 @@ impl VoteWorker { ProcessTransactionsSummary { reached_max_poh_height, transaction_counts: total_transaction_counts, - retryable_transaction_indexes, + retryable_transaction_indexes: retryable_transaction_indexes + .into_iter() + .map(|retryable_index| retryable_index.index) + .collect(), cost_model_throttled_transactions_count, cost_model_us, execute_and_commit_timings,