Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,17 @@ pub struct ErrorCounters {
pub invalid_writable_account: usize,
}

#[derive(Debug, Default, Clone, Copy)]
pub struct IndexGenerationInfo {
pub accounts_data_len: u64,
}

#[derive(Debug, Default, Clone, Copy)]
struct SlotIndexGenerationInfo {
insert_time_us: u64,
num_accounts: u64,
num_accounts_rent_exempt: u64,
accounts_data_len: u64,
}

#[derive(Default, Debug)]
Expand All @@ -241,6 +247,7 @@ struct GenerateIndexTimings {
pub index_flush_us: u64,
pub rent_exempt: u64,
pub total_duplicates: u64,
pub accounts_data_len_dedup_time_us: u64,
}

#[derive(Default, Debug, PartialEq)]
Expand Down Expand Up @@ -287,6 +294,11 @@ impl GenerateIndexTimings {
i64
),
("total_items", self.total_items as i64, i64),
(
"accounts_data_len_dedup_time_us",
self.accounts_data_len_dedup_time_us as i64,
i64
),
);
}
}
Expand Down Expand Up @@ -6676,6 +6688,7 @@ impl AccountsDb {

let secondary = !self.account_indexes.is_empty();

let mut accounts_data_len = 0;
let mut num_accounts_rent_exempt = 0;
let num_accounts = accounts_map.len();
let items = accounts_map.into_iter().map(
Expand All @@ -6695,6 +6708,7 @@ impl AccountsDb {
&self.account_indexes,
);
}
accounts_data_len += stored_account.data().len() as u64;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided on an implementation for generate_index_for_slot() that does not pass in the Atomic, but instead computes the len locally and returns it at the end (as a regular integer). I figured this would be faster due to less contention on the Atomic (and avoiding the atomic operations entirely in this function).


if !rent_collector.should_collect_rent(&pubkey, &stored_account, false) || {
let (_rent_due, exempt) = rent_collector.get_rent_due(&stored_account);
Expand Down Expand Up @@ -6729,6 +6743,7 @@ impl AccountsDb {
insert_time_us,
num_accounts: num_accounts as u64,
num_accounts_rent_exempt,
accounts_data_len,
}
}

Expand Down Expand Up @@ -6863,7 +6878,7 @@ impl AccountsDb {
limit_load_slot_count_from_snapshot: Option<usize>,
verify: bool,
genesis_config: &GenesisConfig,
) {
) -> IndexGenerationInfo {
let mut slots = self.storage.all_slots();
#[allow(clippy::stable_sort_primitive)]
slots.sort();
Expand All @@ -6878,6 +6893,7 @@ impl AccountsDb {
genesis_config.slots_per_year(),
&genesis_config.rent,
);
let accounts_data_len = AtomicU64::new(0);

// pass == 0 always runs and generates the index
// pass == 1 only runs if verify == true.
Expand Down Expand Up @@ -6934,9 +6950,12 @@ impl AccountsDb {
insert_time_us: insert_us,
num_accounts: total_this_slot,
num_accounts_rent_exempt: rent_exempt_this_slot,
accounts_data_len: accounts_data_len_this_slot,
} = self.generate_index_for_slot(accounts_map, slot, &rent_collector);
rent_exempt.fetch_add(rent_exempt_this_slot, Ordering::Relaxed);
total_duplicates.fetch_add(total_this_slot, Ordering::Relaxed);
accounts_data_len
.fetch_add(accounts_data_len_this_slot, Ordering::Relaxed);
insert_us
} else {
// verify index matches expected and measure the time to get all items
Expand Down Expand Up @@ -6990,6 +7009,30 @@ impl AccountsDb {
})
.sum();

// subtract data.len() from accounts_data_len for all old accounts that are in the index twice
let mut accounts_data_len_dedup_timer =
Measure::start("handle accounts data len duplicates");
if pass == 0 {
let mut unique_pubkeys = HashSet::<Pubkey>::default();
self.uncleaned_pubkeys.iter().for_each(|entry| {
entry.value().iter().for_each(|pubkey| {
unique_pubkeys.insert(*pubkey);
})
});
let accounts_data_len_from_duplicates = unique_pubkeys
.into_iter()
.collect::<Vec<_>>()
.par_chunks(4096)
.map(|pubkeys| self.pubkeys_to_duplicate_accounts_data_len(pubkeys))
.sum();
accounts_data_len.fetch_sub(accounts_data_len_from_duplicates, Ordering::Relaxed);
info!(
Comment thread
brooksprumo marked this conversation as resolved.
"accounts data len: {}",
accounts_data_len.load(Ordering::Relaxed)
);
}
accounts_data_len_dedup_timer.stop();

let storage_info_timings = storage_info_timings.into_inner().unwrap();

let mut index_flush_us = 0;
Expand All @@ -7014,6 +7057,7 @@ impl AccountsDb {
storage_size_accounts_map_us: storage_info_timings.storage_size_accounts_map_us,
storage_size_accounts_map_flatten_us: storage_info_timings
.storage_size_accounts_map_flatten_us,
accounts_data_len_dedup_time_us: accounts_data_len_dedup_timer.as_us(),
Comment thread
brooksprumo marked this conversation as resolved.
..GenerateIndexTimings::default()
};

Expand All @@ -7027,6 +7071,43 @@ impl AccountsDb {
}
timings.report();
}

