Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions accounts-db/src/account_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
solana_clock::Slot,
solana_nohash_hasher::{BuildNoHashHasher, IntMap},
std::{
collections::VecDeque,
ops::{Index, Range},
sync::{
atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -410,6 +411,35 @@ impl<'a> AccountStoragesConcurrentConsumer<'a> {
None
}
}

/// Top up `chunk` with the next run of consecutive `AccountStorageEntry` items.
///
/// The number of positions claimed from the shared cursor equals the free space left in
/// `chunk` (`capacity() - len()`), so the chunk never grows past its current capacity and
/// no reallocation takes place. Fewer items (possibly none) are appended when the claimed
/// range extends past the end of the ordered entries.
///
/// Returns an iterator over just the items appended by this call; it only borrows from
/// `chunk`, so consuming it leaves the chunk's contents untouched.
pub fn take_up_to_capacity<'b>(
    &'a self,
    chunk: &'b mut VecDeque<NextItem<'a>>,
) -> impl Iterator<Item = &'b NextItem<'a>> + 'b {
    let first_new = chunk.len();
    let free_slots = chunk.capacity() - first_new;

    // Claim `free_slots` positions in one atomic step; the returned value is the first
    // position owned by this call.
    let claimed_start = self
        .current_position
        .fetch_add(free_slots, Ordering::Relaxed);
    // Clamp the claimed range to the entries that actually exist. When the cursor is
    // already past the end this yields an empty range.
    let total_entries = self.orderer.entries_len();
    let claimed_end = total_entries.min(claimed_start + free_slots);

    // At most `free_slots` items are produced, so this stays within existing capacity.
    chunk.extend((claimed_start..claimed_end).map(|position| NextItem {
        position,
        original_index: self.orderer.original_index(position),
        storage: &self.orderer[position],
    }));

    chunk.range(first_new..)
}
}

