Skip to content

Commit 9d39a36

Browse files
authored
geyser: Uses scan_accounts_for_geyser() at startup (anza-xyz#5069)
1 parent 883afec commit 9d39a36

File tree

4 files changed

+80
-36
lines changed

4 files changed

+80
-36
lines changed

accounts-db/src/accounts_db/geyser_plugin_utils.rs

+13-19
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use {
2-
crate::{account_storage::meta::StoredAccountMeta, accounts_db::AccountsDb},
2+
crate::{accounts_db::AccountsDb, accounts_update_notifier_interface::AccountForGeyser},
33
solana_account::AccountSharedData,
44
solana_clock::Slot,
55
solana_measure::measure::Measure,
@@ -92,7 +92,7 @@ impl AccountsDb {
9292
let mut pubkeys = HashSet::new();
9393

9494
// populate `accounts_duplicate` for any pubkeys that are in this storage twice.
95-
// Storages cannot return `StoredAccountMeta<'_>` for more than 1 account at a time, so we have to do 2 passes to make sure
95+
// Storages cannot return `AccountForGeyser` for more than 1 account at a time, so we have to do 2 passes to make sure
9696
// we don't have duplicate pubkeys.
9797
let mut i = 0;
9898
storage_entry.accounts.scan_pubkeys(|pubkey| {
@@ -104,14 +104,14 @@ impl AccountsDb {
104104

105105
// now, actually notify geyser
106106
let mut i = 0;
107-
storage_entry.accounts.scan_accounts(|account| {
107+
storage_entry.accounts.scan_accounts_for_geyser(|account| {
108108
i += 1;
109109
account_len += 1;
110-
if notified_accounts.contains(account.pubkey()) {
110+
if notified_accounts.contains(account.pubkey) {
111111
notify_stats.skipped_accounts += 1;
112112
return;
113113
}
114-
if let Some(highest_i) = accounts_duplicate.get(account.pubkey()) {
114+
if let Some(highest_i) = accounts_duplicate.get(account.pubkey) {
115115
if highest_i != &i {
116116
// this pubkey is in this storage twice and the current instance is not the last one, so we skip it.
117117
// We only send unique accounts in this slot to `notify_filtered_accounts`
@@ -140,7 +140,7 @@ impl AccountsDb {
140140
slot: Slot,
141141
write_version: u64,
142142
notified_accounts: &mut HashSet<Pubkey>,
143-
accounts_to_stream: impl Iterator<Item = StoredAccountMeta<'a>>,
143+
accounts_to_stream: impl Iterator<Item = AccountForGeyser<'a>>,
144144
notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
145145
) {
146146
let notifier = self.accounts_update_notifier.as_ref().unwrap();
@@ -153,7 +153,7 @@ impl AccountsDb {
153153
notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
154154

155155
let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
156-
notified_accounts.insert(*account.pubkey());
156+
notified_accounts.insert(*account.pubkey);
157157
measure_bookkeep.stop();
158158
notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
159159

@@ -167,18 +167,12 @@ impl AccountsDb {
167167
#[cfg(test)]
168168
pub mod tests {
169169
use {
170-
crate::{
171-
account_storage::meta::StoredAccountMeta,
172-
accounts_db::AccountsDb,
173-
accounts_update_notifier_interface::{
174-
AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
175-
},
170+
super::*,
171+
crate::accounts_update_notifier_interface::{
172+
AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
176173
},
177174
dashmap::DashMap,
178-
solana_account::{AccountSharedData, ReadableAccount},
179-
solana_clock::Slot,
180-
solana_pubkey::Pubkey,
181-
solana_transaction::sanitized::SanitizedTransaction,
175+
solana_account::ReadableAccount as _,
182176
std::sync::{
183177
atomic::{AtomicBool, Ordering},
184178
Arc,
@@ -223,10 +217,10 @@ pub mod tests {
223217
&self,
224218
slot: Slot,
225219
_write_version: u64,
226-
account: &StoredAccountMeta,
220+
account: &AccountForGeyser<'_>,
227221
) {
228222
self.accounts_notified
229-
.entry(*account.pubkey())
223+
.entry(*account.pubkey)
230224
.or_default()
231225
.push((slot, account.to_account_shared_data()));
232226
}

accounts-db/src/accounts_file.rs

+21-1
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@ use {
33
account_info::AccountInfo,
44
account_storage::meta::StoredAccountMeta,
55
accounts_db::AccountsFileId,
6+
accounts_update_notifier_interface::AccountForGeyser,
67
append_vec::{AppendVec, AppendVecError, IndexInfo},
78
storable_accounts::StorableAccounts,
89
tiered_storage::{
910
error::TieredStorageError, hot::HOT_FORMAT, index::IndexOffset, TieredStorage,
1011
},
1112
},
12-
solana_account::AccountSharedData,
13+
solana_account::{AccountSharedData, ReadableAccount as _},
1314
solana_clock::Slot,
1415
solana_pubkey::Pubkey,
1516
std::{
@@ -233,6 +234,25 @@ impl AccountsFile {
233234
}
234235
}
235236

237+
/// Iterate over all accounts and call `callback` with each account.
238+
/// Only intended to be used by Geyser.
239+
pub fn scan_accounts_for_geyser(
240+
&self,
241+
mut callback: impl for<'local> FnMut(AccountForGeyser<'local>),
242+
) {
243+
self.scan_accounts(|stored_account_meta| {
244+
let account_for_geyser = AccountForGeyser {
245+
pubkey: stored_account_meta.pubkey(),
246+
lamports: stored_account_meta.lamports(),
247+
owner: stored_account_meta.owner(),
248+
executable: stored_account_meta.executable(),
249+
rent_epoch: stored_account_meta.rent_epoch(),
250+
data: stored_account_meta.data(),
251+
};
252+
callback(account_for_geyser)
253+
})
254+
}
255+
236256
/// for each offset in `sorted_offsets`, return the account size
237257
pub(crate) fn get_account_sizes(&self, sorted_offsets: &[usize]) -> Vec<usize> {
238258
match self {

accounts-db/src/accounts_update_notifier_interface.rs

+34-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use {
2-
crate::account_storage::meta::StoredAccountMeta, solana_account::AccountSharedData,
3-
solana_clock::Slot, solana_pubkey::Pubkey, solana_transaction::sanitized::SanitizedTransaction,
2+
solana_account::{AccountSharedData, ReadableAccount},
3+
solana_clock::{Epoch, Slot},
4+
solana_pubkey::Pubkey,
5+
solana_transaction::sanitized::SanitizedTransaction,
46
std::sync::Arc,
57
};
68

@@ -24,11 +26,40 @@ pub trait AccountsUpdateNotifierInterface: std::fmt::Debug {
2426
&self,
2527
slot: Slot,
2628
write_version: u64,
27-
account: &StoredAccountMeta,
29+
account: &AccountForGeyser<'_>,
2830
);
2931

3032
/// Notified when all accounts have been notified when restoring from a snapshot.
3133
fn notify_end_of_restore_from_snapshot(&self);
3234
}
3335

3436
pub type AccountsUpdateNotifier = Arc<dyn AccountsUpdateNotifierInterface + Sync + Send>;
37+
38+
/// Account type with only the fields necessary for Geyser
39+
#[derive(Debug, Clone)]
40+
pub struct AccountForGeyser<'a> {
41+
pub pubkey: &'a Pubkey,
42+
pub lamports: u64,
43+
pub owner: &'a Pubkey,
44+
pub executable: bool,
45+
pub rent_epoch: Epoch,
46+
pub data: &'a [u8],
47+
}
48+
49+
impl ReadableAccount for AccountForGeyser<'_> {
50+
fn lamports(&self) -> u64 {
51+
self.lamports
52+
}
53+
fn data(&self) -> &[u8] {
54+
self.data
55+
}
56+
fn owner(&self) -> &Pubkey {
57+
self.owner
58+
}
59+
fn executable(&self) -> bool {
60+
self.executable
61+
}
62+
fn rent_epoch(&self) -> Epoch {
63+
self.rent_epoch
64+
}
65+
}

geyser-plugin-manager/src/accounts_update_notifier.rs

+12-13
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ use {
66
},
77
log::*,
88
solana_account::{AccountSharedData, ReadableAccount},
9-
solana_accounts_db::{
10-
account_storage::meta::StoredAccountMeta,
11-
accounts_update_notifier_interface::AccountsUpdateNotifierInterface,
9+
solana_accounts_db::accounts_update_notifier_interface::{
10+
AccountForGeyser, AccountsUpdateNotifierInterface,
1211
},
1312
solana_clock::Slot,
1413
solana_measure::measure::Measure,
@@ -45,12 +44,12 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
4544
&self,
4645
slot: Slot,
4746
write_version: u64,
48-
account: &StoredAccountMeta,
47+
account: &AccountForGeyser<'_>,
4948
) {
5049
let mut measure_all = Measure::start("geyser-plugin-notify-account-restore-all");
5150
let mut measure_copy = Measure::start("geyser-plugin-copy-stored-account-info");
5251

53-
let mut account = self.accountinfo_from_stored_account_meta(account);
52+
let mut account = self.accountinfo_from_account_for_geyser(account);
5453
account.write_version = write_version;
5554
measure_copy.stop();
5655

@@ -135,17 +134,17 @@ impl AccountsUpdateNotifierImpl {
135134
}
136135
}
137136

138-
fn accountinfo_from_stored_account_meta<'a>(
137+
fn accountinfo_from_account_for_geyser<'a>(
139138
&self,
140-
stored_account_meta: &'a StoredAccountMeta,
139+
account: &'a AccountForGeyser<'_>,
141140
) -> ReplicaAccountInfoV3<'a> {
142141
ReplicaAccountInfoV3 {
143-
pubkey: stored_account_meta.pubkey().as_ref(),
144-
lamports: stored_account_meta.lamports(),
145-
owner: stored_account_meta.owner().as_ref(),
146-
executable: stored_account_meta.executable(),
147-
rent_epoch: stored_account_meta.rent_epoch(),
148-
data: stored_account_meta.data(),
142+
pubkey: account.pubkey.as_ref(),
143+
lamports: account.lamports(),
144+
owner: account.owner().as_ref(),
145+
executable: account.executable(),
146+
rent_epoch: account.rent_epoch(),
147+
data: account.data(),
149148
write_version: 0, // can/will be populated afterwards
150149
txn: None,
151150
}

0 commit comments

Comments
 (0)