Skip to content

Commit

Permalink
Exposed adapter constants. Fixed sending UDP and WS sending methods f…
Browse files Browse the repository at this point in the history
…or packet limits (#71)
  • Loading branch information
lemunozm authored Apr 12, 2021
1 parent 957635c commit 224c1c6
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 122 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## Release 0.12.1
- *WebSocket* now can returns when send correctly a `SendStatus::MaxPacketSizeExceeded` instead of `ResourceNotFound` if the packet size is exceeded.
- *UDP* has increases the packet size when send.
Now more bytes per packet can be sent if the OS let it.
- Exported some adapter constants dependent.
- `Transport::max_message_size()` now represents the teorical maximum size (see its related docs).

## Release 0.12.0
- Node concept: `NodeHandler` and `NodeListener`.
- Non-mutable and shared network operations.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "message-io"
version = "0.12.0"
version = "0.12.1"
authors = ["lemunozm <[email protected]>"]
edition = "2018"
readme = "README.md"
Expand Down
55 changes: 2 additions & 53 deletions benches/performance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use message_io::network::{self, Transport, NetworkController, NetworkProcessor, Endpoint};
use message_io::util::thread::{NamespacedThread};

use criterion::{criterion_group, criterion_main, Criterion, BenchmarkId, Throughput};
use criterion::{criterion_group, criterion_main, Criterion};

use std::time::{Duration};
use std::sync::{
Expand Down Expand Up @@ -49,44 +49,6 @@ fn latency_by(c: &mut Criterion, transport: Transport) {
});
}

