streams: implement Readable.from async iterator utility #27660

Closed · wants to merge 1 commit
113 changes: 111 additions & 2 deletions doc/api/stream.md
@@ -46,8 +46,8 @@ There are four fundamental stream types within Node.js:
* [`Transform`][] - `Duplex` streams that can modify or transform the data as it
  is written and read (for example, [`zlib.createDeflate()`][]).

-Additionally, this module includes the utility functions [pipeline][] and
-[finished][].
+Additionally, this module includes the utility functions [pipeline][],
+[finished][] and [Readable.from][].

### Object Mode

@@ -1480,6 +1480,31 @@ async function run() {
run().catch(console.error);
```

### Readable.from(iterable, [options])

* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
`Symbol.iterator` iterable protocol.
* `options` {Object} Options provided to `new stream.Readable([options])`.
By default, `Readable.from()` sets `options.objectMode` to `true`, unless this
is explicitly opted out of by setting `options.objectMode` to `false`.

A utility method for creating `Readable` streams out of iterators.

```js
const { Readable } = require('stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});
```
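
For illustration (not part of the PR text), opting out of object mode makes
the stream emit `Buffer` chunks instead, matching the behavior exercised by
the `toReadableOnDataNonObject` test added below:

```js
const { Readable } = require('stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

// With objectMode: false, each yielded string is converted to a Buffer.
const readable = Readable.from(generate(), { objectMode: false });

readable.on('data', (chunk) => {
  console.log(Buffer.isBuffer(chunk)); // true
  console.log(chunk.toString());       // 'hello', then 'streams'
});
```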

## API for Stream Implementers

<!--type=misc-->
Expand Down Expand Up @@ -2395,6 +2420,89 @@ primarily for examples and testing, but there are some use cases where

<!--type=misc-->

### Streams Compatibility with Async Generators and Async Iterators

With async generators and iterators now supported in JavaScript, async
generators are effectively a first-class language-level stream construct.

Some common interop cases of using Node.js streams with async generators
and async iterators are provided below.

#### Consuming Readable Streams with Async Iterators

```js
(async function() {
  // `readable` is any Node.js Readable stream.
  for await (const chunk of readable) {
    console.log(chunk);
  }
})();
```
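
If the stream emits `'error'` while it is being consumed, the
`for await...of` loop rejects; a minimal sketch of guarding against that
(assuming the same `readable`):

```js
(async function() {
  try {
    for await (const chunk of readable) {
      console.log(chunk);
    }
  } catch (err) {
    // Reached when the stream emits 'error' or the loop body throws.
    console.error(err);
  }
})();
```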

#### Creating Readable Streams with Async Generators

We can construct a Node.js Readable Stream from an asynchronous generator
using the `Readable.from` utility method:

```js
const { Readable } = require('stream');

async function * generate() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});
```

#### Piping to Writable Streams from Async Iterators

When writing to a writable stream from an async iterator, it is important to
ensure correct handling of backpressure and errors.

```js
const fs = require('fs');
const { once } = require('events');

const writable = fs.createWriteStream('./file');

(async function() {
  // `iterator` is any async iterable, e.g. an async generator.
  for await (const chunk of iterator) {
    // Handle backpressure on write
    if (!writable.write(chunk))
      await once(writable, 'drain');
  }
  writable.end();
  // Ensure completion without errors
  await once(writable, 'finish');
})();
```

In the above, errors on the write stream are caught and thrown by the two
`once()` calls, since `once()` also rejects its promise when an `'error'`
event is emitted while waiting.
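
A minimal sketch (not part of the PR) of that rejection behavior, using a
stand-in `PassThrough` stream:

```js
const { once } = require('events');
const { PassThrough } = require('stream');

(async function() {
  const stream = new PassThrough();
  // Emit an error while once() is waiting for 'finish'.
  process.nextTick(() => stream.emit('error', new Error('kaboom')));
  try {
    await once(stream, 'finish');
  } catch (err) {
    console.error(err.message); // 'kaboom'
  }
})();
```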

Alternatively, the async iterator could be wrapped with `Readable.from()` and
the resulting readable stream piped via `.pipe()`:

```js
const fs = require('fs');
const { once } = require('events');
const { Readable } = require('stream');

const writable = fs.createWriteStream('./file');

(async function() {
  // `iterator` is the same async iterable as in the previous example.
  const readable = Readable.from(iterator);
  readable.pipe(writable);
  // Ensure completion without errors
  await once(writable, 'finish');
})();
```
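
Note that `.pipe()` by itself does not forward errors from the source stream.
A hedged alternative sketch using the `pipeline` utility mentioned at the top
of this document, which destroys both streams and surfaces the first error
(again assuming the same `iterator`):

```js
const fs = require('fs');
const { promisify } = require('util');
const { Readable, pipeline } = require('stream');

(async function() {
  const readable = Readable.from(iterator);
  const writable = fs.createWriteStream('./file');
  // The promisified pipeline() resolves on completion and rejects on the
  // first error from either stream.
  await promisify(pipeline)(readable, writable);
})();
```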

<!--type=misc-->

### Compatibility with Older Node.js Versions

<!--type=misc-->
@@ -2531,6 +2639,7 @@ contain multi-byte characters.
[Compatibility]: #stream_compatibility_with_older_node_js_versions
[HTTP requests, on the client]: http.html#http_class_http_clientrequest
[HTTP responses, on the server]: http.html#http_class_http_serverresponse
[Readable.from]: #stream_readable_from_iterable_options
[TCP sockets]: net.html#net_class_net_socket
[child process stdin]: child_process.html#child_process_subprocess_stdin
[child process stdout and stderr]: child_process.html#child_process_subprocess_stdout
39 changes: 39 additions & 0 deletions lib/_stream_readable.js
@@ -1139,3 +1139,42 @@ function endReadableNT(state, stream) {
    }
  }
}

Readable.from = function(iterable, opts) {
  let iterator;
  if (iterable && iterable[Symbol.asyncIterator])
    iterator = iterable[Symbol.asyncIterator]();
  else if (iterable && iterable[Symbol.iterator])
    iterator = iterable[Symbol.iterator]();
  else
    throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);

  const readable = new Readable({
    objectMode: true,
    ...opts
  });
  // `reading` guards against _read() being called again before the
  // previous iteration has completed.
  let reading = false;
  readable._read = function() {
    if (!reading) {
      reading = true;
      next();
    }
  };
  async function next() {
    try {
      // `value` may itself be a promise, e.g. when iterating an array
      // of promises, so it is awaited before being pushed.
      const { value, done } = await iterator.next();
      if (done) {
        readable.push(null);
      } else if (readable.push(await value)) {
        // The buffer has room; keep pulling from the iterator.
        next();
      } else {
        reading = false;
      }
    } catch (err) {
      readable.destroy(err);
    }
  }
  return readable;
};
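
A hypothetical usage sketch (not from the PR) showing why the `reading` flag
matters: iteration only advances when `_read()` is invoked or while `push()`
returns `true`, so even an infinite generator is consumed lazily:

```js
const { Readable } = require('stream');

async function * naturals() {
  let n = 0;
  while (true) yield n++;
}

const readable = Readable.from(naturals());

readable.once('readable', () => {
  // Only roughly a highWaterMark's worth of values has been pulled
  // from the generator at this point.
  console.log(readable.read()); // 0
});
```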
2 changes: 1 addition & 1 deletion test/parallel/test-events-once.js
@@ -90,4 +90,4 @@ Promise.all([
  catchesErrors(),
  stopListeningAfterCatchingError(),
  onceError()
-]);
+]).then(common.mustCall());
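
For context: `common.mustCall()` asserts that the wrapped function is invoked
before the test process exits, so chaining it onto `Promise.all(...)` makes
the test fail if the promise chain never resolves.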
163 changes: 163 additions & 0 deletions test/parallel/test-readable-from.js
@@ -0,0 +1,163 @@
'use strict';

const { mustCall } = require('../common');
const { once } = require('events');
const { Readable } = require('stream');
const { strictEqual } = require('assert');

async function toReadableBasicSupport() {
  async function * generate() {
    yield 'a';
    yield 'b';
    yield 'c';
  }

  const stream = Readable.from(generate());

  const expected = ['a', 'b', 'c'];

  for await (const chunk of stream) {
    strictEqual(chunk, expected.shift());
  }
}

async function toReadableSyncIterator() {
  function * generate() {
    yield 'a';
    yield 'b';
    yield 'c';
  }

  const stream = Readable.from(generate());

  const expected = ['a', 'b', 'c'];

  for await (const chunk of stream) {
    strictEqual(chunk, expected.shift());
  }
}

async function toReadablePromises() {
  const promises = [
    Promise.resolve('a'),
    Promise.resolve('b'),
    Promise.resolve('c')
  ];

  const stream = Readable.from(promises);

  const expected = ['a', 'b', 'c'];

  for await (const chunk of stream) {
    strictEqual(chunk, expected.shift());
  }
}

async function toReadableString() {
  const stream = Readable.from('abc');

  const expected = ['a', 'b', 'c'];

  for await (const chunk of stream) {
    strictEqual(chunk, expected.shift());
  }
}

async function toReadableOnData() {
  async function * generate() {
    yield 'a';
    yield 'b';
    yield 'c';
  }

  const stream = Readable.from(generate());

  let iterations = 0;
  const expected = ['a', 'b', 'c'];

  stream.on('data', (chunk) => {
    iterations++;
    strictEqual(chunk, expected.shift());
  });

  await once(stream, 'end');

  strictEqual(iterations, 3);
}

async function toReadableOnDataNonObject() {
  async function * generate() {
    yield 'a';
    yield 'b';
    yield 'c';
  }

  const stream = Readable.from(generate(), { objectMode: false });

  let iterations = 0;
  const expected = ['a', 'b', 'c'];

  stream.on('data', (chunk) => {
    iterations++;
    strictEqual(chunk instanceof Buffer, true);
    strictEqual(chunk.toString(), expected.shift());
  });

  await once(stream, 'end');

  strictEqual(iterations, 3);
}

async function destroysTheStreamWhenThrowing() {
  async function * generate() {
    throw new Error('kaboom');
  }

  const stream = Readable.from(generate());

  stream.read();

  try {
    await once(stream, 'error');
  } catch (err) {
    strictEqual(err.message, 'kaboom');
    strictEqual(stream.destroyed, true);
  }
}

async function asTransformStream() {
  async function * generate(stream) {
    for await (const chunk of stream) {
      yield chunk.toUpperCase();
    }
  }

  const source = new Readable({
    objectMode: true,
    read() {
      this.push('a');
      this.push('b');
      this.push('c');
      this.push(null);
    }
  });

  const stream = Readable.from(generate(source));

  const expected = ['A', 'B', 'C'];

  for await (const chunk of stream) {
    strictEqual(chunk, expected.shift());
  }
}

Promise.all([
  toReadableBasicSupport(),
  toReadableSyncIterator(),
  toReadablePromises(),
  toReadableString(),
  toReadableOnData(),
  toReadableOnDataNonObject(),
  destroysTheStreamWhenThrowing(),
  asTransformStream()
]).then(mustCall());