Skip to content

Commit

Permalink
[fix] Call buffered send callbacks on abnormal closure (#1702)
Browse files Browse the repository at this point in the history
If the socket is closed while data is being compressed, invoke the
current send callback and the buffered send callbacks with an error
before emitting the `'close'` event.

Refs: nodejs/node#30596
  • Loading branch information
lpinca authored Mar 7, 2020
1 parent 3e7f69c commit b29154b
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 21 deletions.
16 changes: 10 additions & 6 deletions lib/permessage-deflate.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,18 @@ class PerMessageDeflate {
}

if (this._deflate) {
if (this._deflate[kCallback]) {
this._deflate[kCallback]();
}
const callback = this._deflate[kCallback];

this._deflate.close();
this._deflate = null;

if (callback) {
callback(
new Error(
'The deflate stream was closed while data was being processed'
)
);
}
}
}

Expand Down Expand Up @@ -314,9 +320,7 @@ class PerMessageDeflate {
zlibLimiter.add((done) => {
this._compress(data, fin, (err, result) => {
done();
if (err || result) {
callback(err, result);
}
callback(err, result);
});
});
}
Expand Down
16 changes: 16 additions & 0 deletions lib/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,22 @@ class Sender {

this._deflating = true;
perMessageDeflate.compress(data, options.fin, (_, buf) => {
if (this._socket.destroyed) {
const err = new Error(
'The socket was closed while data was being compressed'
);

if (typeof cb === 'function') cb(err);

for (let i = 0; i < this._queue.length; i++) {
const callback = this._queue[i][4];

if (typeof callback === 'function') callback(err);
}

return;
}

this._deflating = false;
options.readOnly = false;
this.sendFrame(Sender.frame(buf, options), cb);
Expand Down
4 changes: 2 additions & 2 deletions lib/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,8 @@ class WebSocket extends EventEmitter {
* @private
*/
emitClose() {
this.readyState = WebSocket.CLOSED;

if (!this._socket) {
this.readyState = WebSocket.CLOSED;
this.emit('close', this._closeCode, this._closeMessage);
return;
}
Expand All @@ -191,6 +190,7 @@ class WebSocket extends EventEmitter {
}

this._receiver.removeAllListeners();
this.readyState = WebSocket.CLOSED;
this.emit('close', this._closeCode, this._closeMessage);
}

Expand Down
12 changes: 8 additions & 4 deletions test/permessage-deflate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -615,15 +615,19 @@ describe('PerMessageDeflate', () => {
});
});

it("doesn't call the callback if the deflate stream is closed prematurely", (done) => {
it('calls the callback if the deflate stream is closed prematurely', (done) => {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const buf = Buffer.from('A'.repeat(50));

perMessageDeflate.accept([{}]);
perMessageDeflate.compress(buf, true, () => {
done(new Error('Unexpected callback invocation'));
perMessageDeflate.compress(buf, true, (err) => {
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'The deflate stream was closed while data was being processed'
);
done();
});
perMessageDeflate._deflate.on('close', done);

process.nextTick(() => perMessageDeflate.cleanup());
});
Expand Down
67 changes: 58 additions & 9 deletions test/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2341,6 +2341,52 @@ describe('WebSocket', () => {
ws.on('message', (message) => ws.send(message, { compress: true }));
});
});

it('calls the callback if the socket is closed prematurely', (done) => {
const wss = new WebSocket.Server(
{ perMessageDeflate: true, port: 0 },
() => {
const called = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`, {
perMessageDeflate: { threshold: 0 }
});

ws.on('open', () => {
ws.send('foo');
ws.send('bar', (err) => {
called.push(1);

assert.strictEqual(ws.readyState, WebSocket.CLOSING);
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'The socket was closed while data was being compressed'
);
});
ws.send('baz');
ws.send('qux', (err) => {
called.push(2);

assert.strictEqual(ws.readyState, WebSocket.CLOSING);
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'The socket was closed while data was being compressed'
);
});
});

ws.on('close', () => {
assert.deepStrictEqual(called, [1, 2]);
wss.close(done);
});
}
);

wss.on('connection', (ws) => {
ws._socket.end();
});
});
});

describe('#terminate', () => {
Expand All @@ -2356,19 +2402,22 @@ describe('WebSocket', () => {
});

ws.on('open', () => {
ws.send('hi', () =>
done(new Error('Unexpected callback invocation'))
);
ws.send('hi', (err) => {
assert.strictEqual(ws.readyState, WebSocket.CLOSING);
assert.ok(err instanceof Error);
assert.strictEqual(
err.message,
'The socket was closed while data was being compressed'
);

ws.on('close', () => {
wss.close(done);
});
});
ws.terminate();
});
}
);

wss.on('connection', (ws) => {
ws.on('close', () => {
wss.close(done);
});
});
});

it('can be used while data is being decompressed', (done) => {
Expand Down

0 comments on commit b29154b

Please sign in to comment.