|
1 | 1 | //! In-process memory IO types.
|
2 | 2 |
|
3 |
| -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; |
| 3 | +use crate::io::{split, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf}; |
4 | 4 | use crate::loom::sync::Mutex;
|
5 | 5 |
|
6 | 6 | use bytes::{Buf, BytesMut};
|
@@ -47,15 +47,31 @@ use std::{
|
47 | 47 | #[derive(Debug)]
|
48 | 48 | #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
|
49 | 49 | pub struct DuplexStream {
|
50 |
| - read: Arc<Mutex<Pipe>>, |
51 |
| - write: Arc<Mutex<Pipe>>, |
| 50 | + read: Arc<Mutex<SimplexStream>>, |
| 51 | + write: Arc<Mutex<SimplexStream>>, |
52 | 52 | }
|
53 | 53 |
|
54 |
| -/// A unidirectional IO over a piece of memory. |
| 54 | +/// A unidirectional pipe to read and write bytes in memory. |
55 | 55 | ///
|
56 |
| -/// Data can be written to the pipe, and reading will return that data. |
| 56 | +/// # Example |
| 57 | +/// |
| 58 | +/// ``` |
| 59 | +/// # async fn ex() -> std::io::Result<()> { |
| 60 | +/// # use tokio::io::{AsyncReadExt, AsyncWriteExt}; |
| 61 | +/// let (mut receiver, mut sender) = tokio::io::simplex(64); |
| 62 | +/// |
| 63 | +/// sender.write_all(b"ping").await?; |
| 64 | +/// |
| 65 | +/// let mut buf = [0u8; 4]; |
| 66 | +/// receiver.read_exact(&mut buf).await?; |
| 67 | +/// assert_eq!(&buf, b"ping"); |
| 68 | +/// |
| 69 | +/// # Ok(()) |
| 70 | +/// # } |
| 71 | +/// ``` |
57 | 72 | #[derive(Debug)]
|
58 |
| -struct Pipe { |
| 73 | +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] |
| 74 | +pub struct SimplexStream { |
59 | 75 | /// The buffer storing the bytes written, also read from.
|
60 | 76 | ///
|
61 | 77 | /// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
|
@@ -83,8 +99,8 @@ struct Pipe {
|
83 | 99 | /// written to a side before the write returns `Poll::Pending`.
|
84 | 100 | #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
|
85 | 101 | pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) {
|
86 |
| - let one = Arc::new(Mutex::new(Pipe::new(max_buf_size))); |
87 |
| - let two = Arc::new(Mutex::new(Pipe::new(max_buf_size))); |
| 102 | + let one = Arc::new(Mutex::new(SimplexStream::new(max_buf_size))); |
| 103 | + let two = Arc::new(Mutex::new(SimplexStream::new(max_buf_size))); |
88 | 104 |
|
89 | 105 | (
|
90 | 106 | DuplexStream {
|
@@ -161,11 +177,39 @@ impl Drop for DuplexStream {
|
161 | 177 | }
|
162 | 178 | }
|
163 | 179 |
|
164 |
| -// ===== impl Pipe ===== |
| 180 | +// ===== impl SimplexStream ===== |
| 181 | + |
| 182 | +/// Creates unidirectional buffer that acts like a pipe. |
| 183 | +/// |
| 184 | +/// The `max_buf_size` argument is the maximum amount of bytes that can be |
| 185 | +/// written to a buffer before the it returns `Poll::Pending`. |
| 186 | +/// |
| 187 | +/// # Unsplit |
| 188 | +/// |
| 189 | +/// The reader and writer half can be unsplit into a single structure |
| 190 | +/// of `SimplexStream` that supports both reading and writing. |
| 191 | +/// |
| 192 | +/// ``` |
| 193 | +/// # async fn ex() -> std::io::Result<()> { |
| 194 | +/// # use tokio::io::{AsyncReadExt, AsyncWriteExt}; |
| 195 | +/// let (writer, reader) = tokio::io::simplex(64); |
| 196 | +/// let mut simplex_stream = writer.unsplit(reader); |
| 197 | +/// simplex_stream.write_all(b"hello").await?; |
| 198 | +/// |
| 199 | +/// let mut buf = [0u8; 5]; |
| 200 | +/// simplex_stream.read_exact(&mut buf).await?; |
| 201 | +/// assert_eq!(&buf, b"hello"); |
| 202 | +/// # Ok(()) |
| 203 | +/// # } |
| 204 | +/// ``` |
| 205 | +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] |
| 206 | +pub fn simplex(max_buf_size: usize) -> (ReadHalf<SimplexStream>, WriteHalf<SimplexStream>) { |
| 207 | + split(SimplexStream::new(max_buf_size)) |
| 208 | +} |
165 | 209 |
|
166 |
| -impl Pipe { |
167 |
| - fn new(max_buf_size: usize) -> Self { |
168 |
| - Pipe { |
| 210 | +impl SimplexStream { |
| 211 | + fn new(max_buf_size: usize) -> SimplexStream { |
| 212 | + SimplexStream { |
169 | 213 | buffer: BytesMut::new(),
|
170 | 214 | is_closed: false,
|
171 | 215 | max_buf_size,
|
@@ -269,7 +313,7 @@ impl Pipe {
|
269 | 313 | }
|
270 | 314 | }
|
271 | 315 |
|
272 |
| -impl AsyncRead for Pipe { |
| 316 | +impl AsyncRead for SimplexStream { |
273 | 317 | cfg_coop! {
|
274 | 318 | fn poll_read(
|
275 | 319 | self: Pin<&mut Self>,
|
@@ -299,7 +343,7 @@ impl AsyncRead for Pipe {
|
299 | 343 | }
|
300 | 344 | }
|
301 | 345 |
|
302 |
| -impl AsyncWrite for Pipe { |
| 346 | +impl AsyncWrite for SimplexStream { |
303 | 347 | cfg_coop! {
|
304 | 348 | fn poll_write(
|
305 | 349 | self: Pin<&mut Self>,
|
|
0 commit comments