Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add filter method to readable #41354

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -1781,6 +1781,55 @@ for await (const result of dnsResults) {
}
```

### `readable.filter(fn[, options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to filter items from stream.
* `data` {any} a chunk of data from the stream.
* `options` {Object}
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
abort the `fn` call early.
* `options` {Object}
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream filtered with the predicate `fn`.

This method allows filtering the stream. For each item in the stream the `fn`
function will be called and if it returns a truthy value, the item will be
passed to the result stream. If the `fn` function returns a promise - that
promise will be `await`ed.

```mjs
import { Readable } from 'stream';
import { Resolver } from 'dns/promises';

// With a synchronous predicate.
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(item); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).filter(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
// Logs domains with more than 60 seconds on the resolved dns record.
console.log(result);
}
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
14 changes: 14 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,20 @@ async function * map(fn, options) {
}
}

async function * filter(fn, options) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an async generator for error's sake (so that if fn is not a function it rejects like an async generator would)

I changed the original code a bunch to pass the tests I added)

if (typeof fn !== 'function') {
throw (new ERR_INVALID_ARG_TYPE(
'fn', ['Function', 'AsyncFunction'], this));
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
}
async function filterFn(value, options) {
if (await fn(value, options)) {
return value;
}
return kEmpty;
ronag marked this conversation as resolved.
Show resolved Hide resolved
}
yield* this.map(filterFn, options);
}
module.exports = {
map,
filter
benjamingr marked this conversation as resolved.
Show resolved Hide resolved
};
109 changes: 109 additions & 0 deletions test/parallel/test-stream-filter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
'use strict';

const common = require('../common');
const {
Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');

{
// Filter works on synchronous streams with a synchronous predicate
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => x < 3);
const result = [1, 2];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Filter works on synchronous streams with an asynchronous predicate
const stream = Readable.from([1, 2, 3, 4, 5]).filter(async (x) => {
await Promise.resolve();
return x > 3;
});
const result = [4, 5];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Map works on asynchronous streams with a asynchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
await Promise.resolve();
return x + x;
}).filter((x) => x > 5);
const result = [6, 8, 10];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
}
})().then(common.mustCall());
}

{
// Concurrency + AbortSignal
const ac = new AbortController();
let calls = 0;
const stream = Readable.from([1, 2, 3, 4]).filter(async (_, { signal }) => {
calls++;
await setTimeout(100, { signal });
}, { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
// nope
console.log(item);
}
}, {
name: 'AbortError',
}).then(common.mustCall());

setImmediate(() => {
ac.abort();
assert.strictEqual(calls, 2);
});
}

{
// Concurrency result order
const stream = Readable.from([1, 2]).filter(async (item, { signal }) => {
await setTimeout(10 - item, { signal });
return true;
}, { concurrency: 2 });

(async () => {
const expected = [1, 2];
for await (const item of stream) {
assert.strictEqual(item, expected.shift());
}
})().then(common.mustCall());
}

{
// Error cases
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const unused of Readable.from([1]).filter(1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).filter((x) => x, {
concurrency: 'Foo'
}));
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of Readable.from([1]).filter((x) => x, 1));
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
// Test result is a Readable
const stream = Readable.from([1, 2, 3, 4, 5]).filter((x) => true);
assert.strictEqual(stream.readable, true);
}