diff --git a/lib/websocket/constants.js b/lib/websocket/constants.js index 27b5e27c2a5..406b8e3e2f0 100644 --- a/lib/websocket/constants.js +++ b/lib/websocket/constants.js @@ -38,11 +38,14 @@ const parserStates = { READ_DATA: 4 } +const emptyBuffer = Buffer.allocUnsafe(0) + module.exports = { uid, staticPropertyDescriptors, states, opcodes, maxUnsigned16Bit, - parserStates + parserStates, + emptyBuffer } diff --git a/lib/websocket/receiver.js b/lib/websocket/receiver.js index c418a8b97a7..6dde9a8faf5 100644 --- a/lib/websocket/receiver.js +++ b/lib/websocket/receiver.js @@ -2,11 +2,16 @@ const { Writable } = require('stream') const diagnosticsChannel = require('diagnostics_channel') -const { parserStates, opcodes, states } = require('./constants') +const { parserStates, opcodes, states, emptyBuffer } = require('./constants') const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols') const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util') const { WebsocketFrameSend } = require('./frame') +// This code was influenced by ws released under the MIT license. +// Copyright (c) 2011 Einar Otto Stangvik +// Copyright (c) 2013 Arnout Kazemier and contributors +// Copyright (c) 2016 Luigi Pinca and contributors + const channels = {} channels.ping = diagnosticsChannel.channel('undici:websocket:ping') channels.pong = diagnosticsChannel.channel('undici:websocket:pong') @@ -49,7 +54,7 @@ class ByteParser extends Writable { return callback() } - const buffer = Buffer.concat(this.#buffers, this.#byteOffset) + const buffer = this.consume(2) this.#info.fin = (buffer[0] & 0x80) !== 0 this.#info.opcode = buffer[0] & 0x0F @@ -96,7 +101,7 @@ class ByteParser extends Writable { return } - const body = buffer.subarray(2, payloadLength + 2) + const body = this.consume(payloadLength) this.#info.closeInfo = this.parseCloseBody(false, body) @@ -125,9 +130,6 @@ class ByteParser extends Writable { this.ws[kReadyState] = states.CLOSING this.ws[kReceivedClose] = true - this.#buffers = [buffer.subarray(2 + payloadLength)] - this.#byteOffset -= 2 + payloadLength - this.end() return @@ -137,8 +139,9 @@ class ByteParser extends Writable { // A Pong frame sent in response to a Ping frame must have identical // "Application data" + const body = this.consume(payloadLength) + if (!this.ws[kReceivedClose]) { - const body = payloadLength === 0 ? undefined : buffer.subarray(2, payloadLength + 2) const frame = new WebsocketFrameSend(body) this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG)) @@ -150,8 +153,6 @@ class ByteParser extends Writable { } } - this.#buffers = [buffer.subarray(2 + payloadLength)] - this.#byteOffset -= 2 + payloadLength this.#state = parserStates.INFO if (this.#byteOffset > 0) { @@ -165,15 +166,14 @@ class ByteParser extends Writable { // unidirectional heartbeat. A response to an unsolicited Pong frame is // not expected. + const body = this.consume(payloadLength) + if (channels.pong.hasSubscribers) { channels.pong.publish({ - payload: buffer.subarray(2, payloadLength + 2) + payload: body }) } - this.#buffers = [buffer.subarray(2 + payloadLength)] - this.#byteOffset -= 2 + payloadLength - if (this.#byteOffset > 0) { return this.run(callback) } else { @@ -181,56 +181,35 @@ class ByteParser extends Writable { return } } - - // TODO: handle control frames here. Since they are unfragmented, and can - // be sent in the middle of other frames, we shouldn't parse them as normal. - - this.#buffers = [buffer.subarray(2)] - this.#byteOffset -= 2 } else if (this.#state === parserStates.PAYLOADLENGTH_16) { if (this.#byteOffset < 2) { return callback() } - const buffer = Buffer.concat(this.#buffers, this.#byteOffset) + const buffer = this.consume(2) - // TODO: optimize this - this.#info.payloadLength = buffer.subarray(0, 2).readUInt16BE(0) + this.#info.payloadLength = buffer.readUInt16BE(0) this.#state = parserStates.READ_DATA - - this.#buffers = [buffer.subarray(2)] - this.#byteOffset -= 2 } else if (this.#state === parserStates.PAYLOADLENGTH_64) { if (this.#byteOffset < 8) { return callback() } - const buffer = Buffer.concat(this.#buffers, this.#byteOffset) + const buffer = this.consume(8) // TODO: optimize this - this.#info.payloadLength = buffer.subarray(0, 8).readBigUint64BE(0) + this.#info.payloadLength = buffer.readBigUint64BE(0) this.#state = parserStates.READ_DATA - - this.#buffers = [buffer.subarray(8)] - this.#byteOffset -= 8 } else if (this.#state === parserStates.READ_DATA) { if (this.#byteOffset < this.#info.payloadLength) { // If there is still more data in this chunk that needs to be read return callback() } else if (this.#byteOffset >= this.#info.payloadLength) { // If the server sent multiple frames in a single chunk - const buffer = Buffer.concat(this.#buffers, this.#byteOffset) - const body = buffer.subarray(0, this.#info.payloadLength) - this.#fragments.push(body) + const body = this.consume(this.#info.payloadLength) - if (this.#byteOffset > this.#info.payloadLength) { - this.#buffers = [buffer.subarray(body.length)] - this.#byteOffset -= body.length - } else { - this.#buffers.length = 0 - this.#byteOffset = 0 - } + this.#fragments.push(body) // If the frame is unfragmented, or a fragmented frame was terminated, // a message was received @@ -254,6 +233,48 @@ class ByteParser extends Writable { } } + /** + * Take n bytes from the buffered Buffers + * @param {number} n + * @returns {Buffer|null} + */ + consume (n) { + if (n > this.#byteOffset) { + return null + } else if (n === 0) { + return emptyBuffer + } + + if (this.#buffers[0].length === n) { + this.#byteOffset -= this.#buffers[0].length + return this.#buffers.shift() + } + + const buffer = Buffer.allocUnsafe(n) + let offset = 0 + + while (offset !== n) { + const next = this.#buffers[0] + const { length } = next + + if (length + offset === n) { + buffer.set(this.#buffers.shift(), offset) + break + } else if (length + offset > n) { + buffer.set(next.subarray(0, n - offset), offset) + this.#buffers[0] = next.subarray(n - offset) + break + } else { + buffer.set(this.#buffers.shift(), offset) + offset += next.length + } + } + + this.#byteOffset -= n + + return buffer + } + parseCloseBody (onlyCode, data) { // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5 /** @type {number|undefined} */ diff --git a/lib/websocket/websocket.js b/lib/websocket/websocket.js index cac33d1eaab..a5fa94f623d 100644 --- a/lib/websocket/websocket.js +++ b/lib/websocket/websocket.js @@ -3,7 +3,7 @@ const { webidl } = require('../fetch/webidl') const { DOMException } = require('../fetch/constants') const { URLSerializer } = require('../fetch/dataURL') -const { staticPropertyDescriptors, states, opcodes } = require('./constants') +const { staticPropertyDescriptors, states, opcodes, emptyBuffer } = require('./constants') const { kWebSocketURL, kReadyState, @@ -207,7 +207,7 @@ class WebSocket extends EventTarget { // the body MAY contain UTF-8-encoded data with value /reason/ frame.frameData.write(reason, 2, 'utf-8') } else { - frame.frameData = Buffer.alloc(0) + frame.frameData = emptyBuffer } /** @type {import('stream').Duplex} */