Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,8 @@ Napi::Value AvmSimulateNapi::simulate(const Napi::CallbackInfo& cb_info)
**********************************************************/

auto deferred = std::make_shared<Napi::Promise::Deferred>(env);
// Create threaded operation that runs on a dedicated std::thread (not libuv pool).
// This prevents libuv thread pool exhaustion when callbacks need libuv threads for I/O.
auto* op = new ThreadedAsyncOperation(
// Create async operation that will run on a worker thread
auto* op = new AsyncOperation(
env, deferred, [data, tsfns, logger_tsfn, ws_ptr, cancellation_token](msgpack::sbuffer& result_buffer) {
// Collect all thread-safe functions including logger for cleanup
auto all_tsfns = tsfns.to_vector();
Expand Down Expand Up @@ -327,6 +326,7 @@ Napi::Value AvmSimulateNapi::simulate(const Napi::CallbackInfo& cb_info)
}
});

// Napi is now responsible for destroying this object
op->Queue();

return deferred->Promise();
Expand Down Expand Up @@ -368,8 +368,8 @@ Napi::Value AvmSimulateNapi::simulateWithHintedDbs(const Napi::CallbackInfo& cb_
// Create a deferred promise
auto deferred = std::make_shared<Napi::Promise::Deferred>(env);

// Create threaded operation that runs on a dedicated std::thread (not libuv pool)
auto* op = new ThreadedAsyncOperation(env, deferred, [data](msgpack::sbuffer& result_buffer) {
// Create async operation that will run on a worker thread
auto* op = new AsyncOperation(env, deferred, [data](msgpack::sbuffer& result_buffer) {
try {
// Deserialize inputs from msgpack
avm2::AvmProvingInputs inputs;
Expand All @@ -393,6 +393,7 @@ Napi::Value AvmSimulateNapi::simulateWithHintedDbs(const Napi::CallbackInfo& cb_
}
});

// Napi is now responsible for destroying this object
op->Queue();

return deferred->Promise();
Expand Down
75 changes: 0 additions & 75 deletions barretenberg/cpp/src/barretenberg/nodejs_module/util/async_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "barretenberg/serialize/msgpack_impl.hpp"
#include <memory>
#include <napi.h>
#include <thread>
#include <utility>

namespace bb::nodejs {
Expand Down Expand Up @@ -66,78 +65,4 @@ class AsyncOperation : public Napi::AsyncWorker {
msgpack::sbuffer _result;
};

/**
* @brief Runs work on a dedicated std::thread instead of the libuv thread pool.
*
* Unlike AsyncOperation (which uses Napi::AsyncWorker and occupies a libuv thread),
* this class spawns a new OS thread for each operation. This prevents AVM simulations
* from exhausting the libuv thread pool, which would deadlock when C++ callbacks need
* to invoke JS functions that themselves require libuv threads (e.g., LMDB reads).
*
* The completion callback (resolve/reject) is posted back to the JS main thread via
* a Napi::ThreadSafeFunction, so the event loop returns immediately after launch
* and is woken up only when the work is done.
*
* Usage: `auto* op = new ThreadedAsyncOperation(env, deferred, fn); op->Queue();`
* The object self-destructs after resolving/rejecting the promise.
*/
// Runs `fn` on a dedicated, detached std::thread and reports completion back to the
// JS main thread through a Napi::ThreadSafeFunction, resolving/rejecting `deferred`.
// The object is heap-allocated, self-owning, and deletes itself in the completion
// callback — callers must `new` it, call Queue(), and never touch it again.
class ThreadedAsyncOperation {
public:
// @param env      JS environment the deferred promise belongs to.
// @param deferred promise resolved with a msgpack Buffer on success, rejected with
//                 a Napi::Error carrying the exception message on failure.
// @param fn       native work; writes its msgpack-encoded result into the buffer.
ThreadedAsyncOperation(Napi::Env env, std::shared_ptr<Napi::Promise::Deferred> deferred, async_fn fn)
: _fn(std::move(fn))
, _deferred(std::move(deferred))
{
// Create a no-op JS function as the TSFN target — we use the native callback form of BlockingCall
// to resolve/reject the promise, so the JS function is never actually called directly.
auto dummy = Napi::Function::New(env, [](const Napi::CallbackInfo&) {});
_completion_tsfn = Napi::ThreadSafeFunction::New(env, dummy, "ThreadedAsyncOpComplete", 0, 1);
}

// Non-copyable/non-movable: the detached worker thread captures `this`, so the
// object's address must stay stable until it self-deletes.
ThreadedAsyncOperation(const ThreadedAsyncOperation&) = delete;
ThreadedAsyncOperation& operator=(const ThreadedAsyncOperation&) = delete;
ThreadedAsyncOperation(ThreadedAsyncOperation&&) = delete;
ThreadedAsyncOperation& operator=(ThreadedAsyncOperation&&) = delete;

~ThreadedAsyncOperation() = default;

// Launches the work on a new detached OS thread and returns immediately.
// NOTE(review): `.detach()` means an in-flight operation outlives any owner; if the
// process tears down the JS environment while the thread runs, behavior at shutdown
// depends on the TSFN keeping the event loop alive (initial thread count 1 above) —
// confirm this holds during abrupt exit paths.
void Queue()
{
std::thread([this]() {
try {
// Run the user-supplied work; the result is serialized into _result.
_fn(_result);
_success = true;
} catch (const std::exception& e) {
_error = e.what();
_success = false;
} catch (...) {
_error = "Unknown exception occurred during threaded async operation";
_success = false;
}

// Post completion back to the JS main thread
// NOTE(review): the napi_status returned by BlockingCall is ignored; if the TSFN
// is already closing, the callback never runs and `this` leaks — consider
// checking the status and deleting `this` on failure.
_completion_tsfn.BlockingCall(
this, [](Napi::Env env, Napi::Function /*js_callback*/, ThreadedAsyncOperation* op) {
if (op->_success) {
// Copy the msgpack result into a JS-owned Buffer before resolving.
auto buf = Napi::Buffer<char>::Copy(env, op->_result.data(), op->_result.size());
op->_deferred->Resolve(buf);
} else {
auto error = Napi::Error::New(env, op->_error);
op->_deferred->Reject(error.Value());
}
// Release the TSFN and self-destruct
op->_completion_tsfn.Release();
delete op;
});
}).detach();
}

private:
async_fn _fn;                                     // work to run off-thread
std::shared_ptr<Napi::Promise::Deferred> _deferred; // promise handed back to JS
Napi::ThreadSafeFunction _completion_tsfn;        // wakes the JS loop on completion
msgpack::sbuffer _result;                         // serialized result, written by _fn
bool _success = false;                            // set by the worker thread before BlockingCall
std::string _error;                               // exception message when _success is false
};

} // namespace bb::nodejs
61 changes: 28 additions & 33 deletions yarn-project/native/src/native_module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,23 @@ export function cancelSimulation(token: CancellationToken): void {
}

/**
* Maximum number of concurrent AVM simulations. Each simulation spawns a dedicated OS thread,
* so this controls resource usage. Defaults to 4. Set to 0 for unlimited.
* Concurrency limiting for C++ AVM simulation to prevent libuv thread pool exhaustion.
*
* The C++ simulator uses NAPI BlockingCall to callback to TypeScript for contract data.
* This blocks the libuv thread while waiting for the callback to complete. If all libuv
* threads are blocked waiting for callbacks, no threads remain to service those callbacks,
* causing deadlock.
*
* We limit concurrent simulations to UV_THREADPOOL_SIZE / 2 to ensure threads remain
* available for callback processing.
*/
// Size of the libuv worker-thread pool. Node.js defaults UV_THREADPOOL_SIZE to 4 when unset.
// Guard against a malformed env value: parseInt would yield NaN, and NaN must not reach the
// Semaphore constructor (Math.max(1, NaN) is NaN), so fall back to the default pool size of 4.
const UV_THREADPOOL_SIZE_RAW = parseInt(process.env.UV_THREADPOOL_SIZE ?? '4', 10);
const UV_THREADPOOL_SIZE = Number.isFinite(UV_THREADPOOL_SIZE_RAW) ? UV_THREADPOOL_SIZE_RAW : 4;

// Cap concurrent simulations at half the libuv pool (minimum 1). Each simulation can block a
// libuv thread while waiting on a callback into TS, so keeping half the pool free ensures
// those callbacks can still be serviced and avoids deadlock.
export const AVM_MAX_CONCURRENT_SIMULATIONS = Math.max(1, Math.floor(UV_THREADPOOL_SIZE / 2));

// Gate guarding entry into native AVM simulation; acquired before every native call.
const avmSimulationSemaphore = new Semaphore(AVM_MAX_CONCURRENT_SIMULATIONS);

/**
* AVM simulation function that takes serialized inputs and a contract provider.
* The contract provider enables C++ to callback to TypeScript for contract data during simulation.
*
 * Concurrency is limited to AVM_MAX_CONCURRENT_SIMULATIONS (derived from UV_THREADPOOL_SIZE)
 * so that libuv worker threads remain available to service the C++ → TS callbacks.
*
* @param inputs - Msgpack-serialized AvmFastSimulationInputs buffer
* @param contractProvider - Object with callbacks for fetching contract instances and classes
* @param worldStateHandle - Native handle to WorldState instance
Expand All @@ -163,38 +153,43 @@ async function withAvmConcurrencyLimit<T>(fn: () => Promise<T>): Promise<T> {
* @param cancellationToken - Optional token to enable cancellation support
* @returns Promise resolving to msgpack-serialized AvmCircuitPublicInputs buffer
*/
export function avmSimulate(
export async function avmSimulate(
inputs: Buffer,
contractProvider: ContractProvider,
worldStateHandle: any,
logLevel: LogLevel = 'info',
logger?: Logger,
cancellationToken?: CancellationToken,
): Promise<Buffer> {
return withAvmConcurrencyLimit(() =>
nativeAvmSimulate(
await avmSimulationSemaphore.acquire();

try {
return await nativeAvmSimulate(
inputs,
contractProvider,
worldStateHandle,
LogLevels.indexOf(logLevel),
logger ? (level: LogLevel, msg: string) => logger[level](msg) : null,
cancellationToken,
),
);
);
} finally {
avmSimulationSemaphore.release();
}
}

/**
* AVM simulation function that uses pre-collected hints from TypeScript simulation.
* All contract data and merkle tree hints are included in the AvmCircuitInputs, so no runtime
* callbacks to TS or WS pointer are needed.
*
 * Concurrency is limited to AVM_MAX_CONCURRENT_SIMULATIONS (derived from UV_THREADPOOL_SIZE)
 * to avoid exhausting the libuv thread pool.
*
* @param inputs - Msgpack-serialized AvmCircuitInputs (AvmProvingInputs in C++) buffer
* @param logLevel - Log level to control C++ verbosity
* @returns Promise resolving to msgpack-serialized simulation results buffer
*/
export function avmSimulateWithHintedDbs(inputs: Buffer, logLevel: LogLevel = 'info'): Promise<Buffer> {
return withAvmConcurrencyLimit(() => nativeAvmSimulateWithHintedDbs(inputs, LogLevels.indexOf(logLevel)));
export async function avmSimulateWithHintedDbs(inputs: Buffer, logLevel: LogLevel = 'info'): Promise<Buffer> {
await avmSimulationSemaphore.acquire();
try {
return await nativeAvmSimulateWithHintedDbs(inputs, LogLevels.indexOf(logLevel));
} finally {
avmSimulationSemaphore.release();
}
}
Loading