Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add unix socket support? #172

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ maintenance = { status = "actively-developed" }
all-features = true

[features]
default = ["tcp", "udp", "websocket"] # All features by default
default = ["tcp", "udp", "websocket"] # All features that are crossplatform by default
tcp = ["mio/net", "socket2"]
udp = ["mio/net", "socket2"]
websocket = ["tungstenite", "url", "tcp"]
unixsocket = ["mio/net", "socket2"] # TODO: check if this is really needed

[dependencies]
mio = { version = "0.8", features = ["os-poll"] }
Expand Down
3 changes: 3 additions & 0 deletions examples/throughput/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ fn main() {
throughput_message_io(Transport::Tcp, CHUNK);
throughput_message_io(Transport::FramedTcp, CHUNK);
throughput_message_io(Transport::Ws, CHUNK);
// for platforms that support it
#[cfg(feature = "unixsocket")]
throughput_message_io(Transport::UnixSocket, CHUNK);
println!();
throughput_native_udp(CHUNK);
throughput_native_tcp(CHUNK);
Expand Down
2 changes: 2 additions & 0 deletions src/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
#[cfg(feature = "tcp")]
pub mod framed_tcp;
#[cfg(feature = "udp")]
pub mod udp;

Check failure on line 8 in src/adapters.rs

View workflow job for this annotation

GitHub Actions / nightly on macOS-latest-

Diff in /Users/runner/work/message-io/message-io/src/adapters.rs
#[cfg(feature = "websocket")]
pub mod ws;
#[cfg(feature = "unixsocket" )]
pub mod unix_socket;
// Add new adapters here
// ...
212 changes: 212 additions & 0 deletions src/adapters/unix_socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
#![allow(unused_variables)]

use crate::network::adapter::{
Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus, ConnectionInfo,
ListeningInfo, PendingStatus,
};
use crate::network::{RemoteAddr, Readiness, TransportConnect, TransportListen};

use mio::event::{Source};
use mio::net::{UnixListener, UnixStream};

use std::mem::MaybeUninit;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::io::{self, ErrorKind, Read, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};

// Note: net.core.rmem_max = 212992 by default on linux systems
// not used because w euse unixstream I think?
// TODO: delete this if I PR
pub const MAX_PAYLOAD_LEN: usize = 212992;

/// From tcp.rs
/// Size of the internal reading buffer.
/// It implies that at most the generated [`crate::network::NetEvent::Message`]
/// will contains a chunk of data of this value.
pub const INPUT_BUFFER_SIZE: usize = u16::MAX as usize; // 2^16 - 1

Check failure on line 28 in src/adapters/unix_socket.rs

View workflow job for this annotation

GitHub Actions / nightly on macOS-latest-

Diff in /Users/runner/work/message-io/message-io/src/adapters/unix_socket.rs
// We don't use the SocketAddr, we just striaght up get the path from config.
pub fn create_null_socketaddr() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0,0,0,0)), 0)
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct UnixSocketListenConfig {
path: PathBuf,
}

impl Default for UnixSocketListenConfig {
fn default() -> Self {
// TODO: better idea? I could make this into an option later and complain if empty.
Self { path: "/tmp/mio.sock".into() }
}
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct UnixSocketConnectConfig {
path: PathBuf,
}

impl Default for UnixSocketConnectConfig {
fn default() -> Self {
// TODO: better idea? I could make this into an option later and complain if empty.
Self { path: "/tmp/mio.sock".into() }
}
}

pub(crate) struct UnixSocketAdapter;
impl Adapter for UnixSocketAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
}

Check failure on line 62 in src/adapters/unix_socket.rs

View workflow job for this annotation

GitHub Actions / nightly on macOS-latest-

Diff in /Users/runner/work/message-io/message-io/src/adapters/unix_socket.rs

pub(crate) struct RemoteResource {
stream: UnixStream
}

impl Resource for RemoteResource {
fn source(&mut self) -> &mut dyn Source {
&mut self.stream
}
}

Check failure on line 72 in src/adapters/unix_socket.rs

View workflow job for this annotation

GitHub Actions / nightly on macOS-latest-

Diff in /Users/runner/work/message-io/message-io/src/adapters/unix_socket.rs

// taken from tcp impl
pub fn check_stream_ready(stream: &UnixStream) -> PendingStatus{
if let Ok(Some(_)) = stream.take_error() {
return PendingStatus::Disconnected;
}

return PendingStatus::Ready;
}

