Skip to content
Merged
110 changes: 109 additions & 1 deletion accounts-db/src/account_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
use {
crate::accounts_db::{AccountStorageEntry, AccountsFileId},
dashmap::DashMap,
rand::seq::SliceRandom,
rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator},
solana_clock::Slot,
solana_nohash_hasher::{BuildNoHashHasher, IntMap},
std::sync::{Arc, RwLock},
std::{
ops::Range,
sync::{Arc, RwLock},
},
};

pub mod stored_account_info;
Expand Down Expand Up @@ -293,6 +298,81 @@ impl Default for AccountStorageStatus {
}
}

/// Wrapper over a slice of `Arc<AccountStorageEntry>` that provides ordered access to storages.
///
/// The borrowed slice itself is never reordered; the order is stored as a separate list of
/// indices into it, which is what `iter` and `par_iter` walk.
///
/// A few strategies are available for ordering storages:
/// - `with_small_to_large_ratio`: interleaves small and large storages according to a ratio
/// - `with_random_order`: orders storages randomly
pub struct AccountStoragesOrderer<'a> {
// Borrowed storages, kept in the caller's original order.
storages: &'a [Arc<AccountStorageEntry>],
// Indices into `storages`, one per storage, listed in the order they are yielded.
indices: Box<[usize]>,
}

impl<'a> AccountStoragesOrderer<'a> {
    /// Create balancing orderer that interleaves storages with small and large capacities.
    ///
    /// Storages are ranked by `capacity()` in ascending order and then returned in cycles based
    /// on `small_to_large_ratio`: `ratio.0` storages taken from the small end of the ranking are
    /// followed by `ratio.1` storages taken from the large end.
    ///
    /// # Panics
    ///
    /// Panics if `storages` is non-empty and both components of `small_to_large_ratio` are zero.
    pub fn with_small_to_large_ratio(
        storages: &'a [Arc<AccountStorageEntry>],
        small_to_large_ratio: (usize, usize),
    ) -> Self {
        let len_range = 0..storages.len();
        // `sorted_indices[rank]` is the index into `storages` of the entry with that size rank.
        let mut sorted_indices: Vec<usize> = len_range.clone().collect();
        sorted_indices.sort_unstable_by_key(|i| storages[*i].capacity());
        // The selector maps an output position to a *rank*, so it must be applied to the
        // position and its result looked up in `sorted_indices`. Applying it to the storage
        // index instead would produce an order that is unrelated to the storage sizes.
        let indices: Box<[usize]> = len_range
            .clone()
            .map(|nth| {
                let rank = select_from_range_with_start_end_rates(
                    len_range.clone(),
                    nth,
                    small_to_large_ratio,
                );
                sorted_indices[rank]
            })
            .collect();
        Self { storages, indices }
    }

    /// Create randomizing orderer.
    ///
    /// The order is a shuffle of all storage indices driven by `rand::thread_rng()`, so it
    /// differs between calls.
    pub fn with_random_order(storages: &'a [Arc<AccountStorageEntry>]) -> Self {
        let mut indices: Vec<usize> = (0..storages.len()).collect();
        indices.shuffle(&mut rand::thread_rng());
        Self {
            storages,
            indices: indices.into_boxed_slice(),
        }
    }

    /// Iterate sequentially over the storages in the order chosen at construction.
    pub fn iter(&'a self) -> impl ExactSizeIterator<Item = &'a AccountStorageEntry> + 'a {
        self.indices.iter().map(|i| self.storages[*i].as_ref())
    }

    /// Iterate over the storages with rayon, indexed in the order chosen at construction.
    pub fn par_iter(&'a self) -> impl IndexedParallelIterator<Item = &'a AccountStorageEntry> + 'a {
        self.indices.par_iter().map(|i| self.storages[*i].as_ref())
    }
}

