Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 54 additions & 40 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2532,11 +2532,28 @@ impl AccountsDb {
is_startup: bool,
timings: &mut CleanKeyTimings,
epoch_schedule: &EpochSchedule,
old_storages_policy: OldStoragesPolicy,
) -> CleaningCandidates {
let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
let mut dirty_store_processing_time = Measure::start("dirty_store_processing");
let max_slot_inclusive =
max_clean_root_inclusive.unwrap_or_else(|| self.accounts_index.max_root_inclusive());
let max_root_inclusive = self.accounts_index.max_root_inclusive();
let max_slot_inclusive = max_clean_root_inclusive.unwrap_or(max_root_inclusive);

if old_storages_policy == OldStoragesPolicy::Clean {
let slot_one_epoch_old =
max_root_inclusive.saturating_sub(epoch_schedule.slots_per_epoch);
// do nothing special for these 100 old storages that will likely get cleaned up shortly
let acceptable_straggler_slot_count = 100;
let old_slot_cutoff =
slot_one_epoch_old.saturating_sub(acceptable_straggler_slot_count);
let (old_storages, old_slots) = self.get_snapshot_storages(..old_slot_cutoff);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was originally worried about the cost of getting these snapshots, but that worry was from about 3 years ago. I assume this isn't too expensive to run every clean? Clean runs only once per 100s or so, so I imagine this is fine. Not sure how else you'd get this. I was piggybacking off the storages collected for the hash of all accounts previously.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it would be nice to not fetch the storages unnecessarily. I've been running this on a validator for a few hours now.

Yes, clean normally runs once every ~45-50 seconds (~100 slots). The median time to get the storages here has been 23 milliseconds, with a common range of 21-28 milliseconds.

Another option would be to inspect these old storages less often.

It is nice that once we skip rewrites this will all go away, and that does feel close now.

let num_old_storages = old_storages.len();
for (old_slot, old_storage) in std::iter::zip(old_slots, old_storages) {
self.dirty_stores.entry(old_slot).or_insert(old_storage);
}
info!("Marked {num_old_storages} old storages as dirty");
}

let mut dirty_stores = Vec::with_capacity(self.dirty_stores.len());
// find the oldest dirty slot
// we'll add logging if that append vec cannot be marked dead
Expand Down Expand Up @@ -2648,7 +2665,16 @@ impl AccountsDb {

/// Call clean_accounts() with the common parameters that tests/benches use.
pub fn clean_accounts_for_tests(&self) {
    self.clean_accounts(
        None,
        false,
        &EpochSchedule::default(),
        // When ancient storages are enabled (offset is set), old storages are
        // expected to persist, so leave them alone; otherwise clean them so
        // tests observe the historical (non-ancient) behavior.
        if self.ancient_append_vec_offset.is_some() {
            OldStoragesPolicy::Leave
        } else {
            OldStoragesPolicy::Clean
        },
    )
}

/// called with cli argument to verify refcounts are correct on all accounts
Expand Down Expand Up @@ -2755,6 +2781,7 @@ impl AccountsDb {
max_clean_root_inclusive: Option<Slot>,
is_startup: bool,
epoch_schedule: &EpochSchedule,
old_storages_policy: OldStoragesPolicy,
) {
if self.exhaustively_verify_refcounts {
self.exhaustively_verify_refcounts(max_clean_root_inclusive);
Expand All @@ -2776,6 +2803,7 @@ impl AccountsDb {
is_startup,
&mut key_timings,
epoch_schedule,
old_storages_policy,
);

let num_candidates = Self::count_pubkeys(&candidates);
Expand Down Expand Up @@ -4700,7 +4728,15 @@ impl AccountsDb {
let maybe_clean = || {
if self.dirty_stores.len() > DIRTY_STORES_CLEANING_THRESHOLD {
let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
self.clean_accounts(latest_full_snapshot_slot, is_startup, epoch_schedule);
self.clean_accounts(
latest_full_snapshot_slot,
is_startup,
epoch_schedule,
// Leave any old storages alone for now. Once the validator is running
// normally, calls to clean_accounts() will have the correct policy based
// on whether ancient storages are enabled or not.
OldStoragesPolicy::Leave,
);
}
};

Expand Down Expand Up @@ -6874,40 +6910,6 @@ impl AccountsDb {
true
}

/// storages are sorted by slot and have range info.
/// add all stores older than slots_per_epoch to dirty_stores so clean visits these slots
fn mark_old_slots_as_dirty(
    &self,
    storages: &SortedStorages,
    slots_per_epoch: Slot,
    stats: &mut crate::accounts_hash::HashStats,
) {
    // Nothing to do if ancient append vecs are enabled.
    // Ancient slots will be visited by the ancient append vec code and dealt with correctly.
    // we expect these ancient append vecs to be old and keeping accounts
    // We can expect the normal processes will keep them cleaned.
    // If we included them here then ALL accounts in ALL ancient append vecs will be visited by clean each time.
    if self.ancient_append_vec_offset.is_some() {
        return;
    }

    let mut mark_time = Measure::start("mark_time");
    // do nothing special for these old stores which will likely get cleaned up shortly
    let straggler_slot_count: Slot = 100;
    let newest_slot = storages.max_slot_inclusive();
    // everything strictly older than (newest - epoch - stragglers) is marked dirty
    let dirty_cutoff = newest_slot.saturating_sub(slots_per_epoch + straggler_slot_count);

    let mut dirty_slot_count: usize = 0;
    for (slot, maybe_storage) in storages.iter_range(&(..dirty_cutoff)) {
        if let Some(storage) = maybe_storage {
            self.dirty_stores.insert(slot, storage.clone());
            dirty_slot_count += 1;
        }
    }

    mark_time.stop();
    stats.mark_time_us = mark_time.as_us();
    stats.num_dirty_slots = dirty_slot_count;
}

pub fn calculate_accounts_hash_from(
&self,
data_source: CalcAccountsHashDataSource,
Expand Down Expand Up @@ -7248,8 +7250,6 @@ impl AccountsDb {
let storages_start_slot = storages.range().start;
stats.oldest_root = storages_start_slot;

self.mark_old_slots_as_dirty(storages, config.epoch_schedule.slots_per_epoch, &mut stats);

let slot = storages.max_slot_inclusive();
let use_bg_thread_pool = config.use_bg_thread_pool;
let accounts_hash_cache_path = self.accounts_hash_cache_path.clone();
Expand Down Expand Up @@ -9345,6 +9345,20 @@ pub(crate) enum UpdateIndexThreadSelection {
PoolWithThreshold,
}

/// How should old storages be handled in clean_accounts()?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum OldStoragesPolicy {
    /// Clean all old storages, even if they were not explicitly marked as dirty.
    ///
    /// This is the default behavior when not skipping rewrites.
    Clean,
    /// Leave all old storages.
    ///
    /// When skipping rewrites, we intentionally will have ancient storages.
    /// Do not clean them up automatically in clean_accounts().
    Leave,
}

// These functions/fields are only usable from a dev context (i.e. tests and benches)
#[cfg(feature = "dev-context-only-utils")]
impl AccountStorageEntry {
Expand Down
93 changes: 79 additions & 14 deletions accounts-db/src/accounts_db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1953,13 +1953,23 @@ fn test_clean_max_slot_zero_lamport_account() {
// updates in later slots in slot 1
assert_eq!(accounts.alive_account_count_in_slot(0), 1);
assert_eq!(accounts.alive_account_count_in_slot(1), 1);
accounts.clean_accounts(Some(0), false, &EpochSchedule::default());
accounts.clean_accounts(
Some(0),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts.alive_account_count_in_slot(0), 1);
assert_eq!(accounts.alive_account_count_in_slot(1), 1);
assert!(accounts.accounts_index.contains_with(&pubkey, None, None));

// Now the account can be cleaned up
accounts.clean_accounts(Some(1), false, &EpochSchedule::default());
accounts.clean_accounts(
Some(1),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts.alive_account_count_in_slot(0), 0);
assert_eq!(accounts.alive_account_count_in_slot(1), 0);

Expand Down Expand Up @@ -3488,7 +3498,12 @@ fn test_zero_lamport_new_root_not_cleaned() {
db.add_root_and_flush_write_cache(1);

// Only clean zero lamport accounts up to slot 0
db.clean_accounts(Some(0), false, &EpochSchedule::default());
db.clean_accounts(
Some(0),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// Should still be able to find zero lamport account in slot 1
assert_eq!(
Expand Down Expand Up @@ -4701,7 +4716,12 @@ fn run_test_shrink_unref(do_intra_cache_clean: bool) {
db.calculate_accounts_delta_hash(1);

// Clean to remove outdated entry from slot 0
db.clean_accounts(Some(1), false, &EpochSchedule::default());
db.clean_accounts(
Some(1),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// Shrink Slot 0
{
Expand All @@ -4720,7 +4740,12 @@ fn run_test_shrink_unref(do_intra_cache_clean: bool) {
// Should be one store before clean for slot 0
db.get_and_assert_single_storage(0);
db.calculate_accounts_delta_hash(2);
db.clean_accounts(Some(2), false, &EpochSchedule::default());
db.clean_accounts(
Some(2),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// No stores should exist for slot 0 after clean
assert_no_storages_at_slot(&db, 0);
Expand Down Expand Up @@ -4765,7 +4790,7 @@ fn test_clean_drop_dead_zero_lamport_single_ref_accounts() {
accounts_db.calculate_accounts_delta_hash(1);

// run clean
accounts_db.clean_accounts(Some(1), false, &epoch_schedule);
accounts_db.clean_accounts(Some(1), false, &epoch_schedule, OldStoragesPolicy::Leave);

// After clean, both slot0 and slot1 should be marked dead and dropped
// from the store map.
Expand Down Expand Up @@ -4797,7 +4822,12 @@ fn test_clean_drop_dead_storage_handle_zero_lamport_single_ref_accounts() {

// Clean should mark slot 0 dead and drop it. During the dropping, it
// will find that slot 1 has a single ref zero accounts and mark it.
db.clean_accounts(Some(1), false, &EpochSchedule::default());
db.clean_accounts(
Some(1),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// Assert that after clean, slot 0 is dropped.
assert!(db.storage.get_slot_storage_entry(0).is_none());
Expand Down Expand Up @@ -4838,7 +4868,12 @@ fn test_shrink_unref_handle_zero_lamport_single_ref_accounts() {
db.calculate_accounts_delta_hash(1);

// Clean to remove outdated entry from slot 0
db.clean_accounts(Some(1), false, &EpochSchedule::default());
db.clean_accounts(
Some(1),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// Shrink Slot 0
{
Expand Down Expand Up @@ -4872,7 +4907,12 @@ fn test_shrink_unref_handle_zero_lamport_single_ref_accounts() {
db.get_and_assert_single_storage(0);
db.get_and_assert_single_storage(1);
db.calculate_accounts_delta_hash(2);
db.clean_accounts(Some(2), false, &EpochSchedule::default());
db.clean_accounts(
Some(2),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// No stores should exist for slot 0 after clean
assert_no_storages_at_slot(&db, 0);
Expand Down Expand Up @@ -5690,15 +5730,30 @@ define_accounts_db_test!(
assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 3);

accounts_db.set_latest_full_snapshot_slot(slot2);
accounts_db.clean_accounts(Some(slot2), false, &EpochSchedule::default());
accounts_db.clean_accounts(
Some(slot2),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 2);

accounts_db.set_latest_full_snapshot_slot(slot2);
accounts_db.clean_accounts(None, false, &EpochSchedule::default());
accounts_db.clean_accounts(
None,
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 1);

accounts_db.set_latest_full_snapshot_slot(slot3);
accounts_db.clean_accounts(None, false, &EpochSchedule::default());
accounts_db.clean_accounts(
None,
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 0);
}
);
Expand Down Expand Up @@ -8007,7 +8062,12 @@ define_accounts_db_test!(test_calculate_incremental_accounts_hash, |accounts_db|

// calculate the full accounts hash
let full_accounts_hash = {
accounts_db.clean_accounts(Some(slot - 1), false, &EpochSchedule::default());
accounts_db.clean_accounts(
Some(slot - 1),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
let (storages, _) = accounts_db.get_snapshot_storages(..=slot);
let storages = SortedStorages::new(&storages);
accounts_db.calculate_accounts_hash(
Expand Down Expand Up @@ -8073,7 +8133,12 @@ define_accounts_db_test!(test_calculate_incremental_accounts_hash, |accounts_db|
// calculate the incremental accounts hash
let incremental_accounts_hash = {
accounts_db.set_latest_full_snapshot_slot(full_accounts_hash_slot);
accounts_db.clean_accounts(Some(slot - 1), false, &EpochSchedule::default());
accounts_db.clean_accounts(
Some(slot - 1),
false,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
let (storages, _) = accounts_db.get_snapshot_storages(full_accounts_hash_slot + 1..=slot);
let storages = SortedStorages::new(&storages);
accounts_db.calculate_incremental_accounts_hash(
Expand Down
Loading