diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 64f1ad3b..0c1bea85 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -24,7 +24,7 @@ jobs: - rust: stable can-fail: false - rust: nightly - can-fail: true + can-fail: false steps: - name: Checkout Repository uses: actions/checkout@v1 diff --git a/CHANGELOG.md b/CHANGELOG.md index badd59de..213b2117 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## Release 0.12.2 +- Reduced *WebSocket* latency. + ## Release 0.12.1 - *WebSocket* now can returns when send correctly a `SendStatus::MaxPacketSizeExceeded` instead of `ResourceNotFound` if the packet size is exceeded. - *UDP* has increases the packet size when send. diff --git a/Cargo.lock b/Cargo.lock index 092ca7e6..498338f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -506,7 +506,7 @@ dependencies = [ [[package]] name = "message-io" -version = "0.12.1" +version = "0.12.2" dependencies = [ "bincode", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 10e44f21..bbc4c7a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "message-io" -version = "0.12.1" +version = "0.12.2" authors = ["lemunozm "] edition = "2018" readme = "README.md" diff --git a/src/adapters/ws.rs b/src/adapters/ws.rs index 98036b6d..0506ec71 100644 --- a/src/adapters/ws.rs +++ b/src/adapters/ws.rs @@ -110,30 +110,37 @@ impl Remote for RemoteResource { // "emulates" full duplex for the websocket case locking here and not outside the loop. let mut state = self.state.lock().expect(OTHER_THREAD_ERR); match state.deref_mut() { - RemoteState::WebSocket(web_socket) => { - match web_socket.read_message() { - Ok(message) => match message { - Message::Binary(data) => { - // We can not call process_data while the socket is blocked. - // The user could lock it again if sends from the callback. - drop(state); - process_data(&data); + RemoteState::WebSocket(web_socket) => match web_socket.read_message() { + Ok(message) => match message { + Message::Binary(data) => { + // As an optimization. + // Fast check to know if there is more data to avoid call + // WebSocket::read_message() again. + // TODO: investigate why this code doesn't work in windows. + // Seems like windows consume the `WouldBlock` notification + // at peek() when it happens, and the poll never wakes it again. + #[cfg(not(target_os = "windows"))] + let _peek_result = web_socket.get_ref().peek(&mut [0; 0]); + + // We can not call process_data while the socket is blocked. + // The user could lock it again if sends from the callback. + drop(state); + process_data(&data); + + #[cfg(not(target_os = "windows"))] + if let Err(err) = _peek_result { + break Self::io_error_to_read_status(&err) } - Message::Close(_) => break ReadStatus::Disconnected, - _ => continue, - }, - Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => { - break ReadStatus::WaitNextEvent - } - Err(Error::Io(ref err)) if err.kind() == ErrorKind::ConnectionReset => { - break ReadStatus::Disconnected - } - Err(err) => { - log::error!("WS receive error: {}", err); - break ReadStatus::Disconnected // should not happen } + Message::Close(_) => break ReadStatus::Disconnected, + _ => continue, + }, + Err(Error::Io(ref err)) => break Self::io_error_to_read_status(err), + Err(err) => { + log::error!("WS receive error: {}", err); + break ReadStatus::Disconnected // should not happen } - } + }, RemoteState::Handshake(handshake) => { let current_handshake = handshake.take().unwrap(); match current_handshake.mid_handshake.handshake() { @@ -191,6 +198,19 @@ impl RemoteResource { } } } + + fn io_error_to_read_status(err: &io::Error) -> ReadStatus { + if err.kind() == io::ErrorKind::WouldBlock { + ReadStatus::WaitNextEvent + } + else if err.kind() == io::ErrorKind::ConnectionReset { + ReadStatus::Disconnected + } + else { + log::error!("WS receive error: {}", err); + ReadStatus::Disconnected // should not happen + } + } } pub(crate) struct LocalResource { diff --git a/tests/integration.rs b/tests/integration.rs index fe776637..92a87459 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -17,7 +17,7 @@ const SMALL_MESSAGE: &'static str = "Integration test message"; const BIG_MESSAGE_SIZE: usize = 1024 * 1024 * 8; // 8MB lazy_static::lazy_static! { - pub static ref TIMEOUT: Duration = Duration::from_secs(30); + pub static ref TIMEOUT: Duration = Duration::from_secs(60); } // Common error messages @@ -179,10 +179,7 @@ fn start_burst_receiver( let mut count = 0; listener.for_each(move |event| match event { - NodeEvent::Signal(_) => std::panic::catch_unwind(|| { - panic!("{}", TIMEOUT_EVENT_RECV_ERR); - }) - .unwrap(), + NodeEvent::Signal(_) => panic!("{}", TIMEOUT_EVENT_RECV_ERR), NodeEvent::Network(net_event) => match net_event { NetEvent::Message(_, data) => { let expected_message = format!("{}: {}", SMALL_MESSAGE, count);