forked from Agoric/agoric-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ws.js
140 lines (119 loc) · 3.75 KB
/
ws.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import harden from '@agoric/harden';
import xsws from "xs-net/websocket";
import Base64 from "base64";
import {Digest} from "crypt";
const later = thunk => Promise.resolve(null).then(thunk);
// ISSUE: factor out overlap with http.js?
function makeEmitter() {
const byName = {};
return harden({
on(name, handler) {
if (name in byName) {
byName[name].push(handler);
} else {
byName[name] = [handler];
}
},
emit(name, ...args) {
const handlers = byName[name];
if (!handlers) { return; }
for (const handler of handlers) {
later(() => handler(...args));
}
}
});
}
function WebSocket({ path, host, headers, protocol, socket }) {
const events = makeEmitter();
const xclient = new xsws.Client({ path, host, headers, protocol, socket });
const emit = events.emit;
xclient.callback = function(message, value) {
switch (message) {
case xsws.Client.connect:
emit('connection');
break;
case xsws.Client.receive:
emit('message', value);
break;
case xsws.Client.disconnect:
emit('close');
}
};
const OPEN = 1;
return harden({
connection: socket,
readyState: OPEN, // ISSUE: support other states?
OPEN,
emit,
on: events.on,
send(message) {
xclient.write(message);
},
});
}
function handshakeResponse(key, protocol) {
let sha1 = new Digest("SHA1");
sha1.write(key);
sha1.write('258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
const response = [
"HTTP/1.1 101 Web Socket Protocol Handshake\r\n",
"Connection: Upgrade\r\n",
"Upgrade: websocket\r\n",
"Sec-WebSocket-Accept: ", Base64.encode(sha1.close()), "\r\n",
];
if (protocol) {
response.push("Sec-WebSocket-Protocol: ", protocol, "\r\n");
}
response.push("\r\n");
return response;
}
export function Server({ noServer }) {
if (!noServer) { throw new Error('not supported: noServer: false'); }
const events = makeEmitter();
function on(name, handler) {
if (!['upgrade', 'connection'].includes(name)) {
throw new Error(`not supported: on(${name})`);
}
events.on(name, handler);
}
on('connection', (ws, req) => {
// keep track of ws for closing?
});
return harden({
on,
handleUpgrade(nodeReq, nodeSocket, head, wsHandler) {
// TODO: only handle given path?
console.log('@@handleUpgrade', nodeReq, nodeSocket, head, wsHandler);
const { path, host, headers } = nodeReq;
const key = headers['sec-websocket-key'];
if (headers['sec-websocket-version'] !== '13' || !key) {
// not a valid websocket handshake
// ISSUE: report error somewhere?
return;
}
const protocol = undefined; // ISSUE: TODO? needed?
const response = handshakeResponse(key, protocol);
console.log('@@handleUpgrade response', response);
const { _xs_socket: socket } = nodeSocket;
console.log('@@handleUpgrade socket.write(response)');
try {
socket.write.apply(socket, response);
} catch (wtf) {
console.error('@@handleUpgrade socket.write:', wtf.message);
}
console.log('@@handleUpgrade socket.read()...');
const toRead = socket.read();
console.log('@@... handleUpgrade socket.read() done.');
if (toRead !== 0) {
// unexpected to receive a websocket message before server receives handshake
console.log('@@unexpected to receive a websocket message before server receives handshake', toRead);
throw new Error('not implemented: message with handshake');
}
console.log('@@ws shim handleUpgrade making WebSocket:', { path, host, headers, protocol, socket });
const ws = WebSocket({ path, host, headers, protocol, socket });
later(() => wsHandler(ws));
},
emit: events.emit,
});
}
export default { Server };