From 8fc096188a1dc71677306767d80272e2df1f2c47 Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Thu, 15 Aug 2024 22:51:32 +0800 Subject: [PATCH 1/2] fix: ensure vote packets can be retried (#2605) (cherry picked from commit ecb44d7bd7bcfcbf1f342eb7eaf2728e7b6eefc5) # Conflicts: # core/src/banking_stage/latest_unprocessed_votes.rs --- .../banking_stage/latest_unprocessed_votes.rs | 155 +++++++++++++++--- .../unprocessed_transaction_storage.rs | 86 ++++++++-- 2 files changed, 202 insertions(+), 39 deletions(-) diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index a62e5bf9b3e..64389be26fb 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -14,6 +14,7 @@ use { }, solana_vote_program::vote_instruction::VoteInstruction, std::{ + cmp, collections::HashMap, ops::DerefMut, sync::{Arc, RwLock}, @@ -168,12 +169,13 @@ impl LatestUnprocessedVotes { pub(crate) fn insert_batch( &self, votes: impl Iterator, + should_replenish_taken_votes: bool, ) -> VoteBatchInsertionMetrics { let mut num_dropped_gossip = 0; let mut num_dropped_tpu = 0; for vote in votes { - if let Some(vote) = self.update_latest_vote(vote) { + if let Some(vote) = self.update_latest_vote(vote, should_replenish_taken_votes) { match vote.vote_source { VoteSource::Gossip => num_dropped_gossip += 1, VoteSource::Tpu => num_dropped_tpu += 1, @@ -201,10 +203,12 @@ impl LatestUnprocessedVotes { pub fn update_latest_vote( &self, vote: LatestValidatorVotePacket, + should_replenish_taken_votes: bool, ) -> Option { let pubkey = vote.pubkey(); let slot = vote.slot(); let timestamp = vote.timestamp(); +<<<<<<< HEAD if let Some(latest_vote) = self.get_entry(pubkey) { let (latest_slot, latest_timestamp) = latest_vote .read() @@ -214,10 +218,38 @@ impl LatestUnprocessedVotes { // We directly compare as options to prioritize votes for same slot with timestamp as // Some > None if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) { +======= + + // Allow votes for later slots or the same slot with later timestamp (refreshed votes) + // We directly compare as options to prioritize votes for same slot with timestamp as + // Some > None + let allow_update = |latest_vote: &LatestValidatorVotePacket| -> bool { + match slot.cmp(&latest_vote.slot()) { + cmp::Ordering::Less => return false, + cmp::Ordering::Greater => return true, + cmp::Ordering::Equal => {} + }; + + // Slots are equal, now check timestamp + match timestamp.cmp(&latest_vote.timestamp()) { + cmp::Ordering::Less => return false, + cmp::Ordering::Greater => return true, + cmp::Ordering::Equal => {} + }; + + // Timestamps are equal, lastly check if vote was taken previously + // and should be replenished + should_replenish_taken_votes && latest_vote.is_vote_taken() + }; + + let with_latest_vote = |latest_vote: &RwLock, + vote: LatestValidatorVotePacket| + -> Option { + let should_try_update = allow_update(&latest_vote.read().unwrap()); + if should_try_update { +>>>>>>> ecb44d7bd7 (fix: ensure vote packets can be retried (#2605)) let mut latest_vote = latest_vote.write().unwrap(); - let latest_slot = latest_vote.slot(); - let latest_timestamp = latest_vote.timestamp(); - if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) { + if allow_update(&latest_vote) { let old_vote = std::mem::replace(latest_vote.deref_mut(), vote); if old_vote.is_vote_taken() { return None; @@ -521,10 +553,10 @@ mod tests { ); assert!(latest_unprocessed_votes - .update_latest_vote(vote_a) + .update_latest_vote(vote_a, false /* should replenish */) .is_none()); assert!(latest_unprocessed_votes - .update_latest_vote(vote_b) + .update_latest_vote(vote_b, false /* should replenish */) .is_none()); assert_eq!(2, latest_unprocessed_votes.len()); @@ -554,7 +586,7 @@ mod tests { assert_eq!( 1, latest_unprocessed_votes - .update_latest_vote(vote_a) + .update_latest_vote(vote_a, false /* should replenish */) .unwrap() .slot ); @@ -562,7 +594,7 @@ mod tests { assert_eq!( 6, latest_unprocessed_votes - .update_latest_vote(vote_b) + .update_latest_vote(vote_b, false /* should replenish */) .unwrap() .slot ); @@ -582,8 +614,8 @@ mod tests { &keypair_b, None, ); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); + latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( @@ -612,8 +644,8 @@ mod tests { &keypair_b, Some(2), ); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); + latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( @@ -638,8 +670,8 @@ mod tests { &keypair_b, Some(6), ); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); + latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( @@ -664,8 +696,10 @@ mod tests { &keypair_b, Some(3), ); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); + latest_unprocessed_votes + .update_latest_vote(vote_a.clone(), false /* should replenish */); + latest_unprocessed_votes + .update_latest_vote(vote_b.clone(), false /* should replenish */); assert_eq!(2, latest_unprocessed_votes.len()); assert_eq!( @@ -676,9 +710,80 @@ mod tests { Some(6), latest_unprocessed_votes.get_latest_timestamp(keypair_b.node_keypair.pubkey()) ); + + // Drain all latest votes + for packet in latest_unprocessed_votes + .latest_votes_per_pubkey + .read() + .unwrap() + .values() + { + packet.write().unwrap().take_vote().inspect(|_vote| { + latest_unprocessed_votes + .num_unprocessed_votes + .fetch_sub(1, Ordering::Relaxed); + }); + } + assert_eq!(0, latest_unprocessed_votes.len()); + + // Same votes with same timestamps should not replenish without flag + latest_unprocessed_votes + .update_latest_vote(vote_a.clone(), false /* should replenish */); + latest_unprocessed_votes + .update_latest_vote(vote_b.clone(), false /* should replenish */); + assert_eq!(0, latest_unprocessed_votes.len()); + + // Same votes with same timestamps should replenish with the flag + latest_unprocessed_votes.update_latest_vote(vote_a, true /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, true /* should replenish */); + assert_eq!(0, latest_unprocessed_votes.len()); + } + + #[test] +<<<<<<< HEAD +======= + fn test_update_latest_vote_race() { + // There was a race condition in updating the same pubkey in the hashmap + // when the entry does not initially exist. + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + + const NUM_VOTES: usize = 100; + let keypairs = Arc::new( + (0..NUM_VOTES) + .map(|_| ValidatorVoteKeypairs::new_rand()) + .collect_vec(), + ); + + // Insert votes in parallel + let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes, + keypairs: &Arc>, + i: usize| { + let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None); + latest_unprocessed_votes.update_latest_vote(vote, false /* should replenish */); + }; + + let hdl = Builder::new() + .spawn({ + let latest_unprocessed_votes = latest_unprocessed_votes.clone(); + let keypairs = keypairs.clone(); + move || { + for i in 0..NUM_VOTES { + insert_vote(&latest_unprocessed_votes, &keypairs, i); + } + } + }) + .unwrap(); + + for i in 0..NUM_VOTES { + insert_vote(&latest_unprocessed_votes, &keypairs, i); + } + + hdl.join().unwrap(); + assert_eq!(NUM_VOTES, latest_unprocessed_votes.len()); } #[test] +>>>>>>> ecb44d7bd7 (fix: ensure vote packets can be retried (#2605)) fn test_simulate_threads() { let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone(); @@ -700,7 +805,8 @@ mod tests { &keypairs[rng.gen_range(0..10)], None, ); - latest_unprocessed_votes.update_latest_vote(vote); + latest_unprocessed_votes + .update_latest_vote(vote, false /* should replenish */); } }) .unwrap(); @@ -715,7 +821,8 @@ mod tests { &keypairs_tpu[rng.gen_range(0..10)], None, ); - latest_unprocessed_votes_tpu.update_latest_vote(vote); + latest_unprocessed_votes_tpu + .update_latest_vote(vote, false /* should replenish */); if i % 214 == 0 { // Simulate draining and processing packets let latest_votes_per_pubkey = latest_unprocessed_votes_tpu @@ -748,8 +855,8 @@ mod tests { let vote_a = from_slots(vec![(1, 1)], VoteSource::Gossip, &keypair_a, None); let vote_b = from_slots(vec![(2, 1)], VoteSource::Tpu, &keypair_b, None); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); + latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); // Don't forward 0 stake accounts let forwarded = latest_unprocessed_votes @@ -843,10 +950,10 @@ mod tests { let vote_c = from_slots(vec![(3, 1)], VoteSource::Tpu, &keypair_c, None); let vote_d = from_slots(vec![(4, 1)], VoteSource::Gossip, &keypair_d, None); - latest_unprocessed_votes.update_latest_vote(vote_a); - latest_unprocessed_votes.update_latest_vote(vote_b); - latest_unprocessed_votes.update_latest_vote(vote_c); - latest_unprocessed_votes.update_latest_vote(vote_d); + latest_unprocessed_votes.update_latest_vote(vote_a, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_b, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_c, false /* should replenish */); + latest_unprocessed_votes.update_latest_vote(vote_d, false /* should replenish */); assert_eq!(4, latest_unprocessed_votes.len()); latest_unprocessed_votes.clear_forwarded_packets(); diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index f8d99c77900..60b4c83c982 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -421,18 +421,18 @@ impl VoteStorage { &mut self, deserialized_packets: Vec, ) -> VoteBatchInsertionMetrics { - self.latest_unprocessed_votes - .insert_batch( - deserialized_packets - .into_iter() - .filter_map(|deserialized_packet| { - LatestValidatorVotePacket::new_from_immutable( - Arc::new(deserialized_packet), - self.vote_source, - ) - .ok() - }), - ) + self.latest_unprocessed_votes.insert_batch( + deserialized_packets + .into_iter() + .filter_map(|deserialized_packet| { + LatestValidatorVotePacket::new_from_immutable( + Arc::new(deserialized_packet), + self.vote_source, + ) + .ok() + }), + false, // should_replenish_taken_votes + ) } fn filter_forwardable_packets_and_add_batches( @@ -503,12 +503,15 @@ impl VoteStorage { ) .ok() }), + true, // should_replenish_taken_votes ); } else { - self.latest_unprocessed_votes - .insert_batch(vote_packets.into_iter().filter_map(|packet| { + self.latest_unprocessed_votes.insert_batch( + vote_packets.into_iter().filter_map(|packet| { LatestValidatorVotePacket::new_from_immutable(packet, self.vote_source).ok() - })); + }), + true, // should_replenish_taken_votes + ); } } @@ -968,6 +971,7 @@ mod tests { super::*, solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, solana_perf::packet::{Packet, PacketFlags}, + solana_runtime::genesis_utils, solana_sdk::{ hash::Hash, signature::{Keypair, Signer}, @@ -1236,6 +1240,58 @@ mod tests { Ok(()) } + #[test] + fn test_process_packets_retryable_indexes_reinserted() -> Result<(), Box> { + let node_keypair = Keypair::new(); + let genesis_config = + genesis_utils::create_genesis_config_with_leader(100, &node_keypair.pubkey(), 200) + .genesis_config; + let (bank, _bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); + let vote_keypair = Keypair::new(); + let mut vote = Packet::from_data( + None, + new_tower_sync_transaction( + TowerSync::default(), + Hash::new_unique(), + &node_keypair, + &vote_keypair, + &vote_keypair, + None, + ), + )?; + vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); + + let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( + Arc::new(LatestUnprocessedVotes::new()), + VoteSource::Tpu, + ); + + transaction_storage.insert_batch(vec![ImmutableDeserializedPacket::new(vote.clone())?]); + assert_eq!(1, transaction_storage.len()); + + // When processing packets, return all packets as retryable so that they + // are reinserted into storage + let _ = transaction_storage.process_packets( + bank.clone(), + &BankingStageStats::default(), + &mut LeaderSlotMetricsTracker::new(0), + |packets, _payload| { + // Return all packets indexes as retryable + Some( + packets + .iter() + .enumerate() + .map(|(index, _packet)| index) + .collect_vec(), + ) + }, + ); + + // All packets should remain in the transaction storage + assert_eq!(1, transaction_storage.len()); + Ok(()) + } + #[test] fn test_prepare_packets_to_forward() { solana_logger::setup(); From 3a725db2c172ab475e039586f697383893ee34ad Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Thu, 15 Aug 2024 22:57:31 +0000 Subject: [PATCH 2/2] fix conflicts --- .../banking_stage/latest_unprocessed_votes.rs | 85 ++++--------------- .../unprocessed_transaction_storage.rs | 4 +- 2 files changed, 19 insertions(+), 70 deletions(-) diff --git a/core/src/banking_stage/latest_unprocessed_votes.rs b/core/src/banking_stage/latest_unprocessed_votes.rs index 64389be26fb..b4cd30c0502 100644 --- a/core/src/banking_stage/latest_unprocessed_votes.rs +++ b/core/src/banking_stage/latest_unprocessed_votes.rs @@ -208,17 +208,6 @@ impl LatestUnprocessedVotes { let pubkey = vote.pubkey(); let slot = vote.slot(); let timestamp = vote.timestamp(); -<<<<<<< HEAD - if let Some(latest_vote) = self.get_entry(pubkey) { - let (latest_slot, latest_timestamp) = latest_vote - .read() - .map(|vote| (vote.slot(), vote.timestamp())) - .unwrap(); - // Allow votes for later slots or the same slot with later timestamp (refreshed votes) - // We directly compare as options to prioritize votes for same slot with timestamp as - // Some > None - if slot > latest_slot || ((slot == latest_slot) && (timestamp > latest_timestamp)) { -======= // Allow votes for later slots or the same slot with later timestamp (refreshed votes) // We directly compare as options to prioritize votes for same slot with timestamp as @@ -247,7 +236,6 @@ impl LatestUnprocessedVotes { -> Option { let should_try_update = allow_update(&latest_vote.read().unwrap()); if should_try_update { ->>>>>>> ecb44d7bd7 (fix: ensure vote packets can be retried (#2605)) let mut latest_vote = latest_vote.write().unwrap(); if allow_update(&latest_vote) { let old_vote = std::mem::replace(latest_vote.deref_mut(), vote); @@ -258,14 +246,23 @@ impl LatestUnprocessedVotes { } } } - return Some(vote); - } + Some(vote) + }; - // Should have low lock contention because this is only hit on the first few blocks of startup - // and when a new vote account starts voting. - let mut latest_votes_per_pubkey = self.latest_votes_per_pubkey.write().unwrap(); - latest_votes_per_pubkey.insert(pubkey, Arc::new(RwLock::new(vote))); - None + if let Some(latest_vote) = self.get_entry(pubkey) { + with_latest_vote(&latest_vote, vote) + } else { + // Grab write-lock to insert new vote. + match self.latest_votes_per_pubkey.write().unwrap().entry(pubkey) { + std::collections::hash_map::Entry::Occupied(entry) => { + with_latest_vote(entry.get(), vote) + } + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(Arc::new(RwLock::new(vote))); + None + } + } + } } #[cfg(test)] @@ -718,11 +715,7 @@ mod tests { .unwrap() .values() { - packet.write().unwrap().take_vote().inspect(|_vote| { - latest_unprocessed_votes - .num_unprocessed_votes - .fetch_sub(1, Ordering::Relaxed); - }); + let _ = packet.write().unwrap().take_vote(); } assert_eq!(0, latest_unprocessed_votes.len()); @@ -740,50 +733,6 @@ mod tests { } #[test] -<<<<<<< HEAD -======= - fn test_update_latest_vote_race() { - // There was a race condition in updating the same pubkey in the hashmap - // when the entry does not initially exist. - let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); - - const NUM_VOTES: usize = 100; - let keypairs = Arc::new( - (0..NUM_VOTES) - .map(|_| ValidatorVoteKeypairs::new_rand()) - .collect_vec(), - ); - - // Insert votes in parallel - let insert_vote = |latest_unprocessed_votes: &LatestUnprocessedVotes, - keypairs: &Arc>, - i: usize| { - let vote = from_slots(vec![(i as u64, 1)], VoteSource::Gossip, &keypairs[i], None); - latest_unprocessed_votes.update_latest_vote(vote, false /* should replenish */); - }; - - let hdl = Builder::new() - .spawn({ - let latest_unprocessed_votes = latest_unprocessed_votes.clone(); - let keypairs = keypairs.clone(); - move || { - for i in 0..NUM_VOTES { - insert_vote(&latest_unprocessed_votes, &keypairs, i); - } - } - }) - .unwrap(); - - for i in 0..NUM_VOTES { - insert_vote(&latest_unprocessed_votes, &keypairs, i); - } - - hdl.join().unwrap(); - assert_eq!(NUM_VOTES, latest_unprocessed_votes.len()); - } - - #[test] ->>>>>>> ecb44d7bd7 (fix: ensure vote packets can be retried (#2605)) fn test_simulate_threads() { let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); let latest_unprocessed_votes_tpu = latest_unprocessed_votes.clone(); diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index 60b4c83c982..a78c668ad80 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -1250,8 +1250,8 @@ mod tests { let vote_keypair = Keypair::new(); let mut vote = Packet::from_data( None, - new_tower_sync_transaction( - TowerSync::default(), + new_vote_state_update_transaction( + VoteStateUpdate::default(), Hash::new_unique(), &node_keypair, &vote_keypair,