-
Notifications
You must be signed in to change notification settings - Fork 529
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(websocket): only consume necessary bytes (#1812)
- Loading branch information
Showing
3 changed files
with
67 additions
and
43 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,11 +2,16 @@ | |
|
||
const { Writable } = require('stream') | ||
const diagnosticsChannel = require('diagnostics_channel') | ||
const { parserStates, opcodes, states } = require('./constants') | ||
const { parserStates, opcodes, states, emptyBuffer } = require('./constants') | ||
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols') | ||
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util') | ||
const { WebsocketFrameSend } = require('./frame') | ||
|
||
// This code was influenced by ws released under the MIT license. | ||
// Copyright (c) 2011 Einar Otto Stangvik <[email protected]> | ||
// Copyright (c) 2013 Arnout Kazemier and contributors | ||
// Copyright (c) 2016 Luigi Pinca and contributors | ||
|
||
const channels = {} | ||
channels.ping = diagnosticsChannel.channel('undici:websocket:ping') | ||
channels.pong = diagnosticsChannel.channel('undici:websocket:pong') | ||
|
@@ -49,7 +54,7 @@ class ByteParser extends Writable { | |
return callback() | ||
} | ||
|
||
const buffer = Buffer.concat(this.#buffers, this.#byteOffset) | ||
const buffer = this.consume(2) | ||
|
||
this.#info.fin = (buffer[0] & 0x80) !== 0 | ||
this.#info.opcode = buffer[0] & 0x0F | ||
|
@@ -96,7 +101,7 @@ class ByteParser extends Writable { | |
return | ||
} | ||
|
||
const body = buffer.subarray(2, payloadLength + 2) | ||
const body = this.consume(payloadLength) | ||
|
||
this.#info.closeInfo = this.parseCloseBody(false, body) | ||
|
||
|
@@ -125,9 +130,6 @@ class ByteParser extends Writable { | |
this.ws[kReadyState] = states.CLOSING | ||
this.ws[kReceivedClose] = true | ||
|
||
this.#buffers = [buffer.subarray(2 + payloadLength)] | ||
this.#byteOffset -= 2 + payloadLength | ||
|
||
this.end() | ||
|
||
return | ||
|
@@ -137,8 +139,9 @@ class ByteParser extends Writable { | |
// A Pong frame sent in response to a Ping frame must have identical | ||
// "Application data" | ||
|
||
const body = this.consume(payloadLength) | ||
|
||
if (!this.ws[kReceivedClose]) { | ||
const body = payloadLength === 0 ? undefined : buffer.subarray(2, payloadLength + 2) | ||
const frame = new WebsocketFrameSend(body) | ||
|
||
this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG)) | ||
|
@@ -150,8 +153,6 @@ class ByteParser extends Writable { | |
} | ||
} | ||
|
||
this.#buffers = [buffer.subarray(2 + payloadLength)] | ||
this.#byteOffset -= 2 + payloadLength | ||
this.#state = parserStates.INFO | ||
|
||
if (this.#byteOffset > 0) { | ||
|
@@ -165,72 +166,50 @@ class ByteParser extends Writable { | |
// unidirectional heartbeat. A response to an unsolicited Pong frame is | ||
// not expected. | ||
|
||
const body = this.consume(payloadLength) | ||
|
||
if (channels.pong.hasSubscribers) { | ||
channels.pong.publish({ | ||
payload: buffer.subarray(2, payloadLength + 2) | ||
payload: body | ||
}) | ||
} | ||
|
||
this.#buffers = [buffer.subarray(2 + payloadLength)] | ||
this.#byteOffset -= 2 + payloadLength | ||
|
||
if (this.#byteOffset > 0) { | ||
return this.run(callback) | ||
} else { | ||
callback() | ||
return | ||
} | ||
} | ||
|
||
// TODO: handle control frames here. Since they are unfragmented, and can | ||
// be sent in the middle of other frames, we shouldn't parse them as normal. | ||
|
||
this.#buffers = [buffer.subarray(2)] | ||
this.#byteOffset -= 2 | ||
} else if (this.#state === parserStates.PAYLOADLENGTH_16) { | ||
if (this.#byteOffset < 2) { | ||
return callback() | ||
} | ||
|
||
const buffer = Buffer.concat(this.#buffers, this.#byteOffset) | ||
const buffer = this.consume(2) | ||
|
||
// TODO: optimize this | ||
this.#info.payloadLength = buffer.subarray(0, 2).readUInt16BE(0) | ||
this.#info.payloadLength = buffer.readUInt16BE(0) | ||
this.#state = parserStates.READ_DATA | ||
|
||
this.#buffers = [buffer.subarray(2)] | ||
this.#byteOffset -= 2 | ||
} else if (this.#state === parserStates.PAYLOADLENGTH_64) { | ||
if (this.#byteOffset < 8) { | ||
return callback() | ||
} | ||
|
||
const buffer = Buffer.concat(this.#buffers, this.#byteOffset) | ||
const buffer = this.consume(8) | ||
|
||
// TODO: optimize this | ||
this.#info.payloadLength = buffer.subarray(0, 8).readBigUint64BE(0) | ||
this.#info.payloadLength = buffer.readBigUint64BE(0) | ||
this.#state = parserStates.READ_DATA | ||
|
||
this.#buffers = [buffer.subarray(8)] | ||
this.#byteOffset -= 8 | ||
} else if (this.#state === parserStates.READ_DATA) { | ||
if (this.#byteOffset < this.#info.payloadLength) { | ||
// If there is still more data in this chunk that needs to be read | ||
return callback() | ||
} else if (this.#byteOffset >= this.#info.payloadLength) { | ||
// If the server sent multiple frames in a single chunk | ||
const buffer = Buffer.concat(this.#buffers, this.#byteOffset) | ||
|
||
const body = buffer.subarray(0, this.#info.payloadLength) | ||
this.#fragments.push(body) | ||
const body = this.consume(this.#info.payloadLength) | ||
|
||
if (this.#byteOffset > this.#info.payloadLength) { | ||
this.#buffers = [buffer.subarray(body.length)] | ||
this.#byteOffset -= body.length | ||
} else { | ||
this.#buffers.length = 0 | ||
this.#byteOffset = 0 | ||
} | ||
this.#fragments.push(body) | ||
|
||
// If the frame is unfragmented, or a fragmented frame was terminated, | ||
// a message was received | ||
|
@@ -254,6 +233,48 @@ class ByteParser extends Writable { | |
} | ||
} | ||
|
||
/** | ||
* Take n bytes from the buffered Buffers | ||
* @param {number} n | ||
* @returns {Buffer|null} | ||
*/ | ||
consume (n) { | ||
if (n > this.#byteOffset) { | ||
return null | ||
} else if (n === 0) { | ||
return emptyBuffer | ||
} | ||
|
||
if (this.#buffers[0].length === n) { | ||
this.#byteOffset -= this.#buffers[0].length | ||
return this.#buffers.shift() | ||
} | ||
|
||
const buffer = Buffer.allocUnsafe(n) | ||
let offset = 0 | ||
|
||
while (offset !== n) { | ||
const next = this.#buffers[0] | ||
const { length } = next | ||
|
||
if (length + offset === n) { | ||
buffer.set(this.#buffers.shift(), offset) | ||
break | ||
} else if (length + offset > n) { | ||
buffer.set(next.subarray(0, n - offset), offset) | ||
this.#buffers[0] = next.subarray(n - offset) | ||
break | ||
} else { | ||
buffer.set(this.#buffers.shift(), offset) | ||
offset += next.length | ||
} | ||
} | ||
|
||
this.#byteOffset -= n | ||
|
||
return buffer | ||
} | ||
|
||
parseCloseBody (onlyCode, data) { | ||
// https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5 | ||
/** @type {number|undefined} */ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters