Skip to content

Commit

Permalink
stream: enable usage of webstreams on compose()
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Feb 15, 2023
1 parent f46515c commit 505d884
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 31 deletions.
146 changes: 116 additions & 30 deletions lib/internal/streams/compose.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ const {
isNodeStream,
isReadable,
isWritable,
isWebStream,
isTransformStream,
isWritableStream,
isReadableStream,
} = require('internal/streams/utils');
const {
AbortError,
Expand All @@ -15,6 +19,7 @@ const {
ERR_MISSING_ARGS,
},
} = require('internal/errors');
const eos = require('internal/streams/end-of-stream');

module.exports = function compose(...streams) {
if (streams.length === 0) {
Expand Down Expand Up @@ -57,9 +62,8 @@ module.exports = function compose(...streams) {
}
}

let ondrain;
let onfinish;
let onreadable;
let writableEndDestructor;
let readableEndDestructor;
let onclose;
let d;

Expand All @@ -79,8 +83,8 @@ module.exports = function compose(...streams) {
const head = streams[0];
const tail = pipeline(streams, onfinished);

const writable = !!isWritable(head);
const readable = !!isReadable(tail);
const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head));
const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail));

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
Expand All @@ -94,15 +98,51 @@ module.exports = function compose(...streams) {
});

if (writable) {
d._write = function(chunk, encoding, callback) {
writableEndDestructor = makeWritableEnd(d, head, tail);
}

if (readable) {
readableEndDestructor = makeReadableEnd(d, head, tail);
}

d._destroy = function(err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}

if (readableEndDestructor) {
readableEndDestructor();
}

if (writableEndDestructor) {
writableEndDestructor();
}

if (onclose === null) {
callback(err);
} else {
onclose = callback;
destroyer(tail, err);
}
};

return d;
};

function makeWritableEnd(duplex, head, tail) {
let ondrain;
let onfinish;

if (isNodeStream(head)) {
duplex._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

d._final = function(callback) {
duplex._final = function(callback) {
head.end();
onfinish = callback;
};
Expand All @@ -114,17 +154,61 @@ module.exports = function compose(...streams) {
cb();
}
});
} else if (isWebStream(head)) {
const writable = isTransformStream(head) ? head.writable : head;
const writer = writable.getWriter();

duplex._write = async function(chunk, encoding, callback) {
try {
await writer.ready;
writer.write(chunk).catch(() => {});
callback();
} catch (err) {
callback(err);
}
};

duplex._final = async function(callback) {
try {
await writer.ready;
writer.close();
onfinish = callback;
} catch (err) {
callback(err);
}
};
}

if (isNodeStream(tail)) {
tail.on('finish', function() {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail;
eos(readable, () => {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
}

if (readable) {
function destructor() {
ondrain = null;
onfinish = null;
}

return destructor;
}

function makeReadableEnd(duplex, head, tail) {
let onreadable;
if (isNodeStream(tail)) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
Expand All @@ -134,41 +218,43 @@ module.exports = function compose(...streams) {
});

tail.on('end', function() {
d.push(null);
duplex.push(null);
});

d._read = function() {
duplex._read = function() {
while (true) {
const buf = tail.read();

if (buf === null) {
onreadable = d._read;
onreadable = duplex._read;
return;
}

if (!d.push(buf)) {
if (!duplex.push(buf)) {
return;
}
}
};
}
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail;
const reader = readable.getReader();
duplex._read = async function() {
while (true) {
const { value, done } = await reader.read();
if (done) {
duplex.push(null);
return;
}

d._destroy = function(err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}
if (!duplex.push(value)) {
return;
}
}
};
}

function destructor() {
onreadable = null;
ondrain = null;
onfinish = null;

if (onclose === null) {
callback(err);
} else {
onclose = callback;
destroyer(tail, err);
}
};
}

return d;
};
return destructor;
}
2 changes: 1 addition & 1 deletion lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ function pipelineImpl(streams, callback, opts) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
ret = stream;
} else {
ret = Duplex.from(stream);
Expand Down

0 comments on commit 505d884

Please sign in to comment.