Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: 'readable' have precedence over flowing #18994

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,9 @@ changes:
description: >
'readable' is always emitted in the next tick after
.push() is called
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/18994
description: Using 'readable' requires calling .read().
-->

The `'readable'` event is emitted when there is data available to be read from
Expand All @@ -770,10 +773,16 @@ cause some amount of data to be read into an internal buffer.

```javascript
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
readable.on('readable', function() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: not sure why this was changed and it doesn't really matter but for consistency I would keep the arrow function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lpinca it is using this.read() below

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, ignore me.

// there is some data to read now
let data;

while (data = this.read()) {
console.log(data);
}
});
```

The `'readable'` event will also be emitted once the end of the stream data
has been reached but before the `'end'` event is emitted.

Expand Down Expand Up @@ -806,6 +815,10 @@ In general, the `readable.pipe()` and `'data'` event mechanisms are easier to
understand than the `'readable'` event. However, handling `'readable'` might
result in increased throughput.

If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
takes precedence in controlling the flow, i.e. `'data'` will be emitted
only when [`stream.read()`][stream-read] is called.

##### readable.destroy([error])
<!-- YAML
added: v8.0.0
Expand Down Expand Up @@ -997,6 +1010,10 @@ the status of the `highWaterMark`.
##### readable.resume()
<!-- YAML
added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/18994
description: Resume has no effect if there is a 'readable' event listening
-->

* Returns: {this}
Expand All @@ -1016,6 +1033,9 @@ getReadableStreamSomehow()
});
```

The `readable.resume()` method has no effect if there is a `'readable'`
event listener.

##### readable.setEncoding(encoding)
<!-- YAML
added: v0.9.4
Expand Down
57 changes: 50 additions & 7 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ Readable.prototype.unshift = function(chunk) {
};

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
debug('readableAddChunk', chunk);
var state = stream._readableState;
if (chunk === null) {
state.reading = false;
Expand Down Expand Up @@ -799,20 +800,24 @@ Readable.prototype.unpipe = function(dest) {
// Ensure readable listeners eventually get something
Readable.prototype.on = function(ev, fn) {
const res = Stream.prototype.on.call(this, ev, fn);
const state = this._readableState;

if (ev === 'data') {
// Start flowing on next tick if stream isn't explicitly paused
if (this._readableState.flowing !== false)
// update readableListening so that resume() may be a no-op
// a few lines down. This is needed to support once('readable').
state.readableListening = this.listenerCount('readable') > 0;

// Try start flowing on next tick if stream isn't explicitly paused
if (state.flowing !== false)
this.resume();
} else if (ev === 'readable') {
const state = this._readableState;
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.emittedReadable = false;
if (!state.reading) {
process.nextTick(nReadingNextTick, this);
} else if (state.length) {
if (state.length) {
emitReadable(this);
} else if (!state.reading) {
process.nextTick(nReadingNextTick, this);
}
}
}
Expand All @@ -821,6 +826,42 @@ Readable.prototype.on = function(ev, fn) {
};
Readable.prototype.addListener = Readable.prototype.on;

Readable.prototype.removeListener = function(ev, fn) {
const res = Stream.prototype.removeListener.call(this, ev, fn);

if (ev === 'readable') {
// We need to check if there is someone still listening to
// to readable and reset the state. However this needs to happen
// after readable has been emitted but before I/O (nextTick) to
// support once('readable', fn) cycles. This means that calling
// resume within the same tick will have no
// effect.
process.nextTick(updateReadableListening, this);
}

return res;
};

Readable.prototype.removeAllListeners = function(ev) {
const res = Stream.prototype.removeAllListeners.call(this, ev);

if (ev === 'readable' || ev === undefined) {
// We need to check if there is someone still listening to
// to readable and reset the state. However this needs to happen
// after readable has been emitted but before I/O (nextTick) to
// support once('readable', fn) cycles. This means that calling
// resume within the same tick will have no
// effect.
process.nextTick(updateReadableListening, this);
}

return res;
};

function updateReadableListening(self) {
self._readableState.readableListening = self.listenerCount('readable') > 0;
}

function nReadingNextTick(self) {
debug('readable nexttick read 0');
self.read(0);
Expand All @@ -832,7 +873,9 @@ Readable.prototype.resume = function() {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
state.flowing = true;
// we flow only if there is no one listening
// for readable
state.flowing = !state.readableListening;
resume(this, state);
}
return this;
Expand Down
52 changes: 52 additions & 0 deletions test/parallel/test-http-readable-data-event.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const http = require('http');
const helloWorld = 'Hello World!';
const helloAgainLater = 'Hello again later!';

const server = http.createServer((req, res) => {
res.writeHead(200, {
'Content-Length': '' + (helloWorld.length + helloAgainLater.length)
});
res.write(helloWorld);

// we need to make sure the data is flushed
setTimeout(() => {
res.end(helloAgainLater);
}, common.platformTimeout(10));
}).listen(0, function() {
const opts = {
hostname: 'localhost',
port: server.address().port,
path: '/'
};

const expectedData = [helloWorld, helloAgainLater];
const expectedRead = [helloWorld, null, helloAgainLater, null];

const req = http.request(opts, (res) => {
res.on('error', common.mustNotCall);

res.on('readable', common.mustCall(() => {
let data;

do {
data = res.read();
assert.strictEqual(data, expectedRead.shift());
} while (data !== null);
}, 2));

res.setEncoding('utf8');
res.on('data', common.mustCall((data) => {
assert.strictEqual(data, expectedData.shift());
}, 2));

res.on('end', common.mustCall(() => {
server.close();
}));
});

req.end();
});
Loading