stream.compose(...) doesn't destroy all active composed streams when it is destroyed #51987

Closed
headlessme opened this issue Mar 6, 2024 · 0 comments · Fixed by #53213
Labels
stream Issues and PRs related to the stream subsystem.

headlessme commented Mar 6, 2024

Version

20.11.0

Platform

Darwin MacBook-Pro-4.local 23.1.0 Darwin Kernel Version 23.1.0: Mon Oct 9 21:27:27 PDT 2023; root:xnu-10002.41.9~6/RELEASE_X86_64 x86_64

Subsystem

stream

What steps will reproduce the bug?

When destroy(err) is called on a stream created with stream.compose(), some inner streams that may still be actively processing data do not have _destroy called and do not emit an error event. The code below illustrates this.

If the last entry in compose(...) is a stream that processes data slowly, all of its input has been written (its finish event has fired), and destroy() is then called on the composed stream, I expected the slow stream to be destroyed as well and to emit the error.

Inserting a PassThrough stream as the last entry in the compose chain seems to fix the issue (uncomment the marked lines in the sample code), but it's unclear to me why. Either I've misunderstood something about compose()ing streams or there's a bug.

import { PassThrough, Duplex, compose } from 'stream'

class SlowProcessor extends Duplex {
  constructor (options) {
    super({ ...options, objectMode: true })
    this.stuff = []
  }

  _write (message, encoding, callback) {
    this.stuff.push(message)
    callback()
  }

  _destroy (err, cb) {
    console.log('SlowProcessor _destroy called', err)
    cb(err)
  }

  _read () {
    // emulate some slow processing
    setTimeout(() => {
      if (this.stuff.length) {
        this.push(this.stuff.shift())
      } else if (this.writableEnded) {
        this.push(null)
      } else {
        setTimeout(() => this._read(), 100)
      }
    }, 100)
  }
}

const composed = compose(
    new PassThrough({ objectMode:true })
        .on('finish', () => console.log('input passthrough finish'))
        .on('end', () => console.log('input passthrough end'))
        .on('error', err => console.log('input passthrough error', err)),
    new SlowProcessor()
        .on('finish', () => console.log('SlowProcessor finish'))
        .on('end', () => console.log('SlowProcessor end'))
        .on('error', err => console.log('SlowProcessor error', err)),
    //
    // UNCOMMENT THIS AND IT WORKS AS EXPECTED
    //
    // new PassThrough({ objectMode:true })
    //     .on('finish', () => console.log('terminal passthrough finish'))
    //     .on('end', () => console.log('terminal passthrough end'))
    //     .on('error', err => console.log('terminal passthrough error', err))
)
    .on('finish', () => console.log('composed finish'))
    .on('error', err => console.log('composed', err))
    .on('end', () => console.log('composed end'))
    .on('data', d => console.log('data:', d))

composed.write('hello')
composed.write('world')
composed.end()

setTimeout(() => {
    composed.destroy(new Error('Uh-oh'))
}, 100)

How often does it reproduce? Is there a required condition?

Reliably reproduces using sample above.

What is the expected behavior? Why is that the expected behavior?

I'd expect any stream in a compose(...) chain that is still processing data to emit an error when the outer stream is destroyed.

I'd expect the script above to output something like:

input passthrough end
input passthrough finish
SlowProcessor finish
SlowProcessor _destroy called Error: Uh-oh
SlowProcessor error Error: Uh-oh
composed Error: Uh-oh

What do you see instead?

The SlowProcessor stream does not get destroyed even though its Readable side has not ended.

input passthrough end
input passthrough finish
SlowProcessor finish
// <=== I'd expect an error event from the SlowProcessor here as it hasn't ended
composed Error: Uh-oh
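
The log above only shows that no error event fired. To check the stream's state directly, one option is to read its lifecycle flags. The following is a minimal sketch: `snapshot` is a hypothetical helper, not part of Node's API, and it is demonstrated on a standalone PassThrough rather than on the reproduction itself.

```javascript
const { PassThrough } = require('node:stream')

// Hypothetical helper (not a Node API): capture the lifecycle flags
// that matter for this report.
function snapshot (stream) {
  return {
    destroyed: stream.destroyed,
    readableEnded: stream.readableEnded,
    writableFinished: stream.writableFinished
  }
}

// Demonstration on a standalone stream. destroy() sets the `destroyed`
// flag synchronously, so the second snapshot already reflects it.
const probe = new PassThrough({ objectMode: true })
const before = snapshot(probe)
probe.destroy()
const after = snapshot(probe)

console.log('before:', before)
console.log('after:', after)
```

In the reproduction, this could be wired up by keeping the SlowProcessor in a variable (the sample constructs it inline) and calling `snapshot()` on it from the composed stream's close handler. Based on the behavior described here, `destroyed` would presumably still be false on 20.11.0 without the trailing PassThrough.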

When there's an extra PassThrough inserted as the last entry in the compose chain, the results look as I'd expect:

input passthrough end
input passthrough finish
SlowProcessor finish
terminal passthrough error Error: Uh-oh
SlowProcessor _destroy called Error: Uh-oh
SlowProcessor error Error: Uh-oh
composed Error: Uh-oh
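
Besides the extra PassThrough, another possible stopgap is to keep references to the inner streams and tear them down explicitly. This is only a sketch under assumptions: `composeWithTeardown` and `destroyAll` are hypothetical names, not part of Node's API, and two PassThrough streams stand in for the real chain.

```javascript
const { PassThrough, compose } = require('node:stream')

// Hypothetical helper (not a Node API): compose the streams, but keep
// references to them so they can be destroyed explicitly.
function composeWithTeardown (...streams) {
  const composed = compose(...streams)

  // Destroy every inner stream that is still alive, then the composed stream.
  const destroyAll = (err) => {
    for (const stream of streams) {
      if (!stream.destroyed) stream.destroy(err)
    }
    composed.destroy(err)
  }

  return { composed, destroyAll }
}

const first = new PassThrough({ objectMode: true })
const last = new PassThrough({ objectMode: true })

// Give each inner stream an 'error' listener so the forwarded error
// is not reported as unhandled.
for (const stream of [first, last]) stream.on('error', () => {})

const { composed, destroyAll } = composeWithTeardown(first, last)
composed.on('error', (err) => console.log('composed', err.message))

composed.write('hello')
destroyAll(new Error('Uh-oh'))
```

The `destroyed` check skips streams that have already finished and been auto-destroyed, such as the input PassThrough in the sample, so only streams that are still active receive the error. The caller has to use `destroyAll(err)` instead of `composed.destroy(err)`, which is why this is a workaround rather than a fix.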

Additional information

No response

@headlessme headlessme changed the title stream.compose(...) doesn't destroy all active streams when it is destroyed stream.compose(...) doesn't destroy all active composed streams when it is destroyed Mar 6, 2024
@VoltrexKeyva VoltrexKeyva added the stream Issues and PRs related to the stream subsystem. label Mar 7, 2024
nodejs-github-bot pushed a commit that referenced this issue Jun 10, 2024
PR-URL: #53213
Fixes: #51987
Reviewed-By: Robert Nagy
Reviewed-By: Matteo Collina
Reviewed-By: Benjamin Gruenbaum