Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: added experimental support for for-await #17755

Closed
wants to merge 1 commit into from

Conversation

mcollina
Copy link
Member

@mcollina mcollina commented Dec 19, 2017

Adds support for Symbol.asyncIterator into the Readable class.

Fixes: #15709

Checklist
  • make -j4 test (UNIX), or vcbuild test (Windows) passes
  • tests and/or benchmarks are included
  • documentation is changed or added
  • commit message follows commit guidelines
Affected core subsystem(s)

stream

@nodejs-github-bot nodejs-github-bot added build Issues and PRs related to build files or the CI. stream Issues and PRs related to the stream subsystem. labels Dec 19, 2017
@mcollina mcollina requested a review from jasnell December 19, 2017 12:36
@mcollina
Copy link
Member Author

@mcollina mcollina requested a review from benjamingr December 19, 2017 12:37
@mcollina
Copy link
Member Author

Currently make lint is not passing, as it does not recognize for await as valid JS. I'm not sure how to make it understand that, can someone help me?

@targos
Copy link
Member

targos commented Dec 19, 2017

Currently make lint is not passing, as it does not recognize for await as valid JS. I'm not sure how to make it understand that, can someone help me?

ESLint only supports ES features when they reach stage 4. The async iteration proposal is still at stage 3, so we would need to install babel-eslint: https://github.com/babel/babel-eslint

Copy link
Member

@benjamingr benjamingr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice start - I see a bigger issue though, this approach should work with for...await loops but async iterators in general have no guarantee.

We need to deal with backpressure here :)

@@ -1159,6 +1159,31 @@ readable stream will release any internal resources.
Implementors should not override this method, but instead implement
[`readable._destroy`][readable-_destroy].

##### readable[Symbol.asyncIterator]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Offtopic: I almost want to weep at how nice this is, I've seen it before (with the package and when we tested this a year ago) but finally having I/O that looks like Python's async/io and performs well is awesome - no more callbacks everywhere, clean asynchronous code :)

