Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v10.x] http2: support non-empty DATA frame with END_STREAM flag #34857

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions doc/api/http2.md
Original file line number Diff line number Diff line change
Expand Up @@ -903,8 +903,9 @@ the value is `undefined`, the stream is not yet ready for use.
All [`Http2Stream`][] instances are destroyed either when:

* An `RST_STREAM` frame for the stream is received by the connected peer,
and pending data has been read.
* The `http2stream.close()` method is called, and pending data has been read.
and (for client streams only) pending data has been read.
* The `http2stream.close()` method is called, and (for client streams only)
pending data has been read.
* The `http2stream.destroy()` or `http2session.destroy()` methods are called.

When an `Http2Stream` instance is destroyed, an attempt will be made to send an
Expand Down
15 changes: 8 additions & 7 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
// quite expensive. This is a potential performance optimization target later.
ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_NOT_NULL(stream_buf_.base);
CHECK_LT(stream_buf_offset_, stream_buf_.len);
CHECK_LE(stream_buf_offset_, stream_buf_.len);
size_t read_len = stream_buf_.len - stream_buf_offset_;

// multiple side effects.
Expand All @@ -903,11 +903,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_GT(ret, 0);
CHECK_LE(static_cast<size_t>(ret), read_len);

if (static_cast<size_t>(ret) < read_len) {
// Mark the remainder of the data as available for later consumption.
stream_buf_offset_ += ret;
return ret;
}
// Mark the remainder of the data as available for later consumption.
// Even if all bytes were received, a paused stream may delay the
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
stream_buf_offset_ += ret;
return ret;
}

// We are done processing the current input chunk.
Expand Down Expand Up @@ -1241,6 +1241,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) {
CHECK_NE(session->flags_ & SESSION_STATE_READING_STOPPED, 0);
session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED;
Debug(session, "receive paused");
return NGHTTP2_ERR_PAUSE;
}

Expand Down Expand Up @@ -2452,7 +2453,7 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle,
return NGHTTP2_ERR_DEFERRED;
}

if (stream->queue_.empty() && !stream->IsWritable()) {
if (stream->available_outbound_length_ == 0 && !stream->IsWritable()) {
Debug(session, "no more data for stream %d", id);
*flags |= NGHTTP2_DATA_FLAG_EOF;
if (stream->HasTrailers()) {
Expand Down
56 changes: 39 additions & 17 deletions test/parallel/test-http2-misbehaving-multiplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Flags: --expose-internals

const common = require('../common');
const assert = require('assert');

if (!common.hasCrypto)
common.skip('missing crypto');
Expand All @@ -13,16 +14,36 @@ const h2test = require('../common/http2');
let client;

const server = h2.createServer();
let gotFirstStreamId1;
server.on('stream', common.mustCall((stream) => {
stream.respond();
stream.end('ok');

// the error will be emitted asynchronously
stream.on('error', common.expectsError({
type: NghttpError,
code: 'ERR_HTTP2_ERROR',
message: 'Stream was already closed or invalid'
}));
// Http2Server should be fast enough to respond to and close
// the first streams with ID 1 and ID 3 without errors.

// Test for errors in 'close' event to ensure no errors on some streams.
stream.on('error', () => {});
stream.on('close', (err) => {
if (stream.id === 1) {
if (gotFirstStreamId1) {
// We expect our outgoing frames to fail on Stream ID 1 the second time
// because a stream with ID 1 was already closed before.
common.expectsError({
constructor: NghttpError,
code: 'ERR_HTTP2_ERROR',
message: 'Stream was already closed or invalid'
});
return;
}
gotFirstStreamId1 = true;
}
assert.strictEqual(err, undefined);
});

// Stream ID 5 should never reach the server
assert.notStrictEqual(stream.id, 5);

}, 2));

server.on('session', common.mustCall((session) => {
Expand All @@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => {

const settings = new h2test.SettingsFrame();
const settingsAck = new h2test.SettingsFrame(true);
const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
// HeadersFrame(id, payload, padding, END_STREAM)
const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);

server.listen(0, () => {
client = net.connect(server.address().port, () => {
client.write(h2test.kClientMagic, () => {
client.write(settings.data, () => {
client.write(settingsAck.data);
// This will make it ok.
client.write(head1.data, () => {
// This will make it ok.
client.write(head2.data, () => {
// Stream ID 1 frame will make it OK.
client.write(id1.data, () => {
// Stream ID 3 frame will make it OK.
client.write(id3.data, () => {
// A second Stream ID 1 frame should fail.
// This will cause an error to occur because the client is
// attempting to reuse an already closed stream. This must
// cause the server session to be torn down.
client.write(head3.data, () => {
// This won't ever make it to the server
client.write(head4.data);
client.write(id1.data, () => {
// This Stream ID 5 frame will never make it to the server
client.write(id5.data);
});
});
});
Expand Down
65 changes: 65 additions & 0 deletions test/parallel/test-http2-pack-end-stream-flag.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');

const { PerformanceObserver } = require('perf_hooks');

const server = http2.createServer();

server.on('stream', (stream, headers) => {
stream.respond({
'content-type': 'text/html',
':status': 200
});
switch (headers[':path']) {
case '/singleEnd':
stream.end('OK');
// Backport v10.x: Manually pack END_STREAM flag
stream._final(() => {});
break;
case '/sequentialEnd':
stream.write('OK');
stream.end();
// Backport v10.x: Manually pack END_STREAM flag
stream._final(() => {});
break;
case '/delayedEnd':
stream.write('OK', () => stream.end());
break;
}
});

function testRequest(path, targetFrameCount, callback) {
const obs = new PerformanceObserver((list, observer) => {
const entry = list.getEntries()[0];
if (entry.name !== 'Http2Session') return;
if (entry.type !== 'client') return;
assert.strictEqual(entry.framesReceived, targetFrameCount);
observer.disconnect();
callback();
});
obs.observe({ entryTypes: ['http2'] });
const client = http2.connect(`http://localhost:${server.address().port}`, () => {
const req = client.request({ ':path': path });
req.resume();
req.end();
req.on('end', () => client.close());
});
}

// SETTINGS => SETTINGS => HEADERS => DATA
const MIN_FRAME_COUNT = 4;

server.listen(0, () => {
testRequest('/singleEnd', MIN_FRAME_COUNT, () => {
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => {
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => {
server.close();
});
});
});
});