diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 97bffd0931ff80..4a359d7b126bfb 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1,7 +1,7 @@ //! The `banking_stage` processes Transaction messages. It is intended to be used //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. -use crate::{cost_tracker::CostTracker, packet_hasher::PacketHasher}; +use crate::{cost_tracker::CostTracker, packet_hasher::PacketHasher, vote_redundancy_checker}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; use lru::LruCache; @@ -102,6 +102,7 @@ pub struct BankingStageStats { cost_tracker_update_elapsed: AtomicU64, cost_tracker_clone_elapsed: AtomicU64, cost_tracker_check_elapsed: AtomicU64, + vote_check_elapsed: AtomicU64, } impl BankingStageStats { @@ -225,6 +226,11 @@ impl BankingStageStats { self.cost_tracker_check_elapsed.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "vote_check_elapsed", + self.vote_check_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), ); } } @@ -790,6 +796,7 @@ impl BankingStage { batch: &TransactionBatch, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, + banking_stage_stats: &BankingStageStats, ) -> (Result, Vec) { let mut load_execute_time = Measure::start("load_execute_time"); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce @@ -810,8 +817,14 @@ impl BankingStage { vec![] }; - let mut execute_timings = ExecuteTimings::default(); + let mut vote_check_time = Measure::start("vote_check_time"); + let filtered_batch = vote_redundancy_checker::check_redundant_votes(bank, batch); + vote_check_time.stop(); + banking_stage_stats + .vote_check_elapsed + .fetch_add(vote_check_time.as_us(), Ordering::Relaxed); + let mut execute_timings = ExecuteTimings::default(); let ( mut loaded_accounts, results, @@ -821,7 +834,7 @@ impl BankingStage { tx_count, signature_count, ) = bank.load_and_execute_transactions( - batch, + &filtered_batch, MAX_PROCESSING_AGE, transaction_status_sender.is_some(), transaction_status_sender.is_some(), @@ -832,8 +845,12 @@ impl BankingStage { let freeze_lock = bank.freeze_lock(); let mut record_time = Measure::start("record_time"); - let (num_to_commit, retryable_record_txs) = - Self::record_transactions(bank.slot(), batch.sanitized_transactions(), &results, poh); + let (num_to_commit, retryable_record_txs) = Self::record_transactions( + bank.slot(), + filtered_batch.sanitized_transactions(), + &results, + poh, + ); inc_new_counter_info!( "banking_stage-record_transactions_num_to_commit", *num_to_commit.as_ref().unwrap_or(&0) @@ -849,7 +866,7 @@ impl BankingStage { record_time.stop(); let mut commit_time = Measure::start("commit_time"); - let sanitized_txs = batch.sanitized_transactions(); + let sanitized_txs = filtered_batch.sanitized_transactions(); let num_to_commit = num_to_commit.unwrap(); if num_to_commit != 0 { let tx_results = bank.commit_transactions( @@ -863,9 +880,10 @@ impl BankingStage { bank_utils::find_and_send_votes(sanitized_txs, &tx_results, Some(gossip_vote_sender)); if let Some(transaction_status_sender) = transaction_status_sender { - let txs = batch.sanitized_transactions().to_vec(); - let post_balances = bank.collect_balances(batch); - let post_token_balances = collect_token_balances(bank, batch, &mut mint_decimals); + let txs = filtered_batch.sanitized_transactions().to_vec(); + let post_balances = bank.collect_balances(&filtered_batch); + let post_token_balances = + collect_token_balances(bank, &filtered_batch, &mut mint_decimals); transaction_status_sender.send_transaction_status_batch( bank.clone(), txs, @@ -906,6 +924,7 @@ impl BankingStage { chunk_offset: usize, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, + banking_stage_stats: &BankingStageStats, ) -> (Result, Vec) { let mut lock_time = Measure::start("lock_time"); // Once accounts are locked, other threads cannot encode transactions that will modify the @@ -919,6 +938,7 @@ impl BankingStage { &batch, transaction_status_sender, gossip_vote_sender, + banking_stage_stats, ); retryable_txs.iter_mut().for_each(|x| *x += chunk_offset); @@ -949,6 +969,7 @@ impl BankingStage { poh: &TransactionRecorder, transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, + banking_stage_stats: &BankingStageStats, ) -> (usize, Vec) { let mut chunk_start = 0; let mut unprocessed_txs = vec![]; @@ -965,6 +986,7 @@ impl BankingStage { chunk_start, transaction_status_sender.clone(), gossip_vote_sender, + banking_stage_stats, ); trace!("process_transactions result: {:?}", result); @@ -1189,6 +1211,7 @@ impl BankingStage { poh, transaction_status_sender, gossip_vote_sender, + banking_stage_stats, ); process_tx_time.stop(); let unprocessed_tx_count = unprocessed_tx_indexes.len(); @@ -2230,6 +2253,7 @@ mod tests { poh_recorder.lock().unwrap().set_working_bank(working_bank); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let banking_stage_stats = BankingStageStats::default(); BankingStage::process_and_record_transactions( &bank, &transactions, @@ -2237,6 +2261,7 @@ mod tests { 0, None, &gossip_vote_sender, + &banking_stage_stats, ) .0 .unwrap(); @@ -2276,6 +2301,7 @@ mod tests { 0, None, &gossip_vote_sender, + &banking_stage_stats, ) .0, Err(PohRecorderError::MaxHeightReached) @@ -2367,6 +2393,7 @@ mod tests { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let banking_stage_stats = BankingStageStats::default(); let (result, unprocessed) = BankingStage::process_and_record_transactions( &bank, &transactions, @@ -2374,6 +2401,7 @@ mod tests { 0, None, &gossip_vote_sender, + &banking_stage_stats, ); poh_recorder @@ -2474,6 +2502,7 @@ mod tests { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let banking_stage_stats = BankingStageStats::default(); let (processed_transactions_count, mut retryable_txs) = BankingStage::process_transactions( &bank, @@ -2482,6 +2511,7 @@ mod tests { &recorder, None, &gossip_vote_sender, + &banking_stage_stats, ); assert_eq!(processed_transactions_count, 0,); @@ -2575,6 +2605,7 @@ mod tests { let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + let banking_stage_stats = BankingStageStats::default(); let _ = BankingStage::process_and_record_transactions( &bank, &transactions, @@ -2585,6 +2616,7 @@ mod tests { enable_cpi_and_log_storage: false, }), &gossip_vote_sender, + &banking_stage_stats, ); transaction_status_service.join().unwrap(); diff --git a/core/src/lib.rs b/core/src/lib.rs index 8bf8e88291e22b..5fcea2d6200a9d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -60,6 +60,7 @@ pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; pub mod validator; pub mod verified_vote_packets; +pub mod vote_redundancy_checker; pub mod vote_simulator; pub mod vote_stake_tracker; pub mod voting_service; diff --git a/core/src/vote_redundancy_checker.rs b/core/src/vote_redundancy_checker.rs new file mode 100644 index 00000000000000..69ee032795568e --- /dev/null +++ b/core/src/vote_redundancy_checker.rs @@ -0,0 +1,182 @@ +use log::*; +use solana_runtime::{bank::Bank, transaction_batch::TransactionBatch}; +use solana_sdk::{pubkey::Pubkey, transaction::TransactionError}; +use solana_vote_program::{vote_instruction::VoteError, vote_state::Vote, vote_transaction}; +use std::borrow::Cow; + +pub fn check_redundant_votes<'a, 'b>( + bank: &'a Bank, + batch: &'b TransactionBatch, +) -> TransactionBatch<'a, 'b> { + let sanitized_txs = batch.sanitized_transactions(); + let check_results = sanitized_txs + .iter() + .zip(batch.lock_results().to_vec()) + .map(|(tx, lock_res)| match lock_res { + Ok(()) => { + if let Some((vote_account_pubkey, vote, _vote_switch_to_slot_hash)) = + vote_transaction::parse_sanitized_vote_transaction(tx) + { + debug!( + "tx {:?} parsed into vote {:?}, vote account key {}", + tx, vote, vote_account_pubkey + ); + inc_new_counter_info!("bank-process_vote_transactions", 1); + + if is_redundant_by_vote_state(bank, &vote_account_pubkey, &vote) { + inc_new_counter_info!("bank-process_redundant_vote_transactions", 1); + Err(TransactionError::AlreadyProcessed) + } else { + Ok(()) + } + } else { + Ok(()) + } + } + Err(e) => Err(e), + }) + .collect(); + TransactionBatch::new(check_results, bank, Cow::Borrowed(sanitized_txs)) +} + +fn is_redundant_by_vote_state(bank: &Bank, vote_account_pubkey: &Pubkey, vote: &Vote) -> bool { + // ignore vote without slot, or slot '0' (during startup) + let last_slot = match vote.slots.last() { + None => { + debug!( + "Vote has no slots, skip checking redundancy for vote {:?}", + vote + ); + return false; + } + Some(slot) => { + if 0 == *slot { + debug!("Vote slot 0, skip checking redundancy for vote {:?}", vote); + return false; + } + slot + } + }; + + let vote_account = match bank.get_vote_account(vote_account_pubkey) { + None => { + warn!( + "Vote account {} does not exist, skip checking redundancy for vote {:?}", + vote_account_pubkey, vote + ); + return false; + } + Some((_stake, vote_account)) => vote_account, + }; + + let vote_state = vote_account.vote_state(); + let vote_state = match vote_state.as_ref() { + Err(_) => { + warn!( + "Vote account {} is unreachable, skip checking redundancy for vote {:?}", + vote_account_pubkey, vote + ); + return false; + } + Ok(vote_state) => vote_state, + }; + + match vote_state.check_slots_are_valid(vote, &[(*last_slot, vote.hash)]) { + Err(VoteError::VoteTooOld) => { + debug!( + "Vote {:?} by vote account {} is redundant", + vote, vote_account_pubkey + ); + true + } + _ => false, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::vote_simulator::VoteSimulator; + use solana_sdk::{genesis_config::create_genesis_config, hash::Hash}; + use std::{collections::HashMap, sync::Arc}; + use trees::notation::tr; + + #[test] + fn test_empty_vote_is_not_checked() { + let (genesis_config, _mint_keypair) = create_genesis_config(1); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let empty_vote = Vote::new(vec![], Hash::default()); + let vote_account_pubkey = Pubkey::new_unique(); + + // first check, should pass - not redundant + assert!(!is_redundant_by_vote_state( + &bank, + &vote_account_pubkey, + &empty_vote + )); + } + + #[test] + fn test_slot0_vote_is_not_checked() { + let (genesis_config, _mint_keypair) = create_genesis_config(1); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let empty_vote = Vote::new(vec![0], Hash::default()); + let vote_account_pubkey = Pubkey::new_unique(); + + // first check, should pass - not redundant + assert!(!is_redundant_by_vote_state( + &bank, + &vote_account_pubkey, + &empty_vote + )); + } + + #[test] + fn test_vote_redundancy() { + // Init state + let mut vote_simulator = VoteSimulator::new(1); + let my_node_pubkey = vote_simulator.node_pubkeys[0]; + let my_vote_pubkey = vote_simulator.vote_pubkeys[0]; + + // Create the tree of banks in a BankForks object + let forks = tr(0) / (tr(1) / (tr(2) / (tr(3)))); + + // Setup votes for slot 0 and 1 + { + let mut cluster_votes = HashMap::new(); + let votes = vec![0, 1]; + cluster_votes.insert(my_node_pubkey, votes); + vote_simulator.fill_bank_forks(forks, &cluster_votes, true); + } + + let bank1 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(1) + .unwrap() + .clone(); + let vote1 = Vote::new(vec![1], bank1.hash()); + + let bank2 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(2) + .unwrap() + .clone(); + let vote2 = Vote::new(vec![2], bank2.hash()); + + let bank3 = vote_simulator + .bank_forks + .read() + .unwrap() + .get(3) + .unwrap() + .clone(); + + // for bank3, vote for bank1 is redundant, vote for bank2 is not + assert!(is_redundant_by_vote_state(&bank3, &my_vote_pubkey, &vote1)); + assert!(!is_redundant_by_vote_state(&bank3, &my_vote_pubkey, &vote2)); + } +} diff --git a/programs/vote/src/vote_state/mod.rs b/programs/vote/src/vote_state/mod.rs index 70e59c3d28de7d..38998b780b6cde 100644 --- a/programs/vote/src/vote_state/mod.rs +++ b/programs/vote/src/vote_state/mod.rs @@ -296,7 +296,7 @@ impl VoteState { } } - fn check_slots_are_valid( + pub fn check_slots_are_valid( &self, vote: &Vote, slot_hashes: &[(Slot, Hash)],