Skip to content

Commit

Permalink
backport iojs v1.0.0 streams changes
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdickinson committed Jan 15, 2015
1 parent b1036fb commit 1268b34
Show file tree
Hide file tree
Showing 49 changed files with 316 additions and 1,018 deletions.
23 changes: 2 additions & 21 deletions lib/_stream_duplex.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,10 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

// a duplex stream is just a stream that is both readable and writable.
// Since JS doesn't have multiple prototypal inheritance, this class
// prototypally inherits from Readable, and then parasitically from
// Writable.

'use strict';

module.exports = Duplex;

/*<replacement>*/
Expand Down
80 changes: 32 additions & 48 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,4 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';

module.exports = Readable;

Expand All @@ -41,16 +22,12 @@ if (!EE.listenerCount) EE.listenerCount = function(emitter, type) {
/*</replacement>*/

var Stream = require('stream');
var util = require('util');
var Duplex;

/*<replacement>*/
var util = require('core-util-is');
util.inherits = require('inherits');
/*</replacement>*/

var StringDecoder;


/*<replacement>*/
var debug = require('util');
if (debug && debug.debuglog) {
debug = debug.debuglog('stream');
Expand All @@ -59,18 +36,25 @@ if (debug && debug.debuglog) {
}
/*</replacement>*/

var StringDecoder;

util.inherits(Readable, Stream);

function ReadableState(options, stream) {
var Duplex = require('./_stream_duplex');

Duplex = Duplex || require('./_stream_duplex');
options = options || {};

// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;

if (stream instanceof Duplex)
this.objectMode = this.objectMode || !!options.readableObjectMode;

// the point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
var hwm = options.highWaterMark;
var defaultHwm = options.objectMode ? 16 : 16 * 1024;
var defaultHwm = this.objectMode ? 16 : 16 * 1024;
this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;

// cast to ints.
Expand All @@ -97,14 +81,6 @@ function ReadableState(options, stream) {
this.emittedReadable = false;
this.readableListening = false;


// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;

if (stream instanceof Duplex)
this.objectMode = this.objectMode || !!options.readableObjectMode;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
Expand All @@ -124,15 +100,13 @@ function ReadableState(options, stream) {
this.encoding = null;
if (options.encoding) {
if (!StringDecoder)
StringDecoder = require('string_decoder/').StringDecoder;
StringDecoder = require('string_decoder').StringDecoder;
this.decoder = new StringDecoder(options.encoding);
this.encoding = options.encoding;
}
}

function Readable(options) {
var Duplex = require('./_stream_duplex');

if (!(this instanceof Readable))
return new Readable(options);

Expand Down Expand Up @@ -168,11 +142,15 @@ Readable.prototype.unshift = function(chunk) {
return readableAddChunk(this, state, chunk, '', true);
};

Readable.prototype.isPaused = function() {
return this._readableState.flowing === false;
};

function readableAddChunk(stream, state, chunk, encoding, addToFront) {
var er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
} else if (util.isNullOrUndefined(chunk)) {
} else if (chunk === null) {
state.reading = false;
if (!state.ended)
onEofChunk(stream, state);
Expand Down Expand Up @@ -234,7 +212,7 @@ function needMoreData(state) {
// backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
if (!StringDecoder)
StringDecoder = require('string_decoder/').StringDecoder;
StringDecoder = require('string_decoder').StringDecoder;
this._readableState.decoder = new StringDecoder(enc);
this._readableState.encoding = enc;
return this;
Expand All @@ -261,7 +239,7 @@ function howMuchToRead(n, state) {
if (state.objectMode)
return n === 0 ? 0 : 1;

if (isNaN(n) || util.isNull(n)) {
if (util.isNull(n) || isNaN(n)) {
// only flow one buffer at a time
if (state.flowing && state.buffer.length)
return state.buffer[0].length;
Expand Down Expand Up @@ -737,10 +715,6 @@ Readable.prototype.resume = function() {
if (!state.flowing) {
debug('resume');
state.flowing = true;
if (!state.reading) {
debug('resume read 0');
this.read(0);
}
resume(this, state);
}
return this;
Expand All @@ -756,6 +730,11 @@ function resume(stream, state) {
}

function resume_(stream, state) {
if (!state.reading) {
debug('resume read 0');
stream.read(0);
}

state.resumeScheduled = false;
stream.emit('resume');
flow(stream);
Expand Down Expand Up @@ -806,7 +785,12 @@ Readable.prototype.wrap = function(stream) {
debug('wrapped data');
if (state.decoder)
chunk = state.decoder.write(chunk);
if (!chunk || !state.objectMode && !chunk.length)

// don't skip over falsy values in objectMode
//if (state.objectMode && util.isNullOrUndefined(chunk))
if (state.objectMode && (chunk === null || chunk === undefined))
return;
else if (!state.objectMode && (!chunk || !chunk.length))
return;

var ret = self.push(chunk);
Expand Down
28 changes: 4 additions & 24 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,3 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.


// a transform stream is a readable/writable stream where you do
// something with the data. Sometimes it's called a "filter",
// but that's not a great name for it, since that implies a thing where
Expand Down Expand Up @@ -62,6 +40,8 @@
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.

'use strict';

module.exports = Transform;

var Duplex = require('./_stream_duplex');
Expand All @@ -74,7 +54,7 @@ util.inherits = require('inherits');
util.inherits(Transform, Duplex);


function TransformState(options, stream) {
function TransformState(stream) {
this.afterTransform = function(er, data) {
return afterTransform(stream, er, data);
};
Expand Down Expand Up @@ -117,7 +97,7 @@ function Transform(options) {

Duplex.call(this, options);

this._transformState = new TransformState(options, this);
this._transformState = new TransformState(this);

// when the writable side finishes, then flush out anything remaining.
var stream = this;
Expand Down
Loading

0 comments on commit 1268b34

Please sign in to comment.