Skip to content

Commit 48345d6

Browse files
authored
net: add support for anonymous unix pipes (#6127)
1 parent 581cd41 commit 48345d6

File tree

2 files changed

+358
-28
lines changed

2 files changed

+358
-28
lines changed

tokio/src/net/unix/pipe.rs

+253-28
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
66
use mio::unix::pipe as mio_pipe;
77
use std::fs::File;
88
use std::io::{self, Read, Write};
9-
use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
10-
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
9+
use std::os::unix::fs::OpenOptionsExt;
10+
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
1111
use std::path::Path;
1212
use std::pin::Pin;
1313
use std::task::{Context, Poll};
@@ -16,6 +16,59 @@ cfg_io_util! {
1616
use bytes::BufMut;
1717
}
1818

19+
/// Creates a new anonymous Unix pipe.
20+
///
21+
/// This function will open a new pipe and associate both pipe ends with the default
22+
/// event loop.
23+
///
24+
/// If you need to create a pipe for communication with a spawned process, you can
25+
/// use [`Stdio::piped()`] instead.
26+
///
27+
/// [`Stdio::piped()`]: std::process::Stdio::piped
28+
///
29+
/// # Errors
30+
///
31+
/// If creating a pipe fails, this function will return with the related OS error.
32+
///
33+
/// # Examples
34+
///
35+
/// Create a pipe and pass the writing end to a spawned process.
36+
///
37+
/// ```no_run
38+
/// use tokio::net::unix::pipe;
39+
/// use tokio::process::Command;
40+
/// # use tokio::io::AsyncReadExt;
41+
/// # use std::error::Error;
42+
///
43+
/// # async fn dox() -> Result<(), Box<dyn Error>> {
44+
/// let (tx, mut rx) = pipe::pipe()?;
45+
/// let mut buffer = String::new();
46+
///
47+
/// let status = Command::new("echo")
48+
/// .arg("Hello, world!")
49+
/// .stdout(tx.into_blocking_fd()?)
50+
/// .status();
51+
/// rx.read_to_string(&mut buffer).await?;
52+
///
53+
/// assert!(status.await?.success());
54+
/// assert_eq!(buffer, "Hello, world!\n");
55+
/// # Ok(())
56+
/// # }
57+
/// ```
58+
///
59+
/// # Panics
60+
///
61+
/// This function panics if it is not called from within a runtime with
62+
/// IO enabled.
63+
///
64+
/// The runtime is usually set implicitly when this function is called
65+
/// from a future driven by a tokio runtime, otherwise runtime can be set
66+
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
67+
pub fn pipe() -> io::Result<(Sender, Receiver)> {
68+
let (tx, rx) = mio_pipe::new()?;
69+
Ok((Sender::from_mio(tx)?, Receiver::from_mio(rx)?))
70+
}
71+
1972
/// Options and flags which can be used to configure how a FIFO file is opened.
2073
///
2174
/// This builder allows configuring how to create a pipe end from a FIFO file.
@@ -218,7 +271,7 @@ impl OpenOptions {
218271

219272
let file = options.open(path)?;
220273

221-
if !self.unchecked && !is_fifo(&file)? {
274+
if !self.unchecked && !is_pipe(file.as_fd())? {
222275
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
223276
}
224277

@@ -338,15 +391,40 @@ impl Sender {
338391
/// The runtime is usually set implicitly when this function is called
339392
/// from a future driven by a tokio runtime, otherwise runtime can be set
340393
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
341-
pub fn from_file(mut file: File) -> io::Result<Sender> {
342-
if !is_fifo(&file)? {
394+
pub fn from_file(file: File) -> io::Result<Sender> {
395+
Sender::from_owned_fd(file.into())
396+
}
397+
398+
/// Creates a new `Sender` from an [`OwnedFd`].
399+
///
400+
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
401+
/// an anonymous pipe or a special FIFO file. It will check if the file descriptor
402+
/// is a pipe and has write access, set it in non-blocking mode and perform the
403+
/// conversion.
404+
///
405+
/// # Errors
406+
///
407+
/// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
408+
/// or it does not have write access. Also fails with any standard OS error if it
409+
/// occurs.
410+
///
411+
/// # Panics
412+
///
413+
/// This function panics if it is not called from within a runtime with
414+
/// IO enabled.
415+
///
416+
/// The runtime is usually set implicitly when this function is called
417+
/// from a future driven by a tokio runtime, otherwise runtime can be set
418+
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
419+
pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Sender> {
420+
if !is_pipe(owned_fd.as_fd())? {
343421
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
344422
}
345423

346-
let flags = get_file_flags(&file)?;
424+
let flags = get_file_flags(owned_fd.as_fd())?;
347425
if has_write_access(flags) {
348-
set_nonblocking(&mut file, flags)?;
349-
Sender::from_file_unchecked(file)
426+
set_nonblocking(owned_fd.as_fd(), flags)?;
427+
Sender::from_owned_fd_unchecked(owned_fd)
350428
} else {
351429
Err(io::Error::new(
352430
io::ErrorKind::InvalidInput,
@@ -394,8 +472,28 @@ impl Sender {
394472
/// from a future driven by a tokio runtime, otherwise runtime can be set
395473
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
396474
pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
397-
let raw_fd = file.into_raw_fd();
398-
let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) };
475+
Sender::from_owned_fd_unchecked(file.into())
476+
}
477+
478+
/// Creates a new `Sender` from an [`OwnedFd`] without checking pipe properties.
479+
///
480+
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
481+
/// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
482+
/// the underlying pipe; it is left up to the user to make sure that the file
483+
/// descriptor represents the writing end of a pipe and the pipe is set in
484+
/// non-blocking mode.
485+
///
486+
/// # Panics
487+
///
488+
/// This function panics if it is not called from within a runtime with
489+
/// IO enabled.
490+
///
491+
/// The runtime is usually set implicitly when this function is called
492+
/// from a future driven by a tokio runtime, otherwise runtime can be set
493+
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
494+
pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Sender> {
495+
// Safety: OwnedFd represents a valid, open file descriptor.
496+
let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(owned_fd.into_raw_fd()) };
399497
Sender::from_mio(mio_tx)
400498
}
401499

@@ -623,6 +721,31 @@ impl Sender {
623721
.registration()
624722
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
625723
}
724+
725+
/// Converts the pipe into an [`OwnedFd`] in blocking mode.
726+
///
727+
/// This function will deregister this pipe end from the event loop, set
728+
/// it in blocking mode and perform the conversion.
729+
pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
730+
let fd = self.into_nonblocking_fd()?;
731+
set_blocking(&fd)?;
732+
Ok(fd)
733+
}
734+
735+
/// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
736+
///
737+
/// This function will deregister this pipe end from the event loop and
738+
/// perform the conversion. The returned file descriptor will be in nonblocking
739+
/// mode.
740+
pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
741+
let mio_pipe = self.io.into_inner()?;
742+
743+
// Safety: the pipe is now deregistered from the event loop
744+
// and we are the only owner of this pipe end.
745+
let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };
746+
747+
Ok(owned_fd)
748+
}
626749
}
627750

628751
impl AsyncWrite for Sender {
@@ -764,15 +887,40 @@ impl Receiver {
764887
/// The runtime is usually set implicitly when this function is called
765888
/// from a future driven by a tokio runtime, otherwise runtime can be set
766889
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
767-
pub fn from_file(mut file: File) -> io::Result<Receiver> {
768-
if !is_fifo(&file)? {
890+
pub fn from_file(file: File) -> io::Result<Receiver> {
891+
Receiver::from_owned_fd(file.into())
892+
}
893+
894+
/// Creates a new `Receiver` from an [`OwnedFd`].
895+
///
896+
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
897+
/// an anonymous pipe or a special FIFO file. It will check if the file descriptor
898+
/// is a pipe and has read access, set it in non-blocking mode and perform the
899+
/// conversion.
900+
///
901+
/// # Errors
902+
///
903+
/// Fails with `io::ErrorKind::InvalidInput` if the file descriptor is not a pipe
904+
/// or it does not have read access. Also fails with any standard OS error if it
905+
/// occurs.
906+
///
907+
/// # Panics
908+
///
909+
/// This function panics if it is not called from within a runtime with
910+
/// IO enabled.
911+
///
912+
/// The runtime is usually set implicitly when this function is called
913+
/// from a future driven by a tokio runtime, otherwise runtime can be set
914+
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
915+
pub fn from_owned_fd(owned_fd: OwnedFd) -> io::Result<Receiver> {
916+
if !is_pipe(owned_fd.as_fd())? {
769917
return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
770918
}
771919

772-
let flags = get_file_flags(&file)?;
920+
let flags = get_file_flags(owned_fd.as_fd())?;
773921
if has_read_access(flags) {
774-
set_nonblocking(&mut file, flags)?;
775-
Receiver::from_file_unchecked(file)
922+
set_nonblocking(owned_fd.as_fd(), flags)?;
923+
Receiver::from_owned_fd_unchecked(owned_fd)
776924
} else {
777925
Err(io::Error::new(
778926
io::ErrorKind::InvalidInput,
@@ -820,8 +968,28 @@ impl Receiver {
820968
/// from a future driven by a tokio runtime, otherwise runtime can be set
821969
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
822970
pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
823-
let raw_fd = file.into_raw_fd();
824-
let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) };
971+
Receiver::from_owned_fd_unchecked(file.into())
972+
}
973+
974+
/// Creates a new `Receiver` from an [`OwnedFd`] without checking pipe properties.
975+
///
976+
/// This function is intended to construct a pipe from an [`OwnedFd`] representing
977+
/// an anonymous pipe or a special FIFO file. The conversion assumes nothing about
978+
/// the underlying pipe; it is left up to the user to make sure that the file
979+
/// descriptor represents the reading end of a pipe and the pipe is set in
980+
/// non-blocking mode.
981+
///
982+
/// # Panics
983+
///
984+
/// This function panics if it is not called from within a runtime with
985+
/// IO enabled.
986+
///
987+
/// The runtime is usually set implicitly when this function is called
988+
/// from a future driven by a tokio runtime, otherwise runtime can be set
989+
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
990+
pub fn from_owned_fd_unchecked(owned_fd: OwnedFd) -> io::Result<Receiver> {
991+
// Safety: OwnedFd represents a valid, open file descriptor.
992+
let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(owned_fd.into_raw_fd()) };
825993
Receiver::from_mio(mio_rx)
826994
}
827995

@@ -1146,6 +1314,31 @@ impl Receiver {
11461314
})
11471315
}
11481316
}
1317+
1318+
/// Converts the pipe into an [`OwnedFd`] in blocking mode.
1319+
///
1320+
/// This function will deregister this pipe end from the event loop, set
1321+
/// it in blocking mode and perform the conversion.
1322+
pub fn into_blocking_fd(self) -> io::Result<OwnedFd> {
1323+
let fd = self.into_nonblocking_fd()?;
1324+
set_blocking(&fd)?;
1325+
Ok(fd)
1326+
}
1327+
1328+
/// Converts the pipe into an [`OwnedFd`] in nonblocking mode.
1329+
///
1330+
/// This function will deregister this pipe end from the event loop and
1331+
/// perform the conversion. Returned file descriptor will be in nonblocking
1332+
/// mode.
1333+
pub fn into_nonblocking_fd(self) -> io::Result<OwnedFd> {
1334+
let mio_pipe = self.io.into_inner()?;
1335+
1336+
// Safety: the pipe is now deregistered from the event loop
1337+
// and we are the only owner of this pipe end.
1338+
let owned_fd = unsafe { OwnedFd::from_raw_fd(mio_pipe.into_raw_fd()) };
1339+
1340+
Ok(owned_fd)
1341+
}
11491342
}
11501343

11511344
impl AsyncRead for Receiver {
@@ -1172,15 +1365,27 @@ impl AsFd for Receiver {
11721365
}
11731366
}
11741367

1175-
/// Checks if file is a FIFO
1176-
fn is_fifo(file: &File) -> io::Result<bool> {
1177-
Ok(file.metadata()?.file_type().is_fifo())
1368+
/// Checks if the file descriptor is a pipe or a FIFO.
1369+
fn is_pipe(fd: BorrowedFd<'_>) -> io::Result<bool> {
1370+
// Safety: `libc::stat` is C-like struct used for syscalls and all-zero
1371+
// byte pattern forms a valid value.
1372+
let mut stat: libc::stat = unsafe { std::mem::zeroed() };
1373+
1374+
// Safety: it's safe to call `fstat` with a valid, open file descriptor
1375+
// and a valid pointer to a `stat` struct.
1376+
let r = unsafe { libc::fstat(fd.as_raw_fd(), &mut stat) };
1377+
1378+
if r == -1 {
1379+
Err(io::Error::last_os_error())
1380+
} else {
1381+
Ok((stat.st_mode as libc::mode_t & libc::S_IFMT) == libc::S_IFIFO)
1382+
}
11781383
}
11791384

11801385
/// Gets file descriptor's flags by fcntl.
1181-
fn get_file_flags(file: &File) -> io::Result<libc::c_int> {
1182-
let fd = file.as_raw_fd();
1183-
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
1386+
fn get_file_flags(fd: BorrowedFd<'_>) -> io::Result<libc::c_int> {
1387+
// Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
1388+
let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
11841389
if flags < 0 {
11851390
Err(io::Error::last_os_error())
11861391
} else {
@@ -1200,18 +1405,38 @@ fn has_write_access(flags: libc::c_int) -> bool {
12001405
mode == libc::O_WRONLY || mode == libc::O_RDWR
12011406
}
12021407

1203-
/// Sets file's flags with `O_NONBLOCK` by fcntl.
1204-
fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> {
1205-
let fd = file.as_raw_fd();
1206-
1408+
/// Sets file descriptor's flags with `O_NONBLOCK` by fcntl.
1409+
fn set_nonblocking(fd: BorrowedFd<'_>, current_flags: libc::c_int) -> io::Result<()> {
12071410
let flags = current_flags | libc::O_NONBLOCK;
12081411

12091412
if flags != current_flags {
1210-
let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) };
1413+
// Safety: it's safe to use `fcntl` to set the `O_NONBLOCK` flag of a valid,
1414+
// open file descriptor.
1415+
let ret = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, flags) };
12111416
if ret < 0 {
12121417
return Err(io::Error::last_os_error());
12131418
}
12141419
}
12151420

12161421
Ok(())
12171422
}
1423+
1424+
/// Removes `O_NONBLOCK` from fd's flags.
1425+
fn set_blocking<T: AsRawFd>(fd: &T) -> io::Result<()> {
1426+
// Safety: it's safe to use `fcntl` to read flags of a valid, open file descriptor.
1427+
let previous = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFL) };
1428+
if previous == -1 {
1429+
return Err(io::Error::last_os_error());
1430+
}
1431+
1432+
let new = previous & !libc::O_NONBLOCK;
1433+
1434+
// Safety: it's safe to use `fcntl` to unset the `O_NONBLOCK` flag of a valid,
1435+
// open file descriptor.
1436+
let r = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, new) };
1437+
if r == -1 {
1438+
Err(io::Error::last_os_error())
1439+
} else {
1440+
Ok(())
1441+
}
1442+
}

0 commit comments

Comments
 (0)