IndexGenerationInfo {
accounts_data_len: accounts_data_len.load(Ordering::Relaxed),
}
}

/// Used during generate_index() to get the _duplicate_ accounts data len from the given pubkeys
fn pubkeys_to_duplicate_accounts_data_len(&self, pubkeys: &[Pubkey]) -> u64 {
let mut accounts_data_len_from_duplicates = 0;
pubkeys.iter().for_each(|pubkey| {
if let Some(entry) = self.accounts_index.get_account_read_entry(pubkey) {
let slot_list = entry.slot_list();
if slot_list.len() < 2 {
return;
}
// Only the account data len in the highest slot should be used, and the rest are
// duplicates. So sort the slot list in descending slot order, skip the first
// item, then sum up the remaining data len, which are the duplicates.
let mut slot_list = slot_list.clone();
slot_list
.select_nth_unstable_by(0, |a, b| b.0.cmp(&a.0))
.2
.iter()
.for_each(|(slot, account_info)| {
let maybe_storage_entry = self
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote this code here, but i didn't like it when I wrote it. I assumed there would be a 'load_account' function I could call at this level, but what I came up with were these pieces. It might be nice to see if there is a function already that goes from account_info -> accountshareddata

.storage
.get_account_storage_entry(*slot, account_info.store_id);
let mut accessor = LoadedAccountAccessor::Stored(
maybe_storage_entry.map(|entry| (entry, account_info.offset)),
);
let loaded_account = accessor.check_and_get_loaded_account();
let account = loaded_account.take_account();
accounts_data_len_from_duplicates += account.data().len();
});
}
});
accounts_data_len_from_duplicates as u64
}

fn update_storage_info(
Expand Down
24 changes: 19 additions & 5 deletions runtime/src/serde_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
accounts::Accounts,
accounts_db::{
AccountShrinkThreshold, AccountStorageEntry, AccountsDb, AccountsDbConfig, AppendVecId,
BankHashInfo,
BankHashInfo, IndexGenerationInfo,
},
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
Expand Down Expand Up @@ -334,7 +334,7 @@ fn reconstruct_bank_from_fields<E>(
where
E: SerializableStorage + std::marker::Sync,
{
let accounts_db = reconstruct_accountsdb_from_fields(
let (accounts_db, reconstructed_accounts_db_info) = reconstruct_accountsdb_from_fields(
snapshot_accounts_db_fields,
account_paths,
unpacked_append_vec_map,
Expand All @@ -347,6 +347,10 @@ where
accounts_db_config,
accounts_update_notifier,
)?;
debug!(
"accounts data len: {}",
reconstructed_accounts_db_info.accounts_data_len
);

let bank_rc = BankRc::new(Accounts::new_empty(accounts_db), bank_fields.slot);

Expand Down Expand Up @@ -386,6 +390,12 @@ where
Ok(())
}

/// This struct contains side-info while reconstructing the accounts DB from fields.
#[derive(Debug, Default, Copy, Clone)]
struct ReconstructedAccountsDbInfo {
accounts_data_len: u64,
}

#[allow(clippy::too_many_arguments)]
fn reconstruct_accountsdb_from_fields<E>(
snapshot_accounts_db_fields: SnapshotAccountsDbFields<E>,
Expand All @@ -399,7 +409,7 @@ fn reconstruct_accountsdb_from_fields<E>(
verify_index: bool,
accounts_db_config: Option<AccountsDbConfig>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> Result<AccountsDb, Error>
) -> Result<(AccountsDb, ReconstructedAccountsDbInfo), Error>
where
E: SerializableStorage + std::marker::Sync,
{
Expand Down Expand Up @@ -536,11 +546,12 @@ where
})
.unwrap();

let _ = accounts_db.generate_index(
let IndexGenerationInfo { accounts_data_len } = accounts_db.generate_index(
limit_load_slot_count_from_snapshot,
verify_index,
genesis_config,
);

accounts_db.maybe_add_filler_accounts(&genesis_config.epoch_schedule);

handle.join().unwrap();
Expand All @@ -557,5 +568,8 @@ where
("accountsdb-notify-at-start-us", measure_notify.as_us(), i64),
);

Ok(Arc::try_unwrap(accounts_db).unwrap())
Ok((
Arc::try_unwrap(accounts_db).unwrap(),
ReconstructedAccountsDbInfo { accounts_data_len },
))
}
1 change: 1 addition & 0 deletions runtime/src/serde_snapshot/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ where
Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING),
None,
)
.map(|(accounts_db, _)| accounts_db)
}

#[cfg(test)]
Expand Down