From 8edc156bdf3a92f1c162091f4daec61557de8e48 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Fri, 16 Jan 2026 15:17:04 +0800 Subject: [PATCH] feat(io_uring): generic access to context and push for Ring and Completion --- fs/src/io_uring/dir_remover.rs | 4 +- fs/src/io_uring/file_creator.rs | 33 ++---- fs/src/io_uring/sequential_file_reader.rs | 4 +- io-uring/src/ring.rs | 130 +++++++++++++--------- 4 files changed, 91 insertions(+), 80 deletions(-) diff --git a/fs/src/io_uring/dir_remover.rs b/fs/src/io_uring/dir_remover.rs index e19b9d1659c..4991485cd1f 100644 --- a/fs/src/io_uring/dir_remover.rs +++ b/fs/src/io_uring/dir_remover.rs @@ -1,5 +1,5 @@ use { - agave_io_uring::{Completion, Ring, RingOp}, + agave_io_uring::{Completion, Ring, RingAccess as _, RingOp}, io_uring::{IoUring, opcode, squeue, types}, slab::Slab, std::{ @@ -196,7 +196,7 @@ impl UnlinkOp { // // Safety: the entry doesn't hold any pointers if let Some(fd) = dir.fd.take() { - comp.push(Op::Close(CloseOp::new(self.dir_key, fd.into_raw_fd()))); + comp.push(Op::Close(CloseOp::new(self.dir_key, fd.into_raw_fd())))?; } } diff --git a/fs/src/io_uring/file_creator.rs b/fs/src/io_uring/file_creator.rs index 46b648170b5..09d1ee9c43c 100644 --- a/fs/src/io_uring/file_creator.rs +++ b/fs/src/io_uring/file_creator.rs @@ -10,7 +10,7 @@ use { sqpoll, }, }, - agave_io_uring::{Completion, FixedSlab, Ring, RingOp}, + agave_io_uring::{Completion, FixedSlab, Ring, RingAccess, RingOp}, core::slice, io_uring::{IoUring, opcode, squeue, types}, libc::{O_CREAT, O_NOATIME, O_NOFOLLOW, O_RDWR, O_TRUNC}, @@ -276,23 +276,12 @@ impl IoUringFileCreator<'_> { file_state.writes_started += 1; if let Some(file) = &file_state.open_file { if write_len == 0 { - // File size was aligned with previously used buffers, return back unused `buf` - state.buffers.push_front(buf); - file_state.writes_completed += 1; - // In case no operation is in progress (i.e. completions were run for all buffers) // and EOF was reached just now, the `file_complete` needs to be called, since // no other operation will run it in its completion handler. // This is not necessary if `write_len > 0`, since completion of the write to be // added will handle EOF case properly. - if let Some(file_info) = file_state.try_take_completed_file_info() { - match (state.file_complete)(file_info) { - Some(unconsumed_file) => self.ring.push(FileCreatorOp::Close( - CloseOp::new(file_key, unconsumed_file), - ))?, - None => state.mark_file_complete(file_key), - } - } + FileCreatorState::mark_write_completed(&mut self.ring, file_key, 0, buf)?; // Skip issuing empty write break; } @@ -374,11 +363,11 @@ impl<'a> FileCreatorState<'a> { /// Calls `file_complete` callback with completed file info and optionally schedules close fn mark_write_completed( - ring: &mut Completion<'_, Self, FileCreatorOp>, + ring: &mut impl RingAccess, file_key: usize, write_len: IoSize, buf: IoBufferChunk, - ) { + ) -> io::Result<()> { let this = ring.context_mut(); this.submitted_writes_size -= write_len as usize; this.buffers.push_front(buf); @@ -390,10 +379,11 @@ impl<'a> FileCreatorState<'a> { Some(unconsumed_file) => ring.push(FileCreatorOp::Close(CloseOp::new( file_key, unconsumed_file, - ))), + )))?, None => this.mark_file_complete(file_key), }; } + Ok(()) } fn mark_file_complete(&mut self, file_key: usize) { @@ -463,7 +453,7 @@ impl OpenOp { let backlog = ring.context_mut().mark_file_opened(self.file_key, fd); for (buf, offset, len) in backlog { if len == 0 { - FileCreatorState::mark_write_completed(ring, self.file_key, 0, buf); + FileCreatorState::mark_write_completed(ring, self.file_key, 0, buf)?; break; } let op = WriteOp { @@ -475,7 +465,7 @@ impl OpenOp { write_len: len, }; ring.context_mut().submitted_writes_size += len as usize; - ring.push(FileCreatorOp::Write(op)); + ring.push(FileCreatorOp::Write(op))?; } Ok(()) @@ -581,7 +571,7 @@ impl<'a> WriteOp { if written < *write_len { log::warn!("short write ({written}/{}), file={}", *write_len, *file_key); - ring.push(FileCreatorOp::Write(WriteOp { + return ring.push(FileCreatorOp::Write(WriteOp { file_key: *file_key, fd: *fd, offset: *offset + written as FileSize, @@ -589,12 +579,9 @@ impl<'a> WriteOp { buf_offset: total_written, write_len: *write_len - written, })); - return Ok(()); } - FileCreatorState::mark_write_completed(ring, *file_key, total_written, buf); - - Ok(()) + FileCreatorState::mark_write_completed(ring, *file_key, total_written, buf) } } diff --git a/fs/src/io_uring/sequential_file_reader.rs b/fs/src/io_uring/sequential_file_reader.rs index b2031df351f..31ea6662ea9 100644 --- a/fs/src/io_uring/sequential_file_reader.rs +++ b/fs/src/io_uring/sequential_file_reader.rs @@ -6,7 +6,7 @@ use { memory::{IoBufferChunk, PageAlignedMemory}, }, crate::{FileSize, IoSize, buffered_reader::FileBufRead, io_uring::sqpoll}, - agave_io_uring::{Completion, Ring, RingOp}, + agave_io_uring::{Completion, Ring, RingAccess as _, RingOp}, io_uring::{IoUring, opcode, squeue, types}, std::{ collections::VecDeque, @@ -825,7 +825,7 @@ impl RingOp for ReadOp { // Safety: // The op points to a buffer which is guaranteed to be valid for the // lifetime of the operation - completion.push(op); + completion.push(op)?; } else { buffers[*reader_buf_index as usize] = ReadBufState::Full { buf, diff --git a/io-uring/src/ring.rs b/io-uring/src/ring.rs index 47df377345e..8cd6356c115 100644 --- a/io-uring/src/ring.rs +++ b/io-uring/src/ring.rs @@ -8,6 +8,28 @@ use { std::{io, os::fd::RawFd, time::Duration}, }; +/// Trait for accessing the context and pushing operations to the [Ring]. +/// +/// Enables generic operations on [Ring] or [Completion]. +pub trait RingAccess { + type Context; + type Operation; + + /// Returns a reference to the context value stored in a [Ring]. + fn context(&self) -> &Self::Context; + + /// Returns a mutable reference to the context value stored in a [Ring]. + fn context_mut(&mut self) -> &mut Self::Context; + + /// Pushes an operation for execution in io_uring. + /// + /// Once completed, [RingOp::complete] will be called with the result. + /// + /// Note that the exact moment the operation is submitted to the kernel is implementation + /// specific. + fn push(&mut self, op: Self::Operation) -> io::Result<()>; +} + /// An io_uring instance. pub struct Ring> { ring: IoUring, @@ -29,16 +51,6 @@ impl> Ring { } } - /// Returns a reference to the context value. - pub fn context(&self) -> &T { - &self.context - } - - /// Returns a mutable reference to the context value. - pub fn context_mut(&mut self) -> &mut T { - &mut self.context - } - /// Registers in-memory fixed buffers for I/O with the kernel. /// /// # Safety @@ -63,40 +75,6 @@ impl> Ring { self.ring.submitter().register_files(fds) } - /// Pushes an operation to the submission queue. - /// - /// Once completed, [RingOp::complete] will be called with the result. - /// - /// Note that the operation is not submitted to the kernel until [Ring::submit] is called. If - /// the submission queue is full, submit will be called internally to make room for the new - /// operation. - /// - /// See also [Ring::submit]. - pub fn push(&mut self, op: E) -> io::Result<()> { - loop { - self.process_completions()?; - - if !self.entries.is_full() { - break; - } - // if the entries slab is full, we need to submit and poll - // completions to make room - self.submit_and_wait(1, None)?; - } - let key = self.entries.insert(op); - let entry = self.entries.get_mut(key).unwrap().entry(); - let entry = entry.user_data(key as u64); - // Safety: the entry is stored in self.entries and guaranteed to be valid for the lifetime - // of the operation. E implementations must still ensure that the entry - // remains valid until the last E::complete call. - while unsafe { self.ring.submission().push(&entry) }.is_err() { - self.submit()?; - self.process_completions()?; - } - - Ok(()) - } - /// Submits all pending operations to the kernel. /// /// If the ring can't accept any more submissions because the completion @@ -225,23 +203,69 @@ pub struct Completion<'a, T, E: RingOp> { context: &'a mut T, } -impl> Completion<'_, T, E> { - /// Returns a reference to the context value stored in a [Ring]. - pub fn context(&self) -> &T { - self.context +impl> RingAccess for Ring { + type Context = T; + type Operation = E; + + fn context(&self) -> &T { + &self.context } - /// Returns a mutable reference to the context value stored in a [Ring]. - pub fn context_mut(&mut self) -> &mut T { - self.context + fn context_mut(&mut self) -> &mut T { + &mut self.context } /// Pushes an operation to the submission queue. /// + /// Note that the operation is not submitted to the kernel until [Ring::submit] is called. If + /// the submission queue is full, submit will be called internally to make room for the new + /// operation. + /// + /// See also [Ring::submit]. + fn push(&mut self, op: E) -> io::Result<()> { + loop { + self.process_completions()?; + + if !self.entries.is_full() { + break; + } + // if the entries slab is full, we need to submit and poll + // completions to make room + self.submit_and_wait(1, None)?; + } + let key = self.entries.insert(op); + let entry = self.entries.get_mut(key).unwrap().entry(); + let entry = entry.user_data(key as u64); + // Safety: the entry is stored in self.entries and guaranteed to be valid for the lifetime + // of the operation. E implementations must still ensure that the entry + // remains valid until the last E::complete call. + while unsafe { self.ring.submission().push(&entry) }.is_err() { + self.submit()?; + self.process_completions()?; + } + + Ok(()) + } +} + +impl> RingAccess for Completion<'_, T, E> { + type Context = T; + type Operation = E; + + fn context(&self) -> &T { + self.context + } + + fn context_mut(&mut self) -> &mut T { + self.context + } + /// This can be used to push new operations from within [RingOp::complete]. /// - /// See also [Ring::push]. - pub fn push(&mut self, op: E) { + /// Note that the operations are buffered until completion is finished and then pushed + /// to the parent [Ring]. + fn push(&mut self, op: E) -> io::Result<()> { self.new_entries.push(op); + Ok(()) } }