Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fs/src/io_uring/dir_remover.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
agave_io_uring::{Completion, Ring, RingOp},
agave_io_uring::{Completion, Ring, RingAccess as _, RingOp},
io_uring::{IoUring, opcode, squeue, types},
slab::Slab,
std::{
Expand Down Expand Up @@ -196,7 +196,7 @@ impl UnlinkOp {
//
// Safety: the entry doesn't hold any pointers
if let Some(fd) = dir.fd.take() {
comp.push(Op::Close(CloseOp::new(self.dir_key, fd.into_raw_fd())));
comp.push(Op::Close(CloseOp::new(self.dir_key, fd.into_raw_fd())))?;
}
}

Expand Down
33 changes: 10 additions & 23 deletions fs/src/io_uring/file_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use {
sqpoll,
},
},
agave_io_uring::{Completion, FixedSlab, Ring, RingOp},
agave_io_uring::{Completion, FixedSlab, Ring, RingAccess, RingOp},
core::slice,
io_uring::{IoUring, opcode, squeue, types},
libc::{O_CREAT, O_NOATIME, O_NOFOLLOW, O_RDWR, O_TRUNC},
Expand Down Expand Up @@ -276,23 +276,12 @@ impl IoUringFileCreator<'_> {
file_state.writes_started += 1;
if let Some(file) = &file_state.open_file {
if write_len == 0 {
// File size was aligned with previously used buffers, return back unused `buf`
state.buffers.push_front(buf);
file_state.writes_completed += 1;

// In case no operation is in progress (i.e. completions were run for all buffers)
// and EOF was reached just now, the `file_complete` needs to be called, since
// no other operation will run it in its completion handler.
// This is not necessary if `write_len > 0`, since completion of the write to be
// added will handle EOF case properly.
if let Some(file_info) = file_state.try_take_completed_file_info() {
match (state.file_complete)(file_info) {
Some(unconsumed_file) => self.ring.push(FileCreatorOp::Close(
CloseOp::new(file_key, unconsumed_file),
))?,
None => state.mark_file_complete(file_key),
}
}
FileCreatorState::mark_write_completed(&mut self.ring, file_key, 0, buf)?;
// Skip issuing empty write
break;
}
Expand Down Expand Up @@ -374,11 +363,11 @@ impl<'a> FileCreatorState<'a> {

/// Calls `file_complete` callback with completed file info and optionally schedules close
fn mark_write_completed(
ring: &mut Completion<'_, Self, FileCreatorOp>,
ring: &mut impl RingAccess<Context = Self, Operation = FileCreatorOp>,
file_key: usize,
write_len: IoSize,
buf: IoBufferChunk,
) {
) -> io::Result<()> {
let this = ring.context_mut();
this.submitted_writes_size -= write_len as usize;
this.buffers.push_front(buf);
Expand All @@ -390,10 +379,11 @@ impl<'a> FileCreatorState<'a> {
Some(unconsumed_file) => ring.push(FileCreatorOp::Close(CloseOp::new(
file_key,
unconsumed_file,
))),
)))?,
None => this.mark_file_complete(file_key),
};
}
Ok(())
}