/// Select the `nth` (`0 <= nth < range.len()`) value from a `range`, choosing values alternately
/// from its start or end according to a `start_rate : end_rate` ratio.
///
/// For every `start_rate` values selected from the start, `end_rate` values are selected from
/// the end. The resulting sequence alternates in a balanced and interleaved fashion between the
/// range's start and end.
///
/// For example, evaluating `nth = 0, 1, 2, ..., 9` over the range `1..11` with rates `(2, 1)`
/// yields `1, 2, 10, 3, 4, 9, 5, 6, 8, 7`.
///
/// # Panics
///
/// Panics with "rates sum must be positive" if `start_rate + end_rate` is zero. In builds with
/// debug assertions enabled it also panics when `nth` is not smaller than `range.len()`.
fn select_from_range_with_start_end_rates(
    range: Range<usize>,
    nth: usize,
    (start_rate, end_rate): (usize, usize),
) -> usize {
    let range_len = range.len();
    let cycle = start_rate + end_rate;
    // Validate the divisor before any other use of `cycle`: a plain `%` by zero would panic
    // first with a generic message and hide the explanation below.
    let cycle_num = nth.checked_div(cycle).expect("rates sum must be positive");
    let cycle_index = nth % cycle;
    debug_assert!(nth < range_len, "nth must be smaller than the range length");

    let index = if cycle_index < start_rate {
        // Still in the "start" part of the cycle: count forward from the range start.
        cycle_num * start_rate + cycle_index
    } else {
        // In the "end" part of the cycle: count backward from the range end.
        let end_index = cycle_num * end_rate + cycle_index - start_rate;
        range_len - end_index - 1
    };
    range.start + index
}

