Skip to content

Commit

Permalink
stream: add pipeline() for webstreams
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Jan 22, 2023
1 parent 863a416 commit 4d04a5e
Show file tree
Hide file tree
Showing 3 changed files with 350 additions and 6 deletions.
40 changes: 34 additions & 6 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,29 @@ const {
isReadable,
isReadableNodeStream,
isNodeStream,
isReadableStream,
isWritableStream,
isTransformStream,
} = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller');

let PassThrough;
let Readable;
let Writable;

function lazyloadReadable() {
if (!Readable) {
Readable = require('internal/streams/readable');
}
return Readable;
}

function lazyloadWritable() {
if (!Writable) {
Writable = require('internal/streams/writable');
}
return Writable;
}

function destroyer(stream, reading, writing) {
let finished = false;
Expand Down Expand Up @@ -81,11 +99,7 @@ function makeAsyncIterable(val) {
}

async function* fromReadable(val) {
if (!Readable) {
Readable = require('internal/streams/readable');
}

yield* Readable.prototype[SymbolAsyncIterator].call(val);
yield* lazyloadReadable().prototype[SymbolAsyncIterator].call(val);
}

async function pump(iterable, writable, finish, { end }) {
Expand Down Expand Up @@ -147,6 +161,20 @@ async function pump(iterable, writable, finish, { end }) {
}
}

function convertToNodeStreamIfWebstream(stream) {
if (isReadableStream(stream)) {
return lazyloadReadable().fromWeb(stream);
} else if (isWritableStream(stream)) {
return lazyloadWritable().fromWeb(stream);
} else if (isTransformStream(stream)) {
return Duplex.from({
writable: stream.writable,
readable: stream.readable
});
}
return stream;
}

function pipeline(...streams) {
return pipelineImpl(streams, once(popCallback(streams)));
}
Expand Down Expand Up @@ -212,7 +240,7 @@ function pipelineImpl(streams, callback, opts) {

let ret;
for (let i = 0; i < streams.length; i++) {
const stream = streams[i];
const stream = convertToNodeStreamIfWebstream(streams[i]);
const reading = i < streams.length - 1;
const writing = i > 0;
const end = reading || opts?.end !== false;
Expand Down
10 changes: 10 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ function isWritableStream(obj) {
);
}

function isTransformStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.readable === 'object' &&
typeof obj.writable === 'object'
);
}

function isIterable(obj, isAsync) {
if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
Expand Down Expand Up @@ -312,4 +321,5 @@ module.exports = {
isServerRequest,
isServerResponse,
willEmitClose,
isTransformStream,
};
Loading

0 comments on commit 4d04a5e

Please sign in to comment.