diff --git a/Cargo.toml b/Cargo.toml index 5bfa19fb..34c903bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,10 +18,11 @@ maintenance = { status = "actively-developed" } all-features = true [features] -default = ["tcp", "udp", "websocket"] # All features by default +default = ["tcp", "udp", "websocket"] # All features that are crossplatform by default tcp = ["mio/net", "socket2"] udp = ["mio/net", "socket2"] websocket = ["tungstenite", "url", "tcp"] +unixsocket = ["mio/net", "socket2"] # TODO: check if this is really needed [dependencies] mio = { version = "0.8", features = ["os-poll"] } diff --git a/examples/throughput/main.rs b/examples/throughput/main.rs index 9d19022c..0df82f44 100644 --- a/examples/throughput/main.rs +++ b/examples/throughput/main.rs @@ -25,6 +25,11 @@ fn main() { throughput_message_io(Transport::Tcp, CHUNK); throughput_message_io(Transport::FramedTcp, CHUNK); throughput_message_io(Transport::Ws, CHUNK); + // for platforms that support it + #[cfg(feature = "unixsocket")] + throughput_message_io(Transport::UnixSocketStream, CHUNK); + #[cfg(feature = "unixsocket")] + throughput_message_io(Transport::UnixSocketDatagram, CHUNK); println!(); throughput_native_udp(CHUNK); throughput_native_tcp(CHUNK); diff --git a/src/adapters.rs b/src/adapters.rs index d5fcede7..93eaa4ea 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -8,5 +8,7 @@ pub mod framed_tcp; pub mod udp; #[cfg(feature = "websocket")] pub mod ws; +#[cfg(feature = "unixsocket" )] +pub mod unix_socket; // Add new adapters here -// ... +// ... \ No newline at end of file diff --git a/src/adapters/unix_socket.rs b/src/adapters/unix_socket.rs new file mode 100644 index 00000000..e8082f64 --- /dev/null +++ b/src/adapters/unix_socket.rs @@ -0,0 +1,389 @@ +#![allow(unused_variables)] + +use crate::network::adapter::{ + Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, + ListeningInfo, PendingStatus, +}; +use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen}; + +use mio::event::{Source}; +use mio::net::{UnixDatagram, UnixListener, UnixStream}; + +use std::mem::MaybeUninit; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::io::{self, ErrorKind, Read, Write}; +use std::ops::Deref; +use std::path::{Path, PathBuf}; +use std::fs; + +// Note: net.core.rmem_max = 212992 by default on linux systems +// not used because w euse unixstream I think? +// TODO: delete this if I PR +pub const MAX_PAYLOAD_LEN: usize = 212992; + +/// From tcp.rs +/// Size of the internal reading buffer. +/// It implies that at most the generated [`crate::network::NetEvent::Message`] +/// will contains a chunk of data of this value. +pub const INPUT_BUFFER_SIZE: usize = u16::MAX as usize; // 2^16 - 1 + +// We don't use the SocketAddr, we just striaght up get the path from config. +pub fn create_null_socketaddr() -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), 0) +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct UnixSocketListenConfig { + path: PathBuf, +} + +impl UnixSocketListenConfig { + pub fn new(path: PathBuf) -> Self { + Self { path } + } +} + +impl Default for UnixSocketListenConfig { + fn default() -> Self { + // TODO: better idea? I could make this into an option later and complain if empty. + Self { path: "/tmp/mio.sock".into() } + } +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct UnixSocketConnectConfig { + path: PathBuf, +} + +impl UnixSocketConnectConfig { + pub fn new(path: PathBuf) -> Self { + Self { path } + } +} + + +impl Default for UnixSocketConnectConfig { + fn default() -> Self { + // TODO: better idea? I could make this into an option later and complain if empty. + Self { path: "/tmp/mio.sock".into() } + } +} + +pub(crate) struct UnixSocketStreamAdapter; +impl Adapter for UnixSocketStreamAdapter { + type Remote = StreamRemoteResource; + type Local = StreamLocalResource; +} + +pub(crate) struct StreamRemoteResource { + stream: UnixStream +} + +impl Resource for StreamRemoteResource { + fn source(&mut self) -> &mut dyn Source { + &mut self.stream + } +} + +// taken from tcp impl +pub fn check_stream_ready(stream: &UnixStream) -> PendingStatus{ + if let Ok(Some(_)) = stream.take_error() { + return PendingStatus::Disconnected; + } + + return PendingStatus::Ready; +} + +impl Remote for StreamRemoteResource { + fn connect_with( + config: TransportConnect, + remote_addr: RemoteAddr, + ) -> io::Result> { + + let stream_config = match config { + TransportConnect::UnixSocketStream(config) => config, + _ => panic!("Internal error: Got wrong config"), + }; + + match UnixStream::connect(stream_config.path) { + Ok(stream) => { + Ok(ConnectionInfo { + remote: Self { + stream + }, + // the unixstream uses SocketAddr from mio that can't be converted + local_addr: create_null_socketaddr(), // stream.local_addr()?, + peer_addr: create_null_socketaddr() // stream.peer_addr()?.into(), + }) + }, + Err(err) => { + return Err(err); + }, + } + + + } + + fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus { + // Most of this is reused from tcp.rs + let buffer: MaybeUninit<[u8; INPUT_BUFFER_SIZE]> = MaybeUninit::uninit(); + let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array + + loop { + let stream = &self.stream; + match stream.deref().read(&mut input_buffer) { + Ok(0) => break ReadStatus::Disconnected, + Ok(size) => process_data(&input_buffer[..size]), + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(ref err) if err.kind() == ErrorKind::WouldBlock => { + break ReadStatus::WaitNextEvent + } + Err(ref err) if err.kind() == ErrorKind::ConnectionReset => { + break ReadStatus::Disconnected + } + Err(err) => { + log::error!("Unix socket receive error: {}", err); + break ReadStatus::Disconnected // should not happen + } + } + } + } + + fn send(&self, data: &[u8]) -> SendStatus { + // Most of this is reused from tcp.rs + let mut total_bytes_sent = 0; + loop { + let stream = &self.stream; + match stream.deref().write(&data[total_bytes_sent..]) { + Ok(bytes_sent) => { + total_bytes_sent += bytes_sent; + if total_bytes_sent == data.len() { + break SendStatus::Sent + } + } + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue, + + // Others errors are considered fatal for the connection. + // a Event::Disconnection will be generated later. + Err(err) => { + log::error!("unix socket receive error: {}", err); + break SendStatus::ResourceNotFound // should not happen + } + } + } + } + + fn pending(&self, _readiness: Readiness) -> PendingStatus { + check_stream_ready(&self.stream) + } +} + +pub(crate) struct StreamLocalResource { + listener: UnixListener, + bind_path: PathBuf +} + +impl Resource for StreamLocalResource { + fn source(&mut self) -> &mut dyn Source { + &mut self.listener + } +} + +impl Drop for StreamLocalResource { + fn drop(&mut self) { + // this may fail if the file is already removed + match fs::remove_file(&self.bind_path) { + Ok(_) => (), + Err(err) => log::error!("Error removing unix socket file on drop: {}", err), + } + } +} + +impl Local for StreamLocalResource { + type Remote = StreamRemoteResource; + + fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result> { + let config = match config { + TransportListen::UnixStreamSocket(config) => config, + _ => panic!("Internal error: Got wrong config"), + }; + + // TODO: fallback to ip when we are able to set path to none + let listener = UnixListener::bind(&config.path)?; + let local_addr = listener.local_addr()?; + Ok(ListeningInfo { + local: Self { + listener, + bind_path: config.path + }, + // same issue as above my change in https://github.com/tokio-rs/mio/pull/1749 + // relevant issue https://github.com/tokio-rs/mio/issues/1527 + local_addr: create_null_socketaddr(), + }) + } + + fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) { + loop { + match self.listener.accept() { + Ok((stream, addr)) => accept_remote(AcceptedType::Remote( + create_null_socketaddr(), // TODO: provide correct address + StreamRemoteResource { stream }, + )), + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(err) => break log::error!("unix socket accept error: {}", err), // Should not happen + } + } + } + + // nearly impossible to implement + // fn send_to(&self, addr: SocketAddr, data: &[u8]) -> SendStatus { + // + // } +} + +pub(crate) struct DatagramRemoteResource { + datagram: UnixDatagram +} + +impl Resource for DatagramRemoteResource { + fn source(&mut self) -> &mut dyn Source { + &mut self.datagram + } +} + +impl Remote for DatagramRemoteResource { + fn connect_with( + config: TransportConnect, + remote_addr: RemoteAddr, + ) -> io::Result> { + let config = match config { + TransportConnect::UnixSocketDatagram(config) => config, + _ => panic!("Internal error: Got wrong config"), + }; + + let datagram = UnixDatagram::unbound()?; + datagram.connect(config.path)?; + + Ok(ConnectionInfo { + local_addr: create_null_socketaddr(), + peer_addr: create_null_socketaddr(), + remote: Self { + datagram + } + }) + } + + // A majority of send, reciev and accept in local are reused from udp.rs due to similarities + + fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus { + let buffer: MaybeUninit<[u8; MAX_PAYLOAD_LEN]> = MaybeUninit::uninit(); + let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array + + loop { + match self.datagram.recv(&mut input_buffer) { + Ok(size) => process_data(&mut input_buffer[..size]), + Err(ref err) if err.kind() == ErrorKind::WouldBlock => { + break ReadStatus::WaitNextEvent + } + Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => { + // Avoid ICMP generated error to be logged + break ReadStatus::WaitNextEvent + } + Err(err) => { + log::error!("unix datagram socket receive error: {}", err); + break ReadStatus::WaitNextEvent // Should not happen + } + } + } + } + + fn send(&self, data: &[u8]) -> SendStatus { + loop { + match self.datagram.send(data) { + Ok(_) => break SendStatus::Sent, + // Avoid ICMP generated error to be logged + Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => { + break SendStatus::ResourceNotFound + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => continue, + Err(ref err) if err.kind() == ErrorKind::Other => { + break SendStatus::MaxPacketSizeExceeded + } + Err(err) => { + log::error!("unix datagram socket send error: {}", err); + break SendStatus::ResourceNotFound // should not happen + } + } + } + } + + fn pending(&self, readiness: Readiness) -> PendingStatus { + PendingStatus::Ready + } +} +// datagram is also used for listener +pub(crate) struct DatagramLocalResource { + listener: UnixDatagram, + bind_path: PathBuf +} + +impl Resource for DatagramLocalResource { + fn source(&mut self) -> &mut dyn Source { + &mut self.listener + } +} + + +impl Local for DatagramLocalResource { + type Remote = DatagramRemoteResource; + + fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result> { + let config = match config { + TransportListen::UnixDatagramSocket(config) => config, + _ => panic!("Internal error: Got wrong config"), + }; + + let listener = UnixDatagram::bind(&config.path)?; + + Ok(ListeningInfo { + local: Self { + listener, + bind_path: config.path + }, + local_addr: create_null_socketaddr(), + }) + } + + fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) { + let buffer: MaybeUninit<[u8; MAX_PAYLOAD_LEN]> = MaybeUninit::uninit(); + let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array + + loop { + match self.listener.recv_from(&mut input_buffer) { + Ok((size, addr)) => { + let data = &mut input_buffer[..size]; + accept_remote(AcceptedType::Data(create_null_socketaddr(), data)) + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => break log::error!("Unix datagram socket accept error: {}", err), // Should never happen + }; + } + } +} + +impl Drop for DatagramLocalResource { + fn drop(&mut self) { + // this may fail if the file is already removed + match fs::remove_file(&self.bind_path) { + Ok(_) => (), + Err(err) => log::error!("Error removing unix socket file on drop: {}", err), + } + } +} + +pub(crate) struct UnixSocketDatagramAdapter; +impl Adapter for UnixSocketDatagramAdapter { + type Remote = DatagramRemoteResource; + type Local = DatagramLocalResource; +} diff --git a/src/network/transport.rs b/src/network/transport.rs index b4d424ad..e569271b 100644 --- a/src/network/transport.rs +++ b/src/network/transport.rs @@ -6,6 +6,10 @@ use crate::adapters::tcp::{TcpAdapter, TcpConnectConfig, TcpListenConfig}; use crate::adapters::framed_tcp::{FramedTcpAdapter, FramedTcpConnectConfig, FramedTcpListenConfig}; #[cfg(feature = "udp")] use crate::adapters::udp::{self, UdpAdapter, UdpConnectConfig, UdpListenConfig}; +#[cfg(feature = "unixsocket")] +use crate::adapters::unix_socket::{self, UnixSocketConnectConfig, UnixSocketDatagramAdapter, UnixSocketStreamAdapter}; +#[cfg(feature = "unixsocket")] +use crate::adapters::unix_socket::UnixSocketListenConfig; #[cfg(feature = "websocket")] use crate::adapters::ws::{self, WsAdapter}; @@ -48,6 +52,12 @@ pub enum Transport { /// websocket with the following uri: `ws://{SocketAddr}/message-io-default`. #[cfg(feature = "websocket")] Ws, + /// Unix Socket protocol (available through the *unixsocket* feature). + /// To be used on systems that support it. + #[cfg(feature = "unixsocket")] + UnixSocketStream, + #[cfg(feature = "unixsocket")] + UnixSocketDatagram, } impl Transport { @@ -63,6 +73,10 @@ impl Transport { Self::Udp => loader.mount(self.id(), UdpAdapter), #[cfg(feature = "websocket")] Self::Ws => loader.mount(self.id(), WsAdapter), + #[cfg(feature = "unixsocket")] + Self::UnixSocketStream => loader.mount(self.id(), UnixSocketStreamAdapter), + #[cfg(feature = "unixsocket")] + Self::UnixSocketDatagram => loader.mount(self.id(), UnixSocketDatagramAdapter), }; } @@ -82,6 +96,10 @@ impl Transport { Self::Udp => udp::MAX_LOCAL_PAYLOAD_LEN, #[cfg(feature = "websocket")] Self::Ws => ws::MAX_PAYLOAD_LEN, + #[cfg(feature = "unixsocket")] + Self::UnixSocketStream => usize::MAX, + #[cfg(feature = "unixsocket")] + Self::UnixSocketDatagram => unix_socket::MAX_PAYLOAD_LEN, } } @@ -97,6 +115,10 @@ impl Transport { Transport::Udp => false, #[cfg(feature = "websocket")] Transport::Ws => true, + #[cfg(feature = "unixsocket")] + Transport::UnixSocketStream => true, + #[cfg(feature = "unixsocket")] + Transport::UnixSocketDatagram => false, } } @@ -116,6 +138,10 @@ impl Transport { Transport::Udp => true, #[cfg(feature = "websocket")] Transport::Ws => true, + #[cfg(feature = "unixsocket")] + Self::UnixSocketStream => false, + #[cfg(feature = "unixsocket")] + Self::UnixSocketDatagram => true, } } @@ -131,6 +157,10 @@ impl Transport { Transport::Udp => 2, #[cfg(feature = "websocket")] Transport::Ws => 3, + #[cfg(feature = "unixsocket")] + Transport::UnixSocketStream => 4, + #[cfg(feature = "unixsocket")] + Transport::UnixSocketDatagram => 5, } } } @@ -146,6 +176,10 @@ impl From for Transport { 2 => Transport::Udp, #[cfg(feature = "websocket")] 3 => Transport::Ws, + #[cfg(feature = "unixsocket")] + 4 => Transport::UnixSocketStream, + #[cfg(feature = "unixsocket")] + 5 => Transport::UnixSocketDatagram, _ => panic!("Not available transport"), } } @@ -167,6 +201,10 @@ pub enum TransportConnect { Udp(UdpConnectConfig), #[cfg(feature = "websocket")] Ws, + #[cfg(feature = "unixsocket")] + UnixSocketStream(UnixSocketConnectConfig), + #[cfg(feature = "unixsocket")] + UnixSocketDatagram(UnixSocketConnectConfig), } impl TransportConnect { @@ -180,6 +218,10 @@ impl TransportConnect { Self::Udp(_) => Transport::Udp, #[cfg(feature = "websocket")] Self::Ws => Transport::Ws, + #[cfg(feature = "unixsocket")] + Self::UnixSocketStream(_) => Transport::UnixSocketStream, + #[cfg(feature = "unixsocket")] + Self::UnixSocketDatagram(_) => Transport::UnixSocketDatagram, }; transport.id() @@ -197,6 +239,10 @@ impl From for TransportConnect { Transport::Udp => Self::Udp(UdpConnectConfig::default()), #[cfg(feature = "websocket")] Transport::Ws => Self::Ws, + #[cfg(feature = "unixsocket")] + Transport::UnixSocketStream => Self::UnixSocketStream(UnixSocketConnectConfig::default()), + #[cfg(feature = "unixsocket")] + Transport::UnixSocketDatagram => Self::UnixSocketDatagram(UnixSocketConnectConfig::default()), } } } @@ -211,6 +257,10 @@ pub enum TransportListen { Udp(UdpListenConfig), #[cfg(feature = "websocket")] Ws, + #[cfg(feature = "unixsocket")] + UnixStreamSocket(UnixSocketListenConfig), + #[cfg(feature = "unixsocket")] + UnixDatagramSocket(UnixSocketListenConfig), } impl TransportListen { @@ -224,6 +274,10 @@ impl TransportListen { Self::Udp(_) => Transport::Udp, #[cfg(feature = "websocket")] Self::Ws => Transport::Ws, + #[cfg(feature = "unixsocket")] + Self::UnixStreamSocket(_) => Transport::UnixSocketStream, + #[cfg(feature = "unixsocket")] + Self::UnixDatagramSocket(_) => Transport::UnixSocketDatagram, }; transport.id() @@ -241,6 +295,10 @@ impl From for TransportListen { Transport::Udp => Self::Udp(UdpListenConfig::default()), #[cfg(feature = "websocket")] Transport::Ws => Self::Ws, + #[cfg(feature = "unixsocket")] + Transport::UnixSocketStream => Self::UnixStreamSocket(UnixSocketListenConfig::default()), + #[cfg(feature = "unixsocket")] + Transport::UnixSocketDatagram => Self::UnixDatagramSocket(UnixSocketListenConfig::default()), } } }