-
Notifications
You must be signed in to change notification settings - Fork 222
/
pong.rs
130 lines (109 loc) · 4 KB
/
pong.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
extern crate env_logger;
extern crate mio_extras;
extern crate time;
/// An example demonstrating how to send and receive a custom ping/pong frame.
extern crate ws;
use std::str::from_utf8;
use mio_extras::timer::Timeout;
use ws::util::Token;
use ws::{listen, CloseCode, Error, ErrorKind, Frame, Handler, Handshake, Message, OpCode, Result,
Sender};
const PING: Token = Token(1);
const EXPIRE: Token = Token(2);
/// Entry point: initialises logging and serves WebSocket connections on `127.0.0.1:3012`.
///
/// Panics if `listen` returns an error, since the result is unwrapped.
fn main() {
    // Setup logging
    env_logger::init();

    // Factory that builds a `Server` handler around the sender it is handed;
    // no timeout handles are recorded until `on_new_timeout` stores them.
    let new_server = |out| Server {
        out,
        ping_timeout: None,
        expire_timeout: None,
    };

    // Run the WebSocket
    listen("127.0.0.1:3012", new_server).unwrap();
}
/// Server WebSocket handler.
///
/// Keeps the handles of the currently scheduled timeouts so they can be
/// cancelled when they are replaced or when the connection closes.
struct Server {
    /// Handle used to send messages and pings, schedule or cancel timeouts,
    /// close the connection, and shut the server down.
    out: Sender,
    /// Handle of the pending `PING` timeout, if one has been recorded.
    ping_timeout: Option<Timeout>,
    /// Handle of the pending `EXPIRE` timeout, if one has been recorded.
    expire_timeout: Option<Timeout>,
}
/// Connection handler that echoes messages, pings the peer every 5 seconds,
/// reports the round-trip time of each pong, and closes the connection after
/// 30 seconds without any incoming frame.
impl Handler for Server {
    /// Schedules the first `PING` and `EXPIRE` timeouts when the connection opens.
    fn on_open(&mut self, _: Handshake) -> Result<()> {
        // schedule a timeout to send a ping every 5 seconds
        self.out.timeout(5_000, PING)?;
        // schedule a timeout to close the connection if there is no activity for 30 seconds
        self.out.timeout(30_000, EXPIRE)
    }

    /// Logs the received message and echoes it back to the sender.
    fn on_message(&mut self, msg: Message) -> Result<()> {
        println!("Server got message '{}'. ", msg);
        self.out.send(msg)
    }

    /// Cancels any recorded timeouts and shuts the whole server down.
    ///
    /// Panics if cancelling a timeout or requesting the shutdown fails.
    fn on_close(&mut self, code: CloseCode, reason: &str) {
        println!("WebSocket closing for ({:?}) {}", code, reason);

        // NOTE: This code demonstrates cleaning up timeouts
        if let Some(t) = self.ping_timeout.take() {
            self.out.cancel(t).unwrap();
        }
        if let Some(t) = self.expire_timeout.take() {
            self.out.cancel(t).unwrap();
        }

        println!("Shutting down server after first connection closes.");
        self.out.shutdown().unwrap();
    }

    /// Logs the error and shuts the whole server down.
    ///
    /// Panics if requesting the shutdown fails.
    fn on_error(&mut self, err: Error) {
        // Shutdown on any error
        println!("Shutting down server for error: {}", err);
        self.out.shutdown().unwrap();
    }

    /// Reacts to a fired timeout: `PING` sends a ping and reschedules itself,
    /// `EXPIRE` closes the connection.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Internal` error for any other token, and forwards
    /// any error reported by the sender.
    fn on_timeout(&mut self, event: Token) -> Result<()> {
        match event {
            // PING timeout has occurred, send a ping and reschedule
            PING => {
                // The payload is the current time in nanoseconds so that the
                // matching pong can be turned into a round-trip time in `on_frame`.
                self.out.ping(time::precise_time_ns().to_string().into())?;
                // The stored handle belongs to the timeout that just fired; drop it.
                self.ping_timeout.take();
                self.out.timeout(5_000, PING)
            }
            // EXPIRE timeout has occurred, this means that the connection is inactive, let's close
            EXPIRE => self.out.close(CloseCode::Away),
            // No other timeouts are possible
            _ => Err(Error::new(
                ErrorKind::Internal,
                "Invalid timeout token encountered!",
            )),
        }
    }

    /// Records the handle of a newly scheduled timeout, cancelling the handle
    /// previously stored for the same kind of timeout.
    ///
    /// Any token other than `EXPIRE` is treated as the ping timeout.
    fn on_new_timeout(&mut self, event: Token, timeout: Timeout) -> Result<()> {
        // Cancel the old timeout and replace.
        if event == EXPIRE {
            if let Some(t) = self.expire_timeout.take() {
                self.out.cancel(t)?;
            }
            self.expire_timeout = Some(timeout);
        } else {
            // This ensures there is only one ping timeout at a time
            if let Some(t) = self.ping_timeout.take() {
                self.out.cancel(t)?;
            }
            self.ping_timeout = Some(timeout);
        }
        Ok(())
    }

    /// Inspects every incoming frame: prints the round-trip time for pongs,
    /// resets the inactivity timeout, then runs the default frame handling.
    fn on_frame(&mut self, frame: Frame) -> Result<Option<Frame>> {
        // If the frame is a pong, print the round-trip time.
        // The pong should contain data from our ping, but it isn't guaranteed to.
        if frame.opcode() == OpCode::Pong {
            // The payload comes from the peer, so treat it as untrusted: a payload
            // that is not UTF-8 or not a number is a bad pong, not a handler error.
            let sent_at = from_utf8(frame.payload())
                .ok()
                .and_then(|text| text.parse::<u64>().ok());
            let now = time::precise_time_ns();

            // `checked_sub` rejects timestamps later than `now` instead of
            // overflowing the unsigned subtraction.
            match sent_at.and_then(|sent| now.checked_sub(sent)) {
                Some(elapsed_ns) => {
                    println!("RTT is {:.3}ms.", elapsed_ns as f64 / 1_000_000f64)
                }
                None => println!("Received bad pong."),
            }
        }

        // Some activity has occurred, so reset the expiration
        self.out.timeout(30_000, EXPIRE)?;

        // Run default frame validation
        DefaultHandler.on_frame(frame)
    }
}
// For accessing the default handler implementation.
//
// `Server::on_frame` finishes by calling `DefaultHandler.on_frame(frame)`, so this
// unit struct exists to reach the trait's provided method from inside an override.
struct DefaultHandler;
// The empty impl overrides nothing, so every `Handler` method keeps its provided default.
impl Handler for DefaultHandler {}