Skip to content

Commit 45fb332

Browse files
committed
io: add unidirectional stream structure
Up until this patch we had the `DuplexStream` which was backed by two underlying pipes. This patch makes public this underlying structure and renames it from the `Pipe` to the `SimplexStream` to provent name confusion and keep simililiarity with the already existing duplex.
1 parent ab53bf0 commit 45fb332

File tree

3 files changed

+47
-24
lines changed

3 files changed

+47
-24
lines changed

tokio/src/io/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,8 @@ cfg_io_util! {
271271
pub(crate) mod seek;
272272
pub(crate) mod util;
273273
pub use util::{
274-
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
275-
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take,
274+
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, simplex, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
275+
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, SimplexStream,
276276
};
277277
}
278278

tokio/src/io/util/mem.rs

+44-21
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! In-process memory IO types.
22
3-
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
3+
use crate::io::{split, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf};
44
use crate::loom::sync::Mutex;
55

66
use bytes::{Buf, BytesMut};
@@ -47,15 +47,31 @@ use std::{
4747
#[derive(Debug)]
4848
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
4949
pub struct DuplexStream {
50-
read: Arc<Mutex<Pipe>>,
51-
write: Arc<Mutex<Pipe>>,
50+
read: Arc<Mutex<SimplexStream>>,
51+
write: Arc<Mutex<SimplexStream>>,
5252
}
5353

54-
/// A unidirectional IO over a piece of memory.
54+
/// A unidirectional pipe to read and write bytes in memory.
5555
///
56-
/// Data can be written to the pipe, and reading will return that data.
56+
/// # Example
57+
///
58+
/// ```
59+
/// # async fn ex() -> std::io::Result<()> {
60+
/// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
61+
/// let (mut receiver, mut sender) = tokio::io::simplex(64);
62+
///
63+
/// sender.write_all(b"ping").await?;
64+
///
65+
/// let mut buf = [0u8; 4];
66+
/// receiver.read_exact(&mut buf).await?;
67+
/// assert_eq!(&buf, b"ping");
68+
///
69+
/// # Ok(())
70+
/// # }
71+
/// ```
5772
#[derive(Debug)]
58-
struct Pipe {
73+
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
74+
pub struct SimplexStream {
5975
/// The buffer storing the bytes written, also read from.
6076
///
6177
/// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
@@ -83,8 +99,10 @@ struct Pipe {
8399
/// written to a side before the write returns `Poll::Pending`.
84100
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
85101
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) {
86-
let one = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
87-
let two = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
102+
let (reader, writer) = simplex(max_buf_size);
103+
let one = Arc::new(Mutex::new(reader.unsplit(writer)));
104+
let (reader, writer) = simplex(max_buf_size);
105+
let two = Arc::new(Mutex::new(reader.unsplit(writer)));
88106

89107
(
90108
DuplexStream {
@@ -161,19 +179,24 @@ impl Drop for DuplexStream {
161179
}
162180
}
163181

164-
// ===== impl Pipe =====
182+
// ===== impl SimplexStream =====
165183

166-
impl Pipe {
167-
fn new(max_buf_size: usize) -> Self {
168-
Pipe {
169-
buffer: BytesMut::new(),
170-
is_closed: false,
171-
max_buf_size,
172-
read_waker: None,
173-
write_waker: None,
174-
}
175-
}
184+
/// Creates unidirectional buffer that acts like a pipe.
185+
///
186+
/// The `max_buf_size` argument is the maximum amount of bytes that can be
187+
/// written to a buffer before the it returns `Poll::Pending`.
188+
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
189+
pub fn simplex(max_buf_size: usize) -> (ReadHalf<SimplexStream>, WriteHalf<SimplexStream>) {
190+
split(SimplexStream {
191+
buffer: BytesMut::new(),
192+
is_closed: false,
193+
max_buf_size,
194+
read_waker: None,
195+
write_waker: None,
196+
})
197+
}
176198

199+
impl SimplexStream {
177200
fn close_write(&mut self) {
178201
self.is_closed = true;
179202
// needs to notify any readers that no more data will come
@@ -269,7 +292,7 @@ impl Pipe {
269292
}
270293
}
271294

272-
impl AsyncRead for Pipe {
295+
impl AsyncRead for SimplexStream {
273296
cfg_coop! {
274297
fn poll_read(
275298
self: Pin<&mut Self>,
@@ -299,7 +322,7 @@ impl AsyncRead for Pipe {
299322
}
300323
}
301324

302-
impl AsyncWrite for Pipe {
325+
impl AsyncWrite for SimplexStream {
303326
cfg_coop! {
304327
fn poll_write(
305328
self: Pin<&mut Self>,

tokio/src/io/util/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ cfg_io_util! {
4242
pub use lines::Lines;
4343

4444
mod mem;
45-
pub use mem::{duplex, DuplexStream};
45+
pub use mem::{duplex, simplex, DuplexStream, SimplexStream};
4646

4747
mod read;
4848
mod read_buf;

0 commit comments

Comments
 (0)