From f7c4e103de396cc58816b3f7efb6a26a7b764b1e Mon Sep 17 00:00:00 2001 From: isaacs Date: Wed, 27 Feb 2013 16:56:30 -0800 Subject: [PATCH 1/4] stream: Break up the onread function A primary motivation of this is to make the onread function more inline-friendly, but also to make it more easy to explore not having onread at all, in favor of always using push() to signal the end of reading. --- lib/_stream_readable.js | 164 ++++++++++++++++++++-------------------- 1 file changed, 83 insertions(+), 81 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index e75e201e02df..9193f4f27b99 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -105,21 +105,51 @@ function Readable(options) { // similar to how Writable.write() returns true if you should // write() some more. Readable.prototype.push = function(chunk) { - var rs = this._readableState; - rs.onread(null, chunk); - - // if it's past the high water mark, we can push in some more. - // Also, if we have no data yet, we can stand some - // more bytes. This is to work around cases where hwm=0, - // such as the repl. Also, if the push() triggered a - // readable event, and the user called read(largeNumber) such that - // needReadable was set, then we ought to push more, so that another - // 'readable' event will be triggered. - return rs.needReadable || - rs.length < rs.highWaterMark || - rs.length === 0; + var state = this._readableState; + return readableAddChunk(this, state, chunk); }; +function readableAddChunk(stream, state, chunk) { + state.reading = false; + + var er = chunkInvalid(state, chunk); + if (er) { + stream.emit('error', er); + } else if (chunk === null || chunk === undefined) { + onreadEof(stream, state); + } else if (state.objectMode || chunk && chunk.length > 0) { + if (state.decoder) + chunk = state.decoder.write(chunk); + + // update the buffer info. + state.length += state.objectMode ? 
1 : chunk.length; + state.buffer.push(chunk); + + if (state.needReadable) + emitReadable(stream); + + maybeReadMore(stream, state); + } + + return needMoreData(state); +} + + + +// if it's past the high water mark, we can push in some more. +// Also, if we have no data yet, we can stand some +// more bytes. This is to work around cases where hwm=0, +// such as the repl. Also, if the push() triggered a +// readable event, and the user called read(largeNumber) such that +// needReadable was set, then we ought to push more, so that another +// 'readable' event will be triggered. +function needMoreData(state) { + return !state.ended && + (state.needReadable || + state.length < state.highWaterMark || + state.length === 0); +} + // backwards compatibility. Readable.prototype.setEncoding = function(enc) { if (!StringDecoder) @@ -263,15 +293,20 @@ Readable.prototype.read = function(n) { return ret; }; +// This is the function passed to _read(n,cb) as the callback. +// It should be called exactly once for every _read() call. function onread(stream, er, chunk) { var state = stream._readableState; var sync = state.sync; - // If we get something that is not a buffer, string, null, or undefined, - // and we're not in objectMode, then that's an error. - // Otherwise stream chunks are all considered to be of length=1, and the - // watermarks determine how many objects to keep in the buffer, rather than - // how many bytes or characters. 
+ if (er) + stream.emit('error', er); + else + stream.push(chunk); +} + +function chunkInvalid(state, chunk) { + var er = null; if (!Buffer.isBuffer(chunk) && 'string' !== typeof chunk && chunk !== null && @@ -280,68 +315,26 @@ function onread(stream, er, chunk) { !er) { er = new TypeError('Invalid non-string/buffer chunk'); } + return er; +} - state.reading = false; - if (er) - return stream.emit('error', er); - if (chunk === null || chunk === undefined) { - // eof - state.ended = true; - if (state.decoder) { - chunk = state.decoder.end(); - if (chunk && chunk.length) { - state.buffer.push(chunk); - state.length += state.objectMode ? 1 : chunk.length; - } +function onreadEof(stream, state) { + state.ended = true; + if (state.decoder) { + var chunk = state.decoder.end(); + if (chunk && chunk.length) { + state.buffer.push(chunk); + state.length += state.objectMode ? 1 : chunk.length; } - - // if we've ended and we have some data left, then emit - // 'readable' now to make sure it gets picked up. - if (state.length > 0) - emitReadable(stream); - else - endReadable(stream); - return; - } - - // at this point, if we got a zero-length buffer or string, - // and we're not in object-mode, then there's really no point - // continuing. it means that there is nothing to read right - // now, but as we have not received the EOF-signaling null, - // we're not ended. we've already unset the reading flag, - // so just get out of here. - if (!state.objectMode && - (chunk || typeof chunk === 'string') && - 0 === chunk.length) - return; - - if (state.decoder) - chunk = state.decoder.write(chunk); - - // update the buffer info. - state.length += state.objectMode ? 1 : chunk.length; - state.buffer.push(chunk); - - // if we haven't gotten any data, - // and we haven't ended, then don't bother telling the user - // that it's time to read more data. 
Otherwise, emitting 'readable' - // probably will trigger another stream.read(), which can trigger - // another _read(n,cb) before this one returns! - if (state.length === 0) { - state.reading = true; - stream._read(state.bufferSize, state.onread); - return; } - if (state.needReadable) + // if we've ended and we have some data left, then emit + // 'readable' now to make sure it gets picked up. + if (state.length > 0) emitReadable(stream); - else if (state.sync) - process.nextTick(function() { - maybeReadMore(stream, state); - }); else - maybeReadMore(stream, state); + endReadable(stream); } // Don't emit readable right away in sync mode, because this can trigger @@ -365,17 +358,26 @@ function emitReadable(stream) { function emitReadable_(stream) { var state = stream._readableState; stream.emit('readable'); - maybeReadMore(stream, state); } + +// at this point, the user has presumably seen the 'readable' event, +// and called read() to consume some data. that may have triggered +// in turn another _read(n,cb) call, in which case reading = true if +// it's in progress. +// However, if we're not ended, or reading, and the length < hwm, +// then go ahead and try to read some more right now preemptively. function maybeReadMore(stream, state) { - // at this point, the user has presumably seen the 'readable' event, - // and called read() to consume some data. that may have triggered - // in turn another _read(n,cb) call, in which case reading = true if - // it's in progress. - // However, if we're not ended, or reading, and the length < hwm, - // then go ahead and try to read some more right now preemptively. 
- if (!state.reading && !state.ending && !state.ended && + if (state.sync) + process.nextTick(function() { + maybeReadMore_(stream, state); + }); + else + maybeReadMore_(stream, state); +} + +function maybeReadMore_(stream, state) { + if (!state.reading && !state.ended && state.length < state.highWaterMark) { stream.read(0); } From babb3ce48f79c748d785ab23462664ec26844ee9 Mon Sep 17 00:00:00 2001 From: isaacs Date: Wed, 27 Feb 2013 19:32:19 -0800 Subject: [PATCH 2/4] stream: Add stream.unshift(chunk) --- lib/_stream_readable.js | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 9193f4f27b99..9f384a409e92 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -106,10 +106,15 @@ function Readable(options) { // write() some more. Readable.prototype.push = function(chunk) { var state = this._readableState; - return readableAddChunk(this, state, chunk); + return readableAddChunk(this, state, chunk, false); }; -function readableAddChunk(stream, state, chunk) { +Readable.prototype.unshift = function(chunk) { + var state = this._readableState; + return readableAddChunk(this, state, chunk, true); +}; + +function readableAddChunk(stream, state, chunk, addToFront) { state.reading = false; var er = chunkInvalid(state, chunk); @@ -123,7 +128,10 @@ function readableAddChunk(stream, state, chunk) { // update the buffer info. state.length += state.objectMode ? 
1 : chunk.length; - state.buffer.push(chunk); + if (addToFront) + state.buffer.unshift(chunk); + else + state.buffer.push(chunk); if (state.needReadable) emitReadable(stream); From 7410a9358788d631756a961f8b632c7370b2a151 Mon Sep 17 00:00:00 2001 From: isaacs Date: Thu, 28 Feb 2013 15:32:32 -0800 Subject: [PATCH 3/4] stream: There is no _read cb, there is only push This makes it so that `stream.push(chunk)` is the only way to signal the end of reading, removing the confusing disparity between the callback-style _read method, and the fact that most real-world streams do not have a 1:1 corollation between the "please give me data" event, and the actual arrival of a chunk of data. It is still possible, of course, to implement a `CallbackReadable` on top of this. Simply provide a method like this as the callback: function readCallback(er, chunk) { if (er) stream.emit('error', er); else stream.push(chunk); } However, *only* fs streams actually would behave in this way, so it makes not a lot of sense to make TCP, TLS, HTTP, and all the rest have to bend into this uncomfortable paradigm. 
--- lib/_stream_readable.js | 36 +++++-------------- lib/_stream_transform.js | 10 +++--- lib/fs.js | 20 +++++------ lib/http.js | 4 +-- lib/net.js | 20 ++++------- lib/tls.js | 14 ++++---- test/simple/test-stream-big-push.js | 6 ++-- test/simple/test-stream-push-order.js | 15 ++++---- test/simple/test-stream2-basic.js | 6 ++-- test/simple/test-stream2-compatibility.js | 4 +-- test/simple/test-stream2-finish-pipe.js | 14 ++++---- test/simple/test-stream2-objects.js | 12 +++---- .../test-stream2-pipe-error-handling.js | 8 ++--- test/simple/test-stream2-read-sync-stack.js | 4 +-- ...st-stream2-readable-empty-buffer-no-eof.js | 20 +++++------ .../test-stream2-readable-legacy-drain.js | 4 +-- .../test-stream2-readable-non-empty-end.js | 4 +-- test/simple/test-stream2-set-encoding.js | 10 +++--- test/simple/test-stream2-unpipe-drain.js | 4 +-- test/simple/test-stream2-unpipe-leak.js | 16 ++++----- 20 files changed, 105 insertions(+), 126 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 9f384a409e92..b9f8291874f1 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -31,7 +31,7 @@ util.inherits(Readable, Stream); function ReadableState(options, stream) { options = options || {}; - // the argument passed to this._read(n,cb) + // the argument passed to this._read(n) this.bufferSize = options.bufferSize || 16 * 1024; // the point at which it stops calling _read() to fill the buffer @@ -58,10 +58,6 @@ function ReadableState(options, stream) { // not happen before the first write call. this.sync = true; - this.onread = function(er, data) { - onread(stream, er, data); - }; - // whenever we return null, then we set a flag to say // that we're awaiting a 'readable' event emission. 
this.needReadable = false; @@ -121,7 +117,7 @@ function readableAddChunk(stream, state, chunk, addToFront) { if (er) { stream.emit('error', er); } else if (chunk === null || chunk === undefined) { - onreadEof(stream, state); + onEofChunk(stream, state); } else if (state.objectMode || chunk && chunk.length > 0) { if (state.decoder) chunk = state.decoder.write(chunk); @@ -196,7 +192,7 @@ function howMuchToRead(n, state) { return n; } -// you can override either this method, or _read(n, cb) below. +// you can override either this method, or the async _read(n) below. Readable.prototype.read = function(n) { var state = this._readableState; var nOrig = n; @@ -264,7 +260,7 @@ Readable.prototype.read = function(n) { if (state.length === 0) state.needReadable = true; // call internal read method - this._read(state.bufferSize, state.onread); + this._read(state.bufferSize); state.sync = false; } @@ -301,18 +297,6 @@ Readable.prototype.read = function(n) { return ret; }; -// This is the function passed to _read(n,cb) as the callback. -// It should be called exactly once for every _read() call. -function onread(stream, er, chunk) { - var state = stream._readableState; - var sync = state.sync; - - if (er) - stream.emit('error', er); - else - stream.push(chunk); -} - function chunkInvalid(state, chunk) { var er = null; if (!Buffer.isBuffer(chunk) && @@ -327,7 +311,7 @@ function chunkInvalid(state, chunk) { } -function onreadEof(stream, state) { +function onEofChunk(stream, state) { state.ended = true; if (state.decoder) { var chunk = state.decoder.end(); @@ -371,7 +355,7 @@ function emitReadable_(stream) { // at this point, the user has presumably seen the 'readable' event, // and called read() to consume some data. that may have triggered -// in turn another _read(n,cb) call, in which case reading = true if +// in turn another _read(n) call, in which case reading = true if // it's in progress. 
// However, if we're not ended, or reading, and the length < hwm, // then go ahead and try to read some more right now preemptively. @@ -395,10 +379,8 @@ function maybeReadMore_(stream, state) { // call cb(er, data) where data is <= n in length. // for virtual (non-string, non-buffer) streams, "length" is somewhat // arbitrary, and perhaps not very meaningful. -Readable.prototype._read = function(n, cb) { - process.nextTick(function() { - cb(new Error('not implemented')); - }); +Readable.prototype._read = function(n) { + this.emit('error', new Error('not implemented')); }; Readable.prototype.pipe = function(dest, pipeOpts) { @@ -758,7 +740,7 @@ Readable.prototype.wrap = function(stream) { // when we try to consume some more bytes, simply unpause the // underlying stream. - self._read = function(n, cb) { + self._read = function(n) { if (paused) { stream.resume(); paused = false; diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 0ee5a5030eed..d8f6e6053284 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -36,11 +36,11 @@ // The Transform stream has all the aspects of the readable and writable // stream classes. When you write(chunk), that calls _write(chunk,cb) // internally, and returns false if there's a lot of pending writes -// buffered up. When you call read(), that calls _read(n,cb) until +// buffered up. When you call read(), that calls _read(n) until // there's enough pending readable data buffered up. // // In a transform stream, the written data is placed in a buffer. When -// _read(n,cb) is called, it transforms the queued up data, calling the +// _read(n) is called, it transforms the queued up data, calling the // buffered _write cb's as it consumes chunks. 
If consuming a single // written chunk would result in multiple output chunks, then the first // outputted bit calls the readcb, and subsequent chunks just go into @@ -106,7 +106,7 @@ function afterTransform(stream, er, data) { var rs = stream._readableState; if (rs.needReadable || rs.length < rs.highWaterMark) { - stream._read(); + stream._read(rs.bufferSize); } } @@ -162,13 +162,13 @@ Transform.prototype._write = function(chunk, cb) { return; var rs = this._readableState; if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) - this._read(); + this._read(rs.bufferSize); }; // Doesn't matter what the args are here. // the output and callback functions passed to _transform do all the work. // That we got here means that the readable side wants more data. -Transform.prototype._read = function(n, cb) { +Transform.prototype._read = function(n) { var ts = this._transformState; if (ts.writechunk && ts.writecb && !ts.transforming) { diff --git a/lib/fs.js b/lib/fs.js index 8188f7170000..a1bc487b3d33 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1453,10 +1453,10 @@ ReadStream.prototype.open = function() { }); }; -ReadStream.prototype._read = function(n, cb) { +ReadStream.prototype._read = function(n) { if (typeof this.fd !== 'number') return this.once('open', function() { - this._read(n, cb); + this._read(n); }); if (this.destroyed) @@ -1482,7 +1482,7 @@ ReadStream.prototype._read = function(n, cb) { // already read everything we were supposed to read! // treat as EOF. if (toRead <= 0) - return cb(); + return this.push(null); // the actual read. 
var self = this; @@ -1498,14 +1498,14 @@ ReadStream.prototype._read = function(n, cb) { if (self.autoClose) { self.destroy(); } - return cb(er); - } - - var b = null; - if (bytesRead > 0) - b = thisPool.slice(start, start + bytesRead); + self.emit('error', er); + } else { + var b = null; + if (bytesRead > 0) + b = thisPool.slice(start, start + bytesRead); - cb(null, b); + self.push(b); + } } }; diff --git a/lib/http.js b/lib/http.js index e5c709da0f98..56d9aae42363 100644 --- a/lib/http.js +++ b/lib/http.js @@ -331,12 +331,12 @@ IncomingMessage.prototype.read = function(n) { }; -IncomingMessage.prototype._read = function(n, callback) { +IncomingMessage.prototype._read = function(n) { // We actually do almost nothing here, because the parserOnBody // function fills up our internal buffer directly. However, we // do need to unpause the underlying socket so that it flows. if (!this.socket.readable) - return callback(null, null); + this.push(null); else readStart(this.socket); }; diff --git a/lib/net.js b/lib/net.js index d5e85b17e804..ceb4dc863d34 100644 --- a/lib/net.js +++ b/lib/net.js @@ -228,7 +228,7 @@ function afterShutdown(status, handle, req) { function onSocketEnd() { // XXX Should not have to do as much crap in this function. // ended should already be true, since this is called *after* - // the EOF errno and onread has returned null to the _read cb. 
+ // the EOF errno and onread has eof'ed debug('onSocketEnd', this._readableState); this._readableState.ended = true; if (this._readableState.endEmitted) { @@ -335,25 +335,19 @@ Object.defineProperty(Socket.prototype, 'bufferSize', { // Just call handle.readStart until we have enough in the buffer -Socket.prototype._read = function(n, callback) { +Socket.prototype._read = function(n) { debug('_read'); + if (this._connecting || !this._handle) { debug('_read wait for connection'); - this.once('connect', this._read.bind(this, n, callback)); - return; - } - - assert(callback === this._readableState.onread); - assert(this._readableState.reading = true); - - if (!this._handle.reading) { + this.once('connect', this._read.bind(this, n)); + } else if (!this._handle.reading) { + // not already reading, start the flow debug('Socket._read readStart'); this._handle.reading = true; var r = this._handle.readStart(); if (r) this._destroy(errnoException(process._errno, 'read')); - } else { - debug('readStart already has been called.'); } }; @@ -495,7 +489,7 @@ function onread(buffer, offset, length) { if (self.onend) self.once('end', self.onend); - // send a null to the _read cb to signal the end of data. + // push a null to signal the end of data. self.push(null); // internal end event so that we know that the actual socket diff --git a/lib/tls.js b/lib/tls.js index 2a780373adff..bb0a03af22e5 100644 --- a/lib/tls.js +++ b/lib/tls.js @@ -381,12 +381,13 @@ CryptoStream.prototype._writePending = function writePending() { }; -CryptoStream.prototype._read = function read(size, cb) { +CryptoStream.prototype._read = function read(size) { // XXX: EOF?! 
- if (!this.pair.ssl) return cb(null, null); + if (!this.pair.ssl) return this.push(null); // Wait for session to be resumed - if (this._resumingSession || !this._reading) return cb(null, ''); + // Mark that we're done reading, but don't provide data or EOF + if (this._resumingSession || !this._reading) return this.push(''); var out; if (this === this.pair.cleartext) { @@ -441,11 +442,12 @@ CryptoStream.prototype._read = function read(size, cb) { if (this === this.pair.cleartext) this._opposite._done(); - return cb(null, null); + // EOF + return this.push(null); } // Bail out - return cb(null, ''); + return this.push(''); } // Give them requested data @@ -459,7 +461,7 @@ CryptoStream.prototype._read = function read(size, cb) { self.read(bytesRead); }); } - return cb(null, pool.slice(start, start + bytesRead)); + return this.push(pool.slice(start, start + bytesRead)); }; diff --git a/test/simple/test-stream-big-push.js b/test/simple/test-stream-big-push.js index f54bcc30d38a..e3787e4412f3 100644 --- a/test/simple/test-stream-big-push.js +++ b/test/simple/test-stream-big-push.js @@ -33,10 +33,10 @@ var reads = 0; var eofed = false; var ended = false; -r._read = function(n, cb) { +r._read = function(n) { if (reads === 0) { setTimeout(function() { - cb(null, str); + r.push(str); }); reads++; } else if (reads === 1) { @@ -46,7 +46,7 @@ r._read = function(n, cb) { } else { assert(!eofed); eofed = true; - cb(null, null); + r.push(null); } }; diff --git a/test/simple/test-stream-push-order.js b/test/simple/test-stream-push-order.js index 900a8c778dff..f2e6ec29ce34 100644 --- a/test/simple/test-stream-push-order.js +++ b/test/simple/test-stream-push-order.js @@ -30,14 +30,15 @@ var s = new Readable({ var list = ['1', '2', '3', '4', '5', '6']; -s._read = function (n, cb) { +s._read = function (n) { var one = list.shift(); - if (!one) - return cb(null, null); - - var two = list.shift(); - s.push(one); - cb(null, two); + if (!one) { + s.push(null); + } else { + var two = 
list.shift(); + s.push(one); + s.push(two); + } }; var v = s.read(0); diff --git a/test/simple/test-stream2-basic.js b/test/simple/test-stream2-basic.js index c24ec243f6b0..d3b53fd7cd3c 100644 --- a/test/simple/test-stream2-basic.js +++ b/test/simple/test-stream2-basic.js @@ -406,7 +406,7 @@ test('read(0) for ended streams', function (t) { var r = new R(); var written = false; var ended = false; - r._read = function () {}; + r._read = function (n) {}; r.push(new Buffer("foo")); r.push(null); @@ -435,8 +435,8 @@ test('read(0) for ended streams', function (t) { test('sync _read ending', function (t) { var r = new R(); var called = false; - r._read = function (n, cb) { - cb(null, null); + r._read = function (n) { + r.push(null); }; r.once('end', function () { diff --git a/test/simple/test-stream2-compatibility.js b/test/simple/test-stream2-compatibility.js index cb07278d55e1..2b98c1fa8f0e 100644 --- a/test/simple/test-stream2-compatibility.js +++ b/test/simple/test-stream2-compatibility.js @@ -41,8 +41,8 @@ function TestReader() { util.inherits(TestReader, R); -TestReader.prototype._read = function(n, cb) { - cb(null, this._buffer); +TestReader.prototype._read = function(n) { + this.push(this._buffer); this._buffer = new Buffer(0); }; diff --git a/test/simple/test-stream2-finish-pipe.js b/test/simple/test-stream2-finish-pipe.js index 686af2449d8a..bcb57a74a0ad 100644 --- a/test/simple/test-stream2-finish-pipe.js +++ b/test/simple/test-stream2-finish-pipe.js @@ -23,19 +23,19 @@ var common = require('../common.js'); var stream = require('stream'); var Buffer = require('buffer').Buffer; -var R = new stream.Readable(); -R._read = function(size, cb) { - cb(null, new Buffer(size)); +var r = new stream.Readable(); +r._read = function(size) { + r.push(new Buffer(size)); }; -var W = new stream.Writable(); -W._write = function(data, cb) { +var w = new stream.Writable(); +w._write = function(data, cb) { cb(null); }; -R.pipe(W); +r.pipe(w); // This might sound unrealistic, but it 
happens in net.js. When // `socket.allowHalfOpen === false`, EOF will cause `.destroySoon()` call which // ends the writable side of net.Socket. -W.end(); +w.end(); diff --git a/test/simple/test-stream2-objects.js b/test/simple/test-stream2-objects.js index 7ff30c850ed4..8939ad7a6829 100644 --- a/test/simple/test-stream2-objects.js +++ b/test/simple/test-stream2-objects.js @@ -126,9 +126,9 @@ test('read(n) is ignored', function(t) { test('can read objects from _read (sync)', function(t) { var r = new Readable({ objectMode: true }); var list = [{ one: '1'}, { two: '2' }]; - r._read = function(n, cb) { + r._read = function(n) { var item = list.shift(); - cb(null, item || null); + r.push(item || null); }; r.pipe(toArray(function(list) { @@ -144,10 +144,10 @@ test('can read objects from _read (sync)', function(t) { test('can read objects from _read (async)', function(t) { var r = new Readable({ objectMode: true }); var list = [{ one: '1'}, { two: '2' }]; - r._read = function(n, cb) { + r._read = function(n) { var item = list.shift(); process.nextTick(function() { - cb(null, item || null); + r.push(item || null); }); }; @@ -223,7 +223,7 @@ test('high watermark _read', function(t) { var calls = 0; var list = ['1', '2', '3', '4', '5', '6', '7', '8']; - r._read = function() { + r._read = function(n) { calls++; }; @@ -249,7 +249,7 @@ test('high watermark push', function(t) { highWaterMark: 6, objectMode: true }); - r._read = function() {}; + r._read = function(n) {}; for (var i = 0; i < 6; i++) { var bool = r.push(i); assert.equal(bool, i === 5 ? 
false : true); diff --git a/test/simple/test-stream2-pipe-error-handling.js b/test/simple/test-stream2-pipe-error-handling.js index c17139f5d37e..82c9a79be96f 100644 --- a/test/simple/test-stream2-pipe-error-handling.js +++ b/test/simple/test-stream2-pipe-error-handling.js @@ -27,10 +27,10 @@ var stream = require('stream'); var count = 1000; var source = new stream.Readable(); - source._read = function(n, cb) { + source._read = function(n) { n = Math.min(count, n); count -= n; - cb(null, new Buffer(n)); + source.push(new Buffer(n)); }; var unpipedDest; @@ -67,10 +67,10 @@ var stream = require('stream'); var count = 1000; var source = new stream.Readable(); - source._read = function(n, cb) { + source._read = function(n) { n = Math.min(count, n); count -= n; - cb(null, new Buffer(n)); + source.push(new Buffer(n)); }; var unpipedDest; diff --git a/test/simple/test-stream2-read-sync-stack.js b/test/simple/test-stream2-read-sync-stack.js index 4e5ab1729553..e8a73053c8c2 100644 --- a/test/simple/test-stream2-read-sync-stack.js +++ b/test/simple/test-stream2-read-sync-stack.js @@ -30,9 +30,9 @@ var N = 256 * 1024; process.maxTickDepth = N + 2; var reads = 0; -r._read = function(n, cb) { +r._read = function(n) { var chunk = reads++ === N ? 
null : new Buffer(1); - cb(null, chunk); + r.push(chunk); }; r.on('readable', function onReadable() { diff --git a/test/simple/test-stream2-readable-empty-buffer-no-eof.js b/test/simple/test-stream2-readable-empty-buffer-no-eof.js index 6cc7c73f08f4..cd301785802f 100644 --- a/test/simple/test-stream2-readable-empty-buffer-no-eof.js +++ b/test/simple/test-stream2-readable-empty-buffer-no-eof.js @@ -43,28 +43,28 @@ function test1() { var buf = new Buffer(5); buf.fill('x'); var reads = 5; - r._read = function(n, cb) { + r._read = function(n) { switch (reads--) { case 0: - return cb(null, null); // EOF + return r.push(null); // EOF case 1: - return cb(null, buf); + return r.push(buf); case 2: setTimeout(r.read.bind(r, 0), 10); - return cb(null, new Buffer(0)); // Not-EOF! + return r.push(new Buffer(0)); // Not-EOF! case 3: setTimeout(r.read.bind(r, 0), 10); return process.nextTick(function() { - return cb(null, new Buffer(0)); + return r.push(new Buffer(0)); }); case 4: setTimeout(r.read.bind(r, 0), 10); return setTimeout(function() { - return cb(null, new Buffer(0)); + return r.push(new Buffer(0)); }); case 5: return setTimeout(function() { - return cb(null, buf); + return r.push(buf); }); default: throw new Error('unreachable'); @@ -92,11 +92,11 @@ function test1() { function test2() { var r = new Readable({ encoding: 'base64' }); var reads = 5; - r._read = function(n, cb) { + r._read = function(n) { if (!reads--) - return cb(null, null); // EOF + return r.push(null); // EOF else - return cb(null, new Buffer('x')); + return r.push(new Buffer('x')); }; var results = []; diff --git a/test/simple/test-stream2-readable-legacy-drain.js b/test/simple/test-stream2-readable-legacy-drain.js index a1fffd9bb502..675da8e90dea 100644 --- a/test/simple/test-stream2-readable-legacy-drain.js +++ b/test/simple/test-stream2-readable-legacy-drain.js @@ -28,8 +28,8 @@ var Readable = Stream.Readable; var r = new Readable(); var N = 256; var reads = 0; -r._read = function(n, cb) { - 
return cb(null, ++reads === N ? null : new Buffer(1)); +r._read = function(n) { + return r.push(++reads === N ? null : new Buffer(1)); }; var rended = false; diff --git a/test/simple/test-stream2-readable-non-empty-end.js b/test/simple/test-stream2-readable-non-empty-end.js index 351bef6098da..7314ae77b138 100644 --- a/test/simple/test-stream2-readable-non-empty-end.js +++ b/test/simple/test-stream2-readable-non-empty-end.js @@ -32,10 +32,10 @@ for (var i = 1; i <= 10; i++) { var test = new Readable(); var n = 0; -test._read = function(size, cb) { +test._read = function(size) { var chunk = chunks[n++]; setTimeout(function() { - cb(null, chunk); + test.push(chunk); }); }; diff --git a/test/simple/test-stream2-set-encoding.js b/test/simple/test-stream2-set-encoding.js index 3571bac4f121..758a4342eed5 100644 --- a/test/simple/test-stream2-set-encoding.js +++ b/test/simple/test-stream2-set-encoding.js @@ -72,25 +72,25 @@ function TestReader(n, opts) { this.len = n || 100; } -TestReader.prototype._read = function(n, cb) { +TestReader.prototype._read = function(n) { setTimeout(function() { if (this.pos >= this.len) { - return cb(); + return this.push(null); } n = Math.min(n, this.len - this.pos); if (n <= 0) { - return cb(); + return this.push(null); } this.pos += n; var ret = new Buffer(n); ret.fill('a'); - console.log("cb(null, ret)", ret) + console.log("this.push(ret)", ret) - return cb(null, ret); + return this.push(ret); }.bind(this), 1); }; diff --git a/test/simple/test-stream2-unpipe-drain.js b/test/simple/test-stream2-unpipe-drain.js index 0fc963e802f2..4f5b3b7532a8 100644 --- a/test/simple/test-stream2-unpipe-drain.js +++ b/test/simple/test-stream2-unpipe-drain.js @@ -45,9 +45,9 @@ function TestReader(id) { } util.inherits(TestReader, stream.Readable); -TestReader.prototype._read = function (size, callback) { +TestReader.prototype._read = function (size) { this.reads += 1; - crypto.randomBytes(size, callback); + this.push(crypto.randomBytes(size)); }; var src1 = 
new TestReader(); diff --git a/test/simple/test-stream2-unpipe-leak.js b/test/simple/test-stream2-unpipe-leak.js index a560bfa0cb9d..993dd16e3870 100644 --- a/test/simple/test-stream2-unpipe-leak.js +++ b/test/simple/test-stream2-unpipe-leak.js @@ -27,30 +27,30 @@ var stream = require('stream'); var util = require('util'); function TestWriter() { - stream.Writable.call(this); + stream.Writable.call(this); } util.inherits(TestWriter, stream.Writable); -TestWriter.prototype._write = function (buffer, callback) { - callback(null); +TestWriter.prototype._write = function(buffer, callback) { + callback(null); }; var dest = new TestWriter(); function TestReader() { - stream.Readable.call(this); + stream.Readable.call(this); } util.inherits(TestReader, stream.Readable); -TestReader.prototype._read = function (size, callback) { - callback(new Buffer('hallo')); +TestReader.prototype._read = function(size) { + stream.push(new Buffer('hallo')); }; var src = new TestReader(); for (var i = 0; i < 10; i++) { - src.pipe(dest); - src.unpipe(dest); + src.pipe(dest); + src.unpipe(dest); } assert.equal(src.listeners('end').length, 0); From 6479405d545c8d7c24fbd05f16ba8a438840ba8d Mon Sep 17 00:00:00 2001 From: isaacs Date: Thu, 28 Feb 2013 15:42:55 -0800 Subject: [PATCH 4/4] doc: Provide 2 examples of SimpleProtocol parser The first example uses Readable, and shows the use of readable.unshift(). The second uses the Transform class, showing that it's much simpler in this case. --- doc/api/stream.markdown | 226 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 210 insertions(+), 16 deletions(-) diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index ebb771dd5591..86f45ca9f9f8 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -84,7 +84,7 @@ method. A `Readable Stream` has the following methods, members, and events. 
Note that `stream.Readable` is an abstract class designed to be -extended with an underlying implementation of the `_read(size, cb)` +extended with an underlying implementation of the `_read(size)` method. (See below.) ### new stream.Readable([options]) @@ -105,32 +105,39 @@ In classes that extend the Readable class, make sure to call the constructor so that the buffering settings can be properly initialized. -### readable.\_read(size, callback) +### readable.\_read(size) * `size` {Number} Number of bytes to read asynchronously * `callback` {Function} Called with an error or with data -All Readable stream implementations must provide a `_read` method -to fetch data from the underlying resource. - -Note: **This function MUST NOT be called directly.** It should be +Note: **This function should NOT be called directly.** It should be implemented by child classes, and called by the internal Readable class methods only. -Call the callback using the standard `callback(error, data)` pattern. -When no more data can be fetched, call `callback(null, null)` to -signal the EOF. +All Readable stream implementations must provide a `_read` method +to fetch data from the underlying resource. This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user programs. However, you **are** expected to override this method in your own extension classes. +When data is available, put it into the read queue by calling +`readable.push(chunk)`. If `push` returns false, then you should stop +reading. When `_read` is called again, you should start pushing more +data. 
+ ### readable.push(chunk) * `chunk` {Buffer | null | String} Chunk of data to push into the read queue * return {Boolean} Whether or not more pushes should be performed +Note: **This function should be called by Readable implementors, NOT +by consumers of Readable subclasses.** The `_read()` function will not +be called again until at least one `push(chunk)` call is made. If no +data is available, then you MAY call `push('')` (an empty string) to +allow a future `_read` call, without adding any data to the queue. + The `Readable` class works by putting data into a read queue to be pulled out later by calling the `read()` method when the `'readable'` event fires. @@ -167,6 +174,115 @@ stream._read = function(size, cb) { }; ``` +### readable.unshift(chunk) + +* `chunk` {Buffer | null | String} Chunk of data to unshift onto the read queue +* return {Boolean} Whether or not more pushes should be performed + +This is the corollary of `readable.push(chunk)`. Rather than putting +the data at the *end* of the read queue, it puts it at the *front* of +the read queue. + +This is useful in certain use-cases where a stream is being consumed +by a parser, which needs to "un-consume" some data that it has +optimistically pulled out of the source. + +```javascript +// A parser for a simple data protocol. +// The "header" is a JSON object, followed by 2 \n characters, and +// then a message body. +// +// Note: This can be done more simply as a Transform stream. See below. 
+ +function SimpleProtocol(source, options) { + if (!(this instanceof SimpleProtocol)) + return new SimpleProtocol(source, options); + + Readable.call(this, options); + this._inBody = false; + this._sawFirstCr = false; + + // source is a readable stream, such as a socket or file + this._source = source; + + var self = this; + source.on('end', function() { + self.push(null); + }); + + // give it a kick whenever the source is readable + // read(0) will not consume any bytes + source.on('readable', function() { + self.read(0); + }); + + this._rawHeader = []; + this.header = null; +} + +SimpleProtocol.prototype = Object.create( + Readable.prototype, { constructor: { value: SimpleProtocol }}); + +SimpleProtocol.prototype._read = function(n) { + if (!this._inBody) { + var chunk = this._source.read(); + + // if the source doesn't have data, we don't have data yet. + if (chunk === null) + return this.push(''); + + // check if the chunk has a \n\n + var split = -1; + for (var i = 0; i < chunk.length; i++) { + if (chunk[i] === 10) { // '\n' + if (this._sawFirstCr) { + split = i; + break; + } else { + this._sawFirstCr = true; + } + } else { + this._sawFirstCr = false; + } + } + + if (split === -1) { + // still waiting for the \n\n + // stash the chunk, and try again. + this._rawHeader.push(chunk); + this.push(''); + } else { + this._inBody = true; + var h = chunk.slice(0, split); + this._rawHeader.push(h); + var header = Buffer.concat(this._rawHeader).toString(); + try { + this.header = JSON.parse(header); + } catch (er) { + this.emit('error', new Error('invalid simple protocol data')); + return; + } + // now, because we got some extra data, unshift the rest + // back into the read queue so that our consumer will see it. + this.unshift(chunk.slice(split)); + + // and let them know that we are done parsing the header. + this.emit('header', this.header); + } + } else { + // from there on, just provide the data to our consumer. + // careful not to push(null), since that would indicate EOF.
+ var chunk = this._source.read(); + if (chunk) this.push(chunk); + } +}; + +// Usage: +var parser = new SimpleProtocol(source); +// Now parser is a readable stream that will emit 'header' +// with the parsed header data. +``` + ### readable.wrap(stream) * `stream` {Stream} An "old style" readable stream @@ -232,6 +348,8 @@ constructor. * `size` {Number | null} Optional number of bytes to read. * Return: {Buffer | String | null} +Note: **This function SHOULD be called by Readable stream users.** + Call this method to consume data once the `'readable'` event is emitted. @@ -243,8 +361,8 @@ If there is no data to consume, or if there are fewer bytes in the internal buffer than the `size` argument, then `null` is returned, and a future `'readable'` event will be emitted when more is available. -Note that calling `stream.read(0)` will always return `null`, and will -trigger a refresh of the internal buffer, but otherwise be a no-op. +Calling `stream.read(0)` will always return `null`, and will trigger a +refresh of the internal buffer, but otherwise be a no-op. ### readable.pipe(destination, [options]) @@ -416,14 +534,14 @@ A "duplex" stream is one that is both Readable and Writable, such as a TCP socket connection. Note that `stream.Duplex` is an abstract class designed to be -extended with an underlying implementation of the `_read(size, cb)` +extended with an underlying implementation of the `_read(size)` and `_write(chunk, callback)` methods as you would with a Readable or Writable stream class. Since JavaScript doesn't have multiple prototypal inheritance, this class prototypally inherits from Readable, and then parasitically from Writable. It is thus up to the user to implement both the lowlevel -`_read(n,cb)` method as well as the lowlevel `_write(chunk,cb)` method +`_read(n)` method as well as the lowlevel `_write(chunk,cb)` method on extension duplex classes. ### new stream.Duplex(options) @@ -471,13 +589,13 @@ initialized. 
* `callback` {Function} Call this function (optionally with an error argument) when you are done processing the supplied chunk. -All Transform stream implementations must provide a `_transform` -method to accept input and produce output. - Note: **This function MUST NOT be called directly.** It should be implemented by child classes, and called by the internal Transform class methods only. +All Transform stream implementations must provide a `_transform` +method to accept input and produce output. + `_transform` should do whatever has to be done in this specific Transform class, to handle the bytes being written, and pass them off to the readable portion of the interface. Do asynchronous I/O, @@ -521,6 +639,82 @@ the class that defines it, and should not be called directly by user programs. However, you **are** expected to override this method in your own extension classes. +### Example: `SimpleProtocol` parser + +The example above of a simple protocol parser can be implemented much +more simply by using the higher level `Transform` stream class. + +In this example, rather than providing the input as an argument, it +would be piped into the parser, which is a more idiomatic Node stream +approach. 
+ +```javascript +function SimpleProtocol(options) { + if (!(this instanceof SimpleProtocol)) + return new SimpleProtocol(options); + + Transform.call(this, options); + this._inBody = false; + this._sawFirstCr = false; + this._rawHeader = []; + this.header = null; +} + +SimpleProtocol.prototype = Object.create( + Transform.prototype, { constructor: { value: SimpleProtocol }}); + +SimpleProtocol.prototype._transform = function(chunk, output, done) { + if (!this._inBody) { + // check if the chunk has a \n\n + var split = -1; + for (var i = 0; i < chunk.length; i++) { + if (chunk[i] === 10) { // '\n' + if (this._sawFirstCr) { + split = i; + break; + } else { + this._sawFirstCr = true; + } + } else { + this._sawFirstCr = false; + } + } + + if (split === -1) { + // still waiting for the \n\n + // stash the chunk, and try again. + this._rawHeader.push(chunk); + } else { + this._inBody = true; + var h = chunk.slice(0, split); + this._rawHeader.push(h); + var header = Buffer.concat(this._rawHeader).toString(); + try { + this.header = JSON.parse(header); + } catch (er) { + this.emit('error', new Error('invalid simple protocol data')); + return; + } + // and let them know that we are done parsing the header. + this.emit('header', this.header); + + // now, because we got some extra data, emit this first. + output(chunk.slice(split)); + } + } else { + // from there on, just provide the data to our consumer as-is. + output(chunk); + } + done(); +}; + +var parser = new SimpleProtocol(); +source.pipe(parser); + +// Now parser is a readable stream that will emit 'header' +// with the parsed header data. +``` + ## Class: stream.PassThrough