diff --git a/src/js_stream.cc b/src/js_stream.cc index 7d4ad7a4e978a6..22d267f5493e2a 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -74,11 +74,6 @@ void JSStream::OnReadImpl(ssize_t nread, } -void* JSStream::Cast() { - return static_cast(this); -} - - AsyncWrap* JSStream::GetAsyncWrap() { return static_cast(this); } @@ -181,7 +176,7 @@ void JSStream::DoAfterWrite(const FunctionCallbackInfo& args) { ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As()); - wrap->OnAfterWrite(w); + w->Done(0); } diff --git a/src/js_stream.h b/src/js_stream.h index 44bf7a06df7f8f..c7caf79374d538 100644 --- a/src/js_stream.h +++ b/src/js_stream.h @@ -18,7 +18,6 @@ class JSStream : public AsyncWrap, public StreamBase { ~JSStream(); - void* Cast() override; bool IsAlive() override; bool IsClosing() override; int ReadStart() override; diff --git a/src/node_http2.cc b/src/node_http2.cc index b763ff0dbe5ab1..f8b530c20a16cf 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -987,9 +987,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) { WriteWrap* Http2Session::AllocateSend() { HandleScope scope(env()->isolate()); - auto AfterWrite = [](WriteWrap* req, int status) { - req->Dispose(); - }; Local obj = env()->write_wrap_constructor_function() ->NewInstance(env()->context()).ToLocalChecked(); @@ -999,7 +996,7 @@ WriteWrap* Http2Session::AllocateSend() { session(), NGHTTP2_SETTINGS_MAX_FRAME_SIZE); // Max frame size + 9 bytes for the header - return WriteWrap::New(env(), obj, stream_, AfterWrite, size + 9); + return WriteWrap::New(env(), obj, stream_, size + 9); } void Http2Session::Send(WriteWrap* req, char* buf, size_t length) { diff --git a/src/node_http2.h b/src/node_http2.h index c869d9a59547a7..5a61e465a40ce3 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -672,8 +672,7 @@ class Http2Stream : public AsyncWrap, return false; } - AsyncWrap* GetAsyncWrap() override { return static_cast(this); } - void* Cast() override { return reinterpret_cast(this); } + AsyncWrap* GetAsyncWrap() override { return this; } int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, uv_stream_t* send_handle) override; diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 807e138ef7b6df..29739011c6a439 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -143,15 +143,19 @@ void StreamBase::JSMethod(const FunctionCallbackInfo& args) { } +inline void ShutdownWrap::OnDone(int status) { + stream()->AfterShutdown(this, status); +} + + WriteWrap* WriteWrap::New(Environment* env, Local obj, StreamBase* wrap, - DoneCb cb, size_t extra) { size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra; char* storage = new char[storage_size]; - return new(storage) WriteWrap(env, obj, wrap, cb, storage_size); + return new(storage) WriteWrap(env, obj, wrap, storage_size); } @@ -171,6 +175,10 @@ size_t WriteWrap::ExtraSize() const { return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize); } +inline void WriteWrap::OnDone(int status) { + stream()->AfterWrite(this, status); +} + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/stream_base.cc b/src/stream_base.cc index 922b277c583017..46d2ff5faf0969 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -55,8 +55,7 @@ int StreamBase::Shutdown(const FunctionCallbackInfo& args) { env->set_init_trigger_async_id(wrap->get_async_id()); ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj, - this, - AfterShutdown); + this); int err = DoShutdown(req_wrap); if (err) @@ -66,7 +65,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo& args) { void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { - StreamBase* wrap = req_wrap->wrap(); Environment* env = req_wrap->env(); // The wrap and request objects should still be there. @@ -78,7 +76,7 @@ void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { Local req_wrap_obj = req_wrap->object(); Local argv[3] = { Integer::New(env->isolate(), status), - wrap->GetObject(), + GetObject(), req_wrap_obj }; @@ -158,7 +156,7 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { wrap = GetAsyncWrap(); CHECK_NE(wrap, nullptr); env->set_init_trigger_async_id(wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size); + req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size); offset = 0; if (!all_buffers) { @@ -248,7 +246,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { if (wrap != nullptr) env->set_init_trigger_async_id(wrap->get_async_id()); // Allocate, or write rest - req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite); + req_wrap = WriteWrap::New(env, req_wrap_obj, this); err = DoWrite(req_wrap, bufs, count, nullptr); req_wrap_obj->Set(env->async(), True(env->isolate())); @@ -332,7 +330,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { wrap = GetAsyncWrap(); if (wrap != nullptr) env->set_init_trigger_async_id(wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size); + req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size); data = req_wrap->Extra(); @@ -393,7 +391,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { - StreamBase* wrap = req_wrap->wrap(); Environment* env = req_wrap->env(); HandleScope handle_scope(env->isolate()); @@ -405,19 +402,19 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { // Unref handle property Local req_wrap_obj = req_wrap->object(); req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust(); - wrap->OnAfterWrite(req_wrap); + OnAfterWrite(req_wrap, status); Local argv[] = { Integer::New(env->isolate(), status), - wrap->GetObject(), + GetObject(), req_wrap_obj, Undefined(env->isolate()) }; - const char* msg = wrap->Error(); + const char* msg = Error(); if (msg != nullptr) { argv[3] = OneByteString(env->isolate(), msg); - wrap->ClearError(); + ClearError(); } if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) diff --git a/src/stream_base.h b/src/stream_base.h index 87c7b0bc63eafd..5bf5f013947347 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -16,27 +16,27 @@ namespace node { // Forward declarations class StreamBase; -template +template class StreamReq { public: - typedef void (*DoneCb)(Req* req, int status); - - explicit StreamReq(DoneCb cb) : cb_(cb) { + explicit StreamReq(StreamBase* stream) : stream_(stream) { } inline void Done(int status, const char* error_str = nullptr) { - Req* req = static_cast(this); + Base* req = static_cast(this); Environment* env = req->env(); if (error_str != nullptr) { req->object()->Set(env->error_string(), OneByteString(env->isolate(), error_str)); } - cb_(req, status); + req->OnDone(status); } + inline StreamBase* stream() const { return stream_; } + private: - DoneCb cb_; + StreamBase* const stream_; }; class ShutdownWrap : public ReqWrap, @@ -44,11 +44,9 @@ class ShutdownWrap : public ReqWrap, public: ShutdownWrap(Environment* env, v8::Local req_wrap_obj, - StreamBase* wrap, - DoneCb cb) + StreamBase* stream) : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP), - StreamReq(cb), - wrap_(wrap) { + StreamReq(stream) { Wrap(req_wrap_obj, this); } @@ -60,27 +58,22 @@ class ShutdownWrap : public ReqWrap, return ContainerOf(&ShutdownWrap::req_, req); } - inline StreamBase* wrap() const { return wrap_; } size_t self_size() const override { return sizeof(*this); } - private: - StreamBase* const wrap_; + inline void OnDone(int status); // Just calls stream()->AfterShutdown() }; -class WriteWrap: public ReqWrap, - public StreamReq { +class WriteWrap : public ReqWrap, + public StreamReq { public: static inline WriteWrap* New(Environment* env, v8::Local obj, - StreamBase* wrap, - DoneCb cb, + StreamBase* stream, size_t extra = 0); inline void Dispose(); inline char* Extra(size_t offset = 0); inline size_t ExtraSize() const; - inline StreamBase* wrap() const { return wrap_; } - size_t self_size() const override { return storage_size_; } static WriteWrap* from_req(uv_write_t* req) { @@ -91,24 +84,22 @@ class WriteWrap: public ReqWrap, WriteWrap(Environment* env, v8::Local obj, - StreamBase* wrap, - DoneCb cb) + StreamBase* stream) : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), - StreamReq(cb), - wrap_(wrap), + StreamReq(stream), storage_size_(0) { Wrap(obj, this); } + inline void OnDone(int status); // Just calls stream()->AfterWrite() + protected: WriteWrap(Environment* env, v8::Local obj, - StreamBase* wrap, - DoneCb cb, + StreamBase* stream, size_t storage_size) : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), - StreamReq(cb), - wrap_(wrap), + StreamReq(stream), storage_size_(storage_size) { Wrap(obj, this); } @@ -129,7 +120,6 @@ class WriteWrap: public ReqWrap, // WriteWrap. Ensure this never happens. void operator delete(void* ptr) { UNREACHABLE(); } - StreamBase* const wrap_; const size_t storage_size_; }; @@ -151,7 +141,7 @@ class StreamResource { void* ctx; }; - typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx); + typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx); typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx); typedef void (*ReadCb)(ssize_t nread, const uv_buf_t* buf, @@ -176,9 +166,9 @@ class StreamResource { virtual void ClearError(); // Events - inline void OnAfterWrite(WriteWrap* w) { + inline void OnAfterWrite(WriteWrap* w, int status) { if (!after_write_cb_.is_empty()) - after_write_cb_.fn(w, after_write_cb_.ctx); + after_write_cb_.fn(w, status, after_write_cb_.ctx); } inline void OnAlloc(size_t size, uv_buf_t* buf) { @@ -208,14 +198,12 @@ class StreamResource { inline Callback read_cb() { return read_cb_; } inline Callback destruct_cb() { return destruct_cb_; } - private: + protected: Callback after_write_cb_; Callback alloc_cb_; Callback read_cb_; Callback destruct_cb_; uint64_t bytes_read_; - - friend class StreamBase; }; class StreamBase : public StreamResource { @@ -231,7 +219,6 @@ class StreamBase : public StreamResource { v8::Local target, int flags = kFlagNone); - virtual void* Cast() = 0; virtual bool IsAlive() = 0; virtual bool IsClosing() = 0; virtual bool IsIPCPipe(); @@ -250,13 +237,14 @@ class StreamBase : public StreamResource { consumed_ = false; } - template - inline Outer* Cast() { return static_cast(Cast()); } - void EmitData(ssize_t nread, v8::Local buf, v8::Local handle); + // These are called by the respective {Write,Shutdown}Wrap class. + virtual void AfterShutdown(ShutdownWrap* req, int status); + virtual void AfterWrite(WriteWrap* req, int status); + protected: explicit StreamBase(Environment* env) : env_(env), consumed_(false) { } @@ -267,10 +255,6 @@ class StreamBase : public StreamResource { virtual AsyncWrap* GetAsyncWrap() = 0; virtual v8::Local GetObject(); - // Libuv callbacks - static void AfterShutdown(ShutdownWrap* req, int status); - static void AfterWrite(WriteWrap* req, int status); - // JS Methods int ReadStart(const v8::FunctionCallbackInfo& args); int ReadStop(const v8::FunctionCallbackInfo& args); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index a737ed67b02c57..3cce3b151cf0d4 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -91,7 +91,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, provider), StreamBase(env), stream_(stream) { - set_after_write_cb({ OnAfterWriteImpl, this }); set_alloc_cb({ OnAllocImpl, this }); set_read_cb({ OnReadImpl, this }); } @@ -126,11 +125,6 @@ bool LibuvStreamWrap::IsClosing() { } -void* LibuvStreamWrap::Cast() { - return reinterpret_cast(this); -} - - AsyncWrap* LibuvStreamWrap::GetAsyncWrap() { return static_cast(this); } @@ -298,13 +292,13 @@ void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo& args) { int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) { int err; - err = uv_shutdown(req_wrap->req(), stream(), AfterShutdown); + err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown); req_wrap->Dispatched(); return err; } -void LibuvStreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { +void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) { ShutdownWrap* req_wrap = ShutdownWrap::from_req(req); CHECK_NE(req_wrap, nullptr); HandleScope scope(req_wrap->env()->isolate()); @@ -359,9 +353,9 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w, uv_stream_t* send_handle) { int r; if (send_handle == nullptr) { - r = uv_write(w->req(), stream(), bufs, count, AfterWrite); + r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite); } else { - r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite); + r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterUvWrite); } if (!r) { @@ -382,7 +376,7 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w, } -void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) { +void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) { WriteWrap* req_wrap = WriteWrap::from_req(req); CHECK_NE(req_wrap, nullptr); HandleScope scope(req_wrap->env()->isolate()); @@ -391,9 +385,9 @@ void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) { } -void LibuvStreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { - LibuvStreamWrap* wrap = static_cast(ctx); - wrap->UpdateWriteQueueSize(); +void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) { + StreamBase::AfterWrite(w, status); + UpdateWriteQueueSize(); } } // namespace node diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 43df504e81b86e..414bad393fa9b1 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -40,7 +40,6 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { v8::Local context); int GetFD() override; - void* Cast() override; bool IsAlive() override; bool IsClosing() override; bool IsIPCPipe() override; @@ -103,17 +102,18 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { static void OnRead(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf); - static void AfterWrite(uv_write_t* req, int status); - static void AfterShutdown(uv_shutdown_t* req, int status); + static void AfterUvWrite(uv_write_t* req, int status); + static void AfterUvShutdown(uv_shutdown_t* req, int status); // Resource interface implementation - static void OnAfterWriteImpl(WriteWrap* w, void* ctx); static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); static void OnReadImpl(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending, void* ctx); + void AfterWrite(WriteWrap* req_wrap, int status) override; + uv_stream_t* const stream_; }; diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 3b899ea12d501d..85dfbd1d491a1b 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -315,8 +315,7 @@ void TLSWrap::EncOut() { ->NewInstance(env()->context()).ToLocalChecked(); WriteWrap* write_req = WriteWrap::New(env(), req_wrap_obj, - this, - EncOutCb); + stream_); uv_buf_t buf[arraysize(data)]; for (size_t i = 0; i < count; i++) @@ -333,34 +332,31 @@ void TLSWrap::EncOut() { } -void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) { - TLSWrap* wrap = req_wrap->wrap()->Cast(); - req_wrap->Dispose(); - +void TLSWrap::EncOutAfterWrite(WriteWrap* req_wrap, int status) { // We should not be getting here after `DestroySSL`, because all queued writes // must be invoked with UV_ECANCELED - CHECK_NE(wrap->ssl_, nullptr); + CHECK_NE(ssl_, nullptr); // Handle error if (status) { // Ignore errors after shutdown - if (wrap->shutdown_) + if (shutdown_) return; // Notify about error - wrap->InvokeQueued(status); + InvokeQueued(status); return; } // Commit - crypto::NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_); + crypto::NodeBIO::FromBIO(enc_out_)->Read(nullptr, write_size_); // Ensure that the progress will be made and `InvokeQueued` will be called. - wrap->ClearIn(); + ClearIn(); // Try writing more data - wrap->write_size_ = 0; - wrap->EncOut(); + write_size_ = 0; + EncOut(); } @@ -521,11 +517,6 @@ bool TLSWrap::ClearIn() { } -void* TLSWrap::Cast() { - return reinterpret_cast(this); -} - - AsyncWrap* TLSWrap::GetAsyncWrap() { return static_cast(this); } @@ -664,9 +655,9 @@ int TLSWrap::DoWrite(WriteWrap* w, } -void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { +void TLSWrap::OnAfterWriteImpl(WriteWrap* w, int status, void* ctx) { TLSWrap* wrap = static_cast(ctx); - wrap->UpdateWriteQueueSize(); + wrap->EncOutAfterWrite(w, status); } diff --git a/src/tls_wrap.h b/src/tls_wrap.h index b782e7c3c23178..87eac757793b54 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -56,7 +56,6 @@ class TLSWrap : public AsyncWrap, v8::Local unused, v8::Local context); - void* Cast() override; int GetFD() override; bool IsAlive() override; bool IsClosing() override; @@ -112,7 +111,7 @@ class TLSWrap : public AsyncWrap, static void SSLInfoCallback(const SSL* ssl_, int where, int ret); void InitSSL(); void EncOut(); - static void EncOutCb(WriteWrap* req_wrap, int status); + void EncOutAfterWrite(WriteWrap* req_wrap, int status); bool ClearIn(); void ClearOut(); void MakePending(); @@ -135,7 +134,7 @@ class TLSWrap : public AsyncWrap, uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0); // Resource implementation - static void OnAfterWriteImpl(WriteWrap* w, void* ctx); + static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx); static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); static void OnReadImpl(ssize_t nread, const uv_buf_t* buf, diff --git a/test/sequential/test-https-keep-alive-large-write.js b/test/sequential/test-https-keep-alive-large-write.js index 88468dc03fc99b..5048f4f9519449 100644 --- a/test/sequential/test-https-keep-alive-large-write.js +++ b/test/sequential/test-https-keep-alive-large-write.js @@ -41,7 +41,9 @@ const server = https.createServer({ }); serverConnectionHandle = res.socket._handle; - res.write(content); + res.write(content, () => { + assert.strictEqual(serverConnectionHandle.writeQueueSize, 0); + }); res.end(); })); server.setTimeout(serverTimeout);