Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Request: way to merge streams like pumpify(a, b) #32020

Closed
JasonWoof opened this issue Feb 29, 2020 · 13 comments
Closed

Feature Request: way to merge streams like pumpify(a, b) #32020

JasonWoof opened this issue Feb 29, 2020 · 13 comments
Labels
feature request Issues that request new features to be added to Node.js. stream Issues and PRs related to the stream subsystem.

Comments

@JasonWoof
Copy link

What steps will reproduce the bug?

const stream = require('stream');

// two different transforms (these work)
const transform_double = new stream.Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk);
        this.push(chunk);
        callback();
    }
});
const transform_uppercase = new stream.Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});

// compose the transforms
const parser = stream.pipeline(transform_double, transform_uppercase, ()=>0);

// run stdin through it
process.stdin.pipe(parser).pipe(process.stdout);

Then pipe some text to it eg: echo "foo" | node reproduce.js

How often does it reproduce? Is there a required condition?

always

What is the expected behavior?

The text sent in should go through both transformations, eg output "FOOFOO"

What do you see instead?

The text piped in is only sent to the last transformation, eg output "FOO"

Additional information

The docs here don't say what this function returns:
https://nodejs.org/api/stream.html#stream_stream_pipeline_streams_callback

I'm saying that it should return a duplex stream so I can read and write to the ends of the pipeline.

@JasonWoof
Copy link
Author

oh, that system/version is from me running it in the docker node:latest image. I get exactly the same results with v10.17.0 in debian unstable (not in docker).

@Trott Trott added the stream Issues and PRs related to the stream subsystem. label Feb 29, 2020
@Trott
Copy link
Member

Trott commented Feb 29, 2020

@nodejs/streams

@vweevers
Copy link
Contributor

@JasonWoof stream.pipeline() returns the last stream passed in, in the spirit of a.pipe(b).pipe(c). Which in your case is transform_uppercase.

To get a duplex stream instead, you might like pumpify.

@JasonWoof
Copy link
Author

I don't see the point of stream.pipeline() if it does the same thing as a.pipe(b).pipe(c) and doesn't create something you can pipe data to.

Technically current stream.pipeline() does return something you can pipe to, but piping to it does something very weird (ie neither useful nor expected.)

I think node should have the ability to compose streams without pulling in god knows how many npm modules.

If stream.pipeline() worked the way I suggest, you could still do stream.pipeline(a,b).pipe(c) and you could also do a.pipe(stream.pipeline(b, c)).pipe(d)

@vweevers
Copy link
Contributor

I don't see the point of stream.pipeline()

It forwards errors and does cleanup. That's cumbersome without stream.pipeline().

Technically current stream.pipeline() does return something you can pipe to, but piping to it does something very weird (ie neither useful nor expected.)

It returns the last stream (pipeline(a, b) === b). That's useful for composition, though not the way you expected it. I do agree the return value should be documented to avoid confusion.

If stream.pipeline() worked the way I suggest, you could still do stream.pipeline(a,b).pipe(c)

If stream.pipeline(a,b) were to return a duplex stream, that writes to the first stream and reads from the last, that's not quite the same. Such a utility is undoubtedly useful, which is why I suggested pumpify, but it must be separate from stream.pipeline().

@JasonWoof
Copy link
Author

OK, sounds like stream.pipeline() isn't going to change, so I'll try making this a feature request for a built-in way to combine streams. I think it's good to have this sort of stuff built in because then more people will get the error forwarding, cleanup, and proper backpressure that they might otherwise not bother implementing (or even realize they need to).

@JasonWoof JasonWoof changed the title piping to stream.pipeline() sends data to the wrong stream Feature Request: way to merge streams like pumpify(a, b) Mar 1, 2020
@Trott Trott added the feature request Issues that request new features to be added to Node.js. label Mar 1, 2020
@mcollina
Copy link
Member

mcollina commented Mar 1, 2020

I would tend to agree that this would be a nice API to add. The main difficulty is to add it with very little perf overhead (no double buffering, like pumpify/duplexify do).

A good starting point could be to lift Duplexify and Pumpify and their tests and send a PR.

More than anything else, this feature needs somebody willing to devote time to it.

cc @ronag @mafintosh

@ronag
Copy link
Member

ronag commented Mar 1, 2020

A good starting point could be to lift Duplexify and Pumpify and their tests and send a PR.

I'm -1 on this. A lot of things has happened in streams and I'm not convinced the way these are implemented are the best way to achieve the discussed functionality.

More than anything else, this feature needs somebody willing to devote time to it.

I might be interested in this at some point in the future.

@JasonWoof
Copy link
Author

JasonWoof commented Mar 1, 2020

I'm in learning mode with streams (ie hobby project no deadline) so I wrote my own pumpify as a learning exercise. I created a transform stream and did all the fiddling to pass along back-pressure and errors and whatnot.

