Skip to content

Commit

Permalink
src: remove AsyncRequest
Browse files Browse the repository at this point in the history
Remove `AsyncRequest` from the source code, and replace its
usage with threadsafe `SetImmediate()` calls. This has the
advantage of being able to pass in any function, rather than
one that is defined when the `AsyncRequest` is “installed”.

This necessitates two changes:

- The stopping flag (which was only used in one case and ignored
  in the other) is now a direct member of the `Environment` class.
- Workers no longer have their own libuv handles, requiring
  manual management of their libuv ref count.

As a drive-by fix, the `can_call_into_js` variable was turned
into an atomic variable. While there have been no bug reports,
the flag is set from `Stop(env)` calls, which are supposed to
be possible from any thread.

PR-URL: nodejs#31386
Refs: openjs-foundation/summit#240
Reviewed-By: Gireesh Punathil <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Colin Ihrig <[email protected]>
Reviewed-By: Rich Trott <[email protected]>
  • Loading branch information
addaleax committed Jan 21, 2020
1 parent 79a45e2 commit de2c68c
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 103 deletions.
23 changes: 14 additions & 9 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -897,8 +897,21 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
sub_worker_contexts_.erase(context);
}

inline void Environment::add_refs(int64_t diff) {
task_queues_async_refs_ += diff;
CHECK_GE(task_queues_async_refs_, 0);
if (task_queues_async_refs_ == 0)
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
else
uv_ref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
}

inline bool Environment::is_stopping() const {
return thread_stopper_.is_stopped();
return is_stopping_.load();
}

inline void Environment::set_stopping(bool value) {
is_stopping_.store(value);
}

inline std::list<node_module>* Environment::extra_linked_bindings() {
Expand Down Expand Up @@ -1218,14 +1231,6 @@ int64_t Environment::base_object_count() const {
return base_object_count_;
}

bool AsyncRequest::is_stopped() const {
return stopped_.load();
}

void AsyncRequest::set_stopped(bool flag) {
stopped_.store(flag);
}

#define VP(PropertyName, StringValue) V(v8::Private, PropertyName)
#define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName)
#define VS(PropertyName, StringValue) V(v8::String, PropertyName)
Expand Down
46 changes: 2 additions & 44 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,14 +471,6 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));

thread_stopper()->Install(
this, static_cast<void*>(this), [](uv_async_t* handle) {
Environment* env = static_cast<Environment*>(handle->data);
uv_stop(env->event_loop());
});
thread_stopper()->set_stopped(false);
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper()->GetHandle()));

