Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions fs/src/buffered_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,10 +390,9 @@ pub fn large_file_buf_reader(path: &Path, buf_size: usize) -> io::Result<impl Bu
#[cfg(target_os = "linux")]
{
assert!(agave_io_uring::io_uring_supported());
use crate::io_uring::sequential_file_reader::{SequentialFileReader, DEFAULT_READ_SIZE};
use crate::io_uring::sequential_file_reader::SequentialFileReaderBuilder;

let buf_size = buf_size.max(DEFAULT_READ_SIZE);
SequentialFileReader::with_capacity(buf_size, path)
SequentialFileReaderBuilder::new().build(path, buf_size)
}
#[cfg(not(target_os = "linux"))]
{
Expand Down
221 changes: 131 additions & 90 deletions fs/src/io_uring/sequential_file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,126 +22,133 @@ use {
// Based on transfers seen with `dd bs=SIZE` for NVME drives: values >=64KiB are fine,
// but peak at 1MiB. Also compare with particular NVME parameters, e.g.
// 32 pages (Maximum Data Transfer Size) * page size (MPSMIN = Memory Page Size) = 128KiB.
pub const DEFAULT_READ_SIZE: usize = 1024 * 1024;
const DEFAULT_READ_SIZE: usize = 1024 * 1024;
// For large file we don't really use workers as few regularly submitted requests get handled
// within sqpoll thread. Allow some workers just in case, but limit them.
const MAX_IOWQ_WORKERS: u32 = 2;
const DEFAULT_MAX_IOWQ_WORKERS: u32 = 2;

/// Reader for non-seekable files.
///
/// Implements read-ahead using io_uring.
pub struct SequentialFileReader<B> {
// Note: state is tied to `backing_buffer` and contains unsafe pointer references to it
inner: Ring<SequentialFileReaderState, ReadOp>,
/// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner`
/// (should get dropped last)
_backing_buffer: B,
/// Utility for building `SequentialFileReader` with specified tuning options.
pub struct SequentialFileReaderBuilder {
    /// Size in bytes of a single read operation (defaults to `DEFAULT_READ_SIZE`);
    /// the backing buffer is divided into chunks of this size
    read_capacity: usize,
    /// Cap on bounded io-wq kernel worker threads (defaults to `DEFAULT_MAX_IOWQ_WORKERS`)
    max_iowq_workers: u32,
    /// Submission queue size override; when `None`, derived from the number of
    /// in-flight operations at build time
    ring_squeue_size: Option<u32>,
    /// Register buffer as fixed with the kernel
    register_buffer: bool,
}

impl SequentialFileReader<LargeBuffer> {
/// Create a new `SequentialFileReader` for the given `path` using internally allocated
/// buffer of specified `buf_size` and default read size.
pub fn with_capacity(buf_size: usize, path: impl AsRef<Path>) -> io::Result<Self> {
Self::with_buffer(path, LargeBuffer::new(buf_size), DEFAULT_READ_SIZE)
impl SequentialFileReaderBuilder {
    /// Create a builder with default tuning: `DEFAULT_READ_SIZE` reads,
    /// `DEFAULT_MAX_IOWQ_WORKERS` io-wq workers, a derived submission queue
    /// size, and buffers registered as fixed with the kernel.
    pub fn new() -> Self {
        Self {
            read_capacity: DEFAULT_READ_SIZE,
            max_iowq_workers: DEFAULT_MAX_IOWQ_WORKERS,
            ring_squeue_size: None,
            register_buffer: true,
        }
    }
}

/// Holds the state of the reader.
struct SequentialFileReaderState {
file: File,
read_capacity: usize,
offset: usize,
eof_buf_index: Option<usize>,
buffers: Vec<ReadBufState>,
current_buf: usize,
}
/// Override the default size of a single IO read operation
///
/// This influences the concurrency, since buffer is divided into chunks of this size.
#[cfg(test)]
pub fn read_capacity(mut self, read_capacity: usize) -> Self {
self.read_capacity = read_capacity;
self
}

impl<B: AsMut<[u8]>> SequentialFileReader<B> {
/// Create a new `SequentialFileReader` for the given file using provided backing `buffer`.
/// Build a new `SequentialFileReader` with internally allocated buffer.
///
/// Buffer will hold at least `buf_capacity` bytes (increased to `read_capacity` if it's lower).
///
/// Initially the reader is idle and starts reading after `set_file` is called. It will then execute
/// multiple `read_capacity` sized reads in parallel to fill the buffer.
pub fn build(
self,
path: impl AsRef<Path>,
buf_capacity: usize,
) -> io::Result<SequentialFileReader<LargeBuffer>> {
let buf_capacity = buf_capacity.max(self.read_capacity);
let buffer = LargeBuffer::new(buf_capacity);
self.build_with_buffer(path, buffer)
}

/// Build a new `SequentialFileReader` with a user-supplied buffer
///
/// `buffer` is the internal buffer used for reading. It must be at least `read_capacity` long.
///
/// Initially the reader is idle and starts reading after the first file is added.
/// The reader will execute multiple `read_capacity` sized reads in parallel to fill the buffer.
pub fn with_buffer(
pub fn build_with_buffer<B: AsMut<[u8]>>(
self,
path: impl AsRef<Path>,
mut buffer: B,
read_capacity: usize,
) -> io::Result<Self> {
let buf_capacity = buffer.as_mut().len();
) -> io::Result<SequentialFileReader<B>> {
// Align buffer capacity to read capacity, so we always read equally sized chunks
let buf_capacity = buffer.as_mut().len() / self.read_capacity * self.read_capacity;
assert_ne!(buf_capacity, 0, "read size aligned buffer is too small");
let buf_slice_mut = &mut buffer.as_mut()[..buf_capacity];

let state = SequentialFileReaderState::new(path, buf_slice_mut, self.read_capacity)?;

let io_uring = self.create_io_uring(buf_capacity)?;
let ring = Ring::new(io_uring, state);

if self.register_buffer {
// Safety: kernel holds unsafe pointers to `buffer`, struct field declaration order
// guarantees that the ring is destroyed before `_backing_buffer` is dropped.
unsafe { FixedIoBuffer::register(buf_slice_mut, &ring)? };
}

let mut reader = SequentialFileReader {
inner: ring,
_backing_buffer: buffer,
};
reader.start_reading()?;
Ok(reader)
}

fn create_io_uring(&self, buf_capacity: usize) -> io::Result<IoUring> {
// Let all buffers be submitted for reading at any time
let max_inflight_ops = (buf_capacity / read_capacity) as u32;
let max_inflight_ops = (buf_capacity / self.read_capacity) as u32;

// Completions arrive in bursts (batching done by the disk controller and the kernel).
// By submitting smaller chunks we decrease the likelihood that we stall on a full completion queue.
// Also, in order to keep some operations submitted at all times, we will `submit` them half-way
// through the buffer (at the cost of doubling syscalls) to let kernel work on one half while the other
// half is read by the user.
let ring_squeue_size = (max_inflight_ops / 2).max(1);

let ring_squeue_size = self
.ring_squeue_size
.unwrap_or((max_inflight_ops / 2).max(1));
// agave io_uring uses cqsize to define state slab size, so cqsize == max inflight ops
let ring = io_uring::IoUring::builder()
let ring = IoUring::builder()
.setup_cqsize(max_inflight_ops)
.build(ring_squeue_size)?;

// Maximum number of spawned [bounded IO, unbounded IO] kernel threads, we don't expect
// any unbounded work, but limit it to 1 just in case (0 leaves it unlimited).
ring.submitter()
.register_iowq_max_workers(&mut [MAX_IOWQ_WORKERS, 1])?;
Self::with_buffer_and_ring(buffer, ring, path, read_capacity)
.register_iowq_max_workers(&mut [self.max_iowq_workers, 1])?;
Ok(ring)
}
}

/// Create a new `SequentialFileReader` for the given file, using a custom
/// ring instance.
fn with_buffer_and_ring(
mut backing_buffer: B,
ring: IoUring,
path: impl AsRef<Path>,
read_capacity: usize,
) -> io::Result<Self> {
let buffer = backing_buffer.as_mut();
assert!(buffer.len() >= read_capacity, "buffer too small");
let read_aligned_buf_len = buffer.len() / read_capacity * read_capacity;
let buffer = &mut buffer[..read_aligned_buf_len];

let file = OpenOptions::new()
.read(true)
.custom_flags(libc::O_NOATIME)
.open(path)?;
// Safety: buffers contain unsafe pointers to `buffer`, but we make sure they are
// dropped before `backing_buffer` is dropped.
let buffers = unsafe { FixedIoBuffer::split_buffer_chunks(buffer, read_capacity) }
.map(ReadBufState::Uninit)
.collect();
let ring = Ring::new(
ring,
SequentialFileReaderState {
file,
read_capacity,
buffers,
offset: 0,
eof_buf_index: None,
current_buf: 0,
},
);

// Safety: kernel holds unsafe pointers to `buffer`, struct field declaration order
// guarantees that the ring is destroyed before `_backing_buffer` is dropped.
unsafe { FixedIoBuffer::register(buffer, &ring)? };

let mut reader = Self {
inner: ring,
_backing_buffer: backing_buffer,
};
/// Reader for non-seekable files.
///
/// Implements read-ahead using io_uring.
///
/// Field declaration order matters: `inner` must be dropped before
/// `_backing_buffer`, since the ring holds pointers into the buffer.
pub struct SequentialFileReader<B> {
    // Note: state is tied to `backing_buffer` and contains unsafe pointer references to it
    inner: Ring<SequentialFileReaderState, ReadOp>,
    /// Owned buffer used (chunked into `FixedIoBuffer` items) across lifespan of `inner`
    /// (should get dropped last)
    _backing_buffer: B,
}

// Start reading all buffers.
for i in 0..reader.inner.context().buffers.len() {
reader.start_reading_buf(i)?;
impl<B: AsMut<[u8]>> SequentialFileReader<B> {
fn start_reading(&mut self) -> io::Result<()> {
for i in 0..self.inner.context().buffers.len() {
self.start_reading_buf(i)?;
}
// Make sure work is started in case submission queue is large and we
// never submitted work when adding buffers.
reader.inner.submit()?;

Ok(reader)
self.inner.submit()?;
Ok(())
}

/// Start reading into the buffer at `index`.
Expand Down Expand Up @@ -281,6 +288,38 @@ impl<B: AsMut<[u8]>> BufRead for SequentialFileReader<B> {
}
}

/// Holds the state of the reader.
struct SequentialFileReaderState {
    /// File currently being read
    file: File,
    /// Size in bytes of a single read operation (one buffer chunk)
    read_capacity: usize,
    /// File offset for read submissions — presumably the offset of the next
    /// read to submit; submission code not visible here, confirm in `ReadOp`
    offset: usize,
    /// Buffer index at which end-of-file was observed, if any
    /// (NOTE(review): inferred from name — verify against the read-completion path)
    eof_buf_index: Option<usize>,
    /// Per-chunk read state; each chunk is an unsafe view into the reader's
    /// backing buffer (see `SequentialFileReaderState::new`)
    buffers: Vec<ReadBufState>,
    /// Index of the chunk currently being consumed by the caller
    /// (NOTE(review): inferred from name — verify against `BufRead` impl)
    current_buf: usize,
}

impl SequentialFileReaderState {
    /// Open `path` for reading and carve `buffer` into `read_capacity`-sized
    /// chunks, one `ReadBufState::Uninit` entry per chunk.
    ///
    /// All cursors (`offset`, `current_buf`) start at zero and no EOF has been
    /// observed yet.
    ///
    /// # Errors
    /// Returns the `io::Error` from opening the file.
    fn new(path: impl AsRef<Path>, buffer: &mut [u8], read_capacity: usize) -> io::Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            // O_NOATIME: skip access-time updates for this bulk sequential read
            .custom_flags(libc::O_NOATIME)
            .open(path)?;
        // Safety: buffers contain unsafe pointers to `buffer`, but we make sure they are
        // dropped before `backing_buffer` in `SequentialFileReader` is dropped.
        let buffers = unsafe { FixedIoBuffer::split_buffer_chunks(buffer, read_capacity) }
            .map(ReadBufState::Uninit)
            .collect();
        Ok(Self {
            file,
            read_capacity,
            buffers,
            offset: 0,
            eof_buf_index: None,
            current_buf: 0,
        })
    }
}

enum ReadBufState {
/// The buffer is pending submission to read queue (on initialization and
/// in transition from `Full` to `Reading`).
Expand Down Expand Up @@ -424,8 +463,10 @@ mod tests {
io::Write::write_all(&mut temp_file, &pattern[..file_size % pattern.len()]).unwrap();

let buf = vec![0; backing_buffer_size];
let mut reader =
SequentialFileReader::with_buffer(temp_file.path(), buf, read_capacity).unwrap();
let mut reader = SequentialFileReaderBuilder::new()
.read_capacity(read_capacity)
.build_with_buffer(temp_file.path(), buf)
.unwrap();

// Read contents from the reader and verify length
let mut all_read_data = Vec::new();
Expand Down
Loading