diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 6123dbcdac8e82..29a4c29f3d3822 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -67,30 +67,6 @@ void StreamResource::PushStreamListener(StreamListener* listener) { listener_ = listener; } -void StreamResource::RemoveStreamListener(StreamListener* listener) { - CHECK_NOT_NULL(listener); - - StreamListener* previous; - StreamListener* current; - - // Remove from the linked list. - for (current = listener_, previous = nullptr; - /* No loop condition because we want a crash if listener is not found */ - ; previous = current, current = current->previous_listener_) { - CHECK_NOT_NULL(current); - if (current == listener) { - if (previous != nullptr) - previous->previous_listener_ = current->previous_listener_; - else - listener_ = listener->previous_listener_; - break; - } - } - - listener->stream_ = nullptr; - listener->previous_listener_ = nullptr; -} - uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { DebugSealHandleScope seal_handle_scope; return listener_->OnStreamAlloc(suggested_size); @@ -122,101 +98,6 @@ StreamBase::StreamBase(Environment* env) : env_(env) { PushStreamListener(&default_listener_); } -int StreamBase::Shutdown(v8::Local req_wrap_obj) { - Environment* env = stream_env(); - - v8::HandleScope handle_scope(env->isolate()); - - if (req_wrap_obj.IsEmpty()) { - if (!env->shutdown_wrap_template() - ->NewInstance(env->context()) - .ToLocal(&req_wrap_obj)) { - return UV_EBUSY; - } - StreamReq::ResetObject(req_wrap_obj); - } - - BaseObjectPtr req_wrap_ptr; - AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); - ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj); - if (req_wrap != nullptr) - req_wrap_ptr.reset(req_wrap->GetAsyncWrap()); - int err = DoShutdown(req_wrap); - - if (err != 0 && req_wrap != nullptr) { - req_wrap->Dispose(); - } - - const char* msg = Error(); - if (msg != nullptr) { - if (req_wrap_obj->Set(env->context(), - env->error_string(), - OneByteString(env->isolate(), msg)).IsNothing()) { - return UV_EBUSY; - } - ClearError(); - } - - return err; -} - -StreamWriteResult StreamBase::Write(uv_buf_t* bufs, - size_t count, - uv_stream_t* send_handle, - v8::Local req_wrap_obj, - bool skip_try_write) { - Environment* env = stream_env(); - int err; - - size_t total_bytes = 0; - for (size_t i = 0; i < count; ++i) - total_bytes += bufs[i].len; - bytes_written_ += total_bytes; - - if (send_handle == nullptr && !skip_try_write) { - err = DoTryWrite(&bufs, &count); - if (err != 0 || count == 0) { - return StreamWriteResult { false, err, nullptr, total_bytes, {} }; - } - } - - v8::HandleScope handle_scope(env->isolate()); - - if (req_wrap_obj.IsEmpty()) { - if (!env->write_wrap_template() - ->NewInstance(env->context()) - .ToLocal(&req_wrap_obj)) { - return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} }; - } - StreamReq::ResetObject(req_wrap_obj); - } - - AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); - WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj); - BaseObjectPtr req_wrap_ptr(req_wrap->GetAsyncWrap()); - - err = DoWrite(req_wrap, bufs, count, send_handle); - bool async = err == 0; - - if (!async) { - req_wrap->Dispose(); - req_wrap = nullptr; - } - - const char* msg = Error(); - if (msg != nullptr) { - if (req_wrap_obj->Set(env->context(), - env->error_string(), - OneByteString(env->isolate(), msg)).IsNothing()) { - return StreamWriteResult { false, UV_EBUSY, nullptr, 0, {} }; - } - ClearError(); - } - - return StreamWriteResult { - async, err, req_wrap, total_bytes, std::move(req_wrap_ptr) }; -} - template SimpleShutdownWrap::SimpleShutdownWrap( StreamBase* stream, @@ -278,22 +159,6 @@ void WriteWrap::SetBackingStore(std::unique_ptr bs) { backing_store_ = std::move(bs); } -void StreamReq::Done(int status, const char* error_str) { - AsyncWrap* async_wrap = GetAsyncWrap(); - Environment* env = async_wrap->env(); - if (error_str != nullptr) { - v8::HandleScope handle_scope(env->isolate()); - if (async_wrap->object()->Set( - env->context(), - env->error_string(), - OneByteString(env->isolate(), error_str)).IsNothing()) { - return; - } - } - - OnDone(status); -} - void StreamReq::ResetObject(v8::Local obj) { DCHECK_GT(obj->InternalFieldCount(), StreamReq::kStreamReqField); diff --git a/src/stream_base.cc b/src/stream_base.cc index 06840e06b3d5a6..f1769ca52970fe 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -40,6 +40,103 @@ using v8::Signature; using v8::String; using v8::Value; +int StreamBase::Shutdown(v8::Local req_wrap_obj) { + Environment* env = stream_env(); + + v8::HandleScope handle_scope(env->isolate()); + + if (req_wrap_obj.IsEmpty()) { + if (!env->shutdown_wrap_template() + ->NewInstance(env->context()) + .ToLocal(&req_wrap_obj)) { + return UV_EBUSY; + } + StreamReq::ResetObject(req_wrap_obj); + } + + BaseObjectPtr req_wrap_ptr; + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); + ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj); + if (req_wrap != nullptr) req_wrap_ptr.reset(req_wrap->GetAsyncWrap()); + int err = DoShutdown(req_wrap); + + if (err != 0 && req_wrap != nullptr) { + req_wrap->Dispose(); + } + + const char* msg = Error(); + if (msg != nullptr) { + if (req_wrap_obj + ->Set(env->context(), + env->error_string(), + OneByteString(env->isolate(), msg)) + .IsNothing()) { + return UV_EBUSY; + } + ClearError(); + } + + return err; +} + +StreamWriteResult StreamBase::Write(uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle, + v8::Local req_wrap_obj, + bool skip_try_write) { + Environment* env = stream_env(); + int err; + + size_t total_bytes = 0; + for (size_t i = 0; i < count; ++i) total_bytes += bufs[i].len; + bytes_written_ += total_bytes; + + if (send_handle == nullptr && !skip_try_write) { + err = DoTryWrite(&bufs, &count); + if (err != 0 || count == 0) { + return StreamWriteResult{false, err, nullptr, total_bytes, {}}; + } + } + + v8::HandleScope handle_scope(env->isolate()); + + if (req_wrap_obj.IsEmpty()) { + if (!env->write_wrap_template() + ->NewInstance(env->context()) + .ToLocal(&req_wrap_obj)) { + return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}}; + } + StreamReq::ResetObject(req_wrap_obj); + } + + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(GetAsyncWrap()); + WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj); + BaseObjectPtr req_wrap_ptr(req_wrap->GetAsyncWrap()); + + err = DoWrite(req_wrap, bufs, count, send_handle); + bool async = err == 0; + + if (!async) { + req_wrap->Dispose(); + req_wrap = nullptr; + } + + const char* msg = Error(); + if (msg != nullptr) { + if (req_wrap_obj + ->Set(env->context(), + env->error_string(), + OneByteString(env->isolate(), msg)) + .IsNothing()) { + return StreamWriteResult{false, UV_EBUSY, nullptr, 0, {}}; + } + ClearError(); + } + + return StreamWriteResult{ + async, err, req_wrap, total_bytes, std::move(req_wrap_ptr)}; +} + template int StreamBase::WriteString( const FunctionCallbackInfo& args); template int StreamBase::WriteString( @@ -680,6 +777,30 @@ StreamResource::~StreamResource() { } } +void StreamResource::RemoveStreamListener(StreamListener* listener) { + CHECK_NOT_NULL(listener); + + StreamListener* previous; + StreamListener* current; + + // Remove from the linked list. + // No loop condition because we want a crash if listener is not found. + for (current = listener_, previous = nullptr;; + previous = current, current = current->previous_listener_) { + CHECK_NOT_NULL(current); + if (current == listener) { + if (previous != nullptr) + previous->previous_listener_ = current->previous_listener_; + else + listener_ = listener->previous_listener_; + break; + } + } + + listener->stream_ = nullptr; + listener->previous_listener_ = nullptr; +} + ShutdownWrap* StreamBase::CreateShutdownWrap( Local object) { auto* wrap = new SimpleShutdownWrap(this, object); @@ -694,4 +815,21 @@ WriteWrap* StreamBase::CreateWriteWrap( return wrap; } +void StreamReq::Done(int status, const char* error_str) { + AsyncWrap* async_wrap = GetAsyncWrap(); + Environment* env = async_wrap->env(); + if (error_str != nullptr) { + v8::HandleScope handle_scope(env->isolate()); + if (async_wrap->object() + ->Set(env->context(), + env->error_string(), + OneByteString(env->isolate(), error_str)) + .IsNothing()) { + return; + } + } + + OnDone(status); +} + } // namespace node diff --git a/src/stream_base.h b/src/stream_base.h index c56508692260cc..3035ae89715518 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -51,7 +51,7 @@ class StreamReq { // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate // if there is a pending exception/termination. - inline void Done(int status, const char* error_str = nullptr); + void Done(int status, const char* error_str = nullptr); inline void Dispose(); StreamBase* stream() const { return stream_; } @@ -276,7 +276,7 @@ class StreamResource { inline void PushStreamListener(StreamListener* listener); // Remove a listener, and, if this was the currently active one, // transfer ownership back to the previous listener. - inline void RemoveStreamListener(StreamListener* listener); + void RemoveStreamListener(StreamListener* listener); protected: // Call the current listener's OnStreamAlloc() method. @@ -339,8 +339,7 @@ class StreamBase : public StreamResource { // ShutdownWrap object (that was created in JS), or a new one will be created. // Returns 1 in case of a synchronous completion, 0 in case of asynchronous // completion, and a libuv error case in case of synchronous failure. - inline int Shutdown( - v8::Local req_wrap_obj = v8::Local()); + int Shutdown(v8::Local req_wrap_obj = v8::Local()); // TODO(RaisinTen): Update the return type to a Maybe, so that we can indicate // if there is a pending exception/termination. @@ -353,7 +352,7 @@ class StreamBase : public StreamResource { // write is too large to finish synchronously. // If the return value indicates a synchronous completion, no callback will // be invoked. - inline StreamWriteResult Write( + StreamWriteResult Write( uv_buf_t* bufs, size_t count, uv_stream_t* send_handle = nullptr,