From c95b159a2c1981703b6cd2a7b54c099c0c79bbd4 Mon Sep 17 00:00:00 2001 From: carllin Date: Mon, 24 May 2021 13:51:17 -0700 Subject: [PATCH] Refactor purge_slots_from_cache_and_store() and handle_reclaims() (#17319) --- core/src/tvu.rs | 17 +- ledger/src/sigverify_shreds.rs | 24 +- runtime/src/accounts.rs | 5 +- runtime/src/accounts_background_service.rs | 7 +- runtime/src/accounts_db.rs | 408 ++++++++++++++------- runtime/src/accounts_index.rs | 14 + runtime/src/bank.rs | 35 +- upload-perf/src/upload-perf.rs | 1 - 8 files changed, 339 insertions(+), 172 deletions(-) diff --git a/core/src/tvu.rs b/core/src/tvu.rs index b0c5ba5a485..d264bedced0 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -35,8 +35,7 @@ use solana_ledger::{ }; use solana_runtime::{ accounts_background_service::{ - AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SendDroppedBankCallback, - SnapshotRequestHandler, + AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SnapshotRequestHandler, }, bank_forks::{BankForks, SnapshotConfig}, commitment::BlockCommitmentCache, @@ -237,10 +236,18 @@ impl Tvu { let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); // Before replay starts, set the callbacks in each of the banks in BankForks + // Note after this callback is created, only the AccountsBackgroundService should be calling + // AccountsDb::purge_slot() to clean up dropped banks. 
+ let callback = bank_forks + .read() + .unwrap() + .root_bank() + .rc + .accounts + .accounts_db + .create_drop_bank_callback(pruned_banks_sender); for bank in bank_forks.read().unwrap().banks().values() { - bank.set_callback(Some(Box::new(SendDroppedBankCallback::new( - pruned_banks_sender.clone(), - )))); + bank.set_callback(Some(Box::new(callback.clone()))); } let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender); diff --git a/ledger/src/sigverify_shreds.rs b/ledger/src/sigverify_shreds.rs index 6601f07a36d..bf756a8aabe 100644 --- a/ledger/src/sigverify_shreds.rs +++ b/ledger/src/sigverify_shreds.rs @@ -244,13 +244,11 @@ pub fn verify_shreds_gpu( shred_gpu_offsets(pubkeys_len, batches, recycler_cache); let mut out = recycler_cache.buffer().allocate().unwrap(); out.set_pinnable(); - elems.push( - perf_libs::Elems { - #![allow(clippy::cast_ptr_alignment)] - elems: pubkeys.as_ptr() as *const solana_sdk::packet::Packet, - num: num_packets as u32, - }, - ); + elems.push(perf_libs::Elems { + //#![allow(clippy::cast_ptr_alignment)] + elems: pubkeys.as_ptr() as *const solana_sdk::packet::Packet, + num: num_packets as u32, + }); for p in batches { elems.push(perf_libs::Elems { @@ -383,13 +381,11 @@ pub fn sign_shreds_gpu( let mut signatures_out = recycler_cache.buffer().allocate().unwrap(); signatures_out.set_pinnable(); signatures_out.resize(total_sigs * sig_size, 0); - elems.push( - perf_libs::Elems { - #![allow(clippy::cast_ptr_alignment)] - elems: pinned_keypair.as_ptr() as *const solana_sdk::packet::Packet, - num: num_keypair_packets as u32, - }, - ); + elems.push(perf_libs::Elems { + //#![allow(clippy::cast_ptr_alignment)] + elems: pinned_keypair.as_ptr() as *const solana_sdk::packet::Packet, + num: num_keypair_packets as u32, + }); for p in batches.iter() { elems.push(perf_libs::Elems { diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 16e9f991904..ee12598ce22 100644 --- a/runtime/src/accounts.rs +++ 
b/runtime/src/accounts.rs @@ -893,8 +893,9 @@ impl Accounts { /// Purge a slot if it is not a root /// Root slots cannot be purged - pub fn purge_slot(&self, slot: Slot) { - self.accounts_db.purge_slot(slot); + /// `is_from_abs` is true if the caller is the AccountsBackgroundService + pub fn purge_slot(&self, slot: Slot, is_from_abs: bool) { + self.accounts_db.purge_slot(slot, is_from_abs); } /// Add a slot to root. Root slots cannot be purged diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index 4522f8003b4..7698b6646fc 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -263,11 +263,12 @@ impl AbsRequestHandler { }) } - pub fn handle_pruned_banks(&self, bank: &Bank) -> usize { + /// `is_from_abs` is true if the caller is the AccountsBackgroundService + pub fn handle_pruned_banks(&self, bank: &Bank, is_from_abs: bool) -> usize { let mut count = 0; for pruned_slot in self.pruned_banks_receiver.try_iter() { count += 1; - bank.rc.accounts.purge_slot(pruned_slot); + bank.rc.accounts.purge_slot(pruned_slot, is_from_abs); } count @@ -393,7 +394,7 @@ impl AccountsBackgroundService { total_remove_slots_time: &mut u64, ) { let mut remove_slots_time = Measure::start("remove_slots_time"); - *removed_slots_count += request_handler.handle_pruned_banks(&bank); + *removed_slots_count += request_handler.handle_pruned_banks(&bank, true); remove_slots_time.stop(); *total_remove_slots_time += remove_slots_time.as_us(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 3c9590206ae..15d3899d3a4 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -19,6 +19,7 @@ //! commit for each slot entry would be indexed. 
use crate::{ + accounts_background_service::{DroppedSlotsSender, SendDroppedBankCallback}, accounts_cache::{AccountsCache, CachedAccount, SlotCache}, accounts_hash::{AccountsHash, CalculateHashIntermediate, HashStats}, accounts_index::{ @@ -31,7 +32,7 @@ use crate::{ use blake3::traits::digest::Digest; use dashmap::{ mapref::entry::Entry::{Occupied, Vacant}, - DashMap, DashSet, + DashMap, }; use lazy_static::lazy_static; use log::*; @@ -748,6 +749,7 @@ pub struct AccountsDb { /// to drive clean_accounts /// Generated by get_accounts_delta_hash uncleaned_pubkeys: DashMap>, + is_bank_drop_callback_enabled: AtomicBool, } #[derive(Debug, Default)] @@ -777,7 +779,8 @@ struct AccountsStats { struct PurgeStats { last_report: AtomicU64, safety_checks_elapsed: AtomicU64, - remove_storages_elapsed: AtomicU64, + remove_cache_elapsed: AtomicU64, + remove_storage_entries_elapsed: AtomicU64, drop_storage_entries_elapsed: AtomicU64, num_cached_slots_removed: AtomicUsize, num_stored_slots_removed: AtomicUsize, @@ -785,6 +788,9 @@ struct PurgeStats { total_removed_cached_bytes: AtomicU64, total_removed_stored_bytes: AtomicU64, recycle_stores_write_elapsed: AtomicU64, + scan_storages_elasped: AtomicU64, + purge_accounts_index_elapsed: AtomicU64, + handle_reclaims_elapsed: AtomicU64, } impl PurgeStats { @@ -812,8 +818,14 @@ impl PurgeStats { i64 ), ( - "remove_storages_elapsed", - self.remove_storages_elapsed.swap(0, Ordering::Relaxed) as i64, + "remove_cache_elapsed", + self.remove_cache_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "remove_storage_entries_elapsed", + self.remove_storage_entries_elapsed + .swap(0, Ordering::Relaxed) as i64, i64 ), ( @@ -852,6 +864,21 @@ impl PurgeStats { self.recycle_stores_write_elapsed.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "scan_storages_elasped", + self.scan_storages_elasped.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "purge_accounts_index_elapsed", + self.purge_accounts_index_elapsed.swap(0, Ordering::Relaxed) as 
i64, + i64 + ), + ( + "handle_reclaims_elapsed", + self.handle_reclaims_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), ); } } @@ -1094,6 +1121,7 @@ impl Default for AccountsDb { cluster_type: None, account_indexes: AccountSecondaryIndexes::default(), caching_enabled: false, + is_bank_drop_callback_enabled: AtomicBool::default(), } } } @@ -1220,7 +1248,7 @@ impl AccountsDb { self.handle_reclaims( &reclaims, None, - false, + Some(&self.clean_accounts_stats.purge_stats), Some(&mut reclaim_result), reset_accounts, ); @@ -1304,7 +1332,7 @@ impl AccountsDb { fn purge_keys_exact<'a, C: 'a>( &'a self, - pubkey_to_slot_set: &'a [(Pubkey, C)], + pubkey_to_slot_set: impl Iterator, ) -> Vec<(u64, AccountInfo)> where C: Contains<'a, Slot>, @@ -1594,12 +1622,20 @@ impl AccountsDb { }) .collect(); - let reclaims = self.purge_keys_exact(&pubkey_to_slot_set); + let reclaims = self.purge_keys_exact(pubkey_to_slot_set.iter()); // Don't reset from clean, since the pubkeys in those stores may need to be unref'ed // and those stores may be used for background hashing. let reset_accounts = false; - self.handle_reclaims(&reclaims, None, false, None, reset_accounts); + let mut reclaim_result = ReclaimResult::default(); + let reclaim_result = Some(&mut reclaim_result); + self.handle_reclaims( + &reclaims, + None, + Some(&self.clean_accounts_stats.purge_stats), + reclaim_result, + reset_accounts, + ); reclaims_time.stop(); @@ -1635,7 +1671,9 @@ impl AccountsDb { /// remove all the storage entries for `S`. /// /// # Arguments - /// * `reclaims` - The accounts to remove from storage entries' "count" + /// * `reclaims` - The accounts to remove from storage entries' "count". Note here + /// that we should not remove cache entries, only entries for accounts actually + /// stored in a storage entry. /// /// * `expected_single_dead_slot` - A correctness assertion. 
If this is equal to `Some(S)`, /// then the function will check that the only slot being cleaned up in `reclaims` @@ -1643,13 +1681,16 @@ impl AccountsDb { /// from store or slot shrinking, as those should only touch the slot they are /// currently storing to or shrinking. /// - /// * `no_dead_slot` - A correctness assertion. If this is equal to - /// `false`, the function will check that no slots are cleaned up/removed via - /// `process_dead_slots`. For instance, on store, no slots should be cleaned up, - /// but during the background clean accounts purges accounts from old rooted slots, - /// so outdated slots may be removed. + /// * `purge_stats` - The stats used to track performance of purging dead slots. This + /// also serves a correctness assertion. If `purge_stats.is_none()`, this implies + /// there can be no dead slots that happen as a result of this call, and the function + /// will check that no slots are cleaned up/removed via `process_dead_slots`. For instance, + /// on store, no slots should be cleaned up, but during the background clean accounts + /// purges accounts from old rooted slots, so outdated slots may be removed. + /// /// * `reclaim_result` - Information about accounts that were removed from storage, does /// not include accounts that were removed from the cache + /// /// * `reset_accounts` - Reset the append_vec store when the store is dead (count==0) /// From the clean and shrink paths it should be false since there may be an in-progress /// hash operation and the stores may hold accounts that need to be unref'ed. 
@@ -1657,7 +1698,9 @@ impl AccountsDb { &self, reclaims: SlotSlice, expected_single_dead_slot: Option, - no_dead_slot: bool, + // TODO: coalesce `purge_stats` and `reclaim_result` together into one option, as they + // are both either Some or None + purge_stats: Option<&PurgeStats>, reclaim_result: Option<&mut ReclaimResult>, reset_accounts: bool, ) { @@ -1676,7 +1719,7 @@ impl AccountsDb { reclaimed_offsets, reset_accounts, ); - if no_dead_slot { + if purge_stats.is_none() { assert!(dead_slots.is_empty()); } else if let Some(expected_single_dead_slot) = expected_single_dead_slot { assert!(dead_slots.len() <= 1); @@ -1684,7 +1727,10 @@ impl AccountsDb { assert!(dead_slots.contains(&expected_single_dead_slot)); } } - self.process_dead_slots(&dead_slots, purged_account_slots); + + if let Some(purge_stats) = purge_stats { + self.process_dead_slots(&dead_slots, purged_account_slots, purge_stats); + } } // Must be kept private!, does sensitive cleanup that should only be called from @@ -1693,6 +1739,7 @@ impl AccountsDb { &self, dead_slots: &HashSet, purged_account_slots: Option<&mut AccountSlots>, + purge_stats: &PurgeStats, ) { if dead_slots.is_empty() { return; @@ -1702,7 +1749,7 @@ impl AccountsDb { clean_dead_slots.stop(); let mut purge_removed_slots = Measure::start("reclaims::purge_removed_slots"); - self.purge_storage_slots(&dead_slots); + self.purge_dead_slots_from_storage(dead_slots.iter(), purge_stats); purge_removed_slots.stop(); // If the slot is dead, remove the need to shrink the storages as @@ -2532,7 +2579,20 @@ impl AccountsDb { .is_none()); } - pub fn purge_slot(&self, slot: Slot) { + pub fn create_drop_bank_callback( + &self, + pruned_banks_sender: DroppedSlotsSender, + ) -> SendDroppedBankCallback { + self.is_bank_drop_callback_enabled + .store(true, Ordering::SeqCst); + SendDroppedBankCallback::new(pruned_banks_sender) + } + + /// `is_from_abs` is true if the caller is the AccountsBackgroundService + pub fn purge_slot(&self, slot: Slot, 
is_from_abs: bool) { + if self.is_bank_drop_callback_enabled.load(Ordering::SeqCst) && !is_from_abs { + panic!("bad drop callpath detected; Bank::drop() must run serially with other logic in ABS like clean_accounts()") + } let mut slots = HashSet::new(); slots.insert(slot); self.purge_slots(&slots); @@ -2566,59 +2626,94 @@ impl AccountsDb { recycle_stores_write_elapsed.as_us() } - fn do_purge_slots_from_cache_and_store<'a>( + /// Purges every slot in `removed_slots` from both the cache and storage. This includes + /// entries in the accounts index, cache entries, and any backing storage entries. + fn purge_slots_from_cache_and_store<'a>( &'a self, - can_exist_in_cache: bool, removed_slots: impl Iterator, purge_stats: &PurgeStats, ) { - let mut remove_storages_elapsed = Measure::start("remove_storages_elapsed"); - let mut all_removed_slot_storages = vec![]; + let mut remove_cache_elapsed_across_slots = 0; let mut num_cached_slots_removed = 0; let mut total_removed_cached_bytes = 0; - let mut total_removed_storage_entries = 0; - let mut total_removed_stored_bytes = 0; for remove_slot in removed_slots { - if let Some(slot_cache) = self.accounts_cache.remove_slot(*remove_slot) { + // This function is only currently safe with respect to `flush_slot_cache()` because + // both functions run serially in AccountsBackgroundService. + let mut remove_cache_elapsed = Measure::start("remove_cache_elapsed"); + // Note: we cannot remove this slot from the slot cache until we've removed its + // entries from the accounts index first. This is because `scan_accounts()` relies on + // holding the index lock, finding the index entry, and then looking up the entry + // in the cache. 
If it fails to find that entry, it will panic in `get_loaded_account()` + if let Some(slot_cache) = self.accounts_cache.slot_cache(*remove_slot) { // If the slot is still in the cache, remove the backing storages for // the slot and from the Accounts Index - if !can_exist_in_cache { - panic!("The removed slot must alrady have been flushed from the cache"); - } num_cached_slots_removed += 1; total_removed_cached_bytes += slot_cache.total_bytes(); self.purge_slot_cache(*remove_slot, slot_cache); - } else if let Some((_, slot_removed_storages)) = self.storage.0.remove(&remove_slot) { - // Because AccountsBackgroundService synchronously flushes from the accounts cache - // and handles all Bank::drop() (the cleanup function that leads to this - // function call), then we don't need to worry above an overlapping cache flush - // with this function call. This means, if we get into this case, we can be - // confident that the entire state for this slot has been flushed to the storage - // already. - - // Note this only cleans up the storage entries. The accounts index cleaning - // (removing from the slot list, decrementing the account ref count), is handled in - // clean_accounts() -> purge_older_root_entries() + remove_cache_elapsed.stop(); + remove_cache_elapsed_across_slots += remove_cache_elapsed.as_us(); + } else { + self.purge_slot_storage(*remove_slot, purge_stats); + } + // It should not be possible that a slot is neither in the cache or storage. Even in + // a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars + // on bank creation. 
+ } + + purge_stats + .remove_cache_elapsed + .fetch_add(remove_cache_elapsed_across_slots, Ordering::Relaxed); + purge_stats + .num_cached_slots_removed + .fetch_add(num_cached_slots_removed, Ordering::Relaxed); + purge_stats + .total_removed_cached_bytes + .fetch_add(total_removed_cached_bytes, Ordering::Relaxed); + } + + /// Purge the backing storage entries for the given slot, does not purge from + /// the cache! + fn purge_dead_slots_from_storage<'a>( + &'a self, + removed_slots: impl Iterator + Clone, + purge_stats: &PurgeStats, + ) { + // Check all slots `removed_slots` are no longer "relevant" roots. + // Note that the slots here could have been rooted slots, but if they're passed here + // for removal it means: + // 1) All updates in that old root have been outdated by updates in newer roots + // 2) Those slots/roots should have already been purged from the accounts index root + // tracking metadata via `accounts_index.clean_dead_slot()`. + let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed"); + assert!(self + .accounts_index + .get_rooted_from_list(removed_slots.clone()) + .is_empty()); + safety_checks_elapsed.stop(); + purge_stats + .safety_checks_elapsed + .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed); + + let mut total_removed_storage_entries = 0; + let mut total_removed_stored_bytes = 0; + let mut all_removed_slot_storages = vec![]; + + let mut remove_storage_entries_elapsed = Measure::start("remove_storage_entries_elapsed"); + for remove_slot in removed_slots { + // Remove the storage entries and collect some metrics + if let Some((_, slot_storages_to_be_removed)) = self.storage.0.remove(&remove_slot) { { - let r_slot_removed_storages = slot_removed_storages.read().unwrap(); + let r_slot_removed_storages = slot_storages_to_be_removed.read().unwrap(); total_removed_storage_entries += r_slot_removed_storages.len(); total_removed_stored_bytes += r_slot_removed_storages .values() .map(|i| i.accounts.capacity()) .sum::(); 
} - all_removed_slot_storages.push(slot_removed_storages.clone()); + all_removed_slot_storages.push(slot_storages_to_be_removed.clone()); } - - // It should not be possible that a slot is neither in the cache or storage. Even in - // a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars - // on bank creation. - - // Remove any delta pubkey set if existing. - self.uncleaned_pubkeys.remove(remove_slot); } - remove_storages_elapsed.stop(); - + remove_storage_entries_elapsed.stop(); let num_stored_slots_removed = all_removed_slot_storages.len(); let recycle_stores_write_elapsed = @@ -2629,19 +2724,12 @@ impl AccountsDb { // of any locks drop(all_removed_slot_storages); drop_storage_entries_elapsed.stop(); - purge_stats - .remove_storages_elapsed - .fetch_add(remove_storages_elapsed.as_us(), Ordering::Relaxed); + .remove_storage_entries_elapsed + .fetch_add(remove_storage_entries_elapsed.as_us(), Ordering::Relaxed); purge_stats .drop_storage_entries_elapsed .fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed); - purge_stats - .num_cached_slots_removed - .fetch_add(num_cached_slots_removed, Ordering::Relaxed); - purge_stats - .total_removed_cached_bytes - .fetch_add(total_removed_cached_bytes, Ordering::Relaxed); purge_stats .num_stored_slots_removed .fetch_add(num_stored_slots_removed, Ordering::Relaxed); @@ -2656,24 +2744,6 @@ impl AccountsDb { .fetch_add(recycle_stores_write_elapsed, Ordering::Relaxed); } - fn purge_storage_slots(&self, removed_slots: &HashSet) { - // Check all slots `removed_slots` are no longer rooted - let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed"); - for slot in removed_slots.iter() { - assert!(!self.accounts_index.is_root(*slot)) - } - safety_checks_elapsed.stop(); - self.clean_accounts_stats - .purge_stats - .safety_checks_elapsed - .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed); - self.do_purge_slots_from_cache_and_store( - false, - removed_slots.iter(), - 
&self.clean_accounts_stats.purge_stats, - ); - } - fn purge_slot_cache(&self, purged_slot: Slot, slot_cache: SlotCache) { let mut purged_slot_pubkeys: HashSet<(Slot, Pubkey)> = HashSet::new(); let pubkey_to_slot_set: Vec<(Pubkey, Slot)> = slot_cache @@ -2696,10 +2766,10 @@ impl AccountsDb { // Slot purged from cache should not exist in the backing store assert!(self.storage.get_slot_stores(purged_slot).is_none()); let num_purged_keys = pubkey_to_slot_set.len(); - let reclaims = self.purge_keys_exact(&pubkey_to_slot_set); + let reclaims = self.purge_keys_exact(pubkey_to_slot_set.iter()); assert_eq!(reclaims.len(), num_purged_keys); if is_dead { - self.finalize_dead_slot_removal( + self.remove_dead_slots_metadata( std::iter::once(&purged_slot), purged_slot_pubkeys, None, @@ -2707,6 +2777,68 @@ impl AccountsDb { } } + fn purge_slot_storage(&self, remove_slot: Slot, purge_stats: &PurgeStats) { + // Because AccountsBackgroundService synchronously flushes from the accounts cache + // and handles all Bank::drop() (the cleanup function that leads to this + // function call), then we don't need to worry about an overlapping cache flush + // with this function call. This means, if we get into this case, we can be + // confident that the entire state for this slot has been flushed to the storage + // already. 
+ let mut scan_storages_elasped = Measure::start("scan_storages_elasped"); + type ScanResult = ScanStorageResult<Pubkey, Arc<Mutex<HashSet<(Pubkey, Slot)>>>>; + let scan_result: ScanResult = self.scan_account_storage( + remove_slot, + |loaded_account: LoadedAccount| Some(*loaded_account.pubkey()), + |accum: &Arc<Mutex<HashSet<(Pubkey, Slot)>>>, loaded_account: LoadedAccount| { + accum + .lock() + .unwrap() + .insert((*loaded_account.pubkey(), remove_slot)); + }, + ); + scan_storages_elasped.stop(); + purge_stats + .scan_storages_elasped + .fetch_add(scan_storages_elasped.as_us(), Ordering::Relaxed); + + let mut purge_accounts_index_elapsed = Measure::start("purge_accounts_index_elapsed"); + let reclaims; + match scan_result { + ScanStorageResult::Cached(_) => { + panic!("Should not see cached keys in this `else` branch, since we checked this slot did not exist in the cache above"); + } + ScanStorageResult::Stored(stored_keys) => { + // Purge this slot from the accounts index + reclaims = self.purge_keys_exact(stored_keys.lock().unwrap().iter()); + } + } + purge_accounts_index_elapsed.stop(); + purge_stats + .purge_accounts_index_elapsed + .fetch_add(purge_accounts_index_elapsed.as_us(), Ordering::Relaxed); + + // `handle_reclaims()` should remove all the account index entries and + // storage entries + let mut handle_reclaims_elapsed = Measure::start("handle_reclaims_elapsed"); + // Slot should be dead after removing all its account entries + let expected_dead_slot = Some(remove_slot); + self.handle_reclaims( + &reclaims, + expected_dead_slot, + Some(purge_stats), + Some(&mut ReclaimResult::default()), + false, + ); + handle_reclaims_elapsed.stop(); + purge_stats + .handle_reclaims_elapsed + .fetch_add(handle_reclaims_elapsed.as_us(), Ordering::Relaxed); + // After handling the reclaimed entries, this slot's + // storage entries should be purged from self.storage + assert!(self.storage.get_slot_stores(remove_slot).is_none()); + } + + #[allow(clippy::needless_collect)] fn purge_slots(&self, slots: &HashSet<Slot>) { // `add_root()` should be 
called first let mut safety_checks_elapsed = Measure::start("safety_checks_elapsed"); @@ -2718,8 +2850,7 @@ impl AccountsDb { self.external_purge_slots_stats .safety_checks_elapsed .fetch_add(safety_checks_elapsed.as_us(), Ordering::Relaxed); - self.do_purge_slots_from_cache_and_store( - true, + self.purge_slots_from_cache_and_store( non_roots.into_iter(), &self.external_purge_slots_stats, ); @@ -2736,11 +2867,6 @@ impl AccountsDb { panic!("Trying to remove accounts for rooted slot {}", remove_slot); } - if let Some(slot_cache) = self.accounts_cache.remove_slot(remove_slot) { - // If the slot is still in the cache, remove it from the cache - self.purge_slot_cache(remove_slot, slot_cache); - } - // TODO: Handle if the slot was flushed to storage while we were removing the cached // slot above, i.e. it's possible the storage contains partial version of the current // slot. One way to handle this is to augment slots to contain a "version", That way, @@ -2750,37 +2876,12 @@ impl AccountsDb { // Reads will then always read the latest version of a slot. Scans will also know // which version their parents because banks will also be augmented with this version, // which handles cases where a deletion of one version happens in the middle of the scan. 
- let scan_result: ScanStorageResult> = self.scan_account_storage( - remove_slot, - |loaded_account: LoadedAccount| Some(*loaded_account.pubkey()), - |accum: &DashSet, loaded_account: LoadedAccount| { - accum.insert(*loaded_account.pubkey()); - }, + let remove_unrooted_purge_stats = PurgeStats::default(); + self.purge_slots_from_cache_and_store( + std::iter::once(&remove_slot), + &remove_unrooted_purge_stats, ); - - // Purge this slot from the accounts index - let purge_slot: HashSet = vec![remove_slot].into_iter().collect(); - let mut reclaims = vec![]; - match scan_result { - ScanStorageResult::Cached(cached_keys) => { - for pubkey in cached_keys.iter() { - self.accounts_index - .purge_exact(pubkey, &purge_slot, &mut reclaims); - } - } - ScanStorageResult::Stored(stored_keys) => { - for set_ref in stored_keys.iter() { - self.accounts_index - .purge_exact(set_ref.key(), &purge_slot, &mut reclaims); - } - } - } - - self.handle_reclaims(&reclaims, Some(remove_slot), false, None, false); - - // After handling the reclaimed entries, this slot's - // storage entries should be purged from self.storage - assert!(self.storage.get_slot_stores(remove_slot).is_none()); + remove_unrooted_purge_stats.report("remove_unrooted_slots_purge_slots_stats", Some(0)); } fn include_owner(cluster_type: &ClusterType, slot: Slot) -> bool { @@ -3981,7 +4082,27 @@ impl AccountsDb { dead_slots } - fn finalize_dead_slot_removal<'a>( + fn remove_dead_slots_metadata<'a>( + &'a self, + dead_slots_iter: impl Iterator + Clone, + purged_slot_pubkeys: HashSet<(Slot, Pubkey)>, + // Should only be `Some` for non-cached slots + purged_stored_account_slots: Option<&mut AccountSlots>, + ) { + self.clean_dead_slots_from_accounts_index( + dead_slots_iter.clone(), + purged_slot_pubkeys, + purged_stored_account_slots, + ); + { + let mut bank_hashes = self.bank_hashes.write().unwrap(); + for slot in dead_slots_iter { + bank_hashes.remove(slot); + } + } + } + + fn clean_dead_slots_from_accounts_index<'a>( &'a 
self, dead_slots_iter: impl Iterator + Clone, purged_slot_pubkeys: HashSet<(Slot, Pubkey)>, @@ -4000,7 +4121,6 @@ impl AccountsDb { let mut accounts_index_root_stats = AccountsIndexRootsStats::default(); let dead_slots: Vec<_> = dead_slots_iter - .clone() .map(|slot| { if let Some(latest) = self.accounts_index.clean_dead_slot(*slot) { accounts_index_root_stats = latest; @@ -4008,18 +4128,11 @@ impl AccountsDb { *slot }) .collect(); - info!("finalize_dead_slot_removal: slots {:?}", dead_slots); + info!("remove_dead_slots_metadata: slots {:?}", dead_slots); self.clean_accounts_stats .latest_accounts_index_roots_stats .update(&accounts_index_root_stats); - - { - let mut bank_hashes = self.bank_hashes.write().unwrap(); - for slot in dead_slots_iter { - bank_hashes.remove(slot); - } - } } fn clean_stored_dead_slots( @@ -4053,7 +4166,7 @@ impl AccountsDb { }) }) }; - self.finalize_dead_slot_removal( + self.remove_dead_slots_metadata( dead_slots.iter(), purged_slot_pubkeys, purged_account_slots, @@ -4387,9 +4500,11 @@ impl AccountsDb { // a) this slot has at least one account (the one being stored), // b)From 1) we know no other slots are included in the "reclaims" // - // From 1) and 2) we guarantee passing Some(slot), true is safe + // From 1) and 2) we guarantee passing `no_purge_stats` == None, which is + // equivalent to asserting there will be no dead slots, is safe. 
+ let no_purge_stats = None; let mut handle_reclaims_time = Measure::start("handle_reclaims"); - self.handle_reclaims(&reclaims, Some(slot), true, None, reset_accounts); + self.handle_reclaims(&reclaims, Some(slot), no_purge_stats, None, reset_accounts); handle_reclaims_time.stop(); self.stats .store_handle_reclaims @@ -4995,6 +5110,7 @@ pub mod tests { inline_spl_token_v2_0, }; use assert_matches::assert_matches; + use dashmap::DashSet; use rand::{thread_rng, Rng}; use solana_sdk::{ account::{Account, AccountSharedData, ReadableAccount, WritableAccount}, @@ -5461,15 +5577,18 @@ pub mod tests { assert_eq!(db0.load_slow(&ancestors, &key), Some((account0, 0))); } - #[test] - fn test_remove_unrooted_slot() { + fn run_test_remove_unrooted_slot(is_cached: bool) { let unrooted_slot = 9; let mut db = AccountsDb::new(Vec::new(), &ClusterType::Development); db.caching_enabled = true; let key = Pubkey::default(); let account0 = AccountSharedData::new(1, 0, &key); - let ancestors: HashMap<_, _> = vec![(unrooted_slot, 1)].into_iter().collect(); - db.store_cached(unrooted_slot, &[(&key, &account0)]); + let ancestors = vec![(unrooted_slot, 1)].into_iter().collect(); + if is_cached { + db.store_cached(unrooted_slot, &[(&key, &account0)]); + } else { + db.store_uncached(unrooted_slot, &[(&key, &account0)]); + } db.bank_hashes .write() .unwrap() @@ -5484,12 +5603,9 @@ pub mod tests { db.remove_unrooted_slot(unrooted_slot); assert!(db.load_slow(&ancestors, &key).is_none()); assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none()); + assert!(db.accounts_cache.slot_cache(unrooted_slot).is_none()); assert!(db.storage.0.get(&unrooted_slot).is_none()); - assert!(db - .accounts_index - .get_account_read_entry(&key) - .map(|locked_entry| locked_entry.slot_list().is_empty()) - .unwrap_or(true)); + assert!(db.accounts_index.get_account_read_entry(&key).is_none()); assert!(db .accounts_index .get(&key, Some(&ancestors), None) @@ -5501,6 +5617,16 @@ pub mod tests { 
assert_load_account(&db, unrooted_slot, key, 2); } + #[test] + fn test_remove_unrooted_slot_cached() { + run_test_remove_unrooted_slot(true); + } + + #[test] + fn test_remove_unrooted_slot_storage() { + run_test_remove_unrooted_slot(false); + } + #[test] fn test_remove_unrooted_slot_snapshot() { solana_logger::setup(); @@ -6713,7 +6839,7 @@ pub mod tests { let slots: HashSet = vec![1].into_iter().collect(); let purge_keys = vec![(key1, slots)]; - db.purge_keys_exact(&purge_keys); + db.purge_keys_exact(purge_keys.iter()); let account2 = AccountSharedData::new(3, 0, &key); db.store_uncached(2, &[(&key1, &account2)]); @@ -8482,7 +8608,7 @@ pub mod tests { assert_eq!(account.0.lamports, slot1_account.lamports); // Simulate dropping the bank, which finally removes the slot from the cache - db.purge_slot(1); + db.purge_slot(1, false); assert!(db .do_load(&scan_ancestors, &account_key, Some(max_scan_root)) .is_none()); diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 16985fd342c..8360db8b169 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1049,6 +1049,20 @@ impl AccountsIndex { slot < max_root } + /// Given a list of slots, return a new list of only the slots that are rooted + pub fn get_rooted_from_list<'a>(&self, slots: impl Iterator) -> Vec { + let roots_tracker = self.roots_tracker.read().unwrap(); + slots + .filter_map(|s| { + if roots_tracker.roots.contains(s) { + Some(*s) + } else { + None + } + }) + .collect() + } + pub fn is_root(&self, slot: Slot) -> bool { self.roots_tracker.read().unwrap().roots.contains(&slot) } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index fa0eef1bb2d..9c0cc6c8e66 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -5104,7 +5104,7 @@ impl Drop for Bank { // 1. Tests // 2. At startup when replaying blockstore and there's no // AccountsBackgroundService to perform cleanups yet. 
- self.rc.accounts.purge_slot(self.slot()); + self.rc.accounts.purge_slot(self.slot(), false); } } } @@ -5144,6 +5144,7 @@ fn is_simple_vote_transaction(transaction: &Transaction) -> bool { pub(crate) mod tests { use super::*; use crate::{ + accounts_background_service::{AbsRequestHandler, SendDroppedBankCallback}, accounts_db::SHRINK_RATIO, accounts_index::{ AccountIndex, AccountMap, AccountSecondaryIndexes, Ancestors, ITER_BATCH_SIZE, @@ -5156,7 +5157,7 @@ pub(crate) mod tests { native_loader::NativeLoaderError, status_cache::MAX_CACHE_ENTRIES, }; - use crossbeam_channel::bounded; + use crossbeam_channel::{bounded, unbounded}; use solana_sdk::{ account::Account, clock::{DEFAULT_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}, @@ -11738,8 +11739,11 @@ pub(crate) mod tests { assert!(!debug.is_empty()); } - fn test_store_scan_consistency(accounts_db_caching_enabled: bool, update_f: F) - where + fn test_store_scan_consistency( + accounts_db_caching_enabled: bool, + update_f: F, + drop_callback: Option>, + ) where F: Fn(Arc, crossbeam_channel::Sender>, Arc>, Pubkey, u64) + std::marker::Send, { @@ -11756,6 +11760,7 @@ pub(crate) mod tests { AccountSecondaryIndexes::default(), accounts_db_caching_enabled, )); + bank0.set_callback(drop_callback); // Set up pubkeys to write to let total_pubkeys = ITER_BATCH_SIZE * 10; @@ -11852,9 +11857,18 @@ pub(crate) mod tests { #[test] fn test_store_scan_consistency_unrooted() { for accounts_db_caching_enabled in &[false, true] { + let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); + let abs_request_handler = AbsRequestHandler { + snapshot_request_handler: None, + pruned_banks_receiver, + }; test_store_scan_consistency( *accounts_db_caching_enabled, - |bank0, bank_to_scan_sender, pubkeys_to_modify, program_id, starting_lamports| { + move |bank0, + bank_to_scan_sender, + pubkeys_to_modify, + program_id, + starting_lamports| { let mut current_major_fork_bank = bank0; loop { let mut current_minor_fork_bank = 
current_major_fork_bank.clone(); @@ -11905,7 +11919,7 @@ pub(crate) mod tests { // Send the last new bank to the scan thread to perform the scan. // Meanwhile this thread will continually set roots on a separate fork - // and squash. + // and squash/clean, purging the account entries from the minor forks /* bank 0 / \ @@ -11926,8 +11940,16 @@ pub(crate) mod tests { // Try to get cache flush/clean to overlap with the scan current_major_fork_bank.force_flush_accounts_cache(); current_major_fork_bank.clean_accounts(false); + // Move purge here so that Bank::drop()->purge_slots() doesn't race + // with clean. Simulates the call from AccountsBackgroundService + let is_abs_service = true; + abs_request_handler + .handle_pruned_banks(&current_major_fork_bank, is_abs_service); } }, + Some(Box::new(SendDroppedBankCallback::new( + pruned_banks_sender.clone(), + ))), ) } } @@ -11971,6 +11993,7 @@ pub(crate) mod tests { )); } }, + None, ); } } diff --git a/upload-perf/src/upload-perf.rs b/upload-perf/src/upload-perf.rs index 34436be80f6..9f39132fb65 100644 --- a/upload-perf/src/upload-perf.rs +++ b/upload-perf/src/upload-perf.rs @@ -74,7 +74,6 @@ fn main() { ("commit", git_commit_hash.trim().to_string(), String) ); */ - } let last_median = get_last_metrics(&"median".to_string(), &db, &name, &branch) .unwrap_or_default();