From 25b8da1d5845e3a4ab41a54401581443011f3251 Mon Sep 17 00:00:00 2001 From: Calvin Metcalf Date: Fri, 19 Jun 2015 12:52:51 -0400 Subject: [PATCH] streams: add cork option to pipe Adds an option to .pipe to cork it before each write and then uncork it next tick, based on discussion at nodejs/readable-stream#145 --- doc/api/stream.markdown | 4 +- lib/_stream_readable.js | 20 +++ test/parallel/test-stream-pipe-cork.js | 176 +++++++++++++++++++++++++ 3 files changed, 199 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-stream-pipe-cork.js diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index 12f99078aa1933..8254d023a9c09c 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -333,6 +333,8 @@ readable.isPaused() // === false * `destination` {[Writable][] Stream} The destination for writing data * `options` {Object} Pipe options * `end` {Boolean} End the writer when the reader ends. Default = `true` + * `cork` {Boolean} Before each write, cork the stream and then uncork it on the + next tick in order to consolidate writes. Default = `false` This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that @@ -1315,7 +1317,7 @@ for examples and testing, but there are occasionally use cases where it can come in handy as a building block for novel sorts of streams. -## Simplified Constructor API +## Simplified Constructor API diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 7be7723e52cb44..ad60347430ddb1 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -467,6 +467,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stdout && dest !== process.stderr; + var autoCork = pipeOpts && pipeOpts.cork && (typeof dest.cork === 'function'); + var corked = false; + var endFn = doEnd ? onend : cleanup; if (state.endEmitted) process.nextTick(endFn); @@ -515,9 +518,26 @@ Readable.prototype.pipe = function(dest, pipeOpts) { ondrain(); } + function maybeCork() { + if (!autoCork || corked) { + debug('already corked'); + return; + } + debug('corking'); + corked = true; + dest.cork(); + process.nextTick(uncork); + } + function uncork() { + debug('uncorking'); + corked = false; + dest.uncork(); + } src.on('data', ondata); function ondata(chunk) { debug('ondata'); + maybeCork(); + debug('corks=%d', dest._writableState.corked); var ret = dest.write(chunk); if (false === ret) { debug('false write response, pause', diff --git a/test/parallel/test-stream-pipe-cork.js b/test/parallel/test-stream-pipe-cork.js new file mode 100644 index 00000000000000..38ebd23281cc62 --- /dev/null +++ b/test/parallel/test-stream-pipe-cork.js @@ -0,0 +1,176 @@ +'use strict'; +var common = require('../common'); +var assert = require('assert'); + +var stream = require('stream'); +// tiny node-tap lookalike. +var tests = []; +var count = 0; + +function test(name, fn) { + count++; + tests.push([name, fn]); +} + +function run() { + var next = tests.shift(); + if (!next) + return console.error('ok'); + + var name = next[0]; + var fn = next[1]; + console.log('# %s', name); + fn({ + same: assert.deepEqual, + equal: assert.equal, + end: function() { + count--; + run(); + } + }); +} + +// ensure all tests have run +process.on('exit', function() { + assert.equal(count, 0); +}); + +process.nextTick(run); +test('all sync', function (t) { + var counter = 0; + var expectCount = 0; + function cnt(msg) { + expectCount++; + var expect = expectCount; + var called = false; + return function(er) { + if (er) + throw er; + called = true; + counter++; + t.equal(counter, expect); + }; + } + var p = new stream.PassThrough(); + var w = new stream.Writable(); + w._write = function(chunk, e, cb) { + assert(false, 'Should not call _write'); + }; + p.pipe(w, { + cork: true + }); + var expectChunks = + [ + { encoding: 'buffer', + chunk: [104, 101, 108, 108, 111, 44, 32] }, + { encoding: 'buffer', + chunk: [119, 111, 114, 108, 100] }, + { encoding: 'buffer', + chunk: [33] }, + { encoding: 'buffer', + chunk: [10, 97, 110, 100, 32, 116, 104, 101, 110, 46, 46, 46] }, + { encoding: 'buffer', + chunk: [250, 206, 190, 167, 222, 173, 190, 239, 222, 202, 251, 173]} + ]; + + var actualChunks; + w._writev = function(chunks, cb) { + actualChunks = chunks.map(function(chunk) { + return { + encoding: chunk.encoding, + chunk: Buffer.isBuffer(chunk.chunk) ? + Array.prototype.slice.call(chunk.chunk) : chunk.chunk + }; + }); + cb(); + }; + p.write('hello, ', 'ascii', cnt('hello')); + p.write('world', 'utf8', cnt('world')); + + p.write(new Buffer('!'), 'buffer', cnt('!')); + + p.write('\nand then...', 'binary', cnt('and then')); + p.write('facebea7deadbeefdecafbad', 'hex', cnt('hex')); + + p.end(cnt('end')); + + + w.on('finish', function() { + // make sure finish comes after all the write cb + cnt('finish')(); + t.same(expectChunks, actualChunks); + t.end(); + }); +}); +test('2 groups', function (t) { + var counter = 0; + var expectCount = 0; + function cnt(msg) { + expectCount++; + var expect = expectCount; + var called = false; + return function(er) { + if (er) + throw er; + called = true; + counter++; + t.equal(counter, expect); + }; + } + var p = new stream.PassThrough(); + var w = new stream.Writable(); + w._write = function(chunk, e, cb) { + assert(false, 'Should not call _write'); + }; + p.pipe(w, { + cork: true + }); + var expectChunks = [ + [ + { encoding: 'buffer', + chunk: [104, 101, 108, 108, 111, 44, 32] }, + { encoding: 'buffer', + chunk: [119, 111, 114, 108, 100] } + ],[ + { encoding: 'buffer', + chunk: [33] }, + { encoding: 'buffer', + chunk: [10, 97, 110, 100, 32, 116, 104, 101, 110, 46, 46, 46] }, + { encoding: 'buffer', + chunk: [250, 206, 190, 167, 222, 173, 190, 239, 222, 202, 251, 173]} + ]]; + + var actualChunks = []; + var called = 0; + w._writev = function(chunks, cb) { + actualChunks.push(chunks.map(function(chunk) { + return { + encoding: chunk.encoding, + chunk: Buffer.isBuffer(chunk.chunk) ? + Array.prototype.slice.call(chunk.chunk) : chunk.chunk + }; + })); + cb(); + }; + p.write('hello, ', 'ascii', cnt('hello')); + p.write('world', 'utf8', cnt('world')); + process.nextTick(function () { + p.write(new Buffer('!'), 'buffer', cnt('!')); + + p.write('\nand then...', 'binary', cnt('and then')); + p.write('facebea7deadbeefdecafbad', 'hex', cnt('hex')); + + p.end(cnt('end')); + }); + + w.on('finish', function() { + // make sure finish comes after all the write cb + cnt('finish')(); + console.log('expected'); + console.log(expectChunks); + console.log('actual'); + console.log(actualChunks); + t.same(expectChunks, actualChunks); + t.end(); + }); +});