From 990588781516487b6951768db515e92f77240b77 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Thu, 4 Dec 2025 14:34:21 +0800 Subject: [PATCH] Implement FileBufRead for SequentialFileReader --- fs/src/buffered_reader.rs | 6 +- fs/src/io_setup.rs | 3 +- fs/src/io_uring/memory.rs | 20 +- fs/src/io_uring/sequential_file_reader.rs | 865 ++++++++++++++++------ 4 files changed, 660 insertions(+), 234 deletions(-) diff --git a/fs/src/buffered_reader.rs b/fs/src/buffered_reader.rs index 68807acbd9c..2cdca22d99e 100644 --- a/fs/src/buffered_reader.rs +++ b/fs/src/buffered_reader.rs @@ -401,10 +401,12 @@ pub fn large_file_buf_reader( assert!(agave_io_uring::io_uring_supported()); use crate::io_uring::sequential_file_reader::SequentialFileReaderBuilder; - SequentialFileReaderBuilder::new() + let mut reader = SequentialFileReaderBuilder::new() .shared_sqpoll(io_setup.shared_sqpoll_fd()) .use_registered_buffers(io_setup.use_registered_io_uring_buffers) - .build(path, buf_size) + .build(buf_size)?; + reader.set_path(path)?; + Ok(reader) } #[cfg(not(target_os = "linux"))] { diff --git a/fs/src/io_setup.rs b/fs/src/io_setup.rs index 247565ed626..3e80f28be15 100644 --- a/fs/src/io_setup.rs +++ b/fs/src/io_setup.rs @@ -82,8 +82,9 @@ mod tests { .build(1 << 20, move |file_info| { let mut reader = SequentialFileReaderBuilder::new() .shared_sqpoll(io_setup.shared_sqpoll_fd()) - .build(file_info.path, 1 << 20) + .build(1 << 20) .unwrap(); + reader.set_path(file_info.path).unwrap(); reader .read_to_end(read_bytes_ref.write().unwrap().as_mut()) .unwrap(); diff --git a/fs/src/io_uring/memory.rs b/fs/src/io_uring/memory.rs index d451dd36e2b..3f42c316c1b 100644 --- a/fs/src/io_uring/memory.rs +++ b/fs/src/io_uring/memory.rs @@ -222,6 +222,10 @@ impl IoBufferChunk { self.size } + pub fn as_ptr(&self) -> *const u8 { + self.ptr + } + /// Safety: while just returning without dereferencing a pointer is safe, this is marked unsafe /// so that the callers are encouraged to reason about the lifetime of the buffer. pub unsafe fn as_mut_ptr(&self) -> *mut u8 { @@ -233,16 +237,6 @@ impl IoBufferChunk { self.registered_io_buf_index } - /// Return a clone of `self` reduced to specified `size` - pub fn into_shrinked(self, size: IoSize) -> Self { - assert!(size <= self.size); - Self { - ptr: self.ptr, - size, - registered_io_buf_index: self.registered_io_buf_index, - } - } - /// Register provided buffer as fixed buffer in `io_uring`. pub unsafe fn register>( buffer: &mut [u8], @@ -258,9 +252,3 @@ impl IoBufferChunk { unsafe { ring.register_buffers(&iovecs) } } } - -impl AsRef<[u8]> for IoBufferChunk { - fn as_ref(&self) -> &[u8] { - unsafe { slice::from_raw_parts(self.ptr, self.size as usize) } - } -} diff --git a/fs/src/io_uring/sequential_file_reader.rs b/fs/src/io_uring/sequential_file_reader.rs index a867b76a41d..f2deb889d6a 100644 --- a/fs/src/io_uring/sequential_file_reader.rs +++ b/fs/src/io_uring/sequential_file_reader.rs @@ -5,18 +5,22 @@ use { memory::{IoBufferChunk, PageAlignedMemory}, IO_PRIO_BE_HIGHEST, }, - crate::{io_uring::sqpoll, FileSize, IoSize}, + crate::{buffered_reader::FileBufRead, io_uring::sqpoll, FileSize, IoSize}, agave_io_uring::{Completion, Ring, RingOp}, io_uring::{opcode, squeue, types, IoUring}, std::{ + collections::VecDeque, fs::{File, OpenOptions}, - io::{self, BufRead, Cursor, Read}, + io::{self, BufRead, Read}, + marker::PhantomData, mem, + ops::{Deref, DerefMut}, os::{ fd::{AsRawFd, BorrowedFd, RawFd}, unix::fs::OpenOptionsExt, }, path::Path, + slice, }, }; @@ -78,14 +82,10 @@ impl<'sp> SequentialFileReaderBuilder<'sp> { /// /// Initially the reader is idle and starts reading after `set_file` is called. It will then execute /// multiple `read_capacity` sized reads in parallel to fill the buffer. - pub fn build( - self, - path: impl AsRef, - buf_capacity: usize, - ) -> io::Result { + pub fn build<'a>(self, buf_capacity: usize) -> io::Result> { let buf_capacity = buf_capacity.max(self.read_capacity as usize); let buffer = PageAlignedMemory::new(buf_capacity)?; - self.build_with_buffer(path, buffer) + self.build_with_buffer(buffer) } /// Build a new `SequentialFileReader` with a user-supplied buffer @@ -94,11 +94,10 @@ impl<'sp> SequentialFileReaderBuilder<'sp> { /// /// Initially the reader is idle and starts reading after the first file is added. /// The reader will execute multiple `read_capacity` sized reads in parallel to fill the buffer. - fn build_with_buffer( + fn build_with_buffer<'a>( self, - path: impl AsRef, mut buffer: PageAlignedMemory, - ) -> io::Result { + ) -> io::Result> { // Align buffer capacity to read capacity, so we always read equally sized chunks let buf_capacity = buffer.as_mut().len() / self.read_capacity as usize * self.read_capacity as usize; @@ -117,10 +116,10 @@ impl<'sp> SequentialFileReaderBuilder<'sp> { .map(ReadBufState::Uninit) .collect(); - let state = SequentialFileReaderState::new(path, buffers, self.read_capacity)?; + let buffers_state = BuffersState(buffers); let io_uring = self.create_io_uring(buf_capacity)?; - let ring = Ring::new(io_uring, state); + let ring = Ring::new(io_uring, buffers_state); if self.register_buffer { // Safety: kernel holds unsafe pointers to `buffer`, struct field declaration order @@ -128,12 +127,12 @@ impl<'sp> SequentialFileReaderBuilder<'sp> { unsafe { IoBufferChunk::register(buf_slice_mut, &ring)? }; } - let mut reader = SequentialFileReader { - inner: ring, + Ok(SequentialFileReader { + ring, + state: SequentialFileReaderState::default(), _backing_buffer: buffer, - }; - reader.start_reading()?; - Ok(reader) + _phantom: PhantomData, + }) } fn create_io_uring(&self, buf_capacity: usize) -> io::Result { @@ -164,191 +163,446 @@ impl<'sp> SequentialFileReaderBuilder<'sp> { /// Reader for non-seekable files. /// /// Implements read-ahead using io_uring. -pub struct SequentialFileReader { - // Note: state is tied to `backing_buffer` and contains unsafe pointer references to it - inner: Ring, +pub struct SequentialFileReader<'a> { + // Note: ring's state is tied to `_backing_buffer` - contains unsafe pointer references + // to the buffer. Ring should be drained and dropped before `_backing_buffer`. + ring: Ring, + state: SequentialFileReaderState, /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner` /// (should get dropped last) _backing_buffer: PageAlignedMemory, + _phantom: PhantomData<&'a ()>, } -impl SequentialFileReader { - fn start_reading(&mut self) -> io::Result<()> { - for i in 0..self.inner.context().buffers.len() { - self.start_reading_buf(i)?; - } - self.inner.submit()?; - Ok(()) +impl<'a> SequentialFileReader<'a> { + /// Open file under `path`, check its metadata to determine read limit and add it to the reader. + /// + /// See `add_owned_file_to_prefetch` for more details. + pub fn set_path(&mut self, path: impl AsRef) -> io::Result<()> { + let file = OpenOptions::new() + .read(true) + .custom_flags(libc::O_NOATIME) + .open(path)?; + let file_size = file.metadata()?.len(); + self.add_owned_file_to_prefetch(file, file_size) } - /// Start reading into the buffer at `index`. + /// Add `file` to read. Starts reading the file as soon as a buffer is available. /// - /// This is called at start and as soon as a buffer is fully consumed by BufRead::fill_buf(). + /// The read finishes when EOF is reached or `read_limit` bytes are read. + /// Multiple files can be added to the reader and they will be read-ahead in FIFO order. /// - /// Reads [state.offset, state.offset + state.read_capacity) from the file into - /// state.buffers[index]. Once a read is complete, ReadOp::complete(state) is called to update - /// the state. - fn start_reading_buf(&mut self, index: usize) -> io::Result<()> { - let SequentialFileReaderState { - buffers, - current_buf: _, - file, - offset, - read_capacity, - eof_buf_index: _, - } = &mut self.inner.context_mut(); - let read_buf = mem::replace(&mut buffers[index], ReadBufState::Reading); - match read_buf { - ReadBufState::Uninit(buf) => { - let op = ReadOp { - fd: file.as_raw_fd(), - buf, - buf_off: 0, - file_off: *offset, - read_len: *read_capacity, - reader_buf_index: index, - }; - - // We always advance by `read_capacity`. If we get a short read, we submit a new - // read for the remaining data. See ReadOp::complete(). - *offset += *read_capacity as FileSize; - - // Safety: - // The op points to a buffer which is guaranteed to be valid for - // the lifetime of the operation - self.inner.push(op)?; + /// Reader takes ownership of the file and will drop it after it's done reading + /// and `move_to_next_file` is called. + pub fn add_owned_file_to_prefetch( + &mut self, + file: File, + read_limit: FileSize, + ) -> io::Result<()> { + self.add_file_by_fd(file.as_raw_fd(), read_limit)?; + self.state.owned_files.push_back(file); + Ok(()) + } + + fn add_file_to_prefetch(&mut self, file: &'a File, read_limit: FileSize) -> io::Result<()> { + self.add_file_by_fd(file.as_raw_fd(), read_limit) + } + + /// Caller must ensure that the file is not closed while the reader is using it. + fn add_file_by_fd(&mut self, fd: RawFd, read_limit: FileSize) -> io::Result<()> { + self.state.files.push_back(FileState::new(fd, read_limit)); + + if self.state.all_buffers_used(self.ring.context()) { + // Just added file to backlog, no reads can be started yet. + return Ok(()); + } + + // There are free buffers, so we can start reading the new file. + self.state.next_read_file_index = + Some(self.state.next_read_file_index.map_or(0, |idx| idx + 1)); + + // Start reading as many buffers as necessary for queued files. + self.try_schedule_new_ops() + } + + /// When reading multiple files, this method moves the reader to the next file. + fn move_to_next_file(&mut self) -> io::Result<()> { + let state = &mut self.state; + + let Some(removed_file) = state.files.pop_front() else { + return Ok(()); + }; + + // Always reset in-file and in-buffer state + state.current_offset = 0; + state.current_buf_pos = 0; + state.current_buf_len = 0; + state.left_to_consume = 0; + + // Reclaim current and all subsequent unread buffers of removed file as uninitialized. + let sentinel_buf_index = state + .files + .front() + .and_then(|f| f.start_buf_index) + .unwrap_or(state.current_buf_index); + let num_bufs = self.ring.context().len(); + loop { + self.ring.process_completions()?; + let current_buf = self.ring.context_mut().get_mut(state.current_buf_index); + if current_buf.is_reading() { + // Still no data, wait for more completions, but submit in case there are queued + // entries in the submission queue. + self.ring.submit()?; + continue; + } + current_buf.transition_to_uninit(); + + let next_buf_index = (state.current_buf_index + 1) % num_bufs; + state.current_buf_index = next_buf_index; + if sentinel_buf_index == next_buf_index { + break; } - _ => unreachable!("called start_reading_buf on a non-empty buffer"), + } + + if state + .owned_files + .front() + .is_some_and(|f| removed_file.is_same_file(f)) + { + state.owned_files.pop_front(); + } + + if let Some(next_file_index) = state.next_read_file_index.as_mut() { + // Since file was removed from front, all indices are shifted by one + state.next_read_file_index = next_file_index.checked_sub(1); + if state.next_read_file_index.is_none() { + // The removed file was the current one being read + if state.files.is_empty() { + // Reader is empty, reset buf indices to initial values + state.current_buf_index = 0; + state.next_read_buf_index = 0; + } else { + // There are other files to read, start with the new first file + state.next_read_file_index = Some(0); + } + } + } + + self.try_schedule_new_ops() + } + + fn try_schedule_new_ops(&mut self) -> io::Result<()> { + // Start reading as many buffers as necessary for queued files. + while let Some(op) = self.state.next_read_op(self.ring.context_mut()) { + self.ring.push(op)?; } Ok(()) } + + fn wait_current_buf_full(&mut self) -> io::Result { + if self.state.files.is_empty() { + return Ok(false); + } + let num_bufs = self.ring.context().len(); + loop { + self.ring.process_completions()?; + + let state = &mut self.state; + let current_buf = &mut self.ring.context_mut().get_mut(state.current_buf_index); + match current_buf { + ReadBufState::Full { buf, eof_pos } => { + if state.current_buf_len == 0 { + state.current_buf_len = eof_pos.unwrap_or(buf.len()); + if state.left_to_consume > 0 { + let consumed = state + .left_to_consume + .min((state.current_buf_len - state.current_buf_pos) as usize); + state.left_to_consume -= consumed; + state.current_buf_pos += consumed as u32; + } + } + + // Note: we might have consumed whole buf from `left_to_consume` + if state.current_buf_pos < state.current_buf_len { + // We have some data available. + return Ok(true); + } + + if eof_pos.is_some() { + // Last filled buf for the whole file (until `move_to_next_file` is called). + return Ok(false); + } + // We have finished consuming this buffer - reset its state. + current_buf.transition_to_uninit(); + + // Next `fill_buf` will use subsequent buffer. + state.move_to_next_buf(num_bufs); + + // A buffer was freed, so try to queue up next read. + self.try_schedule_new_ops()?; + } + + ReadBufState::Reading => { + // Still no data, wait for more completions, but submit in case there are queued + // entries in the submission queue. + self.ring.submit()? + } + + ReadBufState::Uninit(_) => unreachable!("should be initialized"), + } + // Move to the next buffer and check again whether we have data. + } + } } // BufRead requires Read, but we never really use the Read interface. -impl Read for SequentialFileReader { +impl<'a> Read for SequentialFileReader<'a> { fn read(&mut self, buf: &mut [u8]) -> io::Result { let available = self.fill_buf()?; - if available.is_empty() { - return Ok(0); // EOF. - } - let bytes_to_read = available.len().min(buf.len()); + if bytes_to_read == 0 { + return Ok(0); // EOF or empty `buf` + } buf[..bytes_to_read].copy_from_slice(&available[..bytes_to_read]); self.consume(bytes_to_read); Ok(bytes_to_read) } } -impl BufRead for SequentialFileReader { +impl<'a> BufRead for SequentialFileReader<'a> { fn fill_buf(&mut self) -> io::Result<&[u8]> { - let _have_data = loop { - let state = self.inner.context_mut(); - let num_buffers = state.buffers.len(); - let read_buf = &mut state.buffers[state.current_buf]; - match read_buf { - ReadBufState::Full(cursor) => { - if !cursor.fill_buf()?.is_empty() { - // we have some data available - break true; - } - let index = state.current_buf; - - if let Some(eof_index) = state.eof_buf_index { - if eof_index == index { - // This is the last filled buf for the whole file - return Ok(&[]); - } - // Some other buffer encountered EOF: move on, but don't issue new read. - state.current_buf = (state.current_buf + 1) % num_buffers; - } else { - // we have finished consuming this buffer, queue the next read - let cursor = mem::replace(cursor, Cursor::new(IoBufferChunk::empty())); - let buf = cursor.into_inner(); - - // The very last read when we hit EOF could return less than `read_capacity`, in - // which case what's in the cursor is shorter than `read_capacity` and for - // strict correctness we should reset the length. - // - // Note though that once we hit EOF we don't queue any more reads, so even if we - // didn't reset the length it wouldn't matter. - debug_assert!(buf.len() == state.read_capacity); - - state.buffers[index] = ReadBufState::Uninit(buf); - state.current_buf = (state.current_buf + 1) % num_buffers; - - self.start_reading_buf(index)?; - } - - // move to the next buffer and check again whether we have data - continue; - } - ReadBufState::Uninit(_) => unreachable!("should be initialized"), - _ => break false, - } - }; - - loop { - self.inner.process_completions()?; - let state = self.inner.context(); - - match &state.buffers[state.current_buf] { - ReadBufState::Full(_) => break, - ReadBufState::Uninit(_) => unreachable!("should be initialized"), - // Still no data, wait for more completions, but submit in case the SQPOLL - // thread is asleep and there are queued entries in the submission queue. - ReadBufState::Reading => self.inner.submit()?, - } + if self.state.current_buf_pos == self.state.current_buf_len + && !self.wait_current_buf_full()? + { + return Ok(&[]); } // At this point we must have data or be at EOF. - let state = self.inner.context_mut(); - match &mut state.buffers[state.current_buf] { - ReadBufState::Full(cursor) => Ok(cursor.fill_buf()?), - // after the loop above we either have some data or we must be at EOF - _ => unreachable!(), - } + let current_buf = self.ring.context().get_fast(self.state.current_buf_index); + Ok(current_buf.slice(self.state.current_buf_pos, self.state.current_buf_len)) } fn consume(&mut self, amt: usize) { - let state = self.inner.context_mut(); - match &mut state.buffers[state.current_buf] { - ReadBufState::Full(cursor) => cursor.consume(amt), - _ => assert_eq!(amt, 0), + self.state.consume(amt); + } +} + +impl<'a> FileBufRead<'a> for SequentialFileReader<'a> { + fn set_file(&mut self, file: &'a File, read_limit: FileSize) -> io::Result<()> { + while self + .state + .files + .front() + .is_some_and(|file_state| !file_state.is_same_file(file)) + { + self.move_to_next_file()?; } + if self.state.files.is_empty() { + self.add_file_to_prefetch(file, read_limit)?; + } + Ok(()) + } + + fn get_file_offset(&self) -> FileSize { + self.state.current_offset + } +} + +/// Holds the state of all the buffers that may be submitted to the kernel for reading. +struct BuffersState(Box<[ReadBufState]>); + +impl BuffersState { + fn len(&self) -> u16 { + self.0.len() as u16 + } + + fn get_mut(&mut self, index: u16) -> &mut ReadBufState { + &mut self.0[index as usize] + } + + #[inline] + fn get_fast(&self, index: u16) -> &ReadBufState { + debug_assert!(index < self.len()); + // Perf: skip bounds check for performance + unsafe { self.0.get_unchecked(index as usize) } + } +} + +impl Deref for BuffersState { + type Target = [ReadBufState]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for BuffersState { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } } /// Holds the state of the reader. +#[derive(Debug, Default)] struct SequentialFileReaderState { - file: File, - read_capacity: IoSize, - offset: FileSize, - eof_buf_index: Option, - buffers: Vec, - current_buf: usize, + // Note: file states operate on file descriptors of files that are assumed to be open, + // which is guaranteed either by them being in `owned_files` or in case of file references + // because they are added with reader's 'a lifetime. + files: VecDeque, + + /// Amount of bytes left to consume from next buffer(s) before returning them in `fill_buf()`. + /// This is necessary to handle `consume()` calls beyond the current buffer. + left_to_consume: usize, + /// Index of `BuffersState` buffer to consume data from (0 if no file is being read) + current_buf_index: u16, + /// Position in buffer (pointed by `current_buf_index`) to consume data from + current_buf_pos: IoSize, + /// Cached length of the current buffer (0 until `wait_current_buf_full` initializes it) + current_buf_len: IoSize, + /// File offset of the next `fill_buf()` buffer available to consume + current_offset: FileSize, + + /// Index in `self.files` of the file that is currently being read (can generate new read ops). + next_read_file_index: Option, + /// Index of `BuffersState` buffer that can be used for the next read operation. + next_read_buf_index: u16, + + owned_files: VecDeque, } impl SequentialFileReaderState { - fn new( - path: impl AsRef, - buffers: Vec, - read_capacity: IoSize, - ) -> io::Result { - let file = OpenOptions::new() - .read(true) - .custom_flags(libc::O_NOATIME) - .open(path)?; - Ok(Self { - file, - read_capacity, - buffers, - offset: 0, - eof_buf_index: None, - current_buf: 0, - }) + fn consume(&mut self, amt: usize) { + if amt == 0 || self.files.is_empty() { + return; + } + self.current_offset += amt as FileSize; + + let unconsumed_buf_len = (self.current_buf_len - self.current_buf_pos) as usize; + if amt <= unconsumed_buf_len { + self.current_buf_pos += amt as IoSize; + } else { + self.current_buf_pos = self.current_buf_len; + // Keep track of any bytes left to consume beyond current buffer, they will be + // accounted for during next `wait_current_buf_full` call. + self.left_to_consume += amt - unconsumed_buf_len; + } + } + + /// Return the next read operation for the reader. + /// + /// If all buffers are used or last file is already (being) read, returns `None`. + /// + /// Reads are issued for files added into the reader from first file at position 0 + /// to its limit / EOF and then for any subsequent files. + fn next_read_op(&mut self, bufs: &mut [ReadBufState]) -> Option { + if self.all_buffers_used(bufs) { + return None; + } + let num_bufs = bufs.len() as u16; + loop { + let read_file_index = self.next_read_file_index?; + match self.files[read_file_index].next_read_op(self.next_read_buf_index, bufs) { + Some(op) => { + self.next_read_buf_index = (self.next_read_buf_index + 1) % num_bufs; + return Some(op); + } + None => { + // Last read file reached its limit, try to move to the next file + if read_file_index < self.files.len() - 1 { + self.next_read_file_index = Some(read_file_index + 1); + } else { + return None; + } + } + } + } + } + + fn move_to_next_buf(&mut self, num_bufs: u16) { + self.current_buf_index = (self.current_buf_index + 1) % num_bufs; + self.current_buf_pos = 0; + // Buffer might still be reading, len will be intialized on first `wait_current_buf_full` + self.current_buf_len = 0; + } + + /// Returns `true` if there are no more buffers available for reading. + fn all_buffers_used(&self, bufs: &[ReadBufState]) -> bool { + bufs[self.next_read_buf_index as usize].is_used() + } +} + +/// Holds the state of a single file being read. +#[derive(Debug)] +struct FileState { + raw_fd: RawFd, + /// Limit file offset to read up to. + read_limit: FileSize, + /// Offset of the next byte to read from the file + next_read_offset: FileSize, + /// When the file is possible to read for the first time, it should be read from this buffer index + start_buf_index: Option, +} + +impl FileState { + fn new(raw_fd: RawFd, read_limit: FileSize) -> Self { + Self { + raw_fd, + read_limit, + next_read_offset: 0, + start_buf_index: None, + } + } + + fn is_same_file(&self, file: &File) -> bool { + self.raw_fd == file.as_raw_fd() + } + + /// Create a new read operation into the `bufs[index]` buffer and update file state. + /// + /// This is called whenever new reads can be scheduled (on added file or freed buffer). + /// + /// Returns `ReadOp` that will read + /// [self.next_read_offset, self.next_read_offset + min(buf len, self.read_limit)) + /// from the file into `bufs[index]`. Once the read is complete the buffer changes into + /// `Full` state and can be consumed. + fn next_read_op(&mut self, index: u16, bufs: &mut [ReadBufState]) -> Option { + let Self { + start_buf_index, + raw_fd, + next_read_offset: offset, + read_limit, + } = self; + let left_to_read = read_limit.saturating_sub(*offset); + if left_to_read == 0 { + return None; + } + + let buf = bufs[index as usize].transition_to_reading(); + + let read_len = left_to_read.min(buf.len() as FileSize); + let op = ReadOp { + fd: types::Fd(*raw_fd), + buf, + buf_offset: 0, + file_offset: *offset, + read_len: read_len as u32, // it's trimmed by u32 buf.len() above + is_last_read: left_to_read == read_len, + reader_buf_index: index, + }; + // Mark file state to start reading at `index` buffer + if start_buf_index.is_none() { + *start_buf_index = Some(index); + } + + // We always advance by `read_len`. If we get a short read, we submit a new + // read for the remaining data. See ReadOp::complete(). + *offset += read_len; + + Some(op) } } +#[derive(Debug)] enum ReadBufState { /// The buffer is pending submission to read queue (on initialization and /// in transition from `Full` to `Reading`). @@ -357,78 +611,100 @@ enum ReadBufState { /// the ring. Reading, /// The buffer is filled and ready to be consumed. - Full(Cursor), + Full { + buf: IoBufferChunk, + /// Position in `buf` at which 0-sized read (or requested read limit) was reached + eof_pos: Option, + }, } -impl std::fmt::Debug for ReadBufState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl ReadBufState { + fn is_used(&self) -> bool { + matches!(self, ReadBufState::Reading | ReadBufState::Full { .. }) + } + + fn is_reading(&self) -> bool { + matches!(self, ReadBufState::Reading) + } + + #[inline] + fn slice(&self, start_pos: u32, end_pos: u32) -> &[u8] { + match self { + Self::Full { buf, eof_pos } => { + debug_assert!(eof_pos.unwrap_or(buf.len()) >= end_pos); + let limit = (end_pos - start_pos) as usize; + unsafe { slice::from_raw_parts(buf.as_ptr().add(start_pos as usize), limit) } + } + Self::Uninit(_) | Self::Reading => { + unreachable!("must call as_slice only on full buffer") + } + } + } + + /// Marks the buffer as uninitialized (after it has been fully consumed). + fn transition_to_uninit(&mut self) { match self { - Self::Uninit(buf) => f - .debug_struct("Uninit") - .field("io_buf_index", &buf.io_buf_index()) - .finish(), - Self::Reading => write!(f, "Reading"), - Self::Full(cursor) => f - .debug_struct("Full") - .field("io_buf_index", &cursor.get_ref().io_buf_index()) - .finish(), + Self::Uninit(_) => (), + Self::Reading => unreachable!("cannot reset a buffer that has pending read"), + Self::Full { buf, .. } => { + *self = ReadBufState::Uninit(mem::replace(buf, IoBufferChunk::empty())); + } } } + + /// Marks the buffer as being read and returns underlying buffer to pass to `ReadOp`. + #[must_use] + fn transition_to_reading(&mut self) -> IoBufferChunk { + let Self::Uninit(buf) = mem::replace(self, Self::Reading) else { + unreachable!("buffer should be uninitialized") + }; + buf + } } +#[derive(Debug)] struct ReadOp { - fd: RawFd, + fd: types::Fd, buf: IoBufferChunk, /// This is the offset inside the buffer. It's typically 0, but can be non-zero if a previous /// read returned less data than requested (because of EINTR or whatever) and we submitted a new /// read for the remaining data. - buf_off: IoSize, + buf_offset: IoSize, /// The offset in the file. - file_off: FileSize, + file_offset: FileSize, /// The length of the read. This is typically `read_capacity` but can be less if a previous read - /// returned less data than requested. + /// returned less data than requested or `file_offset` is close to the end of read limit. read_len: IoSize, + /// Indicates that after reading `read_len` we have reached configured read limit. + is_last_read: bool, /// This is the index of the buffer in the reader's state. It's used to update the state once the /// read completes. - reader_buf_index: usize, -} - -impl std::fmt::Debug for ReadOp { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ReadOp") - .field("fd", &self.fd) - .field("buf_off", &self.buf_off) - .field("io_buf_index", &self.buf.io_buf_index()) - .field("file_off", &self.file_off) - .field("read_len", &self.read_len) - .field("reader_buf_index", &self.reader_buf_index) - .finish() - } + reader_buf_index: u16, } -impl RingOp for ReadOp { +impl RingOp for ReadOp { fn entry(&mut self) -> squeue::Entry { let ReadOp { fd, buf, - buf_off, - file_off, + buf_offset, + file_offset, read_len, + is_last_read: _, reader_buf_index: _, } = self; - debug_assert!(*buf_off + *read_len <= buf.len()); + debug_assert!(*buf_offset + *read_len <= buf.len()); // Safety: we assert that the buffer is large enough to hold the read. - let buf_ptr = unsafe { buf.as_mut_ptr().byte_add(*buf_off as usize) }; + let buf_ptr = unsafe { buf.as_mut_ptr().byte_add(*buf_offset as usize) }; - let fd = types::Fd(*fd); - let offset = *file_off; + let offset = *file_offset; let entry = match buf.io_buf_index() { - Some(io_buf_index) => opcode::ReadFixed::new(fd, buf_ptr, *read_len, io_buf_index) + Some(io_buf_index) => opcode::ReadFixed::new(*fd, buf_ptr, *read_len, io_buf_index) .offset(offset) .ioprio(IO_PRIO_BE_HIGHEST) .build(), - None => opcode::Read::new(fd, buf_ptr, *read_len) + None => opcode::Read::new(*fd, buf_ptr, *read_len) .offset(offset) .ioprio(IO_PRIO_BE_HIGHEST) .build(), @@ -438,25 +714,23 @@ impl RingOp for ReadOp { fn complete( &mut self, - completion: &mut Completion, + completion: &mut Completion, res: io::Result, ) -> io::Result<()> { let ReadOp { fd, buf, - buf_off, - file_off, + buf_offset, + file_offset, read_len, + is_last_read, reader_buf_index, } = self; - let reader_state = completion.context_mut(); + let buffers = completion.context_mut(); let last_read_len = res? as IoSize; - if last_read_len == 0 { - reader_state.eof_buf_index = Some(*reader_buf_index); - } - let total_read_len = *buf_off + last_read_len; + let total_read_len = *buf_offset + last_read_len; let buf = mem::replace(buf, IoBufferChunk::empty()); if last_read_len > 0 && last_read_len < *read_len { @@ -464,18 +738,21 @@ impl RingOp for ReadOp { let op: ReadOp = ReadOp { fd: *fd, buf, - buf_off: total_read_len, - file_off: *file_off + last_read_len as FileSize, + buf_offset: total_read_len, + file_offset: *file_offset + last_read_len as FileSize, read_len: *read_len - last_read_len, reader_buf_index: *reader_buf_index, + is_last_read: *is_last_read, }; // Safety: // The op points to a buffer which is guaranteed to be valid for the // lifetime of the operation completion.push(op); } else { - reader_state.buffers[*reader_buf_index] = - ReadBufState::Full(Cursor::new(buf.into_shrinked(total_read_len))); + buffers[*reader_buf_index as usize] = ReadBufState::Full { + buf, + eof_pos: (last_read_len == 0 || *is_last_read).then_some(total_read_len), + }; } Ok(()) @@ -484,7 +761,13 @@ impl RingOp for ReadOp { #[cfg(test)] mod tests { - use {super::*, tempfile::NamedTempFile}; + use {super::*, std::io::Seek, tempfile::NamedTempFile}; + + fn read_as_vec(mut reader: impl Read) -> Vec { + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).unwrap(); + buf + } fn check_reading_file(file_size: FileSize, backing_buffer_size: usize, read_capacity: IoSize) { let pattern: Vec = (0..251).collect(); @@ -503,13 +786,14 @@ mod tests { let buf = PageAlignedMemory::new(backing_buffer_size).unwrap(); let mut reader = SequentialFileReaderBuilder::new() .read_capacity(read_capacity) - .build_with_buffer(temp_file.path(), buf) + .build_with_buffer(buf) .unwrap(); + reader.set_path(temp_file.path()).unwrap(); // Read contents from the reader and verify length - let mut all_read_data = Vec::new(); - reader.read_to_end(&mut all_read_data).unwrap(); + let all_read_data = read_as_vec(&mut reader); assert_eq!(all_read_data.len() as FileSize, file_size); + assert_eq!(reader.get_file_offset(), file_size); // Verify the contents for (i, byte) in all_read_data.iter().enumerate() { @@ -554,12 +838,163 @@ mod tests { let mut reader = SequentialFileReaderBuilder::new() .read_capacity(4 * 1024) .use_registered_buffers(false) - .build(temp_file.path(), 16 * 1024) + .build(16 * 1024) .unwrap(); + reader.set_path(temp_file.path()).unwrap(); let mut all_read_data = Vec::new(); reader.read_to_end(&mut all_read_data).unwrap(); assert_eq!(all_read_data.len(), file_size); assert_eq!(all_read_data, data); } + + #[test] + fn test_add_file_ref() { + let mut temp_file = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp_file, &[0xa, 0xb, 0xc]).unwrap(); + temp_file.rewind().unwrap(); + + { + let mut reader = SequentialFileReaderBuilder::new() + .read_capacity(512) + .build(1024) + .unwrap(); + reader.add_file_to_prefetch(temp_file.as_file(), 3).unwrap(); + assert_eq!(read_as_vec(&mut reader), &[0xa, 0xb, 0xc]); + } + // Independently we can also read from the file directly + assert_eq!(read_as_vec(&mut temp_file), &[0xa, 0xb, 0xc]); + } + + #[test] + fn test_multiple_unlimited_files() { + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &[0xa, 0xb, 0xc]).unwrap(); + let mut temp2 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp2, &[0xd, 0xe, 0xf, 0x10]).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_capacity(512) + .build(1024) + .unwrap(); + + let f1 = File::open(temp1.path()).unwrap(); + let f2 = File::open(temp2.path()).unwrap(); + reader + .add_owned_file_to_prefetch(f1, FileSize::MAX) + .unwrap(); + reader + .add_owned_file_to_prefetch(f2, FileSize::MAX) + .unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + reader.move_to_next_file().unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf, 0x10]); + reader.move_to_next_file().unwrap(); + + let f1 = File::open(temp1.path()).unwrap(); + reader + .add_owned_file_to_prefetch(f1, FileSize::MAX) + .unwrap(); + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + } + + #[test] + fn test_get_offset() { + let pattern = (0..600).map(|i| i as u8).collect::>(); + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &pattern).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_capacity(512) + .build(1024) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 1990).unwrap(); + + assert_eq!(512, reader.fill_buf().unwrap().len()); + assert_eq!(0, reader.get_file_offset()); + reader.consume(0); + assert_eq!(0, reader.get_file_offset()); + + reader.consume(40); + assert_eq!(40, reader.get_file_offset()); + assert_eq!(472, reader.fill_buf().unwrap().len()); + + reader.consume(472); + assert_eq!(512, reader.get_file_offset()); + assert_eq!(88, reader.fill_buf().unwrap().len()); + reader.consume(0); + assert_eq!(512, reader.get_file_offset()); + + reader.consume(88); + assert_eq!(600, reader.get_file_offset()); + assert_eq!(0, reader.fill_buf().unwrap().len()); + + reader.move_to_next_file().unwrap(); + assert_eq!(0, reader.get_file_offset()); + } + + #[test] + fn test_consume_skip_filled_buf_len() { + let pattern = (0..6000).map(|i| i as u8).collect::>(); + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &pattern).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_capacity(512) + .build(2048) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 5990).unwrap(); + + assert_eq!(reader.fill_buf().unwrap(), &pattern[..512]); + assert_eq!(0, reader.get_file_offset()); + + reader.consume(600); + assert_eq!(600, reader.get_file_offset()); + assert_eq!(reader.fill_buf().unwrap(), &pattern[600..1024]); + + reader.consume(400); + assert_eq!(1000, reader.get_file_offset()); + assert_eq!(reader.fill_buf().unwrap(), &pattern[1000..1024]); + + reader.consume(25); + assert_eq!(reader.fill_buf().unwrap(), &pattern[1025..1536]); + + reader.consume(2000); + assert_eq!(reader.fill_buf().unwrap(), &pattern[3025..3072]); + } + + #[test] + fn test_set_file() { + let mut temp1 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp1, &[0xa, 0xb, 0xc]).unwrap(); + let mut temp2 = NamedTempFile::new().unwrap(); + io::Write::write_all(&mut temp2, &[0xd, 0xe, 0xf, 0x10]).unwrap(); + + let mut reader = SequentialFileReaderBuilder::new() + .read_capacity(512) + .build(1024) + .unwrap(); + reader.add_file_to_prefetch(temp1.as_file(), 3).unwrap(); + reader.add_file_to_prefetch(temp2.as_file(), 4).unwrap(); + + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + + reader.set_file(temp2.as_file(), 4).unwrap(); + assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf, 0x10]); + + reader.set_file(temp1.as_file(), 4).unwrap(); + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + + let f1 = File::open(temp1.path()).unwrap(); + reader + .add_owned_file_to_prefetch(f1, FileSize::MAX) + .unwrap(); + reader.move_to_next_file().unwrap(); + assert_eq!(read_as_vec(&mut reader), vec![0xa, 0xb, 0xc]); + + reader.set_file(temp2.as_file(), 4).unwrap(); + assert_eq!(read_as_vec(&mut reader), vec![0xd, 0xe, 0xf, 0x10]); + } }