Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions core/benches/receive_and_buffer_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@ use {
solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
solana_message::{Message, VersionedMessage},
solana_perf::packet::{to_packet_batches, PacketBatch, NUM_PACKETS},
solana_poh::poh_recorder::BankStart,
solana_pubkey::Pubkey,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk_ids::system_program,
solana_signer::Signer,
solana_transaction::versioned::VersionedTransaction,
std::{
sync::{Arc, RwLock},
time::Instant,
},
std::sync::{Arc, RwLock},
};

// the max number of instructions of given type that we can put into packet.
Expand Down Expand Up @@ -191,16 +187,12 @@ pub fn setup_receive_and_buffer<T: ReceiveAndBuffer + ReceiveAndBufferCreator>(

let (bank, bank_forks) =
Bank::new_for_benches(&genesis_config).wrap_with_bank_forks_for_tests();
let bank_start = BankStart {
working_bank: bank.clone(),
bank_creation_time: Arc::new(Instant::now()),
};

let (sender, receiver) = unbounded();

let receive_and_buffer = T::create(receiver, bank_forks);

let decision = BufferedPacketsDecision::Consume(bank_start);
let decision = BufferedPacketsDecision::Consume(bank.clone());

let txs = generate_transactions(
num_txs,
Expand Down
4 changes: 2 additions & 2 deletions core/src/banking_stage/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Committer {
batch: &TransactionBatch<impl TransactionWithMeta>,
processing_results: Vec<TransactionProcessingResult>,
starting_transaction_index: Option<usize>,
bank: &Arc<Bank>,
bank: &Bank,
balance_collector: Option<BalanceCollector>,
execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
processed_counts: &ProcessedTransactionCounts,
Expand Down Expand Up @@ -122,7 +122,7 @@ impl Committer {
fn collect_balances_and_send_status_batch(
&self,
commit_results: Vec<TransactionCommitResult>,
bank: &Arc<Bank>,
bank: &Bank,
batch: &TransactionBatch<impl TransactionWithMeta>,
balance_collector: Option<BalanceCollector>,
starting_transaction_index: Option<usize>,
Expand Down
12 changes: 6 additions & 6 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use {
transaction_processor::{ExecutionRecordingConfig, TransactionProcessingConfig},
},
solana_transaction_error::TransactionError,
std::{num::Saturating, sync::Arc},
std::num::Saturating,
};

/// Consumer will create chunks of transactions from buffer with up to this size.
Expand Down Expand Up @@ -94,7 +94,7 @@ impl Consumer {

pub fn process_and_record_transactions(
&self,
bank: &Arc<Bank>,
bank: &Bank,
txs: &[impl TransactionWithMeta],
) -> ProcessTransactionBatchOutput {
let mut error_counters = TransactionErrorMetrics::default();
Expand Down Expand Up @@ -124,7 +124,7 @@ impl Consumer {

pub fn process_and_record_aged_transactions(
&self,
bank: &Arc<Bank>,
bank: &Bank,
txs: &[impl TransactionWithMeta],
max_ages: &[MaxAge],
) -> ProcessTransactionBatchOutput {
Expand Down Expand Up @@ -159,7 +159,7 @@ impl Consumer {

fn process_and_record_transactions_with_pre_results(
&self,
bank: &Arc<Bank>,
bank: &Bank,
txs: &[impl TransactionWithMeta],
pre_results: impl Iterator<Item = Result<(), TransactionError>>,
) -> ProcessTransactionBatchOutput {
Expand Down Expand Up @@ -227,7 +227,7 @@ impl Consumer {

fn execute_and_commit_transactions_locked(
&self,
bank: &Arc<Bank>,
bank: &Bank,
batch: &TransactionBatch<impl TransactionWithMeta>,
) -> ExecuteAndCommitTransactionsOutput {
let transaction_status_sender_enabled = self.committer.transaction_status_sender_enabled();
Expand Down Expand Up @@ -532,7 +532,7 @@ mod tests {
borrow::Cow,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
RwLock,
Arc, RwLock,
},
thread::{Builder, JoinHandle},
time::Duration,
Expand Down
72 changes: 29 additions & 43 deletions core/src/banking_stage/decision_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,31 @@ use {
DEFAULT_TICKS_PER_SLOT, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET,
HOLD_TRANSACTIONS_SLOT_OFFSET,
},
solana_poh::poh_recorder::{BankStart, PohRecorder},
solana_poh::poh_recorder::PohRecorder,
solana_runtime::bank::Bank,
solana_unified_scheduler_pool::{BankingStageMonitor, BankingStageStatus},
std::{
sync::{atomic::{AtomicBool, Ordering::Relaxed}, Arc, RwLock},
sync::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, RwLock,
},
time::{Duration, Instant},
},
};

#[derive(Debug, Clone)]
pub enum BufferedPacketsDecision {
Consume(BankStart),
Consume(Arc<Bank>),
Forward,
ForwardAndHold,
Hold,
}

impl BufferedPacketsDecision {
/// Returns the `BankStart` if the decision is `Consume`. Otherwise, returns `None`.
pub fn bank_start(&self) -> Option<&BankStart> {
/// Returns the `Bank` if the decision is `Consume`. Otherwise, returns `None`.
pub fn bank(&self) -> Option<&Arc<Bank>> {
match self {
Self::Consume(bank_start) => Some(bank_start),
Self::Consume(bank) => Some(bank),
_ => None,
}
}
Expand Down Expand Up @@ -68,7 +72,7 @@ impl DecisionMaker {
{
let poh_recorder = self.poh_recorder.read().unwrap();
decision = Self::consume_or_forward_packets(
|| Self::bank_start(&poh_recorder),
|| Self::bank(&poh_recorder),
|| Self::would_be_leader_shortly(&poh_recorder),
|| Self::would_be_leader(&poh_recorder),
);
Expand All @@ -78,15 +82,15 @@ impl DecisionMaker {
}

fn consume_or_forward_packets(
bank_start_fn: impl FnOnce() -> Option<BankStart>,
bank_fn: impl FnOnce() -> Option<Arc<Bank>>,
would_be_leader_shortly_fn: impl FnOnce() -> bool,
would_be_leader_fn: impl FnOnce() -> bool,
) -> BufferedPacketsDecision {
// If has active bank, then immediately process buffered packets
// otherwise, based on leader schedule to either forward or hold packets
if let Some(bank_start) = bank_start_fn() {
if let Some(bank) = bank_fn() {
// If the bank is available, this node is the leader
BufferedPacketsDecision::Consume(bank_start)
BufferedPacketsDecision::Consume(bank)
} else if would_be_leader_shortly_fn() {
// If the node will be the leader soon, hold the packets for now
BufferedPacketsDecision::Hold
Expand All @@ -100,10 +104,8 @@ impl DecisionMaker {
}
}

fn bank_start(poh_recorder: &PohRecorder) -> Option<BankStart> {
poh_recorder
.bank_start()
.filter(|bank_start| bank_start.should_working_bank_still_be_processing_txs())
fn bank(poh_recorder: &PohRecorder) -> Option<Arc<Bank>> {
poh_recorder.bank()
}

fn would_be_leader_shortly(poh_recorder: &PohRecorder) -> bool {
Expand All @@ -126,7 +128,12 @@ pub(crate) struct DecisionMakerWrapper {
impl DecisionMakerWrapper {
pub(crate) fn new(decision_maker: DecisionMaker) -> Self {
// Clone-off before hand to avoid lock contentions.
let is_exited = decision_maker.poh_recorder.read().unwrap().is_exited.clone();
let is_exited = decision_maker
.poh_recorder
.read()
.unwrap()
.is_exited
.clone();

Self {
is_exited,
Expand Down Expand Up @@ -163,25 +170,16 @@ mod tests {
std::{
env::temp_dir,
sync::{atomic::Ordering, Arc},
time::Instant,
},
};

#[test]
fn test_buffered_packet_decision_bank_start() {
let bank = Arc::new(Bank::default_for_tests());
let bank_start = BankStart {
working_bank: bank,
bank_creation_time: Arc::new(Instant::now()),
};
assert!(BufferedPacketsDecision::Consume(bank_start)
.bank_start()
.is_some());
assert!(BufferedPacketsDecision::Forward.bank_start().is_none());
assert!(BufferedPacketsDecision::ForwardAndHold
.bank_start()
.is_none());
assert!(BufferedPacketsDecision::Hold.bank_start().is_none());
assert!(BufferedPacketsDecision::Consume(bank).bank().is_some());
assert!(BufferedPacketsDecision::Forward.bank().is_none());
assert!(BufferedPacketsDecision::ForwardAndHold.bank().is_none());
assert!(BufferedPacketsDecision::Hold.bank().is_none());
}

#[test]
Expand Down Expand Up @@ -257,26 +255,18 @@ mod tests {
#[test]
fn test_should_process_or_forward_packets() {
let bank = Arc::new(Bank::default_for_tests());
let bank_start = Some(BankStart {
working_bank: bank,
bank_creation_time: Arc::new(Instant::now()),
});
// having active bank allows to consume immediately
assert_matches!(
DecisionMaker::consume_or_forward_packets(
|| bank_start.clone(),
|| Some(bank.clone()),
|| panic!("should not be called"),
|| panic!("should not be called"),
),
BufferedPacketsDecision::Consume(_)
);
// Leader other than me, forward the packets
assert_matches!(
DecisionMaker::consume_or_forward_packets(
|| None,
|| false,
|| false,
),
DecisionMaker::consume_or_forward_packets(|| None, || false, || false,),
BufferedPacketsDecision::Forward
);
// Will be leader shortly, hold the packets
Expand All @@ -290,11 +280,7 @@ mod tests {
);
// Will be leader (not shortly), forward and hold
assert_matches!(
DecisionMaker::consume_or_forward_packets(
|| None,
|| false,
|| true,
),
DecisionMaker::consume_or_forward_packets(|| None, || false, || true,),
BufferedPacketsDecision::ForwardAndHold
);
}
Expand Down
Loading
Loading