diff --git a/CHANGELOG.md b/CHANGELOG.md index 6817e8ad..fcccb958 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## Release 0.7.1 - Added WebSocket support based in `tungstenite-rs` +- Added `Network::split()` function. +- Join `udp` and `tcp` examples to make the `ping-pong` example with *WebSocket* support. +- Improved `RemoteAddr` traits. ## Release 0.7.0 - Internal improvements in order to use one thread for all adapters. diff --git a/README.md b/README.md index b7248809..e1c6b1df 100644 --- a/README.md +++ b/README.md @@ -21,12 +21,12 @@ If you find a problem using the library or you have an improvement idea, do not Managing sockets is hard because you need to fight with threads, concurrency, IO errors that come from the OS (which are really difficult to understand in some situations), serialization, encoding... -And if you make use of non-blocking sockets, it adds a new layer of complexity: +And if you make use of *non-blocking* sockets, it adds a new layer of complexity: synchronize the events that come asynchronously from the OS poll. `message-io` offers an easy way to deal with all these mentioned problems, making them transparently for you, -the programmer that wants to make your application with its own problems. +the programmer that wants to make an application with its own problems. For that, `message-io` offers a simple API and give only two concepts to understand: **messages** (the data you send and receive), and **endpoints** (the recipients of that data). This abstraction also offers the possibility to use the same API independently @@ -36,7 +36,8 @@ You could change the protocol of your application in literally one line. ## Features - Asynchronous: internal poll event with non-blocking sockets using [mio](https://github.com/tokio-rs/mio). - Multiplatform: see [mio platform support](https://github.com/tokio-rs/mio#platforms). -- Multiples transports: **TCP**, **UDP** (with multicast option) and **WebSockets**. +- Multiples transports: **TCP**, **UDP** (with multicast option) and + **WebSockets** (secure and non-secure option). - Internal encoding layer: handle messages, not data streams. - FIFO events with timers and priority. - Easy, intuitive and consistent API: @@ -44,7 +45,7 @@ You could change the protocol of your application in literally one line. - Abstraction from transport layer: do not think about sockets, think about messages and endpoints. - Only two main entities to use: - an extensible *event-queue* to manage all events synchronously, - - a *network* that manage all connections (connect, listen, remove, send, receive). + - a *network* to manage all connections (connect, listen, remove, send, receive). - Forget concurrence problems: handle thousands of active connections and listeners without any effort, "One thread to rule them all". - Easy error handling. @@ -64,9 +65,7 @@ message-io = "0.7" - [API documentation](https://docs.rs/message-io/) - [Basic concepts](docs/basic_concepts.md) - [Examples](examples): - - [Basic TCP client and server](examples/tcp) - - [Basic UDP client and server](examples/udp) - - [Basic WebSocket client and server](examples/web_socket) + - [Ping Pong](examples/ping-pong) (a simple client server example) - [Multicast](examples/multicast) - [Distributed network with discovery server](examples/distributed) - [File transfer](examples/file-transfer) @@ -115,18 +114,21 @@ fn main() { } ``` -## Test yourself! -Clone the repository and test the TCP example that you can find in [`examples/tcp`](examples/tcp): +## Test it yourself! +Clone the repository and test the *Ping Pong* example. Run the server: ``` -cargo run --example tcp server +cargo run --example ping-pong server tcp 3456 ``` -In other terminals, run one or more clients: +Run the client: ``` -cargo run --example tcp client +cargo run --example ping-pong client tcp 127.0.0.1:3456 awesome-client ``` +You can play with it changing the transport, running several clients, disconnect them, etc. +See more [here](examples/ping-pong). + ## Do you need a transport protocol that `message-io` doesn't have? Add an adapter! `message-io` offers two *kinds* of APIs. @@ -143,7 +145,7 @@ If the protocol can be built in top on [`mio`](https://github.com/tokio-rs/mio#p 1. Add your *adapter* file in `src/adapters/.rs` that implements the traits that you find [here](https://docs.rs/message-io/0.7.0/message_io/adapter/index.html) (only 7 mandatory functions to implement, see the [template](src/adapters/template.rs)). -1. Add a new field in the `Transport` enum found in [`src/network.rs`] to register your new adapter. +1. Add a new field in the `Transport` enum found in `src/network.rs` to register your new adapter. That's all! You can use your new transport with the `message-io` API like any other. diff --git a/examples/ping-pong/README.md b/examples/ping-pong/README.md new file mode 100644 index 00000000..aaee9c38 --- /dev/null +++ b/examples/ping-pong/README.md @@ -0,0 +1,23 @@ +# Ping-Pong example + +This example shows a *clients-server* connection. +The clients send a message every second and the server responds to it. + +You can run both client and server by several transports: `tcp`, `udp`, `ws` (*WebSocket*). + +## Test it! + +Launch the server in a terminal: +``` +cargo run --example ping-pong server +``` + +Run a client with a name (one client per terminal): +``` +cargo run --example ping-pong client (: | ) +``` +You can play the disconnections and reconnections using `ctrl-c` over the clients. + +*Note: for *WebSocket* (`ws`), you can specify the address as usually `:` or as an url: +`ws://domain:port/path` for normal websocket or `wss://domain:port/part` for secure web sockets. + diff --git a/examples/ping-pong/client.rs b/examples/ping-pong/client.rs new file mode 100644 index 00000000..ad507f96 --- /dev/null +++ b/examples/ping-pong/client.rs @@ -0,0 +1,51 @@ +use super::common::{FromServerMessage, FromClientMessage}; + +use message_io::events::{EventQueue}; +use message_io::network::{Network, NetEvent, Transport, RemoteAddr}; + +use std::time::{Duration}; + +enum Event { + Network(NetEvent), + + // This is a self event called every second. + // You can mix network events with your own events in the EventQueue. + Greet, +} + +pub fn run(transport: Transport, remote_addr: RemoteAddr, name: &str) { + let mut event_queue = EventQueue::new(); + + let network_sender = event_queue.sender().clone(); + let mut network = Network::new(move |net_event| network_sender.send(Event::Network(net_event))); + + let server_id = match network.connect(Transport::Tcp, remote_addr.clone()) { + Ok(server_id) => server_id, + Err(_) => { + return println!("Can not connect to the server by {:?} to {}", transport, remote_addr) + } + }; + + println!("Connect to server by TCP at {}", server_id.addr()); + event_queue.sender().send(Event::Greet); + + loop { + match event_queue.receive() { + Event::Greet => { + let message = FromClientMessage::Greetings(format!("Hi, I am {}", name)); + network.send(server_id, message); + event_queue.sender().send_with_timer(Event::Greet, Duration::from_secs(1)); + } + Event::Network(net_event) => match net_event { + NetEvent::Message(_, message) => match message { + FromServerMessage::CountGreetings(text, count) => { + println!("Server says: '{}' for {} time", text, count) + } + }, + NetEvent::Connected(_) => unreachable!(), // Only generated when listen + NetEvent::Disconnected(_) => return println!("Server is disconnected"), + NetEvent::DeserializationError(_) => (), // Malformed message from the server + }, + } + } +} diff --git a/examples/ping-pong/common.rs b/examples/ping-pong/common.rs new file mode 100644 index 00000000..30cb0f7b --- /dev/null +++ b/examples/ping-pong/common.rs @@ -0,0 +1,11 @@ +use serde::{Serialize, Deserialize}; + +#[derive(Serialize, Deserialize)] +pub enum FromClientMessage { + Greetings(String), +} + +#[derive(Serialize, Deserialize)] +pub enum FromServerMessage { + CountGreetings(String, usize), +} diff --git a/examples/ping-pong/main.rs b/examples/ping-pong/main.rs new file mode 100644 index 00000000..fdedfdeb --- /dev/null +++ b/examples/ping-pong/main.rs @@ -0,0 +1,46 @@ +mod common; +mod client; +mod server; + +use message_io::network::{Transport, ToRemoteAddr}; + +use std::net::{ToSocketAddrs}; + +const HELP_MSG: &str = concat!( + "Usage: ping-pong server (tcp | udp | ws) []\n", + " pong-pong client (tcp | udp | ws) (:|url) " +); + +pub fn main() { + let args: Vec = std::env::args().collect(); + + let transport = match args.get(2).unwrap_or(&"".into()).as_ref() { + "tcp" => Transport::Tcp, + "udp" => Transport::Udp, + "ws" => Transport::Ws, + _ => return println!("{}", HELP_MSG), + }; + + match args.get(1).unwrap_or(&"".into()).as_ref() { + "client" => match args.get(3) { + Some(remote_addr) => match args.get(4) { + Some(name) => { + let remote_addr = remote_addr.to_remote_addr().unwrap(); + client::run(transport, remote_addr, name); + } + None => return println!("{}", HELP_MSG), + }, + None => return println!("{}", HELP_MSG), + }, + "server" => { + match args.get(3).unwrap_or(&"".into()).parse() { + Ok(port) => { + let addr = ("0.0.0.0", port).to_socket_addrs().unwrap().next().unwrap(); + server::run(transport, addr); + } + Err(_) => return println!("{}", HELP_MSG), + }; + } + _ => return println!("{}", HELP_MSG), + } +} diff --git a/examples/ping-pong/server.rs b/examples/ping-pong/server.rs new file mode 100644 index 00000000..d002ea45 --- /dev/null +++ b/examples/ping-pong/server.rs @@ -0,0 +1,55 @@ +use super::common::{FromServerMessage, FromClientMessage}; + +use message_io::network::{Network, NetEvent, Endpoint, Transport}; + +use std::collections::{HashMap}; +use std::net::{SocketAddr}; + +struct ClientInfo { + count: usize, +} + +pub fn run(transport: Transport, addr: SocketAddr) { + let (mut network, mut event_queue) = Network::split(); + + let mut clients: HashMap = HashMap::new(); + + match network.listen(transport, addr) { + Ok((_resource_id, real_addr)) => println!("TCP Server running at {}", real_addr), + Err(_) => return println!("Can not listening at {}", addr), + } + + loop { + match event_queue.receive() { + // Also you can use receive_timeout + NetEvent::Message(endpoint, message) => match message { + FromClientMessage::Greetings(text) => { + let mut count = clients.get_mut(&endpoint).unwrap().count; + count += 1; + println!("Client ({}) says '{}' {} times", endpoint.addr(), text, count); + let msg = format!("Hi, I hear you for {} time", count); + network.send(endpoint, FromServerMessage::CountGreetings(msg, count)); + } + }, + NetEvent::Connected(endpoint) => { + // Only connection oriented protocols as Tcp or Websocket will generate this event + clients.insert(endpoint, ClientInfo { count: 0 }); + println!( + "Client ({}) connected (total clients: {})", + endpoint.addr(), + clients.len() + ); + } + NetEvent::Disconnected(endpoint) => { + // Only connection oriented protocols as Tcp or Websocket will generate this event + clients.remove(&endpoint).unwrap(); + println!( + "Client ({}) disconnected (total clients: {})", + endpoint.addr(), + clients.len() + ); + } + NetEvent::DeserializationError(_) => (), // Only if the user send a malformed message. + } + } +} diff --git a/examples/tcp/README.md b/examples/tcp/README.md deleted file mode 100644 index 649690a8..00000000 --- a/examples/tcp/README.md +++ /dev/null @@ -1,15 +0,0 @@ -# TCP client and server example -This example allows to create a *star network topology* using a server and several clients by TCP. - -## Test it! -Launch the server in a terminal: -``` -cargo run --example tcp server -``` - -Run a client with a name (one client per terminal): -``` -cargo run --example tcp client -``` - -Note: You can play the disconnections/reconnections using `ctrl-c` over the clients. diff --git a/examples/tcp/client.rs b/examples/tcp/client.rs deleted file mode 100644 index b61df783..00000000 --- a/examples/tcp/client.rs +++ /dev/null @@ -1,47 +0,0 @@ -use super::common::{Message}; - -use message_io::events::{EventQueue}; -use message_io::network::{Network, NetEvent, Transport}; - -use std::time::{Duration}; - -enum Event { - Network(NetEvent), - Greet, -} - -pub fn run(name: &str) { - let mut event_queue = EventQueue::new(); - - let network_sender = event_queue.sender().clone(); - let mut network = Network::new(move |net_event| network_sender.send(Event::Network(net_event))); - - let server_addr = "127.0.0.1:3000"; - if let Ok(server_id) = network.connect(Transport::Tcp, server_addr) { - println!("Connect to server by TCP at {}", server_addr); - event_queue.sender().send(Event::Greet); - - loop { - match event_queue.receive() { - Event::Greet => { - network.send(server_id, Message::Greetings(format!("Hi, I am {}", name))); - event_queue.sender().send_with_timer(Event::Greet, Duration::from_secs(1)); - } - Event::Network(net_event) => match net_event { - NetEvent::Message(_, message) => match message { - Message::Greetings(text) => println!("Server says: {}", text), - }, - NetEvent::Connected(_) => unreachable!(), - NetEvent::Disconnected(_) => { - println!("Server is disconnected"); - return - } - NetEvent::DeserializationError(_) => (), - }, - } - } - } - else { - println!("Can not connect to the server by TCP to {}", server_addr); - } -} diff --git a/examples/tcp/common.rs b/examples/tcp/common.rs deleted file mode 100644 index 52a95fc8..00000000 --- a/examples/tcp/common.rs +++ /dev/null @@ -1,6 +0,0 @@ -use serde::{Serialize, Deserialize}; - -#[derive(Serialize, Deserialize)] -pub enum Message { - Greetings(String), -} diff --git a/examples/tcp/main.rs b/examples/tcp/main.rs deleted file mode 100644 index 7e76de51..00000000 --- a/examples/tcp/main.rs +++ /dev/null @@ -1,15 +0,0 @@ -mod common; -mod client; -mod server; - -pub fn main() { - let args: Vec = std::env::args().collect(); - match args.get(1).unwrap_or(&String::new()).as_ref() { - "client" => match args.get(2) { - Some(name) => client::run(name), - None => println!("The client needs a 'name'"), - }, - "server" => server::run(), - _ => println!("Usage: basic client | server"), - } -} diff --git a/examples/tcp/server.rs b/examples/tcp/server.rs deleted file mode 100644 index 2778e902..00000000 --- a/examples/tcp/server.rs +++ /dev/null @@ -1,67 +0,0 @@ -use super::common::{Message}; - -use message_io::events::{EventQueue}; -use message_io::network::{Network, NetEvent, Endpoint, Transport}; - -use std::collections::{HashMap}; - -struct ClientInfo { - count: usize, -} - -enum Event { - Network(NetEvent), -} - -pub fn run() { - let mut event_queue = EventQueue::new(); - - let network_sender = event_queue.sender().clone(); - let mut network = Network::new(move |net_event| network_sender.send(Event::Network(net_event))); - - let mut clients: HashMap = HashMap::new(); - - let listen_addr = "127.0.0.1:3000"; - match network.listen(Transport::Tcp, listen_addr) { - Ok(_) => println!("TCP Server running at {}", listen_addr), - Err(_) => return println!("Can not listening at {}", listen_addr), - } - - loop { - match event_queue.receive() { - Event::Network(net_event) => match net_event { - NetEvent::Message(endpoint, message) => match message { - Message::Greetings(text) => { - let mut client_info = clients.get_mut(&endpoint).unwrap(); - client_info.count += 1; - println!( - "Client ({}) says '{}' {} times", - endpoint.addr(), - text, - client_info.count - ); - let msg = format!("Hi, I hear you for {} time", client_info.count); - network.send(endpoint, Message::Greetings(msg)); - } - }, - NetEvent::Connected(endpoint) => { - clients.insert(endpoint, ClientInfo { count: 0 }); - println!( - "Client ({}) connected (total clients: {})", - endpoint.addr(), - clients.len() - ); - } - NetEvent::Disconnected(endpoint) => { - clients.remove(&endpoint).unwrap(); - println!( - "Client ({}) disconnected (total clients: {})", - endpoint.addr(), - clients.len() - ); - } - NetEvent::DeserializationError(_) => (), - }, - } - } -} diff --git a/examples/udp/README.md b/examples/udp/README.md deleted file mode 100644 index a2d262f8..00000000 --- a/examples/udp/README.md +++ /dev/null @@ -1,15 +0,0 @@ -# UDP client and server example -This example allows to create a *star network topology* using a server and several clients by UDP. - -## Test it! -Launch the server in a terminal: -``` -cargo run --example udp server -``` - -Run a client with a name (one client per terminal): -``` -cargo run --example udp client -``` - -Note: Since UDP is not connection-oriented, you can run a client first and then, the server. diff --git a/examples/udp/client.rs b/examples/udp/client.rs deleted file mode 100644 index 24c77e55..00000000 --- a/examples/udp/client.rs +++ /dev/null @@ -1,41 +0,0 @@ -use super::common::{Message}; - -use message_io::events::{EventQueue}; -use message_io::network::{Network, NetEvent, Transport}; - -use std::time::{Duration}; - -enum Event { - Network(NetEvent), - Greet, -} - -pub fn run(name: &str) { - let mut event_queue = EventQueue::new(); - - let network_sender = event_queue.sender().clone(); - let mut network = Network::new(move |net_event| network_sender.send(Event::Network(net_event))); - - let server_addr = "127.0.0.1:3000"; - if let Ok(server_id) = network.connect(Transport::Udp, server_addr) { - println!("Sending to {} by UDP", server_addr); - event_queue.sender().send(Event::Greet); - - loop { - match event_queue.receive() { - Event::Greet => { - network.send(server_id, Message::Greetings(format!("Hi, I am {}", name))); - event_queue.sender().send_with_timer(Event::Greet, Duration::from_secs(1)); - } - Event::Network(net_event) => match net_event { - NetEvent::Message(_, message) => match message { - Message::Greetings(text) => println!("Server says: {}", text), - }, - NetEvent::Connected(_) => unreachable!(), // Not be generated for UDP - NetEvent::Disconnected(_) => unreachable!(), // Not be generated for UDP - NetEvent::DeserializationError(_) => (), - }, - } - } - } -} diff --git a/examples/udp/common.rs b/examples/udp/common.rs deleted file mode 100644 index 52a95fc8..00000000 --- a/examples/udp/common.rs +++ /dev/null @@ -1,6 +0,0 @@ -use serde::{Serialize, Deserialize}; - -#[derive(Serialize, Deserialize)] -pub enum Message { - Greetings(String), -} diff --git a/examples/udp/main.rs b/examples/udp/main.rs deleted file mode 100644 index 7e76de51..00000000 --- a/examples/udp/main.rs +++ /dev/null @@ -1,15 +0,0 @@ -mod common; -mod client; -mod server; - -pub fn main() { - let args: Vec = std::env::args().collect(); - match args.get(1).unwrap_or(&String::new()).as_ref() { - "client" => match args.get(2) { - Some(name) => client::run(name), - None => println!("The client needs a 'name'"), - }, - "server" => server::run(), - _ => println!("Usage: basic client | server"), - } -} diff --git a/examples/udp/server.rs b/examples/udp/server.rs deleted file mode 100644 index e01559ab..00000000 --- a/examples/udp/server.rs +++ /dev/null @@ -1,37 +0,0 @@ -use super::common::{Message}; - -use message_io::events::{EventQueue}; -use message_io::network::{Network, NetEvent, Transport}; - -enum Event { - Network(NetEvent), -} - -pub fn run() { - let mut event_queue = EventQueue::new(); - - let network_sender = event_queue.sender().clone(); - let mut network = Network::new(move |net_event| network_sender.send(Event::Network(net_event))); - - let listen_addr = "127.0.0.1:3000"; - match network.listen(Transport::Udp, listen_addr) { - Ok(_) => println!("UDP Server running at {}", listen_addr), - Err(_) => return println!("Can not listening at {}", listen_addr), - } - - loop { - match event_queue.receive() { - Event::Network(net_event) => match net_event { - NetEvent::Message(endpoint, message) => match message { - Message::Greetings(text) => { - println!("Client ({}) says: {}", endpoint.addr(), text); - network.send(endpoint, Message::Greetings("Hi, I hear you".into())); - } - }, - NetEvent::Connected(_) => unreachable!(), // Not be generated for UDP - NetEvent::Disconnected(_) => unreachable!(), // Not be generated for UDP - NetEvent::DeserializationError(_) => (), - }, - } - } -} diff --git a/src/adapters/tcp.rs b/src/adapters/tcp.rs index c3fed4d8..a1856764 100644 --- a/src/adapters/tcp.rs +++ b/src/adapters/tcp.rs @@ -86,7 +86,7 @@ impl Remote for RemoteResource { // TODO: The current implementation implies an active waiting, // improve it using POLLIN instead to avoid active waiting. // Note: Despite the fear that an active waiting could generate, - // this waiting only occurs in the rare case when the send method needs block. + // this waiting only occurs in the case when the receiver is full. let mut total_bytes_sent = 0; let total_bytes = encoding::PADDING + data.len(); loop { diff --git a/src/lib.rs b/src/lib.rs index d415abd8..f731e253 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,10 +5,10 @@ mod encoding; mod poll; mod driver; mod engine; +mod adapters; +mod remote_addr; -pub mod remote_addr; pub mod adapter; -pub mod adapters; pub mod network; pub mod events; diff --git a/src/remote_addr.rs b/src/remote_addr.rs index 7ad23a64..9c7c6a20 100644 --- a/src/remote_addr.rs +++ b/src/remote_addr.rs @@ -41,12 +41,20 @@ impl RemoteAddr { } } -pub trait ToRemoteAddr: ToSocketAddrs { - fn to_remote_addr(&self) -> io::Result { - Ok(RemoteAddr::SocketAddr(self.to_socket_addrs()?.next().unwrap())) +impl ToSocketAddrs for RemoteAddr { + type Iter = std::option::IntoIter; + fn to_socket_addrs(&self) -> io::Result { + match self { + RemoteAddr::SocketAddr(addr) => addr.to_socket_addrs(), + RemoteAddr::Url(_) => Err(io::Error::new(io::ErrorKind::InvalidInput, "Malformed url")), + } } } +pub trait ToRemoteAddr { + fn to_remote_addr(&self) -> io::Result; +} + impl ToRemoteAddr for &str { fn to_remote_addr(&self) -> io::Result { Ok(match self.parse() { @@ -61,7 +69,13 @@ impl ToRemoteAddr for &str { impl ToRemoteAddr for String { fn to_remote_addr(&self) -> io::Result { - (&**self).to_remote_addr() + (&self as &str).to_remote_addr() + } +} + +impl ToRemoteAddr for &String { + fn to_remote_addr(&self) -> io::Result { + (self as &str).to_remote_addr() } } @@ -71,6 +85,21 @@ impl ToRemoteAddr for SocketAddr { } } +impl ToRemoteAddr for RemoteAddr { + fn to_remote_addr(&self) -> io::Result { + Ok(self.clone()) + } +} + +impl std::fmt::Display for RemoteAddr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RemoteAddr::SocketAddr(addr) => write!(f, "{}", addr), + RemoteAddr::Url(url) => write!(f, "{}", url), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests/integration.rs b/tests/integration.rs index 12ebcdc6..f00840ca 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -65,7 +65,7 @@ mod util { } } -fn ping_pong_server_handle( +fn echo_server_handle( transport: Transport, expected_clients: usize, ) -> (JoinHandle<()>, SocketAddr) @@ -138,7 +138,7 @@ fn ping_pong_server_handle( (handle, server_addr) } -fn ping_pong_client_manager_handle( +fn echo_client_manager_handle( transport: Transport, server_addr: SocketAddr, clients_number: usize, @@ -189,11 +189,11 @@ fn ping_pong_client_manager_handle( #[test_case(Transport::Ws, 100)] // NOTE: A medium-high `clients` value can exceeds the "open file" limits of an OS in CI // with a very obfuscated error message. -fn ping_pong(transport: Transport, clients: usize) { +fn echo(transport: Transport, clients: usize) { //util::init_logger(); - let (server_handle, server_addr) = ping_pong_server_handle(transport, clients); - let client_handle = ping_pong_client_manager_handle(transport, server_addr, clients); + let (server_handle, server_addr) = echo_server_handle(transport, clients); + let client_handle = echo_client_manager_handle(transport, server_addr, clients); server_handle.join().unwrap(); client_handle.join().unwrap();