#[cfg(test)]
pub(crate) mod tests {
use {
Expand Down Expand Up @@ -622,4 +702,32 @@ pub(crate) mod tests {
.insert(0, storage.get_test_storage());
storage.get_if(|_, _| true);
}

#[test]
fn test_select_range_with_start_end_rates() {
    // Runs the selector for every position of `range` and gathers the produced sequence.
    let interleave = |range: std::ops::Range<usize>, rates: (usize, usize)| -> Vec<usize> {
        (0..range.len())
            .map(|nth| select_from_range_with_start_end_rates(range.clone(), nth, rates))
            .collect()
    };

    // Even-length range, two from the start for each one from the end.
    assert_eq!(interleave(1..11, (2, 1)), [1, 2, 10, 3, 4, 9, 5, 6, 8, 7]);
    // Even-length range, strict alternation.
    assert_eq!(interleave(1..11, (1, 1)), [1, 10, 2, 9, 3, 8, 4, 7, 5, 6]);
    // Odd-length range whose length is a multiple of the cycle.
    assert_eq!(interleave(1..10, (2, 1)), [1, 2, 9, 3, 4, 8, 5, 6, 7]);
    // Same range with the end favoured over the start.
    assert_eq!(interleave(1..10, (1, 2)), [1, 9, 8, 2, 7, 6, 3, 5, 4]);
    // Range length that is not a multiple of the cycle.
    assert_eq!(
        interleave(1..14, (2, 3)),
        [1, 2, 13, 12, 11, 3, 4, 10, 9, 8, 5, 6, 7]
    );
}
}
6 changes: 5 additions & 1 deletion accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use {
account_info::{AccountInfo, Offset, StorageLocation},
account_storage::{
stored_account_info::{StoredAccountInfo, StoredAccountInfoWithoutData},
AccountStorage, AccountStorageStatus, ShrinkInProgress,
AccountStorage, AccountStorageStatus, AccountStoragesOrderer, ShrinkInProgress,
},
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_db::stats::{
Expand Down Expand Up @@ -5714,6 +5714,10 @@ impl AccountsDb {
storages: &[Arc<AccountStorageEntry>],
duplicates_lt_hash: &DuplicatesLtHash,
) -> AccountsLtHash {
// Randomized order works well with rayon work splitting, since we only care about a
// uniform distribution of total work size per batch (other ordering strategies might be
// useful for optimizing disk read sizes and buffer usage in a single IO queue).
let storages = AccountStoragesOrderer::with_random_order(storages);
let mut lt_hash = storages
.par_iter()
.fold(LtHash::identity, |mut accum, storage| {
Expand Down
71 changes: 7 additions & 64 deletions runtime/src/snapshot_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
log::*,
regex::Regex,
solana_accounts_db::{
account_storage::AccountStorageMap,
account_storage::{AccountStorageMap, AccountStoragesOrderer},
account_storage_reader::AccountStorageReader,
accounts_db::{AccountStorageEntry, AtomicAccountsFileId},
accounts_file::{AccountsFile, AccountsFileError, StorageAccess},
Expand All @@ -38,7 +38,7 @@ use {
io::{self, BufRead, BufReader, BufWriter, Error as IoError, Read, Seek, Write},
mem,
num::{NonZeroU64, NonZeroUsize},
ops::{Range, RangeInclusive},
ops::RangeInclusive,
path::{Path, PathBuf},
process::ExitStatus,
str::FromStr,
Expand Down Expand Up @@ -1125,15 +1125,11 @@ fn archive_snapshot(
.append_dir_all(SNAPSHOTS_DIR, &staging_snapshots_dir)
.map_err(E::ArchiveSnapshotsDir)?;

let mut sorted_storage_indices = (0..snapshot_storages.len()).collect::<Vec<_>>();
sorted_storage_indices.sort_by_key(|&i| snapshot_storages[i].accounts.len());
for i in 0..sorted_storage_indices.len() {
let index = select_from_range_with_start_end_rates(
0..sorted_storage_indices.len(),
i,
INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO,
);
let storage = &snapshot_storages[sorted_storage_indices[index]];
let storages_orderer = AccountStoragesOrderer::with_small_to_large_ratio(
snapshot_storages,
INTERLEAVE_TAR_ENTRIES_SMALL_TO_LARGE_RATIO,
);
for storage in storages_orderer.iter() {
let path_in_archive = Path::new(ACCOUNTS_DIR)
.join(AccountsFile::file_name(storage.slot(), storage.id()));

Expand Down Expand Up @@ -1215,31 +1211,6 @@ fn archive_snapshot(
})
}

/// Select the `nth` (`0 <= nth < range.len()`) value from a `range`, choosing values alternately
/// from its start or end according to a `start_rate : end_rate` ratio.
///
/// For every `start_rate` values selected from the start, `end_rate` values are selected from
/// the end. The resulting sequence alternates in a balanced and interleaved fashion between the
/// range's start and end.
///
/// For example, evaluating `nth = 0, 1, 2, ..., 9` over the range `1..11` with rates `(2, 1)`
/// yields `1, 2, 10, 3, 4, 9, 5, 6, 8, 7`.
///
/// # Panics
///
/// Panics with "rates sum must be positive" if `start_rate + end_rate` is zero. In builds with
/// debug assertions enabled it also panics when `nth` is not smaller than `range.len()`.
fn select_from_range_with_start_end_rates(
    range: Range<usize>,
    nth: usize,
    (start_rate, end_rate): (usize, usize),
) -> usize {
    let range_len = range.len();
    let cycle = start_rate + end_rate;
    // Validate the divisor before any other use of `cycle`: a plain `%` by zero would panic
    // first with a generic message and hide the explanation below.
    let cycle_num = nth.checked_div(cycle).expect("rates sum must be positive");
    let cycle_index = nth % cycle;
    debug_assert!(nth < range_len, "nth must be smaller than the range length");

    let index = if cycle_index < start_rate {
        // Still in the "start" part of the cycle: count forward from the range start.
        cycle_num * start_rate + cycle_index
    } else {
        // In the "end" part of the cycle: count backward from the range end.
        let end_index = cycle_num * end_rate + cycle_index - start_rate;
        range_len - end_index - 1
    };
    range.start + index
}

/// Get the bank snapshots in a directory
pub fn get_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) -> Vec<BankSnapshotInfo> {
let mut bank_snapshots = Vec::default();
Expand Down Expand Up @@ -3699,32 +3670,4 @@ mod tests {
.starts_with("invalid full snapshot slot file size"));
}
}

#[test]
fn test_select_from_start_or_end_index_by_ratio() {
    // Each case: (range to pick from, (start_rate, end_rate), expected full sequence).
    let cases: [(std::ops::Range<usize>, (usize, usize), &[usize]); 5] = [
        (1..11, (2, 1), &[1, 2, 10, 3, 4, 9, 5, 6, 8, 7]),
        (1..11, (1, 1), &[1, 10, 2, 9, 3, 8, 4, 7, 5, 6]),
        (1..10, (2, 1), &[1, 2, 9, 3, 4, 8, 5, 6, 7]),
        (1..10, (1, 2), &[1, 9, 8, 2, 7, 6, 3, 5, 4]),
        (1..14, (2, 3), &[1, 2, 13, 12, 11, 3, 4, 10, 9, 8, 5, 6, 7]),
    ];

    for (range, rates, expected) in &cases {
        // Query every position of the range exactly once, in order.
        let interleaved: Vec<usize> = (0..range.len())
            .map(|nth| select_from_range_with_start_end_rates(range.clone(), nth, *rates))
            .collect();
        assert_eq!(interleaved, *expected);
    }
}
}
Loading