impl Remote for RemoteResource {
fn connect_with(
config: TransportConnect,

Check failure on line 85 in src/adapters/unix_socket.rs

View workflow job for this annotation

GitHub Actions / nightly on macOS-latest-

Diff in /Users/runner/work/message-io/message-io/src/adapters/unix_socket.rs
remote_addr: RemoteAddr,
) -> io::Result<ConnectionInfo<Self>> {

let stream_config = match config {
TransportConnect::UnixSocket(config) => config,
_ => panic!("Internal error: Got wrong config"),
};

Check failure on line 92 in src/adapters/unix_socket.rs

View workflow job for this annotation

GitHub Actions / nightly on macOS-latest-

Diff in /Users/runner/work/message-io/message-io/src/adapters/unix_socket.rs

match UnixStream::connect(stream_config.path) {
Ok(stream) => {
Ok(ConnectionInfo {
remote: Self {

Check failure on line 97 in src/adapters/unix_socket.rs

View workflow job for this annotation

GitHub Actions / nightly on macOS-latest-

Diff in /Users/runner/work/message-io/message-io/src/adapters/unix_socket.rs
stream
},
// the unixstream uses SocketAddr from mio that can't be converted
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, what then identifies an unix socket?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

local_addr and peer_addr are currently a placeholder value, the unix sockets seem to have a totally different local and peer addr format.

local_addr: create_null_socketaddr(), // stream.local_addr()?,
peer_addr: create_null_socketaddr() // stream.peer_addr()?.into(),
})
},
Err(err) => {
return Err(err);
},
}


}

fn receive(&self, mut process_data: impl FnMut(&[u8])) -> ReadStatus {
// Most of this is reused from tcp.rs
let buffer: MaybeUninit<[u8; INPUT_BUFFER_SIZE]> = MaybeUninit::uninit();
let mut input_buffer = unsafe { buffer.assume_init() }; // Avoid to initialize the array

loop {
let stream = &self.stream;
match stream.deref().read(&mut input_buffer) {
Ok(0) => break ReadStatus::Disconnected,
Ok(size) => process_data(&input_buffer[..size]),
Err(ref err) if err.kind() == ErrorKind::Interrupted => continue,
Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
break ReadStatus::WaitNextEvent
}
Err(ref err) if err.kind() == ErrorKind::ConnectionReset => {
break ReadStatus::Disconnected
}
Err(err) => {
log::error!("Unix socket receive error: {}", err);
break ReadStatus::Disconnected // should not happen
}
}
}
}

fn send(&self, data: &[u8]) -> SendStatus {
// Most of this is reused from tcp.rs
let mut total_bytes_sent = 0;
loop {
let stream = &self.stream;
match stream.deref().write(&data[total_bytes_sent..]) {
Ok(bytes_sent) => {
total_bytes_sent += bytes_sent;
if total_bytes_sent == data.len() {
break SendStatus::Sent
}
}
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => continue,

// Others errors are considered fatal for the connection.
// a Event::Disconnection will be generated later.
Err(err) => {
log::error!("unix socket receive error: {}", err);
break SendStatus::ResourceNotFound // should not happen
}
}
}
}

fn pending(&self, _readiness: Readiness) -> PendingStatus {
check_stream_ready(&self.stream)
}
}

Check failure on line 165 in src/adapters/unix_socket.rs

View workflow job for this annotation

GitHub Actions / nightly on macOS-latest-

Diff in /Users/runner/work/message-io/message-io/src/adapters/unix_socket.rs

pub(crate) struct LocalResource {
listener: UnixListener
}

impl Resource for LocalResource {
fn source(&mut self) -> &mut dyn Source {
&mut self.listener
}
}

impl Local for LocalResource {
type Remote = RemoteResource;

fn listen_with(config: TransportListen, addr: SocketAddr) -> io::Result<ListeningInfo<Self>> {
let config = match config {
TransportListen::UnixSocket(config) => config,
_ => panic!("Internal error: Got wrong config"),
};

// TODO: fallback to ip when we are able to set path to none
let listener = UnixListener::bind(config.path)?;

Check failure on line 187 in src/adapters/unix_socket.rs

View workflow job for this annotation

GitHub Actions / nightly on macOS-latest-

Diff in /Users/runner/work/message-io/message-io/src/adapters/unix_socket.rs
let local_addr = listener.local_addr()?;
Ok(ListeningInfo {
local: Self {
listener
},
// same issue as above my change in https://github.com/tokio-rs/mio/pull/1749
// relevant issue https://github.com/tokio-rs/mio/issues/1527
local_addr: create_null_socketaddr(),
})
}

fn accept(&self, mut accept_remote: impl FnMut(AcceptedType<'_, Self::Remote>)) {
loop {
match self.listener.accept() {
Ok((stream, addr)) => accept_remote(AcceptedType::Remote(
create_null_socketaddr(), // TODO: provide correct address
RemoteResource { stream },
)),
Err(ref err) if err.kind() == ErrorKind::WouldBlock => break,
Err(ref err) if err.kind() == ErrorKind::Interrupted => continue,
Err(err) => break log::error!("unix socket accept error: {}", err), // Should not happen
}
}
}
}
31 changes: 31 additions & 0 deletions src/network/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
use crate::adapters::framed_tcp::{FramedTcpAdapter, FramedTcpConnectConfig, FramedTcpListenConfig};
#[cfg(feature = "udp")]
use crate::adapters::udp::{self, UdpAdapter, UdpConnectConfig, UdpListenConfig};
use crate::adapters::unix_socket::{UnixSocketAdapter, UnixSocketConnectConfig};

Check failure on line 9 in src/network/transport.rs

View workflow job for this annotation

GitHub Actions / Build (i686-unknown-linux-gnu)

unresolved import `crate::adapters::unix_socket`

Check failure on line 9 in src/network/transport.rs

View workflow job for this annotation

GitHub Actions / stable on macOS-latest-

unresolved import `crate::adapters::unix_socket`
#[cfg(feature = "unixsocket")]
use crate::adapters::unix_socket::UnixSocketListenConfig;
#[cfg(feature = "websocket")]
use crate::adapters::ws::{self, WsAdapter};

Expand Down Expand Up @@ -48,6 +51,10 @@
/// websocket with the following uri: `ws://{SocketAddr}/message-io-default`.
#[cfg(feature = "websocket")]
Ws,
/// Unix Socket protocol (available through the *unixsocket* feature).
/// To be used on systems that support it.
#[cfg(feature = "unixsocket")]
UnixSocket,
}

