Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions tokio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,6 @@ pub use self::metadata::metadata;

mod open_options;
pub use self::open_options::OpenOptions;
cfg_io_uring! {
pub(crate) use self::open_options::UringOpenOptions;
}

mod read;
pub use self::read::read;
Expand Down Expand Up @@ -298,6 +295,13 @@ cfg_windows! {
pub use self::symlink_file::symlink_file;
}

// Crate-private io_uring plumbing: the `read_uring` module together with its
// `read_uring` entry point, plus the `UringOpenOptions` re-export from
// `open_options`. Everything here is wrapped in `cfg_io_uring!`.
cfg_io_uring! {
pub(crate) mod read_uring;
pub(crate) use self::read_uring::read_uring;

pub(crate) use self::open_options::UringOpenOptions;
}

use std::io;

#[cfg(not(test))]
Expand Down
28 changes: 28 additions & 0 deletions tokio/src/fs/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ use std::{io, path::Path};
///
/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
///
/// # io_uring support
///
/// On Linux, you can also use io_uring for executing system calls. To enable
/// io_uring, you need to specify the `--cfg tokio_unstable` flag at compile time,
/// enable the io-uring cargo feature, and set the `Builder::enable_io_uring`
/// runtime option.
///
/// Support for io_uring is currently experimental, so its behavior may change
/// or it may be removed in future versions.
///
/// # Examples
///
/// ```no_run
Expand All @@ -45,5 +55,23 @@ use std::{io, path::Path};
/// ```
pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
let path = path.as_ref().to_owned();

#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux"
))]
Comment thread
mox692 marked this conversation as resolved.
{
use crate::fs::read_uring;

let handle = crate::runtime::Handle::current();
let driver_handle = handle.inner.driver().io();
if driver_handle.check_and_init()? {
return read_uring(&path).await;
}
}

asyncify(move || std::fs::read(path)).await
}
134 changes: 134 additions & 0 deletions tokio/src/fs/read_uring.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use crate::fs::OpenOptions;
use crate::runtime::driver::op::Op;

use std::io;
use std::io::ErrorKind;
use std::os::fd::OwnedFd;
use std::path::Path;

// This algorithm is inspired by the Rust standard library, version 1.90.0:
// https://doc.rust-lang.org/1.90.0/src/std/io/mod.rs.html#409
//
// Number of bytes used for the small probe read and for the minimum growth of
// the buffer when it is full.
const PROBE_SIZE: usize = 32;
// `PROBE_SIZE` as a `u32`, the type in which read lengths are passed to `op_read`.
const PROBE_SIZE_U32: u32 = PROBE_SIZE as u32;

// Maximum number of bytes requested from a single io_uring read submission.
// Must not exceed `u32::MAX`: the read length is converted to `u32` before it
// is submitted.
// Currently 64 MiB per read.
const MAX_READ_SIZE: usize = 64 * 1024 * 1024;

// Reads the entire contents of the file at `path` into a new `Vec<u8>`.
//
// The file is opened read-only, its metadata is queried for a length that is
// used purely as a capacity hint, and the actual reading is delegated to
// `read_to_end_uring`.
//
// Errors from opening the file, from reserving the hinted capacity, and from
// reading are returned to the caller. A failure to obtain metadata is not an
// error: the read simply starts with an empty, unreserved buffer.
pub(crate) async fn read_uring(path: &Path) -> io::Result<Vec<u8>> {
    let file = OpenOptions::new().read(true).open(path).await?;

    // TODO: use io uring in the future to obtain metadata
    //
    // The length is a `u64`; on targets where `usize` is narrower a plain `as`
    // cast would silently truncate it and produce a bogus hint. Use a checked
    // conversion and drop the hint when it does not fit.
    let size_hint: Option<usize> = file
        .metadata()
        .await
        .ok()
        .and_then(|m| usize::try_from(m.len()).ok());

    // The file was opened just above and the metadata call has completed, so
    // no operation is expected to still be in flight on it.
    let fd: OwnedFd = file
        .try_into_std()
        .expect("unexpected in-flight operation detected")
        .into();

    let mut buf = Vec::new();

    if let Some(size_hint) = size_hint {
        // Fallible reservation: an allocation failure becomes an `io::Error`
        // instead of aborting the process.
        buf.try_reserve(size_hint)?;
    }

    read_to_end_uring(fd, buf).await
}

