diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 36b1815f633089..6b7d923217e397 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -5402,7 +5402,7 @@ pub mod tests {
         super::*,
         crate::{
             genesis_utils::{create_genesis_config, GenesisConfigInfo},
-            leader_schedule::{FixedSchedule, LeaderSchedule},
+            leader_schedule::{FixedSchedule, IdentityKeyedLeaderSchedule},
             shred::{max_ticks_per_n_shreds, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
         },
         assert_matches::assert_matches,
@@ -10508,9 +10508,9 @@ pub mod tests {
         let bank = Arc::new(Bank::new_for_tests(&genesis_config));
         let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
         let fixed_schedule = FixedSchedule {
-            leader_schedule: Arc::new(LeaderSchedule::new_from_schedule(vec![
-                leader_keypair.pubkey()
-            ])),
+            leader_schedule: Arc::new(Box::new(IdentityKeyedLeaderSchedule::new_from_schedule(
+                vec![leader_keypair.pubkey()],
+            ))),
         };
 
         leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule));
diff --git a/ledger/src/leader_schedule.rs b/ledger/src/leader_schedule.rs
index 04f7817e7ce0c2..32191787e7cf6c 100644
--- a/ledger/src/leader_schedule.rs
+++ b/ledger/src/leader_schedule.rs
@@ -1,5 +1,4 @@
 use {
-    itertools::Itertools,
     rand::distributions::{Distribution, WeightedIndex},
     rand_chacha::{rand_core::SeedableRng, ChaChaRng},
     solana_pubkey::Pubkey,
@@ -7,6 +6,9 @@ use {
     std::{collections::HashMap, convert::identity, ops::Index, sync::Arc},
 };
 
+mod identity_keyed;
+pub use identity_keyed::LeaderSchedule as IdentityKeyedLeaderSchedule;
+
 // Used for testing
 #[derive(Clone, Debug)]
 pub struct FixedSchedule {
@@ -14,88 +16,25 @@ pub struct FixedSchedule {
 }
 
 /// Stake-weighted leader schedule for one epoch.
-#[derive(Debug, Default, PartialEq, Eq, Clone)]
-pub struct LeaderSchedule {
-    slot_leaders: Vec<Pubkey>,
-    // Inverted index from pubkeys to indices where they are the leader.
-    index: HashMap<Pubkey, Arc<Vec<usize>>>,
-}
-
-impl LeaderSchedule {
-    // Note: passing in zero stakers will cause a panic.
-    pub fn new_keyed_by_validator_identity(
-        epoch_staked_nodes: &HashMap<Pubkey, u64>,
-        epoch: Epoch,
-        len: u64,
-        repeat: u64,
-    ) -> Self {
-        let keyed_stakes: Vec<_> = epoch_staked_nodes
-            .iter()
-            .map(|(pubkey, stake)| (pubkey, *stake))
-            .collect();
-        let slot_leaders = Self::stake_weighted_slot_leaders(keyed_stakes, epoch, len, repeat);
-        Self::new_from_schedule(slot_leaders)
-    }
-
-    // Note: passing in zero stakers will cause a panic.
-    fn stake_weighted_slot_leaders(
-        mut keyed_stakes: Vec<(&Pubkey, u64)>,
-        epoch: Epoch,
-        len: u64,
-        repeat: u64,
-    ) -> Vec<Pubkey> {
-        sort_stakes(&mut keyed_stakes);
-        let (keys, stakes): (Vec<_>, Vec<_>) = keyed_stakes.into_iter().unzip();
-        let weighted_index = WeightedIndex::new(stakes).unwrap();
-        let mut seed = [0u8; 32];
-        seed[0..8].copy_from_slice(&epoch.to_le_bytes());
-        let rng = &mut ChaChaRng::from_seed(seed);
-        let mut current_slot_leader = Pubkey::default();
-        (0..len)
-            .map(|i| {
-                if i % repeat == 0 {
-                    current_slot_leader = keys[weighted_index.sample(rng)];
-                }
-                current_slot_leader
-            })
-            .collect()
-    }
-
-    pub fn new_from_schedule(slot_leaders: Vec<Pubkey>) -> Self {
-        Self {
-            index: Self::index_from_slot_leaders(&slot_leaders),
-            slot_leaders,
-        }
-    }
-
-    fn index_from_slot_leaders(slot_leaders: &[Pubkey]) -> HashMap<Pubkey, Arc<Vec<usize>>> {
-        slot_leaders
-            .iter()
-            .enumerate()
-            .map(|(i, pk)| (*pk, i))
-            .into_group_map()
-            .into_iter()
-            .map(|(k, v)| (k, Arc::new(v)))
-            .collect()
-    }
-
-    pub fn get_slot_leaders(&self) -> &[Pubkey] {
-        &self.slot_leaders
-    }
+pub type LeaderSchedule = Box<dyn LeaderScheduleVariant>;
 
-    pub fn num_slots(&self) -> usize {
-        self.slot_leaders.len()
-    }
+pub trait LeaderScheduleVariant:
+    std::fmt::Debug + Send + Sync + Index<u64, Output = Pubkey>
+{
+    fn get_slot_leaders(&self) -> &[Pubkey];
+    fn get_leader_slots_map(&self) -> &HashMap<Pubkey, Arc<Vec<usize>>>;
 
-    /// 'offset' is an index into the leader schedule. The function returns an
-    /// iterator of indices i >= offset where the given pubkey is the leader.
-    pub(crate) fn get_indices(
+    fn get_leader_upcoming_slots(
         &self,
         pubkey: &Pubkey,
         offset: usize, // Starting index.
-    ) -> impl Iterator<Item = usize> + '_ {
-        let index = self.index.get(pubkey).cloned().unwrap_or_default();
-        let num_slots = self.slot_leaders.len();
+    ) -> Box<dyn Iterator<Item = usize> + '_> {
+        let index = self
+            .get_leader_slots_map()
+            .get(pubkey)
+            .cloned()
+            .unwrap_or_default();
+        let num_slots = self.num_slots();
         let size = index.len();
         #[allow(clippy::reversed_empty_ranges)]
         let range = if index.is_empty() {
@@ -111,18 +50,38 @@
         // for LeaderSchedule, where the schedule keeps repeating endlessly.
         // The '%' returns where in a cycle we are and the '/' returns how many
         // times the schedule is repeated.
-        range.map(move |k| index[k % size] + k / size * num_slots)
+        Box::new(range.map(move |k| index[k % size] + k / size * num_slots))
     }
-}
 
-impl Index<u64> for LeaderSchedule {
-    type Output = Pubkey;
-    fn index(&self, index: u64) -> &Pubkey {
-        let index = index as usize;
-        &self.slot_leaders[index % self.slot_leaders.len()]
+    fn num_slots(&self) -> usize {
+        self.get_slot_leaders().len()
     }
 }
 
+// Note: passing in zero stakers will cause a panic.
+fn stake_weighted_slot_leaders(
+    mut keyed_stakes: Vec<(&Pubkey, u64)>,
+    epoch: Epoch,
+    len: u64,
+    repeat: u64,
+) -> Vec<Pubkey> {
+    sort_stakes(&mut keyed_stakes);
+    let (keys, stakes): (Vec<_>, Vec<_>) = keyed_stakes.into_iter().unzip();
+    let weighted_index = WeightedIndex::new(stakes).unwrap();
+    let mut seed = [0u8; 32];
+    seed[0..8].copy_from_slice(&epoch.to_le_bytes());
+    let rng = &mut ChaChaRng::from_seed(seed);
+    let mut current_slot_leader = Pubkey::default();
+    (0..len)
+        .map(|i| {
+            if i % repeat == 0 {
+                current_slot_leader = keys[weighted_index.sample(rng)];
+            }
+            current_slot_leader
+        })
+        .collect()
+}
+
 fn sort_stakes(stakes: &mut Vec<(&Pubkey, u64)>) {
     // Sort first by stake. If stakes are the same, sort by pubkey to ensure a
     // deterministic result.
@@ -141,110 +100,17 @@ fn sort_stakes(stakes: &mut Vec<(&Pubkey, u64)>) {
 
 #[cfg(test)]
 mod tests {
-    use {super::*, rand::Rng, std::iter::repeat_with};
-
-    #[test]
-    fn test_leader_schedule_index() {
-        let pubkey0 = solana_pubkey::new_rand();
-        let pubkey1 = solana_pubkey::new_rand();
-        let leader_schedule = LeaderSchedule::new_from_schedule(vec![pubkey0, pubkey1]);
-        assert_eq!(leader_schedule[0], pubkey0);
-        assert_eq!(leader_schedule[1], pubkey1);
-        assert_eq!(leader_schedule[2], pubkey0);
-    }
-
-    #[test]
-    fn test_leader_schedule_basic() {
-        let num_keys = 10;
-        let stakes: HashMap<_, _> = (0..num_keys)
-            .map(|i| (solana_pubkey::new_rand(), i))
-            .collect();
-
-        let epoch: Epoch = rand::random();
-        let len = num_keys * 10;
-        let leader_schedule =
-            LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1);
-        let leader_schedule2 =
-            LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1);
-        assert_eq!(leader_schedule.num_slots() as u64, len);
-        // Check that the same schedule is reproducibly generated
-        assert_eq!(leader_schedule, leader_schedule2);
-    }
-
-    #[test]
-    fn test_repeated_leader_schedule() {
-        let num_keys = 10;
-        let stakes: HashMap<_, _> = (0..num_keys)
-            .map(|i| (solana_pubkey::new_rand(), i))
-            .collect();
-
-        let epoch = rand::random::<Epoch>();
-        let len = num_keys * 10;
-        let repeat = 8;
-        let leader_schedule =
-            LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, repeat);
-        assert_eq!(leader_schedule.num_slots() as u64, len);
-        let mut leader_node = Pubkey::default();
-        for (i, node) in leader_schedule.slot_leaders.iter().enumerate() {
-            if i % repeat as usize == 0 {
-                leader_node = *node;
-            } else {
-                assert_eq!(leader_node, *node);
-            }
-        }
-    }
-
-    #[test]
-    fn test_repeated_leader_schedule_specific() {
-        let alice_pubkey = solana_pubkey::new_rand();
-        let bob_pubkey = solana_pubkey::new_rand();
-        let stakes: HashMap<_, _> = [(alice_pubkey, 2), (bob_pubkey, 1)].into_iter().collect();
-
-        let epoch = 0;
-        let len = 8;
-        // What the schedule looks like without any repeats
-        let leaders1 =
-            LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 1).slot_leaders;
-
-        // What the schedule looks like with repeats
-        let leaders2 =
-            LeaderSchedule::new_keyed_by_validator_identity(&stakes, epoch, len, 2).slot_leaders;
-        assert_eq!(leaders1.len(), leaders2.len());
-
-        let leaders1_expected = vec![
-            alice_pubkey,
-            alice_pubkey,
-            alice_pubkey,
-            bob_pubkey,
-            alice_pubkey,
-            alice_pubkey,
-            alice_pubkey,
-            alice_pubkey,
-        ];
-        let leaders2_expected = vec![
-            alice_pubkey,
-            alice_pubkey,
-            alice_pubkey,
-            alice_pubkey,
-            alice_pubkey,
-            alice_pubkey,
-            bob_pubkey,
-            bob_pubkey,
-        ];
-
-        assert_eq!(leaders1, leaders1_expected);
-        assert_eq!(leaders2, leaders2_expected);
-    }
+    use {super::*, itertools::Itertools, rand::Rng, std::iter::repeat_with};
 
     #[test]
-    fn test_get_indices() {
+    fn test_get_leader_upcoming_slots() {
         const NUM_SLOTS: usize = 97;
         let mut rng = rand::thread_rng();
         let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(4).collect();
         let schedule: Vec<_> = repeat_with(|| pubkeys[rng.gen_range(0..3)])
             .take(19)
             .collect();
-        let schedule = LeaderSchedule::new_from_schedule(schedule);
+        let schedule = IdentityKeyedLeaderSchedule::new_from_schedule(schedule);
         let leaders = (0..NUM_SLOTS)
             .map(|i| (schedule[i as u64], i))
             .into_group_map();
@@ -252,7 +118,7 @@ mod tests {
         let index = leaders.get(pubkey).cloned().unwrap_or_default();
         for offset in 0..NUM_SLOTS {
             let schedule: Vec<_> = schedule
-                .get_indices(pubkey, offset)
+                .get_leader_upcoming_slots(pubkey, offset)
                 .take_while(|s| *s < NUM_SLOTS)
                 .collect();
             let index: Vec<_> = index.iter().copied().skip_while(|s| *s < offset).collect();
diff --git a/ledger/src/leader_schedule/identity_keyed.rs b/ledger/src/leader_schedule/identity_keyed.rs
new file mode 100644
index 00000000000000..c5ded743d64e78
--- /dev/null
+++ b/ledger/src/leader_schedule/identity_keyed.rs
@@ -0,0 +1,167 @@
+use {
+    super::{stake_weighted_slot_leaders, LeaderScheduleVariant},
+    itertools::Itertools,
+    solana_pubkey::Pubkey,
+    solana_sdk::clock::Epoch,
+    std::{collections::HashMap, ops::Index, sync::Arc},
+};
+
+#[derive(Default, Debug, PartialEq, Eq, Clone)]
+pub struct LeaderSchedule {
+    slot_leaders: Vec<Pubkey>,
+    // Inverted index from pubkeys to indices where they are the leader.
+    leader_slots_map: HashMap<Pubkey, Arc<Vec<usize>>>,
+}
+
+impl LeaderSchedule {
+    // Note: passing in zero stakers will cause a panic.
+    pub fn new(
+        epoch_staked_nodes: &HashMap<Pubkey, u64>,
+        epoch: Epoch,
+        len: u64,
+        repeat: u64,
+    ) -> Self {
+        let keyed_stakes: Vec<_> = epoch_staked_nodes
+            .iter()
+            .map(|(pubkey, stake)| (pubkey, *stake))
+            .collect();
+        let slot_leaders = stake_weighted_slot_leaders(keyed_stakes, epoch, len, repeat);
+        Self::new_from_schedule(slot_leaders)
+    }
+
+    pub fn new_from_schedule(slot_leaders: Vec<Pubkey>) -> Self {
+        Self {
+            leader_slots_map: Self::invert_slot_leaders(&slot_leaders),
+            slot_leaders,
+        }
+    }
+
+    fn invert_slot_leaders(slot_leaders: &[Pubkey]) -> HashMap<Pubkey, Arc<Vec<usize>>> {
+        slot_leaders
+            .iter()
+            .enumerate()
+            .map(|(i, pk)| (*pk, i))
+            .into_group_map()
+            .into_iter()
+            .map(|(k, v)| (k, Arc::new(v)))
+            .collect()
+    }
+
+    pub fn get_slot_leaders(&self) -> &[Pubkey] {
+        &self.slot_leaders
+    }
+}
+
+impl LeaderScheduleVariant for LeaderSchedule {
+    fn get_slot_leaders(&self) -> &[Pubkey] {
+        &self.slot_leaders
+    }
+
+    fn get_leader_slots_map(&self) -> &HashMap<Pubkey, Arc<Vec<usize>>> {
+        &self.leader_slots_map
+    }
+}
+
+impl Index<u64> for LeaderSchedule {
+    type Output = Pubkey;
+    fn index(&self, index: u64) -> &Pubkey {
+        &self.get_slot_leaders()[index as usize % self.num_slots()]
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_leader_schedule_index() {
+        let pubkey0 = solana_pubkey::new_rand();
+        let pubkey1 = solana_pubkey::new_rand();
+        let leader_schedule = LeaderSchedule::new_from_schedule(vec![pubkey0, pubkey1]);
+        assert_eq!(leader_schedule[0], pubkey0);
+        assert_eq!(leader_schedule[1], pubkey1);
+        assert_eq!(leader_schedule[2], pubkey0);
+    }
+
+    #[test]
+    fn test_leader_schedule_basic() {
+        let num_keys = 10;
+        let stakes: HashMap<_, _> = (0..num_keys)
+            .map(|i| (solana_pubkey::new_rand(), i))
+            .collect();
+
+        let epoch: Epoch = rand::random();
+        let len = num_keys * 10;
+        let leader_schedule = LeaderSchedule::new(&stakes, epoch, len, 1);
+        let leader_schedule2 = LeaderSchedule::new(&stakes, epoch, len, 1);
+        assert_eq!(leader_schedule.num_slots() as u64, len);
+        // Check that the same schedule is reproducibly generated
+        assert_eq!(leader_schedule, leader_schedule2);
+    }
+
+    #[test]
+    fn test_repeated_leader_schedule() {
+        let num_keys = 10;
+        let stakes: HashMap<_, _> = (0..num_keys)
+            .map(|i| (solana_pubkey::new_rand(), i))
+            .collect();
+
+        let epoch = rand::random::<Epoch>();
+        let len = num_keys * 10;
+        let repeat = 8;
+        let leader_schedule = LeaderSchedule::new(&stakes, epoch, len, repeat);
+        assert_eq!(leader_schedule.num_slots() as u64, len);
+        let mut leader_node = Pubkey::default();
+        for (i, node) in leader_schedule.get_slot_leaders().iter().enumerate() {
+            if i % repeat as usize == 0 {
+                leader_node = *node;
+            } else {
+                assert_eq!(leader_node, *node);
+            }
+        }
+    }
+
+    #[test]
+    fn test_repeated_leader_schedule_specific() {
+        let alice_pubkey = solana_pubkey::new_rand();
+        let bob_pubkey = solana_pubkey::new_rand();
+        let stakes: HashMap<_, _> = [(alice_pubkey, 2), (bob_pubkey, 1)].into_iter().collect();
+
+        let epoch = 0;
+        let len = 8;
+        // What the schedule looks like without any repeats
+        let leaders1 = LeaderSchedule::new(&stakes, epoch, len, 1)
+            .get_slot_leaders()
+            .to_vec();
+
+        // What the schedule looks like with repeats
+        let leaders2 = LeaderSchedule::new(&stakes, epoch, len, 2)
+            .get_slot_leaders()
+            .to_vec();
+        assert_eq!(leaders1.len(), leaders2.len());
+
+        let leaders1_expected = vec![
+            alice_pubkey,
+            alice_pubkey,
+            alice_pubkey,
+            bob_pubkey,
+            alice_pubkey,
+            alice_pubkey,
+            alice_pubkey,
+            alice_pubkey,
+        ];
+        let leaders2_expected = vec![
+            alice_pubkey,
+            alice_pubkey,
+            alice_pubkey,
+            alice_pubkey,
+            alice_pubkey,
+            alice_pubkey,
+            bob_pubkey,
+            bob_pubkey,
+        ];
+
+        assert_eq!(leaders1, leaders1_expected);
+        assert_eq!(leaders2, leaders2_expected);
+    }
+}
diff --git a/ledger/src/leader_schedule_cache.rs b/ledger/src/leader_schedule_cache.rs
index 2fa7867a047ea9..43ba852c3ecbf9 100644
--- a/ledger/src/leader_schedule_cache.rs
+++ b/ledger/src/leader_schedule_cache.rs
@@ -134,7 +134,7 @@ impl LeaderScheduleCache {
                 let num_slots = bank.get_slots_in_epoch(k) as usize;
                 let first_slot = bank.epoch_schedule().get_first_slot_in_epoch(k);
                 leader_schedule
-                    .get_indices(pubkey, offset)
+                    .get_leader_upcoming_slots(pubkey, offset)
                     .take_while(move |i| *i < num_slots)
                     .map(move |i| i as Slot + first_slot)
             })
@@ -253,6 +253,7 @@ mod tests {
             create_genesis_config_with_leader, GenesisConfigInfo,
         },
         get_tmp_ledger_path_auto_delete,
+        leader_schedule::IdentityKeyedLeaderSchedule,
         staking_utils::tests::setup_vote_and_stake_accounts,
     },
     crossbeam_channel::unbounded,
@@ -306,10 +307,13 @@ mod tests {
 
     #[test]
     fn test_retain_latest() {
-        let mut cached_schedules = HashMap::new();
+        let mut cached_schedules: HashMap<u64, Arc<LeaderSchedule>> = HashMap::new();
         let mut order = VecDeque::new();
         for i in 0..=MAX_SCHEDULES {
-            cached_schedules.insert(i as u64, Arc::new(LeaderSchedule::default()));
+            cached_schedules.insert(
+                i as u64,
+                Arc::new(Box::new(IdentityKeyedLeaderSchedule::default())),
+            );
             order.push_back(i as u64);
         }
         LeaderScheduleCache::retain_latest(&mut cached_schedules, &mut order, MAX_SCHEDULES);
diff --git a/ledger/src/leader_schedule_utils.rs b/ledger/src/leader_schedule_utils.rs
index 2b4212a49156db..187c2bc414f513 100644
--- a/ledger/src/leader_schedule_utils.rs
+++ b/ledger/src/leader_schedule_utils.rs
@@ -1,5 +1,5 @@
 use {
-    crate::leader_schedule::LeaderSchedule,
+    crate::leader_schedule::{IdentityKeyedLeaderSchedule, LeaderSchedule},
     solana_runtime::bank::Bank,
     solana_sdk::{
         clock::{Epoch, Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
@@ -10,14 +10,15 @@ use {
 
 /// Return the leader schedule for the given epoch.
 pub fn leader_schedule(epoch: Epoch, bank: &Bank) -> Option<LeaderSchedule> {
-    bank.epoch_staked_nodes(epoch).map(|stakes| {
-        LeaderSchedule::new_keyed_by_validator_identity(
-            &stakes,
-            epoch,
-            bank.get_slots_in_epoch(epoch),
-            NUM_CONSECUTIVE_LEADER_SLOTS,
-        )
-    })
+    bank.epoch_staked_nodes(epoch)
+        .map(|stakes| -> LeaderSchedule {
+            Box::new(IdentityKeyedLeaderSchedule::new(
+                &stakes,
+                epoch,
+                bank.get_slots_in_epoch(epoch),
+                NUM_CONSECUTIVE_LEADER_SLOTS,
+            ))
+        })
 }
 
 /// Map of leader base58 identity pubkeys to the slot indices relative to the first epoch slot
@@ -80,7 +81,7 @@ mod tests {
         .iter()
         .map(|(pubkey, stake)| (*pubkey, *stake))
         .collect();
-    let leader_schedule = LeaderSchedule::new_keyed_by_validator_identity(
+    let leader_schedule = IdentityKeyedLeaderSchedule::new(
         &pubkeys_and_stakes,
         0,
         genesis_config.epoch_schedule.slots_per_epoch,
diff --git a/local-cluster/src/integration_tests.rs b/local-cluster/src/integration_tests.rs
index e0f21d9efbe6d1..4c35d1bea91245 100644
--- a/local-cluster/src/integration_tests.rs
+++ b/local-cluster/src/integration_tests.rs
@@ -28,7 +28,7 @@ use {
         blockstore::{Blockstore, PurgeType},
         blockstore_meta::DuplicateSlotProof,
         blockstore_options::{AccessType, BlockstoreOptions},
-        leader_schedule::{FixedSchedule, LeaderSchedule},
+        leader_schedule::{FixedSchedule, IdentityKeyedLeaderSchedule, LeaderSchedule},
     },
     solana_rpc_client::rpc_client::RpcClient,
     solana_runtime::{
@@ -273,7 +273,9 @@ pub fn create_custom_leader_schedule(
     }
 
     info!("leader_schedule: {}", leader_schedule.len());
-    LeaderSchedule::new_from_schedule(leader_schedule)
+    Box::new(IdentityKeyedLeaderSchedule::new_from_schedule(
+        leader_schedule,
+    ))
 }
 
 pub fn create_custom_leader_schedule_with_random_keys(
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 666491d3dcbc82..9a9f80ccbe839a 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -3291,13 +3291,15 @@ fn do_test_lockout_violation_with_or_without_tower(with_tower: bool) {
     let c_validator_to_slots = vec![(validator_b_pubkey, DEFAULT_SLOTS_PER_EPOCH as usize)];
     let c_leader_schedule = create_custom_leader_schedule(c_validator_to_slots.into_iter());
 
-    let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
+    let leader_schedule = Arc::new(create_custom_leader_schedule(
+        validator_to_slots.into_iter(),
+    ));
     for slot in 0..=validator_b_last_leader_slot {
         assert_eq!(leader_schedule[slot], validator_b_pubkey);
     }
 
     default_config.fixed_leader_schedule = Some(FixedSchedule {
-        leader_schedule: Arc::new(leader_schedule.clone()),
+        leader_schedule: leader_schedule.clone(),
     });
     let mut validator_configs =
         make_identical_validator_configs(&default_config, node_stakes.len());
@@ -3419,9 +3421,7 @@ fn do_test_lockout_violation_with_or_without_tower(with_tower: bool) {
     info!("Restart validator C again!!!");
     validator_c_info.config.voting_disabled = false;
     // C should now produce blocks
-    validator_c_info.config.fixed_leader_schedule = Some(FixedSchedule {
-        leader_schedule: Arc::new(leader_schedule),
-    });
+    validator_c_info.config.fixed_leader_schedule = Some(FixedSchedule { leader_schedule });
     cluster.restart_node(
         &validator_c_pubkey,
         validator_c_info,
diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs
index 5a044777eba853..d633018958075c 100644
--- a/poh/src/poh_recorder.rs
+++ b/poh/src/poh_recorder.rs
@@ -1935,9 +1935,11 @@ mod tests {
         slot_leaders.extend(std::iter::repeat(leader_d_pubkey).take(consecutive_leader_slots));
         let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
         let fixed_schedule = solana_ledger::leader_schedule::FixedSchedule {
-            leader_schedule: Arc::new(
-                solana_ledger::leader_schedule::LeaderSchedule::new_from_schedule(slot_leaders),
-            ),
+            leader_schedule: Arc::new(Box::new(
+                solana_ledger::leader_schedule::IdentityKeyedLeaderSchedule::new_from_schedule(
+                    slot_leaders,
+                ),
+            )),
         };
         leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule));