-
Notifications
You must be signed in to change notification settings - Fork 29.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
worker: add eventLoopUtilization() #35664
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
'use strict'; | ||
|
||
const common = require('../common.js'); | ||
const { Worker, parentPort } = require('worker_threads'); | ||
|
||
if (process.argv[2] === 'idle cats') { | ||
return parentPort.once('message', () => {}); | ||
} | ||
|
||
const bench = common.createBenchmark(main, { | ||
n: [1e6], | ||
method: [ | ||
'ELU_simple', | ||
'ELU_passed', | ||
], | ||
}); | ||
|
||
function main({ method, n }) { | ||
switch (method) { | ||
case 'ELU_simple': | ||
benchELUSimple(n); | ||
break; | ||
case 'ELU_passed': | ||
benchELUPassed(n); | ||
break; | ||
default: | ||
throw new Error(`Unsupported method ${method}`); | ||
} | ||
} | ||
|
||
function benchELUSimple(n) { | ||
const worker = new Worker(__filename, { argv: ['idle cats'] }); | ||
|
||
spinUntilIdle(worker, () => { | ||
bench.start(); | ||
for (let i = 0; i < n; i++) | ||
worker.performance.eventLoopUtilization(); | ||
bench.end(n); | ||
worker.postMessage('bye'); | ||
}); | ||
} | ||
|
||
function benchELUPassed(n) { | ||
const worker = new Worker(__filename, { argv: ['idle cats'] }); | ||
|
||
spinUntilIdle(worker, () => { | ||
let elu = worker.performance.eventLoopUtilization(); | ||
bench.start(); | ||
for (let i = 0; i < n; i++) | ||
elu = worker.performance.eventLoopUtilization(elu); | ||
bench.end(n); | ||
worker.postMessage('bye'); | ||
}); | ||
} | ||
|
||
function spinUntilIdle(w, cb) { | ||
const t = w.performance.eventLoopUtilization(); | ||
if (t.idle + t.active > 0) | ||
return process.nextTick(cb); | ||
setTimeout(() => spinUntilIdle(w, cb), 1); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ const { | |
const EventEmitter = require('events'); | ||
const assert = require('internal/assert'); | ||
const path = require('path'); | ||
const { timeOrigin } = internalBinding('performance'); | ||
|
||
const errorCodes = require('internal/errors').codes; | ||
const { | ||
|
@@ -70,6 +71,8 @@ const kOnMessage = Symbol('kOnMessage'); | |
const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr'); | ||
const kOnErrorMessage = Symbol('kOnErrorMessage'); | ||
const kParentSideStdio = Symbol('kParentSideStdio'); | ||
const kLoopStartTime = Symbol('kLoopStartTime'); | ||
const kIsOnline = Symbol('kIsOnline'); | ||
|
||
const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV'); | ||
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { | ||
|
@@ -223,6 +226,12 @@ class Worker extends EventEmitter { | |
null, | ||
hasStdin: !!options.stdin | ||
}, transferList); | ||
// Use this to cache the Worker's loopStart value once available. | ||
this[kLoopStartTime] = -1; | ||
this[kIsOnline] = false; | ||
this.performance = { | ||
eventLoopUtilization: eventLoopUtilization.bind(this), | ||
}; | ||
// Actually start the new thread now that everything is in place. | ||
this[kHandle].startThread(); | ||
} | ||
|
@@ -254,6 +263,7 @@ class Worker extends EventEmitter { | |
[kOnMessage](message) { | ||
switch (message.type) { | ||
case messageTypes.UP_AND_RUNNING: | ||
this[kIsOnline] = true; | ||
return this.emit('online'); | ||
case messageTypes.COULD_NOT_SERIALIZE_ERROR: | ||
return this[kOnCouldNotSerializeErr](); | ||
|
@@ -415,6 +425,52 @@ function makeResourceLimits(float64arr) { | |
}; | ||
} | ||
|
||
function eventLoopUtilization(util1, util2) { | ||
// TODO(trevnorris): Works to solve the thread-safe read/write issue of | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense for the API instead to be emitted on
That would make it impossible to use this API incorrectly and remove a bit of the weirdness. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That seems like it makes accessing the API more difficult, with little benefit… There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @addaleax Note that the API isn't really that bad: const worker = new Worker(...);
const controller = await once(worker, 'online');
console.log(controller.eventLoopUtilization()); // same thing as currently on worker now There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benjamingr I think my disagreement is more about whether this is a footgun or not :) If you call it before the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you think the distinction between "no data" and "0 data" here isn't important - I will concede. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benjamingr I plan on figuring out a way to remove the limitation of not being able to query the ELU until the |
||
// loopTime, but has the drawback that it can't be set until the event loop | ||
// has had a chance to turn. So it will be impossible to read the ELU of | ||
// a worker thread immediately after it's been created. | ||
if (!this[kIsOnline] || !this[kHandle]) { | ||
return { idle: 0, active: 0, utilization: 0 }; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we keep this API, I really prefer it if we didn't return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This currently agrees with the behavior on the main thread. I think throwing an error would be the (way) more unexpected result. |
||
} | ||
|
||
// Cache loopStart, since it's only written to once. | ||
if (this[kLoopStartTime] === -1) { | ||
this[kLoopStartTime] = this[kHandle].loopStartTime(); | ||
if (this[kLoopStartTime] === -1) | ||
return { idle: 0, active: 0, utilization: 0 }; | ||
} | ||
|
||
if (util2) { | ||
const idle = util1.idle - util2.idle; | ||
const active = util1.active - util2.active; | ||
return { idle, active, utilization: active / (idle + active) }; | ||
} | ||
|
||
const idle = this[kHandle].loopIdleTime(); | ||
trevnorris marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Using performance.now() here is fine since it's always the time from | ||
// the beginning of the process, and is why it needs to be offset by the | ||
// loopStart time (which is also calculated from the beginning of the | ||
// process). | ||
const active = now() - this[kLoopStartTime] - idle; | ||
|
||
if (!util1) { | ||
return { idle, active, utilization: active / (idle + active) }; | ||
} | ||
|
||
const idle_delta = idle - util1.idle; | ||
const active_delta = active - util1.active; | ||
const utilization = active_delta / (idle_delta + active_delta); | ||
return { idle: idle_delta, active: active_delta, utilization }; | ||
} | ||
|
||
// Duplicate code from performance.now() so don't need to require perf_hooks. | ||
function now() { | ||
const hr = process.hrtime(); | ||
return (hr[0] * 1000 + hr[1] / 1e6) - timeOrigin; | ||
} | ||
|
||
module.exports = { | ||
ownsProcessState, | ||
isMainThread, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -746,6 +746,39 @@ void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) { | |
args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>()); | ||
} | ||
|
||
void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) { | ||
Worker* w; | ||
ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); | ||
|
||
Mutex::ScopedLock lock(w->mutex_); | ||
// Using w->is_stopped() here leads to a deadlock, and checking is_stopped() | ||
// before locking the mutex is a race condition. So manually do the same | ||
// check. | ||
if (w->stopped_ || w->env_ == nullptr) | ||
return args.GetReturnValue().Set(-1); | ||
|
||
uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop()); | ||
args.GetReturnValue().Set(1.0 * idle_time / 1e6); | ||
} | ||
|
||
void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) { | ||
Worker* w; | ||
ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); | ||
|
||
Mutex::ScopedLock lock(w->mutex_); | ||
// Using w->is_stopped() here leads to a deadlock, and checking is_stopped() | ||
// before locking the mutex is a race condition. So manually do the same | ||
// check. | ||
if (w->stopped_ || w->env_ == nullptr) | ||
return args.GetReturnValue().Set(-1); | ||
|
||
double loop_start_time = w->env_->performance_state()->milestones[ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The key question here then would be if the loop start milestone is guaranteed to be set before user code is permitted to call LoopStartTime. I suspect that it's not. Curious what @addaleax thinks here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've checked and it's not guaranteed. The reason this (probably) works is because it's only doing a read, and the value is only written to once. So if the state hasn't synchronized it'll return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It’s not necessarily that easy – we should not assume that 64-bit reads & writes are atomic on every platform, I think. The easiest way to work around this would probably be to only allow calling this function if the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @addaleax 64-bit reads/writes are always atomic on 64-bit systems if the memory is aligned. So, since we support 32-bit Windows builds, I guess this isn't solved so easily. :-( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @trevnorris That’s true for x64, but we’re also supporting arm and ppc :) |
||
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START]; | ||
CHECK_GE(loop_start_time, 0); | ||
args.GetReturnValue().Set( | ||
(loop_start_time - node::performance::timeOrigin) / 1e6); | ||
} | ||
|
||
namespace { | ||
|
||
// Return the MessagePort that is global for this Environment and communicates | ||
|
@@ -779,6 +812,8 @@ void InitWorker(Local<Object> target, | |
env->SetProtoMethod(w, "unref", Worker::Unref); | ||
env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits); | ||
env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot); | ||
env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime); | ||
env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime); | ||
|
||
Local<String> workerString = | ||
FIXED_ONE_BYTE_STRING(env->isolate(), "Worker"); | ||
|
@@ -845,6 +880,8 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { | |
registry->Register(Worker::Unref); | ||
registry->Register(Worker::GetResourceLimits); | ||
registry->Register(Worker::TakeHeapSnapshot); | ||
registry->Register(Worker::LoopIdleTime); | ||
registry->Register(Worker::LoopStartTime); | ||
} | ||
|
||
} // anonymous namespace | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put the -1 in a const?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benjamingr Sorry, I don't follow what you mean.