fn mark_file_complete(&mut self, file_key: usize) {
Expand Down Expand Up @@ -463,7 +453,7 @@ impl OpenOp {
let backlog = ring.context_mut().mark_file_opened(self.file_key, fd);
for (buf, offset, len) in backlog {
if len == 0 {
FileCreatorState::mark_write_completed(ring, self.file_key, 0, buf);
FileCreatorState::mark_write_completed(ring, self.file_key, 0, buf)?;
break;
}
let op = WriteOp {
Expand All @@ -475,7 +465,7 @@ impl OpenOp {
write_len: len,
};
ring.context_mut().submitted_writes_size += len as usize;
ring.push(FileCreatorOp::Write(op));
ring.push(FileCreatorOp::Write(op))?;
}

Ok(())
Expand Down Expand Up @@ -581,20 +571,17 @@ impl<'a> WriteOp {

if written < *write_len {
log::warn!("short write ({written}/{}), file={}", *write_len, *file_key);
ring.push(FileCreatorOp::Write(WriteOp {
return ring.push(FileCreatorOp::Write(WriteOp {
file_key: *file_key,
fd: *fd,
offset: *offset + written as FileSize,
buf,
buf_offset: total_written,
write_len: *write_len - written,
}));
return Ok(());
}

FileCreatorState::mark_write_completed(ring, *file_key, total_written, buf);

Ok(())
FileCreatorState::mark_write_completed(ring, *file_key, total_written, buf)
}
}

Expand Down
4 changes: 2 additions & 2 deletions fs/src/io_uring/sequential_file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
memory::{IoBufferChunk, PageAlignedMemory},
},
crate::{FileSize, IoSize, buffered_reader::FileBufRead, io_uring::sqpoll},
agave_io_uring::{Completion, Ring, RingOp},
agave_io_uring::{Completion, Ring, RingAccess as _, RingOp},
io_uring::{IoUring, opcode, squeue, types},
std::{
collections::VecDeque,
Expand Down Expand Up @@ -825,7 +825,7 @@ impl RingOp<BuffersState> for ReadOp {
// Safety:
// The op points to a buffer which is guaranteed to be valid for the
// lifetime of the operation
completion.push(op);
completion.push(op)?;
} else {
buffers[*reader_buf_index as usize] = ReadBufState::Full {
buf,
Expand Down
130 changes: 77 additions & 53 deletions io-uring/src/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,28 @@ use {
std::{io, os::fd::RawFd, time::Duration},
};

/// Trait for accessing the context and pushing operations to the [Ring].
///
/// Enables generic operations on [Ring] or [Completion].
///
/// Code that only needs the context and the ability to queue more work can take
/// `&mut impl RingAccess<Context = .., Operation = ..>` and be called both with a [Ring]
/// directly and with the [Completion] handed to [RingOp::complete].
pub trait RingAccess {
/// Type of the context value stored in the [Ring].
type Context;
/// Type of the operations accepted by [RingAccess::push].
type Operation;

/// Returns a reference to the context value stored in a [Ring].
fn context(&self) -> &Self::Context;

/// Returns a mutable reference to the context value stored in a [Ring].
fn context_mut(&mut self) -> &mut Self::Context;

/// Pushes an operation for execution in io_uring.
///
/// Once completed, [RingOp::complete] will be called with the result.
///
/// Note that the exact moment the operation is submitted to the kernel is implementation
/// specific.
///
/// # Errors
///
/// Returns any [io::Error] hit while queuing the operation. [Ring] may have to submit and
/// process completions to make room, which can fail; [Completion] only buffers the
/// operation and always returns `Ok(())`.
fn push(&mut self, op: Self::Operation) -> io::Result<()>;
}

/// An io_uring instance.
pub struct Ring<T, E: RingOp<T>> {
ring: IoUring,
Expand All @@ -29,16 +51,6 @@ impl<T, E: RingOp<T>> Ring<T, E> {
}
}

/// Returns a shared reference to the context value owned by this ring.
pub fn context(&self) -> &T {
&self.context
}

/// Returns a mutable reference to the context value owned by this ring.
pub fn context_mut(&mut self) -> &mut T {
&mut self.context
}

/// Registers in-memory fixed buffers for I/O with the kernel.
///
/// # Safety
Expand All @@ -63,40 +75,6 @@ impl<T, E: RingOp<T>> Ring<T, E> {
self.ring.submitter().register_files(fds)
}

/// Queues an operation on the submission queue.
///
/// Once completed, [RingOp::complete] will be called with the result.
///
/// Queuing alone does not hand the operation to the kernel; that happens when [Ring::submit]
/// is called. If the submission queue is full, submit is called internally to make room for
/// the new operation.
///
/// See also [Ring::submit].
pub fn push(&mut self, op: E) -> io::Result<()> {
    // Reap finished operations first. While the entries slab still has no free slot,
    // submit, wait for at least one completion and reap again.
    self.process_completions()?;
    while self.entries.is_full() {
        self.submit_and_wait(1, None)?;
        self.process_completions()?;
    }

    // The slab key doubles as the user_data tag of the submission entry.
    let slot = self.entries.insert(op);
    let sqe = self
        .entries
        .get_mut(slot)
        .unwrap()
        .entry()
        .user_data(slot as u64);

    loop {
        // Safety: the entry is stored in self.entries and guaranteed to be valid for the
        // lifetime of the operation. E implementations must still ensure that the entry
        // remains valid until the last E::complete call.
        if unsafe { self.ring.submission().push(&sqe) }.is_ok() {
            return Ok(());
        }
        // Submission queue is full: flush it to the kernel and reap completions, then retry.
        self.submit()?;
        self.process_completions()?;
    }
}

/// Submits all pending operations to the kernel.
///
/// If the ring can't accept any more submissions because the completion
Expand Down Expand Up @@ -225,23 +203,69 @@ pub struct Completion<'a, T, E: RingOp<T>> {
context: &'a mut T,
}

impl<T, E: RingOp<T>> Completion<'_, T, E> {
/// Returns a reference to the context value stored in a [Ring].
pub fn context(&self) -> &T {
self.context
impl<T, E: RingOp<T>> RingAccess for Ring<T, E> {
type Context = T;
type Operation = E;

/// Borrows the context value owned by this ring.
fn context(&self) -> &T {
&self.context
}

/// Returns a mutable reference to the context value stored in a [Ring].
pub fn context_mut(&mut self) -> &mut T {
self.context
/// Mutably borrows the context value owned by this ring.
fn context_mut(&mut self) -> &mut T {
&mut self.context
}

/// Queues an operation on the submission queue.
///
/// Queuing alone does not hand the operation to the kernel; that happens when [Ring::submit]
/// is called. If the submission queue is full, submit is called internally to make room for
/// the new operation.
///
/// See also [Ring::submit].
fn push(&mut self, op: E) -> io::Result<()> {
    // Reap finished operations first. While the entries slab still has no free slot,
    // submit, wait for at least one completion and reap again.
    self.process_completions()?;
    while self.entries.is_full() {
        self.submit_and_wait(1, None)?;
        self.process_completions()?;
    }

    // The slab key doubles as the user_data tag of the submission entry.
    let slot = self.entries.insert(op);
    let sqe = self
        .entries
        .get_mut(slot)
        .unwrap()
        .entry()
        .user_data(slot as u64);

    loop {
        // Safety: the entry is stored in self.entries and guaranteed to be valid for the
        // lifetime of the operation. E implementations must still ensure that the entry
        // remains valid until the last E::complete call.
        if unsafe { self.ring.submission().push(&sqe) }.is_ok() {
            return Ok(());
        }
        // Submission queue is full: flush it to the kernel and reap completions, then retry.
        self.submit()?;
        self.process_completions()?;
    }
}
}

impl<T, E: RingOp<T>> RingAccess for Completion<'_, T, E> {
type Context = T;
type Operation = E;

/// Reborrows the context held by this completion as a shared reference.
fn context(&self) -> &T {
self.context
}

/// Reborrows the context held by this completion as a mutable reference.
fn context_mut(&mut self) -> &mut T {
self.context
}

/// This can be used to push new operations from within [RingOp::complete].
///
/// See also [Ring::push].
pub fn push(&mut self, op: E) {
/// Note that the operations are buffered until completion is finished and then pushed
/// to the parent [Ring].
///
/// This implementation never fails; the `io::Result` return type exists to satisfy
/// [RingAccess::push].
fn push(&mut self, op: E) -> io::Result<()> {
// Only record the operation in `new_entries`; nothing is queued here.
self.new_entries.push(op);
Ok(())
}
}
Loading