fn throughput_by(c: &mut Criterion, transport: Transport) {
let sizes = [1, 2, 4, 8, 16, 32, 64, 128]
.iter()
.map(|i| i * 1024)
.filter(|&size| size < transport.max_message_size());

for block_size in sizes {
let mut group = c.benchmark_group(format!("throughput by {}", transport));
group.throughput(Throughput::Bytes(block_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(block_size), &block_size, |b, &size| {
let (controller, mut processor, endpoint) = init_connection(transport);

let thread_running = Arc::new(AtomicBool::new(true));
let running = thread_running.clone();
let (tx, rx) = std::sync::mpsc::channel();
let mut thread = NamespacedThread::spawn("perf-sender", move || {
let message = (0..size).map(|_| 0xFF).collect::<Vec<u8>>();
tx.send(()).unwrap(); // receiving thread ready
while running.load(Ordering::Relaxed) {
controller.send(endpoint, &message);
}
});
rx.recv().unwrap();

b.iter(|| {
// FIX IT:
// Because the sender do not stop sends, the receiver has always data.
// This means that only one poll event is generated for all messages, and
// process_poll_event will call the callback continuously without ends.
processor.process_poll_event(Some(*TIMEOUT), |_| ());
});

thread_running.store(true, Ordering::Relaxed);
thread.join();
});
}
}

fn latency(c: &mut Criterion) {
#[cfg(feature = "udp")]
latency_by(c, Transport::Udp);
Expand All @@ -98,18 +60,5 @@ fn latency(c: &mut Criterion) {
latency_by(c, Transport::Ws);
}

#[allow(dead_code)] //TODO: remove when the throughput test works fine
fn throughput(c: &mut Criterion) {
#[cfg(feature = "udp")]
throughput_by(c, Transport::Udp);
// TODO: Fix this test: How to read inside of criterion iter() an stream protocol?
// #[cfg(feature = "tcp")]
// throughput_by(c, Transport::Tcp);
#[cfg(feature = "tcp")]
throughput_by(c, Transport::FramedTcp);
#[cfg(feature = "websocket")]
throughput_by(c, Transport::Ws);
}

criterion_group!(benches, latency /*throughput*/,);
criterion_group!(benches, latency);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion src/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ pub mod framed_tcp;
#[cfg(feature = "udp")]
pub mod udp;
#[cfg(feature = "websocket")]
pub mod web_socket;
pub mod ws;
// Add new adapters here
// ...
13 changes: 4 additions & 9 deletions src/adapters/framed_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,15 @@ use std::ops::{Deref};
use std::cell::{RefCell};
use std::mem::{MaybeUninit};

const INPUT_BUFFER_SIZE: usize = 65535; // 2^16 - 1
const INPUT_BUFFER_SIZE: usize = u16::MAX as usize; // 2^16 - 1

/// The max packet value for tcp.
/// Although this size is very high, it is preferred send data in smaller chunks with a rate
/// to not saturate the receiver thread in the endpoint.
pub const MAX_TCP_PAYLOAD_LEN: usize = usize::MAX;

pub struct FramedTcpAdapter;
pub(crate) struct FramedTcpAdapter;
impl Adapter for FramedTcpAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
}

pub struct RemoteResource {
pub(crate) struct RemoteResource {
stream: TcpStream,
decoder: RefCell<Decoder>,
}
Expand Down Expand Up @@ -119,7 +114,7 @@ impl Remote for RemoteResource {
}
}

pub struct LocalResource {
pub(crate) struct LocalResource {
listener: TcpListener,
}

Expand Down
11 changes: 7 additions & 4 deletions src/adapters/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ use std::io::{self, ErrorKind, Read, Write};
use std::ops::{Deref};
use std::mem::{MaybeUninit};

pub const INPUT_BUFFER_SIZE: usize = 65535; // 2^16 - 1
/// Size of the internal reading buffer.
/// It implies that at most the generated [`crate::network::NetEvent::Message`]
/// will contains a chunk of data of this value.
pub const INPUT_BUFFER_SIZE: usize = u16::MAX as usize; // 2^16 - 1

pub struct TcpAdapter;
pub(crate) struct TcpAdapter;
impl Adapter for TcpAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
}

pub struct RemoteResource {
pub(crate) struct RemoteResource {
stream: TcpStream,
}

Expand Down Expand Up @@ -97,7 +100,7 @@ impl Remote for RemoteResource {
}
}

pub struct LocalResource {
pub(crate) struct LocalResource {
listener: TcpListener,
}

Expand Down
6 changes: 3 additions & 3 deletions src/adapters/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use mio::event::{Source};
use std::net::{SocketAddr};
use std::io::{self};

pub struct MyAdapter;
pub(crate) struct MyAdapter;
impl Adapter for MyAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
}

pub struct RemoteResource;
pub(crate) struct RemoteResource;
impl Resource for RemoteResource {
fn source(&mut self) -> &mut dyn Source {
todo!();
Expand All @@ -38,7 +38,7 @@ impl Remote for RemoteResource {
}
}

pub struct LocalResource;
pub(crate) struct LocalResource;
impl Resource for LocalResource {
fn source(&mut self) -> &mut dyn Source {
todo!();
Expand Down
66 changes: 34 additions & 32 deletions src/adapters/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,26 @@ use std::net::{SocketAddr, SocketAddrV4, Ipv4Addr};
use std::io::{self, ErrorKind};
use std::mem::{MaybeUninit};

/// Maximun payload that a UDP packet can send safety in main OS.
/// - 9216: MTU of the OS with the minimun MTU: OSX
/// - 20: max IP header
/// - 8: max udp header
/// The serialization of your message must not exceed this value.
pub const MAX_UDP_PAYLOAD_LEN: usize = 9216 - 20 - 8;
/// Maximun payload that UDP can send.
/// The following payload works on Linux and Windows, but overcome the MacOS limits.
/// To more safety limit, see: `MAX_COMPATIBLE_UDP_PAYLOAD_LEN`.
// - 20: max IP header
// - 8: max udp header
pub const MAX_PAYLOAD_LEN: usize = 65535 - 20 - 8;

// The reception buffer can reach the UDP standard size.
const INPUT_BUFFER_SIZE: usize = 65535 - 20 - 8;
/// Maximun payload that UDP can send safety in main OS.
// 9216: MTU of the OS with the minimun MTU: OSX
pub const MAX_COMPATIBLE_PAYLOAD_LEN: usize = 9216 - 20 - 8;

pub struct UdpAdapter;
const INPUT_BUFFER_SIZE: usize = 65535; // 2^16 - 1

pub(crate) struct UdpAdapter;
impl Adapter for UdpAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
}

pub struct RemoteResource {
pub(crate) struct RemoteResource {
socket: UdpSocket,
}

Expand Down Expand Up @@ -75,7 +78,7 @@ impl Remote for RemoteResource {
}
}

pub struct LocalResource {
pub(crate) struct LocalResource {
socket: UdpSocket,
}

Expand Down Expand Up @@ -136,28 +139,27 @@ impl Drop for LocalResource {
}

fn send_packet(data: &[u8], send_method: impl Fn(&[u8]) -> io::Result<usize>) -> SendStatus {
if data.len() > MAX_UDP_PAYLOAD_LEN {
log::error!(
"The UDP message could not be sent because it exceeds the MTU. \
Current size: {}, MTU: {}",
data.len(),
MAX_UDP_PAYLOAD_LEN
);
SendStatus::MaxPacketSizeExceeded(data.len(), MAX_UDP_PAYLOAD_LEN)
}
else {
loop {
match send_method(data) {
Ok(_) => break SendStatus::Sent,
// Avoid ICMP generated error to be logged
Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
break SendStatus::ResourceNotFound
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => continue,
Err(err) => {
log::error!("UDP send error: {}", err);
break SendStatus::ResourceNotFound // should not happen
loop {
match send_method(data) {
Ok(_) => break SendStatus::Sent,
// Avoid ICMP generated error to be logged
Err(ref err) if err.kind() == ErrorKind::ConnectionRefused => {
break SendStatus::ResourceNotFound
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => continue,
Err(ref err) if err.kind() == ErrorKind::Other => {
let expected_assumption = if data.len() > MAX_PAYLOAD_LEN {
MAX_PAYLOAD_LEN
}
else {
// e.g. MacOS do not support the MAX UDP MTU.
MAX_COMPATIBLE_PAYLOAD_LEN
};
break SendStatus::MaxPacketSizeExceeded(data.len(), expected_assumption)
}
Err(err) => {
log::error!("UDP send error: {}", err);
break SendStatus::ResourceNotFound // should not happen
}
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/adapters/web_socket.rs → src/adapters/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use std::ops::{DerefMut};

/// Max message size for default config
// From https://docs.rs/tungstenite/0.13.0/src/tungstenite/protocol/mod.rs.html#65
pub const MAX_WS_PAYLOAD_LEN: usize = 32 << 20;
pub const MAX_PAYLOAD_LEN: usize = 32 << 20;

pub struct WsAdapter;
pub(crate) struct WsAdapter;
impl Adapter for WsAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
Expand All @@ -44,7 +44,7 @@ enum RemoteState {
Handshake(Option<PendingHandshake>),
}

pub struct RemoteResource {
pub(crate) struct RemoteResource {
state: Mutex<RemoteState>,
}

Expand Down Expand Up @@ -181,6 +181,9 @@ impl RemoteResource {
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
result = web_socket.write_pending();
}
Err(Error::Capacity(_)) => {
break SendStatus::MaxPacketSizeExceeded(data.len(), MAX_PAYLOAD_LEN)
}
Err(err) => {
log::error!("WS send error: {}", err);
break SendStatus::ResourceNotFound // should not happen
Expand All @@ -190,7 +193,7 @@ impl RemoteResource {
}
}

pub struct LocalResource {
pub(crate) struct LocalResource {
listener: TcpListener,
}

Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
// Tells rustdoc where is the README to compile and test the rust code found there
doc_comment::doctest!("../README.md");

mod adapters;
/// Adapter related information.
/// If some adapter has special values or configuration, it is specified here.
pub mod adapters;

/// Main API. Create connections, send and receive message, signals,...
pub mod node;
Expand Down
Loading

0 comments on commit 224c1c6

Please sign in to comment.