forked from electron/electron
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: cherry-pick 7abc7e45b2 from node
Backports: nodejs/node#38506
- Loading branch information
1 parent
8f69f4f
commit 74a6476
Showing
2 changed files
with
263 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,262 @@ | ||
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 | ||
From: Fedor Indutny <[email protected]> | ||
Date: Sat, 1 May 2021 11:26:46 -0700 | ||
Subject: node-api: faster threadsafe_function | ||
|
||
Invoke threadsafe_function during the same tick and avoid marshalling | ||
costs between threads and/or churning event loop if either: | ||
|
||
1. There's a queued call already | ||
2. `Push()` is called while the main thread was running | ||
threadsafe_function | ||
|
||
PR-URL: https://github.com/nodejs/node/pull/38506 | ||
Reviewed-By: Anna Henningsen <[email protected]> | ||
Reviewed-By: Rich Trott <[email protected]> | ||
Reviewed-By: James M Snell <[email protected]> | ||
|
||
diff --git a/src/node_api.cc b/src/node_api.cc | ||
index f1a5265b6a7234dc754aedc86ecd3132f3d90b09..d1076b29aeb5133a0325d3e7ebd097d207e4f4a6 100644 | ||
--- a/src/node_api.cc | ||
+++ b/src/node_api.cc | ||
@@ -12,6 +12,7 @@ | ||
#include "tracing/traced_value.h" | ||
#include "util-inl.h" | ||
|
||
+#include <atomic> | ||
#include <memory> | ||
|
||
struct node_napi_env__ : public napi_env__ { | ||
@@ -131,6 +132,7 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
*v8::String::Utf8Value(env_->isolate, name)), | ||
thread_count(thread_count_), | ||
is_closing(false), | ||
+ dispatch_state(kDispatchIdle), | ||
context(context_), | ||
max_queue_size(max_queue_size_), | ||
env(env_), | ||
@@ -170,10 +172,8 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
return napi_closing; | ||
} | ||
} else { | ||
- if (uv_async_send(&async) != 0) { | ||
- return napi_generic_failure; | ||
- } | ||
queue.push(data); | ||
+ Send(); | ||
return napi_ok; | ||
} | ||
} | ||
@@ -205,9 +205,7 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
if (is_closing && max_queue_size > 0) { | ||
cond->Signal(lock); | ||
} | ||
- if (uv_async_send(&async) != 0) { | ||
- return napi_generic_failure; | ||
- } | ||
+ Send(); | ||
} | ||
} | ||
|
||
@@ -232,7 +230,6 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
cond = std::make_unique<node::ConditionVariable>(); | ||
} | ||
if (max_queue_size == 0 || cond) { | ||
- CHECK_EQ(0, uv_idle_init(loop, &idle)); | ||
return napi_ok; | ||
} | ||
|
||
@@ -257,21 +254,46 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
|
||
napi_status Unref() { | ||
uv_unref(reinterpret_cast<uv_handle_t*>(&async)); | ||
- uv_unref(reinterpret_cast<uv_handle_t*>(&idle)); | ||
|
||
return napi_ok; | ||
} | ||
|
||
napi_status Ref() { | ||
uv_ref(reinterpret_cast<uv_handle_t*>(&async)); | ||
- uv_ref(reinterpret_cast<uv_handle_t*>(&idle)); | ||
|
||
return napi_ok; | ||
} | ||
|
||
- void DispatchOne() { | ||
+ inline void* Context() { | ||
+ return context; | ||
+ } | ||
+ | ||
+ protected: | ||
+ void Dispatch() { | ||
+ bool has_more = true; | ||
+ | ||
+ // Limit maximum synchronous iteration count to prevent event loop | ||
+ // starvation. See `src/node_messaging.cc` for an inspiration. | ||
+ unsigned int iterations_left = kMaxIterationCount; | ||
+ while (has_more && --iterations_left != 0) { | ||
+ dispatch_state = kDispatchRunning; | ||
+ has_more = DispatchOne(); | ||
+ | ||
+ // Send() was called while we were executing the JS function | ||
+ if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) { | ||
+ has_more = true; | ||
+ } | ||
+ } | ||
+ | ||
+ if (has_more) { | ||
+ Send(); | ||
+ } | ||
+ } | ||
+ | ||
+ bool DispatchOne() { | ||
void* data = nullptr; | ||
bool popped_value = false; | ||
+ bool has_more = false; | ||
|
||
{ | ||
node::Mutex::ScopedLock lock(this->mutex); | ||
@@ -296,9 +318,9 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
cond->Signal(lock); | ||
} | ||
CloseHandlesAndMaybeDelete(); | ||
- } else { | ||
- CHECK_EQ(0, uv_idle_stop(&idle)); | ||
} | ||
+ } else { | ||
+ has_more = true; | ||
} | ||
} | ||
} | ||
@@ -316,6 +338,8 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
call_js_cb(env, js_callback, context, data); | ||
}); | ||
} | ||
+ | ||
+ return has_more; | ||
} | ||
|
||
void Finalize() { | ||
@@ -329,10 +353,6 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
EmptyQueueAndDelete(); | ||
} | ||
|
||
- inline void* Context() { | ||
- return context; | ||
- } | ||
- | ||
void CloseHandlesAndMaybeDelete(bool set_closing = false) { | ||
v8::HandleScope scope(env->isolate); | ||
if (set_closing) { | ||
@@ -352,18 +372,20 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
ThreadSafeFunction* ts_fn = | ||
node::ContainerOf(&ThreadSafeFunction::async, | ||
reinterpret_cast<uv_async_t*>(handle)); | ||
- v8::HandleScope scope(ts_fn->env->isolate); | ||
- ts_fn->env->node_env()->CloseHandle( | ||
- reinterpret_cast<uv_handle_t*>(&ts_fn->idle), | ||
- [](uv_handle_t* handle) -> void { | ||
- ThreadSafeFunction* ts_fn = | ||
- node::ContainerOf(&ThreadSafeFunction::idle, | ||
- reinterpret_cast<uv_idle_t*>(handle)); | ||
- ts_fn->Finalize(); | ||
- }); | ||
+ ts_fn->Finalize(); | ||
}); | ||
} | ||
|
||
+ void Send() { | ||
+ // Ask currently running Dispatch() to make one more iteration | ||
+ unsigned char current_state = dispatch_state.fetch_or(kDispatchPending); | ||
+ if ((current_state & kDispatchRunning) == kDispatchRunning) { | ||
+ return; | ||
+ } | ||
+ | ||
+ CHECK_EQ(0, uv_async_send(&async)); | ||
+ } | ||
+ | ||
// Default way of calling into JavaScript. Used when ThreadSafeFunction is | ||
// without a call_js_cb_. | ||
static void CallJs(napi_env env, napi_value cb, void* context, void* data) { | ||
@@ -387,16 +409,10 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
} | ||
} | ||
|
||
- static void IdleCb(uv_idle_t* idle) { | ||
- ThreadSafeFunction* ts_fn = | ||
- node::ContainerOf(&ThreadSafeFunction::idle, idle); | ||
- ts_fn->DispatchOne(); | ||
- } | ||
- | ||
static void AsyncCb(uv_async_t* async) { | ||
ThreadSafeFunction* ts_fn = | ||
node::ContainerOf(&ThreadSafeFunction::async, async); | ||
- CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb)); | ||
+ ts_fn->Dispatch(); | ||
} | ||
|
||
static void Cleanup(void* data) { | ||
@@ -405,14 +421,20 @@ class ThreadSafeFunction : public node::AsyncResource { | ||
} | ||
|
||
private: | ||
+ static const unsigned char kDispatchIdle = 0; | ||
+ static const unsigned char kDispatchRunning = 1 << 0; | ||
+ static const unsigned char kDispatchPending = 1 << 1; | ||
+ | ||
+ static const unsigned int kMaxIterationCount = 1000; | ||
+ | ||
// These are variables protected by the mutex. | ||
node::Mutex mutex; | ||
std::unique_ptr<node::ConditionVariable> cond; | ||
std::queue<void*> queue; | ||
uv_async_t async; | ||
- uv_idle_t idle; | ||
size_t thread_count; | ||
bool is_closing; | ||
+ std::atomic_uchar dispatch_state; | ||
|
||
// These are variables set once, upon creation, and then never again, which | ||
// means we don't need the mutex to read them. | ||
diff --git a/test/node-api/test_threadsafe_function/binding.c b/test/node-api/test_threadsafe_function/binding.c | ||
index c9c526153804c60b5bd5844d2c2aacac7f0fb514..ae3ec67de43cfffdfb10772761dbd69f01c623bb 100644 | ||
--- a/test/node-api/test_threadsafe_function/binding.c | ||
+++ b/test/node-api/test_threadsafe_function/binding.c | ||
@@ -7,7 +7,7 @@ | ||
#include <node_api.h> | ||
#include "../../js-native-api/common.h" | ||
|
||
-#define ARRAY_LENGTH 10 | ||
+#define ARRAY_LENGTH 10000 | ||
#define MAX_QUEUE_SIZE 2 | ||
|
||
static uv_thread_t uv_threads[2]; | ||
@@ -72,7 +72,7 @@ static void data_source_thread(void* data) { | ||
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { | ||
status = napi_call_threadsafe_function(ts_fn, &ints[index], | ||
ts_fn_info->block_on_full); | ||
- if (ts_fn_info->max_queue_size == 0) { | ||
+ if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) { | ||
// Let's make this thread really busy for 200 ms to give the main thread a | ||
// chance to abort. | ||
uint64_t start = uv_hrtime(); | ||
diff --git a/test/node-api/test_threadsafe_function/test.js b/test/node-api/test_threadsafe_function/test.js | ||
index 3603d79ee6b5d36590503989d8168368eaf12b03..ccd3f4228a793ae77eff760309e31191ba8de49a 100644 | ||
--- a/test/node-api/test_threadsafe_function/test.js | ||
+++ b/test/node-api/test_threadsafe_function/test.js | ||
@@ -210,6 +210,15 @@ new Promise(function testWithoutJSMarshaller(resolve) { | ||
})) | ||
.then((result) => assert.strictEqual(result.indexOf(0), -1)) | ||
|
||
+// Make sure that threadsafe function isn't stalled when we hit | ||
+// `kMaxIterationCount` in `src/node_api.cc` | ||
+.then(() => testWithJSMarshaller({ | ||
+ threadStarter: 'StartThreadNonblocking', | ||
+ maxQueueSize: binding.ARRAY_LENGTH >>> 1, | ||
+ quitAfter: binding.ARRAY_LENGTH | ||
+})) | ||
+.then((result) => assert.deepStrictEqual(result, expectedArray)) | ||
+ | ||
// Start a child process to test rapid teardown | ||
.then(() => testUnref(binding.MAX_QUEUE_SIZE)) | ||
|