diff --git a/.eslintrc.js b/.eslintrc.js index b9d7894..d95d8af 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -2,7 +2,7 @@ module.exports = { extends: ['@sentry-internal/sdk'], env: { node: true, - es6: true, + es2020: true }, parserOptions: { sourceType: 'module', diff --git a/CHANGELOG.md b/CHANGELOG.md index 99a2aff..2a8bada 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.3.0 + +- feat: Capture thread state from `AsyncLocalStorage` store (#24) + ## 0.2.3 - fix: Failing install script (#22) diff --git a/README.md b/README.md index 8b6e58d..f81af49 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,11 @@ main or worker threads from any other thread, even if event loops are blocked. The module also provides a means to create a watchdog system to track event loop blocking via periodic heartbeats. When the time from the last heartbeat crosses -a threshold, JavaScript stack traces can be captured. The heartbeats can -optionally include state information which is included with the corresponding -stack trace. +a threshold, JavaScript stack traces can be captured. + +For Node.js >= v24, this module can also capture state from `AsyncLocalStorage` +at the time of stack trace capture, which can help provide context on what the +thread was working on when it became blocked. This native module is used for Sentry's [Event Loop Blocked Detection](https://docs.sentry.io/platforms/javascript/guides/nextjs/configuration/event-loop-block/) @@ -70,7 +72,7 @@ Stack traces show where each thread is currently executing: } ] }, - '2': { // Worker thread + '2': { // Worker thread frames: [ { function: 'from', @@ -105,25 +107,28 @@ Stack traces show where each thread is currently executing: Set up automatic detection of blocked event loops: -### 1. Set up thread heartbeats +### 1. Register threads with `AsyncLocalStorage` state tracking and heartbeats -Send regular heartbeats with optional state information: +Send regular heartbeats: ```ts import { registerThread, threadPoll, } from "@sentry-internal/node-native-stacktrace"; +import { AsyncLocalStorage } from "node:async_hooks"; -// Register this thread -registerThread(); +// Create async local storage for state tracking +const asyncLocalStorage = new AsyncLocalStorage(); +// Set some state in the async local storage +asyncLocalStorage.enterWith({ someState: "value" }); -// Send heartbeats every 200ms with optional state +// Register this thread with async local storage +registerThread({ asyncLocalStorage }); + +// Send heartbeats every 200ms setInterval(() => { - threadPoll({ - endpoint: "/api/current-request", - userId: getCurrentUserId(), - }); + threadPoll(); }, 200); ``` @@ -150,7 +155,7 @@ setInterval(() => { console.error(`🚨 Thread ${threadId} blocked for ${timeSinceLastSeen}ms`); console.error("Stack trace:", blockedThread.frames); - console.error("Last known state:", blockedThread.state); + console.error("Async state:", blockedThread.asyncState); } } }, 500); // Check every 500ms @@ -162,21 +167,48 @@ setInterval(() => { #### `registerThread(threadName?: string): void` -Registers the current thread for monitoring. Must be called from each thread you -want to capture stack traces from. +#### `registerThread(asyncStorage: AsyncStorageArgs, threadName?: string): void` + +Registers the current thread for stack trace capture. Must be called from each +thread you want to capture stack traces from. - `threadName` (optional): Name for the thread. Defaults to the current thread ID. +- `asyncStorage` (optional): `AsyncStorageArgs` to fetch state from + `AsyncLocalStorage` on stack trace capture. + +```ts +type AsyncStorageArgs = { + /** AsyncLocalStorage instance to fetch state from */ + asyncLocalStorage: AsyncLocalStorage; + /** + * Optional array of keys to pick a specific property from the store. + * Key will be traversed in order through Objects/Maps to reach the desired property. + * + * This is useful if you want to capture Open Telemetry context values as state. + * + * To get this value: + * context.getValue(MY_UNIQUE_SYMBOL_REF) + * + * You would set: + * stateLookup: ['_currentContext', MY_UNIQUE_SYMBOL_REF] + */ + stateLookup?: Array; +}; +``` -#### `captureStackTrace(): Record>` +#### `captureStackTrace(): Record>` Captures stack traces from all registered threads. Can be called from any thread -but will not capture the stack trace of the calling thread itself. +but will not capture a stack trace for the calling thread itself. ```ts -type Thread = { +type Thread = { frames: StackFrame[]; - state?: S; + /** State captured from the AsyncLocalStorage */ + asyncState?: A; + /** Optional state provided when calling threadPoll */ + pollState?: P; }; type StackFrame = { @@ -187,16 +219,15 @@ type StackFrame = { }; ``` -#### `threadPoll(state?: State, disableLastSeen?: boolean): void` +#### `threadPoll(disableLastSeen?: boolean, pollState?: object): void` -Sends a heartbeat from the current thread with optional state information. The -state object will be serialized and included as a JavaScript object with the -corresponding stack trace. +Sends a heartbeat from the current thread. -- `state` (optional): An object containing state information to include with the - stack trace. - `disableLastSeen` (optional): If `true`, disables the tracking of the last seen time for this thread. +- `pollState` (optional): An object containing state to include with the next + stack trace capture. This can be used instead of or in addition to + `AsyncLocalStorage` based state tracking. #### `getThreadsLastSeen(): Record` diff --git a/module.cc b/module.cc index 56582d6..9e9731a 100644 --- a/module.cc +++ b/module.cc @@ -2,6 +2,9 @@ #include #include #include +#include +#include +#include // Platform-specific includes for time functions #ifdef _WIN32 @@ -12,11 +15,26 @@ #include #endif +#ifndef NODE_MAJOR_VERSION +#error "NODE_MAJOR_VERSION is not defined" +#endif + +#define SUPPORTS_ASYNC_CONTEXT_FRAME NODE_MAJOR_VERSION >= 22 +#define GET_CONTINUATION_PRESERVED_EMBEDDER_DATA_V2 V8_MAJOR_VERSION >= 14 + using namespace v8; using namespace node; using namespace std::chrono; -static const int kMaxStackFrames = 255; +static const int kMaxStackFrames = 50; + +struct AsyncLocalStorageLookup { + // Async local storage instance associated with this thread + v8::Global async_local_storage; + // Optional ordered array of keys (string | symbol) to traverse nested + // Map/Object structures to fetch the final state object + std::optional>> storage_keys; +}; // Structure to hold information for each thread/isolate struct ThreadInfo { @@ -24,8 +42,10 @@ struct ThreadInfo { std::string thread_name; // Last time this thread was seen in milliseconds since epoch milliseconds last_seen; - // Some JSON serialized state for the thread - std::string state; + // Optional async local storage associated with this thread + std::optional async_store; + // Some JSON serialized state sent via threadPoll + std::string poll_state; }; static std::mutex threads_mutex; @@ -41,21 +61,157 @@ struct JsStackFrame { }; // Type alias for a vector of JsStackFrame -using JsStackTrace = std::vector; +using JsStackFrames = std::vector; + +struct JsStackTrace { + // The frames in the stack trace + JsStackFrames frames; + // JSON serialized string of the async state + std::string async_state; +}; struct ThreadResult { std::string thread_name; - std::string state; - JsStackTrace stack_frames; + JsStackTrace stack_trace; + // JSON serialized string of the poll state + std::string poll_state; }; -// Function to be called when an isolate's execution is interrupted -static void ExecutionInterrupted(Isolate *isolate, void *data) { - auto promise = static_cast *>(data); +// Recursively sanitize a value to be safely JSON-stringifiable by: +// - Removing properties whose values are BigInt, Function, or Symbol +// (dropped for objects, omitted from arrays) +// - Breaking cycles by omitting repeated objects (undefined -> dropped/omitted) +// - Preserving primitives and traversing arrays/objects +static v8::Local +SanitizeForJSON(v8::Isolate *isolate, v8::Local context, + v8::Local value, + std::vector> &ancestors) { + // Fast-path for primitives that are always JSON-compatible + if (value->IsNull() || value->IsBoolean() || value->IsNumber() || + value->IsString()) { + return value; + } + + // Values that JSON.stringify cannot handle directly + if (value->IsBigInt() || value->IsSymbol() || value->IsFunction() || + value->IsUndefined()) { + // Returning undefined here lets callers decide to drop (object) or null + // (array) + return v8::Undefined(isolate); + } + + // Arrays + if (value->IsArray()) { + auto arr = value.As(); + // Cycle detection + auto arr_obj = value.As(); + for (auto &a : ancestors) { + if (a->StrictEquals(arr_obj)) { + return v8::Undefined(isolate); + } + } + + auto length = arr->Length(); + auto out = v8::Array::New(isolate, 0); + ancestors.push_back(arr_obj); + + uint32_t out_index = 0; + for (uint32_t i = 0; i < length; ++i) { + auto maybeEl = arr->Get(context, i); + v8::Local el; + if (!maybeEl.ToLocal(&el)) { + el = v8::Undefined(isolate); + } + + auto sanitized = SanitizeForJSON(isolate, context, el, ancestors); + if (!sanitized->IsUndefined()) { + out->Set(context, out_index++, sanitized) + .Check(); // omit undefined entries entirely + } + } + ancestors.pop_back(); + return out; + } + + // Objects (including Dates, RegExps, Maps as objects; we only traverse + // enumerable own props) + if (value->IsObject()) { + auto obj = value.As(); + // Cycle detection + for (auto &a : ancestors) { + if (a->StrictEquals(obj)) { + return v8::Undefined(isolate); + } + } + + ancestors.push_back(obj); + + // Collect own enumerable property names (string-keyed) + auto maybe_props = obj->GetPropertyNames(context); + if (maybe_props.IsEmpty()) { + ancestors.pop_back(); + return obj; // Nothing enumerable to sanitize + } + + auto props = maybe_props.ToLocalChecked(); + auto out = v8::Object::New(isolate); + auto len = props->Length(); + for (uint32_t i = 0; i < len; ++i) { + auto maybeKey = props->Get(context, i); + if (maybeKey.IsEmpty()) + continue; + + auto key = maybeKey.ToLocalChecked(); + if (!key->IsString()) { + // Skip symbol and non-string keys to match JSON behavior + continue; + } + + auto maybeVal = obj->Get(context, key); + if (maybeVal.IsEmpty()) + continue; + + auto val = maybeVal.ToLocalChecked(); + auto sanitized = SanitizeForJSON(isolate, context, val, ancestors); + if (!sanitized->IsUndefined()) { + out->Set(context, key, sanitized).Check(); + } + // else: undefined -> drop property + } + + ancestors.pop_back(); + return out; + } + + // Fallback: return as-is (shouldn't hit here for other exotic types) + return value; +} + +std::string JSONStringify(Isolate *isolate, Local value) { + auto context = isolate->GetCurrentContext(); + + // Sanitize the value first to avoid JSON failures (e.g., BigInt, cycles) + std::vector> ancestors; + auto sanitized = SanitizeForJSON(isolate, context, value, ancestors); + if (sanitized->IsUndefined()) { + // Nothing serializable + return ""; + } + + auto maybe_json = v8::JSON::Stringify(context, sanitized); + if (maybe_json.IsEmpty()) { + return ""; + } + v8::String::Utf8Value utf8(isolate, maybe_json.ToLocalChecked()); + return *utf8 ? *utf8 : ""; +} + +// Function to get stack frames from a V8 stack trace +JsStackFrames GetStackFrames(Isolate *isolate) { auto stack = StackTrace::CurrentStackTrace(isolate, kMaxStackFrames, StackTrace::kDetailed); - JsStackTrace frames; + JsStackFrames frames; if (!stack.IsEmpty()) { for (int i = 0; i < stack->GetFrameCount(); i++) { auto frame = stack->GetFrame(isolate, i); @@ -89,57 +245,174 @@ static void ExecutionInterrupted(Isolate *isolate, void *data) { } } - promise->set_value(frames); + return frames; +} + +#if SUPPORTS_ASYNC_CONTEXT_FRAME +// Function to fetch the thread state from the async context store +std::string GetThreadState(Isolate *isolate, + const AsyncLocalStorageLookup &store) { + +// Node.js stores the async local storage in the isolate's +// "ContinuationPreservedEmbedderData" map, keyed by the +// AsyncLocalStorage instance. +// https://github.com/nodejs/node/blob/c6316f9db9869864cea84e5f07585fa08e3e06d2/src/async_context_frame.cc#L37 +#if GET_CONTINUATION_PRESERVED_EMBEDDER_DATA_V2 + auto data = isolate->GetContinuationPreservedEmbedderDataV2().As(); +#else + auto data = isolate->GetContinuationPreservedEmbedderData(); +#endif + auto async_local_storage_local = store.async_local_storage.Get(isolate); + + if (data.IsEmpty() || !data->IsMap() || async_local_storage_local.IsEmpty()) { + return ""; + } + + auto map = data.As(); + auto context = isolate->GetCurrentContext(); + auto maybe_root_store = map->Get(context, async_local_storage_local); + + if (maybe_root_store.IsEmpty()) { + return ""; + } + + auto root_store = maybe_root_store.ToLocalChecked(); + + if (store.storage_keys.has_value()) { + // Walk the keys to get the desired nested value + const auto &keys = store.storage_keys.value(); + auto current = root_store; + + for (auto &gkey : keys) { + auto local_key = gkey.Get(isolate); + if (!(local_key->IsString() || local_key->IsSymbol())) { + continue; + } + + v8::MaybeLocal maybeValue; + if (current->IsMap()) { + auto map_val = current.As(); + maybeValue = map_val->Get(context, local_key); + } else if (current->IsObject()) { + auto obj_val = current.As(); + maybeValue = obj_val->Get(context, local_key); + } else { + return ""; + } + + if (maybeValue.IsEmpty()) { + return ""; + } + + current = maybeValue.ToLocalChecked(); + } + + root_store = current; + } + + return JSONStringify(isolate, root_store); +} +#endif + +struct InterruptArgs { + std::promise promise; + const std::optional *store; +}; + +// Function to be called when an isolate's execution is interrupted +static void ExecutionInterrupted(Isolate *isolate, void *data) { + auto args = static_cast(data); + Locker locker(isolate); + HandleScope handle_scope(isolate); + + if (isolate->IsExecutionTerminating()) { + args->promise.set_value({{}, ""}); + delete args; + return; + } + + auto frames = GetStackFrames(isolate); + std::string state = ""; + +#if SUPPORTS_ASYNC_CONTEXT_FRAME + if (args->store && args->store->has_value()) { + state = GetThreadState(isolate, args->store->value()); + } +#endif + + args->promise.set_value({frames, state}); + + delete args; } // Function to capture the stack trace of a single isolate -JsStackTrace CaptureStackTrace(Isolate *isolate) { +JsStackTrace +CaptureStackTrace(Isolate *isolate, + const std::optional &store) { + if (isolate->IsExecutionTerminating()) { + return JsStackTrace{{}, ""}; + } + std::promise promise; auto future = promise.get_future(); // The v8 isolate must be interrupted to capture the stack trace - // Execution resumes automatically after ExecutionInterrupted returns - isolate->RequestInterrupt(ExecutionInterrupted, &promise); + isolate->RequestInterrupt(ExecutionInterrupted, + new InterruptArgs{std::move(promise), &store}); + return future.get(); } // Function to capture stack traces from all registered threads void CaptureStackTraces(const FunctionCallbackInfo &args) { auto capture_from_isolate = args.GetIsolate(); - auto current_context = capture_from_isolate->GetCurrentContext(); - std::vector> futures; + std::vector results; { + std::vector> futures; std::lock_guard lock(threads_mutex); - for (auto [thread_isolate, thread_info] : threads) { + for (auto &thread : threads) { + auto thread_isolate = thread.first; + auto &thread_info = thread.second; + if (thread_isolate == capture_from_isolate) continue; + auto thread_name = thread_info.thread_name; - auto state = thread_info.state; + auto poll_state = thread_info.poll_state; futures.emplace_back(std::async( std::launch::async, - [thread_name, state](Isolate *isolate) -> ThreadResult { - return ThreadResult{thread_name, state, CaptureStackTrace(isolate)}; + [thread_isolate, thread_name, poll_state]( + const std::optional &async_store) + -> ThreadResult { + return ThreadResult{thread_name, + CaptureStackTrace(thread_isolate, async_store), + poll_state}; }, - thread_isolate)); + std::cref(thread_info.async_store))); + } + + for (auto &fut : futures) { + results.emplace_back(fut.get()); } } + auto current_context = capture_from_isolate->GetCurrentContext(); + Local output = Object::New(capture_from_isolate); - for (auto &future : futures) { - auto result = future.get(); + for (auto &result : results) { auto key = String::NewFromUtf8(capture_from_isolate, result.thread_name.c_str(), NewStringType::kNormal) .ToLocalChecked(); Local jsFrames = - Array::New(capture_from_isolate, result.stack_frames.size()); - for (size_t i = 0; i < result.stack_frames.size(); ++i) { - const auto &frame = result.stack_frames[i]; + Array::New(capture_from_isolate, result.stack_trace.frames.size()); + for (size_t i = 0; i < result.stack_trace.frames.size(); ++i) { + const auto &frame = result.stack_trace.frames[i]; Local frameObj = Object::New(capture_from_isolate); frameObj ->Set(current_context, @@ -189,9 +462,10 @@ void CaptureStackTraces(const FunctionCallbackInfo &args) { jsFrames) .Check(); - if (!result.state.empty()) { + if (!result.poll_state.empty()) { v8::MaybeLocal stateStr = v8::String::NewFromUtf8( - capture_from_isolate, result.state.c_str(), NewStringType::kNormal); + capture_from_isolate, result.poll_state.c_str(), + NewStringType::kNormal); if (!stateStr.IsEmpty()) { v8::MaybeLocal maybeStateVal = v8::JSON::Parse(current_context, stateStr.ToLocalChecked()); @@ -199,7 +473,27 @@ void CaptureStackTraces(const FunctionCallbackInfo &args) { if (maybeStateVal.ToLocal(&stateVal)) { threadObj ->Set(current_context, - String::NewFromUtf8(capture_from_isolate, "state", + String::NewFromUtf8(capture_from_isolate, "pollState", + NewStringType::kInternalized) + .ToLocalChecked(), + stateVal) + .Check(); + } + } + } + + if (!result.stack_trace.async_state.empty()) { + v8::MaybeLocal stateStr = v8::String::NewFromUtf8( + capture_from_isolate, result.stack_trace.async_state.c_str(), + NewStringType::kNormal); + if (!stateStr.IsEmpty()) { + v8::MaybeLocal maybeStateVal = + v8::JSON::Parse(current_context, stateStr.ToLocalChecked()); + v8::Local stateVal; + if (maybeStateVal.ToLocal(&stateVal)) { + threadObj + ->Set(current_context, + String::NewFromUtf8(capture_from_isolate, "asyncState", NewStringType::kInternalized) .ToLocalChecked(), stateVal) @@ -222,33 +516,103 @@ void Cleanup(void *arg) { threads.erase(isolate); } -// Function to register a thread and update it's last seen time +void RegisterThreadInternal( + Isolate *isolate, const std::string &thread_name, + std::optional async_store) { + + std::lock_guard lock(threads_mutex); + auto found = threads.find(isolate); + if (found == threads.end()) { + threads.emplace(isolate, ThreadInfo{thread_name, milliseconds::zero(), + std::move(async_store), ""}); + // Register a cleanup hook to remove this thread when the isolate is + // destroyed + node::AddEnvironmentCleanupHook(isolate, Cleanup, isolate); + } +} + +// Function to register a thread and update its last seen time void RegisterThread(const FunctionCallbackInfo &args) { auto isolate = args.GetIsolate(); + auto context = isolate->GetCurrentContext(); + + if (args.Length() == 1 && args[0]->IsString()) { + v8::String::Utf8Value utf8(isolate, args[0]); + std::string thread_name(*utf8 ? *utf8 : ""); + + RegisterThreadInternal(isolate, thread_name, std::nullopt); + } else if (args.Length() == 2 && args[0]->IsObject() && args[1]->IsString()) { + v8::String::Utf8Value utf8(isolate, args[1]); + std::string thread_name(*utf8 ? *utf8 : ""); + + auto obj = args[0].As(); + auto async_local_storage_val = + obj->Get(context, String::NewFromUtf8(isolate, "asyncLocalStorage", + NewStringType::kInternalized) + .ToLocalChecked()); + + if (async_local_storage_val.IsEmpty() || + !async_local_storage_val.ToLocalChecked()->IsObject()) { + isolate->ThrowException(Exception::Error( + String::NewFromUtf8(isolate, + "The first argument must be an object with an " + "asyncLocalStorage property", + NewStringType::kInternalized) + .ToLocalChecked())); + return; + } - if (args.Length() != 1 || !args[0]->IsString()) { + std::optional>> storage_keys = + std::nullopt; + + auto storage_key_val = + obj->Get(context, String::NewFromUtf8(isolate, "stateLookup", + NewStringType::kInternalized) + .ToLocalChecked()); + + if (!storage_key_val.IsEmpty()) { + + auto local_val = storage_key_val.ToLocalChecked(); + if (!local_val->IsUndefined() && !local_val->IsNull()) { + if (local_val->IsArray()) { + + auto arr = local_val.As(); + std::vector> keys_vec; + uint32_t length = arr->Length(); + for (uint32_t i = 0; i < length; ++i) { + auto maybeEl = arr->Get(context, i); + if (maybeEl.IsEmpty()) + continue; + auto el = maybeEl.ToLocalChecked(); + if (el->IsString() || el->IsSymbol()) { + + keys_vec.emplace_back(isolate, el); + } + } + if (!keys_vec.empty()) { + storage_keys = std::move(keys_vec); + } + } + } + } + + auto store = AsyncLocalStorageLookup{ + v8::Global(isolate, + async_local_storage_val.ToLocalChecked()), + std::move(storage_keys)}; + + RegisterThreadInternal(isolate, thread_name, std::move(store)); + } else { isolate->ThrowException(Exception::Error( String::NewFromUtf8( - isolate, "registerThread(name) requires a single name argument", + isolate, + "Incorrect arguments. Expected: \n" + "- registerThread(threadName: string) or \n" + "- registerThread(storage: {asyncLocalStorage: AsyncLocalStorage; " + "stateLookup?: Array}, " + "threadName: string)", NewStringType::kInternalized) .ToLocalChecked())); - - return; - } - - v8::String::Utf8Value utf8(isolate, args[0]); - std::string thread_name(*utf8 ? *utf8 : ""); - - { - std::lock_guard lock(threads_mutex); - auto found = threads.find(isolate); - if (found == threads.end()) { - threads.emplace(isolate, - ThreadInfo{thread_name, milliseconds::zero(), ""}); - // Register a cleanup hook to remove this thread when the isolate is - // destroyed - node::AddEnvironmentCleanupHook(isolate, Cleanup, isolate); - } } } @@ -272,7 +636,7 @@ steady_clock::time_point GetUnbiasedMonotonicTime() { return steady_clock::time_point(seconds(ts.tv_sec) + nanoseconds(ts.tv_nsec)); #else // Fallback for other platforms using steady_clock. Note: this will be - // monotonic but is not gaurenteed to ignore time spent while suspended. + // monotonic but is not guaranteed to ignore time spent while suspended. return steady_clock::now(); #endif } @@ -280,24 +644,16 @@ steady_clock::time_point GetUnbiasedMonotonicTime() { // Function to track a thread and set its state void ThreadPoll(const FunctionCallbackInfo &args) { auto isolate = args.GetIsolate(); - auto context = isolate->GetCurrentContext(); - std::string state_str; - if (args.Length() > 0 && args[0]->IsValue()) { - MaybeLocal maybe_json = v8::JSON::Stringify(context, args[0]); - if (!maybe_json.IsEmpty()) { - v8::String::Utf8Value utf8_state(isolate, maybe_json.ToLocalChecked()); - state_str = *utf8_state ? *utf8_state : ""; - } else { - state_str = ""; - } - } else { - state_str = ""; + bool enable_last_seen = true; + if (args.Length() > 0 && args[0]->IsBoolean()) { + enable_last_seen = args[0]->BooleanValue(isolate); } - bool disable_last_seen = false; - if (args.Length() > 1 && args[1]->IsBoolean()) { - disable_last_seen = args[1]->BooleanValue(isolate); + std::string poll_state = ""; + if (args.Length() > 1 && args[1]->IsObject()) { + auto obj = args[1].As(); + poll_state = JSONStringify(isolate, obj); } { @@ -305,12 +661,13 @@ void ThreadPoll(const FunctionCallbackInfo &args) { auto found = threads.find(isolate); if (found != threads.end()) { auto &thread_info = found->second; - thread_info.state = state_str; - if (disable_last_seen) { - thread_info.last_seen = milliseconds::zero(); - } else { + thread_info.poll_state = std::move(poll_state); + + if (enable_last_seen) { thread_info.last_seen = duration_cast( GetUnbiasedMonotonicTime().time_since_epoch()); + } else { + thread_info.last_seen = milliseconds::zero(); } } } @@ -319,6 +676,7 @@ void ThreadPoll(const FunctionCallbackInfo &args) { // Function to get the last seen time of all registered threads void GetThreadsLastSeen(const FunctionCallbackInfo &args) { Isolate *isolate = args.GetIsolate(); + Local result = Object::New(isolate); milliseconds now = duration_cast( GetUnbiasedMonotonicTime().time_since_epoch()); @@ -344,7 +702,7 @@ void GetThreadsLastSeen(const FunctionCallbackInfo &args) { extern "C" NODE_MODULE_EXPORT void NODE_MODULE_INITIALIZER(Local exports, Local module, Local context) { - auto isolate = context->GetIsolate(); + auto isolate = v8::Isolate::GetCurrent(); exports ->Set(context, diff --git a/package.json b/package.json index 3de31a6..6bea6dc 100644 --- a/package.json +++ b/package.json @@ -23,16 +23,16 @@ "fix": "yarn fix:eslint && yarn fix:clang", "fix:eslint": "eslint . --format stylish --fix", "fix:clang": "node scripts/clang-format.mjs --fix", - "build": "yarn build:lib && yarn build:bindings:configure && yarn build:bindings", + "build": "yarn clean && yarn build:lib && yarn build:bindings:configure && yarn build:bindings", "build:lib": "tsc", "build:bindings:configure": "node-gyp configure", "build:bindings:configure:arm64": "node-gyp configure --arch=arm64 --target_arch=arm64", "build:bindings": "node-gyp build && node scripts/copy-target.mjs", "build:bindings:arm64": "node-gyp build --arch=arm64 && node scripts/copy-target.mjs", - "build:dev": "yarn clean && yarn build:bindings:configure && yarn build", "build:tarball": "npm pack", - "clean": "node-gyp clean && rm -rf lib && rm -rf build", - "test": "yarn test:install && node ./test/prepare.mjs && vitest run --silent=false --disable-console-intercept", + "clean": "node-gyp clean && rm -rf lib && rm -rf build && rm -f *.tgz", + "test": "yarn test:install && yarn test:prepare && vitest run --poolOptions.forks.singleFork --silent=false --disable-console-intercept", + "test:prepare": "node ./test/prepare.mjs", "test:install": "cross-env ALWAYS_THROW=true yarn install" }, "engines": { @@ -66,6 +66,6 @@ "access": "public" }, "volta": { - "node": "24.1.0" + "node": "24.9.0" } -} +} \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index eb31797..810066e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ +import type { AsyncLocalStorage } from 'node:async_hooks'; import { arch as _arch, platform as _platform } from 'node:os'; import { join, resolve } from 'node:path'; import { env, versions } from 'node:process'; @@ -11,9 +12,30 @@ const arch = process.env['BUILD_ARCH'] || _arch(); const abi = getAbi(versions.node, 'node'); const identifier = [platform, arch, stdlib, abi].filter(c => c !== undefined && c !== null).join('-'); -type Thread = { +type AsyncStorageArgs = { + /** The AsyncLocalStorage instance used to fetch the store */ + asyncLocalStorage: AsyncLocalStorage; + /** + * Optional array of keys to fetch a specific property from the store + * Key will be traversed in order through Objects/Maps to reach the desired property. + * + * This is useful if you want to capture Open Telemetry context values as state. + * + * To get this value: + * context.getValue(my_unique_symbol_ref) + * + * You would set: + * stateLookup: ['_currentContext', my_unique_symbol_ref] + */ + stateLookup?: Array; +} + +type Thread = { frames: StackFrame[]; - state?: S + /** State captured from the AsyncLocalStorage, if provided */ + asyncState?: A; + /** Optional state provided when calling threadPoll */ + pollState?: P; } type StackFrame = { @@ -25,8 +47,9 @@ type StackFrame = { interface Native { registerThread(threadName: string): void; - threadPoll(state?: object, disableLastSeen?: boolean): void; - captureStackTrace(): Record>; + registerThread(storage: AsyncStorageArgs, threadName: string): void; + threadPoll(enableLastSeen?: boolean, pollState?: object): void; + captureStackTrace(): Record>; getThreadsLastSeen(): Record; } @@ -174,34 +197,40 @@ function getNativeModule(): Native { const native = getNativeModule(); +export function registerThread(threadName?: string): void; +export function registerThread(storageOrThread: AsyncStorageArgs | string, threadName?: string): void; /** * Registers the current thread with the native module. * - * @param threadName The name of the thread to register. Defaults to the current thread ID. + * This should be called on every thread that you want to capture stack traces from. + * + * @param storageOrThreadName Either the name of the thread, or an object containing an AsyncLocalStorage instance and optional storage key. + * @param threadName The name of the thread, if the first argument is an object. + * + * threadName defaults to the `threadId` if not provided. */ -export function registerThread(threadName: string = String(threadId)): void { - native.registerThread(threadName); +export function registerThread(storageOrThreadName?: AsyncStorageArgs | string, threadName?: string): void { + if (typeof storageOrThreadName === 'object') { + native.registerThread(storageOrThreadName, threadName || String(threadId)); + } else { + native.registerThread(storageOrThreadName || String(threadId)); + } } /** * Tells the native module that the thread is still running and updates the state. * - * @param state Optional state to pass to the native module. - * @param disableLastSeen If true, disables the last seen tracking for this thread. + * @param enableLastSeen If true, enables the last seen tracking for this thread. */ -export function threadPoll(state?: object, disableLastSeen?: boolean): void { - if (typeof state === 'object' || disableLastSeen) { - native.threadPoll(state, disableLastSeen); - } else { - native.threadPoll(); - } +export function threadPoll(enableLastSeen: boolean = true, pollState?: object): void { + native.threadPoll(enableLastSeen, pollState); } /** * Captures stack traces for all registered threads. */ -export function captureStackTrace(): Record> { - return native.captureStackTrace(); +export function captureStackTrace(): Record> { + return native.captureStackTrace(); } /** diff --git a/test/async-storage.mjs b/test/async-storage.mjs new file mode 100644 index 0000000..3895058 --- /dev/null +++ b/test/async-storage.mjs @@ -0,0 +1,27 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; +import { Worker } from 'node:worker_threads'; +import { registerThread } from '@sentry-internal/node-native-stacktrace'; +import { longWork } from './long-work.js'; + +const asyncLocalStorage = new AsyncLocalStorage(); +const SOME_UNIQUE_SYMBOL = Symbol.for('sentry_scopes'); + +registerThread({ asyncLocalStorage, stateLookup: ['_currentContext', SOME_UNIQUE_SYMBOL] }); + +function withTraceId(traceId, fn) { + // This is a decent approximation of how Otel stores context in the ALS store + const store = { + _currentContext: new Map([ [SOME_UNIQUE_SYMBOL, { traceId }] ]) + }; + return asyncLocalStorage.run(store, fn); +} + +const watchdog = new Worker('./test/watchdog.js'); + +for (let i = 0; i < 10; i++) { + withTraceId(`trace-${i}`, () => { + if (i === 5) { + longWork(); + } + }); +} diff --git a/test/e2e.test.mjs b/test/e2e.test.mjs index 0676309..e11d0df 100644 --- a/test/e2e.test.mjs +++ b/test/e2e.test.mjs @@ -1,101 +1,233 @@ -import { spawnSync } from 'node:child_process'; +import { spawn } from 'node:child_process'; import { join } from 'node:path'; import { describe, expect, test } from 'vitest'; const __dirname = import.meta.dirname || new URL('.', import.meta.url).pathname; -describe('e2e Tests', { timeout: 20000 }, () => { - test('Capture stack trace from multiple threads', () => { - const testFile = join(__dirname, 'stack-traces.js'); - const result = spawnSync('node', [testFile]) +const NODE_MAJOR_VERSION = parseInt(process.versions.node.split('.')[0], 10); - expect(result.status).toEqual(0); +// macOS emulated x64 in CI is very slow! +const timeout = process.env.CI && process.platform === 'darwin' ? 60000 : 20000; - const stacks = JSON.parse(result.stdout.toString()); +async function runTest(...paths) { + console.time('Test Run'); + const file = join(...paths); + const args = NODE_MAJOR_VERSION === 22 ? ['--experimental-async-context-frame', file] : [file]; - expect(stacks['0'].frames).toEqual(expect.arrayContaining([ - { - function: 'pbkdf2Sync', - filename: expect.any(String), - lineno: expect.any(Number), - colno: expect.any(Number), - }, - { - function: 'longWork', - filename: expect.stringMatching(/long-work.js$/), - lineno: expect.any(Number), - colno: expect.any(Number), - }, - { - function: '?', - filename: expect.stringMatching(/stack-traces.js$/), - lineno: expect.any(Number), - colno: expect.any(Number), + return new Promise((resolve, reject) => { + const child = spawn('node', args, { stdio: ['ignore', 'pipe', 'pipe'] }); + + let stdoutBuf = ''; + let stderrBuf = ''; + + child.stdout?.on('data', chunk => { + stdoutBuf += chunk.toString(); + }); + child.stderr?.on('data', chunk => { + stderrBuf += chunk.toString(); + }); + + child.on('error', err => reject(err)); + + child.on('close', code => { + const stdout = stdoutBuf + .split('\n') + .map(line => line.trim()) + .filter(line => line !== ''); + const stderr = stderrBuf + .split('\n') + .map(line => line.trim()) + .filter(line => line !== ''); + + let trace; + for (const line of stdout) { + try { + trace = JSON.parse(line); + break; + } catch (_) { + // ignore non-JSON lines + } + } + + console.timeEnd('Test Run'); + if (stdout.length > 0) { + console.log('stdout:', stdout); + } + if (stderr.length > 0) { + console.log('stderr:', stderr); + } + + resolve({ status: code, stdout, trace }); + }); + }); +} + +describe('e2e Tests', { timeout }, () => { + test('Capture stack trace from multiple threads', async () => { + const result = await runTest(__dirname, 'stack-traces.js') + + expect(result.status).toEqual(0); + + expect(result.trace).toEqual(expect.objectContaining({ + '0': { + frames: expect.arrayContaining([ + { + function: 'pbkdf2Sync', + filename: expect.any(String), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + { + function: 'longWork', + filename: expect.stringMatching(/long-work.js$/), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + { + function: '?', + filename: expect.stringMatching(/stack-traces.js$/), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + ]), }, - ])); - - expect(stacks['2'].frames).toEqual(expect.arrayContaining([ - { - function: 'pbkdf2Sync', - filename: expect.any(String), - lineno: expect.any(Number), - colno: expect.any(Number), + '2': { + frames: expect.arrayContaining([ + { + function: 'pbkdf2Sync', + filename: expect.any(String), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + { + function: 'longWork', + filename: expect.stringMatching(/long-work.js$/), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + { + function: '?', + filename: expect.stringMatching(/worker.js$/), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + ]), }, - { - function: 'longWork', - filename: expect.stringMatching(/long-work.js$/), - lineno: expect.any(Number), - colno: expect.any(Number), + })); + }); + + test('detect stalled thread', async () => { + const result = await runTest(__dirname, 'stalled.js'); + + expect(result.status).toEqual(0); + + expect(result.trace).toEqual(expect.objectContaining({ + '0': { + frames: expect.arrayContaining([ + { + function: 'pbkdf2Sync', + filename: expect.any(String), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + { + function: 'longWork', + filename: expect.stringMatching(/long-work.js$/), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + { + function: '?', + filename: expect.stringMatching(/stalled.js$/), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + ]), + pollState: { some_property: 'some_value' }, }, - { - function: '?', - filename: expect.stringMatching(/worker.js$/), - lineno: expect.any(Number), - colno: expect.any(Number), + '2': { + frames: expect.any(Array), }, - ])); + })); }); - test('detect stalled thread', { timeout: 20000 }, () => { - const testFile = join(__dirname, 'stalled.js'); - const result = spawnSync('node', [testFile]); + test('detect completely blocked thread', async () => { + const result = await runTest(__dirname, 'stalled-forever.js'); expect(result.status).toEqual(0); - const stacks = JSON.parse(result.stdout.toString()); - - expect(stacks['0'].frames).toEqual(expect.arrayContaining([ - { - function: 'pbkdf2Sync', - filename: expect.any(String), - lineno: expect.any(Number), - colno: expect.any(Number), + expect(result.trace).toEqual(expect.objectContaining({ + '0': { + frames: expect.any(Array), + pollState: { some_property: 'main_thread' } }, - { - function: 'longWork', - filename: expect.stringMatching(/long-work.js$/), - lineno: expect.any(Number), - colno: expect.any(Number), + '2': { + frames: expect.arrayContaining([ + { + function: 'pbkdf2Sync', + filename: expect.any(String), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + { + function: 'longWork', + filename: expect.stringMatching(/long-work.js$/), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + { + function: '?', + filename: expect.stringMatching(/worker-forever.js$/), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + ]), + pollState: { some_property: 'worker_thread' }, }, - { - function: '?', - filename: expect.stringMatching(/stalled.js$/), - lineno: expect.any(Number), - colno: expect.any(Number), - }, - ])); + })); + }); + + test('async storage state', async (ctx) => { + if (NODE_MAJOR_VERSION < 22) { + ctx.skip(); + return; + } - expect(stacks['0'].state).toEqual({ some_property: 'some_value' }); + const result = await runTest(__dirname, 'async-storage.mjs'); - expect(stacks['2'].frames.length).toEqual(1); + expect(result.status).toEqual(0); + + expect(result.trace).toEqual(expect.objectContaining({ + '0': expect.objectContaining({ + frames: expect.arrayContaining([ + { + function: 'pbkdf2Sync', + filename: expect.any(String), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + { + function: 'longWork', + filename: expect.stringMatching(/long-work.js$/), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + { + function: '?', + filename: expect.stringMatching(/async-storage.mjs$/), + lineno: expect.any(Number), + colno: expect.any(Number), + }, + ]), + asyncState: { traceId: 'trace-5' }, + }), + })); }); - test('can be disabled', { timeout: 20000 }, () => { - const testFile = join(__dirname, 'stalled-disabled.js'); - const result = spawnSync('node', [testFile]); + test('can be disabled', async () => { + const result = await runTest(__dirname, 'stalled-disabled.js'); expect(result.status).toEqual(0); - - expect(result.stdout.toString()).toContain('complete'); + expect(result.stdout).toContain('complete'); }); }); diff --git a/test/long-work.js b/test/long-work.js index 51e5d68..00575b2 100644 --- a/test/long-work.js +++ b/test/long-work.js @@ -1,11 +1,18 @@ const crypto = require('node:crypto'); function longWork() { - for (let i = 0; i < 100; i++) { + for (let i = 0; i < 200; i++) { const salt = crypto.randomBytes(128).toString('base64'); const hash = crypto.pbkdf2Sync('myPassword', salt, 10000, 512, 'sha512'); console.assert(hash); } } -module.exports = { longWork }; +function foreverWork() { + // eslint-disable-next-line no-constant-condition + while (true) { + longWork(); + } +} + +module.exports = { longWork, foreverWork }; diff --git a/test/package.json.template b/test/package.json.template index 5adef33..0bda724 100644 --- a/test/package.json.template +++ b/test/package.json.template @@ -1,5 +1,5 @@ { - "name": "node-cpu-profiler-test", + "name": "node-native-stacktrace-test", "license": "MIT", "dependencies": { "@sentry-internal/node-native-stacktrace": "{{path}}" diff --git a/test/prepare.mjs b/test/prepare.mjs index eb2ca66..c44c934 100644 --- a/test/prepare.mjs +++ b/test/prepare.mjs @@ -1,6 +1,5 @@ - import { execSync, spawnSync } from 'node:child_process'; -import {existsSync,readFileSync, rmSync, writeFileSync } from 'node:fs'; +import { existsSync, readFileSync, rmSync, writeFileSync } from 'node:fs'; import { createRequire } from 'node:module'; import { dirname, join, relative } from 'node:path'; import { fileURLToPath } from 'node:url'; @@ -16,9 +15,8 @@ function installTarballAsDependency(root) { const tarball = join(__dirname, '..', `${normalizedName}-${pkgJson.version}.tgz`); if (!existsSync(tarball)) { - console.error(`Tarball not found: '${tarball}'`); - console.error('Run \'yarn build && yarn build:tarball\' first'); - process.exit(1); + console.log('Creating tarball...'); + execSync('yarn build:tarball', { shell: true, stdio: 'inherit' }); } const tarballRelative = relative(root, tarball); diff --git a/test/stalled-disabled.js b/test/stalled-disabled.js index acb4263..117e3e6 100644 --- a/test/stalled-disabled.js +++ b/test/stalled-disabled.js @@ -1,11 +1,15 @@ const { Worker } = require('node:worker_threads'); +const { AsyncLocalStorage } = require('node:async_hooks'); const { longWork } = require('./long-work.js'); const { registerThread, threadPoll } = require('@sentry-internal/node-native-stacktrace'); -registerThread(); +const asyncLocalStorage = new AsyncLocalStorage(); +asyncLocalStorage.enterWith({ some_property: 'some_value' }); + +registerThread({ asyncLocalStorage }); setInterval(() => { - threadPoll({ some_property: 'some_value' }, true); + threadPoll(false); }, 200).unref(); const watchdog = new Worker('./test/stalled-watchdog.js'); diff --git a/test/stalled-forever.js b/test/stalled-forever.js new file mode 100644 index 0000000..2b24e05 --- /dev/null +++ b/test/stalled-forever.js @@ -0,0 +1,14 @@ +const { Worker } = require('node:worker_threads'); +const { registerThread, threadPoll } = require('@sentry-internal/node-native-stacktrace'); + +registerThread(); + +setInterval(() => { + threadPoll(true, { some_property: 'main_thread' }); +}, 200).unref(); + +const watchdog = new Worker('./test/stalled-watchdog.js'); +watchdog.on('exit', () => process.exit(0)); + +const worker = new Worker('./test/worker-forever.js'); + diff --git a/test/stalled.js b/test/stalled.js index 56f35ec..835e0b2 100644 --- a/test/stalled.js +++ b/test/stalled.js @@ -5,7 +5,7 @@ const { registerThread, threadPoll } = require('@sentry-internal/node-native-sta registerThread(); setInterval(() => { - threadPoll({ some_property: 'some_value' }); + threadPoll(true, { some_property: 'some_value' }); }, 200).unref(); const watchdog = new Worker('./test/stalled-watchdog.js'); diff --git a/test/watchdog.js b/test/watchdog.js index 3fba543..ca2c2c3 100644 --- a/test/watchdog.js +++ b/test/watchdog.js @@ -2,5 +2,5 @@ const { captureStackTrace } = require('@sentry-internal/node-native-stacktrace') setTimeout(() => { console.log(JSON.stringify(captureStackTrace())); -}, 2000); +}, 1000); diff --git a/test/worker-forever.js b/test/worker-forever.js new file mode 100644 index 0000000..dfe227a --- /dev/null +++ b/test/worker-forever.js @@ -0,0 +1,13 @@ +const { foreverWork } = require('./long-work'); +const { registerThread, threadPoll } = require('@sentry-internal/node-native-stacktrace'); + +registerThread(); + +setInterval(() => { + threadPoll(true, { some_property: 'worker_thread' }); +}, 200).unref(); + +setTimeout(() => { + console.log('Starting forever work'); + foreverWork(); +}, 1000);