diff --git a/doc/api/stream.md b/doc/api/stream.md index 6280fc740455d5..1f7f679e3ba5a4 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1781,6 +1781,55 @@ for await (const result of dnsResults) { } ``` +### `readable.filter(fn[, options])` + + + +> Stability: 1 - Experimental + +* `fn` {Function|AsyncFunction} a function to filter items from stream. + * `data` {any} a chunk of data from the stream. + * `options` {Object} + * `signal` {AbortSignal} aborted if the stream is destroyed allowing to + abort the `fn` call early. +* `options` {Object} + * `concurrency` {number} the maximal concurrent invocation of `fn` to call + on the stream at once. **Default:** `1`. + * `signal` {AbortSignal} allows destroying the stream if the signal is + aborted. +* Returns: {Readable} a stream filtered with the predicate `fn`. + +This method allows filtering the stream. For each item in the stream the `fn` +function will be called and if it returns a truthy value, the item will be +passed to the result stream. If the `fn` function returns a promise - that +promise will be `await`ed. + +```mjs +import { Readable } from 'stream'; +import { Resolver } from 'dns/promises'; + +// With a synchronous predicate. +for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) { + console.log(item); // 3, 4 +} +// With an asynchronous predicate, making at most 2 queries at a time. +const resolver = new Resolver(); +const dnsResults = await Readable.from([ + 'nodejs.org', + 'openjsf.org', + 'www.linuxfoundation.org', +]).filter(async (domain) => { + const { address } = await resolver.resolve4(domain, { ttl: true }); + return address.ttl > 60; +}, { concurrency: 2 }); +for await (const result of dnsResults) { + // Logs domains with more than 60 seconds on the resolved dns record. + console.log(result); +} +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 7ffbbebd332d9e..267cf53740bd7f 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -147,6 +147,20 @@ async function * map(fn, options) { } } +async function * filter(fn, options) { + if (typeof fn !== 'function') { + throw (new ERR_INVALID_ARG_TYPE( + 'fn', ['Function', 'AsyncFunction'], this)); + } + async function filterFn(value, options) { + if (await fn(value, options)) { + return value; + } + return kEmpty; + } + yield* this.map(filterFn, options); +} module.exports = { map, + filter }; diff --git a/test/parallel/test-stream-filter.js b/test/parallel/test-stream-filter.js new file mode 100644 index 00000000000000..100921a766977e --- /dev/null +++ b/test/parallel/test-stream-filter.js @@ -0,0 +1,109 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); +const { setTimeout } = require('timers/promises'); + +{ + // Filter works on synchronous streams with a synchronous predicate + const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => x < 3); + const result = [1, 2]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Filter works on synchronous streams with an asynchronous predicate + const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => { + await Promise.resolve(); + return x > 3; + }); + const result = [4, 5]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Map works on asynchronous streams with a asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + await Promise.resolve(); + return x + x; + }).filter((x) => x > 5); + const result = [6, 8, 10]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Concurrency + AbortSignal + const ac = new AbortController(); + let calls = 0; + const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => { + calls++; + await setTimeout(100, { signal }); + }, { signal: ac.signal, concurrency: 2 }); + // pump + assert.rejects(async () => { + for await (const item of stream) { + // nope + console.log(item); + } + }, { + name: 'AbortError', + }).then(common.mustCall()); + + setImmediate(() => { + ac.abort(); + assert.strictEqual(calls, 2); + }); +} + +{ + // Concurrency result order + const stream = Readable.from([1, 2]).filter(async (item, { signal }) => { + await setTimeout(10 - item, { signal }); + return true; + }, { concurrency: 2 }); + + (async () => { + const expected = [1, 2]; + for await (const item of stream) { + assert.strictEqual(item, expected.shift()); + } + })().then(common.mustCall()); +} + +{ + // Error cases + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const unused of Readable.from([1]).filter(1)); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).filter((x) => x, { + concurrency: 'Foo' + })); + }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).filter((x) => x, 1)); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); +} +{ + // Test result is a Readable + const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true); + assert.strictEqual(stream.readable, true); +}