diff --git a/accounts-db/src/account_storage.rs b/accounts-db/src/account_storage.rs index 324583f5cde70a..ceae47255d7f97 100644 --- a/accounts-db/src/account_storage.rs +++ b/accounts-db/src/account_storage.rs @@ -8,6 +8,7 @@ use { solana_clock::Slot, solana_nohash_hasher::{BuildNoHashHasher, IntMap}, std::{ + collections::VecDeque, ops::{Index, Range}, sync::{ atomic::{AtomicUsize, Ordering}, @@ -410,6 +411,35 @@ impl<'a> AccountStoragesConcurrentConsumer<'a> { None } } + + /// Add a number of consecutive `AccountStorageEntry` entries to `chunk`. + /// + /// Returns iterator over new entries added to `chunk` (consuming it won't affect the chunk). + /// + /// Chunk is filled up to its existing capacity (no reallocation will happen). + pub fn take_up_to_capacity<'b>( + &'a self, + chunk: &'b mut VecDeque>, + ) -> impl Iterator> + 'b { + let starting_len = chunk.len(); + let num_missing_items = chunk.capacity() - starting_len; + let consume_pos = self + .current_position + .fetch_add(num_missing_items, Ordering::Relaxed); + let consume_end_pos = self + .orderer + .entries_len() + .min(consume_pos + num_missing_items); + for position in consume_pos..consume_end_pos { + let original_index = self.orderer.original_index(position); + chunk.push_back(NextItem { + position, + original_index, + storage: &self.orderer[position], + }); + } + chunk.range(starting_len..) 
+ } } /// Value returned from calling `AccountStoragesConcurrentConsumer::next()` diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 93715e517e2da0..7acf1cbe970145 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -51,7 +51,7 @@ use { active_stats::{ActiveStatItem, ActiveStats}, ancestors::Ancestors, append_vec::{self, aligned_stored_size, STORE_META_OVERHEAD}, - buffered_reader::RequiredLenBufFileRead, + buffered_reader::{FileBufRead as _, RequiredLenBufFileRead}, contains::Contains, is_zero_lamport::IsZeroLamport, obsolete_accounts::ObsoleteAccounts, @@ -1969,12 +1969,12 @@ impl AccountsDb { storages.retain(|s| s.slot() <= max_slot_inclusive); // populate storages.par_iter().for_each_init( - || Box::new(append_vec::new_scan_accounts_reader()), + || append_vec::new_scan_accounts_reader(0), |reader, storage| { let slot = storage.slot(); storage .accounts - .scan_accounts(reader.as_mut(), |_offset, account| { + .scan_accounts(reader, |_offset, account| { let pk = account.pubkey(); match pubkey_refcount.entry(*pk) { dashmap::mapref::entry::Entry::Occupied(mut occupied_entry) => { @@ -3833,7 +3833,7 @@ impl AccountsDb { }) } ScanAccountStorageData::DataRefForStorage => { - let mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); storage.scan_accounts(&mut reader, |_offset, account| { let account_without_data = StoredAccountInfoWithoutData::new_from(&account); storage_scan_func(retval, &account_without_data, Some(account.data)); @@ -6529,6 +6529,7 @@ impl AccountsDb { pub fn generate_index( &self, limit_load_slot_count_from_snapshot: Option, + memlock_budget_size: usize, verify: bool, ) -> IndexGenerationInfo { let mut total_time = Measure::start("generate_index"); @@ -6592,11 +6593,15 @@ impl AccountsDb { } let mut total_accum = IndexGenerationAccumulator::new(); - let storages_orderer = - 
AccountStoragesOrderer::with_random_order(&storages).into_concurrent_consumer(); + // Balance ratio of small to large files to amortize per-operation cost across the + // whole scan process and keep total bytes in pending reads similar at all times. + let ratio = append_vec::SCAN_ACCOUNTS_SMALL_TO_LARGE_FILE_RATIO; + let storages_orderer = AccountStoragesOrderer::with_small_to_large_ratio(&storages, ratio) + .into_concurrent_consumer(); let exit_logger = AtomicBool::new(false); let num_processed = AtomicU64::new(0); let num_threads = num_cpus::get(); + let per_thread_memlock_budget_size = memlock_budget_size / num_threads; let mut index_time = Measure::start("index"); thread::scope(|s| { let thread_handles = (0..num_threads) @@ -6605,8 +6610,33 @@ impl AccountsDb { .name(format!("solGenIndex{i:02}")) .spawn_scoped(s, || { let mut thread_accum = IndexGenerationAccumulator::new(); - let mut reader = append_vec::new_scan_accounts_reader(); - while let Some(next_item) = storages_orderer.next() { + let mut reader = append_vec::new_scan_accounts_reader( + per_thread_memlock_budget_size, + ); + // Always consume a multiple of small/large files cycle, when chunk + // is replenished, it will get a single cycle (half capacity) and + // add it to prefetch, while the existing cycle is being read. 
+ let mut chunk = VecDeque::with_capacity(2 * (ratio.0 + ratio.1)); + + loop { + if chunk.is_empty() || chunk.len() == chunk.capacity() / 2 { + for new_item in storages_orderer.take_up_to_capacity(&mut chunk) + { + if let Some((file, size)) = + new_item.storage.accounts.file_io_info() + { + reader + .add_file_to_prefetch(file, size) + .expect("must prefetch accounts storage") + } + } + reader.submit_prefetch().expect("must submit prefetch"); + } + + let Some(next_item) = chunk.pop_front() else { + break; + }; + self.maybe_throttle_index_generation(); let storage = next_item.storage; let store_id = storage.id(); @@ -7372,7 +7402,7 @@ impl AccountsDb { storages: &[Arc], mut callback: impl for<'local> FnMut(Offset, StoredAccountInfo<'local>), ) { - let mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); for storage in storages { storage .accounts diff --git a/accounts-db/src/accounts_db/geyser_plugin_utils.rs b/accounts-db/src/accounts_db/geyser_plugin_utils.rs index 11477b5d9939d7..2185df8a9bd47f 100644 --- a/accounts-db/src/accounts_db/geyser_plugin_utils.rs +++ b/accounts-db/src/accounts_db/geyser_plugin_utils.rs @@ -138,7 +138,7 @@ pub mod tests { let notifier = GeyserTestPlugin::default(); let notifier = Arc::new(notifier); accounts_db.set_geyser_plugin_notifier(Some(notifier.clone())); - accounts_db.generate_index(None, false); + accounts_db.generate_index(None, 0, false); // Ensure key1 was notified twice in different slots { diff --git a/accounts-db/src/accounts_db/tests.rs b/accounts-db/src/accounts_db/tests.rs index fc311bd9853747..6a9e5f52aabba2 100644 --- a/accounts-db/src/accounts_db/tests.rs +++ b/accounts-db/src/accounts_db/tests.rs @@ -183,7 +183,7 @@ fn run_generate_index_duplicates_within_slot_test(db: AccountsDb, reverse: bool) assert!(!db.accounts_index.contains(&pubkey)); let storage_info = StorageSizeAndCountMap::default(); let storage = db.get_storage_for_slot(slot0).unwrap(); - let 
mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); db.generate_index_for_slot( &mut reader, &storage, @@ -221,7 +221,11 @@ fn test_generate_index_for_single_ref_zero_lamport_slot() { let storable_accounts = (slot0, &data[..]); append_vec.accounts.write_accounts(&storable_accounts, 0); assert!(!db.accounts_index.contains(&pubkey)); - let result = db.generate_index(None, false); + let result = db.generate_index( + None, + ACCOUNTS_DB_CONFIG_FOR_TESTING.memlock_budget_size, + false, + ); let entry = db.accounts_index.get_cloned(&pubkey).unwrap(); assert_eq!(entry.slot_list_lock_read_len(), 1); assert_eq!(append_vec.alive_bytes(), aligned_stored_size(0)); @@ -4938,7 +4942,7 @@ define_accounts_db_test!(test_calculate_storage_count_and_alive_bytes, |accounts let storage = accounts.storage.get_slot_storage_entry(slot0).unwrap(); let storage_info = StorageSizeAndCountMap::default(); - let mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); accounts.generate_index_for_slot(&mut reader, &storage, slot0, 0, &storage_info); assert_eq!(storage_info.len(), 1); for entry in storage_info.iter() { @@ -4962,7 +4966,7 @@ define_accounts_db_test!( // empty store let storage = accounts.create_and_insert_store(0, 1, "test"); let storage_info = StorageSizeAndCountMap::default(); - let mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); accounts.generate_index_for_slot(&mut reader, &storage, 0, 0, &storage_info); assert!(storage_info.is_empty()); } @@ -4999,7 +5003,7 @@ define_accounts_db_test!( ); let storage_info = StorageSizeAndCountMap::default(); - let mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); accounts.generate_index_for_slot(&mut reader, &storage, 0, 0, &storage_info); assert_eq!(storage_info.len(), 1); for entry in 
storage_info.iter() { @@ -6132,7 +6136,7 @@ fn test_combine_ancient_slots_simple() { fn get_all_accounts_from_storages<'a>( storages: impl Iterator>, ) -> Vec<(Pubkey, AccountSharedData)> { - let mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); storages .flat_map(|storage| { let mut vec = Vec::default(); diff --git a/accounts-db/src/accounts_file.rs b/accounts-db/src/accounts_file.rs index dc96999f6d7ed5..2cfc9578d643d4 100644 --- a/accounts-db/src/accounts_file.rs +++ b/accounts-db/src/accounts_file.rs @@ -16,6 +16,7 @@ use { solana_clock::Slot, solana_pubkey::Pubkey, std::{ + fs::File, mem, path::{Path, PathBuf}, }, @@ -247,6 +248,14 @@ impl AccountsFile { } } + /// Return the `File` and its size if accounts are backed by file-io. + pub fn file_io_info(&self) -> Option<(&File, usize)> { + match self { + Self::AppendVec(av) => av.file_io_info(), + Self::TieredStorage(_) => unimplemented!(), + } + } + /// Iterate over all accounts and call `callback` with each account. /// /// `callback` parameters: @@ -279,7 +288,7 @@ impl AccountsFile { /// as it can potentially read less and be faster. 
pub(crate) fn scan_accounts<'a>( &'a self, - reader: &mut impl RequiredLenBufFileRead<'a>, + reader: &mut dyn RequiredLenBufFileRead<'a>, callback: impl for<'local> FnMut(Offset, StoredAccountInfo<'local>), ) -> Result<()> { match self { @@ -302,7 +311,7 @@ impl AccountsFile { &self, callback: impl for<'local> FnMut(StoredAccountMeta<'local>), ) -> Result<()> { - let mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); match self { Self::AppendVec(av) => av.scan_accounts_stored_meta(&mut reader, callback)?, Self::TieredStorage(_) => { diff --git a/accounts-db/src/ancient_append_vecs.rs b/accounts-db/src/ancient_append_vecs.rs index ee2b1531c9ae55..451ce3bb5d1315 100644 --- a/accounts-db/src/ancient_append_vecs.rs +++ b/accounts-db/src/ancient_append_vecs.rs @@ -2107,7 +2107,7 @@ pub mod tests { .map(|storage| storage.id()) .collect::>() ); - let mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); // assert that we wrote the 2_ref account to the newly shrunk append vec let shrink_in_progress = shrinks_in_progress.first().unwrap().1; @@ -2276,7 +2276,7 @@ pub mod tests { (*account.pubkey(), account.to_account_shared_data()) }) .unwrap(); - let mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); let mut count = 0; storage .accounts @@ -3217,7 +3217,7 @@ pub mod tests { one.first().unwrap().1.old_storage().id(), storages[combine_into].id() ); - let mut reader = append_vec::new_scan_accounts_reader(); + let mut reader = append_vec::new_scan_accounts_reader(0); // make sure the single new append vec contains all the same accounts let mut two = Vec::default(); diff --git a/accounts-db/src/append_vec.rs b/accounts-db/src/append_vec.rs index 2a28210c76f88c..860cbb80658080 100644 --- a/accounts-db/src/append_vec.rs +++ b/accounts-db/src/append_vec.rs @@ -20,7 +20,7 @@ use { 
account_storage::stored_account_info::{StoredAccountInfo, StoredAccountInfoWithoutData}, accounts_file::{InternalsForArchive, StorageAccess, StoredAccountsInfo}, buffered_reader::{ - BufReaderWithOverflow, BufferedReader, FileBufRead as _, RequiredLenBufFileRead, + BufReaderWithOverflow, BufferedReader, FileBufRead, RequiredLenBufFileRead, RequiredLenBufRead as _, Stack, }, file_io::{read_into_buffer, write_buffer_to_file}, @@ -942,6 +942,14 @@ impl AppendVec { self.path.as_path() } + /// Returns the `&File` and its size if data is backed as file-io + pub fn file_io_info(&self) -> Option<(&File, usize)> { + match self.backing { + AppendVecFileBacking::File(ref file) => Some((file, self.file_size as usize)), + AppendVecFileBacking::Mmap(_) => None, + } + } + /// help with the math of offsets when navigating the on-disk layout in an AppendVec. /// data is at the end of each account and is variable sized /// the next account is then aligned on a 64 bit boundary. @@ -994,7 +1002,7 @@ impl AppendVec { /// as it can potentially read less and be faster. 
pub(crate) fn scan_accounts<'a>( &'a self, - reader: &mut impl RequiredLenBufFileRead<'a>, + reader: &mut dyn RequiredLenBufFileRead<'a>, mut callback: impl for<'local> FnMut(Offset, StoredAccountInfo<'local>), ) -> Result<()> { self.scan_accounts_stored_meta(reader, |stored_account_meta| { @@ -1018,7 +1026,7 @@ impl AppendVec { #[allow(clippy::blocks_in_conditions)] pub(crate) fn scan_accounts_stored_meta<'a>( &'a self, - reader: &mut impl RequiredLenBufFileRead<'a>, + reader: &mut dyn RequiredLenBufFileRead<'a>, mut callback: impl for<'local> FnMut(StoredAccountMeta<'local>), ) -> Result<()> { match &self.backing { @@ -1039,7 +1047,7 @@ impl AppendVec { {} } AppendVecFileBacking::File(file) => { - reader.set_file(file, self.len())?; + reader.activate_file(file, self.len())?; let mut min_buf_len = STORE_META_OVERHEAD; loop { @@ -1322,18 +1330,51 @@ impl AppendVec { } } +/// Ratio of number of small files to large files fetched in chunks when scanning accounts. +pub const SCAN_ACCOUNTS_SMALL_TO_LARGE_FILE_RATIO: (usize, usize) = (7, 1); + /// Create a reusable buffered reader tuned for scanning storages with account data. -pub(crate) fn new_scan_accounts_reader<'a>() -> impl RequiredLenBufFileRead<'a> { +/// +/// Providing memlock budget enables use of io_uring reader optimized for batched reads of +/// large/multiple storage files. +pub(crate) fn new_scan_accounts_reader<'a>( + memlock_budget_size: usize, +) -> impl RequiredLenBufFileRead<'a> { // 128KiB covers a reasonably large distribution of typical account sizes. // In a recent sample, 99.98% of accounts' data lengths were less than or equal to 128KiB. 
const MIN_CAPACITY: usize = 1024 * 128; const MAX_CAPACITY: usize = STORE_META_OVERHEAD + MAX_PERMITTED_DATA_LENGTH as usize; + + #[cfg(target_os = "linux")] + { + use crate::io_uring::sequential_file_reader::SequentialFileReaderBuilder; + + // Small files will each use one `READ_SIZE` buffer, large files get to around 8-10MB, + // try to pick buffer that can hold two `SCAN_ACCOUNTS_SMALL_TO_LARGE_FILE_RATIO` cycles. + const SCAN_ACCOUNTS_BUFFER_SIZE: usize = 32 * 1024 * 1024; + // Vast majority of files are small - compromise between buffer use and faster large reads. + const READ_SIZE: usize = 512 * 1024; + let buf_size = SCAN_ACCOUNTS_BUFFER_SIZE.min(memlock_budget_size); + if buf_size > READ_SIZE { + // scan accounts implementations will submit operations to kernel using + // FileBufRead::add_file_to_prefetch - large submission queue avoids stalls when adding ops. + let ring_qsize = (buf_size.div_ceil(READ_SIZE)) as u32; + let file_reader: Box> = Box::new( + SequentialFileReaderBuilder::new() + .max_iowq_workers(1) + .read_size(READ_SIZE) + .ring_squeue_size(ring_qsize) + .build(buf_size) + .unwrap(), + ); + return BufReaderWithOverflow::new(file_reader, MIN_CAPACITY, MAX_CAPACITY); + } + } + let _ = memlock_budget_size; const BUFFER_SIZE: usize = PAGE_SIZE * 8; - BufReaderWithOverflow::new( - BufferedReader::>::new_stack(), - MIN_CAPACITY, - MAX_CAPACITY, - ) + let file_reader: Box> = + Box::new(BufferedReader::>::new_stack()); + BufReaderWithOverflow::new(file_reader, MIN_CAPACITY, MAX_CAPACITY) } /// The per-account hash, stored in the AppendVec. 
@@ -1351,6 +1392,7 @@ impl ObsoleteAccountHash { pub mod tests { use { super::{test_utils::*, *}, + crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING, assert_matches::assert_matches, memoffset::offset_of, rand::{thread_rng, Rng}, @@ -1662,7 +1704,8 @@ pub mod tests { let av_file = AppendVec::new_from_file(&path.path, av_mmap.len(), StorageAccess::File) .unwrap() .0; - let mut reader = new_scan_accounts_reader(); + let mut reader = + new_scan_accounts_reader(ACCOUNTS_DB_CONFIG_FOR_TESTING.memlock_budget_size); for av in [&av_mmap, &av_file] { let mut index = 0; av.scan_accounts_stored_meta(&mut reader, |v| { @@ -1713,7 +1756,8 @@ pub mod tests { assert_eq!(indexes[0], 0); assert_eq!(av.accounts_count(), size); - let mut reader = new_scan_accounts_reader(); + let mut reader = + new_scan_accounts_reader(ACCOUNTS_DB_CONFIG_FOR_TESTING.memlock_budget_size); let mut sample = 0; let now = Instant::now(); diff --git a/accounts-db/src/buffered_reader.rs b/accounts-db/src/buffered_reader.rs index 5c8d6830b7f717..08a2ccb54686fb 100644 --- a/accounts-db/src/buffered_reader.rs +++ b/accounts-db/src/buffered_reader.rs @@ -74,7 +74,7 @@ pub(crate) trait FileBufRead<'a>: BufRead { /// /// `read_limit` provides a pre-defined limit on the number of bytes that can be read /// from the file (unless EOF is reached). - fn set_file(&mut self, file: &'a File, read_limit: usize) -> io::Result<()>; + fn activate_file(&mut self, file: &'a File, read_limit: usize) -> io::Result<()>; /// Returns the current file offset corresponding to the start of the buffer /// that will be returned by the next call to `fill_buf`. @@ -82,6 +82,42 @@ pub(crate) trait FileBufRead<'a>: BufRead { /// This offset represents the position within the underlying file where data /// will be consumed from. fn get_file_offset(&self) -> usize; + + /// Add `file` reference to the read-ahead queue if implementation supports it. + /// + /// The read finishes when EOF is reached or `read_limit` bytes are read. 
+ /// Multiple files can be added to the reader and they will be read-ahead in FIFO order. + /// The exact moment how and when reads are performed is implementation-specific, but + /// can be expedited by calling `submit_prefetch`. + /// + /// In order to consume prefetched files, call `activate_file` in the same order. + /// + /// Lifetime of reference is tied to the reader's lifetime. + fn add_file_to_prefetch(&mut self, file: &'a File, read_limit: usize) -> io::Result<()>; + + /// Submit all prefetched files for reading. + /// + /// This method can be used for optimizing interaction with the kernel to batch multiple + /// requests (e.g. operations scheduled by `add_file_to_prefetch`) in a single syscall. + fn submit_prefetch(&mut self) -> io::Result<()>; +} + +impl<'a> FileBufRead<'a> for Box + 'a> { + fn activate_file(&mut self, file: &'a File, read_limit: usize) -> io::Result<()> { + self.as_mut().activate_file(file, read_limit) + } + + fn get_file_offset(&self) -> usize { + self.as_ref().get_file_offset() + } + + fn add_file_to_prefetch(&mut self, file: &'a File, read_limit: usize) -> io::Result<()> { + self.as_mut().add_file_to_prefetch(file, read_limit) + } + + fn submit_prefetch(&mut self) -> io::Result<()> { + self.as_mut().submit_prefetch() + } } /// An extension of the `BufRead` trait for readers that require stronger control @@ -151,7 +187,7 @@ impl<'a, T: Backing> BufferedReader<'a, T> { } impl<'a, T: Backing> FileBufRead<'a> for BufferedReader<'a, T> { - fn set_file(&mut self, file: &'a File, read_limit: usize) -> io::Result<()> { + fn activate_file(&mut self, file: &'a File, read_limit: usize) -> io::Result<()> { self.do_set_file(file, read_limit); Ok(()) } @@ -164,6 +200,15 @@ impl<'a, T: Backing> FileBufRead<'a> for BufferedReader<'a, T> { self.file_last_offset + self.buf_valid_bytes.start } } + + fn add_file_to_prefetch(&mut self, _file: &'a File, _read_limit: usize) -> io::Result<()> { + // No prefetching in this implementation + Ok(()) + } + + 
fn submit_prefetch(&mut self) -> io::Result<()> { + Ok(()) + } } impl BufferedReader<'_, T> @@ -350,14 +395,22 @@ impl BufRead for BufReaderWithOverflow { } impl<'a, R: FileBufRead<'a>> FileBufRead<'a> for BufReaderWithOverflow { - fn set_file(&mut self, file: &'a File, read_limit: usize) -> io::Result<()> { + fn activate_file(&mut self, file: &'a File, read_limit: usize) -> io::Result<()> { self.overflow_buf.clear(); - self.reader.set_file(file, read_limit) + self.reader.activate_file(file, read_limit) } fn get_file_offset(&self) -> usize { self.reader.get_file_offset() - self.overflow_buf.len() } + + fn add_file_to_prefetch(&mut self, file: &'a File, read_limit: usize) -> io::Result<()> { + self.reader.add_file_to_prefetch(file, read_limit) + } + + fn submit_prefetch(&mut self) -> io::Result<()> { + self.reader.submit_prefetch() + } } /// Support large `required_len` (within configured limits) by using overflow buffer @@ -408,10 +461,11 @@ pub fn large_file_buf_reader(path: &Path, buf_size: usize) -> io::Result = Some(Duration::from_millis(10)); /// Multiple files creator with `io_uring` queue for open -> write -> close @@ -84,11 +85,18 @@ impl<'a, B: AsMut<[u8]>> IoUringFileCreator<'a, B> { write_capacity: usize, file_complete: F, ) -> io::Result { - // Let submission queue hold half of buffers before we explicitly syscall - // to submit them for writing (lets kernel start processing before we run out of buffers, - // but also amortizes number of `submit` syscalls made). - let ring_qsize = (buffer.as_mut().len() / write_capacity / 2).max(1) as u32; - let ring = IoUring::builder().build(ring_qsize)?; + // Let as many inflight operations as number of buffers (in practice some ops + // use buffers, some only update metadata, but those usually block some buffer(s) + // in backlog). 
+ let ring_qsize = (buffer.as_mut().len() / write_capacity).max(1) as u32; + // Enable sqpoll since: + // - we push a lot of operations, many of them are fast + // - file open ops block buffers stored in backlog, so delaying those is costly + // - there seems to be interference of reads landing in the same kernel worker submitted + // by reader not using sqpoll, but running on the same user thread + let ring = IoUring::builder() + .setup_sqpoll(SQPOLL_IDLE_TIMEOUT) + .build(ring_qsize)?; // Maximum number of spawned [bounded IO, unbounded IO] kernel threads, we don't expect // any unbounded work, but limit it to 1 just in case (0 leaves it unlimited). ring.submitter() @@ -198,7 +206,8 @@ impl IoUringFileCreator<'_, B> { // Safety: the buffer points to the valid memory backed by `self._backing_buffer`. // It's obtained from the queue of free buffers and is written to exclusively // here before being handled to the kernel or backlog in `file`. - let mut_slice = unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len()) }; + let mut_slice = + unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len() as usize) }; let len = src.read(mut_slice)?; if len == 0 { diff --git a/accounts-db/src/io_uring/memory.rs b/accounts-db/src/io_uring/memory.rs index ecaac6003bd13e..91a90e1f696646 100644 --- a/accounts-db/src/io_uring/memory.rs +++ b/accounts-db/src/io_uring/memory.rs @@ -138,7 +138,7 @@ impl DerefMut for PageAlignedMemory { #[derive(Debug)] pub(super) struct FixedIoBuffer { ptr: *mut u8, - size: usize, + size: u32, io_buf_index: Option, } @@ -164,19 +164,27 @@ impl FixedIoBuffer { let buf_start = buffer.as_ptr() as usize; buffer.chunks_exact_mut(chunk_size).map(move |buf| { + assert!( + buf.len() <= u32::MAX as usize, + "buffer chunk size is too large" + ); let io_buf_index = (buf.as_ptr() as usize - buf_start) / FIXED_BUFFER_LEN; Self { ptr: buf.as_mut_ptr(), - size: buf.len(), + size: buf.len() as u32, io_buf_index: Some(io_buf_index as u16), } }) } - pub fn 
len(&self) -> usize { + pub fn len(&self) -> u32 { self.size } + pub fn as_ptr(&self) -> *const u8 { + self.ptr + } + /// Safety: while just returning without dereferencing a pointer is safe, this is marked unsafe /// so that the callers are encouraged to reason about the lifetime of the buffer. pub unsafe fn as_mut_ptr(&self) -> *mut u8 { @@ -188,16 +196,6 @@ impl FixedIoBuffer { self.io_buf_index } - /// Return a clone of `self` reduced to specified `size` - pub fn into_shrinked(self, size: usize) -> Self { - assert!(size <= self.size); - Self { - ptr: self.ptr, - size, - io_buf_index: self.io_buf_index, - } - } - /// Register provided buffer as fixed buffer in `io_uring`. pub unsafe fn register>( buffer: &mut [u8], @@ -214,12 +212,6 @@ impl FixedIoBuffer { } } -impl AsRef<[u8]> for FixedIoBuffer { - fn as_ref(&self) -> &[u8] { - unsafe { slice::from_raw_parts(self.ptr, self.size) } - } -} - /// Check kernel memory lock limit and increase it if necessary. /// /// Returns `Err` when current limit is below `min_required` and cannot be increased. diff --git a/accounts-db/src/io_uring/sequential_file_reader.rs b/accounts-db/src/io_uring/sequential_file_reader.rs index 7934c533604a22..f5499620c8eb28 100644 --- a/accounts-db/src/io_uring/sequential_file_reader.rs +++ b/accounts-db/src/io_uring/sequential_file_reader.rs @@ -3,17 +3,22 @@ use { memory::{FixedIoBuffer, LargeBuffer}, IO_PRIO_BE_HIGHEST, }, + crate::buffered_reader::FileBufRead, agave_io_uring::{Completion, Ring, RingOp}, io_uring::{opcode, squeue, types, IoUring}, std::{ + collections::VecDeque, fs::{File, OpenOptions}, - io::{self, BufRead, Cursor, Read}, + io::{self, BufRead, Read}, + marker::PhantomData, mem, + ops::{Deref, DerefMut}, os::{ fd::{AsRawFd as _, RawFd}, unix::fs::OpenOptionsExt, }, path::Path, + slice, }, }; @@ -21,60 +26,103 @@ use { // but peak at 1MiB. Also compare with particular NVME parameters, e.g. 
// 32 pages (Maximum Data Transfer Size) * page size (MPSMIN = Memory Page Size) = 128KiB. pub const DEFAULT_READ_SIZE: usize = 1024 * 1024; -// For large file we don't really use workers as few regularly submitted requests get handled -// within sqpoll thread. Allow some workers just in case, but limit them. -const MAX_IOWQ_WORKERS: u32 = 2; -/// Reader for non-seekable files. -/// -/// Implements read-ahead using io_uring. -pub struct SequentialFileReader { - // Note: state is tied to `backing_buffer` and contains unsafe pointer references to it - inner: Ring, - /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner` - /// (should get dropped last) - _backing_buffer: B, +/// One IO worker is able to provide approximately 0.5GiB/s read-ahead throughput from SSD +/// without causing application code to wait for data (subject to hardware and read size), +/// so this default should allow single threaded reader to provide >1GiB/s. +const DEFAULT_MAX_IOWQ_WORKERS: u32 = 2; + +pub struct SequentialFileReaderBuilder { + read_size: usize, + max_iowq_workers: u32, + ring_squeue_size: Option, } -impl SequentialFileReader { - /// Create a new `SequentialFileReader` for the given `path` using internally allocated - /// buffer of specified `buf_size` and default read size. - pub fn with_capacity(buf_size: usize, path: impl AsRef) -> io::Result { - Self::with_buffer(path, LargeBuffer::new(buf_size), DEFAULT_READ_SIZE) +/// Utility for building `SequentialFileReader` with specified tuning options. +impl SequentialFileReaderBuilder { + pub fn new() -> Self { + Self { + read_size: DEFAULT_READ_SIZE, + max_iowq_workers: DEFAULT_MAX_IOWQ_WORKERS, + ring_squeue_size: None, + } } -} -/// Holds the state of the reader. 
-struct SequentialFileReaderState { - file: File, - read_capacity: usize, - offset: usize, - eof_buf_index: Option, - buffers: Vec, - current_buf: usize, -} + /// Override the default size of a single IO read operation + /// + /// This influences the concurrency, since buffer is divided into chunks of this size. + pub fn read_size(mut self, read_size: usize) -> Self { + self.read_size = read_size; + self + } -impl> SequentialFileReader { - /// Create a new `SequentialFileReader` for the given file using provided backing `buffer`. + /// Override the default number of kernel IO worker threads + /// + /// Kernel threads are relatively cheap and in case of read IO they don't lock or process data, + /// so the actual CPU usage is low. They will regularly take a bit of cycles on random CPUs though. + /// + /// The default can be adjusted to: + /// - higher value when using single reader with very cheap data processing code to increase read throughput + /// - lower value when using multiple readers or expensive data processing code to reduce thrashing CPUs + pub fn max_iowq_workers(mut self, workers: u32) -> Self { + self.max_iowq_workers = workers; + self + } + + /// Override the ring submission queue size + /// + /// Since sqpoll is not used by the reader, this impacts how frequently we need to `submit` + /// operations for kernel to start executing them. By default reader uses heuristic to + /// amortize number of submissions / batch of pushed operations. /// - /// `buffer` is the internal buffer used for reading. It must be at least `read_capacity` long. - /// The reader will execute multiple `read_capacity` sized reads in parallel to fill the buffer. 
- pub fn with_buffer( - path: impl AsRef, + /// This parameter can be adjusted to: + /// - low values in order to achieve higher frequency of `submit` syscalls (kernel starts + /// executing operations sooner) + /// - higher values to avoid necessity of `submit` when pushing operations (especially when + /// user manually triggers submission by calling `submit_prefetch`) + pub fn ring_squeue_size(mut self, ring_squeue_size: u32) -> Self { + self.ring_squeue_size = Some(ring_squeue_size); + self + } + + /// Build a new `SequentialFileReader` with internally allocated buffer. + /// + /// Buffer will hold at least `buf_capacity` bytes (increased to `read_size` if it's lower). + /// + /// Initially the reader is idle and starts reading after the first file is added. + /// The reader will execute multiple `read_size` sized reads in parallel to fill the buffer. + pub fn build<'a>( + self, + buf_capacity: usize, + ) -> io::Result> { + let buf_capacity = buf_capacity.max(self.read_size); + let buffer = LargeBuffer::new(buf_capacity); + self.build_with_buffer(buffer) + } + + /// Build a new `SequentialFileReader` with a user-supplied buffer + /// + /// `buffer` is the internal buffer used for reading. It must be at least `read_size` long. + /// + /// Initially the reader is idle and starts reading after the first file is added. + /// The reader will execute multiple `read_size` sized reads in parallel to fill the buffer. + pub fn build_with_buffer<'a, B: AsMut<[u8]>>( + self, mut buffer: B, - read_capacity: usize, - ) -> io::Result { + ) -> io::Result> { let buf_capacity = buffer.as_mut().len(); // Let all buffers be submitted for reading at any time - let max_inflight_ops = (buf_capacity / read_capacity) as u32; + let max_inflight_ops = (buf_capacity / self.read_size) as u32; // Completions arrive in bursts (batching done by the disk controller and the kernel). // By submitting smaller chunks we decrease the likelihood that we stall on a full completion queue. 
// Also, in order to keep some operations submitted at all times, we will `submit` them half-way - // through the buffer (at the cost of doubling syscalls) to let kernel work on one half while the other - // half is read by the user. - let ring_squeue_size = (max_inflight_ops / 2).max(1); + // (unless overridden with custom squeue size) through the buffer (at the cost of doubling syscalls) + // to let kernel work on one half while the other half is read by the user. + let ring_squeue_size = self + .ring_squeue_size + .unwrap_or((max_inflight_ops / 2).max(1)); // agave io_uring uses cqsize to define state slab size, so cqsize == max inflight ops let ring = io_uring::IoUring::builder() @@ -84,201 +132,487 @@ impl> SequentialFileReader { // Maximum number of spawned [bounded IO, unbounded IO] kernel threads, we don't expect // any unbounded work, but limit it to 1 just in case (0 leaves it unlimited). ring.submitter() - .register_iowq_max_workers(&mut [MAX_IOWQ_WORKERS, 1])?; - Self::with_buffer_and_ring(buffer, ring, path, read_capacity) + .register_iowq_max_workers(&mut [self.max_iowq_workers, 1])?; + + SequentialFileReader::with_buffer_and_ring(buffer, self.read_size, ring) } +} + +/// Reader for non-seekable files. +/// +/// Implements read-ahead using io_uring. +pub struct SequentialFileReader<'a, B> { + // Note: inner state is tied to `_backing_buffer` - contains unsafe pointer references + // to the buffer. + inner: Ring, + state: SequentialFileReaderState, + /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner` + /// (should get dropped last) + _backing_buffer: B, + _phantom: PhantomData<&'a ()>, +} - /// Create a new `SequentialFileReader` for the given file, using a custom - /// ring instance. +impl SequentialFileReader<'_, B> { + /// Create a new `SequentialFileReader` using a custom ring instance. 
fn with_buffer_and_ring( mut backing_buffer: B, - ring: IoUring, - path: impl AsRef, read_capacity: usize, - ) -> io::Result { + io_uring: IoUring, + ) -> io::Result + where + B: AsMut<[u8]>, + { let buffer = backing_buffer.as_mut(); assert!(buffer.len() >= read_capacity, "buffer too small"); let read_aligned_buf_len = buffer.len() / read_capacity * read_capacity; let buffer = &mut buffer[..read_aligned_buf_len]; - let file = OpenOptions::new() - .read(true) - .custom_flags(libc::O_NOATIME) - .open(path)?; // Safety: buffers contain unsafe pointers to `buffer`, but we make sure they are // dropped before `backing_buffer` is dropped. let buffers = unsafe { FixedIoBuffer::split_buffer_chunks(buffer, read_capacity) } .map(ReadBufState::Uninit) .collect(); - let ring = Ring::new( - ring, - SequentialFileReaderState { - file, - read_capacity, - buffers, - offset: 0, - eof_buf_index: None, - current_buf: 0, - }, - ); + let inner = Ring::new(io_uring, BuffersState(buffers)); // Safety: kernel holds unsafe pointers to `buffer`, struct field declaration order // guarantees that the ring is destroyed before `_backing_buffer` is dropped. - unsafe { FixedIoBuffer::register(buffer, &ring)? }; + unsafe { FixedIoBuffer::register(buffer, &inner)? }; - let mut reader = Self { - inner: ring, + Ok(Self { + inner, + state: SequentialFileReaderState::default(), _backing_buffer: backing_buffer, - }; - - // Start reading all buffers. - for i in 0..reader.inner.context().buffers.len() { - reader.start_reading_buf(i)?; - } - // Make sure work is started in case submission queue is large and we - // never submitted work when adding buffers. - reader.inner.submit()?; + _phantom: PhantomData, + }) + } - Ok(reader) + /// Opens file under `path`, check its metadata to determine read limit and add it to the reader. + /// + /// See `add_owned_file_to_prefetch` for more details. 
+ pub fn add_path_to_prefetch(&mut self, path: impl AsRef) -> io::Result<()> { + let file = OpenOptions::new() + .read(true) + .custom_flags(libc::O_NOATIME) + .open(path)?; + let len = file.metadata()?.len() as usize; + self.add_owned_file_to_prefetch(file, len) } - /// Start reading into the buffer at `index`. + /// Add `file` to read. Starts reading the file as soon as a buffer is available. /// - /// This is called at start and as soon as a buffer is fully consumed by BufRead::fill_buf(). + /// The read finishes when EOF is reached or `read_limit` bytes are read. + /// Multiple files can be added to the reader and they will be read-ahead in FIFO order. /// - /// Reads [state.offset, state.offset + state.read_capacity) from the file into - /// state.buffers[index]. Once a read is complete, ReadOp::complete(state) is called to update - /// the state. - fn start_reading_buf(&mut self, index: usize) -> io::Result<()> { - let SequentialFileReaderState { - buffers, - current_buf: _, - file, - offset, - read_capacity, - eof_buf_index: _, - } = &mut self.inner.context_mut(); - let read_buf = mem::replace(&mut buffers[index], ReadBufState::Reading); - match read_buf { - ReadBufState::Uninit(buf) => { - let op = ReadOp { - fd: file.as_raw_fd(), - buf, - buf_off: 0, - file_off: *offset, - read_len: *read_capacity, - reader_buf_index: index, - }; - - // We always advance by `read_capacity`. If we get a short read, we submit a new - // read for the remaining data. See ReadOp::complete(). - *offset += *read_capacity; - - // Safety: - // The op points to a buffer which is guaranteed to be valid for - // the lifetime of the operation - self.inner.push(op)?; + /// Reader takes ownership of the file and will drop it after it's done reading + /// and `move_to_next_file` is called. 
+ pub fn add_owned_file_to_prefetch(&mut self, file: File, read_limit: usize) -> io::Result<()> { + self.add_file_by_fd(file.as_raw_fd(), read_limit)?; + self.state.owned_files.push_back(file); + Ok(()) + } + + /// Caller must ensure that the file is not closed while the reader is using it. + fn add_file_by_fd(&mut self, fd: RawFd, read_limit: usize) -> io::Result<()> { + self.state.files.push_back(FileState::new(fd, read_limit)); + + if self.state.all_buffers_used(self.inner.context()) { + // Just added file to backlog, no reads can be started yet. + return Ok(()); + } + + // There are free buffers, so we can start reading the new file. + self.state.next_read_file_index = + Some(self.state.next_read_file_index.map_or(0, |idx| idx + 1)); + + // Start reading as many buffers as necessary for queued files. + self.try_schedule_new_ops() + } + + /// When reading multiple files, this method moves the reader to the next file. + fn move_to_next_file(&mut self) -> io::Result<()> { + let state = &mut self.state; + + let Some(removed_file) = state.files.pop_front() else { + return Ok(()); + }; + + // Always reset in-file and in-buffer state + state.current_offset = 0; + state.current_buf_pos = 0; + state.current_buf_len = 0; + state.left_to_consume = 0; + + // Reclaim current and all subsequent unread buffers of removed file as uninitialized. + let sentinel_buf_index = state + .files + .front() + .and_then(|f| f.start_buf_index) + .unwrap_or(state.current_buf_index); + let num_bufs = self.inner.context().len(); + loop { + self.inner.process_completions()?; + let current_buf = self.inner.context_mut().get_mut(state.current_buf_index); + if current_buf.is_reading() { + // Still no data, wait for more completions, but submit in case there are queued + // entries in the submission queue. 
+ self.inner.submit()?; + continue; + } + current_buf.transition_to_uninit(); + + let next_buf_index = (state.current_buf_index + 1) % num_bufs; + state.current_buf_index = next_buf_index; + if sentinel_buf_index == next_buf_index { + break; + } + } + + if state + .owned_files + .front() + .is_some_and(|f| removed_file.is_same_file(f)) + { + state.owned_files.pop_front(); + } + + if let Some(next_file_index) = state.next_read_file_index.as_mut() { + // Since file was removed from front, all indices are shifted by one + state.next_read_file_index = next_file_index.checked_sub(1); + if state.next_read_file_index.is_none() { + // The removed file was the current one being read + if state.files.is_empty() { + // Reader is empty, reset buf indices to initial values + state.current_buf_index = 0; + state.next_read_buf_index = 0; + } else { + // There are other files to read, start with the new first file + state.next_read_file_index = Some(0); + } } - _ => unreachable!("called start_reading_buf on a non-empty buffer"), + } + + self.try_schedule_new_ops() + } + + fn try_schedule_new_ops(&mut self) -> io::Result<()> { + // Start reading as many buffers as necessary for queued files. 
+ while let Some(op) = self.state.next_read_op(self.inner.context_mut()) { + self.inner.push(op)?; } Ok(()) } + + fn wait_current_buf_full(&mut self) -> io::Result { + if self.state.files.is_empty() { + return Ok(false); + } + let num_bufs = self.inner.context().len(); + loop { + self.inner.process_completions()?; + + let state = &mut self.state; + let current_buf = &mut self.inner.context_mut().get_mut(state.current_buf_index); + match current_buf { + ReadBufState::Full { buf, eof_pos } => { + if state.current_buf_len == 0 { + state.current_buf_len = eof_pos.unwrap_or(buf.len()); + if state.left_to_consume > 0 { + let consumed = state + .left_to_consume + .min((state.current_buf_len - state.current_buf_pos) as usize); + state.left_to_consume -= consumed; + state.current_buf_pos += consumed as u32; + } + } + + // Note: we might have consumed whole buf from `left_to_consume` + if state.current_buf_pos < state.current_buf_len { + // We have some data available. + return Ok(true); + } + + if eof_pos.is_some() { + // Last filled buf for the whole file (until `move_to_next_file` is called). + return Ok(false); + } + // We have finished consuming this buffer - reset its state. + current_buf.transition_to_uninit(); + + // Next `fill_buf` will use subsequent buffer. + state.move_to_next_buf(num_bufs); + + // A buffer was freed, so try to queue up next read. + self.try_schedule_new_ops()?; + } + + ReadBufState::Reading => { + // Still no data, wait for more completions, but submit in case there are queued + // entries in the submission queue. + self.inner.submit()? + } + + ReadBufState::Uninit(_) => unreachable!("should be initialized"), + } + // Move to the next buffer and check again whether we have data. + } + } } -// BufRead requires Read, but we never really use the Read interface. 
-impl> Read for SequentialFileReader { +impl> Read for SequentialFileReader<'_, B> { fn read(&mut self, buf: &mut [u8]) -> io::Result { let available = self.fill_buf()?; - if available.is_empty() { - return Ok(0); // EOF. - } - let bytes_to_read = available.len().min(buf.len()); + if bytes_to_read == 0 { + return Ok(0); // EOF or empty `buf` + } buf[..bytes_to_read].copy_from_slice(&available[..bytes_to_read]); self.consume(bytes_to_read); Ok(bytes_to_read) } } -impl> BufRead for SequentialFileReader { +impl> BufRead for SequentialFileReader<'_, B> { fn fill_buf(&mut self) -> io::Result<&[u8]> { - let _have_data = loop { - let state = self.inner.context_mut(); - let num_buffers = state.buffers.len(); - let read_buf = &mut state.buffers[state.current_buf]; - match read_buf { - ReadBufState::Full(ref mut cursor) => { - if !cursor.fill_buf()?.is_empty() { - // we have some data available - break true; - } - let index = state.current_buf; + if self.state.current_buf_pos == self.state.current_buf_len + && !self.wait_current_buf_full()? + { + return Ok(&[]); + } - if let Some(eof_index) = state.eof_buf_index { - if eof_index == index { - // This is the last filled buf for the whole file - return Ok(&[]); - } - // Some other buffer encountered EOF: move on, but don't issue new read. - state.current_buf = (state.current_buf + 1) % num_buffers; + // At this point we must have data or be at EOF. 
+ let current_buf = self.inner.context().get_fast(self.state.current_buf_index); + Ok(current_buf.slice(self.state.current_buf_pos, self.state.current_buf_len)) + } + + fn consume(&mut self, amt: usize) { + self.state.consume(amt); + } +} + +impl<'a, B: AsMut<[u8]>> FileBufRead<'a> for SequentialFileReader<'a, B> { + fn activate_file(&mut self, file: &'a File, read_limit: usize) -> io::Result<()> { + while self + .state + .files + .front() + .is_some_and(|file_state| !file_state.is_same_file(file)) + { + self.move_to_next_file()?; + } + if self.state.files.is_empty() { + self.add_file_to_prefetch(file, read_limit)?; + } + Ok(()) + } + + fn get_file_offset(&self) -> usize { + self.state.current_offset + } + + fn add_file_to_prefetch(&mut self, file: &'a File, read_limit: usize) -> io::Result<()> { + self.add_file_by_fd(file.as_raw_fd(), read_limit) + } + + fn submit_prefetch(&mut self) -> io::Result<()> { + self.inner.submit() + } +} + +/// Holds the state of all the buffers that may be submitted to the kernel for reading. +struct BuffersState(Box<[ReadBufState]>); + +impl BuffersState { + fn len(&self) -> u16 { + self.0.len() as u16 + } + + fn get_mut(&mut self, index: u16) -> &mut ReadBufState { + &mut self.0[index as usize] + } + + #[inline] + fn get_fast(&self, index: u16) -> &ReadBufState { + debug_assert!(index < self.len()); + // Perf: skip bounds check for performance + unsafe { self.0.get_unchecked(index as usize) } + } +} + +impl Deref for BuffersState { + type Target = [ReadBufState]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for BuffersState { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Holds the state of the reader. 
+#[derive(Debug, Default)] +struct SequentialFileReaderState { + // Note: file states operate on file descriptors of files that are assumed to be open, + // which is guaranteed either by them being in `owned_files` or in case of file references + // because they are added with reader's 'a lifetime. + files: VecDeque, + + /// Amount of bytes left to consume from next buffer(s) before returning them in `fill_buf()`. + /// This is necessary to handle `consume()` calls beyond the current buffer. + left_to_consume: usize, + /// Index of `BuffersState` buffer to consume data from (0 if no file is being read) + current_buf_index: u16, + /// Position in buffer (pointed by `current_buf_index`) to consume data from + current_buf_pos: u32, + /// Cached length of the current buffer (0 until `wait_current_buf_full` initializes it) + current_buf_len: u32, + /// File offset of the next `fill_buf()` buffer available to consume + current_offset: usize, + + /// Index in `self.files` of the file that is currently being read (can generate new read ops). + next_read_file_index: Option, + /// Index of `BuffersState` buffer that can be used for the next read operation. + next_read_buf_index: u16, + + owned_files: VecDeque, +} + +impl SequentialFileReaderState { + fn consume(&mut self, amt: usize) { + if amt == 0 || self.files.is_empty() { + return; + } + self.current_offset += amt; + + let unconsumed_buf_len = (self.current_buf_len - self.current_buf_pos) as usize; + if amt <= unconsumed_buf_len { + self.current_buf_pos += amt as u32; + } else { + self.current_buf_pos = self.current_buf_len; + // Keep track of any bytes left to consume beyond current buffer, they will be + // accounted for during next `wait_current_buf_full` call. + self.left_to_consume += amt - unconsumed_buf_len; + } + } + + /// Return the next read operation for the reader. + /// + /// If all buffers are used or last file is already (being) read, returns `None`. 
+ /// + /// Reads are issued for files added into the reader from first file at position 0 + /// to its limit / EOF and then for any subsequent files. + fn next_read_op(&mut self, bufs: &mut [ReadBufState]) -> Option { + if self.all_buffers_used(bufs) { + return None; + } + let num_bufs = bufs.len() as u16; + loop { + let read_file_index = self.next_read_file_index?; + match self.files[read_file_index].next_read_op(self.next_read_buf_index, bufs) { + Some(op) => { + self.next_read_buf_index = (self.next_read_buf_index + 1) % num_bufs; + return Some(op); + } + None => { + // Last read file reached its limit, try to move to the next file + if read_file_index < self.files.len() - 1 { + self.next_read_file_index = Some(read_file_index + 1); } else { - // we have finished consuming this buffer, queue the next read - let cursor = mem::replace(cursor, Cursor::new(FixedIoBuffer::empty())); - let buf = cursor.into_inner(); - - // The very last read when we hit EOF could return less than `read_capacity`, in - // which case what's in the cursor is shorter than `read_capacity` and for - // strict correctness we should reset the length. - // - // Note though that once we hit EOF we don't queue any more reads, so even if we - // didn't reset the length it wouldn't matter. 
- debug_assert!(buf.len() == state.read_capacity); - - state.buffers[index] = ReadBufState::Uninit(buf); - state.current_buf = (state.current_buf + 1) % num_buffers; - - self.start_reading_buf(index)?; + return None; } - - // move to the next buffer and check again whether we have data - continue; } - ReadBufState::Uninit(_) => unreachable!("should be initialized"), - _ => break false, } - }; + } + } - loop { - self.inner.process_completions()?; - let state = self.inner.context(); + fn move_to_next_buf(&mut self, num_bufs: u16) { + self.current_buf_index = (self.current_buf_index + 1) % num_bufs; + self.current_buf_pos = 0; + // Buffer might still be reading, len will be intialized on first `wait_current_buf_full` + self.current_buf_len = 0; + } - match &state.buffers[state.current_buf] { - ReadBufState::Full(_) => break, - ReadBufState::Uninit(_) => unreachable!("should be initialized"), - // Still no data, wait for more completions, but submit in case the SQPOLL - // thread is asleep and there are queued entries in the submission queue. - ReadBufState::Reading => self.inner.submit()?, - } - } + /// Returns `true` if there are no more buffers available for reading. + fn all_buffers_used(&self, bufs: &[ReadBufState]) -> bool { + bufs[self.next_read_buf_index as usize].is_used() + } +} - // At this point we must have data or be at EOF. - let state = self.inner.context_mut(); - match &mut state.buffers[state.current_buf] { - ReadBufState::Full(cursor) => Ok(cursor.fill_buf()?), - // after the loop above we either have some data or we must be at EOF - _ => unreachable!(), +/// Holds the state of a single file being read. +#[derive(Debug)] +struct FileState { + raw_fd: RawFd, + /// Limit file offset to read up to. 
+ read_limit: usize, + /// Offset of the next byte to read from the file + next_read_offset: usize, + /// When the file is possible to read for the first time, it should be read from this buffer index + start_buf_index: Option, +} + +impl FileState { + fn new(raw_fd: RawFd, read_limit: usize) -> Self { + Self { + raw_fd, + read_limit, + next_read_offset: 0, + start_buf_index: None, } } - fn consume(&mut self, amt: usize) { - let state = self.inner.context_mut(); - match &mut state.buffers[state.current_buf] { - ReadBufState::Full(cursor) => cursor.consume(amt), - _ => assert_eq!(amt, 0), + fn is_same_file(&self, file: &File) -> bool { + self.raw_fd == file.as_raw_fd() + } + + /// Create a new read operation into the `bufs[index]` buffer and update file state. + /// + /// This is called whenever new reads can be scheduled (on added file or freed buffer). + /// + /// Returns `ReadOp` that will read + /// [self.next_read_offset, self.next_read_offset + min(buf len, self.read_limit)) + /// from the file into `bufs[index]`. Once the read is complete the buffer changes into + /// `Full` state and can be consumed. + fn next_read_op(&mut self, index: u16, bufs: &mut [ReadBufState]) -> Option { + let Self { + start_buf_index, + raw_fd, + next_read_offset: offset, + read_limit, + } = self; + let left_to_read = read_limit.saturating_sub(*offset); + if left_to_read == 0 { + return None; + } + + let buf = bufs[index as usize].transition_to_reading(); + + let read_len = left_to_read.min(buf.len() as usize); + let op = ReadOp { + fd: types::Fd(*raw_fd), + buf, + buf_offset: 0, + file_offset: *offset, + read_len: read_len as u32, // it's trimmed by u32 buf.len() above + is_last_read: left_to_read == read_len, + reader_buf_index: index, + }; + // Mark file state to start reading at `index` buffer + if start_buf_index.is_none() { + *start_buf_index = Some(index); } + + // We always advance by `read_len`. If we get a short read, we submit a new + // read for the remaining data. 
See ReadOp::complete(). + *offset += read_len; + + Some(op) } } +#[derive(Debug)] enum ReadBufState { /// The buffer is pending submission to read queue (on initialization and /// in transition from `Full` to `Reading`). @@ -287,75 +621,98 @@ enum ReadBufState { /// the ring. Reading, /// The buffer is filled and ready to be consumed. - Full(Cursor), + Full { + buf: FixedIoBuffer, + /// Position in `buf` at which 0-sized read (or requested read limit) was reached + eof_pos: Option, + }, } -impl std::fmt::Debug for ReadBufState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl ReadBufState { + fn is_used(&self) -> bool { + matches!(self, ReadBufState::Reading | ReadBufState::Full { .. }) + } + + fn is_reading(&self) -> bool { + matches!(self, ReadBufState::Reading) + } + + #[inline] + fn slice(&self, start_pos: u32, end_pos: u32) -> &[u8] { match self { - Self::Uninit(buf) => f - .debug_struct("Uninit") - .field("io_buf_index", &buf.io_buf_index()) - .finish(), - Self::Reading => write!(f, "Reading"), - Self::Full(cursor) => f - .debug_struct("Full") - .field("io_buf_index", &cursor.get_ref().io_buf_index()) - .finish(), + Self::Full { buf, eof_pos } => { + debug_assert!(eof_pos.unwrap_or(buf.len()) >= end_pos); + let limit = (end_pos - start_pos) as usize; + unsafe { slice::from_raw_parts(buf.as_ptr().add(start_pos as usize), limit) } + } + Self::Uninit(_) | Self::Reading => { + unreachable!("must call as_slice only on full buffer") + } } } + + /// Marks the buffer as uninitialized (after it has been fully consumed). + fn transition_to_uninit(&mut self) { + match self { + Self::Uninit(_) => (), + Self::Reading => unreachable!("cannot reset a buffer that has pending read"), + Self::Full { buf, .. } => { + *self = ReadBufState::Uninit(mem::replace(buf, FixedIoBuffer::empty())); + } + } + } + + /// Marks the buffer as being read and returns underlying buffer to pass to `ReadOp`. 
+ #[must_use] + fn transition_to_reading(&mut self) -> FixedIoBuffer { + let Self::Uninit(buf) = mem::replace(self, Self::Reading) else { + unreachable!("buffer should be uninitialized") + }; + buf + } } +#[derive(Debug)] struct ReadOp { - fd: RawFd, + fd: types::Fd, buf: FixedIoBuffer, /// This is the offset inside the buffer. It's typically 0, but can be non-zero if a previous /// read returned less data than requested (because of EINTR or whatever) and we submitted a new /// read for the remaining data. - buf_off: usize, + buf_offset: u32, /// The offset in the file. - file_off: usize, + file_offset: usize, /// The length of the read. This is typically `read_capacity` but can be less if a previous read - /// returned less data than requested. - read_len: usize, + /// returned less data than requested or `file_offset` is close to the end of read limit. + read_len: u32, + /// Indicates that after reading `read_len` we have reached configured read limit. + is_last_read: bool, /// This is the index of the buffer in the reader's state. It's used to update the state once the /// read completes. 
- reader_buf_index: usize, + reader_buf_index: u16, } -impl std::fmt::Debug for ReadOp { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ReadOp") - .field("fd", &self.fd) - .field("buf_off", &self.buf_off) - .field("io_buf_index", &self.buf.io_buf_index()) - .field("file_off", &self.file_off) - .field("read_len", &self.read_len) - .field("reader_buf_index", &self.reader_buf_index) - .finish() - } -} - -impl RingOp for ReadOp { +impl RingOp for ReadOp { fn entry(&mut self) -> squeue::Entry { let ReadOp { fd, - buf, - buf_off, - file_off, + ref mut buf, + buf_offset, + file_offset, read_len, + is_last_read: _, reader_buf_index: _, - } = self; - debug_assert!(*buf_off + *read_len <= buf.len()); + } = *self; + debug_assert!(buf_offset + read_len <= buf.len()); opcode::ReadFixed::new( - types::Fd(*fd), + fd, // Safety: we assert that the buffer is large enough to hold the read. - unsafe { buf.as_mut_ptr().byte_add(*buf_off) }, - *read_len as u32, + unsafe { buf.as_mut_ptr().byte_add(buf_offset as usize) }, + read_len, buf.io_buf_index() .expect("should have a valid fixed buffer"), ) - .offset(*file_off as u64) + .offset(file_offset as u64) .ioprio(IO_PRIO_BE_HIGHEST) .build() .flags(squeue::Flags::ASYNC) @@ -363,44 +720,45 @@ impl RingOp for ReadOp { fn complete( &mut self, - completion: &mut Completion, + completion: &mut Completion, res: io::Result, ) -> io::Result<()> { let ReadOp { fd, - buf, - buf_off, - file_off, + ref mut buf, + buf_offset, + file_offset, read_len, + is_last_read, reader_buf_index, - } = self; - let reader_state = completion.context_mut(); + } = *self; + let buffers = completion.context_mut(); - let last_read_len = res? as usize; - if last_read_len == 0 { - reader_state.eof_buf_index = Some(*reader_buf_index); - } + let last_read_len = res? 
as u32; - let total_read_len = *buf_off + last_read_len; + let total_read_len = buf_offset + last_read_len; let buf = mem::replace(buf, FixedIoBuffer::empty()); - if last_read_len > 0 && last_read_len < *read_len { + if last_read_len > 0 && last_read_len < read_len { // Partial read, retry the op with updated offsets let op: ReadOp = ReadOp { - fd: *fd, + fd, buf, - buf_off: total_read_len, - file_off: *file_off + last_read_len, - read_len: *read_len - last_read_len, - reader_buf_index: *reader_buf_index, + buf_offset: total_read_len, + file_offset: file_offset + last_read_len as usize, + read_len: read_len - last_read_len, + reader_buf_index, + is_last_read, }; // Safety: // The op points to a buffer which is guaranteed to be valid for the // lifetime of the operation completion.push(op); } else { - reader_state.buffers[*reader_buf_index] = - ReadBufState::Full(Cursor::new(buf.into_shrinked(total_read_len))); + buffers[reader_buf_index as usize] = ReadBufState::Full { + buf, + eof_pos: (last_read_len == 0 || is_last_read).then_some(total_read_len), + }; } Ok(()) @@ -409,7 +767,13 @@ impl RingOp for ReadOp { #[cfg(test)] mod tests { - use {super::*, tempfile::NamedTempFile}; + use {super::*, std::io::Seek, tempfile::NamedTempFile, test_case::test_case}; + + fn read_as_vec(mut reader: impl Read) -> Vec { + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).unwrap(); + buf + } fn check_reading_file(file_size: usize, backing_buffer_size: usize, read_capacity: usize) { let pattern: Vec = (0..251).collect(); @@ -422,13 +786,18 @@ mod tests { io::Write::write_all(&mut temp_file, &pattern[..file_size % pattern.len()]).unwrap(); let buf = vec![0; backing_buffer_size]; - let mut reader = - SequentialFileReader::with_buffer(temp_file.path(), buf, read_capacity).unwrap(); + let mut reader = SequentialFileReaderBuilder::new() + .read_size(read_capacity) + .build_with_buffer(buf) + .unwrap(); + reader + .add_owned_file_to_prefetch(File::open(temp_file.path()).unwrap(), 
usize::MAX) + .unwrap(); // Read contents from the reader and verify length - let mut all_read_data = Vec::new(); - reader.read_to_end(&mut all_read_data).unwrap(); + let all_read_data = read_as_vec(&mut reader); assert_eq!(all_read_data.len(), file_size); + assert_eq!(reader.get_file_offset(), file_size); // Verify the contents for (i, byte) in all_read_data.iter().enumerate() { @@ -462,4 +831,289 @@ mod tests { check_reading_file(250_000, 4096, 2048); check_reading_file(250_000, 4096, 4096); } + + #[test] + fn test_add_file_ref() { + let mut temp_file = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp_file, &[0xa, 0xb, 0xc]).unwrap(); + temp_file.rewind().unwrap(); + + { + let mut reader = SequentialFileReaderBuilder::new() + .read_size(512) + .build_with_buffer(vec![0; 1024]) + .unwrap(); + reader.add_file_to_prefetch(temp_file.as_file(), 3).unwrap(); + assert_eq!(read_as_vec(&mut reader), &[0xa, 0xb, 0xc]); + } + // Independently we can also read from the file directly + assert_eq!(read_as_vec(&mut temp_file), &[0xa, 0xb, 0xc]); + } + + #[test] + fn test_multiple_unlimited_files() { + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &[0xa, 0xb, 0xc]).unwrap(); + let mut temp2 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp2, &[0xd, 0xe, 0xf, 0x10]).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_size(512) + .build_with_buffer(vec![0; 1024]) + .unwrap(); + + let f1 = File::open(temp1.path()).unwrap(); + let f2 = File::open(temp2.path()).unwrap(); + reader.add_owned_file_to_prefetch(f1, usize::MAX).unwrap(); + reader.add_owned_file_to_prefetch(f2, usize::MAX).unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf, 0x10]); + reader.move_to_next_file().unwrap(); + + let f1 = File::open(temp1.path()).unwrap(); + reader.add_owned_file_to_prefetch(f1, 
usize::MAX).unwrap(); + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + } + + #[test] + fn test_multiple_limited_files() { + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &[0xa, 0xb, 0xc]).unwrap(); + let mut temp2 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp2, &[0xd, 0xe, 0xf, 0x10]).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_size(512) + .build_with_buffer(vec![0; 1024]) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 2).unwrap(); + reader.add_file_to_prefetch(temp2.as_file(), 3).unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 4).unwrap(); + reader.add_file_to_prefetch(temp2.as_file(), 5).unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb]); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf]); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf, 0x10]); + + reader.add_file_to_prefetch(temp2.as_file(), 4).unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 2).unwrap(); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf, 0x10]); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb]); + } + + #[test_case(2048, 512)] + #[test_case(256, 128)] + #[test_case(32, 2)] + fn test_multiple_limited_and_unlimited_files(buffer_size: usize, read_size: usize) { + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &[0xa, 0xb, 0xc, 0xd]).unwrap(); + let mut temp2 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp2, &[0x10, 0x11, 0x12, 0x13, 0x14]).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_size(read_size) + .build(buffer_size) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 
2).unwrap(); + reader + .add_file_to_prefetch(temp2.as_file(), usize::MAX) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 3).unwrap(); + reader.add_file_to_prefetch(temp2.as_file(), 4).unwrap(); + reader + .add_file_to_prefetch(temp1.as_file(), usize::MAX) + .unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb]); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0x10, 0x11, 0x12, 0x13, 0x14]); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0x10, 0x11, 0x12, 0x13]); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc, 0xd]); + } + + #[test_case(2048, 512)] + #[test_case(256, 128)] + #[test_case(256, 32)] + fn test_multiple_medium_limited_files(buffer_size: usize, read_size: usize) { + let pattern = (0..2000).map(|i| i as u8).collect::>(); + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &pattern).unwrap(); + let mut temp2 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp2, &pattern[1000..]).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_size(read_size) + .build_with_buffer(vec![0; buffer_size]) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 1990).unwrap(); + reader.add_file_to_prefetch(temp2.as_file(), 1000).unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 2010).unwrap(); + + assert_eq!(read_as_vec(&mut reader), &pattern[..1990]); + + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), &pattern[1000..]); + + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), pattern); + } + + #[test] + fn test_interleave_add_file_and_reads() { + let pattern = (0..2000).map(|i| i as u8).collect::>(); + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, 
&pattern).unwrap(); + let mut temp2 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp2, &pattern[1000..]).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_size(512) + .build(1024) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 1990).unwrap(); + assert_eq!(read_as_vec(&mut reader), &pattern[..1990]); + reader.move_to_next_file().unwrap(); + + for _ in 0..10 { + reader.add_file_to_prefetch(temp2.as_file(), 1000).unwrap(); + assert_eq!(read_as_vec(&mut reader), &pattern[1000..]); + reader.move_to_next_file().unwrap(); + + reader.add_file_to_prefetch(temp1.as_file(), 2010).unwrap(); + assert_eq!(read_as_vec(&mut reader), &pattern[..2000]); + reader.move_to_next_file().unwrap(); + } + assert_eq!(read_as_vec(&mut reader), Vec::::new()); + + for _ in 0..10 { + reader.add_file_to_prefetch(temp2.as_file(), 1000).unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 2010).unwrap(); + + assert_eq!(read_as_vec(&mut reader), &pattern[1000..]); + reader.move_to_next_file().unwrap(); + assert_eq!(read_as_vec(&mut reader), &pattern[..2000]); + reader.move_to_next_file().unwrap(); + } + assert_eq!(read_as_vec(&mut reader), Vec::::new()); + } + + #[test] + fn test_get_offset() { + let pattern = (0..600).map(|i| i as u8).collect::>(); + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &pattern).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_size(512) + .build_with_buffer(vec![0; 1024]) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 1990).unwrap(); + + assert_eq!(512, reader.fill_buf().unwrap().len()); + assert_eq!(0, reader.get_file_offset()); + reader.consume(0); + assert_eq!(0, reader.get_file_offset()); + + reader.consume(40); + assert_eq!(40, reader.get_file_offset()); + assert_eq!(472, reader.fill_buf().unwrap().len()); + + reader.consume(472); + assert_eq!(512, reader.get_file_offset()); + assert_eq!(88, reader.fill_buf().unwrap().len()); + 
reader.consume(0); /* continuation of test_get_offset: a zero-length consume is expected to keep the offset at 512 */ + assert_eq!(512, reader.get_file_offset()); + + reader.consume(88); + assert_eq!(600, reader.get_file_offset()); /* offset now equals the number of bytes written to the file */ + assert_eq!(0, reader.fill_buf().unwrap().len()); /* an empty buffer is expected once all 600 bytes are consumed */ + + reader.move_to_next_file().unwrap(); + assert_eq!(0, reader.get_file_offset()); /* offset is expected to read 0 after moving past the file */ + } + + #[test] + fn test_consume_skip_filled_buf_len() { /* calls consume with amounts larger than the slice last returned by fill_buf */ + let pattern = (0..6000).map(|i| i as u8).collect::>(); /* 6000 bytes; values wrap modulo 256. NOTE(review): turbofish type arguments look stripped - confirm */ + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &pattern).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_size(512) + .build_with_buffer(vec![0; 2048]) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 5990).unwrap(); + + assert_eq!(reader.fill_buf().unwrap(), &pattern[..512]); + assert_eq!(0, reader.get_file_offset()); + + reader.consume(600); /* 600 exceeds the 512 bytes just returned */ + assert_eq!(600, reader.get_file_offset()); + assert_eq!(reader.fill_buf().unwrap(), &pattern[600..1024]); /* the next slice is expected to start at the new offset and end at byte 1024 */ + + reader.consume(400); + assert_eq!(1000, reader.get_file_offset()); + assert_eq!(reader.fill_buf().unwrap(), &pattern[1000..1024]); + + reader.consume(25); /* one byte more than the 24 bytes just returned */ + assert_eq!(reader.fill_buf().unwrap(), &pattern[1025..1536]); + + reader.consume(2000); /* skips well past the returned slice */ + assert_eq!(reader.fill_buf().unwrap(), &pattern[3025..3072]); + } + + #[test] + fn test_activate_file() { /* mixes queued files with activate_file calls on the same reader */ + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &[0xa, 0xb, 0xc]).unwrap(); /* temp1: 3 bytes */ + let mut temp2 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp2, &[0xd, 0xe, 0xf, 0x10]).unwrap(); /* temp2: 4 bytes */ + + let mut reader = SequentialFileReaderBuilder::new() + .read_size(512) + .build_with_buffer(vec![0; 1024]) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 3).unwrap(); + reader.add_file_to_prefetch(temp2.as_file(), 4).unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + + reader.activate_file(temp2.as_file(), 4).unwrap(); /* no move_to_next_file call precedes this; the next read is expected to return temp2's bytes */ + assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf, 0x10]); + + reader.activate_file(temp1.as_file(), 4).unwrap(); /* limit 4 is larger than the 3 bytes in temp1 */ + 
assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); /* continuation of test_activate_file: only the 3 bytes present in temp1 are expected despite the limit of 4 */ + + let f1 = File::open(temp1.path()).unwrap(); /* a second, owned handle to temp1 opened by path */ + reader.add_owned_file_to_prefetch(f1, usize::MAX).unwrap(); /* f1 is passed by value here, unlike the as_file() borrows used elsewhere */ + reader.move_to_next_file().unwrap(); + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + + reader.activate_file(temp2.as_file(), 4).unwrap(); + assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf, 0x10]); + } } diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 4d6b2182fb9880..defea1c41c4d93 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -1010,6 +1010,7 @@ fn reconstruct_accountsdb_from_fields( where E: SerializableStorage + std::marker::Sync, { + let accounts_db_memlock_budget_size = accounts_db_config.memlock_budget_size; /* read before accounts_db_config is passed to new_with_config below; NOTE(review): presumably needed because that call takes the config by value - confirm */ let mut accounts_db = AccountsDb::new_with_config( account_paths.to_vec(), accounts_db_config, @@ -1063,7 +1064,11 @@ where let IndexGenerationInfo { accounts_data_len, calculated_accounts_lt_hash, - } = accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index); + } = accounts_db.generate_index( + limit_load_slot_count_from_snapshot, + accounts_db_memlock_budget_size, /* new middle argument, placed between the slot-count limit and verify_index */ + verify_index, + ); info!("Building accounts index... Done in {:?}", start.elapsed()); Ok((