diff --git a/core/benches/receive_and_buffer_utils.rs b/core/benches/receive_and_buffer_utils.rs index 58c9bc1634cbd8..003462b0901aab 100644 --- a/core/benches/receive_and_buffer_utils.rs +++ b/core/benches/receive_and_buffer_utils.rs @@ -23,16 +23,12 @@ use { solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, solana_message::{Message, VersionedMessage}, solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS}, - solana_poh::poh_recorder::BankStart, solana_pubkey::Pubkey, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk_ids::system_program, solana_signer::Signer, solana_transaction::versioned::VersionedTransaction, - std::{ - sync::{Arc, RwLock}, - time::Instant, - }, + std::sync::{Arc, RwLock}, }; // the max number of instructions of given type that we can put into packet. @@ -191,16 +187,12 @@ pub fn setup_receive_and_buffer( let (bank, bank_forks) = Bank::new_for_benches(&genesis_config).wrap_with_bank_forks_for_tests(); - let bank_start = BankStart { - working_bank: bank.clone(), - bank_creation_time: Arc::new(Instant::now()), - }; let (sender, receiver) = unbounded(); let receive_and_buffer = T::create(receiver, bank_forks); - let decision = BufferedPacketsDecision::Consume(bank_start); + let decision = BufferedPacketsDecision::Consume(bank.clone()); let txs = generate_transactions( num_txs, diff --git a/core/src/banking_stage/committer.rs b/core/src/banking_stage/committer.rs index 6783627e9f98e9..706d540fd0e29b 100644 --- a/core/src/banking_stage/committer.rs +++ b/core/src/banking_stage/committer.rs @@ -63,7 +63,7 @@ impl Committer { batch: &TransactionBatch, processing_results: Vec, starting_transaction_index: Option, - bank: &Arc, + bank: &Bank, balance_collector: Option, execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings, processed_counts: &ProcessedTransactionCounts, @@ -122,7 +122,7 @@ impl Committer { fn collect_balances_and_send_status_batch( &self, commit_results: Vec, - bank: &Arc, + bank: &Bank, batch: &TransactionBatch, balance_collector: Option, starting_transaction_index: Option, diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 4b477812466270..1f9959c0662790 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -28,7 +28,7 @@ use { transaction_processor::{ExecutionRecordingConfig, TransactionProcessingConfig}, }, solana_transaction_error::TransactionError, - std::{num::Saturating, sync::Arc}, + std::num::Saturating, }; /// Consumer will create chunks of transactions from buffer with up to this size. @@ -94,7 +94,7 @@ impl Consumer { pub fn process_and_record_transactions( &self, - bank: &Arc, + bank: &Bank, txs: &[impl TransactionWithMeta], ) -> ProcessTransactionBatchOutput { let mut error_counters = TransactionErrorMetrics::default(); @@ -124,7 +124,7 @@ impl Consumer { pub fn process_and_record_aged_transactions( &self, - bank: &Arc, + bank: &Bank, txs: &[impl TransactionWithMeta], max_ages: &[MaxAge], ) -> ProcessTransactionBatchOutput { @@ -159,7 +159,7 @@ impl Consumer { fn process_and_record_transactions_with_pre_results( &self, - bank: &Arc, + bank: &Bank, txs: &[impl TransactionWithMeta], pre_results: impl Iterator>, ) -> ProcessTransactionBatchOutput { @@ -227,7 +227,7 @@ impl Consumer { fn execute_and_commit_transactions_locked( &self, - bank: &Arc, + bank: &Bank, batch: &TransactionBatch, ) -> ExecuteAndCommitTransactionsOutput { let transaction_status_sender_enabled = self.committer.transaction_status_sender_enabled(); @@ -532,7 +532,7 @@ mod tests { borrow::Cow, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - RwLock, + Arc, RwLock, }, thread::{Builder, JoinHandle}, time::Duration, diff --git a/core/src/banking_stage/decision_maker.rs b/core/src/banking_stage/decision_maker.rs index 8dcf8b43016f9a..48faf9260818f6 100644 --- a/core/src/banking_stage/decision_maker.rs +++ b/core/src/banking_stage/decision_maker.rs @@ -3,27 +3,31 @@ use { DEFAULT_TICKS_PER_SLOT, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, HOLD_TRANSACTIONS_SLOT_OFFSET, }, - solana_poh::poh_recorder::{BankStart, PohRecorder}, + solana_poh::poh_recorder::PohRecorder, + solana_runtime::bank::Bank, solana_unified_scheduler_pool::{BankingStageMonitor, BankingStageStatus}, std::{ - sync::{atomic::{AtomicBool, Ordering::Relaxed}, Arc, RwLock}, + sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, RwLock, + }, time::{Duration, Instant}, }, }; #[derive(Debug, Clone)] pub enum BufferedPacketsDecision { - Consume(BankStart), + Consume(Arc), Forward, ForwardAndHold, Hold, } impl BufferedPacketsDecision { - /// Returns the `BankStart` if the decision is `Consume`. Otherwise, returns `None`. - pub fn bank_start(&self) -> Option<&BankStart> { + /// Returns the `Bank` if the decision is `Consume`. Otherwise, returns `None`. + pub fn bank(&self) -> Option<&Arc> { match self { - Self::Consume(bank_start) => Some(bank_start), + Self::Consume(bank) => Some(bank), _ => None, } } @@ -68,7 +72,7 @@ impl DecisionMaker { { let poh_recorder = self.poh_recorder.read().unwrap(); decision = Self::consume_or_forward_packets( - || Self::bank_start(&poh_recorder), + || Self::bank(&poh_recorder), || Self::would_be_leader_shortly(&poh_recorder), || Self::would_be_leader(&poh_recorder), ); @@ -78,15 +82,15 @@ impl DecisionMaker { } fn consume_or_forward_packets( - bank_start_fn: impl FnOnce() -> Option, + bank_fn: impl FnOnce() -> Option>, would_be_leader_shortly_fn: impl FnOnce() -> bool, would_be_leader_fn: impl FnOnce() -> bool, ) -> BufferedPacketsDecision { // If has active bank, then immediately process buffered packets // otherwise, based on leader schedule to either forward or hold packets - if let Some(bank_start) = bank_start_fn() { + if let Some(bank) = bank_fn() { // If the bank is available, this node is the leader - BufferedPacketsDecision::Consume(bank_start) + BufferedPacketsDecision::Consume(bank) } else if would_be_leader_shortly_fn() { // If the node will be the leader soon, hold the packets for now BufferedPacketsDecision::Hold @@ -100,10 +104,8 @@ impl DecisionMaker { } } - fn bank_start(poh_recorder: &PohRecorder) -> Option { - poh_recorder - .bank_start() - .filter(|bank_start| bank_start.should_working_bank_still_be_processing_txs()) + fn bank(poh_recorder: &PohRecorder) -> Option> { + poh_recorder.bank() } fn would_be_leader_shortly(poh_recorder: &PohRecorder) -> bool { @@ -126,7 +128,12 @@ pub(crate) struct DecisionMakerWrapper { impl DecisionMakerWrapper { pub(crate) fn new(decision_maker: DecisionMaker) -> Self { // Clone-off before hand to avoid lock contentions. - let is_exited = decision_maker.poh_recorder.read().unwrap().is_exited.clone(); + let is_exited = decision_maker + .poh_recorder + .read() + .unwrap() + .is_exited + .clone(); Self { is_exited, @@ -163,25 +170,16 @@ mod tests { std::{ env::temp_dir, sync::{atomic::Ordering, Arc}, - time::Instant, }, }; #[test] fn test_buffered_packet_decision_bank_start() { let bank = Arc::new(Bank::default_for_tests()); - let bank_start = BankStart { - working_bank: bank, - bank_creation_time: Arc::new(Instant::now()), - }; - assert!(BufferedPacketsDecision::Consume(bank_start) - .bank_start() - .is_some()); - assert!(BufferedPacketsDecision::Forward.bank_start().is_none()); - assert!(BufferedPacketsDecision::ForwardAndHold - .bank_start() - .is_none()); - assert!(BufferedPacketsDecision::Hold.bank_start().is_none()); + assert!(BufferedPacketsDecision::Consume(bank).bank().is_some()); + assert!(BufferedPacketsDecision::Forward.bank().is_none()); + assert!(BufferedPacketsDecision::ForwardAndHold.bank().is_none()); + assert!(BufferedPacketsDecision::Hold.bank().is_none()); } #[test] @@ -257,14 +255,10 @@ mod tests { #[test] fn test_should_process_or_forward_packets() { let bank = Arc::new(Bank::default_for_tests()); - let bank_start = Some(BankStart { - working_bank: bank, - bank_creation_time: Arc::new(Instant::now()), - }); // having active bank allows to consume immediately assert_matches!( DecisionMaker::consume_or_forward_packets( - || bank_start.clone(), + || Some(bank.clone()), || panic!("should not be called"), || panic!("should not be called"), ), @@ -272,11 +266,7 @@ mod tests { ); // Leader other than me, forward the packets assert_matches!( - DecisionMaker::consume_or_forward_packets( - || None, - || false, - || false, - ), + DecisionMaker::consume_or_forward_packets(|| None, || false, || false,), BufferedPacketsDecision::Forward ); // Will be leader shortly, hold the packets @@ -290,11 +280,7 @@ mod tests { ); // Will be leader (not shortly), forward and hold assert_matches!( - DecisionMaker::consume_or_forward_packets( - || None, - || false, - || true, - ), + DecisionMaker::consume_or_forward_packets(|| None, || false, || true,), BufferedPacketsDecision::ForwardAndHold ); } diff --git a/core/src/banking_stage/leader_slot_metrics.rs b/core/src/banking_stage/leader_slot_metrics.rs index c9640b03025dc2..32afe4ab77ddcb 100644 --- a/core/src/banking_stage/leader_slot_metrics.rs +++ b/core/src/banking_stage/leader_slot_metrics.rs @@ -6,9 +6,9 @@ use { vote_storage::VoteBatchInsertionMetrics, }, solana_clock::Slot, - solana_poh::poh_recorder::BankStart, + solana_runtime::bank::Bank, solana_svm::transaction_error_metrics::*, - std::{num::Saturating, time::Instant}, + std::{num::Saturating, sync::Arc}, }; /// A summary of what happened to transactions passed to the processing pipeline. @@ -397,13 +397,13 @@ pub(crate) struct LeaderSlotMetrics { } impl LeaderSlotMetrics { - pub(crate) fn new(slot: Slot, bank_creation_time: &Instant) -> Self { + pub(crate) fn new(slot: Slot) -> Self { Self { slot, packet_count_metrics: LeaderSlotPacketCountMetrics::new(), transaction_error_metrics: TransactionErrorMetrics::new(), vote_packet_count_metrics: VotePacketCountMetrics::new(), - timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time), + timing_metrics: LeaderSlotTimingMetrics::new(), is_reported: false, } } @@ -476,9 +476,9 @@ impl LeaderSlotMetricsTracker { // Check leader slot, return MetricsTrackerAction to be applied by apply_action() pub(crate) fn check_leader_slot_boundary( &mut self, - bank_start: Option<&BankStart>, + bank: Option<&Arc>, ) -> MetricsTrackerAction { - match (self.leader_slot_metrics.as_mut(), bank_start) { + match (self.leader_slot_metrics.as_mut(), bank) { (None, None) => MetricsTrackerAction::Noop, (Some(leader_slot_metrics), None) => { @@ -487,20 +487,16 @@ impl LeaderSlotMetricsTracker { } // Our leader slot has begain, time to create a new slot tracker - (None, Some(bank_start)) => { - MetricsTrackerAction::NewTracker(Some(LeaderSlotMetrics::new( - bank_start.working_bank.slot(), - &bank_start.bank_creation_time, - ))) + (None, Some(bank)) => { + MetricsTrackerAction::NewTracker(Some(LeaderSlotMetrics::new(bank.slot()))) } - (Some(leader_slot_metrics), Some(bank_start)) => { - if leader_slot_metrics.slot != bank_start.working_bank.slot() { + (Some(leader_slot_metrics), Some(bank)) => { + if leader_slot_metrics.slot != bank.slot() { // Last slot has ended, new slot has began leader_slot_metrics.mark_slot_end_detected(); MetricsTrackerAction::ReportAndNewTracker(Some(LeaderSlotMetrics::new( - bank_start.working_bank.slot(), - &bank_start.bank_creation_time, + bank.slot(), ))) } else { MetricsTrackerAction::Noop @@ -810,19 +806,13 @@ mod tests { struct TestSlotBoundaryComponents { first_bank: Arc, - first_poh_recorder_bank: BankStart, next_bank: Arc, - next_poh_recorder_bank: BankStart, leader_slot_metrics_tracker: LeaderSlotMetricsTracker, } fn setup_test_slot_boundary_banks() -> TestSlotBoundaryComponents { let genesis = create_genesis_config(10); let first_bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config)); - let first_poh_recorder_bank = BankStart { - working_bank: first_bank.clone(), - bank_creation_time: Arc::new(Instant::now()), - }; // Create a child descended from the first bank let next_bank = Arc::new(Bank::new_from_parent( @@ -830,18 +820,12 @@ mod tests { &Pubkey::new_unique(), first_bank.slot() + 1, )); - let next_poh_recorder_bank = BankStart { - working_bank: next_bank.clone(), - bank_creation_time: Arc::new(Instant::now()), - }; let leader_slot_metrics_tracker = LeaderSlotMetricsTracker::default(); TestSlotBoundaryComponents { first_bank, - first_poh_recorder_bank, next_bank, - next_poh_recorder_bank, leader_slot_metrics_tracker, } } @@ -865,7 +849,7 @@ mod tests { #[test] pub fn test_update_on_leader_slot_boundary_not_leader_to_leader() { let TestSlotBoundaryComponents { - first_poh_recorder_bank, + first_bank, mut leader_slot_metrics_tracker, .. } = setup_test_slot_boundary_banks(); @@ -873,8 +857,7 @@ mod tests { // Test case where the thread has not detected a leader bank, and now sees a leader bank. // Metrics should not be reported because leader slot has not ended assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none()); - let action = - leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_bank)); assert_eq!( mem::discriminant(&MetricsTrackerAction::NewTracker(None)), mem::discriminant(&action) @@ -887,7 +870,6 @@ mod tests { pub fn test_update_on_leader_slot_boundary_leader_to_not_leader() { let TestSlotBoundaryComponents { first_bank, - first_poh_recorder_bank, mut leader_slot_metrics_tracker, .. } = setup_test_slot_boundary_banks(); @@ -897,8 +879,7 @@ mod tests { // because that leader slot has just ended. { // Setup first_bank - let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_bank)); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { @@ -928,7 +909,6 @@ mod tests { pub fn test_update_on_leader_slot_boundary_leader_to_leader_same_slot() { let TestSlotBoundaryComponents { first_bank, - first_poh_recorder_bank, mut leader_slot_metrics_tracker, .. } = setup_test_slot_boundary_banks(); @@ -937,14 +917,12 @@ mod tests { // implying the slot is still running. Metrics should not be reported { // Setup with first_bank - let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_bank)); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert nop-op if same bank - let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_bank)); assert_eq!( mem::discriminant(&MetricsTrackerAction::Noop), mem::discriminant(&action) @@ -970,9 +948,7 @@ mod tests { pub fn test_update_on_leader_slot_boundary_leader_to_leader_bigger_slot() { let TestSlotBoundaryComponents { first_bank, - first_poh_recorder_bank, next_bank, - next_poh_recorder_bank, mut leader_slot_metrics_tracker, } = setup_test_slot_boundary_banks(); @@ -981,14 +957,12 @@ mod tests { // smaller slot { // Setup with first_bank - let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_bank)); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert reporting if new bank - let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&next_poh_recorder_bank)); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&next_bank)); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)), mem::discriminant(&action) @@ -1018,9 +992,7 @@ mod tests { pub fn test_update_on_leader_slot_boundary_leader_to_leader_smaller_slot() { let TestSlotBoundaryComponents { first_bank, - first_poh_recorder_bank, next_bank, - next_poh_recorder_bank, mut leader_slot_metrics_tracker, } = setup_test_slot_boundary_banks(); // Test case where the thread has a leader bank, and now detects there's a new leader bank @@ -1028,14 +1000,12 @@ mod tests { // bigger slot { // Setup with next_bank - let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&next_poh_recorder_bank)); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&next_bank)); assert!(leader_slot_metrics_tracker.apply_action(action).is_none()); } { // Assert reporting if new bank - let action = leader_slot_metrics_tracker - .check_leader_slot_boundary(Some(&first_poh_recorder_bank)); + let action = leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_bank)); assert_eq!( mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)), mem::discriminant(&action) diff --git a/core/src/banking_stage/leader_slot_timing_metrics.rs b/core/src/banking_stage/leader_slot_timing_metrics.rs index 3b03c82dbb77c9..7bbca2a4420592 100644 --- a/core/src/banking_stage/leader_slot_timing_metrics.rs +++ b/core/src/banking_stage/leader_slot_timing_metrics.rs @@ -77,9 +77,9 @@ pub(crate) struct LeaderSlotTimingMetrics { } impl LeaderSlotTimingMetrics { - pub(crate) fn new(bank_creation_time: &Instant) -> Self { + pub(crate) fn new() -> Self { Self { - outer_loop_timings: OuterLoopTimings::new(bank_creation_time), + outer_loop_timings: OuterLoopTimings::new(), process_buffered_packets_timings: ProcessBufferedPacketsTimings::default(), consume_buffered_packets_timings: ConsumeBufferedPacketsTimings::default(), process_packets_timings: ProcessPacketsTimings::default(), @@ -104,9 +104,6 @@ impl LeaderSlotTimingMetrics { pub(crate) struct OuterLoopTimings { pub bank_detected_time: Instant, - // Delay from when the bank was created to when this thread detected it - pub bank_detected_delay_us: u64, - // Time spent processing buffered packets pub process_buffered_packets_us: u64, @@ -122,10 +119,9 @@ pub(crate) struct OuterLoopTimings { } impl OuterLoopTimings { - fn new(bank_creation_time: &Instant) -> Self { + fn new() -> Self { Self { bank_detected_time: Instant::now(), - bank_detected_delay_us: bank_creation_time.elapsed().as_micros() as u64, process_buffered_packets_us: 0, receive_and_buffer_packets_us: 0, receive_and_buffer_packets_invoked_count: 0, @@ -148,12 +144,6 @@ impl OuterLoopTimings { self.bank_detected_to_slot_end_detected_us, i64 ), - ( - "bank_creation_to_slot_end_detected_us", - self.bank_detected_to_slot_end_detected_us + self.bank_detected_delay_us, - i64 - ), - ("bank_detected_delay_us", self.bank_detected_delay_us, i64), ( "process_buffered_packets_us", self.process_buffered_packets_us, diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index e32b1f5d690eca..dc971599349dca 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -6,10 +6,7 @@ use { receive_and_buffer::{DisconnectedError, ReceiveAndBuffer}, scheduler::{PreLockFilterAction, Scheduler}, scheduler_error::SchedulerError, - scheduler_metrics::{ - SchedulerCountMetrics, SchedulerLeaderDetectionMetrics, SchedulerTimingMetrics, - SchedulingDetails, - }, + scheduler_metrics::{SchedulerCountMetrics, SchedulerTimingMetrics, SchedulingDetails}, }, crate::banking_stage::{ consume_worker::ConsumeWorkerMetrics, @@ -43,8 +40,6 @@ where container: R::Container, /// State for scheduling and communicating with worker threads. scheduler: S, - /// Metrics tracking time for leader bank detection. - leader_detection_metrics: SchedulerLeaderDetectionMetrics, /// Metrics tracking counts on transactions in different states /// over an interval and during a leader slot. count_metrics: SchedulerCountMetrics, @@ -75,7 +70,6 @@ where bank_forks, container: R::Container::with_capacity(TOTAL_BUFFERED_PACKETS), scheduler, - leader_detection_metrics: SchedulerLeaderDetectionMetrics::default(), count_metrics: SchedulerCountMetrics::default(), timing_metrics: SchedulerTimingMetrics::default(), worker_metrics, @@ -100,9 +94,7 @@ where self.timing_metrics.update(|timing_metrics| { timing_metrics.decision_time_us += decision_time_us; }); - let new_leader_slot = decision.bank_start().map(|b| b.working_bank.slot()); - self.leader_detection_metrics - .update_and_maybe_report(decision.bank_start()); + let new_leader_slot = decision.bank().map(|b| b.slot()); self.count_metrics .maybe_report_and_reset_slot(new_leader_slot); self.timing_metrics @@ -139,16 +131,11 @@ where decision: &BufferedPacketsDecision, ) -> Result<(), SchedulerError> { match decision { - BufferedPacketsDecision::Consume(bank_start) => { + BufferedPacketsDecision::Consume(bank) => { let (scheduling_summary, schedule_time_us) = measure_us!(self.scheduler.schedule( &mut self.container, |txs, results| { - Self::pre_graph_filter( - txs, - results, - &bank_start.working_bank, - MAX_PROCESSING_AGE, - ) + Self::pre_graph_filter(txs, results, bank, MAX_PROCESSING_AGE) }, |_| PreLockFilterAction::AttemptToSchedule // no pre-lock filter for now )?); diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs b/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs index fd93cfed8d5ad0..5dd9254f5fab3f 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_metrics.rs @@ -2,7 +2,6 @@ use { super::scheduler::SchedulingSummary, itertools::MinMaxResult, solana_clock::Slot, - solana_poh::poh_recorder::BankStart, solana_time_utils::AtomicInterval, std::{ num::Saturating, @@ -374,69 +373,6 @@ impl SchedulerTimingMetricsInner { } } -#[derive(Default)] -pub struct SchedulerLeaderDetectionMetrics { - inner: Option, -} - -struct SchedulerLeaderDetectionMetricsInner { - slot: Slot, - bank_creation_time: Instant, - bank_detected_time: Instant, -} - -impl SchedulerLeaderDetectionMetrics { - pub fn update_and_maybe_report(&mut self, bank_start: Option<&BankStart>) { - match (&self.inner, bank_start) { - (None, Some(bank_start)) => self.initialize_inner(bank_start), - (Some(_inner), None) => self.report_and_reset(), - (Some(inner), Some(bank_start)) if inner.slot != bank_start.working_bank.slot() => { - self.report_and_reset(); - self.initialize_inner(bank_start); - } - _ => {} - } - } - - fn initialize_inner(&mut self, bank_start: &BankStart) { - let bank_detected_time = Instant::now(); - self.inner = Some(SchedulerLeaderDetectionMetricsInner { - slot: bank_start.working_bank.slot(), - bank_creation_time: *bank_start.bank_creation_time, - bank_detected_time, - }); - } - - fn report_and_reset(&mut self) { - let SchedulerLeaderDetectionMetricsInner { - slot, - bank_creation_time, - bank_detected_time, - } = self.inner.take().expect("inner must be present"); - - let bank_detected_delay_us = bank_detected_time - .duration_since(bank_creation_time) - .as_micros() - .try_into() - .unwrap_or(i64::MAX); - let bank_detected_to_slot_end_detected_us = bank_detected_time - .elapsed() - .as_micros() - .try_into() - .unwrap_or(i64::MAX); - datapoint_info!( - "banking_stage_scheduler_leader_detection", - ("slot", slot, i64), - ("bank_detected_delay_us", bank_detected_delay_us, i64), - ( - "bank_detected_to_slot_end_detected_us", - bank_detected_to_slot_end_detected_us, - i64 - ), - ); - } -} - pub struct SchedulingDetails { pub last_report: Instant, pub num_schedule_calls: usize, diff --git a/core/src/banking_stage/vote_worker.rs b/core/src/banking_stage/vote_worker.rs index 0d96ac37bae776..08beed80eaac81 100644 --- a/core/src/banking_stage/vote_worker.rs +++ b/core/src/banking_stage/vote_worker.rs @@ -19,7 +19,7 @@ use { solana_accounts_db::account_locks::validate_account_locks, solana_clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, solana_measure::{measure::Measure, measure_us}, - solana_poh::poh_recorder::{BankStart, PohRecorderError}, + solana_poh::poh_recorder::PohRecorderError, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_runtime_transaction::{ runtime_transaction::RuntimeTransaction, transaction_with_meta::TransactionWithMeta, @@ -121,7 +121,7 @@ impl VoteWorker { ) { let (decision, make_decision_us) = measure_us!(self.decision_maker.make_consume_or_forward_decision()); - let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(decision.bank_start()); + let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(decision.bank()); slot_metrics_tracker.increment_make_decision_us(make_decision_us); // Take metrics action before processing packets (potentially resetting the @@ -131,9 +131,9 @@ impl VoteWorker { slot_metrics_tracker.apply_action(metrics_action); match decision { - BufferedPacketsDecision::Consume(bank_start) => { + BufferedPacketsDecision::Consume(bank) => { let (_, consume_buffered_packets_us) = measure_us!(self.consume_buffered_packets( - &bank_start, + &bank, banking_stage_stats, slot_metrics_tracker, )); @@ -159,7 +159,7 @@ impl VoteWorker { fn consume_buffered_packets( &mut self, - bank_start: &BankStart, + bank: &Bank, banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { @@ -173,7 +173,7 @@ impl VoteWorker { let num_packets_to_process = self.storage.len(); let reached_end_of_slot = self.process_packets( - bank_start, + bank, &mut consumed_buffered_packets_count, &mut rebuffered_packet_count, banking_stage_stats, @@ -208,7 +208,7 @@ impl VoteWorker { // returns `true` if the end of slot is reached fn process_packets( &mut self, - bank_start: &BankStart, + bank: &Bank, consumed_buffered_packets_count: &mut usize, rebuffered_packet_count: &mut usize, banking_stage_stats: &BankingStageStats, @@ -217,7 +217,7 @@ impl VoteWorker { // Based on the stake distribution present in the supplied bank, drain the unprocessed votes // from each validator using a weighted random ordering. Votes from validators with // 0 stake are ignored. - let all_vote_packets = self.storage.drain_unprocessed(&bank_start.working_bank); + let all_vote_packets = self.storage.drain_unprocessed(bank); let mut reached_end_of_slot = false; let mut sanitized_transactions = Vec::with_capacity(UNPROCESSED_BUFFER_STEP_SIZE); @@ -228,7 +228,7 @@ impl VoteWorker { vote_packets.clear(); chunk.iter().for_each(|packet| { if consume_scan_should_process_packet( - &bank_start.working_bank, + bank, banking_stage_stats, packet, reached_end_of_slot, @@ -241,7 +241,7 @@ impl VoteWorker { }); if let Some(retryable_vote_indices) = self.do_process_packets( - bank_start, + bank, &mut reached_end_of_slot, &mut sanitized_transactions, banking_stage_stats, @@ -265,7 +265,7 @@ impl VoteWorker { fn do_process_packets( &self, - bank_start: &BankStart, + bank: &Bank, reached_end_of_slot: &mut bool, sanitized_transactions: &mut Vec>, banking_stage_stats: &BankingStageStats, @@ -280,8 +280,7 @@ impl VoteWorker { let (process_transactions_summary, process_packets_transactions_us) = measure_us!(self .process_packets_transactions( - &bank_start.working_bank, - &bank_start.bank_creation_time, + bank, sanitized_transactions, banking_stage_stats, slot_metrics_tracker, @@ -299,7 +298,7 @@ impl VoteWorker { .. } = process_transactions_summary; - if reached_max_poh_height || !bank_start.should_working_bank_still_be_processing_txs() { + if reached_max_poh_height || !bank.is_complete() { *reached_end_of_slot = true; } @@ -325,15 +324,13 @@ impl VoteWorker { fn process_packets_transactions( &self, - bank: &Arc, - bank_creation_time: &Instant, + bank: &Bank, sanitized_transactions: &[impl TransactionWithMeta], banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) -> ProcessTransactionsSummary { - let (mut process_transactions_summary, process_transactions_us) = measure_us!( - self.process_transactions(bank, bank_creation_time, sanitized_transactions) - ); + let (mut process_transactions_summary, process_transactions_us) = + measure_us!(self.process_transactions(bank, sanitized_transactions)); slot_metrics_tracker.increment_process_transactions_us(process_transactions_us); banking_stage_stats .transaction_processing_elapsed @@ -381,8 +378,7 @@ impl VoteWorker { /// than the total number if max PoH height was reached and the bank halted fn process_transactions( &self, - bank: &Arc, - bank_creation_time: &Instant, + bank: &Bank, transactions: &[impl TransactionWithMeta], ) -> ProcessTransactionsSummary { let process_transaction_batch_output = self @@ -408,8 +404,7 @@ impl VoteWorker { total_transaction_counts .accumulate(&transaction_counts, commit_transactions_result.is_ok()); - let should_bank_still_be_processing_txs = - Bank::should_bank_still_be_processing_txs(bank_creation_time, bank.ns_per_slot); + let should_bank_still_be_processing_txs = bank.is_complete(); let reached_max_poh_height = match ( commit_transactions_result, should_bank_still_be_processing_txs, diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index 092a23c72bded5..f95d4fd00f0f3a 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -56,21 +56,6 @@ pub(crate) type Result = std::result::Result; pub type WorkingBankEntry = (Arc, (Entry, u64)); -#[derive(Debug, Clone)] -pub struct BankStart { - pub working_bank: Arc, - pub bank_creation_time: Arc, -} - -impl BankStart { - pub fn should_working_bank_still_be_processing_txs(&self) -> bool { - Bank::should_bank_still_be_processing_txs( - &self.bank_creation_time, - self.working_bank.ns_per_slot, - ) - } -} - // Sends the Result of the record operation, including the index in the slot of the first // transaction, if being tracked by WorkingBank type RecordResultSender = Sender>>; @@ -649,13 +634,6 @@ impl PohRecorder { self.working_bank.as_ref().map(|w| w.bank.clone()) } - pub fn bank_start(&self) -> Option { - self.working_bank.as_ref().map(|w| BankStart { - working_bank: w.bank.clone(), - bank_creation_time: w.start.clone(), - }) - } - pub fn has_bank(&self) -> bool { self.working_bank.is_some() }