Merged
core/src/replay_stage.rs: 136 changes (73 additions, 63 deletions)
@@ -33,7 +33,6 @@ use {
        window_service::DuplicateSlotReceiver,
    },
    crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
-   lazy_static::lazy_static,
    rayon::{prelude::*, ThreadPool},
    solana_entry::entry::VerifyRecyclers,
    solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierArc,
@@ -102,14 +101,6 @@ const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
 const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
 const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10;
 
-lazy_static! {
-    static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
-        .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
-        .thread_name(|i| format!("solReplay{i:02}"))
-        .build()
-        .unwrap();
-}
-
 #[derive(PartialEq, Eq, Debug)]
 pub enum HeaviestForkFailures {
     LockedOut(u64),
@@ -131,6 +122,11 @@ pub enum HeaviestForkFailures {
     ),
 }
 
+enum ForkReplayMode {
+    Serial,
+    Parallel(ThreadPool),
+}
+
 #[derive(PartialEq, Eq, Debug)]
 enum ConfirmationType {
     SupermajorityVoted,
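For orientation, the enum added above is the heart of the change: the rayon pool moves out of a process-wide `lazy_static` and into a value that owns it, so "no pool" is representable without an `Option`. A minimal sketch of the pattern; the `worker_count` helper is illustrative, not part of the PR:

```rust
use rayon::ThreadPool;

// The pool lives inside the variant, so constructing Serial never spawns
// threads, and matching on the mode recovers the pool by reference.
enum ForkReplayMode {
    Serial,
    Parallel(ThreadPool),
}

impl ForkReplayMode {
    // Hypothetical helper: how many replay workers this mode implies.
    fn worker_count(&self) -> usize {
        match self {
            ForkReplayMode::Serial => 1,
            ForkReplayMode::Parallel(pool) => pool.current_num_threads(),
        }
    }
}
```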
@@ -656,6 +652,16 @@ impl ReplayStage {
                 r_bank_forks.get_vote_only_mode_signal(),
             )
         };
+        let replay_mode = if replay_slots_concurrently {
+            let pool = rayon::ThreadPoolBuilder::new()
+                .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
+                .thread_name(|i| format!("solReplay{i:02}"))
+                .build()
+                .expect("new rayon threadpool");
+            ForkReplayMode::Parallel(pool)
+        } else {
+            ForkReplayMode::Serial
+        };
 
         Self::reset_poh_recorder(
             &my_pubkey,
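The builder chain in this hunk is the stock rayon API. Below is a self-contained sketch of the same construction; the `main` harness and the print are illustrative, and only rayon is assumed as a dependency:

```rust
use rayon::ThreadPoolBuilder;

const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;

fn main() {
    // Same builder calls as the hunk above: a fixed thread count plus a
    // naming closure, so workers show up as solReplay00..solReplay03.
    let pool = ThreadPoolBuilder::new()
        .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
        .thread_name(|i| format!("solReplay{i:02}"))
        .build()
        .expect("new rayon threadpool");

    pool.install(|| {
        println!("on pool thread {:?}", rayon::current_thread_index());
    });
}
```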
@@ -717,7 +723,7 @@ impl ReplayStage {
                     block_metadata_notifier.clone(),
                     &mut replay_timing,
                     log_messages_bytes_limit,
-                    replay_slots_concurrently,
+                    &replay_mode,
                     &prioritization_fee_cache,
                     &mut purge_repair_slot_counter,
                 );
@@ -2706,6 +2712,7 @@ impl ReplayStage {
     fn replay_active_banks_concurrently(
         blockstore: &Blockstore,
         bank_forks: &RwLock<BankForks>,
+        thread_pool: &ThreadPool,
         my_pubkey: &Pubkey,
         vote_account: &Pubkey,
         progress: &mut ProgressMap,
@@ -2723,7 +2730,7 @@ impl ReplayStage {
         let longest_replay_time_us = AtomicU64::new(0);
 
         // Allow for concurrent replaying of slots from different forks.
-        let replay_result_vec: Vec<ReplaySlotFromBlockstore> = PAR_THREAD_POOL.install(|| {
+        let replay_result_vec: Vec<ReplaySlotFromBlockstore> = thread_pool.install(|| {
             active_bank_slots
                 .into_par_iter()
                 .map(|bank_slot| {
@@ -2737,7 +2744,7 @@ impl ReplayStage {
                     trace!(
                         "Replay active bank: slot {}, thread_idx {}",
                         bank_slot,
-                        PAR_THREAD_POOL.current_thread_index().unwrap_or_default()
+                        thread_pool.current_thread_index().unwrap_or_default()
                     );
                     let mut progress_lock = progress.write().unwrap();
                     if progress_lock
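For readers unfamiliar with the API: `ThreadPool::install` runs a closure on the pool so that any rayon parallel iterator reached from inside it uses that pool's workers instead of the global pool, which is what lets the `into_par_iter` above fan the active slots out across the `solReplay*` threads. A runnable sketch of the shape; the slot values and the doubling stand in for real replay work:

```rust
use rayon::prelude::*;

fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .expect("new rayon threadpool");

    let active_bank_slots = vec![10u64, 11, 12, 13];

    // install() scopes the parallel iterator to this pool, mirroring how
    // replay_active_banks_concurrently replays forks side by side.
    let results: Vec<u64> = pool.install(|| {
        active_bank_slots
            .into_par_iter()
            .map(|slot| {
                // current_thread_index() is Some(_) on a pool worker thread.
                let idx = rayon::current_thread_index().unwrap_or_default();
                println!("replaying slot {slot} on thread {idx}");
                slot * 2 // placeholder for the actual replay result
            })
            .collect()
    });
    assert_eq!(results, vec![20, 22, 24, 26]);
}
```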
@@ -3175,7 +3182,7 @@ impl ReplayStage {
         block_metadata_notifier: Option<BlockMetadataNotifierArc>,
         replay_timing: &mut ReplayLoopTiming,
         log_messages_bytes_limit: Option<usize>,
-        replay_slots_concurrently: bool,
+        replay_mode: &ForkReplayMode,
         prioritization_fee_cache: &PrioritizationFeeCache,
         purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
     ) -> bool /* completed a bank */ {
@@ -3186,11 +3193,17 @@ impl ReplayStage {
             num_active_banks,
             active_bank_slots
         );
-        if num_active_banks > 0 {
-            let replay_result_vec = if num_active_banks > 1 && replay_slots_concurrently {
+        if active_bank_slots.is_empty() {
+            return false;
+        }
Comment on lines +3196 to +3198

Author: I thought that this was a simple enough sanity check to pull out and explicitly return early for - this enabled us to remove a level of indentation for everything else

Reviewer: Never nesters unite

Reviewer: early returns make code more complex and easier to introduce bugs in the future. CHANGE MY MIND

Reviewer: Number of bugs in a function is proportional to the maximum indent. CHANGE MY MIND

Reviewer: 🍿

Author: > early returns make code more complex and easier to introduce bugs in the future

steviez@4bcbbbe?diff=unified&w=0

Reviewer: > Number of bugs in a function is proportional to the maximum indent. CHANGE MY MIND

you're stuck in the present old man. join us in the future!
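For readers keeping score, the two shapes under debate, as a minimal sketch rather than validator code:

```rust
// Nested style: the happy path carries an extra level of indentation.
fn replay_nested(active_bank_slots: &[u64]) -> bool {
    if !active_bank_slots.is_empty() {
        // ... replay every active bank ...
        true
    } else {
        false
    }
}

// Guard-clause style, as adopted in this PR: bail out early, stay flat.
fn replay_guarded(active_bank_slots: &[u64]) -> bool {
    if active_bank_slots.is_empty() {
        return false;
    }
    // ... replay every active bank ...
    true
}
```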


+        let replay_result_vec = match replay_mode {
+            // Skip the overhead of the threadpool if there is only one bank to play
+            ForkReplayMode::Parallel(thread_pool) if num_active_banks > 1 => {
+                Self::replay_active_banks_concurrently(
                     blockstore,
                     bank_forks,
+                    thread_pool,
                     my_pubkey,
                     vote_account,
                     progress,
@@ -3203,55 +3216,52 @@ impl ReplayStage {
                     &active_bank_slots,
                     prioritization_fee_cache,
                 )
-        } else {
-            active_bank_slots
-                .iter()
-                .map(|bank_slot| {
-                    Self::replay_active_bank(
-                        blockstore,
-                        bank_forks,
-                        my_pubkey,
-                        vote_account,
-                        progress,
-                        transaction_status_sender,
-                        entry_notification_sender,
-                        verify_recyclers,
-                        replay_vote_sender,
-                        replay_timing,
-                        log_messages_bytes_limit,
-                        *bank_slot,
-                        prioritization_fee_cache,
-                    )
-                })
-                .collect()
-        };
+            }
+            ForkReplayMode::Serial | ForkReplayMode::Parallel(_) => active_bank_slots
+                .iter()
+                .map(|bank_slot| {
+                    Self::replay_active_bank(
+                        blockstore,
+                        bank_forks,
+                        my_pubkey,
+                        vote_account,
+                        progress,
+                        transaction_status_sender,
+                        entry_notification_sender,
+                        verify_recyclers,
+                        replay_vote_sender,
+                        replay_timing,
+                        log_messages_bytes_limit,
+                        *bank_slot,
+                        prioritization_fee_cache,
+                    )
+                })
+                .collect(),
+        };
 
-            Self::process_replay_results(
-                blockstore,
-                bank_forks,
-                progress,
-                transaction_status_sender,
-                cache_block_meta_sender,
-                heaviest_subtree_fork_choice,
-                bank_notification_sender,
-                rewards_recorder_sender,
-                rpc_subscriptions,
-                duplicate_slots_tracker,
-                duplicate_confirmed_slots,
-                epoch_slots_frozen_slots,
-                unfrozen_gossip_verified_vote_hashes,
-                latest_validator_votes_for_frozen_banks,
-                cluster_slots_update_sender,
-                cost_update_sender,
-                duplicate_slots_to_repair,
-                ancestor_hashes_replay_update_sender,
-                block_metadata_notifier,
-                &replay_result_vec,
-                purge_repair_slot_counter,
-            )
-        } else {
-            false
-        }
+        Self::process_replay_results(
+            blockstore,
+            bank_forks,
+            progress,
+            transaction_status_sender,
+            cache_block_meta_sender,
+            heaviest_subtree_fork_choice,
+            bank_notification_sender,
+            rewards_recorder_sender,
+            rpc_subscriptions,
+            duplicate_slots_tracker,
+            duplicate_confirmed_slots,
+            epoch_slots_frozen_slots,
+            unfrozen_gossip_verified_vote_hashes,
+            latest_validator_votes_for_frozen_banks,
+            cluster_slots_update_sender,
+            cost_update_sender,
+            duplicate_slots_to_repair,
+            ancestor_hashes_replay_update_sender,
+            block_metadata_notifier,
+            &replay_result_vec,
+            purge_repair_slot_counter,
+        )
     }
 
     #[allow(clippy::too_many_arguments)]
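Taken together, the new dispatch is a match arm with a guard: only a `Parallel` mode holding a pool, and only when more than one bank is active, pays the threadpool overhead. A condensed, self-contained sketch of that selection logic; `String` results stand in for `ReplaySlotFromBlockstore` and `replay` stands in for `Self::replay_active_bank`:

```rust
use rayon::{prelude::*, ThreadPool};

enum ForkReplayMode {
    Serial,
    Parallel(ThreadPool),
}

fn replay(slot: u64) -> String {
    format!("replayed {slot}")
}

// Mirrors the match in replay_active_banks: the Serial arm and the
// single-bank Parallel case share the plain iterator path.
fn replay_active_banks(mode: &ForkReplayMode, slots: &[u64]) -> Vec<String> {
    match mode {
        ForkReplayMode::Parallel(pool) if slots.len() > 1 => {
            pool.install(|| slots.par_iter().map(|s| replay(*s)).collect())
        }
        ForkReplayMode::Serial | ForkReplayMode::Parallel(_) => {
            slots.iter().map(|s| replay(*s)).collect()
        }
    }
}
```

Calling this with `ForkReplayMode::Serial` and four slots returns the four results in order without touching a pool, which is the behavior the guard preserves for the single-bank case as well.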