diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 7caea7d09d1..51ba9a38228 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -85,7 +85,7 @@ sync = [] test-util = ["rt", "sync", "time"] time = [] # Unstable feature. Requires `--cfg tokio_unstable` to enable. -io-uring = ["dep:io-uring", "libc", "mio/os-poll", "mio/os-ext", "dep:slab"] +io-uring = ["dep:io-uring", "linux-raw-sys", "libc", "mio/os-poll", "mio/os-ext", "dep:slab"] # Unstable feature. Requires `--cfg tokio_unstable` to enable. taskdump = ["dep:backtrace"] @@ -111,6 +111,7 @@ tracing = { version = "0.1.29", default-features = false, features = ["std"], op # Requires `--cfg tokio_unstable` to enable. [target.'cfg(all(tokio_unstable, target_os = "linux"))'.dependencies] io-uring = { version = "0.7.6", default-features = false, optional = true } +linux-raw-sys = { version = "0.12.1", optional = true } libc = { version = "0.2.168", optional = true } mio = { version = "1.0.1", default-features = false, features = ["os-poll", "os-ext"], optional = true } slab = { version = "0.4.9", optional = true } diff --git a/tokio/src/fs/mocks.rs b/tokio/src/fs/mocks.rs index ae5d7e5368e..138e12ac6bb 100644 --- a/tokio/src/fs/mocks.rs +++ b/tokio/src/fs/mocks.rs @@ -106,6 +106,15 @@ impl From for OwnedFd { } } +#[cfg(all(test, unix))] +impl From for MockFile { + #[inline] + fn from(file: OwnedFd) -> MockFile { + use std::os::fd::IntoRawFd; + unsafe { MockFile::from_raw_fd(IntoRawFd::into_raw_fd(file)) } + } +} + tokio_thread_local! { static QUEUE: RefCell>> = RefCell::new(VecDeque::new()) } diff --git a/tokio/src/fs/open_options.rs b/tokio/src/fs/open_options.rs index 9646dec6582..761ced4bab1 100644 --- a/tokio/src/fs/open_options.rs +++ b/tokio/src/fs/open_options.rs @@ -6,7 +6,6 @@ use std::path::Path; cfg_io_uring! 
{ mod uring_open_options; pub(crate) use uring_open_options::UringOpenOptions; - use crate::runtime::driver::op::Op; } #[cfg(test)] @@ -518,8 +517,12 @@ impl OpenOptions { /// [`Other`]: std::io::ErrorKind::Other /// [`PermissionDenied`]: std::io::ErrorKind::PermissionDenied pub async fn open(&self, path: impl AsRef) -> io::Result { + self.open_inner(path.as_ref()).await + } + + async fn open_inner(&self, path: &Path) -> io::Result { match &self.inner { - Kind::Std(opts) => Self::std_open(opts, path).await, + Kind::Std(opts) => Self::std_open(opts.clone(), path).await, #[cfg(all( tokio_unstable, feature = "io-uring", @@ -528,25 +531,30 @@ impl OpenOptions { target_os = "linux" ))] Kind::Uring(opts) => { + #[cfg(test)] + use super::mocks::MockFile as StdFile; + #[cfg(not(test))] + use std::fs::File as StdFile; + let handle = crate::runtime::Handle::current(); let driver_handle = handle.inner.driver().io(); if driver_handle.check_and_init()? { - Op::open(path.as_ref(), opts)?.await + opts.open(path.as_ref()) + .await + .map(|fd| File::from_std(StdFile::from(fd))) } else { let opts = opts.clone().into(); - Self::std_open(&opts, path).await + Self::std_open(opts, path).await } } } } - async fn std_open(opts: &StdOpenOptions, path: impl AsRef) -> io::Result { - let path = path.as_ref().to_owned(); - let opts = opts.clone(); - - let std = asyncify(move || opts.open(path)).await?; - Ok(File::from_std(std)) + async fn std_open(opts: StdOpenOptions, path: &Path) -> io::Result { + let path = path.to_owned(); + let std = asyncify(move || opts.open(path).map(File::from_std)).await?; + Ok(std) } #[cfg(windows)] diff --git a/tokio/src/fs/open_options/uring_open_options.rs b/tokio/src/fs/open_options/uring_open_options.rs index 48297ca3b5b..09097e7930b 100644 --- a/tokio/src/fs/open_options/uring_open_options.rs +++ b/tokio/src/fs/open_options/uring_open_options.rs @@ -2,8 +2,11 @@ use std::{io, os::unix::fs::OpenOptionsExt}; #[cfg(test)] use 
super::mock_open_options::MockOpenOptions as StdOpenOptions; +use crate::runtime::driver::op::Op; #[cfg(not(test))] use std::fs::OpenOptions as StdOpenOptions; +use std::os::fd::OwnedFd; +use std::path::Path; #[derive(Debug, Clone)] pub(crate) struct UringOpenOptions { @@ -107,6 +110,10 @@ impl UringOpenOptions { (_, _, true) => libc::O_CREAT | libc::O_EXCL, }) } + + pub(crate) async fn open(&self, path: &Path) -> io::Result { + Op::open(path, self)?.await + } } impl From for StdOpenOptions { diff --git a/tokio/src/fs/read.rs b/tokio/src/fs/read.rs index 955c3592c85..86e3026b0a1 100644 --- a/tokio/src/fs/read.rs +++ b/tokio/src/fs/read.rs @@ -54,7 +54,7 @@ use std::{io, path::Path}; /// } /// ``` pub async fn read(path: impl AsRef) -> io::Result> { - let path = path.as_ref().to_owned(); + let path = path.as_ref(); #[cfg(all( tokio_unstable, @@ -69,9 +69,14 @@ pub async fn read(path: impl AsRef) -> io::Result> { let handle = crate::runtime::Handle::current(); let driver_handle = handle.inner.driver().io(); if driver_handle.check_and_init()? 
{ - return read_uring(&path).await; + return read_uring(path).await; } } + read_spawn_blocking(path).await +} + +async fn read_spawn_blocking(path: &Path) -> io::Result> { + let path = path.to_owned(); asyncify(move || std::fs::read(path)).await } diff --git a/tokio/src/fs/read_uring.rs b/tokio/src/fs/read_uring.rs index 5b38c212246..1ef844b5803 100644 --- a/tokio/src/fs/read_uring.rs +++ b/tokio/src/fs/read_uring.rs @@ -1,4 +1,4 @@ -use crate::fs::OpenOptions; +use crate::fs::UringOpenOptions; use crate::runtime::driver::op::Op; use std::io; @@ -17,15 +17,13 @@ const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32; const MAX_READ_SIZE: usize = 64 * 1024 * 1024; pub(crate) async fn read_uring(path: &Path) -> io::Result> { - let file = OpenOptions::new().read(true).open(path).await?; + let fd = UringOpenOptions::new().read(true).open(path).await?; - // TODO: use io uring in the future to obtain metadata - let size_hint: Option = file.metadata().await.map(|m| m.len() as usize).ok(); + let (size_hint, fd) = Op::metadata_fd(fd).await; - let fd: OwnedFd = file - .try_into_std() - .expect("unexpected in-flight operation detected") - .into(); + let size_hint: Option = size_hint + .ok() + .map(|m| usize::try_from(m.len()).unwrap_or(usize::MAX)); let mut buf = Vec::new(); diff --git a/tokio/src/fs/try_exists.rs b/tokio/src/fs/try_exists.rs index 2e8de04e0c5..170dedb5de6 100644 --- a/tokio/src/fs/try_exists.rs +++ b/tokio/src/fs/try_exists.rs @@ -23,6 +23,39 @@ use std::path::Path; /// # } /// ``` pub async fn try_exists(path: impl AsRef) -> io::Result { - let path = path.as_ref().to_owned(); + let path = path.as_ref(); + + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + { + let handle = crate::runtime::Handle::current(); + let driver_handle = handle.inner.driver().io(); + if driver_handle.check_and_init()? 
{ + return try_exists_uring(path).await; + } + } + + try_exists_spawn_blocking(path).await +} + +cfg_io_uring! { + async fn try_exists_uring(path: &Path) -> io::Result { + use crate::runtime::driver::op::Op; + + match Op::metadata(path)?.await { + Ok(_) => Ok(true), + Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(false), + Err(error) => Err(error), + } + } +} + +async fn try_exists_spawn_blocking(path: &Path) -> io::Result { + let path = path.to_owned(); asyncify(move || path.try_exists()).await } diff --git a/tokio/src/fs/write.rs b/tokio/src/fs/write.rs index 76387134c8b..ebc717757c1 100644 --- a/tokio/src/fs/write.rs +++ b/tokio/src/fs/write.rs @@ -45,49 +45,38 @@ pub async fn write(path: impl AsRef, contents: impl AsRef<[u8]>) -> io::Re write_spawn_blocking(path, contents).await } -#[cfg(all( - tokio_unstable, - feature = "io-uring", - feature = "rt", - feature = "fs", - target_os = "linux" -))] -async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { - use crate::{fs::OpenOptions, runtime::driver::op::Op}; - use std::os::fd::OwnedFd; +cfg_io_uring! 
{ + async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> { + use crate::{fs::UringOpenOptions, runtime::driver::op::Op}; - let file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(path) - .await?; + let mut fd = UringOpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .await?; - let mut fd: OwnedFd = file - .try_into_std() - .expect("unexpected in-flight operation detected") - .into(); + let total: usize = buf.as_ref().len(); + let mut buf_offset: usize = 0; + let mut file_offset: u64 = 0; + while buf_offset < total { + let (res, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await; - let total: usize = buf.as_ref().len(); - let mut buf_offset: usize = 0; - let mut file_offset: u64 = 0; - while buf_offset < total { - let (res, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await; + let n = match res { + Ok(0) => return Err(io::ErrorKind::WriteZero.into()), + Ok(n) => n, + Err(e) if e.kind() == io::ErrorKind::Interrupted => 0, + Err(e) => return Err(e), + }; - let n = match res { - Ok(0) => return Err(io::ErrorKind::WriteZero.into()), - Ok(n) => n, - Err(e) if e.kind() == io::ErrorKind::Interrupted => 0, - Err(e) => return Err(e), - }; + buf = _buf; + fd = _fd; + buf_offset += n as usize; + file_offset += n as u64; + } - buf = _buf; - fd = _fd; - buf_offset += n as usize; - file_offset += n as u64; + Ok(()) } - - Ok(()) } async fn write_spawn_blocking(path: &Path, contents: OwnedBuf) -> io::Result<()> { diff --git a/tokio/src/io/uring/mod.rs b/tokio/src/io/uring/mod.rs index facad596f63..c398f3d88e5 100644 --- a/tokio/src/io/uring/mod.rs +++ b/tokio/src/io/uring/mod.rs @@ -1,4 +1,5 @@ pub(crate) mod open; pub(crate) mod read; +pub(crate) mod statx; pub(crate) mod utils; pub(crate) mod write; diff --git a/tokio/src/io/uring/open.rs b/tokio/src/io/uring/open.rs index 913588c665c..3785e522ba9 100644 --- a/tokio/src/io/uring/open.rs +++ 
b/tokio/src/io/uring/open.rs @@ -6,7 +6,7 @@ use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult use io_uring::{opcode, types}; use std::ffi::CString; use std::io::{self, Error}; -use std::os::fd::FromRawFd; +use std::os::fd::{FromRawFd, OwnedFd}; use std::path::Path; #[derive(Debug)] @@ -18,10 +18,10 @@ pub(crate) struct Open { } impl Completable for Open { - type Output = io::Result; + type Output = io::Result; fn complete(self, cqe: CqeResult) -> Self::Output { cqe.result - .map(|fd| unsafe { crate::fs::File::from_raw_fd(fd as i32) }) + .map(|fd| unsafe { OwnedFd::from_raw_fd(fd as i32) }) } fn complete_with_error(self, err: Error) -> Self::Output { diff --git a/tokio/src/io/uring/statx.rs b/tokio/src/io/uring/statx.rs new file mode 100644 index 00000000000..0518ff69049 --- /dev/null +++ b/tokio/src/io/uring/statx.rs @@ -0,0 +1,136 @@ +use crate::io::uring::utils::{box_assume_init, box_new_uninit, cstr}; +use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; +use io_uring::{opcode, types}; +use linux_raw_sys::general::statx; +use std::fmt::{Debug, Formatter}; +use std::io; +use std::mem::MaybeUninit; +use std::os::fd::{AsRawFd, OwnedFd}; +use std::path::Path; + +pub(crate) struct Metadata(statx); + +impl Metadata { + pub(crate) fn len(&self) -> u64 { + self.0.stx_size + } +} + +impl Debug for Metadata { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut debug = f.debug_struct("Metadata"); + debug.field("len", &self.len()); + debug.finish_non_exhaustive() + } +} + +#[derive(Debug)] +pub(crate) struct Statx { + /// This field will be read by the kernel during the operation, so we + /// need to ensure it is valid for the entire duration of the operation. 
+ #[allow(dead_code)] + path: std::ffi::CString, + buffer: Box>, +} + +impl Completable for Statx { + type Output = io::Result; + + fn complete(self, cqe: CqeResult) -> Self::Output { + cqe.result + .map(|_| Metadata(*unsafe { box_assume_init(self.buffer) })) + } + + fn complete_with_error(self, error: io::Error) -> Self::Output { + Err(error) + } +} + +impl Cancellable for Statx { + fn cancel(self) -> CancelData { + CancelData::Statx(self) + } +} + +impl Op { + /// Submit a `statx` request for `path`; symlinks are followed only when `follow_symlinks` is true. + fn statx(path: &Path, follow_symlinks: bool) -> io::Result> { + let path = cstr(path)?; + let mut buffer = box_new_uninit::(); + + let flags: u32 = linux_raw_sys::general::AT_STATX_SYNC_AS_STAT + | (linux_raw_sys::general::AT_SYMLINK_NOFOLLOW * u32::from(!follow_symlinks)); + + let statx_op = opcode::Statx::new( + types::Fd(libc::AT_FDCWD), + path.as_ptr(), + buffer.as_mut_ptr().cast(), + ) + .flags(flags as i32) + .mask(linux_raw_sys::general::STATX_BASIC_STATS) + .build(); + + // SAFETY: Parameters are valid for the entire duration of the operation + Ok(unsafe { Op::new(statx_op, Statx { path, buffer }) }) + } + + pub(crate) fn metadata(path: &Path) -> io::Result> { + Op::statx(path, true) + } + + // pub(crate) fn symlink_metadata(path: &Path) -> io::Result> { + // Op::statx(path, false) + // } +} + +#[derive(Debug)] +pub(crate) struct StatxFd { + fd: OwnedFd, + buffer: Box>, +} + +impl Completable for StatxFd { + type Output = (io::Result, OwnedFd); + + fn complete(self, cqe: CqeResult) -> Self::Output { + let ret = cqe + .result + .map(|_| Metadata(*unsafe { box_assume_init(self.buffer) })); + + (ret, self.fd) + } + + fn complete_with_error(self, error: io::Error) -> Self::Output { + (Err(error), self.fd) + } +} + +impl Cancellable for StatxFd { + fn cancel(self) -> CancelData { + CancelData::StatxFd(self) + } +} + +impl Op { + pub(crate) fn metadata_fd(fd: OwnedFd) -> Op { + let mut buffer = box_new_uninit::(); + + let flags: u32 = + 
linux_raw_sys::general::AT_STATX_SYNC_AS_STAT | linux_raw_sys::general::AT_EMPTY_PATH; + + // io_uring (Linux 5.1) predates NULL-path support for AT_EMPTY_PATH (Linux 6.11), + // so pass an empty string instead of a null pointer, as described in statx(2): + // https://man7.org/linux/man-pages/man2/statx.2.html + let statx_op = opcode::Statx::new( + types::Fd(fd.as_raw_fd()), + c"".as_ptr(), + buffer.as_mut_ptr().cast(), + ) + .flags(flags as i32) + .mask(linux_raw_sys::general::STATX_BASIC_STATS) + .build(); + + // SAFETY: Parameters are valid for the entire duration of the operation + unsafe { Op::new(statx_op, StatxFd { fd, buffer }) } + } +} diff --git a/tokio/src/io/uring/utils.rs b/tokio/src/io/uring/utils.rs index e30e7a5ddc4..67b350fd85e 100644 --- a/tokio/src/io/uring/utils.rs +++ b/tokio/src/io/uring/utils.rs @@ -1,6 +1,19 @@ +use std::mem::MaybeUninit; use std::os::unix::ffi::OsStrExt; use std::{ffi::CString, io, path::Path}; pub(crate) fn cstr(p: &Path) -> io::Result { Ok(CString::new(p.as_os_str().as_bytes())?) } + +// TODO(MSRV 1.82): When bumping MSRV, switch to `Box::<T>::new_uninit()`. +pub(crate) fn box_new_uninit() -> Box> { + // Box::<T>::new_uninit() + Box::new(MaybeUninit::uninit()) +} + +// TODO(MSRV 1.82): When bumping MSRV, switch to `Box::<MaybeUninit<T>>::assume_init()`. +pub(crate) unsafe fn box_assume_init(boxed: Box>) -> Box { + let raw = Box::into_raw(boxed); + unsafe { Box::from_raw(raw as *mut T) } +} diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index d2b9289ceee..6282d52d996 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -3,6 +3,7 @@ use crate::io::uring::read::Read; use crate::io::uring::write::Write; use crate::runtime::Handle; +use crate::io::uring::statx::{Statx, StatxFd}; use io_uring::cqueue; use io_uring::squeue::Entry; use std::future::Future; @@ -19,6 +20,8 @@ pub(crate) enum CancelData { Open(Open), Write(Write), Read(Read), + Statx(Statx), + StatxFd(StatxFd), } #[derive(Debug)]