From a0bc09c497fb4113d21b5d2672a24b718a6ce08a Mon Sep 17 00:00:00 2001 From: Raymond <20248577+javaarchive@users.noreply.github.com> Date: Thu, 30 May 2024 23:04:55 -0700 Subject: [PATCH 1/8] it works I guess? --- Cargo.toml | 3 +- examples/throughput/main.rs | 3 + src/adapters.rs | 2 + src/adapters/unix_socket.rs | 212 ++++++++++++++++++++++++++++++++++++ src/network/transport.rs | 31 ++++++ 5 files changed, 250 insertions(+), 1 deletion(-) create mode 100644 src/adapters/unix_socket.rs diff --git a/Cargo.toml b/Cargo.toml index 5bfa19fb..8e55b38b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,10 +18,11 @@ maintenance = { status = "actively-developed" } all-features = true [features] -default = ["tcp", "udp", "websocket"] # All features by default +default = ["tcp", "udp", "websocket" ] # All features by default tcp = ["mio/net", "socket2"] udp = ["mio/net", "socket2"] websocket = ["tungstenite", "url", "tcp"] +unixsocket = ["mio/net", "socket2"] # TODO: check if this is really needed [dependencies] mio = { version = "0.8", features = ["os-poll"] } diff --git a/examples/throughput/main.rs b/examples/throughput/main.rs index 9d19022c..59d84ef2 100644 --- a/examples/throughput/main.rs +++ b/examples/throughput/main.rs @@ -25,6 +25,9 @@ fn main() { throughput_message_io(Transport::Tcp, CHUNK); throughput_message_io(Transport::FramedTcp, CHUNK); throughput_message_io(Transport::Ws, CHUNK); + // for platforms that support it + #[cfg(feature = "unixsocket")] + throughput_message_io(Transport::UnixSocket, CHUNK); println!(); throughput_native_udp(CHUNK); throughput_native_tcp(CHUNK); diff --git a/src/adapters.rs b/src/adapters.rs index d5fcede7..06cbe447 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -8,5 +8,7 @@ pub mod framed_tcp; pub mod udp; #[cfg(feature = "websocket")] pub mod ws; +#[cfg(feature = "unixsocket" )] +pub mod unix_socket; // Add new adapters here // ... diff --git a/src/adapters/unix_socket.rs b/src/adapters/unix_socket.rs new file mode 100644 index 00000000..9660b990 --- /dev/null +++ b/src/adapters/unix_socket.rs @@ -0,0 +1,212 @@ +#![allow(unused_variables)] + +use crate::network::adapter::{ + Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo, + ListeningInfo, PendingStatus, +}; +use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen}; + +use mio::event::{Source}; +use mio::net::{UnixListener, UnixStream}; + +use std::mem::MaybeUninit; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::io::{self, ErrorKind, Read, Write}; +use std::ops::Deref; +use std::path::{Path, PathBuf}; + +// Note: net.core.rmem_max = 212992 by default on linux systems +// not used because w euse unixstream I think? +// TODO: delete this if I PR +pub const MAX_PAYLOAD_LEN: usize = 212992; + +/// From tcp.rs +/// Size of the internal reading buffer. +/// It implies that at most the generated [`crate::network::NetEvent::Message`] +/// will contains a chunk of data of this value. +pub const INPUT_BUFFER_SIZE: usize = u16::MAX as usize; // 2^16 - 1 + +// We don't use the SocketAddr, we just striaght up get the path from config. +pub fn create_null_socketaddr() -> SocketAddr { + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), 0) +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct UnixSocketListenConfig { + path: PathBuf, +} + +impl Default for UnixSocketListenConfig { + fn default() -> Self { + // TODO: better idea? I could make this into an option later and complain if empty. + Self { path: "/tmp/mio.sock".into() } + } +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct UnixSocketConnectConfig { + path: PathBuf, +} + +impl Default for UnixSocketConnectConfig { + fn default() -> Self { + // TODO: better idea? I could make this into an option later and complain if empty. + Self { path: "/tmp/mio.sock".into() } + } +} + +pub(crate) struct UnixSocketAdapter; +impl Adapter for UnixSocketAdapter { + type Remote = RemoteResource; + type Local = LocalResource; +} + +pub(crate) struct RemoteResource { + stream: UnixStream +} + +impl Resource for RemoteResource { + fn source(&mut self) -> &mut dyn Source { + &mut self.stream + } +} + +// taken from tcp impl +pub fn check_stream_ready(stream: &UnixStream) -> PendingStatus{ + if let Ok(Some(_)) = stream.take_error() { + return PendingStatus::Disconnected; + } + + return PendingStatus::Ready; +} + +impl Remote for RemoteResource { + fn connect_with( + config: TransportConnect, + remote_addr: RemoteAddr, + ) -> io::Result> { + + let stream_config = match config { + TransportConnect::UnixSocket(config) => config, + _ => panic!("Internal error: Got wrong config"), + }; + + match UnixStream::connect(stream_config.path) { + Ok(stream) => { + Ok(ConnectionInfo { + remote: Self { + stream + }, + // the unixstream uses SocketAddr from mio that can't be converted + local_addr: create_null_socketaddr(), // stream.local_addr()?, + peer_addr: create_null_socketaddr() // stream.peer_addr()?.into(), + }) + }, + Err(err) => { + return Err(err); + }, + } + + + } + + fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus { + // Most of this is reused from tcp.rs + let buffer: MaybeUninit<[u8; INPUT_BUFFER_SIZE]> = MaybeUninit::uninit(); + let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array + + loop { + let stream = &self.stream; + match stream.deref().read(&mut input_buffer) { + Ok(0) => break ReadStatus::Disconnected, + Ok(size) => process_data(&input_buffer[..size]), + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(ref err) if err.kind() == ErrorKind::WouldBlock => { + break ReadStatus::WaitNextEvent + } + Err(ref err) if err.kind() == ErrorKind::ConnectionReset => { + break ReadStatus::Disconnected + } + Err(err) => { + log::error!("Unix socket receive error: {}", err); + break ReadStatus::Disconnected // should not happen + } + } + } + } + + fn send(&self, data: &[u8]) -> SendStatus { + // Most of this is reused from tcp.rs + let mut total_bytes_sent = 0; + loop { + let stream = &self.stream; + match stream.deref().write(&data[total_bytes_sent..]) { + Ok(bytes_sent) => { + total_bytes_sent += bytes_sent; + if total_bytes_sent == data.len() { + break SendStatus::Sent + } + } + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue, + + // Others errors are considered fatal for the connection. + // a Event::Disconnection will be generated later. + Err(err) => { + log::error!("unix socket receive error: {}", err); + break SendStatus::ResourceNotFound // should not happen + } + } + } + } + + fn pending(&self, _readiness: Readiness) -> PendingStatus { + check_stream_ready(&self.stream) + } +} + +pub(crate) struct LocalResource { + listener: UnixListener +} + +impl Resource for LocalResource { + fn source(&mut self) -> &mut dyn Source { + &mut self.listener + } +} + +impl Local for LocalResource { + type Remote = RemoteResource; + + fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result> { + let config = match config { + TransportListen::UnixSocket(config) => config, + _ => panic!("Internal error: Got wrong config"), + }; + + // TODO: fallback to ip when we are able to set path to none + let listener = UnixListener::bind(config.path)?; + let local_addr = listener.local_addr()?; + Ok(ListeningInfo { + local: Self { + listener + }, + // same issue as above my change in https://github.com/tokio-rs/mio/pull/1749 + // relevant issue https://github.com/tokio-rs/mio/issues/1527 + local_addr: create_null_socketaddr(), + }) + } + + fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) { + loop { + match self.listener.accept() { + Ok((stream, addr)) => accept_remote(AcceptedType::Remote( + create_null_socketaddr(), // TODO: provide correct address + RemoteResource { stream }, + )), + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, + Err(err) => break log::error!("unix socket accept error: {}", err), // Should not happen + } + } + } +} diff --git a/src/network/transport.rs b/src/network/transport.rs index b4d424ad..b916a4d3 100644 --- a/src/network/transport.rs +++ b/src/network/transport.rs @@ -6,6 +6,9 @@ use crate::adapters::tcp::{TcpAdapter, TcpConnectConfig, TcpListenConfig}; use crate::adapters::framed_tcp::{FramedTcpAdapter, FramedTcpConnectConfig, FramedTcpListenConfig}; #[cfg(feature = "udp")] use crate::adapters::udp::{self, UdpAdapter, UdpConnectConfig, UdpListenConfig}; +use crate::adapters::unix_socket::{UnixSocketAdapter, UnixSocketConnectConfig}; +#[cfg(feature = "unixsocket")] +use crate::adapters::unix_socket::UnixSocketListenConfig; #[cfg(feature = "websocket")] use crate::adapters::ws::{self, WsAdapter}; @@ -48,6 +51,10 @@ pub enum Transport { /// websocket with the following uri: `ws://{SocketAddr}/message-io-default`. #[cfg(feature = "websocket")] Ws, + /// Unix Socket protocol (available through the *unixsocket* feature). + /// To be used on systems that support it. + #[cfg(feature = "unixsocket")] + UnixSocket, } impl Transport { @@ -63,6 +70,8 @@ impl Transport { Self::Udp => loader.mount(self.id(), UdpAdapter), #[cfg(feature = "websocket")] Self::Ws => loader.mount(self.id(), WsAdapter), + #[cfg(feature = "unixsocket")] + Self::UnixSocket => loader.mount(self.id(), UnixSocketAdapter), }; } @@ -82,6 +91,8 @@ impl Transport { Self::Udp => udp::MAX_LOCAL_PAYLOAD_LEN, #[cfg(feature = "websocket")] Self::Ws => ws::MAX_PAYLOAD_LEN, + #[cfg(feature = "unixsocket")] + Self::UnixSocket => usize::MAX, } } @@ -97,6 +108,8 @@ impl Transport { Transport::Udp => false, #[cfg(feature = "websocket")] Transport::Ws => true, + #[cfg(feature = "unixsocket")] + Self::UnixSocket => true } } @@ -116,6 +129,8 @@ impl Transport { Transport::Udp => true, #[cfg(feature = "websocket")] Transport::Ws => true, + #[cfg(feature = "unixsocket")] + Self::UnixSocket => false } } @@ -131,6 +146,8 @@ impl Transport { Transport::Udp => 2, #[cfg(feature = "websocket")] Transport::Ws => 3, + #[cfg(feature = "unixsocket")] + Transport::UnixSocket => 4, } } } @@ -146,6 +163,8 @@ impl From for Transport { 2 => Transport::Udp, #[cfg(feature = "websocket")] 3 => Transport::Ws, + #[cfg(feature = "unixsocket")] + 4 => Transport::UnixSocket, _ => panic!("Not available transport"), } } @@ -167,6 +186,8 @@ pub enum TransportConnect { Udp(UdpConnectConfig), #[cfg(feature = "websocket")] Ws, + #[cfg(feature = "unixsocket")] + UnixSocket(UnixSocketConnectConfig), } impl TransportConnect { @@ -180,6 +201,8 @@ impl TransportConnect { Self::Udp(_) => Transport::Udp, #[cfg(feature = "websocket")] Self::Ws => Transport::Ws, + #[cfg(feature = "unixsocket")] + Self::UnixSocket(_) => Transport::UnixSocket }; transport.id() @@ -197,6 +220,8 @@ impl From for TransportConnect { Transport::Udp => Self::Udp(UdpConnectConfig::default()), #[cfg(feature = "websocket")] Transport::Ws => Self::Ws, + #[cfg(feature = "unixsocket")] + Transport::UnixSocket => Self::UnixSocket(UnixSocketConnectConfig::default()), } } } @@ -211,6 +236,8 @@ pub enum TransportListen { Udp(UdpListenConfig), #[cfg(feature = "websocket")] Ws, + #[cfg(feature = "unixsocket")] + UnixSocket(UnixSocketListenConfig), } impl TransportListen { @@ -224,6 +251,8 @@ impl TransportListen { Self::Udp(_) => Transport::Udp, #[cfg(feature = "websocket")] Self::Ws => Transport::Ws, + #[cfg(feature = "unixsocket")] + Self::UnixSocket(_) => Transport::UnixSocket, }; transport.id() @@ -241,6 +270,8 @@ impl From for TransportListen { Transport::Udp => Self::Udp(UdpListenConfig::default()), #[cfg(feature = "websocket")] Transport::Ws => Self::Ws, + #[cfg(feature = "unixsocket")] + Transport::UnixSocket => Self::UnixSocket(UnixSocketListenConfig::default()) } } } From fd26ed67254ca451e5fef108c3a1eaa2e7fba51f Mon Sep 17 00:00:00 2001 From: Raymond <20248577+javaarchive@users.noreply.github.com> Date: Thu, 30 May 2024 23:22:17 -0700 Subject: [PATCH 2/8] e --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8e55b38b..34c903bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ maintenance = { status = "actively-developed" } all-features = true [features] -default = ["tcp", "udp", "websocket" ] # All features by default +default = ["tcp", "udp", "websocket"] # All features that are crossplatform by default tcp = ["mio/net", "socket2"] udp = ["mio/net", "socket2"] websocket = ["tungstenite", "url", "tcp"] From 1b416ece67078a315d745f6bdbd943be635cca62 Mon Sep 17 00:00:00 2001 From: Raymond <20248577+javaarchive@users.noreply.github.com> Date: Fri, 31 May 2024 11:46:54 -0700 Subject: [PATCH 3/8] Store bind path and delete on drop --- Cargo.toml | 2 +- src/adapters/unix_socket.rs | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 34c903bb..62b6daf6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ maintenance = { status = "actively-developed" } all-features = true [features] -default = ["tcp", "udp", "websocket"] # All features that are crossplatform by default +default = ["tcp", "udp", "websocket", "unixsocket"] # All features that are crossplatform by default tcp = ["mio/net", "socket2"] udp = ["mio/net", "socket2"] websocket = ["tungstenite", "url", "tcp"] diff --git a/src/adapters/unix_socket.rs b/src/adapters/unix_socket.rs index 9660b990..36707563 100644 --- a/src/adapters/unix_socket.rs +++ b/src/adapters/unix_socket.rs @@ -14,6 +14,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::io::{self, ErrorKind, Read, Write}; use std::ops::Deref; use std::path::{Path, PathBuf}; +use std::fs; // Note: net.core.rmem_max = 212992 by default on linux systems // not used because w euse unixstream I think? @@ -165,7 +166,8 @@ impl Remote for RemoteResource { } pub(crate) struct LocalResource { - listener: UnixListener + listener: UnixListener, + bind_path: PathBuf } impl Resource for LocalResource { @@ -174,6 +176,16 @@ impl Resource for LocalResource { } } +impl Drop for LocalResource { + fn drop(&mut self) { + // this may fail if the file is already removed + match fs::remove_file(&self.bind_path) { + Ok(_) => (), + Err(err) => log::error!("Error removing unix socket file on drop: {}", err), + } + } +} + impl Local for LocalResource { type Remote = RemoteResource; @@ -184,11 +196,13 @@ impl Local for LocalResource { }; // TODO: fallback to ip when we are able to set path to none + let path_copy = config.path.clone(); let listener = UnixListener::bind(config.path)?; let local_addr = listener.local_addr()?; Ok(ListeningInfo { local: Self { - listener + listener, + bind_path: path_copy }, // same issue as above my change in https://github.com/tokio-rs/mio/pull/1749 // relevant issue https://github.com/tokio-rs/mio/issues/1527 From 1779a4cc5bbd4c7a45ca233fd59f692ae56e5a5b Mon Sep 17 00:00:00 2001 From: Raymond <20248577+javaarchive@users.noreply.github.com> Date: Fri, 31 May 2024 21:51:49 -0700 Subject: [PATCH 4/8] Don't enable unix socket by default (some platforms may not support). --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 62b6daf6..34c903bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ maintenance = { status = "actively-developed" } all-features = true [features] -default = ["tcp", "udp", "websocket", "unixsocket"] # All features that are crossplatform by default +default = ["tcp", "udp", "websocket"] # All features that are crossplatform by default tcp = ["mio/net", "socket2"] udp = ["mio/net", "socket2"] websocket = ["tungstenite", "url", "tcp"] From d9a6af846e3f6373e48f72a3bce6627d3cd2f90d Mon Sep 17 00:00:00 2001 From: Raymond <20248577+javaarchive@users.noreply.github.com> Date: Fri, 31 May 2024 22:43:35 -0700 Subject: [PATCH 5/8] config funcs to construct --- src/adapters/unix_socket.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/adapters/unix_socket.rs b/src/adapters/unix_socket.rs index 36707563..4c3c43bd 100644 --- a/src/adapters/unix_socket.rs +++ b/src/adapters/unix_socket.rs @@ -37,6 +37,12 @@ pub struct UnixSocketListenConfig { path: PathBuf, } +impl UnixSocketListenConfig { + pub fn new(path: PathBuf) -> Self { + Self { path } + } +} + impl Default for UnixSocketListenConfig { fn default() -> Self { // TODO: better idea? I could make this into an option later and complain if empty. @@ -49,6 +55,13 @@ pub struct UnixSocketConnectConfig { path: PathBuf, } +impl UnixSocketConnectConfig { + pub fn new(path: PathBuf) -> Self { + Self { path } + } +} + + impl Default for UnixSocketConnectConfig { fn default() -> Self { // TODO: better idea? I could make this into an option later and complain if empty. From ed219dadc008f787b2385ed376d48c6179be3ba8 Mon Sep 17 00:00:00 2001 From: Raymond <20248577+javaarchive@users.noreply.github.com> Date: Sun, 2 Jun 2024 16:54:46 -0700 Subject: [PATCH 6/8] quick barely tested impl of datagram --- examples/throughput/main.rs | 4 +- src/adapters/unix_socket.rs | 183 ++++++++++++++++++++++++++++++++---- src/network/transport.rs | 54 ++++++++--- 3 files changed, 207 insertions(+), 34 deletions(-) diff --git a/examples/throughput/main.rs b/examples/throughput/main.rs index 59d84ef2..0df82f44 100644 --- a/examples/throughput/main.rs +++ b/examples/throughput/main.rs @@ -27,7 +27,9 @@ fn main() { throughput_message_io(Transport::Ws, CHUNK); // for platforms that support it #[cfg(feature = "unixsocket")] - throughput_message_io(Transport::UnixSocket, CHUNK); + throughput_message_io(Transport::UnixSocketStream, CHUNK); + #[cfg(feature = "unixsocket")] + throughput_message_io(Transport::UnixSocketDatagram, CHUNK); println!(); throughput_native_udp(CHUNK); throughput_native_tcp(CHUNK); diff --git a/src/adapters/unix_socket.rs b/src/adapters/unix_socket.rs index 4c3c43bd..84726557 100644 --- a/src/adapters/unix_socket.rs +++ b/src/adapters/unix_socket.rs @@ -7,7 +7,7 @@ use crate::network::adapter::{ use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen}; use mio::event::{Source}; -use mio::net::{UnixListener, UnixStream}; +use mio::net::{UnixDatagram, UnixListener, UnixStream}; use std::mem::MaybeUninit; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -69,17 +69,17 @@ impl Default for UnixSocketConnectConfig { } } -pub(crate) struct UnixSocketAdapter; -impl Adapter for UnixSocketAdapter { - type Remote = RemoteResource; - type Local = LocalResource; +pub(crate) struct UnixSocketStreamAdapter; +impl Adapter for UnixSocketStreamAdapter { + type Remote = StreamRemoteResource; + type Local = StreamLocalResource; } -pub(crate) struct RemoteResource { +pub(crate) struct StreamRemoteResource { stream: UnixStream } -impl Resource for RemoteResource { +impl Resource for StreamRemoteResource { fn source(&mut self) -> &mut dyn Source { &mut self.stream } @@ -94,14 +94,14 @@ pub fn check_stream_ready(stream: &UnixStream) -> PendingStatus{ return PendingStatus::Ready; } -impl Remote for RemoteResource { +impl Remote for StreamRemoteResource { fn connect_with( config: TransportConnect, remote_addr: RemoteAddr, ) -> io::Result> { let stream_config = match config { - TransportConnect::UnixSocket(config) => config, + TransportConnect::UnixSocketStream(config) => config, _ => panic!("Internal error: Got wrong config"), }; @@ -178,18 +178,18 @@ impl Remote for RemoteResource { } } -pub(crate) struct LocalResource { +pub(crate) struct StreamLocalResource { listener: UnixListener, bind_path: PathBuf } -impl Resource for LocalResource { +impl Resource for StreamLocalResource { fn source(&mut self) -> &mut dyn Source { &mut self.listener } } -impl Drop for LocalResource { +impl Drop for StreamLocalResource { fn drop(&mut self) { // this may fail if the file is already removed match fs::remove_file(&self.bind_path) { @@ -199,23 +199,22 @@ impl Drop for LocalResource { } } -impl Local for LocalResource { - type Remote = RemoteResource; +impl Local for StreamLocalResource { + type Remote = StreamRemoteResource; fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result> { let config = match config { - TransportListen::UnixSocket(config) => config, + TransportListen::UnixStreamSocket(config) => config, _ => panic!("Internal error: Got wrong config"), }; // TODO: fallback to ip when we are able to set path to none - let path_copy = config.path.clone(); - let listener = UnixListener::bind(config.path)?; + let listener = UnixListener::bind(&config.path)?; let local_addr = listener.local_addr()?; Ok(ListeningInfo { local: Self { listener, - bind_path: path_copy + bind_path: config.path }, // same issue as above my change in https://github.com/tokio-rs/mio/pull/1749 // relevant issue https://github.com/tokio-rs/mio/issues/1527 @@ -228,7 +227,7 @@ impl Local for LocalResource { match self.listener.accept() { Ok((stream, addr)) => accept_remote(AcceptedType::Remote( create_null_socketaddr(), // TODO: provide correct address - RemoteResource { stream }, + StreamRemoteResource { stream }, )), Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, Err(ref err) if err.kind() == ErrorKind::Interrupted => continue, @@ -237,3 +236,149 @@ impl Local for LocalResource { } } } + +pub(crate) struct DatagramRemoteResource { + datagram: UnixDatagram +} + +impl Resource for DatagramRemoteResource { + fn source(&mut self) -> &mut dyn Source { + &mut self.datagram + } +} + +impl Remote for DatagramRemoteResource { + fn connect_with( + config: TransportConnect, + remote_addr: RemoteAddr, + ) -> io::Result> { + let config = match config { + TransportConnect::UnixSocketDatagram(config) => config, + _ => panic!("Internal error: Got wrong config"), + }; + + let datagram = UnixDatagram::unbound()?; + datagram.connect(config.path)?; + + Ok(ConnectionInfo { + local_addr: create_null_socketaddr(), + peer_addr: create_null_socketaddr(), + remote: Self { + datagram + } + }) + } + + // A majority of send, reciev and accept in local are reused from udp.rs due to similarities + + fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus { + let buffer: MaybeUninit<[u8; MAX_PAYLOAD_LEN]> = MaybeUninit::uninit(); + let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array + + loop { + match self.datagram.recv(&mut input_buffer) { + Ok(size) => process_data(&mut input_buffer[..size]), + Err(ref err) if err.kind() == ErrorKind::WouldBlock => { + break ReadStatus::WaitNextEvent + } + Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => { + // Avoid ICMP generated error to be logged + break ReadStatus::WaitNextEvent + } + Err(err) => { + log::error!("unix datagram socket receive error: {}", err); + break ReadStatus::WaitNextEvent // Should not happen + } + } + } + } + + fn send(&self, data: &[u8]) -> SendStatus { + loop { + match self.datagram.send(data) { + Ok(_) => break SendStatus::Sent, + // Avoid ICMP generated error to be logged + Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => { + break SendStatus::ResourceNotFound + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => continue, + Err(ref err) if err.kind() == ErrorKind::Other => { + break SendStatus::MaxPacketSizeExceeded + } + Err(err) => { + log::error!("unix datagram socket send error: {}", err); + break SendStatus::ResourceNotFound // should not happen + } + } + } + } + + fn pending(&self, readiness: Readiness) -> PendingStatus { + PendingStatus::Ready + } +} +// datagram is also used for listener +pub(crate) struct DatagramLocalResource { + listener: UnixDatagram, + bind_path: PathBuf +} + +impl Resource for DatagramLocalResource { + fn source(&mut self) -> &mut dyn Source { + &mut self.listener + } +} + + +impl Local for DatagramLocalResource { + type Remote = DatagramRemoteResource; + + fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result> { + let config = match config { + TransportListen::UnixDatagramSocket(config) => config, + _ => panic!("Internal error: Got wrong config"), + }; + + let listener = UnixDatagram::bind(&config.path)?; + + Ok(ListeningInfo { + local: Self { + listener, + bind_path: config.path + }, + local_addr: create_null_socketaddr(), + }) + } + + fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) { + let buffer: MaybeUninit<[u8; MAX_PAYLOAD_LEN]> = MaybeUninit::uninit(); + let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array + + loop { + match self.listener.recv_from(&mut input_buffer) { + Ok((size, addr)) => { + let data = &mut input_buffer[..size]; + accept_remote(AcceptedType::Data(create_null_socketaddr(), data)) + } + Err(ref err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => break log::error!("Unix datagram socket accept error: {}", err), // Should never happen + }; + } + } +} + +impl Drop for DatagramLocalResource { + fn drop(&mut self) { + // this may fail if the file is already removed + match fs::remove_file(&self.bind_path) { + Ok(_) => (), + Err(err) => log::error!("Error removing unix socket file on drop: {}", err), + } + } +} + +pub(crate) struct UnixSocketDatagramAdapter; +impl Adapter for UnixSocketDatagramAdapter { + type Remote = DatagramRemoteResource; + type Local = DatagramLocalResource; +} diff --git a/src/network/transport.rs b/src/network/transport.rs index b916a4d3..d8774243 100644 --- a/src/network/transport.rs +++ b/src/network/transport.rs @@ -6,7 +6,7 @@ use crate::adapters::tcp::{TcpAdapter, TcpConnectConfig, TcpListenConfig}; use crate::adapters::framed_tcp::{FramedTcpAdapter, FramedTcpConnectConfig, FramedTcpListenConfig}; #[cfg(feature = "udp")] use crate::adapters::udp::{self, UdpAdapter, UdpConnectConfig, UdpListenConfig}; -use crate::adapters::unix_socket::{UnixSocketAdapter, UnixSocketConnectConfig}; +use crate::adapters::unix_socket::{self, UnixSocketConnectConfig, UnixSocketDatagramAdapter, UnixSocketStreamAdapter}; #[cfg(feature = "unixsocket")] use crate::adapters::unix_socket::UnixSocketListenConfig; #[cfg(feature = "websocket")] @@ -54,7 +54,9 @@ pub enum Transport { /// Unix Socket protocol (available through the *unixsocket* feature). /// To be used on systems that support it. #[cfg(feature = "unixsocket")] - UnixSocket, + UnixSocketStream, + #[cfg(feature = "unixsocket")] + UnixSocketDatagram, } impl Transport { @@ -71,7 +73,9 @@ impl Transport { #[cfg(feature = "websocket")] Self::Ws => loader.mount(self.id(), WsAdapter), #[cfg(feature = "unixsocket")] - Self::UnixSocket => loader.mount(self.id(), UnixSocketAdapter), + Self::UnixSocketStream => loader.mount(self.id(), UnixSocketStreamAdapter), + #[cfg(feature = "unixsocket")] + Self::UnixSocketDatagram => loader.mount(self.id(), UnixSocketDatagramAdapter), }; } @@ -92,7 +96,9 @@ impl Transport { #[cfg(feature = "websocket")] Self::Ws => ws::MAX_PAYLOAD_LEN, #[cfg(feature = "unixsocket")] - Self::UnixSocket => usize::MAX, + Self::UnixSocketStream => usize::MAX, + #[cfg(feature = "unixsocket")] + Self::UnixSocketDatagram => unix_socket::MAX_PAYLOAD_LEN, } } @@ -109,7 +115,9 @@ impl Transport { #[cfg(feature = "websocket")] Transport::Ws => true, #[cfg(feature = "unixsocket")] - Self::UnixSocket => true + Transport::UnixSocketStream => true, + #[cfg(feature = "unixsocket")] + Transport::UnixSocketDatagram => false, } } @@ -130,7 +138,9 @@ impl Transport { #[cfg(feature = "websocket")] Transport::Ws => true, #[cfg(feature = "unixsocket")] - Self::UnixSocket => false + Self::UnixSocketStream => false, + #[cfg(feature = "unixsocket")] + Self::UnixSocketDatagram => true, } } @@ -147,7 +157,9 @@ impl Transport { #[cfg(feature = "websocket")] Transport::Ws => 3, #[cfg(feature = "unixsocket")] - Transport::UnixSocket => 4, + Transport::UnixSocketStream => 4, + #[cfg(feature = "unixsocket")] + Transport::UnixSocketDatagram => 5, } } } @@ -164,7 +176,9 @@ impl From for Transport { #[cfg(feature = "websocket")] 3 => Transport::Ws, #[cfg(feature = "unixsocket")] - 4 => Transport::UnixSocket, + 4 => Transport::UnixSocketStream, + #[cfg(feature = "unixsocket")] + 5 => Transport::UnixSocketDatagram, _ => panic!("Not available transport"), } } @@ -187,7 +201,9 @@ pub enum TransportConnect { #[cfg(feature = "websocket")] Ws, #[cfg(feature = "unixsocket")] - UnixSocket(UnixSocketConnectConfig), + UnixSocketStream(UnixSocketConnectConfig), + #[cfg(feature = "unixsocket")] + UnixSocketDatagram(UnixSocketConnectConfig), } impl TransportConnect { @@ -202,7 +218,9 @@ impl TransportConnect { #[cfg(feature = "websocket")] Self::Ws => Transport::Ws, #[cfg(feature = "unixsocket")] - Self::UnixSocket(_) => Transport::UnixSocket + Self::UnixSocketStream(_) => Transport::UnixSocketStream, + #[cfg(feature = "unixsocket")] + Self::UnixSocketDatagram(_) => Transport::UnixSocketDatagram, }; transport.id() @@ -221,7 +239,9 @@ impl From for TransportConnect { #[cfg(feature = "websocket")] Transport::Ws => Self::Ws, #[cfg(feature = "unixsocket")] - Transport::UnixSocket => Self::UnixSocket(UnixSocketConnectConfig::default()), + Transport::UnixSocketStream => Self::UnixSocketStream(UnixSocketConnectConfig::default()), + #[cfg(feature = "unixsocket")] + Transport::UnixSocketDatagram => Self::UnixSocketDatagram(UnixSocketConnectConfig::default()), } } } @@ -237,7 +257,9 @@ pub enum TransportListen { #[cfg(feature = "websocket")] Ws, #[cfg(feature = "unixsocket")] - UnixSocket(UnixSocketListenConfig), + UnixStreamSocket(UnixSocketListenConfig), + #[cfg(feature = "unixsocket")] + UnixDatagramSocket(UnixSocketListenConfig), } impl TransportListen { @@ -252,7 +274,9 @@ impl TransportListen { #[cfg(feature = "websocket")] Self::Ws => Transport::Ws, #[cfg(feature = "unixsocket")] - Self::UnixSocket(_) => Transport::UnixSocket, + Self::UnixStreamSocket(_) => Transport::UnixSocketStream, + #[cfg(feature = "unixsocket")] + Self::UnixDatagramSocket(_) => Transport::UnixSocketDatagram, }; transport.id() @@ -271,7 +295,9 @@ impl From for TransportListen { #[cfg(feature = "websocket")] Transport::Ws => Self::Ws, #[cfg(feature = "unixsocket")] - Transport::UnixSocket => Self::UnixSocket(UnixSocketListenConfig::default()) + Transport::UnixSocketStream => Self::UnixStreamSocket(UnixSocketListenConfig::default()), + #[cfg(feature = "unixsocket")] + Transport::UnixSocketDatagram => Self::UnixDatagramSocket(UnixSocketListenConfig::default()), } } } From e109950735a6f260de33c26e13cf92b21a1832b8 Mon Sep 17 00:00:00 2001 From: Raymond <20248577+javaarchive@users.noreply.github.com> Date: Sun, 2 Jun 2024 17:09:15 -0700 Subject: [PATCH 7/8] fix feature gate --- src/adapters.rs | 2 +- src/network/transport.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/adapters.rs b/src/adapters.rs index 06cbe447..93eaa4ea 100644 --- a/src/adapters.rs +++ b/src/adapters.rs @@ -11,4 +11,4 @@ pub mod ws; #[cfg(feature = "unixsocket" )] pub mod unix_socket; // Add new adapters here -// ... +// ... \ No newline at end of file diff --git a/src/network/transport.rs b/src/network/transport.rs index d8774243..e569271b 100644 --- a/src/network/transport.rs +++ b/src/network/transport.rs @@ -6,6 +6,7 @@ use crate::adapters::tcp::{TcpAdapter, TcpConnectConfig, TcpListenConfig}; use crate::adapters::framed_tcp::{FramedTcpAdapter, FramedTcpConnectConfig, FramedTcpListenConfig}; #[cfg(feature = "udp")] use crate::adapters::udp::{self, UdpAdapter, UdpConnectConfig, UdpListenConfig}; +#[cfg(feature = "unixsocket")] use crate::adapters::unix_socket::{self, UnixSocketConnectConfig, UnixSocketDatagramAdapter, UnixSocketStreamAdapter}; #[cfg(feature = "unixsocket")] use crate::adapters::unix_socket::UnixSocketListenConfig; From 2d263be24cd8f3eb72193e638ddb17b728cfa0cb Mon Sep 17 00:00:00 2001 From: Raymond <20248577+javaarchive@users.noreply.github.com> Date: Sun, 2 Jun 2024 20:10:23 -0700 Subject: [PATCH 8/8] mark datagram socket as broken --- src/adapters/unix_socket.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/adapters/unix_socket.rs b/src/adapters/unix_socket.rs index 84726557..e8082f64 100644 --- a/src/adapters/unix_socket.rs +++ b/src/adapters/unix_socket.rs @@ -235,6 +235,11 @@ impl Local for StreamLocalResource { } } } + + // nearly impossible to implement + // fn send_to(&self, addr: SocketAddr, data: &[u8]) -> SendStatus { + // + // } } pub(crate) struct DatagramRemoteResource {