Skip to content
This repository was archived by the owner on Apr 22, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions doc/api/stream.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ initialized.
### readable.\_read(size, callback)

* `size` {Number} Number of bytes to read asynchronously
* `callback` {Function} Called with an error or with data
* `callback` {Function} Optional, called with an error or with data

All Readable stream implementations must provide a `_read` method
to fetch data from the underlying resource.
Expand All @@ -117,9 +117,13 @@ Note: **This function MUST NOT be called directly.** It should be
implemented by child classes, and called by the internal Readable
class methods only.

Call the callback using the standard `callback(error, data)` pattern.
When no more data can be fetched, call `callback(null, null)` to
signal the EOF.
If a callback is provided, `_read` will not be called again until the
callback has been called using the standard `callback(error, data)`
pattern. When no more data can be fetched, call `callback(null, null)`
to signal the EOF.

If no callback is provided, all data and EOF signalling must be fed to
the Readable by using the `push()` method.

This method is prefixed with an underscore because it is internal to
the class that defines it, and should not be called directly by user
Expand Down Expand Up @@ -162,7 +166,7 @@ source.onend = function() {
};

// _read will be called when the stream wants to pull more data in
stream._read = function(size, cb) {
stream._read = function() {
source.readStart();
};
```
Expand Down
148 changes: 72 additions & 76 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,12 @@ function ReadableState(options, stream) {
this.ended = false;
this.endEmitted = false;
this.reading = false;
this.sync = false;
this.onread = function(er, data) {
onread(stream, er, data);
};

// whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false;
this.emittedReadable = false;


// object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away
this.objectMode = !!options.objectMode;
Expand Down Expand Up @@ -100,7 +95,11 @@ function Readable(options) {
// write() some more.
Readable.prototype.push = function(chunk) {
var rs = this._readableState;
rs.onread(null, chunk);

if (pushChunk(this, chunk)) {
if (rs.needReadable)
emitReadable(this);
}

// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
Expand Down Expand Up @@ -214,22 +213,10 @@ Readable.prototype.read = function(n) {
doRead = false;

if (doRead) {
state.reading = true;
state.sync = true;
// if the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;
// call internal read method
this._read(state.bufferSize, state.onread);
state.sync = false;
if (readmore(this))
n = howMuchToRead(nOrig, state);
}

// If _read called its callback synchronously, then `reading`
// will be false, and we need to re-evaluate how much data we
// can return to the user.
if (doRead && !state.reading)
n = howMuchToRead(nOrig, state);

var ret;
if (n > 0)
ret = fromList(n, state);
Expand Down Expand Up @@ -257,9 +244,63 @@ Readable.prototype.read = function(n) {
return ret;
};

function onread(stream, er, chunk) {
function readmore(stream) {
var state = stream._readableState;
var size = state.length;

// if the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;

// call internal read method
state.reading = true;
if (stream._read.length > 1) {
var sync = true;
stream._read(state.bufferSize, onread);
sync = false;
} else {
stream._read(state.bufferSize);
state.reading = false;
}

function onread(er, chunk) {
state.reading = false;
if (er)
return stream.emit('error', er);

if (!pushChunk(stream, chunk))
return;

// Don't followup right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
if (!sync)
followup();
else
process.nextTick(followup);
}

function followup() {
if (state.needReadable && emitReadable(stream))
return;

// at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered
// in turn another _read(n,cb) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more right now preemptively.
if (!state.reading && !state.ending && !state.ended &&
state.length < state.highWaterMark) {
stream.read(0);
}
}

return state.length - size;
}

function pushChunk(stream, chunk) {
var state = stream._readableState;
var sync = state.sync;

// If we get something that is not a buffer, string, null, or undefined,
// then switch into objectMode. Now stream chunks are all considered
Expand All @@ -274,10 +315,6 @@ function onread(stream, er, chunk) {
state.decoder = null;
}

state.reading = false;
if (er)
return stream.emit('error', er);

if (chunk === null || chunk === undefined) {
// eof
state.ended = true;
Expand All @@ -295,7 +332,7 @@ function onread(stream, er, chunk) {
emitReadable(stream);
else
endReadable(stream);
return;
return false;
}

// at this point, if we got a zero-length buffer or string,
Expand All @@ -307,71 +344,30 @@ function onread(stream, er, chunk) {
if (!state.objectMode &&
(chunk || typeof chunk === 'string') &&
0 === chunk.length)
return;
return false;

if (state.decoder)
if (state.decoder) {
chunk = state.decoder.write(chunk);
if (chunk === null)
return true;
}

// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
state.buffer.push(chunk);

// if we haven't gotten any data,
// and we haven't ended, then don't bother telling the user
// that it's time to read more data. Otherwise, emitting 'readable'
// probably will trigger another stream.read(), which can trigger
// another _read(n,cb) before this one returns!
if (state.length === 0) {
state.reading = true;
stream._read(state.bufferSize, state.onread);
return;
}

if (state.needReadable)
emitReadable(stream);
else if (state.sync)
process.nextTick(function() {
maybeReadMore(stream, state);
});
else
maybeReadMore(stream, state);
return true;
}

// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
if (state.emittedReadable)
return;
return false;

state.emittedReadable = true;
if (state.sync)
process.nextTick(function() {
emitReadable_(stream);
});
else
emitReadable_(stream);
}

function emitReadable_(stream) {
var state = stream._readableState;
stream.emit('readable');
maybeReadMore(stream, state);
}

function maybeReadMore(stream, state) {
// at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered
// in turn another _read(n,cb) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more right now preemptively.
if (!state.reading && !state.ending && !state.ended &&
state.length < state.highWaterMark) {
stream.read(0);
}
return true;
}

// abstract method. to be overridden in specific implementation classes.
Expand Down
1 change: 1 addition & 0 deletions lib/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -1459,6 +1459,7 @@ ReadStream.prototype._read = function(n, cb) {
this._read(n, cb);
});

// totally ignore the request once destroyed
if (this.destroyed)
return;

Expand Down
5 changes: 3 additions & 2 deletions lib/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,9 @@ IncomingMessage.prototype._read = function(n, callback) {
// do need to unpause the underlying socket so that it flows.
if (!this.socket.readable)
return callback(null, null);
else
readStart(this.socket);

readStart(this.socket);
callback(null, '');
};


Expand Down
2 changes: 1 addition & 1 deletion lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ Socket.prototype._read = function(n, callback) {
return;
}

assert(callback === this._readableState.onread);
assert(this._readableState.reading = true);

if (!this._handle.reading) {
Expand All @@ -355,6 +354,7 @@ Socket.prototype._read = function(n, callback) {
} else {
debug('readStart already has been called.');
}
callback(null, '');
};


Expand Down
1 change: 1 addition & 0 deletions test/simple/test-stream-big-push.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ r._read = function(n, cb) {
} else if (reads === 1) {
var ret = r.push(str);
assert.equal(ret, false);
cb(null, '');
reads++;
} else {
assert(!eofed);
Expand Down
7 changes: 6 additions & 1 deletion test/simple/test-stream-push-order.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ s._read = function (n, cb) {

var two = list.shift();
s.push(one);
cb(null, two);
if (list.length === 4)
setImmediate(function() {
cb(null, two);
});
else
cb(null, two);
};

var v = s.read(0);
Expand Down