Skip to content

Commit

Permalink
Reduced websocket latency (#73)
Browse files Browse the repository at this point in the history
* Reduced websocket latency

* Windows fix
  • Loading branch information
lemunozm authored Apr 13, 2021
1 parent a7056a1 commit 0bafb31
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- rust: stable
can-fail: false
- rust: nightly
can-fail: true
can-fail: false
steps:
- name: Checkout Repository
uses: actions/checkout@v1
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## Release 0.12.2
- Reduced *WebSocket* latency.

## Release 0.12.1
- *WebSocket* now can returns when send correctly a `SendStatus::MaxPacketSizeExceeded` instead of `ResourceNotFound` if the packet size is exceeded.
- *UDP* has increases the packet size when send.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "message-io"
version = "0.12.1"
version = "0.12.2"
authors = ["lemunozm <[email protected]>"]
edition = "2018"
readme = "README.md"
Expand Down
62 changes: 41 additions & 21 deletions src/adapters/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,30 +110,37 @@ impl Remote for RemoteResource {
// "emulates" full duplex for the websocket case locking here and not outside the loop.
let mut state = self.state.lock().expect(OTHER_THREAD_ERR);
match state.deref_mut() {
RemoteState::WebSocket(web_socket) => {
match web_socket.read_message() {
Ok(message) => match message {
Message::Binary(data) => {
// We can not call process_data while the socket is blocked.
// The user could lock it again if sends from the callback.
drop(state);
process_data(&data);
RemoteState::WebSocket(web_socket) => match web_socket.read_message() {
Ok(message) => match message {
Message::Binary(data) => {
// As an optimization.
// Fast check to know if there is more data to avoid call
// WebSocket::read_message() again.
// TODO: investigate why this code doesn't work in windows.
// Seems like windows consume the `WouldBlock` notification
// at peek() when it happens, and the poll never wakes it again.
#[cfg(not(target_os = "windows"))]
let _peek_result = web_socket.get_ref().peek(&mut [0; 0]);

// We can not call process_data while the socket is blocked.
// The user could lock it again if sends from the callback.
drop(state);
process_data(&data);

#[cfg(not(target_os = "windows"))]
if let Err(err) = _peek_result {
break Self::io_error_to_read_status(&err)
}
Message::Close(_) => break ReadStatus::Disconnected,
_ => continue,
},
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
break ReadStatus::WaitNextEvent
}
Err(Error::Io(ref err)) if err.kind() == ErrorKind::ConnectionReset => {
break ReadStatus::Disconnected
}
Err(err) => {
log::error!("WS receive error: {}", err);
break ReadStatus::Disconnected // should not happen
}
Message::Close(_) => break ReadStatus::Disconnected,
_ => continue,
},
Err(Error::Io(ref err)) => break Self::io_error_to_read_status(err),
Err(err) => {
log::error!("WS receive error: {}", err);
break ReadStatus::Disconnected // should not happen
}
}
},
RemoteState::Handshake(handshake) => {
let current_handshake = handshake.take().unwrap();
match current_handshake.mid_handshake.handshake() {
Expand Down Expand Up @@ -191,6 +198,19 @@ impl RemoteResource {
}
}
}

fn io_error_to_read_status(err: &io::Error) -> ReadStatus {
if err.kind() == io::ErrorKind::WouldBlock {
ReadStatus::WaitNextEvent
}
else if err.kind() == io::ErrorKind::ConnectionReset {
ReadStatus::Disconnected
}
else {
log::error!("WS receive error: {}", err);
ReadStatus::Disconnected // should not happen
}
}
}

pub(crate) struct LocalResource {
Expand Down
7 changes: 2 additions & 5 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const SMALL_MESSAGE: &'static str = "Integration test message";
const BIG_MESSAGE_SIZE: usize = 1024 * 1024 * 8; // 8MB

lazy_static::lazy_static! {
pub static ref TIMEOUT: Duration = Duration::from_secs(30);
pub static ref TIMEOUT: Duration = Duration::from_secs(60);
}

// Common error messages
Expand Down Expand Up @@ -179,10 +179,7 @@ fn start_burst_receiver(

let mut count = 0;
listener.for_each(move |event| match event {
NodeEvent::Signal(_) => std::panic::catch_unwind(|| {
panic!("{}", TIMEOUT_EVENT_RECV_ERR);
})
.unwrap(),
NodeEvent::Signal(_) => panic!("{}", TIMEOUT_EVENT_RECV_ERR),
NodeEvent::Network(net_event) => match net_event {
NetEvent::Message(_, data) => {
let expected_message = format!("{}: {}", SMALL_MESSAGE, count);
Expand Down

0 comments on commit 0bafb31

Please sign in to comment.