diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/avm_simulate/avm_simulate_napi.cpp b/barretenberg/cpp/src/barretenberg/nodejs_module/avm_simulate/avm_simulate_napi.cpp index 01c19f45e6b5..283f9488556e 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/avm_simulate/avm_simulate_napi.cpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/avm_simulate/avm_simulate_napi.cpp @@ -281,9 +281,8 @@ Napi::Value AvmSimulateNapi::simulate(const Napi::CallbackInfo& cb_info) **********************************************************/ auto deferred = std::make_shared(env); - // Create threaded operation that runs on a dedicated std::thread (not libuv pool). - // This prevents libuv thread pool exhaustion when callbacks need libuv threads for I/O. - auto* op = new ThreadedAsyncOperation( + // Create async operation that will run on a worker thread + auto* op = new AsyncOperation( env, deferred, [data, tsfns, logger_tsfn, ws_ptr, cancellation_token](msgpack::sbuffer& result_buffer) { // Collect all thread-safe functions including logger for cleanup auto all_tsfns = tsfns.to_vector(); @@ -327,6 +326,7 @@ Napi::Value AvmSimulateNapi::simulate(const Napi::CallbackInfo& cb_info) } }); + // Napi is now responsible for destroying this object op->Queue(); return deferred->Promise(); @@ -368,8 +368,8 @@ Napi::Value AvmSimulateNapi::simulateWithHintedDbs(const Napi::CallbackInfo& cb_ // Create a deferred promise auto deferred = std::make_shared(env); - // Create threaded operation that runs on a dedicated std::thread (not libuv pool) - auto* op = new ThreadedAsyncOperation(env, deferred, [data](msgpack::sbuffer& result_buffer) { + // Create async operation that will run on a worker thread + auto* op = new AsyncOperation(env, deferred, [data](msgpack::sbuffer& result_buffer) { try { // Deserialize inputs from msgpack avm2::AvmProvingInputs inputs; @@ -393,6 +393,7 @@ Napi::Value AvmSimulateNapi::simulateWithHintedDbs(const Napi::CallbackInfo& cb_ } }); + // Napi is now responsible for destroying this object op->Queue(); return deferred->Promise(); diff --git a/barretenberg/cpp/src/barretenberg/nodejs_module/util/async_op.hpp b/barretenberg/cpp/src/barretenberg/nodejs_module/util/async_op.hpp index 13a933cd5a81..3e29d08b5f6b 100644 --- a/barretenberg/cpp/src/barretenberg/nodejs_module/util/async_op.hpp +++ b/barretenberg/cpp/src/barretenberg/nodejs_module/util/async_op.hpp @@ -3,7 +3,6 @@ #include "barretenberg/serialize/msgpack_impl.hpp" #include #include -#include #include namespace bb::nodejs { @@ -66,78 +65,4 @@ class AsyncOperation : public Napi::AsyncWorker { msgpack::sbuffer _result; }; -/** - * @brief Runs work on a dedicated std::thread instead of the libuv thread pool. - * - * Unlike AsyncOperation (which uses Napi::AsyncWorker and occupies a libuv thread), - * this class spawns a new OS thread for each operation. This prevents AVM simulations - * from exhausting the libuv thread pool, which would deadlock when C++ callbacks need - * to invoke JS functions that themselves require libuv threads (e.g., LMDB reads). - * - * The completion callback (resolve/reject) is posted back to the JS main thread via - * a Napi::ThreadSafeFunction, so the event loop returns immediately after launch - * and is woken up only when the work is done. - * - * Usage: `auto* op = new ThreadedAsyncOperation(env, deferred, fn); op->Queue();` - * The object self-destructs after resolving/rejecting the promise. - */ -class ThreadedAsyncOperation { - public: - ThreadedAsyncOperation(Napi::Env env, std::shared_ptr deferred, async_fn fn) - : _fn(std::move(fn)) - , _deferred(std::move(deferred)) - { - // Create a no-op JS function as the TSFN target — we use the native callback form of BlockingCall - // to resolve/reject the promise, so the JS function is never actually called directly. - auto dummy = Napi::Function::New(env, [](const Napi::CallbackInfo&) {}); - _completion_tsfn = Napi::ThreadSafeFunction::New(env, dummy, "ThreadedAsyncOpComplete", 0, 1); - } - - ThreadedAsyncOperation(const ThreadedAsyncOperation&) = delete; - ThreadedAsyncOperation& operator=(const ThreadedAsyncOperation&) = delete; - ThreadedAsyncOperation(ThreadedAsyncOperation&&) = delete; - ThreadedAsyncOperation& operator=(ThreadedAsyncOperation&&) = delete; - - ~ThreadedAsyncOperation() = default; - - void Queue() - { - std::thread([this]() { - try { - _fn(_result); - _success = true; - } catch (const std::exception& e) { - _error = e.what(); - _success = false; - } catch (...) { - _error = "Unknown exception occurred during threaded async operation"; - _success = false; - } - - // Post completion back to the JS main thread - _completion_tsfn.BlockingCall( - this, [](Napi::Env env, Napi::Function /*js_callback*/, ThreadedAsyncOperation* op) { - if (op->_success) { - auto buf = Napi::Buffer::Copy(env, op->_result.data(), op->_result.size()); - op->_deferred->Resolve(buf); - } else { - auto error = Napi::Error::New(env, op->_error); - op->_deferred->Reject(error.Value()); - } - // Release the TSFN and self-destruct - op->_completion_tsfn.Release(); - delete op; - }); - }).detach(); - } - - private: - async_fn _fn; - std::shared_ptr _deferred; - Napi::ThreadSafeFunction _completion_tsfn; - msgpack::sbuffer _result; - bool _success = false; - std::string _error; -}; - } // namespace bb::nodejs diff --git a/yarn-project/native/src/native_module.ts b/yarn-project/native/src/native_module.ts index 6966e33193d7..0319bf8d894b 100644 --- a/yarn-project/native/src/native_module.ts +++ b/yarn-project/native/src/native_module.ts @@ -128,33 +128,23 @@ export function cancelSimulation(token: CancellationToken): void { } /** - * Maximum number of concurrent AVM simulations. Each simulation spawns a dedicated OS thread, - * so this controls resource usage. Defaults to 4. Set to 0 for unlimited. + * Concurrency limiting for C++ AVM simulation to prevent libuv thread pool exhaustion. + * + * The C++ simulator uses NAPI BlockingCall to callback to TypeScript for contract data. + * This blocks the libuv thread while waiting for the callback to complete. If all libuv + * threads are blocked waiting for callbacks, no threads remain to service those callbacks, + * causing deadlock. + * + * We limit concurrent simulations to UV_THREADPOOL_SIZE / 2 to ensure threads remain + * available for callback processing. */ -export const AVM_MAX_CONCURRENT_SIMULATIONS = parseInt(process.env.AVM_MAX_CONCURRENT_SIMULATIONS ?? '4', 10); -const avmSimulationSemaphore = - AVM_MAX_CONCURRENT_SIMULATIONS > 0 ? new Semaphore(AVM_MAX_CONCURRENT_SIMULATIONS) : null; - -async function withAvmConcurrencyLimit(fn: () => Promise): Promise { - if (!avmSimulationSemaphore) { - return fn(); - } - await avmSimulationSemaphore.acquire(); - try { - return await fn(); - } finally { - avmSimulationSemaphore.release(); - } -} +const UV_THREADPOOL_SIZE = parseInt(process.env.UV_THREADPOOL_SIZE ?? '4', 10); +export const AVM_MAX_CONCURRENT_SIMULATIONS = Math.max(1, Math.floor(UV_THREADPOOL_SIZE / 2)); +const avmSimulationSemaphore = new Semaphore(AVM_MAX_CONCURRENT_SIMULATIONS); /** * AVM simulation function that takes serialized inputs and a contract provider. * The contract provider enables C++ to callback to TypeScript for contract data during simulation. - * - * Simulations run on dedicated std::threads (not the libuv thread pool), so there is no risk - * of libuv thread pool exhaustion or deadlock from C++ BlockingCall callbacks. - * Concurrency is limited by AVM_MAX_CONCURRENT_SIMULATIONS (default 4, 0 = unlimited). - * * @param inputs - Msgpack-serialized AvmFastSimulationInputs buffer * @param contractProvider - Object with callbacks for fetching contract instances and classes * @param worldStateHandle - Native handle to WorldState instance @@ -163,7 +153,7 @@ async function withAvmConcurrencyLimit(fn: () => Promise): Promise { * @param cancellationToken - Optional token to enable cancellation support * @returns Promise resolving to msgpack-serialized AvmCircuitPublicInputs buffer */ -export function avmSimulate( +export async function avmSimulate( inputs: Buffer, contractProvider: ContractProvider, worldStateHandle: any, @@ -171,30 +161,35 @@ export function avmSimulate( logger?: Logger, cancellationToken?: CancellationToken, ): Promise { - return withAvmConcurrencyLimit(() => - nativeAvmSimulate( + await avmSimulationSemaphore.acquire(); + + try { + return await nativeAvmSimulate( inputs, contractProvider, worldStateHandle, LogLevels.indexOf(logLevel), logger ? (level: LogLevel, msg: string) => logger[level](msg) : null, cancellationToken, - ), - ); + ); + } finally { + avmSimulationSemaphore.release(); + } } /** * AVM simulation function that uses pre-collected hints from TypeScript simulation. * All contract data and merkle tree hints are included in the AvmCircuitInputs, so no runtime * callbacks to TS or WS pointer are needed. - * - * Simulations run on dedicated std::threads (not the libuv thread pool). - * Concurrency is limited by AVM_MAX_CONCURRENT_SIMULATIONS (default 4, 0 = unlimited). - * * @param inputs - Msgpack-serialized AvmCircuitInputs (AvmProvingInputs in C++) buffer * @param logLevel - Log level to control C++ verbosity * @returns Promise resolving to msgpack-serialized simulation results buffer */ -export function avmSimulateWithHintedDbs(inputs: Buffer, logLevel: LogLevel = 'info'): Promise { - return withAvmConcurrencyLimit(() => nativeAvmSimulateWithHintedDbs(inputs, LogLevels.indexOf(logLevel))); +export async function avmSimulateWithHintedDbs(inputs: Buffer, logLevel: LogLevel = 'info'): Promise { + await avmSimulationSemaphore.acquire(); + try { + return await nativeAvmSimulateWithHintedDbs(inputs, LogLevels.indexOf(logLevel)); + } finally { + avmSimulationSemaphore.release(); + } }