Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement sending a message for WS and add a websocketFrame class #30

Merged
merged 1 commit into from
Dec 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 131 additions & 17 deletions lib/websocket/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// TODO: crypto isn't available in all environments
const { randomBytes, createHash } = require('crypto')
const { Blob } = require('buffer')
const { uid, states } = require('./constants')
const { uid, states, opcodes } = require('./constants')
const {
kReadyState,
kResponse,
Expand Down Expand Up @@ -225,6 +225,123 @@ function whenConnectionEstablished (ws) {
fireEvent('open', ws)
}

class WebsocketFrame {
/*
https://www.rfc-editor.org/rfc/rfc6455#section-5.2
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
*/
constructor () {
this.FIN = false
this.RSV1 = false
this.RSV2 = false
this.RSV3 = false
this.isMasked = false
this.maskKey = Buffer.alloc(4)
this.payloadLength = 0
}

toBuffer () {
// TODO: revisit the buffer size calculaton
const buffer = Buffer.alloc(4 + 2 + this.payloadData.length)

// set FIN flag
if (this.FIN) {
buffer[0] |= 0x80
}

// 2. set opcode
buffer[0] = (buffer[0] & 0xF0) + this.opcode

// 3. set masking flag and masking key
if (this.isMasked) {
buffer[1] |= 0x80
buffer[2] = this.maskKey[0]
buffer[3] = this.maskKey[1]
buffer[4] = this.maskKey[2]
buffer[5] = this.maskKey[3]
}

// 4. set payload length
// TODO: support payload lengths larger than 125
buffer[1] += this.payloadData.length

if (this.isMasked) {
// 6. mask payload data
const maskKey = buffer.slice(2, 6)
/*
j = i MOD 4
transformed-octet-i = original-octet-i XOR masking-key-octet-j
*/
for (let i = 0; i < this.payloadData.length; i++) {
buffer[6 + i] = this.payloadData[i] ^ maskKey[i % 4]
}
}

return buffer
}

static from (buffer) {
let nextByte = 0
const frame = new WebsocketFrame()

frame.FIN = (buffer[0] & 0x80) !== 0
frame.RSV1 = (buffer[0] & 0x40) !== 0
frame.RSV2 = (buffer[0] & 0x20) !== 0
frame.RSV3 = (buffer[0] & 0x10) !== 0
frame.opcode = buffer[0] & 0x0F
frame.isMasked = (buffer[1] & 0x80) !== 0

let payloadLength = 0x7F & buffer[1]

nextByte = 2

if (payloadLength === 126) {
// If 126 the following 2 bytes interpreted as a 16-bit unsigned integer
payloadLength = buffer.slice(2, 4).readUInt16LE()

nextByte = 4
} else if (payloadLength === 127) {
// if 127 the following 8 bytes interpreted as a 64-bit unsigned integer
payloadLength = buffer.slice(2, 10)
nextByte = 10
}

frame.payloadLength = payloadLength
if (frame.isMasked) {
frame.maskKey = buffer.slice(nextByte, nextByte + 4)

const maskedPayloadData = buffer.slice(nextByte + 4)
frame.payloadData = Buffer.alloc(frame.payloadLength)

for (let i = 0; i < frame.payloadLength; i++) {
frame.payloadData[i] = maskedPayloadData[i] ^ frame.maskKey[i % 4]
}
} else {
// we can't parse the payload inside the frame as the payload could be fragmented across multiple frames..
frame.payloadData = buffer.slice(nextByte)
}

return frame
}
}

/**
* @see https://websockets.spec.whatwg.org/#feedback-from-the-protocol
* @param {import('./websocket').WebSocket} ws
Expand All @@ -233,15 +350,10 @@ function receiveData (ws) {
const { [kResponse]: response } = ws

response.socket.on('data', (chunk) => {
const opcode = chunk[0] & 0x0F
const fin = (chunk[0] & 0x80) !== 0
const rsv1 = chunk[0] & 0x40
const rsv2 = chunk[0] & 0x20
const rsv3 = chunk[0] & 0x10

const frame = WebsocketFrame.from(chunk)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep in mind that there is no guarantee that chunk is a complete frame. It might be just the first few bytes of a frame, a slice in the middle of a frame, the tail of frame and the beginning of another, or multiple frames.

Copy link
Author

@jodevsa jodevsa Dec 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @lpinca, Thank you for your feedback. I was under the impression that any websocket server will flush the socket buffer after each complete frame and we'll always get a chunk with a full frame.

Do you know an easy way to spin up a ws server where I can test these cases?
1- buffer with incomplete frame
2- buffer with multiple frames
3. etc..

Copy link

@lpinca lpinca Dec 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was under the impression that any websocket server will flush the socket buffer after each complete frame and we'll always get a chunk with a full frame.

This is outside of the control of any server. It depends on the OS, the protocol (TCP), the network, etc. When you call socket.write(data), there is no guarantee that the other peer receives all the data as a single TCP packet.

Do you know an easy way to spin up a ws server where I can test these cases?

It is easier to do it without a real server. Take a look at https://github.com/websockets/ws/blob/8.11.0/lib/receiver.js and https://github.com/websockets/ws/blob/8.11.0/test/receiver.test.js.

Keep also in mind that the WHATWG spec advertises support for the permessage-deflate extension, so if the server you are connecting to also supports permessage-deflate, you might receive frames whose payload data is compressed. If decompression is slower than incoming data, you also have to handle backpressure.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @lpinca

// rsv bits are reserved for future use; if any aren't 0,
// we currently can't handle them.
if (rsv1 !== 0 || rsv2 !== 0 || rsv3 !== 0) {
if (frame.RSV1 || frame.RSV2 || frame.RSV3) {
failWebsocketConnection(ws[kController])
return
}
Expand All @@ -255,36 +367,37 @@ function receiveData (ws) {

// An unfragmented message consists of a single frame with the FIN
// bit set (Section 5.2) and an opcode other than 0.
if (fin && opcode !== 0) {
if (frame.FIN && frame.opcode !== 0) {
// 1. If ready state is not OPEN (1), then return.
if (ws[kReadyState] !== states.OPEN) {
return
}

// 2. Let dataForEvent be determined by switching on type and binary type:
let dataForEvent

if (opcode === 0x01) {
if (frame.opcode === opcodes.TEXT) {
// - type indicates that the data is Text
// a new DOMString containing data
dataForEvent = new TextDecoder().decode(chunk.slice(2))
} else if (opcode === 0x02 && ws[kBinaryType] === 'blob') {

dataForEvent = new TextDecoder().decode(frame.payloadData)
} else if (frame.opcode === opcodes.BINARY && ws[kBinaryType] === 'blob') {
// - type indicates that the data is Binary and binary type is "blob"
// a new Blob object, created in the relevant Realm of the
// WebSocket object, that represents data as its raw data
dataForEvent = new Blob(chunk.slice(2))
} else if (opcode === 0x02 && ws[kBinaryType] === 'arraybuffer') {
dataForEvent = new Blob(frame.payloadData)
} else if (frame.opcode === opcodes.BINARY && ws[kBinaryType] === 'arraybuffer') {
// - type indicates that the data is Binary and binary type is
// "arraybuffer"
// a new ArrayBuffer object, created in the relevant Realm of the
// WebSocket object, whose contents are data
return new Uint8Array(chunk.slice(2)).buffer
return new Uint8Array(frame.payloadData).buffer
}

// 3. Fire an event named message at the WebSocket object, using
// MessageEvent, with the origin attribute initialized to the
// serialization of the WebSocket object’s url's origin, and the data
// attribute initialized to dataForEvent.

fireEvent('message', ws, MessageEvent, {
origin: ws[kWebSocketURL].origin,
data: dataForEvent
Expand All @@ -295,5 +408,6 @@ function receiveData (ws) {

module.exports = {
establishWebSocketConnection,
failWebsocketConnection
failWebsocketConnection,
WebsocketFrame
}
12 changes: 11 additions & 1 deletion lib/websocket/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@ const states = {
CLOSED: 3
}

const opcodes = {
CONTINUATION: 0x0,
TEXT: 0x1,
BINARY: 0x2,
CLOSE: 0x8,
PING: 0x9,
PONG: 0xA
}

module.exports = {
uid,
staticPropertyDescriptors,
states
states,
opcodes
}
27 changes: 21 additions & 6 deletions lib/websocket/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ const { webidl } = require('../fetch/webidl')
const { hasOwn, isValidHTTPToken } = require('../fetch/util')
const { DOMException } = require('../fetch/constants')
const { URLSerializer } = require('../fetch/dataURL')
const { staticPropertyDescriptors, states } = require('./constants')
const { staticPropertyDescriptors, states, opcodes } = require('./constants')
const {
kWebSocketURL,
kReadyState,
kController,
kExtensions,
kProtocol,
kBinaryType
kBinaryType,
kResponse
} = require('./symbols')
const { isEstablished, isClosing } = require('./util')
const { establishWebSocketConnection, failWebsocketConnection } = require('./connection')
const { establishWebSocketConnection, failWebsocketConnection, WebsocketFrame } = require('./connection')
const { kEnumerableProperty, isBlobLike } = require('../core/util')
const { types } = require('util')

Expand Down Expand Up @@ -218,11 +219,25 @@ class WebSocket extends EventTarget {
// the bufferedAmount attribute by the number of bytes needed to
// express the argument as UTF-8.
if (isEstablished(this) && !isClosing(this)) {
// todo
const socket = this[kResponse].socket
const frame = new WebsocketFrame()
// 1. set FIN to true. TODO: support fragmentation later.
frame.FIN = true
// 2. enable masking
frame.isMasked = true
// 3. set mask key
frame.maskKey = Buffer.alloc(4)
for (let i = 0; i < 4; i++) {
frame.maskKey[i] = Math.floor(Math.random() * 256)
}

// TODO: support BINARY data
frame.opcode = opcodes.TEXT
frame.payloadData = Buffer.from(data)

socket.write(frame.toBuffer())
}
}

throw new TypeError('not implemented')
}

get readyState () {
Expand Down