Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 111 additions & 3 deletions core/src/completed_data_sets_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
//! [`CompletedDataSetsService`] is a hub, that runs different operations when a "completed data
//! set", also known as a [`Vec<Entry>`], is received by the validator.
//! [`CompletedDataSetsService`] is a hub that runs different operations when a completed data set
//! is received by the validator.
//!
//! A completed data set is a contiguous range of data shreds whose combined payload deserializes
//! to a single [`Vec<Entry>`].
//!
//! Currently, `WindowService` sends [`CompletedDataSetInfo`]s via a `completed_sets_receiver`
//! provided to the [`CompletedDataSetsService`].
Expand Down Expand Up @@ -157,10 +160,14 @@ impl CompletedDataSetsService {
.flatten()
.map(|completed_data_set_info| {
let CompletedDataSetInfo { slot, indices } = completed_data_set_info;
let completed_data_set_starting_shred_index = indices.start;
let completed_data_set_ending_shred_index_exclusive = indices.end;
match blockstore.get_entries_in_data_block(slot, indices, /*slot_meta:*/ None) {
Ok(entries) => {
Self::notify_deshred_transactions_for_completed_data_set(
slot,
completed_data_set_starting_shred_index,
completed_data_set_ending_shred_index_exclusive,
&entries,
deshred_transaction_notifier.as_deref(),
root_bank.as_deref(),
Expand Down Expand Up @@ -207,6 +214,8 @@ impl CompletedDataSetsService {

fn notify_deshred_transactions_for_completed_data_set(
slot: u64,
completed_data_set_starting_shred_index: u32,
completed_data_set_ending_shred_index_exclusive: u32,
entries: &[Entry],
deshred_transaction_notifier: Option<&(dyn DeshredTransactionNotifier + Send + Sync)>,
root_bank: Option<&solana_runtime::bank::Bank>,
Expand Down Expand Up @@ -251,6 +260,8 @@ impl CompletedDataSetsService {
let mut notify_measure = Measure::start("notify_deshred");
notifier.notify_deshred_transaction(
slot,
completed_data_set_starting_shred_index,
completed_data_set_ending_shred_index_exclusive,
signature,
is_vote,
tx,
Expand Down Expand Up @@ -288,7 +299,10 @@ pub mod test {
solana_hash::Hash,
solana_instruction::Instruction,
solana_keypair::Keypair,
solana_ledger::{blockstore, blockstore::Blockstore, get_tmp_ledger_path_auto_delete},
solana_ledger::{
blockstore, blockstore::Blockstore, get_tmp_ledger_path_auto_delete,
shred::max_ticks_per_n_shreds,
},
solana_message::{
Message, VersionedMessage,
v0::{self, LoadedAddresses},
Expand All @@ -312,6 +326,8 @@ pub mod test {
#[derive(Clone, Debug, PartialEq, Eq)]
struct DeshredNotification {
slot: u64,
completed_data_set_starting_shred_index: u32,
completed_data_set_ending_shred_index_exclusive: u32,
signature: Signature,
is_vote: bool,
transaction: VersionedTransaction,
Expand All @@ -327,6 +343,8 @@ pub mod test {
fn notify_deshred_transaction(
&self,
slot: u64,
completed_data_set_starting_shred_index: u32,
completed_data_set_ending_shred_index_exclusive: u32,
signature: &Signature,
is_vote: bool,
transaction: &VersionedTransaction,
Expand All @@ -337,6 +355,8 @@ pub mod test {
.unwrap()
.push(DeshredNotification {
slot,
completed_data_set_starting_shred_index,
completed_data_set_ending_shred_index_exclusive,
signature: *signature,
is_vote,
transaction: transaction.clone(),
Expand Down Expand Up @@ -466,6 +486,8 @@ pub mod test {

CompletedDataSetsService::notify_deshred_transactions_for_completed_data_set(
42,
7,
9,
&entries,
Some(&notifier),
None,
Expand All @@ -475,8 +497,18 @@ pub mod test {
let notifications = notifier.notifications.lock().unwrap().clone();
assert_eq!(notifications.len(), 2);
assert_eq!(notifications[0].slot, 42);
assert_eq!(notifications[0].completed_data_set_starting_shred_index, 7);
assert_eq!(
notifications[0].completed_data_set_ending_shred_index_exclusive,
9
);
assert_eq!(notifications[0].signature, legacy_vote_tx.signatures[0]);
assert!(notifications[0].is_vote);
assert_eq!(notifications[1].completed_data_set_starting_shred_index, 7);
assert_eq!(
notifications[1].completed_data_set_ending_shred_index_exclusive,
9
);
assert_eq!(notifications[1].signature, legacy_non_vote_tx.signatures[0]);
assert!(!notifications[1].is_vote);
assert!(
Expand Down Expand Up @@ -521,6 +553,7 @@ pub mod test {
let shreds = blockstore::entries_to_test_shreds(&entries, 11, 10, true, 0);
let completed_data_sets = blockstore.insert_shreds(shreds, None, true).unwrap();
assert_eq!(completed_data_sets.len(), 1);
let completed_data_set = completed_data_sets[0].clone();
sender.send(completed_data_sets).unwrap();

CompletedDataSetsService::recv_completed_data_sets(
Expand All @@ -536,9 +569,80 @@ pub mod test {
let notifications = test_notifier.notifications.lock().unwrap().clone();
assert_eq!(notifications.len(), 1);
assert_eq!(notifications[0].slot, 11);
assert_eq!(
notifications[0].completed_data_set_starting_shred_index,
completed_data_set.indices.start
);
assert_eq!(
notifications[0].completed_data_set_ending_shred_index_exclusive,
completed_data_set.indices.end
);
assert_eq!(max_slots.shred_insert.load(Ordering::Relaxed), 11);
}

#[test]
fn test_recv_completed_data_sets_notifies_completed_data_set_range_for_multi_shred_batch() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let bank_forks = BankForks::new_rw_arc(Bank::new_for_tests(&GenesisConfig::default()));
let rpc_subscriptions = RpcSubscriptions::new_for_tests_with_blockstore(
Arc::new(AtomicBool::new(false)),
Arc::new(AtomicU64::default()),
blockstore.clone(),
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
let max_slots = Arc::new(MaxSlots::default());
let test_notifier = Arc::new(TestDeshredTransactionNotifier::default());
let notifier = Some(test_notifier.clone() as DeshredTransactionNotifierArc);
let (sender, receiver) = bounded(1);

let num_entries = max_ticks_per_n_shreds(1, None) as usize + 1;
let mut previous_hash = Hash::default();
let entries: Vec<_> = (0..num_entries)
.map(|_| {
let entry = next_versioned_entry(
&previous_hash,
1,
vec![legacy_transaction(Instruction::new_with_bytes(
Pubkey::new_unique(),
&[],
Vec::new(),
))],
);
previous_hash = entry.hash;
entry
})
.collect();
let shreds = blockstore::entries_to_test_shreds(&entries, 12, 11, true, 0);
assert!(shreds.len() > 1);
let completed_data_sets = blockstore.insert_shreds(shreds, None, true).unwrap();
assert_eq!(completed_data_sets.len(), 1);
let completed_data_set = completed_data_sets[0].clone();
sender.send(completed_data_sets).unwrap();

CompletedDataSetsService::recv_completed_data_sets(
&receiver,
&blockstore,
&rpc_subscriptions,
&notifier,
&max_slots,
&bank_forks,
)
.unwrap();

let notifications = test_notifier.notifications.lock().unwrap().clone();
assert_eq!(notifications.len(), num_entries);
assert!(notifications.iter().all(|notification| {
notification.slot == 12
&& notification.completed_data_set_starting_shred_index
== completed_data_set.indices.start
&& notification.completed_data_set_ending_shred_index_exclusive
== completed_data_set.indices.end
}));
}

#[test]
fn test_lut_failure_stats_accumulated() {
let notifier = TestDeshredTransactionNotifier::default();
Expand Down Expand Up @@ -567,6 +671,8 @@ pub mod test {

CompletedDataSetsService::notify_deshred_transactions_for_completed_data_set(
10,
0,
1,
&entries,
Some(&notifier),
Some(&bank),
Expand Down Expand Up @@ -613,6 +719,8 @@ pub mod test {
// Pass None for root_bank, simulates ALT resolution not being opted in
CompletedDataSetsService::notify_deshred_transactions_for_completed_data_set(
10,
0,
1,
&entries,
Some(&notifier),
None,
Expand Down
30 changes: 30 additions & 0 deletions geyser-plugin-interface/src/geyser_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,40 @@ pub struct ReplicaDeshredTransactionInfo<'a> {
pub loaded_addresses: Option<&'a LoadedAddresses>,
}

/// Extends ReplicaDeshredTransactionInfo with metadata about the completed data set that
/// produced the transaction.
///
/// A completed data set is a contiguous range of data shreds whose combined payload deserializes
/// to a single `Vec<Entry>`. Multiple transactions can share the same completed-data-set range,
/// and completed data sets for the same slot may be observed out of order. These fields describe
/// the data-set container; they are not a block-wide transaction index.
#[derive(Clone, Debug)]
#[repr(C)]
pub struct ReplicaDeshredTransactionInfoV2<'a> {
/// The transaction signature, used for identifying the transaction.
pub signature: &'a Signature,

/// Indicates if the transaction is a simple vote transaction.
pub is_vote: bool,

/// The versioned transaction.
pub transaction: &'a VersionedTransaction,

/// Addresses loaded from address lookup tables for V0 transactions.
pub loaded_addresses: Option<&'a LoadedAddresses>,

/// The inclusive starting shred index of the completed data set containing this transaction.
pub completed_data_set_starting_shred_index: u32,

/// The exclusive ending shred index of the completed data set containing this transaction.
pub completed_data_set_ending_shred_index_exclusive: u32,
}

/// A wrapper to future-proof ReplicaDeshredTransactionInfo handling.
#[repr(u32)]
pub enum ReplicaDeshredTransactionInfoVersions<'a> {
V0_0_1(&'a ReplicaDeshredTransactionInfo<'a>),
V0_0_2(&'a ReplicaDeshredTransactionInfoV2<'a>),
}

#[derive(Clone, Debug)]
Expand Down
10 changes: 7 additions & 3 deletions geyser-plugin-manager/src/deshred_transaction_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use {
crate::geyser_plugin_manager::GeyserPluginManager,
agave_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaDeshredTransactionInfo, ReplicaDeshredTransactionInfoVersions,
ReplicaDeshredTransactionInfoV2, ReplicaDeshredTransactionInfoVersions,
},
arc_swap::ArcSwap,
log::*,
Expand All @@ -29,6 +29,8 @@ impl DeshredTransactionNotifier for DeshredTransactionNotifierImpl {
fn notify_deshred_transaction(
&self,
slot: Slot,
completed_data_set_starting_shred_index: u32,
completed_data_set_ending_shred_index_exclusive: u32,
signature: &Signature,
is_vote: bool,
transaction: &VersionedTransaction,
Expand All @@ -42,19 +44,21 @@ impl DeshredTransactionNotifier for DeshredTransactionNotifierImpl {

let mut measure =
Measure::start("geyser-plugin-notify_plugins_of_deshred_transaction_info");
let transaction_info = ReplicaDeshredTransactionInfo {
let transaction_info = ReplicaDeshredTransactionInfoV2 {
signature,
is_vote,
transaction,
loaded_addresses,
completed_data_set_starting_shred_index,
completed_data_set_ending_shred_index_exclusive,
};

for plugin in plugin_manager.plugins.iter() {
if !plugin.deshred_transaction_notifications_enabled() {
continue;
}
match plugin.notify_deshred_transaction(
ReplicaDeshredTransactionInfoVersions::V0_0_1(&transaction_info),
ReplicaDeshredTransactionInfoVersions::V0_0_2(&transaction_info),
slot,
) {
Err(err) => {
Expand Down
46 changes: 44 additions & 2 deletions geyser-plugin-manager/src/geyser_plugin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,8 @@ mod tests {
geyser_plugin_service::ARC_TRY_UNWRAP_ATTEMPT_SLEEP_DURATION,
},
agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, ReplicaDeshredTransactionInfoVersions, Result as PluginResult,
GeyserPlugin, ReplicaDeshredTransactionInfo, ReplicaDeshredTransactionInfoVersions,
Result as PluginResult,
},
arc_swap::ArcSwap,
libloading::Library,
Expand Down Expand Up @@ -594,6 +595,8 @@ mod tests {
#[derive(Clone, Debug, PartialEq, Eq)]
struct RecordedDeshredNotification {
slot: Slot,
completed_data_set_starting_shred_index: u32,
completed_data_set_ending_shred_index_exclusive: u32,
signature: Signature,
is_vote: bool,
transaction: VersionedTransaction,
Expand All @@ -618,12 +621,18 @@ mod tests {
transaction: ReplicaDeshredTransactionInfoVersions,
slot: Slot,
) -> PluginResult<()> {
let ReplicaDeshredTransactionInfoVersions::V0_0_1(transaction) = transaction;
let ReplicaDeshredTransactionInfoVersions::V0_0_2(transaction) = transaction else {
panic!("expected V0_0_2 deshred transaction info");
};
self.notifications
.lock()
.unwrap()
.push(RecordedDeshredNotification {
slot,
completed_data_set_starting_shred_index: transaction
.completed_data_set_starting_shred_index,
completed_data_set_ending_shred_index_exclusive: transaction
.completed_data_set_ending_shred_index_exclusive,
signature: *transaction.signature,
is_vote: transaction.is_vote,
transaction: transaction.transaction.clone(),
Expand Down Expand Up @@ -820,6 +829,8 @@ mod tests {

notifier.notify_deshred_transaction(
11,
23,
31,
&transaction.signatures[0],
true,
&transaction,
Expand All @@ -829,6 +840,14 @@ mod tests {
let enabled_notifications = enabled_notifications.lock().unwrap().clone();
assert_eq!(enabled_notifications.len(), 1);
assert_eq!(enabled_notifications[0].slot, 11);
assert_eq!(
enabled_notifications[0].completed_data_set_starting_shred_index,
23
);
assert_eq!(
enabled_notifications[0].completed_data_set_ending_shred_index_exclusive,
31
);
assert_eq!(
enabled_notifications[0].signature,
transaction.signatures[0]
Expand All @@ -842,6 +861,29 @@ mod tests {
assert!(disabled_notifications.lock().unwrap().is_empty());
}

#[test]
#[should_panic(expected = "expected V0_0_2 deshred transaction info")]
fn test_deshred_test_plugin_panics_on_legacy_deshred_info_version() {
let plugin = DeshredTestPlugin {
name: DUMMY_NAME,
enabled: true,
alt_resolution_enabled: false,
notifications: Arc::new(Mutex::new(Vec::new())),
};
let transaction = sample_transaction();
let deshred_info = ReplicaDeshredTransactionInfo {
signature: &transaction.signatures[0],
is_vote: false,
transaction: &transaction,
loaded_addresses: None,
};

let _ = plugin.notify_deshred_transaction(
ReplicaDeshredTransactionInfoVersions::V0_0_1(&deshred_info),
11,
);
}

#[test]
fn test_deshred_transaction_alt_resolution_enabled() {
let empty_manager = GeyserPluginManager::default();
Expand Down
Loading
Loading