From 572d88a19e5e9b9014be18588f712d5c6e856c65 Mon Sep 17 00:00:00 2001 From: Carl Lin Date: Thu, 24 Sep 2020 12:25:08 -0700 Subject: [PATCH 1/4] Fix zero lamport purge --- runtime/src/accounts_db.rs | 35 ++++++++++++++++++++++++++++------- runtime/src/accounts_index.rs | 30 +++++++++++++++++++++++------- 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 5448e364511b9e..8ef18ba2a70b96 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -2064,24 +2064,31 @@ impl AccountsDB { } drop(storage); datapoint_debug!("clean_dead_slots", ("stores", stores.len(), i64)); - let pubkeys: Vec> = { + let slot_pubkeys: Vec<(Slot, Vec)> = { self.thread_pool_clean.install(|| { stores .into_par_iter() .map(|store| { let accounts = store.accounts.accounts(0); - accounts - .into_iter() - .map(|account| account.meta.pubkey) - .collect::>() + ( + store.slot, + accounts + .into_iter() + .map(|account| account.meta.pubkey) + .collect::>(), + ) }) .collect() }) }; let index = self.accounts_index.read().unwrap(); - for pubkey_v in pubkeys { + let mut cleaned_slot_keys: HashSet<(Slot, Pubkey)> = HashSet::new(); + for (slot, pubkey_v) in slot_pubkeys { for pubkey in pubkey_v { - index.unref_from_storage(&pubkey); + if !cleaned_slot_keys.contains(&(slot, pubkey)) { + index.unref_from_storage(&pubkey); + cleaned_slot_keys.insert((slot, pubkey)); + } } } drop(index); @@ -3073,8 +3080,10 @@ pub mod tests { let pubkey2 = Pubkey::new_rand(); let normal_account = Account::new(1, 0, &Account::default().owner); let zero_account = Account::new(0, 0, &Account::default().owner); + //store an account accounts.store(0, &[(&pubkey1, &normal_account)]); + accounts.store(0, &[(&pubkey1, &normal_account)]); accounts.store(1, &[(&pubkey1, &zero_account)]); accounts.store(0, &[(&pubkey2, &normal_account)]); accounts.store(2, &[(&pubkey2, &normal_account)]); @@ -3093,8 +3102,20 @@ pub mod tests { //both zero lamport and normal accounts are cleaned up assert_eq!(accounts.alive_account_count_in_store(0), 0); + // The only store to slot 1 was a zero lamport account, should + // be purged by zero-lamport cleaning logic because slot 1 is + // rooted assert_eq!(accounts.alive_account_count_in_store(1), 0); assert_eq!(accounts.alive_account_count_in_store(2), 1); + + // Pubkey 1, a zero lamport account, should no longer exist in accounts index + // because it has been removed + assert!(accounts + .accounts_index + .read() + .unwrap() + .get(&pubkey1, None) + .is_none()); } #[test] diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index cd43d7095ed048..aca7bfbe0d6cff 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -159,15 +159,31 @@ impl<'a, T: 'a + Clone> AccountsIndex { reclaims: &mut SlotList, ) -> Option { if let Some(lock) = self.account_maps.get(pubkey) { - let mut list = &mut lock.1.write().unwrap(); - // filter out other dirty entries - reclaims.extend(list.iter().filter(|(f, _)| *f == slot).cloned()); - list.retain(|(f, _)| *f != slot); - - lock.0.fetch_add(1, Ordering::Relaxed); + let list = &mut lock.1.write().unwrap(); + // filter out other dirty entries from the same slot + let mut same_slot_previous_updates: Vec<(usize, (Slot, T))> = list + .iter() + .enumerate() + .filter_map(|(i, (f, value))| { + if *f == slot { + Some((i, (*f, value.clone()))) + } else { + None + } + }) + .collect(); + assert!(same_slot_previous_updates.len() <= 1); + + if let Some((list_index, previous_update)) = same_slot_previous_updates.pop() { + reclaims.push(previous_update); + list.remove(list_index); + } else { + // Only increment ref count if the account was not prevously updated in this slot + lock.0.fetch_add(1, Ordering::Relaxed); + } list.push((slot, account_info)); // now, do lazy clean - self.purge_older_root_entries(&mut list, reclaims); + self.purge_older_root_entries(list, reclaims); None } else { From 18e00b1e3ec7d3214c2e313680db8214dbaab015 Mon Sep 17 00:00:00 2001 From: Carl Lin Date: Thu, 24 Sep 2020 15:34:36 -0700 Subject: [PATCH 2/4] Fix test --- runtime/src/accounts_db.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 8ef18ba2a70b96..21f9b0f9ff5a7d 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -4243,23 +4243,27 @@ pub mod tests { accounts.store(current_slot, &[(&pubkey1, &account2)]); accounts.store(current_slot, &[(&pubkey1, &account2)]); assert_eq!(1, accounts.alive_account_count_in_store(current_slot)); - assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); + // Stores to same pubkey, same slot only count once towards the + // ref count + assert_eq!(2, accounts.ref_count_for_pubkey(&pubkey1)); accounts.add_root(current_slot); // C: Yet more update to trigger lazy clean of step A current_slot += 1; - assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); + assert_eq!(2, accounts.ref_count_for_pubkey(&pubkey1)); accounts.store(current_slot, &[(&pubkey1, &account3)]); - assert_eq!(4, accounts.ref_count_for_pubkey(&pubkey1)); + assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); accounts.add_root(current_slot); // D: Make pubkey1 0-lamport; also triggers clean of step B current_slot += 1; - assert_eq!(4, accounts.ref_count_for_pubkey(&pubkey1)); + assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); accounts.store(current_slot, &[(&pubkey1, &zero_lamport_account)]); accounts.process_dead_slots(None); assert_eq!( - 3, /* == 4 - 2 + 1 */ + // Removed one reference from the dead slot (reference only counted once + // even though there were two stores to the pubkey in that slot) + 3, /* == 3 - 1 + 1 */ accounts.ref_count_for_pubkey(&pubkey1) ); accounts.add_root(current_slot); From 22b4d108ded2f7aa23faa5d5c8cb5625266279ce Mon Sep 17 00:00:00 2001 From: Carl Lin Date: Fri, 25 Sep 2020 00:31:26 -0700 Subject: [PATCH 3/4] PR comments --- runtime/src/accounts_index.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index aca7bfbe0d6cff..f697f43294920d 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -161,12 +161,12 @@ impl<'a, T: 'a + Clone> AccountsIndex { if let Some(lock) = self.account_maps.get(pubkey) { let list = &mut lock.1.write().unwrap(); // filter out other dirty entries from the same slot - let mut same_slot_previous_updates: Vec<(usize, (Slot, T))> = list + let mut same_slot_previous_updates: Vec<(usize, (Slot, &T))> = list .iter() .enumerate() - .filter_map(|(i, (f, value))| { - if *f == slot { - Some((i, (*f, value.clone()))) + .filter_map(|(i, (s, value))| { + if *s == slot { + Some((i, (*s, value))) } else { None } @@ -174,8 +174,9 @@ impl<'a, T: 'a + Clone> AccountsIndex { .collect(); assert!(same_slot_previous_updates.len() <= 1); - if let Some((list_index, previous_update)) = same_slot_previous_updates.pop() { - reclaims.push(previous_update); + if let Some((list_index, (s, previous_update_value))) = same_slot_previous_updates.pop() + { + reclaims.push((s, previous_update_value.clone())); list.remove(list_index); } else { // Only increment ref count if the account was not prevously updated in this slot From ecec28abcced065b1b32d8165c382012fa0a268b Mon Sep 17 00:00:00 2001 From: Carl Lin Date: Fri, 25 Sep 2020 01:42:02 -0700 Subject: [PATCH 4/4] Reduce in par iter --- runtime/src/accounts_db.rs | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 21f9b0f9ff5a7d..3dd05711497b0b 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -2064,32 +2064,26 @@ impl AccountsDB { } drop(storage); datapoint_debug!("clean_dead_slots", ("stores", stores.len(), i64)); - let slot_pubkeys: Vec<(Slot, Vec)> = { + let slot_pubkeys: HashSet<(Slot, Pubkey)> = { self.thread_pool_clean.install(|| { stores .into_par_iter() .map(|store| { let accounts = store.accounts.accounts(0); - ( - store.slot, - accounts - .into_iter() - .map(|account| account.meta.pubkey) - .collect::>(), - ) + accounts + .into_iter() + .map(|account| (store.slot, account.meta.pubkey)) + .collect::>() + }) + .reduce(HashSet::new, |mut reduced, store_pubkeys| { + reduced.extend(store_pubkeys); + reduced }) - .collect() }) }; let index = self.accounts_index.read().unwrap(); - let mut cleaned_slot_keys: HashSet<(Slot, Pubkey)> = HashSet::new(); - for (slot, pubkey_v) in slot_pubkeys { - for pubkey in pubkey_v { - if !cleaned_slot_keys.contains(&(slot, pubkey)) { - index.unref_from_storage(&pubkey); - cleaned_slot_keys.insert((slot, pubkey)); - } - } + for (_slot, pubkey) in slot_pubkeys { + index.unref_from_storage(&pubkey); } drop(index); measure.stop();