diff --git a/CHANGELOG.md b/CHANGELOG.md index 9562034a898..8ccdbc4a78c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Release channels have their own copy of this changelog: ### Validator #### Breaking +* Require increased `memlock` limits - recommended setting is `LimitMEMLOCK=2000000000` in systemd service configuration. Lack of sufficient limit (on Linux) will cause startup error. * Remove deprecated arguments * `--accounts-index-memory-limit-mb` * `--accountsdb-repl-bind-address`, `--accountsdb-repl-port`, `--accountsdb-repl-threads`, `--enable-accountsdb-repl` @@ -41,7 +42,6 @@ Release channels have their own copy of this changelog: * Using `--snapshot-interval-slots 0` to disable generating snapshots has been removed. Use `--no-snapshots` instead. #### Changes -* Reading snapshot archives requires increased `memlock` limits - recommended setting is `LimitMEMLOCK=2000000000` in systemd service configuration. Lack of sufficient limit will result slower startup times. * `--transaction-structure view` is now the default. * The default full snapshot interval is now 100,000 slots. diff --git a/Cargo.lock b/Cargo.lock index 0c90a7197d9..3fe1a444351 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6868,7 +6868,6 @@ dependencies = [ "solana-message", "solana-metrics", "solana-nohash-hasher", - "solana-perf", "solana-pubkey", "solana-rayon-threadlimit", "solana-rent", diff --git a/accounts-db/Cargo.toml b/accounts-db/Cargo.toml index 35821c4405a..46d56f733fc 100644 --- a/accounts-db/Cargo.toml +++ b/accounts-db/Cargo.toml @@ -85,7 +85,6 @@ solana-measure = { workspace = true } solana-message = { workspace = true } solana-metrics = { workspace = true } solana-nohash-hasher = { workspace = true } -solana-perf = { workspace = true } solana-pubkey = { workspace = true } solana-rayon-threadlimit = { workspace = true } solana-rent = { workspace = true, optional = true } diff --git a/accounts-db/src/buffered_reader.rs b/accounts-db/src/buffered_reader.rs index e038f700b78..e96e32c25f3 100644 --- a/accounts-db/src/buffered_reader.rs +++ b/accounts-db/src/buffered_reader.rs @@ -286,13 +286,9 @@ pub fn large_file_buf_reader( if agave_io_uring::io_uring_supported() { use crate::io_uring::sequential_file_reader::SequentialFileReader; - let io_uring_reader = SequentialFileReader::with_capacity(buf_size, &path); - match io_uring_reader { - Ok(reader) => return Ok(Box::new(reader)), - Err(error) => { - log::warn!("unable to create io_uring reader: {error}"); - } - } + return Ok(Box::new(SequentialFileReader::with_capacity( + buf_size, path, + )?)); } let file = File::open(path)?; Ok(Box::new(BufReader::with_capacity(buf_size, file))) diff --git a/accounts-db/src/file_io.rs b/accounts-db/src/file_io.rs index fbf767ee630..5bc15a57180 100644 --- a/accounts-db/src/file_io.rs +++ b/accounts-db/src/file_io.rs @@ -1,5 +1,11 @@ //! File i/o helper functions. -use std::{fs::File, ops::Range}; +use std::{ + fs::{File, OpenOptions}, + io::{self, BufWriter, Write}, + ops::Range, + path::PathBuf, + sync::Arc, +}; /// `buffer` contains `valid_bytes` of data at its end. /// Move those valid bytes to the beginning of `buffer`, then read from `offset` to fill the rest of `buffer`. @@ -83,10 +89,105 @@ pub fn read_into_buffer( Ok(total_bytes_read) } +/// An asynchronous queue for file creation. +pub trait FileCreator { + /// Schedule creating a file at `path` with `mode` permissions and bytes read from `contents`. + /// + /// `parent_dir_handle` is assumed to be a parent directory of `path` such that file may be + /// created using optimized kernel API to create `path.file_name()` inside `parent_dir_handle`. + fn schedule_create_at_dir( + &mut self, + path: PathBuf, + mode: u32, + parent_dir_handle: Arc, + contents: &mut dyn io::Read, + ) -> io::Result<()>; + + /// Invoke implementation specific logic to handle file creation completion. + fn file_complete(&mut self, path: PathBuf); + + /// Waits for all operations to be completed + fn drain(&mut self) -> io::Result<()>; +} + +pub fn file_creator<'a>( + buf_size: usize, + file_complete: impl FnMut(PathBuf) + 'a, +) -> io::Result> { + #[cfg(target_os = "linux")] + if agave_io_uring::io_uring_supported() { + use crate::io_uring::file_creator::IoUringFileCreator; + + let io_uring_creator = IoUringFileCreator::with_buffer_capacity(buf_size, file_complete)?; + return Ok(Box::new(io_uring_creator)); + } + Ok(Box::new(SyncIoFileCreator::new(buf_size, file_complete))) +} + +pub struct SyncIoFileCreator<'a> { + file_complete: Box, +} + +impl<'a> SyncIoFileCreator<'a> { + fn new(_buf_size: usize, file_complete: impl FnMut(PathBuf) + 'a) -> Self { + Self { + file_complete: Box::new(file_complete), + } + } +} + +#[cfg(not(unix))] +pub(super) fn set_file_readonly(path: &std::path::Path, readonly: bool) -> io::Result<()> { + let mut perm = std::fs::metadata(path)?.permissions(); + perm.set_readonly(readonly); + std::fs::set_permissions(path, perm) +} + +impl FileCreator for SyncIoFileCreator<'_> { + fn schedule_create_at_dir( + &mut self, + path: PathBuf, + mode: u32, + _parent_dir_handle: Arc, + contents: &mut dyn io::Read, + ) -> io::Result<()> { + // Open for writing (also allows overwrite) and apply `mode` + let mut options = OpenOptions::new(); + options.create(true).truncate(true).write(true); + + #[cfg(unix)] + std::os::unix::fs::OpenOptionsExt::mode(&mut options, mode); + + let mut file_buf = BufWriter::new(options.open(&path)?); + io::copy(contents, &mut file_buf)?; + file_buf.flush()?; + + #[cfg(not(unix))] + set_file_readonly(&path, mode & 0o200 == 0)?; + + self.file_complete(path); + Ok(()) + } + + fn file_complete(&mut self, path: PathBuf) { + (self.file_complete)(path) + } + + fn drain(&mut self) -> io::Result<()> { + Ok(()) + } +} + #[cfg(test)] mod tests { - - use {super::*, std::io::Write, tempfile::tempfile}; + use { + super::*, + std::{ + fs, + io::{Cursor, Write}, + }, + tempfile::tempfile, + }; #[test] fn test_read_into_buffer() { @@ -193,4 +294,68 @@ mod tests { bytes[start_offset..file_size] ); } + + fn read_file_to_string(path: &PathBuf) -> String { + String::from_utf8(fs::read(path).expect("Failed to read file")) + .expect("Failed to decode file contents") + } + + #[test] + fn test_create_writes_contents() -> io::Result<()> { + let temp_dir = tempfile::tempdir()?; + let file_path = temp_dir.path().join("test.txt"); + let contents = "Hello, world!"; + + // Shared state to capture callback invocations + let mut callback_invoked_path = None; + + // Instantiate FileCreator + let mut creator = file_creator(2 << 20, |path| { + callback_invoked_path.replace(path); + })?; + + let dir = Arc::new(File::open(temp_dir.path())?); + creator.schedule_create_at_dir( + file_path.clone(), + 0o644, + dir, + &mut Cursor::new(contents), + )?; + creator.drain()?; + drop(creator); + + assert_eq!(read_file_to_string(&file_path), contents); + assert_eq!(callback_invoked_path, Some(file_path)); + + Ok(()) + } + + #[test] + fn test_multiple_file_creations() -> io::Result<()> { + let temp_dir = tempfile::tempdir()?; + let mut callback_counter = 0; + + let mut creator = file_creator(2 << 20, |path: PathBuf| { + let contents = read_file_to_string(&path); + assert!(contents.starts_with("File ")); + callback_counter += 1; + })?; + + let dir = Arc::new(File::open(temp_dir.path())?); + for i in 0..5 { + let file_path = temp_dir.path().join(format!("file_{i}.txt")); + let data = format!("File {i}"); + creator.schedule_create_at_dir( + file_path, + 0o600, + dir.clone(), + &mut Cursor::new(data), + )?; + } + creator.drain()?; + drop(creator); + + assert_eq!(callback_counter, 5); + Ok(()) + } } diff --git a/accounts-db/src/hardened_unpack.rs b/accounts-db/src/hardened_unpack.rs index 6d2b8df6e82..5b5c72aaf6c 100644 --- a/accounts-db/src/hardened_unpack.rs +++ b/accounts-db/src/hardened_unpack.rs @@ -1,17 +1,19 @@ use { + crate::file_io::{file_creator, FileCreator}, bzip2::bufread::BzDecoder, + crossbeam_channel::Sender, log::*, rand::{thread_rng, Rng}, solana_genesis_config::{GenesisConfig, DEFAULT_GENESIS_ARCHIVE, DEFAULT_GENESIS_FILE}, - solana_perf::packet::bytes::{Buf, Bytes, BytesMut}, std::{ - collections::{HashMap, VecDeque}, + collections::HashMap, fs::{self, File}, io::{self, BufReader, Read}, path::{ Component::{self, CurDir, Normal}, Path, PathBuf, }, + sync::Arc, time::Instant, }, tar::{ @@ -46,225 +48,13 @@ const MAX_SNAPSHOT_ARCHIVE_UNPACKED_COUNT: u64 = 5_000_000; pub const MAX_GENESIS_ARCHIVE_UNPACKED_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB const MAX_GENESIS_ARCHIVE_UNPACKED_COUNT: u64 = 100; -/// Collection of shareable byte slices forming a chain of bytes to read (using `std::io::Read`) -pub struct MultiBytes(VecDeque); - -impl MultiBytes { - pub fn new() -> Self { - // Typically we expect 2 entries: - // archive spanning until end of decode buffer + - // short continuation of last entry from next buffer - Self(VecDeque::with_capacity(2)) - } - - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } - - pub fn push(&mut self, bytes: Bytes) { - self.0.push_back(bytes); - } -} - -impl Default for MultiBytes { - fn default() -> Self { - Self::new() - } -} - -impl Read for MultiBytes { - fn read(&mut self, mut buf: &mut [u8]) -> io::Result { - let mut copied_len = 0; - while let Some(bytes) = self.0.front_mut() { - let to_copy_len = bytes.len().min(buf.len()); - let (to_copy_dst_buf, remaining_buf) = buf.split_at_mut(to_copy_len); - bytes.copy_to_slice(to_copy_dst_buf); - copied_len += to_copy_len; - if bytes.is_empty() { - self.0.pop_front(); - } - if remaining_buf.is_empty() { - break; - } - buf = remaining_buf; - } - Ok(copied_len) - } -} - -pub struct BytesChannelReader { - current_bytes: MultiBytes, - receiver: crossbeam_channel::Receiver, -} - -impl BytesChannelReader { - pub fn new(receiver: crossbeam_channel::Receiver) -> Self { - Self { - current_bytes: MultiBytes::new(), - receiver, - } - } -} - -impl Read for BytesChannelReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - while self.current_bytes.is_empty() { - let Ok(new_bytes) = self.receiver.recv() else { - return Ok(0); - }; - self.current_bytes = new_bytes; - } - self.current_bytes.read(buf) - } -} - -#[derive(Debug)] -pub struct ArchiveChunker { - input: R, - /// Intermediate buffer with tar contents to seek and split on entry boundaries - current_decoded: Bytes, - /// Number of bytes from last entry that were not available in decoded buffer - num_started_entry_bytes: usize, - mempool: VecDeque, -} - -impl ArchiveChunker { - const TAR_BLOCK_SIZE: usize = size_of::(); - // Buffer size will influence typical amount of bytes sent as single work item. - // Pick value significantly larger than majority of entries, yet not too large to keep - // the work-queue non-empty as much as possible. - const DECODE_BUF_SIZE: usize = 64 * 1024 * 1024; - - pub fn new(input: R) -> Self { - Self { - input, - current_decoded: Bytes::new(), - num_started_entry_bytes: 0, - mempool: VecDeque::new(), - } - } - - /// Read `self.input`, split it at TAR archive boundaries and send chunks consisting - /// of complete, independent tar archives into `chunk_sender`. - pub fn decode_and_send_chunks( - mut self, - chunk_sender: crossbeam_channel::Sender, - ) -> io::Result<()> { - // Bytes for chunk of archive to be sent to workers for unpacking - let mut current_chunk = MultiBytes::new(); - while self.refill_decoded_buf()? { - let (new_bytes, was_archive_completion) = if self.has_started_entry() { - let started_entry_bytes = self.take_started_entry_bytes(); - let did_finish_entry = !self.has_started_entry(); - (started_entry_bytes, did_finish_entry) - } else { - (self.take_complete_archive()?, true) - }; - if !new_bytes.is_empty() { - current_chunk.push(new_bytes); - if was_archive_completion { - let chunk = std::mem::take(&mut current_chunk); - if chunk_sender.send(chunk).is_err() { - break; - } - } - } - } - Ok(()) - } - - /// Take as many bytes as possible from decoded data until last entry boundary. - fn take_complete_archive(&mut self) -> io::Result { - let mut archive = Archive::new(self.current_decoded.as_ref()); - - let mut completed_entry_end = 0; - let mut entry_end = 0; - for entry in archive.entries()? { - let entry = entry?; - // End of file data - assert_ne!(tar::EntryType::GNUSparse, entry.header().entry_type()); - entry_end = (entry.raw_file_position() + entry.size()) as usize; - - // Padding to block size - entry_end = Self::TAR_BLOCK_SIZE * entry_end.div_ceil(Self::TAR_BLOCK_SIZE); - if entry_end <= self.current_decoded.len() { - // Entry ends within decoded input, we can consume it - completed_entry_end = entry_end; - } - if entry_end + Self::TAR_BLOCK_SIZE > self.current_decoded.len() { - // Next entry's header spans beyond input - can't decode it, - // so terminate at last completed entry and keep remaining input after it - break; - } - } - // Either we run out of entries or last entry crosses input - let completed_entry = self.current_decoded.split_to(completed_entry_end); - if completed_entry.is_empty() && entry_end == completed_entry_end { - // Archive ended, clear any tar footer from remaining input - assert!( - self.current_decoded.len() <= 1024, - "Footer should be at most 1024 len" - ); - self.current_decoded.clear(); - } - self.num_started_entry_bytes = entry_end - completed_entry_end; - Ok(completed_entry) - } - - fn has_started_entry(&self) -> bool { - self.num_started_entry_bytes > 0 - } - - fn take_started_entry_bytes(&mut self) -> Bytes { - let num_bytes = self.num_started_entry_bytes.min(self.current_decoded.len()); - self.num_started_entry_bytes -= num_bytes; - self.current_decoded.split_to(num_bytes) - } - - /// Re-fill decoded buffer such that it has minimum bytes to decode TAR header. - /// - /// Return `false` on EOF - fn refill_decoded_buf(&mut self) -> io::Result { - if self.current_decoded.len() < Self::TAR_BLOCK_SIZE { - let mut next_buffer = self.get_next_buffer(); - if !self.current_decoded.is_empty() { - next_buffer.extend_from_slice(&self.current_decoded); - } - self.current_decoded = self.decode_bytes(next_buffer)?; - } - Ok(!self.current_decoded.is_empty()) - } - - /// Acquire memory buffer for decoding input reusing already consumed chunks. - fn get_next_buffer(&mut self) -> BytesMut { - if self.mempool.front().is_some_and(Bytes::is_unique) { - let mut reclaimed: BytesMut = self.mempool.pop_front().unwrap().into(); - reclaimed.clear(); - reclaimed - } else { - BytesMut::with_capacity(Self::DECODE_BUF_SIZE) - } - } - - /// Fill `decode_buf` with data from `self.input`. - fn decode_bytes(&mut self, mut decode_buf: BytesMut) -> io::Result { - let mut_slice = unsafe { - std::slice::from_raw_parts_mut(decode_buf.as_mut_ptr(), decode_buf.capacity()) - }; - let mut current_len = decode_buf.len(); - while current_len < decode_buf.capacity() { - let new_bytes = self.input.read(&mut mut_slice[current_len..])?; - if new_bytes == 0 { - break; - } - current_len += new_bytes; - } - unsafe { decode_buf.set_len(current_len) }; - let bytes: Bytes = decode_buf.into(); - self.mempool.push_back(bytes.clone()); - Ok(bytes) - } -} +// The buffer should be large enough to saturate write I/O bandwidth, while also accommodating: +// - Many small files: each file consumes at least one write-capacity-sized chunk (0.5-1 MiB). +// - Large files: their data may accumulate in backlog buffers while waiting for file open +// operations to complete. +const MAX_UNPACK_WRITE_BUF_SIZE: usize = 512 * 1024 * 1024; +// Minimum for unpacking small archives - allows ~2-4 write-capacity-sized operations concurrently. +const MIN_UNPACK_WRITE_BUF_SIZE: usize = 2 * 1024 * 1024; fn checked_total_size_sum(total_size: u64, entry_size: u64, limit_size: u64) -> Result { trace!("checked_total_size_sum: {total_size} + {entry_size} < {limit_size}"); @@ -287,9 +77,11 @@ fn checked_total_count_increment(total_count: u64, limit_count: u64) -> Result Result<()> { - if !unpack_result { - return Err(UnpackError::Archive(format!("failed to unpack: {path:?}"))); +fn check_unpack_result(unpack_result: Result<()>, path: String) -> Result<()> { + if let Err(err) = unpack_result { + return Err(UnpackError::Archive(format!( + "failed to unpack {path:?}: {err}" + ))); } Ok(()) } @@ -306,23 +98,30 @@ fn unpack_archive<'a, A, C, D>( apparent_limit_size: u64, actual_limit_size: u64, limit_count: u64, - mut entry_checker: C, // checks if entry is valid - entry_processor: D, // processes entry after setting permissions + mut entry_checker: C, // checks if entry is valid + file_path_processor: D, // processes file paths after writing ) -> Result<()> where A: Read, C: FnMut(&[&str], tar::EntryType) -> UnpackPath<'a>, - D: Fn(PathBuf), + D: FnMut(PathBuf), { let mut apparent_total_size: u64 = 0; let mut actual_total_size: u64 = 0; let mut total_count: u64 = 0; let mut total_entries = 0; - let mut sanitized_paths_cache = Vec::new(); + let mut open_dirs = Vec::new(); + + // Bound the buffer based on provided limit of unpacked data (buffering a fraction, + // e.g. 25%, of absolute maximum won't be necessary) - this works well for genesis, + // while normal case hit the UNPACK_WRITE_BUF_SIZE tuned for it prod snapshot archive. + let buf_size = (apparent_limit_size.div_ceil(4) as usize) + .clamp(MIN_UNPACK_WRITE_BUF_SIZE, MAX_UNPACK_WRITE_BUF_SIZE); + let mut files_creator = file_creator(buf_size, file_path_processor)?; for entry in archive.entries()? { - let mut entry = entry?; + let entry = entry?; let path = entry.path()?; let path_str = path.display().to_string(); @@ -385,30 +184,47 @@ where // account_paths returned by `entry_checker`. We want to unpack into // account_path/ instead of account_path/accounts/ so we strip the // accounts/ prefix. - sanitize_path(&account, unpack_dir, &mut sanitized_paths_cache) + sanitize_path_and_open_dir(&account, unpack_dir, &mut open_dirs) } else { - sanitize_path(&path, unpack_dir, &mut sanitized_paths_cache) + sanitize_path_and_open_dir(&path, unpack_dir, &mut open_dirs) }?; // ? handles file system errors - let Some(entry_path) = entry_path else { + let Some((entry_path, open_dir)) = entry_path else { continue; // skip it }; - let unpack = entry.unpack(&entry_path); - check_unpack_result(unpack.map(|_unpack| true)?, path_str)?; - - // Sanitize permissions. - let mode = match entry.header().entry_type() { - GNUSparse | Regular => 0o644, - _ => 0o755, - }; - set_perms(&entry_path, mode)?; - - // Process entry after setting permissions - entry_processor(entry_path); + let unpack = unpack_entry(&mut files_creator, entry, entry_path, open_dir); + check_unpack_result(unpack, path_str)?; total_entries += 1; } + files_creator.drain()?; + info!("unpacked {total_entries} entries total"); + Ok(()) +} + +fn unpack_entry<'a, R: Read>( + files_creator: &mut Box, + mut entry: tar::Entry<'_, R>, + dst: PathBuf, + dst_open_dir: Arc, +) -> Result<()> { + let mode = match entry.header().entry_type() { + GNUSparse | Regular => 0o644, + _ => 0o755, + }; + if should_fallback_to_tar_unpack(&entry) { + entry.unpack(&dst)?; + // Sanitize permissions. + set_perms(&dst, mode)?; + + if !entry.header().entry_type().is_dir() { + // Process file after setting permissions + files_creator.file_complete(dst); + } + return Ok(()); + } + files_creator.schedule_create_at_dir(dst, mode, dst_open_dir, &mut entry)?; return Ok(()); @@ -422,28 +238,38 @@ where #[cfg(windows)] fn set_perms(dst: &Path, _mode: u32) -> io::Result<()> { - let mut perm = fs::metadata(dst)?.permissions(); - // This is OK for Windows, but clippy doesn't realize we're doing this - // only on Windows. - #[allow(clippy::permissions_set_readonly_false)] - perm.set_readonly(false); - fs::set_permissions(dst, perm) + super::file_io::set_file_readonly(dst, false) } } +fn should_fallback_to_tar_unpack(entry: &tar::Entry<'_, R>) -> bool { + // Follows cases that are handled as directory or in special way by tar-rs library, + // we want to handle just cases where the library would write plain files with entry's content. + matches!( + entry.header().entry_type(), + tar::EntryType::Directory + | tar::EntryType::Link + | tar::EntryType::Symlink + | tar::EntryType::XGlobalHeader + | tar::EntryType::XHeader + | tar::EntryType::GNULongName + | tar::EntryType::GNULongLink + ) || entry.header().as_ustar().is_none() && entry.path_bytes().ends_with(b"/") +} + // return Err on file system error -// return Some(path) if path is good +// return Some((path, open_dir)) if path is good // return None if we should skip this file -fn sanitize_path( +fn sanitize_path_and_open_dir( entry_path: &Path, dst: &Path, - cache: &mut Vec<(PathBuf, PathBuf)>, -) -> Result> { + open_dirs: &mut Vec<(PathBuf, Arc)>, +) -> Result)>> { // We cannot call unpack_in because it errors if we try to use 2 account paths. // So, this code is borrowed from unpack_in // ref: https://docs.rs/tar/*/tar/struct.Entry.html#method.unpack_in let mut file_dst = dst.to_path_buf(); - const SKIP: Result> = Ok(None); + const SKIP: Result)>> = Ok(None); { let path = entry_path; for part in path.components() { @@ -475,19 +301,22 @@ fn sanitize_path( return SKIP; }; - if let Err(insert_at) = cache.binary_search_by(|(dst_cached, parent_cached)| { - parent.cmp(parent_cached).then_with(|| dst.cmp(dst_cached)) - }) { - fs::create_dir_all(parent)?; + let open_dst_dir = match open_dirs.binary_search_by(|(key, _)| parent.cmp(key)) { + Err(insert_at) => { + fs::create_dir_all(parent)?; - // Here we are different than untar_in. The code for tar::unpack_in internally calling unpack is a little different. - // ignore return value here - validate_inside_dst(dst, parent)?; - cache.insert(insert_at, (dst.to_path_buf(), parent.to_path_buf())); - } - let target = parent.join(entry_path.file_name().unwrap()); + // Here we are different than untar_in. The code for tar::unpack_in internally calling unpack is a little different. + // ignore return value here + validate_inside_dst(dst, parent)?; - Ok(Some(target)) + let opened_dir = Arc::new(File::open(parent)?); + open_dirs.insert(insert_at, (parent.to_path_buf(), opened_dir.clone())); + opened_dir + } + Ok(index) => open_dirs[index].1.clone(), + }; + + Ok(Some((file_dst, open_dst_dir))) } // copied from: @@ -495,14 +324,10 @@ fn sanitize_path( fn validate_inside_dst(dst: &Path, file_dst: &Path) -> Result { // Abort if target (canonical) parent is outside of `dst` let canon_parent = file_dst.canonicalize().map_err(|err| { - UnpackError::Archive(format!( - "{} while canonicalizing {}", - err, - file_dst.display() - )) + UnpackError::Archive(format!("{err} while canonicalizing {}", file_dst.display())) })?; let canon_target = dst.canonicalize().map_err(|err| { - UnpackError::Archive(format!("{} while canonicalizing {}", err, dst.display())) + UnpackError::Archive(format!("{err} while canonicalizing {}", dst.display())) })?; if !canon_parent.starts_with(&canon_target) { return Err(UnpackError::Archive(format!( @@ -541,22 +366,20 @@ pub fn streaming_unpack_snapshot( archive: Archive, ledger_dir: &Path, account_paths: &[PathBuf], - sender: &crossbeam_channel::Sender, + sender: &Sender, ) -> Result<()> { unpack_snapshot_with_processors( archive, ledger_dir, account_paths, |_, _| {}, - |entry_path_buf| { - if entry_path_buf.is_file() { - let result = sender.send(entry_path_buf); - if let Err(err) = result { - panic!( - "failed to send path '{}' from unpacker to rebuilder: {err}", - err.0.display(), - ); - } + |file_path| { + let result = sender.send(file_path); + if let Err(err) = result { + panic!( + "failed to send path '{}' from unpacker to rebuilder: {err}", + err.0.display(), + ); } }, ) @@ -567,12 +390,12 @@ fn unpack_snapshot_with_processors( ledger_dir: &Path, account_paths: &[PathBuf], mut accounts_path_processor: F, - entry_processor: G, + file_path_processor: G, ) -> Result<()> where A: Read, F: FnMut(&str, &Path), - G: Fn(PathBuf), + G: FnMut(PathBuf), { assert!(!account_paths.is_empty()); @@ -603,7 +426,7 @@ where UnpackPath::Invalid } }, - entry_processor, + file_path_processor, ) } @@ -981,7 +804,7 @@ mod tests { { let data = archive.into_inner().unwrap(); let reader = BufReader::new(&data[..]); - let archive: Archive> = Archive::new(reader); + let archive = Archive::new(reader); let temp_dir = tempfile::TempDir::new().unwrap(); checker(archive, temp_dir.path())?; @@ -1199,8 +1022,11 @@ mod tests { #[test] fn test_archive_unpack_snapshot_bad_unpack() { - let result = check_unpack_result(false, "abc".to_string()); - assert_matches!(result, Err(UnpackError::Archive(ref message)) if message == "failed to unpack: \"abc\""); + let result = check_unpack_result( + Err(UnpackError::Io(io::ErrorKind::FileTooLarge.into())), + "abc".to_string(), + ); + assert_matches!(result, Err(UnpackError::Archive(ref message)) if message == "failed to unpack \"abc\": IO error: file too large"); } #[test] diff --git a/accounts-db/src/io_uring/file_creator.rs b/accounts-db/src/io_uring/file_creator.rs new file mode 100644 index 00000000000..4a0d1092676 --- /dev/null +++ b/accounts-db/src/io_uring/file_creator.rs @@ -0,0 +1,568 @@ +use { + crate::{ + file_io::FileCreator, + io_uring::{ + memory::{FixedIoBuffer, LargeBuffer}, + IO_PRIO_BE_HIGHEST, + }, + }, + agave_io_uring::{Completion, FixedSlab, Ring, RingOp}, + core::slice, + io_uring::{opcode, squeue, types, IoUring}, + libc::{O_CREAT, O_NOATIME, O_NOFOLLOW, O_NONBLOCK, O_TRUNC, O_WRONLY}, + smallvec::SmallVec, + std::{ + collections::VecDeque, + fs::File, + io::{self, Read}, + mem, + os::{fd::AsRawFd, unix::ffi::OsStrExt as _}, + path::PathBuf, + pin::Pin, + ptr, + sync::Arc, + time::Duration, + }, +}; + +// Based on transfers seen with `dd bs=SIZE` for NVME drives: values >=64KiB are fine, +// but usually peak around 256KiB-1MiB. Also compare with particular NVME parameters, e.g. +// 32 pages (Maximum Data Transfer Size) * page size (MPSMIN = Memory Page Size) = 128KiB. +const DEFAULT_WRITE_SIZE: usize = 512 * 1024; + +// 99.9% of accounts storage files are < 8MiB +type BacklogVec = SmallVec<[PendingWrite; 8 * 1024 * 1024 / DEFAULT_WRITE_SIZE]>; + +// Sanity limit for slab size and number of concurrent operations, in practice with 0.5-1GiB +// buffer this is also close to the number of available buffers that small files will use up. +// Also, permitting too many open files results in many submitted open ops, which will contend +// on the directory inode lock. +const MAX_OPEN_FILES: usize = 512; + +// We need a few threads to saturate the disk bandwidth, especially that we are writing lots +// of small files, so the number of ops / write size is high. We also need open ops and writes +// to run concurrently. +// We shouldn't use too many threads, as they will contend a lot to lock the directory inode +// (on open, since in accounts-db most files land in a single dir). +const MAX_IOWQ_WORKERS: u32 = 4; + +const CHECK_PROGRESS_AFTER_SUBMIT_TIMEOUT: Option = Some(Duration::from_millis(10)); + +/// Multiple files creator with `io_uring` queue for open -> write -> close +/// operations. +pub struct IoUringFileCreator<'a, B = LargeBuffer> { + ring: Ring, FileCreatorOp>, + /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `ring` + /// (should get dropped last) + _backing_buffer: B, +} + +impl<'a> IoUringFileCreator<'a, LargeBuffer> { + /// Create a new `IoUringFileCreator` using internally allocated buffer of specified + /// `buf_size` and default write size. + pub fn with_buffer_capacity( + buf_size: usize, + file_complete: F, + ) -> io::Result { + Self::with_buffer( + LargeBuffer::new(buf_size), + DEFAULT_WRITE_SIZE, + file_complete, + ) + } +} + +impl<'a, B: AsMut<[u8]>> IoUringFileCreator<'a, B> { + /// Create a new `IoUringFileCreator` using provided `buffer` and `file_complete` + /// to notify caller when file contents are already persisted. + /// + /// `buffer` is the internal buffer used for writing scheduled file contents. + /// It must be at least `write_capacity` long. The creator will execute multiple + /// `write_capacity` sized writes in parallel to empty the work queue of files to create. + pub fn with_buffer( + mut buffer: B, + write_capacity: usize, + file_complete: F, + ) -> io::Result { + // Let submission queue hold half of buffers before we explicitly syscall + // to submit them for writing (lets kernel start processing before we run out of buffers, + // but also amortizes number of `submit` syscalls made). + let ring_qsize = (buffer.as_mut().len() / write_capacity / 2).max(1) as u32; + let ring = IoUring::builder().build(ring_qsize)?; + // Maximum number of spawned [bounded IO, unbounded IO] kernel threads, we don't expect + // any unbounded work, but limit it to 1 just in case (0 leaves it unlimited). + ring.submitter() + .register_iowq_max_workers(&mut [MAX_IOWQ_WORKERS, 1])?; + Self::with_buffer_and_ring(ring, buffer, write_capacity, file_complete) + } + + fn with_buffer_and_ring( + ring: IoUring, + mut backing_buffer: B, + write_capacity: usize, + file_complete: F, + ) -> io::Result { + let buffer = backing_buffer.as_mut(); + // Take prefix of buffer that is aligned to write_capacity + assert!(buffer.len() >= write_capacity); + let write_aligned_buf_len = buffer.len() / write_capacity * write_capacity; + let buffer = &mut buffer[..write_aligned_buf_len]; + + // Safety: buffers contain unsafe pointers to `buffer`, but we make sure they are + // dropped before `backing_buffer` is dropped. + let buffers = unsafe { FixedIoBuffer::split_buffer_chunks(buffer, write_capacity) }; + let state = FileCreatorState::new(buffers.collect(), file_complete); + let ring = Ring::new(ring, state); + + // Safety: kernel holds unsafe pointers to `buffer`, struct field declaration order + // guarantees that the ring is destroyed before `_backing_buffer` is dropped. + unsafe { FixedIoBuffer::register(buffer, &ring)? }; + + // Fixed file descriptor slots. OpenAt will update them to valid fds. Length of registered + // slots must match the `state.files` slab whose indices are used as fd slot indices. + let fds = vec![-1; MAX_OPEN_FILES]; + ring.register_files(&fds)?; + + Ok(Self { + ring, + _backing_buffer: backing_buffer, + }) + } +} + +impl FileCreator for IoUringFileCreator<'_, B> { + fn schedule_create_at_dir( + &mut self, + path: PathBuf, + mode: u32, + parent_dir_handle: Arc, + contents: &mut dyn Read, + ) -> io::Result<()> { + let file_key = self.open(path, mode, Some(parent_dir_handle))?; + self.write_and_close(contents, file_key) + } + + fn file_complete(&mut self, path: PathBuf) { + (self.ring.context_mut().file_complete)(path) + } + + fn drain(&mut self) -> io::Result<()> { + let res = self.ring.drain(); + self.ring.context().log_stats(); + res + } +} + +impl IoUringFileCreator<'_, B> { + /// Schedule opening file at `path` with `mode` permissons. + /// + /// Returns key that can be used for scheduling writes for it. + fn open( + &mut self, + path: PathBuf, + mode: u32, + dir_handle: Option>, + ) -> io::Result { + let file = PendingFile::from_path(path); + let path_bytes = Pin::new(file.zero_terminated_path_bytes(dir_handle.is_some())); + + let file_key = self.wait_add_file(file)?; + + let op = FileCreatorOp::Open(OpenOp { + dir_handle, + path_bytes, + mode, + file_key, + }); + self.ring.push(op)?; + + Ok(file_key) + } + + fn wait_add_file(&mut self, file: PendingFile) -> io::Result { + loop { + self.ring.process_completions()?; + if self.ring.context().files.len() < self.ring.context().files.capacity() { + break; + } + self.ring + .submit_and_wait(1, CHECK_PROGRESS_AFTER_SUBMIT_TIMEOUT)?; + } + let file_key = self.ring.context_mut().files.insert(file); + Ok(file_key) + } + + fn write_and_close(&mut self, mut src: impl Read, file_key: usize) -> io::Result<()> { + let mut offset = 0; + loop { + let buf = self.wait_free_buf()?; + + let state = self.ring.context_mut(); + let file = state.files.get_mut(file_key).unwrap(); + + // Safety: the buffer points to the valid memory backed by `self._backing_buffer`. + // It's obtained from the queue of free buffers and is written to exclusively + // here before being handled to the kernel or backlog in `file`. + let mut_slice = unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len()) }; + let len = src.read(mut_slice)?; + + if len == 0 { + file.eof = true; + + state.buffers.push_front(buf); + if file.is_completed() { + (state.file_complete)(mem::take(&mut file.path)); + self.ring + .push(FileCreatorOp::Close(CloseOp::new(file_key)))?; + } + break; + } + + file.writes_started += 1; + if file.completed_open { + let op = WriteOp { + file_key, + offset, + buf, + write_len: len, + }; + state.submitted_writes_size += len; + self.ring.push(FileCreatorOp::Write(op))?; + } else { + file.backlog.push((buf, offset, len)); + } + + offset += len; + } + + Ok(()) + } + + fn wait_free_buf(&mut self) -> io::Result { + loop { + self.ring.process_completions()?; + let state = self.ring.context_mut(); + if let Some(buf) = state.buffers.pop_front() { + return Ok(buf); + } + state.stats.no_buf_count += 1; + state.stats.no_buf_sum_submitted_write_sizes += state.submitted_writes_size; + + self.ring + .submit_and_wait(1, CHECK_PROGRESS_AFTER_SUBMIT_TIMEOUT)?; + } + } +} + +struct FileCreatorState<'a> { + files: FixedSlab, + buffers: VecDeque, + /// Externally provided callback to be called on paths of files that were written + file_complete: Box, + open_fds: usize, + /// Total write length of submitted writes + submitted_writes_size: usize, + stats: FileCreatorStats, +} + +impl<'a> FileCreatorState<'a> { + fn new(buffers: VecDeque, file_complete: impl FnMut(PathBuf) + 'a) -> Self { + Self { + files: FixedSlab::with_capacity(MAX_OPEN_FILES), + buffers, + file_complete: Box::new(file_complete), + open_fds: 0, + submitted_writes_size: 0, + stats: FileCreatorStats::default(), + } + } + + /// Returns write backlog that needs to be submitted to IO ring + fn mark_file_opened(&mut self, file_key: usize) -> BacklogVec { + let file = self.files.get_mut(file_key).unwrap(); + file.completed_open = true; + self.open_fds += 1; + if self.buffers.len() * 2 > self.buffers.capacity() { + self.stats.large_buf_headroom_count += 1; + } + mem::take(&mut file.backlog) + } + + /// Returns true if all of the writes are now done + fn mark_write_completed( + &mut self, + file_key: usize, + write_len: usize, + buf: FixedIoBuffer, + ) -> bool { + self.submitted_writes_size -= write_len; + self.buffers.push_front(buf); + + let file = self.files.get_mut(file_key).unwrap(); + file.writes_completed += 1; + if file.is_completed() { + (self.file_complete)(mem::take(&mut file.path)); + return true; + } + false + } + + fn mark_file_closed(&mut self, file_key: usize) { + let _ = self.files.remove(file_key); + self.open_fds -= 1; + } + + fn log_stats(&self) { + self.stats.log(); + } +} + +#[derive(Debug, Default)] +struct FileCreatorStats { + /// Count of cases when more than half of buffers are free (files are written + /// faster than submitted - consider less buffers or speeding up submission) + large_buf_headroom_count: u32, + /// Count of cases when we run out of free buffers (files are not written fast + /// enough - consider more buffers or tuning write bandwidth / patterns) + no_buf_count: u32, + /// Sum of all outstanding write sizes at moments of encountering no free buf + no_buf_sum_submitted_write_sizes: usize, +} + +impl FileCreatorStats { + fn log(&self) { + let avg_writes_at_no_buf = self + .no_buf_sum_submitted_write_sizes + .checked_div(self.no_buf_count as usize) + .unwrap_or_default(); + log::info!( + "files creation stats - large buf headroom: {}, no buf count: {},\ + avg pending writes at no buf: {avg_writes_at_no_buf}", + self.large_buf_headroom_count, + self.no_buf_count, + ); + } +} + +#[derive(Debug)] +struct OpenOp { + dir_handle: Option>, + path_bytes: Pin>, + mode: libc::mode_t, + file_key: usize, +} + +impl OpenOp { + fn entry(&mut self) -> squeue::Entry { + let at_dir_fd = types::Fd( + self.dir_handle + .as_ref() + .map(AsRawFd::as_raw_fd) + .unwrap_or(libc::AT_FDCWD), + ); + opcode::OpenAt::new(at_dir_fd, self.path_bytes.as_ptr() as _) + .flags(O_CREAT | O_TRUNC | O_NOFOLLOW | O_WRONLY | O_NOATIME | O_NONBLOCK) + .mode(self.mode) + .file_index(Some( + types::DestinationSlot::try_from_slot_target(self.file_key as u32).unwrap(), + )) + .build() + } + + fn complete( + &mut self, + ring: &mut Completion, + res: io::Result, + ) -> io::Result<()> + where + Self: Sized, + { + res?; + + let backlog = ring.context_mut().mark_file_opened(self.file_key); + for (buf, offset, len) in backlog { + let op = WriteOp { + file_key: self.file_key, + offset, + buf, + write_len: len, + }; + ring.context_mut().submitted_writes_size += len; + ring.push(FileCreatorOp::Write(op)); + } + + Ok(()) + } +} + +#[derive(Debug)] +struct CloseOp { + file_key: usize, +} + +impl<'a> CloseOp { + fn new(file_key: usize) -> Self { + Self { file_key } + } + + fn entry(&mut self) -> squeue::Entry { + opcode::Close::new(types::Fixed(self.file_key as u32)).build() + } + + fn complete( + &mut self, + ring: &mut Completion, FileCreatorOp>, + res: io::Result, + ) -> io::Result<()> + where + Self: Sized, + { + let _ = res?; + ring.context_mut().mark_file_closed(self.file_key); + Ok(()) + } +} + +#[derive(Debug)] +struct WriteOp { + file_key: usize, + offset: usize, + buf: FixedIoBuffer, + write_len: usize, +} + +impl<'a> WriteOp { + fn entry(&mut self) -> squeue::Entry { + let WriteOp { + file_key, + offset, + buf, + write_len, + } = self; + + // Safety: buf is owned by `WriteOp` during the operation handling by the kernel and + // reclaimed after completion passed in a call to `mark_write_completed`. + opcode::WriteFixed::new( + types::Fixed(*file_key as u32), + unsafe { buf.as_mut_ptr() }, + *write_len as u32, + buf.io_buf_index() + .expect("should have a valid fixed buffer"), + ) + .offset(*offset as u64) + .ioprio(IO_PRIO_BE_HIGHEST) + .build() + } + + fn complete( + &mut self, + ring: &mut Completion, FileCreatorOp>, + res: io::Result, + ) -> io::Result<()> + where + Self: Sized, + { + let written = res? as usize; + + let WriteOp { + file_key, + offset: _, + ref mut buf, + write_len, + } = self; + + // unless specified otherwise, the io uring worker will retry automatically on EAGAIN + assert_eq!(written, *write_len, "short write"); + + let buf = mem::replace(buf, FixedIoBuffer::empty()); + if ring + .context_mut() + .mark_write_completed(*file_key, *write_len, buf) + { + ring.push(FileCreatorOp::Close(CloseOp::new(*file_key))); + } + + Ok(()) + } +} + +#[derive(Debug)] +enum FileCreatorOp { + Open(OpenOp), + Close(CloseOp), + Write(WriteOp), +} + +impl RingOp> for FileCreatorOp { + fn entry(&mut self) -> squeue::Entry { + match self { + Self::Open(op) => op.entry(), + Self::Close(op) => op.entry(), + Self::Write(op) => op.entry(), + } + } + + fn complete( + &mut self, + ring: &mut Completion, + res: io::Result, + ) -> io::Result<()> + where + Self: Sized, + { + match self { + Self::Open(op) => op.complete(ring, res), + Self::Close(op) => op.complete(ring, res), + Self::Write(op) => op.complete(ring, res), + } + } +} + +type PendingWrite = (FixedIoBuffer, usize, usize); + +#[derive(Debug)] +struct PendingFile { + path: PathBuf, + completed_open: bool, + backlog: BacklogVec, + eof: bool, + writes_started: usize, + writes_completed: usize, +} + +impl PendingFile { + fn from_path(path: PathBuf) -> Self { + Self { + path, + completed_open: false, + backlog: SmallVec::new(), + writes_started: 0, + writes_completed: 0, + eof: false, + } + } + + fn zero_terminated_path_bytes(&self, only_filename: bool) -> Vec { + let mut path_bytes = Vec::with_capacity(libc::PATH_MAX as usize); + let buf_ptr = path_bytes.as_mut_ptr(); + let bytes = if only_filename { + self.path.file_name().unwrap_or_default().as_bytes() + } else { + self.path.as_os_str().as_bytes() + }; + assert!(bytes.len() < path_bytes.capacity()); + // Safety: + // We know that the buffer is large enough to hold the copy and the + // pointers don't overlap. + unsafe { + ptr::copy_nonoverlapping(bytes.as_ptr(), buf_ptr, bytes.len()); + buf_ptr.add(bytes.len()).write(0); + path_bytes.set_len(bytes.len() + 1); + } + path_bytes + } + + fn is_completed(&self) -> bool { + self.eof && self.writes_started == self.writes_completed + } +} diff --git a/accounts-db/src/io_uring/memory.rs b/accounts-db/src/io_uring/memory.rs index 12cf449ab76..a218593f76a 100644 --- a/accounts-db/src/io_uring/memory.rs +++ b/accounts-db/src/io_uring/memory.rs @@ -1,9 +1,19 @@ -use std::{ - ops::{Deref, DerefMut}, - ptr::{self, NonNull}, - slice, +use { + agave_io_uring::{Ring, RingOp}, + std::{ + io, + ops::{Deref, DerefMut}, + ptr::{self, NonNull}, + slice, + }, }; +// We use fixed buffers to save the cost of mapping/unmapping them at each operation. +// +// Instead of doing many large allocations and registering those, we do a single large one +// and chunk it in slices of up to 1G each. +const FIXED_BUFFER_LEN: usize = 1024 * 1024 * 1024; + pub enum LargeBuffer { Vec(Vec), HugeTable(PageAlignedMemory), @@ -43,6 +53,7 @@ impl LargeBuffer { /// using HugeTable when it is available on the host. pub fn new(size: usize) -> Self { if size > PageAlignedMemory::page_size() { + let size = size.next_power_of_two(); if let Ok(alloc) = PageAlignedMemory::alloc_huge_table(size) { log::info!("obtained hugetable io_uring buffer (len={size})"); return Self::HugeTable(alloc); @@ -120,49 +131,137 @@ impl DerefMut for PageAlignedMemory { } } -/// Fixed mutable view into externally allocated bytes buffer +/// Fixed mutable view into externally allocated IO bytes buffer +/// registered in `io_uring` for access in scheduled IO operations. /// -/// It is an unsafe (no lifetime tracking) equivalent of `&mut [u8]` -pub struct BorrowedBytesMut { +/// It is used as an unsafe (no lifetime tracking) equivalent of `&mut [u8]`. +#[derive(Debug)] +pub(super) struct FixedIoBuffer { ptr: *mut u8, size: usize, + io_buf_index: Option, } -impl BorrowedBytesMut { +impl FixedIoBuffer { pub const fn empty() -> Self { Self { ptr: std::ptr::null_mut(), size: 0, + io_buf_index: None, } } - pub fn from_mut_slice(buf: &mut [u8]) -> Self { - Self { - ptr: buf.as_mut_ptr(), - size: buf.len(), - } - } + /// Split buffer into `chunk_size` sized `IoFixedBuffer` buffers for use as registered + /// buffer in io_uring operations. + pub unsafe fn split_buffer_chunks( + buffer: &mut [u8], + chunk_size: usize, + ) -> impl Iterator + use<'_> { + assert!( + buffer.len() / FIXED_BUFFER_LEN <= u16::MAX as usize, + "buffer too large to register in io_uring" + ); + let buf_start = buffer.as_ptr() as usize; - pub fn as_mut_ptr(&self) -> *mut u8 { - self.ptr + buffer.chunks_exact_mut(chunk_size).map(move |buf| { + let io_buf_index = (buf.as_ptr() as usize - buf_start) / FIXED_BUFFER_LEN; + Self { + ptr: buf.as_mut_ptr(), + size: buf.len(), + io_buf_index: Some(io_buf_index as u16), + } + }) } pub fn len(&self) -> usize { self.size } + /// Safety: while just returning without dereferencing a pointer is safe, this is marked unsafe + /// so that the callers are encouraged to reason about the lifetime of the buffer. + pub unsafe fn as_mut_ptr(&self) -> *mut u8 { + self.ptr + } + + /// The index of the fixed buffer in the ring. See register_buffers(). + pub fn io_buf_index(&self) -> Option { + self.io_buf_index + } + /// Return a clone of `self` reduced to specified `size` - pub fn sub_buf_to(&self, size: usize) -> Self { + pub fn into_shrinked(self, size: usize) -> Self { assert!(size <= self.size); Self { ptr: self.ptr, size, + io_buf_index: self.io_buf_index, } } + + /// Registed provided buffer as fixed buffer in `io_uring`. + pub unsafe fn register>( + buffer: &mut [u8], + ring: &Ring, + ) -> io::Result<()> { + adjust_ulimit_memlock(buffer.len())?; + let iovecs = buffer + .chunks(FIXED_BUFFER_LEN) + .map(|buf| libc::iovec { + iov_base: buf.as_ptr() as _, + iov_len: buf.len(), + }) + .collect::>(); + unsafe { ring.register_buffers(&iovecs) } + } } -impl AsRef<[u8]> for BorrowedBytesMut { +impl AsRef<[u8]> for FixedIoBuffer { fn as_ref(&self) -> &[u8] { unsafe { slice::from_raw_parts(self.ptr, self.size) } } } + +pub fn adjust_ulimit_memlock(min_required: usize) -> io::Result<()> { + // This value reflects recommended memory lock limit documented in the validator's + // setup instructions at docs/src/operations/guides/validator-start.md + const DESIRED_MEMLOCK: u64 = 2_000_000_000; + + fn get_memlock() -> libc::rlimit { + let mut memlock = libc::rlimit { + rlim_cur: 0, + rlim_max: 0, + }; + if unsafe { libc::getrlimit(libc::RLIMIT_MEMLOCK, &mut memlock) } != 0 { + log::warn!("getrlimit(RLIMIT_MEMLOCK) failed"); + } + memlock + } + + let mut memlock = get_memlock(); + let current = memlock.rlim_cur as usize; + if current < min_required { + memlock.rlim_cur = DESIRED_MEMLOCK; + memlock.rlim_max = DESIRED_MEMLOCK; + if unsafe { libc::setrlimit(libc::RLIMIT_MEMLOCK, &memlock) } != 0 { + log::error!( + "Unable to increase the maximum memory lock limit to {} from {current}", + memlock.rlim_cur + ); + + if cfg!(target_os = "macos") { + log::error!( + "On mac OS you may need to run \ + |sudo launchctl limit memlock {DESIRED_MEMLOCK} {DESIRED_MEMLOCK}| first" + ); + } + return Err(io::Error::new( + io::ErrorKind::OutOfMemory, + "unable to set memory lock limit", + )); + } + + memlock = get_memlock(); + log::info!("Bumped maximum memory lock limit: {}", memlock.rlim_cur); + } + Ok(()) +} diff --git a/accounts-db/src/io_uring/mod.rs b/accounts-db/src/io_uring/mod.rs index d42d5cf7c67..aec1bb0ead1 100644 --- a/accounts-db/src/io_uring/mod.rs +++ b/accounts-db/src/io_uring/mod.rs @@ -1,5 +1,12 @@ #![cfg(target_os = "linux")] pub mod dir_remover; +pub mod file_creator; pub mod memory; pub mod sequential_file_reader; + +// Based on Linux +const IO_PRIO_CLASS_SHIFT: u16 = 13; +const IO_PRIO_CLASS_BE: u16 = 2; +const IO_PRIO_LEVEL_HIGHEST: u16 = 0; +const IO_PRIO_BE_HIGHEST: u16 = IO_PRIO_CLASS_BE << IO_PRIO_CLASS_SHIFT | IO_PRIO_LEVEL_HIGHEST; diff --git a/accounts-db/src/io_uring/sequential_file_reader.rs b/accounts-db/src/io_uring/sequential_file_reader.rs index 93f37b22ae8..eaf6b64edf1 100644 --- a/accounts-db/src/io_uring/sequential_file_reader.rs +++ b/accounts-db/src/io_uring/sequential_file_reader.rs @@ -1,32 +1,30 @@ use { - crate::io_uring::memory::{BorrowedBytesMut, LargeBuffer}, + super::{ + memory::{FixedIoBuffer, LargeBuffer}, + IO_PRIO_BE_HIGHEST, + }, agave_io_uring::{Completion, Ring, RingOp}, io_uring::{opcode, squeue, types, IoUring}, std::{ - fs::File, + fs::{File, OpenOptions}, io::{self, BufRead, Cursor, Read}, mem, - os::fd::{AsRawFd as _, RawFd}, + os::{ + fd::{AsRawFd as _, RawFd}, + unix::fs::OpenOptionsExt, + }, path::Path, }, }; +// Based on transfers seen with `dd bs=SIZE` for NVME drives: values >=64KiB are fine, +// but peak at 1MiB. Also compare with particular NVME parameters, e.g. +// 32 pages (Maximum Data Transfer Size) * page size (MPSMIN = Memory Page Size) = 128KiB. const DEFAULT_READ_SIZE: usize = 1024 * 1024; -#[allow(dead_code)] -const DEFAULT_BUFFER_SIZE: usize = 64 * DEFAULT_READ_SIZE; const SQPOLL_IDLE_TIMEOUT: u32 = 50; -const MAX_IOWQ_WORKERS: u32 = 4; - -// Based on Linux -const IO_PRIO_CLASS_SHIFT: u16 = 13; -const IO_PRIO_CLASS_BE: u16 = 2; -const IO_PRIO_LEVEL_HIGHEST: u16 = 0; -const IO_PRIO_BE_HIGHEST: u16 = IO_PRIO_CLASS_BE << IO_PRIO_CLASS_SHIFT | IO_PRIO_LEVEL_HIGHEST; - -// We register fixed buffers in chunks of up to 1GB as this is faster than registering many -// `read_capacity` buffers. Registering fixed buffers saves the kernel some work in -// checking/mapping/unmapping buffers for each read operation. -const FIXED_BUFFER_LEN: usize = 1024 * 1024 * 1024; +// For large file we don't really use workers as few regularly submitted requests get handled +// within sqpoll thread. Allow some workers just in case, but limit them. +const MAX_IOWQ_WORKERS: u32 = 2; /// Reader for non-seekable files. /// @@ -34,21 +32,12 @@ const FIXED_BUFFER_LEN: usize = 1024 * 1024 * 1024; pub struct SequentialFileReader { // Note: state is tied to `backing_buffer` and contains unsafe pointer references to it inner: Ring, - /// Owned buffer used across lifespan of `inner` (should get dropped last) - #[allow(dead_code)] - backing_buffer: B, + /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner` + /// (should get dropped last) + _backing_buffer: B, } impl SequentialFileReader { - /// Create a new `SequentialFileReader` for the given `path` using internally allocated - /// large buffer and default read size. - /// - /// See [SequentialFileReader::with_buffer] for more information. - #[allow(dead_code)] - pub fn new(path: impl AsRef) -> io::Result { - Self::with_capacity(DEFAULT_BUFFER_SIZE, path) - } - /// Create a new `SequentialFileReader` for the given `path` using internally allocated /// buffer of specified `buf_size` and default read size. pub fn with_capacity(buf_size: usize, path: impl AsRef) -> io::Result { @@ -76,16 +65,16 @@ impl> SequentialFileReader { mut buffer: B, read_capacity: usize, ) -> io::Result { - let buf_len = buffer.as_mut().len(); - // Let submission queue hold half of buffers before we explicitly syscall // to submit them for reading. - let ring_qsize = (buf_len / read_capacity / 2).max(1) as u32; + let ring_qsize = (buffer.as_mut().len() / read_capacity / 2).max(1) as u32; let ring = IoUring::builder() .setup_sqpoll(SQPOLL_IDLE_TIMEOUT) .build(ring_qsize)?; + // Maximum number of spawned [bounded IO, unbounded IO] kernel threads, we don't expect + // any unbounded work, but limit it to 1 just in case (0 leaves it unlimited). ring.submitter() - .register_iowq_max_workers(&mut [MAX_IOWQ_WORKERS, 0])?; + .register_iowq_max_workers(&mut [MAX_IOWQ_WORKERS, 1])?; Self::with_buffer_and_ring(buffer, ring, path, read_capacity) } @@ -104,25 +93,15 @@ impl> SequentialFileReader { "buffer size must be a multiple of read_capacity" ); - // Split the buffer into `read_capacity` sized chunks. - let buf_start = buffer.as_ptr() as usize; - let buffers = buffer - .chunks_exact_mut(read_capacity) - .map(|buf| { - let io_buf_index = (buf.as_ptr() as usize - buf_start) / FIXED_BUFFER_LEN; - ReadBufState::Uninit { - io_buf_index, - buf: BorrowedBytesMut::from_mut_slice(buf), - } - }) - .collect::>(); - - let file = std::os::unix::fs::OpenOptionsExt::custom_flags( - std::fs::OpenOptions::new().read(true), - libc::O_NOATIME, - ) - .open(path)?; - + let file = OpenOptions::new() + .read(true) + .custom_flags(libc::O_NOATIME) + .open(path)?; + // Safety: buffers contain unsafe pointers to `buffer`, but we make sure they are + // dropped before `backing_buffer` is dropped. + let buffers = unsafe { FixedIoBuffer::split_buffer_chunks(buffer, read_capacity) } + .map(ReadBufState::Uninit) + .collect(); let ring = Ring::new( ring, SequentialFileReaderState { @@ -134,21 +113,14 @@ impl> SequentialFileReader { current_buf: 0, }, ); - let iovecs = buffer - .chunks(FIXED_BUFFER_LEN) - .map(|buf| libc::iovec { - iov_base: buf.as_ptr() as _, - iov_len: buf.len(), - }) - .collect::>(); - // Safety: - // The iovecs point to a buffer which is guaranteed to be valid for the - // lifetime of the reader - unsafe { ring.register_buffers(&iovecs)? }; + + // Safety: kernel holds unsafe pointers to `buffer`, struct field declaration order + // guarantees that the ring is destroyed before `_backing_buffer` is dropped. + unsafe { FixedIoBuffer::register(buffer, &ring)? }; let mut reader = Self { inner: ring, - backing_buffer, + _backing_buffer: backing_buffer, }; // Start reading all buffers. @@ -180,12 +152,11 @@ impl> SequentialFileReader { } = &mut self.inner.context_mut(); let read_buf = mem::replace(&mut buffers[index], ReadBufState::Reading); match read_buf { - ReadBufState::Uninit { buf, io_buf_index } => { + ReadBufState::Uninit(buf) => { let op = ReadOp { fd: file.as_raw_fd(), buf, buf_off: 0, - io_buf_index, file_off: *offset, read_len: *read_capacity, reader_buf_index: index, @@ -198,7 +169,7 @@ impl> SequentialFileReader { // Safety: // The op points to a buffer which is guaranteed to be valid for // the lifetime of the operation - self.inner.push(op)? + self.inner.push(op)?; } _ => unreachable!("called start_reading_buf on a non-empty buffer"), } @@ -228,10 +199,7 @@ impl> BufRead for SequentialFileReader { let num_buffers = state.buffers.len(); let read_buf = &mut state.buffers[state.current_buf]; match read_buf { - ReadBufState::Full { - ref mut cursor, - io_buf_index, - } => { + ReadBufState::Full(ref mut cursor) => { if !cursor.fill_buf()?.is_empty() { // we have some data available break true; @@ -247,7 +215,7 @@ impl> BufRead for SequentialFileReader { state.current_buf = (state.current_buf + 1) % num_buffers; } else { // we have finished consuming this buffer, queue the next read - let cursor = mem::replace(cursor, Cursor::new(BorrowedBytesMut::empty())); + let cursor = mem::replace(cursor, Cursor::new(FixedIoBuffer::empty())); let buf = cursor.into_inner(); // The very last read when we hit EOF could return less than `read_capacity`, in @@ -258,10 +226,7 @@ impl> BufRead for SequentialFileReader { // didn't reset the length it wouldn't matter. debug_assert!(buf.len() == state.read_capacity); - state.buffers[index] = ReadBufState::Uninit { - buf, - io_buf_index: *io_buf_index, - }; + state.buffers[index] = ReadBufState::Uninit(buf); state.current_buf = (state.current_buf + 1) % num_buffers; self.start_reading_buf(index)?; @@ -270,7 +235,7 @@ impl> BufRead for SequentialFileReader { // move to the next buffer and check again whether we have data continue; } - ReadBufState::Uninit { .. } => unreachable!("should be initialized"), + ReadBufState::Uninit(_) => unreachable!("should be initialized"), _ => break false, } }; @@ -280,8 +245,8 @@ impl> BufRead for SequentialFileReader { let state = self.inner.context(); match &state.buffers[state.current_buf] { - ReadBufState::Full { .. } => break, - ReadBufState::Uninit { .. } => unreachable!("should be initialized"), + ReadBufState::Full(_) => break, + ReadBufState::Uninit(_) => unreachable!("should be initialized"), // Still no data, wait for more completions, but submit in case the SQPOLL // thread is asleep and there are queued entries in the submission queue. ReadBufState::Reading => self.inner.submit()?, @@ -291,7 +256,7 @@ impl> BufRead for SequentialFileReader { // At this point we must have data or be at EOF. let state = self.inner.context_mut(); match &mut state.buffers[state.current_buf] { - ReadBufState::Full { cursor, .. } => Ok(cursor.fill_buf()?), + ReadBufState::Full(cursor) => Ok(cursor.fill_buf()?), // after the loop above we either have some data or we must be at EOF _ => unreachable!(), } @@ -300,7 +265,7 @@ impl> BufRead for SequentialFileReader { fn consume(&mut self, amt: usize) { let state = self.inner.context_mut(); match &mut state.buffers[state.current_buf] { - ReadBufState::Full { cursor, .. } => cursor.consume(amt), + ReadBufState::Full(cursor) => cursor.consume(amt), _ => assert_eq!(amt, 0), } } @@ -309,37 +274,25 @@ impl> BufRead for SequentialFileReader { enum ReadBufState { /// The buffer is pending submission to read queue (on initialization and /// in transition from `Full` to `Reading`). - Uninit { - buf: BorrowedBytesMut, - io_buf_index: usize, - }, + Uninit(FixedIoBuffer), /// The buffer is currently being read and there's a corresponding ReadOp in /// the ring. Reading, /// The buffer is filled and ready to be consumed. - Full { - cursor: Cursor, - io_buf_index: usize, - }, + Full(Cursor), } impl std::fmt::Debug for ReadBufState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Uninit { - buf: _, - io_buf_index, - } => f + Self::Uninit(buf) => f .debug_struct("Uninit") - .field("io_buf_index", io_buf_index) + .field("io_buf_index", &buf.io_buf_index()) .finish(), Self::Reading => write!(f, "Reading"), - Self::Full { - cursor: _, - io_buf_index, - } => f + Self::Full(cursor) => f .debug_struct("Full") - .field("io_buf_index", io_buf_index) + .field("io_buf_index", &cursor.get_ref().io_buf_index()) .finish(), } } @@ -347,13 +300,11 @@ impl std::fmt::Debug for ReadBufState { struct ReadOp { fd: RawFd, - buf: BorrowedBytesMut, + buf: FixedIoBuffer, /// This is the offset inside the buffer. It's typically 0, but can be non-zero if a previous /// read returned less data than requested (because of EINTR or whatever) and we submitted a new /// read for the remaining data. buf_off: usize, - /// The index of the fixed buffer in the ring. See register_buffers(). - io_buf_index: usize, /// The offset in the file. file_off: usize, /// The length of the read. This is typically `read_capacity` but can be less if a previous read @@ -369,7 +320,7 @@ impl std::fmt::Debug for ReadOp { f.debug_struct("ReadOp") .field("fd", &self.fd) .field("buf_off", &self.buf_off) - .field("io_buf_index", &self.io_buf_index) + .field("io_buf_index", &self.buf.io_buf_index()) .field("file_off", &self.file_off) .field("read_len", &self.read_len) .field("reader_buf_index", &self.reader_buf_index) @@ -383,7 +334,6 @@ impl RingOp for ReadOp { fd, buf, buf_off, - io_buf_index, file_off, read_len, reader_buf_index: _, @@ -394,7 +344,8 @@ impl RingOp for ReadOp { // Safety: we assert that the buffer is large enough to hold the read. unsafe { buf.as_mut_ptr().byte_add(*buf_off) }, *read_len as u32, - *io_buf_index as u16, + buf.io_buf_index() + .expect("should have a valid fixed buffer"), ) .offset(*file_off as u64) .ioprio(IO_PRIO_BE_HIGHEST) @@ -410,7 +361,6 @@ impl RingOp for ReadOp { fd, buf, buf_off, - io_buf_index, file_off, read_len, reader_buf_index, @@ -423,14 +373,14 @@ impl RingOp for ReadOp { } let total_read_len = *buf_off + last_read_len; + let buf = mem::replace(buf, FixedIoBuffer::empty()); if last_read_len > 0 && last_read_len < *read_len { // Partial read, retry the op with updated offsets let op: ReadOp = ReadOp { fd: *fd, - buf: buf.sub_buf_to(buf.len()), // Still use the full buf + buf, buf_off: total_read_len, - io_buf_index: *io_buf_index, file_off: *file_off + last_read_len, read_len: *read_len - last_read_len, reader_buf_index: *reader_buf_index, @@ -440,10 +390,8 @@ impl RingOp for ReadOp { // lifetime of the operation completion.push(op); } else { - reader_state.buffers[*reader_buf_index] = ReadBufState::Full { - cursor: Cursor::new(buf.sub_buf_to(total_read_len)), - io_buf_index: *io_buf_index, - }; + reader_state.buffers[*reader_buf_index] = + ReadBufState::Full(Cursor::new(buf.into_shrinked(total_read_len))); } Ok(()) diff --git a/io-uring/src/lib.rs b/io-uring/src/lib.rs index 111e018ebe9..ceeae47c640 100644 --- a/io-uring/src/lib.rs +++ b/io-uring/src/lib.rs @@ -1,11 +1,11 @@ #![cfg(target_os = "linux")] mod ring; mod slab; -pub use ring::*; use { io_uring::IoUring, std::{io, sync::Once}, }; +pub use {ring::*, slab::FixedSlab}; pub fn io_uring_supported() -> bool { static mut IO_URING_SUPPORTED: bool = false; diff --git a/io-uring/src/ring.rs b/io-uring/src/ring.rs index 54d12828f25..fb647b3c8f2 100644 --- a/io-uring/src/ring.rs +++ b/io-uring/src/ring.rs @@ -6,7 +6,7 @@ use { IoUring, }, smallvec::{smallvec, SmallVec}, - std::{io, time::Duration}, + std::{io, os::fd::RawFd, time::Duration}, }; /// An io_uring instance. @@ -53,6 +53,17 @@ impl> Ring { self.ring.submitter().register_buffers(iovecs) } + /// Registers file descriptors as fixed for I/O with the kernel. + /// + /// Operations may then use `types::Fixed(index)` for index in `fds` to refer to the + /// registered file descriptor. + /// + /// `-1` values can be used as slots for kernel managed fixed file descriptors (created by + /// open operation). + pub fn register_files(&self, fds: &[RawFd]) -> io::Result<()> { + self.ring.submitter().register_files(fds) + } + /// Pushes an operation to the submission queue. /// /// Once completed, [RingOp::complete] will be called with the result. diff --git a/io-uring/src/slab.rs b/io-uring/src/slab.rs index c23075a53ff..1045c866f49 100644 --- a/io-uring/src/slab.rs +++ b/io-uring/src/slab.rs @@ -1,6 +1,6 @@ use slab::Slab; -pub(crate) struct FixedSlab { +pub struct FixedSlab { inner: Slab, } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index e9affb391cb..c0023685cc5 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5492,7 +5492,6 @@ dependencies = [ "solana-message", "solana-metrics", "solana-nohash-hasher", - "solana-perf", "solana-pubkey", "solana-rayon-threadlimit", "solana-rent-collector", diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index b8ecd51e3dd..7ee8dc8788b 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -25,7 +25,7 @@ use { account_storage_reader::AccountStorageReader, accounts_db::{AccountStorageEntry, AtomicAccountsFileId}, accounts_file::{AccountsFile, AccountsFileError, StorageAccess}, - hardened_unpack::{self, ArchiveChunker, BytesChannelReader, MultiBytes, UnpackError}, + hardened_unpack::{self, UnpackError}, utils::{move_and_async_delete_path, ACCOUNTS_RUN_DIR, ACCOUNTS_SNAPSHOT_DIR}, }, solana_clock::{Epoch, Slot}, @@ -35,7 +35,7 @@ use { cmp::Ordering, collections::{HashMap, HashSet}, fmt, fs, - io::{self, BufReader, BufWriter, Error as IoError, Read, Seek, Write}, + io::{self, BufRead, BufReader, BufWriter, Error as IoError, Read, Seek, Write}, mem, num::{NonZeroU64, NonZeroUsize}, ops::RangeInclusive, @@ -51,7 +51,7 @@ use { }; #[cfg(feature = "dev-context-only-utils")] use { - hardened_unpack::UnpackedAppendVecMap, rayon::prelude::*, + hardened_unpack::UnpackedAppendVecMap, solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs, }; @@ -1564,9 +1564,6 @@ pub(crate) fn get_storages_to_serialize( .collect::>() } -// From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later. -const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4; - /// Unarchives the given full and incremental snapshot archives, as long as they are compatible. pub fn verify_and_unarchive_snapshots( bank_snapshots_dir: impl AsRef, @@ -1580,8 +1577,6 @@ pub fn verify_and_unarchive_snapshots( incremental_snapshot_archive_info, )?; - let num_worker_threads = (num_cpus::get() / 4).clamp(1, PARALLEL_UNTAR_READERS_DEFAULT); - let next_append_vec_id = Arc::new(AtomicAccountsFileId::new(0)); let UnarchivedSnapshot { unpack_dir: full_unpack_dir, @@ -1597,7 +1592,6 @@ pub fn verify_and_unarchive_snapshots( "snapshot untar", account_paths, full_snapshot_archive_info.archive_format(), - num_worker_threads, next_append_vec_id.clone(), storage_access, )?; @@ -1624,7 +1618,6 @@ pub fn verify_and_unarchive_snapshots( "incremental snapshot untar", account_paths, incremental_snapshot_archive_info.archive_format(), - num_worker_threads, next_append_vec_id.clone(), storage_access, )?; @@ -1664,19 +1657,20 @@ pub fn verify_and_unarchive_snapshots( )) } -/// Spawns a thread for unpacking a snapshot -fn spawn_unpack_snapshot_thread( - chunks_receiver: crossbeam_channel::Receiver, +/// Streams unpacked files across channel +fn streaming_unarchive_snapshot( file_sender: Sender, - account_paths: Arc>, - ledger_dir: Arc, - thread_index: usize, + account_paths: Vec, + ledger_dir: PathBuf, + snapshot_archive_path: PathBuf, + archive_format: ArchiveFormat, ) -> JoinHandle> { Builder::new() - .name(format!("solUnpkSnpsht{thread_index:02}")) + .name("solTarUnpack".to_string()) .spawn(move || { + let decompressor = decompressed_tar_reader(archive_format, snapshot_archive_path)?; hardened_unpack::streaming_unpack_snapshot( - Archive::new(BytesChannelReader::new(chunks_receiver)), + Archive::new(decompressor), ledger_dir.as_path(), &account_paths, &file_sender, @@ -1686,70 +1680,20 @@ fn spawn_unpack_snapshot_thread( .unwrap() } -/// Streams unpacked files across channel -fn streaming_unarchive_snapshot( - file_sender: Sender, - account_paths: Vec, - ledger_dir: PathBuf, - snapshot_archive_path: PathBuf, - archive_format: ArchiveFormat, - num_threads: usize, -) -> Vec>> { - let account_paths = Arc::new(account_paths); - let ledger_dir = Arc::new(ledger_dir); - - let mut handles = vec![]; - - let (chunk_sender, chunk_receiver) = crossbeam_channel::bounded(num_threads * 2); - handles.push(spawn_archive_chunker_thread( - snapshot_archive_path, - archive_format, - chunk_sender, - )); - - for thread_index in 0..num_threads { - handles.push(spawn_unpack_snapshot_thread( - chunk_receiver.clone(), - file_sender.clone(), - account_paths.clone(), - ledger_dir.clone(), - thread_index, - )) - } - - handles -} - -fn archive_chunker_from_path( - archive_path: &Path, +fn decompressed_tar_reader( archive_format: ArchiveFormat, -) -> io::Result>>> { - const INPUT_READER_BUF_SIZE: usize = 128 * 1024 * 1024; - let buf_reader = solana_accounts_db::large_file_buf_reader(archive_path, INPUT_READER_BUF_SIZE) - .map_err(|err| { - IoError::other(format!( - "failed to open snapshot archive '{}': {err}", - archive_path.display(), - )) - })?; - let decompressor = ArchiveFormatDecompressor::new(archive_format, buf_reader)?; - Ok(ArchiveChunker::new(decompressor)) -} - -fn spawn_archive_chunker_thread( archive_path: impl AsRef, - archive_format: ArchiveFormat, - chunk_sender: Sender, -) -> JoinHandle> { - let archive_path = archive_path.as_ref().to_path_buf(); - Builder::new() - .name("solTarDecompr".to_string()) - .spawn(move || { - let chunker = archive_chunker_from_path(&archive_path, archive_format)?; - chunker.decode_and_send_chunks(chunk_sender)?; - Ok(()) - }) - .unwrap() +) -> Result>> { + const INPUT_READER_BUF_SIZE: usize = 128 * 1024 * 1024; + let buf_reader = + solana_accounts_db::large_file_buf_reader(archive_path.as_ref(), INPUT_READER_BUF_SIZE) + .map_err(|err| { + io::Error::other(format!( + "failed to open snapshot archive '{}': {err}", + archive_path.as_ref().display(), + )) + })?; + Ok(ArchiveFormatDecompressor::new(archive_format, buf_reader)?) } /// Used to determine if a filename is structured like a version file, bank file, or storage file @@ -1901,7 +1845,6 @@ fn unarchive_snapshot( measure_name: &'static str, account_paths: &[PathBuf], archive_format: ArchiveFormat, - num_untar_threads: usize, next_append_vec_id: Arc, storage_access: StorageAccess, ) -> Result { @@ -1911,18 +1854,15 @@ fn unarchive_snapshot( let unpacked_snapshots_dir = unpack_dir.path().join("snapshots"); let (file_sender, file_receiver) = crossbeam_channel::unbounded(); - let unarchive_handles = streaming_unarchive_snapshot( + let unarchive_handle = streaming_unarchive_snapshot( file_sender, account_paths.to_vec(), unpack_dir.path().to_path_buf(), snapshot_archive_path.as_ref().to_path_buf(), archive_format, - num_untar_threads, ); - let num_rebuilder_threads = num_cpus::get_physical() - .saturating_sub(num_untar_threads) - .max(1); + let num_rebuilder_threads = num_cpus::get_physical().saturating_sub(1).max(1); let snapshot_result = snapshot_fields_from_files(&file_receiver).and_then( |SnapshotFieldsBundle { snapshot_version, @@ -1959,9 +1899,7 @@ fn unarchive_snapshot( }) }, ); - for handle in unarchive_handles { - handle.join().unwrap()?; - } + unarchive_handle.join().unwrap()?; snapshot_result } @@ -2465,24 +2403,9 @@ fn unpack_snapshot_local( num_threads: usize, ) -> Result { assert!(num_threads > 0); - - let (chunk_sender, chunk_receiver) = crossbeam_channel::bounded(num_threads); - let handle = spawn_archive_chunker_thread(snapshot_path, archive_format, chunk_sender); - - // create 'num_threads' # of parallel workers, each receiving chunks of archive to extract. - let all_unpacked_append_vec_map = (0..num_threads) - .into_par_iter() - .map(|_| { - let archive_subset = Archive::new(BytesChannelReader::new(chunk_receiver.clone())); - hardened_unpack::unpack_snapshot(archive_subset, ledger_dir, account_paths) - }) - .collect::>(); - handle.join().unwrap()?; - - let mut unpacked_append_vec_map = UnpackedAppendVecMap::new(); - for h in all_unpacked_append_vec_map { - unpacked_append_vec_map.extend(h?); - } + let archive = Archive::new(decompressed_tar_reader(archive_format, snapshot_path)?); + let unpacked_append_vec_map = + hardened_unpack::unpack_snapshot(archive, ledger_dir, account_paths)?; Ok(unpacked_append_vec_map) } diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index 9a7ef415bcb..73fc4f96fe2 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5325,7 +5325,6 @@ dependencies = [ "solana-message", "solana-metrics", "solana-nohash-hasher", - "solana-perf", "solana-pubkey", "solana-rayon-threadlimit", "solana-rent-collector",