From 5c5f69f70c16136f2a01d88a91f5ceaa945ff2d5 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Fri, 28 Jul 2017 13:39:04 +0200 Subject: [PATCH] zlib: fix interaction of flushing and needDrain Fixes: https://github.com/nodejs/node/issues/14523 PR-URL: https://github.com/nodejs/node/pull/14527 Reviewed-By: Matteo Collina Reviewed-By: James M Snell Reviewed-By: Colin Ihrig --- lib/zlib.js | 30 +++++++++++++- .../test-zlib-flush-drain-longblock.js | 27 ++++++++++++ .../test-zlib-flush-multiple-scheduled.js | 41 +++++++++++++++++++ 3 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 test/parallel/test-zlib-flush-drain-longblock.js create mode 100644 test/parallel/test-zlib-flush-multiple-scheduled.js diff --git a/lib/zlib.js b/lib/zlib.js index d620b6bee3470a..a32d774c65d2dd 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -188,6 +188,7 @@ function Zlib(opts, mode) { this._flushFlag = opts.flush || constants.Z_NO_FLUSH; this._finishFlushFlag = opts.finishFlush !== undefined ? opts.finishFlush : constants.Z_FINISH; + this._scheduledFlushFlag = constants.Z_NO_FLUSH; if (opts.chunkSize !== undefined) { if (opts.chunkSize < constants.Z_MIN_CHUNK) { @@ -300,6 +301,23 @@ Zlib.prototype._flush = function _flush(callback) { this._transform(Buffer.alloc(0), '', callback); }; +// If a flush is scheduled while another flush is still pending, a way to figure +// out which one is the "stronger" flush is needed. +// Roughly, the following holds: +// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH < +// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH +const flushiness = []; +let i = 0; +for (const flushFlag of [constants.Z_NO_FLUSH, constants.Z_BLOCK, + constants.Z_PARTIAL_FLUSH, constants.Z_SYNC_FLUSH, + constants.Z_FULL_FLUSH, constants.Z_FINISH]) { + flushiness[flushFlag] = i++; +} + +function maxFlush(a, b) { + return flushiness[a] > flushiness[b] ? a : b; +} + Zlib.prototype.flush = function flush(kind, callback) { var ws = this._writableState; @@ -315,13 +333,21 @@ Zlib.prototype.flush = function flush(kind, callback) { if (callback) this.once('end', callback); } else if (ws.needDrain) { - if (callback) { - const drainHandler = () => this.flush(kind, callback); + const alreadyHadFlushScheduled = + this._scheduledFlushFlag !== constants.Z_NO_FLUSH; + this._scheduledFlushFlag = maxFlush(kind, this._scheduledFlushFlag); + + // If a callback was passed, always register a new `drain` + flush handler, + // mostly because that’s simpler and flush callbacks piling up is a rare + // thing anyway. + if (!alreadyHadFlushScheduled || callback) { + const drainHandler = () => this.flush(this._scheduledFlushFlag, callback); this.once('drain', drainHandler); } } else { this._flushFlag = kind; this.write(Buffer.alloc(0), '', callback); + this._scheduledFlushFlag = constants.Z_NO_FLUSH; } }; diff --git a/test/parallel/test-zlib-flush-drain-longblock.js b/test/parallel/test-zlib-flush-drain-longblock.js new file mode 100644 index 00000000000000..94d1d9d04d9369 --- /dev/null +++ b/test/parallel/test-zlib-flush-drain-longblock.js @@ -0,0 +1,27 @@ +'use strict'; + +// Regression test for https://github.com/nodejs/node/issues/14523. +// Checks that flushes interact properly with writableState.needDrain, +// even if no flush callback was passed. + +const common = require('../common'); +const assert = require('assert'); +const zlib = require('zlib'); + +const zipper = zlib.createGzip({ highWaterMark: 16384 }); +const unzipper = zlib.createGunzip(); +zipper.pipe(unzipper); + +zipper.write('A'.repeat(17000)); +zipper.flush(); + +let received = 0; +unzipper.on('data', common.mustCall((d) => { + received += d.length; +}, 2)); + +// Properly `.end()`ing the streams would interfere with checking that +// `.flush()` works. +process.on('exit', () => { + assert.strictEqual(received, 17000); +}); diff --git a/test/parallel/test-zlib-flush-multiple-scheduled.js b/test/parallel/test-zlib-flush-multiple-scheduled.js new file mode 100644 index 00000000000000..19548672389fde --- /dev/null +++ b/test/parallel/test-zlib-flush-multiple-scheduled.js @@ -0,0 +1,41 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const zlib = require('zlib'); + +const { + Z_PARTIAL_FLUSH, Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH +} = zlib.constants; + +common.crashOnUnhandledRejection(); + +async function getOutput(...sequenceOfFlushes) { + const zipper = zlib.createGzip({ highWaterMark: 16384 }); + + zipper.write('A'.repeat(17000)); + for (const flush of sequenceOfFlushes) { + zipper.flush(flush); + } + + const data = []; + + return new Promise((resolve) => { + zipper.on('data', common.mustCall((d) => { + data.push(d); + if (data.length === 2) resolve(Buffer.concat(data)); + }, 2)); + }); +} + +(async function() { + assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH), + await getOutput(Z_SYNC_FLUSH, Z_PARTIAL_FLUSH)); + assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH), + await getOutput(Z_PARTIAL_FLUSH, Z_SYNC_FLUSH)); + + assert.deepStrictEqual(await getOutput(Z_FINISH), + await getOutput(Z_FULL_FLUSH, Z_FINISH)); + assert.deepStrictEqual(await getOutput(Z_FINISH), + await getOutput(Z_SYNC_FLUSH, Z_FINISH)); +})();