Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: only schedule write when necessary #17183

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -483,8 +483,15 @@ Environment::scheduled_immediate_count() {
return scheduled_immediate_count_;
}

void Environment::SetImmediate(native_immediate_callback cb, void* data) {
native_immediate_callbacks_.push_back({ cb, data });
void Environment::SetImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj) {
native_immediate_callbacks_.push_back({
cb,
data,
std::unique_ptr<v8::Persistent<v8::Object>>(
obj.IsEmpty() ? nullptr : new v8::Persistent<v8::Object>(isolate_, obj))
});
if (scheduled_immediate_count_[0] == 0)
ActivateImmediateCheck();
scheduled_immediate_count_[0] = scheduled_immediate_count_[0] + 1;
Expand Down
2 changes: 2 additions & 0 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ void Environment::RunAndClearNativeImmediates() {
native_immediate_callbacks_.swap(list);
for (const auto& cb : list) {
cb.cb_(this, cb.data_);
if (cb.keep_alive_)
cb.keep_alive_->Reset();
}

#ifdef DEBUG
Expand Down
7 changes: 6 additions & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,11 @@ class Environment {
bool EmitNapiWarning();

typedef void (*native_immediate_callback)(Environment* env, void* data);
inline void SetImmediate(native_immediate_callback cb, void* data);
// cb will be called as cb(env, data) on the next event loop iteration.
// obj will be kept alive between now and after the callback has run.
inline void SetImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj = v8::Local<v8::Object>());
// This needs to be available for the JS-land setImmediate().
void ActivateImmediateCheck();

Expand Down Expand Up @@ -751,6 +755,7 @@ class Environment {
struct NativeImmediateCallback {
native_immediate_callback cb_;
void* data_;
std::unique_ptr<v8::Persistent<v8::Object>> keep_alive_;
};
std::vector<NativeImmediateCallback> native_immediate_callbacks_;
void RunAndClearNativeImmediates();
Expand Down
107 changes: 66 additions & 41 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,26 @@ const Http2Session::Callbacks Http2Session::callback_struct_saved[2] = {
Callbacks(false),
Callbacks(true)};

Http2Scope::Http2Scope(Http2Stream* stream) : Http2Scope(stream->session()) {}

Http2Scope::Http2Scope(Http2Session* session) {
if (session->flags_ & (SESSION_STATE_HAS_SCOPE |
SESSION_STATE_WRITE_SCHEDULED)) {
// There is another scope further below on the stack, or it is already
// known that a write is scheduled. In either case, there is nothing to do.
return;
}
session->flags_ |= SESSION_STATE_HAS_SCOPE;
session_ = session;
}

Http2Scope::~Http2Scope() {
if (session_ == nullptr)
return;

session_->flags_ &= ~SESSION_STATE_HAS_SCOPE;
session_->MaybeScheduleWrite();
}

Http2Options::Http2Options(Environment* env) {
nghttp2_option_new(&options_);
Expand Down Expand Up @@ -346,8 +366,6 @@ Http2Session::Http2Session(Environment* env,
// be catching before it gets this far. Either way, crash if this
// fails.
CHECK_EQ(fn(&session_, callbacks, this, *opts), 0);

Start();
}


Expand All @@ -356,40 +374,6 @@ Http2Session::~Http2Session() {
Close();
}

// For every node::Http2Session instance, there is a uv_prepare_t handle
// whose callback is triggered on every tick of the event loop. When
// run, nghttp2 is prompted to send any queued data it may have stored.
// TODO(jasnell): Currently, this creates one uv_prepare_t per Http2Session,
// we should investigate to see if it's faster to create a
// single uv_prepare_t for all Http2Sessions, then iterate
// over each.
void Http2Session::Start() {
prep_ = new uv_prepare_t();
uv_prepare_init(env()->event_loop(), prep_);
prep_->data = static_cast<void*>(this);
uv_prepare_start(prep_, [](uv_prepare_t* t) {
Http2Session* session = static_cast<Http2Session*>(t->data);
HandleScope scope(session->env()->isolate());
Context::Scope context_scope(session->env()->context());

// Sending data may call arbitrary JS code, so keep track of
// async context.
InternalCallbackScope callback_scope(session);
session->SendPendingData();
});
}

// Stop the uv_prep_t from further activity, destroy the handle
void Http2Session::Stop() {
DEBUG_HTTP2SESSION(this, "stopping uv_prep_t handle");
CHECK_EQ(uv_prepare_stop(prep_), 0);
auto prep_close = [](uv_handle_t* handle) {
delete reinterpret_cast<uv_prepare_t*>(handle);
};
uv_close(reinterpret_cast<uv_handle_t*>(prep_), prep_close);
prep_ = nullptr;
}


void Http2Session::Close() {
DEBUG_HTTP2SESSION(this, "closing session");
Expand All @@ -406,10 +390,12 @@ void Http2Session::Close() {

while (!outstanding_pings_.empty()) {
Http2Session::Http2Ping* ping = PopPing();
ping->Done(false);
// Since this method may be called from GC, calling into JS directly
// is not allowed.
env()->SetImmediate([](Environment* env, void* data) {
static_cast<Http2Session::Http2Ping*>(data)->Done(false);
}, static_cast<void*>(ping));
}

Stop();
}


Expand Down Expand Up @@ -480,6 +466,7 @@ inline void Http2Session::SubmitShutdownNotice() {
inline void Http2Session::Settings(const nghttp2_settings_entry iv[],
size_t niv) {
DEBUG_HTTP2SESSION2(this, "submitting %d settings", niv);
Http2Scope h2scope(this);
// This will fail either if the system is out of memory, or if the settings
// values are not within the appropriate range. We should be catching the
// latter before it gets this far so crash in either case.
Expand Down Expand Up @@ -732,7 +719,8 @@ Http2Stream::SubmitTrailers::SubmitTrailers(


inline void Http2Stream::SubmitTrailers::Submit(nghttp2_nv* trailers,
size_t length) const {
size_t length) const {
Http2Scope h2scope(session_);
if (length == 0)
return;
DEBUG_HTTP2SESSION2(session_, "sending trailers for stream %d, count: %d",
Expand Down Expand Up @@ -887,14 +875,37 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) {
MakeCallback(env()->onsettings_string(), arraysize(argv), argv);
}

void Http2Session::MaybeScheduleWrite() {
CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0);
if (session_ != nullptr && nghttp2_session_want_write(session_)) {
flags_ |= SESSION_STATE_WRITE_SCHEDULED;
env()->SetImmediate([](Environment* env, void* data) {
Http2Session* session = static_cast<Http2Session*>(data);
if (session->session_ == nullptr ||
!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
// This can happen e.g. when a stream was reset before this turn
// of the event loop, in which case SendPendingData() is called early,
// or the session was destroyed in the meantime.
return;
}

// Sending data may call arbitrary JS code, so keep track of
// async context.
InternalCallbackScope callback_scope(session);
session->SendPendingData();
}, static_cast<void*>(this), object());
}
}


inline void Http2Session::SendPendingData() {
void Http2Session::SendPendingData() {
DEBUG_HTTP2SESSION(this, "sending pending data");
// Do not attempt to send data on the socket if the destroying flag has
// been set. That means everything is shutting down and the socket
// will not be usable.
if (IsDestroying())
return;
flags_ &= ~SESSION_STATE_WRITE_SCHEDULED;

WriteWrap* req = nullptr;
char* dest = nullptr;
Expand Down Expand Up @@ -959,6 +970,7 @@ inline Http2Stream* Http2Session::SubmitRequest(
int32_t* ret,
int options) {
DEBUG_HTTP2SESSION(this, "submitting request");
Http2Scope h2scope(this);
Http2Stream* stream = nullptr;
Http2Stream::Provider::Stream prov(options);
*ret = nghttp2_submit_request(session_, prispec, nva, len, *prov, nullptr);
Expand Down Expand Up @@ -1018,6 +1030,7 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
uv_handle_type pending,
void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
Http2Scope h2scope(session);
if (nread < 0) {
uv_buf_t tmp_buf;
tmp_buf.base = nullptr;
Expand Down Expand Up @@ -1183,6 +1196,7 @@ inline void Http2Stream::Close(int32_t code) {


inline void Http2Stream::Shutdown() {
Http2Scope h2scope(this);
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
NGHTTP2_ERR_NOMEM);
Expand All @@ -1197,6 +1211,7 @@ int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
}

inline void Http2Stream::Destroy() {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "destroying stream");
// Do nothing if this stream instance is already destroyed
if (IsDestroyed())
Expand Down Expand Up @@ -1248,6 +1263,7 @@ void Http2Stream::OnDataChunk(


inline void Http2Stream::FlushDataChunks() {
Http2Scope h2scope(this);
if (!data_chunks_.empty()) {
uv_buf_t buf = data_chunks_.front();
data_chunks_.pop();
Expand All @@ -1265,6 +1281,7 @@ inline void Http2Stream::FlushDataChunks() {
inline int Http2Stream::SubmitResponse(nghttp2_nv* nva,
size_t len,
int options) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "submitting response");
if (options & STREAM_OPTION_GET_TRAILERS)
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;
Expand All @@ -1285,6 +1302,7 @@ inline int Http2Stream::SubmitFile(int fd,
int64_t offset,
int64_t length,
int options) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "submitting file");
if (options & STREAM_OPTION_GET_TRAILERS)
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;
Expand All @@ -1301,6 +1319,7 @@ inline int Http2Stream::SubmitFile(int fd,

// Submit informational headers for a stream.
inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM2(this, "sending %d informational headers", len);
int ret = nghttp2_submit_headers(session_->session(),
NGHTTP2_FLAG_NONE,
Expand All @@ -1313,6 +1332,7 @@ inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {

inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec,
bool silent) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "sending priority spec");
int ret = silent ?
nghttp2_session_change_stream_priority(session_->session(),
Expand All @@ -1326,6 +1346,7 @@ inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec,


inline int Http2Stream::SubmitRstStream(const uint32_t code) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM2(this, "sending rst-stream with code %d", code);
session_->SendPendingData();
CHECK_EQ(nghttp2_submit_rst_stream(session_->session(),
Expand All @@ -1341,6 +1362,7 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva,
size_t len,
int32_t* ret,
int options) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "sending push promise");
*ret = nghttp2_submit_push_promise(session_->session(), NGHTTP2_FLAG_NONE,
id_, nva, len, nullptr);
Expand Down Expand Up @@ -1380,6 +1402,7 @@ inline int Http2Stream::Write(nghttp2_stream_write_t* req,
const uv_buf_t bufs[],
unsigned int nbufs,
nghttp2_stream_write_cb cb) {
Http2Scope h2scope(this);
if (!IsWritable()) {
if (cb != nullptr)
cb(req, UV_EOF);
Expand Down Expand Up @@ -1763,6 +1786,7 @@ void Http2Session::Goaway(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Context> context = env->context();
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
Http2Scope h2scope(session);

uint32_t errorCode = args[0]->Uint32Value(context).ToChecked();
int32_t lastStreamID = args[1]->Int32Value(context).ToChecked();
Expand Down Expand Up @@ -2038,6 +2062,7 @@ void Http2Session::Http2Ping::Send(uint8_t* payload) {
memcpy(&data, &startTime_, arraysize(data));
payload = data;
}
Http2Scope h2scope(session_);
CHECK_EQ(nghttp2_submit_ping(**session_, NGHTTP2_FLAG_NONE, payload), 0);
}

Expand Down
22 changes: 21 additions & 1 deletion src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,9 @@ const char* nghttp2_errname(int rv) {

enum session_state_flags {
SESSION_STATE_NONE = 0x0,
SESSION_STATE_DESTROYING = 0x1
SESSION_STATE_DESTROYING = 0x1,
SESSION_STATE_HAS_SCOPE = 0x2,
SESSION_STATE_WRITE_SCHEDULED = 0x4
};

// This allows for 4 default-sized frames with their frame headers
Expand All @@ -429,6 +431,19 @@ typedef uint32_t(*get_setting)(nghttp2_session* session,
class Http2Session;
class Http2Stream;

// This scope should be present when any call into nghttp2 that may schedule
// data to be written to the underlying transport is made, and schedules
// such a write automatically once the scope is exited.
class Http2Scope {
public:
explicit Http2Scope(Http2Stream* stream);
explicit Http2Scope(Http2Session* session);
~Http2Scope();

private:
Http2Session* session_ = nullptr;
};

// The Http2Options class is used to parse the options object passed in to
// a Http2Session object and convert those into an appropriate nghttp2_option
// struct. This is the primary mechanism by which the Http2Session object is
Expand Down Expand Up @@ -816,6 +831,9 @@ class Http2Session : public AsyncWrap {
inline void MarkDestroying() { flags_ |= SESSION_STATE_DESTROYING; }
inline bool IsDestroying() { return flags_ & SESSION_STATE_DESTROYING; }

// Schedule a write if nghttp2 indicates it wants to write to the socket.
void MaybeScheduleWrite();

// Returns pointer to the stream, or nullptr if stream does not exist
inline Http2Stream* FindStream(int32_t id);

Expand Down Expand Up @@ -1005,6 +1023,8 @@ class Http2Session : public AsyncWrap {

size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
std::queue<Http2Ping*> outstanding_pings_;

friend class Http2Scope;
};

class Http2Session::Http2Ping : public AsyncWrap {
Expand Down
32 changes: 32 additions & 0 deletions test/parallel/test-http2-session-gc-while-write-scheduled.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Flags: --expose-gc

'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const http2 = require('http2');
const makeDuplexPair = require('../common/duplexpair');

// This tests that running garbage collection while an Http2Session has
// a write *scheduled*, it will survive that garbage collection.

{
// This creates a session and schedules a write (for the settings frame).
let client = http2.connect('http://localhost:80', {
createConnection: common.mustCall(() => makeDuplexPair().clientSide)
});

// First, wait for any nextTicks() and their responses
// from the `connect()` call to run.
tick(10, () => {
// This schedules a write.
client.settings(http2.getDefaultSettings());
client = null;
global.gc();
});
}

function tick(n, cb) {
if (n--) setImmediate(tick, n, cb);
else cb();
}