Skip to content

Commit

Permalink
Updated websocket to new adapter api
Browse files Browse the repository at this point in the history
  • Loading branch information
lemunozm committed Feb 16, 2021
1 parent 4dd454e commit bf729ab
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 78 deletions.
143 changes: 67 additions & 76 deletions src/adapters/web_socket.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use crate::adapter::{
Resource, Adapter, ActionHandler, EventHandler, SendStatus, AcceptedType, ReadStatus,
};
use crate::adapter::{Resource, Remote, Local, Adapter, SendStatus, AcceptedType, ReadStatus};
use crate::remote_addr::{RemoteAddr};
use crate::util::{OTHER_THREAD_ERR};

Expand All @@ -23,40 +21,34 @@ use std::io::{self, ErrorKind};
// From https://docs.rs/tungstenite/0.13.0/src/tungstenite/protocol/mod.rs.html#65
pub const MAX_WS_PAYLOAD_LEN: usize = 64 << 20;

pub struct ClientResource(Mutex<WebSocket<TcpStream>>);
impl Resource for ClientResource {
fn source(&mut self) -> &mut dyn Source {
self.0.get_mut().unwrap().get_mut()
}
pub struct WsAdapter;
impl Adapter for WsAdapter {
type Remote = RemoteResource;
type Local = LocalResource;
}

pub struct ServerResource(TcpListener);
impl Resource for ServerResource {
fn source(&mut self) -> &mut dyn Source {
&mut self.0
}
pub struct RemoteResource{
web_socket: Mutex<WebSocket<TcpStream>>
}

pub struct WsAdapter;
impl Adapter for WsAdapter {
type Remote = ClientResource;
type Listener = ServerResource;
type ActionHandler = WsActionHandler;
type EventHandler = WsEventHandler;

fn split(self) -> (WsActionHandler, WsEventHandler) {
(WsActionHandler, WsEventHandler)
impl From<WebSocket<TcpStream>> for RemoteResource {
fn from(web_socket: WebSocket<TcpStream>) -> Self {
Self { web_socket: Mutex::new(web_socket) }
}
}

pub struct WsActionHandler;
impl ActionHandler for WsActionHandler {
type Remote = ClientResource;
type Listener = ServerResource;
impl Resource for RemoteResource {
fn source(&mut self) -> &mut dyn Source {
// We return safety the inner TcpStream without blocking
self.web_socket.get_mut().unwrap().get_mut()
}
}

fn connect(&mut self, remote_addr: RemoteAddr) -> io::Result<(ClientResource, SocketAddr)> {
impl Remote for RemoteResource {
fn connect(remote_addr: RemoteAddr) -> io::Result<(Self, SocketAddr)> {
let (addr, url) = match remote_addr {
RemoteAddr::SocketAddr(addr) => (addr, Url::parse(&format!("ws://{}/message-io-default", addr)).unwrap()),
RemoteAddr::SocketAddr(addr) =>
(addr, Url::parse(&format!("ws://{}/message-io-default", addr)).unwrap()),
RemoteAddr::Url(url) => {
let addr = url.socket_addrs(|| match url.scheme() {
"ws" => Some(80), // Plain
Expand All @@ -66,6 +58,7 @@ impl ActionHandler for WsActionHandler {
(addr, url)
}
};

// Synchronous tcp handshake
let stream = StdTcpStream::connect(addr)?;

Expand All @@ -77,7 +70,7 @@ impl ActionHandler for WsActionHandler {
let mut handshake_result = ws_connect(url, stream);
loop {
match handshake_result {
Ok((ws_socket, _)) => break Ok((ClientResource(Mutex::new(ws_socket)), addr)),
Ok((ws_socket, _)) => break Ok((RemoteResource::from(ws_socket), addr)),
Err(HandshakeError::Interrupted(mid_handshake)) => {
handshake_result = mid_handshake.handshake();
}
Expand All @@ -86,15 +79,31 @@ impl ActionHandler for WsActionHandler {
}
}

fn listen(&mut self, addr: SocketAddr) -> io::Result<(ServerResource, SocketAddr)> {
let listener = TcpListener::bind(addr)?;
let real_addr = listener.local_addr().unwrap();
Ok((ServerResource(listener), real_addr))
fn receive(&self, process_data: &dyn Fn(&[u8])) -> ReadStatus {
loop {
match self.web_socket.lock().expect(OTHER_THREAD_ERR).read_message() {
Ok(message) => match message {
Message::Binary(data) => process_data(&data),
Message::Close(_) => break ReadStatus::Disconnected,
_ => continue,
},
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
break ReadStatus::WaitNextEvent
}
Err(Error::Io(ref err)) if err.kind() == ErrorKind::ConnectionReset => {
break ReadStatus::Disconnected
}
Err(_) => {
log::error!("TCP read event error");
break ReadStatus::Disconnected // should not happen
}
}
}
}

fn send(&mut self, web_socket: &ClientResource, data: &[u8]) -> SendStatus {
fn send(&self, data: &[u8]) -> SendStatus {
let message = Message::Binary(data.to_vec());
let mut socket = web_socket.0.lock().expect(OTHER_THREAD_ERR);
let mut socket = self.web_socket.lock().expect(OTHER_THREAD_ERR);
let mut result = socket.write_message(message);
loop {
match result {
Expand All @@ -108,53 +117,35 @@ impl ActionHandler for WsActionHandler {
}
}

pub struct WsEventHandler;
impl EventHandler for WsEventHandler {
type Remote = ClientResource;
type Listener = ServerResource;

fn accept_event(
&mut self,
listener: &ServerResource,
accept_remote: &dyn Fn(AcceptedType<'_, Self::Remote>),
)
{
pub struct LocalResource{
listener: TcpListener
}

impl Resource for LocalResource {
fn source(&mut self) -> &mut dyn Source {
&mut self.listener
}
}

impl Local for LocalResource {
type Remote = RemoteResource;

fn listen(addr: SocketAddr) -> io::Result<(Self, SocketAddr)> {
let listener = TcpListener::bind(addr)?;
let real_addr = listener.local_addr().unwrap();
Ok((LocalResource{listener}, real_addr))
}

fn accept(&self, accept_remote: &dyn Fn(AcceptedType<'_, Self::Remote>)) {
loop {
match listener.0.accept() {
match self.listener.accept() {
Ok((stream, addr)) => {
let resource = ClientResource(Mutex::new(ws_accept(stream).unwrap()));
let resource = RemoteResource::from(ws_accept(stream).unwrap());
accept_remote(AcceptedType::Remote(addr, resource));
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => break,
Err(ref err) if err.kind() == ErrorKind::Interrupted => continue,
Err(_) => break log::trace!("WebSocket accept event error"), // Should not happen
}
}
}

fn read_event(
&mut self,
resource: &ClientResource,
process_data: &dyn Fn(&[u8]),
) -> ReadStatus
{
loop {
match resource.0.lock().expect(OTHER_THREAD_ERR).read_message() {
Ok(message) => match message {
Message::Binary(data) => process_data(&data),
Message::Close(_) => break ReadStatus::Disconnected,
_ => continue,
},
Err(Error::Io(ref err)) if err.kind() == ErrorKind::WouldBlock => {
break ReadStatus::WaitNextEvent
}
Err(Error::Io(ref err)) if err.kind() == ErrorKind::ConnectionReset => {
break ReadStatus::Disconnected
}
Err(_) => {
log::error!("TCP read event error");
break ReadStatus::Disconnected // should not happen
}
Err(err) => break log::trace!("WS accept error: {}", err), // Should not happen
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ pub mod network;
pub mod events;

pub use adapters::udp::MAX_UDP_PAYLOAD_LEN;
//pub use adapters::web_socket::MAX_WS_PAYLOAD_LEN;
pub use adapters::web_socket::MAX_WS_PAYLOAD_LEN;
2 changes: 1 addition & 1 deletion tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ fn ping_pong(transport: Transport, clients: usize) {

#[test_case(Transport::Tcp, BIG_MESSAGE_SIZE)]
#[test_case(Transport::Udp, MAX_SIZE_BY_UDP)]
#[test_case(Transport::Ws, MAX_SIZE_BY_WS + 10000)]
#[test_case(Transport::Ws, MAX_SIZE_BY_WS)]
fn message_size(transport: Transport, message_size: usize) {
//util::init_logger();

Expand Down

0 comments on commit bf729ab

Please sign in to comment.