diff --git a/votor/src/certificate_pool.rs b/votor/src/certificate_pool.rs index f6267abfa5..168ebbcc31 100644 --- a/votor/src/certificate_pool.rs +++ b/votor/src/certificate_pool.rs @@ -87,7 +87,30 @@ impl From for AddVoteError { } } -#[derive(Default)] +fn get_key_and_stakes( + epoch_schedule: &EpochSchedule, + epoch_stakes_map: &HashMap, + slot: Slot, + rank: u16, +) -> Result<(Pubkey, Stake, Stake), AddVoteError> { + let epoch = epoch_schedule.get_epoch(slot); + let epoch_stakes = epoch_stakes_map + .get(&epoch) + .ok_or(AddVoteError::EpochStakesNotFound(epoch))?; + let Some((vote_key, _)) = epoch_stakes + .bls_pubkey_to_rank_map() + .get_pubkey(rank as usize) + else { + return Err(AddVoteError::InvalidRank(rank)); + }; + let stake = epoch_stakes.vote_account_stake(vote_key); + if stake == 0 { + // Since we have a valid rank, this should never happen, there is no rank for zero stake. + panic!("Validator stake is zero for pubkey: {vote_key}"); + } + Ok((*vote_key, stake, epoch_stakes.total_stake())) +} + pub struct CertificatePool { my_pubkey: Pubkey, // Vote pools to do bean counting for votes. @@ -100,19 +123,11 @@ pub struct CertificatePool { pub parent_ready_tracker: ParentReadyTracker, /// Highest block that has a NotarizeFallback certificate, for use in producing our leader window highest_notarized_fallback: Option<(Slot, Hash)>, - /// Highest slot that has a Finalized variant certificate, for use in notifying RPC + /// Highest slot that has a Finalized variant certificate highest_finalized_slot: Option, /// Highest slot that has Finalize+Notarize or FinalizeFast, for use in standstill /// Also add a bool to indicate whether this slot has FinalizeFast certificate highest_finalized_with_notarize: Option<(Slot, bool)>, - // Cached epoch_schedule - epoch_schedule: EpochSchedule, - // Cached epoch_stakes_map - epoch_stakes_map: Arc>, - // The current root, no need to save anything before this slot. - root: Slot, - // The epoch of current root. - root_epoch: Epoch, /// The certificate sender, if set, newly created certificates will be sent here certificate_sender: Option>, /// Stats for the certificate pool @@ -132,40 +147,17 @@ impl CertificatePool { let root_block = (bank.slot(), bank.block_id().unwrap_or_default()); let parent_ready_tracker = ParentReadyTracker::new(my_pubkey, root_block); - let mut pool = Self { + Self { my_pubkey, vote_pools: BTreeMap::new(), completed_certificates: BTreeMap::new(), highest_notarized_fallback: None, highest_finalized_slot: None, highest_finalized_with_notarize: None, - epoch_schedule: EpochSchedule::default(), - epoch_stakes_map: Arc::new(HashMap::new()), - root: bank.slot(), - root_epoch: Epoch::default(), certificate_sender, parent_ready_tracker, stats: CertificatePoolStats::new(), slot_stake_counters_map: BTreeMap::new(), - }; - - // Update the epoch_stakes_map and root - pool.update_epoch_stakes_map(bank); - pool.root = bank.slot(); - - pool - } - - pub fn root(&self) -> Slot { - self.root - } - - fn update_epoch_stakes_map(&mut self, bank: &Bank) { - let epoch = bank.epoch(); - if self.epoch_stakes_map.is_empty() || epoch > self.root_epoch { - self.epoch_stakes_map = Arc::new(bank.epoch_stakes_map().clone()); - self.root_epoch = epoch; - self.epoch_schedule = bank.epoch_schedule().clone(); } } @@ -380,30 +372,6 @@ impl CertificatePool { } } - fn get_key_and_stakes( - &self, - slot: Slot, - rank: u16, - ) -> Result<(Pubkey, Stake, Stake), AddVoteError> { - let epoch = self.epoch_schedule.get_epoch(slot); - let epoch_stakes = self - .epoch_stakes_map - .get(&epoch) - .ok_or(AddVoteError::EpochStakesNotFound(epoch))?; - let Some((vote_key, _)) = epoch_stakes - .bls_pubkey_to_rank_map() - .get_pubkey(rank as usize) - else { - return Err(AddVoteError::InvalidRank(rank)); - }; - let stake = epoch_stakes.vote_account_stake(vote_key); - if stake == 0 { - // Since we have a valid rank, this should never happen, there is no rank for zero stake. - panic!("Validator stake is zero for pubkey: {vote_key}"); - } - Ok((*vote_key, stake, epoch_stakes.total_stake())) - } - /// Adds the new vote the the certificate pool. If a new certificate is created /// as a result of this, send it via the `self.certificate_sender` /// @@ -414,17 +382,25 @@ impl CertificatePool { /// return the slot pub fn add_message( &mut self, + epoch_schedule: &EpochSchedule, + epoch_stakes_map: &HashMap, + root_slot: Slot, my_vote_pubkey: &Pubkey, message: &BLSMessage, events: &mut Vec, ) -> Result<(Option, Vec>), AddVoteError> { let current_highest_finalized_slot = self.highest_finalized_slot; let new_certficates_to_send = match message { - BLSMessage::Vote(vote_message) => { - self.add_vote(my_vote_pubkey, vote_message, events)? - } + BLSMessage::Vote(vote_message) => self.add_vote( + epoch_schedule, + epoch_stakes_map, + root_slot, + my_vote_pubkey, + vote_message, + events, + )?, BLSMessage::Certificate(certificate_message) => { - self.add_certificate(certificate_message, events)? + self.add_certificate(root_slot, certificate_message, events)? } }; // If we have a new highest finalized slot, return it @@ -438,15 +414,18 @@ impl CertificatePool { fn add_vote( &mut self, + epoch_schedule: &EpochSchedule, + epoch_stakes_map: &HashMap, + root_slot: Slot, my_vote_pubkey: &Pubkey, vote_message: &VoteMessage, events: &mut Vec, ) -> Result>, AddVoteError> { let vote = &vote_message.vote; let rank = vote_message.rank; - let slot = vote.slot(); + let vote_slot = vote.slot(); let (validator_vote_key, validator_stake, total_stake) = - self.get_key_and_stakes(slot, rank)?; + get_key_and_stakes(epoch_schedule, epoch_stakes_map, vote_slot, rank)?; // Since we have a valid rank, this should never happen, there is no rank for zero stake. assert_ne!( @@ -455,7 +434,7 @@ impl CertificatePool { ); self.stats.incoming_votes = self.stats.incoming_votes.saturating_add(1); - if slot < self.root { + if vote_slot < root_slot { self.stats.out_of_range_votes = self.stats.out_of_range_votes.saturating_add(1); return Err(AddVoteError::UnrootedSlot); } @@ -467,18 +446,18 @@ impl CertificatePool { }); let vote_type = VoteType::get_type(vote); if let Some(conflicting_type) = - self.has_conflicting_vote(slot, vote_type, &validator_vote_key, &block_id) + self.has_conflicting_vote(vote_slot, vote_type, &validator_vote_key, &block_id) { self.stats.conflicting_votes = self.stats.conflicting_votes.saturating_add(1); return Err(AddVoteError::ConflictingVoteType( vote_type, conflicting_type, - slot, + vote_slot, validator_vote_key, )); } match self.update_vote_pool( - slot, + vote_slot, vote_type, block_id, vote_message, @@ -493,7 +472,7 @@ impl CertificatePool { Some(entry_stake) => { let fallback_vote_counters = self .slot_stake_counters_map - .entry(slot) + .entry(vote_slot) .or_insert_with(|| SlotStakeCounters::new(total_stake)); fallback_vote_counters.add_vote( vote, @@ -511,12 +490,13 @@ impl CertificatePool { fn add_certificate( &mut self, + root_slot: Slot, certificate_message: &CertificateMessage, events: &mut Vec, ) -> Result>, AddVoteError> { let certificate_id = certificate_message.certificate; self.stats.incoming_certs = self.stats.incoming_certs.saturating_add(1); - if certificate_id.slot() < self.root { + if certificate_id.slot() < root_slot { self.stats.out_of_range_certs = self.stats.out_of_range_certs.saturating_add(1); return Err(AddVoteError::UnrootedSlot); } @@ -668,9 +648,7 @@ impl CertificatePool { } /// Cleanup any old slots from the certificate pool - pub fn handle_new_root(&mut self, bank: Arc) { - let new_root = bank.slot(); - self.root = new_root; + pub fn prune_old_state(&mut self, root_slot: Slot) { // `completed_certificates`` now only contains entries >= `slot` self.completed_certificates .retain(|cert_id, _| match cert_id { @@ -678,12 +656,11 @@ impl CertificatePool { | Certificate::FinalizeFast(s, _) | Certificate::Notarize(s, _) | Certificate::NotarizeFallback(s, _) - | Certificate::Skip(s) => *s >= self.root, + | Certificate::Skip(s) => s >= &root_slot, }); - self.vote_pools = self.vote_pools.split_off(&(new_root, VoteType::Finalize)); - self.slot_stake_counters_map = self.slot_stake_counters_map.split_off(&new_root); - self.parent_ready_tracker.set_root(new_root); - self.update_epoch_stakes_map(&bank); + self.vote_pools = self.vote_pools.split_off(&(root_slot, VoteType::Finalize)); + self.slot_stake_counters_map = self.slot_stake_counters_map.split_off(&root_slot); + self.parent_ready_tracker.set_root(root_slot); } /// Updates the pubkey used for logging purposes only. @@ -699,9 +676,8 @@ impl CertificatePool { } pub fn get_certs_for_standstill(&self) -> Vec> { - let (highest_finalized_with_notarize_slot, has_fast_finalize) = self - .highest_finalized_with_notarize - .unwrap_or((self.root, false)); + let (highest_finalized_with_notarize_slot, has_fast_finalize) = + self.highest_finalized_with_notarize.unwrap_or((0, true)); self.completed_certificates .iter() .filter_map(|(cert_id, cert)| { @@ -738,12 +714,10 @@ pub fn load_from_blockstore( certificate_sender: Option>, events: &mut Vec, ) -> CertificatePool { + let root_slot = root_bank.slot(); let mut cert_pool = CertificatePool::new_from_root_bank(*my_pubkey, root_bank, certificate_sender); - for (slot, slot_cert) in blockstore - .slot_certificates_iterator(root_bank.slot()) - .unwrap() - { + for (slot, slot_cert) in blockstore.slot_certificates_iterator(root_slot).unwrap() { let certs = slot_cert .notarize_fallback_certificates .into_iter() @@ -816,7 +790,11 @@ mod tests { BankForks::new_rw_arc(bank0) } - fn create_keypairs_and_pool() -> (Vec, CertificatePool) { + fn create_initial_state() -> ( + Vec, + CertificatePool, + Arc>, + ) { // Create 10 node validatorvotekeypairs vec let validator_keypairs = (0..10) .map(|_| ValidatorVoteKeypairs::new_rand()) @@ -825,19 +803,23 @@ mod tests { let root_bank = bank_forks.read().unwrap().root_bank(); ( validator_keypairs, - CertificatePool::new_from_root_bank(Pubkey::new_unique(), &root_bank.clone(), None), + CertificatePool::new_from_root_bank(Pubkey::new_unique(), &root_bank, None), + bank_forks, ) } - #[cfg(test)] fn add_certificate( pool: &mut CertificatePool, + bank: &Bank, validator_keypairs: &[ValidatorVoteKeypairs], vote: Vote, ) { for rank in 0..6 { assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(validator_keypairs, &vote, rank), &mut vec![] @@ -846,6 +828,9 @@ mod tests { } assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(validator_keypairs, &vote, 6), &mut vec![] @@ -862,6 +847,7 @@ mod tests { fn add_skip_vote_range( pool: &mut CertificatePool, + bank: &Bank, start: Slot, end: Slot, keypairs: &[ValidatorVoteKeypairs], @@ -871,6 +857,9 @@ mod tests { let vote = Vote::new_skip_vote(slot); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(keypairs, &vote, rank), &mut vec![] @@ -881,7 +870,7 @@ mod tests { #[test] fn test_make_decision_leader_does_not_start_if_notarization_missing() { - let (_, pool) = create_keypairs_and_pool(); + let (_, pool, _) = create_initial_state(); // No notarization set, pool is default let parent_slot = 2; @@ -897,7 +886,7 @@ mod tests { #[test] fn test_make_decision_first_alpenglow_slot_edge_case_1() { - let (_, pool) = create_keypairs_and_pool(); + let (_, pool, _) = create_initial_state(); // If parent_slot == 0, you don't need a notarization certificate // Because leader_slot == parent_slot + 1, you don't need a skip certificate @@ -909,7 +898,7 @@ mod tests { #[test] fn test_make_decision_first_alpenglow_slot_edge_case_2() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // If parent_slot < first_alpenglow_slot, and parent_slot > 0 // no notarization certificate is required, but a skip @@ -926,6 +915,7 @@ mod tests { add_certificate( &mut pool, + &bank_forks.read().unwrap().root_bank(), &validator_keypairs, Vote::new_skip_vote(first_alpenglow_slot), ); @@ -935,7 +925,7 @@ mod tests { #[test] fn test_make_decision_first_alpenglow_slot_edge_case_3() { - let (_, pool) = create_keypairs_and_pool(); + let (_, pool, _) = create_initial_state(); // If parent_slot == first_alpenglow_slot, and // first_alpenglow_slot > 0, you need a notarization certificate let parent_slot = 2; @@ -950,7 +940,7 @@ mod tests { #[test] fn test_make_decision_first_alpenglow_slot_edge_case_4() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // If parent_slot < first_alpenglow_slot, and parent_slot == 0, // no notarization certificate is required, but a skip certificate will @@ -967,6 +957,7 @@ mod tests { add_certificate( &mut pool, + &bank_forks.read().unwrap().root_bank(), &validator_keypairs, Vote::new_skip_vote(first_alpenglow_slot), ); @@ -975,11 +966,16 @@ mod tests { #[test] fn test_make_decision_first_alpenglow_slot_edge_case_5() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // Valid skip certificate for 1-9 exists for slot in 1..=9 { - add_certificate(&mut pool, &validator_keypairs, Vote::new_skip_vote(slot)); + add_certificate( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + &validator_keypairs, + Vote::new_skip_vote(slot), + ); } // Parent slot is equal to 0, so no notarization certificate required @@ -991,11 +987,16 @@ mod tests { #[test] fn test_make_decision_first_alpenglow_slot_edge_case_6() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // Valid skip certificate for 1-9 exists for slot in 1..=9 { - add_certificate(&mut pool, &validator_keypairs, Vote::new_skip_vote(slot)); + add_certificate( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + &validator_keypairs, + Vote::new_skip_vote(slot), + ); } // Parent slot is less than first_alpenglow_slot, so no notarization certificate required let my_leader_slot = 10; @@ -1006,7 +1007,7 @@ mod tests { #[test] fn test_make_decision_leader_does_not_start_if_skip_certificate_missing() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, _) = create_initial_state(); let bank_forks = create_bank_forks(&validator_keypairs); let my_pubkey = validator_keypairs[0].vote_keypair.pubkey(); @@ -1019,6 +1020,7 @@ mod tests { // Notarize slot 5 add_certificate( &mut pool, + &bank_forks.read().unwrap().root_bank(), &validator_keypairs, Vote::new_notarization_vote(5, Hash::default()), ); @@ -1038,11 +1040,12 @@ mod tests { #[test] fn test_make_decision_leader_starts_when_no_skip_required() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // Notarize slot 5 add_certificate( &mut pool, + &bank_forks.read().unwrap().root_bank(), &validator_keypairs, Vote::new_notarization_vote(5, Hash::default()), ); @@ -1057,11 +1060,12 @@ mod tests { #[test] fn test_make_decision_leader_starts_if_notarized_and_skips_valid() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // Notarize slot 5 add_certificate( &mut pool, + &bank_forks.read().unwrap().root_bank(), &validator_keypairs, Vote::new_notarization_vote(5, Hash::default()), ); @@ -1069,7 +1073,12 @@ mod tests { // Valid skip certificate for 6-9 exists for slot in 6..=9 { - add_certificate(&mut pool, &validator_keypairs, Vote::new_skip_vote(slot)); + add_certificate( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + &validator_keypairs, + Vote::new_skip_vote(slot), + ); } let my_leader_slot = 10; @@ -1080,11 +1089,12 @@ mod tests { #[test] fn test_make_decision_leader_starts_if_skip_range_superset() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // Notarize slot 5 add_certificate( &mut pool, + &bank_forks.read().unwrap().root_bank(), &validator_keypairs, Vote::new_notarization_vote(5, Hash::default()), ); @@ -1096,6 +1106,7 @@ mod tests { for slot in 4..=9 { add_certificate( &mut pool, + &bank_forks.read().unwrap().root_bank(), &validator_keypairs, Vote::new_skip_fallback_vote(slot), ); @@ -1116,7 +1127,7 @@ mod tests { vote: Vote, expected_certificate_types: Vec, ) { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); let my_validator_ix = 5; let highest_slot_fn = match &vote { Vote::Finalize(_) => |pool: &CertificatePool| pool.highest_finalized_slot(), @@ -1125,8 +1136,12 @@ mod tests { Vote::Skip(_) => |pool: &CertificatePool| pool.highest_skip_slot(), Vote::SkipFallback(_) => |pool: &CertificatePool| pool.highest_skip_slot(), }; + let bank = bank_forks.read().unwrap().root_bank(); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, my_validator_ix), &mut vec![] @@ -1137,6 +1152,9 @@ mod tests { // Same key voting again shouldn't make a certificate assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, my_validator_ix), &mut vec![] @@ -1146,6 +1164,9 @@ mod tests { for rank in 0..4 { assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, rank), &mut vec![] @@ -1156,6 +1177,9 @@ mod tests { let new_validator_ix = 6; let (new_finalized_slot, certs_to_send) = pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, new_validator_ix), &mut vec![], @@ -1177,6 +1201,9 @@ mod tests { for cert in certs_to_send { let (new_finalized_slot, certs_to_send) = pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Certificate((*cert).clone()), &mut vec![], @@ -1202,7 +1229,7 @@ mod tests { )] #[test_case(CertificateType::Skip, Vote::new_skip_vote(8))] fn test_add_certificate_with_types(certificate_type: CertificateType, vote: Vote) { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); let certificate = Certificate::new(certificate_type, vote.slot(), vote.block_id().copied()); @@ -1211,10 +1238,18 @@ mod tests { signature: BLSSignature::default(), bitmap: BitVec::new(), }; + let bank = bank_forks.read().unwrap().root_bank(); let bls_message = BLSMessage::Certificate(certificate_message.clone()); // Add the certificate to the pool let (new_finalized_slot, certs_to_send) = pool - .add_message(&Pubkey::new_unique(), &bls_message, &mut vec![]) + .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), + &Pubkey::new_unique(), + &bls_message, + &mut vec![], + ) .unwrap(); // Because this is the first certificate of this type, it should be sent out. if certificate_type == CertificateType::Finalize @@ -1229,7 +1264,14 @@ mod tests { // Adding the cert again will not trigger another send let (new_finalized_slot, certs_to_send) = pool - .add_message(&Pubkey::new_unique(), &bls_message, &mut vec![]) + .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), + &Pubkey::new_unique(), + &bls_message, + &mut vec![], + ) .unwrap(); assert!(new_finalized_slot.is_none()); assert_eq!(certs_to_send, []); @@ -1238,6 +1280,9 @@ mod tests { for rank in 0..validator_keypairs.len() { let (_, certs_to_send) = pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, rank), &mut vec![], @@ -1251,9 +1296,13 @@ mod tests { #[test] fn test_add_vote_zero_stake() { - let (_, mut pool) = create_keypairs_and_pool(); + let (_, mut pool, bank_forks) = create_initial_state(); + let bank = bank_forks.read().unwrap().root_bank(); assert_eq!( pool.add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Vote(VoteMessage { vote: Vote::new_skip_vote(5), @@ -1278,17 +1327,26 @@ mod tests { #[test] fn test_consecutive_slots() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); - add_certificate(&mut pool, &validator_keypairs, Vote::new_skip_vote(15)); + add_certificate( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + &validator_keypairs, + Vote::new_skip_vote(15), + ); assert_eq!(pool.highest_skip_slot(), 15); + let bank = bank_forks.read().unwrap().root_bank(); for i in 0..validator_keypairs.len() { let slot = (i as u64).saturating_add(16); let vote = Vote::new_skip_vote(slot); // These should not extend the skip range assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, i), &mut vec![] @@ -1301,19 +1359,40 @@ mod tests { #[test] fn test_multi_skip_cert() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // We have 10 validators, 40% voted for (5, 15) for rank in 0..4 { - add_skip_vote_range(&mut pool, 5, 15, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 5, + 15, + &validator_keypairs, + rank, + ); } // 30% voted for (5, 8) for rank in 4..7 { - add_skip_vote_range(&mut pool, 5, 8, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 5, + 8, + &validator_keypairs, + rank, + ); } // The rest voted for (11, 15) for rank in 7..10 { - add_skip_vote_range(&mut pool, 11, 15, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 11, + 15, + &validator_keypairs, + rank, + ); } // Test slots from 5 to 15, [5, 8] and [11, 15] should be certified, the others aren't for slot in 5..9 { @@ -1329,35 +1408,67 @@ mod tests { #[test] fn test_add_multiple_votes() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // 10 validators, half vote for (5, 15), the other (20, 30) for rank in 0..5 { - add_skip_vote_range(&mut pool, 5, 15, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 5, + 15, + &validator_keypairs, + rank, + ); } for rank in 5..10 { - add_skip_vote_range(&mut pool, 20, 30, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 20, + 30, + &validator_keypairs, + rank, + ); } assert_eq!(pool.highest_skip_slot(), 0); // Now the first half vote for (5, 30) for rank in 0..5 { - add_skip_vote_range(&mut pool, 5, 30, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 5, + 30, + &validator_keypairs, + rank, + ); } assert_single_certificate_range(&pool, 20, 30); } #[test] fn test_add_multiple_disjoint_votes() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // 50% of the validators vote for (1, 10) for rank in 0..5 { - add_skip_vote_range(&mut pool, 1, 10, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 1, + 10, + &validator_keypairs, + rank, + ); } + let bank = bank_forks.read().unwrap().root_bank(); // 10% vote for skip 2 let vote = Vote::new_skip_vote(2); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, 6), &mut vec![] @@ -1370,6 +1481,9 @@ mod tests { let vote = Vote::new_skip_vote(4); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, 7), &mut vec![] @@ -1383,6 +1497,9 @@ mod tests { let vote = Vote::new_skip_vote(3); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, 8), &mut vec![] @@ -1392,7 +1509,14 @@ mod tests { assert_single_certificate_range(&pool, 2, 4); assert!(pool.skip_certified(3)); // Let the last 10% vote for (3, 10) now - add_skip_vote_range(&mut pool, 3, 10, &validator_keypairs, 8); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 3, + 10, + &validator_keypairs, + 8, + ); assert_eq!(pool.highest_skip_slot(), 10); assert_single_certificate_range(&pool, 2, 10); assert!(pool.skip_certified(7)); @@ -1400,35 +1524,54 @@ mod tests { #[test] fn test_update_existing_singleton_vote() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // 50% voted on (1, 6) for rank in 0..5 { - add_skip_vote_range(&mut pool, 1, 6, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 1, + 6, + &validator_keypairs, + rank, + ); } + let bank = bank_forks.read().unwrap().root_bank(); // Range expansion on a singleton vote should be ok let vote = Vote::new_skip_vote(1); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, 6), &mut vec![] ) .is_ok()); assert_eq!(pool.highest_skip_slot(), 1); - add_skip_vote_range(&mut pool, 1, 6, &validator_keypairs, 6); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 1, + 6, + &validator_keypairs, + 6, + ); assert_eq!(pool.highest_skip_slot(), 6); assert_single_certificate_range(&pool, 1, 6); } #[test] fn test_update_existing_vote() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); + let bank = bank_forks.read().unwrap().root_bank(); // 50% voted for (10, 25) for rank in 0..5 { - add_skip_vote_range(&mut pool, 10, 25, &validator_keypairs, rank); + add_skip_vote_range(&mut pool, &bank, 10, 25, &validator_keypairs, rank); } - add_skip_vote_range(&mut pool, 10, 20, &validator_keypairs, 6); + add_skip_vote_range(&mut pool, &bank, 10, 20, &validator_keypairs, 6); assert_eq!(pool.highest_skip_slot(), 20); assert_single_certificate_range(&pool, 10, 20); @@ -1436,6 +1579,9 @@ mod tests { let vote = Vote::new_skip_vote(20); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, 6), &mut vec![] @@ -1445,13 +1591,27 @@ mod tests { #[test] fn test_threshold_not_reached() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // half voted (5, 15) and the other half voted (20, 30) for rank in 0..5 { - add_skip_vote_range(&mut pool, 5, 15, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 5, + 15, + &validator_keypairs, + rank, + ); } for rank in 5..10 { - add_skip_vote_range(&mut pool, 20, 30, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 20, + 30, + &validator_keypairs, + rank, + ); } for slot in 5..31 { assert!(!pool.skip_certified(slot)); @@ -1460,13 +1620,27 @@ mod tests { #[test] fn test_update_and_skip_range_certify() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); // half voted (5, 15) and the other half voted (10, 30) for rank in 0..5 { - add_skip_vote_range(&mut pool, 5, 15, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 5, + 15, + &validator_keypairs, + rank, + ); } for rank in 5..10 { - add_skip_vote_range(&mut pool, 10, 30, &validator_keypairs, rank); + add_skip_vote_range( + &mut pool, + &bank_forks.read().unwrap().root_bank(), + 10, + 30, + &validator_keypairs, + rank, + ); } for slot in 5..10 { assert!(!pool.skip_certified(slot)); @@ -1480,8 +1654,10 @@ mod tests { #[test] fn test_safe_to_notar() { solana_logger::setup(); - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); - let (my_vote_key, _, _) = pool.get_key_and_stakes(0, 0).unwrap(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); + let bank = bank_forks.read().unwrap().root_bank(); + let (my_vote_key, _, _) = + get_key_and_stakes(bank.epoch_schedule(), bank.epoch_stakes_map(), 0, 0).unwrap(); // Create bank 2 let slot = 2; @@ -1492,6 +1668,9 @@ mod tests { let mut new_events = vec![]; assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &my_vote_key, &dummy_transaction(&validator_keypairs, &vote, 0), &mut new_events @@ -1504,6 +1683,9 @@ mod tests { let vote = Vote::new_notarization_vote(2, block_id); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, rank), &mut new_events @@ -1528,6 +1710,9 @@ mod tests { let vote = Vote::new_notarization_vote(3, block_id); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, rank), &mut new_events @@ -1540,6 +1725,9 @@ mod tests { let vote = Vote::new_notarization_vote(3, Hash::new_unique()); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &my_vote_key, &dummy_transaction(&validator_keypairs, &vote, 0), &mut new_events @@ -1553,6 +1741,9 @@ mod tests { let vote = Vote::new_skip_vote(3); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, rank), &mut new_events @@ -1580,6 +1771,9 @@ mod tests { let vote = Vote::new_notarization_vote(3, duplicate_block_id); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, rank), &mut new_events @@ -1598,8 +1792,10 @@ mod tests { #[test] fn test_safe_to_skip() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); - let (my_vote_key, _, _) = pool.get_key_and_stakes(0, 0).unwrap(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); + let bank = bank_forks.read().unwrap().root_bank(); + let (my_vote_key, _, _) = + get_key_and_stakes(bank.epoch_schedule(), bank.epoch_stakes_map(), 0, 0).unwrap(); let slot = 2; let mut new_events = vec![]; @@ -1608,6 +1804,9 @@ mod tests { let vote = Vote::new_notarization_vote(2, block_id); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &my_vote_key, &dummy_transaction(&validator_keypairs, &vote, 0), &mut new_events @@ -1620,6 +1819,9 @@ mod tests { let vote = Vote::new_skip_vote(2); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, rank), &mut new_events @@ -1637,6 +1839,9 @@ mod tests { let vote = Vote::new_notarization_vote(2, block_id); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, 6), &mut new_events @@ -1659,6 +1864,7 @@ mod tests { fn test_reject_conflicting_vote( pool: &mut CertificatePool, + bank: &Bank, validator_keypairs: &[ValidatorVoteKeypairs], vote_type_1: VoteType, vote_type_2: VoteType, @@ -1668,6 +1874,9 @@ mod tests { let vote_2 = create_new_vote(vote_type_2, slot); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(validator_keypairs, &vote_1, 0), &mut vec![] @@ -1675,6 +1884,9 @@ mod tests { .is_ok()); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &dummy_transaction(validator_keypairs, &vote_2, 0), &mut vec![] @@ -1684,7 +1896,7 @@ mod tests { #[test] fn test_reject_conflicting_votes_with_type() { - let (validator_keypairs, mut pool) = create_keypairs_and_pool(); + let (validator_keypairs, mut pool, bank_forks) = create_initial_state(); let mut slot = 2; for vote_type_1 in [ VoteType::Finalize, @@ -1697,6 +1909,7 @@ mod tests { for vote_type_2 in conflicting_vote_types { test_reject_conflicting_vote( &mut pool, + &bank_forks.read().unwrap().root_bank(), &validator_keypairs, vote_type_1, *vote_type_2, @@ -1713,21 +1926,24 @@ mod tests { .map(|_| ValidatorVoteKeypairs::new_rand()) .collect::>(); let bank_forks = create_bank_forks(&validator_keypairs); - let root_bank = bank_forks.read().unwrap().root_bank(); - let mut pool: CertificatePool = - CertificatePool::new_from_root_bank(Pubkey::new_unique(), &root_bank.clone(), None); - assert_eq!(pool.root(), 0); + let mut pool = CertificatePool::new_from_root_bank( + Pubkey::new_unique(), + &bank_forks.read().unwrap().root_bank(), + None, + ); + let root_bank = bank_forks.read().unwrap().root_bank(); let new_bank = Arc::new(create_bank(2, root_bank, &Pubkey::new_unique())); - pool.handle_new_root(new_bank.clone()); - assert_eq!(pool.root(), 2); + pool.prune_old_state(new_bank.slot()); let new_bank = Arc::new(create_bank(3, new_bank, &Pubkey::new_unique())); - pool.handle_new_root(new_bank); - assert_eq!(pool.root(), 3); + pool.prune_old_state(new_bank.slot()); // Send a vote on slot 1, it should be rejected let vote = Vote::new_skip_vote(1); assert!(pool .add_message( + new_bank.epoch_schedule(), + new_bank.epoch_stakes_map(), + new_bank.slot(), &Pubkey::new_unique(), &dummy_transaction(&validator_keypairs, &vote, 0), &mut vec![] @@ -1743,13 +1959,20 @@ mod tests { bitmap: BitVec::new(), }); assert!(pool - .add_message(&Pubkey::new_unique(), &cert, &mut vec![]) + .add_message( + new_bank.epoch_schedule(), + new_bank.epoch_stakes_map(), + new_bank.slot(), + &Pubkey::new_unique(), + &cert, + &mut vec![] + ) .is_err()); } #[test] fn test_get_certs_for_standstill() { - let (_, mut pool) = create_keypairs_and_pool(); + let (_, mut pool, bank_forks) = create_initial_state(); // Should return empty vector if no certificates assert!(pool.get_certs_for_standstill().is_empty()); @@ -1764,8 +1987,12 @@ mod tests { signature: BLSSignature::default(), bitmap: BitVec::new(), }; + let bank = bank_forks.read().unwrap().root_bank(); assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Certificate(cert_3.clone()), &mut vec![] @@ -1778,6 +2005,9 @@ mod tests { }; assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Certificate(cert_4.clone()), &mut vec![] @@ -1799,6 +2029,9 @@ mod tests { }; assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Certificate(cert_5.clone()), &mut vec![] @@ -1813,6 +2046,9 @@ mod tests { }; assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Certificate(cert_5_finalize.clone()), &mut vec![] @@ -1831,6 +2067,9 @@ mod tests { }; assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Certificate(cert_5.clone()), &mut vec![] @@ -1852,6 +2091,9 @@ mod tests { }; assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Certificate(cert_6.clone()), &mut vec![] @@ -1873,6 +2115,9 @@ mod tests { }; assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Certificate(cert_6_finalize.clone()), &mut vec![] @@ -1890,6 +2135,9 @@ mod tests { }; assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Certificate(cert_6_notarize_fallback.clone()), &mut vec![] @@ -1912,6 +2160,9 @@ mod tests { }; assert!(pool .add_message( + bank.epoch_schedule(), + bank.epoch_stakes_map(), + bank.slot(), &Pubkey::new_unique(), &BLSMessage::Certificate(cert_7.clone()), &mut vec![] diff --git a/votor/src/certificate_pool_service.rs b/votor/src/certificate_pool_service.rs index 49bf79e119..f762b40052 100644 --- a/votor/src/certificate_pool_service.rs +++ b/votor/src/certificate_pool_service.rs @@ -25,7 +25,7 @@ use { }, solana_pubkey::Pubkey, solana_runtime::{ - root_bank_cache::RootBankCache, vote_sender_types::BLSVerifiedMessageReceiver, + bank::Bank, root_bank_cache::RootBankCache, vote_sender_types::BLSVerifiedMessageReceiver, }, solana_votor_messages::bls_message::{BLSMessage, CertificateMessage}, stats::CertificatePoolServiceStats, @@ -86,7 +86,6 @@ impl CertificatePoolService { bls_sender: &Sender, new_finalized_slot: Option, new_certificates_to_send: Vec>, - current_root: &mut Slot, standstill_timer: &mut Instant, stats: &mut CertificatePoolServiceStats, ) -> Result<(), AddVoteError> { @@ -95,14 +94,8 @@ impl CertificatePoolService { // Reset standstill timer *standstill_timer = Instant::now(); CertificatePoolServiceStats::incr_u16(&mut stats.new_finalized_slot); - // Set root - let root_bank = root_bank_cache.root_bank(); - if root_bank.slot() > *current_root { - CertificatePoolServiceStats::incr_u16(&mut stats.new_root); - *current_root = root_bank.slot(); - cert_pool.handle_new_root(root_bank); - } } + cert_pool.prune_old_state(root_bank_cache.root_bank().slot()); // Send new certificates to peers Self::send_certificates(bls_sender, new_certificates_to_send, stats) } @@ -141,7 +134,6 @@ impl CertificatePoolService { message: &BLSMessage, cert_pool: &mut CertificatePool, events: &mut Vec, - current_root: &mut Slot, standstill_timer: &mut Instant, stats: &mut CertificatePoolServiceStats, ) -> Result<(), AddVoteError> { @@ -155,6 +147,7 @@ impl CertificatePoolService { } let (new_finalized_slot, new_certificates_to_send) = Self::add_message_and_maybe_update_commitment( + &ctx.root_bank_cache.root_bank(), &ctx.my_pubkey, &ctx.my_vote_pubkey, message, @@ -168,7 +161,6 @@ impl CertificatePoolService { &ctx.bls_sender, new_finalized_slot, new_certificates_to_send, - current_root, standstill_timer, stats, ) @@ -198,7 +190,6 @@ impl CertificatePoolService { info!("{}: Certificate pool loop initialized", &ctx.my_pubkey); Votor::wait_for_migration_or_exit(&ctx.exit, &ctx.start); info!("{}: Certificate pool loop starting", &ctx.my_pubkey); - let mut current_root = ctx.root_bank_cache.root_bank().slot(); let mut stats = CertificatePoolServiceStats::new(); // Standstill tracking @@ -271,7 +262,6 @@ impl CertificatePoolService { &message, &mut cert_pool, &mut events, - &mut current_root, &mut standstill_timer, &mut stats, ) { @@ -296,6 +286,7 @@ impl CertificatePoolService { /// /// If a new finalization slot was recognized, returns the slot fn add_message_and_maybe_update_commitment( + root_bank: &Bank, my_pubkey: &Pubkey, my_vote_pubkey: &Pubkey, message: &BLSMessage, @@ -303,8 +294,14 @@ impl CertificatePoolService { votor_events: &mut Vec, commitment_sender: &Sender, ) -> Result<(Option, Vec>), AddVoteError> { - let (new_finalized_slot, new_certificates_to_send) = - cert_pool.add_message(my_vote_pubkey, message, votor_events)?; + let (new_finalized_slot, new_certificates_to_send) = cert_pool.add_message( + root_bank.epoch_schedule(), + root_bank.epoch_stakes_map(), + root_bank.slot(), + my_vote_pubkey, + message, + votor_events, + )?; let Some(new_finalized_slot) = new_finalized_slot else { return Ok((None, new_certificates_to_send)); }; diff --git a/votor/src/certificate_pool_service/stats.rs b/votor/src/certificate_pool_service/stats.rs index d6b8e55278..6f3ca217e1 100644 --- a/votor/src/certificate_pool_service/stats.rs +++ b/votor/src/certificate_pool_service/stats.rs @@ -11,7 +11,6 @@ pub(crate) struct CertificatePoolServiceStats { pub(crate) certificates_sent: u16, pub(crate) certificates_dropped: u16, pub(crate) new_finalized_slot: u16, - pub(crate) new_root: u16, pub(crate) parent_ready_missed_window: u16, pub(crate) parent_ready_produce_window: u16, pub(crate) received_votes: u32, @@ -27,7 +26,6 @@ impl CertificatePoolServiceStats { certificates_sent: 0, certificates_dropped: 0, new_finalized_slot: 0, - new_root: 0, parent_ready_missed_window: 0, parent_ready_produce_window: 0, received_votes: 0, @@ -50,7 +48,6 @@ impl CertificatePoolServiceStats { self.certificates_sent = 0; self.certificates_dropped = 0; self.new_finalized_slot = 0; - self.new_root = 0; self.parent_ready_missed_window = 0; self.parent_ready_produce_window = 0; self.received_votes = 0; @@ -65,7 +62,6 @@ impl CertificatePoolServiceStats { ("add_message_failed", self.add_message_failed, i64), ("certificates_sent", self.certificates_sent, i64), ("certificates_dropped", self.certificates_dropped, i64), - ("new_root", self.new_root, i64), ("new_finalized_slot", self.new_finalized_slot, i64), ( "parent_ready_missed_window",