From e84b6ad4804ab03581e25e4a6fbce7ad522ea13c Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 14 Mar 2024 00:21:37 -0500 Subject: [PATCH 01/14] Allow configuration of replay threadpools from CLI --- core/src/replay_stage.rs | 17 ++++++----- core/src/tvu.rs | 22 ++++++++++++-- core/src/validator.rs | 12 ++++++-- local-cluster/src/validator_configs.rs | 3 +- validator/src/cli.rs | 7 +++++ validator/src/cli/thread_args.rs | 42 ++++++++++++++++++++++++++ validator/src/main.rs | 11 ++++++- 7 files changed, 99 insertions(+), 15 deletions(-) create mode 100644 validator/src/cli/thread_args.rs diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 8a29d037dedf3c..85d4841d12c256 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -51,7 +51,6 @@ use { solana_measure::measure::Measure, solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, solana_program_runtime::timings::ExecuteTimings, - solana_rayon_threadlimit::get_max_thread_count, solana_rpc::{ optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig}, rpc_subscriptions::RpcSubscriptions, @@ -80,6 +79,7 @@ use { solana_vote_program::vote_state::VoteTransaction, std::{ collections::{HashMap, HashSet}, + num::NonZeroUsize, result, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -99,7 +99,6 @@ const MAX_VOTE_SIGNATURES: usize = 200; const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000; // Expect this number to be small enough to minimize thread pool overhead while large enough // to be able to replay all active forks at the same time in most cases. -const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4; const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10; #[derive(PartialEq, Eq, Debug)] @@ -291,7 +290,8 @@ pub struct ReplayStageConfig { // Stops voting until this slot has been reached. Should be used to avoid // duplicate voting which can lead to slashing. pub wait_to_vote_slot: Option, - pub replay_slots_concurrently: bool, + pub replay_forks_threads: NonZeroUsize, + pub replay_transactions_threads: NonZeroUsize, } /// Timing information for the ReplayStage main processing loop @@ -574,7 +574,8 @@ impl ReplayStage { ancestor_hashes_replay_update_sender, tower_storage, wait_to_vote_slot, - replay_slots_concurrently, + replay_forks_threads, + replay_transactions_threads, } = config; trace!("replay stage"); @@ -654,9 +655,11 @@ impl ReplayStage { ) }; // Thread pool to (maybe) replay multiple threads in parallel - let replay_mode = if replay_slots_concurrently { + let replay_mode = if replay_forks_threads.get() == 1 { + ForkReplayMode::Serial + } else { let pool = rayon::ThreadPoolBuilder::new() - .num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY) + .num_threads(replay_forks_threads.get()) .thread_name(|i| format!("solReplayFork{i:02}")) .build() .expect("new rayon threadpool"); @@ -666,7 +669,7 @@ impl ReplayStage { }; // Thread pool to replay multiple transactions within one block in parallel let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(get_max_thread_count()) + .num_threads(replay_transactions_threads.get()) .thread_name(|i| format!("solReplayTx{i:02}")) .build() .expect("new rayon threadpool"); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 47bc9a7905da5f..2e64fe0675891b 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -53,6 +53,7 @@ use { std::{ collections::HashSet, net::{SocketAddr, UdpSocket}, + num::NonZeroUsize, sync::{atomic::AtomicBool, Arc, RwLock}, thread::{self, JoinHandle}, }, @@ -81,7 +82,6 @@ pub struct TvuSockets { pub ancestor_hashes_requests: UdpSocket, } -#[derive(Default)] pub struct TvuConfig { pub max_ledger_shreds: Option, pub shred_version: u16, @@ -90,7 +90,22 @@ pub struct TvuConfig { // Validators which should be given priority when serving repairs pub repair_whitelist: Arc>>, pub wait_for_vote_to_start_leader: bool, - pub replay_slots_concurrently: bool, + pub replay_forks_threads: NonZeroUsize, + pub replay_transactions_threads: NonZeroUsize, +} + +impl Default for TvuConfig { + fn default() -> Self { + Self { + max_ledger_shreds: None, + shred_version: 0, + repair_validators: None, + repair_whitelist: Arc::new(RwLock::new(HashSet::default())), + wait_for_vote_to_start_leader: false, + replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + } + } } impl Tvu { @@ -265,7 +280,8 @@ impl Tvu { ancestor_hashes_replay_update_sender, tower_storage: tower_storage.clone(), wait_to_vote_slot, - replay_slots_concurrently: tvu_config.replay_slots_concurrently, + replay_forks_threads: tvu_config.replay_forks_threads, + replay_transactions_threads: tvu_config.replay_transactions_threads, }; let (voting_sender, voting_receiver) = unbounded(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 3d2a93daecba2f..9616a6675e2563 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -123,6 +123,7 @@ use { std::{ collections::{HashMap, HashSet}, net::SocketAddr, + num::NonZeroUsize, path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -260,7 +261,6 @@ pub struct ValidatorConfig { pub wait_to_vote_slot: Option, pub ledger_column_options: LedgerColumnOptions, pub runtime_config: RuntimeConfig, - pub replay_slots_concurrently: bool, pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit, pub block_verification_method: BlockVerificationMethod, pub block_production_method: BlockProductionMethod, @@ -268,6 +268,8 @@ pub struct ValidatorConfig { pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup, pub wen_restart_proto_path: Option, pub unified_scheduler_handler_threads: Option, + pub replay_forks_threads: NonZeroUsize, + pub replay_transactions_threads: NonZeroUsize, } impl Default for ValidatorConfig { @@ -328,7 +330,6 @@ impl Default for ValidatorConfig { wait_to_vote_slot: None, ledger_column_options: LedgerColumnOptions::default(), runtime_config: RuntimeConfig::default(), - replay_slots_concurrently: false, banking_trace_dir_byte_limit: 0, block_verification_method: BlockVerificationMethod::default(), block_production_method: BlockProductionMethod::default(), @@ -336,6 +337,8 @@ impl Default for ValidatorConfig { use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(), wen_restart_proto_path: None, unified_scheduler_handler_threads: None, + replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), + replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), } } } @@ -346,6 +349,8 @@ impl ValidatorConfig { enforce_ulimit_nofile: false, rpc_config: JsonRpcConfig::default_for_test(), block_production_method: BlockProductionMethod::ThreadLocalMultiIterator, + replay_forks_threads: NonZeroUsize::new(8).expect("8 is non-zero"), + replay_transactions_threads: NonZeroUsize::new(8).expect("8 is non-zero"), ..Self::default() } } @@ -1305,7 +1310,8 @@ impl Validator { repair_validators: config.repair_validators.clone(), repair_whitelist: config.repair_whitelist.clone(), wait_for_vote_to_start_leader, - replay_slots_concurrently: config.replay_slots_concurrently, + replay_forks_threads: config.replay_forks_threads, + replay_transactions_threads: config.replay_transactions_threads, }, &max_slots, block_metadata_notifier, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 33883bb02c1d77..45045203412a73 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -61,7 +61,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { wait_to_vote_slot: config.wait_to_vote_slot, ledger_column_options: config.ledger_column_options.clone(), runtime_config: config.runtime_config.clone(), - replay_slots_concurrently: config.replay_slots_concurrently, banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit, block_verification_method: config.block_verification_method.clone(), block_production_method: config.block_production_method.clone(), @@ -69,6 +68,8 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup, wen_restart_proto_path: config.wen_restart_proto_path.clone(), unified_scheduler_handler_threads: config.unified_scheduler_handler_threads, + replay_forks_threads: config.replay_forks_threads, + replay_transactions_threads: config.replay_transactions_threads, } } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index e9298d9c02928e..34da579dc1caac 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -52,6 +52,9 @@ use { std::{path::PathBuf, str::FromStr}, }; +mod thread_args; +use thread_args::{thread_args, DefaultThreadArgs}; + const EXCLUDE_KEY: &str = "account-index-exclude-key"; const INCLUDE_KEY: &str = "account-index-include-key"; // The default minimal snapshot download speed (bytes/second) @@ -1539,6 +1542,7 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { ", ), ) + .args(&thread_args(&default_args.thread_args)) .args(&get_deprecated_arguments()) .after_help("The default subcommand is run") .subcommand( @@ -2179,6 +2183,8 @@ pub struct DefaultArgs { pub banking_trace_dir_byte_limit: String, pub wen_restart_path: String, + + pub thread_args: DefaultThreadArgs, } impl DefaultArgs { @@ -2261,6 +2267,7 @@ impl DefaultArgs { wait_for_restart_window_max_delinquent_stake: "5".to_string(), banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(), wen_restart_path: "wen_restart_progress.proto".to_string(), + thread_args: DefaultThreadArgs::default(), } } } diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs new file mode 100644 index 00000000000000..61fca0d966f304 --- /dev/null +++ b/validator/src/cli/thread_args.rs @@ -0,0 +1,42 @@ +//! Arguments for controlling the number of threads allocated for various tasks + +use { + clap::Arg, solana_clap_utils::input_validators::is_within_range, + solana_rayon_threadlimit::get_max_thread_count, +}; + +pub struct DefaultThreadArgs { + pub replay_forks_threads: String, + pub replay_transactions_threads: String, +} + +impl Default for DefaultThreadArgs { + fn default() -> Self { + let num_total_threads = get_max_thread_count(); + + Self { + replay_forks_threads: 1.to_string(), + replay_transactions_threads: num_total_threads.to_string(), + } + } +} + +pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { + // Do not let any threadpool size scale over the number of threads + vec![ + Arg::with_name("replay_forks_threads") + .long("replay-forks-threads") + .takes_value(true) + .value_name("NUMBER") + .default_value(&defaults.replay_forks_threads) + .validator(|num| is_within_range(num, 1..=get_max_thread_count())) + .help("Number of threads to use for replay of blocks on different forks"), + Arg::with_name("replay_transactions_threads") + .long("replay-transactions-threads") + .takes_value(true) + .value_name("NUMBER") + .default_value(&defaults.replay_transactions_threads) + .validator(|num| is_within_range(num, 1..=get_max_thread_count())) + .help("Number of threads to use for transaction replay"), + ] +} diff --git a/validator/src/main.rs b/validator/src/main.rs index 7f3de66b457c74..052c2584ebd1cc 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1446,12 +1446,21 @@ pub fn main() { ..RuntimeConfig::default() }, staked_nodes_overrides: staked_nodes_overrides.clone(), - replay_slots_concurrently: matches.is_present("replay_slots_concurrently"), use_snapshot_archives_at_startup: value_t_or_exit!( matches, use_snapshot_archives_at_startup::cli::NAME, UseSnapshotArchivesAtStartup ), + replay_forks_threads: if matches.is_present("replay_slots_concurrently") { + NonZeroUsize::new(4).expect("4 is non-zero") + } else { + value_t_or_exit!(matches, "replay_forks_threads", NonZeroUsize) + }, + replay_transactions_threads: value_t_or_exit!( + matches, + "replay_transactions_threads", + NonZeroUsize + ), ..ValidatorConfig::default() }; From 104f0e1969aef4ce5c38142869202a709886e08d Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 14 Mar 2024 00:41:04 -0500 Subject: [PATCH 02/14] Deprecate old arg --- validator/src/cli.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 34da579dc1caac..cd491a63a32ccb 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1453,11 +1453,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .value_name("BYTES") .help("Maximum number of bytes written to the program log before truncation"), ) - .arg( - Arg::with_name("replay_slots_concurrently") - .long("replay-slots-concurrently") - .help("Allow concurrent replay of slots on different forks"), - ) .arg( Arg::with_name("banking_trace_dir_byte_limit") // expose friendly alternative name to cli than internal @@ -2061,6 +2056,13 @@ fn deprecated_arguments() -> Vec { .long("no-rocksdb-compaction") .takes_value(false) .help("Disable manual compaction of the ledger database")); + add_arg!( + Arg::with_name("replay_slots_concurrently") + .long("replay-slots-concurrently") + .help("Allow concurrent replay of slots on different forks") + .conflicts_with("replay_forks_threads"), + replaced_by: "replay_forks_threads", + usage_warning: "Equivalent behavior to this flag would be --replay-forks-threads 4"); add_arg!(Arg::with_name("rocksdb_compaction_interval") .long("rocksdb-compaction-interval-slots") .value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS") From f2cf251454a2ca72fc3e9b64c999a2ee7900b335 Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 14 Mar 2024 01:38:11 -0500 Subject: [PATCH 03/14] Small things I missed after self-reviewing the PR --- core/src/replay_stage.rs | 2 -- core/src/validator.rs | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 85d4841d12c256..d4fc1fea44b602 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -97,8 +97,6 @@ pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1; pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD; const MAX_VOTE_SIGNATURES: usize = 200; const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000; -// Expect this number to be small enough to minimize thread pool overhead while large enough -// to be able to replay all active forks at the same time in most cases. const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10; #[derive(PartialEq, Eq, Debug)] diff --git a/core/src/validator.rs b/core/src/validator.rs index 9616a6675e2563..752cffd7b030d9 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -349,7 +349,7 @@ impl ValidatorConfig { enforce_ulimit_nofile: false, rpc_config: JsonRpcConfig::default_for_test(), block_production_method: BlockProductionMethod::ThreadLocalMultiIterator, - replay_forks_threads: NonZeroUsize::new(8).expect("8 is non-zero"), + replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), replay_transactions_threads: NonZeroUsize::new(8).expect("8 is non-zero"), ..Self::default() } From 058b92e145b79e3e6b9810f42c3e451d088a28bf Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 14 Mar 2024 14:40:17 -0500 Subject: [PATCH 04/14] Bring back max_forks constant and use as upper bound --- core/src/replay_stage.rs | 5 +++++ validator/src/cli/thread_args.rs | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index d4fc1fea44b602..c7c7e7fc1d397a 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -95,6 +95,11 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64; pub const MAX_UNCONFIRMED_SLOTS: usize = 5; pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1; pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD; +// The maximum number of threads to allocate for replaying forks concurrently +// This value was chosen to be small enough to limit the overhead of having a large thread pool +// while also being large enough to allow replay of all active forks in most scenarios. +pub const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4; + const MAX_VOTE_SIGNATURES: usize = 200; const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000; const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10; diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 61fca0d966f304..f26fde4ee5b8dd 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -2,6 +2,7 @@ use { clap::Arg, solana_clap_utils::input_validators::is_within_range, + solana_core::replay_stage::MAX_CONCURRENT_FORKS_TO_REPLAY, solana_rayon_threadlimit::get_max_thread_count, }; @@ -29,7 +30,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { .takes_value(true) .value_name("NUMBER") .default_value(&defaults.replay_forks_threads) - .validator(|num| is_within_range(num, 1..=get_max_thread_count())) + .validator(|num| is_within_range(num, 1..=MAX_CONCURRENT_FORKS_TO_REPLAY)) .help("Number of threads to use for replay of blocks on different forks"), Arg::with_name("replay_transactions_threads") .long("replay-transactions-threads") From fe9da3c23a5adb6ec8c21f2f8f2b4ff07b638d5b Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 14 Mar 2024 14:58:07 -0500 Subject: [PATCH 05/14] Keep replay thread pool size for test ValidatorConfig too --- core/src/validator.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 752cffd7b030d9..98a267aeafc71a 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -74,6 +74,7 @@ use { poh_service::{self, PohService}, }, solana_program_runtime::runtime_config::RuntimeConfig, + solana_rayon_threadlimit::get_max_thread_count, solana_rpc::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ @@ -350,7 +351,8 @@ impl ValidatorConfig { rpc_config: JsonRpcConfig::default_for_test(), block_production_method: BlockProductionMethod::ThreadLocalMultiIterator, replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"), - replay_transactions_threads: NonZeroUsize::new(8).expect("8 is non-zero"), + replay_transactions_threads: NonZeroUsize::new(get_max_thread_count()) + .expect("thread count is non-zero"), ..Self::default() } } From 059899f019f04904955766349e9671293a99273f Mon Sep 17 00:00:00 2001 From: steviez Date: Mon, 18 Mar 2024 11:28:21 -0500 Subject: [PATCH 06/14] Hide the new args by default --- validator/src/cli/thread_args.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index f26fde4ee5b8dd..2a7d9487267cbb 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -1,7 +1,8 @@ //! Arguments for controlling the number of threads allocated for various tasks use { - clap::Arg, solana_clap_utils::input_validators::is_within_range, + clap::Arg, + solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range}, solana_core::replay_stage::MAX_CONCURRENT_FORKS_TO_REPLAY, solana_rayon_threadlimit::get_max_thread_count, }; @@ -31,6 +32,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { .value_name("NUMBER") .default_value(&defaults.replay_forks_threads) .validator(|num| is_within_range(num, 1..=MAX_CONCURRENT_FORKS_TO_REPLAY)) + .hidden(hidden_unless_forced()) .help("Number of threads to use for replay of blocks on different forks"), Arg::with_name("replay_transactions_threads") .long("replay-transactions-threads") @@ -38,6 +40,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { .value_name("NUMBER") .default_value(&defaults.replay_transactions_threads) .validator(|num| is_within_range(num, 1..=get_max_thread_count())) + .hidden(hidden_unless_forced()) .help("Number of threads to use for transaction replay"), ] } From 4386dbb869b84a62f77641abba704c65437b473e Mon Sep 17 00:00:00 2001 From: steviez Date: Mon, 18 Mar 2024 12:20:08 -0500 Subject: [PATCH 07/14] Move arg parsing over to separate module --- validator/src/cli.rs | 2 +- validator/src/cli/thread_args.rs | 23 ++++++++++++++++++++++- validator/src/main.rs | 19 ++++++++----------- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/validator/src/cli.rs b/validator/src/cli.rs index cd491a63a32ccb..b306713d4ed14c 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -52,7 +52,7 @@ use { std::{path::PathBuf, str::FromStr}, }; -mod thread_args; +pub mod thread_args; use thread_args::{thread_args, DefaultThreadArgs}; const EXCLUDE_KEY: &str = "account-index-exclude-key"; diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 2a7d9487267cbb..bccaed3aabe0d0 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -1,10 +1,11 @@ //! Arguments for controlling the number of threads allocated for various tasks use { - clap::Arg, + clap::{value_t_or_exit, Arg, ArgMatches}, solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range}, solana_core::replay_stage::MAX_CONCURRENT_FORKS_TO_REPLAY, solana_rayon_threadlimit::get_max_thread_count, + std::num::NonZeroUsize, }; pub struct DefaultThreadArgs { @@ -23,6 +24,11 @@ impl Default for DefaultThreadArgs { } } +pub struct NumThreadConfig { + pub replay_forks_threads: NonZeroUsize, + pub replay_transactions_threads: NonZeroUsize, +} + pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { // Do not let any threadpool size scale over the number of threads vec![ @@ -44,3 +50,18 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { .help("Number of threads to use for transaction replay"), ] } + +pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { + NumThreadConfig { + replay_forks_threads: if matches.is_present("replay_slots_concurrently") { + NonZeroUsize::new(4).expect("4 is non-zero") + } else { + value_t_or_exit!(matches, "replay_forks_threads", NonZeroUsize) + }, + replay_transactions_threads: value_t_or_exit!( + matches, + "replay_transactions_threads", + NonZeroUsize + ), + } +} diff --git a/validator/src/main.rs b/validator/src/main.rs index 052c2584ebd1cc..1cbbc090413b80 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -6,7 +6,7 @@ use { admin_rpc_service, admin_rpc_service::{load_staked_nodes_overrides, StakedNodesOverrides}, bootstrap, - cli::{app, warn_for_deprecated_arguments, DefaultArgs}, + cli::{self, app, warn_for_deprecated_arguments, DefaultArgs}, dashboard::Dashboard, ledger_lockfile, lock_ledger, new_spinner_progress_bar, println_name_value, redirect_stderr_to_file, @@ -1310,6 +1310,11 @@ pub fn main() { } let full_api = matches.is_present("full_rpc_api"); + let cli::thread_args::NumThreadConfig { + replay_forks_threads, + replay_transactions_threads, + } = cli::thread_args::parse_num_threads_args(&matches); + let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), tower_storage, @@ -1451,16 +1456,8 @@ pub fn main() { use_snapshot_archives_at_startup::cli::NAME, UseSnapshotArchivesAtStartup ), - replay_forks_threads: if matches.is_present("replay_slots_concurrently") { - NonZeroUsize::new(4).expect("4 is non-zero") - } else { - value_t_or_exit!(matches, "replay_forks_threads", NonZeroUsize) - }, - replay_transactions_threads: value_t_or_exit!( - matches, - "replay_transactions_threads", - NonZeroUsize - ), + replay_forks_threads, + replay_transactions_threads, ..ValidatorConfig::default() }; From b6bdbac5229d6ecb094fd8ee05720425e10f216a Mon Sep 17 00:00:00 2001 From: steviez Date: Mon, 18 Mar 2024 12:40:52 -0500 Subject: [PATCH 08/14] Move argument strings to modules --- validator/src/cli/thread_args.rs | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index bccaed3aabe0d0..0d857306c68330 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -32,22 +32,22 @@ pub struct NumThreadConfig { pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { // Do not let any threadpool size scale over the number of threads vec![ - Arg::with_name("replay_forks_threads") - .long("replay-forks-threads") + Arg::with_name(replay_forks_threads::NAME) + .long(replay_forks_threads::LONG_ARG) .takes_value(true) .value_name("NUMBER") .default_value(&defaults.replay_forks_threads) .validator(|num| is_within_range(num, 1..=MAX_CONCURRENT_FORKS_TO_REPLAY)) .hidden(hidden_unless_forced()) - .help("Number of threads to use for replay of blocks on different forks"), - Arg::with_name("replay_transactions_threads") - .long("replay-transactions-threads") + .help(replay_forks_threads::HELP), + Arg::with_name(replay_transactions_threads::NAME) + .long(replay_transactions_threads::LONG_ARG) .takes_value(true) .value_name("NUMBER") .default_value(&defaults.replay_transactions_threads) .validator(|num| is_within_range(num, 1..=get_max_thread_count())) .hidden(hidden_unless_forced()) - .help("Number of threads to use for transaction replay"), + .help(replay_transactions_threads::HELP), ] } @@ -56,12 +56,24 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { replay_forks_threads: if matches.is_present("replay_slots_concurrently") { NonZeroUsize::new(4).expect("4 is non-zero") } else { - value_t_or_exit!(matches, "replay_forks_threads", NonZeroUsize) + value_t_or_exit!(matches, replay_forks_threads::NAME, NonZeroUsize) }, replay_transactions_threads: value_t_or_exit!( matches, - "replay_transactions_threads", + replay_transactions_threads::NAME, NonZeroUsize ), } } + +mod replay_forks_threads { + pub const NAME: &str = "replay_forks_threads"; + pub const LONG_ARG: &str = "replay-forks-threads"; + pub const HELP: &str = "Number of threads to use for replay of blocks on different forks"; +} + +mod replay_transactions_threads { + pub const NAME: &str = "replay_transactions_threads"; + pub const LONG_ARG: &str = "replay-transactions-threads"; + pub const HELP: &str = "Number of threads to use for transaction replay"; +} From 1f5bac425947890aa4141b36086e7ccfbc13ac6c Mon Sep 17 00:00:00 2001 From: steviez Date: Mon, 18 Mar 2024 13:41:41 -0500 Subject: [PATCH 09/14] Move constant from ReplayStage to CLI --- core/src/replay_stage.rs | 4 ---- validator/src/cli/thread_args.rs | 6 ++++-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index c7c7e7fc1d397a..08f54db265c94f 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -95,10 +95,6 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64; pub const MAX_UNCONFIRMED_SLOTS: usize = 5; pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1; pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD; -// The maximum number of threads to allocate for replaying forks concurrently -// This value was chosen to be small enough to limit the overhead of having a large thread pool -// while also being large enough to allow replay of all active forks in most scenarios. -pub const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4; const MAX_VOTE_SIGNATURES: usize = 200; const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000; diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 0d857306c68330..cadd307ad7f871 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -3,7 +3,6 @@ use { clap::{value_t_or_exit, Arg, ArgMatches}, solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range}, - solana_core::replay_stage::MAX_CONCURRENT_FORKS_TO_REPLAY, solana_rayon_threadlimit::get_max_thread_count, std::num::NonZeroUsize, }; @@ -37,7 +36,7 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { .takes_value(true) .value_name("NUMBER") .default_value(&defaults.replay_forks_threads) - .validator(|num| is_within_range(num, 1..=MAX_CONCURRENT_FORKS_TO_REPLAY)) + .validator(|num| is_within_range(num, 1..=replay_forks_threads::MAX)) .hidden(hidden_unless_forced()) .help(replay_forks_threads::HELP), Arg::with_name(replay_transactions_threads::NAME) @@ -70,6 +69,9 @@ mod replay_forks_threads { pub const NAME: &str = "replay_forks_threads"; pub const LONG_ARG: &str = "replay-forks-threads"; pub const HELP: &str = "Number of threads to use for replay of blocks on different forks"; + // Choose a value that is small enough to limit the overhead of having a large thread pool + // while also being large enough to allow replay of all active forks in most scenarios + pub const MAX: usize = 4; } mod replay_transactions_threads { From 2b504242899997406175f74b992e9df841252205 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 19 Mar 2024 00:18:40 -0500 Subject: [PATCH 10/14] Shouldn't have trusted auto-merge --- core/src/replay_stage.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 08f54db265c94f..48641297f63fcc 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -663,8 +663,6 @@ impl ReplayStage { .build() .expect("new rayon threadpool"); ForkReplayMode::Parallel(pool) - } else { - ForkReplayMode::Serial }; // Thread pool to replay multiple transactions within one block in parallel let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new() From 21cf72efe146f86176e51c09cab46624f79087d2 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 19 Mar 2024 14:17:14 -0500 Subject: [PATCH 11/14] Introduce trait and use it for num forks argument The trait will reduce redundant code and further enforce these args all being uniform with each other, both in the source code and on the CLI at runtime --- validator/src/cli/thread_args.rs | 75 ++++++++++++++++++++++++-------- 1 file changed, 56 insertions(+), 19 deletions(-) diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index cadd307ad7f871..b3db9970dd00f9 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -4,7 +4,7 @@ use { clap::{value_t_or_exit, Arg, ArgMatches}, solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range}, solana_rayon_threadlimit::get_max_thread_count, - std::num::NonZeroUsize, + std::{num::NonZeroUsize, ops::RangeInclusive}, }; pub struct DefaultThreadArgs { @@ -17,7 +17,7 @@ impl Default for DefaultThreadArgs { let num_total_threads = get_max_thread_count(); Self { - replay_forks_threads: 1.to_string(), + replay_forks_threads: ReplayForksThreadsArg::default().to_string(), replay_transactions_threads: num_total_threads.to_string(), } } @@ -29,16 +29,8 @@ pub struct NumThreadConfig { } pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { - // Do not let any threadpool size scale over the number of threads vec![ - Arg::with_name(replay_forks_threads::NAME) - .long(replay_forks_threads::LONG_ARG) - .takes_value(true) - .value_name("NUMBER") - .default_value(&defaults.replay_forks_threads) - .validator(|num| is_within_range(num, 1..=replay_forks_threads::MAX)) - .hidden(hidden_unless_forced()) - .help(replay_forks_threads::HELP), + new_thread_arg::(&defaults.replay_forks_threads), Arg::with_name(replay_transactions_threads::NAME) .long(replay_transactions_threads::LONG_ARG) .takes_value(true) @@ -50,12 +42,23 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { ] } +fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> { + Arg::with_name(T::NAME) + .long(T::LONG_NAME) + .takes_value(true) + .value_name("NUMBER") + .default_value(default) + .validator(|num| is_within_range(num, T::range())) + .hidden(hidden_unless_forced()) + .help(T::HELP) +} + pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { NumThreadConfig { replay_forks_threads: if matches.is_present("replay_slots_concurrently") { NonZeroUsize::new(4).expect("4 is non-zero") } else { - value_t_or_exit!(matches, replay_forks_threads::NAME, NonZeroUsize) + value_t_or_exit!(matches, ReplayForksThreadsArg::NAME, NonZeroUsize) }, replay_transactions_threads: value_t_or_exit!( matches, @@ -65,13 +68,47 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { } } -mod replay_forks_threads { - pub const NAME: &str = "replay_forks_threads"; - pub const LONG_ARG: &str = "replay-forks-threads"; - pub const HELP: &str = "Number of threads to use for replay of blocks on different forks"; - // Choose a value that is small enough to limit the overhead of having a large thread pool - // while also being large enough to allow replay of all active forks in most scenarios - pub const MAX: usize = 4; +/// Configuration for CLAP arguments that control the number of threads for various functions +trait ThreadArg { + /// The argument's name + const NAME: &'static str; + /// The argument's long name + const LONG_NAME: &'static str; + /// The argument's help message + const HELP: &'static str; + + /// The default number of threads + fn default() -> usize; + /// The minimum allowed value of threads (inclusive) + fn min() -> usize { + 1 + } + /// The maximum allowed value of threads (inclusive) + fn max() -> usize { + // By default, no thread pool should scale over the machine's number of threads + get_max_thread_count() + } + /// The range of allowable values + fn range() -> RangeInclusive { + RangeInclusive::new(Self::min(), Self::max()) + } +} + +struct ReplayForksThreadsArg; +impl ThreadArg for ReplayForksThreadsArg { + const NAME: &'static str = "replay_forks_threads"; + const LONG_NAME: &'static str = "replay-forks-threads"; + const HELP: &'static str = "Number of threads to use for replay of blocks on different forks"; + + fn default() -> usize { + // Default to single threaded fork execution + 1 + } + fn max() -> usize { + // Choose a value that is small enough to limit the overhead of having a large thread pool + // while also being large enough to allow replay of all active forks in most scenarios + 4 + } } mod replay_transactions_threads { From 0f1a2a96a5a4514de786605a9ddf0b1b860a56d4 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 19 Mar 2024 14:22:20 -0500 Subject: [PATCH 12/14] Switch replay tx threads over to the new schema --- validator/src/cli/thread_args.rs | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index b3db9970dd00f9..c6c65bb7fd9688 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -14,11 +14,9 @@ pub struct DefaultThreadArgs { impl Default for DefaultThreadArgs { fn default() -> Self { - let num_total_threads = get_max_thread_count(); - Self { replay_forks_threads: ReplayForksThreadsArg::default().to_string(), - replay_transactions_threads: num_total_threads.to_string(), + replay_transactions_threads: ReplayTransactionsThreadsArg::default().to_string(), } } } @@ -31,14 +29,7 @@ pub struct NumThreadConfig { pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { vec![ new_thread_arg::(&defaults.replay_forks_threads), - Arg::with_name(replay_transactions_threads::NAME) - .long(replay_transactions_threads::LONG_ARG) - .takes_value(true) - .value_name("NUMBER") - .default_value(&defaults.replay_transactions_threads) - .validator(|num| is_within_range(num, 1..=get_max_thread_count())) - .hidden(hidden_unless_forced()) - .help(replay_transactions_threads::HELP), + new_thread_arg::(&defaults.replay_transactions_threads), ] } @@ -62,7 +53,7 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { }, replay_transactions_threads: value_t_or_exit!( matches, - replay_transactions_threads::NAME, + ReplayTransactionsThreadsArg::NAME, NonZeroUsize ), } @@ -111,8 +102,13 @@ impl ThreadArg for ReplayForksThreadsArg { } } -mod replay_transactions_threads { - pub const NAME: &str = "replay_transactions_threads"; - pub const LONG_ARG: &str = "replay-transactions-threads"; - pub const HELP: &str = "Number of threads to use for transaction replay"; +struct ReplayTransactionsThreadsArg; +impl ThreadArg for ReplayTransactionsThreadsArg { + const NAME: &'static str = "replay_transactions_threads"; + const LONG_NAME: &'static str = "replay-transactions-threads"; + const HELP: &'static str = "Number of threads to use for transaction replay"; + + fn default() -> usize { + get_max_thread_count() + } } From b59585597df0c9957a72c07806a663c4d29716a2 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 19 Mar 2024 14:24:09 -0500 Subject: [PATCH 13/14] Add a comment and shift something around for logical grouping --- validator/src/cli/thread_args.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index c6c65bb7fd9688..03317ce1f1c2ea 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -7,6 +7,7 @@ use { std::{num::NonZeroUsize, ops::RangeInclusive}, }; +// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's pub struct DefaultThreadArgs { pub replay_forks_threads: String, pub replay_transactions_threads: String, @@ -21,11 +22,6 @@ impl Default for DefaultThreadArgs { } } -pub struct NumThreadConfig { - pub replay_forks_threads: NonZeroUsize, - pub replay_transactions_threads: NonZeroUsize, -} - pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec> { vec![ new_thread_arg::(&defaults.replay_forks_threads), @@ -44,6 +40,11 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> { .help(T::HELP) } +pub struct NumThreadConfig { + pub replay_forks_threads: NonZeroUsize, + pub replay_transactions_threads: NonZeroUsize, +} + pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig { NumThreadConfig { replay_forks_threads: if matches.is_present("replay_slots_concurrently") { From 95ffef28a16633078f05e7fded0529769d6bc537 Mon Sep 17 00:00:00 2001 From: steviez Date: Tue, 19 Mar 2024 14:26:09 -0500 Subject: [PATCH 14/14] Tweak grammar / word choice a little --- validator/src/cli/thread_args.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/validator/src/cli/thread_args.rs b/validator/src/cli/thread_args.rs index 03317ce1f1c2ea..53d8cf15d984a0 100644 --- a/validator/src/cli/thread_args.rs +++ b/validator/src/cli/thread_args.rs @@ -71,16 +71,16 @@ trait ThreadArg { /// The default number of threads fn default() -> usize; - /// The minimum allowed value of threads (inclusive) + /// The minimum allowed number of threads (inclusive) fn min() -> usize { 1 } - /// The maximum allowed value of threads (inclusive) + /// The maximum allowed number of threads (inclusive) fn max() -> usize { - // By default, no thread pool should scale over the machine's number of threads + // By default, no thread pool should scale over the number of the machine's threads get_max_thread_count() } - /// The range of allowable values + /// The range of allowed number of threads (inclusive on both ends) fn range() -> RangeInclusive { RangeInclusive::new(Self::min(), Self::max()) }