diff --git a/doc/api/report.md b/doc/api/report.md index b39a5286398b2c..b5ff3d8a88e3e5 100644 --- a/doc/api/report.md +++ b/doc/api/report.md @@ -296,6 +296,7 @@ is provided below for reference. "address": "0x000055fc7b2cb180" } ], + "workers": [], "environmentVariables": { "REMOTEHOST": "REMOVED", "MANPATH": "/opt/rh/devtoolset-3/root/usr/share/man:", @@ -577,4 +578,24 @@ NODE_OPTIONS="--experimental-report --report-uncaught-exception \ Specific API documentation can be found under [`process API documentation`][] section. +## Interaction with Workers + + +[`Worker`][] threads can create reports in the same way that the main thread +does. + +Reports will include information on any Workers that are children of the current +thread as part of the `workers` section, with each Worker generating a report +in the standard report format. + +The thread which is generating the report will wait for the reports from Worker +threads to finish. However, the latency for this will usually be low, as both +running JavaScript and the event loop are interrupted to generate the report. + [`process API documentation`]: process.html +[`Worker`]: worker_threads.html diff --git a/src/env-inl.h b/src/env-inl.h index cd111bc2641890..2002df9abaf1a4 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -242,14 +242,6 @@ inline bool ImmediateInfo::has_outstanding() const { return fields_[kHasOutstanding] == 1; } -inline void ImmediateInfo::count_inc(uint32_t increment) { - fields_[kCount] += increment; -} - -inline void ImmediateInfo::count_dec(uint32_t decrement) { - fields_[kCount] -= decrement; -} - inline void ImmediateInfo::ref_count_inc(uint32_t increment) { fields_[kRefCount] += increment; } @@ -748,19 +740,51 @@ inline void IsolateData::set_options( options_ = std::move(options); } -template -void Environment::CreateImmediate(Fn&& cb, bool ref) { - auto callback = std::make_unique>( - std::move(cb), ref); - NativeImmediateCallback* prev_tail = native_immediate_callbacks_tail_; +std::unique_ptr +Environment::NativeImmediateQueue::Shift() { + std::unique_ptr ret = std::move(head_); + if (ret) { + head_ = ret->get_next(); + if (!head_) + tail_ = nullptr; // The queue is now empty. + } + size_--; + return ret; +} - native_immediate_callbacks_tail_ = callback.get(); +void Environment::NativeImmediateQueue::Push( + std::unique_ptr cb) { + NativeImmediateCallback* prev_tail = tail_; + + size_++; + tail_ = cb.get(); if (prev_tail != nullptr) - prev_tail->set_next(std::move(callback)); + prev_tail->set_next(std::move(cb)); + else + head_ = std::move(cb); +} + +void Environment::NativeImmediateQueue::ConcatMove( + NativeImmediateQueue&& other) { + size_ += other.size_; + if (tail_ != nullptr) + tail_->set_next(std::move(other.head_)); else - native_immediate_callbacks_head_ = std::move(callback); + head_ = std::move(other.head_); + tail_ = other.tail_; + other.tail_ = nullptr; + other.size_ = 0; +} - immediate_info()->count_inc(1); +size_t Environment::NativeImmediateQueue::size() const { + return size_.load(); +} + +template +void Environment::CreateImmediate(Fn&& cb, bool ref) { + auto callback = std::make_unique>( + std::move(cb), ref); + native_immediates_.Push(std::move(callback)); } template @@ -777,6 +801,29 @@ void Environment::SetUnrefImmediate(Fn&& cb) { CreateImmediate(std::move(cb), false); } +template +void Environment::SetImmediateThreadsafe(Fn&& cb) { + auto callback = std::make_unique>( + std::move(cb), false); + { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + native_immediates_threadsafe_.Push(std::move(callback)); + } + uv_async_send(&task_queues_async_); +} + +template +void Environment::RequestInterrupt(Fn&& cb) { + auto callback = std::make_unique>( + std::move(cb), false); + { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + native_immediates_interrupts_.Push(std::move(callback)); + } + uv_async_send(&task_queues_async_); + RequestInterruptFromV8(); +} + Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed) : refed_(refed) {} @@ -862,8 +909,26 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) { sub_worker_contexts_.erase(context); } +template +inline void Environment::ForEachWorker(Fn&& iterator) { + for (worker::Worker* w : sub_worker_contexts_) iterator(w); +} + +inline void Environment::add_refs(int64_t diff) { + task_queues_async_refs_ += diff; + CHECK_GE(task_queues_async_refs_, 0); + if (task_queues_async_refs_ == 0) + uv_unref(reinterpret_cast(&task_queues_async_)); + else + uv_ref(reinterpret_cast(&task_queues_async_)); +} + inline bool Environment::is_stopping() const { - return thread_stopper_.is_stopped(); + return is_stopping_.load(); +} + +inline void Environment::set_stopping(bool value) { + is_stopping_.store(value); } inline std::list* Environment::extra_linked_bindings() { @@ -1146,7 +1211,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) { inline void Environment::RegisterFinalizationGroupForCleanup( v8::Local group) { cleanup_finalization_groups_.emplace_back(isolate(), group); - uv_async_send(&cleanup_finalization_groups_async_); + uv_async_send(&task_queues_async_); } size_t CleanupHookCallback::Hash::operator()( @@ -1183,14 +1248,6 @@ int64_t Environment::base_object_count() const { return base_object_count_; } -bool AsyncRequest::is_stopped() const { - return stopped_.load(); -} - -void AsyncRequest::set_stopped(bool flag) { - stopped_.store(flag); -} - #define VP(PropertyName, StringValue) V(v8::Private, PropertyName) #define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName) #define VS(PropertyName, StringValue) V(v8::String, PropertyName) diff --git a/src/env.cc b/src/env.cc index 51c5cc7c497f23..ba5e9cd5a05218 100644 --- a/src/env.cc +++ b/src/env.cc @@ -388,6 +388,8 @@ Environment::Environment(IsolateData* isolate_data, } Environment::~Environment() { + if (interrupt_data_ != nullptr) *interrupt_data_ = nullptr; + isolate()->GetHeapProfiler()->RemoveBuildEmbedderGraphCallback( BuildEmbedderGraph, this); @@ -460,23 +462,16 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { uv_check_init(event_loop(), &idle_check_handle_); uv_async_init( event_loop(), - &cleanup_finalization_groups_async_, + &task_queues_async_, [](uv_async_t* async) { Environment* env = ContainerOf( - &Environment::cleanup_finalization_groups_async_, async); + &Environment::task_queues_async_, async); env->CleanupFinalizationGroups(); + env->RunAndClearNativeImmediates(); }); uv_unref(reinterpret_cast(&idle_prepare_handle_)); uv_unref(reinterpret_cast(&idle_check_handle_)); - uv_unref(reinterpret_cast(&cleanup_finalization_groups_async_)); - - thread_stopper()->Install( - this, static_cast(this), [](uv_async_t* handle) { - Environment* env = static_cast(handle->data); - uv_stop(env->event_loop()); - }); - thread_stopper()->set_stopped(false); - uv_unref(reinterpret_cast(thread_stopper()->GetHandle())); + uv_unref(reinterpret_cast(&task_queues_async_)); // Register clean-up cb to be called to clean up the handles // when the environment is freed, note that they are not cleaned in @@ -495,8 +490,9 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { void Environment::ExitEnv() { set_can_call_into_js(false); - thread_stopper()->Stop(); + set_stopping(true); isolate_->TerminateExecution(); + SetImmediateThreadsafe([](Environment* env) { uv_stop(env->event_loop()); }); } void Environment::RegisterHandleCleanups() { @@ -532,7 +528,7 @@ void Environment::RegisterHandleCleanups() { close_and_finish, nullptr); RegisterHandleCleanup( - reinterpret_cast(&cleanup_finalization_groups_async_), + reinterpret_cast(&task_queues_async_), close_and_finish, nullptr); } @@ -601,7 +597,6 @@ void Environment::RunCleanup() { started_cleanup_ = true; TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "RunCleanup", this); - thread_stopper()->Uninstall(); CleanupHandles(); while (!cleanup_hooks_.empty()) { @@ -658,20 +653,45 @@ void Environment::AtExit(void (*cb)(void* arg), void* arg) { at_exit_functions_.push_front(ExitCallback{cb, arg}); } +void Environment::RunAndClearInterrupts() { + while (native_immediates_interrupts_.size() > 0) { + NativeImmediateQueue queue; + { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + queue.ConcatMove(std::move(native_immediates_interrupts_)); + } + DebugSealHandleScope seal_handle_scope(isolate()); + + while (std::unique_ptr head = queue.Shift()) + head->Call(this); + } +} + void Environment::RunAndClearNativeImmediates(bool only_refed) { TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "RunAndClearNativeImmediates", this); size_t ref_count = 0; - size_t count = 0; - std::unique_ptr head; - head.swap(native_immediate_callbacks_head_); - native_immediate_callbacks_tail_ = nullptr; + + // Handle interrupts first. These functions are not allowed to throw + // exceptions, so we do not need to handle that. + RunAndClearInterrupts(); + + // It is safe to check .size() first, because there is a causal relationship + // between pushes to the threadsafe and this function being called. + // For the common case, it's worth checking the size first before establishing + // a mutex lock. + if (native_immediates_threadsafe_.size() > 0) { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_)); + } + + NativeImmediateQueue queue; + queue.ConcatMove(std::move(native_immediates_)); auto drain_list = [&]() { TryCatchScope try_catch(this); - for (; head; head = head->get_next()) { - DebugSealHandleScope seal_handle_scope(isolate()); - count++; + DebugSealHandleScope seal_handle_scope(isolate()); + while (std::unique_ptr head = queue.Shift()) { if (head->is_refed()) ref_count++; @@ -682,21 +702,40 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { if (!try_catch.HasTerminated() && can_call_into_js()) errors::TriggerUncaughtException(isolate(), try_catch); - // We are done with the current callback. Move one iteration along, - // as if we had completed successfully. - head = head->get_next(); return true; } } return false; }; - while (head && drain_list()) {} + while (queue.size() > 0 && drain_list()) {} - DCHECK_GE(immediate_info()->count(), count); - immediate_info()->count_dec(count); immediate_info()->ref_count_dec(ref_count); + + if (immediate_info()->ref_count() == 0) + ToggleImmediateRef(false); } +void Environment::RequestInterruptFromV8() { + if (interrupt_data_ != nullptr) return; // Already scheduled. + + // The Isolate may outlive the Environment, so some logic to handle the + // situation in which the Environment is destroyed before the handler runs + // is required. + interrupt_data_ = new Environment*(this); + + isolate()->RequestInterrupt([](Isolate* isolate, void* data) { + std::unique_ptr env_ptr { static_cast(data) }; + Environment* env = *env_ptr; + if (env == nullptr) { + // The Environment has already been destroyed. That should be okay; any + // callback added before the Environment shuts down would have been + // handled during cleanup. + return; + } + env->interrupt_data_ = nullptr; + env->RunAndClearInterrupts(); + }, interrupt_data_); +} void Environment::ScheduleTimer(int64_t duration_ms) { if (started_cleanup_) return; @@ -780,15 +819,12 @@ void Environment::CheckImmediate(uv_check_t* handle) { TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "CheckImmediate", env); - if (env->immediate_info()->count() == 0) - return; - HandleScope scope(env->isolate()); Context::Scope context_scope(env->context()); env->RunAndClearNativeImmediates(); - if (!env->can_call_into_js()) + if (env->immediate_info()->count() == 0 || !env->can_call_into_js()) return; do { @@ -1011,7 +1047,6 @@ inline size_t Environment::SelfSize() const { // TODO(joyeecheung): refactor the MemoryTracker interface so // this can be done for common types within the Track* calls automatically // if a certain scope is entered. - size -= sizeof(thread_stopper_); size -= sizeof(async_hooks_); size -= sizeof(tick_info_); size -= sizeof(immediate_info_); @@ -1033,7 +1068,6 @@ void Environment::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("fs_stats_field_array", fs_stats_field_array_); tracker->TrackField("fs_stats_field_bigint_array", fs_stats_field_bigint_array_); - tracker->TrackField("thread_stopper", thread_stopper_); tracker->TrackField("cleanup_hooks", cleanup_hooks_); tracker->TrackField("async_hooks", async_hooks_); tracker->TrackField("immediate_info", immediate_info_); @@ -1091,44 +1125,12 @@ void Environment::CleanupFinalizationGroups() { if (try_catch.HasCaught() && !try_catch.HasTerminated()) errors::TriggerUncaughtException(isolate(), try_catch); // Re-schedule the execution of the remainder of the queue. - uv_async_send(&cleanup_finalization_groups_async_); + uv_async_send(&task_queues_async_); return; } } } -void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) { - CHECK_NULL(async_); - env_ = env; - async_ = new uv_async_t; - async_->data = data; - CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0); -} - -void AsyncRequest::Uninstall() { - if (async_ != nullptr) { - env_->CloseHandle(async_, [](uv_async_t* async) { delete async; }); - async_ = nullptr; - } -} - -void AsyncRequest::Stop() { - set_stopped(true); - if (async_ != nullptr) uv_async_send(async_); -} - -uv_async_t* AsyncRequest::GetHandle() { - return async_; -} - -void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const { - if (async_ != nullptr) tracker->TrackField("async_request", *async_); -} - -AsyncRequest::~AsyncRequest() { - CHECK_NULL(async_); -} - // Not really any better place than env.cc at this moment. void BaseObject::DeleteMe(void* data) { BaseObject* self = static_cast(data); diff --git a/src/env.h b/src/env.h index fcba4dcde67f53..1f8442d478b585 100644 --- a/src/env.h +++ b/src/env.h @@ -587,34 +587,6 @@ struct AllocatedBuffer { friend class Environment; }; -class AsyncRequest : public MemoryRetainer { - public: - AsyncRequest() = default; - ~AsyncRequest() override; - - AsyncRequest(const AsyncRequest&) = delete; - AsyncRequest& operator=(const AsyncRequest&) = delete; - AsyncRequest(AsyncRequest&&) = delete; - AsyncRequest& operator=(AsyncRequest&&) = delete; - - void Install(Environment* env, void* data, uv_async_cb target); - void Uninstall(); - void Stop(); - inline void set_stopped(bool flag); - inline bool is_stopped() const; - uv_async_t* GetHandle(); - void MemoryInfo(MemoryTracker* tracker) const override; - - - SET_MEMORY_INFO_NAME(AsyncRequest) - SET_SELF_SIZE(AsyncRequest) - - private: - Environment* env_; - uv_async_t* async_ = nullptr; - std::atomic_bool stopped_ {true}; -}; - class KVStore { public: KVStore() = default; @@ -736,8 +708,6 @@ class ImmediateInfo : public MemoryRetainer { inline uint32_t count() const; inline uint32_t ref_count() const; inline bool has_outstanding() const; - inline void count_inc(uint32_t increment); - inline void count_dec(uint32_t decrement); inline void ref_count_inc(uint32_t increment); inline void ref_count_dec(uint32_t decrement); @@ -1067,6 +1037,14 @@ class Environment : public MemoryRetainer { inline bool can_call_into_js() const; inline void set_can_call_into_js(bool can_call_into_js); + // Increase or decrease a counter that manages whether this Environment + // keeps the event loop alive on its own or not. The counter starts out at 0, + // meaning it does not, and any positive value will make it keep the event + // loop alive. + // This is used by Workers to manage their own .ref()/.unref() implementation, + // as Workers aren't directly associated with their own libuv handles. + inline void add_refs(int64_t diff); + inline bool has_run_bootstrapping_code() const; inline void set_has_run_bootstrapping_code(bool has_run_bootstrapping_code); @@ -1086,7 +1064,10 @@ class Environment : public MemoryRetainer { inline void add_sub_worker_context(worker::Worker* context); inline void remove_sub_worker_context(worker::Worker* context); void stop_sub_worker_contexts(); + template + inline void ForEachWorker(Fn&& iterator); inline bool is_stopping() const; + inline void set_stopping(bool value); inline std::list* extra_linked_bindings(); inline node_module* extra_linked_bindings_head(); inline const Mutex& extra_linked_bindings_mutex() const; @@ -1201,6 +1182,15 @@ class Environment : public MemoryRetainer { inline void SetImmediate(Fn&& cb); template inline void SetUnrefImmediate(Fn&& cb); + template + // This behaves like SetImmediate() but can be called from any thread. + inline void SetImmediateThreadsafe(Fn&& cb); + // This behaves like V8's Isolate::RequestInterrupt(), but also accounts for + // the event loop (i.e. combines the V8 function with SetImmediate()). + // The passed callback may not throw exceptions. + // This function can be called from any thread. + template + inline void RequestInterrupt(Fn&& cb); // This needs to be available for the JS-land setImmediate(). void ToggleImmediateRef(bool ref); @@ -1225,8 +1215,6 @@ class Environment : public MemoryRetainer { inline std::shared_ptr options(); inline std::shared_ptr inspector_host_port(); - inline AsyncRequest* thread_stopper() { return &thread_stopper_; } - // The BaseObject count is a debugging helper that makes sure that there are // no memory leaks caused by BaseObjects staying alive longer than expected // (in particular, no circular BaseObjectPtr references). @@ -1286,7 +1274,8 @@ class Environment : public MemoryRetainer { uv_idle_t immediate_idle_handle_; uv_prepare_t idle_prepare_handle_; uv_check_t idle_check_handle_; - uv_async_t cleanup_finalization_groups_async_; + uv_async_t task_queues_async_; + int64_t task_queues_async_refs_ = 0; bool profiler_idle_notifier_started_ = false; AsyncHooks async_hooks_; @@ -1344,7 +1333,7 @@ class Environment : public MemoryRetainer { bool has_run_bootstrapping_code_ = false; bool has_serialized_options_ = false; - bool can_call_into_js_ = true; + std::atomic_bool can_call_into_js_ { true }; Flags flags_; uint64_t thread_id_; std::unordered_set sub_worker_contexts_; @@ -1430,10 +1419,32 @@ class Environment : public MemoryRetainer { Fn callback_; }; - std::unique_ptr native_immediate_callbacks_head_; - NativeImmediateCallback* native_immediate_callbacks_tail_ = nullptr; + class NativeImmediateQueue { + public: + inline std::unique_ptr Shift(); + inline void Push(std::unique_ptr cb); + // ConcatMove adds elements from 'other' to the end of this list, and clears + // 'other' afterwards. + inline void ConcatMove(NativeImmediateQueue&& other); + + // size() is atomic and may be called from any thread. + inline size_t size() const; + + private: + std::atomic size_ {0}; + std::unique_ptr head_; + NativeImmediateCallback* tail_ = nullptr; + }; + + NativeImmediateQueue native_immediates_; + Mutex native_immediates_threadsafe_mutex_; + NativeImmediateQueue native_immediates_threadsafe_; + NativeImmediateQueue native_immediates_interrupts_; void RunAndClearNativeImmediates(bool only_refed = false); + void RunAndClearInterrupts(); + Environment** interrupt_data_ = nullptr; + void RequestInterruptFromV8(); static void CheckImmediate(uv_check_t* handle); // Use an unordered_set, so that we have efficient insertion and removal. @@ -1444,10 +1455,7 @@ class Environment : public MemoryRetainer { bool started_cleanup_ = false; int64_t base_object_count_ = 0; - - // A custom async abstraction (a pair of async handle and a state variable) - // Used by embedders to shutdown running Node instance. - AsyncRequest thread_stopper_; + std::atomic_bool is_stopping_ { false }; template void ForEachBaseObject(T&& iterator); diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 1028da69b44961..ebad6c67508105 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -900,6 +900,10 @@ void MessagePort::Entangle(MessagePort* a, MessagePortData* b) { MessagePortData::Entangle(a->data_.get(), b); } +void MessagePort::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("data", data_); +} + Local GetMessagePortConstructorTemplate(Environment* env) { // Factor generating the MessagePort JS constructor into its own piece // of code, because it is needed early on in the child environment setup. diff --git a/src/node_messaging.h b/src/node_messaging.h index 526158e144d354..6d2410a7248b95 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -191,10 +191,7 @@ class MessagePort : public HandleWrap { // NULL pointer to the C++ MessagePort object is also detached. inline bool IsDetached() const; - void MemoryInfo(MemoryTracker* tracker) const override { - tracker->TrackField("data", data_); - } - + void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(MessagePort) SET_SELF_SIZE(MessagePort) diff --git a/src/node_report.cc b/src/node_report.cc index ddeb216c82d6bc..9b32352326becf 100644 --- a/src/node_report.cc +++ b/src/node_report.cc @@ -4,6 +4,8 @@ #include "diagnosticfilename-inl.h" #include "node_internals.h" #include "node_metadata.h" +#include "node_mutex.h" +#include "node_worker.h" #include "util.h" #ifdef _WIN32 @@ -19,18 +21,20 @@ #include #include -constexpr int NODE_REPORT_VERSION = 1; +constexpr int NODE_REPORT_VERSION = 2; constexpr int NANOS_PER_SEC = 1000 * 1000 * 1000; constexpr double SEC_PER_MICROS = 1e-6; namespace report { using node::arraysize; +using node::ConditionVariable; using node::DiagnosticFilename; using node::Environment; using node::Mutex; using node::NativeSymbolDebuggingContext; using node::PerIsolateOptions; using node::TIME_TYPE; +using node::worker::Worker; using v8::HeapSpaceStatistics; using v8::HeapStatistics; using v8::Isolate; @@ -210,6 +214,10 @@ static void WriteNodeReport(Isolate* isolate, // Report native process ID writer.json_keyvalue("processId", pid); + if (env != nullptr) + writer.json_keyvalue("threadId", env->thread_id()); + else + writer.json_keyvalue("threadId", JSONWriter::Null{}); { // Report the process cwd. @@ -259,6 +267,39 @@ static void WriteNodeReport(Isolate* isolate, writer.json_arrayend(); + writer.json_arraystart("workers"); + if (env != nullptr) { + Mutex workers_mutex; + ConditionVariable notify; + std::vector worker_infos; + size_t expected_results = 0; + + env->ForEachWorker([&](Worker* w) { + expected_results += w->RequestInterrupt([&](Environment* env) { + std::ostringstream os; + + GetNodeReport(env->isolate(), + env, + "Worker thread subreport", + trigger, + Local(), + os); + + Mutex::ScopedLock lock(workers_mutex); + worker_infos.emplace_back(os.str()); + notify.Signal(lock); + }); + }); + + Mutex::ScopedLock lock(workers_mutex); + worker_infos.reserve(expected_results); + while (worker_infos.size() < expected_results) + notify.Wait(lock); + for (const std::string& worker_info : worker_infos) + writer.json_element(JSONWriter::ForeignJSON { worker_info }); + } + writer.json_arrayend(); + // Report operating system information PrintSystemInformation(&writer); diff --git a/src/node_report.h b/src/node_report.h index 4cb82470f43594..46b69b9681db51 100644 --- a/src/node_report.h +++ b/src/node_report.h @@ -44,6 +44,7 @@ void GetNodeReport(v8::Isolate* isolate, // Function declarations - utility functions in src/node_report_utils.cc void WalkHandle(uv_handle_t* h, void* arg); std::string EscapeJsonChars(const std::string& str); +std::string Reindent(const std::string& str, int indentation); template std::string ValueToHexString(T value) { @@ -146,6 +147,10 @@ class JSONWriter { struct Null {}; // Usable as a JSON value. + struct ForeignJSON { + std::string as_string; + }; + private: template env_ = nullptr; } - env_->thread_stopper()->set_stopped(true); + env_->set_stopping(true); env_->stop_sub_worker_contexts(); env_->RunCleanup(); RunAtExit(env_.get()); @@ -412,7 +412,6 @@ void Worker::JoinThread() { thread_joined_ = true; env()->remove_sub_worker_context(this); - on_thread_finished_.Uninstall(); { HandleScope handle_scope(env()->isolate()); @@ -439,6 +438,8 @@ void Worker::JoinThread() { } Worker::~Worker() { + JoinThread(); + Mutex::ScopedLock lock(mutex_); CHECK(stopped_); @@ -574,18 +575,16 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->stopped_ = false; w->thread_joined_ = false; - w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) { - Worker* w_ = static_cast(handle->data); - CHECK(w_->is_stopped()); - w_->parent_port_ = nullptr; - w_->JoinThread(); - delete w_; - }); + if (w->has_ref_) + w->env()->add_refs(1); uv_thread_options_t thread_options; thread_options.flags = UV_THREAD_HAS_STACK_SIZE; thread_options.stack_size = kStackSize; CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) { + // XXX: This could become a std::unique_ptr, but that makes at least + // gcc 6.3 detect undefined behaviour when there shouldn't be any. + // gcc 7+ handles this well. Worker* w = static_cast(arg); const uintptr_t stack_top = reinterpret_cast(&arg); @@ -596,7 +595,12 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->Run(); Mutex::ScopedLock lock(w->mutex_); - w->on_thread_finished_.Stop(); + w->env()->SetImmediateThreadsafe( + [w = std::unique_ptr(w)](Environment* env) { + if (w->has_ref_) + env->add_refs(-1); + // implicitly delete w + }); }, static_cast(w)), 0); } @@ -611,13 +615,19 @@ void Worker::StopThread(const FunctionCallbackInfo& args) { void Worker::Ref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - uv_ref(reinterpret_cast(w->on_thread_finished_.GetHandle())); + if (!w->has_ref_) { + w->has_ref_ = true; + w->env()->add_refs(1); + } } void Worker::Unref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - uv_unref(reinterpret_cast(w->on_thread_finished_.GetHandle())); + if (w->has_ref_) { + w->has_ref_ = false; + w->env()->add_refs(-1); + } } void Worker::GetResourceLimits(const FunctionCallbackInfo& args) { @@ -646,6 +656,10 @@ void Worker::Exit(int code) { } } +void Worker::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("parent_port", parent_port_); +} + namespace { // Return the MessagePort that is global for this Environment and communicates diff --git a/src/node_worker.h b/src/node_worker.h index 7b1311734a2a4a..f84362f35c1897 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -39,11 +39,10 @@ class Worker : public AsyncWrap { // Wait for the worker thread to stop (in a blocking manner). void JoinThread(); - void MemoryInfo(MemoryTracker* tracker) const override { - tracker->TrackField("parent_port", parent_port_); - tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_"); - } + template + inline bool RequestInterrupt(Fn&& cb); + void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(Worker) SET_SELF_SIZE(Worker) @@ -107,14 +106,14 @@ class Worker : public AsyncWrap { // instance refers to it via its [kPort] property. MessagePort* parent_port_ = nullptr; - AsyncRequest on_thread_finished_; - // A raw flag that is used by creator and worker threads to // sync up on pre-mature termination of worker - while in the // warmup phase. Once the worker is fully warmed up, use the // async handle of the worker's Environment for the same purpose. bool stopped_ = true; + bool has_ref_ = true; + // The real Environment of the worker object. It has a lesser // lifespan than the worker object itself - comes to life // when the worker thread creates a new Environment, and gets @@ -124,6 +123,14 @@ class Worker : public AsyncWrap { friend class WorkerThreadData; }; +template +bool Worker::RequestInterrupt(Fn&& cb) { + Mutex::ScopedLock lock(mutex_); + if (env_ == nullptr) return false; + env_->RequestInterrupt(std::move(cb)); + return true; +} + } // namespace worker } // namespace node diff --git a/test/common/report.js b/test/common/report.js index fdb15924cab770..c117d7b76459bc 100644 --- a/test/common/report.js +++ b/test/common/report.js @@ -53,7 +53,7 @@ function _validateContent(report) { // Verify that all sections are present as own properties of the report. const sections = ['header', 'javascriptStack', 'nativeStack', 'javascriptHeap', 'libuv', 'environmentVariables', - 'sharedObjects', 'resourceUsage']; + 'sharedObjects', 'resourceUsage', 'workers']; if (!isWindows) sections.push('userLimits'); @@ -74,9 +74,9 @@ function _validateContent(report) { 'componentVersions', 'release', 'osName', 'osRelease', 'osVersion', 'osMachine', 'cpus', 'host', 'glibcVersionRuntime', 'glibcVersionCompiler', 'cwd', - 'reportVersion', 'networkInterfaces']; + 'reportVersion', 'networkInterfaces', 'threadId']; checkForUnknownFields(header, headerFields); - assert.strictEqual(header.reportVersion, 1); // Increment as needed. + assert.strictEqual(header.reportVersion, 2); // Increment as needed. assert.strictEqual(typeof header.event, 'string'); assert.strictEqual(typeof header.trigger, 'string'); assert(typeof header.filename === 'string' || header.filename === null); @@ -84,6 +84,7 @@ function _validateContent(report) { 'Invalid Date'); assert(String(+header.dumpEventTimeStamp), header.dumpEventTimeStamp); assert(Number.isSafeInteger(header.processId)); + assert(Number.isSafeInteger(header.threadId) || header.threadId === null); assert.strictEqual(typeof header.cwd, 'string'); assert(Array.isArray(header.commandLine)); header.commandLine.forEach((arg) => { @@ -253,6 +254,10 @@ function _validateContent(report) { report.sharedObjects.forEach((sharedObject) => { assert.strictEqual(typeof sharedObject, 'string'); }); + + // Verify the format of the workers section. + assert(Array.isArray(report.workers)); + report.workers.forEach(_validateContent); } function checkForUnknownFields(actual, expected) { diff --git a/test/report/test-report-worker.js b/test/report/test-report-worker.js new file mode 100644 index 00000000000000..a34c05f08431de --- /dev/null +++ b/test/report/test-report-worker.js @@ -0,0 +1,50 @@ +// Flags: --experimental-report +'use strict'; +const common = require('../common'); +common.skipIfReportDisabled(); +const assert = require('assert'); +const { Worker } = require('worker_threads'); +const { once } = require('events'); +const helper = require('../common/report'); + +async function basic() { + // Test that the report includes basic information about Worker threads. + + const w = new Worker(` + const { parentPort } = require('worker_threads'); + parentPort.once('message', () => { + /* Wait for message to stop the Worker */ + }); + `, { eval: true }); + + await once(w, 'online'); + + const report = process.report.getReport(); + helper.validateContent(report); + assert.strictEqual(report.workers.length, 1); + helper.validateContent(report.workers[0]); + + w.postMessage({}); + + await once(w, 'exit'); +} + +async function interruptingJS() { + // Test that the report also works when Worker threads are busy in JS land. + + const w = new Worker('while (true);', { eval: true }); + + await once(w, 'online'); + + const report = process.report.getReport(); + helper.validateContent(report); + assert.strictEqual(report.workers.length, 1); + helper.validateContent(report.workers[0]); + + await w.terminate(); +} + +(async function() { + await basic(); + await interruptingJS(); +})().then(common.mustCall());