Skip to content

Commit

Permalink
Windows fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lemunozm committed Apr 13, 2021
1 parent 6826887 commit 4ce91c3
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- rust: stable
can-fail: false
- rust: nightly
can-fail: true
can-fail: false
steps:
- name: Checkout Repository
uses: actions/checkout@v1
Expand Down
72 changes: 39 additions & 33 deletions src/adapters/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,44 +110,37 @@ impl Remote for RemoteResource {
// "emulates" full duplex for the websocket case locking here and not outside the loop.
let mut state = self.state.lock().expect(OTHER_THREAD_ERR);
match state.deref_mut() {
RemoteState::WebSocket(web_socket) => {
match web_socket.read_message() {
Ok(message) => match message {
Message::Binary(data) => {
// As an optimization.
// Fast check to knwo if there is more data to avoid call
// WebSocker::read_message() again.
let mut should_wait = false;
if let Err(err) = web_socket.get_ref().peek(&mut [0; 0]) {
if err.kind() == ErrorKind::WouldBlock {
should_wait = true;
}
}
RemoteState::WebSocket(web_socket) => match web_socket.read_message() {
Ok(message) => match message {
Message::Binary(data) => {
// As an optimization.
// Fast check to know if there is more data to avoid call
// WebSocket::read_message() again.
// TODO: investigate why this code doesn't work in windows.
// Seems like windows consume the `WouldBlock` notification
// at peek() when it happens, and the poll never wakes it again.
#[cfg(not(target_os = "windows"))]
let _peek_result = web_socket.get_ref().peek(&mut [0; 0]);

// We can not call process_data while the socket is blocked.
// The user could lock it again if sends from the callback.
drop(state);
process_data(&data);
// We can not call process_data while the socket is blocked.
// The user could lock it again if sends from the callback.
drop(state);
process_data(&data);

if should_wait {
break ReadStatus::WaitNextEvent
}
#[cfg(not(target_os = "windows"))]
if let Err(err) = _peek_result {
break Self::io_error_to_read_status(&err)
}
Message::Close(_) => break ReadStatus::Disconnected,
_ => continue,
},
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
break ReadStatus::WaitNextEvent
}
Err(Error::Io(ref err)) if err.kind() == ErrorKind::ConnectionReset => {
break ReadStatus::Disconnected
}
Err(err) => {
log::error!("WS receive error: {}", err);
break ReadStatus::Disconnected // should not happen
}
Message::Close(_) => break ReadStatus::Disconnected,
_ => continue,
},
Err(Error::Io(ref err)) => break Self::io_error_to_read_status(err),
Err(err) => {
log::error!("WS receive error: {}", err);
break ReadStatus::Disconnected // should not happen
}
}
},
RemoteState::Handshake(handshake) => {
let current_handshake = handshake.take().unwrap();
match current_handshake.mid_handshake.handshake() {
Expand Down Expand Up @@ -205,6 +198,19 @@ impl RemoteResource {
}
}
}

fn io_error_to_read_status(err: &io::Error) -> ReadStatus {
if err.kind() == io::ErrorKind::WouldBlock {
ReadStatus::WaitNextEvent
}
else if err.kind() == io::ErrorKind::ConnectionReset {
ReadStatus::Disconnected
}
else {
log::error!("WS receive error: {}", err);
ReadStatus::Disconnected // should not happen
}
}
}

pub(crate) struct LocalResource {
Expand Down
7 changes: 2 additions & 5 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const SMALL_MESSAGE: &'static str = "Integration test message";
const BIG_MESSAGE_SIZE: usize = 1024 * 1024 * 8; // 8MB

lazy_static::lazy_static! {
pub static ref TIMEOUT: Duration = Duration::from_secs(30);
pub static ref TIMEOUT: Duration = Duration::from_secs(60);
}

// Common error messages
Expand Down Expand Up @@ -179,10 +179,7 @@ fn start_burst_receiver(

let mut count = 0;
listener.for_each(move |event| match event {
NodeEvent::Signal(_) => std::panic::catch_unwind(|| {
panic!("{}", TIMEOUT_EVENT_RECV_ERR);
})
.unwrap(),
NodeEvent::Signal(_) => panic!("{}", TIMEOUT_EVENT_RECV_ERR),
NodeEvent::Network(net_event) => match net_event {
NetEvent::Message(_, data) => {
let expected_message = format!("{}: {}", SMALL_MESSAGE, count);
Expand Down

0 comments on commit 4ce91c3

Please sign in to comment.