Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use {
blockstore_processor::{
self, BlockstoreProcessorError, ConfirmationProgress, TransactionStatusSender,
},
entry_notifier_service::EntryNotifierSender,
leader_schedule_cache::LeaderScheduleCache,
leader_schedule_utils::first_of_consecutive_leader_slots,
},
Expand Down Expand Up @@ -236,6 +237,7 @@ pub struct ReplayStageConfig {
pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
pub cache_block_meta_sender: Option<CacheBlockMetaSender>,
pub entry_notification_sender: Option<EntryNotifierSender>,
pub bank_notification_sender: Option<BankNotificationSenderConfig>,
pub wait_for_vote_to_start_leader: bool,
pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
Expand Down Expand Up @@ -501,6 +503,7 @@ impl ReplayStage {
transaction_status_sender,
rewards_recorder_sender,
cache_block_meta_sender,
entry_notification_sender,
bank_notification_sender,
wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender,
Expand Down Expand Up @@ -596,6 +599,7 @@ impl ReplayStage {
&mut progress,
transaction_status_sender.as_ref(),
cache_block_meta_sender.as_ref(),
entry_notification_sender.as_ref(),
&verify_recyclers,
&mut heaviest_subtree_fork_choice,
&replay_vote_sender,
Expand Down Expand Up @@ -1869,12 +1873,14 @@ impl ReplayStage {
}
}

#[allow(clippy::too_many_arguments)]
fn replay_blockstore_into_bank(
bank: &Arc<Bank>,
blockstore: &Blockstore,
replay_stats: &RwLock<ReplaySlotStats>,
replay_progress: &RwLock<ConfirmationProgress>,
transaction_status_sender: Option<&TransactionStatusSender>,
entry_notification_sender: Option<&EntryNotifierSender>,
replay_vote_sender: &ReplayVoteSender,
verify_recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
Expand All @@ -1893,6 +1899,7 @@ impl ReplayStage {
&mut w_replay_progress,
false,
transaction_status_sender,
entry_notification_sender,
Some(replay_vote_sender),
verify_recyclers,
false,
Expand Down Expand Up @@ -2412,6 +2419,7 @@ impl ReplayStage {
vote_account: &Pubkey,
progress: &mut ProgressMap,
transaction_status_sender: Option<&TransactionStatusSender>,
entry_notification_sender: Option<&EntryNotifierSender>,
verify_recyclers: &VerifyRecyclers,
replay_vote_sender: &ReplayVoteSender,
replay_timing: &mut ReplayTiming,
Expand Down Expand Up @@ -2490,6 +2498,7 @@ impl ReplayStage {
&replay_stats,
&replay_progress,
transaction_status_sender,
entry_notification_sender,
&replay_vote_sender.clone(),
&verify_recyclers.clone(),
log_messages_bytes_limit,
Expand Down Expand Up @@ -2519,6 +2528,7 @@ impl ReplayStage {
vote_account: &Pubkey,
progress: &mut ProgressMap,
transaction_status_sender: Option<&TransactionStatusSender>,
entry_notification_sender: Option<&EntryNotifierSender>,
verify_recyclers: &VerifyRecyclers,
replay_vote_sender: &ReplayVoteSender,
replay_timing: &mut ReplayTiming,
Expand Down Expand Up @@ -2571,6 +2581,7 @@ impl ReplayStage {
&bank_progress.replay_stats,
&bank_progress.replay_progress,
transaction_status_sender,
entry_notification_sender,
&replay_vote_sender.clone(),
&verify_recyclers.clone(),
log_messages_bytes_limit,
Expand Down Expand Up @@ -2781,6 +2792,7 @@ impl ReplayStage {
progress: &mut ProgressMap,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
entry_notification_sender: Option<&EntryNotifierSender>,
verify_recyclers: &VerifyRecyclers,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
replay_vote_sender: &ReplayVoteSender,
Expand Down Expand Up @@ -2819,6 +2831,7 @@ impl ReplayStage {
vote_account,
progress,
transaction_status_sender,
entry_notification_sender,
verify_recyclers,
replay_vote_sender,
replay_timing,
Expand All @@ -2837,6 +2850,7 @@ impl ReplayStage {
vote_account,
progress,
transaction_status_sender,
entry_notification_sender,
verify_recyclers,
replay_vote_sender,
replay_timing,
Expand Down Expand Up @@ -4437,6 +4451,7 @@ pub(crate) mod tests {
&bank1_progress.replay_stats,
&bank1_progress.replay_progress,
None,
None,
&replay_vote_sender,
&VerifyRecyclers::default(),
None,
Expand Down
6 changes: 5 additions & 1 deletion core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use {
crossbeam_channel::{unbounded, Receiver},
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender},
solana_ledger::{
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
entry_notifier_service::EntryNotifierSender,
},
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
solana_rpc::{
optimistically_confirmed_bank_tracker::BankNotificationSender,
Expand Down Expand Up @@ -80,6 +83,7 @@ impl Tpu {
sockets: TpuSockets,
subscriptions: &Arc<RpcSubscriptions>,
transaction_status_sender: Option<TransactionStatusSender>,
_entry_notification_sender: Option<EntryNotifierSender>,
Comment thread
CriesofCarrots marked this conversation as resolved.
blockstore: &Arc<Blockstore>,
broadcast_type: &BroadcastStageType,
exit: &Arc<AtomicBool>,
Expand Down
5 changes: 4 additions & 1 deletion core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use {
},
solana_ledger::{
blockstore::Blockstore, blockstore_processor::TransactionStatusSender,
leader_schedule_cache::LeaderScheduleCache,
entry_notifier_service::EntryNotifierSender, leader_schedule_cache::LeaderScheduleCache,
},
solana_poh::poh_recorder::PohRecorder,
solana_rpc::{
Expand Down Expand Up @@ -120,6 +120,7 @@ impl Tvu {
transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
entry_notification_sender: Option<EntryNotifierSender>,
vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender,
gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
Expand Down Expand Up @@ -243,6 +244,7 @@ impl Tvu {
transaction_status_sender,
rewards_recorder_sender,
cache_block_meta_sender,
entry_notification_sender,
bank_notification_sender,
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender,
Expand Down Expand Up @@ -460,6 +462,7 @@ pub mod tests {
None,
None,
None,
None,
Arc::<VoteTracker>::default(),
retransmit_slots_sender,
gossip_verified_vote_hash_receiver,
Expand Down
45 changes: 43 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use {
},
blockstore_options::{BlockstoreOptions, BlockstoreRecoveryMode, LedgerColumnOptions},
blockstore_processor::{self, TransactionStatusSender},
entry_notifier_interface::EntryNotifierLock,
entry_notifier_service::{EntryNotifierSender, EntryNotifierService},
leader_schedule::FixedSchedule,
leader_schedule_cache::LeaderScheduleCache,
},
Expand Down Expand Up @@ -422,6 +424,7 @@ pub struct Validator {
transaction_status_service: Option<TransactionStatusService>,
rewards_recorder_service: Option<RewardsRecorderService>,
cache_block_meta_service: Option<CacheBlockMetaService>,
entry_notifier_service: Option<EntryNotifierService>,
system_monitor_service: Option<SystemMonitorService>,
sample_performance_service: Option<SamplePerformanceService>,
poh_timing_report_service: PohTimingReportService,
Expand Down Expand Up @@ -584,14 +587,21 @@ impl Validator {
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_transaction_notifier());

let entry_notifier = geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_entry_notifier());

let block_metadata_notifier = geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());

info!(
"Geyser plugin: accounts_update_notifier: {} transaction_notifier: {}",
"Geyser plugin: accounts_update_notifier: {}, \
transaction_notifier: {}, \
entry_notifier: {}",
accounts_update_notifier.is_some(),
transaction_notifier.is_some()
transaction_notifier.is_some(),
entry_notifier.is_some()
);

let system_monitor_service = Some(SystemMonitorService::new(
Expand Down Expand Up @@ -630,13 +640,15 @@ impl Validator {
blockstore_process_options,
blockstore_root_scan,
pruned_banks_receiver,
entry_notifier_service,
) = load_blockstore(
config,
ledger_path,
&exit,
&start_progress,
accounts_update_notifier,
transaction_notifier,
entry_notifier,
Some(poh_timing_point_sender.clone()),
)?;

Expand Down Expand Up @@ -751,6 +763,9 @@ impl Validator {
);

let leader_schedule_cache = Arc::new(leader_schedule_cache);
let entry_notification_sender = entry_notifier_service
.as_ref()
.map(|service| service.sender());
let mut process_blockstore = ProcessBlockStore::new(
&id,
vote_account,
Expand All @@ -762,6 +777,7 @@ impl Validator {
&blockstore_process_options,
transaction_status_sender.as_ref(),
cache_block_meta_sender.clone(),
entry_notification_sender,
blockstore_root_scan,
accounts_background_request_sender.clone(),
config,
Expand Down Expand Up @@ -1074,6 +1090,9 @@ impl Validator {
info!("Disabled banking tracer");
}

let entry_notification_sender = entry_notifier_service
.as_ref()
.map(|service| service.sender_cloned());
let (replay_vote_sender, replay_vote_receiver) = unbounded();
let tvu = Tvu::new(
vote_account,
Expand All @@ -1100,6 +1119,7 @@ impl Validator {
transaction_status_sender.clone(),
rewards_recorder_sender,
cache_block_meta_sender,
entry_notification_sender.clone(),
vote_tracker.clone(),
retransmit_slots_sender,
gossip_verified_vote_hash_receiver,
Expand Down Expand Up @@ -1141,6 +1161,7 @@ impl Validator {
},
&rpc_subscriptions,
transaction_status_sender,
entry_notification_sender,
&blockstore,
&config.broadcast_stage_type,
&exit,
Expand Down Expand Up @@ -1183,6 +1204,7 @@ impl Validator {
transaction_status_service,
rewards_recorder_service,
cache_block_meta_service,
entry_notifier_service,
system_monitor_service,
sample_performance_service,
poh_timing_report_service,
Expand Down Expand Up @@ -1299,6 +1321,12 @@ impl Validator {
.expect("sample_performance_service");
}

if let Some(entry_notifier_service) = self.entry_notifier_service {
entry_notifier_service
.join()
.expect("entry_notifier_service");
}

if let Some(s) = self.snapshot_packager_service {
s.join().expect("snapshot_packager_service");
}
Expand Down Expand Up @@ -1488,6 +1516,7 @@ fn load_blockstore(
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
transaction_notifier: Option<TransactionNotifierLock>,
entry_notifier: Option<EntryNotifierLock>,
poh_timing_point_sender: Option<PohTimingSender>,
) -> Result<
(
Expand All @@ -1503,6 +1532,7 @@ fn load_blockstore(
blockstore_processor::ProcessOptions,
BlockstoreRootScan,
DroppedSlotsReceiver,
Option<EntryNotifierService>,
),
String,
> {
Expand Down Expand Up @@ -1580,6 +1610,9 @@ fn load_blockstore(
TransactionHistoryServices::default()
};

let entry_notifier_service =
entry_notifier.map(|entry_notifier| EntryNotifierService::new(entry_notifier, exit));

let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
bank_forks_utils::load_bank_forks(
&genesis_config,
Expand All @@ -1591,6 +1624,9 @@ fn load_blockstore(
transaction_history_services
.cache_block_meta_sender
.as_ref(),
entry_notifier_service
.as_ref()
.map(|service| service.sender()),
accounts_update_notifier,
exit,
);
Expand Down Expand Up @@ -1643,6 +1679,7 @@ fn load_blockstore(
process_options,
blockstore_root_scan,
pruned_banks_receiver,
entry_notifier_service,
))
}

Expand All @@ -1657,6 +1694,7 @@ pub struct ProcessBlockStore<'a> {
process_options: &'a blockstore_processor::ProcessOptions,
transaction_status_sender: Option<&'a TransactionStatusSender>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
entry_notification_sender: Option<&'a EntryNotifierSender>,
blockstore_root_scan: Option<BlockstoreRootScan>,
accounts_background_request_sender: AbsRequestSender,
config: &'a ValidatorConfig,
Expand All @@ -1676,6 +1714,7 @@ impl<'a> ProcessBlockStore<'a> {
process_options: &'a blockstore_processor::ProcessOptions,
transaction_status_sender: Option<&'a TransactionStatusSender>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
entry_notification_sender: Option<&'a EntryNotifierSender>,
blockstore_root_scan: BlockstoreRootScan,
accounts_background_request_sender: AbsRequestSender,
config: &'a ValidatorConfig,
Expand All @@ -1691,6 +1730,7 @@ impl<'a> ProcessBlockStore<'a> {
process_options,
transaction_status_sender,
cache_block_meta_sender,
entry_notification_sender,
blockstore_root_scan: Some(blockstore_root_scan),
accounts_background_request_sender,
config,
Expand Down Expand Up @@ -1728,6 +1768,7 @@ impl<'a> ProcessBlockStore<'a> {
self.process_options,
self.transaction_status_sender,
self.cache_block_meta_sender.as_ref(),
self.entry_notification_sender,
&self.accounts_background_request_sender,
) {
exit.store(true, Ordering::Relaxed);
Expand Down
1 change: 1 addition & 0 deletions geyser-plugin-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ log = { workspace = true }
serde_json = { workspace = true }
solana-entry = { workspace = true }
solana-geyser-plugin-interface = { workspace = true }
solana-ledger = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-rpc = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion geyser-plugin-manager/src/entry_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use {
solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaEntryInfo, ReplicaEntryInfoVersions,
},
solana_ledger::entry_notifier_interface::EntryNotifier,
solana_measure::measure::Measure,
solana_metrics::*,
solana_rpc::entry_notifier_interface::EntryNotifier,
solana_sdk::clock::Slot,
std::sync::{Arc, RwLock},
};
Expand Down
Loading