Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(websocket): only consume necessary bytes #1812

Merged
merged 4 commits into from
Dec 19, 2022
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 55 additions & 39 deletions lib/websocket/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbol
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util')
const { WebsocketFrameSend } = require('./frame')

// This code was influenced by ws released under the MIT license.
// Copyright (c) 2011 Einar Otto Stangvik <[email protected]>
// Copyright (c) 2013 Arnout Kazemier and contributors
// Copyright (c) 2016 Luigi Pinca and contributors

const channels = {}
channels.ping = diagnosticsChannel.channel('undici:websocket:ping')
channels.pong = diagnosticsChannel.channel('undici:websocket:pong')
Expand Down Expand Up @@ -49,7 +54,7 @@ class ByteParser extends Writable {
return callback()
}

const buffer = Buffer.concat(this.#buffers, this.#byteOffset)
const buffer = this.consume(2)

this.#info.fin = (buffer[0] & 0x80) !== 0
this.#info.opcode = buffer[0] & 0x0F
Expand Down Expand Up @@ -96,7 +101,7 @@ class ByteParser extends Writable {
return
}

const body = buffer.subarray(2, payloadLength + 2)
const body = this.consume(payloadLength)

this.#info.closeInfo = this.parseCloseBody(false, body)

Expand Down Expand Up @@ -125,9 +130,6 @@ class ByteParser extends Writable {
this.ws[kReadyState] = states.CLOSING
this.ws[kReceivedClose] = true

this.#buffers = [buffer.subarray(2 + payloadLength)]
this.#byteOffset -= 2 + payloadLength

this.end()

return
Expand All @@ -137,8 +139,9 @@ class ByteParser extends Writable {
// A Pong frame sent in response to a Ping frame must have identical
// "Application data"

const body = this.consume(payloadLength)

if (!this.ws[kReceivedClose]) {
const body = payloadLength === 0 ? undefined : buffer.subarray(2, payloadLength + 2)
const frame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))
Expand All @@ -150,8 +153,6 @@ class ByteParser extends Writable {
}
}

this.#buffers = [buffer.subarray(2 + payloadLength)]
this.#byteOffset -= 2 + payloadLength
this.#state = parserStates.INFO

if (this.#byteOffset > 0) {
Expand All @@ -165,72 +166,50 @@ class ByteParser extends Writable {
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected.

const body = this.consume(payloadLength)

if (channels.pong.hasSubscribers) {
channels.pong.publish({
payload: buffer.subarray(2, payloadLength + 2)
payload: body
})
}

this.#buffers = [buffer.subarray(2 + payloadLength)]
this.#byteOffset -= 2 + payloadLength

if (this.#byteOffset > 0) {
return this.run(callback)
} else {
callback()
return
}
}

// TODO: handle control frames here. Since they are unfragmented, and can
// be sent in the middle of other frames, we shouldn't parse them as normal.

this.#buffers = [buffer.subarray(2)]
this.#byteOffset -= 2
} else if (this.#state === parserStates.PAYLOADLENGTH_16) {
if (this.#byteOffset < 2) {
return callback()
}

const buffer = Buffer.concat(this.#buffers, this.#byteOffset)
const buffer = this.consume(2)

// TODO: optimize this
this.#info.payloadLength = buffer.subarray(0, 2).readUInt16BE(0)
this.#info.payloadLength = buffer.readUInt16BE(0)
this.#state = parserStates.READ_DATA

this.#buffers = [buffer.subarray(2)]
this.#byteOffset -= 2
} else if (this.#state === parserStates.PAYLOADLENGTH_64) {
if (this.#byteOffset < 8) {
return callback()
}

const buffer = Buffer.concat(this.#buffers, this.#byteOffset)
const buffer = this.consume(8)

// TODO: optimize this
this.#info.payloadLength = buffer.subarray(0, 8).readBigUint64BE(0)
this.#info.payloadLength = buffer.readBigUint64BE(0)
this.#state = parserStates.READ_DATA

this.#buffers = [buffer.subarray(8)]
this.#byteOffset -= 8
} else if (this.#state === parserStates.READ_DATA) {
if (this.#byteOffset < this.#info.payloadLength) {
// If there is still more data in this chunk that needs to be read
return callback()
} else if (this.#byteOffset >= this.#info.payloadLength) {
// If the server sent multiple frames in a single chunk
const buffer = Buffer.concat(this.#buffers, this.#byteOffset)

const body = buffer.subarray(0, this.#info.payloadLength)
this.#fragments.push(body)
const body = this.consume(this.#info.payloadLength)

if (this.#byteOffset > this.#info.payloadLength) {
this.#buffers = [buffer.subarray(body.length)]
this.#byteOffset -= body.length
} else {
this.#buffers.length = 0
this.#byteOffset = 0
}
this.#fragments.push(body)

// If the frame is unfragmented, or a fragmented frame was terminated,
// a message was received
Expand All @@ -254,6 +233,43 @@ class ByteParser extends Writable {
}
}

/**
* Take n bytes from the buffered Buffers
* @param {number} n
* @returns {Buffer|null}
*/
consume (n) {
if (n > this.#byteOffset) {
return null
} else if (n === 0) {
return Buffer.allocUnsafe(0)
KhafraDev marked this conversation as resolved.
Show resolved Hide resolved
}

const buffer = Buffer.allocUnsafe(n)
let offset = 0

while (offset !== n) {
const next = this.#buffers[0]
const { length } = next

if (length + offset === n) {
buffer.set(this.#buffers.shift(), offset)
KhafraDev marked this conversation as resolved.
Show resolved Hide resolved
break
} else if (length + offset > n) {
buffer.set(next.subarray(0, n - offset), offset)
this.#buffers[0] = next.subarray(n - offset)
break
} else {
buffer.set(this.#buffers.shift(), offset)
offset += next.length
}
}

this.#byteOffset -= n

return buffer
}

parseCloseBody (onlyCode, data) {
// https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
/** @type {number|undefined} */
Expand Down