Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streams: add cork option to pipe #2020

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion doc/api/stream.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ readable.isPaused() // === false
* `destination` {[Writable][] Stream} The destination for writing data
* `options` {Object} Pipe options
* `end` {Boolean} End the writer when the reader ends. Default = `true`
* `cork` {Boolean} Before each write, cork the stream and then uncork it on the
next tick in order to consolidate writes. Default = `false`

This method pulls all the data out of a readable stream, and writes it
to the supplied destination, automatically managing the flow so that
Expand Down Expand Up @@ -1315,7 +1317,7 @@ for examples and testing, but there are occasionally use cases where
it can come in handy as a building block for novel sorts of streams.


## Simplified Constructor API
## Simplified Constructor API

<!--type=misc-->

Expand Down
20 changes: 20 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest !== process.stdout &&
dest !== process.stderr;

var autoCork = pipeOpts && pipeOpts.cork && (typeof dest.cork === 'function');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a time when dest.cork isn't a function? Won't it error anyways if it isn't a writable stream?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Old streams?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that makes sense! Thanks!

var corked = false;

var endFn = doEnd ? onend : cleanup;
if (state.endEmitted)
process.nextTick(endFn);
Expand Down Expand Up @@ -515,9 +518,26 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
ondrain();
}

function maybeCork() {
if (!autoCork || corked) {
debug('already corked');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not technically correct in the case of autoCork being false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True should be more like, 'no need to cork'

return;
}
debug('corking');
corked = true;
dest.cork();
process.nextTick(uncork);
}
function uncork() {
debug('uncorking');
corked = false;
dest.uncork();
}
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
maybeCork();
debug('corks=%d', dest._writableState.corked);
var ret = dest.write(chunk);
if (false === ret) {
debug('false write response, pause',
Expand Down
176 changes: 176 additions & 0 deletions test/parallel/test-stream-pipe-cork.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
'use strict';
var common = require('../common');
var assert = require('assert');

var stream = require('stream');
// tiny node-tap lookalike.
var tests = [];
var count = 0;

function test(name, fn) {
count++;
tests.push([name, fn]);
}

function run() {
var next = tests.shift();
if (!next)
return console.error('ok');

var name = next[0];
var fn = next[1];
console.log('# %s', name);
fn({
same: assert.deepEqual,
equal: assert.equal,
end: function() {
count--;
run();
}
});
}

// ensure all tests have run
process.on('exit', function() {
assert.equal(count, 0);
});

process.nextTick(run);
test('all sync', function (t) {
var counter = 0;
var expectCount = 0;
function cnt(msg) {
expectCount++;
var expect = expectCount;
var called = false;
return function(er) {
if (er)
throw er;
called = true;
counter++;
t.equal(counter, expect);
};
}
var p = new stream.PassThrough();
var w = new stream.Writable();
w._write = function(chunk, e, cb) {
assert(false, 'Should not call _write');
};
p.pipe(w, {
cork: true
});
var expectChunks =
[
{ encoding: 'buffer',
chunk: [104, 101, 108, 108, 111, 44, 32] },
{ encoding: 'buffer',
chunk: [119, 111, 114, 108, 100] },
{ encoding: 'buffer',
chunk: [33] },
{ encoding: 'buffer',
chunk: [10, 97, 110, 100, 32, 116, 104, 101, 110, 46, 46, 46] },
{ encoding: 'buffer',
chunk: [250, 206, 190, 167, 222, 173, 190, 239, 222, 202, 251, 173]}
];

var actualChunks;
w._writev = function(chunks, cb) {
actualChunks = chunks.map(function(chunk) {
return {
encoding: chunk.encoding,
chunk: Buffer.isBuffer(chunk.chunk) ?
Array.prototype.slice.call(chunk.chunk) : chunk.chunk
};
});
cb();
};
p.write('hello, ', 'ascii', cnt('hello'));
p.write('world', 'utf8', cnt('world'));

p.write(new Buffer('!'), 'buffer', cnt('!'));

p.write('\nand then...', 'binary', cnt('and then'));
p.write('facebea7deadbeefdecafbad', 'hex', cnt('hex'));

p.end(cnt('end'));


w.on('finish', function() {
// make sure finish comes after all the write cb
cnt('finish')();
t.same(expectChunks, actualChunks);
t.end();
});
});
test('2 groups', function (t) {
var counter = 0;
var expectCount = 0;
function cnt(msg) {
expectCount++;
var expect = expectCount;
var called = false;
return function(er) {
if (er)
throw er;
called = true;
counter++;
t.equal(counter, expect);
};
}
var p = new stream.PassThrough();
var w = new stream.Writable();
w._write = function(chunk, e, cb) {
assert(false, 'Should not call _write');
};
p.pipe(w, {
cork: true
});
var expectChunks = [
[
{ encoding: 'buffer',
chunk: [104, 101, 108, 108, 111, 44, 32] },
{ encoding: 'buffer',
chunk: [119, 111, 114, 108, 100] }
],[
{ encoding: 'buffer',
chunk: [33] },
{ encoding: 'buffer',
chunk: [10, 97, 110, 100, 32, 116, 104, 101, 110, 46, 46, 46] },
{ encoding: 'buffer',
chunk: [250, 206, 190, 167, 222, 173, 190, 239, 222, 202, 251, 173]}
]];

var actualChunks = [];
var called = 0;
w._writev = function(chunks, cb) {
actualChunks.push(chunks.map(function(chunk) {
return {
encoding: chunk.encoding,
chunk: Buffer.isBuffer(chunk.chunk) ?
Array.prototype.slice.call(chunk.chunk) : chunk.chunk
};
}));
cb();
};
p.write('hello, ', 'ascii', cnt('hello'));
p.write('world', 'utf8', cnt('world'));
process.nextTick(function () {
p.write(new Buffer('!'), 'buffer', cnt('!'));

p.write('\nand then...', 'binary', cnt('and then'));
p.write('facebea7deadbeefdecafbad', 'hex', cnt('hex'));

p.end(cnt('end'));
});

w.on('finish', function() {
// make sure finish comes after all the write cb
cnt('finish')();
console.log('expected');
console.log(expectChunks);
console.log('actual');
console.log(actualChunks);
t.same(expectChunks, actualChunks);
t.end();
});
});