// Register clean-up cb to be called to clean up the handles
// when the environment is freed, note that they are not cleaned in
// the one environment per process setup, but will be called in
Expand All @@ -496,8 +488,9 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {

void Environment::ExitEnv() {
set_can_call_into_js(false);
thread_stopper()->Stop();
set_stopping(true);
isolate_->TerminateExecution();
SetImmediateThreadsafe([](Environment* env) { uv_stop(env->event_loop()); });
}

void Environment::RegisterHandleCleanups() {
Expand Down Expand Up @@ -602,7 +595,6 @@ void Environment::RunCleanup() {
started_cleanup_ = true;
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunCleanup", this);
thread_stopper()->Uninstall();
CleanupHandles();

while (!cleanup_hooks_.empty()) {
Expand Down Expand Up @@ -1014,7 +1006,6 @@ inline size_t Environment::SelfSize() const {
// TODO(joyeecheung): refactor the MemoryTracker interface so
// this can be done for common types within the Track* calls automatically
// if a certain scope is entered.
size -= sizeof(thread_stopper_);
size -= sizeof(async_hooks_);
size -= sizeof(tick_info_);
size -= sizeof(immediate_info_);
Expand All @@ -1036,7 +1027,6 @@ void Environment::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("fs_stats_field_array", fs_stats_field_array_);
tracker->TrackField("fs_stats_field_bigint_array",
fs_stats_field_bigint_array_);
tracker->TrackField("thread_stopper", thread_stopper_);
tracker->TrackField("cleanup_hooks", cleanup_hooks_);
tracker->TrackField("async_hooks", async_hooks_);
tracker->TrackField("immediate_info", immediate_info_);
Expand Down Expand Up @@ -1100,38 +1090,6 @@ void Environment::CleanupFinalizationGroups() {
}
}

void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
CHECK_NULL(async_);
env_ = env;
async_ = new uv_async_t;
async_->data = data;
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
}

void AsyncRequest::Uninstall() {
if (async_ != nullptr) {
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
async_ = nullptr;
}
}

void AsyncRequest::Stop() {
set_stopped(true);
if (async_ != nullptr) uv_async_send(async_);
}

uv_async_t* AsyncRequest::GetHandle() {
return async_;
}

void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
}

AsyncRequest::~AsyncRequest() {
CHECK_NULL(async_);
}

// Not really any better place than env.cc at this moment.
void BaseObject::DeleteMe(void* data) {
BaseObject* self = static_cast<BaseObject*>(data);
Expand Down
47 changes: 12 additions & 35 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,34 +587,6 @@ struct AllocatedBuffer {
friend class Environment;
};

class AsyncRequest : public MemoryRetainer {
public:
AsyncRequest() = default;
~AsyncRequest() override;

AsyncRequest(const AsyncRequest&) = delete;
AsyncRequest& operator=(const AsyncRequest&) = delete;
AsyncRequest(AsyncRequest&&) = delete;
AsyncRequest& operator=(AsyncRequest&&) = delete;

void Install(Environment* env, void* data, uv_async_cb target);
void Uninstall();
void Stop();
inline void set_stopped(bool flag);
inline bool is_stopped() const;
uv_async_t* GetHandle();
void MemoryInfo(MemoryTracker* tracker) const override;


SET_MEMORY_INFO_NAME(AsyncRequest)
SET_SELF_SIZE(AsyncRequest)

private:
Environment* env_;
uv_async_t* async_ = nullptr;
std::atomic_bool stopped_ {true};
};

class KVStore {
public:
KVStore() = default;
Expand Down Expand Up @@ -1065,6 +1037,14 @@ class Environment : public MemoryRetainer {
inline bool can_call_into_js() const;
inline void set_can_call_into_js(bool can_call_into_js);

// Increase or decrease a counter that manages whether this Environment
// keeps the event loop alive on its own or not. The counter starts out at 0,
// meaning it does not, and any positive value will make it keep the event
// loop alive.
// This is used by Workers to manage their own .ref()/.unref() implementation,
// as Workers aren't directly associated with their own libuv handles.
inline void add_refs(int64_t diff);

inline bool has_run_bootstrapping_code() const;
inline void set_has_run_bootstrapping_code(bool has_run_bootstrapping_code);

Expand All @@ -1085,6 +1065,7 @@ class Environment : public MemoryRetainer {
inline void remove_sub_worker_context(worker::Worker* context);
void stop_sub_worker_contexts();
inline bool is_stopping() const;
inline void set_stopping(bool value);
inline std::list<node_module>* extra_linked_bindings();
inline node_module* extra_linked_bindings_head();
inline const Mutex& extra_linked_bindings_mutex() const;
Expand Down Expand Up @@ -1226,8 +1207,6 @@ class Environment : public MemoryRetainer {
inline std::shared_ptr<EnvironmentOptions> options();
inline std::shared_ptr<HostPort> inspector_host_port();

inline AsyncRequest* thread_stopper() { return &thread_stopper_; }

// The BaseObject count is a debugging helper that makes sure that there are
// no memory leaks caused by BaseObjects staying alive longer than expected
// (in particular, no circular BaseObjectPtr references).
Expand Down Expand Up @@ -1288,6 +1267,7 @@ class Environment : public MemoryRetainer {
uv_prepare_t idle_prepare_handle_;
uv_check_t idle_check_handle_;
uv_async_t task_queues_async_;
int64_t task_queues_async_refs_ = 0;
bool profiler_idle_notifier_started_ = false;

AsyncHooks async_hooks_;
Expand Down Expand Up @@ -1345,7 +1325,7 @@ class Environment : public MemoryRetainer {
bool has_run_bootstrapping_code_ = false;
bool has_serialized_options_ = false;

bool can_call_into_js_ = true;
std::atomic_bool can_call_into_js_ { true };
Flags flags_;
uint64_t thread_id_;
std::unordered_set<worker::Worker*> sub_worker_contexts_;
Expand Down Expand Up @@ -1463,10 +1443,7 @@ class Environment : public MemoryRetainer {
bool started_cleanup_ = false;

int64_t base_object_count_ = 0;

// A custom async abstraction (a pair of async handle and a state variable)
// Used by embedders to shutdown running Node instance.
AsyncRequest thread_stopper_;
std::atomic_bool is_stopping_ { false };

template <typename T>
void ForEachBaseObject(T&& iterator);
Expand Down
34 changes: 22 additions & 12 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ void Worker::Run() {
stopped_ = true;
this->env_ = nullptr;
}
env_->thread_stopper()->set_stopped(true);
env_->set_stopping(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
Expand Down Expand Up @@ -412,7 +412,6 @@ void Worker::JoinThread() {
thread_joined_ = true;

env()->remove_sub_worker_context(this);
on_thread_finished_.Uninstall();

{
HandleScope handle_scope(env()->isolate());
Expand All @@ -439,6 +438,8 @@ void Worker::JoinThread() {
}

Worker::~Worker() {
JoinThread();

Mutex::ScopedLock lock(mutex_);

CHECK(stopped_);
Expand Down Expand Up @@ -574,18 +575,16 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
w->stopped_ = false;
w->thread_joined_ = false;

w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
Worker* w_ = static_cast<Worker*>(handle->data);
CHECK(w_->is_stopped());
w_->parent_port_ = nullptr;
w_->JoinThread();
delete w_;
});
if (w->has_ref_)
w->env()->add_refs(1);

uv_thread_options_t thread_options;
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
thread_options.stack_size = kStackSize;
CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
// XXX: This could become a std::unique_ptr, but that makes at least
// gcc 6.3 detect undefined behaviour when there shouldn't be any.
// gcc 7+ handles this well.
Worker* w = static_cast<Worker*>(arg);
const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);

Expand All @@ -596,7 +595,12 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
w->Run();

Mutex::ScopedLock lock(w->mutex_);
w->on_thread_finished_.Stop();
w->env()->SetImmediateThreadsafe(
[w = std::unique_ptr<Worker>(w)](Environment* env) {
if (w->has_ref_)
env->add_refs(-1);
// implicitly delete w
});
}, static_cast<void*>(w)), 0);
}

Expand All @@ -611,13 +615,19 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
if (!w->has_ref_) {
w->has_ref_ = true;
w->env()->add_refs(1);
}
}

void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
if (w->has_ref_) {
w->has_ref_ = false;
w->env()->add_refs(-1);
}
}

void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
Expand Down
5 changes: 2 additions & 3 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class Worker : public AsyncWrap {

void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackField("parent_port", parent_port_);
tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
}

SET_MEMORY_INFO_NAME(Worker)
Expand Down Expand Up @@ -107,14 +106,14 @@ class Worker : public AsyncWrap {
// instance refers to it via its [kPort] property.
MessagePort* parent_port_ = nullptr;

AsyncRequest on_thread_finished_;

// A raw flag that is used by creator and worker threads to
// sync up on pre-mature termination of worker - while in the
// warmup phase. Once the worker is fully warmed up, use the
// async handle of the worker's Environment for the same purpose.
bool stopped_ = true;

bool has_ref_ = true;

// The real Environment of the worker object. It has a lesser
// lifespan than the worker object itself - comes to life
// when the worker thread creates a new Environment, and gets
Expand Down

0 comments on commit de2c68c

Please sign in to comment.