diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 9afbb6cfd16117..c91ccc837ee5f0 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -13,7 +13,6 @@ use { banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT}, validator::{BlockProductionMethod, TransactionStructure}, }, - solana_gossip::cluster_info::{ClusterInfo, Node}, solana_hash::Hash, solana_keypair::Keypair, solana_ledger::{ @@ -32,7 +31,6 @@ use { }, solana_signature::Signature, solana_signer::Signer, - solana_streamer::socket::SocketAddrSpace, solana_system_interface::instruction as system_instruction, solana_system_transaction as system_transaction, solana_time_utils::timestamp, @@ -448,12 +446,6 @@ fn main() { ))) .unwrap(); let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); - let cluster_info = { - let keypair = Arc::new(Keypair::new()); - let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); - ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified) - }; - let cluster_info = Arc::new(cluster_info); let Channels { non_vote_sender, non_vote_receiver, @@ -465,7 +457,6 @@ fn main() { let banking_stage = BankingStage::new_num_threads( block_production_method, transaction_struct, - &cluster_info, &poh_recorder, transaction_recorder, non_vote_receiver, diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 277bf0b23ea7bc..49fc40ae64c304 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -21,7 +21,6 @@ use { solana_core::{banking_stage::BankingStage, banking_trace::BankingTracer}, solana_entry::entry::{next_hash, Entry}, solana_genesis_config::GenesisConfig, - solana_gossip::cluster_info::{ClusterInfo, Node}, solana_hash::Hash, solana_keypair::Keypair, solana_ledger::{ @@ -39,7 +38,6 @@ use { }, solana_signature::Signature, solana_signer::Signer, - solana_streamer::socket::SocketAddrSpace, solana_system_interface::instruction as system_instruction, solana_system_transaction as system_transaction, solana_time_utils::timestamp, @@ -236,17 +234,10 @@ fn bench_banking( ); let (exit, poh_recorder, transaction_recorder, poh_service, signal_receiver) = create_test_recorder(bank.clone(), blockstore, None, None); - let cluster_info = { - let keypair = Arc::new(Keypair::new()); - let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); - ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified) - }; - let cluster_info = Arc::new(cluster_info); let (s, _r) = unbounded(); let _banking_stage = BankingStage::new( block_production_method, transaction_struct, - &cluster_info, &poh_recorder, transaction_recorder, non_vote_receiver, diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index cbd507c530eb4a..c62bc9f34520d4 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -774,10 +774,6 @@ impl BankingSimulator { assert!(retracer.is_enabled()); info!("Enabled banking retracer (dir_byte_limit: {BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT})",); - // Create a partially-dummy ClusterInfo for the banking stage. - let cluster_info_for_banking = Arc::new(DummyClusterInfo { - id: simulated_leader.into(), - }); let Channels { non_vote_sender, non_vote_receiver, @@ -830,7 +826,6 @@ impl BankingSimulator { let banking_stage = BankingStage::new_num_threads( block_production_method.clone(), transaction_struct.clone(), - &cluster_info_for_banking, &poh_recorder, transaction_recorder, non_vote_receiver, diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 45f32b39c78c3e..f0c8b1f872a9e0 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -366,7 +366,6 @@ impl BankingStage { pub fn new( block_production_method: BlockProductionMethod, transaction_struct: TransactionStructure, - cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, transaction_recorder: TransactionRecorder, non_vote_receiver: BankingPacketReceiver, @@ -381,7 +380,6 @@ impl BankingStage { Self::new_num_threads( block_production_method, transaction_struct, - cluster_info, poh_recorder, transaction_recorder, non_vote_receiver, @@ -400,7 +398,6 @@ impl BankingStage { pub fn new_num_threads( block_production_method: BlockProductionMethod, transaction_struct: TransactionStructure, - cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, transaction_recorder: TransactionRecorder, non_vote_receiver: BankingPacketReceiver, @@ -420,7 +417,6 @@ impl BankingStage { Self::new_central_scheduler( transaction_struct, use_greedy_scheduler, - cluster_info, poh_recorder, transaction_recorder, non_vote_receiver, @@ -439,7 +435,6 @@ impl BankingStage { pub fn new_central_scheduler( transaction_struct: TransactionStructure, use_greedy_scheduler: bool, - cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, transaction_recorder: TransactionRecorder, non_vote_receiver: BankingPacketReceiver, @@ -458,7 +453,7 @@ impl BankingStage { VoteStorage::new(&bank) }; - let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); + let decision_maker = DecisionMaker::new(poh_recorder.clone()); let committer = Committer::new( transaction_status_sender.clone(), replay_vote_sender.clone(), @@ -688,7 +683,6 @@ mod tests { crossbeam_channel::{unbounded, Receiver}, itertools::Itertools, solana_entry::entry::{self, Entry, EntrySlice}, - solana_gossip::cluster_info::Node, solana_hash::Hash, solana_keypair::Keypair, solana_ledger::{ @@ -710,7 +704,6 @@ mod tests { solana_runtime::{bank::Bank, genesis_utils::bootstrap_validator_stake_lamports}, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, solana_signer::Signer, - solana_streamer::socket::SocketAddrSpace, solana_system_transaction as system_transaction, solana_transaction::{sanitized::SanitizedTransaction, Transaction}, solana_vote::vote_transaction::new_tower_sync_transaction, @@ -723,14 +716,6 @@ mod tests { test_case::test_case, }; - pub(crate) fn new_test_cluster_info(keypair: Option>) -> (Node, ClusterInfo) { - let keypair = keypair.unwrap_or_else(|| Arc::new(Keypair::new())); - let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); - let cluster_info = - ClusterInfo::new(node.info.clone(), keypair, SocketAddrSpace::Unspecified); - (node, cluster_info) - } - pub(crate) fn sanitize_transactions( txs: Vec, ) -> Vec> { @@ -760,14 +745,11 @@ mod tests { ); let (exit, poh_recorder, transaction_recorder, poh_service, _entry_receiever) = create_test_recorder(bank, blockstore, None, None); - let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None); - let cluster_info = Arc::new(cluster_info); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, transaction_struct, - &cluster_info, &poh_recorder, transaction_recorder, non_vote_receiver, @@ -818,14 +800,11 @@ mod tests { }; let (exit, poh_recorder, transaction_recorder, poh_service, entry_receiver) = create_test_recorder(bank.clone(), blockstore, Some(poh_config), None); - let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None); - let cluster_info = Arc::new(cluster_info); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, transaction_struct, - &cluster_info, &poh_recorder, transaction_recorder, non_vote_receiver, @@ -885,14 +864,11 @@ mod tests { ); let (exit, poh_recorder, transaction_recorder, poh_service, entry_receiver) = create_test_recorder(bank.clone(), blockstore, None, None); - let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None); - let cluster_info = Arc::new(cluster_info); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( block_production_method, transaction_struct, - &cluster_info, &poh_recorder, transaction_recorder, non_vote_receiver, @@ -1042,12 +1018,9 @@ mod tests { let (bank, bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); let (exit, poh_recorder, transaction_recorder, poh_service, entry_receiver) = create_test_recorder(bank.clone(), blockstore, None, None); - let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None); - let cluster_info = Arc::new(cluster_info); let _banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, transaction_struct, - &cluster_info, &poh_recorder, transaction_recorder, non_vote_receiver, @@ -1229,14 +1202,11 @@ mod tests { ); let (exit, poh_recorder, transaction_recorder, poh_service, _entry_receiver) = create_test_recorder(bank.clone(), blockstore, None, None); - let (_, cluster_info) = new_test_cluster_info(/*keypair:*/ None); - let cluster_info = Arc::new(cluster_info); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let banking_stage = BankingStage::new( BlockProductionMethod::CentralScheduler, transaction_struct, - &cluster_info, &poh_recorder, transaction_recorder, non_vote_receiver, diff --git a/core/src/banking_stage/decision_maker.rs b/core/src/banking_stage/decision_maker.rs index aa973a7a231c94..8dcf8b43016f9a 100644 --- a/core/src/banking_stage/decision_maker.rs +++ b/core/src/banking_stage/decision_maker.rs @@ -4,7 +4,6 @@ use { HOLD_TRANSACTIONS_SLOT_OFFSET, }, solana_poh::poh_recorder::{BankStart, PohRecorder}, - solana_pubkey::Pubkey, solana_unified_scheduler_pool::{BankingStageMonitor, BankingStageStatus}, std::{ sync::{atomic::{AtomicBool, Ordering::Relaxed}, Arc, RwLock}, @@ -32,7 +31,6 @@ impl BufferedPacketsDecision { #[derive(Clone, derive_more::Debug)] pub struct DecisionMaker { - my_pubkey: Pubkey, #[debug("{poh_recorder:p}")] poh_recorder: Arc>, @@ -41,9 +39,8 @@ pub struct DecisionMaker { } impl DecisionMaker { - pub fn new(my_pubkey: Pubkey, poh_recorder: Arc>) -> Self { + pub fn new(poh_recorder: Arc>) -> Self { Self { - my_pubkey, poh_recorder, cached_decision: None, last_decision_time: Instant::now(), @@ -71,11 +68,9 @@ impl DecisionMaker { { let poh_recorder = self.poh_recorder.read().unwrap(); decision = Self::consume_or_forward_packets( - &self.my_pubkey, || Self::bank_start(&poh_recorder), || Self::would_be_leader_shortly(&poh_recorder), || Self::would_be_leader(&poh_recorder), - || Self::leader_pubkey(&poh_recorder), ); } @@ -83,11 +78,9 @@ impl DecisionMaker { } fn consume_or_forward_packets( - my_pubkey: &Pubkey, bank_start_fn: impl FnOnce() -> Option, would_be_leader_shortly_fn: impl FnOnce() -> bool, would_be_leader_fn: impl FnOnce() -> bool, - leader_pubkey_fn: impl FnOnce() -> Option, ) -> BufferedPacketsDecision { // If has active bank, then immediately process buffered packets // otherwise, based on leader schedule to either forward or hold packets @@ -101,17 +94,9 @@ impl DecisionMaker { // Node will be leader within ~20 slots, hold the transactions in // case it is the only node which produces an accepted slot. BufferedPacketsDecision::ForwardAndHold - } else if let Some(x) = leader_pubkey_fn() { - if x != *my_pubkey { - // If the current node is not the leader, forward the buffered packets - BufferedPacketsDecision::Forward - } else { - // If the current node is the leader, return the buffered packets as is - BufferedPacketsDecision::Hold - } } else { - // We don't know the leader. Hold the packets for now - BufferedPacketsDecision::Hold + // If the current node is not the leader, forward the buffered packets + BufferedPacketsDecision::Forward } } @@ -130,10 +115,6 @@ impl DecisionMaker { fn would_be_leader(poh_recorder: &PohRecorder) -> bool { poh_recorder.would_be_leader(HOLD_TRANSACTIONS_SLOT_OFFSET * DEFAULT_TICKS_PER_SLOT) } - - fn leader_pubkey(poh_recorder: &PohRecorder) -> Option { - poh_recorder.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET) - } } #[derive(Debug)] @@ -177,6 +158,7 @@ mod tests { solana_clock::NUM_CONSECUTIVE_LEADER_SLOTS, solana_ledger::{blockstore::Blockstore, genesis_utils::create_genesis_config}, solana_poh::poh_recorder::create_test_recorder, + solana_pubkey::Pubkey, solana_runtime::bank::Bank, std::{ env::temp_dir, @@ -215,7 +197,7 @@ mod tests { poh_service.join().unwrap(); let my_pubkey = Pubkey::new_unique(); - let decision_maker = DecisionMaker::new(my_pubkey, poh_recorder.clone()); + let decision_maker = DecisionMaker::new(poh_recorder.clone()); poh_recorder.write().unwrap().reset(bank.clone(), None); let slot = bank.slot() + 1; let bank = Arc::new(Bank::new_from_parent(bank, &my_pubkey, slot)); @@ -274,8 +256,6 @@ mod tests { #[test] fn test_should_process_or_forward_packets() { - let my_pubkey = solana_pubkey::new_rand(); - let my_pubkey1 = solana_pubkey::new_rand(); let bank = Arc::new(Bank::default_for_tests()); let bank_start = Some(BankStart { working_bank: bank, @@ -284,68 +264,38 @@ mod tests { // having active bank allows to consume immediately assert_matches!( DecisionMaker::consume_or_forward_packets( - &my_pubkey, || bank_start.clone(), || panic!("should not be called"), || panic!("should not be called"), - || panic!("should not be called") ), BufferedPacketsDecision::Consume(_) ); - // Unknown leader, hold the packets - assert_matches!( - DecisionMaker::consume_or_forward_packets( - &my_pubkey, - || None, - || false, - || false, - || None - ), - BufferedPacketsDecision::Hold - ); // Leader other than me, forward the packets assert_matches!( DecisionMaker::consume_or_forward_packets( - &my_pubkey, || None, || false, || false, - || Some(my_pubkey1), ), BufferedPacketsDecision::Forward ); // Will be leader shortly, hold the packets assert_matches!( DecisionMaker::consume_or_forward_packets( - &my_pubkey, || None, || true, || panic!("should not be called"), - || panic!("should not be called"), ), BufferedPacketsDecision::Hold ); // Will be leader (not shortly), forward and hold assert_matches!( DecisionMaker::consume_or_forward_packets( - &my_pubkey, || None, || false, || true, - || panic!("should not be called"), ), BufferedPacketsDecision::ForwardAndHold ); - // Current leader matches my pubkey, hold - assert_matches!( - DecisionMaker::consume_or_forward_packets( - &my_pubkey1, - || None, - || false, - || false, - || Some(my_pubkey1), - ), - BufferedPacketsDecision::Hold - ); } } diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 443db68ae8ebfe..e32b1f5d690eca 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -424,7 +424,7 @@ mod tests { Arc::new(AtomicBool::default()), ); let poh_recorder = Arc::new(RwLock::new(poh_recorder)); - let decision_maker = DecisionMaker::new(Pubkey::new_unique(), poh_recorder.clone()); + let decision_maker = DecisionMaker::new(poh_recorder.clone()); let (banking_packet_sender, banking_packet_receiver) = unbounded(); let receive_and_buffer = diff --git a/core/src/banking_stage/unified_scheduler.rs b/core/src/banking_stage/unified_scheduler.rs index 283af6117b6c83..335c2c6e64ba9d 100644 --- a/core/src/banking_stage/unified_scheduler.rs +++ b/core/src/banking_stage/unified_scheduler.rs @@ -32,7 +32,6 @@ use { super::{ decision_maker::{BufferedPacketsDecision, DecisionMaker, DecisionMakerWrapper}, packet_deserializer::PacketDeserializer, - LikeClusterInfo, }, crate::banking_trace::Channels, agave_banking_stage_ingress_types::BankingPacketBatch, @@ -48,16 +47,14 @@ pub(crate) fn ensure_banking_stage_setup( pool: &DefaultSchedulerPool, bank_forks: &Arc>, channels: &Channels, - cluster_info: &impl LikeClusterInfo, poh_recorder: &Arc>, transaction_recorder: TransactionRecorder, num_threads: u32, ) { let mut root_bank_cache = RootBankCache::new(bank_forks.clone()); let unified_receiver = channels.unified_receiver().clone(); - let mut decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); + let mut decision_maker = DecisionMaker::new(poh_recorder.clone()); let banking_stage_monitor = Box::new(DecisionMakerWrapper::new(decision_maker.clone())); - let banking_packet_handler = Box::new( move |helper: &BankingStageHelper, batches: BankingPacketBatch| { let decision = decision_maker.make_consume_or_forward_decision(); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 7411fb73c7cf1f..a9a208765ef20e 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -327,7 +327,6 @@ impl Tpu { let banking_stage = BankingStage::new( block_production_method, transaction_struct, - cluster_info, poh_recorder, transaction_recorder, non_vote_receiver, diff --git a/core/tests/unified_scheduler.rs b/core/tests/unified_scheduler.rs index a5c21542dd7675..e9cb465d60cb6b 100644 --- a/core/tests/unified_scheduler.rs +++ b/core/tests/unified_scheduler.rs @@ -19,9 +19,7 @@ use { unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes, }, solana_entry::entry::Entry, - solana_gossip::cluster_info::{ClusterInfo, Node}, solana_hash::Hash, - solana_keypair::Keypair, solana_ledger::{ blockstore::Blockstore, create_new_tmp_ledger_auto_delete, genesis_utils::create_genesis_config, leader_schedule_cache::LeaderScheduleCache, @@ -35,8 +33,6 @@ use { prioritization_fee_cache::PrioritizationFeeCache, }, solana_runtime_transaction::runtime_transaction::RuntimeTransaction, - solana_signer::Signer, - solana_streamer::socket::SocketAddrSpace, solana_system_transaction as system_transaction, solana_timings::ExecuteTimings, solana_transaction_error::TransactionResult as Result, @@ -237,20 +233,10 @@ fn test_scheduler_producing_blocks() { let banking_tracer = BankingTracer::new_disabled(); banking_tracer.create_channels(true) }; - let cluster_info = { - let keypair = Arc::new(Keypair::new()); - let node = Node::new_localhost_with_pubkey(&keypair.pubkey()); - Arc::new(ClusterInfo::new( - node.info, - keypair, - SocketAddrSpace::Unspecified, - )) - }; ensure_banking_stage_setup( &pool, &bank_forks, &channels, - &cluster_info, &poh_recorder, transaction_recorder, BankingStage::num_threads(),