async fn read_to_end_uring(mut fd: OwnedFd, mut buf: Vec<u8>) -> io::Result<Vec<u8>> {
Comment thread
ADD-SP marked this conversation as resolved.
let mut offset = 0;
let start_cap = buf.capacity();

loop {
if buf.len() == buf.capacity() && buf.capacity() == start_cap && buf.len() >= PROBE_SIZE {
// The buffer might be an exact fit. Let's read into a probe buffer
// and see if it returns `Ok(0)`. If so, we've avoided an
// unnecessary increasing of the capacity. But if not, append the
// probe buffer to the primary buffer and let its capacity grow.
let (r_fd, r_buf, is_eof) = small_probe_read(fd, buf, &mut offset).await?;

if is_eof {
return Ok(r_buf);
}

buf = r_buf;
fd = r_fd;
}

// buf is full, need more capacity
if buf.len() == buf.capacity() {
buf.try_reserve(PROBE_SIZE)?;
}

// prepare the spare capacity to be read into
let buf_len = usize::min(buf.spare_capacity_mut().len(), MAX_READ_SIZE);

// buf_len cannot be greater than u32::MAX because MAX_READ_SIZE
// is less than u32::MAX
let read_len = u32::try_from(buf_len).expect("buf_len must always fit in u32");

// read into spare capacity
let (r_fd, r_buf, is_eof) = op_read(fd, buf, &mut offset, read_len).await?;

if is_eof {
return Ok(r_buf);
}

fd = r_fd;
buf = r_buf;
}
}

// Performs a `PROBE_SIZE`-byte read without growing `buf` beforehand.
//
// The last `PROBE_SIZE` bytes of `buf` are saved to a stack array and the
// buffer is shortened by that amount, so the read can land in the freed tail.
// Afterwards the saved bytes are inserted back in front of whatever the read
// produced, which restores the original byte order.
//
// The caller must ensure `buf.len() >= PROBE_SIZE`.
//
// Returns the file descriptor, the buffer and whether EOF was reached.
async fn small_probe_read(
    fd: OwnedFd,
    mut buf: Vec<u8>,
    offset: &mut u64,
) -> io::Result<(OwnedFd, Vec<u8>, bool)> {
    // Cannot underflow given the length requirement above.
    let tail_start = buf.len() - PROBE_SIZE;

    let mut saved_tail = [0; PROBE_SIZE];
    saved_tail.copy_from_slice(&buf[tail_start..]);

    // Shrinking the length frees exactly PROBE_SIZE bytes of spare capacity
    // for the read to fill.
    buf.truncate(tail_start);

    let (fd, mut buf, is_eof) = op_read(fd, buf, offset, PROBE_SIZE_U32).await?;

    // Make room for the saved tail. If the read returned nothing (for example
    // because the buffer was an exact fit), the space freed by the truncation
    // is still available and this reservation does not allocate.
    buf.try_reserve(PROBE_SIZE)?;
    buf.splice(tail_start..tail_start, saved_tail);

    Ok((fd, buf, is_eof))
}
Comment thread
Daksh14 marked this conversation as resolved.

