diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index ebb771dd5591..6f4e16de7d98 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -108,7 +108,7 @@ initialized. ### readable.\_read(size, callback) * `size` {Number} Number of bytes to read asynchronously -* `callback` {Function} Called with an error or with data +* `callback` {Function} Optional, called with an error or with data All Readable stream implementations must provide a `_read` method to fetch data from the underlying resource. @@ -117,9 +117,13 @@ Note: **This function MUST NOT be called directly.** It should be implemented by child classes, and called by the internal Readable class methods only. -Call the callback using the standard `callback(error, data)` pattern. -When no more data can be fetched, call `callback(null, null)` to -signal the EOF. +If a callback is provided, `_read` will not be called again until the +callback has been called using the standard `callback(error, data)` +pattern. When no more data can be fetched, call `callback(null, null)` +to signal the EOF. + +If no callback is provided, all data and EOF signalling must be fed to +the Readable by using the `push()` method. This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user @@ -162,7 +166,7 @@ source.onend = function() { }; // _read will be called when the stream wants to pull more data in -stream._read = function(size, cb) { +stream._read = function() { source.readStart(); }; ``` diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 2d67bb22fe5c..ab74cba0b386 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -51,17 +51,12 @@ function ReadableState(options, stream) { this.ended = false; this.endEmitted = false; this.reading = false; - this.sync = false; - this.onread = function(er, data) { - onread(stream, er, data); - }; // whenever we return null, then we set a flag to say // that we're awaiting a 'readable' event emission. this.needReadable = false; this.emittedReadable = false; - // object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away this.objectMode = !!options.objectMode; @@ -100,7 +95,11 @@ function Readable(options) { // write() some more. Readable.prototype.push = function(chunk) { var rs = this._readableState; - rs.onread(null, chunk); + + if (pushChunk(this, chunk)) { + if (rs.needReadable) + emitReadable(this); + } // if it's past the high water mark, we can push in some more. // Also, if we have no data yet, we can stand some @@ -214,22 +213,10 @@ Readable.prototype.read = function(n) { doRead = false; if (doRead) { - state.reading = true; - state.sync = true; - // if the length is currently zero, then we *need* a readable event. - if (state.length === 0) - state.needReadable = true; - // call internal read method - this._read(state.bufferSize, state.onread); - state.sync = false; + if (readmore(this)) + n = howMuchToRead(nOrig, state); } - // If _read called its callback synchronously, then `reading` - // will be false, and we need to re-evaluate how much data we - // can return to the user. - if (doRead && !state.reading) - n = howMuchToRead(nOrig, state); - var ret; if (n > 0) ret = fromList(n, state); @@ -257,9 +244,63 @@ Readable.prototype.read = function(n) { return ret; }; -function onread(stream, er, chunk) { +function readmore(stream) { + var state = stream._readableState; + var size = state.length; + + // if the length is currently zero, then we *need* a readable event. + if (state.length === 0) + state.needReadable = true; + + // call internal read method + state.reading = true; + if (stream._read.length > 1) { + var sync = true; + stream._read(state.bufferSize, onread); + sync = false; + } else { + stream._read(state.bufferSize); + state.reading = false; + } + + function onread(er, chunk) { + state.reading = false; + if (er) + return stream.emit('error', er); + + if (!pushChunk(stream, chunk)) + return; + + // Don't followup right away in sync mode, because this can trigger + // another read() call => stack overflow. This way, it might trigger + // a nextTick recursion warning, but that's not so bad. + if (!sync) + followup(); + else + process.nextTick(followup); + } + + function followup() { + if (state.needReadable && emitReadable(stream)) + return; + + // at this point, the user has presumably seen the 'readable' event, + // and called read() to consume some data. that may have triggered + // in turn another _read(n,cb) call, in which case reading = true if + // it's in progress. + // However, if we're not ended, or reading, and the length < hwm, + // then go ahead and try to read some more right now preemptively. + if (!state.reading && !state.ending && !state.ended && + state.length < state.highWaterMark) { + stream.read(0); + } + } + + return state.length - size; +} + +function pushChunk(stream, chunk) { var state = stream._readableState; - var sync = state.sync; // If we get something that is not a buffer, string, null, or undefined, // then switch into objectMode. Now stream chunks are all considered @@ -274,10 +315,6 @@ function onread(stream, er, chunk) { state.decoder = null; } - state.reading = false; - if (er) - return stream.emit('error', er); - if (chunk === null || chunk === undefined) { // eof state.ended = true; @@ -295,7 +332,7 @@ function onread(stream, er, chunk) { emitReadable(stream); else endReadable(stream); - return; + return false; } // at this point, if we got a zero-length buffer or string, @@ -307,71 +344,30 @@ function onread(stream, er, chunk) { if (!state.objectMode && (chunk || typeof chunk === 'string') && 0 === chunk.length) - return; + return false; - if (state.decoder) + if (state.decoder) { chunk = state.decoder.write(chunk); + if (chunk === null) + return true; + } // update the buffer info. state.length += state.objectMode ? 1 : chunk.length; state.buffer.push(chunk); - // if we haven't gotten any data, - // and we haven't ended, then don't bother telling the user - // that it's time to read more data. Otherwise, emitting 'readable' - // probably will trigger another stream.read(), which can trigger - // another _read(n,cb) before this one returns! - if (state.length === 0) { - state.reading = true; - stream._read(state.bufferSize, state.onread); - return; - } - - if (state.needReadable) - emitReadable(stream); - else if (state.sync) - process.nextTick(function() { - maybeReadMore(stream, state); - }); - else - maybeReadMore(stream, state); + return true; } -// Don't emit readable right away in sync mode, because this can trigger -// another read() call => stack overflow. This way, it might trigger -// a nextTick recursion warning, but that's not so bad. function emitReadable(stream) { var state = stream._readableState; state.needReadable = false; if (state.emittedReadable) - return; + return false; state.emittedReadable = true; - if (state.sync) - process.nextTick(function() { - emitReadable_(stream); - }); - else - emitReadable_(stream); -} - -function emitReadable_(stream) { - var state = stream._readableState; stream.emit('readable'); - maybeReadMore(stream, state); -} - -function maybeReadMore(stream, state) { - // at this point, the user has presumably seen the 'readable' event, - // and called read() to consume some data. that may have triggered - // in turn another _read(n,cb) call, in which case reading = true if - // it's in progress. - // However, if we're not ended, or reading, and the length < hwm, - // then go ahead and try to read some more right now preemptively. - if (!state.reading && !state.ending && !state.ended && - state.length < state.highWaterMark) { - stream.read(0); - } + return true; } // abstract method. to be overridden in specific implementation classes. diff --git a/lib/fs.js b/lib/fs.js index 78940a8304a2..3313f93c439d 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1459,6 +1459,7 @@ ReadStream.prototype._read = function(n, cb) { this._read(n, cb); }); + // totally ignore the request once destroyed if (this.destroyed) return; diff --git a/lib/http.js b/lib/http.js index b6d68c635b19..e17c7362bc9e 100644 --- a/lib/http.js +++ b/lib/http.js @@ -337,8 +337,9 @@ IncomingMessage.prototype._read = function(n, callback) { // do need to unpause the underlying socket so that it flows. if (!this.socket.readable) return callback(null, null); - else - readStart(this.socket); + + readStart(this.socket); + callback(null, ''); }; diff --git a/lib/net.js b/lib/net.js index 714e5bb0a039..47a2c68f9831 100644 --- a/lib/net.js +++ b/lib/net.js @@ -343,7 +343,6 @@ Socket.prototype._read = function(n, callback) { return; } - assert(callback === this._readableState.onread); assert(this._readableState.reading = true); if (!this._handle.reading) { @@ -355,6 +354,7 @@ Socket.prototype._read = function(n, callback) { } else { debug('readStart already has been called.'); } + callback(null, ''); }; diff --git a/test/simple/test-stream-big-push.js b/test/simple/test-stream-big-push.js index f54bcc30d38a..838e1433c9d0 100644 --- a/test/simple/test-stream-big-push.js +++ b/test/simple/test-stream-big-push.js @@ -42,6 +42,7 @@ r._read = function(n, cb) { } else if (reads === 1) { var ret = r.push(str); assert.equal(ret, false); + cb(null, ''); reads++; } else { assert(!eofed); diff --git a/test/simple/test-stream-push-order.js b/test/simple/test-stream-push-order.js index 900a8c778dff..11fedb81dfa5 100644 --- a/test/simple/test-stream-push-order.js +++ b/test/simple/test-stream-push-order.js @@ -37,7 +37,12 @@ s._read = function (n, cb) { var two = list.shift(); s.push(one); - cb(null, two); + if (list.length === 4) + setImmediate(function() { + cb(null, two); + }); + else + cb(null, two); }; var v = s.read(0);