Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle text messages properly in web-sys websocket service #1005

Merged
merged 7 commits into from
Mar 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ dist: trusty
language: rust
sudo: false
addons:
chrome: stable
firefox: latest

cache: cargo
before_cache:
Expand All @@ -32,12 +32,11 @@ install:
- cargo install cargo-update || true
- cargo install-update-config --version =0.2.59 wasm-bindgen-cli
- cargo install-update --allow-no-update wasm-bindgen-cli
- LATEST_CHROMEDRIVER_VERSION=`curl -s "https://chromedriver.storage.googleapis.com/LATEST_RELEASE"`
- curl --retry 5 -LO "https://chromedriver.storage.googleapis.com/${LATEST_CHROMEDRIVER_VERSION}/chromedriver_linux64.zip"
- unzip chromedriver_linux64.zip
- curl --retry 5 -LO https://github.com/mozilla/geckodriver/releases/download/v0.26.0/geckodriver-v0.26.0-linux64.tar.gz
- tar -xzf geckodriver-v0.26.0-linux64.tar.gz
- ./ci/install_cargo_web.sh

script:
- ./ci/run_checks.sh
- CHROMEDRIVER=$(pwd)/chromedriver ./ci/run_tests.sh
- CHROMEDRIVER=$(pwd)/chromedriver ./ci/check_examples.sh
- GECKODRIVER=$(pwd)/geckodriver ./ci/run_tests.sh
- ./ci/check_examples.sh
5 changes: 3 additions & 2 deletions ci/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ echo "$(rustup default)" | grep -q "1.39.0"
emscripten_supported=$?
set -euxo pipefail # https://vaneyckt.io/posts/safer_bash_scripts_with_set_euxo_pipefail/

cargo test --target wasm32-unknown-unknown --features wasm_test,std_web
cargo test --target wasm32-unknown-unknown --features wasm_test,web_sys

if [ "$emscripten_supported" == "0" ]; then
# TODO - Emscripten builds are broken on rustc > 1.39.0
cargo web test --target asmjs-unknown-emscripten --features std_web
cargo web test --target wasm32-unknown-emscripten --features std_web
fi

cargo test --target wasm32-unknown-unknown --features wasm_test,std_web
cargo test --target wasm32-unknown-unknown --features wasm_test,web_sys
cargo test --doc --features doc_test,wasm_test,yaml,msgpack,cbor,std_web
cargo test --doc --features doc_test,wasm_test,yaml,msgpack,cbor,web_sys

Expand Down
4 changes: 4 additions & 0 deletions src/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ pub enum FormatError {
/// on a WebSocket that is using a binary serialization format, like Cbor.
#[error("received text for a binary format")]
ReceivedTextForBinary,
/// Received binary for a text format, e.g. someone sending binary
/// on a WebSocket that is using a text serialization format, like Json.
#[error("received binary for a text format")]
ReceivedBinaryForText,
/// Trying to encode a binary format as text", e.g., trying to
/// store a Cbor encoded value in a String.
#[error("trying to encode a binary format as Text")]
Expand Down
182 changes: 153 additions & 29 deletions src/services/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use super::Task;
use crate::callback::Callback;
use crate::format::{Binary, Text};
use crate::format::{Binary, FormatError, Text};
use cfg_if::cfg_if;
use cfg_match::cfg_match;
use std::fmt;
Expand All @@ -21,7 +21,7 @@ cfg_if! {
}

/// A status of a websocket connection. Used for status notification.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum WebSocketStatus {
/// Fired when a websocket connection was opened.
Opened,
Expand Down Expand Up @@ -121,15 +121,15 @@ impl WebSocketService {
feature = "std_web" => ({
let ws = self.connect_common(url, &notification)?.0;
ws.add_event_listener(move |event: SocketMessageEvent| {
did_process_binary(&event, &callback);
process_binary(&event, &callback);
});
Ok(WebSocketTask { ws, notification })
}),
feature = "web_sys" => ({
let ConnectCommon(ws, listeners) = self.connect_common(url, &notification)?;
let listener = EventListener::new(&ws, "message", move |event: &Event| {
let event = event.dyn_ref::<MessageEvent>().unwrap();
did_process_binary(&event, &callback);
process_binary(&event, &callback);
});
WebSocketTask::new(ws, notification, listener, listeners)
}),
Expand Down Expand Up @@ -226,32 +226,35 @@ impl WebSocketService {

struct ConnectCommon(WebSocket, #[cfg(feature = "web_sys")] [EventListener; 3]);

fn did_process_binary<OUT: 'static>(
fn process_binary<OUT: 'static>(
#[cfg(feature = "std_web")] event: &SocketMessageEvent,
#[cfg(feature = "web_sys")] event: &MessageEvent,
callback: &Callback<OUT>,
) -> bool
where
) where
OUT: From<Binary>,
{
let bytes = cfg_match! {
feature = "std_web" => event.data().into_array_buffer(),
feature = "web_sys" => Some(event.data()),
#[cfg(feature = "std_web")]
let bytes = event.data().into_array_buffer();

#[cfg(feature = "web_sys")]
let bytes = if !event.data().is_string() {
Some(event.data())
} else {
None
};

match bytes {
None => false,
Some(bytes) => {
let bytes: Vec<u8> = cfg_match! {
feature = "std_web" => bytes.into(),
feature = "web_sys" => Uint8Array::new(&bytes).to_vec(),
};
let data = Ok(bytes);
let out = OUT::from(data);
callback.emit(out);
true
}
}
let data = if let Some(bytes) = bytes {
let bytes: Vec<u8> = cfg_match! {
feature = "std_web" => bytes.into(),
feature = "web_sys" => Uint8Array::new(&bytes).to_vec(),
};
Ok(bytes)
} else {
Err(FormatError::ReceivedTextForBinary.into())
};

let out = OUT::from(data);
callback.emit(out);
}

fn process_text<OUT: 'static>(
Expand All @@ -266,11 +269,14 @@ fn process_text<OUT: 'static>(
feature = "web_sys" => event.data().as_string(),
};

if let Some(text) = text {
let data = Ok(text);
let out = OUT::from(data);
callback.emit(out);
}
let data = if let Some(text) = text {
Ok(text)
} else {
Err(FormatError::ReceivedBinaryForText.into())
};

let out = OUT::from(data);
callback.emit(out);
}

fn process_both<OUT: 'static>(
Expand All @@ -280,8 +286,16 @@ fn process_both<OUT: 'static>(
) where
OUT: From<Text> + From<Binary>,
{
if !did_process_binary(event, callback) {
#[cfg(feature = "std_web")]
let is_text = event.data().into_text().is_some();

#[cfg(feature = "web_sys")]
let is_text = event.data().is_string();

if is_text {
process_text(event, callback);
} else {
process_binary(event, callback);
}
}

Expand Down Expand Up @@ -340,3 +354,113 @@ impl Drop for WebSocketTask {
}
}
}

