-
Notifications
You must be signed in to change notification settings - Fork 30.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support the `flatMap` method from the iterator helper TC39 proposal on readable streams. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41612 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
- Loading branch information
1 parent
84752a4
commit 205c018
Showing
3 changed files
with
188 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
const fixtures = require('../common/fixtures'); | ||
const { | ||
Readable, | ||
} = require('stream'); | ||
const assert = require('assert'); | ||
const { setTimeout } = require('timers/promises'); | ||
const { createReadStream } = require('fs'); | ||
|
||
function oneTo5() { | ||
return Readable.from([1, 2, 3, 4, 5]); | ||
} | ||
|
||
{ | ||
// flatMap works on synchronous streams with a synchronous mapper | ||
(async () => { | ||
assert.deepStrictEqual( | ||
await oneTo5().flatMap((x) => [x + x]).toArray(), | ||
[2, 4, 6, 8, 10] | ||
); | ||
assert.deepStrictEqual( | ||
await oneTo5().flatMap(() => []).toArray(), | ||
[] | ||
); | ||
assert.deepStrictEqual( | ||
await oneTo5().flatMap((x) => [x, x]).toArray(), | ||
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5] | ||
); | ||
})().then(common.mustCall()); | ||
} | ||
|
||
|
||
{ | ||
// flatMap works on sync/async streams with an asynchronous mapper | ||
(async () => { | ||
assert.deepStrictEqual( | ||
await oneTo5().flatMap(async (x) => [x, x]).toArray(), | ||
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5] | ||
); | ||
const asyncOneTo5 = oneTo5().map(async (x) => x); | ||
assert.deepStrictEqual( | ||
await asyncOneTo5.flatMap(async (x) => [x, x]).toArray(), | ||
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5] | ||
); | ||
})().then(common.mustCall()); | ||
} | ||
{ | ||
// flatMap works on a stream where mapping returns a stream | ||
(async () => { | ||
const result = await oneTo5().flatMap(async (x) => { | ||
return Readable.from([x, x]); | ||
}).toArray(); | ||
assert.deepStrictEqual(result, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]); | ||
})().then(common.mustCall()); | ||
// flatMap works on an objectMode stream where mappign returns a stream | ||
(async () => { | ||
const result = await oneTo5().flatMap(() => { | ||
return createReadStream(fixtures.path('x.txt')); | ||
}).toArray(); | ||
// The resultant stream is in object mode so toArray shouldn't flatten | ||
assert.strictEqual(result.length, 5); | ||
assert.deepStrictEqual( | ||
Buffer.concat(result).toString(), | ||
'xyz\n'.repeat(5) | ||
); | ||
|
||
})().then(common.mustCall()); | ||
|
||
} | ||
|
||
{ | ||
// Concurrency + AbortSignal | ||
const ac = new AbortController(); | ||
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => { | ||
await setTimeout(100, { signal }); | ||
}), { signal: ac.signal, concurrency: 2 }); | ||
// pump | ||
assert.rejects(async () => { | ||
for await (const item of stream) { | ||
// nope | ||
console.log(item); | ||
} | ||
}, { | ||
name: 'AbortError', | ||
}).then(common.mustCall()); | ||
|
||
queueMicrotask(() => { | ||
ac.abort(); | ||
}); | ||
} | ||
|
||
{ | ||
// Already aborted AbortSignal | ||
const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => { | ||
await setTimeout(100, { signal }); | ||
}), { signal: AbortSignal.abort() }); | ||
// pump | ||
assert.rejects(async () => { | ||
for await (const item of stream) { | ||
// nope | ||
console.log(item); | ||
} | ||
}, { | ||
name: 'AbortError', | ||
}).then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Error cases | ||
assert.rejects(async () => { | ||
// eslint-disable-next-line no-unused-vars | ||
for await (const unused of Readable.from([1]).flatMap(1)); | ||
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); | ||
assert.rejects(async () => { | ||
// eslint-disable-next-line no-unused-vars | ||
for await (const _ of Readable.from([1]).flatMap((x) => x, { | ||
concurrency: 'Foo' | ||
})); | ||
}, /ERR_OUT_OF_RANGE/).then(common.mustCall()); | ||
assert.rejects(async () => { | ||
// eslint-disable-next-line no-unused-vars | ||
for await (const _ of Readable.from([1]).flatMap((x) => x, 1)); | ||
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); | ||
} | ||
{ | ||
// Test result is a Readable | ||
const stream = oneTo5().flatMap((x) => x); | ||
assert.strictEqual(stream.readable, true); | ||
} |