print(fs.createReadStream('file')).catch(console.log);
```

If you break or throw from within the for-await loop, the stream will be
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to should elaborate exactly what methods breaking and throwing call on the underlying ReadableStream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean:

for await (const k of readable) {
  break;
}

this would call return() on the AsyncIterator which will call .destroy().
This is debatable, as after .destroy() the readable cannot be used anymore.

However:

for await (const k of readable) {
  throw new Error('kaboom');
}

would call return() as well on the AsyncIterator which will call .destroy().
This is the correct behavior, otherwise we would want to do all the time:

try { 
 for await (const k of readable) {
   throw new Error('kaboom')
 }
} finally {
 readable.destroy()
}

Which will be prone to file descriptor leaking.

I'm not aware of a way to distinguish these two flows.

Should I document it in this way? Do you agree with the behavior?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the behavior 100%, once you've started consuming an async iterator for a for await loop it should probably close if I break.

Pinging @zenparsing - I remember a lot of discussion about this - what should be the behavior in your opinion?

As for documentation - that is exactly what I meant - we should document that destroy will be called (rather than that the stream is destroyed) since it is a better guarantee for people subclassing ReadableStream.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling destroy is consistent with how async generator functions behave after calling return on their iterators, so 👍 .

const { promisify } = require('util');

class Item {
constructor(value, done) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pinging @caitp and @bmeurer - it would be interesting to know if we can avoid this being so expensive (allocating an object for the value of the iteration). In regular iterators V8 optimizes away for..of to regular iteration and other nice optimizations.

I think we should get a sense of how hard/easy this is to optimize, and if it's hard consider recycling objects for the iterator here which is dangerous but might be the only way we get reasonable performance outside of "scripting".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or at least, do away with the done slot for the vast majority of objects.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just use plain object with 2 fields? Why everything has to be a class?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YurySolovyov not everything has to be a class, but in this particular case there are several benefits in naming the objects:

  • When looking at heap allocation profiles, it makes them easier to recognize (rather than just Object).
  • When debugging and looking at stack traces, you get more useful information since the objects are named.

As a platform, this makes naming objects appealing. I'm not sure it's worth it but it's definitely a reasonable call. I would name it AsyncIteratorRecord to be ore similar to the spec.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't have a big perf impact either way. I looked at perf of object / class instance creation in my work on nextTick and it's negligible.

I don't like the idea of reusing it as that means the returned object can no longer be stored.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also add that it make sure that objects created in multiple places maintain the same shape, this helps V8 in the optimization process.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do away with the done slot for the vast majority of objects.

We can't do that if we want to conform to the spec for what next should return.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The one suggestion I might have here is making done = false so we don't have to repeat that everywhere. It's the default state after all...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apapirovski I meant - we can put done = false on the prototype but I suspect it'll be slow (and might not conform).

I want this to eventually be fast to be useful :)


stream.on('end', () => {
if (this.lastResolve !== null) {
this.lastResolve(new Item(null, true));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this design decision (emitting true on the iterator as a separate value with null). Just making sure it's explicitly fine.

// destroy(err, cb) is a private API
// we can guarantee we have that here, because we control the
// Readable class this is attached to
const destroy = promisify(this.stream.destroy.bind(this.stream));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check if this is reasonably fast.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not called often, only with break  or throw  within the loop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't say that breaking inside a loop is an edge case, note this is also called when you return inside a loop which is also pretty common.

// Readable class this is attached to
const destroy = promisify(this.stream.destroy.bind(this.stream));
await destroy(null);
return new Item(null, true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm...

@benjamingr
Copy link
Member

So this is a very nice start, but I'm not sure we work for a lot of cases where the async iterator isn't consumed in a for await.

Can you add tests for consuming streams without for..await, calling next() several times before the previous next call resolved and seeing what happens when destroy is called when we still have pending promises for next?

Copy link
Member

@apapirovski apapirovski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems great to me. Scattered some actionable and non-actionable comments.

I'm not really clear on the back-pressure concerns brought up, it seems to work similar to consuming a stream the normal way. Maybe I'm misunderstanding something...

process.nextTick(readAndResolve, this);
});

stream.on('end', () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, in general, bind would be better here for performance.

That said, this makes me wish we already had WeakRefs in JS because we could just make stream[kAsyncIterator] and get rid of closures altogether.

const { promisify } = require('util');

class Item {
constructor(value, done) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't have a big perf impact either way. I looked at perf of object / class instance creation in my work on nextTick and it's negligible.

I don't like the idea of reusing it as that means the returned object can no longer be stored.

if (data) {
resolve(new Item(data, false));
} else if (this.lastResolve !== null) {
throw new Error('next can be called only once');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... given that readable is handled on nextTick, it seems like it could be possible to have this.lastResolve !== null and at the same time have data. Maybe that condition should go first, before even calling read()? (And yes, I know, obscure edge case...)

this.lastResolve = null;
this.lastReject = null;
this.error = null;
this.ended = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we store all of these in a way that's not publicly accessible? Whenever anything is made unintentionally public, we usually regret it later. 😞

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually considering a ReadableAsyncInterator a private object, meaning it's consumed by for await, and it's not really user facing. Should we treat it as user-facing? I will replace those with symbols then.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but it can be returned by readable[Symbol.asyncIterator] so it's not truly private. I think the fact that it's really easy to get it and there might be legitimate use cases for it, makes me uneasy about exposing these props publicly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the "instance creation" comment above (can't comment on it for some reason) - the scary part isn't creating the objects it's the GC afterwards :)

var warningEmitted = false;
Readable.prototype[Symbol.asyncIterator] = function() {
if (!warningEmitted) {
process.emitWarning(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an emitExperimentalWarning somewhere in internal utils. We should start using it since it avoids needing to track warningEmitted. If it needs more features, we should extend it as needed.

const { promisify } = require('util');

class Item {
constructor(value, done) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do away with the done slot for the vast majority of objects.

We can't do that if we want to conform to the spec for what next should return.

@apapirovski
Copy link
Member

calling next() several times before the previous next call resolved

That would throw.

@mcollina
Copy link
Member Author

mcollina commented Dec 19, 2017

@benjamingr the current code works only with for await, it's not built for general usage. I went for the simplest possible code that could work there.

I'm not familiar with the tc39 proposal format, and I couldn't point where it is defined the behavior attached to the iterator object regarding things like backpressure and such. At this point, only the latest promise returned by next() will be resolved as expected. Handling next() call in parallel is not currently not supported.

I'm keen in not implementing a backpressure mechanism which will be needed to support multiple parallel next() calls. It would need a significant amount of work to be optimized, and we will likely loose a lot of throughput there.

Supporting only one next() would likely guarantee the maximum performance, which is a problem of async iterators.

Can you point me to cases where having parallel next() support can be useful for Readable?

@apapirovski
Copy link
Member

apapirovski commented Dec 19, 2017

I'm not familiar with the tc39 proposal format, and I couldn't point where it is defined the behavior attached to the iterator object regarding things like backpressure and such. At this point, only the latest promise returned by next() will be resolved as expected. Handling next() call in parallel is not currently not supported.

The spec calls for an internal queue that tracks outstanding next calls. So each next would still return a Promise that would await the one prior to it, then we could store them all in a singly linked list. Maybe there's a better way tho...

That said, I don't really know a practical situation where this would be useful...

@benjamingr
Copy link
Member

@benjamingr the current code works only with for await, it's not built for general usage. I went for the simplest possible code that could work there.

I see, but it is not a valid implementation of Symbol.asyncIterator in general - I can help you with any questions you have about the spec and we can iterate it.

Note that it's also fine to make the for...await version fast and the non-for...await version slower.

I'm not familiar with the tc39 proposal format, and I couldn't point where it is defined the behavior attached to the iterator object regarding things like backpressure and such.

With iterators unlike push-streams or observables backpressure is actually very easy since the consumer is the one asking for items. Instead of things being pushed to the consumer they are explicitly asked for with next.

At this point, only the latest promise returned by next() will be resolved as expected. Handling next() call in parallel is not currently not supported.

Yes, although it's not supposed to be much more work:

  • When next is called, if lastResolve is null - do exactly what you're doing now.
  • Otherwise, push it to a queue (slow case)
  • Resolve things in the queue in order when data becomes available from the stream.

This should remain fast while supporting the spec in its entirety.

I'm excited about this feature and the PR and I feel very strongly about giving users an API that would behave like a normal AsyncIterator.

@benjamingr
Copy link
Member

benjamingr commented Dec 19, 2017

@apapirovski

That said, I don't really know a practical situation where this would be useful...

Actually, come to think of it I think we might get away with rejecting the promise with an OutOfOrderIterationError if you call next() before the previous next resolved. I'll check what Python does.

As long as it's not ignored I guess we can say this is expected behavior with readable streams and document it.

It's not as good user experience but it's much better than what we do here and arguably simpler than supporting waiting for multiple values. It should also be tested.

@apapirovski
Copy link
Member

@benjamingr @mcollina Here's a quick PoC of what I was talking about: c875223

It's not ready for usage or anything but it works as expected. Could likely be optimized quite a bit. Might have bugs.

@mcollina
Copy link
Member Author

Actually, come to think of it I think we might get away with rejecting the promise with an OutOfOrderIterationError if you call next() before the previous next resolved. I'll check what Python does.

I will do the OutOfOrderIterationError for now. I prefer to avoid to create that many promises.
@apapirovski in your code we are creating one for each async function and one when we do new Promise().

As I said, the next step is writing a benchmark, so we can make those tradeoffs with informed numbers.

@benjamingr
Copy link
Member

@apapirovski I'm not sure why we'd want a LinkedList implementation for this rather than just an array. It's not faster (we always push to the end), it's more allocations, less optimizable and less cache local. We actually want a deque most likely - but we can totally just use an array here - if we're concerned about shift() we can increment a counter instead of .shift()ing and setting it to 0 when the queue is empty.

@mcollina
Copy link
Member Author

@benjamingr OutOfOrderIterationError is not currently defined on master.

@benjamingr
Copy link
Member

@mcollina I just liked that name - sorry for being confusing. Such an error would have no meaning on a regular iterator - I was just suggesting an error name.

@apapirovski
Copy link
Member

apapirovski commented Dec 19, 2017

I'm not sure why we'd want a LinkedList implementation for this rather than just an array. It's not faster (we always push to the end), it's more allocations, less optimizable and less cache local. We actually want a deque most likely - but we can totally just use an array here - if we're concerned about shift() we can increment a counter instead of .shift()ing and setting it to 0 when the queue is empty.

The same reason we switched to using LinkedList in nextTick. Doing both shift & push on an Array is worse for performance. If instead we're only clearing the Array when the queue is done then we run into the same issues as on nextTick where we can run out of memory. (Never underestimate users' ability to do things that make no sense like calling nextTick 1e8 times... or, in this case, .next().)

@apapirovski in your code we are creating one for each async function and one when we do new Promise().

Yeah, I'm aware. It's just a PoC, if we had benchmarks we could start optimizing that.

@apapirovski
Copy link
Member

@benjamingr Anyway, linked list and array are both overkill, I think we can just store latest promise since each new one just depends on the one before it.

@benjamingr
Copy link
Member

@apapirovski

The same reason we switched to using LinkedList in nextTick. Doing both shift & push on an Array is worse for performance.

This is getting a little offtopic - so feel free to open an issue about it. A huge performance gain in bluebird is by using a double ended queue rather than an array or a linked list see this file.

@apapirovski
Copy link
Member

apapirovski commented Dec 19, 2017

@mcollina This version is a lot simpler and no extra Promise required: 02b6336

We could likely simplify the conditionals even further. In the for..await scenario it only does an extra if check, so it should be almost equally as fast.

This is getting a little offtopic - so feel free to open an issue about it. A huge performance gain in bluebird is by using a double ended queue rather than an array or a linked list see this file.

Technically singly linked list (head-tail linked list) is the most common implementation of an unknown-length queue, and we don't need a double end queue (since we only remove from head and add to tail). Bluebird can get away with doing some things that we can't since they specify capacity — the trade-off is that it has to have a resize operation. I'm not 100% certain that what's implemented there is the fastest solution possible. Anyway, this is purely academic at this point since we're not using either. :)

@mcollina
Copy link
Member Author

@apapirovski I like your suggestion and I've included it.

I will need to test and check what happens if the stream is destroyed in the meanwhile, but if it's not working is an easy fix.

@benjamingr
Copy link
Member

I'm +1 on the suggestion and its inclusion. @apapirovski I've moved our linked list discussion to mail to keep the thread clean.

// destroy(err, cb) is a private API
// we can guarantee we have that here, because we control the
// Readable class this is attached to
return new Promise((resolve, reject) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that's faster than what you had before this change (with promisify) :D

I'm sorry if I was confusing in the comment.

Copy link
Member Author

@mcollina mcollina Dec 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think that this is faster because we would have to call promisify on every destroy method, as some instances override it. We will have to benchmark.

@mcollina
Copy link
Member Author

@@ -922,6 +924,12 @@ Readable.prototype.wrap = function(stream) {
return this;
};

Readable.prototype[Symbol.asyncIterator] = function() {
emitExperimentalWarning('Readable[Symbol.AsyncIterator]');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asyncIterator


let err;
try {
/*eslint no-unused-vars: 0*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this disables the rule for the rest of the file. Instead you can use // eslint-disable-next-line no-unused-vars

Adds support for Symbol.asyncIterator into the Readable class.
The stream is destroyed when the loop terminates with break or throw.

Fixes: nodejs#15709
@mcollina
Copy link
Member Author

CI: https://ci.nodejs.org/job/node-test-pull-request/12500/

(last before landing)

@mcollina
Copy link
Member Author

Landed as 61b4d60.

@mcollina mcollina closed this Jan 11, 2018
@mcollina mcollina deleted the asynciterators branch January 11, 2018 12:30
mcollina added a commit that referenced this pull request Jan 11, 2018
Adds support for Symbol.asyncIterator into the Readable class.
The stream is destroyed when the loop terminates with break or throw.

Fixes: #15709

PR-URL: #17755
Fixes: #15709
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Anatoli Papirovski <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Vse Mozhet Byt <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
evanlucas pushed a commit that referenced this pull request Jan 22, 2018
This is required because we need to add the babel-eslint dependency
and it has to be able to resolve "eslint".
babel-eslint is required to support future ES features such as async
iterators and import.meta.

Refs: #17755
PR-URL: #17820
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
evanlucas pushed a commit that referenced this pull request Jan 30, 2018
This is required because we need to add the babel-eslint dependency
and it has to be able to resolve "eslint".
babel-eslint is required to support future ES features such as async
iterators and import.meta.

Refs: #17755
PR-URL: #17820
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
@vsemozhetbyt
Copy link
Contributor

Will it remain experimental in v11?

@mcollina
Copy link
Member Author

@vsemozhetbyt I hope we could get it out of experimental before v10 goes to LTS.

WHATWG streams is implementing the same thing atm, and I would like the semantics and APIs to match so that code that uses this would work identically.

cc @benjamingr @devsnek

@benjamingr
Copy link
Member

@mcollina I've also been talking to @jakearchibald about it

@benjamingr
Copy link
Member

In short: I'm +1 on unflagging and using asnyc iterators as an interop mechanism between whatwg and node streams.

@betomoretti
Copy link

Hi all! I'm aware that this is an experimental feature but, Is there a way to enable it? I couldn't find how to do it neither in docs or node --help output. I'm using node version 10.7.0.

Thanks!

@rauschma
Copy link

@betomoretti I’ve blogged about it – no flag needed: http://2ality.com/2018/04/async-iter-nodejs.html

@vsemozhetbyt
Copy link
Contributor

@rauschma And we have readline async iterator in master now: #23916

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
semver-minor PRs that contain new features and should be released in the next minor version. stream Issues and PRs related to the stream subsystem.
Projects
None yet
Development

Successfully merging this pull request may close these issues.