Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use {
solana_measure::measure::Measure,
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
solana_program_runtime::timings::ExecuteTimings,
solana_rayon_threadlimit::get_max_thread_count,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
rpc_subscriptions::RpcSubscriptions,
Expand Down Expand Up @@ -652,16 +653,23 @@ impl ReplayStage {
r_bank_forks.get_vote_only_mode_signal(),
)
};
// Thread pool to (maybe) replay multiple threads in parallel
let replay_mode = if replay_slots_concurrently {
ForkReplayMode::Serial
} else {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
.thread_name(|i| format!("solReplay{i:02}"))
.thread_name(|i| format!("solReplayFork{i:02}"))
.build()
.expect("new rayon threadpool");
Comment thread
steviez marked this conversation as resolved.
ForkReplayMode::Parallel(pool)
};
// Thread pool to replay multiple transactions within one block in parallel
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_max_thread_count())
.thread_name(|i| format!("solReplayTx{i:02}"))
.build()
.expect("new rayon threadpool");

Self::reset_poh_recorder(
&my_pubkey,
Expand Down Expand Up @@ -724,6 +732,7 @@ impl ReplayStage {
&mut replay_timing,
log_messages_bytes_limit,
&replay_mode,
&replay_tx_thread_pool,
&prioritization_fee_cache,
&mut purge_repair_slot_counter,
);
Expand Down Expand Up @@ -2136,6 +2145,7 @@ impl ReplayStage {
fn replay_blockstore_into_bank(
bank: &BankWithScheduler,
blockstore: &Blockstore,
replay_tx_thread_pool: &ThreadPool,
replay_stats: &RwLock<ReplaySlotStats>,
replay_progress: &RwLock<ConfirmationProgress>,
transaction_status_sender: Option<&TransactionStatusSender>,
Expand All @@ -2154,6 +2164,7 @@ impl ReplayStage {
blockstore_processor::confirm_slot(
blockstore,
bank,
replay_tx_thread_pool,
&mut w_replay_stats,
&mut w_replay_progress,
false,
Expand Down Expand Up @@ -2712,7 +2723,8 @@ impl ReplayStage {
fn replay_active_banks_concurrently(
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
thread_pool: &ThreadPool,
fork_thread_pool: &ThreadPool,
replay_tx_thread_pool: &ThreadPool,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
progress: &mut ProgressMap,
Expand All @@ -2730,7 +2742,7 @@ impl ReplayStage {
let longest_replay_time_us = AtomicU64::new(0);

// Allow for concurrent replaying of slots from different forks.
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = thread_pool.install(|| {
let replay_result_vec: Vec<ReplaySlotFromBlockstore> = fork_thread_pool.install(|| {
active_bank_slots
.into_par_iter()
.map(|bank_slot| {
Expand All @@ -2744,7 +2756,7 @@ impl ReplayStage {
trace!(
"Replay active bank: slot {}, thread_idx {}",
bank_slot,
thread_pool.current_thread_index().unwrap_or_default()
fork_thread_pool.current_thread_index().unwrap_or_default()
);
let mut progress_lock = progress.write().unwrap();
if progress_lock
Expand Down Expand Up @@ -2797,6 +2809,7 @@ impl ReplayStage {
let blockstore_result = Self::replay_blockstore_into_bank(
&bank,
blockstore,
replay_tx_thread_pool,
&replay_stats,
&replay_progress,
transaction_status_sender,
Expand Down Expand Up @@ -2826,6 +2839,7 @@ impl ReplayStage {
fn replay_active_bank(
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
replay_tx_thread_pool: &ThreadPool,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
progress: &mut ProgressMap,
Expand Down Expand Up @@ -2884,6 +2898,7 @@ impl ReplayStage {
let blockstore_result = Self::replay_blockstore_into_bank(
&bank,
blockstore,
replay_tx_thread_pool,
&bank_progress.replay_stats,
&bank_progress.replay_progress,
transaction_status_sender,
Expand Down Expand Up @@ -3183,6 +3198,7 @@ impl ReplayStage {
replay_timing: &mut ReplayLoopTiming,
log_messages_bytes_limit: Option<usize>,
replay_mode: &ForkReplayMode,
replay_tx_thread_pool: &ThreadPool,
prioritization_fee_cache: &PrioritizationFeeCache,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
) -> bool /* completed a bank */ {
Expand All @@ -3199,11 +3215,12 @@ impl ReplayStage {

let replay_result_vec = match replay_mode {
// Skip the overhead of the threadpool if there is only one bank to play
ForkReplayMode::Parallel(thread_pool) if num_active_banks > 1 => {
ForkReplayMode::Parallel(fork_thread_pool) if num_active_banks > 1 => {
Self::replay_active_banks_concurrently(
blockstore,
bank_forks,
thread_pool,
fork_thread_pool,
replay_tx_thread_pool,
my_pubkey,
vote_account,
progress,
Expand All @@ -3223,6 +3240,7 @@ impl ReplayStage {
Self::replay_active_bank(
blockstore,
bank_forks,
replay_tx_thread_pool,
my_pubkey,
vote_account,
progress,
Expand Down Expand Up @@ -5034,9 +5052,15 @@ pub(crate) mod tests {
blockstore.insert_shreds(shreds, None, false).unwrap();
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let exit = Arc::new(AtomicBool::new(false));
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.thread_name(|i| format!("solReplayTest{i:02}"))
.build()
.expect("new rayon threadpool");
let res = ReplayStage::replay_blockstore_into_bank(
&bank1,
&blockstore,
&replay_tx_thread_pool,
&bank1_progress.replay_stats,
&bank1_progress.replay_progress,
None,
Expand Down
Loading