Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion accounts-db/src/account_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,29 @@ impl AccountStorage {
pub(crate) fn len(&self) -> usize {
self.map.len()
}

/// Returns the (slot, storage) tuples where `predicate` returns `true`
///
/// This function is useful when not all storages are desired,
/// as storages are only Arc::cloned if they pass the predicate.
///
/// # Panics
///
/// Panics if `shrink` is in progress.
pub fn get_if(
&self,
predicate: impl Fn(&Slot, &AccountStorageEntry) -> bool,
) -> Box<[(Slot, Arc<AccountStorageEntry>)]> {
assert!(self.no_shrink_in_progress());
Copy link
Copy Markdown

@brooksprumo brooksprumo Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert does exist in the old version as well.

In the old version we do:

/// Get storages to use for snapshots, for the requested slots
pub fn get_snapshot_storages(
&self,
requested_slots: impl RangeBounds<Slot> + Sync,
) -> (Vec<Arc<AccountStorageEntry>>, Vec<Slot>) {
let mut m = Measure::start("get slots");
let mut slots_and_storages = self
.storage
.iter()

and that .iter() also has the assert:

/// iterate through all (slot, append-vec)
pub(crate) fn iter(&self) -> AccountStorageIter<'_> {
assert!(self.no_shrink_in_progress());
AccountStorageIter::new(self)
}

self.map
.iter()
.filter_map(|entry| {
let slot = entry.key();
let storage = &entry.value().storage;
predicate(slot, storage).then(|| (*slot, Arc::clone(storage)))
})
.collect()
}
}

/// iterate contents of AccountStorage without exposing internals
Expand Down Expand Up @@ -291,7 +314,11 @@ impl Default for AccountStorageStatus {

#[cfg(test)]
pub(crate) mod tests {
use {super::*, crate::accounts_file::AccountsFileProvider, std::path::Path};
use {
super::*,
crate::accounts_file::AccountsFileProvider,
std::{iter, path::Path},
};

#[test]
fn test_shrink_in_progress() {
Expand Down Expand Up @@ -568,4 +595,52 @@ pub(crate) mod tests {
.is_none());
assert!(storage.get_account_storage_entry(slot, id).is_some());
}

#[test]
fn test_get_if() {
let storage = AccountStorage::default();
assert!(storage.get_if(|_, _| true).is_empty());

// add some entries
let ids = [123, 456, 789];
for id in ids {
let slot = id as Slot;
let entry = AccountStorageEntry::new(
Path::new(""),
slot,
id,
5000,
AccountsFileProvider::AppendVec,
);
storage.map.insert(
slot,
AccountStorageReference {
id,
storage: entry.into(),
},
);
}

// look 'em up
for id in ids {
let found = storage.get_if(|slot, _| *slot == id as Slot);
assert!(found
.iter()
.map(|(slot, _)| *slot)
.eq(iter::once(id as Slot)));
}

assert!(storage.get_if(|_, _| false).is_empty());
assert_eq!(storage.get_if(|_, _| true).len(), ids.len());
}

#[test]
#[should_panic(expected = "self.no_shrink_in_progress()")]
fn test_get_if_fail() {
let storage = AccountStorage::default();
storage
.shrink_in_progress_map
.insert(0, storage.get_test_storage());
storage.get_if(|_, _| true);
}
}
55 changes: 19 additions & 36 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8278,44 +8278,27 @@ impl AccountsDb {
&self,
requested_slots: impl RangeBounds<Slot> + Sync,
) -> (Vec<Arc<AccountStorageEntry>>, Vec<Slot>) {
let mut m = Measure::start("get slots");
let mut slots_and_storages = self
let start = Instant::now();
let max_alive_root_exclusive = self
.accounts_index
.roots_tracker
.read()
.unwrap()
.alive_roots
.max_exclusive();
let (slots, storages) = self
.storage
.iter()
.filter_map(|(slot, store)| {
requested_slots
.contains(&slot)
.then_some((slot, Some(store)))
.get_if(|slot, storage| {
(*slot < max_alive_root_exclusive)
&& requested_slots.contains(slot)
&& storage.has_accounts()
})
.collect::<Vec<_>>();
m.stop();
let mut m2 = Measure::start("filter");
let chunk_size = 5_000;
let (result, slots): (Vec<_>, Vec<_>) = self.thread_pool_clean.install(|| {
slots_and_storages
.par_chunks_mut(chunk_size)
.map(|slots_and_storages| {
slots_and_storages
.iter_mut()
.filter(|(slot, _)| self.accounts_index.is_alive_root(*slot))
.filter_map(|(slot, store)| {
let store = std::mem::take(store).unwrap();
store.has_accounts().then_some((store, *slot))
})
.collect::<Vec<(Arc<AccountStorageEntry>, Slot)>>()
})
.flatten()
.unzip()
});

m2.stop();

debug!(
"hash_total: get slots: {}, filter: {}",
m.as_us(),
m2.as_us(),
);
(result, slots)
.into_vec()
.into_iter()
.unzip();
Comment on lines +8296 to +8298
Copy link
Copy Markdown

@brooksprumo brooksprumo Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This .into_vec() won't allocate, it'll steal the allocation from the Box, so we're good there.

For the unzip, we can refactor that as well. I'd love to do that. I didn't want to do that in a backport though. So we keep the same return types and keep the PR small. (Note in the original we also unzip, so we're not adding an unzip here in the backport. We also know that the backport is faster too.)

let duration = start.elapsed();
debug!("get_snapshot_storages: {duration:?}");
(storages, slots)
}

/// Returns the latest full snapshot slot
Expand Down