Merged
141 changes: 119 additions & 22 deletions fs/src/io_uring/sequential_file_reader.rs
@@ -31,6 +31,9 @@ const DEFAULT_READ_SIZE: IoSize = 1024 * 1024;
// For large file we don't really use workers as few regularly submitted requests get handled
// within sqpoll thread. Allow some workers just in case, but limit them.
const DEFAULT_MAX_IOWQ_WORKERS: u32 = 2;
// This is a conservative read size alignment for use with direct IO. Some block devices may
// have relaxed requirements, but detecting that is not trivial.
const DIRECT_IO_READ_LEN_ALIGNMENT: IoSize = 4096;

/// Utility for building `SequentialFileReader` with specified tuning options.
pub struct SequentialFileReaderBuilder<'sp> {
@@ -40,6 +43,8 @@ pub struct SequentialFileReaderBuilder<'sp> {
shared_sqpoll_fd: Option<BorrowedFd<'sp>>,
/// Register buffer as fixed with the kernel
register_buffer: bool,
/// Toggle option for opening files with the O_DIRECT flag
use_direct_io: bool,
}

impl<'sp> SequentialFileReaderBuilder<'sp> {
@@ -50,6 +55,7 @@ impl<'sp> SequentialFileReaderBuilder<'sp> {
ring_squeue_size: None,
shared_sqpoll_fd: None,
register_buffer: false,
use_direct_io: false,
}
}

@@ -70,6 +76,16 @@ impl<'sp> SequentialFileReaderBuilder<'sp> {
self
}

/// Read files in direct-IO mode (disables kernel caching of read contents).
///
/// Enabling requires the filesystem to support directio and `read_capacity`
/// to be a multiple of 4096.
#[cfg(test)]
pub fn use_direct_io(mut self, use_direct_io: bool) -> Self {
self.use_direct_io = use_direct_io;
self
}
Comment on lines +83 to +87
Copilot AI Feb 10, 2026

`use_direct_io` is gated behind `#[cfg(test)]`, so production code (e.g., `large_file_buf_reader`) cannot enable `O_DIRECT` at all, and `open_file_flags` will always be `O_NOATIME` in non-test builds. If this PR is intended to speed up snapshot reads in production, this method (or an equivalent config path) needs to be available outside tests and wired through the call sites that construct `SequentialFileReaderBuilder`.

Copilot uses AI. Check for mistakes.
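A minimal sketch of what an ungated toggle could look like, assuming a stripped-down stand-in for the builder. `ReaderBuilderSketch` and the free function `is_direct_io` are hypothetical names, and the two flag constants are hard-coded Linux x86_64 values standing in for `libc::O_NOATIME` and `libc::O_DIRECT` so the example has no dependencies.

```rust
// Stand-ins for `libc::O_NOATIME` and `libc::O_DIRECT` (assumption: Linux x86_64 values,
// hard-coded only to keep this sketch dependency-free; real code should use `libc`).
const O_NOATIME: i32 = 0o1000000;
const O_DIRECT: i32 = 0o40000;

/// Hypothetical, stripped-down builder; only the direct-IO toggle is modelled.
#[derive(Default)]
pub struct ReaderBuilderSketch {
    use_direct_io: bool,
}

impl ReaderBuilderSketch {
    /// Not gated behind `#[cfg(test)]`, so non-test callers can opt in.
    pub fn use_direct_io(mut self, use_direct_io: bool) -> Self {
        self.use_direct_io = use_direct_io;
        self
    }

    /// Mirrors the flag computation in the diff: always O_NOATIME, optionally O_DIRECT.
    pub fn open_file_flags(&self) -> i32 {
        O_NOATIME | if self.use_direct_io { O_DIRECT } else { 0 }
    }
}

/// Same bit test the diff uses to derive `is_direct_io` from the stored flags.
pub fn is_direct_io(open_file_flags: i32) -> bool {
    open_file_flags & O_DIRECT == O_DIRECT
}
```

With this shape, `ReaderBuilderSketch::default().use_direct_io(true).open_file_flags()` carries both flags, while the default builder yields only `O_NOATIME`.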

/// Use (or remove) a shared kernel thread to drain submission queue for IO operations
pub fn shared_sqpoll(mut self, shared_sqpoll_fd: Option<BorrowedFd<'sp>>) -> Self {
self.shared_sqpoll_fd = shared_sqpoll_fd;
@@ -127,9 +143,29 @@ impl<'sp> SequentialFileReaderBuilder<'sp> {
unsafe { IoBufferChunk::register(buf_slice_mut, &ring)? };
}

if self.use_direct_io {
// O_DIRECT reads have size and alignment restrictions and must be into a sub-buffer of
// some multiple of the fs block size (see https://man7.org/linux/man-pages/man2/open.2.html#NOTES).
assert!(
self.read_capacity
.is_multiple_of(DIRECT_IO_READ_LEN_ALIGNMENT),
"read size is not aligned for direct IO({} is not a multiple of \
{DIRECT_IO_READ_LEN_ALIGNMENT})",
self.read_capacity
);
Comment on lines +149 to +155
Copilot AI Feb 10, 2026

Using `assert!` for direct-IO alignment will panic at runtime if misconfigured. Since this is user-controlled configuration (and may depend on the underlying filesystem/device), prefer returning an `io::Error` (e.g., `InvalidInput`) from `build`/`build_with_buffer` so callers can handle it gracefully instead of crashing.

Suggested change
- assert!(
- self.read_capacity
- .is_multiple_of(DIRECT_IO_READ_LEN_ALIGNMENT),
- "read size is not aligned for direct IO({} is not a multiple of \
- {DIRECT_IO_READ_LEN_ALIGNMENT})",
- self.read_capacity
- );
+ if !self
+ .read_capacity
+ .is_multiple_of(DIRECT_IO_READ_LEN_ALIGNMENT)
+ {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!(
+ "read size is not aligned for direct IO({} is not a multiple of \
+ {DIRECT_IO_READ_LEN_ALIGNMENT})",
+ self.read_capacity
+ ),
+ ));
+ }

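The error-returning variant can also be factored into a standalone helper, which makes it easy to unit test. The sketch below is only an illustration: `check_direct_io_read_capacity` is a hypothetical name, `u32` stands in for `IoSize`, and the `%` operator is used in place of `is_multiple_of` to avoid depending on a recent toolchain.

```rust
use std::io;

/// Same value as `DIRECT_IO_READ_LEN_ALIGNMENT` in the diff (`u32` stands in for `IoSize`).
const DIRECT_IO_READ_LEN_ALIGNMENT: u32 = 4096;

/// Hypothetical helper: rejects read capacities that are not aligned for direct IO
/// by returning `InvalidInput` instead of panicking.
pub fn check_direct_io_read_capacity(read_capacity: u32) -> io::Result<()> {
    if read_capacity % DIRECT_IO_READ_LEN_ALIGNMENT != 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(
                "read size is not aligned for direct IO ({read_capacity} is not a multiple \
                 of {DIRECT_IO_READ_LEN_ALIGNMENT})"
            ),
        ));
    }
    Ok(())
}
```

A builder could call this with `?` before constructing the reader, so a misconfigured capacity such as 1024 surfaces as an `io::Error` to the caller.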
}

let open_file_flags = libc::O_NOATIME
| if self.use_direct_io {
libc::O_DIRECT
} else {
0
};

Ok(SequentialFileReader {
ring,
state: SequentialFileReaderState::default(),
open_file_flags,
_backing_buffer: buffer,
_phantom: PhantomData,
})
@@ -167,6 +203,7 @@ pub struct SequentialFileReader<'a> {
// Note: ring's state is tied to `_backing_buffer` - contains unsafe pointer references
// to the buffer. Ring should be drained and dropped before `_backing_buffer`.
ring: Ring<BuffersState, ReadOp>,
open_file_flags: i32,
state: SequentialFileReaderState,
/// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner`
/// (should get dropped last)
@@ -181,15 +218,25 @@ impl<'a> SequentialFileReader<'a> {
pub fn set_path(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
let file = OpenOptions::new()
.read(true)
- .custom_flags(libc::O_NOATIME)
+ .custom_flags(self.open_file_flags)
.open(path)?;
let file_size = file.metadata()?.len();
self.add_owned_file_to_prefetch(file, file_size)
}

/// Add `file` to read. Starts reading the file as soon as a buffer is available.
///
/// This function uses the direct io settings set in `SequentialFileReaderBuilder`.
///
/// A direct io mode reader is safe to use with non direct io files. However, passing
/// direct io mode files to the reader in non direct io mode might result in an io error
/// due to unaligned read.
///
/// It is up to the end user to ensure that they are passing files that conform to
/// the direct io settings of this `SequentialFileReader`.
///
/// The read finishes when EOF is reached or `read_limit` bytes are read.
/// The `read_limit` must be less than or equal to the file size when in direct io mode.
/// Multiple files can be added to the reader and they will be read-ahead in FIFO order.
///
/// Reader takes ownership of the file and will drop it after it's done reading
@@ -210,7 +257,12 @@ impl<'a> SequentialFileReader<'a> {

/// Caller must ensure that the file is not closed while the reader is using it.
fn add_file_by_fd(&mut self, fd: RawFd, read_limit: FileSize) -> io::Result<()> {
- self.state.files.push_back(FileState::new(fd, read_limit));
+ // Use `open_file_flags` to set the `is_direct_io` parameter
+ self.state.files.push_back(FileState::new(
+ fd,
+ self.open_file_flags & libc::O_DIRECT == libc::O_DIRECT,
+ read_limit,
+ ));

if self.state.all_buffers_used(self.ring.context()) {
// Just added file to backlog, no reads can be started yet.
@@ -388,6 +440,9 @@ impl<'a> BufRead for SequentialFileReader<'a> {
}

impl<'a> FileBufRead<'a> for SequentialFileReader<'a> {
/// The `SequentialFileReader` must be in direct io mode if passing in direct io files.
/// `read_limit` must be less than the file size if using direct io.
Copilot AI Feb 10, 2026

Doc inconsistency: here it says `read_limit` must be "less than the file size" for direct IO, but the earlier docs for `add_owned_file_to_prefetch` say "less than or equal". Please make these consistent (and ideally enforce the constraint in code if it’s required).

Suggested change
- /// `read_limit` must be less than the file size if using direct io.
+ /// `read_limit` must be less than or equal to the file size if using direct io.

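If the "less than or equal" reading is the intended one, enforcing it in code could look roughly like the following. This is a sketch under that assumption: `check_read_limit` is a hypothetical helper that is not part of the PR, and `u64` stands in for `FileSize`.

```rust
use std::io;

/// Hypothetical check: in direct-IO mode, `read_limit` may not exceed the file size.
/// Buffered (non-direct) reads are left unconstrained, since they simply stop at EOF.
pub fn check_read_limit(is_direct_io: bool, read_limit: u64, file_size: u64) -> io::Result<()> {
    if is_direct_io && read_limit > file_size {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(
                "read_limit {read_limit} exceeds file size {file_size} in direct IO mode"
            ),
        ));
    }
    Ok(())
}
```

A caller that already knows the file size (as `set_path` does via `file.metadata()?.len()`) could run such a check before queueing the file.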
/// See `add_owned_file_to_prefetch` for more details.
fn set_file(&mut self, file: &'a File, read_limit: FileSize) -> io::Result<()> {
while self
.state
@@ -535,6 +590,8 @@ impl SequentialFileReaderState {
#[derive(Debug)]
struct FileState {
raw_fd: RawFd,
/// Is the file opened with direct io
is_direct_io: bool,
/// Limit file offset to read up to.
read_limit: FileSize,
/// Offset of the next byte to read from the file
@@ -544,9 +601,10 @@ struct FileState {
}

impl FileState {
- fn new(raw_fd: RawFd, read_limit: FileSize) -> Self {
+ fn new(raw_fd: RawFd, is_direct_io: bool, read_limit: FileSize) -> Self {
Self {
raw_fd,
is_direct_io,
read_limit,
next_read_offset: 0,
start_buf_index: None,
@@ -569,6 +627,7 @@ impl FileState {
let Self {
start_buf_index,
raw_fd,
is_direct_io,
next_read_offset: offset,
read_limit,
} = self;
@@ -583,6 +642,7 @@ impl FileState {
let op = ReadOp {
fd: types::Fd(*raw_fd),
buf,
is_direct_io: *is_direct_io,
buf_offset: 0,
file_offset: *offset,
read_len: read_len as u32, // it's trimmed by u32 buf.len() above
@@ -669,6 +729,7 @@ impl ReadBufState {
struct ReadOp {
fd: types::Fd,
buf: IoBufferChunk,
is_direct_io: bool,
/// This is the offset inside the buffer. It's typically 0, but can be non-zero if a previous
/// read returned less data than requested (because of EINTR or whatever) and we submitted a new
/// read for the remaining data.
@@ -690,22 +751,36 @@ impl RingOp<BuffersState> for ReadOp {
let ReadOp {
fd,
buf,
is_direct_io,
buf_offset,
file_offset,
read_len,
is_last_read: _,
reader_buf_index: _,
} = self;
debug_assert!(*buf_offset + *read_len <= buf.len());

// Align the read length if necessary
let internal_read_len = if *is_direct_io && *read_len != buf.len() {
// Try to align the read len if possible and fall back to reading
// the full remaining bytes if we can't align the read len.
read_len
.next_multiple_of(DIRECT_IO_READ_LEN_ALIGNMENT)
.min(buf.len() - *buf_offset)
} else {
*read_len
};
Comment on lines +762 to +771
Copilot AI Feb 10, 2026

`internal_read_len` can be larger than the logical `read_len` to satisfy `O_DIRECT` alignment, but `complete()` and EOF handling still use `read_len`/`is_last_read`. This can expose more bytes than `read_limit` (and/or mark `eof_pos` beyond the intended limit) when `internal_read_len > read_len`, breaking the documented "read finishes after `read_limit` bytes" behavior. Consider tracking both the submitted length and the requested length, and clamping the buffer’s readable length/EOF position to the requested `read_len` (discarding any over-read bytes) so `fill_buf()` never returns more than requested.

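The distinction between the submitted length and the requested length can be illustrated with two small pure functions. This is a sketch, not the PR's implementation: `submitted_read_len` reproduces the `internal_read_len` computation from the diff, while `exposed_len` is a hypothetical clamp of the kind the comment proposes; `u32` stands in for `IoSize`.

```rust
/// Same value as `DIRECT_IO_READ_LEN_ALIGNMENT` in the diff (`u32` stands in for `IoSize`).
const DIRECT_IO_READ_LEN_ALIGNMENT: u32 = 4096;

/// Length handed to the kernel; mirrors the `internal_read_len` computation in the diff.
/// For direct IO it is rounded up to the alignment, but never past the end of the buffer.
pub fn submitted_read_len(is_direct_io: bool, read_len: u32, buf_offset: u32, buf_len: u32) -> u32 {
    if is_direct_io && read_len != buf_len {
        read_len
            .next_multiple_of(DIRECT_IO_READ_LEN_ALIGNMENT)
            .min(buf_len - buf_offset)
    } else {
        read_len
    }
}

/// Hypothetical clamp: however many bytes the kernel returned, only the logically
/// requested `read_len` bytes are made visible to the consumer.
pub fn exposed_len(bytes_returned: u32, requested_read_len: u32) -> u32 {
    bytes_returned.min(requested_read_len)
}
```

For example, a 2500-byte request into a 4096-byte buffer is submitted as 4096 bytes in direct-IO mode; if the kernel returns all 4096, clamping keeps only the first 2500 visible.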
debug_assert!(*buf_offset + internal_read_len <= buf.len());
// Safety: we assert that the buffer is large enough to hold the read.
let buf_ptr = unsafe { buf.as_mut_ptr().byte_add(*buf_offset as usize) };

let entry = match buf.io_buf_index() {
- Some(io_buf_index) => opcode::ReadFixed::new(*fd, buf_ptr, *read_len, io_buf_index)
- .offset(*file_offset)
- .ioprio(IO_PRIO_BE_HIGHEST)
- .build(),
- None => opcode::Read::new(*fd, buf_ptr, *read_len)
+ Some(io_buf_index) => {
+ opcode::ReadFixed::new(*fd, buf_ptr, internal_read_len, io_buf_index)
+ .offset(*file_offset)
+ .ioprio(IO_PRIO_BE_HIGHEST)
+ .build()
+ }
+ None => opcode::Read::new(*fd, buf_ptr, internal_read_len)
.offset(*file_offset)
.ioprio(IO_PRIO_BE_HIGHEST)
.build(),
@@ -721,6 +796,7 @@ impl RingOp<BuffersState> for ReadOp {
let ReadOp {
fd,
buf,
is_direct_io,
buf_offset,
file_offset,
read_len,
@@ -739,6 +815,7 @@ impl RingOp<BuffersState> for ReadOp {
let op: ReadOp = ReadOp {
fd: *fd,
buf,
is_direct_io: *is_direct_io,
buf_offset: total_read_len,
file_offset: *file_offset + last_read_len as FileSize,
read_len: *read_len - last_read_len,
@@ -770,7 +847,12 @@ mod tests {
buf
}

- fn check_reading_file(file_size: FileSize, backing_buffer_size: usize, read_capacity: IoSize) {
+ fn check_reading_file(
+ file_size: FileSize,
+ backing_buffer_size: usize,
+ read_capacity: IoSize,
+ use_direct_io: bool,
+ ) {
let pattern: Vec<u8> = (0..251).collect();

// Create a temp file and write the pattern to it repeatedly
@@ -786,6 +868,7 @@ mod tests {

let buf = PageAlignedMemory::new(backing_buffer_size).unwrap();
let mut reader = SequentialFileReaderBuilder::new()
.use_direct_io(use_direct_io)
.read_capacity(read_capacity)
.build_with_buffer(buf)
.unwrap();
@@ -805,28 +888,28 @@ mod tests {
/// Test with buffer larger than the whole file
#[test]
fn test_reading_small_file() {
- check_reading_file(2500, 4096, 1024);
- check_reading_file(2500, 4096, 2048);
- check_reading_file(2500, 4096, 4096);
+ check_reading_file(2500, 4096, 1024, false);
+ check_reading_file(2500, 4096, 2048, false);
+ check_reading_file(2500, 4096, 4096, false);
}

/// Test with buffer smaller than the whole file
#[test]
fn test_reading_file_in_chunks() {
- check_reading_file(25_000, 16384, 1024);
- check_reading_file(25_000, 4096, 1024);
- check_reading_file(25_000, 4096, 2048);
- check_reading_file(25_000, 4096, 4096);
+ check_reading_file(25_000, 16384, 1024, false);
+ check_reading_file(25_000, 4096, 1024, false);
+ check_reading_file(25_000, 4096, 2048, false);
+ check_reading_file(25_000, 4096, 4096, false);
}

/// Test with buffer much smaller than the whole file
#[test]
fn test_reading_large_file() {
- check_reading_file(250_000, 32768, 1024);
- check_reading_file(250_000, 16384, 1024);
- check_reading_file(250_000, 4096, 1024);
- check_reading_file(250_000, 4096, 2048);
- check_reading_file(250_000, 4096, 4096);
+ check_reading_file(250_000, 32768, 1024, false);
+ check_reading_file(250_000, 16384, 1024, false);
+ check_reading_file(250_000, 4096, 1024, false);
+ check_reading_file(250_000, 4096, 2048, false);
+ check_reading_file(250_000, 4096, 4096, false);
}

#[test]
@@ -867,6 +950,20 @@ mod tests {
assert_eq!(read_as_vec(&mut temp_file), &[0xa, 0xb, 0xc]);
}

#[test]
fn test_direct_io_read() {
check_reading_file(2_500, 4096, 4096, true);
check_reading_file(2_500, 16384, 4096, true);
check_reading_file(25_000, 4096, 4096, true);
check_reading_file(25_000, 16384, 4096, true);
check_reading_file(250_000, 4096, 4096, true);
check_reading_file(250_000, 16384, 4096, true);
check_reading_file(4096, 4096, 4096, true);
check_reading_file(4096, 16384, 4096, true);
check_reading_file(16384, 4096, 4096, true);
check_reading_file(16384, 16384, 4096, true);
}
Comment on lines +953 to +965
Copilot AI Feb 10, 2026

`test_direct_io_read` assumes the temp directory’s filesystem supports `O_DIRECT`. On common Linux setups where `tempfile` uses `/tmp` mounted as tmpfs, an `O_DIRECT` open can fail with `EINVAL`, making this unit test flaky across CI environments. Consider skipping the test when opening with `O_DIRECT` fails, or creating the temp file in a directory/filesystem known to support direct IO.

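One way to implement the "skip when unsupported" option is a small probe that tries an `O_DIRECT` open in the target directory. The sketch below is an assumption-laden illustration for Linux only: `dir_supports_direct_io` is a hypothetical helper, and the `O_DIRECT` constant is hard-coded per architecture as a stand-in for `libc::O_DIRECT`.

```rust
use std::fs::{self, OpenOptions};
use std::os::unix::fs::OpenOptionsExt;
use std::path::Path;

// Stand-in for `libc::O_DIRECT` (assumption: Linux). The value is architecture-specific;
// only aarch64 and x86/x86_64 are covered here, real code should use `libc::O_DIRECT`.
#[cfg(target_arch = "aarch64")]
const O_DIRECT: i32 = 0o200000;
#[cfg(not(target_arch = "aarch64"))]
const O_DIRECT: i32 = 0o40000;

/// Hypothetical probe: returns `true` if a file created in `dir` can be opened with
/// `O_DIRECT`. Any failure (missing directory, unsupported filesystem) yields `false`.
pub fn dir_supports_direct_io(dir: &Path) -> bool {
    let probe = dir.join(format!(".direct_io_probe_{}", std::process::id()));
    // Create the probe file with ordinary flags first.
    if fs::write(&probe, b"probe").is_err() {
        return false;
    }
    // Only the open is attempted; no read is issued, so buffer alignment is not involved.
    let supported = OpenOptions::new()
        .read(true)
        .custom_flags(O_DIRECT)
        .open(&probe)
        .is_ok();
    // Best-effort cleanup of the probe file.
    let _ = fs::remove_file(&probe);
    supported
}
```

A direct-IO test could call this on the directory that will hold the temp file and return early when it reports `false`, rather than failing on filesystems that reject `O_DIRECT`.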

#[test]
fn test_multiple_unlimited_files() {
let mut temp1 = NamedTempFile::new().unwrap();