diff --git a/core/src/completed_data_sets_service.rs b/core/src/completed_data_sets_service.rs index 9ec0a26886f..a35aff8e0bb 100644 --- a/core/src/completed_data_sets_service.rs +++ b/core/src/completed_data_sets_service.rs @@ -1,5 +1,8 @@ -//! [`CompletedDataSetsService`] is a hub, that runs different operations when a "completed data -//! set", also known as a [`Vec`], is received by the validator. +//! [`CompletedDataSetsService`] is a hub that runs different operations when a completed data set +//! is received by the validator. +//! +//! A completed data set is a contiguous range of data shreds whose combined payload deserializes +//! to a single [`Vec`]. //! //! Currently, `WindowService` sends [`CompletedDataSetInfo`]s via a `completed_sets_receiver` //! provided to the [`CompletedDataSetsService`]. @@ -157,10 +160,14 @@ impl CompletedDataSetsService { .flatten() .map(|completed_data_set_info| { let CompletedDataSetInfo { slot, indices } = completed_data_set_info; + let completed_data_set_starting_shred_index = indices.start; + let completed_data_set_ending_shred_index_exclusive = indices.end; match blockstore.get_entries_in_data_block(slot, indices, /*slot_meta:*/ None) { Ok(entries) => { Self::notify_deshred_transactions_for_completed_data_set( slot, + completed_data_set_starting_shred_index, + completed_data_set_ending_shred_index_exclusive, &entries, deshred_transaction_notifier.as_deref(), root_bank.as_deref(), @@ -207,6 +214,8 @@ impl CompletedDataSetsService { fn notify_deshred_transactions_for_completed_data_set( slot: u64, + completed_data_set_starting_shred_index: u32, + completed_data_set_ending_shred_index_exclusive: u32, entries: &[Entry], deshred_transaction_notifier: Option<&(dyn DeshredTransactionNotifier + Send + Sync)>, root_bank: Option<&solana_runtime::bank::Bank>, @@ -251,6 +260,8 @@ impl CompletedDataSetsService { let mut notify_measure = Measure::start("notify_deshred"); notifier.notify_deshred_transaction( slot, + completed_data_set_starting_shred_index, + completed_data_set_ending_shred_index_exclusive, signature, is_vote, tx, @@ -288,7 +299,10 @@ pub mod test { solana_hash::Hash, solana_instruction::Instruction, solana_keypair::Keypair, - solana_ledger::{blockstore, blockstore::Blockstore, get_tmp_ledger_path_auto_delete}, + solana_ledger::{ + blockstore, blockstore::Blockstore, get_tmp_ledger_path_auto_delete, + shred::max_ticks_per_n_shreds, + }, solana_message::{ Message, VersionedMessage, v0::{self, LoadedAddresses}, @@ -312,6 +326,8 @@ pub mod test { #[derive(Clone, Debug, PartialEq, Eq)] struct DeshredNotification { slot: u64, + completed_data_set_starting_shred_index: u32, + completed_data_set_ending_shred_index_exclusive: u32, signature: Signature, is_vote: bool, transaction: VersionedTransaction, @@ -327,6 +343,8 @@ pub mod test { fn notify_deshred_transaction( &self, slot: u64, + completed_data_set_starting_shred_index: u32, + completed_data_set_ending_shred_index_exclusive: u32, signature: &Signature, is_vote: bool, transaction: &VersionedTransaction, @@ -337,6 +355,8 @@ pub mod test { .unwrap() .push(DeshredNotification { slot, + completed_data_set_starting_shred_index, + completed_data_set_ending_shred_index_exclusive, signature: *signature, is_vote, transaction: transaction.clone(), @@ -466,6 +486,8 @@ pub mod test { CompletedDataSetsService::notify_deshred_transactions_for_completed_data_set( 42, + 7, + 9, &entries, Some(¬ifier), None, @@ -475,8 +497,18 @@ pub mod test { let notifications = notifier.notifications.lock().unwrap().clone(); assert_eq!(notifications.len(), 2); assert_eq!(notifications[0].slot, 42); + assert_eq!(notifications[0].completed_data_set_starting_shred_index, 7); + assert_eq!( + notifications[0].completed_data_set_ending_shred_index_exclusive, + 9 + ); assert_eq!(notifications[0].signature, legacy_vote_tx.signatures[0]); assert!(notifications[0].is_vote); + assert_eq!(notifications[1].completed_data_set_starting_shred_index, 7); + assert_eq!( + notifications[1].completed_data_set_ending_shred_index_exclusive, + 9 + ); assert_eq!(notifications[1].signature, legacy_non_vote_tx.signatures[0]); assert!(!notifications[1].is_vote); assert!( @@ -521,6 +553,7 @@ pub mod test { let shreds = blockstore::entries_to_test_shreds(&entries, 11, 10, true, 0); let completed_data_sets = blockstore.insert_shreds(shreds, None, true).unwrap(); assert_eq!(completed_data_sets.len(), 1); + let completed_data_set = completed_data_sets[0].clone(); sender.send(completed_data_sets).unwrap(); CompletedDataSetsService::recv_completed_data_sets( @@ -536,9 +569,80 @@ pub mod test { let notifications = test_notifier.notifications.lock().unwrap().clone(); assert_eq!(notifications.len(), 1); assert_eq!(notifications[0].slot, 11); + assert_eq!( + notifications[0].completed_data_set_starting_shred_index, + completed_data_set.indices.start + ); + assert_eq!( + notifications[0].completed_data_set_ending_shred_index_exclusive, + completed_data_set.indices.end + ); assert_eq!(max_slots.shred_insert.load(Ordering::Relaxed), 11); } + #[test] + fn test_recv_completed_data_sets_notifies_completed_data_set_range_for_multi_shred_batch() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&GenesisConfig::default())); + let rpc_subscriptions = RpcSubscriptions::new_for_tests_with_blockstore( + Arc::new(AtomicBool::new(false)), + Arc::new(AtomicU64::default()), + blockstore.clone(), + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), + ); + let max_slots = Arc::new(MaxSlots::default()); + let test_notifier = Arc::new(TestDeshredTransactionNotifier::default()); + let notifier = Some(test_notifier.clone() as DeshredTransactionNotifierArc); + let (sender, receiver) = bounded(1); + + let num_entries = max_ticks_per_n_shreds(1, None) as usize + 1; + let mut previous_hash = Hash::default(); + let entries: Vec<_> = (0..num_entries) + .map(|_| { + let entry = next_versioned_entry( + &previous_hash, + 1, + vec![legacy_transaction(Instruction::new_with_bytes( + Pubkey::new_unique(), + &[], + Vec::new(), + ))], + ); + previous_hash = entry.hash; + entry + }) + .collect(); + let shreds = blockstore::entries_to_test_shreds(&entries, 12, 11, true, 0); + assert!(shreds.len() > 1); + let completed_data_sets = blockstore.insert_shreds(shreds, None, true).unwrap(); + assert_eq!(completed_data_sets.len(), 1); + let completed_data_set = completed_data_sets[0].clone(); + sender.send(completed_data_sets).unwrap(); + + CompletedDataSetsService::recv_completed_data_sets( + &receiver, + &blockstore, + &rpc_subscriptions, + ¬ifier, + &max_slots, + &bank_forks, + ) + .unwrap(); + + let notifications = test_notifier.notifications.lock().unwrap().clone(); + assert_eq!(notifications.len(), num_entries); + assert!(notifications.iter().all(|notification| { + notification.slot == 12 + && notification.completed_data_set_starting_shred_index + == completed_data_set.indices.start + && notification.completed_data_set_ending_shred_index_exclusive + == completed_data_set.indices.end + })); + } + #[test] fn test_lut_failure_stats_accumulated() { let notifier = TestDeshredTransactionNotifier::default(); @@ -567,6 +671,8 @@ pub mod test { CompletedDataSetsService::notify_deshred_transactions_for_completed_data_set( 10, + 0, + 1, &entries, Some(¬ifier), Some(&bank), @@ -613,6 +719,8 @@ pub mod test { // Pass None for root_bank, simulates ALT resolution not being opted in CompletedDataSetsService::notify_deshred_transactions_for_completed_data_set( 10, + 0, + 1, &entries, Some(¬ifier), None, diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index d1ea5ab161f..3ec0f8f0537 100644 --- a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -217,10 +217,40 @@ pub struct ReplicaDeshredTransactionInfo<'a> { pub loaded_addresses: Option<&'a LoadedAddresses>, } +/// Extends ReplicaDeshredTransactionInfo with metadata about the completed data set that +/// produced the transaction. +/// +/// A completed data set is a contiguous range of data shreds whose combined payload deserializes +/// to a single `Vec`. Multiple transactions can share the same completed-data-set range, +/// and completed data sets for the same slot may be observed out of order. These fields describe +/// the data-set container; they are not a block-wide transaction index. +#[derive(Clone, Debug)] +#[repr(C)] +pub struct ReplicaDeshredTransactionInfoV2<'a> { + /// The transaction signature, used for identifying the transaction. + pub signature: &'a Signature, + + /// Indicates if the transaction is a simple vote transaction. + pub is_vote: bool, + + /// The versioned transaction. + pub transaction: &'a VersionedTransaction, + + /// Addresses loaded from address lookup tables for V0 transactions. + pub loaded_addresses: Option<&'a LoadedAddresses>, + + /// The inclusive starting shred index of the completed data set containing this transaction. + pub completed_data_set_starting_shred_index: u32, + + /// The exclusive ending shred index of the completed data set containing this transaction. + pub completed_data_set_ending_shred_index_exclusive: u32, +} + /// A wrapper to future-proof ReplicaDeshredTransactionInfo handling. #[repr(u32)] pub enum ReplicaDeshredTransactionInfoVersions<'a> { V0_0_1(&'a ReplicaDeshredTransactionInfo<'a>), + V0_0_2(&'a ReplicaDeshredTransactionInfoV2<'a>), } #[derive(Clone, Debug)] diff --git a/geyser-plugin-manager/src/deshred_transaction_notifier.rs b/geyser-plugin-manager/src/deshred_transaction_notifier.rs index a3236ff3964..e9538905b54 100644 --- a/geyser-plugin-manager/src/deshred_transaction_notifier.rs +++ b/geyser-plugin-manager/src/deshred_transaction_notifier.rs @@ -2,7 +2,7 @@ use { crate::geyser_plugin_manager::GeyserPluginManager, agave_geyser_plugin_interface::geyser_plugin_interface::{ - ReplicaDeshredTransactionInfo, ReplicaDeshredTransactionInfoVersions, + ReplicaDeshredTransactionInfoV2, ReplicaDeshredTransactionInfoVersions, }, arc_swap::ArcSwap, log::*, @@ -29,6 +29,8 @@ impl DeshredTransactionNotifier for DeshredTransactionNotifierImpl { fn notify_deshred_transaction( &self, slot: Slot, + completed_data_set_starting_shred_index: u32, + completed_data_set_ending_shred_index_exclusive: u32, signature: &Signature, is_vote: bool, transaction: &VersionedTransaction, @@ -42,11 +44,13 @@ impl DeshredTransactionNotifier for DeshredTransactionNotifierImpl { let mut measure = Measure::start("geyser-plugin-notify_plugins_of_deshred_transaction_info"); - let transaction_info = ReplicaDeshredTransactionInfo { + let transaction_info = ReplicaDeshredTransactionInfoV2 { signature, is_vote, transaction, loaded_addresses, + completed_data_set_starting_shred_index, + completed_data_set_ending_shred_index_exclusive, }; for plugin in plugin_manager.plugins.iter() { @@ -54,7 +58,7 @@ impl DeshredTransactionNotifier for DeshredTransactionNotifierImpl { continue; } match plugin.notify_deshred_transaction( - ReplicaDeshredTransactionInfoVersions::V0_0_1(&transaction_info), + ReplicaDeshredTransactionInfoVersions::V0_0_2(&transaction_info), slot, ) { Err(err) => { diff --git a/geyser-plugin-manager/src/geyser_plugin_manager.rs b/geyser-plugin-manager/src/geyser_plugin_manager.rs index 4c5e29ecfcb..0b8b08a0aa7 100644 --- a/geyser-plugin-manager/src/geyser_plugin_manager.rs +++ b/geyser-plugin-manager/src/geyser_plugin_manager.rs @@ -509,7 +509,8 @@ mod tests { geyser_plugin_service::ARC_TRY_UNWRAP_ATTEMPT_SLEEP_DURATION, }, agave_geyser_plugin_interface::geyser_plugin_interface::{ - GeyserPlugin, ReplicaDeshredTransactionInfoVersions, Result as PluginResult, + GeyserPlugin, ReplicaDeshredTransactionInfo, ReplicaDeshredTransactionInfoVersions, + Result as PluginResult, }, arc_swap::ArcSwap, libloading::Library, @@ -594,6 +595,8 @@ mod tests { #[derive(Clone, Debug, PartialEq, Eq)] struct RecordedDeshredNotification { slot: Slot, + completed_data_set_starting_shred_index: u32, + completed_data_set_ending_shred_index_exclusive: u32, signature: Signature, is_vote: bool, transaction: VersionedTransaction, @@ -618,12 +621,18 @@ mod tests { transaction: ReplicaDeshredTransactionInfoVersions, slot: Slot, ) -> PluginResult<()> { - let ReplicaDeshredTransactionInfoVersions::V0_0_1(transaction) = transaction; + let ReplicaDeshredTransactionInfoVersions::V0_0_2(transaction) = transaction else { + panic!("expected V0_0_2 deshred transaction info"); + }; self.notifications .lock() .unwrap() .push(RecordedDeshredNotification { slot, + completed_data_set_starting_shred_index: transaction + .completed_data_set_starting_shred_index, + completed_data_set_ending_shred_index_exclusive: transaction + .completed_data_set_ending_shred_index_exclusive, signature: *transaction.signature, is_vote: transaction.is_vote, transaction: transaction.transaction.clone(), @@ -820,6 +829,8 @@ mod tests { notifier.notify_deshred_transaction( 11, + 23, + 31, &transaction.signatures[0], true, &transaction, @@ -829,6 +840,14 @@ mod tests { let enabled_notifications = enabled_notifications.lock().unwrap().clone(); assert_eq!(enabled_notifications.len(), 1); assert_eq!(enabled_notifications[0].slot, 11); + assert_eq!( + enabled_notifications[0].completed_data_set_starting_shred_index, + 23 + ); + assert_eq!( + enabled_notifications[0].completed_data_set_ending_shred_index_exclusive, + 31 + ); assert_eq!( enabled_notifications[0].signature, transaction.signatures[0] @@ -842,6 +861,29 @@ mod tests { assert!(disabled_notifications.lock().unwrap().is_empty()); } + #[test] + #[should_panic(expected = "expected V0_0_2 deshred transaction info")] + fn test_deshred_test_plugin_panics_on_legacy_deshred_info_version() { + let plugin = DeshredTestPlugin { + name: DUMMY_NAME, + enabled: true, + alt_resolution_enabled: false, + notifications: Arc::new(Mutex::new(Vec::new())), + }; + let transaction = sample_transaction(); + let deshred_info = ReplicaDeshredTransactionInfo { + signature: &transaction.signatures[0], + is_vote: false, + transaction: &transaction, + loaded_addresses: None, + }; + + let _ = plugin.notify_deshred_transaction( + ReplicaDeshredTransactionInfoVersions::V0_0_1(&deshred_info), + 11, + ); + } + #[test] fn test_deshred_transaction_alt_resolution_enabled() { let empty_manager = GeyserPluginManager::default(); diff --git a/ledger/src/deshred_transaction_notifier_interface.rs b/ledger/src/deshred_transaction_notifier_interface.rs index 3b2fad33d23..3eaab08fa10 100644 --- a/ledger/src/deshred_transaction_notifier_interface.rs +++ b/ledger/src/deshred_transaction_notifier_interface.rs @@ -5,10 +5,16 @@ use { /// Trait for notifying about transactions when they are deshredded. /// This is called when entries are formed from shreds, before any execution occurs. +/// +/// The completed-data-set shred range identifies the contiguous range of data shreds whose +/// combined payload deserializes to a single `Vec`. All transactions reconstructed from +/// that same completed data set share the same shred-range metadata. pub trait DeshredTransactionNotifier { fn notify_deshred_transaction( &self, slot: Slot, + completed_data_set_starting_shred_index: u32, + completed_data_set_ending_shred_index_exclusive: u32, signature: &Signature, is_vote: bool, transaction: &VersionedTransaction,