From 5023aa1387994a76e0caa5380cdf70dcb4815898 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Mon, 15 Nov 2021 15:39:05 +0200 Subject: [PATCH] stream: add map method to Readable: --- lib/internal/streams/operators.js | 15 ++++++++ lib/stream.js | 4 ++ test/parallel/test-stream-map.js | 61 +++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+) create mode 100644 lib/internal/streams/operators.js create mode 100644 test/parallel/test-stream-map.js diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js new file mode 100644 index 00000000000000..de5c9185165735 --- /dev/null +++ b/lib/internal/streams/operators.js @@ -0,0 +1,15 @@ +'use strict'; + +const { AbortError } = require('internal/errors'); +const compose = require('internal/streams/compose'); + +module.exports.map = function map(stream, fn) { + return compose(stream, async function* (source, { signal }) { + for await (const item of source) { + if (signal.aborted) { + throw new AbortError('The iteration has been interrupted'); + } + yield await fn(item, { signal }); + } + }); +}; diff --git a/lib/stream.js b/lib/stream.js index cc56b76e31a4a6..05c7ec80ddeff1 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -29,6 +29,7 @@ const { promisify: { custom: customPromisify }, } = require('internal/util'); +const { map } = require('internal/streams/operators'); const compose = require('internal/streams/compose'); const { pipeline } = require('internal/streams/pipeline'); const { destroyer } = require('internal/streams/destroy'); @@ -40,6 +41,9 @@ const promises = require('stream/promises'); const Stream = module.exports = require('internal/streams/legacy').Stream; Stream.isDisturbed = require('internal/streams/utils').isDisturbed; Stream.Readable = require('internal/streams/readable'); +Stream.Readable.prototype.map = function(fn) { + return map(this, fn); +}; Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); Stream.Transform = require('internal/streams/transform'); diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js new file mode 100644 index 00000000000000..6732d5c59fc9b8 --- /dev/null +++ b/test/parallel/test-stream-map.js @@ -0,0 +1,61 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); +const { setTimeout } = require('timers/promises'); + +{ + // Map works on synchronous streams with a synchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x); + const result = [2, 4, 6, 8, 10]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Map works on synchronous streams with an asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + await Promise.resolve(); + return x + x; + }); + const result = [2, 4, 6, 8, 10]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Map works on asynchronous streams with a asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + return x + x; + }).map((x) => x * x); + const result = [4, 8, 12, 16, 20]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Allow cancellation of iteration through an AbortSignal + + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x, { signal }) => { + return setTimeout(1e15, { signal }); + }); + (async () => { + const iterator = stream[Symbol.asyncIterator](); + iterator.next(); + iterator.return(); + })().catch(common.mustCall((err) => { + assert.equals(err.name, 'AbortError'); + })); +}