Skip to content

Commit

Permalink
stream: add map method to Readable:
Browse files Browse the repository at this point in the history
  • Loading branch information
benjamingr committed Nov 15, 2021
1 parent 640bfb8 commit 5023aa1
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 0 deletions.
15 changes: 15 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

const { AbortError } = require('internal/errors');
const compose = require('internal/streams/compose');

module.exports.map = function map(stream, fn) {
return compose(stream, async function* (source, { signal }) {
for await (const item of source) {
if (signal.aborted) {
throw new AbortError('The iteration has been interrupted');
}
yield await fn(item, { signal });
}
});
};
4 changes: 4 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const {
promisify: { custom: customPromisify },
} = require('internal/util');

const { map } = require('internal/streams/operators');
const compose = require('internal/streams/compose');
const { pipeline } = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
Expand All @@ -40,6 +41,9 @@ const promises = require('stream/promises');
const Stream = module.exports = require('internal/streams/legacy').Stream;
Stream.isDisturbed = require('internal/streams/utils').isDisturbed;
Stream.Readable = require('internal/streams/readable');
Stream.Readable.prototype.map = function(fn) {
return map(this, fn);
};
Stream.Writable = require('internal/streams/writable');
Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Expand Down
61 changes: 61 additions & 0 deletions test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');

{
// Map works on synchronous streams with a synchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
const result = [2, 4, 6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Map works on synchronous streams with an asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
await Promise.resolve();
return x + x;
});
const result = [2, 4, 6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Map works on asynchronous streams with a asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
return x + x;
}).map((x) => x * x);
const result = [4, 8, 12, 16, 20];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Allow cancellation of iteration through an AbortSignal

const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x, { signal }) => {
return setTimeout(1e15, { signal });
});
(async () => {
const iterator = stream[Symbol.asyncIterator]();
iterator.next();
iterator.return();
})().catch(common.mustCall((err) => {
assert.equals(err.name, 'AbortError');
}));
}

0 comments on commit 5023aa1

Please sign in to comment.