diff --git a/fs/src/io_uring/file_creator.rs b/fs/src/io_uring/file_creator.rs index 09d1ee9c43c..b213a5c8186 100644 --- a/fs/src/io_uring/file_creator.rs +++ b/fs/src/io_uring/file_creator.rs @@ -13,7 +13,7 @@ use { agave_io_uring::{Completion, FixedSlab, Ring, RingAccess, RingOp}, core::slice, io_uring::{IoUring, opcode, squeue, types}, - libc::{O_CREAT, O_NOATIME, O_NOFOLLOW, O_RDWR, O_TRUNC}, + libc::{O_CREAT, O_DIRECT, O_NOATIME, O_NOFOLLOW, O_RDWR, O_TRUNC}, smallvec::SmallVec, std::{ collections::VecDeque, @@ -34,6 +34,13 @@ use { // 32 pages (Maximum Data Transfer Size) * page size (MPSMIN = Memory Page Size) = 128KiB. pub const DEFAULT_WRITE_SIZE: IoSize = 512 * 1024; +// Write size and file offset alignment for use with direct IO - all modern file systems +// effectively use this constant. +const DIRECT_IO_WRITE_LEN_ALIGNMENT: IoSize = 512; + +// Status flags (updatable on file-descriptor) used as default upon file creation. +const DEFAULT_STATUS_FLAGS: libc::c_int = O_NOATIME; + // 99.9% of accounts storage files are < 8MiB type BacklogVec = SmallVec<[PendingWrite; 8 * 1024 * 1024 / DEFAULT_WRITE_SIZE as usize]>; @@ -60,6 +67,7 @@ pub struct IoUringFileCreatorBuilder<'sp> { shared_sqpoll_fd: Option>, /// Register buffer as fixed with the kernel register_buffer: bool, + write_with_direct_io: bool, } impl<'sp> IoUringFileCreatorBuilder<'sp> { @@ -70,6 +78,7 @@ impl<'sp> IoUringFileCreatorBuilder<'sp> { ring_squeue_size: None, shared_sqpoll_fd: None, register_buffer: true, + write_with_direct_io: false, } } @@ -90,6 +99,15 @@ impl<'sp> IoUringFileCreatorBuilder<'sp> { self } + /// Write files in direct-IO mode (disables kernel caching of written contents). + /// + /// Note that `File` passed in completion callback is still switched to non-direct IO mode. + #[cfg(test)] + pub fn write_with_direct_io(mut self, enable_direct_io: bool) -> Self { + self.write_with_direct_io = enable_direct_io; + self + } + /// Use (or remove) a shared kernel thread to drain submission queue for IO operations pub fn shared_sqpoll(mut self, shared_sqpoll_fd: Option>) -> Self { self.shared_sqpoll_fd = shared_sqpoll_fd; @@ -132,6 +150,16 @@ impl<'sp> IoUringFileCreatorBuilder<'sp> { assert_ne!(buf_capacity, 0, "write size aligned buffer is too small"); let buf_slice_mut = &mut buffer.as_mut()[..buf_capacity]; + // O_DIRECT writes have offset, size and buffer alignment restrictions, we guarantee + // those by splitting buffer and writes by `write_capacity` and requiring its alignment. + assert!( + self.write_capacity + .is_multiple_of(DIRECT_IO_WRITE_LEN_ALIGNMENT) + || !self.write_with_direct_io, + "write capacity ({}) must be multiple of {DIRECT_IO_WRITE_LEN_ALIGNMENT} for direct IO", + self.write_capacity + ); + // Safety: buffers contain unsafe pointers to `buffer`, but we make sure they are // dropped before `backing_buffer` is dropped. let buffers = unsafe { @@ -154,6 +182,7 @@ impl<'sp> IoUringFileCreatorBuilder<'sp> { Ok(IoUringFileCreator { ring, + write_with_direct_io: self.write_with_direct_io, _backing_buffer: buffer, }) } @@ -179,6 +208,7 @@ impl<'sp> IoUringFileCreatorBuilder<'sp> { /// operations. pub struct IoUringFileCreator<'a> { ring: Ring, FileCreatorOp>, + write_with_direct_io: bool, /// Owned buffer used (chunked into [`IoBufferChunk`] items) across lifespan of `ring` /// (should get dropped last) _backing_buffer: PageAlignedMemory, @@ -223,6 +253,7 @@ impl IoUringFileCreator<'_> { path_cstring, mode, file_key, + write_with_direct_io: self.write_with_direct_io, }); self.ring.push(op)?; @@ -257,7 +288,7 @@ impl IoUringFileCreator<'_> { let mut mut_slice = unsafe { slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.len() as usize) }; // Fill as much of the buffer as possible to avoid excess IO operations - let write_len; + let mut write_len; loop { let len = src.read(mut_slice)?; if len == 0 { @@ -273,22 +304,46 @@ impl IoUringFileCreator<'_> { mut_slice = &mut mut_slice[len..]; } + if self.write_with_direct_io && write_len < buf.len() { + let align_truncated_write_len = + write_len / DIRECT_IO_WRITE_LEN_ALIGNMENT * DIRECT_IO_WRITE_LEN_ALIGNMENT; + if align_truncated_write_len != write_len { + // Since file passed to `file_complete` will be switched to non-direct io mode, the last + // non-aligned write can be postponed until that is done + file_state.non_dio_eof_write = Some(FinalNonDirectIoWrite { + buf: None, + file_offset: offset + align_truncated_write_len as FileSize, + buf_offset: align_truncated_write_len, + write_len: write_len - align_truncated_write_len, + }); + write_len = align_truncated_write_len; + } + } + file_state.writes_started += 1; if let Some(file) = &file_state.open_file { + let fd = types::Fd(file.as_raw_fd()); if write_len == 0 { - // In case no operation is in progress (i.e. completions were run for all buffers) - // and EOF was reached just now, the `file_complete` needs to be called, since - // no other operation will run it in its completion handler. - // This is not necessary if `write_len > 0`, since completion of the write to be - // added will handle EOF case properly. - FileCreatorState::mark_write_completed(&mut self.ring, file_key, 0, buf)?; - // Skip issuing empty write - break; + // EOF was reached consuming `src` *and* there isn't any data left to write + // immediately (there might be some stored for non-direct IO mode). Treat it as + // a successful write, perform proper accounting and pass `buf` to the next + // write operation. + // + // Note: this logic might be happening with no pending writes (e.g. if `buf` + // was obtained just before `src` reached EOF), so it's possible that file + // completion will be triggered immediately. + return FileCreatorState::mark_write_completed( + &mut self.ring, + file_key, + fd, + true, + buf, + ); } let op = WriteOp { file_key, - fd: types::Fd(file.as_raw_fd()), + fd, offset, buf, buf_offset: 0, @@ -350,10 +405,11 @@ impl<'a> FileCreatorState<'a> { } /// Returns write backlog that needs to be submitted to IO ring - fn mark_file_opened(&mut self, file_key: usize, fd: types::Fd) -> BacklogVec { + fn mark_file_opened(&mut self, file_key: usize, fd: types::Fd, direct_io: bool) -> BacklogVec { let file = self.files.get_mut(file_key).unwrap(); // Safety: we just received FD from io_uring open, so it's valid, track it in owned File file.open_file = Some(unsafe { File::from_raw_fd(fd.0) }); + file.file_uses_direct_io = direct_io; self.num_owned_files += 1; if self.buffers.len() * 2 > self.buffers.capacity() { self.stats.large_buf_headroom_count += 1; @@ -365,23 +421,46 @@ impl<'a> FileCreatorState<'a> { fn mark_write_completed( ring: &mut impl RingAccess, file_key: usize, - write_len: IoSize, + fd: types::Fd, + is_eof_write: bool, buf: IoBufferChunk, ) -> io::Result<()> { let this = ring.context_mut(); - this.submitted_writes_size -= write_len as usize; - this.buffers.push_front(buf); let file_state = this.files.get_mut(file_key).unwrap(); file_state.writes_completed += 1; - if let Some(file_info) = file_state.try_take_completed_file_info() { - match (this.file_complete)(file_info) { - Some(unconsumed_file) => ring.push(FileCreatorOp::Close(CloseOp::new( - file_key, - unconsumed_file, - )))?, - None => this.mark_file_complete(file_key), - }; + + match file_state.non_dio_eof_write.as_mut() { + Some(eof_write) if is_eof_write => { + // Buffer used for the EOF aligned write might still have remaining (non-aligned) + // data to write. `non_dio_eof_write` has offsets into that buffer, store it. From + // this point the last write is possible to be submitted. + assert!(eof_write.buf.replace(buf).is_none()) + } + // Otherwise return buffer to the pool + _ => this.buffers.push_front(buf), + } + + if file_state.required_writes_done() { + // All aligned writes are done at this point, switch off direct-io before completing file + file_state.ensure_direct_io_disabled(fd)?; + + // After disabling direct-IO, we may still have last write op to schedule. + if let Some(op) = file_state.try_take_final_write_op(file_key, fd) { + file_state.writes_started += 1; + this.submitted_writes_size += op.write_len as usize; + return ring.push(FileCreatorOp::Write(op)); + } + + if let Some(file_info) = file_state.take_completed_file_info() { + match (this.file_complete)(file_info) { + Some(unconsumed_file) => ring.push(FileCreatorOp::Close(CloseOp::new( + file_key, + unconsumed_file, + )))?, + None => this.mark_file_complete(file_key), + }; + } } Ok(()) } @@ -429,13 +508,18 @@ struct OpenOp { path_cstring: Pin, mode: libc::mode_t, file_key: usize, + write_with_direct_io: bool, } impl OpenOp { fn entry(&mut self) -> squeue::Entry { let at_dir_fd = types::Fd(self.dir_handle.as_raw_fd()); + let mut flags = O_CREAT | O_TRUNC | O_NOFOLLOW | O_RDWR | DEFAULT_STATUS_FLAGS; + if self.write_with_direct_io { + flags |= O_DIRECT; + } opcode::OpenAt::new(at_dir_fd, self.path_cstring.as_ptr() as _) - .flags(O_CREAT | O_TRUNC | O_NOFOLLOW | O_RDWR | O_NOATIME) + .flags(flags) .mode(self.mode) .build() } @@ -450,11 +534,12 @@ impl OpenOp { { let fd = types::Fd(res?); - let backlog = ring.context_mut().mark_file_opened(self.file_key, fd); - for (buf, offset, len) in backlog { - if len == 0 { - FileCreatorState::mark_write_completed(ring, self.file_key, 0, buf)?; - break; + let backlog = + ring.context_mut() + .mark_file_opened(self.file_key, fd, self.write_with_direct_io); + for (buf, offset, write_len) in backlog { + if write_len == 0 { + return FileCreatorState::mark_write_completed(ring, self.file_key, fd, true, buf); } let op = WriteOp { file_key: self.file_key, @@ -462,9 +547,9 @@ impl OpenOp { offset, buf, buf_offset: 0, - write_len: len, + write_len, }; - ring.context_mut().submitted_writes_size += len as usize; + ring.context_mut().submitted_writes_size += write_len as usize; ring.push(FileCreatorOp::Write(op))?; } @@ -526,14 +611,13 @@ impl<'a> WriteOp { // Safety: buf is owned by `WriteOp` during the operation handling by the kernel and // reclaimed after completion passed in a call to `mark_write_completed`. let buf_ptr = unsafe { buf.as_mut_ptr().byte_add(*buf_offset as usize) }; - let write_len = *write_len; let entry = match buf.io_buf_index() { - Some(io_buf_index) => opcode::WriteFixed::new(*fd, buf_ptr, write_len, io_buf_index) + Some(io_buf_index) => opcode::WriteFixed::new(*fd, buf_ptr, *write_len, io_buf_index) .offset(*offset) .ioprio(IO_PRIO_BE_HIGHEST) .build(), - None => opcode::Write::new(*fd, buf_ptr, write_len) + None => opcode::Write::new(*fd, buf_ptr, *write_len) .offset(*offset) .ioprio(IO_PRIO_BE_HIGHEST) .build(), @@ -567,8 +651,9 @@ impl<'a> WriteOp { } = self; let buf = mem::replace(buf, IoBufferChunk::empty()); - let total_written = *buf_offset + written; + ring.context_mut().submitted_writes_size -= written as usize; + let total_buf_written = *buf_offset + written; if written < *write_len { log::warn!("short write ({written}/{}), file={}", *write_len, *file_key); return ring.push(FileCreatorOp::Write(WriteOp { @@ -576,12 +661,18 @@ impl<'a> WriteOp { fd: *fd, offset: *offset + written as FileSize, buf, - buf_offset: total_written, + buf_offset: total_buf_written, write_len: *write_len - written, })); } - FileCreatorState::mark_write_completed(ring, *file_key, total_written, buf) + FileCreatorState::mark_write_completed( + ring, + *file_key, + *fd, + total_buf_written < buf.len(), + buf, + ) } } @@ -625,6 +716,9 @@ struct PendingFile { open_file: Option, backlog: BacklogVec, size_on_eof: Option, + file_uses_direct_io: bool, + /// Extra write data populated for direct IO mode if there is non-aligned data at EOF + non_dio_eof_write: Option, writes_started: usize, writes_completed: usize, } @@ -638,6 +732,8 @@ impl PendingFile { writes_started: 0, writes_completed: 0, size_on_eof: None, + file_uses_direct_io: false, + non_dio_eof_write: None, } } @@ -646,17 +742,83 @@ impl PendingFile { CString::new(os_str.as_encoded_bytes()).expect("path mustn't contain interior NULs") } - fn try_take_completed_file_info(&mut self) -> Option { - if self.writes_started != self.writes_completed { - return None; + /// Check if all contents were read from source and scheduled writes are done + /// + /// Note: this returns `true` if current stage writes are done, there might still be + /// last write to be scheduled using `non_dio_eof_write` + fn required_writes_done(&self) -> bool { + self.writes_started == self.writes_completed && self.source_fully_read() + } + + /// Return true if all contents to be written for this file are already read + /// + /// When this condition is satisfied, all write ops are known (either added to ring, + /// stored in `backlog` or in `non_dio_eof_write`) + fn source_fully_read(&self) -> bool { + self.size_on_eof.is_some() + } + + /// Turn off direct IO if file is in this mode + fn ensure_direct_io_disabled(&mut self, fd: types::Fd) -> io::Result<()> { + if self.file_uses_direct_io { + // F_SETFL only updates O_APPEND, O_ASYNC, O_DIRECT, O_NOATIME, out of which + // creator only uses last two, so setting DEFAULT_STATUS_FLAGS disables O_DIRECT. + // Safety: function operates on an open file descriptor + let fcntl_res = unsafe { libc::fcntl(fd.0, libc::F_SETFL, DEFAULT_STATUS_FLAGS) }; + if fcntl_res == -1 { + return Err(io::Error::last_os_error()); + } + self.file_uses_direct_io = false; } - let size = self.size_on_eof?; - let file = self.open_file.take()?; + Ok(()) + } + + /// Extract the final write to be submitted once file is switched off from direct IO mode + fn try_take_final_write_op(&mut self, file_key: usize, fd: types::Fd) -> Option { + let FinalNonDirectIoWrite { + buf, + file_offset: offset, + buf_offset, + write_len, + } = self.non_dio_eof_write.take()?; + let buf = buf.expect("should contain buffer for last write"); + Some(WriteOp { + file_key, + fd, + offset, + buf, + buf_offset, + write_len, + }) + } + + fn take_completed_file_info(&mut self) -> Option { + let size = self.size_on_eof.take().expect("content is fully read"); + let file = self.open_file.take().expect("called once on completion"); let path = mem::take(&mut self.path); Some(FileInfo { file, size, path }) } } +/// Write at the end of file that needs to be made with switched off direct IO +/// +/// The sequence of writes for direct-IO mode is: +/// * schedule all aligned writes +/// * if there is any non-aligned data to be written, store its offsets and size in LastNonDirectIoWrite +/// * whenever EOF write (doesn't use whole buffer) is done, move that buffer to LastNonDirectIoWrite +/// * after all scheduled aligned writes are done, disable direct-IO mode on FD +/// * schedule LastNonDirectIoWrite if there is any +/// * once all writes are done (no more last write to be scheduled), mark file completion +#[derive(Debug)] +struct FinalNonDirectIoWrite { + /// The buffer is the same as the one used for the aligned write at the end of the file, value + /// gets stored once that EOF aligned write is done (or buf acquired for 0-sized op is processed). + buf: Option, + file_offset: FileSize, + buf_offset: IoSize, + write_len: IoSize, +} + #[cfg(test)] mod tests { use {super::*, std::io::Cursor, test_case::test_case}; @@ -729,4 +891,36 @@ mod tests { assert_eq!(file_size, read_data.len()); assert_eq!(data, read_data); } + + #[test_case(4 * DIRECT_IO_WRITE_LEN_ALIGNMENT, 4 * DIRECT_IO_WRITE_LEN_ALIGNMENT)] + #[test_case(4 * DIRECT_IO_WRITE_LEN_ALIGNMENT, 2 * DIRECT_IO_WRITE_LEN_ALIGNMENT)] + #[test_case(5 * DIRECT_IO_WRITE_LEN_ALIGNMENT, 2 * DIRECT_IO_WRITE_LEN_ALIGNMENT)] + #[test_case(1000 + 4 * DIRECT_IO_WRITE_LEN_ALIGNMENT, 2 * DIRECT_IO_WRITE_LEN_ALIGNMENT)] + #[test_case(1000 + 5 * DIRECT_IO_WRITE_LEN_ALIGNMENT, 2 * DIRECT_IO_WRITE_LEN_ALIGNMENT)] + fn test_direct_io_create(file_size: IoSize, buf_size: IoSize) { + let mut read_data = Vec::with_capacity(file_size as usize); + let callback = |mut fi: FileInfo| { + fi.file.read_to_end(&mut read_data).unwrap(); + Some(fi.file) + }; + + let mut creator = IoUringFileCreatorBuilder::new() + .write_capacity(8 * 1024) + .write_with_direct_io(true) + .build(buf_size as usize, callback) + .unwrap(); + + let temp_dir = tempfile::tempdir().unwrap(); + let dir = Arc::new(File::open(temp_dir.path()).unwrap()); + let file_path = temp_dir.path().join("file.txt"); + let data: Vec<_> = (0..).take(file_size as usize).map(|v| v as u8).collect(); + creator + .schedule_create_at_dir(file_path, 0o600, dir, &mut Cursor::new(&data)) + .unwrap(); + creator.drain().unwrap(); + + drop(creator); + assert_eq!(file_size, read_data.len() as IoSize); + assert_eq!(data, read_data); + } }