Race condition when ending a stream #40471

Open
julienw opened this issue Oct 15, 2021 · 3 comments
Labels
stream Issues and PRs related to the stream subsystem.

Comments


julienw commented Oct 15, 2021

Version

v14.17.5 and v16.11.1

Platform

Linux torri 5.13.14 #29 SMP Mon Sep 6 12:28:17 CEST 2021 x86_64 GNU/Linux

Subsystem

stream

What steps will reproduce the bug?

Clone https://github.com/julienw/streams-problems
Run the script node-streams.js with node v14 or node v16.

Here is the content of this script:

import Stream, { Writable, PassThrough } from "stream";
import util from "util";

const pipeline = util.promisify(Stream.pipeline);
const nextTick = util.promisify(process.nextTick);

// This Writable cheaply checks that a gzipped stream looks like JSON.
class MyStream extends Writable {
  _write(chunk, encoding, callback) {
    console.log("_write", chunk);
    callback();
    // Calling end will also stop any piping gracefully.
    // Using nextTick allows some buffered writes to finish.
    process.nextTick(() => this.end());
  }

  // This is called when all the data has been given to _write and the
  // stream is ended.
  _final(callback) {
    console.log("_final()");
    callback();
  }
}

async function run() {
  console.log("START TEST");
  const fixture = "WRITE SOMETHING";
  const checker = new MyStream();
  const input = new PassThrough();
  const pipelinePromise = pipeline(input, checker);
  console.log("WRITE 1");
  input.write(fixture.slice(0, 3));
  await nextTick();
  console.log("WRITE 2");
  input.end(fixture.slice(3));
  console.log("WAIT FOR PIPELINE END");
  await pipelinePromise;
  console.log("FINISHED");
}

// Keeps the node process running
const intervalId = setInterval(() => {}, 1000);

console.log("Starting");
run()
  .then(() => console.log("End!"))
  .catch((e) => console.error(e))
  .then(() => clearInterval(intervalId));

(This needs to run as an ES module, i.e. with "type": "module" in package.json, so it's probably easier to clone the repository.)

In v12, we get this output:

Starting
START TEST
WRITE 1
_write <Buffer 57 52 49>
_final()
WRITE 2
WAIT FOR PIPELINE END
FINISHED
End!

But in v14 and v16 we get this:

Starting
START TEST
WRITE 1
_write <Buffer 57 52 49>
_final()
WRITE 2
WAIT FOR PIPELINE END

And this never ends.

As you can see, we're calling this.end() in _write() after a nextTick. The original idea was that once this stream had done its job (checking something), calling end would unpipe it and we wouldn't get any more writes. The goal is to stop doing more work.

How often does it reproduce? Is there a required condition?

Always

What is the expected behavior?

I think we should get one of these results:

  1. like v12, the streams are unpiped and the pipeline should therefore end, because the PassThrough will just finish consuming the data (I think).
  2. otherwise, we should get an error because we're trying to write after end ([ERR_STREAM_WRITE_AFTER_END]: write after end).
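For reference, result 2 is what Node reports when a write reaches a Writable after end() has been called on it. A minimal sketch, written as CommonJS so it runs as a plain .js file; writeAfterEnd is a hypothetical helper name. It observes the error through the write() callback:

```javascript
const { Writable } = require("stream");

// Resolves with the error code passed to the write() callback when
// writing to a Writable that has already been ended.
function writeAfterEnd() {
  return new Promise((resolve) => {
    const w = new Writable({
      write(chunk, encoding, callback) {
        callback();
      },
    });
    // The same error is also emitted as 'error'; without a listener the
    // process would crash on the unhandled 'error' event.
    w.on("error", () => {});
    w.end();
    // end() was already called, so this write is rejected.
    w.write("too late", (err) => resolve(err && err.code));
  });
}

writeAfterEnd().then((code) => console.log(code)); // ERR_STREAM_WRITE_AFTER_END
```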

What do you see instead?

It looks like the streams aren't unpiped AND we don't get an error. So the written data isn't consumed by the stream that has been ended, and we wait forever.

Additional information

Replacing process.nextTick(() => this.end()); with something like await nextTick(); this.end(); gives an error (behavior 2 above), which is different from node v12, but at least we're not waiting forever and we get a clue. Update Oct 18: we get the same error in v12 and v14, but in v16 we get the waiting-forever behavior.

Any insight will be very much appreciated. In particular, can you point to what changed in v14 in this area compared to v12 (I do know that a lot of the streams code changed in between)? Also, it seems to me that waiting forever isn't the right behavior.

Thanks!

VoltrexKeyva added the stream label Oct 15, 2021

julienw commented Oct 19, 2021

From studying the node code, I'm starting to think that the "waiting forever" behavior is the correct one, and therefore that v14.x and v16.x fixed issues.

Let me explain, because I find that the documentation isn't always detailed enough at this level. This is my understanding and may also contain inaccuracies.

src.pipe(dest)
=> when src emits end, it will call dest.end() automatically (unless we pass the option end: false, in this case it will only unpipe).
=> when dest emits finish or close, it will call src.unpipe(dest) automatically... but not src.end() (which makes sense).
=> no handling of errors
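The two pipe() rules above can be checked with a small sketch (CommonJS; makeSink, srcEndEndsDest and destFinishUnpipes are hypothetical helper names). It only exercises the behaviors described: src ending ends dest, and dest finishing unpipes src without ending it.

```javascript
const { PassThrough, Writable } = require("stream");

// A Writable that records every chunk it receives as a string.
function makeSink(received) {
  return new Writable({
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      callback();
    },
  });
}

// Rule 1: when src emits 'end', pipe() calls dest.end() for us.
function srcEndEndsDest() {
  return new Promise((resolve) => {
    const received = [];
    const src = new PassThrough();
    const dest = makeSink(received);
    src.pipe(dest);
    // 'finish' can only fire because pipe() called dest.end().
    dest.on("finish", () => resolve(received));
    src.end("hello");
  });
}

// Rule 2: when dest finishes first, pipe() unpipes src but does not end it.
function destFinishUnpipes() {
  return new Promise((resolve) => {
    const src = new PassThrough();
    const dest = makeSink([]);
    src.pipe(dest);
    dest.once("unpipe", () => {
      resolve({ srcEnded: src.readableEnded, srcPaused: src.isPaused() });
    });
    dest.end();
  });
}

srcEndEndsDest().then(console.log); // [ 'hello' ]
destFinishUnpipes().then(console.log); // { srcEnded: false, srcPaused: true }
```

The second case is the one that matters for this issue: after the unpipe, src is paused and nothing will read from it again.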

pipeline(src, dest, callback)
=> does a lot of magic with things like async generators etc, but for regular node streams this calls src.pipe(dest)
=> the callback is called only when all streams are ended/finished (using the same function as stream.finished).
=> because it uses stream.finished, it also reacts to errors, and in that case it will destroy all streams in the pipeline.
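A sketch of the two pipeline() properties described above (CommonJS; pipelineSuccess and pipelineError are hypothetical helpers, and the "boom" error is made up for the example): the callback waits for every stream, and an error in one stream destroys the others.

```javascript
const { pipeline, PassThrough, Writable } = require("stream");

// Success: the callback runs only once src has ended and dest has finished.
function pipelineSuccess() {
  return new Promise((resolve) => {
    const src = new PassThrough();
    const dest = new Writable({
      write(chunk, encoding, callback) {
        callback();
      },
    });
    pipeline(src, dest, (err) => {
      resolve({ ok: !err, srcEnded: src.readableEnded, destFinished: dest.writableFinished });
    });
    src.end("data");
  });
}

// Failure: an error in dest is reported to the callback and src is destroyed too.
function pipelineError() {
  return new Promise((resolve) => {
    const src = new PassThrough();
    const dest = new Writable({
      write(chunk, encoding, callback) {
        callback(new Error("boom")); // fail on the first chunk
      },
    });
    pipeline(src, dest, (err) => {
      resolve({ message: err && err.message, srcDestroyed: src.destroyed });
    });
    src.write("data");
  });
}

pipelineSuccess().then(console.log); // { ok: true, srcEnded: true, destFinished: true }
pipelineError().then(console.log); // { message: 'boom', srcDestroyed: true }
```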

Therefore, what happens in the case above:

  • this.end() is called shortly after the first write => unpipe is called
  • The source PassThrough is waiting forever for its data to be consumed, until then it doesn't emit the end event, and therefore the pipeline never ends either.
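This "waiting forever" state can be reproduced deterministically, without relying on nextTick timing. A sketch under assumptions (CommonJS; leftoverDataBlocksEnd is a hypothetical helper, and the 50 ms delay is arbitrary): dest finishes first, src gets unpiped, and data written to src afterwards stays buffered, so src never emits 'end' until something drains it.

```javascript
const { PassThrough, Writable } = require("stream");

// dest finishes first, so pipe() unpipes src. Data written to src afterwards
// has no consumer, so src does not emit 'end' until someone reads it.
function leftoverDataBlocksEnd() {
  return new Promise((resolve) => {
    const src = new PassThrough();
    const dest = new Writable({
      write(chunk, encoding, callback) {
        callback();
      },
    });
    src.pipe(dest);

    dest.once("unpipe", () => {
      // The producer keeps writing and then ends src, like input.end() in the test.
      src.end("data nobody reads");
      setTimeout(() => {
        const stuck = { readableLength: src.readableLength, readableEnded: src.readableEnded };
        // Draining the leftover data (here by simply discarding it) lets 'end' fire.
        src.once("end", () => resolve({ stuck, endedAfterResume: src.readableEnded }));
        src.resume();
      }, 50);
    });
    dest.end();
  });
}

leftoverDataBlocksEnd().then(console.log);
```

Before resume(), src still holds the 17 bytes and readableEnded is false; after draining, 'end' fires. In a pipeline, the equivalent would be draining or destroying the source once the checker is done.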

I now think the faulty behaviors in v12.x and v14.x were due to one or more races around the use of nextTick:

  • it would end "properly" (my initial expectation) if the call to end was handled a bit later.
  • The error [ERR_STREAM_WRITE_AFTER_END]: write after end would happen if unpipe wasn't called in a timely manner.

I couldn't pinpoint which different code produces these different behaviors yet.


julienw commented Oct 19, 2021

This is more puzzling. I tried simplifying my test case by removing all the piping. This is the run function, with the same Writable implementation; finished is a promisified Stream.finished.

const finished = util.promisify(Stream.finished);

// MyStream, nextTick, Stream and util are the same as in the script above.
async function run() {
  console.log("START TEST");
  const fixture = "WRITE SOMETHING";
  const checker = new MyStream();
  console.log("WRITE 1");
  checker.write(fixture.slice(0, 3));
  await nextTick();
  console.log("WRITE 2");
  checker.end(fixture.slice(3));
  console.log("WAIT FOR PIPELINE END");
  await finished(checker);
  console.log("FINISHED");
}

Now, v12 gives me the error I expect: Error [ERR_STREAM_WRITE_AFTER_END]: write after end, but v14 and v16 run it to the end... Could that be a real bug?
I get the error if I register an "error" event handler, which is a good thing.


julienw commented Oct 19, 2021

I fixed my issue by removing the call to end inside the stream itself, instead emitting a custom event, and handling it outside of the stream.
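One possible shape for that fix, as a sketch only: the comment doesn't say what the custom event is called or how it was handled, so the Checker class, the "checked" event, the JSON-looking test and the checkInput helper are all hypothetical (CommonJS). The stream reports its verdict through an event instead of ending itself, and keeps acknowledging later chunks cheaply so the source can drain and reach 'end'.

```javascript
const { PassThrough, Writable, pipeline } = require("stream");

// Hypothetical checker: it reports its verdict through a custom "checked"
// event instead of calling this.end() on itself.
class Checker extends Writable {
  constructor() {
    super();
    this.hasChecked = false;
  }

  _write(chunk, encoding, callback) {
    if (!this.hasChecked) {
      this.hasChecked = true;
      // Hypothetical check: does the data look like JSON?
      this.emit("checked", chunk.toString().startsWith("{"));
    }
    // Later chunks are acknowledged without extra work, so the source keeps
    // draining and the pipeline can end normally.
    callback();
  }
}

// The caller, not the stream, decides what to do with the verdict.
function checkInput(chunks) {
  return new Promise((resolve, reject) => {
    const input = new PassThrough();
    const checker = new Checker();
    let verdict;
    checker.once("checked", (ok) => {
      verdict = ok;
    });
    pipeline(input, checker, (err) => (err ? reject(err) : resolve(verdict)));
    for (const chunk of chunks) input.write(chunk);
    input.end();
  });
}

checkInput(['{"a"', ": 1}"]).then(console.log); // true
```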