#[cfg(test)]
#[cfg(feature = "wasm_test")]
mod tests {
use super::*;
use crate::callback::{test_util::CallbackFuture, Callback};
use crate::format::{FormatError, Json};
use serde::{Deserialize, Serialize};
use wasm_bindgen_test::{wasm_bindgen_test as test, wasm_bindgen_test_configure};

wasm_bindgen_test_configure!(run_in_browser);

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Message {
test: String,
}

#[test]
async fn connect() {
let url = "wss://echo.websocket.org";
let cb_future = CallbackFuture::<Json<Result<Message, anyhow::Error>>>::default();
let callback: Callback<_> = cb_future.clone().into();
let status_future = CallbackFuture::<WebSocketStatus>::default();
let notification: Callback<_> = status_future.clone().into();

let mut ws = WebSocketService::new();
let mut task = ws.connect(url, callback, notification).unwrap();
assert_eq!(status_future.await, WebSocketStatus::Opened);

let msg = Message {
test: String::from("hello"),
};

task.send(Json(&msg));
match cb_future.clone().await {
Json(Ok(received)) => assert_eq!(received, msg),
Json(Err(err)) => assert!(false, err),
}

task.send_binary(Json(&msg));
match cb_future.await {
Json(Ok(received)) => assert_eq!(received, msg),
Json(Err(err)) => assert!(false, err),
}
}

#[test]
async fn connect_text() {
let url = "wss://echo.websocket.org";
let cb_future = CallbackFuture::<Json<Result<Message, anyhow::Error>>>::default();
let callback: Callback<_> = cb_future.clone().into();
let status_future = CallbackFuture::<WebSocketStatus>::default();
let notification: Callback<_> = status_future.clone().into();

let mut ws = WebSocketService::new();
let mut task = ws.connect_text(url, callback, notification).unwrap();
assert_eq!(status_future.await, WebSocketStatus::Opened);

let msg = Message {
test: String::from("hello"),
};

task.send(Json(&msg));
match cb_future.clone().await {
Json(Ok(received)) => assert_eq!(received, msg),
Json(Err(err)) => assert!(false, err),
}

task.send_binary(Json(&msg));
match cb_future.await {
Json(Ok(received)) => assert!(false, received),
Json(Err(err)) => assert_eq!(
err.to_string(),
FormatError::ReceivedBinaryForText.to_string()
),
}
}

#[test]
async fn connect_binary() {
let url = "wss://echo.websocket.org";
let cb_future = CallbackFuture::<Json<Result<Message, anyhow::Error>>>::default();
let callback: Callback<_> = cb_future.clone().into();
let status_future = CallbackFuture::<WebSocketStatus>::default();
let notification: Callback<_> = status_future.clone().into();

let mut ws = WebSocketService::new();
let mut task = ws.connect_binary(url, callback, notification).unwrap();
assert_eq!(status_future.await, WebSocketStatus::Opened);

let msg = Message {
test: String::from("hello"),
};

task.send_binary(Json(&msg));
match cb_future.clone().await {
Json(Ok(received)) => assert_eq!(received, msg),
Json(Err(err)) => assert!(false, err),
}

task.send(Json(&msg));
match cb_future.await {
Json(Ok(received)) => assert!(false, received),
Json(Err(err)) => assert_eq!(
err.to_string(),
FormatError::ReceivedTextForBinary.to_string()
),
}
}
}