/// Value returned from calling `AccountStoragesConcurrentConsumer::next()`
Expand Down
48 changes: 39 additions & 9 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use {
active_stats::{ActiveStatItem, ActiveStats},
ancestors::Ancestors,
append_vec::{self, aligned_stored_size, STORE_META_OVERHEAD},
buffered_reader::RequiredLenBufFileRead,
buffered_reader::{FileBufRead as _, RequiredLenBufFileRead},
contains::Contains,
is_zero_lamport::IsZeroLamport,
obsolete_accounts::ObsoleteAccounts,
Expand Down Expand Up @@ -1969,12 +1969,12 @@ impl AccountsDb {
storages.retain(|s| s.slot() <= max_slot_inclusive);
// populate
storages.par_iter().for_each_init(
|| Box::new(append_vec::new_scan_accounts_reader()),
|| append_vec::new_scan_accounts_reader(0),
|reader, storage| {
let slot = storage.slot();
storage
.accounts
.scan_accounts(reader.as_mut(), |_offset, account| {
.scan_accounts(reader, |_offset, account| {
let pk = account.pubkey();
match pubkey_refcount.entry(*pk) {
dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) => {
Expand Down Expand Up @@ -3833,7 +3833,7 @@ impl AccountsDb {
})
}
ScanAccountStorageData::DataRefForStorage => {
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);
storage.scan_accounts(&mut reader, |_offset, account| {
let account_without_data = StoredAccountInfoWithoutData::new_from(&account);
storage_scan_func(retval, &account_without_data, Some(account.data));
Expand Down Expand Up @@ -6529,6 +6529,7 @@ impl AccountsDb {
pub fn generate_index(
&self,
limit_load_slot_count_from_snapshot: Option<usize>,
memlock_budget_size: usize,
verify: bool,
) -> IndexGenerationInfo {
let mut total_time = Measure::start("generate_index");
Expand Down Expand Up @@ -6592,11 +6593,15 @@ impl AccountsDb {
}

let mut total_accum = IndexGenerationAccumulator::new();
let storages_orderer =
AccountStoragesOrderer::with_random_order(&storages).into_concurrent_consumer();
// Balance ratio of small to large files to amortize per-operation cost across the
// whole scan process and keep total bytes in pending reads similar at all times.
let ratio = append_vec::SCAN_ACCOUNTS_SMALL_TO_LARGE_FILE_RATIO;
let storages_orderer = AccountStoragesOrderer::with_small_to_large_ratio(&storages, ratio)
.into_concurrent_consumer();
let exit_logger = AtomicBool::new(false);
let num_processed = AtomicU64::new(0);
let num_threads = num_cpus::get();
let per_thread_memlock_budget_size = memlock_budget_size / num_threads;
let mut index_time = Measure::start("index");
thread::scope(|s| {
let thread_handles = (0..num_threads)
Expand All @@ -6605,8 +6610,33 @@ impl AccountsDb {
.name(format!("solGenIndex{i:02}"))
.spawn_scoped(s, || {
let mut thread_accum = IndexGenerationAccumulator::new();
let mut reader = append_vec::new_scan_accounts_reader();
while let Some(next_item) = storages_orderer.next() {
let mut reader = append_vec::new_scan_accounts_reader(
per_thread_memlock_budget_size,
);
// Always consume whole small/large file cycles: the chunk holds two cycles, and
// once one of them has been read it is replenished with a single new cycle (half
// capacity), which is added to prefetch while the remaining cycle is being read.
let mut chunk = VecDeque::with_capacity(2 * (ratio.0 + ratio.1));

loop {
if chunk.is_empty() || chunk.len() == chunk.capacity() / 2 {
for new_item in storages_orderer.take_up_to_capacity(&mut chunk)
{
if let Some((file, size)) =
new_item.storage.accounts.file_io_info()
{
reader
.add_file_to_prefetch(file, size)
.expect("must prefetch accounts storage")
}
}
reader.submit_prefetch().expect("must submit prefetch");
}

let Some(next_item) = chunk.pop_front() else {
break;
};

self.maybe_throttle_index_generation();
let storage = next_item.storage;
let store_id = storage.id();
Expand Down Expand Up @@ -7372,7 +7402,7 @@ impl AccountsDb {
storages: &[Arc<AccountStorageEntry>],
mut callback: impl for<'local> FnMut(Offset, StoredAccountInfo<'local>),
) {
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);
for storage in storages {
storage
.accounts
Expand Down
2 changes: 1 addition & 1 deletion accounts-db/src/accounts_db/geyser_plugin_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub mod tests {
let notifier = GeyserTestPlugin::default();
let notifier = Arc::new(notifier);
accounts_db.set_geyser_plugin_notifier(Some(notifier.clone()));
accounts_db.generate_index(None, false);
accounts_db.generate_index(None, 0, false);

// Ensure key1 was notified twice in different slots
{
Expand Down
16 changes: 10 additions & 6 deletions accounts-db/src/accounts_db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ fn run_generate_index_duplicates_within_slot_test(db: AccountsDb, reverse: bool)
assert!(!db.accounts_index.contains(&pubkey));
let storage_info = StorageSizeAndCountMap::default();
let storage = db.get_storage_for_slot(slot0).unwrap();
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);
db.generate_index_for_slot(
&mut reader,
&storage,
Expand Down Expand Up @@ -221,7 +221,11 @@ fn test_generate_index_for_single_ref_zero_lamport_slot() {
let storable_accounts = (slot0, &data[..]);
append_vec.accounts.write_accounts(&storable_accounts, 0);
assert!(!db.accounts_index.contains(&pubkey));
let result = db.generate_index(None, false);
let result = db.generate_index(
None,
ACCOUNTS_DB_CONFIG_FOR_TESTING.memlock_budget_size,
false,
);
let entry = db.accounts_index.get_cloned(&pubkey).unwrap();
assert_eq!(entry.slot_list_lock_read_len(), 1);
assert_eq!(append_vec.alive_bytes(), aligned_stored_size(0));
Expand Down Expand Up @@ -4938,7 +4942,7 @@ define_accounts_db_test!(test_calculate_storage_count_and_alive_bytes, |accounts

let storage = accounts.storage.get_slot_storage_entry(slot0).unwrap();
let storage_info = StorageSizeAndCountMap::default();
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);
accounts.generate_index_for_slot(&mut reader, &storage, slot0, 0, &storage_info);
assert_eq!(storage_info.len(), 1);
for entry in storage_info.iter() {
Expand All @@ -4962,7 +4966,7 @@ define_accounts_db_test!(
// empty store
let storage = accounts.create_and_insert_store(0, 1, "test");
let storage_info = StorageSizeAndCountMap::default();
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);
accounts.generate_index_for_slot(&mut reader, &storage, 0, 0, &storage_info);
assert!(storage_info.is_empty());
}
Expand Down Expand Up @@ -4999,7 +5003,7 @@ define_accounts_db_test!(
);

let storage_info = StorageSizeAndCountMap::default();
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);
accounts.generate_index_for_slot(&mut reader, &storage, 0, 0, &storage_info);
assert_eq!(storage_info.len(), 1);
for entry in storage_info.iter() {
Expand Down Expand Up @@ -6132,7 +6136,7 @@ fn test_combine_ancient_slots_simple() {
fn get_all_accounts_from_storages<'a>(
storages: impl Iterator<Item = &'a Arc<AccountStorageEntry>>,
) -> Vec<(Pubkey, AccountSharedData)> {
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);
storages
.flat_map(|storage| {
let mut vec = Vec::default();
Expand Down
13 changes: 11 additions & 2 deletions accounts-db/src/accounts_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use {
solana_clock::Slot,
solana_pubkey::Pubkey,
std::{
fs::File,
mem,
path::{Path, PathBuf},
},
Expand Down Expand Up @@ -247,6 +248,14 @@ impl AccountsFile {
}
}

/// Return the `File` and its size if accounts are backed by file-io.
///
/// Returns `None` when no file-io handle is available for this accounts file. That
/// includes tiered storage, for which no file-io handle is provided here, so callers
/// that use this as an optional hint (e.g. to schedule prefetching) can skip such
/// storages instead of hitting a panic.
pub fn file_io_info(&self) -> Option<(&File, usize)> {
    match self {
        // Delegate to the append vec, which decides whether it is file-io backed.
        Self::AppendVec(av) => av.file_io_info(),
        // The return type already models "no file-io backing"; report that rather than
        // aborting the caller with `unimplemented!()`.
        Self::TieredStorage(_) => None,
    }
}

/// Iterate over all accounts and call `callback` with each account.
///
/// `callback` parameters:
Expand Down Expand Up @@ -279,7 +288,7 @@ impl AccountsFile {
/// as it can potentially read less and be faster.
pub(crate) fn scan_accounts<'a>(
&'a self,
reader: &mut impl RequiredLenBufFileRead<'a>,
reader: &mut dyn RequiredLenBufFileRead<'a>,
callback: impl for<'local> FnMut(Offset, StoredAccountInfo<'local>),
) -> Result<()> {
match self {
Expand All @@ -302,7 +311,7 @@ impl AccountsFile {
&self,
callback: impl for<'local> FnMut(StoredAccountMeta<'local>),
) -> Result<()> {
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);
match self {
Self::AppendVec(av) => av.scan_accounts_stored_meta(&mut reader, callback)?,
Self::TieredStorage(_) => {
Expand Down
6 changes: 3 additions & 3 deletions accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2107,7 +2107,7 @@ pub mod tests {
.map(|storage| storage.id())
.collect::<Vec<_>>()
);
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);

// assert that we wrote the 2_ref account to the newly shrunk append vec
let shrink_in_progress = shrinks_in_progress.first().unwrap().1;
Expand Down Expand Up @@ -2276,7 +2276,7 @@ pub mod tests {
(*account.pubkey(), account.to_account_shared_data())
})
.unwrap();
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);
let mut count = 0;
storage
.accounts
Expand Down Expand Up @@ -3217,7 +3217,7 @@ pub mod tests {
one.first().unwrap().1.old_storage().id(),
storages[combine_into].id()
);
let mut reader = append_vec::new_scan_accounts_reader();
let mut reader = append_vec::new_scan_accounts_reader(0);

// make sure the single new append vec contains all the same accounts
let mut two = Vec::default();
Expand Down
Loading
Loading