Skip to content

Commit 7cddcfa

Browse files
committed
io: add unidirectional stream structure
Up until this patch we had the `DuplexStream` which was backed by two underlying pipes. This patch makes public this underlying structure and renames it from the `Pipe` to the `SimplexStream` to provent name confusion and keep simililiarity with the already existing duplex.
1 parent 5b9a290 commit 7cddcfa

File tree

3 files changed

+71
-17
lines changed

3 files changed

+71
-17
lines changed

tokio/src/io/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,8 @@ cfg_io_util! {
271271
pub(crate) mod seek;
272272
pub(crate) mod util;
273273
pub use util::{
274-
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
275-
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take,
274+
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, simplex, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
275+
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, SimplexStream,
276276
};
277277
}
278278

tokio/src/io/util/mem.rs

+68-14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! In-process memory IO types.
22
3-
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
3+
use crate::io::{split, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf};
44
use crate::loom::sync::Mutex;
55

66
use bytes::{Buf, BytesMut};
@@ -47,15 +47,33 @@ use std::{
4747
#[derive(Debug)]
4848
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
4949
pub struct DuplexStream {
50-
read: Arc<Mutex<Pipe>>,
51-
write: Arc<Mutex<Pipe>>,
50+
read: Arc<Mutex<SimplexStream>>,
51+
write: Arc<Mutex<SimplexStream>>,
5252
}
5353

54-
/// A unidirectional IO over a piece of memory.
54+
/// A unidirectional pipe to read and write bytes in memory. It can be constructed
55+
/// by [`simplex`] function which will crete a pair of reader and writer or by
56+
/// calling [`SimplexStream::new_unsplit`] that will create a handle for both
57+
/// reading and writing.
5558
///
56-
/// Data can be written to the pipe, and reading will return that data.
59+
/// # Example
60+
///
61+
/// ```
62+
/// # async fn ex() -> std::io::Result<()> {
63+
/// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
64+
/// let (mut receiver, mut sender) = tokio::io::simplex(64);
65+
///
66+
/// sender.write_all(b"ping").await?;
67+
///
68+
/// let mut buf = [0u8; 4];
69+
/// receiver.read_exact(&mut buf).await?;
70+
/// assert_eq!(&buf, b"ping");
71+
/// # Ok(())
72+
/// # }
73+
/// ```
5774
#[derive(Debug)]
58-
struct Pipe {
75+
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
76+
pub struct SimplexStream {
5977
/// The buffer storing the bytes written, also read from.
6078
///
6179
/// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
@@ -83,8 +101,8 @@ struct Pipe {
83101
/// written to a side before the write returns `Poll::Pending`.
84102
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
85103
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) {
86-
let one = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
87-
let two = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
104+
let one = Arc::new(Mutex::new(SimplexStream::new_unsplit(max_buf_size)));
105+
let two = Arc::new(Mutex::new(SimplexStream::new_unsplit(max_buf_size)));
88106

89107
(
90108
DuplexStream {
@@ -161,11 +179,47 @@ impl Drop for DuplexStream {
161179
}
162180
}
163181

164-
// ===== impl Pipe =====
182+
// ===== impl SimplexStream =====
165183

166-
impl Pipe {
167-
fn new(max_buf_size: usize) -> Self {
168-
Pipe {
184+
/// Creates unidirectional buffer that acts like in memory pipe.
185+
///
186+
/// The `max_buf_size` argument is the maximum amount of bytes that can be
187+
/// written to a buffer before the it returns `Poll::Pending`.
188+
///
189+
/// # Unify reader and writer
190+
///
191+
/// The reader and writer half can be unified into a single structure
192+
/// of `SimplexStream` that supports both reading and writing or
193+
/// the `SimplexStream` can be already created as unified structure
194+
/// using [`SimplexStream::new_unsplit()`].
195+
///
196+
/// ```
197+
/// # async fn ex() -> std::io::Result<()> {
198+
/// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
199+
/// let (writer, reader) = tokio::io::simplex(64);
200+
/// let mut simplex_stream = writer.unsplit(reader);
201+
/// simplex_stream.write_all(b"hello").await?;
202+
///
203+
/// let mut buf = [0u8; 5];
204+
/// simplex_stream.read_exact(&mut buf).await?;
205+
/// assert_eq!(&buf, b"hello");
206+
/// # Ok(())
207+
/// # }
208+
/// ```
209+
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
210+
pub fn simplex(max_buf_size: usize) -> (ReadHalf<SimplexStream>, WriteHalf<SimplexStream>) {
211+
split(SimplexStream::new_unsplit(max_buf_size))
212+
}
213+
214+
impl SimplexStream {
215+
/// Creates unidirectional buffer that acts like in memory pipe. To create split
216+
/// version with separate reader and writer you can use [`simplex`] function.
217+
///
218+
/// The `max_buf_size` argument is the maximum amount of bytes that can be
219+
/// written to a buffer before the it returns `Poll::Pending`.
220+
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
221+
pub fn new_unsplit(max_buf_size: usize) -> SimplexStream {
222+
SimplexStream {
169223
buffer: BytesMut::new(),
170224
is_closed: false,
171225
max_buf_size,
@@ -269,7 +323,7 @@ impl Pipe {
269323
}
270324
}
271325

272-
impl AsyncRead for Pipe {
326+
impl AsyncRead for SimplexStream {
273327
cfg_coop! {
274328
fn poll_read(
275329
self: Pin<&mut Self>,
@@ -299,7 +353,7 @@ impl AsyncRead for Pipe {
299353
}
300354
}
301355

302-
impl AsyncWrite for Pipe {
356+
impl AsyncWrite for SimplexStream {
303357
cfg_coop! {
304358
fn poll_write(
305359
self: Pin<&mut Self>,

tokio/src/io/util/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ cfg_io_util! {
4242
pub use lines::Lines;
4343

4444
mod mem;
45-
pub use mem::{duplex, DuplexStream};
45+
pub use mem::{duplex, simplex, DuplexStream, SimplexStream};
4646

4747
mod read;
4848
mod read_buf;

0 commit comments

Comments
 (0)