From 63207cf7a6bc5847cee961191b7fa5145e56411b Mon Sep 17 00:00:00 2001 From: ra <40503841+RadiantAeon@users.noreply.github.com> Date: Mon, 2 Feb 2026 13:09:31 +0000 Subject: [PATCH] use direct io in sequential file reader --- fs/src/io_uring/sequential_file_reader.rs | 141 ++++++++++++++++++---- 1 file changed, 119 insertions(+), 22 deletions(-) diff --git a/fs/src/io_uring/sequential_file_reader.rs b/fs/src/io_uring/sequential_file_reader.rs index e9f3f911609..be019e89608 100644 --- a/fs/src/io_uring/sequential_file_reader.rs +++ b/fs/src/io_uring/sequential_file_reader.rs @@ -31,6 +31,9 @@ const DEFAULT_READ_SIZE: IoSize = 1024 * 1024; // For large file we don't really use workers as few regularly submitted requests get handled // within sqpoll thread. Allow some workers just in case, but limit them. const DEFAULT_MAX_IOWQ_WORKERS: u32 = 2; +// This is conservative read size alignment for use with direct IO, some block devices may have +// relaxed requirements, but detecting it is not trivial. +const DIRECT_IO_READ_LEN_ALIGNMENT: IoSize = 4096; /// Utility for building `SequentialFileReader` with specified tuning options. pub struct SequentialFileReaderBuilder<'sp> { @@ -40,6 +43,8 @@ pub struct SequentialFileReaderBuilder<'sp> { shared_sqpoll_fd: Option>, /// Register buffer as fixed with the kernel register_buffer: bool, + /// Toggle option for opening files with the O_DIRECT flag + use_direct_io: bool, } impl<'sp> SequentialFileReaderBuilder<'sp> { @@ -50,6 +55,7 @@ impl<'sp> SequentialFileReaderBuilder<'sp> { ring_squeue_size: None, shared_sqpoll_fd: None, register_buffer: false, + use_direct_io: false, } } @@ -70,6 +76,16 @@ impl<'sp> SequentialFileReaderBuilder<'sp> { self } + /// Read files in direct-IO mode (disables kernel caching of read contents). + /// + /// Enabling requires the filesystem to support directio and `read_capacity` + /// to be a multiple of 4096. + #[cfg(test)] + pub fn use_direct_io(mut self, use_direct_io: bool) -> Self { + self.use_direct_io = use_direct_io; + self + } + /// Use (or remove) a shared kernel thread to drain submission queue for IO operations pub fn shared_sqpoll(mut self, shared_sqpoll_fd: Option>) -> Self { self.shared_sqpoll_fd = shared_sqpoll_fd; @@ -127,9 +143,29 @@ impl<'sp> SequentialFileReaderBuilder<'sp> { unsafe { IoBufferChunk::register(buf_slice_mut, &ring)? }; } + if self.use_direct_io { + // O_DIRECT reads have size and alignment restrictions and must be into a sub-buffer of + // some multiple of the fs block size (see https://man7.org/linux/man-pages/man2/open.2.html#NOTES). + assert!( + self.read_capacity + .is_multiple_of(DIRECT_IO_READ_LEN_ALIGNMENT), + "read size is not aligned for direct IO({} is not a multiple of \ + {DIRECT_IO_READ_LEN_ALIGNMENT})", + self.read_capacity + ); + } + + let open_file_flags = libc::O_NOATIME + | if self.use_direct_io { + libc::O_DIRECT + } else { + 0 + }; + Ok(SequentialFileReader { ring, state: SequentialFileReaderState::default(), + open_file_flags, _backing_buffer: buffer, _phantom: PhantomData, }) @@ -167,6 +203,7 @@ pub struct SequentialFileReader<'a> { // Note: ring's state is tied to `_backing_buffer` - contains unsafe pointer references // to the buffer. Ring should be drained and dropped before `_backing_buffer`. ring: Ring, + open_file_flags: i32, state: SequentialFileReaderState, /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner` /// (should get dropped last) @@ -181,7 +218,7 @@ impl<'a> SequentialFileReader<'a> { pub fn set_path(&mut self, path: impl AsRef) -> io::Result<()> { let file = OpenOptions::new() .read(true) - .custom_flags(libc::O_NOATIME) + .custom_flags(self.open_file_flags) .open(path)?; let file_size = file.metadata()?.len(); self.add_owned_file_to_prefetch(file, file_size) @@ -189,7 +226,17 @@ impl<'a> SequentialFileReader<'a> { /// Add `file` to read. Starts reading the file as soon as a buffer is available. /// + /// This function uses the direct io settings set in `SequentialFileReaderBuilder`. + /// + /// A direct io mode reader is safe to use with non direct io files. However, passing + /// direct io mode files to the reader in non direct io mode might result in an io error + /// due to unaligned read. + /// + /// It is up to the end user to ensure that they are passing files that conform to + /// the direct io settings of this `SequentialFileReader`. + /// /// The read finishes when EOF is reached or `read_limit` bytes are read. + /// The `read_limit` must be less than or equal to the file size when in direct io mode. /// Multiple files can be added to the reader and they will be read-ahead in FIFO order. /// /// Reader takes ownership of the file and will drop it after it's done reading @@ -210,7 +257,12 @@ impl<'a> SequentialFileReader<'a> { /// Caller must ensure that the file is not closed while the reader is using it. fn add_file_by_fd(&mut self, fd: RawFd, read_limit: FileSize) -> io::Result<()> { - self.state.files.push_back(FileState::new(fd, read_limit)); + // Use `open_file_flags` to set the `is_direct_io` parameter + self.state.files.push_back(FileState::new( + fd, + self.open_file_flags & libc::O_DIRECT == libc::O_DIRECT, + read_limit, + )); if self.state.all_buffers_used(self.ring.context()) { // Just added file to backlog, no reads can be started yet. @@ -388,6 +440,9 @@ impl<'a> BufRead for SequentialFileReader<'a> { } impl<'a> FileBufRead<'a> for SequentialFileReader<'a> { + /// The `SequentialFileReader` must be in direct io mode if passing in direct io files. + /// `read_limit` must be less than the file size if using direct io. + /// See `add_owned_file_to_prefetch` for more details. fn set_file(&mut self, file: &'a File, read_limit: FileSize) -> io::Result<()> { while self .state @@ -535,6 +590,8 @@ impl SequentialFileReaderState { #[derive(Debug)] struct FileState { raw_fd: RawFd, + /// Is the file opened with direct io + is_direct_io: bool, /// Limit file offset to read up to. read_limit: FileSize, /// Offset of the next byte to read from the file @@ -544,9 +601,10 @@ struct FileState { } impl FileState { - fn new(raw_fd: RawFd, read_limit: FileSize) -> Self { + fn new(raw_fd: RawFd, is_direct_io: bool, read_limit: FileSize) -> Self { Self { raw_fd, + is_direct_io, read_limit, next_read_offset: 0, start_buf_index: None, @@ -569,6 +627,7 @@ impl FileState { let Self { start_buf_index, raw_fd, + is_direct_io, next_read_offset: offset, read_limit, } = self; @@ -583,6 +642,7 @@ impl FileState { let op = ReadOp { fd: types::Fd(*raw_fd), buf, + is_direct_io: *is_direct_io, buf_offset: 0, file_offset: *offset, read_len: read_len as u32, // it's trimmed by u32 buf.len() above @@ -669,6 +729,7 @@ impl ReadBufState { struct ReadOp { fd: types::Fd, buf: IoBufferChunk, + is_direct_io: bool, /// This is the offset inside the buffer. It's typically 0, but can be non-zero if a previous /// read returned less data than requested (because of EINTR or whatever) and we submitted a new /// read for the remaining data. @@ -690,22 +751,36 @@ impl RingOp for ReadOp { let ReadOp { fd, buf, + is_direct_io, buf_offset, file_offset, read_len, is_last_read: _, reader_buf_index: _, } = self; - debug_assert!(*buf_offset + *read_len <= buf.len()); + + // Align the read length if necessary + let internal_read_len = if *is_direct_io && *read_len != buf.len() { + // Try to align the read len if possible and fall back to reading + // the full remaining bytes if we can't align the read len. + read_len + .next_multiple_of(DIRECT_IO_READ_LEN_ALIGNMENT) + .min(buf.len() - *buf_offset) + } else { + *read_len + }; + debug_assert!(*buf_offset + internal_read_len <= buf.len()); // Safety: we assert that the buffer is large enough to hold the read. let buf_ptr = unsafe { buf.as_mut_ptr().byte_add(*buf_offset as usize) }; let entry = match buf.io_buf_index() { - Some(io_buf_index) => opcode::ReadFixed::new(*fd, buf_ptr, *read_len, io_buf_index) - .offset(*file_offset) - .ioprio(IO_PRIO_BE_HIGHEST) - .build(), - None => opcode::Read::new(*fd, buf_ptr, *read_len) + Some(io_buf_index) => { + opcode::ReadFixed::new(*fd, buf_ptr, internal_read_len, io_buf_index) + .offset(*file_offset) + .ioprio(IO_PRIO_BE_HIGHEST) + .build() + } + None => opcode::Read::new(*fd, buf_ptr, internal_read_len) .offset(*file_offset) .ioprio(IO_PRIO_BE_HIGHEST) .build(), @@ -721,6 +796,7 @@ impl RingOp for ReadOp { let ReadOp { fd, buf, + is_direct_io, buf_offset, file_offset, read_len, @@ -739,6 +815,7 @@ impl RingOp for ReadOp { let op: ReadOp = ReadOp { fd: *fd, buf, + is_direct_io: *is_direct_io, buf_offset: total_read_len, file_offset: *file_offset + last_read_len as FileSize, read_len: *read_len - last_read_len, @@ -770,7 +847,12 @@ mod tests { buf } - fn check_reading_file(file_size: FileSize, backing_buffer_size: usize, read_capacity: IoSize) { + fn check_reading_file( + file_size: FileSize, + backing_buffer_size: usize, + read_capacity: IoSize, + use_direct_io: bool, + ) { let pattern: Vec = (0..251).collect(); // Create a temp file and write the pattern to it repeatedly @@ -786,6 +868,7 @@ mod tests { let buf = PageAlignedMemory::new(backing_buffer_size).unwrap(); let mut reader = SequentialFileReaderBuilder::new() + .use_direct_io(use_direct_io) .read_capacity(read_capacity) .build_with_buffer(buf) .unwrap(); @@ -805,28 +888,28 @@ mod tests { /// Test with buffer larger than the whole file #[test] fn test_reading_small_file() { - check_reading_file(2500, 4096, 1024); - check_reading_file(2500, 4096, 2048); - check_reading_file(2500, 4096, 4096); + check_reading_file(2500, 4096, 1024, false); + check_reading_file(2500, 4096, 2048, false); + check_reading_file(2500, 4096, 4096, false); } /// Test with buffer smaller than the whole file #[test] fn test_reading_file_in_chunks() { - check_reading_file(25_000, 16384, 1024); - check_reading_file(25_000, 4096, 1024); - check_reading_file(25_000, 4096, 2048); - check_reading_file(25_000, 4096, 4096); + check_reading_file(25_000, 16384, 1024, false); + check_reading_file(25_000, 4096, 1024, false); + check_reading_file(25_000, 4096, 2048, false); + check_reading_file(25_000, 4096, 4096, false); } /// Test with buffer much smaller than the whole file #[test] fn test_reading_large_file() { - check_reading_file(250_000, 32768, 1024); - check_reading_file(250_000, 16384, 1024); - check_reading_file(250_000, 4096, 1024); - check_reading_file(250_000, 4096, 2048); - check_reading_file(250_000, 4096, 4096); + check_reading_file(250_000, 32768, 1024, false); + check_reading_file(250_000, 16384, 1024, false); + check_reading_file(250_000, 4096, 1024, false); + check_reading_file(250_000, 4096, 2048, false); + check_reading_file(250_000, 4096, 4096, false); } #[test] @@ -867,6 +950,20 @@ mod tests { assert_eq!(read_as_vec(&mut temp_file), &[0xa, 0xb, 0xc]); } + #[test] + fn test_direct_io_read() { + check_reading_file(2_500, 4096, 4096, true); + check_reading_file(2_500, 16384, 4096, true); + check_reading_file(25_000, 4096, 4096, true); + check_reading_file(25_000, 16384, 4096, true); + check_reading_file(250_000, 4096, 4096, true); + check_reading_file(250_000, 16384, 4096, true); + check_reading_file(4096, 4096, 4096, true); + check_reading_file(4096, 16384, 4096, true); + check_reading_file(16384, 4096, 4096, true); + check_reading_file(16384, 16384, 4096, true); + } + #[test] fn test_multiple_unlimited_files() { let mut temp1 = NamedTempFile::new().unwrap();