Skip to content

Commit 91b3e84

Browse files
committed
io: add unidirectional stream structure
Up until this patch we had the `DuplexStream` which was backed by two underlying pipes. This patch makes public this underlying structure and renames it from the `Pipe` to the `SimplexStream` to provent name confusion and keep simililiarity with the already existing duplex.
1 parent cba86cf commit 91b3e84

File tree

3 files changed

+47
-22
lines changed

3 files changed

+47
-22
lines changed

tokio/src/io/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,8 @@ cfg_io_util! {
271271
pub(crate) mod seek;
272272
pub(crate) mod util;
273273
pub use util::{
274-
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
275-
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take,
274+
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, simplex, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
275+
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, SimplexStream,
276276
};
277277
}
278278

tokio/src/io/util/mem.rs

+44-19
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,35 @@ use std::{
4747
#[derive(Debug)]
4848
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
4949
pub struct DuplexStream {
50-
read: Arc<Mutex<Pipe>>,
51-
write: Arc<Mutex<Pipe>>,
50+
read: Arc<Mutex<SimplexStream>>,
51+
write: Arc<Mutex<SimplexStream>>,
5252
}
5353

54-
/// A unidirectional IO over a piece of memory.
54+
/// A unidirectional pipe to read and write bytes in memory.
5555
///
56-
/// Data can be written to the pipe, and reading will return that data.
56+
/// Writing to the `SimplexStream` will allow that data to be read again, it is used as in-memory IO type.
57+
///
58+
/// You can [`split`](crate::io::split()) the `SimplexStream` so you can have a separate read and write handle.
59+
///
60+
/// # Example
61+
///
62+
/// ```
63+
/// # async fn ex() -> std::io::Result<()> {
64+
/// # use tokio::io::{AsyncReadExt, AsyncWriteExt, split};
65+
/// let (mut receiver, mut sender) = split(tokio::io::simplex(64));
66+
///
67+
/// sender.write_all(b"ping").await?;
68+
///
69+
/// let mut buf = [0u8; 4];
70+
/// receiver.read_exact(&mut buf).await?;
71+
/// assert_eq!(&buf, b"ping");
72+
///
73+
/// # Ok(())
74+
/// # }
75+
/// ```
5776
#[derive(Debug)]
58-
struct Pipe {
77+
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
78+
pub struct SimplexStream {
5979
/// The buffer storing the bytes written, also read from.
6080
///
6181
/// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
@@ -83,8 +103,8 @@ struct Pipe {
83103
/// written to a side before the write returns `Poll::Pending`.
84104
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
85105
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) {
86-
let one = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
87-
let two = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
106+
let one = Arc::new(Mutex::new(simplex(max_buf_size)));
107+
let two = Arc::new(Mutex::new(simplex(max_buf_size)));
88108

89109
(
90110
DuplexStream {
@@ -161,19 +181,24 @@ impl Drop for DuplexStream {
161181
}
162182
}
163183

164-
// ===== impl Pipe =====
184+
// ===== impl SimplexStream =====
165185

166-
impl Pipe {
167-
fn new(max_buf_size: usize) -> Self {
168-
Pipe {
169-
buffer: BytesMut::new(),
170-
is_closed: false,
171-
max_buf_size,
172-
read_waker: None,
173-
write_waker: None,
174-
}
186+
/// Creates unidirectional buffer that acts like pair of connected single dicrection sockets.
187+
///
188+
/// The `max_buf_size` argument is the maximum amount of bytes that can be
189+
/// written to a buffer before the it returns `Poll::Pending`.
190+
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
191+
pub fn simplex(max_buf_size: usize) -> SimplexStream {
192+
SimplexStream {
193+
buffer: BytesMut::new(),
194+
is_closed: false,
195+
max_buf_size,
196+
read_waker: None,
197+
write_waker: None,
175198
}
199+
}
176200

201+
impl SimplexStream {
177202
fn close_write(&mut self) {
178203
self.is_closed = true;
179204
// needs to notify any readers that no more data will come
@@ -269,7 +294,7 @@ impl Pipe {
269294
}
270295
}
271296

272-
impl AsyncRead for Pipe {
297+
impl AsyncRead for SimplexStream {
273298
cfg_coop! {
274299
fn poll_read(
275300
self: Pin<&mut Self>,
@@ -299,7 +324,7 @@ impl AsyncRead for Pipe {
299324
}
300325
}
301326

302-
impl AsyncWrite for Pipe {
327+
impl AsyncWrite for SimplexStream {
303328
cfg_coop! {
304329
fn poll_write(
305330
self: Pin<&mut Self>,

tokio/src/io/util/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ cfg_io_util! {
4242
pub use lines::Lines;
4343

4444
mod mem;
45-
pub use mem::{duplex, DuplexStream};
45+
pub use mem::{duplex, simplex, DuplexStream, SimplexStream};
4646

4747
mod read;
4848
mod read_buf;

0 commit comments

Comments
 (0)