Skip to content

Commit

Permalink
feat: use streaming parser for frames
Browse files Browse the repository at this point in the history
  • Loading branch information
KhafraDev committed Dec 13, 2022
1 parent dfc57ab commit a0094a4
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 154 deletions.
174 changes: 22 additions & 152 deletions lib/websocket/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@

// TODO: crypto isn't available in all environments
const { randomBytes, createHash } = require('crypto')
const { Blob } = require('buffer')
const diagnosticsChannel = require('diagnostics_channel')
const { uid, states, opcodes } = require('./constants')
const {
kReadyState,
kResponse,
kExtensions,
kProtocol,
kBinaryType,
kWebSocketURL,
kController,
kClosingFrame,
kSentClose
kSentClose,
kByteParser
} = require('./symbols')
const { fireEvent, isEstablished, isClosed, isClosing } = require('./util')
const { MessageEvent, CloseEvent } = require('./events')
const { WebsocketFrame, WebsocketFrameSend } = require('./frame')
const { CloseEvent } = require('./events')
const { WebsocketFrameSend } = require('./frame')
const { ByteParser } = require('./receiver')
const { makeRequest } = require('../fetch/request')
const { fetching } = require('../fetch/index')
const { getGlobalDispatcher } = require('../..')
Expand Down Expand Up @@ -182,9 +180,14 @@ function establishWebSocketConnection (url, protocols, ws) {
// once this happens, the connection is open
ws[kResponse] = response

const parser = new ByteParser(ws)
response.socket.ws = ws // TODO: use symbol
ws[kByteParser] = parser

whenConnectionEstablished(ws)

receiveData(ws)
response.socket.on('data', onSocketData)
parser.on('drain', onParserDrain)

socketClosed(ws)
}
Expand Down Expand Up @@ -246,151 +249,16 @@ function whenConnectionEstablished (ws) {
}

