From 1ae3a2c1da71e666601e3012a945d20559c680d9 Mon Sep 17 00:00:00 2001 From: lemunozm Date: Sun, 14 Feb 2021 23:59:09 +0100 Subject: [PATCH] Adapted websocket adapter for RemoteAddr --- src/adapters/web_socket.rs | 39 +++++++++++++++++++++++++++++--------- tests/integration.rs | 5 ++++- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/adapters/web_socket.rs b/src/adapters/web_socket.rs index 6756a75e..a77c7c77 100644 --- a/src/adapters/web_socket.rs +++ b/src/adapters/web_socket.rs @@ -1,4 +1,7 @@ -use crate::adapter::{Resource, Adapter, ActionHandler, EventHandler, SendStatus, AcceptedType, ReadStatus}; +use crate::adapter::{ + Resource, Adapter, ActionHandler, EventHandler, SendStatus, AcceptedType, ReadStatus, +}; +use crate::remote_addr::{RemoteAddr}; use crate::util::{OTHER_THREAD_ERR}; use mio::event::{Source}; @@ -6,14 +9,20 @@ use mio::net::{TcpStream, TcpListener}; use tungstenite::protocol::{WebSocket, Message}; use tungstenite::server::{accept as ws_accept}; -use tungstenite::client::{client as ws_client}; +use tungstenite::client::{client as ws_connect}; use tungstenite::handshake::{HandshakeError}; use tungstenite::error::{Error}; +use url::Url; + use std::sync::{Mutex}; use std::net::{SocketAddr, TcpStream as StdTcpStream}; use std::io::{self, ErrorKind}; +/// Max message size +// From https://docs.rs/tungstenite/0.13.0/src/tungstenite/protocol/mod.rs.html#65 +pub const MAX_WS_PAYLOAD_LEN: usize = 64 << 20; + pub struct ClientResource(Mutex>); impl Resource for ClientResource { fn source(&mut self) -> &mut dyn Source { @@ -45,7 +54,18 @@ impl ActionHandler for WsActionHandler { type Remote = ClientResource; type Listener = ServerResource; - fn connect(&mut self, addr: SocketAddr) -> io::Result { + fn connect(&mut self, remote_addr: RemoteAddr) -> io::Result<(ClientResource, SocketAddr)> { + let (addr, url) = match remote_addr { + RemoteAddr::SocketAddr(addr) => (addr, Url::parse(&format!("ws://{}/message-io-default", addr)).unwrap()), + RemoteAddr::Url(url) => { + let addr = url.socket_addrs(|| match url.scheme() { + "ws" => Some(80), // Plain + "wss" => Some(443), //Tls + _ => None, + }).unwrap()[0]; + (addr, url) + } + }; // Synchronous tcp handshake let stream = StdTcpStream::connect(addr)?; @@ -54,10 +74,10 @@ impl ActionHandler for WsActionHandler { let stream = TcpStream::from_std(stream); // Synchronous waiting for web socket handshake - let mut handshake_result = ws_client(format!("ws://{}/socket", addr), stream); + let mut handshake_result = ws_connect(url, stream); loop { match handshake_result { - Ok((ws_socket, _)) => break Ok(ClientResource(Mutex::new(ws_socket))), + Ok((ws_socket, _)) => break Ok((ClientResource(Mutex::new(ws_socket)), addr)), Err(HandshakeError::Interrupted(mid_handshake)) => { handshake_result = mid_handshake.handshake(); } @@ -79,7 +99,7 @@ impl ActionHandler for WsActionHandler { loop { match result { Ok(_) => break SendStatus::Sent, - Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => { + Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => { result = socket.write_pending(); } Err(_) => break SendStatus::ResourceNotFound, @@ -115,15 +135,16 @@ impl EventHandler for WsEventHandler { fn read_event( &mut self, resource: &ClientResource, - process_data: &dyn Fn(&[u8]) - ) -> ReadStatus { + process_data: &dyn Fn(&[u8]), + ) -> ReadStatus + { loop { match resource.0.lock().expect(OTHER_THREAD_ERR).read_message() { Ok(message) => match message { Message::Binary(data) => process_data(&data), Message::Close(_) => break ReadStatus::Disconnected, _ => continue, - } + }, Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => { break ReadStatus::WaitNextEvent } diff --git a/tests/integration.rs b/tests/integration.rs index ebced749..c7d4eadf 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -17,6 +17,7 @@ pub const BIG_MESSAGE_SIZE: usize = 1 << 20; // 1MB // 8 is the Vec head offset added in serialization. pub const MAX_SIZE_BY_UDP: usize = MAX_UDP_PAYLOAD_LEN - 8; +pub const MAX_SIZE_BY_WS: usize = MAX_UDP_PAYLOAD_LEN - 8; lazy_static::lazy_static! { pub static ref TIMEOUT: Duration = Duration::from_millis(5000); @@ -176,6 +177,7 @@ fn ping_pong_client_manager_handle( #[test_case(Transport::Tcp, 1)] #[test_case(Transport::Tcp, 100)] #[test_case(Transport::Ws, 1)] +#[test_case(Transport::Ws, 100)] // NOTE: A medium-high `clients` value can exceeds the "open file" limits of an OS in CI // with a very obfuscated error message. fn ping_pong(transport: Transport, clients: usize) { @@ -188,8 +190,9 @@ fn ping_pong(transport: Transport, clients: usize) { client_handle.join().unwrap(); } -#[test_case(Transport::Udp, MAX_SIZE_BY_UDP)] #[test_case(Transport::Tcp, BIG_MESSAGE_SIZE)] +#[test_case(Transport::Udp, MAX_SIZE_BY_UDP)] +#[test_case(Transport::Ws, MAX_SIZE_BY_WS + 10000)] fn message_size(transport: Transport, message_size: usize) { //util::init_logger();