// Performs a single read of up to `read_len` bytes at `*offset` into the
// buffer, resubmitting the read whenever it fails with `ErrorKind::Interrupted`.
//
// On success `*offset` is advanced by the number of bytes read.
//
// Returns the file descriptor, the buffer and whether EOF was reached (the
// read returned zero bytes).
async fn op_read(
    mut fd: OwnedFd,
    mut buf: Vec<u8>,
    offset: &mut u64,
    read_len: u32,
) -> io::Result<(OwnedFd, Vec<u8>, bool)> {
    loop {
        match Op::read(fd, buf, read_len, *offset).await {
            (Ok(size_read), done_fd, done_buf) => {
                *offset += size_read as u64;

                return Ok((done_fd, done_buf, size_read == 0));
            }
            // Interrupted: take the resources back and try again.
            (Err(e), retry_fd, retry_buf) if e.kind() == ErrorKind::Interrupted => {
                fd = retry_fd;
                buf = retry_buf;
            }
            (Err(e), _, _) => return Err(e),
        }
    }
}
Comment thread
ADD-SP marked this conversation as resolved.
1 change: 1 addition & 0 deletions tokio/src/io/uring/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub(crate) mod open;
pub(crate) mod read;
pub(crate) mod utils;
pub(crate) mod write;
61 changes: 61 additions & 0 deletions tokio/src/io/uring/read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op};

use io_uring::{opcode, types};
use std::io::{self, Error};
use std::os::fd::{AsRawFd, OwnedFd};

// State carried by an io_uring read operation. Both fields are returned to the
// caller as part of the operation's output.
#[derive(Debug)]
pub(crate) struct Read {
// File descriptor the read is issued against.
fd: OwnedFd,
// Destination buffer; the read targets its spare capacity, and its length is
// extended by the number of bytes read when the operation completes successfully.
buf: Vec<u8>,
}

impl Completable for Read {
type Output = (io::Result<u32>, OwnedFd, Vec<u8>);

fn complete(self, cqe: CqeResult) -> Self::Output {
let mut buf = self.buf;

if let Ok(len) = cqe.result {
let new_len = buf.len() + len as usize;
// SAFETY: Kernel read len bytes
unsafe { buf.set_len(new_len) };
}

(cqe.result, self.fd, buf)
}
Comment thread
Daksh14 marked this conversation as resolved.

fn complete_with_error(self, err: Error) -> Self::Output {
(Err(err), self.fd, self.buf)
}
}

impl Cancellable for Read {
// Moves the whole operation state (file descriptor and buffer) into the
// `CancelData::Read` variant.
fn cancel(self) -> CancelData {
CancelData::Read(self)
}
}

impl Op<Read> {
// Submit a request to read a FD at given length and offset into a
// dynamic buffer with uninitialized memory. The read happens on uninitialized
// buffer and no overwriting happens.

// SAFETY: The `len` of the amount to be read and the buffer that is passed
// should have capacity > len.
//
// If `len` read is higher than vector capacity then setting its length by
// the caller in terms of size_read can be unsound.
pub(crate) fn read(fd: OwnedFd, mut buf: Vec<u8>, len: u32, offset: u64) -> Self {
Comment thread
Daksh14 marked this conversation as resolved.
// don't overwrite on already written part
assert!(buf.spare_capacity_mut().len() >= len as usize);
let buf_mut_ptr = buf.spare_capacity_mut().as_mut_ptr().cast();

let read_op = opcode::Read::new(types::Fd(fd.as_raw_fd()), buf_mut_ptr, len)
.offset(offset)
.build();

// SAFETY: Parameters are valid for the entire duration of the operation
unsafe { Op::new(read_op, Read { fd, buf }) }
}
}
2 changes: 2 additions & 0 deletions tokio/src/runtime/driver/op.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::io::uring::open::Open;
use crate::io::uring::read::Read;
use crate::io::uring::write::Write;
use crate::runtime::Handle;

Expand All @@ -17,6 +18,7 @@ use std::task::{Context, Poll, Waker};
pub(crate) enum CancelData {
// One variant per io_uring operation type, each wrapping that operation's
// state. For example, `Cancellable::cancel` on a `Read` yields `Read(..)`.
// NOTE(review): presumably this keeps the operation's resources alive after
// cancellation — confirm against the driver code.
Open(Open),
Write(Write),
Read(Read),
}

#[derive(Debug)]
Expand Down
Loading