From 00f153da1352b941d8749758edb6083a6acda715 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 30 Oct 2018 16:17:33 -0700 Subject: [PATCH] http2: improve http2 code a bit Multiple general improvements to http2 internals for readability and efficiency [This backport applied to v10.x cleanly.] Backport-PR-URL: https://github.com/nodejs/node/pull/29123 PR-URL: https://github.com/nodejs/node/pull/23984 Reviewed-By: Anna Henningsen Reviewed-By: Trivikram Kamat Reviewed-By: Ujjwal Sharma Reviewed-By: Matteo Collina --- benchmark/http2/headers.js | 3 +- benchmark/http2/respond-with-fd.js | 6 +- benchmark/http2/simple.js | 6 +- lib/internal/http2/util.js | 44 ++-- src/node_http2.cc | 347 +++++++++++++++-------------- 5 files changed, 212 insertions(+), 194 deletions(-) diff --git a/benchmark/http2/headers.js b/benchmark/http2/headers.js index ad1eb50007a92d..4e6585bbfd5f92 100644 --- a/benchmark/http2/headers.js +++ b/benchmark/http2/headers.js @@ -40,8 +40,7 @@ function main({ n, nheaders }) { function doRequest(remaining) { const req = client.request(headersObject); - req.end(); - req.on('data', () => {}); + req.resume(); req.on('end', () => { if (remaining > 0) { doRequest(remaining - 1); diff --git a/benchmark/http2/respond-with-fd.js b/benchmark/http2/respond-with-fd.js index cc9992e8ca0bd5..b8c6e81fd24b04 100644 --- a/benchmark/http2/respond-with-fd.js +++ b/benchmark/http2/respond-with-fd.js @@ -7,9 +7,9 @@ const fs = require('fs'); const file = path.join(path.resolve(__dirname, '../fixtures'), 'alice.html'); const bench = common.createBenchmark(main, { - requests: [100, 1000, 10000, 100000], - streams: [100, 200, 1000], - clients: [1, 2], + requests: [100, 1000, 5000], + streams: [1, 10, 20, 40, 100, 200], + clients: [2], benchmarker: ['h2load'] }, { flags: ['--no-warnings', '--expose-http2'] }); diff --git a/benchmark/http2/simple.js b/benchmark/http2/simple.js index cf017e6735411e..ce80dfd769c3c5 100644 --- a/benchmark/http2/simple.js +++ b/benchmark/http2/simple.js @@ -6,9 +6,9 @@ const fs = require('fs'); const file = path.join(path.resolve(__dirname, '../fixtures'), 'alice.html'); const bench = common.createBenchmark(main, { - requests: [100, 1000, 10000, 100000], - streams: [100, 200, 1000], - clients: [1, 2], + requests: [100, 1000, 5000], + streams: [1, 10, 20, 40, 100, 200], + clients: [2], benchmarker: ['h2load'] }, { flags: ['--no-warnings', '--expose-http2'] }); diff --git a/lib/internal/http2/util.js b/lib/internal/http2/util.js index 94dc1198ea1060..c8701af616f327 100644 --- a/lib/internal/http2/util.js +++ b/lib/internal/http2/util.js @@ -430,14 +430,20 @@ function mapToHeaders(map, let count = 0; const keys = Object.keys(map); const singles = new Set(); - for (var i = 0; i < keys.length; i++) { - let key = keys[i]; - let value = map[key]; + let i; + let isArray; + let key; + let value; + let isSingleValueHeader; + let err; + for (i = 0; i < keys.length; i++) { + key = keys[i]; + value = map[key]; if (value === undefined || key === '') continue; key = key.toLowerCase(); - const isSingleValueHeader = kSingleValueHeaders.has(key); - let isArray = Array.isArray(value); + isSingleValueHeader = kSingleValueHeaders.has(key); + isArray = Array.isArray(value); if (isArray) { switch (value.length) { case 0: @@ -459,26 +465,26 @@ function mapToHeaders(map, singles.add(key); } if (key[0] === ':') { - const err = assertValuePseudoHeader(key); + err = assertValuePseudoHeader(key); if (err !== undefined) return err; ret = `${key}\0${value}\0${ret}`; count++; - } else { - if (isIllegalConnectionSpecificHeader(key, value)) { - return new ERR_HTTP2_INVALID_CONNECTION_HEADERS(key); - } - if (isArray) { - for (var k = 0; k < value.length; k++) { - const val = String(value[k]); - ret += `${key}\0${val}\0`; - } - count += value.length; - } else { - ret += `${key}\0${value}\0`; - count++; + continue; + } + if (isIllegalConnectionSpecificHeader(key, value)) { + return new ERR_HTTP2_INVALID_CONNECTION_HEADERS(key); + } + if (isArray) { + for (var k = 0; k < value.length; k++) { + const val = String(value[k]); + ret += `${key}\0${val}\0`; } + count += value.length; + continue; } + ret += `${key}\0${value}\0`; + count++; } return [ret, count]; diff --git a/src/node_http2.cc b/src/node_http2.cc index 6bd282593e28f5..b87f80218eed23 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -913,8 +913,10 @@ int Http2Session::OnBeginHeadersCallback(nghttp2_session* handle, Debug(session, "beginning headers for stream %d", id); Http2Stream* stream = session->FindStream(id); - if (stream == nullptr) { - if (!session->CanAddStream()) { + // The common case is that we're creating a new stream. The less likely + // case is that we're receiving a set of trailers + if (LIKELY(stream == nullptr)) { + if (UNLIKELY(!session->CanAddStream())) { // Too many concurrent streams being opened nghttp2_submit_rst_stream(**session, NGHTTP2_FLAG_NONE, id, NGHTTP2_ENHANCE_YOUR_CALM); @@ -942,7 +944,7 @@ int Http2Session::OnHeaderCallback(nghttp2_session* handle, // If stream is null at this point, either something odd has happened // or the stream was closed locally while header processing was occurring. // either way, do not proceed and close the stream. - if (stream == nullptr) + if (UNLIKELY(stream == nullptr)) return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; // If the stream has already been destroyed, ignore. @@ -957,7 +959,7 @@ int Http2Session::OnHeaderCallback(nghttp2_session* handle, // Called by nghttp2 when a complete HTTP2 frame has been received. There are -// only a handful of frame types tha we care about handling here. +// only a handful of frame types that we care about handling here. int Http2Session::OnFrameReceive(nghttp2_session* handle, const nghttp2_frame* frame, void* user_data) { @@ -1034,22 +1036,25 @@ int Http2Session::OnFrameNotSent(nghttp2_session* handle, Environment* env = session->env(); Debug(session, "frame type %d was not sent, code: %d", frame->hd.type, error_code); - // Do not report if the frame was not sent due to the session closing - if (error_code != NGHTTP2_ERR_SESSION_CLOSING && - error_code != NGHTTP2_ERR_STREAM_CLOSED && - error_code != NGHTTP2_ERR_STREAM_CLOSING) { - Isolate* isolate = env->isolate(); - HandleScope scope(isolate); - Local context = env->context(); - Context::Scope context_scope(context); - Local argv[3] = { - Integer::New(isolate, frame->hd.stream_id), - Integer::New(isolate, frame->hd.type), - Integer::New(isolate, error_code) - }; - session->MakeCallback(env->onframeerror_string(), arraysize(argv), argv); + // Do not report if the frame was not sent due to the session closing + if (error_code == NGHTTP2_ERR_SESSION_CLOSING || + error_code == NGHTTP2_ERR_STREAM_CLOSED || + error_code == NGHTTP2_ERR_STREAM_CLOSING) { + return 0; } + + Isolate* isolate = env->isolate(); + HandleScope scope(isolate); + Local context = env->context(); + Context::Scope context_scope(context); + + Local argv[3] = { + Integer::New(isolate, frame->hd.stream_id), + Integer::New(isolate, frame->hd.type), + Integer::New(isolate, error_code) + }; + session->MakeCallback(env->onframeerror_string(), arraysize(argv), argv); return 0; } @@ -1076,25 +1081,26 @@ int Http2Session::OnStreamClose(nghttp2_session* handle, Http2Stream* stream = session->FindStream(id); // Intentionally ignore the callback if the stream does not exist or has // already been destroyed - if (stream != nullptr && !stream->IsDestroyed()) { - stream->Close(code); - // It is possible for the stream close to occur before the stream is - // ever passed on to the javascript side. If that happens, skip straight - // to destroying the stream. We can check this by looking for the - // onstreamclose function. If it exists, then the stream has already - // been passed on to javascript. - Local fn = - stream->object()->Get(context, env->onstreamclose_string()) - .ToLocalChecked(); - if (fn->IsFunction()) { - Local argv[] = { - Integer::NewFromUnsigned(isolate, code) - }; - stream->MakeCallback(fn.As(), arraysize(argv), argv); - } else { - stream->Destroy(); - } + if (stream == nullptr || stream->IsDestroyed()) + return 0; + + stream->Close(code); + // It is possible for the stream close to occur before the stream is + // ever passed on to the javascript side. If that happens, skip straight + // to destroying the stream. We can check this by looking for the + // onstreamclose function. If it exists, then the stream has already + // been passed on to javascript. + Local fn = + stream->object()->Get(context, env->onstreamclose_string()) + .ToLocalChecked(); + + if (!fn->IsFunction()) { + stream->Destroy(); + return 0; } + + Local arg = Integer::NewFromUnsigned(isolate, code); + stream->MakeCallback(fn.As(), 1, &arg); return 0; } @@ -1127,53 +1133,56 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle, "%d, flags: %d", id, len, flags); Environment* env = session->env(); HandleScope scope(env->isolate()); + // We should never actually get a 0-length chunk so this check is // only a precaution at this point. - if (len > 0) { - // Notify nghttp2 that we've consumed a chunk of data on the connection - // so that it can send a WINDOW_UPDATE frame. This is a critical part of - // the flow control process in http2 - CHECK_EQ(nghttp2_session_consume_connection(handle, len), 0); - Http2Stream* stream = session->FindStream(id); - // If the stream has been destroyed, ignore this chunk - if (stream->IsDestroyed()) - return 0; - - stream->statistics_.received_bytes += len; - - // Repeatedly ask the stream's owner for memory, and copy the read data - // into those buffers. - // The typical case is actually the exception here; Http2StreamListeners - // know about the HTTP2 session associated with this stream, so they know - // about the larger from-socket read buffer, so they do not require copying. - do { - uv_buf_t buf = stream->EmitAlloc(len); - ssize_t avail = len; - if (static_cast(buf.len) < avail) - avail = buf.len; - - // `buf.base == nullptr` is the default Http2StreamListener's way - // of saying that it wants a pointer to the raw original. - // Since it has access to the original socket buffer from which the data - // was read in the first place, it can use that to minimize ArrayBuffer - // allocations. - if (LIKELY(buf.base == nullptr)) - buf.base = reinterpret_cast(const_cast(data)); - else - memcpy(buf.base, data, avail); - data += avail; - len -= avail; - stream->EmitRead(avail, buf); - - // If the stream owner (e.g. the JS Http2Stream) wants more data, just - // tell nghttp2 that all data has been consumed. Otherwise, defer until - // more data is being requested. - if (stream->IsReading()) - nghttp2_session_consume_stream(handle, id, avail); - else - stream->inbound_consumed_data_while_paused_ += avail; - } while (len != 0); - } + if (len == 0) + return 0; + + // Notify nghttp2 that we've consumed a chunk of data on the connection + // so that it can send a WINDOW_UPDATE frame. This is a critical part of + // the flow control process in http2 + CHECK_EQ(nghttp2_session_consume_connection(handle, len), 0); + Http2Stream* stream = session->FindStream(id); + // If the stream has been destroyed, ignore this chunk + if (stream->IsDestroyed()) + return 0; + + stream->statistics_.received_bytes += len; + + // Repeatedly ask the stream's owner for memory, and copy the read data + // into those buffers. + // The typical case is actually the exception here; Http2StreamListeners + // know about the HTTP2 session associated with this stream, so they know + // about the larger from-socket read buffer, so they do not require copying. + do { + uv_buf_t buf = stream->EmitAlloc(len); + ssize_t avail = len; + if (static_cast(buf.len) < avail) + avail = buf.len; + + // `buf.base == nullptr` is the default Http2StreamListener's way + // of saying that it wants a pointer to the raw original. + // Since it has access to the original socket buffer from which the data + // was read in the first place, it can use that to minimize ArrayBuffer + // allocations. + if (LIKELY(buf.base == nullptr)) + buf.base = reinterpret_cast(const_cast(data)); + else + memcpy(buf.base, data, avail); + data += avail; + len -= avail; + stream->EmitRead(avail, buf); + + // If the stream owner (e.g. the JS Http2Stream) wants more data, just + // tell nghttp2 that all data has been consumed. Otherwise, defer until + // more data is being requested. + if (stream->IsReading()) + nghttp2_session_consume_stream(handle, id, avail); + else + stream->inbound_consumed_data_while_paused_ += avail; + } while (len != 0); + return 0; } @@ -1435,7 +1444,7 @@ void Http2Session::HandleOriginFrame(const nghttp2_frame* frame) { nghttp2_extension ext = frame->ext; nghttp2_ext_origin* origin = static_cast(ext.payload); - Local holder = Array::New(isolate); + Local holder = Array::New(isolate); Local fn = env()->push_values_to_array_function(); Local argv[NODE_PUSH_VAL_TO_ARRAY_MAX]; @@ -1454,9 +1463,7 @@ void Http2Session::HandleOriginFrame(const nghttp2_frame* frame) { fn->Call(context, holder, j, argv).ToLocalChecked(); } - Local args[1] = { holder }; - - MakeCallback(env()->onorigin_string(), arraysize(args), args); + MakeCallback(env()->onorigin_string(), 1, &holder); } // Called by OnFrameReceived when a complete PING frame has been received. @@ -1469,9 +1476,8 @@ void Http2Session::HandlePingFrame(const nghttp2_frame* frame) { bool ack = frame->hd.flags & NGHTTP2_FLAG_ACK; if (ack) { Http2Ping* ping = PopPing(); - if (ping != nullptr) { - ping->Done(true, frame->ping.opaque_data); - } else { + + if (ping == nullptr) { // PING Ack is unsolicited. Treat as a connection error. The HTTP/2 // spec does not require this, but there is no legitimate reason to // receive an unsolicited PING ack on a connection. Either the peer @@ -1479,46 +1485,51 @@ void Http2Session::HandlePingFrame(const nghttp2_frame* frame) { // nonsense. arg = Integer::New(isolate, NGHTTP2_ERR_PROTO); MakeCallback(env()->error_string(), 1, &arg); + return; } - } else { - // Notify the session that a ping occurred - arg = Buffer::Copy(env(), - reinterpret_cast(frame->ping.opaque_data), - 8).ToLocalChecked(); - MakeCallback(env()->onping_string(), 1, &arg); + + ping->Done(true, frame->ping.opaque_data); + return; } + + // Notify the session that a ping occurred + arg = Buffer::Copy(env(), + reinterpret_cast(frame->ping.opaque_data), + 8).ToLocalChecked(); + MakeCallback(env()->onping_string(), 1, &arg); } // Called by OnFrameReceived when a complete SETTINGS frame has been received. void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) { bool ack = frame->hd.flags & NGHTTP2_FLAG_ACK; - if (ack) { - // If this is an acknowledgement, we should have an Http2Settings - // object for it. - Http2Settings* settings = PopSettings(); - if (settings != nullptr) { - settings->Done(true); - } else { - // SETTINGS Ack is unsolicited. Treat as a connection error. The HTTP/2 - // spec does not require this, but there is no legitimate reason to - // receive an unsolicited SETTINGS ack on a connection. Either the peer - // is buggy or malicious, and we're not going to tolerate such - // nonsense. - // Note that nghttp2 currently prevents this from happening for SETTINGS - // frames, so this block is purely defensive just in case that behavior - // changes. Specifically, unlike unsolicited PING acks, unsolicited - // SETTINGS acks should *never* make it this far. - Isolate* isolate = env()->isolate(); - HandleScope scope(isolate); - Local context = env()->context(); - Context::Scope context_scope(context); - Local arg = Integer::New(isolate, NGHTTP2_ERR_PROTO); - MakeCallback(env()->error_string(), 1, &arg); - } - } else { - // Otherwise, notify the session about a new settings + if (!ack) { + // This is not a SETTINGS acknowledgement, notify and return MakeCallback(env()->onsettings_string(), 0, nullptr); + return; } + + // If this is an acknowledgement, we should have an Http2Settings + // object for it. + Http2Settings* settings = PopSettings(); + if (settings != nullptr) { + settings->Done(true); + return; + } + // SETTINGS Ack is unsolicited. Treat as a connection error. The HTTP/2 + // spec does not require this, but there is no legitimate reason to + // receive an unsolicited SETTINGS ack on a connection. Either the peer + // is buggy or malicious, and we're not going to tolerate such + // nonsense. + // Note that nghttp2 currently prevents this from happening for SETTINGS + // frames, so this block is purely defensive just in case that behavior + // changes. Specifically, unlike unsolicited PING acks, unsolicited + // SETTINGS acks should *never* make it this far. + Isolate* isolate = env()->isolate(); + HandleScope scope(isolate); + Local context = env()->context(); + Context::Scope context_scope(context); + Local arg = Integer::New(isolate, NGHTTP2_ERR_PROTO); + MakeCallback(env()->error_string(), 1, &arg); } // Callback used when data has been written to the stream. @@ -1540,7 +1551,10 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) { // queue), but only if a write has not already been scheduled. void Http2Session::MaybeScheduleWrite() { CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0); - if (session_ != nullptr && nghttp2_session_want_write(session_)) { + if (UNLIKELY(session_ == nullptr)) + return; + + if (nghttp2_session_want_write(session_)) { HandleScope handle_scope(env()->isolate()); Debug(this, "scheduling write"); flags_ |= SESSION_STATE_WRITE_SCHEDULED; @@ -1599,7 +1613,7 @@ void Http2Session::ClearOutgoing(int status) { for (int32_t stream_id : current_pending_rst_streams) { Http2Stream* stream = FindStream(stream_id); - if (stream != nullptr) + if (LIKELY(stream != nullptr)) stream->FlushRstStream(); } } @@ -1774,7 +1788,7 @@ Http2Stream* Http2Session::SubmitRequest( Http2Stream::Provider::Stream prov(options); *ret = nghttp2_submit_request(session_, prispec, nva, len, *prov, nullptr); CHECK_NE(*ret, NGHTTP2_ERR_NOMEM); - if (*ret > 0) + if (LIKELY(*ret > 0)) stream = new Http2Stream(this, *ret, NGHTTP2_HCAT_HEADERS, options); return stream; } @@ -1789,59 +1803,58 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { IncrementCurrentSessionMemory(buf.len); CHECK(stream_buf_ab_.IsEmpty()); + OnScopeLeave on_scope_leave([&]() { + // Once finished handling this write, reset the stream buffer. + // The memory has either been free()d or was handed over to V8. + DecrementCurrentSessionMemory(buf.len); + stream_buf_ab_ = Local(); + stream_buf_ = uv_buf_init(nullptr, 0); + }); + + // Only pass data on if nread > 0 if (nread <= 0) { free(buf.base); if (nread < 0) { PassReadErrorToPreviousListener(nread); } - } else { - // Only pass data on if nread > 0 - - // Makre sure that there was no read previously active. - CHECK_NULL(stream_buf_.base); - CHECK_EQ(stream_buf_.len, 0); - - // Remember the current buffer, so that OnDataChunkReceived knows the - // offset of a DATA frame's data into the socket read buffer. - stream_buf_ = uv_buf_init(buf.base, nread); - - // Verify that currently: There is memory allocated into which - // the data has been read, and that memory buffer is at least as large - // as the amount of data we have read, but we have not yet made an - // ArrayBuffer out of it. - CHECK_LE(static_cast(nread), stream_buf_.len); - - Isolate* isolate = env()->isolate(); - - // Create an array buffer for the read data. DATA frames will be emitted - // as slices of this array buffer to avoid having to copy memory. - stream_buf_ab_ = - ArrayBuffer::New(isolate, - buf.base, - nread, - v8::ArrayBufferCreationMode::kInternalized); - - statistics_.data_received += nread; - ssize_t ret = Write(&stream_buf_, 1); - - if (ret < 0) { - Debug(this, "fatal error receiving data: %d", ret); - - Local argv[] = { - Integer::New(isolate, ret), - }; - MakeCallback(env()->error_string(), arraysize(argv), argv); - } else { - MaybeStopReading(); - } + return; } - // Since we are finished handling this write, reset the stream buffer. - // The memory has either been free()d or was handed over to V8. - DecrementCurrentSessionMemory(buf.len); + // Make sure that there was no read previously active. + CHECK_NULL(stream_buf_.base); + CHECK_EQ(stream_buf_.len, 0); + + // Remember the current buffer, so that OnDataChunkReceived knows the + // offset of a DATA frame's data into the socket read buffer. + stream_buf_ = uv_buf_init(buf.base, nread); + + // Verify that currently: There is memory allocated into which + // the data has been read, and that memory buffer is at least as large + // as the amount of data we have read, but we have not yet made an + // ArrayBuffer out of it. + CHECK_LE(static_cast(nread), stream_buf_.len); + + Isolate* isolate = env()->isolate(); - stream_buf_ab_ = Local(); - stream_buf_ = uv_buf_init(nullptr, 0); + // Create an array buffer for the read data. DATA frames will be emitted + // as slices of this array buffer to avoid having to copy memory. + stream_buf_ab_ = + ArrayBuffer::New(isolate, + buf.base, + nread, + v8::ArrayBufferCreationMode::kInternalized); + + statistics_.data_received += nread; + ssize_t ret = Write(&stream_buf_, 1); + + if (UNLIKELY(ret < 0)) { + Debug(this, "fatal error receiving data: %d", ret); + Local arg = Integer::New(isolate, ret); + MakeCallback(env()->error_string(), 1, &arg); + return; + } + + MaybeStopReading(); } bool Http2Session::HasWritesOnSocketForStream(Http2Stream* stream) {