From 11529076cdaa30dc276d6707d65af1e0e7f89a6c Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 3 Apr 2023 20:57:04 -0600 Subject: [PATCH 1/7] Move entry_notifier_interface --- Cargo.lock | 1 + geyser-plugin-manager/Cargo.toml | 1 + geyser-plugin-manager/src/entry_notifier.rs | 2 +- geyser-plugin-manager/src/geyser_plugin_service.rs | 2 +- {rpc => ledger}/src/entry_notifier_interface.rs | 0 ledger/src/lib.rs | 1 + programs/sbf/Cargo.lock | 1 + rpc/src/lib.rs | 1 - 8 files changed, 6 insertions(+), 3 deletions(-) rename {rpc => ledger}/src/entry_notifier_interface.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index f2bb37c51a531c..08a6ad16d30251 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5815,6 +5815,7 @@ dependencies = [ "serde_json", "solana-entry", "solana-geyser-plugin-interface", + "solana-ledger", "solana-measure", "solana-metrics", "solana-rpc", diff --git a/geyser-plugin-manager/Cargo.toml b/geyser-plugin-manager/Cargo.toml index 34dbe998db97f3..bf0d61a636a5fb 100644 --- a/geyser-plugin-manager/Cargo.toml +++ b/geyser-plugin-manager/Cargo.toml @@ -21,6 +21,7 @@ log = { workspace = true } serde_json = { workspace = true } solana-entry = { workspace = true } solana-geyser-plugin-interface = { workspace = true } +solana-ledger = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-rpc = { workspace = true } diff --git a/geyser-plugin-manager/src/entry_notifier.rs b/geyser-plugin-manager/src/entry_notifier.rs index 26a2db238201b4..ce6c3239c0946c 100644 --- a/geyser-plugin-manager/src/entry_notifier.rs +++ b/geyser-plugin-manager/src/entry_notifier.rs @@ -6,9 +6,9 @@ use { solana_geyser_plugin_interface::geyser_plugin_interface::{ ReplicaEntryInfo, ReplicaEntryInfoVersions, }, + solana_ledger::entry_notifier_interface::EntryNotifier, solana_measure::measure::Measure, solana_metrics::*, - solana_rpc::entry_notifier_interface::EntryNotifier, solana_sdk::clock::Slot, std::sync::{Arc, RwLock}, }; diff --git a/geyser-plugin-manager/src/geyser_plugin_service.rs b/geyser-plugin-manager/src/geyser_plugin_service.rs index ece7bb1c63764b..18babc7d471b97 100644 --- a/geyser-plugin-manager/src/geyser_plugin_service.rs +++ b/geyser-plugin-manager/src/geyser_plugin_service.rs @@ -11,8 +11,8 @@ use { }, crossbeam_channel::Receiver, log::*, + solana_ledger::entry_notifier_interface::EntryNotifierLock, solana_rpc::{ - entry_notifier_interface::EntryNotifierLock, optimistically_confirmed_bank_tracker::SlotNotification, transaction_notifier_interface::TransactionNotifierLock, }, diff --git a/rpc/src/entry_notifier_interface.rs b/ledger/src/entry_notifier_interface.rs similarity index 100% rename from rpc/src/entry_notifier_interface.rs rename to ledger/src/entry_notifier_interface.rs diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 98b677f60ad755..6e09170da77b25 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -14,6 +14,7 @@ pub mod blockstore_meta; pub mod blockstore_metrics; pub mod blockstore_options; pub mod blockstore_processor; +pub mod entry_notifier_interface; pub mod genesis_utils; pub mod leader_schedule; pub mod leader_schedule_cache; diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index e7d119f2070123..129fd844f71bee 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5009,6 +5009,7 @@ dependencies = [ "serde_json", "solana-entry", "solana-geyser-plugin-interface", + "solana-ledger", "solana-measure", "solana-metrics", "solana-rpc", diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 769397658732dd..d021155e8911b5 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,6 +1,5 @@ #![allow(clippy::integer_arithmetic)] mod cluster_tpu_info; -pub mod entry_notifier_interface; pub mod max_slots; pub mod optimistically_confirmed_bank_tracker; pub mod parsed_token_accounts; From 53291fd405bf641fed2ac31900190bddf7e13da6 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 3 Apr 2023 20:58:56 -0600 Subject: [PATCH 2/7] Add EntryNotifierService --- ledger/src/entry_notifier_service.rs | 63 ++++++++++++++++++++++++++++ ledger/src/lib.rs | 1 + 2 files changed, 64 insertions(+) create mode 100644 ledger/src/entry_notifier_service.rs diff --git a/ledger/src/entry_notifier_service.rs b/ledger/src/entry_notifier_service.rs new file mode 100644 index 00000000000000..f453cda34eb224 --- /dev/null +++ b/ledger/src/entry_notifier_service.rs @@ -0,0 +1,63 @@ +use { + crate::entry_notifier_interface::EntryNotifierLock, + crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, + solana_entry::entry::EntrySummary, + solana_sdk::clock::Slot, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, + }, +}; + +pub type EntryNotifierSender = Sender<(Slot, usize, EntrySummary)>; +pub type EntryNotifierReceiver = Receiver<(Slot, usize, EntrySummary)>; + +pub struct EntryNotifierService { + thread_hdl: JoinHandle<()>, +} + +impl EntryNotifierService { + pub fn new( + entry_notification_receiver: EntryNotifierReceiver, + entry_notifier: EntryNotifierLock, + exit: &Arc, + ) -> Self { + let exit = exit.clone(); + let thread_hdl = Builder::new() + .name("solEntryNotif".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + + if let Err(RecvTimeoutError::Disconnected) = + Self::notify_entry(&entry_notification_receiver, entry_notifier.clone()) + { + break; + } + }) + .unwrap(); + Self { thread_hdl } + } + + fn notify_entry( + entry_notification_receiver: &EntryNotifierReceiver, + entry_notifier: EntryNotifierLock, + ) -> Result<(), RecvTimeoutError> { + let (slot, index, entry) = + entry_notification_receiver.recv_timeout(Duration::from_secs(1))?; + entry_notifier + .write() + .unwrap() + .notify_entry(slot, index, &entry); + Ok(()) + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 6e09170da77b25..274bd6d369e960 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -15,6 +15,7 @@ pub mod blockstore_metrics; pub mod blockstore_options; pub mod blockstore_processor; pub mod entry_notifier_interface; +pub mod entry_notifier_service; pub mod genesis_utils; pub mod leader_schedule; pub mod leader_schedule_cache; From 5f99b0a35f503f9cd16c1b9ec5d73a9c1b93ec0f Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 25 Apr 2023 13:14:08 -0600 Subject: [PATCH 3/7] Use descriptive struct in sender/receiver --- ledger/src/entry_notifier_service.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ledger/src/entry_notifier_service.rs b/ledger/src/entry_notifier_service.rs index f453cda34eb224..e5946a32354891 100644 --- a/ledger/src/entry_notifier_service.rs +++ b/ledger/src/entry_notifier_service.rs @@ -13,8 +13,14 @@ use { }, }; -pub type EntryNotifierSender = Sender<(Slot, usize, EntrySummary)>; -pub type EntryNotifierReceiver = Receiver<(Slot, usize, EntrySummary)>; +pub struct EntryNotification { + pub slot: Slot, + pub index: usize, + pub entry: EntrySummary, +} + +pub type EntryNotifierSender = Sender; +pub type EntryNotifierReceiver = Receiver; pub struct EntryNotifierService { thread_hdl: JoinHandle<()>, @@ -48,7 +54,7 @@ impl EntryNotifierService { entry_notification_receiver: &EntryNotifierReceiver, entry_notifier: EntryNotifierLock, ) -> Result<(), RecvTimeoutError> { - let (slot, index, entry) = + let EntryNotification { slot, index, entry } = entry_notification_receiver.recv_timeout(Duration::from_secs(1))?; entry_notifier .write() From ab703b8a2754ca0eba5987ab1b365bb984f5a049 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 4 Apr 2023 10:40:47 -0600 Subject: [PATCH 4/7] Optionally initialize EntryNotifierService in validator --- core/src/validator.rs | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index 220d040d1b6478..ad0189dfe5c50c 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -51,6 +51,8 @@ use { }, blockstore_options::{BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions}, blockstore_processor::{self, TransactionStatusSender}, + entry_notifier_interface::EntryNotifierLock, + entry_notifier_service::{EntryNotifierSender, EntryNotifierService}, leader_schedule::FixedSchedule, leader_schedule_cache::LeaderScheduleCache, }, @@ -422,6 +424,7 @@ pub struct Validator { transaction_status_service: Option, rewards_recorder_service: Option, cache_block_meta_service: Option, + entry_notifier_service: Option, system_monitor_service: Option, sample_performance_service: Option, poh_timing_report_service: PohTimingReportService, @@ -584,16 +587,26 @@ impl Validator { .as_ref() .and_then(|geyser_plugin_service| geyser_plugin_service.get_transaction_notifier()); + let entry_notifier = geyser_plugin_service + .as_ref() + .and_then(|geyser_plugin_service| geyser_plugin_service.get_entry_notifier()); + let block_metadata_notifier = geyser_plugin_service .as_ref() .and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier()); info!( - "Geyser plugin: accounts_update_notifier: {} transaction_notifier: {}", + "Geyser plugin: accounts_update_notifier: {}, \ + transaction_notifier: {}, \ + entry_notifier: {}", accounts_update_notifier.is_some(), - transaction_notifier.is_some() + transaction_notifier.is_some(), + entry_notifier.is_some() ); + let (_entry_notification_sender, entry_notifier_service) = + initialize_entry_notifier_service(entry_notifier, &exit).unzip(); + let system_monitor_service = Some(SystemMonitorService::new( Arc::clone(&exit), SystemMonitorStatsReportConfig { @@ -1183,6 +1196,7 @@ impl Validator { transaction_status_service, rewards_recorder_service, cache_block_meta_service, + entry_notifier_service, system_monitor_service, sample_performance_service, poh_timing_report_service, @@ -1299,6 +1313,12 @@ impl Validator { .expect("sample_performance_service"); } + if let Some(entry_notifier_service) = self.entry_notifier_service { + entry_notifier_service + .join() + .expect("entry_notifier_service"); + } + if let Some(s) = self.snapshot_packager_service { s.join().expect("snapshot_packager_service"); } @@ -1995,6 +2015,18 @@ fn initialize_rpc_transaction_history_services( } } +fn initialize_entry_notifier_service( + entry_notifier: Option, + exit: &Arc, +) -> Option<(EntryNotifierSender, EntryNotifierService)> { + entry_notifier.map(|entry_notifier| { + let (entry_notification_sender, entry_notification_receiver) = unbounded(); + let entry_notifier_service = + EntryNotifierService::new(entry_notification_receiver, entry_notifier, exit); + (entry_notification_sender, entry_notifier_service) + }) +} + #[derive(Debug, PartialEq, Eq)] enum ValidatorError { BadExpectedBankHash, From 05edd450b8f3b43cef383750b0b3b4fafa4981a4 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 4 Apr 2023 11:23:53 -0600 Subject: [PATCH 5/7] Plumb EntryNotfierSender into Tvu, blockstore_processor --- core/src/replay_stage.rs | 15 ++++++++++++++ core/src/tvu.rs | 5 ++++- core/src/validator.rs | 21 ++++++++++++++++--- ledger-tool/src/ledger_utils.rs | 2 ++ ledger/src/bank_forks_utils.rs | 6 ++++++ ledger/src/blockstore_processor.rs | 31 ++++++++++++++++++++++++---- local-cluster/tests/local_cluster.rs | 1 + 7 files changed, 73 insertions(+), 8 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 335b12acfe7f77..f202cfa26d0ddf 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -42,6 +42,7 @@ use { blockstore_processor::{ self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender, }, + entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache, leader_schedule_utils::first_of_consecutive_leader_slots, }, @@ -236,6 +237,7 @@ pub struct ReplayStageConfig { pub transaction_status_sender: Option, pub rewards_recorder_sender: Option, pub cache_block_meta_sender: Option, + pub entry_notification_sender: Option, pub bank_notification_sender: Option, pub wait_for_vote_to_start_leader: bool, pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender, @@ -501,6 +503,7 @@ impl ReplayStage { transaction_status_sender, rewards_recorder_sender, cache_block_meta_sender, + entry_notification_sender, bank_notification_sender, wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, @@ -596,6 +599,7 @@ impl ReplayStage { &mut progress, transaction_status_sender.as_ref(), cache_block_meta_sender.as_ref(), + entry_notification_sender.as_ref(), &verify_recyclers, &mut heaviest_subtree_fork_choice, &replay_vote_sender, @@ -1869,12 +1873,14 @@ impl ReplayStage { } } + #[allow(clippy::too_many_arguments)] fn replay_blockstore_into_bank( bank: &Arc, blockstore: &Blockstore, replay_stats: &RwLock, replay_progress: &RwLock, transaction_status_sender: Option<&TransactionStatusSender>, + entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: &ReplayVoteSender, verify_recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, @@ -1893,6 +1899,7 @@ impl ReplayStage { &mut w_replay_progress, false, transaction_status_sender, + entry_notification_sender, Some(replay_vote_sender), verify_recyclers, false, @@ -2412,6 +2419,7 @@ impl ReplayStage { vote_account: &Pubkey, progress: &mut ProgressMap, transaction_status_sender: Option<&TransactionStatusSender>, + entry_notification_sender: Option<&EntryNotifierSender>, verify_recyclers: &VerifyRecyclers, replay_vote_sender: &ReplayVoteSender, replay_timing: &mut ReplayTiming, @@ -2490,6 +2498,7 @@ impl ReplayStage { &replay_stats, &replay_progress, transaction_status_sender, + entry_notification_sender, &replay_vote_sender.clone(), &verify_recyclers.clone(), log_messages_bytes_limit, @@ -2519,6 +2528,7 @@ impl ReplayStage { vote_account: &Pubkey, progress: &mut ProgressMap, transaction_status_sender: Option<&TransactionStatusSender>, + entry_notification_sender: Option<&EntryNotifierSender>, verify_recyclers: &VerifyRecyclers, replay_vote_sender: &ReplayVoteSender, replay_timing: &mut ReplayTiming, @@ -2571,6 +2581,7 @@ impl ReplayStage { &bank_progress.replay_stats, &bank_progress.replay_progress, transaction_status_sender, + entry_notification_sender, &replay_vote_sender.clone(), &verify_recyclers.clone(), log_messages_bytes_limit, @@ -2781,6 +2792,7 @@ impl ReplayStage { progress: &mut ProgressMap, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, verify_recyclers: &VerifyRecyclers, heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice, replay_vote_sender: &ReplayVoteSender, @@ -2819,6 +2831,7 @@ impl ReplayStage { vote_account, progress, transaction_status_sender, + entry_notification_sender, verify_recyclers, replay_vote_sender, replay_timing, @@ -2837,6 +2850,7 @@ impl ReplayStage { vote_account, progress, transaction_status_sender, + entry_notification_sender, verify_recyclers, replay_vote_sender, replay_timing, @@ -4437,6 +4451,7 @@ pub(crate) mod tests { &bank1_progress.replay_stats, &bank1_progress.replay_progress, None, + None, &replay_vote_sender, &VerifyRecyclers::default(), None, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 552ce77ec6a3b3..4ba16d39d26b67 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -37,7 +37,7 @@ use { }, solana_ledger::{ blockstore::Blockstore, blockstore_processor::TransactionStatusSender, - leader_schedule_cache::LeaderScheduleCache, + entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache, }, solana_poh::poh_recorder::PohRecorder, solana_rpc::{ @@ -120,6 +120,7 @@ impl Tvu { transaction_status_sender: Option, rewards_recorder_sender: Option, cache_block_meta_sender: Option, + entry_notification_sender: Option, vote_tracker: Arc, retransmit_slots_sender: RetransmitSlotsSender, gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver, @@ -243,6 +244,7 @@ impl Tvu { transaction_status_sender, rewards_recorder_sender, cache_block_meta_sender, + entry_notification_sender, bank_notification_sender, wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, @@ -460,6 +462,7 @@ pub mod tests { None, None, None, + None, Arc::::default(), retransmit_slots_sender, gossip_verified_vote_hash_receiver, diff --git a/core/src/validator.rs b/core/src/validator.rs index ad0189dfe5c50c..a8fba5411c591b 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -604,9 +604,6 @@ impl Validator { entry_notifier.is_some() ); - let (_entry_notification_sender, entry_notifier_service) = - initialize_entry_notifier_service(entry_notifier, &exit).unzip(); - let system_monitor_service = Some(SystemMonitorService::new( Arc::clone(&exit), SystemMonitorStatsReportConfig { @@ -643,6 +640,8 @@ impl Validator { blockstore_process_options, blockstore_root_scan, pruned_banks_receiver, + entry_notification_sender, + entry_notifier_service, ) = load_blockstore( config, ledger_path, @@ -650,6 +649,7 @@ impl Validator { &start_progress, accounts_update_notifier, transaction_notifier, + entry_notifier, Some(poh_timing_point_sender.clone()), )?; @@ -775,6 +775,7 @@ impl Validator { &blockstore_process_options, transaction_status_sender.as_ref(), cache_block_meta_sender.clone(), + entry_notification_sender.as_ref(), blockstore_root_scan, accounts_background_request_sender.clone(), config, @@ -1113,6 +1114,7 @@ impl Validator { transaction_status_sender.clone(), rewards_recorder_sender, cache_block_meta_sender, + entry_notification_sender.clone(), vote_tracker.clone(), retransmit_slots_sender, gossip_verified_vote_hash_receiver, @@ -1508,6 +1510,7 @@ fn load_blockstore( start_progress: &Arc>, accounts_update_notifier: Option, transaction_notifier: Option, + entry_notifier: Option, poh_timing_point_sender: Option, ) -> Result< ( @@ -1523,6 +1526,8 @@ fn load_blockstore( blockstore_processor::ProcessOptions, BlockstoreRootScan, DroppedSlotsReceiver, + Option, + Option, ), String, > { @@ -1600,6 +1605,9 @@ fn load_blockstore( TransactionHistoryServices::default() }; + let (entry_notification_sender, entry_notifier_service) = + initialize_entry_notifier_service(entry_notifier, exit).unzip(); + let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) = bank_forks_utils::load_bank_forks( &genesis_config, @@ -1611,6 +1619,7 @@ fn load_blockstore( transaction_history_services .cache_block_meta_sender .as_ref(), + entry_notification_sender.as_ref(), accounts_update_notifier, exit, ); @@ -1663,6 +1672,8 @@ fn load_blockstore( process_options, blockstore_root_scan, pruned_banks_receiver, + entry_notification_sender, + entry_notifier_service, )) } @@ -1677,6 +1688,7 @@ pub struct ProcessBlockStore<'a> { process_options: &'a blockstore_processor::ProcessOptions, transaction_status_sender: Option<&'a TransactionStatusSender>, cache_block_meta_sender: Option, + entry_notification_sender: Option<&'a EntryNotifierSender>, blockstore_root_scan: Option, accounts_background_request_sender: AbsRequestSender, config: &'a ValidatorConfig, @@ -1696,6 +1708,7 @@ impl<'a> ProcessBlockStore<'a> { process_options: &'a blockstore_processor::ProcessOptions, transaction_status_sender: Option<&'a TransactionStatusSender>, cache_block_meta_sender: Option, + entry_notification_sender: Option<&'a EntryNotifierSender>, blockstore_root_scan: BlockstoreRootScan, accounts_background_request_sender: AbsRequestSender, config: &'a ValidatorConfig, @@ -1711,6 +1724,7 @@ impl<'a> ProcessBlockStore<'a> { process_options, transaction_status_sender, cache_block_meta_sender, + entry_notification_sender, blockstore_root_scan: Some(blockstore_root_scan), accounts_background_request_sender, config, @@ -1748,6 +1762,7 @@ impl<'a> ProcessBlockStore<'a> { self.process_options, self.transaction_status_sender, self.cache_block_meta_sender.as_ref(), + self.entry_notification_sender, &self.accounts_background_request_sender, ) { exit.store(true, Ordering::Relaxed); diff --git a/ledger-tool/src/ledger_utils.rs b/ledger-tool/src/ledger_utils.rs index 7b373802443d86..d29bcb0dbe2f50 100644 --- a/ledger-tool/src/ledger_utils.rs +++ b/ledger-tool/src/ledger_utils.rs @@ -235,6 +235,7 @@ pub fn load_bank_forks( snapshot_config.as_ref(), &process_options, None, + None, // Maybe support this later, though accounts_update_notifier, &Arc::default(), ); @@ -322,6 +323,7 @@ pub fn load_bank_forks( &process_options, transaction_status_sender.as_ref(), None, + None, // Maybe support this later, though &accounts_background_request_sender, ) .map(|_| (bank_forks, starting_snapshot_hashes)); diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 896a8baad9cac7..ebf48c864daed3 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -5,6 +5,7 @@ use { self, BlockstoreProcessorError, CacheBlockMetaSender, ProcessOptions, TransactionStatusSender, }, + entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache, }, log::*, @@ -49,6 +50,7 @@ pub fn load( process_options: ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, accounts_update_notifier: Option, exit: &Arc, ) -> LoadResult { @@ -60,6 +62,7 @@ pub fn load( snapshot_config, &process_options, cache_block_meta_sender, + entry_notification_sender, accounts_update_notifier, exit, ); @@ -71,6 +74,7 @@ pub fn load( &process_options, transaction_status_sender, cache_block_meta_sender, + entry_notification_sender, &AbsRequestSender::default(), ) .map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes)) @@ -85,6 +89,7 @@ pub fn load_bank_forks( snapshot_config: Option<&SnapshotConfig>, process_options: &ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, accounts_update_notifier: Option, exit: &Arc, ) -> ( @@ -145,6 +150,7 @@ pub fn load_bank_forks( account_paths, process_options, cache_block_meta_sender, + entry_notification_sender, accounts_update_notifier, exit, ); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 247d8a1a40e194..71eabe62b64226 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1,8 +1,8 @@ use { crate::{ block_error::BlockError, blockstore::Blockstore, blockstore_db::BlockstoreError, - blockstore_meta::SlotMeta, leader_schedule_cache::LeaderScheduleCache, - token_balances::collect_token_balances, + blockstore_meta::SlotMeta, entry_notifier_service::EntryNotifierSender, + leader_schedule_cache::LeaderScheduleCache, token_balances::collect_token_balances, }, chrono_humanize::{Accuracy, HumanTime, Tense}, crossbeam_channel::Sender, @@ -677,6 +677,7 @@ pub fn test_process_blockstore( opts, None, None, + None, exit, ); @@ -687,6 +688,7 @@ pub fn test_process_blockstore( opts, None, None, + None, &abs_request_sender, ) .unwrap(); @@ -703,6 +705,7 @@ pub(crate) fn process_blockstore_for_bank_0( account_paths: Vec, opts: &ProcessOptions, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, accounts_update_notifier: Option, exit: &Arc, ) -> Arc> { @@ -729,6 +732,7 @@ pub(crate) fn process_blockstore_for_bank_0( opts, &VerifyRecyclers::default(), cache_block_meta_sender, + entry_notification_sender, ); bank_forks } @@ -742,6 +746,7 @@ pub fn process_blockstore_from_root( opts: &ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, accounts_background_request_sender: &AbsRequestSender, ) -> result::Result<(), BlockstoreProcessorError> { let (start_slot, start_slot_hash) = { @@ -791,6 +796,7 @@ pub fn process_blockstore_from_root( opts, transaction_status_sender, cache_block_meta_sender, + entry_notification_sender, &mut timing, accounts_background_request_sender, )? @@ -896,6 +902,7 @@ fn confirm_full_slot( recyclers: &VerifyRecyclers, progress: &mut ConfirmationProgress, transaction_status_sender: Option<&TransactionStatusSender>, + entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: Option<&ReplayVoteSender>, timing: &mut ExecuteTimings, ) -> result::Result<(), BlockstoreProcessorError> { @@ -910,6 +917,7 @@ fn confirm_full_slot( progress, skip_verification, transaction_status_sender, + entry_notification_sender, replay_vote_sender, recyclers, opts.allow_dead_slots, @@ -1055,6 +1063,7 @@ pub fn confirm_slot( progress: &mut ConfirmationProgress, skip_verification: bool, transaction_status_sender: Option<&TransactionStatusSender>, + entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: Option<&ReplayVoteSender>, recyclers: &VerifyRecyclers, allow_dead_slots: bool, @@ -1084,6 +1093,7 @@ pub fn confirm_slot( progress, skip_verification, transaction_status_sender, + entry_notification_sender, replay_vote_sender, recyclers, log_messages_bytes_limit, @@ -1099,6 +1109,7 @@ fn confirm_slot_entries( progress: &mut ConfirmationProgress, skip_verification: bool, transaction_status_sender: Option<&TransactionStatusSender>, + _entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: Option<&ReplayVoteSender>, recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, @@ -1281,6 +1292,7 @@ fn process_bank_0( opts: &ProcessOptions, recyclers: &VerifyRecyclers, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, ) { assert_eq!(bank0.slot(), 0); let mut progress = ConfirmationProgress::new(bank0.last_blockhash()); @@ -1291,6 +1303,7 @@ fn process_bank_0( recyclers, &mut progress, None, + entry_notification_sender, None, &mut ExecuteTimings::default(), ) @@ -1371,6 +1384,7 @@ fn load_frozen_forks( opts: &ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, timing: &mut ExecuteTimings, accounts_background_request_sender: &AbsRequestSender, ) -> result::Result<(u64, usize), BlockstoreProcessorError> { @@ -1458,6 +1472,7 @@ fn load_frozen_forks( &mut progress, transaction_status_sender, cache_block_meta_sender, + entry_notification_sender, None, timing, ) @@ -1648,6 +1663,7 @@ fn supermajority_root_from_vote_accounts( // Processes and replays the contents of a single slot, returns Error // if failed to play the slot +#[allow(clippy::too_many_arguments)] fn process_single_slot( blockstore: &Blockstore, bank: &Arc, @@ -1656,6 +1672,7 @@ fn process_single_slot( progress: &mut ConfirmationProgress, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, + entry_notification_sender: Option<&EntryNotifierSender>, replay_vote_sender: Option<&ReplayVoteSender>, timing: &mut ExecuteTimings, ) -> result::Result<(), BlockstoreProcessorError> { @@ -1668,6 +1685,7 @@ fn process_single_slot( recyclers, progress, transaction_status_sender, + entry_notification_sender, replay_vote_sender, timing, ) @@ -3384,7 +3402,7 @@ pub mod tests { vec![entry_1, tick, entry_2.clone()], true, None, - None + None, ), Ok(()) ); @@ -3562,7 +3580,7 @@ pub mod tests { ..ProcessOptions::default() }; let recyclers = VerifyRecyclers::default(); - process_bank_0(&bank0, &blockstore, &opts, &recyclers, None); + process_bank_0(&bank0, &blockstore, &opts, &recyclers, None, None); let bank1 = bank_forks.insert(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); confirm_full_slot( &blockstore, @@ -3572,6 +3590,7 @@ pub mod tests { &mut ConfirmationProgress::new(bank0.last_blockhash()), None, None, + None, &mut ExecuteTimings::default(), ) .unwrap(); @@ -3592,6 +3611,7 @@ pub mod tests { &opts, None, None, + None, &AbsRequestSender::default(), ) .unwrap(); @@ -4214,6 +4234,7 @@ pub mod tests { false, None, None, + None, &VerifyRecyclers::default(), None, &PrioritizationFeeCache::new(0u64), @@ -4357,6 +4378,7 @@ pub mod tests { false, Some(&transaction_status_sender), None, + None, &VerifyRecyclers::default(), None, &PrioritizationFeeCache::new(0u64), @@ -4402,6 +4424,7 @@ pub mod tests { false, Some(&transaction_status_sender), None, + None, &VerifyRecyclers::default(), None, &PrioritizationFeeCache::new(0u64), diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index ab3bd7457b369f..28e6f327d9f621 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -2183,6 +2183,7 @@ fn create_snapshot_to_hard_fork( None, None, None, + None, &Arc::default(), ) .unwrap(); From db3a25acd03138711f36c4edb88830a8a89f8a50 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 20 Apr 2023 14:35:52 -0500 Subject: [PATCH 6/7] Plumb EntryNotfierSender into Tpu --- core/src/tpu.rs | 6 +++++- core/src/validator.rs | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index f19340a27fd183..dd65e990782534 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -19,7 +19,10 @@ use { crossbeam_channel::{unbounded, Receiver}, solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, - solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender}, + solana_ledger::{ + blockstore::Blockstore, blockstore_processor::TransactionStatusSender, + entry_notifier_service::EntryNotifierSender, + }, solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, solana_rpc::{ optimistically_confirmed_bank_tracker::BankNotificationSender, @@ -80,6 +83,7 @@ impl Tpu { sockets: TpuSockets, subscriptions: &Arc, transaction_status_sender: Option, + _entry_notification_sender: Option, blockstore: &Arc, broadcast_type: &BroadcastStageType, exit: &Arc, diff --git a/core/src/validator.rs b/core/src/validator.rs index a8fba5411c591b..f85165338c783b 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1156,6 +1156,7 @@ impl Validator { }, &rpc_subscriptions, transaction_status_sender, + entry_notification_sender.clone(), &blockstore, &config.broadcast_stage_type, &exit, From 0e71d14d7d865758f45ecf39f2b89a918c9c484b Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Mon, 8 May 2023 18:17:26 -0600 Subject: [PATCH 7/7] Only return one option when constructing EntryNotifierService --- core/src/validator.rs | 33 +++++++++++----------------- ledger/src/entry_notifier_service.rs | 23 +++++++++++++------ 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index f85165338c783b..b3a3e35d99445f 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -640,7 +640,6 @@ impl Validator { blockstore_process_options, blockstore_root_scan, pruned_banks_receiver, - entry_notification_sender, entry_notifier_service, ) = load_blockstore( config, @@ -764,6 +763,9 @@ impl Validator { ); let leader_schedule_cache = Arc::new(leader_schedule_cache); + let entry_notification_sender = entry_notifier_service + .as_ref() + .map(|service| service.sender()); let mut process_blockstore = ProcessBlockStore::new( &id, vote_account, @@ -775,7 +777,7 @@ impl Validator { &blockstore_process_options, transaction_status_sender.as_ref(), cache_block_meta_sender.clone(), - entry_notification_sender.as_ref(), + entry_notification_sender, blockstore_root_scan, accounts_background_request_sender.clone(), config, @@ -1088,6 +1090,9 @@ impl Validator { info!("Disabled banking tracer"); } + let entry_notification_sender = entry_notifier_service + .as_ref() + .map(|service| service.sender_cloned()); let (replay_vote_sender, replay_vote_receiver) = unbounded(); let tvu = Tvu::new( vote_account, @@ -1156,7 +1161,7 @@ impl Validator { }, &rpc_subscriptions, transaction_status_sender, - entry_notification_sender.clone(), + entry_notification_sender, &blockstore, &config.broadcast_stage_type, &exit, @@ -1527,7 +1532,6 @@ fn load_blockstore( blockstore_processor::ProcessOptions, BlockstoreRootScan, DroppedSlotsReceiver, - Option, Option, ), String, @@ -1606,8 +1610,8 @@ fn load_blockstore( TransactionHistoryServices::default() }; - let (entry_notification_sender, entry_notifier_service) = - initialize_entry_notifier_service(entry_notifier, exit).unzip(); + let entry_notifier_service = + entry_notifier.map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit)); let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) = bank_forks_utils::load_bank_forks( @@ -1620,7 +1624,9 @@ fn load_blockstore( transaction_history_services .cache_block_meta_sender .as_ref(), - entry_notification_sender.as_ref(), + entry_notifier_service + .as_ref() + .map(|service| service.sender()), accounts_update_notifier, exit, ); @@ -1673,7 +1679,6 @@ fn load_blockstore( process_options, blockstore_root_scan, pruned_banks_receiver, - entry_notification_sender, entry_notifier_service, )) } @@ -2031,18 +2036,6 @@ fn initialize_rpc_transaction_history_services( } } -fn initialize_entry_notifier_service( - entry_notifier: Option, - exit: &Arc, -) -> Option<(EntryNotifierSender, EntryNotifierService)> { - entry_notifier.map(|entry_notifier| { - let (entry_notification_sender, entry_notification_receiver) = unbounded(); - let entry_notifier_service = - EntryNotifierService::new(entry_notification_receiver, entry_notifier, exit); - (entry_notification_sender, entry_notifier_service) - }) -} - #[derive(Debug, PartialEq, Eq)] enum ValidatorError { BadExpectedBankHash, diff --git a/ledger/src/entry_notifier_service.rs b/ledger/src/entry_notifier_service.rs index e5946a32354891..b07f3ce1a27dd1 100644 --- a/ledger/src/entry_notifier_service.rs +++ b/ledger/src/entry_notifier_service.rs @@ -1,6 +1,6 @@ use { crate::entry_notifier_interface::EntryNotifierLock, - crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, solana_entry::entry::EntrySummary, solana_sdk::clock::Slot, std::{ @@ -23,16 +23,14 @@ pub type EntryNotifierSender = Sender; pub type EntryNotifierReceiver = Receiver; pub struct EntryNotifierService { + sender: EntryNotifierSender, thread_hdl: JoinHandle<()>, } impl EntryNotifierService { - pub fn new( - entry_notification_receiver: EntryNotifierReceiver, - entry_notifier: EntryNotifierLock, - exit: &Arc, - ) -> Self { + pub fn new(entry_notifier: EntryNotifierLock, exit: &Arc) -> Self { let exit = exit.clone(); + let (entry_notification_sender, entry_notification_receiver) = unbounded(); let thread_hdl = Builder::new() .name("solEntryNotif".to_string()) .spawn(move || loop { @@ -47,7 +45,10 @@ impl EntryNotifierService { } }) .unwrap(); - Self { thread_hdl } + Self { + sender: entry_notification_sender, + thread_hdl, + } } fn notify_entry( @@ -63,6 +64,14 @@ impl EntryNotifierService { Ok(()) } + pub fn sender(&self) -> &EntryNotifierSender { + &self.sender + } + + pub fn sender_cloned(&self) -> EntryNotifierSender { + self.sender.clone() + } + pub fn join(self) -> thread::Result<()> { self.thread_hdl.join() }