From 4ce91c3de691dfd252fc64cb6820960ec89cb5bf Mon Sep 17 00:00:00 2001
From: lemunozm <lemunozm@gmail.com>
Date: Mon, 12 Apr 2021 20:53:48 +0200
Subject: [PATCH] Windows fix

---
 .github/workflows/rust.yml |  2 +-
 src/adapters/ws.rs         | 72 +++++++++++++++++++++-----------------
 tests/integration.rs       |  7 ++--
 3 files changed, 42 insertions(+), 39 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 64f1ad3b..0c1bea85 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -24,7 +24,7 @@ jobs:
           - rust: stable
             can-fail: false
           - rust: nightly
-            can-fail: true
+            can-fail: false
     steps:
     - name: Checkout Repository
       uses: actions/checkout@v1
diff --git a/src/adapters/ws.rs b/src/adapters/ws.rs
index f99347e1..0506ec71 100644
--- a/src/adapters/ws.rs
+++ b/src/adapters/ws.rs
@@ -110,44 +110,37 @@ impl Remote for RemoteResource {
             // "emulates" full duplex for the websocket case locking here and not outside the loop.
             let mut state = self.state.lock().expect(OTHER_THREAD_ERR);
             match state.deref_mut() {
-                RemoteState::WebSocket(web_socket) => {
-                    match web_socket.read_message() {
-                        Ok(message) => match message {
-                            Message::Binary(data) => {
-                                // As an optimization.
-                                // Fast check to knwo if there is more data to avoid call
-                                // WebSocker::read_message() again.
-                                let mut should_wait = false;
-                                if let Err(err) = web_socket.get_ref().peek(&mut [0; 0]) {
-                                    if err.kind() == ErrorKind::WouldBlock {
-                                        should_wait = true;
-                                    }
-                                }
+                RemoteState::WebSocket(web_socket) => match web_socket.read_message() {
+                    Ok(message) => match message {
+                        Message::Binary(data) => {
+                            // As an optimization.
+                            // Fast check to know if there is more data to avoid call
+                            // WebSocket::read_message() again.
+                            // TODO: investigate why this code doesn't work in windows.
+                            // Seems like windows consume the `WouldBlock` notification
+                            // at peek() when it happens, and the poll never wakes it again.
+                            #[cfg(not(target_os = "windows"))]
+                            let _peek_result = web_socket.get_ref().peek(&mut [0; 0]);
 
-                                // We can not call process_data while the socket is blocked.
-                                // The user could lock it again if sends from the callback.
-                                drop(state);
-                                process_data(&data);
+                            // We can not call process_data while the socket is blocked.
+                            // The user could lock it again if sends from the callback.
+                            drop(state);
+                            process_data(&data);
 
-                                if should_wait {
-                                    break ReadStatus::WaitNextEvent
-                                }
+                            #[cfg(not(target_os = "windows"))]
+                            if let Err(err) = _peek_result {
+                                break Self::io_error_to_read_status(&err)
                             }
-                            Message::Close(_) => break ReadStatus::Disconnected,
-                            _ => continue,
-                        },
-                        Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
-                            break ReadStatus::WaitNextEvent
-                        }
-                        Err(Error::Io(ref err)) if err.kind() == ErrorKind::ConnectionReset => {
-                            break ReadStatus::Disconnected
-                        }
-                        Err(err) => {
-                            log::error!("WS receive error: {}", err);
-                            break ReadStatus::Disconnected // should not happen
                         }
+                        Message::Close(_) => break ReadStatus::Disconnected,
+                        _ => continue,
+                    },
+                    Err(Error::Io(ref err)) => break Self::io_error_to_read_status(err),
+                    Err(err) => {
+                        log::error!("WS receive error: {}", err);
+                        break ReadStatus::Disconnected // should not happen
                     }
-                }
+                },
                 RemoteState::Handshake(handshake) => {
                     let current_handshake = handshake.take().unwrap();
                     match current_handshake.mid_handshake.handshake() {
@@ -205,6 +198,19 @@ impl RemoteResource {
             }
         }
     }
+
+    fn io_error_to_read_status(err: &io::Error) -> ReadStatus {
+        if err.kind() == io::ErrorKind::WouldBlock {
+            ReadStatus::WaitNextEvent
+        }
+        else if err.kind() == io::ErrorKind::ConnectionReset {
+            ReadStatus::Disconnected
+        }
+        else {
+            log::error!("WS receive error: {}", err);
+            ReadStatus::Disconnected // should not happen
+        }
+    }
 }
 
 pub(crate) struct LocalResource {
diff --git a/tests/integration.rs b/tests/integration.rs
index fe776637..92a87459 100644
--- a/tests/integration.rs
+++ b/tests/integration.rs
@@ -17,7 +17,7 @@ const SMALL_MESSAGE: &'static str = "Integration test message";
 const BIG_MESSAGE_SIZE: usize = 1024 * 1024 * 8; // 8MB
 
 lazy_static::lazy_static! {
-    pub static ref TIMEOUT: Duration = Duration::from_secs(30);
+    pub static ref TIMEOUT: Duration = Duration::from_secs(60);
 }
 
 // Common error messages
@@ -179,10 +179,7 @@ fn start_burst_receiver(
 
         let mut count = 0;
         listener.for_each(move |event| match event {
-            NodeEvent::Signal(_) => std::panic::catch_unwind(|| {
-                panic!("{}", TIMEOUT_EVENT_RECV_ERR);
-            })
-            .unwrap(),
+            NodeEvent::Signal(_) => panic!("{}", TIMEOUT_EVENT_RECV_ERR),
             NodeEvent::Network(net_event) => match net_event {
                 NetEvent::Message(_, data) => {
                     let expected_message = format!("{}: {}", SMALL_MESSAGE, count);