Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ sync = []
test-util = ["rt", "sync", "time"]
time = []
# Unstable feature. Requires `--cfg tokio_unstable` to enable.
io-uring = ["dep:io-uring", "libc", "mio/os-poll", "mio/os-ext", "dep:slab"]
io-uring = ["dep:io-uring", "linux-raw-sys", "libc", "mio/os-poll", "mio/os-ext", "dep:slab"]
# Unstable feature. Requires `--cfg tokio_unstable` to enable.
taskdump = ["dep:backtrace"]

Expand All @@ -111,6 +111,7 @@ tracing = { version = "0.1.29", default-features = false, features = ["std"], op
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(all(tokio_unstable, target_os = "linux"))'.dependencies]
io-uring = { version = "0.7.6", default-features = false, optional = true }
linux-raw-sys = { version = "0.12.1", optional = true }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a particular reason to not use libc instead ?
AFAIS it provides all the constants used in this PR from linux-raw-sys

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not use linux-raw-sys.

libc = { version = "0.2.168", optional = true }
mio = { version = "1.0.1", default-features = false, features = ["os-poll", "os-ext"], optional = true }
slab = { version = "0.4.9", optional = true }
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/fs/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ impl From<MockFile> for OwnedFd {
}
}

#[cfg(all(test, unix))]
impl From<OwnedFd> for MockFile {
#[inline]
fn from(file: OwnedFd) -> MockFile {
use std::os::fd::IntoRawFd;
unsafe { MockFile::from_raw_fd(IntoRawFd::into_raw_fd(file)) }
}
}

tokio_thread_local! {
static QUEUE: RefCell<VecDeque<Box<dyn FnOnce() + Send>>> = RefCell::new(VecDeque::new())
}
Expand Down
28 changes: 18 additions & 10 deletions tokio/src/fs/open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::path::Path;
cfg_io_uring! {
mod uring_open_options;
pub(crate) use uring_open_options::UringOpenOptions;
use crate::runtime::driver::op::Op;
}

#[cfg(test)]
Expand Down Expand Up @@ -518,8 +517,12 @@ impl OpenOptions {
/// [`Other`]: std::io::ErrorKind::Other
/// [`PermissionDenied`]: std::io::ErrorKind::PermissionDenied
pub async fn open(&self, path: impl AsRef<Path>) -> io::Result<File> {
self.open_inner(path.as_ref()).await
}

async fn open_inner(&self, path: &Path) -> io::Result<File> {
match &self.inner {
Kind::Std(opts) => Self::std_open(opts, path).await,
Kind::Std(opts) => Self::std_open(opts.clone(), path).await,
#[cfg(all(
tokio_unstable,
feature = "io-uring",
Expand All @@ -528,25 +531,30 @@ impl OpenOptions {
target_os = "linux"
))]
Kind::Uring(opts) => {
#[cfg(test)]
use super::mocks::MockFile as StdFile;
#[cfg(not(test))]
use std::fs::File as StdFile;

let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();

if driver_handle.check_and_init()? {
Op::open(path.as_ref(), opts)?.await
opts.open(path.as_ref())
.await
.map(|fd| File::from_std(StdFile::from(fd)))
} else {
let opts = opts.clone().into();
Self::std_open(&opts, path).await
Self::std_open(opts, path).await
}
}
}
}

async fn std_open(opts: &StdOpenOptions, path: impl AsRef<Path>) -> io::Result<File> {
let path = path.as_ref().to_owned();
let opts = opts.clone();

let std = asyncify(move || opts.open(path)).await?;
Ok(File::from_std(std))
async fn std_open(opts: StdOpenOptions, path: &Path) -> io::Result<File> {
let path = path.to_owned();
let std = asyncify(move || opts.open(path).map(File::from_std)).await?;
Ok(std)
}

#[cfg(windows)]
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/fs/open_options/uring_open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ use std::{io, os::unix::fs::OpenOptionsExt};

#[cfg(test)]
use super::mock_open_options::MockOpenOptions as StdOpenOptions;
use crate::runtime::driver::op::Op;
#[cfg(not(test))]
use std::fs::OpenOptions as StdOpenOptions;
use std::os::fd::OwnedFd;
use std::path::Path;

#[derive(Debug, Clone)]
pub(crate) struct UringOpenOptions {
Expand Down Expand Up @@ -107,6 +110,10 @@ impl UringOpenOptions {
(_, _, true) => libc::O_CREAT | libc::O_EXCL,
})
}

pub(crate) async fn open(&self, path: &Path) -> io::Result<OwnedFd> {
Op::open(path, self)?.await
}
}

impl From<UringOpenOptions> for StdOpenOptions {
Expand Down
9 changes: 7 additions & 2 deletions tokio/src/fs/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use std::{io, path::Path};
/// }
/// ```
pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
let path = path.as_ref().to_owned();
let path = path.as_ref();

#[cfg(all(
tokio_unstable,
Expand All @@ -69,9 +69,14 @@ pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();
if driver_handle.check_and_init()? {
return read_uring(&path).await;
return read_uring(path).await;
}
}

read_spawn_blocking(path).await
}

