From 42ccd43272a2b5a03cbe4de60f38b2fbf8bfb499 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Sat, 1 May 2021 11:26:46 -0700 Subject: [PATCH 1/2] node-api: faster threadsafe_function Invoke threadsafe_function during the same tick and avoid marshalling costs between threads and/or churning event loop if either: 1. There's a queued call already 2. `Push()` is called while the main thread was running threadsafe_function --- src/node_api.cc | 58 ++++++++++++++++--- .../node-api/test_threadsafe_function/test.js | 4 +- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/src/node_api.cc b/src/node_api.cc index 8dbf48d466dfe1..9f3fbdf4fbfa88 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -12,6 +12,7 @@ #include "tracing/traced_value.h" #include "util-inl.h" +#include #include struct node_napi_env__ : public napi_env__ { @@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource { *v8::String::Utf8Value(env_->isolate, name)), thread_count(thread_count_), is_closing(false), + dispatch_state(kDispatchIdle), context(context_), max_queue_size(max_queue_size_), env(env_), @@ -176,7 +178,7 @@ class ThreadSafeFunction : public node::AsyncResource { return napi_closing; } } else { - if (uv_async_send(&async) != 0) { + if (Send() != 0) { return napi_generic_failure; } queue.push(data); @@ -211,7 +213,7 @@ class ThreadSafeFunction : public node::AsyncResource { if (is_closing && max_queue_size > 0) { cond->Signal(lock); } - if (uv_async_send(&async) != 0) { + if (Send() != 0) { return napi_generic_failure; } } @@ -275,9 +277,32 @@ class ThreadSafeFunction : public node::AsyncResource { return napi_ok; } - void DispatchOne() { + inline void* Context() { + return context; + } + + protected: + void Dispatch() { + bool has_more = true; + + // Limit maximum synchronous iteration count to prevent event loop + // starvation. See `src/node_messaging.cc` for an inspiration. + unsigned int iterations_left = kMaxIterationCount; + while (has_more && --iterations_left != 0) { + dispatch_state = kDispatchRunning; + has_more = DispatchOne(); + + // Send() was called while we were executing the JS function + if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) { + has_more = true; + } + } + } + + bool DispatchOne() { void* data = nullptr; bool popped_value = false; + bool has_more = false; { node::Mutex::ScopedLock lock(this->mutex); @@ -305,6 +330,8 @@ class ThreadSafeFunction : public node::AsyncResource { } else { CHECK_EQ(0, uv_idle_stop(&idle)); } + } else { + has_more = true; } } } @@ -322,6 +349,8 @@ class ThreadSafeFunction : public node::AsyncResource { call_js_cb(env, js_callback, context, data); }); } + + return has_more; } void Finalize() { @@ -335,10 +364,6 @@ class ThreadSafeFunction : public node::AsyncResource { EmptyQueueAndDelete(); } - inline void* Context() { - return context; - } - void CloseHandlesAndMaybeDelete(bool set_closing = false) { v8::HandleScope scope(env->isolate); if (set_closing) { @@ -370,6 +395,16 @@ class ThreadSafeFunction : public node::AsyncResource { }); } + int Send() { + // Ask currently running Dispatch() to make one more iteration + unsigned char current_state = dispatch_state.fetch_or(kDispatchPending); + if ((current_state & kDispatchRunning) == kDispatchRunning) { + return 0; + } + + return uv_async_send(&async); + } + // Default way of calling into JavaScript. Used when ThreadSafeFunction is // without a call_js_cb_. static void CallJs(napi_env env, napi_value cb, void* context, void* data) { @@ -396,7 +431,7 @@ class ThreadSafeFunction : public node::AsyncResource { static void IdleCb(uv_idle_t* idle) { ThreadSafeFunction* ts_fn = node::ContainerOf(&ThreadSafeFunction::idle, idle); - ts_fn->DispatchOne(); + ts_fn->Dispatch(); } static void AsyncCb(uv_async_t* async) { @@ -411,6 +446,12 @@ class ThreadSafeFunction : public node::AsyncResource { } private: + static const unsigned char kDispatchIdle = 0; + static const unsigned char kDispatchRunning = 1 << 0; + static const unsigned char kDispatchPending = 1 << 1; + + static const unsigned int kMaxIterationCount = 1000; + // These are variables protected by the mutex. node::Mutex mutex; std::unique_ptr cond; @@ -419,6 +460,7 @@ class ThreadSafeFunction : public node::AsyncResource { uv_idle_t idle; size_t thread_count; bool is_closing; + std::atomic_uchar dispatch_state; // These are variables set once, upon creation, and then never again, which // means we don't need the mutex to read them. diff --git a/test/node-api/test_threadsafe_function/test.js b/test/node-api/test_threadsafe_function/test.js index 01542e4edb97e6..6a8b5d38727a56 100644 --- a/test/node-api/test_threadsafe_function/test.js +++ b/test/node-api/test_threadsafe_function/test.js @@ -43,7 +43,7 @@ function testWithJSMarshaller({ binding[threadStarter](function testCallback(value) { array.push(value); if (array.length === quitAfter) { - setImmediate(() => { + process.nextTick(() => { binding.StopThread(common.mustCall(() => { resolve(array); }), !!abort); @@ -85,7 +85,7 @@ new Promise(function testWithoutJSMarshaller(resolve) { // The default call-into-JS implementation passes no arguments. assert.strictEqual(arguments.length, 0); if (callCount === binding.ARRAY_LENGTH) { - setImmediate(() => { + process.nextTick(() => { binding.StopThread(common.mustCall(() => { resolve(); }), false); From 9835c84521b8376fe72c70172823057158dbd987 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Sun, 2 May 2021 15:30:45 -0700 Subject: [PATCH 2/2] simplify --- src/node_api.cc | 42 +++++-------------- .../test_threadsafe_function/binding.c | 4 +- .../node-api/test_threadsafe_function/test.js | 13 +++++- 3 files changed, 24 insertions(+), 35 deletions(-) diff --git a/src/node_api.cc b/src/node_api.cc index 9f3fbdf4fbfa88..4216356cb59593 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -178,10 +178,8 @@ class ThreadSafeFunction : public node::AsyncResource { return napi_closing; } } else { - if (Send() != 0) { - return napi_generic_failure; - } queue.push(data); + Send(); return napi_ok; } } @@ -213,9 +211,7 @@ class ThreadSafeFunction : public node::AsyncResource { if (is_closing && max_queue_size > 0) { cond->Signal(lock); } - if (Send() != 0) { - return napi_generic_failure; - } + Send(); } } @@ -240,7 +236,6 @@ class ThreadSafeFunction : public node::AsyncResource { cond = std::make_unique(); } if (max_queue_size == 0 || cond) { - CHECK_EQ(0, uv_idle_init(loop, &idle)); return napi_ok; } @@ -265,14 +260,12 @@ class ThreadSafeFunction : public node::AsyncResource { napi_status Unref() { uv_unref(reinterpret_cast(&async)); - uv_unref(reinterpret_cast(&idle)); return napi_ok; } napi_status Ref() { uv_ref(reinterpret_cast(&async)); - uv_ref(reinterpret_cast(&idle)); return napi_ok; } @@ -297,6 +290,10 @@ class ThreadSafeFunction : public node::AsyncResource { has_more = true; } } + + if (has_more) { + Send(); + } } bool DispatchOne() { @@ -327,8 +324,6 @@ class ThreadSafeFunction : public node::AsyncResource { cond->Signal(lock); } CloseHandlesAndMaybeDelete(); - } else { - CHECK_EQ(0, uv_idle_stop(&idle)); } } else { has_more = true; @@ -383,26 +378,18 @@ class ThreadSafeFunction : public node::AsyncResource { ThreadSafeFunction* ts_fn = node::ContainerOf(&ThreadSafeFunction::async, reinterpret_cast(handle)); - v8::HandleScope scope(ts_fn->env->isolate); - ts_fn->env->node_env()->CloseHandle( - reinterpret_cast(&ts_fn->idle), - [](uv_handle_t* handle) -> void { - ThreadSafeFunction* ts_fn = - node::ContainerOf(&ThreadSafeFunction::idle, - reinterpret_cast(handle)); - ts_fn->Finalize(); - }); + ts_fn->Finalize(); }); } - int Send() { + void Send() { // Ask currently running Dispatch() to make one more iteration unsigned char current_state = dispatch_state.fetch_or(kDispatchPending); if ((current_state & kDispatchRunning) == kDispatchRunning) { - return 0; + return; } - return uv_async_send(&async); + CHECK_EQ(0, uv_async_send(&async)); } // Default way of calling into JavaScript. Used when ThreadSafeFunction is @@ -428,16 +415,10 @@ class ThreadSafeFunction : public node::AsyncResource { } } - static void IdleCb(uv_idle_t* idle) { - ThreadSafeFunction* ts_fn = - node::ContainerOf(&ThreadSafeFunction::idle, idle); - ts_fn->Dispatch(); - } - static void AsyncCb(uv_async_t* async) { ThreadSafeFunction* ts_fn = node::ContainerOf(&ThreadSafeFunction::async, async); - CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb)); + ts_fn->Dispatch(); } static void Cleanup(void* data) { @@ -457,7 +438,6 @@ class ThreadSafeFunction : public node::AsyncResource { std::unique_ptr cond; std::queue queue; uv_async_t async; - uv_idle_t idle; size_t thread_count; bool is_closing; std::atomic_uchar dispatch_state; diff --git a/test/node-api/test_threadsafe_function/binding.c b/test/node-api/test_threadsafe_function/binding.c index 7e586aaba61c56..d64b9058b11326 100644 --- a/test/node-api/test_threadsafe_function/binding.c +++ b/test/node-api/test_threadsafe_function/binding.c @@ -7,7 +7,7 @@ #include #include "../../js-native-api/common.h" -#define ARRAY_LENGTH 10 +#define ARRAY_LENGTH 10000 #define MAX_QUEUE_SIZE 2 static uv_thread_t uv_threads[2]; @@ -72,7 +72,7 @@ static void data_source_thread(void* data) { for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { status = napi_call_threadsafe_function(ts_fn, &ints[index], ts_fn_info->block_on_full); - if (ts_fn_info->max_queue_size == 0) { + if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) { // Let's make this thread really busy for 200 ms to give the main thread a // chance to abort. uint64_t start = uv_hrtime(); diff --git a/test/node-api/test_threadsafe_function/test.js b/test/node-api/test_threadsafe_function/test.js index 6a8b5d38727a56..ff8a7d80849079 100644 --- a/test/node-api/test_threadsafe_function/test.js +++ b/test/node-api/test_threadsafe_function/test.js @@ -43,7 +43,7 @@ function testWithJSMarshaller({ binding[threadStarter](function testCallback(value) { array.push(value); if (array.length === quitAfter) { - process.nextTick(() => { + setImmediate(() => { binding.StopThread(common.mustCall(() => { resolve(array); }), !!abort); @@ -85,7 +85,7 @@ new Promise(function testWithoutJSMarshaller(resolve) { // The default call-into-JS implementation passes no arguments. assert.strictEqual(arguments.length, 0); if (callCount === binding.ARRAY_LENGTH) { - process.nextTick(() => { + setImmediate(() => { binding.StopThread(common.mustCall(() => { resolve(); }), false); @@ -211,6 +211,15 @@ new Promise(function testWithoutJSMarshaller(resolve) { })) .then((result) => assert.strictEqual(result.indexOf(0), -1)) +// Make sure that threadsafe function isn't stalled when we hit +// `kMaxIterationCount` in `src/node_api.cc` +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + maxQueueSize: binding.ARRAY_LENGTH >>> 1, + quitAfter: binding.ARRAY_LENGTH +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + // Start a child process to test rapid teardown .then(() => testUnref(binding.MAX_QUEUE_SIZE))