Skip to content

Commit

Permalink
feat(websocket): only consume necessary bytes (nodejs#1812)
Browse files Browse the repository at this point in the history
  • Loading branch information
KhafraDev authored and anonrig committed Apr 4, 2023
1 parent 095b1dc commit 7338154
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 43 deletions.
5 changes: 4 additions & 1 deletion lib/websocket/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ const parserStates = {
READ_DATA: 4
}

const emptyBuffer = Buffer.allocUnsafe(0)

module.exports = {
uid,
staticPropertyDescriptors,
states,
opcodes,
maxUnsigned16Bit,
parserStates
parserStates,
emptyBuffer
}
101 changes: 61 additions & 40 deletions lib/websocket/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

const { Writable } = require('stream')
const diagnosticsChannel = require('diagnostics_channel')
const { parserStates, opcodes, states } = require('./constants')
const { parserStates, opcodes, states, emptyBuffer } = require('./constants')
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util')
const { WebsocketFrameSend } = require('./frame')

// This code was influenced by ws released under the MIT license.
// Copyright (c) 2011 Einar Otto Stangvik <[email protected]>
// Copyright (c) 2013 Arnout Kazemier and contributors
// Copyright (c) 2016 Luigi Pinca and contributors

const channels = {}
channels.ping = diagnosticsChannel.channel('undici:websocket:ping')
channels.pong = diagnosticsChannel.channel('undici:websocket:pong')
Expand Down Expand Up @@ -49,7 +54,7 @@ class ByteParser extends Writable {
return callback()
}

const buffer = Buffer.concat(this.#buffers, this.#byteOffset)
const buffer = this.consume(2)

this.#info.fin = (buffer[0] & 0x80) !== 0
this.#info.opcode = buffer[0] & 0x0F
Expand Down Expand Up @@ -96,7 +101,7 @@ class ByteParser extends Writable {
return
}

const body = buffer.subarray(2, payloadLength + 2)
const body = this.consume(payloadLength)

this.#info.closeInfo = this.parseCloseBody(false, body)

Expand Down Expand Up @@ -125,9 +130,6 @@ class ByteParser extends Writable {
this.ws[kReadyState] = states.CLOSING
this.ws[kReceivedClose] = true

this.#buffers = [buffer.subarray(2 + payloadLength)]
this.#byteOffset -= 2 + payloadLength

this.end()

return
Expand All @@ -137,8 +139,9 @@ class ByteParser extends Writable {
// A Pong frame sent in response to a Ping frame must have identical
// "Application data"

const body = this.consume(payloadLength)

if (!this.ws[kReceivedClose]) {
const body = payloadLength === 0 ? undefined : buffer.subarray(2, payloadLength + 2)
const frame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))
Expand All @@ -150,8 +153,6 @@ class ByteParser extends Writable {
}
}

this.#buffers = [buffer.subarray(2 + payloadLength)]
this.#byteOffset -= 2 + payloadLength
this.#state = parserStates.INFO

if (this.#byteOffset > 0) {
Expand All @@ -165,72 +166,50 @@ class ByteParser extends Writable {
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected.

const body = this.consume(payloadLength)

if (channels.pong.hasSubscribers) {
channels.pong.publish({
payload: buffer.subarray(2, payloadLength + 2)
payload: body
})
}

this.#buffers = [buffer.subarray(2 + payloadLength)]
this.#byteOffset -= 2 + payloadLength

if (this.#byteOffset > 0) {
return this.run(callback)
} else {
callback()
return
}
}

// TODO: handle control frames here. Since they are unfragmented, and can
// be sent in the middle of other frames, we shouldn't parse them as normal.

this.#buffers = [buffer.subarray(2)]
this.#byteOffset -= 2
} else if (this.#state === parserStates.PAYLOADLENGTH_16) {
if (this.#byteOffset < 2) {
return callback()
}

const buffer = Buffer.concat(this.#buffers, this.#byteOffset)
const buffer = this.consume(2)

// TODO: optimize this
this.#info.payloadLength = buffer.subarray(0, 2).readUInt16BE(0)
this.#info.payloadLength = buffer.readUInt16BE(0)
this.#state = parserStates.READ_DATA

this.#buffers = [buffer.subarray(2)]
this.#byteOffset -= 2
} else if (this.#state === parserStates.PAYLOADLENGTH_64) {
if (this.#byteOffset < 8) {
return callback()
}

const buffer = Buffer.concat(this.#buffers, this.#byteOffset)
const buffer = this.consume(8)

// TODO: optimize this
this.#info.payloadLength = buffer.subarray(0, 8).readBigUint64BE(0)
this.#info.payloadLength = buffer.readBigUint64BE(0)
this.#state = parserStates.READ_DATA

this.#buffers = [buffer.subarray(8)]
this.#byteOffset -= 8
} else if (this.#state === parserStates.READ_DATA) {
if (this.#byteOffset < this.#info.payloadLength) {
// If there is still more data in this chunk that needs to be read
return callback()
} else if (this.#byteOffset >= this.#info.payloadLength) {
// If the server sent multiple frames in a single chunk
const buffer = Buffer.concat(this.#buffers, this.#byteOffset)

const body = buffer.subarray(0, this.#info.payloadLength)
this.#fragments.push(body)
const body = this.consume(this.#info.payloadLength)

if (this.#byteOffset > this.#info.payloadLength) {
this.#buffers = [buffer.subarray(body.length)]
this.#byteOffset -= body.length
} else {
this.#buffers.length = 0
this.#byteOffset = 0
}
this.#fragments.push(body)

// If the frame is unfragmented, or a fragmented frame was terminated,
// a message was received
Expand All @@ -254,6 +233,48 @@ class ByteParser extends Writable {
}
}

/**
* Take n bytes from the buffered Buffers
* @param {number} n
* @returns {Buffer|null}
*/
consume (n) {
if (n > this.#byteOffset) {
return null
} else if (n === 0) {
return emptyBuffer
}

if (this.#buffers[0].length === n) {
this.#byteOffset -= this.#buffers[0].length
return this.#buffers.shift()
}

const buffer = Buffer.allocUnsafe(n)
let offset = 0

while (offset !== n) {
const next = this.#buffers[0]
const { length } = next

if (length + offset === n) {
buffer.set(this.#buffers.shift(), offset)
break
} else if (length + offset > n) {
buffer.set(next.subarray(0, n - offset), offset)
this.#buffers[0] = next.subarray(n - offset)
break
} else {
buffer.set(this.#buffers.shift(), offset)
offset += next.length
}
}

this.#byteOffset -= n

return buffer
}

parseCloseBody (onlyCode, data) {
// https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
/** @type {number|undefined} */
Expand Down
4 changes: 2 additions & 2 deletions lib/websocket/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const { webidl } = require('../fetch/webidl')
const { DOMException } = require('../fetch/constants')
const { URLSerializer } = require('../fetch/dataURL')
const { staticPropertyDescriptors, states, opcodes } = require('./constants')
const { staticPropertyDescriptors, states, opcodes, emptyBuffer } = require('./constants')
const {
kWebSocketURL,
kReadyState,
Expand Down Expand Up @@ -207,7 +207,7 @@ class WebSocket extends EventTarget {
// the body MAY contain UTF-8-encoded data with value /reason/
frame.frameData.write(reason, 2, 'utf-8')
} else {
frame.frameData = Buffer.alloc(0)
frame.frameData = emptyBuffer
}

/** @type {import('stream').Duplex} */
Expand Down

0 comments on commit 7338154

Please sign in to comment.