async fn read_spawn_blocking(path: &Path) -> io::Result<Vec<u8>> {
let path = path.to_owned();
asyncify(move || std::fs::read(path)).await
}
14 changes: 6 additions & 8 deletions tokio/src/fs/read_uring.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::fs::OpenOptions;
use crate::fs::UringOpenOptions;
use crate::runtime::driver::op::Op;

use std::io;
Expand All @@ -17,15 +17,13 @@ const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;
const MAX_READ_SIZE: usize = 64 * 1024 * 1024;

pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
let file = OpenOptions::new().read(true).open(path).await?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to keep the OpenOptions, use it how you would without breaking/adding new changes and opening files with uring should work.

let fd = UringOpenOptions::new().read(true).open(path).await?;

// TODO: use io uring in the future to obtain metadata
let size_hint: Option<usize> = file.metadata().await.map(|m| m.len() as usize).ok();
let (size_hint, fd) = Op::metadata_fd(fd).await;

let fd: OwnedFd = file
.try_into_std()
.expect("unexpected in-flight operation detected")
.into();
let size_hint: Option<usize> = size_hint
.ok()
.map(|m| usize::try_from(m.len()).unwrap_or(usize::MAX));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here if we obtain size_hint Err for whatever reason, we should start with a length 0 capacity and gradually increase it

let mut buf = Vec::new();

Expand Down
35 changes: 34 additions & 1 deletion tokio/src/fs/try_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,39 @@ use std::path::Path;
/// # }
/// ```
pub async fn try_exists(path: impl AsRef<Path>) -> io::Result<bool> {
let path = path.as_ref().to_owned();
let path = path.as_ref();

#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
{
let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();
if driver_handle.check_and_init()? {
return try_exists_uring(path).await;
}
}

try_exists_spawn_blocking(path).await
}

cfg_io_uring! {
async fn try_exists_uring(path: &Path) -> io::Result<bool> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: IMO this function can be inlined at line 39 above

use crate::runtime::driver::op::Op;

match Op::metadata(path)?.await {
Ok(_) => Ok(true),
Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(false),
Err(error) => Err(error),
}
}
}

async fn try_exists_spawn_blocking(path: &Path) -> io::Result<bool> {
let path = path.to_owned();
asyncify(move || path.try_exists()).await
}
63 changes: 26 additions & 37 deletions tokio/src/fs/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,49 +45,38 @@ pub async fn write(path: impl AsRef<Path>, contents: impl AsRef<[u8]>) -> io::Re
write_spawn_blocking(path, contents).await
}

#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> {
use crate::{fs::OpenOptions, runtime::driver::op::Op};
use std::os::fd::OwnedFd;
cfg_io_uring! {
async fn write_uring(path: &Path, mut buf: OwnedBuf) -> io::Result<()> {
use crate::{fs::UringOpenOptions, runtime::driver::op::Op};

let file = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;
let mut fd = UringOpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(path)
.await?;

let mut fd: OwnedFd = file
.try_into_std()
.expect("unexpected in-flight operation detected")
.into();
let total: usize = buf.as_ref().len();
let mut buf_offset: usize = 0;
let mut file_offset: u64 = 0;
while buf_offset < total {
let (res, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await;

let total: usize = buf.as_ref().len();
let mut buf_offset: usize = 0;
let mut file_offset: u64 = 0;
while buf_offset < total {
let (res, _buf, _fd) = Op::write_at(fd, buf, buf_offset, file_offset)?.await;
let n = match res {
Ok(0) => return Err(io::ErrorKind::WriteZero.into()),
Ok(n) => n,
Err(e) if e.kind() == io::ErrorKind::Interrupted => 0,
Err(e) => return Err(e),
};

let n = match res {
Ok(0) => return Err(io::ErrorKind::WriteZero.into()),
Ok(n) => n,
Err(e) if e.kind() == io::ErrorKind::Interrupted => 0,
Err(e) => return Err(e),
};
buf = _buf;
fd = _fd;
buf_offset += n as usize;
file_offset += n as u64;
}

buf = _buf;
fd = _fd;
buf_offset += n as usize;
file_offset += n as u64;
Ok(())
}

Ok(())
}

async fn write_spawn_blocking(path: &Path, contents: OwnedBuf) -> io::Result<()> {
Expand Down
1 change: 1 addition & 0 deletions tokio/src/io/uring/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) mod open;
pub(crate) mod read;
pub(crate) mod statx;
pub(crate) mod utils;
pub(crate) mod write;
6 changes: 3 additions & 3 deletions tokio/src/io/uring/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult
use io_uring::{opcode, types};
use std::ffi::CString;
use std::io::{self, Error};
use std::os::fd::FromRawFd;
use std::os::fd::{FromRawFd, OwnedFd};
use std::path::Path;

#[derive(Debug)]
Expand All @@ -18,10 +18,10 @@ pub(crate) struct Open {
}

impl Completable for Open {
type Output = io::Result<crate::fs::File>;
type Output = io::Result<OwnedFd>;
fn complete(self, cqe: CqeResult) -> Self::Output {
cqe.result
.map(|fd| unsafe { crate::fs::File::from_raw_fd(fd as i32) })
.map(|fd| unsafe { OwnedFd::from_raw_fd(fd as i32) })
}

fn complete_with_error(self, err: Error) -> Self::Output {
Expand Down
Loading
Loading