Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/quic/streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,9 @@ void Stream::EndWriting() {
void Stream::EntryRead(size_t amount) {
// Called when the JS consumer reads data from the inbound DataQueue.
// Extend the flow control window so the sender can transmit more.
//
Comment thread
pimterry marked this conversation as resolved.
Outdated
if (session().is_destroyed()) return;
Session::SendPendingDataScope send_scope(&session());
session().ExtendStreamOffset(id(), amount);
session().ExtendOffset(amount);
}
Expand Down
63 changes: 63 additions & 0 deletions test/parallel/test-quic-stream-read-after-blocked.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Flags: --experimental-quic --experimental-stream-iter --no-warnings

// Test: a sender blocked on flow control is resumed once the consumer
// starts reading. This confirms that we do flush MAX_STREAM_DATA
// frames as expected when the consumer reads.

import { hasQuic, skip, mustCall, mustCallAtLeast } from '../common/index.mjs';
import { setTimeout as delay } from 'node:timers/promises';
import assert from 'node:assert';

const { strictEqual, deepStrictEqual } = assert;

if (!hasQuic) {
skip('QUIC is not enabled');
}

const { listen, connect } = await import('../common/quic.mjs');
const { bytes } = await import('stream/iter');

// Larger than the default 256 KB stream flow-control window:
const size = 1024 * 1024;
const expected = new Uint8Array(size);
for (let i = 0; i < size; i++) expected[i] = i & 0xff;

const senderBlocked = Promise.withResolvers();
const done = Promise.withResolvers();

const serverEndpoint = await listen(mustCall((serverSession) => {
serverSession.onstream = mustCall(async (stream) => {
stream.onblocked = mustCallAtLeast(() => senderBlocked.resolve(stream), 1);
stream.setBody(expected);
await stream.closed;
});
}));

const clientSession = await connect(serverEndpoint.address);
await clientSession.opened;

// Write a byte to open the stream:
const stream = await clientSession.createBidirectionalStream();
await stream.writer.write(new Uint8Array([1]));

// Wait until the sender has filled the window and blocked.
const serverStream = await senderBlocked.promise;

// Poll until everything has been acked, so the stream is fully idle:
while (serverStream.stats.maxOffsetAcknowledged !== serverStream.stats.bytesSent) {
await delay(1);
}

// Try to read:
const received = await bytes(stream);
strictEqual(received.byteLength, expected.byteLength);
deepStrictEqual(received, expected);

stream.writer.endSync();
await stream.closed;
clientSession.close();
done.resolve();

await done.promise;
await clientSession.closed;
Comment thread
pimterry marked this conversation as resolved.
Outdated
await serverEndpoint.close();
Loading