From f1a10bc3f65efafb85a0d95ee766350c05bba377 Mon Sep 17 00:00:00 2001 From: Daniel Sedlak Date: Sat, 25 May 2024 13:02:01 +0200 Subject: [PATCH] io: add unidirectional stream structure Up until this patch we had the `DuplexStream` which was backed by two underlying pipes. This patch makes public this underlying structure and renames it from the `Pipe` to the `SimplexStream` to provent name confusion and keep simililiarity with the already existing duplex. --- tokio/src/io/mod.rs | 4 +-- tokio/src/io/util/mem.rs | 72 ++++++++++++++++++++++++++++++++-------- tokio/src/io/util/mod.rs | 2 +- 3 files changed, 61 insertions(+), 17 deletions(-) diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index b35a20dd35b..99cabde0ab8 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -271,8 +271,8 @@ cfg_io_util! { pub(crate) mod seek; pub(crate) mod util; pub use util::{ - copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, - BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, + copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, simplex, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, + BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, SimplexStream, }; } diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 96676e64cff..0f958485c5c 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -1,6 +1,6 @@ //! In-process memory IO types. -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{split, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf}; use crate::loom::sync::Mutex; use bytes::{Buf, BytesMut}; @@ -47,15 +47,31 @@ use std::{ #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub struct DuplexStream { - read: Arc>, - write: Arc>, + read: Arc>, + write: Arc>, } -/// A unidirectional IO over a piece of memory. +/// A unidirectional pipe to read and write bytes in memory. /// -/// Data can be written to the pipe, and reading will return that data. +/// # Example +/// +/// ``` +/// # async fn ex() -> std::io::Result<()> { +/// # use tokio::io::{AsyncReadExt, AsyncWriteExt}; +/// let (mut receiver, mut sender) = tokio::io::simplex(64); +/// +/// sender.write_all(b"ping").await?; +/// +/// let mut buf = [0u8; 4]; +/// receiver.read_exact(&mut buf).await?; +/// assert_eq!(&buf, b"ping"); +/// +/// # Ok(()) +/// # } +/// ``` #[derive(Debug)] -struct Pipe { +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] +pub struct SimplexStream { /// The buffer storing the bytes written, also read from. /// /// Using a `BytesMut` because it has efficient `Buf` and `BufMut` @@ -83,8 +99,8 @@ struct Pipe { /// written to a side before the write returns `Poll::Pending`. #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) { - let one = Arc::new(Mutex::new(Pipe::new(max_buf_size))); - let two = Arc::new(Mutex::new(Pipe::new(max_buf_size))); + let one = Arc::new(Mutex::new(SimplexStream::new(max_buf_size))); + let two = Arc::new(Mutex::new(SimplexStream::new(max_buf_size))); ( DuplexStream { @@ -161,11 +177,39 @@ impl Drop for DuplexStream { } } -// ===== impl Pipe ===== +// ===== impl SimplexStream ===== + +/// Creates unidirectional buffer that acts like a pipe. +/// +/// The `max_buf_size` argument is the maximum amount of bytes that can be +/// written to a buffer before the it returns `Poll::Pending`. +/// +/// # Unify reader and writer +/// +/// The reader and writer half can be unsplit into a single structure +/// of `SimplexStream` that supports both reading and writing. +/// +/// ``` +/// # async fn ex() -> std::io::Result<()> { +/// # use tokio::io::{AsyncReadExt, AsyncWriteExt}; +/// let (writer, reader) = tokio::io::simplex(64); +/// let mut simplex_stream = writer.unsplit(reader); +/// simplex_stream.write_all(b"hello").await?; +/// +/// let mut buf = [0u8; 5]; +/// simplex_stream.read_exact(&mut buf).await?; +/// assert_eq!(&buf, b"hello"); +/// # Ok(()) +/// # } +/// ``` +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] +pub fn simplex(max_buf_size: usize) -> (ReadHalf, WriteHalf) { + split(SimplexStream::new(max_buf_size)) +} -impl Pipe { - fn new(max_buf_size: usize) -> Self { - Pipe { +impl SimplexStream { + fn new(max_buf_size: usize) -> SimplexStream { + SimplexStream { buffer: BytesMut::new(), is_closed: false, max_buf_size, @@ -269,7 +313,7 @@ impl Pipe { } } -impl AsyncRead for Pipe { +impl AsyncRead for SimplexStream { cfg_coop! { fn poll_read( self: Pin<&mut Self>, @@ -299,7 +343,7 @@ impl AsyncRead for Pipe { } } -impl AsyncWrite for Pipe { +impl AsyncWrite for SimplexStream { cfg_coop! { fn poll_write( self: Pin<&mut Self>, diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 5010fc01d29..b2f8618c7ee 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -42,7 +42,7 @@ cfg_io_util! { pub use lines::Lines; mod mem; - pub use mem::{duplex, DuplexStream}; + pub use mem::{duplex, simplex, DuplexStream, SimplexStream}; mod read; mod read_buf;