Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposing new AsyncRead / AsyncWrite traits #1744

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ futures-sink-preview = "=0.3.0-alpha.19"
futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "channel"] }

# Everything else is optional...
bytes = { version = "0.4", optional = true }
bytes = { git = "https://github.com/tokio-rs/bytes", optional = true }
fnv = { version = "1.0.6", optional = true }
iovec = { version = "0.1", optional = true }
lazy_static = { version = "1.0.2", optional = true }
Expand Down
44 changes: 25 additions & 19 deletions tokio/src/fs/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::fs::sys;
use crate::io::{AsyncRead, AsyncWrite};

use bytes::{Buf, BufMut};
use futures_core::ready;
use std::cmp;
use std::future::Future;
Expand All @@ -22,7 +23,7 @@ pub(crate) struct Blocking<T> {
}

#[derive(Debug)]
pub(crate) struct Buf {
pub(crate) struct FsBuf {
buf: Vec<u8>,
pos: usize,
}
Expand All @@ -31,15 +32,15 @@ pub(crate) const MAX_BUF: usize = 16 * 1024;

#[derive(Debug)]
enum State<T> {
Idle(Option<Buf>),
Busy(sys::Blocking<(io::Result<usize>, Buf, T)>),
Idle(Option<FsBuf>),
Busy(sys::Blocking<(io::Result<usize>, FsBuf, T)>),
}

impl<T> Blocking<T> {
pub(crate) fn new(inner: T) -> Blocking<T> {
Blocking {
inner: Some(inner),
state: State::Idle(Some(Buf::with_capacity(0))),
state: State::Idle(Some(FsBuf::with_capacity(0))),
need_flush: false,
}
}
Expand All @@ -52,7 +53,7 @@ where
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
dst: &mut [u8],
dst: &mut dyn BufMut,
) -> Poll<io::Result<usize>> {
loop {
match self.state {
Expand Down Expand Up @@ -103,7 +104,7 @@ where
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
src: &[u8],
src: &mut dyn Buf,
) -> Poll<io::Result<usize>> {
loop {
match self.state {
Expand Down Expand Up @@ -186,9 +187,9 @@ macro_rules! uninterruptibly {
}};
}

impl Buf {
pub(crate) fn with_capacity(n: usize) -> Buf {
Buf {
impl FsBuf {
pub(crate) fn with_capacity(n: usize) -> FsBuf {
FsBuf {
buf: Vec::with_capacity(n),
pos: 0,
}
Expand All @@ -202,9 +203,9 @@ impl Buf {
self.buf.len() - self.pos
}

pub(crate) fn copy_to(&mut self, dst: &mut [u8]) -> usize {
let n = cmp::min(self.len(), dst.len());
dst[..n].copy_from_slice(&self.bytes()[..n]);
pub(crate) fn copy_to(&mut self, dst: &mut dyn BufMut) -> usize {
let n = cmp::min(self.len(), dst.remaining_mut());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could use bytes_mut() slice?

Consider it here and in code below where it can lower number of dynamic method dispatches

dst.put_slice(&self.bytes()[..n]);
self.pos += n;

if self.pos == self.buf.len() {
Expand All @@ -215,23 +216,28 @@ impl Buf {
n
}

pub(crate) fn copy_from(&mut self, src: &[u8]) -> usize {
pub(crate) fn copy_from(&mut self, src: &mut dyn Buf) -> usize {
assert!(self.is_empty());

let n = cmp::min(src.len(), MAX_BUF);

self.buf.extend_from_slice(&src[..n]);
n
if src.remaining() > MAX_BUF {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Store src.remaining() into variable before if?

use bytes::buf::BufExt;
self.buf.put(src.take(MAX_BUF));
MAX_BUF
} else {
let n = src.remaining();
self.buf.put(src);
n
}
}

pub(crate) fn bytes(&self) -> &[u8] {
&self.buf[self.pos..]
}

pub(crate) fn ensure_capacity_for(&mut self, bytes: &[u8]) {
pub(crate) fn ensure_capacity_for(&mut self, bytes: &dyn BufMut) {
assert!(self.is_empty());

let len = cmp::min(bytes.len(), MAX_BUF);
let len = cmp::min(bytes.remaining_mut(), MAX_BUF);

if self.buf.len() < len {
self.buf.reserve(len - self.buf.len());
Expand Down
13 changes: 7 additions & 6 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
//! [`File`]: file/struct.File.html

use self::State::*;
use crate::fs::blocking::Buf;
use crate::fs::blocking::FsBuf;
use crate::fs::{asyncify, sys};
use crate::io::{AsyncRead, AsyncWrite};

use bytes::{Buf, BufMut};
use futures_core::ready;
use std::fmt;
use std::fs::{Metadata, Permissions};
Expand Down Expand Up @@ -75,8 +76,8 @@ pub struct File {

#[derive(Debug)]
enum State {
Idle(Option<Buf>),
Busy(sys::Blocking<(Operation, Buf)>),
Idle(Option<FsBuf>),
Busy(sys::Blocking<(Operation, FsBuf)>),
}

#[derive(Debug)]
Expand Down Expand Up @@ -172,7 +173,7 @@ impl File {
pub fn from_std(std: sys::File) -> File {
File {
std: Arc::new(std),
state: State::Idle(Some(Buf::with_capacity(0))),
state: State::Idle(Some(FsBuf::with_capacity(0))),
last_write_err: None,
}
}
Expand Down Expand Up @@ -442,7 +443,7 @@ impl AsyncRead for File {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
dst: &mut [u8],
dst: &mut dyn BufMut,
) -> Poll<io::Result<usize>> {
loop {
match self.state {
Expand Down Expand Up @@ -504,7 +505,7 @@ impl AsyncWrite for File {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
src: &[u8],
src: &mut dyn Buf,
) -> Poll<io::Result<usize>> {
if let Some(e) = self.last_write_err.take() {
return Ready(Err(e.into()));
Expand Down
121 changes: 28 additions & 93 deletions tokio/src/io/async_read.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use bytes::BufMut;
use futures_core::ready;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;
Expand Down Expand Up @@ -31,42 +30,6 @@ use std::task::{Context, Poll};
/// This trait importantly means that the `read` method only works in the
/// context of a future's task. The object may panic if used outside of a task.
pub trait AsyncRead {
/// Prepares an uninitialized buffer to be safe to pass to `read`. Returns
/// `true` if the supplied buffer was zeroed out.
///
/// While it would be highly unusual, implementations of [`io::Read`] are
/// able to read data from the buffer passed as an argument. Because of
/// this, the buffer passed to [`io::Read`] must be initialized memory. In
/// situations where large numbers of buffers are used, constantly having to
/// zero out buffers can be expensive.
///
/// This function does any necessary work to prepare an uninitialized buffer
/// to be safe to pass to `read`. If `read` guarantees to never attempt to
/// read data out of the supplied buffer, then `prepare_uninitialized_buffer`
/// doesn't need to do any work.
///
/// If this function returns `true`, then the memory has been zeroed out.
/// This allows implementations of `AsyncRead` which are composed of
/// multiple subimplementations to efficiently implement
/// `prepare_uninitialized_buffer`.
///
/// This function isn't actually `unsafe` to call but `unsafe` to implement.
/// The implementer must ensure that either the whole `buf` has been zeroed
/// or `poll_read_buf()` overwrites the buffer without reading it and returns
/// correct value.
///
/// This function is called from [`poll_read_buf`].
///
/// [`io::Read`]: std::io::Read
/// [`poll_read_buf`]: #method.poll_read_buf
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
for x in buf {
*x = 0;
}

true
}

/// Attempt to read from the `AsyncRead` into `buf`.
///
/// On success, returns `Poll::Ready(Ok(num_bytes_read))`.
Expand All @@ -78,49 +41,13 @@ pub trait AsyncRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
buf: &mut dyn BufMut,
) -> Poll<io::Result<usize>>;

/// Pull some bytes from this source into the specified `BufMut`, returning
/// how many bytes were read.
///
/// The `buf` provided will have bytes read into it and the internal cursor
/// will be advanced if any bytes were read. Note that this method typically
/// will not reallocate the buffer provided.
fn poll_read_buf<B: BufMut>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

unsafe {
let n = {
let b = buf.bytes_mut();

self.prepare_uninitialized_buffer(b);

ready!(self.poll_read(cx, b))?
};

buf.advance_mut(n);
Poll::Ready(Ok(n))
}
}
}

macro_rules! deref_async_read {
() => {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
(**self).prepare_uninitialized_buffer(buf)
}

fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut dyn BufMut)
-> Poll<io::Result<usize>>
{
Pin::new(&mut **self).poll_read(cx, buf)
Expand All @@ -141,43 +68,51 @@ where
P: DerefMut + Unpin,
P::Target: AsyncRead,
{
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
(**self).prepare_uninitialized_buffer(buf)
}

fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
buf: &mut dyn BufMut,
) -> Poll<io::Result<usize>> {
self.get_mut().as_mut().poll_read(cx, buf)
}
}

impl AsyncRead for &[u8] {
unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool {
false
}

fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8],
buf: &mut dyn BufMut,
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Read::read(self.get_mut(), buf))
if self.len() > buf.remaining_mut() {
let n = buf.remaining_mut();
let (a, b) = self.split_at(n);
buf.put_slice(a);
*self.get_mut() = b;
Poll::Ready(Ok(n))
} else {
let n = self.len();
buf.put_slice(&*self);
Poll::Ready(Ok(n))
}
}
}

impl<T: AsRef<[u8]> + Unpin> AsyncRead for io::Cursor<T> {
unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool {
false
}

fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8],
mut buf: &mut dyn BufMut,
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Read::read(self.get_mut(), buf))
use bytes::{buf::BufExt, Buf};

if self.as_mut().get_mut().remaining() > buf.remaining_mut() {
let n = buf.remaining_mut();
BufMut::put(&mut buf, self.as_mut().get_mut().take(n));
Poll::Ready(Ok(n))
} else {
let n = self.as_mut().get_mut().remaining();
BufMut::put(&mut buf, self.get_mut());
Poll::Ready(Ok(n))
}
}
}
Loading