From cf9939f4b1804b4b8e61b7fabae780edccef0835 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Mon, 16 Dec 2019 16:03:50 -0500 Subject: [PATCH 1/7] src: better encapsulate native immediate list Refactor for clarity and reusability. Make it more obvious that the list is a FIFO queue. --- src/env-inl.h | 43 +++++++++++++++++++++++++++++++++++-------- src/env.cc | 15 ++++++--------- src/env.h | 16 ++++++++++++++-- 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/src/env-inl.h b/src/env-inl.h index cd111bc2641890..b5802b6df3c2db 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -748,18 +748,45 @@ inline void IsolateData::set_options( options_ = std::move(options); } -template -void Environment::CreateImmediate(Fn&& cb, bool ref) { - auto callback = std::make_unique>( - std::move(cb), ref); - NativeImmediateCallback* prev_tail = native_immediate_callbacks_tail_; +std::unique_ptr +Environment::NativeImmediateQueue::Shift() { + std::unique_ptr ret = std::move(head_); + if (ret) { + head_ = ret->get_next(); + if (!head_) + tail_ = nullptr; // The queue is now empty. + } + return ret; +} + +void Environment::NativeImmediateQueue::Push( + std::unique_ptr cb) { + NativeImmediateCallback* prev_tail = tail_; - native_immediate_callbacks_tail_ = callback.get(); + tail_ = cb.get(); if (prev_tail != nullptr) - prev_tail->set_next(std::move(callback)); + prev_tail->set_next(std::move(cb)); else - native_immediate_callbacks_head_ = std::move(callback); + head_ = std::move(cb); +} + +void Environment::NativeImmediateQueue::ConcatMove( + NativeImmediateQueue&& other) { + size_ += other.size_; + if (tail_ != nullptr) + tail_->set_next(std::move(other.head_)); + else + head_ = std::move(other.head_); + tail_ = other.tail_; + other.tail_ = nullptr; + other.size_ = 0; +} +template +void Environment::CreateImmediate(Fn&& cb, bool ref) { + auto callback = std::make_unique>( + std::move(cb), ref); + native_immediates_.Push(std::move(callback)); immediate_info()->count_inc(1); } diff --git a/src/env.cc b/src/env.cc index 51c5cc7c497f23..1ef68e542a3324 100644 --- a/src/env.cc +++ b/src/env.cc @@ -663,14 +663,14 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { "RunAndClearNativeImmediates", this); size_t ref_count = 0; size_t count = 0; - std::unique_ptr head; - head.swap(native_immediate_callbacks_head_); - native_immediate_callbacks_tail_ = nullptr; + + NativeImmediateQueue queue; + queue.ConcatMove(std::move(native_immediates_)); auto drain_list = [&]() { TryCatchScope try_catch(this); - for (; head; head = head->get_next()) { - DebugSealHandleScope seal_handle_scope(isolate()); + DebugSealHandleScope seal_handle_scope(isolate()); + while (std::unique_ptr head = queue.Shift()) { count++; if (head->is_refed()) ref_count++; @@ -682,15 +682,12 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { if (!try_catch.HasTerminated() && can_call_into_js()) errors::TriggerUncaughtException(isolate(), try_catch); - // We are done with the current callback. Move one iteration along, - // as if we had completed successfully. - head = head->get_next(); return true; } } return false; }; - while (head && drain_list()) {} + while (queue.size() > 0 && drain_list()) {} DCHECK_GE(immediate_info()->count(), count); immediate_info()->count_dec(count); diff --git a/src/env.h b/src/env.h index fcba4dcde67f53..2740211193547f 100644 --- a/src/env.h +++ b/src/env.h @@ -1430,8 +1430,20 @@ class Environment : public MemoryRetainer { Fn callback_; }; - std::unique_ptr native_immediate_callbacks_head_; - NativeImmediateCallback* native_immediate_callbacks_tail_ = nullptr; + class NativeImmediateQueue { + public: + inline std::unique_ptr Shift(); + inline void Push(std::unique_ptr cb); + // ConcatMove adds elements from 'other' to the end of this list, and clears + // 'other' afterwards. + inline void ConcatMove(NativeImmediateQueue&& other); + + private: + std::unique_ptr head_; + NativeImmediateCallback* tail_ = nullptr; + }; + + NativeImmediateQueue native_immediates_; void RunAndClearNativeImmediates(bool only_refed = false); static void CheckImmediate(uv_check_t* handle); From c7ef27bac18990b95d4c064ec54d605bd1f5cc7a Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 15 Jan 2020 20:20:06 +0100 Subject: [PATCH 2/7] src: exclude C++ SetImmediate() from count There is no real reason to manage a count manually, given that checking whether there are C++ callbacks is a single pointer comparison. This makes it easier to add other kinds of native C++ callbacks that are managed in a similar way. --- src/env-inl.h | 9 --------- src/env.cc | 12 ++++-------- src/env.h | 2 -- 3 files changed, 4 insertions(+), 19 deletions(-) diff --git a/src/env-inl.h b/src/env-inl.h index b5802b6df3c2db..cf6de689c4558f 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -242,14 +242,6 @@ inline bool ImmediateInfo::has_outstanding() const { return fields_[kHasOutstanding] == 1; } -inline void ImmediateInfo::count_inc(uint32_t increment) { - fields_[kCount] += increment; -} - -inline void ImmediateInfo::count_dec(uint32_t decrement) { - fields_[kCount] -= decrement; -} - inline void ImmediateInfo::ref_count_inc(uint32_t increment) { fields_[kRefCount] += increment; } @@ -787,7 +779,6 @@ void Environment::CreateImmediate(Fn&& cb, bool ref) { auto callback = std::make_unique>( std::move(cb), ref); native_immediates_.Push(std::move(callback)); - immediate_info()->count_inc(1); } template diff --git a/src/env.cc b/src/env.cc index 1ef68e542a3324..48ba78fff82d7c 100644 --- a/src/env.cc +++ b/src/env.cc @@ -662,7 +662,6 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "RunAndClearNativeImmediates", this); size_t ref_count = 0; - size_t count = 0; NativeImmediateQueue queue; queue.ConcatMove(std::move(native_immediates_)); @@ -671,7 +670,6 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { TryCatchScope try_catch(this); DebugSealHandleScope seal_handle_scope(isolate()); while (std::unique_ptr head = queue.Shift()) { - count++; if (head->is_refed()) ref_count++; @@ -689,9 +687,10 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { }; while (queue.size() > 0 && drain_list()) {} - DCHECK_GE(immediate_info()->count(), count); - immediate_info()->count_dec(count); immediate_info()->ref_count_dec(ref_count); + + if (immediate_info()->ref_count() == 0) + ToggleImmediateRef(false); } @@ -777,15 +776,12 @@ void Environment::CheckImmediate(uv_check_t* handle) { TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "CheckImmediate", env); - if (env->immediate_info()->count() == 0) - return; - HandleScope scope(env->isolate()); Context::Scope context_scope(env->context()); env->RunAndClearNativeImmediates(); - if (!env->can_call_into_js()) + if (env->immediate_info()->count() == 0 || !env->can_call_into_js()) return; do { diff --git a/src/env.h b/src/env.h index 2740211193547f..f5fee470308a3c 100644 --- a/src/env.h +++ b/src/env.h @@ -736,8 +736,6 @@ class ImmediateInfo : public MemoryRetainer { inline uint32_t count() const; inline uint32_t ref_count() const; inline bool has_outstanding() const; - inline void count_inc(uint32_t increment); - inline void count_dec(uint32_t decrement); inline void ref_count_inc(uint32_t increment); inline void ref_count_dec(uint32_t decrement); From ed57a1474e7a17e637055f8d2c6b2147c59347a2 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 15 Jan 2020 19:48:37 +0100 Subject: [PATCH 3/7] src: add a threadsafe variant of SetImmediate() Add a variant of `SetImmediate()` that can be called from any thread. This allows removing the `AsyncRequest` abstraction and replaces it with a more generic mechanism. --- src/env-inl.h | 19 ++++++++++++++++++- src/env.cc | 20 +++++++++++++++----- src/env.h | 11 ++++++++++- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/src/env-inl.h b/src/env-inl.h index cf6de689c4558f..ae4c02b6b2d081 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -748,6 +748,7 @@ Environment::NativeImmediateQueue::Shift() { if (!head_) tail_ = nullptr; // The queue is now empty. } + size_--; return ret; } @@ -755,6 +756,7 @@ void Environment::NativeImmediateQueue::Push( std::unique_ptr cb) { NativeImmediateCallback* prev_tail = tail_; + size_++; tail_ = cb.get(); if (prev_tail != nullptr) prev_tail->set_next(std::move(cb)); @@ -774,6 +776,10 @@ void Environment::NativeImmediateQueue::ConcatMove( other.size_ = 0; } +size_t Environment::NativeImmediateQueue::size() const { + return size_.load(); +} + template void Environment::CreateImmediate(Fn&& cb, bool ref) { auto callback = std::make_unique>( @@ -795,6 +801,17 @@ void Environment::SetUnrefImmediate(Fn&& cb) { CreateImmediate(std::move(cb), false); } +template +void Environment::SetImmediateThreadsafe(Fn&& cb) { + auto callback = std::make_unique>( + std::move(cb), false); + { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + native_immediates_threadsafe_.Push(std::move(callback)); + } + uv_async_send(&task_queues_async_); +} + Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed) : refed_(refed) {} @@ -1164,7 +1181,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) { inline void Environment::RegisterFinalizationGroupForCleanup( v8::Local group) { cleanup_finalization_groups_.emplace_back(isolate(), group); - uv_async_send(&cleanup_finalization_groups_async_); + uv_async_send(&task_queues_async_); } size_t CleanupHookCallback::Hash::operator()( diff --git a/src/env.cc b/src/env.cc index 48ba78fff82d7c..6d6d77abd02da3 100644 --- a/src/env.cc +++ b/src/env.cc @@ -460,15 +460,16 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { uv_check_init(event_loop(), &idle_check_handle_); uv_async_init( event_loop(), - &cleanup_finalization_groups_async_, + &task_queues_async_, [](uv_async_t* async) { Environment* env = ContainerOf( - &Environment::cleanup_finalization_groups_async_, async); + &Environment::task_queues_async_, async); env->CleanupFinalizationGroups(); + env->RunAndClearNativeImmediates(); }); uv_unref(reinterpret_cast(&idle_prepare_handle_)); uv_unref(reinterpret_cast(&idle_check_handle_)); - uv_unref(reinterpret_cast(&cleanup_finalization_groups_async_)); + uv_unref(reinterpret_cast(&task_queues_async_)); thread_stopper()->Install( this, static_cast(this), [](uv_async_t* handle) { @@ -532,7 +533,7 @@ void Environment::RegisterHandleCleanups() { close_and_finish, nullptr); RegisterHandleCleanup( - reinterpret_cast(&cleanup_finalization_groups_async_), + reinterpret_cast(&task_queues_async_), close_and_finish, nullptr); } @@ -663,6 +664,15 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { "RunAndClearNativeImmediates", this); size_t ref_count = 0; + // It is safe to check .size() first, because there is a causal relationship + // between pushes to the threadsafe and this function being called. + // For the common case, it's worth checking the size first before establishing + // a mutex lock. + if (native_immediates_threadsafe_.size() > 0) { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_)); + } + NativeImmediateQueue queue; queue.ConcatMove(std::move(native_immediates_)); @@ -1084,7 +1094,7 @@ void Environment::CleanupFinalizationGroups() { if (try_catch.HasCaught() && !try_catch.HasTerminated()) errors::TriggerUncaughtException(isolate(), try_catch); // Re-schedule the execution of the remainder of the queue. - uv_async_send(&cleanup_finalization_groups_async_); + uv_async_send(&task_queues_async_); return; } } diff --git a/src/env.h b/src/env.h index f5fee470308a3c..5a8c870ebd2caf 100644 --- a/src/env.h +++ b/src/env.h @@ -1199,6 +1199,9 @@ class Environment : public MemoryRetainer { inline void SetImmediate(Fn&& cb); template inline void SetUnrefImmediate(Fn&& cb); + template + // This behaves like SetImmediate() but can be called from any thread. + inline void SetImmediateThreadsafe(Fn&& cb); // This needs to be available for the JS-land setImmediate(). void ToggleImmediateRef(bool ref); @@ -1284,7 +1287,7 @@ class Environment : public MemoryRetainer { uv_idle_t immediate_idle_handle_; uv_prepare_t idle_prepare_handle_; uv_check_t idle_check_handle_; - uv_async_t cleanup_finalization_groups_async_; + uv_async_t task_queues_async_; bool profiler_idle_notifier_started_ = false; AsyncHooks async_hooks_; @@ -1436,12 +1439,18 @@ class Environment : public MemoryRetainer { // 'other' afterwards. inline void ConcatMove(NativeImmediateQueue&& other); + // size() is atomic and may be called from any thread. + inline size_t size() const; + private: + std::atomic size_ {0}; std::unique_ptr head_; NativeImmediateCallback* tail_ = nullptr; }; NativeImmediateQueue native_immediates_; + Mutex native_immediates_threadsafe_mutex_; + NativeImmediateQueue native_immediates_threadsafe_; void RunAndClearNativeImmediates(bool only_refed = false); static void CheckImmediate(uv_check_t* handle); From 026aa740899bc4ae999c30381e6d289a41611f4d Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 15 Jan 2020 21:34:41 +0100 Subject: [PATCH 4/7] src: remove AsyncRequest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove `AsyncRequest` from the source code, and replace its usage with threadsafe `SetImmediate()` calls. This has the advantage of being able to pass in any function, rather than one that is defined when the `AsyncRequest` is “installed”. This necessitates two changes: - The stopping flag (which was only used in one case and ignored in the other) is now a direct member of the `Environment` class. - Workers no longer have their own libuv handles, requiring manual management of their libuv ref count. As a drive-by fix, the `can_call_into_js` variable was turned into an atomic variable. While there have been no bug reports, the flag is set from `Stop(env)` calls, which are supposed to be possible from any thread. --- src/env-inl.h | 23 ++++++++++++++--------- src/env.cc | 46 ++------------------------------------------- src/env.h | 47 ++++++++++++---------------------------------- src/node_worker.cc | 34 +++++++++++++++++++++------------ src/node_worker.h | 5 ++--- 5 files changed, 52 insertions(+), 103 deletions(-) diff --git a/src/env-inl.h b/src/env-inl.h index ae4c02b6b2d081..d8177e2e74c41c 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -897,8 +897,21 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) { sub_worker_contexts_.erase(context); } +inline void Environment::add_refs(int64_t diff) { + task_queues_async_refs_ += diff; + CHECK_GE(task_queues_async_refs_, 0); + if (task_queues_async_refs_ == 0) + uv_unref(reinterpret_cast(&task_queues_async_)); + else + uv_ref(reinterpret_cast(&task_queues_async_)); +} + inline bool Environment::is_stopping() const { - return thread_stopper_.is_stopped(); + return is_stopping_.load(); +} + +inline void Environment::set_stopping(bool value) { + is_stopping_.store(value); } inline std::list* Environment::extra_linked_bindings() { @@ -1218,14 +1231,6 @@ int64_t Environment::base_object_count() const { return base_object_count_; } -bool AsyncRequest::is_stopped() const { - return stopped_.load(); -} - -void AsyncRequest::set_stopped(bool flag) { - stopped_.store(flag); -} - #define VP(PropertyName, StringValue) V(v8::Private, PropertyName) #define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName) #define VS(PropertyName, StringValue) V(v8::String, PropertyName) diff --git a/src/env.cc b/src/env.cc index 6d6d77abd02da3..bdd96a2a4aa143 100644 --- a/src/env.cc +++ b/src/env.cc @@ -471,14 +471,6 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { uv_unref(reinterpret_cast(&idle_check_handle_)); uv_unref(reinterpret_cast(&task_queues_async_)); - thread_stopper()->Install( - this, static_cast(this), [](uv_async_t* handle) { - Environment* env = static_cast(handle->data); - uv_stop(env->event_loop()); - }); - thread_stopper()->set_stopped(false); - uv_unref(reinterpret_cast(thread_stopper()->GetHandle())); - // Register clean-up cb to be called to clean up the handles // when the environment is freed, note that they are not cleaned in // the one environment per process setup, but will be called in @@ -496,8 +488,9 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { void Environment::ExitEnv() { set_can_call_into_js(false); - thread_stopper()->Stop(); + set_stopping(true); isolate_->TerminateExecution(); + SetImmediateThreadsafe([](Environment* env) { uv_stop(env->event_loop()); }); } void Environment::RegisterHandleCleanups() { @@ -602,7 +595,6 @@ void Environment::RunCleanup() { started_cleanup_ = true; TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "RunCleanup", this); - thread_stopper()->Uninstall(); CleanupHandles(); while (!cleanup_hooks_.empty()) { @@ -1014,7 +1006,6 @@ inline size_t Environment::SelfSize() const { // TODO(joyeecheung): refactor the MemoryTracker interface so // this can be done for common types within the Track* calls automatically // if a certain scope is entered. - size -= sizeof(thread_stopper_); size -= sizeof(async_hooks_); size -= sizeof(tick_info_); size -= sizeof(immediate_info_); @@ -1036,7 +1027,6 @@ void Environment::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("fs_stats_field_array", fs_stats_field_array_); tracker->TrackField("fs_stats_field_bigint_array", fs_stats_field_bigint_array_); - tracker->TrackField("thread_stopper", thread_stopper_); tracker->TrackField("cleanup_hooks", cleanup_hooks_); tracker->TrackField("async_hooks", async_hooks_); tracker->TrackField("immediate_info", immediate_info_); @@ -1100,38 +1090,6 @@ void Environment::CleanupFinalizationGroups() { } } -void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) { - CHECK_NULL(async_); - env_ = env; - async_ = new uv_async_t; - async_->data = data; - CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0); -} - -void AsyncRequest::Uninstall() { - if (async_ != nullptr) { - env_->CloseHandle(async_, [](uv_async_t* async) { delete async; }); - async_ = nullptr; - } -} - -void AsyncRequest::Stop() { - set_stopped(true); - if (async_ != nullptr) uv_async_send(async_); -} - -uv_async_t* AsyncRequest::GetHandle() { - return async_; -} - -void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const { - if (async_ != nullptr) tracker->TrackField("async_request", *async_); -} - -AsyncRequest::~AsyncRequest() { - CHECK_NULL(async_); -} - // Not really any better place than env.cc at this moment. void BaseObject::DeleteMe(void* data) { BaseObject* self = static_cast(data); diff --git a/src/env.h b/src/env.h index 5a8c870ebd2caf..72d106d1d676cc 100644 --- a/src/env.h +++ b/src/env.h @@ -587,34 +587,6 @@ struct AllocatedBuffer { friend class Environment; }; -class AsyncRequest : public MemoryRetainer { - public: - AsyncRequest() = default; - ~AsyncRequest() override; - - AsyncRequest(const AsyncRequest&) = delete; - AsyncRequest& operator=(const AsyncRequest&) = delete; - AsyncRequest(AsyncRequest&&) = delete; - AsyncRequest& operator=(AsyncRequest&&) = delete; - - void Install(Environment* env, void* data, uv_async_cb target); - void Uninstall(); - void Stop(); - inline void set_stopped(bool flag); - inline bool is_stopped() const; - uv_async_t* GetHandle(); - void MemoryInfo(MemoryTracker* tracker) const override; - - - SET_MEMORY_INFO_NAME(AsyncRequest) - SET_SELF_SIZE(AsyncRequest) - - private: - Environment* env_; - uv_async_t* async_ = nullptr; - std::atomic_bool stopped_ {true}; -}; - class KVStore { public: KVStore() = default; @@ -1065,6 +1037,14 @@ class Environment : public MemoryRetainer { inline bool can_call_into_js() const; inline void set_can_call_into_js(bool can_call_into_js); + // Increase or decrease a counter that manages whether this Environment + // keeps the event loop alive on its own or not. The counter starts out at 0, + // meaning it does not, and any positive value will make it keep the event + // loop alive. + // This is used by Workers to manage their own .ref()/.unref() implementation, + // as Workers aren't directly associated with their own libuv handles. + inline void add_refs(int64_t diff); + inline bool has_run_bootstrapping_code() const; inline void set_has_run_bootstrapping_code(bool has_run_bootstrapping_code); @@ -1085,6 +1065,7 @@ class Environment : public MemoryRetainer { inline void remove_sub_worker_context(worker::Worker* context); void stop_sub_worker_contexts(); inline bool is_stopping() const; + inline void set_stopping(bool value); inline std::list* extra_linked_bindings(); inline node_module* extra_linked_bindings_head(); inline const Mutex& extra_linked_bindings_mutex() const; @@ -1226,8 +1207,6 @@ class Environment : public MemoryRetainer { inline std::shared_ptr options(); inline std::shared_ptr inspector_host_port(); - inline AsyncRequest* thread_stopper() { return &thread_stopper_; } - // The BaseObject count is a debugging helper that makes sure that there are // no memory leaks caused by BaseObjects staying alive longer than expected // (in particular, no circular BaseObjectPtr references). @@ -1288,6 +1267,7 @@ class Environment : public MemoryRetainer { uv_prepare_t idle_prepare_handle_; uv_check_t idle_check_handle_; uv_async_t task_queues_async_; + int64_t task_queues_async_refs_ = 0; bool profiler_idle_notifier_started_ = false; AsyncHooks async_hooks_; @@ -1345,7 +1325,7 @@ class Environment : public MemoryRetainer { bool has_run_bootstrapping_code_ = false; bool has_serialized_options_ = false; - bool can_call_into_js_ = true; + std::atomic_bool can_call_into_js_ { true }; Flags flags_; uint64_t thread_id_; std::unordered_set sub_worker_contexts_; @@ -1463,10 +1443,7 @@ class Environment : public MemoryRetainer { bool started_cleanup_ = false; int64_t base_object_count_ = 0; - - // A custom async abstraction (a pair of async handle and a state variable) - // Used by embedders to shutdown running Node instance. - AsyncRequest thread_stopper_; + std::atomic_bool is_stopping_ { false }; template void ForEachBaseObject(T&& iterator); diff --git a/src/node_worker.cc b/src/node_worker.cc index 2f43780a951ad4..b6f8df1a8b1db2 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -268,7 +268,7 @@ void Worker::Run() { stopped_ = true; this->env_ = nullptr; } - env_->thread_stopper()->set_stopped(true); + env_->set_stopping(true); env_->stop_sub_worker_contexts(); env_->RunCleanup(); RunAtExit(env_.get()); @@ -412,7 +412,6 @@ void Worker::JoinThread() { thread_joined_ = true; env()->remove_sub_worker_context(this); - on_thread_finished_.Uninstall(); { HandleScope handle_scope(env()->isolate()); @@ -439,6 +438,8 @@ void Worker::JoinThread() { } Worker::~Worker() { + JoinThread(); + Mutex::ScopedLock lock(mutex_); CHECK(stopped_); @@ -574,18 +575,16 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->stopped_ = false; w->thread_joined_ = false; - w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) { - Worker* w_ = static_cast(handle->data); - CHECK(w_->is_stopped()); - w_->parent_port_ = nullptr; - w_->JoinThread(); - delete w_; - }); + if (w->has_ref_) + w->env()->add_refs(1); uv_thread_options_t thread_options; thread_options.flags = UV_THREAD_HAS_STACK_SIZE; thread_options.stack_size = kStackSize; CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) { + // XXX: This could become a std::unique_ptr, but that makes at least + // gcc 6.3 detect undefined behaviour when there shouldn't be any. + // gcc 7+ handles this well. Worker* w = static_cast(arg); const uintptr_t stack_top = reinterpret_cast(&arg); @@ -596,7 +595,12 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->Run(); Mutex::ScopedLock lock(w->mutex_); - w->on_thread_finished_.Stop(); + w->env()->SetImmediateThreadsafe( + [w = std::unique_ptr(w)](Environment* env) { + if (w->has_ref_) + env->add_refs(-1); + // implicitly delete w + }); }, static_cast(w)), 0); } @@ -611,13 +615,19 @@ void Worker::StopThread(const FunctionCallbackInfo& args) { void Worker::Ref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - uv_ref(reinterpret_cast(w->on_thread_finished_.GetHandle())); + if (!w->has_ref_) { + w->has_ref_ = true; + w->env()->add_refs(1); + } } void Worker::Unref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - uv_unref(reinterpret_cast(w->on_thread_finished_.GetHandle())); + if (w->has_ref_) { + w->has_ref_ = false; + w->env()->add_refs(-1); + } } void Worker::GetResourceLimits(const FunctionCallbackInfo& args) { diff --git a/src/node_worker.h b/src/node_worker.h index 7b1311734a2a4a..632644202a53ba 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -41,7 +41,6 @@ class Worker : public AsyncWrap { void MemoryInfo(MemoryTracker* tracker) const override { tracker->TrackField("parent_port", parent_port_); - tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_"); } SET_MEMORY_INFO_NAME(Worker) @@ -107,14 +106,14 @@ class Worker : public AsyncWrap { // instance refers to it via its [kPort] property. MessagePort* parent_port_ = nullptr; - AsyncRequest on_thread_finished_; - // A raw flag that is used by creator and worker threads to // sync up on pre-mature termination of worker - while in the // warmup phase. Once the worker is fully warmed up, use the // async handle of the worker's Environment for the same purpose. bool stopped_ = true; + bool has_ref_ = true; + // The real Environment of the worker object. It has a lesser // lifespan than the worker object itself - comes to life // when the worker thread creates a new Environment, and gets From c01410a44bdf6c44acb6d66b7525d723f6714734 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Thu, 16 Jan 2020 17:54:27 +0100 Subject: [PATCH 5/7] src: add interrupts to Environments/Workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Allow doing what V8’s `v8::Isolate::RequestInterrupt()` does for V8. This also works when there is no JS code currently executing. --- src/env-inl.h | 12 ++++++++++++ src/env.cc | 41 +++++++++++++++++++++++++++++++++++++++++ src/env.h | 10 ++++++++++ src/node_worker.h | 11 +++++++++++ 4 files changed, 74 insertions(+) diff --git a/src/env-inl.h b/src/env-inl.h index d8177e2e74c41c..7b408cab633bdf 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -812,6 +812,18 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) { uv_async_send(&task_queues_async_); } +template +void Environment::RequestInterrupt(Fn&& cb) { + auto callback = std::make_unique>( + std::move(cb), false); + { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + native_immediates_interrupts_.Push(std::move(callback)); + } + uv_async_send(&task_queues_async_); + RequestInterruptFromV8(); +} + Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed) : refed_(refed) {} diff --git a/src/env.cc b/src/env.cc index bdd96a2a4aa143..ba5e9cd5a05218 100644 --- a/src/env.cc +++ b/src/env.cc @@ -388,6 +388,8 @@ Environment::Environment(IsolateData* isolate_data, } Environment::~Environment() { + if (interrupt_data_ != nullptr) *interrupt_data_ = nullptr; + isolate()->GetHeapProfiler()->RemoveBuildEmbedderGraphCallback( BuildEmbedderGraph, this); @@ -651,11 +653,29 @@ void Environment::AtExit(void (*cb)(void* arg), void* arg) { at_exit_functions_.push_front(ExitCallback{cb, arg}); } +void Environment::RunAndClearInterrupts() { + while (native_immediates_interrupts_.size() > 0) { + NativeImmediateQueue queue; + { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + queue.ConcatMove(std::move(native_immediates_interrupts_)); + } + DebugSealHandleScope seal_handle_scope(isolate()); + + while (std::unique_ptr head = queue.Shift()) + head->Call(this); + } +} + void Environment::RunAndClearNativeImmediates(bool only_refed) { TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "RunAndClearNativeImmediates", this); size_t ref_count = 0; + // Handle interrupts first. These functions are not allowed to throw + // exceptions, so we do not need to handle that. + RunAndClearInterrupts(); + // It is safe to check .size() first, because there is a causal relationship // between pushes to the threadsafe and this function being called. // For the common case, it's worth checking the size first before establishing @@ -695,6 +715,27 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { ToggleImmediateRef(false); } +void Environment::RequestInterruptFromV8() { + if (interrupt_data_ != nullptr) return; // Already scheduled. + + // The Isolate may outlive the Environment, so some logic to handle the + // situation in which the Environment is destroyed before the handler runs + // is required. + interrupt_data_ = new Environment*(this); + + isolate()->RequestInterrupt([](Isolate* isolate, void* data) { + std::unique_ptr env_ptr { static_cast(data) }; + Environment* env = *env_ptr; + if (env == nullptr) { + // The Environment has already been destroyed. That should be okay; any + // callback added before the Environment shuts down would have been + // handled during cleanup. + return; + } + env->interrupt_data_ = nullptr; + env->RunAndClearInterrupts(); + }, interrupt_data_); +} void Environment::ScheduleTimer(int64_t duration_ms) { if (started_cleanup_) return; diff --git a/src/env.h b/src/env.h index 72d106d1d676cc..ed1dcb13d0bf7d 100644 --- a/src/env.h +++ b/src/env.h @@ -1183,6 +1183,12 @@ class Environment : public MemoryRetainer { template // This behaves like SetImmediate() but can be called from any thread. inline void SetImmediateThreadsafe(Fn&& cb); + // This behaves like V8's Isolate::RequestInterrupt(), but also accounts for + // the event loop (i.e. combines the V8 function with SetImmediate()). + // The passed callback may not throw exceptions. + // This function can be called from any thread. + template + inline void RequestInterrupt(Fn&& cb); // This needs to be available for the JS-land setImmediate(). void ToggleImmediateRef(bool ref); @@ -1431,8 +1437,12 @@ class Environment : public MemoryRetainer { NativeImmediateQueue native_immediates_; Mutex native_immediates_threadsafe_mutex_; NativeImmediateQueue native_immediates_threadsafe_; + NativeImmediateQueue native_immediates_interrupts_; void RunAndClearNativeImmediates(bool only_refed = false); + void RunAndClearInterrupts(); + Environment** interrupt_data_ = nullptr; + void RequestInterruptFromV8(); static void CheckImmediate(uv_check_t* handle); // Use an unordered_set, so that we have efficient insertion and removal. diff --git a/src/node_worker.h b/src/node_worker.h index 632644202a53ba..6db7b258ebd81a 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -43,6 +43,9 @@ class Worker : public AsyncWrap { tracker->TrackField("parent_port", parent_port_); } + template + inline bool RequestInterrupt(Fn&& cb); + SET_MEMORY_INFO_NAME(Worker) SET_SELF_SIZE(Worker) @@ -123,6 +126,14 @@ class Worker : public AsyncWrap { friend class WorkerThreadData; }; +template +bool Worker::RequestInterrupt(Fn&& cb) { + Mutex::ScopedLock lock(mutex_); + if (env_ == nullptr) return false; + env_->RequestInterrupt(std::move(cb)); + return true; +} + } // namespace worker } // namespace node From 1b3e1834345ae84947a1b11e40bbd177bc5c83d4 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Thu, 16 Jan 2020 19:44:11 +0100 Subject: [PATCH 6/7] src: move MemoryInfo() for worker code to .cc files This is a) the right thing to do anyway because these functions can not be inlined by the compiler and b) avoids compilation warnings in the following commit. --- src/node_messaging.cc | 4 ++++ src/node_messaging.h | 5 +---- src/node_worker.cc | 4 ++++ src/node_worker.h | 5 +---- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 1028da69b44961..ebad6c67508105 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -900,6 +900,10 @@ void MessagePort::Entangle(MessagePort* a, MessagePortData* b) { MessagePortData::Entangle(a->data_.get(), b); } +void MessagePort::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("data", data_); +} + Local GetMessagePortConstructorTemplate(Environment* env) { // Factor generating the MessagePort JS constructor into its own piece // of code, because it is needed early on in the child environment setup. diff --git a/src/node_messaging.h b/src/node_messaging.h index 526158e144d354..6d2410a7248b95 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -191,10 +191,7 @@ class MessagePort : public HandleWrap { // NULL pointer to the C++ MessagePort object is also detached. inline bool IsDetached() const; - void MemoryInfo(MemoryTracker* tracker) const override { - tracker->TrackField("data", data_); - } - + void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(MessagePort) SET_SELF_SIZE(MessagePort) diff --git a/src/node_worker.cc b/src/node_worker.cc index b6f8df1a8b1db2..785f2783c22346 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -656,6 +656,10 @@ void Worker::Exit(int code) { } } +void Worker::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("parent_port", parent_port_); +} + namespace { // Return the MessagePort that is global for this Environment and communicates diff --git a/src/node_worker.h b/src/node_worker.h index 6db7b258ebd81a..f84362f35c1897 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -39,13 +39,10 @@ class Worker : public AsyncWrap { // Wait for the worker thread to stop (in a blocking manner). void JoinThread(); - void MemoryInfo(MemoryTracker* tracker) const override { - tracker->TrackField("parent_port", parent_port_); - } - template inline bool RequestInterrupt(Fn&& cb); + void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(Worker) SET_SELF_SIZE(Worker) From 9de09f37f8e1a6b57bba7947251225a6ac682972 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Thu, 16 Jan 2020 18:19:10 +0100 Subject: [PATCH 7/7] report: add support for Workers Include a report for each sub-Worker of the current Node.js instance. This adds a feature that is necessary for eventually making the report feature stable, as was discussed during the last collaborator summit. Refs: https://github.com/openjs-foundation/summit/pull/240 --- doc/api/report.md | 21 +++++++++++++ src/env-inl.h | 5 ++++ src/env.h | 2 ++ src/node_report.cc | 43 +++++++++++++++++++++++++- src/node_report.h | 9 ++++++ src/node_report_utils.cc | 24 +++++++++++++++ test/common/report.js | 11 +++++-- test/report/test-report-worker.js | 50 +++++++++++++++++++++++++++++++ 8 files changed, 161 insertions(+), 4 deletions(-) create mode 100644 test/report/test-report-worker.js diff --git a/doc/api/report.md b/doc/api/report.md index b39a5286398b2c..b5ff3d8a88e3e5 100644 --- a/doc/api/report.md +++ b/doc/api/report.md @@ -296,6 +296,7 @@ is provided below for reference. "address": "0x000055fc7b2cb180" } ], + "workers": [], "environmentVariables": { "REMOTEHOST": "REMOVED", "MANPATH": "/opt/rh/devtoolset-3/root/usr/share/man:", @@ -577,4 +578,24 @@ NODE_OPTIONS="--experimental-report --report-uncaught-exception \ Specific API documentation can be found under [`process API documentation`][] section. +## Interaction with Workers + + +[`Worker`][] threads can create reports in the same way that the main thread +does. + +Reports will include information on any Workers that are children of the current +thread as part of the `workers` section, with each Worker generating a report +in the standard report format. + +The thread which is generating the report will wait for the reports from Worker +threads to finish. However, the latency for this will usually be low, as both +running JavaScript and the event loop are interrupted to generate the report. + [`process API documentation`]: process.html +[`Worker`]: worker_threads.html diff --git a/src/env-inl.h b/src/env-inl.h index 7b408cab633bdf..2002df9abaf1a4 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -909,6 +909,11 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) { sub_worker_contexts_.erase(context); } +template +inline void Environment::ForEachWorker(Fn&& iterator) { + for (worker::Worker* w : sub_worker_contexts_) iterator(w); +} + inline void Environment::add_refs(int64_t diff) { task_queues_async_refs_ += diff; CHECK_GE(task_queues_async_refs_, 0); diff --git a/src/env.h b/src/env.h index ed1dcb13d0bf7d..1f8442d478b585 100644 --- a/src/env.h +++ b/src/env.h @@ -1064,6 +1064,8 @@ class Environment : public MemoryRetainer { inline void add_sub_worker_context(worker::Worker* context); inline void remove_sub_worker_context(worker::Worker* context); void stop_sub_worker_contexts(); + template + inline void ForEachWorker(Fn&& iterator); inline bool is_stopping() const; inline void set_stopping(bool value); inline std::list* extra_linked_bindings(); diff --git a/src/node_report.cc b/src/node_report.cc index ddeb216c82d6bc..9b32352326becf 100644 --- a/src/node_report.cc +++ b/src/node_report.cc @@ -4,6 +4,8 @@ #include "diagnosticfilename-inl.h" #include "node_internals.h" #include "node_metadata.h" +#include "node_mutex.h" +#include "node_worker.h" #include "util.h" #ifdef _WIN32 @@ -19,18 +21,20 @@ #include #include -constexpr int NODE_REPORT_VERSION = 1; +constexpr int NODE_REPORT_VERSION = 2; constexpr int NANOS_PER_SEC = 1000 * 1000 * 1000; constexpr double SEC_PER_MICROS = 1e-6; namespace report { using node::arraysize; +using node::ConditionVariable; using node::DiagnosticFilename; using node::Environment; using node::Mutex; using node::NativeSymbolDebuggingContext; using node::PerIsolateOptions; using node::TIME_TYPE; +using node::worker::Worker; using v8::HeapSpaceStatistics; using v8::HeapStatistics; using v8::Isolate; @@ -210,6 +214,10 @@ static void WriteNodeReport(Isolate* isolate, // Report native process ID writer.json_keyvalue("processId", pid); + if (env != nullptr) + writer.json_keyvalue("threadId", env->thread_id()); + else + writer.json_keyvalue("threadId", JSONWriter::Null{}); { // Report the process cwd. @@ -259,6 +267,39 @@ static void WriteNodeReport(Isolate* isolate, writer.json_arrayend(); + writer.json_arraystart("workers"); + if (env != nullptr) { + Mutex workers_mutex; + ConditionVariable notify; + std::vector worker_infos; + size_t expected_results = 0; + + env->ForEachWorker([&](Worker* w) { + expected_results += w->RequestInterrupt([&](Environment* env) { + std::ostringstream os; + + GetNodeReport(env->isolate(), + env, + "Worker thread subreport", + trigger, + Local(), + os); + + Mutex::ScopedLock lock(workers_mutex); + worker_infos.emplace_back(os.str()); + notify.Signal(lock); + }); + }); + + Mutex::ScopedLock lock(workers_mutex); + worker_infos.reserve(expected_results); + while (worker_infos.size() < expected_results) + notify.Wait(lock); + for (const std::string& worker_info : worker_infos) + writer.json_element(JSONWriter::ForeignJSON { worker_info }); + } + writer.json_arrayend(); + // Report operating system information PrintSystemInformation(&writer); diff --git a/src/node_report.h b/src/node_report.h index 4cb82470f43594..46b69b9681db51 100644 --- a/src/node_report.h +++ b/src/node_report.h @@ -44,6 +44,7 @@ void GetNodeReport(v8::Isolate* isolate, // Function declarations - utility functions in src/node_report_utils.cc void WalkHandle(uv_handle_t* h, void* arg); std::string EscapeJsonChars(const std::string& str); +std::string Reindent(const std::string& str, int indentation); template std::string ValueToHexString(T value) { @@ -146,6 +147,10 @@ class JSONWriter { struct Null {}; // Usable as a JSON value. + struct ForeignJSON { + std::string as_string; + }; + private: template { @@ -253,6 +254,10 @@ function _validateContent(report) { report.sharedObjects.forEach((sharedObject) => { assert.strictEqual(typeof sharedObject, 'string'); }); + + // Verify the format of the workers section. + assert(Array.isArray(report.workers)); + report.workers.forEach(_validateContent); } function checkForUnknownFields(actual, expected) { diff --git a/test/report/test-report-worker.js b/test/report/test-report-worker.js new file mode 100644 index 00000000000000..a34c05f08431de --- /dev/null +++ b/test/report/test-report-worker.js @@ -0,0 +1,50 @@ +// Flags: --experimental-report +'use strict'; +const common = require('../common'); +common.skipIfReportDisabled(); +const assert = require('assert'); +const { Worker } = require('worker_threads'); +const { once } = require('events'); +const helper = require('../common/report'); + +async function basic() { + // Test that the report includes basic information about Worker threads. + + const w = new Worker(` + const { parentPort } = require('worker_threads'); + parentPort.once('message', () => { + /* Wait for message to stop the Worker */ + }); + `, { eval: true }); + + await once(w, 'online'); + + const report = process.report.getReport(); + helper.validateContent(report); + assert.strictEqual(report.workers.length, 1); + helper.validateContent(report.workers[0]); + + w.postMessage({}); + + await once(w, 'exit'); +} + +async function interruptingJS() { + // Test that the report also works when Worker threads are busy in JS land. + + const w = new Worker('while (true);', { eval: true }); + + await once(w, 'online'); + + const report = process.report.getReport(); + helper.validateContent(report); + assert.strictEqual(report.workers.length, 1); + helper.validateContent(report.workers[0]); + + await w.terminate(); +} + +(async function() { + await basic(); + await interruptingJS(); +})().then(common.mustCall());