Readable byte stream must call pull after receiving new chunk #877

Closed
MattiasBuelens opened this issue Feb 6, 2018 · 11 comments

@MattiasBuelens
Collaborator

MattiasBuelens commented Feb 6, 2018

After a readable byte stream receives a new chunk from the underlying source, the stream must call pull (if needed). Otherwise, the stream may get stuck (see test below).

For regular streams, this already works: ReadableStreamDefaultController calls ReadableStreamDefaultControllerCallPullIfNeeded. However, byte streams do not call the corresponding ReadableByteStreamControllerCallPullIfNeeded in response to enqueue, respond or respondWith.

Test

The following test (written as a Web platform test) demonstrates the problem. Here, a readable byte stream is constructed that will enqueue a new chunk every time pull is called, and closes the stream after 3 chunks. (This test is derived from the existing test "ReadableStream with byte source: Respond to pull() by enqueue() asynchronously", but enqueues its chunks in separate pulls instead.)

This test times out with the current reference implementation. The first read resolves, but the other reads remain pending. pull is called only once, rather than the expected 4 times (3 times to enqueue a chunk, and a final time to close the stream).

However, if the test is changed to use regular readable streams instead of byte streams, the test passes (see the line marked with a comment in the code below).

promise_test(() => {
  let pullCount = 0;

  let byobRequest;
  const desiredSizes = [];

  const stream = new ReadableStream({
    pull(c) {
      byobRequest = c.byobRequest;
      desiredSizes.push(c.desiredSize);

      if (pullCount < 3) {
        c.enqueue(new Uint8Array(1));
      } else {
        c.close();
      }

      ++pullCount;
    },
    type: 'bytes' // <<< removing this line makes the test pass
  }, {
    highWaterMark: 256
  });

  const reader = stream.getReader();

  const p0 = reader.read();
  const p1 = reader.read();
  const p2 = reader.read();

  assert_equals(pullCount, 0, 'No pull as start() just finished and is not yet reflected to the state of the stream');

  return Promise.all([p0, p1, p2]).then(result => {
    assert_equals(pullCount, 4, 'pullCount after completion of all read()s');

    assert_equals(result[0].done, false, 'result[0].done');
    assert_equals(result[0].value.byteLength, 1, 'result[0].value.byteLength');
    assert_equals(result[1].done, false, 'result[1].done');
    assert_equals(result[1].value.byteLength, 1, 'result[1].value.byteLength');
    assert_equals(result[2].done, false, 'result[2].done');
    assert_equals(result[2].value.byteLength, 1, 'result[2].value.byteLength');
    assert_equals(byobRequest, undefined, 'byobRequest should be undefined');
    assert_equals(desiredSizes[0], 256, 'desiredSize on pull should be 256');
    assert_equals(desiredSizes[1], 256, 'desiredSize after 1st enqueue() should be 256');
    assert_equals(desiredSizes[2], 256, 'desiredSize after 2nd enqueue() should be 256');
    assert_equals(desiredSizes[3], 256, 'desiredSize after 3rd enqueue() should be 256');
  });
}, 'ReadableStream with byte source: Respond to multiple pull() by separate enqueue()');

Proposed solution

I believe the fix should be to add calls to ReadableByteStreamControllerCallPullIfNeeded at the end of:

  • ReadableByteStreamControllerEnqueue (handles enqueue)
  • ReadableByteStreamControllerRespondInternal (handles respond and respondWith)

If you want, I can make a pull request that does just that. The above test can be added directly to the Web platform tests, but I'd also need to make similar tests for respond and respondWith.
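
To make the proposed change concrete, here is a toy, synchronous model of the pull scheduling. It is a sketch and not the spec algorithms: `runToyStream`, its state fields, and the synchronous `pull` are all simplifying assumptions (real streams run `pull` asynchronously, and the queue is measured in bytes rather than chunks). With the extra call after `enqueue` disabled, the model stalls after one pull, like the test above; with it enabled, it reaches four pulls.

```javascript
// Toy model only: hypothetical names, synchronous pull, no BYOB support.
function runToyStream(callPullAfterEnqueue, highWaterMark = 256) {
  const state = {
    queue: [],          // chunks waiting for a reader (1 byte each here)
    pendingReads: 3,    // three read() calls issued before start() finished
    fulfilledReads: 0,
    closed: false,
    pulling: false,     // a pull() call is in progress
    pullAgain: false,   // another pull was requested while pulling
    pullCount: 0
  };

  // Underlying source from the test: one chunk per pull, close on the 4th.
  let sourcePulls = 0;
  const source = {
    pull(controller) {
      if (sourcePulls < 3) controller.enqueue(new Uint8Array(1));
      else controller.close();
      ++sourcePulls;
    }
  };

  const controller = {
    enqueue(chunk) {
      if (state.pendingReads > 0) {
        --state.pendingReads;      // hand the chunk straight to a pending read
        ++state.fulfilledReads;
      } else {
        state.queue.push(chunk);
      }
      // The proposed fix: re-check whether pull is needed after a new chunk.
      if (callPullAfterEnqueue) callPullIfNeeded();
    },
    close() {
      state.closed = true;
    }
  };

  function shouldCallPull() {
    if (state.closed) return false;
    return state.pendingReads > 0 || state.queue.length < highWaterMark;
  }

  function callPullIfNeeded() {
    if (!shouldCallPull()) return;
    if (state.pulling) {
      state.pullAgain = true;      // defer until the current pull finishes
      return;
    }
    state.pulling = true;
    ++state.pullCount;
    source.pull(controller);
    state.pulling = false;
    if (state.pullAgain) {
      state.pullAgain = false;
      callPullIfNeeded();
    }
  }

  callPullIfNeeded();              // what happens once start() has completed
  return { pullCount: state.pullCount, fulfilledReads: state.fulfilledReads };
}

console.log(runToyStream(false)); // { pullCount: 1, fulfilledReads: 1 }
console.log(runToyStream(true));  // { pullCount: 4, fulfilledReads: 3 }
```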

@MattiasBuelens
Collaborator Author

MattiasBuelens commented Feb 6, 2018

Maybe some background information, in case you're wondering how I ended up here...

I'm currently working on an adapter library between native streams (e.g. from a fetch Response.body) and polyfilled streams. Since many browsers are not yet up-to-date with the latest version of the spec, some features (e.g. pipeTo in Edge or readable byte streams in Chrome) are not yet available in these native streams. As such, users are in an awkward position when they want to use the latest features from a polyfill implementation together with streams created by the browser.

My idea is to implement helper functions that create an underlying source/sink that pulls from/writes to a given (native) readable/writable stream. This way, the created source/sink can be passed to a polyfilled stream constructor, resulting in a polyfilled stream that pulls from/writes to a native stream. Under the hood, the created source/sink implementations request a reader/writer from the original stream and use that to respond to pull/write calls.
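
A minimal sketch of that idea for the readable side, assuming the wrapped stream exposes the standard getReader(), read() and cancel() methods. The name `createAdapterSource` is hypothetical and is not the API of the library described here; the sketch also ignores byte streams and BYOB readers.

```javascript
// Hypothetical helper: builds an underlying source that pulls from an
// existing readable stream through a reader acquired from that stream.
function createAdapterSource(innerStream) {
  const reader = innerStream.getReader();
  return {
    // Each pull() forwards exactly one read() to the wrapped stream.
    async pull(controller) {
      const { done, value } = await reader.read();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
    // Propagate cancellation to the wrapped stream.
    cancel(reason) {
      return reader.cancel(reason);
    }
  };
}
```

The returned object could then be passed to a polyfilled ReadableStream constructor in place of a hand-written underlying source.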

So far, I have a somewhat functional adapter that creates an underlying source pulling from a given readable stream. To verify this adapter, I've created a new ReadableStream class whose constructor does the following:

  1. Construct a (native or polyfilled) ReadableStream using the passed underlying source.
  2. Construct an adapting source that wraps that stream (using my adapter).
  3. Construct a polyfilled ReadableStream using the adapting source.

So you end up with something like:

polyfill stream -> adapter source -> native stream -> original source

And then I try to get this ReadableStream implementation to pass all Web platform tests for streams! 😆 Incredibly, many tests are already passing, but I got stumped by this one test called "ReadableStream with byte source: Respond to pull() by enqueue() asynchronously". From my investigation, it's not my adapting source that's causing the test to fail, but a bug in the reference implementation and in the spec itself.

@ricea
Collaborator

ricea commented Feb 7, 2018

I also think this test should pass.

@tyoshino, you're the expert on byte streams. Should this work?

@MattiasBuelens
Collaborator Author

MattiasBuelens commented Feb 7, 2018

Hmm, looks like this might be slightly trickier than I initially thought. With my proposed fix, two tests fail:

  × ReadableStream with byte source: autoAllocateChunkSize
  
    assert_equals: pull() should only be invoked once expected 1 but got 2
        at Promise.resolve.then.then.result (http://127.0.0.1:60503/streams/readable-byte-streams/general.js:349:5)

  × ReadableStream with byte source: Respond to pull() by enqueue() asynchronously
  
    assert_equals: pullCount after completion of all read()s expected 1 but got 2
        at Promise.all.then.result (http://127.0.0.1:60503/streams/readable-byte-streams/general.js:710:5)

The first test seems wrong to me. There are two read() calls, pull() calls respond() once, so I'd expect two pull()s are needed to fulfill both reads. However, the assert seems to expect only one pull? Why is that?

The second test seems like a bug with my fix. Here, there are three reads, and three chunks have been enqueued after the first pull() call finishes, so there's no point in calling pull() a second time. It looks like I need to tweak this a bit...

EDIT1: One solution for the second test failure would be to check ShouldCallPull again if pullAgain is set after pull()ing, for example:

if (controller._pullAgain === true && ReadableStreamDefaultControllerShouldCallPull(controller)) {
  controller._pullAgain = false;
  return ReadableStreamDefaultControllerCallPullIfNeeded(controller);
}

Alternatively, we'd need to somehow unset the pullAgain flag under the appropriate conditions.

EDIT2: My bad, I've been messing around with the tests too much. I accidentally added a close() in the second test, which wasn't there in the original test.

The 3 initial reads can be fulfilled by one pull() call. However, since the stream hasn't reached its high water mark of 256 bytes yet, it should immediately call pull() again. There's a race between the 3 reads resolving and the next pull() starting, and it just so happens that the pull() happens first. Thus, pullCount is already 2 by the time the test sees the results of the 3 reads.

Interestingly, since the high water mark is 256, the stream will keep calling pull() long after the test is done! Should this test really keep pull()ing so much?

EDIT3: Forget about edit 1, CallPullIfNeeded already calls ShouldCallPull, so this extra check wouldn't change anything.

@ricea
Collaborator

ricea commented Feb 8, 2018

> Interestingly, since the high water mark is 256, the stream will keep calling pull() long after the test is done! Should this test really keep pull()ing so much?

If pull() doesn't enqueue() then it should not be called again until something else happens (eg. a read() or an enqueue()).

I think the correct behaviour may depend on what kind of reader is in use. If the stream is locked to a byob reader then we'd like to wait to see whether a buffer is provided to read into. For a default reader we just want to fill the queue ASAP. I assume this is the reason why the behaviour with a default reader is a little off.

@tschneidereit I know you have been looking at the byte stream code recently. What do you think?

@MattiasBuelens
Collaborator Author

MattiasBuelens commented Feb 8, 2018

@ricea Ah, I didn't think about BYOB requests. Yes, it'd be more efficient to wait until we know whether the next read comes from a default reader or a BYOB reader. However, wouldn't that imply that a byte stream locked to a BYOB reader will never attempt to fill its queue up to the high water mark? Also, what should happen if the stream isn't locked (yet)?

I don't think the stream's buffering should depend on the type of its current reader. If a user really wants all reads from a byte stream to be as efficient as possible, then they can just set the stream's high water mark to 0. That way, the stream will only pull when there are pending read requests.
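
As a rough illustration of why a high water mark of 0 gives this behaviour, the decision to pull can be paraphrased as below. This is a simplified sketch, not the spec text: the function and field names are hypothetical, and pending default reads and BYOB reads are folded into a single `pendingReads` count.

```javascript
// Simplified paraphrase of the pull decision; all names are hypothetical.
function shouldCallPull({ started, closeRequested, pendingReads, queuedBytes, highWaterMark }) {
  if (!started || closeRequested) return false;
  if (pendingReads > 0) return true;          // a reader is waiting for data
  const desiredSize = highWaterMark - queuedBytes;
  return desiredSize > 0;                     // otherwise, only fill free queue space
}

const idle = { started: true, closeRequested: false, pendingReads: 0, queuedBytes: 0 };
console.log(shouldCallPull({ ...idle, highWaterMark: 0 }));                  // false
console.log(shouldCallPull({ ...idle, highWaterMark: 0, pendingReads: 1 })); // true
console.log(shouldCallPull({ ...idle, highWaterMark: 256 }));                // true
```

With a high water mark of 0 the desired size is never positive, so in this model only a pending read can trigger a pull.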

@tschneidereit
Contributor

@ricea, I'm on PTO and won't be able to spend too much time on digging into this until the week after next at the earliest, I'm afraid. Please don't gate any decisions here on my feedback if you want things to progress earlier than that.

@ricea
Collaborator

ricea commented Feb 9, 2018

@tschneidereit Don't worry about it. I'm happy to take the time to fix this properly.

@ricea
Collaborator

ricea commented Feb 15, 2018

> I don't think the stream's buffering should depend on the type of its current reader. If a user really wants all reads from a byte stream to be as efficient as possible, then they can just set the stream's high water mark to 0. That way, the stream will only pull when there are pending read requests.

I'm concerned that this may leave the creators of readable byte streams in the uncomfortable position of having to choose between optimal behaviour for byob readers or optimal behaviour for default readers. However, I agree that depending on the type of the current reader is too problematic.

Do you think you can get "always pull() as long as the queue has space and progress is happening" semantics working? Even if it's not working, it would be helpful to see a work-in-progress that I could work from.

@MattiasBuelens
Collaborator Author

The proposed fix has those semantics: that's exactly what CallPullIfNeeded achieves. I'll submit a pull request later today that implements this fix, so you can try it out yourself.

I feel like these should be the desired semantics. They also match the proposed detailed description for the underlying source API in #875:

> pull(controller)
>
> A function that is called whenever the stream’s internal queue of chunks becomes not full, i.e. whenever the queue’s desired size becomes positive. Generally, it will be called repeatedly until the queue reaches its high water mark (i.e. until the desired size becomes non-positive).
>
> This function will not be called until start() successfully completes. Additionally, it will only be called repeatedly if it enqueues at least one chunk or fulfills a BYOB request; a no-op pull() implementation will not be continually called.

@domenic
Member

domenic commented Mar 8, 2018

@MattiasBuelens are you still interested in a PR for fixing this?

@MattiasBuelens
Collaborator Author

MattiasBuelens commented Mar 8, 2018

@domenic Sure, I'll submit a PR with my proposed solution. I thought I already did, but I guess I forgot. 😛

ricea pushed a commit that referenced this issue Mar 22, 2018
After a readable byte stream receives a new chunk from the underlying source, the stream must call pull() (if needed). Otherwise, the stream may get stuck.

Add calls to ReadableByteStreamControllerCallPullIfNeeded at the end of ReadableByteStreamControllerEnqueue and ReadableByteStreamControllerRespondInternal to ensure this.

Fixes #877.