But this could be much cleaner if it was built-in. All I want is a duplex stream interface that writes to the first stream in pipeline and reads from the last. I'm imagining something like:

function pipelineify (...streams) {
    // pipe to each other and forward errors
    stream.pipeline(...streams);
    // create a stream object that writes to the start
    // of the pipeline and reads from the end
    return new stream.Duplex({
        writeStream: streams[0],
        readStream: streams[streams.length - 1],
    });
}

@alesmenzel
Copy link

new Duplex({
  writeStream: <Writable>,
  readStream: <Readable>,
});

@JasonWoof from which version of Node is that Duplex writeStream/readStream options supported? I cannot find any mention of this in the docs.

@vweevers a question regarding .pipe(), it seems there is a lot of contradictory posts on whether I should handle the stream close/error/finish/end events, e.g. according to the Node docs it is not required to handle those events while piping, e.g. from this example

but then there are blog posts saying one should destroy the streams manually

what is the correct approach, should we handle destroying the streams when they close?

Actually there is a nearly hidden note on memory leaks due to not manually destroying the writable stream on read errors.

One important caveat is that if the Readable stream emits an error during processing, the Writable destination is not closed automatically. If an error occurs, it will be necessary to manually close each stream in order to prevent memory leaks.

So should we only care about the error event and if the stream ends without error does it destroy itself automatically?

For example, is this correctly handling the pipe (= only handling the readstream error events)?

const readable = fs.createReadStream('x.txt')
const writable = fs.createWriteStream('y.txt')

readable.once('error' , (err) => {
  writable.destroy(err)
})

readable.pipe(writable)

Sorry for a little off-topic questions.

@mcollina
Copy link
Member

mcollina commented May 4, 2020

@JasonWoof
Copy link
Author

@alesmenzel none to date. That is my feature request.

@alesmenzel
Copy link

@JasonWoof oh, I misread that

ronag added a commit to nxtedition/node that referenced this issue Jun 14, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 15, 2021
pipe is similar to pipeline however it supports stream composition.

Refs: nodejs#32020
ronag added a commit to nxtedition/node that referenced this issue Jun 18, 2021
ronag added a commit to nxtedition/node that referenced this issue Jun 18, 2021
targos pushed a commit to targos/node that referenced this issue Jun 19, 2021
ronag added a commit to nxtedition/node that referenced this issue Jun 30, 2021
ronag added a commit to nxtedition/node that referenced this issue Jul 7, 2021
ronag added a commit to nxtedition/node that referenced this issue Jul 12, 2021
ronag added a commit to nxtedition/node that referenced this issue Jul 12, 2021
ronag added a commit that referenced this issue Jul 19, 2021
Refs: #32020

PR-URL: #39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
ronag added a commit to nxtedition/node that referenced this issue Jul 28, 2021
Refs: nodejs#32020

PR-URL: nodejs#39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL:
ronag added a commit to nxtedition/node that referenced this issue Jul 28, 2021
Refs: nodejs#32020

PR-URL: nodejs#39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL: nodejs#39563
ronag added a commit to nxtedition/node that referenced this issue Jul 29, 2021
Refs: nodejs#32020

PR-URL: nodejs#39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL: nodejs#39563
ronag added a commit to nxtedition/node that referenced this issue Jul 29, 2021
Refs: nodejs#32020

PR-URL: nodejs#39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL: nodejs#39563
ronag added a commit to nxtedition/node that referenced this issue Aug 1, 2021
Refs: nodejs#32020

PR-URL: nodejs#39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL: nodejs#39563
ronag added a commit to nxtedition/node that referenced this issue Aug 1, 2021
Refs: nodejs#32020

PR-URL: nodejs#39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL: nodejs#39563
ronag added a commit to nxtedition/node that referenced this issue Aug 1, 2021
Refs: nodejs#32020

PR-URL: nodejs#39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL: nodejs#39563
ronag added a commit to nxtedition/node that referenced this issue Aug 23, 2021
Refs: nodejs#32020

PR-URL: nodejs#39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL: nodejs#39563
ronag added a commit to nxtedition/node that referenced this issue Aug 23, 2021
Refs: nodejs#32020

PR-URL: nodejs#39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL: nodejs#39563
ronag added a commit to nxtedition/node that referenced this issue Aug 23, 2021
Refs: nodejs#32020

PR-URL: nodejs#39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL: nodejs#39563
targos pushed a commit that referenced this issue Sep 6, 2021
Refs: #32020

PR-URL: #39029
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
Backport-PR-URL: #39563
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request Issues that request new features to be added to Node.js. stream Issues and PRs related to the stream subsystem.
Projects
None yet
Development

No branches or pull requests

7 participants