impl Transport {
Expand All @@ -63,6 +70,8 @@
Self::Udp => loader.mount(self.id(), UdpAdapter),
#[cfg(feature = "websocket")]
Self::Ws => loader.mount(self.id(), WsAdapter),
#[cfg(feature = "unixsocket")]
Self::UnixSocket => loader.mount(self.id(), UnixSocketAdapter),
};
}

Expand All @@ -82,6 +91,8 @@
Self::Udp => udp::MAX_LOCAL_PAYLOAD_LEN,
#[cfg(feature = "websocket")]
Self::Ws => ws::MAX_PAYLOAD_LEN,
#[cfg(feature = "unixsocket")]
Self::UnixSocket => usize::MAX,
}
}

Expand All @@ -95,8 +106,10 @@
Transport::FramedTcp => true,
#[cfg(feature = "udp")]
Transport::Udp => false,
#[cfg(feature = "websocket")]

Check failure on line 109 in src/network/transport.rs

View workflow job for this annotation

GitHub Actions / nightly on macOS-latest-

Diff in /Users/runner/work/message-io/message-io/src/network/transport.rs
Transport::Ws => true,
#[cfg(feature = "unixsocket")]
Self::UnixSocket => true
}
}

Expand All @@ -116,6 +129,8 @@
Transport::Udp => true,
#[cfg(feature = "websocket")]
Transport::Ws => true,
#[cfg(feature = "unixsocket")]
Self::UnixSocket => false
}
}

Expand All @@ -131,6 +146,8 @@
Transport::Udp => 2,
#[cfg(feature = "websocket")]
Transport::Ws => 3,
#[cfg(feature = "unixsocket")]
Transport::UnixSocket => 4,
}
}
}
Expand All @@ -146,6 +163,8 @@
2 => Transport::Udp,
#[cfg(feature = "websocket")]
3 => Transport::Ws,
#[cfg(feature = "unixsocket")]
4 => Transport::UnixSocket,
_ => panic!("Not available transport"),
}
}
Expand All @@ -167,6 +186,8 @@
Udp(UdpConnectConfig),
#[cfg(feature = "websocket")]
Ws,
#[cfg(feature = "unixsocket")]
UnixSocket(UnixSocketConnectConfig),
}

impl TransportConnect {
Expand All @@ -180,6 +201,8 @@
Self::Udp(_) => Transport::Udp,
#[cfg(feature = "websocket")]
Self::Ws => Transport::Ws,
#[cfg(feature = "unixsocket")]
Self::UnixSocket(_) => Transport::UnixSocket
};

transport.id()
Expand All @@ -197,6 +220,8 @@
Transport::Udp => Self::Udp(UdpConnectConfig::default()),
#[cfg(feature = "websocket")]
Transport::Ws => Self::Ws,
#[cfg(feature = "unixsocket")]
Transport::UnixSocket => Self::UnixSocket(UnixSocketConnectConfig::default()),
}
}
}
Expand All @@ -211,6 +236,8 @@
Udp(UdpListenConfig),
#[cfg(feature = "websocket")]
Ws,
#[cfg(feature = "unixsocket")]
UnixSocket(UnixSocketListenConfig),
}

impl TransportListen {
Expand All @@ -224,6 +251,8 @@
Self::Udp(_) => Transport::Udp,
#[cfg(feature = "websocket")]
Self::Ws => Transport::Ws,
#[cfg(feature = "unixsocket")]
Self::UnixSocket(_) => Transport::UnixSocket,
};

transport.id()
Expand All @@ -241,6 +270,8 @@
Transport::Udp => Self::Udp(UdpListenConfig::default()),
#[cfg(feature = "websocket")]
Transport::Ws => Self::Ws,
#[cfg(feature = "unixsocket")]
Transport::UnixSocket => Self::UnixSocket(UnixSocketListenConfig::default())
}
}
}
Loading