Skip to content

Commit bd15bac

Browse files
authored
io: add tokio::io::Join (#6220)
1 parent 410660f commit bd15bac

File tree

3 files changed

+202
-0
lines changed

3 files changed

+202
-0
lines changed

tokio/src/io/join.rs

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
//! Join two values implementing `AsyncRead` and `AsyncWrite` into a single one.
2+
3+
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
4+
5+
use std::io;
6+
use std::pin::Pin;
7+
use std::task::{Context, Poll};
8+
9+
/// Join two values implementing `AsyncRead` and `AsyncWrite` into a
10+
/// single handle.
11+
pub fn join<R, W>(reader: R, writer: W) -> Join<R, W>
12+
where
13+
R: AsyncRead,
14+
W: AsyncWrite,
15+
{
16+
Join { reader, writer }
17+
}
18+
19+
pin_project_lite::pin_project! {
20+
/// Joins two values implementing `AsyncRead` and `AsyncWrite` into a
21+
/// single handle.
22+
#[derive(Debug)]
23+
pub struct Join<R, W> {
24+
#[pin]
25+
reader: R,
26+
#[pin]
27+
writer: W,
28+
}
29+
}
30+
31+
impl<R, W> Join<R, W>
32+
where
33+
R: AsyncRead,
34+
W: AsyncWrite,
35+
{
36+
/// Splits this `Join` back into its `AsyncRead` and `AsyncWrite`
37+
/// components.
38+
pub fn into_inner(self) -> (R, W) {
39+
(self.reader, self.writer)
40+
}
41+
42+
/// Returns a reference to the inner reader.
43+
pub fn reader(&self) -> &R {
44+
&self.reader
45+
}
46+
47+
/// Returns a reference to the inner writer.
48+
pub fn writer(&self) -> &W {
49+
&self.writer
50+
}
51+
52+
/// Returns a mutable reference to the inner reader.
53+
pub fn reader_mut(&mut self) -> &mut R {
54+
&mut self.reader
55+
}
56+
57+
/// Returns a mutable reference to the inner writer.
58+
pub fn writer_mut(&mut self) -> &mut W {
59+
&mut self.writer
60+
}
61+
62+
/// Returns a pinned mutable reference to the inner reader.
63+
pub fn reader_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
64+
self.project().reader
65+
}
66+
67+
/// Returns a pinned mutable reference to the inner writer.
68+
pub fn writer_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
69+
self.project().writer
70+
}
71+
}
72+
73+
impl<R, W> AsyncRead for Join<R, W>
74+
where
75+
R: AsyncRead,
76+
{
77+
fn poll_read(
78+
self: Pin<&mut Self>,
79+
cx: &mut Context<'_>,
80+
buf: &mut ReadBuf<'_>,
81+
) -> Poll<Result<(), io::Error>> {
82+
self.project().reader.poll_read(cx, buf)
83+
}
84+
}
85+
86+
impl<R, W> AsyncWrite for Join<R, W>
87+
where
88+
W: AsyncWrite,
89+
{
90+
fn poll_write(
91+
self: Pin<&mut Self>,
92+
cx: &mut Context<'_>,
93+
buf: &[u8],
94+
) -> Poll<Result<usize, io::Error>> {
95+
self.project().writer.poll_write(cx, buf)
96+
}
97+
98+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
99+
self.project().writer.poll_flush(cx)
100+
}
101+
102+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
103+
self.project().writer.poll_shutdown(cx)
104+
}
105+
106+
fn poll_write_vectored(
107+
self: Pin<&mut Self>,
108+
cx: &mut Context<'_>,
109+
bufs: &[io::IoSlice<'_>],
110+
) -> Poll<Result<usize, io::Error>> {
111+
self.project().writer.poll_write_vectored(cx, bufs)
112+
}
113+
114+
fn is_write_vectored(&self) -> bool {
115+
self.writer.is_write_vectored()
116+
}
117+
}

tokio/src/io/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,8 @@ cfg_io_std! {
265265
cfg_io_util! {
266266
mod split;
267267
pub use split::{split, ReadHalf, WriteHalf};
268+
mod join;
269+
pub use join::{join, Join};
268270

269271
pub(crate) mod seek;
270272
pub(crate) mod util;

tokio/tests/io_join.rs

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
#![warn(rust_2018_idioms)]
2+
#![cfg(feature = "full")]
3+
4+
use tokio::io::{join, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Join, ReadBuf};
5+
6+
use std::io;
7+
use std::pin::Pin;
8+
use std::task::{Context, Poll};
9+
10+
struct R;
11+
12+
impl AsyncRead for R {
13+
fn poll_read(
14+
self: Pin<&mut Self>,
15+
_cx: &mut Context<'_>,
16+
buf: &mut ReadBuf<'_>,
17+
) -> Poll<io::Result<()>> {
18+
buf.put_slice(&[b'z']);
19+
Poll::Ready(Ok(()))
20+
}
21+
}
22+
23+
struct W;
24+
25+
impl AsyncWrite for W {
26+
fn poll_write(
27+
self: Pin<&mut Self>,
28+
_cx: &mut Context<'_>,
29+
_buf: &[u8],
30+
) -> Poll<Result<usize, io::Error>> {
31+
Poll::Ready(Ok(1))
32+
}
33+
34+
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
35+
Poll::Ready(Ok(()))
36+
}
37+
38+
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
39+
Poll::Ready(Ok(()))
40+
}
41+
42+
fn poll_write_vectored(
43+
self: Pin<&mut Self>,
44+
_cx: &mut Context<'_>,
45+
_bufs: &[io::IoSlice<'_>],
46+
) -> Poll<Result<usize, io::Error>> {
47+
Poll::Ready(Ok(2))
48+
}
49+
50+
fn is_write_vectored(&self) -> bool {
51+
true
52+
}
53+
}
54+
55+
#[test]
56+
fn is_send_and_sync() {
57+
fn assert_bound<T: Send + Sync>() {}
58+
59+
assert_bound::<Join<W, R>>();
60+
}
61+
62+
#[test]
63+
fn method_delegation() {
64+
let mut rw = join(R, W);
65+
let mut buf = [0; 1];
66+
67+
tokio_test::block_on(async move {
68+
assert_eq!(1, rw.read(&mut buf).await.unwrap());
69+
assert_eq!(b'z', buf[0]);
70+
71+
assert_eq!(1, rw.write(&[b'x']).await.unwrap());
72+
assert_eq!(
73+
2,
74+
rw.write_vectored(&[io::IoSlice::new(&[b'x'])])
75+
.await
76+
.unwrap()
77+
);
78+
assert!(rw.is_write_vectored());
79+
80+
assert!(rw.flush().await.is_ok());
81+
assert!(rw.shutdown().await.is_ok());
82+
});
83+
}

0 commit comments

Comments
 (0)