Skip to content

Commit

Permalink
stream: 'readable' have precedence over flowing
Browse files Browse the repository at this point in the history
In Streams3 the 'readable' event/.read() method had a lower precedence
than the `'data'` event that made them impossible to use them together.
This make `.resume()` a no-op if there is a listener for the
`'readable'` event, making the stream non-flowing if there is a
`'data'`  listener.

Fixes: #18058
  • Loading branch information
mcollina committed Apr 4, 2018
1 parent 3567ea0 commit 578a7b9
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 49 deletions.
22 changes: 21 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,9 @@ changes:
description: >
'readable' is always emitted in the next tick after
.push() is called
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/18994
description: Using 'readable' requires calling .read().
-->

The `'readable'` event is emitted when there is data available to be read from
Expand All @@ -770,10 +773,16 @@ cause some amount of data to be read into an internal buffer.

```javascript
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
readable.on('readable', function() {
// there is some data to read now
let data;

while (data = this.read()) {
console.log(data);
}
});
```

The `'readable'` event will also be emitted once the end of the stream data
has been reached but before the `'end'` event is emitted.

Expand Down Expand Up @@ -806,6 +815,10 @@ In general, the `readable.pipe()` and `'data'` event mechanisms are easier to
understand than the `'readable'` event. However, handling `'readable'` might
result in increased throughput.

If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
takes precedence in controlling the flow, i.e. `'data'` will be emitted
only when [`stream.read()`][stream-read] is called.

##### readable.destroy([error])
<!-- YAML
added: v8.0.0
Expand Down Expand Up @@ -997,6 +1010,10 @@ the status of the `highWaterMark`.
##### readable.resume()
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/18994
description: Resume has no effect if there is a 'readable' event listening
-->

* Returns: {this}
Expand All @@ -1016,6 +1033,9 @@ getReadableStreamSomehow()
});
```

The `readable.resume()` method has no effect if there is a `'readable'`
event listener.

##### readable.setEncoding(encoding)
<!-- YAML
added: v0.9.4
Expand Down
57 changes: 50 additions & 7 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ Readable.prototype.unshift = function(chunk) {
};

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
debug('readableAddChunk', chunk);
var state = stream._readableState;
if (chunk === null) {
state.reading = false;
Expand Down Expand Up @@ -799,20 +800,24 @@ Readable.prototype.unpipe = function(dest) {
// Ensure readable listeners eventually get something
Readable.prototype.on = function(ev, fn) {
const res = Stream.prototype.on.call(this, ev, fn);
const state = this._readableState;

if (ev === 'data') {
// Start flowing on next tick if stream isn't explicitly paused
if (this._readableState.flowing !== false)
// update readableListening so that resume() may be a no-op
// a few lines down. This is needed to support once('readable').
state.readableListening = this.listenerCount('readable') > 0;

// Try start flowing on next tick if stream isn't explicitly paused
if (state.flowing !== false)
this.resume();
} else if (ev === 'readable') {
const state = this._readableState;
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.emittedReadable = false;
if (!state.reading) {
process.nextTick(nReadingNextTick, this);
} else if (state.length) {
if (state.length) {
emitReadable(this);
} else if (!state.reading) {
process.nextTick(nReadingNextTick, this);
}
}
}
Expand All @@ -821,6 +826,42 @@ Readable.prototype.on = function(ev, fn) {
};
Readable.prototype.addListener = Readable.prototype.on;

Readable.prototype.removeListener = function(ev, fn) {
const res = Stream.prototype.removeListener.call(this, ev, fn);

if (ev === 'readable') {
// We need to check if there is someone still listening to
// to readable and reset the state. However this needs to happen
// after readable has been emitted but before I/O (nextTick) to
// support once('readable', fn) cycles. This means that calling
// resume within the same tick will have no
// effect.
process.nextTick(updateReadableListening, this);
}

return res;
};

Readable.prototype.removeAllListeners = function(ev) {
const res = Stream.prototype.removeAllListeners.call(this, ev);

if (ev === 'readable' || ev === undefined) {
// We need to check if there is someone still listening to
// to readable and reset the state. However this needs to happen
// after readable has been emitted but before I/O (nextTick) to
// support once('readable', fn) cycles. This means that calling
// resume within the same tick will have no
// effect.
process.nextTick(updateReadableListening, this);
}

return res;
};

function updateReadableListening(self) {
self._readableState.readableListening = self.listenerCount('readable') > 0;
}

function nReadingNextTick(self) {
debug('readable nexttick read 0');
self.read(0);
Expand All @@ -832,7 +873,9 @@ Readable.prototype.resume = function() {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
state.flowing = true;
// we flow only if there is no one listening
// for readable
state.flowing = !state.readableListening;
resume(this, state);
}
return this;
Expand Down
52 changes: 52 additions & 0 deletions test/parallel/test-http-readable-data-event.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const http = require('http');
const helloWorld = 'Hello World!';
const helloAgainLater = 'Hello again later!';

const server = http.createServer((req, res) => {
res.writeHead(200, {
'Content-Length': '' + (helloWorld.length + helloAgainLater.length)
});
res.write(helloWorld);

// we need to make sure the data is flushed
setTimeout(() => {
res.end(helloAgainLater);
}, common.platformTimeout(10));
}).listen(0, function() {
const opts = {
hostname: 'localhost',
port: server.address().port,
path: '/'
};

const expectedData = [helloWorld, helloAgainLater];
const expectedRead = [helloWorld, null, helloAgainLater, null];

const req = http.request(opts, (res) => {
res.on('error', common.mustNotCall);

res.on('readable', common.mustCall(() => {
let data;

do {
data = res.read();
assert.strictEqual(data, expectedRead.shift());
} while (data !== null);
}, 2));

res.setEncoding('utf8');
res.on('data', common.mustCall((data) => {
assert.strictEqual(data, expectedData.shift());
}, 2));

res.on('end', common.mustCall(() => {
server.close();
}));
});

req.end();
});
Loading

0 comments on commit 578a7b9

Please sign in to comment.