Skip to content

Commit

Permalink
http2,tls: store WriteWrap using BaseObjectPtr
Browse files Browse the repository at this point in the history
Create weak `WriteWrap` and `ShutdownWrap` objects, and when
referencing them in C++ is necessary, use `BaseObjectPtr<>`
instead of plain pointers to keep these objects from being
garbage-collected.

This solves issues that arise when the underlying `StreamBase`
instance is weak, but the `WriteWrap` or `ShutdownWrap` instances
are not; in that case, they would otherwise potentially stick
around in memory after the stream that they originally belong
to is long gone.

It probably makes sense to use `BaseObjectptr<>` more extensively
in `StreamBase` in the long run as well.

PR-URL: nodejs#35488
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
addaleax authored and joesepi committed Oct 22, 2020
1 parent 20d88f3 commit 2705084
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 32 deletions.
26 changes: 14 additions & 12 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1525,12 +1525,12 @@ void Http2Session::ClearOutgoing(int status) {
std::vector<NgHttp2StreamWrite> current_outgoing_buffers_;
current_outgoing_buffers_.swap(outgoing_buffers_);
for (const NgHttp2StreamWrite& wr : current_outgoing_buffers_) {
WriteWrap* wrap = wr.req_wrap;
if (wrap != nullptr) {
BaseObjectPtr<AsyncWrap> wrap = std::move(wr.req_wrap);
if (wrap) {
// TODO(addaleax): Pass `status` instead of 0, so that we actually error
// out with the error from the write to the underlying protocol,
// if one occurred.
wrap->Done(0);
WriteWrap::FromObject(wrap)->Done(0);
}
}
}
Expand Down Expand Up @@ -1813,7 +1813,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {

bool Http2Session::HasWritesOnSocketForStream(Http2Stream* stream) {
for (const NgHttp2StreamWrite& wr : outgoing_buffers_) {
if (wr.req_wrap != nullptr && wr.req_wrap->stream() == stream)
if (wr.req_wrap && WriteWrap::FromObject(wr.req_wrap)->stream() == stream)
return true;
}
return false;
Expand Down Expand Up @@ -1966,8 +1966,8 @@ void Http2Stream::Destroy() {
// we still have queued outbound writes.
while (!queue_.empty()) {
NgHttp2StreamWrite& head = queue_.front();
if (head.req_wrap != nullptr)
head.req_wrap->Done(UV_ECANCELED);
if (head.req_wrap)
WriteWrap::FromObject(head.req_wrap)->Done(UV_ECANCELED);
queue_.pop();
}

Expand Down Expand Up @@ -2196,7 +2196,8 @@ int Http2Stream::DoWrite(WriteWrap* req_wrap,
// Store the req_wrap on the last write info in the queue, so that it is
// only marked as finished once all buffers associated with it are finished.
queue_.emplace(NgHttp2StreamWrite {
i == nbufs - 1 ? req_wrap : nullptr,
BaseObjectPtr<AsyncWrap>(
i == nbufs - 1 ? req_wrap->GetAsyncWrap() : nullptr),
bufs[i]
});
IncrementAvailableOutboundLength(bufs[i].len);
Expand Down Expand Up @@ -2290,10 +2291,11 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle,
// find out when the HTTP2 stream wants to consume data, and because the
// StreamBase API allows empty input chunks.
while (!stream->queue_.empty() && stream->queue_.front().buf.len == 0) {
WriteWrap* finished = stream->queue_.front().req_wrap;
BaseObjectPtr<AsyncWrap> finished =
std::move(stream->queue_.front().req_wrap);
stream->queue_.pop();
if (finished != nullptr)
finished->Done(0);
if (finished)
WriteWrap::FromObject(finished)->Done(0);
}

if (!stream->queue_.empty()) {
Expand Down Expand Up @@ -2919,8 +2921,8 @@ void Http2Ping::DetachFromSession() {
}

void NgHttp2StreamWrite::MemoryInfo(MemoryTracker* tracker) const {
if (req_wrap != nullptr)
tracker->TrackField("req_wrap", req_wrap->GetAsyncWrap());
if (req_wrap)
tracker->TrackField("req_wrap", req_wrap);
tracker->TrackField("buf", buf);
}

Expand Down
6 changes: 3 additions & 3 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ using Http2Headers = NgHeaders<Http2HeadersTraits>;
using Http2RcBufferPointer = NgRcBufPointer<Http2RcBufferPointerTraits>;

struct NgHttp2StreamWrite : public MemoryRetainer {
WriteWrap* req_wrap = nullptr;
BaseObjectPtr<AsyncWrap> req_wrap;
uv_buf_t buf;

inline explicit NgHttp2StreamWrite(uv_buf_t buf_) : buf(buf_) {}
inline NgHttp2StreamWrite(WriteWrap* req, uv_buf_t buf_) :
req_wrap(req), buf(buf_) {}
inline NgHttp2StreamWrite(BaseObjectPtr<AsyncWrap> req_wrap, uv_buf_t buf_) :
req_wrap(std::move(req_wrap)), buf(buf_) {}

void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(NgHttp2StreamWrite)
Expand Down
22 changes: 22 additions & 0 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,28 @@ StreamBase* StreamBase::FromObject(v8::Local<v8::Object> obj) {
StreamBase::kStreamBaseField));
}

WriteWrap* WriteWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
return static_cast<WriteWrap*>(StreamReq::FromObject(req_wrap_obj));
}

template <typename T, bool kIsWeak>
WriteWrap* WriteWrap::FromObject(
const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
if (!base_obj) return nullptr;
return FromObject(base_obj->object());
}

ShutdownWrap* ShutdownWrap::FromObject(v8::Local<v8::Object> req_wrap_obj) {
return static_cast<ShutdownWrap*>(StreamReq::FromObject(req_wrap_obj));
}

template <typename T, bool kIsWeak>
ShutdownWrap* ShutdownWrap::FromObject(
const BaseObjectPtrImpl<T, kIsWeak>& base_obj) {
if (!base_obj) return nullptr;
return FromObject(base_obj->object());
}

void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) {
CHECK_NULL(storage_.data());
storage_ = std::move(storage);
Expand Down
8 changes: 6 additions & 2 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -621,12 +621,16 @@ StreamResource::~StreamResource() {

ShutdownWrap* StreamBase::CreateShutdownWrap(
Local<Object> object) {
return new SimpleShutdownWrap<AsyncWrap>(this, object);
auto* wrap = new SimpleShutdownWrap<AsyncWrap>(this, object);
wrap->MakeWeak();
return wrap;
}

WriteWrap* StreamBase::CreateWriteWrap(
Local<Object> object) {
return new SimpleWriteWrap<AsyncWrap>(this, object);
auto* wrap = new SimpleWriteWrap<AsyncWrap>(this, object);
wrap->MakeWeak();
return wrap;
}

} // namespace node
10 changes: 10 additions & 0 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class ShutdownWrap : public StreamReq {
StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj);

static inline ShutdownWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
template <typename T, bool kIsWeak>
static inline ShutdownWrap* FromObject(
const BaseObjectPtrImpl<T, kIsWeak>& base_obj);

// Call stream()->EmitAfterShutdown() and dispose of this request wrap.
void OnDone(int status) override;
};
Expand All @@ -89,6 +94,11 @@ class WriteWrap : public StreamReq {
StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj);

static inline WriteWrap* FromObject(v8::Local<v8::Object> req_wrap_obj);
template <typename T, bool kIsWeak>
static inline WriteWrap* FromObject(
const BaseObjectPtrImpl<T, kIsWeak>& base_obj);

// Call stream()->EmitAfterWrite() and dispose of this request wrap.
void OnDone(int status) override;

Expand Down
29 changes: 16 additions & 13 deletions src/tls_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ bool TLSWrap::InvokeQueued(int status, const char* error_str) {
if (!write_callback_scheduled_)
return false;

if (current_write_ != nullptr) {
WriteWrap* w = current_write_;
current_write_ = nullptr;
if (current_write_) {
BaseObjectPtr<AsyncWrap> current_write = std::move(current_write_);
current_write_.reset();
WriteWrap* w = WriteWrap::FromObject(current_write);
w->Done(status, error_str);
}

Expand Down Expand Up @@ -301,7 +302,7 @@ void TLSWrap::EncOut() {
}

// Split-off queue
if (established_ && current_write_ != nullptr) {
if (established_ && current_write_) {
Debug(this, "EncOut() setting write_callback_scheduled_");
write_callback_scheduled_ = true;
}
Expand Down Expand Up @@ -372,10 +373,12 @@ void TLSWrap::EncOut() {

void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) {
Debug(this, "OnStreamAfterWrite(status = %d)", status);
if (current_empty_write_ != nullptr) {
if (current_empty_write_) {
Debug(this, "Had empty write");
WriteWrap* finishing = current_empty_write_;
current_empty_write_ = nullptr;
BaseObjectPtr<AsyncWrap> current_empty_write =
std::move(current_empty_write_);
current_empty_write_.reset();
WriteWrap* finishing = WriteWrap::FromObject(current_empty_write);
finishing->Done(status);
return;
}
Expand Down Expand Up @@ -735,23 +738,23 @@ int TLSWrap::DoWrite(WriteWrap* w,
ClearOut();
if (BIO_pending(enc_out_) == 0) {
Debug(this, "No pending encrypted output, writing to underlying stream");
CHECK_NULL(current_empty_write_);
current_empty_write_ = w;
CHECK(!current_empty_write_);
current_empty_write_.reset(w->GetAsyncWrap());
StreamWriteResult res =
underlying_stream()->Write(bufs, count, send_handle);
if (!res.async) {
BaseObjectPtr<TLSWrap> strong_ref{this};
env()->SetImmediate([this, strong_ref](Environment* env) {
OnStreamAfterWrite(current_empty_write_, 0);
OnStreamAfterWrite(WriteWrap::FromObject(current_empty_write_), 0);
});
}
return 0;
}
}

// Store the current write wrap
CHECK_NULL(current_write_);
current_write_ = w;
CHECK(!current_write_);
current_write_.reset(w->GetAsyncWrap());

// Write encrypted data to underlying stream and call Done().
if (length == 0) {
Expand Down Expand Up @@ -804,7 +807,7 @@ int TLSWrap::DoWrite(WriteWrap* w,
// If we stopped writing because of an error, it's fatal, discard the data.
if (!arg.IsEmpty()) {
Debug(this, "Got SSL error (%d), returning UV_EPROTO", err);
current_write_ = nullptr;
current_write_.reset();
return UV_EPROTO;
}

Expand Down
4 changes: 2 additions & 2 deletions src/tls_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ class TLSWrap : public AsyncWrap,
// Waiting for ClearIn() to pass to SSL_write().
AllocatedBuffer pending_cleartext_input_;
size_t write_size_ = 0;
WriteWrap* current_write_ = nullptr;
BaseObjectPtr<AsyncWrap> current_write_;
bool in_dowrite_ = false;
WriteWrap* current_empty_write_ = nullptr;
BaseObjectPtr<AsyncWrap> current_empty_write_;
bool write_callback_scheduled_ = false;
bool started_ = false;
bool established_ = false;
Expand Down

0 comments on commit 2705084

Please sign in to comment.