Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 41 additions & 9 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to contruct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use crate::{cost_tracker::CostTracker, packet_hasher::PacketHasher};
use crate::{cost_tracker::CostTracker, packet_hasher::PacketHasher, vote_redundancy_checker};
use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError};
use itertools::Itertools;
use lru::LruCache;
Expand Down Expand Up @@ -102,6 +102,7 @@ pub struct BankingStageStats {
cost_tracker_update_elapsed: AtomicU64,
cost_tracker_clone_elapsed: AtomicU64,
cost_tracker_check_elapsed: AtomicU64,
vote_check_elapsed: AtomicU64,
}

impl BankingStageStats {
Expand Down Expand Up @@ -225,6 +226,11 @@ impl BankingStageStats {
self.cost_tracker_check_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"vote_check_elapsed",
self.vote_check_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
Expand Down Expand Up @@ -790,6 +796,7 @@ impl BankingStage {
batch: &TransactionBatch,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut load_execute_time = Measure::start("load_execute_time");
// Use a shorter maximum age when adding transactions into the pipeline. This will reduce
Expand All @@ -810,8 +817,14 @@ impl BankingStage {
vec![]
};

let mut execute_timings = ExecuteTimings::default();
let mut vote_check_time = Measure::start("vote_check_time");
let filtered_batch = vote_redundancy_checker::check_redundant_votes(bank, batch);
vote_check_time.stop();
banking_stage_stats
.vote_check_elapsed
.fetch_add(vote_check_time.as_us(), Ordering::Relaxed);

let mut execute_timings = ExecuteTimings::default();
let (
mut loaded_accounts,
results,
Expand All @@ -821,7 +834,7 @@ impl BankingStage {
tx_count,
signature_count,
) = bank.load_and_execute_transactions(
batch,
&filtered_batch,
MAX_PROCESSING_AGE,
transaction_status_sender.is_some(),
transaction_status_sender.is_some(),
Expand All @@ -832,8 +845,12 @@ impl BankingStage {
let freeze_lock = bank.freeze_lock();

let mut record_time = Measure::start("record_time");
let (num_to_commit, retryable_record_txs) =
Self::record_transactions(bank.slot(), batch.sanitized_transactions(), &results, poh);
let (num_to_commit, retryable_record_txs) = Self::record_transactions(
bank.slot(),
filtered_batch.sanitized_transactions(),
&results,
poh,
);
inc_new_counter_info!(
"banking_stage-record_transactions_num_to_commit",
*num_to_commit.as_ref().unwrap_or(&0)
Expand All @@ -849,7 +866,7 @@ impl BankingStage {
record_time.stop();

let mut commit_time = Measure::start("commit_time");
let sanitized_txs = batch.sanitized_transactions();
let sanitized_txs = filtered_batch.sanitized_transactions();
let num_to_commit = num_to_commit.unwrap();
if num_to_commit != 0 {
let tx_results = bank.commit_transactions(
Expand All @@ -863,9 +880,10 @@ impl BankingStage {

bank_utils::find_and_send_votes(sanitized_txs, &tx_results, Some(gossip_vote_sender));
if let Some(transaction_status_sender) = transaction_status_sender {
let txs = batch.sanitized_transactions().to_vec();
let post_balances = bank.collect_balances(batch);
let post_token_balances = collect_token_balances(bank, batch, &mut mint_decimals);
let txs = filtered_batch.sanitized_transactions().to_vec();
let post_balances = bank.collect_balances(&filtered_batch);
let post_token_balances =
collect_token_balances(bank, &filtered_batch, &mut mint_decimals);
transaction_status_sender.send_transaction_status_batch(
bank.clone(),
txs,
Expand Down Expand Up @@ -906,6 +924,7 @@ impl BankingStage {
chunk_offset: usize,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
) -> (Result<usize, PohRecorderError>, Vec<usize>) {
let mut lock_time = Measure::start("lock_time");
// Once accounts are locked, other threads cannot encode transactions that will modify the
Expand All @@ -919,6 +938,7 @@ impl BankingStage {
&batch,
transaction_status_sender,
gossip_vote_sender,
banking_stage_stats,
);
retryable_txs.iter_mut().for_each(|x| *x += chunk_offset);

Expand Down Expand Up @@ -949,6 +969,7 @@ impl BankingStage {
poh: &TransactionRecorder,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats,
) -> (usize, Vec<usize>) {
let mut chunk_start = 0;
let mut unprocessed_txs = vec![];
Expand All @@ -965,6 +986,7 @@ impl BankingStage {
chunk_start,
transaction_status_sender.clone(),
gossip_vote_sender,
banking_stage_stats,
);
trace!("process_transactions result: {:?}", result);

Expand Down Expand Up @@ -1189,6 +1211,7 @@ impl BankingStage {
poh,
transaction_status_sender,
gossip_vote_sender,
banking_stage_stats,
);
process_tx_time.stop();
let unprocessed_tx_count = unprocessed_tx_indexes.len();
Expand Down Expand Up @@ -2230,13 +2253,15 @@ mod tests {
poh_recorder.lock().unwrap().set_working_bank(working_bank);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();

let banking_stage_stats = BankingStageStats::default();
BankingStage::process_and_record_transactions(
&bank,
&transactions,
&recorder,
0,
None,
&gossip_vote_sender,
&banking_stage_stats,
)
.0
.unwrap();
Expand Down Expand Up @@ -2276,6 +2301,7 @@ mod tests {
0,
None,
&gossip_vote_sender,
&banking_stage_stats,
)
.0,
Err(PohRecorderError::MaxHeightReached)
Expand Down Expand Up @@ -2367,13 +2393,15 @@ mod tests {

let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();

let banking_stage_stats = BankingStageStats::default();
let (result, unprocessed) = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&recorder,
0,
None,
&gossip_vote_sender,
&banking_stage_stats,
);

poh_recorder
Expand Down Expand Up @@ -2474,6 +2502,7 @@ mod tests {

let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();

let banking_stage_stats = BankingStageStats::default();
let (processed_transactions_count, mut retryable_txs) =
BankingStage::process_transactions(
&bank,
Expand All @@ -2482,6 +2511,7 @@ mod tests {
&recorder,
None,
&gossip_vote_sender,
&banking_stage_stats,
);

assert_eq!(processed_transactions_count, 0,);
Expand Down Expand Up @@ -2575,6 +2605,7 @@ mod tests {

let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();

let banking_stage_stats = BankingStageStats::default();
let _ = BankingStage::process_and_record_transactions(
&bank,
&transactions,
Expand All @@ -2585,6 +2616,7 @@ mod tests {
enable_cpi_and_log_storage: false,
}),
&gossip_vote_sender,
&banking_stage_stats,
);

transaction_status_service.join().unwrap();
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub mod tvu;
pub mod unfrozen_gossip_verified_vote_hashes;
pub mod validator;
pub mod verified_vote_packets;
pub mod vote_redundancy_checker;
pub mod vote_simulator;
pub mod vote_stake_tracker;
pub mod voting_service;
Expand Down
Loading