Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ Napi::Value AvmSimulateNapi::simulate(const Napi::CallbackInfo& cb_info)
**********************************************************/

auto deferred = std::make_shared<Napi::Promise::Deferred>(env);
// Create threaded operation that runs on a dedicated std::thread (not libuv pool).
// This prevents libuv thread pool exhaustion when callbacks need libuv threads for I/O.
auto* op = new ThreadedAsyncOperation(
// Run on a dedicated std::thread (not libuv pool) to prevent libuv thread pool
// exhaustion when callbacks need libuv threads for I/O.
ThreadedAsyncOperation::Run(
env, deferred, [data, tsfns, logger_tsfn, ws_ptr, cancellation_token](msgpack::sbuffer& result_buffer) {
// Collect all thread-safe functions including logger for cleanup
auto all_tsfns = tsfns.to_vector();
Expand Down Expand Up @@ -327,8 +327,6 @@ Napi::Value AvmSimulateNapi::simulate(const Napi::CallbackInfo& cb_info)
}
});

op->Queue();

return deferred->Promise();
}

Expand Down Expand Up @@ -368,8 +366,8 @@ Napi::Value AvmSimulateNapi::simulateWithHintedDbs(const Napi::CallbackInfo& cb_
// Create a deferred promise
auto deferred = std::make_shared<Napi::Promise::Deferred>(env);

// Create threaded operation that runs on a dedicated std::thread (not libuv pool)
auto* op = new ThreadedAsyncOperation(env, deferred, [data](msgpack::sbuffer& result_buffer) {
// Run on a dedicated std::thread (not libuv pool)
ThreadedAsyncOperation::Run(env, deferred, [data](msgpack::sbuffer& result_buffer) {
try {
// Deserialize inputs from msgpack
avm2::AvmProvingInputs inputs;
Expand All @@ -393,8 +391,6 @@ Napi::Value AvmSimulateNapi::simulateWithHintedDbs(const Napi::CallbackInfo& cb_
}
});

op->Queue();

return deferred->Promise();
}

Expand Down
63 changes: 36 additions & 27 deletions barretenberg/cpp/src/barretenberg/nodejs_module/util/async_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,19 @@ class AsyncOperation : public Napi::AsyncWorker {
* a Napi::ThreadSafeFunction, so the event loop returns immediately after launch
* and is woken up only when the work is done.
*
* Usage: `auto* op = new ThreadedAsyncOperation(env, deferred, fn); op->Queue();`
* The object self-destructs after resolving/rejecting the promise.
* Prevent use-after-free: the TSFN callback runs asynchronously on the JS thread
* (napi_tsfn_blocking only blocks on queue insertion, NOT on callback completion).
* Both the worker thread lambda and the callback capture a shared_ptr to keep the
* object alive until both are done.
*
* Usage: `ThreadedAsyncOperation::Run(env, deferred, fn);`
*/
class ThreadedAsyncOperation {
class ThreadedAsyncOperation : public std::enable_shared_from_this<ThreadedAsyncOperation> {
public:
ThreadedAsyncOperation(Napi::Env env, std::shared_ptr<Napi::Promise::Deferred> deferred, async_fn fn)
: _fn(std::move(fn))
, _deferred(std::move(deferred))
{
// Create a no-op JS function as the TSFN target — we use the native callback form of BlockingCall
// to resolve/reject the promise, so the JS function is never actually called directly.
auto dummy = Napi::Function::New(env, [](const Napi::CallbackInfo&) {});
_completion_tsfn = Napi::ThreadSafeFunction::New(env, dummy, "ThreadedAsyncOpComplete", 0, 1);
}
Expand All @@ -100,38 +102,45 @@ class ThreadedAsyncOperation {

~ThreadedAsyncOperation() = default;

static void Run(Napi::Env env, std::shared_ptr<Napi::Promise::Deferred> deferred, async_fn fn)
{
auto op = std::make_shared<ThreadedAsyncOperation>(env, std::move(deferred), std::move(fn));
op->Queue();
}

private:
void Queue()
{
std::thread([this]() {
auto self = shared_from_this();
std::thread([self]() {
try {
_fn(_result);
_success = true;
self->_fn(self->_result);
self->_success = true;
} catch (const std::exception& e) {
_error = e.what();
_success = false;
self->_error = e.what();
self->_success = false;
} catch (...) {
_error = "Unknown exception occurred during threaded async operation";
_success = false;
self->_error = "Unknown exception occurred during threaded async operation";
self->_success = false;
}

// Post completion back to the JS main thread
_completion_tsfn.BlockingCall(
this, [](Napi::Env env, Napi::Function /*js_callback*/, ThreadedAsyncOperation* op) {
if (op->_success) {
auto buf = Napi::Buffer<char>::Copy(env, op->_result.data(), op->_result.size());
op->_deferred->Resolve(buf);
} else {
auto error = Napi::Error::New(env, op->_error);
op->_deferred->Reject(error.Value());
}
// Release the TSFN and self-destruct
op->_completion_tsfn.Release();
delete op;
});
// Post completion to the JS main thread. The callback captures `self`
// (shared_ptr) so the object stays alive until the callback runs.
// napi_tsfn_blocking only blocks on queue insertion, not on callback
// completion, so we cannot use raw pointers here.
self->_completion_tsfn.BlockingCall([self](Napi::Env env, Napi::Function /*js_callback*/) {
if (self->_success) {
auto buf = Napi::Buffer<char>::Copy(env, self->_result.data(), self->_result.size());
self->_deferred->Resolve(buf);
} else {
auto error = Napi::Error::New(env, self->_error);
self->_deferred->Reject(error.Value());
}
self->_completion_tsfn.Release();
});
}).detach();
}

private:
async_fn _fn;
std::shared_ptr<Napi::Promise::Deferred> _deferred;
Napi::ThreadSafeFunction _completion_tsfn;
Expand Down
Loading