Skip to content

Commit ef12c54

Browse files
committed
stream: implement ReadableStream.from
Fixes: #48389
1 parent 8a725c7 commit ef12c54

14 files changed

+1036
-2
lines changed

doc/api/webstreams.md

+43
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,49 @@ port1.onmessage = ({ data }) => {
387387
port2.postMessage(stream, [stream]);
388388
```
389389

390+
### `ReadableStream.from(iterable)`
391+
392+
<!-- YAML
393+
added: REPLACEME
394+
-->
395+
396+
* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
397+
`Symbol.iterator` iterable protocol.
398+
399+
A utility method that creates a new {ReadableStream} from an iterable.
400+
401+
```mjs
402+
import { ReadableStream } from 'node:stream/web';
403+
404+
async function* asyncIterableGenerator() {
405+
yield 'a';
406+
yield 'b';
407+
yield 'c';
408+
}
409+
410+
const stream = ReadableStream.from(asyncIterableGenerator());
411+
412+
for await (const chunk of stream)
413+
console.log(chunk); // Prints 'a', 'b', 'c'
414+
```
415+
416+
```cjs
417+
const { ReadableStream } = require('node:stream/web');
418+
419+
async function* asyncIterableGenerator() {
420+
yield 'a';
421+
yield 'b';
422+
yield 'c';
423+
}
424+
425+
(async () => {
426+
const stream = ReadableStream.from(asyncIterableGenerator());
427+
428+
for await (const chunk of stream)
429+
console.log(chunk); // Prints 'a', 'b', 'c'
430+
})();
431+
```
432+
390433
### Class: `ReadableStreamDefaultReader`
391434

392435
<!-- YAML

lib/internal/webstreams/readablestream.js

+59
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ const {
110110
nonOpCancel,
111111
nonOpPull,
112112
nonOpStart,
113+
getIterator,
114+
iteratorNext,
113115
kType,
114116
kState,
115117
} = require('internal/webstreams/util');
@@ -312,6 +314,10 @@ class ReadableStream {
312314
return isReadableStreamLocked(this);
313315
}
314316

317+
static from(iterable) {
318+
return readableStreamFromIterable(iterable);
319+
}
320+
315321
/**
316322
* @param {any} [reason]
317323
* @returns { Promise<void> }
@@ -1248,6 +1254,59 @@ const isReadableStreamBYOBReader =
12481254

12491255
// ---- ReadableStream Implementation
12501256

1257+
function readableStreamFromIterable(iterable) {
1258+
let stream;
1259+
const iteratorRecord = getIterator(iterable, 'async');
1260+
1261+
const startAlgorithm = nonOpStart;
1262+
1263+
async function pullAlgorithm() {
1264+
const nextResult = iteratorNext(iteratorRecord);
1265+
const nextPromise = PromiseResolve(nextResult);
1266+
return PromisePrototypeThen(nextPromise, (iterResult) => {
1267+
if (typeof iterResult !== 'object' || iterResult === null) {
1268+
throw new ERR_INVALID_STATE.TypeError(
1269+
'The promise returned by the iterator.next() method must fulfill with an object');
1270+
}
1271+
if (iterResult.done) {
1272+
readableStreamDefaultControllerClose(stream[kState].controller);
1273+
} else {
1274+
readableStreamDefaultControllerEnqueue(stream[kState].controller, iterResult.value);
1275+
}
1276+
});
1277+
}
1278+
1279+
async function cancelAlgorithm(reason) {
1280+
const iterator = iteratorRecord.iterator;
1281+
const returnMethod = iterator.return;
1282+
if (returnMethod === undefined) {
1283+
return PromiseResolve();
1284+
}
1285+
const returnResult = FunctionPrototypeCall(returnMethod, iterator, reason);
1286+
const returnPromise = PromiseResolve(returnResult);
1287+
return PromisePrototypeThen(returnPromise, (iterResult) => {
1288+
if (typeof iterResult !== 'object' || iterResult === null) {
1289+
throw new ERR_INVALID_STATE.TypeError(
1290+
'The promise returned by the iterator.return() method must fulfill with an object');
1291+
}
1292+
return undefined;
1293+
});
1294+
}
1295+
1296+
stream = new ReadableStream({
1297+
start: startAlgorithm,
1298+
pull: pullAlgorithm,
1299+
cancel: cancelAlgorithm,
1300+
}, {
1301+
size() {
1302+
return 1;
1303+
},
1304+
highWaterMark: 0,
1305+
});
1306+
1307+
return stream;
1308+
}
1309+
12511310
function readableStreamPipeTo(
12521311
source,
12531312
dest,

lib/internal/webstreams/util.js

+53
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@ const {
1313
PromiseReject,
1414
ReflectGet,
1515
Symbol,
16+
SymbolAsyncIterator,
17+
SymbolIterator,
1618
Uint8Array,
1719
} = primordials;
1820

1921
const {
2022
codes: {
2123
ERR_INVALID_ARG_VALUE,
2224
ERR_OPERATION_FAILED,
25+
ERR_INVALID_STATE,
2326
},
2427
} = require('internal/errors');
2528

@@ -217,6 +220,54 @@ function lazyTransfer() {
217220
return transfer;
218221
}
219222

223+
function createAsyncFromSyncIterator(syncIteratorRecord) {
224+
const syncIterable = {
225+
[SymbolIterator]: () => syncIteratorRecord.iterator,
226+
};
227+
228+
const asyncIterator = (async function* () {
229+
return yield* syncIterable;
230+
}());
231+
232+
const nextMethod = asyncIterator.next;
233+
return { iterator: asyncIterator, nextMethod, done: false };
234+
}
235+
236+
function getIterator(obj, kind = 'sync', method) {
237+
if (method === undefined) {
238+
if (kind === 'async') {
239+
method = obj[SymbolAsyncIterator];
240+
if (method === undefined) {
241+
const syncMethod = obj[SymbolIterator];
242+
const syncIteratorRecord = getIterator(obj, 'sync', syncMethod);
243+
return createAsyncFromSyncIterator(syncIteratorRecord);
244+
}
245+
} else {
246+
method = obj[SymbolIterator];
247+
}
248+
}
249+
250+
const iterator = FunctionPrototypeCall(method, obj);
251+
if (typeof iterator !== 'object' || iterator === null) {
252+
throw new ERR_INVALID_STATE.TypeError('The iterator method must return an object');
253+
}
254+
const nextMethod = iterator.next;
255+
return { iterator, nextMethod, done: false };
256+
}
257+
258+
function iteratorNext(iteratorRecord, value) {
259+
let result;
260+
if (value === undefined) {
261+
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator);
262+
} else {
263+
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator, [value]);
264+
}
265+
if (typeof result !== 'object' || result === null) {
266+
throw new ERR_INVALID_STATE.TypeError('The iterator.next() method must return an object');
267+
}
268+
return result;
269+
}
270+
220271
module.exports = {
221272
ArrayBufferViewGetBuffer,
222273
ArrayBufferViewGetByteLength,
@@ -243,6 +294,8 @@ module.exports = {
243294
nonOpPull,
244295
nonOpStart,
245296
nonOpWrite,
297+
getIterator,
298+
iteratorNext,
246299
kType,
247300
kState,
248301
};

test/fixtures/wpt/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ Last update:
2626
- performance-timeline: https://github.com/web-platform-tests/wpt/tree/17ebc3aea0/performance-timeline
2727
- resource-timing: https://github.com/web-platform-tests/wpt/tree/22d38586d0/resource-timing
2828
- resources: https://github.com/web-platform-tests/wpt/tree/919874f84f/resources
29-
- streams: https://github.com/web-platform-tests/wpt/tree/51750bc8d7/streams
29+
- streams: https://github.com/web-platform-tests/wpt/tree/517e945bbf/streams
3030
- url: https://github.com/web-platform-tests/wpt/tree/84782d9315/url
3131
- user-timing: https://github.com/web-platform-tests/wpt/tree/5ae85bf826/user-timing
3232
- wasm/jsapi: https://github.com/web-platform-tests/wpt/tree/cde25e7e3c/wasm/jsapi
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// META: global=window,worker
2+
'use strict';
3+
4+
promise_test(async t => {
5+
/** @type {ReadableStreamDefaultController} */
6+
var con;
7+
let synchronous = false;
8+
new ReadableStream({ start(c) { con = c }}, { highWaterMark: 0 }).pipeTo(
9+
new WritableStream({ write() { synchronous = true; } })
10+
)
11+
// wait until start algorithm finishes
12+
await Promise.resolve();
13+
con.enqueue();
14+
assert_false(synchronous, 'write algorithm must not run synchronously');
15+
}, "enqueue() must not synchronously call write algorithm");

0 commit comments

Comments
 (0)