diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index ceb7c1ffb76e14..b9a041836fa5f0 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -396,20 +396,27 @@ impl BankingStage { data_budget: &DataBudget, ) -> std::io::Result<()> { let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); - inc_new_counter_info!("banking_stage-forwarded_packets", packets.len()); const INTERVAL_MS: u64 = 100; const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5; data_budget.update(INTERVAL_MS, |bytes| { - std::cmp::min(bytes + MAX_BYTES_PER_INTERVAL, MAX_BYTES_BUDGET) + std::cmp::min( + bytes.saturating_add(MAX_BYTES_PER_INTERVAL), + MAX_BYTES_BUDGET, + ) }); + + let mut forwarded_packet_count = 0; for p in packets { - if data_budget.take(p.meta.size) { + if !p.meta.forwarded && data_budget.take(p.meta.size) { + forwarded_packet_count = forwarded_packet_count.saturating_add(1); socket.send_to(&p.data[..p.meta.size], &tpu_forwards)?; } } + inc_new_counter_info!("banking_stage-forwarded_packets", forwarded_packet_count); + Ok(()) } @@ -1696,7 +1703,7 @@ mod tests { system_transaction, transaction::TransactionError, }, - solana_streamer::socket::SocketAddrSpace, + solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, solana_transaction_status::TransactionWithStatusMeta, solana_vote_program::vote_transaction, std::{ @@ -2978,16 +2985,15 @@ mod tests { fn test_forwarder_budget() { solana_logger::setup(); // Create `PacketBatch` with 1 unprocessed packet - let single_packet_batch = PacketBatch::new(vec![Packet::default()]); - let mut unprocessed_packets: UnprocessedPacketBatches = - vec![(single_packet_batch, vec![0], false)] - .into_iter() - .collect(); - - let cluster_info = new_test_cluster_info(Node::new_localhost().info); + let packet = Packet::from_data(None, &[0]).unwrap(); + let single_packet_batch = PacketBatch::new(vec![packet]); let genesis_config_info = create_slow_genesis_config(10_000); - let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; + let GenesisConfigInfo { + genesis_config, + validator_pubkey, + .. + } = &genesis_config_info; let bank = Arc::new(Bank::new_no_wallclock_throttle(genesis_config)); let ledger_path = get_tmp_ledger_path!(); @@ -3006,17 +3012,155 @@ mod tests { let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank, &blockstore, Some(poh_config)); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let data_budget = DataBudget::default(); - BankingStage::handle_forwarding( - &ForwardOption::ForwardTransaction, - &cluster_info, - &mut unprocessed_packets, - &poh_recorder, - &socket, - false, - &data_budget, + let local_node = Node::new_localhost_with_pubkey(validator_pubkey); + let cluster_info = new_test_cluster_info(local_node.info); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let recv_socket = &local_node.sockets.tpu_forwards[0]; + + let test_cases = vec![ + ("budget-restricted", DataBudget::restricted(), 0), + ("budget-available", DataBudget::default(), 1), + ]; + + for (name, data_budget, expected_num_forwarded) in test_cases { + let mut unprocessed_packet_batches: UnprocessedPacketBatches = + vec![(single_packet_batch.clone(), vec![0], false)] + .into_iter() + .collect(); + BankingStage::handle_forwarding( + &ForwardOption::ForwardTransaction, + &cluster_info, + &mut unprocessed_packet_batches, + &poh_recorder, + &send_socket, + true, + &data_budget, + ); + + recv_socket + .set_nonblocking(expected_num_forwarded == 0) + .unwrap(); + + let mut packets = vec![Packet::default(); 2]; + let (_, num_received) = + recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_num_forwarded, "{}", name); + } + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + + #[test] + fn test_handle_forwarding() { + solana_logger::setup(); + + const FWD_PACKET: u8 = 1; + let forwarded_packet = { + let mut packet = Packet::from_data(None, &[FWD_PACKET]).unwrap(); + packet.meta.forwarded = true; + packet + }; + + const NORMAL_PACKET: u8 = 2; + let normal_packet = Packet::from_data(None, &[NORMAL_PACKET]).unwrap(); + + let packet_batch = PacketBatch::new(vec![forwarded_packet, normal_packet]); + let mut unprocessed_packet_batches: UnprocessedPacketBatches = + vec![(packet_batch, vec![0, 1], false)] + .into_iter() + .collect(); + + let genesis_config_info = create_slow_genesis_config(10_000); + let GenesisConfigInfo { + genesis_config, + validator_pubkey, + .. + } = &genesis_config_info; + let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config)); + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Arc::new( + Blockstore::open(&ledger_path) + .expect("Expected to be able to open database ledger"), ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at + // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config)); + + let local_node = Node::new_localhost_with_pubkey(validator_pubkey); + let cluster_info = new_test_cluster_info(local_node.info); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let recv_socket = &local_node.sockets.tpu_forwards[0]; + + let test_cases = vec![ + ("not-forward", ForwardOption::NotForward, true, vec![], 2), + ( + "fwd-normal", + ForwardOption::ForwardTransaction, + true, + vec![NORMAL_PACKET], + 2, + ), + ( + "fwd-no-op", + ForwardOption::ForwardTransaction, + true, + vec![], + 2, + ), + ( + "fwd-no-hold", + ForwardOption::ForwardTransaction, + false, + vec![], + 0, + ), + ]; + + for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases { + BankingStage::handle_forwarding( + &forward_option, + &cluster_info, + &mut unprocessed_packet_batches, + &poh_recorder, + &send_socket, + hold, + &DataBudget::default(), + ); + + recv_socket + .set_nonblocking(expected_ids.is_empty()) + .unwrap(); + + let mut packets = vec![Packet::default(); 2]; + let (_, num_received) = + recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_ids.len(), "{}", name); + for (i, expected_id) in expected_ids.iter().enumerate() { + assert_eq!(packets[i].meta.size, 1); + assert_eq!(packets[i].data[0], *expected_id, "{}", name); + } + + let num_unprocessed_packets: usize = unprocessed_packet_batches + .iter() + .map(|(b, ..)| b.packets.len()) + .sum(); + assert_eq!( + num_unprocessed_packets, expected_num_unprocessed, + "{}", + name + ); + } + exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); } diff --git a/core/src/commitment_service.rs b/core/src/commitment_service.rs index 6d2dc74ce46c64..322ea233b0ba1f 100644 --- a/core/src/commitment_service.rs +++ b/core/src/commitment_service.rs @@ -506,11 +506,7 @@ mod tests { let validator_vote_keypairs = ValidatorVoteKeypairs::new_rand(); let validator_keypairs = vec![&validator_vote_keypairs]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![100; 1], diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index d890a9ed35444b..a1af7e7160dfd2 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -8,7 +8,7 @@ use { solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, solana_poh::poh_recorder::PohRecorder, - solana_sdk::clock::DEFAULT_TICKS_PER_SLOT, + solana_sdk::{clock::DEFAULT_TICKS_PER_SLOT, packet::Packet}, solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender}, std::{ net::UdpSocket, @@ -83,10 +83,16 @@ impl FetchStage { sendr: &PacketBatchSender, poh_recorder: &Arc>, ) -> Result<()> { - let packet_batch = recvr.recv()?; + let mark_forwarded = |packet: &mut Packet| { + packet.meta.forwarded = true; + }; + + let mut packet_batch = recvr.recv()?; let mut num_packets = packet_batch.packets.len(); + packet_batch.packets.iter_mut().for_each(mark_forwarded); let mut packet_batches = vec![packet_batch]; - while let Ok(packet_batch) = recvr.try_recv() { + while let Ok(mut packet_batch) = recvr.try_recv() { + packet_batch.packets.iter_mut().for_each(mark_forwarded); num_packets += packet_batch.packets.len(); packet_batches.push(packet_batch); // Read at most 1K transactions in a loop @@ -114,7 +120,7 @@ impl FetchStage { } fn new_multi_socket( - sockets: Vec>, + tpu_sockets: Vec>, tpu_forwards_sockets: Vec>, tpu_vote_sockets: Vec>, exit: &Arc, @@ -125,7 +131,7 @@ impl FetchStage { ) -> Self { let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); - let tpu_threads = sockets.into_iter().map(|socket| { + let tpu_threads = tpu_sockets.into_iter().map(|socket| { streamer::receiver( socket, exit, diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index ae93816d72fc41..0c8b7a6f3cef78 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -192,7 +192,7 @@ impl ShredFetchStage { recycler.clone(), bank_forks.clone(), "shred_fetch_tvu_forwards", - |p| p.meta.forward = true, + |p| p.meta.forwarded = true, ); let (repair_receiver, repair_handler) = Self::packet_modifier( diff --git a/core/src/vote_simulator.rs b/core/src/vote_simulator.rs new file mode 100644 index 00000000000000..c3cb9286a92bb7 --- /dev/null +++ b/core/src/vote_simulator.rs @@ -0,0 +1,372 @@ +use { + crate::{ + cluster_info_vote_listener::VoteTracker, + cluster_slot_state_verifier::{ + DuplicateSlotsTracker, EpochSlotsFrozenSlots, GossipDuplicateConfirmedSlots, + }, + cluster_slots::ClusterSlots, + consensus::Tower, + fork_choice::SelectVoteAndResetForkResult, + heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, + latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks, + progress_map::{ForkProgress, ProgressMap}, + replay_stage::{HeaviestForkFailures, ReplayStage}, + unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, + }, + solana_runtime::{ + accounts_background_service::AbsRequestSender, + bank::Bank, + bank_forks::BankForks, + genesis_utils::{ + create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, + }, + }, + solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signer}, + solana_vote_program::vote_transaction, + std::{ + collections::{HashMap, HashSet}, + sync::{Arc, RwLock}, + }, + trees::{tr, Tree, TreeWalk}, +}; + +pub struct VoteSimulator { + pub validator_keypairs: HashMap, + pub node_pubkeys: Vec, + pub vote_pubkeys: Vec, + pub bank_forks: Arc>, + pub progress: ProgressMap, + pub heaviest_subtree_fork_choice: HeaviestSubtreeForkChoice, + pub latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks, +} + +impl VoteSimulator { + pub fn new(num_keypairs: usize) -> Self { + let ( + validator_keypairs, + node_pubkeys, + vote_pubkeys, + bank_forks, + progress, + heaviest_subtree_fork_choice, + ) = Self::init_state(num_keypairs); + Self { + validator_keypairs, + node_pubkeys, + vote_pubkeys, + bank_forks: Arc::new(RwLock::new(bank_forks)), + progress, + heaviest_subtree_fork_choice, + latest_validator_votes_for_frozen_banks: LatestValidatorVotesForFrozenBanks::default(), + } + } + pub fn fill_bank_forks( + &mut self, + forks: Tree, + cluster_votes: &HashMap>, + is_frozen: bool, + ) { + let root = *forks.root().data(); + assert!(self.bank_forks.read().unwrap().get(root).is_some()); + + let mut walk = TreeWalk::from(forks); + + while let Some(visit) = walk.get() { + let slot = *visit.node().data(); + if self.bank_forks.read().unwrap().get(slot).is_some() { + walk.forward(); + continue; + } + let parent = *walk.get_parent().unwrap().data(); + let parent_bank = self.bank_forks.read().unwrap().get(parent).unwrap().clone(); + let new_bank = Bank::new_from_parent(&parent_bank, &Pubkey::default(), slot); + self.progress + .entry(slot) + .or_insert_with(|| ForkProgress::new(Hash::default(), None, None, 0, 0)); + for (pubkey, vote) in cluster_votes.iter() { + if vote.contains(&parent) { + let keypairs = self.validator_keypairs.get(pubkey).unwrap(); + let latest_blockhash = parent_bank.last_blockhash(); + let vote_tx = vote_transaction::new_vote_transaction( + // Must vote > root to be processed + vec![parent], + parent_bank.hash(), + latest_blockhash, + &keypairs.node_keypair, + &keypairs.vote_keypair, + &keypairs.vote_keypair, + None, + ); + info!("voting {} {}", parent_bank.slot(), parent_bank.hash()); + new_bank.process_transaction(&vote_tx).unwrap(); + + // Check the vote landed + let vote_account = new_bank + .get_vote_account(&keypairs.vote_keypair.pubkey()) + .unwrap(); + let state = vote_account.1.vote_state(); + assert!(state + .as_ref() + .unwrap() + .votes + .iter() + .any(|lockout| lockout.slot == parent)); + } + } + while new_bank.tick_height() < new_bank.max_tick_height() { + new_bank.register_tick(&Hash::new_unique()); + } + if !visit.node().has_no_child() || is_frozen { + new_bank.freeze(); + self.progress + .get_fork_stats_mut(new_bank.slot()) + .expect("All frozen banks must exist in the Progress map") + .bank_hash = Some(new_bank.hash()); + self.heaviest_subtree_fork_choice.add_new_leaf_slot( + (new_bank.slot(), new_bank.hash()), + Some((new_bank.parent_slot(), new_bank.parent_hash())), + ); + } + self.bank_forks.write().unwrap().insert(new_bank); + + walk.forward(); + } + } + + pub fn simulate_vote( + &mut self, + vote_slot: Slot, + my_pubkey: &Pubkey, + tower: &mut Tower, + ) -> Vec { + // Try to simulate the vote + let my_keypairs = self.validator_keypairs.get(my_pubkey).unwrap(); + let my_vote_pubkey = my_keypairs.vote_keypair.pubkey(); + let ancestors = self.bank_forks.read().unwrap().ancestors(); + let mut frozen_banks: Vec<_> = self + .bank_forks + .read() + .unwrap() + .frozen_banks() + .values() + .cloned() + .collect(); + + let _ = ReplayStage::compute_bank_stats( + my_pubkey, + &ancestors, + &mut frozen_banks, + tower, + &mut self.progress, + &VoteTracker::default(), + &ClusterSlots::default(), + &self.bank_forks, + &mut self.heaviest_subtree_fork_choice, + &mut self.latest_validator_votes_for_frozen_banks, + ); + + let vote_bank = self + .bank_forks + .read() + .unwrap() + .get(vote_slot) + .expect("Bank must have been created before vote simulation") + .clone(); + + // Try to vote on the given slot + let descendants = self.bank_forks.read().unwrap().descendants().clone(); + let SelectVoteAndResetForkResult { + heaviest_fork_failures, + .. + } = ReplayStage::select_vote_and_reset_forks( + &vote_bank, + None, + &ancestors, + &descendants, + &self.progress, + tower, + &self.latest_validator_votes_for_frozen_banks, + &self.heaviest_subtree_fork_choice, + ); + + // Make sure this slot isn't locked out or failing threshold + info!("Checking vote: {}", vote_bank.slot()); + if !heaviest_fork_failures.is_empty() { + return heaviest_fork_failures; + } + + let new_root = tower.record_bank_vote(&vote_bank, &my_vote_pubkey); + if let Some(new_root) = new_root { + self.set_root(new_root); + } + + vec![] + } + + pub fn set_root(&mut self, new_root: Slot) { + let (drop_bank_sender, _drop_bank_receiver) = std::sync::mpsc::channel(); + ReplayStage::handle_new_root( + new_root, + &self.bank_forks, + &mut self.progress, + &AbsRequestSender::default(), + None, + &mut self.heaviest_subtree_fork_choice, + &mut DuplicateSlotsTracker::default(), + &mut GossipDuplicateConfirmedSlots::default(), + &mut UnfrozenGossipVerifiedVoteHashes::default(), + &mut true, + &mut Vec::new(), + &mut EpochSlotsFrozenSlots::default(), + &drop_bank_sender, + ) + } + + pub fn create_and_vote_new_branch( + &mut self, + start_slot: Slot, + end_slot: Slot, + cluster_votes: &HashMap>, + votes_to_simulate: &HashSet, + my_pubkey: &Pubkey, + tower: &mut Tower, + ) -> HashMap> { + (start_slot + 1..=end_slot) + .filter_map(|slot| { + let mut fork_tip_parent = tr(slot - 1); + fork_tip_parent.push_front(tr(slot)); + self.fill_bank_forks(fork_tip_parent, cluster_votes, true); + if votes_to_simulate.contains(&slot) { + Some((slot, self.simulate_vote(slot, my_pubkey, tower))) + } else { + None + } + }) + .collect() + } + + pub fn simulate_lockout_interval( + &mut self, + slot: Slot, + lockout_interval: (u64, u64), + vote_account_pubkey: &Pubkey, + ) { + self.progress + .entry(slot) + .or_insert_with(|| ForkProgress::new(Hash::default(), None, None, 0, 0)) + .fork_stats + .lockout_intervals + .entry(lockout_interval.1) + .or_default() + .push((lockout_interval.0, *vote_account_pubkey)); + } + + pub fn can_progress_on_fork( + &mut self, + my_pubkey: &Pubkey, + tower: &mut Tower, + start_slot: u64, + num_slots: u64, + cluster_votes: &mut HashMap>, + ) -> bool { + // Check that within some reasonable time, validator can make a new + // root on this fork + let old_root = tower.root(); + + for i in 1..num_slots { + // The parent of the tip of the fork + let mut fork_tip_parent = tr(start_slot + i - 1); + // The tip of the fork + fork_tip_parent.push_front(tr(start_slot + i)); + self.fill_bank_forks(fork_tip_parent, cluster_votes, true); + if self + .simulate_vote(i + start_slot, my_pubkey, tower) + .is_empty() + { + cluster_votes + .entry(*my_pubkey) + .or_default() + .push(start_slot + i); + } + if old_root != tower.root() { + return true; + } + } + + false + } + + fn init_state( + num_keypairs: usize, + ) -> ( + HashMap, + Vec, + Vec, + BankForks, + ProgressMap, + HeaviestSubtreeForkChoice, + ) { + let keypairs: HashMap<_, _> = std::iter::repeat_with(|| { + let vote_keypairs = ValidatorVoteKeypairs::new_rand(); + (vote_keypairs.node_keypair.pubkey(), vote_keypairs) + }) + .take(num_keypairs) + .collect(); + let node_pubkeys: Vec<_> = keypairs + .values() + .map(|keys| keys.node_keypair.pubkey()) + .collect(); + let vote_pubkeys: Vec<_> = keypairs + .values() + .map(|keys| keys.vote_keypair.pubkey()) + .collect(); + + let (bank_forks, progress, heaviest_subtree_fork_choice) = + initialize_state(&keypairs, 10_000); + ( + keypairs, + node_pubkeys, + vote_pubkeys, + bank_forks, + progress, + heaviest_subtree_fork_choice, + ) + } +} + +// Setup BankForks with bank 0 and all the validator accounts +pub fn initialize_state( + validator_keypairs_map: &HashMap, + stake: u64, +) -> (BankForks, ProgressMap, HeaviestSubtreeForkChoice) { + let validator_keypairs: Vec<_> = validator_keypairs_map.values().collect(); + let GenesisConfigInfo { + mut genesis_config, + mint_keypair, + .. + } = create_genesis_config_with_vote_accounts( + 1_000_000_000, + &validator_keypairs, + vec![stake; validator_keypairs.len()], + ); + + genesis_config.poh_config.hashes_per_tick = Some(2); + let bank0 = Bank::new_for_tests(&genesis_config); + + for pubkey in validator_keypairs_map.keys() { + bank0.transfer(10_000, &mint_keypair, pubkey).unwrap(); + } + + while bank0.tick_height() < bank0.max_tick_height() { + bank0.register_tick(&Hash::new_unique()); + } + bank0.freeze(); + let mut progress = ProgressMap::default(); + progress.insert( + 0, + ForkProgress::new_from_bank(&bank0, bank0.collector_id(), &Pubkey::default(), None, 0, 0), + ); + let bank_forks = BankForks::new(bank0); + let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_bank_forks(&bank_forks); + (bank_forks, progress, heaviest_subtree_fork_choice) +} diff --git a/perf/src/data_budget.rs b/perf/src/data_budget.rs index 24eb0bb84ec5cc..4c35fc6ce35caa 100644 --- a/perf/src/data_budget.rs +++ b/perf/src/data_budget.rs @@ -10,6 +10,14 @@ pub struct DataBudget { } impl DataBudget { + /// Create a data budget with max bytes, used for tests + pub fn restricted() -> Self { + Self { + bytes: AtomicUsize::default(), + last_timestamp_ms: AtomicU64::new(u64::MAX), + } + } + // If there are enough bytes in the budget, consumes from // the budget and returns true. Otherwise returns false. #[must_use] diff --git a/program-test/src/lib.rs b/program-test/src/lib.rs index 92cb627f9e368c..ca6cfa7d3d9f92 100644 --- a/program-test/src/lib.rs +++ b/program-test/src/lib.rs @@ -830,6 +830,7 @@ impl ProgramTest { genesis_config, mint_keypair, voting_keypair, + validator_pubkey: bootstrap_validator_pubkey, }, ) } diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs new file mode 100644 index 00000000000000..5421bc7b2b9b3d --- /dev/null +++ b/rpc/src/cluster_tpu_info.rs @@ -0,0 +1,186 @@ +use { + solana_gossip::cluster_info::ClusterInfo, + solana_poh::poh_recorder::PohRecorder, + solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey}, + solana_send_transaction_service::tpu_info::TpuInfo, + std::{ + collections::HashMap, + net::SocketAddr, + sync::{Arc, Mutex}, + }, +}; + +pub struct ClusterTpuInfo { + cluster_info: Arc, + poh_recorder: Arc>, + recent_peers: HashMap, +} + +impl ClusterTpuInfo { + pub fn new(cluster_info: Arc, poh_recorder: Arc>) -> Self { + Self { + cluster_info, + poh_recorder, + recent_peers: HashMap::new(), + } + } +} + +impl TpuInfo for ClusterTpuInfo { + fn refresh_recent_peers(&mut self) { + self.recent_peers = self + .cluster_info + .tpu_peers() + .into_iter() + .map(|ci| (ci.id, ci.tpu)) + .collect(); + } + + fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr> { + let recorder = self.poh_recorder.lock().unwrap(); + let leaders: Vec<_> = (0..max_count) + .filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS)) + .collect(); + drop(recorder); + let mut unique_leaders = vec![]; + for leader in leaders.iter() { + if let Some(addr) = self.recent_peers.get(leader) { + if !unique_leaders.contains(&addr) { + unique_leaders.push(addr); + } + } + } + unique_leaders + } +} + +#[cfg(test)] +mod test { + use { + super::*, + solana_gossip::contact_info::ContactInfo, + solana_ledger::{ + blockstore::Blockstore, get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache, + }, + solana_runtime::{ + bank::Bank, + genesis_utils::{ + create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, + }, + }, + solana_sdk::{ + poh_config::PohConfig, + signature::{Keypair, Signer}, + timing::timestamp, + }, + solana_streamer::socket::SocketAddrSpace, + std::sync::atomic::AtomicBool, + }; + + #[test] + fn test_get_leader_tpus() { + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&ledger_path).unwrap(); + + let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand(); + let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand(); + let validator_vote_keypairs2 = ValidatorVoteKeypairs::new_rand(); + let validator_keypairs = vec![ + &validator_vote_keypairs0, + &validator_vote_keypairs1, + &validator_vote_keypairs2, + ]; + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( + 1_000_000_000, + &validator_keypairs, + vec![10_000; 3], + ); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + + let (poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( + 0, + bank.last_blockhash(), + bank.clone(), + Some((2, 2)), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &Arc::new(PohConfig::default()), + Arc::new(AtomicBool::default()), + ); + + let node_keypair = Arc::new(Keypair::new()); + let cluster_info = Arc::new(ClusterInfo::new( + ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), + node_keypair, + SocketAddrSpace::Unspecified, + )); + + let validator0_socket = SocketAddr::from(([127, 0, 0, 1], 1111)); + let validator1_socket = SocketAddr::from(([127, 0, 0, 1], 2222)); + let validator2_socket = SocketAddr::from(([127, 0, 0, 1], 3333)); + let recent_peers: HashMap<_, _> = vec![ + ( + validator_vote_keypairs0.node_keypair.pubkey(), + validator0_socket, + ), + ( + validator_vote_keypairs1.node_keypair.pubkey(), + validator1_socket, + ), + ( + validator_vote_keypairs2.node_keypair.pubkey(), + validator2_socket, + ), + ] + .iter() + .cloned() + .collect(); + let leader_info = ClusterTpuInfo { + cluster_info, + poh_recorder: Arc::new(Mutex::new(poh_recorder)), + recent_peers: recent_peers.clone(), + }; + + let slot = bank.slot(); + let first_leader = + solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap(); + assert_eq!( + leader_info.get_leader_tpus(1), + vec![recent_peers.get(&first_leader).unwrap()] + ); + + let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at( + slot + NUM_CONSECUTIVE_LEADER_SLOTS, + &bank, + ) + .unwrap(); + let mut expected_leader_sockets = vec![ + recent_peers.get(&first_leader).unwrap(), + recent_peers.get(&second_leader).unwrap(), + ]; + expected_leader_sockets.dedup(); + assert_eq!(leader_info.get_leader_tpus(2), expected_leader_sockets); + + let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at( + slot + (2 * NUM_CONSECUTIVE_LEADER_SLOTS), + &bank, + ) + .unwrap(); + let mut expected_leader_sockets = vec![ + recent_peers.get(&first_leader).unwrap(), + recent_peers.get(&second_leader).unwrap(), + recent_peers.get(&third_leader).unwrap(), + ]; + expected_leader_sockets.dedup(); + assert_eq!(leader_info.get_leader_tpus(3), expected_leader_sockets); + + for x in 4..8 { + assert!(leader_info.get_leader_tpus(x).len() <= recent_peers.len()); + } + } + Blockstore::destroy(&ledger_path).unwrap(); + } +} diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 3e06e94f4e2aab..5a92c5b1b63edf 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -5991,6 +5991,7 @@ pub mod tests { mut genesis_config, mint_keypair, voting_keypair, + .. } = create_genesis_config(TEST_MINT_LAMPORTS); genesis_config.rent.lamports_per_byte_year = 50; diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 80e7b529d2357b..8c4a0f21b52dbc 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -13520,11 +13520,7 @@ pub(crate) mod tests { let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand(); let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand(); let validator_keypairs = vec![&validator_vote_keypairs0, &validator_vote_keypairs1]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![10_000; 2], diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index a283c0fd29d4c2..87098d2b881c0d 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -629,8 +629,8 @@ mod tests { let leader_keypair = Keypair::new(); let GenesisConfigInfo { mut genesis_config, - mint_keypair: _, voting_keypair, + .. } = create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1_000); let slots_in_epoch = 32; genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch); diff --git a/runtime/src/genesis_utils.rs b/runtime/src/genesis_utils.rs index 71222fdf441b31..7db7a4f8a07cbd 100644 --- a/runtime/src/genesis_utils.rs +++ b/runtime/src/genesis_utils.rs @@ -52,6 +52,7 @@ pub struct GenesisConfigInfo { pub genesis_config: GenesisConfig, pub mint_keypair: Keypair, pub voting_keypair: Keypair, + pub validator_pubkey: Pubkey, } pub fn create_genesis_config(mint_lamports: u64) -> GenesisConfigInfo { @@ -84,10 +85,11 @@ pub fn create_genesis_config_with_vote_accounts_and_cluster_type( let voting_keypair = Keypair::from_bytes(&voting_keypairs[0].borrow().vote_keypair.to_bytes()).unwrap(); + let validator_pubkey = voting_keypairs[0].borrow().node_keypair.pubkey(); let genesis_config = create_genesis_config_with_leader_ex( mint_lamports, &mint_keypair.pubkey(), - &voting_keypairs[0].borrow().node_keypair.pubkey(), + &validator_pubkey, &voting_keypairs[0].borrow().vote_keypair.pubkey(), &voting_keypairs[0].borrow().stake_keypair.pubkey(), stakes[0], @@ -102,6 +104,7 @@ pub fn create_genesis_config_with_vote_accounts_and_cluster_type( genesis_config, mint_keypair, voting_keypair, + validator_pubkey, }; for (validator_voting_keypairs, stake) in voting_keypairs[1..].iter().zip(&stakes[1..]) { @@ -159,6 +162,7 @@ pub fn create_genesis_config_with_leader( genesis_config, mint_keypair, voting_keypair, + validator_pubkey: *validator_pubkey, } } diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index 27435b3ae677fb..19b0bae996d493 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -18,7 +18,7 @@ pub const PACKET_DATA_SIZE: usize = 1280 - 40 - 8; #[repr(C)] pub struct Meta { pub size: usize, - pub forward: bool, + pub forwarded: bool, pub repair: bool, pub discard: bool, pub addr: [u16; 8],