
Node Streams constructed from Web Streams not emitting the 'end' event #46149

Closed
debadree25 opened this issue Jan 9, 2023 · 11 comments

@debadree25
Member

debadree25 commented Jan 9, 2023

Version

v20.0.0-pre

Platform

Darwin MacBook-Pro.local 22.1.0 Darwin Kernel Version 22.1.0: Sun Oct 9 20:14:54 PDT 2022; root:xnu-8792.41.9~2/RELEASE_X86_64 x86_64

Subsystem

streams

What steps will reproduce the bug?

The following is the code I tried to run:

import { ReadableStream } from 'node:stream/web';
import { Readable } from 'node:stream';

const stream = Readable.fromWeb(new ReadableStream({
  start(controller) {
    controller.enqueue('foo');
    controller.close();
  },
}));

stream.once('readable', () => {
  console.log(stream.read()?.toString());
});

stream.on('end', () => {
  console.log('ended');
});

How often does it reproduce? Is there a required condition?

Always

What is the expected behavior?

foo
ended

What do you see instead?

foo

Additional information

This was found while trying to work on #39316

@climba03003
Contributor

climba03003 commented Jan 10, 2023

You still have something left inside the queue, so it will not emit end until the whole stream is consumed.

import { ReadableStream } from 'node:stream/web';
import { Readable } from 'node:stream';

const stream = Readable.fromWeb(new ReadableStream({
  start(controller) {
    controller.enqueue('foo'); // 1. ['foo']
    controller.close(); // 3. [null]
  },
}));

// since it is once only, the null is left behind to signal that the stream has ended.
stream.once('readable', () => {
  console.log(stream.read()?.toString()); // 2. [ ]
});

// if we use on, the null will be read and the stream will close properly
// stream.on('readable', () => {
//  console.log(stream.read()?.toString()); // 2. [ ] 4. [ ]
// });

stream.on('end', () => {
  console.log('ended');
});

@debadree25
Member Author

debadree25 commented Jan 10, 2023

Hi @climba03003, thank you so much for the explanation; using once was a mistake on my side. But now a question arises about reading the null part. For example, running this:

import { ReadableStream } from 'node:stream/web';

import { Readable } from 'node:stream';

const stream = Readable.fromWeb(new ReadableStream({
  start(controller) {
    controller.enqueue('foo');
    controller.close();
  },
}));

stream.on('readable', () => {
  console.log(stream.read()?.toString());
});

stream.on('end', () => {
  console.log('ended');
});

gives the output

foo
undefined
ended

whereas running this code with Readable

import { Readable } from 'node:stream';

const stream = new Readable({
    read() {
        this.push('foo');
        this.push(null);
    }
});

stream.on('readable', () => {
    console.log(stream.read()?.toString());
});

stream.on('end', () => {
    console.log('ended');
});

gives the output

foo
ended

Could the undefined that appears when using ReadableStream be an inconsistency/bug?

@climba03003
Contributor

climba03003 commented Jan 10, 2023

This is how .fromWeb works; ReadableStream is always Promise-based.
You can replace process.nextTick with Promise.resolve().then, which is closest to the actual behavior.

import { Readable } from 'node:stream';

let num = 0

const stream = new Readable({
  read() {
    // should be promise in ReadableStream
    // but we use nextTick to do the same effect
    if(num === 0) {
      process.nextTick(() => {
        this.push('foo') // 1. ['foo']
      })
    } else {
      process.nextTick(() => {
        this.push(null) // 3. [null] - here is the one left behind
      })
    }
    num++
  }
});

stream.once('readable', () => {
  console.log(stream.read()) // 2. [ ]
});

stream.on('end', () => {
  console.log('ended');
});
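
For reference, here is the same sketch with process.nextTick swapped for Promise.resolve().then as suggested above (an illustrative variant only); the leftover null behaves the same way:

import { Readable } from 'node:stream';

let num = 0

const stream = new Readable({
  read() {
    // defer each push to a microtask, closer to what fromWeb actually does
    if (num === 0) {
      Promise.resolve().then(() => {
        this.push('foo') // 1. ['foo']
      })
    } else {
      Promise.resolve().then(() => {
        this.push(null) // 3. [null] - still the one left behind
      })
    }
    num++
  }
});

stream.once('readable', () => {
  console.log(stream.read()) // 2. [ ]
});

stream.on('end', () => {
  console.log('ended');
});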

@debadree25
Member Author

debadree25 commented Jan 10, 2023

Understood!

So I guess the Promise-based behaviour of fromWeb() is causing a lone null to be left behind in the queue? And should we consider that to be a bug?

@debadree25
Member Author

debadree25 commented Jan 10, 2023

Also, if I understand correctly, pushing null should immediately trigger the end of the stream as per the code over here?

@climba03003
Contributor

climba03003 commented Jan 10, 2023

So I guess the promise based behaviour of fromWeb() is causing a lone null to be left behind in the queue?

Partially, yes. It is a consequence of how microtasks and synchronous code interact.

and should be consider that to be a bug?

Personally, no. It is a misconception of the code flow. But it requires a core member to identify whether it is a bug.

Also if I understand correctly pushing null should immediately trigger end of stream as per the code over here?

It would not accept more chunks, but that does not mean 'end' is emitted immediately, since the data is queued but never consumed.
'end' is only emitted when the queue is drained (no more data left inside the queue).
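
A minimal sketch of that last point (assuming a plain Readable with a no-op read()): push(null) stops further pushes, but 'end' only fires once the queued chunk is actually consumed:

import { Readable } from 'node:stream';

const stream = new Readable({ read() {} });

stream.push('foo');
stream.push(null); // no further chunks are accepted, but 'end' has not fired yet

stream.on('end', () => {
  console.log('ended');
});

// nothing is consuming the queue, so 'end' stays pending;
// draining it (e.g. by switching to flowing mode) lets 'end' fire
stream.resume();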

@debadree25
Member Author

Understood, thank you so much @climba03003!
I am leaving the issue open just in case any core member would like to opine here.

@nornagon
Contributor

nornagon commented Mar 7, 2023

I'm a bit confused; this seems like a bug to me:

const { Readable, PassThrough } = require('stream')

function getStream() {
  const body = new PassThrough();

  body.push('test');
  body.push(null);

  return body;
}

async function main() {
  const nodeStream = getStream()
  nodeStream.on('data', console.log)

  await new Promise(resolve => nodeStream.once('end', resolve))
  console.log('nodeStream done')

  const wrappedNodeStream = Readable.fromWeb(Readable.toWeb(getStream()))

  wrappedNodeStream.on('data', console.log)
  wrappedNodeStream.on('end', () => console.log('end')) // This event is never emitted!
}

main()

The above code produces:

<Buffer 74 65 73 74>
nodeStream done
<Buffer 74 65 73 74>

i.e. the 'end' event from the double-wrapped stream is never emitted.

How am I supposed to detect the end of such a stream? A null value never appears in the wrapped stream's output, and the 'readable' event is only fired once, with the data value.

@nornagon
Contributor

nornagon commented Mar 7, 2023

I think this is a bug in Readable.toWeb, actually, which never listens to 'end'. An even simpler demonstration of this bug:

const { Readable, PassThrough } = require('stream')

function getStream() {
  const body = new PassThrough();

  body.push('test');
  body.push(null);

  return body;
}

async function main() {
  const webStream = Readable.toWeb(getStream())
  for await (const v of webStream) console.log(v)
  console.log('here') // This line is never printed!
}

main()

The for await never terminates! The whole executable quits without ever printing here.

@climba03003
Contributor

climba03003 commented Mar 8, 2023

The for await never terminates! The whole executable quits without ever printing here.

Your problem is that you are using a Duplex.
A Duplex provides both a readable side and a writable side.

Readable.toWeb only consumes the readable side of the data, so it does not behave the way you expected.
The thing holding the for await open is the writable side, which was never ended.
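
A small sketch of that half-open state (using the stream's writableEnded flag to illustrate it): push(null) ends only the readable side, so the writable side of the PassThrough stays open:

const { PassThrough } = require('stream')

const body = new PassThrough();

body.push('test');
body.push(null); // ends the readable side only

body.resume(); // drain the readable side so its 'end' can fire

body.on('end', () => {
  // the readable side is done, but end() was never called on the writable side,
  // which is what keeps the converted web stream from closing
  console.log('writableEnded:', body.writableEnded); // false
});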

@nornagon
Contributor

nornagon commented Mar 9, 2023

@climba03003 yeah, that was it. Passing {allowHalfOpen: false} to PassThrough fixes the issue.
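
For completeness, a minimal version of the earlier toWeb repro with that fix applied; allowHalfOpen: false makes the stream end its writable side automatically when the readable side ends, so the conversion sees the stream finish:

const { Readable, PassThrough } = require('stream')

function getStream() {
  // allowHalfOpen: false ends the writable side once the readable side ends,
  // so the stream fully closes after push(null)
  const body = new PassThrough({ allowHalfOpen: false });

  body.push('test');
  body.push(null);

  return body;
}

async function main() {
  const webStream = Readable.toWeb(getStream())
  for await (const v of webStream) console.log(v)
  console.log('here') // should now be printed, per the comment above
}

main()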
