From 97b4b7afc7647dff000fc4dbe4a32be26480b7f6 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Tue, 25 Mar 2025 09:14:13 -0500 Subject: [PATCH] Move vote processing loop to VoteWorker --- core/src/banking_stage.rs | 110 +++---------------------- core/src/banking_stage/vote_worker.rs | 111 ++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 101 deletions(-) create mode 100644 core/src/banking_stage/vote_worker.rs diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 56a1ed0f35c..e39ada91fb8 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -8,9 +8,8 @@ use { self::{ committer::Committer, consumer::Consumer, - decision_maker::{BufferedPacketsDecision, DecisionMaker}, + decision_maker::DecisionMaker, latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource}, - leader_slot_metrics::LeaderSlotMetricsTracker, packet_receiver::PacketReceiver, qos_service::QosService, vote_storage::VoteStorage, @@ -28,11 +27,10 @@ use { }, agave_banking_stage_ingress_types::BankingPacketReceiver, conditional_mod::conditional_vis_mod, - crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, + crossbeam_channel::{unbounded, Receiver, Sender}, histogram::Histogram, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfoQuery}, solana_ledger::blockstore_processor::TransactionStatusSender, - solana_measure::measure_us, solana_perf::packet::PACKETS_PER_BATCH, solana_poh::{poh_recorder::PohRecorder, transaction_recorder::TransactionRecorder}, solana_runtime::{ @@ -48,7 +46,7 @@ use { Arc, RwLock, }, thread::{self, Builder, JoinHandle}, - time::{Duration, Instant}, + time::Duration, }, transaction_scheduler::{ greedy_scheduler::{GreedyScheduler, GreedySchedulerConfig}, @@ -58,6 +56,7 @@ use { }, transaction_state_container::TransactionStateContainer, }, + vote_worker::VoteWorker, }; // Below modules are pub to allow use by banking_stage bench @@ -69,6 +68,7 @@ pub mod unprocessed_packet_batches; pub mod vote_storage; mod consume_worker; +mod vote_worker; conditional_vis_mod!(decision_maker, feature = "dev-context-only-utils", pub); mod immutable_deserialized_packet; mod latest_unprocessed_votes; @@ -435,7 +435,7 @@ impl BankingStage { (0, gossip_vote_receiver, VoteSource::Gossip), (1, tpu_vote_receiver, VoteSource::Tpu), ] { - bank_thread_hdls.push(Self::spawn_thread_local_multi_iterator_thread( + bank_thread_hdls.push(Self::spawn_vote_worker( id, packet_receiver, decision_maker.clone(), @@ -584,7 +584,7 @@ impl BankingStage { } } - fn spawn_thread_local_multi_iterator_thread( + fn spawn_vote_worker( id: u32, packet_receiver: BankingPacketReceiver, mut decision_maker: DecisionMaker, @@ -605,7 +605,7 @@ impl BankingStage { Builder::new() .name(format!("solBanknStgTx{id:02}")) .spawn(move || { - Self::process_loop( + VoteWorker::run( &mut packet_receiver, &mut decision_maker, &bank_forks, @@ -617,99 +617,6 @@ impl BankingStage { .unwrap() } - #[allow(clippy::too_many_arguments)] - fn process_buffered_packets( - decision_maker: &mut DecisionMaker, - bank_forks: &RwLock, - consumer: &Consumer, - vote_storage: &mut VoteStorage, - banking_stage_stats: &BankingStageStats, - slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - ) { - if vote_storage.should_not_process() { - return; - } - let (decision, make_decision_us) = - measure_us!(decision_maker.make_consume_or_forward_decision()); - let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(decision.bank_start()); - slot_metrics_tracker.increment_make_decision_us(make_decision_us); - - match decision { - BufferedPacketsDecision::Consume(bank_start) => { - // Take metrics action before consume packets (potentially resetting the - // slot metrics tracker to the next slot) so that we don't count the - // packet processing metrics from the next slot towards the metrics - // of the previous slot - slot_metrics_tracker.apply_action(metrics_action); - let (_, consume_buffered_packets_us) = measure_us!(consumer - .consume_buffered_packets( - &bank_start, - vote_storage, - banking_stage_stats, - slot_metrics_tracker, - )); - slot_metrics_tracker - .increment_consume_buffered_packets_us(consume_buffered_packets_us); - } - BufferedPacketsDecision::Forward => { - // get current working bank from bank_forks, use it to sanitize transaction and - // load all accounts from address loader; - let current_bank = bank_forks.read().unwrap().working_bank(); - vote_storage.cache_epoch_boundary_info(¤t_bank); - vote_storage.clear(); - } - BufferedPacketsDecision::ForwardAndHold => { - // get current working bank from bank_forks, use it to sanitize transaction and - // load all accounts from address loader; - let current_bank = bank_forks.read().unwrap().working_bank(); - vote_storage.cache_epoch_boundary_info(¤t_bank); - } - BufferedPacketsDecision::Hold => {} - } - } - - fn process_loop( - packet_receiver: &mut PacketReceiver, - decision_maker: &mut DecisionMaker, - bank_forks: &RwLock, - consumer: &Consumer, - id: u32, - mut vote_storage: VoteStorage, - ) { - let mut banking_stage_stats = BankingStageStats::new(id); - - let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); - let mut last_metrics_update = Instant::now(); - - loop { - if !vote_storage.is_empty() - || last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD - { - let (_, process_buffered_packets_us) = measure_us!(Self::process_buffered_packets( - decision_maker, - bank_forks, - consumer, - &mut vote_storage, - &banking_stage_stats, - &mut slot_metrics_tracker, - )); - slot_metrics_tracker - .increment_process_buffered_packets_us(process_buffered_packets_us); - last_metrics_update = Instant::now(); - } - - match packet_receiver.receive_and_buffer_packets( - &mut vote_storage, - &mut banking_stage_stats, - &mut slot_metrics_tracker, - ) { - Ok(()) | Err(RecvTimeoutError::Timeout) => (), - Err(RecvTimeoutError::Disconnected) => break, - } - banking_stage_stats.report(1000); - } - } - pub fn num_threads() -> u32 { cmp::max( env::var("SOLANA_BANKING_THREADS") @@ -781,6 +688,7 @@ mod tests { std::{ sync::atomic::{AtomicBool, Ordering}, thread::sleep, + time::Instant, }, test_case::test_case, }; diff --git a/core/src/banking_stage/vote_worker.rs b/core/src/banking_stage/vote_worker.rs new file mode 100644 index 00000000000..bff57493dc3 --- /dev/null +++ b/core/src/banking_stage/vote_worker.rs @@ -0,0 +1,111 @@ +use { + super::{ + consumer::Consumer, + decision_maker::{BufferedPacketsDecision, DecisionMaker}, + leader_slot_metrics::LeaderSlotMetricsTracker, + packet_receiver::PacketReceiver, + vote_storage::VoteStorage, + BankingStageStats, SLOT_BOUNDARY_CHECK_PERIOD, + }, + crossbeam_channel::RecvTimeoutError, + solana_measure::measure_us, + solana_runtime::bank_forks::BankForks, + std::{sync::RwLock, time::Instant}, +}; + +pub struct VoteWorker; + +impl VoteWorker { + pub fn run( + packet_receiver: &mut PacketReceiver, + decision_maker: &mut DecisionMaker, + bank_forks: &RwLock, + consumer: &Consumer, + id: u32, + mut vote_storage: VoteStorage, + ) { + let mut banking_stage_stats = BankingStageStats::new(id); + + let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id); + let mut last_metrics_update = Instant::now(); + + loop { + if !vote_storage.is_empty() + || last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD + { + let (_, process_buffered_packets_us) = measure_us!(Self::process_buffered_packets( + decision_maker, + bank_forks, + consumer, + &mut vote_storage, + &banking_stage_stats, + &mut slot_metrics_tracker, + )); + slot_metrics_tracker + .increment_process_buffered_packets_us(process_buffered_packets_us); + last_metrics_update = Instant::now(); + } + + match packet_receiver.receive_and_buffer_packets( + &mut vote_storage, + &mut banking_stage_stats, + &mut slot_metrics_tracker, + ) { + Ok(()) | Err(RecvTimeoutError::Timeout) => (), + Err(RecvTimeoutError::Disconnected) => break, + } + banking_stage_stats.report(1000); + } + } + + #[allow(clippy::too_many_arguments)] + fn process_buffered_packets( + decision_maker: &mut DecisionMaker, + bank_forks: &RwLock, + consumer: &Consumer, + vote_storage: &mut VoteStorage, + banking_stage_stats: &BankingStageStats, + slot_metrics_tracker: &mut LeaderSlotMetricsTracker, + ) { + if vote_storage.should_not_process() { + return; + } + let (decision, make_decision_us) = + measure_us!(decision_maker.make_consume_or_forward_decision()); + let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(decision.bank_start()); + slot_metrics_tracker.increment_make_decision_us(make_decision_us); + + match decision { + BufferedPacketsDecision::Consume(bank_start) => { + // Take metrics action before consume packets (potentially resetting the + // slot metrics tracker to the next slot) so that we don't count the + // packet processing metrics from the next slot towards the metrics + // of the previous slot + slot_metrics_tracker.apply_action(metrics_action); + let (_, consume_buffered_packets_us) = measure_us!(consumer + .consume_buffered_packets( + &bank_start, + vote_storage, + banking_stage_stats, + slot_metrics_tracker, + )); + slot_metrics_tracker + .increment_consume_buffered_packets_us(consume_buffered_packets_us); + } + BufferedPacketsDecision::Forward => { + // get current working bank from bank_forks, use it to sanitize transaction and + // load all accounts from address loader; + let current_bank = bank_forks.read().unwrap().working_bank(); + vote_storage.cache_epoch_boundary_info(¤t_bank); + vote_storage.clear(); + } + BufferedPacketsDecision::ForwardAndHold => { + // get current working bank from bank_forks, use it to sanitize transaction and + // load all accounts from address loader; + let current_bank = bank_forks.read().unwrap().working_bank(); + vote_storage.cache_epoch_boundary_info(¤t_bank); + } + BufferedPacketsDecision::Hold => {} + } + } +}