Skip to content

Commit

Permalink
[fix] Emit at most one event per microtask
Browse files Browse the repository at this point in the history
To improve compatibility with the WHATWG standard, emit at most one of
`'message'`, `'ping'`, and `'pong'` events per tick.

Fixes #2159
  • Loading branch information
lpinca committed Aug 19, 2023
1 parent 8f5cc9d commit 820ea28
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 15 deletions.
32 changes: 27 additions & 5 deletions lib/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const GET_PAYLOAD_LENGTH_64 = 2;
const GET_MASK = 3;
const GET_DATA = 4;
const INFLATING = 5;
const WAIT_MICROTASK = 6;

/**
* HyBi Receiver implementation.
Expand Down Expand Up @@ -157,10 +158,27 @@ class Receiver extends Writable {
case GET_DATA:
err = this.getData(cb);
break;
default:
// `INFLATING`
case INFLATING:
this._loop = false;
return;
default: {
// `WAIT_MICROTASK`.
this._loop = false;

const next = () => {
this._state = GET_INFO;
this.startLoop(cb);
};

if (typeof queueMicrotask === 'function') {
queueMicrotask(next);
} else {
// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(next);
}

return;
}
}
} while (this._loop);

Expand Down Expand Up @@ -542,7 +560,7 @@ class Receiver extends Writable {
}
}

this._state = GET_INFO;
this._state = WAIT_MICROTASK;
}

/**
Expand All @@ -559,6 +577,8 @@ class Receiver extends Writable {
if (data.length === 0) {
this.emit('conclude', 1005, EMPTY_BUFFER);
this.end();

this._state = GET_INFO;
} else {
const code = data.readUInt16BE(0);

Expand Down Expand Up @@ -590,14 +610,16 @@ class Receiver extends Writable {

this.emit('conclude', code, buf);
this.end();

this._state = GET_INFO;
}
} else if (this._opcode === 0x09) {
this.emit('ping', data);
this._state = WAIT_MICROTASK;
} else {
this.emit('pong', data);
this._state = WAIT_MICROTASK;
}

this._state = GET_INFO;
}
}

Expand Down
92 changes: 92 additions & 0 deletions test/receiver.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1083,4 +1083,96 @@ describe('Receiver', () => {

receiver.write(Buffer.from([0x88, 0x03, 0x03, 0xe8, 0xf8]));
});

it("waits a microtask after each 'message' event", (done) => {
const messages = [];
const receiver = new Receiver();

receiver.on('message', (data, isBinary) => {
assert.ok(!isBinary);

const message = data.toString();
messages.push(message);

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
messages.push(`microtask ${message}`);

if (messages.length === 6) {
assert.deepStrictEqual(messages, [
'1',
'microtask 1',
'2',
'microtask 2',
'3',
'microtask 3'
]);

done();
}
});
});

receiver.write(Buffer.from('810131810132810133', 'hex'));
});

it("waits a microtask after each 'ping' event", (done) => {
const actual = [];
const receiver = new Receiver();

receiver.on('ping', (data) => {
const message = data.toString();
actual.push(message);

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
actual.push(`microtask ${message}`);

if (actual.length === 6) {
assert.deepStrictEqual(actual, [
'1',
'microtask 1',
'2',
'microtask 2',
'3',
'microtask 3'
]);

done();
}
});
});

receiver.write(Buffer.from('890131890132890133', 'hex'));
});

it("waits a microtask after each 'pong' event", (done) => {
const actual = [];
const receiver = new Receiver();

receiver.on('pong', (data) => {
const message = data.toString();
actual.push(message);

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
actual.push(`microtask ${message}`);

if (actual.length === 6) {
assert.deepStrictEqual(actual, [
'1',
'microtask 1',
'2',
'microtask 2',
'3',
'microtask 3'
]);

done();
}
});
});

receiver.write(Buffer.from('8A01318A01328A0133', 'hex'));
});
});
23 changes: 13 additions & 10 deletions test/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4109,18 +4109,18 @@ describe('WebSocket', () => {
const messages = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

ws.on('open', () => {
ws._socket.on('end', () => {
assert.strictEqual(ws._receiver._state, 5);
});
});

ws.on('message', (message, isBinary) => {
assert.ok(!isBinary);

if (messages.push(message.toString()) > 1) return;

ws.close(1000);
// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
process.nextTick(() => {
assert.strictEqual(ws._receiver._state, 5);
ws.close(1000);
});
});
});

ws.on('close', (code, reason) => {
Expand Down Expand Up @@ -4365,9 +4365,12 @@ describe('WebSocket', () => {

if (messages.push(message.toString()) > 1) return;

process.nextTick(() => {
assert.strictEqual(ws._receiver._state, 5);
ws.terminate();
// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
process.nextTick(() => {
assert.strictEqual(ws._receiver._state, 5);
ws.terminate();
});
});
});

Expand Down

0 comments on commit 820ea28

Please sign in to comment.