/**
* @see https://websockets.spec.whatwg.org/#feedback-from-the-protocol
* @param {import('./websocket').WebSocket} ws
* @param {Buffer} chunk
*/
function receiveData (ws) {
const { [kResponse]: response } = ws

/** @type {WebsocketFrame|undefined} */
let frame

response.socket.on('data', (chunk) => {
const receivedFrame = WebsocketFrame.from(chunk)
const opcode = receivedFrame.opcode

if (
(opcode === opcodes.CONTINUATION && !receivedFrame.fragmented) ||
((opcode === opcodes.TEXT || opcode === opcodes.BINARY) && receivedFrame.fragmented) ||
(opcode >= opcodes.CLOSE && opcode <= opcodes.PONG && !receivedFrame.fin) ||
(opcode >= opcodes.CLOSE && opcode <= opcodes.PONG && receivedFrame.payloadLength > 127) ||
(opcode === opcodes.CLOSE && receivedFrame.payloadLength === 1) ||
(opcode >= 0x3 && opcode <= 0x7) || // reserved
(opcode >= 0xB) // reserved
) {
failWebsocketConnection(ws[kController], response.socket)
return
}

if (!frame) {
frame = receivedFrame
frame.terminated = frame.dataOffset === frame.payloadLength // message complete
} else {
// A fragmented message consists of a single frame with the FIN bit
// clear and an opcode other than 0, followed by zero or more frames
// with the FIN bit clear and the opcode set to 0, and terminated by
// a single frame with the FIN bit set and an opcode of 0.

if (opcode === opcodes.CONTINUATION) {
if (receivedFrame.fin) {
frame.terminated = true
}

frame.addFrame(receivedFrame.data)
} else if (opcode === opcodes.PING) {
return handlePing(response.socket, ws, receivedFrame)
} else if (opcode === opcodes.PONG) {
return handlePong(receivedFrame)
}
}

// If a control frame (Section 5.5) is
// received, the frame MUST be handled as defined by Section 5.5.
if (frame.opcode === opcodes.PING) {
return handlePing(response.socket, ws, frame)
} else if (frame.opcode === opcodes.PONG) {
return handlePong(frame)
} else if (frame.opcode === opcodes.CLOSE) {
// Upon either sending or receiving a Close control frame, it is said
// that _The WebSocket Closing Handshake is Started_ and that the
// WebSocket connection is in the CLOSING state.
ws[kReadyState] = states.CLOSING
ws[kClosingFrame] = frame

if (!ws[kSentClose]) {
const result = frame.parseCloseBody(true)

// If this Close control frame contains no status code, _The WebSocket
// Connection Close Code_ is considered to be 1005.
const code = result?.code ?? 1005

const closeFrame = new WebsocketFrameSend(Buffer.allocUnsafe(2))
// When
// sending a Close frame in response, the endpoint typically echos the
// status code it received.
closeFrame.frameData.writeUInt16BE(code, 0)

response.socket.write(
closeFrame.createFrame(opcodes.CLOSE),
() => response.socket.end()
)
ws[kSentClose] = true
}

frame = undefined

return
}

// rsv bits are reserved for future use; if any aren't 0,
// we currently can't handle them.
if (frame.rsv1 || frame.rsv2 || frame.rsv3) {
failWebsocketConnection(ws[kController])
return
}

// If the frame comprises an unfragmented
// message (Section 5.4), it is said that _A WebSocket Message Has Been
// Received_

if (frame.terminated) {
// 1. If ready state is not OPEN (1), then return.
if (ws[kReadyState] !== states.OPEN) {
return
}

// 2. Let dataForEvent be determined by switching on type and binary type:
let dataForEvent

if (frame.opcode === opcodes.TEXT) {
const { data } = frame

// - type indicates that the data is Text
// a new DOMString containing data
try {
// TODO: optimize this
dataForEvent = new TextDecoder('utf-8', { fatal: true }).decode(data)
} catch {
failWebsocketConnection(ws[kController], response.socket)
return
}
} else if (frame.opcode === opcodes.BINARY) {
if (ws[kBinaryType] === 'blob') {
// - type indicates that the data is Binary and binary type is "blob"
// a new Blob object, created in the relevant Realm of the
// WebSocket object, that represents data as its raw data
dataForEvent = new Blob([frame.data])
} else {
// - type indicates that the data is Binary and binary type is
// "arraybuffer"
// a new ArrayBuffer object, created in the relevant Realm of the
// WebSocket object, whose contents are data
dataForEvent = new Uint8Array(frame.data).buffer
}
}

// 3. Fire an event named message at the WebSocket object, using
// MessageEvent, with the origin attribute initialized to the
// serialization of the WebSocket object’s url's origin, and the data
// attribute initialized to dataForEvent.
fireEvent('message', ws, MessageEvent, {
origin: ws[kWebSocketURL].origin,
data: dataForEvent
})
function onSocketData (chunk) {
if (!this.ws[kByteParser].write(chunk)) {
this.pause()
}
}

frame = undefined
}
})
function onParserDrain () {
this.ws[kResponse].socket.resume()
}

/**
Expand Down Expand Up @@ -501,5 +369,7 @@ function handlePong (frame) {

module.exports = {
establishWebSocketConnection,
failWebsocketConnection
failWebsocketConnection,
handlePing,
handlePong
}
10 changes: 9 additions & 1 deletion lib/websocket/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,18 @@ const opcodes = {

const maxUnsigned16Bit = 2 ** 16 - 1 // 65535

const parserStates = {
INFO: 0,
PAYLOADLENGTH_16: 2,
PAYLOADLENGTH_64: 3,
READ_DATA: 4
}

module.exports = {
uid,
staticPropertyDescriptors,
states,
opcodes,
maxUnsigned16Bit
maxUnsigned16Bit,
parserStates
}
120 changes: 120 additions & 0 deletions lib/websocket/receiver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
const { Writable } = require('stream')
const { parserStates } = require('./constants')

class ByteParser extends Writable {
#buffers = []
#byteOffset = 0

#state = parserStates.INFO

#info = {}

constructor (ws) {
super()

this.ws = ws
}

/**
* @param {Buffer} chunk
* @param {() => void} callback
*/
_write (chunk, _, callback) {
this.#buffers.push(chunk)
this.#byteOffset += chunk.length

this.run(callback)
}

/**
* Runs whenever a new chunk is received.
* Callback is called whenever there are no more chunks buffering,
* or not enough bytes are buffered to parse.
*/
run (callback) {
if (this.#state === parserStates.INFO) {
// If there aren't enough bytes to parse the payload length, etc.
if (this.#byteOffset < 2) {
return callback()
}

const buffer = Buffer.concat(this.#buffers, this.#byteOffset)

this.#info.fin = (buffer[0] & 0x80) !== 0
this.#info.opcode = buffer[0] & 0x0F

// TODO: HANDLE INVALID OPCODES HERE

const payloadLength = buffer[1] & 0x7F

if (payloadLength <= 125) {
this.#info.payloadLength = payloadLength
this.#state = parserStates.READ_DATA
} else if (payloadLength === 126) {
this.#state = parserStates.PAYLOADLENGTH_16
} else if (payloadLength === 127) {
this.#state = parserStates.PAYLOADLENGTH_64
}

this.#buffers = [buffer.subarray(2)]
this.#byteOffset -= 2
} else if (this.#state === parserStates.PAYLOADLENGTH_16) {
if (this.#byteOffset < 2) {
return callback()
}

const buffer = Buffer.concat(this.#buffers, this.#byteOffset)

// TODO: optimize this
this.#info.payloadLength = buffer.subarray(0, 2).readUInt16BE(0)
this.#state = parserStates.READ_DATA

this.#buffers = [buffer.subarray(2)]
this.#byteOffset -= 2
} else if (this.#state === parserStates.PAYLOADLENGTH_64) {
if (this.#byteOffset < 8) {
return callback()
}

const buffer = Buffer.concat(this.#buffers, this.#byteOffset)

// TODO: optimize this
this.#info.payloadLength = buffer.subarray(0, 8).readBigUint64BE(0)
this.#state = parserStates.READ_DATA

this.#buffers = [buffer.subarray(8)]
this.#byteOffset -= 8
} else if (this.#state === parserStates.READ_DATA) {
if (this.#byteOffset < this.#info.payloadLength) {
// If there is still more data in this chunk that needs to be read
return callback()
} else if (this.#byteOffset >= this.#info.payloadLength) {
// If the server sent multiple frames in a single chunk
const buffer = Buffer.concat(this.#buffers, this.#byteOffset)

this.#info.data = buffer.subarray(0, this.#info.payloadLength)

if (this.#byteOffset > this.#info.payloadLength) {
this.#buffers = [buffer.subarray(this.#info.data.length)]
this.#byteOffset -= this.#info.data.length
} else {
this.#buffers.length = 0
this.#byteOffset = 0
}

this.#info = {}
this.#state = parserStates.INFO
}
}

if (this.#byteOffset > 0) {
return this.run(callback)
} else {
callback()
}
}
}

module.exports = {
ByteParser
}
3 changes: 2 additions & 1 deletion lib/websocket/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ module.exports = {
kProtocol: Symbol('protocol'),
kBinaryType: Symbol('binary type'),
kClosingFrame: Symbol('closing frame'),
kSentClose: Symbol('sent close')
kSentClose: Symbol('sent close'),
kByteParser: Symbol('byte parser')
}

0 comments on commit a0094a4

Please sign in to comment.