diff --git a/fs/src/buffered_reader.rs b/fs/src/buffered_reader.rs
index de96dca2b68..578b640d27f 100644
--- a/fs/src/buffered_reader.rs
+++ b/fs/src/buffered_reader.rs
@@ -390,10 +390,9 @@ pub fn large_file_buf_reader(path: &Path, buf_size: usize) -> io::Result<BufReader<File>>
 // Reads >=64KiB are fine,
 // but peak at 1MiB. Also compare with particular NVME parameters, e.g.
 // 32 pages (Maximum Data Transfer Size) * page size (MPSMIN = Memory Page Size) = 128KiB.
-pub const DEFAULT_READ_SIZE: usize = 1024 * 1024;
+const DEFAULT_READ_SIZE: usize = 1024 * 1024;
 // For large file we don't really use workers as few regularly submitted requests get handled
 // within sqpoll thread. Allow some workers just in case, but limit them.
-const MAX_IOWQ_WORKERS: u32 = 2;
+const DEFAULT_MAX_IOWQ_WORKERS: u32 = 2;
 
-/// Reader for non-seekable files.
-///
-/// Implements read-ahead using io_uring.
-pub struct SequentialFileReader<B = LargeBuffer> {
-    // Note: state is tied to `backing_buffer` and contains unsafe pointer references to it
-    inner: Ring<SequentialFileReaderState>,
-    /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner`
-    /// (should get dropped last)
-    _backing_buffer: B,
+/// Utility for building `SequentialFileReader` with specified tuning options.
+pub struct SequentialFileReaderBuilder {
+    read_capacity: usize,
+    max_iowq_workers: u32,
+    ring_squeue_size: Option<u32>,
+    /// Register buffer as fixed with the kernel
+    register_buffer: bool,
 }
 
-impl SequentialFileReader<LargeBuffer> {
-    /// Create a new `SequentialFileReader` for the given `path` using internally allocated
-    /// buffer of specified `buf_size` and default read size.
-    pub fn with_capacity(buf_size: usize, path: impl AsRef<Path>) -> io::Result<Self> {
-        Self::with_buffer(path, LargeBuffer::new(buf_size), DEFAULT_READ_SIZE)
+impl SequentialFileReaderBuilder {
+    pub fn new() -> Self {
+        Self {
+            read_capacity: DEFAULT_READ_SIZE,
+            max_iowq_workers: DEFAULT_MAX_IOWQ_WORKERS,
+            ring_squeue_size: None,
+            register_buffer: true,
+        }
     }
-}
 
-/// Holds the state of the reader.
-struct SequentialFileReaderState {
-    file: File,
-    read_capacity: usize,
-    offset: usize,
-    eof_buf_index: Option<usize>,
-    buffers: Vec<ReadBufState>,
-    current_buf: usize,
-}
+    /// Override the default size of a single IO read operation
+    ///
+    /// This influences the concurrency, since buffer is divided into chunks of this size.
+    #[cfg(test)]
+    pub fn read_capacity(mut self, read_capacity: usize) -> Self {
+        self.read_capacity = read_capacity;
+        self
+    }
 
-impl<B: AsMut<[u8]>> SequentialFileReader<B> {
-    /// Create a new `SequentialFileReader` for the given file using provided backing `buffer`.
+    /// Build a new `SequentialFileReader` with internally allocated buffer.
+    ///
+    /// Buffer will hold at least `buf_capacity` bytes (increased to `read_capacity` if it's lower).
+    ///
+    /// Initially the reader is idle and starts reading after `set_file` is called. It will then execute
+    /// multiple `read_capacity` sized reads in parallel to fill the buffer.
+    pub fn build(
+        self,
+        path: impl AsRef<Path>,
+        buf_capacity: usize,
+    ) -> io::Result<SequentialFileReader<LargeBuffer>> {
+        let buf_capacity = buf_capacity.max(self.read_capacity);
+        let buffer = LargeBuffer::new(buf_capacity);
+        self.build_with_buffer(path, buffer)
+    }
+
+    /// Build a new `SequentialFileReader` with a user-supplied buffer
     ///
     /// `buffer` is the internal buffer used for reading. It must be at least `read_capacity` long.
+    ///
+    /// Initially the reader is idle and starts reading after the first file is added.
     /// The reader will execute multiple `read_capacity` sized reads in parallel to fill the buffer.
-    pub fn with_buffer(
+    pub fn build_with_buffer<B: AsMut<[u8]>>(
+        self,
         path: impl AsRef<Path>,
         mut buffer: B,
-        read_capacity: usize,
-    ) -> io::Result<Self> {
-        let buf_capacity = buffer.as_mut().len();
+    ) -> io::Result<SequentialFileReader<B>> {
+        // Align buffer capacity to read capacity, so we always read equally sized chunks
+        let buf_capacity = buffer.as_mut().len() / self.read_capacity * self.read_capacity;
+        assert_ne!(buf_capacity, 0, "read size aligned buffer is too small");
+        let buf_slice_mut = &mut buffer.as_mut()[..buf_capacity];
+
+        let state = SequentialFileReaderState::new(path, buf_slice_mut, self.read_capacity)?;
+
+        let io_uring = self.create_io_uring(buf_capacity)?;
+        let ring = Ring::new(io_uring, state);
+        if self.register_buffer {
+            // Safety: kernel holds unsafe pointers to `buffer`, struct field declaration order
+            // guarantees that the ring is destroyed before `_backing_buffer` is dropped.
+            unsafe { FixedIoBuffer::register(buf_slice_mut, &ring)? };
+        }
+
+        let mut reader = SequentialFileReader {
+            inner: ring,
+            _backing_buffer: buffer,
+        };
+        reader.start_reading()?;
+        Ok(reader)
+    }
+
+    fn create_io_uring(&self, buf_capacity: usize) -> io::Result<IoUring> {
         // Let all buffers be submitted for reading at any time
-        let max_inflight_ops = (buf_capacity / read_capacity) as u32;
+        let max_inflight_ops = (buf_capacity / self.read_capacity) as u32;
         // Completions arrive in bursts (batching done by the disk controller and the kernel).
         // By submitting smaller chunks we decrease the likelihood that we stall on a full completion queue.
         // Also, in order to keep some operations submitted at all times, we will `submit` them half-way
         // through the buffer (at the cost of doubling syscalls) to let kernel work on one half while the other
         // half is read by the user.
-        let ring_squeue_size = (max_inflight_ops / 2).max(1);
-
+        let ring_squeue_size = self
+            .ring_squeue_size
+            .unwrap_or((max_inflight_ops / 2).max(1));
         // agave io_uring uses cqsize to define state slab size, so cqsize == max inflight ops
-        let ring = io_uring::IoUring::builder()
+        let ring = IoUring::builder()
             .setup_cqsize(max_inflight_ops)
             .build(ring_squeue_size)?;
         // Maximum number of spawned [bounded IO, unbounded IO] kernel threads, we don't expect
         // any unbounded work, but limit it to 1 just in case (0 leaves it unlimited).
         ring.submitter()
-            .register_iowq_max_workers(&mut [MAX_IOWQ_WORKERS, 1])?;
-        Self::with_buffer_and_ring(buffer, ring, path, read_capacity)
+            .register_iowq_max_workers(&mut [self.max_iowq_workers, 1])?;
+        Ok(ring)
     }
+}
-
-    /// Create a new `SequentialFileReader` for the given file, using a custom
-    /// ring instance.
-    fn with_buffer_and_ring(
-        mut backing_buffer: B,
-        ring: IoUring,
-        path: impl AsRef<Path>,
-        read_capacity: usize,
-    ) -> io::Result<Self> {
-        let buffer = backing_buffer.as_mut();
-        assert!(buffer.len() >= read_capacity, "buffer too small");
-        let read_aligned_buf_len = buffer.len() / read_capacity * read_capacity;
-        let buffer = &mut buffer[..read_aligned_buf_len];
-
-        let file = OpenOptions::new()
-            .read(true)
-            .custom_flags(libc::O_NOATIME)
-            .open(path)?;
-        // Safety: buffers contain unsafe pointers to `buffer`, but we make sure they are
-        // dropped before `backing_buffer` is dropped.
-        let buffers = unsafe { FixedIoBuffer::split_buffer_chunks(buffer, read_capacity) }
-            .map(ReadBufState::Uninit)
-            .collect();
-        let ring = Ring::new(
-            ring,
-            SequentialFileReaderState {
-                file,
-                read_capacity,
-                buffers,
-                offset: 0,
-                eof_buf_index: None,
-                current_buf: 0,
-            },
-        );
-
-        // Safety: kernel holds unsafe pointers to `buffer`, struct field declaration order
-        // guarantees that the ring is destroyed before `_backing_buffer` is dropped.
-        unsafe { FixedIoBuffer::register(buffer, &ring)? };
-
-        let mut reader = Self {
-            inner: ring,
-            _backing_buffer: backing_buffer,
-        };
+/// Reader for non-seekable files.
+///
+/// Implements read-ahead using io_uring.
+pub struct SequentialFileReader<B = LargeBuffer> {
+    // Note: state is tied to `backing_buffer` and contains unsafe pointer references to it
+    inner: Ring<SequentialFileReaderState>,
+    /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner`
+    /// (should get dropped last)
+    _backing_buffer: B,
+}
 
