From 42b2d918fbe52b01211afe2dd3f99a42bbcb724c Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Sat, 19 Aug 2023 16:03:40 +0200 Subject: [PATCH] [fix] Emit at most one event per microtask To improve compatibility with the WHATWG standard, emit at most one of `'message'`, `'ping'`, and `'pong'` events per tick. Fixes #2159 --- lib/receiver.js | 29 +++++++++++-- test/receiver.test.js | 92 ++++++++++++++++++++++++++++++++++++++++++ test/websocket.test.js | 23 ++++++----- 3 files changed, 130 insertions(+), 14 deletions(-) diff --git a/lib/receiver.js b/lib/receiver.js index 96f572cb1..b5e9a8bca 100644 --- a/lib/receiver.js +++ b/lib/receiver.js @@ -13,12 +13,15 @@ const { concat, toArrayBuffer, unmask } = require('./buffer-util'); const { isValidStatusCode, isValidUTF8 } = require('./validation'); const FastBuffer = Buffer[Symbol.species]; +const promise = Promise.resolve(); + const GET_INFO = 0; const GET_PAYLOAD_LENGTH_16 = 1; const GET_PAYLOAD_LENGTH_64 = 2; const GET_MASK = 3; const GET_DATA = 4; const INFLATING = 5; +const WAIT_MICROTASK = 6; /** * HyBi Receiver implementation. @@ -157,9 +160,23 @@ class Receiver extends Writable { case GET_DATA: err = this.getData(cb); break; + case INFLATING: + this._loop = false; + return; default: - // `INFLATING` + // + // `WAIT_MICROTASK`. + // this._loop = false; + + // + // `queueMicrotask()` is not available in Node.js < 11 and is no + // better anyway. + // + promise.then(() => { + this._state = GET_INFO; + this.startLoop(cb); + }); return; } } while (this._loop); @@ -542,7 +559,7 @@ class Receiver extends Writable { } } - this._state = GET_INFO; + this._state = WAIT_MICROTASK; } /** @@ -559,6 +576,8 @@ class Receiver extends Writable { if (data.length === 0) { this.emit('conclude', 1005, EMPTY_BUFFER); this.end(); + + this._state = GET_INFO; } else { const code = data.readUInt16BE(0); @@ -590,14 +609,16 @@ class Receiver extends Writable { this.emit('conclude', code, buf); this.end(); + + this._state = GET_INFO; } } else if (this._opcode === 0x09) { this.emit('ping', data); + this._state = WAIT_MICROTASK; } else { this.emit('pong', data); + this._state = WAIT_MICROTASK; } - - this._state = GET_INFO; } } diff --git a/test/receiver.test.js b/test/receiver.test.js index 4ae279469..a4e1bb5ad 100644 --- a/test/receiver.test.js +++ b/test/receiver.test.js @@ -1083,4 +1083,96 @@ describe('Receiver', () => { receiver.write(Buffer.from([0x88, 0x03, 0x03, 0xe8, 0xf8])); }); + + it("waits a microtask after each 'message' event", (done) => { + const messages = []; + const receiver = new Receiver(); + + receiver.on('message', (data, isBinary) => { + assert.ok(!isBinary); + + const message = data.toString(); + messages.push(message); + + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + messages.push(`microtask ${message}`); + + if (messages.length === 6) { + assert.deepStrictEqual(messages, [ + '1', + 'microtask 1', + '2', + 'microtask 2', + '3', + 'microtask 3' + ]); + + done(); + } + }); + }); + + receiver.write(Buffer.from('810131810132810133', 'hex')); + }); + + it("waits a microtask after each 'ping' event", (done) => { + const actual = []; + const receiver = new Receiver(); + + receiver.on('ping', (data) => { + const message = data.toString(); + actual.push(message); + + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + actual.push(`microtask ${message}`); + + if (actual.length === 6) { + assert.deepStrictEqual(actual, [ + '1', + 'microtask 1', + '2', + 'microtask 2', + '3', + 'microtask 3' + ]); + + done(); + } + }); + }); + + receiver.write(Buffer.from('890131890132890133', 'hex')); + }); + + it("waits a microtask after each 'pong' event", (done) => { + const actual = []; + const receiver = new Receiver(); + + receiver.on('pong', (data) => { + const message = data.toString(); + actual.push(message); + + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + actual.push(`microtask ${message}`); + + if (actual.length === 6) { + assert.deepStrictEqual(actual, [ + '1', + 'microtask 1', + '2', + 'microtask 2', + '3', + 'microtask 3' + ]); + + done(); + } + }); + }); + + receiver.write(Buffer.from('8A01318A01328A0133', 'hex')); + }); }); diff --git a/test/websocket.test.js b/test/websocket.test.js index cb5b434c0..b90923460 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -4109,18 +4109,18 @@ describe('WebSocket', () => { const messages = []; const ws = new WebSocket(`ws://localhost:${wss.address().port}`); - ws.on('open', () => { - ws._socket.on('end', () => { - assert.strictEqual(ws._receiver._state, 5); - }); - }); - ws.on('message', (message, isBinary) => { assert.ok(!isBinary); if (messages.push(message.toString()) > 1) return; - ws.close(1000); + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + process.nextTick(() => { + assert.strictEqual(ws._receiver._state, 5); + ws.close(1000); + }); + }); }); ws.on('close', (code, reason) => { @@ -4365,9 +4365,12 @@ describe('WebSocket', () => { if (messages.push(message.toString()) > 1) return; - process.nextTick(() => { - assert.strictEqual(ws._receiver._state, 5); - ws.terminate(); + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + process.nextTick(() => { + assert.strictEqual(ws._receiver._state, 5); + ws.terminate(); + }); }); });