Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: make MessagePort inherit from EventTarget #34057

Closed
wants to merge 8 commits into from
17 changes: 15 additions & 2 deletions benchmark/worker/messageport.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const common = require('../common.js');
const { MessageChannel } = require('worker_threads');
const bench = common.createBenchmark(main, {
payload: ['string', 'object'],
style: ['eventtarget', 'eventemitter'],
n: [1e6]
});

Expand All @@ -25,14 +26,26 @@ function main(conf) {
const { port1, port2 } = new MessageChannel();

let messages = 0;
port2.onmessage = () => {
function listener() {
if (messages++ === n) {
bench.end(n);
port1.close();
} else {
write();
}
};
}

switch (conf.style) {
case 'eventtarget':
port2.onmessage = listener;
break;
case 'eventemitter':
port2.on('message', listener);
break;
default:
throw new Error('Unsupported listener type');
}

bench.start();
write();

Expand Down
87 changes: 66 additions & 21 deletions lib/internal/event_target.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ const kEvents = Symbol('kEvents');
const kStop = Symbol('kStop');
const kTarget = Symbol('kTarget');

const kHybridDispatch = Symbol.for('nodejs.internal.kHybridDispatch');
const kCreateEvent = Symbol('kCreateEvent');
const kNewListener = Symbol('kNewListener');
const kRemoveListener = Symbol('kRemoveListener');
const kIsNodeStyleListener = Symbol('kIsNodeStyleListener');
const kMaxListeners = Symbol('kMaxListeners');
const kMaxListenersWarned = Symbol('kMaxListenersWarned');

// Lazy load perf_hooks to avoid the additional overhead on startup
let perf_hooks;
Expand Down Expand Up @@ -157,7 +162,7 @@ Object.defineProperty(Event.prototype, SymbolToStringTag, {
// the linked list makes dispatching faster, even if adding/removing is
// slower.
class Listener {
constructor(previous, listener, once, capture, passive) {
constructor(previous, listener, once, capture, passive, isNodeStyleListener) {
this.next = undefined;
if (previous !== undefined)
previous.next = this;
Expand All @@ -166,6 +171,7 @@ class Listener {
this.once = once;
this.capture = capture;
this.passive = passive;
this.isNodeStyleListener = isNodeStyleListener;

this.callback =
typeof listener === 'function' ?
Expand All @@ -185,13 +191,17 @@ class Listener {
}
}

function initEventTarget(self) {
self[kEvents] = new Map();
}

class EventTarget {
// Used in checking whether an object is an EventTarget. This is a well-known
// symbol as EventTarget may be used cross-realm. See discussion in #33661.
static [kIsEventTarget] = true;

constructor() {
this[kEvents] = new Map();
initEventTarget(this);
}

[kNewListener](size, type, listener, once, capture, passive) {}
Expand All @@ -206,7 +216,8 @@ class EventTarget {
const {
once,
capture,
passive
passive,
isNodeStyleListener
} = validateEventListenerOptions(options);

if (!shouldAddListener(listener)) {
Expand All @@ -228,7 +239,7 @@ class EventTarget {
if (root === undefined) {
root = { size: 1, next: undefined };
// This is the first handler in our linked list.
new Listener(root, listener, once, capture, passive);
new Listener(root, listener, once, capture, passive, isNodeStyleListener);
this[kNewListener](root.size, type, listener, once, capture, passive);
this[kEvents].set(type, root);
return;
Expand All @@ -247,7 +258,8 @@ class EventTarget {
return;
}

new Listener(previous, listener, once, capture, passive);
new Listener(previous, listener, once, capture, passive,
isNodeStyleListener);
root.size++;
this[kNewListener](root.size, type, listener, once, capture, passive);
}
Expand Down Expand Up @@ -290,39 +302,61 @@ class EventTarget {
if (event[kTarget] !== null)
throw new ERR_EVENT_RECURSION(event.type);

const root = this[kEvents].get(event.type);
this[kHybridDispatch](event, event.type, event);

return event.defaultPrevented !== true;
}

[kHybridDispatch](nodeValue, type, event) {
const createEvent = () => {
if (event === undefined) {
event = this[kCreateEvent](nodeValue, type);
event[kTarget] = this;
}
return event;
};

const root = this[kEvents].get(type);
if (root === undefined || root.next === undefined)
return true;

event[kTarget] = this;
if (event !== undefined)
event[kTarget] = this;

let handler = root.next;
let next;

while (handler !== undefined &&
(handler.passive || event[kStop] !== true)) {
(handler.passive || event?.[kStop] !== true)) {
// Cache the next item in case this iteration removes the current one
next = handler.next;

if (handler.once) {
handler.remove();
root.size--;
const { listener, capture } = handler;
this[kRemoveListener](root.size, type, listener, capture);
}

try {
const result = handler.callback.call(this, event);
let arg;
if (handler.isNodeStyleListener) {
arg = nodeValue;
} else {
arg = createEvent();
}
const result = handler.callback.call(this, arg);
if (result !== undefined && result !== null)
addCatch(this, result, event);
addCatch(this, result, createEvent());
} catch (err) {
emitUnhandledRejectionOrErr(this, err, event);
emitUnhandledRejectionOrErr(this, err, createEvent());
}

handler = next;
}

event[kTarget] = undefined;

return event.defaultPrevented !== true;
if (event !== undefined)
event[kTarget] = undefined;
}

[customInspectSymbol](depth, options) {
Expand Down Expand Up @@ -350,15 +384,19 @@ Object.defineProperty(EventTarget.prototype, SymbolToStringTag, {
value: 'EventTarget',
});

const kMaxListeners = Symbol('maxListeners');
const kMaxListenersWarned = Symbol('maxListenersWarned');
function initNodeEventTarget(self) {
initEventTarget(self);
// eslint-disable-next-line no-use-before-define
self[kMaxListeners] = NodeEventTarget.defaultMaxListeners;
self[kMaxListenersWarned] = false;
}

class NodeEventTarget extends EventTarget {
static defaultMaxListeners = 10;

constructor() {
super();
this[kMaxListeners] = NodeEventTarget.defaultMaxListeners;
this[kMaxListenersWarned] = false;
initNodeEventTarget(this);
}

[kNewListener](size, type, listener, once, capture, passive) {
Expand Down Expand Up @@ -410,17 +448,18 @@ class NodeEventTarget extends EventTarget {
}

on(type, listener) {
this.addEventListener(type, listener);
this.addEventListener(type, listener, { [kIsNodeStyleListener]: true });
return this;
}

addListener(type, listener) {
this.addEventListener(type, listener);
this.addEventListener(type, listener, { [kIsNodeStyleListener]: true });
return this;
}

once(type, listener) {
this.addEventListener(type, listener, { once: true });
this.addEventListener(type, listener,
{ once: true, [kIsNodeStyleListener]: true });
return this;
}

Expand Down Expand Up @@ -470,6 +509,7 @@ function validateEventListenerOptions(options) {
once: Boolean(options.once),
capture: Boolean(options.capture),
passive: Boolean(options.passive),
isNodeStyleListener: Boolean(options[kIsNodeStyleListener])
};
}

Expand Down Expand Up @@ -520,4 +560,9 @@ module.exports = {
EventTarget,
NodeEventTarget,
defineEventHandler,
initEventTarget,
initNodeEventTarget,
kCreateEvent,
kNewListener,
kRemoveListener,
};
27 changes: 23 additions & 4 deletions lib/internal/per_context/messageport.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
'use strict';
const {
SymbolFor,
} = primordials;

class MessageEvent {
constructor(data, target) {
constructor(data, target, type) {
this.data = data;
this.target = target;
this.type = type;
}
}

exports.emitMessage = function(data) {
if (typeof this.onmessage === 'function')
this.onmessage(new MessageEvent(data, this));
const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch');

exports.emitMessage = function(data, type) {
if (typeof this[kHybridDispatch] === 'function') {
this[kHybridDispatch](data, type, undefined);
return;
}

const event = new MessageEvent(data, this, type);
if (type === 'message') {
if (typeof this.onmessage === 'function')
this.onmessage(event);
} else {
// eslint-disable-next-line no-lonely-if
if (typeof this.onmessageerror === 'function')
this.onmessageerror(event);
}
};
Loading