-        // Start reading all buffers.
-        for i in 0..reader.inner.context().buffers.len() {
-            reader.start_reading_buf(i)?;
+impl<B: AsMut<[u8]>> SequentialFileReader<B> {
+    fn start_reading(&mut self) -> io::Result<()> {
+        for i in 0..self.inner.context().buffers.len() {
+            self.start_reading_buf(i)?;
         }
-        // Make sure work is started in case submission queue is large and we
-        // never submitted work when adding buffers.
-        reader.inner.submit()?;
-
-        Ok(reader)
+        self.inner.submit()?;
+        Ok(())
     }
 
     /// Start reading into the buffer at `index`.
@@ -281,6 +288,38 @@ impl<B: AsMut<[u8]>> BufRead for SequentialFileReader<B> {
     }
 }
 
+/// Holds the state of the reader.
+struct SequentialFileReaderState {
+    file: File,
+    read_capacity: usize,
+    offset: usize,
+    eof_buf_index: Option<usize>,
+    buffers: Vec<ReadBufState>,
+    current_buf: usize,
+}
+
+impl SequentialFileReaderState {
+    fn new(path: impl AsRef<Path>, buffer: &mut [u8], read_capacity: usize) -> io::Result<Self> {
+        let file = OpenOptions::new()
+            .read(true)
+            .custom_flags(libc::O_NOATIME)
+            .open(path)?;
+        // Safety: buffers contain unsafe pointers to `buffer`, but we make sure they are
+        // dropped before `backing_buffer` in `SequentialFileReader` is dropped.
+        let buffers = unsafe { FixedIoBuffer::split_buffer_chunks(buffer, read_capacity) }
+            .map(ReadBufState::Uninit)
+            .collect();
+        Ok(Self {
+            file,
+            read_capacity,
+            buffers,
+            offset: 0,
+            eof_buf_index: None,
+            current_buf: 0,
+        })
+    }
+}
+
 enum ReadBufState {
     /// The buffer is pending submission to read queue (on initialization and
     /// in transition from `Full` to `Reading`).
@@ -424,8 +463,10 @@ mod tests { io::Write::write_all(&mut temp_file, &pattern[..file_size % pattern.len()]).unwrap(); let buf = vec![0; backing_buffer_size]; - let mut reader = - SequentialFileReader::with_buffer(temp_file.path(), buf, read_capacity).unwrap(); + let mut reader = SequentialFileReaderBuilder::new() + .read_capacity(read_capacity) + .build_with_buffer(temp_file.path(), buf) + .unwrap(); // Read contents from the reader and verify length let mut all_read_data = Vec::new();