diff --git a/ledger/benches/blockstore_processor.rs b/ledger/benches/blockstore_processor.rs index e72f75186ca6f2..1275db0ce01b99 100644 --- a/ledger/benches/blockstore_processor.rs +++ b/ledger/benches/blockstore_processor.rs @@ -142,6 +142,7 @@ fn bench_execute_batch( TransactionBatchWithIndexes { batch, transaction_indexes: (0..batch_size).collect(), + self_conflicting_batch: false, } }) .collect(); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 5273b4601bfe33..fd4c75d916239f 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -78,9 +78,11 @@ use { ExecuteTimingType::{NumExecuteBatches, TotalBatchesLen}, }; +#[derive(Clone)] pub struct TransactionBatchWithIndexes<'a, 'b> { pub batch: TransactionBatch<'a, 'b>, pub transaction_indexes: Vec<usize>, + pub self_conflicting_batch: bool, } struct ReplayEntry { @@ -145,6 +147,7 @@ pub fn execute_batch( let TransactionBatchWithIndexes { batch, transaction_indexes, + .. } = batch; let record_token_balances = transaction_status_sender.is_some(); @@ -421,6 +424,7 @@ fn schedule_batches_for_execution( for TransactionBatchWithIndexes { batch, transaction_indexes, + .. 
} in batches { bank.schedule_transaction_executions( @@ -450,6 +454,7 @@ fn rebatch_transactions<'a>( TransactionBatchWithIndexes { batch: tx_batch, transaction_indexes, + self_conflicting_batch: false, } } @@ -469,6 +474,7 @@ fn rebatch_and_execute_batches( let ((lock_results, sanitized_txs), transaction_indexes): ((Vec<_>, Vec<_>), Vec<_>) = batches .iter() + .filter(|batch| !batch.self_conflicting_batch) .flat_map(|batch| { batch .batch @@ -517,6 +523,17 @@ fn rebatch_and_execute_batches( batch_cost = 0; } }); + let conflicting_batches: Vec<_> = batches + .iter() + .filter(|batch| batch.self_conflicting_batch) + .map(|batch| { + let mut non_conflicting_batch = batch.clone(); + non_conflicting_batch.batch.set_needs_unlock(false); + non_conflicting_batch + }) + .collect(); + tx_batches.extend(conflicting_batches); + &tx_batches[..] } else { batches @@ -605,7 +622,6 @@ fn process_entries( log_messages_bytes_limit: Option<usize>, prioritization_fee_cache: &PrioritizationFeeCache, ) -> Result<()> { - // accumulator for entries that can be processed in parallel let mut batches = vec![]; let mut tick_hashes = vec![]; @@ -646,19 +662,23 @@ fn process_entries( // try to lock the accounts let batch = bank.prepare_sanitized_batch(transactions); let first_lock_err = first_err(batch.lock_results()); + // prepare_sanitized_batch will return this flag after self conflicting locking is allowed + // ref: https://github.com/anza-xyz/agave/pull/1624 + let self_conflicting_batch = true; // if locking worked if first_lock_err.is_ok() { batches.push(TransactionBatchWithIndexes { batch, transaction_indexes, + self_conflicting_batch, }); // done with this entry break; } // else we failed to lock, 2 possible reasons if batches.is_empty() { - // An entry has account lock conflicts with *itself*, which should not happen + // else we failed due to account lock limit which should not happen // if generated by a properly functioning leader datapoint_error!( "validator_process_entry_error", @@ 
-4674,6 +4694,42 @@ pub mod tests { ] } + /// create variable number of conflicting/non-conflicting transactions + fn create_sanitized_transactions_for_tests( + mint_keypair: &Keypair, + genesis_hash: &Hash, + insert_conflicting_tx: bool, + num_of_txns: usize, + ) -> Vec<SanitizedTransaction> { + let mut txs = vec![]; + + if insert_conflicting_tx { + // 1 iteration inserts 2 transactions in vec therefore loop_iter/2 + for _ in 0..num_of_txns / 2 { + let pubkey = solana_sdk::pubkey::new_rand(); + let pubkey2 = solana_sdk::pubkey::new_rand(); + let keypair = Keypair::new(); + + txs.push(SanitizedTransaction::from_transaction_for_tests( + system_transaction::transfer(mint_keypair, &pubkey, 1, *genesis_hash), + )); + txs.push(SanitizedTransaction::from_transaction_for_tests( + system_transaction::transfer(&keypair, &pubkey2, 1, *genesis_hash), + )); + } + } else { + for _ in 0..num_of_txns { + let keypair = Keypair::new(); + let pubkey = solana_sdk::pubkey::new_rand(); + + txs.push(SanitizedTransaction::from_transaction_for_tests( + system_transaction::transfer(&keypair, &pubkey, 1, *genesis_hash), + )); + } + } + txs + } + #[test] fn test_confirm_slot_entries_progress_num_txs_indexes() { let GenesisConfigInfo { @@ -4889,6 +4945,7 @@ pub mod tests { let batch_with_indexes = TransactionBatchWithIndexes { batch, transaction_indexes: (0..txs.len()).collect(), + self_conflicting_batch: false, }; let replay_tx_thread_pool = create_thread_pool(1); @@ -4921,6 +4978,201 @@ pub mod tests { do_test_schedule_batches_for_execution(false); } + /// For normal flow self conflicting batches should not be rebatched. + /// Returns number of self conflicting batches after rebatching. + /// + /// This function partially tests the rebatching functionality of [rebatch_and_execute_batches]. + /// After SVM changes (to allow self conflicting batches) [rebatch_and_execute_batches] can directly be used in this test. 
+ fn check_conflicting_batches_after_rebatching( + bank: &Arc<Bank>, + batches: &[TransactionBatchWithIndexes], + ) -> usize { + let ((lock_results, sanitized_txs), transaction_indexes): ((Vec<_>, Vec<_>), Vec<_>) = + batches + .iter() + .filter(|batch| !batch.self_conflicting_batch) + .flat_map(|batch| { + batch + .batch + .lock_results() + .iter() + .cloned() + .zip(batch.batch.sanitized_transactions().to_vec()) + .zip(batch.transaction_indexes.to_vec()) + }) + .unzip(); + + let mut minimal_tx_cost = u64::MAX; + let mut total_cost: u64 = 0; + let tx_costs = sanitized_txs + .iter() + .map(|tx| { + let tx_cost = CostModel::calculate_cost(tx, &bank.feature_set); + let cost = tx_cost.sum(); + minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost); + total_cost = total_cost.saturating_add(cost); + cost + }) + .collect::<Vec<_>>(); + + let target_batch_count = get_thread_count() as u64; + + let mut tx_batches: Vec<TransactionBatchWithIndexes> = vec![]; + let rebatched_txs: &[TransactionBatchWithIndexes] = + if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) { + let target_batch_cost = total_cost / target_batch_count; + let mut batch_cost: u64 = 0; + let mut slice_start = 0; + tx_costs.into_iter().enumerate().for_each(|(index, cost)| { + let next_index = index + 1; + batch_cost = batch_cost.saturating_add(cost); + if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() { + let tx_batch = rebatch_transactions( + &lock_results, + bank, + &sanitized_txs, + slice_start, + index, + &transaction_indexes, + ); + slice_start = next_index; + tx_batches.push(tx_batch); + batch_cost = 0; + } + }); + let conflicting_batches: Vec<_> = batches + .iter() + .filter(|batch| batch.self_conflicting_batch) + .map(|batch| { + let mut non_conflicting_batch = batch.clone(); + non_conflicting_batch.batch.set_needs_unlock(false); + non_conflicting_batch + }) + .collect(); + tx_batches.extend(conflicting_batches); + + &tx_batches[..] 
+ } else { + batches + }; + let mut num_of_self_conflicting_batches = 0; + for each_batch in rebatched_txs.iter() { + if each_batch.self_conflicting_batch { + num_of_self_conflicting_batches += 1; + } + } + num_of_self_conflicting_batches + } + + fn do_test_schedule_conflicting_batches_for_execution() { + solana_logger::setup(); + let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let context = SchedulingContext::new(bank.clone()); + + let conflicting_txs = create_sanitized_transactions_for_tests( + &mint_keypair, + &genesis_config.hash(), + true, + 55, + ); + + let non_conflicting_txs1 = create_sanitized_transactions_for_tests( + &mint_keypair, + &genesis_config.hash(), + false, + 32, + ); + let non_conflicting_txs2 = create_sanitized_transactions_for_tests( + &mint_keypair, + &genesis_config.hash(), + false, + 32, + ); + + let mut mocked_scheduler = MockInstalledScheduler::new(); + let seq = Arc::new(Mutex::new(mockall::Sequence::new())); + let seq_cloned = seq.clone(); + mocked_scheduler + .expect_context() + .times(1) + .in_sequence(&mut seq.lock().unwrap()) + .return_const(context); + mocked_scheduler + .expect_schedule_execution() + .times(conflicting_txs.len() + non_conflicting_txs1.len() + non_conflicting_txs2.len()) + .returning(|(_, _)| Ok(())); + + mocked_scheduler + .expect_wait_for_termination() + .with(mockall::predicate::eq(true)) + .times(1) + .in_sequence(&mut seq.lock().unwrap()) + .returning(move |_| { + let mut mocked_uninstalled_scheduler = MockUninstalledScheduler::new(); + mocked_uninstalled_scheduler + .expect_return_to_pool() + .times(1) + .in_sequence(&mut seq_cloned.lock().unwrap()) + .returning(|| ()); + ( + (Ok(()), ExecuteTimings::default()), + Box::new(mocked_uninstalled_scheduler), + ) + }); + let bank = 
BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler))); + + let conflicting_batch = bank.prepare_sanitized_batch(&conflicting_txs); + let conflicting_batch_with_indexes = TransactionBatchWithIndexes { + batch: conflicting_batch, + transaction_indexes: (0..conflicting_txs.len()).collect(), + self_conflicting_batch: true, + }; + + let non_conflicting_batch1 = bank.prepare_sanitized_batch(&non_conflicting_txs1); + let non_conflicting_batch1_with_indexes = TransactionBatchWithIndexes { + batch: non_conflicting_batch1, + transaction_indexes: (0..non_conflicting_txs1.len()).collect(), + self_conflicting_batch: false, + }; + + let non_conflicting_batch2 = bank.prepare_sanitized_batch(&non_conflicting_txs2); + let non_conflicting_batch2_with_indexes = TransactionBatchWithIndexes { + batch: non_conflicting_batch2, + transaction_indexes: (0..non_conflicting_txs2.len()).collect(), + self_conflicting_batch: false, + }; + + let replay_tx_thread_pool = create_thread_pool(1); + let mut batch_execution_timing = BatchExecutionTiming::default(); + let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); + let result = process_batches( + &bank, + &replay_tx_thread_pool, + &[ + non_conflicting_batch1_with_indexes, + conflicting_batch_with_indexes, + non_conflicting_batch2_with_indexes, + ], + None, + None, + &mut batch_execution_timing, + None, + &ignored_prioritization_fee_cache, + ); + assert_matches!(result, Ok(())); + } + + #[test] + fn test_schedule_conflicting_batches_for_execution_success() { + do_test_schedule_conflicting_batches_for_execution(); + } + #[test] fn test_confirm_slot_entries_with_fix() { const HASHES_PER_TICK: u64 = 10; @@ -5101,4 +5353,188 @@ pub mod tests { check_block_cost_limits(&bank, &commit_results, &txs) ); } + + #[test] + /// checks rebatching for disjoint batches after introducing self_conflicting_batch + fn test_multiple_non_conflicting_batches() { + let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); + let 
GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + + // batch 1 + let txs = create_sanitized_transactions_for_tests( + &mint_keypair, + &genesis_config.hash(), + false, + 6, + ); + let non_conflicting_batch: TransactionBatch = bank.prepare_sanitized_batch(&txs); + let non_conflicting_batch_1_with_index = TransactionBatchWithIndexes { + batch: non_conflicting_batch, + transaction_indexes: (0..txs.len()).collect(), + self_conflicting_batch: false, + }; + + // batch 2 + let txs = create_sanitized_transactions_for_tests( + &mint_keypair, + &genesis_config.hash(), + false, + 6, + ); + let non_conflicting_batch: TransactionBatch = bank.prepare_sanitized_batch(&txs); + let non_conflicting_batch_2_with_index = TransactionBatchWithIndexes { + batch: non_conflicting_batch, + transaction_indexes: (0..txs.len()).collect(), + self_conflicting_batch: false, + }; + + let number_of_conflicting_batches = check_conflicting_batches_after_rebatching( + &bank, + &[ + non_conflicting_batch_1_with_index, + non_conflicting_batch_2_with_index, + ], + ); + // As no self conflicting batch is provided + assert_eq!(number_of_conflicting_batches, 0); + } + + #[test] + fn test_multiple_conflicting_batches() { + let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. 
+ } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + + // self conflicting batch 1 + let txs = + create_sanitized_transactions_for_tests(&mint_keypair, &genesis_config.hash(), true, 6); + let conflicting_batch: TransactionBatch = bank.prepare_sanitized_batch(&txs); + let conflicting_batch_1_with_index = TransactionBatchWithIndexes { + batch: conflicting_batch, + transaction_indexes: (0..txs.len()).collect(), + self_conflicting_batch: true, + }; + + // self conflicting batch 2 + let txs = + create_sanitized_transactions_for_tests(&mint_keypair, &genesis_config.hash(), true, 6); + let conflicting_batch: TransactionBatch = bank.prepare_sanitized_batch(&txs); + let conflicting_batch_2_with_index = TransactionBatchWithIndexes { + batch: conflicting_batch, + transaction_indexes: (0..txs.len()).collect(), + self_conflicting_batch: true, + }; + + // self conflicting batch 3 + let txs = create_sanitized_transactions_for_tests( + &mint_keypair, + &genesis_config.hash(), + true, + 20, + ); + let conflicting_batch = bank.prepare_sanitized_batch(&txs); + let conflicting_batch_3_with_index = TransactionBatchWithIndexes { + batch: conflicting_batch, + transaction_indexes: (0..txs.len()).collect(), + self_conflicting_batch: true, + }; + + let number_of_conflicting_batches = check_conflicting_batches_after_rebatching( + &bank, + &[ + conflicting_batch_1_with_index, + conflicting_batch_2_with_index, + conflicting_batch_3_with_index, + ], + ); + // the self conflicting batch should not be rebatched + assert_eq!(number_of_conflicting_batches, 3); + } + #[test] + fn test_mixture_of_conflicting_and_non_conflicting_batches() { + let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. 
+ } = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + + // batch 1 + let txs = create_sanitized_transactions_for_tests( + &mint_keypair, + &genesis_config.hash(), + false, + 6, + ); + let non_conflicting_batch: TransactionBatch = bank.prepare_sanitized_batch(&txs); + let non_conflicting_batch_1_with_index = TransactionBatchWithIndexes { + batch: non_conflicting_batch, + transaction_indexes: (0..txs.len()).collect(), + self_conflicting_batch: false, + }; + + // batch 2 + let txs = create_sanitized_transactions_for_tests( + &mint_keypair, + &genesis_config.hash(), + false, + 20, + ); + let non_conflicting_batch: TransactionBatch = bank.prepare_sanitized_batch(&txs); + let non_conflicting_batch_2_with_index = TransactionBatchWithIndexes { + batch: non_conflicting_batch, + transaction_indexes: (0..txs.len()).collect(), + self_conflicting_batch: false, + }; + + // batch 3 + let txs = create_sanitized_transactions_for_tests( + &mint_keypair, + &genesis_config.hash(), + true, + 20, + ); + let conflicting_batch = bank.prepare_sanitized_batch(&txs); + let conflicting_batch_1_with_index = TransactionBatchWithIndexes { + batch: conflicting_batch, + transaction_indexes: (0..txs.len()).collect(), + self_conflicting_batch: true, + }; + + // batch 3 + let txs = create_sanitized_transactions_for_tests( + &mint_keypair, + &genesis_config.hash(), + true, + 40, + ); + let conflicting_batch = bank.prepare_sanitized_batch(&txs); + let conflicting_batch_2_with_index = TransactionBatchWithIndexes { + batch: conflicting_batch, + transaction_indexes: (0..txs.len()).collect(), + self_conflicting_batch: true, + }; + + let number_of_conflicting_batches = check_conflicting_batches_after_rebatching( + &bank, + &[ + non_conflicting_batch_1_with_index, + conflicting_batch_1_with_index, + non_conflicting_batch_2_with_index, + conflicting_batch_2_with_index, + ], + ); + // the self conflicting batch should not be 
rebatched + assert_eq!(number_of_conflicting_batches, 2); + } } diff --git a/runtime/src/transaction_batch.rs b/runtime/src/transaction_batch.rs index ecec27e02e93aa..8e92a3a9d43425 100644 --- a/runtime/src/transaction_batch.rs +++ b/runtime/src/transaction_batch.rs @@ -3,7 +3,7 @@ use { solana_sdk::transaction::{Result, SanitizedTransaction}, std::borrow::Cow, }; - +#[derive(Clone)] // Represents the results of trying to lock a set of accounts pub struct TransactionBatch<'a, 'b> { lock_results: Vec<Result<()>>, diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index 1b9c471137ca70..0d30fd565af33f 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -434,6 +434,7 @@ impl TaskHandler for DefaultTaskHandler { let batch_with_indexes = TransactionBatchWithIndexes { batch, transaction_indexes: vec![index], + self_conflicting_batch: false, }; *result = execute_batch(