-
Notifications
You must be signed in to change notification settings - Fork 225
/
Copy pathasync-client.rs
79 lines (71 loc) · 2.31 KB
/
async-client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
extern crate futures;
extern crate tokio;
extern crate websocket;
use futures::future::Future;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::sync::mpsc;
use std::io::stdin;
use std::thread;
use websocket::result::WebSocketError;
use websocket::{ClientBuilder, OwnedMessage};
const CONNECTION: &'static str = "ws://127.0.0.1:2794";
// Async websocket chat client
fn main() {
println!("Connecting to {}", CONNECTION);
// Construct new Tokio runtime environment
let mut runtime = tokio::runtime::current_thread::Builder::new()
.build()
.unwrap();
let (usr_msg, stdin_ch) = mpsc::channel(0);
// Spawn new thread to read user input
thread::spawn(|| {
let mut input = String::new();
let mut stdin_sink = usr_msg.wait();
loop {
// Read user input from stdin
input.clear();
stdin().read_line(&mut input).unwrap();
// Trim whitespace and match input to known chat commands
// If input is unknown, send trimmed input as a chat message
let trimmed = input.trim();
let (close, msg) = match trimmed {
"/close" => (true, OwnedMessage::Close(None)),
"/ping" => (false, OwnedMessage::Ping(b"PING".to_vec())),
_ => (false, OwnedMessage::Text(trimmed.to_string())),
};
// Send message to websocket server
stdin_sink
.send(msg)
.expect("Sending message across stdin channel.");
// If user entered the "/close" command, break the loop
if close {
break;
}
}
});
// Construct a new connection to the websocket server
let runner = ClientBuilder::new(CONNECTION)
.unwrap()
.add_protocol("rust-websocket")
.async_connect_insecure()
.and_then(|(duplex, _)| {
let (sink, stream) = duplex.split();
stream
// Iterate over message as they arrive in stream
.filter_map(|message| {
println!("Received Message: {:?}", message);
// Respond to close or ping commands from the server
match message {
OwnedMessage::Ping(d) => Some(OwnedMessage::Pong(d)),
_ => None,
}
})
// Takes in messages from both sinks
.select(stdin_ch.map_err(|_| WebSocketError::NoDataAvailable))
// Return a future that completes once all incoming data from the above streams has been processed into the sink
.forward(sink)
});
// Start our websocket client runner in the Tokio environment
let _ = runtime.block_on(runner).unwrap();
}